Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / SingleQueryOperator.cs / 1305376 / SingleQueryOperator.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SingleQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// Single searches the input to find the sole element that satisfies the (optional)
/// predicate. If multiple such elements are found, the caller is responsible for
/// producing an error. There is some degree of cross-partition synchronization to
/// proactively halt the search if we ever determine there are multiple elements
/// satisfying the search in the input.
/// </summary>
/// <typeparam name="TSource">The type of the elements being searched.</typeparam>
internal sealed class SingleQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
{
    private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search.

    //----------------------------------------------------------------------------------------
    // Initializes a new Single operator.
    //
    // Arguments:
    //     child     - the child whose data we will search
    //     predicate - the optional filter; when null, every element counts as a match
    //

    internal SingleQueryOperator(IEnumerable<TSource> child, Func<TSource, bool> predicate)
        : base(child)
    {
        Contract.Assert(child != null, "child data source cannot be null");
        m_predicate = predicate;
    }

    //---------------------------------------------------------------------------------------
    // Just opens the current operator, including opening the child and wrapping it with
    // partitions as needed. The child is always opened with preferStriping == false; the
    // caller's preferStriping value is only forwarded to the results object.
    //

    internal override QueryResults<TSource> Open(
        QuerySettings settings, bool preferStriping)
    {
        QueryResults<TSource> childQueryResults = Child.Open(settings, false);
        return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
    }

    //---------------------------------------------------------------------------------------
    // Wraps every input partition in a SingleQueryOperatorEnumerator and hands the resulting
    // stream to the recipient. All of the enumerators share one match counter, so that a
    // partition can stop scanning once more than one match has been seen overall.
    //
    // The preferStriping and settings arguments are not used by this operator.
    //

