From bb3fc6c891507ae1901843b5f2b84b74444c9fd0 Mon Sep 17 00:00:00 2001 From: srtk Date: Sun, 8 Feb 2026 23:26:14 +0530 Subject: [PATCH] Fix VideoTile bug and improve screen sharing --- Cargo.toml | 2 +- README.md | 71 +++++++++++++++++++++++++++++++++++++++++ shared/.gitignore | 1 + shared/Cargo.toml | 8 +++++ shared/src/lib.rs | 80 +++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 2 +- src/udp.rs | 42 +++++++++++++++---------- 7 files changed, 187 insertions(+), 19 deletions(-) create mode 100644 README.md create mode 100644 shared/.gitignore create mode 100644 shared/Cargo.toml create mode 100644 shared/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 1087c46..7285a7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,4 @@ tracing = "0.1.41" tracing-subscriber = "0.3.19" bincode = "1.3.3" futures = "0.3.31" -shared = { path = "../shared" } +shared = { path = "./shared" } diff --git a/README.md b/README.md new file mode 100644 index 0000000..4561e13 --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# Meet Server + +The high-performance signaling and media relay server for the Meet application. Built with **Rust**, **Axum**, and **Tokio**. + +## Features +- **Signaling**: WebSocket-based room management and peer discovery. +- **Media Relay**: Custom UDP protocol for low-latency video and screen sharing. +- **Architecture**: Asynchronous, localized state management using `DashMap`. + +## Prerequisites +- **Rust**: Latest stable version. Install via [rustup.rs](https://rustup.rs). +- **Build Tools**: `build-essential` (Ubuntu) or `Development Tools` group (Fedora/RHEL). + +## Installation + +1. **Clone the Repository**: + ```bash + git clone + cd server + ``` + +2. **Verify Shared Crate**: + Ensure the `shared` directory exists in the root. This contains protocol definitions used by both client (logic port) and server. + ```bash + ls -F shared/ + ``` + +3. **Build**: + ```bash + cargo build --release + ``` + The binary will be located at `target/release/server`. + +## Running the Server + +### Ports +- **TCP/WebSocket**: 6000 (Bind: `0.0.0.0:6000`) +- **UDP (Media)**: 4000 (Bind: `0.0.0.0:4000`) + +### Local Development +```bash +cargo run +``` +Runs on `0.0.0.0:6000` (HTTP/WS) and `0.0.0.0:4000` (UDP). + +### Production Deployment +1. **Run the Binary**: + ```bash + ./target/release/server + ``` +2. **Firewall**: + Ensure **TCP 6000** and **UDP 4000** are open. + ```bash + # Fedora/CentOS + sudo firewall-cmd --add-port=6000/tcp --permanent + sudo firewall-cmd --add-port=4000/udp --permanent + sudo firewall-cmd --reload + + # Ubuntu/Debian (UFW) + sudo ufw allow 6000/tcp + sudo ufw allow 4000/udp + ``` +3. **Systemd (Optional)**: + Create a service file `/etc/systemd/system/meet-server.service` to keep it running. + +## Project Structure +- `src/main.rs`: Entry point. Sets up Axum router and spawns the UDP listener. +- `src/handlers.rs`: WebSocket handlers for joining rooms, signaling, and chat. +- `src/udp.rs`: The core UDP packet handling loop. Manages media relaying. +- `src/state.rs`: Shared application state (Rooms, Peers). +- `shared/`: Local crate containing shared data structures (`Packet` headers, `MediaType` enums). diff --git a/shared/.gitignore b/shared/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/shared/.gitignore @@ -0,0 +1 @@ +/target diff --git a/shared/Cargo.toml b/shared/Cargo.toml new file mode 100644 index 0000000..c83b299 --- /dev/null +++ b/shared/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "shared" +version = "0.1.0" +edition = "2024" + +[dependencies] +bincode = "1.3.3" +serde = { version = "1.0.228", features = ["derive"] } diff --git a/shared/src/lib.rs b/shared/src/lib.rs new file mode 100644 index 0000000..4758cbb --- /dev/null +++ b/shared/src/lib.rs @@ -0,0 +1,80 @@ +use serde::{Deserialize, Serialize}; + +pub type UserId = u32; // Unique identifier for a user within a room +pub type StreamId = u16; // Typically mapped to User + Media Source + +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum MediaType { + Audio = 0, + Video = 1, + Screen = 2, + Command = 3, // For handshake/keepalive + Unknown = 255, +} + +/// UDP Packet Header (fixed size binary struct) +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct PacketHeader { + pub version: u8, // Protocol version (v1) + pub media_type: MediaType, + pub user_id: UserId, // Identifies the source user (was stream_id) + pub sequence: u32, // User-defined sequence number (for reordering/loss) + pub timestamp: u32, // RTP-like timestamp + pub fragment_index: u16, // If packet is fragmented (0 if not) + pub fragment_count: u16, // Total fragments (1 if not) + pub flags: u8, // Bitmask: 0x01 = Keyframe, etc. +} + +pub const FLAG_KEYFRAME: u8 = 0x01; + +/// Signaling Messages (WebSocket - JSON) +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type", content = "data")] +pub enum ControlMsg { + /// Client -> Server: Request to join a room + Join { + room_code: String, + display_name: String, + }, + /// Server -> Client: Join success + Joined { + self_id: UserId, + room_code: String, + peers: Vec, + }, + /// Server -> Client: New peer joined + PeerJoined { + user_id: UserId, + display_name: String, + }, + /// Server -> Client: Peer left + PeerLeft { + user_id: UserId, + }, + /// Client -> Server: Update stream status (e.g. camera on/off) + UpdateStream { + user_id: UserId, + stream_id: StreamId, + active: bool, + media_type: MediaType, + }, + /// Client <-> Server: Chat message + ChatMessage { + user_id: UserId, + display_name: String, + message: String, + timestamp: u64, + }, + /// General Error + Error { + message: String, + }, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct PeerInfo { + pub user_id: UserId, + pub display_name: String, + // Could include active streams etc. +} diff --git a/src/main.rs b/src/main.rs index 84fd847..50e5df6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> { .route("/ws", get(ws_handler)) .with_state(state); - let addr = SocketAddr::from(([0, 0, 0, 0], 5000)); + let addr = SocketAddr::from(([0, 0, 0, 0], 6000)); info!("HTTP/WS Server listening on {}", addr); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/src/udp.rs b/src/udp.rs index 0a83022..fe793db 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -21,6 +21,7 @@ pub async fn run_udp_server(state: AppState) -> anyhow::Result<()> { match socket.recv_from(&mut buf).await { Ok((len, addr)) => { let data = &buf[..len]; + info!("UDP RECV from {}: {} bytes", addr, len); // Manually parse header (22 bytes) to match client's raw byte layout: // Byte 0: version (u8) @@ -60,26 +61,31 @@ pub async fn run_udp_server(state: AppState) -> anyhow::Result<()> { match media_type { MediaType::Command => { // Handshake - if let Ok(handshake) = bincode::deserialize::(payload) { - // Validate User in Room - if let Some(room) = state.rooms.get(&handshake.room_code) { - if room.peers.contains_key(&handshake.user_id) { - // Update Address - state.peers_by_addr.insert(addr, PeerLocation { - room_id: handshake.room_code.clone(), - user_id: handshake.user_id, - }); - - // Update Peer in Room - if let Some(mut peer) = room.peers.get_mut(&handshake.user_id) { - peer.addr = Some(addr); - info!( - "UDP Handshake: User {} at {}, Room {}", - handshake.user_id, addr, handshake.room_code - ); + match bincode::deserialize::(payload) { + Ok(handshake) => { + // Validate User in Room + if let Some(room) = state.rooms.get(&handshake.room_code) { + if room.peers.contains_key(&handshake.user_id) { + // Update Address + state.peers_by_addr.insert(addr, PeerLocation { + room_id: handshake.room_code.clone(), + user_id: handshake.user_id, + }); + + // Update Peer in Room + if let Some(mut peer) = room.peers.get_mut(&handshake.user_id) { + peer.addr = Some(addr); + info!( + "UDP Handshake: User {} at {}, Room {}", + handshake.user_id, addr, handshake.room_code + ); + } } } } + Err(e) => { + warn!("Failed to deserialize Handshake from {}: {}", addr, e); + } } } _ => { @@ -100,6 +106,8 @@ pub async fn run_udp_server(state: AppState) -> anyhow::Result<()> { } } } + } else { + warn!("Dropping Relay Packet from unknown sender: {}", addr); } } }