Scheduler.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.Activities / System / Activities / Runtime / Scheduler.cs / 1305376 / Scheduler.cs

                            //------------------------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------

namespace System.Activities.Runtime 
{
    using System; 
    using System.Diagnostics.CodeAnalysis; 
    using System.Runtime;
    using System.Runtime.Serialization; 
    using System.Security;
    using System.Threading;
    using System.Runtime.Diagnostics;
 
    [DataContract(Name = XD.Runtime.Scheduler, Namespace = XD.Runtime.Namespace)]
    class Scheduler 
    { 
        static ContinueAction continueAction = new ContinueAction();
        static YieldSilentlyAction yieldSilentlyAction = new YieldSilentlyAction(); 
        static AbortAction abortAction = new AbortAction();

        [DataMember(EmitDefaultValue = false)]
        WorkItem firstWorkItem; 

        static SendOrPostCallback onScheduledWorkCallback = Fx.ThunkCallback(new SendOrPostCallback(OnScheduledWork)); 
 
        SynchronizationContext synchronizationContext;
 
        bool isPausing;
        bool isRunning;

        bool resumeTraceRequired; 

        Callbacks callbacks; 
 
        Quack workItemQueue;
 
        public Scheduler(Callbacks callbacks)
        {
            this.Initialize(callbacks);
        } 

        public static RequestedAction Continue 
        { 
            get
            { 
                return continueAction;
            }
        }
 
        public static RequestedAction YieldSilently
        { 
            get 
            {
                return yieldSilentlyAction; 
            }
        }

        public static RequestedAction Abort 
        {
            get 
            { 
                return abortAction;
            } 
        }

        public bool IsRunning
        { 
            get
            { 
                return this.isRunning; 
            }
        } 

        public bool IsIdle
        {
            get 
            {
                return this.firstWorkItem == null; 
            } 
        }
 
        [DataMember(EmitDefaultValue = false)]
        [SuppressMessage(FxCop.Category.Performance, FxCop.Rule.AvoidUncalledPrivateCode)]
        WorkItem[] SerializedWorkItemQueue
        { 
            get
            { 
                if (this.workItemQueue != null && this.workItemQueue.Count > 0) 
                {
                    return this.workItemQueue.ToArray(); 
                }
                else
                {
                    return null; 
                }
            } 
            set 
            {
                Fx.Assert(value != null, "EmitDefaultValue is false so we should never get null."); 

                // this.firstWorkItem is serialized out separately, so don't use ScheduleWork() here
                this.workItemQueue = new Quack(value);
            } 
        }
 
        public void FillInstanceMap(ActivityInstanceMap instanceMap) 
        {
            if (this.firstWorkItem != null) 
            {
                ActivityInstanceMap.IActivityReference activityReference = this.firstWorkItem as ActivityInstanceMap.IActivityReference;
                if (activityReference != null)
                { 
                    instanceMap.AddEntry(activityReference, true);
                } 
 
                if (this.workItemQueue != null && this.workItemQueue.Count > 0)
                { 
                    for (int i = 0; i < this.workItemQueue.Count; i++)
                    {
                        activityReference = this.workItemQueue[i] as ActivityInstanceMap.IActivityReference;
                        if (activityReference != null) 
                        {
                            instanceMap.AddEntry(activityReference, true); 
                        } 
                    }
                } 
            }
        }

        public static RequestedAction CreateNotifyUnhandledExceptionAction(Exception exception, ActivityInstance sourceInstance) 
        {
            return new NotifyUnhandledExceptionAction(exception, sourceInstance); 
        } 

        public void ClearAllWorkItems(ActivityExecutor executor) 
        {
            if (this.firstWorkItem != null)
            {
                this.firstWorkItem.Release(executor); 
                this.firstWorkItem = null;
 
                if (this.workItemQueue != null) 
                {
                    while (this.workItemQueue.Count > 0) 
                    {
                        WorkItem item = this.workItemQueue.Dequeue();
                        item.Release(executor);
                    } 
                }
            } 
 
            Fx.Assert(this.workItemQueue == null || this.workItemQueue.Count == 0, "We either didn't have a first work item and therefore don't have anything in the queue, or we drained the queue.");
 
            // For consistency we set this to null even if it is empty
            this.workItemQueue = null;
        }
 
        public void OnDeserialized(Callbacks callbacks)
        { 
            Initialize(callbacks); 
            Fx.Assert(this.firstWorkItem != null || this.workItemQueue == null, "cannot have items in the queue unless we also have a firstWorkItem set");
        } 

        // This method should only be called when we relinquished the thread but did not
        // complete the operation (silent yield is the current example)
        public void InternalResume(RequestedAction action) 
        {
            Fx.Assert(this.isRunning, "We should still be processing work - we just don't have a thread"); 
 
            //
            bool isTracingEnabled = FxTrace.ShouldTraceInformation; 
            bool notifiedCompletion = false;
            bool isInstanceComplete = false;

            if (this.callbacks.IsAbortPending) 
            {
                this.isPausing = false; 
                this.isRunning = false; 

                this.NotifyWorkCompletion(); 
                notifiedCompletion = true;

                if (isTracingEnabled)
                { 
                    isInstanceComplete = this.callbacks.IsCompleted;
                } 
 
                // After calling SchedulerIdle we no longer have the lock.  That means
                // that any subsequent processing in this method won't have the single 
                // threaded guarantee.
                this.callbacks.SchedulerIdle();
            }
            else if (object.ReferenceEquals(action, continueAction)) 
            {
                ScheduleWork(false); 
            } 
            else
            { 
                Fx.Assert(action is NotifyUnhandledExceptionAction, "This is the only other choice because we should never have YieldSilently here");

                NotifyUnhandledExceptionAction notifyAction = (NotifyUnhandledExceptionAction)action;
 
                // We only set isRunning back to false so that the host doesn't
                // have to treat this like a pause notification.  As an example, 
                // a host could turn around and call run again in response to 
                // UnhandledException without having to go through its operation
                // dispatch loop first (or request pause again).  If we reset 
                // isPausing here then any outstanding operations wouldn't get
                // signaled with that type of host.
                this.isRunning = false;
 
                this.NotifyWorkCompletion();
                notifiedCompletion = true; 
 
                if (isTracingEnabled)
                { 
                    isInstanceComplete = this.callbacks.IsCompleted;
                }

                this.callbacks.NotifyUnhandledException(notifyAction.Exception, notifyAction.Source); 
            }
 
            if (isTracingEnabled) 
            {
                if (notifiedCompletion) 
                {
                    Guid oldActivityId = Guid.Empty;
                    bool resetId = false;
 
                    if (isInstanceComplete)
                    { 
                        if (TD.WorkflowActivityStopIsEnabled()) 
                        {
                            oldActivityId = DiagnosticTrace.ActivityId; 
                            DiagnosticTrace.ActivityId = this.callbacks.WorkflowInstanceId;
                            resetId = true;

                            TD.WorkflowActivityStop(this.callbacks.WorkflowInstanceId.ToString()); 
                        }
                    } 
                    else 
                    {
                        if (TD.WorkflowActivitySuspendIsEnabled()) 
                        {
                            oldActivityId = DiagnosticTrace.ActivityId;
                            DiagnosticTrace.ActivityId = this.callbacks.WorkflowInstanceId;
                            resetId = true; 

                            TD.WorkflowActivitySuspend(this.callbacks.WorkflowInstanceId.ToString()); 
                        } 
                    }
 
                    if (resetId)
                    {
                        DiagnosticTrace.ActivityId = oldActivityId;
                    } 
                }
            } 
        } 

        // called from ctor and OnDeserialized intialization paths 
        void Initialize(Callbacks callbacks)
        {
            this.callbacks = callbacks;
        } 

        public void Open(SynchronizationContext synchronizationContext) 
        { 
            Fx.Assert(this.synchronizationContext == null, "can only open when in the created state");
            if (synchronizationContext != null) 
            {
                this.synchronizationContext = synchronizationContext;
            }
            else 
            {
                this.synchronizationContext = SynchronizationContextHelper.GetDefaultSynchronizationContext(); 
            } 
        }
 
        internal void Open(Scheduler oldScheduler)
        {
            Fx.Assert(this.synchronizationContext == null, "can only open when in the created state");
            this.synchronizationContext = SynchronizationContextHelper.CloneSynchronizationContext(oldScheduler.synchronizationContext); 
        }
 
        void ScheduleWork(bool notifyStart) 
        {
            if (notifyStart) 
            {
                this.synchronizationContext.OperationStarted();
                this.resumeTraceRequired = true;
            } 
            else
            { 
                this.resumeTraceRequired = false; 
            }
            this.synchronizationContext.Post(Scheduler.onScheduledWorkCallback, this); 
        }

        void NotifyWorkCompletion()
        { 
            this.synchronizationContext.OperationCompleted();
        } 
 
        // signal the scheduler to stop processing work. If we are processing work
        // then we will catch this signal at our next iteration. Pause process completes 
        // when idle is signalled. Can be called while we're processing work since
        // the worst thing that could happen in a ---- is that we pause one extra work item later
        public void Pause()
        { 
            this.isPausing = true;
        } 
 
