Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / DefaultMergeHelper.cs / 1305376 / DefaultMergeHelper.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DefaultMergeHelper.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// The default merge helper uses a set of straightforward algorithms for output
    /// merging. Namely, for synchronous merges, the input data is yielded from the
    /// input data streams in "depth first" left-to-right order. For asynchronous merges,
    /// on the other hand, we use a biased choice algorithm to favor input channels in
    /// a "fair" way. No order preservation is carried out by this helper.
    /// </summary>
    /// <typeparam name="TInputOutput">Type of the elements flowing through the merge.</typeparam>
    /// <typeparam name="TIgnoreKey">Key type of the source partitions (not used by this helper).</typeparam>
    internal class DefaultMergeHelper<TInputOutput, TIgnoreKey> : IMergeHelper<TInputOutput>
    {
        private QueryTaskGroupState m_taskGroupState; // State shared among tasks.
        private PartitionedStream<TInputOutput, TIgnoreKey> m_partitions; // Source partitions.
        private AsynchronousChannel<TInputOutput>[] m_asyncChannels; // Destination channels (async).
        private SynchronousChannel<TInputOutput>[] m_syncChannels; // Destination channels (sync).
        private IEnumerator<TInputOutput> m_channelEnumerator; // Output enumerator.
        private TaskScheduler m_taskScheduler; // The task manager to execute the query.
        private bool m_ignoreOutput; // Whether we're enumerating "for effect".

        //------------------------------------------------------------------------------------
        // Instantiates a new merge helper.
        //
        // Arguments:
        //     partitions        - the source partitions from which to consume data.
        //     ignoreOutput      - whether we're enumerating "for effect" or for output.
        //     options           - the merge options; FullyBuffered selects synchronous
        //                         channels, any other value selects a pipelined merge.
        //     taskScheduler     - the scheduler handed to the spooling tasks in Execute().
        //     cancellationState - cancellation state shared with the task group.
        //     queryId           - query identifier stored in the task group state.
        //

        internal DefaultMergeHelper(PartitionedStream<TInputOutput, TIgnoreKey> partitions, bool ignoreOutput, ParallelMergeOptions options,
            TaskScheduler taskScheduler, CancellationState cancellationState, int queryId)
        {
            Contract.Assert(partitions != null);

            m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
            m_partitions = partitions;
            m_taskScheduler = taskScheduler;
            m_ignoreOutput = ignoreOutput;

            TraceHelpers.TraceInfo("DefaultMergeHelper::.ctor(..): creating a default merge helper");

            // If output won't be ignored, we need to manufacture a set of channels for the consumer.
            // Otherwise, when the merge is executed, we'll just invoke the activities themselves.
            if (!ignoreOutput)
            {
                // Create the asynchronous or synchronous channels, based on whether we're pipelining.
                if (options != ParallelMergeOptions.FullyBuffered)
                {
                    if (partitions.PartitionCount > 1)
                    {
                        m_asyncChannels =
                            MergeExecutor<TInputOutput>.MakeAsynchronousChannels(partitions.PartitionCount, options, cancellationState.MergedCancellationToken);
                        m_channelEnumerator = new AsynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_asyncChannels);
                    }
                    else
                    {
                        // If there is only one partition, we don't need to create channels. The only producer enumerator
                        // will be used as the result enumerator.
                        m_channelEnumerator = ExceptionAggregator.WrapQueryEnumerator(partitions[0], m_taskGroupState.CancellationState).GetEnumerator();
                    }
                }
                else
                {
                    m_syncChannels =
                        MergeExecutor<TInputOutput>.MakeSynchronousChannels(partitions.PartitionCount);
                    m_channelEnumerator = new SynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_syncChannels);
                }

                Contract.Assert(m_asyncChannels == null || m_asyncChannels.Length == partitions.PartitionCount);
                Contract.Assert(m_syncChannels == null || m_syncChannels.Length == partitions.PartitionCount);
                Contract.Assert(m_channelEnumerator != null, "enumerator can't be null if we're not ignoring output");
            }
        }

        //-----------------------------------------------------------------------------------
        // Schedules execution of the merge itself. Which spooling routine is used depends
        // on what the constructor set up: asynchronous channels, synchronous channels, or
        // no channels at all (output ignored, or a single-partition pipelined merge).
        //

        void IMergeHelper<TInputOutput>.Execute()
        {
            if (m_asyncChannels != null)
            {
                SpoolingTask.SpoolPipeline<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_asyncChannels, m_taskScheduler);
            }
            else if (m_syncChannels != null)
            {
                SpoolingTask.SpoolStopAndGo<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_syncChannels, m_taskScheduler);
            }
            else if (m_ignoreOutput)
            {
                SpoolingTask.SpoolForAll<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_taskScheduler);
            }
            else
            {
                // The last case is a pipelining merge when DOP = 1. In this case, the consumer thread itself will compute the results,
                // so we don't need any tasks to compute the results asynchronously.
                Contract.Assert(m_partitions.PartitionCount == 1);
            }
        }

        //-----------------------------------------------------------------------------------
        // Gets the enumerator from which to enumerate output results. The enumerator is
        // null when the helper was created with ignoreOutput == true.
        //

        IEnumerator<TInputOutput> IMergeHelper<TInputOutput>.GetEnumerator()
        {
            Contract.Assert(m_ignoreOutput || m_channelEnumerator != null);
            return m_channelEnumerator;
        }

        //-----------------------------------------------------------------------------------
        // Returns the results as an array. With synchronous channels the array is sized
        // up front from the channel counts and filled channel by channel; otherwise the
        // output enumerator is drained into a list first.
        //

        public TInputOutput[] GetResultsAsArray()
        {
            if (m_syncChannels != null)
            {
                // Right size an array.
                int totalSize = 0;
                for (int i = 0; i < m_syncChannels.Length; i++)
                {
                    totalSize += m_syncChannels[i].Count;
                }
                TInputOutput[] array = new TInputOutput[totalSize];

                // And then blit the elements in.
                int current = 0;
                for (int i = 0; i < m_syncChannels.Length; i++)
                {
                    m_syncChannels[i].CopyTo(array, current);
                    current += m_syncChannels[i].Count;
                }
                return array;
            }
            else
            {
                List<TInputOutput> output = new List<TInputOutput>();
                using (IEnumerator<TInputOutput> enumerator = ((IMergeHelper<TInputOutput>)this).GetEnumerator())
                {
                    while (enumerator.MoveNext())
                    {
                        output.Add(enumerator.Current);
                    }
                }

                return output.ToArray();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DefaultMergeHelper.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// The default merge helper uses a set of straightforward algorithms for output
    /// merging. Namely, for synchronous merges, the input data is yielded from the
    /// input data streams in "depth first" left-to-right order. For asynchronous merges,
    /// on the other hand, we use a biased choice algorithm to favor input channels in
    /// a "fair" way. No order preservation is carried out by this helper.
    /// </summary>
    /// <typeparam name="TInputOutput">Type of the elements flowing through the merge.</typeparam>
    /// <typeparam name="TIgnoreKey">Key type of the source partitions (not used by this helper).</typeparam>
    internal class DefaultMergeHelper<TInputOutput, TIgnoreKey> : IMergeHelper<TInputOutput>
    {
        private QueryTaskGroupState m_taskGroupState; // State shared among tasks.
        private PartitionedStream<TInputOutput, TIgnoreKey> m_partitions; // Source partitions.
        private AsynchronousChannel<TInputOutput>[] m_asyncChannels; // Destination channels (async).
        private SynchronousChannel<TInputOutput>[] m_syncChannels; // Destination channels (sync).
        private IEnumerator<TInputOutput> m_channelEnumerator; // Output enumerator.
        private TaskScheduler m_taskScheduler; // The task manager to execute the query.
        private bool m_ignoreOutput; // Whether we're enumerating "for effect".

        //------------------------------------------------------------------------------------
        // Instantiates a new merge helper.
        //
        // Arguments:
        //     partitions        - the source partitions from which to consume data.
        //     ignoreOutput      - whether we're enumerating "for effect" or for output.
        //     options           - the merge options; FullyBuffered selects synchronous
        //                         channels, any other value selects a pipelined merge.
        //     taskScheduler     - the scheduler handed to the spooling tasks in Execute().
        //     cancellationState - cancellation state shared with the task group.
        //     queryId           - query identifier stored in the task group state.
        //

        internal DefaultMergeHelper(PartitionedStream<TInputOutput, TIgnoreKey> partitions, bool ignoreOutput, ParallelMergeOptions options,
            TaskScheduler taskScheduler, CancellationState cancellationState, int queryId)
        {
            Contract.Assert(partitions != null);

            m_taskGroupState = new QueryTaskGroupState(cancellationState, queryId);
            m_partitions = partitions;
            m_taskScheduler = taskScheduler;
            m_ignoreOutput = ignoreOutput;

            TraceHelpers.TraceInfo("DefaultMergeHelper::.ctor(..): creating a default merge helper");

            // If output won't be ignored, we need to manufacture a set of channels for the consumer.
            // Otherwise, when the merge is executed, we'll just invoke the activities themselves.
            if (!ignoreOutput)
            {
                // Create the asynchronous or synchronous channels, based on whether we're pipelining.
                if (options != ParallelMergeOptions.FullyBuffered)
                {
                    if (partitions.PartitionCount > 1)
                    {
                        m_asyncChannels =
                            MergeExecutor<TInputOutput>.MakeAsynchronousChannels(partitions.PartitionCount, options, cancellationState.MergedCancellationToken);
                        m_channelEnumerator = new AsynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_asyncChannels);
                    }
                    else
                    {
                        // If there is only one partition, we don't need to create channels. The only producer enumerator
                        // will be used as the result enumerator.
                        m_channelEnumerator = ExceptionAggregator.WrapQueryEnumerator(partitions[0], m_taskGroupState.CancellationState).GetEnumerator();
                    }
                }
                else
                {
                    m_syncChannels =
                        MergeExecutor<TInputOutput>.MakeSynchronousChannels(partitions.PartitionCount);
                    m_channelEnumerator = new SynchronousChannelMergeEnumerator<TInputOutput>(m_taskGroupState, m_syncChannels);
                }

                Contract.Assert(m_asyncChannels == null || m_asyncChannels.Length == partitions.PartitionCount);
                Contract.Assert(m_syncChannels == null || m_syncChannels.Length == partitions.PartitionCount);
                Contract.Assert(m_channelEnumerator != null, "enumerator can't be null if we're not ignoring output");
            }
        }

        //-----------------------------------------------------------------------------------
        // Schedules execution of the merge itself. Which spooling routine is used depends
        // on what the constructor set up: asynchronous channels, synchronous channels, or
        // no channels at all (output ignored, or a single-partition pipelined merge).
        //

        void IMergeHelper<TInputOutput>.Execute()
        {
            if (m_asyncChannels != null)
            {
                SpoolingTask.SpoolPipeline<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_asyncChannels, m_taskScheduler);
            }
            else if (m_syncChannels != null)
            {
                SpoolingTask.SpoolStopAndGo<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_syncChannels, m_taskScheduler);
            }
            else if (m_ignoreOutput)
            {
                SpoolingTask.SpoolForAll<TInputOutput, TIgnoreKey>(m_taskGroupState, m_partitions, m_taskScheduler);
            }
            else
            {
                // The last case is a pipelining merge when DOP = 1. In this case, the consumer thread itself will compute the results,
                // so we don't need any tasks to compute the results asynchronously.
                Contract.Assert(m_partitions.PartitionCount == 1);
            }
        }

        //-----------------------------------------------------------------------------------
        // Gets the enumerator from which to enumerate output results. The enumerator is
        // null when the helper was created with ignoreOutput == true.
        //

        IEnumerator<TInputOutput> IMergeHelper<TInputOutput>.GetEnumerator()
        {
            Contract.Assert(m_ignoreOutput || m_channelEnumerator != null);
            return m_channelEnumerator;
        }

        //-----------------------------------------------------------------------------------
        // Returns the results as an array. With synchronous channels the array is sized
        // up front from the channel counts and filled channel by channel; otherwise the
        // output enumerator is drained into a list first.
        //

        public TInputOutput[] GetResultsAsArray()
        {
            if (m_syncChannels != null)
            {
                // Right size an array.
                int totalSize = 0;
                for (int i = 0; i < m_syncChannels.Length; i++)
                {
                    totalSize += m_syncChannels[i].Count;
                }
                TInputOutput[] array = new TInputOutput[totalSize];

                // And then blit the elements in.
                int current = 0;
                for (int i = 0; i < m_syncChannels.Length; i++)
                {
                    m_syncChannels[i].CopyTo(array, current);
                    current += m_syncChannels[i].Count;
                }
                return array;
            }
            else
            {
                List<TInputOutput> output = new List<TInputOutput>();
                using (IEnumerator<TInputOutput> enumerator = ((IMergeHelper<TInputOutput>)this).GetEnumerator())
                {
                    while (enumerator.MoveNext())
                    {
                        output.Add(enumerator.Current);
                    }
                }

                return output.ToArray();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- CurrentChangingEventManager.cs
- Transform.cs
- FilePrompt.cs
- SerializationFieldInfo.cs
- EventMappingSettingsCollection.cs
- SAPICategories.cs
- SystemFonts.cs
- FigureParaClient.cs
- FlowSwitch.cs
- AddInSegmentDirectoryNotFoundException.cs
- StringDictionaryCodeDomSerializer.cs
- BaseProcessor.cs
- DataGridViewColumnHeaderCell.cs
- Animatable.cs
- Process.cs
- DrawingContextWalker.cs
- GraphicsPath.cs
- Selection.cs
- ServiceDesigner.cs
- CustomAttributeSerializer.cs
- FixedDocumentSequencePaginator.cs
- ButtonFieldBase.cs
- EventDescriptor.cs
- FloaterParagraph.cs
- WebExceptionStatus.cs
- WinEventQueueItem.cs
- TableCell.cs
- IISUnsafeMethods.cs
- EntityAdapter.cs
- BindingManagerDataErrorEventArgs.cs
- DataControlFieldCollection.cs
- DataGridViewLinkColumn.cs
- CompilationUnit.cs
- Ipv6Element.cs
- AssociatedControlConverter.cs
- ServicePointManagerElement.cs
- SecurityHelper.cs
- DbConnectionStringBuilder.cs
- XPathBinder.cs
- _UriSyntax.cs
- CqlIdentifiers.cs
- ServiceModelTimeSpanValidator.cs
- FrameworkElementFactoryMarkupObject.cs
- WebBrowserEvent.cs
- DrawingCollection.cs
- DataControlFieldCollection.cs
- D3DImage.cs
- Rectangle.cs
- ComNativeDescriptor.cs
- TrustManager.cs
- BindingExpressionUncommonField.cs
- ValidatorCompatibilityHelper.cs
- TraceContextRecord.cs
- AppSettingsExpressionBuilder.cs
- AutoCompleteStringCollection.cs
- ExtensionSimplifierMarkupObject.cs
- NumericPagerField.cs
- DbDeleteCommandTree.cs
- SnapshotChangeTrackingStrategy.cs
- ColorDialog.cs
- CodeBinaryOperatorExpression.cs
- RelationshipManager.cs
- AvtEvent.cs
- IIS7UserPrincipal.cs
- GuidConverter.cs
- WebPartUserCapability.cs
- XmlDataFileEditor.cs
- GeneralTransform3DTo2D.cs
- ExpandedProjectionNode.cs
- InfocardExtendedInformationEntry.cs
- ToolStripItemEventArgs.cs
- SchemaImporterExtensionElement.cs
- bidPrivateBase.cs
- CodeNamespaceImportCollection.cs
- MimeWriter.cs
- RectConverter.cs
- oledbmetadatacollectionnames.cs
- OdbcPermission.cs
- TableLayoutRowStyleCollection.cs
- ExpressionList.cs
- StartFileNameEditor.cs
- DataGridViewColumnStateChangedEventArgs.cs
- Line.cs
- D3DImage.cs
- Win32SafeHandles.cs
- TemplateField.cs
- Variable.cs
- HitTestWithPointDrawingContextWalker.cs
- Visual3D.cs
- HtmlWindow.cs
- WorkflowDesigner.cs
- HwndSubclass.cs
- IISMapPath.cs
- SchemaMerger.cs
- ISFClipboardData.cs
- UnauthorizedWebPart.cs
- ResponseBodyWriter.cs
- SHA1.cs
- ErrorFormatterPage.cs
- RegexRunnerFactory.cs