Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / DistinctQueryOperator.cs / 1305376 / DistinctQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // DistinctQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; namespace System.Linq.Parallel { ////// This operator yields all of the distinct elements in a single data set. It works quite /// like the above set operations, with the obvious difference being that it only accepts /// a single data source as input. /// ///internal sealed class DistinctQueryOperator : UnaryQueryOperator { private readonly IEqualityComparer m_comparer; // An (optional) equality comparer. //---------------------------------------------------------------------------------------- // Constructs a new distinction operator. // internal DistinctQueryOperator(IEnumerable source, IEqualityComparer comparer) :base(source) { Contract.Assert(source != null, "child data source cannot be null"); m_comparer = comparer; SetOrdinalIndexState(OrdinalIndexState.Shuffled); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { // We just open our child operator. Do not propagate the preferStriping value, but // instead explicitly set it to false. Regardless of whether the parent prefers striping or range // partitioning, the output will be hash-partititioned. QueryResults childResults = Child.Open(settings, false); return new UnaryQueryOperatorResults(childResults, this, settings, false); } internal override void WrapPartitionedStream ( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { // Hash-repartion the source stream if (OutputOrdered) { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartitionOrdered ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } else { WrapPartitionedStreamHelper ( ExchangeUtilities.HashRepartition ( inputStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken), recipient, settings.CancellationState.MergedCancellationToken); } } //--------------------------------------------------------------------------------------- // This is a helper method. WrapPartitionedStream decides what type TKey is going // to be, and then call this method with that key as a generic parameter. // private void WrapPartitionedStreamHelper ( PartitionedStream , TKey> hashStream, IPartitionedStreamRecipient recipient, CancellationToken cancellationToken) { int partitionCount = hashStream.PartitionCount; PartitionedStream outputStream = new PartitionedStream (partitionCount, hashStream.KeyComparer, OrdinalIndexState.Shuffled); for (int i = 0; i < partitionCount; i++) { if (OutputOrdered) { outputStream[i] = new OrderedDistinctQueryOperatorEnumerator (hashStream[i], m_comparer, hashStream.KeyComparer, cancellationToken); } else { outputStream[i] = (QueryOperatorEnumerator )(object) new DistinctQueryOperatorEnumerator (hashStream[i], m_comparer, cancellationToken); } } recipient.Receive(outputStream); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return false; } } //---------------------------------------------------------------------------------------- // This enumerator performs the distinct operation incrementally. It does this by // maintaining a history -- in the form of a set -- of all data already seen. It simply // then doesn't return elements it has already seen before. // class DistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Set m_hashLookup; // The hash lookup, used to produce the distinct set. private CancellationToken m_cancellationToken; private Shared m_outputLoopCount; // Allocated in MoveNext to avoid false sharing. //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal DistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; // @ m_hashLookup = new Set (comparer); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); // Iterate over this set's elements until we find a unique element. TKey keyUnused = default(TKey); Pair current = default(Pair ); if (m_outputLoopCount == null) m_outputLoopCount = new Shared (0); while (m_source.MoveNext(ref current, ref keyUnused)) { if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // We ensure we never return duplicates by tracking them in our set. if (m_hashLookup.Add(current.First)) { #if DEBUG currentKey = unchecked((int)0xdeadbeef); #endif currentElement = current.First; return true; } } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); } } //---------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.Distinct(m_comparer); } class OrderedDistinctQueryOperatorEnumerator : QueryOperatorEnumerator { private QueryOperatorEnumerator , TKey> m_source; // The data source. private Dictionary , TKey> m_hashLookup; // The hash lookup, used to produce the distinct set. private IComparer m_keyComparer; // Comparer to decide the key order. private IEnumerator , TKey>> m_hashLookupEnumerator; // Enumerates over m_hashLookup. private CancellationToken m_cancellationToken; //--------------------------------------------------------------------------------------- // Instantiates a new distinction operator. // internal OrderedDistinctQueryOperatorEnumerator( QueryOperatorEnumerator , TKey> source, IEqualityComparer comparer, IComparer keyComparer, CancellationToken cancellationToken) { Contract.Assert(source != null); m_source = source; m_keyComparer = keyComparer; // @ m_hashLookup = new Dictionary , TKey>( new WrapperEqualityComparer (comparer)); m_cancellationToken = cancellationToken; } //---------------------------------------------------------------------------------------- // Walks the single data source, skipping elements it has already seen. // internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) { Contract.Assert(m_source != null); Contract.Assert(m_hashLookup != null); if (m_hashLookupEnumerator == null) { Pair elem = default(Pair ); TKey orderKey = default(TKey); int i = 0; while (m_source.MoveNext(ref elem, ref orderKey)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // For each element, we track the smallest order key for that element that we saw so far TKey oldEntry; Wrapper wrappedElem = new Wrapper (elem.First); // If this is the first occurence of this element, or the order key is lower than all keys we saw previously, // update the order key for this element. if (!m_hashLookup.TryGetValue(wrappedElem, out oldEntry) || m_keyComparer.Compare(orderKey, oldEntry) < 0) { // For each "elem" value, we store the smallest key, and the element value that had that key. // Note that even though two element values are "equal" according to the EqualityComparer, // we still cannot choose arbitrarily which of the two to yield. m_hashLookup[wrappedElem] = orderKey; } } m_hashLookupEnumerator = m_hashLookup.GetEnumerator(); } if (m_hashLookupEnumerator.MoveNext()) { KeyValuePair , TKey> currentPair = m_hashLookupEnumerator.Current; currentElement = currentPair.Key.Value; currentKey = currentPair.Value; return true; } return false; } protected override void Dispose(bool disposing) { Contract.Assert(m_source != null); m_source.Dispose(); if (m_hashLookupEnumerator != null) { m_hashLookupEnumerator.Dispose(); } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- PinnedBufferMemoryStream.cs
- Models.cs
- SetStoryboardSpeedRatio.cs
- BitmapMetadata.cs
- EncoderParameters.cs
- InputScopeManager.cs
- GenericWebPart.cs
- XmlSchemaNotation.cs
- PropertyValue.cs
- XhtmlTextWriter.cs
- XmlnsCompatibleWithAttribute.cs
- XmlEncoding.cs
- TreeViewImageIndexConverter.cs
- HttpApplicationFactory.cs
- sqlinternaltransaction.cs
- CacheEntry.cs
- CryptoProvider.cs
- WebPartZoneBase.cs
- MeshGeometry3D.cs
- sqlcontext.cs
- WindowsListViewSubItem.cs
- WmlObjectListAdapter.cs
- X509Certificate.cs
- LogEntryDeserializer.cs
- LinearKeyFrames.cs
- FutureFactory.cs
- EntityDataSourceSelectingEventArgs.cs
- LockedActivityGlyph.cs
- SelectionPatternIdentifiers.cs
- XmlSchemaComplexType.cs
- WithParamAction.cs
- PerspectiveCamera.cs
- ConditionalExpression.cs
- XmlIncludeAttribute.cs
- TimeSpanOrInfiniteConverter.cs
- SecUtil.cs
- DispatcherOperation.cs
- FastEncoder.cs
- TypedReference.cs
- ImpersonateTokenRef.cs
- Popup.cs
- documentsequencetextpointer.cs
- CapabilitiesPattern.cs
- PageCopyCount.cs
- ButtonBaseAdapter.cs
- AtomMaterializerLog.cs
- SecureEnvironment.cs
- EventArgs.cs
- SecurityDescriptor.cs
- Triangle.cs
- ProvideValueServiceProvider.cs
- Size3DConverter.cs
- AssemblyCollection.cs
- XmlSchemaSubstitutionGroup.cs
- UpdatePanel.cs
- mediapermission.cs
- InvokeGenerator.cs
- CriticalExceptions.cs
- Trigger.cs
- ApplicationFileCodeDomTreeGenerator.cs
- XmlEncodedRawTextWriter.cs
- TreeNodeCollectionEditor.cs
- LinqDataSourceSelectEventArgs.cs
- HwndAppCommandInputProvider.cs
- DateTimePicker.cs
- InputLanguageSource.cs
- MatchingStyle.cs
- MediaTimeline.cs
- ToolStripItemEventArgs.cs
- HyperLinkDesigner.cs
- XmlStringTable.cs
- ShaderEffect.cs
- ListViewInsertionMark.cs
- PageHandlerFactory.cs
- _HeaderInfo.cs
- MSAAWinEventWrap.cs
- WindowsStartMenu.cs
- FontNamesConverter.cs
- XmlSchemaSubstitutionGroup.cs
- NameObjectCollectionBase.cs
- Win32MouseDevice.cs
- Walker.cs
- TextEffectCollection.cs
- HeaderFilter.cs
- DocumentationServerProtocol.cs
- WebPartDisplayModeEventArgs.cs
- TextTreeInsertUndoUnit.cs
- CommandField.cs
- QueueProcessor.cs
- WrapPanel.cs
- ScriptHandlerFactory.cs
- RoleServiceManager.cs
- SynchronizedInputProviderWrapper.cs
- FontStretch.cs
- Opcode.cs
- FileStream.cs
- TransactionProtocol.cs
- BamlVersionHeader.cs
- ThreadPool.cs
- HtmlSelect.cs