    internal override void WrapPartitionedStream<TKey>(
        PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
    {
        int partitionCount = inputStream.PartitionCount;

        // Every element we yield is keyed with the constant 0, hence the int key type.
        PartitionedStream<TSource, int> outputStream = new PartitionedStream<TSource, int>(
            partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);

        // The match count shared by all partitions.
        Shared<int> totalElementCount = new Shared<int>(0);
        for (int i = 0; i < partitionCount; i++)
        {
            outputStream[i] = new SingleQueryOperatorEnumerator<TKey>(inputStream[i], m_predicate, totalElementCount);
        }

        recipient.Receive(outputStream);
    }

    //---------------------------------------------------------------------------------------
    // Returns an enumerable that represents the query executing sequentially.
    // Not supported by this operator: asserts and then throws NotSupportedException.
    //

    internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
    {
        Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
        throw new NotSupportedException();
    }

    //---------------------------------------------------------------------------------------
    // Whether this operator performs a premature merge.
    //

    internal override bool LimitsParallelism
    {
        get { return false; }
    }

    //----------------------------------------------------------------------------------------
    // The enumerator type responsible for executing the Single operation.
    //
    // Each enumerator scans one input partition. It yields at most two elements: the first
    // match found in its partition and, if a second match was found in the same partition,
    // one extra default-valued element so that the consumer observes more than one result.
    //

    class SingleQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, int>
    {
        private QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to enumerate.
        private Func<TSource, bool> m_predicate; // The optional predicate used during the search.
        private bool m_alreadySearched; // Whether we have searched our input already.
        private bool m_yieldExtra; // Whether we found more than one element.

        // Data shared among partitions.
        private Shared<int> m_totalElementCount; // The total count of elements found.

        //---------------------------------------------------------------------------------------
        // Instantiates a new enumerator.
        //
        // Arguments:
        //     source            - the partition to scan
        //     predicate         - the optional filter; may be null
        //     totalElementCount - the match counter shared with the other partitions
        //

        internal SingleQueryOperatorEnumerator(QueryOperatorEnumerator<TSource, TKey> source,
                                               Func<TSource, bool> predicate, Shared<int> totalElementCount)
        {
            Contract.Assert(source != null);
            Contract.Assert(totalElementCount != null);

            m_source = source;
            m_predicate = predicate;
            m_totalElementCount = totalElementCount;
        }

        //----------------------------------------------------------------------------------------
        // Straightforward IEnumerator methods.
        //
        // The first call performs the whole scan of the partition and returns true if at least
        // one match was found here. Later calls return true at most once more (the "extra"
        // element) and false thereafter.
        //

        internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
        {
            Contract.Assert(m_source != null);

            if (m_alreadySearched)
            {
                // If we've already searched, we will "fake out" the caller by returning an extra
                // element at the end in the case that we've found more than one element.
                if (m_yieldExtra)
                {
                    m_yieldExtra = false;
                    currentElement = default(TSource);
                    currentKey = 0;
                    return true;
                }

                return false;
            }

            // Scan our input, looking for a match.
            bool found = false;
            TSource current = default(TSource);
            TKey keyUnused = default(TKey);

            while (m_source.MoveNext(ref current, ref keyUnused))
            {
                // If the predicate is null or the current element satisfies it, we will remember
                // it so that we can yield it later. We then proceed with scanning the input
                // in case there are multiple such elements.
                if (m_predicate == null || m_predicate(current))
                {
                    // Notify other partitions.
                    Interlocked.Increment(ref m_totalElementCount.Value);

                    currentElement = current;
                    currentKey = 0;

                    if (found)
                    {
                        // Already found an element previously, we can exit.
                        m_yieldExtra = true;
                        break;
                    }
                    else
                    {
                        found = true;
                    }
                }

                // If we've already determined there is more than one matching element in the
                // data source, we can exit right away.
                if (m_totalElementCount.Value > 1)
                {
                    break;
                }
            }

            m_alreadySearched = true;

            return found;
        }

        //----------------------------------------------------------------------------------------
        // Disposes the wrapped source enumerator. The disposing flag is not consulted.
        //

        protected override void Dispose(bool disposing)
        {
            m_source.Dispose();
        }
    }
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// SingleQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics.Contracts;
namespace System.Linq.Parallel
{
/// <summary>
/// Single searches the input to find the sole element that satisfies the (optional)
/// predicate. If multiple such elements are found, the caller is responsible for
/// producing an error. There is some degree of cross-partition synchronization to
/// proactively halt the search if we ever determine there are multiple elements
/// satisfying the search in the input.
/// </summary>
/// <typeparam name="TSource">The type of the elements being searched.</typeparam>
internal sealed class SingleQueryOperator<TSource> : UnaryQueryOperator<TSource, TSource>
{
    private readonly Func<TSource, bool> m_predicate; // The optional predicate used during the search.

    //----------------------------------------------------------------------------------------
    // Initializes a new Single operator.
    //
    // Arguments:
    //     child     - the child whose data we will search
    //     predicate - the optional filter; when null, every element counts as a match
    //

    internal SingleQueryOperator(IEnumerable<TSource> child, Func<TSource, bool> predicate)
        : base(child)
    {
        Contract.Assert(child != null, "child data source cannot be null");
        m_predicate = predicate;
    }

    //---------------------------------------------------------------------------------------
    // Just opens the current operator, including opening the child and wrapping it with
    // partitions as needed. The child is always opened with preferStriping == false; the
    // caller's preferStriping value is only forwarded to the results object.
    //

    internal override QueryResults<TSource> Open(
        QuerySettings settings, bool preferStriping)
    {
        QueryResults<TSource> childQueryResults = Child.Open(settings, false);
        return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
    }

    //---------------------------------------------------------------------------------------
    // Wraps every input partition in a SingleQueryOperatorEnumerator and hands the resulting
    // stream to the recipient. All of the enumerators share one match counter, so that a
    // partition can stop scanning once more than one match has been seen overall.
    //
    // The preferStriping and settings arguments are not used by this operator.
    //

    internal override void WrapPartitionedStream<TKey>(
        PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
    {
        int partitionCount = inputStream.PartitionCount;

        // Every element we yield is keyed with the constant 0, hence the int key type.
        PartitionedStream<TSource, int> outputStream = new PartitionedStream<TSource, int>(
            partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Shuffled);

        // The match count shared by all partitions.
        Shared<int> totalElementCount = new Shared<int>(0);
        for (int i = 0; i < partitionCount; i++)
        {
            outputStream[i] = new SingleQueryOperatorEnumerator<TKey>(inputStream[i], m_predicate, totalElementCount);
        }

        recipient.Receive(outputStream);
    }

    //---------------------------------------------------------------------------------------
    // Returns an enumerable that represents the query executing sequentially.
    // Not supported by this operator: asserts and then throws NotSupportedException.
    //

    internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
    {
        Contract.Assert(false, "This method should never be called as it is an ending operator with LimitsParallelism=false.");
        throw new NotSupportedException();
    }

    //---------------------------------------------------------------------------------------
    // Whether this operator performs a premature merge.
    //

    internal override bool LimitsParallelism
    {
        get { return false; }
    }

    //----------------------------------------------------------------------------------------
    // The enumerator type responsible for executing the Single operation.
    //
    // Each enumerator scans one input partition. It yields at most two elements: the first
    // match found in its partition and, if a second match was found in the same partition,
    // one extra default-valued element so that the consumer observes more than one result.
    //

    class SingleQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TSource, int>
    {
        private QueryOperatorEnumerator<TSource, TKey> m_source; // The data source to enumerate.
        private Func<TSource, bool> m_predicate; // The optional predicate used during the search.
        private bool m_alreadySearched; // Whether we have searched our input already.
        private bool m_yieldExtra; // Whether we found more than one element.

        // Data shared among partitions.
        private Shared<int> m_totalElementCount; // The total count of elements found.

        //---------------------------------------------------------------------------------------
        // Instantiates a new enumerator.
        //
        // Arguments:
        //     source            - the partition to scan
        //     predicate         - the optional filter; may be null
        //     totalElementCount - the match counter shared with the other partitions
        //

        internal SingleQueryOperatorEnumerator(QueryOperatorEnumerator<TSource, TKey> source,
                                               Func<TSource, bool> predicate, Shared<int> totalElementCount)
        {
            Contract.Assert(source != null);
            Contract.Assert(totalElementCount != null);

            m_source = source;
            m_predicate = predicate;
            m_totalElementCount = totalElementCount;
        }

        //----------------------------------------------------------------------------------------
        // Straightforward IEnumerator methods.
        //
        // The first call performs the whole scan of the partition and returns true if at least
        // one match was found here. Later calls return true at most once more (the "extra"
        // element) and false thereafter.
        //

        internal override bool MoveNext(ref TSource currentElement, ref int currentKey)
        {
            Contract.Assert(m_source != null);

            if (m_alreadySearched)
            {
                // If we've already searched, we will "fake out" the caller by returning an extra
                // element at the end in the case that we've found more than one element.
                if (m_yieldExtra)
                {
                    m_yieldExtra = false;
                    currentElement = default(TSource);
                    currentKey = 0;
                    return true;
                }

                return false;
            }

            // Scan our input, looking for a match.
            bool found = false;
            TSource current = default(TSource);
            TKey keyUnused = default(TKey);

            while (m_source.MoveNext(ref current, ref keyUnused))
            {
                // If the predicate is null or the current element satisfies it, we will remember
                // it so that we can yield it later. We then proceed with scanning the input
                // in case there are multiple such elements.
                if (m_predicate == null || m_predicate(current))
                {
                    // Notify other partitions.
                    Interlocked.Increment(ref m_totalElementCount.Value);

                    currentElement = current;
                    currentKey = 0;

                    if (found)
                    {
                        // Already found an element previously, we can exit.
                        m_yieldExtra = true;
                        break;
                    }
                    else
                    {
                        found = true;
                    }
                }

                // If we've already determined there is more than one matching element in the
                // data source, we can exit right away.
                if (m_totalElementCount.Value > 1)
                {
                    break;
                }
            }

            m_alreadySearched = true;

            return found;
        }

        //----------------------------------------------------------------------------------------
        // Disposes the wrapped source enumerator. The disposing flag is not consulted.
        //

        protected override void Dispose(bool disposing)
        {
            m_source.Dispose();
        }
    }
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ValueConversionAttribute.cs
- DeadCharTextComposition.cs
- TypeRefElement.cs
- AbandonedMutexException.cs
- ItemContainerProviderWrapper.cs
- entityreference_tresulttype.cs
- WpfSharedXamlSchemaContext.cs
- SystemIcons.cs
- ConditionalAttribute.cs
- UInt32.cs
- WorkflowPersistenceContext.cs
- ThrowOnMultipleAssignment.cs
- HtmlInputText.cs
- Rule.cs
- OptimizerPatterns.cs
- Merger.cs
- Condition.cs
- SurrogateEncoder.cs
- WindowsStartMenu.cs
- SqlInternalConnectionSmi.cs
- unsafenativemethodsother.cs
- ToolboxDataAttribute.cs
- ReflectionPermission.cs
- MappingException.cs
- DbReferenceCollection.cs
- ColorDialog.cs
- AddInIpcChannel.cs
- ProcessThread.cs
- AdvancedBindingPropertyDescriptor.cs
- ObjectDataSourceFilteringEventArgs.cs
- HtmlValidatorAdapter.cs
- GlobalEventManager.cs
- GlyphsSerializer.cs
- StorageEntityTypeMapping.cs
- SweepDirectionValidation.cs
- QilLoop.cs
- KeySpline.cs
- OrderByBuilder.cs
- MailWriter.cs
- RoutedEvent.cs
- TemplateAction.cs
- DiagnosticsConfigurationHandler.cs
- ValueTypePropertyReference.cs
- TextRangeEditTables.cs
- DodSequenceMerge.cs
- ColorTransformHelper.cs
- CrossContextChannel.cs
- cache.cs
- wgx_commands.cs
- XmlAttributeAttribute.cs
- FilteredDataSetHelper.cs
- ParameterModifier.cs
- DataListCommandEventArgs.cs
- GeometryConverter.cs
- TimerElapsedEvenArgs.cs
- MediaTimeline.cs
- GridLength.cs
- PropertyInfoSet.cs
- ObjectFactoryCodeDomTreeGenerator.cs
- InvalidDataContractException.cs
- FuncTypeConverter.cs
- CollectionExtensions.cs
- KeyGestureValueSerializer.cs
- JsonDeserializer.cs
- DoubleCollection.cs
- ToolStripGripRenderEventArgs.cs
- ReadOnlyObservableCollection.cs
- ProtocolViolationException.cs
- MultiPageTextView.cs
- ByteBufferPool.cs
- recordstatescratchpad.cs
- TableLayoutSettings.cs
- ListViewSortEventArgs.cs
- AppLevelCompilationSectionCache.cs
- RTLAwareMessageBox.cs
- XPathDescendantIterator.cs
- SemanticResultValue.cs
- DropDownList.cs
- MetadataUtilsSmi.cs
- StreamGeometry.cs
- XmlEntityReference.cs
- SmiMetaDataProperty.cs
- SessionStateModule.cs
- DbgCompiler.cs
- MailMessage.cs
- MailDefinitionBodyFileNameEditor.cs
- MemoryMappedView.cs
- Font.cs
- UntypedNullExpression.cs
- MultipleFilterMatchesException.cs
- CacheOutputQuery.cs
- CodeSnippetExpression.cs
- ReceiveSecurityHeader.cs
- SecurityHelper.cs
- HTMLTagNameToTypeMapper.cs
- PathFigureCollectionValueSerializer.cs
- Encoding.cs
- Misc.cs
- SmtpLoginAuthenticationModule.cs
- _ConnectOverlappedAsyncResult.cs