Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / Msmq4PoisonHandler.cs / 1 / Msmq4PoisonHandler.cs
//------------------------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------------------------- namespace System.ServiceModel.Channels { using System.Collections.Generic; using System.IO; using System.Threading; using System.Transactions; sealed class Msmq4PoisonHandler : IPoisonHandlingStrategy { MsmqQueue mainQueue; MsmqQueue mainQueueForMove; MsmqQueue retryQueueForPeek; MsmqQueue retryQueueForMove; MsmqQueue poisonQueue; IOThreadTimer timer; MsmqReceiveHelper receiver; bool disposed; string poisonQueueName; string retryQueueName; string mainQueueName; MsmqRetryQueueMessage retryQueueMessage; static WaitCallback onStartPeek = new WaitCallback(StartPeek); static AsyncCallback onPeekCompleted = DiagnosticUtility.ThunkAsyncCallback(OnPeekCompleted); public Msmq4PoisonHandler(MsmqReceiveHelper receiver) { this.receiver = receiver; this.timer = new IOThreadTimer(OnTimer, null, false); this.disposed = false; this.mainQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(this.ListenUri); this.poisonQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";poison")); this.retryQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";retry")); } MsmqReceiveParameters ReceiveParameters { get { return this.receiver.MsmqReceiveParameters; } } Uri ListenUri { get { return this.receiver.ListenUri; } } public void Open() { this.mainQueue = this.receiver.Queue; this.mainQueueForMove = new MsmqQueue(this.mainQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); // Open up the poison queue (for handling poison messages). this.poisonQueue = new MsmqQueue(this.poisonQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); this.retryQueueForMove = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); this.retryQueueForPeek = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS); this.retryQueueMessage = new MsmqRetryQueueMessage(); if (Thread.CurrentThread.IsThreadPoolThread) StartPeek(this); else IOThreadScheduler.ScheduleCallback(Msmq4PoisonHandler.onStartPeek, this); } static void StartPeek(object state) { Msmq4PoisonHandler handler = state as Msmq4PoisonHandler; lock(handler) { if(! handler.disposed) { handler.retryQueueForPeek.BeginPeek(handler.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, handler); } } } public bool CheckAndHandlePoisonMessage(MsmqMessageProperty messageProperty) { if (messageProperty.AbortCount <= this.ReceiveParameters.ReceiveRetryCount) return false; int retryCycle = messageProperty.MoveCount / 2; lock(this) { if (this.disposed) throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString())); if (retryCycle >= this.ReceiveParameters.MaxRetryCycles) { FinalDisposition(messageProperty); } else { MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.retryQueueForMove, messageProperty.LookupId); MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId); } } return true; } public void FinalDisposition(MsmqMessageProperty messageProperty) { switch (this.ReceiveParameters.ReceiveErrorHandling) { case ReceiveErrorHandling.Drop: this.receiver.DropOrRejectReceivedMessage(messageProperty, false); break; case ReceiveErrorHandling.Fault: MsmqReceiveHelper.TryAbortTransactionCurrent(); if (null != this.receiver.ChannelListener) this.receiver.ChannelListener.FaultListener(); if (null != this.receiver.Channel) this.receiver.Channel.FaultChannel(); break; case ReceiveErrorHandling.Reject: this.receiver.DropOrRejectReceivedMessage(messageProperty, true); MsmqDiagnostics.PoisonMessageRejected(messageProperty.MessageId, this.receiver.InstanceId); break; case ReceiveErrorHandling.Move: MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.poisonQueue, messageProperty.LookupId); MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, true, this.receiver.InstanceId); break; default: DiagnosticUtility.DebugAssert("System.ServiceModel.Channels.Msmq4PoisonHandler.FinalDisposition(): (unexpected ReceiveErrorHandling)"); break; } } public void Dispose() { lock(this) { if(!this.disposed) { this.disposed = true; this.timer.Cancel(); if (null != this.retryQueueForPeek) this.retryQueueForPeek.Dispose(); if (null != this.retryQueueForMove) this.retryQueueForMove.Dispose(); if (null != this.poisonQueue) this.poisonQueue.Dispose(); if (null != this.mainQueueForMove) this.mainQueueForMove.Dispose(); } } } static void OnPeekCompleted(IAsyncResult result) { Msmq4PoisonHandler handler = result.AsyncState as Msmq4PoisonHandler; MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown; try { receiveResult = handler.retryQueueForPeek.EndPeek(result); } catch (MsmqException ex) { MsmqDiagnostics.ExpectedException(ex); } if (MsmqQueue.ReceiveResult.MessageReceived == receiveResult) { lock(handler) { if(!handler.disposed) { // Check the time - move it, and begin peeking again // if necessary, or wait for the timeout. DateTime lastMoveTime = MsmqDateTime.ToDateTime(handler.retryQueueMessage.LastMoveTime.Value); TimeSpan waitTime = lastMoveTime + handler.ReceiveParameters.RetryCycleDelay - DateTime.UtcNow; if (waitTime < TimeSpan.Zero) handler.OnTimer(handler); else handler.timer.Set(waitTime); } } } } void OnTimer(object state) { lock(this) { if(!this.disposed) { try { this.retryQueueForPeek.TryMoveMessage(this.retryQueueMessage.LookupId.Value, this.mainQueueForMove, MsmqTransactionMode.Single); } catch (MsmqException ex) { MsmqDiagnostics.ExpectedException(ex); } this.retryQueueForPeek.BeginPeek(this.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, this); } } } class MsmqRetryQueueMessage : NativeMsmqMessage { LongProperty lookupId; IntProperty lastMoveTime; public MsmqRetryQueueMessage() : base(2) { this.lookupId = new LongProperty(this, UnsafeNativeMethods.PROPID_M_LOOKUPID); this.lastMoveTime = new IntProperty(this, UnsafeNativeMethods.PROPID_M_LAST_MOVE_TIME); } public LongProperty LookupId { get { return this.lookupId; } } public IntProperty LastMoveTime { get { return this.lastMoveTime; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ResetableIterator.cs
- COMException.cs
- TargetControlTypeCache.cs
- ComponentGlyph.cs
- LayoutExceptionEventArgs.cs
- ModulesEntry.cs
- MailDefinition.cs
- GenericWebPart.cs
- ModuleConfigurationInfo.cs
- DataRelationPropertyDescriptor.cs
- MenuItemStyleCollection.cs
- DataGridDefaultColumnWidthTypeConverter.cs
- DataServiceRequestOfT.cs
- TrackPointCollection.cs
- ColumnPropertiesGroup.cs
- BasePattern.cs
- RotateTransform3D.cs
- ExpressionPrefixAttribute.cs
- SaveFileDialog.cs
- DecoderNLS.cs
- Vector3DAnimationUsingKeyFrames.cs
- PersonalizationAdministration.cs
- ExternalCalls.cs
- TdsParserSessionPool.cs
- MemberPath.cs
- DesignerOptionService.cs
- DoubleConverter.cs
- QueryOperationResponseOfT.cs
- DiffuseMaterial.cs
- CompilerInfo.cs
- TabPanel.cs
- FlowDocumentScrollViewer.cs
- LayoutEvent.cs
- PropertyCollection.cs
- LinearKeyFrames.cs
- ProgressiveCrcCalculatingStream.cs
- KeyInfo.cs
- HttpCookiesSection.cs
- NTAccount.cs
- SecurityHeaderTokenResolver.cs
- Intellisense.cs
- FatalException.cs
- LoginView.cs
- TypeDescriptor.cs
- TreeViewHitTestInfo.cs
- DbParameterCollectionHelper.cs
- StatusBarDrawItemEvent.cs
- EntityDataSourceEntityTypeFilterItem.cs
- RowParagraph.cs
- TimeoutValidationAttribute.cs
- ScriptIgnoreAttribute.cs
- CustomValidator.cs
- Configuration.cs
- Context.cs
- Sql8ConformanceChecker.cs
- ImageClickEventArgs.cs
- ProvidersHelper.cs
- x509store.cs
- BaseUriHelper.cs
- XmlTypeAttribute.cs
- StringConcat.cs
- FtpWebResponse.cs
- Clipboard.cs
- TemplateManager.cs
- ErrorsHelper.cs
- DesigntimeLicenseContextSerializer.cs
- FontCacheLogic.cs
- ListViewInsertEventArgs.cs
- LicenseException.cs
- UserControlBuildProvider.cs
- HttpCapabilitiesEvaluator.cs
- CommandConverter.cs
- AttachedPropertyBrowsableForChildrenAttribute.cs
- Rectangle.cs
- GetPageCompletedEventArgs.cs
- ScriptManager.cs
- AutoResizedEvent.cs
- EventBuilder.cs
- ComponentDispatcher.cs
- InfoCardArgumentException.cs
- Token.cs
- ControlValuePropertyAttribute.cs
- CodeArgumentReferenceExpression.cs
- Graphics.cs
- UiaCoreTypesApi.cs
- ThreadPool.cs
- XPathNavigatorReader.cs
- WmiInstallComponent.cs
- CommandConverter.cs
- OracleFactory.cs
- BamlLocalizabilityResolver.cs
- JavaScriptObjectDeserializer.cs
- IdleTimeoutMonitor.cs
- DataRelationPropertyDescriptor.cs
- MultiAsyncResult.cs
- MdImport.cs
- SqlPersistenceWorkflowInstanceDescription.cs
- DataSourceConverter.cs
- PartialTrustVisibleAssemblyCollection.cs
- Condition.cs