// Code:
// / 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Tracking / SqlTrackingService.cs / 1305376 / SqlTrackingService.cs
using System; using System.Collections; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Collections.Specialized; using System.Text; using System.IO; using System.Runtime.Serialization; using System.Runtime.Serialization.Formatters.Binary; using System.Data.Common; using System.Data; using System.Data.SqlClient; using System.Timers; using System.Diagnostics; using System.Reflection; using System.Workflow.Runtime; using System.Workflow.ComponentModel; using System.Workflow.Runtime.Hosting; using System.Text.RegularExpressions; using System.Threading; using System.Transactions; using System.Globalization; using System.Workflow.ComponentModel.Serialization; using System.ComponentModel.Design.Serialization; using System.Xml; using System.Configuration; namespace System.Workflow.Runtime.Tracking { public sealed class SqlTrackingService : TrackingService, IProfileNotification { #region Private/Protected Members private bool _isTrans = true; private bool _partition = false; private bool _defaultProfile = true; private bool _enableRetries = false; private bool _ignoreCommonEnableRetries = false; private DateTime _lastProfileCheck; private System.Timers.Timer _timer = new System.Timers.Timer(); private double _interval = 60000; //private static int _deadlock = 1205; private TypeKeyedCollection _types = new TypeKeyedCollection(); private object _typeCacheLock = new object(); private WorkflowCommitWorkBatchService _transactionService; private DbResourceAllocator _dbResourceAllocator; private static Version UnknownProfileVersionId = new Version( 0, 0 ); // Saved from constructor input to be used in service start initialization private NameValueCollection _parameters; string _unvalidatedConnectionString; private delegate void ExecuteRetriedDelegate(object param); #endregion #region Configuration Properties public string ConnectionString { get { return _unvalidatedConnectionString; } } ////// Determines if tracking data should be held and 
transactionally written to the database at persistence points. /// ///public bool IsTransactional { get { return _isTrans; } set { _isTrans = value; } } /// /// Indicates that records should be moved from the active instance tables to the appropriate parition tables when the instance completes. /// public bool PartitionOnCompletion { get { return _partition; } set { _partition = value; } } ////// Determines if the default profile should be used for workflow types that do not have a profile specified for them. /// ///public bool UseDefaultProfile { get { return _defaultProfile; } set { _defaultProfile = value; } } /// /// The time interval, in milliseconds, at which to check the database for changes to profiles. /// Default is 60000. /// ////// Setting the interval results in the next check to occur the specified number of millisecond /// from the time at which the property is set. /// public double ProfileChangeCheckInterval { get { return _interval; } set { if ( value <= 0 ) throw new ArgumentException( ExecutionStringManager.InvalidProfileCheckValue ); _interval = value; // // Set the timer's interval. 
// This will reset the timer _timer.Interval = _interval; } } public bool EnableRetries { get { return _enableRetries; } set { _enableRetries = value; _ignoreCommonEnableRetries = true; } } internal DbResourceAllocator DbResourceAllocator { get { return this._dbResourceAllocator; } } #endregion #region Construction public SqlTrackingService(string connectionString) { if (String.IsNullOrEmpty(connectionString)) throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString); _unvalidatedConnectionString = connectionString; } public SqlTrackingService(NameValueCollection parameters) { if (parameters == null) throw new ArgumentNullException("parameters", ExecutionStringManager.MissingParameters); if (parameters.Count > 0) { foreach (string key in parameters.Keys) { if (0 == string.Compare("IsTransactional", key, StringComparison.OrdinalIgnoreCase)) _isTrans = bool.Parse(parameters[key]); else if (0 == string.Compare("UseDefaultProfile", key, StringComparison.OrdinalIgnoreCase)) _defaultProfile = bool.Parse(parameters[key]); else if (0 == string.Compare("PartitionOnCompletion", key, StringComparison.OrdinalIgnoreCase)) _partition = bool.Parse(parameters[key]); else if (0 == string.Compare("ProfileChangeCheckInterval", key, StringComparison.OrdinalIgnoreCase)) { _interval = double.Parse(parameters[key], NumberFormatInfo.InvariantInfo); if (_interval <= 0) throw new ArgumentException(ExecutionStringManager.InvalidProfileCheckValue); } else if (0 == string.Compare("ConnectionString", key, StringComparison.OrdinalIgnoreCase)) _unvalidatedConnectionString = parameters[key]; else if (0 == string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase)) { _enableRetries = bool.Parse(parameters[key]); _ignoreCommonEnableRetries = true; } } } _parameters = parameters; } #endregion #region WorkflowRuntimeService override protected internal void Start() { _lastProfileCheck = DateTime.UtcNow; _dbResourceAllocator = new 
DbResourceAllocator(this.Runtime, _parameters, _unvalidatedConnectionString); // Check connection string mismatch if using SharedConnectionWorkflowTransactionService _transactionService = this.Runtime.GetService(); _dbResourceAllocator.DetectSharedConnectionConflict(_transactionService); // // If we didn't find a local value for enable retries // check in the common section if ((!_ignoreCommonEnableRetries)&&(null != base.Runtime)) { NameValueConfigurationCollection commonConfigurationParameters = base.Runtime.CommonParameters; if (commonConfigurationParameters != null) { // Then scan for connection string in the common configuration parameters section foreach (string key in commonConfigurationParameters.AllKeys) { if (string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase) == 0) { _enableRetries = bool.Parse(commonConfigurationParameters[key].Value); break; } } } } _timer.Interval = _interval; _timer.AutoReset = false; // ensure that only one timer thread is checking for profile changes at a time _timer.Elapsed += new ElapsedEventHandler( CheckProfileChanges ); _timer.Start(); base.Start(); } #endregion WorkflowRuntimeService #region IProfileNotification Implementation protected internal override TrackingChannel GetTrackingChannel( TrackingParameters parameters ) { if ( null == parameters ) throw new ArgumentNullException( "parameters" ); // // Return a new channel for this instance // Give it the parameters and this to store return new SqlTrackingChannel( parameters, this ); } public event EventHandler ProfileUpdated; public event EventHandler ProfileRemoved; protected internal override TrackingProfile GetProfile(Type workflowType, Version profileVersion) { if ( null == workflowType ) throw new ArgumentNullException( "workflowType" ); // parameter wantToCreateDefault = false: // looking for a specific version that has already been running with this instance; don't use a default here return GetProfileByScheduleType(workflowType, profileVersion, 
false); } protected internal override bool TryGetProfile(Type workflowType, out TrackingProfile profile) { if ( null == workflowType ) throw new ArgumentNullException( "workflowType" ); profile = GetProfileByScheduleType(workflowType, SqlTrackingService.UnknownProfileVersionId, _defaultProfile); if ( null == profile ) return false; else return true; } protected internal override TrackingProfile GetProfile(Guid scheduleInstanceId) { TrackingProfile profile = null; GetProfile( scheduleInstanceId, out profile ); return profile; } private bool GetProfile( Guid scheduleInstanceId, out TrackingProfile profile ) { profile = null; DbCommand cmd = this._dbResourceAllocator.NewCommand(); cmd.CommandType = CommandType.StoredProcedure; cmd.CommandText = "[dbo].[GetInstanceTrackingProfile]"; cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter( "@InstanceId", scheduleInstanceId ) ); DbDataReader reader = null; try { reader = ExecuteReaderRetried( cmd, CommandBehavior.CloseConnection ); // // Should only reach here in non exception state if ( !reader.HasRows ) { // // Didn't find a specific profile for this instance reader.Close(); profile = null; return false; } else { if ( !reader.Read() ) { reader.Close(); profile = null; return false; } if ( reader.IsDBNull( 0 ) ) profile = null; else { string tmp = reader.GetString( 0 ); TrackingProfileSerializer serializer = new TrackingProfileSerializer(); StringReader pReader = null; try { pReader = new StringReader( tmp ); profile = serializer.Deserialize( pReader ); } finally { if ( null != pReader ) pReader.Close(); } } return true; } } finally { if ( ( null != reader ) && ( !reader.IsClosed ) ) reader.Close(); if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State)) cmd.Connection.Close(); } } protected internal override bool TryReloadProfile(Type workflowType, Guid scheduleInstanceId, out TrackingProfile profile) { if ( null == workflowType ) throw new ArgumentNullException( 
"workflowType" ); bool found = GetProfile( scheduleInstanceId, out profile ); if ( found ) return true; else { profile = null; return false; } } #endregion #region Profile Management Methods private void CheckProfileChanges( object sender, ElapsedEventArgs e ) { DbCommand cmd = null; DbDataReader reader = null; try { if ( ( null == ProfileUpdated ) && ( null == ProfileRemoved ) ) return; // no one to notify Debug.WriteLine( "Checking for updated profiles..." ); cmd = this._dbResourceAllocator.NewCommand(); cmd.CommandText = "GetUpdatedTrackingProfiles"; cmd.CommandType = CommandType.StoredProcedure; cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@LastCheckDateTime", _lastProfileCheck ) ); DbParameter param = this._dbResourceAllocator.NewDbParameter(); param.ParameterName = "@MaxCheckDateTime"; param.DbType = DbType.DateTime; param.Direction = System.Data.ParameterDirection.Output; cmd.Parameters.Add( param ); reader = ExecuteReaderRetried( cmd, CommandBehavior.CloseConnection ); // // No changes if ( !reader.HasRows ) return; while ( reader.Read() ) { Type t = null; string tmp = null; TrackingProfile profile = null; t = Assembly.Load( reader[1] as string ).GetType( reader[0] as string ); if ( null == t ) continue; tmp = reader[2] as string; if ( null == tmp ) { if ( null != ProfileRemoved ) ProfileRemoved( this, new ProfileRemovedEventArgs( t ) ); } else { TrackingProfileSerializer serializer = new TrackingProfileSerializer(); StringReader pReader = null; try { pReader = new StringReader( tmp ); profile = serializer.Deserialize( pReader ); } finally { if ( null != pReader ) pReader.Close(); } if ( null != ProfileUpdated ) ProfileUpdated( this, new ProfileUpdatedEventArgs( t, profile ) ); } Debug.WriteLine( ExecutionStringManager.UpdatedProfile + t.FullName ); } } finally { if ((null != reader) && (!reader.IsClosed)) reader.Close(); // // This should never be null/empty unless the proc failed which should throw if ( null != cmd ) { // // If the value 
is null we error'd so keep the same last time for the next check if ( null != cmd.Parameters[1].Value ) _lastProfileCheck = ( DateTime ) cmd.Parameters[1].Value; } if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State)) cmd.Connection.Close(); // // Start the timer again (autoreset is false to avoid multiple threads checking for profile changes) _timer.Start(); } } #endregion #region Private Methods private void ExecuteRetried(ExecuteRetriedDelegate executeRetried, object param) { short count = 0; DbRetry dbRetry = new DbRetry(_enableRetries); while (true) { try { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); executeRetried(param); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); break; } catch(Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteRetried caught exception: " + e.ToString()); if (dbRetry.TryDoRetry(ref count)) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " retrying."); continue; } throw; } } } private DbDataReader ExecuteReaderRetried(DbCommand command, CommandBehavior behavior) { DbDataReader reader = null; short count = 0; DbRetry dbRetry = new DbRetry(_enableRetries); while (true) { try { ResetConnectionForCommand(command); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); reader = command.ExecuteReader(behavior); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, 
"SqlTrackingService.ExecuteReaderRetried ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); break; } catch(Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteReaderRetried caught exception from ExecuteReader: " + e.ToString()); if (dbRetry.TryDoRetry(ref count)) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried retrying."); continue; } throw; } } return reader; } private void ExecuteNonQueryRetried(DbCommand command) { short count = 0; DbRetry dbRetry = new DbRetry(_enableRetries); while (true) { try { ResetConnectionForCommand(command); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G",System.Globalization.CultureInfo.InvariantCulture)); command.ExecuteNonQuery(); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); break; } catch(Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryRetried caught exception from ExecuteNonQuery: " + e.ToString()); if (dbRetry.TryDoRetry(ref count)) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried retrying."); continue; } throw; } } } private void ExecuteNonQueryWithTxRetried(DbCommand command) { try { short count = 0; DbRetry dbRetry = new DbRetry(_enableRetries); while (true) { try { ResetConnectionForCommand(command); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); command.Transaction = command.Connection.BeginTransaction(); 
command.ExecuteNonQuery(); command.Transaction.Commit(); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); break; } catch(Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried caught exception from ExecuteNonQuery: " + e.ToString()); try { if (null != command.Transaction) command.Transaction.Rollback(); } catch { // // Rollback() can throw, nothing to do but ---- if this happens // so that we don't lose the original exception } if (dbRetry.TryDoRetry(ref count)) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried retrying."); continue; } throw; } } } finally { if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State)) command.Connection.Close(); } } private void ResetConnectionForCommand(DbCommand command) { if ( null == command ) return; if ( null != command.Connection ) { if (ConnectionState.Open != command.Connection.State) { if (ConnectionState.Closed != command.Connection.State) command.Connection.Close(); command.Connection.Dispose(); command.Connection = _dbResourceAllocator.OpenNewConnectionNoEnlist(); } } } internal static XmlWriter CreateXmlWriter(TextWriter output) { XmlWriterSettings settings = new XmlWriterSettings(); settings.Indent = true; settings.IndentChars = ("\t"); settings.OmitXmlDeclaration = true; settings.CloseOutput = true; return XmlWriter.Create(output as TextWriter, settings); } private TrackingProfile GetProfileByScheduleType(Type workflowType, Version profileVersionId, bool wantToCreateDefault) { DbCommand cmd = this._dbResourceAllocator.NewCommand(); DbDataReader reader = null; TrackingProfile profile = null; cmd.CommandType = CommandType.StoredProcedure; cmd.CommandText = 
"dbo.GetTrackingProfile"; cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@TypeFullName", workflowType.FullName) ); cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@AssemblyFullName", workflowType.Assembly.FullName) ); if (profileVersionId != SqlTrackingService.UnknownProfileVersionId) cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter( "@Version", profileVersionId.ToString() ) ); cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@CreateDefault", wantToCreateDefault) ); try { reader = ExecuteReaderRetried(cmd, CommandBehavior.CloseConnection); if ( reader.Read() ) { string tmp = reader[0] as string; if ( null != tmp ) { TrackingProfileSerializer serializer = new TrackingProfileSerializer(); StringReader pReader = null; try { pReader = new StringReader( tmp ); profile = serializer.Deserialize( pReader ); } finally { if ( null != pReader ) pReader.Close(); } } } } finally { if ((null != reader) && (!reader.IsClosed)) reader.Close(); if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State)) cmd.Connection.Close(); } return profile; } #endregion #region Private Classes private class TypeKeyedCollection : KeyedCollection { protected override string GetKeyForItem( Type item ) { return item.AssemblyQualifiedName; } } private class SerializedDataItem : TrackingDataItem { public Type Type; public string StringData; public byte[] SerializedData; public bool NonSerializable; } private class SerializedEventArgs : EventArgs { public Type Type; public byte[] SerializedArgs; } private struct AddedActivity { public string ActivityTypeFullName; public string ActivityTypeAssemblyFullName; public string QualifiedName; public string ParentQualifiedName; public string AddedActivityActionXoml; public int Order; } private struct RemovedActivity { public string QualifiedName; public string ParentQualifiedName; public string RemovedActivityActionXoml; public int Order; } private class 
SerializedWorkflowChangedEventArgs : SerializedEventArgs { public IList AddedActivities = new List (); public IList RemovedActivities = new List (); } #endregion Private Classes internal class SqlTrackingChannel : TrackingChannel, IPendingWork { #region Private Members private SqlTrackingService _service = null; private string _callPathKey = null, _parentCallPathKey = null; private bool _isTrans = false; private long _internalId = -1; private long _tmpInternalId = -1; private Dictionary _activityInstanceId = new Dictionary (32); private Dictionary _tmpActivityInstanceId = new Dictionary (10); private TrackingParameters _parameters = null; private bool _pendingArchive = false; private bool _completedTerminated = false; private static int _activityEventBatchSize = 5; private static int _dataItemBatchSize = 5; private static int _dataItemAnnotationBatchSize = 5; private static int _eventAnnotationBatchSize = 5; #endregion #region Construction protected SqlTrackingChannel() { } public SqlTrackingChannel( TrackingParameters parameters, SqlTrackingService service ) { if ( null == service ) return; _service = service; _parameters = parameters; _isTrans = service.IsTransactional; GetCallPathKeys( parameters.CallPath ); if (!_isTrans) { // // Look up instance id or insert if new instance // If we're transactional we'll do this in the first IPendingWork.Commit() _service.ExecuteRetried(ExecuteInsertWorkflowInstance, null); } } #endregion #region Public Properties private DbResourceAllocator DbResourceAllocator { get { return _service.DbResourceAllocator; } } private WorkflowCommitWorkBatchService WorkflowCommitWorkBatchService { get { return _service._transactionService; } } #endregion #region TrackingChannel protected internal override void InstanceCompletedOrTerminated() { if (_isTrans) { // // Indicate that at the next batch commit we should stamp the enddate _completedTerminated = true; // // Indicate that when the next batch commit completes successfully we should 
partition this instance if (_service.PartitionOnCompletion) _pendingArchive = true; } else { _service.ExecuteRetried(ExecuteSetEndDate, null); if (_service.PartitionOnCompletion) _service.ExecuteRetried(PartitionInstance, null); } } private void PartitionInstance(object param) { DbCommand command = null; try { // // Allow enlisting if there is an ambient tx // This can only happen on a host initiated terminate in V1. DbConnection connection = DbResourceAllocator.OpenNewConnection(false); command = DbResourceAllocator.NewCommand(connection); command.CommandText = "[dbo].[PartitionWorkflowInstance]"; command.CommandType = CommandType.StoredProcedure; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", _internalId)); command.ExecuteNonQuery(); } finally { if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State)) command.Connection.Close(); } } private void ExecuteSetEndDate(object param) { DbCommand command = null; try { // // Allow enlisting if there is an ambient tx // This can only happen on a host initiated terminate in V1. 
DbConnection connection = DbResourceAllocator.OpenNewConnection(false); command = DbResourceAllocator.NewCommand(connection); ExecuteSetEndDate(_internalId, command); } finally { if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State)) command.Connection.Close(); } } private void ExecuteSetEndDate(long internalId, DbCommand command) { if (null == command) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandText = "[dbo].[SetWorkflowInstanceEndDateTime]"; command.CommandType = CommandType.StoredProcedure; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EndDateTime", DateTime.UtcNow)); command.ExecuteNonQuery(); } protected internal override void Send(TrackingRecord record) { if ( ( Guid.Empty == _parameters.InstanceId ) || ( null == record ) ) throw new ArgumentException( ExecutionStringManager.MissingParametersTrack ); if ( record is ActivityTrackingRecord ) { ActivityTrackingRecord act = record as ActivityTrackingRecord; if ( _isTrans ) WorkflowEnvironment.WorkBatch.Add( this, SerializeRecord( act ) ); else _service.ExecuteRetried(ExecuteInsertActivityStatusInstance, SerializeRecord(act)); } else if ( record is WorkflowTrackingRecord ) { // // Instance events cannot be batched - many occur when there isn't a batch WorkflowTrackingRecord inst = ( WorkflowTrackingRecord ) record; if ( _isTrans ) { WorkflowEnvironment.WorkBatch.Add( this, SerializeRecord( inst ) ); } else { if ( TrackingWorkflowEvent.Changed == inst.TrackingWorkflowEvent ) { // // Dynamic updates are inserted in the WorkflowInstanceEvent table // and then the arg (workflowchanges) is normalized into xoml // and the added/removed activities tables _service.ExecuteRetried(ExecuteInsertWorkflowChange, SerializeRecord(inst)); } else { _service.ExecuteRetried(ExecuteInsertWorkflowInstanceEvent, 
SerializeRecord(inst)); } } } else if ( record is UserTrackingRecord ) { UserTrackingRecord user = ( UserTrackingRecord ) record; if ( _isTrans ) WorkflowEnvironment.WorkBatch.Add( this, SerializeRecord( user ) ); else _service.ExecuteRetried(ExecuteInsertUserEvent, SerializeRecord(user)); } } #endregion #region IPendingWork Members public bool MustCommit( ICollection items ) { // // Never force a persist - this is a balancing act but the V1 // decision is to err on the side of persisting only when the workflow // requires it based on its model. If the workflow uses persistence points // wisely this is great. If it goes a long time between persists with lots // of events the persists will take a long time as the batch can be huge. return false; } public void Commit(System.Transactions.Transaction transaction, ICollection items) { if ((null == items) || (0 == items.Count)) return; DbCommand command = null; DbConnection connection = null; bool needToCloseConnection = false; DbTransaction localTransaction = null; bool commitTx = false; try { // // Get the connection and transaction // The connection might be shared or local // The tx is shared and may be either a DTC or a local sql tx connection = DbResourceAllocator.GetEnlistedConnection( this.WorkflowCommitWorkBatchService, transaction, out needToCloseConnection); localTransaction = DbResourceAllocator.GetLocalTransaction( this.WorkflowCommitWorkBatchService, transaction); if (null == localTransaction) { localTransaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted); commitTx = true; } command = DbResourceAllocator.NewCommand(connection); command.Transaction = localTransaction; // // If we don't have the internal id for the instance this is the first batch // for this channel instance. If this is a new instance the following will insert // a new instance record in the db and set _tmpInternalId. 
If this is a reload of // an existing instance it will just do a lookup and set _tmpInternalId // In Completed we will assign _tmpInternalId to _internalId if the batch is successful. long internalId = -1; if (_internalId <= 0) { ExecuteInsertWorkflowInstance(command); internalId = _tmpInternalId; } else internalId = _internalId; IList activities = new List (5); WorkflowTrackingRecord workflow = null; // // Build the batch statement foreach (object o in items) { if (!(o is TrackingRecord)) continue; if (o is ActivityTrackingRecord) { // // If we have a cached workflow tracking record send it if (null != workflow) { ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command); workflow = null; } ActivityTrackingRecord activity = (ActivityTrackingRecord)o; // // Add this event to the list and send to the db if we've hit our limit activities.Add(activity); if (_activityEventBatchSize == activities.Count) { ExecuteInsertActivityStatusInstance(internalId, activities, command); activities = new List (5); } } else if (o is UserTrackingRecord) { // // If we have cached activity or workflow tracking records send them if (activities.Count > 0) { ExecuteInsertActivityStatusInstance(internalId, activities, command); activities.Clear(); } if (null != workflow) { ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command); workflow = null; } ExecuteInsertUserEvent(internalId, (UserTrackingRecord)o, command); } else if (o is WorkflowTrackingRecord) { // // If we have cached activity tracking records send them if (activities.Count > 0) { ExecuteInsertActivityStatusInstance(internalId, activities, command); activities.Clear(); } WorkflowTrackingRecord record = (WorkflowTrackingRecord)o; if (TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent) { // // If we're already holding a workflow tracking record send both to the db // else cache it and wait for the next workflow tracking record if (null != workflow) { 
ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
                                workflow = null;
                            }

                            ExecuteInsertWorkflowChange(internalId, record, command);
                        }
                        else
                        {
                            //
                            // If we're already holding a workflow tracking record send both to the db
                            // else cache it and wait for the next workflow tracking record
                            if (null != workflow)
                            {
                                ExecuteInsertWorkflowInstanceEvent(internalId, workflow, record, command);
                                workflow = null;
                            }
                            else
                            {
                                workflow = record;
                            }
                        }
                    }
                }
                //
                // If we ended up with any activities event send them.
                if (activities.Count > 0)
                    ExecuteInsertActivityStatusInstance(internalId, activities, command);

                if (null != workflow)
                {
                    ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
                    workflow = null;
                }

                if (_completedTerminated)
                    ExecuteSetEndDate(internalId, command);

                if (commitTx)
                    localTransaction.Commit();
            }
            catch (DbException e)
            {
                if (commitTx)
                    localTransaction.Rollback();

                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Error writing tracking data to database: " + e);
                throw;
            }
            finally
            {
                if (needToCloseConnection)
                {
                    connection.Dispose();
                }
            }

            return;
        }

        //
        // Finalizes the state staged while a batch of tracking records was written.
        //
        // succeeded: false resets the pending flags and discards the staged ids;
        //            true promotes the staged ids to the real members.
        // items:     not used by this method.
        //
        // When _pendingArchive is set the instance is partitioned afterwards; a failure
        // there is traced and deliberately not propagated.
        public void Complete(bool succeeded, ICollection items)
        {
            //
            // If we didn't succeed on commit reset all flags
            if (!succeeded)
            {
                _completedTerminated = false;
                _pendingArchive = false;
                _tmpInternalId = -1;
                //
                // Guard against null here just as the success path below does
                if (null != _tmpActivityInstanceId)
                    _tmpActivityInstanceId.Clear();

                return;
            }
            //
            // Commit succeeded - move the tmp internalId to the real internalId member
            if (-1 == _internalId && _tmpInternalId > 0)
                _internalId = _tmpInternalId;
            //
            // Move the tmp activity instance ids to the real activity instance id member
            if (null != _tmpActivityInstanceId && _tmpActivityInstanceId.Count > 0)
            {
                foreach (string key in _tmpActivityInstanceId.Keys)
                {
                    if (!_activityInstanceId.ContainsKey(key))
                        _activityInstanceId.Add(key, _tmpActivityInstanceId[key]);
                }
                _tmpActivityInstanceId.Clear();
            }

            if ( _pendingArchive )
            {
                try
                {
                    _service.ExecuteRetried(PartitionInstance, null);
                }
                catch ( Exception e )
                {
                    //
                    // Swallow exceptions here, do not fail the instance.
                    // Partition logic can be re-run to clean up on failure
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, string.Format(System.Globalization.CultureInfo.InvariantCulture, "Error partitioning instance {0}: {1}", _parameters.InstanceId, e.ToString()));
                }
            }
        }

        #endregion

        #region Sql Commands - InsertWorkflowInstance

        //
        // Inserts the workflow instance record on a new connection inside its own
        // ReadCommitted transaction and stores the returned id in _internalId.
        //
        // param: not used by this method (presumably present so the method matches
        //        the object-taking retry delegate - confirm against callers).
        private void ExecuteInsertWorkflowInstance( object param )
        {
            DbConnection conn = DbResourceAllocator.OpenNewConnection();
            DbTransaction tx = null;
            try
            {
                //
                // Create the command inside the try so the finally block still closes
                // the connection if command creation throws
                DbCommand command = DbResourceAllocator.NewCommand(conn);

                tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted);
                command.Connection = conn;
                command.Transaction = tx;
                _internalId = ExecuteInsertWorkflowInstance( command );
                tx.Commit();
            }
            catch (Exception)
            {
                try
                {
                    if ( null != tx )
                        tx.Rollback();
                }
                catch (Exception)
                {
                    //
                    // Rollback can throw - ignore these exceptions
                    // so we don't lose the original exception
                }
                //
                // Re-throw original exception
                throw;
            }
            finally
            {
                if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) )
                    conn.Close();
            }

            return;
        }

        private long ExecuteInsertWorkflowInstance( DbCommand command )
        {
            if ( null == command )
                throw new ArgumentNullException("command");

            if ((null == command.Connection) || (ConnectionState.Open != command.Connection.State))
                throw new ArgumentException(ExecutionStringManager.InvalidCommandBadConnection, "command");
            //
            // Write the type and the workflow definition
            string xaml = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string;
            if (null != xaml && xaml.Length > 0)
                InsertWorkflow( command, _parameters.InstanceId, null, _parameters.RootActivity);
            else
                InsertWorkflow( command, _parameters.InstanceId, _parameters.WorkflowType, _parameters.RootActivity);
            //
            // Write the instance record
            BuildInsertWorkflowInstanceParameters( command );
            DbDataReader reader = null;
            try
            {
                reader = command.ExecuteReader();
                if (reader.Read())
                    _tmpInternalId = reader.GetInt64(0);

                return _tmpInternalId;
            }
            finally
            {
                if (null != reader)
reader.Close();
            }
        }

        //
        // Configures the command to call [dbo].[InsertWorkflowInstance] for the
        // instance described by _parameters. Any existing parameters are cleared first.
        private void BuildInsertWorkflowInstanceParameters( DbCommand command )
        {
            Debug.Assert((command != null), "Null command");

            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertWorkflowInstance]";
            command.Parameters.Clear();
            //
            // An instance carrying markup is keyed by its instance id rather than by its type
            string markup = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string;
            bool isMarkupInstance = !string.IsNullOrEmpty(markup);

            DbParameterCollection args = command.Parameters;

            args.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceId", _parameters.InstanceId ) );

            string typeName = isMarkupInstance ? _parameters.InstanceId.ToString() : _parameters.WorkflowType.FullName;
            args.Add( DbResourceAllocator.NewDbParameter( "@TypeFullName", typeName ) );

            string assemblyName = isMarkupInstance ? _parameters.InstanceId.ToString() : _parameters.WorkflowType.Assembly.FullName;
            args.Add( DbResourceAllocator.NewDbParameter( "@AssemblyFullName", assemblyName ) );

            args.Add( DbResourceAllocator.NewDbParameter( "@ContextGuid", _parameters.ContextGuid ) );
            //
            // Caller details are only sent when a caller instance id is present
            bool hasCaller = ( Guid.Empty != _parameters.CallerInstanceId );
            if ( hasCaller )
            {
                args.Add( DbResourceAllocator.NewDbParameter( "@CallerInstanceId", _parameters.CallerInstanceId ) );
                args.Add( DbResourceAllocator.NewDbParameter( "@CallPath", _callPathKey ) );
                args.Add( DbResourceAllocator.NewDbParameter( "@CallerContextGuid", _parameters.CallerContextGuid ) );
                args.Add( DbResourceAllocator.NewDbParameter( "@CallerParentContextGuid", _parameters.CallerParentContextGuid ) );
            }

            args.Add( DbResourceAllocator.NewDbParameter( "@EventDateTime", this.GetSqlDateTimeString( DateTime.UtcNow ) ) );
        }

        //
        // Stores the workflow definition by calling [dbo].[InsertWorkflow].
        //
        // command:            command used to execute the procedure.
        // workflowInstanceId: used as the type/assembly name when workflowType is null.
        // workflowType:       the workflow's type, or null for an instance-only definition.
        // rootActivity:       activity whose definition document and activity xml are sent.
        //
        // Types already present in the service's type cache return without touching the db.
        private void InsertWorkflow( DbCommand command, Guid workflowInstanceId, Type workflowType, Activity rootActivity)
        {
            bool isInstanceType = ( null == workflowType );
            string definition;
            //
            // Both the cache check and the document generation happen under the type cache lock
            lock (_service._typeCacheLock)
            {
                if (!isInstanceType && _service._types.Contains(workflowType.AssemblyQualifiedName))
                    return;

                definition = GetXomlDocument(rootActivity);
            }
            //
            // It is possible to race here but the pk specifies ignore duplicate key
            // This is better than taking a lock around all of the logic in this method.
            string typeName = isInstanceType ? workflowInstanceId.ToString() : workflowType.FullName;
            string assemblyName = isInstanceType ? workflowInstanceId.ToString() : workflowType.Assembly.FullName;

            command.Parameters.Clear();
            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertWorkflow]";

            DbParameterCollection args = command.Parameters;
            args.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeName));
            args.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyName));
            args.Add(DbResourceAllocator.NewDbParameter("@IsInstanceType", isInstanceType));
            args.Add(DbResourceAllocator.NewDbParameter("@WorkflowDefinition", definition));
            args.Add(DbResourceAllocator.NewDbParameter("@WorkflowId", DbType.Int32, System.Data.ParameterDirection.Output));
            args.Add(DbResourceAllocator.NewDbParameter("@Exists", DbType.Boolean, System.Data.ParameterDirection.Output));
            args.Add(DbResourceAllocator.NewDbParameter("@Activities", GetActivitiesXml((CompositeActivity)rootActivity)));

            command.ExecuteNonQuery();
            //
            // Remember the type so the next instance of it skips the serialization
            // overhead and the db round trip. The lock protects the _types collection.
            if (isInstanceType)
                return;

            lock (_service._typeCacheLock)
            {
                if (!_service._types.Contains(workflowType.AssemblyQualifiedName))
                    _service._types.Add(workflowType);
            }
        }

        #endregion

        #region Sql Commands - InsertWorkflowInstanceEvent

        private void ExecuteInsertWorkflowInstanceEvent( object param )
        {
            WorkflowTrackingRecord record = param as WorkflowTrackingRecord;

            if (null ==
record)
                throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param");

            DbConnection conn = DbResourceAllocator.OpenNewConnection();
            DbCommand command = DbResourceAllocator.NewCommand( conn );
            DbTransaction tx = null;
            try
            {
                tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
                command.Connection = conn;
                command.Transaction = tx;
                ExecuteInsertWorkflowInstanceEvent( _internalId, record, null, command );
                tx.Commit();
            }
            catch (Exception)
            {
                try
                {
                    if ( null != tx )
                        tx.Rollback();
                }
                catch (Exception)
                {
                    //
                    // Rollback can throw - ignore these exceptions
                    // so we don't lose the original exception
                }
                //
                // Re-throw original exception
                throw;
            }
            finally
            {
                if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) )
                    conn.Close();
            }

            return;
        }

        //
        // Writes one or two workflow instance events with a single call to
        // [dbo].[InsertWorkflowInstanceEvent], then writes their annotations.
        //
        // internalId: internal id of the workflow instance.
        // record1:    first event; required.
        // record2:    optional second event sent in the same call; may be null.
        // command:    command with an open connection.
        //
        // Throws ArgumentException when the command is null or its connection is not open.
        private void ExecuteInsertWorkflowInstanceEvent( long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command )
        {
            if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
                throw new ArgumentException(ExecutionStringManager.InvalidCommandBadConnection, "command");

            BuildInsertWorkflowInstanceEventParameters( internalId, record1, record2, command );
            command.ExecuteNonQuery();
            //
            // The procedure hands back the id of each event it inserted
            long eventId1 = (long) command.Parameters["@WorkflowInstanceEventId1"].Value;
            Debug.Assert(eventId1 > 0, "Invalid eventId1");

            long eventId2 = -1;
            if (null != record2)
            {
                eventId2 = (long)command.Parameters["@WorkflowInstanceEventId2"].Value;
                Debug.Assert(eventId2 > 0, "Invalid eventId2");
            }
            //
            // Pair every annotation with the id of the event it belongs to
            List<KeyValuePair<long, string>> annotations = new List<KeyValuePair<long, string>>(record1.Annotations.Count + (null == record2 ? 0 : record2.Annotations.Count));

            foreach (string s in record1.Annotations)
                annotations.Add(new KeyValuePair<long, string>(eventId1, s));

            if (null != record2)
            {
                foreach (string s in record2.Annotations)
                    annotations.Add(new KeyValuePair<long, string>(eventId2, s));
            }

            BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command);
        }

        //
        // Configures the command to call [dbo].[InsertWorkflowInstanceEvent] for one
        // or two records. Any existing parameters are cleared first.
        //
        // Throws ArgumentNullException when record1 or command is null.
        private void BuildInsertWorkflowInstanceEventParameters(long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command)
        {
            if ( null == record1 )
                throw new ArgumentNullException( "record1" );

            if ( null == command )
                throw new ArgumentNullException( "command" );

            Debug.Assert( internalId != -1, "Invalid internalId" );

            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertWorkflowInstanceEvent]";
            command.Parameters.Clear();

            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );

            AddWorkflowInstanceEventParameters( record1, "1", command );

            if ( null != record2 )
                AddWorkflowInstanceEventParameters( record2, "2", command );
        }

        //
        // Adds the parameters describing a single workflow event. The suffix ("1" or "2")
        // is appended to every parameter name to select the procedure's first or second slot.
        private void AddWorkflowInstanceEventParameters( WorkflowTrackingRecord record, string suffix, DbCommand command )
        {
            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@TrackingWorkflowEventId" + suffix, ( int ) record.TrackingWorkflowEvent ) );
            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventDateTime" + suffix, record.EventDateTime ) );
            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventOrder" + suffix, record.EventOrder ) );

            if ( null != record.EventArgs )
            {
                //
                // The type is read before the args are replaced by their serialized form
                Type t = record.EventArgs.GetType();
                Byte[] data = null;

                if (!(record.EventArgs is SerializedEventArgs))
                    record = SerializeRecord(record);

                SerializedEventArgs sargs = record.EventArgs as SerializedEventArgs;
                data = sargs.SerializedArgs;

                command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArgTypeFullName" + suffix, t.FullName ) );
                command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArgAssemblyFullName" + suffix, t.Assembly.FullName ) );
                command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArg" + suffix, data ) );
            }

            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceEventId" + suffix, DbType.Int64, ParameterDirection.Output ) );
        }

        #endregion

        #region Sql Commands - InsertActivityStatusInstance

        private void ExecuteInsertActivityStatusInstance(object param)
        {
            ActivityTrackingRecord record = param as ActivityTrackingRecord;

            if (null == record)
                throw new ArgumentException(ExecutionStringManager.InvalidActivityTrackingRecordParameter, "param");

            DbConnection conn = DbResourceAllocator.OpenNewConnection();
            DbTransaction tx = null;
            try
            {
                tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );
                DbCommand command = conn.CreateCommand();
                command.Transaction = tx;
                IList<ActivityTrackingRecord> activity = new List<ActivityTrackingRecord>(1);
                activity.Add(record);
                ExecuteInsertActivityStatusInstance( _internalId, activity, command );
                tx.Commit();
            }
            catch (Exception)
            {
                //
                // Rollback can throw - ignore these exceptions
                // so we don't lose the original exception
                try
                {
                    if ( null != tx )
                        tx.Rollback();
                }
                catch (Exception)
                {
                }
                //
                // Re-throw original exception
                throw;
            }
            finally
            {
                if ((null != conn) && (ConnectionState.Closed != conn.State))
conn.Close();
            }

            return;
        }

        //
        // Writes a batch of activity status events with a single call to
        // [dbo].[InsertActivityExecutionStatusEventMultiple], updates the activity
        // instance id cache from the output parameters, then writes the annotations
        // and data items of every record.
        //
        // internalId: internal id of the workflow instance; sent only when greater than zero.
        // activities: records to write; null or empty is a no-op.
        // command:    command with an open connection.
        //
        // Throws ArgumentOutOfRangeException when the batch exceeds _activityEventBatchSize,
        // ArgumentException when the command is null or its connection is not open, and
        // InvalidOperationException when an event id output is not positive.
        private void ExecuteInsertActivityStatusInstance(long internalId, IList<ActivityTrackingRecord> activities, DbCommand command)
        {
            if (null == activities || activities.Count <= 0)
                return;

            if (activities.Count > _activityEventBatchSize)
                throw new ArgumentOutOfRangeException("activities");

            if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
                throw new ArgumentException(ExecutionStringManager.InvalidCommandBadConnection, "command");

            int count = activities.Count;

            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertActivityExecutionStatusEventMultiple]";
            //
            // Add the common parameters
            command.Parameters.Clear();
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId));
            //
            // If we have the workflow's internal id use it to avoid the look up in the db
            DbParameter param = DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", DbType.Int64, System.Data.ParameterDirection.InputOutput);
            command.Parameters.Add(param);
            if (internalId > 0)
                param.Value = internalId;

            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceContextGuid", _parameters.ContextGuid));
            //
            // Hashed ids of QName, context and pcontext used as key for storing activity record ids
            // Save these for each record in the list so we don't have to recompute them below when adding to the cache
            // Sized from the batch itself so the array always matches the guard above
            string[] ids = new string[count];
            for (int i = 0; i < count; i++)
            {
                ActivityTrackingRecord record = activities[i];
                long aid = -1;
                ids[i] = BuildQualifiedNameVarName(record.QualifiedName, record.ContextGuid, record.ParentContextGuid);
                TryGetActivityInstanceId(ids[i], out aid);
                //
                // Parameter names are 1-based
                BuildInsertActivityStatusEventParameters(internalId, aid, i+1, record, command);
            }

            command.ExecuteNonQuery();
            //
            // Get all the output ids
            long[] eventIds = new long[count];
            for (int i = 0; i < count; i++)
            {
                string index = (i + 1).ToString(CultureInfo.InvariantCulture);
                //
                // ActivityInstanceId
                long aId = (long)command.Parameters["@ActivityInstanceId" + index].Value;
                Debug.Assert(aId > 0, "Invalid @ActivityInstanceId output parameter value");
                //
                // For all status changes that aren't "Closed" add the id to the instance cache.
                // To keep the cache size under control remove entries for activities that have closed.
                // The activity might fault and need to do a lookup in the db but this isn't the common
                // path and the db lookup isn't very expensive.
                if (ActivityExecutionStatus.Closed != activities[i].ExecutionStatus)
                    SetActivityInstanceId(ids[i], aId);
                else
                    RemoveActivityInstanceId(ids[i]);
                //
                // ActivityExecutionStatusEventId
                long aeseId = (long)command.Parameters["@ActivityExecutionStatusEventId" + index].Value;
                Debug.Assert(aeseId > 0, "Invalid @ActivityExecutionStatusEventId output parameter value");
                eventIds[i] = aeseId;
            }
            //
            // Pair every annotation and data item with the id of the event it belongs to
            List<KeyValuePair<long, string>> annotations = new List<KeyValuePair<long, string>>(10);
            List<KeyValuePair<long, TrackingDataItem>> items = new List<KeyValuePair<long, TrackingDataItem>>(10);

            for (int i = 0; i < count; i++)
            {
                ActivityTrackingRecord record = activities[i];
                //
                // Get the ActivityExecutionStatusEventId
                long eventId = eventIds[i];
                if (eventId<=0)
                    throw new InvalidOperationException();

                foreach (string s in record.Annotations)
                    annotations.Add(new KeyValuePair<long, string>(eventId, s));

                foreach (TrackingDataItem item in record.Body)
                    items.Add(new KeyValuePair<long, TrackingDataItem>(eventId, item));
            }

            BatchExecuteInsertEventAnnotation(internalId, 'a', annotations, command);
            BatchExecuteInsertTrackingDataItems(internalId, 'a', items, command);
        }

        private void BuildInsertActivityStatusEventParameters( long internalId, long activityInstanceId, int parameterId, ActivityTrackingRecord record, DbCommand command )
        {
            string paramIdString = parameterId.ToString(CultureInfo.InvariantCulture);
            //
            // If we have the activity's instance id use it to avoid the look up in the db
            DbParameter param = DbResourceAllocator.NewDbParameter("@ActivityInstanceId" + paramIdString, DbType.Int64, System.Data.ParameterDirection.InputOutput);
command.Parameters.Add(param);
            if ( activityInstanceId > 0 )
                param.Value = activityInstanceId;

            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName" + paramIdString, record.QualifiedName));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid" + paramIdString, record.ContextGuid));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentContextGuid" + paramIdString, record.ParentContextGuid));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ExecutionStatusId" + paramIdString, (int)record.ExecutionStatus));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime" + paramIdString, record.EventDateTime));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder" + paramIdString, record.EventOrder));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ActivityExecutionStatusEventId" + paramIdString, DbType.Int64, ParameterDirection.Output));
        }

        #endregion

        #region Sql Commands - InsertUserEvent

        //
        // Writes a single user tracking record on a new connection inside its own
        // ReadCommitted transaction.
        //
        // param: must be a UserTrackingRecord; anything else (including null)
        //        raises ArgumentException.
        private void ExecuteInsertUserEvent( object param )
        {
            if (!(param is UserTrackingRecord))
                throw new ArgumentException(ExecutionStringManager.InvalidUserTrackingRecordParameter, "param");

            UserTrackingRecord userRecord = (UserTrackingRecord)param;

            DbConnection connection = DbResourceAllocator.OpenNewConnection();
            DbTransaction transaction = null;
            try
            {
                transaction = connection.BeginTransaction( System.Data.IsolationLevel.ReadCommitted );

                DbCommand insertCommand = connection.CreateCommand();
                insertCommand.Transaction = transaction;

                ExecuteInsertUserEvent( _internalId, userRecord, insertCommand );
                transaction.Commit();
            }
            catch (Exception)
            {
                //
                // A failing rollback must not hide the exception that brought us here
                if ( null != transaction )
                {
                    try
                    {
                        transaction.Rollback();
                    }
                    catch (Exception)
                    {
                    }
                }
                //
                // Surface the original failure
                throw;
            }
            finally
            {
                bool stillOpen = ( null != connection ) && ( ConnectionState.Closed != connection.State );
                if ( stillOpen )
                    connection.Close();
            }
        }

        private void ExecuteInsertUserEvent( long internalId, UserTrackingRecord record, DbCommand command )
        {
            if ( ( null == command ) || ( null ==
command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
                throw new ArgumentException();

            long aid = -1;
            bool cached = false;
            //
            // Check if we have the activityInstanceId in the cache - we cache to avoid repeatedly searching this table.
            string id = BuildQualifiedNameVarName( record.QualifiedName, record.ContextGuid, record.ParentContextGuid );
            if ( TryGetActivityInstanceId( id, out aid ) )
                cached = true;

            BuildInsertUserEventParameters( internalId, aid, record, command );
            command.ExecuteNonQuery();
            //
            // If we didn't already have the activityInstanceId get it from the IN/OUT param and put it in the cache
            if ( !cached )
                SetActivityInstanceId( id, ( long ) command.Parameters["@ActivityInstanceId"].Value );

            long eventId = (long)command.Parameters["@UserEventId"].Value;

            List<KeyValuePair<long, string>> annotations = new List<KeyValuePair<long, string>>(10);
            List<KeyValuePair<long, TrackingDataItem>> items = new List<KeyValuePair<long, TrackingDataItem>>(10);

            foreach (string s in record.Annotations)
                annotations.Add(new KeyValuePair<long, string>(eventId, s));

            foreach (TrackingDataItem item in record.Body)
                items.Add(new KeyValuePair<long, TrackingDataItem>(eventId, item));

            BatchExecuteInsertEventAnnotation(internalId, 'u', annotations, command);
            BatchExecuteInsertTrackingDataItems(internalId, 'u', items, command);
        }

        //
        // Configures the command to call [dbo].[InsertUserEvent] for a user tracking
        // record. Any existing parameters are cleared first.
        //
        // internalId:         internal id of the workflow instance.
        // activityInstanceId: cached activity instance id; when not positive the
        //                     qualified name and context guids are sent instead.
        // record:             the user event to describe.
        // command:            the command to configure.
        private void BuildInsertUserEventParameters( long internalId, long activityInstanceId, UserTrackingRecord record, DbCommand command )
        {
            Debug.Assert( internalId != -1, "Invalid internalId" );
            Debug.Assert( ( command != null ), "Null command passed to BuildInsertUserEventParameters" );

            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertUserEvent]";
            command.Parameters.Clear();

            DbParameter param = DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", DbType.Int64 );
            command.Parameters.Add( param );
            param.Value = internalId;

            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventOrder", record.EventOrder ) );
            //
            // If we have the activity's instance id use it to avoid the look up in the db
            param = DbResourceAllocator.NewDbParameter( "@ActivityInstanceId", DbType.Int64, System.Data.ParameterDirection.InputOutput );
            command.Parameters.Add( param );
            if ( activityInstanceId > 0 )
            {
                param.Value = activityInstanceId;
            }
            else
            {
                //
                // Keep the network traffic down - only include the fields needed
                // to insert an ActivityInstance record if we don't have the activityInstanceId
                command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@QualifiedName", record.QualifiedName ) );
                command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ContextGuid", record.ContextGuid ) );
                command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ParentContextGuid", record.ParentContextGuid ) );
            }

            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime", record.EventDateTime));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataKey", record.UserDataKey));

            if ( null != record.UserData )
            {
                Type t = record.UserData.GetType();
                Byte[] data = null;
                bool nonSerializable = false;
                string userDataString = null;

                SerializedDataItem sItem = record.UserData as SerializedDataItem;
                if ( null != sItem )
                {
                    //
                    // Already serialized - take the values straight from the item
                    data = sItem.SerializedData;
                    nonSerializable = sItem.NonSerializable;
                    userDataString = sItem.StringData;
                }
                else
                {
                    //
                    // Not serialized yet - serialize now and keep the results.
                    // Previously this path dereferenced the null result of the cast above.
                    // NOTE(review): no string form is produced on this path, so @UserData_Str
                    // stays null - confirm whether UserData.ToString() should be sent instead.
                    SerializeDataItem(record.UserData, out data, out nonSerializable);
                }

                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataTypeFullName", t.FullName));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataAssemblyFullName", t.Assembly.FullName));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Str", userDataString));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Blob", data));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataNonSerializable", nonSerializable));
            }

            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserEventId", DbType.Int64, ParameterDirection.Output));
        }

        #endregion

        #region Sql Commands - InsertTrackingDataItem

        private
void BatchExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList > items, DbCommand command) { if (null == items || items.Count <= 0) return; // // If the list is smaller than the batch size just push the whole thing if (items.Count <= _dataItemBatchSize) { ExecuteInsertTrackingDataItems(internalId, eventTypeId, items, command); return; } // // Need to split the list into max batch size chunks List > batch = new List >(_dataItemBatchSize); foreach (KeyValuePair kvp in items) { batch.Add(kvp); if (batch.Count == _dataItemBatchSize) { ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command); batch.Clear(); } } // // Send anything that hasn't been sent if (batch.Count > 0) ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command); } private void ExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList > items, DbCommand command) { Debug.Assert( internalId != -1, "Invalid internalId" ); if ( null == items || items.Count <= 0 ) return; if (items.Count > _dataItemAnnotationBatchSize) throw new ArgumentOutOfRangeException("items"); if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) ) throw new ArgumentException(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertTrackingDataItemMultiple]"; command.Parameters.Clear(); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventTypeId", eventTypeId)); int i = 1; // base 1 to match parameter names foreach(KeyValuePair kvp in items) { string index = (i++).ToString(CultureInfo.InvariantCulture); SerializedDataItem sItem = kvp.Value as SerializedDataItem; if (null == sItem) sItem = SerializeDataItem(kvp.Value); Type t = sItem.Type; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId"+index, kvp.Key)); 
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@FieldName"+index, sItem.FieldName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName" + index, ((null == t) ? null : t.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName" + index, ((null == t) ? null : t.Assembly.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Str" + index, sItem.StringData)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Blob" + index, sItem.SerializedData)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@DataNonSerializable" + index, sItem.NonSerializable)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, DbType.Int64, System.Data.ParameterDirection.Output)); } command.ExecuteNonQuery(); // // Get all the out parameters holding the data item record ids // This keeps us from repeatedly going into the parameters collection // below if a data item has more than one annotation List ids = new List (_dataItemAnnotationBatchSize); for( i = 0; i < items.Count; i++ ) { string index = (i+1).ToString(CultureInfo.InvariantCulture); ids.Insert(i, (long)command.Parameters["@TrackingDataItemId" + index].Value); } // // Go through all the data items and send all the annotations in batches List > annotations = new List >(_dataItemAnnotationBatchSize); i = 0; foreach(KeyValuePair kvp in items) { TrackingDataItem item = kvp.Value; long dataItemId = ids[i++]; foreach (string s in item.Annotations) { annotations.Add(new KeyValuePair (dataItemId, s)); if (annotations.Count == _dataItemAnnotationBatchSize) { ExecuteInsertAnnotation(internalId, annotations, command); annotations.Clear(); } } } // // If we have anything left send them. 
if ( annotations.Count > 0 ) ExecuteInsertAnnotation(internalId, annotations, command); } private void ExecuteInsertAnnotation(long internalId, IList > annotations, DbCommand command) { if ( null == annotations || annotations.Count <= 0 ) return; if (annotations.Count > _dataItemAnnotationBatchSize) throw new ArgumentOutOfRangeException("annotations"); if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) ) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertTrackingDataItemAnnotationMultiple]"; command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId)); int i = 1; // base 1 to match parameter names foreach( KeyValuePair kvp in annotations) { string index = (i++).ToString(CultureInfo.InvariantCulture); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, kvp.Key)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation" + index, kvp.Value )); } command.ExecuteNonQuery(); return; } private void BatchExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList > annotations, DbCommand command) { if (null == annotations || annotations.Count <= 0) return; // // If the list is smaller than the max batch size just send it directly if (annotations.Count <= _eventAnnotationBatchSize) { ExecuteInsertEventAnnotation(internalId, eventTypeId, annotations, command); return; } // // Need to split the list into max batch size chunks List > batch = new List >(_eventAnnotationBatchSize); foreach (KeyValuePair kvp in annotations) { batch.Add(kvp); if (batch.Count == _eventAnnotationBatchSize) { ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command); batch.Clear(); } } // // Send anything that hasn't been sent if 
(batch.Count > 0) ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command); } private void ExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList > annotations, DbCommand command) { Debug.Assert(internalId != -1, "Invalid internalId"); if (null == annotations || annotations.Count <= 0) return; if (annotations.Count > _eventAnnotationBatchSize) throw new ArgumentOutOfRangeException("annotations"); if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) ) throw new ArgumentNullException( "command" ); command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertEventAnnotationMultiple]"; command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventTypeId", eventTypeId ) ); int i = 1; //base 1 to match parameter names foreach(KeyValuePair kvp in annotations) { string index = (i++).ToString(CultureInfo.InvariantCulture); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId" +index, kvp.Key)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation"+index, kvp.Value)); } command.ExecuteNonQuery(); return; } #endregion #region Workflow Change private void ExecuteInsertWorkflowChange( object param ) { WorkflowTrackingRecord record = param as WorkflowTrackingRecord; if (null == record) throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param"); DbCommand command = DbResourceAllocator.NewCommand(); try { if ( ConnectionState.Open != command.Connection.State ) command.Connection.Open(); command.Transaction = command.Connection.BeginTransaction(); ExecuteInsertWorkflowChange(_internalId, record, command); command.Transaction.Commit(); } catch (Exception) { // // Rollback 
can throw - ignore these exceptions // so we don't lose the original exception try { if ((null != command) && (null != command.Transaction)) command.Transaction.Rollback(); } catch (Exception) { } // // Re-throw original exception throw; } finally { if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State)) command.Connection.Close(); } return; } private void ExecuteInsertWorkflowChange(long internalId, WorkflowTrackingRecord record, DbCommand command) { if (null == record) throw new ArgumentNullException("record"); if (null == record.EventArgs) throw new InvalidOperationException(ExecutionStringManager.InvalidWorkflowChangeArgs); if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentNullException("command"); // // If we haven't already serialized do so now. // This is work we have to do to write to store in the db anyway. if (!(record.EventArgs is SerializedWorkflowChangedEventArgs)) record = SerializeRecord(record); // // Insert the workflow instance event BuildInsertWorkflowInstanceEventParameters(internalId, record, null, command); command.ExecuteNonQuery(); // // Get the event id for added/removed activities and annotations long eventId = (long)command.Parameters["@WorkflowInstanceEventId1"].Value; SerializedWorkflowChangedEventArgs sargs = (SerializedWorkflowChangedEventArgs)record.EventArgs; // // Normalize the activities that have been added/removed if we're tracking definitions if ((null != sargs.AddedActivities) && (sargs.AddedActivities.Count > 0)) { foreach (AddedActivity added in sargs.AddedActivities) ExecuteInsertAddedActivity(internalId, added.QualifiedName, added.ParentQualifiedName, added.ActivityTypeFullName, added.ActivityTypeAssemblyFullName, added.AddedActivityActionXoml, eventId, added.Order, command); } if ((null != sargs.RemovedActivities) && (sargs.RemovedActivities.Count > 0)) { foreach (RemovedActivity removed in 
sargs.RemovedActivities) ExecuteInsertRemovedActivity(internalId, removed.QualifiedName, removed.ParentQualifiedName, removed.RemovedActivityActionXoml, eventId, removed.Order, command); } List > annotations = new List >(record.Annotations.Count); foreach (string s in record.Annotations) annotations.Add(new KeyValuePair (eventId, s)); BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command); } private void ExecuteInsertAddedActivity(long internalId, string qualifiedName, string parentQualifiedName, string typeFullName, string assemblyFullName, string addedActivityActionXoml, long eventId, int order, DbCommand command) { if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertAddedActivity]"; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeFullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyFullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AddedActivityAction", addedActivityActionXoml)); if (-1 == order) command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value)); else command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", order)); command.ExecuteNonQuery(); } private void ExecuteInsertRemovedActivity(long internalId, string qualifiedName, string parentQualifiedName, string removedActivityActionXoml, long eventId, int 
order, DbCommand command) { if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertRemovedActivity]"; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@RemovedActivityAction", removedActivityActionXoml)); if (-1 == order) command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value)); else command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", order)); command.ExecuteNonQuery(); } #endregion #region Utility private bool TryGetActivityInstanceId(string key, out long id) { // // Check the cache of committed ids if (_activityInstanceId.TryGetValue(key, out id)) return true; // // If we're batched check the cache of temp ids generated during this batch commit if (_isTrans) return _tmpActivityInstanceId.TryGetValue(key, out id); else return false; // not batched so we didn't find the id } private void SetActivityInstanceId(string key, long id) { // // If we're batched put the ids in the temp member // If the commit is successful we'll move these to the real member // in IPendingWork.Complete if (_isTrans) { if (!_tmpActivityInstanceId.ContainsKey(key)) _tmpActivityInstanceId.Add(key, id); } else { if (!_activityInstanceId.ContainsKey(key)) _activityInstanceId.Add(key, id); } } private void RemoveActivityInstanceId(string key) { // // Remove from both the temp and real caches if (_isTrans) { if 
(_tmpActivityInstanceId.ContainsKey(key)) _tmpActivityInstanceId.Remove(key); } if (_activityInstanceId.ContainsKey(key)) _activityInstanceId.Remove(key); } private string GetSqlDateTimeString(DateTime dateTime) { return dateTime.Year.ToString(System.Globalization.CultureInfo.InvariantCulture) + PadToDblDigit(dateTime.Month) + PadToDblDigit(dateTime.Day) + " " + dateTime.Hour.ToString(System.Globalization.CultureInfo.InvariantCulture) + ":" + dateTime.Minute.ToString(System.Globalization.CultureInfo.InvariantCulture) + ":" + dateTime.Second.ToString(System.Globalization.CultureInfo.InvariantCulture) + ":" + dateTime.Millisecond.ToString(System.Globalization.CultureInfo.InvariantCulture); } private string PadToDblDigit(int num) { string s = num.ToString(System.Globalization.CultureInfo.InvariantCulture); if (s.Length == 1) return "0" + s; else return s; } /// /// Build a string to uniquely identify each activity that should be recorded as a seperate instance. /// A separate instance is defined by the combination of QualifiedName, Context and ParentContext /// /// ///private string BuildQualifiedNameVarName( string qId, Guid context, Guid parentContext ) { Guid hashed = HashHelper.HashServiceType(qId); return hashed.ToString().Replace('-', '_') + "_" + context.ToString().Replace('-', '_') + "_" + parentContext.ToString().Replace('-', '_'); } private ActivityTrackingRecord SerializeRecord( ActivityTrackingRecord record ) { if ( ( null == record.Body ) || ( 0 == record.Body.Count ) ) return record; for ( int i = 0; i < record.Body.Count; i++ ) record.Body[i] = SerializeDataItem( record.Body[i] ); return record; } private UserTrackingRecord SerializeRecord( UserTrackingRecord record ) { if ( ( ( null == record.Body ) || ( 0 == record.Body.Count ) ) && ( null == record.EventArgs ) && ( null == record.UserData ) ) return record; if ( null != record.UserData ) { SerializedDataItem item = new SerializedDataItem(); byte[] data = null; bool nonSerializable; SerializeDataItem( 
record.UserData, out data, out nonSerializable ); item.Type = record.UserData.GetType(); item.StringData = record.UserData.ToString(); item.SerializedData = data; item.NonSerializable = nonSerializable; record.UserData = item; } for ( int i = 0; i < record.Body.Count; i++ ) record.Body[i] = SerializeDataItem( record.Body[i] ); return record; } private WorkflowTrackingRecord SerializeRecord( WorkflowTrackingRecord record ) { if ( null == record.EventArgs ) return record; SerializedEventArgs args; if ( TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent ) { // // Convert the WorkflowChanged items SerializedWorkflowChangedEventArgs sargs = new SerializedWorkflowChangedEventArgs(); TrackingWorkflowChangedEventArgs wargs = ( TrackingWorkflowChangedEventArgs ) record.EventArgs; if ( null != wargs ) { for ( int i = 0; i activities) { Activity removed = removedAction.OriginalRemovedActivity; RemovedActivity removedActivity = new RemovedActivity(); removedActivity.Order = order; removedActivity.QualifiedName = removed.QualifiedName; if (null != removed.Parent) removedActivity.ParentQualifiedName = removed.Parent.QualifiedName; // // Save the defintion of this change removedActivity.RemovedActivityActionXoml = GetXomlDocument(removedAction); activities.Add(removedActivity); // // Recursively add all contained activities to the removed list if (removed is CompositeActivity) { foreach (Activity activity in ((CompositeActivity)removed).Activities) { AddRemovedActivity(activity, activities); } } } private void AddRemovedActivity(Activity removed, IList activities) { RemovedActivity removedActivity = new RemovedActivity(); removedActivity.Order = -1; removedActivity.QualifiedName = removed.QualifiedName; if ( null != removed.Parent ) removedActivity.ParentQualifiedName = removed.Parent.QualifiedName; activities.Add(removedActivity); // // Recursively add all contained activities to the removed list if (removed is CompositeActivity) { foreach (Activity activity in 
((CompositeActivity)removed).Activities) { AddRemovedActivity(activity, activities); } } } private void AddAddedActivity(AddedActivityAction addedAction, int order, IList activities) { Activity added = addedAction.AddedActivity; AddedActivity addedActivity = new AddedActivity(); addedActivity.Order = order; Type type = added.GetType(); addedActivity.ActivityTypeFullName = type.FullName; addedActivity.ActivityTypeAssemblyFullName = type.Assembly.FullName; addedActivity.QualifiedName = added.QualifiedName; if (null != added.Parent) addedActivity.ParentQualifiedName = added.Parent.QualifiedName; addedActivity.AddedActivityActionXoml = GetXomlDocument(addedAction); activities.Add(addedActivity); // // Recursively add all contained activities to the added list if (added is CompositeActivity) { foreach (Activity activity in ((CompositeActivity)added).Activities) { AddAddedActivity(activity, activities); } } } private void AddAddedActivity(Activity added, IList activities) { AddedActivity addedActivity = new AddedActivity(); addedActivity.Order = -1; Type type = added.GetType(); addedActivity.ActivityTypeFullName = type.FullName; addedActivity.ActivityTypeAssemblyFullName = type.Assembly.FullName; addedActivity.QualifiedName = added.QualifiedName; if ( null != added.Parent ) addedActivity.ParentQualifiedName = added.Parent.QualifiedName; activities.Add(addedActivity); // // Recursively add all contained activities to the added list if (added is CompositeActivity) { foreach (Activity activity in ((CompositeActivity)added).Activities) { AddAddedActivity(activity, activities); } } } private SerializedDataItem SerializeDataItem( TrackingDataItem item ) { if ( null == item ) return null; SerializedDataItem s = new SerializedDataItem(); s.Data = item.Data; s.Annotations.AddRange( item.Annotations ); s.FieldName = item.FieldName; if ( null != item.Data ) { byte[] state = null; bool nonSerializable; SerializeDataItem( item.Data, out state, out nonSerializable ); s.SerializedData 
= state; s.StringData = item.Data.ToString(); s.Type = item.Data.GetType(); s.NonSerializable = nonSerializable; } return s; } /// /// Binary serialize an object. Used to persist trackingDataItems. /// /// /// private void SerializeDataItem( object data, out byte[] state, out bool nonSerializable ) { nonSerializable = false; state = null; if ( null == data ) return; MemoryStream stream = new MemoryStream(1024); BinaryFormatter bf = new BinaryFormatter(); try { bf.Serialize(stream, data); state = new byte[stream.Length]; stream.Position = 0; if (stream.Length > Int32.MaxValue) return; else { int read = 0, totalRead = 0, cbToRead = 0; do { totalRead += read; cbToRead = (int)stream.Length - totalRead; read = stream.Read(state, totalRead, cbToRead); } while (read > 0); } } catch (SerializationException) { nonSerializable = true; return; } finally { stream.Close(); } } ////// Make string sql safe /// /// ///private string SqlEscape( string val ) { if ( null == val ) return null; return val.Replace( "'", "''" ); } /* static char[] hexDigits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; /// /// Convert a byte array to a string of hex chars for sql image type /// /// ///private static string ToHexString( byte[] bytes ) { if ( null == bytes ) return null; if ( 0 == bytes.Length ) return null; char[] chars = new char[bytes.Length * 2]; for ( int i = 0; i < bytes.Length; i++ ) { int b = bytes[i]; chars[i * 2] = hexDigits[b >> 4]; chars[i * 2 + 1] = hexDigits[b & 0xF]; } return "0x" + new string( chars ); } */ private void GetCallPathKeys( IList callPath ) { if ( ( null == callPath ) || ( callPath.Count <= 0 ) ) return; for ( int i = 0; i < callPath.Count; i++ ) { _callPathKey = _callPathKey + "." + callPath[i]; if ( i < callPath.Count - 1 ) _parentCallPathKey = _parentCallPathKey + "." 
+ callPath[i]; } if ( null != _callPathKey ) _callPathKey = SqlEscape( _callPathKey.Substring( 1 ) ); if ( null != _parentCallPathKey ) _parentCallPathKey = SqlEscape( _parentCallPathKey.Substring( 1 ) ); } private string GetActivitiesXml(CompositeActivity root) { if (null == root) return null; StringBuilder sb = new StringBuilder(); XmlWriter writer = XmlWriter.Create(sb); try { writer.WriteStartDocument(); writer.WriteStartElement("Activities"); WriteActivity(root, writer); writer.WriteEndElement(); writer.WriteEndDocument(); } finally { writer.Flush(); writer.Close(); } return sb.ToString(); } private void WriteActivity(Activity activity, XmlWriter writer) { if (null == activity) return; if (null == writer) throw new ArgumentNullException("writer"); Type t = activity.GetType(); writer.WriteStartElement("Activity"); writer.WriteElementString("TypeFullName", t.FullName); writer.WriteElementString("AssemblyFullName", t.Assembly.FullName); writer.WriteElementString("QualifiedName", activity.QualifiedName); // // Don't write the element if the value is null, sql will see a missing element as a null value if (null != activity.Parent) writer.WriteElementString("ParentQualifiedName", activity.Parent.QualifiedName); writer.WriteEndElement(); if (activity is CompositeActivity) foreach (Activity a in GetAllEnabledActivities((CompositeActivity)activity)) WriteActivity(a, writer); } // This function returns all the executable activities including secondary flow activities. 
/// <summary>
/// Builds the list of activities of a composite that can execute: its enabled
/// activities, followed by any alternate (secondary) flow activities that are
/// not already in that list.
/// </summary>
/// <param name="compositeActivity">Composite whose child activities are collected.</param>
/// <returns>A newly allocated list; the composite's own collections are left untouched.</returns>
/// <exception cref="ArgumentNullException"><paramref name="compositeActivity"/> is null.</exception>
private IList<Activity> GetAllEnabledActivities(CompositeActivity compositeActivity)
{
    if (compositeActivity == null)
        throw new ArgumentNullException("compositeActivity");

    // Start from a copy of the enabled activities so the additions below never touch the source collection.
    List<Activity> allActivities = new List<Activity>(compositeActivity.EnabledActivities);

    foreach (Activity secondaryFlowActivity in ((ISupportAlternateFlow)compositeActivity).AlternateFlowActivities)
    {
        // Skip alternate-flow activities that are already present so no activity is listed twice.
        if (!allActivities.Contains(secondaryFlowActivity))
            allActivities.Add(secondaryFlowActivity);
    }

    return allActivities;
}

/// <summary>
/// Serializes <paramref name="obj"/> with <see cref="WorkflowMarkupSerializer"/> and returns
/// the resulting markup as a string.
/// </summary>
/// <param name="obj">Object handed to the markup serializer.</param>
/// <returns>The text written by the serializer.</returns>
internal string GetXomlDocument(object obj)
{
    using (StringWriter stringWriter = new StringWriter(System.Globalization.CultureInfo.InvariantCulture))
    {
        using (XmlWriter xmlWriter = CreateXmlWriter(stringWriter))
        {
            WorkflowMarkupSerializer serializer = new WorkflowMarkupSerializer();
            serializer.Serialize(xmlWriter, obj);

            // The XmlWriter may still be buffering output; flush it so the
            // StringWriter holds the complete markup before it is read.
            xmlWriter.Flush();

            return stringWriter.ToString();
        }
    }
}

#endregion
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
using System; using System.Collections; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Collections.Specialized; using System.Text; using System.IO; using System.Runtime.Serialization; using System.Runtime.Serialization.Formatters.Binary; using System.Data.Common; using System.Data; using System.Data.SqlClient; using System.Timers; using System.Diagnostics; using System.Reflection; using System.Workflow.Runtime; using System.Workflow.ComponentModel; using System.Workflow.Runtime.Hosting; using System.Text.RegularExpressions; using System.Threading; using System.Transactions; using System.Globalization; using System.Workflow.ComponentModel.Serialization; using System.ComponentModel.Design.Serialization; using System.Xml; using System.Configuration; namespace System.Workflow.Runtime.Tracking { public sealed class SqlTrackingService : TrackingService, IProfileNotification { #region Private/Protected Members private bool _isTrans = true; private bool _partition = false; private bool _defaultProfile = true; private bool _enableRetries = false; private bool _ignoreCommonEnableRetries = false; private DateTime _lastProfileCheck; private System.Timers.Timer _timer = new System.Timers.Timer(); private double _interval = 60000; //private static int _deadlock = 1205; private TypeKeyedCollection _types = new TypeKeyedCollection(); private object _typeCacheLock = new object(); private WorkflowCommitWorkBatchService _transactionService; private DbResourceAllocator _dbResourceAllocator; private static Version UnknownProfileVersionId = new Version( 0, 0 ); // Saved from constructor input to be used in service start initialization private NameValueCollection _parameters; string _unvalidatedConnectionString; private delegate void ExecuteRetriedDelegate(object param); #endregion #region Configuration Properties public string ConnectionString { get { return _unvalidatedConnectionString; } } /// /// Determines if tracking data should be held and 
/// transactionally written to the database at persistence points.
        ///
        public bool IsTransactional
        {
            get { return _isTrans; }
            set { _isTrans = value; }
        }

        /// <summary>
        /// Indicates that records should be moved from the active instance tables to the
        /// appropriate partition tables when the instance completes.
        /// </summary>
        public bool PartitionOnCompletion
        {
            get { return _partition; }
            set { _partition = value; }
        }

        /// <summary>
        /// Determines if the default profile should be used for workflow types that do not
        /// have a profile specified for them.
        /// </summary>
        public bool UseDefaultProfile
        {
            get { return _defaultProfile; }
            set { _defaultProfile = value; }
        }

        /// <summary>
        /// The time interval, in milliseconds, at which to check the database for changes to profiles.
        /// Default is 60000.
        /// </summary>
        /// <remarks>
        /// Setting the interval results in the next check to occur the specified number of milliseconds
        /// from the time at which the property is set.
        /// </remarks>
        /// <exception cref="ArgumentException">The value is zero or negative.</exception>
        public double ProfileChangeCheckInterval
        {
            get { return _interval; }
            set
            {
                if ( value <= 0 )
                    throw new ArgumentException( ExecutionStringManager.InvalidProfileCheckValue );

                _interval = value;
                //
                // Set the timer's interval.
                // This will reset the timer
                _timer.Interval = _interval;
            }
        }

        /// <summary>
        /// Whether the retry helpers of this service are allowed to retry (the value is handed
        /// to DbRetry). Setting it explicitly makes Start() ignore any EnableRetries entry in
        /// the runtime's common parameters.
        /// </summary>
        public bool EnableRetries
        {
            get { return _enableRetries; }
            set
            {
                _enableRetries = value;
                _ignoreCommonEnableRetries = true;
            }
        }

        /// <summary>Allocator created in Start(); null before the service has been started.</summary>
        internal DbResourceAllocator DbResourceAllocator
        {
            get { return this._dbResourceAllocator; }
        }

        #endregion

        #region Construction

        /// <summary>Creates the service for the given connection string.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null or empty.</exception>
        public SqlTrackingService(string connectionString)
        {
            if (String.IsNullOrEmpty(connectionString))
                throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString);

            _unvalidatedConnectionString = connectionString;
        }

        /// <summary>
        /// Creates the service from name/value settings. Recognized keys (case-insensitive):
        /// IsTransactional, UseDefaultProfile, PartitionOnCompletion, ProfileChangeCheckInterval,
        /// ConnectionString and EnableRetries. Other keys are not interpreted here; the whole
        /// collection is kept and handed to the DbResourceAllocator in Start().
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
        /// <exception cref="ArgumentException">ProfileChangeCheckInterval is zero or negative.</exception>
        public SqlTrackingService(NameValueCollection parameters)
        {
            if (parameters == null)
                throw new ArgumentNullException("parameters", ExecutionStringManager.MissingParameters);

            if (parameters.Count > 0)
            {
                foreach (string key in parameters.Keys)
                {
                    if (0 == string.Compare("IsTransactional", key, StringComparison.OrdinalIgnoreCase))
                        _isTrans = bool.Parse(parameters[key]);
                    else if (0 == string.Compare("UseDefaultProfile", key, StringComparison.OrdinalIgnoreCase))
                        _defaultProfile = bool.Parse(parameters[key]);
                    else if (0 == string.Compare("PartitionOnCompletion", key, StringComparison.OrdinalIgnoreCase))
                        _partition = bool.Parse(parameters[key]);
                    else if (0 == string.Compare("ProfileChangeCheckInterval", key, StringComparison.OrdinalIgnoreCase))
                    {
                        _interval = double.Parse(parameters[key], NumberFormatInfo.InvariantInfo);
                        if (_interval <= 0)
                            throw new ArgumentException(ExecutionStringManager.InvalidProfileCheckValue);
                    }
                    else if (0 == string.Compare("ConnectionString", key, StringComparison.OrdinalIgnoreCase))
                        _unvalidatedConnectionString = parameters[key];
                    else if (0 == string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase))
                    {
                        _enableRetries = bool.Parse(parameters[key]);
                        _ignoreCommonEnableRetries = true;
                    }
                }
            }

            _parameters = parameters;
        }

        #endregion

        #region WorkflowRuntimeService

        /// <summary>
        /// Creates the resource allocator, resolves the commit work batch service, picks up the
        /// common EnableRetries setting when no local value was given, and starts the
        /// profile-change timer before calling the base implementation.
        /// </summary>
        protected internal override void Start()
        {
            _lastProfileCheck = DateTime.UtcNow;

            _dbResourceAllocator = new DbResourceAllocator(this.Runtime, _parameters, _unvalidatedConnectionString);

            // Check connection string mismatch if using SharedConnectionWorkflowTransactionService
            _transactionService = this.Runtime.GetService<WorkflowCommitWorkBatchService>();
            _dbResourceAllocator.DetectSharedConnectionConflict(_transactionService);

            //
            // If we didn't find a local value for enable retries
            // check in the common section
            if ((!_ignoreCommonEnableRetries)&&(null != base.Runtime))
            {
                NameValueConfigurationCollection commonConfigurationParameters = base.Runtime.CommonParameters;
                if (commonConfigurationParameters != null)
                {
                    // Then scan for EnableRetries in the common configuration parameters section
                    foreach (string key in commonConfigurationParameters.AllKeys)
                    {
                        if (string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase) == 0)
                        {
                            _enableRetries = bool.Parse(commonConfigurationParameters[key].Value);
                            break;
                        }
                    }
                }
            }

            _timer.Interval = _interval;
            _timer.AutoReset = false; // ensure that only one timer thread is checking for profile changes at a time
            _timer.Elapsed += new ElapsedEventHandler( CheckProfileChanges );
            _timer.Start();

            base.Start();
        }

        #endregion WorkflowRuntimeService

        #region IProfileNotification Implementation

        /// <summary>Returns a new SqlTrackingChannel bound to the given parameters and this service.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
        protected internal override TrackingChannel GetTrackingChannel( TrackingParameters parameters )
        {
            if ( null == parameters )
                throw new ArgumentNullException( "parameters" );
            //
            // Return a new channel for this instance
            // Give it the parameters and this to store
            return new SqlTrackingChannel( parameters, this );
        }

        /// <summary>Raised by the periodic check for each workflow type whose profile text changed.</summary>
        public event EventHandler<ProfileUpdatedEventArgs> ProfileUpdated;

        /// <summary>Raised by the periodic check for each workflow type whose profile came back null.</summary>
        public event EventHandler<ProfileRemovedEventArgs> ProfileRemoved;

        /// <summary>Looks up the profile of a workflow type for a specific profile version.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
        protected internal override TrackingProfile GetProfile(Type workflowType, Version profileVersion)
        {
            if ( null == workflowType )
                throw new ArgumentNullException( "workflowType" );

            // parameter wantToCreateDefault = false:
            // looking for a specific version that has already been running with this instance; don't use a default here
            return GetProfileByScheduleType(workflowType, profileVersion,
false);
        }

        /// <summary>
        /// Looks up the profile for a workflow type without pinning a version; a default
        /// profile is requested when UseDefaultProfile is set.
        /// </summary>
        /// <returns>true when a profile was found; otherwise false with <paramref name="profile"/> null.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
        protected internal override bool TryGetProfile(Type workflowType, out TrackingProfile profile)
        {
            if ( null == workflowType )
                throw new ArgumentNullException( "workflowType" );

            profile = GetProfileByScheduleType(workflowType, SqlTrackingService.UnknownProfileVersionId, _defaultProfile);
            return null != profile;
        }

        /// <summary>Returns the profile stored for the given instance, or null when there is none.</summary>
        protected internal override TrackingProfile GetProfile(Guid scheduleInstanceId)
        {
            TrackingProfile profile = null;
            GetProfile( scheduleInstanceId, out profile );
            return profile;
        }

        /// <summary>
        /// Runs [dbo].[GetInstanceTrackingProfile] for the instance and deserializes the result.
        /// </summary>
        /// <returns>
        /// false when the query returns no row; true when a row exists, in which case
        /// <paramref name="profile"/> is null if the first column is NULL.
        /// </returns>
        private bool GetProfile( Guid scheduleInstanceId, out TrackingProfile profile )
        {
            profile = null;

            DbCommand cmd = this._dbResourceAllocator.NewCommand();
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.CommandText = "[dbo].[GetInstanceTrackingProfile]";
            cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter( "@InstanceId", scheduleInstanceId ) );

            DbDataReader reader = null;
            try
            {
                reader = ExecuteReaderRetried( cmd, CommandBehavior.CloseConnection );
                //
                // Should only reach here in non exception state.
                // No row: didn't find a specific profile for this instance.
                // (The reader is closed by the finally block below.)
                if ( !reader.HasRows || !reader.Read() )
                    return false;

                if ( !reader.IsDBNull( 0 ) )
                {
                    string tmp = reader.GetString( 0 );
                    TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                    StringReader pReader = null;
                    try
                    {
                        pReader = new StringReader( tmp );
                        profile = serializer.Deserialize( pReader );
                    }
                    finally
                    {
                        if ( null != pReader )
                            pReader.Close();
                    }
                }
                return true;
            }
            finally
            {
                if ( ( null != reader ) && ( !reader.IsClosed ) )
                    reader.Close();

                if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State))
                    cmd.Connection.Close();
            }
        }

        /// <summary>Re-reads the profile stored for a running instance.</summary>
        /// <returns>true when the instance has a profile row; otherwise false with <paramref name="profile"/> null.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="workflowType"/> is null.</exception>
        protected internal override bool TryReloadProfile(Type workflowType, Guid scheduleInstanceId, out TrackingProfile profile)
        {
            if ( null == workflowType )
                throw new ArgumentNullException( "workflowType" );

            if ( GetProfile( scheduleInstanceId, out profile ) )
                return true;

            profile = null;
            return false;
        }

        #endregion

        #region Profile Management Methods

        /// <summary>
        /// Timer callback: queries GetUpdatedTrackingProfiles for changes since the last check
        /// and raises ProfileUpdated / ProfileRemoved per returned row. The timer is restarted
        /// on every exit path because it is configured with AutoReset = false.
        /// </summary>
        private void CheckProfileChanges( object sender, ElapsedEventArgs e )
        {
            DbCommand cmd = null;
            DbDataReader reader = null;
            try
            {
                if ( ( null == ProfileUpdated ) && ( null == ProfileRemoved ) )
                    return; // no one to notify

                Debug.WriteLine( "Checking for updated profiles..." );

                cmd = this._dbResourceAllocator.NewCommand();
                cmd.CommandText = "GetUpdatedTrackingProfiles";
                cmd.CommandType = CommandType.StoredProcedure;
                cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@LastCheckDateTime", _lastProfileCheck ) );

                DbParameter param = this._dbResourceAllocator.NewDbParameter();
                param.ParameterName = "@MaxCheckDateTime";
                param.DbType = DbType.DateTime;
                param.Direction = System.Data.ParameterDirection.Output;
                cmd.Parameters.Add( param );

                reader = ExecuteReaderRetried( cmd, CommandBehavior.CloseConnection );
                //
                // No changes
                if ( !reader.HasRows )
                    return;

                while ( reader.Read() )
                {
                    // Column 0 = type full name, column 1 = assembly name, column 2 = profile text
                    Type t = Assembly.Load( reader[1] as string ).GetType( reader[0] as string );
                    if ( null == t )
                        continue;

                    string tmp = reader[2] as string;
                    if ( null == tmp )
                    {
                        if ( null != ProfileRemoved )
                            ProfileRemoved( this, new ProfileRemovedEventArgs( t ) );
                    }
                    else
                    {
                        TrackingProfile profile = null;
                        TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                        StringReader pReader = null;
                        try
                        {
                            pReader = new StringReader( tmp );
                            profile = serializer.Deserialize( pReader );
                        }
                        finally
                        {
                            if ( null != pReader )
                                pReader.Close();
                        }

                        if ( null != ProfileUpdated )
                            ProfileUpdated( this, new ProfileUpdatedEventArgs( t, profile ) );
                    }
                    Debug.WriteLine( ExecutionStringManager.UpdatedProfile + t.FullName );
                }
            }
            finally
            {
                try
                {
                    if ((null != reader) && (!reader.IsClosed))
                        reader.Close();
                    //
                    // The output parameter only holds a DateTime when the proc ran to completion.
                    // If it is missing, null or DBNull we error'd, so keep the same last time
                    // for the next check instead of throwing out of this finally block.
                    if ( ( null != cmd ) && ( cmd.Parameters.Count > 1 ) )
                    {
                        object maxCheck = cmd.Parameters[1].Value;
                        if ( maxCheck is DateTime )
                            _lastProfileCheck = ( DateTime ) maxCheck;
                    }

                    if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State))
                        cmd.Connection.Close();
                }
                finally
                {
                    //
                    // Start the timer again (autoreset is false to avoid multiple threads checking for profile changes).
                    // Done in its own finally so a cleanup failure cannot stop the polling for good.
                    _timer.Start();
                }
            }
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Invokes <paramref name="executeRetried"/> with <paramref name="param"/>, tracing start/end.
        /// On any exception the DbRetry helper decides whether to loop again; otherwise the
        /// exception is rethrown.
        /// </summary>
        private void ExecuteRetried(ExecuteRetriedDelegate executeRetried, object param)
        {
            short count = 0;
            DbRetry dbRetry = new DbRetry(_enableRetries);
            while (true)
            {
                try
                {
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                    executeRetried(param);
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                    break;
                }
                catch(Exception e)
                {
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteRetried caught exception: " + e.ToString());
                    if (dbRetry.TryDoRetry(ref count))
                    {
                        WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " retrying.");
                        continue;
                    }
                    throw;
                }
            }
        }

        /// <summary>
        /// Executes the command as a reader under the same retry policy as ExecuteRetried,
        /// re-establishing the command's connection before each attempt.
        /// </summary>
        /// <returns>The open reader; the caller is responsible for closing it.</returns>
        private DbDataReader ExecuteReaderRetried(DbCommand command, CommandBehavior behavior)
        {
            DbDataReader reader = null;
            short count = 0;
            DbRetry dbRetry = new DbRetry(_enableRetries);
            while (true)
            {
                try
                {
                    ResetConnectionForCommand(command);
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                    reader = command.ExecuteReader(behavior);
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                    break;
                }
                catch(Exception e)
                {
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteReaderRetried caught exception from ExecuteReader: " + e.ToString());
                    if (dbRetry.TryDoRetry(ref count))
                    {
                        WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried retrying.");
                        continue;
                    }
                    throw;
                }
            }
            return reader;
        }

        /// <summary>
        /// Executes the command with ExecuteNonQuery under the same retry policy,
        /// re-establishing the command's connection before each attempt.
        /// </summary>
        private void ExecuteNonQueryRetried(DbCommand command)
        {
            short count = 0;
            DbRetry dbRetry = new DbRetry(_enableRetries);
            while (true)
            {
                try
                {
                    ResetConnectionForCommand(command);
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G",System.Globalization.CultureInfo.InvariantCulture));
                    command.ExecuteNonQuery();
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                    break;
                }
                catch(Exception e)
                {
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryRetried caught exception from ExecuteNonQuery: " + e.ToString());
                    if (dbRetry.TryDoRetry(ref count))
                    {
                        WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried retrying.");
                        continue;
                    }
                    throw;
                }
            }
        }

        /// <summary>
        /// Executes the command inside its own local transaction, with retries; each attempt
        /// begins a new transaction on the (possibly re-opened) connection.
        /// </summary>
        private void ExecuteNonQueryWithTxRetried(DbCommand command)
        {
            try
            {
                short count = 0;
                DbRetry dbRetry = new DbRetry(_enableRetries);
                while (true)
                {
                    try
                    {
                        ResetConnectionForCommand(command);
                        WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                        command.Transaction = command.Connection.BeginTransaction();
command.ExecuteNonQuery();
                        command.Transaction.Commit();
                        WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture));
                        break;
                    }
                    catch(Exception e)
                    {
                        WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried caught exception from ExecuteNonQuery: " + e.ToString());
                        try
                        {
                            if (null != command.Transaction)
                                command.Transaction.Rollback();
                        }
                        catch
                        {
                            //
                            // Rollback() can throw, nothing to do but swallow it if this happens
                            // so that we don't lose the original exception
                        }

                        if (dbRetry.TryDoRetry(ref count))
                        {
                            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried retrying.");
                            continue;
                        }
                        throw;
                    }
                }
            }
            finally
            {
                if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
                    command.Connection.Close();
            }
        }

        /// <summary>
        /// Gives the command a freshly opened, non-enlisted connection when its current one is
        /// not in the Open state. Does nothing for a null command, a command without a
        /// connection, or a connection that is already open.
        /// </summary>
        private void ResetConnectionForCommand(DbCommand command)
        {
            if ( null == command || null == command.Connection )
                return;

            if ( ConnectionState.Open == command.Connection.State )
                return;

            // Any state other than Closed (and Open, handled above) must be closed before disposal
            if ( ConnectionState.Closed != command.Connection.State )
                command.Connection.Close();

            command.Connection.Dispose();
            command.Connection = _dbResourceAllocator.OpenNewConnectionNoEnlist();
        }

        /// <summary>
        /// Creates an XmlWriter over <paramref name="output"/> that indents with tabs, omits the
        /// XML declaration, and closes <paramref name="output"/> when the writer is closed.
        /// </summary>
        internal static XmlWriter CreateXmlWriter(TextWriter output)
        {
            XmlWriterSettings settings = new XmlWriterSettings();
            settings.Indent = true;
            settings.IndentChars = "\t";
            settings.OmitXmlDeclaration = true;
            settings.CloseOutput = true;

            return XmlWriter.Create(output, settings);
        }

        /// <summary>
        /// Runs dbo.GetTrackingProfile for the workflow type and deserializes the first column
        /// of the first row.
        /// </summary>
        /// <param name="workflowType">Type whose full name and assembly full name identify the profile.</param>
        /// <param name="profileVersionId">Version to request; @Version is omitted when this equals UnknownProfileVersionId.</param>
        /// <param name="wantToCreateDefault">Passed to the procedure as @CreateDefault.</param>
        /// <returns>The profile, or null when no row is returned or the column is not a string.</returns>
        private TrackingProfile GetProfileByScheduleType(Type workflowType, Version profileVersionId, bool wantToCreateDefault)
        {
            DbCommand cmd = this._dbResourceAllocator.NewCommand();
            DbDataReader reader = null;
            TrackingProfile profile = null;

            cmd.CommandType = CommandType.StoredProcedure;
            cmd.CommandText = "dbo.GetTrackingProfile";
            cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@TypeFullName", workflowType.FullName) );
            cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@AssemblyFullName", workflowType.Assembly.FullName) );
            if (profileVersionId != SqlTrackingService.UnknownProfileVersionId)
                cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter( "@Version", profileVersionId.ToString() ) );
            cmd.Parameters.Add( this._dbResourceAllocator.NewDbParameter( "@CreateDefault", wantToCreateDefault) );

            try
            {
                reader = ExecuteReaderRetried(cmd, CommandBehavior.CloseConnection);
                if ( reader.Read() )
                {
                    string tmp = reader[0] as string;
                    if ( null != tmp )
                    {
                        TrackingProfileSerializer serializer = new TrackingProfileSerializer();
                        StringReader pReader = null;
                        try
                        {
                            pReader = new StringReader( tmp );
                            profile = serializer.Deserialize( pReader );
                        }
                        finally
                        {
                            if ( null != pReader )
                                pReader.Close();
                        }
                    }
                }
            }
            finally
            {
                if ((null != reader) && (!reader.IsClosed))
                    reader.Close();

                if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State))
                    cmd.Connection.Close();
            }

            return profile;
        }

        #endregion

        #region Private Classes

        /// <summary>Collection of types keyed by their assembly-qualified name.</summary>
        private class TypeKeyedCollection : KeyedCollection<string, Type>
        {
            protected override string GetKeyForItem( Type item )
            {
                return item.AssemblyQualifiedName;
            }
        }

        /// <summary>Tracking data item carrying its type plus string and binary payload fields.</summary>
        private class SerializedDataItem : TrackingDataItem
        {
            public Type Type;
            public string StringData;
            public byte[] SerializedData;
            public bool NonSerializable;
        }

        /// <summary>Event args carrying a type and a binary payload.</summary>
        private class SerializedEventArgs : EventArgs
        {
            public Type Type;
            public byte[] SerializedArgs;
        }

        /// <summary>Description of one activity added by a workflow change.</summary>
        private struct AddedActivity
        {
            public string ActivityTypeFullName;
            public string ActivityTypeAssemblyFullName;
            public string QualifiedName;
            public string ParentQualifiedName;
            public string AddedActivityActionXoml;
            public int Order;
        }

        /// <summary>Description of one activity removed by a workflow change.</summary>
        private struct RemovedActivity
        {
            public string QualifiedName;
            public string ParentQualifiedName;
            public string RemovedActivityActionXoml;
            public int Order;
        }

        private class
SerializedWorkflowChangedEventArgs : SerializedEventArgs
        {
            // Activities added / removed by the change these args describe.
            public IList<AddedActivity> AddedActivities = new List<AddedActivity>();
            public IList<RemovedActivity> RemovedActivities = new List<RemovedActivity>();
        }

        #endregion Private Classes

        /// <summary>
        /// Per-instance tracking channel. In transactional mode records are added to the
        /// workflow's work batch and written in Commit(); otherwise each record is written
        /// immediately through the owning service's retry helper.
        /// </summary>
        internal class SqlTrackingChannel : TrackingChannel, IPendingWork
        {
            #region Private Members

            private SqlTrackingService _service = null;
            private string _callPathKey = null, _parentCallPathKey = null;
            private bool _isTrans = false;
            // Database id of the instance; _tmpInternalId holds it until Complete(true) promotes it.
            private long _internalId = -1;
            private long _tmpInternalId = -1;
            // Ids keyed by string; the tmp map is merged into the real one in Complete(true).
            // NOTE(review): the generic arguments were lost in this copy of the file; the string
            // key follows from Complete(), the long value type is an assumption - confirm.
            private Dictionary<string, long> _activityInstanceId = new Dictionary<string, long>(32);
            private Dictionary<string, long> _tmpActivityInstanceId = new Dictionary<string, long>(10);
            private TrackingParameters _parameters = null;
            // Set on completion/termination in transactional mode; consumed by Commit()/Complete().
            private bool _pendingArchive = false;
            private bool _completedTerminated = false;

            private static int _activityEventBatchSize = 5;
            private static int _dataItemBatchSize = 5;
            private static int _dataItemAnnotationBatchSize = 5;
            private static int _eventAnnotationBatchSize = 5;

            #endregion

            #region Construction

            protected SqlTrackingChannel()
            {
            }

            /// <summary>
            /// Binds the channel to its service and parameters. In non-transactional mode the
            /// workflow instance row is looked up or inserted right away.
            /// </summary>
            /// <remarks>A null <paramref name="service"/> leaves the channel uninitialized without throwing.</remarks>
            public SqlTrackingChannel( TrackingParameters parameters, SqlTrackingService service )
            {
                if ( null == service )
                    return;

                _service = service;
                _parameters = parameters;
                _isTrans = service.IsTransactional;

                GetCallPathKeys( parameters.CallPath );

                if (!_isTrans)
                {
                    //
                    // Look up instance id or insert if new instance
                    // If we're transactional we'll do this in the first IPendingWork.Commit()
                    _service.ExecuteRetried(ExecuteInsertWorkflowInstance, null);
                }
            }

            #endregion

            #region Public Properties

            private DbResourceAllocator DbResourceAllocator
            {
                get { return _service.DbResourceAllocator; }
            }

            private WorkflowCommitWorkBatchService WorkflowCommitWorkBatchService
            {
                get { return _service._transactionService; }
            }

            #endregion

            #region TrackingChannel

            /// <summary>
            /// Transactional mode: flags the end date (and partitioning, if enabled) for the next
            /// batch commit. Otherwise writes the end date and partitions immediately.
            /// </summary>
            protected internal override void InstanceCompletedOrTerminated()
            {
                if (_isTrans)
                {
                    //
                    // Indicate that at the next batch commit we should stamp the enddate
                    _completedTerminated = true;
                    //
                    // Indicate that when the next batch commit completes successfully we should
// partition this instance
                    if (_service.PartitionOnCompletion)
                        _pendingArchive = true;
                }
                else
                {
                    _service.ExecuteRetried(ExecuteSetEndDate, null);
                    if (_service.PartitionOnCompletion)
                        _service.ExecuteRetried(PartitionInstance, null);
                }
            }

            /// <summary>
            /// Runs [dbo].[PartitionWorkflowInstance] for this instance on a new connection.
            /// </summary>
            /// <param name="param">Unused; present to match ExecuteRetriedDelegate.</param>
            private void PartitionInstance(object param)
            {
                DbConnection connection = null;
                DbCommand command = null;
                try
                {
                    //
                    // Allow enlisting if there is an ambient tx
                    // This can only happen on a host initiated terminate in V1.
                    connection = DbResourceAllocator.OpenNewConnection(false);
                    command = DbResourceAllocator.NewCommand(connection);

                    command.CommandText = "[dbo].[PartitionWorkflowInstance]";
                    command.CommandType = CommandType.StoredProcedure;
                    command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", _internalId));

                    command.ExecuteNonQuery();
                }
                finally
                {
                    // Close the connection we opened even if creating the command failed
                    if ((null != connection) && (ConnectionState.Closed != connection.State))
                        connection.Close();

                    if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
                        command.Connection.Close();
                }
            }

            /// <summary>
            /// Stamps the end date for this instance on a new connection.
            /// </summary>
            /// <param name="param">Unused; present to match ExecuteRetriedDelegate.</param>
            private void ExecuteSetEndDate(object param)
            {
                DbConnection connection = null;
                DbCommand command = null;
                try
                {
                    //
                    // Allow enlisting if there is an ambient tx
                    // This can only happen on a host initiated terminate in V1.
                    connection = DbResourceAllocator.OpenNewConnection(false);
                    command = DbResourceAllocator.NewCommand(connection);

                    ExecuteSetEndDate(_internalId, command);
                }
                finally
                {
                    // Close the connection we opened even if creating the command failed
                    if ((null != connection) && (ConnectionState.Closed != connection.State))
                        connection.Close();

                    if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
                        command.Connection.Close();
                }
            }

            /// <summary>
            /// Reconfigures <paramref name="command"/> (parameters are cleared first) to call
            /// [dbo].[SetWorkflowInstanceEndDateTime] with the current UTC time and executes it.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
            private void ExecuteSetEndDate(long internalId, DbCommand command)
            {
                if (null == command)
                    throw new ArgumentNullException("command");

                command.Parameters.Clear();
                command.CommandText = "[dbo].[SetWorkflowInstanceEndDateTime]";
                command.CommandType = CommandType.StoredProcedure;

                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EndDateTime", DateTime.UtcNow));

                command.ExecuteNonQuery();
            }

            /// <summary>
            /// Accepts one tracking record. In transactional mode the serialized record is added
            /// to the current work batch; otherwise it is written immediately with retries.
            /// Record types other than activity, workflow and user records are ignored.
            /// </summary>
            /// <exception cref="ArgumentException">The record is null or the channel's instance id is empty.</exception>
            protected internal override void Send(TrackingRecord record)
            {
                if ( ( Guid.Empty == _parameters.InstanceId ) || ( null == record ) )
                    throw new ArgumentException( ExecutionStringManager.MissingParametersTrack );

                if ( record is ActivityTrackingRecord )
                {
                    ActivityTrackingRecord act = record as ActivityTrackingRecord;

                    if ( _isTrans )
                        WorkflowEnvironment.WorkBatch.Add( this, SerializeRecord( act ) );
                    else
                        _service.ExecuteRetried(ExecuteInsertActivityStatusInstance, SerializeRecord(act));
                }
                else if ( record is WorkflowTrackingRecord )
                {
                    //
                    // Instance events cannot be batched - many occur when there isn't a batch
                    WorkflowTrackingRecord inst = ( WorkflowTrackingRecord ) record;

                    if ( _isTrans )
                    {
                        WorkflowEnvironment.WorkBatch.Add( this, SerializeRecord( inst ) );
                    }
                    else
                    {
                        if ( TrackingWorkflowEvent.Changed == inst.TrackingWorkflowEvent )
                        {
                            //
                            // Dynamic updates are inserted in the WorkflowInstanceEvent table
                            // and then the arg (workflowchanges) is normalized into xoml
                            // and the added/removed activities tables
                            _service.ExecuteRetried(ExecuteInsertWorkflowChange, SerializeRecord(inst));
                        }
                        else
                        {
                            _service.ExecuteRetried(ExecuteInsertWorkflowInstanceEvent,
SerializeRecord(inst));
                        }
                    }
                }
                else if ( record is UserTrackingRecord )
                {
                    UserTrackingRecord user = ( UserTrackingRecord ) record;

                    if ( _isTrans )
                        WorkflowEnvironment.WorkBatch.Add( this, SerializeRecord( user ) );
                    else
                        _service.ExecuteRetried(ExecuteInsertUserEvent, SerializeRecord(user));
                }
            }

            #endregion

            #region IPendingWork Members

            /// <summary>Always false: this channel never forces a persistence point.</summary>
            public bool MustCommit( ICollection items )
            {
                //
                // Never force a persist - this is a balancing act but the V1
                // decision is to err on the side of persisting only when the workflow
                // requires it based on its model.  If the workflow uses persistence points
                // wisely this is great.  If it goes a long time between persists with lots
                // of events the persists will take a long time as the batch can be huge.
                return false;
            }

            /// <summary>
            /// Writes the batched tracking records to the database in item order. Activity
            /// records are grouped into batches of _activityEventBatchSize and non-Changed
            /// workflow records are sent in pairs where possible; any pending group is flushed
            /// before a record of another kind is written.
            /// </summary>
            /// <param name="transaction">Transaction of the work batch, used to obtain the connection and local transaction.</param>
            /// <param name="items">Batched work items; entries that are not TrackingRecord are skipped.</param>
            /// <exception cref="DbException">Rethrown after rolling back a locally begun transaction.</exception>
            public void Commit(System.Transactions.Transaction transaction, ICollection items)
            {
                if ((null == items) || (0 == items.Count))
                    return;

                DbCommand command = null;
                DbConnection connection = null;
                bool needToCloseConnection = false;
                DbTransaction localTransaction = null;
                // True only when this method began the transaction and therefore owns commit/rollback
                bool commitTx = false;

                try
                {
                    //
                    // Get the connection and transaction
                    // The connection might be shared or local
                    // The tx is shared and may be either a DTC or a local sql tx
                    connection = DbResourceAllocator.GetEnlistedConnection(
                        this.WorkflowCommitWorkBatchService, transaction, out needToCloseConnection);
                    localTransaction = DbResourceAllocator.GetLocalTransaction(
                        this.WorkflowCommitWorkBatchService, transaction);

                    if (null == localTransaction)
                    {
                        localTransaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
                        commitTx = true;
                    }

                    command = DbResourceAllocator.NewCommand(connection);
                    command.Transaction = localTransaction;
                    //
                    // If we don't have the internal id for the instance this is the first batch
                    // for this channel instance.  If this is a new instance the following will insert
                    // a new instance record in the db and set _tmpInternalId.  If this is a reload of
                    // an existing instance it will just do a lookup and set _tmpInternalId
                    // In Completed we will assign _tmpInternalId to _internalId if the batch is successful.
                    long internalId = -1;
                    if (_internalId <= 0)
                    {
                        ExecuteInsertWorkflowInstance(command);
                        internalId = _tmpInternalId;
                    }
                    else
                        internalId = _internalId;

                    IList<ActivityTrackingRecord> activities = new List<ActivityTrackingRecord>(5);
                    WorkflowTrackingRecord workflow = null;
                    //
                    // Build the batch statement
                    foreach (object o in items)
                    {
                        if (!(o is TrackingRecord))
                            continue;

                        if (o is ActivityTrackingRecord)
                        {
                            //
                            // If we have a cached workflow tracking record send it
                            if (null != workflow)
                            {
                                ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
                                workflow = null;
                            }

                            ActivityTrackingRecord activity = (ActivityTrackingRecord)o;
                            //
                            // Add this event to the list and send to the db if we've hit our limit
                            activities.Add(activity);
                            if (_activityEventBatchSize == activities.Count)
                            {
                                ExecuteInsertActivityStatusInstance(internalId, activities, command);
                                activities = new List<ActivityTrackingRecord>(5);
                            }
                        }
                        else if (o is UserTrackingRecord)
                        {
                            //
                            // If we have cached activity or workflow tracking records send them
                            if (activities.Count > 0)
                            {
                                ExecuteInsertActivityStatusInstance(internalId, activities, command);
                                activities.Clear();
                            }

                            if (null != workflow)
                            {
                                ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
                                workflow = null;
                            }

                            ExecuteInsertUserEvent(internalId, (UserTrackingRecord)o, command);
                        }
                        else if (o is WorkflowTrackingRecord)
                        {
                            //
                            // If we have cached activity tracking records send them
                            if (activities.Count > 0)
                            {
                                ExecuteInsertActivityStatusInstance(internalId, activities, command);
                                activities.Clear();
                            }

                            WorkflowTrackingRecord record = (WorkflowTrackingRecord)o;

                            if (TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent)
                            {
                                //
                                // A change record is never cached or paired: send any cached
                                // workflow tracking record first, then write the change itself
                                if (null != workflow)
                                {
                                    ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
                                    workflow = null;
                                }

                                ExecuteInsertWorkflowChange(internalId, record, command);
                            }
                            else
                            {
                                //
                                // If we're already holding a workflow tracking record send both to the db
                                // else cache it and wait for the next workflow tracking record
                                if (null != workflow)
                                {
                                    ExecuteInsertWorkflowInstanceEvent(internalId, workflow, record, command);
                                    workflow = null;
                                }
                                else
                                {
                                    workflow = record;
                                }
                            }
                        }
                    }
                    //
                    // If we ended up with any activities event send them.
                    if (activities.Count > 0)
                        ExecuteInsertActivityStatusInstance(internalId, activities, command);

                    if (null != workflow)
                    {
                        ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command);
                        workflow = null;
                    }

                    if (_completedTerminated)
                        ExecuteSetEndDate(internalId, command);

                    if (commitTx)
                        localTransaction.Commit();
                }
                catch (DbException e)
                {
                    if (commitTx)
                    {
                        try
                        {
                            localTransaction.Rollback();
                        }
                        catch (Exception)
                        {
                            //
                            // Rollback can throw - ignore these exceptions
                            // so we don't lose the original exception
                        }
                    }

                    WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Error writing tracking data to database: " + e);
                    throw;
                }
                finally
                {
                    if (needToCloseConnection && (null != connection))
                    {
                        connection.Dispose();
                    }
                }

                return;
            }

            /// <summary>
            /// Called after the batch commit finished. On failure all pending state is reset.
            /// On success the temporary ids are promoted and, if archiving is pending, the
            /// instance is partitioned (partition failures are traced, not rethrown).
            /// </summary>
            public void Complete(bool succeeded, ICollection items)
            {
                //
                // If we didn't succeed on commit reset all flags
                if (!succeeded)
                {
                    _completedTerminated = false;
                    _pendingArchive = false;
                    _tmpInternalId = -1;
                    _tmpActivityInstanceId.Clear();
                    return;
                }
                //
                // Commit succeeded - move the tmp internalId to the real internalId member
                if (-1 == _internalId && _tmpInternalId > 0)
                    _internalId = _tmpInternalId;
                //
                // Move the tmp activity instance ids to the real activity instance id member
                if (null != _tmpActivityInstanceId && _tmpActivityInstanceId.Count > 0)
                {
                    foreach (string key in _tmpActivityInstanceId.Keys)
                    {
                        if (!_activityInstanceId.ContainsKey(key))
                            _activityInstanceId.Add(key, _tmpActivityInstanceId[key]);
                    }

                    _tmpActivityInstanceId.Clear();
                }

                if ( _pendingArchive )
                {
                    try
                    {
                        _service.ExecuteRetried(PartitionInstance, null);
                    }
                    catch ( Exception e )
                    {
                        //
                        // Swallow exceptions here, do not fail the instance.
// Partition logic can be re-run to clean up on failure WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, string.Format(System.Globalization.CultureInfo.InvariantCulture, "Error partitioning instance {0}: {1}", _parameters.InstanceId, e.ToString())); } } } #endregion #region Sql Commands - InsertWorkflowInstance private void ExecuteInsertWorkflowInstance( object param ) { DbConnection conn = DbResourceAllocator.OpenNewConnection(); DbCommand command = DbResourceAllocator.NewCommand(conn); DbTransaction tx = null; try { tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted); command.Connection = conn; command.Transaction = tx; _internalId = ExecuteInsertWorkflowInstance( command ); tx.Commit(); } catch (Exception) { try { if ( null != tx ) tx.Rollback(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception } // // Re-throw original exception throw; } finally { if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) ) conn.Close(); } return; } private long ExecuteInsertWorkflowInstance( DbCommand command ) { if ( null == command ) throw new ArgumentNullException("command"); if ((null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentException(ExecutionStringManager.InvalidCommandBadConnection, "command"); // // Write the type and the workflow definition string xaml = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string; if (null != xaml && xaml.Length > 0) InsertWorkflow( command, _parameters.InstanceId, null, _parameters.RootActivity); else InsertWorkflow( command, _parameters.InstanceId, _parameters.WorkflowType, _parameters.RootActivity); // // Write the instance record BuildInsertWorkflowInstanceParameters( command ); DbDataReader reader = null; try { reader = command.ExecuteReader(); if (reader.Read()) _tmpInternalId = reader.GetInt64(0); return _tmpInternalId; } finally { if (null != reader) 
reader.Close(); } } private void BuildInsertWorkflowInstanceParameters( DbCommand command ) { Debug.Assert((command != null), "Null command"); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertWorkflowInstance]"; command.Parameters.Clear(); bool xamlInst = false; string xaml = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string; if (null != xaml && xaml.Length > 0) xamlInst = true; command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceId", _parameters.InstanceId ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@TypeFullName", ( xamlInst ? _parameters.InstanceId.ToString() : _parameters.WorkflowType.FullName ) )); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@AssemblyFullName", ( xamlInst ? _parameters.InstanceId.ToString() : _parameters.WorkflowType.Assembly.FullName ) )); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ContextGuid", _parameters.ContextGuid ) ); if ( Guid.Empty != _parameters.CallerInstanceId ) { command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@CallerInstanceId", _parameters.CallerInstanceId ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@CallPath", _callPathKey ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@CallerContextGuid", _parameters.CallerContextGuid ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@CallerParentContextGuid", _parameters.CallerParentContextGuid ) ); } command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventDateTime", this.GetSqlDateTimeString( DateTime.UtcNow ) ) ); } private void InsertWorkflow( DbCommand command, Guid workflowInstanceId, Type workflowType, Activity rootActivity) { string xoml = null; // // If we've already seen this type just return if (null != workflowType) { lock (_service._typeCacheLock) { if (_service._types.Contains(workflowType.AssemblyQualifiedName)) return; else xoml = 
GetXomlDocument(rootActivity); } } else { // Don't forget to deal with XOML-only workflows lock (_service._typeCacheLock) { xoml = GetXomlDocument(rootActivity); } } // // It is possible to ---- here but the pk specifies ignore duplicate key // This is better than taking a lock around all of the logic in this method. command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertWorkflow]"; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", (null == workflowType ? workflowInstanceId.ToString() : workflowType.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", (null == workflowType ? workflowInstanceId.ToString() : workflowType.Assembly.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@IsInstanceType", (null == workflowType ? true : false))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowDefinition", xoml)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowId", DbType.Int32, System.Data.ParameterDirection.Output)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Exists", DbType.Boolean, System.Data.ParameterDirection.Output)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Activities", GetActivitiesXml((CompositeActivity)rootActivity))); command.ExecuteNonQuery(); // // Add this to the list of types we've already seen so we don't go // through the serialization overhead again and hit the db only to learn we've already stored it // Use a lock here to avoid ---- on _types dictionary if (null != workflowType) { lock (_service._typeCacheLock) { if (!_service._types.Contains(workflowType.AssemblyQualifiedName)) { _service._types.Add(workflowType); } } } return; } #endregion #region Sql Commands - InsertWorkflowInstanceEvent private void ExecuteInsertWorkflowInstanceEvent( object param ) { WorkflowTrackingRecord record = param as WorkflowTrackingRecord; if (null == 
record) throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param"); DbConnection conn = DbResourceAllocator.OpenNewConnection(); DbCommand command = DbResourceAllocator.NewCommand( conn ); DbTransaction tx = null; try { tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted ); command.Connection = conn; command.Transaction = tx; ExecuteInsertWorkflowInstanceEvent( _internalId, record, null, command ); tx.Commit(); } catch (Exception) { try { if ( null != tx ) tx.Rollback(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception } // // Re-throw original exception throw; } finally { if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) ) conn.Close(); } return; } private void ExecuteInsertWorkflowInstanceEvent( long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command ) { if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) ) throw new ArgumentException(); BuildInsertWorkflowInstanceEventParameters( internalId, record1, record2, command ); command.ExecuteNonQuery(); long eventId1 = (long) command.Parameters["@WorkflowInstanceEventId1"].Value; Debug.Assert(eventId1 > 0, "Invalid eventId1"); long eventId2 = -1; if (null != record2) { eventId2 = (long)command.Parameters["@WorkflowInstanceEventId2"].Value; Debug.Assert(eventId2 > 0, "Invalid eventId2"); } List > annotations = new List >(record1.Annotations.Count + (null == record2 ? 
0 : record2.Annotations.Count)); foreach (string s in record1.Annotations) annotations.Add(new KeyValuePair (eventId1, s)); if (null != record2) { foreach (string s in record2.Annotations) annotations.Add(new KeyValuePair (eventId2, s)); } BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command); } private void BuildInsertWorkflowInstanceEventParameters(long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command) { if ( null == record1 ) throw new ArgumentNullException( "record" ); if ( null == command ) throw new ArgumentNullException( "command" ); Debug.Assert( internalId != -1, "Invalid internalId" ); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertWorkflowInstanceEvent]"; command.Parameters.Clear(); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@TrackingWorkflowEventId1", ( int ) record1.TrackingWorkflowEvent ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventDateTime1", record1.EventDateTime ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventOrder1", record1.EventOrder ) ); if ( null != record1.EventArgs ) { Type t = record1.EventArgs.GetType(); Byte[] data = null; if (!(record1.EventArgs is SerializedEventArgs)) record1 = SerializeRecord(record1); SerializedEventArgs sargs = record1.EventArgs as SerializedEventArgs; data = sargs.SerializedArgs; command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArgTypeFullName1", t.FullName ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArgAssemblyFullName1", t.Assembly.FullName ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventArg1", data ) ); } command.Parameters.Add( DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId1", DbType.Int64, ParameterDirection.Output)); if (null != record2) { 
command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingWorkflowEventId2", (int)record2.TrackingWorkflowEvent)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime2", record2.EventDateTime)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder2", record2.EventOrder)); if (null != record2.EventArgs) { Type t = record2.EventArgs.GetType(); Byte[] data = null; if (!(record2.EventArgs is SerializedEventArgs)) record2 = SerializeRecord(record2); SerializedEventArgs sargs = record2.EventArgs as SerializedEventArgs; data = sargs.SerializedArgs; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgTypeFullName2", t.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgAssemblyFullName2", t.Assembly.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArg2", data)); } command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId2", DbType.Int64, ParameterDirection.Output)); } } #endregion #region Sql Commands - InsertActivityStatusInstance private void ExecuteInsertActivityStatusInstance(object param) { ActivityTrackingRecord record = param as ActivityTrackingRecord; if (null == record) throw new ArgumentException(ExecutionStringManager.InvalidActivityTrackingRecordParameter, "param"); DbConnection conn = DbResourceAllocator.OpenNewConnection(); DbTransaction tx = null; try { tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted ); DbCommand command = conn.CreateCommand(); command.Transaction = tx; IList activity = new List (1); activity.Add(record); ExecuteInsertActivityStatusInstance( _internalId, activity, command ); tx.Commit(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception try { if ( null != tx ) tx.Rollback(); } catch (Exception) { } // // Re-throw original exception throw; } finally { if ((null != conn) && (ConnectionState.Closed != conn.State)) 
conn.Close(); } return; } private void ExecuteInsertActivityStatusInstance(long internalId, IList activities, DbCommand command) { if (null == activities || activities.Count <= 0) return; if (activities.Count > _activityEventBatchSize) throw new ArgumentOutOfRangeException("activities"); if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentException(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertActivityExecutionStatusEventMultiple]"; // // Add the common parameters command.Parameters.Clear(); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId)); // // If we have the workflow's internal id use it to avoid the look up in the db DbParameter param = DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", DbType.Int64, System.Data.ParameterDirection.InputOutput); command.Parameters.Add(param); if (internalId > 0) param.Value = internalId; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceContextGuid", _parameters.ContextGuid)); // // Hashed ids of QName, context and pcontext used as key for storing activity record ids // Save these for each record in the list so we don't have to recompute them below when adding to the cache string[] ids = new string[] { null, null, null, null, null }; for (int i = 0; i < activities.Count; i++) { ActivityTrackingRecord record = activities[i]; long aid = -1; ids[i] = BuildQualifiedNameVarName(record.QualifiedName, record.ContextGuid, record.ParentContextGuid); TryGetActivityInstanceId(ids[i], out aid); BuildInsertActivityStatusEventParameters(internalId, aid, i+1, record, command); } command.ExecuteNonQuery(); // // Get all the output ids long[] eventIds = new long[] { -1, -1, -1, -1, -1 }; for (int i = 0; i < activities.Count; i++) { string index = (i + 1).ToString(CultureInfo.InvariantCulture); // // ActivityInstanceId long aId = 
(long)command.Parameters["@ActivityInstanceId" + index].Value; Debug.Assert(aId > 0, "Invalid @ActivityInstanceId output parameter value"); // // For all status changes that aren't "Closed" add the id to the instance cache // Set... method checks and only adds if it does already exist. // To keep the cache size under control remove entries for activities that have closed. // The activity might fault and need to do a lookup in the db but this isn't the common // path and the db lookup isn't very expensive. if (ActivityExecutionStatus.Closed != activities[i].ExecutionStatus) SetActivityInstanceId(ids[i], aId); else RemoveActivityInstanceId(ids[i]); // // ActivityExecutionStatusEventId long aeseId = (long)command.Parameters["@ActivityExecutionStatusEventId" + index].Value; Debug.Assert(aeseId > 0, "Invalid @ActivityExecutionStatusEventId output parameter value"); eventIds[i] = aeseId; } List > annotations = new List >(10); List > items = new List >(10); for (int i = 0; i < activities.Count; i++) { ActivityTrackingRecord record = activities[i]; // // Get the ActivityExecutionStatusEventId long eventId = eventIds[i]; if (eventId<=0) throw new InvalidOperationException(); foreach (string s in record.Annotations) annotations.Add(new KeyValuePair (eventId, s)); foreach (TrackingDataItem item in record.Body) items.Add(new KeyValuePair (eventId, item)); } BatchExecuteInsertEventAnnotation(internalId, 'a', annotations, command); BatchExecuteInsertTrackingDataItems(internalId, 'a', items, command); } private void BuildInsertActivityStatusEventParameters( long internalId, long activityInstanceId, int parameterId, ActivityTrackingRecord record, DbCommand command ) { string paramIdString = parameterId.ToString(CultureInfo.InvariantCulture); // // If we have the activity's instance id use it to avoid the look up in the db DbParameter param = DbResourceAllocator.NewDbParameter("@ActivityInstanceId" + paramIdString, DbType.Int64, System.Data.ParameterDirection.InputOutput); 
command.Parameters.Add(param); if ( activityInstanceId > 0 ) param.Value = activityInstanceId; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName" + paramIdString, record.QualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid" + paramIdString, record.ContextGuid)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentContextGuid" + paramIdString, record.ParentContextGuid)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ExecutionStatusId" + paramIdString, (int)record.ExecutionStatus)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime" + paramIdString, record.EventDateTime)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder" + paramIdString, record.EventOrder)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ActivityExecutionStatusEventId" + paramIdString, DbType.Int64, ParameterDirection.Output)); } #endregion #region Sql Commands - InsertUserEvent private void ExecuteInsertUserEvent( object param ) { UserTrackingRecord record = param as UserTrackingRecord; if (null == record) throw new ArgumentException(ExecutionStringManager.InvalidUserTrackingRecordParameter, "param"); DbConnection conn = DbResourceAllocator.OpenNewConnection(); DbTransaction tx = null; try { tx = conn.BeginTransaction( System.Data.IsolationLevel.ReadCommitted ); DbCommand command = conn.CreateCommand(); command.Transaction = tx; ExecuteInsertUserEvent( _internalId, record, command ); tx.Commit(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception try { if ( null != tx ) tx.Rollback(); } catch (Exception) { } // // Re-throw original exception throw; } finally { if ( ( null != conn ) && ( ConnectionState.Closed != conn.State ) ) conn.Close(); } return; } private void ExecuteInsertUserEvent( long internalId, UserTrackingRecord record, DbCommand command ) { if ( ( null == command ) || ( null == 
command.Connection ) || ( ConnectionState.Open != command.Connection.State ) ) throw new ArgumentException(); long aid = -1; bool cached = false; // // Check if we have the activityInstanceId in the cache - we cache to avoid repeatedly searching this table. string id = BuildQualifiedNameVarName( record.QualifiedName, record.ContextGuid, record.ParentContextGuid ); if ( TryGetActivityInstanceId( id, out aid ) ) cached = true; BuildInsertUserEventParameters( internalId, aid, record, command ); command.ExecuteNonQuery(); // // If we didn't already have the activityInstanceId get it from the IN/OUT param and put it in the cache if ( !cached ) SetActivityInstanceId( id, ( long ) command.Parameters["@ActivityInstanceId"].Value ); long eventId = (long)command.Parameters["@UserEventId"].Value; List > annotations = new List >(10); List > items = new List >(10); foreach (string s in record.Annotations) annotations.Add(new KeyValuePair (eventId, s)); foreach (TrackingDataItem item in record.Body) items.Add(new KeyValuePair (eventId, item)); BatchExecuteInsertEventAnnotation(internalId, 'u', annotations, command); BatchExecuteInsertTrackingDataItems(internalId, 'u', items, command); } private void BuildInsertUserEventParameters( long internalId, long activityInstanceId, UserTrackingRecord record, DbCommand command ) { Debug.Assert( internalId != -1, "Invalid internalId" ); Debug.Assert( ( command != null ), "Null command passed to BuildInsertActivityStatusEventParameters" ); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertUserEvent]"; command.Parameters.Clear(); DbParameter param = DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", DbType.Int64 ); command.Parameters.Add( param ); param.Value = internalId; command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventOrder", record.EventOrder ) ); // // If we have the activity's instance id use it to avoid the look up in the db param = 
DbResourceAllocator.NewDbParameter( "@ActivityInstanceId", DbType.Int64, System.Data.ParameterDirection.InputOutput ); command.Parameters.Add( param ); if ( activityInstanceId > 0 ) { param.Value = activityInstanceId; } else { // // Keep the network traffic down - only include the fields needed // to insert an ActivityInstance record if we don't have the activityInstanceId command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@QualifiedName", record.QualifiedName ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ContextGuid", record.ContextGuid ) ); command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@ParentContextGuid", record.ParentContextGuid ) ); } command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime", record.EventDateTime)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataKey", record.UserDataKey)); if ( null != record.UserData ) { Type t = record.UserData.GetType(); Byte[] data = null; bool nonSerializable = false; string userDataString = null; if (!(record.UserData is SerializedDataItem)) SerializeDataItem(record.UserData, out data, out nonSerializable); SerializedDataItem sItem = record.UserData as SerializedDataItem; data = sItem.SerializedData; nonSerializable = sItem.NonSerializable; userDataString = sItem.StringData; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataTypeFullName", t.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataAssemblyFullName", t.Assembly.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Str", userDataString)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Blob", data)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataNonSerializable", nonSerializable)); } command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserEventId", DbType.Int64, ParameterDirection.Output)); } #endregion #region Sql Commands - InsertTrackingDataItem private 
void BatchExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList<KeyValuePair<long, TrackingDataItem>> items, DbCommand command)
        {
            // Sends the data items to ExecuteInsertTrackingDataItems in chunks of at most
            // _dataItemBatchSize entries; a null or empty list is a no-op.
            if (null == items || items.Count <= 0)
                return;
            //
            // If the list is smaller than the batch size just push the whole thing
            if (items.Count <= _dataItemBatchSize)
            {
                ExecuteInsertTrackingDataItems(internalId, eventTypeId, items, command);
                return;
            }
            //
            // Need to split the list into max batch size chunks
            List<KeyValuePair<long, TrackingDataItem>> batch = new List<KeyValuePair<long, TrackingDataItem>>(_dataItemBatchSize);
            foreach (KeyValuePair<long, TrackingDataItem> kvp in items)
            {
                batch.Add(kvp);
                if (batch.Count == _dataItemBatchSize)
                {
                    ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command);
                    batch.Clear();
                }
            }
            //
            // Send anything that hasn't been sent
            if (batch.Count > 0)
                ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command);
        }

        /// <summary>
        /// Inserts one batch of tracking data items through [dbo].[InsertTrackingDataItemMultiple]
        /// and then sends the annotations of those items in batches of _dataItemAnnotationBatchSize.
        /// </summary>
        /// <param name="internalId">Internal id of the workflow instance; asserted to be different from -1.</param>
        /// <param name="eventTypeId">Event type character passed as @EventTypeId.</param>
        /// <param name="items">Pairs of event id and data item; at most _dataItemBatchSize entries.</param>
        /// <param name="command">Command with an open connection; its parameters are replaced.</param>
        /// <exception cref="ArgumentOutOfRangeException">More items than one batch may hold.</exception>
        /// <exception cref="ArgumentException">The command is null or its connection is null or not open.</exception>
        private void ExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList<KeyValuePair<long, TrackingDataItem>> items, DbCommand command)
        {
            Debug.Assert( internalId != -1, "Invalid internalId" );

            if ( null == items || items.Count <= 0 )
                return;

            //
            // The batching caller chunks by _dataItemBatchSize, so validate against the same limit
            // (this previously compared against the annotation batch size).
            if (items.Count > _dataItemBatchSize)
                throw new ArgumentOutOfRangeException("items");

            if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
                throw new ArgumentException();

            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertTrackingDataItemMultiple]";
            command.Parameters.Clear();
            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );
            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventTypeId", eventTypeId));

            int i = 1; // base 1 to match parameter names
            foreach(KeyValuePair<long, TrackingDataItem> kvp in items)
            {
                string index = (i++).ToString(CultureInfo.InvariantCulture);

                SerializedDataItem sItem = kvp.Value as SerializedDataItem;
                if (null == sItem)
                    sItem = SerializeDataItem(kvp.Value);

                Type t = sItem.Type;

                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId"+index, kvp.Key));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@FieldName"+index, sItem.FieldName));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName" + index, ((null == t) ? null : t.FullName)));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName" + index, ((null == t) ? null : t.Assembly.FullName)));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Str" + index, sItem.StringData));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Blob" + index, sItem.SerializedData));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@DataNonSerializable" + index, sItem.NonSerializable));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, DbType.Int64, System.Data.ParameterDirection.Output));
            }

            command.ExecuteNonQuery();
            //
            // Get all the out parameters holding the data item record ids
            // This keeps us from repeatedly going into the parameters collection
            // below if a data item has more than one annotation
            List<long> ids = new List<long>(items.Count);
            for( i = 0; i < items.Count; i++ )
            {
                string index = (i+1).ToString(CultureInfo.InvariantCulture);
                ids.Add((long)command.Parameters["@TrackingDataItemId" + index].Value);
            }
            //
            // Go through all the data items and send all the annotations in batches
            List<KeyValuePair<long, string>> annotations = new List<KeyValuePair<long, string>>(_dataItemAnnotationBatchSize);
            i = 0;
            foreach(KeyValuePair<long, TrackingDataItem> kvp in items)
            {
                TrackingDataItem item = kvp.Value;
                long dataItemId = ids[i++];

                foreach (string s in item.Annotations)
                {
                    annotations.Add(new KeyValuePair<long, string>(dataItemId, s));
                    if (annotations.Count == _dataItemAnnotationBatchSize)
                    {
                        ExecuteInsertAnnotation(internalId, annotations, command);
                        annotations.Clear();
                    }
                }
            }
            //
            // If we have anything left send them.
if ( annotations.Count > 0 )
                ExecuteInsertAnnotation(internalId, annotations, command);
        }

        /// <summary>
        /// Inserts one batch of data item annotations through
        /// [dbo].[InsertTrackingDataItemAnnotationMultiple]; a null or empty list is a no-op.
        /// </summary>
        /// <param name="internalId">Internal id of the workflow instance.</param>
        /// <param name="annotations">Pairs of tracking data item id and annotation text;
        /// at most _dataItemAnnotationBatchSize entries.</param>
        /// <param name="command">Command with an open connection; its parameters are replaced.</param>
        /// <exception cref="ArgumentOutOfRangeException">More annotations than one batch may hold.</exception>
        /// <exception cref="ArgumentNullException">The command is null or its connection is null or not open.</exception>
        private void ExecuteInsertAnnotation(long internalId, IList<KeyValuePair<long, string>> annotations, DbCommand command)
        {
            if ( null == annotations || annotations.Count <= 0 )
                return;

            if (annotations.Count > _dataItemAnnotationBatchSize)
                throw new ArgumentOutOfRangeException("annotations");

            if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
                throw new ArgumentNullException("command");

            command.Parameters.Clear();
            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertTrackingDataItemAnnotationMultiple]";
            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId));

            int i = 1; // base 1 to match parameter names
            foreach( KeyValuePair<long, string> kvp in annotations)
            {
                string index = (i++).ToString(CultureInfo.InvariantCulture);
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, kvp.Key));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation" + index, kvp.Value ));
            }

            command.ExecuteNonQuery();
            return;
        }

        /// <summary>
        /// Sends event annotations to ExecuteInsertEventAnnotation in chunks of at most
        /// _eventAnnotationBatchSize entries; a null or empty list is a no-op.
        /// </summary>
        /// <param name="internalId">Internal id of the workflow instance.</param>
        /// <param name="eventTypeId">Event type character forwarded with every batch.</param>
        /// <param name="annotations">Pairs of event id and annotation text.</param>
        /// <param name="command">Command forwarded to the per-batch insert.</param>
        private void BatchExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList<KeyValuePair<long, string>> annotations, DbCommand command)
        {
            if (null == annotations || annotations.Count <= 0)
                return;
            //
            // If the list is smaller than the max batch size just send it directly
            if (annotations.Count <= _eventAnnotationBatchSize)
            {
                ExecuteInsertEventAnnotation(internalId, eventTypeId, annotations, command);
                return;
            }
            //
            // Need to split the list into max batch size chunks
            List<KeyValuePair<long, string>> batch = new List<KeyValuePair<long, string>>(_eventAnnotationBatchSize);
            foreach (KeyValuePair<long, string> kvp in annotations)
            {
                batch.Add(kvp);
                if (batch.Count == _eventAnnotationBatchSize)
                {
                    ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command);
                    batch.Clear();
                }
            }
            //
            // Send anything that hasn't been sent
            if
(batch.Count > 0)
                ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command);
        }

        /// <summary>
        /// Inserts one batch of event annotations through [dbo].[InsertEventAnnotationMultiple];
        /// a null or empty list is a no-op.
        /// </summary>
        /// <param name="internalId">Internal id of the workflow instance; asserted to be different from -1.</param>
        /// <param name="eventTypeId">Event type character passed as @EventTypeId.</param>
        /// <param name="annotations">Pairs of event id and annotation text;
        /// at most _eventAnnotationBatchSize entries.</param>
        /// <param name="command">Command with an open connection; its parameters are replaced.</param>
        /// <exception cref="ArgumentOutOfRangeException">More annotations than one batch may hold.</exception>
        /// <exception cref="ArgumentNullException">The command is null or its connection is null or not open.</exception>
        private void ExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList<KeyValuePair<long, string>> annotations, DbCommand command)
        {
            Debug.Assert(internalId != -1, "Invalid internalId");

            if (null == annotations || annotations.Count <= 0)
                return;

            if (annotations.Count > _eventAnnotationBatchSize)
                throw new ArgumentOutOfRangeException("annotations");

            if ( ( null == command ) || ( null == command.Connection ) || ( ConnectionState.Open != command.Connection.State ) )
                throw new ArgumentNullException( "command" );

            command.Parameters.Clear();
            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertEventAnnotationMultiple]";
            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@WorkflowInstanceInternalId", internalId ) );
            command.Parameters.Add( DbResourceAllocator.NewDbParameter( "@EventTypeId", eventTypeId ) );

            int i = 1; //base 1 to match parameter names
            foreach(KeyValuePair<long, string> kvp in annotations)
            {
                string index = (i++).ToString(CultureInfo.InvariantCulture);
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId" +index, kvp.Key));
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation"+index, kvp.Value));
            }

            command.ExecuteNonQuery();
            return;
        }
        #endregion

        #region Workflow Change
        /// <summary>
        /// Inserts a workflow change record inside its own transaction on a newly allocated command.
        /// The transaction is rolled back on failure and the connection is closed in every case.
        /// </summary>
        /// <param name="param">The record to insert; must be a WorkflowTrackingRecord.</param>
        /// <exception cref="ArgumentException"><paramref name="param"/> is not a WorkflowTrackingRecord.</exception>
        private void ExecuteInsertWorkflowChange( object param )
        {
            WorkflowTrackingRecord record = param as WorkflowTrackingRecord;

            if (null == record)
                throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param");

            DbCommand command = DbResourceAllocator.NewCommand();
            try
            {
                if ( ConnectionState.Open != command.Connection.State )
                    command.Connection.Open();

                command.Transaction = command.Connection.BeginTransaction();

                ExecuteInsertWorkflowChange(_internalId, record, command);

                command.Transaction.Commit();
            }
            catch (Exception)
            {
                //
                // Rollback can throw - ignore these exceptions
                // so we don't lose the original exception
                try
                {
                    if ((null != command) && (null != command.Transaction))
                        command.Transaction.Rollback();
                }
                catch (Exception)
                {
                }
                //
                // Re-throw original exception
                throw;
            }
            finally
            {
                if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State))
                    command.Connection.Close();
            }

            return;
        }

        /// <summary>
        /// Inserts the workflow instance event for a workflow change, followed by its added
        /// activities, removed activities and annotations, using the supplied open command.
        /// </summary>
        /// <param name="internalId">Internal id of the workflow instance.</param>
        /// <param name="record">The change record; serialized here if its EventArgs are not already serialized.</param>
        /// <param name="command">Command with an open connection.</param>
        /// <exception cref="ArgumentNullException">The record is null, or the command is null or its connection is null or not open.</exception>
        /// <exception cref="InvalidOperationException">The record has no EventArgs.</exception>
        private void ExecuteInsertWorkflowChange(long internalId, WorkflowTrackingRecord record, DbCommand command)
        {
            if (null == record)
                throw new ArgumentNullException("record");

            if (null == record.EventArgs)
                throw new InvalidOperationException(ExecutionStringManager.InvalidWorkflowChangeArgs);

            if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
                throw new ArgumentNullException("command");
            //
            // If we haven't already serialized do so now.
            // This is work we have to do to write to store in the db anyway.
            if (!(record.EventArgs is SerializedWorkflowChangedEventArgs))
                record = SerializeRecord(record);
            //
            // Insert the workflow instance event
            BuildInsertWorkflowInstanceEventParameters(internalId, record, null, command);
            command.ExecuteNonQuery();
            //
            // Get the event id for added/removed activities and annotations
            long eventId = (long)command.Parameters["@WorkflowInstanceEventId1"].Value;

            SerializedWorkflowChangedEventArgs sargs = (SerializedWorkflowChangedEventArgs)record.EventArgs;
            //
            // Normalize the activities that have been added/removed if we're tracking definitions
            if ((null != sargs.AddedActivities) && (sargs.AddedActivities.Count > 0))
            {
                foreach (AddedActivity added in sargs.AddedActivities)
                    ExecuteInsertAddedActivity(internalId, added.QualifiedName, added.ParentQualifiedName, added.ActivityTypeFullName, added.ActivityTypeAssemblyFullName, added.AddedActivityActionXoml, eventId, added.Order, command);
            }

            if ((null != sargs.RemovedActivities) && (sargs.RemovedActivities.Count > 0))
            {
                foreach (RemovedActivity removed in
sargs.RemovedActivities)
                    ExecuteInsertRemovedActivity(internalId, removed.QualifiedName, removed.ParentQualifiedName, removed.RemovedActivityActionXoml, eventId, removed.Order, command);
            }

            List<KeyValuePair<long, string>> annotations = new List<KeyValuePair<long, string>>(record.Annotations.Count);

            foreach (string s in record.Annotations)
                annotations.Add(new KeyValuePair<long, string>(eventId, s));

            BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command);
        }

        /// <summary>
        /// Records one activity added by a workflow change through [dbo].[InsertAddedActivity].
        /// </summary>
        /// <param name="internalId">Internal id of the workflow instance.</param>
        /// <param name="qualifiedName">Qualified name of the added activity.</param>
        /// <param name="parentQualifiedName">Qualified name of the activity's parent.</param>
        /// <param name="typeFullName">Full type name of the added activity.</param>
        /// <param name="assemblyFullName">Full name of the assembly declaring that type.</param>
        /// <param name="addedActivityActionXoml">Serialized form of the add action.</param>
        /// <param name="eventId">Id of the workflow instance event the change belongs to.</param>
        /// <param name="order">Position of the change; -1 is sent as a database NULL.</param>
        /// <param name="command">Command with an open connection; its parameters are replaced.</param>
        /// <exception cref="ArgumentNullException">The command is null or its connection is null or not open.</exception>
        private void ExecuteInsertAddedActivity(long internalId, string qualifiedName, string parentQualifiedName, string typeFullName, string assemblyFullName, string addedActivityActionXoml, long eventId, int order, DbCommand command)
        {
            if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State))
                throw new ArgumentNullException("command");

            command.Parameters.Clear();
            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertAddedActivity]";
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeFullName));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyFullName));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName));
            command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AddedActivityAction", addedActivityActionXoml));

            if (-1 == order)
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value));
            else
                command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", order));

            command.ExecuteNonQuery();
        }

        // Records one activity removed by a workflow change through [dbo].[InsertRemovedActivity].
        private void ExecuteInsertRemovedActivity(long internalId, string qualifiedName, string parentQualifiedName, string removedActivityActionXoml, long eventId, int
order, DbCommand command)
        {
            // The command is usable only when it exists and owns an open connection.
            bool usable = (null != command)
                && (null != command.Connection)
                && (ConnectionState.Open == command.Connection.State);
            if (!usable)
                throw new ArgumentNullException("command");

            DbParameterCollection parameters = command.Parameters;
            parameters.Clear();

            command.CommandType = CommandType.StoredProcedure;
            command.CommandText = "[dbo].[InsertRemovedActivity]";

            parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId));
            parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId));
            parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName));
            parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName));
            parameters.Add(DbResourceAllocator.NewDbParameter("@RemovedActivityAction", removedActivityActionXoml));

            // An order of -1 is stored as a database NULL.
            if (-1 != order)
                parameters.Add(DbResourceAllocator.NewDbParameter("@Order", order));
            else
                parameters.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value));

            command.ExecuteNonQuery();
        }
        #endregion

        #region Utility
        /// <summary>
        /// Looks up an activity instance id, first among the committed ids and then,
        /// when batching (_isTrans), among the ids collected during the current batch.
        /// </summary>
        /// <param name="key">Key built by BuildQualifiedNameVarName.</param>
        /// <param name="id">Receives the id when found.</param>
        /// <returns>true when either cache holds the key; otherwise false.</returns>
        private bool TryGetActivityInstanceId(string key, out long id)
        {
            // Committed ids take precedence.
            if (_activityInstanceId.TryGetValue(key, out id))
                return true;

            // Not batched, so there is no temporary cache to consult.
            if (!_isTrans)
                return false;

            // Batched: fall back to the ids generated during this batch commit.
            return _tmpActivityInstanceId.TryGetValue(key, out id);
        }

        /// <summary>
        /// Caches an activity instance id unless the key is already cached.  When batching the
        /// id goes into the temporary cache (moved to the real one in IPendingWork.Complete
        /// if the commit succeeds); otherwise it goes straight into the real cache.
        /// </summary>
        /// <param name="key">Key built by BuildQualifiedNameVarName.</param>
        /// <param name="id">The activity instance id to remember.</param>
        private void SetActivityInstanceId(string key, long id)
        {
            if (_isTrans)
            {
                if (!_tmpActivityInstanceId.ContainsKey(key))
                    _tmpActivityInstanceId.Add(key, id);

                return;
            }

            if (!_activityInstanceId.ContainsKey(key))
                _activityInstanceId.Add(key, id);
        }

        /// <summary>
        /// Forgets a cached activity instance id.
        /// </summary>
        /// <param name="key">Key built by BuildQualifiedNameVarName.</param>
        private void RemoveActivityInstanceId(string key)
        {
            //
            // Remove from both the temp and real caches
            if (_isTrans)
            {
                if
(_tmpActivityInstanceId.ContainsKey(key))
                    _tmpActivityInstanceId.Remove(key);
            }

            if (_activityInstanceId.ContainsKey(key))
                _activityInstanceId.Remove(key);
        }

        /// <summary>
        /// Formats a date/time as year, two-digit month and two-digit day, a space, then
        /// hour, minute, second and millisecond separated by colons.  Only month and day
        /// are zero padded; every number is formatted with the invariant culture.
        /// </summary>
        /// <param name="dateTime">The value to format.</param>
        /// <returns>The formatted string.</returns>
        private string GetSqlDateTimeString(DateTime dateTime)
        {
            CultureInfo invariant = System.Globalization.CultureInfo.InvariantCulture;

            StringBuilder text = new StringBuilder();
            text.Append(dateTime.Year.ToString(invariant));
            text.Append(PadToDblDigit(dateTime.Month));
            text.Append(PadToDblDigit(dateTime.Day));
            text.Append(" ");
            text.Append(dateTime.Hour.ToString(invariant));
            text.Append(":");
            text.Append(dateTime.Minute.ToString(invariant));
            text.Append(":");
            text.Append(dateTime.Second.ToString(invariant));
            text.Append(":");
            text.Append(dateTime.Millisecond.ToString(invariant));

            return text.ToString();
        }

        /// <summary>
        /// Formats a number with the invariant culture, prefixing "0" when the result is a single character.
        /// </summary>
        /// <param name="num">The number to format.</param>
        /// <returns>The formatted number.</returns>
        private string PadToDblDigit(int num)
        {
            string digits = num.ToString(System.Globalization.CultureInfo.InvariantCulture);
            return (1 == digits.Length) ? "0" + digits : digits;
        }

        /// <summary>
        /// Build a string to uniquely identify each activity that should be recorded as a separate instance.
        /// A separate instance is defined by the combination of QualifiedName, Context and ParentContext
        /// </summary>
        /// <param name="qId">Qualified name of the activity; hashed through HashHelper.HashServiceType.</param>
        /// <param name="context">Context guid of the activity.</param>
        /// <param name="parentContext">Context guid of the activity's parent.</param>
        /// <returns>The three guids joined with '_', with every '-' replaced by '_'.</returns>
        private string BuildQualifiedNameVarName( string qId, Guid context, Guid parentContext )
        {
            string namePart = HashHelper.HashServiceType(qId).ToString().Replace('-', '_');
            string contextPart = context.ToString().Replace('-', '_');
            string parentPart = parentContext.ToString().Replace('-', '_');

            return namePart + "_" + contextPart + "_" + parentPart;
        }

        /// <summary>
        /// Replaces every item in the record's Body with the result of SerializeDataItem.
        /// </summary>
        /// <param name="record">The activity record to process; modified in place.</param>
        /// <returns>The same record instance.</returns>
        private ActivityTrackingRecord SerializeRecord( ActivityTrackingRecord record )
        {
            bool hasBody = ( null != record.Body ) && ( 0 != record.Body.Count );
            if ( hasBody )
            {
                for ( int index = 0; index < record.Body.Count; index++ )
                    record.Body[index] = SerializeDataItem( record.Body[index] );
            }

            return record;
        }

        /// <summary>
        /// Serializes the user data and the body items of a user tracking record in place.
        /// </summary>
        /// <param name="record">The user record to process.</param>
        /// <returns>The same record instance.</returns>
        private UserTrackingRecord SerializeRecord( UserTrackingRecord record )
        {
            // Nothing to do when there is no body, no event args and no user data.
            bool emptyBody = ( null == record.Body ) || ( 0 == record.Body.Count );
            if ( emptyBody && ( null == record.EventArgs ) && ( null == record.UserData ) )
                return record;

            if ( null != record.UserData )
            {
                SerializedDataItem item = new SerializedDataItem();
                byte[] data = null;
                bool nonSerializable;

                SerializeDataItem(
record.UserData, out data, out nonSerializable ); item.Type = record.UserData.GetType(); item.StringData = record.UserData.ToString(); item.SerializedData = data; item.NonSerializable = nonSerializable; record.UserData = item; } for ( int i = 0; i < record.Body.Count; i++ ) record.Body[i] = SerializeDataItem( record.Body[i] ); return record; } private WorkflowTrackingRecord SerializeRecord( WorkflowTrackingRecord record ) { if ( null == record.EventArgs ) return record; SerializedEventArgs args; if ( TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent ) { // // Convert the WorkflowChanged items SerializedWorkflowChangedEventArgs sargs = new SerializedWorkflowChangedEventArgs(); TrackingWorkflowChangedEventArgs wargs = ( TrackingWorkflowChangedEventArgs ) record.EventArgs; if ( null != wargs ) { for ( int i = 0; i activities) { Activity removed = removedAction.OriginalRemovedActivity; RemovedActivity removedActivity = new RemovedActivity(); removedActivity.Order = order; removedActivity.QualifiedName = removed.QualifiedName; if (null != removed.Parent) removedActivity.ParentQualifiedName = removed.Parent.QualifiedName; // // Save the defintion of this change removedActivity.RemovedActivityActionXoml = GetXomlDocument(removedAction); activities.Add(removedActivity); // // Recursively add all contained activities to the removed list if (removed is CompositeActivity) { foreach (Activity activity in ((CompositeActivity)removed).Activities) { AddRemovedActivity(activity, activities); } } } private void AddRemovedActivity(Activity removed, IList activities) { RemovedActivity removedActivity = new RemovedActivity(); removedActivity.Order = -1; removedActivity.QualifiedName = removed.QualifiedName; if ( null != removed.Parent ) removedActivity.ParentQualifiedName = removed.Parent.QualifiedName; activities.Add(removedActivity); // // Recursively add all contained activities to the removed list if (removed is CompositeActivity) { foreach (Activity activity in 
((CompositeActivity)removed).Activities)
                {
                    AddRemovedActivity(activity, activities);
                }
            }
        }

        /// <summary>
        /// Appends an entry for the activity added by <paramref name="addedAction"/>, including
        /// the serialized action, and then an entry for every activity it contains.
        /// </summary>
        /// <param name="addedAction">The add action being recorded.</param>
        /// <param name="order">Position of the action, stored on the entry.</param>
        /// <param name="activities">List receiving the entries.</param>
        private void AddAddedActivity(AddedActivityAction addedAction, int order, IList<AddedActivity> activities)
        {
            Activity added = addedAction.AddedActivity;

            AddedActivity addedActivity = new AddedActivity();
            addedActivity.Order = order;

            Type type = added.GetType();
            addedActivity.ActivityTypeFullName = type.FullName;
            addedActivity.ActivityTypeAssemblyFullName = type.Assembly.FullName;
            addedActivity.QualifiedName = added.QualifiedName;

            if (null != added.Parent)
                addedActivity.ParentQualifiedName = added.Parent.QualifiedName;

            addedActivity.AddedActivityActionXoml = GetXomlDocument(addedAction);
            activities.Add(addedActivity);
            //
            // Recursively add all contained activities to the added list
            if (added is CompositeActivity)
            {
                foreach (Activity activity in ((CompositeActivity)added).Activities)
                {
                    AddAddedActivity(activity, activities);
                }
            }
        }

        /// <summary>
        /// Appends an entry (Order -1, no action xoml) for an activity contained in an added
        /// activity, and then an entry for every activity it contains in turn.
        /// </summary>
        /// <param name="added">The contained activity.</param>
        /// <param name="activities">List receiving the entries.</param>
        private void AddAddedActivity(Activity added, IList<AddedActivity> activities)
        {
            AddedActivity addedActivity = new AddedActivity();
            addedActivity.Order = -1;

            Type type = added.GetType();
            addedActivity.ActivityTypeFullName = type.FullName;
            addedActivity.ActivityTypeAssemblyFullName = type.Assembly.FullName;
            addedActivity.QualifiedName = added.QualifiedName;

            if ( null != added.Parent )
                addedActivity.ParentQualifiedName = added.Parent.QualifiedName;

            activities.Add(addedActivity);
            //
            // Recursively add all contained activities to the added list
            if (added is CompositeActivity)
            {
                foreach (Activity activity in ((CompositeActivity)added).Activities)
                {
                    AddAddedActivity(activity, activities);
                }
            }
        }

        /// <summary>
        /// Wraps a tracking data item in a SerializedDataItem that carries the original data,
        /// annotations and field name plus, when the data is not null, its serialized bytes,
        /// string form, type and non-serializable flag.
        /// </summary>
        /// <param name="item">The item to wrap.</param>
        /// <returns>The wrapper, or null when <paramref name="item"/> is null.</returns>
        private SerializedDataItem SerializeDataItem( TrackingDataItem item )
        {
            if ( null == item )
                return null;

            SerializedDataItem s = new SerializedDataItem();
            s.Data = item.Data;
            s.Annotations.AddRange( item.Annotations );
            s.FieldName = item.FieldName;

            if ( null != item.Data )
            {
                byte[] state = null;
                bool nonSerializable;

                SerializeDataItem( item.Data, out state, out nonSerializable );
                s.SerializedData
= state;
                s.StringData = item.Data.ToString();
                s.Type = item.Data.GetType();
                s.NonSerializable = nonSerializable;
            }

            return s;
        }

        /// <summary>
        /// Binary serialize an object.  Used to persist trackingDataItems.
        /// </summary>
        /// <param name="data">The object to serialize; null leaves both outputs at their defaults.</param>
        /// <param name="state">Receives the serialized bytes, or null when nothing was serialized.</param>
        /// <param name="nonSerializable">Set to true when serialization fails with a SerializationException.</param>
        private void SerializeDataItem( object data, out byte[] state, out bool nonSerializable )
        {
            nonSerializable = false;
            state = null;

            if ( null == data )
                return;

            // NOTE(review): BinaryFormatter is unsafe on untrusted payloads; it is kept here
            // because the stored format is part of the existing tracking data contract.
            using (MemoryStream stream = new MemoryStream(1024))
            {
                BinaryFormatter bf = new BinaryFormatter();
                try
                {
                    bf.Serialize(stream, data);
                    //
                    // ToArray copies everything written regardless of the stream position,
                    // replacing the manual read loop and the pre-sized buffer.
                    state = stream.ToArray();
                }
                catch (SerializationException)
                {
                    nonSerializable = true;
                }
            }
        }

        /// <summary>
        /// Make string sql safe
        /// </summary>
        /// <param name="val">The text to escape.</param>
        /// <returns>The text with every single quote doubled, or null when the input is null.</returns>
        private string SqlEscape( string val )
        {
            if ( null == val )
                return null;

            return val.Replace( "'", "''" );
        }

        /*
        static char[] hexDigits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};

        /// Convert a byte array to a string of hex chars for sql image type
        private static string ToHexString( byte[] bytes )
        {
            if ( null == bytes )
                return null;

            if ( 0 == bytes.Length )
                return null;

            char[] chars = new char[bytes.Length * 2];
            for ( int i = 0; i < bytes.Length; i++ )
            {
                int b = bytes[i];
                chars[i * 2] = hexDigits[b >> 4];
                chars[i * 2 + 1] = hexDigits[b & 0xF];
            }
            return "0x" + new string( chars );
        }
        */

        /// <summary>
        /// Appends every call path entry to _callPathKey and every entry but the last to
        /// _parentCallPathKey, each preceded by ".", then strips the leading character and
        /// sql-escapes both keys.  A null or empty call path changes nothing.
        /// </summary>
        /// <param name="callPath">The call path entries.</param>
        private void GetCallPathKeys( IList<string> callPath )
        {
            if ( ( null == callPath ) || ( callPath.Count <= 0 ) )
                return;

            for ( int i = 0; i < callPath.Count; i++ )
            {
                _callPathKey = _callPathKey + "." + callPath[i];
                if ( i < callPath.Count - 1 )
                    _parentCallPathKey = _parentCallPathKey + "."
+ callPath[i];
            }

            if ( null != _callPathKey )
                _callPathKey = SqlEscape( _callPathKey.Substring( 1 ) );

            if ( null != _parentCallPathKey )
                _parentCallPathKey = SqlEscape( _parentCallPathKey.Substring( 1 ) );
        }

        /// <summary>
        /// Builds an xml document with an "Activities" root that holds one "Activity"
        /// element per activity written by WriteActivity, starting at <paramref name="root"/>.
        /// </summary>
        /// <param name="root">The activity to start from.</param>
        /// <returns>The xml text, or null when <paramref name="root"/> is null.</returns>
        private string GetActivitiesXml(CompositeActivity root)
        {
            if (null == root)
                return null;

            StringBuilder xml = new StringBuilder();
            XmlWriter xmlWriter = XmlWriter.Create(xml);
            try
            {
                xmlWriter.WriteStartDocument();
                xmlWriter.WriteStartElement("Activities");
                WriteActivity(root, xmlWriter);
                xmlWriter.WriteEndElement();
                xmlWriter.WriteEndDocument();
            }
            finally
            {
                xmlWriter.Flush();
                xmlWriter.Close();
            }

            return xml.ToString();
        }

        /// <summary>
        /// Writes an "Activity" element for <paramref name="activity"/> and then, for a composite
        /// activity, one for each activity returned by GetAllEnabledActivities, recursively.
        /// </summary>
        /// <param name="activity">The activity to write; null is ignored.</param>
        /// <param name="writer">The writer receiving the elements.</param>
        /// <exception cref="ArgumentNullException"><paramref name="writer"/> is null while the activity is not.</exception>
        private void WriteActivity(Activity activity, XmlWriter writer)
        {
            if (null == activity)
                return;

            if (null == writer)
                throw new ArgumentNullException("writer");

            Type activityType = activity.GetType();

            writer.WriteStartElement("Activity");
            writer.WriteElementString("TypeFullName", activityType.FullName);
            writer.WriteElementString("AssemblyFullName", activityType.Assembly.FullName);
            writer.WriteElementString("QualifiedName", activity.QualifiedName);
            //
            // Don't write the element if the value is null, sql will see a missing element as a null value
            if (null != activity.Parent)
                writer.WriteElementString("ParentQualifiedName", activity.Parent.QualifiedName);
            writer.WriteEndElement();

            // Child elements follow their parent as siblings rather than being nested inside it.
            CompositeActivity composite = activity as CompositeActivity;
            if (null != composite)
            {
                foreach (Activity child in GetAllEnabledActivities(composite))
                    WriteActivity(child, writer);
            }
        }

        // This function returns all the executable activities including secondary flow activities.
private IList GetAllEnabledActivities(CompositeActivity compositeActivity) { if (compositeActivity == null) throw new ArgumentNullException("compositeActivity"); List allActivities = new List (compositeActivity.EnabledActivities); foreach (Activity secondaryFlowActivity in ((ISupportAlternateFlow)compositeActivity).AlternateFlowActivities) { if (!allActivities.Contains(secondaryFlowActivity)) allActivities.Add(secondaryFlowActivity); } return allActivities; } internal string GetXomlDocument(object obj) { string xomlText = null; using (StringWriter stringWriter = new StringWriter(System.Globalization.CultureInfo.InvariantCulture)) { using (XmlWriter xmlWriter = CreateXmlWriter(stringWriter)) { WorkflowMarkupSerializer serializer = new WorkflowMarkupSerializer(); serializer.Serialize(xmlWriter, obj); xomlText = stringWriter.ToString(); ; } } return xomlText; } #endregion } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- AutomationEvent.cs
- StorageComplexTypeMapping.cs
- LayeredChannelListener.cs
- Button.cs
- Track.cs
- PenCursorManager.cs
- BitmapEffectGeneralTransform.cs
- NativeMethods.cs
- MachineKeyConverter.cs
- PersistChildrenAttribute.cs
- StrongNameKeyPair.cs
- DataGridViewButtonColumn.cs
- XmlHierarchicalEnumerable.cs
- TypeDescriptionProvider.cs
- XmlTextWriter.cs
- Highlights.cs
- basemetadatamappingvisitor.cs
- CommandValueSerializer.cs
- ConfigurationLocation.cs
- XamlFxTrace.cs
- DataBindingList.cs
- BufferedStream.cs
- ServiceContractListItemList.cs
- XmlAttribute.cs
- InfoCardBaseException.cs
- ArgumentOutOfRangeException.cs
- WebResourceUtil.cs
- newinstructionaction.cs
- DeclaredTypeValidator.cs
- AnnotationResource.cs
- WebServiceTypeData.cs
- UInt64Storage.cs
- Viewport3DVisual.cs
- handlecollector.cs
- Rect3D.cs
- AccessibleObject.cs
- TextElementAutomationPeer.cs
- RoleManagerSection.cs
- EventHandlers.cs
- InputLangChangeRequestEvent.cs
- BaseTypeViewSchema.cs
- PreservationFileReader.cs
- SiteMapNodeItemEventArgs.cs
- HighlightComponent.cs
- Serializer.cs
- WebScriptEndpoint.cs
- LocalizationComments.cs
- odbcmetadatacollectionnames.cs
- SafeNativeMethods.cs
- UndoManager.cs
- WindowsStatusBar.cs
- TransformerInfo.cs
- MultiByteCodec.cs
- DataGridViewRowHeightInfoPushedEventArgs.cs
- RandomNumberGenerator.cs
- DynamicDataExtensions.cs
- Block.cs
- SQLDateTime.cs
- EntityKeyElement.cs
- PointUtil.cs
- InputChannelBinder.cs
- CompressEmulationStream.cs
- ResizeGrip.cs
- ASCIIEncoding.cs
- TranslateTransform.cs
- ParserExtension.cs
- ValuePattern.cs
- ITextView.cs
- TextServicesManager.cs
- XmlWellformedWriter.cs
- COM2ExtendedTypeConverter.cs
- VisualCollection.cs
- StructuredProperty.cs
- Encoder.cs
- XmlAutoDetectWriter.cs
- FrameworkContentElement.cs
- TextAdaptor.cs
- OrthographicCamera.cs
- SapiRecoContext.cs
- SqlDataSourceView.cs
- ListViewItem.cs
- MD5CryptoServiceProvider.cs
- XmlSchemaAnnotation.cs
- LayoutExceptionEventArgs.cs
- UrlMappingsModule.cs
- FreezableOperations.cs
- CustomLineCap.cs
- HtmlInputRadioButton.cs
- LinearKeyFrames.cs
- ReadWriteObjectLock.cs
- Command.cs
- DbConnectionPoolGroup.cs
- PeerNameResolver.cs
- HttpPostServerProtocol.cs
- ResourceAssociationType.cs
- ZipIOLocalFileBlock.cs
- ConditionalWeakTable.cs
- AppDomainUnloadedException.cs
- FolderBrowserDialog.cs
- DataServiceQueryOfT.cs