Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / OrderPreservingPipeliningMergeHelper.cs / 1305376 / OrderPreservingPipeliningMergeHelper.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// OrderPreservingPipeliningMergeHelper.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;
namespace System.Linq.Parallel
{
///
/// A merge helper that yields results in a streaming fashion, while still ensuring correct output
/// ordering. This merge only works if each producer task generates outputs in the correct order,
/// i.e. with an Increasing (or Correct) order index.
///
/// The merge creates DOP producer tasks, each of which will be writing results into a separate
/// buffer.
///
/// The consumer always waits until each producer buffer contains at least one element. If we don't
/// have one element from each producer, we cannot yield the next element. (If the order index is
/// Correct, or in some special cases with the Increasing order, we could yield sooner. The
/// current algorithm does not take advantage of this.)
///
/// The consumer maintains a producer heap, and uses it to decide which producer should yield the next output
/// result. After yielding an element from a particular producer, the consumer will take another element
/// from the same producer. However, if the producer buffer exceeded a particular threshold, the consumer
/// will take the entire buffer, and give the producer an empty buffer to fill.
///
/// Finally, if the producer notices that its buffer has exceeded an even greater threshold, it will
/// go to sleep and wait until the consumer takes the entire buffer.
///
internal class OrderPreservingPipeliningMergeHelper : IMergeHelper
{
private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks.
private readonly PartitionedStream m_partitions; // Source partitions.
private readonly TaskScheduler m_taskScheduler; // The task manager to execute the query.
///
/// Whether the producer is allowed to buffer up elements before handing a chunk to the consumer.
/// If false, the producer will make each result available to the consumer immediately after it is
/// produced.
///
private readonly bool m_autoBuffered;
///
/// Buffers for the results. Each buffer has elements added by one producer, and removed
/// by the consumer.
///
private readonly Queue>[] m_buffers;
///
/// Whether each producer is done producing. Set to true by individual producers, read by consumer.
///
private readonly bool[] m_producerDone;
///
/// Whether a particular producer is waiting on the consumer. Read by the consumer, set to true
/// by producers, set to false by the consumer.
///
private readonly bool[] m_producerWaiting;
///
/// Whether the consumer is waiting on a particular producer. Read by producers, set to true
/// by consumer, set to false by producer.
///
private readonly bool[] m_consumerWaiting;
///
/// Each object is a lock protecting the corresponding elements in m_buffers, m_producerDone,
/// m_producerWaiting and m_consumerWaiting.
///
private readonly object[] m_bufferLocks;
///
/// A singleton instance of the comparer used by the producer heap. Eager allocation is OK
/// because if the static constructor runs, we will be using this merge.
///
private static ProducerComparer s_producerComparer = new ProducerComparer();
///
/// The initial capacity of the buffer queue. The value was chosen experimentally.
///
internal const int INITIAL_BUFFER_SIZE = 128;
///
/// If the consumer notices that the queue reached this limit, it will take the entire buffer from
/// the producer, instead of just popping off one result. The value was chosen experimentally.
///
internal const int STEAL_BUFFER_SIZE = 1024;
///
/// If the producer notices that the queue reached this limit, it will go to sleep until woken up
/// by the consumer. Chosen experimentally.
///
internal const int MAX_BUFFER_SIZE = 8192;
//------------------------------------------------------------------------------------
// Instantiates a new merge helper.
//
// Arguments:
// partitions - the source partitions from which to consume data.
// ignoreOutput - whether we're enumerating "for effect" or for output.
//
internal OrderPreservingPipeliningMergeHelper(
PartitionedStream partitions,
TaskScheduler taskScheduler,
CancellationState cancellationState,
bool autoBuffered,
int queryId)
{
Contract.Assert(partitions != null);
TraceHelpers.TraceInfo("KeyOrderPreservingMergeHelper::.ctor(..): creating an order preserving merge helper");
m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
m_partitions = partitions;
m_taskScheduler = taskScheduler;
m_autoBuffered = autoBuffered;
int partitionCount = m_partitions.PartitionCount;
m_buffers = new Queue>[partitionCount];
m_producerDone = new bool[partitionCount];
m_consumerWaiting = new bool[partitionCount];
m_producerWaiting = new bool[partitionCount];
m_bufferLocks = new object[partitionCount];
}
//-----------------------------------------------------------------------------------
// Schedules execution of the merge itself.
//
void IMergeHelper.Execute()
{
OrderPreservingPipeliningSpoolingTask.Spool(
m_taskGroupState, m_partitions, m_consumerWaiting, m_producerWaiting, m_producerDone,
m_buffers, m_bufferLocks, m_taskScheduler, m_autoBuffered);
}
//-----------------------------------------------------------------------------------
// Gets the enumerator from which to enumerate output results.
//
IEnumerator IMergeHelper.GetEnumerator()
{
return new OrderedPipeliningMergeEnumerator(this);
}
//-----------------------------------------------------------------------------------
// Returns the results as an array.
//
public TOutput[] GetResultsAsArray()
{
Contract.Assert(false, "An ordered pipelining merge is not intended to be used this way.");
throw new InvalidOperationException();
}
///
/// A structure to represent a producer in the producer heap.
///
private struct Producer
{
internal readonly int MaxKey; // Order index of the next element from this producer
internal readonly int ProducerIndex; // Index of the producer, [0..DOP)
internal Producer(int maxKey, int producerIndex)
{
MaxKey = maxKey;
ProducerIndex = producerIndex;
}
}
///
/// A comparer used by FixedMaxHeap(Of Producer)
///
/// This comparer will be used by max-heap. We want the producer with the smallest MaxKey to
/// end up in the root of the heap.
///
/// x.MaxKey GREATER_THAN y.MaxKey => x LESS_THAN y => return -
/// x.MaxKey EQUALS y.MaxKey => x EQUALS y => return 0
/// x.MaxKey LESS_THAN y.MaxKey => x GREATER_THAN y => return +
///
private class ProducerComparer : IComparer
{
public int Compare(Producer x, Producer y)
{
Contract.Assert(x.MaxKey >= 0 && y.MaxKey >= 0); // Guarantees no overflow on next line
return y.MaxKey - x.MaxKey;
}
}
///
/// Enumerator over the results of an order-preserving pipelining merge.
///
private class OrderedPipeliningMergeEnumerator : MergeEnumerator
{
///
/// Merge helper associated with this enumerator
///
private OrderPreservingPipeliningMergeHelper m_mergeHelper;
///
/// Heap used to efficiently locate the producer whose result should be consumed next.
/// For each producer, stores the order index for the next element to be yielded.
///
/// Read and written by the consumer only.
///
private readonly FixedMaxHeap m_producerHeap;
///
/// Stores the next element to be yielded from each producer. We use a separate array
/// rather than storing this information in the producer heap to keep the Producer struct
/// small.
///
/// Read and written by the consumer only.
///
private readonly TOutput[] m_producerNextElement;
///
/// A private buffer for the consumer. When the size of a producer buffer exceeds a threshold
/// (STEAL_BUFFER_SIZE), the consumer will take ownership of the entire buffer, and give the
/// producer a new empty buffer to place results into.
///
/// Read and written by the consumer only.
///
private readonly Queue>[] m_privateBuffer;
///
/// Tracks whether MoveNext() has already been called previously.
///
private bool m_initialized = false;
///
/// Constructor
///
internal OrderedPipeliningMergeEnumerator(OrderPreservingPipeliningMergeHelper mergeHelper)
:base(mergeHelper.m_taskGroupState)
{
int partitionCount = mergeHelper.m_partitions.PartitionCount;
m_mergeHelper = mergeHelper;
m_producerHeap = new FixedMaxHeap(partitionCount, s_producerComparer);
m_privateBuffer = new Queue>[partitionCount];
m_producerNextElement = new TOutput[partitionCount];
}
///
/// Returns the current result
///
public override TOutput Current
{
get
{
int producerToYield = m_producerHeap.MaxValue.ProducerIndex;
return m_producerNextElement[producerToYield];
}
}
///
/// Moves the enumerator to the next result, or returns false if there are no more results to yield.
///
public override bool MoveNext()
{
if (!m_initialized)
{
//
// Initialization: wait until each producer has produced at least one element. Since the order indices
// are increasing, we cannot start yielding until we have at least one element from each producer.
//
m_initialized = true;
for (int producer = 0; producer < m_mergeHelper.m_partitions.PartitionCount; producer++)
{
Pair element = default(Pair);
// Get the first element from this producer
if (TryWaitForElement(producer, ref element))
{
// Update the producer heap and its helper array with the received element
m_producerHeap.Insert(new Producer(element.First, producer));
m_producerNextElement[producer] = element.Second;
}
else
{
// If this producer didn't produce any results because it encountered an exception,
// cancellation would have been initiated by now. If cancellation has started, we will
// propagate the exception now.
ThrowIfInTearDown();
}
}
}
else
{
// If the producer heap is empty, we are done. In fact, we know that a previous MoveNext() call
// already returned false.
if (m_producerHeap.Count == 0)
{
return false;
}
//
// Get the next element from the producer that yielded a value last. Update the producer heap.
// The next producer to yield will be in the front of the producer heap.
//
// The last producer whose result the merge yielded
int lastProducer = m_producerHeap.MaxValue.ProducerIndex;
// Get the next element from the same producer
Pair element = default(Pair);
if (TryGetPrivateElement(lastProducer, ref element)
|| TryWaitForElement(lastProducer, ref element))
{
// Update the producer heap and its helper array with the received element
m_producerHeap.ReplaceMax(new Producer(element.First, lastProducer));
m_producerNextElement[lastProducer] = element.Second;
}
else
{
// If this producer is done because it encountered an exception, cancellation
// would have been initiated by now. If cancellation has started, we will propagate
// the exception now.
ThrowIfInTearDown();
// This producer is done. Remove it from the producer heap.
m_producerHeap.RemoveMax();
}
}
return m_producerHeap.Count > 0;
}
///
/// If the cancellation of the query has been initiated (because one or more producers
/// encountered exceptions, or because external cancellation token has been set), the method
/// will tear down the query and rethrow the exception.
///
private void ThrowIfInTearDown()
{
if (m_mergeHelper.m_taskGroupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
{
try
{
// Wake up all producers. Since the cancellation token has already been
// set, the producers will eventually stop after waking up.
object[] locks = m_mergeHelper.m_bufferLocks;
for (int i = 0; i < locks.Length; i++)
{
lock (locks[i])
{
Monitor.Pulse(locks[i]);
}
}
// Now, we wait for all producers to wake up, notice the cancellation and stop executing.
// QueryEnd will wait on all tasks to complete and then propagate all exceptions.
m_taskGroupState.QueryEnd(false);
Contract.Assert(false, "QueryEnd() should have thrown an exception.");
}
finally
{
// Clear the producer heap so that future calls to MoveNext simply return false.
m_producerHeap.Clear();
}
}
}
///
/// Wait until a producer's buffer is non-empty, or until that producer is done.
///
/// false if there is no element to yield because the producer is done, true otherwise
private bool TryWaitForElement(int producer, ref Pair element)
{
Queue> buffer = m_mergeHelper.m_buffers[producer];
object bufferLock = m_mergeHelper.m_bufferLocks[producer];
lock (bufferLock)
{
// If the buffer is empty, we need to wait on the producer
if (buffer.Count == 0)
{
// If the producer is already done, return false
if (m_mergeHelper.m_producerDone[producer])
{
element = default(Pair);
return false;
}
m_mergeHelper.m_consumerWaiting[producer] = true;
Monitor.Wait(bufferLock);
// If the buffer is still empty, the producer is done
if (buffer.Count == 0)
{
Contract.Assert(m_mergeHelper.m_producerDone[producer]);
element = default(Pair);
return false;
}
}
Contract.Assert(buffer.Count > 0, "Producer's buffer should not be empty here.");
// If the producer is waiting, wake it up
if (m_mergeHelper.m_producerWaiting[producer])
{
Monitor.Pulse(bufferLock);
m_mergeHelper.m_producerWaiting[producer] = false;
}
if (buffer.Count < STEAL_BUFFER_SIZE)
{
element = buffer.Dequeue();
return true;
}
else
{
// Privatize the entire buffer
m_privateBuffer[producer] = m_mergeHelper.m_buffers[producer];
// Give an empty buffer to the producer
m_mergeHelper.m_buffers[producer] = new Queue>(INITIAL_BUFFER_SIZE);
// No return statement.
// This is the only branch that contines below of the lock region.
}
}
// Get an element out of the private buffer.
bool gotElement = TryGetPrivateElement(producer, ref element);
Contract.Assert(gotElement);
return true;
}
///
/// Looks for an element from a particular producer in the consumer's private buffer.
///
private bool TryGetPrivateElement(int producer, ref Pair element)
{
var privateChunk = m_privateBuffer[producer];
if (privateChunk != null)
{
if (privateChunk.Count > 0)
{
element = privateChunk.Dequeue();
return true;
}
Contract.Assert(m_privateBuffer[producer].Count == 0);
m_privateBuffer[producer] = null;
}
return false;
}
public override void Dispose()
{
// Wake up any waiting producers
int partitionCount = m_mergeHelper.m_buffers.Length;
for (int producer = 0; producer < partitionCount; producer++)
{
object bufferLock = m_mergeHelper.m_bufferLocks[producer];
lock (bufferLock)
{
if (m_mergeHelper.m_producerWaiting[producer])
{
Monitor.Pulse(bufferLock);
}
}
}
base.Dispose();
}
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SendKeys.cs
- DialogWindow.cs
- Constants.cs
- Matrix.cs
- RichTextBoxDesigner.cs
- ObjectConverter.cs
- QueueProcessor.cs
- TableLayoutStyle.cs
- XslTransform.cs
- WebControlsSection.cs
- AutomationElementIdentifiers.cs
- FormClosedEvent.cs
- XPathDocument.cs
- MergablePropertyAttribute.cs
- TransactionFlowBindingElementImporter.cs
- Geometry3D.cs
- QilGeneratorEnv.cs
- EventLogger.cs
- MemoryStream.cs
- GridViewCommandEventArgs.cs
- LabelLiteral.cs
- OleDbParameter.cs
- DiscardableAttribute.cs
- SimpleRecyclingCache.cs
- SortableBindingList.cs
- StylusEditingBehavior.cs
- StringComparer.cs
- MutexSecurity.cs
- SapiGrammar.cs
- BitmapEffect.cs
- Vertex.cs
- CodeDirectoryCompiler.cs
- MobileUserControl.cs
- MetadataArtifactLoaderXmlReaderWrapper.cs
- ComponentSerializationService.cs
- GenericsInstances.cs
- cryptoapiTransform.cs
- EntityContainerEmitter.cs
- CatalogPart.cs
- EUCJPEncoding.cs
- SystemWebExtensionsSectionGroup.cs
- FormsIdentity.cs
- ListViewSelectEventArgs.cs
- XPathParser.cs
- DataGridViewSelectedCellCollection.cs
- HttpRequest.cs
- sqlcontext.cs
- HebrewCalendar.cs
- ControlPropertyNameConverter.cs
- TCPListener.cs
- QilLoop.cs
- TextBox.cs
- SaveWorkflowAsyncResult.cs
- WsatProxy.cs
- PasswordBoxAutomationPeer.cs
- LoggedException.cs
- OpenTypeCommon.cs
- CreationContext.cs
- DataSourceConverter.cs
- CellNormalizer.cs
- InvariantComparer.cs
- CookieProtection.cs
- BinaryWriter.cs
- SwitchExpression.cs
- AuthorizationRuleCollection.cs
- XmlMemberMapping.cs
- ProcessProtocolHandler.cs
- CommandBinding.cs
- WebHttpBehavior.cs
- SchemaAttDef.cs
- FlowLayout.cs
- PenThread.cs
- Soap.cs
- TreeViewItemAutomationPeer.cs
- Translator.cs
- ExportFileRequest.cs
- RemotingServices.cs
- FixedSOMLineRanges.cs
- BooleanSwitch.cs
- ImageDesigner.cs
- DataGridCommandEventArgs.cs
- UriTemplateClientFormatter.cs
- QilExpression.cs
- SafeNativeMemoryHandle.cs
- TextTreeNode.cs
- ACE.cs
- WindowsTokenRoleProvider.cs
- CapiSafeHandles.cs
- ScriptingWebServicesSectionGroup.cs
- PenCursorManager.cs
- ObjectItemCollectionAssemblyCacheEntry.cs
- SimpleHandlerFactory.cs
- EntityViewContainer.cs
- VersionedStream.cs
- _BaseOverlappedAsyncResult.cs
- LookupBindingPropertiesAttribute.cs
- Assert.cs
- ServiceReference.cs
- ConfigurationValue.cs
- BaseCodePageEncoding.cs