Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / MsmqReceiveHelper.cs / 1 / MsmqReceiveHelper.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System.ServiceModel.Diagnostics;
    using System.Threading;
    using System.Transactions;

    // Receive-side helper shared by the MSMQ input channels and listeners.
    // It owns the receive queue and the poison-message handler, hands out
    // pooled message buffers, and provides synchronous and asynchronous
    // wait/receive operations on the queue.
    sealed class MsmqReceiveHelper
    {
        IPoisonHandlingStrategy poisonHandler;
        string queueName;
        MsmqQueue queue;
        MsmqReceiveParameters receiveParameters;
        Uri uri;
        string instanceId;
        IMsmqMessagePool pool;
        MsmqInputChannelBase channel;
        MsmqChannelListenerBase listener;
        ServiceModelActivity activity;

        // Builds the helper for the queue addressed by 'uri'. The queue object is
        // created with receive access here; it is opened later, in Open().
        internal MsmqReceiveHelper(MsmqReceiveParameters receiveParameters, Uri uri, IMsmqMessagePool messagePool, MsmqInputChannelBase channel, MsmqChannelListenerBase listener)
        {
            this.queueName = receiveParameters.AddressTranslator.UriToFormatName(uri);
            this.receiveParameters = receiveParameters;
            this.uri = uri;
            // Reuse the format name computed above instead of translating the URI a second time.
            this.queue = new MsmqQueue(this.queueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS);
            this.instanceId = uri.ToString().ToUpperInvariant();
            this.pool = messagePool;
            // NOTE(review): the poison handler is created before channel/listener are
            // assigned, exactly as in the original ordering - confirm the factory does
            // not read those properties eagerly.
            this.poisonHandler = Msmq.CreatePoisonHandler(this);
            this.channel = channel;
            this.listener = listener;
        }

        // Activity started by Open(); null until Open() has been called.
        internal ServiceModelActivity Activity
        {
            get { return this.activity; }
        }

        IPoisonHandlingStrategy PoisonHandler
        {
            get { return this.poisonHandler; }
        }

        internal MsmqReceiveParameters MsmqReceiveParameters
        {
            get { return this.receiveParameters; }
        }

        internal MsmqInputChannelBase Channel
        {
            get { return this.channel; }
        }

        internal MsmqChannelListenerBase ChannelListener
        {
            get { return this.listener; }
        }

        internal Uri ListenUri
        {
            get { return this.uri; }
        }

        // Upper-cased (invariant) string form of the listen URI.
        internal string InstanceId
        {
            get { return this.instanceId; }
        }

        internal MsmqQueue Queue
        {
            get { return this.queue; }
        }

        // True when the receive parameters request ExactlyOnce delivery.
        internal bool Transactional
        {
            get { return this.receiveParameters.ExactlyOnce; }
        }

        // Starts the listen activity, then opens the queue and the poison handler
        // inside the bound "open" diagnostic operation.
        internal void Open()
        {
            this.activity = MsmqDiagnostics.StartListenAtActivity(this);
            using (MsmqDiagnostics.BoundOpenOperation(this))
            {
                this.queue.EnsureOpen();
                this.poisonHandler.Open();
            }
        }

        // Disposes the poison handler and the queue, then stops the activity.
        // The try/finally blocks make sure a failure in an earlier step does not
        // leave the queue undisposed or the activity running.
        internal void Close()
        {
            try
            {
                using (ServiceModelActivity.BoundOperation(this.Activity))
                {
                    try
                    {
                        this.poisonHandler.Dispose();
                    }
                    finally
                    {
                        this.queue.Dispose();
                    }
                }
            }
            finally
            {
                ServiceModelActivity.Stop(this.activity);
            }
        }

        // Takes a message buffer from the pool.
        internal MsmqInputMessage TakeMessage()
        {
            return this.pool.TakeMessage();
        }

        // Returns a message buffer to the pool.
        internal void ReturnMessage(MsmqInputMessage message)
        {
            this.pool.ReturnMessage(message);
        }

        // Rolls back the ambient transaction, if there is one. An already-aborted or
        // already-disposed transaction is reported as an expected exception and ignored.
        internal static void TryAbortTransactionCurrent()
        {
            // Read the ambient transaction once so the null check and the rollback
            // act on the same object.
            Transaction current = Transaction.Current;
            if (null != current)
            {
                try
                {
                    current.Rollback();
                }
                catch (TransactionAbortedException ex)
                {
                    MsmqDiagnostics.ExpectedException(ex);
                }
                catch (ObjectDisposedException ex)
                {
                    MsmqDiagnostics.ExpectedException(ex);
                }
            }
        }

        // Removes a previously received message from the queue, optionally marking it
        // rejected. On a transactional queue the ambient transaction is rolled back
        // first, and the message is then received by lookup id inside a new transaction
        // scope; the receive is retried while the message is still locked under a
        // transaction, as long as the post-rollback strategy allows another attempt.
        // On a non-transactional queue only the "message consumed" event is reported.
        internal void DropOrRejectReceivedMessage(MsmqMessageProperty messageProperty, bool reject)
        {
            if (this.Transactional)
            {
                TryAbortTransactionCurrent();
                IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(messageProperty.LookupId);
                MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown;
                do
                {
                    using (MsmqEmptyMessage emptyMessage = new MsmqEmptyMessage())
                    {
                        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
                        {
                            result = this.Queue.TryReceiveByLookupId(messageProperty.LookupId, emptyMessage, MsmqTransactionMode.CurrentOrThrow);
                            if (MsmqQueue.MoveReceiveResult.Succeeded == result && reject)
                            {
                                this.Queue.MarkMessageRejected(messageProperty.LookupId);
                            }
                            scope.Complete();
                        }
                    }

                    if (result == MsmqQueue.MoveReceiveResult.Succeeded)
                    {
                        MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, Msmq.IsRejectMessageSupported);
                    }

                    // Only a message locked under a transaction is worth retrying.
                    if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction)
                    {
                        break;
                    }
                }
                while (postRollback.AnotherTryNeeded());
            }
            else
            {
                MsmqDiagnostics.MessageConsumed(instanceId, messageProperty.MessageId, false);
            }
        }

        // Rolls back the ambient transaction and moves the message identified by
        // 'lookupId' from 'queueFrom' to 'queueTo', retrying while the message is
        // locked under a transaction and the post-rollback strategy allows it.
        internal static void MoveReceivedMessage(MsmqQueue queueFrom, MsmqQueue queueTo, long lookupId)
        {
            TryAbortTransactionCurrent();
            IPostRollbackErrorStrategy postRollback = new SimplePostRollbackErrorStrategy(lookupId);
            MsmqQueue.MoveReceiveResult result = MsmqQueue.MoveReceiveResult.Unknown;
            do
            {
                result = queueFrom.TryMoveMessage(lookupId, queueTo, MsmqTransactionMode.Single);
                if (result != MsmqQueue.MoveReceiveResult.MessageLockedUnderTransaction)
                {
                    break;
                }
            }
            while (postRollback.AnotherTryNeeded());
        }

        // Forwards the final disposition of a message to the poison handler.
        internal void FinalDisposition(MsmqMessageProperty messageProperty)
        {
            this.poisonHandler.FinalDisposition(messageProperty);
        }

        // WaitForMessage
        // Peeks the queue for up to 'timeout'. Returns false only when the peek
        // reports a timeout; any other peek result yields true.
        internal bool WaitForMessage(TimeSpan timeout)
        {
            using (MsmqEmptyMessage message = new MsmqEmptyMessage())
            {
                return (MsmqQueue.ReceiveResult.Timeout != this.queue.TryPeek(message, timeout));
            }
        }

        // Asynchronous counterpart of WaitForMessage.
        internal IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new WaitForMessageAsyncResult(this.queue, timeout, callback, state);
        }

        // Completes BeginWaitForMessage; the returned value is the one passed to
        // Complete by WaitForMessageAsyncResult (false on timeout).
        public bool EndWaitForMessage(IAsyncResult result)
        {
            return WaitForMessageAsyncResult.End(result);
        }

        // Receives one message into 'msmqMessage'.
        //   returns false - the receive timed out ('property' is null);
        //   returns true  - the operation was cancelled ('property' is null), or a
        //                   message was received ('property' describes it).
        // On a transactional queue a message that the poison handler reports as
        // poison causes an MsmqPoisonMessageException carrying its lookup id.
        internal bool TryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, out MsmqMessageProperty property)
        {
            property = null;

            MsmqQueue.ReceiveResult receiveResult = this.Queue.TryReceive(msmqMessage, timeout, transactionMode);
            if (MsmqQueue.ReceiveResult.OperationCancelled == receiveResult)
            {
                return true;
            }
            if (MsmqQueue.ReceiveResult.Timeout == receiveResult)
            {
                return false;
            }

            property = new MsmqMessageProperty(msmqMessage);
            if (this.Transactional)
            {
                if (this.PoisonHandler.CheckAndHandlePoisonMessage(property))
                {
                    // Capture the id first: the out parameter is cleared before throwing.
                    long lookupId = property.LookupId;
                    property = null;
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperCritical(new MsmqPoisonMessageException(lookupId));
                }
            }
            return true;
        }

        // Starts an asynchronous receive. The async result type depends on whether
        // the receive parameters request ExactlyOnce delivery.
        internal IAsyncResult BeginTryReceive(MsmqInputMessage msmqMessage, TimeSpan timeout, MsmqTransactionMode transactionMode, AsyncCallback callback, object state)
        {
            if (this.receiveParameters.ExactlyOnce)
            {
                return new TryTransactedReceiveAsyncResult(this, msmqMessage, timeout, transactionMode, callback, state);
            }
            else
            {
                return new TryNonTransactedReceiveAsyncResult(this, msmqMessage, timeout, callback, state);
            }
        }

        // Completes BeginTryReceive. Throws an argument-null exception when 'result'
        // is null, and an argument exception when 'result' is not the async result
        // type that BeginTryReceive creates for the current ExactlyOnce setting.
        internal bool EndTryReceive(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty msmqProperty)
        {
            msmqMessage = null;
            msmqProperty = null;

            if (null == result)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
            }

            if (this.receiveParameters.ExactlyOnce)
            {
                TryTransactedReceiveAsyncResult receiveResult = result as TryTransactedReceiveAsyncResult;
                if (null == receiveResult)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult));
                }
                return TryTransactedReceiveAsyncResult.End(receiveResult, out msmqMessage, out msmqProperty);
            }
            else
            {
                TryNonTransactedReceiveAsyncResult receiveResult = result as TryNonTransactedReceiveAsyncResult;
                if (null == receiveResult)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument(SR.GetString(SR.InvalidAsyncResult));
                }
                return TryNonTransactedReceiveAsyncResult.End(receiveResult, out msmqMessage, out msmqProperty);
            }
        }

        // TryReceiveAsyncResult (tx version)
        // Captures the caller's ambient transaction at construction time and runs the
        // synchronous TryReceive on a scheduled callback with that transaction
        // installed as Transaction.Current.
        class TryTransactedReceiveAsyncResult : AsyncResult
        {
            bool expired;
            MsmqReceiveHelper receiver;
            TimeoutHelper timeoutHelper;
            Transaction txCurrent;
            MsmqInputMessage msmqMessage;
            MsmqMessageProperty messageProperty;
            MsmqTransactionMode transactionMode;
            static WaitCallback onComplete = OnComplete;

            internal TryTransactedReceiveAsyncResult(
                MsmqReceiveHelper receiver,
                MsmqInputMessage msmqMessage,
                TimeSpan timeout,
                MsmqTransactionMode transactionMode,
                AsyncCallback callback,
                object state)
                : base(callback, state)
            {
                this.timeoutHelper = new TimeoutHelper(timeout);
                this.txCurrent = Transaction.Current;
                this.receiver = receiver;
                this.msmqMessage = msmqMessage;
                this.transactionMode = transactionMode;
                IOThreadScheduler.ScheduleCallback(onComplete, this);
            }

            // Scheduled callback: performs the receive under the captured transaction
            // and always restores the thread's previous ambient transaction.
            static void OnComplete(object parameter)
            {
                // Direct cast: an unexpected state object fails here with a clear
                // InvalidCastException rather than a later null dereference.
                TryTransactedReceiveAsyncResult result = (TryTransactedReceiveAsyncResult)parameter;
                Transaction savedTransaction = Transaction.Current;
                Transaction.Current = result.txCurrent;
                try
                {
                    Exception ex = null;
                    try
                    {
                        result.expired = !result.receiver.TryReceive(result.msmqMessage, result.timeoutHelper.RemainingTime(), result.transactionMode, out result.messageProperty);
                    }
                    catch (Exception e)
                    {
                        if (DiagnosticUtility.IsFatal(e))
                        {
                            throw;
                        }
                        // Non-fatal failures are handed to Complete instead of escaping the callback.
                        ex = e;
                    }
                    result.Complete(false, ex);
                }
                finally
                {
                    Transaction.Current = savedTransaction;
                }
            }

            // Returns false when the receive expired (TryReceive returned false).
            internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property)
            {
                TryTransactedReceiveAsyncResult receiveResult = AsyncResult.End<TryTransactedReceiveAsyncResult>(result);
                msmqMessage = receiveResult.msmqMessage;
                property = receiveResult.messageProperty;
                return !receiveResult.expired;
            }
        }

        // TryReceiveAsyncResult (non-tx version)
        // Wraps the queue's own BeginTryReceive/EndTryReceive pair.
        class TryNonTransactedReceiveAsyncResult : AsyncResult
        {
            MsmqQueue.ReceiveResult receiveResult;
            MsmqReceiveHelper receiver;
            MsmqInputMessage msmqMessage;
            static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic));

            internal TryNonTransactedReceiveAsyncResult(MsmqReceiveHelper receiver, MsmqInputMessage msmqMessage, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.receiver = receiver;
                this.msmqMessage = msmqMessage;
                receiver.Queue.BeginTryReceive(msmqMessage, timeout, onCompleteStatic, this);
            }

            static void OnCompleteStatic(IAsyncResult result)
            {
                ((TryNonTransactedReceiveAsyncResult)result.AsyncState).OnComplete(result);
            }

            void OnComplete(IAsyncResult result)
            {
                Exception ex = null;
                try
                {
                    receiveResult = receiver.Queue.EndTryReceive(result);
                }
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    }
                    ex = e;
                }
                Complete(result.CompletedSynchronously, ex);
            }

            // Returns false on timeout; true when cancelled ('property' stays null) or
            // when a message was received ('property' is built from the message).
            internal static bool End(IAsyncResult result, out MsmqInputMessage msmqMessage, out MsmqMessageProperty property)
            {
                TryNonTransactedReceiveAsyncResult asyncResult = AsyncResult.End<TryNonTransactedReceiveAsyncResult>(result);
                msmqMessage = asyncResult.msmqMessage;
                property = null;

                if (MsmqQueue.ReceiveResult.Timeout == asyncResult.receiveResult)
                {
                    return false;
                }
                else if (MsmqQueue.ReceiveResult.OperationCancelled == asyncResult.receiveResult)
                {
                    return true;
                }
                else
                {
                    property = new MsmqMessageProperty(msmqMessage);
                    return true;
                }
            }
        }

        // WaitForMessageAsyncResult
        // Asynchronous peek; the typed result is false when the peek timed out.
        class WaitForMessageAsyncResult : TypedAsyncResult<bool>
        {
            MsmqQueue msmqQueue;
            MsmqEmptyMessage msmqMessage;
            static AsyncCallback onCompleteStatic = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnCompleteStatic));

            public WaitForMessageAsyncResult(MsmqQueue msmqQueue, TimeSpan timeout, AsyncCallback callback, object state)
                : base(callback, state)
            {
                this.msmqMessage = new MsmqEmptyMessage();
                this.msmqQueue = msmqQueue;
                this.msmqQueue.BeginPeek(this.msmqMessage, timeout, onCompleteStatic, this);
            }

            static void OnCompleteStatic(IAsyncResult result)
            {
                ((WaitForMessageAsyncResult)result.AsyncState).OnComplete(result);
            }

            void OnComplete(IAsyncResult result)
            {
                // The placeholder message is released before EndPeek is called.
                // NOTE(review): assumes EndPeek does not touch the message buffer - confirm.
                this.msmqMessage.Dispose();
                MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown;
                Exception ex = null;
                try
                {
                    receiveResult = this.msmqQueue.EndPeek(result);
                }
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    }
                    ex = e;
                }

                if (null == ex)
                {
                    Complete(receiveResult != MsmqQueue.ReceiveResult.Timeout, result.CompletedSynchronously);
                }
                else
                {
                    Complete(result.CompletedSynchronously, ex);
                }
            }
        }

        // PostRollbackErrorStrategy
        // Decides whether a move/receive that found the message locked under a
        // transaction should be attempted again.
        interface IPostRollbackErrorStrategy
        {
            bool AnotherTryNeeded();
        }

        // SimplePostRollbackErrorStrategy
        // Fixed-delay retry policy: each call that grants another try sleeps for
        // MillisecondsToSleep first; once the attempt budget is used up the failure
        // is reported and no further try is granted.
        class SimplePostRollbackErrorStrategy : IPostRollbackErrorStrategy
        {
            const int Attempts = 50;
            const int MillisecondsToSleep = 100;

            int attemptsLeft = Attempts;
            long lookupId;

            internal SimplePostRollbackErrorStrategy(long lookupId)
            {
                this.lookupId = lookupId;
            }

            bool IPostRollbackErrorStrategy.AnotherTryNeeded()
            {
                if (--this.attemptsLeft > 0)
                {
                    // Report the locked message only once, on the first retry request.
                    if (attemptsLeft == (Attempts - 1))
                    {
                        MsmqDiagnostics.MessageLockedUnderTheTransaction(lookupId);
                    }
                    Thread.Sleep(TimeSpan.FromMilliseconds(MillisecondsToSleep));
                    return true;
                }
                else
                {
                    MsmqDiagnostics.MoveOrDeleteAttemptFailed(lookupId);
                    return false;
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- CellTreeNodeVisitors.cs
- XamlRtfConverter.cs
- TransferRequestHandler.cs
- WindowsToolbar.cs
- ArgumentOutOfRangeException.cs
- XMLSyntaxException.cs
- PageCache.cs
- DefinitionUpdate.cs
- ZipIOBlockManager.cs
- FilteredXmlReader.cs
- AssemblyFilter.cs
- BuiltInExpr.cs
- MimeMultiPart.cs
- TransportChannelListener.cs
- ListCommandEventArgs.cs
- PassportAuthenticationModule.cs
- RoleGroupCollection.cs
- Timer.cs
- WebPartCatalogCloseVerb.cs
- UncommonField.cs
- WebColorConverter.cs
- PolyQuadraticBezierSegment.cs
- SimpleType.cs
- FrameworkElement.cs
- TextPointerBase.cs
- ChtmlCommandAdapter.cs
- __Filters.cs
- DbConnectionPoolGroupProviderInfo.cs
- nulltextcontainer.cs
- AttachedPropertyMethodSelector.cs
- PropertyOrder.cs
- X509Certificate2.cs
- WebPartCloseVerb.cs
- OracleBoolean.cs
- UInt16Converter.cs
- Attributes.cs
- JoinCqlBlock.cs
- SingleAnimationUsingKeyFrames.cs
- CreateRefExpr.cs
- OrderingQueryOperator.cs
- EventMap.cs
- NameScopePropertyAttribute.cs
- ErrorCodes.cs
- ScriptServiceAttribute.cs
- Parsers.cs
- LambdaCompiler.ControlFlow.cs
- DetailsViewDeleteEventArgs.cs
- MembershipValidatePasswordEventArgs.cs
- ObjectSecurity.cs
- BuildProviderAppliesToAttribute.cs
- VisualStyleElement.cs
- Border.cs
- ReceiveErrorHandling.cs
- KerberosSecurityTokenAuthenticator.cs
- ArgIterator.cs
- WebPartEventArgs.cs
- DbConnectionStringBuilder.cs
- DataGridViewColumnCollection.cs
- FileRecordSequence.cs
- DispatcherHooks.cs
- CloudCollection.cs
- DbConnectionFactory.cs
- XmlDigitalSignatureProcessor.cs
- ObjectReaderCompiler.cs
- XmlAttributeAttribute.cs
- CodeCompiler.cs
- NavigationProperty.cs
- ImageFormat.cs
- XmlReaderSettings.cs
- GifBitmapEncoder.cs
- EntityParameter.cs
- SqlProcedureAttribute.cs
- TransportReplyChannelAcceptor.cs
- ContentDisposition.cs
- MouseCaptureWithinProperty.cs
- XmlQualifiedName.cs
- HtmlSelect.cs
- Pen.cs
- ScriptModule.cs
- DiffuseMaterial.cs
- SystemDiagnosticsSection.cs
- WriteStateInfoBase.cs
- MdImport.cs
- RowType.cs
- SmtpCommands.cs
- XmlDocumentFragment.cs
- ObjectIDGenerator.cs
- MailDefinition.cs
- FormattedText.cs
- __ComObject.cs
- FileRecordSequenceCompletedAsyncResult.cs
- MaterializeFromAtom.cs
- Vector.cs
- Utility.cs
- HttpRequestCacheValidator.cs
- TimeSpanValidator.cs
- HMACMD5.cs
- CodePrimitiveExpression.cs
- ZipIOModeEnforcingStream.cs
- HttpWebRequestElement.cs