|  | //! [Unlock Notification](http://sqlite.org/unlock_notify.html) | 
|  |  | 
|  | use std::os::raw::c_int; | 
|  | use std::os::raw::c_void; | 
|  | use std::panic::catch_unwind; | 
|  | use std::sync::{Condvar, Mutex}; | 
|  |  | 
|  | use crate::ffi; | 
|  |  | 
|  | struct UnlockNotification { | 
|  | cond: Condvar,      // Condition variable to wait on | 
|  | mutex: Mutex<bool>, // Mutex to protect structure | 
|  | } | 
|  |  | 
|  | #[allow(clippy::mutex_atomic)] | 
|  | impl UnlockNotification { | 
|  | fn new() -> UnlockNotification { | 
|  | UnlockNotification { | 
|  | cond: Condvar::new(), | 
|  | mutex: Mutex::new(false), | 
|  | } | 
|  | } | 
|  |  | 
|  | fn fired(&self) { | 
|  | let mut flag = unpoison(self.mutex.lock()); | 
|  | *flag = true; | 
|  | self.cond.notify_one(); | 
|  | } | 
|  |  | 
|  | fn wait(&self) { | 
|  | let mut fired = unpoison(self.mutex.lock()); | 
|  | while !*fired { | 
|  | fired = unpoison(self.cond.wait(fired)); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | #[inline] | 
|  | fn unpoison<T>(r: Result<T, std::sync::PoisonError<T>>) -> T { | 
|  | r.unwrap_or_else(std::sync::PoisonError::into_inner) | 
|  | } | 
|  |  | 
|  | /// This function is an unlock-notify callback | 
|  | unsafe extern "C" fn unlock_notify_cb(ap_arg: *mut *mut c_void, n_arg: c_int) { | 
|  | use std::slice::from_raw_parts; | 
|  | let args = from_raw_parts(ap_arg as *const &UnlockNotification, n_arg as usize); | 
|  | for un in args { | 
|  | drop(catch_unwind(std::panic::AssertUnwindSafe(|| un.fired()))); | 
|  | } | 
|  | } | 
|  |  | 
|  | pub unsafe fn is_locked(db: *mut ffi::sqlite3, rc: c_int) -> bool { | 
|  | rc == ffi::SQLITE_LOCKED_SHAREDCACHE | 
|  | || (rc & 0xFF) == ffi::SQLITE_LOCKED | 
|  | && ffi::sqlite3_extended_errcode(db) == ffi::SQLITE_LOCKED_SHAREDCACHE | 
|  | } | 
|  |  | 
|  | /// This function assumes that an SQLite API call (either `sqlite3_prepare_v2()` | 
|  | /// or `sqlite3_step()`) has just returned `SQLITE_LOCKED`. The argument is the | 
|  | /// associated database connection. | 
|  | /// | 
|  | /// This function calls `sqlite3_unlock_notify()` to register for an | 
|  | /// unlock-notify callback, then blocks until that callback is delivered | 
|  | /// and returns `SQLITE_OK`. The caller should then retry the failed operation. | 
|  | /// | 
|  | /// Or, if `sqlite3_unlock_notify()` indicates that to block would deadlock | 
|  | /// the system, then this function returns `SQLITE_LOCKED` immediately. In | 
|  | /// this case the caller should not retry the operation and should roll | 
|  | /// back the current transaction (if any). | 
|  | #[cfg(feature = "unlock_notify")] | 
|  | pub unsafe fn wait_for_unlock_notify(db: *mut ffi::sqlite3) -> c_int { | 
|  | let un = UnlockNotification::new(); | 
|  | /* Register for an unlock-notify callback. */ | 
|  | let rc = ffi::sqlite3_unlock_notify( | 
|  | db, | 
|  | Some(unlock_notify_cb), | 
|  | &un as *const UnlockNotification as *mut c_void, | 
|  | ); | 
|  | debug_assert!( | 
|  | rc == ffi::SQLITE_LOCKED || rc == ffi::SQLITE_LOCKED_SHAREDCACHE || rc == ffi::SQLITE_OK | 
|  | ); | 
|  | if rc == ffi::SQLITE_OK { | 
|  | un.wait(); | 
|  | } | 
|  | rc | 
|  | } | 
|  |  | 
|  | #[cfg(test)] | 
|  | mod test { | 
|  | use crate::{Connection, OpenFlags, Result, Transaction, TransactionBehavior}; | 
|  | use std::sync::mpsc::sync_channel; | 
|  | use std::thread; | 
|  | use std::time; | 
|  |  | 
|  | #[test] | 
|  | fn test_unlock_notify() -> Result<()> { | 
|  | let url = "file::memory:?cache=shared"; | 
|  | let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_URI; | 
|  | let db1 = Connection::open_with_flags(url, flags)?; | 
|  | db1.execute_batch("CREATE TABLE foo (x)")?; | 
|  | let (rx, tx) = sync_channel(0); | 
|  | let child = thread::spawn(move || { | 
|  | let mut db2 = Connection::open_with_flags(url, flags).unwrap(); | 
|  | let tx2 = Transaction::new(&mut db2, TransactionBehavior::Immediate).unwrap(); | 
|  | tx2.execute_batch("INSERT INTO foo VALUES (42)").unwrap(); | 
|  | rx.send(1).unwrap(); | 
|  | let ten_millis = time::Duration::from_millis(10); | 
|  | thread::sleep(ten_millis); | 
|  | tx2.commit().unwrap(); | 
|  | }); | 
|  | assert_eq!(tx.recv().unwrap(), 1); | 
|  | let the_answer: i64 = db1.one_column("SELECT x FROM foo")?; | 
|  | assert_eq!(42i64, the_answer); | 
|  | child.join().unwrap(); | 
|  | Ok(()) | 
|  | } | 
|  | } |