Compare commits

...
Sign in to create a new pull request.

2 commits
master ... tcp

4 changed files with 168 additions and 191 deletions

75
scripts/test-network.js Normal file
View file

@ -0,0 +1,75 @@
const WebSocket = require('ws');
const SERVER = process.argv[2] || 'wss://meet.srtk.in/ws';
const ROOM = 'diag-test';
const NAME = 'diag-bot-' + Math.floor(Math.random() * 1000);
console.log(`--- Network Diagnostic Tool ---`);
console.log(`Connecting to: ${SERVER}`);
console.log(`Room: ${ROOM}, Identity: ${NAME}`);
const url = `${SERVER}?room=${ROOM}&name=${NAME}`;
const ws = new WebSocket(url);
let startTime = Date.now();
let textPings = 0;
let binaryPings = 0;
let textAcks = 0;
let binaryAcks = 0;
ws.on('open', () => {
console.log('✅ WebSocket Connected');
// 1. Join
ws.send(JSON.stringify({
type: 'Join',
data: { room_code: ROOM, display_name: NAME }
}));
// 2. Start Test Cycle
setInterval(() => {
// Send Text Ping
ws.send(JSON.stringify({ type: 'Heartbeat' }));
textPings++;
// Send Binary Ping (1KB)
const dummy = Buffer.alloc(1024);
dummy[0] = 0xAA; // Diagnostic marker
ws.send(dummy);
binaryPings++;
console.log(`Stats: TX[Text:${textPings}, Bin:${binaryPings}] RX[Text:${textAcks}, Bin:${binaryAcks}]`);
}, 2000);
});
ws.on('message', (data, isBinary) => {
if (isBinary) {
binaryAcks++;
} else {
try {
const msg = JSON.parse(data.toString());
if (msg.type === 'Heartbeat') {
textAcks++;
} else if (msg.type === 'Joined') {
console.log('✅ Joined successfully! Self ID:', msg.data.self_id);
}
} catch (e) { }
}
});
ws.on('error', (err) => {
console.error('❌ WebSocket Error:', err.message);
});
ws.on('close', (code, reason) => {
console.log(`⚠️ Connection Closed (Code: ${code}, Reason: ${reason})`);
process.exit(0);
});
setTimeout(() => {
console.log('--- Test Finished ---');
console.log(`Summary:`);
console.log(`Text Pings: ${textPings}, Acks: ${textAcks} (${Math.round(textAcks / textPings * 100)}%)`);
console.log(`Binary Pings: ${binaryPings}, Acks: ${binaryAcks} (${Math.round(binaryAcks / binaryPings * 100)}%)`);
process.exit(0);
}, 20000);

View file

@ -1,10 +1,9 @@
import { EventEmitter } from 'events';
import * as dgram from 'dgram';
import WebSocket from 'ws';
import { BrowserWindow } from 'electron';
// Constants
const SERVER_UDP_PORT = 4000;
// Packet Header Structure (24 bytes)
const HEADER_SIZE = 24;
@ -15,70 +14,27 @@ export enum MediaType {
Screen = 2,
}
// Token Bucket Pacer Constants
const PACER_RATE_BYTES_PER_MS = 1500; // ~12 Mbps limit (Targeting 8-10 Mbps for 1080p60)
const PACER_BUCKET_SIZE_BYTES = 15000; // Allow 10 packets burst (Instant Keyframe start)
const MAX_PAYLOAD = 1200; // Reduced from 1400 to be safe with MTU
export class NetworkManager extends EventEmitter {
private ws: WebSocket | null = null;
private udp: dgram.Socket | null = null;
private userId: number = 0;
private roomCode: string = '';
private videoSeq: number = 0;
private audioSeq: number = 0;
private screenSeq = 0;
private mainWindow: BrowserWindow;
private serverUdpHost: string = '127.0.0.1';
// Pacing
private udpQueue: Buffer[] = [];
private pacerTokens: number = PACER_BUCKET_SIZE_BYTES;
private lastPacerUpdate: number = Date.now();
private pacerInterval: NodeJS.Timeout | null = null;
private pingInterval: NodeJS.Timeout | null = null;
private lastBinaryLog: number = 0;
private binaryCount: number = 0;
private dropCount: number = 0;
constructor(mainWindow: BrowserWindow) {
super();
this.mainWindow = mainWindow;
this.startPacer();
}
private startPacer() {
this.pacerInterval = setInterval(() => {
if (!this.udp) return;
const now = Date.now();
const elapsed = now - this.lastPacerUpdate;
this.lastPacerUpdate = now;
// Refill tokens
this.pacerTokens += elapsed * PACER_RATE_BYTES_PER_MS;
if (this.pacerTokens > PACER_BUCKET_SIZE_BYTES) {
this.pacerTokens = PACER_BUCKET_SIZE_BYTES;
}
// Drain queue
while (this.udpQueue.length > 0) {
const packet = this.udpQueue[0];
if (this.pacerTokens >= packet.length) {
this.pacerTokens -= packet.length;
this.udpQueue.shift();
this.udp.send(packet, SERVER_UDP_PORT, this.serverUdpHost, (err) => {
if (err) console.error('UDP Send Error', err);
});
} else {
break; // Not enough tokens, wait for next tick
}
}
}, 2); // Check every 2ms
}
async connect(serverUrl: string, roomCode: string, displayName: string): Promise<any> {
this.roomCode = roomCode; // Store for UDP handshake
return new Promise((resolve, reject) => {
// Determine Host and Protocol
let host = serverUrl.trim().replace(/^wss?:\/\//, '').replace(/\/$/, '');
this.serverUdpHost = host.split(':')[0]; // Hostname only for UDP (strip port if present)
// Auto-detect protocol: localhost/IP uses ws://, domains use wss:// (HTTPS)
const isLocal = host.includes('localhost') ||
@ -88,12 +44,12 @@ export class NetworkManager extends EventEmitter {
const protocol = isLocal ? 'ws' : 'wss';
const wsUrl = `${protocol}://${host}/ws`;
console.log(`[Network] Connecting to WS: ${wsUrl}, UDP Host: ${this.serverUdpHost}`);
console.log(`[Network] Connecting to WS: ${wsUrl}`);
this.ws = new WebSocket(`${wsUrl}?room=${roomCode}&name=${displayName}`);
this.ws.on('open', () => {
console.log('WS Connected');
console.log('[Network] WebSocket Connected');
const joinMsg = {
type: 'Join',
data: {
@ -103,37 +59,61 @@ export class NetworkManager extends EventEmitter {
};
this.ws?.send(JSON.stringify(joinMsg));
console.log('Sent Join:', joinMsg);
this.startHeartbeat();
});
this.ws.on('message', (data: WebSocket.Data) => {
try {
const msg = JSON.parse(data.toString());
this.handleWsMessage(msg, resolve, reject);
} catch (e) {
console.error('Failed to parse WS msg', e);
this.ws.on('message', (data: WebSocket.Data, isBinary: boolean) => {
if (isBinary) {
this.handleBinaryMessage(data as Buffer);
} else {
const text = data.toString();
try {
const msg = JSON.parse(text);
if (msg.type !== 'Heartbeat') {
console.log('[Network] Received Text Message:', text.substring(0, 100));
}
this.handleWsMessage(msg, resolve, reject);
} catch (e) {
console.error('Failed to parse WS msg', e);
}
}
});
this.ws.on('error', (err) => {
console.error('WS Error', err);
console.error('[Network] WebSocket Error:', err);
reject(err);
});
this.ws.on('close', () => {
console.log('WS Closed');
this.ws.on('close', (code, reason) => {
console.warn(`[Network] WebSocket Closed: code=${code}, reason=${reason}`);
this.stopHeartbeat();
this.emit('disconnected');
if (this.udp) this.udp.close();
});
});
}
private startHeartbeat() {
this.stopHeartbeat();
this.pingInterval = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'Heartbeat' }));
}
}, 10000);
}
private stopHeartbeat() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
handleWsMessage(msg: any, resolve: any, reject: any) {
console.log('Received WS Message:', msg.type);
switch (msg.type) {
case 'Joined':
console.log('Joined Room:', msg.data);
this.userId = msg.data.self_id;
this.setupUdp();
resolve(msg.data);
break;
case 'PeerJoined':
@ -146,11 +126,7 @@ export class NetworkManager extends EventEmitter {
this.safeSend('chat-message', msg.data);
break;
case 'UpdateStream':
// Ignore stream updates for self (we manage local state directly)
if (msg.data.user_id !== this.userId) {
console.log(`[Network] Peer Stream Update: User=${msg.data.user_id} Type=${msg.data.media_type} Active=${msg.data.active}`);
this.safeSend('peer-stream-update', msg.data);
}
this.safeSend('peer-stream-update', msg.data);
break;
case 'Error':
console.error('WS Error Msg:', msg.data);
@ -190,29 +166,18 @@ export class NetworkManager extends EventEmitter {
this.ws.send(JSON.stringify(msg));
}
private heartbeatInterval: NodeJS.Timeout | null = null;
setupUdp() {
this.udp = dgram.createSocket('udp4');
this.udp.on('listening', () => {
const addr = this.udp?.address();
console.log(`UDP Listening on ${addr?.port}`);
this.startHeartbeat();
});
this.udp.on('message', (msg, rinfo) => {
// console.log(`[UDP] Msg from ${rinfo.address}:${rinfo.port} - ${msg.length} bytes`);
this.handleUdpMessage(msg);
});
this.udp.bind(0); // Bind random port
}
handleUdpMessage(msg: Buffer) {
handleBinaryMessage(msg: Buffer) {
if (msg.length < HEADER_SIZE) return;
const version = msg.readUInt8(0);
this.binaryCount++;
const now = Date.now();
if (now - this.lastBinaryLog > 5000) {
console.log(`[Network] Media fragments received in last 5s: ${this.binaryCount}`);
this.binaryCount = 0;
this.lastBinaryLog = now;
}
// Parse Header
const mediaType = msg.readUInt8(1);
const userId = msg.readUInt32LE(2);
const seq = msg.readUInt32LE(6);
@ -225,15 +190,9 @@ export class NetworkManager extends EventEmitter {
const payload = msg.subarray(HEADER_SIZE);
if (mediaType === MediaType.Audio) {
// Original simple approach - just forward to renderer (PCM)
this.safeSend('audio-frame', { user_id: userId, data: payload });
} else if (mediaType === MediaType.Video || mediaType === MediaType.Screen) {
// Differentiate based on MediaType
const streamType = mediaType === MediaType.Screen ? 'screen' : 'video';
if (mediaType === MediaType.Screen && fragIdx === 0) {
console.log(`[Network] RX Screen Chunk User=${userId} Seq=${seq}`);
}
this.safeSend('video-chunk', {
user_id: userId,
data: payload,
@ -242,7 +201,7 @@ export class NetworkManager extends EventEmitter {
fidx: fragIdx,
fcnt: fragCnt,
isKeyFrame,
streamType // Pass this to renderer
streamType
});
}
}
@ -250,9 +209,6 @@ export class NetworkManager extends EventEmitter {
private safeSend(channel: string, data: any) {
if (this.mainWindow && !this.mainWindow.isDestroyed() && this.mainWindow.webContents) {
try {
if (channel === 'audio-fragment') {
console.log(`[Network] safeSend audio-fragment to renderer, data size=${data.data?.length}`);
}
this.mainWindow.webContents.send(channel, data);
} catch (e) {
console.error(`Failed to send ${channel} to renderer:`, e);
@ -265,19 +221,22 @@ export class NetworkManager extends EventEmitter {
// --- New Encode Methods ---
sendEncodedVideoChunk(chunk: any, isKeyFrame: boolean, timestamp: number, streamType: 'video' | 'screen' = 'video') {
if (!this.udp) return;
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
const MAX_PAYLOAD = 1400;
// Backpressure check: If we have > 2.5MB buffered, drop this frame.
// For video, it's better to drop than to lag.
if (this.ws.bufferedAmount > 2.5 * 1024 * 1024) {
this.dropCount++;
if (this.dropCount % 60 === 0) {
console.warn(`[Network] Backpressure! Dropped ${this.dropCount} video frames. Buffered: ${this.ws.bufferedAmount} bytes`);
}
return;
}
const MAX_PAYLOAD = 16384;
const totalSize = chunk.length;
// Use generic videoSeq for both? Or separate?
// Best to separate to avoid gap detection issues if one stream is idle.
// But for now, let's share for simplicity or use screenSeq if screen.
// Actually, let's use separate seq if possible, but I only have videoSeq.
// Let's use videoSeq for both for now, assuming the receiver tracks them separately or doesn't care about gaps across types.
// Better: Use a map or separate counters.
const seq = streamType === 'screen' ? this.screenSeq++ : this.videoSeq++;
const fragmentCount = Math.ceil(totalSize / MAX_PAYLOAD);
for (let i = 0; i < fragmentCount; i++) {
@ -285,7 +244,7 @@ export class NetworkManager extends EventEmitter {
const end = Math.min(start + MAX_PAYLOAD, totalSize);
const slice = chunk.slice(start, end);
// Header (22 bytes)
// Header (24 bytes)
const header = Buffer.alloc(HEADER_SIZE);
header.writeUInt8(1, 0); // Version
const mType = streamType === 'screen' ? MediaType.Screen : MediaType.Video;
@ -302,14 +261,15 @@ export class NetworkManager extends EventEmitter {
const packet = Buffer.concat([header, slice]);
// Enqueue for pacing
this.udpQueue.push(packet);
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(packet);
}
}
}
// Simple audio frame sending (raw PCM) - matches original working implementation
// Simple audio frame sending (raw PCM)
sendAudioFrame(frame: Uint8Array) {
if (!this.udp) return;
if (!this.ws) return;
const header = Buffer.alloc(HEADER_SIZE);
header.writeUInt8(1, 0); // Version
@ -323,27 +283,20 @@ export class NetworkManager extends EventEmitter {
const packet = Buffer.concat([header, Buffer.from(frame)]);
// Send directly via pacer queue
this.udpQueue.push(packet);
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(packet);
}
}
sendEncodedAudioChunk(chunk: Uint8Array, timestamp: number) {
if (!this.udp) {
console.warn('[Network] UDP Socket not ready for Audio');
return;
}
if (!this.ws) return;
const totalSize = chunk.length;
const MAX_PAYLOAD = 1400; // Safe MTU
const MAX_PAYLOAD = 1400;
// PCM packets (approx 2KB) need fragmentation.
// We use the same logic as video but with Audio MediaType.
const fragmentCount = Math.ceil(totalSize / MAX_PAYLOAD);
// Log randomly to avoid spam but confirm activity
if (Math.random() < 0.05) console.log(`[Network] Sending Audio Chunk size=${totalSize} frags=${fragmentCount}`);
for (let i = 0; i < fragmentCount; i++) {
const start = i * MAX_PAYLOAD;
const end = Math.min(start + MAX_PAYLOAD, totalSize);
@ -357,74 +310,22 @@ export class NetworkManager extends EventEmitter {
header.writeBigUInt64LE(BigInt(Math.floor(timestamp)), 10);
header.writeUInt16LE(i, 18); // Frag idx
header.writeUInt16LE(fragmentCount, 20); // Frag cnt
header.writeUInt16LE(1, 22); // Flags (1=Keyframe, audio is always key)
header.writeUInt16LE(1, 22); // Flags
const packet = Buffer.concat([header, Buffer.from(slice)]);
this.udpQueue.push(packet);
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(packet);
}
}
this.audioSeq++;
}
startHeartbeat() {
if (this.heartbeatInterval) clearInterval(this.heartbeatInterval);
// Send immediately
this.sendHandshake();
// Send 3 bursts to ensure traversal
setTimeout(() => this.sendHandshake(), 500);
setTimeout(() => this.sendHandshake(), 1000);
// Keep-alive every 3 seconds
this.heartbeatInterval = setInterval(() => {
this.sendHandshake();
}, 3000);
}
sendHandshake() {
if (!this.udp || !this.userId || !this.roomCode) {
// console.error('[UDP] Cannot send handshake: missing udp, userId, or roomCode');
return;
}
const roomCodeBytes = Buffer.from(this.roomCode, 'utf-8');
const payloadLen = 4 + 8 + roomCodeBytes.length;
const payload = Buffer.alloc(payloadLen);
payload.writeUInt32LE(this.userId, 0); // user_id
payload.writeBigUInt64LE(BigInt(roomCodeBytes.length), 4); // string length
roomCodeBytes.copy(payload, 12); // room_code
const header = Buffer.alloc(HEADER_SIZE);
header.writeUInt8(1, 0); // Version
header.writeUInt8(3, 1); // MediaType.Command = 3
header.writeUInt32LE(this.userId, 2);
header.writeUInt32LE(0, 6); // Sequence
header.writeBigUInt64LE(BigInt(Date.now()), 10);
header.writeUInt16LE(0, 18); // Frag idx
header.writeUInt16LE(1, 20); // Frag cnt
header.writeUInt16LE(0, 22); // Flags
const packet = Buffer.concat([header, payload]);
// console.log(`[UDP] Sending Handshake: userId=${this.userId}, room=${this.roomCode}, ${packet.length} bytes to ${this.serverUdpHost}:${SERVER_UDP_PORT}`);
this.udp.send(packet, SERVER_UDP_PORT, this.serverUdpHost, (err) => {
if (err) console.error('UDP Handshake Send Error', err);
});
}
disconnect() {
if (this.pacerInterval) {
clearInterval(this.pacerInterval);
this.pacerInterval = null;
if (this.ws) {
this.ws.close();
this.ws = null;
}
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
if (this.ws) this.ws.close();
if (this.udp) this.udp.close();
this.ws = null;
this.udp = null;
}
}

View file

@ -482,7 +482,7 @@ function App() {
video: {
width: 1280,
height: 720,
frameRate: 30,
frameRate: 60,
deviceId: selectedVideoDevice ? { exact: selectedVideoDevice } : undefined
}
});
@ -566,7 +566,8 @@ function App() {
chromeMediaSource: 'desktop',
chromeMediaSourceId: sourceId,
maxWidth: 1920,
maxHeight: 1080
maxHeight: 1080,
maxFrameRate: 60
}
}
} as any);

View file

@ -51,8 +51,8 @@ export class MediaEngine extends SimpleEventEmitter {
codec: 'avc1.42001f', // H.264 Baseline Profile Level 3.1 (720p safe)
width: 1280,
height: 720,
bitrate: 2_000_000,
framerate: 30,
bitrate: 4_000_000,
framerate: 60,
latencyMode: 'realtime',
avc: { format: 'annexb' }
};
@ -62,8 +62,8 @@ export class MediaEngine extends SimpleEventEmitter {
codec: 'avc1.64002a',
width: 1920,
height: 1080,
bitrate: 2_000_000, // Reduced to 2 Mbps for better stability/FPS
framerate: 30,
bitrate: 8_000_000, // Reduced to 2 Mbps for better stability/FPS
framerate: 60,
latencyMode: 'realtime', // Changed from 'quality' to 'realtime' for lower latency
avc: { format: 'annexb' }
};
@ -193,7 +193,7 @@ export class MediaEngine extends SimpleEventEmitter {
// Note: Decoders are usually more flexible, but giving a hint helps.
// Screen share uses High Profile, Video uses Baseline.
const config: VideoDecoderConfig = streamType === 'screen'
? { codec: 'avc1.64002a', optimizeForLatency: false }
? { codec: 'avc1.64002a', optimizeForLatency: true }
: { codec: 'avc1.42001f', optimizeForLatency: true };
decoder.configure(config);