Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MessageQueue.cs / 1 / MessageQueue.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Activation
{
    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Activation.Diagnostics;

    // Pairs pending session connections with WorkerProcess instances that are free to
    // receive them. A queue is registered under one or more base addresses, all of the
    // same transport (net.tcp or net.pipe), and keeps the list of workers attached to it.
    class MessageQueue
    {
        static WaitCallback dispatchToNewWorkerCallback = new WaitCallback(DispatchToNewWorkerCallback);
        static WaitCallback dispatchSessionCallback = new WaitCallback(DispatchSessionCallback);

        // Maps a registered base address to the queue that owns it. Accessed under lock (registry).
        static Dictionary<BaseUriWithWildcard, MessageQueue> registry = new Dictionary<BaseUriWithWildcard, MessageQueue>();

        // Every queue adds itself here in its constructor. Accessed under lock (instances).
        static List<MessageQueue> instances = new List<MessageQueue>();

        AsyncCallback dispatchSessionCompletedCallback;
        List<BaseUriWithWildcard> paths;

        // we use a queue of session-messages for dispatching
        // we use it to park messages that can't be dispatched and need to be pended
        // we use a queue of WorkerProcess instances to find free ones that can be dispatched to
        Queue<ListenerSessionConnection> sessionMessages;
        Queue<WorkerProcess> sessionWorkers;
        int maxQueueSize;
        TransportType transportType;

        // each MessageQueue has a list of WorkerProcess instances.
        // each WorkerProcess is associated to a single MessageQueue.
        // Self-Hosted: 1 WorkerProcess in the list at all times, always the same WorkerProcess (unless we DCR it). 1st WorkerProcess creates the MessageQueue, last WorkerProcess deletes the MessageQueue.
        // Web-Hosted: 0-n WorkerProcess in the list. MessageQueue created/delete by WAS explicitly or implicitly by WAS going away.
        // NOTE(review): this list is guarded by lock (this.workers) in OnNewWorkerAvailable and
        // SnapshotWorkers but by lock (registry) in CloseCore and Unregister(WorkerProcess) --
        // confirm whether the mixed locking is intentional before relying on either.
        List<WorkerProcess> workers;

        // Creates an empty queue with no transport selected yet and adds it to the
        // static instance list.
        internal MessageQueue()
        {
            transportType = TransportType.Unsupported;
            paths = new List<BaseUriWithWildcard>();
            workers = new List<WorkerProcess>();
            sessionWorkers = new Queue<WorkerProcess>();
            sessionMessages = new Queue<ListenerSessionConnection>();
            dispatchSessionCompletedCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(DispatchSessionCompletedCallback));

            lock (instances)
            {
                instances.Add(this);
            }
        }

#if DEBUG
        // Returns a copy of the current worker list (debug builds only).
        internal List<WorkerProcess> SnapshotWorkers()
        {
            lock (this.workers)
            {
                return new List<WorkerProcess>(workers);
            }
        }
#endif

        // False only for a TCP queue while SMSvcHost reports TCP port sharing as paused.
        internal virtual bool CanDispatch
        {
            get
            {
                return TransportType != TransportType.Tcp || !SMSvcHost.IsTcpPortSharingPaused;
            }
        }

        internal TransportType TransportType
        {
            get
            {
                return transportType;
            }
        }

        // The sessionWorkers queue doubles as the monitor protecting both
        // sessionMessages and sessionWorkers.
        object SessionLock
        {
            get
            {
                return sessionWorkers;
            }
        }

        // Closes every tracked queue whose transport matches transportType.
        // Only the queues being closed are removed from the instance list; queues of
        // other transports stay tracked so that a later CloseAll for their transport
        // can still find and close them.
        internal static void CloseAll(TransportType transportType)
        {
            List<MessageQueue> queuesToClose = new List<MessageQueue>();
            lock (instances)
            {
                foreach (MessageQueue messageQueue in instances)
                {
                    if (messageQueue.TransportType == transportType)
                    {
                        queuesToClose.Add(messageQueue);
                    }
                }

                foreach (MessageQueue messageQueue in queuesToClose)
                {
                    instances.Remove(messageQueue);
                }
            }

            // Close outside the instances lock, as the original code did.
            foreach (MessageQueue messageQueue in queuesToClose)
            {
                messageQueue.CloseCore();
            }
        }

        // Number of session connections currently parked in the queue.
        protected int PendingCount
        {
            get
            {
                lock (SessionLock)
                {
                    return sessionMessages.Count;
                }
            }
        }

        // Removes this queue from the instance list and closes it.
        protected void Close()
        {
            Debug.Print("MessageQueue.Close()");

            // this is only called when all the workers are done
            // with I/O (they could be in the process of closing)
            lock (instances)
            {
                instances.Remove(this);
            }

            CloseCore();

            // NOTE(review): CloseCore() already emits the same MessageQueueClosed event,
            // so this trace is written twice per Close() -- confirm whether that is wanted.
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueClosed, this);
            }
        }

        // Empties the pending-session queue. Each non-null pending connection is either
        // sent an EndpointUnavailable fault (sendFault == true) or aborted.
        protected void DropPendingMessages(bool sendFault)
        {
            lock (SessionLock)
            {
                foreach (ListenerSessionConnection sessionMessage in sessionMessages.ToArray())
                {
                    if (sessionMessage != null)
                    {
                        if (sendFault)
                        {
                            TransportListener.SendFault(sessionMessage.Connection, FramingEncodingString.EndpointUnavailableFault);
                        }
                        else
                        {
                            sessionMessage.Connection.Abort();
                        }
                    }
                }

                sessionMessages.Clear();
            }
        }

        // Unregisters all paths, aborts pending connections and closes every worker.
        void CloseCore()
        {
            Debug.Print("MessageQueue.CloseCore()");

            UnregisterAll();
            DropPendingMessages(false);

            lock (registry)
            {
                // Iterate over a copy: the list is cleared below and must not be
                // enumerated directly while workers are being closed.
                foreach (WorkerProcess worker in workers.ToArray())
                {
                    worker.Close();
                }

                workers.Clear();
            }

            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueClosed, this);
            }
        }

        // Parks an incoming session and tries to dispatch it. If the queue cannot
        // dispatch, the connection is faulted; if the queue is full, it is aborted.
        // Both rejections are counted as dispatch failures.
        internal void EnqueueSessionAndDispatch(ListenerSessionConnection session)
        {
            lock (SessionLock)
            {
                if (!CanDispatch)
                {
                    TransportListener.SendFault(session.Connection, FramingEncodingString.EndpointUnavailableFault);
                    OnDispatchFailure(transportType);
                    return;
                }
                else if (sessionMessages.Count >= maxQueueSize)
                {
                    // Abort the connection when the queue is full.
                    session.Connection.Abort();
                    OnDispatchFailure(transportType);
                    return;
                }
                else
                {
                    sessionMessages.Enqueue(session);
                }
            }

            OnSessionEnqueued();
            DispatchSession();
        }

        // Marks a worker as free and drains pending sessions, either inline or on a
        // scheduled callback when the caller must not dispatch on its own thread.
        void EnqueueWorkerAndDispatch(WorkerProcess worker, bool canDispatchOnThisThread)
        {
            lock (SessionLock)
            {
                sessionWorkers.Enqueue(worker);
            }

            if (canDispatchOnThisThread)
            {
                DispatchSession();
            }
            else
            {
                IOThreadScheduler.ScheduleCallback(dispatchSessionCallback, this);
            }
        }

        static void DispatchSessionCallback(object state)
        {
            MessageQueue thisPtr = (MessageQueue)state;
            thisPtr.DispatchSession();
        }

        // Repeatedly pairs the oldest pending session with the next free registered
        // worker until either sessions or workers run out.
        void DispatchSession()
        {
            for (; ; )
            {
                ListenerSessionConnection session = null;
                lock (SessionLock)
                {
                    if (sessionMessages.Count > 0)
                    {
                        WorkerProcess worker = null;

                        // Skip (and discard) queued workers that are no longer registered.
                        while (sessionWorkers.Count > 0)
                        {
                            worker = sessionWorkers.Dequeue();
                            if (worker.IsRegistered)
                            {
                                break;
                            }

                            worker = null;
                        }

                        if (worker == null)
                        {
                            // There is no more active worker. So break the loop.
                            break;
                        }

                        // For better performance, we may want to check whether the message has been timed out in the future.
                        session = sessionMessages.Dequeue();
                        session.WorkerProcess = worker;
                    }
                }

                if (session == null)
                {
                    // There are no more messages left. So break the loop.
                    break;
                }

                // The actual dispatch happens outside the session lock.
                StartDispatchSession(session);
            }
        }

        // Begins the async dispatch of a session to its assigned worker and completes
        // it inline when the operation finished synchronously.
        void StartDispatchSession(ListenerSessionConnection session)
        {
            IAsyncResult dispatchAsyncResult = null;
            try
            {
                dispatchAsyncResult = session.WorkerProcess.BeginDispatchSession(session, dispatchSessionCompletedCallback, session);
            }
            catch (Exception exception)
            {
                if (DiagnosticUtility.IsFatal(exception))
                {
                    throw;
                }

                if (DiagnosticUtility.ShouldTraceWarning)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Warning);
                }

                // NOTE(review): the session itself is neither re-queued, faulted nor aborted
                // on this path -- confirm that BeginDispatchSession cleans up the connection.
                if (session.WorkerProcess.IsRegistered)
                {
                    // Add the worker back to the queue.
                    EnqueueWorkerAndDispatch(session.WorkerProcess, false);
                }
            }

            if (dispatchAsyncResult != null && dispatchAsyncResult.CompletedSynchronously)
            {
                CompleteDispatchSession(dispatchAsyncResult);
            }
        }

        void DispatchSessionCompletedCallback(IAsyncResult result)
        {
            // Synchronous completions are handled by StartDispatchSession.
            if (result.CompletedSynchronously)
            {
                return;
            }

            CompleteDispatchSession(result);
        }

        // Ends the dispatch, faults the connection if the worker reported failure, and
        // puts the worker back in the free queue.
        void CompleteDispatchSession(IAsyncResult result)
        {
            ListenerSessionConnection session = (ListenerSessionConnection)result.AsyncState;
            DiagnosticUtility.DebugAssert(session.WorkerProcess != null, "The WorkerProcess should be set on the message.");

            if (!session.WorkerProcess.EndDispatchSession(result))
            {
                OnConnectionDispatchFailed(session.Connection);
            }

            // Dispatch inline only when this is an asynchronous completion; a synchronous
            // completion runs inside DispatchSession's loop, so the drain is scheduled instead.
            EnqueueWorkerAndDispatch(session.WorkerProcess, !result.CompletedSynchronously);
        }

        // Whether a second worker may register a path this queue already owns.
        protected virtual bool CanShare
        {
            get
            {
                return false;
            }
        }

        // Bumps the dispatch-failure performance counter for TCP or named pipes;
        // other transport types are not counted.
        internal static void OnDispatchFailure(TransportType transportType)
        {
            if (transportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementDispatchFailuresTcp();
            }
            else if (transportType == TransportType.NamedPipe)
            {
                ListenerPerfCounters.IncrementDispatchFailuresNamedPipe();
            }
        }

        // Sends a ConnectionDispatchFailed fault on the connection. Always returns false.
        bool OnConnectionDispatchFailed(IConnection connection)
        {
            TransportListener.SendFault(connection, FramingEncodingString.ConnectionDispatchFailedFault);
            return false;
        }

        // Attaches a worker to this queue and schedules a drain of pending sessions to it.
        protected void OnNewWorkerAvailable(WorkerProcess worker)
        {
            lock (this.workers)
            {
                worker.Queue = this;
                workers.Add(worker);

                // offload draining the IO queues to this new worker on a different thread
                IOThreadScheduler.ScheduleCallback(dispatchToNewWorkerCallback, worker);
            }
        }

        static void DispatchToNewWorkerCallback(object state)
        {
            // Direct cast: a wrong state object should surface as InvalidCastException
            // here rather than as a NullReferenceException on the next line.
            WorkerProcess worker = (WorkerProcess)state;
            worker.Queue.EnqueueWorkerAndDispatch(worker, true);
        }

        // Registers a base address with the routing table on behalf of this queue.
        // The first registration fixes the queue's transport; a later address of the other
        // transport, or of any other scheme, is rejected with ProtocolUnsupported.
        // Returns the status reported by RoutingTable.Start.
        public ListenerExceptionStatus Register(BaseUriWithWildcard path)
        {
            if (path.BaseAddress.Scheme == Uri.UriSchemeNetTcp)
            {
                if (transportType == TransportType.NamedPipe)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                }

                maxQueueSize = ListenerConfig.NetTcp.MaxPendingConnections;
                transportType = TransportType.Tcp;
            }
            else if (path.BaseAddress.Scheme == Uri.UriSchemeNetPipe)
            {
                if (transportType == TransportType.Tcp)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                }

                maxQueueSize = ListenerConfig.NetPipe.MaxPendingConnections;
                transportType = TransportType.NamedPipe;
            }
            else
            {
                return ListenerExceptionStatus.ProtocolUnsupported;
            }

            ListenerExceptionStatus status = RoutingTable.Start(this, path);
            if (status == ListenerExceptionStatus.Success)
            {
                paths.Add(path);
                IncrementUrisRegisteredCounters();
                OnRegisterCompleted();
            }

            return status;
        }

        // Registers a worker for a path. Reuses the queue already registered for the path
        // when that queue allows sharing; otherwise creates and registers a new queue.
        // Returns ConflictingRegistration when the path is owned by a non-shareable queue,
        // or the failure status of the new queue's registration.
        internal static ListenerExceptionStatus Register(BaseUriWithWildcard path, WorkerProcess worker)
        {
            MessageQueue queue = null;
            lock (registry)
            {
                if (registry.TryGetValue(path, out queue))
                {
                    if (!queue.CanShare)
                    {
                        return ListenerExceptionStatus.ConflictingRegistration;
                    }
                }
                else
                {
                    queue = new MessageQueue();
                    ListenerExceptionStatus status = ListenerExceptionStatus.FailedToListen;

                    try
                    {
                        status = queue.Register(path);
                    }
                    catch (Exception exception)
                    {
                        if (DiagnosticUtility.IsFatal(exception))
                        {
                            throw;
                        }

                        if (DiagnosticUtility.ShouldTraceError)
                        {
                            ListenerTraceUtility.TraceEvent(TraceEventType.Error, TraceCode.RoutingTableCannotListen, new StringTraceRecord("Path", path.ToString()), null, exception);
                        }
                    }

                    if (status != ListenerExceptionStatus.Success)
                    {
                        // not setting the worker.queue is not a problem, since we can't use this WorkerProcess
                        // NOTE(review): the failed queue stays in the static instance list
                        // (added by its constructor) until a CloseAll for its transport.
                        return status;
                    }

                    registry.Add(path, queue);
                }
            }

            queue.OnNewWorkerAvailable(worker);
            return ListenerExceptionStatus.Success;
        }

        // Hook invoked after a session has been added to the pending queue.
        protected virtual void OnSessionEnqueued()
        {
        }

        // Unregisters every path currently held by this queue.
        public void UnregisterAll()
        {
            // Unregister(path) removes the entry from paths, so the list shrinks each pass.
            while (paths.Count > 0)
            {
                Unregister(paths[0]);
            }
        }

        // Stops routing for a path and removes it from both the static registry and
        // this queue's path list.
        void Unregister(BaseUriWithWildcard path)
        {
            DiagnosticUtility.DebugAssert(paths.Contains(path), "Unregister: unregistering an unregistered path");

            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueUnregisterSucceeded, new System.ServiceModel.Diagnostics.StringTraceRecord("Path", path.ToString()), this, null);
            }

            RoutingTable.Stop(this, path);
            IncrementUrisUnregisteredCounters();
            OnUnregisterCompleted();

            // The registry is shared static state that Register reads and writes under
            // lock (registry). This method is also reached from CloseAll without that lock
            // held, so take it here. The monitor is re-entrant, which keeps the
            // Unregister(WorkerProcess) -> Close() path (already holding the lock) working.
            lock (registry)
            {
                registry.Remove(path);
            }

            paths.Remove(path);
        }

        // Called when the last worker detaches; the default behavior closes the queue.
        protected virtual void OnUnregisterLastWorker()
        {
            Debug.Print("MessageQueue.OnUnregisterLastWorker() calling Close()");
            Close();
        }

        // Detaches a worker from this queue and notifies OnUnregisterLastWorker when
        // no workers remain.
        internal virtual void Unregister(WorkerProcess worker)
        {
            Debug.Print("MessageQueue.Unregister() worker: " + worker.ProcessId);

            lock (registry)
            {
                DiagnosticUtility.DebugAssert(object.Equals(this, worker.Queue), "MessageQueue.Unregister() cannot unregister a worker registered with a queue different than this.");

                workers.Remove(worker);
                Debug.Print("MessageQueue.Unregister() left with workers: " + workers.Count);

                if (workers.Count == 0)
                {
                    OnUnregisterLastWorker();
                }
            }
        }

        protected virtual void OnRegisterCompleted()
        {
            IncrementRegistrationsActiveCounters();
        }

        protected virtual void OnUnregisterCompleted()
        {
            DecrementRegistrationsActiveCounters();
        }

        // The four counter helpers below treat every non-TCP transport as named pipe.
        protected void IncrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementRegistrationsActiveTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementRegistrationsActiveNamedPipe();
            }
        }

        protected void DecrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.DecrementRegistrationsActiveTcp();
            }
            else
            {
                ListenerPerfCounters.DecrementRegistrationsActiveNamedPipe();
            }
        }

        void IncrementUrisUnregisteredCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementUrisUnregisteredTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementUrisUnregisteredNamedPipe();
            }
        }

        void IncrementUrisRegisteredCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementUrisRegisteredTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementUrisRegisteredNamedPipe();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- _FtpControlStream.cs
- RootProfilePropertySettingsCollection.cs
- BamlMapTable.cs
- CheckBoxField.cs
- HttpCookieCollection.cs
- CultureTable.cs
- BevelBitmapEffect.cs
- GroupJoinQueryOperator.cs
- PriorityQueue.cs
- DesignRelationCollection.cs
- PeerResolverElement.cs
- SqlDataSourceCommandEventArgs.cs
- XsltSettings.cs
- AdornerDecorator.cs
- SemaphoreSecurity.cs
- Permission.cs
- BufferedGraphicsManager.cs
- TextEditorSpelling.cs
- EndpointPerformanceCounters.cs
- ContextMenuStripGroupCollection.cs
- RuntimeConfig.cs
- AssemblyName.cs
- _RegBlobWebProxyDataBuilder.cs
- __ConsoleStream.cs
- DataObjectMethodAttribute.cs
- ConfigXmlComment.cs
- QilXmlWriter.cs
- Attribute.cs
- UdpDiscoveryEndpoint.cs
- XsdValidatingReader.cs
- ToolStripItemImageRenderEventArgs.cs
- DbConnectionOptions.cs
- StateInitialization.cs
- IList.cs
- CallbackTimeoutsBehavior.cs
- GlyphsSerializer.cs
- SymbolUsageManager.cs
- SecurityElementBase.cs
- InvalidTimeZoneException.cs
- JournalEntryListConverter.cs
- AuthenticationSection.cs
- MgmtConfigurationRecord.cs
- HtmlInputSubmit.cs
- InternalRelationshipCollection.cs
- JsonReaderDelegator.cs
- PropertyGroupDescription.cs
- AttributeTable.cs
- DeviceContext2.cs
- OleDbCommandBuilder.cs
- XmlNodeChangedEventManager.cs
- EntryWrittenEventArgs.cs
- PrtCap_Reader.cs
- SqlCacheDependencyDatabaseCollection.cs
- SerialPort.cs
- MaskedTextBox.cs
- ControlParameter.cs
- WebCategoryAttribute.cs
- EffectiveValueEntry.cs
- UserCancellationException.cs
- ServiceDebugBehavior.cs
- ObjectListTitleAttribute.cs
- GridErrorDlg.cs
- StateMachineAction.cs
- CompilerTypeWithParams.cs
- EntityDataSourceChangingEventArgs.cs
- SBCSCodePageEncoding.cs
- Soap.cs
- CodeDomConfigurationHandler.cs
- DefaultTextStoreTextComposition.cs
- GPPOINT.cs
- NavigationProperty.cs
- XamlDesignerSerializationManager.cs
- XmlSchemaObjectTable.cs
- ContentOperations.cs
- PresentationTraceSources.cs
- QuadraticBezierSegment.cs
- MultilineStringEditor.cs
- UnsafeNativeMethodsTablet.cs
- IsolatedStorage.cs
- EncoderParameters.cs
- externdll.cs
- PassportPrincipal.cs
- BlurBitmapEffect.cs
- HtmlFormParameterWriter.cs
- ProtocolsConfiguration.cs
- HttpRuntimeSection.cs
- XmlSchemaAttributeGroupRef.cs
- TableLayout.cs
- MultiPropertyDescriptorGridEntry.cs
- RootBuilder.cs
- PolyBezierSegment.cs
- BasicViewGenerator.cs
- FastEncoder.cs
- AssemblyInfo.cs
- ScriptRegistrationManager.cs
- StylusButton.cs
- CharacterMetricsDictionary.cs
- OutputCacheProviderCollection.cs
- JpegBitmapDecoder.cs
- MissingFieldException.cs