Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Hosting / DefaultWorkflowSchedulerService.cs / 1305376 / DefaultWorkflowSchedulerService.cs
using System;
using System.Collections;
using System.Collections.ObjectModel;
using System.Collections.Generic;
using System.Threading;
using System.Collections.Specialized;
using System.Diagnostics;
using System.Workflow.Runtime;
using System.Globalization;

namespace System.Workflow.Runtime.Hosting
{
    /// <summary>
    /// Scheduler service that runs workflow work items on the CLR thread pool while
    /// capping the number of simultaneously running workers, and that fires delayed
    /// (timer based) callbacks from a priority queue ordered by due time.
    /// </summary>
    public class DefaultWorkflowSchedulerService : WorkflowSchedulerService
    {
        // next two fields controlled by locking the timerQueue
        private KeyedPriorityQueue<Guid, CallbackInfo, DateTime> timerQueue = new KeyedPriorityQueue<Guid, CallbackInfo, DateTime>();
        private Timer callbackTimer;

        private TimerCallback timerCallback;
        private const string MAX_SIMULTANEOUS_WORKFLOWS_KEY = "maxSimultaneousWorkflows";
        private const int DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS = 5;

        // Timer period meaning "do not repeat". The TimeSpan(long) constructor takes
        // ticks, so the value is built from an explicit millisecond component to get
        // the documented -1 ms value rather than -1 tick.
        private static TimeSpan infinite = new TimeSpan(0, 0, 0, 0, Timeout.Infinite);

        private readonly int maxSimultaneousWorkflows; // Maximum number of work items allowed in ThreadPool queue
        private static TimeSpan fiveMinutes = new TimeSpan(0, 5, 0);

        // next three fields controlled by locking the waitingQueue
        private int numCurrentWorkers;
        private Queue<WorkItem> waitingQueue; // Queue for extra items waiting to be allowed into thread pool
        private volatile bool running = false;

        private IList<PerformanceCounter> queueCounters; // expose internal queue length

        /// <summary>
        /// Default worker limit: the base constant on a single processor machine,
        /// otherwise the base constant scaled by 80% of the processor count.
        /// </summary>
        private static int DefaultThreadCount
        {
            get
            {
                return Environment.ProcessorCount == 1
                    ? DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS
                    : (int)(DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS * Environment.ProcessorCount * .8);
            }
        }

        /// <summary>
        /// Creates the service with the default worker limit.
        /// </summary>
        public DefaultWorkflowSchedulerService()
            : this(DefaultThreadCount)
        {
        }

        /// <summary>
        /// Creates the service with an explicit worker limit.
        /// </summary>
        /// <param name="maxSimultaneousWorkflows">Maximum number of concurrently running workers; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException">The limit is smaller than 1.</exception>
        public DefaultWorkflowSchedulerService(int maxSimultaneousWorkflows)
            : base()
        {
            if (maxSimultaneousWorkflows < 1)
                throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty);

            this.maxSimultaneousWorkflows = maxSimultaneousWorkflows;
            init();
        }

        /// <summary>
        /// Creates the service from name/value configuration parameters. The only
        /// recognised key is "maxSimultaneousWorkflows" (compared case-insensitively).
        /// </summary>
        /// <param name="parameters">Configuration parameters.</param>
        /// <exception cref="ArgumentNullException"><paramref name="parameters"/> is null.</exception>
        /// <exception cref="ArgumentException">A key is null or is not a recognised parameter name.</exception>
        /// <exception cref="FormatException">The value cannot be parsed as an integer.</exception>
        /// <exception cref="ArgumentOutOfRangeException">The resulting limit is smaller than 1.</exception>
        public DefaultWorkflowSchedulerService(NameValueCollection parameters)
            : base()
        {
            if (parameters == null)
                throw new ArgumentNullException("parameters");

            maxSimultaneousWorkflows = DefaultThreadCount;

            foreach (string key in parameters.Keys)
            {
                if (key == null)
                    throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, "null"));

