Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MessageQueue.cs / 1 / MessageQueue.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Activation
{
    using System;
    using System.Diagnostics;
    using System.Threading;
    using System.Collections.Generic;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Activation.Diagnostics;

    // Pairs incoming session connections with registered WorkerProcess instances.
    // Sessions that cannot be dispatched immediately are parked in a bounded queue
    // until a worker becomes available.
    class MessageQueue
    {
        static WaitCallback dispatchToNewWorkerCallback = new WaitCallback(DispatchToNewWorkerCallback);
        static WaitCallback dispatchSessionCallback = new WaitCallback(DispatchSessionCallback);

        // path -> owning queue. Every access must hold lock (registry).
        static Dictionary<BaseUriWithWildcard, MessageQueue> registry = new Dictionary<BaseUriWithWildcard, MessageQueue>();

        // every live MessageQueue; guarded by lock (instances).
        static List<MessageQueue> instances = new List<MessageQueue>();

        AsyncCallback dispatchSessionCompletedCallback;
        List<BaseUriWithWildcard> paths;

        // we use a queue of session-messages for dispatching
        // we use it to park messages that can't be dispatched and need to be pended
        // we use a queue of WorkerProcess instances to find free ones that can be dispatched to
        Queue<ListenerSessionConnection> sessionMessages;
        Queue<WorkerProcess> sessionWorkers;

        // Upper bound for sessionMessages. It is only assigned in Register(path), so it is 0
        // (every enqueue attempt is rejected) until a path has been registered.
        int maxQueueSize;
        TransportType transportType;

        // each MessageQueue has a list of WorkerProcess instances.
        // each WorkerProcess is associated to a single MessageQueue.
        // Self-Hosted: 1 WorkerProcess in the list at all times, always the same WorkerProcess (unless we DCR it). 1st WorkerProcess creates the MessageQueue, last WorkerProcess deletes the MessageQueue.
        // Web-Hosted: 0-n WorkerProcess in the list. MessageQueue created/deleted by WAS explicitly or implicitly by WAS going away.
        List<WorkerProcess> workers;

        // Creates an empty queue with an unsupported transport type and adds it to the
        // global instance list.
        internal MessageQueue()
        {
            transportType = TransportType.Unsupported;
            paths = new List<BaseUriWithWildcard>();
            workers = new List<WorkerProcess>();
            sessionWorkers = new Queue<WorkerProcess>();
            sessionMessages = new Queue<ListenerSessionConnection>();
            dispatchSessionCompletedCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(DispatchSessionCompletedCallback));
            lock (instances)
            {
                instances.Add(this);
            }
        }

#if DEBUG
        // Returns a copy of the current worker list, taken under the workers lock.
        internal List<WorkerProcess> SnapshotWorkers()
        {
            lock (this.workers)
            {
                return new List<WorkerProcess>(workers);
            }
        }
#endif

        // False only when this is a TCP queue and TCP port sharing is currently paused.
        internal virtual bool CanDispatch
        {
            get
            {
                return TransportType != TransportType.Tcp || !SMSvcHost.IsTcpPortSharingPaused;
            }
        }

        internal TransportType TransportType
        {
            get
            {
                return transportType;
            }
        }

        // The sessionWorkers queue doubles as the monitor guarding both sessionWorkers
        // and sessionMessages.
        object SessionLock
        {
            get
            {
                return sessionWorkers;
            }
        }

        // Closes every queue of the given transport type.
        // Note: the global instance list is emptied entirely, including queues of other
        // transport types, which are dropped from the list without being closed.
        internal static void CloseAll(TransportType transportType)
        {
            MessageQueue[] instancesCopy;
            lock (instances)
            {
                instancesCopy = instances.ToArray();
                instances.Clear();
            }

            // Close outside the instances lock.
            foreach (MessageQueue messageQueue in instancesCopy)
            {
                if (messageQueue.TransportType == transportType)
                {
                    messageQueue.CloseCore();
                }
            }
        }

        // Number of sessions currently parked waiting for a worker.
        protected int PendingCount
        {
            get
            {
                lock (SessionLock)
                {
                    return sessionMessages.Count;
                }
            }
        }

        // Removes this queue from the global instance list and closes it.
        protected void Close()
        {
            Debug.Print("MessageQueue.Close()");

            // this is only called when all the workers are done
            // with I/O (they could be in the process of closing)
            lock (instances)
            {
                instances.Remove(this);
            }

            // CloseCore emits the MessageQueueClosed trace; it is not repeated here so that
            // a single close produces a single trace event.
            CloseCore();
        }

        // Empties the pending-session queue. Each pending connection is either sent an
        // EndpointUnavailable fault (sendFault == true) or aborted (sendFault == false).
        protected void DropPendingMessages(bool sendFault)
        {
            lock (SessionLock)
            {
                foreach (ListenerSessionConnection sessionMessage in sessionMessages.ToArray())
                {
                    if (sessionMessage != null)
                    {
                        if (sendFault)
                        {
                            TransportListener.SendFault(sessionMessage.Connection, FramingEncodingString.EndpointUnavailableFault);
                        }
                        else
                        {
                            sessionMessage.Connection.Abort();
                        }
                    }
                }

                sessionMessages.Clear();
            }
        }

        // Unregisters all paths, aborts pending sessions, closes all workers and traces
        // MessageQueueClosed.
        void CloseCore()
        {
            Debug.Print("MessageQueue.CloseCore()");

            UnregisterAll();
            DropPendingMessages(false);

            lock (registry)
            {
                // Iterate over a copy because the list is cleared right after, and
                // worker.Close() is opaque from here.
                foreach (WorkerProcess worker in workers.ToArray())
                {
                    worker.Close();
                }

                workers.Clear();
            }

            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueClosed, this);
            }
        }

        // Queues a new session and tries to dispatch it.
        // The session is rejected (and a dispatch failure is counted) when dispatching is
        // currently not possible (fault sent) or when the queue is full (connection aborted).
        internal void EnqueueSessionAndDispatch(ListenerSessionConnection session)
        {
            lock (SessionLock)
            {
                if (!CanDispatch)
                {
                    TransportListener.SendFault(session.Connection, FramingEncodingString.EndpointUnavailableFault);
                    OnDispatchFailure(transportType);
                    return;
                }
                else if (sessionMessages.Count >= maxQueueSize)
                {
                    // Abort the connection when the queue is full.
                    session.Connection.Abort();
                    OnDispatchFailure(transportType);
                    return;
                }
                else
                {
                    sessionMessages.Enqueue(session);
                }
            }

            OnSessionEnqueued();
            DispatchSession();
        }

        // Marks a worker as free and kicks the dispatch loop, either inline or via the
        // IOThreadScheduler when the caller must not dispatch on its own thread.
        void EnqueueWorkerAndDispatch(WorkerProcess worker, bool canDispatchOnThisThread)
        {
            lock (SessionLock)
            {
                sessionWorkers.Enqueue(worker);
            }

            if (canDispatchOnThisThread)
            {
                DispatchSession();
            }
            else
            {
                IOThreadScheduler.ScheduleCallback(dispatchSessionCallback, this);
            }
        }

        // Scheduler entry point: state is the MessageQueue to run the dispatch loop on.
        static void DispatchSessionCallback(object state)
        {
            MessageQueue thisPtr = (MessageQueue)state;
            thisPtr.DispatchSession();
        }

        // Dispatch loop: repeatedly pairs the oldest pending session with the first free
        // worker that is still registered, until either side runs out.
        void DispatchSession()
        {
            for (; ; )
            {
                ListenerSessionConnection session = null;
                lock (SessionLock)
                {
                    if (sessionMessages.Count > 0)
                    {
                        WorkerProcess worker = null;

                        // Workers that are no longer registered are discarded from the queue.
                        while (sessionWorkers.Count > 0)
                        {
                            worker = sessionWorkers.Dequeue();
                            if (worker.IsRegistered)
                            {
                                break;
                            }

                            worker = null;
                        }

                        if (worker == null)
                        {
                            // There is no more active worker. So break the loop.
                            break;
                        }

                        // For better performance, we may want to check whether the message has been timed out in the future.
                        session = sessionMessages.Dequeue();
                        session.WorkerProcess = worker;
                    }
                }

                if (session == null)
                {
                    // There are no more messages left. So break the loop.
                    break;
                }

                // The actual dispatch happens outside the session lock.
                StartDispatchSession(session);
            }
        }

        // Begins the asynchronous dispatch of a session to the worker assigned to it.
        void StartDispatchSession(ListenerSessionConnection session)
        {
            IAsyncResult dispatchAsyncResult = null;
            try
            {
                dispatchAsyncResult = session.WorkerProcess.BeginDispatchSession(session, dispatchSessionCompletedCallback, session);
            }
            catch (Exception exception)
            {
                if (DiagnosticUtility.IsFatal(exception))
                {
                    throw;
                }

                if (DiagnosticUtility.ShouldTraceWarning)
                {
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Warning);
                }

                // NOTE(review): only the worker is recycled here; the session is neither
                // re-queued nor faulted by this method - confirm BeginDispatchSession
                // cleans up the connection on failure.
                if (session.WorkerProcess.IsRegistered)
                {
                    // Add the worker back to the queue.
                    EnqueueWorkerAndDispatch(session.WorkerProcess, false);
                }
            }

            // Synchronous completions are finished here; the callback ignores them.
            if (dispatchAsyncResult != null && dispatchAsyncResult.CompletedSynchronously)
            {
                CompleteDispatchSession(dispatchAsyncResult);
            }
        }

        // Async callback for BeginDispatchSession. Synchronous completions are handled
        // by StartDispatchSession, so they are skipped here.
        void DispatchSessionCompletedCallback(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            CompleteDispatchSession(result);
        }

        // Ends the dispatch, faults the connection if the worker did not accept it, and
        // returns the worker to the free queue.
        void CompleteDispatchSession(IAsyncResult result)
        {
            ListenerSessionConnection session = (ListenerSessionConnection)result.AsyncState;
            DiagnosticUtility.DebugAssert(session.WorkerProcess != null, "The WorkerProcess should be set on the message.");

            if (!session.WorkerProcess.EndDispatchSession(result))
            {
                OnConnectionDispatchFailed(session.Connection);
            }

            // A synchronous completion runs inside DispatchSession's loop on this thread,
            // so the follow-up dispatch is scheduled instead of run inline
            // (presumably to avoid re-entering the loop recursively).
            EnqueueWorkerAndDispatch(session.WorkerProcess, !result.CompletedSynchronously);
        }

        // Whether a second worker may register for a path this queue already owns.
        protected virtual bool CanShare
        {
            get
            {
                return false;
            }
        }

        // Bumps the dispatch-failure perf counter for the given transport; other
        // transport types are not counted.
        internal static void OnDispatchFailure(TransportType transportType)
        {
            if (transportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementDispatchFailuresTcp();
            }
            else if (transportType == TransportType.NamedPipe)
            {
                ListenerPerfCounters.IncrementDispatchFailuresNamedPipe();
            }
        }

        // Sends a ConnectionDispatchFailed fault on the connection. Always returns false.
        bool OnConnectionDispatchFailed(IConnection connection)
        {
            TransportListener.SendFault(connection, FramingEncodingString.ConnectionDispatchFailedFault);
            return false;
        }

        // Attaches a worker to this queue and schedules a dispatch pass for it.
        protected void OnNewWorkerAvailable(WorkerProcess worker)
        {
            lock (this.workers)
            {
                worker.Queue = this;
                workers.Add(worker);

                // offload draining the IO queues to this new worker on a different thread
                IOThreadScheduler.ScheduleCallback(dispatchToNewWorkerCallback, worker);
            }
        }

        // Scheduler entry point: state is the newly attached WorkerProcess.
        static void DispatchToNewWorkerCallback(object state)
        {
            // Direct cast (as in DispatchSessionCallback): a wrong state object fails
            // with a clear InvalidCastException instead of a NullReferenceException.
            WorkerProcess worker = (WorkerProcess)state;
            worker.Queue.EnqueueWorkerAndDispatch(worker, true);
        }

        // Registers a path with the routing table on behalf of this queue.
        // The first registration fixes the queue's transport (net.tcp or net.pipe) and its
        // pending-connection limit; mixing transports on one queue, or any other scheme,
        // yields ProtocolUnsupported.
        public ListenerExceptionStatus Register(BaseUriWithWildcard path)
        {
            if (path.BaseAddress.Scheme == Uri.UriSchemeNetTcp)
            {
                if (transportType == TransportType.NamedPipe)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                }

                maxQueueSize = ListenerConfig.NetTcp.MaxPendingConnections;
                transportType = TransportType.Tcp;
            }
            else if (path.BaseAddress.Scheme == Uri.UriSchemeNetPipe)
            {
                if (transportType == TransportType.Tcp)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                }

                maxQueueSize = ListenerConfig.NetPipe.MaxPendingConnections;
                transportType = TransportType.NamedPipe;
            }
            else
            {
                return ListenerExceptionStatus.ProtocolUnsupported;
            }

            ListenerExceptionStatus status = RoutingTable.Start(this, path);
            if (status == ListenerExceptionStatus.Success)
            {
                paths.Add(path);
                IncrementUrisRegisteredCounters();
                OnRegisterCompleted();
            }

            return status;
        }

        // Registers a worker for a path, creating the queue for that path if none exists.
        // Returns ConflictingRegistration when the path is already owned by a queue that
        // cannot be shared, or the failure status of the path registration.
        internal static ListenerExceptionStatus Register(BaseUriWithWildcard path, WorkerProcess worker)
        {
            MessageQueue queue = null;
            lock (registry)
            {
                if (registry.TryGetValue(path, out queue))
                {
                    if (!queue.CanShare)
                    {
                        return ListenerExceptionStatus.ConflictingRegistration;
                    }
                }
                else
                {
                    queue = new MessageQueue();
                    ListenerExceptionStatus status = ListenerExceptionStatus.FailedToListen;

                    try
                    {
                        status = queue.Register(path);
                    }
                    catch (Exception exception)
                    {
                        if (DiagnosticUtility.IsFatal(exception))
                        {
                            throw;
                        }

                        // Non-fatal failures are traced; status stays FailedToListen.
                        if (DiagnosticUtility.ShouldTraceError)
                        {
                            ListenerTraceUtility.TraceEvent(TraceEventType.Error, TraceCode.RoutingTableCannotListen, new StringTraceRecord("Path", path.ToString()), null, exception);
                        }
                    }

                    if (status != ListenerExceptionStatus.Success)
                    {
                        // not setting the worker.queue is not a problem, since we can't use this WorkerProcess
                        return status;
                    }

                    registry.Add(path, queue);
                }
            }

            // Done outside the registry lock.
            queue.OnNewWorkerAvailable(worker);
            return ListenerExceptionStatus.Success;
        }

        // Hook invoked after a session has been added to the pending queue.
        protected virtual void OnSessionEnqueued()
        {
        }

        // Unregisters every path owned by this queue.
        public void UnregisterAll()
        {
            // Unregister(path) removes the entry from paths, so the list shrinks each pass.
            while (paths.Count > 0)
            {
                Unregister(paths[0]);
            }
        }

        // Stops routing for a path and removes it from the global registry and from this
        // queue's path list.
        void Unregister(BaseUriWithWildcard path)
        {
            DiagnosticUtility.DebugAssert(paths.Contains(path), "Unregister: unregistering an unregistered path");

            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueUnregisterSucceeded, new System.ServiceModel.Diagnostics.StringTraceRecord("Path", path.ToString()), this, null);
            }

            RoutingTable.Stop(this, path);
            IncrementUrisUnregisteredCounters();
            OnUnregisterCompleted();

            // The registry is shared by all queues and is read/written under its lock in
            // Register; this method is also reached without that lock (CloseAll -> CloseCore),
            // so take it here. The monitor is re-entrant, so callers already holding it
            // (Unregister(worker) -> Close) are unaffected.
            lock (registry)
            {
                registry.Remove(path);
            }

            paths.Remove(path);
        }

        // Called when the last worker has been removed; the default closes the queue.
        protected virtual void OnUnregisterLastWorker()
        {
            Debug.Print("MessageQueue.OnUnregisterLastWorker() calling Close()");
            Close();
        }

        // Detaches a worker from this queue; when no workers remain,
        // OnUnregisterLastWorker is invoked (still under the registry lock).
        internal virtual void Unregister(WorkerProcess worker)
        {
            Debug.Print("MessageQueue.Unregister() worker: " + worker.ProcessId);

            lock (registry)
            {
                DiagnosticUtility.DebugAssert(object.Equals(this, worker.Queue), "MessageQueue.Unregister() cannot unregister a worker registered with a queue different than this.");

                workers.Remove(worker);
                Debug.Print("MessageQueue.Unregister() left with workers: " + workers.Count);

                if (workers.Count == 0)
                {
                    OnUnregisterLastWorker();
                }
            }
        }

        // Hook invoked after a path has been registered successfully.
        protected virtual void OnRegisterCompleted()
        {
            IncrementRegistrationsActiveCounters();
        }

        // Hook invoked while a path is being unregistered.
        protected virtual void OnUnregisterCompleted()
        {
            DecrementRegistrationsActiveCounters();
        }

        // The counter helpers below treat every non-TCP transport as named pipe.
        protected void IncrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementRegistrationsActiveTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementRegistrationsActiveNamedPipe();
            }
        }

        protected void DecrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.DecrementRegistrationsActiveTcp();
            }
            else
            {
                ListenerPerfCounters.DecrementRegistrationsActiveNamedPipe();
            }
        }

        void IncrementUrisUnregisteredCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementUrisUnregisteredTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementUrisUnregisteredNamedPipe();
            }
        }

        void IncrementUrisRegisteredCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            {
                ListenerPerfCounters.IncrementUrisRegisteredTcp();
            }
            else
            {
                ListenerPerfCounters.IncrementUrisRegisteredNamedPipe();
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ColumnMapCopier.cs
- HttpHandlersSection.cs
- FolderBrowserDialog.cs
- Clock.cs
- WindowInteropHelper.cs
- HttpCacheParams.cs
- BuildProviderCollection.cs
- ConnectionManagementSection.cs
- ErrorHandler.cs
- LoadedOrUnloadedOperation.cs
- DynamicRouteExpression.cs
- AssemblyContextControlItem.cs
- CodeMemberField.cs
- SelectionEditor.cs
- SortQuery.cs
- GifBitmapDecoder.cs
- Matrix3DConverter.cs
- TextLineResult.cs
- smtpconnection.cs
- GuidelineCollection.cs
- InkCanvasInnerCanvas.cs
- RegistrationContext.cs
- EntityCommandExecutionException.cs
- TabControl.cs
- LayoutExceptionEventArgs.cs
- BitmapCacheBrush.cs
- XPathPatternParser.cs
- BinaryObjectInfo.cs
- DataGridViewUtilities.cs
- HtmlInputButton.cs
- DbModificationCommandTree.cs
- IResourceProvider.cs
- UnmanagedHandle.cs
- ValueTypeFixupInfo.cs
- ImageMap.cs
- PerfCounters.cs
- ServerProtocol.cs
- AutomationProperties.cs
- DelimitedListTraceListener.cs
- ClientConvert.cs
- ClientSponsor.cs
- ExceptionRoutedEventArgs.cs
- DependencyObjectPropertyDescriptor.cs
- VisualBasicDesignerHelper.cs
- CodeAttachEventStatement.cs
- DataObjectEventArgs.cs
- EpmContentDeSerializerBase.cs
- ArglessEventHandlerProxy.cs
- IChannel.cs
- OutArgument.cs
- AnnotationHighlightLayer.cs
- Stack.cs
- IdleTimeoutMonitor.cs
- EntityDataSourceColumn.cs
- DataTableNameHandler.cs
- Deflater.cs
- Exception.cs
- ActivityDesignerAccessibleObject.cs
- MaterialGroup.cs
- WpfXamlLoader.cs
- MethodSignatureGenerator.cs
- Message.cs
- DataServiceHostWrapper.cs
- SoapHttpTransportImporter.cs
- EndpointFilterProvider.cs
- ErrorTableItemStyle.cs
- Zone.cs
- SoapIgnoreAttribute.cs
- COM2Enum.cs
- FontFamily.cs
- ImagingCache.cs
- TextMetrics.cs
- SelectedPathEditor.cs
- VarRemapper.cs
- SqlMethodAttribute.cs
- WindowsImpersonationContext.cs
- HtmlGenericControl.cs
- ResourceType.cs
- X509Chain.cs
- GeneralTransform.cs
- RootDesignerSerializerAttribute.cs
- RecordBuilder.cs
- TextHidden.cs
- ConfigurationStrings.cs
- ToolStripRenderer.cs
- Int32CollectionConverter.cs
- ApplicationManager.cs
- TableLayoutColumnStyleCollection.cs
- TreeNodeStyleCollectionEditor.cs
- SoapMessage.cs
- Schema.cs
- BufferedGraphicsManager.cs
- HttpChannelBindingToken.cs
- AutomationFocusChangedEventArgs.cs
- DataSetUtil.cs
- LockedAssemblyCache.cs
- ListItemViewAttribute.cs
- JsonWriterDelegator.cs
- BindingValueChangedEventArgs.cs
- ConfigXmlComment.cs