diff --git a/scripts/test-network.js b/scripts/test-network.js deleted file mode 100644 index b4b99d2..0000000 --- a/scripts/test-network.js +++ /dev/null @@ -1,75 +0,0 @@ -const WebSocket = require('ws'); - -const SERVER = process.argv[2] || 'wss://meet.srtk.in/ws'; -const ROOM = 'diag-test'; -const NAME = 'diag-bot-' + Math.floor(Math.random() * 1000); - -console.log(`--- Network Diagnostic Tool ---`); -console.log(`Connecting to: ${SERVER}`); -console.log(`Room: ${ROOM}, Identity: ${NAME}`); - -const url = `${SERVER}?room=${ROOM}&name=${NAME}`; -const ws = new WebSocket(url); - -let startTime = Date.now(); -let textPings = 0; -let binaryPings = 0; -let textAcks = 0; -let binaryAcks = 0; - -ws.on('open', () => { - console.log('✅ WebSocket Connected'); - - // 1. Join - ws.send(JSON.stringify({ - type: 'Join', - data: { room_code: ROOM, display_name: NAME } - })); - - // 2. Start Test Cycle - setInterval(() => { - // Send Text Ping - ws.send(JSON.stringify({ type: 'Heartbeat' })); - textPings++; - - // Send Binary Ping (1KB) - const dummy = Buffer.alloc(1024); - dummy[0] = 0xAA; // Diagnostic marker - ws.send(dummy); - binaryPings++; - - console.log(`Stats: TX[Text:${textPings}, Bin:${binaryPings}] RX[Text:${textAcks}, Bin:${binaryAcks}]`); - }, 2000); -}); - -ws.on('message', (data, isBinary) => { - if (isBinary) { - binaryAcks++; - } else { - try { - const msg = JSON.parse(data.toString()); - if (msg.type === 'Heartbeat') { - textAcks++; - } else if (msg.type === 'Joined') { - console.log('✅ Joined successfully! Self ID:', msg.data.self_id); - } - } catch (e) { } - } -}); - -ws.on('error', (err) => { - console.error('❌ WebSocket Error:', err.message); -}); - -ws.on('close', (code, reason) => { - console.log(`⚠️ Connection Closed (Code: ${code}, Reason: ${reason})`); - process.exit(0); -}); - -setTimeout(() => { - console.log('--- Test Finished ---'); - console.log(`Summary:`); - console.log(`Text Pings: ${textPings}, Acks: ${textAcks} (${Math.round(textAcks / textPings * 100)}%)`); - console.log(`Binary Pings: ${binaryPings}, Acks: ${binaryAcks} (${Math.round(binaryAcks / binaryPings * 100)}%)`); - process.exit(0); -}, 20000); diff --git a/src/main/network.ts b/src/main/network.ts index 41d170c..af0886f 100644 --- a/src/main/network.ts +++ b/src/main/network.ts @@ -1,9 +1,10 @@ import { EventEmitter } from 'events'; - +import * as dgram from 'dgram'; import WebSocket from 'ws'; import { BrowserWindow } from 'electron'; // Constants +const SERVER_UDP_PORT = 4000; // Packet Header Structure (24 bytes) const HEADER_SIZE = 24; @@ -14,27 +15,70 @@ export enum MediaType { Screen = 2, } +// Token Bucket Pacer Constants +const PACER_RATE_BYTES_PER_MS = 1500; // ~12 Mbps limit (Targeting 8-10 Mbps for 1080p60) +const PACER_BUCKET_SIZE_BYTES = 15000; // Allow 10 packets burst (Instant Keyframe start) +const MAX_PAYLOAD = 1200; // Reduced from 1400 to be safe with MTU + export class NetworkManager extends EventEmitter { private ws: WebSocket | null = null; + private udp: dgram.Socket | null = null; private userId: number = 0; + private roomCode: string = ''; private videoSeq: number = 0; private audioSeq: number = 0; private screenSeq = 0; private mainWindow: BrowserWindow; - private pingInterval: NodeJS.Timeout | null = null; - private lastBinaryLog: number = 0; - private binaryCount: number = 0; - private dropCount: number = 0; + private serverUdpHost: string = '127.0.0.1'; + + // Pacing + private udpQueue: Buffer[] = []; + private pacerTokens: number = PACER_BUCKET_SIZE_BYTES; + private lastPacerUpdate: number = Date.now(); + private pacerInterval: NodeJS.Timeout | null = null; constructor(mainWindow: BrowserWindow) { super(); this.mainWindow = mainWindow; + this.startPacer(); + } + + private startPacer() { + this.pacerInterval = setInterval(() => { + if (!this.udp) return; + + const now = Date.now(); + const elapsed = now - this.lastPacerUpdate; + this.lastPacerUpdate = now; + + // Refill tokens + this.pacerTokens += elapsed * PACER_RATE_BYTES_PER_MS; + if (this.pacerTokens > PACER_BUCKET_SIZE_BYTES) { + this.pacerTokens = PACER_BUCKET_SIZE_BYTES; + } + + // Drain queue + while (this.udpQueue.length > 0) { + const packet = this.udpQueue[0]; + if (this.pacerTokens >= packet.length) { + this.pacerTokens -= packet.length; + this.udpQueue.shift(); + this.udp.send(packet, SERVER_UDP_PORT, this.serverUdpHost, (err) => { + if (err) console.error('UDP Send Error', err); + }); + } else { + break; // Not enough tokens, wait for next tick + } + } + }, 2); // Check every 2ms } async connect(serverUrl: string, roomCode: string, displayName: string): Promise { + this.roomCode = roomCode; // Store for UDP handshake return new Promise((resolve, reject) => { // Determine Host and Protocol let host = serverUrl.trim().replace(/^wss?:\/\//, '').replace(/\/$/, ''); + this.serverUdpHost = host.split(':')[0]; // Hostname only for UDP (strip port if present) // Auto-detect protocol: localhost/IP uses ws://, domains use wss:// (HTTPS) const isLocal = host.includes('localhost') || @@ -44,12 +88,12 @@ export class NetworkManager extends EventEmitter { const protocol = isLocal ? 'ws' : 'wss'; const wsUrl = `${protocol}://${host}/ws`; - console.log(`[Network] Connecting to WS: ${wsUrl}`); + console.log(`[Network] Connecting to WS: ${wsUrl}, UDP Host: ${this.serverUdpHost}`); this.ws = new WebSocket(`${wsUrl}?room=${roomCode}&name=${displayName}`); this.ws.on('open', () => { - console.log('[Network] WebSocket Connected'); + console.log('WS Connected'); const joinMsg = { type: 'Join', data: { @@ -59,61 +103,37 @@ export class NetworkManager extends EventEmitter { }; this.ws?.send(JSON.stringify(joinMsg)); console.log('Sent Join:', joinMsg); - this.startHeartbeat(); }); - this.ws.on('message', (data: WebSocket.Data, isBinary: boolean) => { - if (isBinary) { - this.handleBinaryMessage(data as Buffer); - } else { - const text = data.toString(); - try { - const msg = JSON.parse(text); - if (msg.type !== 'Heartbeat') { - console.log('[Network] Received Text Message:', text.substring(0, 100)); - } - this.handleWsMessage(msg, resolve, reject); - } catch (e) { - console.error('Failed to parse WS msg', e); - } + this.ws.on('message', (data: WebSocket.Data) => { + try { + const msg = JSON.parse(data.toString()); + this.handleWsMessage(msg, resolve, reject); + } catch (e) { + console.error('Failed to parse WS msg', e); } }); this.ws.on('error', (err) => { - console.error('[Network] WebSocket Error:', err); + console.error('WS Error', err); reject(err); }); - this.ws.on('close', (code, reason) => { - console.warn(`[Network] WebSocket Closed: code=${code}, reason=${reason}`); - this.stopHeartbeat(); + this.ws.on('close', () => { + console.log('WS Closed'); this.emit('disconnected'); + if (this.udp) this.udp.close(); }); }); } - private startHeartbeat() { - this.stopHeartbeat(); - this.pingInterval = setInterval(() => { - if (this.ws && this.ws.readyState === WebSocket.OPEN) { - this.ws.send(JSON.stringify({ type: 'Heartbeat' })); - } - }, 10000); - } - - private stopHeartbeat() { - if (this.pingInterval) { - clearInterval(this.pingInterval); - this.pingInterval = null; - } - } - handleWsMessage(msg: any, resolve: any, reject: any) { console.log('Received WS Message:', msg.type); switch (msg.type) { case 'Joined': console.log('Joined Room:', msg.data); this.userId = msg.data.self_id; + this.setupUdp(); resolve(msg.data); break; case 'PeerJoined': @@ -126,7 +146,11 @@ export class NetworkManager extends EventEmitter { this.safeSend('chat-message', msg.data); break; case 'UpdateStream': - this.safeSend('peer-stream-update', msg.data); + // Ignore stream updates for self (we manage local state directly) + if (msg.data.user_id !== this.userId) { + console.log(`[Network] Peer Stream Update: User=${msg.data.user_id} Type=${msg.data.media_type} Active=${msg.data.active}`); + this.safeSend('peer-stream-update', msg.data); + } break; case 'Error': console.error('WS Error Msg:', msg.data); @@ -166,18 +190,29 @@ export class NetworkManager extends EventEmitter { this.ws.send(JSON.stringify(msg)); } - handleBinaryMessage(msg: Buffer) { + private heartbeatInterval: NodeJS.Timeout | null = null; + + setupUdp() { + this.udp = dgram.createSocket('udp4'); + + this.udp.on('listening', () => { + const addr = this.udp?.address(); + console.log(`UDP Listening on ${addr?.port}`); + this.startHeartbeat(); + }); + + this.udp.on('message', (msg, rinfo) => { + // console.log(`[UDP] Msg from ${rinfo.address}:${rinfo.port} - ${msg.length} bytes`); + this.handleUdpMessage(msg); + }); + + this.udp.bind(0); // Bind random port + } + + handleUdpMessage(msg: Buffer) { if (msg.length < HEADER_SIZE) return; - this.binaryCount++; - const now = Date.now(); - if (now - this.lastBinaryLog > 5000) { - console.log(`[Network] Media fragments received in last 5s: ${this.binaryCount}`); - this.binaryCount = 0; - this.lastBinaryLog = now; - } - - // Parse Header + const version = msg.readUInt8(0); const mediaType = msg.readUInt8(1); const userId = msg.readUInt32LE(2); const seq = msg.readUInt32LE(6); @@ -190,9 +225,15 @@ export class NetworkManager extends EventEmitter { const payload = msg.subarray(HEADER_SIZE); if (mediaType === MediaType.Audio) { + // Original simple approach - just forward to renderer (PCM) this.safeSend('audio-frame', { user_id: userId, data: payload }); } else if (mediaType === MediaType.Video || mediaType === MediaType.Screen) { + // Differentiate based on MediaType const streamType = mediaType === MediaType.Screen ? 'screen' : 'video'; + if (mediaType === MediaType.Screen && fragIdx === 0) { + console.log(`[Network] RX Screen Chunk User=${userId} Seq=${seq}`); + } + this.safeSend('video-chunk', { user_id: userId, data: payload, @@ -201,7 +242,7 @@ export class NetworkManager extends EventEmitter { fidx: fragIdx, fcnt: fragCnt, isKeyFrame, - streamType + streamType // Pass this to renderer }); } } @@ -209,6 +250,9 @@ export class NetworkManager extends EventEmitter { private safeSend(channel: string, data: any) { if (this.mainWindow && !this.mainWindow.isDestroyed() && this.mainWindow.webContents) { try { + if (channel === 'audio-fragment') { + console.log(`[Network] safeSend audio-fragment to renderer, data size=${data.data?.length}`); + } this.mainWindow.webContents.send(channel, data); } catch (e) { console.error(`Failed to send ${channel} to renderer:`, e); @@ -221,22 +265,19 @@ export class NetworkManager extends EventEmitter { // --- New Encode Methods --- sendEncodedVideoChunk(chunk: any, isKeyFrame: boolean, timestamp: number, streamType: 'video' | 'screen' = 'video') { - if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + if (!this.udp) return; - // Backpressure check: If we have > 2.5MB buffered, drop this frame. - // For video, it's better to drop than to lag. - if (this.ws.bufferedAmount > 2.5 * 1024 * 1024) { - this.dropCount++; - if (this.dropCount % 60 === 0) { - console.warn(`[Network] Backpressure! Dropped ${this.dropCount} video frames. Buffered: ${this.ws.bufferedAmount} bytes`); - } - return; - } - - const MAX_PAYLOAD = 16384; + const MAX_PAYLOAD = 1400; const totalSize = chunk.length; + // Use generic videoSeq for both? Or separate? + // Best to separate to avoid gap detection issues if one stream is idle. + // But for now, let's share for simplicity or use screenSeq if screen. + // Actually, let's use separate seq if possible, but I only have videoSeq. + // Let's use videoSeq for both for now, assuming the receiver tracks them separately or doesn't care about gaps across types. + // Better: Use a map or separate counters. const seq = streamType === 'screen' ? this.screenSeq++ : this.videoSeq++; + const fragmentCount = Math.ceil(totalSize / MAX_PAYLOAD); for (let i = 0; i < fragmentCount; i++) { @@ -244,7 +285,7 @@ export class NetworkManager extends EventEmitter { const end = Math.min(start + MAX_PAYLOAD, totalSize); const slice = chunk.slice(start, end); - // Header (24 bytes) + // Header (22 bytes) const header = Buffer.alloc(HEADER_SIZE); header.writeUInt8(1, 0); // Version const mType = streamType === 'screen' ? MediaType.Screen : MediaType.Video; @@ -261,15 +302,14 @@ export class NetworkManager extends EventEmitter { const packet = Buffer.concat([header, slice]); - if (this.ws && this.ws.readyState === WebSocket.OPEN) { - this.ws.send(packet); - } + // Enqueue for pacing + this.udpQueue.push(packet); } } - // Simple audio frame sending (raw PCM) + // Simple audio frame sending (raw PCM) - matches original working implementation sendAudioFrame(frame: Uint8Array) { - if (!this.ws) return; + if (!this.udp) return; const header = Buffer.alloc(HEADER_SIZE); header.writeUInt8(1, 0); // Version @@ -283,20 +323,27 @@ export class NetworkManager extends EventEmitter { const packet = Buffer.concat([header, Buffer.from(frame)]); - if (this.ws.readyState === WebSocket.OPEN) { - this.ws.send(packet); - } + // Send directly via pacer queue + this.udpQueue.push(packet); } sendEncodedAudioChunk(chunk: Uint8Array, timestamp: number) { - if (!this.ws) return; + if (!this.udp) { + console.warn('[Network] UDP Socket not ready for Audio'); + return; + } const totalSize = chunk.length; - const MAX_PAYLOAD = 1400; + const MAX_PAYLOAD = 1400; // Safe MTU // PCM packets (approx 2KB) need fragmentation. + // We use the same logic as video but with Audio MediaType. + const fragmentCount = Math.ceil(totalSize / MAX_PAYLOAD); + // Log randomly to avoid spam but confirm activity + if (Math.random() < 0.05) console.log(`[Network] Sending Audio Chunk size=${totalSize} frags=${fragmentCount}`); + for (let i = 0; i < fragmentCount; i++) { const start = i * MAX_PAYLOAD; const end = Math.min(start + MAX_PAYLOAD, totalSize); @@ -310,22 +357,74 @@ export class NetworkManager extends EventEmitter { header.writeBigUInt64LE(BigInt(Math.floor(timestamp)), 10); header.writeUInt16LE(i, 18); // Frag idx header.writeUInt16LE(fragmentCount, 20); // Frag cnt - header.writeUInt16LE(1, 22); // Flags + header.writeUInt16LE(1, 22); // Flags (1=Keyframe, audio is always key) const packet = Buffer.concat([header, Buffer.from(slice)]); - - if (this.ws.readyState === WebSocket.OPEN) { - this.ws.send(packet); - } + this.udpQueue.push(packet); } this.audioSeq++; } - disconnect() { - if (this.ws) { - this.ws.close(); - this.ws = null; + startHeartbeat() { + if (this.heartbeatInterval) clearInterval(this.heartbeatInterval); + + // Send immediately + this.sendHandshake(); + + // Send 3 bursts to ensure traversal + setTimeout(() => this.sendHandshake(), 500); + setTimeout(() => this.sendHandshake(), 1000); + + // Keep-alive every 3 seconds + this.heartbeatInterval = setInterval(() => { + this.sendHandshake(); + }, 3000); + } + + sendHandshake() { + if (!this.udp || !this.userId || !this.roomCode) { + // console.error('[UDP] Cannot send handshake: missing udp, userId, or roomCode'); + return; } + + const roomCodeBytes = Buffer.from(this.roomCode, 'utf-8'); + const payloadLen = 4 + 8 + roomCodeBytes.length; + const payload = Buffer.alloc(payloadLen); + payload.writeUInt32LE(this.userId, 0); // user_id + payload.writeBigUInt64LE(BigInt(roomCodeBytes.length), 4); // string length + roomCodeBytes.copy(payload, 12); // room_code + + const header = Buffer.alloc(HEADER_SIZE); + header.writeUInt8(1, 0); // Version + header.writeUInt8(3, 1); // MediaType.Command = 3 + header.writeUInt32LE(this.userId, 2); + header.writeUInt32LE(0, 6); // Sequence + header.writeBigUInt64LE(BigInt(Date.now()), 10); + header.writeUInt16LE(0, 18); // Frag idx + header.writeUInt16LE(1, 20); // Frag cnt + header.writeUInt16LE(0, 22); // Flags + + const packet = Buffer.concat([header, payload]); + + // console.log(`[UDP] Sending Handshake: userId=${this.userId}, room=${this.roomCode}, ${packet.length} bytes to ${this.serverUdpHost}:${SERVER_UDP_PORT}`); + this.udp.send(packet, SERVER_UDP_PORT, this.serverUdpHost, (err) => { + if (err) console.error('UDP Handshake Send Error', err); + }); + } + + disconnect() { + if (this.pacerInterval) { + clearInterval(this.pacerInterval); + this.pacerInterval = null; + } + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + if (this.ws) this.ws.close(); + if (this.udp) this.udp.close(); + this.ws = null; + this.udp = null; } } diff --git a/src/renderer/src/App.tsx b/src/renderer/src/App.tsx index 6c6741e..83ae0c8 100644 --- a/src/renderer/src/App.tsx +++ b/src/renderer/src/App.tsx @@ -482,7 +482,7 @@ function App() { video: { width: 1280, height: 720, - frameRate: 60, + frameRate: 30, deviceId: selectedVideoDevice ? { exact: selectedVideoDevice } : undefined } }); @@ -566,8 +566,7 @@ function App() { chromeMediaSource: 'desktop', chromeMediaSourceId: sourceId, maxWidth: 1920, - maxHeight: 1080, - maxFrameRate: 60 + maxHeight: 1080 } } } as any); diff --git a/src/renderer/src/utils/MediaEngine.ts b/src/renderer/src/utils/MediaEngine.ts index 7408bf4..05bed94 100644 --- a/src/renderer/src/utils/MediaEngine.ts +++ b/src/renderer/src/utils/MediaEngine.ts @@ -51,8 +51,8 @@ export class MediaEngine extends SimpleEventEmitter { codec: 'avc1.42001f', // H.264 Baseline Profile Level 3.1 (720p safe) width: 1280, height: 720, - bitrate: 4_000_000, - framerate: 60, + bitrate: 2_000_000, + framerate: 30, latencyMode: 'realtime', avc: { format: 'annexb' } }; @@ -62,8 +62,8 @@ export class MediaEngine extends SimpleEventEmitter { codec: 'avc1.64002a', width: 1920, height: 1080, - bitrate: 8_000_000, // Reduced to 2 Mbps for better stability/FPS - framerate: 60, + bitrate: 2_000_000, // Reduced to 2 Mbps for better stability/FPS + framerate: 30, latencyMode: 'realtime', // Changed from 'quality' to 'realtime' for lower latency avc: { format: 'annexb' } }; @@ -193,7 +193,7 @@ export class MediaEngine extends SimpleEventEmitter { // Note: Decoders are usually more flexible, but giving a hint helps. // Screen share uses High Profile, Video uses Baseline. const config: VideoDecoderConfig = streamType === 'screen' - ? { codec: 'avc1.64002a', optimizeForLatency: true } + ? { codec: 'avc1.64002a', optimizeForLatency: false } : { codec: 'avc1.42001f', optimizeForLatency: true }; decoder.configure(config);