WorkBatch.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / WorkBatch.cs / 1305376 / WorkBatch.cs

                            #pragma warning disable 1634, 1691 
using System;
using System.Diagnostics;
using System.Transactions;
using System.Collections; 
using System.Collections.Generic;
using System.Workflow.Runtime.Hosting; 
 
namespace System.Workflow.Runtime
{ 
    #region Runtime Batch Implementation

    #region WorkBatch
 
 	internal enum WorkBatchState
	{ 
		Usable, 
		Merged,
 		Completed 
	}

    /// 
    /// Summary description for Work Batching. 
    /// 
    internal sealed class WorkBatch: IWorkBatch, IDisposable 
    { 
        private PendingWorkCollection _pendingWorkCollection;
        private object mutex = new object(); 
 		private WorkBatchState _state;
        private WorkBatchCollection _collection = null;

        private WorkBatch() 
        {
 		} 
 
        internal WorkBatch(WorkBatchCollection workBackCollection)
        { 
            _pendingWorkCollection = new PendingWorkCollection();
            _state = WorkBatchState.Usable;
            _collection = workBackCollection;
        } 

        internal int Count 
        { 
            get { return _pendingWorkCollection.WorkItems.Count; }
        } 

        internal void SetWorkBatchCollection(WorkBatchCollection workBatchCollection)
        {
            _collection = workBatchCollection; 
        }
 
        #region IWorkBatch Implementation 
        /// 
        /// Add Work to Batch 
        /// 
        /// 
        /// 
        void IWorkBatch.Add(IPendingWork work, object workItem) 
        {
            if (_pendingWorkCollection == null) 
                throw new ObjectDisposedException("WorkBatch"); 

            lock (this.mutex) 
            {
                System.Diagnostics.Debug.Assert(this._state == WorkBatchState.Usable, "Trying to add to unusable batch.");

                _pendingWorkCollection.Add(work, _collection.GetNextWorkItemOrderId(work), workItem); 
            }
        } 
        #endregion 

        #region Internal Implementation 

        internal bool IsDirty
        {
            get 
            {
                return this._pendingWorkCollection.IsDirty; 
            } 
        }
        ///  
        /// This one commits all the pending work and its items
        /// added so far in this batch.
        /// 
        ///  
        internal void Commit(Transaction transaction)
        { 
            lock (this.mutex) 
            {
                _pendingWorkCollection.Commit(transaction); 
            }
        }

 
        /// 
        /// This one completes the pending work 
        ///  
        /// 
        internal void Complete(bool succeeded) 
        {
            lock (this.mutex)
            {
                if (this._pendingWorkCollection.IsUsable) 
                {
                    _pendingWorkCollection.Complete(succeeded); 
					_pendingWorkCollection.Dispose(); 
 					this._state = WorkBatchState.Completed;
                } 
            }
        }

        ///  
        /// API for Runtime to call to do Merge Operation: Right now
        /// we dont use this because we dont support incoming work collection. 
        ///  
        /// 
        internal void Merge(WorkBatch batch) 
        {
            if (batch == null)
                return; //nothing to merge
 
            if (_pendingWorkCollection == null)
                throw new ObjectDisposedException("WorkBatch"); 
 
            lock (this.mutex)
            { 
                lock (batch.mutex)
                {
                    foreach (KeyValuePair> item in batch._pendingWorkCollection.WorkItems)
                    { 
                        //_pendingWorkCollection.AddRange(item.Key, item.Value);
                        SortedList newItems = item.Value; 
                        foreach (KeyValuePair kvp in newItems) 
                            _pendingWorkCollection.Add(item.Key, kvp.Key, kvp.Value);
                    } 
                }

                this._state = WorkBatchState.Merged;
            } 
        }
        #endregion 
 
        #region IDisposable Members
        public void Dispose() 
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        } 

        private void Dispose(bool disposing) 
        { 
            if (disposing)
            { 
				_pendingWorkCollection.Dispose();
                _pendingWorkCollection = null;
            }
        } 
        #endregion
 
        #region PendingWorkCollection implementation 

        ///  
        /// Pending Work Implementation
        /// 
        internal sealed class PendingWorkCollection : IDisposable
        { 
            Dictionary> Items;
 
            #region Internal Implementation 
            internal PendingWorkCollection()
            { 
                Items = new Dictionary>();
            }

            internal Dictionary> WorkItems 
            {
                get 
                { 
                    return Items;
                } 
            }

            internal bool IsUsable
            { 
                get
                { 
                    return this.Items != null; 
                }
            } 

            internal bool IsDirty
            {
                get 
                {
                    if (!this.IsUsable) 
                        return false; 

                    // 
                    // Loop through all pending work items in the collection
                    // If any of them assert that they need to commit than the batch is dirty
                    foreach (KeyValuePair> workItem in this.WorkItems)
                    { 
                        try
                        { 
                            IPendingWork work = workItem.Key; 
                            if (work.MustCommit(workItem.Value))
                                return true; 
                        }
                        catch (Exception e)
                        {
                            if (WorkflowExecutor.IsIrrecoverableException(e)) 
                            {
#pragma warning disable 56503 
                                throw; 
#pragma warning restore 56503
                            } 
                            else
                            {
                                // Ignore exceptions and treat condition as false return value;
                            } 
                        }
                    } 
                    // 
                    // If no one asserted that they need to commit we're not dirty
                    return false; 
                }
            }

