Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / SpoolingTask.cs / 1305376 / SpoolingTask.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SpoolingTask.cs
//
//[....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    // NOTE(review): the generic parameter/argument lists in this file (<TInputOutput, TIgnoreKey>,
    // <TInputOutput>) were reconstructed from how the type parameters are used in the method
    // bodies below. Confirm the arity of PartitionedStream, QueryOperatorEnumerator,
    // SynchronousChannel and AsynchronousChannel against their declarations.

    /// <summary>
    /// A factory class to execute spooling logic.
    /// </summary>
    internal static class SpoolingTask
    {
        //------------------------------------------------------------------------------------
        // Creates and begins execution of a new spooling task. Executes synchronously,
        // and by the time this API has returned all of the results have been produced.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators
        //     channels        - the producer-consumer channels
        //     taskScheduler   - the task manager on which to execute
        //

        internal static void SpoolStopAndGo<TInputOutput, TIgnoreKey>(
            QueryTaskGroupState groupState,
            PartitionedStream<TInputOutput, TIgnoreKey> partitions,
            SynchronousChannel<TInputOutput>[] channels,
            TaskScheduler taskScheduler)
        {
            Contract.Assert(partitions.PartitionCount == channels.Length);
            Contract.Assert(groupState != null);

            // Ensure all tasks in this query are parented under a common root.
            Task rootTask = new Task(
                () =>
                {
                    int maxToRunInParallel = partitions.PartitionCount - 1;

                    // A stop-and-go merge uses the current thread for one task and then blocks before
                    // returning to the caller, until all results have been accumulated. We do this by
                    // running the last partition on the calling thread.
                    for (int i = 0; i < maxToRunInParallel; i++)
                    {
                        TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                        QueryTask asyncTask =
                            new StopAndGoSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i], channels[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }

                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);

                    // Run one task synchronously on the current thread.
                    QueryTask syncTask = new StopAndGoSpoolingTask<TInputOutput, TIgnoreKey>(
                        maxToRunInParallel, groupState, partitions[maxToRunInParallel], channels[maxToRunInParallel]);
                    syncTask.RunSynchronously(taskScheduler);
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // We don't want to return until the task is finished.  Run it on the calling thread.
            rootTask.RunSynchronously(taskScheduler);

            // Wait for the query to complete, propagate exceptions, and so on.
            // For pipelined queries, this step happens in the async enumerator.
            groupState.QueryEnd(false);
        }

        //-----------------------------------------------------------------------------------
        // Creates and begins execution of a new spooling task. Runs asynchronously.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators
        //     channels        - the producer-consumer channels
        //     taskScheduler   - the task manager on which to execute
        //

        internal static void SpoolPipeline<TInputOutput, TIgnoreKey>(
            QueryTaskGroupState groupState,
            PartitionedStream<TInputOutput, TIgnoreKey> partitions,
            AsynchronousChannel<TInputOutput>[] channels,
            TaskScheduler taskScheduler)
        {
            Contract.Assert(partitions.PartitionCount == channels.Length);
            Contract.Assert(groupState != null);

            // Ensure all tasks in this query are parented under a common root. Because this
            // is a pipelined query, we detach it from the parent (to avoid blocking the calling
            // thread), and run the query on a separate thread.
            Task rootTask = new Task(
                () =>
                {
                    // Create tasks that will enumerate the partitions in parallel. Because we're pipelining,
                    // we will begin running these tasks in parallel and then return.
                    for (int i = 0; i < partitions.PartitionCount; i++)
                    {
                        TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                        QueryTask asyncTask =
                            new PipelineSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i], channels[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // And schedule it for execution.  This is done after beginning to ensure no thread tries to
            // end the query before its root task has been recorded properly.
            rootTask.Start(taskScheduler);

            // We don't call QueryEnd here; when we return, the query is still executing, and the
            // last enumerator to be disposed of will call QueryEnd for us.
        }

        //-----------------------------------------------------------------------------------
        // Creates and begins execution of a new spooling task. This is a for-all style
        // execution, meaning that the query will be run fully (for effect) before returning
        // and that there are no channels into which data will be queued.
        //
        // Arguments:
        //     groupState      - values for inter-task communication
        //     partitions      - the producer enumerators
        //     taskScheduler   - the task manager on which to execute
        //

        internal static void SpoolForAll<TInputOutput, TIgnoreKey>(
            QueryTaskGroupState groupState,
            PartitionedStream<TInputOutput, TIgnoreKey> partitions,
            TaskScheduler taskScheduler)
        {
            Contract.Assert(groupState != null);

            // Ensure all tasks in this query are parented under a common root.
            Task rootTask = new Task(
                () =>
                {
                    int maxToRunInParallel = partitions.PartitionCount - 1;

                    // Create tasks that will enumerate the partitions in parallel "for effect"; in other words,
                    // no data will be placed into any kind of producer-consumer channel.
                    for (int i = 0; i < maxToRunInParallel; i++)
                    {
                        TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] asynchronously", i);

                        QueryTask asyncTask =
                            new ForAllSpoolingTask<TInputOutput, TIgnoreKey>(i, groupState, partitions[i]);
                        asyncTask.RunAsynchronously(taskScheduler);
                    }

                    TraceHelpers.TraceInfo("SpoolingTask::Spool: Running partition[{0}] synchronously", maxToRunInParallel);

                    // Run one task synchronously on the current thread.
                    QueryTask syncTask = new ForAllSpoolingTask<TInputOutput, TIgnoreKey>(
                        maxToRunInParallel, groupState, partitions[maxToRunInParallel]);
                    syncTask.RunSynchronously(taskScheduler);
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // We don't want to return until the task is finished.  Run it on the calling thread.
            rootTask.RunSynchronously(taskScheduler);

            // Wait for the query to complete, propagate exceptions, and so on.
            // For pipelined queries, this step happens in the async enumerator.
            groupState.QueryEnd(false);
        }
    }

    /// <summary>
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms and a single
    /// synchronous destination channel from which consumers draw results.
    /// </summary>
    /// <typeparam name="TInputOutput">The type of the elements moved from source to channel.</typeparam>
    /// <typeparam name="TIgnoreKey">The key type of the source enumerator; key values are not used here.</typeparam>
    internal class StopAndGoSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
    {
        // The data source from which to pull data.
        private readonly QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;

        // The destination channel into which data is placed. SpoolingFinally tolerates a null
        // channel, but SpoolingWork dereferences it unconditionally.
        private readonly SynchronousChannel<TInputOutput> m_destination;

        //-----------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task
        //     groupState  - values for inter-task communication
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //
        // Assumptions:
        //     Source cannot be null (asserted).
        //

        internal StopAndGoSpoolingTask(
            int taskIndex,
            QueryTaskGroupState groupState,
            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source,
            SynchronousChannel<TInputOutput> destination)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
            m_destination = destination;
        }

        //------------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate.  Each derived class implements its own.
        //

        protected override void SpoolingWork()
        {
            // We just enumerate over the entire source data stream, placing each element
            // into the destination channel.
            TInputOutput current = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source = m_source;
            SynchronousChannel<TInputOutput> destination = m_destination;
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            destination.Init();
            while (source.MoveNext(ref current, ref keyUnused))
            {
                // If an abort has been requested, stop this worker immediately.
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                destination.Enqueue(current);
            }
        }

        //-----------------------------------------------------------------------------------
        // Ensure we signal that the channel is complete.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Signal that we are done, in the case of asynchronous consumption.
            if (m_destination != null)
            {
                m_destination.SetDone();
            }

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_source.Dispose();
        }
    }

    /// <summary>
    /// A spooling task handles marshaling data from a producer to a consumer. It's given
    /// a single enumerator object that contains all of the production algorithms and a single
    /// asynchronous destination channel from which consumers draw results.
    /// </summary>
    /// <typeparam name="TInputOutput">The type of the elements moved from source to channel.</typeparam>
    /// <typeparam name="TIgnoreKey">The key type of the source enumerator; key values are not used here.</typeparam>
    internal class PipelineSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
    {
        // The data source from which to pull data.
        private readonly QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;

        // The destination channel into which data is placed. SpoolingFinally tolerates a null
        // channel, but SpoolingWork dereferences it unconditionally.
        private readonly AsynchronousChannel<TInputOutput> m_destination;

        //------------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task
        //     groupState  - values for inter-task communication
        //     source      - the producer enumerator
        //     destination - the destination channel into which to spool elements
        //
        // Assumptions:
        //     Source cannot be null (asserted).
        //

        internal PipelineSpoolingTask(
            int taskIndex,
            QueryTaskGroupState groupState,
            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source,
            AsynchronousChannel<TInputOutput> destination)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
            m_destination = destination;
        }

        //------------------------------------------------------------------------------------
        // This method is responsible for enumerating results and enqueueing them to
        // the output channel(s) as appropriate.  Each derived class implements its own.
        //

        protected override void SpoolingWork()
        {
            // We just enumerate over the entire source data stream, placing each element
            // into the destination channel.
            TInputOutput current = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source = m_source;
            AsynchronousChannel<TInputOutput> destination = m_destination;
            CancellationToken cancelToken = m_groupState.CancellationState.MergedCancellationToken;

            while (source.MoveNext(ref current, ref keyUnused))
            {
                // If an abort has been requested, stop this worker immediately.
                if (cancelToken.IsCancellationRequested)
                {
                    break;
                }

                destination.Enqueue(current);
            }

            // Flush remaining data to the query consumer in preparation for channel shutdown.
            // Note this also runs when the loop above exits early because of cancellation.
            destination.FlushBuffers();
        }

        //-----------------------------------------------------------------------------------
        // Ensure we signal that the channel is complete.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Signal that we are done, in the case of asynchronous consumption.
            if (m_destination != null)
            {
                m_destination.SetDone();
            }

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_source.Dispose();
        }
    }

    /// <summary>
    /// A spooling task that enumerates a single producer enumerator purely "for effect":
    /// every element is pulled from the source, but nothing is placed into a channel.
    /// </summary>
    /// <typeparam name="TInputOutput">The element type of the source enumerator.</typeparam>
    /// <typeparam name="TIgnoreKey">The key type of the source enumerator; key values are not used here.</typeparam>
    internal class ForAllSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
    {
        // The data source from which to pull data.
        private readonly QueryOperatorEnumerator<TInputOutput, TIgnoreKey> m_source;

        //------------------------------------------------------------------------------------
        // Creates, but does not execute, a new spooling task.
        //
        // Arguments:
        //     taskIndex   - the unique index of this task
        //     groupState  - values for inter-task communication
        //     source      - the producer enumerator
        //
        // Assumptions:
        //     Source cannot be null (asserted).
        //

        internal ForAllSpoolingTask(
            int taskIndex,
            QueryTaskGroupState groupState,
            QueryOperatorEnumerator<TInputOutput, TIgnoreKey> source)
            : base(taskIndex, groupState)
        {
            Contract.Assert(source != null);
            m_source = source;
        }

        //-----------------------------------------------------------------------------------
        // This method is responsible for enumerating the source to completion. There is no
        // output channel for this task; elements are consumed and discarded.
        //

        protected override void SpoolingWork()
        {
            // We just enumerate over the entire source data stream for effect.
            TInputOutput currentUnused = default(TInputOutput);
            TIgnoreKey keyUnused = default(TIgnoreKey);

            //Note: this only ever runs with a ForAll operator, and ForAllEnumerator performs cancellation checks
            while (m_source.MoveNext(ref currentUnused, ref keyUnused))
            {
                // Intentionally empty: the enumeration itself is the work.
            }
        }

        //-----------------------------------------------------------------------------------
        // Ensure the source enumerator is released once the task has finished.
        //

        protected override void SpoolingFinally()
        {
            // Call the base implementation.
            base.SpoolingFinally();

            // Dispose of the source enumerator
            m_source.Dispose();
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- GeometryConverter.cs
- DataGridGeneralPage.cs
- MenuAutoFormat.cs
- WSSecurityPolicy12.cs
- WebPartVerb.cs
- TdsEnums.cs
- HostedHttpTransportManager.cs
- DrawItemEvent.cs
- WindowsTreeView.cs
- SamlAuthenticationClaimResource.cs
- HtmlTableCell.cs
- XPathDocumentNavigator.cs
- CachingHintValidation.cs
- DocComment.cs
- diagnosticsswitches.cs
- KeyConverter.cs
- TextLineResult.cs
- AssociationTypeEmitter.cs
- ReservationNotFoundException.cs
- InfiniteTimeSpanConverter.cs
- DataRowChangeEvent.cs
- WizardStepBase.cs
- ContextDataSourceView.cs
- TypeUtils.cs
- Attribute.cs
- SspiHelper.cs
- ServicePointManagerElement.cs
- SchemaInfo.cs
- DiscardableAttribute.cs
- FileDetails.cs
- RichTextBoxConstants.cs
- TabItemAutomationPeer.cs
- Ipv6Element.cs
- Bits.cs
- UInt32Converter.cs
- HtmlElement.cs
- UIElementParaClient.cs
- CopyAction.cs
- TypeExtension.cs
- UserControlAutomationPeer.cs
- Debug.cs
- XsdDataContractExporter.cs
- DesignerFrame.cs
- ConstraintConverter.cs
- WorkflowInstanceExtensionProvider.cs
- StreamInfo.cs
- SlotInfo.cs
- SerTrace.cs
- SecurityElementBase.cs
- ChildDocumentBlock.cs
- shaper.cs
- DataGridCommandEventArgs.cs
- PerformanceCounter.cs
- SerialErrors.cs
- DesignOnlyAttribute.cs
- ResourceAssociationSet.cs
- CaseCqlBlock.cs
- CacheManager.cs
- ExpressionVisitorHelpers.cs
- InvalidateEvent.cs
- MulticastOption.cs
- ExternalDataExchangeClient.cs
- DataGridViewLinkCell.cs
- ToolboxItemCollection.cs
- Vertex.cs
- ReachPrintTicketSerializer.cs
- InputProcessorProfiles.cs
- RawStylusInputReport.cs
- BufferedReadStream.cs
- XmlImplementation.cs
- XmlSchemaIdentityConstraint.cs
- XmlReaderDelegator.cs
- AppDomainResourcePerfCounters.cs
- TypeNameHelper.cs
- PropertyStore.cs
- SimpleHandlerBuildProvider.cs
- WorkflowServiceBuildProvider.cs
- MethodBuilder.cs
- ColorDialog.cs
- DataTemplateKey.cs
- OleDbParameterCollection.cs
- Imaging.cs
- DataSourceListEditor.cs
- Point3DAnimation.cs
- MatrixTransform.cs
- StorageScalarPropertyMapping.cs
- DataExpression.cs
- ExpandCollapseProviderWrapper.cs
- OleDbError.cs
- ListViewItemCollectionEditor.cs
- FacetEnabledSchemaElement.cs
- WorkflowDesigner.cs
- HGlobalSafeHandle.cs
- IsolationInterop.cs
- ZipIOExtraFieldElement.cs
- PlaceHolder.cs
- StructuredTypeEmitter.cs
- ColumnHeader.cs
- ExecutionTracker.cs
- OdbcParameterCollection.cs