Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Binary / IntersectQueryOperator.cs / 1305376 / IntersectQueryOperator.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IntersectQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
namespace System.Linq.Parallel
{
///
/// Operator that yields the intersection of two data sources.
///
///
internal sealed class IntersectQueryOperator :
BinaryQueryOperator
{
private readonly IEqualityComparer m_comparer; // An equality comparer.
//----------------------------------------------------------------------------------------
// Constructs a new intersection operator.
//
internal IntersectQueryOperator(ParallelQuery left, ParallelQuery right, IEqualityComparer comparer)
:base(left, right)
{
Contract.Assert(left != null && right != null, "child data sources cannot be null");
m_comparer = comparer;
m_outputOrdered = LeftChild.OutputOrdered;
SetOrdinalIndex(OrdinalIndexState.Shuffled);
}
internal override QueryResults Open(
QuerySettings settings, bool preferStriping)
{
// We just open our child operators, left and then right. Do not propagate the preferStriping value, but
// instead explicitly set it to false. Regardless of whether the parent prefers striping or range
// partitioning, the output will be hash-partititioned.
QueryResults leftChildResults = LeftChild.Open(settings, false);
QueryResults rightChildResults = RightChild.Open(settings, false);
return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
}
public override void WrapPartitionedStream(
PartitionedStream leftPartitionedStream, PartitionedStream rightPartitionedStream,
IPartitionedStreamRecipient outputRecipient, bool preferStriping, QuerySettings settings)
{
Contract.Assert(leftPartitionedStream.PartitionCount == rightPartitionedStream.PartitionCount);
if (OutputOrdered)
{
WrapPartitionedStreamHelper(
ExchangeUtilities.HashRepartitionOrdered(
leftPartitionedStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken),
rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
}
else
{
WrapPartitionedStreamHelper(
ExchangeUtilities.HashRepartition(
leftPartitionedStream, null, null, m_comparer, settings.CancellationState.MergedCancellationToken),
rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
}
}
//---------------------------------------------------------------------------------------
// This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
// to be, and then call this method with that key as a generic parameter.
//
private void WrapPartitionedStreamHelper(
PartitionedStream, TLeftKey> leftHashStream, PartitionedStream rightPartitionedStream,
IPartitionedStreamRecipient outputRecipient, CancellationToken cancellationToken)
{
int partitionCount = leftHashStream.PartitionCount;
PartitionedStream, int> rightHashStream =
ExchangeUtilities.HashRepartition(
rightPartitionedStream, null, null, m_comparer, cancellationToken);
PartitionedStream outputStream =
new PartitionedStream(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);
for (int i = 0; i < partitionCount; i++)
{
if (OutputOrdered)
{
outputStream[i] = new OrderedIntersectQueryOperatorEnumerator(
leftHashStream[i], rightHashStream[i], m_comparer, leftHashStream.KeyComparer, cancellationToken);
}
else
{
outputStream[i] = (QueryOperatorEnumerator)(object)
new IntersectQueryOperatorEnumerator(leftHashStream[i], rightHashStream[i], m_comparer, cancellationToken);
}
}
outputRecipient.Receive(outputStream);
}
//---------------------------------------------------------------------------------------
// Whether this operator performs a premature merge.
//
internal override bool LimitsParallelism
{
get { return false; }
}
//---------------------------------------------------------------------------------------
// This enumerator performs the intersection operation incrementally. It does this by
// maintaining a history -- in the form of a set -- of all data already seen. It then
// only returns elements that are seen twice (returning each one only once).
//
class IntersectQueryOperatorEnumerator : QueryOperatorEnumerator
{
private QueryOperatorEnumerator, TLeftKey> m_leftSource; // Left data source.
private QueryOperatorEnumerator, int> m_rightSource; // Right data source.
private IEqualityComparer m_comparer; // Comparer to use for equality/hash-coding.
private Set m_hashLookup; // The hash lookup, used to produce the intersection.
private CancellationToken m_cancellationToken;
private Shared m_outputLoopCount;
//----------------------------------------------------------------------------------------
// Instantiates a new intersection operator.
//
internal IntersectQueryOperatorEnumerator(
QueryOperatorEnumerator, TLeftKey> leftSource,
QueryOperatorEnumerator, int> rightSource,
IEqualityComparer comparer, CancellationToken cancellationToken)
{
Contract.Assert(leftSource != null);
Contract.Assert(rightSource != null);
m_leftSource = leftSource;
m_rightSource = rightSource;
m_comparer = comparer;
m_cancellationToken = cancellationToken;
}
//---------------------------------------------------------------------------------------
// Walks the two data sources, left and then right, to produce the intersection.
//
internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
{
Contract.Assert(m_leftSource != null);
Contract.Assert(m_rightSource != null);
// Build the set out of the right data source, if we haven't already.
if (m_hashLookup == null)
{
m_outputLoopCount = new Shared(0);
// @
m_hashLookup = new Set(m_comparer);
Pair rightElement = default(Pair);
int rightKeyUnused = default(int);
int i = 0;
while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
{
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
m_hashLookup.Add(rightElement.First);
}
}
// Now iterate over the left data source, looking for matches.
Pair leftElement = default(Pair);
TLeftKey keyUnused = default(TLeftKey);
while (m_leftSource.MoveNext(ref leftElement, ref keyUnused))
{
if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// If we found the element in our set, and if we haven't returned it yet,
// we can yield it to the caller. We also mark it so we know we've returned
// it once already and never will again.
if (m_hashLookup.Contains(leftElement.First))
{
// @
m_hashLookup.Remove(leftElement.First);
currentElement = leftElement.First;
#if DEBUG
currentKey = unchecked((int)0xdeadbeef);
#endif
return true;
}
}
return false;
}
protected override void Dispose(bool disposing)
{
Contract.Assert(m_leftSource != null && m_rightSource != null);
m_leftSource.Dispose();
m_rightSource.Dispose();
}
}
//----------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially.
//
internal override IEnumerable AsSequentialQuery(CancellationToken token)
{
IEnumerable wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
IEnumerable wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
return wrappedLeftChild.Intersect(wrappedRightChild, m_comparer);
}
class OrderedIntersectQueryOperatorEnumerator : QueryOperatorEnumerator
{
private QueryOperatorEnumerator, TLeftKey> m_leftSource; // Left data source.
private QueryOperatorEnumerator, int> m_rightSource; // Right data source.
private IEqualityComparer> m_comparer; // Comparer to use for equality/hash-coding.
private IComparer m_leftKeyComparer; // Comparer to use to determine ordering of order keys.
private Dictionary, Pair> m_hashLookup; // The hash lookup, used to produce the intersection.
private CancellationToken m_cancellationToken;
//----------------------------------------------------------------------------------------
// Instantiates a new intersection operator.
//
internal OrderedIntersectQueryOperatorEnumerator(
QueryOperatorEnumerator, TLeftKey> leftSource,
QueryOperatorEnumerator, int> rightSource,
IEqualityComparer comparer, IComparer leftKeyComparer,
CancellationToken cancellationToken)
{
Contract.Assert(leftSource != null);
Contract.Assert(rightSource != null);
m_leftSource = leftSource;
m_rightSource = rightSource;
m_comparer = new WrapperEqualityComparer(comparer);
m_leftKeyComparer = leftKeyComparer;
m_cancellationToken = cancellationToken;
}
//---------------------------------------------------------------------------------------
// Walks the two data sources, left and then right, to produce the intersection.
//
internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
{
Contract.Assert(m_leftSource != null);
Contract.Assert(m_rightSource != null);
// Build the set out of the left data source, if we haven't already.
int i = 0;
if (m_hashLookup == null)
{
// @
m_hashLookup = new Dictionary, Pair>(m_comparer);
Pair leftElement = default(Pair);
TLeftKey leftKey = default(TLeftKey);
while (m_leftSource.MoveNext(ref leftElement, ref leftKey))
{
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// For each element, we track the smallest order key for that element that we saw so far
Pair oldEntry;
Wrapper wrappedLeftElem = new Wrapper(leftElement.First);
// If this is the first occurence of this element, or the order key is lower than all keys we saw previously,
// update the order key for this element.
if (!m_hashLookup.TryGetValue(wrappedLeftElem, out oldEntry) || m_leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
{
// For each "elem" value, we store the smallest key, and the element value that had that key.
// Note that even though two element values are "equal" according to the EqualityComparer,
// we still cannot choose arbitrarily which of the two to yield.
m_hashLookup[wrappedLeftElem] = new Pair(leftElement.First, leftKey);
}
}
}
// Now iterate over the right data source, looking for matches.
Pair rightElement = default(Pair);
int rightKeyUnused = default(int);
while (m_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
{
if ((i++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// If we found the element in our set, and if we haven't returned it yet,
// we can yield it to the caller. We also mark it so we know we've returned
// it once already and never will again.
Pair entry;
Wrapper wrappedRightElem = new Wrapper(rightElement.First);
if (m_hashLookup.TryGetValue(wrappedRightElem, out entry))
{
currentElement = entry.First;
currentKey = entry.Second;
// @
m_hashLookup.Remove(new Wrapper(entry.First));
return true;
}
}
return false;
}
protected override void Dispose(bool disposing)
{
Contract.Assert(m_leftSource != null && m_rightSource != null);
m_leftSource.Dispose();
m_rightSource.Dispose();
}
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DbConnectionPoolCounters.cs
- TreeSet.cs
- tooltip.cs
- XPathMessageFilterElementComparer.cs
- PersonalizationAdministration.cs
- UserNameSecurityTokenAuthenticator.cs
- basemetadatamappingvisitor.cs
- PagedDataSource.cs
- UnsafeNativeMethodsTablet.cs
- DragCompletedEventArgs.cs
- ImageBrush.cs
- OneWayElement.cs
- ChangeProcessor.cs
- KnownColorTable.cs
- XmlBinaryReader.cs
- SQLBytes.cs
- RawTextInputReport.cs
- XmlSchemaRedefine.cs
- RestClientProxyHandler.cs
- FontInfo.cs
- ToolStripDropDownClosedEventArgs.cs
- Assert.cs
- SingleObjectCollection.cs
- UserControlBuildProvider.cs
- FontDriver.cs
- DispatcherExceptionEventArgs.cs
- Sequence.cs
- WebControlAdapter.cs
- SourceFileInfo.cs
- ListViewItemMouseHoverEvent.cs
- FormViewActionList.cs
- Annotation.cs
- DefaultSerializationProviderAttribute.cs
- FrameDimension.cs
- Camera.cs
- BindingManagerDataErrorEventArgs.cs
- UnicodeEncoding.cs
- DataGridViewRowDividerDoubleClickEventArgs.cs
- NotImplementedException.cs
- httpstaticobjectscollection.cs
- SystemThemeKey.cs
- TransactionTable.cs
- DataGridViewRowPostPaintEventArgs.cs
- DataBindingCollectionConverter.cs
- DescendentsWalker.cs
- XmlMapping.cs
- InfoCardTraceRecord.cs
- WindowsTokenRoleProvider.cs
- OpacityConverter.cs
- ThicknessKeyFrameCollection.cs
- ModuleBuilder.cs
- CompiledRegexRunnerFactory.cs
- CompilerGlobalScopeAttribute.cs
- _FixedSizeReader.cs
- TailCallAnalyzer.cs
- Currency.cs
- UidManager.cs
- KeyNotFoundException.cs
- DataGridViewCellMouseEventArgs.cs
- GeneralTransformCollection.cs
- SchemaImporterExtension.cs
- JsonServiceDocumentSerializer.cs
- HtmlInputReset.cs
- ShaderEffect.cs
- DBAsyncResult.cs
- PageAsyncTaskManager.cs
- SudsWriter.cs
- ISO2022Encoding.cs
- AuthenticationModuleElement.cs
- WizardPanelChangingEventArgs.cs
- Span.cs
- TabletCollection.cs
- XamlToRtfParser.cs
- GAC.cs
- MediaContextNotificationWindow.cs
- HttpGetProtocolImporter.cs
- ObjectDataSourceView.cs
- Style.cs
- WsdlInspector.cs
- GridViewCellAutomationPeer.cs
- OleStrCAMarshaler.cs
- BinaryFormatter.cs
- HashAlgorithm.cs
- ButtonPopupAdapter.cs
- ExpanderAutomationPeer.cs
- PrintDialog.cs
- CodeAttributeDeclarationCollection.cs
- XmlSchemaInfo.cs
- IdnMapping.cs
- FormView.cs
- PointAnimation.cs
- ProxyHwnd.cs
- MenuStrip.cs
- ClientConfigPaths.cs
- UIPropertyMetadata.cs
- WmlMobileTextWriter.cs
- WebBrowserProgressChangedEventHandler.cs
- RequestCache.cs
- LogRecordSequence.cs
- XmlHierarchicalDataSourceView.cs