Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / ZipQueryOperator.cs / 1305376 / ZipQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // ZipQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// A Zip operator combines two input data sources into a single output stream, /// using a pairwise element matching algorithm. For example, the result of zipping /// two vectors a = {0, 1, 2, 3} and b = {9, 8, 7, 6} is the vector of pairs, /// c = {(0,9), (1,8), (2,7), (3,6)}. Because the expectation is that each element /// is matched with the element in the other data source at the same ordinal /// position, the zip operator requires order preservation. /// ////// /// internal sealed class ZipQueryOperator : QueryOperator { private readonly Func m_resultSelector; // To select result elements. private readonly QueryOperator m_leftChild; private readonly QueryOperator m_rightChild; private readonly bool m_prematureMergeLeft = false; // Whether to prematurely merge the left data source private readonly bool m_prematureMergeRight = false; // Whether to prematurely merge the right data source //---------------------------------------------------------------------------------------- // Initializes a new zip operator. // // Arguments: // leftChild - the left data source from which to pull data. // rightChild - the right data source from which to pull data. // internal ZipQueryOperator( ParallelQuery leftChildSource, IEnumerable rightChildSource, Func resultSelector) :this( QueryOperator .AsQueryOperator(leftChildSource), QueryOperator .AsQueryOperator(rightChildSource), resultSelector) { } private ZipQueryOperator( QueryOperator left, QueryOperator right, Func resultSelector) : base(left.SpecifiedQuerySettings.Merge(right.SpecifiedQuerySettings)) { Contract.Assert(resultSelector != null, "operator cannot be null"); m_leftChild = left; m_rightChild = right; m_resultSelector = resultSelector; m_outputOrdered = m_leftChild.OutputOrdered || m_rightChild.OutputOrdered; m_prematureMergeLeft = m_leftChild.OrdinalIndexState != OrdinalIndexState.Indexible; m_prematureMergeRight = m_rightChild.OrdinalIndexState != OrdinalIndexState.Indexible; } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the children and wrapping them with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operators, left and then right. QueryResults leftChildResults = m_leftChild.Open(settings, preferStriping); QueryResults rightChildResults = m_rightChild.Open(settings, preferStriping); int partitionCount = settings.DegreeOfParallelism.Value; if (m_prematureMergeLeft) { PartitionedStreamMerger merger = new PartitionedStreamMerger ( false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_leftChild.OutputOrdered, settings.CancellationState, settings.QueryId); leftChildResults.GivePartitionedStream(merger); leftChildResults = new ListQueryResults ( merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping); } if (m_prematureMergeRight) { PartitionedStreamMerger merger = new PartitionedStreamMerger ( false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_rightChild.OutputOrdered, settings.CancellationState, settings.QueryId); rightChildResults.GivePartitionedStream(merger); rightChildResults = new ListQueryResults ( merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping); } return new ZipQueryOperatorResults(leftChildResults, rightChildResults, m_resultSelector, partitionCount, preferStriping); } //--------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { using(IEnumerator leftEnumerator = m_leftChild.AsSequentialQuery(token).GetEnumerator()) using(IEnumerator rightEnumerator = m_rightChild.AsSequentialQuery(token).GetEnumerator()) { while(leftEnumerator.MoveNext() && rightEnumerator.MoveNext()) { yield return m_resultSelector(leftEnumerator.Current, rightEnumerator.Current); } } } //--------------------------------------------------------------------------------------- // The state of the order index of the results returned by this operator. // internal override OrdinalIndexState OrdinalIndexState { get { return OrdinalIndexState.Indexible; } } //---------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return m_prematureMergeLeft || m_prematureMergeRight; } } //--------------------------------------------------------------------------------------- // A special QueryResults class for the Zip operator. It requires that both of the child // QueryResults are indexible. // internal class ZipQueryOperatorResults : QueryResults { private readonly QueryResults m_leftChildResults; private readonly QueryResults m_rightChildResults; private readonly Func m_resultSelector; // To select result elements. private readonly int m_count; private readonly int m_partitionCount; private readonly bool m_preferStriping; internal ZipQueryOperatorResults( QueryResults leftChildResults, QueryResults rightChildResults, Func resultSelector, int partitionCount, bool preferStriping) { m_leftChildResults = leftChildResults; m_rightChildResults = rightChildResults; m_resultSelector = resultSelector; m_partitionCount = partitionCount; m_preferStriping = preferStriping; Contract.Assert(m_leftChildResults.IsIndexible); Contract.Assert(m_rightChildResults.IsIndexible); m_count = Math.Min(m_leftChildResults.Count, m_rightChildResults.Count); } internal override int ElementsCount { get { return m_count; } } internal override bool IsIndexible { get { return true; } } internal override TOutput GetElement(int index) { return m_resultSelector(m_leftChildResults.GetElement(index), m_rightChildResults.GetElement(index)); } internal override void GivePartitionedStream(IPartitionedStreamRecipient recipient) { PartitionedStream partitionedStream = ExchangeUtilities.PartitionDataSource(this, m_partitionCount, m_preferStriping); recipient.Receive(partitionedStream); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // ZipQueryOperator.cs // // [....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// A Zip operator combines two input data sources into a single output stream, /// using a pairwise element matching algorithm. For example, the result of zipping /// two vectors a = {0, 1, 2, 3} and b = {9, 8, 7, 6} is the vector of pairs, /// c = {(0,9), (1,8), (2,7), (3,6)}. Because the expectation is that each element /// is matched with the element in the other data source at the same ordinal /// position, the zip operator requires order preservation. /// ////// /// internal sealed class ZipQueryOperator : QueryOperator { private readonly Func m_resultSelector; // To select result elements. private readonly QueryOperator m_leftChild; private readonly QueryOperator m_rightChild; private readonly bool m_prematureMergeLeft = false; // Whether to prematurely merge the left data source private readonly bool m_prematureMergeRight = false; // Whether to prematurely merge the right data source //---------------------------------------------------------------------------------------- // Initializes a new zip operator. // // Arguments: // leftChild - the left data source from which to pull data. // rightChild - the right data source from which to pull data. // internal ZipQueryOperator( ParallelQuery leftChildSource, IEnumerable rightChildSource, Func resultSelector) :this( QueryOperator .AsQueryOperator(leftChildSource), QueryOperator .AsQueryOperator(rightChildSource), resultSelector) { } private ZipQueryOperator( QueryOperator left, QueryOperator right, Func resultSelector) : base(left.SpecifiedQuerySettings.Merge(right.SpecifiedQuerySettings)) { Contract.Assert(resultSelector != null, "operator cannot be null"); m_leftChild = left; m_rightChild = right; m_resultSelector = resultSelector; m_outputOrdered = m_leftChild.OutputOrdered || m_rightChild.OutputOrdered; m_prematureMergeLeft = m_leftChild.OrdinalIndexState != OrdinalIndexState.Indexible; m_prematureMergeRight = m_rightChild.OrdinalIndexState != OrdinalIndexState.Indexible; } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the children and wrapping them with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operators, left and then right. QueryResults leftChildResults = m_leftChild.Open(settings, preferStriping); QueryResults rightChildResults = m_rightChild.Open(settings, preferStriping); int partitionCount = settings.DegreeOfParallelism.Value; if (m_prematureMergeLeft) { PartitionedStreamMerger merger = new PartitionedStreamMerger ( false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_leftChild.OutputOrdered, settings.CancellationState, settings.QueryId); leftChildResults.GivePartitionedStream(merger); leftChildResults = new ListQueryResults ( merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping); } if (m_prematureMergeRight) { PartitionedStreamMerger merger = new PartitionedStreamMerger ( false, ParallelMergeOptions.FullyBuffered, settings.TaskScheduler, m_rightChild.OutputOrdered, settings.CancellationState, settings.QueryId); rightChildResults.GivePartitionedStream(merger); rightChildResults = new ListQueryResults ( merger.MergeExecutor.GetResultsAsArray(), partitionCount, preferStriping); } return new ZipQueryOperatorResults(leftChildResults, rightChildResults, m_resultSelector, partitionCount, preferStriping); } //--------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { using(IEnumerator leftEnumerator = m_leftChild.AsSequentialQuery(token).GetEnumerator()) using(IEnumerator rightEnumerator = m_rightChild.AsSequentialQuery(token).GetEnumerator()) { while(leftEnumerator.MoveNext() && rightEnumerator.MoveNext()) { yield return m_resultSelector(leftEnumerator.Current, rightEnumerator.Current); } } } //--------------------------------------------------------------------------------------- // The state of the order index of the results returned by this operator. // internal override OrdinalIndexState OrdinalIndexState { get { return OrdinalIndexState.Indexible; } } //---------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return m_prematureMergeLeft || m_prematureMergeRight; } } //--------------------------------------------------------------------------------------- // A special QueryResults class for the Zip operator. It requires that both of the child // QueryResults are indexible. // internal class ZipQueryOperatorResults : QueryResults { private readonly QueryResults m_leftChildResults; private readonly QueryResults m_rightChildResults; private readonly Func m_resultSelector; // To select result elements. private readonly int m_count; private readonly int m_partitionCount; private readonly bool m_preferStriping; internal ZipQueryOperatorResults( QueryResults leftChildResults, QueryResults rightChildResults, Func resultSelector, int partitionCount, bool preferStriping) { m_leftChildResults = leftChildResults; m_rightChildResults = rightChildResults; m_resultSelector = resultSelector; m_partitionCount = partitionCount; m_preferStriping = preferStriping; Contract.Assert(m_leftChildResults.IsIndexible); Contract.Assert(m_rightChildResults.IsIndexible); m_count = Math.Min(m_leftChildResults.Count, m_rightChildResults.Count); } internal override int ElementsCount { get { return m_count; } } internal override bool IsIndexible { get { return true; } } internal override TOutput GetElement(int index) { return m_resultSelector(m_leftChildResults.GetElement(index), m_rightChildResults.GetElement(index)); } internal override void GivePartitionedStream(IPartitionedStreamRecipient recipient) { PartitionedStream partitionedStream = ExchangeUtilities.PartitionDataSource(this, m_partitionCount, m_preferStriping); recipient.Receive(partitionedStream); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- TdsParserStaticMethods.cs
- Asn1Utilities.cs
- SafeRightsManagementSessionHandle.cs
- DefaultBinder.cs
- HttpChannelHelpers.cs
- UInt64Storage.cs
- RegexInterpreter.cs
- TemplateControlParser.cs
- ItemChangedEventArgs.cs
- Utils.cs
- XamlInt32CollectionSerializer.cs
- MimeMultiPart.cs
- UserControlParser.cs
- TypeConverterHelper.cs
- Timer.cs
- SqlDataSourceStatusEventArgs.cs
- XhtmlConformanceSection.cs
- SemanticResolver.cs
- RIPEMD160Managed.cs
- ProxyWebPart.cs
- Pkcs9Attribute.cs
- WebPartMenuStyle.cs
- AttachInfo.cs
- GeometryHitTestParameters.cs
- SystemIcons.cs
- ComponentResourceKey.cs
- MasterPageBuildProvider.cs
- NetworkInterface.cs
- OletxDependentTransaction.cs
- DataServiceConfiguration.cs
- RegexCompiler.cs
- PerformanceCounterLib.cs
- DataGridCommandEventArgs.cs
- DescendantBaseQuery.cs
- ReadWriteObjectLock.cs
- StatusBarPanelClickEvent.cs
- ArglessEventHandlerProxy.cs
- GenericWebPart.cs
- MatrixUtil.cs
- InputManager.cs
- CqlWriter.cs
- Point3DConverter.cs
- ChannelSinkStacks.cs
- WebPartCancelEventArgs.cs
- FormattedTextSymbols.cs
- WebPartVerbCollection.cs
- GlobalAllocSafeHandle.cs
- SystemIPInterfaceProperties.cs
- String.cs
- XmlArrayItemAttribute.cs
- TiffBitmapDecoder.cs
- DownloadProgressEventArgs.cs
- ComponentEvent.cs
- PassportAuthenticationModule.cs
- KeyValueSerializer.cs
- ArrowControl.xaml.cs
- NegotiateStream.cs
- DesignOnlyAttribute.cs
- ProfileModule.cs
- DeploymentSectionCache.cs
- EdmTypeAttribute.cs
- ColorConverter.cs
- ObjectFactoryCodeDomTreeGenerator.cs
- RouteParser.cs
- DockPatternIdentifiers.cs
- LogStream.cs
- Stylesheet.cs
- TextEditorTables.cs
- EnumerationRangeValidationUtil.cs
- PeerOutputChannel.cs
- CatalogPartCollection.cs
- CultureSpecificStringDictionary.cs
- TextRange.cs
- ReadOnlyKeyedCollection.cs
- FixedTextPointer.cs
- ResourceWriter.cs
- NativeMethods.cs
- ServiceModelEnumValidator.cs
- ISAPIApplicationHost.cs
- XamlTypeMapper.cs
- ExpressionBuilder.cs
- GetWinFXPath.cs
- HttpModulesSection.cs
- SqlCacheDependencyDatabaseCollection.cs
- DummyDataSource.cs
- ColorBlend.cs
- DispatcherSynchronizationContext.cs
- ObjectViewListener.cs
- DocumentViewerHelper.cs
- MenuStrip.cs
- XamlWriter.cs
- ValueType.cs
- _SecureChannel.cs
- LoggedException.cs
- KeyPressEvent.cs
- WorkflowRuntimeSection.cs
- BufferAllocator.cs
- DataFieldCollectionEditor.cs
- AppSettingsExpressionEditor.cs
- invalidudtexception.cs