From b3f5902c445bfeaf3308679a3e7aa12e4d8f1a0d Mon Sep 17 00:00:00 2001 From: srtk Date: Fri, 13 Feb 2026 22:04:27 +0530 Subject: [PATCH] Migrate client to TCP-only streaming and add diagnostics --- scripts/test-network.js | 75 ++++++++++++ src/main/network.ts | 250 ++++++++++++---------------------------- 2 files changed, 147 insertions(+), 178 deletions(-) create mode 100644 scripts/test-network.js diff --git a/scripts/test-network.js b/scripts/test-network.js new file mode 100644 index 0000000..b4b99d2 --- /dev/null +++ b/scripts/test-network.js @@ -0,0 +1,75 @@ +const WebSocket = require('ws'); + +const SERVER = process.argv[2] || 'wss://meet.srtk.in/ws'; +const ROOM = 'diag-test'; +const NAME = 'diag-bot-' + Math.floor(Math.random() * 1000); + +console.log(`--- Network Diagnostic Tool ---`); +console.log(`Connecting to: ${SERVER}`); +console.log(`Room: ${ROOM}, Identity: ${NAME}`); + +const url = `${SERVER}?room=${ROOM}&name=${NAME}`; +const ws = new WebSocket(url); + +let startTime = Date.now(); +let textPings = 0; +let binaryPings = 0; +let textAcks = 0; +let binaryAcks = 0; + +ws.on('open', () => { + console.log('✅ WebSocket Connected'); + + // 1. Join + ws.send(JSON.stringify({ + type: 'Join', + data: { room_code: ROOM, display_name: NAME } + })); + + // 2. Start Test Cycle + setInterval(() => { + // Send Text Ping + ws.send(JSON.stringify({ type: 'Heartbeat' })); + textPings++; + + // Send Binary Ping (1KB) + const dummy = Buffer.alloc(1024); + dummy[0] = 0xAA; // Diagnostic marker + ws.send(dummy); + binaryPings++; + + console.log(`Stats: TX[Text:${textPings}, Bin:${binaryPings}] RX[Text:${textAcks}, Bin:${binaryAcks}]`); + }, 2000); +}); + +ws.on('message', (data, isBinary) => { + if (isBinary) { + binaryAcks++; + } else { + try { + const msg = JSON.parse(data.toString()); + if (msg.type === 'Heartbeat') { + textAcks++; + } else if (msg.type === 'Joined') { + console.log('✅ Joined successfully! Self ID:', msg.data.self_id); + } + } catch (e) { } + } +}); + +ws.on('error', (err) => { + console.error('❌ WebSocket Error:', err.message); +}); + +ws.on('close', (code, reason) => { + console.log(`⚠️ Connection Closed (Code: ${code}, Reason: ${reason})`); + process.exit(0); +}); + +setTimeout(() => { + console.log('--- Test Finished ---'); + console.log(`Summary:`); + console.log(`Text Pings: ${textPings}, Acks: ${textAcks} (${Math.round(textAcks / textPings * 100)}%)`); + console.log(`Binary Pings: ${binaryPings}, Acks: ${binaryAcks} (${Math.round(binaryAcks / binaryPings * 100)}%)`); + process.exit(0); +}, 20000); diff --git a/src/main/network.ts b/src/main/network.ts index af0886f..9ca19c7 100644 --- a/src/main/network.ts +++ b/src/main/network.ts @@ -1,10 +1,9 @@ import { EventEmitter } from 'events'; -import * as dgram from 'dgram'; + import WebSocket from 'ws'; import { BrowserWindow } from 'electron'; // Constants -const SERVER_UDP_PORT = 4000; // Packet Header Structure (24 bytes) const HEADER_SIZE = 24; @@ -15,70 +14,28 @@ export enum MediaType { Screen = 2, } -// Token Bucket Pacer Constants -const PACER_RATE_BYTES_PER_MS = 1500; // ~12 Mbps limit (Targeting 8-10 Mbps for 1080p60) -const PACER_BUCKET_SIZE_BYTES = 15000; // Allow 10 packets burst (Instant Keyframe start) -const MAX_PAYLOAD = 1200; // Reduced from 1400 to be safe with MTU - export class NetworkManager extends EventEmitter { private ws: WebSocket | null = null; - private udp: dgram.Socket | null = null; private userId: number = 0; private roomCode: string = ''; private videoSeq: number = 0; private audioSeq: number = 0; private screenSeq = 0; private mainWindow: BrowserWindow; - private serverUdpHost: string = '127.0.0.1'; - - // Pacing - private udpQueue: Buffer[] = []; - private pacerTokens: number = PACER_BUCKET_SIZE_BYTES; - private lastPacerUpdate: number = Date.now(); - private pacerInterval: NodeJS.Timeout | null = null; + private pingInterval: NodeJS.Timeout | null = null; + private lastBinaryLog: number = 0; + private binaryCount: number = 0; constructor(mainWindow: BrowserWindow) { super(); this.mainWindow = mainWindow; - this.startPacer(); - } - - private startPacer() { - this.pacerInterval = setInterval(() => { - if (!this.udp) return; - - const now = Date.now(); - const elapsed = now - this.lastPacerUpdate; - this.lastPacerUpdate = now; - - // Refill tokens - this.pacerTokens += elapsed * PACER_RATE_BYTES_PER_MS; - if (this.pacerTokens > PACER_BUCKET_SIZE_BYTES) { - this.pacerTokens = PACER_BUCKET_SIZE_BYTES; - } - - // Drain queue - while (this.udpQueue.length > 0) { - const packet = this.udpQueue[0]; - if (this.pacerTokens >= packet.length) { - this.pacerTokens -= packet.length; - this.udpQueue.shift(); - this.udp.send(packet, SERVER_UDP_PORT, this.serverUdpHost, (err) => { - if (err) console.error('UDP Send Error', err); - }); - } else { - break; // Not enough tokens, wait for next tick - } - } - }, 2); // Check every 2ms } async connect(serverUrl: string, roomCode: string, displayName: string): Promise { - this.roomCode = roomCode; // Store for UDP handshake + this.roomCode = roomCode; return new Promise((resolve, reject) => { // Determine Host and Protocol let host = serverUrl.trim().replace(/^wss?:\/\//, '').replace(/\/$/, ''); - this.serverUdpHost = host.split(':')[0]; // Hostname only for UDP (strip port if present) // Auto-detect protocol: localhost/IP uses ws://, domains use wss:// (HTTPS) const isLocal = host.includes('localhost') || @@ -88,12 +45,12 @@ export class NetworkManager extends EventEmitter { const protocol = isLocal ? 'ws' : 'wss'; const wsUrl = `${protocol}://${host}/ws`; - console.log(`[Network] Connecting to WS: ${wsUrl}, UDP Host: ${this.serverUdpHost}`); + console.log(`[Network] Connecting to WS: ${wsUrl}`); this.ws = new WebSocket(`${wsUrl}?room=${roomCode}&name=${displayName}`); this.ws.on('open', () => { - console.log('WS Connected'); + console.log('[Network] WebSocket Connected'); const joinMsg = { type: 'Join', data: { @@ -103,37 +60,61 @@ export class NetworkManager extends EventEmitter { }; this.ws?.send(JSON.stringify(joinMsg)); console.log('Sent Join:', joinMsg); + this.startHeartbeat(); }); - this.ws.on('message', (data: WebSocket.Data) => { - try { - const msg = JSON.parse(data.toString()); - this.handleWsMessage(msg, resolve, reject); - } catch (e) { - console.error('Failed to parse WS msg', e); + this.ws.on('message', (data: WebSocket.Data, isBinary: boolean) => { + if (isBinary) { + this.handleBinaryMessage(data as Buffer); + } else { + const text = data.toString(); + try { + const msg = JSON.parse(text); + if (msg.type !== 'Heartbeat') { + console.log('[Network] Received Text Message:', text.substring(0, 100)); + } + this.handleWsMessage(msg, resolve, reject); + } catch (e) { + console.error('Failed to parse WS msg', e); + } } }); this.ws.on('error', (err) => { - console.error('WS Error', err); + console.error('[Network] WebSocket Error:', err); reject(err); }); - this.ws.on('close', () => { - console.log('WS Closed'); + this.ws.on('close', (code, reason) => { + console.warn(`[Network] WebSocket Closed: code=${code}, reason=${reason}`); + this.stopHeartbeat(); this.emit('disconnected'); - if (this.udp) this.udp.close(); }); }); } + private startHeartbeat() { + this.stopHeartbeat(); + this.pingInterval = setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify({ type: 'Heartbeat' })); + } + }, 10000); + } + + private stopHeartbeat() { + if (this.pingInterval) { + clearInterval(this.pingInterval); + this.pingInterval = null; + } + } + handleWsMessage(msg: any, resolve: any, reject: any) { console.log('Received WS Message:', msg.type); switch (msg.type) { case 'Joined': console.log('Joined Room:', msg.data); this.userId = msg.data.self_id; - this.setupUdp(); resolve(msg.data); break; case 'PeerJoined': @@ -190,29 +171,18 @@ export class NetworkManager extends EventEmitter { this.ws.send(JSON.stringify(msg)); } - private heartbeatInterval: NodeJS.Timeout | null = null; - - setupUdp() { - this.udp = dgram.createSocket('udp4'); - - this.udp.on('listening', () => { - const addr = this.udp?.address(); - console.log(`UDP Listening on ${addr?.port}`); - this.startHeartbeat(); - }); - - this.udp.on('message', (msg, rinfo) => { - // console.log(`[UDP] Msg from ${rinfo.address}:${rinfo.port} - ${msg.length} bytes`); - this.handleUdpMessage(msg); - }); - - this.udp.bind(0); // Bind random port - } - - handleUdpMessage(msg: Buffer) { + handleBinaryMessage(msg: Buffer) { if (msg.length < HEADER_SIZE) return; - const version = msg.readUInt8(0); + this.binaryCount++; + const now = Date.now(); + if (now - this.lastBinaryLog > 5000) { + console.log(`[Network] Media fragments received in last 5s: ${this.binaryCount}`); + this.binaryCount = 0; + this.lastBinaryLog = now; + } + + // Parse Header const mediaType = msg.readUInt8(1); const userId = msg.readUInt32LE(2); const seq = msg.readUInt32LE(6); @@ -225,15 +195,9 @@ export class NetworkManager extends EventEmitter { const payload = msg.subarray(HEADER_SIZE); if (mediaType === MediaType.Audio) { - // Original simple approach - just forward to renderer (PCM) this.safeSend('audio-frame', { user_id: userId, data: payload }); } else if (mediaType === MediaType.Video || mediaType === MediaType.Screen) { - // Differentiate based on MediaType const streamType = mediaType === MediaType.Screen ? 'screen' : 'video'; - if (mediaType === MediaType.Screen && fragIdx === 0) { - console.log(`[Network] RX Screen Chunk User=${userId} Seq=${seq}`); - } - this.safeSend('video-chunk', { user_id: userId, data: payload, @@ -242,7 +206,7 @@ export class NetworkManager extends EventEmitter { fidx: fragIdx, fcnt: fragCnt, isKeyFrame, - streamType // Pass this to renderer + streamType }); } } @@ -250,9 +214,6 @@ export class NetworkManager extends EventEmitter { private safeSend(channel: string, data: any) { if (this.mainWindow && !this.mainWindow.isDestroyed() && this.mainWindow.webContents) { try { - if (channel === 'audio-fragment') { - console.log(`[Network] safeSend audio-fragment to renderer, data size=${data.data?.length}`); - } this.mainWindow.webContents.send(channel, data); } catch (e) { console.error(`Failed to send ${channel} to renderer:`, e); @@ -265,19 +226,10 @@ export class NetworkManager extends EventEmitter { // --- New Encode Methods --- sendEncodedVideoChunk(chunk: any, isKeyFrame: boolean, timestamp: number, streamType: 'video' | 'screen' = 'video') { - if (!this.udp) return; - - const MAX_PAYLOAD = 1400; + const MAX_PAYLOAD = 1400; // WS can handle larger but keeping small for consistency const totalSize = chunk.length; - // Use generic videoSeq for both? Or separate? - // Best to separate to avoid gap detection issues if one stream is idle. - // But for now, let's share for simplicity or use screenSeq if screen. - // Actually, let's use separate seq if possible, but I only have videoSeq. - // Let's use videoSeq for both for now, assuming the receiver tracks them separately or doesn't care about gaps across types. - // Better: Use a map or separate counters. const seq = streamType === 'screen' ? this.screenSeq++ : this.videoSeq++; - const fragmentCount = Math.ceil(totalSize / MAX_PAYLOAD); for (let i = 0; i < fragmentCount; i++) { @@ -302,14 +254,15 @@ export class NetworkManager extends EventEmitter { const packet = Buffer.concat([header, slice]); - // Enqueue for pacing - this.udpQueue.push(packet); + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(packet); + } } } - // Simple audio frame sending (raw PCM) - matches original working implementation + // Simple audio frame sending (raw PCM) sendAudioFrame(frame: Uint8Array) { - if (!this.udp) return; + if (!this.ws) return; const header = Buffer.alloc(HEADER_SIZE); header.writeUInt8(1, 0); // Version @@ -323,27 +276,20 @@ export class NetworkManager extends EventEmitter { const packet = Buffer.concat([header, Buffer.from(frame)]); - // Send directly via pacer queue - this.udpQueue.push(packet); + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.send(packet); + } } sendEncodedAudioChunk(chunk: Uint8Array, timestamp: number) { - if (!this.udp) { - console.warn('[Network] UDP Socket not ready for Audio'); - return; - } + if (!this.ws) return; const totalSize = chunk.length; - const MAX_PAYLOAD = 1400; // Safe MTU + const MAX_PAYLOAD = 1400; // PCM packets (approx 2KB) need fragmentation. - // We use the same logic as video but with Audio MediaType. - const fragmentCount = Math.ceil(totalSize / MAX_PAYLOAD); - // Log randomly to avoid spam but confirm activity - if (Math.random() < 0.05) console.log(`[Network] Sending Audio Chunk size=${totalSize} frags=${fragmentCount}`); - for (let i = 0; i < fragmentCount; i++) { const start = i * MAX_PAYLOAD; const end = Math.min(start + MAX_PAYLOAD, totalSize); @@ -357,74 +303,22 @@ export class NetworkManager extends EventEmitter { header.writeBigUInt64LE(BigInt(Math.floor(timestamp)), 10); header.writeUInt16LE(i, 18); // Frag idx header.writeUInt16LE(fragmentCount, 20); // Frag cnt - header.writeUInt16LE(1, 22); // Flags (1=Keyframe, audio is always key) + header.writeUInt16LE(1, 22); // Flags const packet = Buffer.concat([header, Buffer.from(slice)]); - this.udpQueue.push(packet); + + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.send(packet); + } } this.audioSeq++; } - startHeartbeat() { - if (this.heartbeatInterval) clearInterval(this.heartbeatInterval); - - // Send immediately - this.sendHandshake(); - - // Send 3 bursts to ensure traversal - setTimeout(() => this.sendHandshake(), 500); - setTimeout(() => this.sendHandshake(), 1000); - - // Keep-alive every 3 seconds - this.heartbeatInterval = setInterval(() => { - this.sendHandshake(); - }, 3000); - } - - sendHandshake() { - if (!this.udp || !this.userId || !this.roomCode) { - // console.error('[UDP] Cannot send handshake: missing udp, userId, or roomCode'); - return; - } - - const roomCodeBytes = Buffer.from(this.roomCode, 'utf-8'); - const payloadLen = 4 + 8 + roomCodeBytes.length; - const payload = Buffer.alloc(payloadLen); - payload.writeUInt32LE(this.userId, 0); // user_id - payload.writeBigUInt64LE(BigInt(roomCodeBytes.length), 4); // string length - roomCodeBytes.copy(payload, 12); // room_code - - const header = Buffer.alloc(HEADER_SIZE); - header.writeUInt8(1, 0); // Version - header.writeUInt8(3, 1); // MediaType.Command = 3 - header.writeUInt32LE(this.userId, 2); - header.writeUInt32LE(0, 6); // Sequence - header.writeBigUInt64LE(BigInt(Date.now()), 10); - header.writeUInt16LE(0, 18); // Frag idx - header.writeUInt16LE(1, 20); // Frag cnt - header.writeUInt16LE(0, 22); // Flags - - const packet = Buffer.concat([header, payload]); - - // console.log(`[UDP] Sending Handshake: userId=${this.userId}, room=${this.roomCode}, ${packet.length} bytes to ${this.serverUdpHost}:${SERVER_UDP_PORT}`); - this.udp.send(packet, SERVER_UDP_PORT, this.serverUdpHost, (err) => { - if (err) console.error('UDP Handshake Send Error', err); - }); - } - disconnect() { - if (this.pacerInterval) { - clearInterval(this.pacerInterval); - this.pacerInterval = null; + if (this.ws) { + this.ws.close(); + this.ws = null; } - if (this.heartbeatInterval) { - clearInterval(this.heartbeatInterval); - this.heartbeatInterval = null; - } - if (this.ws) this.ws.close(); - if (this.udp) this.udp.close(); - this.ws = null; - this.udp = null; } }