Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / AssociativeAggregationOperator.cs / 1305376 / AssociativeAggregationOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// AssociativeAggregationOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    // NOTE(review): the generic parameter lists in this file were reconstructed from how each
    // member is used (the extracted text had them stripped) -- confirm against the upstream file.

    /// <summary>
    /// The aggregation operator is a little unique, in that the enumerators it returns
    /// yield intermediate results instead of the final results. That's because there is
    /// one last Aggregate operation that must occur in order to perform the final reduction
    /// over the intermediate streams. In other words, the intermediate enumerators produced
    /// by this operator are never seen by other query operators or consumers directly.
    ///
    /// An aggregation performs parallel prefixing internally. Given a binary operator O,
    /// it will generate intermediate results by folding O across partitions; then it
    /// performs a final reduction by folding O across the intermediate results. The
    /// analysis engine knows about associativity and commutativity, and will ensure the
    /// style of partitioning inserted into the tree is compatible with the operator.
    ///
    /// For instance, say O is + (meaning it is AC), our input is {1,2,...,8}, and we
    /// use 4 partitions to calculate the aggregation. Sequentially this would look
    /// like this O(O(O(1,2),...),8), in other words ((1+2)+...)+8. The parallel prefix
    /// of this (w/ 4 partitions) instead calculates the intermediate aggregations, i.e.:
    /// t1 = O(1,2), t2 = O(3,4), ... t4 = O(7,8), aka t1 = 1+2, t2 = 3+4, t4 = 7+8.
    /// The final step is to aggregate O over these intermediaries, i.e.
    /// O(O(O(t1,t2),t3),t4), or ((t1+t2)+t3)+t4. This generalizes to any binary operator.
    ///
    /// Because some aggregations use different input, intermediate, and output types,
    /// we support an even more generalized aggregation type. In this model, we have
    /// three operators, an intermediate (used for the incremental aggregations), a
    /// final (used for the final summary of intermediate results), and a result selector
    /// (used to perform whatever transformation is needed on the final summary).
    /// </summary>
    /// <typeparam name="TInput">Type of the elements read from the child data source.</typeparam>
    /// <typeparam name="TIntermediate">Type of the per-partition accumulator.</typeparam>
    /// <typeparam name="TOutput">Type produced by the result selector.</typeparam>
    internal sealed class AssociativeAggregationOperator<TInput, TIntermediate, TOutput> : UnaryQueryOperator<TInput, TIntermediate>
    {
        private readonly TIntermediate m_seed; // A seed used during aggregation.
        private readonly bool m_seedIsSpecified; // Whether a seed was specified. If not, the first element will be used.
        private readonly bool m_throwIfEmpty; // Whether to throw an exception if the data source is empty.

        // An intermediate reduction function: folds one input element into a partition's accumulator.
        private readonly Func<TIntermediate, TInput, TIntermediate> m_intermediateReduce;

        // A final reduction function: combines two partition accumulators.
        private readonly Func<TIntermediate, TIntermediate, TIntermediate> m_finalReduce;

        // The result selector function: maps the final accumulator to the output value.
        private readonly Func<TIntermediate, TOutput> m_resultSelector;

        // A function that constructs seed instances. May be null, in which case m_seed is used.
        private readonly Func<TIntermediate> m_seedFactory;

        //----------------------------------------------------------------------------------------
        // Constructs a new instance of an associative operator.
        //
        // Arguments:
        //     child              - the data source to aggregate over
        //     seed               - the seed value, used when seedFactory is null
        //     seedFactory        - optional factory that produces a seed; takes precedence over seed
        //     seedIsSpecified    - whether a seed was supplied; if false the first element of each
        //                          partition is used as that partition's initial accumulator
        //     intermediateReduce - folds an input element into a partition accumulator
        //     finalReduce        - combines partition accumulators
        //     resultSelector     - maps the final accumulator to the output
        //     throwIfEmpty       - whether Aggregate() throws when there are no intermediate results
        //     options            - aggregation options; must include Associative
        //
        // Assumptions:
        //     This operator must be associative.
        //
        internal AssociativeAggregationOperator(IEnumerable<TInput> child, TIntermediate seed, Func<TIntermediate> seedFactory, bool seedIsSpecified,
                                                Func<TIntermediate, TInput, TIntermediate> intermediateReduce,
                                                Func<TIntermediate, TIntermediate, TIntermediate> finalReduce,
                                                Func<TIntermediate, TOutput> resultSelector, bool throwIfEmpty, QueryAggregationOptions options)
            : base(child)
        {
            Contract.Assert(child != null, "child data source cannot be null");
            Contract.Assert(intermediateReduce != null, "need an intermediate reduce function");
            Contract.Assert(finalReduce != null, "need a final reduce function");
            Contract.Assert(resultSelector != null, "need a result selector function");
            Contract.Assert(Enum.IsDefined(typeof(QueryAggregationOptions), options), "enum out of valid range");
            Contract.Assert((options & QueryAggregationOptions.Associative) == QueryAggregationOptions.Associative, "expected an associative operator");
            Contract.Assert(typeof(TIntermediate) == typeof(TInput) || seedIsSpecified, "seed must be specified if TIntermediate differs from TInput");

            m_seed = seed;
            m_seedFactory = seedFactory;
            m_seedIsSpecified = seedIsSpecified;
            m_intermediateReduce = intermediateReduce;
            m_finalReduce = finalReduce;
            m_resultSelector = resultSelector;
            m_throwIfEmpty = throwIfEmpty;
        }

        //---------------------------------------------------------------------------------------
        // Executes the entire query tree, and aggregates the intermediate results into the
        // final result based on the binary operators and final reduction.
        //
        // Return Value:
        //     The single result of aggregation.
        //
        // Exceptions:
        //     InvalidOperationException - no intermediate results were produced and the operator
        //                                 was constructed with throwIfEmpty == true
        //     AggregateException        - wraps any exception (other than ThreadAbortException)
        //                                 thrown by the final reduce or result selector delegates
        //
        internal TOutput Aggregate()
        {
            Contract.Assert(m_finalReduce != null);
            Contract.Assert(m_resultSelector != null);

            TIntermediate accumulator = default(TIntermediate);
            bool hadElements = false;

            // Because the final reduction is typically much cheaper than the intermediate
            // reductions over the individual partitions, and because each parallel partition
            // will do a lot of work to produce a single output element, we prefer to turn off
            // pipelining, and process the final reductions serially.
            using (IEnumerator<TIntermediate> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
            {
                // We just reduce the elements in each output partition. If the operation is associative,
                // this will yield the correct answer. If not, we should never be calling this routine.
                while (enumerator.MoveNext())
                {
                    if (hadElements)
                    {
                        // Accumulate results by passing the current accumulation and current element to
                        // the reduction operation.
                        try
                        {
                            accumulator = m_finalReduce(accumulator, enumerator.Current);
                        }
                        catch (ThreadAbortException)
                        {
                            // Do not wrap ThreadAbortExceptions
                            throw;
                        }
                        catch (Exception ex)
                        {
                            // We need to wrap all exceptions into an aggregate.
                            throw new AggregateException(ex);
                        }
                    }
                    else
                    {
                        // This is the first element. Just set the accumulator to the first element.
                        accumulator = enumerator.Current;
                        hadElements = true;
                    }
                }

                // If there were no elements, we either throw or fall back to the seed.
                if (!hadElements)
                {
                    if (m_throwIfEmpty)
                    {
                        throw new InvalidOperationException(SR.GetString(SR.NoElements));
                    }
                    else
                    {
                        // The factory, when present, takes precedence over the stored seed value.
                        accumulator = m_seedFactory == null ? m_seed : m_seedFactory();
                    }
                }
            }

            // Finally, run the selection routine to yield the final element.
            try
            {
                return m_resultSelector(accumulator);
            }
            catch (ThreadAbortException)
            {
                // Do not wrap ThreadAbortExceptions
                throw;
            }
            catch (Exception ex)
            {
                // We need to wrap all exceptions into an aggregate.
                throw new AggregateException(ex);
            }
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the child and wrapping it with
        // partitions as needed.
        //
        internal override QueryResults<TIntermediate> Open(QuerySettings settings, bool preferStriping)
        {
            // We just open the child operator.
            QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
            return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Wraps each input partition in an aggregating enumerator and hands the resulting
        // stream to the recipient. The output stream has the same partition count as the input
        // and is keyed by int: each enumerator reports its partition index as the key.
        //
        internal override void WrapPartitionedStream<TKey>(
            PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<TIntermediate> recipient,
            bool preferStriping, QuerySettings settings)
        {
            int partitionCount = inputStream.PartitionCount;
            PartitionedStream<TIntermediate, int> outputStream = new PartitionedStream<TIntermediate, int>(
                partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);

            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new AssociativeAggregationOperatorEnumerator<TKey>(
                    inputStream[i], this, i, settings.CancellationState.MergedCancellationToken);
            }

            recipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        // Not supported for this operator: asserts and then throws NotSupportedException.
        //
        internal override IEnumerable<TIntermediate> AsSequentialQuery(CancellationToken token)
        {
            Contract.Assert(false, "This method should never be called. Associative aggregation can always be parallelized.");
            throw new NotSupportedException();
        }

        //----------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //
        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator type encapsulates the intermediary aggregation over the underlying
        // (possibly partitioned) data source. It yields at most one element: the partition's
        // accumulated intermediate result.
        //
        private class AssociativeAggregationOperatorEnumerator<TKey> : QueryOperatorEnumerator<TIntermediate, int>
        {
            private readonly QueryOperatorEnumerator<TInput, TKey> m_source; // The source data.
            private readonly AssociativeAggregationOperator<TInput, TIntermediate, TOutput> m_reduceOperator; // The operator.
            private readonly int m_partitionIndex; // The index of this partition.
            private readonly CancellationToken m_cancellationToken;
            private bool m_accumulated; // Whether we've accumulated already. (false-sharing risk, but only written once)

            //----------------------------------------------------------------------------------------
            // Instantiates a new aggregation operator enumerator.
            //
            internal AssociativeAggregationOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source,
                                                              AssociativeAggregationOperator<TInput, TIntermediate, TOutput> reduceOperator,
                                                              int partitionIndex, CancellationToken cancellationToken)
            {
                Contract.Assert(source != null);
                Contract.Assert(reduceOperator != null);

                m_source = source;
                m_reduceOperator = reduceOperator;
                m_partitionIndex = partitionIndex;
                m_cancellationToken = cancellationToken;
            }

            //----------------------------------------------------------------------------------------
            // This API, upon the first time calling it, walks the entire source query tree. It begins
            // with an accumulator value set to the aggregation operator's seed (or, when no seed was
            // specified, to the first source element), and always passes the accumulator along with
            // the current element from the data source to the binary intermediary aggregation
            // operator. The return value is kept in the accumulator. At the end, we will have our
            // intermediate result, ready for final aggregation.
            //
            // Return Value:
            //     true exactly once, if the source produced at least one element; currentElement then
            //     holds the accumulated value and currentKey the partition index. false on every
            //     later call, or if the source was empty.
            //
            internal override bool MoveNext(ref TIntermediate currentElement, ref int currentKey)
            {
                Contract.Assert(m_reduceOperator != null);
                Contract.Assert(m_reduceOperator.m_intermediateReduce != null, "expected a compiled operator");

                // Only produce a single element. Return false if MoveNext() was already called before.
                if (m_accumulated)
                {
                    return false;
                }
                m_accumulated = true;

                bool hadNext = false;
                TIntermediate accumulator = default(TIntermediate);

                // Initialize the accumulator.
                if (m_reduceOperator.m_seedIsSpecified)
                {
                    // If the seed is specified, initialize accumulator to the seed value.
                    // The factory, when present, takes precedence over the stored seed value.
                    accumulator = m_reduceOperator.m_seedFactory == null
                                      ? m_reduceOperator.m_seed
                                      : m_reduceOperator.m_seedFactory();
                }
                else
                {
                    // If the seed is not specified, then we take the first element as the seed.
                    // Seed may be unspecified only if TInput is the same as TIntermediate.
                    Contract.Assert(typeof(TInput) == typeof(TIntermediate));

                    TInput acc = default(TInput);
                    TKey accKeyUnused = default(TKey);
                    if (!m_source.MoveNext(ref acc, ref accKeyUnused))
                    {
                        return false;
                    }
                    hadNext = true;

                    // The types are asserted equal above, so the cast through object is a type identity conversion.
                    accumulator = (TIntermediate)((object)acc);
                }

                // Scan through the source and accumulate the result.
                TInput input = default(TInput);
                TKey keyUnused = default(TKey);
                int i = 0;
                while (m_source.MoveNext(ref input, ref keyUnused))
                {
                    // Check for cancellation only when the masked iteration counter is zero,
                    // rather than on every element.
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(m_cancellationToken);
                    }
                    hadNext = true;
                    accumulator = m_reduceOperator.m_intermediateReduce(accumulator, input);
                }

                if (hadNext)
                {
                    currentElement = accumulator;
                    currentKey = m_partitionIndex; // A reduction's "index" is just its partition number.
                    return true;
                }

                return false;
            }

            //----------------------------------------------------------------------------------------
            // Disposes the underlying source enumerator.
            //
            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_source != null);
                m_source.Dispose();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ReferenceSchema.cs
- SQLBinary.cs
- CharUnicodeInfo.cs
- CompositeFontFamily.cs
- MSAAEventDispatcher.cs
- UnsafeNativeMethods.cs
- TimeZoneInfo.cs
- ProgressPage.cs
- SqlDataSourceStatusEventArgs.cs
- FrugalMap.cs
- Documentation.cs
- CompoundFileStorageReference.cs
- CatalogPartChrome.cs
- EntityCommandCompilationException.cs
- CompilerGeneratedAttribute.cs
- SimplePropertyEntry.cs
- MobileControlsSectionHandler.cs
- PeerNameRecordCollection.cs
- M3DUtil.cs
- Util.cs
- ThemeDictionaryExtension.cs
- TdsRecordBufferSetter.cs
- AnchoredBlock.cs
- HtmlDocument.cs
- Int64Converter.cs
- StickyNoteAnnotations.cs
- IIS7WorkerRequest.cs
- ZipIOFileItemStream.cs
- Stroke.cs
- CompatibleComparer.cs
- AccessorTable.cs
- LazyTextWriterCreator.cs
- CngKeyBlobFormat.cs
- CodeEntryPointMethod.cs
- HitTestParameters.cs
- Drawing.cs
- DropDownButton.cs
- XmlLanguage.cs
- PasswordBox.cs
- EnumValAlphaComparer.cs
- EntityFunctions.cs
- MemoryRecordBuffer.cs
- ContextQuery.cs
- QueryParameter.cs
- XmlUrlResolver.cs
- CodeParameterDeclarationExpressionCollection.cs
- StateMachineDesignerPaint.cs
- DataListCommandEventArgs.cs
- SoapFormatterSinks.cs
- ServiceHttpModule.cs
- ProjectedSlot.cs
- RootBuilder.cs
- ImmutableAssemblyCacheEntry.cs
- ContainerParagraph.cs
- SoapObjectWriter.cs
- HttpsHostedTransportConfiguration.cs
- InputElement.cs
- DataExpression.cs
- FirstMatchCodeGroup.cs
- input.cs
- IPGlobalProperties.cs
- RectAnimationUsingKeyFrames.cs
- OracleRowUpdatingEventArgs.cs
- UnsafeNativeMethods.cs
- FixedTextContainer.cs
- DetailsViewModeEventArgs.cs
- DelegateHelpers.Generated.cs
- SynchronizationContext.cs
- DataGridViewColumn.cs
- ZoneIdentityPermission.cs
- PublisherMembershipCondition.cs
- CmsUtils.cs
- CommandExpr.cs
- Convert.cs
- MonitorWrapper.cs
- CmsInterop.cs
- XDRSchema.cs
- XNameConverter.cs
- TextCollapsingProperties.cs
- TraceContextRecord.cs
- DataColumnMappingCollection.cs
- DetailsViewDeleteEventArgs.cs
- IResourceProvider.cs
- mansign.cs
- Camera.cs
- RawStylusInputReport.cs
- InheritanceUI.cs
- CheckBoxPopupAdapter.cs
- TransformerConfigurationWizardBase.cs
- FixedPage.cs
- CallbackValidatorAttribute.cs
- ScriptingJsonSerializationSection.cs
- WithParamAction.cs
- Timeline.cs
- SplineQuaternionKeyFrame.cs
- BaseTemplateParser.cs
- TextEditorTyping.cs
- TextBox.cs
- KeyedQueue.cs
- AllMembershipCondition.cs