MessageQueue.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / SMSvcHost / System / ServiceModel / Activation / MessageQueue.cs / 1 / MessageQueue.cs

                            //------------------------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------

namespace System.ServiceModel.Activation 
{
    using System; 
    using System.Diagnostics; 
    using System.Threading;
    using System.Collections.Generic; 
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Diagnostics;
    using System.ServiceModel.Activation.Diagnostics; 

    class MessageQueue 
    { 
        static WaitCallback dispatchToNewWorkerCallback = new WaitCallback(DispatchToNewWorkerCallback);
        static WaitCallback dispatchSessionCallback = new WaitCallback(DispatchSessionCallback); 

        static Dictionary registry = new Dictionary();
        static List instances = new List();
        AsyncCallback dispatchSessionCompletedCallback; 
        List paths;
 
        // we use a queue of session-messages for dispatching 
        // we use it to park messages that can't be dispatched and need to be pended
        // we use a queue of WorkerProcess instances to find free ones that can be dispatched to 
        Queue sessionMessages;
        Queue sessionWorkers;
        int maxQueueSize;
 
        TransportType transportType;
 
        // each MessageQueue has a list of WorkerProcess instances. 
        // each WorkerProcess is associated to a single MessageQueue.
        // Self-Hosted: 1 WorkerProcess in the list at all times, always the same WorkerProcess (unless we DCR it). 1st WorkerProcess creates the MessageQueue, last WorkerProcess deletes the MessageQueue. 
        // Web-Hosted: 0-n WorkerProcess in the list. MessageQueue created/delete by WAS explicitly or implicitly by WAS going away.
        List workers;

        internal MessageQueue() 
        {
            transportType = TransportType.Unsupported; 
            paths = new List(); 
            workers = new List();
            sessionWorkers = new Queue(); 
            sessionMessages = new Queue();
            dispatchSessionCompletedCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(DispatchSessionCompletedCallback));

            lock (instances) 
            {
                instances.Add(this); 
            } 
        }
 
#if DEBUG
        internal List SnapshotWorkers()
        {
            lock (this.workers) 
            {
                return new List(workers); 
            } 
        }
#endif 

        internal virtual bool CanDispatch
        {
            get 
            {
                return TransportType != TransportType.Tcp || 
                    !SMSvcHost.IsTcpPortSharingPaused; 
            }
        } 

        internal TransportType TransportType
        {
            get 
            {
                return transportType; 
            } 
        }
 
        object SessionLock
        {
            get
            { 
                return sessionWorkers;
            } 
        } 

        internal static void CloseAll(TransportType transportType) 
        {
            MessageQueue[] instancesCopy;
            lock (instances)
            { 
                instancesCopy = instances.ToArray();
                instances.Clear(); 
            } 
            foreach (MessageQueue messageQueue in instancesCopy)
            { 
                if (messageQueue.TransportType == transportType)
                {
                    messageQueue.CloseCore();
                } 
            }
        } 
 
        protected int PendingCount
        { 
            get
            {
                lock (SessionLock)
                { 
                    return sessionMessages.Count;
                } 
            } 
        }
 
