what's wrong with my producer-consumer queue design?

Posted by toasteroven on Stack Overflow See other posts from Stack Overflow or by toasteroven
Published on 2010-06-02T19:39:46Z Indexed on 2010/06/02 19:44 UTC
Read the original article Hit count: 303

I'm starting with the C# code example here. I'm trying to adapt it for a couple reasons: 1) in my scenario, all tasks will be put in the queue up-front before consumers will start, and 2) I wanted to abstract the worker into a separate class instead of having raw Thread members within the WorkerQueue class.

My queue doesn't seem to dispose of itself though, it just hangs, and when I break in Visual Studio it's stuck on the _th.Join() line for WorkerThread #1. Also, is there a better way to organize this? Something about exposing the WaitOne() and Join() methods seems wrong, but I couldn't think of an appropriate way to let the WorkerThread interact with the queue.

Also, an aside - if I call q.Start(#) at the top of the using block, only some of the threads every kick in (e.g. threads 1, 2, and 8 process every task). Why is this? Is it a race condition of some sort, or am I doing something wrong?


using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;

namespace QueueTest
{
    class Program
    {
        static void Main(string[] args)
        {
            using (WorkQueue q = new WorkQueue())
            {
                q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });

                Random r = new Random();
                foreach (int i in Enumerable.Range(1, 10))
                    q.Enqueue(r.Next(100, 500));

                Console.WriteLine("All jobs queued");
                q.Start(8);
            }
        }
    }

    class WorkQueue : IDisposable
    {
        private Queue _jobs = new Queue();
        private int _job_count;
        private EventWaitHandle _wh = new AutoResetEvent(false);
        private object _lock = new object();
        private List _th;
        public event Action Finished;

        public WorkQueue()
        {
        }

        public void Start(int num_threads)
        {
            _job_count = _jobs.Count;
            _th = new List(num_threads);
            foreach (int i in Enumerable.Range(1, num_threads))
            {
                _th.Add(new WorkerThread(i, this));
                _th[_th.Count - 1].JobFinished += new Action(WorkQueue_JobFinished);
            }
        }

        void WorkQueue_JobFinished(int obj)
        {
            lock (_lock)
            {
                _job_count--;
                if (_job_count == 0 && Finished != null)
                    Finished();
            }
        }

        public void Enqueue(int job)
        {
            lock (_lock)
                _jobs.Enqueue(job);

            _wh.Set();
        }

        public void Dispose()
        {
            Enqueue(Int32.MinValue);
            _th.ForEach(th => th.Join());
            _wh.Close();
        }

        public int GetNextJob()
        {
            lock (_lock)
            {
                if (_jobs.Count > 0)
                    return _jobs.Dequeue();
                else
                    return Int32.MinValue;
            }
        }

        public void WaitOne()
        {
            _wh.WaitOne();
        }
    }

    class WorkerThread
    {
        private Thread _th;
        private WorkQueue _q;
        private int _i;

        public event Action JobFinished;

        public WorkerThread(int i, WorkQueue q)
        {
            _i = i;
            _q = q;
            _th = new Thread(DoWork);
            _th.Start();
        }

        public void Join()
        {
            _th.Join();
        }

        private void DoWork()
        {
            while (true)
            {
                int job = _q.GetNextJob();
                if (job != Int32.MinValue)
                {
                    Console.WriteLine("Thread {0} Got job {1}", _i, job);
                    Thread.Sleep(job * 10); // in reality would to actual work here
                    if (JobFinished != null)
                        JobFinished(job);
                }
                else
                {
                    Console.WriteLine("Thread {0} no job available", _i);
                    _q.WaitOne();
                }
            }
        }
    }
}

© Stack Overflow or respective owner

Related posts about c#

Related posts about multithreading