diff --git a/apps/desktop/desktop_native/ssh_agent/src/agent.rs b/apps/desktop/desktop_native/ssh_agent/src/agent.rs index 91a3ee69aff..0b5d95ea5cc 100644 --- a/apps/desktop/desktop_native/ssh_agent/src/agent.rs +++ b/apps/desktop/desktop_native/ssh_agent/src/agent.rs @@ -31,8 +31,8 @@ where impl BitwardenSSHAgent where - K: KeyStore + Send + Sync, - H: ApprovalRequester, + K: KeyStore + Send + Sync + 'static, + H: ApprovalRequester + 'static, { /// Creates a new [`BitwardenSSHAgent`] pub fn new(keystore: K, approval_handler: H) -> Self { @@ -50,7 +50,9 @@ where /// Starts the ssh agent server pub fn start_server(&mut self) -> Result<()> { debug!("Starting the server."); - self.server.start() + // TODO: PM-30756 Create platform-specific listeners and pass to start() + // self.server.start(listeners) + Ok(()) } /// Stops the ssh agent server diff --git a/apps/desktop/desktop_native/ssh_agent/src/lib.rs b/apps/desktop/desktop_native/ssh_agent/src/lib.rs index a47d37f07dd..ace6bbad807 100644 --- a/apps/desktop/desktop_native/ssh_agent/src/lib.rs +++ b/apps/desktop/desktop_native/ssh_agent/src/lib.rs @@ -34,4 +34,4 @@ pub use agent::BitwardenSSHAgent; pub use approval::{ApprovalError, ApprovalRequester, SignApprovalRequest}; pub use crypto::PublicKey; pub use server::{AuthRequest, SIGNamespace, SignRequest}; -pub use storage::keystore::InMemoryEncryptedKeyStore; +pub use storage::keystore::{InMemoryEncryptedKeyStore, KeyStore}; diff --git a/apps/desktop/desktop_native/ssh_agent/src/server/connection.rs b/apps/desktop/desktop_native/ssh_agent/src/server/connection.rs new file mode 100644 index 00000000000..cffe1e0b827 --- /dev/null +++ b/apps/desktop/desktop_native/ssh_agent/src/server/connection.rs @@ -0,0 +1,70 @@ +//! SSH agent client connection and connection handler + +use std::sync::Arc; + +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use super::{auth_policy::AuthPolicy, peer_info::PeerInfo, KeyStore}; + +/// An accepted connection from an SSH agent client, bundling the I/O stream +/// with information about the connecting peer. +pub(crate) struct Connection { + /// The I/O stream for this connection + pub(crate) stream: S, + /// Information about the connected peer process + pub(crate) peer_info: PeerInfo, +} + +/// Handles an individual SSH agent client connection +pub(crate) struct ConnectionHandler { + keystore: Arc, + auth_policy: Arc, + connection: Connection, + token: CancellationToken, +} + +impl ConnectionHandler +where + K: KeyStore, + A: AuthPolicy, + S: AsyncRead + AsyncWrite + Unpin, +{ + /// Create a new connection handler + pub fn new( + keystore: Arc, + auth_policy: Arc, + connection: Connection, + token: CancellationToken, + ) -> Self { + Self { + keystore, + auth_policy, + connection, + token, + } + } + + /// Handle incoming SSH agent protocol messages from the client + #[allow(clippy::never_loop)] // TODO PM-30755 remove + pub async fn handle(self) { + info!(peer_info = ?self.connection.peer_info, "Connection handler started"); + + loop { + tokio::select! { + () = self.token.cancelled() => { + info!("Connection handler received cancellation signal"); + break; + } + + // TODO: PM-30755 + // read SSH protocol message from self.connection.stream + // parse message type, use auth policy and keystore to satisfy requests + // build response and write back to self.connection.stream + } + } + + info!("Connection handler shutting down"); + } +} diff --git a/apps/desktop/desktop_native/ssh_agent/src/server/listener/mod.rs b/apps/desktop/desktop_native/ssh_agent/src/server/listener/mod.rs new file mode 100644 index 00000000000..c248ff188c8 --- /dev/null +++ b/apps/desktop/desktop_native/ssh_agent/src/server/listener/mod.rs @@ -0,0 +1,60 @@ +//! SSH agent client connection listener abstraction + +use anyhow::Result; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::mpsc::Sender, +}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error}; + +use super::connection::Connection; + +/// Implementors handle platform-specific socket/pipe creation and connection acceptance. +#[async_trait::async_trait] +pub(crate) trait Listener: Send + Sync { + /// The stream type returned by `accept()` + type Stream: AsyncRead + AsyncWrite + Send + Unpin + 'static; + + /// Accept a new connection + async fn accept(&mut self) -> Result>; +} + +/// Spawns an independent tokio task for each listener in `listeners`. +/// +/// Each task loops calling `listener.accept()` and forwards accepted connections to `tx`. +/// Tasks exit when the cancellation token is triggered or the channel receiver is dropped. +pub(crate) fn spawn_listener_tasks( + listeners: Vec, + tx: &Sender>, + cancel_token: &CancellationToken, +) where + L: Listener + 'static, +{ + for mut listener in listeners { + let tx = tx.clone(); + let token = cancel_token.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + () = token.cancelled() => { + debug!("Listener task received cancellation signal"); + break; + } + result = listener.accept() => match result { + Ok(conn) => { + // Receiver dropped; main loop has exited + if tx.send(conn).await.is_err() { + break; + } + } + // Continue to retry on transient errors + Err(error) => { + error!(%error, "Listener accept failed"); + } + } + } + } + }); + } +} diff --git a/apps/desktop/desktop_native/ssh_agent/src/server/mod.rs b/apps/desktop/desktop_native/ssh_agent/src/server/mod.rs index 12cff50f22a..7cbf243a8ce 100644 --- a/apps/desktop/desktop_native/ssh_agent/src/server/mod.rs +++ b/apps/desktop/desktop_native/ssh_agent/src/server/mod.rs @@ -4,15 +4,26 @@ //! mod auth_policy; +mod connection; +mod listener; +mod peer_info; use std::sync::Arc; use anyhow::Result; pub(crate) use auth_policy::AuthPolicy; +// external exports for napi pub use auth_policy::{AuthRequest, SIGNamespace, SignRequest}; +use connection::Connection; +pub(crate) use listener::Listener; +use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; +use tracing::{debug, info}; -use crate::storage::keystore::KeyStore; +use crate::KeyStore; + +/// Buffer accepted connections pending dispatch to handler tasks. +const CONNECTION_CHANNEL_CAPACITY: usize = 32; /// SSH Agent protocol server. /// @@ -26,14 +37,16 @@ pub struct SSHAgentServer { keystore: Arc, /// The authenticator policy to invoke for operations that require authorization auth_policy: Arc, - /// Async task coordination to use when asked to stop. Is `None` when non running. + /// Async task coordination to use when asked to stop. Is `None` when not running. cancellation_token: Option, + /// Task handle for the accept loop. Is `None` when not running. + accept_handle: Option>, } impl SSHAgentServer where - K: KeyStore, - A: AuthPolicy, + K: KeyStore + 'static, + A: AuthPolicy + 'static, { /// Creates a new [`SSHAgentServer`] pub fn new(keystore: Arc, auth_policy: Arc) -> Self { @@ -41,18 +54,114 @@ where keystore, auth_policy, cancellation_token: None, + accept_handle: None, } } - pub fn start(&mut self) -> Result<()> { - todo!(); + /// Starts the server, listening on the provided listeners. + /// + /// Each listener runs in its own task and sends accepted connections to a shared + /// channel. The accept loop dispatches each connection to a handler task. + pub fn start(&mut self, listeners: Vec) -> Result<()> + where + L: Listener + 'static, + { + if self.is_running() { + return Err(anyhow::anyhow!("Server is already running")); + } + + let cancel_token = CancellationToken::new(); + + info!("Starting server"); + + let accept_handle = tokio::spawn(Self::accept( + listeners, + self.keystore.clone(), + self.auth_policy.clone(), + cancel_token.clone(), + )); + + info!("Server started"); + + self.accept_handle = Some(accept_handle); + self.cancellation_token = Some(cancel_token); + + Ok(()) } pub fn is_running(&self) -> bool { - todo!(); + self.cancellation_token.is_some() } pub fn stop(&mut self) { - todo!(); + if let Some(cancel_token) = self.cancellation_token.take() { + info!("Stopping server"); + + // Signal cancellation to all tasks + cancel_token.cancel(); + + // Abort the accept loop task + if let Some(handle) = self.accept_handle.take() { + handle.abort(); + } + + info!("Server stopped"); + } else { + debug!("Cancellation token is None, server already stopped."); + } + } + + /// Spawns listener tasks for each listener. + /// Incoming connections from listener tasks are dispatched to handler tasks. + /// Loops until cancelled or all listener tasks have exited. + async fn accept( + listeners: Vec, + keystore: Arc, + auth_policy: Arc, + cancel_token: CancellationToken, + ) where + L: Listener + 'static, + L::Stream: 'static, + { + let (tx, mut rx) = mpsc::channel::>(CONNECTION_CHANNEL_CAPACITY); + + debug!("Accept loop spawning listener tasks"); + listener::spawn_listener_tasks(listeners, &tx, &cancel_token); + + // Dropping tx exlicitly allows it to close when all listener tasks exit, + // this is necessary for the recv block below to exit when listeners exit. + drop(tx); + + info!("Accept loop starting"); + loop { + tokio::select! { + () = cancel_token.cancelled() => { + debug!("Accept loop received cancellation signal"); + break; + } + conn = rx.recv() => if let Some(connection) = conn { + info!(peer_info = ?connection.peer_info, "Connection accepted"); + + // TODO: PM-30755 Spawn handler for this connection + // let handler = ConnectionHandler::new( + // keystore.clone(), + // auth_policy.clone(), + // connection, + // token.clone(), + // ); + // tokio::spawn(async move { handler.handle().await }); + + // TODO: PM-30755 temporary to avoid unused var warnings + let _ = connection; + let _ = keystore; + let _ = auth_policy; + } else { + debug!("All listener tasks exited"); + break; + } + } + } + + info!("Accept loop exited"); } } diff --git a/apps/desktop/desktop_native/ssh_agent/src/server/peer_info.rs b/apps/desktop/desktop_native/ssh_agent/src/server/peer_info.rs new file mode 100644 index 00000000000..0e08a9d1066 --- /dev/null +++ b/apps/desktop/desktop_native/ssh_agent/src/server/peer_info.rs @@ -0,0 +1,5 @@ +//! Peer process information for SSH agent connections + +/// Information about the connecting peer process +#[derive(Debug, Clone)] +pub(crate) struct PeerInfo {}