ConcatQueryOperator.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / ConcatQueryOperator.cs / 1305376 / ConcatQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// ConcatQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// Concatenates one data source with another.  Order preservation is used to ensure
    /// the output is actually a concatenation -- i.e. one after the other.  The only
    /// special synchronization required is to find the largest index N in the first data 
    /// source so that the indices of elements in the second data source can be offset
    /// by adding N+1.  This makes it appear to the order preservation infrastructure as 
    /// though all elements in the second came after all elements in the first, which is 
    /// precisely what we want.
    ///  
    /// 
    internal sealed class ConcatQueryOperator : BinaryQueryOperator
    {
 
        private readonly bool m_prematureMergeLeft = false; // Whether to prematurely merge the left data source
        private readonly bool m_prematureMergeRight = false; // Whether to prematurely merge the right data source 
 
        //----------------------------------------------------------------------------------------
        // Initializes a new concatenation operator. 
        //
        // Arguments:
        //     child                - the child whose data we will reverse
        // 

        internal ConcatQueryOperator(ParallelQuery firstChild, ParallelQuery secondChild) 
            : base(firstChild, secondChild) 
        {
            Contract.Assert(firstChild != null, "first child data source cannot be null"); 
            Contract.Assert(secondChild != null, "second child data source cannot be null");
            m_outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;

            m_prematureMergeLeft = LeftChild.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing); 
            m_prematureMergeRight = RightChild.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
 
            if ((LeftChild.OrdinalIndexState == OrdinalIndexState.Indexible) 
                && (RightChild.OrdinalIndexState == OrdinalIndexState.Indexible))
            { 
                SetOrdinalIndex(OrdinalIndexState.Indexible);
            }
            else
            { 
                SetOrdinalIndex(OrdinalIndexState.Shuffled);
            } 
        } 

        //--------------------------------------------------------------------------------------- 
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //
 
        internal override QueryResults Open(QuerySettings settings, bool preferStriping)
        { 
            // We just open the children operators. 
            QueryResults leftChildResults = LeftChild.Open(settings, preferStriping);
            QueryResults rightChildResults = RightChild.Open(settings, preferStriping); 

            return ConcatQueryOperatorResults.NewResults(leftChildResults, rightChildResults, this, settings, preferStriping);
        }
 
        public override void WrapPartitionedStream(
            PartitionedStream leftStream, PartitionedStream rightStream, 
            IPartitionedStreamRecipient outputRecipient, bool preferStriping, QuerySettings settings) 
        {
            OrdinalIndexState leftChildIndexState = leftStream.OrdinalIndexState; 
            int partitionCount = leftStream.PartitionCount;

            PartitionedStream leftStreamInc;
            PartitionedStream rightStreamInc; 

            // Prematurely merge the left results, if necessary 
            if (m_prematureMergeLeft) 
            {
                ListQueryResults leftStreamResults = 
                    ExecuteAndCollectResults(leftStream, partitionCount, LeftChild.OutputOrdered, preferStriping, settings);
                leftStreamInc =  leftStreamResults.GetPartitionedStream();
            }
            else 
            {
                Contract.Assert(!ExchangeUtilities.IsWorseThan(leftStream.OrdinalIndexState, OrdinalIndexState.Increasing)); 
                leftStreamInc = (PartitionedStream)(object)leftStream; 
            }
 
            // Prematurely merge the right results, if necessary
            if (m_prematureMergeRight)
            {
                ListQueryResults rightStreamResults = 
                    ExecuteAndCollectResults(rightStream, partitionCount, LeftChild.OutputOrdered, preferStriping, settings);
                rightStreamInc =  rightStreamResults.GetPartitionedStream(); 
            } 
            else
            { 
                Contract.Assert(!ExchangeUtilities.IsWorseThan(rightStream.OrdinalIndexState, OrdinalIndexState.Increasing));
                rightStreamInc = (PartitionedStream)(object)rightStream;
            }
 
            // Generate the shared data.
            IComparer> comparer = ConcatKey.MakeComparer( 
                leftStreamInc.KeyComparer, rightStreamInc.KeyComparer); 
            var outputStream = new PartitionedStream>(partitionCount, comparer, OrdinalIndexState);
 
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new ConcatQueryOperatorEnumerator(leftStreamInc[i], rightStreamInc[i]);
            } 

            outputRecipient.Receive(outputStream); 
        } 

 
        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
 
        internal override IEnumerable AsSequentialQuery(CancellationToken token)
        { 
            return LeftChild.AsSequentialQuery(token).Concat(RightChild.AsSequentialQuery(token)); 
        }
 

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        // 

        internal override bool LimitsParallelism 
        { 
            get
            { 
                return m_prematureMergeLeft || m_prematureMergeLeft;
            }
        }
 
        //----------------------------------------------------------------------------------------
        // The enumerator type responsible for concatenating two data sources. 
        // 

        class ConcatQueryOperatorEnumerator : QueryOperatorEnumerator> 
        {

            private QueryOperatorEnumerator m_firstSource; // The first data source to enumerate.
            private QueryOperatorEnumerator m_secondSource; // The second data source to enumerate. 
            private bool m_begunSecond; // Whether this partition has begun enumerating the second source yet.
 
            //--------------------------------------------------------------------------------------- 
            // Instantiates a new select enumerator.
            // 

            internal ConcatQueryOperatorEnumerator(
                QueryOperatorEnumerator firstSource,
                QueryOperatorEnumerator secondSource) 
            {
                Contract.Assert(firstSource != null); 
                Contract.Assert(secondSource != null); 

                m_firstSource = firstSource; 
                m_secondSource = secondSource;
            }

            //---------------------------------------------------------------------------------------- 
            // MoveNext advances to the next element in the output.  While the first data source has
            // elements, this consists of just advancing through it.  After this, all partitions must 
            // synchronize at a barrier and publish the maximum index N.  Finally, all partitions can 
            // move on to the second data source, adding N+1 to indices in order to get the correct
            // index offset. 
            //

            internal override bool MoveNext(ref TSource currentElement, ref ConcatKey currentKey)
            { 
                Contract.Assert(m_firstSource != null);
                Contract.Assert(m_secondSource != null); 
 
                // If we are still enumerating the first source, fetch the next item.
                if (!m_begunSecond) 
                {
                    // If elements remain, just return true and continue enumerating the left.
                    TLeftKey leftKey = default(TLeftKey);
                    if (m_firstSource.MoveNext(ref currentElement, ref leftKey)) 
                    {
                        currentKey = ConcatKey.MakeLeft(leftKey); 
                        return true; 
                    }
                    m_begunSecond = true; 
                }

                // Now either move on to, or continue, enumerating the right data source.
                TRightKey rightKey = default(TRightKey); 
                if (m_secondSource.MoveNext(ref currentElement, ref rightKey))
                { 
                    currentKey = ConcatKey.MakeRight(rightKey); 
                    return true;
                } 

                return false;
            }
 
            protected override void Dispose(bool disposing)
            { 
                m_firstSource.Dispose(); 
                m_secondSource.Dispose();
            } 
        }


        //------------------------------------------------------------------------------------ 
        // Query results for a Concat operator. The results are indexible if the child
        // results were indexible. 
        // 

        class ConcatQueryOperatorResults : BinaryQueryOperatorResults 
        {
            ConcatQueryOperator m_concatOp; // Operator that generated the results
            int m_leftChildCount; // The number of elements in the left child result set
            int m_rightChildCount; // The number of elements in the right child result set 

            public static QueryResults NewResults( 
                QueryResults leftChildQueryResults, QueryResults rightChildQueryResults, 
                ConcatQueryOperator op, QuerySettings settings,
                bool preferStriping) 
            {
                if (leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible)
                {
                    return new ConcatQueryOperatorResults( 
                        leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
                } 
                else 
                {
                    return new BinaryQueryOperatorResults( 
                        leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
                }
            }
 
            private ConcatQueryOperatorResults(
                QueryResults leftChildQueryResults, QueryResults rightChildQueryResults, 
                ConcatQueryOperator concatOp, QuerySettings settings, 
                bool preferStriping)
                : base(leftChildQueryResults, rightChildQueryResults, concatOp, settings, preferStriping) 
            {
                m_concatOp = concatOp;
                Contract.Assert(leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible);
 
                m_leftChildCount = leftChildQueryResults.ElementsCount;
                m_rightChildCount = rightChildQueryResults.ElementsCount; 
            } 

            internal override bool IsIndexible 
            {
                get { return true; }
            }
 
            internal override int ElementsCount
            { 
                get 
                {
                    Contract.Assert(m_leftChildCount >= 0 && m_rightChildCount >= 0); 
                    return m_leftChildCount + m_rightChildCount;
                }
            }
 
            internal override TSource GetElement(int index)
            { 
                if (index < m_leftChildCount) 
                {
                    return m_leftChildQueryResults.GetElement(index); 
                }
                else
                {
                    return m_rightChildQueryResults.GetElement(index - m_leftChildCount); 
                }
            } 
        } 

    } 

    //---------------------------------------------------------------------------------------
    // ConcatKey represents an ordering key for the Concat operator. It knows whether the
    // element it is associated with is from the left source or the right source, and also 
    // the elements ordering key.
    // 
 
    internal struct ConcatKey
    { 
        private readonly TLeftKey m_leftKey;
        private readonly TRightKey m_rightKey;
        private readonly bool m_isLeft;
 
        private ConcatKey(TLeftKey leftKey, TRightKey rightKey, bool isLeft)
        { 
            m_leftKey = leftKey; 
            m_rightKey = rightKey;
            m_isLeft = isLeft; 
        }

        internal static ConcatKey MakeLeft(TLeftKey leftKey)
        { 
            return new ConcatKey(leftKey, default(TRightKey), true);
        } 
 
        internal static ConcatKey MakeRight(TRightKey rightKey)
        { 
            return new ConcatKey(default(TLeftKey), rightKey, false);
        }

        internal static IComparer> MakeComparer( 
            IComparer leftComparer, IComparer rightComparer)
        { 
            return new ConcatKeyComparer(leftComparer, rightComparer); 
        }
 
        //----------------------------------------------------------------------------------------
        // ConcatKeyComparer compares ConcatKeys, so that elements from the left source come
        // before elements from the right source, and elements within each source are ordered
        // according to the corresponding order key. 
        //
 
        private class ConcatKeyComparer : IComparer> 
        {
            private IComparer m_leftComparer; 
            private IComparer m_rightComparer;

            internal ConcatKeyComparer(IComparer leftComparer, IComparer rightComparer)
            { 
                m_leftComparer = leftComparer;
                m_rightComparer = rightComparer; 
            } 

            public int Compare(ConcatKey x, ConcatKey y) 
            {
                // If one element is from the left source and the other not, the element from the left source
                // comes earlier.
                if (x.m_isLeft != y.m_isLeft) 
                {
                    return x.m_isLeft ? -1 : 1; 
                } 

                // Elements are from the same source (left or right). Compare the corresponding keys. 
                if (x.m_isLeft)
                {
                    return m_leftComparer.Compare(x.m_leftKey, y.m_leftKey);
                } 
                return m_rightComparer.Compare(x.m_rightKey, y.m_rightKey);
            } 
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK