Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MsmqActivation.cs / 1 / MsmqActivation.cs
//------------------------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------------------------- namespace System.ServiceModel.Activation { using System.ServiceModel.Activation.Diagnostics; using System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.Net; using System.ServiceModel; using System.ServiceModel.Channels; using System.ServiceProcess; using System.Threading; using System.Messaging; using System.ServiceModel.Diagnostics; using MQMessageQueue = System.Messaging.MessageQueue; using MQMessage = System.Messaging.Message; using MQException = System.Messaging.MessageQueueException; class MsmqActivation : ServiceBase { BindingsManager bindings; ActivationService integrationActivationService; ListenerAdapter integrationListenerAdapter; ActivationService transportActivationService; ListenerAdapter transportListenerAdapter; public MsmqActivation() { ServiceName = ListenerConstants.MsmqActivationServiceName; CanHandlePowerEvent = false; AutoLog = false; CanStop = true; CanPauseAndContinue = true; CanShutdown = true; this.bindings = new BindingsManager(); this.integrationActivationService = new ActivationService(this, MsmqUri.FormatNameAddressTranslator.Scheme); this.transportActivationService = new ActivationService(this, MsmqUri.NetMsmqAddressTranslator.Scheme); } protected override void OnStart(string[] args) { try { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStart, this); } #if DEBUG if (DebuggableService.DelayStart(ServiceName)) { (new Thread(new ThreadStart(Start))).Start(); return; } #endif Start(); } catch (Exception exception) { // Log the error to eventlog. ListenerTraceUtility.EventLog.LogEvent(TraceEventType.Error, EventLogCategory.ListenerAdapter, EventLogEventId.ServiceStartFailed, false, exception.ToString()); throw; } } protected override void OnStop() { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStop, this); } Shutdown(); } protected override void OnContinue() { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceContinue, this); } this.integrationActivationService.Paused = false; this.transportActivationService.Paused = false; } protected override void OnPause() { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServicePause, this); } this.integrationActivationService.Paused = true; this.transportActivationService.Paused = true; } protected override void OnShutdown() { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceShutdown, this); } Shutdown(); Stop(); } void Start() { #if DEBUG DebuggableService.WaitForDebugger(ServiceName); #endif if (!SMSvcHost.IsWebhostSupported) { const int ERROR_NOT_SUPPORTED = 50; this.ExitCode = ERROR_NOT_SUPPORTED; throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ServiceRequiresWas))); } this.integrationListenerAdapter = new ListenerAdapter(this.integrationActivationService); this.transportListenerAdapter = new ListenerAdapter(this.transportActivationService); this.integrationListenerAdapter.Open(); this.transportListenerAdapter.Open(); } void Shutdown() { this.integrationListenerAdapter.Close(); this.transportListenerAdapter.Close(); } class BindingsManager { DictionarybindingMonitors; object thisLock = new object(); public BindingsManager() { this.bindingMonitors = new Dictionary (StringComparer.OrdinalIgnoreCase); } public void RegisterBindingFilterIfNecessary(string host, MsmqBindingFilter filter) { lock (this.thisLock) { MsmqBindingMonitor bindingMonitor; if (!this.bindingMonitors.TryGetValue(host, out bindingMonitor)) { bindingMonitor = new MsmqBindingMonitor(host); this.bindingMonitors.Add(host, bindingMonitor); bindingMonitor.Open(); } // register the new filter if it doesn't already exist: if (!bindingMonitor.ContainsFilter(filter)) { bindingMonitor.AddFilter(filter); } } } public void UnregisterBindingFilter(MsmqBindingFilter filter) { lock (this.thisLock) { foreach (MsmqBindingMonitor monitor in this.bindingMonitors.Values) { monitor.RemoveFilter(filter); } } } } class ActivationService : IActivationService { Dictionary groups; string protocol; BindingsManager bindings; object thisLock = new object(); ServiceBase service; bool paused; public ActivationService(MsmqActivation service, string protocol) { this.protocol = protocol; this.bindings = service.bindings; this.service = service; this.paused = false; this.groups = new Dictionary (); } public bool Paused { get { return this.paused; } set { lock(this) { if(this.paused != value) { this.paused = value; if(!this.paused) { foreach(QueueMonitorGroup group in this.groups.Values) { group.Start(); } } } } } } public BindingsManager Bindings { get { return this.bindings; } } public string ActivationServiceName { get { return this.service.ServiceName; } } public string ProtocolName { get { return this.protocol; } } public IActivatedMessageQueue CreateQueue(ListenerAdapter la, App app) { QueueMonitorGroup qmg = new QueueMonitorGroup(this, la, app); lock (this.thisLock) { this.groups[qmg.ListenerChannelContext.ListenerChannelId] = qmg; } return qmg; } public IActivatedMessageQueue FindQueue(int queueId) { lock (this.thisLock) { QueueMonitorGroup group; this.groups.TryGetValue(queueId, out group); return group; } } public void StopService() { this.service.Stop(); } public void QueueMonitorGroupClosed(QueueMonitorGroup qmg) { lock (this.thisLock) { this.groups.Remove(qmg.ListenerChannelContext.ListenerChannelId); } } } class QueueMonitorGroup : IActivatedMessageQueue { static int queueIdCounter = 0; static readonly TimeSpan RetryMonitorInterval = TimeSpan.FromMinutes(5); ActivationService activationService; App app; ActivationBindingFilter filter; ListenerAdapter listenerAdapter; int startQueueInstanceCount; ListenerChannelContext listenerChannelContext; List monitors = new List (); List failedMonitors = new List (); bool enabled; int pendingNotificationCount; IOThreadTimer retryTimer; bool retryScheduled = false; bool hasStartedQueueInstances; public QueueMonitorGroup(ActivationService activationService, ListenerAdapter la, App app) { this.activationService = activationService; this.listenerAdapter = la; this.app = app; this.startQueueInstanceCount = 1; this.listenerChannelContext = new ListenerChannelContext(app.AppKey, Interlocked.Increment(ref queueIdCounter), Guid.Empty); this.pendingNotificationCount = 0; this.filter = new ActivationBindingFilter(this, app.Path); this.retryTimer = new IOThreadTimer(OnRetryTimer, null, false); } public bool CanDispatch { get { return this.enabled && !this.activationService.Paused; } } public App App { get { return this.app; } } public ListenerChannelContext ListenerChannelContext { get { return this.listenerChannelContext; } } bool IActivatedMessageQueue.HasStartedQueueInstances { get { return this.hasStartedQueueInstances; } } void IActivatedMessageQueue.OnQueueInstancesStopped() { this.hasStartedQueueInstances = false; } public void Delete() { this.activationService.QueueMonitorGroupClosed(this); UnregisterAll(); } public void LaunchQueueInstance() { bool startInstance = false; lock (this) { if (this.pendingNotificationCount > 0) { this.pendingNotificationCount--; startInstance = true; } else { // start monitoring for new messages... startQueueInstanceCount++; // Make sure that everyone is peeking: foreach (QueueMonitor monitor in this.monitors) { monitor.Start(); } } } if (startInstance) { if (this.listenerAdapter.OpenListenerChannelInstance(this)) { this.hasStartedQueueInstances = true; } } } public ListenerExceptionStatus Register(BaseUriWithWildcard url) { this.activationService.Bindings.RegisterBindingFilterIfNecessary(url.BaseAddress.Host, this.filter); return ListenerExceptionStatus.Success; } public void Start() { lock(this) { if(this.CanDispatch) { // Ensure that we're started... foreach(QueueMonitor monitor in this.monitors) { monitor.Start(); } } } } public void SetEnabledState(bool enabled) { lock(this) { if (this.enabled != enabled) { this.enabled = enabled; Start(); } } } public void UnregisterAll() { lock (this) { foreach (QueueMonitor monitor in this.monitors) { monitor.Dispose(); } this.monitors.Clear(); this.activationService.Bindings.UnregisterBindingFilter(this.filter); } } public bool NotifyMessageAvailable() { bool startInstance = false; bool shouldContinue = false; lock (this) { if (!this.CanDispatch) { this.pendingNotificationCount++; } else if (this.startQueueInstanceCount == 0) { this.pendingNotificationCount++; } else { this.startQueueInstanceCount--; startInstance = true; shouldContinue = this.startQueueInstanceCount > 0; } } if (startInstance) { MsmqDiagnostics.StartingApplication(this.app.Path); this.listenerAdapter.OpenListenerChannelInstance(this); this.hasStartedQueueInstances = true; } return shouldContinue; } public void ScheduleRetry(QueueMonitor monitor) { lock (this) { this.failedMonitors.Add(monitor); if (!this.retryScheduled) { this.retryTimer.Set(RetryMonitorInterval); this.retryScheduled = true; } } } object AddQueueToGroup(Uri queue) { QueueMonitor monitor = null; lock (this) { monitor = new QueueMonitor(queue, this); this.monitors.Add(monitor); if (this.enabled) { monitor.Start(); } } return monitor; } void OnRetryTimer(object state) { lock (this) { if (this.enabled) { foreach (QueueMonitor monitor in this.failedMonitors) { // Only start it if we still own it... if (this.monitors.Contains(monitor)) { monitor.Start(); } } } this.failedMonitors.Clear(); } } void RemoveQueueFromGroup(object state) { QueueMonitor monitor = (QueueMonitor)state; lock (this) { this.monitors.Remove(monitor); monitor.Dispose(); } } // Note that we inherit from the transport binding filter here - that's not // a big deal, because we never need these uris to create services. class ActivationBindingFilter : MsmqBindingFilter { QueueMonitorGroup group; public ActivationBindingFilter(QueueMonitorGroup group, string path) : base(path, MsmqUri.NetMsmqAddressTranslator) { this.group = group; } public override object MatchFound(string host, string name, bool isPrivate) { MsmqDiagnostics.MatchedApplicationFound(host, name, isPrivate, this.CanonicalPrefix); return this.group.AddQueueToGroup(CreateServiceUri(host, name, isPrivate)); } public override void MatchLost(string host, string name, bool isPrivate, object callbackState) { this.group.RemoveQueueFromGroup(callbackState); } } } class QueueMonitor : IDisposable { static readonly TimeSpan InfiniteTimeout = TimeSpan.FromMilliseconds(UInt32.MaxValue); bool disposed; QueueMonitorGroup group; bool peeking; string queueName; MQMessageQueue queue; public QueueMonitor(Uri uri, QueueMonitorGroup group) { // The defaults don't really matter here - we don't use // the buffer manager. this.group = group; this.queueName = MsmqFormatName.ToSystemMessagingQueueName(MsmqUri.UriToFormatNameByScheme(uri)); this.peeking = false; Debug.Print("opening queue: " + this.queueName); } public void Start() { lock (this) { try { if (this.queue == null) { this.queue = new MQMessageQueue(this.queueName); this.queue.MessageReadPropertyFilter.ClearAll(); this.queue.MessageReadPropertyFilter.LookupId = true; } if (!this.peeking) { this.peeking = true; this.queue.BeginPeek(InfiniteTimeout, null, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted))); } } catch (MQException) { this.group.ScheduleRetry(this); } } } public void Dispose() { lock (this) { this.disposed = true; if(this.queue != null) { this.queue.Dispose(); } } } void OnPeekCompleted(IAsyncResult result) { bool shouldContinue = true; try { MQMessage message = this.queue.EndPeek(result); Debug.Print("MsmqActivation.QueueMonitor.OnPeekCompleted: message available"); shouldContinue = this.group.NotifyMessageAvailable(); } catch (MQException ex) { MsmqDiagnostics.CannotPeekOnQueue(this.queue.FormatName, ex); this.group.ScheduleRetry(this); return; } catch (Exception ex) { if (DiagnosticUtility.ShouldTraceError) { DiagnosticUtility.ExceptionUtility.TraceHandledException(ex, TraceEventType.Error); } if (!DiagnosticUtility.IsFatal(ex)) { this.group.ScheduleRetry(this); } throw; } lock (this) { if (!this.disposed && shouldContinue) { this.queue.BeginPeek(InfiniteTimeout, null, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted))); } else { this.peeking = false; } } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- EntitySqlQueryState.cs
- ZeroOpNode.cs
- ItemsPanelTemplate.cs
- ColumnCollection.cs
- RegistrationServices.cs
- ObjectContextServiceProvider.cs
- SecurityUtils.cs
- DictionarySectionHandler.cs
- UrlRoutingModule.cs
- CheckBox.cs
- MultipleCopiesCollection.cs
- StylusPoint.cs
- CalloutQueueItem.cs
- AliasExpr.cs
- WorkerRequest.cs
- ComboBoxRenderer.cs
- LambdaReference.cs
- SHA384Managed.cs
- MarshalByRefObject.cs
- SrgsElementFactory.cs
- StandardToolWindows.cs
- OleDbMetaDataFactory.cs
- Transactions.cs
- ResourceExpressionEditor.cs
- ByeMessageApril2005.cs
- AuthenticationServiceManager.cs
- WindowsGraphicsCacheManager.cs
- ValidationErrorCollection.cs
- ListViewDeletedEventArgs.cs
- QilStrConcat.cs
- wmiprovider.cs
- AutoGeneratedField.cs
- BindingMAnagerBase.cs
- Validator.cs
- StorageRoot.cs
- UIntPtr.cs
- Mutex.cs
- AuthenticateEventArgs.cs
- ExpressionEditorAttribute.cs
- IntegerValidator.cs
- CultureTableRecord.cs
- SqlFormatter.cs
- TypeSystemProvider.cs
- EntityCommandDefinition.cs
- HyperLinkStyle.cs
- ToolStripSeparatorRenderEventArgs.cs
- SafeMemoryMappedViewHandle.cs
- SurrogateEncoder.cs
- ExpressionWriter.cs
- BulletChrome.cs
- SocketAddress.cs
- FaultCode.cs
- XPathNodeList.cs
- XmlEnumAttribute.cs
- CacheMode.cs
- TraceHandler.cs
- ThaiBuddhistCalendar.cs
- DataTableCollection.cs
- IndependentAnimationStorage.cs
- FixedSOMGroup.cs
- GregorianCalendarHelper.cs
- PersonalizablePropertyEntry.cs
- TextCompositionEventArgs.cs
- WinFormsComponentEditor.cs
- InternalPolicyElement.cs
- BitmapSource.cs
- InfoCardCryptoHelper.cs
- DictionaryEntry.cs
- ResetableIterator.cs
- Facet.cs
- TypeSystemHelpers.cs
- ListDictionaryInternal.cs
- SimpleWebHandlerParser.cs
- TextHidden.cs
- SslStream.cs
- StreamingContext.cs
- PenLineCapValidation.cs
- SplineKeyFrames.cs
- IriParsingElement.cs
- AggregateNode.cs
- ComplexObject.cs
- AutomationProperties.cs
- KeyEvent.cs
- SmtpDigestAuthenticationModule.cs
- SRef.cs
- DrawTreeNodeEventArgs.cs
- MeasurementDCInfo.cs
- WebControl.cs
- CqlIdentifiers.cs
- EntityStoreSchemaGenerator.cs
- SingleSelectRootGridEntry.cs
- ISAPIWorkerRequest.cs
- HtmlSelectionListAdapter.cs
- EdmComplexPropertyAttribute.cs
- WebBrowser.cs
- DecoderNLS.cs
- ClientRuntimeConfig.cs
- PhoneCall.cs
- DbBuffer.cs
- DocumentAutomationPeer.cs