SSH Agent v2: server framework (#19119)

Adds the basic framework for handling new client connections in the server.

Co-authored-by: Bernd Schoolmann <mail@quexten.com>
pull/19620/merge
neuronull 2 months ago committed by GitHub
parent 477b856519
commit 81adf155ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -31,8 +31,8 @@ where
impl<K, H> BitwardenSSHAgent<K, H>
where
K: KeyStore + Send + Sync,
H: ApprovalRequester,
K: KeyStore + Send + Sync + 'static,
H: ApprovalRequester + 'static,
{
/// Creates a new [`BitwardenSSHAgent`]
pub fn new(keystore: K, approval_handler: H) -> Self {
@ -50,7 +50,9 @@ where
/// Starts the ssh agent server
pub fn start_server(&mut self) -> Result<()> {
debug!("Starting the server.");
self.server.start()
// TODO: PM-30756 Create platform-specific listeners and pass to start()
// self.server.start(listeners)
Ok(())
}
/// Stops the ssh agent server

@ -34,4 +34,4 @@ pub use agent::BitwardenSSHAgent;
pub use approval::{ApprovalError, ApprovalRequester, SignApprovalRequest};
pub use crypto::PublicKey;
pub use server::{AuthRequest, SIGNamespace, SignRequest};
pub use storage::keystore::InMemoryEncryptedKeyStore;
pub use storage::keystore::{InMemoryEncryptedKeyStore, KeyStore};

@ -0,0 +1,70 @@
//! SSH agent client connection and connection handler
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use tracing::info;
use super::{auth_policy::AuthPolicy, peer_info::PeerInfo, KeyStore};
/// An accepted connection from an SSH agent client, bundling the I/O stream
/// with information about the connecting peer.
pub(crate) struct Connection<S> {
/// The I/O stream for this connection
pub(crate) stream: S,
/// Information about the connected peer process
pub(crate) peer_info: PeerInfo,
}
/// Handles an individual SSH agent client connection
pub(crate) struct ConnectionHandler<K, A, S> {
keystore: Arc<K>,
auth_policy: Arc<A>,
connection: Connection<S>,
token: CancellationToken,
}
impl<K, A, S> ConnectionHandler<K, A, S>
where
K: KeyStore,
A: AuthPolicy,
S: AsyncRead + AsyncWrite + Unpin,
{
/// Create a new connection handler
pub fn new(
keystore: Arc<K>,
auth_policy: Arc<A>,
connection: Connection<S>,
token: CancellationToken,
) -> Self {
Self {
keystore,
auth_policy,
connection,
token,
}
}
/// Handle incoming SSH agent protocol messages from the client
#[allow(clippy::never_loop)] // TODO PM-30755 remove
pub async fn handle(self) {
info!(peer_info = ?self.connection.peer_info, "Connection handler started");
loop {
tokio::select! {
() = self.token.cancelled() => {
info!("Connection handler received cancellation signal");
break;
}
// TODO: PM-30755
// read SSH protocol message from self.connection.stream
// parse message type, use auth policy and keystore to satisfy requests
// build response and write back to self.connection.stream
}
}
info!("Connection handler shutting down");
}
}

@ -0,0 +1,60 @@
//! SSH agent client connection listener abstraction
use anyhow::Result;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::mpsc::Sender,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use super::connection::Connection;
/// Implementors handle platform-specific socket/pipe creation and connection acceptance.
#[async_trait::async_trait]
pub(crate) trait Listener: Send + Sync {
/// The stream type returned by `accept()`
type Stream: AsyncRead + AsyncWrite + Send + Unpin + 'static;
/// Accept a new connection
async fn accept(&mut self) -> Result<Connection<Self::Stream>>;
}
/// Spawns an independent tokio task for each listener in `listeners`.
///
/// Each task loops calling `listener.accept()` and forwards accepted connections to `tx`.
/// Tasks exit when the cancellation token is triggered or the channel receiver is dropped.
pub(crate) fn spawn_listener_tasks<L>(
listeners: Vec<L>,
tx: &Sender<Connection<L::Stream>>,
cancel_token: &CancellationToken,
) where
L: Listener + 'static,
{
for mut listener in listeners {
let tx = tx.clone();
let token = cancel_token.clone();
tokio::spawn(async move {
loop {
tokio::select! {
() = token.cancelled() => {
debug!("Listener task received cancellation signal");
break;
}
result = listener.accept() => match result {
Ok(conn) => {
// Receiver dropped; main loop has exited
if tx.send(conn).await.is_err() {
break;
}
}
// Continue to retry on transient errors
Err(error) => {
error!(%error, "Listener accept failed");
}
}
}
}
});
}
}

@ -4,15 +4,26 @@
//! <https://datatracker.ietf.org/doc/draft-ietf-sshm-ssh-agent/>
mod auth_policy;
mod connection;
mod listener;
mod peer_info;
use std::sync::Arc;
use anyhow::Result;
pub(crate) use auth_policy::AuthPolicy;
// external exports for napi
pub use auth_policy::{AuthRequest, SIGNamespace, SignRequest};
use connection::Connection;
pub(crate) use listener::Listener;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use crate::storage::keystore::KeyStore;
use crate::KeyStore;
/// Buffer accepted connections pending dispatch to handler tasks.
const CONNECTION_CHANNEL_CAPACITY: usize = 32;
/// SSH Agent protocol server.
///
@ -26,14 +37,16 @@ pub struct SSHAgentServer<K, A> {
keystore: Arc<K>,
/// The authenticator policy to invoke for operations that require authorization
auth_policy: Arc<A>,
/// Async task coordination to use when asked to stop. Is `None` when non running.
/// Async task coordination to use when asked to stop. Is `None` when not running.
cancellation_token: Option<CancellationToken>,
/// Task handle for the accept loop. Is `None` when not running.
accept_handle: Option<JoinHandle<()>>,
}
impl<K, A> SSHAgentServer<K, A>
where
K: KeyStore,
A: AuthPolicy,
K: KeyStore + 'static,
A: AuthPolicy + 'static,
{
/// Creates a new [`SSHAgentServer`]
pub fn new(keystore: Arc<K>, auth_policy: Arc<A>) -> Self {
@ -41,18 +54,114 @@ where
keystore,
auth_policy,
cancellation_token: None,
accept_handle: None,
}
}
pub fn start(&mut self) -> Result<()> {
todo!();
/// Starts the server, listening on the provided listeners.
///
/// Each listener runs in its own task and sends accepted connections to a shared
/// channel. The accept loop dispatches each connection to a handler task.
pub fn start<L>(&mut self, listeners: Vec<L>) -> Result<()>
where
L: Listener + 'static,
{
if self.is_running() {
return Err(anyhow::anyhow!("Server is already running"));
}
let cancel_token = CancellationToken::new();
info!("Starting server");
let accept_handle = tokio::spawn(Self::accept(
listeners,
self.keystore.clone(),
self.auth_policy.clone(),
cancel_token.clone(),
));
info!("Server started");
self.accept_handle = Some(accept_handle);
self.cancellation_token = Some(cancel_token);
Ok(())
}
pub fn is_running(&self) -> bool {
todo!();
self.cancellation_token.is_some()
}
pub fn stop(&mut self) {
todo!();
if let Some(cancel_token) = self.cancellation_token.take() {
info!("Stopping server");
// Signal cancellation to all tasks
cancel_token.cancel();
// Abort the accept loop task
if let Some(handle) = self.accept_handle.take() {
handle.abort();
}
info!("Server stopped");
} else {
debug!("Cancellation token is None, server already stopped.");
}
}
/// Spawns listener tasks for each listener.
/// Incoming connections from listener tasks are dispatched to handler tasks.
/// Loops until cancelled or all listener tasks have exited.
async fn accept<L>(
listeners: Vec<L>,
keystore: Arc<K>,
auth_policy: Arc<A>,
cancel_token: CancellationToken,
) where
L: Listener + 'static,
L::Stream: 'static,
{
let (tx, mut rx) = mpsc::channel::<Connection<L::Stream>>(CONNECTION_CHANNEL_CAPACITY);
debug!("Accept loop spawning listener tasks");
listener::spawn_listener_tasks(listeners, &tx, &cancel_token);
// Dropping tx exlicitly allows it to close when all listener tasks exit,
// this is necessary for the recv block below to exit when listeners exit.
drop(tx);
info!("Accept loop starting");
loop {
tokio::select! {
() = cancel_token.cancelled() => {
debug!("Accept loop received cancellation signal");
break;
}
conn = rx.recv() => if let Some(connection) = conn {
info!(peer_info = ?connection.peer_info, "Connection accepted");
// TODO: PM-30755 Spawn handler for this connection
// let handler = ConnectionHandler::new(
// keystore.clone(),
// auth_policy.clone(),
// connection,
// token.clone(),
// );
// tokio::spawn(async move { handler.handle().await });
// TODO: PM-30755 temporary to avoid unused var warnings
let _ = connection;
let _ = keystore;
let _ = auth_policy;
} else {
debug!("All listener tasks exited");
break;
}
}
}
info!("Accept loop exited");
}
}

@ -0,0 +1,5 @@
//! Peer process information for SSH agent connections
/// Information about the connecting peer process
#[derive(Debug, Clone)]
pub(crate) struct PeerInfo {}
Loading…
Cancel
Save