Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / DistinctQueryOperator.cs / 1305376 / DistinctQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // // [....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SynchronizationContext.cs
- RtfControlWordInfo.cs
- TcpServerChannel.cs
- FolderBrowserDialogDesigner.cs
- CachedBitmap.cs
- MappedMetaModel.cs
- Thread.cs
- ControlBuilder.cs
- DependencyPropertyValueSerializer.cs
- MediaElement.cs
- ZipIOCentralDirectoryFileHeader.cs
- TypeToArgumentTypeConverter.cs
- coordinatorscratchpad.cs
- ThreadSafeMessageFilterTable.cs
- TdsEnums.cs
- MetadataItem_Static.cs
- CapabilitiesPattern.cs
- CompilationUtil.cs
- PageStatePersister.cs
- AutoGeneratedFieldProperties.cs
- TextTreeRootTextBlock.cs
- LinkedResource.cs
- RectConverter.cs
- TemplateKeyConverter.cs
- Imaging.cs
- EditableLabelControl.cs
- GeometryGroup.cs
- TableItemStyle.cs
- HtmlInputFile.cs
- ApplicationHost.cs
- ComNativeDescriptor.cs
- ColorInterpolationModeValidation.cs
- RemotingAttributes.cs
- DataFormat.cs
- MissingSatelliteAssemblyException.cs
- _NtlmClient.cs
- TextParaLineResult.cs
- DataSourceView.cs
- WebConfigurationHost.cs
- EUCJPEncoding.cs
- SqlGatherConsumedAliases.cs
- DataRecordObjectView.cs
- SinglePageViewer.cs
- CalendarButton.cs
- ScrollEvent.cs
- ParagraphVisual.cs
- DataServices.cs
- SqlAliasesReferenced.cs
- SyndicationItemFormatter.cs
- LockedAssemblyCache.cs
- CheckBox.cs
- TokenizerHelper.cs
- StorageRoot.cs
- ToolStripItemImageRenderEventArgs.cs
- GridView.cs
- SplitterCancelEvent.cs
- FileAuthorizationModule.cs
- Converter.cs
- PipelineComponent.cs
- ListViewItemMouseHoverEvent.cs
- WebConvert.cs
- XmlRawWriterWrapper.cs
- SettingsSection.cs
- SQLDateTimeStorage.cs
- AsyncCompletedEventArgs.cs
- SimpleTypeResolver.cs
- Part.cs
- DataServicePagingProviderWrapper.cs
- SkinBuilder.cs
- RichTextBoxAutomationPeer.cs
- OleDbException.cs
- CodeLinePragma.cs
- ASCIIEncoding.cs
- EntityViewGenerationAttribute.cs
- EntityDataSource.cs
- PtsHost.cs
- TreeNodeClickEventArgs.cs
- HotSpot.cs
- ConfigurationSectionGroupCollection.cs
- ThousandthOfEmRealDoubles.cs
- OptimizerPatterns.cs
- InputReportEventArgs.cs
- ImageField.cs
- Item.cs
- AttributeData.cs
- CodeTypeReference.cs
- NumberFunctions.cs
- OdbcCommand.cs
- ParentQuery.cs
- UnicastIPAddressInformationCollection.cs
- LambdaCompiler.cs
- AspNetRouteServiceHttpHandler.cs
- InlineUIContainer.cs
- SqlMethodTransformer.cs
- FontFamilyConverter.cs
- ImageProxy.cs
- StateFinalizationDesigner.cs
- DataBindEngine.cs
- DesignBinding.cs
- CssClassPropertyAttribute.cs