            internal void Add(IPendingWork work, long orderId, object workItem) 
            {
                SortedList workItems = null; 
 
                if (!Items.TryGetValue(work, out workItems))
                { 
                    workItems = new SortedList();
                    Items.Add(work, workItems);
                }
                Debug.Assert(!workItems.ContainsKey(orderId), string.Format(System.Globalization.CultureInfo.InvariantCulture, "List already contains key {0}", orderId)); 
                workItems.Add(orderId, workItem);
                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 0, "pending work hc {0} added workItem hc {1}", work.GetHashCode(), workItem.GetHashCode()); 
            } 

            //Commit All Pending Work 
            internal void Commit(Transaction transaction)
            {
                //ignore items param
                foreach (KeyValuePair> workItem in Items) 
                {
                    IPendingWork work = workItem.Key; 
                    List values = new List(workItem.Value.Values); 
                    work.Commit(transaction, values);
                } 
            }

            //Complete All Pending Work
            internal void Complete(bool succeeded) 
            {
                foreach (KeyValuePair> workItem in Items) 
                { 
                    IPendingWork work = workItem.Key;
                    List values = new List(workItem.Value.Values); 
                    try
                    {
                        work.Complete(succeeded, values);
                    } 
                    catch (Exception e)
                    { 
                        if (WorkflowExecutor.IsIrrecoverableException(e)) 
                        {
                            throw; 
                        }
                        else
                        {
                            WorkflowTrace.Runtime.TraceEvent(TraceEventType.Warning, 0, "Work Item {0} threw exception on complete notification", workItem.GetType()); 
                        }
                    } 
                } 
            }
 
            #endregion

            #region IDisposable Members
            public void Dispose() 
            {
                Dispose(true); 
                GC.SuppressFinalize(this); 
            }
 
            private void Dispose(bool disposing)
            {
                if (disposing && Items != null)
                { 
                    Items.Clear();
                    Items = null; 
                } 
            }
 
            #endregion
        }
        #endregion
    } 
    #endregion
 
    #region WorkBatchCollection 
    /// 
    /// collection of name to Batch 
    /// 
    internal sealed class WorkBatchCollection : Dictionary
    {
        object transientBatchID = new object(); 
        private object mutex = new object();
        // 
        // All access must be through Interlocked.* methods 
        private long _workItemOrderId = 0;
 
        internal long WorkItemOrderId
        {
            get
            { 
                return Threading.Interlocked.Read( ref _workItemOrderId );
            } 
            set 
            {
                Debug.Assert(value >= _workItemOrderId, "New value for WorkItemOrderId must be greater than the current value"); 
                lock(mutex)
                {
                    _workItemOrderId = value;
                } 
            }
        } 
 
        internal long GetNextWorkItemOrderId(IPendingWork pendingWork)
        { 
            return Threading.Interlocked.Increment(ref _workItemOrderId);
        }
        /// 
        /// A new batch is created per atomic scope or any 
        /// required sub batches. An example of an optional sub batch
        /// could be a batch created for Send activities 
        ///  
        /// 
        ///  
        internal IWorkBatch GetBatch(object id)
        {
            WorkBatch batch = null;
 
            lock ( mutex )
            { 
				if (this.TryGetValue(id, out batch)) 
					return batch;
 
                batch = new WorkBatch(this);
                Add( id, batch );
            }
 
            return batch;
        } 
 
 		/// 
		/// Find a batch for a given id without creating it. 
 		/// 
 		/// batch key
		/// batch or null if not found
 		private WorkBatch FindBatch(object id) 
		{
			WorkBatch batch = null; 
			lock (mutex) 
 			{
				TryGetValue(id, out batch); 
 			}

 			return batch;
		} 

        internal IWorkBatch GetTransientBatch() 
        { 
            return GetBatch(transientBatchID);
        } 

        internal WorkBatch GetMergedBatch()
        {
            lock (mutex) 
            {
                WorkBatch batch = new WorkBatch(this); 
 
                foreach (WorkBatch existingBatch in this.Values)
                { 
                    batch.Merge(existingBatch);
                }
                //Copy of all the items merged in one batch.
                //Order is preserved in the same way batches are created. 
                return batch;
            } 
        } 

        internal void RollbackBatch(object id) 
        {
            lock ( mutex )
            {
 				WorkBatch batch = FindBatch(id); 
                if ( batch != null )
                { 
                    batch.Complete( false ); 
                    batch.Dispose();
                    Remove( id ); 
                }
            }
        }
 
		// Rollback all sub batches, calling "complete(false)" on all entries.
        internal void RollbackAllBatchedWork() 
        { 
            lock (mutex)
            { 
                foreach (WorkBatch batch in this.Values)
                {
                    batch.Complete(false);
                    batch.Dispose(); 
                }
                Clear(); // clear the collection 
            } 
        }
 
		// Clear sub batches after successful commit/complete.
        internal void ClearSubBatches()
        {
            lock (mutex) 
            {
                foreach (WorkBatch existingBatch in this.Values) 
                { 
                    existingBatch.Dispose();
                } 
                Clear(); // clear the collection
            }
        }
 
        internal void ClearTransientBatch()
        { 
            RollbackBatch(transientBatchID); 
        }
    } 
    #endregion

    #endregion
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
                        

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK