From c644aed661dc071b294fdbfbc3d6986d2eaddb17 Mon Sep 17 00:00:00 2001 From: srtk Date: Sun, 8 Feb 2026 15:40:45 +0530 Subject: [PATCH] Initial commit: Rust WebSocket/UDP signaling and media relay server --- .gitignore | 4 ++ Cargo.toml | 18 +++++++ src/handlers.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 55 +++++++++++++++++++++ src/state.rs | 52 +++++++++++++++++++ src/udp.rs | 115 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 373 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/handlers.rs create mode 100644 src/main.rs create mode 100644 src/state.rs create mode 100644 src/udp.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dc10fb5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +target/ +.DS_Store +*.log +.env diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1087c46 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "server" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0.95" +axum = { version = "0.8.1", features = ["ws"] } +dashmap = "6.1.0" +rand = "0.9.0" +serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.138" +tokio = { version = "1.43.0", features = ["full"] } +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +bincode = "1.3.3" +futures = "0.3.31" +shared = { path = "../shared" } diff --git a/src/handlers.rs b/src/handlers.rs new file mode 100644 index 0000000..85a7f3c --- /dev/null +++ b/src/handlers.rs @@ -0,0 +1,129 @@ +use axum::{ + extract::ws::{Message, WebSocket}, +}; +use futures::{SinkExt, StreamExt}; +use shared::{ControlMsg, PeerInfo, UserId}; +use std::net::SocketAddr; +use tracing::{info, error}; + +use crate::state::{AppState, Peer, Room}; + +pub async fn handle_socket(mut socket: WebSocket, _addr: SocketAddr, state: AppState) { + let (mut sender, mut receiver) = socket.split(); + let mut user_id: Option = None; + let mut room_id: Option = None; + + // Authentication / Join Phase + while let Some(msg) = receiver.next().await { + match msg { + Ok(Message::Text(text)) => { + if let Ok(ControlMsg::Join { room_code, display_name }) = serde_json::from_str(&text) { + let uid = rand::random::(); + + info!("User {} ({}) joining room {}", uid, display_name, room_code); + + let room = state.rooms.entry(room_code.clone()) + .or_insert_with(|| Room::new(room_code.clone())); + + // Gather existing peers + let peers_list: Vec = room.peers.iter().map(|p| { + let k = *p.key(); + let v = p.value(); + PeerInfo { + user_id: k, + display_name: v.display_name.clone(), + } + }).collect(); + + // Notify others + let _ = room.tx.send(ControlMsg::PeerJoined { + user_id: uid, + display_name: display_name.clone(), + }); + + // Add self to room + room.peers.insert(uid, Peer { + id: uid, + display_name: display_name.clone(), + addr: None, + }); + + user_id = Some(uid); + room_id = Some(room_code.clone()); + + // Send Joined response + let resp = ControlMsg::Joined { + self_id: uid, + room_code: room_code.clone(), + peers: peers_list, + }; + if let Err(e) = sender.send(Message::Text(serde_json::to_string(&resp).unwrap().into())).await { + error!("Failed to send Joined response: {}", e); + return; + } + + break; + } + } + Ok(Message::Close(_)) => return, + _ => {} + } + } + + if user_id.is_none() { + return; + } + + let uid = user_id.unwrap(); + let rid = room_id.unwrap(); + + // Subscribe to room broadcasts + let mut rx = { + let room = state.rooms.get(&rid).unwrap(); + room.tx.subscribe() + }; + + // Main Loop + loop { + tokio::select! { + msg = receiver.next() => { + match msg { + Some(Ok(Message::Text(text))) => { + if let Ok(control) = serde_json::from_str::(&text) { + match control { + ControlMsg::UpdateStream { .. } => { + // Broadcast to room + if let Some(room) = state.rooms.get(&rid) { + let _ = room.tx.send(control); + } + } + _ => {} + } + } + } + Some(Ok(Message::Close(_))) => break, + Some(Err(_)) => break, + None => break, + _ => {} + } + } + Ok(msg) = rx.recv() => { + // Forward broadcast to client + let _ = sender.send(Message::Text(serde_json::to_string(&msg).unwrap().into())).await; + } + } + } + + // Cleanup + if let Some(room) = state.rooms.get(&rid) { + if let Some((_, peer)) = room.peers.remove(&uid) { + // Clean up address mapping if present + if let Some(addr) = peer.addr { + state.peers_by_addr.remove(&addr); + } + } + let _ = room.tx.send(ControlMsg::PeerLeft { user_id: uid }); + } + + info!("User {} left room {}", uid, rid); +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..84fd847 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,55 @@ +use axum::{ + extract::{State, Request, ConnectInfo}, + extract::ws::{WebSocketUpgrade, WebSocket, Message}, // Explicit import + response::IntoResponse, + routing::get, + Router, +}; +use std::net::SocketAddr; +use tokio::net::UdpSocket; +use tracing::{info, error}; + +mod state; +mod handlers; +mod udp; +use state::AppState; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let state = AppState::new(); + + // Spawn UDP Server + let udp_state = state.clone(); + tokio::spawn(async move { + if let Err(e) = udp::run_udp_server(udp_state).await { + error!("UDP server error: {}", e); + } + }); + + // HTTP/WS Server + let app = Router::new() + .route("/ws", get(ws_handler)) + .with_state(state); + + let addr = SocketAddr::from(([0, 0, 0, 0], 5000)); + info!("HTTP/WS Server listening on {}", addr); + + let listener = tokio::net::TcpListener::bind(addr).await?; + // Axum 0.7+ serve with connect info + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ).await?; + + Ok(()) +} + +async fn ws_handler( + ws: WebSocketUpgrade, + State(state): State, + ConnectInfo(addr): ConnectInfo, +) -> impl IntoResponse { + ws.on_upgrade(move |socket| handlers::handle_socket(socket, addr, state)) +} diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..205a453 --- /dev/null +++ b/src/state.rs @@ -0,0 +1,52 @@ +use dashmap::DashMap; +use shared::{ControlMsg, UserId}; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::broadcast; + +#[derive(Debug, Clone)] +pub struct AppState { + pub rooms: Arc>, + pub peers_by_addr: Arc>, +} + +#[derive(Debug, Clone)] +pub struct PeerLocation { + pub room_id: String, + pub user_id: UserId, +} + +#[derive(Debug, Clone)] +pub struct Room { + pub id: String, + pub peers: DashMap, + // Channel for broadcasting control messages within the room + pub tx: broadcast::Sender, +} + +#[derive(Debug, Clone)] +pub struct Peer { + pub id: UserId, + pub display_name: String, + pub addr: Option, // UDP address +} + +impl AppState { + pub fn new() -> Self { + Self { + rooms: Arc::new(DashMap::new()), + peers_by_addr: Arc::new(DashMap::new()), + } + } +} + +impl Room { + pub fn new(id: String) -> Self { + let (tx, _) = broadcast::channel(100); + Self { + id, + peers: DashMap::new(), + tx, + } + } +} diff --git a/src/udp.rs b/src/udp.rs new file mode 100644 index 0000000..b85599f --- /dev/null +++ b/src/udp.rs @@ -0,0 +1,115 @@ +use std::sync::Arc; +use tokio::net::UdpSocket; +use tracing::{info, warn, error}; +use shared::{PacketHeader, MediaType, UserId}; +use bincode; +use crate::state::{AppState, PeerLocation}; + +#[derive(serde::Serialize, serde::Deserialize)] +struct Handshake { + user_id: UserId, + room_code: String, +} + +pub async fn run_udp_server(state: AppState) -> anyhow::Result<()> { + let socket = Arc::new(UdpSocket::bind("0.0.0.0:4000").await?); + info!("UDP Server listening on 0.0.0.0:4000"); + + let mut buf = [0u8; 65535]; + + loop { + match socket.recv_from(&mut buf).await { + Ok((len, addr)) => { + let data = &buf[..len]; + + info!("UDP Recv: {} bytes from {}", len, addr); // Log every packet for debug + + // Manually parse header (22 bytes) to match client's raw byte layout: + // Byte 0: version (u8) + // Byte 1: media_type (u8) + // Bytes 2-5: user_id (u32 LE) + // Bytes 6-9: sequence (u32 LE) + // Bytes 10-17: timestamp (u64 LE) + // Byte 18: fragment_index (u8) + // Byte 19: fragment_count (u8) + // Bytes 20-21: flags (u16 LE) + + if data.len() < 22 { + warn!("UDP packet too small: {} bytes from {}", data.len(), addr); + continue; + } + + let version = data[0]; + let media_type_byte = data[1]; + let user_id = u32::from_le_bytes([data[2], data[3], data[4], data[5]]); + let _sequence = u32::from_le_bytes([data[6], data[7], data[8], data[9]]); + let _timestamp = u64::from_le_bytes([data[10], data[11], data[12], data[13], data[14], data[15], data[16], data[17]]); + let _fragment_index = data[18]; + let _fragment_count = data[19]; + let _flags = u16::from_le_bytes([data[20], data[21]]); + + let media_type = match media_type_byte { + 0 => MediaType::Audio, + 1 => MediaType::Video, + 2 => MediaType::Screen, + 3 => MediaType::Command, + _ => MediaType::Unknown, + }; + + let payload = &data[22..]; + + info!("UDP: v{} type={:?} user={} len={}", version, media_type, user_id, data.len()); + + match media_type { + MediaType::Command => { + // Handshake + if let Ok(handshake) = bincode::deserialize::(payload) { + // Validate User in Room + if let Some(room) = state.rooms.get(&handshake.room_code) { + if room.peers.contains_key(&handshake.user_id) { + // Update Address + state.peers_by_addr.insert(addr, PeerLocation { + room_id: handshake.room_code.clone(), + user_id: handshake.user_id, + }); + + // Update Peer in Room + if let Some(mut peer) = room.peers.get_mut(&handshake.user_id) { + peer.addr = Some(addr); + info!( + "UDP Handshake: User {} at {}, Room {}", + handshake.user_id, addr, handshake.room_code + ); + } + } + } + } + } + _ => { + // Media Packet: Relay + if let Some(loc) = state.peers_by_addr.get(&addr) { + let room_id = &loc.room_id; + let sender_id: UserId = loc.user_id; + + // Forward to all valid peers in room + if let Some(room) = state.rooms.get(room_id) { + for peer in room.peers.iter() { + // Don't echo back to sender + if *peer.key() != sender_id { + if let Some(target_addr) = peer.value().addr { + // Send + let _ = socket.send_to(data, target_addr).await; + } + } + } + } + } + } + } + } + Err(e) => { + error!("UDP recv error: {}", e); + } + } + } +}