use axum::{ extract::ws::{Message, WebSocket}, }; use futures::{SinkExt, StreamExt}; use shared::{ControlMsg, PeerInfo, UserId}; use std::net::SocketAddr; use tracing::{info, error}; use crate::state::{AppState, Peer, Room}; pub async fn handle_socket(mut socket: WebSocket, _addr: SocketAddr, state: AppState) { let (mut sender, mut receiver) = socket.split(); let mut user_id: Option = None; let mut room_id: Option = None; // Authentication / Join Phase while let Some(msg) = receiver.next().await { match msg { Ok(Message::Text(text)) => { if let Ok(ControlMsg::Join { room_code, display_name }) = serde_json::from_str(&text) { let uid = rand::random::(); info!("User {} ({}) joining room {}", uid, display_name, room_code); let room = state.rooms.entry(room_code.clone()) .or_insert_with(|| Room::new(room_code.clone())); // Gather existing peers let peers_list: Vec = room.peers.iter().map(|p| { let k = *p.key(); let v = p.value(); PeerInfo { user_id: k, display_name: v.display_name.clone(), } }).collect(); // Notify others let _ = room.tx.send(ControlMsg::PeerJoined { user_id: uid, display_name: display_name.clone(), }); // Add self to room room.peers.insert(uid, Peer { id: uid, display_name: display_name.clone(), addr: None, }); user_id = Some(uid); room_id = Some(room_code.clone()); // Send Joined response let resp = ControlMsg::Joined { self_id: uid, room_code: room_code.clone(), peers: peers_list, }; if let Err(e) = sender.send(Message::Text(serde_json::to_string(&resp).unwrap().into())).await { error!("Failed to send Joined response: {}", e); return; } break; } } Ok(Message::Close(_)) => return, _ => {} } } if user_id.is_none() { return; } let uid = user_id.unwrap(); let rid = room_id.unwrap(); // Subscribe to room broadcasts let mut rx = { let room = state.rooms.get(&rid).unwrap(); room.tx.subscribe() }; // Main Loop loop { tokio::select! { msg = receiver.next() => { match msg { Some(Ok(Message::Text(text))) => { if let Ok(control) = serde_json::from_str::(&text) { match control { ControlMsg::UpdateStream { .. } => { // Broadcast to room if let Some(room) = state.rooms.get(&rid) { let _ = room.tx.send(control); } } _ => {} } } } Some(Ok(Message::Close(_))) => break, Some(Err(_)) => break, None => break, _ => {} } } Ok(msg) = rx.recv() => { // Forward broadcast to client let _ = sender.send(Message::Text(serde_json::to_string(&msg).unwrap().into())).await; } } } // Cleanup if let Some(room) = state.rooms.get(&rid) { if let Some((_, peer)) = room.peers.remove(&uid) { // Clean up address mapping if present if let Some(addr) = peer.addr { state.peers_by_addr.remove(&addr); } } let _ = room.tx.send(ControlMsg::PeerLeft { user_id: uid }); } info!("User {} left room {}", uid, rid); }