ExchangeUtilities.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / Utils / ExchangeUtilities.cs / 1305376 / ExchangeUtilities.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// ExchangeUtilities.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// ExchangeUtilities is a static class that contains helper functions to partition and merge
    /// streams.
    ///  
    internal static class ExchangeUtilities
    { 
        //------------------------------------------------------------------------------------ 
        // A factory method to construct a partitioned stream over a data source.
        // 
        // Arguments:
        //    source                      - the data source to be partitioned
        //    partitionCount              - the number of partitions desired
        //    useOrdinalOrderPreservation - whether ordinal position must be tracked 
        //    useStriping                 - whether striped partitioning should be used instead of range partitioning
        // 
 
        internal static PartitionedStream PartitionDataSource(IEnumerable source, int partitionCount, bool useStriping)
        { 
            // The partitioned stream to return.
            PartitionedStream returnValue;

            IParallelPartitionable sourceAsPartitionable = source as IParallelPartitionable; 
            if (sourceAsPartitionable != null)
            { 
                // The type overrides the partitioning algorithm, so we will use it instead of the default. 
                // The returned enumerator must be the same size that we requested, otherwise we throw.
                QueryOperatorEnumerator[] enumerators = sourceAsPartitionable.GetPartitions(partitionCount); 
                if (enumerators == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullReturn));
                } 
                else if (enumerators.Length != partitionCount)
                { 
                    throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_IncorretElementCount)); 
                }
 
                // Now just copy the enumerators into the stream, validating that the result is non-null.
                PartitionedStream stream =
                    new PartitionedStream(partitionCount, Util.GetDefaultComparer(), OrdinalIndexState.Correct);
                for (int i = 0; i < partitionCount; i++) 
                {
                    QueryOperatorEnumerator currentEnumerator = enumerators[i]; 
                    if (currentEnumerator == null) 
                    {
                        throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullElement)); 
                    }
                    stream[i] = currentEnumerator;
                }
 
                returnValue = stream;
            } 
            else 
            {
                returnValue = new PartitionedDataSource(source, partitionCount, useStriping); 
            }

            Contract.Assert(returnValue.PartitionCount == partitionCount);
 
            return returnValue;
        } 
 
        //-----------------------------------------------------------------------------------
        // Converts an enumerator or a partitioned stream into a hash-partitioned stream. In the resulting 
        // partitioning, all elements with the same hash code are guaranteed to be in the same partition.
        //
        // Arguments:
        //    source                      - the data to be hash-partitioned. If it is a partitioned stream, it 
        //                                  must have partitionCount partitions
        //    partitionCount              - the desired number of partitions 
        //    useOrdinalOrderPreservation - whether ordinal order preservation is required 
        //    keySelector                 - function to obtain the key given an element
        //    keyComparer                 - equality comparer for the keys 
        //

        internal static PartitionedStream, int> HashRepartition(
            PartitionedStream source, Func keySelector, IEqualityComparer keyComparer, 
            IEqualityComparer elementComparer, CancellationToken cancellationToken)
        { 
            TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator"); 
            return new UnorderedHashRepartitionStream(source, keySelector, keyComparer, elementComparer, cancellationToken);
        } 

        internal static PartitionedStream, TOrderKey> HashRepartitionOrdered(
            PartitionedStream source, Func keySelector, IEqualityComparer keyComparer,
            IEqualityComparer elementComparer, CancellationToken cancellationToken) 
        {
            TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator"); 
            return new OrderedHashRepartitionStream(source, keySelector, keyComparer, elementComparer, cancellationToken); 
        }
 
        //---------------------------------------------------------------------------------------
        // A helper method that given two OrdinalIndexState values return the "worse" one. For
        // example, if state1 is valid and state2 is increasing, we will return
        // OrdinalIndexState.Increasing. 
        //
 
        internal static OrdinalIndexState Worse(this OrdinalIndexState state1, OrdinalIndexState state2) 
        {
            return state1 > state2 ? state1 : state2; 
        }

        internal static bool IsWorseThan(this OrdinalIndexState state1, OrdinalIndexState state2)
        { 
            return state1 > state2;
        } 
    } 

    ///  
    /// Used during hash partitioning, when the keys being memoized are not used for anything.
    /// 
    internal struct NoKeyMemoizationRequired { }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// 
// ExchangeUtilities.cs 
//
// [....] 
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic; 
using System.Diagnostics.Contracts;
using System.Threading; 
 
namespace System.Linq.Parallel
{ 
    /// 
    /// ExchangeUtilities is a static class that contains helper functions to partition and merge
    /// streams.
    ///  
    internal static class ExchangeUtilities
    { 
        //------------------------------------------------------------------------------------ 
        // A factory method to construct a partitioned stream over a data source.
        // 
        // Arguments:
        //    source                      - the data source to be partitioned
        //    partitionCount              - the number of partitions desired
        //    useOrdinalOrderPreservation - whether ordinal position must be tracked 
        //    useStriping                 - whether striped partitioning should be used instead of range partitioning
        // 
 
        internal static PartitionedStream PartitionDataSource(IEnumerable source, int partitionCount, bool useStriping)
        { 
            // The partitioned stream to return.
            PartitionedStream returnValue;

            IParallelPartitionable sourceAsPartitionable = source as IParallelPartitionable; 
            if (sourceAsPartitionable != null)
            { 
                // The type overrides the partitioning algorithm, so we will use it instead of the default. 
                // The returned enumerator must be the same size that we requested, otherwise we throw.
                QueryOperatorEnumerator[] enumerators = sourceAsPartitionable.GetPartitions(partitionCount); 
                if (enumerators == null)
                {
                    throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullReturn));
                } 
                else if (enumerators.Length != partitionCount)
                { 
                    throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_IncorretElementCount)); 
                }
 
                // Now just copy the enumerators into the stream, validating that the result is non-null.
                PartitionedStream stream =
                    new PartitionedStream(partitionCount, Util.GetDefaultComparer(), OrdinalIndexState.Correct);
                for (int i = 0; i < partitionCount; i++) 
                {
                    QueryOperatorEnumerator currentEnumerator = enumerators[i]; 
                    if (currentEnumerator == null) 
                    {
                        throw new InvalidOperationException(SR.GetString(SR.ParallelPartitionable_NullElement)); 
                    }
                    stream[i] = currentEnumerator;
                }
 
                returnValue = stream;
            } 
            else 
            {
                returnValue = new PartitionedDataSource(source, partitionCount, useStriping); 
            }

            Contract.Assert(returnValue.PartitionCount == partitionCount);
 
            return returnValue;
        } 
 
        //-----------------------------------------------------------------------------------
        // Converts an enumerator or a partitioned stream into a hash-partitioned stream. In the resulting 
        // partitioning, all elements with the same hash code are guaranteed to be in the same partition.
        //
        // Arguments:
        //    source                      - the data to be hash-partitioned. If it is a partitioned stream, it 
        //                                  must have partitionCount partitions
        //    partitionCount              - the desired number of partitions 
        //    useOrdinalOrderPreservation - whether ordinal order preservation is required 
        //    keySelector                 - function to obtain the key given an element
        //    keyComparer                 - equality comparer for the keys 
        //

        internal static PartitionedStream, int> HashRepartition(
            PartitionedStream source, Func keySelector, IEqualityComparer keyComparer, 
            IEqualityComparer elementComparer, CancellationToken cancellationToken)
        { 
            TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator"); 
            return new UnorderedHashRepartitionStream(source, keySelector, keyComparer, elementComparer, cancellationToken);
        } 

        internal static PartitionedStream, TOrderKey> HashRepartitionOrdered(
            PartitionedStream source, Func keySelector, IEqualityComparer keyComparer,
            IEqualityComparer elementComparer, CancellationToken cancellationToken) 
        {
            TraceHelpers.TraceInfo("PartitionStream<..>.HashRepartitionStream(..):: creating **RE**partitioned stream for nested operator"); 
            return new OrderedHashRepartitionStream(source, keySelector, keyComparer, elementComparer, cancellationToken); 
        }
 
        //---------------------------------------------------------------------------------------
        // A helper method that given two OrdinalIndexState values return the "worse" one. For
        // example, if state1 is valid and state2 is increasing, we will return
        // OrdinalIndexState.Increasing. 
        //
 
        internal static OrdinalIndexState Worse(this OrdinalIndexState state1, OrdinalIndexState state2) 
        {
            return state1 > state2 ? state1 : state2; 
        }

        internal static bool IsWorseThan(this OrdinalIndexState state1, OrdinalIndexState state2)
        { 
            return state1 > state2;
        } 
    } 

    ///  
    /// Used during hash partitioning, when the keys being memoized are not used for anything.
    /// 
    internal struct NoKeyMemoizationRequired { }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK