Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / IntersectQueryOperator.cs / 1305376 / IntersectQueryOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IntersectQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Operator that yields the intersection of two data sources.
    /// </summary>
    /// <typeparam name="TInputOutput">The element type of both inputs and of the output.</typeparam>
    internal sealed class IntersectQueryOperator<TInputOutput> :
        BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
    {
        private readonly IEqualityComparer<TInputOutput> m_comparer; // An equality comparer.

        //---------------------------------------------------------------------------------------
        // Constructs a new intersection operator.
        //
        // The output is ordered exactly when the left child is ordered, and the ordinal
        // index state is always set to Shuffled.
        //

        internal IntersectQueryOperator(
            ParallelQuery<TInputOutput> left,
            ParallelQuery<TInputOutput> right,
            IEqualityComparer<TInputOutput> comparer)
            : base(left, right)
        {
            Contract.Assert(left != null && right != null, "child data sources cannot be null");

            m_comparer = comparer;
            m_outputOrdered = LeftChild.OutputOrdered;

            SetOrdinalIndex(OrdinalIndexState.Shuffled);
        }

        //---------------------------------------------------------------------------------------
        // Opens both child operators (left first, then right) and wraps their results.
        //

        internal override QueryResults<TInputOutput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.  Do not propagate the preferStriping value, but
            // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
            // partitioning, the output will be hash-partitioned.
            QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
            QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
        }

        //---------------------------------------------------------------------------------------
        // Hash-repartitions the left stream (using the order-preserving variant when the
        // output is ordered) and hands off to WrapPartitionedStreamHelper, which repartitions
        // the right stream and builds the per-partition enumerators.
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TInputOutput, TLeftKey> leftPartitionedStream,
            PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient,
            bool preferStriping,
            QuerySettings settings)
        {
            Contract.Assert(leftPartitionedStream.PartitionCount == rightPartitionedStream.PartitionCount);

            if (OutputOrdered)
            {
                WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken),
                    rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
            else
            {
                WrapPartitionedStreamHelper<int, TRightKey>(
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken),
                    rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
        // to be, and then call this method with that key as a generic parameter.
        //

        private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream,
            PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient,
            CancellationToken cancellationToken)
        {
            int partitionCount = leftHashStream.PartitionCount;

            // The right side is always repartitioned with the unordered variant.
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
                ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                    rightPartitionedStream, null, null, m_comparer, cancellationToken);

            PartitionedStream<TInputOutput, TLeftKey> outputStream =
                new PartitionedStream<TInputOutput, TLeftKey>(
                    partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);

            for (int i = 0; i < partitionCount; i++)
            {
                if (OutputOrdered)
                {
                    outputStream[i] = new OrderedIntersectQueryOperatorEnumerator<TLeftKey>(
                        leftHashStream[i], rightHashStream[i], m_comparer, leftHashStream.KeyComparer, cancellationToken);
                }
                else
                {
                    // The unordered enumerator produces int keys; the cast goes through object
                    // because the compiler cannot see that TLeftKey is int on this path.
                    outputStream[i] = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
                        new IntersectQueryOperatorEnumerator<TLeftKey>(
                            leftHashStream[i], rightHashStream[i], m_comparer, cancellationToken);
                }
            }

            outputRecipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator performs the intersection operation incrementally. It does this by
        // maintaining a history -- in the form of a set -- of all data already seen. It then
        // only returns elements that are seen twice (returning each one only once).
        //

        class IntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
        {
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> m_rightSource; // Right data source.
            private IEqualityComparer<TInputOutput> m_comparer; // Comparer to use for equality/hash-coding.
            private Set<TInputOutput> m_hashLookup; // The hash lookup, used to produce the intersection.
            private CancellationToken m_cancellationToken;
            private Shared<int> m_outputLoopCount; // Iteration counter used to pace cancellation polling.

            //---------------------------------------------------------------------------------------
            // Instantiates a new intersection enumerator.
            //

            internal IntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer,
                CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(rightSource != null);

                m_leftSource = leftSource;
                m_rightSource = rightSource;
                m_comparer = comparer;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, right (to build the set) and then left (to find
            // matches), to produce the intersection.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
            {
                Contract.Assert(m_leftSource != null);
                Contract.Assert(m_rightSource != null);

                // Build the set out of the right data source, if we haven't already.
                if (m_hashLookup == null)
                {
                    m_outputLoopCount = new Shared<int>(0);
                    m_hashLookup = new Set<TInputOutput>(m_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    int rightKeyUnused = default(int);

                    int i = 0;
                    while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        m_hashLookup.Add(rightElement.First);
                    }
                }

                // Now iterate over the left data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                TLeftKey keyUnused = default(TLeftKey);

                while (m_leftSource.MoveNext(ref leftElement, ref keyUnused))
                {
                    if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    }

                    // If we found the element in our set, and if we haven't returned it yet,
                    // we can yield it to the caller. We also mark it so we know we've returned
                    // it once already and never will again.
                    if (m_hashLookup.Contains(leftElement.First))
                    {
                        // Removing the element from the set is the "mark": later duplicates no longer match.
                        m_hashLookup.Remove(leftElement.First);
                        currentElement = leftElement.First;
#if DEBUG
                        // Output keys are not meaningful for the unordered case; use a recognizable sentinel in debug builds.
                        currentKey = unchecked((int)0xdeadbeef);
#endif
                        return true;
                    }
                }

                return false;
            }

            // Disposes both underlying source enumerators.
            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_leftSource != null && m_rightSource != null);
                m_leftSource.Dispose();
                m_rightSource.Dispose();
            }
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
            return wrappedLeftChild.Intersect(wrappedRightChild, m_comparer);
        }

        //---------------------------------------------------------------------------------------
        // Order-preserving variant of the intersection enumerator. It first records, for each
        // distinct left element, the smallest order key seen for it, and then yields an
        // element (with that key) the first time a matching right element is encountered.
        //

        class OrderedIntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
        {
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> m_leftSource; // Left data source.
            private QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> m_rightSource; // Right data source.
            private IEqualityComparer<Wrapper<TInputOutput>> m_comparer; // Comparer to use for equality/hash-coding.
            private IComparer<TLeftKey> m_leftKeyComparer; // Comparer to use to determine ordering of order keys.
            private Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>> m_hashLookup; // The hash lookup, used to produce the intersection.
            private CancellationToken m_cancellationToken;

            //---------------------------------------------------------------------------------------
            // Instantiates a new ordered intersection enumerator.
            //

            internal OrderedIntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer,
                IComparer<TLeftKey> leftKeyComparer,
                CancellationToken cancellationToken)
            {
                Contract.Assert(leftSource != null);
                Contract.Assert(rightSource != null);

                m_leftSource = leftSource;
                m_rightSource = rightSource;
                m_comparer = new WrapperEqualityComparer<TInputOutput>(comparer);
                m_leftKeyComparer = leftKeyComparer;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the intersection.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
            {
                Contract.Assert(m_leftSource != null);
                Contract.Assert(m_rightSource != null);

                // Build the set out of the left data source, if we haven't already.
                int i = 0;
                if (m_hashLookup == null)
                {
                    m_hashLookup = new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(m_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    TLeftKey leftKey = default(TLeftKey);
                    while (m_leftSource.MoveNext(ref leftElement, ref leftKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(m_cancellationToken);
                        }

                        // For each element, we track the smallest order key for that element that we saw so far
                        Pair<TInputOutput, TLeftKey> oldEntry;
                        Wrapper<TInputOutput> wrappedLeftElem = new Wrapper<TInputOutput>(leftElement.First);

                        // If this is the first occurrence of this element, or the order key is lower than all keys we saw previously,
                        // update the order key for this element.
                        if (!m_hashLookup.TryGetValue(wrappedLeftElem, out oldEntry) ||
                            m_leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
                        {
                            // For each "elem" value, we store the smallest key, and the element value that had that key.
                            // Note that even though two element values are "equal" according to the EqualityComparer,
                            // we still cannot choose arbitrarily which of the two to yield.
                            m_hashLookup[wrappedLeftElem] = new Pair<TInputOutput, TLeftKey>(leftElement.First, leftKey);
                        }
                    }
                }

                // Now iterate over the right data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                int rightKeyUnused = default(int);
                while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                {
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    }

                    // If we found the element in our set, and if we haven't returned it yet,
                    // we can yield it to the caller. We also mark it so we know we've returned
                    // it once already and never will again.
                    Pair<TInputOutput, TLeftKey> entry;
                    Wrapper<TInputOutput> wrappedRightElem = new Wrapper<TInputOutput>(rightElement.First);

                    if (m_hashLookup.TryGetValue(wrappedRightElem, out entry))
                    {
                        // Yield the stored left element and its order key, not the right element.
                        currentElement = entry.First;
                        currentKey = entry.Second;

                        // Removing the entry is the "mark": later duplicates on the right no longer match.
                        m_hashLookup.Remove(new Wrapper<TInputOutput>(entry.First));
                        return true;
                    }
                }

                return false;
            }

            // Disposes both underlying source enumerators.
            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_leftSource != null && m_rightSource != null);
                m_leftSource.Dispose();
                m_rightSource.Dispose();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- StoreContentChangedEventArgs.cs
- TransformerTypeCollection.cs
- LinqMaximalSubtreeNominator.cs
- BamlWriter.cs
- ListViewTableRow.cs
- Processor.cs
- ModelPerspective.cs
- BuildProvidersCompiler.cs
- HyperlinkAutomationPeer.cs
- MimeObjectFactory.cs
- EntityParameterCollection.cs
- QilLiteral.cs
- DoubleLinkList.cs
- SBCSCodePageEncoding.cs
- ProviderBase.cs
- ListBoxItem.cs
- DataSourceSelectArguments.cs
- RelationshipEndMember.cs
- OletxVolatileEnlistment.cs
- SHA384.cs
- TextRenderer.cs
- RelatedPropertyManager.cs
- ResourceSetExpression.cs
- AutoGeneratedField.cs
- SendSecurityHeaderElement.cs
- HTTPNotFoundHandler.cs
- RefType.cs
- AncestorChangedEventArgs.cs
- SplitterPanel.cs
- WindowsAuthenticationEventArgs.cs
- SqlGatherConsumedAliases.cs
- MessagePropertyAttribute.cs
- UIPropertyMetadata.cs
- CodeDOMProvider.cs
- XmlIgnoreAttribute.cs
- DesignerDataSchemaClass.cs
- StrokeSerializer.cs
- ListItemConverter.cs
- UrlParameterWriter.cs
- VersionedStreamOwner.cs
- StringValueSerializer.cs
- XmlBoundElement.cs
- Exceptions.cs
- GradientStop.cs
- ProtectedConfiguration.cs
- XmlUtilWriter.cs
- ObjectDataSourceStatusEventArgs.cs
- StandardTransformFactory.cs
- PortCache.cs
- Renderer.cs
- UnsafeNativeMethodsMilCoreApi.cs
- UnauthorizedAccessException.cs
- XmlSerializer.cs
- StreamFormatter.cs
- MapPathBasedVirtualPathProvider.cs
- Metafile.cs
- CompilerGeneratedAttribute.cs
- DoubleCollectionConverter.cs
- DragEventArgs.cs
- AttributeCollection.cs
- KnownTypeAttribute.cs
- DataBindingCollection.cs
- UsernameTokenFactoryCredential.cs
- UndoUnit.cs
- JapaneseLunisolarCalendar.cs
- HitTestParameters3D.cs
- TemplateXamlParser.cs
- GridEntryCollection.cs
- OdbcEnvironment.cs
- DrawingGroupDrawingContext.cs
- RegistryKey.cs
- PackageRelationshipCollection.cs
- CounterSample.cs
- ByteAnimation.cs
- WebConfigurationManager.cs
- BitVector32.cs
- ObjectIDGenerator.cs
- XmlNodeChangedEventArgs.cs
- ValueExpressions.cs
- ClientFormsIdentity.cs
- GridViewColumnCollectionChangedEventArgs.cs
- SymLanguageType.cs
- SqlCrossApplyToCrossJoin.cs
- ConfigurationPropertyAttribute.cs
- TemplateBuilder.cs
- RightsManagementEncryptionTransform.cs
- DataGridViewComponentPropertyGridSite.cs
- TableSectionStyle.cs
- AspCompat.cs
- Rectangle.cs
- BindingMemberInfo.cs
- RectIndependentAnimationStorage.cs
- EdmProperty.cs
- GeneralTransform3DTo2DTo3D.cs
- XsdValidatingReader.cs
- SqlConnectionPoolProviderInfo.cs
- LongAverageAggregationOperator.cs
- WebColorConverter.cs
- TextAnchor.cs
- SafeCloseHandleCritical.cs