Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / TakeOrSkipQueryOperator.cs / 1305376 / TakeOrSkipQueryOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// TakeOrSkipQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Threading;
using System.Diagnostics.Contracts;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Take and Skip either take or skip a specified number of elements, captured in the
    /// count argument.  These will work a little bit like TakeWhile and SkipWhile: there
    /// are two phases, (1) Search and (2) Yield.  In the search phase, our goal is to
    /// find the 'count'th index from the input.  We do this in parallel by sharing a count-
    /// sized heap.  Each thread races to populate the heap with indices.  This requires
    /// synchronization for inserts.  We use a simple heap, for decent worst case
    /// performance.  After a thread has scanned 'count' elements, or an insert into the
    /// shared heap is rejected, the thread can stop searching.  All threads issue a
    /// barrier before moving to the Yield phase.  When the Yield phase is entered, the
    /// maximum value held by the heap is used as the cut-off index: in the case of Take,
    /// buffered elements whose index is less than or equal to it are returned; in the case
    /// of Skip, only elements whose index is strictly greater than it are returned.  The
    /// Yield phase simply consists of yielding these elements as output.
    /// </summary>
    /// <typeparam name="TResult">The type of the elements flowing through the operator.</typeparam>
    internal sealed class TakeOrSkipQueryOperator<TResult> : UnaryQueryOperator<TResult, TResult>
    {
        private readonly int m_count; // The number of elements to take or skip.
        private readonly bool m_take; // Whether to take (true) or skip (false).
        private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator.

        //----------------------------------------------------------------------------------------
        // Initializes a new take or skip operator.
        //
        // Arguments:
        //     child  - the child data source to enumerate
        //     count  - the number of elements to take or skip
        //     take   - whether this is a Take (true) or Skip (false)
        //

        internal TakeOrSkipQueryOperator(IEnumerable<TResult> child, int count, bool take)
            : base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");

            m_count = count;
            m_take = take;

            SetOrdinalIndexState(OutputOrdinalIndexState());
        }

        /// <summary>
        /// Determines the order index state for the output operator.  As a side effect,
        /// records in m_prematureMerge whether the child's indices are worse than
        /// Increasing, in which case the input is merged and re-indexed before use.
        /// </summary>
        private OrdinalIndexState OutputOrdinalIndexState()
        {
            OrdinalIndexState indexState = Child.OrdinalIndexState;

            if (indexState == OrdinalIndexState.Indexible)
            {
                return OrdinalIndexState.Indexible;
            }

            if (indexState.IsWorseThan(OrdinalIndexState.Increasing))
            {
                m_prematureMerge = true;
                indexState = OrdinalIndexState.Correct;
            }

            // If the operator is skip and the index was correct, now it is only increasing.
            if (!m_take && indexState == OrdinalIndexState.Correct)
            {
                indexState = OrdinalIndexState.Increasing;
            }

            return indexState;
        }

        //----------------------------------------------------------------------------------------
        // Wraps each input partition in a take/skip enumerator and hands the resulting
        // stream to the recipient.  All partitions share one index heap and one barrier.
        //
        // Arguments:
        //     inputStream    - the partitioned stream produced by the child
        //     recipient      - receives the wrapped output stream
        //     preferStriping - forwarded when the input has to be merged and re-indexed
        //     settings       - query settings; supplies the merged cancellation token
        //

        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TResult, TKey> inputStream, IPartitionedStreamRecipient<TResult> recipient,
            bool preferStriping, QuerySettings settings)
        {
            Contract.Assert(Child.OrdinalIndexState != OrdinalIndexState.Indexible, "Don't take this code path if the child is indexible.");

            PartitionedStream<TResult, int> inputIntStream;

            // If the index is not at least increasing, we need to reindex.
            if (m_prematureMerge)
            {
                ListQueryResults<TResult> results = ExecuteAndCollectResults(
                    inputStream, inputStream.PartitionCount, Child.OutputOrdered, preferStriping, settings);
                inputIntStream = results.GetPartitionedStream();
            }
            else
            {
                // Without a premature merge the keys are expected to already be ints, so the
                // stream is reinterpreted (via object) rather than converted.
                Contract.Assert(typeof(TKey) == typeof(int));
                inputIntStream = (PartitionedStream<TResult, int>)((object)inputStream);
            }

            int partitionCount = inputStream.PartitionCount;

            // A heap used to track the sequence of indices leading up to the Nth index.
            FixedMaxHeap<int> sharedIndices = new FixedMaxHeap<int>(m_count);

            // A barrier to synchronize all partitions before yielding.
            CountdownEvent sharedBarrier = new CountdownEvent(partitionCount);

            PartitionedStream<TResult, int> outputStream =
                new PartitionedStream<TResult, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState);
            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new TakeOrSkipQueryOperatorEnumerator(
                    inputIntStream[i], m_count, m_take, sharedIndices, sharedBarrier,
                    settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.  Note that the child is always opened with preferStriping
        // set to true; the caller's preference is only passed on to the results object.
        //

        internal override QueryResults<TResult> Open(QuerySettings settings, bool preferStriping)
        {
            QueryResults<TResult> childQueryResults = Child.Open(settings, true);
            return TakeOrSkipQueryOperatorResults.NewResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.  Reported as true whenever the
        // operator's own ordinal index state is not Indexible.
        //

        internal override bool LimitsParallelism
        {
            get { return OrdinalIndexState != OrdinalIndexState.Indexible; }
        }

        //---------------------------------------------------------------------------------------
        // The enumerator type responsible for executing the Take or Skip.
        //

        class TakeOrSkipQueryOperatorEnumerator : QueryOperatorEnumerator<TResult, int>
        {
            private readonly QueryOperatorEnumerator<TResult, int> m_source; // The data source to enumerate.
            private readonly int m_count; // The number of elements to take or skip.
            private readonly bool m_take; // Whether to execute a Take (true) or Skip (false).

            // These fields are all shared among partitions.
            private readonly FixedMaxHeap<int> m_sharedIndices; // The indices shared among partitions.
            private readonly CountdownEvent m_sharedBarrier; // To separate the search/yield phases.
            private readonly CancellationToken m_cancellationToken; // Indicates that cancellation has occurred.

            private List<Pair<TResult, int>> m_buffer; // Our buffer.
            private Shared<int> m_bufferIndex; // Our current index within the buffer. [allocate in moveNext to avoid false-sharing]

            //----------------------------------------------------------------------------------------
            // Instantiates a new take or skip enumerator.
            //
            // Arguments:
            //     source            - the partition to enumerate
            //     count             - the number of elements to take or skip
            //     take              - whether to execute a Take (true) or Skip (false)
            //     sharedIndices     - the index heap shared by all partitions; sized to count
            //     sharedBarrier     - the barrier shared by all partitions
            //     cancellationToken - polled during the search and observed at the barrier
            //

            internal TakeOrSkipQueryOperatorEnumerator(
                QueryOperatorEnumerator<TResult, int> source, int count, bool take,
                FixedMaxHeap<int> sharedIndices, CountdownEvent sharedBarrier, CancellationToken cancellationToken)
            {
                Contract.Assert(source != null);
                Contract.Assert(sharedIndices != null);
                Contract.Assert(sharedBarrier != null);
                Contract.Assert(sharedIndices.Size == count);

                m_source = source;
                m_count = count;
                m_take = take;
                m_sharedIndices = sharedIndices;
                m_sharedBarrier = sharedBarrier;
                m_cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Straightforward IEnumerator methods.  The first call (when count > 0) runs the
            // search phase and waits on the shared barrier; every call then yields at most one
            // element through the ref arguments and returns whether one was produced.
            //

            internal override bool MoveNext(ref TResult currentElement, ref int currentKey)
            {
                Contract.Assert(m_sharedIndices != null);

                // If the buffer has not been created, we will populate it lazily on demand.
                if (m_buffer == null && m_count > 0)
                {
                    // Create a buffer, but don't publish it yet (in case of exception).
                    List<Pair<TResult, int>> buffer = new List<Pair<TResult, int>>();

                    // Enter the search phase.  In this phase, all partitions race to populate
                    // the shared indices with their first 'count' contiguous elements.
                    TResult current = default(TResult);
                    int index = default(int);
                    int i = 0; // counter to help with cancellation
                    while (buffer.Count < m_count && m_source.MoveNext(ref current, ref index))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(m_cancellationToken);

                        // Add the current element to our buffer.
                        buffer.Add(new Pair<TResult, int>(current, index));

                        // Now we will try to insert our index into the shared indices list, quitting if
                        // our index is greater than all of the indices already inside it.
                        lock (m_sharedIndices)
                        {
                            if (!m_sharedIndices.Insert(index))
                            {
                                // We have read past the maximum index. We can move to the barrier now.
                                break;
                            }
                        }
                    }

                    // Before exiting the search phase, we will synchronize with others. This is a barrier.
                    m_sharedBarrier.Signal();
                    m_sharedBarrier.Wait(m_cancellationToken);

                    // Publish the buffer and set the index to just before the 1st element.
                    m_buffer = buffer;
                    m_bufferIndex = new Shared<int>(-1);
                }

                // Now either enter (or continue) the yielding phase. As soon as we reach this, we know the
                // index of the 'count'-th input element.
                if (m_take)
                {
                    // In the case of a Take, we will yield each element from our buffer whose
                    // index does not exceed the 'count'-th index found.  The count check comes
                    // first because no buffer is ever created when count is 0.
                    if (m_count == 0 || m_bufferIndex.Value >= m_buffer.Count - 1)
                    {
                        return false;
                    }

                    // Increment the index, and remember the values.
                    ++m_bufferIndex.Value;
                    currentElement = m_buffer[m_bufferIndex.Value].First;
                    currentKey = m_buffer[m_bufferIndex.Value].Second;

                    // Only yield the element if its index is less than or equal to the max index.
                    // NOTE(review): -1 appears to be treated as "no bound recorded" - confirm
                    // against FixedMaxHeap.MaxValue.
                    int maxIndex = m_sharedIndices.MaxValue;
                    return maxIndex == -1 || m_buffer[m_bufferIndex.Value].Second <= maxIndex;
                }
                else
                {
                    int minIndex = -1;

                    // If the count to skip was greater than 0, look at the buffer.
                    if (m_count > 0)
                    {
                        // If there wasn't enough input to skip, return right away.
                        if (m_sharedIndices.Count < m_count)
                        {
                            return false;
                        }

                        minIndex = m_sharedIndices.MaxValue;

                        // In the case of a skip, we must skip over elements whose index is not greater than
                        // the 'count'-th index found. Once we've exhausted the buffer, we must go back and
                        // continue enumerating the data source until it is empty.
                        if (m_bufferIndex.Value < m_buffer.Count - 1)
                        {
                            for (m_bufferIndex.Value++; m_bufferIndex.Value < m_buffer.Count; m_bufferIndex.Value++)
                            {
                                // If the current buffered element's index is greater than the 'count'-th index,
                                // we will yield it as a result.
                                if (m_buffer[m_bufferIndex.Value].Second > minIndex)
                                {
                                    currentElement = m_buffer[m_bufferIndex.Value].First;
                                    currentKey = m_buffer[m_bufferIndex.Value].Second;
                                    return true;
                                }
                            }
                        }
                    }

                    // Lastly, so long as our input still has elements, they will be yieldable.
                    if (m_source.MoveNext(ref currentElement, ref currentKey))
                    {
                        Contract.Assert(currentKey > minIndex,
                                        "expected remaining element indices to be greater than smallest");
                        return true;
                    }
                }

                return false;
            }

            // Disposes the wrapped source enumerator.  The shared heap and barrier are not
            // touched here.
            protected override void Dispose(bool disposing)
            {
                m_source.Dispose();
            }
        }

        //----------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.  Only the
        // Skip path wraps the child in a cancellable enumerable before applying the operator.
        //

        internal override IEnumerable<TResult> AsSequentialQuery(CancellationToken token)
        {
            if (m_take)
            {
                return Child.AsSequentialQuery(token).Take(m_count);
            }

            IEnumerable<TResult> wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);
            return wrappedChild.Skip(m_count);
        }

        //------------------------------------------------------------------------------------
        // Query results for a Take or a Skip operator. The results are indexible if the child
        // results were indexible.
        //

        class TakeOrSkipQueryOperatorResults : UnaryQueryOperatorResults
        {
            TakeOrSkipQueryOperator<TResult> m_takeOrSkipOp; // The operator that generated the results
            int m_childCount; // The number of elements in child results

            // Picks the results implementation: the indexible specialization when the child
            // results are indexible, otherwise the general unary results.
            public static QueryResults<TResult> NewResults(
                QueryResults<TResult> childQueryResults, TakeOrSkipQueryOperator<TResult> op,
                QuerySettings settings, bool preferStriping)
            {
                if (childQueryResults.IsIndexible)
                {
                    return new TakeOrSkipQueryOperatorResults(
                        childQueryResults, op, settings, preferStriping);
                }
                else
                {
                    return new UnaryQueryOperatorResults(
                        childQueryResults, op, settings, preferStriping);
                }
            }

            // Private so that instances are only created through NewResults, which has
            // already checked that the child results are indexible.
            private TakeOrSkipQueryOperatorResults(
                QueryResults<TResult> childQueryResults, TakeOrSkipQueryOperator<TResult> takeOrSkipOp,
                QuerySettings settings, bool preferStriping)
                : base(childQueryResults, takeOrSkipOp, settings, preferStriping)
            {
                m_takeOrSkipOp = takeOrSkipOp;
                Contract.Assert(m_childQueryResults.IsIndexible);

                m_childCount = m_childQueryResults.ElementsCount;
            }

            // Indexible only while the cached child count is non-negative.
            internal override bool IsIndexible
            {
                get { return m_childCount >= 0; }
            }

            // Take: min(childCount, count).  Skip: max(childCount - count, 0).
            internal override int ElementsCount
            {
                get
                {
                    Contract.Assert(m_childCount >= 0);
                    if (m_takeOrSkipOp.m_take)
                    {
                        return Math.Min(m_childCount, m_takeOrSkipOp.m_count);
                    }
                    else
                    {
                        return Math.Max(m_childCount - m_takeOrSkipOp.m_count, 0);
                    }
                }
            }

            // Take reads the child at the same index; Skip offsets the index by count.
            internal override TResult GetElement(int index)
            {
                Contract.Assert(m_childCount >= 0);
                Contract.Assert(index >= 0);
                Contract.Assert(index < ElementsCount);

                if (m_takeOrSkipOp.m_take)
                {
                    return m_childQueryResults.GetElement(index);
                }
                else
                {
                    return m_childQueryResults.GetElement(m_takeOrSkipOp.m_count + index);
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ScriptComponentDescriptor.cs
- ConfigurationStrings.cs
- WebConfigurationManager.cs
- MultiDataTrigger.cs
- TypeUsage.cs
- OuterGlowBitmapEffect.cs
- FileSystemEnumerable.cs
- HotSpotCollection.cs
- AnyAllSearchOperator.cs
- DisposableCollectionWrapper.cs
- Context.cs
- ECDiffieHellmanCng.cs
- DesignerCommandSet.cs
- PopupEventArgs.cs
- ControlUtil.cs
- X509CertificateValidator.cs
- XmlEventCache.cs
- BitStream.cs
- DictionaryManager.cs
- EmptyImpersonationContext.cs
- EntityProviderFactory.cs
- TextRenderingModeValidation.cs
- CompModSwitches.cs
- ToolStripItemRenderEventArgs.cs
- PolygonHotSpot.cs
- LocalValueEnumerator.cs
- DataColumn.cs
- TypeInfo.cs
- MissingMemberException.cs
- MetadataItemSerializer.cs
- ResetableIterator.cs
- DataServiceQueryOfT.cs
- AlphaSortedEnumConverter.cs
- SemanticAnalyzer.cs
- ImageAttributes.cs
- View.cs
- MatrixUtil.cs
- DirectoryObjectSecurity.cs
- TypeGeneratedEventArgs.cs
- X509SubjectKeyIdentifierClause.cs
- IsolationInterop.cs
- ExpressionBinding.cs
- DataKeyArray.cs
- SqlCaseSimplifier.cs
- TreeNodeStyleCollection.cs
- FastEncoder.cs
- OleDbTransaction.cs
- QueryExpr.cs
- UniqueConstraint.cs
- DebugInfoGenerator.cs
- GcHandle.cs
- FollowerQueueCreator.cs
- SchemaAttDef.cs
- AgileSafeNativeMemoryHandle.cs
- EnumerableCollectionView.cs
- RuntimeArgumentHandle.cs
- TextTreeDeleteContentUndoUnit.cs
- WinFormsUtils.cs
- bindurihelper.cs
- PropertyMetadata.cs
- LinkButton.cs
- ToolStripArrowRenderEventArgs.cs
- arabicshape.cs
- XmlQualifiedNameTest.cs
- ListChangedEventArgs.cs
- _CookieModule.cs
- PtsPage.cs
- ConnectivityStatus.cs
- RuntimeConfigLKG.cs
- CallbackHandler.cs
- ToolStripRenderEventArgs.cs
- CreateUserWizard.cs
- SiteMapPath.cs
- SocketElement.cs
- SeekStoryboard.cs
- Point3DAnimationUsingKeyFrames.cs
- CommentEmitter.cs
- FontDialog.cs
- SubqueryRules.cs
- base64Transforms.cs
- PrePrepareMethodAttribute.cs
- CustomTrackingRecord.cs
- TextTreeInsertUndoUnit.cs
- UseAttributeSetsAction.cs
- odbcmetadatacolumnnames.cs
- IResourceProvider.cs
- ColumnClickEvent.cs
- GradientStopCollection.cs
- RotateTransform.cs
- XPathChildIterator.cs
- SplitContainer.cs
- FeatureSupport.cs
- QilUnary.cs
- GridViewSortEventArgs.cs
- DictionaryTraceRecord.cs
- XmlSchemaSimpleTypeRestriction.cs
- WindowsScrollBarBits.cs
- ReflectionServiceProvider.cs
- BatchWriter.cs
- TcpClientSocketManager.cs