I’ve built a simple SSE (Server-Sent Events) server in Node.js using Express. When I try to connect multiple clients (4000) from the same machine using a stress test script, the server fails to maintain all connections and throws the following error:
[SSE ERROR] Client 1754470519403.0117 connection error: aborted
On the client side, I also see:
./sse_stress_test.sh: fork: Resource temporarily unavailable
Node.js SSE Server Code
import { Router } from "express";
export const DiscoverStationsRoute = Router()
// Store all connected clients
const connectedClients = [];
let broadcastInterval;
// Function to broadcast message to all connected clients
const broadcastToAllClients = (data) => {
const message = `data: ${JSON.stringify(data)}\n\n`;
connectedClients.forEach((client, index) => {
try {
if (client.res.writableEnded) {
connectedClients.splice(index, 1);
} else {
client.res.write(message);
}
} catch (error) {
console.error(`[BROADCAST ERROR] Failed to send to client ${client.id}:`, error.message);
connectedClients.splice(index, 1);
}
});
};
// Start broadcasting timer if not already running
const startBroadcasting = () => {
if (!broadcastInterval) {
let count = 0;
broadcastInterval = setInterval(() => {
count++;
const broadcastData = {
timestamp: new Date().toISOString(),
count: count,
message: "Broadcast update",
data: "data" // You can modify this data as needed
};
broadcastToAllClients(broadcastData);
}, 5000); // Broadcast every 5 seconds
}
};
// Stop broadcasting timer
const stopBroadcasting = () => {
if (broadcastInterval) {
clearInterval(broadcastInterval);
broadcastInterval = null;
}
};
DiscoverStationsRoute.get("/", (req, res) => {
try {
// Set headers for SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control');
res.flushHeaders();
// Add this client to the connected clients array
const clientId = Date.now() + Math.random();
const client = { id: clientId, res: res };
connectedClients.push(client);
// Send initial data to this client
const initialData = {
timestamp: new Date().toISOString(),
message: "Connected to SSE",
clientId: clientId,
totalClients: connectedClients.length,
data: 1
};
try {
res.write(`data: ${JSON.stringify(initialData)}\n\n`);
} catch (error) {
console.error(`[SSE ERROR] Failed to send initial data to client ${clientId}:`, error.message);
}
// Start broadcasting if not already running
startBroadcasting();
// Clean up when client closes connection
req.on('close', () => {
const index = connectedClients.findIndex(client => client.id === clientId);
if (index > -1) {
connectedClients.splice(index, 1);
}
// Stop broadcasting if no clients are connected
if (connectedClients.length === 0 && broadcastInterval) {
stopBroadcasting();
}
});
// Handle client errors
req.on('error', (error) => {
console.error(`[SSE ERROR] Client ${clientId} connection error:`, error.message);
const index = connectedClients.findIndex(client => client.id === clientId);
if (index > -1) {
connectedClients.splice(index, 1);
}
});
} catch (error) {
console.error('[SSE CRITICAL ERROR] Failed to setup SSE connection:', error.message);
console.error('[SSE CRITICAL ERROR] Stack trace:', error.stack);
res.status(500).end();
}
});
// Optional: Add an endpoint to manually trigger a broadcast
DiscoverStationsRoute.post("/broadcast", (req, res) => {
try {
const customData = {
timestamp: new Date().toISOString(),
message: "Manual broadcast triggered",
data: req.body || "default data"
};
broadcastToAllClients(customData);
res.json({
success: true,
message: `Broadcast sent to ${connectedClients.length} clients`,
connectedClients: connectedClients.length
});
} catch (error) {
console.error('[MANUAL BROADCAST ERROR] Failed to process manual broadcast:', error.message);
res.status(500).json({
success: false,
error: error.message
});
}
});
// Optional: Add an endpoint to get current connection status
DiscoverStationsRoute.get("/status", (req, res) => {
try {
const status = {
connectedClients: connectedClients.length,
isBroadcasting: !!broadcastInterval,
clients: connectedClients.map(client => ({ id: client.id })),
timestamp: new Date().toISOString()
};
res.json(status);
} catch (error) {
console.error('[STATUS ERROR] Failed to get status:', error.message);
res.status(500).json({
error: error.message
});
}
});
// Graceful shutdown handler
process.on('SIGINT', () => {
stopBroadcasting();
process.exit(0);
});
process.on('SIGTERM', () => {
stopBroadcasting();
process.exit(0);
});
Stress Test Script:
#!/bin/bash
URL="http://localhost:3000/api/v1/discover-stations"
TOTAL_CLIENTS=4000
LOG_DIR="./sse_logs"
rm -rf "$LOG_DIR"
mkdir -p "$LOG_DIR"
echo "Starting $TOTAL_CLIENTS SSE connections to $URL"
for i in $(seq 1 $TOTAL_CLIENTS); do
{
CLIENT_LOG="$LOG_DIR/client_$i.log"
echo "Client $i connected" >> "$CLIENT_LOG"
COUNT=0
curl -s -N "$URL" | while read -r line; do
if [[ "$line" == data:* ]]; then
COUNT=$((COUNT + 1))
echo "Client $i - Message #$COUNT received: $line" >> "$CLIENT_LOG"
fi
done
} &
done
echo "All $TOTAL_CLIENTS connections initiated."
echo "Streaming logs are saved in $LOG_DIR/"
Observations:
After a certain number of connections, errors start appearing:
-fork: Resource temporarily unavailable
-Client connection aborted
-I'm using macOS and running both server and stress test script locally.