blob: 9c4878dfc141e3ef993534139d25387800a0fe7c [file] [log] [blame]
//! An implementation of IPC locks, guaranteed to be released if a process dies
//!
//! This module implements a locking server/client where the main `cargo fix`
//! process will start up a server and then all the client processes will
//! connect to it. The main purpose of this file is to enusre that each crate
//! (aka file entry point) is only fixed by one process at a time, currently
//! concurrent fixes can't happen.
//!
//! The basic design here is to use a TCP server which is pretty portable across
//! platforms. For simplicity it just uses threads as well. Clients connect to
//! the main server, inform the server what its name is, and then wait for the
//! server to give it the lock (aka write a byte).
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use failure::{Error, ResultExt};
pub struct LockServer {
listener: TcpListener,
addr: SocketAddr,
threads: HashMap<String, ServerClient>,
done: Arc<AtomicBool>,
}
pub struct LockServerStarted {
done: Arc<AtomicBool>,
addr: SocketAddr,
thread: Option<JoinHandle<()>>,
}
pub struct LockServerClient {
_socket: TcpStream,
}
struct ServerClient {
thread: Option<JoinHandle<()>>,
lock: Arc<Mutex<(bool, Vec<TcpStream>)>>,
}
impl LockServer {
pub fn new() -> Result<LockServer, Error> {
let listener = TcpListener::bind("127.0.0.1:0")
.with_context(|_| "failed to bind TCP listener to manage locking")?;
let addr = listener.local_addr()?;
Ok(LockServer {
listener,
addr,
threads: HashMap::new(),
done: Arc::new(AtomicBool::new(false)),
})
}
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
pub fn start(self) -> Result<LockServerStarted, Error> {
let addr = self.addr;
let done = self.done.clone();
let thread = thread::spawn(|| {
self.run();
});
Ok(LockServerStarted {
addr,
thread: Some(thread),
done,
})
}
fn run(mut self) {
while let Ok((client, _)) = self.listener.accept() {
if self.done.load(Ordering::SeqCst) {
break;
}
// Learn the name of our connected client to figure out if it needs
// to wait for another process to release the lock.
let mut client = BufReader::new(client);
let mut name = String::new();
if client.read_line(&mut name).is_err() {
continue;
}
let client = client.into_inner();
// If this "named mutex" is already registered and the thread is
// still going, put it on the queue. Otherwise wait on the previous
// thread and we'll replace it just below.
if let Some(t) = self.threads.get_mut(&name) {
let mut state = t.lock.lock().unwrap();
if state.0 {
state.1.push(client);
continue;
}
drop(t.thread.take().unwrap().join());
}
let lock = Arc::new(Mutex::new((true, vec![client])));
let lock2 = lock.clone();
let thread = thread::spawn(move || {
loop {
let mut client = {
let mut state = lock2.lock().unwrap();
if state.1.is_empty() {
state.0 = false;
break;
} else {
state.1.remove(0)
}
};
// Inform this client that it now has the lock and wait for
// it to disconnect by waiting for EOF.
if client.write_all(&[1]).is_err() {
continue;
}
let mut dst = Vec::new();
drop(client.read_to_end(&mut dst));
}
});
self.threads.insert(
name,
ServerClient {
thread: Some(thread),
lock,
},
);
}
}
}
impl Drop for LockServer {
fn drop(&mut self) {
for (_, mut client) in self.threads.drain() {
if let Some(thread) = client.thread.take() {
drop(thread.join());
}
}
}
}
impl Drop for LockServerStarted {
fn drop(&mut self) {
self.done.store(true, Ordering::SeqCst);
// Ignore errors here as this is largely best-effort
if TcpStream::connect(&self.addr).is_err() {
return;
}
drop(self.thread.take().unwrap().join());
}
}
impl LockServerClient {
pub fn lock(addr: &SocketAddr, name: impl AsRef<[u8]>) -> Result<LockServerClient, Error> {
let mut client = TcpStream::connect(&addr)
.with_context(|_| "failed to connect to parent lock server")?;
client
.write_all(name.as_ref())
.and_then(|_| client.write_all(b"\n"))
.with_context(|_| "failed to write to lock server")?;
let mut buf = [0];
client
.read_exact(&mut buf)
.with_context(|_| "failed to acquire lock")?;
Ok(LockServerClient { _socket: client })
}
}