Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.Activities / System / Activities / WorkflowApplication.cs / 1305376 / WorkflowApplication.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------------------------
namespace System.Activities
{
using System;
using System.Activities.Hosting;
using System.Activities.DurableInstancing;
using System.Activities.Persistence;
using System.Activities.Runtime;
using System.Activities.Tracking;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.ComponentModel;
using System.Linq;
using System.Runtime;
using System.Runtime.DurableInstancing;
using System.Runtime.Serialization;
using System.Threading;
using System.Transactions;
using System.Xml.Linq;
// WorkflowApplication is free-threaded. It is responsible for the correct locking and usage of the ActivityExecutor.
// Given that there are two simultaneous users of ActivityExecutor (WorkflowApplication and NativeActivityContext),
// it is imperative that WorkflowApplication only calls into ActivityExecutor when there are no activities executing
// (and thus no worries about colliding with AEC calls).
// SYNCHRONIZATION SCHEME
// WorkflowInstance is defined to not be thread safe and to disallow all operations while it is (potentially
// asynchronously) running. The WorkflowInstance is in the "running" state between a call to Run and the
// subsequent call to either WorkflowInstance NotifyPaused or NotifyUnhandledException.
// WorkflowApplication keeps track of a boolean "isBusy" and a list of pending operations. WI is busy whenever
// it is servicing an operation or the runtime is in the "running" state.
// Enqueue - This enqueues an operation into the pending operation list. If WI is not busy then the operation
// can be serviced immediately. This is the only place where "isBusy" flips to true.
// OnNotifiedUnhandledException - This method performs some processing and then calls OnNotifiedPaused.
// OnNotifiedPaused - This method is only ever called when "isBusy" is true. It first checks to see if there
// is other work to be done (prioritization: raise completed, handle an operation, resume execution, raise
// idle, stop). This is the only place where "isBusy" flips to false and this only occurs when there is no
// other work to be done.
// [Force]NotifyOperationComplete - These methods are called by individual operations when they are done
// processing. If the operation was notified (IE - actually performed in the eyes of WI) then this is simply
// a call to OnNotifiedPaused.
// Operation notification - The InstanceOperation class keeps tracks of whether a specified operation was
// dispatched by WI or not. If it was dispatched (determined either in Enqueue, FindOperation, or Remove)
// then it MUST result in a call to OnNotifiedPaused when complete.
[Fx.Tag.XamlVisible(false)]
public sealed class WorkflowApplication : WorkflowInstance
{
static AsyncCallback eventFrameCallback;
static IdleEventHandler idleHandler;
static CompletedEventHandler completedHandler;
static UnhandledExceptionEventHandler unhandledExceptionHandler;
static Action waitAsyncCompleteCallback;
Action onAborted;
Action onUnloaded;
Action onCompleted;
Func onUnhandledException;
Func onPersistableIdle;
Action onIdle;
WorkflowEventData eventData;
WorkflowInstanceExtensionManager extensions;
PersistencePipeline persistencePipelineInUse;
InstanceStore instanceStore;
PersistenceManager persistenceManager;
WorkflowApplicationState state;
int handlerThreadId;
bool isInHandler;
Action invokeCompletedCallback;
Guid instanceId;
bool instanceIdSet; // Checking for Guid.Empty is expensive.
// Tracking for one-time actions per in-memory pulse
bool hasCalledAbort;
bool hasCalledRun;
// Tracking for one-time actions per instance lifetime (these end up being persisted)
bool hasRaisedCompleted;
Quack pendingOperations;
bool isBusy;
bool hasExecutionOccurredSinceLastIdle;
// We use this to keep track of the number of "interesting" things that have happened.
// Notifying operations and calling Run on the runtime count as interesting things.
// All operations are stamped with the actionCount at the time of being enqueued.
int actionCount;
// Initial creation data
IDictionary initialWorkflowArguments;
IList rootExecutionProperties;
IDictionary instanceMetadata;
public WorkflowApplication(Activity workflowDefinition)
: base(workflowDefinition)
{
this.pendingOperations = new Quack();
Fx.Assert(this.state == WorkflowApplicationState.Paused, "We always start out paused (the default)");
}
public WorkflowApplication(Activity workflowDefinition, IDictionary inputs)
: this(workflowDefinition)
{
if (inputs == null)
{
throw FxTrace.Exception.ArgumentNull("inputs");
}
this.initialWorkflowArguments = inputs;
}
WorkflowApplication(Activity workflowDefinition, IDictionary inputs, IList executionProperties)
: this(workflowDefinition)
{
this.initialWorkflowArguments = inputs;
this.rootExecutionProperties = executionProperties;
}
public InstanceStore InstanceStore
{
get
{
return this.instanceStore;
}
set
{
ThrowIfReadOnly();
this.instanceStore = value;
}
}
public WorkflowInstanceExtensionManager Extensions
{
get
{
if (this.extensions == null)
{
this.extensions = new WorkflowInstanceExtensionManager();
if (base.IsReadOnly)
{
this.extensions.MakeReadOnly();
}
}
return this.extensions;
}
}
public Action Aborted
{
get
{
return this.onAborted;
}
set
{
ThrowIfMulticast(value);
this.onAborted = value;
}
}
public Action Unloaded
{
get
{
return this.onUnloaded;
}
set
{
ThrowIfMulticast(value);
this.onUnloaded = value;
}
}
public Action Completed
{
get
{
return this.onCompleted;
}
set
{
ThrowIfMulticast(value);
this.onCompleted = value;
}
}
public Func OnUnhandledException
{
get
{
return this.onUnhandledException;
}
set
{
ThrowIfMulticast(value);
this.onUnhandledException = value;
}
}
public Action Idle
{
get
{
return this.onIdle;
}
set
{
ThrowIfMulticast(value);
this.onIdle = value;
}
}
public Func PersistableIdle
{
get
{
return this.onPersistableIdle;
}
set
{
ThrowIfMulticast(value);
this.onPersistableIdle = value;
}
}
public override Guid Id
{
get
{
if (!this.instanceIdSet)
{
lock (this.pendingOperations)
{
if (!this.instanceIdSet)
{
this.instanceId = Guid.NewGuid();
this.instanceIdSet = true;
}
}
}
return this.instanceId;
}
}
protected internal override bool SupportsInstanceKeys
{
get
{
return false;
}
}
static AsyncCallback EventFrameCallback
{
get
{
if (eventFrameCallback == null)
{
eventFrameCallback = Fx.ThunkCallback(new AsyncCallback(EventFrame));
}
return eventFrameCallback;
}
}
WorkflowEventData EventData
{
get
{
if (this.eventData == null)
{
this.eventData = new WorkflowEventData(this);
}
return this.eventData;
}
}
bool HasPersistenceProvider
{
get
{
return this.persistenceManager != null;
}
}
bool IsHandlerThread
{
get
{
return this.isInHandler && this.handlerThreadId == Thread.CurrentThread.ManagedThreadId;
}
}
bool IsInTerminalState
{
get
{
return this.state == WorkflowApplicationState.Unloaded || this.state == WorkflowApplicationState.Aborted;
}
}
public void AddInitialInstanceValues(IDictionary writeOnlyValues)
{
ThrowIfReadOnly();
if (writeOnlyValues != null)
{
if (this.instanceMetadata == null)
{
this.instanceMetadata = new Dictionary(writeOnlyValues.Count);
}
foreach (KeyValuePair pair in writeOnlyValues)
{
// We use the indexer so that we can replace keys that already exist
this.instanceMetadata[pair.Key] = new InstanceValue(pair.Value, InstanceValueOptions.Optional | InstanceValueOptions.WriteOnly);
}
}
}
// host-facing access to our cascading ExtensionManager resolution. Used by WorkflowApplicationEventArgs
internal IEnumerable InternalGetExtensions() where T : class
{
return base.GetExtensions();
}
static void EventFrame(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
WorkflowEventData data = (WorkflowEventData)result.AsyncState;
WorkflowApplication thisPtr = data.Instance;
bool done = true;
try
{
Exception abortException = null;
try
{
// The "false" is to notify that we are not still [....]
done = data.NextCallback(result, thisPtr, false);
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
abortException = e;
}
if (abortException != null)
{
thisPtr.AbortInstance(abortException, true);
}
}
finally
{
if (done)
{
thisPtr.OnNotifyPaused();
}
}
}
bool ShouldRaiseComplete(WorkflowInstanceState state)
{
return state == WorkflowInstanceState.Complete && !this.hasRaisedCompleted;
}
void Enqueue(InstanceOperation operation)
{
Enqueue(operation, false);
}
void Enqueue(InstanceOperation operation, bool push)
{
lock (this.pendingOperations)
{
// first make sure we're ready to run
EnsureInitialized();
operation.ActionId = this.actionCount;
if (this.isBusy)
{
if (operation.InterruptsScheduler)
{
this.Controller.RequestPause();
}
AddToPending(operation, push);
}
else if (!operation.CanRun(this))
{
AddToPending(operation, push);
}
else
{
// Action: Notifying an operation
this.actionCount++;
// We've essentially just notified this
// operation that it is free to do its
// thing
try
{
}
finally
{
operation.Notified = true;
this.isBusy = true;
}
}
}
}
void AddToPending(InstanceOperation operation, bool push)
{
if (push)
{
this.pendingOperations.PushFront(operation);
}
else
{
this.pendingOperations.Enqueue(operation);
}
operation.OnEnqueued();
}
bool Remove(InstanceOperation operation)
{
lock (this.pendingOperations)
{
return this.pendingOperations.Remove(operation);
}
}
bool WaitForTurn(InstanceOperation operation, TimeSpan timeout)
{
Enqueue(operation);
if (!operation.WaitForTurn(timeout))
{
if (Remove(operation))
{
throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(timeout)));
}
}
return true;
}
bool WaitForTurnAsync(InstanceOperation operation, TimeSpan timeout, Action callback, object state)
{
return WaitForTurnAsync(operation, false, timeout, callback, state);
}
bool WaitForTurnAsync(InstanceOperation operation, bool push, TimeSpan timeout, Action callback, object state)
{
Enqueue(operation, push);
if (waitAsyncCompleteCallback == null)
{
waitAsyncCompleteCallback = new Action(OnWaitAsyncComplete);
}
return operation.WaitForTurnAsync(timeout, waitAsyncCompleteCallback, new WaitForTurnData(callback, state, operation, this));
}
static void OnWaitAsyncComplete(object state, TimeoutException exception)
{
WaitForTurnData data = (WaitForTurnData)state;
if (!data.Instance.Remove(data.Operation))
{
exception = null;
}
data.Callback(data.State, exception);
}
// For when we know that the operation is non-null
// and notified (like in async paths)
void ForceNotifyOperationComplete()
{
OnNotifyPaused();
}
void NotifyOperationComplete(InstanceOperation operation)
{
if (operation != null && operation.Notified)
{
OnNotifyPaused();
}
}
InstanceOperation FindOperation()
{
if (this.pendingOperations.Count > 0)
{
// Special case the first one
InstanceOperation temp = this.pendingOperations[0];
// Even if we can't run this operation we want to notify
// it if all the operations are invalid. This will cause
// the Validate* method to throw to the caller.
if (temp.CanRun(this) || this.IsInTerminalState)
{
// Action: Notifying an operation
this.actionCount++;
temp.Notified = true;
this.pendingOperations.Dequeue();
return temp;
}
else
{
for (int i = 0; i < this.pendingOperations.Count; i++)
{
temp = this.pendingOperations[i];
if (temp.CanRun(this))
{
// Action: Notifying an operation
this.actionCount++;
temp.Notified = true;
this.pendingOperations.Remove(i);
return temp;
}
}
}
}
return null;
}
// assumes that we're called under the pendingOperations lock
void EnsureInitialized()
{
if (!base.IsReadOnly)
{
// For newly created workflows (e.g. not the Load() case), we need to initialize now
base.RegisterExtensionManager(this.extensions);
base.Initialize(this.initialWorkflowArguments, this.rootExecutionProperties);
// make sure we have a persistence manager if necessary
if (this.persistenceManager == null && this.instanceStore != null)
{
Fx.Assert(this.Id != Guid.Empty, "should have a valid Id at this point");
this.persistenceManager = new PersistenceManager(this.instanceStore, this.instanceMetadata, this.Id);
}
}
}
protected override void OnNotifyPaused()
{
Fx.Assert(this.isBusy, "We're always busy when we get this notification.");
WorkflowInstanceState localInstanceState = this.Controller.State;
WorkflowApplicationState localApplicationState = this.state;
bool stillSync = true;
while (stillSync)
{
if (ShouldRaiseComplete(localInstanceState))
{
Exception abortException = null;
try
{
// We're about to notify the world that this instance is completed
// so let's make it official.
this.hasRaisedCompleted = true;
if (completedHandler == null)
{
completedHandler = new CompletedEventHandler();
}
stillSync = completedHandler.Run(this);
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
abortException = e;
}
if (abortException != null)
{
AbortInstance(abortException, true);
}
}
else
{
InstanceOperation toRun = null;
bool shouldRunNow;
bool shouldRaiseIdleNow;
lock (this.pendingOperations)
{
toRun = FindOperation();
// Cache the state in local variables to ensure that none
// of the decision points in the ensuing "if" statement flip
// when control gets out of the lock.
shouldRunNow = (localInstanceState == WorkflowInstanceState.Runnable && localApplicationState == WorkflowApplicationState.Runnable);
shouldRaiseIdleNow = this.hasExecutionOccurredSinceLastIdle && localInstanceState == WorkflowInstanceState.Idle && !this.hasRaisedCompleted;
if (toRun == null && !shouldRunNow && !shouldRaiseIdleNow)
{
this.isBusy = false;
stillSync = false;
}
}
if (toRun != null)
{
toRun.NotifyTurn();
stillSync = false;
}
else if (shouldRaiseIdleNow)
{
this.hasExecutionOccurredSinceLastIdle = false;
Fx.Assert(this.isBusy, "we must be busy if we're raising idle");
Exception abortException = null;
try
{
if (idleHandler == null)
{
idleHandler = new IdleEventHandler();
}
stillSync = idleHandler.Run(this);
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
abortException = e;
}
if (abortException != null)
{
AbortInstance(abortException, true);
}
}
else if (shouldRunNow)
{
this.hasExecutionOccurredSinceLastIdle = true;
// Action: Running the scheduler
this.actionCount++;
this.Controller.Run();
stillSync = false;
}
}
}
}
// used by WorkflowInvoker in the InvokeAsync case
internal void GetCompletionStatus(out Exception terminationException, out bool cancelled)
{
IDictionary dummyOutputs;
ActivityInstanceState completionState = this.Controller.GetCompletionState(out dummyOutputs, out terminationException);
Fx.Assert(completionState != ActivityInstanceState.Executing, "Activity cannot be executing when this method is called");
cancelled = (completionState == ActivityInstanceState.Canceled);
}
protected internal override void OnRequestAbort(Exception reason)
{
this.AbortInstance(reason, false);
}
public void Abort()
{
Abort(SR.DefaultAbortReason);
}
public void Abort(string reason)
{
// This is pretty loose check, but it is okay if we
// go down the abort path multiple times
if (this.state != WorkflowApplicationState.Aborted)
{
AbortInstance(new WorkflowApplicationAbortedException(reason), false);
}
}
void AbortPersistence()
{
if (this.persistenceManager != null)
{
this.persistenceManager.Abort();
}
PersistencePipeline currentPersistencePipeline = this.persistencePipelineInUse;
if (currentPersistencePipeline != null)
{
currentPersistencePipeline.Abort();
}
}
void AbortInstance(Exception reason, bool isWorkflowThread)
{
this.state = WorkflowApplicationState.Aborted;
// Need to ensure that either components see the Aborted state, this method sees the components, or both.
Thread.MemoryBarrier();
// We do this outside of the lock since persistence
// might currently be blocking access to the lock.
AbortPersistence();
if (isWorkflowThread)
{
if (!this.hasCalledAbort)
{
this.hasCalledAbort = true;
this.Controller.Abort(reason);
// We should get off this thread because we're unsure of its state
ScheduleTrackAndRaiseAborted(reason);
}
}
else
{
bool completeSelf = true;
InstanceOperation operation = null;
try
{
operation = new InstanceOperation();
completeSelf = WaitForTurnAsync(operation, true, ActivityDefaults.AcquireLockTimeout, new Action(OnAbortWaitComplete), reason);
if (completeSelf)
{
if (!this.hasCalledAbort)
{
this.hasCalledAbort = true;
this.Controller.Abort(reason);
// We need to get off this thread so we don't block the caller
// of abort
ScheduleTrackAndRaiseAborted(reason);
}
}
}
finally
{
if (completeSelf)
{
NotifyOperationComplete(operation);
}
}
}
}
void OnAbortWaitComplete(object state, TimeoutException exception)
{
if (exception != null)
{
// We ---- this exception because we were simply doing our
// best to get the lock. Note that we won't proceed without
// the lock because we may have already succeeded on another
// thread. Technically this abort call has failed.
return;
}
bool shouldRaise = false;
Exception reason = (Exception)state;
try
{
if (!this.hasCalledAbort)
{
shouldRaise = true;
this.hasCalledAbort = true;
this.Controller.Abort(reason);
}
}
finally
{
ForceNotifyOperationComplete();
}
if (shouldRaise)
{
// We call this from this thread because we've already
// had a thread switch
TrackAndRaiseAborted(reason);
}
}
void ScheduleTrackAndRaiseAborted(Exception reason)
{
if (this.Controller.HasPendingTrackingRecords || this.Aborted != null)
{
ActionItem.Schedule(new Action(TrackAndRaiseAborted), reason);
}
}
// This is only ever called from an appropriate thread (not the thread
// that called abort unless it was an internal abort).
// This method is called without the lock. We still provide single threaded
// guarantees to the Controller because:
// * No other call can ever enter the executor again once the state has
// switched to Aborted
// * If this was an internal abort then the thread was fast pathing its
// way out of the runtime and won't conflict
void TrackAndRaiseAborted(object state)
{
Exception reason = (Exception)state;
if (this.Controller.HasPendingTrackingRecords)
{
try
{
IAsyncResult result = this.Controller.BeginFlushTrackingRecords(ActivityDefaults.TrackingTimeout, Fx.ThunkCallback(new AsyncCallback(OnAbortTrackingComplete)), reason);
if (result.CompletedSynchronously)
{
this.Controller.EndFlushTrackingRecords(result);
}
else
{
return;
}
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
// We ---- any exception here because we are on the abort path
// and are doing a best effort to track this record.
}
}
RaiseAborted(reason);
}
void OnAbortTrackingComplete(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
Exception reason = (Exception)result.AsyncState;
try
{
this.Controller.EndFlushTrackingRecords(result);
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
// We ---- any exception here because we are on the abort path
// and are doing a best effort to track this record.
}
RaiseAborted(reason);
}
void RaiseAborted(Exception reason)
{
if (this.invokeCompletedCallback == null)
{
Action abortedHandler = this.Aborted;
if (abortedHandler != null)
{
try
{
this.handlerThreadId = Thread.CurrentThread.ManagedThreadId;
this.isInHandler = true;
abortedHandler(new WorkflowApplicationAbortedEventArgs(this, reason));
}
finally
{
this.isInHandler = false;
}
}
}
else
{
this.invokeCompletedCallback();
}
if (TD.WorkflowInstanceAbortedIsEnabled())
{
TD.WorkflowInstanceAborted(this.Id.ToString(), reason);
}
}
public void Terminate(string reason)
{
Terminate(reason, ActivityDefaults.AcquireLockTimeout);
}
public void Terminate(Exception reason)
{
Terminate(reason, ActivityDefaults.AcquireLockTimeout);
}
public void Terminate(string reason, TimeSpan timeout)
{
if (string.IsNullOrEmpty(reason))
{
throw FxTrace.Exception.ArgumentNullOrEmpty("reason");
}
Terminate(new WorkflowApplicationTerminatedException(reason, this.Id), timeout);
}
public void Terminate(Exception reason, TimeSpan timeout)
{
if (reason == null)
{
throw FxTrace.Exception.ArgumentNull("reason");
}
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
InstanceOperation operation = null;
try
{
operation = new InstanceOperation();
WaitForTurn(operation, timeoutHelper.RemainingTime());
ValidateStateForTerminate();
TerminateCore(reason);
this.Controller.FlushTrackingRecords(timeoutHelper.RemainingTime());
}
finally
{
NotifyOperationComplete(operation);
}
}
void TerminateCore(Exception reason)
{
this.Controller.Terminate(reason);
}
public IAsyncResult BeginTerminate(string reason, AsyncCallback callback, object state)
{
return BeginTerminate(reason, ActivityDefaults.AcquireLockTimeout, callback, state);
}
public IAsyncResult BeginTerminate(Exception reason, AsyncCallback callback, object state)
{
return BeginTerminate(reason, ActivityDefaults.AcquireLockTimeout, callback, state);
}
public IAsyncResult BeginTerminate(string reason, TimeSpan timeout, AsyncCallback callback, object state)
{
if (string.IsNullOrEmpty(reason))
{
throw FxTrace.Exception.ArgumentNullOrEmpty("reason");
}
return BeginTerminate(new WorkflowApplicationTerminatedException(reason, this.Id), timeout, callback, state);
}
public IAsyncResult BeginTerminate(Exception reason, TimeSpan timeout, AsyncCallback callback, object state)
{
if (reason == null)
{
throw FxTrace.Exception.ArgumentNull("reason");
}
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
return TerminateAsyncResult.Create(this, reason, timeout, callback, state);
}
public void EndTerminate(IAsyncResult result)
{
TerminateAsyncResult.End(result);
}
// called from the [....] and async paths
void CancelCore()
{
// We only actually do any work if we haven't completed and we aren't
// unloaded.
if (!this.hasRaisedCompleted && this.state != WorkflowApplicationState.Unloaded)
{
this.Controller.ScheduleCancel();
// This is a loose check, but worst case scenario we call
// an extra, unnecessary Run
if (!this.hasCalledRun && !this.hasRaisedCompleted)
{
RunCore();
}
}
}
public void Cancel()
{
Cancel(ActivityDefaults.AcquireLockTimeout);
}
public void Cancel(TimeSpan timeout)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
InstanceOperation operation = null;
try
{
operation = new InstanceOperation();
WaitForTurn(operation, timeoutHelper.RemainingTime());
ValidateStateForCancel();
CancelCore();
this.Controller.FlushTrackingRecords(timeoutHelper.RemainingTime());
}
finally
{
NotifyOperationComplete(operation);
}
}
public IAsyncResult BeginCancel(AsyncCallback callback, object state)
{
return BeginCancel(ActivityDefaults.AcquireLockTimeout, callback, state);
}
public IAsyncResult BeginCancel(TimeSpan timeout, AsyncCallback callback, object state)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
return CancelAsyncResult.Create(this, timeout, callback, state);
}
public void EndCancel(IAsyncResult result)
{
CancelAsyncResult.End(result);
}
// called on the Invoke path, this will go away when WorkflowInvoker implements WorkflowInstance directly
static WorkflowApplication CreateInstance(Activity activity, IDictionary inputs, WorkflowInstanceExtensionManager extensions, SynchronizationContext syncContext, Action invokeCompletedCallback)
{
// 1) Create the workflow instance
Transaction ambientTransaction = Transaction.Current;
List workflowExecutionProperties = null;
if (ambientTransaction != null)
{
// no need for a NoPersistHandle since the ActivityExecutor performs a no-persist zone
// as part of the RuntimeTransactionHandle processing
workflowExecutionProperties = new List(1)
{
new RuntimeTransactionHandle(ambientTransaction)
};
}
WorkflowApplication instance = new WorkflowApplication(activity, inputs, workflowExecutionProperties)
{
SynchronizationContext = syncContext
};
bool success = false;
try
{
// 2) Take the executor lock before allowing extensions to be added
instance.isBusy = true;
// 3) Add extensions
if (extensions != null)
{
instance.extensions = extensions;
}
// 4) Setup miscellaneous state
instance.invokeCompletedCallback = invokeCompletedCallback;
success = true;
}
finally
{
if (!success)
{
instance.isBusy = false;
}
}
return instance;
}
static void RunInstance(WorkflowApplication instance)
{
// We still have the lock because we took it in Create
// first make sure we're ready to run
instance.EnsureInitialized();
// Shortcut path for resuming the instance
instance.RunCore();
instance.hasExecutionOccurredSinceLastIdle = true;
instance.Controller.Run();
}
static WorkflowApplication StartInvoke(Activity activity, IDictionary inputs, WorkflowInstanceExtensionManager extensions, SynchronizationContext syncContext, Action invokeCompletedCallback, AsyncInvokeContext invokeContext)
{
WorkflowApplication instance = CreateInstance(activity, inputs, extensions, syncContext, invokeCompletedCallback);
if (invokeContext != null)
{
invokeContext.WorkflowApplication = instance;
}
RunInstance(instance);
return instance;
}
internal static IDictionary Invoke(Activity activity, IDictionary inputs, WorkflowInstanceExtensionManager extensions, TimeSpan timeout)
{
Fx.Assert(activity != null, "Activity must not be null.");
// Create the invoke synchronization context
PumpBasedSynchronizationContext syncContext = new PumpBasedSynchronizationContext(timeout);
WorkflowApplication instance = CreateInstance(activity, inputs, extensions, syncContext, new Action(syncContext.OnInvokeCompleted));
// Wait for completion
try
{
RunInstance(instance);
syncContext.DoPump();
}
catch (TimeoutException)
{
instance.Abort(SR.AbortingDueToInstanceTimeout);
throw;
}
Exception completionException = null;
IDictionary outputs = null;
if (instance.Controller.State == WorkflowInstanceState.Aborted)
{
completionException = new WorkflowApplicationAbortedException(SR.DefaultAbortReason, instance.Controller.GetAbortReason());
}
else
{
Fx.Assert(instance.Controller.State == WorkflowInstanceState.Complete, "We should only get here when we are completed.");
instance.Controller.GetCompletionState(out outputs, out completionException);
}
if (completionException != null)
{
throw FxTrace.Exception.AsError(completionException);
}
return outputs;
}
internal static IAsyncResult BeginInvoke(Activity activity, IDictionary inputs, WorkflowInstanceExtensionManager extensions, TimeSpan timeout, SynchronizationContext syncContext, AsyncInvokeContext invokeContext, AsyncCallback callback, object state)
{
Fx.Assert(activity != null, "The activity must not be null.");
return new InvokeAsyncResult(activity, inputs, extensions, timeout, syncContext, invokeContext, callback, state);
}
internal static IDictionary EndInvoke(IAsyncResult result)
{
return InvokeAsyncResult.End(result);
}
public void Run()
{
Run(ActivityDefaults.AcquireLockTimeout);
}
public void Run(TimeSpan timeout)
{
InternalRun(timeout, true);
}
void InternalRun(TimeSpan timeout, bool isUserRun)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
InstanceOperation operation = null;
try
{
operation = new InstanceOperation();
WaitForTurn(operation, timeoutHelper.RemainingTime());
ValidateStateForRun();
if (isUserRun)
{
// We set this to true here so that idle is raised
// regardless of whether the call to Run resulted
// in execution.
this.hasExecutionOccurredSinceLastIdle = true;
}
RunCore();
this.Controller.FlushTrackingRecords(timeoutHelper.RemainingTime());
}
finally
{
NotifyOperationComplete(operation);
}
}
void RunCore()
{
if (!this.hasCalledRun)
{
this.hasCalledRun = true;
}
this.state = WorkflowApplicationState.Runnable;
}
public IAsyncResult BeginRun(AsyncCallback callback, object state)
{
return BeginRun(ActivityDefaults.AcquireLockTimeout, callback, state);
}
public IAsyncResult BeginRun(TimeSpan timeout, AsyncCallback callback, object state)
{
return BeginInternalRun(timeout, true, callback, state);
}
IAsyncResult BeginInternalRun(TimeSpan timeout, bool isUserRun, AsyncCallback callback, object state)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
return RunAsyncResult.Create(this, isUserRun, timeout, callback, state);
}
public void EndRun(IAsyncResult result)
{
RunAsyncResult.End(result);
}
// shared by Load/BeginLoad
bool IsLoadTransactionRequired()
{
return base.GetExtensions().Any(module => module.IsLoadTransactionRequired);
}
// shared by Load/BeginLoad
void CreatePersistenceManager()
{
Fx.Assert(this.persistenceManager == null, "CreatePersistenceManager should only be called once");
// first register our extensions since we'll need them to construct the pipeline
base.RegisterExtensionManager(this.extensions);
this.persistenceManager = new PersistenceManager(this.InstanceStore, this.instanceMetadata, this.instanceId);
}
// shared by Load/BeginLoad
PersistencePipeline ProcessInstanceValues(IDictionary values, out object deserializedRuntimeState)
{
PersistencePipeline result = null;
InstanceValue value;
if (values.TryGetValue(WorkflowNamespace.Workflow, out value) && value.Value is ActivityExecutor)
{
deserializedRuntimeState = value.Value;
}
else
{
throw FxTrace.Exception.AsError(new InstancePersistenceException(SR.WorkflowInstanceNotFoundInStore(this.persistenceManager.InstanceId)));
}
if (HasPersistenceModule)
{
IEnumerable modules = base.GetExtensions();
result = new PersistencePipeline(modules);
result.SetLoadedValues(values);
}
return result;
}
public void LoadRunnableInstance()
{
LoadRunnableInstance(ActivityDefaults.LoadTimeout);
}
public void LoadRunnableInstance(TimeSpan timeout)
{
ThrowIfReadOnly(); // only allow a single Load() or Run()
TimeoutHelper.ThrowIfNegativeArgument(timeout);
if (this.InstanceStore == null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.LoadingWorkflowApplicationRequiresInstanceStore));
}
if (this.instanceIdSet)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.WorkflowApplicationAlreadyHasId));
}
if (this.initialWorkflowArguments != null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.CannotUseInputsWithLoad));
}
if (this.persistenceManager != null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryLoadRequiresOwner));
}
RegisterExtensionManager(this.extensions);
this.persistenceManager = new PersistenceManager(InstanceStore, this.instanceMetadata);
if (!this.persistenceManager.IsInitialized)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryLoadRequiresOwner));
}
LoadCore(timeout, true);
}
public void Load(Guid instanceId)
{
Load(instanceId, ActivityDefaults.LoadTimeout);
}
public void Load(Guid instanceId, TimeSpan timeout)
{
ThrowIfAborted();
ThrowIfReadOnly(); // only allow a single Load() or Run()
if (instanceId == Guid.Empty)
{
throw FxTrace.Exception.ArgumentNullOrEmpty("instanceId");
}
TimeoutHelper.ThrowIfNegativeArgument(timeout);
if (this.InstanceStore == null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.LoadingWorkflowApplicationRequiresInstanceStore));
}
if (this.instanceIdSet)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.WorkflowApplicationAlreadyHasId));
}
if (this.initialWorkflowArguments != null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.CannotUseInputsWithLoad));
}
this.instanceId = instanceId;
this.instanceIdSet = true;
CreatePersistenceManager();
LoadCore(timeout, false);
}
void LoadCore(TimeSpan timeout, bool loadAny)
{
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
if (!this.persistenceManager.IsInitialized)
{
this.persistenceManager.Initialize(timeoutHelper.RemainingTime());
}
PersistencePipeline pipeline = null;
WorkflowPersistenceContext context = null;
TransactionScope scope = null;
bool success = false;
try
{
context = new WorkflowPersistenceContext(IsLoadTransactionRequired(), timeoutHelper.OriginalTimeout);
scope = Fx.CreateTransactionScope(context.PublicTransaction);
IDictionary values;
if (loadAny)
{
if (!this.persistenceManager.TryLoad(timeoutHelper.RemainingTime(), out values))
{
throw FxTrace.Exception.AsError(new InstanceNotReadyException(SR.NoRunnableInstances));
}
if (this.instanceIdSet)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.WorkflowApplicationAlreadyHasId));
}
this.instanceId = this.persistenceManager.InstanceId;
this.instanceIdSet = true;
}
else
{
values = this.persistenceManager.Load(timeoutHelper.RemainingTime());
}
object deserializedRuntimeState;
pipeline = ProcessInstanceValues(values, out deserializedRuntimeState);
if (pipeline != null)
{
try
{
this.persistencePipelineInUse = pipeline;
// Need to ensure that either we see the Aborted state, AbortInstance sees us, or both.
Thread.MemoryBarrier();
if (this.state == WorkflowApplicationState.Aborted)
{
throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason));
}
pipeline.EndLoad(pipeline.BeginLoad(timeoutHelper.RemainingTime(), null, null));
}
finally
{
this.persistencePipelineInUse = null;
}
}
base.Initialize(deserializedRuntimeState);
success = true;
}
finally
{
// Clean up the transaction scope regardless of failure
Fx.CompleteTransactionScope(ref scope);
if (context != null)
{
if (success)
{
context.Complete();
}
else
{
context.Abort();
}
}
if (!success)
{
Abort(SR.AbortingDueToLoadFailure);
}
}
if (pipeline != null)
{
pipeline.Publish();
}
}
public IAsyncResult BeginLoadRunnableInstance(AsyncCallback callback, object state)
{
return BeginLoadRunnableInstance(ActivityDefaults.LoadTimeout, callback, state);
}
public IAsyncResult BeginLoadRunnableInstance(TimeSpan timeout, AsyncCallback callback, object state)
{
ThrowIfReadOnly(); // only allow a single Load() or Run()
TimeoutHelper.ThrowIfNegativeArgument(timeout);
if (this.InstanceStore == null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.LoadingWorkflowApplicationRequiresInstanceStore));
}
if (this.instanceIdSet)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.WorkflowApplicationAlreadyHasId));
}
if (this.initialWorkflowArguments != null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.CannotUseInputsWithLoad));
}
if (this.persistenceManager != null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryLoadRequiresOwner));
}
RegisterExtensionManager(this.extensions);
this.persistenceManager = new PersistenceManager(InstanceStore, this.instanceMetadata);
if (!this.persistenceManager.IsInitialized)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryLoadRequiresOwner));
}
return new LoadAsyncResult(this, true, timeout, callback, state);
}
public IAsyncResult BeginLoad(Guid instanceId, AsyncCallback callback, object state)
{
return BeginLoad(instanceId, ActivityDefaults.LoadTimeout, callback, state);
}
public IAsyncResult BeginLoad(Guid instanceId, TimeSpan timeout, AsyncCallback callback, object state)
{
ThrowIfAborted();
ThrowIfReadOnly(); // only allow a single Load() or Run()
if (instanceId == Guid.Empty)
{
throw FxTrace.Exception.ArgumentNullOrEmpty("instanceId");
}
TimeoutHelper.ThrowIfNegativeArgument(timeout);
if (this.InstanceStore == null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.LoadingWorkflowApplicationRequiresInstanceStore));
}
if (this.instanceIdSet)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.WorkflowApplicationAlreadyHasId));
}
if (this.initialWorkflowArguments != null)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.CannotUseInputsWithLoad));
}
this.instanceId = instanceId;
this.instanceIdSet = true;
CreatePersistenceManager();
return new LoadAsyncResult(this, false, timeout, callback, state);
}
public void EndLoad(IAsyncResult result)
{
LoadAsyncResult.End(result);
}
public void EndLoadRunnableInstance(IAsyncResult result)
{
LoadAsyncResult.End(result);
}
protected override void OnNotifyUnhandledException(Exception exception, Activity exceptionSource, string exceptionSourceInstanceId)
{
bool done = true;
try
{
Exception abortException = null;
try
{
if (unhandledExceptionHandler == null)
{
unhandledExceptionHandler = new UnhandledExceptionEventHandler();
}
done = unhandledExceptionHandler.Run(this, exception, exceptionSource, exceptionSourceInstanceId);
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
abortException = e;
}
if (abortException != null)
{
AbortInstance(abortException, true);
}
}
finally
{
if (done)
{
OnNotifyPaused();
}
}
}
IAsyncResult BeginInternalPersist(PersistenceOperation operation, TimeSpan timeout, bool isInternalPersist, AsyncCallback callback, object state)
{
return new UnloadOrPersistAsyncResult(this, timeout, operation, true, isInternalPersist, callback, state);
}
void EndInternalPersist(IAsyncResult result)
{
UnloadOrPersistAsyncResult.End(result);
}
void TrackPersistence(PersistenceOperation operation)
{
if (this.Controller.TrackingEnabled)
{
if (operation == PersistenceOperation.Complete)
{
this.Controller.Track(new WorkflowInstanceRecord(this.Id, this.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Deleted));
}
else if (operation == PersistenceOperation.Unload)
{
this.Controller.Track(new WorkflowInstanceRecord(this.Id, this.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Unloaded));
}
else
{
this.Controller.Track(new WorkflowInstanceRecord(this.Id, this.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Persisted));
}
}
}
void PersistCore(ref TimeoutHelper timeoutHelper, PersistenceOperation operation)
{
if (HasPersistenceProvider)
{
if (!this.persistenceManager.IsInitialized)
{
this.persistenceManager.Initialize(timeoutHelper.RemainingTime());
}
if (!this.persistenceManager.IsLocked && Transaction.Current != null)
{
this.persistenceManager.EnsureReadyness(timeoutHelper.RemainingTime());
}
// Do the tracking before preparing in case the tracking data is being pushed into
// an extension and persisted transactionally with the instance state.
TrackPersistence(operation);
this.Controller.FlushTrackingRecords(timeoutHelper.RemainingTime());
}
bool success = false;
WorkflowPersistenceContext context = null;
TransactionScope scope = null;
try
{
IDictionary data = null;
PersistencePipeline pipeline = null;
if (HasPersistenceModule)
{
IEnumerable modules = base.GetExtensions();
pipeline = new PersistencePipeline(modules, PersistenceManager.GenerateInitialData(this));
pipeline.Collect();
pipeline.Map();
data = pipeline.Values;
}
if (HasPersistenceProvider)
{
if (data == null)
{
data = PersistenceManager.GenerateInitialData(this);
}
if (context == null)
{
Fx.Assert(scope == null, "Should not have been able to set up a scope.");
context = new WorkflowPersistenceContext(pipeline != null && pipeline.IsSaveTransactionRequired, timeoutHelper.OriginalTimeout);
scope = Fx.CreateTransactionScope(context.PublicTransaction);
}
this.persistenceManager.Save(data, operation, timeoutHelper.RemainingTime());
}
if (pipeline != null)
{
if (context == null)
{
Fx.Assert(scope == null, "Should not have been able to set up a scope if we had no context.");
context = new WorkflowPersistenceContext(pipeline.IsSaveTransactionRequired, timeoutHelper.OriginalTimeout);
scope = Fx.CreateTransactionScope(context.PublicTransaction);
}
try
{
this.persistencePipelineInUse = pipeline;
// Need to ensure that either we see the Aborted state, AbortInstance sees us, or both.
Thread.MemoryBarrier();
if (this.state == WorkflowApplicationState.Aborted)
{
throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason));
}
pipeline.EndSave(pipeline.BeginSave(timeoutHelper.RemainingTime(), null, null));
}
finally
{
this.persistencePipelineInUse = null;
}
}
success = true;
}
finally
{
// Clean up the transaction scope regardless of failure
Fx.CompleteTransactionScope(ref scope);
if (context != null)
{
if (success)
{
context.Complete();
}
else
{
context.Abort();
}
}
if (success)
{
if (operation != PersistenceOperation.Save)
{
// Stop execution if we've given up the instance lock
this.state = WorkflowApplicationState.Paused;
if (TD.WorkflowApplicationUnloadedIsEnabled())
{
TD.WorkflowApplicationUnloaded(this.Id.ToString());
}
}
else
{
if (TD.WorkflowApplicationPersistedIsEnabled())
{
TD.WorkflowApplicationPersisted(this.Id.ToString());
}
}
if (operation == PersistenceOperation.Complete || operation == PersistenceOperation.Unload)
{
// We did a Delete or Unload, so if we have a persistence provider, tell it to delete the owner.
if (HasPersistenceProvider && this.persistenceManager.OwnerWasCreated)
{
// This will happen to be under the caller's transaction, if there is one.
//
this.persistenceManager.DeleteOwner(timeoutHelper.RemainingTime());
}
MarkUnloaded();
}
}
}
}
[Fx.Tag.InheritThrows(From = "Unload")]
public void Persist()
{
Persist(ActivityDefaults.SaveTimeout);
}
[Fx.Tag.InheritThrows(From = "Unload")]
public void Persist(TimeSpan timeout)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
RequiresPersistenceOperation operation = new RequiresPersistenceOperation();
try
{
WaitForTurn(operation, timeoutHelper.RemainingTime());
ValidateStateForPersist();
PersistCore(ref timeoutHelper, PersistenceOperation.Save);
}
finally
{
NotifyOperationComplete(operation);
}
}
[Fx.Tag.InheritThrows(From = "Unload")]
public IAsyncResult BeginPersist(AsyncCallback callback, object state)
{
return BeginPersist(ActivityDefaults.SaveTimeout, callback, state);
}
[Fx.Tag.InheritThrows(From = "Unload")]
public IAsyncResult BeginPersist(TimeSpan timeout, AsyncCallback callback, object state)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
return new UnloadOrPersistAsyncResult(this, timeout, PersistenceOperation.Save, false, false, callback, state);
}
[Fx.Tag.InheritThrows(From = "Unload")]
public void EndPersist(IAsyncResult result)
{
UnloadOrPersistAsyncResult.End(result);
}
// called from WorkflowApplicationIdleEventArgs
internal ReadOnlyCollection GetBookmarksForIdle()
{
return this.Controller.GetBookmarks();
}
public ReadOnlyCollection GetBookmarks()
{
return GetBookmarks(ActivityDefaults.ResumeBookmarkTimeout);
}
public ReadOnlyCollection GetBookmarks(TimeSpan timeout)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
InstanceOperation operation = new InstanceOperation();
try
{
WaitForTurn(operation, timeout);
ValidateStateForGetAllBookmarks();
return this.Controller.GetBookmarks();
}
finally
{
NotifyOperationComplete(operation);
}
}
protected internal override IAsyncResult OnBeginPersist(AsyncCallback callback, object state)
{
return this.BeginInternalPersist(PersistenceOperation.Save, ActivityDefaults.InternalSaveTimeout, true, callback, state);
}
protected internal override void OnEndPersist(IAsyncResult result)
{
this.EndInternalPersist(result);
}
protected internal override IAsyncResult OnBeginAssociateKeys(ICollection keys, AsyncCallback callback, object state)
{
throw Fx.AssertAndThrow("WorkflowApplication is sealed with CanUseKeys as false, so WorkflowInstance should not call OnBeginAssociateKeys.");
}
protected internal override void OnEndAssociateKeys(IAsyncResult result)
{
throw Fx.AssertAndThrow("WorkflowApplication is sealed with CanUseKeys as false, so WorkflowInstance should not call OnEndAssociateKeys.");
}
protected internal override void OnDisassociateKeys(ICollection keys)
{
throw Fx.AssertAndThrow("WorkflowApplication is sealed with CanUseKeys as false, so WorkflowInstance should not call OnDisassociateKeys.");
}
bool AreBookmarksInvalid(out BookmarkResumptionResult result)
{
if (this.hasRaisedCompleted)
{
result = BookmarkResumptionResult.NotFound;
return true;
}
else if (this.state == WorkflowApplicationState.Unloaded || this.state == WorkflowApplicationState.Aborted)
{
result = BookmarkResumptionResult.NotReady;
return true;
}
result = BookmarkResumptionResult.Success;
return false;
}
[Fx.Tag.InheritThrows(From = "ResumeBookmark")]
public BookmarkResumptionResult ResumeBookmark(string bookmarkName, object value)
{
if (string.IsNullOrEmpty(bookmarkName))
{
throw FxTrace.Exception.ArgumentNullOrEmpty("bookmarkName");
}
return ResumeBookmark(new Bookmark(bookmarkName), value);
}
[Fx.Tag.InheritThrows(From = "ResumeBookmark")]
public BookmarkResumptionResult ResumeBookmark(Bookmark bookmark, object value)
{
return ResumeBookmark(bookmark, value, ActivityDefaults.ResumeBookmarkTimeout);
}
[Fx.Tag.InheritThrows(From = "ResumeBookmark")]
public BookmarkResumptionResult ResumeBookmark(string bookmarkName, object value, TimeSpan timeout)
{
if (string.IsNullOrEmpty(bookmarkName))
{
throw FxTrace.Exception.ArgumentNullOrEmpty("bookmarkName");
}
return ResumeBookmark(new Bookmark(bookmarkName), value, timeout);
}
[Fx.Tag.InheritThrows(From = "BeginResumeBookmark", FromDeclaringType = typeof(WorkflowInstance))]
public BookmarkResumptionResult ResumeBookmark(Bookmark bookmark, object value, TimeSpan timeout)
{
TimeoutHelper.ThrowIfNegativeArgument(timeout);
ThrowIfHandlerThread();
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
// This is a loose check, but worst case scenario we call
// an extra, unnecessary Run
if (!this.hasCalledRun)
{
InternalRun(timeoutHelper.RemainingTime(), false);
}
InstanceOperation operation = new RequiresIdleOperation();
BookmarkResumptionResult result;
do
{
InstanceOperation nextOperation = null;
try
{
WaitForTurn(operation, timeoutHelper.RemainingTime());
if (AreBookmarksInvalid(out result))
{
return result;
}
result = ResumeBookmarkCore(bookmark, value);
if (result == BookmarkResumptionResult.Success)
{
this.Controller.FlushTrackingRecords(timeoutHelper.RemainingTime());
}
else if (result == BookmarkResumptionResult.NotReady)
{
nextOperation = new DeferredRequiresIdleOperation();
}
}
finally
{
NotifyOperationComplete(operation);
}
operation = nextOperation;
} while (operation != null);
return result;
}
[Fx.Tag.InheritThrows(From = "ResumeBookmark")]
public IAsyncResult BeginResumeBookmark(string bookmarkName, object value, AsyncCallback callback, object state)
{
if (string.IsNullOrEmpty(bookmarkName))
{
throw FxTrace.Exception.ArgumentNullOrEmpty("bookmarkName");
}
return BeginResumeBookmark(new Bookmark(bookmarkName), value, callback, state);
}
[Fx.Tag.InheritThrows(From = "ResumeBookmark")]
public IAsyncResult BeginResumeBookmark(string bookmarkName, object value, TimeSpan timeout, AsyncCallback callback, object state)
{
if (string.IsNullOrEmpty(bookmarkName))
{
throw FxTrace.Exception.ArgumentNullOrEmpty("bookmarkName");
}
return BeginResumeBookmark(new Bookmark(bookmarkName), value, timeout, callback, state);
}
[Fx.Tag.InheritThrows(From = "ResumeBookmark")]
public IAsyncResult BeginResumeBookmark(Bookmark bookmark, object value, AsyncCallback callback, object state)
{
return BeginResumeBookmark(bookmark, value, ActivityDefaults.ResumeBookmarkTimeout, callback, state);
}
[Fx.Tag.InheritThrows(From = "ResumeBookmark")]
public IAsyncResult BeginResumeBookmark(Bookmark bookmark, object value, TimeSpan timeout, AsyncCallback callback, object state)
{
TimeoutHelper.ThrowIfNegativeArgument(timeout);
ThrowIfHandlerThread();
return new ResumeBookmarkAsyncResult(this, bookmark, value, timeout, callback, state);
}
[Fx.Tag.InheritThrows(From = "ResumeBookmark")]
public BookmarkResumptionResult EndResumeBookmark(IAsyncResult result)
{
return ResumeBookmarkAsyncResult.End(result);
}
protected internal override IAsyncResult OnBeginResumeBookmark(Bookmark bookmark, object value, TimeSpan timeout, AsyncCallback callback, object state)
{
ThrowIfHandlerThread();
return new ResumeBookmarkAsyncResult(this, bookmark, value, true, timeout, callback, state);
}
protected internal override BookmarkResumptionResult OnEndResumeBookmark(IAsyncResult result)
{
return ResumeBookmarkAsyncResult.End(result);
}
BookmarkResumptionResult ResumeBookmarkCore(Bookmark bookmark, object value)
{
BookmarkResumptionResult result = this.Controller.ScheduleBookmarkResumption(bookmark, value);
if (result == BookmarkResumptionResult.Success)
{
RunCore();
}
return result;
}
// Returns true if successful, false otherwise
bool RaiseIdleEvent()
{
if (TD.WorkflowApplicationIdledIsEnabled())
{
TD.WorkflowApplicationIdled(this.Id.ToString());
}
Exception abortException = null;
try
{
Action idleHandler = this.Idle;
if (idleHandler != null)
{
this.handlerThreadId = Thread.CurrentThread.ManagedThreadId;
this.isInHandler = true;
idleHandler(new WorkflowApplicationIdleEventArgs(this));
}
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
abortException = e;
}
finally
{
this.isInHandler = false;
}
if (abortException != null)
{
AbortInstance(abortException, true);
return false;
}
return true;
}
void MarkUnloaded()
{
this.state = WorkflowApplicationState.Unloaded;
// don't abort completed instances
if (this.Controller.State != WorkflowInstanceState.Complete)
{
this.Controller.Abort();
}
else
{
base.DisposeExtensions();
}
Exception abortException = null;
try
{
Action handler = this.Unloaded;
if (handler != null)
{
this.handlerThreadId = Thread.CurrentThread.ManagedThreadId;
this.isInHandler = true;
handler(new WorkflowApplicationEventArgs(this));
}
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
abortException = e;
}
finally
{
this.isInHandler = false;
}
if (abortException != null)
{
AbortInstance(abortException, true);
}
}
[Fx.Tag.Throws(typeof(WorkflowApplicationException), "The WorkflowApplication is in a state for which unloading is not valid. The specific subclass denotes which state the instance is in.")]
[Fx.Tag.Throws(typeof(InstancePersistenceException), "Something went wrong during persistence, but persistence can be retried.")]
[Fx.Tag.Throws(typeof(TimeoutException), "The workflow could not be unloaded within the given timeout.")]
public void Unload()
{
Unload(ActivityDefaults.SaveTimeout);
}
[Fx.Tag.InheritThrows(From = "Unload")]
public void Unload(TimeSpan timeout)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
RequiresPersistenceOperation operation = new RequiresPersistenceOperation();
try
{
WaitForTurn(operation, timeoutHelper.RemainingTime());
ValidateStateForUnload();
if (this.state != WorkflowApplicationState.Unloaded) // Unload on unload is a no-op
{
PersistenceOperation persistenceOperation;
if (this.Controller.State == WorkflowInstanceState.Complete)
{
persistenceOperation = PersistenceOperation.Complete;
}
else
{
persistenceOperation = PersistenceOperation.Unload;
}
PersistCore(ref timeoutHelper, persistenceOperation);
}
}
finally
{
NotifyOperationComplete(operation);
}
}
[Fx.Tag.InheritThrows(From = "Unload")]
public IAsyncResult BeginUnload(AsyncCallback callback, object state)
{
return BeginUnload(ActivityDefaults.SaveTimeout, callback, state);
}
[Fx.Tag.InheritThrows(From = "Unload")]
public IAsyncResult BeginUnload(TimeSpan timeout, AsyncCallback callback, object state)
{
ThrowIfHandlerThread();
TimeoutHelper.ThrowIfNegativeArgument(timeout);
return new UnloadOrPersistAsyncResult(this, timeout, PersistenceOperation.Unload, false, false, callback, state);
}
[Fx.Tag.InheritThrows(From = "Unload")]
public void EndUnload(IAsyncResult result)
{
UnloadOrPersistAsyncResult.End(result);
}
void ThrowIfMulticast(Delegate value)
{
if (value != null && value.GetInvocationList().Length > 1)
{
throw FxTrace.Exception.Argument("value", SR.OnlySingleCastDelegatesAllowed);
}
}
void ThrowIfAborted()
{
if (this.state == WorkflowApplicationState.Aborted)
{
throw FxTrace.Exception.AsError(new WorkflowApplicationAbortedException(SR.WorkflowApplicationAborted(this.Id), this.Id));
}
}
void ThrowIfTerminatedOrCompleted()
{
if (this.hasRaisedCompleted)
{
Exception completionException;
this.Controller.GetCompletionState(out completionException);
if (completionException != null)
{
throw FxTrace.Exception.AsError(new WorkflowApplicationTerminatedException(SR.WorkflowApplicationTerminated(this.Id), this.Id, completionException));
}
else
{
throw FxTrace.Exception.AsError(new WorkflowApplicationCompletedException(SR.WorkflowApplicationCompleted(this.Id), this.Id));
}
}
}
void ThrowIfUnloaded()
{
if (this.state == WorkflowApplicationState.Unloaded)
{
throw FxTrace.Exception.AsError(new WorkflowApplicationUnloadedException(SR.WorkflowApplicationUnloaded(this.Id), this.Id));
}
}
void ThrowIfNoInstanceStore()
{
if (!HasPersistenceProvider)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.InstanceStoreRequiredToPersist));
}
}
void ThrowIfHandlerThread()
{
if (this.IsHandlerThread)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.CannotPerformOperationFromHandlerThread));
}
}
void ValidateStateForRun()
{
// WorkflowInstanceException validations
ThrowIfAborted();
ThrowIfTerminatedOrCompleted();
ThrowIfUnloaded();
}
void ValidateStateForGetAllBookmarks()
{
// WorkflowInstanceException validations
ThrowIfAborted();
ThrowIfTerminatedOrCompleted();
ThrowIfUnloaded();
}
void ValidateStateForCancel()
{
// WorkflowInstanceException validations
ThrowIfAborted();
// We only validate that we aren't aborted and no-op otherwise.
// This is because the scenario for calling cancel is for it to
// be a best attempt from an unknown thread. The less it throws
// the easier it is to author a host.
}
void ValidateStateForPersist()
{
// WorkflowInstanceException validations
ThrowIfAborted();
ThrowIfTerminatedOrCompleted();
ThrowIfUnloaded();
// Other validations
ThrowIfNoInstanceStore();
}
void ValidateStateForUnload()
{
// WorkflowInstanceException validations
ThrowIfAborted();
// Other validations
if (this.Controller.State != WorkflowInstanceState.Complete)
{
ThrowIfNoInstanceStore();
}
}
void ValidateStateForTerminate()
{
// WorkflowInstanceException validations
ThrowIfAborted();
ThrowIfTerminatedOrCompleted();
ThrowIfUnloaded();
}
enum PersistenceOperation : byte
{
Complete,
Save,
Unload
}
enum WorkflowApplicationState : byte
{
Paused,
Runnable,
Unloaded,
Aborted
}
internal class SynchronousSynchronizationContext : SynchronizationContext
{
static SynchronousSynchronizationContext value;
SynchronousSynchronizationContext()
{
}
public static SynchronousSynchronizationContext Value
{
get
{
if (value == null)
{
value = new SynchronousSynchronizationContext();
}
return value;
}
}
public override void Post(SendOrPostCallback d, object state)
{
d(state);
}
public override void Send(SendOrPostCallback d, object state)
{
d(state);
}
}
class InvokeAsyncResult : AsyncResult
{
static Action waitCompleteCallback;
WorkflowApplication instance;
AsyncWaitHandle completionWaiter;
IDictionary outputs;
Exception completionException;
public InvokeAsyncResult(Activity activity, IDictionary inputs, WorkflowInstanceExtensionManager extensions, TimeSpan timeout, SynchronizationContext syncContext, AsyncInvokeContext invokeContext, AsyncCallback callback, object state)
: base(callback, state)
{
Fx.Assert(activity != null, "Need an activity");
this.completionWaiter = new AsyncWaitHandle();
syncContext = syncContext ?? SynchronousSynchronizationContext.Value;
this.instance = WorkflowApplication.StartInvoke(activity, inputs, extensions, syncContext, new Action(this.OnInvokeComplete), invokeContext);
if (this.completionWaiter.WaitAsync(WaitCompleteCallback, this, timeout))
{
bool completeSelf = OnWorkflowCompletion();
if (completeSelf)
{
if (this.completionException != null)
{
throw FxTrace.Exception.AsError(this.completionException);
}
else
{
Complete(true);
}
}
}
}
static Action WaitCompleteCallback
{
get
{
if (waitCompleteCallback == null)
{
waitCompleteCallback = new Action(OnWaitComplete);
}
return waitCompleteCallback;
}
}
public static IDictionary End(IAsyncResult result)
{
InvokeAsyncResult thisPtr = AsyncResult.End(result);
return thisPtr.outputs;
}
void OnInvokeComplete()
{
this.completionWaiter.Set();
}
static void OnWaitComplete(object state, TimeoutException asyncException)
{
InvokeAsyncResult thisPtr = (InvokeAsyncResult)state;
if (asyncException != null)
{
thisPtr.instance.Abort(SR.AbortingDueToInstanceTimeout);
thisPtr.Complete(false, asyncException);
return;
}
bool completeSelf = true;
try
{
completeSelf = thisPtr.OnWorkflowCompletion();
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
thisPtr.completionException = e;
}
if (completeSelf)
{
thisPtr.Complete(false, thisPtr.completionException);
}
}
bool OnWorkflowCompletion()
{
if (this.instance.Controller.State == WorkflowInstanceState.Aborted)
{
this.completionException = new WorkflowApplicationAbortedException(SR.DefaultAbortReason, this.instance.Controller.GetAbortReason());
}
else
{
Fx.Assert(this.instance.Controller.State == WorkflowInstanceState.Complete, "We should only get here when we are completed.");
this.instance.Controller.GetCompletionState(out this.outputs, out this.completionException);
}
return true;
}
}
class ResumeBookmarkAsyncResult : AsyncResult
{
static AsyncCompletion resumedCallback = new AsyncCompletion(OnResumed);
static Action waitCompleteCallback = new Action(OnWaitComplete);
static AsyncCompletion trackingCompleteCallback = new AsyncCompletion(OnTrackingComplete);
WorkflowApplication instance;
Bookmark bookmark;
object value;
BookmarkResumptionResult resumptionResult;
TimeoutHelper timeoutHelper;
bool isFromExtension;
InstanceOperation currentOperation;
public ResumeBookmarkAsyncResult(WorkflowApplication instance, Bookmark bookmark, object value, TimeSpan timeout, AsyncCallback callback, object state)
: this(instance, bookmark, value, false, timeout, callback, state)
{
}
public ResumeBookmarkAsyncResult(WorkflowApplication instance, Bookmark bookmark, object value, bool isFromExtension, TimeSpan timeout, AsyncCallback callback, object state)
: base(callback, state)
{
this.instance = instance;
this.bookmark = bookmark;
this.value = value;
this.isFromExtension = isFromExtension;
this.timeoutHelper = new TimeoutHelper(timeout);
bool completeSelf = false;
bool success = false;
this.OnCompleting = new Action(Finally);
try
{
if (!this.instance.hasCalledRun && !this.isFromExtension)
{
IAsyncResult result = this.instance.BeginInternalRun(this.timeoutHelper.RemainingTime(), false, PrepareAsyncCompletion(resumedCallback), this);
if (result.CompletedSynchronously)
{
completeSelf = OnResumed(result);
}
}
else
{
completeSelf = StartResumptionLoop();
}
success = true;
}
finally
{
// We only want to call this if we are throwing. Otherwise OnCompleting will take care of it.
if (!success)
{
this.instance.NotifyOperationComplete(this.currentOperation);
}
}
if (completeSelf)
{
Complete(true);
}
}
public static BookmarkResumptionResult End(IAsyncResult result)
{
ResumeBookmarkAsyncResult thisPtr = AsyncResult.End(result);
return thisPtr.resumptionResult;
}
void Finally(AsyncResult result, Exception completionException)
{
this.instance.NotifyOperationComplete(this.currentOperation);
}
static bool OnResumed(IAsyncResult result)
{
ResumeBookmarkAsyncResult thisPtr = (ResumeBookmarkAsyncResult)result.AsyncState;
thisPtr.instance.EndRun(result);
return thisPtr.StartResumptionLoop();
}
bool StartResumptionLoop()
{
this.currentOperation = new RequiresIdleOperation(this.isFromExtension);
return WaitOnCurrentOperation();
}
bool WaitOnCurrentOperation()
{
bool stillSync = true;
bool tryOneMore = true;
while (tryOneMore)
{
tryOneMore = false;
Fx.Assert(this.currentOperation != null, "We should always have a current operation here.");
if (this.instance.WaitForTurnAsync(this.currentOperation, this.timeoutHelper.RemainingTime(), waitCompleteCallback, this))
{
if (CheckIfBookmarksAreInvalid())
{
stillSync = true;
}
else
{
stillSync = ProcessResumption();
tryOneMore = this.resumptionResult == BookmarkResumptionResult.NotReady;
}
}
else
{
stillSync = false;
}
}
return stillSync;
}
static void OnWaitComplete(object state, TimeoutException asyncException)
{
ResumeBookmarkAsyncResult thisPtr = (ResumeBookmarkAsyncResult)state;
if (asyncException != null)
{
thisPtr.Complete(false, asyncException);
return;
}
Exception completionException = null;
bool completeSelf = false;
try
{
if (thisPtr.CheckIfBookmarksAreInvalid())
{
completeSelf = true;
}
else
{
completeSelf = thisPtr.ProcessResumption();
if (thisPtr.resumptionResult == BookmarkResumptionResult.NotReady)
{
completeSelf = thisPtr.WaitOnCurrentOperation();
}
}
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
completeSelf = true;
completionException = e;
}
if (completeSelf)
{
thisPtr.Complete(false, completionException);
}
}
bool CheckIfBookmarksAreInvalid()
{
if (this.instance.AreBookmarksInvalid(out this.resumptionResult))
{
return true;
}
return false;
}
bool ProcessResumption()
{
bool stillSync = true;
this.resumptionResult = this.instance.ResumeBookmarkCore(this.bookmark, this.value);
if (this.resumptionResult == BookmarkResumptionResult.Success)
{
if (this.instance.Controller.HasPendingTrackingRecords)
{
IAsyncResult result = this.instance.Controller.BeginFlushTrackingRecords(this.timeoutHelper.RemainingTime(), PrepareAsyncCompletion(trackingCompleteCallback), this);
if (result.CompletedSynchronously)
{
stillSync = OnTrackingComplete(result);
}
else
{
stillSync = false;
}
}
}
else if (this.resumptionResult == BookmarkResumptionResult.NotFound)
{
InstanceOperation lastOperation = this.currentOperation;
this.currentOperation = null;
this.instance.NotifyOperationComplete(lastOperation);
this.currentOperation = new DeferredRequiresIdleOperation();
}
return stillSync;
}
static bool OnTrackingComplete(IAsyncResult result)
{
ResumeBookmarkAsyncResult thisPtr = (ResumeBookmarkAsyncResult)result.AsyncState;
thisPtr.instance.Controller.EndFlushTrackingRecords(result);
return true;
}
}
class UnloadOrPersistAsyncResult : AsyncResult
{
static Action waitCompleteCallback = new Action(OnWaitComplete);
static AsyncCompletion savedCallback = new AsyncCompletion(OnSaved);
static AsyncCompletion persistedCallback = new AsyncCompletion(OnPersisted);
static AsyncCompletion initializedCallback = new AsyncCompletion(OnProviderInitialized);
static AsyncCompletion readynessEnsuredCallback = new AsyncCompletion(OnProviderReadynessEnsured);
static AsyncCompletion trackingCompleteCallback = new AsyncCompletion(OnTrackingComplete);
static AsyncCompletion deleteOwnerCompleteCallback = new AsyncCompletion(OnOwnerDeleted);
static AsyncCompletion completeContextCallback = new AsyncCompletion(OnCompleteContext);
static Action completeCallback = new Action(OnComplete);
DependentTransaction dependentTransaction;
WorkflowApplication instance;
bool isUnloaded;
TimeoutHelper timeoutHelper;
PersistenceOperation operation;
RequiresPersistenceOperation instanceOperation;
WorkflowPersistenceContext context;
IDictionary data;
PersistencePipeline pipeline;
bool isInternalPersist;
public UnloadOrPersistAsyncResult(WorkflowApplication instance, TimeSpan timeout, PersistenceOperation operation,
bool isWorkflowThread, bool isInternalPersist, AsyncCallback callback, object state)
: base(callback, state)
{
this.instance = instance;
this.timeoutHelper = new TimeoutHelper(timeout);
this.operation = operation;
this.isInternalPersist = isInternalPersist;
this.isUnloaded = (operation == PersistenceOperation.Unload || operation == PersistenceOperation.Complete);
this.OnCompleting = UnloadOrPersistAsyncResult.completeCallback;
bool completeSelf;
bool success = false;
// Save off the current transaction in case we have an async operation before we end up creating
// the WorkflowPersistenceContext and create it on another thread. Do a blocking dependent clone that
// we will complete when we are completed.
//
// This will throw TransactionAbortedException by design, if the transaction is already rolled back.
Transaction currentTransaction = Transaction.Current;
if (currentTransaction != null)
{
this.dependentTransaction = currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
}
try
{
if (isWorkflowThread)
{
Fx.Assert(this.instance.Controller.IsPersistable, "The runtime won't schedule this work item unless we've passed the guard");
// We're an internal persistence on the workflow thread which means
// that we are passed the guard already, we have the lock, and we know
// we aren't detached.
completeSelf = InitializeProvider();
success = true;
}
else
{
this.instanceOperation = new RequiresPersistenceOperation();
try
{
if (this.instance.WaitForTurnAsync(this.instanceOperation, this.timeoutHelper.RemainingTime(), waitCompleteCallback, this))
{
completeSelf = ValidateState();
}
else
{
completeSelf = false;
}
success = true;
}
finally
{
if (!success)
{
NotifyOperationComplete();
}
}
}
}
finally
{
// If we had an exception, we need to complete the dependent transaction.
if (!success)
{
if (this.dependentTransaction != null)
{
this.dependentTransaction.Complete();
}
}
}
if (completeSelf)
{
Complete(true);
}
}
bool ValidateState()
{
bool alreadyUnloaded = false;
if (this.operation == PersistenceOperation.Unload)
{
this.instance.ValidateStateForUnload();
alreadyUnloaded = this.instance.state == WorkflowApplicationState.Unloaded;
}
else
{
this.instance.ValidateStateForPersist();
}
if (alreadyUnloaded)
{
return true;
}
else
{
return InitializeProvider();
}
}
static void OnWaitComplete(object state, TimeoutException asyncException)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)state;
if (asyncException != null)
{
thisPtr.Complete(false, asyncException);
return;
}
bool completeSelf;
Exception completionException = null;
try
{
completeSelf = thisPtr.ValidateState();
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
completionException = e;
completeSelf = true;
}
if (completeSelf)
{
thisPtr.Complete(false, completionException);
}
}
bool InitializeProvider()
{
// We finally have the lock and are passed the guard. Let's update our operation if this is an Unload.
if (this.operation == PersistenceOperation.Unload && this.instance.Controller.State == WorkflowInstanceState.Complete)
{
this.operation = PersistenceOperation.Complete;
}
if (this.instance.HasPersistenceProvider && !this.instance.persistenceManager.IsInitialized)
{
IAsyncResult result = this.instance.persistenceManager.BeginInitialize(this.timeoutHelper.RemainingTime(),
PrepareAsyncCompletion(UnloadOrPersistAsyncResult.initializedCallback), this);
return SyncContinue(result);
}
else
{
return EnsureProviderReadyness();
}
}
static bool OnProviderInitialized(IAsyncResult result)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
thisPtr.instance.persistenceManager.EndInitialize(result);
return thisPtr.EnsureProviderReadyness();
}
bool EnsureProviderReadyness()
{
if (this.instance.HasPersistenceProvider && !this.instance.persistenceManager.IsLocked && this.dependentTransaction != null)
{
IAsyncResult result = this.instance.persistenceManager.BeginEnsureReadyness(this.timeoutHelper.RemainingTime(),
PrepareAsyncCompletion(UnloadOrPersistAsyncResult.readynessEnsuredCallback), this);
return SyncContinue(result);
}
else
{
return Track();
}
}
static bool OnProviderReadynessEnsured(IAsyncResult result)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
thisPtr.instance.persistenceManager.EndEnsureReadyness(result);
return thisPtr.Track();
}
public static void End(IAsyncResult result)
{
AsyncResult.End(result);
}
void NotifyOperationComplete()
{
RequiresPersistenceOperation localInstanceOperation = this.instanceOperation;
this.instanceOperation = null;
this.instance.NotifyOperationComplete(localInstanceOperation);
}
bool Track()
{
// Do the tracking before preparing in case the tracking data is being pushed into
// an extension and persisted transactionally with the instance state.
if (this.instance.HasPersistenceProvider)
{
// We only track the persistence operation if we actually
// are persisting (and not just hitting PersistenceParticipants)
this.instance.TrackPersistence(this.operation);
}
if (this.instance.Controller.HasPendingTrackingRecords)
{
TimeSpan flushTrackingRecordsTimeout;
if (this.isInternalPersist)
{
// If we're an internal persist we're using TimeSpan.MaxValue
// for our persistence and we want to use a smaller timeout
// for tracking
flushTrackingRecordsTimeout = ActivityDefaults.TrackingTimeout;
}
else
{
flushTrackingRecordsTimeout = this.timeoutHelper.RemainingTime();
}
IAsyncResult result = this.instance.Controller.BeginFlushTrackingRecords(flushTrackingRecordsTimeout, PrepareAsyncCompletion(trackingCompleteCallback), this);
return SyncContinue(result);
}
return CollectAndMap();
}
static bool OnTrackingComplete(IAsyncResult result)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
thisPtr.instance.Controller.EndFlushTrackingRecords(result);
return thisPtr.CollectAndMap();
}
bool CollectAndMap()
{
bool success = false;
try
{
if (this.instance.HasPersistenceModule)
{
IEnumerable modules = this.instance.GetExtensions();
this.pipeline = new PersistencePipeline(modules, PersistenceManager.GenerateInitialData(this.instance));
this.pipeline.Collect();
this.pipeline.Map();
this.data = this.pipeline.Values;
}
success = true;
}
finally
{
if (!success && this.context != null)
{
this.context.Abort();
}
}
if (this.instance.HasPersistenceProvider)
{
return Persist();
}
else
{
return Save();
}
}
bool Persist()
{
IAsyncResult result = null;
try
{
if (this.data == null)
{
this.data = PersistenceManager.GenerateInitialData(this.instance);
}
if (this.context == null)
{
this.context = new WorkflowPersistenceContext(this.pipeline != null && this.pipeline.IsSaveTransactionRequired,
this.dependentTransaction, this.timeoutHelper.OriginalTimeout);
}
using (PrepareTransactionalCall(this.context.PublicTransaction))
{
result = this.instance.persistenceManager.BeginSave(this.data, this.operation, this.timeoutHelper.RemainingTime(), PrepareAsyncCompletion(persistedCallback), this);
}
}
finally
{
if (result == null && this.context != null)
{
this.context.Abort();
}
}
return SyncContinue(result);
}
static bool OnPersisted(IAsyncResult result)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
bool success = false;
try
{
thisPtr.instance.persistenceManager.EndSave(result);
success = true;
}
finally
{
if (!success)
{
thisPtr.context.Abort();
}
}
return thisPtr.Save();
}
bool Save()
{
if (this.pipeline != null)
{
IAsyncResult result = null;
try
{
if (this.context == null)
{
this.context = new WorkflowPersistenceContext(this.pipeline.IsSaveTransactionRequired,
this.dependentTransaction, this.timeoutHelper.RemainingTime());
}
this.instance.persistencePipelineInUse = this.pipeline;
Thread.MemoryBarrier();
if (this.instance.state == WorkflowApplicationState.Aborted)
{
throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason));
}
using (PrepareTransactionalCall(this.context.PublicTransaction))
{
result = this.pipeline.BeginSave(this.timeoutHelper.RemainingTime(), PrepareAsyncCompletion(savedCallback), this);
}
}
finally
{
if (result == null)
{
this.instance.persistencePipelineInUse = null;
if (this.context != null)
{
this.context.Abort();
}
}
}
return SyncContinue(result);
}
else
{
return CompleteContext();
}
}
static bool OnSaved(IAsyncResult result)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
bool success = false;
try
{
thisPtr.pipeline.EndSave(result);
success = true;
}
finally
{
thisPtr.instance.persistencePipelineInUse = null;
if (!success)
{
thisPtr.context.Abort();
}
}
return thisPtr.CompleteContext();
}
bool CompleteContext()
{
bool wentAsync = false;
IAsyncResult completeResult = null;
if (this.context != null)
{
wentAsync = this.context.TryBeginComplete(this.PrepareAsyncCompletion(completeContextCallback), this, out completeResult);
}
if (wentAsync)
{
Fx.Assert(completeResult != null, "We shouldn't have null here because we would have rethrown or gotten false for went async.");
return SyncContinue(completeResult);
}
else
{
// We completed synchronously if we didn't get an async result out of
// TryBeginComplete
return DeleteOwner();
}
}
static bool OnCompleteContext(IAsyncResult result)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
thisPtr.context.EndComplete(result);
return thisPtr.DeleteOwner();
}
bool DeleteOwner()
{
if (this.instance.HasPersistenceProvider && this.instance.persistenceManager.OwnerWasCreated &&
(this.operation == PersistenceOperation.Unload || this.operation == PersistenceOperation.Complete))
{
// This call uses the ambient transaction directly if there was one, to mimic the [....] case.
//
IAsyncResult deleteOwnerResult = null;
using (PrepareTransactionalCall(this.dependentTransaction))
{
deleteOwnerResult = this.instance.persistenceManager.BeginDeleteOwner(this.timeoutHelper.RemainingTime(),
this.PrepareAsyncCompletion(UnloadOrPersistAsyncResult.deleteOwnerCompleteCallback), this);
}
return this.SyncContinue(deleteOwnerResult);
}
else
{
return CloseInstance();
}
}
static bool OnOwnerDeleted(IAsyncResult result)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result.AsyncState;
thisPtr.instance.persistenceManager.EndDeleteOwner(result);
return thisPtr.CloseInstance();
}
bool CloseInstance()
{
// NOTE: We need to make sure that any changes which occur
// here are appropriately ported to WorkflowApplication's
// CompletionHandler.OnStage1Complete method in the case
// where we don't call BeginPersist.
if (this.operation != PersistenceOperation.Save)
{
// Stop execution if we've given up the instance lock
this.instance.state = WorkflowApplicationState.Paused;
}
if (this.isUnloaded)
{
this.instance.MarkUnloaded();
}
return true;
}
static void OnComplete(AsyncResult result, Exception exception)
{
UnloadOrPersistAsyncResult thisPtr = (UnloadOrPersistAsyncResult)result;
try
{
thisPtr.NotifyOperationComplete();
}
finally
{
if (thisPtr.dependentTransaction != null)
{
thisPtr.dependentTransaction.Complete();
}
}
}
}
abstract class SimpleOperationAsyncResult : AsyncResult
{
static Action waitCompleteCallback = new Action(OnWaitComplete);
static AsyncCallback trackingCompleteCallback = Fx.ThunkCallback(new AsyncCallback(OnTrackingComplete));
WorkflowApplication instance;
TimeoutHelper timeoutHelper;
protected SimpleOperationAsyncResult(WorkflowApplication instance, AsyncCallback callback, object state)
: base(callback, state)
{
this.instance = instance;
}
protected WorkflowApplication Instance
{
get
{
return this.instance;
}
}
protected void Run(TimeSpan timeout)
{
this.timeoutHelper = new TimeoutHelper(timeout);
InstanceOperation operation = new InstanceOperation();
bool completeSelf = true;
try
{
completeSelf = this.instance.WaitForTurnAsync(operation, this.timeoutHelper.RemainingTime(), waitCompleteCallback, this);
if (completeSelf)
{
this.ValidateState();
completeSelf = PerformOperationAndTrack();
}
}
finally
{
if (completeSelf)
{
this.instance.NotifyOperationComplete(operation);
}
}
if (completeSelf)
{
Complete(true);
}
}
bool PerformOperationAndTrack()
{
PerformOperation();
bool completedSync = true;
if (this.instance.Controller.HasPendingTrackingRecords)
{
IAsyncResult trackingResult = this.instance.Controller.BeginFlushTrackingRecords(this.timeoutHelper.RemainingTime(), trackingCompleteCallback, this);
if (trackingResult.CompletedSynchronously)
{
this.instance.Controller.EndFlushTrackingRecords(trackingResult);
}
else
{
completedSync = false;
}
}
return completedSync;
}
static void OnWaitComplete(object state, TimeoutException asyncException)
{
SimpleOperationAsyncResult thisPtr = (SimpleOperationAsyncResult)state;
if (asyncException != null)
{
thisPtr.Complete(false, asyncException);
}
else
{
Exception completionException = null;
bool completeSelf = true;
try
{
thisPtr.ValidateState();
completeSelf = thisPtr.PerformOperationAndTrack();
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
completionException = e;
}
finally
{
if (completeSelf)
{
thisPtr.instance.ForceNotifyOperationComplete();
}
}
if (completeSelf)
{
thisPtr.Complete(false, completionException);
}
}
}
static void OnTrackingComplete(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
SimpleOperationAsyncResult thisPtr = (SimpleOperationAsyncResult)result.AsyncState;
Exception completionException = null;
try
{
thisPtr.instance.Controller.EndFlushTrackingRecords(result);
}
catch (Exception e)
{
if (Fx.IsFatal(e))
{
throw;
}
completionException = e;
}
finally
{
thisPtr.instance.ForceNotifyOperationComplete();
}
thisPtr.Complete(false, completionException);
}
protected abstract void ValidateState();
protected abstract void PerformOperation();
}
class TerminateAsyncResult : SimpleOperationAsyncResult
{
Exception reason;
TerminateAsyncResult(WorkflowApplication instance, Exception reason, AsyncCallback callback, object state)
: base(instance, callback, state)
{
this.reason = reason;
}
public static TerminateAsyncResult Create(WorkflowApplication instance, Exception reason, TimeSpan timeout, AsyncCallback callback, object state)
{
TerminateAsyncResult result = new TerminateAsyncResult(instance, reason, callback, state);
result.Run(timeout);
return result;
}
public static void End(IAsyncResult result)
{
AsyncResult.End(result);
}
protected override void ValidateState()
{
this.Instance.ValidateStateForTerminate();
}
protected override void PerformOperation()
{
this.Instance.TerminateCore(this.reason);
}
}
class CancelAsyncResult : SimpleOperationAsyncResult
{
CancelAsyncResult(WorkflowApplication instance, AsyncCallback callback, object state)
: base(instance, callback, state)
{
}
public static CancelAsyncResult Create(WorkflowApplication instance, TimeSpan timeout, AsyncCallback callback, object state)
{
CancelAsyncResult result = new CancelAsyncResult(instance, callback, state);
result.Run(timeout);
return result;
}
public static void End(IAsyncResult result)
{
AsyncResult.End(result);
}
protected override void ValidateState()
{
this.Instance.ValidateStateForCancel();
}
protected override void PerformOperation()
{
this.Instance.CancelCore();
}
}
class RunAsyncResult : SimpleOperationAsyncResult
{
bool isUserRun;
RunAsyncResult(WorkflowApplication instance, bool isUserRun, AsyncCallback callback, object state)
: base(instance, callback, state)
{
this.isUserRun = isUserRun;
}
public static RunAsyncResult Create(WorkflowApplication instance, bool isUserRun, TimeSpan timeout, AsyncCallback callback, object state)
{
RunAsyncResult result = new RunAsyncResult(instance, isUserRun, callback, state);
result.Run(timeout);
return result;
}
public static void End(IAsyncResult result)
{
AsyncResult.End(result);
}
protected override void ValidateState()
{
this.Instance.ValidateStateForRun();
}
protected override void PerformOperation()
{
if (this.isUserRun)
{
// We set this to true here so that idle will be raised
// regardless of whether any work is performed.
this.Instance.hasExecutionOccurredSinceLastIdle = true;
}
this.Instance.RunCore();
}
}
class LoadAsyncResult : AsyncResult
{
static AsyncCompletion providerRegisteredCallback = new AsyncCompletion(OnProviderRegistered);
static AsyncCompletion loadCompleteCallback = new AsyncCompletion(OnLoadComplete);
static AsyncCompletion loadPipelineCallback = new AsyncCompletion(OnLoadPipeline);
static AsyncCompletion completeContextCallback = new AsyncCompletion(OnCompleteContext);
static Action completeCallback = new Action(OnComplete);
readonly WorkflowApplication application;
readonly TimeoutHelper timeoutHelper;
readonly bool loadAny;
object deserializedRuntimeState;
PersistencePipeline pipeline;
WorkflowPersistenceContext context;
DependentTransaction dependentTransaction;
public LoadAsyncResult(WorkflowApplication application, bool loadAny, TimeSpan timeout, AsyncCallback callback, object state)
: base(callback, state)
{
this.application = application;
this.timeoutHelper = new TimeoutHelper(timeout);
this.loadAny = loadAny;
OnCompleting = LoadAsyncResult.completeCallback;
// Save off the current transaction in case we have an async operation before we end up creating
// the WorkflowPersistenceContext and create it on another thread. Do a simple clone here to prevent
// the object referenced by Transaction.Current from disposing before we get around to referencing it
// when we create the WorkflowPersistenceContext.
//
// This will throw TransactionAbortedException by design, if the transaction is already rolled back.
Transaction currentTransaction = Transaction.Current;
if (currentTransaction != null)
{
this.dependentTransaction = currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
}
bool completeSelf;
bool success = false;
try
{
completeSelf = RegisterProvider();
success = true;
}
finally
{
if (!success)
{
if (this.dependentTransaction != null)
{
this.dependentTransaction.Complete();
}
this.application.Abort(SR.AbortingDueToLoadFailure);
}
}
if (completeSelf)
{
Complete(true);
}
}
public static void End(IAsyncResult result)
{
AsyncResult.End(result);
}
bool RegisterProvider()
{
if (!this.application.persistenceManager.IsInitialized)
{
IAsyncResult result = this.application.persistenceManager.BeginInitialize(this.timeoutHelper.RemainingTime(), PrepareAsyncCompletion(providerRegisteredCallback), this);
return SyncContinue(result);
}
else
{
return Load();
}
}
static bool OnProviderRegistered(IAsyncResult result)
{
LoadAsyncResult thisPtr = (LoadAsyncResult)result.AsyncState;
thisPtr.application.persistenceManager.EndInitialize(result);
return thisPtr.Load();
}
bool Load()
{
IAsyncResult result = null;
try
{
this.context = new WorkflowPersistenceContext(this.application.IsLoadTransactionRequired(),
this.dependentTransaction, this.timeoutHelper.OriginalTimeout);
using (PrepareTransactionalCall(this.context.PublicTransaction))
{
if (this.loadAny)
{
result = this.application.persistenceManager.BeginTryLoad(this.timeoutHelper.RemainingTime(), PrepareAsyncCompletion(loadCompleteCallback), this);
}
else
{
result = this.application.persistenceManager.BeginLoad(this.timeoutHelper.RemainingTime(), PrepareAsyncCompletion(loadCompleteCallback), this);
}
}
}
finally
{
if (result == null && this.context != null)
{
this.context.Abort();
}
}
return SyncContinue(result);
}
static bool OnLoadComplete(IAsyncResult result)
{
LoadAsyncResult thisPtr = (LoadAsyncResult)result.AsyncState;
IAsyncResult loadResult = null;
bool success = false;
try
{
IDictionary values;
if (thisPtr.loadAny)
{
if (!thisPtr.application.persistenceManager.EndTryLoad(result, out values))
{
throw FxTrace.Exception.AsError(new InstanceNotReadyException(SR.NoRunnableInstances));
}
if (thisPtr.application.instanceIdSet)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.WorkflowApplicationAlreadyHasId));
}
thisPtr.application.instanceId = thisPtr.application.persistenceManager.InstanceId;
thisPtr.application.instanceIdSet = true;
}
else
{
values = thisPtr.application.persistenceManager.EndLoad(result);
}
thisPtr.pipeline = thisPtr.application.ProcessInstanceValues(values, out thisPtr.deserializedRuntimeState);
if (thisPtr.pipeline != null)
{
thisPtr.pipeline.SetLoadedValues(values);
thisPtr.application.persistencePipelineInUse = thisPtr.pipeline;
Thread.MemoryBarrier();
if (thisPtr.application.state == WorkflowApplicationState.Aborted)
{
throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason));
}
using (thisPtr.PrepareTransactionalCall(thisPtr.context.PublicTransaction))
{
loadResult = thisPtr.pipeline.BeginLoad(thisPtr.timeoutHelper.RemainingTime(), thisPtr.PrepareAsyncCompletion(loadPipelineCallback), thisPtr);
}
}
success = true;
}
finally
{
if (!success)
{
thisPtr.context.Abort();
}
}
if (thisPtr.pipeline != null)
{
return thisPtr.SyncContinue(loadResult);
}
else
{
return thisPtr.CompleteContext();
}
}
static bool OnLoadPipeline(IAsyncResult result)
{
LoadAsyncResult thisPtr = (LoadAsyncResult)result.AsyncState;
bool success = false;
try
{
thisPtr.pipeline.EndLoad(result);
success = true;
}
finally
{
if (!success)
{
thisPtr.context.Abort();
}
}
return thisPtr.CompleteContext();
}
bool CompleteContext()
{
this.application.Initialize(this.deserializedRuntimeState);
IAsyncResult completeResult;
if (this.context.TryBeginComplete(PrepareAsyncCompletion(completeContextCallback), this, out completeResult))
{
Fx.Assert(completeResult != null, "We shouldn't have null here.");
return SyncContinue(completeResult);
}
else
{
return Finish();
}
}
static bool OnCompleteContext(IAsyncResult result)
{
LoadAsyncResult thisPtr = (LoadAsyncResult)result.AsyncState;
thisPtr.context.EndComplete(result);
return thisPtr.Finish();
}
bool Finish()
{
if (this.pipeline != null)
{
this.pipeline.Publish();
}
return true;
}
static void OnComplete(AsyncResult result, Exception exception)
{
LoadAsyncResult thisPtr = (LoadAsyncResult)result;
if (thisPtr.dependentTransaction != null)
{
thisPtr.dependentTransaction.Complete();
}
if (exception != null)
{
thisPtr.application.Abort(SR.AbortingDueToLoadFailure);
}
}
}
// this class is not a general purpose SyncContext and is only meant to work for workflow scenarios, where the scheduler ensures
// at most one work item pending. The scheduler ensures that Invoke must run before Post is called on a different thread.
class PumpBasedSynchronizationContext : SynchronizationContext
{
// The waitObject is cached per thread so that we can avoid the cost of creating
// events for multiple synchronous invokes.
[ThreadStatic]
static AutoResetEvent waitObject;
AutoResetEvent queueWaiter;
WorkItem currentWorkItem;
object thisLock;
TimeoutHelper timeoutHelper;
public PumpBasedSynchronizationContext(TimeSpan timeout)
{
this.timeoutHelper = new TimeoutHelper(timeout);
this.thisLock = new object();
}
bool IsInvokeCompleted
{
get;
set;
}
public void DoPump()
{
Fx.Assert(this.currentWorkItem != null, "the work item cannot be null");
WorkItem workItem;
lock (this.thisLock)
{
if (PumpBasedSynchronizationContext.waitObject == null)
{
PumpBasedSynchronizationContext.waitObject = new AutoResetEvent(false);
}
this.queueWaiter = PumpBasedSynchronizationContext.waitObject;
workItem = this.currentWorkItem;
this.currentWorkItem = null;
workItem.Invoke();
}
Fx.Assert(this.queueWaiter != null, "queue waiter cannot be null");
while (this.WaitForNextItem())
{
Fx.Assert(this.currentWorkItem != null, "the work item cannot be null");
workItem = this.currentWorkItem;
this.currentWorkItem = null;
workItem.Invoke();
}
}
public override void Post(SendOrPostCallback d, object state)
{
ScheduleWorkItem(new WorkItem(d, state));
}
public override void Send(SendOrPostCallback d, object state)
{
throw FxTrace.Exception.AsError(FxTrace.FailFast(SR.SendNotSupported));
}
// Since tracking can go async this may or may not be called directly
// under a call to workItem.Invoke. Also, the scheduler may call
// OnNotifyPaused or OnNotifyUnhandledException from any random thread
// if runtime goes async (post-work item tracking, AsyncCodeActivity).
public void OnInvokeCompleted()
{
Fx.AssertAndFailFast(this.currentWorkItem == null, "There can be no pending work items when complete");
this.IsInvokeCompleted = true;
lock (this.thisLock)
{
if (this.queueWaiter != null)
{
// Since we don't know which thread this is being called
// from we just set the waiter directly rather than
// doing our SetWaiter cleanup.
this.queueWaiter.Set();
}
}
}
void ScheduleWorkItem(WorkItem item)
{
lock (this.thisLock)
{
Fx.AssertAndFailFast(this.currentWorkItem == null, "There cannot be more than 1 work item at a given time");
this.currentWorkItem = item;
if (this.queueWaiter != null)
{
// Since we don't know which thread this is being called
// from we just set the waiter directly rather than
// doing our SetWaiter cleanup.
this.queueWaiter.Set();
}
}
}
bool WaitOne(AutoResetEvent waiter, TimeSpan timeout)
{
bool success = false;
try
{
bool result = TimeoutHelper.WaitOne(waiter, timeout);
// if the wait timed out, reset the thread static
success = result;
return result;
}
finally
{
if (!success)
{
PumpBasedSynchronizationContext.waitObject = null;
}
}
}
bool WaitForNextItem()
{
if (!WaitOne(this.queueWaiter, timeoutHelper.RemainingTime()))
{
throw FxTrace.Exception.AsError(new TimeoutException(SR.TimeoutOnOperation(timeoutHelper.OriginalTimeout)));
}
// We need to check this after the wait as well in
// case the notification came in asynchronously
if (this.IsInvokeCompleted)
{
return false;
}
return true;
}
class WorkItem
{
SendOrPostCallback callback;
object state;
public WorkItem(SendOrPostCallback callback, object state)
{
this.callback = callback;
this.state = state;
}
public void Invoke()
{
this.callback(this.state);
}
}
}
class WorkflowEventData
{
public WorkflowEventData(WorkflowApplication instance)
{
this.Instance = instance;
}
public WorkflowApplication Instance
{
get;
private set;
}
public Func NextCallback
{
get;
set;
}
public Exception UnhandledException
{
get;
set;
}
public Activity UnhandledExceptionSource
{
get;
set;
}
public string UnhandledExceptionSourceInstance
{
get;
set;
}
}
class IdleEventHandler
{
Func stage1Callback;
Func stage2Callback;
public IdleEventHandler()
{
}
Func Stage1Callback
{
get
{
if (this.stage1Callback == null)
{
this.stage1Callback = new Func(OnStage1Complete);
}
return this.stage1Callback;
}
}
Func Stage2Callback
{
get
{
if (this.stage2Callback == null)
{
this.stage2Callback = new Func(OnStage2Complete);
}
return this.stage2Callback;
}
}
public bool Run(WorkflowApplication instance)
{
IAsyncResult result = null;
if (instance.Controller.TrackingEnabled)
{
instance.Controller.Track(new WorkflowInstanceRecord(instance.Id, instance.WorkflowDefinition.DisplayName, WorkflowInstanceStates.Idle));
instance.EventData.NextCallback = this.Stage1Callback;
result = instance.Controller.BeginFlushTrackingRecords(ActivityDefaults.TrackingTimeout, EventFrameCallback, instance.EventData);
if (!result.CompletedSynchronously)
{
return false;
}
}
return OnStage1Complete(result, instance, true);
}
bool OnStage1Complete(IAsyncResult lastResult, WorkflowApplication application, bool isStillSync)
{
if (lastResult != null)
{
application.Controller.EndFlushTrackingRecords(lastResult);
}
IAsyncResult result = null;
if (application.RaiseIdleEvent())
{
if (application.Controller.IsPersistable && application.persistenceManager != null)
{
Func persistableIdleHandler = application.PersistableIdle;
if (persistableIdleHandler != null)
{
PersistableIdleAction action = PersistableIdleAction.None;
application.handlerThreadId = Thread.CurrentThread.ManagedThreadId;
try
{
application.isInHandler = true;
action = persistableIdleHandler(new WorkflowApplicationIdleEventArgs(application));
}
finally
{
application.isInHandler = false;
}
if (TD.WorkflowApplicationPersistableIdleIsEnabled())
{
TD.WorkflowApplicationPersistableIdle(application.Id.ToString(), action.ToString());
}
if (action != PersistableIdleAction.None)
{
PersistenceOperation operation = PersistenceOperation.Unload;
if (action == PersistableIdleAction.Persist)
{
operation = PersistenceOperation.Save;
}
else if (action != PersistableIdleAction.Unload)
{
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.InvalidIdleAction));
}
application.EventData.NextCallback = this.Stage2Callback;
result = application.BeginInternalPersist(operation, ActivityDefaults.InternalSaveTimeout, true, EventFrameCallback, application.EventData);
if (!result.CompletedSynchronously)
{
return false;
}
}
}
else
{
// Trace the default action
if (TD.WorkflowApplicationPersistableIdleIsEnabled())
{
TD.WorkflowApplicationPersistableIdle(application.Id.ToString(), PersistableIdleAction.None.ToString());
}
}
}
}
return OnStage2Complete(result, application, isStillSync);
}
bool OnStage2Complete(IAsyncResult lastResult, WorkflowApplication instance, bool isStillSync)
{
if (lastResult != null)
{
instance.EndInternalPersist(lastResult);
}
return true;
}
}
class CompletedEventHandler
{
Func stage1Callback;
Func stage2Callback;
public CompletedEventHandler()
{
}
Func Stage1Callback
{
get
{
if (this.stage1Callback == null)
{
this.stage1Callback = new Func(OnStage1Complete);
}
return this.stage1Callback;
}
}
Func Stage2Callback
{
get
{
if (this.stage2Callback == null)
{
this.stage2Callback = new Func(OnStage2Complete);
}
return this.stage2Callback;
}
}
public bool Run(WorkflowApplication instance)
{
IAsyncResult result = null;
if (instance.Controller.HasPendingTrackingRecords)
{
instance.EventData.NextCallback = this.Stage1Callback;
result = instance.Controller.BeginFlushTrackingRecords(ActivityDefaults.TrackingTimeout, EventFrameCallback, instance.EventData);
if (!result.CompletedSynchronously)
{
return false;
}
}
return OnStage1Complete(result, instance, true);
}
bool OnStage1Complete(IAsyncResult lastResult, WorkflowApplication instance, bool isStillSync)
{
if (lastResult != null)
{
instance.Controller.EndFlushTrackingRecords(lastResult);
}
IDictionary outputs;
Exception completionException;
ActivityInstanceState completionState = instance.Controller.GetCompletionState(out outputs, out completionException);
if (instance.invokeCompletedCallback == null)
{
Action handler = instance.Completed;
if (handler != null)
{
instance.handlerThreadId = Thread.CurrentThread.ManagedThreadId;
try
{
instance.isInHandler = true;
handler(new WorkflowApplicationCompletedEventArgs(instance, completionException, completionState, outputs));
}
finally
{
instance.isInHandler = false;
}
}
}
switch (completionState)
{
case ActivityInstanceState.Closed:
if (TD.WorkflowApplicationCompletedIsEnabled())
{
TD.WorkflowApplicationCompleted(instance.Id.ToString());
}
break;
case ActivityInstanceState.Canceled:
if (TD.WorkflowInstanceCanceledIsEnabled())
{
TD.WorkflowInstanceCanceled(instance.Id.ToString());
}
break;
case ActivityInstanceState.Faulted:
if (TD.WorkflowApplicationTerminatedIsEnabled())
{
TD.WorkflowApplicationTerminated(instance.Id.ToString(), completionException);
}
break;
}
IAsyncResult result = null;
Fx.Assert(instance.Controller.IsPersistable, "Should not be in a No Persist Zone once the instance is complete.");
if (instance.persistenceManager != null || instance.HasPersistenceModule)
{
instance.EventData.NextCallback = this.Stage2Callback;
result = instance.BeginInternalPersist(PersistenceOperation.Unload, ActivityDefaults.InternalSaveTimeout, true, EventFrameCallback, instance.EventData);
if (!result.CompletedSynchronously)
{
return false;
}
}
else
{
instance.MarkUnloaded();
}
return OnStage2Complete(result, instance, isStillSync);
}
bool OnStage2Complete(IAsyncResult lastResult, WorkflowApplication instance, bool isStillSync)
{
if (lastResult != null)
{
instance.EndInternalPersist(lastResult);
}
if (instance.invokeCompletedCallback != null)
{
instance.invokeCompletedCallback();
}
return true;
}
}
class UnhandledExceptionEventHandler
{
Func stage1Callback;
public UnhandledExceptionEventHandler()
{
}
Func Stage1Callback
{
get
{
if (this.stage1Callback == null)
{
this.stage1Callback = new Func(OnStage1Complete);
}
return this.stage1Callback;
}
}
public bool Run(WorkflowApplication instance, Exception exception, Activity exceptionSource, string exceptionSourceInstanceId)
{
IAsyncResult result = null;
if (instance.Controller.HasPendingTrackingRecords)
{
instance.EventData.NextCallback = this.Stage1Callback;
instance.EventData.UnhandledException = exception;
instance.EventData.UnhandledExceptionSource = exceptionSource;
instance.EventData.UnhandledExceptionSourceInstance = exceptionSourceInstanceId;
result = instance.Controller.BeginFlushTrackingRecords(ActivityDefaults.TrackingTimeout, EventFrameCallback, instance.EventData);
if (!result.CompletedSynchronously)
{
return false;
}
}
return OnStage1Complete(result, instance, exception, exceptionSource, exceptionSourceInstanceId);
}
bool OnStage1Complete(IAsyncResult lastResult, WorkflowApplication instance, bool isStillSync)
{
return OnStage1Complete(lastResult, instance, instance.EventData.UnhandledException, instance.EventData.UnhandledExceptionSource, instance.EventData.UnhandledExceptionSourceInstance);
}
bool OnStage1Complete(IAsyncResult lastResult, WorkflowApplication instance, Exception exception, Activity source, string sourceInstanceId)
{
if (lastResult != null)
{
instance.Controller.EndFlushTrackingRecords(lastResult);
}
Func handler = instance.OnUnhandledException;
UnhandledExceptionAction action = UnhandledExceptionAction.Terminate;
if (handler != null)
{
try
{
instance.isInHandler = true;
instance.handlerThreadId = Thread.CurrentThread.ManagedThreadId;
action = handler(new WorkflowApplicationUnhandledExceptionEventArgs(instance, exception, source, sourceInstanceId));
}
finally
{
instance.isInHandler = false;
}
}
if (instance.invokeCompletedCallback != null)
{
action = UnhandledExceptionAction.Terminate;
}
if (TD.WorkflowApplicationUnhandledExceptionIsEnabled())
{
TD.WorkflowApplicationUnhandledException(instance.Id.ToString(), source.GetType().ToString(), source.DisplayName, action.ToString(), exception);
}
switch (action)
{
case UnhandledExceptionAction.Abort:
instance.AbortInstance(exception, true);
break;
case UnhandledExceptionAction.Cancel:
instance.Controller.ScheduleCancel();
break;
case UnhandledExceptionAction.Terminate:
instance.TerminateCore(exception);
break;
default:
throw FxTrace.Exception.AsError(new InvalidOperationException(SR.InvalidUnhandledExceptionAction));
}
return true;
}
}
class InstanceOperation
{
AsyncWaitHandle waitHandle;
public InstanceOperation()
{
this.InterruptsScheduler = true;
}
public bool Notified
{
get;
set;
}
public int ActionId
{
get;
set;
}
public bool InterruptsScheduler
{
get;
protected set;
}
public void OnEnqueued()
{
this.waitHandle = new AsyncWaitHandle();
}
public virtual bool CanRun(WorkflowApplication instance)
{
return true;
}
public void NotifyTurn()
{
Fx.Assert(this.waitHandle != null, "We must have a wait handle.");
waitHandle.Set();
}
public bool WaitForTurn(TimeSpan timeout)
{
if (this.waitHandle != null)
{
return this.waitHandle.Wait(timeout);
}
return true;
}
public bool WaitForTurnAsync(TimeSpan timeout, Action callback, object state)
{
if (this.waitHandle != null)
{
return this.waitHandle.WaitAsync(callback, state, timeout);
}
return true;
}
}
class RequiresIdleOperation : InstanceOperation
{
bool requiresRunnableInstance;
public RequiresIdleOperation()
: this(false)
{
}
public RequiresIdleOperation(bool requiresRunnableInstance)
{
this.InterruptsScheduler = false;
this.requiresRunnableInstance = requiresRunnableInstance;
}
public override bool CanRun(WorkflowApplication instance)
{
if (requiresRunnableInstance && instance.state != WorkflowApplicationState.Runnable)
{
return false;
}
return instance.Controller.State == WorkflowInstanceState.Idle || instance.Controller.State == WorkflowInstanceState.Complete;
}
}
class DeferredRequiresIdleOperation : InstanceOperation
{
public DeferredRequiresIdleOperation()
{
this.InterruptsScheduler = false;
}
public override bool CanRun(WorkflowApplication instance)
{
return (this.ActionId != instance.actionCount && instance.Controller.State == WorkflowInstanceState.Idle) || instance.Controller.State == WorkflowInstanceState.Complete;
}
}
class RequiresPersistenceOperation : InstanceOperation
{
public override bool CanRun(WorkflowApplication instance)
{
if (!instance.Controller.IsPersistable && instance.Controller.State != WorkflowInstanceState.Complete)
{
instance.Controller.PauseWhenPersistable();
return false;
}
else
{
return true;
}
}
}
class WaitForTurnData
{
public WaitForTurnData(Action callback, object state, InstanceOperation operation, WorkflowApplication instance)
{
this.Callback = callback;
this.State = state;
this.Operation = operation;
this.Instance = instance;
}
public Action Callback
{
get;
private set;
}
public object State
{
get;
private set;
}
public InstanceOperation Operation
{
get;
private set;
}
public WorkflowApplication Instance
{
get;
private set;
}
}
class PersistenceManager
{
InstanceHandle handle;
InstanceHandle temporaryHandle;
InstanceOwner owner;
bool ownerWasCreated;
bool isLocked;
bool aborted;
bool isTryLoad;
Guid instanceId;
InstanceStore store;
IDictionary instanceMetadata;
public PersistenceManager(InstanceStore store, IDictionary instanceMetadata, Guid instanceId)
{
Fx.Assert(store != null, "We should never gets here without a store.");
this.instanceId = instanceId;
this.instanceMetadata = instanceMetadata;
InitializeInstanceMetadata();
this.owner = store.DefaultInstanceOwner;
if (this.owner != null)
{
this.handle = store.CreateInstanceHandle(this.owner, instanceId);
}
this.store = store;
}
public PersistenceManager(InstanceStore store, IDictionary instanceMetadata)
{
Fx.Assert(store != null, "We should never get here without a store.");
this.isTryLoad = true;
this.instanceMetadata = instanceMetadata;
InitializeInstanceMetadata();
this.owner = store.DefaultInstanceOwner;
if (this.owner != null)
{
this.handle = store.CreateInstanceHandle(this.owner);
}
this.store = store;
}
public Guid InstanceId
{
get
{
return this.instanceId;
}
}
public bool IsInitialized
{
get
{
return (this.handle != null);
}
}
public bool IsLocked
{
get
{
return this.isLocked;
}
}
public bool OwnerWasCreated
{
get
{
return this.ownerWasCreated;
}
}
void InitializeInstanceMetadata()
{
if (this.instanceMetadata == null)
{
this.instanceMetadata = new Dictionary(1);
}
// We always set this key explicitly so that users can't override
// this metadata value
this.instanceMetadata[PersistenceMetadataNamespace.InstanceType] = new InstanceValue(WorkflowNamespace.WorkflowHostType, InstanceValueOptions.WriteOnly);
}
public void Initialize(TimeSpan timeout)
{
Fx.Assert(this.handle == null, "We are already initialized by now");
using (new TransactionScope(TransactionScopeOption.Suppress))
{
try
{
CreateTemporaryHandle(null);
this.owner = this.store.Execute(this.temporaryHandle, new CreateWorkflowOwnerCommand(), timeout).InstanceOwner;
this.ownerWasCreated = true;
}
finally
{
FreeTemporaryHandle();
}
this.handle = this.isTryLoad ? this.store.CreateInstanceHandle(this.owner) : this.store.CreateInstanceHandle(this.owner, InstanceId);
Thread.MemoryBarrier();
if (this.aborted)
{
this.handle.Free();
}
}
}
void CreateTemporaryHandle(InstanceOwner owner)
{
this.temporaryHandle = this.store.CreateInstanceHandle(owner);
Thread.MemoryBarrier();
if (this.aborted)
{
FreeTemporaryHandle();
}
}
void FreeTemporaryHandle()
{
InstanceHandle handle = this.temporaryHandle;
if (handle != null)
{
handle.Free();
}
}
public IAsyncResult BeginInitialize(TimeSpan timeout, AsyncCallback callback, object state)
{
Fx.Assert(this.handle == null, "We are already initialized by now");
using (new TransactionScope(TransactionScopeOption.Suppress))
{
IAsyncResult result = null;
try
{
CreateTemporaryHandle(null);
result = this.store.BeginExecute(this.temporaryHandle, new CreateWorkflowOwnerCommand(), timeout, callback, state);
}
finally
{
// We've encountered an exception
if (result == null)
{
FreeTemporaryHandle();
}
}
return result;
}
}
public void EndInitialize(IAsyncResult result)
{
try
{
this.owner = this.store.EndExecute(result).InstanceOwner;
this.ownerWasCreated = true;
}
finally
{
FreeTemporaryHandle();
}
this.handle = this.isTryLoad ? this.store.CreateInstanceHandle(this.owner) : this.store.CreateInstanceHandle(this.owner, InstanceId);
Thread.MemoryBarrier();
if (this.aborted)
{
this.handle.Free();
}
}
public void DeleteOwner(TimeSpan timeout)
{
try
{
CreateTemporaryHandle(this.owner);
this.store.Execute(this.temporaryHandle, new DeleteWorkflowOwnerCommand(), timeout);
}
// Ignore some exceptions because DeleteWorkflowOwner is best effort.
catch (InstancePersistenceCommandException) {}
catch (InstanceOwnerException) {}
catch (OperationCanceledException) {}
finally
{
FreeTemporaryHandle();
}
}
public IAsyncResult BeginDeleteOwner(TimeSpan timeout, AsyncCallback callback, object state)
{
IAsyncResult result = null;
try
{
CreateTemporaryHandle(this.owner);
result = this.store.BeginExecute(this.temporaryHandle, new DeleteWorkflowOwnerCommand(), timeout, callback, state);
}
// Ignore some exceptions because DeleteWorkflowOwner is best effort.
catch (InstancePersistenceCommandException) { }
catch (InstanceOwnerException) { }
catch (OperationCanceledException) { }
finally
{
if (result == null)
{
FreeTemporaryHandle();
}
}
return result;
}
public void EndDeleteOwner(IAsyncResult result)
{
try
{
this.store.EndExecute(result);
}
// Ignore some exceptions because DeleteWorkflowOwner is best effort.
catch (InstancePersistenceCommandException) { }
catch (InstanceOwnerException) { }
catch (OperationCanceledException) { }
finally
{
FreeTemporaryHandle();
}
}
public void EnsureReadyness(TimeSpan timeout)
{
Fx.Assert(this.handle != null, "We should already be initialized by now");
Fx.Assert(!IsLocked, "We are already ready for persistence; why are we being called?");
Fx.Assert(!this.isTryLoad, "Should not be on an initial save path if we tried load.");
using (new TransactionScope(TransactionScopeOption.Suppress))
{
this.store.Execute(this.handle, CreateSaveCommand(null, this.instanceMetadata, PersistenceOperation.Save), timeout);
this.isLocked = true;
}
}
public IAsyncResult BeginEnsureReadyness(TimeSpan timeout, AsyncCallback callback, object state)
{
Fx.Assert(this.handle != null, "We should already be initialized by now");
Fx.Assert(!IsLocked, "We are already ready for persistence; why are we being called?");
Fx.Assert(!this.isTryLoad, "Should not be on an initial save path if we tried load.");
using (new TransactionScope(TransactionScopeOption.Suppress))
{
return this.store.BeginExecute(this.handle, CreateSaveCommand(null, this.instanceMetadata, PersistenceOperation.Save), timeout, callback, state);
}
}
public void EndEnsureReadyness(IAsyncResult result)
{
this.store.EndExecute(result);
this.isLocked = true;
}
public static Dictionary GenerateInitialData(WorkflowApplication instance)
{
Dictionary data = new Dictionary(10);
data[WorkflowNamespace.Bookmarks] = new InstanceValue(instance.Controller.GetBookmarks(), InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
data[WorkflowNamespace.LastUpdate] = new InstanceValue(DateTime.UtcNow, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
foreach (KeyValuePair mappedVariable in instance.Controller.GetMappedVariables())
{
data[WorkflowNamespace.VariablesPath.GetName(mappedVariable.Key)] = new InstanceValue(mappedVariable.Value, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
}
Fx.AssertAndThrow(instance.Controller.State != WorkflowInstanceState.Aborted, "Cannot generate data for an aborted instance.");
if (instance.Controller.State != WorkflowInstanceState.Complete)
{
data[WorkflowNamespace.Workflow] = new InstanceValue(instance.Controller.PrepareForSerialization());
data[WorkflowNamespace.Status] = new InstanceValue(instance.Controller.State == WorkflowInstanceState.Idle ? "Idle" : "Executing", InstanceValueOptions.WriteOnly);
}
else
{
data[WorkflowNamespace.Workflow] = new InstanceValue(instance.Controller.PrepareForSerialization(), InstanceValueOptions.Optional);
Exception completionException;
IDictionary outputs;
ActivityInstanceState completionState = instance.Controller.GetCompletionState(out outputs, out completionException);
if (completionState == ActivityInstanceState.Faulted)
{
data[WorkflowNamespace.Status] = new InstanceValue("Faulted", InstanceValueOptions.WriteOnly);
data[WorkflowNamespace.Exception] = new InstanceValue(completionException, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
}
else if (completionState == ActivityInstanceState.Closed)
{
data[WorkflowNamespace.Status] = new InstanceValue("Closed", InstanceValueOptions.WriteOnly);
if (outputs != null)
{
foreach (KeyValuePair output in outputs)
{
data[WorkflowNamespace.OutputPath.GetName(output.Key)] = new InstanceValue(output.Value, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
}
}
}
else
{
Fx.AssertAndThrow(completionState == ActivityInstanceState.Canceled, "Cannot be executing when WorkflowState was completed.");
data[WorkflowNamespace.Status] = new InstanceValue("Canceled", InstanceValueOptions.WriteOnly);
}
}
return data;
}
static SaveWorkflowCommand CreateSaveCommand(IDictionary instance, IDictionary instanceMetadata, PersistenceOperation operation)
{
SaveWorkflowCommand saveCommand = new SaveWorkflowCommand()
{
CompleteInstance = operation == PersistenceOperation.Complete,
UnlockInstance = operation != PersistenceOperation.Save,
};
if (instance != null)
{
foreach (KeyValuePair value in instance)
{
saveCommand.InstanceData.Add(value);
}
}
if (instanceMetadata != null)
{
foreach (KeyValuePair value in instanceMetadata)
{
saveCommand.InstanceMetadataChanges.Add(value);
}
}
return saveCommand;
}
bool TryLoadHelper(InstanceView view, out IDictionary data)
{
if (!view.IsBoundToLock)
{
data = null;
return false;
}
this.instanceId = view.InstanceId;
this.isLocked = true;
if (!this.handle.IsValid)
{
throw FxTrace.Exception.AsError(new OperationCanceledException(SR.WorkflowInstanceAborted(InstanceId)));
}
data = view.InstanceData;
return true;
}
public void Save(IDictionary instance, PersistenceOperation operation, TimeSpan timeout)
{
this.store.Execute(this.handle, CreateSaveCommand(instance, (this.isLocked ? null : this.instanceMetadata), operation), timeout);
this.isLocked = true;
}
public IDictionary Load(TimeSpan timeout)
{
InstanceView view = this.store.Execute(this.handle, new LoadWorkflowCommand(), timeout);
this.isLocked = true;
if (!this.handle.IsValid)
{
throw FxTrace.Exception.AsError(new OperationCanceledException(SR.WorkflowInstanceAborted(InstanceId)));
}
return view.InstanceData;
}
public bool TryLoad(TimeSpan timeout, out IDictionary data)
{
InstanceView view = this.store.Execute(this.handle, new TryLoadRunnableWorkflowCommand(), timeout);
return TryLoadHelper(view, out data);
}
public IAsyncResult BeginSave(IDictionary instance, PersistenceOperation operation, TimeSpan timeout, AsyncCallback callback, object state)
{
return this.store.BeginExecute(this.handle, CreateSaveCommand(instance, (this.isLocked ? null : this.instanceMetadata), operation), timeout, callback, state);
}
public void EndSave(IAsyncResult result)
{
this.store.EndExecute(result);
this.isLocked = true;
}
public IAsyncResult BeginLoad(TimeSpan timeout, AsyncCallback callback, object state)
{
return this.store.BeginExecute(this.handle, new LoadWorkflowCommand(), timeout, callback, state);
}
public IDictionary EndLoad(IAsyncResult result)
{
InstanceView view = this.store.EndExecute(result);
this.isLocked = true;
if (!this.handle.IsValid)
{
throw FxTrace.Exception.AsError(new OperationCanceledException(SR.WorkflowInstanceAborted(InstanceId)));
}
return view.InstanceData;
}
public IAsyncResult BeginTryLoad(TimeSpan timeout, AsyncCallback callback, object state)
{
return this.store.BeginExecute(this.handle, new TryLoadRunnableWorkflowCommand(), timeout, callback, state);
}
public bool EndTryLoad(IAsyncResult result, out IDictionary data)
{
InstanceView view = this.store.EndExecute(result);
return TryLoadHelper(view, out data);
}
public void Abort()
{
this.aborted = true;
// Make sure the setter of handle sees aborted, or v.v., or both.
Thread.MemoryBarrier();
InstanceHandle handle = this.handle;
if (handle != null)
{
handle.Free();
}
FreeTemporaryHandle();
}
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.