// WhereQueryOperator.cs source code in C# .NET
//
// Source code for the .NET Framework in C#
//
// Code:
//
// 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / WhereQueryOperator.cs / 1305376 / WhereQueryOperator.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// WhereQueryOperator.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{
    /// <summary>
    /// The operator type for Where statements. This operator filters out elements that
    /// don't match a filter function (supplied at instantiation time).
    /// </summary>
    /// <typeparam name="TInputOutput">
    /// The element type. Filtering never changes the element type, so the same type is
    /// used for both the input and the output of this operator.
    /// </typeparam>
    internal sealed class WhereQueryOperator<TInputOutput> : UnaryQueryOperator<TInputOutput, TInputOutput>
    {

        // Predicate function. Used to filter out non-matching elements during execution.
        // Assigned once in the constructor and never changed afterwards.
        private readonly Func<TInputOutput, bool> m_predicate;

        //----------------------------------------------------------------------------------------
        // Initializes a new where operator.
        //
        // Arguments:
        //    child         - the child operator or data source from which to pull data
        //    predicate     - a delegate representing the predicate function
        //
        // Assumptions:
        //    child and predicate must be non null (checked with Contract.Assert only).
        //

        internal WhereQueryOperator(IEnumerable<TInputOutput> child, Func<TInputOutput, bool> predicate)
            : base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");
            Contract.Assert(predicate != null, "need a filter function");

            // The output index state is the worse of the child's state and "Increasing":
            // this operator never reports anything better than Increasing.
            SetOrdinalIndexState(
                ExchangeUtilities.Worse(Child.OrdinalIndexState, OrdinalIndexState.Increasing));

            m_predicate = predicate;
        }

        //---------------------------------------------------------------------------------------
        // Wraps every partition of the input stream in a filtering enumerator and hands the
        // resulting stream (same partition count and key comparer) to the recipient.
        //
        // Arguments:
        //    inputStream    - the partitioned stream produced by the child
        //    recipient      - receives the wrapped output stream
        //    preferStriping - not used by this operator
        //    settings       - supplies the merged cancellation token for the enumerators
        //

        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TInputOutput, TKey> inputStream, IPartitionedStreamRecipient<TInputOutput> recipient, bool preferStriping, QuerySettings settings)
        {
            PartitionedStream<TInputOutput, TKey> outputStream = new PartitionedStream<TInputOutput, TKey>(
                inputStream.PartitionCount, inputStream.KeyComparer, OrdinalIndexState);
            for (int i = 0; i < inputStream.PartitionCount; i++)
            {
                outputStream[i] = new WhereQueryOperatorEnumerator<TKey>(inputStream[i], m_predicate,
                    settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //

        internal override QueryResults<TInputOutput> Open(QuerySettings settings, bool preferStriping)
        {
            // We just open the child operator.
            QueryResults<TInputOutput> childQueryResults = Child.Open(settings, preferStriping);

            // And then return the query results
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        // The child's sequential query is wrapped with the cancellation token before the
        // ordinary LINQ-to-Objects Where is applied with the same predicate.
        //

        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);
            return wrappedChild.Where(m_predicate);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge. Always false for Where.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //------------------------------------------------------------------------------------
        // An enumerator that implements the filtering logic. One instance is created per
        // input partition; TKey is the key type of the partitioned stream being wrapped.
        //

        private class WhereQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TInputOutput, TKey>
        {

            private readonly QueryOperatorEnumerator<TInputOutput, TKey> m_source; // The data source to enumerate.
            private readonly Func<TInputOutput, bool> m_predicate; // The predicate used for filtering.
            private readonly CancellationToken m_cancellationToken; // Polled periodically while scanning.
            private Shared<int> m_outputLoopCount; // Elements pulled so far; allocated on first MoveNext.

            //-----------------------------------------------------------------------------------
            // Instantiates a new enumerator.
            //
            // Arguments:
            //    source            - the partition enumerator to pull elements from
            //    predicate         - the filter; only elements for which it returns true are yielded
            //    cancellationToken - token checked periodically during enumeration
            //

            internal WhereQueryOperatorEnumerator(QueryOperatorEnumerator<TInputOutput, TKey> source, Func<TInputOutput, bool> predicate,
                CancellationToken cancellationToken)
            {
                Contract.Assert(source != null);
                Contract.Assert(predicate != null);

                m_source = source;
                m_predicate = predicate;
                m_cancellationToken = cancellationToken;
            }

            //------------------------------------------------------------------------------------
            // Moves to the next matching element in the underlying data stream.
            //
            // Returns true with currentElement/currentKey set to the next element that
            // satisfies the predicate, or false once the source is exhausted.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey)
            {
                Contract.Assert(m_predicate != null, "expected a compiled operator");

                // Iterate through the input until we reach the end of the sequence or find
                // an element matching the predicate.

                if (m_outputLoopCount == null)
                {
                    m_outputLoopCount = new Shared<int>(0);
                }

                while (m_source.MoveNext(ref currentElement, ref currentKey))
                {
                    // Only check for cancellation when the masked counter is zero, so the
                    // token is not consulted on every single element.
                    if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    }

                    if (m_predicate(currentElement))
                    {
                        return true;
                    }
                }
                return false;
            }

            //------------------------------------------------------------------------------------
            // Disposes the wrapped source enumerator.
            //

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_source != null);
                m_source.Dispose();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

// Link Menu
//
// Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
// This book is available now!
// Buy at Amazon US or
// Buy at Amazon UK