BlockingCollection.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Sys / System / Collections / Concurrent / BlockingCollection.cs / 1305376 / BlockingCollection.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// BlockingCollection.cs 
//
// [....] 
//
// A class that implements the bounding and blocking functionality while abstracting away
// the underlying storage mechanism. This file also contains BlockingCollection's
// associated debugger view type. 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 
#pragma warning disable 0420 
using System;
using System.Collections.Generic; 
using System.Collections;
using System.Diagnostics;
using System.Globalization;
using System.Security.Permissions; 
using System.Runtime.InteropServices;
using System.Threading; 
 
namespace System.Collections.Concurrent
{ 
    /// 
    /// Provides blocking and bounding capabilities for thread-safe collections that
    /// implement .
    ///  
    /// 
    ///  represents a collection 
    /// that allows for thread-safe adding and removing of data. 
    ///  is used as a wrapper
    /// for an  instance, allowing 
    /// removal attempts from the collection to block until data is available to be removed.  Similarly,
    /// a  can be created to enforce
    /// an upper-bound on the number of data elements allowed in the
    /// ; addition attempts to the 
    /// collection may then block until space is available to store the added items.  In this manner,
    ///  is similar to a traditional 
    /// blocking queue data structure, except that the underlying data storage mechanism is abstracted 
    /// away as an .
    ///  
    /// Specifies the type of elements in the collection.
    [ComVisible(false)]
    [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
    [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))] 
    [DebuggerDisplay("Count = {Count}, Type = {m_collection}")]
    public class BlockingCollection : IEnumerable, ICollection, IDisposable 
    { 
        private IProducerConsumerCollection m_collection;
        private int m_boundedCapacity; 
        private const int NON_BOUNDED = -1;
        private SemaphoreSlim m_freeNodes;
        private SemaphoreSlim m_occupiedNodes;
        private volatile bool m_isDisposed; 
        private CancellationTokenSource m_ConsumersCancellationTokenSource;
        private CancellationTokenSource m_ProducersCancellationTokenSource; 
 
        private volatile int m_currentAdders;
        private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000); 

        #region Enums
        /// An enumerated data type used internal to the class to specify to a generic method
        /// the current mode of operation. 
        private enum OperationMode
        { 
            Add, Take 
        };
        #endregion 

        #region Properties
        /// Gets the bounded capacity of this  instance.
        /// The bounded capacity of this collection, or int.MaxValue if no bound was supplied. 
        /// The  has been disposed. 
        public int BoundedCapacity 
        {
            get 
            {
                CheckDisposed();
                return m_boundedCapacity;
            } 
        }
 
        /// Gets whether this  has been marked as complete for adding. 
        /// Whether this collection has been marked as complete for adding.
        /// The  has been disposed.
        public bool IsAddingCompleted
        {
            get 
            {
                CheckDisposed(); 
                return (m_currentAdders == COMPLETE_ADDING_ON_MASK); 
            }
        } 

        /// Gets whether this  has been marked as complete for adding and is empty.
        /// Whether this collection has been marked as complete for adding and is empty.
        /// The  has been disposed.
        public bool IsCompleted 
        { 
            get
            { 
                CheckDisposed();
                return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0));
            }
        } 

        /// Gets the number of items contained in the . 
        /// The number of items contained in the . 
        /// The  has been disposed. 
        public int Count
        {
            get
            { 
                CheckDisposed();
                return m_occupiedNodes.CurrentCount; 
            } 
        }
 
        /// Gets a value indicating whether access to the  is synchronized.
        /// The  has been disposed.
        bool ICollection.IsSynchronized 
        {
            get 
            { 
                CheckDisposed();
                return false; 
            }
        }

        ///  
        /// Gets an object that can be used to synchronize access to the . This property is not supported. 
        ///  
        /// The SyncRoot property is not supported.
        object ICollection.SyncRoot 
        {
            get
            {
                throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported)); 
            }
        } 
        #endregion 

 
        /// Initializes a new instance of the
        /// 
        /// class without an upper-bound.
        ///  
        /// 
        /// The default underlying collection is a ConcurrentQueue<T>. 
        ///  
        public BlockingCollection()
            : this(new ConcurrentQueue()) 
        {
        }

        /// Initializes a new instance of the 
        /// class with the specified upper-bound. 
        ///  
        /// The bounded size of the collection.
        /// The  is 
        /// not a positive value.
        /// 
        /// The default underlying collection is a ConcurrentQueue<T>.
        ///  
        public BlockingCollection(int boundedCapacity)
            : this(new ConcurrentQueue(), boundedCapacity) 
        { 
        }
 
        /// Initializes a new instance of the 
        /// class with the specified upper-bound and using the provided
        ///  as its underlying data store.
        /// The collection to use as the underlying data store. 
        /// The bounded size of the collection.
        /// The  argument is 
        /// null. 
        /// The  is not a positive value.
        /// The supplied  contains more values 
        /// than is permitted by .
        public BlockingCollection(IProducerConsumerCollection collection, int boundedCapacity)
        {
            if (boundedCapacity < 1) 
            {
                throw new ArgumentOutOfRangeException( 
                    "boundedCapacity", boundedCapacity, 
                    SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange));
            } 
            if (collection == null)
            {
                throw new ArgumentNullException("collection");
            } 
            int count = collection.Count;
            if (count > boundedCapacity) 
            { 
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity));
            } 
            Initialize(collection, boundedCapacity, count);
        }

        /// Initializes a new instance of the  
        /// class without an upper-bound and using the provided
        ///  as its underlying data store. 
        /// The collection to use as the underlying data store. 
        /// The  argument is
        /// null. 
        public BlockingCollection(IProducerConsumerCollection collection)
        {
            if (collection == null)
            { 
                throw new ArgumentNullException("collection");
            } 
            Initialize(collection, NON_BOUNDED, collection.Count); 
        }
 
        /// Initializes the BlockingCollection instance.
        /// The collection to use as the underlying data store.
        /// The bounded size of the collection.
        /// The number of items currently in the underlying collection. 
        private void Initialize(IProducerConsumerCollection collection, int boundedCapacity, int collectionCount)
        { 
            Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED); 

            m_collection = collection; 
            m_boundedCapacity = boundedCapacity; ;
            m_isDisposed = false;
            m_ConsumersCancellationTokenSource = new CancellationTokenSource();
            m_ProducersCancellationTokenSource = new CancellationTokenSource(); 

            if (boundedCapacity == NON_BOUNDED) 
            { 
                m_freeNodes = null;
            } 
            else
            {
                Debug.Assert(boundedCapacity > 0);
                m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount); 
            }
 
 
            m_occupiedNodes = new SemaphoreSlim(collectionCount);
        } 


        /// 
        /// Adds the item to the . 
        /// 
        /// The item to be added to the collection. The value can be a null reference. 
        /// The  has been marked
        /// as complete with regards to additions. 
        /// The  has been disposed.
        /// The underlying collection didn't accept the item.
        ///  
        /// If a bounded capacity was specified when this instance of
        ///  was initialized, 
        /// a call to Add may block until space is available to store the provided item. 
        /// 
        public void Add(T item) 
        {
#if DEBUG
            bool tryAddReturnValue =
#endif 
            TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken());
