use webassembly with the server acting only as an intermediate

This commit is contained in:
Sebastian Hugentobler 2023-03-09 08:35:24 +01:00
parent 9aa1130c35
commit 7d0ef62c42
Signed by: shu
GPG key ID: BB32CF3CA052C2F0
29 changed files with 850 additions and 325 deletions

56
woweb/src/main.rs Normal file
View file

@ -0,0 +1,56 @@
use axum::routing::get;
use axum::Router;
use axum_extra::routing::SpaRouter;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tower_http::trace::TraceLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use dist_text::text::Text;
mod ws;
struct AppState {
doc: RwLock<Text>,
tx: broadcast::Sender<String>,
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "woweb_poc=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let (tx, _rx) = broadcast::channel(100);
let app_state = Arc::new(AppState {
doc: RwLock::new(Text::default()),
tx,
});
let app = Router::new()
.merge(SpaRouter::new("/", "assets").index_file("index.html"))
.route("/ws", get(ws::route))
.with_state(app_state)
.layer(TraceLayer::new_for_http());
let args: Vec<String> = env::args().collect();
let host = if args.len() > 1 {
&args[1]
} else {
"127.0.0.1:3000"
};
let addr = host
.parse()
.unwrap_or_else(|_| SocketAddr::from(([127, 0, 0, 1], 3000)));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}

89
woweb/src/ws.rs Normal file
View file

@ -0,0 +1,89 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
};
use futures::{sink::SinkExt, stream::StreamExt};
use dist_text::crdts::list::Op;
use crate::AppState;
#[derive(Serialize, Deserialize)]
struct NewState {
client: String,
doc: String,
ops: Vec<Op<u16, String>>,
}
#[derive(Clone, Serialize, Deserialize)]
pub(crate) struct Ops {
pub(crate) client: String,
ops: Vec<Op<u16, String>>,
}
pub(crate) async fn route(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, state))
}
async fn handle_socket(stream: WebSocket, state: Arc<AppState>) {
let (mut sender, mut receiver) = stream.split();
{
let doc = state.doc.read().await;
let doc: String = doc.to_string();
let new_state = NewState {
client: String::from("0000"),
doc: doc.to_string(),
ops: Vec::new(),
};
let payload = serde_json::to_string(&new_state).unwrap();
let _ = sender.send(Message::Text(payload)).await;
}
let mut rx = state.tx.subscribe();
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
});
let tx = state.tx.clone();
let state = state.clone();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
if let Ok(new_ops) = serde_json::from_str::<Ops>(&text) {
tracing::debug!("update from {}", new_ops.client.clone());
let doc = state.doc.read().await;
let mut doc = doc.to_owned();
doc.apply_ops(new_ops.ops.to_owned());
let new_state = NewState {
client: new_ops.client,
doc: doc.to_string(),
ops: new_ops.ops,
};
let payload = serde_json::to_string(&new_state).unwrap();
let _ = tx.send(payload);
}
}
});
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
}
;
}