Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / Msmq4PoisonHandler.cs / 1 / Msmq4PoisonHandler.cs
//------------------------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------------------------- namespace System.ServiceModel.Channels { using System.Collections.Generic; using System.IO; using System.Threading; using System.Transactions; sealed class Msmq4PoisonHandler : IPoisonHandlingStrategy { MsmqQueue mainQueue; MsmqQueue mainQueueForMove; MsmqQueue retryQueueForPeek; MsmqQueue retryQueueForMove; MsmqQueue poisonQueue; IOThreadTimer timer; MsmqReceiveHelper receiver; bool disposed; string poisonQueueName; string retryQueueName; string mainQueueName; MsmqRetryQueueMessage retryQueueMessage; static WaitCallback onStartPeek = new WaitCallback(StartPeek); static AsyncCallback onPeekCompleted = DiagnosticUtility.ThunkAsyncCallback(OnPeekCompleted); public Msmq4PoisonHandler(MsmqReceiveHelper receiver) { this.receiver = receiver; this.timer = new IOThreadTimer(OnTimer, null, false); this.disposed = false; this.mainQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(this.ListenUri); this.poisonQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";poison")); this.retryQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";retry")); } MsmqReceiveParameters ReceiveParameters { get { return this.receiver.MsmqReceiveParameters; } } Uri ListenUri { get { return this.receiver.ListenUri; } } public void Open() { this.mainQueue = this.receiver.Queue; this.mainQueueForMove = new MsmqQueue(this.mainQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); // Open up the poison queue (for handling poison messages). this.poisonQueue = new MsmqQueue(this.poisonQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); this.retryQueueForMove = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS); this.retryQueueForPeek = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS); this.retryQueueMessage = new MsmqRetryQueueMessage(); if (Thread.CurrentThread.IsThreadPoolThread) StartPeek(this); else IOThreadScheduler.ScheduleCallback(Msmq4PoisonHandler.onStartPeek, this); } static void StartPeek(object state) { Msmq4PoisonHandler handler = state as Msmq4PoisonHandler; lock(handler) { if(! handler.disposed) { handler.retryQueueForPeek.BeginPeek(handler.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, handler); } } } public bool CheckAndHandlePoisonMessage(MsmqMessageProperty messageProperty) { if (messageProperty.AbortCount <= this.ReceiveParameters.ReceiveRetryCount) return false; int retryCycle = messageProperty.MoveCount / 2; lock(this) { if (this.disposed) throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString())); if (retryCycle >= this.ReceiveParameters.MaxRetryCycles) { FinalDisposition(messageProperty); } else { MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.retryQueueForMove, messageProperty.LookupId); MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId); } } return true; } public void FinalDisposition(MsmqMessageProperty messageProperty) { switch (this.ReceiveParameters.ReceiveErrorHandling) { case ReceiveErrorHandling.Drop: this.receiver.DropOrRejectReceivedMessage(messageProperty, false); break; case ReceiveErrorHandling.Fault: MsmqReceiveHelper.TryAbortTransactionCurrent(); if (null != this.receiver.ChannelListener) this.receiver.ChannelListener.FaultListener(); if (null != this.receiver.Channel) this.receiver.Channel.FaultChannel(); break; case ReceiveErrorHandling.Reject: this.receiver.DropOrRejectReceivedMessage(messageProperty, true); MsmqDiagnostics.PoisonMessageRejected(messageProperty.MessageId, this.receiver.InstanceId); break; case ReceiveErrorHandling.Move: MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.poisonQueue, messageProperty.LookupId); MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, true, this.receiver.InstanceId); break; default: DiagnosticUtility.DebugAssert("System.ServiceModel.Channels.Msmq4PoisonHandler.FinalDisposition(): (unexpected ReceiveErrorHandling)"); break; } } public void Dispose() { lock(this) { if(!this.disposed) { this.disposed = true; this.timer.Cancel(); if (null != this.retryQueueForPeek) this.retryQueueForPeek.Dispose(); if (null != this.retryQueueForMove) this.retryQueueForMove.Dispose(); if (null != this.poisonQueue) this.poisonQueue.Dispose(); if (null != this.mainQueueForMove) this.mainQueueForMove.Dispose(); } } } static void OnPeekCompleted(IAsyncResult result) { Msmq4PoisonHandler handler = result.AsyncState as Msmq4PoisonHandler; MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown; try { receiveResult = handler.retryQueueForPeek.EndPeek(result); } catch (MsmqException ex) { MsmqDiagnostics.ExpectedException(ex); } if (MsmqQueue.ReceiveResult.MessageReceived == receiveResult) { lock(handler) { if(!handler.disposed) { // Check the time - move it, and begin peeking again // if necessary, or wait for the timeout. DateTime lastMoveTime = MsmqDateTime.ToDateTime(handler.retryQueueMessage.LastMoveTime.Value); TimeSpan waitTime = lastMoveTime + handler.ReceiveParameters.RetryCycleDelay - DateTime.UtcNow; if (waitTime < TimeSpan.Zero) handler.OnTimer(handler); else handler.timer.Set(waitTime); } } } } void OnTimer(object state) { lock(this) { if(!this.disposed) { try { this.retryQueueForPeek.TryMoveMessage(this.retryQueueMessage.LookupId.Value, this.mainQueueForMove, MsmqTransactionMode.Single); } catch (MsmqException ex) { MsmqDiagnostics.ExpectedException(ex); } this.retryQueueForPeek.BeginPeek(this.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, this); } } } class MsmqRetryQueueMessage : NativeMsmqMessage { LongProperty lookupId; IntProperty lastMoveTime; public MsmqRetryQueueMessage() : base(2) { this.lookupId = new LongProperty(this, UnsafeNativeMethods.PROPID_M_LOOKUPID); this.lastMoveTime = new IntProperty(this, UnsafeNativeMethods.PROPID_M_LAST_MOVE_TIME); } public LongProperty LookupId { get { return this.lookupId; } } public IntProperty LastMoveTime { get { return this.lastMoveTime; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- Mapping.cs
- SettingsProperty.cs
- Logging.cs
- ReaderContextStackData.cs
- PointAnimationClockResource.cs
- SelfIssuedAuthRSAPKCS1SignatureDeformatter.cs
- XmlReflectionImporter.cs
- DataListCommandEventArgs.cs
- PropertyMapper.cs
- HttpListenerPrefixCollection.cs
- SemanticResolver.cs
- Label.cs
- SqlTypesSchemaImporter.cs
- TextElementCollectionHelper.cs
- AppDomainResourcePerfCounters.cs
- RuntimeConfig.cs
- SuppressMessageAttribute.cs
- Rect.cs
- XXXInfos.cs
- OleDbConnectionPoolGroupProviderInfo.cs
- PartManifestEntry.cs
- XmlSchemaInclude.cs
- ResolveInfo.cs
- mediaclock.cs
- FileDialogPermission.cs
- AssemblyFilter.cs
- EventLogHandle.cs
- StringFunctions.cs
- ConfigXmlCDataSection.cs
- VectorConverter.cs
- DataGridTextBoxColumn.cs
- RuntimeResourceSet.cs
- XmlWriter.cs
- WmfPlaceableFileHeader.cs
- ILGenerator.cs
- DataViewListener.cs
- TransformPatternIdentifiers.cs
- SpeakInfo.cs
- DataGridColumnCollection.cs
- WeakEventManager.cs
- IntegerValidator.cs
- Number.cs
- DbTypeMap.cs
- OleDbStruct.cs
- X509ThumbprintKeyIdentifierClause.cs
- LogEntryUtils.cs
- CounterSampleCalculator.cs
- StringSorter.cs
- TransformPattern.cs
- HelloMessage11.cs
- ProcessModelSection.cs
- ConfigurationSectionCollection.cs
- HMACRIPEMD160.cs
- IsolatedStorage.cs
- CalendarDayButton.cs
- InputScope.cs
- ExceptionHelpers.cs
- ResourceReferenceExpression.cs
- XmlSchemaAny.cs
- Stroke2.cs
- DataListItemEventArgs.cs
- WebPartMenu.cs
- ParentUndoUnit.cs
- SessionEndedEventArgs.cs
- WebBaseEventKeyComparer.cs
- MetadataItemEmitter.cs
- EventMappingSettings.cs
- AutoGeneratedFieldProperties.cs
- SessionStateContainer.cs
- ADMembershipProvider.cs
- RenderContext.cs
- streamingZipPartStream.cs
- SafeFileMappingHandle.cs
- HttpRequest.cs
- MetadataCache.cs
- Size.cs
- SqlConnectionManager.cs
- PointF.cs
- PrimitiveXmlSerializers.cs
- XmlChildEnumerator.cs
- UInt64.cs
- WorkItem.cs
- RawTextInputReport.cs
- CodeSubDirectoriesCollection.cs
- InkCollectionBehavior.cs
- AnimationClockResource.cs
- TextParagraphCache.cs
- SqlProvider.cs
- SecurityException.cs
- SqlNodeTypeOperators.cs
- SchemaNames.cs
- GregorianCalendarHelper.cs
- ByteConverter.cs
- dbdatarecord.cs
- SqlProfileProvider.cs
- CreateInstanceBinder.cs
- Directory.cs
- NeutralResourcesLanguageAttribute.cs
- DataGridViewCellFormattingEventArgs.cs
- DbConnectionPoolGroup.cs