Creating a blocking Queue<T> in .NET?

Posted by spoon16 on Stack Overflow See other posts from Stack Overflow or by spoon16
Published on 2009-02-09T22:02:34Z Indexed on 2010/05/07 14:18 UTC
Read the original article Hit count: 285

Filed under:
|
|
|
|

I have a scenario where I have multiple threads adding to a queue and multiple threads reading from the same queue. If the queue reaches a specific size all threads that are filling the queue will be blocked on add until an item is removed from the queue.

The solution below is what I am using right now and my question is: How can this be improved? Is there an object that already enables this behavior in the BCL that I should be using?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}

© Stack Overflow or respective owner

Related posts about queue

Related posts about multithreading