Offloading computation

Dependencies

[dependencies]
axum = { version = "0.5" }
tokio = { version = "1", features = ["full"] }

Code

use axum::extract::{Extension, Path}; use axum::response::Json; use axum::routing::get; use tokio::sync::{mpsc, oneshot}; // Commands send to the processor. We use the oneshot sender to "call back" to the peer who send us // the command. enum Command { Sleep { secs: u64, // Using oneshot::Sender<Result<_>> we can propagate errors back to the caller tx: oneshot::Sender<Result<u64, ()>>, }, } async fn process_compute_request(mut rx: mpsc::Receiver<Command>) { // Note, that this serializes incoming requests. If you want to handle requests in parallel, // you have to spawn tasks once more. while let Some(command) = rx.recv().await { match command { Command::Sleep { secs, tx } => { let _ = tx.send(Ok(secs + 23)); } } } } async fn compute_complex( Path(secs): Path<u64>, Extension(command_tx): Extension<mpsc::Sender<Command>>, ) -> Json<u64> { // Construct a oneshot channel to receive the result from the processor. let (tx, rx) = oneshot::channel(); // Send the command carrying the payload as well as the result sender. let _ = command_tx.send(Command::Sleep { secs, tx }).await; // Wait for the result to be returned by the processor. let result = rx.await.unwrap().unwrap(); Json(result) } async fn compute_simple(Path(secs): Path<u64>) -> Json<u64> { println!("asked to sleep for {secs} secs"); // Spawn an async task on a separate thread to avoid blocking the async run-time. let result = tokio::task::spawn_blocking(move || { // Unlike tokio::time::sleep, this one blocks the current thread. 
std::thread::sleep(std::time::Duration::from_secs(secs)); secs + 42 }) .await .unwrap(); println!("returned after {secs} secs"); Json(result) } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let (tx, rx) = mpsc::channel(128); let app = axum::Router::new() .route("/compute/simple/:secs", get(compute_simple)) .route("/compute/complex/:secs", get(compute_complex)) .layer(Extension(tx)); let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 3000)); let server = tokio::task::spawn(async move { axum::Server::bind(&addr) .serve(app.into_make_service()) .await .unwrap(); }); let processor = tokio::task::spawn(async move { process_compute_request(rx).await; }); let (_, _) = tokio::join!(server, processor); Ok(()) }

Run

Start the server with

cargo run --bin offloading_computation