// Code:
// / 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Tracking / SqlTrackingService.cs / 1305376 / SqlTrackingService.cs
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Text;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Data.Common;
using System.Data;
using System.Data.SqlClient;
using System.Timers;
using System.Diagnostics;
using System.Reflection;
using System.Workflow.Runtime;
using System.Workflow.ComponentModel;
using System.Workflow.Runtime.Hosting;
using System.Text.RegularExpressions;
using System.Threading;
using System.Transactions;
using System.Globalization;
using System.Workflow.ComponentModel.Serialization;
using System.ComponentModel.Design.Serialization;
using System.Xml;
using System.Configuration;
namespace System.Workflow.Runtime.Tracking
{
public sealed class SqlTrackingService : TrackingService, IProfileNotification
{
#region Private/Protected Members
// When true, tracking data is held and written at persistence points (exposed as IsTransactional).
private bool _isTrans = true;
// When true, a completed instance's records are moved to partition tables (exposed as PartitionOnCompletion).
private bool _partition = false;
// When true, the default profile is requested for workflow types without their own profile (exposed as UseDefaultProfile).
private bool _defaultProfile = true;
// Retry switch handed to DbRetry by the Execute*Retried helpers.
private bool _enableRetries = false;
// Set once EnableRetries has been configured on this service, so Start() skips the common configuration section.
private bool _ignoreCommonEnableRetries = false;
// Value sent as @LastCheckDateTime to GetUpdatedTrackingProfiles; initialised to UtcNow in Start().
private DateTime _lastProfileCheck;
// Timer that raises CheckProfileChanges; Start() turns AutoReset off so it fires once per Start() call.
private System.Timers.Timer _timer = new System.Timers.Timer();
// Profile change check interval in milliseconds.
private double _interval = 60000;
//private static int _deadlock = 1205;
// NOTE(review): _types and _typeCacheLock are not referenced in the visible part of this file - confirm they are still needed.
private TypeKeyedCollection _types = new TypeKeyedCollection();
private object _typeCacheLock = new object();
// Resolved from the runtime in Start(); read by SqlTrackingChannel when committing a batch.
private WorkflowCommitWorkBatchService _transactionService;
// Created in Start() from the runtime, the saved parameters and the connection string.
private DbResourceAllocator _dbResourceAllocator;
// Sentinel version meaning "no specific profile version requested" (see GetProfileByScheduleType).
private static Version UnknownProfileVersionId = new Version( 0, 0 );
// Saved from constructor input to be used in service start initialization
private NameValueCollection _parameters;
// Connection string exactly as supplied by the caller; passed to DbResourceAllocator in Start().
string _unvalidatedConnectionString;
// Signature of the operations that ExecuteRetried runs under the retry policy.
private delegate void ExecuteRetriedDelegate(object param);
#endregion
#region Configuration Properties
/// <summary>
/// Gets the connection string exactly as it was supplied to the constructor (or read from the
/// "ConnectionString" configuration key). This getter performs no validation.
/// </summary>
public string ConnectionString
{
get { return _unvalidatedConnectionString; }
}
/// <summary>
/// Determines if tracking data should be held and transactionally written to the database at persistence points.
/// </summary>
/// <value>The backing field is initialised to true.</value>
public bool IsTransactional
{
get { return _isTrans; }
set
{
_isTrans = value;
}
}
/// <summary>
/// Indicates that records should be moved from the active instance tables to the appropriate partition tables when the instance completes.
/// </summary>
public bool PartitionOnCompletion
{
get { return _partition; }
set { _partition = value; }
}
/// <summary>
/// Determines if the default profile should be used for workflow types that do not have a profile specified for them.
/// </summary>
/// <value>The backing field is initialised to true.</value>
public bool UseDefaultProfile
{
get { return _defaultProfile; }
set { _defaultProfile = value; }
}
/// <summary>
/// The time interval, in milliseconds, at which to check the database for changes to profiles.
/// Default is 60000.
/// </summary>
/// <remarks>
/// Setting the interval results in the next check occurring the specified number of milliseconds
/// from the time at which the property is set.
/// </remarks>
/// <exception cref="ArgumentException">The assigned value is less than or equal to zero.</exception>
public double ProfileChangeCheckInterval
{
get { return _interval; }
set
{
if ( value <= 0 )
throw new ArgumentException( ExecutionStringManager.InvalidProfileCheckValue );
_interval = value;
//
// Set the timer's interval.
// This will reset the timer
_timer.Interval = _interval;
}
}
/// <summary>
/// Gets or sets whether database operations issued through the Execute*Retried helpers are retried.
/// </summary>
/// <remarks>
/// Setting this property also marks the value as locally configured, so Start() will not
/// overwrite it with an "EnableRetries" entry from the runtime's common parameters.
/// </remarks>
public bool EnableRetries
{
get { return _enableRetries; }
set
{
_enableRetries = value;
_ignoreCommonEnableRetries = true;
}
}
/// <summary>
/// Gets the allocator created in Start(); it is null until the service has been started.
/// </summary>
internal DbResourceAllocator DbResourceAllocator
{
get { return this._dbResourceAllocator; }
}
#endregion
#region Construction
/// <summary>
/// Creates a tracking service that uses the supplied connection string.
/// </summary>
/// <param name="connectionString">Connection string for the tracking database; stored as-is.</param>
/// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null or empty.</exception>
public SqlTrackingService(string connectionString)
{
    bool isMissing = (null == connectionString) || (0 == connectionString.Length);
    if (isMissing)
    {
        throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString);
    }

    _unvalidatedConnectionString = connectionString;
}
/// <summary>
/// Creates a tracking service from configuration name/value pairs.
/// </summary>
/// <remarks>
/// Recognised keys (matched case-insensitively): IsTransactional, UseDefaultProfile,
/// PartitionOnCompletion, ProfileChangeCheckInterval, ConnectionString and EnableRetries.
/// Unrecognised keys are ignored here; the whole collection is kept for use in Start().
/// </remarks>
/// <param name="parameters">Configuration values for the service.</param>
/// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
/// <exception cref="ArgumentException">ProfileChangeCheckInterval is less than or equal to zero.</exception>
public SqlTrackingService(NameValueCollection parameters)
{
    if (null == parameters)
    {
        throw new ArgumentNullException("parameters", ExecutionStringManager.MissingParameters);
    }

    // An empty collection simply produces no iterations.
    foreach (string name in parameters.Keys)
    {
        if (string.Equals(name, "IsTransactional", StringComparison.OrdinalIgnoreCase))
        {
            _isTrans = bool.Parse(parameters[name]);
        }
        else if (string.Equals(name, "UseDefaultProfile", StringComparison.OrdinalIgnoreCase))
        {
            _defaultProfile = bool.Parse(parameters[name]);
        }
        else if (string.Equals(name, "PartitionOnCompletion", StringComparison.OrdinalIgnoreCase))
        {
            _partition = bool.Parse(parameters[name]);
        }
        else if (string.Equals(name, "ProfileChangeCheckInterval", StringComparison.OrdinalIgnoreCase))
        {
            // Parsed with the invariant culture so configuration files are machine independent.
            _interval = double.Parse(parameters[name], NumberFormatInfo.InvariantInfo);
            if (_interval <= 0)
            {
                throw new ArgumentException(ExecutionStringManager.InvalidProfileCheckValue);
            }
        }
        else if (string.Equals(name, "ConnectionString", StringComparison.OrdinalIgnoreCase))
        {
            _unvalidatedConnectionString = parameters[name];
        }
        else if (string.Equals(name, "EnableRetries", StringComparison.OrdinalIgnoreCase))
        {
            // A local setting takes precedence over the common configuration section.
            _enableRetries = bool.Parse(parameters[name]);
            _ignoreCommonEnableRetries = true;
        }
    }

    _parameters = parameters;
}
#endregion
#region WorkflowRuntimeService
/// <summary>
/// Initialises database resources, resolves the commit work batch service, picks up the
/// EnableRetries setting from the common configuration when it was not set locally, and
/// starts the timer that polls for profile changes.
/// </summary>
override protected internal void Start()
{
    _lastProfileCheck = DateTime.UtcNow;
    _dbResourceAllocator = new DbResourceAllocator(this.Runtime, _parameters, _unvalidatedConnectionString);

    // Check connection string mismatch if using SharedConnectionWorkflowTransactionService
    _transactionService = this.Runtime.GetService<WorkflowCommitWorkBatchService>();
    _dbResourceAllocator.DetectSharedConnectionConflict(_transactionService);

    //
    // If we didn't find a local value for enable retries
    // check in the common section
    if ((!_ignoreCommonEnableRetries) && (null != base.Runtime))
    {
        NameValueConfigurationCollection commonConfigurationParameters = base.Runtime.CommonParameters;
        if (commonConfigurationParameters != null)
        {
            // Scan the common configuration parameters section for EnableRetries
            foreach (string key in commonConfigurationParameters.AllKeys)
            {
                if (string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase) == 0)
                {
                    _enableRetries = bool.Parse(commonConfigurationParameters[key].Value);
                    break;
                }
            }
        }
    }

    _timer.Interval = _interval;
    _timer.AutoReset = false; // ensure that only one timer thread is checking for profile changes at a time

    // Remove before adding so that starting the service more than once does not
    // accumulate duplicate handlers (each of which would query the database and restart the timer).
    ElapsedEventHandler onElapsed = new ElapsedEventHandler(CheckProfileChanges);
    _timer.Elapsed -= onElapsed;
    _timer.Elapsed += onElapsed;
    _timer.Start();

    base.Start();
}
#endregion WorkflowRuntimeService
#region IProfileNotification Implementation
/// <summary>
/// Creates the channel through which one workflow instance sends its tracking records.
/// </summary>
/// <param name="parameters">Describes the workflow instance the channel serves.</param>
/// <returns>A new SqlTrackingChannel bound to this service.</returns>
/// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
protected internal override TrackingChannel GetTrackingChannel( TrackingParameters parameters )
{
    if (parameters == null)
    {
        throw new ArgumentNullException("parameters");
    }

    // Every call gets its own channel; the channel keeps both the parameters and this service.
    TrackingChannel channel = new SqlTrackingChannel(parameters, this);
    return channel;
}
// Raised by CheckProfileChanges when the database returns a changed profile for a workflow type.
public event EventHandler ProfileUpdated;
// Raised by CheckProfileChanges when the database returns a row with no profile XML for a workflow type.
public event EventHandler ProfileRemoved;
/// <summary>
/// Loads a specific version of the tracking profile for a workflow type.
/// </summary>
/// <param name="workflowType">Workflow type whose profile is requested.</param>
/// <param name="profileVersion">Version of the profile to load.</param>
/// <returns>The profile, or null when the database returns none.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override TrackingProfile GetProfile(Type workflowType, Version profileVersion)
{
    if (workflowType == null)
    {
        throw new ArgumentNullException("workflowType");
    }

    // A specific version that an instance has already been running with is wanted here,
    // so never ask the database to create a default profile.
    const bool createDefault = false;
    return GetProfileByScheduleType(workflowType, profileVersion, createDefault);
}
/// <summary>
/// Attempts to load the tracking profile for a workflow type without requesting a specific
/// version, creating the default profile when UseDefaultProfile is enabled.
/// </summary>
/// <param name="workflowType">Workflow type whose profile is requested.</param>
/// <param name="profile">Receives the profile, or null when none was found.</param>
/// <returns>true when a profile was found; otherwise false.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override bool TryGetProfile(Type workflowType, out TrackingProfile profile)
{
    if (workflowType == null)
    {
        throw new ArgumentNullException("workflowType");
    }

    profile = GetProfileByScheduleType(workflowType, SqlTrackingService.UnknownProfileVersionId, _defaultProfile);
    return null != profile;
}
/// <summary>
/// Loads the tracking profile stored for a specific workflow instance.
/// </summary>
/// <param name="scheduleInstanceId">Identifier of the workflow instance.</param>
/// <returns>The instance's profile, or null when none is stored.</returns>
protected internal override TrackingProfile GetProfile(Guid scheduleInstanceId)
{
    TrackingProfile instanceProfile;
    // The found/not-found flag is irrelevant here; callers only receive the profile (possibly null).
    GetProfile(scheduleInstanceId, out instanceProfile);
    return instanceProfile;
}
/// <summary>
/// Runs [dbo].[GetInstanceTrackingProfile] for an instance and deserializes the returned profile.
/// </summary>
/// <param name="scheduleInstanceId">Identifier of the workflow instance.</param>
/// <param name="profile">Receives the profile; null when no row exists or the stored value is NULL.</param>
/// <returns>true when a row was read (even if its profile column is NULL); false when no row exists.</returns>
private bool GetProfile( Guid scheduleInstanceId, out TrackingProfile profile )
{
    profile = null;

    DbCommand command = this._dbResourceAllocator.NewCommand();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[GetInstanceTrackingProfile]";
    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@InstanceId", scheduleInstanceId));

    DbDataReader rows = null;
    try
    {
        rows = ExecuteReaderRetried(command, CommandBehavior.CloseConnection);

        // No row at all means there is no specific profile for this instance.
        // The reader is closed by the finally block below.
        if (!rows.HasRows || !rows.Read())
        {
            return false;
        }

        if (!rows.IsDBNull(0))
        {
            string profileXml = rows.GetString(0);
            TrackingProfileSerializer serializer = new TrackingProfileSerializer();
            using (StringReader xmlReader = new StringReader(profileXml))
            {
                profile = serializer.Deserialize(xmlReader);
            }
        }

        return true;
    }
    finally
    {
        if ((null != rows) && (!rows.IsClosed))
        {
            rows.Close();
        }

        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
        {
            command.Connection.Close();
        }
    }
}
/// <summary>
/// Attempts to reload the profile stored for a specific workflow instance.
/// </summary>
/// <param name="workflowType">Workflow type of the instance; only checked for null.</param>
/// <param name="scheduleInstanceId">Identifier of the workflow instance.</param>
/// <param name="profile">Receives the stored profile; null when nothing was found.</param>
/// <returns>true when a row exists for the instance; otherwise false.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override bool TryReloadProfile(Type workflowType, Guid scheduleInstanceId, out TrackingProfile profile)
{
    if (workflowType == null)
    {
        throw new ArgumentNullException("workflowType");
    }

    if (!GetProfile(scheduleInstanceId, out profile))
    {
        profile = null;
        return false;
    }

    return true;
}
#endregion
#region Profile Management Methods
/// <summary>
/// Timer callback that asks the database for profiles changed since the last check and raises
/// ProfileUpdated / ProfileRemoved for each returned workflow type.
/// </summary>
/// <remarks>
/// The timer runs with AutoReset off, so this method must always restart it. The restart sits in
/// its own outer finally so that a failure while cleaning up (closing the reader or connection,
/// or reading the output parameter) can no longer stop profile polling permanently.
/// </remarks>
/// <param name="sender">The timer raising the event (unused).</param>
/// <param name="e">Elapsed event data (unused).</param>
private void CheckProfileChanges( object sender, ElapsedEventArgs e )
{
    DbCommand cmd = null;
    DbDataReader reader = null;
    try
    {
        try
        {
            if ( ( null == ProfileUpdated ) && ( null == ProfileRemoved ) )
                return; // no one to notify

            Debug.WriteLine( "Checking for updated profiles..." );

            cmd = this._dbResourceAllocator.NewCommand();
            cmd.CommandText = "GetUpdatedTrackingProfiles";
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@LastCheckDateTime", _lastProfileCheck ) );

            DbParameter param = this._dbResourceAllocator.NewDbParameter();
            param.ParameterName = "@MaxCheckDateTime";
            param.DbType = DbType.DateTime;
            param.Direction = System.Data.ParameterDirection.Output;
            cmd.Parameters.Add( param );

            reader = ExecuteReaderRetried( cmd, CommandBehavior.CloseConnection );
            //
            // No changes
            if ( !reader.HasRows )
                return;

            while ( reader.Read() )
            {
                // Columns: 0 = type full name, 1 = assembly full name, 2 = profile xml (null when removed)
                Type t = Assembly.Load( reader[1] as string ).GetType( reader[0] as string );
                if ( null == t )
                    continue;

                string tmp = reader[2] as string;
                if ( null == tmp )
                {
                    if ( null != ProfileRemoved )
                        ProfileRemoved( this, new ProfileRemovedEventArgs( t ) );
                }
                else
                {
                    TrackingProfile profile = null;
                    TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                    StringReader pReader = null;
                    try
                    {
                        pReader = new StringReader( tmp );
                        profile = serializer.Deserialize( pReader );
                    }
                    finally
                    {
                        if ( null != pReader )
                            pReader.Close();
                    }

                    if ( null != ProfileUpdated )
                        ProfileUpdated( this, new ProfileUpdatedEventArgs( t, profile ) );
                }
                Debug.WriteLine( ExecutionStringManager.UpdatedProfile + t.FullName );
            }
        }
        finally
        {
            // Output parameters are only populated once the reader has been closed.
            if ( ( null != reader ) && ( !reader.IsClosed ) )
                reader.Close();

            if ( null != cmd )
            {
                //
                // Only advance the checkpoint when the procedure actually returned a date.
                // The parameter may be missing (failure before it was added), null or DBNull
                // (procedure failed); in all of those cases keep the previous value so the
                // next check covers the same window again.
                if ( cmd.Parameters.Count > 1 )
                {
                    object maxCheck = cmd.Parameters[1].Value;
                    if ( maxCheck is DateTime )
                        _lastProfileCheck = ( DateTime ) maxCheck;
                }

                if ( ( null != cmd.Connection ) && ( ConnectionState.Closed != cmd.Connection.State ) )
                    cmd.Connection.Close();
            }
        }
    }
    finally
    {
        //
        // Start the timer again (autoreset is false to avoid multiple threads checking for profile changes)
        _timer.Start();
    }
}
#endregion
#region Private Methods
/// <summary>
/// Invokes a delegate, re-invoking it after a failure for as long as DbRetry allows another attempt.
/// </summary>
/// <param name="executeRetried">Operation to run.</param>
/// <param name="param">Argument handed to the operation on every attempt.</param>
private void ExecuteRetried(ExecuteRetriedDelegate executeRetried, object param)
{
    DbRetry retryPolicy = new DbRetry(_enableRetries);
    short attempts = 0;

    for (;;)
    {
        try
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            executeRetried(param);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            return;
        }
        catch (Exception failure)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteRetried caught exception: " + failure.ToString());

            // Out of retries (or retries disabled): surface the original exception.
            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " retrying.");
        }
    }
}
/// <summary>
/// Executes a command as a reader, retrying after a failure for as long as DbRetry allows.
/// The command's connection is re-established before each attempt if it is no longer open.
/// </summary>
/// <param name="command">Command to execute.</param>
/// <param name="behavior">Behavior passed to ExecuteReader.</param>
/// <returns>The open data reader.</returns>
private DbDataReader ExecuteReaderRetried(DbCommand command, CommandBehavior behavior)
{
    DbRetry retryPolicy = new DbRetry(_enableRetries);
    short attempts = 0;

    for (;;)
    {
        try
        {
            ResetConnectionForCommand(command);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            DbDataReader result = command.ExecuteReader(behavior);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            return result;
        }
        catch (Exception failure)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteReaderRetried caught exception from ExecuteReader: " + failure.ToString());

            // Out of retries (or retries disabled): surface the original exception.
            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried retrying.");
        }
    }
}
/// <summary>
/// Executes a non-query command, retrying after a failure for as long as DbRetry allows.
/// The command's connection is re-established before each attempt if it is no longer open.
/// </summary>
/// <param name="command">Command to execute.</param>
private void ExecuteNonQueryRetried(DbCommand command)
{
    DbRetry retryPolicy = new DbRetry(_enableRetries);
    short attempts = 0;

    for (;;)
    {
        try
        {
            ResetConnectionForCommand(command);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            command.ExecuteNonQuery();
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            return;
        }
        catch (Exception failure)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryRetried caught exception from ExecuteNonQuery: " + failure.ToString());

            // Out of retries (or retries disabled): surface the original exception.
            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried retrying.");
        }
    }
}
/// <summary>
/// Executes a non-query command inside its own local transaction, retrying the whole
/// begin/execute/commit sequence after a failure for as long as DbRetry allows.
/// The command's connection is closed when the method exits, whether it succeeded or not.
/// </summary>
/// <param name="command">Command to execute; its Transaction property is overwritten on each attempt.</param>
private void ExecuteNonQueryWithTxRetried(DbCommand command)
{
try
{
short count = 0;
DbRetry dbRetry = new DbRetry(_enableRetries);
while (true)
{
try
{
// Re-establish the connection if a previous attempt left it in a non-open state.
ResetConnectionForCommand(command);
WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
// A fresh transaction is started for every attempt.
command.Transaction = command.Connection.BeginTransaction();
command.ExecuteNonQuery();
command.Transaction.Commit();
WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
break;
}
catch(Exception e)
{
WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried caught exception from ExecuteNonQuery: " + e.ToString());
try
{
if (null != command.Transaction)
command.Transaction.Rollback();
}
catch
{
//
// Rollback() can throw; nothing to do but swallow the failure if this happens
// so that we don't lose the original exception
}
if (dbRetry.TryDoRetry(ref count))
{
WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried retrying.");
continue;
}
// No retry left: rethrow the original failure.
throw;
}
}
}
finally
{
if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
command.Connection.Close();
}
}
/// <summary>
/// Replaces a command's connection with a newly opened, non-enlisted one when the current
/// connection exists but is not in the Open state. Does nothing for a null command, a command
/// without a connection, or a command whose connection is already open.
/// </summary>
/// <param name="command">Command whose connection is checked.</param>
private void ResetConnectionForCommand(DbCommand command)
{
    if ((null == command) || (null == command.Connection))
    {
        return;
    }

    if (ConnectionState.Open == command.Connection.State)
    {
        return;
    }

    // The connection is in some other state (closed, broken, ...): close it if needed, then discard it.
    if (ConnectionState.Closed != command.Connection.State)
    {
        command.Connection.Close();
    }

    command.Connection.Dispose();
    command.Connection = _dbResourceAllocator.OpenNewConnectionNoEnlist();
}
/// <summary>
/// Creates an XmlWriter over a text writer that indents with tab characters, omits the XML
/// declaration and closes the underlying writer when the XmlWriter is closed.
/// </summary>
/// <param name="output">Destination for the XML text.</param>
/// <returns>The configured XmlWriter.</returns>
internal static XmlWriter CreateXmlWriter(TextWriter output)
{
    XmlWriterSettings writerSettings = new XmlWriterSettings();
    writerSettings.OmitXmlDeclaration = true;
    writerSettings.CloseOutput = true;
    writerSettings.IndentChars = "\t";
    writerSettings.Indent = true;

    return XmlWriter.Create(output, writerSettings);
}
/// <summary>
/// Runs dbo.GetTrackingProfile for a workflow type and deserializes the returned profile.
/// </summary>
/// <param name="workflowType">Workflow type whose profile is requested.</param>
/// <param name="profileVersionId">
/// Version to load; when equal to UnknownProfileVersionId no @Version parameter is sent.
/// NOTE(review): a null version is not guarded against here - confirm callers never pass null.
/// </param>
/// <param name="wantToCreateDefault">Value sent as @CreateDefault.</param>
/// <returns>The profile, or null when no row or no profile text was returned.</returns>
private TrackingProfile GetProfileByScheduleType(Type workflowType, Version profileVersionId, bool wantToCreateDefault)
{
    DbCommand command = this._dbResourceAllocator.NewCommand();
    DbDataReader rows = null;
    TrackingProfile result = null;

    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "dbo.GetTrackingProfile";
    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@TypeFullName", workflowType.FullName));
    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@AssemblyFullName", workflowType.Assembly.FullName));

    bool specificVersionRequested = (profileVersionId != SqlTrackingService.UnknownProfileVersionId);
    if (specificVersionRequested)
    {
        command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@Version", profileVersionId.ToString()));
    }

    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@CreateDefault", wantToCreateDefault));

    try
    {
        rows = ExecuteReaderRetried(command, CommandBehavior.CloseConnection);
        if (rows.Read())
        {
            string profileXml = rows[0] as string;
            if (profileXml != null)
            {
                TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                using (StringReader xmlReader = new StringReader(profileXml))
                {
                    result = serializer.Deserialize(xmlReader);
                }
            }
        }
    }
    finally
    {
        if ((null != rows) && (!rows.IsClosed))
        {
            rows.Close();
        }

        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
        {
            command.Connection.Close();
        }
    }

    return result;
}
#endregion
#region Private Classes
/// <summary>
/// Collection of types keyed by their assembly-qualified name.
/// </summary>
private class TypeKeyedCollection : KeyedCollection<string, Type>
{
    /// <summary>Returns the assembly-qualified name of the type, which serves as its key.</summary>
    protected override string GetKeyForItem( Type item )
    {
        return item.AssemblyQualifiedName;
    }
}
// Tracking data item carrying additional serialized forms of its value.
// NOTE(review): the exact meaning of each field is set by serialization code outside the visible
// part of this file - the descriptions below are inferred from the field names; confirm.
private class SerializedDataItem : TrackingDataItem
{
// Presumably the runtime type of the data value.
public Type Type;
// Presumably a string representation of the data value.
public string StringData;
// Presumably the serialized bytes of the data value.
public byte[] SerializedData;
// Presumably true when the value could not be serialized.
public bool NonSerializable;
}
// Event arguments held in serialized form.
// NOTE(review): populated by code outside the visible part of this file; field meanings are inferred from names.
private class SerializedEventArgs : EventArgs
{
// Presumably the runtime type of the original event args.
public Type Type;
// Presumably the serialized bytes of the original event args.
public byte[] SerializedArgs;
}
// Plain data record describing one activity added by a workflow change.
// NOTE(review): populated and consumed outside the visible part of this file; field meanings are inferred from names.
private struct AddedActivity
{
public string ActivityTypeFullName;
public string ActivityTypeAssemblyFullName;
public string QualifiedName;
public string ParentQualifiedName;
// Presumably the XOML text of the add action.
public string AddedActivityActionXoml;
// Presumably the position of this action within the change.
public int Order;
}
// Plain data record describing one activity removed by a workflow change.
// NOTE(review): populated and consumed outside the visible part of this file; field meanings are inferred from names.
private struct RemovedActivity
{
public string QualifiedName;
public string ParentQualifiedName;
// Presumably the XOML text of the remove action.
public string RemovedActivityActionXoml;
// Presumably the position of this action within the change.
public int Order;
}
/// <summary>
/// Serialized event arguments for a workflow change, extended with the lists of activities
/// that the change added and removed. Both lists start out empty.
/// </summary>
private class SerializedWorkflowChangedEventArgs : SerializedEventArgs
{
    /// <summary>Activities added by the change.</summary>
    public IList<AddedActivity> AddedActivities = new List<AddedActivity>();

    /// <summary>Activities removed by the change.</summary>
    public IList<RemovedActivity> RemovedActivities = new List<RemovedActivity>();
}
#endregion Private Classes
internal class SqlTrackingChannel : TrackingChannel, IPendingWork
{
#region Private Members
// Owning service; supplies the retry helpers, the resource allocator and configuration flags.
private SqlTrackingService _service = null;
// NOTE(review): presumably populated by GetCallPathKeys (not visible here); _callPathKey is sent as @CallPath.
private string _callPathKey = null, _parentCallPathKey = null;
// Copied from the service's IsTransactional at construction.
private bool _isTrans = false;
// Database internal id of the workflow instance; -1 until known.
private long _internalId = -1;
// Internal id obtained while a batch is being committed; promoted to _internalId in Complete on success.
private long _tmpInternalId = -1;
// Activity instance ids confirmed by successfully completed batches.
private Dictionary _activityInstanceId = new Dictionary(32);
// Activity instance ids gathered during the current batch; merged or discarded in Complete.
private Dictionary _tmpActivityInstanceId = new Dictionary(10);
private TrackingParameters _parameters = null;
// When set, the instance is partitioned after the next successful batch (see Complete).
private bool _pendingArchive = false;
// When set, the next batch commit also stamps the instance end date (see Commit).
private bool _completedTerminated = false;
// Number of activity records accumulated in Commit before they are sent to the database.
private static int _activityEventBatchSize = 5;
// NOTE(review): the three batch sizes below are not used in the visible part of this file.
private static int _dataItemBatchSize = 5;
private static int _dataItemAnnotationBatchSize = 5;
private static int _eventAnnotationBatchSize = 5;
#endregion
#region Construction
// Parameterless constructor; leaves every field at its declared default (no service, no parameters).
protected SqlTrackingChannel()
{
}
/// <summary>
/// Creates a channel for one workflow instance. In non-transactional mode the instance record
/// is looked up or inserted immediately; in transactional mode that is deferred to the first
/// batch commit.
/// </summary>
/// <param name="parameters">Describes the workflow instance being tracked.</param>
/// <param name="service">Owning service; when null the channel is left uninitialised.</param>
public SqlTrackingChannel( TrackingParameters parameters, SqlTrackingService service )
{
    if (service == null)
    {
        return;
    }

    _service = service;
    _parameters = parameters;
    _isTrans = service.IsTransactional;
    GetCallPathKeys(parameters.CallPath);

    if (_isTrans)
    {
        // Transactional channels do this work in the first IPendingWork.Commit().
        return;
    }

    // Look up the instance id, or insert the instance if it is new.
    _service.ExecuteRetried(ExecuteInsertWorkflowInstance, null);
}
#endregion
#region Public Properties
// Shortcut to the owning service's resource allocator.
private DbResourceAllocator DbResourceAllocator
{
get { return _service.DbResourceAllocator; }
}
// Shortcut to the commit work batch service that the owning service resolved in Start().
private WorkflowCommitWorkBatchService WorkflowCommitWorkBatchService
{
get { return _service._transactionService; }
}
#endregion
#region TrackingChannel
/// <summary>
/// Called when the workflow instance has completed or terminated. Non-transactional channels
/// stamp the end date (and partition, if configured) right away; transactional channels only
/// set flags that the next batch commit / completion acts upon.
/// </summary>
protected internal override void InstanceCompletedOrTerminated()
{
    if (!_isTrans)
    {
        _service.ExecuteRetried(ExecuteSetEndDate, null);
        if (_service.PartitionOnCompletion)
        {
            _service.ExecuteRetried(PartitionInstance, null);
        }
        return;
    }

    // The next batch commit stamps the end date.
    _completedTerminated = true;

    // Once that batch completes successfully the instance is partitioned.
    if (_service.PartitionOnCompletion)
    {
        _pendingArchive = true;
    }
}
/// <summary>
/// Runs [dbo].[PartitionWorkflowInstance] for this channel's instance on a new connection.
/// </summary>
/// <param name="param">Unused; present to match ExecuteRetriedDelegate.</param>
private void PartitionInstance(object param)
{
    DbConnection connection = null;
    DbCommand command = null;
    try
    {
        //
        // Allow enlisting if there is an ambient tx
        // This can only happen on a host initiated terminate in V1.
        connection = DbResourceAllocator.OpenNewConnection(false);
        command = DbResourceAllocator.NewCommand(connection);
        command.CommandText = "[dbo].[PartitionWorkflowInstance]";
        command.CommandType = CommandType.StoredProcedure;
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", _internalId));
        command.ExecuteNonQuery();
    }
    finally
    {
        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
            command.Connection.Close();

        // Also covers the case where the connection was opened but creating the command failed,
        // which previously left the connection open.
        if ((null != connection) && (ConnectionState.Closed != connection.State))
            connection.Close();
    }
}
/// <summary>
/// Stamps the end date of this channel's instance using a new connection.
/// </summary>
/// <param name="param">Unused; present to match ExecuteRetriedDelegate.</param>
private void ExecuteSetEndDate(object param)
{
    DbConnection connection = null;
    DbCommand command = null;
    try
    {
        //
        // Allow enlisting if there is an ambient tx
        // This can only happen on a host initiated terminate in V1.
        connection = DbResourceAllocator.OpenNewConnection(false);
        command = DbResourceAllocator.NewCommand(connection);
        ExecuteSetEndDate(_internalId, command);
    }
    finally
    {
        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
            command.Connection.Close();

        // Also covers the case where the connection was opened but creating the command failed,
        // which previously left the connection open.
        if ((null != connection) && (ConnectionState.Closed != connection.State))
            connection.Close();
    }
}
/// <summary>
/// Configures the supplied command to call [dbo].[SetWorkflowInstanceEndDateTime] with the
/// current UTC time and executes it. Any parameters already on the command are discarded.
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance.</param>
/// <param name="command">Command to reuse; its connection and transaction are left untouched.</param>
/// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
private void ExecuteSetEndDate(long internalId, DbCommand command)
{
    if (command == null)
    {
        throw new ArgumentNullException("command");
    }

    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[SetWorkflowInstanceEndDateTime]";

    DbParameterCollection arguments = command.Parameters;
    arguments.Clear();
    arguments.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    arguments.Add(DbResourceAllocator.NewDbParameter("@EndDateTime", DateTime.UtcNow));

    command.ExecuteNonQuery();
}
/// <summary>
/// Accepts a tracking record. Transactional channels add the serialized record to the current
/// work batch; non-transactional channels write it to the database immediately (with retries).
/// Records that are not activity, workflow or user tracking records are ignored.
/// </summary>
/// <param name="record">Record to track.</param>
/// <exception cref="ArgumentException">The record is null or the instance id is empty.</exception>
protected internal override void Send(TrackingRecord record)
{
    if ( ( Guid.Empty == _parameters.InstanceId ) || ( null == record ) )
    {
        throw new ArgumentException( ExecutionStringManager.MissingParametersTrack );
    }

    ActivityTrackingRecord activityRecord = record as ActivityTrackingRecord;
    if (activityRecord != null)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(activityRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertActivityStatusInstance, SerializeRecord(activityRecord));
        }
        return;
    }

    WorkflowTrackingRecord workflowRecord = record as WorkflowTrackingRecord;
    if (workflowRecord != null)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(workflowRecord));
        }
        else if (TrackingWorkflowEvent.Changed == workflowRecord.TrackingWorkflowEvent)
        {
            // Dynamic updates go to the WorkflowInstanceEvent table, and the change details are
            // normalized into xoml and the added/removed activities tables.
            _service.ExecuteRetried(ExecuteInsertWorkflowChange, SerializeRecord(workflowRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertWorkflowInstanceEvent, SerializeRecord(workflowRecord));
        }
        return;
    }

    UserTrackingRecord userRecord = record as UserTrackingRecord;
    if (userRecord != null)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(userRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertUserEvent, SerializeRecord(userRecord));
        }
    }
}
#endregion
#region IPendingWork Members
/// <summary>
/// Reports whether the pending items require an immediate commit. This implementation always
/// answers false, so tracking never forces a persistence point.
/// </summary>
/// <param name="items">Pending work items (not inspected).</param>
/// <returns>Always false.</returns>
public bool MustCommit( ICollection items )
{
//
// Never force a persist - this is a balancing act but the V1
// decision is to err on the side of persisting only when the workflow
// requires it based on its model. If the workflow uses persistence points
// wisely this is great. If it goes a long time between persists with lots
// of events the persists will take a long time as the batch can be huge.
return false;
}
/// <summary>
/// Writes a batch of tracking records to the database as part of a work batch commit.
/// </summary>
/// <remarks>
/// Records are processed in order. Activity records are accumulated and flushed in groups of
/// _activityEventBatchSize; a non-"Changed" workflow record is held back so that it can be sent
/// together with the next non-"Changed" workflow record; any held records are flushed before a
/// record of a different kind is written, so the original ordering is preserved.
/// When no local transaction is supplied by the allocator, one is begun here and committed at
/// the end (or rolled back on a DbException).
/// NOTE(review): only DbException triggers the rollback; other exception types skip it - confirm this is intended.
/// </remarks>
/// <param name="transaction">Ambient transaction for the batch.</param>
/// <param name="items">Pending items; entries that are not TrackingRecord instances are skipped.</param>
public void Commit(System.Transactions.Transaction transaction, ICollection items)
{
if ((null == items) || (0 == items.Count))
return;
DbCommand command = null;
DbConnection connection = null;
bool needToCloseConnection = false;
DbTransaction localTransaction = null;
// True only when this method began the transaction itself and therefore owns commit/rollback.
bool commitTx = false;
try
{
//
// Get the connection and transaction
// The connection might be shared or local
// The tx is shared and may be either a DTC or a local sql tx
connection = DbResourceAllocator.GetEnlistedConnection(
this.WorkflowCommitWorkBatchService, transaction, out needToCloseConnection);
localTransaction = DbResourceAllocator.GetLocalTransaction(
this.WorkflowCommitWorkBatchService, transaction);
if (null == localTransaction)
{
localTransaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
commitTx = true;
}
command = DbResourceAllocator.NewCommand(connection);
command.Transaction = localTransaction;
//
// If we don't have the internal id for the instance this is the first batch
// for this channel instance. If this is a new instance the following will insert
// a new instance record in the db and set _tmpInternalId. If this is a reload of
// an existing instance it will just do a lookup and set _tmpInternalId
// In Completed we will assign _tmpInternalId to _internalId if the batch is successful.
long internalId = -1;
if (_internalId <= 0)
{
ExecuteInsertWorkflowInstance(command);
internalId = _tmpInternalId;
}
else
internalId = _internalId;
// Activity records waiting to be flushed as a group.
IList activities = new List(5);
// A held-back workflow record waiting for a possible partner record.
WorkflowTrackingRecord workflow = null;
//
// Build the batch statement
foreach (object o in items)
{
if (!(o is TrackingRecord))
continue;
if (o is ActivityTrackingRecord)
{
//
// If we have a cached workflow tracking record send it
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ActivityTrackingRecord activity = (ActivityTrackingRecord)o;
//
// Add this event to the list and send to the db if we've hit our limit
activities.Add(activity);
if (_activityEventBatchSize == activities.Count)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities = new List(5);
}
}
else if (o is UserTrackingRecord)
{
//
// If we have cached activity or workflow tracking records send them
if (activities.Count > 0)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities.Clear();
}
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ExecuteInsertUserEvent(internalId, (UserTrackingRecord)o, command);
}
else if (o is WorkflowTrackingRecord)
{
//
// If we have cached activity tracking records send them
if (activities.Count > 0)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities.Clear();
}
WorkflowTrackingRecord record = (WorkflowTrackingRecord)o;
if (TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent)
{
//
// A change record is never paired: flush any held workflow record on its own,
// then write the change immediately
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ExecuteInsertWorkflowChange(internalId, record, command);
}
else
{
//
// If we're already holding a workflow tracking record send both to the db
// else cache it and wait for the next workflow tracking record
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, record, command);
workflow = null;
}
else
{
workflow = record;
}
}
}
}
//
// If we ended up with any activities event send them.
if (activities.Count > 0)
ExecuteInsertActivityStatusInstance(internalId, activities, command);
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
// Set by InstanceCompletedOrTerminated: stamp the end date in the same transaction.
if (_completedTerminated)
ExecuteSetEndDate(internalId, command);
if (commitTx)
localTransaction.Commit();
}
catch (DbException e)
{
if (commitTx)
localTransaction.Rollback();
WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Error writing tracking data to database: " + e);
throw;
}
finally
{
// Only dispose connections the allocator reported as ours to close.
if (needToCloseConnection)
{
connection.Dispose();
}
}
return;
}
/// <summary>
/// Called after a batch commit has finished. On failure all per-batch state is discarded; on
/// success the temporary ids collected during the batch are promoted and, if requested, the
/// instance is partitioned.
/// </summary>
/// <param name="succeeded">Whether the batch commit succeeded.</param>
/// <param name="items">Items that were part of the batch (not inspected).</param>
public void Complete(bool succeeded, ICollection items)
{
    if (!succeeded)
    {
        // The commit failed: forget everything that was gathered for this batch.
        _completedTerminated = false;
        _pendingArchive = false;
        _tmpInternalId = -1;
        _tmpActivityInstanceId.Clear();
        return;
    }

    // Promote the temporary internal id now that the batch is durable.
    bool haveNewInternalId = (-1 == _internalId) && (_tmpInternalId > 0);
    if (haveNewInternalId)
    {
        _internalId = _tmpInternalId;
    }

    // Promote the temporary activity instance ids; ids already known are left untouched.
    if ((null != _tmpActivityInstanceId) && (_tmpActivityInstanceId.Count > 0))
    {
        foreach (string key in _tmpActivityInstanceId.Keys)
        {
            if (_activityInstanceId.ContainsKey(key))
            {
                continue;
            }
            _activityInstanceId.Add(key, _tmpActivityInstanceId[key]);
        }
        _tmpActivityInstanceId.Clear();
    }

    if (!_pendingArchive)
    {
        return;
    }

    try
    {
        _service.ExecuteRetried(PartitionInstance, null);
    }
    catch (Exception e)
    {
        //
        // Swallow exceptions here, do not fail the instance.
        // Partition logic can be re-run to clean up on failure
        WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, string.Format(System.Globalization.CultureInfo.InvariantCulture, "Error partitioning instance {0}: {1}", _parameters.InstanceId, e.ToString()));
    }
}
#endregion
#region Sql Commands - InsertWorkflowInstance
/// <summary>
/// Looks up or inserts the workflow instance record on a new connection inside its own
/// ReadCommitted transaction, and stores the resulting internal id once the transaction commits.
/// </summary>
/// <param name="param">Unused; present to match ExecuteRetriedDelegate.</param>
private void ExecuteInsertWorkflowInstance( object param )
{
    DbConnection conn = null;
    DbTransaction tx = null;
    try
    {
        // Everything that can fail after the connection is opened happens inside the try,
        // so the finally block always gets the chance to close the connection.
        conn = DbResourceAllocator.OpenNewConnection();
        DbCommand command = DbResourceAllocator.NewCommand(conn);
        tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
        command.Connection = conn;
        command.Transaction = tx;

        long insertedId = ExecuteInsertWorkflowInstance( command );
        tx.Commit();

        // Publish the id only after the commit, so a failed commit cannot leave behind an id
        // that refers to a rolled-back row.
        _internalId = insertedId;
    }
    catch (Exception)
    {
        try
        {
            if ( null != tx )
                tx.Rollback();
        }
        catch (Exception)
        {
            //
            // Rollback can throw - ignore these exceptions
            // so we don't lose the original exception
        }
        //
        // Re-throw original exception
        throw;
    }
    finally
    {
        if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) )
            conn.Close();
    }
}
/// <summary>
/// Writes the workflow definition and the instance record using the supplied command, and
/// records the returned internal id in _tmpInternalId.
/// </summary>
/// <param name="command">Command with an open connection.</param>
/// <returns>The value of _tmpInternalId after the call (unchanged if no row was returned).</returns>
/// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
/// <exception cref="ArgumentException">The command has no connection or the connection is not open.</exception>
private long ExecuteInsertWorkflowInstance( DbCommand command )
{
    if (command == null)
    {
        throw new ArgumentNullException("command");
    }

    bool connectionUsable = (null != command.Connection) && (ConnectionState.Open == command.Connection.State);
    if (!connectionUsable)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidCommandBadConnection, "command");
    }

    //
    // Write the type and the workflow definition.
    // Instances created from XAML markup are recorded without a workflow type.
    string markup = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string;
    if (string.IsNullOrEmpty(markup))
    {
        InsertWorkflow(command, _parameters.InstanceId, _parameters.WorkflowType, _parameters.RootActivity);
    }
    else
    {
        InsertWorkflow(command, _parameters.InstanceId, null, _parameters.RootActivity);
    }

    //
    // Write the instance record
    BuildInsertWorkflowInstanceParameters(command);

    using (DbDataReader idReader = command.ExecuteReader())
    {
        if (idReader.Read())
        {
            _tmpInternalId = idReader.GetInt64(0);
        }
        return _tmpInternalId;
    }
}
/// <summary>
/// Configures the command to call [dbo].[InsertWorkflowInstance] and fills in its parameters
/// from the channel's tracking parameters.
/// </summary>
/// <param name="command">Command to configure; any existing parameters are cleared.</param>
private void BuildInsertWorkflowInstanceParameters( DbCommand command )
{
    Debug.Assert((command != null), "Null command");

    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertWorkflowInstance]";

    DbParameterCollection parameters = command.Parameters;
    parameters.Clear();

    //
    // Instances whose root activity carries XAML markup are identified by
    // their instance id instead of a type/assembly name pair.
    string markup = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string;
    bool isXamlInstance = !string.IsNullOrEmpty(markup);

    parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceId", _parameters.InstanceId ) );

    string typeName = isXamlInstance ? _parameters.InstanceId.ToString() : _parameters.WorkflowType.FullName;
    parameters.Add( DbResourceAllocator.NewDbParameter( "@TypeFullName", typeName ) );

    string assemblyName = isXamlInstance ? _parameters.InstanceId.ToString() : _parameters.WorkflowType.Assembly.FullName;
    parameters.Add( DbResourceAllocator.NewDbParameter( "@AssemblyFullName", assemblyName ) );

    parameters.Add( DbResourceAllocator.NewDbParameter( "@ContextGuid", _parameters.ContextGuid ) );

    //
    // Caller details are only sent when a caller instance id is present
    if ( _parameters.CallerInstanceId != Guid.Empty )
    {
        parameters.Add( DbResourceAllocator.NewDbParameter( "@CallerInstanceId", _parameters.CallerInstanceId ) );
        parameters.Add( DbResourceAllocator.NewDbParameter( "@CallPath", _callPathKey ) );
        parameters.Add( DbResourceAllocator.NewDbParameter( "@CallerContextGuid", _parameters.CallerContextGuid ) );
        parameters.Add( DbResourceAllocator.NewDbParameter( "@CallerParentContextGuid", _parameters.CallerParentContextGuid ) );
    }

    string eventTime = this.GetSqlDateTimeString( DateTime.UtcNow );
    parameters.Add( DbResourceAllocator.NewDbParameter( "@EventDateTime", eventTime ) );
}
/// <summary>
/// Stores a workflow definition by calling [dbo].[InsertWorkflow]. Workflow types that are
/// already in the service's type cache are skipped; after a successful insert the type is
/// added to that cache.
/// </summary>
/// <param name="command">Command to execute with; its parameters are replaced.</param>
/// <param name="workflowInstanceId">Used as the type and assembly name when workflowType is null.</param>
/// <param name="workflowType">The workflow type, or null for a workflow that has no type.</param>
/// <param name="rootActivity">Root activity; cast to CompositeActivity to produce the activities xml.</param>
private void InsertWorkflow( DbCommand command, Guid workflowInstanceId, Type workflowType, Activity rootActivity)
{
    bool isInstanceType = (null == workflowType);
    string definition;

    lock (_service._typeCacheLock)
    {
        //
        // A type we have already seen needs no further work.
        // Workflows without a type always have their definition generated.
        if (!isInstanceType && _service._types.Contains(workflowType.AssemblyQualifiedName))
            return;
        definition = GetXomlDocument(rootActivity);
    }

    string typeName = isInstanceType ? workflowInstanceId.ToString() : workflowType.FullName;
    string assemblyName = isInstanceType ? workflowInstanceId.ToString() : workflowType.Assembly.FullName;

    //
    // Two callers can race to this point for the same type, but the pk specifies
    // ignore duplicate key. That is cheaper than holding the lock for the whole method.
    DbParameterCollection parameters = command.Parameters;
    parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertWorkflow]";
    parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@IsInstanceType", isInstanceType));
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowDefinition", definition));
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowId", DbType.Int32, System.Data.ParameterDirection.Output));
    parameters.Add(DbResourceAllocator.NewDbParameter("@Exists", DbType.Boolean, System.Data.ParameterDirection.Output));
    parameters.Add(DbResourceAllocator.NewDbParameter("@Activities", GetActivitiesXml((CompositeActivity)rootActivity)));
    command.ExecuteNonQuery();

    if (isInstanceType)
        return;

    //
    // Remember the type so later instances skip the serialization and the db round trip.
    // The lock guards the shared _types collection.
    lock (_service._typeCacheLock)
    {
        if (!_service._types.Contains(workflowType.AssemblyQualifiedName))
            _service._types.Add(workflowType);
    }
}
#endregion
#region Sql Commands - InsertWorkflowInstanceEvent
/// <summary>
/// Writes a single workflow tracking record on a newly opened connection, inside its own
/// ReadCommitted transaction. On any failure the transaction is rolled back (best effort)
/// and the original exception is rethrown; the connection is always closed.
/// </summary>
/// <param name="param">The record to persist; must be a WorkflowTrackingRecord.</param>
/// <exception cref="ArgumentException">param is null or is not a WorkflowTrackingRecord.</exception>
private void ExecuteInsertWorkflowInstanceEvent( object param )
{
    WorkflowTrackingRecord record = param as WorkflowTrackingRecord;
    if (null == record)
        throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param");
    DbConnection conn = DbResourceAllocator.OpenNewConnection();
    DbTransaction tx = null;
    try
    {
        //
        // Create the command inside the try block so that the finally block
        // still closes the connection if command creation throws.
        DbCommand command = DbResourceAllocator.NewCommand( conn );
        tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
        command.Connection = conn;
        command.Transaction = tx;
        ExecuteInsertWorkflowInstanceEvent( _internalId, record, null, command );
        tx.Commit();
    }
    catch (Exception)
    {
        try
        {
            if ( null != tx )
                tx.Rollback();
        }
        catch (Exception)
        {
            //
            // Rollback can throw - ignore these exceptions
            // so we don't lose the original exception
        }
        //
        // Re-throw original exception
        throw;
    }
    finally
    {
        if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) )
            conn.Close();
    }
}
/// <summary>
/// Inserts one or two workflow instance events with a single call to the stored procedure
/// configured by BuildInsertWorkflowInstanceEventParameters, then writes the annotations of
/// both records keyed by the event ids returned in the output parameters.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the events belong to.</param>
/// <param name="record1">First record to insert.</param>
/// <param name="record2">Optional second record; may be null.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentException">command is null, has no connection, or the connection is not open.</exception>
// NOTE(review): the generic type arguments on the List/KeyValuePair usages below appear to be
// missing from this listing (presumably KeyValuePair<long, string>) - confirm against the original source.
private void ExecuteInsertWorkflowInstanceEvent( long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command )
{
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentException();
BuildInsertWorkflowInstanceEventParameters( internalId, record1, record2, command );
command.ExecuteNonQuery();
//
// Read the generated event ids back from the output parameters
long eventId1 = (long) command.Parameters["@WorkflowInstanceEventId1"].Value;
Debug.Assert(eventId1 > 0, "Invalid eventId1");
long eventId2 = -1;
if (null != record2)
{
eventId2 = (long)command.Parameters["@WorkflowInstanceEventId2"].Value;
Debug.Assert(eventId2 > 0, "Invalid eventId2");
}
//
// Pair every annotation with the id of the event it belongs to; the list is
// presized to hold the annotations of both records
List> annotations = new List>(record1.Annotations.Count + (null == record2 ? 0 : record2.Annotations.Count));
foreach (string s in record1.Annotations)
annotations.Add(new KeyValuePair(eventId1, s));
if (null != record2)
{
foreach (string s in record2.Annotations)
annotations.Add(new KeyValuePair(eventId2, s));
}
//
// 'w' is the event type id passed for workflow events
BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command);
}
/// <summary>
/// Configures the command to call [dbo].[InsertWorkflowInstanceEvent] for one or two
/// workflow tracking records. Parameters for the first record carry the suffix "1" and
/// those for the optional second record the suffix "2".
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the events belong to.</param>
/// <param name="record1">First record; required.</param>
/// <param name="record2">Optional second record; may be null.</param>
/// <param name="command">Command to configure; any existing parameters are cleared.</param>
/// <exception cref="ArgumentNullException">record1 or command is null.</exception>
private void BuildInsertWorkflowInstanceEventParameters(long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command)
{
    //
    // Report the real parameter name so the exception points at the right argument
    if ( null == record1 )
        throw new ArgumentNullException( "record1" );
    if ( null == command )
        throw new ArgumentNullException( "command" );
    Debug.Assert( internalId != -1, "Invalid internalId" );
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertWorkflowInstanceEvent]";
    command.Parameters.Clear();
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );
    AddWorkflowInstanceEventParameters( record1, "1", command );
    if ( null != record2 )
        AddWorkflowInstanceEventParameters( record2, "2", command );
}
/// <summary>
/// Appends the parameters describing a single workflow tracking record to the command,
/// using the given suffix on every parameter name.
/// </summary>
private void AddWorkflowInstanceEventParameters( WorkflowTrackingRecord record, string suffix, DbCommand command )
{
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@TrackingWorkflowEventId" + suffix, ( int ) record.TrackingWorkflowEvent ) );
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventDateTime" + suffix, record.EventDateTime ) );
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventOrder" + suffix, record.EventOrder ) );
    if ( null != record.EventArgs )
    {
        //
        // Capture the type before serializing - SerializeRecord may replace the args
        Type t = record.EventArgs.GetType();
        if ( !( record.EventArgs is SerializedEventArgs ) )
            record = SerializeRecord( record );
        SerializedEventArgs sargs = record.EventArgs as SerializedEventArgs;
        Byte[] data = sargs.SerializedArgs;
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArgTypeFullName" + suffix, t.FullName ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArgAssemblyFullName" + suffix, t.Assembly.FullName ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArg" + suffix, data ) );
    }
    //
    // Output parameter that receives the id of the inserted event
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceEventId" + suffix, DbType.Int64, ParameterDirection.Output ) );
}
#endregion
#region Sql Commands - InsertActivityStatusInstance
/// <summary>
/// Writes a single activity tracking record on a newly opened connection, inside its own
/// ReadCommitted transaction. On failure the transaction is rolled back (best effort) and
/// the original exception is rethrown; the connection is always closed.
/// </summary>
/// <param name="param">The record to persist; must be an ActivityTrackingRecord.</param>
/// <exception cref="ArgumentException">param is null or is not an ActivityTrackingRecord.</exception>
// NOTE(review): the generic type arguments on IList/List below appear to be missing from this
// listing (presumably ActivityTrackingRecord) - confirm against the original source.
private void ExecuteInsertActivityStatusInstance(object param)
{
ActivityTrackingRecord record = param as ActivityTrackingRecord;
if (null == record)
throw new ArgumentException(ExecutionStringManager.InvalidActivityTrackingRecordParameter, "param");
DbConnection conn = DbResourceAllocator.OpenNewConnection();
DbTransaction tx = null;
try
{
tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
DbCommand command = conn.CreateCommand();
command.Transaction = tx;
//
// Wrap the single record in a one-element list so the list-based overload can be reused
IList activity = new List(1);
activity.Add(record);
ExecuteInsertActivityStatusInstance( _internalId, activity, command );
tx.Commit();
}
catch (Exception)
{
//
// Rollback can throw - ignore these exceptions
// so we don't lose the original exception
try
{
if ( null != tx )
tx.Rollback();
}
catch (Exception)
{
}
//
// Re-throw original exception
throw;
}
finally
{
if ((null != conn) && (ConnectionState.Closed != conn.State))
conn.Close();
}
return;
}
/// <summary>
/// Inserts a batch of activity status events with one call to
/// [dbo].[InsertActivityExecutionStatusEventMultiple], updates the activity instance id cache
/// from the output parameters, and then writes the annotations and data items of every record.
/// Does nothing when the list is null or empty.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance; only sent when greater than zero.</param>
/// <param name="activities">Records to insert; at most _activityEventBatchSize entries.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentOutOfRangeException">activities holds more than _activityEventBatchSize records.</exception>
/// <exception cref="ArgumentException">command is null, has no connection, or the connection is not open.</exception>
/// <exception cref="InvalidOperationException">An event id read back from the output parameters is not positive.</exception>
// NOTE(review): the generic type arguments on IList/List/KeyValuePair in this method appear to be
// missing from this listing - confirm against the original source.
private void ExecuteInsertActivityStatusInstance(long internalId, IList activities, DbCommand command)
{
if (null == activities || activities.Count <= 0)
return;
if (activities.Count > _activityEventBatchSize)
throw new ArgumentOutOfRangeException("activities");
if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
throw new ArgumentException();
command.CommandType = CommandType.StoredProcedure;
command.CommandText = "[dbo].[InsertActivityExecutionStatusEventMultiple]";
//
// Add the common parameters
command.Parameters.Clear();
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId));
//
// If we have the workflow's internal id use it to avoid the look up in the db
DbParameter param = DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", DbType.Int64, System.Data.ParameterDirection.InputOutput);
command.Parameters.Add(param);
if (internalId > 0)
param.Value = internalId;
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceContextGuid", _parameters.ContextGuid));
//
// Hashed ids of QName, context and pcontext used as key for storing activity record ids
// Save these for each record in the list so we don't have to recompute them below when adding to the cache
// NOTE(review): this array (and eventIds below) has a fixed 5 slots while the guard above uses
// _activityEventBatchSize - presumably that constant is 5; verify they stay in sync.
string[] ids = new string[] { null, null, null, null, null };
for (int i = 0; i < activities.Count; i++)
{
ActivityTrackingRecord record = activities[i];
long aid = -1;
ids[i] = BuildQualifiedNameVarName(record.QualifiedName, record.ContextGuid, record.ParentContextGuid);
// A cached id (if any) is passed along; parameter names are numbered from 1
TryGetActivityInstanceId(ids[i], out aid);
BuildInsertActivityStatusEventParameters(internalId, aid, i+1, record, command);
}
command.ExecuteNonQuery();
//
// Get all the output ids
long[] eventIds = new long[] { -1, -1, -1, -1, -1 };
for (int i = 0; i < activities.Count; i++)
{
string index = (i + 1).ToString(CultureInfo.InvariantCulture);
//
// ActivityInstanceId
long aId = (long)command.Parameters["@ActivityInstanceId" + index].Value;
Debug.Assert(aId > 0, "Invalid @ActivityInstanceId output parameter value");
//
// For all status changes that aren't "Closed" add the id to the instance cache.
// The Set... method checks and only adds if it does not already exist.
// To keep the cache size under control remove entries for activities that have closed.
// The activity might fault and need to do a lookup in the db but this isn't the common
// path and the db lookup isn't very expensive.
if (ActivityExecutionStatus.Closed != activities[i].ExecutionStatus)
SetActivityInstanceId(ids[i], aId);
else
RemoveActivityInstanceId(ids[i]);
//
// ActivityExecutionStatusEventId
long aeseId = (long)command.Parameters["@ActivityExecutionStatusEventId" + index].Value;
Debug.Assert(aeseId > 0, "Invalid @ActivityExecutionStatusEventId output parameter value");
eventIds[i] = aeseId;
}
//
// Collect the annotations and data items of all records, each paired with its event id
List> annotations = new List>(10);
List> items = new List>(10);
for (int i = 0; i < activities.Count; i++)
{
ActivityTrackingRecord record = activities[i];
//
// Get the ActivityExecutionStatusEventId
long eventId = eventIds[i];
if (eventId<=0)
throw new InvalidOperationException();
foreach (string s in record.Annotations)
annotations.Add(new KeyValuePair(eventId, s));
foreach (TrackingDataItem item in record.Body)
items.Add(new KeyValuePair(eventId, item));
}
//
// 'a' is the event type id passed for activity events
BatchExecuteInsertEventAnnotation(internalId, 'a', annotations, command);
BatchExecuteInsertTrackingDataItems(internalId, 'a', items, command);
}
/// <summary>
/// Appends the numbered parameters describing one activity status event to the command.
/// Every parameter name ends with the invariant-culture text of parameterId.
/// </summary>
/// <param name="internalId">Not used by this method.</param>
/// <param name="activityInstanceId">Known activity instance id; only sent when greater than zero.</param>
/// <param name="parameterId">Number appended to each parameter name.</param>
/// <param name="record">Record supplying the parameter values.</param>
/// <param name="command">Command whose parameter collection is appended to.</param>
private void BuildInsertActivityStatusEventParameters( long internalId, long activityInstanceId, int parameterId, ActivityTrackingRecord record, DbCommand command )
{
    string suffix = parameterId.ToString(CultureInfo.InvariantCulture);
    DbParameterCollection parameters = command.Parameters;

    //
    // Supplying a known activity instance id lets the db skip its lookup;
    // otherwise the in/out parameter is left without a value.
    DbParameter instanceIdParameter = DbResourceAllocator.NewDbParameter("@ActivityInstanceId" + suffix, DbType.Int64, System.Data.ParameterDirection.InputOutput);
    if ( activityInstanceId > 0 )
        instanceIdParameter.Value = activityInstanceId;
    parameters.Add(instanceIdParameter);

    parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName" + suffix, record.QualifiedName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid" + suffix, record.ContextGuid));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ParentContextGuid" + suffix, record.ParentContextGuid));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ExecutionStatusId" + suffix, (int)record.ExecutionStatus));
    parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime" + suffix, record.EventDateTime));
    parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder" + suffix, record.EventOrder));

    //
    // Output parameter that receives the id of the inserted status event
    parameters.Add(DbResourceAllocator.NewDbParameter("@ActivityExecutionStatusEventId" + suffix, DbType.Int64, ParameterDirection.Output));
}
#endregion
#region Sql Commands - InsertUserEvent
/// <summary>
/// Writes a single user tracking record on a newly opened connection, inside its own
/// ReadCommitted transaction. On failure the transaction is rolled back (best effort) and
/// the original exception is rethrown; the connection is always closed.
/// </summary>
/// <param name="param">The record to persist; must be a UserTrackingRecord.</param>
/// <exception cref="ArgumentException">param is null or is not a UserTrackingRecord.</exception>
private void ExecuteInsertUserEvent( object param )
{
    UserTrackingRecord userRecord = param as UserTrackingRecord;
    if (userRecord == null)
        throw new ArgumentException(ExecutionStringManager.InvalidUserTrackingRecordParameter, "param");

    DbConnection connection = DbResourceAllocator.OpenNewConnection();
    DbTransaction transaction = null;
    try
    {
        transaction = connection.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
        DbCommand insertCommand = connection.CreateCommand();
        insertCommand.Transaction = transaction;
        ExecuteInsertUserEvent( _internalId, userRecord, insertCommand );
        transaction.Commit();
    }
    catch (Exception)
    {
        if (transaction != null)
        {
            try
            {
                transaction.Rollback();
            }
            catch (Exception)
            {
                //
                // A failed rollback must not hide the original exception
            }
        }
        //
        // Surface the original failure to the caller
        throw;
    }
    finally
    {
        if (connection != null && connection.State != ConnectionState.Closed)
            connection.Close();
    }
}
/// <summary>
/// Inserts a user event through the stored procedure configured by
/// BuildInsertUserEventParameters, caches the activity instance id if it was not cached
/// already, and then writes the record's annotations and data items keyed by the new event id.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the event belongs to.</param>
/// <param name="record">User tracking record to insert.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentException">command is null, has no connection, or the connection is not open.</exception>
// NOTE(review): the generic type arguments on the List/KeyValuePair usages below appear to be
// missing from this listing - confirm against the original source.
private void ExecuteInsertUserEvent( long internalId, UserTrackingRecord record, DbCommand command )
{
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentException();
long aid = -1;
bool cached = false;
//
// Check if we have the activityInstanceId in the cache - we cache to avoid repeatedly searching this table.
string id = BuildQualifiedNameVarName( record.QualifiedName, record.ContextGuid, record.ParentContextGuid );
if ( TryGetActivityInstanceId( id, out aid ) )
cached = true;
BuildInsertUserEventParameters( internalId, aid, record, command );
command.ExecuteNonQuery();
//
// If we didn't already have the activityInstanceId get it from the IN/OUT param and put it in the cache
if ( !cached )
SetActivityInstanceId( id, ( long ) command.Parameters["@ActivityInstanceId"].Value );
//
// The output parameter holds the id of the inserted user event
long eventId = (long)command.Parameters["@UserEventId"].Value;
List> annotations = new List>(10);
List> items = new List>(10);
foreach (string s in record.Annotations)
annotations.Add(new KeyValuePair(eventId, s));
foreach (TrackingDataItem item in record.Body)
items.Add(new KeyValuePair(eventId, item));
//
// 'u' is the event type id passed for user events
BatchExecuteInsertEventAnnotation(internalId, 'u', annotations, command);
BatchExecuteInsertTrackingDataItems(internalId, 'u', items, command);
}
/// <summary>
/// Configures the command to call [dbo].[InsertUserEvent] for the given user tracking record.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the event belongs to.</param>
/// <param name="activityInstanceId">
/// Known activity instance id. When it is greater than zero it is sent and the
/// qualified name / context parameters are omitted; otherwise those are sent instead.
/// </param>
/// <param name="record">Record supplying the parameter values.</param>
/// <param name="command">Command to configure; any existing parameters are cleared.</param>
private void BuildInsertUserEventParameters( long internalId, long activityInstanceId, UserTrackingRecord record, DbCommand command )
{
    Debug.Assert( internalId != -1, "Invalid internalId" );
    Debug.Assert( ( command != null ), "Null command passed to BuildInsertUserEventParameters" );
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertUserEvent]";
    command.Parameters.Clear();
    DbParameter param = DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", DbType.Int64 );
    command.Parameters.Add( param );
    param.Value = internalId;
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventOrder", record.EventOrder ) );
    //
    // If we have the activity's instance id use it to avoid the look up in the db
    param = DbResourceAllocator.NewDbParameter( "@ActivityInstanceId", DbType.Int64, System.Data.ParameterDirection.InputOutput );
    command.Parameters.Add( param );
    if ( activityInstanceId > 0 )
    {
        param.Value = activityInstanceId;
    }
    else
    {
        //
        // Keep the network traffic down - only include the fields needed
        // to insert an ActivityInstance record if we don't have the activityInstanceId
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@QualifiedName", record.QualifiedName ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ContextGuid", record.ContextGuid ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ParentContextGuid", record.ParentContextGuid ) );
    }
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime", record.EventDateTime));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataKey", record.UserDataKey));
    if ( null != record.UserData )
    {
        Type t = record.UserData.GetType();
        Byte[] data = null;
        bool nonSerializable = false;
        string userDataString = null;
        SerializedDataItem sItem = record.UserData as SerializedDataItem;
        if ( null != sItem )
        {
            //
            // Already serialized - reuse the stored values
            data = sItem.SerializedData;
            nonSerializable = sItem.NonSerializable;
            userDataString = sItem.StringData;
        }
        else
        {
            //
            // Not serialized yet - serialize now and keep the results instead of
            // dereferencing the null cast result; no string form is available here
            SerializeDataItem(record.UserData, out data, out nonSerializable);
        }
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataTypeFullName", t.FullName));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataAssemblyFullName", t.Assembly.FullName));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Str", userDataString));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Blob", data));
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataNonSerializable", nonSerializable));
    }
    //
    // Output parameter that receives the id of the inserted user event
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserEventId", DbType.Int64, ParameterDirection.Output));
}
#endregion
#region Sql Commands - InsertTrackingDataItem
/// <summary>
/// Writes tracking data items in chunks of at most _dataItemBatchSize by calling
/// ExecuteInsertTrackingDataItems once per chunk. Does nothing for a null or empty list.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the items belong to.</param>
/// <param name="eventTypeId">Event type id forwarded unchanged to ExecuteInsertTrackingDataItems.</param>
/// <param name="items">Event id / data item pairs to write.</param>
/// <param name="command">Command forwarded to ExecuteInsertTrackingDataItems.</param>
// NOTE(review): the generic type arguments on IList/List/KeyValuePair in this method appear to be
// missing from this listing - confirm against the original source.
private void BatchExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList> items, DbCommand command)
{
if (null == items || items.Count <= 0)
return;
//
// If the list is smaller than the batch size just push the whole thing
if (items.Count <= _dataItemBatchSize)
{
ExecuteInsertTrackingDataItems(internalId, eventTypeId, items, command);
return;
}
//
// Need to split the list into max batch size chunks
List> batch = new List>(_dataItemBatchSize);
foreach (KeyValuePair kvp in items)
{
batch.Add(kvp);
// Flush as soon as the chunk is full, then reuse the same list for the next chunk
if (batch.Count == _dataItemBatchSize)
{
ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command);
batch.Clear();
}
}
//
// Send anything that hasn't been sent
if (batch.Count > 0)
ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command);
}
/// <summary>
/// Inserts one batch of tracking data items with a single call to
/// [dbo].[InsertTrackingDataItemMultiple], reads the generated data item ids from the output
/// parameters, and then sends the items' annotations in batches through ExecuteInsertAnnotation.
/// Does nothing for a null or empty list.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the items belong to.</param>
/// <param name="eventTypeId">Event type id sent as @EventTypeId.</param>
/// <param name="items">Event id / data item pairs to write.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentOutOfRangeException">items holds more than _dataItemAnnotationBatchSize entries.</exception>
/// <exception cref="ArgumentException">command is null, has no connection, or the connection is not open.</exception>
// NOTE(review): the generic type arguments on IList/List/KeyValuePair in this method appear to be
// missing from this listing - confirm against the original source.
private void ExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList> items, DbCommand command)
{
Debug.Assert( internalId != -1, "Invalid internalId" );
if ( null == items || items.Count <= 0 )
return;
// NOTE(review): this guard uses _dataItemAnnotationBatchSize while BatchExecuteInsertTrackingDataItems
// chunks by _dataItemBatchSize - presumably the two constants are equal; verify.
if (items.Count > _dataItemAnnotationBatchSize)
throw new ArgumentOutOfRangeException("items");
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentException();
command.CommandType = CommandType.StoredProcedure;
command.CommandText = "[dbo].[InsertTrackingDataItemMultiple]";
command.Parameters.Clear();
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventTypeId", eventTypeId));
int i = 1; // base 1 to match parameter names
foreach(KeyValuePair kvp in items)
{
string index = (i++).ToString(CultureInfo.InvariantCulture);
// Serialize the item now unless it is already in serialized form
SerializedDataItem sItem = kvp.Value as SerializedDataItem;
if (null == sItem)
sItem = SerializeDataItem(kvp.Value);
Type t = sItem.Type;
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId"+index, kvp.Key));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@FieldName"+index, sItem.FieldName));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName" + index, ((null == t) ? null : t.FullName)));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName" + index, ((null == t) ? null : t.Assembly.FullName)));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Str" + index, sItem.StringData));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Blob" + index, sItem.SerializedData));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@DataNonSerializable" + index, sItem.NonSerializable));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, DbType.Int64, System.Data.ParameterDirection.Output));
}
command.ExecuteNonQuery();
//
// Get all the out parameters holding the data item record ids
// This keeps us from repeatedly going into the parameters collection
// below if a data item has more than one annotation
List ids = new List(_dataItemAnnotationBatchSize);
for( i = 0; i < items.Count; i++ )
{
string index = (i+1).ToString(CultureInfo.InvariantCulture);
ids.Insert(i, (long)command.Parameters["@TrackingDataItemId" + index].Value);
}
//
// Go through all the data items and send all the annotations in batches.
// ExecuteInsertAnnotation reconfigures the same command, which is why the ids were read out above.
List> annotations = new List>(_dataItemAnnotationBatchSize);
i = 0;
foreach(KeyValuePair kvp in items)
{
TrackingDataItem item = kvp.Value;
long dataItemId = ids[i++];
foreach (string s in item.Annotations)
{
annotations.Add(new KeyValuePair(dataItemId, s));
if (annotations.Count == _dataItemAnnotationBatchSize)
{
ExecuteInsertAnnotation(internalId, annotations, command);
annotations.Clear();
}
}
}
//
// If we have anything left send them.
if ( annotations.Count > 0 )
ExecuteInsertAnnotation(internalId, annotations, command);
}
/// <summary>
/// Inserts one batch of data item annotations with a single call to
/// [dbo].[InsertTrackingDataItemAnnotationMultiple]. Does nothing for a null or empty list.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the annotations belong to.</param>
/// <param name="annotations">Data item id / annotation pairs; at most _dataItemAnnotationBatchSize entries.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentOutOfRangeException">annotations holds more than _dataItemAnnotationBatchSize entries.</exception>
/// <exception cref="ArgumentNullException">command is null, has no connection, or the connection is not open.</exception>
// NOTE(review): the generic type arguments on IList/KeyValuePair in this method appear to be
// missing from this listing - confirm against the original source.
private void ExecuteInsertAnnotation(long internalId, IList> annotations, DbCommand command)
{
if ( null == annotations || annotations.Count <= 0 )
return;
if (annotations.Count > _dataItemAnnotationBatchSize)
throw new ArgumentOutOfRangeException("annotations");
// Note that ArgumentNullException is also raised for a non-null command whose connection is missing or not open
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentNullException("command");
command.Parameters.Clear();
command.CommandType = CommandType.StoredProcedure;
command.CommandText = "[dbo].[InsertTrackingDataItemAnnotationMultiple]";
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId));
int i = 1; // base 1 to match parameter names
foreach( KeyValuePair kvp in annotations)
{
string index = (i++).ToString(CultureInfo.InvariantCulture);
// @HasDataN is always sent as true for every slot that is filled in
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, kvp.Key));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation" + index, kvp.Value ));
}
command.ExecuteNonQuery();
return;
}
/// <summary>
/// Writes event annotations in chunks of at most _eventAnnotationBatchSize by calling
/// ExecuteInsertEventAnnotation once per chunk. Does nothing for a null or empty list.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the annotations belong to.</param>
/// <param name="eventTypeId">Event type id forwarded unchanged to ExecuteInsertEventAnnotation.</param>
/// <param name="annotations">Event id / annotation pairs to write.</param>
/// <param name="command">Command forwarded to ExecuteInsertEventAnnotation.</param>
// NOTE(review): the generic type arguments on IList/List/KeyValuePair in this method appear to be
// missing from this listing - confirm against the original source.
private void BatchExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList> annotations, DbCommand command)
{
if (null == annotations || annotations.Count <= 0)
return;
//
// If the list is smaller than the max batch size just send it directly
if (annotations.Count <= _eventAnnotationBatchSize)
{
ExecuteInsertEventAnnotation(internalId, eventTypeId, annotations, command);
return;
}
//
// Need to split the list into max batch size chunks
List> batch = new List>(_eventAnnotationBatchSize);
foreach (KeyValuePair kvp in annotations)
{
batch.Add(kvp);
// Flush as soon as the chunk is full, then reuse the same list for the next chunk
if (batch.Count == _eventAnnotationBatchSize)
{
ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command);
batch.Clear();
}
}
//
// Send anything that hasn't been sent
if (batch.Count > 0)
ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command);
}
/// <summary>
/// Inserts one batch of event annotations with a single call to
/// [dbo].[InsertEventAnnotationMultiple]. Does nothing for a null or empty list.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance the annotations belong to.</param>
/// <param name="eventTypeId">Event type id sent as @EventTypeId.</param>
/// <param name="annotations">Event id / annotation pairs; at most _eventAnnotationBatchSize entries.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentOutOfRangeException">annotations holds more than _eventAnnotationBatchSize entries.</exception>
/// <exception cref="ArgumentNullException">command is null, has no connection, or the connection is not open.</exception>
// NOTE(review): the generic type arguments on IList/KeyValuePair in this method appear to be
// missing from this listing - confirm against the original source.
private void ExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList> annotations, DbCommand command)
{
Debug.Assert(internalId != -1, "Invalid internalId");
if (null == annotations || annotations.Count <= 0)
return;
if (annotations.Count > _eventAnnotationBatchSize)
throw new ArgumentOutOfRangeException("annotations");
// Note that ArgumentNullException is also raised for a non-null command whose connection is missing or not open
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentNullException( "command" );
command.Parameters.Clear();
command.CommandType = CommandType.StoredProcedure;
command.CommandText = "[dbo].[InsertEventAnnotationMultiple]";
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventTypeId", eventTypeId ) );
int i = 1; //base 1 to match parameter names
foreach(KeyValuePair kvp in annotations)
{
string index = (i++).ToString(CultureInfo.InvariantCulture);
// @HasDataN is always sent as true for every slot that is filled in
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId" +index, kvp.Key));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation"+index, kvp.Value));
}
command.ExecuteNonQuery();
return;
}
#endregion
#region Workflow Change
/// <summary>
/// Writes a workflow change record using a command obtained from the resource allocator,
/// inside a transaction started on that command's connection. On failure the transaction is
/// rolled back (best effort) and the original exception is rethrown; the connection is
/// always closed.
/// </summary>
/// <param name="param">The record to persist; must be a WorkflowTrackingRecord.</param>
/// <exception cref="ArgumentException">param is null or is not a WorkflowTrackingRecord.</exception>
private void ExecuteInsertWorkflowChange( object param )
{
    WorkflowTrackingRecord changeRecord = param as WorkflowTrackingRecord;
    if (changeRecord == null)
        throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param");

    DbCommand command = DbResourceAllocator.NewCommand();
    try
    {
        DbConnection connection = command.Connection;
        if ( connection.State != ConnectionState.Open )
            connection.Open();
        command.Transaction = connection.BeginTransaction();
        ExecuteInsertWorkflowChange(_internalId, changeRecord, command);
        command.Transaction.Commit();
    }
    catch (Exception)
    {
        try
        {
            DbTransaction pending = (command != null) ? command.Transaction : null;
            if (pending != null)
                pending.Rollback();
        }
        catch (Exception)
        {
            //
            // A failed rollback must not hide the original exception
        }
        //
        // Surface the original failure to the caller
        throw;
    }
    finally
    {
        DbConnection toClose = (command != null) ? command.Connection : null;
        if (toClose != null && toClose.State != ConnectionState.Closed)
            toClose.Close();
    }
}
/// <summary>
/// Inserts a workflow change: first the workflow instance event itself, then one row per
/// added and removed activity, and finally the record's annotations, all keyed by the id of
/// the inserted event.
/// </summary>
/// <param name="internalId">Internal id of the workflow instance that changed.</param>
/// <param name="record">Change record; its EventArgs must not be null.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentNullException">record is null, or command is null, has no connection, or the connection is not open.</exception>
/// <exception cref="InvalidOperationException">record.EventArgs is null.</exception>
// NOTE(review): the generic type arguments on the List/KeyValuePair usages below appear to be
// missing from this listing - confirm against the original source.
private void ExecuteInsertWorkflowChange(long internalId, WorkflowTrackingRecord record, DbCommand command)
{
if (null == record)
throw new ArgumentNullException("record");
if (null == record.EventArgs)
throw new InvalidOperationException(ExecutionStringManager.InvalidWorkflowChangeArgs);
if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
throw new ArgumentNullException("command");
//
// If we haven't already serialized do so now.
// This is work we have to do to write to store in the db anyway.
if (!(record.EventArgs is SerializedWorkflowChangedEventArgs))
record = SerializeRecord(record);
//
// Insert the workflow instance event
BuildInsertWorkflowInstanceEventParameters(internalId, record, null, command);
command.ExecuteNonQuery();
//
// Get the event id for added/removed activities and annotations
long eventId = (long)command.Parameters["@WorkflowInstanceEventId1"].Value;
SerializedWorkflowChangedEventArgs sargs = (SerializedWorkflowChangedEventArgs)record.EventArgs;
//
// Normalize the activities that have been added/removed if we're tracking definitions
if ((null != sargs.AddedActivities) && (sargs.AddedActivities.Count > 0))
{
foreach (AddedActivity added in sargs.AddedActivities)
ExecuteInsertAddedActivity(internalId, added.QualifiedName, added.ParentQualifiedName, added.ActivityTypeFullName, added.ActivityTypeAssemblyFullName, added.AddedActivityActionXoml, eventId, added.Order, command);
}
if ((null != sargs.RemovedActivities) && (sargs.RemovedActivities.Count > 0))
{
foreach (RemovedActivity removed in sargs.RemovedActivities)
ExecuteInsertRemovedActivity(internalId, removed.QualifiedName, removed.ParentQualifiedName, removed.RemovedActivityActionXoml, eventId, removed.Order, command);
}
//
// Pair every annotation with the event id and send them; 'w' is the event type id passed for workflow events
List> annotations = new List>(record.Annotations.Count);
foreach (string s in record.Annotations)
annotations.Add(new KeyValuePair(eventId, s));
BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command);
}
/// <summary>
/// Records one activity added by a workflow change by calling [dbo].[InsertAddedActivity].
/// </summary>
/// <param name="internalId">Internal id of the workflow instance that changed.</param>
/// <param name="qualifiedName">Qualified name of the added activity.</param>
/// <param name="parentQualifiedName">Qualified name of the added activity's parent.</param>
/// <param name="typeFullName">Full type name of the added activity.</param>
/// <param name="assemblyFullName">Full assembly name of the added activity's type.</param>
/// <param name="addedActivityActionXoml">Value sent as @AddedActivityAction.</param>
/// <param name="eventId">Id of the workflow instance event the change belongs to.</param>
/// <param name="order">Order of the change; -1 is sent as DBNull.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentNullException">command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertAddedActivity(long internalId, string qualifiedName, string parentQualifiedName, string typeFullName, string assemblyFullName, string addedActivityActionXoml, long eventId, int order, DbCommand command)
{
    bool usable = (command != null) && (command.Connection != null) && (command.Connection.State == ConnectionState.Open);
    if (!usable)
        throw new ArgumentNullException("command");

    DbParameterCollection parameters = command.Parameters;
    parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertAddedActivity]";

    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId));
    parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeFullName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyFullName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@AddedActivityAction", addedActivityActionXoml));

    //
    // An order of -1 is stored as a database null
    object orderValue = (order == -1) ? (object)DBNull.Value : order;
    parameters.Add(DbResourceAllocator.NewDbParameter("@Order", orderValue));

    command.ExecuteNonQuery();
}
/// <summary>
/// Records one activity removed by a workflow change by calling [dbo].[InsertRemovedActivity].
/// </summary>
/// <param name="internalId">Internal id of the workflow instance that changed.</param>
/// <param name="qualifiedName">Qualified name of the removed activity.</param>
/// <param name="parentQualifiedName">Qualified name of the removed activity's parent.</param>
/// <param name="removedActivityActionXoml">Value sent as @RemovedActivityAction.</param>
/// <param name="eventId">Id of the workflow instance event the change belongs to.</param>
/// <param name="order">Order of the change; -1 is sent as DBNull.</param>
/// <param name="command">Command to execute with; its connection must already be open.</param>
/// <exception cref="ArgumentNullException">command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertRemovedActivity(long internalId, string qualifiedName, string parentQualifiedName, string removedActivityActionXoml, long eventId, int order, DbCommand command)
{
    bool usable = (command != null) && (command.Connection != null) && (command.Connection.State == ConnectionState.Open);
    if (!usable)
        throw new ArgumentNullException("command");

    DbParameterCollection parameters = command.Parameters;
    parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertRemovedActivity]";

    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId));
    parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName));
    parameters.Add(DbResourceAllocator.NewDbParameter("@RemovedActivityAction", removedActivityActionXoml));

    //
    // An order of -1 is stored as a database null
    object orderValue = (order == -1) ? (object)DBNull.Value : order;
    parameters.Add(DbResourceAllocator.NewDbParameter("@Order", orderValue));

    command.ExecuteNonQuery();
}
#endregion
#region Utility
/// <summary>
/// Looks up the id stored for an activity instance key.
/// </summary>
/// <param name="key">Key identifying the activity instance.</param>
/// <param name="id">Receives the id when one is found.</param>
/// <returns>true if the key was found in either cache; otherwise false.</returns>
private bool TryGetActivityInstanceId(string key, out long id)
{
    // The cache of committed ids is always consulted first.
    bool found = _activityInstanceId.TryGetValue(key, out id);

    // When batched, fall back to the temp ids generated during this batch commit.
    if (!found && _isTrans)
    {
        found = _tmpActivityInstanceId.TryGetValue(key, out id);
    }

    return found;
}
/// <summary>
/// Records the id for an activity instance key unless the key is already cached.
/// </summary>
/// <param name="key">Key identifying the activity instance.</param>
/// <param name="id">Id to remember for the key.</param>
private void SetActivityInstanceId(string key, long id)
{
    if (_isTrans)
    {
        // Batched: ids go into the temp member. If the commit is successful
        // they are moved to the real member in IPendingWork.Complete.
        if (_tmpActivityInstanceId.ContainsKey(key))
        {
            return;
        }
        _tmpActivityInstanceId.Add(key, id);
        return;
    }

    // Not batched: write straight to the real cache.
    if (!_activityInstanceId.ContainsKey(key))
    {
        _activityInstanceId.Add(key, id);
    }
}
/// <summary>
/// Forgets the id cached for an activity instance key.
/// </summary>
/// <param name="key">Key identifying the activity instance.</param>
private void RemoveActivityInstanceId(string key)
{
    // Remove reports a missing key by returning false, so no ContainsKey probe is needed.
    // The temp cache is only touched when batched.
    if (_isTrans)
    {
        _tmpActivityInstanceId.Remove(key);
    }

    _activityInstanceId.Remove(key);
}
/// <summary>
/// Formats a DateTime as year, two-digit month and two-digit day, followed by a space and
/// hour:minute:second:millisecond. All numbers use the invariant culture; the year and the
/// time components are not zero-padded.
/// </summary>
/// <param name="dateTime">Value to format.</param>
/// <returns>The formatted text.</returns>
private string GetSqlDateTimeString(DateTime dateTime)
{
    System.Globalization.CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;

    StringBuilder text = new StringBuilder();
    text.Append(dateTime.Year.ToString(invariant));
    text.Append(PadToDblDigit(dateTime.Month));
    text.Append(PadToDblDigit(dateTime.Day));
    text.Append(" ");
    text.Append(dateTime.Hour.ToString(invariant));
    text.Append(":");
    text.Append(dateTime.Minute.ToString(invariant));
    text.Append(":");
    text.Append(dateTime.Second.ToString(invariant));
    text.Append(":");
    text.Append(dateTime.Millisecond.ToString(invariant));
    return text.ToString();
}
/// <summary>
/// Converts a number to invariant-culture text, prefixing a single-character result with "0".
/// </summary>
/// <param name="num">Number to format.</param>
/// <returns>The text, at least two characters long.</returns>
private string PadToDblDigit(int num)
{
    // PadLeft only adds a '0' when the text is exactly one character long;
    // longer text (including negative numbers) is returned unchanged.
    return num.ToString(System.Globalization.CultureInfo.InvariantCulture).PadLeft(2, '0');
}
/// <summary>
/// Build a string to uniquely identify each activity that should be recorded as a separate instance.
/// A separate instance is defined by the combination of QualifiedName, Context and ParentContext.
/// </summary>
/// <param name="qId">Qualified name of the activity; it is passed through HashHelper.HashServiceType.</param>
/// <param name="context">Context guid.</param>
/// <param name="parentContext">Parent context guid.</param>
/// <returns>The three guid strings joined by '_', with every '-' replaced by '_'.</returns>
private string BuildQualifiedNameVarName( string qId, Guid context, Guid parentContext )
{
    string[] parts = new string[]
    {
        HashHelper.HashServiceType(qId).ToString(),
        context.ToString(),
        parentContext.ToString()
    };

    // The separator is already '_', so replacing '-' after joining gives the same text
    // as replacing it in each part first.
    return string.Join("_", parts).Replace('-', '_');
}
/// <summary>
/// Replaces every item in the record's Body with its serialized form.
/// </summary>
/// <param name="record">Record to convert in place.</param>
/// <returns>The same record instance.</returns>
private ActivityTrackingRecord SerializeRecord( ActivityTrackingRecord record )
{
    // A null Body is left alone; an empty one simply skips the loop.
    if ( null != record.Body )
    {
        for ( int index = 0; index < record.Body.Count; ++index )
        {
            record.Body[index] = SerializeDataItem( record.Body[index] );
        }
    }

    return record;
}
/// <summary>
/// Converts a user tracking record in place: UserData is wrapped in a SerializedDataItem and
/// every Body item is replaced with its serialized form.
/// </summary>
/// <param name="record">Record to convert in place.</param>
/// <returns>The same record instance.</returns>
private UserTrackingRecord SerializeRecord( UserTrackingRecord record )
{
    if ( null != record.UserData )
    {
        byte[] data = null;
        bool nonSerializable;
        SerializeDataItem( record.UserData, out data, out nonSerializable );

        // Keep the type and string form alongside the binary payload.
        SerializedDataItem item = new SerializedDataItem();
        item.Type = record.UserData.GetType();
        item.StringData = record.UserData.ToString();
        item.SerializedData = data;
        item.NonSerializable = nonSerializable;
        record.UserData = item;
    }

    // Body may be null while UserData or EventArgs is set; previously that combination
    // passed the early-out and then threw a NullReferenceException here.
    if ( null != record.Body )
    {
        for ( int i = 0; i < record.Body.Count; i++ )
            record.Body[i] = SerializeDataItem( record.Body[i] );
    }

    return record;
}
// Prepares a workflow tracking record for persistence. Records without event args are
// returned unchanged; for a Changed event the event args are cast to
// TrackingWorkflowChangedEventArgs and a SerializedWorkflowChangedEventArgs is created.
private WorkflowTrackingRecord SerializeRecord( WorkflowTrackingRecord record )
{
// Nothing to convert when the record carries no event args.
if ( null == record.EventArgs )
return record;
SerializedEventArgs args;
if ( TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent )
{
//
// Convert the WorkflowChanged items
SerializedWorkflowChangedEventArgs sargs = new SerializedWorkflowChangedEventArgs();
TrackingWorkflowChangedEventArgs wargs = ( TrackingWorkflowChangedEventArgs ) record.EventArgs;
if ( null != wargs )
{
// NOTE(review): the loop header below appears truncated in this copy of the file, and the
// statements that follow reference removedAction/order/activities, which are not declared
// in this method. They presumably belong to a separate removed-activity helper; verify
// against the original source before editing.
for ( int i = 0; i activities)
{
// Capture the removed activity's order, qualified name and (if any) parent qualified name.
Activity removed = removedAction.OriginalRemovedActivity;
RemovedActivity removedActivity = new RemovedActivity();
removedActivity.Order = order;
removedActivity.QualifiedName = removed.QualifiedName;
if (null != removed.Parent)
removedActivity.ParentQualifiedName = removed.Parent.QualifiedName;
//
// Save the definition of this change
removedActivity.RemovedActivityActionXoml = GetXomlDocument(removedAction);
activities.Add(removedActivity);
//
// Recursively add all contained activities to the removed list
if (removed is CompositeActivity)
{
foreach (Activity activity in ((CompositeActivity)removed).Activities)
{
AddRemovedActivity(activity, activities);
}
}
}
/// <summary>
/// Appends a RemovedActivity entry for the given activity and, when it is a composite,
/// for each activity in its Activities collection (recursively).
/// </summary>
/// <param name="removed">Activity to record.</param>
/// <param name="activities">List that receives the entries.</param>
private void AddRemovedActivity(Activity removed, IList activities)
{
    RemovedActivity entry = new RemovedActivity();
    // Entries added through this overload always carry an order of -1.
    entry.Order = -1;
    entry.QualifiedName = removed.QualifiedName;
    if ( null != removed.Parent )
    {
        entry.ParentQualifiedName = removed.Parent.QualifiedName;
    }
    activities.Add(entry);

    // Recursively add all contained activities to the removed list.
    CompositeActivity composite = removed as CompositeActivity;
    if (null == composite)
    {
        return;
    }

    foreach (Activity child in composite.Activities)
    {
        AddRemovedActivity(child, activities);
    }
}
/// <summary>
/// Appends an AddedActivity entry describing the activity added by a change action, including
/// the XOML of the action itself, then records any contained activities.
/// </summary>
/// <param name="addedAction">Change action whose AddedActivity is recorded.</param>
/// <param name="order">Order value stored on the entry.</param>
/// <param name="activities">List that receives the entries.</param>
private void AddAddedActivity(AddedActivityAction addedAction, int order, IList activities)
{
    Activity added = addedAction.AddedActivity;

    AddedActivity entry = new AddedActivity();
    entry.Order = order;

    Type activityType = added.GetType();
    entry.ActivityTypeFullName = activityType.FullName;
    entry.ActivityTypeAssemblyFullName = activityType.Assembly.FullName;
    entry.QualifiedName = added.QualifiedName;
    if (null != added.Parent)
    {
        entry.ParentQualifiedName = added.Parent.QualifiedName;
    }

    // Save the definition of this change.
    entry.AddedActivityActionXoml = GetXomlDocument(addedAction);
    activities.Add(entry);

    // Recursively add all contained activities to the added list.
    CompositeActivity composite = added as CompositeActivity;
    if (null == composite)
    {
        return;
    }

    foreach (Activity child in composite.Activities)
    {
        AddAddedActivity(child, activities);
    }
}
/// <summary>
/// Appends an AddedActivity entry for the given activity and, when it is a composite,
/// for each activity in its Activities collection (recursively).
/// </summary>
/// <param name="added">Activity to record.</param>
/// <param name="activities">List that receives the entries.</param>
private void AddAddedActivity(Activity added, IList activities)
{
    AddedActivity entry = new AddedActivity();
    // Entries added through this overload always carry an order of -1.
    entry.Order = -1;

    Type activityType = added.GetType();
    entry.ActivityTypeFullName = activityType.FullName;
    entry.ActivityTypeAssemblyFullName = activityType.Assembly.FullName;
    entry.QualifiedName = added.QualifiedName;
    if ( null != added.Parent )
    {
        entry.ParentQualifiedName = added.Parent.QualifiedName;
    }
    activities.Add(entry);

    // Recursively add all contained activities to the added list.
    CompositeActivity composite = added as CompositeActivity;
    if (null == composite)
    {
        return;
    }

    foreach (Activity child in composite.Activities)
    {
        AddAddedActivity(child, activities);
    }
}
/// <summary>
/// Copies a tracking data item into a SerializedDataItem and, when it has data, attaches
/// the binary form, string form and runtime type of that data.
/// </summary>
/// <param name="item">Item to convert; may be null.</param>
/// <returns>The converted item, or null when <paramref name="item"/> is null.</returns>
private SerializedDataItem SerializeDataItem( TrackingDataItem item )
{
    if ( null == item )
    {
        return null;
    }

    SerializedDataItem result = new SerializedDataItem();
    result.Data = item.Data;
    result.Annotations.AddRange( item.Annotations );
    result.FieldName = item.FieldName;

    // Without data there is nothing further to capture.
    if ( null == item.Data )
    {
        return result;
    }

    byte[] payload = null;
    bool nonSerializable;
    SerializeDataItem( item.Data, out payload, out nonSerializable );

    result.SerializedData = payload;
    result.StringData = item.Data.ToString();
    result.Type = item.Data.GetType();
    result.NonSerializable = nonSerializable;
    return result;
}
/// <summary>
/// Binary serialize an object. Used to persist trackingDataItems.
/// </summary>
/// <param name="data">Object to serialize; may be null.</param>
/// <param name="state">
/// Receives the serialized bytes, or null when <paramref name="data"/> is null or
/// serialization failed.
/// </param>
/// <param name="nonSerializable">
/// Set to true when the formatter throws a SerializationException; otherwise false.
/// </param>
private void SerializeDataItem( object data, out byte[] state, out bool nonSerializable )
{
    state = null;
    nonSerializable = false;
    if ( null == data )
    {
        return;
    }

    // NOTE(review): BinaryFormatter is obsolete in current .NET; it is only used here to
    // write data, not to read untrusted input.
    BinaryFormatter formatter = new BinaryFormatter();
    using (MemoryStream buffer = new MemoryStream(1024))
    {
        try
        {
            formatter.Serialize(buffer, data);
            // ToArray copies everything written so far, regardless of stream position.
            state = buffer.ToArray();
        }
        catch (SerializationException)
        {
            // Report the failure through the flag; state stays null.
            nonSerializable = true;
        }
    }
}
/// <summary>
/// Make string sql safe by doubling every single quote.
/// </summary>
/// <param name="val">Text to escape; may be null.</param>
/// <returns>The escaped text, or null when <paramref name="val"/> is null.</returns>
private string SqlEscape( string val )
{
    return ( null == val ) ? null : val.Replace( "'", "''" );
}
/*
static char[] hexDigits = {
'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
///
/// Convert a byte array to a string of hex chars for sql image type
///
///
///
private static string ToHexString( byte[] bytes )
{
if ( null == bytes )
return null;
if ( 0 == bytes.Length )
return null;
char[] chars = new char[bytes.Length * 2];
for ( int i = 0; i < bytes.Length; i++ )
{
int b = bytes[i];
chars[i * 2] = hexDigits[b >> 4];
chars[i * 2 + 1] = hexDigits[b & 0xF];
}
return "0x" + new string( chars );
}
*/
/// <summary>
/// Builds the dot-separated _callPathKey from every entry of the call path, and
/// _parentCallPathKey from every entry except the last. Both are passed through SqlEscape.
/// </summary>
/// <param name="callPath">Call path entries; null or empty leaves both keys untouched.</param>
private void GetCallPathKeys( IList callPath )
{
    if ( null == callPath )
    {
        return;
    }

    int count = callPath.Count;
    if ( count <= 0 )
    {
        return;
    }

    int lastIndex = count - 1;
    for ( int index = 0; index < count; index++ )
    {
        string segment = "." + callPath[index];
        _callPathKey = _callPathKey + segment;

        // The parent key omits the final entry.
        if ( index < lastIndex )
        {
            _parentCallPathKey = _parentCallPathKey + segment;
        }
    }

    // Drop the leading '.' that the loop put in front of the first entry.
    if ( null != _callPathKey )
    {
        _callPathKey = SqlEscape( _callPathKey.Substring( 1 ) );
    }

    if ( null != _parentCallPathKey )
    {
        _parentCallPathKey = SqlEscape( _parentCallPathKey.Substring( 1 ) );
    }
}
/// <summary>
/// Produces an XML document with an "Activities" root element containing the entries
/// that WriteActivity emits for the given root activity.
/// </summary>
/// <param name="root">Root activity; may be null.</param>
/// <returns>The XML text, or null when <paramref name="root"/> is null.</returns>
private string GetActivitiesXml(CompositeActivity root)
{
    if (null == root)
    {
        return null;
    }

    StringBuilder xml = new StringBuilder();
    XmlWriter xmlWriter = XmlWriter.Create(xml);
    try
    {
        xmlWriter.WriteStartDocument();
        xmlWriter.WriteStartElement("Activities");
        WriteActivity(root, xmlWriter);
        xmlWriter.WriteEndElement();
        xmlWriter.WriteEndDocument();
    }
    finally
    {
        // Flush and close on both the success and the failure path.
        xmlWriter.Flush();
        xmlWriter.Close();
    }

    return xml.ToString();
}
/// <summary>
/// Writes an "Activity" element for the given activity and then, for a composite activity,
/// one for each activity returned by GetAllEnabledActivities (recursively). The elements are
/// written as siblings, not nested.
/// </summary>
/// <param name="activity">Activity to write; null writes nothing.</param>
/// <param name="writer">Destination writer.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="writer"/> is null while <paramref name="activity"/> is not.
/// </exception>
private void WriteActivity(Activity activity, XmlWriter writer)
{
    if (null == activity)
    {
        return;
    }

    if (null == writer)
    {
        throw new ArgumentNullException("writer");
    }

    Type activityType = activity.GetType();

    writer.WriteStartElement("Activity");
    writer.WriteElementString("TypeFullName", activityType.FullName);
    writer.WriteElementString("AssemblyFullName", activityType.Assembly.FullName);
    writer.WriteElementString("QualifiedName", activity.QualifiedName);

    // Don't write the element if the value is null, sql will see a missing element as a null value
    if (null != activity.Parent)
    {
        writer.WriteElementString("ParentQualifiedName", activity.Parent.QualifiedName);
    }
    writer.WriteEndElement();

    CompositeActivity composite = activity as CompositeActivity;
    if (null == composite)
    {
        return;
    }

    foreach (Activity child in GetAllEnabledActivities(composite))
    {
        WriteActivity(child, writer);
    }
}
// This function returns all the executable activities including secondary flow activities.
// The result is a new list seeded with compositeActivity.EnabledActivities, followed by each
// entry of ISupportAlternateFlow.AlternateFlowActivities that is not already in the list.
// Throws ArgumentNullException when compositeActivity is null.
private IList GetAllEnabledActivities(CompositeActivity compositeActivity)
{
if (compositeActivity == null)
throw new ArgumentNullException("compositeActivity");
// Copy the enabled activities so the additions below do not touch the source collection.
List allActivities = new List(compositeActivity.EnabledActivities);
foreach (Activity secondaryFlowActivity in ((ISupportAlternateFlow)compositeActivity).AlternateFlowActivities)
{
// Skip alternate-flow activities that are already present.
if (!allActivities.Contains(secondaryFlowActivity))
allActivities.Add(secondaryFlowActivity);
}
return allActivities;
}
/// <summary>
/// Serializes an object to markup text using WorkflowMarkupSerializer and an XmlWriter
/// obtained from CreateXmlWriter.
/// </summary>
/// <param name="obj">Object to serialize.</param>
/// <returns>The markup text produced by the serializer.</returns>
internal string GetXomlDocument(object obj)
{
    using (StringWriter stringWriter = new StringWriter(System.Globalization.CultureInfo.InvariantCulture))
    {
        using (XmlWriter xmlWriter = CreateXmlWriter(stringWriter))
        {
            WorkflowMarkupSerializer serializer = new WorkflowMarkupSerializer();
            serializer.Serialize(xmlWriter, obj);

            // The text is read while the XmlWriter is still open, so push any buffered
            // output into the StringWriter first instead of relying on the serializer
            // having flushed it.
            xmlWriter.Flush();
            return stringWriter.ToString();
        }
    }
}
#endregion
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Text;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Data.Common;
using System.Data;
using System.Data.SqlClient;
using System.Timers;
using System.Diagnostics;
using System.Reflection;
using System.Workflow.Runtime;
using System.Workflow.ComponentModel;
using System.Workflow.Runtime.Hosting;
using System.Text.RegularExpressions;
using System.Threading;
using System.Transactions;
using System.Globalization;
using System.Workflow.ComponentModel.Serialization;
using System.ComponentModel.Design.Serialization;
using System.Xml;
using System.Configuration;
namespace System.Workflow.Runtime.Tracking
{
public sealed class SqlTrackingService : TrackingService, IProfileNotification
{
#region Private/Protected Members
// Backing field for IsTransactional.
private bool _isTrans = true;
// Backing field for PartitionOnCompletion.
private bool _partition = false;
// Backing field for UseDefaultProfile.
private bool _defaultProfile = true;
// Backing field for EnableRetries; handed to DbRetry by the Execute*Retried helpers.
private bool _enableRetries = false;
// Set once EnableRetries has been supplied explicitly (property setter or constructor
// parameter); Start then skips the lookup in the runtime's common parameters.
private bool _ignoreCommonEnableRetries = false;
// Sent as @LastCheckDateTime when polling for profile changes and refreshed from the
// @MaxCheckDateTime output parameter afterwards.
private DateTime _lastProfileCheck;
// Timer whose Elapsed event runs CheckProfileChanges.
private System.Timers.Timer _timer = new System.Timers.Timer();
// Backing field for ProfileChangeCheckInterval, in milliseconds.
private double _interval = 60000;
//private static int _deadlock = 1205;
private TypeKeyedCollection _types = new TypeKeyedCollection();
private object _typeCacheLock = new object();
// Obtained from the runtime in Start.
private WorkflowCommitWorkBatchService _transactionService;
// Created in Start; source of commands, parameters and connections.
private DbResourceAllocator _dbResourceAllocator;
// Version 0.0 is used to mean "no specific profile version requested".
private static Version UnknownProfileVersionId = new Version( 0, 0 );
// Saved from constructor input to be used in service start initialization
private NameValueCollection _parameters;
// Connection string exactly as supplied to the constructor.
string _unvalidatedConnectionString;
// Signature of the operations that ExecuteRetried can run.
private delegate void ExecuteRetriedDelegate(object param);
#endregion
#region Configuration Properties
/// <summary>
/// Gets the connection string exactly as it was supplied to the constructor.
/// </summary>
public string ConnectionString
{
    get
    {
        return this._unvalidatedConnectionString;
    }
}
/// <summary>
/// Determines if tracking data should be held and transactionally written to the database
/// at persistence points. Defaults to true.
/// </summary>
public bool IsTransactional
{
    get
    {
        return this._isTrans;
    }
    set
    {
        this._isTrans = value;
    }
}
/// <summary>
/// Indicates that records should be moved from the active instance tables to the
/// appropriate partition tables when the instance completes. Defaults to false.
/// </summary>
public bool PartitionOnCompletion
{
    get
    {
        return this._partition;
    }
    set
    {
        this._partition = value;
    }
}
/// <summary>
/// Determines if the default profile should be used for workflow types that do not have
/// a profile specified for them. Defaults to true.
/// </summary>
public bool UseDefaultProfile
{
    get
    {
        return this._defaultProfile;
    }
    set
    {
        this._defaultProfile = value;
    }
}
/// <summary>
/// The time interval, in milliseconds, at which to check the database for changes to profiles.
/// Default is 60000.
/// </summary>
/// <remarks>
/// Setting the interval results in the next check occurring the specified number of
/// milliseconds from the time at which the property is set.
/// </remarks>
/// <exception cref="ArgumentException">The value being set is zero or negative.</exception>
public double ProfileChangeCheckInterval
{
    get
    {
        return this._interval;
    }
    set
    {
        if ( 0 >= value )
        {
            throw new ArgumentException( ExecutionStringManager.InvalidProfileCheckValue );
        }

        this._interval = value;

        // Assigning the timer's interval also resets the timer.
        this._timer.Interval = this._interval;
    }
}
/// <summary>
/// Gets or sets whether database operations are retried. Setting the property also marks
/// the value as explicit, so Start does not replace it with the EnableRetries entry from
/// the runtime's common parameters.
/// </summary>
public bool EnableRetries
{
    get
    {
        return this._enableRetries;
    }
    set
    {
        this._ignoreCommonEnableRetries = true;
        this._enableRetries = value;
    }
}
/// <summary>
/// Gets the allocator created in Start; used to create commands, parameters and connections.
/// </summary>
internal DbResourceAllocator DbResourceAllocator
{
    get
    {
        return _dbResourceAllocator;
    }
}
#endregion
#region Construction
/// <summary>
/// Creates the service from a connection string. The string is stored as given and is
/// handed to the DbResourceAllocator when the service starts.
/// </summary>
/// <param name="connectionString">Connection string for the tracking database.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="connectionString"/> is null or empty.
/// </exception>
public SqlTrackingService(string connectionString)
{
    bool missing = String.IsNullOrEmpty(connectionString);
    if (missing)
    {
        throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString);
    }

    this._unvalidatedConnectionString = connectionString;
}
/// <summary>
/// Creates the service from name/value parameters. Recognized keys (case-insensitive):
/// IsTransactional, UseDefaultProfile, PartitionOnCompletion, ProfileChangeCheckInterval,
/// ConnectionString and EnableRetries. Other keys are ignored here; the whole collection is
/// kept and handed to the DbResourceAllocator when the service starts.
/// </summary>
/// <param name="parameters">Configuration values.</param>
/// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
/// <exception cref="ArgumentException">ProfileChangeCheckInterval is zero or negative.</exception>
public SqlTrackingService(NameValueCollection parameters)
{
    if (null == parameters)
    {
        throw new ArgumentNullException("parameters", ExecutionStringManager.MissingParameters);
    }

    // Enumerating an empty collection is a no-op, so no separate Count check is needed.
    foreach (string key in parameters.Keys)
    {
        if (string.Equals("IsTransactional", key, StringComparison.OrdinalIgnoreCase))
        {
            _isTrans = bool.Parse(parameters[key]);
        }
        else if (string.Equals("UseDefaultProfile", key, StringComparison.OrdinalIgnoreCase))
        {
            _defaultProfile = bool.Parse(parameters[key]);
        }
        else if (string.Equals("PartitionOnCompletion", key, StringComparison.OrdinalIgnoreCase))
        {
            _partition = bool.Parse(parameters[key]);
        }
        else if (string.Equals("ProfileChangeCheckInterval", key, StringComparison.OrdinalIgnoreCase))
        {
            _interval = double.Parse(parameters[key], NumberFormatInfo.InvariantInfo);
            if (_interval <= 0)
            {
                throw new ArgumentException(ExecutionStringManager.InvalidProfileCheckValue);
            }
        }
        else if (string.Equals("ConnectionString", key, StringComparison.OrdinalIgnoreCase))
        {
            _unvalidatedConnectionString = parameters[key];
        }
        else if (string.Equals("EnableRetries", key, StringComparison.OrdinalIgnoreCase))
        {
            // An explicit value here takes precedence over the common configuration section.
            _enableRetries = bool.Parse(parameters[key]);
            _ignoreCommonEnableRetries = true;
        }
    }

    _parameters = parameters;
}
#endregion
#region WorkflowRuntimeService
// Starts the service: creates the DbResourceAllocator, resolves the commit work batch
// service, picks up EnableRetries from the runtime's common parameters when it was not set
// explicitly, and starts the profile-change timer before calling base.Start().
override protected internal void Start()
{
// Profile changes are checked relative to the moment the service starts.
_lastProfileCheck = DateTime.UtcNow;
_dbResourceAllocator = new DbResourceAllocator(this.Runtime, _parameters, _unvalidatedConnectionString);
// Check connection string mismatch if using SharedConnectionWorkflowTransactionService
_transactionService = this.Runtime.GetService();
_dbResourceAllocator.DetectSharedConnectionConflict(_transactionService);
//
// If we didn't find a local value for enable retries
// check in the common section
if ((!_ignoreCommonEnableRetries)&&(null != base.Runtime))
{
NameValueConfigurationCollection commonConfigurationParameters = base.Runtime.CommonParameters;
if (commonConfigurationParameters != null)
{
// Scan the common configuration parameters for an EnableRetries entry (case-insensitive);
// the first match wins.
foreach (string key in commonConfigurationParameters.AllKeys)
{
if (string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase) == 0)
{
_enableRetries = bool.Parse(commonConfigurationParameters[key].Value);
break;
}
}
}
}
_timer.Interval = _interval;
_timer.AutoReset = false; // ensure that only one timer thread is checking for profile changes at a time
// CheckProfileChanges restarts the timer itself once a check has finished.
_timer.Elapsed += new ElapsedEventHandler( CheckProfileChanges );
_timer.Start();
base.Start();
}
#endregion WorkflowRuntimeService
#region IProfileNotification Implementation
/// <summary>
/// Creates a new SqlTrackingChannel for a workflow instance, giving it the tracking
/// parameters and this service.
/// </summary>
/// <param name="parameters">Tracking parameters for the instance.</param>
/// <returns>A new channel.</returns>
/// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
protected internal override TrackingChannel GetTrackingChannel( TrackingParameters parameters )
{
    if ( null != parameters )
    {
        return new SqlTrackingChannel( parameters, this );
    }

    throw new ArgumentNullException( "parameters" );
}
// Raised by CheckProfileChanges with a ProfileUpdatedEventArgs (workflow type and the
// deserialized profile) for each changed profile that has profile text.
public event EventHandler ProfileUpdated;
// Raised by CheckProfileChanges with a ProfileRemovedEventArgs (workflow type) for each
// changed profile whose profile text is null.
public event EventHandler ProfileRemoved;
/// <summary>
/// Gets a specific version of the tracking profile for a workflow type.
/// </summary>
/// <param name="workflowType">Workflow type whose profile is requested.</param>
/// <param name="profileVersion">Profile version to load.</param>
/// <returns>The profile, or null when none was found.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override TrackingProfile GetProfile(Type workflowType, Version profileVersion)
{
    if ( null != workflowType )
    {
        // wantToCreateDefault = false: this looks for a specific version that has already
        // been running with this instance, so a default must not be used here.
        return GetProfileByScheduleType(workflowType, profileVersion, false);
    }

    throw new ArgumentNullException( "workflowType" );
}
/// <summary>
/// Tries to get the tracking profile for a workflow type without asking for a specific
/// version; UseDefaultProfile is passed on as the create-default flag.
/// </summary>
/// <param name="workflowType">Workflow type whose profile is requested.</param>
/// <param name="profile">Receives the profile, or null when none was found.</param>
/// <returns>true when a profile was found; otherwise false.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override bool TryGetProfile(Type workflowType, out TrackingProfile profile)
{
    if ( null == workflowType )
    {
        throw new ArgumentNullException( "workflowType" );
    }

    profile = GetProfileByScheduleType(workflowType, SqlTrackingService.UnknownProfileVersionId, _defaultProfile);
    return null != profile;
}
/// <summary>
/// Gets the tracking profile stored for a workflow instance.
/// </summary>
/// <param name="scheduleInstanceId">Id of the workflow instance.</param>
/// <returns>The profile, or null when the instance has none.</returns>
protected internal override TrackingProfile GetProfile(Guid scheduleInstanceId)
{
    TrackingProfile instanceProfile = null;

    // The found/not-found result is not needed here; a missing profile is returned as null.
    GetProfile( scheduleInstanceId, out instanceProfile );
    return instanceProfile;
}
/// <summary>
/// Runs [dbo].[GetInstanceTrackingProfile] for an instance and deserializes the profile
/// found in the first column of the first row.
/// </summary>
/// <param name="scheduleInstanceId">Id of the workflow instance.</param>
/// <param name="profile">
/// Receives the deserialized profile; null when no row was returned or the column is NULL.
/// </param>
/// <returns>
/// true when a row was read (even if its profile column is NULL); false when there was no row.
/// </returns>
private bool GetProfile( Guid scheduleInstanceId, out TrackingProfile profile )
{
    profile = null;

    DbCommand command = this._dbResourceAllocator.NewCommand();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[GetInstanceTrackingProfile]";
    command.Parameters.Add(this._dbResourceAllocator.NewDbParameter( "@InstanceId", scheduleInstanceId ) );

    DbDataReader rows = null;
    try
    {
        rows = ExecuteReaderRetried( command, CommandBehavior.CloseConnection );

        // No row means there is no specific profile for this instance.
        // The reader is closed by the finally block below.
        if ( !rows.HasRows || !rows.Read() )
        {
            return false;
        }

        if ( !rows.IsDBNull( 0 ) )
        {
            string profileXml = rows.GetString( 0 );
            TrackingProfileSerializer serializer = new TrackingProfileSerializer();
            using ( StringReader profileText = new StringReader( profileXml ) )
            {
                profile = serializer.Deserialize( profileText );
            }
        }

        return true;
    }
    finally
    {
        if ( ( null != rows ) && ( !rows.IsClosed ) )
        {
            rows.Close();
        }

        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
        {
            command.Connection.Close();
        }
    }
}
/// <summary>
/// Tries to reload the tracking profile stored for a workflow instance.
/// </summary>
/// <param name="workflowType">Workflow type; only checked for null.</param>
/// <param name="scheduleInstanceId">Id of the workflow instance.</param>
/// <param name="profile">Receives the profile; null when the lookup found nothing.</param>
/// <returns>true when the instance lookup returned a row; otherwise false.</returns>
/// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
protected internal override bool TryReloadProfile(Type workflowType, Guid scheduleInstanceId, out TrackingProfile profile)
{
    if ( null == workflowType )
    {
        throw new ArgumentNullException( "workflowType" );
    }

    if ( GetProfile( scheduleInstanceId, out profile ) )
    {
        return true;
    }

    profile = null;
    return false;
}
#endregion
#region Profile Management Methods
/// <summary>
/// Timer callback that runs GetUpdatedTrackingProfiles and raises ProfileUpdated or
/// ProfileRemoved for every returned row whose workflow type can be resolved. The timer is
/// always restarted when the check ends, because AutoReset is off.
/// </summary>
/// <param name="sender">Timer that raised the event (unused).</param>
/// <param name="e">Elapsed event data (unused).</param>
private void CheckProfileChanges( object sender, ElapsedEventArgs e )
{
    DbCommand cmd = null;
    DbDataReader reader = null;
    try
    {
        if ( ( null == ProfileUpdated ) && ( null == ProfileRemoved ) )
            return; // no one to notify

        Debug.WriteLine( "Checking for updated profiles..." );
        cmd = this._dbResourceAllocator.NewCommand();
        cmd.CommandText = "GetUpdatedTrackingProfiles";
        cmd.CommandType = CommandType.StoredProcedure;
        cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@LastCheckDateTime", _lastProfileCheck ) );

        // Output parameter through which the procedure reports the new check time.
        DbParameter param = this._dbResourceAllocator.NewDbParameter();
        param.ParameterName = "@MaxCheckDateTime";
        param.DbType = DbType.DateTime;
        param.Direction = System.Data.ParameterDirection.Output;
        cmd.Parameters.Add( param );

        reader = ExecuteReaderRetried( cmd, CommandBehavior.CloseConnection );
        //
        // No changes
        if ( !reader.HasRows )
            return;

        while ( reader.Read() )
        {
            Type t = null;
            string tmp = null;
            TrackingProfile profile = null;

            // Column 0 is the type name, column 1 the assembly name, column 2 the profile text.
            t = Assembly.Load( reader[1] as string ).GetType( reader[0] as string );
            if ( null == t )
                continue;

            tmp = reader[2] as string;
            if ( null == tmp )
            {
                // No profile text means the profile was removed.
                if ( null != ProfileRemoved )
                    ProfileRemoved( this, new ProfileRemovedEventArgs( t ) );
            }
            else
            {
                TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                StringReader pReader = null;
                try
                {
                    pReader = new StringReader( tmp );
                    profile = serializer.Deserialize( pReader );
                }
                finally
                {
                    if ( null != pReader )
                        pReader.Close();
                }
                if ( null != ProfileUpdated )
                    ProfileUpdated( this, new ProfileUpdatedEventArgs( t, profile ) );
            }
            Debug.WriteLine( ExecutionStringManager.UpdatedProfile + t.FullName );
        }
    }
    finally
    {
        try
        {
            // The reader is closed before the output parameter is read.
            if ((null != reader) && (!reader.IsClosed))
                reader.Close();

            if ( null != cmd )
            {
                // Only take the new check time when the output parameter was actually added
                // and holds a DateTime. A missing parameter, null or DBNull means the check
                // failed part-way, so keep the same last time for the next check.
                if ( cmd.Parameters.Count > 1 )
                {
                    object maxCheckDateTime = cmd.Parameters[1].Value;
                    if ( maxCheckDateTime is DateTime )
                        _lastProfileCheck = ( DateTime ) maxCheckDateTime;
                }

                if ((null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State))
                    cmd.Connection.Close();
            }
        }
        finally
        {
            //
            // Start the timer again (autoreset is false to avoid multiple threads checking for profile changes).
            // This runs even if the cleanup above throws; otherwise polling would stop for good.
            _timer.Start();
        }
    }
}
#endregion
#region Private Methods
/// <summary>
/// Invokes a delegate, repeating the call for as long as DbRetry.TryDoRetry allows after a
/// failure. Start, end, error and retry events are traced.
/// </summary>
/// <param name="executeRetried">Operation to run.</param>
/// <param name="param">Argument passed to the operation.</param>
private void ExecuteRetried(ExecuteRetriedDelegate executeRetried, object param)
{
    DbRetry retryPolicy = new DbRetry(_enableRetries);
    short attempts = 0;

    for (;;)
    {
        try
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            executeRetried(param);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            return;
        }
        catch (Exception error)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteRetried caught exception: " + error.ToString());

            // Rethrow the original exception once the retry policy declines another attempt.
            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " retrying.");
        }
    }
}
/// <summary>
/// Executes a command as a reader, repeating the attempt for as long as DbRetry.TryDoRetry
/// allows after a failure. The command's connection is reset before every attempt.
/// </summary>
/// <param name="command">Command to execute.</param>
/// <param name="behavior">Behavior passed to ExecuteReader.</param>
/// <returns>The reader produced by the successful attempt.</returns>
private DbDataReader ExecuteReaderRetried(DbCommand command, CommandBehavior behavior)
{
    DbRetry retryPolicy = new DbRetry(_enableRetries);
    short attempts = 0;

    for (;;)
    {
        try
        {
            ResetConnectionForCommand(command);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            DbDataReader result = command.ExecuteReader(behavior);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            return result;
        }
        catch (Exception error)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteReaderRetried caught exception from ExecuteReader: " + error.ToString());

            // Rethrow the original exception once the retry policy declines another attempt.
            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried retrying.");
        }
    }
}
/// <summary>
/// Executes a non-query command, repeating the attempt for as long as DbRetry.TryDoRetry
/// allows after a failure. The command's connection is reset before every attempt.
/// </summary>
/// <param name="command">Command to execute.</param>
private void ExecuteNonQueryRetried(DbCommand command)
{
    DbRetry retryPolicy = new DbRetry(_enableRetries);
    short attempts = 0;

    for (;;)
    {
        try
        {
            ResetConnectionForCommand(command);
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G",System.Globalization.CultureInfo.InvariantCulture));
            command.ExecuteNonQuery();
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
            return;
        }
        catch (Exception error)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryRetried caught exception from ExecuteNonQuery: " + error.ToString());

            // Rethrow the original exception once the retry policy declines another attempt.
            if (!retryPolicy.TryDoRetry(ref attempts))
            {
                throw;
            }

            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried retrying.");
        }
    }
}
/// <summary>
/// Executes a non-query command inside a local transaction that is begun on the command's
/// connection and committed on success. A failed attempt is rolled back and repeated for as
/// long as DbRetry.TryDoRetry allows. The connection is closed when the method ends.
/// </summary>
/// <param name="command">Command to execute.</param>
private void ExecuteNonQueryWithTxRetried(DbCommand command)
{
    try
    {
        short attempts = 0;
        DbRetry retryPolicy = new DbRetry(_enableRetries);

        for (;;)
        {
            try
            {
                ResetConnectionForCommand(command);
                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                command.Transaction = command.Connection.BeginTransaction();
                command.ExecuteNonQuery();
                command.Transaction.Commit();
                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                return;
            }
            catch (Exception error)
            {
                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried caught exception from ExecuteNonQuery: " + error.ToString());

                try
                {
                    if (null != command.Transaction)
                    {
                        command.Transaction.Rollback();
                    }
                }
                catch
                {
                    // Rollback() can throw; the failure is deliberately ignored so that
                    // the original exception is not lost.
                }

                // Rethrow the original exception once the retry policy declines another attempt.
                if (!retryPolicy.TryDoRetry(ref attempts))
                {
                    throw;
                }

                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried retrying.");
            }
        }
    }
    finally
    {
        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
        {
            command.Connection.Close();
        }
    }
}
/// <summary>
/// Makes sure a command has an open connection: a connection that is not in the Open state
/// is closed (unless already closed), disposed and replaced with a new connection from
/// DbResourceAllocator.OpenNewConnectionNoEnlist. A null command or a command without a
/// connection is left untouched.
/// </summary>
/// <param name="command">Command whose connection is checked; may be null.</param>
private void ResetConnectionForCommand(DbCommand command)
{
    if ( ( null == command ) || ( null == command.Connection ) )
    {
        return;
    }

    if (ConnectionState.Open == command.Connection.State)
    {
        return;
    }

    // Any state other than Closed still needs an explicit Close before disposal.
    if (ConnectionState.Closed != command.Connection.State)
    {
        command.Connection.Close();
    }

    command.Connection.Dispose();
    command.Connection = _dbResourceAllocator.OpenNewConnectionNoEnlist();
}
/// <summary>
/// Creates an XmlWriter over a TextWriter that indents with tabs, omits the XML declaration
/// and closes the underlying writer when it is closed.
/// </summary>
/// <param name="output">Destination text writer.</param>
/// <returns>The configured writer.</returns>
internal static XmlWriter CreateXmlWriter(TextWriter output)
{
    XmlWriterSettings writerSettings = new XmlWriterSettings();
    writerSettings.CloseOutput = true;
    writerSettings.OmitXmlDeclaration = true;
    writerSettings.IndentChars = "\t";
    writerSettings.Indent = true;

    return XmlWriter.Create(output, writerSettings);
}
/// <summary>
/// Runs dbo.GetTrackingProfile for a workflow type and deserializes the profile text found
/// in the first column of the first row.
/// </summary>
/// <param name="workflowType">Workflow type whose profile is requested.</param>
/// <param name="profileVersionId">
/// Version to load; when equal to UnknownProfileVersionId no @Version parameter is sent.
/// </param>
/// <param name="wantToCreateDefault">Value sent as @CreateDefault.</param>
/// <returns>The profile, or null when no row or no profile text was returned.</returns>
private TrackingProfile GetProfileByScheduleType(Type workflowType, Version profileVersionId, bool wantToCreateDefault)
{
    DbCommand command = this._dbResourceAllocator.NewCommand();
    DbDataReader rows = null;
    TrackingProfile result = null;

    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "dbo.GetTrackingProfile";
    command.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@TypeFullName", workflowType.FullName) );
    command.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@AssemblyFullName", workflowType.Assembly.FullName) );
    if (profileVersionId != SqlTrackingService.UnknownProfileVersionId)
    {
        command.Parameters.Add(this._dbResourceAllocator.NewDbParameter( "@Version", profileVersionId.ToString() ) );
    }
    command.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@CreateDefault", wantToCreateDefault) );

    try
    {
        rows = ExecuteReaderRetried(command, CommandBehavior.CloseConnection);
        if ( rows.Read() )
        {
            // A value that is not a string (for example DBNull) leaves the result null.
            string profileXml = rows[0] as string;
            if ( null != profileXml )
            {
                TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                using ( StringReader profileText = new StringReader( profileXml ) )
                {
                    result = serializer.Deserialize( profileText );
                }
            }
        }
    }
    finally
    {
        if ((null != rows) && (!rows.IsClosed))
        {
            rows.Close();
        }

        if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
        {
            command.Connection.Close();
        }
    }

    return result;
}
#endregion
#region Private Classes
// Collection of Type objects keyed by their assembly-qualified name.
private class TypeKeyedCollection : KeyedCollection
{
// The key for an item is its AssemblyQualifiedName.
protected override string GetKeyForItem( Type item )
{
return item.AssemblyQualifiedName;
}
}
// A TrackingDataItem extended with the serialized form of its data.
private class SerializedDataItem : TrackingDataItem
{
// Runtime type of the data (from GetType()).
public Type Type;
// String form of the data (from ToString()).
public string StringData;
// Binary-serialized data; null when serialization failed.
public byte[] SerializedData;
// True when serializing the data raised a SerializationException.
public bool NonSerializable;
}
// Event args holder used in place of the original event args when a record is serialized.
private class SerializedEventArgs : EventArgs
{
// NOTE(review): presumably the runtime type of the original event args; the assignment
// is not visible in this part of the file.
public Type Type;
// NOTE(review): presumably the binary-serialized original event args; confirm at the
// assignment site.
public byte[] SerializedArgs;
}
// Describes one activity added by a workflow change.
private struct AddedActivity
{
// Full name of the activity's type.
public string ActivityTypeFullName;
// Full name of the assembly that contains the activity's type.
public string ActivityTypeAssemblyFullName;
// Qualified name of the activity.
public string QualifiedName;
// Qualified name of the parent activity; left unset when there is no parent.
public string ParentQualifiedName;
// Markup of the change action; only set for the activity named by the action itself.
public string AddedActivityActionXoml;
// Order supplied by the caller, or -1 for activities contained in the added activity.
public int Order;
}
// Describes one activity removed by a workflow change.
private struct RemovedActivity
{
// Qualified name of the activity.
public string QualifiedName;
// Qualified name of the parent activity; left unset when there is no parent.
public string ParentQualifiedName;
// Markup of the change action; only set for the activity named by the action itself.
public string RemovedActivityActionXoml;
// Order supplied by the caller, or -1 for activities contained in the removed activity.
public int Order;
}
// Serialized event args for a workflow Changed event, with the added and removed activities.
private class SerializedWorkflowChangedEventArgs : SerializedEventArgs
{
// Entries describing added activities.
public IList AddedActivities = new List();
// Entries describing removed activities.
public IList RemovedActivities = new List();
}
#endregion Private Classes
internal class SqlTrackingChannel : TrackingChannel, IPendingWork
{
#region Private Members
// Service that created this channel.
private SqlTrackingService _service = null;
// Dot-separated keys built by GetCallPathKeys from TrackingParameters.CallPath.
private string _callPathKey = null, _parentCallPathKey = null;
// Copied from the service's IsTransactional when the channel is constructed.
private bool _isTrans = false;
// Sent as @WorkflowInstanceInternalId to the stored procedures.
private long _internalId = -1;
// NOTE(review): presumably the internal id held during a batch until the commit
// completes; its use is not visible in this part of the file.
private long _tmpInternalId = -1;
// Cache of committed activity instance ids.
private Dictionary _activityInstanceId = new Dictionary(32);
// Temp ids generated during a batch commit; only used when transactional.
private Dictionary _tmpActivityInstanceId = new Dictionary(10);
// Tracking parameters supplied to the constructor.
private TrackingParameters _parameters = null;
// Set by InstanceCompletedOrTerminated when transactional and PartitionOnCompletion is on.
private bool _pendingArchive = false;
// Set by InstanceCompletedOrTerminated when transactional.
private bool _completedTerminated = false;
// NOTE(review): batch sizes, presumably the number of rows sent per stored procedure
// call; their use is not visible in this part of the file.
private static int _activityEventBatchSize = 5;
private static int _dataItemBatchSize = 5;
private static int _dataItemAnnotationBatchSize = 5;
private static int _eventAnnotationBatchSize = 5;
#endregion
#region Construction
/// <summary>
/// Parameterless constructor; performs no initialization.
/// </summary>
protected SqlTrackingChannel()
{
    // Intentionally empty.
}
/// <summary>
/// Creates a channel for one workflow instance. When the service is not transactional the
/// workflow instance row is looked up or inserted straight away.
/// </summary>
/// <param name="parameters">Tracking parameters for the instance.</param>
/// <param name="service">Owning service; when null the channel is left uninitialized.</param>
public SqlTrackingChannel( TrackingParameters parameters, SqlTrackingService service )
{
    if ( null != service )
    {
        _service = service;
        _parameters = parameters;
        _isTrans = service.IsTransactional;
        GetCallPathKeys( parameters.CallPath );

        // Look up instance id or insert if new instance.
        // If we're transactional this happens in the first IPendingWork.Commit() instead.
        if (!_isTrans)
        {
            _service.ExecuteRetried(ExecuteInsertWorkflowInstance, null);
        }
    }
}
#endregion
#region Public Properties
/// <summary>
/// Gets the owning service's DbResourceAllocator.
/// </summary>
private DbResourceAllocator DbResourceAllocator
{
    get
    {
        return this._service.DbResourceAllocator;
    }
}
/// <summary>
/// Gets the commit work batch service held by the owning service.
/// </summary>
private WorkflowCommitWorkBatchService WorkflowCommitWorkBatchService
{
    get
    {
        return this._service._transactionService;
    }
}
#endregion
#region TrackingChannel
/// <summary>
/// Records that the tracked instance has completed or terminated.
/// In non-transactional mode the end date is written (and the instance optionally partitioned) immediately;
/// in transactional mode flags are set so the work happens with the next batch commit.
/// </summary>
protected internal override void InstanceCompletedOrTerminated()
{
    if (!_isTrans)
    {
        // No batch to wait for - write straight to the database.
        _service.ExecuteRetried(ExecuteSetEndDate, null);

        if (_service.PartitionOnCompletion)
        {
            _service.ExecuteRetried(PartitionInstance, null);
        }

        return;
    }

    // The next batch commit stamps the end date ...
    _completedTerminated = true;

    // ... and, once that commit has completed successfully, the instance is partitioned.
    if (_service.PartitionOnCompletion)
    {
        _pendingArchive = true;
    }
}
/// <summary>
/// Runs the [dbo].[PartitionWorkflowInstance] stored procedure for this channel's workflow instance.
/// </summary>
/// <param name="param">Unused; present so the method matches the retry delegate signature.</param>
private void PartitionInstance(object param)
{
    // Track the connection itself (not just the command) so it is closed even when
    // creating or configuring the command fails.
    DbConnection connection = null;
    try
    {
        //
        // Allow enlisting if there is an ambient tx
        // This can only happen on a host initiated terminate in V1.
        connection = DbResourceAllocator.OpenNewConnection(false);

        DbCommand command = DbResourceAllocator.NewCommand(connection);
        command.CommandText = "[dbo].[PartitionWorkflowInstance]";
        command.CommandType = CommandType.StoredProcedure;
        command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", _internalId));
        command.ExecuteNonQuery();
    }
    finally
    {
        if ((null != connection) && (ConnectionState.Closed != connection.State))
        {
            connection.Close();
        }
    }
}
/// <summary>
/// Opens a connection and stamps the end date for this channel's workflow instance.
/// </summary>
/// <param name="param">Unused; present so the method matches the retry delegate signature.</param>
private void ExecuteSetEndDate(object param)
{
    // Track the connection itself (not just the command) so it is closed even when
    // creating the command fails.
    DbConnection connection = null;
    try
    {
        //
        // Allow enlisting if there is an ambient tx
        // This can only happen on a host initiated terminate in V1.
        connection = DbResourceAllocator.OpenNewConnection(false);

        DbCommand command = DbResourceAllocator.NewCommand(connection);
        ExecuteSetEndDate(_internalId, command);
    }
    finally
    {
        if ((null != connection) && (ConnectionState.Closed != connection.State))
        {
            connection.Close();
        }
    }
}
/// <summary>
/// Runs [dbo].[SetWorkflowInstanceEndDateTime] on the supplied command, using the current UTC time as the end date.
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance.</param>
/// <param name="command">Command to reuse; its parameters, text and type are overwritten.</param>
/// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
private void ExecuteSetEndDate(long internalId, DbCommand command)
{
    if (command == null)
    {
        throw new ArgumentNullException("command");
    }

    // The command may have been used for another procedure - start from a clean parameter list.
    DbParameterCollection sprocArgs = command.Parameters;
    sprocArgs.Clear();

    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[SetWorkflowInstanceEndDateTime]";

    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@EndDateTime", DateTime.UtcNow));

    command.ExecuteNonQuery();
}
/// <summary>
/// Accepts one tracking record. In transactional mode the serialized record is added to the current work batch;
/// otherwise it is written to the database immediately through the service's retry helper.
/// Records that are not activity, workflow or user tracking records are ignored.
/// </summary>
/// <param name="record">The record to send.</param>
/// <exception cref="ArgumentException">The instance id is empty or <paramref name="record"/> is null.</exception>
protected internal override void Send(TrackingRecord record)
{
    if ((Guid.Empty == _parameters.InstanceId) || (null == record))
    {
        throw new ArgumentException(ExecutionStringManager.MissingParametersTrack);
    }

    ActivityTrackingRecord activityRecord = record as ActivityTrackingRecord;
    if (null != activityRecord)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(activityRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertActivityStatusInstance, SerializeRecord(activityRecord));
        }

        return;
    }

    WorkflowTrackingRecord workflowRecord = record as WorkflowTrackingRecord;
    if (null != workflowRecord)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(workflowRecord));
        }
        else if (TrackingWorkflowEvent.Changed == workflowRecord.TrackingWorkflowEvent)
        {
            // Dynamic updates go to the WorkflowInstanceEvent table and the event arg
            // (the workflow changes) is normalized into xoml and the added/removed activities tables.
            _service.ExecuteRetried(ExecuteInsertWorkflowChange, SerializeRecord(workflowRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertWorkflowInstanceEvent, SerializeRecord(workflowRecord));
        }

        return;
    }

    UserTrackingRecord userRecord = record as UserTrackingRecord;
    if (null != userRecord)
    {
        if (_isTrans)
        {
            WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(userRecord));
        }
        else
        {
            _service.ExecuteRetried(ExecuteInsertUserEvent, SerializeRecord(userRecord));
        }
    }
}
#endregion
#region IPendingWork Members
/// <summary>
/// Reports whether the pending items require an immediate commit. Always returns false.
/// </summary>
/// <param name="items">The pending work items; not inspected.</param>
/// <returns>Always false.</returns>
public bool MustCommit( ICollection items )
{
//
// Never force a persist - this is a balancing act but the V1
// decision is to err on the side of persisting only when the workflow
// requires it based on its model. If the workflow uses persistence points
// wisely this is great. If it goes a long time between persists with lots
// of events the persists will take a long time as the batch can be huge.
return false;
}
/// <summary>
/// Writes a batch of tracking records to the database inside a transaction.
/// Activity records are accumulated and sent in groups of up to _activityEventBatchSize, consecutive
/// non-Changed workflow records are sent in pairs, and user records and Changed workflow records are
/// sent one at a time. Pending records of another kind are always flushed first so the original order is kept.
/// </summary>
/// <param name="transaction">The transaction the batch belongs to; used to obtain the connection and local transaction.</param>
/// <param name="items">The batched items; entries that are not TrackingRecord instances are skipped.</param>
public void Commit(System.Transactions.Transaction transaction, ICollection items)
{
// Nothing to write.
if ((null == items) || (0 == items.Count))
return;
DbCommand command = null;
DbConnection connection = null;
bool needToCloseConnection = false;
DbTransaction localTransaction = null;
// True only when this method started the local transaction itself and therefore must commit / roll it back.
bool commitTx = false;
try
{
//
// Get the connection and transaction
// The connection might be shared or local
// The tx is shared and may be either a DTC or a local sql tx
connection = DbResourceAllocator.GetEnlistedConnection(
this.WorkflowCommitWorkBatchService, transaction, out needToCloseConnection);
localTransaction = DbResourceAllocator.GetLocalTransaction(
this.WorkflowCommitWorkBatchService, transaction);
// No shared local transaction was supplied - start our own and remember that we own it.
if (null == localTransaction)
{
localTransaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
commitTx = true;
}
command = DbResourceAllocator.NewCommand(connection);
command.Transaction = localTransaction;
//
// If we don't have the internal id for the instance this is the first batch
// for this channel instance. If this is a new instance the following will insert
// a new instance record in the db and set _tmpInternalId. If this is a reload of
// an existing instance it will just do a lookup and set _tmpInternalId
// In Completed we will assign _tmpInternalId to _internalId if the batch is successful.
long internalId = -1;
if (_internalId <= 0)
{
ExecuteInsertWorkflowInstance(command);
internalId = _tmpInternalId;
}
else
internalId = _internalId;
// Activity records waiting to be sent as one multi-row call.
IList activities = new List(5);
// At most one held workflow record, waiting to be paired with the next non-Changed workflow record.
WorkflowTrackingRecord workflow = null;
//
// Build the batch statement
foreach (object o in items)
{
if (!(o is TrackingRecord))
continue;
if (o is ActivityTrackingRecord)
{
//
// If we have a cached workflow tracking record send it
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ActivityTrackingRecord activity = (ActivityTrackingRecord)o;
//
// Add this event to the list and send to the db if we've hit our limit
activities.Add(activity);
if (_activityEventBatchSize == activities.Count)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities = new List(5);
}
}
else if (o is UserTrackingRecord)
{
//
// If we have cached activity or workflow tracking records send them
if (activities.Count > 0)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities.Clear();
}
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ExecuteInsertUserEvent(internalId, (UserTrackingRecord)o, command);
}
else if (o is WorkflowTrackingRecord)
{
//
// If we have cached activity tracking records send them
if (activities.Count > 0)
{
ExecuteInsertActivityStatusInstance(internalId, activities, command);
activities.Clear();
}
WorkflowTrackingRecord record = (WorkflowTrackingRecord)o;
if (TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent)
{
//
// A Changed event is never paired or held: send any held workflow
// tracking record on its own first, then write the change by itself
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
ExecuteInsertWorkflowChange(internalId, record, command);
}
else
{
//
// If we're already holding a workflow tracking record send both to the db
// else cache it and wait for the next workflow tracking record
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, record, command);
workflow = null;
}
else
{
workflow = record;
}
}
}
}
//
// If we ended up with any activities event send them.
if (activities.Count > 0)
ExecuteInsertActivityStatusInstance(internalId, activities, command);
// Likewise send a workflow record that never found a partner.
if (null != workflow)
{
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
workflow = null;
}
// The instance completed or terminated since the last commit - stamp its end date in the same transaction.
if (_completedTerminated)
ExecuteSetEndDate(internalId, command);
if (commitTx)
localTransaction.Commit();
}
// Only DbException is handled here; any other exception type propagates without the explicit rollback below.
catch (DbException e)
{
if (commitTx)
localTransaction.Rollback();
WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Error writing tracking data to database: " + e);
throw;
}
finally
{
// Dispose the connection only when GetEnlistedConnection reported that the caller owns it.
if (needToCloseConnection)
{
connection.Dispose();
}
}
return;
}
/// <summary>
/// Called after a batch commit. On failure all per-batch state is discarded; on success the temporary
/// instance id and activity ids are promoted to the permanent members and, if requested, the instance is partitioned.
/// </summary>
/// <param name="succeeded">True when the batch commit succeeded.</param>
/// <param name="items">The items that were part of the batch; not inspected.</param>
public void Complete(bool succeeded, ICollection items)
{
    if (succeeded)
    {
        // Promote the instance id obtained during the batch, if we did not have one yet.
        if (-1 == _internalId && _tmpInternalId > 0)
        {
            _internalId = _tmpInternalId;
        }

        // Promote the activity instance ids gathered during the batch; existing entries win.
        if (null != _tmpActivityInstanceId && _tmpActivityInstanceId.Count > 0)
        {
            foreach (string pendingKey in _tmpActivityInstanceId.Keys)
            {
                if (_activityInstanceId.ContainsKey(pendingKey))
                {
                    continue;
                }

                _activityInstanceId.Add(pendingKey, _tmpActivityInstanceId[pendingKey]);
            }

            _tmpActivityInstanceId.Clear();
        }

        if (_pendingArchive)
        {
            try
            {
                _service.ExecuteRetried(PartitionInstance, null);
            }
            catch (Exception partitionError)
            {
                // Trace and carry on - a partitioning failure must not fail the instance.
                // Partition logic can be re-run to clean up on failure.
                WorkflowTrace.Host.TraceEvent(
                    TraceEventType.Error,
                    0,
                    string.Format(CultureInfo.InvariantCulture, "Error partitioning instance {0}: {1}", _parameters.InstanceId, partitionError.ToString()));
            }
        }
    }
    else
    {
        // The commit failed - forget everything that was staged for this batch.
        _completedTerminated = false;
        _pendingArchive = false;
        _tmpInternalId = -1;
        _tmpActivityInstanceId.Clear();
    }
}
#endregion
#region Sql Commands - InsertWorkflowInstance
/// <summary>
/// Non-transactional entry point: opens its own connection and transaction, looks up or inserts the
/// workflow instance record and stores the resulting internal id in _internalId.
/// </summary>
/// <param name="param">Unused; present so the method matches the retry delegate signature.</param>
private void ExecuteInsertWorkflowInstance( object param )
{
    DbConnection conn = DbResourceAllocator.OpenNewConnection();
    DbTransaction tx = null;
    try
    {
        // Create the command inside the try block so the connection is closed
        // by the finally block even if command creation fails.
        DbCommand command = DbResourceAllocator.NewCommand(conn);
        tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
        command.Connection = conn;
        command.Transaction = tx;
        _internalId = ExecuteInsertWorkflowInstance( command );
        tx.Commit();
    }
    catch (Exception)
    {
        try
        {
            if ( null != tx )
                tx.Rollback();
        }
        catch (Exception)
        {
            //
            // Rollback can throw - ignore these exceptions
            // so we don't lose the original exception
        }
        //
        // Re-throw original exception
        throw;
    }
    finally
    {
        if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) )
            conn.Close();
    }
}
/// <summary>
/// Writes the workflow definition (if needed) and the instance record using the supplied command,
/// and stores the internal id returned by the database in _tmpInternalId.
/// </summary>
/// <param name="command">A command whose connection is open.</param>
/// <returns>The value of _tmpInternalId after the call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
/// <exception cref="ArgumentException">The command has no connection or the connection is not open.</exception>
private long ExecuteInsertWorkflowInstance( DbCommand command )
{
    if (command == null)
    {
        throw new ArgumentNullException("command");
    }

    DbConnection owner = command.Connection;
    if (owner == null || owner.State != ConnectionState.Open)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidCommandBadConnection, "command");
    }

    // Write the type and the workflow definition. A workflow that carries xaml markup
    // is stored per instance, which is signalled by passing a null type.
    string markup = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string;
    Type typeToRecord = string.IsNullOrEmpty(markup) ? _parameters.WorkflowType : null;
    InsertWorkflow(command, _parameters.InstanceId, typeToRecord, _parameters.RootActivity);

    // Write the instance record; the first column of the first row is the internal id.
    BuildInsertWorkflowInstanceParameters(command);
    using (DbDataReader reader = command.ExecuteReader())
    {
        if (reader.Read())
        {
            _tmpInternalId = reader.GetInt64(0);
        }

        return _tmpInternalId;
    }
}
/// <summary>
/// Configures the command to call [dbo].[InsertWorkflowInstance] for this channel's instance.
/// For workflows that carry xaml markup the instance id is used in place of the type and assembly names.
/// </summary>
/// <param name="command">Command to configure; its existing parameters are discarded.</param>
private void BuildInsertWorkflowInstanceParameters( DbCommand command )
{
    Debug.Assert((command != null), "Null command");

    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertWorkflowInstance]";

    DbParameterCollection sprocArgs = command.Parameters;
    sprocArgs.Clear();

    string markup = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string;
    bool definedByMarkup = !string.IsNullOrEmpty(markup);

    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId));

    if (definedByMarkup)
    {
        sprocArgs.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", _parameters.InstanceId.ToString()));
        sprocArgs.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", _parameters.InstanceId.ToString()));
    }
    else
    {
        sprocArgs.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", _parameters.WorkflowType.FullName));
        sprocArgs.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", _parameters.WorkflowType.Assembly.FullName));
    }

    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@ContextGuid", _parameters.ContextGuid));

    // Caller information is only sent when a caller instance id is present.
    if (Guid.Empty != _parameters.CallerInstanceId)
    {
        sprocArgs.Add(DbResourceAllocator.NewDbParameter("@CallerInstanceId", _parameters.CallerInstanceId));
        sprocArgs.Add(DbResourceAllocator.NewDbParameter("@CallPath", _callPathKey));
        sprocArgs.Add(DbResourceAllocator.NewDbParameter("@CallerContextGuid", _parameters.CallerContextGuid));
        sprocArgs.Add(DbResourceAllocator.NewDbParameter("@CallerParentContextGuid", _parameters.CallerParentContextGuid));
    }

    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@EventDateTime", this.GetSqlDateTimeString(DateTime.UtcNow)));
}
/// <summary>
/// Stores the workflow definition through [dbo].[InsertWorkflow]. Typed workflows already present in the
/// service's type cache are skipped; after a successful write the type is added to that cache.
/// </summary>
/// <param name="command">Command to use; its parameters, text and type are overwritten.</param>
/// <param name="workflowInstanceId">Instance id, used as the type/assembly name when <paramref name="workflowType"/> is null.</param>
/// <param name="workflowType">The workflow type, or null for a workflow stored per instance.</param>
/// <param name="rootActivity">Root activity; serialized to produce the definition and the activities xml.</param>
private void InsertWorkflow( DbCommand command, Guid workflowInstanceId, Type workflowType, Activity rootActivity)
{
    bool isInstanceType = (null == workflowType);
    string definition = null;

    // Both the cache check and the definition serialization run under the type cache lock.
    lock (_service._typeCacheLock)
    {
        // A type we have already seen needs no further work.
        if (!isInstanceType && _service._types.Contains(workflowType.AssemblyQualifiedName))
        {
            return;
        }

        definition = GetXomlDocument(rootActivity);
    }

    // Two threads may get here for the same type, but the pk specifies ignore duplicate key.
    // This is better than taking a lock around all of the logic in this method.
    command.Parameters.Clear();
    command.CommandText = "[dbo].[InsertWorkflow]";
    command.CommandType = CommandType.StoredProcedure;

    string typeName = isInstanceType ? workflowInstanceId.ToString() : workflowType.FullName;
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeName));

    string assemblyName = isInstanceType ? workflowInstanceId.ToString() : workflowType.Assembly.FullName;
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyName));

    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@IsInstanceType", isInstanceType));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowDefinition", definition));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowId", DbType.Int32, ParameterDirection.Output));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Exists", DbType.Boolean, ParameterDirection.Output));
    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Activities", GetActivitiesXml((CompositeActivity)rootActivity)));
    command.ExecuteNonQuery();

    // Per-instance definitions are never cached.
    if (isInstanceType)
    {
        return;
    }

    // Remember the type so the serialization and database round trip are skipped next time.
    // The lock protects the shared _types collection.
    lock (_service._typeCacheLock)
    {
        if (!_service._types.Contains(workflowType.AssemblyQualifiedName))
        {
            _service._types.Add(workflowType);
        }
    }
}
#endregion
#region Sql Commands - InsertWorkflowInstanceEvent
/// <summary>
/// Non-transactional entry point: opens its own connection and transaction and writes one workflow instance event.
/// </summary>
/// <param name="param">The record to write; must be a WorkflowTrackingRecord.</param>
/// <exception cref="ArgumentException"><paramref name="param"/> is not a WorkflowTrackingRecord.</exception>
private void ExecuteInsertWorkflowInstanceEvent( object param )
{
    WorkflowTrackingRecord record = param as WorkflowTrackingRecord;
    if (null == record)
        throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param");

    DbConnection conn = DbResourceAllocator.OpenNewConnection();
    DbTransaction tx = null;
    try
    {
        // Create the command inside the try block so the connection is closed
        // by the finally block even if command creation fails.
        DbCommand command = DbResourceAllocator.NewCommand( conn );
        tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
        command.Connection = conn;
        command.Transaction = tx;
        ExecuteInsertWorkflowInstanceEvent( _internalId, record, null, command );
        tx.Commit();
    }
    catch (Exception)
    {
        try
        {
            if ( null != tx )
                tx.Rollback();
        }
        catch (Exception)
        {
            //
            // Rollback can throw - ignore these exceptions
            // so we don't lose the original exception
        }
        //
        // Re-throw original exception
        throw;
    }
    finally
    {
        if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) )
            conn.Close();
    }
}
/// <summary>
/// Writes one or two workflow instance events with a single stored procedure call and then writes
/// the annotations of both records, keyed by the event ids returned in the output parameters.
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance.</param>
/// <param name="record1">First record; required.</param>
/// <param name="record2">Optional second record; may be null.</param>
/// <param name="command">Command with an open connection.</param>
/// <exception cref="ArgumentException">The command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertWorkflowInstanceEvent( long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command )
{
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentException();
BuildInsertWorkflowInstanceEventParameters( internalId, record1, record2, command );
command.ExecuteNonQuery();
// Read back the ids the procedure assigned to the new event rows.
long eventId1 = (long) command.Parameters["@WorkflowInstanceEventId1"].Value;
Debug.Assert(eventId1 > 0, "Invalid eventId1");
long eventId2 = -1;
if (null != record2)
{
eventId2 = (long)command.Parameters["@WorkflowInstanceEventId2"].Value;
Debug.Assert(eventId2 > 0, "Invalid eventId2");
}
// Collect (event id, annotation) pairs for both records and send them together.
// NOTE(review): generic type arguments are missing in this listing; presumably KeyValuePair of long and string - confirm.
List> annotations = new List>(record1.Annotations.Count + (null == record2 ? 0 : record2.Annotations.Count));
foreach (string s in record1.Annotations)
annotations.Add(new KeyValuePair(eventId1, s));
if (null != record2)
{
foreach (string s in record2.Annotations)
annotations.Add(new KeyValuePair(eventId2, s));
}
// 'w' is the event type id passed for workflow events.
BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command);
}
/// <summary>
/// Configures the command to call [dbo].[InsertWorkflowInstanceEvent] for one or two workflow records.
/// Parameters for the first record use the suffix "1", those for the optional second record the suffix "2".
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance.</param>
/// <param name="record1">First record; required.</param>
/// <param name="record2">Optional second record; may be null.</param>
/// <param name="command">Command to configure; its existing parameters are discarded.</param>
/// <exception cref="ArgumentNullException"><paramref name="record1"/> or <paramref name="command"/> is null.</exception>
private void BuildInsertWorkflowInstanceEventParameters(long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command)
{
    // Report the real parameter name (the original reported "record", which is not a parameter of this method).
    if ( null == record1 )
        throw new ArgumentNullException( "record1" );
    if ( null == command )
        throw new ArgumentNullException( "command" );
    Debug.Assert( internalId != -1, "Invalid internalId" );

    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertWorkflowInstanceEvent]";
    command.Parameters.Clear();
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );

    AddWorkflowInstanceEventParameters( record1, "1", command );
    if ( null != record2 )
        AddWorkflowInstanceEventParameters( record2, "2", command );
}

// Appends the per-record parameters (event id, date, order, optional event args and the output event id)
// for one workflow record, using the given suffix on every parameter name.
private void AddWorkflowInstanceEventParameters(WorkflowTrackingRecord record, string suffix, DbCommand command)
{
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@TrackingWorkflowEventId" + suffix, ( int ) record.TrackingWorkflowEvent ) );
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventDateTime" + suffix, record.EventDateTime ) );
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventOrder" + suffix, record.EventOrder ) );

    if ( null != record.EventArgs )
    {
        // Capture the type before the args are (possibly) replaced by their serialized form.
        Type t = record.EventArgs.GetType();

        if ( !( record.EventArgs is SerializedEventArgs ) )
            record = SerializeRecord( record );

        SerializedEventArgs sargs = record.EventArgs as SerializedEventArgs;
        Byte[] data = sargs.SerializedArgs;

        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArgTypeFullName" + suffix, t.FullName ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArgAssemblyFullName" + suffix, t.Assembly.FullName ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArg" + suffix, data ) );
    }

    // The id of the inserted event row comes back through this output parameter.
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceEventId" + suffix, DbType.Int64, ParameterDirection.Output ) );
}
#endregion
#region Sql Commands - InsertActivityStatusInstance
/// <summary>
/// Non-transactional entry point: opens its own connection and transaction and writes a single activity status event.
/// </summary>
/// <param name="param">The record to write; must be an ActivityTrackingRecord.</param>
/// <exception cref="ArgumentException"><paramref name="param"/> is not an ActivityTrackingRecord.</exception>
private void ExecuteInsertActivityStatusInstance(object param)
{
ActivityTrackingRecord record = param as ActivityTrackingRecord;
if (null == record)
throw new ArgumentException(ExecutionStringManager.InvalidActivityTrackingRecordParameter, "param");
DbConnection conn = DbResourceAllocator.OpenNewConnection();
DbTransaction tx = null;
try
{
tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
DbCommand command = conn.CreateCommand();
command.Transaction = tx;
// The batch overload expects a list, so wrap the single record.
// NOTE(review): generic type arguments are missing in this listing; presumably a list of ActivityTrackingRecord - confirm.
IList activity = new List(1);
activity.Add(record);
ExecuteInsertActivityStatusInstance( _internalId, activity, command );
tx.Commit();
}
catch (Exception)
{
//
// Rollback can throw - ignore these exceptions
// so we don't lose the original exception
try
{
if ( null != tx )
tx.Rollback();
}
catch (Exception)
{
}
//
// Re-throw original exception
throw;
}
finally
{
// Always release the connection opened above.
if ((null != conn) && (ConnectionState.Closed != conn.State))
conn.Close();
}
return;
}
/// <summary>
/// Writes up to _activityEventBatchSize activity status events with one call to
/// [dbo].[InsertActivityExecutionStatusEventMultiple], updates the activity instance id cache from the
/// output parameters and then writes the annotations and data items of every record.
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance; values of 0 or less leave the parameter unset.</param>
/// <param name="activities">Records to write; a null or empty list is a no-op.</param>
/// <param name="command">Command with an open connection.</param>
/// <exception cref="ArgumentOutOfRangeException">More than _activityEventBatchSize records were supplied.</exception>
/// <exception cref="ArgumentException">The command is null, has no connection, or the connection is not open.</exception>
/// <exception cref="InvalidOperationException">An event id read back from the procedure is not positive.</exception>
private void ExecuteInsertActivityStatusInstance(long internalId, IList activities, DbCommand command)
{
if (null == activities || activities.Count <= 0)
return;
if (activities.Count > _activityEventBatchSize)
throw new ArgumentOutOfRangeException("activities");
if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
throw new ArgumentException();
command.CommandType = CommandType.StoredProcedure;
command.CommandText = "[dbo].[InsertActivityExecutionStatusEventMultiple]";
//
// Add the common parameters
command.Parameters.Clear();
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId));
//
// If we have the workflow's internal id use it to avoid the look up in the db
DbParameter param = DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", DbType.Int64, System.Data.ParameterDirection.InputOutput);
command.Parameters.Add(param);
if (internalId > 0)
param.Value = internalId;
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceContextGuid", _parameters.ContextGuid));
//
// Hashed ids of QName, context and pcontext used as key for storing activity record ids
// Save these for each record in the list so we don't have to recompute them below when adding to the cache
// The array has five slots; the size check above limits the list to _activityEventBatchSize (5) entries.
string[] ids = new string[] { null, null, null, null, null };
for (int i = 0; i < activities.Count; i++)
{
ActivityTrackingRecord record = activities[i];
long aid = -1;
ids[i] = BuildQualifiedNameVarName(record.QualifiedName, record.ContextGuid, record.ParentContextGuid);
// aid stays -1 when the id is not cached; the parameter value is then left unset.
TryGetActivityInstanceId(ids[i], out aid);
// Parameter names are numbered starting at 1.
BuildInsertActivityStatusEventParameters(internalId, aid, i+1, record, command);
}
command.ExecuteNonQuery();
//
// Get all the output ids
long[] eventIds = new long[] { -1, -1, -1, -1, -1 };
for (int i = 0; i < activities.Count; i++)
{
string index = (i + 1).ToString(CultureInfo.InvariantCulture);
//
// ActivityInstanceId
long aId = (long)command.Parameters["@ActivityInstanceId" + index].Value;
Debug.Assert(aId > 0, "Invalid @ActivityInstanceId output parameter value");
//
// For all status changes that aren't "Closed" add the id to the instance cache
// Set... method checks and only adds if it does already exist.
// To keep the cache size under control remove entries for activities that have closed.
// The activity might fault and need to do a lookup in the db but this isn't the common
// path and the db lookup isn't very expensive.
if (ActivityExecutionStatus.Closed != activities[i].ExecutionStatus)
SetActivityInstanceId(ids[i], aId);
else
RemoveActivityInstanceId(ids[i]);
//
// ActivityExecutionStatusEventId
long aeseId = (long)command.Parameters["@ActivityExecutionStatusEventId" + index].Value;
Debug.Assert(aeseId > 0, "Invalid @ActivityExecutionStatusEventId output parameter value");
eventIds[i] = aeseId;
}
// Collect annotations and data items of all records, each paired with the id of the event it belongs to.
// NOTE(review): generic type arguments are missing in this listing - confirm against the original source.
List> annotations = new List>(10);
List> items = new List>(10);
for (int i = 0; i < activities.Count; i++)
{
ActivityTrackingRecord record = activities[i];
//
// Get the ActivityExecutionStatusEventId
long eventId = eventIds[i];
if (eventId<=0)
throw new InvalidOperationException();
foreach (string s in record.Annotations)
annotations.Add(new KeyValuePair(eventId, s));
foreach (TrackingDataItem item in record.Body)
items.Add(new KeyValuePair(eventId, item));
}
// 'a' is the event type id passed for activity events.
BatchExecuteInsertEventAnnotation(internalId, 'a', annotations, command);
BatchExecuteInsertTrackingDataItems(internalId, 'a', items, command);
}
/// <summary>
/// Appends the parameters for one activity status event to the command. Every parameter name
/// ends with <paramref name="parameterId"/> so several events can share one procedure call.
/// </summary>
/// <param name="internalId">Not used by this method.</param>
/// <param name="activityInstanceId">Known activity instance id; values of 0 or less leave the parameter value unset.</param>
/// <param name="parameterId">Number appended to each parameter name.</param>
/// <param name="record">The activity record supplying the values.</param>
/// <param name="command">Command receiving the parameters; existing parameters are kept.</param>
private void BuildInsertActivityStatusEventParameters( long internalId, long activityInstanceId, int parameterId, ActivityTrackingRecord record, DbCommand command )
{
    string suffix = parameterId.ToString(CultureInfo.InvariantCulture);
    DbParameterCollection sprocArgs = command.Parameters;

    // In/out: supplying a known id avoids the look up in the db.
    DbParameter activityIdParam = DbResourceAllocator.NewDbParameter("@ActivityInstanceId" + suffix, DbType.Int64, ParameterDirection.InputOutput);
    if (activityInstanceId > 0)
    {
        activityIdParam.Value = activityInstanceId;
    }
    sprocArgs.Add(activityIdParam);

    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@QualifiedName" + suffix, record.QualifiedName));
    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@ContextGuid" + suffix, record.ContextGuid));
    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@ParentContextGuid" + suffix, record.ParentContextGuid));
    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@ExecutionStatusId" + suffix, (int)record.ExecutionStatus));
    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@EventDateTime" + suffix, record.EventDateTime));
    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@EventOrder" + suffix, record.EventOrder));

    // The id of the inserted event row comes back through this output parameter.
    sprocArgs.Add(DbResourceAllocator.NewDbParameter("@ActivityExecutionStatusEventId" + suffix, DbType.Int64, ParameterDirection.Output));
}
#endregion
#region Sql Commands - InsertUserEvent
/// <summary>
/// Non-transactional entry point: opens its own connection and transaction and writes a single user event.
/// </summary>
/// <param name="param">The record to write; must be a UserTrackingRecord.</param>
/// <exception cref="ArgumentException"><paramref name="param"/> is not a UserTrackingRecord.</exception>
private void ExecuteInsertUserEvent( object param )
{
    UserTrackingRecord userRecord = param as UserTrackingRecord;
    if (userRecord == null)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidUserTrackingRecordParameter, "param");
    }

    DbConnection connection = DbResourceAllocator.OpenNewConnection();
    DbTransaction transaction = null;
    try
    {
        transaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);

        DbCommand insertCommand = connection.CreateCommand();
        insertCommand.Transaction = transaction;

        ExecuteInsertUserEvent(_internalId, userRecord, insertCommand);
        transaction.Commit();
    }
    catch (Exception)
    {
        if (transaction != null)
        {
            try
            {
                transaction.Rollback();
            }
            catch (Exception)
            {
                // Rollback can throw - ignore that failure so the original exception is not lost.
            }
        }

        // Re-throw the original exception.
        throw;
    }
    finally
    {
        if (connection != null && connection.State != ConnectionState.Closed)
        {
            connection.Close();
        }
    }
}
/// <summary>
/// Writes one user event through the command, caches the activity instance id returned by the procedure
/// when it was not cached yet, and then writes the record's annotations and data items.
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance.</param>
/// <param name="record">The user record to write.</param>
/// <param name="command">Command with an open connection.</param>
/// <exception cref="ArgumentException">The command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertUserEvent( long internalId, UserTrackingRecord record, DbCommand command )
{
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentException();
long aid = -1;
bool cached = false;
//
// Check if we have the activityInstanceId in the cache - we cache to avoid repeatedly searching this table.
string id = BuildQualifiedNameVarName( record.QualifiedName, record.ContextGuid, record.ParentContextGuid );
if ( TryGetActivityInstanceId( id, out aid ) )
cached = true;
BuildInsertUserEventParameters( internalId, aid, record, command );
command.ExecuteNonQuery();
//
// If we didn't already have the activityInstanceId get it from the IN/OUT param and put it in the cache
if ( !cached )
SetActivityInstanceId( id, ( long ) command.Parameters["@ActivityInstanceId"].Value );
// Id of the inserted user event row; used to key the annotations and data items below.
long eventId = (long)command.Parameters["@UserEventId"].Value;
// NOTE(review): generic type arguments are missing in this listing - confirm against the original source.
List> annotations = new List>(10);
List> items = new List>(10);
foreach (string s in record.Annotations)
annotations.Add(new KeyValuePair(eventId, s));
foreach (TrackingDataItem item in record.Body)
items.Add(new KeyValuePair(eventId, item));
// 'u' is the event type id passed for user events.
BatchExecuteInsertEventAnnotation(internalId, 'u', annotations, command);
BatchExecuteInsertTrackingDataItems(internalId, 'u', items, command);
}
/// <summary>
/// Configures the command to call [dbo].[InsertUserEvent] for one user record.
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance.</param>
/// <param name="activityInstanceId">Known activity instance id; when 0 or less the activity's identifying fields are sent instead.</param>
/// <param name="record">The user record supplying the values.</param>
/// <param name="command">Command to configure; its existing parameters are discarded.</param>
private void BuildInsertUserEventParameters( long internalId, long activityInstanceId, UserTrackingRecord record, DbCommand command )
{
    Debug.Assert( internalId != -1, "Invalid internalId" );
    // The original message named BuildInsertActivityStatusEventParameters; report this method instead.
    Debug.Assert( ( command != null ), "Null command passed to BuildInsertUserEventParameters" );

    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertUserEvent]";
    command.Parameters.Clear();

    DbParameter param = DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", DbType.Int64 );
    command.Parameters.Add( param );
    param.Value = internalId;
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventOrder", record.EventOrder ) );

    //
    // If we have the activity's instance id use it to avoid the look up in the db
    param = DbResourceAllocator.NewDbParameter( "@ActivityInstanceId", DbType.Int64, System.Data.ParameterDirection.InputOutput );
    command.Parameters.Add( param );
    if ( activityInstanceId > 0 )
    {
        param.Value = activityInstanceId;
    }
    else
    {
        //
        // Keep the network traffic down - only include the fields needed
        // to insert an ActivityInstance record if we don't have the activityInstanceId
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@QualifiedName", record.QualifiedName ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ContextGuid", record.ContextGuid ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ParentContextGuid", record.ParentContextGuid ) );
    }

    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventDateTime", record.EventDateTime ) );
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@UserDataKey", record.UserDataKey ) );

    if ( null != record.UserData )
    {
        Type t = record.UserData.GetType();
        Byte[] data = null;
        bool nonSerializable = false;
        string userDataString = null;

        SerializedDataItem sItem = record.UserData as SerializedDataItem;
        if ( null != sItem )
        {
            // Already serialized - reuse the stored values.
            data = sItem.SerializedData;
            nonSerializable = sItem.NonSerializable;
            userDataString = sItem.StringData;
        }
        else
        {
            // Not serialized yet: serialize here. The original code went on to dereference the
            // null SerializedDataItem on this path and failed with a NullReferenceException.
            // No string form is produced on this path, so @UserData_Str is sent as null.
            SerializeDataItem( record.UserData, out data, out nonSerializable );
        }

        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@UserDataTypeFullName", t.FullName ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@UserDataAssemblyFullName", t.Assembly.FullName ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@UserData_Str", userDataString ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@UserData_Blob", data ) );
        command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@UserDataNonSerializable", nonSerializable ) );
    }

    // The id of the inserted user event row comes back through this output parameter.
    command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@UserEventId", DbType.Int64, ParameterDirection.Output ) );
}
#endregion
#region Sql Commands - InsertTrackingDataItem
/// <summary>
/// Writes tracking data items in chunks of at most _dataItemBatchSize by delegating each chunk to
/// ExecuteInsertTrackingDataItems. A null or empty list is a no-op.
/// </summary>
/// <param name="internalId">Database internal id of the workflow instance.</param>
/// <param name="eventTypeId">Event type id passed through to the insert call.</param>
/// <param name="items">Pairs of event id and data item.</param>
/// <param name="command">Command passed through to the insert call.</param>
// NOTE(review): generic type arguments are missing from the signature and locals in this listing - confirm against the original source.
// NOTE(review): chunks are sized by _dataItemBatchSize while ExecuteInsertTrackingDataItems validates against
// _dataItemAnnotationBatchSize; both are 5 here, so the two limits currently agree.
private void BatchExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList> items, DbCommand command)
{
if (null == items || items.Count <= 0)
return;
//
// If the list is smaller than the batch size just push the whole thing
if (items.Count <= _dataItemBatchSize)
{
ExecuteInsertTrackingDataItems(internalId, eventTypeId, items, command);
return;
}
//
// Need to split the list into max batch size chunks
List> batch = new List>(_dataItemBatchSize);
foreach (KeyValuePair kvp in items)
{
batch.Add(kvp);
// A full chunk is sent immediately and the buffer reused for the next one.
if (batch.Count == _dataItemBatchSize)
{
ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command);
batch.Clear();
}
}
//
// Send anything that hasn't been sent
if (batch.Count > 0)
ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command);
}
/// <summary>
/// Inserts one batch of tracking data items through the [dbo].[InsertTrackingDataItemMultiple]
/// stored procedure, then inserts the annotations of those items in batches through
/// ExecuteInsertAnnotation, keyed by the record ids returned in the output parameters.
/// </summary>
/// <param name="internalId">Value sent as @WorkflowInstanceInternalId; asserted to differ from -1.</param>
/// <param name="eventTypeId">Value sent as @EventTypeId.</param>
/// <param name="items">Pairs of event id (key) and data item (value); null or empty is a no-op.</param>
/// <param name="command">Command with an open connection; its text, type and parameters are overwritten.</param>
/// <exception cref="ArgumentOutOfRangeException">items holds more entries than _dataItemAnnotationBatchSize.</exception>
/// <exception cref="ArgumentException">command is null, has no connection, or the connection is not open.</exception>
// NOTE(review): generic type arguments appear to have been stripped from this listing
// (e.g. "IList>", "List ids = new List(...)"); restore them from the original source before compiling.
private void ExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList> items, DbCommand command)
{
Debug.Assert( internalId != -1, "Invalid internalId" );
if ( null == items || items.Count <= 0 )
return;
// NOTE(review): this guard compares against _dataItemAnnotationBatchSize, while
// BatchExecuteInsertTrackingDataItems chunks the list by _dataItemBatchSize - confirm
// which limit is intended (the two only agree if the fields hold the same value).
if (items.Count > _dataItemAnnotationBatchSize)
throw new ArgumentOutOfRangeException("items");
// NOTE(review): the sibling Execute* methods throw ArgumentNullException("command") for this same condition
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentException();
command.CommandType = CommandType.StoredProcedure;
command.CommandText = "[dbo].[InsertTrackingDataItemMultiple]";
command.Parameters.Clear();
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventTypeId", eventTypeId));
int i = 1; // base 1 to match parameter names
foreach(KeyValuePair kvp in items)
{
// Numeric suffix that ties together the parameters belonging to one item
string index = (i++).ToString(CultureInfo.InvariantCulture);
// Items that are not already SerializedDataItem instances are serialized here
SerializedDataItem sItem = kvp.Value as SerializedDataItem;
if (null == sItem)
sItem = SerializeDataItem(kvp.Value);
Type t = sItem.Type;
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId"+index, kvp.Key));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@FieldName"+index, sItem.FieldName));
// Type may be null (SerializeDataItem only sets it when the item has data), so both names fall back to null
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName" + index, ((null == t) ? null : t.FullName)));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName" + index, ((null == t) ? null : t.Assembly.FullName)));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Str" + index, sItem.StringData));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Blob" + index, sItem.SerializedData));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@DataNonSerializable" + index, sItem.NonSerializable));
// Output parameter that is read back below as the id of the inserted record
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, DbType.Int64, System.Data.ParameterDirection.Output));
}
command.ExecuteNonQuery();
//
// Get all the out parameters holding the data item record ids
// This keeps us from repeatedly going into the parameters collection
// below if a data item has more than one annotation
List ids = new List(_dataItemAnnotationBatchSize);
for( i = 0; i < items.Count; i++ )
{
// Parameter names are 1-based while the ids list is 0-based
string index = (i+1).ToString(CultureInfo.InvariantCulture);
ids.Insert(i, (long)command.Parameters["@TrackingDataItemId" + index].Value);
}
//
// Go through all the data items and send all the annotations in batches
// NOTE: ExecuteInsertAnnotation reuses the same command and replaces its text and parameters,
// which is why the ids were copied out of the parameter collection above
List> annotations = new List>(_dataItemAnnotationBatchSize);
i = 0;
foreach(KeyValuePair kvp in items)
{
TrackingDataItem item = kvp.Value;
// ids is in the same order as items, so the running index pairs each item with its record id
long dataItemId = ids[i++];
foreach (string s in item.Annotations)
{
annotations.Add(new KeyValuePair(dataItemId, s));
// Batch is full - send it and start collecting the next one
if (annotations.Count == _dataItemAnnotationBatchSize)
{
ExecuteInsertAnnotation(internalId, annotations, command);
annotations.Clear();
}
}
}
//
// If we have anything left send them.
if ( annotations.Count > 0 )
ExecuteInsertAnnotation(internalId, annotations, command);
}
/// <summary>
/// Inserts one batch of data item annotations through the
/// [dbo].[InsertTrackingDataItemAnnotationMultiple] stored procedure.
/// </summary>
/// <param name="internalId">Value sent as @WorkflowInstanceInternalId.</param>
/// <param name="annotations">Pairs of tracking data item id (key) and annotation text (value); null or empty is a no-op.</param>
/// <param name="command">Command with an open connection; its text, type and parameters are overwritten.</param>
/// <exception cref="ArgumentOutOfRangeException">annotations holds more entries than _dataItemAnnotationBatchSize.</exception>
/// <exception cref="ArgumentNullException">command is null, has no connection, or the connection is not open.</exception>
// NOTE(review): generic type arguments appear to have been stripped from this listing
// (e.g. "IList>"); restore them from the original source before compiling.
private void ExecuteInsertAnnotation(long internalId, IList> annotations, DbCommand command)
{
if ( null == annotations || annotations.Count <= 0 )
return;
if (annotations.Count > _dataItemAnnotationBatchSize)
throw new ArgumentOutOfRangeException("annotations");
// ArgumentNullException is also what a closed (non-null) connection produces here
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentNullException("command");
command.Parameters.Clear();
command.CommandType = CommandType.StoredProcedure;
command.CommandText = "[dbo].[InsertTrackingDataItemAnnotationMultiple]";
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId));
int i = 1; // base 1 to match parameter names
foreach( KeyValuePair kvp in annotations)
{
// Numeric suffix that ties together the three parameters belonging to one annotation
string index = (i++).ToString(CultureInfo.InvariantCulture);
// presumably tells the procedure that this numbered slot is populated - TODO confirm against the stored procedure
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, kvp.Key));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation" + index, kvp.Value ));
}
command.ExecuteNonQuery();
return;
}
/// <summary>
/// Sends event annotations to ExecuteInsertEventAnnotation in chunks of at most
/// _eventAnnotationBatchSize entries. Does nothing when the list is null or empty.
/// </summary>
/// <param name="internalId">Forwarded unchanged to ExecuteInsertEventAnnotation.</param>
/// <param name="eventTypeId">Forwarded unchanged to ExecuteInsertEventAnnotation.</param>
/// <param name="annotations">Key/value pairs to insert; may be null or empty.</param>
/// <param name="command">Command forwarded to ExecuteInsertEventAnnotation for every chunk.</param>
// NOTE(review): generic type arguments appear to have been stripped from this listing
// (e.g. "IList>", "List>"); restore them from the original source before compiling.
private void BatchExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList> annotations, DbCommand command)
{
// Nothing to insert
if (null == annotations || annotations.Count <= 0)
return;
//
// If the list is no larger than the max batch size just send it directly
if (annotations.Count <= _eventAnnotationBatchSize)
{
ExecuteInsertEventAnnotation(internalId, eventTypeId, annotations, command);
return;
}
//
// Need to split the list into max batch size chunks
// The same list instance is reused for every chunk and cleared after each send
List> batch = new List>(_eventAnnotationBatchSize);
foreach (KeyValuePair kvp in annotations)
{
batch.Add(kvp);
// Chunk is full - send it and start collecting the next one
if (batch.Count == _eventAnnotationBatchSize)
{
ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command);
batch.Clear();
}
}
//
// Send anything that hasn't been sent (the final, partially filled chunk)
if (batch.Count > 0)
ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command);
}
/// <summary>
/// Inserts one batch of event annotations through the [dbo].[InsertEventAnnotationMultiple]
/// stored procedure.
/// </summary>
/// <param name="internalId">Value sent as @WorkflowInstanceInternalId; asserted to differ from -1.</param>
/// <param name="eventTypeId">Value sent as @EventTypeId.</param>
/// <param name="annotations">Pairs of event id (key) and annotation text (value); null or empty is a no-op.</param>
/// <param name="command">Command with an open connection; its text, type and parameters are overwritten.</param>
/// <exception cref="ArgumentOutOfRangeException">annotations holds more entries than _eventAnnotationBatchSize.</exception>
/// <exception cref="ArgumentNullException">command is null, has no connection, or the connection is not open.</exception>
// NOTE(review): generic type arguments appear to have been stripped from this listing
// (e.g. "IList>"); restore them from the original source before compiling.
private void ExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList> annotations, DbCommand command)
{
Debug.Assert(internalId != -1, "Invalid internalId");
if (null == annotations || annotations.Count <= 0)
return;
if (annotations.Count > _eventAnnotationBatchSize)
throw new ArgumentOutOfRangeException("annotations");
// ArgumentNullException is also what a closed (non-null) connection produces here
if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
throw new ArgumentNullException( "command" );
command.Parameters.Clear();
command.CommandType = CommandType.StoredProcedure;
command.CommandText = "[dbo].[InsertEventAnnotationMultiple]";
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );
command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventTypeId", eventTypeId ) );
int i = 1; //base 1 to match parameter names
foreach(KeyValuePair kvp in annotations)
{
// Numeric suffix that ties together the three parameters belonging to one annotation
string index = (i++).ToString(CultureInfo.InvariantCulture);
// presumably tells the procedure that this numbered slot is populated - TODO confirm against the stored procedure
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId" +index, kvp.Key));
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation"+index, kvp.Value));
}
command.ExecuteNonQuery();
return;
}
#endregion
#region Workflow Change
/// <summary>
/// Writes a workflow change record inside its own local database transaction.
/// The transaction is committed on success and rolled back (best effort) on failure;
/// the connection is closed in either case.
/// </summary>
/// <param name="param">The record to write; must be a WorkflowTrackingRecord.</param>
/// <exception cref="ArgumentException">param is null or is not a WorkflowTrackingRecord.</exception>
private void ExecuteInsertWorkflowChange( object param )
{
    WorkflowTrackingRecord changeRecord = param as WorkflowTrackingRecord;
    if (null == changeRecord)
    {
        throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param");
    }

    DbCommand cmd = DbResourceAllocator.NewCommand();
    try
    {
        // Make sure we have a live connection before starting the transaction
        if (cmd.Connection.State != ConnectionState.Open)
        {
            cmd.Connection.Open();
        }

        cmd.Transaction = cmd.Connection.BeginTransaction();
        ExecuteInsertWorkflowChange(_internalId, changeRecord, cmd);
        cmd.Transaction.Commit();
    }
    catch (Exception)
    {
        // A failing rollback must not hide the exception that brought us here,
        // so anything it throws is deliberately discarded.
        try
        {
            if ((null != cmd) && (null != cmd.Transaction))
            {
                cmd.Transaction.Rollback();
            }
        }
        catch (Exception)
        {
        }

        // Surface the original failure with its stack intact
        throw;
    }
    finally
    {
        bool hasConnection = (null != cmd) && (null != cmd.Connection);
        if (hasConnection && (cmd.Connection.State != ConnectionState.Closed))
        {
            cmd.Connection.Close();
        }
    }
}
/// <summary>
/// Writes a workflow change using the supplied command: inserts the workflow instance event,
/// then one row per added and removed activity, then the record's annotations.
/// </summary>
/// <param name="internalId">Workflow instance internal id forwarded to every insert.</param>
/// <param name="record">Record describing the change; serialized here if its EventArgs are not yet serialized.</param>
/// <param name="command">Command with an open connection; reused (and overwritten) by every insert.</param>
/// <exception cref="ArgumentNullException">record is null, or command is null, has no connection, or the connection is not open.</exception>
/// <exception cref="InvalidOperationException">record.EventArgs is null.</exception>
// NOTE(review): generic type arguments appear to have been stripped from this listing
// (e.g. "List> annotations = new List>(...)"); restore them from the original source before compiling.
private void ExecuteInsertWorkflowChange(long internalId, WorkflowTrackingRecord record, DbCommand command)
{
if (null == record)
throw new ArgumentNullException("record");
if (null == record.EventArgs)
throw new InvalidOperationException(ExecutionStringManager.InvalidWorkflowChangeArgs);
// ArgumentNullException is also what a closed (non-null) connection produces here
if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
throw new ArgumentNullException("command");
//
// If we haven't already serialized do so now.
// This is work we have to do to write to store in the db anyway.
if (!(record.EventArgs is SerializedWorkflowChangedEventArgs))
record = SerializeRecord(record);
//
// Insert the workflow instance event
BuildInsertWorkflowInstanceEventParameters(internalId, record, null, command);
command.ExecuteNonQuery();
//
// Get the event id for added/removed activities and annotations
// presumably "@WorkflowInstanceEventId1" is an output parameter set up by
// BuildInsertWorkflowInstanceEventParameters - TODO confirm
long eventId = (long)command.Parameters["@WorkflowInstanceEventId1"].Value;
SerializedWorkflowChangedEventArgs sargs = (SerializedWorkflowChangedEventArgs)record.EventArgs;
//
// Normalize the activities that have been added/removed if we're tracking definitions
if ((null != sargs.AddedActivities) && (sargs.AddedActivities.Count > 0))
{
foreach (AddedActivity added in sargs.AddedActivities)
ExecuteInsertAddedActivity(internalId, added.QualifiedName, added.ParentQualifiedName, added.ActivityTypeFullName, added.ActivityTypeAssemblyFullName, added.AddedActivityActionXoml, eventId, added.Order, command);
}
if ((null != sargs.RemovedActivities) && (sargs.RemovedActivities.Count > 0))
{
foreach (RemovedActivity removed in sargs.RemovedActivities)
ExecuteInsertRemovedActivity(internalId, removed.QualifiedName, removed.ParentQualifiedName, removed.RemovedActivityActionXoml, eventId, removed.Order, command);
}
// Pair every annotation with the id of the event inserted above and send them in batches
List> annotations = new List>(record.Annotations.Count);
foreach (string s in record.Annotations)
annotations.Add(new KeyValuePair(eventId, s));
// NOTE(review): 'w' is the event type id passed for these annotations; its meaning is defined outside this view
BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command);
}
/// <summary>
/// Runs the [dbo].[InsertAddedActivity] stored procedure for a single activity that was
/// added by a workflow change.
/// </summary>
/// <param name="internalId">Value sent as @WorkflowInstanceInternalId.</param>
/// <param name="qualifiedName">Value sent as @QualifiedName.</param>
/// <param name="parentQualifiedName">Value sent as @ParentQualifiedName.</param>
/// <param name="typeFullName">Value sent as @TypeFullName.</param>
/// <param name="assemblyFullName">Value sent as @AssemblyFullName.</param>
/// <param name="addedActivityActionXoml">Value sent as @AddedActivityAction.</param>
/// <param name="eventId">Value sent as @WorkflowInstanceEventId.</param>
/// <param name="order">Value sent as @Order; -1 is sent as DBNull.</param>
/// <param name="command">Command with an open connection; its text, type and parameters are overwritten.</param>
/// <exception cref="ArgumentNullException">command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertAddedActivity(long internalId, string qualifiedName, string parentQualifiedName, string typeFullName, string assemblyFullName, string addedActivityActionXoml, long eventId, int order, DbCommand command)
{
    bool usable = (null != command)
        && (null != command.Connection)
        && (ConnectionState.Open == command.Connection.State);
    if (!usable)
    {
        throw new ArgumentNullException("command");
    }

    command.Parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertAddedActivity]";

    DbParameterCollection args = command.Parameters;
    args.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    args.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId));
    args.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName));
    args.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeFullName));
    args.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyFullName));
    args.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName));
    args.Add(DbResourceAllocator.NewDbParameter("@AddedActivityAction", addedActivityActionXoml));

    // An order of -1 is stored as a database null
    if (-1 != order)
    {
        args.Add(DbResourceAllocator.NewDbParameter("@Order", order));
    }
    else
    {
        args.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value));
    }

    command.ExecuteNonQuery();
}
/// <summary>
/// Runs the [dbo].[InsertRemovedActivity] stored procedure for a single activity that was
/// removed by a workflow change.
/// </summary>
/// <param name="internalId">Value sent as @WorkflowInstanceInternalId.</param>
/// <param name="qualifiedName">Value sent as @QualifiedName.</param>
/// <param name="parentQualifiedName">Value sent as @ParentQualifiedName.</param>
/// <param name="removedActivityActionXoml">Value sent as @RemovedActivityAction.</param>
/// <param name="eventId">Value sent as @WorkflowInstanceEventId.</param>
/// <param name="order">Value sent as @Order; -1 is sent as DBNull.</param>
/// <param name="command">Command with an open connection; its text, type and parameters are overwritten.</param>
/// <exception cref="ArgumentNullException">command is null, has no connection, or the connection is not open.</exception>
private void ExecuteInsertRemovedActivity(long internalId, string qualifiedName, string parentQualifiedName, string removedActivityActionXoml, long eventId, int order, DbCommand command)
{
    bool usable = (null != command)
        && (null != command.Connection)
        && (ConnectionState.Open == command.Connection.State);
    if (!usable)
    {
        throw new ArgumentNullException("command");
    }

    command.Parameters.Clear();
    command.CommandType = CommandType.StoredProcedure;
    command.CommandText = "[dbo].[InsertRemovedActivity]";

    DbParameterCollection args = command.Parameters;
    args.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
    args.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId));
    args.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName));
    args.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName));
    args.Add(DbResourceAllocator.NewDbParameter("@RemovedActivityAction", removedActivityActionXoml));

    // An order of -1 is stored as a database null
    if (-1 != order)
    {
        args.Add(DbResourceAllocator.NewDbParameter("@Order", order));
    }
    else
    {
        args.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value));
    }

    command.ExecuteNonQuery();
}
#endregion
#region Utility
/// <summary>
/// Looks up the id stored for an activity instance key. The committed cache is consulted
/// first; when running batched (_isTrans) the cache of ids generated during the current
/// batch is consulted as a fallback.
/// </summary>
/// <param name="key">Cache key of the activity instance.</param>
/// <param name="id">Receives the id when found.</param>
/// <returns>true when the key was found in either cache; otherwise false.</returns>
private bool TryGetActivityInstanceId(string key, out long id)
{
    bool found = _activityInstanceId.TryGetValue(key, out id);

    // Only a batched run has temp ids worth checking
    if (!found && _isTrans)
    {
        found = _tmpActivityInstanceId.TryGetValue(key, out id);
    }

    return found;
}
/// <summary>
/// Records the id for an activity instance key unless the key is already present.
/// Batched runs (_isTrans) write to the temp cache; per the original author's note those
/// entries are moved to the real cache in IPendingWork.Complete once the commit succeeds.
/// Non-batched runs write straight to the real cache.
/// </summary>
/// <param name="key">Cache key of the activity instance.</param>
/// <param name="id">Id to remember for the key.</param>
private void SetActivityInstanceId(string key, long id)
{
    if (_isTrans)
    {
        // Batched: park the id in the temp member until the batch commits
        if (_tmpActivityInstanceId.ContainsKey(key))
        {
            return;
        }

        _tmpActivityInstanceId.Add(key, id);
        return;
    }

    // Not batched: the id goes directly into the committed cache
    if (!_activityInstanceId.ContainsKey(key))
    {
        _activityInstanceId.Add(key, id);
    }
}
/// <summary>
/// Drops the id stored for an activity instance key from the committed cache and,
/// when running batched (_isTrans), from the temp cache as well.
/// </summary>
/// <param name="key">Cache key of the activity instance.</param>
private void RemoveActivityInstanceId(string key)
{
    // The temp cache is only touched in batched mode
    if (_isTrans && _tmpActivityInstanceId.ContainsKey(key))
    {
        _tmpActivityInstanceId.Remove(key);
    }

    if (_activityInstanceId.ContainsKey(key))
    {
        _activityInstanceId.Remove(key);
    }
}
/// <summary>
/// Formats a DateTime as year, two-digit month and two-digit day run together, a space,
/// then hour, minute, second and millisecond separated by colons. Year and the time parts
/// are written without padding; all numbers use the invariant culture.
/// </summary>
/// <param name="dateTime">The value to format.</param>
/// <returns>The formatted text, e.g. "20070102 3:4:5:6".</returns>
private string GetSqlDateTimeString(DateTime dateTime)
{
    CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;
    StringBuilder text = new StringBuilder();

    // Date part: yyyy + MM + dd
    text.Append(dateTime.Year.ToString(invariant));
    text.Append(PadToDblDigit(dateTime.Month));
    text.Append(PadToDblDigit(dateTime.Day));

    // Time part: unpadded h:m:s:ms
    text.Append(" ");
    text.Append(dateTime.Hour.ToString(invariant));
    text.Append(":");
    text.Append(dateTime.Minute.ToString(invariant));
    text.Append(":");
    text.Append(dateTime.Second.ToString(invariant));
    text.Append(":");
    text.Append(dateTime.Millisecond.ToString(invariant));

    return text.ToString();
}
/// <summary>
/// Converts a number to invariant-culture text, prefixing a "0" when the text is exactly
/// one character long. Text of any other length is returned unchanged.
/// </summary>
/// <param name="num">The number to format.</param>
/// <returns>The formatted text.</returns>
private string PadToDblDigit(int num)
{
    string digits = num.ToString(System.Globalization.CultureInfo.InvariantCulture);
    return (1 == digits.Length) ? "0" + digits : digits;
}
/// <summary>
/// Build a string to uniquely identify each activity that should be recorded as a separate instance.
/// A separate instance is defined by the combination of QualifiedName, Context and ParentContext.
/// </summary>
/// <param name="qId">Qualified name; it is hashed through HashHelper.HashServiceType before use.</param>
/// <param name="context">Context guid of the activity.</param>
/// <param name="parentContext">Context guid of the parent.</param>
/// <returns>The three guids joined with underscores, with every '-' replaced by '_'.</returns>
private string BuildQualifiedNameVarName( string qId, Guid context, Guid parentContext )
{
    Guid nameHash = HashHelper.HashServiceType(qId);

    string namePart = nameHash.ToString().Replace('-', '_');
    string contextPart = context.ToString().Replace('-', '_');
    string parentPart = parentContext.ToString().Replace('-', '_');

    return namePart + "_" + contextPart + "_" + parentPart;
}
/// <summary>
/// Replaces every data item in the record's Body with its serialized form, in place.
/// A record whose Body is null or empty is returned untouched.
/// </summary>
/// <param name="record">The activity tracking record to serialize.</param>
/// <returns>The same record instance that was passed in.</returns>
private ActivityTrackingRecord SerializeRecord( ActivityTrackingRecord record )
{
    if ( null != record.Body )
    {
        // An empty body simply skips the loop
        int position = 0;
        while ( position < record.Body.Count )
        {
            record.Body[position] = SerializeDataItem( record.Body[position] );
            position++;
        }
    }

    return record;
}
/// <summary>
/// Serializes a user tracking record in place: UserData (when present) is replaced by a
/// SerializedDataItem describing it, and every item in Body is replaced by its serialized form.
/// A record with no body items, no EventArgs and no UserData is returned untouched.
/// </summary>
/// <param name="record">The user tracking record to serialize.</param>
/// <returns>The same record instance that was passed in.</returns>
private UserTrackingRecord SerializeRecord( UserTrackingRecord record )
{
    bool bodyIsEmpty = ( null == record.Body ) || ( 0 == record.Body.Count );
    if ( bodyIsEmpty && ( null == record.EventArgs ) && ( null == record.UserData ) )
        return record;

    if ( null != record.UserData )
    {
        // Capture the type, the string form and the binary form of the user data
        SerializedDataItem item = new SerializedDataItem();
        byte[] data = null;
        bool nonSerializable;
        SerializeDataItem( record.UserData, out data, out nonSerializable );
        item.Type = record.UserData.GetType();
        item.StringData = record.UserData.ToString();
        item.SerializedData = data;
        item.NonSerializable = nonSerializable;
        record.UserData = item;
    }

    // The early-out above tolerates a null Body, so the loop must as well: a record with
    // UserData (or EventArgs) but a null Body previously failed here with a NullReferenceException.
    if ( null != record.Body )
    {
        for ( int i = 0; i < record.Body.Count; i++ )
            record.Body[i] = SerializeDataItem( record.Body[i] );
    }

    return record;
}
// Starts converting a WorkflowTrackingRecord whose event is TrackingWorkflowEvent.Changed
// into its serialized form. Records without EventArgs are returned unchanged.
//
// NOTE(review): this listing is corrupted. The "for" header below breaks off after "i" and
// runs directly into "activities)", which looks like the end of the parameter list of a
// different method: the body that follows reads "removedAction" and "order", which are
// not declared anywhere in the visible text. Everything between the loop condition and that
// signature is missing; recover it from the original source before relying on this code.
private WorkflowTrackingRecord SerializeRecord( WorkflowTrackingRecord record )
{
// Nothing to convert
if ( null == record.EventArgs )
return record;
SerializedEventArgs args;
if ( TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent )
{
//
// Convert the WorkflowChanged items
SerializedWorkflowChangedEventArgs sargs = new SerializedWorkflowChangedEventArgs();
TrackingWorkflowChangedEventArgs wargs = ( TrackingWorkflowChangedEventArgs ) record.EventArgs;
if ( null != wargs )
{
// NOTE(review): text is lost on the next line (see the note at the top of this block)
for ( int i = 0; i activities)
{
// From here on the code records a removed activity: it copies the order, the qualified
// names and the xoml of the removal action into a RemovedActivity and appends it to "activities"
Activity removed = removedAction.OriginalRemovedActivity;
RemovedActivity removedActivity = new RemovedActivity();
removedActivity.Order = order;
removedActivity.QualifiedName = removed.QualifiedName;
// ParentQualifiedName is left unset when the activity has no parent
if (null != removed.Parent)
removedActivity.ParentQualifiedName = removed.Parent.QualifiedName;
//
// Save the definition of this change
removedActivity.RemovedActivityActionXoml = GetXomlDocument(removedAction);
activities.Add(removedActivity);
//
// Recursively add all contained activities to the removed list
if (removed is CompositeActivity)
{
foreach (Activity activity in ((CompositeActivity)removed).Activities)
{
AddRemovedActivity(activity, activities);
}
}
}
/// <summary>
/// Appends a RemovedActivity entry (Order = -1) for the given activity to the list and then
/// does the same, recursively, for every activity it contains.
/// </summary>
/// <param name="removed">The activity to record.</param>
/// <param name="activities">The list that receives the entries.</param>
private void AddRemovedActivity(Activity removed, IList activities)
{
    RemovedActivity entry = new RemovedActivity();
    entry.Order = -1;
    entry.QualifiedName = removed.QualifiedName;
    // ParentQualifiedName stays unset for an activity without a parent
    if ( null != removed.Parent )
    {
        entry.ParentQualifiedName = removed.Parent.QualifiedName;
    }
    activities.Add(entry);

    // Walk into composites so that contained activities are recorded too
    CompositeActivity composite = removed as CompositeActivity;
    if (null != composite)
    {
        foreach (Activity child in composite.Activities)
        {
            AddRemovedActivity(child, activities);
        }
    }
}
/// <summary>
/// Appends an AddedActivity entry for the activity carried by an AddedActivityAction,
/// including the xoml of the action itself, and then records every contained activity
/// through the (Activity, IList) overload.
/// </summary>
/// <param name="addedAction">The action that added the activity.</param>
/// <param name="order">Value stored in the entry's Order.</param>
/// <param name="activities">The list that receives the entries.</param>
private void AddAddedActivity(AddedActivityAction addedAction, int order, IList activities)
{
    Activity added = addedAction.AddedActivity;
    Type activityType = added.GetType();

    AddedActivity entry = new AddedActivity();
    entry.Order = order;
    entry.ActivityTypeFullName = activityType.FullName;
    entry.ActivityTypeAssemblyFullName = activityType.Assembly.FullName;
    entry.QualifiedName = added.QualifiedName;
    // ParentQualifiedName stays unset for an activity without a parent
    if (null != added.Parent)
    {
        entry.ParentQualifiedName = added.Parent.QualifiedName;
    }
    entry.AddedActivityActionXoml = GetXomlDocument(addedAction);
    activities.Add(entry);

    // Walk into composites so that contained activities are recorded too
    CompositeActivity composite = added as CompositeActivity;
    if (null != composite)
    {
        foreach (Activity child in composite.Activities)
        {
            AddAddedActivity(child, activities);
        }
    }
}
/// <summary>
/// Appends an AddedActivity entry (Order = -1, no action xoml) for the given activity to the
/// list and then does the same, recursively, for every activity it contains.
/// </summary>
/// <param name="added">The activity to record.</param>
/// <param name="activities">The list that receives the entries.</param>
private void AddAddedActivity(Activity added, IList activities)
{
    Type activityType = added.GetType();

    AddedActivity entry = new AddedActivity();
    entry.Order = -1;
    entry.ActivityTypeFullName = activityType.FullName;
    entry.ActivityTypeAssemblyFullName = activityType.Assembly.FullName;
    entry.QualifiedName = added.QualifiedName;
    // ParentQualifiedName stays unset for an activity without a parent
    if ( null != added.Parent )
    {
        entry.ParentQualifiedName = added.Parent.QualifiedName;
    }
    activities.Add(entry);

    // Walk into composites so that contained activities are recorded too
    CompositeActivity composite = added as CompositeActivity;
    if (null != composite)
    {
        foreach (Activity child in composite.Activities)
        {
            AddAddedActivity(child, activities);
        }
    }
}
/// <summary>
/// Builds a SerializedDataItem from a tracking data item: copies the data reference, the
/// annotations and the field name, and - when the item carries data - adds its binary form,
/// string form, runtime type and the non-serializable flag.
/// </summary>
/// <param name="item">The item to convert; may be null.</param>
/// <returns>The converted item, or null when <paramref name="item"/> is null.</returns>
private SerializedDataItem SerializeDataItem( TrackingDataItem item )
{
    if ( null == item )
    {
        return null;
    }

    SerializedDataItem serialized = new SerializedDataItem();
    serialized.Data = item.Data;
    serialized.Annotations.AddRange( item.Annotations );
    serialized.FieldName = item.FieldName;

    // Without data there is nothing further to capture
    if ( null == item.Data )
    {
        return serialized;
    }

    byte[] blob = null;
    bool cannotSerialize;
    SerializeDataItem( item.Data, out blob, out cannotSerialize );
    serialized.SerializedData = blob;
    serialized.StringData = item.Data.ToString();
    serialized.Type = item.Data.GetType();
    serialized.NonSerializable = cannotSerialize;
    return serialized;
}
/// <summary>
/// Binary serialize an object. Used to persist trackingDataItems.
/// </summary>
/// <param name="data">The object to serialize; may be null.</param>
/// <param name="state">Receives the serialized bytes, or null when <paramref name="data"/> is null
/// or could not be serialized.</param>
/// <param name="nonSerializable">Receives true when serialization failed with a
/// SerializationException; otherwise false.</param>
private void SerializeDataItem( object data, out byte[] state, out bool nonSerializable )
{
    nonSerializable = false;
    state = null;
    if ( null == data )
        return;

    // NOTE(review): BinaryFormatter is unsafe to deserialize from untrusted input and is
    // obsolete in current .NET. It is kept here because the stored format depends on it;
    // flagging for a follow-up rather than silently replacing it.
    BinaryFormatter bf = new BinaryFormatter();
    using ( MemoryStream stream = new MemoryStream(1024) )
    {
        try
        {
            bf.Serialize(stream, data);
            // ToArray copies exactly the bytes written, regardless of the stream position,
            // which replaces the former hand-written read loop.
            state = stream.ToArray();
        }
        catch (SerializationException)
        {
            // The object cannot be binary serialized; report it and leave state null
            nonSerializable = true;
        }
    }
}
/// <summary>
/// Make string sql safe by doubling every single quote.
/// </summary>
/// <param name="val">The text to escape; may be null.</param>
/// <returns>The escaped text, or null when <paramref name="val"/> is null.</returns>
private string SqlEscape( string val )
{
    return ( null == val ) ? null : val.Replace( "'", "''" );
}
/*
static char[] hexDigits = {
'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
///
/// Convert a byte array to a string of hex chars for sql image type
///
///
///
private static string ToHexString( byte[] bytes )
{
if ( null == bytes )
return null;
if ( 0 == bytes.Length )
return null;
char[] chars = new char[bytes.Length * 2];
for ( int i = 0; i < bytes.Length; i++ )
{
int b = bytes[i];
chars[i * 2] = hexDigits[b >> 4];
chars[i * 2 + 1] = hexDigits[b & 0xF];
}
return "0x" + new string( chars );
}
*/
/// <summary>
/// Builds the dot-separated call path keys from the given path and stores them in
/// _callPathKey (all entries) and _parentCallPathKey (all entries except the last).
/// Each entry is appended to the field's current value with a leading ".", after which the
/// first character is dropped and the result is passed through SqlEscape.
/// Does nothing when the path is null or empty.
/// </summary>
/// <param name="callPath">The call path entries, outermost first.</param>
private void GetCallPathKeys( IList callPath )
{
    if ( null == callPath )
    {
        return;
    }
    if ( callPath.Count <= 0 )
    {
        return;
    }

    int lastIndex = callPath.Count - 1;
    for ( int position = 0; position < callPath.Count; position++ )
    {
        _callPathKey += "." + callPath[position];
        // The parent key stops one entry short of the full path
        if ( position < lastIndex )
        {
            _parentCallPathKey += "." + callPath[position];
        }
    }

    // Strip the leading "." that the loop produced
    if ( null != _callPathKey )
    {
        _callPathKey = SqlEscape( _callPathKey.Substring( 1 ) );
    }
    if ( null != _parentCallPathKey )
    {
        _parentCallPathKey = SqlEscape( _parentCallPathKey.Substring( 1 ) );
    }
}
/// <summary>
/// Produces an xml document with an "Activities" root element that contains one "Activity"
/// element for the given root and, via WriteActivity, for the activities beneath it.
/// </summary>
/// <param name="root">The activity to start from; may be null.</param>
/// <returns>The xml text, or null when <paramref name="root"/> is null.</returns>
private string GetActivitiesXml(CompositeActivity root)
{
    if (null == root)
    {
        return null;
    }

    StringBuilder xml = new StringBuilder();
    XmlWriter xmlWriter = XmlWriter.Create(xml);
    try
    {
        xmlWriter.WriteStartDocument();
        xmlWriter.WriteStartElement("Activities");
        WriteActivity(root, xmlWriter);
        xmlWriter.WriteEndElement();
        xmlWriter.WriteEndDocument();
    }
    finally
    {
        // Always release the writer, whether or not the document was completed
        xmlWriter.Flush();
        xmlWriter.Close();
    }

    return xml.ToString();
}
/// <summary>
/// Writes an "Activity" element describing the given activity (type name, assembly name,
/// qualified name and, when it has a parent, the parent's qualified name) and then writes
/// one sibling element for each activity returned by GetAllEnabledActivities, recursively.
/// </summary>
/// <param name="activity">The activity to write; null is ignored.</param>
/// <param name="writer">The writer that receives the elements.</param>
/// <exception cref="ArgumentNullException">writer is null while activity is not.</exception>
private void WriteActivity(Activity activity, XmlWriter writer)
{
    if (null == activity)
    {
        return;
    }
    if (null == writer)
    {
        throw new ArgumentNullException("writer");
    }

    Type activityType = activity.GetType();

    writer.WriteStartElement("Activity");
    writer.WriteElementString("TypeFullName", activityType.FullName);
    writer.WriteElementString("AssemblyFullName", activityType.Assembly.FullName);
    writer.WriteElementString("QualifiedName", activity.QualifiedName);
    // A missing element is read as a null value by sql, so the parent name is
    // only written when there is a parent
    if (null != activity.Parent)
    {
        writer.WriteElementString("ParentQualifiedName", activity.Parent.QualifiedName);
    }
    writer.WriteEndElement();

    // Children are written after the element is closed, i.e. as a flat list
    CompositeActivity composite = activity as CompositeActivity;
    if (null != composite)
    {
        foreach (Activity child in GetAllEnabledActivities(composite))
        {
            WriteActivity(child, writer);
        }
    }
}
// This function returns all the executable activities including secondary flow activities.
// The result starts as a copy of compositeActivity.EnabledActivities; every activity from
// ISupportAlternateFlow.AlternateFlowActivities that is not already in the list is appended.
// Throws ArgumentNullException when compositeActivity is null.
// NOTE(review): generic type arguments appear to have been stripped from this listing
// ("IList", "List allActivities = new List(...)"); restore them from the original source before compiling.
private IList GetAllEnabledActivities(CompositeActivity compositeActivity)
{
if (compositeActivity == null)
throw new ArgumentNullException("compositeActivity");
List allActivities = new List(compositeActivity.EnabledActivities);
foreach (Activity secondaryFlowActivity in ((ISupportAlternateFlow)compositeActivity).AlternateFlowActivities)
{
// Skip activities that are already present so that nothing is listed twice
if (!allActivities.Contains(secondaryFlowActivity))
allActivities.Add(secondaryFlowActivity);
}
return allActivities;
}
/// <summary>
/// Serializes an object to workflow markup (xoml) with WorkflowMarkupSerializer and returns
/// the markup as text.
/// </summary>
/// <param name="obj">The object to serialize.</param>
/// <returns>The markup text written for <paramref name="obj"/>.</returns>
internal string GetXomlDocument(object obj)
{
    string xomlText = null;
    using (StringWriter stringWriter = new StringWriter(System.Globalization.CultureInfo.InvariantCulture))
    {
        using (XmlWriter xmlWriter = CreateXmlWriter(stringWriter))
        {
            WorkflowMarkupSerializer serializer = new WorkflowMarkupSerializer();
            serializer.Serialize(xmlWriter, obj);
            // The text is read while the xml writer is still open, so push anything it is
            // still buffering into the string writer first; otherwise the tail of the
            // document could be missing from the returned text.
            xmlWriter.Flush();
            xomlText = stringWriter.ToString();
        }
    }
    return xomlText;
}
#endregion
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- MenuItemBinding.cs
- OleDbDataAdapter.cs
- PermissionSetEnumerator.cs
- RangeBase.cs
- MemoryResponseElement.cs
- ProcessHostServerConfig.cs
- XmlNamespaceDeclarationsAttribute.cs
- LinqDataSourceStatusEventArgs.cs
- FormsAuthenticationCredentials.cs
- MessagePartSpecification.cs
- XmlSubtreeReader.cs
- recordstate.cs
- CodeRemoveEventStatement.cs
- CacheRequest.cs
- StringToken.cs
- DataService.cs
- ObjectItemLoadingSessionData.cs
- designeractionlistschangedeventargs.cs
- HtmlWindowCollection.cs
- SqlDataSourceEnumerator.cs
- WebRequestModuleElement.cs
- ValueQuery.cs
- NTAccount.cs
- MarkupProperty.cs
- HwndTarget.cs
- DetailsViewDeletedEventArgs.cs
- AsyncParams.cs
- HttpDictionary.cs
- SequentialOutput.cs
- RayMeshGeometry3DHitTestResult.cs
- ObjectRef.cs
- SqlLiftWhereClauses.cs
- latinshape.cs
- EdmItemCollection.cs
- AppDomainUnloadedException.cs
- FunctionDefinition.cs
- RemoveStoryboard.cs
- FrameworkContentElement.cs
- JavaScriptSerializer.cs
- CachingHintValidation.cs
- WebChannelFactory.cs
- TemplatePropertyEntry.cs
- BmpBitmapEncoder.cs
- ItemsChangedEventArgs.cs
- GenerateTemporaryAssemblyTask.cs
- Resources.Designer.cs
- FixedSOMGroup.cs
- NetworkStream.cs
- CodeBinaryOperatorExpression.cs
- XmlTextReader.cs
- LifetimeServices.cs
- EditorResources.cs
- Condition.cs
- Storyboard.cs
- SimpleBitVector32.cs
- SvcMapFileSerializer.cs
- TargetParameterCountException.cs
- ListBoxItem.cs
- GeometryConverter.cs
- ViewBox.cs
- PropertyGridEditorPart.cs
- ClrPerspective.cs
- EqualityArray.cs
- CqlIdentifiers.cs
- AnnotationHelper.cs
- SystemIPv4InterfaceProperties.cs
- ViewStateChangedEventArgs.cs
- ListViewAutomationPeer.cs
- EventLogPermissionAttribute.cs
- BorderSidesEditor.cs
- CacheDependency.cs
- BackEase.cs
- DbDeleteCommandTree.cs
- SchemaElementLookUpTableEnumerator.cs
- LookupNode.cs
- Pair.cs
- InitializationEventAttribute.cs
- LinqDataSourceContextEventArgs.cs
- DataGridCaption.cs
- ProxyGenerator.cs
- VersionPair.cs
- PagePropertiesChangingEventArgs.cs
- CmsUtils.cs
- DiffuseMaterial.cs
- ObjectRef.cs
- ADConnectionHelper.cs
- DataGridViewCell.cs
- DataGridColumnStyleMappingNameEditor.cs
- messageonlyhwndwrapper.cs
- ComponentDispatcher.cs
- ComplexType.cs
- MetadataException.cs
- ModuleBuilderData.cs
- HtmlAnchor.cs
- Compiler.cs
- ScrollBarRenderer.cs
- HtmlFormParameterWriter.cs
- ComponentResourceManager.cs
- BuildManager.cs
- X509Chain.cs