#if DEBUG 
            Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true."); 
#endif
        } 

        /// 
        /// Adds the item to the .
        /// A  is thrown if the  is 
        /// canceled.
        ///  
        /// The item to be added to the collection. The value can be a null reference. 
        /// A cancellation token to observe.
        /// If the  is canceled. 
        /// The  has been marked
        /// as complete with regards to additions.
        /// The  has been disposed.
        /// The underlying collection didn't accept the item. 
        ///  
        /// If a bounded capacity was specified when this instance of
        ///  was initialized, 
        /// a call to  may block until space is available to store the provided item.
        /// 
        public void Add(T item, CancellationToken cancellationToken)
        { 
#if DEBUG
            bool tryAddReturnValue = 
#endif 
            TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken);
#if DEBUG 
            Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
#endif
        }
 
        /// 
        /// Attempts to add the specified item to the . 
        ///  
        /// The item to be added to the collection.
        /// true if the  could be added; otherwise, false. 
        /// The  has been marked
        /// as complete with regards to additions.
        /// The  has been disposed.
        /// The underlying collection didn't accept the item. 
        public bool TryAdd(T item) 
        {
            return TryAddWithNoTimeValidation(item, 0, new CancellationToken()); 
        }

        /// 
        /// Attempts to add the specified item to the . 
        /// 
        /// The item to be added to the collection. 
        /// A  that represents the number of milliseconds 
        /// to wait, or a  that represents -1 milliseconds to wait indefinitely.
        ///  
        /// true if the  could be added to the collection within
        /// the alloted time; otherwise, false.
        /// The  has been marked 
        /// as complete with regards to additions.
        /// The  has been disposed. 
        ///  is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than 
        /// .
        /// The underlying collection didn't accept the item.
        public bool TryAdd(T item, TimeSpan timeout)
        { 
            ValidateTimeout(timeout);
            return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, new CancellationToken()); 
        } 

        ///  
        /// Attempts to add the specified item to the .
        /// 
        /// The item to be added to the collection.
        /// The number of milliseconds to wait, or  (-1) to wait indefinitely.
        /// true if the  could be added to the collection within 
        /// the alloted time; otherwise, false. 
        /// The  has been marked 
        /// as complete with regards to additions.
        /// The  has been disposed.
        ///  is a 
        /// negative number other than -1, which represents an infinite time-out.
        /// The underlying collection didn't accept the item. 
        public bool TryAdd(T item, int millisecondsTimeout) 
        {
            ValidateMillisecondsTimeout(millisecondsTimeout); 
            return TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken());
        }

        ///  
        /// Attempts to add the specified item to the .
        /// A  is thrown if the  is 
        /// canceled. 
        /// 
        /// The item to be added to the collection. 
        /// The number of milliseconds to wait, or  (-1) to wait indefinitely.
        /// A cancellation token to observe.
        /// true if the  could be added to the collection within 
        /// the alloted time; otherwise, false.
        /// If the  is canceled. 
        /// The  has been marked
        /// as complete with regards to additions. 
        /// The  has been disposed.
        ///  is a
        /// negative number other than -1, which represents an infinite time-out. 
        /// The underlying collection didn't accept the item.
        public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken) 
        { 
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken); 
        }

        /// Adds an item into the underlying data store using its IProducerConsumerCollection<T>.Add
        /// method. If a bounded capacity was specified and the collection was full, 
        /// this method will wait for, at most, the timeout period trying to add the item.
        /// If the timeout period was exhaused before successfully adding the item this method will 
        /// return false. 
        /// The item to be added to the collection.
        /// The number of milliseconds to wait for the collection to accept the item, 
        /// or Timeout.Infinite to wait indefinitely.
        /// A cancellation token to observe.
        /// False if the collection remained full till the timeout period was exhausted.True otherwise.
        /// If the  is canceled. 
        /// the collection has already been marked
        /// as complete with regards to additions. 
        /// If the collection has been disposed. 
        /// The underlying collection didn't accept the item.
        private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken) 
        {
            CheckDisposed();

            if (cancellationToken.IsCancellationRequested) 
                throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
 
            if (IsAddingCompleted) 
            {
                throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed)); 
            }

            bool waitForSemaphoreWasSuccessful = true;
 
            if (m_freeNodes != null)
            { 
                //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding() 
                //was called concurrently with Adding which is not supported by BlockingCollection.
                CancellationTokenSource linkedTokenSource = null; 
                try
                {
                    waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
                    if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0) 
                    {
                        linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( 
                            cancellationToken, m_ProducersCancellationTokenSource.Token); 
                        waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
                    } 
                }
                catch (OperationCanceledException)
                {
                    //if cancellation was via external token, throw an OCE 
                    if (cancellationToken.IsCancellationRequested)
                        throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); 
 
                    //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
                    //Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested); 

                    throw new InvalidOperationException
                        (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
                } 
                finally
                { 
                    if (linkedTokenSource != null) 
                    {
                        linkedTokenSource.Dispose(); 
                    }
                }
            }
            if (waitForSemaphoreWasSuccessful) 
            {
                // Update the adders count if the complete adding was not requested, otherwise 
                // spins until all adders finish then throw IOE 
                // The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
                // not been finished yet 
                SpinWait spinner = new SpinWait();
                while (true)
                {
                    int observedAdders = m_currentAdders; 
                    if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
                    { 
                        spinner.Reset(); 
                        // CompleteAdding is requested, spin then throw
                        while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); 
                        throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
                    }
                    if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
                    { 
                        Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
                        break; 
                    } 
                    spinner.SpinOnce();
                } 

                // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
                // 1- m_collection.TryAdd threw an exception
                // 2- m_collection.TryAdd succeeded 
                // 3- m_collection.TryAdd returned false
                // so we put the decrement code in the finally block 
                try 
                {
 
                    //TryAdd is guaranteed to find a place to add the element. Its return value depends
                    //on the semantics of the underlying store. Some underlying stores will not add an already
                    //existing item and thus TryAdd returns false indicating that the size of the underlying
                    //store did not increase. 

 
                    bool addingSucceeded = false; 
                    try
                    { 
                        //The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
                        //This fixes bug #702328, case 2 of 2.
                        cancellationToken.ThrowIfCancellationRequested();
                        addingSucceeded = m_collection.TryAdd(item); 
                    }
                    catch 
                    { 
                        //TryAdd did not result in increasing the size of the underlying store and hence we need
                        //to increment back the count of the m_freeNodes semaphore. 
                        if (m_freeNodes != null)
                        {
                            m_freeNodes.Release();
                        } 
                        throw;
                    } 
                    if (addingSucceeded) 
                    {
                        //After adding an element to the underlying storage, signal to the consumers 
                        //waiting on m_occupiedNodes that there is a new item added ready to be consumed.
                        m_occupiedNodes.Release();
                    }
                    else 
                    {
                        throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed)); 
                    } 
                }
                finally 
                {
                    // decrement the adders count
                    Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
                    Interlocked.Decrement(ref m_currentAdders); 
                }
 
 
            }
            return waitForSemaphoreWasSuccessful; 
        }

        /// Takes an item from the .
        /// The item removed from the collection. 
        /// The  is empty and has been marked 
        /// as complete with regards to additions. 
        /// The  has been disposed. 
        /// The underlying collection was modified
        /// outside of this  instance.
        /// A call to  may block until an item is available to be removed. 
        public T Take()
        { 
            T item; 

            if (!TryTake(out item, Timeout.Infinite, CancellationToken.None)) 
            {
                throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
            }
 
            return item;
        } 
 
        /// Takes an item from the .
        /// The item removed from the collection. 
        /// If the  is
        /// canceled or the  is empty and has been marked
        /// as complete with regards to additions. 
        /// The  has been disposed. 
        /// The underlying collection was modified 
        /// outside of this  instance. 
        /// A call to  may block until an item is available to be removed.
        public T Take(CancellationToken cancellationToken)
        {
            T item; 

            if (!TryTake(out item, Timeout.Infinite, cancellationToken)) 
            { 
                throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
            } 

            return item;
        }
 
        /// 
        /// Attempts to remove an item from the . 
        ///  
        /// The item removed from the collection.
        /// true if an item could be removed; otherwise, false. 
        /// The  has been disposed.
        /// The underlying collection was modified
        /// outside of this  instance.
        public bool TryTake(out T item) 
        { 
            return TryTake(out item, 0, CancellationToken.None);
        } 

        /// 
        /// Attempts to remove an item from the .
        ///  
        /// The item removed from the collection.
        /// A  that represents the number of milliseconds 
        /// to wait, or a  that represents -1 milliseconds to wait indefinitely. 
        /// 
        /// true if an item could be removed from the collection within 
        /// the alloted time; otherwise, false.
        /// The  has been disposed.
        ///  is a negative number 
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// . 
        /// The underlying collection was modified 
        /// outside of this  instance. 
        public bool TryTake(out T item, TimeSpan timeout)
        {
            ValidateTimeout(timeout);
            return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null); 
        }
 
        ///  
        /// Attempts to remove an item from the .
        ///  
        /// The item removed from the collection.
        /// The number of milliseconds to wait, or  (-1) to wait indefinitely.
        /// true if an item could be removed from the collection within 
        /// the alloted time; otherwise, false.
        /// The  has been disposed. 
        ///  is a
        /// negative number other than -1, which represents an infinite time-out. 
        /// The underlying collection was modified
        /// outside of this  instance.
        public bool TryTake(out T item, int millisecondsTimeout) 
        {
            ValidateMillisecondsTimeout(millisecondsTimeout); 
            return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null); 
        }
 
        /// 
        /// Attempts to remove an item from the .
        /// A  is thrown if the  is
        /// canceled. 
        /// 
        /// The item removed from the collection. 
        /// The number of milliseconds to wait, or  (-1) to wait indefinitely.
        /// A cancellation token to observe. 
        /// true if an item could be removed from the collection within
        /// the alloted time; otherwise, false.
        /// If the  is canceled.
        /// The  has been disposed.
        ///  is a 
        /// negative number other than -1, which represents an infinite time-out. 
        /// The underlying collection was modified
        /// outside of this  instance.
        public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout); 
            return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null);
        } 
 
        /// Takes an item from the underlying data store using its IProducerConsumerCollection<T>.Take
        /// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false) 
        /// trying to remove an item. If the timeout period was exhaused before successfully removing an item
        /// this method will return false.
        /// A  is thrown if the  is
        /// canceled. 
        /// 
        /// The item removed from the collection. 
        /// The number of milliseconds to wait for the collection to have an item available 
        /// for removal, or Timeout.Infinite to wait indefinitely.
        /// A cancellation token to observe. 
        /// A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token
        /// multiple times.
        /// False if the collection remained empty till the timeout period was exhausted. True otherwise.
        /// If the  is canceled. 
        /// If the collection has been disposed.
        private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource) 
        { 
            CheckDisposed();
            item = default(T); 

            if (cancellationToken.IsCancellationRequested)
                throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
 
            //If the collection is completed then there is no need to wait.
            if (IsCompleted) 
            { 
                return false;
            } 
            bool waitForSemaphoreWasSuccessful = false;

            // set the combined token source to the combinedToken paramater if it is not null (came from GetConsumingEnumerable)
            CancellationTokenSource linkedTokenSource = combinedTokenSource; 
            try
            { 
                waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(0); 
                if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
                { 
                    // create the linked token if it is not created yet
                    if (combinedTokenSource == null)
                        linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken,
                                                                                          m_ConsumersCancellationTokenSource.Token); 
                    waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
                } 
            } 
            //The collection became completed while waiting on the semaphore.
            catch (OperationCanceledException) 
            {
                if (cancellationToken.IsCancellationRequested)
                    throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
 
                return false;
            } 
            finally 
            {
                // only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it 
                if (linkedTokenSource != null && combinedTokenSource == null)
                {
                    linkedTokenSource.Dispose();
                } 
            }
 
            if (waitForSemaphoreWasSuccessful) 
            {
                bool removeSucceeded = false; 
                bool removeFaulted = true;
                try
                {
                    //The token may have been canceled before an item arrived, so we need a check after the wait has completed. 
                    //This fixes bug #702328, case 1 of 2.
                    cancellationToken.ThrowIfCancellationRequested(); 
 
                    //If an item was successfully removed from the underlying collection.
                    removeSucceeded = m_collection.TryTake(out item); 
                    removeFaulted = false;
                    if (!removeSucceeded)
                    {
                        // Check if the collection is empty which means that the collection was modified outside BlockingCollection 
                        throw new InvalidOperationException
                            (SR.GetString(SR.BlockingCollection_Take_CollectionModified)); 
                    } 
                }
                finally 
                {
                    // removeFaulted implies !removeSucceeded, but the reverse is not true.
                    if (removeSucceeded)
                    { 
                        if (m_freeNodes != null)
                        { 
                            Debug.Assert(m_boundedCapacity != NON_BOUNDED); 
                            m_freeNodes.Release();
                        } 
                    }
                    else if (removeFaulted)
                    {
                        m_occupiedNodes.Release(); 
                    }
                    //Last remover will detect that it has actually removed the last item from the 
                    //collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores 
                    //so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers
                    //but this is not a problem. 
                    if (IsCompleted)
                    {
                        CancelWaitingConsumers();
                    } 
                }
            } 
            return waitForSemaphoreWasSuccessful; 
        }
 


        /// 
        /// Adds the specified item to any one of the specified 
        ///  instances.
        ///  
        /// The array of collections. 
        /// The item to be added to one of the collections.
        /// The index of the collection in the  array to which the item was added. 
        /// The  argument is
        /// null.
        /// The  argument is
        /// a 0-length array or contains a null element, or at least one of collections has been 
        /// marked as complete for adding.
        /// At least one of the  instances has been disposed. 
        /// At least one underlying collection didn't accept the item.
        /// The count of  is greater than the maximum size of 
        /// 62 for STA and 63 for MTA.
        /// 
        /// If a bounded capacity was specified when all of the
        ///  instances were initialized, 
        /// a call to AddToAny may block until space is available in one of the collections
        /// to store the provided item. 
        ///  
        public static int AddToAny(BlockingCollection[] collections, T item)
        { 
#if DEBUG
            int tryAddAnyReturnValue =
#else
            return 
#endif
 TryAddToAny(collections, item, Timeout.Infinite, new CancellationToken()); 
#if DEBUG 
            Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
                                , "TryAddToAny() was expected to return an index within the bounds of the collections array."); 
            return tryAddAnyReturnValue;
#endif
        }
 
        /// 
        /// Adds the specified item to any one of the specified 
        ///  instances. 
        /// A  is thrown if the  is
        /// canceled. 
        /// 
        /// The array of collections.
        /// The item to be added to one of the collections.
        /// A cancellation token to observe. 
        /// The index of the collection in the  array to which the item was added.
        /// If the  is canceled. 
        /// The  argument is 
        /// null.
        /// The  argument is 
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding.
        /// At least one of the  instances has been disposed. 
        /// At least one underlying collection didn't accept the item.
        /// The count of  is greater than the maximum size of 
        /// 62 for STA and 63 for MTA. 
        /// 
        /// If a bounded capacity was specified when all of the 
        ///  instances were initialized,
        /// a call to AddToAny may block until space is available in one of the collections
        /// to store the provided item.
        ///  
        public static int AddToAny(BlockingCollection[] collections, T item, CancellationToken cancellationToken)
        { 
#if DEBUG 
            int tryAddAnyReturnValue =
#else 
            return
#endif
 TryAddToAny(collections, item, Timeout.Infinite, cancellationToken);
#if DEBUG 
            Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
                                , "TryAddToAny() was expected to return an index within the bounds of the collections array."); 
            return tryAddAnyReturnValue; 
#endif
        } 

        /// 
        /// Attempts to add the specified item to any one of the specified
        ///  instances. 
        /// 
        /// The array of collections. 
        /// The item to be added to one of the collections. 
        /// The index of the collection in the 
        /// array to which the item was added, or -1 if the item could not be added. 
        /// The  argument is
        /// null.
        /// The  argument is
        /// a 0-length array or contains a null element, or at least one of collections has been 
        /// marked as complete for adding.
        /// At least one of the  instances has been disposed. 
        /// At least one underlying collection didn't accept the item.
        /// The count of  is greater than the maximum size of 
        /// 62 for STA and 63 for MTA.
        public static int TryAddToAny(BlockingCollection[] collections, T item)
        {
            return TryAddToAny(collections, item, 0, new CancellationToken()); 
        }
 
        ///  
        /// Attempts to add the specified item to any one of the specified
        ///  instances. 
        /// 
        /// The array of collections.
        /// The item to be added to one of the collections.
        /// A  that represents the number of milliseconds 
        /// to wait, or a  that represents -1 milliseconds to wait indefinitely.
        ///  
        /// The index of the collection in the  
        /// array to which the item was added, or -1 if the item could not be added.
        /// The  argument is 
        /// null.
        /// The  argument is
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding. 
        /// At least one of the  instances has been disposed. 
        ///  is a negative number 
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// . 
        /// At least one underlying collection didn't accept the item.
        /// The count of  is greater than the maximum size of
        /// 62 for STA and 63 for MTA.
        public static int TryAddToAny(BlockingCollection[] collections, T item, TimeSpan timeout) 
        {
            ValidateTimeout(timeout); 
            return TryAddTakeAny(collections, ref item, (int)timeout.TotalMilliseconds, OperationMode.Add, new CancellationToken()); 
        }
 
        /// 
        /// Attempts to add the specified item to any one of the specified
        ///  instances.
        ///  
        /// The array of collections.
        /// The item to be added to one of the collections. 
        /// The number of milliseconds to wait, or  (-1) to wait indefinitely.        /// The index of the collection in the 
        /// array to which the item was added, or -1 if the item could not be added. 
        /// The  argument is
        /// null.
        /// The  argument is
        /// a 0-length array or contains a null element, or at least one of collections has been 
        /// marked as complete for adding.
        /// At least one of the  instances has been disposed. 
        ///  is a
        /// negative number other than -1, which represents an infinite time-out. 
        /// At least one underlying collection didn't accept the item.
        /// The count of  is greater than the maximum size of
        /// 62 for STA and 63 for MTA.
        public static int TryAddToAny(BlockingCollection[] collections, T item, int millisecondsTimeout) 
        {
            ValidateMillisecondsTimeout(millisecondsTimeout); 
            return TryAddTakeAny(collections, ref item, millisecondsTimeout, OperationMode.Add, new CancellationToken()); 
        }
 
        /// 
        /// Attempts to add the specified item to any one of the specified
        ///  instances.
        /// A  is thrown if the  is 
        /// canceled.
        ///  
        /// The array of collections. 
        /// The item to be added to one of the collections.
        /// The number of milliseconds to wait, or  (-1) to wait indefinitely.        /// The index of the collection in the 
        /// array to which the item was added, or -1 if the item could not be added.
        /// A cancellation token to observe.
        /// If the  is canceled. 
        /// The  argument is
        /// null. 
        /// The  argument is 
        /// a 0-length array or contains a null element, or at least one of collections has been
        /// marked as complete for adding. 
        /// At least one of the  instances has been disposed.
        ///  is a
        /// negative number other than -1, which represents an infinite time-out. 
        /// At least one underlying collection didn't accept the item.
        /// The count of  is greater than the maximum size of 
        /// 62 for STA and 63 for MTA. 
        public static int TryAddToAny(BlockingCollection[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
        { 
            ValidateMillisecondsTimeout(millisecondsTimeout);
            return TryAddTakeAny(collections, ref item, millisecondsTimeout, OperationMode.Add, cancellationToken);
        }
 
        /// Adds/Takes an item to/from anyone of the specified collections.
        /// A  is thrown if the  is 
        /// canceled. 
        /// 
        /// The collections into which the item can be added. 
        /// The item to be added or the item removed and returned to the caller.
        /// The number of milliseconds to wait for a collection to accept the
        /// operation, or -1 to wait indefinitely.
        /// Indicates whether this method is called to Add or Take. 
        /// A cancellation token to observe.
        /// The index into collections for the collection which accepted the 
        /// adding/removal of the item; -1 if the item could not be added/removed. 
        /// If the  is canceled.
        /// If the collections argument is null. 
        /// If the collections argument is a 0-length array or contains a
        /// null element. Also, if atleast one of the collections has been marked complete for adds.
        /// If atleast one of the collections has been disposed.
        private static int TryAddTakeAny(BlockingCollection[] collections, ref T item, int millisecondsTimeout, OperationMode operationMode, CancellationToken externalCancellationToken) 
        {
            BlockingCollection[] collectionsCopy = ValidateCollectionsArray(collections, operationMode); 
#if DEBUG 
            collections = null; // Ensure we always go through collectionsCopy from now on.
#endif 
            const int OPERATION_FAILED = -1;

            // Copy the wait time to another local variable to update it
            int timeout = millisecondsTimeout; 

            long startTimeTicks = 0; 
            if (millisecondsTimeout != Timeout.Infinite) 
            {
                startTimeTicks = DateTime.UtcNow.Ticks; 
            }

            // Fast path for adding if there is at least one unbounded collection
            if (operationMode == OperationMode.Add) 
            {
                for (int i = 0; i < collectionsCopy.Length; i++) 
                { 
                    if (collectionsCopy[i].m_freeNodes == null)
                    { 
#if DEBUG
                        bool result =
#endif
                        collectionsCopy[i].TryAdd(item); 
#if DEBUG
                        Debug.Assert(result); 
#endif 
                        return i;
                    } 
                }
            }

            // Get wait handles and the tokens for all collections 
            CancellationToken[] collatedCancellationTokens;
            List handles = GetHandles(collectionsCopy, operationMode, externalCancellationToken, false, out collatedCancellationTokens); 
 
            //Loop until the timeout period is exhausted or the operation was successful.
            while (millisecondsTimeout == Timeout.Infinite || timeout >= 0) 
            {
                #region Update Timeout

                #endregion 

                //Wait for any collection to become available. 
                int index = -1; 
                CancellationTokenSource linkedTokenSource = null;
                try 
                {
                    index = WaitHandle_WaitAny(handles, 0, externalCancellationToken, externalCancellationToken);
                    if (index == WaitHandle.WaitTimeout)
                    { 
                        linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens);
                        index = WaitHandle_WaitAny(handles, timeout, linkedTokenSource.Token, 
                                                   externalCancellationToken); 
                    }
                } 
                catch (OperationCanceledException ex)
                {
                    // throw OCE if it is external cancellation
                    if (externalCancellationToken.IsCancellationRequested) 
                    {
                        Debug.Assert(ex.CancellationToken == externalCancellationToken); 
                        throw; 
                    }
 
                    // If the Operation is TryTake, the exception will not be propagated, it will try another collection, and return false if timeout expired
                    // or all collection completed addition
                    if (operationMode == OperationMode.Take && millisecondsTimeout != Timeout.Infinite)
                    { 
                        // Get the new handles and exclude the collections that completed addition
                        handles = GetHandles(collectionsCopy, operationMode, externalCancellationToken, true, out collatedCancellationTokens); 
                        timeout = UpdateTimeOut(startTimeTicks, millisecondsTimeout); 
                        if (handles.Count == 0 || timeout == 0)
                        { 
                            return OPERATION_FAILED;
                        }
                        continue; //retry for the updated handles
                    } 

                    // Otherwise if it is Take, Add or Try Add ArgumentException will be thrown 
                    throw new ArgumentException( 
                               SR.GetString(SR.BlockingCollection_CantTakeAnyWhenDone), "collections");
                } 
                finally
                {
                    if (linkedTokenSource != null)
                    { 
                        linkedTokenSource.Dispose();
                    } 
                } 

                Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count)); 

                //If the timeout period was not exhausted and the appropriate operation succeeded.
                if (index != WaitHandle.WaitTimeout)
                { 
                    if (operationMode == OperationMode.Add)
                    { 
                        if (collectionsCopy[index].TryAdd(item)) 
                        {
                            return index; 
                        }
                    }
                    else if (operationMode == OperationMode.Take)
                    { 
                        if (collectionsCopy[index].TryTake(out item))
                        { 
                            return index; 
                        }
                    } 
                }
                else
                {
                    return OPERATION_FAILED; 
                }
 
                // Update the timeout 
                if (millisecondsTimeout > 0)
                { 
                    timeout = UpdateTimeOut(startTimeTicks, millisecondsTimeout);
                    if (timeout <= 0)
                    {
                        return OPERATION_FAILED; 
                    }
                } 
            } 

            return OPERATION_FAILED; 
        }

        /// 
        /// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Compeleted collections 
        /// 
        /// The blocking collections 
        /// Add or Take operation 
        /// The original CancellationToken
        /// True to exclude the compeleted collections 
        /// Complete list of cancellationTokens to observe
        /// The collections wait handles
        private static List GetHandles(BlockingCollection[] collections, OperationMode operationMode, CancellationToken externalCancellationToken, bool excludeCompleted, out CancellationToken[] cancellationTokens)
        { 

            Debug.Assert(collections != null); 
            List handlesList = new List(collections.Length); 
            List tokensList = new List(collections.Length + 1); // + 1 for the external token
            tokensList.Add(externalCancellationToken); 

            //Read the appropriate WaitHandle based on the operation mode.
            if (operationMode == OperationMode.Add)
            { 

                for (int i = 0; i < collections.Length; i++) 
                { 
                    if (collections[i].m_freeNodes != null)
                    { 
                        handlesList.Add(collections[i].m_freeNodes.AvailableWaitHandle);
                        tokensList.Add(collections[i].m_ProducersCancellationTokenSource.Token);
                    }
                } 
            }
            else 
            { 

                for (int i = 0; i < collections.Length; i++) 
                {
                    if (excludeCompleted && collections[i].IsCompleted)
                    {
                        continue; 
                    }
                    handlesList.Add(collections[i].m_occupiedNodes.AvailableWaitHandle); 
                    tokensList.Add(collections[i].m_ConsumersCancellationTokenSource.Token); 
                }
            } 

            cancellationTokens = tokensList.ToArray();
            return handlesList;
        } 

 
        ///  
        /// Helper to perform WaitHandle.WaitAny(.., CancellationToken)
        /// this should eventually appear on the WaitHandle class. 
        /// 
        /// 
        /// 
        ///  
        /// 
        ///  
        private static int WaitHandle_WaitAny(List handles, int millisecondsTimeout, CancellationToken combinedToken, CancellationToken externalToken) 
        {
            WaitHandle[] handlesPlusCancelToken = new WaitHandle[handles.Count + 1]; // +1 for the combinedToken handle 
            for (int i = 0; i < handles.Count; i++)
                handlesPlusCancelToken[i] = handles[i];
            handlesPlusCancelToken[handles.Count] = combinedToken.WaitHandle;
            int retVal = WaitHandle.WaitAny(handlesPlusCancelToken, millisecondsTimeout, false); 

            if (combinedToken.IsCancellationRequested) 
            { 
                //Add the original token if it is external cancellation
                if (externalToken.IsCancellationRequested) 
                    throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalToken);

                throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled));
            } 

            return retVal; 
        } 

        ///  
        /// Helper function to measure and update the wait time
        /// 
        ///  The first time (in Ticks) observed when the wait started
        /// The orginal wait timeoutout in milliseconds 
        /// The new wait time in milliseconds, -1 if the time expired
        private static int UpdateTimeOut(long startTimeTicks, int originalWaitMillisecondsTimeout) 
        { 
            if (originalWaitMillisecondsTimeout == 0)
            { 
                return 0;
            }
            // The function must be called in case the time out is not infinite
            Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite); 

            long elapsedMilliseconds = (DateTime.UtcNow.Ticks - startTimeTicks) / TimeSpan.TicksPerMillisecond; 
 
            // Check the elapsed milliseconds is greater than max int because this property is long
            if (elapsedMilliseconds > int.MaxValue) 
            {
                return 0;
            }
 
            // Subtract the elapsed time from the current wait time
            int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds; ; 
            if (currentWaitTimeout <= 0) 
            {
                return 0; 
            }

            return currentWaitTimeout;
        } 
        /// 
        /// Takes an item from any one of the specified 
        ///  instances. 
        /// 
        /// The array of collections. 
        /// The item removed from one of the collections.
        /// The index of the collection in the  array from which
        /// the item was removed, or -1 if an item could not be removed.
        /// The  argument is 
        /// null.
        /// The  argument is 
        /// a 0-length array or contains a null element. 
        /// At least one of the  instances has been disposed. 
        /// At least one of the underlying collections was modified
        /// outside of its  instance.
        /// The count of  is greater than the maximum size of 
        /// 62 for STA and 63 for MTA.
        /// A call to TakeFromAny may block until an item is available to be removed. 
        public static int TakeFromAny(BlockingCollection[] collections, out T item) 
        {
#if DEBUG 
            int tryTakeAnyReturnValue =
#else
            return
#endif 
 TryTakeFromAny(collections, out item, Timeout.Infinite);
#if DEBUG 
            Debug.Assert((tryTakeAnyReturnValue >= 0 && tryTakeAnyReturnValue < collections.Length) 
                                , "TryTakeFromAny() was expected to return an index within the bounds of the collections array.");
            return tryTakeAnyReturnValue; 
#endif

        }
 
        /// 
        /// Takes an item from any one of the specified 
        ///  instances. 
        /// A  is thrown if the  is
        /// canceled. 
        /// 
        /// The array of collections.
        /// The item removed from one of the collections.
        /// A cancellation token to observe. 
        /// The index of the collection in the  array from which
        /// the item was removed, or -1 if an item could not be removed. 
        /// The  argument is 
        /// null.
        /// If the  is canceled. 
        /// The  argument is
        /// a 0-length array or contains a null element.
        /// At least one of the  instances has been disposed. 
        /// At least one of the underlying collections was modified
        /// outside of its  instance. 
        /// The count of  is greater than the maximum size of
        /// 62 for STA and 63 for MTA. 
        /// A call to TakeFromAny may block until an item is available to be removed.
        public static int TakeFromAny(BlockingCollection[] collections, out T item, CancellationToken cancellationToken)
        {
#if DEBUG 
            int tryTakeAnyReturnValue =
#else 
            return 
#endif
 TryTakeFromAny(collections, out item, Timeout.Infinite, cancellationToken); 
#if DEBUG
            Debug.Assert((tryTakeAnyReturnValue >= 0 && tryTakeAnyReturnValue < collections.Length)
                                , "TryTakeFromAny() was expected to return an index within the bounds of the collections array.");
            return tryTakeAnyReturnValue; 
#endif
        } 
 
        /// 
        /// Attempts to remove an item from any one of the specified 
        ///  instances.
        /// 
        /// The array of collections.
        /// The item removed from one of the collections. 
        /// The index of the collection in the  array from which
        /// the item was removed, or -1 if an item could not be removed. 
        /// The  argument is 
        /// null.
        /// The  argument is 
        /// a 0-length array or contains a null element.
        /// At least one of the  instances has been disposed.
        /// At least one of the underlying collections was modified 
        /// outside of its  instance. 
        /// The count of  is greater than the maximum size of 
        /// 62 for STA and 63 for MTA.
        /// A call to TryTakeFromAny may block until an item is available to be removed. 
        public static int TryTakeFromAny(BlockingCollection[] collections, out T item)
        {
            return TryTakeFromAny(collections, out item, 0);
        } 

        ///  
        /// Attempts to remove an item from any one of the specified 
        ///  instances.
        ///  
        /// The array of collections.
        /// The item removed from one of the collections.
        /// A  that represents the number of milliseconds
        /// to wait, or a  that represents -1 milliseconds to wait indefinitely. 
        /// 
        /// The index of the collection in the  array from which 
        /// the item was removed, or -1 if an item could not be removed. 
        /// The  argument is
        /// null. 
        /// The  argument is
        /// a 0-length array or contains a null element.
        /// At least one of the  instances has been disposed. 
        ///  is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than 
        /// . 
        /// At least one of the underlying collections was modified
        /// outside of its  instance.
        /// The count of  is greater than the maximum size of
        /// 62 for STA and 63 for MTA.
        /// A call to TryTakeFromAny may block until an item is available to be removed. 
        public static int TryTakeFromAny(BlockingCollection[] collections, out T item, TimeSpan timeout)
        { 
            ValidateTimeout(timeout); 
            T tempItem = default(T);
            int returnValue = TryAddTakeAny(collections, ref tempItem, (int)timeout.TotalMilliseconds, OperationMode.Take, new CancellationToken()); 
            item = tempItem;
            return returnValue;
        }
 
        /// 
        /// Attempts to remove an item from any one of the specified 
        ///  instances. 
        /// 
        /// The array of collections. 
        /// The item removed from one of the collections.
        /// The number of milliseconds to wait, or  (-1) to wait indefinitely.
        /// The index of the collection in the  array from which 
        /// the item was removed, or -1 if an item could not be removed.
        /// The  argument is 
        /// null. 
        /// The  argument is
        /// a 0-length array or contains a null element. 
        /// At least one of the  instances has been disposed.
        ///  is a
        /// negative number other than -1, which represents an infinite time-out. 
        /// At least one of the underlying collections was modified
        /// outside of its  instance. 
        /// The count of  is greater than the maximum size of
        /// 62 for STA and 63 for MTA. 
        /// A call to TryTakeFromAny may block until an item is available to be removed.
        public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout); 
            T tempItem = default(T);
            int returnValue = TryAddTakeAny(collections, ref tempItem, millisecondsTimeout, OperationMode.Take, new CancellationToken()); 
            item = tempItem; 
            return returnValue;
        } 

        /// 
        /// Attempts to remove an item from any one of the specified
        ///  instances. 
        /// A  is thrown if the  is
        /// canceled. 
        ///  
        /// The array of collections.
        /// The item removed from one of the collections. 
        /// The number of milliseconds to wait, or  (-1) to wait indefinitely.
        /// A cancellation token to observe.
        /// The index of the collection in the  array from which 
        /// the item was removed, or -1 if an item could not be removed.
        /// If the  is canceled. 
        /// The  argument is 
        /// null.
        /// The  argument is 
        /// a 0-length array or contains a null element.
        /// At least one of the  instances has been disposed.
        ///  is a 
        /// negative number other than -1, which represents an infinite time-out.
        /// At least one of the underlying collections was modified 
        /// outside of its  instance.
        /// The count of  is greater than the maximum size of 
        /// 62 for STA and 63 for MTA.
        /// A call to TryTakeFromAny may block until an item is available to be removed.
        public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
        { 
            ValidateMillisecondsTimeout(millisecondsTimeout);
            T tempItem = default(T); 
            int returnValue = TryAddTakeAny(collections, ref tempItem, millisecondsTimeout, OperationMode.Take, cancellationToken); 
            item = tempItem;
            return returnValue; 
        }

        /// 
        /// Marks the  instances 
        /// as not accepting any more additions.
        ///  
        ///  
        /// After a collection has been marked as complete for adding, adding to the collection is not permitted
        /// and attempts to remove from the collection will not wait when the collection is empty. 
        /// 
        /// The  has been disposed.
        public void CompleteAdding() 
        {
            CheckDisposed(); 
 
            if (IsAddingCompleted)
                return; 

            SpinWait spinner = new SpinWait();
            while (true)
            { 
                int observedAdders = m_currentAdders;
                if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0) 
                { 
                    spinner.Reset();
                    // If there is another COmpleteAdding in progress waiting the current adders, then spin until it finishes 
                    while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
                    return;
                }
 
                if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders)
                { 
                    spinner.Reset(); 
                    while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
 
                    if (Count == 0)
                    {
                        CancelWaitingConsumers();
                    } 

                    // We should always wake waiting producers, and have them throw exceptions as 
                    // Add&CompleteAdding should not be used concurrently. 
                    CancelWaitingProducers();
                    return; 

                }
                spinner.SpinOnce();
            } 
        }
 
        /// Cancels the semaphores. 
        private void CancelWaitingConsumers()
        { 
            m_ConsumersCancellationTokenSource.Cancel();
        }

        private void CancelWaitingProducers() 
        {
            m_ProducersCancellationTokenSource.Cancel(); 
        } 

 
        /// 
        /// Releases resources used by the  instance.
        /// 
        public void Dispose() 
        {
            Dispose(true); 
            GC.SuppressFinalize(this); 
        }
 
        /// 
        /// Releases resources used by the  instance.
        /// 
        /// Whether being disposed explicitly (true) or due to a finalizer (false). 
        protected virtual void Dispose(bool disposing)
        { 
            if (!m_isDisposed) 
            {
                if (m_freeNodes != null) 
                {
                    m_freeNodes.Dispose();
                }
 
                m_occupiedNodes.Dispose();
 
                m_isDisposed = true; 
            }
        } 

        /// Copies the items from the  instance into a new array.
        /// An array containing copies of the elements of the collection.
        /// The  has been disposed.
        ///  
        /// The copied elements are not removed from the collection. 
        /// 
        public T[] ToArray() 
        {
            CheckDisposed();
            return m_collection.ToArray();
        } 

        /// Copies all of the items in the  instance 
        /// to a compatible one-dimensional array, starting at the specified index of the target array. 
        /// 
        /// The one-dimensional array that is the destination of the elements copied from 
        /// the  instance. The array must have zero-based indexing.
        /// The zero-based index in  at which copying begins.
        /// The  argument is
        /// null. 
        /// The  argument is less than zero.
        /// The  argument is equal to or greater 
        /// than the length of the . 
        /// The  has been disposed. 
        public void CopyTo(T[] array, int index)
        {
            ((ICollection)this).CopyTo(array, index);
        } 

        /// Copies all of the items in the  instance 
        /// to a compatible one-dimensional array, starting at the specified index of the target array. 
        /// 
        /// The one-dimensional array that is the destination of the elements copied from 
        /// the  instance. The array must have zero-based indexing.
        /// The zero-based index in  at which copying begins.
        /// The  argument is
        /// null. 
        /// The  argument is less than zero.
        /// The  argument is equal to or greater 
        /// than the length of the , the array is multidimensional, or the type parameter for the collection 
        /// cannot be cast automatically to the type of the destination array.
        /// The  has been disposed.
        void ICollection.CopyTo(Array array, int index)
        {
            CheckDisposed(); 

            //We don't call m_collection.CopyTo() directly because we rely on Array.Copy method to customize 
            //all array exceptions. 
            T[] collectionSnapShot = m_collection.ToArray();
 
            try
            {
                Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length);
            } 
            catch (ArgumentNullException)
            { 
                throw new ArgumentNullException("array"); 
            }
            catch (ArgumentOutOfRangeException) 
            {
                throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative));
            }
            catch (ArgumentException) 
            {
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index"); 
            } 
            catch (RankException)
            { 
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array");
            }
            catch (InvalidCastException)
            { 
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
            } 
            catch (ArrayTypeMismatchException) 
            {
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array"); 
            }
        }

        /// Provides a consuming  for items in the collection. 
        /// An  that removes and returns items from the collection.
        /// The  has been disposed. 
        public IEnumerable GetConsumingEnumerable()
        { 
            return GetConsumingEnumerable(CancellationToken.None);
        }

        /// Provides a consuming  for items in the collection. 
        /// Calling MoveNext on the returned enumerable will block if there is no data available, or will
        /// throw an  if the  is canceled. 
        ///  
        /// A cancellation token to observe.
        /// An  that removes and returns items from the collection. 
        /// The  has been disposed.
        /// If the  is canceled.
        public IEnumerable GetConsumingEnumerable(CancellationToken cancellationToken) 
        {
            CancellationTokenSource linkedTokenSource = null; 
            try 
            {
                linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token); 
                while (!IsCompleted)
                {
                    T item;
                    if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource)) 
                    {
                        yield return item; 
                    } 
                }
            } 
            finally
            {
                if (linkedTokenSource != null)
                { 
                    linkedTokenSource.Dispose();
                } 
            } 
        }
 
        /// Provides an  for items in the collection.
        /// An  for the items in the collection.
        /// The  has been disposed. 
        IEnumerator IEnumerable.GetEnumerator()
        { 
            CheckDisposed(); 
            return m_collection.GetEnumerator();
 
        }

        /// Provides an  for items in the collection.
        /// An  for the items in the collection. 
        /// The  has been disposed. 
        IEnumerator IEnumerable.GetEnumerator() 
        {
            return ((IEnumerable)this).GetEnumerator(); 
        }

        /// Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny()
        /// and TryTakeFromAny(). 
        /// The collections to/from which an item should be added/removed.
        /// Indicates whether this method is called to Add or Take. 
        /// A copy of the collections array that acts as a defense to prevent an “outsider” from changing 
        /// elements of the array after we have done the validation on them.
        /// If the collections argument is null. 
        /// If the collections argument is a 0-length array or contains a
        /// null element. Also, if atleast one of the collections has been marked complete for adds.
        /// If atleast one of the collections has been disposed.
        private static BlockingCollection[] ValidateCollectionsArray(BlockingCollection[] collections, OperationMode operationMode) 
        {
            if (collections == null) 
            { 
                throw new ArgumentNullException("collections");
            } 
            else if (collections.Length < 1)
            {
                throw new ArgumentException(
                    SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections"); 
            }
            else if ((Thread.CurrentThread.GetApartmentState() == ApartmentState.MTA && collections.Length > 63) 
                   || (Thread.CurrentThread.GetApartmentState() == ApartmentState.STA && collections.Length > 62)) 
            //The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, and we reserve one for CancellationToken
            { 
                throw new ArgumentOutOfRangeException(
                    "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize));
            }
 
            BlockingCollection[] collectionsCopy = new BlockingCollection[collections.Length];
            for (int i = 0; i < collectionsCopy.Length; ++i) 
            { 
                collectionsCopy[i] = collections[i];
                if (collectionsCopy[i] == null) 
                {
                    throw new ArgumentException(
                        SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections");
                } 

                if (collectionsCopy[i].m_isDisposed) 
                    throw new ObjectDisposedException( 
                        "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems));
 
                if (operationMode == OperationMode.Add && collectionsCopy[i].IsAddingCompleted)
                {
                    throw new ArgumentException(
                        SR.GetString(SR.BlockingCollection_CantTakeAnyWhenDone), "collections"); 
                }
            } 
 
            return collectionsCopy;
        } 


        // ---------
        // Private Helpers. 
        /// Centeralizes the logic of validating the timeout input argument.
        /// The TimeSpan to wait for to successfully complete an operation on the collection. 
        /// If the number of millseconds represented by the timeout 
        /// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite
        private static void ValidateTimeout(TimeSpan timeout) 
        {
            long totalMilliseconds = (long)timeout.TotalMilliseconds;
            if ((totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue) && (totalMilliseconds != Timeout.Infinite))
            { 
                throw new ArgumentOutOfRangeException("timeout", timeout,
                    String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue)); 
            } 
        }
 
        /// Centralizes the logic of validating the millisecondsTimeout input argument.
        /// The number of milliseconds to wait for to successfully complete an
        /// operation on the collection.
        /// If the number of millseconds is less than 0 and not 
        /// equal to Timeout.Infinite.
        private static void ValidateMillisecondsTimeout(int millisecondsTimeout) 
        { 
            if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite))
            { 
                throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout,
                    String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
            }
        } 

        /// Throws a System.ObjectDisposedException if the collection was disposed 
        /// If the collection has been disposed. 
        private void CheckDisposed()
        { 
            if (m_isDisposed)
            {
                throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed));
            } 
        }
 
    } 

 

    /// A debugger view of the blocking collection that makes it simple to browse the
    /// collection's contents at a point in time.
    /// The type of element that the BlockingCollection will hold. 
    internal sealed class SystemThreadingCollections_BlockingCollectionDebugView
    { 
        private BlockingCollection m_blockingCollection; // The collection being viewed. 

        /// Constructs a new debugger view object for the provided blocking collection object. 
        /// A blocking collection to browse in the debugger.
        public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollection collection)
        {
            if (collection == null) 
            {
                throw new ArgumentNullException("collection"); 
            } 

            m_blockingCollection = collection; 
        }

        /// Returns a snapshot of the underlying collection's elements.
        [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] 
        public T[] Items
        { 
            get 
            {
                return m_blockingCollection.ToArray(); 
            }
        }

    } 
}
#pragma warning restore 0420 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK