// SelectManyQueryOperator.cs -- source code in C# (.NET)
//
// Source code for the .NET Framework in C#.
//
// Code:
//
// 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / SelectManyQueryOperator.cs / 1305376 / SelectManyQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// SelectManyQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// <summary>
    /// SelectMany is effectively a nested loops join. It is given two data sources, an
    /// outer and an inner -- the inner is calculated by invoking a selector function for
    /// each outer element -- and we walk the outer, walking the entire inner enumerator
    /// for each outer element. There is an optional result selector function which can
    /// transform the output before yielding it as a result element.
    ///
    /// Notes:
    ///     Although select many conceptually combines two sequences, it appears to the
    ///     query analysis infrastructure as a unary operator. That's because it works a
    ///     little differently than the binary operators: it has to re-open the right
    ///     child every time an outer element is walked. The right child is NOT partitioned.
    /// </summary>
    /// <typeparam name="TLeftInput">Element type of the outer (left) data source.</typeparam>
    /// <typeparam name="TRightInput">Element type of the inner (right) sequences.</typeparam>
    /// <typeparam name="TOutput">Element type produced by the operator.</typeparam>
    internal sealed class SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> : UnaryQueryOperator<TLeftInput, TOutput>
    {

        // Exactly one of the two child selectors is non-null (asserted in the constructor).
        private readonly Func<TLeftInput, IEnumerable<TRightInput>> m_rightChildSelector; // To select a new child each iteration.
        private readonly Func<TLeftInput, int, IEnumerable<TRightInput>> m_indexedRightChildSelector; // Same, but also receives the left element's index.
        private readonly Func<TLeftInput, TRightInput, TOutput> m_resultSelector; // An optional result selection function.
        private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator.

        //----------------------------------------------------------------------------------------
        // Initializes a new select-many operator.
        //
        // Arguments:
        //    leftChild                 - the left data source from which to pull data.
        //    rightChildSelector        - generates a new right child for every left element.
        //    indexedRightChildSelector - like rightChildSelector, but is also passed the index
        //                                of the left element. Exactly one of the two selectors
        //                                must be supplied.
        //    resultSelector            - an optional selection function for creating output
        //                                elements. May be null only when TRightInput and
        //                                TOutput are the same type.
        //

        internal SelectManyQueryOperator(IEnumerable<TLeftInput> leftChild,
                                         Func<TLeftInput, IEnumerable<TRightInput>> rightChildSelector,
                                         Func<TLeftInput, int, IEnumerable<TRightInput>> indexedRightChildSelector,
                                         Func<TLeftInput, TRightInput, TOutput> resultSelector)
            : base(leftChild)
        {
            Contract.Assert(leftChild != null, "left child data source cannot be null");
            Contract.Assert(rightChildSelector != null || indexedRightChildSelector != null,
                            "either right child data or selector must be supplied");
            Contract.Assert(rightChildSelector == null || indexedRightChildSelector == null,
                            "either indexed- or non-indexed child selector must be supplied (not both)");
            Contract.Assert(typeof(TRightInput) == typeof(TOutput) || resultSelector != null,
                            "right input and output must be the same types, otherwise the result selector may not be null");

            m_rightChildSelector = rightChildSelector;
            m_indexedRightChildSelector = indexedRightChildSelector;
            m_resultSelector = resultSelector;

            // If the SelectMany is indexed, elements must be returned in the order in which
            // indices were assigned.
            m_outputOrdered = Child.OutputOrdered || indexedRightChildSelector != null;

            InitOrderIndex();
        }

        //----------------------------------------------------------------------------------------
        // Decides whether the child's output has to be merged (and thereby re-indexed) before
        // this operator can run, and publishes this operator's own ordinal index state.
        //

        private void InitOrderIndex()
        {
            if (m_indexedRightChildSelector != null)
            {
                // If this is an indexed SelectMany, we need the order keys to be Correct, so that we can pass them
                // into the user delegate.
                m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Correct);
            }
            else if (OutputOrdered)
            {
                // If the output of this SelectMany is ordered, the input keys must be at least increasing. The
                // SelectMany algorithm assumes that there will be no duplicate order keys, so if the order keys
                // are Shuffled, we need to merge prematurely.
                m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Increasing);
            }

            // This operator always reports its own index state as Shuffled.
            SetOrdinalIndexState(OrdinalIndexState.Shuffled);
        }

        //----------------------------------------------------------------------------------------
        // Wraps every partition of the input stream with a select-many enumerator and hands the
        // resulting stream to the recipient. When a premature merge is required, the input is
        // first executed and collected, which yields a stream keyed by int.
        //

        internal override void WrapPartitionedStream<TLeftKey>(
            PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;

            if (m_indexedRightChildSelector != null)
            {
                PartitionedStream<TLeftInput, int> inputStreamInt;

                // If the index is not correct, we need to reindex.
                if (m_prematureMerge)
                {
                    ListQueryResults<TLeftInput> listResults =
                        QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings);
                    inputStreamInt = listResults.GetPartitionedStream();
                }
                else
                {
                    // NOTE(review): this cast relies on TLeftKey being int whenever the child's
                    // index state is Correct -- confirm against PartitionedStream producers.
                    inputStreamInt = (PartitionedStream<TLeftInput, int>)(object)inputStream;
                }
                WrapPartitionedStreamIndexed(inputStreamInt, recipient, settings);
                return;
            }

            // Non-indexed variant: either re-index through a premature merge (int keys), or
            // pass the input stream through with its original key type.
            if (m_prematureMerge)
            {
                PartitionedStream<TLeftInput, int> inputStreamInt =
                    QueryOperator<TLeftInput>.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings)
                    .GetPartitionedStream();
                WrapPartitionedStreamNotIndexed(inputStreamInt, recipient, settings);
            }
            else
            {
                WrapPartitionedStreamNotIndexed(inputStream, recipient, settings);
            }
        }

        /// <summary>
        /// A helper method for WrapPartitionedStream. We use the helper to reuse a block of code twice, but with
        /// a different order key type. (If premature merge occurred, the order key type will be "int". Otherwise,
        /// it will be the same type as "TLeftKey" in WrapPartitionedStream.)
        /// </summary>
        private void WrapPartitionedStreamNotIndexed<TLeftKey>(
            PartitionedStream<TLeftInput, TLeftKey> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;

            // Output keys are (left key, position within the right sequence) pairs.
            var keyComparer = new PairComparer<TLeftKey, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
            var outputStream = new PartitionedStream<TOutput, Pair<TLeftKey, int>>(partitionCount, keyComparer, OrdinalIndexState);
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new SelectManyQueryOperatorEnumerator<TLeftKey>(
                    inputStream[i], this, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        /// <summary>
        /// Similar helper method to WrapPartitionedStreamNotIndexed, except that this one is for the indexed variant
        /// of SelectMany (i.e., the SelectMany that passes indices into the user sequence-generating delegate)
        /// </summary>
        private void WrapPartitionedStreamIndexed(
            PartitionedStream<TLeftInput, int> inputStream, IPartitionedStreamRecipient<TOutput> recipient, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;

            // Output keys are (left index, position within the right sequence) pairs.
            var keyComparer = new PairComparer<int, int>(inputStream.KeyComparer, Util.GetDefaultComparer<int>());
            var outputStream = new PartitionedStream<TOutput, Pair<int, int>>(partitionCount, keyComparer, OrdinalIndexState);

            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new IndexedSelectManyQueryOperatorEnumerator(
                    inputStream[i], this, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the left child and wrapping with a
        // partition if needed. The right child is not opened yet -- this is always done on demand
        // as the outer elements are enumerated.
        //

        internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
        {
            QueryResults<TLeftInput> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
        {
            if (m_rightChildSelector != null)
            {
                if (m_resultSelector != null)
                {
                    return CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_rightChildSelector, m_resultSelector);
                }

                // No result selector: TRightInput and TOutput are the same type (asserted in the
                // constructor), so the sequence is re-typed through object.
                return (IEnumerable<TOutput>)(object)(CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_rightChildSelector));
            }
            else
            {
                Contract.Assert(m_indexedRightChildSelector != null);
                if (m_resultSelector != null)
                {
                    return CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_indexedRightChildSelector, m_resultSelector);
                }

                return (IEnumerable<TOutput>)(object)(CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_indexedRightChildSelector));
            }
        }


        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return m_prematureMerge; }
        }

        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the indexed SelectMany logic, i.e. the
        // variant whose child selector also receives the index of the left element.
        //

        private sealed class IndexedSelectManyQueryOperatorEnumerator : QueryOperatorEnumerator<TOutput, Pair<int, int>>
        {
            private readonly QueryOperatorEnumerator<TLeftInput, int> m_leftSource; // The left data source to enumerate.
            private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
            private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
            private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
            private Mutables m_mutables; // bag of frequently mutated value types [allocated lazily in MoveNext; the original author's note cites false-sharing avoidance]
            private readonly CancellationToken m_cancellationToken;

            private class Mutables
            {
                internal int m_currentRightSourceIndex = -1; // The index for the right data source.
                internal TLeftInput m_currentLeftElement; // The current element in the left data source.
                internal int m_currentLeftSourceIndex; // The current key in the left data source.
                internal int m_lhsCount; // Counts the number of lhs elements enumerated. Used for cancellation testing.
            }


            //---------------------------------------------------------------------------------------
            // Instantiates a new select-many enumerator. Notice that the right data source is an
            // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
            // data source.
            //

            internal IndexedSelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, int> leftSource,
                                                              SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                              CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(selectManyOperator != null);

                m_leftSource = leftSource;
                m_selectManyOperator = selectManyOperator;
                m_cancellationToken = cancellationToken;
            }

            //----------------------------------------------------------------------------------------
            // Advances to the next output element. Returns false once the left source is exhausted.
            // The output key is the pair (left index, position within the current right sequence).
            //

            internal override bool MoveNext(ref TOutput currentElement, ref Pair<int, int> currentKey)
            {
                while (true)
                {
                    if (m_currentRightSource == null)
                    {
                        // Allocate the mutable state once. Re-allocating it for every left element
                        // would reset m_lhsCount and defeat the poll-interval throttle below.
                        if (m_mutables == null)
                        {
                            m_mutables = new Mutables();
                        }

                        // Check cancellation every few lhs-enumerations in case none of them are producing
                        // any outputs.  Otherwise, we rely on the consumer of this operator to be performing the checks.
                        if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // We don't have a "current" right enumerator to use. We have to fetch the next
                        // one. If the left has run out of elements, however, we're done and just return
                        // false right away.
                        if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftSourceIndex))
                        {
                            return false;
                        }

                        // Positions within the right sequence restart for every left element.
                        m_mutables.m_currentRightSourceIndex = -1;

                        // Use the source selection routine to create a right child.
                        IEnumerable<TRightInput> rightChild =
                            m_selectManyOperator.m_indexedRightChildSelector(m_mutables.m_currentLeftElement, m_mutables.m_currentLeftSourceIndex);

                        Contract.Assert(rightChild != null);
                        m_currentRightSource = rightChild.GetEnumerator();

                        Contract.Assert(m_currentRightSource != null);

                        // If we have no result selector, we will need to access the Current element of the right
                        // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                        // equal TOutput (we check it during operator construction), but the type system doesn't.
                        // Thus we would have to cast the result of invoking Current from type TRightInput to
                        // TOutput. This is no good, since the results could be value types. Instead, we save the
                        // enumerator object as an IEnumerator<TOutput> and access that later on.
                        if (m_selectManyOperator.m_resultSelector == null)
                        {
                            m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                            Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                            "these must be equal, otherwise the surrounding logic will be broken");
                        }
                    }

                    if (m_currentRightSource.MoveNext())
                    {
                        m_mutables.m_currentRightSourceIndex++;

                        // If the inner data source has an element, we can yield it.
                        if (m_selectManyOperator.m_resultSelector != null)
                        {
                            // In the case of a selection function, use that to yield the next element.
                            currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                        }
                        else
                        {
                            // Otherwise, the right input and output types must be the same. We use the
                            // casted copy of the current right source and just return its current element.
                            Contract.Assert(m_currentRightSourceAsOutput != null);
                            currentElement = m_currentRightSourceAsOutput.Current;
                        }
                        currentKey = new Pair<int, int>(m_mutables.m_currentLeftSourceIndex, m_mutables.m_currentRightSourceIndex);

                        return true;
                    }
                    else
                    {
                        // Otherwise, we have exhausted the right data source. Loop back around and try
                        // to get the next left element, then its right, and so on.
                        m_currentRightSource.Dispose();
                        m_currentRightSource = null;
                        m_currentRightSourceAsOutput = null;
                    }
                }
            }

            //----------------------------------------------------------------------------------------
            // Disposes the left source and any right enumerator that is still open.
            //

            protected override void Dispose(bool disposing)
            {
                try
                {
                    m_leftSource.Dispose();
                }
                finally
                {
                    // Release the right enumerator even if disposing the left source throws.
                    if (m_currentRightSource != null)
                    {
                        m_currentRightSource.Dispose();
                        m_currentRightSource = null;
                        m_currentRightSourceAsOutput = null;
                    }
                }
            }
        }

        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the non-indexed SelectMany logic. It is
        // generic in the order key type of the left source.
        //

        private sealed class SelectManyQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TOutput, Pair<TLeftKey, int>>
        {
            private readonly QueryOperatorEnumerator<TLeftInput, TLeftKey> m_leftSource; // The left data source to enumerate.
            private readonly SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> m_selectManyOperator; // The select many operator to use.
            private IEnumerator<TRightInput> m_currentRightSource; // The current enumerator we're using.
            private IEnumerator<TOutput> m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
            private Mutables m_mutables; // bag of frequently mutated value types [allocated lazily in MoveNext; the original author's note cites false-sharing avoidance]
            private readonly CancellationToken m_cancellationToken;

            private class Mutables
            {
                internal int m_currentRightSourceIndex = -1; // The index for the right data source.
                internal TLeftInput m_currentLeftElement; // The current element in the left data source.
                internal TLeftKey m_currentLeftKey; // The current key in the left data source.
                internal int m_lhsCount; // Counts the number of lhs elements enumerated. Used for cancellation testing.
            }


            //---------------------------------------------------------------------------------------
            // Instantiates a new select-many enumerator. Notice that the right data source is an
            // enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
            // data source.
            //

            internal SelectManyQueryOperatorEnumerator(QueryOperatorEnumerator<TLeftInput, TLeftKey> leftSource,
                                                       SelectManyQueryOperator<TLeftInput, TRightInput, TOutput> selectManyOperator,
                                                       CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(selectManyOperator != null);

                m_leftSource = leftSource;
                m_selectManyOperator = selectManyOperator;
                m_cancellationToken = cancellationToken;
            }

            //----------------------------------------------------------------------------------------
            // Advances to the next output element. Returns false once the left source is exhausted.
            // The output key is the pair (left key, position within the current right sequence).
            //

            internal override bool MoveNext(ref TOutput currentElement, ref Pair<TLeftKey, int> currentKey)
            {
                while (true)
                {
                    if (m_currentRightSource == null)
                    {
                        // Allocate the mutable state once. Re-allocating it for every left element
                        // would reset m_lhsCount and defeat the poll-interval throttle below.
                        if (m_mutables == null)
                        {
                            m_mutables = new Mutables();
                        }

                        // Check cancellation every few lhs-enumerations in case none of them are producing
                        // any outputs.  Otherwise, we rely on the consumer of this operator to be performing the checks.
                        if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // We don't have a "current" right enumerator to use. We have to fetch the next
                        // one. If the left has run out of elements, however, we're done and just return
                        // false right away.
                        if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftKey))
                        {
                            return false;
                        }

                        // Positions within the right sequence restart for every left element.
                        m_mutables.m_currentRightSourceIndex = -1;

                        // Use the source selection routine to create a right child.
                        IEnumerable<TRightInput> rightChild = m_selectManyOperator.m_rightChildSelector(m_mutables.m_currentLeftElement);

                        Contract.Assert(rightChild != null);
                        m_currentRightSource = rightChild.GetEnumerator();

                        Contract.Assert(m_currentRightSource != null);

                        // If we have no result selector, we will need to access the Current element of the right
                        // data source as though it is a TOutput. Unfortunately, we know that TRightInput must
                        // equal TOutput (we check it during operator construction), but the type system doesn't.
                        // Thus we would have to cast the result of invoking Current from type TRightInput to
                        // TOutput. This is no good, since the results could be value types. Instead, we save the
                        // enumerator object as an IEnumerator<TOutput> and access that later on.
                        if (m_selectManyOperator.m_resultSelector == null)
                        {
                            m_currentRightSourceAsOutput = (IEnumerator<TOutput>)(object)m_currentRightSource;
                            Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
                                            "these must be equal, otherwise the surrounding logic will be broken");
                        }
                    }

                    if (m_currentRightSource.MoveNext())
                    {
                        m_mutables.m_currentRightSourceIndex++;

                        // If the inner data source has an element, we can yield it.
                        if (m_selectManyOperator.m_resultSelector != null)
                        {
                            // In the case of a selection function, use that to yield the next element.
                            currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
                        }
                        else
                        {
                            // Otherwise, the right input and output types must be the same. We use the
                            // casted copy of the current right source and just return its current element.
                            Contract.Assert(m_currentRightSourceAsOutput != null);
                            currentElement = m_currentRightSourceAsOutput.Current;
                        }
                        currentKey = new Pair<TLeftKey, int>(m_mutables.m_currentLeftKey, m_mutables.m_currentRightSourceIndex);

                        return true;
                    }
                    else
                    {
                        // Otherwise, we have exhausted the right data source. Loop back around and try
                        // to get the next left element, then its right, and so on.
                        m_currentRightSource.Dispose();
                        m_currentRightSource = null;
                        m_currentRightSourceAsOutput = null;
                    }
                }
            }

            //----------------------------------------------------------------------------------------
            // Disposes the left source and any right enumerator that is still open.
            //

            protected override void Dispose(bool disposing)
            {
                try
                {
                    m_leftSource.Dispose();
                }
                finally
                {
                    // Release the right enumerator even if disposing the left source throws.
                    if (m_currentRightSource != null)
                    {
                        m_currentRightSource.Dispose();
                        m_currentRightSource = null;
                        m_currentRightSourceAsOutput = null;
                    }
                }
            }
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

// Link Menu
//
// Network Programming in C#, Network Programming in VB.NET, Network Programming in .NET
// This book is available now!
// Buy at Amazon US or
// Buy at Amazon UK