Skip to content

Commit

Permalink
Fix std and picow demos, cleaning warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
mkj committed Jun 3, 2024
1 parent 59471c0 commit 411a670
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 138 deletions.
124 changes: 32 additions & 92 deletions embassy/demos/common/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
#[allow(unused_imports)]
use log::{debug, error, info, log, trace, warn};

use embassy_sync::mutex::Mutex;
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_net::tcp::TcpSocket;
use embassy_net::Stack;
use embassy_net_driver::Driver;
Expand All @@ -13,7 +11,7 @@ use embedded_io_async::Write;

use heapless::String;

use sunset::{event::{ChanRequest, ServFirstAuth, ServOpenSession, ServPasswordAuth}, *};
use sunset::{event::{ServFirstAuth, ServOpenSession, ServPasswordAuth, ServPubkeyAuth}, *};
use sunset_embassy::{SSHServer, SunsetMutex};

use crate::SSHConfig;
Expand All @@ -28,7 +26,7 @@ pub async fn listener<D: Driver, S: DemoServer>(stack: &'static Stack<D>,
// frames more efficiently, RX doesn't matter so much?
// How does this interact with the channel copy buffer sizes?
let mut rx_buffer = [0; 1550];
let mut tx_buffer = [0; 4500];
let mut tx_buffer = [0; 1550];

loop {
let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
Expand All @@ -45,9 +43,8 @@ pub async fn listener<D: Driver, S: DemoServer>(stack: &'static Stack<D>,
warn!("Ended with error {e:#?}");
}

// Make sure a TCP socket reset is sent to the remote host
// Make sure a TCP socket reset is sent on exit to the remote host
socket.abort();

if let Err(e) = socket.flush().await {
warn!("Ended with error {e:?}");
}
Expand All @@ -61,29 +58,30 @@ async fn session<S: DemoServer>(socket: &mut TcpSocket<'_>, config: &SunsetMutex
let src = socket.remote_endpoint().unwrap();
info!("Connection from {}:{}", src.addr, src.port);

let demo = S::new(init);

let conf = config.lock().await.clone();
let app = ServerApp::new(conf)?;

// Create the SSH instance. These buffers are for decoding/encoding
// SSH packets.
let mut ssh_rxbuf = [0; 2000];
let mut ssh_txbuf = [0; 2000];
let mut ssh_txbuf = [0; 1000];
let serv = SSHServer::new(&mut ssh_rxbuf, &mut ssh_txbuf)?;
let serv = &serv;

let session = demo.run(serv, app);
// Create the handler. ServerApp is common handling (this file),
// demo is the specific demo (std or picow).
let demo = S::new(init);
let conf = config.lock().await.clone();
let app = ServerApp::new(conf)?;
// .run returns a future that runs for the life of the session
let session = demo.run(&serv, app);

// Connect the SSH instance to the sockets, .run is a future
// that reads and writes sockets.
let (mut rsock, mut wsock) = socket.split();

let run = serv.run(&mut rsock, &mut wsock);

let f = select(run, session).await;
match f {
Either::First(r) => r?,
Either::Second(r) => r?,
// Run until completion
match select(run, session).await {
Either::First(r) => r,
Either::Second(r) => r,
}

Ok(())
}

/// Provides `ServBehaviour` for the server
Expand All @@ -92,6 +90,8 @@ async fn session<S: DemoServer>(socket: &mut TcpSocket<'_>, config: &SunsetMutex
pub struct ServerApp {
config: SSHConfig,

opened: bool,
// Can be taken by the demoserver to run an interactive session.
pub sess: Option<ChanHandle>,
}

Expand All @@ -102,6 +102,7 @@ impl ServerApp {

Ok(Self {
sess: None,
opened: false,
config,
})
}
Expand All @@ -117,6 +118,9 @@ impl ServerApp {
ServEvent::PasswordAuth(a) => {
self.handle_password(a)
}
| ServEvent::PubkeyAuth(a) => {
self.handle_pubkey(a)
}
ServEvent::OpenSession(a) => {
self.open_session(a)
}
Expand All @@ -126,9 +130,6 @@ impl ServerApp {
ServEvent::SessionExec(a) => {
a.fail()
}
| ServEvent::PubkeyAuth(a) => {
a.deny()
}
| ServEvent::Defunct
| ServEvent::SessionShell(_) => {
error!("Expected caller to handle {event:?}");
Expand Down Expand Up @@ -163,15 +164,14 @@ impl ServerApp {
Ok(())
}

fn handle_pubkey(&mut self, a: ServPasswordAuth) -> Result<()> {
todo!();
fn handle_pubkey(&mut self, a: ServPubkeyAuth) -> Result<()> {
a.deny()
}

fn handle_firstauth(&self, a: ServFirstAuth) -> Result<()> {
let username = a.username()?;
if !self.is_admin(username) && self.config.console_noauth {
info!("Allowing auth for user {username}");
// self.shell.authed(username.as_str().unwrap_or("")).await;
return a.allow()
};

Expand All @@ -180,10 +180,12 @@ impl ServerApp {
}

fn open_session(&mut self, a: ServOpenSession) -> Result<()> {
if self.sess.is_some() {
// Already have one
if self.opened {
// only allow one session
a.reject(ChanFail::SSH_OPEN_ADMINISTRATIVELY_PROHIBITED)
} else {
self.opened = true;
// store the ChanHandle for the DemoServer to use
self.sess = Some(a.accept()?);
Ok(())
}
Expand All @@ -194,75 +196,13 @@ impl ServerApp {
}
}

// impl ServBehaviour for ServerApp {

// fn hostkeys(&mut self) -> BhResult<heapless::Vec<&SignKey, 2>> {
// // OK unwrap: only one element
// Ok(heapless::Vec::from_slice(&[&self.config.hostkey]).unwrap())
// }

// async fn auth_unchallenged(&mut self, username: TextString<'_>) -> bool {
// }

// fn have_auth_password(&self, username: TextString) -> bool {
// if self.is_admin(username) {
// self.config.admin_pw.is_some()
// } else {
// self.config.console_pw.is_some()
// }
// }

// fn have_auth_pubkey(&self, username: TextString) -> bool {
// todo!();
// // if self.is_admin(username) {
// // self.config.admin_keys.iter().any(|k| k.is_some())
// // } else {
// // self.config.console_keys.iter().any(|k| k.is_some())
// // }
// }

// fn open_session(&mut self, chan: ChanHandle) -> ChanOpened {
// if self.sess.is_some() {
// ChanOpened::Failure((ChanFail::SSH_OPEN_ADMINISTRATIVELY_PROHIBITED, chan))
// } else {
// self.sess = Some(chan.num());
// self.handle = Some(chan);
// ChanOpened::Success
// }
// }

// fn sess_shell(&mut self, chan: ChanNum) -> bool {
// if self.sess != Some(chan) {
// return false
// }

// if let Some(handle) = self.handle.take() {
// debug_assert_eq!(self.sess, Some(handle.num()));
// // self.shell.open_shell(handle);
// true
// } else {
// false
// }
// }

// fn sess_pty(&mut self, chan: ChanNum, _pty: &Pty) -> bool {
// self.sess == Some(chan)
// }

// fn disconnected(&mut self, desc: TextString) {
// info!("Disconnect by client: {}", desc.as_str().unwrap_or("bad"));
// }
// }

pub trait DemoServer {
/// State to be passed to each new connection by the server
type Init;

fn new(init: &Self::Init) -> Self;

/// A task to run for each incoming connection.
// TODO: eventually the compiler should add must_use automatically?
#[must_use]
async fn run(&self, serv: &SSHServer<'_>, common: ServerApp) -> Result<()>;
}

Expand All @@ -274,7 +214,7 @@ pub struct BufOutput {
/// Sufficient to hold output produced from a single keystroke input. Further output will be discarded
// pub s: String<300>,
// todo size
pub s: String<3000>,
pub s: String<500>,
}

impl BufOutput {
Expand Down
23 changes: 14 additions & 9 deletions embassy/demos/picow/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use core::ops::ControlFlow;
use {panic_probe as _};

use embassy_executor::Spawner;
use embassy_futures::select::select;
use embassy_futures::select::{select, Either};
use embassy_net::{Stack, HardwareAddress, EthernetAddress};
use embedded_io_async::{Write, Read};

Expand All @@ -18,8 +18,6 @@ use heapless::{String, Vec};
use static_cell::StaticCell;

use demo_common::menu::Runner as MenuRunner;
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_sync::signal::Signal;
use embassy_sync::channel::Channel;

use sunset::*;
Expand Down Expand Up @@ -322,16 +320,20 @@ impl DemoServer for PicoServer {
if let Some(ch) = common.sess.take() {
debug_assert!(ch.num() == a.channel()?);
a.succeed()?;
chan_pipe.try_send(ch);
let _ = chan_pipe.try_send(ch);
} else {
a.fail()?;
}
}
ServEvent::FirstAuth(a) => {
username.lock().await.push_str(a.username()?);
common.handle_event(ServEvent::FirstAuth(a))?;
ServEvent::FirstAuth(ref a) => {
// record the username
if username.lock().await.push_str(a.username()?).is_err() {
warn!("Too long username")
}
// handle the rest
common.handle_event(ev)?;
}
other => common.handle_event(other)?,
_ => common.handle_event(ev)?,
};
};
#[allow(unreachable_code)]
Expand Down Expand Up @@ -360,6 +362,9 @@ impl DemoServer for PicoServer {
}
};

session.await
match select(prog_loop, session).await {
Either::First(r) => r,
Either::Second(r) => r,
}
}
}
12 changes: 2 additions & 10 deletions embassy/demos/std/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use rand::RngCore;

use demo_common::menu::Runner as MenuRunner;
use embedded_io_async::Read;
use embassy_sync::signal::Signal;
use embassy_sync::channel::Channel;
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_futures::select::select;
use embassy_net_tuntap::TunTapDevice;

Expand Down Expand Up @@ -76,9 +74,7 @@ async fn main_task(spawner: Spawner) {
}

#[derive(Default)]
struct StdDemo {
notify: Signal<NoopRawMutex, ChanHandle>,
}
struct StdDemo;

impl DemoServer for StdDemo {
type Init = ();
Expand All @@ -87,10 +83,6 @@ impl DemoServer for StdDemo {
Default::default()
}

fn open_shell(&self, handle: ChanHandle) {
self.notify.signal(handle);
}

async fn run(&self, serv: &SSHServer<'_>, mut common: ServerApp) -> Result<()>
{
let chan_pipe = Channel::<SunsetRawMutex, ChanHandle, 1>::new();
Expand All @@ -106,7 +98,7 @@ impl DemoServer for StdDemo {
if let Some(ch) = common.sess.take() {
debug_assert!(ch.num() == a.channel()?);
a.succeed()?;
chan_pipe.try_send(ch);
let _ = chan_pipe.try_send(ch);
} else {
a.fail()?;
}
Expand Down
4 changes: 3 additions & 1 deletion embassy/src/embassy_sunset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use pin_utils::pin_mut;

use sunset::{error, ChanData, ChanHandle, ChanNum, Error, Result, Runner};
use sunset::config::MAX_CHANNELS;
use sunset::event::{Event, CliEvent, ServEvent};
use sunset::event::Event;

// For now we only support single-threaded executors.
// In future this could be behind a cfg to allow different
Expand Down Expand Up @@ -149,6 +149,7 @@ impl<'a> EmbassySunset<'a> {
break Err::<(), sunset::Error>(Error::ChannelEOF)
}
}
.inspect(|r| warn!("tx complete {r:?}"))
};
let tx = select(tx, tx_stop.wait());

Expand Down Expand Up @@ -178,6 +179,7 @@ impl<'a> EmbassySunset<'a> {
buf = &buf[n..];
}
}
.inspect(|r| warn!("rx complete {r:?}"))
};

// TODO: if RX fails (bad decrypt etc) it doesn't cancel prog, so gets stuck
Expand Down
15 changes: 13 additions & 2 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,23 @@ impl Channels {
}
}

pub(crate) fn term_break(&self, num: ChanNum, length: u32, s: &mut TrafSend) -> Result<()> {
let ch = self.get(num)?;
let br = packets::Break {
length: if length == 0 { 0 } else { length.clamp(500, 3000) }
};
match ch.ty {
ChanType::Session => Req::Break(br).send(ch, s),
_ => error::BadChannelData.fail(),
}
}

fn dispatch_open(
&mut self,
p: &ChannelOpen<'_>,
s: &mut TrafSend,
) -> Result<DispatchEvent> {
match self.dispatch_open_inner(p, s) {
match self.dispatch_open_inner(p) {
Err(DispatchOpenError::Failure(f)) => {
s.send(packets::ChannelOpenFailure {
num: p.sender_num,
Expand All @@ -248,7 +259,7 @@ impl Channels {
}

// the caller will send failure messages if required
fn dispatch_open_inner(&mut self, p: &ChannelOpen, s: &mut TrafSend)
fn dispatch_open_inner(&mut self, p: &ChannelOpen)
-> Result<DispatchEvent, DispatchOpenError> {

// Check validity before reserving a channel
Expand Down
Loading

0 comments on commit 411a670

Please sign in to comment.