Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / BCL / System / Threading / Tasks / ParallelRangeManager.cs / 1305376 / ParallelRangeManager.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ParallelRangeManager.cs
//
// [....]
//
// Implements the algorithm for distributing loop indices to parallel loop workers
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Threading;
using System.Diagnostics.Contracts;
#pragma warning disable 0420
namespace System.Threading.Tasks
{
///
/// Utility class for allocating structs as heap variables
///
internal class Shared
{
internal T Value;
internal Shared(T value)
{
this.Value = value;
}
}
///
/// Represents an index range
///
internal struct IndexRange
{
// the From and To values for this range. These do not change.
internal long m_nFromInclusive;
internal long m_nToExclusive;
// The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual
// value saves us from overflows that can happen due to multiple workers racing to increment this.
// All updates to this field need to be interlocked.
internal Shared m_nSharedCurrentIndexOffset;
// to be set to 1 by the worker that finishes this range. It's OK to do a non-interlocked write here.
internal int m_bRangeFinished;
}
///
/// The RangeWorker struct wraps the state needed by a task that services the parallel loop
///
internal struct RangeWorker
{
// reference to the IndexRange array allocated by the range manager
internal readonly IndexRange[] m_indexRanges;
// index of the current index range that this worker is grabbing chunks from
internal int m_nCurrentIndexRange;
// the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager)
internal long m_nStep;
// increment value is the current amount that this worker will use
// to increment the shared index of the range it's working on
internal long m_nIncrementValue;
// the increment value is doubled each time this worker finds work, and is capped at this value
internal readonly long m_nMaxIncrementValue;
///
/// Initializes a RangeWorker struct
///
internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep)
{
m_indexRanges = ranges;
m_nCurrentIndexRange = nInitialRange;
m_nStep = nStep;
m_nIncrementValue = nStep;
m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep;
}
///
/// Implements the core work search algorithm that will be used for this range worker.
///
///
/// Usage pattern is:
/// 1) the thread associated with this rangeworker calls FindNewWork
/// 2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values
/// to execute the sequential loop
/// 3) if we return false it means there is no more work left. It's time to quit.
///
internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal)
{
// since we iterate over index ranges circularly, we will use the
// count of visited ranges as our exit condition
int numIndexRangesToVisit = m_indexRanges.Length;
do
{
// local snap to save array access bounds checks in places where we only read fields
IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange];
if (currentRange.m_bRangeFinished == 0)
{
if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null)
{
Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared(0), null);
}
// this access needs to be on the array slot
long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value,
m_nIncrementValue) - m_nIncrementValue;
if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset)
{
// we found work
nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset;
nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue;
// Check for going past end of range, or wrapping
if ( (nToExclusiveLocal > currentRange.m_nToExclusive) || (nToExclusiveLocal < currentRange.m_nFromInclusive) )
{
nToExclusiveLocal = currentRange.m_nToExclusive;
}
// We will double our unit of increment until it reaches the maximum.
if (m_nIncrementValue < m_nMaxIncrementValue)
{
m_nIncrementValue *= 2;
if (m_nIncrementValue > m_nMaxIncrementValue)
{
m_nIncrementValue = m_nMaxIncrementValue;
}
}
return true;
}
else
{
// this index range is completed, mark it so that others can skip it quickly
Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1);
}
}
// move on to the next index range, in circular order.
m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length;
numIndexRangesToVisit--;
} while (numIndexRangesToVisit > 0);
// we've visited all index ranges possible => there's no work remaining
nFromInclusiveLocal = 0;
nToExclusiveLocal = 0;
return false;
}
///
/// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values.
///
internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32)
{
long nFromInclusiveLocal;
long nToExclusiveLocal;
bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal);
Contract.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) &&
(nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue));
// convert to 32 bit before returning
nFromInclusiveLocal32 = (int)nFromInclusiveLocal;
nToExclusiveLocal32 = (int)nToExclusiveLocal;
return bRetVal;
}
}
///
/// Represents the entire loop operation, keeping track of workers and ranges.
///
///
/// The usage pattern is:
/// 1) The Parallel loop entry function (ForWorker) creates an instance of this class
/// 2) Every thread joining to service the parallel loop calls RegisterWorker to grab a
/// RangeWorker struct to wrap the state it will need to find and execute work,
/// and they keep interacting with that struct until the end of the loop
internal class RangeManager
{
internal readonly IndexRange[] m_indexRanges;
internal int m_nCurrentIndexRangeToAssign;
internal long m_nStep;
///
/// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges
///
internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
{
m_nCurrentIndexRangeToAssign = 0;
m_nStep = nStep;
// Our signed math breaks down w/ nNumExpectedWorkers == 1. So change it to 2.
if (nNumExpectedWorkers == 1)
nNumExpectedWorkers = 2;
//
// calculate the size of each index range
//
ulong uSpan = (ulong)(nToExclusive - nFromInclusive);
ulong uRangeSize = uSpan / (ulong) nNumExpectedWorkers; // rough estimate first
uRangeSize -= uRangeSize % (ulong) nStep; // snap to multiples of nStep
// otherwise index range transitions will derail us from nStep
if (uRangeSize == 0)
{
uRangeSize = (ulong) nStep;
}
//
// find the actual number of index ranges we will need
//
Contract.Assert((uSpan / uRangeSize) < Int32.MaxValue);
int nNumRanges = (int)(uSpan / uRangeSize);
if (uSpan % uRangeSize != 0)
{
nNumRanges++;
}
// Convert to signed so the rest of the logic works.
// Should be fine so long as uRangeSize < Int64.MaxValue, which we guaranteed by setting #workers >= 2.
long nRangeSize = (long)uRangeSize;
// allocate the array of index ranges
m_indexRanges = new IndexRange[nNumRanges];
long nCurrentIndex = nFromInclusive;
for (int i = 0; i < nNumRanges; i++)
{
// the fromInclusive of the new index range is always on nCurrentIndex
m_indexRanges[i].m_nFromInclusive = nCurrentIndex;
m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
m_indexRanges[i].m_bRangeFinished = 0;
// now increment it to find the toExclusive value for our range
nCurrentIndex += nRangeSize;
// detect integer overflow or range overage and snap to nToExclusive
if (nCurrentIndex < nCurrentIndex - nRangeSize ||
nCurrentIndex > nToExclusive)
{
// this should only happen at the last index
Contract.Assert(i == nNumRanges - 1);
nCurrentIndex = nToExclusive;
}
// now that the end point of the new range is calculated, assign it.
m_indexRanges[i].m_nToExclusive = nCurrentIndex;
}
}
///
/// The function that needs to be called by each new worker thread servicing the parallel loop
/// in order to get a RangeWorker struct that wraps the state for finding and executing indices
///
internal RangeWorker RegisterNewWorker()
{
Contract.Assert(m_indexRanges != null && m_indexRanges.Length != 0);
int nInitialRange = (Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1) % m_indexRanges.Length;
return new RangeWorker(m_indexRanges, nInitialRange, m_nStep);
}
}
}
#pragma warning restore 0420
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ParallelRangeManager.cs
//
// [....]
//
// Implements the algorithm for distributing loop indices to parallel loop workers
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Threading;
using System.Diagnostics.Contracts;
#pragma warning disable 0420
namespace System.Threading.Tasks
{
///
/// Utility class for allocating structs as heap variables
///
internal class Shared
{
internal T Value;
internal Shared(T value)
{
this.Value = value;
}
}
///
/// Represents an index range
///
internal struct IndexRange
{
// the From and To values for this range. These do not change.
internal long m_nFromInclusive;
internal long m_nToExclusive;
// The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual
// value saves us from overflows that can happen due to multiple workers racing to increment this.
// All updates to this field need to be interlocked.
internal Shared m_nSharedCurrentIndexOffset;
// to be set to 1 by the worker that finishes this range. It's OK to do a non-interlocked write here.
internal int m_bRangeFinished;
}
///
/// The RangeWorker struct wraps the state needed by a task that services the parallel loop
///
internal struct RangeWorker
{
// reference to the IndexRange array allocated by the range manager
internal readonly IndexRange[] m_indexRanges;
// index of the current index range that this worker is grabbing chunks from
internal int m_nCurrentIndexRange;
// the step for this loop. Duplicated here for quick access (rather than jumping to rangemanager)
internal long m_nStep;
// increment value is the current amount that this worker will use
// to increment the shared index of the range it's working on
internal long m_nIncrementValue;
// the increment value is doubled each time this worker finds work, and is capped at this value
internal readonly long m_nMaxIncrementValue;
///
/// Initializes a RangeWorker struct
///
internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep)
{
m_indexRanges = ranges;
m_nCurrentIndexRange = nInitialRange;
m_nStep = nStep;
m_nIncrementValue = nStep;
m_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep;
}
///
/// Implements the core work search algorithm that will be used for this range worker.
///
///
/// Usage pattern is:
/// 1) the thread associated with this rangeworker calls FindNewWork
/// 2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values
/// to execute the sequential loop
/// 3) if we return false it means there is no more work left. It's time to quit.
///
internal bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal)
{
// since we iterate over index ranges circularly, we will use the
// count of visited ranges as our exit condition
int numIndexRangesToVisit = m_indexRanges.Length;
do
{
// local snap to save array access bounds checks in places where we only read fields
IndexRange currentRange = m_indexRanges[m_nCurrentIndexRange];
if (currentRange.m_bRangeFinished == 0)
{
if (m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset == null)
{
Interlocked.CompareExchange(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset, new Shared(0), null);
}
// this access needs to be on the array slot
long nMyOffset = Interlocked.Add(ref m_indexRanges[m_nCurrentIndexRange].m_nSharedCurrentIndexOffset.Value,
m_nIncrementValue) - m_nIncrementValue;
if (currentRange.m_nToExclusive - currentRange.m_nFromInclusive > nMyOffset)
{
// we found work
nFromInclusiveLocal = currentRange.m_nFromInclusive + nMyOffset;
nToExclusiveLocal = nFromInclusiveLocal + m_nIncrementValue;
// Check for going past end of range, or wrapping
if ( (nToExclusiveLocal > currentRange.m_nToExclusive) || (nToExclusiveLocal < currentRange.m_nFromInclusive) )
{
nToExclusiveLocal = currentRange.m_nToExclusive;
}
// We will double our unit of increment until it reaches the maximum.
if (m_nIncrementValue < m_nMaxIncrementValue)
{
m_nIncrementValue *= 2;
if (m_nIncrementValue > m_nMaxIncrementValue)
{
m_nIncrementValue = m_nMaxIncrementValue;
}
}
return true;
}
else
{
// this index range is completed, mark it so that others can skip it quickly
Interlocked.Exchange(ref m_indexRanges[m_nCurrentIndexRange].m_bRangeFinished, 1);
}
}
// move on to the next index range, in circular order.
m_nCurrentIndexRange = (m_nCurrentIndexRange + 1) % m_indexRanges.Length;
numIndexRangesToVisit--;
} while (numIndexRangesToVisit > 0);
// we've visited all index ranges possible => there's no work remaining
nFromInclusiveLocal = 0;
nToExclusiveLocal = 0;
return false;
}
///
/// 32 bit integer version of FindNewWork. Assumes the ranges were initialized with 32 bit values.
///
internal bool FindNewWork32(out int nFromInclusiveLocal32, out int nToExclusiveLocal32)
{
long nFromInclusiveLocal;
long nToExclusiveLocal;
bool bRetVal = FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal);
Contract.Assert((nFromInclusiveLocal <= Int32.MaxValue) && (nFromInclusiveLocal >= Int32.MinValue) &&
(nToExclusiveLocal <= Int32.MaxValue) && (nToExclusiveLocal >= Int32.MinValue));
// convert to 32 bit before returning
nFromInclusiveLocal32 = (int)nFromInclusiveLocal;
nToExclusiveLocal32 = (int)nToExclusiveLocal;
return bRetVal;
}
}
///
/// Represents the entire loop operation, keeping track of workers and ranges.
///
///
/// The usage pattern is:
/// 1) The Parallel loop entry function (ForWorker) creates an instance of this class
/// 2) Every thread joining to service the parallel loop calls RegisterWorker to grab a
/// RangeWorker struct to wrap the state it will need to find and execute work,
/// and they keep interacting with that struct until the end of the loop
internal class RangeManager
{
internal readonly IndexRange[] m_indexRanges;
internal int m_nCurrentIndexRangeToAssign;
internal long m_nStep;
///
/// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges
///
internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
{
m_nCurrentIndexRangeToAssign = 0;
m_nStep = nStep;
// Our signed math breaks down w/ nNumExpectedWorkers == 1. So change it to 2.
if (nNumExpectedWorkers == 1)
nNumExpectedWorkers = 2;
//
// calculate the size of each index range
//
ulong uSpan = (ulong)(nToExclusive - nFromInclusive);
ulong uRangeSize = uSpan / (ulong) nNumExpectedWorkers; // rough estimate first
uRangeSize -= uRangeSize % (ulong) nStep; // snap to multiples of nStep
// otherwise index range transitions will derail us from nStep
if (uRangeSize == 0)
{
uRangeSize = (ulong) nStep;
}
//
// find the actual number of index ranges we will need
//
Contract.Assert((uSpan / uRangeSize) < Int32.MaxValue);
int nNumRanges = (int)(uSpan / uRangeSize);
if (uSpan % uRangeSize != 0)
{
nNumRanges++;
}
// Convert to signed so the rest of the logic works.
// Should be fine so long as uRangeSize < Int64.MaxValue, which we guaranteed by setting #workers >= 2.
long nRangeSize = (long)uRangeSize;
// allocate the array of index ranges
m_indexRanges = new IndexRange[nNumRanges];
long nCurrentIndex = nFromInclusive;
for (int i = 0; i < nNumRanges; i++)
{
// the fromInclusive of the new index range is always on nCurrentIndex
m_indexRanges[i].m_nFromInclusive = nCurrentIndex;
m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
m_indexRanges[i].m_bRangeFinished = 0;
// now increment it to find the toExclusive value for our range
nCurrentIndex += nRangeSize;
// detect integer overflow or range overage and snap to nToExclusive
if (nCurrentIndex < nCurrentIndex - nRangeSize ||
nCurrentIndex > nToExclusive)
{
// this should only happen at the last index
Contract.Assert(i == nNumRanges - 1);
nCurrentIndex = nToExclusive;
}
// now that the end point of the new range is calculated, assign it.
m_indexRanges[i].m_nToExclusive = nCurrentIndex;
}
}
///
/// The function that needs to be called by each new worker thread servicing the parallel loop
/// in order to get a RangeWorker struct that wraps the state for finding and executing indices
///
internal RangeWorker RegisterNewWorker()
{
Contract.Assert(m_indexRanges != null && m_indexRanges.Length != 0);
int nInitialRange = (Interlocked.Increment(ref m_nCurrentIndexRangeToAssign) - 1) % m_indexRanges.Length;
return new RangeWorker(m_indexRanges, nInitialRange, m_nStep);
}
}
}
#pragma warning restore 0420
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- PathNode.cs
- SHA256.cs
- ClaimTypeElement.cs
- PerformanceCounterPermissionEntry.cs
- SafeMemoryMappedViewHandle.cs
- SafeNativeMethods.cs
- TrackingServices.cs
- XmlStreamStore.cs
- ByteAnimation.cs
- SharedRuntimeState.cs
- SettingsPropertyCollection.cs
- FontFaceLayoutInfo.cs
- CredentialCache.cs
- TouchPoint.cs
- StickyNoteHelper.cs
- EditingCoordinator.cs
- EntityDataSourceChangedEventArgs.cs
- XmlnsCache.cs
- RuntimeHandles.cs
- NameTable.cs
- HashCodeCombiner.cs
- CodePageEncoding.cs
- COAUTHINFO.cs
- EntityObject.cs
- CompressStream.cs
- Section.cs
- TraceContextRecord.cs
- TextBoxAutoCompleteSourceConverter.cs
- DesignBindingEditor.cs
- ConsoleTraceListener.cs
- RowToParametersTransformer.cs
- EpmCustomContentDeSerializer.cs
- Debug.cs
- EdmPropertyAttribute.cs
- ObjectStateEntryDbDataRecord.cs
- ProfilePropertySettings.cs
- ColorConvertedBitmapExtension.cs
- MeasurementDCInfo.cs
- WizardPanel.cs
- EncoderReplacementFallback.cs
- EpmSourcePathSegment.cs
- UnitySerializationHolder.cs
- NativeMethods.cs
- BlobPersonalizationState.cs
- ExpressionVisitor.cs
- Single.cs
- arc.cs
- DynamicDataManager.cs
- Privilege.cs
- XmlEncoding.cs
- StaticFileHandler.cs
- PrinterSettings.cs
- DesignerDataView.cs
- ThreadStartException.cs
- AuthenticationModulesSection.cs
- Point3DCollectionConverter.cs
- CommentAction.cs
- RelativeSource.cs
- StylusPointDescription.cs
- CodeTryCatchFinallyStatement.cs
- QueryRewriter.cs
- RelatedView.cs
- VarRemapper.cs
- ImageInfo.cs
- PeerTransportCredentialType.cs
- ControlValuePropertyAttribute.cs
- ToolBarButtonClickEvent.cs
- IntMinMaxAggregationOperator.cs
- CalendarDataBindingHandler.cs
- RtfToXamlLexer.cs
- TableCellCollection.cs
- EnvelopedPkcs7.cs
- SqlDeflator.cs
- LoginViewDesigner.cs
- UnsafeNativeMethods.cs
- XmlSchemaExternal.cs
- CustomCredentialPolicy.cs
- METAHEADER.cs
- HttpTransportElement.cs
- GPStream.cs
- FileDataSourceCache.cs
- CacheChildrenQuery.cs
- SafeTokenHandle.cs
- FontFamilyIdentifier.cs
- DataViewListener.cs
- DynamicPhysicalDiscoSearcher.cs
- SuppressMergeCheckAttribute.cs
- UInt32.cs
- UiaCoreProviderApi.cs
- IssuanceLicense.cs
- XamlFigureLengthSerializer.cs
- WebPartDescription.cs
- ProtocolImporter.cs
- TemplateField.cs
- ValuePatternIdentifiers.cs
- HeaderElement.cs
- DataGridViewRowEventArgs.cs
- StrokeNodeOperations2.cs
- DefaultMemberAttribute.cs
- DiscoveryEndpointElement.cs