Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / BCL / System / Threading / Tasks / ParallelRangeManager.cs / 1305376 / ParallelRangeManager.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // ParallelRangeManager.cs // //[....] // // Implements the algorithm for distributing loop indices to parallel loop workers // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System; using System.Threading; using System.Diagnostics.Contracts; #pragma warning disable 0420 namespace System.Threading.Tasks { ////// Utility class for allocating structs as heap variables /// internal class Shared{ internal T Value; internal Shared(T value) { this.Value = value; } } /// /// Represents an index range /// internal struct IndexRange { // the From and To values for this range. These do not change. internal long m_nFromInclusive; internal long m_nToExclusive; // The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual // value saves us from overflows that can happen due to multiple workers racing to increment this. // All updates to this field need to be interlocked. internal Sharedm_nSharedCurrentIndexOffset; // to be set to 1 by the worker that finishes this range. It's OK to do a non-interlocked write here. internal int m_bRangeFinished; } /// /// The RangeWorker struct wraps the state needed by a task that services the parallel loop /// internal struct RangeWorker { // reference to the IndexRange array allocated by the range manager internal readonly IndexRange[] m_indexRanges; // index of the current index range that this worker is grabbing chunks from internal int m_nCurrentIndexRange; // the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager) internal long m_nStep; // increment value is the current amount that this worker will use // to increment the shared index of the range it's working on internal long m_nIncrementValue; // the increment value is doubled each time this worker finds work, and is capped at this value internal readonly long m_nMaxIncrementValue; ////// Initializes a RangeWorker struct /// internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep) { m_indexRanges = ranges; m_nCurrentIndexRange = nInitialRange; m_nStep = nStep; m_nIncrementValue = nStep; m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep; } ////// Implements the core work search algorithm that will be used for this range worker. /// /// /// Usage pattern is: /// 1) the thread associated with this rangeworker calls FindNewWork /// 2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values /// to execute the sequential loop /// 3) if we return false it means there is no more work left. It's time to quit. /// internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal) { // since we iterate over index ranges circularly, we will use the // count of visited ranges as our exit condition int numIndexRangesToVisit = m_indexRanges.Length; do { // local snap to save array access bounds checks in places where we only read fields IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange]; if (currentRange.m_bRangeFinished == 0) { if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null) { Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared(0), null); } // this access needs to be on the array slot long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value, m_nIncrementValue) - m_nIncrementValue; if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset) { // we found work nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset; nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue; // Check for going past end of range, or wrapping if ( (nToExclusiveLocal > currentRange.m_nToExclusive) || (nToExclusiveLocal < currentRange.m_nFromInclusive) ) { nToExclusiveLocal = currentRange.m_nToExclusive; } // We will double our unit of increment until it reaches the maximum. if (m_nIncrementValue < m_nMaxIncrementValue) { m_nIncrementValue *= 2; if (m_nIncrementValue > m_nMaxIncrementValue) { m_nIncrementValue = m_nMaxIncrementValue; } } return true; } else { // this index range is completed, mark it so that others can skip it quickly Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1); } } // move on to the next index range, in circular order. m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length; numIndexRangesToVisit--; } while (numIndexRangesToVisit > 0); // we've visited all index ranges possible => there's no work remaining nFromInclusiveLocal = 0; nToExclusiveLocal = 0; return false; } /// /// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values. /// internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32) { long nFromInclusiveLocal; long nToExclusiveLocal; bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal); Contract.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) && (nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue)); // convert to 32 bit before returning nFromInclusiveLocal32 = (int)nFromInclusiveLocal; nToExclusiveLocal32 = (int)nToExclusiveLocal; return bRetVal; } } ////// Represents the entire loop operation, keeping track of workers and ranges. /// /// /// The usage pattern is: /// 1) The Parallel loop entry function (ForWorker) creates an instance of this class /// 2) Every thread joining to service the parallel loop calls RegisterWorker to grab a /// RangeWorker struct to wrap the state it will need to find and execute work, /// and they keep interacting with that struct until the end of the loop internal class RangeManager { internal readonly IndexRange[] m_indexRanges; internal int m_nCurrentIndexRangeToAssign; internal long m_nStep; ////// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges /// internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers) { m_nCurrentIndexRangeToAssign = 0; m_nStep = nStep; // Our signed math breaks down w/ nNumExpectedWorkers == 1. So change it to 2. if (nNumExpectedWorkers == 1) nNumExpectedWorkers = 2; // // calculate the size of each index range // ulong uSpan = (ulong)(nToExclusive - nFromInclusive); ulong uRangeSize = uSpan / (ulong) nNumExpectedWorkers; // rough estimate first uRangeSize -= uRangeSize % (ulong) nStep; // snap to multiples of nStep // otherwise index range transitions will derail us from nStep if (uRangeSize == 0) { uRangeSize = (ulong) nStep; } // // find the actual number of index ranges we will need // Contract.Assert((uSpan / uRangeSize) < Int32.MaxValue); int nNumRanges = (int)(uSpan / uRangeSize); if (uSpan % uRangeSize != 0) { nNumRanges++; } // Convert to signed so the rest of the logic works. // Should be fine so long as uRangeSize < Int64.MaxValue, which we guaranteed by setting #workers >= 2. long nRangeSize = (long)uRangeSize; // allocate the array of index ranges m_indexRanges = new IndexRange[nNumRanges]; long nCurrentIndex = nFromInclusive; for (int i = 0; i < nNumRanges; i++) { // the fromInclusive of the new index range is always on nCurrentIndex m_indexRanges[i].m_nFromInclusive = nCurrentIndex; m_indexRanges[i].m_nSharedCurrentIndexOffset = null; m_indexRanges[i].m_bRangeFinished = 0; // now increment it to find the toExclusive value for our range nCurrentIndex += nRangeSize; // detect integer overflow or range overage and snap to nToExclusive if (nCurrentIndex < nCurrentIndex - nRangeSize || nCurrentIndex > nToExclusive) { // this should only happen at the last index Contract.Assert(i == nNumRanges - 1); nCurrentIndex = nToExclusive; } // now that the end point of the new range is calculated, assign it. m_indexRanges[i].m_nToExclusive = nCurrentIndex; } } ////// The function that needs to be called by each new worker thread servicing the parallel loop /// in order to get a RangeWorker struct that wraps the state for finding and executing indices /// internal RangeWorker RegisterNewWorker() { Contract.Assert(m_indexRanges != null && m_indexRanges.Length != 0); int nInitialRange = (Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1) % m_indexRanges.Length; return new RangeWorker(m_indexRanges, nInitialRange, m_nStep); } } } #pragma warning restore 0420 // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // ParallelRangeManager.cs // //[....] // // Implements the algorithm for distributing loop indices to parallel loop workers // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System; using System.Threading; using System.Diagnostics.Contracts; #pragma warning disable 0420 namespace System.Threading.Tasks { ////// Utility class for allocating structs as heap variables /// internal class Shared{ internal T Value; internal Shared(T value) { this.Value = value; } } /// /// Represents an index range /// internal struct IndexRange { // the From and To values for this range. These do not change. internal long m_nFromInclusive; internal long m_nToExclusive; // The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual // value saves us from overflows that can happen due to multiple workers racing to increment this. // All updates to this field need to be interlocked. internal Sharedm_nSharedCurrentIndexOffset; // to be set to 1 by the worker that finishes this range. It's OK to do a non-interlocked write here. internal int m_bRangeFinished; } /// /// The RangeWorker struct wraps the state needed by a task that services the parallel loop /// internal struct RangeWorker { // reference to the IndexRange array allocated by the range manager internal readonly IndexRange[] m_indexRanges; // index of the current index range that this worker is grabbing chunks from internal int m_nCurrentIndexRange; // the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager) internal long m_nStep; // increment value is the current amount that this worker will use // to increment the shared index of the range it's working on internal long m_nIncrementValue; // the increment value is doubled each time this worker finds work, and is capped at this value internal readonly long m_nMaxIncrementValue; ////// Initializes a RangeWorker struct /// internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep) { m_indexRanges = ranges; m_nCurrentIndexRange = nInitialRange; m_nStep = nStep; m_nIncrementValue = nStep; m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep; } ////// Implements the core work search algorithm that will be used for this range worker. /// /// /// Usage pattern is: /// 1) the thread associated with this rangeworker calls FindNewWork /// 2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values /// to execute the sequential loop /// 3) if we return false it means there is no more work left. It's time to quit. /// internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal) { // since we iterate over index ranges circularly, we will use the // count of visited ranges as our exit condition int numIndexRangesToVisit = m_indexRanges.Length; do { // local snap to save array access bounds checks in places where we only read fields IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange]; if (currentRange.m_bRangeFinished == 0) { if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null) { Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared(0), null); } // this access needs to be on the array slot long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value, m_nIncrementValue) - m_nIncrementValue; if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset) { // we found work nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset; nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue; // Check for going past end of range, or wrapping if ( (nToExclusiveLocal > currentRange.m_nToExclusive) || (nToExclusiveLocal < currentRange.m_nFromInclusive) ) { nToExclusiveLocal = currentRange.m_nToExclusive; } // We will double our unit of increment until it reaches the maximum. if (m_nIncrementValue < m_nMaxIncrementValue) { m_nIncrementValue *= 2; if (m_nIncrementValue > m_nMaxIncrementValue) { m_nIncrementValue = m_nMaxIncrementValue; } } return true; } else { // this index range is completed, mark it so that others can skip it quickly Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1); } } // move on to the next index range, in circular order. m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length; numIndexRangesToVisit--; } while (numIndexRangesToVisit > 0); // we've visited all index ranges possible => there's no work remaining nFromInclusiveLocal = 0; nToExclusiveLocal = 0; return false; } /// /// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values. /// internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32) { long nFromInclusiveLocal; long nToExclusiveLocal; bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal); Contract.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) && (nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue)); // convert to 32 bit before returning nFromInclusiveLocal32 = (int)nFromInclusiveLocal; nToExclusiveLocal32 = (int)nToExclusiveLocal; return bRetVal; } } ////// Represents the entire loop operation, keeping track of workers and ranges. /// /// /// The usage pattern is: /// 1) The Parallel loop entry function (ForWorker) creates an instance of this class /// 2) Every thread joining to service the parallel loop calls RegisterWorker to grab a /// RangeWorker struct to wrap the state it will need to find and execute work, /// and they keep interacting with that struct until the end of the loop internal class RangeManager { internal readonly IndexRange[] m_indexRanges; internal int m_nCurrentIndexRangeToAssign; internal long m_nStep; ////// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges /// internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers) { m_nCurrentIndexRangeToAssign = 0; m_nStep = nStep; // Our signed math breaks down w/ nNumExpectedWorkers == 1. So change it to 2. if (nNumExpectedWorkers == 1) nNumExpectedWorkers = 2; // // calculate the size of each index range // ulong uSpan = (ulong)(nToExclusive - nFromInclusive); ulong uRangeSize = uSpan / (ulong) nNumExpectedWorkers; // rough estimate first uRangeSize -= uRangeSize % (ulong) nStep; // snap to multiples of nStep // otherwise index range transitions will derail us from nStep if (uRangeSize == 0) { uRangeSize = (ulong) nStep; } // // find the actual number of index ranges we will need // Contract.Assert((uSpan / uRangeSize) < Int32.MaxValue); int nNumRanges = (int)(uSpan / uRangeSize); if (uSpan % uRangeSize != 0) { nNumRanges++; } // Convert to signed so the rest of the logic works. // Should be fine so long as uRangeSize < Int64.MaxValue, which we guaranteed by setting #workers >= 2. long nRangeSize = (long)uRangeSize; // allocate the array of index ranges m_indexRanges = new IndexRange[nNumRanges]; long nCurrentIndex = nFromInclusive; for (int i = 0; i < nNumRanges; i++) { // the fromInclusive of the new index range is always on nCurrentIndex m_indexRanges[i].m_nFromInclusive = nCurrentIndex; m_indexRanges[i].m_nSharedCurrentIndexOffset = null; m_indexRanges[i].m_bRangeFinished = 0; // now increment it to find the toExclusive value for our range nCurrentIndex += nRangeSize; // detect integer overflow or range overage and snap to nToExclusive if (nCurrentIndex < nCurrentIndex - nRangeSize || nCurrentIndex > nToExclusive) { // this should only happen at the last index Contract.Assert(i == nNumRanges - 1); nCurrentIndex = nToExclusive; } // now that the end point of the new range is calculated, assign it. m_indexRanges[i].m_nToExclusive = nCurrentIndex; } } ////// The function that needs to be called by each new worker thread servicing the parallel loop /// in order to get a RangeWorker struct that wraps the state for finding and executing indices /// internal RangeWorker RegisterNewWorker() { Contract.Assert(m_indexRanges != null && m_indexRanges.Length != 0); int nInitialRange = (Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1) % m_indexRanges.Length; return new RangeWorker(m_indexRanges, nInitialRange, m_nStep); } } } #pragma warning restore 0420 // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- WebPartConnectionsEventArgs.cs
- coordinator.cs
- WorkflowExecutor.cs
- TemplateBindingExtensionConverter.cs
- InstanceDataCollection.cs
- HwndSourceKeyboardInputSite.cs
- CompositionTarget.cs
- SystemTcpStatistics.cs
- Image.cs
- CompilerState.cs
- ElementAtQueryOperator.cs
- ColorAnimationUsingKeyFrames.cs
- ComponentRenameEvent.cs
- ObjectManager.cs
- controlskin.cs
- DPCustomTypeDescriptor.cs
- CLRBindingWorker.cs
- CategoryList.cs
- VBIdentifierDesigner.xaml.cs
- ResourceManager.cs
- LifetimeServices.cs
- LazyTextWriterCreator.cs
- DataGridViewTextBoxCell.cs
- columnmapfactory.cs
- SQLByteStorage.cs
- WindowsComboBox.cs
- BulletDecorator.cs
- Size3D.cs
- CodeTypeReferenceSerializer.cs
- ObjectQueryProvider.cs
- ErrorStyle.cs
- StructureChangedEventArgs.cs
- WebPartZoneCollection.cs
- LinkClickEvent.cs
- MulticastIPAddressInformationCollection.cs
- Figure.cs
- _HTTPDateParse.cs
- Point3DValueSerializer.cs
- webclient.cs
- Lookup.cs
- Image.cs
- CipherData.cs
- BitVector32.cs
- CustomUserNameSecurityTokenAuthenticator.cs
- MailDefinitionBodyFileNameEditor.cs
- DocumentsTrace.cs
- CustomTypeDescriptor.cs
- TypeGeneratedEventArgs.cs
- DataBoundControlHelper.cs
- ConstraintCollection.cs
- Trace.cs
- DataControlFieldHeaderCell.cs
- HttpInputStream.cs
- MouseGestureValueSerializer.cs
- DataViewListener.cs
- FontConverter.cs
- ObjectAnimationUsingKeyFrames.cs
- ListManagerBindingsCollection.cs
- iisPickupDirectory.cs
- AutomationIdentifier.cs
- CopyOfAction.cs
- CursorInteropHelper.cs
- Win32PrintDialog.cs
- NumericUpDown.cs
- PointAnimationClockResource.cs
- MatrixValueSerializer.cs
- TableCellCollection.cs
- ComponentDispatcher.cs
- SqlBuffer.cs
- PolicyException.cs
- StatusBarPanel.cs
- ReadOnlyHierarchicalDataSource.cs
- HttpListener.cs
- MultipartIdentifier.cs
- CodeArrayCreateExpression.cs
- AuthenticateEventArgs.cs
- DefaultValueConverter.cs
- ResXFileRef.cs
- URLMembershipCondition.cs
- FormsAuthenticationUserCollection.cs
- FormatControl.cs
- ClientApiGenerator.cs
- latinshape.cs
- UrlMappingsSection.cs
- SQLInt32.cs
- MenuItemCollectionEditor.cs
- PolicyManager.cs
- AssemblyAttributesGoHere.cs
- CommonProperties.cs
- InstanceLockQueryResult.cs
- IncomingWebResponseContext.cs
- Baml6ConstructorInfo.cs
- PolicyException.cs
- TemplatePropertyEntry.cs
- CookieHandler.cs
- SettingsPropertyNotFoundException.cs
- SID.cs
- XPathDocumentIterator.cs
- NamespaceEmitter.cs
- MemoryMappedFile.cs