Skip to content

Commit e006960

Browse files
sfastsfast
authored andcommitted
cleanup
1 parent 1e91172 commit e006960

File tree

17 files changed

+483
-404
lines changed

17 files changed

+483
-404
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/* eslint-disable no-console */
2+
const zmq = require('zeromq');
3+
4+
(async () => {
5+
const address = process.env.ADDR || 'tcp://127.0.0.1:7777';
6+
7+
// Simple config - ZeroMQ handles reconnection natively
8+
const dealer = new zmq.Dealer({
9+
routingId: 'dealer-monitor',
10+
reconnectInterval: 5000, // Retry every 5s (or -1 to disable)
11+
reconnectMaxInterval: 20000 // Max backoff 20s (0 = constant interval)
12+
});
13+
14+
console.log('[dealer] Config:', { address, ...dealer });
15+
console.log('[dealer] Starting...\n');
16+
17+
// Attach event listeners (just for observability)
18+
const on = (name) => dealer.events.on(name, (...args) => {
19+
const ts = new Date().toISOString();
20+
console.log(`[${ts}] 📡 ${name}`, ...args);
21+
});
22+
23+
['connect', 'disconnect', 'connect_retry', 'connect:retry'].forEach(on);
24+
25+
await dealer.connect(address);
26+
console.log(`[dealer] ✅ Connect issued to ${address}\n`);
27+
28+
// Track connection state for smart sending
29+
let isConnected = false;
30+
dealer.events.on('connect', () => {
31+
isConnected = true;
32+
console.log('[dealer] 🟢 CONNECTED - messages will be sent\n');
33+
});
34+
dealer.events.on('disconnect', () => {
35+
isConnected = false;
36+
console.log('[dealer] 🔴 DISCONNECTED - messages will be queued\n');
37+
});
38+
39+
// Send periodic pings (only when connected)
40+
let i = 0;
41+
setInterval(async () => {
42+
if (!isConnected) {
43+
console.log(`[dealer] ⏸️ Skip ping-${i++} (offline)`);
44+
return;
45+
}
46+
47+
try {
48+
const msg = `ping-${i++}`;
49+
await dealer.send(Buffer.from(msg));
50+
console.log(`[dealer] 📤 Sent: ${msg}`);
51+
} catch (e) {
52+
console.log(`[dealer] ❌ Send error: ${e.message}`);
53+
}
54+
}, 2000);
55+
56+
// Receive echoes
57+
(async () => {
58+
for await (const [msg] of dealer) {
59+
console.log(`[dealer] 📥 Received: ${msg.toString()}\n`);
60+
}
61+
})();
62+
63+
// Cleanup
64+
process.on('SIGINT', () => {
65+
console.log('\n[dealer] 🛑 Shutting down...');
66+
dealer.close();
67+
process.exit(0);
68+
});
69+
})();
70+
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/* eslint-disable no-console */
2+
const zmq = require('zeromq');
3+
4+
(async () => {
5+
const address = process.env.ADDR || 'tcp://127.0.0.1:7778';
6+
const sock = new zmq.Router({ routingId: 'router-monitor-7778' });
7+
8+
// Monitor common router-side events
9+
const on = (name) => sock.events.on(name, (...args) => {
10+
const ts = new Date().toISOString();
11+
console.log(`[${ts}] ROUTER events -> ${name}`, ...args);
12+
});
13+
[
14+
'listening',
15+
'accept',
16+
'accept_error',
17+
'bind_error',
18+
'closed',
19+
'close_error',
20+
'disconnect',
21+
].forEach(on);
22+
23+
await sock.bind(address);
24+
console.log(`[router] bound to ${address}`);
25+
26+
// Manual control: you can unbind/rebind the router yourself from the terminal
27+
// Example:
28+
// ADDR=tcp://127.0.0.1:7777 node scripts/zmq-monitor/router.js
29+
// Then in another terminal, stop and restart this process to simulate downtime
30+
31+
// Track connected dealers
32+
const dealers = new Map(); // routingId -> { lastSeen, messageCount }
33+
34+
console.log('[router] Waiting for messages...');
35+
36+
// Echo replies back to sender and track stats
37+
for await (const msg of sock) {
38+
console.log('[router] 🔔 Received message with', msg.length, 'frames');
39+
40+
// Router receives: [routingId, data] (2 frames)
41+
// ZeroMQ handles the empty delimiter internally in v6+
42+
try {
43+
if (msg.length >= 2) {
44+
const [routingId, ...dataFrames] = msg;
45+
const routingIdStr = routingId.toString();
46+
const data = dataFrames[0]?.toString() || '';
47+
48+
console.log(`[router] 🆔 Routing ID: ${routingIdStr}`);
49+
console.log(`[router] 📦 Data: ${data}`);
50+
51+
// Track dealer stats
52+
if (!dealers.has(routingIdStr)) {
53+
dealers.set(routingIdStr, { lastSeen: Date.now(), messageCount: 0 });
54+
console.log(`[router] 🔗 New dealer connected: ${routingIdStr}`);
55+
}
56+
57+
const dealer = dealers.get(routingIdStr);
58+
dealer.lastSeen = Date.now();
59+
dealer.messageCount++;
60+
61+
// Log received message
62+
if (data.startsWith('ping-')) {
63+
console.log(`[router] 📩 Received from ${routingIdStr}: ${data} (total: ${dealer.messageCount})`);
64+
} else {
65+
console.log(`[router] 📩 Received from ${routingIdStr}: ${data}`);
66+
}
67+
68+
// Echo back (router sends: [routingId, data])
69+
await sock.send([routingId, ...dataFrames]);
70+
console.log(`[router] 📤 Echoed back to ${routingIdStr}`);
71+
} else {
72+
console.log(`[router] ⚠️ Unexpected frame count: ${msg.length}`);
73+
}
74+
} catch (e) {
75+
console.log('[router] ❌ Error:', e && e.message);
76+
console.log('[router] Stack:', e && e.stack);
77+
}
78+
}
79+
80+
console.log('[router] Message loop ended');
81+
})();
82+
83+

