WorkflowQueuingService.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / WorkflowQueuingService.cs / 1305376 / WorkflowQueuingService.cs

                            //------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.     All rights    reserved.
//-----------------------------------------------------------
using System;
using System.Collections; 
using System.Collections.Generic;
using System.Text; 
using System.Diagnostics; 
using System.Globalization;
using System.Workflow.ComponentModel; 
using System.Runtime.Serialization;
using System.Messaging;

namespace System.Workflow.Runtime 
{
    public class WorkflowQueuingService 
    { 
        Object syncRoot = new Object();
        IWorkflowCoreRuntime rootWorkflowExecutor; 
        List dirtyQueues;
        EventQueueState pendingQueueState = new EventQueueState();
        Dictionary persistedQueueStates;
 
        // event handler used by atomic execution context's Q service for message delivery
        List messageArrivalEventHandlers; 
 
        // set for inner queuing service
        WorkflowQueuingService rootQueuingService; 

        // Runtime information visible to host, stored on the root activity
        public readonly static DependencyProperty PendingMessagesProperty = DependencyProperty.RegisterAttached("PendingMessages", typeof(Queue), typeof(WorkflowQueuingService), new PropertyMetadata(DependencyPropertyOptions.NonSerialized));
 
        // Persisted state properties
        internal static DependencyProperty RootPersistedQueueStatesProperty = DependencyProperty.RegisterAttached("RootPersistedQueueStates", typeof(Dictionary), typeof(WorkflowQueuingService)); 
        internal static DependencyProperty LocalPersistedQueueStatesProperty = DependencyProperty.RegisterAttached("LocalPersistedQueueStates", typeof(Dictionary), typeof(WorkflowQueuingService)); 
        private const string pendingNotification = "*PendingNotifications";
 
        // Snapshots created during pre-persist and dumped during post-persist
        // If  persistence fails, changes made to queuing service during pre-persist must be undone
        //    in post-persist.
        // Created for ref. 20575. 
        private Dictionary persistedQueueStatesSnapshot = null;
        private EventQueueState pendingQueueStateSnapshot = null; 
 
        // root Q service constructor
        internal WorkflowQueuingService(IWorkflowCoreRuntime rootWorkflowExecutor) 
        {
            this.rootWorkflowExecutor = rootWorkflowExecutor;
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, this.pendingQueueState.Messages);
            this.persistedQueueStates = (Dictionary)this.rootWorkflowExecutor.RootActivity.GetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty); 
            if (this.persistedQueueStates == null)
            { 
                this.persistedQueueStates = new Dictionary(); 
                this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty, this.persistedQueueStates);
            } 
            if ( !this.Exists(pendingNotification))
                this.CreateWorkflowQueue(pendingNotification, false);
        }
 
        // inner Q service constructor
        internal WorkflowQueuingService(WorkflowQueuingService copyFromQueuingService) 
        { 
            this.rootQueuingService = copyFromQueuingService;
            this.rootWorkflowExecutor = copyFromQueuingService.rootWorkflowExecutor; 
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, this.pendingQueueState.Messages);
            this.persistedQueueStates = new Dictionary();
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.LocalPersistedQueueStatesProperty, this.persistedQueueStates);
            SubscribeForRootMessageDelivery(); 
        }
 
        public WorkflowQueue CreateWorkflowQueue(IComparable queueName, bool transactional) 
        {
            if(queueName == null) 
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            { 
                // if not transactional create one at the root
                // so it is visible outside this transaction 
                if (this.rootQueuingService != null && !transactional) 
                {
                    return this.rootQueuingService.CreateWorkflowQueue(queueName, false); 
                }

                NewQueue(queueName, true, transactional);
 
                return new WorkflowQueue(this, queueName);
            } 
        } 

        public void DeleteWorkflowQueue(IComparable queueName) 
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");
 
            lock(SyncRoot)
            { 
                // when we are deleting the queue from activity 
                // message delivery should not happen.
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName)) 
                {
                    this.rootQueuingService.DeleteWorkflowQueue(queueName);
                    return;
                } 

                EventQueueState queueState = GetEventQueueState(queueName); 
 
                Queue queue = queueState.Messages;
                Queue pendingQueue = this.pendingQueueState.Messages; 

                while (queue.Count != 0)
                {
                    pendingQueue.Enqueue(queue.Dequeue()); 
                }
 
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Deleting Queue with ID {0} for {1}", queueName.GetHashCode(), queueName); 
                this.persistedQueueStates.Remove(queueName);
            } 
        }

        public bool Exists(IComparable queueName)
        { 
            if (queueName == null)
                throw new ArgumentNullException("queueName"); 
 
            lock(SyncRoot)
            { 
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.Exists(queueName);
                } 

                return this.persistedQueueStates.ContainsKey(queueName); 
            } 
        }
 
        public WorkflowQueue GetWorkflowQueue(IComparable queueName)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName"); 

            lock (SyncRoot) 
            { 
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                { 
                    return this.rootQueuingService.GetWorkflowQueue(queueName);
                }

                GetEventQueueState(queueName); 

                return new WorkflowQueue(this, queueName); 
            } 
        }
 
        #region internal functions

        internal Object SyncRoot
        { 
            get { return syncRoot; }
        } 
 
        internal void EnqueueEvent(IComparable queueName, Object item)
        { 
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot) 
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName)) 
                { 
                    this.rootQueuingService.EnqueueEvent(queueName, item);
                    return; 
                }

                EventQueueState qState = GetQueue(queueName);
                if (!qState.Enabled) 
                {
                    throw new QueueException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueNotEnabled, queueName), MessageQueueErrorCode.QueueNotAvailable); 
                } 

                // note enqueue allowed irrespective of dirty flag since it is delivered through 
                qState.Messages.Enqueue(item);

                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Enqueue item Queue ID {0} for {1}", queueName.GetHashCode(), queueName);
 
                // notify message arrived subscribers
                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i) 
                { 
                    this.messageArrivalEventHandlers[i].OnItemEnqueued(queueName, item);
                } 

                NotifyExternalSubscribers(queueName, qState, item);
            }
        } 
        internal bool SafeEnqueueEvent(IComparable queueName, Object item)
        { 
            if (queueName == null) 
                throw new ArgumentNullException("queueName");
 
            lock (SyncRoot)
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                { 
                    return this.rootQueuingService.SafeEnqueueEvent(queueName, item);
                } 
 
                EventQueueState qState = GetQueue(queueName);
                if (!qState.Enabled) 
                {
                    throw new QueueException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueNotEnabled, queueName), MessageQueueErrorCode.QueueNotAvailable);
                }
 
                // note enqueue allowed irrespective of dirty flag since it is delivered through
                qState.Messages.Enqueue(item); 
 
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Enqueue item Queue ID {0} for {1}", queueName.GetHashCode(), queueName);
 
                // notify message arrived subscribers
                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
                {
                    this.messageArrivalEventHandlers[i].OnItemSafeEnqueued(queueName, item); 
                }
 
                NotifySynchronousSubscribers(queueName, qState, item); 
                return QueueAsynchronousEvent(queueName, qState);
            } 
        }


        internal object Peek(IComparable queueName) 
        {
            if (queueName == null) 
                throw new ArgumentNullException("queueName"); 

            lock (SyncRoot) 
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.Peek(queueName); 
                }
 
                EventQueueState queueState = GetEventQueueState(queueName); 
                if (queueState.Messages.Count != 0)
                    return queueState.Messages.Peek(); 

                object[] args = new object[] { System.Messaging.MessageQueueErrorCode.MessageNotFound, queueName };
                string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);
 
                throw new QueueException(message, MessageQueueErrorCode.MessageNotFound);
            } 
        } 

        internal Object DequeueEvent(IComparable queueName) 
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");
 
            lock (SyncRoot)
            { 
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName)) 
                {
                    return this.rootQueuingService.DequeueEvent(queueName); 
                }

                EventQueueState queueState = GetEventQueueState(queueName);
                if (queueState.Messages.Count != 0) 
                    return queueState.Messages.Dequeue();
 
                object[] args = new object[] { System.Messaging.MessageQueueErrorCode.MessageNotFound, queueName }; 
                string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);
 
                throw new QueueException(message, MessageQueueErrorCode.MessageNotFound);
            }
        }
 
        internal EventQueueState GetQueueState(IComparable eventType)
        { 
            lock (SyncRoot) 
            {
                return GetQueue(eventType); 
            }
        }

        Activity caller; 

        internal Activity CallingActivity 
        { 
            get
            { 
                if (this.rootQueuingService != null)
                    return this.rootQueuingService.CallingActivity;
                return this.caller;
            } 
            set
            { 
                if (this.rootQueuingService != null) 
                    this.rootQueuingService.CallingActivity = value;
 
                this.caller = value;
            }
        }
 
        private bool QueueAsynchronousEvent(IComparable queueName, EventQueueState qState)
        { 
            if (qState.AsynchronousListeners.Count != 0 || IsNestedListenersExist(queueName)) 
            {
                Queue q = GetQueue(pendingNotification).Messages; 
                q.Enqueue(new KeyValuePair(queueName, qState));
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Queued delayed message notification for '{0}'", queueName.ToString());
                return q.Count == 1;
            } 
            return false;
        } 
 
        bool IsNestedListenersExist(IComparable queueName)
        { 
            for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
            {
                WorkflowQueuingService qService = messageArrivalEventHandlers[i];
                EventQueueState queueState = null; 

                if (qService.persistedQueueStates.TryGetValue(queueName, out queueState) && 
                    queueState.AsynchronousListeners.Count != 0) 
                return true;
            } 
            return false;
        }
        internal void ProcessesQueuedAsynchronousEvents()
        { 
            Queue q = GetQueue(pendingNotification).Messages;
            while (q.Count > 0) 
            { 
                KeyValuePair pair = (KeyValuePair)q.Dequeue();
                // notify message arrived subscribers 
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Processing delayed message notification '{0}'", pair.Key.ToString());
                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
                {
                    WorkflowQueuingService service = this.messageArrivalEventHandlers[i]; 
                    if (service.persistedQueueStates.ContainsKey(pair.Key))
                    { 
                        EventQueueState qState = service.GetQueue(pair.Key); 
                        if (qState.Enabled)
                        { 
                            service.NotifyAsynchronousSubscribers(pair.Key, qState, 1);
                        }
                    }
                } 
                NotifyAsynchronousSubscribers(pair.Key, pair.Value, 1);
            } 
        } 

        internal void NotifyAsynchronousSubscribers(IComparable queueName, EventQueueState qState, int numberOfNotification) 
        {
            for (int i = 0; i < numberOfNotification; ++i)
            {
                QueueEventArgs args = new QueueEventArgs(queueName); 
                lock (SyncRoot)
                { 
                    foreach (ActivityExecutorDelegateInfo subscriber in qState.AsynchronousListeners) 
                    {
                        Activity contextActivity = rootWorkflowExecutor.GetContextActivityForId(subscriber.ContextId); 
                        Debug.Assert(contextActivity != null);
                        subscriber.InvokeDelegate(contextActivity, args, false);
                        WorkflowTrace.Runtime.TraceInformation("Queuing Service: Notifying async subscriber on queue:'{0}' activity:{1}", queueName.ToString(), subscriber.ActivityQualifiedName);
                    } 
                }
            } 
        } 

        ///  
        /// At termination/completion point, need to move messages from all queues to the pending queue
        /// 
        internal void MoveAllMessagesToPendingQueue()
        { 
            lock (SyncRoot)
            { 
                Queue pendingQueue = this.pendingQueueState.Messages; 
                foreach (EventQueueState queueState in this.persistedQueueStates.Values)
                { 
                    Queue queue = queueState.Messages;
                    while (queue.Count != 0)
                    {
                        pendingQueue.Enqueue(queue.Dequeue()); 
                    }
                } 
            } 
        }
 
       #endregion

        #region private root q service helpers
 
        private EventQueueState GetEventQueueState(IComparable queueName)
        { 
            EventQueueState queueState = GetQueue(queueName); 
            if (queueState.Dirty)
            { 
                string message =
                    string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueBusyException, new object[] { queueName });

                throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable); 
            }
 
            return queueState; 
        }
 
        private void NewQueue(IComparable queueID, bool enabled, bool transactional)
        {
            WorkflowTrace.Runtime.TraceInformation("Queuing Service: Creating new Queue with ID {0} for {1}", queueID.GetHashCode(), queueID);
 
            if (this.persistedQueueStates.ContainsKey(queueID))
            { 
                object[] args = 
                    new object[] { System.Messaging.MessageQueueErrorCode.QueueExists, queueID };
                string message = 
                    string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);

                throw new QueueException(message, MessageQueueErrorCode.QueueExists);
            } 

            EventQueueState queueState = new EventQueueState(); 
            queueState.Enabled = enabled; 
            queueState.queueName = queueID;
            queueState.Transactional = transactional; 
            this.persistedQueueStates.Add(queueID, queueState);
        }

        internal EventQueueState GetQueue(IComparable queueID) 
        {
            EventQueueState queue; 
            if (this.persistedQueueStates.TryGetValue(queueID, out queue)) 
            {
                queue.queueName = queueID; 
                return queue;
            }

            object[] args = 
                new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueID };
            string message = 
                string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args); 

            throw new QueueException(message, MessageQueueErrorCode.QueueNotFound); 
        }

        internal IEnumerable QueueNames
        { 
            get
            { 
                List list = new List(this.persistedQueueStates.Keys); 
                foreach (IComparable name in list)
                { 
                    if (name is String && (String)name == pendingNotification)
                    {
                        list.Remove(name);
                        break; 
                    }
                } 
                return list; 
            }
        } 

        private void ApplyChangesFrom(EventQueueState srcPendingQueueState, Dictionary srcPersistedQueueStates)
        {
            lock (SyncRoot) 
            {
                Dictionary modifiedItems = new Dictionary(); 
 
                foreach (KeyValuePair mergeItem in srcPersistedQueueStates)
                { 
                    Debug.Assert(mergeItem.Value.Transactional, "Queue inside a transactional context is not transactional!");

                    if (mergeItem.Value.Transactional)
                    { 
                        if (this.persistedQueueStates.ContainsKey(mergeItem.Key))
                        { 
                            EventQueueState oldvalue = this.persistedQueueStates[mergeItem.Key]; 
                            if (!oldvalue.Dirty)
                            { 
                                // we could get here when there
                                // are conflicting create Qs
                                string message =
                                    string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueBusyException, new object[] { mergeItem.Key }); 

                                throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable); 
                            } 
                        }
                        modifiedItems.Add(mergeItem.Key, mergeItem.Value); 
                    }
                }

                // no conflicts detected now make the updates visible 
                foreach (KeyValuePair modifiedItem in modifiedItems)
                { 
                    // shared queue in the root, swap out to new value 
                    // or add new item
                    this.persistedQueueStates[modifiedItem.Key] = modifiedItem.Value; 
                }

                this.pendingQueueState.CopyFrom(srcPendingQueueState);
            } 
        }
 
        // message arrival async notification 
        private void NotifyExternalSubscribers(IComparable queueName, EventQueueState qState, Object eventInstance)
        { 
            NotifySynchronousSubscribers(queueName, qState, eventInstance);
            NotifyAsynchronousSubscribers(queueName, qState, 1);
        }
 
        private void NotifySynchronousSubscribers(IComparable queueName, EventQueueState qState, Object eventInstance)
        { 
            QueueEventArgs args = new QueueEventArgs(queueName); 

            for (int i = 0; i < qState.SynchronousListeners.Count; ++i) 
            {
                if (qState.SynchronousListeners[i].HandlerDelegate != null)
                    qState.SynchronousListeners[i].HandlerDelegate(new WorkflowQueue(this, queueName), args);
                else 
                    qState.SynchronousListeners[i].EventListener.OnEvent(new WorkflowQueue(this, queueName), args);
            } 
        } 

        // returns a valid state only if transactional and entry exists 
        private EventQueueState MarkQueueDirtyIfTransactional(IComparable queueName)
        {
            lock (SyncRoot)
            { 
                Debug.Assert(this.rootQueuingService == null, "MarkQueueDirty should be done at root");
 
                if (!this.persistedQueueStates.ContainsKey(queueName)) 
                    return null;
 
                EventQueueState queueState = GetQueue(queueName);

                if (!queueState.Transactional)
                    return null; 

                if (queueState.Dirty) 
                    return queueState; // already marked 

                queueState.Dirty = true; 

                if (this.dirtyQueues == null)
                    this.dirtyQueues = new List();
 
                // add to the list of dirty queues
                this.dirtyQueues.Add(queueName); 
 
                return queueState;
            } 
        }

        private void AddMessageArrivedEventHandler(WorkflowQueuingService handler)
        { 
            lock (SyncRoot)
            { 
                if (this.messageArrivalEventHandlers == null) 
                    this.messageArrivalEventHandlers = new List();
                this.messageArrivalEventHandlers.Add(handler); 
            }
        }

        private void RemoveMessageArrivedEventHandler(WorkflowQueuingService handler) 
        {
            lock (SyncRoot) 
            { 
                if (this.messageArrivalEventHandlers != null)
                    this.messageArrivalEventHandlers.Remove(handler); 

                if (this.dirtyQueues != null)
                {
                    foreach (IComparable queueName in this.dirtyQueues) 
                    {
                        EventQueueState qState = GetQueue(queueName); 
                        qState.Dirty = false; 
                    }
                } 
            }
        }
       #endregion
 
        #region inner QueuingService functions
        private bool IsTransactionalQueue(IComparable queueName) 
        { 
            // check inner service for existense
            if (!this.persistedQueueStates.ContainsKey(queueName)) 
            {
                EventQueueState queueState = this.rootQueuingService.MarkQueueDirtyIfTransactional(queueName);

                if (queueState != null) 
                {
                    // if transactional proceed to the inner queue service 
                    // for this operation after adding the state 
                    EventQueueState snapshotState = new EventQueueState();
                    snapshotState.CopyFrom(queueState); 
                    this.persistedQueueStates.Add(queueName, snapshotState);
                    return true;
                }
 
                return false;
            } 
 
            return true; // if entry exits, it must be transactional
        } 

        private void SubscribeForRootMessageDelivery()
        {
            if (this.rootQueuingService == null) 
                return;
            this.rootQueuingService.AddMessageArrivedEventHandler(this); 
        } 

        private void UnSubscribeFromRootMessageDelivery() 
        {
            if (this.rootQueuingService == null)
                return;
            this.rootQueuingService.RemoveMessageArrivedEventHandler(this); 
        }
 
        // listen on its internal(parent) queuing service 
        // messages and pull messages. There is one parent queuing service visible to the external
        // host environment. A queueing service snapshot exists per atomic scope and external messages 
        // for existing queues need to be pushed through
        private void OnItemEnqueued(IComparable queueName, object item)
        {
            if (this.persistedQueueStates.ContainsKey(queueName)) 
            {
                // make the message visible to inner queueing service 
                EventQueueState qState = GetQueue(queueName); 
                if (!qState.Enabled)
                { 
                    object[] msgArgs = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueName };
                    string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, msgArgs);
                    throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
                } 
                qState.Messages.Enqueue(item);
                NotifyExternalSubscribers(queueName, qState, item); 
            } 
        }
 
        private void OnItemSafeEnqueued(IComparable queueName, object item)
        {
            if (this.persistedQueueStates.ContainsKey(queueName))
            { 
                // make the message visible to inner queueing service
                EventQueueState qState = GetQueue(queueName); 
                if (!qState.Enabled) 
                {
                    object[] msgArgs = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueName }; 
                    string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, msgArgs);
                    throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
                }
                qState.Messages.Enqueue(item); 
                NotifySynchronousSubscribers(queueName, qState, item);
            } 
        } 

        internal void Complete(bool commitSucceeded) 
        {
            if (commitSucceeded)
            {
                this.rootQueuingService.ApplyChangesFrom(this.pendingQueueState, this.persistedQueueStates); 
            }
 
            UnSubscribeFromRootMessageDelivery(); 
        }
        #endregion 

        #region Pre-persist and Post-persist helpers for queuing service states

        // Created for ref. 20575 
        internal void PostPersist(bool isPersistSuccessful)
        { 
            // If persist is unsuccessful, we'll undo the changes done 
            //   because of the call to .Complete() in PrePresist
            if (!isPersistSuccessful) 
            {
                Debug.Assert(rootWorkflowExecutor.CurrentAtomicActivity != null);
                Debug.Assert(pendingQueueStateSnapshot != null);
                Debug.Assert(persistedQueueStatesSnapshot != null); 

                TransactionalProperties transactionalProperties = rootWorkflowExecutor.CurrentAtomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty) as TransactionalProperties; 
                Debug.Assert(transactionalProperties != null); 

                // Restore queuing states and set root activity's dependency properties to the new values. 
                pendingQueueState = pendingQueueStateSnapshot;
                persistedQueueStates = persistedQueueStatesSnapshot;
                rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty, persistedQueueStatesSnapshot);
                rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, pendingQueueStateSnapshot.Messages); 

                // Also call Subscribe...() because the .Complete() call called Unsubscribe 
                transactionalProperties.LocalQueuingService.SubscribeForRootMessageDelivery(); 
            }
 
            // The backups are no longer necessary.
            // The next call to PrePresistQueuingServiceState() will do a re-backup.
            persistedQueueStatesSnapshot = null;
            pendingQueueStateSnapshot = null; 
        }
 
        // Created for ref. 20575 
        internal void PrePersist()
        { 
            if (rootWorkflowExecutor.CurrentAtomicActivity != null)
            {
                // Create transactionalProperties from currentAtomicActivity
                TransactionalProperties transactionalProperties = this.rootWorkflowExecutor.CurrentAtomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty) as TransactionalProperties; 

                // Create backup snapshot of root queuing service's persistedQueuesStates 
                // qService.persistedQueueStates is changed when LocalQueuingService.Complete is called later. 
                persistedQueueStatesSnapshot = new Dictionary();
                foreach (KeyValuePair kv in persistedQueueStates) 
                {
                    EventQueueState individualPersistedQueueStateValue = new EventQueueState();
                    individualPersistedQueueStateValue.CopyFrom(kv.Value);
                    persistedQueueStatesSnapshot.Add(kv.Key, individualPersistedQueueStateValue); 
                }
 
                // Create backup snapshot of root queuing service's pendingQueueState 
                // qService.pendingQueueState is changed when LocalQueuingService.Complete is called later.
                pendingQueueStateSnapshot = new EventQueueState(); 
                pendingQueueStateSnapshot.CopyFrom(pendingQueueState);

                // Reconcile differences between root and local queuing services.
                transactionalProperties.LocalQueuingService.Complete(true); 
            }
        } 
 

        #endregion Pre-persist and post-persist helpers for queuing service states 
    }
}


// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.     All rights    reserved.
//-----------------------------------------------------------
using System;
using System.Collections; 
using System.Collections.Generic;
using System.Text; 
using System.Diagnostics; 
using System.Globalization;
using System.Workflow.ComponentModel; 
using System.Runtime.Serialization;
using System.Messaging;

namespace System.Workflow.Runtime 
{
    public class WorkflowQueuingService 
    { 
        Object syncRoot = new Object();
        IWorkflowCoreRuntime rootWorkflowExecutor; 
        List dirtyQueues;
        EventQueueState pendingQueueState = new EventQueueState();
        Dictionary persistedQueueStates;
 
        // event handler used by atomic execution context's Q service for message delivery
        List messageArrivalEventHandlers; 
 
        // set for inner queuing service
        WorkflowQueuingService rootQueuingService; 

        // Runtime information visible to host, stored on the root activity
        public readonly static DependencyProperty PendingMessagesProperty = DependencyProperty.RegisterAttached("PendingMessages", typeof(Queue), typeof(WorkflowQueuingService), new PropertyMetadata(DependencyPropertyOptions.NonSerialized));
 
        // Persisted state properties
        internal static DependencyProperty RootPersistedQueueStatesProperty = DependencyProperty.RegisterAttached("RootPersistedQueueStates", typeof(Dictionary), typeof(WorkflowQueuingService)); 
        internal static DependencyProperty LocalPersistedQueueStatesProperty = DependencyProperty.RegisterAttached("LocalPersistedQueueStates", typeof(Dictionary), typeof(WorkflowQueuingService)); 
        private const string pendingNotification = "*PendingNotifications";
 
