Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Core / System / Linq / Parallel / QueryOperators / PartitionerQueryOperator.cs / 1305376 / PartitionerQueryOperator.cs
// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// PartitionerQueryOperator.cs
//
// [....]
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Linq.Parallel;
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// A QueryOperator that represents the output of the query partitioner.AsParallel().
    /// </summary>
    /// <typeparam name="TElement">The type of the elements produced by the partitioner.</typeparam>
    internal class PartitionerQueryOperator<TElement> : QueryOperator<TElement>
    {
        private Partitioner<TElement> m_partitioner; // The partitioner to use as data source.

        /// <summary>
        /// Creates an operator that reads its data from the given partitioner.
        /// </summary>
        /// <param name="partitioner">The partitioner to use as data source.</param>
        internal PartitionerQueryOperator(Partitioner<TElement> partitioner)
            : base(false, QuerySettings.Empty)
        {
            m_partitioner = partitioner;
        }

        /// <summary>
        /// Whether the underlying partitioner is an <see cref="OrderablePartitioner{TSource}"/>.
        /// </summary>
        internal bool Orderable
        {
            get { return m_partitioner is OrderablePartitioner<TElement>; }
        }

        /// <summary>
        /// Opens the operator by wrapping the partitioner and the query settings in a results object.
        /// </summary>
        internal override QueryResults<TElement> Open(QuerySettings settings, bool preferStriping)
        {
            // Notice that the preferStriping argument is not used. Partitioner<T> does not support
            // striped partitioning.
            return new PartitionerQueryOperatorResults(m_partitioner, settings);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        // The sequence is produced by asking the partitioner for a single partition and
        // draining it. The token argument is not consulted here.
        //

        internal override IEnumerable<TElement> AsSequentialQuery(CancellationToken token)
        {
            using (IEnumerator<TElement> enumerator = m_partitioner.GetPartitions(1)[0])
            {
                while (enumerator.MoveNext())
                {
                    yield return enumerator.Current;
                }
            }
        }

        //---------------------------------------------------------------------------------------
        // The state of the order index of the results returned by this operator.
        //

        internal override OrdinalIndexState OrdinalIndexState
        {
            get { return GetOrdinalIndexState(m_partitioner); }
        }

        /// <summary>
        /// Determines the OrdinalIndexState for a partitioner.
        /// </summary>
        /// <param name="partitioner">The partitioner to inspect.</param>
        /// <returns>
        /// <c>Shuffled</c> when the partitioner is not orderable or its keys are not ordered in each
        /// partition; <c>Correct</c> when the keys are ordered in each partition and normalized;
        /// <c>Increasing</c> when the keys are ordered in each partition but not normalized.
        /// </returns>
        internal static OrdinalIndexState GetOrdinalIndexState(Partitioner<TElement> partitioner)
        {
            OrderablePartitioner<TElement> orderablePartitioner = partitioner as OrderablePartitioner<TElement>;

            if (orderablePartitioner == null)
            {
                return OrdinalIndexState.Shuffled;
            }

            if (orderablePartitioner.KeysOrderedInEachPartition)
            {
                if (orderablePartitioner.KeysNormalized)
                {
                    return OrdinalIndexState.Correct;
                }
                else
                {
                    return OrdinalIndexState.Increasing;
                }
            }
            else
            {
                return OrdinalIndexState.Shuffled;
            }
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        /// <summary>
        /// QueryResults for a PartitionerQueryOperator.
        /// </summary>
        private class PartitionerQueryOperatorResults : QueryResults<TElement>
        {
            private Partitioner<TElement> m_partitioner; // The data source for the query
            private QuerySettings m_settings; // Settings collected from the query

            internal PartitionerQueryOperatorResults(Partitioner<TElement> partitioner, QuerySettings settings)
            {
                m_partitioner = partitioner;
                m_settings = settings;
            }

            /// <summary>
            /// Asks the partitioner for one partition per degree of parallelism, wraps each partition
            /// in a QueryOperatorEnumerator, and hands the resulting partitioned stream to the recipient.
            /// </summary>
            /// <param name="recipient">The recipient that receives the partitioned stream.</param>
            /// <exception cref="InvalidOperationException">
            /// The partitioner returned a null partition list, a list whose count differs from the
            /// requested partition count, or a list containing a null partition.
            /// </exception>
            internal override void GivePartitionedStream(IPartitionedStreamRecipient<TElement> recipient)
            {
                Contract.Assert(m_settings.DegreeOfParallelism.HasValue);
                int partitionCount = m_settings.DegreeOfParallelism.Value;

                OrderablePartitioner<TElement> orderablePartitioner = m_partitioner as OrderablePartitioner<TElement>;

                // If the partitioner is not orderable, it will yield zeros as order keys. The order index state
                // is irrelevant.
                OrdinalIndexState indexState = (orderablePartitioner != null)
                    ? GetOrdinalIndexState(orderablePartitioner)
                    : OrdinalIndexState.Shuffled;

                PartitionedStream<TElement, int> partitions = new PartitionedStream<TElement, int>(
                    partitionCount,
                    Util.GetDefaultComparer<int>(),
                    indexState);

                if (orderablePartitioner != null)
                {
                    IList<IEnumerator<KeyValuePair<long, TElement>>> partitionerPartitions =
                        orderablePartitioner.GetOrderablePartitions(partitionCount);

                    if (partitionerPartitions == null)
                    {
                        throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartitionList));
                    }

                    if (partitionerPartitions.Count != partitionCount)
                    {
                        throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_WrongNumberOfPartitions));
                    }

                    for (int i = 0; i < partitionCount; i++)
                    {
                        IEnumerator<KeyValuePair<long, TElement>> partition = partitionerPartitions[i];
                        if (partition == null)
                        {
                            throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartition));
                        }

                        partitions[i] = new OrderablePartitionerEnumerator(partition);
                    }
                }
                else
                {
                    IList<IEnumerator<TElement>> partitionerPartitions =
                        m_partitioner.GetPartitions(partitionCount);

                    if (partitionerPartitions == null)
                    {
                        throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartitionList));
                    }

                    if (partitionerPartitions.Count != partitionCount)
                    {
                        throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_WrongNumberOfPartitions));
                    }

                    for (int i = 0; i < partitionCount; i++)
                    {
                        IEnumerator<TElement> partition = partitionerPartitions[i];
                        if (partition == null)
                        {
                            throw new InvalidOperationException(SR.GetString(SR.PartitionerQueryOperator_NullPartition));
                        }

                        partitions[i] = new PartitionerEnumerator(partition);
                    }
                }

                recipient.Receive<int>(partitions);
            }
        }

        /// <summary>
        /// Enumerator that converts an enumerator over key-value pairs exposed by a partitioner
        /// to a QueryOperatorEnumerator used by PLINQ internally.
        /// </summary>
        private class OrderablePartitionerEnumerator : QueryOperatorEnumerator<TElement, int>
        {
            private IEnumerator<KeyValuePair<long, TElement>> m_sourceEnumerator;

            internal OrderablePartitionerEnumerator(IEnumerator<KeyValuePair<long, TElement>> sourceEnumerator)
            {
                m_sourceEnumerator = sourceEnumerator;
            }

            /// <summary>
            /// Advances the source enumerator and reports the pair's value as the element and the
            /// pair's key, narrowed to int, as the order key.
            /// </summary>
            /// <returns>false when the source enumerator is exhausted; otherwise true.</returns>
            internal override bool MoveNext(ref TElement currentElement, ref int currentKey)
            {
                if (!m_sourceEnumerator.MoveNext()) return false;

                KeyValuePair<long, TElement> current = m_sourceEnumerator.Current;
                currentElement = current.Value;

                // The narrowing conversion is checked, so a key that does not fit in an int
                // raises an OverflowException instead of silently wrapping around.
                checked
                {
                    currentKey = (int)current.Key;
                }

                return true;
            }

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_sourceEnumerator != null);
                m_sourceEnumerator.Dispose();
            }
        }

        /// <summary>
        /// Enumerator that converts an enumerator over plain elements exposed by a partitioner
        /// to a QueryOperatorEnumerator used by PLINQ internally. Every element is reported
        /// with an order key of zero.
        /// </summary>
        private class PartitionerEnumerator : QueryOperatorEnumerator<TElement, int>
        {
            private IEnumerator<TElement> m_sourceEnumerator;

            internal PartitionerEnumerator(IEnumerator<TElement> sourceEnumerator)
            {
                m_sourceEnumerator = sourceEnumerator;
            }

            /// <summary>
            /// Advances the source enumerator and reports its current element with key 0.
            /// </summary>
            /// <returns>false when the source enumerator is exhausted; otherwise true.</returns>
            internal override bool MoveNext(ref TElement currentElement, ref int currentKey)
            {
                if (!m_sourceEnumerator.MoveNext()) return false;

                currentElement = m_sourceEnumerator.Current;
                currentKey = 0;

                return true;
            }

            protected override void Dispose(bool disposing)
            {
                Contract.Assert(m_sourceEnumerator != null);
                m_sourceEnumerator.Dispose();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DirectoryObjectSecurity.cs
- LinqDataSourceDisposeEventArgs.cs
- DialogBaseForm.cs
- SqlConnectionFactory.cs
- UpDownBase.cs
- Preprocessor.cs
- StylusOverProperty.cs
- AnimationTimeline.cs
- GridViewColumnCollection.cs
- List.cs
- FloaterBaseParagraph.cs
- MachineKeyConverter.cs
- WorkflowRuntimeService.cs
- JpegBitmapDecoder.cs
- PriorityBinding.cs
- IdentitySection.cs
- MimeFormatExtensions.cs
- ReturnValue.cs
- DataKeyArray.cs
- SiteMapProvider.cs
- Fx.cs
- DataGridViewCellParsingEventArgs.cs
- ReceiveMessageRecord.cs
- AddressHeaderCollection.cs
- HtmlTableRowCollection.cs
- CompoundFileReference.cs
- TreeView.cs
- EmbossBitmapEffect.cs
- ClientData.cs
- LocalizeDesigner.cs
- MsmqNonTransactedPoisonHandler.cs
- ContentPlaceHolder.cs
- EnumMember.cs
- RuleInfoComparer.cs
- WindowShowOrOpenTracker.cs
- RequestCachePolicyConverter.cs
- AutomationPattern.cs
- NativeMethods.cs
- UpDownEvent.cs
- AddInSegmentDirectoryNotFoundException.cs
- DataColumnPropertyDescriptor.cs
- EdmFunctionAttribute.cs
- PassportAuthenticationEventArgs.cs
- OleDbConnectionInternal.cs
- TcpStreams.cs
- HtmlEmptyTagControlBuilder.cs
- DockProviderWrapper.cs
- XmlNamespaceMapping.cs
- CellNormalizer.cs
- AuditLog.cs
- MailMessageEventArgs.cs
- NoClickablePointException.cs
- ProfilePropertyNameValidator.cs
- MemberRelationshipService.cs
- ReadOnlyHierarchicalDataSource.cs
- ScopelessEnumAttribute.cs
- RangeValuePatternIdentifiers.cs
- LinkTarget.cs
- InternalSafeNativeMethods.cs
- WebPartConnectionsConfigureVerb.cs
- AssociationEndMember.cs
- ToolStripOverflow.cs
- XmlBufferedByteStreamReader.cs
- CorrelationManager.cs
- ADMembershipUser.cs
- BufferedWebEventProvider.cs
- ScriptResourceInfo.cs
- EventWaitHandle.cs
- XmlTextReaderImpl.cs
- ChannelBuilder.cs
- WindowPattern.cs
- ControlType.cs
- PointIndependentAnimationStorage.cs
- XamlInt32CollectionSerializer.cs
- PanelStyle.cs
- DecoderExceptionFallback.cs
- RuleCache.cs
- GuidConverter.cs
- ImageCodecInfoPrivate.cs
- JsonWriterDelegator.cs
- TextSelectionHelper.cs
- SingleTagSectionHandler.cs
- RandomNumberGenerator.cs
- Operator.cs
- ImportedNamespaceContextItem.cs
- HttpCachePolicy.cs
- CustomCategoryAttribute.cs
- AsyncPostBackErrorEventArgs.cs
- PixelShader.cs
- ErrorRuntimeConfig.cs
- ListViewItem.cs
- SchemaImporterExtensionElementCollection.cs
- TypeToTreeConverter.cs
- ExpressionEditorAttribute.cs
- CalendarKeyboardHelper.cs
- PrivateFontCollection.cs
- SynchronizationLockException.cs
- WindowsRichEdit.cs
- CaseDesigner.xaml.cs
- NavigatorInput.cs