Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Sys / System / Collections / Concurrent / BlockingCollection.cs / 1305376 / BlockingCollection.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // BlockingCollection.cs // //[....] // // A class that implements the bounding and blocking functionality while abstracting away // the underlying storage mechanism. This file also contains BlockingCollection's // associated debugger view type. // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- #pragma warning disable 0420 using System; using System.Collections.Generic; using System.Collections; using System.Diagnostics; using System.Globalization; using System.Security.Permissions; using System.Runtime.InteropServices; using System.Threading; namespace System.Collections.Concurrent { ////// Provides blocking and bounding capabilities for thread-safe collections that /// implement ///. /// /// ///represents a collection /// that allows for thread-safe adding and removing of data. /// is used as a wrapper /// for an instance, allowing /// removal attempts from the collection to block until data is available to be removed. Similarly, /// a can be created to enforce /// an upper-bound on the number of data elements allowed in the /// ; addition attempts to the /// collection may then block until space is available to store the added items. In this manner, /// is similar to a traditional /// blocking queue data structure, except that the underlying data storage mechanism is abstracted /// away as an . /// Specifies the type of elements in the collection. [ComVisible(false)] [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)] [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))] [DebuggerDisplay("Count = {Count}, Type = {m_collection}")] public class BlockingCollection: IEnumerable , ICollection, IDisposable { private IProducerConsumerCollection m_collection; private int m_boundedCapacity; private const int NON_BOUNDED = -1; private SemaphoreSlim m_freeNodes; private SemaphoreSlim m_occupiedNodes; private volatile bool m_isDisposed; private CancellationTokenSource m_ConsumersCancellationTokenSource; private CancellationTokenSource m_ProducersCancellationTokenSource; private volatile int m_currentAdders; private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000); #region Enums /// An enumerated data type used internal to the class to specify to a generic method /// the current mode of operation. private enum OperationMode { Add, Take }; #endregion #region Properties ///Gets the bounded capacity of this ///instance. The bounded capacity of this collection, or int.MaxValue if no bound was supplied. ///The public int BoundedCapacity { get { CheckDisposed(); return m_boundedCapacity; } } ///has been disposed. Gets whether this ///has been marked as complete for adding. Whether this collection has been marked as complete for adding. ///The public bool IsAddingCompleted { get { CheckDisposed(); return (m_currentAdders == COMPLETE_ADDING_ON_MASK); } } ///has been disposed. Gets whether this ///has been marked as complete for adding and is empty. Whether this collection has been marked as complete for adding and is empty. ///The public bool IsCompleted { get { CheckDisposed(); return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0)); } } ///has been disposed. Gets the number of items contained in the ///. The number of items contained in the ///. The public int Count { get { CheckDisposed(); return m_occupiedNodes.CurrentCount; } } ///has been disposed. Gets a value indicating whether access to the ///is synchronized. The bool ICollection.IsSynchronized { get { CheckDisposed(); return false; } } ///has been disposed. /// Gets an object that can be used to synchronize access to the ///. This property is not supported. /// The SyncRoot property is not supported. object ICollection.SyncRoot { get { throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported)); } } #endregion ///Initializes a new instance of the /// ////// class without an upper-bound. /// /// The default underlying collection is a public BlockingCollection() : this(new ConcurrentQueueConcurrentQueue<T> . ///()) { } /// Initializes a new instance of the /// The bounded size of the collection. ////// class with the specified upper-bound. /// The ///is /// not a positive value. /// The default underlying collection is a public BlockingCollection(int boundedCapacity) : this(new ConcurrentQueueConcurrentQueue<T> . ///(), boundedCapacity) { } /// Initializes a new instance of the /// The collection to use as the underlying data store. /// The bounded size of the collection. ////// class with the specified upper-bound and using the provided /// as its underlying data store. The ///argument is /// null. The ///is not a positive value. The supplied public BlockingCollection(IProducerConsumerCollectioncontains more values /// than is permitted by . collection, int boundedCapacity) { if (boundedCapacity < 1) { throw new ArgumentOutOfRangeException( "boundedCapacity", boundedCapacity, SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange)); } if (collection == null) { throw new ArgumentNullException("collection"); } int count = collection.Count; if (count > boundedCapacity) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity)); } Initialize(collection, boundedCapacity, count); } /// Initializes a new instance of the /// The collection to use as the underlying data store. ////// class without an upper-bound and using the provided /// as its underlying data store. The public BlockingCollection(IProducerConsumerCollectionargument is /// null. collection) { if (collection == null) { throw new ArgumentNullException("collection"); } Initialize(collection, NON_BOUNDED, collection.Count); } /// Initializes the BlockingCollection instance. /// The collection to use as the underlying data store. /// The bounded size of the collection. /// The number of items currently in the underlying collection. private void Initialize(IProducerConsumerCollectioncollection, int boundedCapacity, int collectionCount) { Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED); m_collection = collection; m_boundedCapacity = boundedCapacity; ; m_isDisposed = false; m_ConsumersCancellationTokenSource = new CancellationTokenSource(); m_ProducersCancellationTokenSource = new CancellationTokenSource(); if (boundedCapacity == NON_BOUNDED) { m_freeNodes = null; } else { Debug.Assert(boundedCapacity > 0); m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount); } m_occupiedNodes = new SemaphoreSlim(collectionCount); } /// /// Adds the item to the /// The item to be added to the collection. The value can be a null reference. ///. /// The ///has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection didn't accept the item. ////// If a bounded capacity was specified when this instance of /// public void Add(T item) { #if DEBUG bool tryAddReturnValue = #endif TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken()); #if DEBUG Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true."); #endif } ///was initialized, /// a call to Add may block until space is available to store the provided item. /// /// Adds the item to the /// The item to be added to the collection. The value can be a null reference. /// A cancellation token to observe. ///. /// A is thrown if the is /// canceled. /// If the ///is canceled. The ///has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection didn't accept the item. ////// If a bounded capacity was specified when this instance of /// public void Add(T item, CancellationToken cancellationToken) { #if DEBUG bool tryAddReturnValue = #endif TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken); #if DEBUG Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true."); #endif } ///was initialized, /// a call to may block until space is available to store the provided item. /// /// Attempts to add the specified item to the /// The item to be added to the collection. ///. /// true if the ///could be added; otherwise, false. The ///has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection didn't accept the item. public bool TryAdd(T item) { return TryAddWithNoTimeValidation(item, 0, new CancellationToken()); } ////// Attempts to add the specified item to the /// The item to be added to the collection. /// A. /// that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// true if the ///could be added to the collection within /// the alloted time; otherwise, false. The ///has been marked /// as complete with regards to additions. The ///has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . The underlying collection didn't accept the item. public bool TryAdd(T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, new CancellationToken()); } ////// Attempts to add the specified item to the /// The item to be added to the collection. /// The number of milliseconds to wait, or. /// (-1) to wait indefinitely. /// true if the ///could be added to the collection within /// the alloted time; otherwise, false. The ///has been marked /// as complete with regards to additions. The ///has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. The underlying collection didn't accept the item. public bool TryAdd(T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken()); } ////// Attempts to add the specified item to the /// The item to be added to the collection. /// The number of milliseconds to wait, or. /// A is thrown if the is /// canceled. /// (-1) to wait indefinitely. /// A cancellation token to observe. /// true if the ///could be added to the collection within /// the alloted time; otherwise, false. If the ///is canceled. The ///has been marked /// as complete with regards to additions. The ///has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. The underlying collection didn't accept the item. public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken); } ///Adds an item into the underlying data store using its IProducerConsumerCollection<T>.Add /// method. If a bounded capacity was specified and the collection was full, /// this method will wait for, at most, the timeout period trying to add the item. /// If the timeout period was exhaused before successfully adding the item this method will /// return false. /// The item to be added to the collection. /// The number of milliseconds to wait for the collection to accept the item, /// or Timeout.Infinite to wait indefinitely. /// A cancellation token to observe. ///False if the collection remained full till the timeout period was exhausted.True otherwise. ///If the ///is canceled. the collection has already been marked /// as complete with regards to additions. ///If the collection has been disposed. ///The underlying collection didn't accept the item. private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken) { CheckDisposed(); if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); if (IsAddingCompleted) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed)); } bool waitForSemaphoreWasSuccessful = true; if (m_freeNodes != null) { //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding() //was called concurrently with Adding which is not supported by BlockingCollection. CancellationTokenSource linkedTokenSource = null; try { waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0); if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0) { linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( cancellationToken, m_ProducersCancellationTokenSource.Token); waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token); } } catch (OperationCanceledException) { //if cancellation was via external token, throw an OCE if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx. //Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested); throw new InvalidOperationException (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd)); } finally { if (linkedTokenSource != null) { linkedTokenSource.Dispose(); } } } if (waitForSemaphoreWasSuccessful) { // Update the adders count if the complete adding was not requested, otherwise // spins until all adders finish then throw IOE // The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have // not been finished yet SpinWait spinner = new SpinWait(); while (true) { int observedAdders = m_currentAdders; if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0) { spinner.Reset(); // CompleteAdding is requested, spin then throw while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed)); } if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders) { Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit."); break; } spinner.SpinOnce(); } // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if: // 1- m_collection.TryAdd threw an exception // 2- m_collection.TryAdd succeeded // 3- m_collection.TryAdd returned false // so we put the decrement code in the finally block try { //TryAdd is guaranteed to find a place to add the element. Its return value depends //on the semantics of the underlying store. Some underlying stores will not add an already //existing item and thus TryAdd returns false indicating that the size of the underlying //store did not increase. bool addingSucceeded = false; try { //The token may have been canceled before the collection had space available, so we need a check after the wait has completed. //This fixes bug #702328, case 2 of 2. cancellationToken.ThrowIfCancellationRequested(); addingSucceeded = m_collection.TryAdd(item); } catch { //TryAdd did not result in increasing the size of the underlying store and hence we need //to increment back the count of the m_freeNodes semaphore. if (m_freeNodes != null) { m_freeNodes.Release(); } throw; } if (addingSucceeded) { //After adding an element to the underlying storage, signal to the consumers //waiting on m_occupiedNodes that there is a new item added ready to be consumed. m_occupiedNodes.Release(); } else { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed)); } } finally { // decrement the adders count Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0); Interlocked.Decrement(ref m_currentAdders); } } return waitForSemaphoreWasSuccessful; } ///Takes an item from the ///. The item removed from the collection. ///The ///is empty and has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection was modified /// outside of this ///instance. A call to public T Take() { T item; if (!TryTake(out item, Timeout.Infinite, CancellationToken.None)) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone)); } return item; } ///may block until an item is available to be removed. Takes an item from the ///. The item removed from the collection. ///If the ///is /// canceled or the is empty and has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection was modified /// outside of this ///instance. A call to public T Take(CancellationToken cancellationToken) { T item; if (!TryTake(out item, Timeout.Infinite, cancellationToken)) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone)); } return item; } ///may block until an item is available to be removed. /// Attempts to remove an item from the /// The item removed from the collection. ///. /// true if an item could be removed; otherwise, false. ///The ///has been disposed. The underlying collection was modified /// outside of this public bool TryTake(out T item) { return TryTake(out item, 0, CancellationToken.None); } ///instance. /// Attempts to remove an item from the /// The item removed from the collection. /// A. /// that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. ///The ///has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . The underlying collection was modified /// outside of this public bool TryTake(out T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null); } ///instance. /// Attempts to remove an item from the /// The item removed from the collection. /// The number of milliseconds to wait, or. /// (-1) to wait indefinitely. /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. ///The ///has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. The underlying collection was modified /// outside of this public bool TryTake(out T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null); } ///instance. /// Attempts to remove an item from the /// The item removed from the collection. /// The number of milliseconds to wait, or. /// A is thrown if the is /// canceled. /// (-1) to wait indefinitely. /// A cancellation token to observe. /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. ///If the ///is canceled. The ///has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. The underlying collection was modified /// outside of this public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null); } ///instance. Takes an item from the underlying data store using its IProducerConsumerCollection<T>.Take /// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false) /// trying to remove an item. If the timeout period was exhaused before successfully removing an item /// this method will return false. /// A /// The item removed from the collection. /// The number of milliseconds to wait for the collection to have an item available /// for removal, or Timeout.Infinite to wait indefinitely. /// A cancellation token to observe. /// A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token /// multiple times. ///is thrown if the is /// canceled. /// False if the collection remained empty till the timeout period was exhausted. True otherwise. ///If the ///is canceled. If the collection has been disposed. private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource) { CheckDisposed(); item = default(T); if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); //If the collection is completed then there is no need to wait. if (IsCompleted) { return false; } bool waitForSemaphoreWasSuccessful = false; // set the combined token source to the combinedToken paramater if it is not null (came from GetConsumingEnumerable) CancellationTokenSource linkedTokenSource = combinedTokenSource; try { waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(0); if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0) { // create the linked token if it is not created yet if (combinedTokenSource == null) linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token); waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token); } } //The collection became completed while waiting on the semaphore. catch (OperationCanceledException) { if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); return false; } finally { // only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it if (linkedTokenSource != null && combinedTokenSource == null) { linkedTokenSource.Dispose(); } } if (waitForSemaphoreWasSuccessful) { bool removeSucceeded = false; bool removeFaulted = true; try { //The token may have been canceled before an item arrived, so we need a check after the wait has completed. //This fixes bug #702328, case 1 of 2. cancellationToken.ThrowIfCancellationRequested(); //If an item was successfully removed from the underlying collection. removeSucceeded = m_collection.TryTake(out item); removeFaulted = false; if (!removeSucceeded) { // Check if the collection is empty which means that the collection was modified outside BlockingCollection throw new InvalidOperationException (SR.GetString(SR.BlockingCollection_Take_CollectionModified)); } } finally { // removeFaulted implies !removeSucceeded, but the reverse is not true. if (removeSucceeded) { if (m_freeNodes != null) { Debug.Assert(m_boundedCapacity != NON_BOUNDED); m_freeNodes.Release(); } } else if (removeFaulted) { m_occupiedNodes.Release(); } //Last remover will detect that it has actually removed the last item from the //collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores //so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers //but this is not a problem. if (IsCompleted) { CancelWaitingConsumers(); } } } return waitForSemaphoreWasSuccessful; } ////// Adds the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. ///instances. /// The index of the collection in the ///array to which the item was added. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. At least one underlying collection didn't accept the item. ///The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. /// If a bounded capacity was specified when all of the /// public static int AddToAny(BlockingCollectioninstances were initialized, /// a call to AddToAny may block until space is available in one of the collections /// to store the provided item. /// [] collections, T item) { #if DEBUG int tryAddAnyReturnValue = #else return #endif TryAddToAny(collections, item, Timeout.Infinite, new CancellationToken()); #if DEBUG Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length) , "TryAddToAny() was expected to return an index within the bounds of the collections array."); return tryAddAnyReturnValue; #endif } /// /// Adds the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. /// A cancellation token to observe. ///instances. /// A is thrown if the is /// canceled. /// The index of the collection in the ///array to which the item was added. If the ///is canceled. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. At least one underlying collection didn't accept the item. ///The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. /// If a bounded capacity was specified when all of the /// public static int AddToAny(BlockingCollectioninstances were initialized, /// a call to AddToAny may block until space is available in one of the collections /// to store the provided item. /// [] collections, T item, CancellationToken cancellationToken) { #if DEBUG int tryAddAnyReturnValue = #else return #endif TryAddToAny(collections, item, Timeout.Infinite, cancellationToken); #if DEBUG Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length) , "TryAddToAny() was expected to return an index within the bounds of the collections array."); return tryAddAnyReturnValue; #endif } /// /// Attempts to add the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. ///instances. /// The index of the collection in the ////// array to which the item was added, or -1 if the item could not be added. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. At least one underlying collection didn't accept the item. ///The count of public static int TryAddToAny(BlockingCollectionis greater than the maximum size of /// 62 for STA and 63 for MTA. [] collections, T item) { return TryAddToAny(collections, item, 0, new CancellationToken()); } /// /// Attempts to add the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. /// Ainstances. /// that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// The index of the collection in the ////// array to which the item was added, or -1 if the item could not be added. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . At least one underlying collection didn't accept the item. ///The count of public static int TryAddToAny(BlockingCollectionis greater than the maximum size of /// 62 for STA and 63 for MTA. [] collections, T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryAddTakeAny(collections, ref item, (int)timeout.TotalMilliseconds, OperationMode.Add, new CancellationToken()); } /// /// Attempts to add the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. /// The number of milliseconds to wait, orinstances. /// (-1) to wait indefinitely. /// The index of the collection in the ////// array to which the item was added, or -1 if the item could not be added. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. At least one underlying collection didn't accept the item. ///The count of public static int TryAddToAny(BlockingCollectionis greater than the maximum size of /// 62 for STA and 63 for MTA. [] collections, T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddTakeAny(collections, ref item, millisecondsTimeout, OperationMode.Add, new CancellationToken()); } /// /// Attempts to add the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. /// The number of milliseconds to wait, orinstances. /// A is thrown if the is /// canceled. /// (-1) to wait indefinitely. /// The index of the collection in the /// A cancellation token to observe. ////// array to which the item was added, or -1 if the item could not be added. If the ///is canceled. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. At least one underlying collection didn't accept the item. ///The count of public static int TryAddToAny(BlockingCollectionis greater than the maximum size of /// 62 for STA and 63 for MTA. [] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddTakeAny(collections, ref item, millisecondsTimeout, OperationMode.Add, cancellationToken); } /// Adds/Takes an item to/from anyone of the specified collections. /// A /// The collections into which the item can be added. /// The item to be added or the item removed and returned to the caller. /// The number of milliseconds to wait for a collection to accept the /// operation, or -1 to wait indefinitely. /// Indicates whether this method is called to Add or Take. /// A cancellation token to observe. ///is thrown if the is /// canceled. /// The index into collections for the collection which accepted the /// adding/removal of the item; -1 if the item could not be added/removed. ///If the ///is canceled. If the collections argument is null. ///If the collections argument is a 0-length array or contains a /// null element. Also, if atleast one of the collections has been marked complete for adds. ///If atleast one of the collections has been disposed. private static int TryAddTakeAny(BlockingCollection[] collections, ref T item, int millisecondsTimeout, OperationMode operationMode, CancellationToken externalCancellationToken) { BlockingCollection [] collectionsCopy = ValidateCollectionsArray(collections, operationMode); #if DEBUG collections = null; // Ensure we always go through collectionsCopy from now on. #endif const int OPERATION_FAILED = -1; // Copy the wait time to another local variable to update it int timeout = millisecondsTimeout; long startTimeTicks = 0; if (millisecondsTimeout != Timeout.Infinite) { startTimeTicks = DateTime.UtcNow.Ticks; } // Fast path for adding if there is at least one unbounded collection if (operationMode == OperationMode.Add) { for (int i = 0; i < collectionsCopy.Length; i++) { if (collectionsCopy[i].m_freeNodes == null) { #if DEBUG bool result = #endif collectionsCopy[i].TryAdd(item); #if DEBUG Debug.Assert(result); #endif return i; } } } // Get wait handles and the tokens for all collections CancellationToken[] collatedCancellationTokens; List handles = GetHandles(collectionsCopy, operationMode, externalCancellationToken, false, out collatedCancellationTokens); //Loop until the timeout period is exhausted or the operation was successful. while (millisecondsTimeout == Timeout.Infinite || timeout >= 0) { #region Update Timeout #endregion //Wait for any collection to become available. int index = -1; CancellationTokenSource linkedTokenSource = null; try { index = WaitHandle_WaitAny(handles, 0, externalCancellationToken, externalCancellationToken); if (index == WaitHandle.WaitTimeout) { linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens); index = WaitHandle_WaitAny(handles, timeout, linkedTokenSource.Token, externalCancellationToken); } } catch (OperationCanceledException ex) { // throw OCE if it is external cancellation if (externalCancellationToken.IsCancellationRequested) { Debug.Assert(ex.CancellationToken == externalCancellationToken); throw; } // If the Operation is TryTake, the exception will not be propagated, it will try another collection, and return false if timeout expired // or all collection completed addition if (operationMode == OperationMode.Take && millisecondsTimeout != Timeout.Infinite) { // Get the new handles and exclude the collections that completed addition handles = GetHandles(collectionsCopy, operationMode, externalCancellationToken, true, out collatedCancellationTokens); timeout = UpdateTimeOut(startTimeTicks, millisecondsTimeout); if (handles.Count == 0 || timeout == 0) { return OPERATION_FAILED; } continue; //retry for the updated handles } // Otherwise if it is Take, Add or Try Add ArgumentException will be thrown throw new ArgumentException( SR.GetString(SR.BlockingCollection_CantTakeAnyWhenDone), "collections"); } finally { if (linkedTokenSource != null) { linkedTokenSource.Dispose(); } } Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count)); //If the timeout period was not exhausted and the appropriate operation succeeded. if (index != WaitHandle.WaitTimeout) { if (operationMode == OperationMode.Add) { if (collectionsCopy[index].TryAdd(item)) { return index; } } else if (operationMode == OperationMode.Take) { if (collectionsCopy[index].TryTake(out item)) { return index; } } } else { return OPERATION_FAILED; } // Update the timeout if (millisecondsTimeout > 0) { timeout = UpdateTimeOut(startTimeTicks, millisecondsTimeout); if (timeout <= 0) { return OPERATION_FAILED; } } } return OPERATION_FAILED; } /// /// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Compeleted collections /// /// The blocking collections /// Add or Take operation /// The original CancellationToken /// True to exclude the compeleted collections /// Complete list of cancellationTokens to observe ///The collections wait handles private static ListGetHandles(BlockingCollection [] collections, OperationMode operationMode, CancellationToken externalCancellationToken, bool excludeCompleted, out CancellationToken[] cancellationTokens) { Debug.Assert(collections != null); List handlesList = new List (collections.Length); List tokensList = new List (collections.Length + 1); // + 1 for the external token tokensList.Add(externalCancellationToken); //Read the appropriate WaitHandle based on the operation mode. if (operationMode == OperationMode.Add) { for (int i = 0; i < collections.Length; i++) { if (collections[i].m_freeNodes != null) { handlesList.Add(collections[i].m_freeNodes.AvailableWaitHandle); tokensList.Add(collections[i].m_ProducersCancellationTokenSource.Token); } } } else { for (int i = 0; i < collections.Length; i++) { if (excludeCompleted && collections[i].IsCompleted) { continue; } handlesList.Add(collections[i].m_occupiedNodes.AvailableWaitHandle); tokensList.Add(collections[i].m_ConsumersCancellationTokenSource.Token); } } cancellationTokens = tokensList.ToArray(); return handlesList; } /// /// Helper to perform WaitHandle.WaitAny(.., CancellationToken) /// this should eventually appear on the WaitHandle class. /// /// /// /// /// ///private static int WaitHandle_WaitAny(List handles, int millisecondsTimeout, CancellationToken combinedToken, CancellationToken externalToken) { WaitHandle[] handlesPlusCancelToken = new WaitHandle[handles.Count + 1]; // +1 for the combinedToken handle for (int i = 0; i < handles.Count; i++) handlesPlusCancelToken[i] = handles[i]; handlesPlusCancelToken[handles.Count] = combinedToken.WaitHandle; int retVal = WaitHandle.WaitAny(handlesPlusCancelToken, millisecondsTimeout, false); if (combinedToken.IsCancellationRequested) { //Add the original token if it is external cancellation if (externalToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalToken); throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled)); } return retVal; } /// /// Helper function to measure and update the wait time /// /// The first time (in Ticks) observed when the wait started /// The orginal wait timeoutout in milliseconds ///The new wait time in milliseconds, -1 if the time expired private static int UpdateTimeOut(long startTimeTicks, int originalWaitMillisecondsTimeout) { if (originalWaitMillisecondsTimeout == 0) { return 0; } // The function must be called in case the time out is not infinite Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite); long elapsedMilliseconds = (DateTime.UtcNow.Ticks - startTimeTicks) / TimeSpan.TicksPerMillisecond; // Check the elapsed milliseconds is greater than max int because this property is long if (elapsedMilliseconds > int.MaxValue) { return 0; } // Subtract the elapsed time from the current wait time int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds; ; if (currentWaitTimeout <= 0) { return 0; } return currentWaitTimeout; } ////// Takes an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. ///instances. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TakeFromAny may block until an item is available to be removed. public static int TakeFromAny(BlockingCollection[] collections, out T item) { #if DEBUG int tryTakeAnyReturnValue = #else return #endif TryTakeFromAny(collections, out item, Timeout.Infinite); #if DEBUG Debug.Assert((tryTakeAnyReturnValue >= 0 && tryTakeAnyReturnValue < collections.Length) , "TryTakeFromAny() was expected to return an index within the bounds of the collections array."); return tryTakeAnyReturnValue; #endif } /// /// Takes an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. /// A cancellation token to observe. ///instances. /// A is thrown if the is /// canceled. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. If the ///is canceled. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TakeFromAny may block until an item is available to be removed. public static int TakeFromAny(BlockingCollection[] collections, out T item, CancellationToken cancellationToken) { #if DEBUG int tryTakeAnyReturnValue = #else return #endif TryTakeFromAny(collections, out item, Timeout.Infinite, cancellationToken); #if DEBUG Debug.Assert((tryTakeAnyReturnValue >= 0 && tryTakeAnyReturnValue < collections.Length) , "TryTakeFromAny() was expected to return an index within the bounds of the collections array."); return tryTakeAnyReturnValue; #endif } /// /// Attempts to remove an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. ///instances. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item) { return TryTakeFromAny(collections, out item, 0); } /// /// Attempts to remove an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. /// Ainstances. /// that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, TimeSpan timeout) { ValidateTimeout(timeout); T tempItem = default(T); int returnValue = TryAddTakeAny(collections, ref tempItem, (int)timeout.TotalMilliseconds, OperationMode.Take, new CancellationToken()); item = tempItem; return returnValue; } /// /// Attempts to remove an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. /// The number of milliseconds to wait, orinstances. /// (-1) to wait indefinitely. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); T tempItem = default(T); int returnValue = TryAddTakeAny(collections, ref tempItem, millisecondsTimeout, OperationMode.Take, new CancellationToken()); item = tempItem; return returnValue; } /// /// Attempts to remove an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. /// The number of milliseconds to wait, orinstances. /// A is thrown if the is /// canceled. /// (-1) to wait indefinitely. /// A cancellation token to observe. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. If the ///is canceled. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); T tempItem = default(T); int returnValue = TryAddTakeAny(collections, ref tempItem, millisecondsTimeout, OperationMode.Take, cancellationToken); item = tempItem; return returnValue; } /// /// Marks the ///instances /// as not accepting any more additions. /// /// After a collection has been marked as complete for adding, adding to the collection is not permitted /// and attempts to remove from the collection will not wait when the collection is empty. /// ///The public void CompleteAdding() { CheckDisposed(); if (IsAddingCompleted) return; SpinWait spinner = new SpinWait(); while (true) { int observedAdders = m_currentAdders; if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0) { spinner.Reset(); // If there is another COmpleteAdding in progress waiting the current adders, then spin until it finishes while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); return; } if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders) { spinner.Reset(); while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); if (Count == 0) { CancelWaitingConsumers(); } // We should always wake waiting producers, and have them throw exceptions as // Add&CompleteAdding should not be used concurrently. CancelWaitingProducers(); return; } spinner.SpinOnce(); } } ///has been disposed. Cancels the semaphores. private void CancelWaitingConsumers() { m_ConsumersCancellationTokenSource.Cancel(); } private void CancelWaitingProducers() { m_ProducersCancellationTokenSource.Cancel(); } ////// Releases resources used by the public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } ///instance. /// /// Releases resources used by the /// Whether being disposed explicitly (true) or due to a finalizer (false). protected virtual void Dispose(bool disposing) { if (!m_isDisposed) { if (m_freeNodes != null) { m_freeNodes.Dispose(); } m_occupiedNodes.Dispose(); m_isDisposed = true; } } ///instance. /// Copies the items from the ///instance into a new array. An array containing copies of the elements of the collection. ///The ///has been disposed. /// The copied elements are not removed from the collection. /// public T[] ToArray() { CheckDisposed(); return m_collection.ToArray(); } ///Copies all of the items in the /// The one-dimensional array that is the destination of the elements copied from /// theinstance /// to a compatible one-dimensional array, starting at the specified index of the target array. /// instance. The array must have zero-based indexing. /// The zero-based index in at which copying begins. /// The ///argument is /// null. The ///argument is less than zero. The ///argument is equal to or greater /// than the length of the . The public void CopyTo(T[] array, int index) { ((ICollection)this).CopyTo(array, index); } ///has been disposed. Copies all of the items in the /// The one-dimensional array that is the destination of the elements copied from /// theinstance /// to a compatible one-dimensional array, starting at the specified index of the target array. /// instance. The array must have zero-based indexing. /// The zero-based index in at which copying begins. /// The ///argument is /// null. The ///argument is less than zero. The ///argument is equal to or greater /// than the length of the , the array is multidimensional, or the type parameter for the collection /// cannot be cast automatically to the type of the destination array. The void ICollection.CopyTo(Array array, int index) { CheckDisposed(); //We don't call m_collection.CopyTo() directly because we rely on Array.Copy method to customize //all array exceptions. T[] collectionSnapShot = m_collection.ToArray(); try { Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length); } catch (ArgumentNullException) { throw new ArgumentNullException("array"); } catch (ArgumentOutOfRangeException) { throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative)); } catch (ArgumentException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index"); } catch (RankException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array"); } catch (InvalidCastException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array"); } catch (ArrayTypeMismatchException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array"); } } ///has been disposed. Provides a consuming ///for items in the collection. An ///that removes and returns items from the collection. The public IEnumerablehas been disposed. GetConsumingEnumerable() { return GetConsumingEnumerable(CancellationToken.None); } /// Provides a consuming /// A cancellation token to observe. ///for items in the collection. /// Calling MoveNext on the returned enumerable will block if there is no data available, or will /// throw an if the is canceled. /// An ///that removes and returns items from the collection. The ///has been disposed. If the public IEnumerableis canceled. GetConsumingEnumerable(CancellationToken cancellationToken) { CancellationTokenSource linkedTokenSource = null; try { linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token); while (!IsCompleted) { T item; if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource)) { yield return item; } } } finally { if (linkedTokenSource != null) { linkedTokenSource.Dispose(); } } } /// Provides an ///for items in the collection. An ///for the items in the collection. The IEnumeratorhas been disposed. IEnumerable .GetEnumerator() { CheckDisposed(); return m_collection.GetEnumerator(); } /// Provides an ///for items in the collection. An ///for the items in the collection. The IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerablehas been disposed. )this).GetEnumerator(); } /// Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny() /// and TryTakeFromAny(). /// The collections to/from which an item should be added/removed. /// Indicates whether this method is called to Add or Take. ///A copy of the collections array that acts as a defense to prevent an “outsider” from changing /// elements of the array after we have done the validation on them. ///If the collections argument is null. ///If the collections argument is a 0-length array or contains a /// null element. Also, if atleast one of the collections has been marked complete for adds. ///If atleast one of the collections has been disposed. private static BlockingCollection[] ValidateCollectionsArray(BlockingCollection [] collections, OperationMode operationMode) { if (collections == null) { throw new ArgumentNullException("collections"); } else if (collections.Length < 1) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections"); } else if ((Thread.CurrentThread.GetApartmentState() == ApartmentState.MTA && collections.Length > 63) || (Thread.CurrentThread.GetApartmentState() == ApartmentState.STA && collections.Length > 62)) //The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, and we reserve one for CancellationToken { throw new ArgumentOutOfRangeException( "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize)); } BlockingCollection [] collectionsCopy = new BlockingCollection [collections.Length]; for (int i = 0; i < collectionsCopy.Length; ++i) { collectionsCopy[i] = collections[i]; if (collectionsCopy[i] == null) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections"); } if (collectionsCopy[i].m_isDisposed) throw new ObjectDisposedException( "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems)); if (operationMode == OperationMode.Add && collectionsCopy[i].IsAddingCompleted) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_CantTakeAnyWhenDone), "collections"); } } return collectionsCopy; } // --------- // Private Helpers. /// Centeralizes the logic of validating the timeout input argument. /// The TimeSpan to wait for to successfully complete an operation on the collection. ///If the number of millseconds represented by the timeout /// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite private static void ValidateTimeout(TimeSpan timeout) { long totalMilliseconds = (long)timeout.TotalMilliseconds; if ((totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue) && (totalMilliseconds != Timeout.Infinite)) { throw new ArgumentOutOfRangeException("timeout", timeout, String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue)); } } ///Centralizes the logic of validating the millisecondsTimeout input argument. /// The number of milliseconds to wait for to successfully complete an /// operation on the collection. ///If the number of millseconds is less than 0 and not /// equal to Timeout.Infinite. private static void ValidateMillisecondsTimeout(int millisecondsTimeout) { if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite)) { throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout, String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue)); } } ///Throws a System.ObjectDisposedException if the collection was disposed ///If the collection has been disposed. private void CheckDisposed() { if (m_isDisposed) { throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed)); } } } ///A debugger view of the blocking collection that makes it simple to browse the /// collection's contents at a point in time. ///The type of element that the BlockingCollection will hold. internal sealed class SystemThreadingCollections_BlockingCollectionDebugView{ private BlockingCollection m_blockingCollection; // The collection being viewed. /// Constructs a new debugger view object for the provided blocking collection object. /// A blocking collection to browse in the debugger. public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollectioncollection) { if (collection == null) { throw new ArgumentNullException("collection"); } m_blockingCollection = collection; } /// Returns a snapshot of the underlying collection's elements. [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] public T[] Items { get { return m_blockingCollection.ToArray(); } } } } #pragma warning restore 0420 // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // BlockingCollection.cs // //[....] // // A class that implements the bounding and blocking functionality while abstracting away // the underlying storage mechanism. This file also contains BlockingCollection's // associated debugger view type. // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- #pragma warning disable 0420 using System; using System.Collections.Generic; using System.Collections; using System.Diagnostics; using System.Globalization; using System.Security.Permissions; using System.Runtime.InteropServices; using System.Threading; namespace System.Collections.Concurrent { ////// Provides blocking and bounding capabilities for thread-safe collections that /// implement ///. /// /// ///represents a collection /// that allows for thread-safe adding and removing of data. /// is used as a wrapper /// for an instance, allowing /// removal attempts from the collection to block until data is available to be removed. Similarly, /// a can be created to enforce /// an upper-bound on the number of data elements allowed in the /// ; addition attempts to the /// collection may then block until space is available to store the added items. In this manner, /// is similar to a traditional /// blocking queue data structure, except that the underlying data storage mechanism is abstracted /// away as an . /// Specifies the type of elements in the collection. [ComVisible(false)] [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)] [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))] [DebuggerDisplay("Count = {Count}, Type = {m_collection}")] public class BlockingCollection: IEnumerable , ICollection, IDisposable { private IProducerConsumerCollection m_collection; private int m_boundedCapacity; private const int NON_BOUNDED = -1; private SemaphoreSlim m_freeNodes; private SemaphoreSlim m_occupiedNodes; private volatile bool m_isDisposed; private CancellationTokenSource m_ConsumersCancellationTokenSource; private CancellationTokenSource m_ProducersCancellationTokenSource; private volatile int m_currentAdders; private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000); #region Enums /// An enumerated data type used internal to the class to specify to a generic method /// the current mode of operation. private enum OperationMode { Add, Take }; #endregion #region Properties ///Gets the bounded capacity of this ///instance. The bounded capacity of this collection, or int.MaxValue if no bound was supplied. ///The public int BoundedCapacity { get { CheckDisposed(); return m_boundedCapacity; } } ///has been disposed. Gets whether this ///has been marked as complete for adding. Whether this collection has been marked as complete for adding. ///The public bool IsAddingCompleted { get { CheckDisposed(); return (m_currentAdders == COMPLETE_ADDING_ON_MASK); } } ///has been disposed. Gets whether this ///has been marked as complete for adding and is empty. Whether this collection has been marked as complete for adding and is empty. ///The public bool IsCompleted { get { CheckDisposed(); return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0)); } } ///has been disposed. Gets the number of items contained in the ///. The number of items contained in the ///. The public int Count { get { CheckDisposed(); return m_occupiedNodes.CurrentCount; } } ///has been disposed. Gets a value indicating whether access to the ///is synchronized. The bool ICollection.IsSynchronized { get { CheckDisposed(); return false; } } ///has been disposed. /// Gets an object that can be used to synchronize access to the ///. This property is not supported. /// The SyncRoot property is not supported. object ICollection.SyncRoot { get { throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported)); } } #endregion ///Initializes a new instance of the /// ////// class without an upper-bound. /// /// The default underlying collection is a public BlockingCollection() : this(new ConcurrentQueueConcurrentQueue<T> . ///()) { } /// Initializes a new instance of the /// The bounded size of the collection. ////// class with the specified upper-bound. /// The ///is /// not a positive value. /// The default underlying collection is a public BlockingCollection(int boundedCapacity) : this(new ConcurrentQueueConcurrentQueue<T> . ///(), boundedCapacity) { } /// Initializes a new instance of the /// The collection to use as the underlying data store. /// The bounded size of the collection. ////// class with the specified upper-bound and using the provided /// as its underlying data store. The ///argument is /// null. The ///is not a positive value. The supplied public BlockingCollection(IProducerConsumerCollectioncontains more values /// than is permitted by . collection, int boundedCapacity) { if (boundedCapacity < 1) { throw new ArgumentOutOfRangeException( "boundedCapacity", boundedCapacity, SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange)); } if (collection == null) { throw new ArgumentNullException("collection"); } int count = collection.Count; if (count > boundedCapacity) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity)); } Initialize(collection, boundedCapacity, count); } /// Initializes a new instance of the /// The collection to use as the underlying data store. ////// class without an upper-bound and using the provided /// as its underlying data store. The public BlockingCollection(IProducerConsumerCollectionargument is /// null. collection) { if (collection == null) { throw new ArgumentNullException("collection"); } Initialize(collection, NON_BOUNDED, collection.Count); } /// Initializes the BlockingCollection instance. /// The collection to use as the underlying data store. /// The bounded size of the collection. /// The number of items currently in the underlying collection. private void Initialize(IProducerConsumerCollectioncollection, int boundedCapacity, int collectionCount) { Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED); m_collection = collection; m_boundedCapacity = boundedCapacity; ; m_isDisposed = false; m_ConsumersCancellationTokenSource = new CancellationTokenSource(); m_ProducersCancellationTokenSource = new CancellationTokenSource(); if (boundedCapacity == NON_BOUNDED) { m_freeNodes = null; } else { Debug.Assert(boundedCapacity > 0); m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount); } m_occupiedNodes = new SemaphoreSlim(collectionCount); } /// /// Adds the item to the /// The item to be added to the collection. The value can be a null reference. ///. /// The ///has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection didn't accept the item. ////// If a bounded capacity was specified when this instance of /// public void Add(T item) { #if DEBUG bool tryAddReturnValue = #endif TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken()); #if DEBUG Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true."); #endif } ///was initialized, /// a call to Add may block until space is available to store the provided item. /// /// Adds the item to the /// The item to be added to the collection. The value can be a null reference. /// A cancellation token to observe. ///. /// A is thrown if the is /// canceled. /// If the ///is canceled. The ///has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection didn't accept the item. ////// If a bounded capacity was specified when this instance of /// public void Add(T item, CancellationToken cancellationToken) { #if DEBUG bool tryAddReturnValue = #endif TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken); #if DEBUG Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true."); #endif } ///was initialized, /// a call to may block until space is available to store the provided item. /// /// Attempts to add the specified item to the /// The item to be added to the collection. ///. /// true if the ///could be added; otherwise, false. The ///has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection didn't accept the item. public bool TryAdd(T item) { return TryAddWithNoTimeValidation(item, 0, new CancellationToken()); } ////// Attempts to add the specified item to the /// The item to be added to the collection. /// A. /// that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// true if the ///could be added to the collection within /// the alloted time; otherwise, false. The ///has been marked /// as complete with regards to additions. The ///has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . The underlying collection didn't accept the item. public bool TryAdd(T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, new CancellationToken()); } ////// Attempts to add the specified item to the /// The item to be added to the collection. /// The number of milliseconds to wait, or. /// (-1) to wait indefinitely. /// true if the ///could be added to the collection within /// the alloted time; otherwise, false. The ///has been marked /// as complete with regards to additions. The ///has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. The underlying collection didn't accept the item. public bool TryAdd(T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken()); } ////// Attempts to add the specified item to the /// The item to be added to the collection. /// The number of milliseconds to wait, or. /// A is thrown if the is /// canceled. /// (-1) to wait indefinitely. /// A cancellation token to observe. /// true if the ///could be added to the collection within /// the alloted time; otherwise, false. If the ///is canceled. The ///has been marked /// as complete with regards to additions. The ///has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. The underlying collection didn't accept the item. public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken); } ///Adds an item into the underlying data store using its IProducerConsumerCollection<T>.Add /// method. If a bounded capacity was specified and the collection was full, /// this method will wait for, at most, the timeout period trying to add the item. /// If the timeout period was exhaused before successfully adding the item this method will /// return false. /// The item to be added to the collection. /// The number of milliseconds to wait for the collection to accept the item, /// or Timeout.Infinite to wait indefinitely. /// A cancellation token to observe. ///False if the collection remained full till the timeout period was exhausted.True otherwise. ///If the ///is canceled. the collection has already been marked /// as complete with regards to additions. ///If the collection has been disposed. ///The underlying collection didn't accept the item. private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken) { CheckDisposed(); if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); if (IsAddingCompleted) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed)); } bool waitForSemaphoreWasSuccessful = true; if (m_freeNodes != null) { //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding() //was called concurrently with Adding which is not supported by BlockingCollection. CancellationTokenSource linkedTokenSource = null; try { waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0); if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0) { linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( cancellationToken, m_ProducersCancellationTokenSource.Token); waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token); } } catch (OperationCanceledException) { //if cancellation was via external token, throw an OCE if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx. //Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested); throw new InvalidOperationException (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd)); } finally { if (linkedTokenSource != null) { linkedTokenSource.Dispose(); } } } if (waitForSemaphoreWasSuccessful) { // Update the adders count if the complete adding was not requested, otherwise // spins until all adders finish then throw IOE // The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have // not been finished yet SpinWait spinner = new SpinWait(); while (true) { int observedAdders = m_currentAdders; if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0) { spinner.Reset(); // CompleteAdding is requested, spin then throw while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed)); } if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders) { Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit."); break; } spinner.SpinOnce(); } // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if: // 1- m_collection.TryAdd threw an exception // 2- m_collection.TryAdd succeeded // 3- m_collection.TryAdd returned false // so we put the decrement code in the finally block try { //TryAdd is guaranteed to find a place to add the element. Its return value depends //on the semantics of the underlying store. Some underlying stores will not add an already //existing item and thus TryAdd returns false indicating that the size of the underlying //store did not increase. bool addingSucceeded = false; try { //The token may have been canceled before the collection had space available, so we need a check after the wait has completed. //This fixes bug #702328, case 2 of 2. cancellationToken.ThrowIfCancellationRequested(); addingSucceeded = m_collection.TryAdd(item); } catch { //TryAdd did not result in increasing the size of the underlying store and hence we need //to increment back the count of the m_freeNodes semaphore. if (m_freeNodes != null) { m_freeNodes.Release(); } throw; } if (addingSucceeded) { //After adding an element to the underlying storage, signal to the consumers //waiting on m_occupiedNodes that there is a new item added ready to be consumed. m_occupiedNodes.Release(); } else { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed)); } } finally { // decrement the adders count Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0); Interlocked.Decrement(ref m_currentAdders); } } return waitForSemaphoreWasSuccessful; } ///Takes an item from the ///. The item removed from the collection. ///The ///is empty and has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection was modified /// outside of this ///instance. A call to public T Take() { T item; if (!TryTake(out item, Timeout.Infinite, CancellationToken.None)) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone)); } return item; } ///may block until an item is available to be removed. Takes an item from the ///. The item removed from the collection. ///If the ///is /// canceled or the is empty and has been marked /// as complete with regards to additions. The ///has been disposed. The underlying collection was modified /// outside of this ///instance. A call to public T Take(CancellationToken cancellationToken) { T item; if (!TryTake(out item, Timeout.Infinite, cancellationToken)) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone)); } return item; } ///may block until an item is available to be removed. /// Attempts to remove an item from the /// The item removed from the collection. ///. /// true if an item could be removed; otherwise, false. ///The ///has been disposed. The underlying collection was modified /// outside of this public bool TryTake(out T item) { return TryTake(out item, 0, CancellationToken.None); } ///instance. /// Attempts to remove an item from the /// The item removed from the collection. /// A. /// that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. ///The ///has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . The underlying collection was modified /// outside of this public bool TryTake(out T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null); } ///instance. /// Attempts to remove an item from the /// The item removed from the collection. /// The number of milliseconds to wait, or. /// (-1) to wait indefinitely. /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. ///The ///has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. The underlying collection was modified /// outside of this public bool TryTake(out T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null); } ///instance. /// Attempts to remove an item from the /// The item removed from the collection. /// The number of milliseconds to wait, or. /// A is thrown if the is /// canceled. /// (-1) to wait indefinitely. /// A cancellation token to observe. /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. ///If the ///is canceled. The ///has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. The underlying collection was modified /// outside of this public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null); } ///instance. Takes an item from the underlying data store using its IProducerConsumerCollection<T>.Take /// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false) /// trying to remove an item. If the timeout period was exhaused before successfully removing an item /// this method will return false. /// A /// The item removed from the collection. /// The number of milliseconds to wait for the collection to have an item available /// for removal, or Timeout.Infinite to wait indefinitely. /// A cancellation token to observe. /// A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token /// multiple times. ///is thrown if the is /// canceled. /// False if the collection remained empty till the timeout period was exhausted. True otherwise. ///If the ///is canceled. If the collection has been disposed. private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource) { CheckDisposed(); item = default(T); if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); //If the collection is completed then there is no need to wait. if (IsCompleted) { return false; } bool waitForSemaphoreWasSuccessful = false; // set the combined token source to the combinedToken paramater if it is not null (came from GetConsumingEnumerable) CancellationTokenSource linkedTokenSource = combinedTokenSource; try { waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(0); if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0) { // create the linked token if it is not created yet if (combinedTokenSource == null) linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token); waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token); } } //The collection became completed while waiting on the semaphore. catch (OperationCanceledException) { if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); return false; } finally { // only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it if (linkedTokenSource != null && combinedTokenSource == null) { linkedTokenSource.Dispose(); } } if (waitForSemaphoreWasSuccessful) { bool removeSucceeded = false; bool removeFaulted = true; try { //The token may have been canceled before an item arrived, so we need a check after the wait has completed. //This fixes bug #702328, case 1 of 2. cancellationToken.ThrowIfCancellationRequested(); //If an item was successfully removed from the underlying collection. removeSucceeded = m_collection.TryTake(out item); removeFaulted = false; if (!removeSucceeded) { // Check if the collection is empty which means that the collection was modified outside BlockingCollection throw new InvalidOperationException (SR.GetString(SR.BlockingCollection_Take_CollectionModified)); } } finally { // removeFaulted implies !removeSucceeded, but the reverse is not true. if (removeSucceeded) { if (m_freeNodes != null) { Debug.Assert(m_boundedCapacity != NON_BOUNDED); m_freeNodes.Release(); } } else if (removeFaulted) { m_occupiedNodes.Release(); } //Last remover will detect that it has actually removed the last item from the //collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores //so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers //but this is not a problem. if (IsCompleted) { CancelWaitingConsumers(); } } } return waitForSemaphoreWasSuccessful; } ////// Adds the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. ///instances. /// The index of the collection in the ///array to which the item was added. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. At least one underlying collection didn't accept the item. ///The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. /// If a bounded capacity was specified when all of the /// public static int AddToAny(BlockingCollectioninstances were initialized, /// a call to AddToAny may block until space is available in one of the collections /// to store the provided item. /// [] collections, T item) { #if DEBUG int tryAddAnyReturnValue = #else return #endif TryAddToAny(collections, item, Timeout.Infinite, new CancellationToken()); #if DEBUG Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length) , "TryAddToAny() was expected to return an index within the bounds of the collections array."); return tryAddAnyReturnValue; #endif } /// /// Adds the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. /// A cancellation token to observe. ///instances. /// A is thrown if the is /// canceled. /// The index of the collection in the ///array to which the item was added. If the ///is canceled. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. At least one underlying collection didn't accept the item. ///The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. /// If a bounded capacity was specified when all of the /// public static int AddToAny(BlockingCollectioninstances were initialized, /// a call to AddToAny may block until space is available in one of the collections /// to store the provided item. /// [] collections, T item, CancellationToken cancellationToken) { #if DEBUG int tryAddAnyReturnValue = #else return #endif TryAddToAny(collections, item, Timeout.Infinite, cancellationToken); #if DEBUG Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length) , "TryAddToAny() was expected to return an index within the bounds of the collections array."); return tryAddAnyReturnValue; #endif } /// /// Attempts to add the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. ///instances. /// The index of the collection in the ////// array to which the item was added, or -1 if the item could not be added. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. At least one underlying collection didn't accept the item. ///The count of public static int TryAddToAny(BlockingCollectionis greater than the maximum size of /// 62 for STA and 63 for MTA. [] collections, T item) { return TryAddToAny(collections, item, 0, new CancellationToken()); } /// /// Attempts to add the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. /// Ainstances. /// that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// The index of the collection in the ////// array to which the item was added, or -1 if the item could not be added. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . At least one underlying collection didn't accept the item. ///The count of public static int TryAddToAny(BlockingCollectionis greater than the maximum size of /// 62 for STA and 63 for MTA. [] collections, T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryAddTakeAny(collections, ref item, (int)timeout.TotalMilliseconds, OperationMode.Add, new CancellationToken()); } /// /// Attempts to add the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. /// The number of milliseconds to wait, orinstances. /// (-1) to wait indefinitely. /// The index of the collection in the ////// array to which the item was added, or -1 if the item could not be added. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. At least one underlying collection didn't accept the item. ///The count of public static int TryAddToAny(BlockingCollectionis greater than the maximum size of /// 62 for STA and 63 for MTA. [] collections, T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddTakeAny(collections, ref item, millisecondsTimeout, OperationMode.Add, new CancellationToken()); } /// /// Attempts to add the specified item to any one of the specified /// /// The array of collections. /// The item to be added to one of the collections. /// The number of milliseconds to wait, orinstances. /// A is thrown if the is /// canceled. /// (-1) to wait indefinitely. /// The index of the collection in the /// A cancellation token to observe. ////// array to which the item was added, or -1 if the item could not be added. If the ///is canceled. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. At least one of the ///instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. At least one underlying collection didn't accept the item. ///The count of public static int TryAddToAny(BlockingCollectionis greater than the maximum size of /// 62 for STA and 63 for MTA. [] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddTakeAny(collections, ref item, millisecondsTimeout, OperationMode.Add, cancellationToken); } /// Adds/Takes an item to/from anyone of the specified collections. /// A /// The collections into which the item can be added. /// The item to be added or the item removed and returned to the caller. /// The number of milliseconds to wait for a collection to accept the /// operation, or -1 to wait indefinitely. /// Indicates whether this method is called to Add or Take. /// A cancellation token to observe. ///is thrown if the is /// canceled. /// The index into collections for the collection which accepted the /// adding/removal of the item; -1 if the item could not be added/removed. ///If the ///is canceled. If the collections argument is null. ///If the collections argument is a 0-length array or contains a /// null element. Also, if atleast one of the collections has been marked complete for adds. ///If atleast one of the collections has been disposed. private static int TryAddTakeAny(BlockingCollection[] collections, ref T item, int millisecondsTimeout, OperationMode operationMode, CancellationToken externalCancellationToken) { BlockingCollection [] collectionsCopy = ValidateCollectionsArray(collections, operationMode); #if DEBUG collections = null; // Ensure we always go through collectionsCopy from now on. #endif const int OPERATION_FAILED = -1; // Copy the wait time to another local variable to update it int timeout = millisecondsTimeout; long startTimeTicks = 0; if (millisecondsTimeout != Timeout.Infinite) { startTimeTicks = DateTime.UtcNow.Ticks; } // Fast path for adding if there is at least one unbounded collection if (operationMode == OperationMode.Add) { for (int i = 0; i < collectionsCopy.Length; i++) { if (collectionsCopy[i].m_freeNodes == null) { #if DEBUG bool result = #endif collectionsCopy[i].TryAdd(item); #if DEBUG Debug.Assert(result); #endif return i; } } } // Get wait handles and the tokens for all collections CancellationToken[] collatedCancellationTokens; List handles = GetHandles(collectionsCopy, operationMode, externalCancellationToken, false, out collatedCancellationTokens); //Loop until the timeout period is exhausted or the operation was successful. while (millisecondsTimeout == Timeout.Infinite || timeout >= 0) { #region Update Timeout #endregion //Wait for any collection to become available. int index = -1; CancellationTokenSource linkedTokenSource = null; try { index = WaitHandle_WaitAny(handles, 0, externalCancellationToken, externalCancellationToken); if (index == WaitHandle.WaitTimeout) { linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens); index = WaitHandle_WaitAny(handles, timeout, linkedTokenSource.Token, externalCancellationToken); } } catch (OperationCanceledException ex) { // throw OCE if it is external cancellation if (externalCancellationToken.IsCancellationRequested) { Debug.Assert(ex.CancellationToken == externalCancellationToken); throw; } // If the Operation is TryTake, the exception will not be propagated, it will try another collection, and return false if timeout expired // or all collection completed addition if (operationMode == OperationMode.Take && millisecondsTimeout != Timeout.Infinite) { // Get the new handles and exclude the collections that completed addition handles = GetHandles(collectionsCopy, operationMode, externalCancellationToken, true, out collatedCancellationTokens); timeout = UpdateTimeOut(startTimeTicks, millisecondsTimeout); if (handles.Count == 0 || timeout == 0) { return OPERATION_FAILED; } continue; //retry for the updated handles } // Otherwise if it is Take, Add or Try Add ArgumentException will be thrown throw new ArgumentException( SR.GetString(SR.BlockingCollection_CantTakeAnyWhenDone), "collections"); } finally { if (linkedTokenSource != null) { linkedTokenSource.Dispose(); } } Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count)); //If the timeout period was not exhausted and the appropriate operation succeeded. if (index != WaitHandle.WaitTimeout) { if (operationMode == OperationMode.Add) { if (collectionsCopy[index].TryAdd(item)) { return index; } } else if (operationMode == OperationMode.Take) { if (collectionsCopy[index].TryTake(out item)) { return index; } } } else { return OPERATION_FAILED; } // Update the timeout if (millisecondsTimeout > 0) { timeout = UpdateTimeOut(startTimeTicks, millisecondsTimeout); if (timeout <= 0) { return OPERATION_FAILED; } } } return OPERATION_FAILED; } /// /// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Compeleted collections /// /// The blocking collections /// Add or Take operation /// The original CancellationToken /// True to exclude the compeleted collections /// Complete list of cancellationTokens to observe ///The collections wait handles private static ListGetHandles(BlockingCollection [] collections, OperationMode operationMode, CancellationToken externalCancellationToken, bool excludeCompleted, out CancellationToken[] cancellationTokens) { Debug.Assert(collections != null); List handlesList = new List (collections.Length); List tokensList = new List (collections.Length + 1); // + 1 for the external token tokensList.Add(externalCancellationToken); //Read the appropriate WaitHandle based on the operation mode. if (operationMode == OperationMode.Add) { for (int i = 0; i < collections.Length; i++) { if (collections[i].m_freeNodes != null) { handlesList.Add(collections[i].m_freeNodes.AvailableWaitHandle); tokensList.Add(collections[i].m_ProducersCancellationTokenSource.Token); } } } else { for (int i = 0; i < collections.Length; i++) { if (excludeCompleted && collections[i].IsCompleted) { continue; } handlesList.Add(collections[i].m_occupiedNodes.AvailableWaitHandle); tokensList.Add(collections[i].m_ConsumersCancellationTokenSource.Token); } } cancellationTokens = tokensList.ToArray(); return handlesList; } /// /// Helper to perform WaitHandle.WaitAny(.., CancellationToken) /// this should eventually appear on the WaitHandle class. /// /// /// /// /// ///private static int WaitHandle_WaitAny(List handles, int millisecondsTimeout, CancellationToken combinedToken, CancellationToken externalToken) { WaitHandle[] handlesPlusCancelToken = new WaitHandle[handles.Count + 1]; // +1 for the combinedToken handle for (int i = 0; i < handles.Count; i++) handlesPlusCancelToken[i] = handles[i]; handlesPlusCancelToken[handles.Count] = combinedToken.WaitHandle; int retVal = WaitHandle.WaitAny(handlesPlusCancelToken, millisecondsTimeout, false); if (combinedToken.IsCancellationRequested) { //Add the original token if it is external cancellation if (externalToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalToken); throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled)); } return retVal; } /// /// Helper function to measure and update the wait time /// /// The first time (in Ticks) observed when the wait started /// The orginal wait timeoutout in milliseconds ///The new wait time in milliseconds, -1 if the time expired private static int UpdateTimeOut(long startTimeTicks, int originalWaitMillisecondsTimeout) { if (originalWaitMillisecondsTimeout == 0) { return 0; } // The function must be called in case the time out is not infinite Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite); long elapsedMilliseconds = (DateTime.UtcNow.Ticks - startTimeTicks) / TimeSpan.TicksPerMillisecond; // Check the elapsed milliseconds is greater than max int because this property is long if (elapsedMilliseconds > int.MaxValue) { return 0; } // Subtract the elapsed time from the current wait time int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds; ; if (currentWaitTimeout <= 0) { return 0; } return currentWaitTimeout; } ////// Takes an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. ///instances. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TakeFromAny may block until an item is available to be removed. public static int TakeFromAny(BlockingCollection[] collections, out T item) { #if DEBUG int tryTakeAnyReturnValue = #else return #endif TryTakeFromAny(collections, out item, Timeout.Infinite); #if DEBUG Debug.Assert((tryTakeAnyReturnValue >= 0 && tryTakeAnyReturnValue < collections.Length) , "TryTakeFromAny() was expected to return an index within the bounds of the collections array."); return tryTakeAnyReturnValue; #endif } /// /// Takes an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. /// A cancellation token to observe. ///instances. /// A is thrown if the is /// canceled. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. If the ///is canceled. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TakeFromAny may block until an item is available to be removed. public static int TakeFromAny(BlockingCollection[] collections, out T item, CancellationToken cancellationToken) { #if DEBUG int tryTakeAnyReturnValue = #else return #endif TryTakeFromAny(collections, out item, Timeout.Infinite, cancellationToken); #if DEBUG Debug.Assert((tryTakeAnyReturnValue >= 0 && tryTakeAnyReturnValue < collections.Length) , "TryTakeFromAny() was expected to return an index within the bounds of the collections array."); return tryTakeAnyReturnValue; #endif } /// /// Attempts to remove an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. ///instances. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item) { return TryTakeFromAny(collections, out item, 0); } /// /// Attempts to remove an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. /// Ainstances. /// that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, TimeSpan timeout) { ValidateTimeout(timeout); T tempItem = default(T); int returnValue = TryAddTakeAny(collections, ref tempItem, (int)timeout.TotalMilliseconds, OperationMode.Take, new CancellationToken()); item = tempItem; return returnValue; } /// /// Attempts to remove an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. /// The number of milliseconds to wait, orinstances. /// (-1) to wait indefinitely. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); T tempItem = default(T); int returnValue = TryAddTakeAny(collections, ref tempItem, millisecondsTimeout, OperationMode.Take, new CancellationToken()); item = tempItem; return returnValue; } /// /// Attempts to remove an item from any one of the specified /// /// The array of collections. /// The item removed from one of the collections. /// The number of milliseconds to wait, orinstances. /// A is thrown if the is /// canceled. /// (-1) to wait indefinitely. /// A cancellation token to observe. /// The index of the collection in the ///array from which /// the item was removed, or -1 if an item could not be removed. If the ///is canceled. The ///argument is /// null. The ///argument is /// a 0-length array or contains a null element. At least one of the ///instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. At least one of the underlying collections was modified /// outside of its ///instance. The count of ///is greater than the maximum size of /// 62 for STA and 63 for MTA. A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); T tempItem = default(T); int returnValue = TryAddTakeAny(collections, ref tempItem, millisecondsTimeout, OperationMode.Take, cancellationToken); item = tempItem; return returnValue; } /// /// Marks the ///instances /// as not accepting any more additions. /// /// After a collection has been marked as complete for adding, adding to the collection is not permitted /// and attempts to remove from the collection will not wait when the collection is empty. /// ///The public void CompleteAdding() { CheckDisposed(); if (IsAddingCompleted) return; SpinWait spinner = new SpinWait(); while (true) { int observedAdders = m_currentAdders; if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0) { spinner.Reset(); // If there is another COmpleteAdding in progress waiting the current adders, then spin until it finishes while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); return; } if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders) { spinner.Reset(); while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); if (Count == 0) { CancelWaitingConsumers(); } // We should always wake waiting producers, and have them throw exceptions as // Add&CompleteAdding should not be used concurrently. CancelWaitingProducers(); return; } spinner.SpinOnce(); } } ///has been disposed. Cancels the semaphores. private void CancelWaitingConsumers() { m_ConsumersCancellationTokenSource.Cancel(); } private void CancelWaitingProducers() { m_ProducersCancellationTokenSource.Cancel(); } ////// Releases resources used by the public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } ///instance. /// /// Releases resources used by the /// Whether being disposed explicitly (true) or due to a finalizer (false). protected virtual void Dispose(bool disposing) { if (!m_isDisposed) { if (m_freeNodes != null) { m_freeNodes.Dispose(); } m_occupiedNodes.Dispose(); m_isDisposed = true; } } ///instance. /// Copies the items from the ///instance into a new array. An array containing copies of the elements of the collection. ///The ///has been disposed. /// The copied elements are not removed from the collection. /// public T[] ToArray() { CheckDisposed(); return m_collection.ToArray(); } ///Copies all of the items in the /// The one-dimensional array that is the destination of the elements copied from /// theinstance /// to a compatible one-dimensional array, starting at the specified index of the target array. /// instance. The array must have zero-based indexing. /// The zero-based index in at which copying begins. /// The ///argument is /// null. The ///argument is less than zero. The ///argument is equal to or greater /// than the length of the . The public void CopyTo(T[] array, int index) { ((ICollection)this).CopyTo(array, index); } ///has been disposed. Copies all of the items in the /// The one-dimensional array that is the destination of the elements copied from /// theinstance /// to a compatible one-dimensional array, starting at the specified index of the target array. /// instance. The array must have zero-based indexing. /// The zero-based index in at which copying begins. /// The ///argument is /// null. The ///argument is less than zero. The ///argument is equal to or greater /// than the length of the , the array is multidimensional, or the type parameter for the collection /// cannot be cast automatically to the type of the destination array. The void ICollection.CopyTo(Array array, int index) { CheckDisposed(); //We don't call m_collection.CopyTo() directly because we rely on Array.Copy method to customize //all array exceptions. T[] collectionSnapShot = m_collection.ToArray(); try { Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length); } catch (ArgumentNullException) { throw new ArgumentNullException("array"); } catch (ArgumentOutOfRangeException) { throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative)); } catch (ArgumentException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index"); } catch (RankException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array"); } catch (InvalidCastException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array"); } catch (ArrayTypeMismatchException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array"); } } ///has been disposed. Provides a consuming ///for items in the collection. An ///that removes and returns items from the collection. The public IEnumerablehas been disposed. GetConsumingEnumerable() { return GetConsumingEnumerable(CancellationToken.None); } /// Provides a consuming /// A cancellation token to observe. ///for items in the collection. /// Calling MoveNext on the returned enumerable will block if there is no data available, or will /// throw an if the is canceled. /// An ///that removes and returns items from the collection. The ///has been disposed. If the public IEnumerableis canceled. GetConsumingEnumerable(CancellationToken cancellationToken) { CancellationTokenSource linkedTokenSource = null; try { linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token); while (!IsCompleted) { T item; if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource)) { yield return item; } } } finally { if (linkedTokenSource != null) { linkedTokenSource.Dispose(); } } } /// Provides an ///for items in the collection. An ///for the items in the collection. The IEnumeratorhas been disposed. IEnumerable .GetEnumerator() { CheckDisposed(); return m_collection.GetEnumerator(); } /// Provides an ///for items in the collection. An ///for the items in the collection. The IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerablehas been disposed. )this).GetEnumerator(); } /// Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny() /// and TryTakeFromAny(). /// The collections to/from which an item should be added/removed. /// Indicates whether this method is called to Add or Take. ///A copy of the collections array that acts as a defense to prevent an “outsider” from changing /// elements of the array after we have done the validation on them. ///If the collections argument is null. ///If the collections argument is a 0-length array or contains a /// null element. Also, if atleast one of the collections has been marked complete for adds. ///If atleast one of the collections has been disposed. private static BlockingCollection[] ValidateCollectionsArray(BlockingCollection [] collections, OperationMode operationMode) { if (collections == null) { throw new ArgumentNullException("collections"); } else if (collections.Length < 1) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections"); } else if ((Thread.CurrentThread.GetApartmentState() == ApartmentState.MTA && collections.Length > 63) || (Thread.CurrentThread.GetApartmentState() == ApartmentState.STA && collections.Length > 62)) //The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, and we reserve one for CancellationToken { throw new ArgumentOutOfRangeException( "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize)); } BlockingCollection [] collectionsCopy = new BlockingCollection [collections.Length]; for (int i = 0; i < collectionsCopy.Length; ++i) { collectionsCopy[i] = collections[i]; if (collectionsCopy[i] == null) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections"); } if (collectionsCopy[i].m_isDisposed) throw new ObjectDisposedException( "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems)); if (operationMode == OperationMode.Add && collectionsCopy[i].IsAddingCompleted) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_CantTakeAnyWhenDone), "collections"); } } return collectionsCopy; } // --------- // Private Helpers. /// Centeralizes the logic of validating the timeout input argument. /// The TimeSpan to wait for to successfully complete an operation on the collection. ///If the number of millseconds represented by the timeout /// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite private static void ValidateTimeout(TimeSpan timeout) { long totalMilliseconds = (long)timeout.TotalMilliseconds; if ((totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue) && (totalMilliseconds != Timeout.Infinite)) { throw new ArgumentOutOfRangeException("timeout", timeout, String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue)); } } ///Centralizes the logic of validating the millisecondsTimeout input argument. /// The number of milliseconds to wait for to successfully complete an /// operation on the collection. ///If the number of millseconds is less than 0 and not /// equal to Timeout.Infinite. private static void ValidateMillisecondsTimeout(int millisecondsTimeout) { if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite)) { throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout, String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue)); } } ///Throws a System.ObjectDisposedException if the collection was disposed ///If the collection has been disposed. private void CheckDisposed() { if (m_isDisposed) { throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed)); } } } ///A debugger view of the blocking collection that makes it simple to browse the /// collection's contents at a point in time. ///The type of element that the BlockingCollection will hold. internal sealed class SystemThreadingCollections_BlockingCollectionDebugView{ private BlockingCollection m_blockingCollection; // The collection being viewed. /// Constructs a new debugger view object for the provided blocking collection object. /// A blocking collection to browse in the debugger. public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollectioncollection) { if (collection == null) { throw new ArgumentNullException("collection"); } m_blockingCollection = collection; } /// Returns a snapshot of the underlying collection's elements. [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] public T[] Items { get { return m_blockingCollection.ToArray(); } } } } #pragma warning restore 0420 // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- CodeTypeDeclarationCollection.cs
- Translator.cs
- OracleRowUpdatingEventArgs.cs
- FormsAuthenticationConfiguration.cs
- PeerApplicationLaunchInfo.cs
- EventLogPermissionAttribute.cs
- EventLog.cs
- HotCommands.cs
- MarkupExtensionParser.cs
- MemoryRecordBuffer.cs
- ToolBarButton.cs
- WebServiceReceive.cs
- CompareInfo.cs
- XmlSerializerVersionAttribute.cs
- RemotingServices.cs
- SqlParameter.cs
- KerberosRequestorSecurityToken.cs
- StringSorter.cs
- InputMethod.cs
- PixelFormatConverter.cs
- WorkflowTransactionService.cs
- FontConverter.cs
- StringUtil.cs
- IntermediatePolicyValidator.cs
- SafeLibraryHandle.cs
- ObjectPropertyMapping.cs
- SqlProviderManifest.cs
- ReadOnlyHierarchicalDataSourceView.cs
- WebRequestModuleElement.cs
- ProcessThread.cs
- TextEditorLists.cs
- PresentationAppDomainManager.cs
- IdnMapping.cs
- ButtonAutomationPeer.cs
- ComponentConverter.cs
- WebPartZoneCollection.cs
- DPTypeDescriptorContext.cs
- NamespaceList.cs
- GcHandle.cs
- VerticalAlignConverter.cs
- AssociationProvider.cs
- ActivityPreviewDesigner.cs
- PageAsyncTaskManager.cs
- PrintingPermissionAttribute.cs
- DisplayInformation.cs
- AlignmentYValidation.cs
- DateTime.cs
- Pair.cs
- CustomCredentialPolicy.cs
- MessageFault.cs
- GridViewDeletedEventArgs.cs
- CqlLexer.cs
- IDispatchConstantAttribute.cs
- BodyGlyph.cs
- DataGridViewRowCancelEventArgs.cs
- DeferrableContentConverter.cs
- Journaling.cs
- BaseCollection.cs
- Substitution.cs
- validation.cs
- FontInfo.cs
- DataGridViewSelectedCellsAccessibleObject.cs
- ConfigXmlText.cs
- SqlFileStream.cs
- TempEnvironment.cs
- SelectedGridItemChangedEvent.cs
- DrawingGroup.cs
- KeyPullup.cs
- KeyPressEvent.cs
- ListViewPagedDataSource.cs
- CanExpandCollapseAllConverter.cs
- BitmapInitialize.cs
- XPathDescendantIterator.cs
- TemplateModeChangedEventArgs.cs
- cookie.cs
- PreDigestedSignedInfo.cs
- Utils.cs
- MetadataImporterQuotas.cs
- EpmSyndicationContentSerializer.cs
- SafeLibraryHandle.cs
- BitmapEffectDrawing.cs
- FloatSumAggregationOperator.cs
- Decoder.cs
- StringDictionary.cs
- ExpressionNode.cs
- DeclarativeExpressionConditionDeclaration.cs
- HtmlHead.cs
- StructuredTypeEmitter.cs
- _LoggingObject.cs
- WebConfigurationHostFileChange.cs
- PhysicalOps.cs
- COM2EnumConverter.cs
- CompilerGlobalScopeAttribute.cs
- CryptoKeySecurity.cs
- DebuggerService.cs
- InertiaTranslationBehavior.cs
- EntityModelBuildProvider.cs
- Hyperlink.cs
- TranslateTransform.cs
- DataGridRelationshipRow.cs