Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / UnionQueryOperator.cs / 1305376 / UnionQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // UnionQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// Operator that yields the union of two data sources. /// ///internal sealed class UnionQueryOperator : BinaryQueryOperator { private readonly IEqualityComparer m_comparer; // An equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new union operator. // internal UnionQueryOperator(ParallelQuery left, ParallelQuery right, IEqualityComparer comparer) :base(left, right) { Contract.Assert(left != null && right != null, "child data sources cannot be null"); m_comparer = comparer; m_outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered; } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open( QuerySettings settings, bool preferStriping) { // We just open our child operators, left and then right. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults leftChildResults = LeftChild.Open(settings, false); QueryResults rightChildResults = RightChild.Open(settings, false); return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false); } public override void WrapPartitionedStream ( PartitionedStream leftStream, PartitionedStream rightStream, IPartitionedStreamRecipient outputRecipient, bool preferStriping, QuerySettings settings) { Contract.Assert(leftStream.PartitionCount == rightStream.PartitionCount); int partitionCount = leftStream.PartitionCount; // Wrap both child streams with hash repartition if (LeftChild.OutputOrdered) { PartitionedStream , TLeftKey> leftHashStream = ExchangeUtilities.HashRepartitionOrdered ( leftStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken); WrapPartitionedStreamFixedLeftType ( leftHashStream, rightStream, outputRecipient, partitionCount, settings.CancellationState.MergedCancellationToken); } else { PartitionedStream , int> leftHashStream = ExchangeUtilities.HashRepartition ( leftStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken); WrapPartitionedStreamFixedLeftType ( leftHashStream, rightStream, outputRecipient, partitionCount, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // A helper method that allows WrapPartitionedStream to fix the TLeftKey type parameter. // private void WrapPartitionedStreamFixedLeftType ( PartitionedStream , TLeftKey> leftHashStream, PartitionedStream rightStream, IPartitionedStreamRecipient outputRecipient, int partitionCount, CancellationToken cancellationToken) { if (RightChild.OutputOrdered) { PartitionedStream , TRightKey> rightHashStream = ExchangeUtilities.HashRepartitionOrdered ( rightStream, null, null, m_comparer, cancellationToken); WrapPartitionedStreamFixedBothTypes ( leftHashStream, rightHashStream, outputRecipient, partitionCount, cancellationToken); } else { PartitionedStream , int> rightHashStream = ExchangeUtilities.HashRepartition ( rightStream, null, null, m_comparer, cancellationToken); WrapPartitionedStreamFixedBothTypes ( leftHashStream, rightHashStream, outputRecipient, partitionCount, cancellationToken); } } //--------------------------------------------------------------------------------------- // A helper method that allows WrapPartitionedStreamHelper to fix the TRightKey type parameter. // private void WrapPartitionedStreamFixedBothTypes ( PartitionedStream , TLeftKey> leftHashStream, PartitionedStream , TRightKey> rightHashStream, IPartitionedStreamRecipient outputRecipient, int partitionCount, CancellationToken cancellationToken) { if (LeftChild.OutputOrdered || RightChild.OutputOrdered) { IComparer > compoundKeyComparer = ConcatKey .MakeComparer(leftHashStream.KeyComparer, rightHashStream.KeyComparer); PartitionedStream > outputStream = new PartitionedStream >(partitionCount, compoundKeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { outputStream[i] = new OrderedUnionQueryOperatorEnumerator ( leftHashStream[i], rightHashStream[i], LeftChild.OutputOrdered, RightChild.OutputOrdered, m_comparer, compoundKeyComparer, cancellationToken); } outputRecipient.Receive(outputStream); } else { PartitionedStream outputStream = new PartitionedStream (partitionCount, Util.GetDefaultComparer (), OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { outputStream[i] = new UnionQueryOperatorEnumerator ( leftHashStream[i], rightHashStream[i], i, m_comparer, cancellationToken); } outputRecipient.Receive(outputStream); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token); IEnumerable wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token); return wrappedLeftChild.Union(wrappedRightChild, m_comparer); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the union operation incrementally. It does this by maintaining // a history -- in the form of a set -- of all data already seen. It is careful not to // return any duplicates. // class UnionQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TLeftKey> m_leftSource; // Left data source. private QueryOperatorEnumerator , TRightKey> m_rightSource; // Right data source. private readonly int m_partitionIndex; // The current partition. private Set m_hashLookup; // The hash lookup, used to produce the union. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; private readonly IEqualityComparer m_comparer; //---------------------------------------------------------------------------------------- // Instantiates a new union operator. // internal UnionQueryOperatorEnumerator( QueryOperatorEnumerator , TLeftKey> leftSource, QueryOperatorEnumerator , TRightKey> rightSource, int partitionIndex, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(leftSource != null); Contract.Assert(rightSource != null); m_leftSource = leftSource; m_rightSource = rightSource; m_partitionIndex = partitionIndex; // @ m_comparer = comparer; m_cancellationToken = cancellationToken; } //--------------------------------------------------------------------------------------- // Walks the two data sources, left and then right, to produce the union. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { if (m_hashLookup == null) { m_hashLookup = new Set (m_comparer); m_outputLoopCount = new Shared (0); } Contract.Assert(m_hashLookup != null); // Enumerate the left and then right data source. When each is done, we set the // field to null so we will skip it upon subsequent calls to MoveNext. if (m_leftSource != null) { // Iterate over this set's elements until we find a unique element. TLeftKey keyUnused = default(TLeftKey); Pair currentLeftElement = default(Pair ); int i = 0; while (m_leftSource.MoveNext(ref currentLeftElement, ref keyUnused)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(currentLeftElement.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = currentLeftElement.First; return true; } } m_leftSource.Dispose(); m_leftSource = null; } if (m_rightSource != null) { // Iterate over this set's elements until we find a unique element. TRightKey keyUnused = default(TRightKey); Pair currentRightElement = default(Pair ); while (m_rightSource.MoveNext(ref currentRightElement, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(currentRightElement.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = currentRightElement.First; return true; } } m_rightSource.Dispose(); m_rightSource = null; } return false; } protected override void Dispose(bool disposing) { if (m_leftSource != null) { m_leftSource.Dispose(); } if (m_rightSource != null) { m_rightSource.Dispose(); } } } class OrderedUnionQueryOperatorEnumerator : QueryOperatorEnumerator > { private QueryOperatorEnumerator , TLeftKey> m_leftSource; // Left data source. private QueryOperatorEnumerator , TRightKey> m_rightSource; // Right data source. private IComparer > m_keyComparer; // Comparer for compound order keys. private IEnumerator , Pair >>> m_outputEnumerator; // Enumerator over the output of the union. private bool m_leftOrdered; // Whether the left data source is ordered. private bool m_rightOrdered; // Whether the right data source is ordered. private IEqualityComparer m_comparer; // Comparer for the elements. private CancellationToken m_cancellationToken; //---------------------------------------------------------------------------------------- // Instantiates a new union operator. // internal OrderedUnionQueryOperatorEnumerator( QueryOperatorEnumerator , TLeftKey> leftSource, QueryOperatorEnumerator , TRightKey> rightSource, bool leftOrdered, bool rightOrdered, IEqualityComparer comparer, IComparer > keyComparer, CancellationToken cancellationToken) { Contract.Assert(leftSource != null); Contract.Assert(rightSource != null); m_leftSource = leftSource; m_rightSource = rightSource; m_keyComparer = keyComparer; m_leftOrdered = leftOrdered; m_rightOrdered = rightOrdered; m_comparer = comparer; if (m_comparer == null) { m_comparer = EqualityComparer .Default; } m_cancellationToken = cancellationToken; } //--------------------------------------------------------------------------------------- // Walks the two data sources, left and then right, to produce the union. // internal override bool MoveNext(ref TInputOutput currentElement, ref ConcatKey currentKey) { Contract.Assert(m_leftSource != null); Contract.Assert(m_rightSource != null); if (m_outputEnumerator == null) { IEqualityComparer > wrapperComparer = new WrapperEqualityComparer (m_comparer); Dictionary , Pair >> union = new Dictionary , Pair >>(wrapperComparer); Pair elem = default(Pair ); TLeftKey leftKey = default(TLeftKey); int i = 0; while (m_leftSource.MoveNext(ref elem, ref leftKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); ConcatKey key = ConcatKey .MakeLeft(m_leftOrdered ? leftKey : default(TLeftKey)); Pair > oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0) { union[wrappedElem] = new Pair >(elem.First, key); } } TRightKey rightKey = default(TRightKey); while (m_rightSource.MoveNext(ref elem, ref rightKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); ConcatKey key = ConcatKey .MakeRight(m_rightOrdered ? rightKey : default(TRightKey)); Pair > oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); if (!union.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(key, oldEntry.Second) < 0) { union[wrappedElem] = new Pair >(elem.First, key); ; } } m_outputEnumerator = union.GetEnumerator(); } if (m_outputEnumerator.MoveNext()) { Pair > current = m_outputEnumerator.Current.Value; currentElement = current.First; currentKey = current.Second; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_leftSource != null && m_rightSource != null); m_leftSource.Dispose(); m_rightSource.Dispose(); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- KeyboardDevice.cs
- Int16Animation.cs
- ResourceFallbackManager.cs
- CompositeControl.cs
- XmlSerializationGeneratedCode.cs
- NeutralResourcesLanguageAttribute.cs
- ISessionStateStore.cs
- XmlDataSourceView.cs
- peersecuritysettings.cs
- RpcCryptoContext.cs
- BrowserDefinitionCollection.cs
- TextTreeNode.cs
- CommonGetThemePartSize.cs
- DesignColumnCollection.cs
- RegexFCD.cs
- UMPAttributes.cs
- DBAsyncResult.cs
- SystemColors.cs
- ReadOnlyTernaryTree.cs
- ToolStripItemClickedEventArgs.cs
- ToolboxItemFilterAttribute.cs
- SqlTriggerContext.cs
- HtmlTableRowCollection.cs
- DESCryptoServiceProvider.cs
- NativeMethods.cs
- EntityTypeEmitter.cs
- SamlAuthenticationClaimResource.cs
- Convert.cs
- Converter.cs
- CryptoHelper.cs
- IsolatedStorage.cs
- DictionaryManager.cs
- QuaternionKeyFrameCollection.cs
- LocatorManager.cs
- PatternMatcher.cs
- BackgroundFormatInfo.cs
- SchemaManager.cs
- IndicCharClassifier.cs
- SQLMembershipProvider.cs
- ObservableCollection.cs
- ToolStripGripRenderEventArgs.cs
- OutOfProcStateClientManager.cs
- _CookieModule.cs
- AnnotationHighlightLayer.cs
- CanonicalXml.cs
- JournalEntryStack.cs
- StaticExtension.cs
- Avt.cs
- assertwrapper.cs
- WebServiceHostFactory.cs
- HtmlTable.cs
- TraceData.cs
- AvTraceDetails.cs
- Helpers.cs
- DataListItemCollection.cs
- CodeStatement.cs
- ObjectDataSourceEventArgs.cs
- SerializerWriterEventHandlers.cs
- NameTable.cs
- COM2ExtendedUITypeEditor.cs
- EventProviderClassic.cs
- ProfileSettingsCollection.cs
- EntityDataSourceValidationException.cs
- MetabaseServerConfig.cs
- ExceptionWrapper.cs
- ToolboxBitmapAttribute.cs
- TextEndOfLine.cs
- PocoEntityKeyStrategy.cs
- NonVisualControlAttribute.cs
- MemberJoinTreeNode.cs
- DoubleAnimationClockResource.cs
- ClientScriptManager.cs
- SpoolingTask.cs
- SafeNativeMethods.cs
- OneOfElement.cs
- AutoGeneratedFieldProperties.cs
- returneventsaver.cs
- DbParameterHelper.cs
- XPathPatternBuilder.cs
- DataGridViewCellValidatingEventArgs.cs
- Clause.cs
- PrimaryKeyTypeConverter.cs
- DesignerObjectListAdapter.cs
- SettingsSavedEventArgs.cs
- SlipBehavior.cs
- PixelShader.cs
- ConfigurationElementCollection.cs
- HtmlFormWrapper.cs
- WindowsTokenRoleProvider.cs
- ErrorWrapper.cs
- ReadOnlyDictionary.cs
- ToolStripComboBox.cs
- ServiceModelSecurityTokenRequirement.cs
- DispatcherSynchronizationContext.cs
- X509Certificate2.cs
- ClientTarget.cs
- ServiceOperationParameter.cs
- dtdvalidator.cs
- SecurityElementBase.cs
- MachineSettingsSection.cs