Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / SpoolingTask.cs / 1305376 / SpoolingTask.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // SpoolingTask.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Threading; using System.Threading.Tasks; using System.Diagnostics.Contracts; namespace System.Linq.Parallel { ////// A factory class to execute spooling logic. /// internal static class SpoolingTask { //------------------------------------------------------------------------------------ // Creates and begins execution of a new spooling task. Executes synchronously, // and by the time this API has returned all of the results have been produced. // // Arguments: // groupState - values for inter-task communication // partitions - the producer enumerators // channels - the producer-consumer channels // taskScheduler - the task manager on which to execute // internal static void SpoolStopAndGo( QueryTaskGroupState groupState, PartitionedStream partitions, SynchronousChannel [] channels, TaskScheduler taskScheduler) { Contract.Assert(partitions.PartitionCount == channels.Length); Contract.Assert(groupState != null); // Ensure all tasks in this query are parented under a common root. Task rootTask = new Task( () => { int maxToRunInParallel = partitions.PartitionCount - 1; // A stop-and-go merge uses the current thread for one task and then blocks before // returning to the caller, until all results have been accumulated. We do this by // running the last partition on the calling thread. for (int i = 0; i < maxToRunInParallel; i++) { TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i); QueryTask asyncTask = new StopAndGoSpoolingTask (i, groupState, partitions[i], channels[i]); asyncTask.RunAsynchronously(taskScheduler); } TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel); // Run one task synchronously on the current thread. QueryTask syncTask = new StopAndGoSpoolingTask ( maxToRunInParallel, groupState, partitions[maxToRunInParallel], channels[maxToRunInParallel]); syncTask.RunSynchronously(taskScheduler); }); // Begin the query on the calling thread. groupState.QueryBegin(rootTask); // We don't want to return until the task is finished. Run it on the calling thread. rootTask.RunSynchronously(taskScheduler); // Wait for the query to complete, propagate exceptions, and so on. // For pipelined queries, this step happens in the async enumerator. groupState.QueryEnd(false); } //----------------------------------------------------------------------------------- // Creates and begins execution of a new spooling task. Runs asynchronously. // // Arguments: // groupState - values for inter-task communication // partitions - the producer enumerators // channels - the producer-consumer channels // taskScheduler - the task manager on which to execute // internal static void SpoolPipeline ( QueryTaskGroupState groupState, PartitionedStream partitions, AsynchronousChannel [] channels, TaskScheduler taskScheduler) { Contract.Assert(partitions.PartitionCount == channels.Length); Contract.Assert(groupState != null); // Ensure all tasks in this query are parented under a common root. Because this // is a pipelined query, we detach it from the parent (to avoid blocking the calling // thread), and run the query on a separate thread. Task rootTask = new Task( () => { // Create tasks that will enumerate the partitions in parallel. Because we're pipelining, // we will begin running these tasks in parallel and then return. for (int i = 0; i < partitions.PartitionCount; i++) { TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i); QueryTask asyncTask = new PipelineSpoolingTask (i, groupState, partitions[i], channels[i]); asyncTask.RunAsynchronously(taskScheduler); } }); // Begin the query on the calling thread. groupState.QueryBegin(rootTask); // And schedule it for execution. This is done after beginning to ensure no thread tries to // end the query before its root task has been recorded properly. rootTask.Start(taskScheduler); // We don't call QueryEnd here; when we return, the query is still executing, and the // last enumerator to be disposed of will call QueryEnd for us. } //----------------------------------------------------------------------------------- // Creates and begins execution of a new spooling task. This is a for-all style // execution, meaning that the query will be run fully (for effect) before returning // and that there are no channels into which data will be queued. // // Arguments: // groupState - values for inter-task communication // partitions - the producer enumerators // taskScheduler - the task manager on which to execute // internal static void SpoolForAll ( QueryTaskGroupState groupState, PartitionedStream partitions, TaskScheduler taskScheduler) { Contract.Assert(groupState != null); // Ensure all tasks in this query are parented under a common root. Task rootTask = new Task( () => { int maxToRunInParallel = partitions.PartitionCount - 1; // Create tasks that will enumerate the partitions in parallel "for effect"; in other words, // no data will be placed into any kind of producer-consumer channel. for (int i = 0; i < maxToRunInParallel; i++) { TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i); QueryTask asyncTask = new ForAllSpoolingTask (i, groupState, partitions[i]); asyncTask.RunAsynchronously(taskScheduler); } TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel); // Run one task synchronously on the current thread. QueryTask syncTask = new ForAllSpoolingTask (maxToRunInParallel, groupState, partitions[maxToRunInParallel]); syncTask.RunSynchronously(taskScheduler); }); // Begin the query on the calling thread. groupState.QueryBegin(rootTask); // We don't want to return until the task is finished. Run it on the calling thread. rootTask.RunSynchronously(taskScheduler); // Wait for the query to complete, propagate exceptions, and so on. // For pipelined queries, this step happens in the async enumerator. groupState.QueryEnd(false); } } /// /// A spooling task handles marshaling data from a producer to a consumer. It's given /// a single enumerator object that contains all of the production algorithms, a single /// destination channel from which consumers draw results, and (optionally) a /// synchronization primitive using which to notify asynchronous consumers. /// ////// internal class StopAndGoSpoolingTask : SpoolingTaskBase { // The data source from which to pull data. private QueryOperatorEnumerator m_source; // The destination channel into which data is placed. This can be null if we are // enumerating "for effect", e.g. forall loop. private SynchronousChannel m_destination; //----------------------------------------------------------------------------------- // Creates, but does not execute, a new spooling task. // // Arguments: // taskIndex - the unique index of this task // source - the producer enumerator // destination - the destination channel into which to spool elements // // Assumptions: // Source cannot be null, although the other arguments may be. // internal StopAndGoSpoolingTask( int taskIndex, QueryTaskGroupState groupState, QueryOperatorEnumerator source, SynchronousChannel destination) : base(taskIndex, groupState) { Contract.Assert(source != null); m_source = source; m_destination = destination; } //------------------------------------------------------------------------------------ // This method is responsible for enumerating results and enqueueing them to // the output channel(s) as appropriate. Each base class implements its own. // protected override void SpoolingWork() { // We just enumerate over the entire source data stream, placing each element // into the destination channel. TInputOutput current = default(TInputOutput); TIgnoreKey keyUnused = default(TIgnoreKey); QueryOperatorEnumerator source = m_source; SynchronousChannel destination = m_destination; CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken; destination.Init(); while (source.MoveNext(ref current, ref keyUnused)) { // If an abort has been requested, stop this worker immediately. if (cancelToken.IsCancellationRequested) { break; } destination.Enqueue(current); } } //----------------------------------------------------------------------------------- // Ensure we signal that the channel is complete. // protected override void SpoolingFinally() { // Call the base implementation. base.SpoolingFinally(); // Signal that we are done, in the case of asynchronous consumption. if (m_destination != null) { m_destination.SetDone(); } // Dispose of the source enumerator *after* signaling that the task is done. // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock. m_source.Dispose(); } } /// /// A spooling task handles marshaling data from a producer to a consumer. It's given /// a single enumerator object that contains all of the production algorithms, a single /// destination channel from which consumers draw results, and (optionally) a /// synchronization primitive using which to notify asynchronous consumers. /// ////// internal class PipelineSpoolingTask : SpoolingTaskBase { // The data source from which to pull data. private QueryOperatorEnumerator m_source; // The destination channel into which data is placed. This can be null if we are // enumerating "for effect", e.g. forall loop. private AsynchronousChannel m_destination; //------------------------------------------------------------------------------------ // Creates, but does not execute, a new spooling task. // // Arguments: // taskIndex - the unique index of this task // source - the producer enumerator // destination - the destination channel into which to spool elements // // Assumptions: // Source cannot be null, although the other arguments may be. // internal PipelineSpoolingTask( int taskIndex, QueryTaskGroupState groupState, QueryOperatorEnumerator source, AsynchronousChannel destination) : base(taskIndex, groupState) { Contract.Assert(source != null); m_source = source; m_destination = destination; } //------------------------------------------------------------------------------------ // This method is responsible for enumerating results and enqueueing them to // the output channel(s) as appropriate. Each base class implements its own. // protected override void SpoolingWork() { // We just enumerate over the entire source data stream, placing each element // into the destination channel. TInputOutput current = default(TInputOutput); TIgnoreKey keyUnused = default(TIgnoreKey); QueryOperatorEnumerator source = m_source; AsynchronousChannel destination = m_destination; CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken; while (source.MoveNext(ref current, ref keyUnused)) { // If an abort has been requested, stop this worker immediately. if (cancelToken.IsCancellationRequested) { break; } destination.Enqueue(current); } // Flush remaining data to the query consumer in preparation for channel shutdown. destination.FlushBuffers(); } //----------------------------------------------------------------------------------- // Ensure we signal that the channel is complete. // protected override void SpoolingFinally() { // Call the base implementation. base.SpoolingFinally(); // Signal that we are done, in the case of asynchronous consumption. if (m_destination != null) { m_destination.SetDone(); } // Dispose of the source enumerator *after* signaling that the task is done. // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock. m_source.Dispose(); } } /// /// A spooling task handles marshaling data from a producer to a consumer. It's given /// a single enumerator object that contains all of the production algorithms, a single /// destination channel from which consumers draw results, and (optionally) a /// synchronization primitive using which to notify asynchronous consumers. /// ////// internal class ForAllSpoolingTask : SpoolingTaskBase { // The data source from which to pull data. private QueryOperatorEnumerator m_source; //------------------------------------------------------------------------------------ // Creates, but does not execute, a new spooling task. // // Arguments: // taskIndex - the unique index of this task // source - the producer enumerator // destination - the destination channel into which to spool elements // // Assumptions: // Source cannot be null, although the other arguments may be. // internal ForAllSpoolingTask( int taskIndex, QueryTaskGroupState groupState, QueryOperatorEnumerator source) : base(taskIndex, groupState) { Contract.Assert(source != null); m_source = source; } //----------------------------------------------------------------------------------- // This method is responsible for enumerating results and enqueueing them to // the output channel(s) as appropriate. Each base class implements its own. // protected override void SpoolingWork() { // We just enumerate over the entire source data stream for effect. TInputOutput currentUnused = default(TInputOutput); TIgnoreKey keyUnused = default(TIgnoreKey); //Note: this only ever runs with a ForAll operator, and ForAllEnumerator performs cancellation checks while (m_source.MoveNext(ref currentUnused, ref keyUnused)) ; } //----------------------------------------------------------------------------------- // Ensure we signal that the channel is complete. // protected override void SpoolingFinally() { // Call the base implementation. base.SpoolingFinally(); // Dispose of the source enumerator m_source.Dispose(); } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ChannelDispatcher.cs
- ListViewItem.cs
- URLAttribute.cs
- BasicBrowserDialog.designer.cs
- Soap11ServerProtocol.cs
- Permission.cs
- ComboBoxRenderer.cs
- GetRecipientListRequest.cs
- OutputCacheSettingsSection.cs
- CommandBinding.cs
- DataQuery.cs
- ListBindableAttribute.cs
- DataSourceXmlSerializer.cs
- ProgressBarBrushConverter.cs
- XmlAutoDetectWriter.cs
- StructuredProperty.cs
- BufferModesCollection.cs
- _CookieModule.cs
- ButtonColumn.cs
- ScaleTransform.cs
- DependentList.cs
- Subtree.cs
- MessageBox.cs
- ResumeStoryboard.cs
- ItemCheckEvent.cs
- ServiceNameElement.cs
- ImageMetadata.cs
- DataGridTablesFactory.cs
- StreamResourceInfo.cs
- MimeTypeMapper.cs
- IntegerCollectionEditor.cs
- NamespaceQuery.cs
- XsdValidatingReader.cs
- DetailsViewUpdatedEventArgs.cs
- NameValueConfigurationCollection.cs
- SmiEventSink_DeferedProcessing.cs
- FlowDocumentFormatter.cs
- WebServiceData.cs
- BroadcastEventHelper.cs
- MetadataItem_Static.cs
- ComMethodElementCollection.cs
- InputBinder.cs
- FlagsAttribute.cs
- webeventbuffer.cs
- RTLAwareMessageBox.cs
- TimeSpanSecondsOrInfiniteConverter.cs
- WorkflowMarkupSerializerMapping.cs
- Freezable.cs
- SyntaxCheck.cs
- PathFigure.cs
- DependentList.cs
- ToolboxComponentsCreatedEventArgs.cs
- RegexInterpreter.cs
- FixedFlowMap.cs
- ClientType.cs
- cryptoapiTransform.cs
- RangeValidator.cs
- FormViewInsertEventArgs.cs
- ColumnResizeUndoUnit.cs
- DbDataAdapter.cs
- MenuStrip.cs
- RequestCacheEntry.cs
- CompareValidator.cs
- SelectionItemProviderWrapper.cs
- InvokeHandlers.cs
- ExpressionsCollectionEditor.cs
- RelationshipNavigation.cs
- TextSelectionHelper.cs
- SerialErrors.cs
- ReferentialConstraint.cs
- PermissionAttributes.cs
- SelectionEditor.cs
- RadioButtonPopupAdapter.cs
- ObjectStorage.cs
- WebSysDisplayNameAttribute.cs
- ConnectionStringSettingsCollection.cs
- ParameterCollectionEditor.cs
- ImagingCache.cs
- StoreContentChangedEventArgs.cs
- DataGridViewColumnDesigner.cs
- AccessorTable.cs
- Assembly.cs
- SplineQuaternionKeyFrame.cs
- TaskbarItemInfo.cs
- Literal.cs
- RuntimeEnvironment.cs
- CustomSignedXml.cs
- WindowsStatic.cs
- WorkflowStateRollbackService.cs
- EntityDataSourceWizardForm.cs
- DeviceOverridableAttribute.cs
- TreeNodeClickEventArgs.cs
- DataGridRow.cs
- CompiledQueryCacheKey.cs
- HttpRuntime.cs
- TableLayoutPanel.cs
- LabelEditEvent.cs
- RtfToXamlReader.cs
- MouseEventArgs.cs
- KeyConverter.cs