        // Snapshots created during pre-persist and dumped during post-persist
        // If  persistence fails, changes made to queuing service during pre-persist must be undone
        //    in post-persist.
        // Created for ref. 20575. 
        private Dictionary persistedQueueStatesSnapshot = null;
        private EventQueueState pendingQueueStateSnapshot = null; 
 
        // root Q service constructor
        internal WorkflowQueuingService(IWorkflowCoreRuntime rootWorkflowExecutor) 
        {
            this.rootWorkflowExecutor = rootWorkflowExecutor;
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, this.pendingQueueState.Messages);
            this.persistedQueueStates = (Dictionary)this.rootWorkflowExecutor.RootActivity.GetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty); 
            if (this.persistedQueueStates == null)
            { 
                this.persistedQueueStates = new Dictionary(); 
                this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty, this.persistedQueueStates);
            } 
            if ( !this.Exists(pendingNotification))
                this.CreateWorkflowQueue(pendingNotification, false);
        }
 
        // inner Q service constructor
        internal WorkflowQueuingService(WorkflowQueuingService copyFromQueuingService) 
        { 
            this.rootQueuingService = copyFromQueuingService;
            this.rootWorkflowExecutor = copyFromQueuingService.rootWorkflowExecutor; 
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, this.pendingQueueState.Messages);
            this.persistedQueueStates = new Dictionary();
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.LocalPersistedQueueStatesProperty, this.persistedQueueStates);
            SubscribeForRootMessageDelivery(); 
        }
 
        public WorkflowQueue CreateWorkflowQueue(IComparable queueName, bool transactional) 
        {
            if(queueName == null) 
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            { 
                // if not transactional create one at the root
                // so it is visible outside this transaction 
                if (this.rootQueuingService != null && !transactional) 
                {
                    return this.rootQueuingService.CreateWorkflowQueue(queueName, false); 
                }

                NewQueue(queueName, true, transactional);
 
                return new WorkflowQueue(this, queueName);
            } 
        } 

        public void DeleteWorkflowQueue(IComparable queueName) 
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");
 
            lock(SyncRoot)
            { 
                // when we are deleting the queue from activity 
                // message delivery should not happen.
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName)) 
                {
                    this.rootQueuingService.DeleteWorkflowQueue(queueName);
                    return;
                } 

                EventQueueState queueState = GetEventQueueState(queueName); 
 
                Queue queue = queueState.Messages;
                Queue pendingQueue = this.pendingQueueState.Messages; 

                while (queue.Count != 0)
                {
                    pendingQueue.Enqueue(queue.Dequeue()); 
                }
 
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Deleting Queue with ID {0} for {1}", queueName.GetHashCode(), queueName); 
                this.persistedQueueStates.Remove(queueName);
            } 
        }

        public bool Exists(IComparable queueName)
        { 
            if (queueName == null)
                throw new ArgumentNullException("queueName"); 
 
            lock(SyncRoot)
            { 
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.Exists(queueName);
                } 

                return this.persistedQueueStates.ContainsKey(queueName); 
            } 
        }
 
        public WorkflowQueue GetWorkflowQueue(IComparable queueName)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName"); 

            lock (SyncRoot) 
            { 
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                { 
                    return this.rootQueuingService.GetWorkflowQueue(queueName);
                }

                GetEventQueueState(queueName); 

                return new WorkflowQueue(this, queueName); 
            } 
        }
 
        #region internal functions

        internal Object SyncRoot
        { 
            get { return syncRoot; }
        } 
 
        internal void EnqueueEvent(IComparable queueName, Object item)
        { 
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot) 
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName)) 
                { 
                    this.rootQueuingService.EnqueueEvent(queueName, item);
                    return; 
                }

                EventQueueState qState = GetQueue(queueName);
                if (!qState.Enabled) 
                {
                    throw new QueueException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueNotEnabled, queueName), MessageQueueErrorCode.QueueNotAvailable); 
                } 

                // note enqueue allowed irrespective of dirty flag since it is delivered through 
                qState.Messages.Enqueue(item);

                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Enqueue item Queue ID {0} for {1}", queueName.GetHashCode(), queueName);
 
                // notify message arrived subscribers
                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i) 
                { 
                    this.messageArrivalEventHandlers[i].OnItemEnqueued(queueName, item);
                } 

                NotifyExternalSubscribers(queueName, qState, item);
            }
        } 
        internal bool SafeEnqueueEvent(IComparable queueName, Object item)
        { 
            if (queueName == null) 
                throw new ArgumentNullException("queueName");
 
            lock (SyncRoot)
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                { 
                    return this.rootQueuingService.SafeEnqueueEvent(queueName, item);
                } 
 
                EventQueueState qState = GetQueue(queueName);
                if (!qState.Enabled) 
                {
                    throw new QueueException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueNotEnabled, queueName), MessageQueueErrorCode.QueueNotAvailable);
                }
 
                // note enqueue allowed irrespective of dirty flag since it is delivered through
                qState.Messages.Enqueue(item); 
 
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Enqueue item Queue ID {0} for {1}", queueName.GetHashCode(), queueName);
 
                // notify message arrived subscribers
                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
                {
                    this.messageArrivalEventHandlers[i].OnItemSafeEnqueued(queueName, item); 
                }
 
                NotifySynchronousSubscribers(queueName, qState, item); 
                return QueueAsynchronousEvent(queueName, qState);
            } 
        }


        internal object Peek(IComparable queueName) 
        {
            if (queueName == null) 
                throw new ArgumentNullException("queueName"); 

            lock (SyncRoot) 
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.Peek(queueName); 
                }
 
                EventQueueState queueState = GetEventQueueState(queueName); 
                if (queueState.Messages.Count != 0)
                    return queueState.Messages.Peek(); 

                object[] args = new object[] { System.Messaging.MessageQueueErrorCode.MessageNotFound, queueName };
                string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);
 
                throw new QueueException(message, MessageQueueErrorCode.MessageNotFound);
            } 
        } 

        internal Object DequeueEvent(IComparable queueName) 
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");
 
            lock (SyncRoot)
            { 
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName)) 
                {
                    return this.rootQueuingService.DequeueEvent(queueName); 
                }

                EventQueueState queueState = GetEventQueueState(queueName);
                if (queueState.Messages.Count != 0) 
                    return queueState.Messages.Dequeue();
 
                object[] args = new object[] { System.Messaging.MessageQueueErrorCode.MessageNotFound, queueName }; 
                string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);
 
                throw new QueueException(message, MessageQueueErrorCode.MessageNotFound);
            }
        }
 
        internal EventQueueState GetQueueState(IComparable eventType)
        { 
            lock (SyncRoot) 
            {
                return GetQueue(eventType); 
            }
        }

        Activity caller; 

        internal Activity CallingActivity 
        { 
            get
            { 
                if (this.rootQueuingService != null)
                    return this.rootQueuingService.CallingActivity;
                return this.caller;
            } 
            set
            { 
                if (this.rootQueuingService != null) 
                    this.rootQueuingService.CallingActivity = value;
 
                this.caller = value;
            }
        }
 
        private bool QueueAsynchronousEvent(IComparable queueName, EventQueueState qState)
        { 
            if (qState.AsynchronousListeners.Count != 0 || IsNestedListenersExist(queueName)) 
            {
                Queue q = GetQueue(pendingNotification).Messages; 
                q.Enqueue(new KeyValuePair(queueName, qState));
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Queued delayed message notification for '{0}'", queueName.ToString());
                return q.Count == 1;
            } 
            return false;
        } 
 
        bool IsNestedListenersExist(IComparable queueName)
        { 
            for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
            {
                WorkflowQueuingService qService = messageArrivalEventHandlers[i];
                EventQueueState queueState = null; 

                if (qService.persistedQueueStates.TryGetValue(queueName, out queueState) && 
                    queueState.AsynchronousListeners.Count != 0) 
                return true;
            } 
            return false;
        }
        internal void ProcessesQueuedAsynchronousEvents()
        { 
            Queue q = GetQueue(pendingNotification).Messages;
            while (q.Count > 0) 
            { 
                KeyValuePair pair = (KeyValuePair)q.Dequeue();
                // notify message arrived subscribers 
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Processing delayed message notification '{0}'", pair.Key.ToString());
                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
                {
                    WorkflowQueuingService service = this.messageArrivalEventHandlers[i]; 
                    if (service.persistedQueueStates.ContainsKey(pair.Key))
                    { 
                        EventQueueState qState = service.GetQueue(pair.Key); 
                        if (qState.Enabled)
                        { 
                            service.NotifyAsynchronousSubscribers(pair.Key, qState, 1);
                        }
                    }
                } 
                NotifyAsynchronousSubscribers(pair.Key, pair.Value, 1);
            } 
        } 

        internal void NotifyAsynchronousSubscribers(IComparable queueName, EventQueueState qState, int numberOfNotification) 
        {
            for (int i = 0; i < numberOfNotification; ++i)
            {
                QueueEventArgs args = new QueueEventArgs(queueName); 
                lock (SyncRoot)
                { 
                    foreach (ActivityExecutorDelegateInfo subscriber in qState.AsynchronousListeners) 
                    {
                        Activity contextActivity = rootWorkflowExecutor.GetContextActivityForId(subscriber.ContextId); 
                        Debug.Assert(contextActivity != null);
                        subscriber.InvokeDelegate(contextActivity, args, false);
                        WorkflowTrace.Runtime.TraceInformation("Queuing Service: Notifying async subscriber on queue:'{0}' activity:{1}", queueName.ToString(), subscriber.ActivityQualifiedName);
                    } 
                }
            } 
        } 

        ///  
        /// At termination/completion point, need to move messages from all queues to the pending queue
        /// 
        internal void MoveAllMessagesToPendingQueue()
        { 
            lock (SyncRoot)
            { 
                Queue pendingQueue = this.pendingQueueState.Messages; 
                foreach (EventQueueState queueState in this.persistedQueueStates.Values)
                { 
                    Queue queue = queueState.Messages;
                    while (queue.Count != 0)
                    {
                        pendingQueue.Enqueue(queue.Dequeue()); 
                    }
                } 
            } 
        }
 
       #endregion

        #region private root q service helpers
 
        private EventQueueState GetEventQueueState(IComparable queueName)
        { 
            EventQueueState queueState = GetQueue(queueName); 
            if (queueState.Dirty)
            { 
                string message =
                    string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueBusyException, new object[] { queueName });

                throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable); 
            }
 
            return queueState; 
        }
 
        private void NewQueue(IComparable queueID, bool enabled, bool transactional)
        {
            WorkflowTrace.Runtime.TraceInformation("Queuing Service: Creating new Queue with ID {0} for {1}", queueID.GetHashCode(), queueID);
 
            if (this.persistedQueueStates.ContainsKey(queueID))
            { 
                object[] args = 
                    new object[] { System.Messaging.MessageQueueErrorCode.QueueExists, queueID };
                string message = 
                    string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);

                throw new QueueException(message, MessageQueueErrorCode.QueueExists);
            } 

            EventQueueState queueState = new EventQueueState(); 
            queueState.Enabled = enabled; 
            queueState.queueName = queueID;
            queueState.Transactional = transactional; 
            this.persistedQueueStates.Add(queueID, queueState);
        }

        internal EventQueueState GetQueue(IComparable queueID) 
        {
            EventQueueState queue; 
            if (this.persistedQueueStates.TryGetValue(queueID, out queue)) 
            {
                queue.queueName = queueID; 
                return queue;
            }

            object[] args = 
                new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueID };
            string message = 
                string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args); 

            throw new QueueException(message, MessageQueueErrorCode.QueueNotFound); 
        }

        internal IEnumerable QueueNames
        { 
            get
            { 
                List list = new List(this.persistedQueueStates.Keys); 
                foreach (IComparable name in list)
                { 
                    if (name is String && (String)name == pendingNotification)
                    {
                        list.Remove(name);
                        break; 
                    }
                } 
                return list; 
            }
        } 

        private void ApplyChangesFrom(EventQueueState srcPendingQueueState, Dictionary srcPersistedQueueStates)
        {
            lock (SyncRoot) 
            {
                Dictionary modifiedItems = new Dictionary(); 
 
                foreach (KeyValuePair mergeItem in srcPersistedQueueStates)
                { 
                    Debug.Assert(mergeItem.Value.Transactional, "Queue inside a transactional context is not transactional!");

                    if (mergeItem.Value.Transactional)
                    { 
                        if (this.persistedQueueStates.ContainsKey(mergeItem.Key))
                        { 
                            EventQueueState oldvalue = this.persistedQueueStates[mergeItem.Key]; 
                            if (!oldvalue.Dirty)
                            { 
                                // we could get here when there
                                // are conflicting create Qs
                                string message =
                                    string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueBusyException, new object[] { mergeItem.Key }); 

                                throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable); 
                            } 
                        }
                        modifiedItems.Add(mergeItem.Key, mergeItem.Value); 
                    }
                }

                // no conflicts detected now make the updates visible 
                foreach (KeyValuePair modifiedItem in modifiedItems)
                { 
                    // shared queue in the root, swap out to new value 
                    // or add new item
                    this.persistedQueueStates[modifiedItem.Key] = modifiedItem.Value; 
                }

                this.pendingQueueState.CopyFrom(srcPendingQueueState);
            } 
        }
 
        // message arrival async notification 
        private void NotifyExternalSubscribers(IComparable queueName, EventQueueState qState, Object eventInstance)
        { 
            NotifySynchronousSubscribers(queueName, qState, eventInstance);
            NotifyAsynchronousSubscribers(queueName, qState, 1);
        }
 
        private void NotifySynchronousSubscribers(IComparable queueName, EventQueueState qState, Object eventInstance)
        { 
            QueueEventArgs args = new QueueEventArgs(queueName); 

            for (int i = 0; i < qState.SynchronousListeners.Count; ++i) 
            {
                if (qState.SynchronousListeners[i].HandlerDelegate != null)
                    qState.SynchronousListeners[i].HandlerDelegate(new WorkflowQueue(this, queueName), args);
                else 
                    qState.SynchronousListeners[i].EventListener.OnEvent(new WorkflowQueue(this, queueName), args);
            } 
        } 

        // returns a valid state only if transactional and entry exists 
        private EventQueueState MarkQueueDirtyIfTransactional(IComparable queueName)
        {
            lock (SyncRoot)
            { 
                Debug.Assert(this.rootQueuingService == null, "MarkQueueDirty should be done at root");
 
                if (!this.persistedQueueStates.ContainsKey(queueName)) 
                    return null;
 
                EventQueueState queueState = GetQueue(queueName);

                if (!queueState.Transactional)
                    return null; 

                if (queueState.Dirty) 
                    return queueState; // already marked 

                queueState.Dirty = true; 

                if (this.dirtyQueues == null)
                    this.dirtyQueues = new List();
 
                // add to the list of dirty queues
                this.dirtyQueues.Add(queueName); 
 
                return queueState;
            } 
        }

        private void AddMessageArrivedEventHandler(WorkflowQueuingService handler)
        { 
            lock (SyncRoot)
            { 
                if (this.messageArrivalEventHandlers == null) 
                    this.messageArrivalEventHandlers = new List();
                this.messageArrivalEventHandlers.Add(handler); 
            }
        }

        private void RemoveMessageArrivedEventHandler(WorkflowQueuingService handler) 
        {
            lock (SyncRoot) 
            { 
                if (this.messageArrivalEventHandlers != null)
                    this.messageArrivalEventHandlers.Remove(handler); 

                if (this.dirtyQueues != null)
                {
                    foreach (IComparable queueName in this.dirtyQueues) 
                    {
                        EventQueueState qState = GetQueue(queueName); 
                        qState.Dirty = false; 
                    }
                } 
            }
        }
       #endregion
 
        #region inner QueuingService functions
        private bool IsTransactionalQueue(IComparable queueName) 
        { 
            // check inner service for existense
            if (!this.persistedQueueStates.ContainsKey(queueName)) 
            {
                EventQueueState queueState = this.rootQueuingService.MarkQueueDirtyIfTransactional(queueName);

                if (queueState != null) 
                {
                    // if transactional proceed to the inner queue service 
                    // for this operation after adding the state 
                    EventQueueState snapshotState = new EventQueueState();
                    snapshotState.CopyFrom(queueState); 
                    this.persistedQueueStates.Add(queueName, snapshotState);
                    return true;
                }
 
                return false;
            } 
 
            return true; // if entry exits, it must be transactional
        } 

        private void SubscribeForRootMessageDelivery()
        {
            if (this.rootQueuingService == null) 
                return;
            this.rootQueuingService.AddMessageArrivedEventHandler(this); 
        } 

        private void UnSubscribeFromRootMessageDelivery() 
        {
            if (this.rootQueuingService == null)
                return;
            this.rootQueuingService.RemoveMessageArrivedEventHandler(this); 
        }
 
        // listen on its internal(parent) queuing service 
        // messages and pull messages. There is one parent queuing service visible to the external
        // host environment. A queueing service snapshot exists per atomic scope and external messages 
        // for existing queues need to be pushed through
        private void OnItemEnqueued(IComparable queueName, object item)
        {
            if (this.persistedQueueStates.ContainsKey(queueName)) 
            {
                // make the message visible to inner queueing service 
                EventQueueState qState = GetQueue(queueName); 
                if (!qState.Enabled)
                { 
                    object[] msgArgs = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueName };
                    string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, msgArgs);
                    throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
                } 
                qState.Messages.Enqueue(item);
                NotifyExternalSubscribers(queueName, qState, item); 
            } 
        }
 
        private void OnItemSafeEnqueued(IComparable queueName, object item)
        {
            if (this.persistedQueueStates.ContainsKey(queueName))
            { 
                // make the message visible to inner queueing service
                EventQueueState qState = GetQueue(queueName); 
                if (!qState.Enabled) 
                {
                    object[] msgArgs = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueName }; 
                    string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, msgArgs);
                    throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
                }
                qState.Messages.Enqueue(item); 
                NotifySynchronousSubscribers(queueName, qState, item);
            } 
        } 

        internal void Complete(bool commitSucceeded) 
        {
            if (commitSucceeded)
            {
                this.rootQueuingService.ApplyChangesFrom(this.pendingQueueState, this.persistedQueueStates); 
            }
 
            UnSubscribeFromRootMessageDelivery(); 
        }
        #endregion 

        #region Pre-persist and Post-persist helpers for queuing service states

        // Created for ref. 20575 
        internal void PostPersist(bool isPersistSuccessful)
        { 
            // If persist is unsuccessful, we'll undo the changes done 
            //   because of the call to .Complete() in PrePresist
            if (!isPersistSuccessful) 
            {
                Debug.Assert(rootWorkflowExecutor.CurrentAtomicActivity != null);
                Debug.Assert(pendingQueueStateSnapshot != null);
                Debug.Assert(persistedQueueStatesSnapshot != null); 

                TransactionalProperties transactionalProperties = rootWorkflowExecutor.CurrentAtomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty) as TransactionalProperties; 
                Debug.Assert(transactionalProperties != null); 

                // Restore queuing states and set root activity's dependency properties to the new values. 
                pendingQueueState = pendingQueueStateSnapshot;
                persistedQueueStates = persistedQueueStatesSnapshot;
                rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty, persistedQueueStatesSnapshot);
                rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, pendingQueueStateSnapshot.Messages); 

                // Also call Subscribe...() because the .Complete() call called Unsubscribe 
                transactionalProperties.LocalQueuingService.SubscribeForRootMessageDelivery(); 
            }
 
            // The backups are no longer necessary.
            // The next call to PrePresistQueuingServiceState() will do a re-backup.
            persistedQueueStatesSnapshot = null;
            pendingQueueStateSnapshot = null; 
        }
 
        // Created for ref. 20575 
        internal void PrePersist()
        { 
            if (rootWorkflowExecutor.CurrentAtomicActivity != null)
            {
                // Create transactionalProperties from currentAtomicActivity
                TransactionalProperties transactionalProperties = this.rootWorkflowExecutor.CurrentAtomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty) as TransactionalProperties; 

                // Create backup snapshot of root queuing service's persistedQueuesStates 
                // qService.persistedQueueStates is changed when LocalQueuingService.Complete is called later. 
                persistedQueueStatesSnapshot = new Dictionary();
                foreach (KeyValuePair kv in persistedQueueStates) 
                {
                    EventQueueState individualPersistedQueueStateValue = new EventQueueState();
                    individualPersistedQueueStateValue.CopyFrom(kv.Value);
                    persistedQueueStatesSnapshot.Add(kv.Key, individualPersistedQueueStateValue); 
                }
 
                // Create backup snapshot of root queuing service's pendingQueueState 
                // qService.pendingQueueState is changed when LocalQueuingService.Complete is called later.
                pendingQueueStateSnapshot = new EventQueueState(); 
                pendingQueueStateSnapshot.CopyFrom(pendingQueueState);

                // Reconcile differences between root and local queuing services.
                transactionalProperties.LocalQueuingService.Complete(true); 
            }
        } 
 

        #endregion Pre-persist and post-persist helpers for queuing service states 
    }
}


// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK