Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Same fix for the Sockets
  • Loading branch information
VSadov committed Nov 22, 2025
commit f20fe64e2f36f6d5f9bf40b3f43982b3d647f9d3
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ private static SocketAsyncEngine[] CreateEngines()
//
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();

// The scheme works as follows:
// - From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them.
// - From Scheduled, the only transition is to Determining right before trying to dequeue an event.
// - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them)
// or Scheduled if the queue is still not empty (let the current work item handle parallelization as convenient).
//
// The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
// Another work item isn't enqueued to the thread pool hastily while the state is Determining,
// instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
private enum EventQueueProcessingStage
{
// NotScheduled: has no guarantees
NotScheduled,
Determining,

// Scheduled: means a worker will check work queues and ensure that
// any work items inserted in work queue before setting the flag
// are picked up.
// Note: The state must be cleared by the worker thread _before_
// checking. Otherwise there is a window between finding no work
// and resetting the flag, when the flag is in a wrong state.
// A new work item may be added right before the flag is reset
// without asking for a worker, while the last worker is quitting.
Scheduled
}

Expand Down Expand Up @@ -231,14 +231,9 @@ private void EventLoop()
// The native shim is responsible for ensuring this condition.
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

// Only enqueue a work item if the stage is NotScheduled.
// Otherwise there must be a work item already queued or another thread already handling parallelization.
if (handler.HandleSocketEvents(numEvents) &&
Interlocked.Exchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.Scheduled) == EventQueueProcessingStage.NotScheduled)
if (handler.HandleSocketEvents(numEvents))
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
EnsureWorkerScheduled();
}
}
}
Expand All @@ -248,70 +243,42 @@ private void EventLoop()
}
}

private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void EnsureWorkerScheduled()
{
if (!isEventQueueEmpty)
// Only one thread is requested at a time to mitigate the Thundering Herd problem.
// That is the minimum — we have inserted a work item, and the NotScheduled
// state requires asking for a worker.
if (Interlocked.Exchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.Scheduled) == EventQueueProcessingStage.NotScheduled)
{
// There are more events to process, set stage to Scheduled and enqueue a work item.
_eventQueueProcessingStage = EventQueueProcessingStage.Scheduled;
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
else
{
// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so schedule one now.
EventQueueProcessingStage stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.NotScheduled,
EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
{
return;
}
}

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}

void IThreadPoolWorkItem.Execute()
{
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
SocketIOEvent ev;
while (true)
{
Debug.Assert(_eventQueueProcessingStage == EventQueueProcessingStage.Scheduled);
_eventQueueProcessingStage = EventQueueProcessingStage.NotScheduled;

// The change needs to be visible to other threads that may request a worker thread before a work item is attempted
// to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
// thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
// work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
// Scheduled, and try to dequeue again or request another thread.
_eventQueueProcessingStage = EventQueueProcessingStage.Determining;
Interlocked.MemoryBarrier();
// Checking for items must happen after resetting the processing state.
Interlocked.MemoryBarrier();

if (eventQueue.TryDequeue(out ev))
{
break;
}

// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so try to dequeue a work item again.
EventQueueProcessingStage stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.NotScheduled,
EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
{
return;
}
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
if (!eventQueue.TryDequeue(out SocketIOEvent ev))
{
return;
}

UpdateEventQueueProcessingStage(eventQueue.IsEmpty);
// The batch that is currently in the queue could have asked for only one worker.
// We are going to process a work item, which may take an unknown amount of time or even block.
// In the worst case the current work item will indirectly depend on the progress of other
// items, and that would lead to a deadlock if no one else checks the queue.
// We must ensure at least one more worker is coming if the queue is not empty.
if (!eventQueue.IsEmpty)
{
EnsureWorkerScheduled();
}

int startTimeMs = Environment.TickCount;
do
Expand Down
Loading