Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Scheduling / OrderPreservingPipeliningSpoolingTask.cs / 1305376 / OrderPreservingPipeliningSpoolingTask.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingPipeliningSpoolingTask.cs
//
//[....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Parallel;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    // NOTE(review): this listing was recovered from a copy in which the whole file was collapsed
    // onto a few lines and every generic type-argument list had been stripped. The type arguments
    // below (<TOutput>, <TOutput, int>, <int, TOutput>) were reconstructed from how the values are
    // used in SpoolingWork (TOutput element, int key, new Pair(key, element)) -- confirm against
    // the declarations of QueryOperatorEnumerator, PartitionedStream, Pair and
    // OrderPreservingPipeliningMergeHelper before building.

    /// <summary>
    /// A spooling task that drains a single source partition and enqueues the resulting
    /// (key, element) pairs into that partition's buffer, coordinating with the consumer
    /// through a per-partition lock and the shared "waiting"/"done" flag arrays.
    /// </summary>
    /// <typeparam name="TOutput">The type of the elements produced by the partition.</typeparam>
    class OrderPreservingPipeliningSpoolingTask<TOutput> : SpoolingTaskBase
    {
        private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks.
        private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query.
        private readonly QueryOperatorEnumerator<TOutput, int> m_partition; // The source partition.
        private readonly bool[] m_consumerWaiting; // Whether a consumer is waiting on a particular producer
        private readonly bool[] m_producerWaiting; // Whether a particular producer is waiting on the consumer
        private readonly bool[] m_producerDone; // Whether each producer is done
        private readonly int m_partitionIndex; // Index of the partition owned by this task.

        private readonly Queue<Pair<int, TOutput>>[] m_buffers; // The buffer for the results
        private readonly object m_bufferLock; // A lock for the buffer

        /// <summary>
        /// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer.
        /// If false, the producer will make each result available to the consumer immediately after it is
        /// produced.
        /// </summary>
        private readonly bool m_autoBuffered;

        /// <summary>
        /// The number of elements to accumulate on the producer before copying the elements to the
        /// producer-consumer buffer. This constant is only used in the AutoBuffered mode.
        ///
        /// Experimentally, 16 appears to be sufficient buffer size to compensate for the synchronization
        /// cost.
        /// </summary>
        private const int PRODUCER_BUFFER_AUTO_SIZE = 16;

        /// <summary>
        /// Constructor. Stores the partition to drain together with the state shared with the
        /// consumer. The flag arrays and the buffer array hold one slot per partition and must
        /// all have the same length as <paramref name="consumerWaiting"/>.
        /// </summary>
        /// <param name="partition">The source partition this task enumerates.</param>
        /// <param name="taskGroupState">State shared among the tasks of the query.</param>
        /// <param name="consumerWaiting">Per-partition flags: a consumer is waiting on that producer.</param>
        /// <param name="producerWaiting">Per-partition flags: that producer is waiting on the consumer.</param>
        /// <param name="producerDone">Per-partition flags: that producer has finished.</param>
        /// <param name="partitionIndex">Index of the partition owned by this task.</param>
        /// <param name="buffers">Per-partition result queues.</param>
        /// <param name="bufferLock">The lock guarding this partition's buffer and flags.</param>
        /// <param name="taskScheduler">The scheduler used to execute the query.</param>
        /// <param name="autoBuffered">Whether results may be accumulated into chunks before being handed over.</param>
        internal OrderPreservingPipeliningSpoolingTask(
            QueryOperatorEnumerator<TOutput, int> partition,
            QueryTaskGroupState taskGroupState,
            bool[] consumerWaiting,
            bool[] producerWaiting,
            bool[] producerDone,
            int partitionIndex,
            Queue<Pair<int, TOutput>>[] buffers,
            object bufferLock,
            TaskScheduler taskScheduler,
            bool autoBuffered)
            : base(partitionIndex, taskGroupState)
        {
            Contract.Assert(partition != null);
            Contract.Assert(taskGroupState != null);
            Contract.Assert(consumerWaiting != null);
            Contract.Assert(producerWaiting != null && producerWaiting.Length == consumerWaiting.Length);
            Contract.Assert(producerDone != null && producerDone.Length == consumerWaiting.Length);
            Contract.Assert(buffers != null && buffers.Length == consumerWaiting.Length);
            Contract.Assert(partitionIndex >= 0 && partitionIndex < consumerWaiting.Length);

            m_partition = partition;
            m_taskGroupState = taskGroupState;
            m_producerDone = producerDone;
            m_consumerWaiting = consumerWaiting;
            m_producerWaiting = producerWaiting;
            m_partitionIndex = partitionIndex;
            m_buffers = buffers;
            m_bufferLock = bufferLock;
            m_taskScheduler = taskScheduler;
            m_autoBuffered = autoBuffered;
        }

        /// <summary>
        /// Enumerates the source partition and enqueues the results into this partition's
        /// output buffer, one chunk at a time. The chunk holds PRODUCER_BUFFER_AUTO_SIZE
        /// elements in auto-buffered mode and a single element otherwise.
        /// </summary>
        protected override void SpoolingWork()
        {
            TOutput element = default(TOutput);
            int key = default(int);

            int chunkSize = m_autoBuffered ? PRODUCER_BUFFER_AUTO_SIZE : 1;
            Pair<int, TOutput>[] chunk = new Pair<int, TOutput>[chunkSize];
            var partition = m_partition;
            CancellationToken cancelToken = m_taskGroupState.CancellationState.MergedCancellationToken;

            int lastChunkSize;
            do
            {
                // Fill the local chunk outside the lock; stop early if the partition runs dry.
                lastChunkSize = 0;
                while (lastChunkSize < chunkSize && partition.MoveNext(ref element, ref key))
                {
                    chunk[lastChunkSize] = new Pair<int, TOutput>(key, element);
                    lastChunkSize++;
                }

                // Nothing was produced this round: the partition is exhausted.
                if (lastChunkSize == 0) break;

                lock (m_bufferLock)
                {
                    // Check if the query has been cancelled.
                    // (This break leaves the enclosing do/while loop, releasing the lock on the way out.)
                    if (cancelToken.IsCancellationRequested)
                    {
                        break;
                    }

                    for (int i = 0; i < lastChunkSize; i++)
                    {
                        m_buffers[m_partitionIndex].Enqueue(chunk[i]);
                    }

                    // Wake the consumer if it is blocked waiting for results from this producer.
                    if (m_consumerWaiting[m_partitionIndex])
                    {
                        Monitor.Pulse(m_bufferLock);
                        m_consumerWaiting[m_partitionIndex] = false;
                    }

                    // If the producer buffer is too large, wait.
                    // Note: we already checked for cancellation after acquiring the lock on this producer.
                    // That guarantees that the consumer will eventually wake up the producer.
                    // NOTE(review): the type argument on OrderPreservingPipeliningMergeHelper was lost in
                    // extraction; <TOutput> is assumed -- confirm against that class's declaration.
                    if (m_buffers[m_partitionIndex].Count >= OrderPreservingPipeliningMergeHelper<TOutput>.MAX_BUFFER_SIZE)
                    {
                        m_producerWaiting[m_partitionIndex] = true;
                        Monitor.Wait(m_bufferLock);
                    }
                }
            } while (lastChunkSize == chunkSize); // A short chunk means MoveNext returned false.
        }

        /// <summary>
        /// Creates and begins execution of a new set of spooling tasks, one per partition.
        /// Each entry of <paramref name="buffers"/> and <paramref name="bufferLocks"/> up to the
        /// partition count is (re)initialized here before the tasks are started.
        /// </summary>
        /// <param name="groupState">State shared among the tasks of the query.</param>
        /// <param name="partitions">The partitioned source stream; one task is created per partition.</param>
        /// <param name="consumerWaiting">Per-partition flags: a consumer is waiting on that producer.</param>
        /// <param name="producerWaiting">Per-partition flags: that producer is waiting on the consumer.</param>
        /// <param name="producerDone">Per-partition flags: that producer has finished.</param>
        /// <param name="buffers">Per-partition result queues; populated by this method.</param>
        /// <param name="bufferLocks">Per-partition lock objects; populated by this method.</param>
        /// <param name="taskScheduler">The scheduler on which the root task and the spooling tasks run.</param>
        /// <param name="autoBuffered">Whether producers may accumulate results into chunks.</param>
        public static void Spool(
            QueryTaskGroupState groupState,
            PartitionedStream<TOutput, int> partitions,
            bool[] consumerWaiting,
            bool[] producerWaiting,
            bool[] producerDone,
            Queue<Pair<int, TOutput>>[] buffers,
            object[] bufferLocks,
            TaskScheduler taskScheduler,
            bool autoBuffered)
        {
            Contract.Assert(groupState != null);
            Contract.Assert(partitions != null);
            Contract.Assert(producerDone != null && producerDone.Length == partitions.PartitionCount);
            Contract.Assert(buffers != null && buffers.Length == partitions.PartitionCount);
            Contract.Assert(bufferLocks != null);

            int degreeOfParallelism = partitions.PartitionCount;

            // Initialize the buffers and buffer locks.
            for (int i = 0; i < degreeOfParallelism; i++)
            {
                buffers[i] = new Queue<Pair<int, TOutput>>(OrderPreservingPipeliningMergeHelper<TOutput>.INITIAL_BUFFER_SIZE);
                bufferLocks[i] = new object();
            }

            // Ensure all tasks in this query are parented under a common root. Because this
            // is a pipelined query, we detach it from the parent (to avoid blocking the calling
            // thread), and run the query on a separate thread.
            Task rootTask = new Task(
                () =>
                {
                    for (int i = 0; i < degreeOfParallelism; i++)
                    {
                        QueryTask asyncTask = new OrderPreservingPipeliningSpoolingTask<TOutput>(
                            partitions[i], groupState, consumerWaiting, producerWaiting,
                            producerDone, i, buffers, bufferLocks[i], taskScheduler, autoBuffered);

                        asyncTask.RunAsynchronously(taskScheduler);
                    }
                });

            // Begin the query on the calling thread.
            groupState.QueryBegin(rootTask);

            // And schedule it for execution.  This is done after beginning to ensure no thread tries to
            // end the query before its root task has been recorded properly.
            rootTask.Start(taskScheduler);

            // We don't call QueryEnd here; when we return, the query is still executing, and the
            // last enumerator to be disposed of will call QueryEnd for us.
        }

        /// <summary>
        /// Marks this producer as done, wakes up the consumer if it is waiting on this
        /// producer, and then disposes the underlying enumerator.
        /// </summary>
        protected override void SpoolingFinally()
        {
            // Let the consumer know that this producer is done.
            lock (m_bufferLock)
            {
                m_producerDone[m_partitionIndex] = true;
                if (m_consumerWaiting[m_partitionIndex])
                {
                    Monitor.Pulse(m_bufferLock);
                    m_consumerWaiting[m_partitionIndex] = false;
                }
            }

            // Call the base implementation.
            base.SpoolingFinally();

            // Dispose of the source enumerator *after* signaling that the task is done.
            // We call Dispose() last to ensure that if it throws an exception, we will not cause a deadlock.
            m_partition.Dispose();
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- AsnEncodedData.cs
- ResolveCriteria.cs
- StdValidatorsAndConverters.cs
- Canvas.cs
- Byte.cs
- DiscriminatorMap.cs
- HtmlLink.cs
- Inline.cs
- SafeCryptContextHandle.cs
- ScriptDescriptor.cs
- ProfessionalColors.cs
- TextViewSelectionProcessor.cs
- TcpClientCredentialType.cs
- MetadataItemEmitter.cs
- EncoderBestFitFallback.cs
- EventLogInternal.cs
- DeploymentSectionCache.cs
- BindingExpressionBase.cs
- JournalNavigationScope.cs
- HtmlTableCellCollection.cs
- JsonObjectDataContract.cs
- BrowserCapabilitiesFactory35.cs
- HtmlTable.cs
- UIElement.cs
- EditorZoneBase.cs
- XmlPreloadedResolver.cs
- RadialGradientBrush.cs
- ExtenderProvidedPropertyAttribute.cs
- KnowledgeBase.cs
- TextParagraphProperties.cs
- IdnElement.cs
- HTMLTagNameToTypeMapper.cs
- DataGridViewCellCollection.cs
- ShaderEffect.cs
- ProcessThread.cs
- RelationshipEndMember.cs
- AnnotationObservableCollection.cs
- HtmlWindowCollection.cs
- NameSpaceExtractor.cs
- IdnElement.cs
- ActiveDocumentEvent.cs
- TableHeaderCell.cs
- ContainerControl.cs
- Internal.cs
- LinqDataSourceSelectEventArgs.cs
- SchemaTypeEmitter.cs
- AtomMaterializer.cs
- xamlnodes.cs
- WebException.cs
- XmlUtilWriter.cs
- StreamWithDictionary.cs
- Message.cs
- WebPartExportVerb.cs
- SupportingTokenProviderSpecification.cs
- MergeLocalizationDirectives.cs
- ResourceExpression.cs
- DataGridViewCellParsingEventArgs.cs
- OpenTypeLayoutCache.cs
- ModelItemDictionary.cs
- BlockCollection.cs
- BamlResourceContent.cs
- SystemTcpConnection.cs
- InboundActivityHelper.cs
- MsmqHostedTransportConfiguration.cs
- CheckBoxRenderer.cs
- EventProviderTraceListener.cs
- TransformPatternIdentifiers.cs
- XmlSchemaAny.cs
- SystemIPInterfaceProperties.cs
- EditorServiceContext.cs
- StructuralObject.cs
- IItemContainerGenerator.cs
- XmlCharCheckingReader.cs
- URLIdentityPermission.cs
- ValidationErrorCollection.cs
- PartialToken.cs
- Lasso.cs
- TextComposition.cs
- Delegate.cs
- ApplicationFileParser.cs
- SafeRegistryKey.cs
- BezierSegment.cs
- StyleSheet.cs
- DataGridViewButtonColumn.cs
- RecordsAffectedEventArgs.cs
- CodePageEncoding.cs
- MemoryPressure.cs
- FileDialogPermission.cs
- PagesSection.cs
- EntityDataSourceViewSchema.cs
- SystemColors.cs
- PeerDuplexChannel.cs
- StorageMappingItemCollection.cs
- TreeNodeStyle.cs
- FieldToken.cs
- Crc32.cs
- Ipv6Element.cs
- EncoderParameter.cs
- DataGridAutomationPeer.cs
- TimeSpanValidator.cs