        protected void Close()
        {
            Debug.Print("MessageQueue.Close()");
            // this is only called when all the workers are done 
            // with I/O (they could be in the process of closing)
            lock (instances) 
            { 
                instances.Remove(this);
            } 
            CloseCore();

            if (DiagnosticUtility.ShouldTraceInformation)
            { 
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueClosed, this);
            } 
        } 

        protected void DropPendingMessages(bool sendFault) 
        {
            lock (SessionLock)
            {
                foreach (ListenerSessionConnection sessionMessage in sessionMessages.ToArray()) 
                {
                    if (sessionMessage != null) 
                    { 
                        if (sendFault)
                        { 
                            TransportListener.SendFault(sessionMessage.Connection, FramingEncodingString.EndpointUnavailableFault);
                        }
                        else
                        { 
                            sessionMessage.Connection.Abort();
                        } 
                    } 

                } 
                sessionMessages.Clear();
            }
        }
 
        void CloseCore()
        { 
            Debug.Print("MessageQueue.CloseCore()"); 
            UnregisterAll();
            DropPendingMessages(false); 
            lock (registry)
            {
                foreach (WorkerProcess worker in workers.ToArray())
                { 
                    worker.Close();
                } 
                workers.Clear(); 
            }
 
            if (DiagnosticUtility.ShouldTraceInformation)
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueClosed, this);
            } 
        }
 
        internal void EnqueueSessionAndDispatch(ListenerSessionConnection session) 
        {
            lock (SessionLock) 
            {
                if (!CanDispatch)
                {
                    TransportListener.SendFault(session.Connection, FramingEncodingString.EndpointUnavailableFault); 
                    OnDispatchFailure(transportType);
                    return; 
                } 
                else if (sessionMessages.Count >= maxQueueSize)
                { 
                    // Abort the connection when the queue is full.
                    session.Connection.Abort();
                    OnDispatchFailure(transportType);
                    return; 
                }
                else 
                { 
                    sessionMessages.Enqueue(session);
                } 
            }

            OnSessionEnqueued();
            DispatchSession(); 
        }
 
        void EnqueueWorkerAndDispatch(WorkerProcess worker, bool canDispatchOnThisThread) 
        {
            lock (SessionLock) 
            {
                sessionWorkers.Enqueue(worker);
            }
 
            if (canDispatchOnThisThread)
            { 
                DispatchSession(); 
            }
            else 
            {
                IOThreadScheduler.ScheduleCallback(dispatchSessionCallback, this);
            }
        } 

        static void DispatchSessionCallback(object state) 
        { 
            MessageQueue thisPtr = (MessageQueue)state;
            thisPtr.DispatchSession(); 
        }

        void DispatchSession()
        { 
            for (; ; )
            { 
                ListenerSessionConnection session = null; 
                lock (SessionLock)
                { 
                    if (sessionMessages.Count > 0)
                    {
                        WorkerProcess worker = null;
                        while (sessionWorkers.Count > 0) 
                        {
                            worker = sessionWorkers.Dequeue(); 
                            if (worker.IsRegistered) 
                            {
                                break; 
                            }
                            worker = null;
                        }
 
                        if (worker == null)
                        { 
                            // There is no more active worker. So break the loop. 
                            break;
                        } 

                        // For better performance, we may want to check whether the message has been timed out in the future.
                        session = sessionMessages.Dequeue();
                        session.WorkerProcess = worker; 
                    }
                } 
 
                if (session == null)
                { 
                    // There is mo more message left. So break the loop.
                    break;
                }
 
                StartDispatchSession(session);
            } 
        } 

        void StartDispatchSession(ListenerSessionConnection session) 
        {
            IAsyncResult dispatchAsyncResult = null;
            try
            { 
                dispatchAsyncResult = session.WorkerProcess.BeginDispatchSession(session, dispatchSessionCompletedCallback, session);
            } 
            catch (Exception exception) 
            {
                if (DiagnosticUtility.IsFatal(exception)) 
                {
                    throw;
                }
 
                if (DiagnosticUtility.ShouldTraceWarning)
                { 
                    DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Warning); 
                }
 
                if (session.WorkerProcess.IsRegistered)
                {
                    // Add the worker back to the queue.
                    EnqueueWorkerAndDispatch(session.WorkerProcess, false); 
                }
            } 
 
            if (dispatchAsyncResult != null && dispatchAsyncResult.CompletedSynchronously)
            { 
                CompleteDispatchSession(dispatchAsyncResult);
            }
        }
 
        void DispatchSessionCompletedCallback(IAsyncResult result)
        { 
            if (result.CompletedSynchronously) 
            {
                return; 
            }

            CompleteDispatchSession(result);
        } 

        void CompleteDispatchSession(IAsyncResult result) 
        { 
            ListenerSessionConnection session = (ListenerSessionConnection)result.AsyncState;
            DiagnosticUtility.DebugAssert(session.WorkerProcess != null, "The WorkerProcess should be set on the message."); 

            if (!session.WorkerProcess.EndDispatchSession(result))
            {
                OnConnectionDispatchFailed(session.Connection); 
            }
 
            EnqueueWorkerAndDispatch(session.WorkerProcess, !result.CompletedSynchronously); 
        }
 
        protected virtual bool CanShare
        {
            get { return false; }
        } 

        internal static void OnDispatchFailure(TransportType transportType) 
        { 
            if (transportType == TransportType.Tcp)
            { 
                ListenerPerfCounters.IncrementDispatchFailuresTcp();
            }
            else if (transportType == TransportType.NamedPipe)
            { 
                ListenerPerfCounters.IncrementDispatchFailuresNamedPipe();
            } 
        } 

        bool OnConnectionDispatchFailed(IConnection connection) 
        {
            TransportListener.SendFault(connection, FramingEncodingString.ConnectionDispatchFailedFault);
            return false;
        } 

        protected void OnNewWorkerAvailable(WorkerProcess worker) 
        { 
            lock (this.workers)
            { 
                worker.Queue = this;
                workers.Add(worker);

                // offload draining the IO queues to this new worker on a different thread 
                IOThreadScheduler.ScheduleCallback(dispatchToNewWorkerCallback, worker);
            } 
        } 

        static void DispatchToNewWorkerCallback(object state) 
        {
            WorkerProcess worker = state as WorkerProcess;
            worker.Queue.EnqueueWorkerAndDispatch(worker, true);
        } 

        public ListenerExceptionStatus Register(BaseUriWithWildcard path) 
        { 
            if (path.BaseAddress.Scheme == Uri.UriSchemeNetTcp)
            { 
                if (transportType == TransportType.NamedPipe)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported;
                } 

                maxQueueSize = ListenerConfig.NetTcp.MaxPendingConnections; 
                transportType = TransportType.Tcp; 
            }
            else if (path.BaseAddress.Scheme == Uri.UriSchemeNetPipe) 
            {
                if (transportType == TransportType.Tcp)
                {
                    return ListenerExceptionStatus.ProtocolUnsupported; 
                }
 
                maxQueueSize = ListenerConfig.NetPipe.MaxPendingConnections; 
                transportType = TransportType.NamedPipe;
            } 
            else
            {
                return ListenerExceptionStatus.ProtocolUnsupported;
            } 

            ListenerExceptionStatus status = RoutingTable.Start(this, path); 
            if (status == ListenerExceptionStatus.Success) 
            {
                paths.Add(path); 
                IncrementUrisRegisteredCounters();
                OnRegisterCompleted();
            }
 
            return status;
        } 
 
        internal static ListenerExceptionStatus Register(BaseUriWithWildcard path, WorkerProcess worker)
        { 
            MessageQueue queue = null;
            lock (registry)
            {
                if (registry.TryGetValue(path, out queue)) 
                {
                    if (!queue.CanShare) 
                    { 
                        return ListenerExceptionStatus.ConflictingRegistration;
                    } 
                }
                else
                {
                    queue = new MessageQueue(); 
                    ListenerExceptionStatus status = ListenerExceptionStatus.FailedToListen;
 
                    try 
                    {
                        status = queue.Register(path); 
                    }
                    catch (Exception exception)
                    {
                        if (DiagnosticUtility.IsFatal(exception)) 
                        {
                            throw; 
                        } 

                        if (DiagnosticUtility.ShouldTraceError) 
                        {
                            ListenerTraceUtility.TraceEvent(TraceEventType.Error, TraceCode.RoutingTableCannotListen, new StringTraceRecord("Path", path.ToString()), null, exception);
                        }
                    } 

                    if (status != ListenerExceptionStatus.Success) 
                    { 
                        // not setting the worker.queue is not a problem, since we can't use this WorkerProcess
                        return status; 
                    }

                    registry.Add(path, queue);
                } 
            }
 
            queue.OnNewWorkerAvailable(worker); 
            return ListenerExceptionStatus.Success;
        } 

        protected virtual void OnSessionEnqueued() {}

        public void UnregisterAll() 
        {
            while (paths.Count > 0) 
            { 
                Unregister(paths[0]);
            } 
        }

        void Unregister(BaseUriWithWildcard path)
        { 
            DiagnosticUtility.DebugAssert(paths.Contains(path), "Unregister: unregistering an unregistered path");
 
            if (DiagnosticUtility.ShouldTraceInformation) 
            {
                ListenerTraceUtility.TraceEvent(TraceEventType.Information, TraceCode.MessageQueueUnregisterSucceeded, new System.ServiceModel.Diagnostics.StringTraceRecord("Path", path.ToString()), this, null); 
            }

            RoutingTable.Stop(this, path);
            IncrementUrisUnregisteredCounters(); 
            OnUnregisterCompleted();
 
            registry.Remove(path); 
            paths.Remove(path);
        } 

        protected virtual void OnUnregisterLastWorker()
        {
            Debug.Print("MessageQueue.OnUnregisterLastWorker() calling Close()"); 
            Close();
        } 
 
        internal virtual void Unregister(WorkerProcess worker)
        { 
            Debug.Print("MessageQueue.Unregister() worker: " + worker.ProcessId);
            lock (registry)
            {
                DiagnosticUtility.DebugAssert(object.Equals(this, worker.Queue), "MessageQueue.Unregister() cannot unregister a worker registered with a queue different than this."); 

                workers.Remove(worker); 
                Debug.Print("MessageQueue.Unregister() left with workers: " + workers.Count); 
                if (workers.Count == 0)
                { 
                    OnUnregisterLastWorker();
                }
            }
        } 

        protected virtual void OnRegisterCompleted() 
        { 
            IncrementRegistrationsActiveCounters();
        } 

        protected virtual void OnUnregisterCompleted()
        {
            DecrementRegistrationsActiveCounters(); 
        }
 
        protected void IncrementRegistrationsActiveCounters() 
        {
            if (this.TransportType == TransportType.Tcp) 
            {
                ListenerPerfCounters.IncrementRegistrationsActiveTcp();
            }
            else 
            {
                ListenerPerfCounters.IncrementRegistrationsActiveNamedPipe(); 
            } 
        }
 
        protected void DecrementRegistrationsActiveCounters()
        {
            if (this.TransportType == TransportType.Tcp)
            { 
                ListenerPerfCounters.DecrementRegistrationsActiveTcp();
            } 
            else 
            {
                ListenerPerfCounters.DecrementRegistrationsActiveNamedPipe(); 
            }
        }

        void IncrementUrisUnregisteredCounters() 
        {
            if (this.TransportType == TransportType.Tcp) 
            { 
                ListenerPerfCounters.IncrementUrisUnregisteredTcp();
            } 
            else
            {
                ListenerPerfCounters.IncrementUrisUnregisteredNamedPipe();
            } 
        }
 
        void IncrementUrisRegisteredCounters() 
        {
            if (this.TransportType == TransportType.Tcp) 
            {
                ListenerPerfCounters.IncrementUrisRegisteredTcp();
            }
            else 
            {
                ListenerPerfCounters.IncrementUrisRegisteredNamedPipe(); 
            } 
        }
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK