| // Copyright 2020 The ChromiumOS Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! Legacy console device that uses a polling thread. This is kept because it is still used by |
| //! Windows ; outside of this use-case, please use [[asynchronous::AsyncConsole]] instead. |
| |
| #[cfg(unix)] |
| pub mod asynchronous; |
| mod sys; |
| |
| use std::collections::VecDeque; |
| use std::io; |
| use std::io::Read; |
| use std::io::Write; |
| use std::ops::DerefMut; |
| use std::result; |
| use std::sync::Arc; |
| use std::thread; |
| |
| use anyhow::anyhow; |
| use anyhow::Context; |
| use base::error; |
| use base::AsRawDescriptor; |
| use base::Descriptor; |
| use base::Event; |
| use base::EventToken; |
| use base::RawDescriptor; |
| use base::WaitContext; |
| use base::WorkerThread; |
| use data_model::Le16; |
| use data_model::Le32; |
| use hypervisor::ProtectionType; |
| use remain::sorted; |
| use sync::Mutex; |
| use thiserror::Error as ThisError; |
| use vm_memory::GuestMemory; |
| use zerocopy::AsBytes; |
| use zerocopy::FromBytes; |
| |
| use crate::virtio::base_features; |
| use crate::virtio::copy_config; |
| use crate::virtio::DeviceType; |
| use crate::virtio::Interrupt; |
| use crate::virtio::Queue; |
| use crate::virtio::Reader; |
| use crate::virtio::SignalableInterrupt; |
| use crate::virtio::VirtioDevice; |
| use crate::Suspendable; |
| |
| pub(crate) const QUEUE_SIZE: u16 = 256; |
| |
| // For now, just implement port 0 (receiveq and transmitq). |
| // If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed. |
| const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE]; |
| |
| #[sorted] |
| #[derive(ThisError, Debug)] |
| pub enum ConsoleError { |
| /// There are no more available descriptors to receive into |
| #[error("no rx descriptors available")] |
| RxDescriptorsExhausted, |
| } |
| |
| #[derive(Copy, Clone, Debug, Default, AsBytes, FromBytes)] |
| #[repr(C)] |
| pub struct virtio_console_config { |
| pub cols: Le16, |
| pub rows: Le16, |
| pub max_nr_ports: Le32, |
| pub emerg_wr: Le32, |
| } |
| |
| /// Checks for input from `buffer` and transfers it to the receive queue, if any. |
| /// |
| /// # Arguments |
| /// |
| /// * `mem` - The GuestMemory to write the data into |
| /// * `interrupt` - SignalableInterrupt used to signal that the queue has been used |
| /// * `buffer` - Ring buffer providing data to put into the guest |
| /// * `receive_queue` - The receive virtio Queue |
| fn handle_input<I: SignalableInterrupt>( |
| mem: &GuestMemory, |
| interrupt: &I, |
| buffer: &mut VecDeque<u8>, |
| receive_queue: &Arc<Mutex<Queue>>, |
| ) -> result::Result<(), ConsoleError> { |
| let mut receive_queue = receive_queue |
| .try_lock() |
| .expect("Lock should not be unavailable"); |
| loop { |
| let mut desc = receive_queue |
| .peek(mem) |
| .ok_or(ConsoleError::RxDescriptorsExhausted)?; |
| |
| let writer = &mut desc.writer; |
| while writer.available_bytes() > 0 && !buffer.is_empty() { |
| let (buffer_front, buffer_back) = buffer.as_slices(); |
| let buffer_chunk = if !buffer_front.is_empty() { |
| buffer_front |
| } else { |
| buffer_back |
| }; |
| let written = writer.write(buffer_chunk).unwrap(); |
| drop(buffer.drain(..written)); |
| } |
| |
| let bytes_written = writer.bytes_written() as u32; |
| |
| if bytes_written > 0 { |
| receive_queue.pop_peeked(mem); |
| receive_queue.add_used(mem, desc, bytes_written); |
| receive_queue.trigger_interrupt(mem, interrupt); |
| } |
| |
| if bytes_written == 0 { |
| return Ok(()); |
| } |
| } |
| } |
| |
| /// Writes the available data from the reader into the given output queue. |
| /// |
| /// # Arguments |
| /// |
| /// * `reader` - The Reader with the data we want to write. |
| /// * `output` - The output sink we are going to write the data to. |
| fn process_transmit_request(reader: &mut Reader, output: &mut dyn io::Write) -> io::Result<()> { |
| let len = reader.available_bytes(); |
| let mut data = vec![0u8; len]; |
| reader.read_exact(&mut data)?; |
| output.write_all(&data)?; |
| output.flush()?; |
| Ok(()) |
| } |
| |
| /// Processes the data taken from the given transmit queue into the output sink. |
| /// |
| /// # Arguments |
| /// |
| /// * `mem` - The GuestMemory to take the data from |
| /// * `interrupt` - SignalableInterrupt used to signal (if required) that the queue has been used |
| /// * `transmit_queue` - The transmit virtio Queue |
| /// * `output` - The output sink we are going to write the data into |
| fn process_transmit_queue<I: SignalableInterrupt>( |
| mem: &GuestMemory, |
| interrupt: &I, |
| transmit_queue: &Arc<Mutex<Queue>>, |
| output: &mut dyn io::Write, |
| ) { |
| let mut needs_interrupt = false; |
| let mut transmit_queue = transmit_queue |
| .try_lock() |
| .expect("Lock should not be unavailable"); |
| while let Some(mut avail_desc) = transmit_queue.pop(mem) { |
| process_transmit_request(&mut avail_desc.reader, output) |
| .unwrap_or_else(|e| error!("console: process_transmit_request failed: {}", e)); |
| |
| transmit_queue.add_used(mem, avail_desc, 0); |
| needs_interrupt = true; |
| } |
| |
| if needs_interrupt { |
| transmit_queue.trigger_interrupt(mem, interrupt); |
| } |
| } |
| |
| struct Worker { |
| mem: GuestMemory, |
| interrupt: Interrupt, |
| input: Option<Arc<Mutex<VecDeque<u8>>>>, |
| output: Box<dyn io::Write + Send>, |
| kill_evt: Event, |
| in_avail_evt: Event, |
| receive_queue: Arc<Mutex<Queue>>, |
| receive_evt: Event, |
| transmit_queue: Arc<Mutex<Queue>>, |
| transmit_evt: Event, |
| } |
| |
| /// Starts a thread that reads rx and sends the input back via the returned buffer. |
| /// |
| /// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data |
| /// is available, the caller should lock the returned `Mutex` and read data out of the inner |
| /// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed. |
| /// |
| /// # Arguments |
| /// |
| /// * `rx` - Data source that the reader thread will wait on to send data back to the buffer |
| /// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer |
| fn spawn_input_thread( |
| mut rx: crate::serial::sys::InStreamType, |
| in_avail_evt: &Event, |
| ) -> Option<Arc<Mutex<VecDeque<u8>>>> { |
| let buffer = Arc::new(Mutex::new(VecDeque::<u8>::new())); |
| let buffer_cloned = buffer.clone(); |
| |
| let thread_in_avail_evt = match in_avail_evt.try_clone() { |
| Ok(evt) => evt, |
| Err(e) => { |
| error!("failed to clone in_avail_evt: {}", e); |
| return None; |
| } |
| }; |
| |
| // The input thread runs in detached mode. |
| let res = thread::Builder::new() |
| .name("console_input".to_string()) |
| .spawn(move || { |
| let mut rx_buf = [0u8; 1 << 12]; |
| loop { |
| match rx.read(&mut rx_buf) { |
| Ok(0) => break, // Assume the stream of input has ended. |
| Ok(size) => { |
| buffer.lock().extend(&rx_buf[0..size]); |
| thread_in_avail_evt.signal().unwrap(); |
| } |
| Err(e) => { |
| // Being interrupted is not an error, but everything else is. |
| if sys::is_a_fatal_input_error(&e) { |
| error!( |
| "failed to read for bytes to queue into console device: {}", |
| e |
| ); |
| break; |
| } |
| } |
| } |
| |
| // Depending on the platform, a short sleep is needed here (ie. Windows). |
| sys::read_delay_if_needed(); |
| } |
| }); |
| if let Err(e) = res { |
| error!("failed to spawn input thread: {}", e); |
| return None; |
| } |
| Some(buffer_cloned) |
| } |
| |
| impl Worker { |
| fn run(&mut self) { |
| #[derive(EventToken)] |
| enum Token { |
| ReceiveQueueAvailable, |
| TransmitQueueAvailable, |
| InputAvailable, |
| InterruptResample, |
| Kill, |
| } |
| |
| let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[ |
| (&self.transmit_evt, Token::TransmitQueueAvailable), |
| (&self.receive_evt, Token::ReceiveQueueAvailable), |
| (&self.in_avail_evt, Token::InputAvailable), |
| (&self.kill_evt, Token::Kill), |
| ]) { |
| Ok(pc) => pc, |
| Err(e) => { |
| error!("failed creating WaitContext: {}", e); |
| return; |
| } |
| }; |
| if let Some(resample_evt) = self.interrupt.get_resample_evt() { |
| if wait_ctx |
| .add(resample_evt, Token::InterruptResample) |
| .is_err() |
| { |
| error!("failed adding resample event to WaitContext."); |
| return; |
| } |
| } |
| |
| 'wait: loop { |
| let events = match wait_ctx.wait() { |
| Ok(v) => v, |
| Err(e) => { |
| error!("failed polling for events: {}", e); |
| break; |
| } |
| }; |
| |
| for event in events.iter().filter(|e| e.is_readable) { |
| match event.token { |
| Token::TransmitQueueAvailable => { |
| if let Err(e) = self.transmit_evt.wait() { |
| error!("failed reading transmit queue Event: {}", e); |
| break 'wait; |
| } |
| process_transmit_queue( |
| &self.mem, |
| &self.interrupt, |
| &self.transmit_queue, |
| &mut self.output, |
| ); |
| } |
| Token::ReceiveQueueAvailable => { |
| if let Err(e) = self.receive_evt.wait() { |
| error!("failed reading receive queue Event: {}", e); |
| break 'wait; |
| } |
| if let Some(in_buf_ref) = self.input.as_ref() { |
| match handle_input( |
| &self.mem, |
| &self.interrupt, |
| in_buf_ref.lock().deref_mut(), |
| &self.receive_queue, |
| ) { |
| Ok(()) => {} |
| // Console errors are no-ops, so just continue. |
| Err(_) => { |
| continue; |
| } |
| } |
| } |
| } |
| Token::InputAvailable => { |
| if let Err(e) = self.in_avail_evt.wait() { |
| error!("failed reading in_avail_evt: {}", e); |
| break 'wait; |
| } |
| if let Some(in_buf_ref) = self.input.as_ref() { |
| match handle_input( |
| &self.mem, |
| &self.interrupt, |
| in_buf_ref.lock().deref_mut(), |
| &self.receive_queue, |
| ) { |
| Ok(()) => {} |
| // Console errors are no-ops, so just continue. |
| Err(_) => { |
| continue; |
| } |
| } |
| } |
| } |
| Token::InterruptResample => { |
| self.interrupt.interrupt_resample(); |
| } |
| Token::Kill => break 'wait, |
| } |
| } |
| } |
| } |
| } |
| |
| enum ConsoleInput { |
| FromRead(crate::serial::sys::InStreamType), |
| FromThread(Arc<Mutex<VecDeque<u8>>>), |
| } |
| |
| /// Virtio console device. |
| pub struct Console { |
| base_features: u64, |
| in_avail_evt: Option<Event>, |
| worker_thread: Option<WorkerThread<Worker>>, |
| input: Option<ConsoleInput>, |
| output: Option<Box<dyn io::Write + Send>>, |
| keep_descriptors: Vec<Descriptor>, |
| } |
| |
| impl Console { |
| fn new( |
| protection_type: ProtectionType, |
| input: Option<ConsoleInput>, |
| output: Option<Box<dyn io::Write + Send>>, |
| keep_rds: Vec<RawDescriptor>, |
| ) -> Console { |
| Console { |
| base_features: base_features(protection_type), |
| in_avail_evt: None, |
| worker_thread: None, |
| input, |
| output, |
| keep_descriptors: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(), |
| } |
| } |
| } |
| |
| impl VirtioDevice for Console { |
| fn keep_rds(&self) -> Vec<RawDescriptor> { |
| // return the raw descriptors as opposed to descriptor. |
| self.keep_descriptors |
| .iter() |
| .map(|descr| descr.as_raw_descriptor()) |
| .collect() |
| } |
| |
| fn features(&self) -> u64 { |
| self.base_features |
| } |
| |
| fn device_type(&self) -> DeviceType { |
| DeviceType::Console |
| } |
| |
| fn queue_max_sizes(&self) -> &[u16] { |
| QUEUE_SIZES |
| } |
| |
| fn read_config(&self, offset: u64, data: &mut [u8]) { |
| let config = virtio_console_config { |
| max_nr_ports: 1.into(), |
| ..Default::default() |
| }; |
| copy_config(data, 0, config.as_bytes(), offset); |
| } |
| |
| fn activate( |
| &mut self, |
| mem: GuestMemory, |
| interrupt: Interrupt, |
| mut queues: Vec<(Queue, Event)>, |
| ) -> anyhow::Result<()> { |
| if queues.len() < 2 { |
| return Err(anyhow!("expected 2 queues, got {}", queues.len())); |
| } |
| |
| let (receive_queue, receive_evt) = queues.remove(0); |
| let (transmit_queue, transmit_evt) = queues.remove(0); |
| |
| if self.in_avail_evt.is_none() { |
| self.in_avail_evt = Some(Event::new().context("failed creating Event")?); |
| } |
| let in_avail_evt = self |
| .in_avail_evt |
| .as_ref() |
| .unwrap() |
| .try_clone() |
| .context("failed creating input available Event pair")?; |
| |
| // Spawn a separate thread to poll self.input. |
| // A thread is used because io::Read only provides a blocking interface, and there is no |
| // generic way to add an io::Read instance to a poll context (it may not be backed by a file |
| // descriptor). Moving the blocking read call to a separate thread and sending data back to |
| // the main worker thread with an event for notification bridges this gap. |
| let input = match self.input.take() { |
| Some(ConsoleInput::FromRead(read)) => { |
| let buffer = spawn_input_thread(read, self.in_avail_evt.as_ref().unwrap()); |
| if buffer.is_none() { |
| return Err(anyhow!("failed creating input thread")); |
| }; |
| buffer |
| } |
| Some(ConsoleInput::FromThread(buffer)) => Some(buffer), |
| None => None, |
| }; |
| let output = self.output.take().unwrap_or_else(|| Box::new(io::sink())); |
| |
| self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| { |
| let mut worker = Worker { |
| mem, |
| interrupt, |
| input, |
| output, |
| in_avail_evt, |
| kill_evt, |
| // Device -> driver |
| receive_queue: Arc::new(Mutex::new(receive_queue)), |
| receive_evt, |
| // Driver -> device |
| transmit_queue: Arc::new(Mutex::new(transmit_queue)), |
| transmit_evt, |
| }; |
| worker.run(); |
| worker |
| })); |
| Ok(()) |
| } |
| |
| fn reset(&mut self) -> bool { |
| if let Some(worker_thread) = self.worker_thread.take() { |
| let worker = worker_thread.stop(); |
| self.input = worker.input.map(ConsoleInput::FromThread); |
| self.output = Some(worker.output); |
| return true; |
| } |
| false |
| } |
| } |
| |
| impl Suspendable for Console {} |