import { EventEmitter } from 'events'; import * as dgram from 'dgram'; import WebSocket from 'ws'; import { BrowserWindow } from 'electron'; // Constants const SERVER_UDP_PORT = 4000; // Packet Header Structure (24 bytes) const HEADER_SIZE = 24; export enum MediaType { Audio = 0, Video = 1, Screen = 2, } // Token Bucket Pacer Constants const PACER_RATE_BYTES_PER_MS = 1500; // ~12 Mbps limit (Targeting 8-10 Mbps for 1080p60) const PACER_BUCKET_SIZE_BYTES = 15000; // Allow 10 packets burst (Instant Keyframe start) const MAX_PAYLOAD = 1200; // Reduced from 1400 to be safe with MTU export class NetworkManager extends EventEmitter { private ws: WebSocket | null = null; private udp: dgram.Socket | null = null; private userId: number = 0; private roomCode: string = ''; private videoSeq: number = 0; private audioSeq: number = 0; private screenSeq = 0; private mainWindow: BrowserWindow; private serverUdpHost: string = '127.0.0.1'; // Pacing private udpQueue: Buffer[] = []; private pacerTokens: number = PACER_BUCKET_SIZE_BYTES; private lastPacerUpdate: number = Date.now(); private pacerInterval: NodeJS.Timeout | null = null; constructor(mainWindow: BrowserWindow) { super(); this.mainWindow = mainWindow; this.startPacer(); } private startPacer() { this.pacerInterval = setInterval(() => { if (!this.udp) return; const now = Date.now(); const elapsed = now - this.lastPacerUpdate; this.lastPacerUpdate = now; // Refill tokens this.pacerTokens += elapsed * PACER_RATE_BYTES_PER_MS; if (this.pacerTokens > PACER_BUCKET_SIZE_BYTES) { this.pacerTokens = PACER_BUCKET_SIZE_BYTES; } // Drain queue while (this.udpQueue.length > 0) { const packet = this.udpQueue[0]; if (this.pacerTokens >= packet.length) { this.pacerTokens -= packet.length; this.udpQueue.shift(); this.udp.send(packet, SERVER_UDP_PORT, this.serverUdpHost, (err) => { if (err) console.error('UDP Send Error', err); }); } else { break; // Not enough tokens, wait for next tick } } }, 2); // Check every 2ms } async connect(serverUrl: string, roomCode: string, displayName: string): Promise { this.roomCode = roomCode; // Store for UDP handshake return new Promise((resolve, reject) => { // Determine Host and Protocol let host = serverUrl.trim().replace(/^wss?:\/\//, '').replace(/\/$/, ''); this.serverUdpHost = host.split(':')[0]; // Hostname only for UDP (strip port if present) // Auto-detect protocol: localhost/IP uses ws://, domains use wss:// (HTTPS) const isLocal = host.includes('localhost') || host.includes('127.0.0.1') || host.startsWith('192.168.') || host.startsWith('10.'); const protocol = isLocal ? 'ws' : 'wss'; const wsUrl = `${protocol}://${host}/ws`; console.log(`[Network] Connecting to WS: ${wsUrl}, UDP Host: ${this.serverUdpHost}`); this.ws = new WebSocket(`${wsUrl}?room=${roomCode}&name=${displayName}`); this.ws.on('open', () => { console.log('WS Connected'); const joinMsg = { type: 'Join', data: { room_code: roomCode, display_name: displayName } }; this.ws?.send(JSON.stringify(joinMsg)); console.log('Sent Join:', joinMsg); }); this.ws.on('message', (data: WebSocket.Data) => { try { const msg = JSON.parse(data.toString()); this.handleWsMessage(msg, resolve, reject); } catch (e) { console.error('Failed to parse WS msg', e); } }); this.ws.on('error', (err) => { console.error('WS Error', err); reject(err); }); this.ws.on('close', () => { console.log('WS Closed'); this.emit('disconnected'); if (this.udp) this.udp.close(); }); }); } handleWsMessage(msg: any, resolve: any, reject: any) { console.log('Received WS Message:', msg.type); switch (msg.type) { case 'Joined': console.log('Joined Room:', msg.data); this.userId = msg.data.self_id; this.setupUdp(); resolve(msg.data); break; case 'PeerJoined': this.safeSend('peer-joined', msg.data); break; case 'PeerLeft': this.safeSend('peer-left', msg.data); break; case 'ChatMessage': this.safeSend('chat-message', msg.data); break; case 'UpdateStream': this.safeSend('peer-stream-update', msg.data); break; case 'Error': console.error('WS Error Msg:', msg.data); reject(msg.data); break; } } sendChat(message: string, displayName: string) { if (!this.ws) return; const chatMsg = { type: 'ChatMessage', data: { user_id: this.userId, display_name: displayName, message, timestamp: Date.now() } }; this.ws.send(JSON.stringify(chatMsg)); } updateStream(active: boolean, mediaType: MediaType) { if (!this.ws) return; const mediaTypeStr = mediaType === MediaType.Audio ? 'Audio' : mediaType === MediaType.Video ? 'Video' : 'Screen'; const msg = { type: 'UpdateStream', data: { user_id: this.userId, stream_id: 0, active, media_type: mediaTypeStr } }; this.ws.send(JSON.stringify(msg)); } private heartbeatInterval: NodeJS.Timeout | null = null; setupUdp() { this.udp = dgram.createSocket('udp4'); this.udp.on('listening', () => { const addr = this.udp?.address(); console.log(`UDP Listening on ${addr?.port}`); this.startHeartbeat(); }); this.udp.on('message', (msg, rinfo) => { // console.log(`[UDP] Msg from ${rinfo.address}:${rinfo.port} - ${msg.length} bytes`); this.handleUdpMessage(msg); }); this.udp.bind(0); // Bind random port } handleUdpMessage(msg: Buffer) { if (msg.length < HEADER_SIZE) return; const version = msg.readUInt8(0); const mediaType = msg.readUInt8(1); const userId = msg.readUInt32LE(2); const seq = msg.readUInt32LE(6); const timestamp = Number(msg.readBigUInt64LE(10)); const fragIdx = msg.readUInt16LE(18); const fragCnt = msg.readUInt16LE(20); const flags = msg.readUInt16LE(22); const isKeyFrame = (flags & 1) !== 0; const payload = msg.subarray(HEADER_SIZE); if (mediaType === MediaType.Audio) { // Audio can be fragmented now (PCM) this.safeSend('video-chunk', { // Use 'video-chunk' handler in renderer for reassembly? // Wait, App.tsx has separate 'audio-chunk' which doesn't reassemble. // We need to reassemble here or change App.tsx. // Reassembling in main process is easier or reusing video logic. // Let's use 'audio-chunk' but we need to pass frag info? // No, App.tsx 'audio-chunk' handler just decodes immediately. // It expects a full frame. // We MUST reassemble here or update App.tsx. // Updating App.tsx to use the reassembler for Audio is cleaner. // But 'video-chunk' in App.tsx calls 'handleIncomingVideoFragment' which uses 'MediaEngine.decodeVideoChunk'. // Option: Treat Audio as "Video" for transport, but with streamType='audio'? // MediaType.Audio is distinct. // Let's implement reassembly here in NetworkManager? // Or update App.tsx to use 'handleIncomingVideoFragment' for audio too? // 'handleIncomingVideoFragment' does `decodeVideoChunk`. // Let's change App.tsx to have `handleIncomingAudioFragment`? // Or just reassemble here. UDP reassembly in Node.js is fine. // ACtually, App.tsx's `handleIncomingVideoFragment` is nice. // Let's emit 'audio-fragment' and add a handler in App.tsx. user_id: userId, data: payload, seq: this.audioSeq, // Wait, seq is in packet ts: timestamp, fidx: fragIdx, fcnt: fragCnt, isKeyFrame, streamType: 'audio' // We can't use 'video-chunk' channel because it calls decodeVideoChunk. }); // Actually, let's just send it to 'audio-fragment' channel this.safeSend('audio-fragment', { user_id: userId, data: payload, seq: seq, // We need valid seq from packet ts: timestamp, fidx: fragIdx, fcnt: fragCnt, isKeyFrame }); } else if (mediaType === MediaType.Video || mediaType === MediaType.Screen) { // Differentiate based on MediaType const streamType = mediaType === MediaType.Screen ? 'screen' : 'video'; if (mediaType === MediaType.Screen && fragIdx === 0) { console.log(`[Network] RX Screen Chunk User=${userId} Seq=${seq}`); } this.safeSend('video-chunk', { user_id: userId, data: payload, seq, ts: timestamp, fidx: fragIdx, fcnt: fragCnt, isKeyFrame, streamType // Pass this to renderer }); } } private safeSend(channel: string, data: any) { if (this.mainWindow && !this.mainWindow.isDestroyed() && this.mainWindow.webContents) { try { this.mainWindow.webContents.send(channel, data); } catch (e) { console.error(`Failed to send ${channel} to renderer:`, e); } } } // --- New Encode Methods --- sendEncodedVideoChunk(chunk: any, isKeyFrame: boolean, timestamp: number, streamType: 'video' | 'screen' = 'video') { if (!this.udp) return; const MAX_PAYLOAD = 1400; const totalSize = chunk.length; // Use generic videoSeq for both? Or separate? // Best to separate to avoid gap detection issues if one stream is idle. // But for now, let's share for simplicity or use screenSeq if screen. // Actually, let's use separate seq if possible, but I only have videoSeq. // Let's use videoSeq for both for now, assuming the receiver tracks them separately or doesn't care about gaps across types. // Better: Use a map or separate counters. const seq = streamType === 'screen' ? this.screenSeq++ : this.videoSeq++; const fragmentCount = Math.ceil(totalSize / MAX_PAYLOAD); for (let i = 0; i < fragmentCount; i++) { const start = i * MAX_PAYLOAD; const end = Math.min(start + MAX_PAYLOAD, totalSize); const slice = chunk.slice(start, end); // Header (22 bytes) const header = Buffer.alloc(HEADER_SIZE); header.writeUInt8(1, 0); // Version const mType = streamType === 'screen' ? MediaType.Screen : MediaType.Video; header.writeUInt8(mType, 1); header.writeUInt32LE(this.userId, 2); header.writeUInt32LE(seq, 6); header.writeBigUInt64LE(BigInt(timestamp), 10); header.writeUInt16LE(i, 18); // Frag Idx (u16) header.writeUInt16LE(fragmentCount, 20); // Frag Cnt (u16) let flags = 0; if (isKeyFrame) flags |= 1; header.writeUInt16LE(flags, 22); const packet = Buffer.concat([header, slice]); // Enqueue for pacing this.udpQueue.push(packet); } } sendEncodedAudioChunk(chunk: Uint8Array, timestamp: number) { if (!this.udp) { console.warn('[Network] UDP Socket not ready for Audio'); return; } const totalSize = chunk.length; const MAX_PAYLOAD = 1400; // Safe MTU // PCM packets (approx 2KB) need fragmentation. // We use the same logic as video but with Audio MediaType. const fragmentCount = Math.ceil(totalSize / MAX_PAYLOAD); // Log randomly to avoid spam but confirm activity if (Math.random() < 0.05) console.log(`[Network] Sending Audio Chunk size=${totalSize} frags=${fragmentCount}`); for (let i = 0; i < fragmentCount; i++) { const start = i * MAX_PAYLOAD; const end = Math.min(start + MAX_PAYLOAD, totalSize); const slice = chunk.slice(start, end); const header = Buffer.alloc(HEADER_SIZE); header.writeUInt8(1, 0); // Version header.writeUInt8(MediaType.Audio, 1); header.writeUInt32LE(this.userId, 2); header.writeUInt32LE(this.audioSeq, 6); // Same seq for all fragments header.writeBigUInt64LE(BigInt(Math.floor(timestamp)), 10); header.writeUInt16LE(i, 18); // Frag idx header.writeUInt16LE(fragmentCount, 20); // Frag cnt header.writeUInt16LE(1, 22); // Flags (1=Keyframe, audio is always key) const packet = Buffer.concat([header, Buffer.from(slice)]); this.udpQueue.push(packet); } this.audioSeq++; } startHeartbeat() { if (this.heartbeatInterval) clearInterval(this.heartbeatInterval); // Send immediately this.sendHandshake(); // Send 3 bursts to ensure traversal setTimeout(() => this.sendHandshake(), 500); setTimeout(() => this.sendHandshake(), 1000); // Keep-alive every 3 seconds this.heartbeatInterval = setInterval(() => { this.sendHandshake(); }, 3000); } sendHandshake() { if (!this.udp || !this.userId || !this.roomCode) { // console.error('[UDP] Cannot send handshake: missing udp, userId, or roomCode'); return; } const roomCodeBytes = Buffer.from(this.roomCode, 'utf-8'); const payloadLen = 4 + 8 + roomCodeBytes.length; const payload = Buffer.alloc(payloadLen); payload.writeUInt32LE(this.userId, 0); // user_id payload.writeBigUInt64LE(BigInt(roomCodeBytes.length), 4); // string length roomCodeBytes.copy(payload, 12); // room_code const header = Buffer.alloc(HEADER_SIZE); header.writeUInt8(1, 0); // Version header.writeUInt8(3, 1); // MediaType.Command = 3 header.writeUInt32LE(this.userId, 2); header.writeUInt32LE(0, 6); // Sequence header.writeBigUInt64LE(BigInt(Date.now()), 10); header.writeUInt16LE(0, 18); // Frag idx header.writeUInt16LE(1, 20); // Frag cnt header.writeUInt16LE(0, 22); // Flags const packet = Buffer.concat([header, payload]); // console.log(`[UDP] Sending Handshake: userId=${this.userId}, room=${this.roomCode}, ${packet.length} bytes to ${this.serverUdpHost}:${SERVER_UDP_PORT}`); this.udp.send(packet, SERVER_UDP_PORT, this.serverUdpHost, (err) => { if (err) console.error('UDP Handshake Send Error', err); }); } disconnect() { if (this.pacerInterval) { clearInterval(this.pacerInterval); this.pacerInterval = null; } if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } if (this.ws) this.ws.close(); if (this.udp) this.udp.close(); this.ws = null; this.udp = null; } }