Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Merging / MergeExecutor.cs / 1305376 / MergeExecutor.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// MergeExecutor.cs
//
//[....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Drives execution of an actual merge operation, including creating channel data
    /// structures and scheduling parallel work as appropriate. The algorithms used
    /// internally are parameterized based on the type of data in the partitions; e.g.
    /// if an order preserved stream is found, the merge will automatically use an
    /// order preserving merge, and so forth.
    /// </summary>
    /// <typeparam name="TInputOutput">The type of the elements flowing through the merge.</typeparam>
    //
    // NOTE(review): this listing had lost every angle-bracketed generic argument (and the
    // XML doc tags) during extraction. They have been restored here; TInputOutput is
    // attested by GetResultsAsArray's return type, the remaining arguments (notably TKey
    // on Execute and the helper types) should be confirmed against the upstream file.
    internal class MergeExecutor<TInputOutput> : IEnumerable<TInputOutput>
    {
        // Many internal algorithms are parameterized based on the data. The IMergeHelper
        // is the pluggable interface whose implementations perform those algorithms.
        private IMergeHelper<TInputOutput> m_mergeHelper;

        // Private constructor. MergeExecutor should only be constructed via the
        // MergeExecutor.Execute static method.
        private MergeExecutor()
        {
        }

        //------------------------------------------------------------------------------------
        // Creates and executes a new merge executor object.
        //
        // Arguments:
        //     partitions        - the partitions whose data will be merged into one stream
        //     ignoreOutput      - if true, we are enumerating "for effect", and we won't actually
        //                         generate data in the output stream
        //     options           - the buffering mode; anything other than FullyBuffered requests
        //                         a pipelined merge (asserted to be FullyBuffered when
        //                         ignoreOutput is true)
        //     taskScheduler     - handed through to the selected merge helper
        //     isOrdered         - whether to perform an ordering merge.
        //     cancellationState - handed through to the selected merge helper
        //     queryId           - handed through to the selected merge helper
        //
        // Return Value:
        //     The new executor, on which Execute() has already been invoked.
        //
        // Helper selection:
        //     ordered, output wanted, pipelined, index state no worse than Increasing:
        //         more than one partition -> OrderPreservingPipeliningMergeHelper
        //         exactly one partition   -> DefaultMergeHelper
        //     ordered, output wanted, otherwise -> OrderPreservingMergeHelper (stop-and-go)
        //     everything else                   -> DefaultMergeHelper (unordered)
        //

        internal static MergeExecutor<TInputOutput> Execute<TKey>(
            PartitionedStream<TInputOutput, TKey> partitions, bool ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, bool isOrdered,
            CancellationState cancellationState, int queryId)
        {
            Contract.Assert(partitions != null);
            Contract.Assert(partitions.PartitionCount > 0);
            Contract.Assert(!ignoreOutput || options == ParallelMergeOptions.FullyBuffered, "@BUGBUG: pipelining w/ no output not supported -- need it?");

            MergeExecutor<TInputOutput> mergeExecutor = new MergeExecutor<TInputOutput>();
            if (isOrdered && !ignoreOutput)
            {
                if (options != ParallelMergeOptions.FullyBuffered && !partitions.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing))
                {
                    Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
                    bool autoBuffered = (options == ParallelMergeOptions.AutoBuffered);

                    if (partitions.PartitionCount > 1)
                    {
                        // We use a pipelining ordered merge. The cast goes through object because
                        // the helper is handed the stream under a different generic instantiation
                        // than the TKey this method was called with.
                        // NOTE(review): the int key type is restored from upstream -- confirm.
                        mergeExecutor.m_mergeHelper = new OrderPreservingPipeliningMergeHelper<TInputOutput>(
                            (PartitionedStream<TInputOutput, int>)(object)partitions, taskScheduler, cancellationState, autoBuffered, queryId);
                    }
                    else
                    {
                        // When DOP=1, the default merge simply returns the single producer enumerator to the consumer. This way, ordering
                        // does not add any extra overhead, and no producer task needs to be scheduled.
                        mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(
                            partitions, false, options, taskScheduler, cancellationState, queryId);
                    }
                }
                else
                {
                    // We use a stop-and-go ordered merge helper
                    mergeExecutor.m_mergeHelper = new OrderPreservingMergeHelper<TInputOutput, TKey>(partitions, taskScheduler, cancellationState, queryId);
                }
            }
            else
            {
                // We use a default - unordered - merge helper.
                mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(partitions, ignoreOutput, options, taskScheduler, cancellationState, queryId);
            }

            mergeExecutor.Execute();
            return mergeExecutor;
        }

        //-----------------------------------------------------------------------------------
        // Initiates execution of the merge by delegating to the selected merge helper.
        //

        private void Execute()
        {
            Contract.Assert(m_mergeHelper != null);
            m_mergeHelper.Execute();
        }

        //-----------------------------------------------------------------------------------
        // Returns an enumerator that will yield elements from the resulting merged data
        // stream.
        //

        // Non-generic entry point: forwards to the generic GetEnumerator below. The cast
        // must target the generic interface; casting to the non-generic IEnumerable would
        // re-enter this very method.
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<TInputOutput>)this).GetEnumerator();
        }

        public IEnumerator<TInputOutput> GetEnumerator()
        {
            Contract.Assert(m_mergeHelper != null);
            return m_mergeHelper.GetEnumerator();
        }

        //-----------------------------------------------------------------------------------
        // Returns the merged results as an array.
        //

        internal TInputOutput[] GetResultsAsArray()
        {
            // Same precondition as the other accessors: a helper must have been selected.
            Contract.Assert(m_mergeHelper != null);
            return m_mergeHelper.GetResultsAsArray();
        }

        //------------------------------------------------------------------------------------
        // This internal helper method is used to generate a set of asynchronous channels.
        // The algorithm used by each channel contains the necessary synchronization to
        // ensure it is suitable for pipelined consumption.
        //
        // Arguments:
        //     partitionCount    - the number of partitions for which to create new channels.
        //     options           - must be NotBuffered or AutoBuffered (asserted); NotBuffered
        //                         selects a chunk size of 1, otherwise 0 (automatic) is used.
        //     cancellationToken - passed to every channel that is created.
        //
        // Return Value:
        //     An array of asynchronous channels, one for each partition.
        //

        internal static AsynchronousChannel<TInputOutput>[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, CancellationToken cancellationToken)
        {
            AsynchronousChannel<TInputOutput>[] channels = new AsynchronousChannel<TInputOutput>[partitionCount];

            Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
            TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} async channels in prep for pipeline", partitionCount);

            // If we are pipelining, we need a channel that contains the necessary synchronization
            // in it. We choose a bounded/blocking channel data structure: bounded so that we can
            // limit the amount of memory overhead used by the query by putting a cap on the
            // buffer size into which producers place data, and blocking so that the consumer can
            // wait for additional data to arrive in the case that it's found to be empty.

            int chunkSize = 0; // 0 means automatic chunk size
            if (options == ParallelMergeOptions.NotBuffered)
            {
                chunkSize = 1;
            }

            for (int i = 0; i < channels.Length; i++)
            {
                channels[i] = new AsynchronousChannel<TInputOutput>(chunkSize, cancellationToken);
            }

            return channels;
        }

        //-----------------------------------------------------------------------------------
        // This internal helper method is used to generate a set of synchronous channels.
        // The channel data structure used has been optimized for sequential execution and
        // does not support pipelining.
        //
        // Arguments:
        //     partitionCount - the number of partitions for which to create new channels.
        //
        // Return Value:
        //     An array of synchronous channels, one for each partition.
        //

        internal static SynchronousChannel<TInputOutput>[] MakeSynchronousChannels(int partitionCount)
        {
            SynchronousChannel<TInputOutput>[] channels = new SynchronousChannel<TInputOutput>[partitionCount];

            TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} channels in prep for stop-and-go", partitionCount);

            // We just build up the results in memory using simple, dynamically growable FIFO queues.
            for (int i = 0; i < channels.Length; i++)
            {
                channels[i] = new SynchronousChannel<TInputOutput>();
            }

            return channels;
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// MergeExecutor.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Drives execution of an actual merge operation, including creating channel data
    /// structures and scheduling parallel work as appropriate. The algorithms used
    /// internally are parameterized based on the type of data in the partitions; e.g.
    /// if an order preserved stream is found, the merge will automatically use an
    /// order preserving merge, and so forth.
    /// </summary>
    /// <typeparam name="TInputOutput">The type of the elements flowing through the merge.</typeparam>
    //
    // NOTE(review): this listing had lost every angle-bracketed generic argument (and the
    // XML doc tags) during extraction. They have been restored here; TInputOutput is
    // attested by GetResultsAsArray's return type, the remaining arguments (notably TKey
    // on Execute and the helper types) should be confirmed against the upstream file.
    internal class MergeExecutor<TInputOutput> : IEnumerable<TInputOutput>
    {
        // Many internal algorithms are parameterized based on the data. The IMergeHelper
        // is the pluggable interface whose implementations perform those algorithms.
        private IMergeHelper<TInputOutput> m_mergeHelper;

        // Private constructor. MergeExecutor should only be constructed via the
        // MergeExecutor.Execute static method.
        private MergeExecutor()
        {
        }

        //------------------------------------------------------------------------------------
        // Creates and executes a new merge executor object.
        //
        // Arguments:
        //     partitions        - the partitions whose data will be merged into one stream
        //     ignoreOutput      - if true, we are enumerating "for effect", and we won't actually
        //                         generate data in the output stream
        //     options           - the buffering mode; anything other than FullyBuffered requests
        //                         a pipelined merge (asserted to be FullyBuffered when
        //                         ignoreOutput is true)
        //     taskScheduler     - handed through to the selected merge helper
        //     isOrdered         - whether to perform an ordering merge.
        //     cancellationState - handed through to the selected merge helper
        //     queryId           - handed through to the selected merge helper
        //
        // Return Value:
        //     The new executor, on which Execute() has already been invoked.
        //
        // Helper selection:
        //     ordered, output wanted, pipelined, index state no worse than Increasing:
        //         more than one partition -> OrderPreservingPipeliningMergeHelper
        //         exactly one partition   -> DefaultMergeHelper
        //     ordered, output wanted, otherwise -> OrderPreservingMergeHelper (stop-and-go)
        //     everything else                   -> DefaultMergeHelper (unordered)
        //

        internal static MergeExecutor<TInputOutput> Execute<TKey>(
            PartitionedStream<TInputOutput, TKey> partitions, bool ignoreOutput, ParallelMergeOptions options, TaskScheduler taskScheduler, bool isOrdered,
            CancellationState cancellationState, int queryId)
        {
            Contract.Assert(partitions != null);
            Contract.Assert(partitions.PartitionCount > 0);
            Contract.Assert(!ignoreOutput || options == ParallelMergeOptions.FullyBuffered, "@BUGBUG: pipelining w/ no output not supported -- need it?");

            MergeExecutor<TInputOutput> mergeExecutor = new MergeExecutor<TInputOutput>();
            if (isOrdered && !ignoreOutput)
            {
                if (options != ParallelMergeOptions.FullyBuffered && !partitions.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing))
                {
                    Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
                    bool autoBuffered = (options == ParallelMergeOptions.AutoBuffered);

                    if (partitions.PartitionCount > 1)
                    {
                        // We use a pipelining ordered merge. The cast goes through object because
                        // the helper is handed the stream under a different generic instantiation
                        // than the TKey this method was called with.
                        // NOTE(review): the int key type is restored from upstream -- confirm.
                        mergeExecutor.m_mergeHelper = new OrderPreservingPipeliningMergeHelper<TInputOutput>(
                            (PartitionedStream<TInputOutput, int>)(object)partitions, taskScheduler, cancellationState, autoBuffered, queryId);
                    }
                    else
                    {
                        // When DOP=1, the default merge simply returns the single producer enumerator to the consumer. This way, ordering
                        // does not add any extra overhead, and no producer task needs to be scheduled.
                        mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(
                            partitions, false, options, taskScheduler, cancellationState, queryId);
                    }
                }
                else
                {
                    // We use a stop-and-go ordered merge helper
                    mergeExecutor.m_mergeHelper = new OrderPreservingMergeHelper<TInputOutput, TKey>(partitions, taskScheduler, cancellationState, queryId);
                }
            }
            else
            {
                // We use a default - unordered - merge helper.
                mergeExecutor.m_mergeHelper = new DefaultMergeHelper<TInputOutput, TKey>(partitions, ignoreOutput, options, taskScheduler, cancellationState, queryId);
            }

            mergeExecutor.Execute();
            return mergeExecutor;
        }

        //-----------------------------------------------------------------------------------
        // Initiates execution of the merge by delegating to the selected merge helper.
        //

        private void Execute()
        {
            Contract.Assert(m_mergeHelper != null);
            m_mergeHelper.Execute();
        }

        //-----------------------------------------------------------------------------------
        // Returns an enumerator that will yield elements from the resulting merged data
        // stream.
        //

        // Non-generic entry point: forwards to the generic GetEnumerator below. The cast
        // must target the generic interface; casting to the non-generic IEnumerable would
        // re-enter this very method.
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<TInputOutput>)this).GetEnumerator();
        }

        public IEnumerator<TInputOutput> GetEnumerator()
        {
            Contract.Assert(m_mergeHelper != null);
            return m_mergeHelper.GetEnumerator();
        }

        //-----------------------------------------------------------------------------------
        // Returns the merged results as an array.
        //

        internal TInputOutput[] GetResultsAsArray()
        {
            // Same precondition as the other accessors: a helper must have been selected.
            Contract.Assert(m_mergeHelper != null);
            return m_mergeHelper.GetResultsAsArray();
        }

        //------------------------------------------------------------------------------------
        // This internal helper method is used to generate a set of asynchronous channels.
        // The algorithm used by each channel contains the necessary synchronization to
        // ensure it is suitable for pipelined consumption.
        //
        // Arguments:
        //     partitionCount    - the number of partitions for which to create new channels.
        //     options           - must be NotBuffered or AutoBuffered (asserted); NotBuffered
        //                         selects a chunk size of 1, otherwise 0 (automatic) is used.
        //     cancellationToken - passed to every channel that is created.
        //
        // Return Value:
        //     An array of asynchronous channels, one for each partition.
        //

        internal static AsynchronousChannel<TInputOutput>[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, CancellationToken cancellationToken)
        {
            AsynchronousChannel<TInputOutput>[] channels = new AsynchronousChannel<TInputOutput>[partitionCount];

            Contract.Assert(options == ParallelMergeOptions.NotBuffered || options == ParallelMergeOptions.AutoBuffered);
            TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} async channels in prep for pipeline", partitionCount);

            // If we are pipelining, we need a channel that contains the necessary synchronization
            // in it. We choose a bounded/blocking channel data structure: bounded so that we can
            // limit the amount of memory overhead used by the query by putting a cap on the
            // buffer size into which producers place data, and blocking so that the consumer can
            // wait for additional data to arrive in the case that it's found to be empty.

            int chunkSize = 0; // 0 means automatic chunk size
            if (options == ParallelMergeOptions.NotBuffered)
            {
                chunkSize = 1;
            }

            for (int i = 0; i < channels.Length; i++)
            {
                channels[i] = new AsynchronousChannel<TInputOutput>(chunkSize, cancellationToken);
            }

            return channels;
        }

        //-----------------------------------------------------------------------------------
        // This internal helper method is used to generate a set of synchronous channels.
        // The channel data structure used has been optimized for sequential execution and
        // does not support pipelining.
        //
        // Arguments:
        //     partitionCount - the number of partitions for which to create new channels.
        //
        // Return Value:
        //     An array of synchronous channels, one for each partition.
        //

        internal static SynchronousChannel<TInputOutput>[] MakeSynchronousChannels(int partitionCount)
        {
            SynchronousChannel<TInputOutput>[] channels = new SynchronousChannel<TInputOutput>[partitionCount];

            TraceHelpers.TraceInfo("MergeExecutor::MakeChannels: setting up {0} channels in prep for stop-and-go", partitionCount);

            // We just build up the results in memory using simple, dynamically growable FIFO queues.
            for (int i = 0; i < channels.Length; i++)
            {
                channels[i] = new SynchronousChannel<TInputOutput>();
            }

            return channels;
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- XamlNamespaceHelper.cs
- DataGridViewSortCompareEventArgs.cs
- AccessViolationException.cs
- FixedSOMTable.cs
- Wrapper.cs
- __Filters.cs
- SaveFileDialog.cs
- BinHexDecoder.cs
- DataGrid.cs
- DefaultHttpHandler.cs
- SmtpFailedRecipientException.cs
- UrlPath.cs
- EntityStoreSchemaFilterEntry.cs
- NavigationHelper.cs
- StateChangeEvent.cs
- TrackingProfileManager.cs
- Animatable.cs
- EmptyQuery.cs
- DebugInfoGenerator.cs
- X509UI.cs
- GregorianCalendar.cs
- XPathDescendantIterator.cs
- SqlVisitor.cs
- PropertyGridEditorPart.cs
- CompoundFileDeflateTransform.cs
- TreeViewImageKeyConverter.cs
- TiffBitmapDecoder.cs
- HtmlTable.cs
- FormatterConverter.cs
- EventProxy.cs
- AnnotationDocumentPaginator.cs
- ValidationRuleCollection.cs
- FileAuthorizationModule.cs
- DataGridParentRows.cs
- hresults.cs
- StorageAssociationTypeMapping.cs
- PixelShader.cs
- DecoderReplacementFallback.cs
- OracleCommandSet.cs
- FacetValues.cs
- sqlnorm.cs
- CaseInsensitiveHashCodeProvider.cs
- RuntimeConfigLKG.cs
- HitTestWithPointDrawingContextWalker.cs
- _ListenerRequestStream.cs
- X500Name.cs
- PlaceHolder.cs
- FillBehavior.cs
- ChtmlCommandAdapter.cs
- RangeValidator.cs
- _RegBlobWebProxyDataBuilder.cs
- SmiContextFactory.cs
- MonthCalendar.cs
- WindowsFont.cs
- EndpointAddressElementBase.cs
- NamedPipeAppDomainProtocolHandler.cs
- Misc.cs
- StateChangeEvent.cs
- X509ServiceCertificateAuthentication.cs
- BitmapVisualManager.cs
- SymmetricSecurityProtocol.cs
- HttpResponse.cs
- ScrollableControl.cs
- CFStream.cs
- InternalResources.cs
- MonthCalendar.cs
- IndexingContentUnit.cs
- WmlObjectListAdapter.cs
- AnonymousIdentificationSection.cs
- EmptyStringExpandableObjectConverter.cs
- EventsTab.cs
- ConstraintEnumerator.cs
- XmlDocument.cs
- NavigatorOutput.cs
- XmlSchemas.cs
- ServiceOperationWrapper.cs
- securitymgrsite.cs
- Aggregates.cs
- WSFederationHttpSecurityMode.cs
- ProgressBar.cs
- ObjectConverter.cs
- EntitySetBaseCollection.cs
- WebEvents.cs
- BindingGraph.cs
- StorageEntitySetMapping.cs
- Serializer.cs
- NetworkInformationPermission.cs
- StreamGeometryContext.cs
- SByteConverter.cs
- WindowProviderWrapper.cs
- DeflateEmulationStream.cs
- TemplateBindingExtensionConverter.cs
- XmlStreamStore.cs
- XamlInterfaces.cs
- SudsCommon.cs
- SmiTypedGetterSetter.cs
- Label.cs
- DataObjectSettingDataEventArgs.cs
- Exceptions.cs
- MasterPage.cs