Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Hosting / DefaultWorkflowSchedulerService.cs / 1305376 / DefaultWorkflowSchedulerService.cs
using System; using System.Collections; using System.Collections.ObjectModel; using System.Collections.Generic; using System.Threading; using System.Collections.Specialized; using System.Diagnostics; using System.Workflow.Runtime; using System.Globalization; namespace System.Workflow.Runtime.Hosting { public class DefaultWorkflowSchedulerService : WorkflowSchedulerService { // next two fields controlled by locking the timerQueue private KeyedPriorityQueuetimerQueue = new KeyedPriorityQueue (); private Timer callbackTimer; private TimerCallback timerCallback; private const string MAX_SIMULTANEOUS_WORKFLOWS_KEY = "maxSimultaneousWorkflows"; private const int DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS = 5; private static TimeSpan infinite = new TimeSpan(Timeout.Infinite); private readonly int maxSimultaneousWorkflows; // Maximum number of work items allowed in ThreadPool queue private static TimeSpan fiveMinutes = new TimeSpan(0, 5, 0); // next three fields controlled by locking the waitingQueue private int numCurrentWorkers; private Queue waitingQueue; // Queue for extra items waiting to be allowed into thread pool private volatile bool running = false; private IList queueCounters; // expose internal queue length private static int DefaultThreadCount { get { return Environment.ProcessorCount == 1 ? DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS : (int)(DEFAULT_MAX_SIMULTANEOUS_WORKFLOWS * Environment.ProcessorCount * .8); } } public DefaultWorkflowSchedulerService() : this(DefaultThreadCount) { } public DefaultWorkflowSchedulerService(int maxSimultaneousWorkflows) : base() { if (maxSimultaneousWorkflows < 1) throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty); this.maxSimultaneousWorkflows = maxSimultaneousWorkflows; init(); } public DefaultWorkflowSchedulerService(NameValueCollection parameters) : base() { if (parameters == null) throw new ArgumentNullException("parameters"); maxSimultaneousWorkflows = DefaultThreadCount; foreach (string key in parameters.Keys) { if (key == null) throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, "null")); string p = parameters[key]; if (!key.Equals(MAX_SIMULTANEOUS_WORKFLOWS_KEY, StringComparison.OrdinalIgnoreCase)) throw new ArgumentException(String.Format(Thread.CurrentThread.CurrentCulture, ExecutionStringManager.UnknownConfigurationParameter, key)); if (!int.TryParse(p, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.CurrentCulture, out maxSimultaneousWorkflows)) throw new FormatException(MAX_SIMULTANEOUS_WORKFLOWS_KEY); } if (maxSimultaneousWorkflows < 1) throw new ArgumentOutOfRangeException(MAX_SIMULTANEOUS_WORKFLOWS_KEY, maxSimultaneousWorkflows, String.Empty); init(); } private void init() { timerCallback = new TimerCallback(OnTimerCallback); timerQueue.FirstElementChanged += OnFirstElementChanged; waitingQueue = new Queue (); } public int MaxSimultaneousWorkflows { get { return maxSimultaneousWorkflows; } } internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0}", workflowInstanceId); if (callback == null) throw new ArgumentNullException("callback"); if (workflowInstanceId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId")); // Add the work item to our internal queue and signal the ProcessQueue thread EnqueueWorkItem( new WorkItem(callback, workflowInstanceId) ); } internal protected override void Schedule(WaitCallback callback, Guid workflowInstanceId, DateTime whenUtc, Guid timerId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Scheduling work for instance {0} on timer ID {1} in {2}", workflowInstanceId, timerId, (whenUtc - DateTime.UtcNow)); if (callback == null) throw new ArgumentNullException("callback"); if (timerId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId")); if (workflowInstanceId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "workflowInstanceId")); CallbackInfo ci = new CallbackInfo(this, callback, workflowInstanceId, whenUtc); lock (timerQueue) { timerQueue.Enqueue(timerId, ci, whenUtc); } } internal protected override void Cancel(Guid timerId) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Cancelling work with timer ID {0}", timerId); if (timerId == Guid.Empty) throw new ArgumentException(String.Format(CultureInfo.CurrentUICulture, ExecutionStringManager.CantBeEmptyGuid, "timerId"), "timerId"); lock (timerQueue) { timerQueue.Remove(timerId); } } override protected void OnStarted() { lock (timerQueue) { base.OnStarted(); CallbackInfo ci = timerQueue.Peek(); if (ci != null) callbackTimer = CreateTimerCallback(ci); running = true; } lock (waitingQueue) { int nToStart = Math.Min(maxSimultaneousWorkflows, waitingQueue.Count); for (int i = 0; i < nToStart; i++) { if (ThreadPool.QueueUserWorkItem(QueueWorkerProcess)) { numCurrentWorkers++; } } } if (queueCounters == null && this.Runtime.PerformanceCounterManager != null) { queueCounters = this.Runtime.PerformanceCounterManager.CreateCounters(ExecutionStringManager.PerformanceCounterWorkflowsWaitingName); } } protected internal override void Stop() { lock (timerQueue) { base.Stop(); if (callbackTimer != null) { callbackTimer.Dispose(); callbackTimer = null; } running = false; } lock (waitingQueue) { while (numCurrentWorkers > 0) { Monitor.Wait(waitingQueue); } } } private void OnFirstElementChanged(object source, KeyedPriorityQueueHeadChangedEventArgs e) { // timerQueue must have been locked by operation that caused this event to fire if (callbackTimer != null) { callbackTimer.Dispose(); callbackTimer = null; } if (e.NewFirstElement != null && this.State == WorkflowRuntimeServiceState.Started) { callbackTimer = CreateTimerCallback(e.NewFirstElement); } } private void OnTimerCallback(object ignored) { //Make sure activity ID comes out of Threadpool are initialized to null. Trace.CorrelationManager.ActivityId = Guid.Empty; CallbackInfo ci = null; bool fire = false; try { lock (timerQueue) { if (State == WorkflowRuntimeServiceState.Started) { ci = timerQueue.Peek(); if (ci != null) { if (ci.IsExpired) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Timeout occured for timer for instance {0}", ci.State); timerQueue.Dequeue(); fire = true; } else { callbackTimer = CreateTimerCallback(ci); } } } } if(fire && ci != null) ci.Callback(ci.State); } // Ignore cases where the workflow has been stolen out from under us catch (WorkflowOwnershipException) { } catch (ThreadAbortException e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message); RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State); throw; } catch (Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Timeout for instance, {0} threw exception {1}", ci == null ? null : ci.State, e.Message); RaiseServicesExceptionNotHandledEvent(e, (Guid)ci.State); } } private Timer CreateTimerCallback(CallbackInfo info) { DateTime now = DateTime.UtcNow; TimeSpan span = (info.When > now) ? info.When - now : TimeSpan.Zero; if (span > fiveMinutes) // never let more than five minutes go by without checking span = fiveMinutes; return new Timer(timerCallback, info.State, span, infinite); } private void EnqueueWorkItem( WorkItem workItem ) { lock (waitingQueue) { waitingQueue.Enqueue(workItem); if (running && numCurrentWorkers < maxSimultaneousWorkflows) { if (ThreadPool.QueueUserWorkItem(this.QueueWorkerProcess)) { numCurrentWorkers++; } } } if (queueCounters != null) { foreach (PerformanceCounter p in queueCounters) { p.RawValue = waitingQueue.Count; } } } private void QueueWorkerProcess(object state /*unused*/) { //Make sure activity ID comes out of Threadpool are initialized to null. Trace.CorrelationManager.ActivityId = Guid.Empty; while (true) { WorkItem workItem; lock (waitingQueue) { if (waitingQueue.Count == 0 || !running) { numCurrentWorkers--; Monitor.Pulse(waitingQueue); return; } workItem = waitingQueue.Dequeue(); } if (queueCounters != null) { foreach (PerformanceCounter p in queueCounters) { p.RawValue = waitingQueue.Count; } } workItem.Invoke(this); } } internal class WorkItem { private WaitCallback callback; private object state; public WorkItem(WaitCallback callback, object state) { this.callback = callback; this.state = state; } public WaitCallback Callback { get { return callback; } } public void Invoke(WorkflowSchedulerService service) { try { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "Running workflow {0}", state); Callback(state); } catch (Exception e) { if (WorkflowExecutor.IsIrrecoverableException(e)) { throw; } else { service.RaiseExceptionNotHandledEvent(e, (Guid)state); } } } } internal class CallbackInfo { WaitCallback callback; object state; DateTime when; WorkflowSchedulerService service; public CallbackInfo(WorkflowSchedulerService service, WaitCallback callback, object state, DateTime when) { this.service = service; this.callback = callback; this.state = state; this.when = when; } public DateTime When { get { return when; } } public bool IsExpired { get { return DateTime.UtcNow >= when; } } public object State { get { return state; } } public WaitCallback Callback { get { return callback; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- FontNamesConverter.cs
- AgileSafeNativeMemoryHandle.cs
- DataGridViewCellValidatingEventArgs.cs
- BitmapEffectDrawingContextWalker.cs
- StyleTypedPropertyAttribute.cs
- ResourceReferenceKeyNotFoundException.cs
- Span.cs
- TableRowGroup.cs
- SmtpCommands.cs
- ExpandoClass.cs
- AssemblyNameProxy.cs
- PeerNeighborManager.cs
- DashStyles.cs
- AsnEncodedData.cs
- AssemblySettingAttributes.cs
- TextContainerChangedEventArgs.cs
- RepeaterItem.cs
- Compiler.cs
- EventItfInfo.cs
- _DisconnectOverlappedAsyncResult.cs
- DuplicateWaitObjectException.cs
- PersonalizationDictionary.cs
- EventRoute.cs
- PipelineComponent.cs
- XmlSerializerNamespaces.cs
- AssociationSetEnd.cs
- SessionState.cs
- EventlogProvider.cs
- BindingNavigator.cs
- XmlNamespaceManager.cs
- LineServicesRun.cs
- SkinBuilder.cs
- DbLambda.cs
- CodeStatement.cs
- ResourceManagerWrapper.cs
- NameTable.cs
- FtpWebRequest.cs
- ResourceReferenceExpressionConverter.cs
- AuthorizationRuleCollection.cs
- StructuralObject.cs
- Panel.cs
- AmbientProperties.cs
- ValidationError.cs
- XamlPoint3DCollectionSerializer.cs
- IMembershipProvider.cs
- ColumnResizeUndoUnit.cs
- WebPageTraceListener.cs
- DirtyTextRange.cs
- StatusBarAutomationPeer.cs
- WebPartCollection.cs
- DataTableExtensions.cs
- LightweightEntityWrapper.cs
- TextTreeFixupNode.cs
- DesignerVerbCollection.cs
- String.cs
- TextBox.cs
- ByteStreamMessageUtility.cs
- StoreAnnotationsMap.cs
- TimeManager.cs
- CodeTypeDeclarationCollection.cs
- NewArray.cs
- PlacementWorkspace.cs
- DiagnosticStrings.cs
- KnownBoxes.cs
- TypeRefElement.cs
- PrintPreviewControl.cs
- KnownTypes.cs
- Pair.cs
- basenumberconverter.cs
- LogSwitch.cs
- DefaultBindingPropertyAttribute.cs
- LinkButton.cs
- WebPermission.cs
- TextBreakpoint.cs
- XmlName.cs
- DataGridViewCellMouseEventArgs.cs
- WebPageTraceListener.cs
- LineProperties.cs
- DataMisalignedException.cs
- HostedElements.cs
- EntityContainer.cs
- regiisutil.cs
- DeferredElementTreeState.cs
- DataGridViewCellConverter.cs
- CounterCreationDataCollection.cs
- ApplicationServiceHelper.cs
- TraceInternal.cs
- WhitespaceRuleReader.cs
- XamlToRtfWriter.cs
- BaseInfoTable.cs
- DataGridViewCellFormattingEventArgs.cs
- InputBuffer.cs
- AudioLevelUpdatedEventArgs.cs
- HtmlForm.cs
- DataChangedEventManager.cs
- ResourcePool.cs
- XmlnsCompatibleWithAttribute.cs
- SqlParameterizer.cs
- OraclePermissionAttribute.cs
- DispatcherHookEventArgs.cs