// Realtime sync via WebSocket — broadcast state-change notifications between devices of the same user. // Cliente reage à notificação fazendo pull do estado via REST. Sem entity-level diffing. import { WebSocketServer } from 'ws'; import { verifyToken } from './auth.js'; const TOKEN = process.env.BOAT_TOKEN; const HEARTBEAT_INTERVAL = 30000; // Map> const clientsByUser = new Map(); function addClient(userId, ws) { if (!clientsByUser.has(userId)) clientsByUser.set(userId, new Set()); clientsByUser.get(userId).add(ws); } function removeClient(userId, ws) { const set = clientsByUser.get(userId); if (!set) return; set.delete(ws); if (set.size === 0) clientsByUser.delete(userId); } // Resolve um token (JWT ou BOAT_TOKEN) → userId. Retorna null se inválido. function authenticateToken(token) { if (!token) return null; if (token === TOKEN) return 1; // legacy single-tenant const payload = verifyToken(token); if (!payload || payload.type !== 'access' || !payload.uid) return null; return payload.uid; } export function initRealtime(httpServer) { const wss = new WebSocketServer({ server: httpServer, path: '/ws' }); wss.on('connection', (ws, req) => { const url = new URL(req.url, `http://${req.headers.host}`); const token = url.searchParams.get('token') || ''; const deviceId = url.searchParams.get('device') || ('anon-' + Math.random().toString(36).slice(2, 8)); const userId = authenticateToken(token); if (!userId) { ws.send(JSON.stringify({ type: 'error', code: 'auth_failed' })); ws.close(1008, 'auth failed'); return; } ws.userId = userId; ws.deviceId = deviceId; ws.isAlive = true; addClient(userId, ws); ws.send(JSON.stringify({ type: 'hello', userId, deviceId, ts: Date.now() })); // Quantos devices estão online pro mesmo user broadcastPresence(userId); ws.on('pong', () => { ws.isAlive = true; }); ws.on('message', (raw) => { // Cliente pode mandar pings explícitos ou notificações. Ignoramos qualquer outra coisa. try { const msg = JSON.parse(raw.toString()); if (msg.type === 'ping') { ws.send(JSON.stringify({ type: 'pong', ts: Date.now() })); } } catch (e) { /* ignore */ } }); ws.on('close', () => { removeClient(userId, ws); broadcastPresence(userId); }); ws.on('error', (err) => { console.warn('[ws] error', err.message); }); }); // Heartbeat: drop dead connections const heartbeat = setInterval(() => { wss.clients.forEach((ws) => { if (ws.isAlive === false) return ws.terminate(); ws.isAlive = false; try { ws.ping(); } catch (e) {} }); }, HEARTBEAT_INTERVAL); wss.on('close', () => clearInterval(heartbeat)); console.log('[ws] WebSocket server attached at /ws'); return wss; } // Notifica todos os devices do user (exceto o que originou) que o estado mudou. // payload: { kind: 'state'|'pending'|'trip'|..., ts, originDeviceId? } export function broadcastStateChange(userId, payload = {}) { const set = clientsByUser.get(userId); if (!set) return; const msg = JSON.stringify({ type: 'state:changed', ts: Date.now(), ...payload, }); for (const ws of set) { if (ws.readyState !== ws.OPEN) continue; if (payload.originDeviceId && ws.deviceId === payload.originDeviceId) continue; try { ws.send(msg); } catch (e) {} } } function broadcastPresence(userId) { const set = clientsByUser.get(userId); if (!set) return; const msg = JSON.stringify({ type: 'presence', count: set.size, ts: Date.now() }); for (const ws of set) { if (ws.readyState === ws.OPEN) { try { ws.send(msg); } catch (e) {} } } } export function getOnlineCount(userId) { return clientsByUser.get(userId)?.size || 0; }