ThreadPool + Async methods + ManualResetEvent = Bad
Okay, today it was reinforced that the ThreadPool is not a good fit for long-running tasks, specifically tasks that themselves use the ThreadPool, async methods, or ResetEvents. I didn't really track down the root cause, but generally the code locked up while waiting for an event to be set. Regardless, the solution, as is usually the case, was to simplify the code: first by dropping the ResetEvent and the ThreadPool for queuing the workload, and in their place creating a simple QueueRunner class built on the Monitor class. Using Monitor.Wait and Monitor.Pulse was new to me, but it works well and seems to be very fast. There is a great article about threading in C# (see Resources below). I'm not going to re-explain what has already been said there, but below is the code. It could be enhanced for other uses and performance.
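For anyone else who is new to Monitor.Wait and Monitor.Pulse, here is a minimal sketch of the pattern on its own, separate from the QueueRunner. The class name WaitPulseSketch and its members are made up for illustration. One thread waits inside a lock until another thread changes the shared state and pulses. The waiter re-checks its condition in a loop, because waking up only means that something changed, not that the condition it cares about is now true.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, not part of the QueueRunner.
public static class WaitPulseSketch
{
    private static readonly object Sync = new object();
    private static readonly Queue<int> Items = new Queue<int>();
    private static bool _done;

    // Consumer: sums items until the producer signals completion.
    private static int ConsumeAll()
    {
        int sum = 0;
        lock (Sync)
        {
            while (true)
            {
                // Monitor.Wait releases the lock while blocked and
                // re-acquires it before returning.
                while (Items.Count == 0 && !_done)
                    Monitor.Wait(Sync);

                if (Items.Count == 0)
                    return sum; // producer is done and the queue is drained

                sum += Items.Dequeue();
            }
        }
    }

    // Producer: enqueues 1..count, pulsing after each change.
    private static void Produce(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            lock (Sync)
            {
                Items.Enqueue(i);
                Monitor.Pulse(Sync); // wake one waiting thread
            }
        }
        lock (Sync)
        {
            _done = true;
            Monitor.PulseAll(Sync); // wake every waiting thread
        }
    }

    public static int Run(int count)
    {
        lock (Sync)
        {
            Items.Clear();
            _done = false;
        }
        int result = 0;
        Thread consumer = new Thread(() => result = ConsumeAll());
        consumer.Start();
        Produce(count);
        consumer.Join();
        return result;
    }

    public static void Main()
    {
        Console.WriteLine(WaitPulseSketch.Run(10)); // 1 + 2 + ... + 10 = 55
    }
}
```

Both Wait and Pulse must be called while holding the lock on the same object, otherwise they throw a SynchronizationLockException.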
using System;
using System.Collections.Generic;
using System.Threading;
using NUnit.Framework;

[TestFixture]
public class QueueRunnerTest
{
    private int _count = 0;

    [Test]
    public void Test100ExpectSuccess()
    {
        Queue<int> queue = new Queue<int>();
        for (int i = 0; i < 100; i++)
            queue.Enqueue(i);

        _count = 0;
        QueueRunner<int> runner = new QueueRunner<int>(Work, queue);
        runner.RunAndWait();

        Assert.That(_count, Is.EqualTo(100));
    }

    // Runs on several worker threads at once, so the counter is updated atomically.
    private void Work(object obj)
    {
        Console.WriteLine("Working " + obj);
        Interlocked.Increment(ref _count);
    }
}
class QueueRunner<T>
{
    private readonly object _syncLock = new object();
    private readonly Queue<T> _queue;
    private readonly int _maxThreads;
    private readonly ParameterizedThreadStart _workerDelegate;
    private int _running;        // workers currently executing; guarded by _syncLock
    private int _toBeCompleted;  // items not yet finished; guarded by _syncLock

    public QueueRunner(ParameterizedThreadStart workerDelegate, Queue<T> queue)
        : this(workerDelegate, queue, 5)
    {
    }

    public QueueRunner(ParameterizedThreadStart workerDelegate, Queue<T> queue, int maxThreads)
    {
        _workerDelegate = workerDelegate;
        _maxThreads = maxThreads;
        _queue = queue;
        _toBeCompleted = _queue.Count;
    }

    // Starts the dispatcher thread, then blocks until every item has been processed.
    public void RunAndWait()
    {
        new Thread(Start).Start();
        Wait();
    }

    // Dispatcher: keeps up to _maxThreads workers busy until the queue is drained.
    private void Start()
    {
        lock (_syncLock)
        {
            while (_queue.Count > 0)
            {
                // The queue must be checked here as well; otherwise Dequeue throws
                // once the queue empties while worker slots are still free.
                while (_running < _maxThreads && _queue.Count > 0)
                {
                    _running++; // count the worker before it starts
                    new Thread(DelegateWrapper).Start(_queue.Dequeue());
                }

                // Release the lock and sleep until a worker pulses.
                if (_queue.Count > 0)
                    Monitor.Wait(_syncLock);
            }
        }
    }

    private void DelegateWrapper(object state)
    {
        try
        {
            _workerDelegate(state);
        }
        finally
        {
            // Update the counters even if the delegate throws.
            DecrementCount();
        }
    }

    private void Wait()
    {
        lock (_syncLock)
        {
            while (_toBeCompleted > 0)
                Monitor.Wait(_syncLock);
        }
    }

    private void DecrementCount()
    {
        lock (_syncLock)
        {
            _running--;
            _toBeCompleted--;
            // Wake both the dispatcher (a slot is free) and RunAndWait (an item is done).
            Monitor.PulseAll(_syncLock);
        }
    }
}

Resources:
Threading in C# - Joseph Albahari
Delegate BeginInvoke and ManualResetEvent.WaitOne()