        public void MarkRunning()
        { 
            this.isRunning = true;
        }

        public void Resume() 
        {
            Fx.Assert(this.isRunning, "This should only be called after we've been set to process work."); 
 
            if (this.IsIdle || this.isPausing || this.callbacks.IsAbortPending)
            { 
                this.isPausing = false;
                this.isRunning = false;
                this.callbacks.SchedulerIdle();
            } 
            else
            { 
                ScheduleWork(true); 
            }
        } 

        public void PushWork(WorkItem workItem)
        {
            if (this.firstWorkItem == null) 
            {
                this.firstWorkItem = workItem; 
            } 
            else
            { 
                if (this.workItemQueue == null)
                {
                    this.workItemQueue = new Quack();
                } 

                this.workItemQueue.PushFront(this.firstWorkItem); 
                this.firstWorkItem = workItem; 
            }
 
            // To avoid the virt call on EVERY work item we check
            // the Verbose flag.  All of our Schedule traces are
            // verbose.
            if (FxTrace.ShouldTraceVerboseToTraceSource) 
            {
                workItem.TraceScheduled(); 
            } 
        }
 
        public void EnqueueWork(WorkItem workItem)
        {
            if (this.firstWorkItem == null)
            { 
                this.firstWorkItem = workItem;
            } 
            else 
            {
                if (this.workItemQueue == null) 
                {
                    this.workItemQueue = new Quack();
                }
 
                this.workItemQueue.Enqueue(workItem);
            } 
 
            if (FxTrace.ShouldTraceVerboseToTraceSource)
            { 
                workItem.TraceScheduled();
            }
        }
 
        static void OnScheduledWork(object state)
        { 
            Scheduler thisPtr = (Scheduler)state; 

            // We snapshot these values here so that we can 
            // use them after calling OnSchedulerIdle.
            //
            bool isTracingEnabled = FxTrace.Trace.ShouldTraceToTraceSource(TraceEventLevel.Informational);
            Guid oldActivityId = Guid.Empty; 
            Guid workflowInstanceId = Guid.Empty;
 
            if (isTracingEnabled) 
            {
                oldActivityId = DiagnosticTrace.ActivityId; 
                workflowInstanceId = thisPtr.callbacks.WorkflowInstanceId;
                FxTrace.Trace.SetAndTraceTransfer(workflowInstanceId, true);

                if (thisPtr.resumeTraceRequired) 
                {
                    if (TD.WorkflowActivityResumeIsEnabled()) 
                    { 
                        TD.WorkflowActivityResume(workflowInstanceId.ToString());
                    } 
                }
            }

            thisPtr.callbacks.ThreadAcquired(); 

            RequestedAction nextAction = continueAction; 
            bool idleOrPaused = false; 

            while (object.ReferenceEquals(nextAction, continueAction)) 
            {
                if (thisPtr.IsIdle || thisPtr.isPausing)
                {
                    idleOrPaused = true; 
                    break;
                } 
 
                // cycle through (queue->thisPtr.firstWorkItem->currentWorkItem)
                WorkItem currentWorkItem = thisPtr.firstWorkItem; 

                // promote an item out of our work queue if necessary
                if (thisPtr.workItemQueue != null && thisPtr.workItemQueue.Count > 0)
                { 
                    thisPtr.firstWorkItem = thisPtr.workItemQueue.Dequeue();
                } 
                else 
                {
                    thisPtr.firstWorkItem = null; 
                }

                nextAction = thisPtr.callbacks.ExecuteWorkItem(currentWorkItem);
            } 

            bool notifiedCompletion = false; 
            bool isInstanceComplete = false; 

            if (idleOrPaused || object.ReferenceEquals(nextAction, abortAction)) 
            {
                thisPtr.isPausing = false;
                thisPtr.isRunning = false;
 
                thisPtr.NotifyWorkCompletion();
                notifiedCompletion = true; 
 
                if (isTracingEnabled)
                { 
                    isInstanceComplete = thisPtr.callbacks.IsCompleted;
                }

                // After calling SchedulerIdle we no longer have the lock.  That means 
                // that any subsequent processing in this method won't have the single
                // threaded guarantee. 
                thisPtr.callbacks.SchedulerIdle(); 
            }
            else if (!object.ReferenceEquals(nextAction, yieldSilentlyAction)) 
            {
                Fx.Assert(nextAction is NotifyUnhandledExceptionAction, "This is the only other option");

                NotifyUnhandledExceptionAction notifyAction = (NotifyUnhandledExceptionAction)nextAction; 

                // We only set isRunning back to false so that the host doesn't 
                // have to treat this like a pause notification.  As an example, 
                // a host could turn around and call run again in response to
                // UnhandledException without having to go through its operation 
                // dispatch loop first (or request pause again).  If we reset
                // isPausing here then any outstanding operations wouldn't get
                // signaled with that type of host.
                thisPtr.isRunning = false; 

                thisPtr.NotifyWorkCompletion(); 
                notifiedCompletion = true; 

                if (isTracingEnabled) 
                {
                    isInstanceComplete = thisPtr.callbacks.IsCompleted;
                }
 
                thisPtr.callbacks.NotifyUnhandledException(notifyAction.Exception, notifyAction.Source);
            } 
 
            if (isTracingEnabled)
            { 
                if (notifiedCompletion)
                {
                    if (isInstanceComplete)
                    { 
                        if (TD.WorkflowActivityStopIsEnabled())
                        { 
                            TD.WorkflowActivityStop(workflowInstanceId.ToString()); 
                        }
                    } 
                    else
                    {
                        if (TD.WorkflowActivitySuspendIsEnabled())
                        { 
                            TD.WorkflowActivitySuspend(workflowInstanceId.ToString());
                        } 
                    } 
                }
 
                DiagnosticTrace.ActivityId = oldActivityId;
            }
        }
 
