Offloading computation

Dependencies

[dependencies]
axum = { version = "0.5" }
tokio = { version = "1", features = ["full"] }

Code

use axum::extract::{Extension, Path};
use axum::response::Json;
use axum::routing::get;
use tokio::sync::{mpsc, oneshot};

// Commands send to the processor. We use the oneshot sender to "call back" to the peer who send us
// the command.
enum Command {
    Sleep {
        secs: u64,
        // Using oneshot::Sender<Result<_>> we can propagate errors back to the caller
        tx: oneshot::Sender<Result<u64, ()>>,
    },
}

async fn process_compute_request(mut rx: mpsc::Receiver<Command>) {
    // Note, that this serializes incoming requests. If you want to handle requests in parallel,
    // you have to spawn tasks once more.
    while let Some(command) = rx.recv().await {
        match command {
            Command::Sleep { secs, tx } => {
                let _ = tx.send(Ok(secs + 23));
            }
        }
    }
}

async fn compute_complex(
    Path(secs): Path<u64>,
    Extension(command_tx): Extension<mpsc::Sender<Command>>,
) -> Json<u64> {
    // Construct a oneshot channel to receive the result from the processor.
    let (tx, rx) = oneshot::channel();

    // Send the command carrying the payload as well as the result sender.
    let _ = command_tx.send(Command::Sleep { secs, tx }).await;

    // Wait for the result to be returned by the processor.
    let result = rx.await.unwrap().unwrap();

    Json(result)
}

async fn compute_simple(Path(secs): Path<u64>) -> Json<u64> {
    println!("asked to sleep for {secs} secs");

    // Spawn an async task on a separate thread to avoid blocking the async run-time.
    let result = tokio::task::spawn_blocking(move || {
        // Unlike tokio::time::sleep, this one blocks the current thread.
        std::thread::sleep(std::time::Duration::from_secs(secs));
        secs + 42
    })
    .await
    .unwrap();

    println!("returned after {secs} secs");

    Json(result)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = mpsc::channel(128);

    let app = axum::Router::new()
        .route("/compute/simple/:secs", get(compute_simple))
        .route("/compute/complex/:secs", get(compute_complex))
        .layer(Extension(tx));

    let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 3000));

    let server = tokio::task::spawn(async move {
        axum::Server::bind(&addr)
            .serve(app.into_make_service())
            .await
            .unwrap();
    });

    let processor = tokio::task::spawn(async move {
        process_compute_request(rx).await;
    });

    let (_, _) = tokio::join!(server, processor);

    Ok(())
}

Run

Start the server with

cargo run --bin offloading_computation