| use crate::fs::OpenOptions; |
| use crate::io; |
| use crate::io::Result; |
| use crate::io::SeekFrom; |
| use crate::io::{BufRead, Read, Seek, Write}; |
| use crate::os::unix::io::AsRawFd; |
| use crate::sys_common::io::test::tmpdir; |
| |
| #[test] |
| fn copy_specialization() -> Result<()> { |
| use crate::io::{BufReader, BufWriter}; |
| |
| let tmp_path = tmpdir(); |
| let source_path = tmp_path.join("copy-spec.source"); |
| let sink_path = tmp_path.join("copy-spec.sink"); |
| |
| let result: Result<()> = try { |
| let mut source = crate::fs::OpenOptions::new() |
| .read(true) |
| .write(true) |
| .create(true) |
| .truncate(true) |
| .open(&source_path)?; |
| source.write_all(b"abcdefghiklmnopqr")?; |
| source.seek(SeekFrom::Start(8))?; |
| let mut source = BufReader::with_capacity(8, source.take(5)); |
| source.fill_buf()?; |
| assert_eq!(source.buffer(), b"iklmn"); |
| source.get_mut().set_limit(6); |
| source.get_mut().get_mut().seek(SeekFrom::Start(1))?; // "bcdefg" |
| let mut source = source.take(10); // "iklmnbcdef" |
| |
| let mut sink = crate::fs::OpenOptions::new() |
| .read(true) |
| .write(true) |
| .create(true) |
| .truncate(true) |
| .open(&sink_path)?; |
| sink.write_all(b"000000")?; |
| let mut sink = BufWriter::with_capacity(5, sink); |
| sink.write_all(b"wxyz")?; |
| assert_eq!(sink.buffer(), b"wxyz"); |
| |
| let copied = crate::io::copy(&mut source, &mut sink)?; |
| assert_eq!(copied, 10, "copy obeyed limit imposed by Take"); |
| assert_eq!(sink.buffer().len(), 0, "sink buffer was flushed"); |
| assert_eq!(source.limit(), 0, "outer Take was exhausted"); |
| assert_eq!(source.get_ref().buffer().len(), 0, "source buffer should be drained"); |
| assert_eq!( |
| source.get_ref().get_ref().limit(), |
| 1, |
| "inner Take allowed reading beyond end of file, some bytes should be left" |
| ); |
| |
| let mut sink = sink.into_inner()?; |
| sink.seek(SeekFrom::Start(0))?; |
| let mut copied = Vec::new(); |
| sink.read_to_end(&mut copied)?; |
| assert_eq!(&copied, b"000000wxyziklmnbcdef"); |
| }; |
| |
| let rm1 = crate::fs::remove_file(source_path); |
| let rm2 = crate::fs::remove_file(sink_path); |
| |
| result.and(rm1).and(rm2) |
| } |
| |
| #[test] |
| fn copies_append_mode_sink() -> Result<()> { |
| let tmp_path = tmpdir(); |
| let source_path = tmp_path.join("copies_append_mode.source"); |
| let sink_path = tmp_path.join("copies_append_mode.sink"); |
| let mut source = |
| OpenOptions::new().create(true).truncate(true).write(true).read(true).open(&source_path)?; |
| write!(source, "not empty")?; |
| source.seek(SeekFrom::Start(0))?; |
| let mut sink = OpenOptions::new().create(true).append(true).open(&sink_path)?; |
| |
| let copied = crate::io::copy(&mut source, &mut sink)?; |
| |
| assert_eq!(copied, 9); |
| |
| Ok(()) |
| } |
| |
| #[bench] |
| fn bench_file_to_file_copy(b: &mut test::Bencher) { |
| const BYTES: usize = 128 * 1024; |
| let temp_path = tmpdir(); |
| let src_path = temp_path.join("file-copy-bench-src"); |
| let mut src = crate::fs::OpenOptions::new() |
| .create(true) |
| .truncate(true) |
| .read(true) |
| .write(true) |
| .open(src_path) |
| .unwrap(); |
| src.write(&vec![0u8; BYTES]).unwrap(); |
| |
| let sink_path = temp_path.join("file-copy-bench-sink"); |
| let mut sink = crate::fs::OpenOptions::new() |
| .create(true) |
| .truncate(true) |
| .write(true) |
| .open(sink_path) |
| .unwrap(); |
| |
| b.bytes = BYTES as u64; |
| b.iter(|| { |
| src.seek(SeekFrom::Start(0)).unwrap(); |
| sink.seek(SeekFrom::Start(0)).unwrap(); |
| assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); |
| }); |
| } |
| |
| #[bench] |
| fn bench_file_to_socket_copy(b: &mut test::Bencher) { |
| const BYTES: usize = 128 * 1024; |
| let temp_path = tmpdir(); |
| let src_path = temp_path.join("pipe-copy-bench-src"); |
| let mut src = OpenOptions::new() |
| .create(true) |
| .truncate(true) |
| .read(true) |
| .write(true) |
| .open(src_path) |
| .unwrap(); |
| src.write(&vec![0u8; BYTES]).unwrap(); |
| |
| let sink_drainer = crate::net::TcpListener::bind("localhost:0").unwrap(); |
| let mut sink = crate::net::TcpStream::connect(sink_drainer.local_addr().unwrap()).unwrap(); |
| let mut sink_drainer = sink_drainer.accept().unwrap().0; |
| |
| crate::thread::spawn(move || { |
| let mut sink_buf = vec![0u8; 1024 * 1024]; |
| loop { |
| sink_drainer.read(&mut sink_buf[..]).unwrap(); |
| } |
| }); |
| |
| b.bytes = BYTES as u64; |
| b.iter(|| { |
| src.seek(SeekFrom::Start(0)).unwrap(); |
| assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); |
| }); |
| } |
| |
| #[bench] |
| fn bench_file_to_uds_copy(b: &mut test::Bencher) { |
| const BYTES: usize = 128 * 1024; |
| let temp_path = tmpdir(); |
| let src_path = temp_path.join("uds-copy-bench-src"); |
| let mut src = OpenOptions::new() |
| .create(true) |
| .truncate(true) |
| .read(true) |
| .write(true) |
| .open(src_path) |
| .unwrap(); |
| src.write(&vec![0u8; BYTES]).unwrap(); |
| |
| let (mut sink, mut sink_drainer) = crate::os::unix::net::UnixStream::pair().unwrap(); |
| |
| crate::thread::spawn(move || { |
| let mut sink_buf = vec![0u8; 1024 * 1024]; |
| loop { |
| sink_drainer.read(&mut sink_buf[..]).unwrap(); |
| } |
| }); |
| |
| b.bytes = BYTES as u64; |
| b.iter(|| { |
| src.seek(SeekFrom::Start(0)).unwrap(); |
| assert_eq!(BYTES as u64, io::copy(&mut src, &mut sink).unwrap()); |
| }); |
| } |
| |
| #[cfg(any(target_os = "linux", target_os = "android"))] |
| #[bench] |
| fn bench_socket_pipe_socket_copy(b: &mut test::Bencher) { |
| use super::CopyResult; |
| use crate::io::ErrorKind; |
| use crate::process::{ChildStdin, ChildStdout}; |
| use crate::sys_common::FromInner; |
| |
| let (read_end, write_end) = crate::sys::pipe::anon_pipe().unwrap(); |
| |
| let mut read_end = ChildStdout::from_inner(read_end); |
| let write_end = ChildStdin::from_inner(write_end); |
| |
| let acceptor = crate::net::TcpListener::bind("localhost:0").unwrap(); |
| let mut remote_end = crate::net::TcpStream::connect(acceptor.local_addr().unwrap()).unwrap(); |
| |
| let local_end = crate::sync::Arc::new(acceptor.accept().unwrap().0); |
| |
| // the data flow in this benchmark: |
| // |
| // socket(tx) local_source |
| // remote_end (write) +--------> (splice to) |
| // write_end |
| // + |
| // | |
| // | pipe |
| // v |
| // read_end |
| // remote_end (read) <---------+ (splice to) * |
| // socket(rx) local_end |
| // |
| // * benchmark loop using io::copy |
| |
| crate::thread::spawn(move || { |
| let mut sink_buf = vec![0u8; 1024 * 1024]; |
| remote_end.set_nonblocking(true).unwrap(); |
| loop { |
| match remote_end.write(&mut sink_buf[..]) { |
| Err(err) if err.kind() == ErrorKind::WouldBlock => {} |
| Ok(_) => {} |
| err => { |
| err.expect("write failed"); |
| } |
| }; |
| match remote_end.read(&mut sink_buf[..]) { |
| Err(err) if err.kind() == ErrorKind::WouldBlock => {} |
| Ok(_) => {} |
| err => { |
| err.expect("read failed"); |
| } |
| }; |
| } |
| }); |
| |
| // check that splice works, otherwise the benchmark would hang |
| let probe = super::sendfile_splice( |
| super::SpliceMode::Splice, |
| local_end.as_raw_fd(), |
| write_end.as_raw_fd(), |
| 1, |
| ); |
| |
| match probe { |
| CopyResult::Ended(1) => { |
| // splice works |
| } |
| _ => { |
| eprintln!("splice failed, skipping benchmark"); |
| return; |
| } |
| } |
| |
| let local_source = local_end.clone(); |
| crate::thread::spawn(move || { |
| loop { |
| super::sendfile_splice( |
| super::SpliceMode::Splice, |
| local_source.as_raw_fd(), |
| write_end.as_raw_fd(), |
| u64::MAX, |
| ); |
| } |
| }); |
| |
| const BYTES: usize = 128 * 1024; |
| b.bytes = BYTES as u64; |
| b.iter(|| { |
| assert_eq!( |
| BYTES as u64, |
| io::copy(&mut (&mut read_end).take(BYTES as u64), &mut &*local_end).unwrap() |
| ); |
| }); |
| } |