Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / SelectManyQueryOperator.cs / 1305376 / SelectManyQueryOperator.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SelectManyQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
namespace System.Linq.Parallel
{
///
/// SelectMany is effectively a nested loops join. It is given two data sources, an
/// outer and an inner -- actually, the inner is sometimes calculated by invoking a
/// function for each outer element -- and we walk the outer, walking the entire
/// inner enumerator for each outer element. There is an optional result selector
/// function which can transform the output before yielding it as a result element.
///
/// Notes:
/// Although select many takes two enumerable objects as input, it appears to the
/// query analysis infrastructure as a unary operator. That's because it works a
/// little differently than the other binary operators: it has to re-open the right
/// child every time an outer element is walked. The right child is NOT partitioned.
///
///
///
///
internal sealed class SelectManyQueryOperator : UnaryQueryOperator
{
private readonly Func> m_rightChildSelector; // To select a new child each iteration.
private readonly Func> m_indexedRightChildSelector; // To select a new child each iteration.
private readonly Func m_resultSelector; // An optional result selection function.
private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator.
//----------------------------------------------------------------------------------------
// Initializes a new select-many operator.
//
// Arguments:
// leftChild - the left data source from which to pull data.
// rightChild - the right data source from which to pull data.
// rightChildSelector - if no right data source was supplied, the selector function
// will generate a new right child for every unique left element.
// resultSelector - a selection function for creating output elements.
//
internal SelectManyQueryOperator(IEnumerable leftChild,
Func> rightChildSelector,
Func> indexedRightChildSelector,
Func resultSelector)
:base(leftChild)
{
Contract.Assert(leftChild != null, "left child data source cannot be null");
Contract.Assert(rightChildSelector != null || indexedRightChildSelector != null,
"either right child data or selector must be supplied");
Contract.Assert(rightChildSelector == null || indexedRightChildSelector == null,
"either indexed- or non-indexed child selector must be supplied (not both)");
Contract.Assert(typeof(TRightInput) == typeof(TOutput) || resultSelector != null,
"right input and output must be the same types, otherwise the result selector may not be null");
m_rightChildSelector = rightChildSelector;
m_indexedRightChildSelector = indexedRightChildSelector;
m_resultSelector = resultSelector;
// If the SelectMany is indexed, elements must be returned in the order in which
// indices were assigned.
m_outputOrdered = Child.OutputOrdered || indexedRightChildSelector != null;
InitOrderIndex();
}
private void InitOrderIndex()
{
if (m_indexedRightChildSelector != null)
{
// If this is an indexed SelectMany, we need the order keys to be Correct, so that we can pass them
// into the user delegate.
m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Correct);
}
else
{
if (OutputOrdered)
{
// If the output of this SelectMany is ordered, the input keys must be at least increasing. The
// SelectMany algorithm assumes that there will be no duplicate order keys, so if the order keys
// are Shuffled, we need to merge prematurely.
m_prematureMerge = ExchangeUtilities.IsWorseThan(Child.OrdinalIndexState, OrdinalIndexState.Increasing);
}
}
SetOrdinalIndexState(OrdinalIndexState.Shuffled);
}
internal override void WrapPartitionedStream(
PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (m_indexedRightChildSelector != null)
{
PartitionedStream inputStreamInt;
// If the index is not correct, we need to reindex.
if (m_prematureMerge)
{
ListQueryResults listResults =
QueryOperator.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings);
inputStreamInt = listResults.GetPartitionedStream();
}
else
{
inputStreamInt = (PartitionedStream)(object)inputStream;
}
WrapPartitionedStreamIndexed(inputStreamInt, recipient, settings);
return;
}
//
//
if (m_prematureMerge)
{
PartitionedStream inputStreamInt =
QueryOperator.ExecuteAndCollectResults(inputStream, partitionCount, OutputOrdered, preferStriping, settings)
.GetPartitionedStream();
WrapPartitionedStreamNotIndexed(inputStreamInt, recipient, settings);
}
else
{
WrapPartitionedStreamNotIndexed(inputStream, recipient, settings);
}
}
///
/// A helper method for WrapPartitionedStream. We use the helper to reuse a block of code twice, but with
/// a different order key type. (If premature merge occured, the order key type will be "int". Otherwise,
/// it will be the same type as "TLeftKey" in WrapPartitionedStream.)
///
private void WrapPartitionedStreamNotIndexed(
PartitionedStream inputStream, IPartitionedStreamRecipient recipient, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
var keyComparer = new PairComparer(inputStream.KeyComparer, Util.GetDefaultComparer());
var outputStream = new PartitionedStream>(partitionCount, keyComparer, OrdinalIndexState);
for (int i = 0; i < partitionCount; i++)
{
outputStream[i] = new SelectManyQueryOperatorEnumerator(inputStream[i], this, settings.CancellationState.MergedCancellationToken);
}
recipient.Receive(outputStream);
}
///
/// Similar helper method to WrapPartitionedStreamNotIndexed, except that this one is for the indexed variant
/// of SelectMany (i.e., the SelectMany that passes indices into the user sequence-generating delegate)
///
private void WrapPartitionedStreamIndexed(
PartitionedStream inputStream, IPartitionedStreamRecipient recipient, QuerySettings settings)
{
var keyComparer = new PairComparer(inputStream.KeyComparer, Util.GetDefaultComparer());
var outputStream = new PartitionedStream>(inputStream.PartitionCount, keyComparer, OrdinalIndexState);
for (int i = 0; i < inputStream.PartitionCount; i++)
{
outputStream[i] = new IndexedSelectManyQueryOperatorEnumerator(inputStream[i], this, settings.CancellationState.MergedCancellationToken);
}
recipient.Receive(outputStream);
}
//---------------------------------------------------------------------------------------
// Just opens the current operator, including opening the left child and wrapping with a
// partition if needed. The right child is not opened yet -- this is always done on demand
// as the outer elements are enumerated.
//
internal override QueryResults Open(QuerySettings settings, bool preferStriping)
{
QueryResults childQueryResults = Child.Open(settings, preferStriping);
return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
}
//---------------------------------------------------------------------------------------
// Returns an enumerable that represents the query executing sequentially.
//
internal override IEnumerable AsSequentialQuery(CancellationToken token)
{
if (m_rightChildSelector != null)
{
if (m_resultSelector != null)
{
return CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_rightChildSelector, m_resultSelector);
}
return (IEnumerable)(object)(CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_rightChildSelector));
}
else
{
Contract.Assert(m_indexedRightChildSelector != null);
if (m_resultSelector != null)
{
return CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_indexedRightChildSelector, m_resultSelector);
}
return (IEnumerable)(object)(CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token).SelectMany(m_indexedRightChildSelector));
}
}
//---------------------------------------------------------------------------------------
// Whether this operator performs a premature merge.
//
internal override bool LimitsParallelism
{
get { return m_prematureMerge; }
}
//----------------------------------------------------------------------------------------
// The enumerator type responsible for executing the SelectMany logic.
//
class IndexedSelectManyQueryOperatorEnumerator : QueryOperatorEnumerator>
{
private readonly QueryOperatorEnumerator m_leftSource; // The left data source to enumerate.
private readonly SelectManyQueryOperator m_selectManyOperator; // The select many operator to use.
private IEnumerator m_currentRightSource; // The current enumerator we're using.
private IEnumerator m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
private Mutables m_mutables; // bag of frequently mutated value types [allocate in moveNext to avoid false-sharing]
private readonly CancellationToken m_cancellationToken;
private class Mutables
{
internal int m_currentRightSourceIndex = -1; // The index for the right data source.
internal TLeftInput m_currentLeftElement; // The current element in the left data source.
internal int m_currentLeftSourceIndex; // The current key in the left data source.
internal int m_lhsCount; //counts the number of lhs elements enumerated. used for cancellation testing.
}
//---------------------------------------------------------------------------------------
// Instantiates a new select-many enumerator. Notice that the right data source is an
// enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
// data source.
//
internal IndexedSelectManyQueryOperatorEnumerator(QueryOperatorEnumerator leftSource,
SelectManyQueryOperator selectManyOperator,
CancellationToken cancellationToken)
{
Contract.Assert(leftSource != null);
Contract.Assert(selectManyOperator != null);
m_leftSource = leftSource;
m_selectManyOperator = selectManyOperator;
m_cancellationToken = cancellationToken;
}
//----------------------------------------------------------------------------------------
// Straightforward IEnumerator methods.
//
internal override bool MoveNext(ref TOutput currentElement, ref Pair currentKey)
{
while (true)
{
if (m_currentRightSource == null)
{
m_mutables = new Mutables();
// Check cancellation every few lhs-enumerations in case none of them are producing
// any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// We don't have a "current" right enumerator to use. We have to fetch the next
// one. If the left has run out of elements, however, we're done and just return
// false right away.
if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftSourceIndex))
{
return false;
}
// Use the source selection routine to create a right child.
IEnumerable rightChild =
m_selectManyOperator.m_indexedRightChildSelector(m_mutables.m_currentLeftElement, m_mutables.m_currentLeftSourceIndex);
Contract.Assert(rightChild != null);
m_currentRightSource = rightChild.GetEnumerator();
Contract.Assert(m_currentRightSource != null);
// If we have no result selector, we will need to access the Current element of the right
// data source as though it is a TOutput. Unfortunately, we know that TRightInput must
// equal TOutput (we check it during operator construction), but the type system doesn't.
// Thus we would have to cast the result of invoking Current from type TRightInput to
// TOutput. This is no good, since the results could be value types. Instead, we save the
// enumerator object as an IEnumerator and access that later on.
if (m_selectManyOperator.m_resultSelector == null)
{
m_currentRightSourceAsOutput = (IEnumerator)(object)m_currentRightSource;
Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
"these must be equal, otherwise the surrounding logic will be broken");
}
}
if (m_currentRightSource.MoveNext())
{
m_mutables.m_currentRightSourceIndex++;
// If the inner data source has an element, we can yield it.
if (m_selectManyOperator.m_resultSelector != null)
{
// In the case of a selection function, use that to yield the next element.
currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
}
else
{
// Otherwise, the right input and output types must be the same. We use the
// casted copy of the current right source and just return its current element.
Contract.Assert(m_currentRightSourceAsOutput != null);
currentElement = m_currentRightSourceAsOutput.Current;
}
currentKey = new Pair(m_mutables.m_currentLeftSourceIndex, m_mutables.m_currentRightSourceIndex);
return true;
}
else
{
// Otherwise, we have exhausted the right data source. Loop back around and try
// to get the next left element, then its right, and so on.
m_currentRightSource.Dispose();
m_currentRightSource = null;
m_currentRightSourceAsOutput = null;
}
}
}
protected override void Dispose(bool disposing)
{
m_leftSource.Dispose();
if (m_currentRightSource != null)
{
m_currentRightSource.Dispose();
}
}
}
//----------------------------------------------------------------------------------------
// The enumerator type responsible for executing the SelectMany logic.
//
class SelectManyQueryOperatorEnumerator : QueryOperatorEnumerator>
{
private readonly QueryOperatorEnumerator m_leftSource; // The left data source to enumerate.
private readonly SelectManyQueryOperator m_selectManyOperator; // The select many operator to use.
private IEnumerator m_currentRightSource; // The current enumerator we're using.
private IEnumerator m_currentRightSourceAsOutput; // If we need to access the enumerator for output directly (no result selector).
private Mutables m_mutables; // bag of frequently mutated value types [allocate in moveNext to avoid false-sharing]
private readonly CancellationToken m_cancellationToken;
private class Mutables
{
internal int m_currentRightSourceIndex = -1; // The index for the right data source.
internal TLeftInput m_currentLeftElement; // The current element in the left data source.
internal TLeftKey m_currentLeftKey; // The current key in the left data source.
internal int m_lhsCount; // Counts the number of lhs elements enumerated. used for cancellation testing.
}
//---------------------------------------------------------------------------------------
// Instantiates a new select-many enumerator. Notice that the right data source is an
// enumera*BLE* not an enumera*TOR*. It is re-opened for every single element in the left
// data source.
//
internal SelectManyQueryOperatorEnumerator(QueryOperatorEnumerator leftSource,
SelectManyQueryOperator selectManyOperator,
CancellationToken cancellationToken)
{
Contract.Assert(leftSource != null);
Contract.Assert(selectManyOperator != null);
m_leftSource = leftSource;
m_selectManyOperator = selectManyOperator;
m_cancellationToken = cancellationToken;
}
//----------------------------------------------------------------------------------------
// Straightforward IEnumerator methods.
//
internal override bool MoveNext(ref TOutput currentElement, ref Pair currentKey)
{
while (true)
{
if (m_currentRightSource == null)
{
m_mutables = new Mutables();
// Check cancellation every few lhs-enumerations in case none of them are producing
// any outputs. Otherwise, we rely on the consumer of this operator to be performing the checks.
if ((m_mutables.m_lhsCount++ & CancellationState.POLL_INTERVAL) == 0)
CancellationState.ThrowIfCanceled(m_cancellationToken);
// We don't have a "current" right enumerator to use. We have to fetch the next
// one. If the left has run out of elements, however, we're done and just return
// false right away.
if (!m_leftSource.MoveNext(ref m_mutables.m_currentLeftElement, ref m_mutables.m_currentLeftKey))
{
return false;
}
// Use the source selection routine to create a right child.
IEnumerable rightChild = m_selectManyOperator.m_rightChildSelector(m_mutables.m_currentLeftElement);
Contract.Assert(rightChild != null);
m_currentRightSource = rightChild.GetEnumerator();
Contract.Assert(m_currentRightSource != null);
// If we have no result selector, we will need to access the Current element of the right
// data source as though it is a TOutput. Unfortunately, we know that TRightInput must
// equal TOutput (we check it during operator construction), but the type system doesn't.
// Thus we would have to cast the result of invoking Current from type TRightInput to
// TOutput. This is no good, since the results could be value types. Instead, we save the
// enumerator object as an IEnumerator and access that later on.
if (m_selectManyOperator.m_resultSelector == null)
{
m_currentRightSourceAsOutput = (IEnumerator)(object)m_currentRightSource;
Contract.Assert(m_currentRightSourceAsOutput == m_currentRightSource,
"these must be equal, otherwise the surrounding logic will be broken");
}
}
if (m_currentRightSource.MoveNext())
{
m_mutables.m_currentRightSourceIndex++;
// If the inner data source has an element, we can yield it.
if (m_selectManyOperator.m_resultSelector != null)
{
// In the case of a selection function, use that to yield the next element.
currentElement = m_selectManyOperator.m_resultSelector(m_mutables.m_currentLeftElement, m_currentRightSource.Current);
}
else
{
// Otherwise, the right input and output types must be the same. We use the
// casted copy of the current right source and just return its current element.
Contract.Assert(m_currentRightSourceAsOutput != null);
currentElement = m_currentRightSourceAsOutput.Current;
}
currentKey = new Pair(m_mutables.m_currentLeftKey, m_mutables.m_currentRightSourceIndex);
return true;
}
else
{
// Otherwise, we have exhausted the right data source. Loop back around and try
// to get the next left element, then its right, and so on.
m_currentRightSource.Dispose();
m_currentRightSource = null;
m_currentRightSourceAsOutput = null;
}
}
}
protected override void Dispose(bool disposing)
{
m_leftSource.Dispose();
if (m_currentRightSource != null)
{
m_currentRightSource.Dispose();
}
}
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- EntityDataSourceDesigner.cs
- HtmlLabelAdapter.cs
- EFColumnProvider.cs
- glyphs.cs
- LayoutTable.cs
- WebPartCancelEventArgs.cs
- WebPartActionVerb.cs
- SafeEventLogWriteHandle.cs
- RuntimeArgumentHandle.cs
- DocumentPageViewAutomationPeer.cs
- DBBindings.cs
- ErrorTolerantObjectWriter.cs
- WebPartTracker.cs
- ExtendedPropertyInfo.cs
- InputScopeNameConverter.cs
- SingletonInstanceContextProvider.cs
- URLIdentityPermission.cs
- RadioButtonRenderer.cs
- LogSwitch.cs
- XmlNamedNodeMap.cs
- DataSysAttribute.cs
- RandomNumberGenerator.cs
- BamlMapTable.cs
- Line.cs
- TTSEngineTypes.cs
- DataBoundControlHelper.cs
- TransformValueSerializer.cs
- SimpleApplicationHost.cs
- TextOutput.cs
- AttributedMetaModel.cs
- MultiSelectRootGridEntry.cs
- CellQuery.cs
- TypeNameConverter.cs
- NativeRightsManagementAPIsStructures.cs
- FilteredDataSetHelper.cs
- SelectorAutomationPeer.cs
- DesignerForm.cs
- RuleRefElement.cs
- PageAdapter.cs
- DataGridViewTextBoxCell.cs
- OrderedDictionary.cs
- TypeDescriptionProvider.cs
- EntityModelSchemaGenerator.cs
- WorkflowOwnerAsyncResult.cs
- EntityType.cs
- ContextDataSourceView.cs
- SQLInt64.cs
- UInt64Storage.cs
- StatusBarPanelClickEvent.cs
- XmlSchemaImport.cs
- RegexMatchCollection.cs
- CommandBindingCollection.cs
- ComponentResourceManager.cs
- RemoteWebConfigurationHostServer.cs
- Module.cs
- PowerModeChangedEventArgs.cs
- ThicknessKeyFrameCollection.cs
- CuspData.cs
- WebExceptionStatus.cs
- XmlBindingWorker.cs
- FamilyCollection.cs
- PolyBezierSegment.cs
- ContextStaticAttribute.cs
- RuntimeArgument.cs
- DataControlField.cs
- StylusLogic.cs
- OracleConnectionString.cs
- _AutoWebProxyScriptHelper.cs
- GenerateTemporaryTargetAssembly.cs
- DataGridTablesFactory.cs
- XmlSortKey.cs
- addressfiltermode.cs
- EnumMember.cs
- Ray3DHitTestResult.cs
- ObjectViewFactory.cs
- TextRunTypographyProperties.cs
- CheckBoxFlatAdapter.cs
- CmsInterop.cs
- CryptoConfig.cs
- DataGridViewCellPaintingEventArgs.cs
- XPathScanner.cs
- IndexOutOfRangeException.cs
- XmlQueryType.cs
- TextSpanModifier.cs
- FlowNode.cs
- GridView.cs
- XmlChildEnumerator.cs
- Pair.cs
- DesignerActionMethodItem.cs
- CompensationToken.cs
- DiscoveryClientDocuments.cs
- EventProviderBase.cs
- InvokeHandlers.cs
- ExpressionEditor.cs
- VisualStateManager.cs
- ProfilePropertyNameValidator.cs
- StrokeCollectionConverter.cs
- Clipboard.cs
- CompressStream.cs
- DataGridSortCommandEventArgs.cs