Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / SelectManyQueryOperator.cs / 1305376 / SelectManyQueryOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SelectManyQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// SelectMany is effectively a nested loops join. It is given two data sources, an
    /// outer and an inner -- actually, the inner is sometimes calculated by invoking a
    /// function for each outer element -- and we walk the outer, walking the entire
    /// inner enumerator for each outer element. There is an optional result selector
    /// function which can transform the output before yielding it as a result element.
    ///
    /// Notes:
    ///     Although select many takes two enumerable objects as input, it appears to the
    ///     query analysis infrastructure as a unary operator. That's because it works a
    ///     little differently than the other binary operators: it has to re-open the right
    ///     child every time an outer element is walked. The right child is NOT partitioned.
    /// </summary>
    /// <typeparam name="TLeftInput">Element type of the left (outer) data source.</typeparam>
    /// <typeparam name="TRightInput">Element type of the right (inner) sequences.</typeparam>
    /// <typeparam name="TOutput">Element type produced by this operator.</typeparam>
    internal sealed class SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> : UnaryQueryOperator<TLeftInput, TOutput>
    {
        private readonly Func<TLeftInput, IEnumerable<TRightInput>> m_rightChildSelector; // To select a new child each iteration.
        private readonly Func<TLeftInput, int, IEnumerable<TRightInput>> m_indexedRightChildSelector; // To select a new child each iteration.
        private readonly Func<TLeftInput, TRightInput, TOutput> m_resultSelector; // An optional result selection function.
        private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator.

        //----------------------------------------------------------------------------------------
        // Initializes a new select-many operator.
        //
        // Arguments:
        //    leftChild                 - the left data source from which to pull data.
        //    rightChildSelector        - the selector function that generates a new right child
        //                                for every left element (non-indexed variant).
        //    indexedRightChildSelector - same as rightChildSelector, but it also receives the
        //                                index of the left element. Exactly one of the two
        //                                child selectors must be non-null.
        //    resultSelector            - a selection function for creating output elements. May
        //                                be null only when TRightInput and TOutput are the same.
        //

        internal SelectManyQueryOperator(IEnumerable<TLeftInput> leftChild,
                                         Func<TLeftInput, IEnumerable<TRightInput>> rightChildSelector,
                                         Func<TLeftInput, int, IEnumerable<TRightInput>> indexedRightChildSelector,
                                         Func<TLeftInput, TRightInput, TOutput> resultSelector)
            : base(leftChild)
        {
            Contract.Assert(leftChild != null, "left child data source cannot be null");
            Contract.Assert(rightChildSelector != null || indexedRightChildSelector != null,
                "either right child data or selector must be supplied");
            Contract.Assert(rightChildSelector == null || indexedRightChildSelector == null,
                "either indexed- or non-indexed child selector must be supplied (not both)");
            Contract.Assert(typeof(TRightInput) == typeof(TOutput) || resultSelector != null,
                "right input and output must be the same types, otherwise the result selector may not be null");

            m_rightChildSelector = rightChildSelector;
            m_indexedRightChildSelector = indexedRightChildSelector;
            m_resultSelector = resultSelector;

            // If the SelectMany is indexed, elements must be returned in the order in which
            // indices were assigned.
            m_outputOrdered = Child.OutputOrdered || indexedRightChildSelector != null;

            InitOrderIndex();
        }

        //----------------------------------------------------------------------------------------
        // Decides whether the input has to be merged prematurely (based on the state of the
        // child's order keys) and records the ordinal index state of this operator's output.
        //

        private void InitOrderIndex()
        {
            if (m_indexedRightChildSelector != null)
            {
                // If this is an indexed SelectMany, we need the order keys to be Correct, so that we can pass them
                // into the user delegate.
                m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Correct);
            }
            else if (OutputOrdered)
            {
                // If the output of this SelectMany is ordered, the input keys must be at least increasing. The
                // SelectMany algorithm assumes that there will be no duplicate order keys, so if the order keys
                // are Shuffled, we need to merge prematurely.
                m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Increasing);
            }

            SetOrdinalIndexState(OrdinalIndexState.Shuffled);
        }

        //----------------------------------------------------------------------------------------
        // Wraps every partition of the input stream with a select-many enumerator and hands the
        // resulting stream to the recipient. When a premature merge is required, the input is
        // first executed and collected, and then re-partitioned with integer order keys.
        //

        internal override void WrapPartitionedStream<TLeftKey>(
            PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient,
            bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;

            if (m_indexedRightChildSelector != null)
            {
                PartitionedStream<TLeftInput, int> inputStreamInt;

                // If the index is not correct, we need to reindex.
                if (m_prematureMerge)
                {
                    ListQueryResults<TLeftInput> listResults =
                        QueryOperator<TLeftInput>.ExecuteAndCollectResults(
                            inputStream, partitionCount, OutputOrdered, preferStriping, settings);
                    inputStreamInt = listResults.GetPartitionedStream();
                }
                else
                {
                    // No reindexing was requested, so the stream is cast to the int-keyed
                    // stream type that the indexed helper requires.
                    inputStreamInt = (PartitionedStream<TLeftInput, int>)(object)inputStream;
                }

                WrapPartitionedStreamIndexed(inputStreamInt, recipient, settings);
                return;
            }

            // Non-indexed SelectMany: merge first if required, otherwise the input stream
            // is wrapped as-is with its original key type.
            if (m_prematureMerge)
            {
                PartitionedStream<TLeftInput, int> inputStreamInt =
                    QueryOperator<TLeftInput>.ExecuteAndCollectResults(
                        inputStream, partitionCount, OutputOrdered, preferStriping, settings)
                    .GetPartitionedStream();
                WrapPartitionedStreamNotIndexed(inputStreamInt, recipient, settings);
            }
            else
            {
                WrapPartitionedStreamNotIndexed(inputStream, recipient, settings);
            }
        }

        /// <summary>
        /// A helper method for WrapPartitionedStream. We use the helper to reuse a block of code twice, but with
        /// a different order key type. (If premature merge occurred, the order key type will be "int". Otherwise,
        /// it will be the same type as "TLeftKey" in WrapPartitionedStream.)
        /// </summary>
        private void WrapPartitionedStreamNotIndexed<TLeftKey>(
            PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient,
            QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;

            // Output keys are (left key, position within the right sequence) pairs.
            var keyComparer = new PairComparer<TLeftKey, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
            var outputStream = new PartitionedStream<TOutput, Pair<TLeftKey, int>>(partitionCount, keyComparer, OrdinalIndexState);

            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new SelectManyQueryOperatorEnumerator<TLeftKey>(
                    inputStream[i], this, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        /// <summary>
        /// Similar helper method to WrapPartitionedStreamNotIndexed, except that this one is for the indexed variant
        /// of SelectMany (i.e., the SelectMany that passes indices into the user sequence-generating delegate)
        /// </summary>
        private void WrapPartitionedStreamIndexed(
            PartitionedStream<TLeftInput, int> inputStream, IPartitionedStreamRecipient<TOutput> recipient,
            QuerySettings settings)
        {
            var keyComparer = new PairComparer<int, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
            var outputStream = new PartitionedStream<TOutput, Pair<int, int>>(inputStream.PartitionCount, keyComparer, OrdinalIndexState);

            for (int i = 0; i < inputStream.PartitionCount; i++)
            {
                outputStream[i] = new IndexedSelectManyQueryOperatorEnumerator(
                    inputStream[i], this, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the left child and wrapping with a
        // partition if needed. The right child is not opened yet -- this is always done on demand
        // as the outer elements are enumerated.
        //

        internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
        {
            QueryResults<TLeftInput> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TLeftInput> wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);

            if (m_rightChildSelector != null)
            {
                if (m_resultSelector != null)
                {
                    return wrappedChild.SelectMany(m_rightChildSelector, m_resultSelector);
                }

                // Without a result selector TRightInput and TOutput are the same type (asserted
                // in the constructor), but the compiler does not know that -- hence the cast.
                return (IEnumerable<TOutput>)(object)(wrappedChild.SelectMany(m_rightChildSelector));
            }
            else
            {
                Contract.Assert(m_indexedRightChildSelector != null);

                if (m_resultSelector != null)
                {
                    return wrappedChild.SelectMany(m_indexedRightChildSelector, m_resultSelector);
                }

                return (IEnumerable<TOutput>)(object)(wrappedChild.SelectMany(m_indexedRightChildSelector));
            }
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return m_prematureMerge; }
        }

        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the indexed SelectMany logic, i.e. the
        // variant whose child selector also receives the index of the left element.
        //

        class IndexedSelectManyQueryOperatorEnumerator : QueryOperatorEnumerator<TOutput, Pair<int, int>>
        {
            private readonly QueryOperatorEnumerator<TLeftInput, int> m_leftSource; // The left data source to enumerate.
            private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
            private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
            private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
            private Mutables m_mutables; // bag of frequently mutated value types [allocate in moveNext to avoid false-sharing]
            private readonly CancellationToken m_cancellationToken;

            private class Mutables
            {
                internal int m_currentRightSourceIndex = -1; // The index for the right data source.
                internal TLeftInput m_currentLeftElement; // The current element in the left data source.
                internal int m_currentLeftSourceIndex; // The current key in the left data source.
                internal int m_lhsCount; // Counts the number of lhs elements enumerated. Used for cancellation testing.
            }

            //---------------------------------------------------------------------------------------
            // Instantiates a new select-many enumerator. Notice that the right data source is an
            // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
            // data source.
            //

            internal IndexedSelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, int> leftSource,
                                                              SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                              CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(selectManyOperator != null);

                m_leftSource = leftSource;
                m_selectManyOperator = selectManyOperator;
                m_cancellationToken = cancellationToken;
            }

            //----------------------------------------------------------------------------------------
            // Straightforward IEnumerator methods.
            //

            internal override bool MoveNext(ref TOutput currentElement, ref Pair<int, int> currentKey)
            {
                while (true)
                {
                    if (m_currentRightSource == null)
                    {
                        // Allocate the bag once, lazily. Re-allocating it for every left element
                        // would reset m_lhsCount to zero each time, which would defeat the
                        // poll-interval throttling of the cancellation check below.
                        if (m_mutables == null)
                        {
                            m_mutables = new Mutables();
                        }

                        // Check cancellation every few lhs-enumerations in case none of them are producing
                        // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
                        if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // We don't have a "current" right enumerator to use. We have to fetch the next
                        // one. If the left has run out of elements, however, we're done and just return
                        // false right away.
                        if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftSourceIndex))
                        {
                            return false;
                        }

                        // The right-hand index restarts for every left element.
                        m_mutables.m_currentRightSourceIndex = -1;

                        // Use the source selection routine to create a right child.
                        IEnumerable<TRightInput> rightChild =
                            m_selectManyOperator.m_indexedRightChildSelector(m_mutables.m_currentLeftElement, m_mutables.m_currentLeftSourceIndex);

                        Contract.Assert(rightChild != null);
                        m_currentRightSource = rightChild.GetEnumerator();

                        Contract.Assert(m_currentRightSource != null);

                        // If we have no result selector, we will need to access the Current element of the right
                        // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                        // equal TOutput (we check it during operator construction), but the type system doesn't.
                        // Thus we would have to cast the result of invoking Current from type TRightInput to
                        // TOutput. This is no good, since the results could be value types. Instead, we save the
                        // enumerator object as an IEnumerator<TOutput> and access that later on.
                        if (m_selectManyOperator.m_resultSelector == null)
                        {
                            m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                            Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                "these must be equal, otherwise the surrounding logic will be broken");
                        }
                    }

                    if (m_currentRightSource.MoveNext())
                    {
                        m_mutables.m_currentRightSourceIndex++;

                        // If the inner data source has an element, we can yield it.
                        if (m_selectManyOperator.m_resultSelector != null)
                        {
                            // In the case of a selection function, use that to yield the next element.
                            currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                        }
                        else
                        {
                            // Otherwise, the right input and output types must be the same. We use the
                            // casted copy of the current right source and just return its current element.
                            Contract.Assert(m_currentRightSourceAsOutput != null);
                            currentElement = m_currentRightSourceAsOutput.Current;
                        }

                        currentKey = new Pair<int, int>(m_mutables.m_currentLeftSourceIndex, m_mutables.m_currentRightSourceIndex);

                        return true;
                    }
                    else
                    {
                        // Otherwise, we have exhausted the right data source. Loop back around and try
                        // to get the next left element, then its right, and so on.
                        m_currentRightSource.Dispose();
                        m_currentRightSource = null;
                        m_currentRightSourceAsOutput = null;
                    }
                }
            }

            // Releases the left source and any right enumerator that is still open.
            protected override void Dispose(bool disposing)
            {
                m_leftSource.Dispose();
                if (m_currentRightSource != null)
                {
                    m_currentRightSource.Dispose();
                }
            }
        }

        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the SelectMany logic.
        //

        class SelectManyQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TOutput, Pair<TLeftKey, int>>
        {
            private readonly QueryOperatorEnumerator<TLeftInput, TLeftKey> m_leftSource; // The left data source to enumerate.
            private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
            private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
            private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
            private Mutables m_mutables; // bag of frequently mutated value types [allocate in moveNext to avoid false-sharing]
            private readonly CancellationToken m_cancellationToken;

            private class Mutables
            {
                internal int m_currentRightSourceIndex = -1; // The index for the right data source.
                internal TLeftInput m_currentLeftElement; // The current element in the left data source.
                internal TLeftKey m_currentLeftKey; // The current key in the left data source.
                internal int m_lhsCount; // Counts the number of lhs elements enumerated. Used for cancellation testing.
            }

            //---------------------------------------------------------------------------------------
            // Instantiates a new select-many enumerator. Notice that the right data source is an
            // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
            // data source.
            //

            internal SelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, TLeftKey> leftSource,
                                                       SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                       CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(selectManyOperator != null);

                m_leftSource = leftSource;
                m_selectManyOperator = selectManyOperator;
                m_cancellationToken = cancellationToken;
            }

            //----------------------------------------------------------------------------------------
            // Straightforward IEnumerator methods.
            //

            internal override bool MoveNext(ref TOutput currentElement, ref Pair<TLeftKey, int> currentKey)
            {
                while (true)
                {
                    if (m_currentRightSource == null)
                    {
                        // Allocate the bag once, lazily. Re-allocating it for every left element
                        // would reset m_lhsCount to zero each time, which would defeat the
                        // poll-interval throttling of the cancellation check below.
                        if (m_mutables == null)
                        {
                            m_mutables = new Mutables();
                        }

                        // Check cancellation every few lhs-enumerations in case none of them are producing
                        // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
                        if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // We don't have a "current" right enumerator to use. We have to fetch the next
                        // one. If the left has run out of elements, however, we're done and just return
                        // false right away.
                        if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftKey))
                        {
                            return false;
                        }

                        // The right-hand index restarts for every left element.
                        m_mutables.m_currentRightSourceIndex = -1;

                        // Use the source selection routine to create a right child.
                        IEnumerable<TRightInput> rightChild = m_selectManyOperator.m_rightChildSelector(m_mutables.m_currentLeftElement);

                        Contract.Assert(rightChild != null);
                        m_currentRightSource = rightChild.GetEnumerator();

                        Contract.Assert(m_currentRightSource != null);

                        // If we have no result selector, we will need to access the Current element of the right
                        // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                        // equal TOutput (we check it during operator construction), but the type system doesn't.
                        // Thus we would have to cast the result of invoking Current from type TRightInput to
                        // TOutput. This is no good, since the results could be value types. Instead, we save the
                        // enumerator object as an IEnumerator<TOutput> and access that later on.
                        if (m_selectManyOperator.m_resultSelector == null)
                        {
                            m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                            Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                "these must be equal, otherwise the surrounding logic will be broken");
                        }
                    }

                    if (m_currentRightSource.MoveNext())
                    {
                        m_mutables.m_currentRightSourceIndex++;

                        // If the inner data source has an element, we can yield it.
                        if (m_selectManyOperator.m_resultSelector != null)
                        {
                            // In the case of a selection function, use that to yield the next element.
                            currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                        }
                        else
                        {
                            // Otherwise, the right input and output types must be the same. We use the
                            // casted copy of the current right source and just return its current element.
                            Contract.Assert(m_currentRightSourceAsOutput != null);
                            currentElement = m_currentRightSourceAsOutput.Current;
                        }

                        currentKey = new Pair<TLeftKey, int>(m_mutables.m_currentLeftKey, m_mutables.m_currentRightSourceIndex);

                        return true;
                    }
                    else
                    {
                        // Otherwise, we have exhausted the right data source. Loop back around and try
                        // to get the next left element, then its right, and so on.
                        m_currentRightSource.Dispose();
                        m_currentRightSource = null;
                        m_currentRightSourceAsOutput = null;
                    }
                }
            }

            // Releases the left source and any right enumerator that is still open.
            protected override void Dispose(bool disposing)
            {
                m_leftSource.Dispose();
                if (m_currentRightSource != null)
                {
                    m_currentRightSource.Dispose();
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DataService.cs
- MemberDomainMap.cs
- UrlPropertyAttribute.cs
- HttpWriter.cs
- ContentFilePart.cs
- ResXResourceWriter.cs
- XPathNode.cs
- ScriptingWebServicesSectionGroup.cs
- ThreadStaticAttribute.cs
- SendKeys.cs
- serverconfig.cs
- ListViewItemEventArgs.cs
- RotateTransform3D.cs
- WebPartHelpVerb.cs
- SiblingIterators.cs
- XMLDiffLoader.cs
- MetadataElement.cs
- Mouse.cs
- NetworkAddressChange.cs
- ActiveDocumentEvent.cs
- _ContextAwareResult.cs
- MembershipAdapter.cs
- UInt16Storage.cs
- RIPEMD160Managed.cs
- InputMethod.cs
- DriveNotFoundException.cs
- PrimitiveType.cs
- BindingMemberInfo.cs
- MaterialGroup.cs
- XmlSchemaFacet.cs
- securitycriticaldataClass.cs
- IPAddress.cs
- SafeRightsManagementHandle.cs
- StreamInfo.cs
- SuspendDesigner.cs
- PrimitiveXmlSerializers.cs
- GridViewRowPresenterBase.cs
- PrintSchema.cs
- SqlMethods.cs
- BasicCellRelation.cs
- SingleAnimationUsingKeyFrames.cs
- InternalConfigRoot.cs
- AnnotationHelper.cs
- DLinqTableProvider.cs
- OdbcError.cs
- AccessDataSourceDesigner.cs
- XmlArrayAttribute.cs
- SqlCaseSimplifier.cs
- SqlNotificationEventArgs.cs
- QilValidationVisitor.cs
- HttpChannelBindingToken.cs
- Point3DAnimationBase.cs
- SqlDataSource.cs
- MDIClient.cs
- DelayLoadType.cs
- ListDictionary.cs
- WizardPanel.cs
- LeafCellTreeNode.cs
- ListViewTableRow.cs
- ThreadAttributes.cs
- WebPartCatalogCloseVerb.cs
- COAUTHIDENTITY.cs
- ConfigXmlComment.cs
- ListBase.cs
- CellNormalizer.cs
- FontEmbeddingManager.cs
- UserPersonalizationStateInfo.cs
- SqlConnection.cs
- ValidatedMobileControlConverter.cs
- ToolStripSeparator.cs
- ToolBarButton.cs
- TraceLevelStore.cs
- DataServices.cs
- SimpleMailWebEventProvider.cs
- MergablePropertyAttribute.cs
- BufferedReadStream.cs
- CustomErrorsSection.cs
- RoleGroup.cs
- SpellCheck.cs
- BitArray.cs
- WebControlsSection.cs
- TextViewBase.cs
- _NativeSSPI.cs
- InvokeHandlers.cs
- BinaryFormatterSinks.cs
- TypeLoadException.cs
- login.cs
- UdpReplyToBehavior.cs
- TypeContext.cs
- EntityChangedParams.cs
- SynchronizedCollection.cs
- EndEvent.cs
- autovalidator.cs
- FixedTextBuilder.cs
- SendKeys.cs
- Int32RectConverter.cs
- EventMappingSettingsCollection.cs
- TypeBrowserDialog.cs
- HttpApplication.cs
- Matrix3DConverter.cs