Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / SelectManyQueryOperator.cs / 1305376 / SelectManyQueryOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SelectManyQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// SelectMany is effectively a nested loops join. It is given two data sources, an
    /// outer and an inner -- actually, the inner is sometimes calculated by invoking a
    /// function for each outer element -- and we walk the outer, walking the entire
    /// inner enumerator for each outer element. There is an optional result selector
    /// function which can transform the output before yielding it as a result element.
    ///
    /// Notes:
    ///     Although select many takes two enumerable objects as input, it appears to the
    ///     query analysis infrastructure as a unary operator. That's because it works a
    ///     little differently than the other binary operators: it has to re-open the right
    ///     child every time an outer element is walked. The right child is NOT partitioned.
    /// </summary>
    /// <typeparam name="TLeftInput">Element type of the outer (left) data source.</typeparam>
    /// <typeparam name="TRightInput">Element type of the inner (right) sequences.</typeparam>
    /// <typeparam name="TOutput">Element type produced by the operator.</typeparam>
    internal sealed class SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> : UnaryQueryOperator<TLeftInput, TOutput>
    {
        private readonly Func<TLeftInput, IEnumerable<TRightInput>> m_rightChildSelector; // To select a new child each iteration.
        private readonly Func<TLeftInput, int, IEnumerable<TRightInput>> m_indexedRightChildSelector; // To select a new child each iteration.
        private readonly Func<TLeftInput, TRightInput, TOutput> m_resultSelector; // An optional result selection function.
        private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator.

        //----------------------------------------------------------------------------------------
        // Initializes a new select-many operator.
        //
        // Arguments:
        //    leftChild                 - the left data source from which to pull data.
        //    rightChildSelector        - selector that generates a right child for every left
        //                                element (exactly one of this and the indexed selector
        //                                must be non-null).
        //    indexedRightChildSelector - like rightChildSelector, but also receives the index
        //                                of the left element.
        //    resultSelector            - an optional selection function for creating output
        //                                elements; may only be null when TRightInput == TOutput.
        //

        internal SelectManyQueryOperator(IEnumerable<TLeftInput> leftChild,
                                         Func<TLeftInput, IEnumerable<TRightInput>> rightChildSelector,
                                         Func<TLeftInput, int, IEnumerable<TRightInput>> indexedRightChildSelector,
                                         Func<TLeftInput, TRightInput, TOutput> resultSelector)
            : base(leftChild)
        {
            Contract.Assert(leftChild != null, "left child data source cannot be null");
            Contract.Assert(rightChildSelector != null || indexedRightChildSelector != null,
                "either right child data or selector must be supplied");
            Contract.Assert(rightChildSelector == null || indexedRightChildSelector == null,
                "either indexed- or non-indexed child selector must be supplied (not both)");
            Contract.Assert(typeof(TRightInput) == typeof(TOutput) || resultSelector != null,
                "right input and output must be the same types, otherwise the result selector may not be null");

            m_rightChildSelector = rightChildSelector;
            m_indexedRightChildSelector = indexedRightChildSelector;
            m_resultSelector = resultSelector;

            // If the SelectMany is indexed, elements must be returned in the order in which
            // indices were assigned.
            m_outputOrdered = Child.OutputOrdered || indexedRightChildSelector != null;

            InitOrderIndex();
        }

        //----------------------------------------------------------------------------------------
        // Decides whether the input must be merged prematurely (m_prematureMerge) based on the
        // child's ordinal index state, and marks this operator's own index state as Shuffled.
        //

        private void InitOrderIndex()
        {
            if (m_indexedRightChildSelector != null)
            {
                // If this is an indexed SelectMany, we need the order keys to be Correct, so that we can pass them
                // into the user delegate.
                m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Correct);
            }
            else
            {
                if (OutputOrdered)
                {
                    // If the output of this SelectMany is ordered, the input keys must be at least increasing. The
                    // SelectMany algorithm assumes that there will be no duplicate order keys, so if the order keys
                    // are Shuffled, we need to merge prematurely.
                    m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Increasing);
                }
            }

            SetOrdinalIndexState(OrdinalIndexState.Shuffled);
        }

        //----------------------------------------------------------------------------------------
        // Wraps each partition of the input stream with a SelectMany enumerator and hands the
        // resulting stream to the recipient. When a premature merge is required, the input is
        // first executed and collected, then re-partitioned with int order keys.
        //

        internal override void WrapPartitionedStream<TLeftKey>(
            PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;

            if (m_indexedRightChildSelector != null)
            {
                PartitionedStream<TLeftInput, int> inputStreamInt;

                // If the index is not correct, we need to reindex.
                if (m_prematureMerge)
                {
                    ListQueryResults<TLeftInput> listResults =
                        QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings);
                    inputStreamInt = listResults.GetPartitionedStream();
                }
                else
                {
                    // NOTE(review): relies on the key type already being int whenever the child's
                    // index state is Correct -- confirm against the callers.
                    inputStreamInt = (PartitionedStream<TLeftInput, int>)(object)inputStream;
                }

                WrapPartitionedStreamIndexed(inputStreamInt, recipient, settings);
                return;
            }

            // Non-indexed SelectMany: keep the caller's key type unless a premature merge
            // forces us onto int keys.
            if (m_prematureMerge)
            {
                PartitionedStream<TLeftInput, int> inputStreamInt =
                    QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings)
                    .GetPartitionedStream();
                WrapPartitionedStreamNotIndexed(inputStreamInt, recipient, settings);
            }
            else
            {
                WrapPartitionedStreamNotIndexed(inputStream, recipient, settings);
            }
        }

        /// <summary>
        /// A helper method for WrapPartitionedStream. We use the helper to reuse a block of code twice, but with
        /// a different order key type. (If premature merge occurred, the order key type will be "int". Otherwise,
        /// it will be the same type as "TLeftKey" in WrapPartitionedStream.)
        /// </summary>
        private void WrapPartitionedStreamNotIndexed<TLeftKey>(
            PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            var keyComparer = new PairComparer<TLeftKey, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
            var outputStream = new PartitionedStream<TOutput, Pair<TLeftKey, int>>(partitionCount, keyComparer, OrdinalIndexState);
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new SelectManyQueryOperatorEnumerator<TLeftKey>(inputStream[i], this, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        /// <summary>
        /// Similar helper method to WrapPartitionedStreamNotIndexed, except that this one is for the indexed variant
        /// of SelectMany (i.e., the SelectMany that passes indices into the user sequence-generating delegate)
        /// </summary>
        private void WrapPartitionedStreamIndexed(
            PartitionedStream<TLeftInput, int> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
        {
            var keyComparer = new PairComparer<int, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());

            var outputStream = new PartitionedStream<TOutput, Pair<int, int>>(inputStream.PartitionCount, keyComparer, OrdinalIndexState);

            for (int i = 0; i < inputStream.PartitionCount; i++)
            {
                outputStream[i] = new IndexedSelectManyQueryOperatorEnumerator(inputStream[i], this, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the left child and wrapping with a
        // partition if needed. The right child is not opened yet -- this is always done on demand
        // as the outer elements are enumerated.
        //

        internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
        {
            QueryResults<TLeftInput> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
        {
            if (m_rightChildSelector != null)
            {
                if (m_resultSelector != null)
                {
                    return CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_rightChildSelector, m_resultSelector);
                }

                // No result selector: TRightInput == TOutput was asserted in the constructor, so
                // the sequence is re-typed through object.
                return (IEnumerable<TOutput>)(object)(CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_rightChildSelector));
            }
            else
            {
                Contract.Assert(m_indexedRightChildSelector != null);
                if (m_resultSelector != null)
                {
                    return CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_indexedRightChildSelector, m_resultSelector);
                }

                return (IEnumerable<TOutput>)(object)(CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_indexedRightChildSelector));
            }
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return m_prematureMerge; }
        }

        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the indexed SelectMany logic (the left
        // element's int index is passed to the user's sequence-generating delegate).
        //

        class IndexedSelectManyQueryOperatorEnumerator : QueryOperatorEnumerator<TOutput, Pair<int, int>>
        {
            private readonly QueryOperatorEnumerator<TLeftInput, int> m_leftSource; // The left data source to enumerate.
            private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
            private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
            private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
            private Mutables m_mutables; // bag of frequently mutated value types [allocate in moveNext to avoid false-sharing]
            private readonly CancellationToken m_cancellationToken;

            private class Mutables
            {
                internal int m_currentRightSourceIndex = -1; // The index for the right data source.
                internal TLeftInput m_currentLeftElement; // The current element in the left data source.
                internal int m_currentLeftSourceIndex; // The current key in the left data source.
                internal int m_lhsCount; // Counts the number of lhs elements enumerated. Used for cancellation testing.
            }

            //---------------------------------------------------------------------------------------
            // Instantiates a new select-many enumerator. Notice that the right data source is an
            // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
            // data source.
            //

            internal IndexedSelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, int> leftSource,
                                                              SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                              CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(selectManyOperator != null);

                m_leftSource = leftSource;
                m_selectManyOperator = selectManyOperator;
                m_cancellationToken = cancellationToken;
            }

            //----------------------------------------------------------------------------------------
            // Advances to the next output element: walks the current right enumerator, and when it
            // is exhausted pulls the next left element and opens a fresh right enumerator for it.
            // Returns false once the left source has no more elements.
            //

            internal override bool MoveNext(ref TOutput currentElement, ref Pair<int, int> currentKey)
            {
                while (true)
                {
                    if (m_currentRightSource == null)
                    {
                        if (m_mutables == null)
                        {
                            // Allocate once, on first use. Re-allocating per left element would reset
                            // m_lhsCount to zero every time and turn the poll below into a check on
                            // every single left element.
                            m_mutables = new Mutables();
                        }
                        else
                        {
                            // A new left element is about to be opened; right-side indices restart.
                            m_mutables.m_currentRightSourceIndex = -1;
                        }

                        // Check cancellation every few lhs-enumerations in case none of them are producing
                        // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
                        if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // We don't have a "current" right enumerator to use. We have to fetch the next
                        // one. If the left has run out of elements, however, we're done and just return
                        // false right away.
                        if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftSourceIndex))
                        {
                            return false;
                        }

                        // Use the source selection routine to create a right child.
                        IEnumerable<TRightInput> rightChild =
                            m_selectManyOperator.m_indexedRightChildSelector(m_mutables.m_currentLeftElement, m_mutables.m_currentLeftSourceIndex);

                        Contract.Assert(rightChild != null);
                        m_currentRightSource = rightChild.GetEnumerator();

                        Contract.Assert(m_currentRightSource != null);

                        // If we have no result selector, we will need to access the Current element of the right
                        // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                        // equal TOutput (we check it during operator construction), but the type system doesn't.
                        // Thus we would have to cast the result of invoking Current from type TRightInput to
                        // TOutput. This is no good, since the results could be value types. Instead, we save the
                        // enumerator object as an IEnumerator<TOutput> and access that later on.
                        if (m_selectManyOperator.m_resultSelector == null)
                        {
                            m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                            Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                "these must be equal, otherwise the surrounding logic will be broken");
                        }
                    }

                    if (m_currentRightSource.MoveNext())
                    {
                        m_mutables.m_currentRightSourceIndex++;

                        // If the inner data source has an element, we can yield it.
                        if (m_selectManyOperator.m_resultSelector != null)
                        {
                            // In the case of a selection function, use that to yield the next element.
                            currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                        }
                        else
                        {
                            // Otherwise, the right input and output types must be the same. We use the
                            // casted copy of the current right source and just return its current element.
                            Contract.Assert(m_currentRightSourceAsOutput != null);
                            currentElement = m_currentRightSourceAsOutput.Current;
                        }

                        currentKey = new Pair<int, int>(m_mutables.m_currentLeftSourceIndex, m_mutables.m_currentRightSourceIndex);

                        return true;
                    }
                    else
                    {
                        // Otherwise, we have exhausted the right data source. Loop back around and try
                        // to get the next left element, then its right, and so on.
                        m_currentRightSource.Dispose();
                        m_currentRightSource = null;
                        m_currentRightSourceAsOutput = null;
                    }
                }
            }

            // Disposes the left source and, if one is still open, the current right enumerator.
            protected override void Dispose(bool disposing)
            {
                m_leftSource.Dispose();
                if (m_currentRightSource != null)
                {
                    m_currentRightSource.Dispose();
                }
            }
        }

        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the (non-indexed) SelectMany logic.
        //

        class SelectManyQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TOutput, Pair<TLeftKey, int>>
        {
            private readonly QueryOperatorEnumerator<TLeftInput, TLeftKey> m_leftSource; // The left data source to enumerate.
            private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
            private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
            private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
            private Mutables m_mutables; // bag of frequently mutated value types [allocate in moveNext to avoid false-sharing]
            private readonly CancellationToken m_cancellationToken;

            private class Mutables
            {
                internal int m_currentRightSourceIndex = -1; // The index for the right data source.
                internal TLeftInput m_currentLeftElement; // The current element in the left data source.
                internal TLeftKey m_currentLeftKey; // The current key in the left data source.
                internal int m_lhsCount; // Counts the number of lhs elements enumerated. Used for cancellation testing.
            }

            //---------------------------------------------------------------------------------------
            // Instantiates a new select-many enumerator. Notice that the right data source is an
            // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
            // data source.
            //

            internal SelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, TLeftKey> leftSource,
                                                       SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                       CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(selectManyOperator != null);

                m_leftSource = leftSource;
                m_selectManyOperator = selectManyOperator;
                m_cancellationToken = cancellationToken;
            }

            //----------------------------------------------------------------------------------------
            // Advances to the next output element: walks the current right enumerator, and when it
            // is exhausted pulls the next left element and opens a fresh right enumerator for it.
            // Returns false once the left source has no more elements.
            //

            internal override bool MoveNext(ref TOutput currentElement, ref Pair<TLeftKey, int> currentKey)
            {
                while (true)
                {
                    if (m_currentRightSource == null)
                    {
                        if (m_mutables == null)
                        {
                            // Allocate once, on first use. Re-allocating per left element would reset
                            // m_lhsCount to zero every time and turn the poll below into a check on
                            // every single left element.
                            m_mutables = new Mutables();
                        }
                        else
                        {
                            // A new left element is about to be opened; right-side indices restart.
                            m_mutables.m_currentRightSourceIndex = -1;
                        }

                        // Check cancellation every few lhs-enumerations in case none of them are producing
                        // any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
                        if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // We don't have a "current" right enumerator to use. We have to fetch the next
                        // one. If the left has run out of elements, however, we're done and just return
                        // false right away.
                        if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftKey))
                        {
                            return false;
                        }

                        // Use the source selection routine to create a right child.
                        IEnumerable<TRightInput> rightChild = m_selectManyOperator.m_rightChildSelector(m_mutables.m_currentLeftElement);

                        Contract.Assert(rightChild != null);
                        m_currentRightSource = rightChild.GetEnumerator();

                        Contract.Assert(m_currentRightSource != null);

                        // If we have no result selector, we will need to access the Current element of the right
                        // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                        // equal TOutput (we check it during operator construction), but the type system doesn't.
                        // Thus we would have to cast the result of invoking Current from type TRightInput to
                        // TOutput. This is no good, since the results could be value types. Instead, we save the
                        // enumerator object as an IEnumerator<TOutput> and access that later on.
                        if (m_selectManyOperator.m_resultSelector == null)
                        {
                            m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                            Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                "these must be equal, otherwise the surrounding logic will be broken");
                        }
                    }

                    if (m_currentRightSource.MoveNext())
                    {
                        m_mutables.m_currentRightSourceIndex++;

                        // If the inner data source has an element, we can yield it.
                        if (m_selectManyOperator.m_resultSelector != null)
                        {
                            // In the case of a selection function, use that to yield the next element.
                            currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                        }
                        else
                        {
                            // Otherwise, the right input and output types must be the same. We use the
                            // casted copy of the current right source and just return its current element.
                            Contract.Assert(m_currentRightSourceAsOutput != null);
                            currentElement = m_currentRightSourceAsOutput.Current;
                        }

                        currentKey = new Pair<TLeftKey, int>(m_mutables.m_currentLeftKey, m_mutables.m_currentRightSourceIndex);

                        return true;
                    }
                    else
                    {
                        // Otherwise, we have exhausted the right data source. Loop back around and try
                        // to get the next left element, then its right, and so on.
                        m_currentRightSource.Dispose();
                        m_currentRightSource = null;
                        m_currentRightSourceAsOutput = null;
                    }
                }
            }

            // Disposes the left source and, if one is still open, the current right enumerator.
            protected override void Dispose(bool disposing)
            {
                m_leftSource.Dispose();
                if (m_currentRightSource != null)
                {
                    m_currentRightSource.Dispose();
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- RuleInfoComparer.cs
- PropertyDescriptorComparer.cs
- TemplateContent.cs
- cookiecontainer.cs
- _NestedSingleAsyncResult.cs
- BCryptSafeHandles.cs
- Bitmap.cs
- StrokeRenderer.cs
- SqlFacetAttribute.cs
- ImageSourceValueSerializer.cs
- LambdaCompiler.Lambda.cs
- ResXBuildProvider.cs
- XmlQueryType.cs
- XmlnsDefinitionAttribute.cs
- ContentPlaceHolder.cs
- BamlLocalizableResource.cs
- Char.cs
- DataGridViewCellPaintingEventArgs.cs
- ProfileManager.cs
- ReadOnlyAttribute.cs
- ObjectStorage.cs
- OleCmdHelper.cs
- SQLDecimal.cs
- PlainXmlSerializer.cs
- SignedInfo.cs
- SqlCharStream.cs
- RefExpr.cs
- Column.cs
- ScriptReferenceBase.cs
- EtwTrace.cs
- HitTestParameters3D.cs
- NegatedCellConstant.cs
- TextEditorMouse.cs
- EditorBrowsableAttribute.cs
- MailDefinition.cs
- WebServicesDescriptionAttribute.cs
- ModelTreeEnumerator.cs
- SqlProfileProvider.cs
- DataGridViewCellToolTipTextNeededEventArgs.cs
- ConstructorBuilder.cs
- ValueTable.cs
- CodeGeneratorOptions.cs
- ConnectionInterfaceCollection.cs
- ApplicationId.cs
- TableLayoutPanelDesigner.cs
- TableItemPatternIdentifiers.cs
- TdsParserSessionPool.cs
- DataSourceCacheDurationConverter.cs
- UndoEngine.cs
- CodeFieldReferenceExpression.cs
- InputLanguageProfileNotifySink.cs
- StringCollection.cs
- Frame.cs
- TextEditorThreadLocalStore.cs
- AsyncCodeActivity.cs
- ForwardPositionQuery.cs
- PromptBuilder.cs
- ReadOnlyMetadataCollection.cs
- TypedDataSourceCodeGenerator.cs
- ImportCatalogPart.cs
- StringCollection.cs
- IFormattable.cs
- ActivityDefaults.cs
- NativeWrapper.cs
- HttpRawResponse.cs
- ImageCodecInfo.cs
- AnnotationMap.cs
- InfoCardServiceInstallComponent.cs
- GridEntry.cs
- ExpressionCopier.cs
- SortKey.cs
- XmlNodeReader.cs
- HeaderUtility.cs
- ThemeDictionaryExtension.cs
- oledbconnectionstring.cs
- MenuAdapter.cs
- _FtpDataStream.cs
- Graph.cs
- XmlSchemaAppInfo.cs
- FieldTemplateUserControl.cs
- XpsSerializationManager.cs
- DebuggerAttributes.cs
- StartUpEventArgs.cs
- GridLength.cs
- ExtensionSimplifierMarkupObject.cs
- DesignUtil.cs
- WindowsGraphicsCacheManager.cs
- BulletedListEventArgs.cs
- PrtCap_Public_Simple.cs
- XmlSchemaDatatype.cs
- HtmlInputControl.cs
- SqlUserDefinedTypeAttribute.cs
- AnchorEditor.cs
- SQLResource.cs
- Point4DConverter.cs
- PreviewKeyDownEventArgs.cs
- WebWorkflowRole.cs
- PathTooLongException.cs
- RoleGroupCollection.cs
- ModelItemExtensions.cs