Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / WF / RunTime / Scheduler.cs / 1305376 / Scheduler.cs
using System;
using System.Globalization;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Transactions;
using System.Workflow.ComponentModel;

namespace System.Workflow.Runtime
{
    #region Scheduler

    // Only one instance of this type is used for a workflow instance.
    //
    /// <summary>
    /// Holds the high-priority and normal-priority queues of <see cref="SchedulableItem"/>
    /// entries for one workflow instance and runs the queued entries one at a time.
    /// </summary>
    class Scheduler
    {
        #region data

        // state to be persisted for the scheduler
        internal static DependencyProperty HighPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("HighPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));
        internal static DependencyProperty NormalPriorityEntriesQueueProperty = DependencyProperty.RegisterAttached("NormalPriorityEntriesQueue", typeof(Queue<SchedulableItem>), typeof(Scheduler));

        // Both queues are also stored on the root activity through the attached
        // properties above (see the constructor).
        Queue<SchedulableItem> highPriorityEntriesQueue;
        Queue<SchedulableItem> normalPriorityEntriesQueue;

        // non-persisted state for the scheduler
        WorkflowExecutor rootWorkflowExecutor;
        bool empty;                       // true when both queues were found empty
        bool canRun;                      // gates dequeuing of normal-priority entries
        bool threadRequested;             // true while a ScheduleForWork request is outstanding
        bool abortOrTerminateRequested;   // when set, GetItemToRun hands out no more entries
        Queue<SchedulableItem> transactedEntries;   // entries scheduled with transacted == true since the last PostPersist/Rollback
        readonly object syncObject = new object();  // guards the queues and flags in GetItemToRun/ScheduleItem

        #endregion data

        #region ctors

        // loading with some state
        /// <summary>
        /// Creates the scheduler for <paramref name="rootExec"/>, picking up any queues
        /// already attached to its root activity and attaching fresh ones otherwise.
        /// </summary>
        /// <param name="rootExec">Executor whose root activity carries the entry queues.</param>
        /// <param name="canRun">Initial value of <see cref="CanRun"/>.</param>
        public Scheduler(WorkflowExecutor rootExec, bool canRun)
        {
            this.rootWorkflowExecutor = rootExec;
            this.threadRequested = false;

            // canRun is true if normal creation
            // false if loading from a persisted state. Will be set to true later at ResumeOnIdle
            this.canRun = canRun;

            this.highPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.HighPriorityEntriesQueueProperty);
            this.normalPriorityEntriesQueue = (Queue<SchedulableItem>)rootExec.RootActivity.GetValue(Scheduler.NormalPriorityEntriesQueueProperty);

            if (this.highPriorityEntriesQueue == null)
            {
                this.highPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.HighPriorityEntriesQueueProperty, this.highPriorityEntriesQueue);
            }

            if (this.normalPriorityEntriesQueue == null)
            {
                this.normalPriorityEntriesQueue = new Queue<SchedulableItem>();
                rootExec.RootActivity.SetValue(Scheduler.NormalPriorityEntriesQueueProperty, this.normalPriorityEntriesQueue);
            }

            this.empty = ((this.normalPriorityEntriesQueue.Count == 0) && (this.highPriorityEntriesQueue.Count == 0));
        }

        #endregion ctors

        #region Misc properties

        /// <summary>Returns "Scheduler('...')" containing the qualified name of the workflow definition.</summary>
        public override string ToString()
        {
            return "Scheduler('" + ((Activity)this.RootWorkflowExecutor.WorkflowDefinition).QualifiedName + "')";
        }

        /// <summary>The executor passed to the constructor.</summary>
        protected WorkflowExecutor RootWorkflowExecutor
        {
            get { return this.rootWorkflowExecutor; }
        }

        /// <summary>
        /// True when the scheduler last observed both queues empty; cleared by
        /// <see cref="ScheduleItem"/>.
        /// </summary>
        public bool IsStalledNow
        {
            get { return empty; }
        }

        /// <summary>Whether normal-priority entries may be dequeued and threads requested.</summary>
        public bool CanRun
        {
            get { return canRun; }
            set { canRun = value; }
        }

        /// <summary>When true, <see cref="GetItemToRun"/> returns null even if entries are queued.</summary>
        internal bool AbortOrTerminateRequested
        {
            get { return abortOrTerminateRequested; }
            set { abortOrTerminateRequested = value; }
        }

        #endregion Misc properties

        #region Run work

        /// <summary>
        /// Processes queued events and runs scheduled entries until
        /// <see cref="GetItemToRun"/> has nothing to hand out.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The entry's context id does not resolve to a context activity.
        /// </exception>
        /// <remarks>
        /// Exceptions thrown while running an entry are rethrown when
        /// WorkflowExecutor.IsIrrecoverableException reports them as irrecoverable;
        /// all others are handed to WorkflowExecutor.ExceptionOccured and the loop continues.
        /// </remarks>
        public void Run()
        {
            do
            {
                this.RootWorkflowExecutor.ProcessQueuedEvents();

                // Get item to run
                SchedulableItem item = GetItemToRun();
                bool runningItem = false;

                // no ready work to run... go away
                if (item == null)
                    break;

                Activity itemActivity = null;
                Exception exp = null;

                TransactionalProperties transactionalProperties = null;
                int contextId = item.ContextId;

                // This function gets the root or enclosing while-loop activity
                Activity contextActivity = this.RootWorkflowExecutor.GetContextActivityForId(contextId);
                if (contextActivity == null)
                    throw new InvalidOperationException(ExecutionStringManager.InvalidExecutionContext);

                // This is the activity corresponding to the item's ActivityId
                itemActivity = contextActivity.GetActivityByName(item.ActivityId);
                using (new ServiceEnvironment(itemActivity))
                {
                    exp = null;
                    bool ignoreFinallyBlock = false;

                    try
                    {
                        // item preamble
                        // set up the item transactional context if necessary
                        //
                        Debug.Assert(itemActivity != null, "null itemActivity");
                        // Thrown inside the try so that it is routed through the same
                        // exception handling as a failure of the item itself.
                        if (itemActivity == null)
                            throw new InvalidOperationException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.InvalidActivityName, item.ActivityId));

                        Activity atomicActivity = null;
                        if (this.RootWorkflowExecutor.IsActivityInAtomicContext(itemActivity, out atomicActivity))
                        {
                            transactionalProperties = (TransactionalProperties)atomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty);

                            // If we've aborted for any reason stop now!
                            // If we attempt to enter a new TransactionScope the com+ context will get corrupted
                            // See windows se bug 137267
                            if (!WorkflowExecutor.CheckAndProcessTransactionAborted(transactionalProperties))
                            {
                                if (transactionalProperties.TransactionScope == null)
                                {
                                    // Use TimeSpan.Zero so scope will not create timeout independent of the transaction
                                    // Use EnterpriseServicesInteropOption.Full to flow transaction to COM+
                                    transactionalProperties.TransactionScope =
                                        new TransactionScope(transactionalProperties.Transaction, TimeSpan.Zero, EnterpriseServicesInteropOption.Full);

                                    WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 0,
                                        "Workflow Runtime: Scheduler: instanceId: " + this.RootWorkflowExecutor.InstanceIdString +
                                        "Entered into TransactionScope, Current atomic acitivity " + atomicActivity.Name);
                                }
                            }
                        }

                        // Run the item
                        //
                        runningItem = true;
                        WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Running scheduled entry: {1}", this.RootWorkflowExecutor.InstanceIdString, item.ToString());

                        // running any entry implicitly changes some state of the workflow instance
                        this.RootWorkflowExecutor.stateChangedSincePersistence = true;