src/protocol/client.js

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -236,12 +236,11 @@ export default class Client extends Protocol {
236236
const socket = this._getSocket()
237237

238238
try {
239-
// Connect transport
240-
await socket.connect(serverAddress, timeout)
241-
242-
_scope.serverPeerInfo.setState('CONNECTED')
239+
// Issue connect (non-blocking - returns immediately)
240+
// ZeroMQ will connect in background and emit TRANSPORT_READY when ready
241+
await socket.connect(serverAddress)
243242

244-
// Wait for handshake to complete (CLIENT_READY event)
243+
// Wait for handshake to complete (includes implicit transport ready wait)
245244
await new Promise((resolve, reject) => {
246245
// Use provided timeout, then config, then global default, finally 10s
247246
const config = this.getConfig()
@@ -268,18 +267,17 @@ export default class Client extends Protocol {
268267

269268
this._stopPing()
270269

271-
// Notify server
272-
if (this.isReady()) {
273-
try {
274-
// ✅ Use internal API to send system event (client stop)
275-
this._sendSystemTick({
276-
event: ProtocolSystemEvent.CLIENT_STOP,
277-
data: { clientId: this.getId() }
278-
})
279-
} catch (err) {
280-
this.debugMode() && this.logger?.error('Error sending client stop: ', err)
281-
}
270+
// Try to notify server
271+
try {
272+
// ✅ Use internal API to send system event (client stop)
273+
this._sendSystemTick({
274+
event: ProtocolSystemEvent.CLIENT_STOP,
275+
data: { clientId: this.getId() }
276+
})
277+
} catch (err) {
278+
this.debugMode() && this.logger?.error('Error sending client stop: ', err)
282279
}
280+
283281

284282
// disconnect from transport and detach listeners
285283
await super.disconnect();

src/protocol/protocol.js

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -808,20 +808,32 @@ export default class Protocol extends EventEmitter {
808808
*/
809809
async disconnect () {
810810
let { socket } = _private.get(this)
811-
this._detachSocketEventHandlers(socket)
812-
813811
await socket.disconnect();
814812
}
815813

814+
/**
815+
* Unbind protocol from transport events without closing or rejecting pending.
816+
* - Idempotent: safe to call multiple times
817+
* - Does NOT set closed flag
818+
* - Does NOT reject pending requests
819+
* - Does NOT close underlying transport
820+
*/
821+
822+
async unbind () {
823+
let { socket } = _private.get(this)
824+
// Keep socket event handlers attached so further transport events (e.g., CLOSED)
825+
// still propagate through Protocol to consumers. Just unbind transport here.
826+
await socket.unbind();
827+
}
828+
816829
/**
817830
* Close the protocol and cleanup resources.
818831
* - Idempotent
819832
* - Detaches protocol-attached socket listeners
820833
* - Rejects and clears pending requests
821834
* - Optionally closes the underlying transport
822835
*
823-
* @param {Object} [options]
824-
* @param {boolean} [options.closeTransport=true] - Whether to close the socket
836+
* @param {boolean} [closeTransport=false] - Whether to close the socket
825837
*/
826838
async close (closeTransport = false) {
827839
let _scope = _private.get(this)

src/protocol/server.js

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -190,29 +190,26 @@ export default class Server extends Protocol {
190190
async unbind () {
191191
this._stopHealthChecks()
192192

193-
// Notify all clients
194-
if (this.isOnline()) {
195-
try {
196-
this.tick({
197-
event: ProtocolSystemEvent.SERVER_STOP,
198-
data: { serverId: this.getId() }
199-
})
200-
} catch (err) {
201-
// Ignore if offline
193+
// Notify all clients individually with system event before unbind
194+
try {
195+
let { clientPeers } = _private.get(this)
196+
for (const clientId of clientPeers.keys()) {
197+
this._sendSystemTick({
198+
to: clientId,
199+
event: ProtocolSystemEvent.SERVER_STOP,
200+
data: { serverId: this.getId() }
201+
})
202202
}
203+
} catch (err) {
204+
this.debugMode() && this.logger?.error('Error sending server stop: ', err)
203205
}
204206

205-
// ✅ Use Protocol's socket (via protected method)
206-
const socket = this._getSocket()
207-
await socket.unbind()
207+
await super.unbind()
208208
}
209209

210210
async close () {
211211
await this.unbind()
212-
213-
// ✅ Use Protocol's socket (via protected method)
214-
const socket = this._getSocket()
215-
await socket.close()
212+
await super.close(true)
216213
}
217214

218215
getAddress () {

src/transport/errors.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export const TransportErrorCode = {
3131

3232
// Address errors
3333
INVALID_ADDRESS: 'TRANSPORT_INVALID_ADDRESS', // Invalid address format
34-
ADDRESS_REQUIRED: 'TRANSPORT_ADDRESS_REQUIRED', // Address not provided
34+
// ADDRESS_REQUIRED: 'TRANSPORT_ADDRESS_REQUIRED', // Address not provided
3535

3636
// Lifecycle errors
3737
CLOSE_FAILED: 'TRANSPORT_CLOSE_FAILED' // Failed to close cleanly

src/transport/zeromq/config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ export const ZMQConfigDefaults = {
108108
* ZMQ_RECONNECT_IVL: Initial reconnection interval in milliseconds
109109
* How often ZeroMQ tries to reconnect after losing connection
110110
* Default: 100ms (fast reconnection)
111+
* -1 means no reconnection
111112
*/
112113
ZMQ_RECONNECT_IVL: 100,
113114

0 commit comments

Comments
 (0)