Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / DefaultMergeHelper.cs / 1305376 / DefaultMergeHelper.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DefaultMergeHelper.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// The default merge helper uses a set of straightforward algorithms for output
/// merging. Namely, for synchronous merges, the input data is yielded from the
/// input data streams in "depth first" left-to-right order. For asynchronous merges,
/// on the other hand, we use a biased choice algorithm to favor input channels in
/// a "fair" way. No order preservation is carried out by this helper.
/// </summary>
internal class DefaultMergeHelper : IMergeHelper
{
    // NOTE(review): this listing declares no generic type parameters, yet TInputOutput is
    // used in GetResultsAsArray below. The type-argument lists look like they were lost when
    // the file was extracted -- confirm against the original source before compiling.

    // Every field is assigned exactly once, in the constructor, so all of them are readonly.
    private readonly QueryTaskGroupState m_taskGroupState; // State shared among tasks.
    private readonly PartitionedStream m_partitions; // Source partitions.
    private readonly AsynchronousChannel[] m_asyncChannels; // Destination channels (async); null unless a non-buffered merge has more than one partition.
    private readonly SynchronousChannel[] m_syncChannels; // Destination channels (sync); null unless the merge is fully buffered.
    private readonly IEnumerator m_channelEnumerator; // Output enumerator; stays null when output is ignored.
    private readonly TaskScheduler m_taskScheduler; // The task scheduler handed to the spooling routines.
    private readonly bool m_ignoreOutput; // Whether we're enumerating "for effect".

    //------------------------------------------------------------------------------------
    // Instantiates a new merge helper.
    //
    // Arguments:
    //     partitions        - the source partitions from which to consume data.
    //     ignoreOutput      - whether we're enumerating "for effect" or for output.
    //     options           - the merge options. FullyBuffered selects synchronous channels;
    //                         any other value selects the asynchronous (pipelined) path.
    //     taskScheduler     - the scheduler later passed to the spooling routines in Execute.
    //     cancellationState - the query's cancellation state; stored in the task group state,
    //                         and its merged token is given to the asynchronous channels.
    //     queryId           - the query identifier stored in the task group state.
    //
    internal DefaultMergeHelper(PartitionedStream partitions, bool ignoreOutput, ParallelMergeOptions options,
        TaskScheduler taskScheduler, CancellationState cancellationState, int queryId)
    {
        Contract.Assert(partitions != null);

        m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
        m_partitions = partitions;
        m_taskScheduler = taskScheduler;
        m_ignoreOutput = ignoreOutput;

        TraceHelpers.TraceInfo("DefaultMergeHelper::.ctor(..): creating a default merge helper");

        // If output won't be ignored, we need to manufacture a set of channels for the consumer.
        // Otherwise, when the merge is executed, we'll just invoke the activities themselves.
        if (!ignoreOutput)
        {
            // Create the asynchronous or synchronous channels, based on whether we're pipelining.
            if (options != ParallelMergeOptions.FullyBuffered)
            {
                if (partitions.PartitionCount > 1)
                {
                    m_asyncChannels =
                        MergeExecutor.MakeAsynchronousChannels(partitions.PartitionCount, options, cancellationState.MergedCancellationToken);
                    m_channelEnumerator = new AsynchronousChannelMergeEnumerator(m_taskGroupState, m_asyncChannels);
                }
                else
                {
                    // If there is only one partition, we don't need to create channels. The only producer enumerator
                    // will be used as the result enumerator.
                    m_channelEnumerator = ExceptionAggregator.WrapQueryEnumerator(partitions[0], m_taskGroupState.CancellationState).GetEnumerator();
                }
            }
            else
            {
                m_syncChannels =
                    MergeExecutor.MakeSynchronousChannels(partitions.PartitionCount);
                m_channelEnumerator = new SynchronousChannelMergeEnumerator(m_taskGroupState, m_syncChannels);
            }

            // Whichever channel array was created must have one channel per partition.
            Contract.Assert(m_asyncChannels == null || m_asyncChannels.Length == partitions.PartitionCount);
            Contract.Assert(m_syncChannels == null || m_syncChannels.Length == partitions.PartitionCount);
            Contract.Assert(m_channelEnumerator != null, "enumerator can't be null if we're not ignoring output");
        }
    }

    //-----------------------------------------------------------------------------------
    // Schedules execution of the merge itself. The spooling routine is chosen from the
    // state the constructor set up:
    //     asynchronous channels exist -> SpoolPipeline
    //     synchronous channels exist  -> SpoolStopAndGo
    //     output is ignored           -> SpoolForAll
    //     none of the above           -> nothing is scheduled (single-partition pipelined merge)
    //
    void IMergeHelper.Execute()
    {
        if (m_asyncChannels != null)
        {
            SpoolingTask.SpoolPipeline(m_taskGroupState, m_partitions, m_asyncChannels, m_taskScheduler);
        }
        else if (m_syncChannels != null)
        {
            SpoolingTask.SpoolStopAndGo(m_taskGroupState, m_partitions, m_syncChannels, m_taskScheduler);
        }
        else if (m_ignoreOutput)
        {
            SpoolingTask.SpoolForAll(m_taskGroupState, m_partitions, m_taskScheduler);
        }
        else
        {
            // The last case is a pipelining merge when DOP = 1. In this case, the consumer thread itself will compute the results,
            // so we don't need any tasks to compute the results asynchronously.
            Contract.Assert(m_partitions.PartitionCount == 1);
        }
    }

    //-----------------------------------------------------------------------------------
    // Gets the enumerator from which to enumerate output results.
    //
    // Returns the enumerator built by the constructor. When the helper was created with
    // ignoreOutput == true no enumerator was built, so the return value is null.
    //
    IEnumerator IMergeHelper.GetEnumerator()
    {
        Contract.Assert(m_ignoreOutput || m_channelEnumerator != null);
        return m_channelEnumerator;
    }

    //-----------------------------------------------------------------------------------
    // Returns the results as an array.
    //
    // With synchronous channels the array is sized from the channels' Count values and each
    // channel is copied in, in channel order. Otherwise the output enumerator is drained
    // into a list which is then converted to an array.
    //
    // The enumerator path dereferences the value returned by GetEnumerator, which is null
    // when the helper was created with ignoreOutput == true; this method is therefore only
    // usable when output is being produced.
    //
    public TInputOutput[] GetResultsAsArray()
    {
        if (m_syncChannels != null)
        {
            // Right size an array.
            // NOTE(review): presumably the channels are already filled (Execute has run)
            // by the time Count is read -- confirm against the callers.
            int totalSize = 0;
            for (int i = 0; i < m_syncChannels.Length; i++)
            {
                totalSize += m_syncChannels[i].Count;
            }

            TInputOutput[] array = new TInputOutput[totalSize];

            // And then blit the elements in. 'current' is the offset at which the next
            // channel's elements start.
            int current = 0;
            for (int i = 0; i < m_syncChannels.Length; i++)
            {
                m_syncChannels[i].CopyTo(array, current);
                current += m_syncChannels[i].Count;
            }

            return array;
        }
        else
        {
            // No synchronous channels: pull every element through the output enumerator.
            List output = new List();
            using (IEnumerator enumerator = ((IMergeHelper)this).GetEnumerator())
            {
                while (enumerator.MoveNext())
                {
                    output.Add(enumerator.Current);
                }
            }

            return output.ToArray();
        }
    }
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- TransportManager.cs
- MarginsConverter.cs
- DbgUtil.cs
- MessageEventSubscriptionService.cs
- StopStoryboard.cs
- OleDbParameter.cs
- ActivityStateRecord.cs
- Int16Converter.cs
- TabControl.cs
- UnmanagedMemoryStream.cs
- EventLogger.cs
- OleDbEnumerator.cs
- WebServiceHandlerFactory.cs
- XamlValidatingReader.cs
- XPathNavigatorKeyComparer.cs
- HScrollProperties.cs
- FontUnitConverter.cs
- BlobPersonalizationState.cs
- BufferBuilder.cs
- PartialCachingControl.cs
- ProcessModule.cs
- metadatamappinghashervisitor.hashsourcebuilder.cs
- ControlCachePolicy.cs
- future.cs
- JournalEntryListConverter.cs
- PerformanceCounterPermissionEntry.cs
- SiteMapPath.cs
- TableHeaderCell.cs
- ContainerAction.cs
- ErrorFormatterPage.cs
- Decoder.cs
- ModelItemImpl.cs
- basevalidator.cs
- TraceHwndHost.cs
- DefaultShape.cs
- StreamGeometry.cs
- PropertyHelper.cs
- ChildTable.cs
- CollectionViewGroup.cs
- IDReferencePropertyAttribute.cs
- StreamInfo.cs
- CollectionDataContractAttribute.cs
- SqlTypesSchemaImporter.cs
- ServerType.cs
- ReflectEventDescriptor.cs
- ToolboxComponentsCreatingEventArgs.cs
- Bold.cs
- Registry.cs
- FunctionImportMapping.cs
- RNGCryptoServiceProvider.cs
- FixedSOMElement.cs
- AssemblyResourceLoader.cs
- DirectoryObjectSecurity.cs
- TextSelectionProcessor.cs
- ListManagerBindingsCollection.cs
- WhitespaceRuleLookup.cs
- ThreadExceptionDialog.cs
- KerberosTicketHashIdentifierClause.cs
- WindowInteropHelper.cs
- XmlWrappingReader.cs
- ColumnReorderedEventArgs.cs
- SqlDataSourceView.cs
- Size.cs
- XmlTextReader.cs
- DnsEndPoint.cs
- IisTraceListener.cs
- BaseCollection.cs
- SimpleRecyclingCache.cs
- ListViewCancelEventArgs.cs
- InvalidCommandTreeException.cs
- ObjectDataSourceFilteringEventArgs.cs
- HiddenFieldPageStatePersister.cs
- GradientBrush.cs
- VariableAction.cs
- MetadataArtifactLoaderXmlReaderWrapper.cs
- SystemColors.cs
- SettingsSection.cs
- HelloMessage11.cs
- followingquery.cs
- TreeViewEvent.cs
- arc.cs
- BoundingRectTracker.cs
- ExeContext.cs
- DtrList.cs
- MSHTMLHost.cs
- OLEDB_Util.cs
- CatalogZoneBase.cs
- IxmlLineInfo.cs
- EventItfInfo.cs
- CompiledQuery.cs
- ETagAttribute.cs
- IntSecurity.cs
- Utils.cs
- HttpWebResponse.cs
- SoapIgnoreAttribute.cs
- ECDsaCng.cs
- NeutralResourcesLanguageAttribute.cs
- SmtpNtlmAuthenticationModule.cs
- BaseTemplateBuildProvider.cs
- XmlCountingReader.cs