                        item.Run(this.RootWorkflowExecutor);
                    }
                    catch (Exception e)
                    {
                        if (WorkflowExecutor.IsIrrecoverableException(e))
                        {
                            // Let it propagate untouched; the finally block must not
                            // trace or report anything for this exception.
                            ignoreFinallyBlock = true;
                            throw;
                        }
                        else
                        {
                            if (transactionalProperties != null)
                                transactionalProperties.TransactionState = TransactionProcessState.AbortProcessed;
                            exp = e;
                        }
                    }
                    finally
                    {
                        if (!ignoreFinallyBlock)
                        {
                            if (runningItem)
                                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Done with running scheduled entry: {1}", this.RootWorkflowExecutor.InstanceIdString, item.ToString());

                            // Process exception
                            //
                            if (exp != null)
                            {
                                // Report against the item's activity when it was resolved,
                                // otherwise against the context activity.
                                this.RootWorkflowExecutor.ExceptionOccured(exp, itemActivity == null ? contextActivity : itemActivity, null);
                                exp = null;
                            }
                        }
                    }
                }
            } while (true);
        }

        /// <summary>
        /// Picks the next entry to run, under the scheduler lock, and updates the
        /// <c>empty</c> and <c>threadRequested</c> flags.
        /// </summary>
        /// <returns>
        /// The dequeued entry, or null when nothing is queued, an abort/terminate was
        /// requested, or only normal-priority work is queued and it may not run right now.
        /// </returns>
        private SchedulableItem GetItemToRun()
        {
            SchedulableItem ret = null;

            lock (this.syncObject)
            {
                bool workToDo = false;
                if ((this.highPriorityEntriesQueue.Count > 0) || (this.normalPriorityEntriesQueue.Count > 0))
                {
                    workToDo = true;

                    // If an abort or termination of the workflow has been requested,
                    // then the workflow should try to terminate ASAP. Even transaction scopes
                    // in progress shouldn't be executed to completion. (Ref: 16534)
                    if (this.AbortOrTerminateRequested)
                    {
                        ret = null;
                    }
                    // got work to do in the scheduler
                    else if ((this.highPriorityEntriesQueue.Count > 0))
                    {
                        ret = this.highPriorityEntriesQueue.Dequeue();
                    }
                    else if (this.CanRun)
                    {
                        // the scheduler can run right now
                        //
                        // pick an entry to run
                        //
                        if (((IWorkflowCoreRuntime)this.RootWorkflowExecutor).CurrentAtomicActivity == null &&
                            (this.normalPriorityEntriesQueue.Count > 0))
                            ret = this.normalPriorityEntriesQueue.Dequeue();
                    }
                    else
                    {
                        // scheduler can't run right now.. even though there is ready work
                        // do nothing in the scheduler
                        ret = null;
                    }
                }

                if (!workToDo)
                {
                    // no ready work to do in the scheduler...
                    // we are gonna return the thread back
                    this.empty = true;
                }

                // set it to true only iff there is something to run
                this.threadRequested = (ret != null);
            }

            return ret;
        }

        // This method should be called only after we have determined that
        // this instance can start running now
        /// <summary>
        /// Sets <see cref="CanRun"/> to true and, when work is scheduled, asks the
        /// executor to schedule the instance for work.
        /// </summary>
        public void Resume()
        {
            canRun = true;
            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }

        // This method should be called only after we have determined that
        // this instance can start running now
        /// <summary>
        /// Like <see cref="Resume"/>, but does nothing unless <see cref="CanRun"/> is
        /// already true.
        /// </summary>
        public void ResumeIfRunnable()
        {
            if (!canRun)
                return;

            if (!empty)
            {
                // There is scheduled work
                // ask the threadprovider for a thread
                this.RootWorkflowExecutor.ScheduleForWork();
            }
        }

        #endregion Run work

        #region Schedule work

        /// <summary>
        /// Enqueues <paramref name="s"/> and, if no thread request is outstanding and
        /// the scheduler can run, asks the executor to schedule the instance for work.
        /// </summary>
        /// <param name="s">Entry to schedule.</param>
        /// <param name="isInAtomicTransaction">True to place the entry on the high-priority queue.</param>
        /// <param name="transacted">True to also record the entry so that <see cref="Rollback"/> can remove it.</param>
        public void ScheduleItem(SchedulableItem s, bool isInAtomicTransaction, bool transacted)
        {
            lock (this.syncObject)
            {
                WorkflowTrace.Runtime.TraceEvent(TraceEventType.Information, 1, "Workflow Runtime: Scheduler: InstanceId: {0} : Scheduling entry: {1}", this.RootWorkflowExecutor.InstanceIdString, s.ToString());

                // SchedulableItems in AtomicTransaction has higher priority
                Queue<SchedulableItem> q = isInAtomicTransaction ? this.highPriorityEntriesQueue : this.normalPriorityEntriesQueue;
                q.Enqueue(s);

                if (transacted)
                {
                    if (transactedEntries == null)
                        transactedEntries = new Queue<SchedulableItem>();
                    transactedEntries.Enqueue(s);
                }

                if (!this.threadRequested)
                {
                    if (this.CanRun)
                    {
                        this.RootWorkflowExecutor.ScheduleForWork();
                        this.threadRequested = true;
                    }
                }

                this.empty = false;
            }
        }

        #endregion Schedule work

        #region psuedo-transacted support

        /// <summary>Forgets the recorded transacted entries; they stay in their queues.</summary>
        public void PostPersist()
        {
            transactedEntries = null;
        }

        /// <summary>
        /// Removes every recorded transacted entry from the normal-priority queue,
        /// keeping the remaining entries in their original order, then forgets the
        /// recorded entries. The high-priority queue is not touched.
        /// </summary>
        public void Rollback()
        {
            if (transactedEntries != null && transactedEntries.Count > 0)
            {
                // make a list of non-transacted entries
                // @undone: bmalhi: transacted entries only on priority-0
                Queue<SchedulableItem> newScheduled = new Queue<SchedulableItem>();
                foreach (SchedulableItem entry in this.normalPriorityEntriesQueue)
                {
                    if (!transactedEntries.Contains(entry))
                        newScheduled.Enqueue(entry);
                }

                // clear the scheduled items
                this.normalPriorityEntriesQueue.Clear();

                // schedule the non-transacted items back
                // (the same queue instance is refilled because the root activity
                // holds a reference to it via NormalPriorityEntriesQueueProperty)
                foreach (SchedulableItem entry in newScheduled)
                    this.normalPriorityEntriesQueue.Enqueue(entry);

                transactedEntries = null;
            }
        }

        #endregion psuedo-transacted support
    }

    #endregion Scheduler
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- SqlExpressionNullability.cs
- BuildDependencySet.cs
- TabControlCancelEvent.cs
- securitycriticaldataformultiplegetandset.cs
- UnlockInstanceAsyncResult.cs
- DurableEnlistmentState.cs
- SQLDecimalStorage.cs
- TokenBasedSetEnumerator.cs
- DeadCharTextComposition.cs
- ItemsPanelTemplate.cs
- Trace.cs
- SocketPermission.cs
- RsaKeyIdentifierClause.cs
- StatusBarPanelClickEvent.cs
- RegexInterpreter.cs
- HtmlGenericControl.cs
- SystemDiagnosticsSection.cs
- DynamicMethod.cs
- EncryptedPackageFilter.cs
- CredentialCache.cs
- UserInitiatedRoutedEventPermission.cs
- Propagator.JoinPropagator.cs
- DropDownList.cs
- SqlEnums.cs
- MachineKeySection.cs
- CloudCollection.cs
- KeyNotFoundException.cs
- DragDeltaEventArgs.cs
- ImageInfo.cs
- SymmetricKey.cs
- SchemaCompiler.cs
- PageBreakRecord.cs
- Size3DConverter.cs
- AutomationElement.cs
- ExpressionLexer.cs
- HostSecurityManager.cs
- OdbcPermission.cs
- RowCache.cs
- AnnotationHighlightLayer.cs
- PixelFormat.cs
- ProxyElement.cs
- DetailsViewPagerRow.cs
- IDispatchConstantAttribute.cs
- GACMembershipCondition.cs
- SafeEventLogReadHandle.cs
- BaseTemplateBuildProvider.cs
- UpdatePanelTrigger.cs
- ConnectionString.cs
- SimpleHandlerBuildProvider.cs
- PowerModeChangedEventArgs.cs
- EmptyWorkItem.cs
- SoapHeaderException.cs
- RankException.cs
- CommonXSendMessage.cs
- DeviceFilterDictionary.cs
- SerializationException.cs
- SiteMapProvider.cs
- SymmetricSecurityBindingElement.cs
- DecoderNLS.cs
- SiteMembershipCondition.cs
- TreeViewDataItemAutomationPeer.cs
- DataReaderContainer.cs
- StringStorage.cs
- Page.cs
- ListItemCollection.cs
- XmlSchemaSequence.cs
- HttpListenerContext.cs
- TileBrush.cs
- IndentTextWriter.cs
- LineServicesRun.cs
- EntityDescriptor.cs
- SmtpFailedRecipientException.cs
- AmbiguousMatchException.cs
- ListBase.cs
- XsdCachingReader.cs
- SizeConverter.cs
- BaseServiceProvider.cs
- SchemaExporter.cs
- ToolTipService.cs
- Asn1IntegerConverter.cs
- CatalogZone.cs
- PropertyMapper.cs
- DiscardableAttribute.cs
- QualificationDataAttribute.cs
- CssClassPropertyAttribute.cs
- SpotLight.cs
- SelectedGridItemChangedEvent.cs
- PriorityBinding.cs
- PropertyDescriptor.cs
- BrowserCapabilitiesCodeGenerator.cs
- BooleanConverter.cs
- CodeAttributeArgument.cs
- GCHandleCookieTable.cs
- OutputCacheProfileCollection.cs
- ErrorFormatterPage.cs
- DateTimeOffsetStorage.cs
- InputScopeManager.cs
- ByValueEqualityComparer.cs
- SchemaHelper.cs
- ParameterCollection.cs