Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / Unary / TakeOrSkipWhileQueryOperator.cs / 1305376 / TakeOrSkipWhileQueryOperator.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // TakeOrSkipWhileQueryOperator.cs // //[....] // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Threading; using System.Diagnostics.Contracts; namespace System.Linq.Parallel { ////// Take- and SkipWhile work similarly. Execution is broken into two phases: Search /// and Yield. /// /// During the Search phase, many partitions at once search for the first occurrence /// of a false element. As they search, any time a partition finds a false element /// whose index is lesser than the current lowest-known false element, the new index /// will be published, so other partitions can stop the search. The search stops /// as soon as (1) a partition exhausts its input, (2) the predicate yields false for /// one of the partition's elements, or (3) its input index passes the current lowest- /// known index (sufficient since a given partition's indices are always strictly /// incrementing -- asserted below). Elements are buffered during this process. /// /// Partitions use a barrier after Search and before moving on to Yield. Once all /// have passed the barrier, Yielding begins. At this point, the lowest-known false /// index will be accurate for the entire set, since all partitions have finished /// scanning. This is where TakeWhile and SkipWhile differ. TakeWhile will start at /// the beginning of its buffer and yield all elements whose indices are less than /// the lowest-known false index. SkipWhile, on the other hand, will skipp any such /// elements in the buffer, yielding those whose index is greater than or equal to /// the lowest-known false index, and then finish yielding any remaining elements in /// its data source (since it may have stopped prematurely due to (3) above). /// ///internal sealed class TakeOrSkipWhileQueryOperator : UnaryQueryOperator { // Predicate function used to decide when to stop yielding elements. One pair is used for // index-based evaluation (i.e. it is passed the index as well as the element's value). private Func m_predicate; private Func m_indexedPredicate; private readonly bool m_take; // Whether to take (true) or skip (false). private bool m_prematureMerge = false; // Whether to prematurely merge the input of this operator. //---------------------------------------------------------------------------------------- // Initializes a new take-while operator. // // Arguments: // child - the child data source to enumerate // predicate - the predicate function (if expression tree isn't provided) // indexedPredicate - the index-based predicate function (if expression tree isn't provided) // take - whether this is a TakeWhile (true) or SkipWhile (false) // // Notes: // Only one kind of predicate can be specified, an index-based one or not. If an // expression tree is provided, the delegate cannot also be provided. // internal TakeOrSkipWhileQueryOperator(IEnumerable child, Func predicate, Func indexedPredicate, bool take) :base(child) { Contract.Assert(child != null, "child data source cannot be null"); Contract.Assert(predicate != null || indexedPredicate != null, "need a predicate function"); m_predicate = predicate; m_indexedPredicate = indexedPredicate; m_take = take; SetOrdinalIndexState(OutputOrderIndexState()); } /// /// Determines the order index state for the output operator /// private OrdinalIndexState OutputOrderIndexState() { // SkipWhile/TakeWhile needs an increasing index. However, if the predicate expression depends on the index, // the index needs to be correct, not just increasing. OrdinalIndexState requiredIndexState = OrdinalIndexState.Increasing; if (m_indexedPredicate != null) { requiredIndexState = OrdinalIndexState.Correct; } OrdinalIndexState indexState = ExchangeUtilities.Worse(Child.OrdinalIndexState, OrdinalIndexState.Correct); if (indexState.IsWorseThan(requiredIndexState)) { m_prematureMerge = true; } if (!m_take) { // If the index was correct, now it is only increasing. indexState = indexState.Worse(OrdinalIndexState.Increasing); } return indexState; } internal override void WrapPartitionedStream( PartitionedStream inputStream, IPartitionedStreamRecipient recipient, bool preferStriping, QuerySettings settings) { int partitionCount = inputStream.PartitionCount; PartitionedStream listInputStream; if (m_prematureMerge) { ListQueryResults results = ExecuteAndCollectResults(inputStream, partitionCount, Child.OutputOrdered, preferStriping, settings); listInputStream = results.GetPartitionedStream(); } else { Contract.Assert(typeof(int) == typeof(TKey)); listInputStream = (PartitionedStream )(object)inputStream; } // Create shared data. One is an index that represents the lowest false value found, // while the other is a latch used as a barrier. Shared sharedLowFalse = new Shared (-1); // Note that -1 is a sentinel to mean "not set yet". CountdownEvent sharedBarrier = new CountdownEvent(partitionCount); PartitionedStream partitionedStream = new PartitionedStream (partitionCount, Util.GetDefaultComparer (), OrdinalIndexState); for (int i = 0; i < partitionCount; i++) { partitionedStream[i] = new TakeOrSkipWhileQueryOperatorEnumerator( listInputStream[i], m_predicate, m_indexedPredicate, m_take, sharedLowFalse, sharedBarrier, settings.CancellationState.MergedCancellationToken); } recipient.Receive(partitionedStream); } //--------------------------------------------------------------------------------------- // Just opens the current operator, including opening the child and wrapping it with // partitions as needed. // internal override QueryResults Open(QuerySettings settings, bool preferStriping) { QueryResults childQueryResults = Child.Open(settings, true); return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping); } //--------------------------------------------------------------------------------------- // Returns an enumerable that represents the query executing sequentially. // internal override IEnumerable AsSequentialQuery(CancellationToken token) { if (m_take) { if (m_indexedPredicate != null) { return Child.AsSequentialQuery(token).TakeWhile(m_indexedPredicate); } return Child.AsSequentialQuery(token).TakeWhile(m_predicate); } if (m_indexedPredicate != null) { IEnumerable wrappedIndexedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedIndexedChild.SkipWhile(m_indexedPredicate); } IEnumerable wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); return wrappedChild.SkipWhile(m_predicate); } //--------------------------------------------------------------------------------------- // Whether this operator performs a premature merge. // internal override bool LimitsParallelism { get { return true; } } //---------------------------------------------------------------------------------------- // The enumerator type responsible for executing the take- or skip-while. // class TakeOrSkipWhileQueryOperatorEnumerator : QueryOperatorEnumerator { private readonly QueryOperatorEnumerator m_source; // The data source to enumerate. private readonly Func m_predicate; // The actual predicate function. private readonly Func m_indexedPredicate; // The actual index-based predicate function. private readonly bool m_take; // Whether to execute a take- (true) or skip-while (false). // These fields are all shared among partitions. private readonly Shared m_sharedLowFalse; // The lowest false found by any partition. private readonly CountdownEvent m_sharedBarrier; // To separate the search/yield phases. private readonly CancellationToken m_cancellationToken; // Token used to cancel this operator. private List > m_buffer; // Our buffer. private Shared m_bufferIndex; // Our current index within the buffer. [allocate in moveNext to avoid false-sharing] //--------------------------------------------------------------------------------------- // Instantiates a new select enumerator. // internal TakeOrSkipWhileQueryOperatorEnumerator( QueryOperatorEnumerator source, Func predicate, Func indexedPredicate, bool take, Shared sharedLowFalse, CountdownEvent sharedBarrier, CancellationToken cancelToken) { Contract.Assert(source != null); Contract.Assert(predicate != null || indexedPredicate != null); Contract.Assert(sharedLowFalse != null); Contract.Assert(sharedBarrier != null); m_source = source; m_predicate = predicate; m_indexedPredicate = indexedPredicate; m_take = take; m_sharedLowFalse = sharedLowFalse; m_sharedBarrier = sharedBarrier; m_cancellationToken = cancelToken; } //---------------------------------------------------------------------------------------- // Straightforward IEnumerator methods. // internal override bool MoveNext(ref TResult currentElement, ref int currentKey) { // If the buffer has not been created, we will generate it lazily on demand. if (m_buffer == null) { // Create a buffer, but don't publish it yet (in case of exception). List > buffer = new List >(); // Enter the search phase. In this phase, we scan the input until one of three // things happens: (1) all input has been exhausted, (2) the predicate yields // false for one of our elements, or (3) we move past the current lowest index // found by other partitions for a false element. As we go, we have to remember // the elements by placing them into the buffer. // @ try { TResult current = default(TResult); int index = default(int); int i = 0; //counter to help with cancellation while (m_source.MoveNext(ref current, ref index)) { if ((i++ & CancellationState.POLL_INTERVAL) == 0) CancellationState.ThrowIfCanceled(m_cancellationToken); // Add the current element to our buffer. // @ buffer.Add(new Pair (current, index)); // See if another partition has found a false value before this element. If so, // we should stop scanning the input now and reach the barrier ASAP. int currentLowIndex = m_sharedLowFalse.Value; if (currentLowIndex != -1 && index > currentLowIndex) { break; } // Evaluate the predicate, either indexed or not based on info passed to the ctor. bool predicateResult; if (m_predicate != null) { predicateResult = m_predicate(current); } else { Contract.Assert(m_indexedPredicate != null); predicateResult = m_indexedPredicate(current, index); } if (!predicateResult) { // Signal that we've found a false element, racing with other partitions to // set the shared index value. If we lose this ----, that's fine: the one trying // to publish the lowest value will ultimately win; so we retry if ours is // lower, or bail right away otherwise. We use a spin wait to deal with contention. int observedLowIndex; SpinWait s = new SpinWait(); while (true) { // Read the current value of the index with a volatile load to prevent movement. observedLowIndex = Thread.VolatileRead(ref m_sharedLowFalse.Value); // If the current shared index is set and lower than ours, we won't try to CAS. if ((observedLowIndex != -1 && observedLowIndex < index) || Interlocked.CompareExchange(ref m_sharedLowFalse.Value, index, observedLowIndex) == observedLowIndex) { // Either the current value is lower or we succeeded in swapping the // current value with ours. We're done. break; } // If we failed the swap, we will spin briefly to reduce contention. s.SpinOnce(); } // Exit the loop and reach the barrier. break; } } } finally { // No matter whether we exit due to an exception or normal completion, we must ensure // that we signal other partitions that we have completed. Otherwise, we can cause deadlocks. m_sharedBarrier.Signal(); } // Before exiting the search phase, we will synchronize with others. This is a barrier. m_sharedBarrier.Wait(m_cancellationToken); // Publish the buffer and set the index to just before the 1st element. m_buffer = buffer; m_bufferIndex = new Shared (-1); } // Now either enter (or continue) the yielding phase. As soon as we reach this, we know the // current shared "low false" value is the absolute lowest with a false. if (m_take) { // In the case of a take-while, we will yield each element from our buffer for which // the element is lesser than the lowest false index found. if (m_bufferIndex.Value >= m_buffer.Count - 1) { return false; } // Increment the index, and remember the values. ++m_bufferIndex.Value; currentElement = m_buffer[m_bufferIndex.Value].First; currentKey = m_buffer[m_bufferIndex.Value].Second; return m_sharedLowFalse.Value == -1 || m_sharedLowFalse.Value > m_buffer[m_bufferIndex.Value].Second; } else { // If no false was found, the output is empty. if (m_sharedLowFalse.Value == -1) { return false; } // In the case of a skip-while, we must skip over elements whose index is lesser than the // lowest index found. Once we've exhausted the buffer, we must go back and continue // enumerating the data source until it is empty. if (m_bufferIndex.Value < m_buffer.Count - 1) { for (m_bufferIndex.Value++; m_bufferIndex.Value < m_buffer.Count; m_bufferIndex.Value++) { // If the current buffered element's index is greater than or equal to the smallest // false index found, we will yield it as a result. if (m_buffer[m_bufferIndex.Value].Second >= m_sharedLowFalse.Value) { currentElement = m_buffer[m_bufferIndex.Value].First; currentKey = m_buffer[m_bufferIndex.Value].Second; return true; } } } // Lastly, so long as our input still has elements, they will be yieldable. if (m_source.MoveNext(ref currentElement, ref currentKey)) { Contract.Assert(currentKey > m_sharedLowFalse.Value, "expected remaining element indices to be greater than smallest"); return true; } } return false; } protected override void Dispose(bool disposing) { m_source.Dispose(); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- FormsAuthenticationUserCollection.cs
- PropertiesTab.cs
- LongTypeConverter.cs
- OutputCacheModule.cs
- DataGridViewCheckBoxColumn.cs
- EncodingDataItem.cs
- Stack.cs
- XmlDocumentSchema.cs
- ImpersonationContext.cs
- DataServices.cs
- DispatcherEventArgs.cs
- SwitchCase.cs
- OleDbError.cs
- DataGridBoolColumn.cs
- XmlNamespaceManager.cs
- DataGridViewCellStyleConverter.cs
- RankException.cs
- RC2.cs
- TemplatedMailWebEventProvider.cs
- SponsorHelper.cs
- KeyboardNavigation.cs
- SHA512.cs
- PerformanceCountersElement.cs
- ViewgenGatekeeper.cs
- EventRouteFactory.cs
- SoapFault.cs
- NamespaceQuery.cs
- LayoutManager.cs
- Expression.cs
- IsolatedStoragePermission.cs
- SQLBoolean.cs
- CmsInterop.cs
- WebServiceMethodData.cs
- _LoggingObject.cs
- sqlser.cs
- CatalogPart.cs
- ControlIdConverter.cs
- SafeBitVector32.cs
- BitConverter.cs
- DocumentPageViewAutomationPeer.cs
- Transform3D.cs
- ResourceDisplayNameAttribute.cs
- ObjectStateEntryDbUpdatableDataRecord.cs
- ListBoxItem.cs
- DataObject.cs
- _FixedSizeReader.cs
- TypeLibConverter.cs
- CuspData.cs
- Evidence.cs
- EntityDesignerUtils.cs
- TabItemAutomationPeer.cs
- SemanticKeyElement.cs
- SafeWaitHandle.cs
- ImageAutomationPeer.cs
- XslAst.cs
- HwndSource.cs
- PropertyExpression.cs
- _SSPISessionCache.cs
- NamedObject.cs
- TransactionException.cs
- Cloud.cs
- ElementProxy.cs
- CapiSymmetricAlgorithm.cs
- sitestring.cs
- CmsUtils.cs
- SecurityKeyIdentifierClause.cs
- SHA256.cs
- TableParaClient.cs
- ServiceParser.cs
- EntityDesignerUtils.cs
- XsdDateTime.cs
- BitStack.cs
- Compiler.cs
- TypeConvertions.cs
- XamlReaderHelper.cs
- __ConsoleStream.cs
- QilXmlReader.cs
- ValidationResult.cs
- DataGridViewAutoSizeModeEventArgs.cs
- DropShadowEffect.cs
- WebPartConnectVerb.cs
- EdmProperty.cs
- DataControlLinkButton.cs
- ServiceInfo.cs
- PermissionListSet.cs
- HtmlInputControl.cs
- OracleCommand.cs
- TriggerBase.cs
- ObjectDataSourceEventArgs.cs
- NumberFormatInfo.cs
- HttpClientProtocol.cs
- MouseWheelEventArgs.cs
- HtmlElement.cs
- DataColumnCollection.cs
- HttpChannelHelper.cs
- SslStreamSecurityUpgradeProvider.cs
- PersonalizationStateQuery.cs
- DataGridDesigner.cs
- StaticSiteMapProvider.cs
- Enlistment.cs