Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / MsmqReceiveHelper.cs / 1 / MsmqReceiveHelper.cs
//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------- namespace System.ServiceModel.Channels { using System.ServiceModel.Diagnostics; using System.Threading; using System.Transactions; sealed class MsmqReceiveHelper { IPoisonHandlingStrategy poisonHandler; string queueName; MsmqQueue queue; MsmqReceiveParameters receiveParameters; Uri uri; string instanceId; IMsmqMessagePool pool; MsmqInputChannelBase channel; MsmqChannelListenerBase listener; ServiceModelActivity activity; internal MsmqReceiveHelper(MsmqReceiveParameters receiveParameters, Uri uri, IMsmqMessagePool messagePool, MsmqInputChannelBase channel, MsmqChannelListenerBase listener) { this.queueName = receiveParameters.AddressTranslator.UriToFormatName(uri); this.receiveParameters = receiveParameters; this.uri = uri; this.queue = new MsmqQueue(this.receiveParameters.AddressTranslator.UriToFormatName(uri), UnsafeNativeMethods.MQ_RECEIVE_ACCESS); this.instanceId = uri.ToString().ToUpperInvariant(); this.pool = messagePool; this.poisonHandler = Msmq.CreatePoisonHandler(this); this.channel = channel; this.listener = listener; } internal ServiceModelActivity Activity { get { return this.activity; } } IPoisonHandlingStrategy PoisonHandler { get { return this.poisonHandler; } } internal MsmqReceiveParameters MsmqReceiveParameters { get { return this.receiveParameters; } } internal MsmqInputChannelBase Channel { get { return this.channel; } } internal MsmqChannelListenerBase ChannelListener { get { return this.listener; } } internal Uri ListenUri { get { return this.uri; } } internal string InstanceId { get { return this.instanceId; } } internal MsmqQueue Queue { get { return this.queue; } } internal bool Transactional { get { return this.receiveParameters.ExactlyOnce; } } internal void Open() { this.activity = MsmqDiagnostics.StartListenAtActivity(this); using (MsmqDiagnostics.BoundOpenOperation(this)) { this.queue.EnsureOpen(); this.poisonHandler.Open(); } } internal void Close() { using (ServiceModelActivity.BoundOperation(this.Activity)) { this.poisonHandler.Dispose(); this.queue.Dispose(); } ServiceModelActivity.Stop(this.activity); } internal MsmqInputMessage TakeMessage() { return this.pool.TakeMessage(); } internal void ReturnMessage(MsmqInputMessage message) { this.pool.ReturnMessage(message); } internal static void TryAbortTransactionCurrent() { if (null != Transaction.Current) { try { Transaction.Current.Rollback(); } catch (TransactionAbortedException ex) { MsmqDiagnostics.ExpectedException(ex); } catch (ObjectDisposedException ex) { MsmqDiagnostics.ExpectedException(ex); } } } internal void DropOrRejectReceivedMessage(MsmqMessageProperty messageProperty, bool reject) { if (this.Transactional) { TryAbortTransactionCurrent(); IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(messageProperty.LookupId); MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown; do { using (MsmqEmptyMessage emptyMessage = new MsmqEmptyMessage()) { using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew)) { result = this.Queue.TryReceiveByLookupId(messageProperty.LookupId, emptyMessage, MsmqTransactionMode.CurrentOrThrow); if (MsmqQueue.MoveReceiveResult.Succeeded == result && reject) this.Queue.MarkMessageRejected(messageProperty.LookupId); scope.Complete(); } } if (result == MsmqQueue.MoveReceiveResult.Succeeded) MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, Msmq.IsRejectMessageSupported); if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction) break; } while (postRollback.AnotherTryNeeded()); } else { MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, false); } } // internal static void MoveReceivedMessage(MsmqQueue queueFrom, MsmqQueue queueTo, long lookupId) { TryAbortTransactionCurrent(); IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(lookupId); MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown; do { result = queueFrom.TryMoveMessage(lookupId, queueTo, MsmqTransactionMode.Single); if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction) break; } while (postRollback.AnotherTryNeeded()); } internal void FinalDisposition(MsmqMessageProperty messageProperty) { this.poisonHandler.FinalDisposition(messageProperty); } // WaitForMessage internal bool WaitForMessage(TimeSpan timeout) { using (MsmqEmptyMessage message = new MsmqEmptyMessage()) { return (MsmqQueue.ReceiveResult.Timeout != this.queue.TryPeek(message, timeout)); } } // internal IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) { return new WaitForMessageAsyncResult(this.queue, timeout, callback, state); } // public bool EndWaitForMessage(IAsyncResult result) { return WaitForMessageAsyncResult.End(result); } internal bool TryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, out MsmqMessageProperty property) { property = null; MsmqQueue.ReceiveResult receiveResult = this.Queue.TryReceive(msmqMessage, timeout, transactionMode); if (MsmqQueue.ReceiveResult.OperationCancelled == receiveResult) return true; if (MsmqQueue.ReceiveResult.Timeout == receiveResult) return false; else { property = new MsmqMessageProperty(msmqMessage); if (this.Transactional) { if (this.PoisonHandler.CheckAndHandlePoisonMessage(property)) { long lookupId = property.LookupId; property = null; throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new MsmqPoisonMessageException(lookupId)); } } return true; } } // internal IAsyncResult BeginTryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, AsyncCallback callback, object state) { if (this.receiveParameters.ExactlyOnce) return new TryTransactedReceiveAsyncResult(this, msmqMessage, timeout, transactionMode, callback, state); else return new TryNonTransactedReceiveAsyncResult(this, msmqMessage, timeout, callback, state); } // internal bool EndTryReceive(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty msmqProperty) { msmqMessage = null; msmqProperty = null; if (null == result) throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result"); if (this.receiveParameters.ExactlyOnce) { TryTransactedReceiveAsyncResult receiveResult = result as TryTransactedReceiveAsyncResult; if (null == receiveResult) throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult)); return TryTransactedReceiveAsyncResult.End(receiveResult, out msmqMessage, out msmqProperty); } else { TryNonTransactedReceiveAsyncResult receiveResult = result as TryNonTransactedReceiveAsyncResult; if (null == receiveResult) throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult)); return TryNonTransactedReceiveAsyncResult.End(receiveResult, out msmqMessage, out msmqProperty); } } // TryReceiveAsyncResult (tx version) class TryTransactedReceiveAsyncResult : AsyncResult { bool expired; MsmqReceiveHelper receiver; TimeoutHelper timeoutHelper; Transaction txCurrent; MsmqInputMessage msmqMessage; MsmqMessageProperty messageProperty; MsmqTransactionMode transactionMode; static WaitCallback onComplete = OnComplete; internal TryTransactedReceiveAsyncResult( MsmqReceiveHelper receiver, MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, AsyncCallback callback, object state) : base(callback, state) { this.timeoutHelper = new TimeoutHelper(timeout); this.txCurrent = Transaction.Current; this.receiver = receiver; this.msmqMessage = msmqMessage; this.transactionMode = transactionMode; IOThreadScheduler.ScheduleCallback(onComplete, this); } static void OnComplete(object parameter) { TryTransactedReceiveAsyncResult result = parameter as TryTransactedReceiveAsyncResult; Transaction savedTransaction = Transaction.Current; Transaction.Current = result.txCurrent; try { Exception ex = null; try { result.expired = ! result.receiver.TryReceive(result.msmqMessage, result.timeoutHelper.RemainingTime(), result.transactionMode, out result.messageProperty); } catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; ex = e; } result.Complete(false, ex); } finally { Transaction.Current = savedTransaction; } } internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property) { TryTransactedReceiveAsyncResult receiveResult = AsyncResult.End(result); msmqMessage = receiveResult.msmqMessage; property = receiveResult.messageProperty; return ! receiveResult.expired; } } // TryReceiveAsyncResult (non-tx version) class TryNonTransactedReceiveAsyncResult : AsyncResult { MsmqQueue.ReceiveResult receiveResult; MsmqReceiveHelper receiver; MsmqInputMessage msmqMessage; static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic)); internal TryNonTransactedReceiveAsyncResult(MsmqReceiveHelper receiver, MsmqInputMessage msmqMessage, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.receiver = receiver; this.msmqMessage = msmqMessage; receiver.Queue.BeginTryReceive(msmqMessage, timeout, onCompleteStatic, this); } static void OnCompleteStatic(IAsyncResult result) { (result.AsyncState as TryNonTransactedReceiveAsyncResult).OnComplete(result); } void OnComplete(IAsyncResult result) { Exception ex = null; try { receiveResult = receiver.Queue.EndTryReceive(result); } catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; ex = e; } Complete(result.CompletedSynchronously, ex); } internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property) { TryNonTransactedReceiveAsyncResult asyncResult = AsyncResult.End (result); msmqMessage = asyncResult.msmqMessage; property = null; if (MsmqQueue.ReceiveResult.Timeout == asyncResult.receiveResult) return false; else if (MsmqQueue.ReceiveResult.OperationCancelled == asyncResult.receiveResult) return true; else { property = new MsmqMessageProperty(msmqMessage); return true; } } } // WaitForMessageAsyncResult class WaitForMessageAsyncResult : TypedAsyncResult { MsmqQueue msmqQueue; MsmqEmptyMessage msmqMessage; static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic)); public WaitForMessageAsyncResult(MsmqQueue msmqQueue, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.msmqMessage = new MsmqEmptyMessage(); this.msmqQueue = msmqQueue; this.msmqQueue.BeginPeek(this.msmqMessage, timeout, onCompleteStatic, this); } static void OnCompleteStatic(IAsyncResult result) { ((WaitForMessageAsyncResult)result.AsyncState).OnComplete(result); } void OnComplete(IAsyncResult result) { this.msmqMessage.Dispose(); MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown; Exception ex = null; try { receiveResult = this.msmqQueue.EndPeek(result); } catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; ex = e; } if (null == ex) Complete(receiveResult != MsmqQueue.ReceiveResult.Timeout, result.CompletedSynchronously); else Complete(result.CompletedSynchronously, ex); } } // PostRollbackErrorStrategy interface IPostRollbackErrorStrategy { bool AnotherTryNeeded(); } // SimplePostRollbackErrorStrategy class SimplePostRollbackErrorStrategy : IPostRollbackErrorStrategy { const int Attempts = 50; const int MillisecondsToSleep = 100; int attemptsLeft = Attempts; long lookupId; internal SimplePostRollbackErrorStrategy(long lookupId) { this.lookupId = lookupId; } bool IPostRollbackErrorStrategy.AnotherTryNeeded() { if (--this.attemptsLeft > 0) { if (attemptsLeft == (Attempts - 1)) MsmqDiagnostics.MessageLockedUnderTheTransaction(lookupId); Thread.Sleep(TimeSpan.FromMilliseconds(MillisecondsToSleep)); return true; } else { MsmqDiagnostics.MoveOrDeleteAttemptFailed(lookupId); return false; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- GCHandleCookieTable.cs
- Button.cs
- RewritingProcessor.cs
- XamlSerializerUtil.cs
- BamlStream.cs
- Int64AnimationBase.cs
- FileLevelControlBuilderAttribute.cs
- Compiler.cs
- ListViewDeleteEventArgs.cs
- ProxyGenerator.cs
- CheckBoxFlatAdapter.cs
- FixedSOMTable.cs
- MenuAdapter.cs
- COM2IDispatchConverter.cs
- PrinterSettings.cs
- VersionedStream.cs
- PenLineJoinValidation.cs
- TemplateXamlParser.cs
- XmlSecureResolver.cs
- TwoPhaseCommit.cs
- OptionalColumn.cs
- SetIterators.cs
- ResourceExpressionBuilder.cs
- DeviceSpecificChoice.cs
- TextAnchor.cs
- FixedPosition.cs
- PropertyOrder.cs
- DbConnectionPoolGroupProviderInfo.cs
- Codec.cs
- EditableRegion.cs
- SingleStorage.cs
- OutputCacheSettings.cs
- WithStatement.cs
- PackagePart.cs
- TokenizerHelper.cs
- StandardOleMarshalObject.cs
- AssemblyResourceLoader.cs
- ZipIOZip64EndOfCentralDirectoryBlock.cs
- CoreSwitches.cs
- ToolBar.cs
- ImpersonateTokenRef.cs
- DataTransferEventArgs.cs
- RestHandler.cs
- FolderNameEditor.cs
- RectAnimationUsingKeyFrames.cs
- DataControlFieldCell.cs
- PasswordPropertyTextAttribute.cs
- ImageListUtils.cs
- ContainerActivationHelper.cs
- CheckBoxDesigner.cs
- ProgressChangedEventArgs.cs
- TypeBuilder.cs
- VScrollBar.cs
- WpfKnownMember.cs
- MediaContextNotificationWindow.cs
- HttpApplicationStateWrapper.cs
- IndexerNameAttribute.cs
- RootNamespaceAttribute.cs
- IOThreadTimer.cs
- StylusCaptureWithinProperty.cs
- ReadOnlyDataSource.cs
- EntityUtil.cs
- StringFreezingAttribute.cs
- SqlTypeSystemProvider.cs
- mda.cs
- XmlCharCheckingWriter.cs
- SQLDecimalStorage.cs
- LineBreak.cs
- Helper.cs
- DummyDataSource.cs
- CollectionView.cs
- CommonObjectSecurity.cs
- EmbeddedMailObject.cs
- AmbientValueAttribute.cs
- ToolStripActionList.cs
- SoapSchemaImporter.cs
- DefaultTextStore.cs
- IndentTextWriter.cs
- ReceiveActivityValidator.cs
- ProcessRequestAsyncResult.cs
- TreeViewImageKeyConverter.cs
- SafeSystemMetrics.cs
- ItemCollectionEditor.cs
- OrCondition.cs
- WebPartDisplayModeCancelEventArgs.cs
- RuntimeResourceSet.cs
- RotationValidation.cs
- WebHeaderCollection.cs
- OpenTypeLayout.cs
- DbModificationCommandTree.cs
- QilGeneratorEnv.cs
- WebPartVerbsEventArgs.cs
- HttpRawResponse.cs
- ActivationServices.cs
- SqlOuterApplyReducer.cs
- DataBindingExpressionBuilder.cs
- SqlParameterizer.cs
- VarInfo.cs
- ValueHandle.cs
- GeneralTransform.cs