Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / BCL / System / Threading / Tasks / Parallel.cs / 1305376 / Parallel.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// Parallel.cs
//
// [....]
//
// A helper class that contains parallel versions of various looping constructs. This
// internally uses the task parallel library, but takes care to expose very little
// evidence of this infrastructure being used.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Security.Permissions;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Threading.Tasks
{
///
/// Stores options that configure the operation of methods on the
/// Parallel class.
///
///
/// By default, methods on the Parallel class attempt to utilize all available processors, are non-cancelable, and target
/// the default TaskScheduler (TaskScheduler.Default). enables
/// overriding these defaults.
///
public class ParallelOptions
{
private TaskScheduler m_scheduler;
private int m_maxDegreeOfParallelism;
private CancellationToken m_cancellationToken;
///
/// Initializes a new instance of the class.
///
///
/// This constructor initializes the instance with default values.
/// is initialized to -1, signifying that there is no upper bound set on how much parallelism should
/// be employed. is initialized to a non-cancelable token,
/// and is initialized to the default scheduler (TaskScheduler.Default).
/// All of these defaults may be overwritten using the property set accessors on the instance.
///
public ParallelOptions()
{
m_scheduler = TaskScheduler.Default;
m_maxDegreeOfParallelism = -1;
m_cancellationToken = CancellationToken.None;
}
///
/// Gets or sets the TaskScheduler
/// associated with this instance. Setting this property to null
/// indicates that the current scheduler should be used.
///
public TaskScheduler TaskScheduler
{
get { return m_scheduler; }
set { m_scheduler = value; }
}
// Convenience property used by TPL logic
internal TaskScheduler EffectiveTaskScheduler
{
get
{
if (m_scheduler == null) return TaskScheduler.Current;
else return m_scheduler;
}
}
///
/// Gets or sets the maximum degree of parallelism enabled by this ParallelOptions instance.
///
///
/// The limits the number of concurrent operations run by Parallel method calls that are passed this
/// ParallelOptions instance to the set value, if it is positive. If is -1, then there is no limit placed on the number of concurrently
/// running operations.
///
///
/// The exception that is thrown when this is set to 0 or some
/// value less than -1.
///
public int MaxDegreeOfParallelism
{
get { return m_maxDegreeOfParallelism; }
set
{
if ((value == 0) || (value < -1))
throw new ArgumentOutOfRangeException("MaxDegreeOfParallelism");
m_maxDegreeOfParallelism = value;
}
}
///
/// Gets or sets the CancellationToken
/// associated with this instance.
///
///
/// Providing a CancellationToken
/// to a Parallel method enables the operation to be
/// exited early. Code external to the operation may cancel the token, and if the operation observes the
/// token being set, it may exit early by throwing an
/// .
///
public CancellationToken CancellationToken
{
get { return m_cancellationToken; }
set
{
if (value == null)
throw new ArgumentNullException("CancellationToken");
m_cancellationToken = value;
}
}
internal int EffectiveMaxConcurrencyLevel
{
get
{
int rval = MaxDegreeOfParallelism;
int schedulerMax = EffectiveTaskScheduler.MaximumConcurrencyLevel;
if ((schedulerMax > 0) && (schedulerMax != Int32.MaxValue))
{
rval = (rval == -1) ? schedulerMax : Math.Min(schedulerMax, rval);
}
return rval;
}
}
}
///
/// Provides support for parallel loops and regions.
///
///
/// The class provides library-based data parallel replacements
/// for common operations such as for loops, for each loops, and execution of a set of statements.
///
[HostProtection(Synchronization = true, ExternalThreading = true)]
public static class Parallel
{
#if !FEATURE_PAL // PAL doesn't support eventing
// static counter for generating unique Fork/Join Context IDs to be used in ETW events
internal static int s_forkJoinContextID;
#endif
// We use a stride for loops to amortize the frequency of interlocked operations.
internal const int DEFAULT_LOOP_STRIDE = 16; // @
// Static variable to hold default parallel options
internal static ParallelOptions s_defaultParallelOptions = new ParallelOptions();
///
/// Executes each of the provided actions, possibly in parallel.
///
/// An array of Actions to execute.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// array contains a null element.
/// The exception that is thrown when any
/// action in the array throws an exception.
///
/// This method can be used to execute a set of operations, potentially in parallel.
/// No guarantees are made about the order in which the operations execute or whether
/// they execute in parallel. This method does not return until each of the
/// provided operations has completed, regardless of whether completion
/// occurs due to normal or exceptional termination.
///
public static void Invoke(params Action[] actions)
{
Invoke(s_defaultParallelOptions, actions);
}
///
/// Executes each of the provided actions, possibly in parallel.
///
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// An array of Actions to execute.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// array contains a null element.
/// The exception that is thrown when
/// the CancellationToken in the
/// is set.
/// The exception that is thrown when any
/// action in the array throws an exception.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
///
/// This method can be used to execute a set of operations, potentially in parallel.
/// No guarantees are made about the order in which the operations execute or whether
/// the they execute in parallel. This method does not return until each of the
/// provided operations has completed, regardless of whether completion
/// occurs due to normal or exceptional termination.
///
public static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
{
if (actions == null)
{
throw new ArgumentNullException("actions");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
// Throw an ODE if we're passed a disposed CancellationToken.
if(parallelOptions.CancellationToken.CanBeCanceled)
parallelOptions.CancellationToken.ThrowIfSourceDisposed();
// Quit early if we're already canceled -- avoid a bunch of work.
if (parallelOptions.CancellationToken.IsCancellationRequested)
throw new OperationCanceledException(parallelOptions.CancellationToken);
// We must validate that the actions array contains no null elements, and also
// make a defensive copy of the actions array.
Action[] actionsCopy = new Action[actions.Length];
for (int i = 0; i < actionsCopy.Length; i++)
{
actionsCopy[i] = actions[i];
if (actionsCopy[i] == null)
{
throw new ArgumentException(Environment.GetResourceString("Parallel_Invoke_ActionNull"));
}
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for Parallel Invoke Begin
int forkJoinContextID = 0;
Task callerTask = null;
if (TplEtwProvider.Log.IsEnabled())
{
forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
callerTask = Task.InternalCurrent;
TplEtwProvider.Log.ParallelInvokeBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelInvoke,
actionsCopy.Length);
}
#endif
#if DEBUG
actions = null; // Ensure we don't accidentally use this below.
#endif
// If we have no work to do, we are done.
if (actionsCopy.Length < 1) return;
// In the algorithm below, if the number of actions is greater than this, we automatically
// use Parallel.For() to handle the actions, rather than the Task-per-Action strategy.
const int SMALL_ACTIONCOUNT_LIMIT = 10;
try
{
// If we've gotten this far, it's time to process the actions.
if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) ||
(parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
{
// Used to hold any exceptions encountered during action processing
ConcurrentQueue exceptionQ = null; // will be lazily initialized if necessary
// This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism.
try
{
// Launch a self-replicating task to handle the execution of all actions.
// The use of a self-replicating task allows us to use as many cores
// as are available, and no more. The exception to this rule is
// that, in the case of a blocked action, the ThreadPool may inject
// extra threads, which means extra tasks can run.
int actionIndex = 0;
ParallelForReplicatingTask rootTask = new ParallelForReplicatingTask(parallelOptions, delegate
{
// Each for-task will pull an action at a time from the list
int myIndex = Interlocked.Increment(ref actionIndex); // = index to use + 1
while(myIndex <= actionsCopy.Length)
{
// Catch and store any exceptions. If we don't catch them, the self-replicating
// task will exit, and that may cause other SR-tasks to exit.
// And (absent cancellation) we want all actions to execute.
try
{
actionsCopy[myIndex - 1]();
}
catch (Exception e)
{
LazyInitializer.EnsureInitialized>(ref exceptionQ, () => { return new ConcurrentQueue(); });
exceptionQ.Enqueue(e);
}
// Check for cancellation. If it is encountered, then exit the delegate.
if (parallelOptions.CancellationToken.IsCancellationRequested)
throw new OperationCanceledException(parallelOptions.CancellationToken);
// You're still in the game. Grab your next action index.
myIndex = Interlocked.Increment(ref actionIndex);
}
}, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating);
rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);
rootTask.Wait();
}
catch (Exception e)
{
LazyInitializer.EnsureInitialized>(ref exceptionQ, () => { return new ConcurrentQueue(); });
// Since we're consuming all action exceptions, there are very few reasons that
// we would see an exception here. Two that come to mind:
// (1) An OCE thrown by one or more actions (AggregateException thrown)
// (2) An exception thrown from the ParallelForReplicatingTask constructor
// (regular exception thrown).
// We'll need to cover them both.
AggregateException ae = e as AggregateException;
if (ae != null)
{
// Strip off outer container of an AggregateException, because downstream
// logic needs OCEs to be at the top level.
foreach (Exception exc in ae.InnerExceptions) exceptionQ.Enqueue(exc);
}
else
{
exceptionQ.Enqueue(e);
}
}
// If we have encountered any exceptions, then throw.
if ((exceptionQ != null) && (exceptionQ.Count > 0))
{
ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken);
throw new AggregateException(exceptionQ);
}
}
else
{
// This is more efficient for a small number of actions and no DOP support
// Initialize our array of tasks, one per action.
Task[] tasks = new Task[actionsCopy.Length];
// One more check before we begin...
if (parallelOptions.CancellationToken.IsCancellationRequested)
throw new OperationCanceledException(parallelOptions.CancellationToken);
// Launch all actions as tasks
for (int i = 0; i < tasks.Length; i++)
{
tasks[i] = Task.Factory.StartNew(actionsCopy[i], parallelOptions.CancellationToken, TaskCreationOptions.None,
InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler);
}
// Now wait for the tasks to complete. This will not unblock until all of
// them complete, and it will throw an exception if one or more of them also
// threw an exception. We let such exceptions go completely unhandled.
try
{
if (tasks.Length <= 4)
{
// for 4 or less tasks, the sequential waitall version is faster
Task.FastWaitAll(tasks);
}
else
{
// otherwise we revert to the regular WaitAll which delegates the multiple wait to the cooperative event.
Task.WaitAll(tasks);
}
}
catch (AggregateException aggExp)
{
// see if we can combine it into a single OCE. If not propagate the original exception
ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken);
throw;
}
finally
{
for (int i = 0; i < tasks.Length; i++)
{
if(tasks[i].IsCompleted) tasks[i].Dispose();
}
}
}
}
finally
{
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for Parallel Invoke End
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelInvokeEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
forkJoinContextID);
}
#endif
}
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The start index, inclusive.
/// The end index, exclusive.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the iteration count (an Int32) as a parameter.
///
public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action body)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
return ForWorker(
fromInclusive, toExclusive,
s_defaultParallelOptions,
body, null, null, null, null);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The start index, inclusive.
/// The end index, exclusive.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the iteration count (an Int64) as a parameter.
///
public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action body)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
return ForWorker64(
fromInclusive, toExclusive, s_defaultParallelOptions,
body, null, null, null, null);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The start index, inclusive.
/// The end index, exclusive.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the iteration count (an Int32) as a parameter.
///
public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action body)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForWorker(
fromInclusive, toExclusive, parallelOptions,
body, null, null, null, null);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The start index, inclusive.
/// The end index, exclusive.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the iteration count (an Int64) as a parameter.
///
public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action body)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForWorker64(
fromInclusive, toExclusive, parallelOptions,
body, null, null, null, null);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The start index, inclusive.
/// The end index, exclusive.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32),
/// and a ParallelLoopState instance that may be
/// used to break out of the loop prematurely.
///
///
/// Calling ParallelLoopState.Break()
/// informs the For operation that iterations after the current one need not
/// execute. However, all iterations before the current one will still need to be executed if they have not already.
/// Therefore, calling Break is similar to using a break operation within a
/// conventional for loop in a language like C#, but it is not a perfect substitute: for example, there is no guarantee that iterations
/// after the current one will definitely not execute.
///
///
/// If executing all iterations before the current one is not necessary,
/// ParallelLoopState.Stop()
/// should be preferred to using Break. Calling Stop informs the For loop that it may abandon all remaining
/// iterations, regardless of whether they're for interations above or below the current,
/// since all required work has already been completed. As with Break, however, there are no guarantees regarding
/// which other iterations will not execute.
///
///
/// When a loop is ended prematurely, the that's returned will contain
/// relevant information about the loop's completion.
///
///
public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action body)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
return ForWorker(
fromInclusive, toExclusive, s_defaultParallelOptions,
null, body, null, null, null);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The start index, inclusive.
/// The end index, exclusive.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64),
/// and a ParallelLoopState instance that may be
/// used to break out of the loop prematurely.
///
public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action body)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
return ForWorker64(
fromInclusive, toExclusive, s_defaultParallelOptions,
null, body, null, null, null);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The start index, inclusive.
/// The end index, exclusive.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32),
/// and a ParallelLoopState instance that may be
/// used to break out of the loop prematurely.
///
public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action body)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForWorker(
fromInclusive, toExclusive, parallelOptions,
null, body, null, null, null);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The start index, inclusive.
/// The end index, exclusive.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64),
/// and a ParallelLoopState instance that may be
/// used to break out of the loop prematurely.
///
public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions,
Action body)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForWorker64(
fromInclusive, toExclusive, parallelOptions,
null, body, null, null, null);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The type of the thread-local data.
/// The start index, inclusive.
/// The end index, exclusive.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32),
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult For(
int fromInclusive, int toExclusive,
Func localInit,
Func body,
Action localFinally)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
return ForWorker(
fromInclusive, toExclusive, s_defaultParallelOptions,
null, null, body, localInit, localFinally);
}
///
/// Executes a for loop in which iterations may run in parallel. Supports 64-bit indices.
///
/// The type of the thread-local data.
/// The start index, inclusive.
/// The end index, exclusive.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64),
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult For(
long fromInclusive, long toExclusive,
Func localInit,
Func body,
Action localFinally)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
return ForWorker64(
fromInclusive, toExclusive, s_defaultParallelOptions,
null, null, body, localInit, localFinally);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The type of the thread-local data.
/// The start index, inclusive.
/// The end index, exclusive.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32),
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult For(
int fromInclusive, int toExclusive, ParallelOptions parallelOptions,
Func localInit,
Func body,
Action localFinally)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForWorker(
fromInclusive, toExclusive, parallelOptions,
null, null, body, localInit, localFinally);
}
///
/// Executes a for loop in which iterations may run in parallel.
///
/// The type of the thread-local data.
/// The start index, inclusive.
/// The end index, exclusive.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64),
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult For(
long fromInclusive, long toExclusive, ParallelOptions parallelOptions,
Func localInit,
Func body,
Action localFinally)
{
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForWorker64(
fromInclusive, toExclusive, parallelOptions,
null, null, body, localInit, localFinally);
}
///
/// Performs the major work of the parallel for loop. It assumes that argument validation has already
/// been performed by the caller. This function's whole purpose in life is to enable as much reuse of
/// common implementation details for the various For overloads we offer. Without it, we'd end up
/// with lots of duplicate code. It handles: (1) simple for loops, (2) for loops that depend on
/// ParallelState, and (3) for loops with thread local data.
///
/// @
private static ParallelLoopResult ForWorker(
int fromInclusive, int toExclusive,
ParallelOptions parallelOptions,
Action body,
Action bodyWithState,
Func bodyWithLocal,
Func localInit, Action localFinally)
{
Contract.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + (bodyWithLocal == null ? 0 : 1)) == 1,
"expected exactly one body function to be supplied");
Contract.Assert(bodyWithLocal != null || (localInit == null && localFinally == null),
"thread local functions should only be supplied for loops w/ thread local bodies");
// Instantiate our result. Specifics will be filled in later.
ParallelLoopResult result = new ParallelLoopResult();
// We just return immediately if 'to' is smaller (or equal to) 'from'.
if (toExclusive <= fromInclusive)
{
result.m_completed = true;
return result;
}
long sharedIndex64 = fromInclusive;
// For all loops we need a shared flag even though we don't have a body with state,
// because the shared flag contains the exceptional bool, which triggers other workers
// to exit their loops if one worker catches an exception
ParallelLoopStateFlags32 sharedPStateFlags = new ParallelLoopStateFlags32();
TaskCreationOptions creationOptions = TaskCreationOptions.None;
InternalTaskOptions internalOptions = InternalTaskOptions.SelfReplicating;
// Before getting started, do a quick peek to see if we have been canceled already
if (parallelOptions.CancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException(parallelOptions.CancellationToken);
}
// initialize ranges with passed in loop arguments and expected number of workers
int numExpectedWorkers = (parallelOptions.EffectiveMaxConcurrencyLevel == -1) ?
Environment.ProcessorCount :
parallelOptions.EffectiveMaxConcurrencyLevel;
RangeManager rangeManager = new RangeManager(fromInclusive, toExclusive, 1, numExpectedWorkers);
// Keep track of any cancellations
OperationCanceledException oce = null;
CancellationTokenRegistration ctr = new CancellationTokenRegistration();
// if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) =>
{
// Cause processing to stop
sharedPStateFlags.Cancel();
// Record our cancellation
oce = new OperationCanceledException(parallelOptions.CancellationToken);
}, null);
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for Parallel For begin
int forkJoinContextID = 0;
Task callingTask = null;
if (TplEtwProvider.Log.IsEnabled())
{
forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
callingTask = Task.InternalCurrent;
TplEtwProvider.Log.ParallelLoopBegin((callingTask != null ? callingTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callingTask != null ? callingTask.Id : 0),
forkJoinContextID,TplEtwProvider.ForkJoinOperationType.ParallelFor,
fromInclusive, toExclusive);
}
#endif
ParallelForReplicatingTask rootTask = null; // eliminates "Use of unassigned local variable" compiler bug below.
try
{
// this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel
rootTask = new ParallelForReplicatingTask(
parallelOptions,
delegate
{
//
// first thing we do upon enterying the task is to register as a new "RangeWorker" with the
// shared RangeManager instance.
//
// If this call returns a RangeWorker struct which wraps the state needed by this task
//
// We need to call FindNewWork32() on it to see whether there's a chunk available.
//
// Cache some information about the current task
Task currentWorkerTask = Task.InternalCurrent;
bool bIsRootTask = (currentWorkerTask == rootTask);
RangeWorker currentWorker = new RangeWorker();
Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica;
if (savedStateFromPreviousReplica is RangeWorker)
currentWorker = (RangeWorker)savedStateFromPreviousReplica;
else
currentWorker = rangeManager.RegisterNewWorker();
// These are the local index values to be used in the sequential loop.
// Their values filled in by FindNewWork32
int nFromInclusiveLocal;
int nToExclusiveLocal;
if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) == false ||
sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true)
{
return; // no need to run
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for ParallelFor Worker Fork
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
forkJoinContextID);
}
#endif
TLocal localValue = default(TLocal);
bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't
try
{
// Create a new state object that references the shared "stopped" and "exceptional" flags
// If needed, it will contain a new instance of thread-local state by invoking the selector.
ParallelLoopState32 state = null;
if (bodyWithState != null)
{
Contract.Assert(sharedPStateFlags != null);
state = new ParallelLoopState32(sharedPStateFlags);
}
else if (bodyWithLocal != null)
{
Contract.Assert(sharedPStateFlags != null);
state = new ParallelLoopState32(sharedPStateFlags);
if (localInit != null)
{
localValue = localInit();
bLocalValueInitialized = true;
}
}
// initialize a loop timer which will help us decide whether we should exit early
LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount);
// Now perform the loop itself.
do
{
if (body != null)
{
for (int j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state
j += 1)
{
body(j);
}
}
else if (bodyWithState != null)
{
for (int j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop(j));
j += 1)
{
state.CurrentIteration = j;
bodyWithState(j, state);
}
}
else
{
for (int j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop(j));
j += 1)
{
state.CurrentIteration = j;
localValue = bodyWithLocal(j, state, localValue);
}
}
// Cooperative multitasking hack for AppDomain fairness.
// Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic
// will detect this, and queue up a replacement task. Note that we don't do this on the root task.
if ( !bIsRootTask && loopTimer.LimitExceeded())
{
currentWorkerTask.SavedStateForNextReplica = (object)currentWorker;
break;
}
}
// Exit if we can't find new work, or if the loop was stoppped.
while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) &&
((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) ||
!sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal)));
}
catch
{
// if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow
sharedPStateFlags.SetExceptional();
throw;
}
finally
{
// If a cleanup function was specified, call it. Otherwise, if the type is
// IDisposable, we will invoke Dispose on behalf of the user.
if (localFinally != null && bLocalValueInitialized)
{
localFinally(localValue);
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for ParallelFor Worker Join
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
forkJoinContextID);
}
#endif
}
},
creationOptions, internalOptions);
rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE
rootTask.Wait();
// If we made a cancellation registration, we need to clean it up now before observing the OCE
// Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
// If we got through that with no exceptions, and we were canceled, then
// throw our cancellation exception
if (oce != null) throw oce;
}
catch (AggregateException aggExp)
{
// if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
// see if we can combine it into a single OCE. If not propagate the original exception
ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken);
throw;
}
catch (TaskSchedulerException)
{
// if we made a cancellation registration, and rootTask.RunSynchronously threw, we need to clean it up here
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
throw;
}
finally
{
int sb_status = sharedPStateFlags.LoopStateFlags;
result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE);
if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0)
{
result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration;
}
if( (rootTask != null) && rootTask.IsCompleted) rootTask.Dispose();
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for Parallel For End
if (TplEtwProvider.Log.IsEnabled())
{
int nTotalIterations = 0;
// calculate how many iterations we ran in total
if (sb_status == ParallelLoopStateFlags.PLS_NONE)
nTotalIterations = toExclusive - fromInclusive;
else if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0)
nTotalIterations = sharedPStateFlags.LowestBreakIteration - fromInclusive;
else
nTotalIterations = -1; //PLS_STOPPED! We can't determine this if we were stopped..
TplEtwProvider.Log.ParallelLoopEnd((callingTask != null ? callingTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callingTask != null ? callingTask.Id : 0),
forkJoinContextID, nTotalIterations);
}
#endif
}
return result;
}
///
/// Performs the major work of the 64-bit parallel for loop. It assumes that argument validation has already
/// been performed by the caller. This function's whole purpose in life is to enable as much reuse of
/// common implementation details for the various For overloads we offer. Without it, we'd end up
/// with lots of duplicate code. It handles: (1) simple for loops, (2) for loops that depend on
/// ParallelState, and (3) for loops with thread local data.
///
/// @
private static ParallelLoopResult ForWorker64(
long fromInclusive, long toExclusive,
ParallelOptions parallelOptions,
Action body,
Action bodyWithState,
Func bodyWithLocal,
Func localInit, Action localFinally)
{
Contract.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + (bodyWithLocal == null ? 0 : 1)) == 1,
"expected exactly one body function to be supplied");
Contract.Assert(bodyWithLocal != null || (localInit == null && localFinally == null),
"thread local functions should only be supplied for loops w/ thread local bodies");
// Instantiate our result. Specifics will be filled in later.
ParallelLoopResult result = new ParallelLoopResult();
// We just return immediately if 'to' is smaller (or equal to) 'from'.
if (toExclusive <= fromInclusive)
{
result.m_completed = true;
return result;
}
long sharedIndex = fromInclusive;
// For all loops we need a shared flag even though we don't have a body with state,
// because the shared flag contains the exceptional bool, which triggers other workers
// to exit their loops if one worker catches an exception
ParallelLoopStateFlags64 sharedPStateFlags = new ParallelLoopStateFlags64();
TaskCreationOptions creationOptions = TaskCreationOptions.None;
InternalTaskOptions internalOptions = InternalTaskOptions.SelfReplicating;
// Before getting started, do a quick peek to see if we have been canceled already
if (parallelOptions.CancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException(parallelOptions.CancellationToken);
}
// initialize ranges with passed in loop arguments and expected number of workers
int numExpectedWorkers = (parallelOptions.EffectiveMaxConcurrencyLevel == -1) ?
Environment.ProcessorCount :
parallelOptions.EffectiveMaxConcurrencyLevel;
RangeManager rangeManager = new RangeManager(fromInclusive, toExclusive, 1, numExpectedWorkers);
// Keep track of any cancellations
OperationCanceledException oce = null;
CancellationTokenRegistration ctr = new CancellationTokenRegistration();
// if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) =>
{
// Cause processing to stop
sharedPStateFlags.Cancel();
// Record our cancellation
oce = new OperationCanceledException(parallelOptions.CancellationToken);
}, null);
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for Parallel For begin
Task callerTask = null;
int forkJoinContextID = 0;
if (TplEtwProvider.Log.IsEnabled())
{
forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
callerTask = Task.InternalCurrent;
TplEtwProvider.Log.ParallelLoopBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelFor,
fromInclusive, toExclusive);
}
#endif
ParallelForReplicatingTask rootTask = null; // eliminates "Use of unassigned local variable" compiler bug below.
try
{
// this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel
rootTask = new ParallelForReplicatingTask(
parallelOptions,
delegate
{
//
// first thing we do upon enterying the task is to register as a new "RangeWorker" with the
// shared RangeManager instance.
//
// If this call returns a RangeWorker struct which wraps the state needed by this task
//
// We need to call FindNewWork() on it to see whether there's a chunk available.
//
// Cache some information about the current task
Task currentWorkerTask = Task.InternalCurrent;
bool bIsRootTask = (currentWorkerTask == rootTask);
RangeWorker currentWorker = new RangeWorker();
Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica;
if (savedStateFromPreviousReplica is RangeWorker)
currentWorker = (RangeWorker)savedStateFromPreviousReplica;
else
currentWorker = rangeManager.RegisterNewWorker();
// These are the local index values to be used in the sequential loop.
// Their values filled in by FindNewWork
long nFromInclusiveLocal;
long nToExclusiveLocal;
if (currentWorker.FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal) == false ||
sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true)
{
return; // no need to run
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for ParallelFor Worker Fork
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
forkJoinContextID);
}
#endif
TLocal localValue = default(TLocal);
bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't
try
{
// Create a new state object that references the shared "stopped" and "exceptional" flags
// If needed, it will contain a new instance of thread-local state by invoking the selector.
ParallelLoopState64 state = null;
if (bodyWithState != null)
{
Contract.Assert(sharedPStateFlags != null);
state = new ParallelLoopState64(sharedPStateFlags);
}
else if (bodyWithLocal != null)
{
Contract.Assert(sharedPStateFlags != null);
state = new ParallelLoopState64(sharedPStateFlags);
// If a thread-local selector was supplied, invoke it. Otherwise, use the default.
if (localInit != null)
{
localValue = localInit();
bLocalValueInitialized = true;
}
}
// initialize a loop timer which will help us decide whether we should exit early
LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount);
// Now perform the loop itself.
do
{
if (body != null)
{
for (long j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state
j += 1)
{
body(j);
}
}
else if (bodyWithState != null)
{
for (long j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop(j));
j += 1)
{
state.CurrentIteration = j;
bodyWithState(j, state);
}
}
else
{
for (long j = nFromInclusiveLocal;
j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
|| !sharedPStateFlags.ShouldExitLoop(j));
j += 1)
{
state.CurrentIteration = j;
localValue = bodyWithLocal(j, state, localValue);
}
}
// Cooperative multitasking hack for AppDomain fairness.
// Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic
// will detect this, and queue up a replacement task. Note that we don't do this on the root task.
if (!bIsRootTask && loopTimer.LimitExceeded())
{
currentWorkerTask.SavedStateForNextReplica = (object)currentWorker;
break;
}
}
// Exit if we can't find new work, or if the loop was stoppped.
while (currentWorker.FindNewWork(out nFromInclusiveLocal, out nToExclusiveLocal) &&
((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) ||
!sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal)));
}
catch
{
// if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow
sharedPStateFlags.SetExceptional();
throw;
}
finally
{
// If a cleanup function was specified, call it. Otherwise, if the type is
// IDisposable, we will invoke Dispose on behalf of the user.
if (localFinally != null && bLocalValueInitialized)
{
localFinally(localValue);
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for ParallelFor Worker Join
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
forkJoinContextID);
}
#endif
}
},
creationOptions, internalOptions);
rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE
rootTask.Wait();
// If we made a cancellation registration, we need to clean it up now before observing the OCE
// Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
// If we got through that with no exceptions, and we were canceled, then
// throw our cancellation exception
if (oce != null) throw oce;
}
catch (AggregateException aggExp)
{
// if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
// see if we can combine it into a single OCE. If not propagate the original exception
ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken);
throw;
}
catch (TaskSchedulerException)
{
// if we made a cancellation registration, and rootTask.RunSynchronously threw, we need to clean it up here
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
throw;
}
finally
{
int sb_status = sharedPStateFlags.LoopStateFlags;
result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE);
if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0)
{
result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration;
}
if( (rootTask != null) && rootTask.IsCompleted) rootTask.Dispose();
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for Parallel For End
if (TplEtwProvider.Log.IsEnabled())
{
long nTotalIterations = 0;
// calculate how many iterations we ran in total
if (sb_status == ParallelLoopStateFlags.PLS_NONE)
nTotalIterations = toExclusive - fromInclusive;
else if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0)
nTotalIterations = sharedPStateFlags.LowestBreakIteration - fromInclusive;
else
nTotalIterations = -1; //PLS_STOPPED! We can't determine this if we were stopped..
TplEtwProvider.Log.ParallelLoopEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
forkJoinContextID, nTotalIterations);
}
#endif
}
return result;
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// An enumerable data source.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the current element as a parameter.
///
public static ParallelLoopResult ForEach(IEnumerable source, Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
return ForEachWorker(
source, s_defaultParallelOptions, body, null, null, null, null, null, null);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// An enumerable data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the current element as a parameter.
///
public static ParallelLoopResult ForEach(IEnumerable source, ParallelOptions parallelOptions, Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForEachWorker(
source, parallelOptions, body, null, null, null, null, null, null);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// An enumerable data source.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the following parameters: the current element,
/// and a ParallelLoopState instance that may be
/// used to break out of the loop prematurely.
///
public static ParallelLoopResult ForEach(IEnumerable source, Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
return ForEachWorker(
source, s_defaultParallelOptions, null, body, null, null, null, null, null);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// An enumerable data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the following parameters: the current element,
/// and a ParallelLoopState instance that may be
/// used to break out of the loop prematurely.
///
public static ParallelLoopResult ForEach(IEnumerable source, ParallelOptions parallelOptions, Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForEachWorker(
source, parallelOptions, null, body, null, null, null, null, null);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// An enumerable data source.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and the current element's index (an Int64).
///
public static ParallelLoopResult ForEach(IEnumerable source, Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
return ForEachWorker(
source, s_defaultParallelOptions, null, null, body, null, null, null, null);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// An enumerable data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and the current element's index (an Int64).
///
public static ParallelLoopResult ForEach(IEnumerable source, ParallelOptions parallelOptions, Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForEachWorker(
source, parallelOptions, null, null, body, null, null, null, null);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// The type of the thread-local data.
/// An enumerable data source.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult ForEach(IEnumerable source, Func localInit,
Func body, Action localFinally)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
return ForEachWorker(
source, s_defaultParallelOptions, null, null, null, body, null, localInit, localFinally);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// The type of the thread-local data.
/// An enumerable data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult ForEach(IEnumerable source,
ParallelOptions parallelOptions, Func localInit,
Func body, Action localFinally)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForEachWorker(
source, parallelOptions, null, null, null, body, null, localInit, localFinally);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// The type of the thread-local data.
/// An enumerable data source.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, the current element's index (an Int64), and some local
/// state that may be shared amongst iterations that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult ForEach(IEnumerable source, Func localInit,
Func body, Action localFinally)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
return ForEachWorker(
source, s_defaultParallelOptions, null, null, null, null, body, localInit, localFinally);
}
///
/// Executes a for each operation on an
/// in which iterations may run in parallel.
///
/// The type of the data in the source.
/// The type of the thread-local data.
/// An enumerable data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The delegate is invoked once for each element in the
/// enumerable. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, the current element's index (an Int64), and some local
/// state that may be shared amongst iterations that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult ForEach(IEnumerable source, ParallelOptions parallelOptions, Func localInit,
Func body, Action localFinally)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return ForEachWorker(
source, parallelOptions, null, null, null, null, body, localInit, localFinally);
}
///
/// Performs the major work of the parallel foreach loop. It assumes that argument validation has
/// already been performed by the caller. This function's whole purpose in life is to enable as much
/// reuse of common implementation details for the various For overloads we offer. Without it, we'd
/// end up with lots of duplicate code. It handles: (1) simple foreach loops, (2) foreach loops that
/// depend on ParallelState, and (3) foreach loops that access indices, (4) foreach loops with thread
/// local data, and any necessary permutations thereof.
///
/// @
private static ParallelLoopResult ForEachWorker(
IEnumerable source,
ParallelOptions parallelOptions,
Action body,
Action bodyWithState,
Action bodyWithStateAndIndex,
Func bodyWithStateAndLocal,
Func bodyWithEverything,
Func localInit, Action localFinally)
{
Contract.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) +
(bodyWithStateAndIndex == null ? 0 : 1) + (bodyWithStateAndLocal == null ? 0 : 1) + (bodyWithEverything == null ? 0 : 1)) == 1,
"expected exactly one body function to be supplied");
Contract.Assert((bodyWithStateAndLocal != null) || (bodyWithEverything != null) || (localInit == null && localFinally == null),
"thread local functions should only be supplied for loops w/ thread local bodies");
// Before getting started, do a quick peek to see if we have been canceled already
if (parallelOptions.CancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException(parallelOptions.CancellationToken);
}
// If it's an array, we can use a fast-path that uses ldelems in the IL.
TSource[] sourceAsArray = source as TSource[];
if (sourceAsArray != null)
{
return ForEachWorker(
sourceAsArray, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal,
bodyWithEverything, localInit, localFinally);
}
// If we can index into the list, we can use a faster code-path that doesn't result in
// contention for the single, shared enumerator object.
IList sourceAsList = source as IList;
if (sourceAsList != null)
{
return ForEachWorker(
sourceAsList, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal,
bodyWithEverything, localInit, localFinally);
}
// This is an honest-to-goodness IEnumerable. Wrap it in a Partitioner and defer to our
// ForEach(Partitioner) logic.
return PartitionerForEachWorker(Partitioner.Create(source), parallelOptions, body, bodyWithState,
bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally);
}
///
/// A fast path for the more general ForEachWorker method above. This uses ldelem instructions to
/// access the individual elements of the array, which will be faster.
///
/// The type of the source data.
/// The type of the local data.
/// An array data source.
/// The options to use for execution.
/// The simple loop body.
/// The loop body for ParallelState overloads.
/// The loop body for indexed/ParallelLoopState overloads.
/// The loop body for local/ParallelLoopState overloads.
/// The loop body for the most generic overload.
/// A selector function that returns new thread local state.
/// A cleanup function to destroy thread local state.
/// A structure.
private static ParallelLoopResult ForEachWorker(
TSource[] array,
ParallelOptions parallelOptions,
Action body,
Action bodyWithState,
Action bodyWithStateAndIndex,
Func bodyWithStateAndLocal,
Func bodyWithEverything,
Func localInit, Action localFinally)
{
Contract.Assert(array != null);
Contract.Assert(parallelOptions != null, "ForEachWorker(array): parallelOptions is null");
int from = array.GetLowerBound(0);
int to = array.GetUpperBound(0) + 1;
if (body != null)
{
return ForWorker(
from, to, parallelOptions, (i) => body(array[i]), null, null, null, null);
}
else if (bodyWithState != null)
{
return ForWorker(
from, to, parallelOptions, null, (i, state) => bodyWithState(array[i], state), null, null, null);
}
else if (bodyWithStateAndIndex != null)
{
return ForWorker(
from, to, parallelOptions, null, (i, state) => bodyWithStateAndIndex(array[i], state, i), null, null, null);
}
else if (bodyWithStateAndLocal != null)
{
return ForWorker(
from, to, parallelOptions, null, null, (i, state, local) => bodyWithStateAndLocal(array[i], state, local), localInit, localFinally);
}
else
{
return ForWorker(
from, to, parallelOptions, null, null, (i, state, local) => bodyWithEverything(array[i], state, i, local), localInit, localFinally);
}
}
///
/// A fast path for the more general ForEachWorker method above. This uses IList<T>'s indexer
/// capabilities to access the individual elements of the list rather than an enumerator.
///
/// The type of the source data.
/// The type of the local data.
/// A list data source.
/// The options to use for execution.
/// The simple loop body.
/// The loop body for ParallelState overloads.
/// The loop body for indexed/ParallelLoopState overloads.
/// The loop body for local/ParallelLoopState overloads.
/// The loop body for the most generic overload.
/// A selector function that returns new thread local state.
/// A cleanup function to destroy thread local state.
/// A structure.
private static ParallelLoopResult ForEachWorker(
IList list,
ParallelOptions parallelOptions,
Action body,
Action bodyWithState,
Action bodyWithStateAndIndex,
Func bodyWithStateAndLocal,
Func bodyWithEverything,
Func localInit, Action localFinally)
{
Contract.Assert(list != null);
Contract.Assert(parallelOptions != null, "ForEachWorker(list): parallelOptions is null");
if (body != null)
{
return ForWorker(
0, list.Count, parallelOptions, (i) => body(list[i]), null, null, null, null);
}
else if (bodyWithState != null)
{
return ForWorker(
0, list.Count, parallelOptions, null, (i, state) => bodyWithState(list[i], state), null, null, null);
}
else if (bodyWithStateAndIndex != null)
{
return ForWorker(
0, list.Count, parallelOptions, null, (i, state) => bodyWithStateAndIndex(list[i], state, i), null, null, null);
}
else if (bodyWithStateAndLocal != null)
{
return ForWorker(
0, list.Count, parallelOptions, null, null, (i, state, local) => bodyWithStateAndLocal(list[i], state, local), localInit, localFinally);
}
else
{
return ForWorker(
0, list.Count, parallelOptions, null, null, (i, state, local) => bodyWithEverything(list[i], state, i, local), localInit, localFinally);
}
}
// @
///
/// Executes a for each operation on a
/// Partitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The Partitioner that contains the original data source.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the Partitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the Partitioner return null.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner does not return
/// the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner returns an IList
/// with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() method in the Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the current element as a parameter.
///
///
public static ParallelLoopResult ForEach(
Partitioner source,
Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
return PartitionerForEachWorker(source, s_defaultParallelOptions, body, null, null, null, null, null, null);
}
///
/// Executes a for each operation on a
/// Partitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The Partitioner that contains the original data source.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the Partitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the Partitioner return null.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner does not return
/// the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner returns an IList
/// with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() method in the Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the following parameters: the current element,
/// and a ParallelLoopState instance that may be
/// used to break out of the loop prematurely.
///
///
public static ParallelLoopResult ForEach(
Partitioner source,
Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
return PartitionerForEachWorker(source, s_defaultParallelOptions, null, body, null, null, null, null, null);
}
///
/// Executes a for each operation on a
/// OrderablePartitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The OrderablePartitioner that contains the original data source.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the OrderablePartitioner returns
/// false.
/// The exception that is thrown when the
/// KeysNormalized property in the OrderablePartitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the OrderablePartitioner return null.
/// The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the
/// OrderablePartitioner do not return the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the
/// OrderablePartitioner return an IList with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the
/// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and the current element's index (an Int64).
///
///
public static ParallelLoopResult ForEach(
OrderablePartitioner source,
Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (!source.KeysNormalized)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized"));
}
return PartitionerForEachWorker(source, s_defaultParallelOptions, null, null, body, null, null, null, null);
}
///
/// Executes a for each operation on a
/// Partitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The type of the thread-local data.
/// The Partitioner that contains the original data source.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the Partitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the Partitioner return null.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner does not return
/// the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner returns an IList
/// with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() method in the Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult ForEach(
Partitioner source,
Func localInit,
Func body,
Action localFinally)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
return PartitionerForEachWorker(source, s_defaultParallelOptions, null, null, null, body, null, localInit, localFinally);
}
///
/// Executes a for each operation on a
/// OrderablePartitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The type of the thread-local data.
/// The OrderablePartitioner that contains the original data source.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the OrderablePartitioner returns
/// false.
/// The exception that is thrown when the
/// KeysNormalized property in the OrderablePartitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the OrderablePartitioner return null.
/// The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the
/// OrderablePartitioner do not return the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the
/// OrderablePartitioner return an IList with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the
/// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, the current element's index (an Int64), and some local
/// state that may be shared amongst iterations that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult ForEach(
OrderablePartitioner source,
Func localInit,
Func body,
Action localFinally)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
if (!source.KeysNormalized)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized"));
}
return PartitionerForEachWorker(source, s_defaultParallelOptions, null, null, null, null, body, localInit, localFinally);
}
///
/// Executes a for each operation on a
/// Partitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The Partitioner that contains the original data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the Partitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the Partitioner return null.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner does not return
/// the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner returns an IList
/// with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() method in the Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the current element as a parameter.
///
///
public static ParallelLoopResult ForEach(
Partitioner source,
ParallelOptions parallelOptions,
Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return PartitionerForEachWorker(source, parallelOptions, body, null, null, null, null, null, null);
}
///
/// Executes a for each operation on a
/// Partitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The Partitioner that contains the original data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the Partitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the Partitioner return null.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner does not return
/// the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner returns an IList
/// with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() method in the Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the following parameters: the current element,
/// and a ParallelLoopState instance that may be
/// used to break out of the loop prematurely.
///
///
public static ParallelLoopResult ForEach(
Partitioner source,
ParallelOptions parallelOptions,
Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return PartitionerForEachWorker(source, parallelOptions, null, body, null, null, null, null, null);
}
///
/// Executes a for each operation on a
/// OrderablePartitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The OrderablePartitioner that contains the original data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The delegate that is invoked once per iteration.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the OrderablePartitioner returns
/// false.
/// The exception that is thrown when the
/// KeysNormalized property in the OrderablePartitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the OrderablePartitioner return null.
/// The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the
/// OrderablePartitioner do not return the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the
/// OrderablePartitioner return an IList with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the
/// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and the current element's index (an Int64).
///
///
public static ParallelLoopResult ForEach(
OrderablePartitioner source,
ParallelOptions parallelOptions,
Action body)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
if (!source.KeysNormalized)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized"));
}
return PartitionerForEachWorker(source, parallelOptions, null, null, body, null, null, null, null);
}
///
/// Executes a for each operation on a
/// Partitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The type of the thread-local data.
/// The Partitioner that contains the original data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the Partitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the Partitioner return null.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner does not return
/// the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() method in the Partitioner returns an IList
/// with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() method in the Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult ForEach(
Partitioner source,
ParallelOptions parallelOptions,
Func localInit,
Func body,
Action localFinally)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
return PartitionerForEachWorker(source, parallelOptions, null, null, null, body, null, localInit, localFinally);
}
///
/// Executes a for each operation on a
/// OrderablePartitioner in which iterations may run in parallel.
///
/// The type of the elements in .
/// The type of the thread-local data.
/// The OrderablePartitioner that contains the original data source.
/// A ParallelOptions
/// instance that configures the behavior of this operation.
/// The function delegate that returns the initial state of the local data
/// for each thread.
/// The delegate that is invoked once per iteration.
/// The delegate that performs a final action on the local state of each
/// thread.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// argument is null.
/// The exception that is thrown when the
/// CancellationToken in the
/// argument is set
/// The exception that is thrown when the
/// SupportsDynamicPartitions property in the OrderablePartitioner returns
/// false.
/// The exception that is thrown when the
/// KeysNormalized property in the OrderablePartitioner returns
/// false.
/// The exception that is thrown when any
/// methods in the OrderablePartitioner return null.
/// The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the
/// OrderablePartitioner do not return the correct number of partitions.
/// The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the
/// OrderablePartitioner return an IList with at least one null value.
/// The exception that is thrown when the
/// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the
/// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.
/// The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.
/// The exception that is thrown when the
/// the CancellationTokenSource associated with the
/// the CancellationToken in the
/// has been disposed.
/// A ParallelLoopResult structure
/// that contains information on what portion of the loop completed.
///
///
/// The Partitioner is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// OrderablePartitioner .
///
///
/// The delegate is invoked once for each element in the
/// Partitioner. It is provided with the following parameters: the current element,
/// a ParallelLoopState instance that may be
/// used to break out of the loop prematurely, the current element's index (an Int64), and some local
/// state that may be shared amongst iterations that execute on the same thread.
///
///
/// The delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
///
///
public static ParallelLoopResult ForEach(
OrderablePartitioner source,
ParallelOptions parallelOptions,
Func localInit,
Func body,
Action localFinally)
{
if (source == null)
{
throw new ArgumentNullException("source");
}
if (body == null)
{
throw new ArgumentNullException("body");
}
if (localInit == null)
{
throw new ArgumentNullException("localInit");
}
if (localFinally == null)
{
throw new ArgumentNullException("localFinally");
}
if (parallelOptions == null)
{
throw new ArgumentNullException("parallelOptions");
}
if (!source.KeysNormalized)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_OrderedPartitionerKeysNotNormalized"));
}
return PartitionerForEachWorker(source, parallelOptions, null, null, null, null, body, localInit, localFinally);
}
// Main worker method for Parallel.ForEach() calls w/ Partitioners.
private static ParallelLoopResult PartitionerForEachWorker(
Partitioner source, // Might be OrderablePartitioner
ParallelOptions parallelOptions,
Action simpleBody,
Action bodyWithState,
Action bodyWithStateAndIndex,
Func bodyWithStateAndLocal,
Func bodyWithEverything,
Func localInit,
Action localFinally)
{
Contract.Assert(((simpleBody == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) +
(bodyWithStateAndIndex == null ? 0 : 1) + (bodyWithStateAndLocal == null ? 0 : 1) + (bodyWithEverything == null ? 0 : 1)) == 1,
"PartitionForEach: expected exactly one body function to be supplied");
Contract.Assert((bodyWithStateAndLocal != null) || (bodyWithEverything != null) || (localInit == null && localFinally == null),
"PartitionForEach: thread local functions should only be supplied for loops w/ thread local bodies");
OrderablePartitioner orderedSource = source as OrderablePartitioner;
Contract.Assert((orderedSource != null) || (bodyWithStateAndIndex == null && bodyWithEverything == null),
"PartitionForEach: bodies with indices are only allowable for OrderablePartitioner");
if (!source.SupportsDynamicPartitions)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerNotDynamic"));
}
// Before getting started, do a quick peek to see if we have been canceled already
if (parallelOptions.CancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException(parallelOptions.CancellationToken);
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for Parallel For begin
int forkJoinContextID = 0;
Task callerTask = null;
if (TplEtwProvider.Log.IsEnabled())
{
forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
callerTask = Task.InternalCurrent;
TplEtwProvider.Log.ParallelLoopBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelForEach,
0, 0);
}
#endif
// For all loops we need a shared flag even though we don't have a body with state,
// because the shared flag contains the exceptional bool, which triggers other workers
// to exit their loops if one worker catches an exception
ParallelLoopStateFlags64 sharedPStateFlags = new ParallelLoopStateFlags64();
// Instantiate our result. Specifics will be filled in later.
ParallelLoopResult result = new ParallelLoopResult();
// Keep track of any cancellations
OperationCanceledException oce = null;
CancellationTokenRegistration ctr = new CancellationTokenRegistration();
// if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr = parallelOptions.CancellationToken.InternalRegisterWithoutEC((o) =>
{
// Cause processing to stop
sharedPStateFlags.Cancel();
// Record our cancellation
oce = new OperationCanceledException(parallelOptions.CancellationToken);
}, null);
}
// Get our dynamic partitioner -- depends on whether source is castable to OrderablePartitioner
// Also, do some error checking.
IEnumerable partitionerSource = null;
IEnumerable> orderablePartitionerSource = null;
if (orderedSource != null)
{
orderablePartitionerSource = orderedSource.GetOrderableDynamicPartitions();
if (orderablePartitionerSource == null)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerReturnedNull"));
}
}
else
{
partitionerSource = source.GetDynamicPartitions();
if (partitionerSource == null)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_PartitionerReturnedNull"));
}
}
ParallelForReplicatingTask rootTask = null;
// This is the action that will be run by each replicable task.
Action partitionAction = delegate
{
Task currentWorkerTask = Task.InternalCurrent;
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for ParallelForEach Worker Fork
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
forkJoinContextID);
}
#endif
TLocal localValue = default(TLocal);
bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't
try
{
// Create a new state object that references the shared "stopped" and "exceptional" flags.
// If needed, it will contain a new instance of thread-local state by invoking the selector.
ParallelLoopState64 state = null;
if (bodyWithState != null || bodyWithStateAndIndex != null)
{
state = new ParallelLoopState64(sharedPStateFlags);
}
else if (bodyWithStateAndLocal != null || bodyWithEverything != null)
{
state = new ParallelLoopState64(sharedPStateFlags);
// If a thread-local selector was supplied, invoke it. Otherwise, stick with the default.
if (localInit != null)
{
localValue = localInit();
bLocalValueInitialized = true;
}
}
bool bIsRootTask = (rootTask == currentWorkerTask);
// initialize a loop timer which will help us decide whether we should exit early
LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount);
if (orderedSource != null)
{
// Use this path for OrderablePartitioner
// first check if there's saved state from a previous replica that we might be replacing.
// the only state to be passed down in such a transition is the enumerator
IEnumerator> myPartition = currentWorkerTask.SavedStateFromPreviousReplica as IEnumerator>;
if (myPartition == null)
{
// apparently we're a brand new replica, get a fresh enumerator from the partitioner
myPartition = orderablePartitionerSource.GetEnumerator();
if (myPartition == null)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_NullEnumerator"));
}
}
while (myPartition.MoveNext())
{
KeyValuePair kvp = myPartition.Current;
long index = kvp.Key;
TSource value = kvp.Value;
// Update our iteration index
if (state != null) state.CurrentIteration = index;
if (simpleBody != null)
simpleBody(value);
else if (bodyWithState != null)
bodyWithState(value, state);
else if (bodyWithStateAndIndex != null)
bodyWithStateAndIndex(value, state, index);
else if (bodyWithStateAndLocal != null)
localValue = bodyWithStateAndLocal(value, state, localValue);
else
localValue = bodyWithEverything(value, state, index, localValue);
if (sharedPStateFlags.ShouldExitLoop(index)) break;
// Cooperative multitasking hack for AppDomain fairness.
// Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic
// will detect this, and queue up a replacement task. Note that we don't do this on the root task.
if (!bIsRootTask && loopTimer.LimitExceeded())
{
currentWorkerTask.SavedStateForNextReplica = myPartition;
break;
}
}
}
else
{
// Use this path for Partitioner that is not OrderablePartitioner
// first check if there's saved state from a previous replica that we might be replacing.
// the only state to be passed down in such a transition is the enumerator
IEnumerator myPartition = currentWorkerTask.SavedStateFromPreviousReplica as IEnumerator;
if (myPartition == null)
{
// apparently we're a brand new replica, get a fresh enumerator from the partitioner
myPartition = partitionerSource.GetEnumerator();
if (myPartition == null)
{
throw new InvalidOperationException(Environment.GetResourceString("Parallel_ForEach_NullEnumerator"));
}
}
// I'm not going to try to maintain this
if (state != null)
state.CurrentIteration = 0;
while (myPartition.MoveNext())
{
TSource t = myPartition.Current;
if (simpleBody != null)
simpleBody(t);
else if (bodyWithState != null)
bodyWithState(t, state);
else if (bodyWithStateAndLocal != null)
localValue = bodyWithStateAndLocal(t, state, localValue);
else
Contract.Assert(false, "PartitionerForEach: illegal body type in Partitioner handler");
// Any break, stop or exception causes us to halt
// We don't have the global indexing information to discriminate whether or not
// we are before or after a break point.
if (sharedPStateFlags.LoopStateFlags != ParallelLoopStateFlags.PLS_NONE)
break;
// Cooperative multitasking hack for AppDomain fairness.
// Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic
// will detect this, and queue up a replacement task. Note that we don't do this on the root task.
if (!bIsRootTask && loopTimer.LimitExceeded())
{
currentWorkerTask.SavedStateForNextReplica = myPartition;
break;
}
}
}
}
catch
{
// Inform other tasks of the exception, then rethrow
sharedPStateFlags.SetExceptional();
throw;
}
finally
{
if (localFinally != null && bLocalValueInitialized)
{
localFinally(localValue);
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for ParallelFor Worker Join
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
forkJoinContextID);
}
#endif
}
};
try
{
// Create and start the self-replicating task.
// This needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel
rootTask = new ParallelForReplicatingTask(parallelOptions, partitionAction, TaskCreationOptions.None,
InternalTaskOptions.SelfReplicating);
// And process it's completion...
// Moved inside try{} block because faulty scheduler may throw here.
rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);
rootTask.Wait();
// If we made a cancellation registration, we need to clean it up now before observing the OCE
// Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
// If we got through that with no exceptions, and we were canceled, then
// throw our cancellation exception
if (oce != null) throw oce;
}
catch (AggregateException aggExp)
{
// if we made a cancellation registration, and rootTask.Wait threw, we need to clean it up here
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
// see if we can combine it into a single OCE. If not propagate the original exception
ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken);
throw;
}
catch (TaskSchedulerException)
{
// if we made a cancellation registration, and either we threw an exception constructing rootTask or
// rootTask.RunSynchronously threw, we need to clean it up here.
if (parallelOptions.CancellationToken.CanBeCanceled)
{
ctr.Dispose();
}
throw;
}
finally
{
int sb_status = sharedPStateFlags.LoopStateFlags;
result.m_completed = (sb_status == ParallelLoopStateFlags.PLS_NONE);
if ((sb_status & ParallelLoopStateFlags.PLS_BROKEN) != 0)
{
result.m_lowestBreakIteration = sharedPStateFlags.LowestBreakIteration;
}
if( (rootTask != null) && rootTask.IsCompleted) rootTask.Dispose();
//dispose the partitioner source if it implements IDisposable
IDisposable d = null;
if (orderablePartitionerSource != null)
{
d = orderablePartitionerSource as IDisposable;
}
else
{
d = partitionerSource as IDisposable;
}
if (d != null)
{
d.Dispose();
}
#if !FEATURE_PAL // PAL doesn't support eventing
// ETW event for Parallel For End
if (TplEtwProvider.Log.IsEnabled())
{
TplEtwProvider.Log.ParallelLoopEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
forkJoinContextID, 0);
}
#endif
}
return result;
}
///
/// Internal utility function that implements the OCE filtering behavior for all Parallel.* APIs.
/// Throws a single OperationCancelledException object with the token if the Exception collection only contains
/// OperationCancelledExceptions with the given CancellationToken.
///
///
/// The exception collection to filter
/// The CancellationToken expected on all inner exceptions
///
internal static void ThrowIfReducableToSingleOCE(IEnumerable excCollection, CancellationToken ct)
{
bool bCollectionNotZeroLength = false;
if (ct.IsCancellationRequested)
{
foreach(Exception e in excCollection)
{
bCollectionNotZeroLength = true;
OperationCanceledException oce = e as OperationCanceledException;
if (oce == null || oce.CancellationToken != ct)
{
// mismatch found
return;
}
}
// all exceptions are OCEs with this token, let's throw it
if(bCollectionNotZeroLength) throw new OperationCanceledException(ct);
}
}
internal struct LoopTimer
{
public LoopTimer(int nWorkerTaskIndex)
{
// This logic ensures that we have a diversity of timeouts across worker tasks (100, 150, 200, 250, 100, etc)
// Otherwise all worker will try to timeout at precisely the same point, which is bad if the work is just about to finish
int timeOut = s_BaseNotifyPeriodMS + (nWorkerTaskIndex % Environment.ProcessorCount) * s_NotifyPeriodIncrementMS;
m_timeLimit = Environment.TickCount + timeOut;
}
public bool LimitExceeded()
{
Contract.Assert(m_timeLimit != 0, "Probably the default initializer for LoopTimer was used somewhere");
// comparing against the next expected time saves an addition operation here
// Also we omit the comparison for wrap around here. The only side effect is one extra early yield every 38 days.
return (Environment.TickCount > m_timeLimit);
}
const int s_BaseNotifyPeriodMS = 100;
const int s_NotifyPeriodIncrementMS = 50;
private int m_timeLimit;
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.