DurableInstanceManager.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Activities / System / ServiceModel / Activities / Dispatcher / DurableInstanceManager.cs / 1407647 / DurableInstanceManager.cs

                            //---------------------------------------------------------------- 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//---------------------------------------------------------------

namespace System.ServiceModel.Activities.Dispatcher 
{
    using System.Activities.DurableInstancing; 
    using System.Activities.Persistence; 
    using System.Collections.Generic;
    using System.ComponentModel; 
    using System.Linq;
    using System.Runtime;
    using System.Runtime.DurableInstancing;
    using System.ServiceModel.Activities.Description; 
    using System.ServiceModel.Channels;
    using System.ServiceModel.Description; 
    using System.Threading; 
    using System.Transactions;
    using System.Xml.Linq; 
    using System.ServiceModel.Activation;

    sealed class DurableInstanceManager
    { 
        static AsyncCallback waitAndHandleStoreEventsCallback = Fx.ThunkCallback(new AsyncCallback(WaitAndHandleStoreEventsCallback));
 
        int state; 
        InstanceStore store;
        InstanceHandle handle; 
        InstanceOwner owner;
        CreateWorkflowOwnerCommand createOwnerCommand;
        object thisLock;
        IDictionary instanceMetadataChanges; 
        AsyncWaitHandle waitForStoreEventsLoop;
 
        internal DurableInstanceManager(WorkflowServiceHost host) 
        {
            DurableInstancingOptions = new DurableInstancingOptions(this); 
            this.createOwnerCommand = new CreateWorkflowOwnerCommand();
            this.instanceMetadataChanges = new Dictionary();
            this.thisLock = new object();
 
            // This is for collision detection.  Will replace with the real service name prior to executing.
            InstanceValue sentinel = new InstanceValue(XNamespace.Get("http://tempuri.org").GetName("Sentinel")); 
            this.createOwnerCommand.InstanceOwnerMetadata.Add(WorkflowNamespace.WorkflowHostType, sentinel); 
            this.instanceMetadataChanges.Add(WorkflowNamespace.WorkflowHostType, sentinel);
            this.instanceMetadataChanges.Add(PersistenceMetadataNamespace.InstanceType, new InstanceValue(WorkflowNamespace.WorkflowHostType, InstanceValueOptions.WriteOnly)); 

            this.Host = host;
        }
 
        WorkflowServiceHost Host { get; set; }
 
        internal PersistenceProviderDirectory PersistenceProviderDirectory { get; set; } 

        public DurableInstancingOptions DurableInstancingOptions { get; private set; } 

        public InstanceStore InstanceStore
        {
            get 
            {
                return this.store; 
            } 
            set
            { 
                ThrowIfDisposedOrImmutable(this.state);
                this.store = value;
            }
        } 

        public void AddInstanceOwnerValues(IDictionary readWriteValues, IDictionary writeOnlyValues) 
        { 
            ThrowIfDisposedOrImmutable(this.state);
 
            if (readWriteValues != null)
            {
                foreach (KeyValuePair property in readWriteValues)
                { 
                    if (this.createOwnerCommand.InstanceOwnerMetadata.ContainsKey(property.Key))
                    { 
                        throw FxTrace.Exception.Argument("readWriteValues", SR.ConflictingValueName(property.Key)); 
                    }
                    this.createOwnerCommand.InstanceOwnerMetadata.Add(property.Key, new InstanceValue(property.Value)); 
                }
            }

            if (writeOnlyValues != null) 
            {
                foreach (KeyValuePair property in writeOnlyValues) 
                { 
                    if (this.createOwnerCommand.InstanceOwnerMetadata.ContainsKey(property.Key))
                    { 
                        throw FxTrace.Exception.Argument("writeOnlyValues", SR.ConflictingValueName(property.Key));
                    }
                    this.createOwnerCommand.InstanceOwnerMetadata.Add(property.Key, new InstanceValue(property.Value,
                        InstanceValueOptions.Optional | InstanceValueOptions.WriteOnly)); 
                }
            } 
        } 

        public void AddInitialInstanceValues(IDictionary writeOnlyValues) 
        {
            ThrowIfDisposedOrImmutable(this.state);

            if (writeOnlyValues != null) 
            {
                foreach (KeyValuePair pair in writeOnlyValues) 
                { 
                    if (this.instanceMetadataChanges.ContainsKey(pair.Key))
                    { 
                        throw FxTrace.Exception.Argument("writeOnlyValues", SR.ConflictingValueName(pair.Key));
                    }
                    this.instanceMetadataChanges.Add(pair.Key, new InstanceValue(pair.Value, InstanceValueOptions.Optional | InstanceValueOptions.WriteOnly));
                } 
            }
        } 
 
        static void ThrowIfDisposedOrImmutable(int state)
        { 
            if (state == States.Aborted)
            {
                throw FxTrace.Exception.AsError(new CommunicationObjectAbortedException(SR.ServiceHostExtensionAborted));
            } 
            if (state == States.Closed)
            { 
                throw FxTrace.Exception.AsError(new ObjectDisposedException(typeof(DurableInstanceManager).Name)); 
            }
            if (state == States.Opened) 
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ServiceHostExtensionImmutable));
            }
        } 

        static void ThrowIfClosedOrAborted(int state) 
        { 
            if (state == States.Aborted)
            { 
                throw FxTrace.Exception.AsError(new CommunicationObjectAbortedException(SR.ServiceHostExtensionAborted));
            }
            if (state == States.Closed)
            { 
                throw FxTrace.Exception.AsError(new ObjectDisposedException(typeof(DurableInstanceManager).Name));
            } 
        } 

        void InitializePersistenceProviderDirectory() 
        {
            WorkflowServiceBehavior workflowServiceBehavior = Host.Description.Behaviors.Find();
            Fx.Assert(workflowServiceBehavior != null, "WorkflowServiceBehavior must be present on WorkflowServiceHost.");
 
            int maxInstances = ServiceThrottlingBehavior.DefaultMaxConcurrentInstances;
            ServiceThrottlingBehavior serviceThrottlingBehavior = Host.Description.Behaviors.Find(); 
            if (serviceThrottlingBehavior != null) 
            {
                maxInstances = serviceThrottlingBehavior.MaxConcurrentInstances; 
            }

            if (InstanceStore != null)
            { 
                PersistenceProviderDirectory = new PersistenceProviderDirectory(InstanceStore, this.owner, this.instanceMetadataChanges, workflowServiceBehavior.Activity, Host, DurableConsistencyScope.Global, maxInstances);
            } 
            else 
            {
                PersistenceProviderDirectory = new PersistenceProviderDirectory(workflowServiceBehavior.Activity, Host, maxInstances); 
            }

            bool aborted;
            lock (this.thisLock) 
            {
                aborted = this.state == States.Aborted; 
            } 

            if (aborted) 
            {
                if (this.handle != null)
                {
                    this.handle.Free(); 
                }
 
                PersistenceProviderDirectory.Abort(); 
            }
 
            // Start listening to store event
            if (InstanceStore != null && !aborted)
            {
                this.waitForStoreEventsLoop = new AsyncWaitHandle(EventResetMode.ManualReset); 
                BeginWaitAndHandleStoreEvents(waitAndHandleStoreEventsCallback, this);
            } 
        } 

        IAsyncResult BeginWaitAndHandleStoreEvents(AsyncCallback callback, object state) 
        {
            return new WaitAndHandleStoreEventsAsyncResult(this, callback, state);
        }
 
        void EndWaitAndHandleStoreEvents(IAsyncResult result)
        { 
            WaitAndHandleStoreEventsAsyncResult.End(result); 
        }
 
        static void WaitAndHandleStoreEventsCallback(IAsyncResult result)
        {
            DurableInstanceManager thisPtr = (DurableInstanceManager)result.AsyncState;
            bool stop = false; 
            try
            { 
                thisPtr.EndWaitAndHandleStoreEvents(result); 
            }
            catch (OperationCanceledException exception) 
            {
                FxTrace.Exception.AsWarning(exception);

                // The OCE, bubbled to this layer, is only from store.BeginWaitForEvents. 
                // This indicates handle is freed by 1) normal closing sequence 2) store
                // is dead (eg. lock owner expired).  We will fault the host as well as 
                // cease the loop. 
                if (thisPtr.Host.State == CommunicationState.Opening || thisPtr.Host.State == CommunicationState.Opened)
                { 
                    thisPtr.Host.Fault(exception);
                }
                stop = true;
            } 
            catch (Exception exception)
            { 
                if (Fx.IsFatal(exception) || !thisPtr.HandleException(exception)) 
                {
                    throw; 
                }
            }

            // Continue 
            if (!stop && thisPtr.state == States.Opened)
            { 
                thisPtr.BeginWaitAndHandleStoreEvents(waitAndHandleStoreEventsCallback, thisPtr); 
            }
            else 
            {
                thisPtr.waitForStoreEventsLoop.Set();
            }
        } 

        bool HandleException(Exception exception) 
        { 
            if (exception is TimeoutException ||
                exception is OperationCanceledException || 
                exception is TransactionException ||
                exception is CommunicationObjectAbortedException ||
                // When abort raised by WorkflowServiceInstance
                exception is FaultException || 
                exception is InstancePersistenceException)
            { 
                FxTrace.Exception.AsWarning(exception); 
                this.Host.FaultServiceHostIfNecessary(exception);
                return true; 
            }
            return false;
        }
 
        void CheckPersistenceProviderBehavior()
        { 
            foreach (IServiceBehavior behavior in Host.Description.Behaviors) 
            {
                if (behavior.GetType().FullName == "System.ServiceModel.Description.PersistenceProviderBehavior") 
                {
                    throw FxTrace.Exception.AsError(new CommunicationException(SR.UseInstanceStoreInsteadOfPersistenceProvider));
                }
            } 
        }
 
        internal IAsyncResult BeginGetInstance(InstanceKey instanceKey, ICollection additionalKeys, WorkflowGetInstanceContext parameters, TimeSpan timeout, AsyncCallback callback, object state) 
        {
            ThrowIfClosedOrAborted(this.state); 
            return new GetInstanceAsyncResult(this, instanceKey, additionalKeys, parameters, timeout, callback, state);
        }

        internal IAsyncResult BeginGetInstance(Guid instanceId, WorkflowGetInstanceContext parameters, 
            TimeSpan timeout, AsyncCallback callback, object state)
        { 
            ThrowIfClosedOrAborted(this.state); 
            return new GetInstanceAsyncResult(this, instanceId, parameters, timeout, callback, state);
        } 

        internal WorkflowServiceInstance EndGetInstance(IAsyncResult result)
        {
            return GetInstanceAsyncResult.End(result); 
        }
 
        void AbortDirectory() 
        {
            lock (this.thisLock) 
            {
                if (this.state == States.Aborted)
                {
                    return; 
                }
                this.state = States.Aborted; 
            } 

            if (this.handle != null) 
            {
                this.handle.Free();
            }
 
            // PersistenceProviderDirectory is assigned on opened.  Abort could happen before (eg. after created)
            if (PersistenceProviderDirectory != null) 
            { 
                PersistenceProviderDirectory.Abort();
            } 
        }

        void SetDefaultOwnerMetadata()
        { 
            // Replace the sentinal value with the real scoping name here.
            this.createOwnerCommand.InstanceOwnerMetadata[WorkflowNamespace.WorkflowHostType] = new InstanceValue(Host.DurableInstancingOptions.ScopeName); 
            this.instanceMetadataChanges[WorkflowNamespace.WorkflowHostType] = new InstanceValue(Host.DurableInstancingOptions.ScopeName); 

            if (!this.instanceMetadataChanges.ContainsKey(WorkflowServiceNamespace.Service)) 
            {
                this.instanceMetadataChanges[WorkflowServiceNamespace.Service] = new InstanceValue(Host.ServiceName, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
            }
 
            // add instance metadata about all of our endpoints
            foreach (ServiceEndpoint endpoint in this.Host.Description.Endpoints) 
            { 
                if (endpoint.Name != null)
                { 
                    // treat the control endpoint as special
                    if (endpoint is WorkflowControlEndpoint)
                    {
                        if (!this.createOwnerCommand.InstanceOwnerMetadata.ContainsKey(WorkflowServiceNamespace.ControlEndpoint)) 
                        {
                            this.createOwnerCommand.InstanceOwnerMetadata.Add(WorkflowServiceNamespace.ControlEndpoint, new InstanceValue(endpoint.ListenUri)); 
                        } 
                    }
                    else 
                    {
                        XName endpointName = WorkflowServiceNamespace.EndpointsPath.GetName(endpoint.Name);
                        if (!this.createOwnerCommand.InstanceOwnerMetadata.ContainsKey(endpointName))
                        { 
                            this.createOwnerCommand.InstanceOwnerMetadata.Add(endpointName, new InstanceValue(endpoint.ListenUri));
                        } 
                    } 
                }
            } 

            // as well as additional metadata if we're hosted
            VirtualPathExtension virtualPathExtension = this.Host.Extensions.Find();
            if (virtualPathExtension != null && !this.instanceMetadataChanges.ContainsKey(PersistenceMetadataNamespace.ActivationType)) 
            {
                // Example values for various web-host properties 
                // SiteName: "Default Website" 
                // RelativeApplicationPath/ApplicationVirtualPath: "/myApp1"
                // Virtual Path: "~/ShoppingCartService/ShoppingCartService.xaml" 
                // Relative Service Path: "/myApp1/ShoppingCartService/ShoppingCartService.xaml"
                this.instanceMetadataChanges.Add(PersistenceMetadataNamespace.ActivationType, new InstanceValue(PersistenceMetadataNamespace.ActivationTypes.WAS, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional));

                // The remaining properties will get overritten if the user set them manually.  To control activation, the user should also set ActivationType, even if just to WAS. 
                this.instanceMetadataChanges[WorkflowServiceNamespace.SiteName] = new InstanceValue(virtualPathExtension.SiteName, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
                this.instanceMetadataChanges[WorkflowServiceNamespace.RelativeApplicationPath] = new InstanceValue(virtualPathExtension.ApplicationVirtualPath, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional); 
 
                string virtualPath = virtualPathExtension.VirtualPath.Substring(1);
                string relativePath = ("/" == virtualPathExtension.ApplicationVirtualPath) ? virtualPath : virtualPathExtension.ApplicationVirtualPath + virtualPath; 

                this.instanceMetadataChanges[WorkflowServiceNamespace.RelativeServicePath] = new InstanceValue(relativePath, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
            }
        } 

        public void Open(TimeSpan timeout) 
        { 
            Fx.Assert(Host != null, "Extension should have been attached in WorkflowServiceHost constructor.");
 
            lock (this.thisLock)
            {
                ThrowIfDisposedOrImmutable(this.state);
                this.state = States.Opened; 
            }
 
            CheckPersistenceProviderBehavior(); 

            SetDefaultOwnerMetadata(); 

            if (InstanceStore != null)
            {
                using (new TransactionScope(TransactionScopeOption.Suppress)) 
                {
                    TimeoutHelper timeoutHelper = new TimeoutHelper(timeout); 
                    InstanceHandle handle = null; 
                    try
                    { 
                        handle = InstanceStore.CreateInstanceHandle(null);
                        this.owner = InstanceStore.Execute(handle, this.createOwnerCommand, timeoutHelper.RemainingTime()).InstanceOwner;
                        this.handle = handle;
                        handle = null; 
                    }
                    catch (InstancePersistenceException exception) 
                    { 
                        throw FxTrace.Exception.AsError(new CommunicationException(SR.UnableToOpenAndRegisterStore, exception));
                    } 
                    finally
                    {
                        if (handle != null)
                        { 
                            handle.Free();
                        } 
                    } 
                }
            } 

            InitializePersistenceProviderDirectory();
        }
 
        public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        { 
            Fx.Assert(Host != null, "Extension should have been attached in WorkflowServiceHost constructor."); 

            using (new TransactionScope(TransactionScopeOption.Suppress)) 
            {
                return new OpenInstanceStoreAsyncResult(this, timeout, callback, state);
            }
        } 

        public void EndOpen(IAsyncResult result) 
        { 
            OpenInstanceStoreAsyncResult.End(result);
        } 

        public void Close(TimeSpan timeout)
        {
            // We normally would have a purely synchronous path for our synchronous 
            // overload, but PersistenceIOParticipant.OnBeginSave() doesn't have a synchronous counterpart.
            // Given that, at the very least we'd have to do PersistencePipeline.EndSave(PersistencePipeline.BeginSave). 
            // Therefore we resign ourselves to End(Begin) and take comfort in the unification of logic by not having two codepaths 
            CloseAsyncResult.End(new CloseAsyncResult(this, timeout, null, null));
        } 

        public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return new CloseAsyncResult(this, timeout, callback, state); 
        }
 
        public void EndClose(IAsyncResult result) 
        {
            CloseAsyncResult.End(result); 
        }

        public void Abort()
        { 
            AbortDirectory();
        } 
 
        static class States
        { 
            public const int Created = 0;
            public const int Opened = 1;
            public const int Closed = 2;
            public const int Aborted = 3; 
        }
 
        class OpenInstanceStoreAsyncResult : AsyncResult 
        {
            static AsyncCompletion handleEndExecute = new AsyncCompletion(HandleEndExecute); 
            static Action onFinally = new Action(OnFinally);

            DurableInstanceManager instanceManager;
            TimeoutHelper timeoutHelper; 
            InstanceHandle handle;
 
            public OpenInstanceStoreAsyncResult(DurableInstanceManager instanceManager, TimeSpan timeout, AsyncCallback callback, object state) 
                : base(callback, state)
            { 
                this.instanceManager = instanceManager;
                this.timeoutHelper = new TimeoutHelper(timeout);

                lock (this.instanceManager.thisLock) 
                {
                    DurableInstanceManager.ThrowIfDisposedOrImmutable(this.instanceManager.state); 
                    this.instanceManager.state = States.Opened; 
                }
 
                instanceManager.CheckPersistenceProviderBehavior();

                this.instanceManager.SetDefaultOwnerMetadata();
 
                this.OnCompleting = OpenInstanceStoreAsyncResult.onFinally;
 
                bool completeSelf; 
                Exception completionException = null;
                try 
                {
                    if (instanceManager.InstanceStore == null)
                    {
                        completeSelf = CreateDirectory(); 
                    }
                    else 
                    { 
                        this.handle = this.instanceManager.InstanceStore.CreateInstanceHandle(null);
                        IAsyncResult executeResult = this.instanceManager.InstanceStore.BeginExecute(this.handle, 
                            this.instanceManager.createOwnerCommand, this.timeoutHelper.RemainingTime(),
                            this.PrepareAsyncCompletion(OpenInstanceStoreAsyncResult.handleEndExecute), this);
                        completeSelf = SyncContinue(executeResult);
                    } 
                }
                catch (Exception exception) 
                { 
                    if (Fx.IsFatal(exception))
                    { 
                        throw;
                    }

                    completionException = exception; 
                    completeSelf = true;
                } 
                if (completeSelf) 
                {
                    Complete(true, completionException); 
                }
            }

            static bool HandleEndExecute(IAsyncResult result) 
            {
                OpenInstanceStoreAsyncResult thisPtr = (OpenInstanceStoreAsyncResult)result.AsyncState; 
 
                thisPtr.instanceManager.owner = thisPtr.instanceManager.InstanceStore.EndExecute(result).InstanceOwner;
 
                return thisPtr.CreateDirectory();
            }

            static void OnFinally(AsyncResult result, Exception exception) 
            {
                if (exception != null) 
                { 
                    try
                    { 
                        if (exception is InstancePersistenceException)
                        {
                            throw FxTrace.Exception.AsError(new CommunicationException(SR.UnableToOpenAndRegisterStore, exception));
                        } 
                    }
                    finally 
                    { 
                        OpenInstanceStoreAsyncResult thisPtr = (OpenInstanceStoreAsyncResult)result;
                        if (thisPtr.handle != null) 
                        {
                            thisPtr.handle.Free();
                        }
                    } 
                }
            } 
 
            public static void End(IAsyncResult result)
            { 
                AsyncResult.End(result);
            }

            bool CreateDirectory() 
            {
                this.instanceManager.InitializePersistenceProviderDirectory(); 
                this.instanceManager.handle = this.handle; 
                this.handle = null;
                return true; 
            }
        }

        class CloseAsyncResult : AsyncResult 
        {
            static AsyncCompletion handleEndReleaseInstance = new AsyncCompletion(HandleEndReleaseInstance); 
            static AsyncCompletion handleEndExecute = new AsyncCompletion(HandleEndExecute); 
            static Action handleWaitForStoreEvents = new Action(HandleWaitForStoreEvents);
 
            TimeoutHelper timeoutHelper;
            DurableInstanceManager instanceManager;
            IEnumerator workflowServiceInstances;
            WorkflowServiceInstance currentInstance; 
            InstanceHandle handle;
 
            public CloseAsyncResult(DurableInstanceManager instanceManager, TimeSpan timeout, AsyncCallback callback, object state) 
                : base(callback, state)
            { 
                this.instanceManager = instanceManager;
                this.timeoutHelper = new TimeoutHelper(timeout);

                if (this.instanceManager.state == States.Opened && this.instanceManager.handle != null) 
                {
                    // Note: since we change state before actual openning, this may 
                    // get NullRef (---- already exists in other places) if Close 
                    // is called on an unsuccessful or incompleted opened DIM.
                    // Assuming it is a non supported scenario. 
                    this.instanceManager.handle.Free();
                    if (WaitForStoreEventsLoop())
                    {
                        Complete(true); 
                    }
                } 
                else 
                {
                    if (PerformClose()) 
                    {
                        Complete(true);
                    }
                } 
            }
 
            bool PerformClose() 
            {
                bool closed; 
                bool opened;
                bool aborted;

                lock (this.instanceManager.thisLock) 
                {
                    closed = this.instanceManager.state == States.Closed; 
                    opened = this.instanceManager.state == States.Opened; 
                    aborted = this.instanceManager.state == States.Aborted;
                    if (opened) 
                    {
                        this.instanceManager.state = States.Closed;
                    }
                } 

                if (closed) 
                { 
                    return true;
                } 
                if (!opened)
                {
                    if (!aborted)
                    { 
                        this.instanceManager.AbortDirectory();
                    } 
 
                    // We cannot throw here if the DurableInstanceManager is already aborted since service host could
                    // be aborted due to a timeout exception. Simply return here 
                    return true;
                }

                this.workflowServiceInstances = this.instanceManager.PersistenceProviderDirectory.GetContexts().GetEnumerator(); 
                return Process();
            } 
 
            bool WaitForStoreEventsLoop()
            { 
                // Event never get initialized, meaning we have not started the WaitForStoreEvents loop
                if (this.instanceManager.waitForStoreEventsLoop == null
                    || this.instanceManager.waitForStoreEventsLoop.WaitAsync(handleWaitForStoreEvents, this, this.timeoutHelper.RemainingTime()))
                { 
                    return PerformClose();
                } 
                else 
                {
                    return false; 
                }
            }

            static void HandleWaitForStoreEvents(object state, TimeoutException exception) 
            {
                CloseAsyncResult thisPtr = (CloseAsyncResult)state; 
                if (exception != null) 
                {
                    thisPtr.Complete(false, exception); 
                    return;
                }

                bool completeSelf = false; 
                Exception completionException = null;
 
                try 
                {
                    completeSelf = thisPtr.PerformClose(); 
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e)) 
                    {
                        throw; 
                    } 
                    completionException = exception;
                    completeSelf = true; 
                }

                if (completeSelf)
                { 
                    thisPtr.Complete(false, completionException);
                } 
            } 

            bool Process() 
            {
                while (this.workflowServiceInstances.MoveNext())
                {
                    this.currentInstance = this.workflowServiceInstances.Current.GetInstance(null); 
                    if (this.currentInstance != null)
                    { 
                        IAsyncResult result = this.currentInstance.BeginReleaseInstance( 
                            false,
                            this.timeoutHelper.RemainingTime(), 
                            this.PrepareAsyncCompletion(CloseAsyncResult.handleEndReleaseInstance),
                            this);
                        if (!CheckSyncContinue(result))
                        { 
                            return false;
                        } 
                        this.currentInstance.EndReleaseInstance(result); 
                    }
                } 

                //cleanup any buffered receives unassociated with workflowServiceInstances
                BufferedReceiveManager bufferedReceiveManager = this.instanceManager.Host.Extensions.Find();
                if (bufferedReceiveManager != null) 
                {
                    bufferedReceiveManager.AbandonBufferedReceives(); 
                } 

                // Send the DeleteWorkflowOwner command to the instance store. 
                if (this.instanceManager.InstanceStore != null)
                {
                    IAsyncResult executeResult = null;
                    this.handle = this.instanceManager.InstanceStore.CreateInstanceHandle(this.instanceManager.owner); 
                    try
                    { 
                        executeResult = this.instanceManager.InstanceStore.BeginExecute(this.handle, 
                            new DeleteWorkflowOwnerCommand(), this.timeoutHelper.RemainingTime(),
                            this.PrepareAsyncCompletion(CloseAsyncResult.handleEndExecute), this); 
                    }
                    // Ignore some exceptions because DeleteWorkflowOwner is best effort.
                    catch (InstancePersistenceCommandException) { }
                    catch (InstanceOwnerException) { } 
                    catch (OperationCanceledException) { }
                    finally 
                    { 
                        if (executeResult == null)
                        { 
                            this.handle.Free();
                            this.handle = null;
                        }
                    } 
                    return this.SyncContinue(executeResult);
                } 
                else 
                {
                    return this.CloseProviderDirectory(); 
                }
            }

            static bool HandleEndReleaseInstance(IAsyncResult result) 
            {
                CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState; 
                thisPtr.currentInstance.EndReleaseInstance(result); 
                return thisPtr.Process();
            } 

            bool CloseProviderDirectory()
            {
                bool success = false; 
                try
                { 
                    this.instanceManager.PersistenceProviderDirectory.Close(); 
                    success = true;
                } 
                finally
                {
                    if (!success)
                    { 
                        this.instanceManager.AbortDirectory();
                    } 
                } 
                return true;
            } 

            static bool HandleEndExecute(IAsyncResult result)
            {
                CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState; 

                try 
                { 
                    thisPtr.instanceManager.owner = thisPtr.instanceManager.InstanceStore.EndExecute(result).InstanceOwner;
                } 
                // Ignore some exceptions because DeleteWorkflowOwner is best effort.
                catch (InstancePersistenceCommandException) { }
                catch (InstanceOwnerException) { }
                catch (OperationCanceledException) { } 
                finally
                { 
                    thisPtr.handle.Free(); 
                    thisPtr.handle = null;
                } 

                return thisPtr.CloseProviderDirectory();
            }
 
            public static void End(IAsyncResult result)
            { 
                AsyncResult.End(result); 
            }
        } 

        // Need to ensure that any failure in the methods of GetInstanceAsyncResult after a WorkflowServiceInstance has been acquired
        // results in one of three outcomes, namely :
        // - the WorkflowServiceInstance is set to null 
        // - the WorkflowServiceInstance is aborted
        // - ReleaseReference is called on the WorkflowServiceInstance to ensure that unload happens 
        //   (ultimately resulting in the WorkflowServiceInstance being aborted) 
        // This is to prevent leaking WorkflowServiceInstances since nothing else has a handle to the WorkflowServiceInstance in those
        // scenarios. 

        class GetInstanceAsyncResult : AsyncResult
        {
            static AsyncCompletion handleEndAcquireReference = new AsyncCompletion(HandleEndAcquireReference); 
            static AsyncCompletion handleEndLoad = new AsyncCompletion(HandleEndLoad);
            static AsyncCompletion handleAssociateInfrastructureKeys = new AsyncCompletion(HandleAssociateInfrastructureKeys); 
            static AsyncCompletion handleCommit = new AsyncCompletion(HandleCommit); 
            static AsyncCompletion handleEndEnlistContext = new AsyncCompletion(HandleEndEnlistContext);
            static Action onCompleting = new Action(Finally); 

            DurableInstanceManager instanceManager;
            Guid instanceId;
            InstanceKey instanceKey; 
            ICollection additionalKeys;
            TimeSpan timeout; 
            WorkflowServiceInstance durableInstance; 
            bool referenceAcquired;
            PersistenceContext persistenceContext; 
            WorkflowGetInstanceContext parameters;
            DependentTransaction transaction;
            CommittableTransaction committableTransaction;
            bool loadAny; 

            public GetInstanceAsyncResult(DurableInstanceManager instanceManager, InstanceKey instanceKey, ICollection additionalKeys, WorkflowGetInstanceContext parameters, 
                TimeSpan timeout, AsyncCallback callback, object state) 
                : this(instanceManager, parameters, timeout, callback, state)
            { 
                Fx.Assert(instanceKey != null, "Instance key must be set.");

                this.instanceKey = instanceKey;
                this.additionalKeys = additionalKeys; 

                if (this.GetInstance()) 
                { 
                    this.Complete(true);
                } 
            }

            public GetInstanceAsyncResult(DurableInstanceManager instanceManager, Guid instanceId, WorkflowGetInstanceContext parameters,
                TimeSpan timeout, AsyncCallback callback, object state) 
                : this(instanceManager, parameters, timeout, callback, state)
            { 
                this.instanceId = instanceId; 

                if (this.GetInstance()) 
                {
                    this.Complete(true);
                }
            } 

            GetInstanceAsyncResult(DurableInstanceManager instanceManager, WorkflowGetInstanceContext parameters, 
                TimeSpan timeout, AsyncCallback callback, object state) 
                : base(callback, state)
            { 
                this.instanceManager = instanceManager;
                this.parameters = parameters;
                this.timeout = timeout;
                this.loadAny = parameters == null; 
                this.OnCompleting = onCompleting;
 
                Transaction currentTransaction = Transaction.Current; 
                if (currentTransaction == null && this.instanceManager.Host.IsLoadTransactionRequired)
                { 
                    this.committableTransaction = new CommittableTransaction(this.timeout);
                    currentTransaction = committableTransaction;
                }
                if (currentTransaction != null) 
                {
                    this.transaction = currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete); 
                } 
            }
 
            public static WorkflowServiceInstance End(IAsyncResult result)
            {
                return AsyncResult.End(result).durableInstance;
            } 

            bool TryAcquire(bool fromCache) 
            { 
                this.durableInstance = this.persistenceContext.GetInstance(this.parameters);
 
                if (!fromCache)
                {
                    this.referenceAcquired = true;
                    return AssociateKeys(); 
                }
 
                IAsyncResult nextResult = this.durableInstance.BeginTryAcquireReference(this.timeout, this.PrepareAsyncCompletion(handleEndAcquireReference), this); 
                return SyncContinue(nextResult);
            } 

            static bool HandleEndAcquireReference(IAsyncResult result)
            {
                GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState; 

                if (thisPtr.durableInstance.EndTryAcquireReference(result)) 
                { 
                    thisPtr.referenceAcquired = true;
                    return thisPtr.TryEnlistContext(); 
                }
                else
                {
                    //We have to re-dispense this Durable Instance this is not usable. 
                    thisPtr.referenceAcquired = false;
                    thisPtr.durableInstance = null; 
                    return thisPtr.GetInstance(); 
                }
            } 

            bool TryEnlistContext()
            {
                IAsyncResult enlistResult = null; 
                bool tryAgain = false;
 
                // We need to enlist for the transaction. This call will wait until 
                // we obtain the transaction lock on the PersistenceContext, too. If there is no current transaction, this call
                // will still wait to get the transaction lock, but we not create an enlistment. 
                using (PrepareTransactionalCall(this.transaction))
                {
                    try
                    { 
                        enlistResult = this.persistenceContext.BeginEnlist(this.timeout, PrepareAsyncCompletion(handleEndEnlistContext), this);
                    } 
                    catch (ObjectDisposedException) 
                    {
                        tryAgain = true; 
                    }
                    catch (CommunicationObjectAbortedException)
                    {
                        throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason)); 
                    }
                } 
 
                if (tryAgain)
                { 
                    this.referenceAcquired = false;
                    this.durableInstance = null;
                    return this.GetInstance();
                } 
                else
                { 
                    return SyncContinue(enlistResult); 
                }
            } 

            static bool HandleEndEnlistContext(IAsyncResult result)
            {
                GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState; 

                // 
 
                try
                { 
                    thisPtr.persistenceContext.EndEnlist(result);
                }
                catch (ObjectDisposedException)
                { 
                    // It's possible that the PersistenceContext was closed and removed from the cache
                    // while we were queued up for it. In that situation, this call to EndEnlist will 
                    // throw an ObjectDisposedException because the PersistenceContext is in the closed 
                    // state. If that happens, we need to try the load again from the beginning.
                    thisPtr.referenceAcquired = false; 
                    thisPtr.durableInstance = null;
                    return thisPtr.GetInstance();
                }
                catch (CommunicationObjectAbortedException) 
                {
                    throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason)); 
                } 

                return thisPtr.AssociateKeys(); 
            }

            bool GetInstance()
            { 
                IAsyncResult nextResult = null;
 
                if (!this.loadAny && this.parameters.CanCreateInstance) 
                {
                    if (this.instanceKey != null && this.instanceKey.IsValid) 
                    {
                        nextResult = this.instanceManager.PersistenceProviderDirectory.BeginLoadOrCreate(
                            this.instanceKey, Guid.Empty, this.additionalKeys, this.transaction,
                            this.timeout, PrepareAsyncCompletion(handleEndLoad), this); 
                    }
                    else 
                    { 
                        // Either invalid key (new instance) or lookup by instance ID.
                        nextResult = this.instanceManager.PersistenceProviderDirectory.BeginLoadOrCreate( 
                            this.instanceId, this.additionalKeys, this.transaction,
                            this.timeout, PrepareAsyncCompletion(handleEndLoad), this);
                    }
                } 
                else
                { 
                    if (this.instanceKey != null) 
                    {
                        nextResult = this.instanceManager.PersistenceProviderDirectory.BeginLoad( 
                            this.instanceKey, this.additionalKeys, this.transaction,
                            this.timeout, PrepareAsyncCompletion(handleEndLoad), this);
                    }
                    else 
                    {
                        nextResult = this.instanceManager.PersistenceProviderDirectory.BeginLoad( 
                            this.instanceId, null, this.transaction, this.loadAny, 
                            this.timeout, PrepareAsyncCompletion(handleEndLoad), this);
                    } 
                }
                return SyncContinue(nextResult);
            }
 
            bool AssociateKeys()
            { 
                if (this.additionalKeys != null && this.additionalKeys.Count > 0) 
                {
                    IAsyncResult result; 
                    try
                    {
                        result = this.durableInstance.BeginAssociateInfrastructureKeys(this.additionalKeys, this.transaction, this.timeout,
                                PrepareAsyncCompletion(handleAssociateInfrastructureKeys), this); 
                    }
                    catch (Exception exception) 
                    { 
                        if (Fx.IsFatal(exception))
                        { 
                            throw;
                        }
                        this.persistenceContext.Abort();
                        throw; 
                    }
 
                    return SyncContinue(result); 
                }
                else 
                {
                    return CommitTransaction();
                }
            } 

            static bool HandleEndLoad(IAsyncResult result) 
            { 
                GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState;
 
                PersistenceContext previousPersistenceContext = thisPtr.persistenceContext;
                bool fromCache;
                if (!thisPtr.loadAny && thisPtr.parameters.CanCreateInstance)
                { 
                    thisPtr.persistenceContext = thisPtr.instanceManager.PersistenceProviderDirectory.EndLoadOrCreate(result, out fromCache);
                } 
                else 
                {
                    thisPtr.persistenceContext = thisPtr.instanceManager.PersistenceProviderDirectory.EndLoad(result, out fromCache); 
                }
                Fx.AssertAndThrow(previousPersistenceContext != thisPtr.persistenceContext, "PPD should not load same PersistenceContext for the same GetInstanceAsyncResult!");
                return thisPtr.TryAcquire(fromCache);
            } 

            static bool HandleAssociateInfrastructureKeys(IAsyncResult result) 
            { 
                GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState;
 
                try
                {
                    thisPtr.durableInstance.EndAssociateInfrastructureKeys(result);
                } 
                catch (Exception exception)
                { 
                    if (Fx.IsFatal(exception)) 
                    {
                        throw; 
                    }
                    thisPtr.persistenceContext.Abort();
                    throw;
                } 

                return thisPtr.CommitTransaction(); 
            } 

            bool CommitTransaction() 
            {
                if (this.transaction != null)
                {
                    this.transaction.Complete(); 
                }
                if (this.committableTransaction != null) 
                { 
                    IAsyncResult result = this.committableTransaction.BeginCommit(PrepareAsyncCompletion(handleCommit), this);
                    return SyncContinue(result); 
                }
                else
                {
                    return true; 
                }
            } 
 
            static bool HandleCommit(IAsyncResult result)
            { 
                GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState;
                thisPtr.committableTransaction.EndCommit(result);
                thisPtr.committableTransaction = null;
                return true; 
            }
 
            static void Finally(AsyncResult result, Exception exception) 
            {
                GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result; 

                if (thisPtr.committableTransaction != null)
                {
                    Fx.Assert(exception != null, "Shouldn't get here in the success case."); 

                    try 
                    { 
                        thisPtr.committableTransaction.Rollback(exception);
                    } 
                    catch (Exception rollbackException)
                    {
                        if (Fx.IsFatal(rollbackException))
                        { 
                            throw;
                        } 
                        FxTrace.Exception.AsWarning(rollbackException); 
                    }
                } 

                // Reference is acquired on an instance but we fail perform subsequent task before
                // return an instance to the client (Tx Enlist timeout).  We are responsible to
                // release the reference.  We don't need to worry about Aborted or other State (has 
                // no effect on ref counting).
                if (thisPtr.referenceAcquired && exception != null) 
                { 
                    Fx.Assert(thisPtr.durableInstance != null, "durableInstance must not be null!");
                    thisPtr.durableInstance.ReleaseReference(); 
                }
            }
        }
 
        // This async result waits for store events and handle them (currently only support HasRunnableWorkflowEvent).
        // It is intended to always complete async to simplify caller usage. 
        // 1) no code to handle [....] completion. 
        // 2) recursive call will be safe from StackOverflow.
        // For simplicity, we handle (load/run) each event one-by-one. 
        // We ---- certain set of exception (see HandleException).  Other will crash the process.
        // InvalidOperation is also handled due to TryLoadRunnableWorkflowCommand could fail if ---- with other hosts.
        class WaitAndHandleStoreEventsAsyncResult : AsyncResult
        { 
            static Action waitAndHandleStoreEvents = new Action(WaitAndHandleStoreEvents);
            static AsyncCompletion handleEndWaitForStoreEvents = new AsyncCompletion(HandleEndWaitForStoreEvents); 
            static AsyncCompletion handleEndGetInstance = new AsyncCompletion(HandleEndGetInstance); 
            static AsyncCompletion handleEndRunInstance = new AsyncCompletion(HandleEndRunInstance);
 
            DurableInstanceManager instanceManager;
            IEnumerator events;
            WorkflowServiceInstance currentInstance;
 
            public WaitAndHandleStoreEventsAsyncResult(DurableInstanceManager instanceManager, AsyncCallback callback, object state)
                : base(callback, state) 
            { 
                this.instanceManager = instanceManager;
                ActionItem.Schedule(waitAndHandleStoreEvents, this); 
            }

            public static void End(IAsyncResult result)
            { 
                AsyncResult.End(result);
            } 
 
            static void WaitAndHandleStoreEvents(object state)
            { 
                WaitAndHandleStoreEventsAsyncResult thisPtr = (WaitAndHandleStoreEventsAsyncResult)state;

                bool completeSelf;
                Exception completionException = null; 
                try
                { 
                    completeSelf = thisPtr.WaitForStoreEvents(); 
                }
                catch (Exception exception) 
                {
                    if (Fx.IsFatal(exception))
                    {
                        throw; 
                    }
 
                    completionException = exception; 
                    completeSelf = true;
                } 

                if (completeSelf)
                {
                    thisPtr.Complete(false, completionException); 
                }
            } 
 
            bool WaitForStoreEvents()
            { 
                // Defense in depth with a predefined timeout
                IAsyncResult result = this.instanceManager.InstanceStore.BeginWaitForEvents(this.instanceManager.handle,
                    TimeSpan.FromSeconds(600), PrepareAsyncCompletion(handleEndWaitForStoreEvents), this);
                return SyncContinue(result); 
            }
 
            static bool HandleEndWaitForStoreEvents(IAsyncResult result) 
            {
                WaitAndHandleStoreEventsAsyncResult thisPtr = (WaitAndHandleStoreEventsAsyncResult)result.AsyncState; 
                thisPtr.events = thisPtr.instanceManager.InstanceStore.EndWaitForEvents(result).GetEnumerator();
                return thisPtr.HandleStoreEvents();
            }
 
            bool HandleStoreEvents()
            { 
                if (!this.events.MoveNext()) 
                {
                    return true; 
                }

                InstancePersistenceEvent currentEvent = this.events.Current;
                if (currentEvent.Name == HasRunnableWorkflowEvent.Value.Name) 
                {
                    try 
                    { 
                        IAsyncResult result = this.instanceManager.BeginGetInstance(Guid.Empty, null, this.instanceManager.Host.PersistTimeout,
                            PrepareAsyncCompletion(handleEndGetInstance), this); 
                        return SyncContinue(result);
                    }
                    catch (Exception exception)
                    { 
                        if (Fx.IsFatal(exception) || !this.instanceManager.HandleException(exception))
                        { 
                            throw; 
                        }
                    } 
                }
                else
                {
                    Fx.AssertAndThrow("Unknown InstancePersistenceEvent (" + currentEvent.Name + ")!"); 
                }
                return HandleStoreEvents(); 
            } 

            static bool HandleEndGetInstance(IAsyncResult result) 
            {
                WaitAndHandleStoreEventsAsyncResult thisPtr = (WaitAndHandleStoreEventsAsyncResult)result.AsyncState;
                try
                { 
                    thisPtr.currentInstance = thisPtr.instanceManager.EndGetInstance(result);
                    return thisPtr.RunInstance(); 
                } 
                catch (Exception exception)
                { 
                    if (Fx.IsFatal(exception) || !thisPtr.instanceManager.HandleException(exception))
                    {
                        throw;
                    } 
                }
                return thisPtr.HandleStoreEvents(); 
            } 

            bool RunInstance() 
            {
                try
                {
                    IAsyncResult result = this.currentInstance.BeginRun(null, TimeSpan.MaxValue, PrepareAsyncCompletion(handleEndRunInstance), this); 
                    return SyncContinue(result);
                } 
                catch (Exception exception) 
                {
                    if (Fx.IsFatal(exception)) 
                    {
                        throw;
                    }
                    if (this.currentInstance != null) 
                    {
                        this.currentInstance.ReleaseReference(); 
                        this.currentInstance = null; 
                    }
                    if (!this.instanceManager.HandleException(exception)) 
                    {
                        throw;
                    }
                } 
                return HandleStoreEvents();
            } 
 
            static bool HandleEndRunInstance(IAsyncResult result)
            { 
                WaitAndHandleStoreEventsAsyncResult thisPtr = (WaitAndHandleStoreEventsAsyncResult)result.AsyncState;
                try
                {
                    thisPtr.currentInstance.EndRun(result); 
                }
                catch (Exception exception) 
                { 
                    if (Fx.IsFatal(exception) || !thisPtr.instanceManager.HandleException(exception))
                    { 
                        throw;
                    }
                }
                finally 
                {
                    thisPtr.currentInstance.ReleaseReference(); 
                    thisPtr.currentInstance = null; 
                }
                return thisPtr.HandleStoreEvents(); 
            }
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK