Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / BCL / System / Collections / Concurrent / PartitionerStatic.cs / 1305376 / PartitionerStatic.cs
#pragma warning disable 0420 // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // PartitionerStatic.cs // //[....] // // A class of default partitioners for Partitioner// // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Security.Permissions; using System.Threading; using System.Diagnostics.Contracts; using System.Runtime.InteropServices; namespace System.Collections.Concurrent { // The static class Partitioners implements 3 default partitioning strategies: // 1. dynamic load balance partitioning for indexable data source (IList and arrays) // 2. static partitioning for indexable data source (IList and arrays) // 3. dynamic load balance partitioning for enumerables. Enumerables have indexes, which are the natural order // of elements, but enuemrators are not indexable // - data source of type IList/arrays have both dynamic and static partitioning, as 1 and 3. // We assume that the source data of IList/Array is not changing concurrently. // - data source of type IEnumerable can only be partitioned dynamically (load-balance) // - Dynamic partitioning methods 1 and 3 are same, both being dynamic and load-balance. But the // implementation is different for data source of IList/Array vs. IEnumerable: // * When the source collection is IList/Arrays, we use Interlocked on the shared index; // * When the source collection is IEnumerable, we use Monitor to wrap around the access to the source // enumerator. /// /// Provides common partitioning strategies for arrays, lists, and enumerables. /// ////// [HostProtection(Synchronization = true, ExternalThreading = true)] public static class Partitioner { ////// The static methods on ///are all thread-safe and may be used concurrently /// from multiple threads. However, while a created partitioner is in use, the underlying data source /// should not be modified, whether from the same thread that's using a partitioner or from a separate /// thread. /// /// Creates an orderable partitioner from an ////// instance. /// Type of the elements in source list. /// The list to be partitioned. /// /// A Boolean value that indicates whether the created partitioner should dynamically /// load balance between partitions rather than statically partition. /// ////// An orderable partitioner based on the input list. /// public static OrderablePartitionerCreate (IList list, bool loadBalance) { if (list == null) { throw new ArgumentNullException("list"); } if (loadBalance) { return (new DynamicPartitionerForIList (list)); } else { return (new StaticIndexRangePartitionerForIList (list)); } } /// /// Creates an orderable partitioner from a ///instance. /// Type of the elements in source array. /// The array to be partitioned. /// /// A Boolean value that indicates whether the created partitioner should dynamically load balance /// between partitions rather than statically partition. /// ////// An orderable partitioner based on the input array. /// public static OrderablePartitionerCreate (TSource[] array, bool loadBalance) { // This implementation uses 'ldelem' instructions for element retrieval, rather than using a // method call. if (array == null) { throw new ArgumentNullException("array"); } if (loadBalance) { return (new DynamicPartitionerForArray (array)); } else { return (new StaticIndexRangePartitionerForArray (array)); } } /// /// Creates an orderable partitioner from a ///instance. /// Type of the elements in source enumerable. /// The enumerable to be partitioned. ////// An orderable partitioner based on the input array. /// ////// The ordering used in the created partitioner is determined by the natural order of the elements /// as retrieved from the source enumerable. /// public static OrderablePartitionerCreate (IEnumerable source) { return Create (source, -1); } // Internal version that allows user to specify the maxChunkSize, rather than using the default. // Used by range partitioning methods to insure that only one range at a time is chunked. // A maxChunkSize of -1 means "use the default". internal static OrderablePartitioner Create (IEnumerable source, int maxChunkSize) { if (source == null) { throw new ArgumentNullException("source"); } // Sanity checks. If and when we make this method public, these should be converted to exceptions. Contract.Assert(maxChunkSize != 0, "maxChunkSize specified as 0."); Contract.Assert((maxChunkSize == -1) || (maxChunkSize < (1 << 29)), "maxChunkSize out of range"); return (new DynamicPartitionerForIEnumerable (source, maxChunkSize)); } #if !PFX_LEGACY_3_5 /// Creates a partitioner that chunks the user-specified range. /// The lower, inclusive bound of the range. /// The upper, exclusive bound of the range. ///A partitioner. ///The public static OrderablePartitionerargument is /// less than or equal to the argument. > Create(long fromInclusive, long toExclusive) { // How many chunks do we want to divide the range into? If this is 1, then the // answer is "one chunk per core". Generally, though, you'll achieve better // load balancing on a busy system if you make it higher than 1. int coreOversubscriptionRate = 3; if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); long rangeSize = (toExclusive - fromInclusive) / (Environment.ProcessorCount * coreOversubscriptionRate); if (rangeSize == 0) rangeSize = 1; return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time } /// Creates a partitioner that chunks the user-specified range. /// The lower, inclusive bound of the range. /// The upper, exclusive bound of the range. /// The size of each subrange. ///A partitioner. ///The ///argument is /// less than or equal to the argument. The public static OrderablePartitionerargument is /// less than or equal to 0. > Create(long fromInclusive, long toExclusive, long rangeSize) { if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize"); return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time } // Private method to parcel out range tuples. private static IEnumerable > CreateRanges(long fromInclusive, long toExclusive, long rangeSize) { // Enumerate all of the ranges long from, to; bool shouldQuit = false; for (long i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize) { from = i; try { checked { to = i + rangeSize; } } catch (OverflowException) { to = toExclusive; shouldQuit = true; } if (to > toExclusive) to = toExclusive; yield return new Tuple (from, to); } } /// Creates a partitioner that chunks the user-specified range. /// The lower, inclusive bound of the range. /// The upper, exclusive bound of the range. ///A partitioner. ///The public static OrderablePartitionerargument is /// less than or equal to the argument. > Create(int fromInclusive, int toExclusive) { // How many chunks do we want to divide the range into? If this is 1, then the // answer is "one chunk per core". Generally, though, you'll achieve better // load balancing on a busy system if you make it higher than 1. int coreOversubscriptionRate = 3; if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); int rangeSize = (toExclusive - fromInclusive) / (Environment.ProcessorCount * coreOversubscriptionRate); if (rangeSize == 0) rangeSize = 1; return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time } /// Creates a partitioner that chunks the user-specified range. /// The lower, inclusive bound of the range. /// The upper, exclusive bound of the range. /// The size of each subrange. ///A partitioner. ///The ///argument is /// less than or equal to the argument. The public static OrderablePartitionerargument is /// less than or equal to 0. > Create(int fromInclusive, int toExclusive, int rangeSize) { if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); if (rangeSize <= 0) throw new ArgumentOutOfRangeException("rangeSize"); return Partitioner.Create(CreateRanges(fromInclusive, toExclusive, rangeSize), 1); // chunk one range at a time } // Private method to parcel out range tuples. private static IEnumerable > CreateRanges(int fromInclusive, int toExclusive, int rangeSize) { // Enumerate all of the ranges int from, to; bool shouldQuit = false; for (int i = fromInclusive; (i < toExclusive) && !shouldQuit; i += rangeSize) { from = i; try { checked { to = i + rangeSize; } } catch (OverflowException) { to = toExclusive; shouldQuit = true; } if (to > toExclusive) to = toExclusive; yield return new Tuple (from, to); } } #endif #region DynamicPartitionEnumerator_Abstract class /// /// DynamicPartitionEnumerator_Abstract defines the enumerator for each partition for the dynamic load-balance /// partitioning algorithm. /// - Partition is an enumerator of KeyValuePairs, each corresponding to an item in the data source: /// the key is the index in the source collection; the value is the item itself. /// - a set of such partitions share a reader over data source. The type of the reader is specified by /// TSourceReader. /// - each partition requests a contiguous chunk of elements at a time from the source data. The chunk /// size is initially 1, and doubles every time until it reaches the maximum chunk size. /// The implementation for GrabNextChunk() method has two versions: one for data source of IndexRange /// types (IList and the array), one for data source of IEnumerable. /// - The method "Reset" is not supported for any partitioning algorithm. /// - The implementation for MoveNext() method is same for all dynanmic partitioners, so we provide it /// in this abstract class. /// ///Type of the elements in the data source ///Type of the reader on the data source //TSourceReader is // - IList, when source data is IList , the shared reader is source data itself // - TSource[], when source data is TSource[], the shared reader is source data itself // - IEnumerator , when source data is IEnumerable , and the shared reader is an // enumerator of the source data private abstract class DynamicPartitionEnumerator_Abstract : IEnumerator > { //----------------- common fields and constructor for all dynamic partitioners ----------------- //--- shared by all dervied class with souce data type: IList, Array, and IEnumerator protected readonly TSourceReader m_sharedReader; protected static int s_defaultMaxChunkSize = GetDefaultChunkSize (); //deferred allocating in MoveNext() with initial value 0, to avoid false sharing //we also use the fact that: (m_currentChunkSize==null) means MoveNext is never called on this enumerator protected Shared m_currentChunkSize; //deferring allocation in MoveNext() with initial value -1, to avoid false sharing protected Shared m_localOffset; private const int CHUNK_DOUBLING_RATE = 3; // Double the chunk size every this many grabs private int m_doublingCountdown; // Number of grabs remaining until chunk size doubles protected readonly int m_maxChunkSize; // Max chunk size specified by caller, or s_defaultMaxChunkSize // m_sharedIndex shared by this set of partitions, and particularly when m_sharedReader is IEnuerable // it serves as tracking of the natual order of elements in m_sharedReader // the value of this field is passed in from outside (already initialized) by the constructor, protected readonly Shared m_sharedIndex; protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, Shared sharedIndex) : this(sharedReader, sharedIndex, -1) { } protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, Shared sharedIndex, int maxChunkSize) { Contract.Assert((maxChunkSize == -1) || (maxChunkSize > 0), "maxChunkSize 0 or < -1"); m_sharedReader = sharedReader; m_sharedIndex = sharedIndex; if (maxChunkSize == -1) m_maxChunkSize = s_defaultMaxChunkSize; else m_maxChunkSize = maxChunkSize; } // ---------------- abstract method declarations -------------- /// /// Abstract method to request a contiguous chunk of elements from the source collection /// /// specified number of elements requested ////// true if we successfully reserved at least one element (up to #=requestedChunkSize) /// false if all elements in the source collection have been reserved. /// //GrabNextChunk does the following: // - grab # of requestedChunkSize elements from source data through shared reader, // - at the time of function returns, m_currentChunkSize is updated with the number of // elements actually got ----gined (<=requestedChunkSize). // - GrabNextChunk returns true if at least one element is assigned to this partition; // false if the shared reader already hits the last element of the source data before // we call GrabNextChunk protected abstract bool GrabNextChunk(int requestedChunkSize); ////// Abstract property, returns whether or not the shared reader has already read the last /// element of the source data /// protected abstract bool HasNoElementsLeft { get; set; } ////// Get the current element in the current partition. Property required by IEnumerator interface /// This property is abstract because the implementation is different depending on the type /// of the source data: IList, Array or IEnumerable /// public abstract KeyValuePairCurrent { get; } /// /// Dispose is abstract, and depends on the type of the source data: /// - For source data type IList and Array, the type of the shared reader is just the dataitself. /// We don't do anything in Dispose method for IList and Array. /// - For source data type IEnumerable, the type of the shared reader is an enumerator we created. /// Thus we need to dispose this shared reader enumerator, when there is no more active partitions /// left. /// public abstract void Dispose(); ////// Reset on partitions is not supported /// public void Reset() { throw new NotSupportedException(); } ////// Get the current element in the current partition. Property required by IEnumerator interface /// Object IEnumerator.Current { get { return ((DynamicPartitionEnumerator_Abstract)this).Current; } } /// /// Moves to the next element if any. /// Try current chunk first, if the current chunk do not have any elements left, then we /// attempt to grab a chunk from the source collection. /// ////// true if successfully moving to the next position; /// false otherwise, if and only if there is no more elements left in the current chunk /// AND the source collection is exhausted. /// public bool MoveNext() { //perform deferred allocating of the local variables. if (m_localOffset == null) { Contract.Assert(m_currentChunkSize == null); m_localOffset = new Shared(-1); m_currentChunkSize = new Shared (0); m_doublingCountdown = CHUNK_DOUBLING_RATE; } if (m_localOffset.Value < m_currentChunkSize.Value - 1) //attempt to grab the next element from the local chunk { m_localOffset.Value++; return true; } else //otherwise it means we exhausted the local chunk //grab a new chunk from the source enumerator { Contract.Assert(m_localOffset.Value == m_currentChunkSize.Value - 1); //set the requested chunk size to a proper value int requestedChunkSize; if (m_currentChunkSize.Value == 0) //first time grabbing from source enumerator { requestedChunkSize = 1; } else if (m_doublingCountdown > 0) { requestedChunkSize = m_currentChunkSize.Value; } else { requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize); m_doublingCountdown = CHUNK_DOUBLING_RATE; // reset } // Decrement your doubling countdown m_doublingCountdown--; Contract.Assert(requestedChunkSize > 0 && requestedChunkSize <= m_maxChunkSize); //GrabNextChunk will update the value of m_currentChunkSize if (GrabNextChunk(requestedChunkSize)) { Contract.Assert(m_currentChunkSize.Value <= requestedChunkSize && m_currentChunkSize.Value > 0); m_localOffset.Value = 0; return true; } else { return false; } } } } #endregion #region Dynamic Partitioner for source data of IEnuemrable<> type /// /// Inherits from DynamicPartitioners /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance /// of EnumerableOfPartitionsForIEnumerator defined internally /// ///Type of elements in the source data private class DynamicPartitionerForIEnumerable: OrderablePartitioner { IEnumerable m_source; int m_maxChunkSize; // a value of -1 means "use default" //constructor internal DynamicPartitionerForIEnumerable(IEnumerable source, int maxChunkSize) : base(true, false, true) { m_source = source; m_maxChunkSize = maxChunkSize; } /// /// Overrides OrderablePartitioner.GetOrderablePartitions. /// Partitions the underlying collection into the given number of orderable partitions. /// /// number of partitions requested ///A list containing override public IListenumerators. >> GetOrderablePartitions(int partitionCount) { if (partitionCount <= 0) { throw new ArgumentOutOfRangeException("partitionCount"); } IEnumerator >[] partitions = new IEnumerator >[partitionCount]; IEnumerable > partitionEnumerable = new InternalPartitionEnumerable(m_source.GetEnumerator(), m_maxChunkSize); for (int i = 0; i < partitionCount; i++) { partitions[i] = partitionEnumerable.GetEnumerator(); } return partitions; } /// /// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions /// ///a enumerable collection of orderable partitions override public IEnumerable> GetOrderableDynamicPartitions() { return new InternalPartitionEnumerable(m_source.GetEnumerator(), m_maxChunkSize); } /// /// Whether additional partitions can be created dynamically. /// override public bool SupportsDynamicPartitions { get { return true; } } #region Internal classes: InternalPartitionEnumerable, InternalPartitionEnumerator ////// Provides customized implementation for source data of IEnumerable /// Different from the counterpart for IList/Array, this enumerable maintains several additional fields /// shared by the partitions it owns, including a boolean "m_hasNoElementsLef", a shared lock, and a /// shared count "m_activePartitionCount" /// private class InternalPartitionEnumerable : IEnumerable>, IDisposable { //reader through which we access the source data private readonly IEnumerator m_sharedReader; private Shared m_sharedIndex;//initial value -1 //fields shared by all partitions that this Enumerable owns private Shared m_hasNoElementsLeft;//deferring allocation by enumerator //shared synchronization lock, created by this Enumerable private object m_sharedLock;//deferring allocation by enumerator private bool m_disposed; private Shared m_activePartitionCount; private readonly int m_maxChunkSize; internal InternalPartitionEnumerable(IEnumerator sharedReader, int maxChunkSize) { m_sharedReader = sharedReader; m_sharedIndex = new Shared (-1); m_hasNoElementsLeft = new Shared (false); m_sharedLock = new object(); m_activePartitionCount = new Shared (0); m_maxChunkSize = maxChunkSize; } public IEnumerator > GetEnumerator() { if (m_disposed) { throw new ObjectDisposedException(Environment.GetResourceString("PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed")); } else { return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex, m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount, this, m_maxChunkSize); } } IEnumerator IEnumerable.GetEnumerator() { return ((InternalPartitionEnumerable)this).GetEnumerator(); } public void Dispose() { if (!m_disposed) { m_disposed = true; m_sharedReader.Dispose(); } } } /// /// Inherits from DynamicPartitionEnumerator_Abstract directly /// Provides customized implementation for: GrabNextChunk, HasNoElementsLeft, Current, Dispose /// private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract> { //---- fields ---- //cached local copy of the current chunk private KeyValuePair [] m_localList; //defer allocating to avoid false sharing // the values of the following two fields are passed in from // outside(already initialized) by the constructor, private readonly Shared m_hasNoElementsLeft; private readonly object m_sharedLock; private readonly Shared m_activePartitionCount; private InternalPartitionEnumerable m_enumerable; //constructor internal InternalPartitionEnumerator( IEnumerator sharedReader, Shared sharedIndex, Shared hasNoElementsLeft, object sharedLock, Shared activePartitionCount, InternalPartitionEnumerable enumerable, int maxChunkSize) : base(sharedReader, sharedIndex, maxChunkSize) { m_hasNoElementsLeft = hasNoElementsLeft; m_sharedLock = sharedLock; m_enumerable = enumerable; m_activePartitionCount = activePartitionCount; Interlocked.Increment(ref m_activePartitionCount.Value); } //overriding methods /// /// Reserves a contiguous range of elements from source data /// /// specified number of elements requested ////// true if we successfully reserved at least one element (up to #=requestedChunkSize) /// false if all elements in the source collection have been reserved. /// override protected bool GrabNextChunk(int requestedChunkSize) { Contract.Assert(requestedChunkSize > 0); if (HasNoElementsLeft) { return false; } else { lock (m_sharedLock) { if (HasNoElementsLeft) { return false; } else { try { int actualChunkSize; //enumerate over source data until either we got #requestedChunkSize of elements or //MoveNext returns false for (actualChunkSize = 0; actualChunkSize < requestedChunkSize; actualChunkSize++) { if (m_sharedReader.MoveNext()) { //defer allocating to avoid false sharing if (m_localList == null) { m_localList = new KeyValuePair[m_maxChunkSize]; } Contract.Assert(m_sharedIndex != null); //already been allocated in MoveNext() before calling GrabNextChunk m_sharedIndex.Value = checked(m_sharedIndex.Value + 1); m_localList[actualChunkSize] = new KeyValuePair (m_sharedIndex.Value, m_sharedReader.Current); } else { //if MoveNext() return false, we set the flag to inform other partitions HasNoElementsLeft = true; break; } } if (actualChunkSize > 0) { m_currentChunkSize.Value = actualChunkSize; return true; } else { return false; } } catch { // If an exception occurs, don't let the other enumerators try to enumerate. // NOTE: this could instead throw an InvalidOperationException, but that would be unexpected // and not helpful to the end user. We know the root cause is being communicated already.) HasNoElementsLeft = true; throw; } } } } } /// /// Returns whether or not the shared reader has already read the last /// element of the source data /// ////// We cannot call m_sharedReader.MoveNext(), to see if it hits the last element /// or not, because we can't undo MoveNext(). Thus we need to maintain a shared /// boolean value m_hasNoElementsLeft across all partitions /// override protected bool HasNoElementsLeft { get { return m_hasNoElementsLeft.Value; } set { //we only set it from false to true once //we should never set it back in any circumstances Contract.Assert(value); Contract.Assert(!m_hasNoElementsLeft.Value); m_hasNoElementsLeft.Value = true; } } override public KeyValuePairCurrent { get { //verify that MoveNext is at least called once before Current is called if (m_currentChunkSize == null) { throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); } Contract.Assert(m_localList != null); Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value); return (m_localList[m_localOffset.Value]); } } /// /// If the current partition is to be disposed, we decrement the number of active partitions /// for the shared reader. /// If the number of active partitions becomes 0, we need to dispose the shared reader we created /// override public void Dispose() { if (Interlocked.Decrement(ref m_activePartitionCount.Value) == 0) { m_enumerable.Dispose(); } } } #endregion } #endregion #region Dynamic Partitioner for source data of IndexRange types (IList<> and Array<>) ////// Dynamic load-balance partitioner. This class is abstract and to be derived from by /// the customized partitioner classes for IList, Array, and IEnumerable /// ///Type of the elements in the source data ///Type of the source data collection private abstract class DynamicPartitionerForIndexRange_Abstract: OrderablePartitioner { // TCollection can be: IList , TSource[] and IEnumerable // Derived classes specify TCollection, and implement the abstract method GetOrderableDynamicPartitions_Factory accordingly TCollection m_data; /// /// Constructs a new orderable partitioner /// /// source data collection protected DynamicPartitionerForIndexRange_Abstract(TCollection data) : base(true, false, true) { m_data = data; } ////// Partition the source data and create an enumerable over the resulting partitions. /// /// the source data collection ///an enumerable of partitions of protected abstract IEnumerable> GetOrderableDynamicPartitions_Factory(TCollection data); /// /// Overrides OrderablePartitioner.GetOrderablePartitions. /// Partitions the underlying collection into the given number of orderable partitions. /// /// number of partitions requested ///A list containing override public IListenumerators. >> GetOrderablePartitions(int partitionCount) { if (partitionCount <= 0) { throw new ArgumentOutOfRangeException("partitionCount"); } IEnumerator >[] partitions = new IEnumerator >[partitionCount]; IEnumerable > partitionEnumerable = GetOrderableDynamicPartitions_Factory(m_data); for (int i = 0; i < partitionCount; i++) { partitions[i] = partitionEnumerable.GetEnumerator(); } return partitions; } /// /// Overrides OrderablePartitioner.GetOrderableDyanmicPartitions /// ///a enumerable collection of orderable partitions override public IEnumerable> GetOrderableDynamicPartitions() { return GetOrderableDynamicPartitions_Factory(m_data); } /// /// Whether additional partitions can be created dynamically. /// override public bool SupportsDynamicPartitions { get { return true; } } } ////// Defines dynamic partition for source data of IList and Array. /// This class inherits DynamicPartitionEnumerator_Abstract /// - implements GrabNextChunk, HasNoElementsLeft, and Dispose methods for IList and Array /// - Current property still remains abstract, implementation is different for IList and Array /// - introduces another abstract method SourceCount, which returns the number of elements in /// the source data. Implementation differs for IList and Array /// ///Type of the elements in the data source ///Type of the reader on the source data private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract: DynamicPartitionEnumerator_Abstract { //fields protected int m_startIndex; //initially zero //constructor protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, Shared sharedIndex) : base(sharedReader, sharedIndex) { } //abstract methods //the Current property is still abstract, and will be implemented by derived classes //we add another abstract method SourceCount to get the number of elements from the source reader /// /// Get the number of elements from the source reader. /// It calls IList.Count or Array.Length /// protected abstract int SourceCount { get; } //overriding methods ////// Reserves a contiguous range of elements from source data /// /// specified number of elements requested ////// true if we successfully reserved at least one element (up to #=requestedChunkSize) /// false if all elements in the source collection have been reserved. /// override protected bool GrabNextChunk(int requestedChunkSize) { Contract.Assert(requestedChunkSize > 0); while (!HasNoElementsLeft) { Contract.Assert(m_sharedIndex != null); long oldSharedIndex = m_sharedIndex.Value; if (HasNoElementsLeft) { //HasNoElementsLeft situation changed from false to true immediately //and oldSharedIndex becomes stale return false; } //there won't be overflow, because the index of IList/array is int, and we //have casted it to long. long newSharedIndex = Math.Min(SourceCount - 1, oldSharedIndex + requestedChunkSize); //the following CAS, if successful, reserves a chunk of elements [oldSharedIndex+1, newSharedIndex] //inclusive in the source collection if (Interlocked.CompareExchange(ref m_sharedIndex.Value, newSharedIndex, oldSharedIndex) == oldSharedIndex) { //set up local indexes. //m_currentChunkSize is always set to requestedChunkSize when source data had //enough elements of what we requested m_currentChunkSize.Value = (int)(newSharedIndex - oldSharedIndex); m_localOffset.Value = -1; m_startIndex = (int)(oldSharedIndex + 1); return true; } } //didn't get any element, return false; return false; } ////// Returns whether or not the shared reader has already read the last /// element of the source data /// override protected bool HasNoElementsLeft { get { Contract.Assert(m_sharedIndex != null); return m_sharedIndex.Value >= SourceCount - 1; } set { Contract.Assert(false); } } ////// For source data type IList and Array, the type of the shared reader is just the data itself. /// We don't do anything in Dispose method for IList and Array. /// override public void Dispose() { } } ////// Inherits from DynamicPartitioners /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance /// of EnumerableOfPartitionsForIList defined internally /// ///Type of elements in the source data private class DynamicPartitionerForIList: DynamicPartitionerForIndexRange_Abstract > { //constructor internal DynamicPartitionerForIList(IList source) : base(source) { } //override methods override protected IEnumerable > GetOrderableDynamicPartitions_Factory(IList m_data) { //m_data itself serves as shared reader return new InternalPartitionEnumerable(m_data); } /// /// Inherits from PartitionList_Abstract /// Provides customized implementation for source data of IList /// private class InternalPartitionEnumerable : IEnumerable> { //reader through which we access the source data private readonly IList m_sharedReader; private Shared m_sharedIndex; internal InternalPartitionEnumerable(IList sharedReader) { m_sharedReader = sharedReader; m_sharedIndex = new Shared (-1); } public IEnumerator > GetEnumerator() { return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex); } IEnumerator IEnumerable.GetEnumerator() { return ((InternalPartitionEnumerable)this).GetEnumerator(); } } /// /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract /// Provides customized implementation of SourceCount property and Current property for IList /// private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract> { //constructor internal InternalPartitionEnumerator(IList sharedReader, Shared sharedIndex) : base(sharedReader, sharedIndex) { } //overriding methods override protected int SourceCount { get { return m_sharedReader.Count; } } /// /// return a KeyValuePair of the current element and its key /// override public KeyValuePairCurrent { get { //verify that MoveNext is at least called once before Current is called if (m_currentChunkSize == null) { throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); } Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value); return new KeyValuePair (m_startIndex + m_localOffset.Value, m_sharedReader[m_startIndex + m_localOffset.Value]); } } } } /// /// Inherits from DynamicPartitioners /// Provides customized implementation of GetOrderableDynamicPartitions_Factory method, to return an instance /// of EnumerableOfPartitionsForArray defined internally /// ///Type of elements in the source data private class DynamicPartitionerForArray: DynamicPartitionerForIndexRange_Abstract { //constructor internal DynamicPartitionerForArray(TSource[] source) : base(source) { } //override methods override protected IEnumerable > GetOrderableDynamicPartitions_Factory(TSource[] m_data) { return new InternalPartitionEnumerable(m_data); } /// /// Inherits from PartitionList_Abstract /// Provides customized implementation for source data of Array /// private class InternalPartitionEnumerable : IEnumerable> { //reader through which we access the source data private readonly TSource[] m_sharedReader; private Shared m_sharedIndex; internal InternalPartitionEnumerable(TSource[] sharedReader) { m_sharedReader = sharedReader; m_sharedIndex = new Shared (-1); } IEnumerator IEnumerable.GetEnumerator() { return ((InternalPartitionEnumerable)this).GetEnumerator(); } public IEnumerator > GetEnumerator() { return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex); } } /// /// Inherits from DynamicPartitionEnumeratorForIndexRange_Abstract /// Provides customized implementation of SourceCount property and Current property for Array /// private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract{ //constructor internal InternalPartitionEnumerator(TSource[] sharedReader, Shared sharedIndex) : base(sharedReader, sharedIndex) { } //overriding methods override protected int SourceCount { get { return m_sharedReader.Length; } } override public KeyValuePair Current { get { //verify that MoveNext is at least called once before Current is called if (m_currentChunkSize == null) { throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); } Contract.Assert(m_localOffset.Value >= 0 && m_localOffset.Value < m_currentChunkSize.Value); return new KeyValuePair (m_startIndex + m_localOffset.Value, m_sharedReader[m_startIndex + m_localOffset.Value]); } } } } #endregion #region Static partitioning for IList and Array, abstract classes /// /// Static partitioning over IList. /// - dynamic and load-balance /// - Keys are ordered within each partition /// - Keys are ordered across partitions /// - Keys are normalized /// - Number of partitions is fixed once specified, and the elements of the source data are /// distributed to each partition as evenly as possible. /// ///type of the elements ///Type of the source data collection private abstract class StaticIndexRangePartitioner: OrderablePartitioner { protected StaticIndexRangePartitioner() : base(true, true, true) { } /// /// Abstract method to return the number of elements in the source data /// protected abstract int SourceCount { get; } ////// Abstract method to create a partition that covers a range over source data, /// starting from "startIndex", ending at "endIndex" /// /// start index of the current partition on the source data /// end index of the current partition on the source data ///a partition enumerator over the specified range // The partitioning algorithm is implemented in GetOrderablePartitions method // This method delegates according to source data type IList/Array protected abstract IEnumerator> CreatePartition(int startIndex, int endIndex); /// /// Overrides OrderablePartitioner.GetOrderablePartitions /// Return a list of partitions, each of which enumerate a fixed part of the source data /// The elements of the source data are distributed to each partition as evenly as possible. /// Specifically, if the total number of elements is N, and number of partitions is x, and N = a*x +b, /// where a is the quotient, and b is the remainder. Then the first b partitions each has a + 1 elements, /// and the last x-b partitions each has a elements. /// For example, if N=10, x =3, then /// partition 0 ranges [0,3], /// partition 1 ranges [4,6], /// partition 2 ranges [7,9]. /// This also takes care of the situation of (x>N), the last x-N partitions are empty enumerators. /// An empty enumerator is indicated by /// (m_startIndex == list.Count && m_endIndex == list.Count -1) /// /// specified number of partitions ///a list of partitions override public IList>> GetOrderablePartitions(int partitionCount) { if (partitionCount <= 0) { throw new ArgumentOutOfRangeException("partitionCount"); } int quotient, remainder; quotient = Math.DivRem(SourceCount, partitionCount, out remainder); IEnumerator >[] partitions = new IEnumerator >[partitionCount]; int lastEndIndex = -1; for (int i = 0; i < partitionCount; i++) { int startIndex = lastEndIndex + 1; if (i < remainder) { lastEndIndex = startIndex + quotient; } else { lastEndIndex = startIndex + quotient - 1; } partitions[i] = CreatePartition(startIndex, lastEndIndex); } return partitions; } } /// /// Static Partition for IList/Array. /// This class implements all methods required by IEnumerator interface, except for the Current property. /// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster element /// retrieval. /// //We assume the source collection is not being updated concurrently. Otherwise it will break the //static partitioning, since each partition operates on the source collection directly, it does //not have a local cache of the elements assigned to them. private abstract class StaticIndexRangePartition: IEnumerator > { //the start and end position in the source collection for the current partition //the partition is empty if and only if // (m_startIndex == m_data.Count && m_endIndex == m_data.Count-1) protected readonly int m_startIndex; protected readonly int m_endIndex; //the current index of the current partition while enumerating on the source collection protected volatile int m_offset; /// /// Constructs an instance of StaticIndexRangePartition /// /// the start index in the source collection for the current partition /// the end index in the source collection for the current partition protected StaticIndexRangePartition(int startIndex, int endIndex) { m_startIndex = startIndex; m_endIndex = endIndex; m_offset = startIndex - 1; } ////// Current Property is different for IList and Array. Arrays calls 'ldelem' instructions for faster /// element retrieval. /// public abstract KeyValuePairCurrent { get; } /// /// We don't dispose the source for IList and array /// public void Dispose() { } public void Reset() { throw new NotSupportedException(); } ////// Moves to the next item /// Before the first MoveNext is called: m_offset == m_startIndex-1; /// ///true if successful, false if there is no item left public bool MoveNext() { if (m_offset < m_endIndex) { m_offset++; return true; } else { //After we have enumerated over all elements, we set m_offset to m_endIndex +1. //The reason we do this is, for an empty enumerator, we need to tell the Current //property whether MoveNext has been called or not. //For an empty enumerator, it starts with (m_offset == m_startIndex-1 == m_endIndex), //and we don't set a new value to m_offset, then the above condition will always be //true, and the Current property will mistakenly assume MoveNext is never called. m_offset = m_endIndex + 1; return false; } } Object IEnumerator.Current { get { return ((StaticIndexRangePartition)this).Current; } } } #endregion #region Static partitioning for IList /// /// Inherits from StaticIndexRangePartitioner /// Provides customized implementation of SourceCount and CreatePartition /// ///private class StaticIndexRangePartitionerForIList : StaticIndexRangePartitioner > { IList m_list; internal StaticIndexRangePartitionerForIList(IList list) : base() { Contract.Assert(list != null); m_list = list; } override protected int SourceCount { get { return m_list.Count; } } override protected IEnumerator > CreatePartition(int startIndex, int endIndex) { return new StaticIndexRangePartitionForIList (m_list, startIndex, endIndex); } } /// /// Inherits from StaticIndexRangePartition /// Provides customized implementation of Current property /// ///private class StaticIndexRangePartitionForIList : StaticIndexRangePartition { //the source collection shared by all partitions private volatile IList m_list; internal StaticIndexRangePartitionForIList(IList list, int startIndex, int endIndex) : base(startIndex, endIndex) { Contract.Assert(startIndex >= 0 && endIndex <= list.Count - 1); m_list = list; } override public KeyValuePair Current { get { //verify that MoveNext is at least called once before Current is called if (m_offset < m_startIndex) { throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); } Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex); return (new KeyValuePair (m_offset, m_list[m_offset])); } } } #endregion #region static partitioning for Arrays /// /// Inherits from StaticIndexRangePartitioner /// Provides customized implementation of SourceCount and CreatePartition for Array /// private class StaticIndexRangePartitionerForArray: StaticIndexRangePartitioner { TSource[] m_array; internal StaticIndexRangePartitionerForArray(TSource[] array) : base() { Contract.Assert(array != null); m_array = array; } override protected int SourceCount { get { return m_array.Length; } } override protected IEnumerator > CreatePartition(int startIndex, int endIndex) { return new StaticIndexRangePartitionForArray (m_array, startIndex, endIndex); } } /// /// Inherits from StaticIndexRangePartitioner /// Provides customized implementation of SourceCount and CreatePartition /// private class StaticIndexRangePartitionForArray: StaticIndexRangePartition { //the source collection shared by all partitions private volatile TSource[] m_array; internal StaticIndexRangePartitionForArray(TSource[] array, int startIndex, int endIndex) : base(startIndex, endIndex) { Contract.Assert(startIndex >= 0 && endIndex <= array.Length - 1); m_array = array; } override public KeyValuePair Current { get { //verify that MoveNext is at least called once before Current is called if (m_offset < m_startIndex) { throw new InvalidOperationException(Environment.GetResourceString("PartitionerStatic_CurrentCalledBeforeMoveNext")); } Contract.Assert(m_offset >= m_startIndex && m_offset <= m_endIndex); return (new KeyValuePair (m_offset, m_array[m_offset])); } } } #endregion #region Utility functions /// /// A very simple primitive that allows us to share a value across multiple threads. /// ///private class Shared { internal TSource Value; internal Shared(TSource value) { this.Value = value; } } //-------------------- // The following part calculates the default chunk size. It is copied from System.Linq.Parallel.Scheduling, // because mscorlib.dll cannot access System.Linq.Parallel.Scheduling //-------------------- // The number of bytes we want "chunks" to be, when partitioning, etc. We choose 4 cache // lines worth, assuming 128b cache line. Most (popular) architectures use 64b cache lines, // but choosing 128b works for 64b too whereas a multiple of 64b isn't necessarily sufficient // for 128b cache systems. So 128b it is. private const int DEFAULT_BYTES_PER_CHUNK = 128 * 4; private static int GetDefaultChunkSize () { int chunkSize; if (typeof(TSource).IsValueType) { // @ if (typeof(TSource).StructLayoutAttribute.Value == LayoutKind.Explicit) { chunkSize = Math.Max(1, DEFAULT_BYTES_PER_CHUNK / Marshal.SizeOf(typeof(TSource))); } else { // We choose '128' because this ensures, no matter the actual size of the value type, // the total bytes used will be a multiple of 128. This ensures it's cache aligned. chunkSize = 128; } } else { Contract.Assert((DEFAULT_BYTES_PER_CHUNK % IntPtr.Size) == 0, "bytes per chunk should be a multiple of pointer size"); chunkSize = (DEFAULT_BYTES_PER_CHUNK / IntPtr.Size); } return chunkSize; } #endregion } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- MergeFilterQuery.cs
- Schema.cs
- StagingAreaInputItem.cs
- Int32AnimationBase.cs
- LogManagementAsyncResult.cs
- ComplexLine.cs
- MissingFieldException.cs
- CustomTypeDescriptor.cs
- DataGridViewCell.cs
- DeobfuscatingStream.cs
- Math.cs
- CodeAttributeArgument.cs
- AlphaSortedEnumConverter.cs
- Helpers.cs
- BCLDebug.cs
- Button.cs
- ContractsBCL.cs
- SqlDataReaderSmi.cs
- UpdateExpressionVisitor.cs
- ObjectTypeMapping.cs
- EventHandlersStore.cs
- DocumentEventArgs.cs
- UIServiceHelper.cs
- DataComponentNameHandler.cs
- FamilyTypefaceCollection.cs
- WebScriptEndpoint.cs
- OleDbEnumerator.cs
- WpfPayload.cs
- DecimalConverter.cs
- SystemIcons.cs
- DownloadProgressEventArgs.cs
- GregorianCalendarHelper.cs
- BuildResult.cs
- PersonalizationState.cs
- HttpListenerException.cs
- Compiler.cs
- BaseParagraph.cs
- SpnegoTokenAuthenticator.cs
- FontStyleConverter.cs
- FlowchartDesigner.xaml.cs
- X509Certificate2.cs
- CertificateManager.cs
- ListBindingHelper.cs
- HostProtectionPermission.cs
- ValueUnavailableException.cs
- StreamWithDictionary.cs
- ReturnType.cs
- Internal.cs
- TreeView.cs
- ListBindableAttribute.cs
- SendingRequestEventArgs.cs
- InputLanguageProfileNotifySink.cs
- FormatSettings.cs
- TextParagraphCache.cs
- TextFormatter.cs
- DictionaryTraceRecord.cs
- Bold.cs
- CustomAttribute.cs
- ClientConvert.cs
- CatalogZoneBase.cs
- ObjectAnimationUsingKeyFrames.cs
- XsdDataContractExporter.cs
- UIntPtr.cs
- ImageAnimator.cs
- TableCell.cs
- SQLBytes.cs
- CheckBoxDesigner.cs
- SecureEnvironment.cs
- ToolboxItemSnapLineBehavior.cs
- SerialStream.cs
- GridView.cs
- MappableObjectManager.cs
- HttpProtocolReflector.cs
- PhoneCall.cs
- OciHandle.cs
- GACIdentityPermission.cs
- EntitySetDataBindingList.cs
- TransactionProtocolConverter.cs
- MobileTextWriter.cs
- BitmapScalingModeValidation.cs
- IPAddress.cs
- ExceptionUtil.cs
- OperatingSystem.cs
- EmptyCollection.cs
- ChangeConflicts.cs
- ListViewItemSelectionChangedEvent.cs
- DbReferenceCollection.cs
- CellConstantDomain.cs
- BitmapSource.cs
- ExpressionVisitor.cs
- ImageMap.cs
- XmlSerializationReader.cs
- SqlColumnizer.cs
- ParserOptions.cs
- ProxySimple.cs
- WebPartDisplayModeCancelEventArgs.cs
- XmlSchemaImporter.cs
- XamlFrame.cs
- PropertyKey.cs
- XmlKeywords.cs