                string p = parameters[key];

                if (!key.Equals(MAX_SIMULTANEOUS_WORKFLOWS_KEY, StringComparison.OrdinalIgnoreCase))
                    throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, key));

                if (!int.TryParse(p, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.CurrentCulture, out maxSimultaneousWorkflows))
                    throw new FormatException(MAX_SIMULTANEOUS_WORKFLOWS_KEY);
            }

            if (maxSimultaneousWorkflows < 1)
                throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty);

            init();
        }

        /// <summary>
        /// Shared constructor tail: wires the timer delegate and the head-changed
        /// notification of the timer queue, and allocates the waiting queue.
        /// </summary>
        private void init()
        {
            timerCallback = new TimerCallback(OnTimerCallback);
            timerQueue.FirstElementChanged += OnFirstElementChanged;
            waitingQueue = new Queue<WorkItem>();
        }

        /// <summary>
        /// Gets the maximum number of workers this service runs concurrently.
        /// </summary>
        public int MaxSimultaneousWorkflows
        {
            get { return maxSimultaneousWorkflows; }
        }

        /// <summary>
        /// Queues a callback for the given workflow instance to run as soon as a
        /// worker slot is available.
        /// </summary>
        /// <param name="callback">Delegate to invoke; receives the instance id as its state.</param>
        /// <param name="workflowInstanceId">Non-empty workflow instance id.</param>
        /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="workflowInstanceId"/> is <see cref="Guid.Empty"/>.</exception>
        protected internal override void Schedule(WaitCallback callback, Guid workflowInstanceId)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0}", workflowInstanceId);

            if (callback == null)
                throw new ArgumentNullException("callback");
            if (workflowInstanceId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId"));

            // Add the work item to our internal queue and signal the ProcessQueue thread
            EnqueueWorkItem(new WorkItem(callback, workflowInstanceId));
        }

        /// <summary>
        /// Registers a callback to be fired at (or after) <paramref name="whenUtc"/>.
        /// </summary>
        /// <param name="callback">Delegate to invoke; receives the instance id as its state.</param>
        /// <param name="workflowInstanceId">Non-empty workflow instance id.</param>
        /// <param name="whenUtc">Due time; compared against <see cref="DateTime.UtcNow"/>.</param>
        /// <param name="timerId">Non-empty key under which the timer is stored and can be cancelled.</param>
        /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
        /// <exception cref="ArgumentException">Either id is <see cref="Guid.Empty"/>.</exception>
        protected internal override void Schedule(WaitCallback callback, Guid workflowInstanceId, DateTime whenUtc, Guid timerId)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0} on timer ID {1} in {2}", workflowInstanceId, timerId, (whenUtc - DateTime.UtcNow));

            if (callback == null)
                throw new ArgumentNullException("callback");
            if (timerId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId"));
            if (workflowInstanceId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId"));

            CallbackInfo ci = new CallbackInfo(this, callback, workflowInstanceId, whenUtc);

            lock (timerQueue)
            {
                timerQueue.Enqueue(timerId, ci, whenUtc);
            }
        }

        /// <summary>
        /// Removes the timer registered under <paramref name="timerId"/> from the timer queue.
        /// </summary>
        /// <param name="timerId">Non-empty timer key.</param>
        /// <exception cref="ArgumentException"><paramref name="timerId"/> is <see cref="Guid.Empty"/>.</exception>
        protected internal override void Cancel(Guid timerId)
        {
            WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Cancelling work with timer ID {0}", timerId);

            if (timerId == Guid.Empty)
                throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId"), "timerId");

            lock (timerQueue)
            {
                timerQueue.Remove(timerId);
            }
        }

        /// <summary>
        /// Arms a timer for the earliest pending timer entry, marks the service as
        /// running, starts workers for work already waiting, and lazily creates the
        /// queue-length performance counters.
        /// </summary>
        protected override void OnStarted()
        {
            lock (timerQueue)
            {
                base.OnStarted();

                CallbackInfo ci = timerQueue.Peek();
                if (ci != null)
                    callbackTimer = CreateTimerCallback(ci);

                running = true;
            }

            lock (waitingQueue)
            {
                // Start at most one worker per waiting item, bounded by the limit.
                int nToStart = Math.Min(maxSimultaneousWorkflows, waitingQueue.Count);
                for (int i = 0; i < nToStart; i++)
                {
                    if (ThreadPool.QueueUserWorkItem(QueueWorkerProcess))
                    {
                        numCurrentWorkers++;
                    }
                }
            }

            if (queueCounters == null && this.Runtime.PerformanceCounterManager != null)
            {
                queueCounters = this.Runtime.PerformanceCounterManager.CreateCounters(ExecutionStringManager.PerformanceCounterWorkflowsWaitingName);
            }
        }

        /// <summary>
        /// Disposes the pending timer, clears the running flag and blocks until every
        /// worker has observed the flag and exited.
        /// </summary>
        protected internal override void Stop()
        {
            lock (timerQueue)
            {
                base.Stop();
                DisposeCallbackTimer();
                running = false;
            }

            lock (waitingQueue)
            {
                // Workers pulse waitingQueue after decrementing numCurrentWorkers.
                while (numCurrentWorkers > 0)
                {
                    Monitor.Wait(waitingQueue);
                }
            }
        }

        /// <summary>
        /// Re-arms the timer whenever the head of the timer queue changes.
        /// </summary>
        private void OnFirstElementChanged(object source, KeyedPriorityQueueHeadChangedEventArgs<CallbackInfo> e)
        {
            // timerQueue must have been locked by operation that caused this event to fire
            DisposeCallbackTimer();

            if (e.NewFirstElement != null && this.State == WorkflowRuntimeServiceState.Started)
            {
                callbackTimer = CreateTimerCallback(e.NewFirstElement);
            }
        }

        /// <summary>
        /// Timer tick: if the head entry of the timer queue is due it is dequeued and
        /// its callback invoked outside the lock; otherwise the timer is re-armed.
        /// </summary>
        private void OnTimerCallback(object ignored)
        {
            // Make sure activity ID comes out of Threadpool are initialized to null.
            Trace.CorrelationManager.ActivityId = Guid.Empty;

            CallbackInfo ci = null;
            bool fire = false;

            try
            {
                lock (timerQueue)
                {
                    if (State == WorkflowRuntimeServiceState.Started)
                    {
                        ci = timerQueue.Peek();
                        if (ci != null)
                        {
                            if (ci.IsExpired)
                            {
                                WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Timeout occured for timer for instance {0}", ci.State);
                                timerQueue.Dequeue();
                                fire = true;
                            }
                            else
                            {
                                // Release the previous timer before replacing it so it is
                                // neither leaked nor left to fire a redundant extra tick.
                                DisposeCallbackTimer();
                                callbackTimer = CreateTimerCallback(ci);
                            }
                        }
                    }
                }

                if (fire && ci != null)
                    ci.Callback(ci.State);
            }
            // Ignore cases where the workflow has been stolen out from under us
            catch (WorkflowOwnershipException)
            {
            }
            catch (ThreadAbortException e)
            {
                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message);
                RaiseServicesExceptionNotHandledEvent(e, GetInstanceId(ci));
                throw;
            }
            catch (Exception e)
            {
                WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message);
                RaiseServicesExceptionNotHandledEvent(e, GetInstanceId(ci));
            }
        }

        /// <summary>
        /// Returns the instance id carried by <paramref name="info"/>, or
        /// <see cref="Guid.Empty"/> when the failure happened before a queue entry
        /// was obtained. This keeps the exception handlers from throwing a
        /// NullReferenceException that would hide the original error.
        /// </summary>
        // NOTE(review): assumes RaiseServicesExceptionNotHandledEvent accepts Guid.Empty - confirm against the base class.
        private static Guid GetInstanceId(CallbackInfo info)
        {
            return info == null ? Guid.Empty : (Guid)info.State;
        }

        /// <summary>
        /// Disposes and clears the current timer, if any. Callers hold the timerQueue lock.
        /// </summary>
        private void DisposeCallbackTimer()
        {
            if (callbackTimer != null)
            {
                callbackTimer.Dispose();
                callbackTimer = null;
            }
        }

        /// <summary>
        /// Creates a one-shot timer due when <paramref name="info"/> expires, capped at
        /// five minutes from now.
        /// </summary>
        private Timer CreateTimerCallback(CallbackInfo info)
        {
            DateTime now = DateTime.UtcNow;
            TimeSpan span = (info.When > now) ? info.When - now : TimeSpan.Zero;

            if (span > fiveMinutes) // never let more than five minutes go by without checking
                span = fiveMinutes;

            return new Timer(timerCallback, info.State, span, infinite);
        }

        /// <summary>
        /// Adds a work item to the waiting queue and, when the service is running and
        /// below the worker limit, starts another thread pool worker.
        /// </summary>
        private void EnqueueWorkItem(WorkItem workItem)
        {
            lock (waitingQueue)
            {
                waitingQueue.Enqueue(workItem);

                if (running && numCurrentWorkers < maxSimultaneousWorkflows)
                {
                    if (ThreadPool.QueueUserWorkItem(this.QueueWorkerProcess))
                    {
                        numCurrentWorkers++;
                    }
                }
            }

            UpdateQueueCounters();
        }

        /// <summary>
        /// Worker loop: drains the waiting queue one item at a time and exits (after
        /// decrementing the worker count and pulsing the queue monitor) once the
        /// queue is empty or the service is no longer running.
        /// </summary>
        private void QueueWorkerProcess(object state /*unused*/)
        {
            // Make sure activity ID comes out of Threadpool are initialized to null.
            Trace.CorrelationManager.ActivityId = Guid.Empty;

            while (true)
            {
                WorkItem workItem;

                lock (waitingQueue)
                {
                    if (waitingQueue.Count == 0 || !running)
                    {
                        numCurrentWorkers--;
                        Monitor.Pulse(waitingQueue); // wakes Stop(), which waits for workers to finish
                        return;
                    }

                    workItem = waitingQueue.Dequeue();
                }

                UpdateQueueCounters();
                workItem.Invoke(this);
            }
        }

        /// <summary>
        /// Publishes the current waiting queue length to every performance counter, if
        /// counters were created. As in the original code, the count is read without
        /// taking the waitingQueue lock.
        /// </summary>
        private void UpdateQueueCounters()
        {
            if (queueCounters != null)
            {
                foreach (PerformanceCounter p in queueCounters)
                {
                    p.RawValue = waitingQueue.Count;
                }
            }
        }

        /// <summary>
        /// A callback plus the state object passed to it when a worker runs it.
        /// </summary>
        internal class WorkItem
        {
            private WaitCallback callback;
            private object state;

            public WorkItem(WaitCallback callback, object state)
            {
                this.callback = callback;
                this.state = state;
            }

            public WaitCallback Callback
            {
                get { return callback; }
            }

            /// <summary>
            /// Runs the callback. Exceptions classified as irrecoverable are rethrown;
            /// any other exception is reported through <paramref name="service"/>.
            /// </summary>
            public void Invoke(WorkflowSchedulerService service)
            {
                try
                {
                    WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Running workflow {0}", state);
                    Callback(state);
                }
                catch (Exception e)
                {
                    if (WorkflowExecutor.IsIrrecoverableException(e))
                    {
                        throw;
                    }
                    else
                    {
                        service.RaiseExceptionNotHandledEvent(e, (Guid)state);
                    }
                }
            }
        }

        /// <summary>
        /// Entry stored in the timer queue: the callback, its state and its due time.
        /// </summary>
        internal class CallbackInfo
        {
            WaitCallback callback;
            object state;
            DateTime when;
            WorkflowSchedulerService service;

            public CallbackInfo(WorkflowSchedulerService service, WaitCallback callback, object state, DateTime when)
            {
                this.service = service;
                this.callback = callback;
                this.state = state;
                this.when = when;
            }

            public DateTime When
            {
                get { return when; }
            }

            /// <summary>True once the current UTC time has reached the due time.</summary>
            public bool IsExpired
            {
                get { return DateTime.UtcNow >= when; }
            }

            public object State
            {
                get { return state; }
            }

            public WaitCallback Callback
            {
                get { return callback; }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SBCSCodePageEncoding.cs
- BatchParser.cs
- ComponentManagerBroker.cs
- _RequestCacheProtocol.cs
- MethodBody.cs
- InputLanguageSource.cs
- SessionEndedEventArgs.cs
- UndirectedGraph.cs
- Mutex.cs
- StorageAssociationSetMapping.cs
- BasicCellRelation.cs
- QueryRewriter.cs
- PixelShader.cs
- TCPClient.cs
- LinqDataSourceEditData.cs
- StrokeCollectionDefaultValueFactory.cs
- QueryContinueDragEventArgs.cs
- CDSsyncETWBCLProvider.cs
- CompositeActivityTypeDescriptor.cs
- RowVisual.cs
- PromptBuilder.cs
- SHA384Managed.cs
- TypedServiceChannelBuilder.cs
- ObjectStateManager.cs
- XmlSchemaSearchPattern.cs
- DataGridAddNewRow.cs
- TextEffectCollection.cs
- GroupItem.cs
- RegularExpressionValidator.cs
- ConfigurationSectionCollection.cs
- UpdateCommandGenerator.cs
- BuiltInExpr.cs
- SHA384.cs
- HelpHtmlBuilder.cs
- Timer.cs
- _IPv4Address.cs
- XmlNamespaceDeclarationsAttribute.cs
- DataBindingHandlerAttribute.cs
- HttpResponseHeader.cs
- Point3DConverter.cs
- NoResizeHandleGlyph.cs
- DBParameter.cs
- RuntimeWrappedException.cs
- JapaneseLunisolarCalendar.cs
- SequenceNumber.cs
- AuthenticatingEventArgs.cs
- TextAdaptor.cs
- BindingBase.cs
- ValidationSummary.cs
- CFStream.cs
- MultiBinding.cs
- TextEditorLists.cs
- XmlAttributeOverrides.cs
- DateTimeOffset.cs
- __Error.cs
- WebPartCloseVerb.cs
- Identity.cs
- OracleParameterBinding.cs
- DataGridViewColumnCollection.cs
- StateMachineSubscriptionManager.cs
- SystemGatewayIPAddressInformation.cs
- NonceToken.cs
- SqlGenerator.cs
- DateTimeSerializationSection.cs
- Metadata.cs
- ToolStripCustomTypeDescriptor.cs
- DecoderBestFitFallback.cs
- TagPrefixAttribute.cs
- AutomationPropertyInfo.cs
- OciHandle.cs
- altserialization.cs
- ServiceContractAttribute.cs
- ConfigXmlReader.cs
- FrameSecurityDescriptor.cs
- IpcClientChannel.cs
- WinFormsComponentEditor.cs
- ViewgenGatekeeper.cs
- LabelInfo.cs
- CultureSpecificStringDictionary.cs
- QilVisitor.cs
- TextBox.cs
- VisemeEventArgs.cs
- XPathDocumentBuilder.cs
- WindowsBrush.cs
- DataRow.cs
- HttpWebRequestElement.cs
- Int64Storage.cs
- ValueConversionAttribute.cs
- GlobalizationSection.cs
- SyntaxCheck.cs
- BooleanSwitch.cs
- NetTcpBindingCollectionElement.cs
- Stylesheet.cs
- SHA384.cs
- CalendarButton.cs
- AssemblyAttributesGoHere.cs
- Stack.cs
- Claim.cs
- CombinedGeometry.cs
- PropertyTabChangedEvent.cs