Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MsmqActivation.cs / 1 / MsmqActivation.cs
//------------------------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------------------------- namespace System.ServiceModel.Activation { using System.ServiceModel.Activation.Diagnostics; using System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.Net; using System.ServiceModel; using System.ServiceModel.Channels; using System.ServiceProcess; using System.Threading; using System.Messaging; using System.ServiceModel.Diagnostics; using MQMessageQueue = System.Messaging.MessageQueue; using MQMessage = System.Messaging.Message; using MQException = System.Messaging.MessageQueueException; class MsmqActivation : ServiceBase { BindingsManager bindings; ActivationService integrationActivationService; ListenerAdapter integrationListenerAdapter; ActivationService transportActivationService; ListenerAdapter transportListenerAdapter; public MsmqActivation() { ServiceName = ListenerConstants.MsmqActivationServiceName; CanHandlePowerEvent = false; AutoLog = false; CanStop = true; CanPauseAndContinue = true; CanShutdown = true; this.bindings = new BindingsManager(); this.integrationActivationService = new ActivationService(this, MsmqUri.FormatNameAddressTranslator.Scheme); this.transportActivationService = new ActivationService(this, MsmqUri.NetMsmqAddressTranslator.Scheme); } protected override void OnStart(string[] args) { try { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStart, this); } #if DEBUG if (DebuggableService.DelayStart(ServiceName)) { (new Thread(new ThreadStart(Start))).Start(); return; } #endif Start(); } catch (Exception exception) { // Log the error to eventlog. ListenerTraceUtility.EventLog.LogEvent(TraceEventType.Error, EventLogCategory.ListenerAdapter, EventLogEventId.ServiceStartFailed, false, exception.ToString()); throw; } } protected override void OnStop() { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceStop, this); } Shutdown(); } protected override void OnContinue() { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceContinue, this); } this.integrationActivationService.Paused = false; this.transportActivationService.Paused = false; } protected override void OnPause() { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServicePause, this); } this.integrationActivationService.Paused = true; this.transportActivationService.Paused = true; } protected override void OnShutdown() { if (DiagnosticUtility.ShouldTraceInformation) { ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.ServiceShutdown, this); } Shutdown(); Stop(); } void Start() { #if DEBUG DebuggableService.WaitForDebugger(ServiceName); #endif if (!SMSvcHost.IsWebhostSupported) { const int ERROR_NOT_SUPPORTED = 50; this.ExitCode = ERROR_NOT_SUPPORTED; throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ServiceRequiresWas))); } this.integrationListenerAdapter = new ListenerAdapter(this.integrationActivationService); this.transportListenerAdapter = new ListenerAdapter(this.transportActivationService); this.integrationListenerAdapter.Open(); this.transportListenerAdapter.Open(); } void Shutdown() { this.integrationListenerAdapter.Close(); this.transportListenerAdapter.Close(); } class BindingsManager { DictionarybindingMonitors; object thisLock = new object(); public BindingsManager() { this.bindingMonitors = new Dictionary (StringComparer.OrdinalIgnoreCase); } public void RegisterBindingFilterIfNecessary(string host, MsmqBindingFilter filter) { lock (this.thisLock) { MsmqBindingMonitor bindingMonitor; if (!this.bindingMonitors.TryGetValue(host, out bindingMonitor)) { bindingMonitor = new MsmqBindingMonitor(host); this.bindingMonitors.Add(host, bindingMonitor); bindingMonitor.Open(); } // register the new filter if it doesn't already exist: if (!bindingMonitor.ContainsFilter(filter)) { bindingMonitor.AddFilter(filter); } } } public void UnregisterBindingFilter(MsmqBindingFilter filter) { lock (this.thisLock) { foreach (MsmqBindingMonitor monitor in this.bindingMonitors.Values) { monitor.RemoveFilter(filter); } } } } class ActivationService : IActivationService { Dictionary groups; string protocol; BindingsManager bindings; object thisLock = new object(); ServiceBase service; bool paused; public ActivationService(MsmqActivation service, string protocol) { this.protocol = protocol; this.bindings = service.bindings; this.service = service; this.paused = false; this.groups = new Dictionary (); } public bool Paused { get { return this.paused; } set { lock(this) { if(this.paused != value) { this.paused = value; if(!this.paused) { foreach(QueueMonitorGroup group in this.groups.Values) { group.Start(); } } } } } } public BindingsManager Bindings { get { return this.bindings; } } public string ActivationServiceName { get { return this.service.ServiceName; } } public string ProtocolName { get { return this.protocol; } } public IActivatedMessageQueue CreateQueue(ListenerAdapter la, App app) { QueueMonitorGroup qmg = new QueueMonitorGroup(this, la, app); lock (this.thisLock) { this.groups[qmg.ListenerChannelContext.ListenerChannelId] = qmg; } return qmg; } public IActivatedMessageQueue FindQueue(int queueId) { lock (this.thisLock) { QueueMonitorGroup group; this.groups.TryGetValue(queueId, out group); return group; } } public void StopService() { this.service.Stop(); } public void QueueMonitorGroupClosed(QueueMonitorGroup qmg) { lock (this.thisLock) { this.groups.Remove(qmg.ListenerChannelContext.ListenerChannelId); } } } class QueueMonitorGroup : IActivatedMessageQueue { static int queueIdCounter = 0; static readonly TimeSpan RetryMonitorInterval = TimeSpan.FromMinutes(5); ActivationService activationService; App app; ActivationBindingFilter filter; ListenerAdapter listenerAdapter; int startQueueInstanceCount; ListenerChannelContext listenerChannelContext; List monitors = new List (); List failedMonitors = new List (); bool enabled; int pendingNotificationCount; IOThreadTimer retryTimer; bool retryScheduled = false; bool hasStartedQueueInstances; public QueueMonitorGroup(ActivationService activationService, ListenerAdapter la, App app) { this.activationService = activationService; this.listenerAdapter = la; this.app = app; this.startQueueInstanceCount = 1; this.listenerChannelContext = new ListenerChannelContext(app.AppKey, Interlocked.Increment(ref queueIdCounter), Guid.Empty); this.pendingNotificationCount = 0; this.filter = new ActivationBindingFilter(this, app.Path); this.retryTimer = new IOThreadTimer(OnRetryTimer, null, false); } public bool CanDispatch { get { return this.enabled && !this.activationService.Paused; } } public App App { get { return this.app; } } public ListenerChannelContext ListenerChannelContext { get { return this.listenerChannelContext; } } bool IActivatedMessageQueue.HasStartedQueueInstances { get { return this.hasStartedQueueInstances; } } void IActivatedMessageQueue.OnQueueInstancesStopped() { this.hasStartedQueueInstances = false; } public void Delete() { this.activationService.QueueMonitorGroupClosed(this); UnregisterAll(); } public void LaunchQueueInstance() { bool startInstance = false; lock (this) { if (this.pendingNotificationCount > 0) { this.pendingNotificationCount--; startInstance = true; } else { // start monitoring for new messages... startQueueInstanceCount++; // Make sure that everyone is peeking: foreach (QueueMonitor monitor in this.monitors) { monitor.Start(); } } } if (startInstance) { if (this.listenerAdapter.OpenListenerChannelInstance(this)) { this.hasStartedQueueInstances = true; } } } public ListenerExceptionStatus Register(BaseUriWithWildcard url) { this.activationService.Bindings.RegisterBindingFilterIfNecessary(url.BaseAddress.Host, this.filter); return ListenerExceptionStatus.Success; } public void Start() { lock(this) { if(this.CanDispatch) { // Ensure that we're started... foreach(QueueMonitor monitor in this.monitors) { monitor.Start(); } } } } public void SetEnabledState(bool enabled) { lock(this) { if (this.enabled != enabled) { this.enabled = enabled; Start(); } } } public void UnregisterAll() { lock (this) { foreach (QueueMonitor monitor in this.monitors) { monitor.Dispose(); } this.monitors.Clear(); this.activationService.Bindings.UnregisterBindingFilter(this.filter); } } public bool NotifyMessageAvailable() { bool startInstance = false; bool shouldContinue = false; lock (this) { if (!this.CanDispatch) { this.pendingNotificationCount++; } else if (this.startQueueInstanceCount == 0) { this.pendingNotificationCount++; } else { this.startQueueInstanceCount--; startInstance = true; shouldContinue = this.startQueueInstanceCount > 0; } } if (startInstance) { MsmqDiagnostics.StartingApplication(this.app.Path); this.listenerAdapter.OpenListenerChannelInstance(this); this.hasStartedQueueInstances = true; } return shouldContinue; } public void ScheduleRetry(QueueMonitor monitor) { lock (this) { this.failedMonitors.Add(monitor); if (!this.retryScheduled) { this.retryTimer.Set(RetryMonitorInterval); this.retryScheduled = true; } } } object AddQueueToGroup(Uri queue) { QueueMonitor monitor = null; lock (this) { monitor = new QueueMonitor(queue, this); this.monitors.Add(monitor); if (this.enabled) { monitor.Start(); } } return monitor; } void OnRetryTimer(object state) { lock (this) { if (this.enabled) { foreach (QueueMonitor monitor in this.failedMonitors) { // Only start it if we still own it... if (this.monitors.Contains(monitor)) { monitor.Start(); } } } this.failedMonitors.Clear(); } } void RemoveQueueFromGroup(object state) { QueueMonitor monitor = (QueueMonitor)state; lock (this) { this.monitors.Remove(monitor); monitor.Dispose(); } } // Note that we inherit from the transport binding filter here - that's not // a big deal, because we never need these uris to create services. class ActivationBindingFilter : MsmqBindingFilter { QueueMonitorGroup group; public ActivationBindingFilter(QueueMonitorGroup group, string path) : base(path, MsmqUri.NetMsmqAddressTranslator) { this.group = group; } public override object MatchFound(string host, string name, bool isPrivate) { MsmqDiagnostics.MatchedApplicationFound(host, name, isPrivate, this.CanonicalPrefix); return this.group.AddQueueToGroup(CreateServiceUri(host, name, isPrivate)); } public override void MatchLost(string host, string name, bool isPrivate, object callbackState) { this.group.RemoveQueueFromGroup(callbackState); } } } class QueueMonitor : IDisposable { static readonly TimeSpan InfiniteTimeout = TimeSpan.FromMilliseconds(UInt32.MaxValue); bool disposed; QueueMonitorGroup group; bool peeking; string queueName; MQMessageQueue queue; public QueueMonitor(Uri uri, QueueMonitorGroup group) { // The defaults don't really matter here - we don't use // the buffer manager. this.group = group; this.queueName = MsmqFormatName.ToSystemMessagingQueueName(MsmqUri.UriToFormatNameByScheme(uri)); this.peeking = false; Debug.Print("opening queue: " + this.queueName); } public void Start() { lock (this) { try { if (this.queue == null) { this.queue = new MQMessageQueue(this.queueName); this.queue.MessageReadPropertyFilter.ClearAll(); this.queue.MessageReadPropertyFilter.LookupId = true; } if (!this.peeking) { this.peeking = true; this.queue.BeginPeek(InfiniteTimeout, null, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted))); } } catch (MQException) { this.group.ScheduleRetry(this); } } } public void Dispose() { lock (this) { this.disposed = true; if(this.queue != null) { this.queue.Dispose(); } } } void OnPeekCompleted(IAsyncResult result) { bool shouldContinue = true; try { MQMessage message = this.queue.EndPeek(result); Debug.Print("MsmqActivation.QueueMonitor.OnPeekCompleted: message available"); shouldContinue = this.group.NotifyMessageAvailable(); } catch (MQException ex) { MsmqDiagnostics.CannotPeekOnQueue(this.queue.FormatName, ex); this.group.ScheduleRetry(this); return; } catch (Exception ex) { if (DiagnosticUtility.ShouldTraceError) { DiagnosticUtility.ExceptionUtility.TraceHandledException(ex, TraceEventType.Error); } if (!DiagnosticUtility.IsFatal(ex)) { this.group.ScheduleRetry(this); } throw; } lock (this) { if (!this.disposed && shouldContinue) { this.queue.BeginPeek(InfiniteTimeout, null, DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnPeekCompleted))); } else { this.peeking = false; } } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- XmlConvert.cs
- EventLogPermissionEntryCollection.cs
- XsltException.cs
- SchemaNamespaceManager.cs
- MouseEventArgs.cs
- XmlKeywords.cs
- Trace.cs
- ReceiveContent.cs
- TraceUtils.cs
- XPathAncestorIterator.cs
- BulletedList.cs
- Ops.cs
- BrowserCapabilitiesCodeGenerator.cs
- Splitter.cs
- DocumentGrid.cs
- FixedTextView.cs
- SafeNativeMethodsOther.cs
- DataColumnMappingCollection.cs
- CoTaskMemHandle.cs
- DocumentApplicationJournalEntryEventArgs.cs
- FontResourceCache.cs
- StateItem.cs
- CompilationLock.cs
- GenericNameHandler.cs
- XmlSchemaInclude.cs
- ImageEditor.cs
- CollectionViewSource.cs
- NodeFunctions.cs
- HorizontalAlignConverter.cs
- ValueQuery.cs
- XPathCompileException.cs
- NativeMethods.cs
- DefaultSettingsSection.cs
- WorkflowApplicationIdleEventArgs.cs
- PowerModeChangedEventArgs.cs
- Point4DValueSerializer.cs
- ToolboxItem.cs
- SatelliteContractVersionAttribute.cs
- HtmlInputReset.cs
- EventProxy.cs
- Timeline.cs
- WindowsFormsHelpers.cs
- XPathDocumentIterator.cs
- HostSecurityManager.cs
- CapabilitiesPattern.cs
- SafeNativeMethods.cs
- EmptyQuery.cs
- BeginEvent.cs
- EntityStoreSchemaGenerator.cs
- IPHostEntry.cs
- CodeGenerator.cs
- DbProviderManifest.cs
- DesignerProperties.cs
- ResourceCategoryAttribute.cs
- ProjectionPruner.cs
- HttpModuleActionCollection.cs
- CacheDependency.cs
- WSSecurityJan2004.cs
- ObjectDataSourceMethodEventArgs.cs
- ResXFileRef.cs
- CollectionView.cs
- PersonalizationStateInfo.cs
- ReferenceEqualityComparer.cs
- NetworkAddressChange.cs
- BindingBase.cs
- DispatcherHookEventArgs.cs
- DeleteBookmarkScope.cs
- TreeSet.cs
- KeyGesture.cs
- X509SubjectKeyIdentifierClause.cs
- SchemaElement.cs
- ECDiffieHellman.cs
- DataTableReaderListener.cs
- TemplateParser.cs
- DataGridViewRowDividerDoubleClickEventArgs.cs
- EventSourceCreationData.cs
- PopupEventArgs.cs
- BinHexEncoding.cs
- TableStyle.cs
- HtmlMeta.cs
- ListViewUpdatedEventArgs.cs
- ViewEventArgs.cs
- CachedTypeface.cs
- SkewTransform.cs
- DataGridViewCellStyleBuilderDialog.cs
- SqlDataSource.cs
- StringArrayConverter.cs
- DesignerActionList.cs
- AutomationIdentifier.cs
- SiteMapPath.cs
- ValueType.cs
- DataGridrowEditEndingEventArgs.cs
- EDesignUtil.cs
- TypeLoadException.cs
- LayoutExceptionEventArgs.cs
- DecimalStorage.cs
- VersionedStreamOwner.cs
- BitmapEffectGeneralTransform.cs
- PublisherMembershipCondition.cs
- XPathDocument.cs