        public struct Callbacks
        { 
            readonly ActivityExecutor activityExecutor; 

            public Callbacks(ActivityExecutor activityExecutor) 
            {
                this.activityExecutor = activityExecutor;
            }
 
            public Guid WorkflowInstanceId
            { 
                get 
                {
                    return this.activityExecutor.WorkflowInstanceId; 
                }
            }

            public bool IsAbortPending 
            {
                get 
                { 
                    return this.activityExecutor.IsAbortPending || this.activityExecutor.IsTerminatePending;
                } 
            }

            public bool IsCompleted
            { 
                get
                { 
                    return ActivityUtilities.IsCompletedState(this.activityExecutor.State); 
                }
            } 

            public RequestedAction ExecuteWorkItem(WorkItem workItem)
            {
                Fx.Assert(this.activityExecutor != null, "ActivityExecutor null in ExecuteWorkItem."); 

                // We check the Verbose flag to avoid the 
                // virt call if possible 
                if (FxTrace.ShouldTraceVerboseToTraceSource)
                { 
                    workItem.TraceStarting();
                }

                RequestedAction action = this.activityExecutor.OnExecuteWorkItem(workItem); 

                if (!object.ReferenceEquals(action, Scheduler.YieldSilently)) 
                { 
                    if (this.activityExecutor.IsAbortPending || this.activityExecutor.IsTerminatePending)
                    { 
                        action = Scheduler.Abort;
                    }

                    // if the caller yields, then the work item is still active and the callback 
                    // is responsible for releasing it back to the pool
                    workItem.Dispose(this.activityExecutor); 
                } 

                return action; 
            }

            public void SchedulerIdle()
            { 
                Fx.Assert(this.activityExecutor != null, "ActivityExecutor null in SchedulerIdle.");
                this.activityExecutor.OnSchedulerIdle(); 
            } 

            public void ThreadAcquired() 
            {
                Fx.Assert(this.activityExecutor != null, "ActivityExecutor null in ThreadAcquired.");
                this.activityExecutor.OnSchedulerThreadAcquired();
            } 

            public void NotifyUnhandledException(Exception exception, ActivityInstance source) 
            { 
                Fx.Assert(this.activityExecutor != null, "ActivityExecutor null in NotifyUnhandledException.");
                this.activityExecutor.NotifyUnhandledException(exception, source); 
            }
        }

        internal abstract class RequestedAction 
        {
            protected RequestedAction() 
            { 
            }
        } 

        class ContinueAction : RequestedAction
        {
            public ContinueAction() 
            {
            } 
        } 

        class YieldSilentlyAction : RequestedAction 
        {
            public YieldSilentlyAction()
            {
            } 
        }
 
        class AbortAction : RequestedAction 
        {
            public AbortAction() 
            {
            }
        }
 
        class NotifyUnhandledExceptionAction : RequestedAction
        { 
            public NotifyUnhandledExceptionAction(Exception exception, ActivityInstance source) 
            {
                this.Exception = exception; 
                this.Source = source;
            }

            public Exception Exception 
            {
                get; 
                private set; 
            }
 
            public ActivityInstance Source
            {
                get;
                private set; 
            }
        } 
    } 
}
 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK