Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Activities / System / ServiceModel / Activities / InternalSendMessage.cs / 1480445 / InternalSendMessage.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------

namespace System.ServiceModel.Activities
{
    using System;
    using System.Activities;
    using System.Activities.Statements;
    using System.Collections.Generic;
    using System.Collections.ObjectModel;
    using System.Diagnostics;
    using System.Diagnostics.CodeAnalysis;
    using System.Linq;
    using System.Runtime;
    using System.Runtime.Collections;
    using System.Runtime.Diagnostics;
    using System.Runtime.Serialization;
    using System.Security.Principal;
    using System.ServiceModel.Activities.Description;
    using System.ServiceModel.Activities.Dispatcher;
    using System.ServiceModel.Activities.Tracking;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Description;
    using System.ServiceModel.Diagnostics;
    using System.Transactions;
    using System.Xml.Linq;

    // InternalSendMessage encapsulates both the server and client send. For the server
    // send it provides the ability to persist after correlations have been initialized
    // but before the send has actually been completed by the channel stack. This is not
    // supported by client send.
class InternalSendMessage : NativeActivity
{
    // NOTE(review): the generic type arguments in this class were lost when the source was
    // extracted and have been restored from how each member is used elsewhere in the class.
    // The ObjectCacheItem<ChannelFactoryReference> argument below is the least certain - confirm.

    static string runtimeTransactionHandlePropertyName = typeof(RuntimeTransactionHandle).FullName;

    // Explicit correlation OM
    Collection<CorrelationInitializer> correlationInitializers;

    Collection<CorrelationQuery> replyCorrelationQueries;
    ICollection<CorrelationQuery> correlationQueries;

    MessageVersion messageVersion;
    ContractDescription cachedContract;
    ServiceEndpoint cachedServiceEndpoint;
    AddressHeaderCollection cachedEndpointHeaderCollection;
    FactoryCacheKey cachedFactoryCacheKey;
    bool isConfigSettingsSecure;
    bool configVerified;
    KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache> lastUsedFactoryCacheItem;

    // this will be scheduled if ShouldPersistBeforeSend is set to true
    Activity persist;

    WaitOnChannelCorrelation channelCorrelationCompletionWaiter;
    Variable<VolatileSendMessageInstance> sendMessageInstance;
    Variable<NoPersistHandle> noPersistHandle;
    OpenChannelFactory openChannelFactory;
    OpenChannelAndSendMessage openChannelAndSendMessage;
    FaultCallback onSendFailure;

    // Creates the implementation variables and child activities. All three child activities
    // share the same sendMessageInstance variable.
    public InternalSendMessage()
    {
        this.TokenImpersonationLevel = TokenImpersonationLevel.Identification;
        this.sendMessageInstance = new Variable<VolatileSendMessageInstance>();
        this.channelCorrelationCompletionWaiter = new WaitOnChannelCorrelation { Instance = this.sendMessageInstance };
        this.noPersistHandle = new Variable<NoPersistHandle>();
        this.openChannelFactory = new OpenChannelFactory { Instance = this.sendMessageInstance };
        this.openChannelAndSendMessage = new OpenChannelAndSendMessage
        {
            Instance = this.sendMessageInstance,
            InternalSendMessage = this,
        };
    }

    public TokenImpersonationLevel TokenImpersonationLevel { get; set; }

    // Endpoint defines the service to talk to, and endpointAddress is used to set
    // the Uri at the runtime, such as the duplex scenario.
    public Endpoint Endpoint { get; set; }

    public string EndpointConfigurationName { get; set; }

    // This is needed for the callback case
    public InArgument<Uri> EndpointAddress { get; set; }

    public InArgument<CorrelationHandle> CorrelatesWith { get; set; }

    public string OperationName { get; set; }

    public string Action { get; set; }

    // cache for internal implementation. This should be set by the Send
    // Should only be used in initiating send.
    // Should use this instead of OperationContract.IsOneWay
    public bool IsOneWay { get; set; }

    protected override bool CanInduceIdle
    {
        get { return true; }
    }

    // this flag is for Send/SendReply to indicate if we are client-side send or receive-side sendreply
    internal bool IsSendReply { get; set; }

    // Used for cleaning up the Message variable
    internal OutArgument<Message> MessageOut { get; set; }

    // should be used to decide whether persist before sending the message
    internal bool ShouldPersistBeforeSend { get; set; }

    // Lazily created so that CacheMetadata can tell "never touched" (null field) from "empty".
    public Collection<CorrelationInitializer> CorrelationInitializers
    {
        get
        {
            if (this.correlationInitializers == null)
            {
                this.correlationInitializers = new Collection<CorrelationInitializer>();
            }
            return this.correlationInitializers;
        }
    }

    // This will be passed in from the parent Send activity
    public CorrelationQuery CorrelationQuery { get; set; }

    // This needs to be set by the ReceiveReply, we assume that this is unique
    internal ICollection<CorrelationQuery> ReplyCorrelationQueries
    {
        get
        {
            if (this.replyCorrelationQueries == null)
            {
                this.replyCorrelationQueries = new Collection<CorrelationQuery>();
            }
            return this.replyCorrelationQueries;
        }
    }

    // on the serverside, the ContractName is set during ContractInference and is used for retrieving the
    // correct CorrelationQueryBehavior. ContractName on the Serverside can thus be different from what is
    // set on the OM
    public XName ServiceContractName { get; set; }

    public InArgument<Message> Message { get; set; }

    internal Send Parent { get; set; }

    // we cache the ServiceEndpoint for perf reasons so that we can retrieve endpointaddress, contract etc without
    // creating a new ServiceEndpoint each time
    // Note that we should not pass the cachedServiceEndpoint to the ChannelFactory, as we need to have a
    // distinct instance per-Factory.
// Returns the cached ServiceEndpoint, building it with CreateServiceEndpoint on first use.
ServiceEndpoint GetCachedServiceEndpoint()
{
    if (this.cachedServiceEndpoint == null)
    {
        this.cachedServiceEndpoint = CreateServiceEndpoint();
    }
    return this.cachedServiceEndpoint;
}

// Returns the AddressHeaderCollection built from Endpoint.Headers, creating it on first use.
// Callers must only use this when Endpoint is set (asserted below).
AddressHeaderCollection GetCachedEndpointHeaders()
{
    Fx.Assert(this.Endpoint != null, "Endpoint should not be null");
    if (this.cachedEndpointHeaderCollection == null)
    {
        this.cachedEndpointHeaderCollection = new AddressHeaderCollection(this.Endpoint.Headers);
    }
    return this.cachedEndpointHeaderCollection;
}

// Completes serviceEndpoint from configuration. When configurationName resolves to a
// standard endpoint, that endpoint replaces the one passed in (hence the ref parameter);
// otherwise channel behaviors are loaded onto the existing endpoint if it is not yet
// fully configured.
void InitializeEndpoint(ref ServiceEndpoint serviceEndpoint, string configurationName)
{
    ServiceEndpoint serviceEndpointFromConfig = null;
    if (configurationName != null)
    {
        // load the standard endpoint from the config
        serviceEndpointFromConfig = ConfigLoader.LookupEndpoint(configurationName, null, serviceEndpoint.Contract);
    }

    if (serviceEndpointFromConfig != null)
    {
        // standard endpoint case: it can completely override the endpoint
        serviceEndpoint = serviceEndpointFromConfig;
    }
    else
    {
        // normal endpoint case
        if (!serviceEndpoint.IsFullyConfigured)
        {
            new ConfigLoader().LoadChannelBehaviors(serviceEndpoint, configurationName);
        }
    }
}

// used to create ChannelFactoryReference instances. We don't cache the serviceEndpoint
// directly, as we need to have a distinct instance per-Factory.
// So it's cached behind the scenes as part of the ChannelFactoryReference
//
// Builds a new ServiceEndpoint from either the Endpoint object model or from configuration,
// then makes sure transaction flow (first call only) and correlation queries are reflected on it.
ServiceEndpoint CreateServiceEndpoint()
{
    ContractDescription contract = null;
    bool ensureTransactionFlow = false;
    if (this.cachedContract == null)
    {
        contract = this.GetContractDescription();
        ensureTransactionFlow = true;
    }
    else
    {
        contract = this.cachedContract;
    }

    ServiceEndpoint result = new ServiceEndpoint(contract);
    if (this.Endpoint != null)
    {
        result.Binding = this.Endpoint.Binding;
        if (this.Endpoint.AddressUri != null)
        {
            result.Address = new EndpointAddress(this.Endpoint.AddressUri, this.Endpoint.Identity, this.GetCachedEndpointHeaders());
        }
    }
    // Get ServiceEndpoint will be called only on the client side, hence if endpoint is null, we will try to load the config with
    // endpointConfigurationName.
    // endpointConfigurationName = null will be translated to endpointConfigurationName = String.Empty
    else
    {
        // we are loading the binding & the behaviors from config
        if (this.ServiceContractName != null)
        {
            result.Contract.ConfigurationName = this.ServiceContractName.LocalName;
        }
        InitializeEndpoint(ref result, this.EndpointConfigurationName ?? string.Empty);
    }

    // if the cachedContract is null, verify if TransactionFlow is accounted for in the contract
    // if cachedContract is not null, we can skip this since the contract should be fixed for the workflow definition
    if (ensureTransactionFlow)
    {
        EnsureTransactionFlowOnContract(ref result);
        this.cachedContract = result.Contract;
    }

    EnsureCorrelationQueryBehavior(result);
    return result;
}

// Makes sure the endpoint carries a CorrelationQueryBehavior whenever correlation queries
// apply, merges this activity's queries into it, and remembers the resulting query
// collection in this.correlationQueries.
void EnsureCorrelationQueryBehavior(ServiceEndpoint serviceEndpoint)
{
    CorrelationQueryBehavior correlationQueryBehavior = serviceEndpoint.Behaviors.Find<CorrelationQueryBehavior>();

    if (correlationQueryBehavior == null)
    {
        // Add CorrelationQueryBehavior if either Binding has queries or if either Send or ReceiveReplies
        // have correlation query associated with them
        if (CorrelationQueryBehavior.BindingHasDefaultQueries(serviceEndpoint.Binding)
            || this.CorrelationQuery != null
            || this.ReplyCorrelationQueries.Count > 0)
        {
            correlationQueryBehavior = new CorrelationQueryBehavior(new Collection<CorrelationQuery>());
            serviceEndpoint.Behaviors.Add(correlationQueryBehavior);
        }
    }

    if (correlationQueryBehavior != null)
    {
        // add CorrelationQuery from Send
        if (this.CorrelationQuery != null && !correlationQueryBehavior.CorrelationQueries.Contains(this.CorrelationQuery))
        {
            correlationQueryBehavior.CorrelationQueries.Add(this.CorrelationQuery);
        }

        // add ReplyCorrelationQueries from ReceiveReply (there could be multiple ReceiveReplies for a Send and hence the collection)
        foreach (CorrelationQuery query in this.ReplyCorrelationQueries)
        {
            // Filter out duplicate CorrelationQueries in the collection.
            // Currently, we only do reference comparison and Where message filter comparison.
            if (!correlationQueryBehavior.CorrelationQueries.Contains(query))
            {
                correlationQueryBehavior.CorrelationQueries.Add(query);
            }
            else
            {
                if (TD.DuplicateCorrelationQueryIsEnabled())
                {
                    TD.DuplicateCorrelationQuery(query.Where.ToString());
                }
            }
        }

        this.correlationQueries = correlationQueryBehavior.CorrelationQueries;
    }
}

// Fills in correlationBehavior.ScopeName from the CorrelationExtension, when the behavior
// has no scope name yet and the extension is present.
static void EnsureCorrelationBehaviorScopeName(ActivityContext context, CorrelationQueryBehavior correlationBehavior)
{
    Fx.Assert(correlationBehavior != null, "caller must verify");
    if (correlationBehavior.ScopeName == null)
    {
        CorrelationExtension extension = context.GetExtension<CorrelationExtension>();
        if (extension != null)
        {
            correlationBehavior.ScopeName = extension.ScopeName;
        }
    }
}

// For non-one-way sends, asks ContractInferenceHelper to account for transaction flow on the
// contract when the binding contains a TransactionFlowBindingElement with Transactions enabled.
void EnsureTransactionFlowOnContract(ref ServiceEndpoint serviceEndpoint)
{
    if (!this.IsOneWay)
    {
        BindingElementCollection elementCollection = serviceEndpoint.Binding.CreateBindingElements();
        TransactionFlowBindingElement bindingElement = elementCollection.Find<TransactionFlowBindingElement>();

        bool flowTransaction = ((bindingElement != null) && (bindingElement.Transactions));

        if (flowTransaction)
        {
            ContractInferenceHelper.EnsureTransactionFlowOnContract(ref serviceEndpoint,
                this.ServiceContractName, this.OperationName, this.Action, this.Parent.ProtectionLevel);
        }
    }
}

// Returns the MessageVersion of the cached endpoint's binding. The result stays null (and is
// recomputed on the next call) while the endpoint or its binding is unavailable.
internal MessageVersion GetMessageVersion()
{
    if (this.messageVersion == null)
    {
        ServiceEndpoint endpoint = this.GetCachedServiceEndpoint();
        this.messageVersion = (endpoint != null && endpoint.Binding != null) ? endpoint.Binding.MessageVersion : null;
    }
    return this.messageVersion;
}

// Builds the ContractDescription used for the client endpoint.
ContractDescription GetContractDescription()
{
    ContractDescription cd;

    // When channel cache is disabled or when operation uses message contract,
    // we use the fully inferred description; otherwise, we use a fixed description to increase channel cache hits
    if (!this.Parent.ChannelCacheEnabled || this.Parent.OperationUsesMessageContract)
    {
        // If this is one-way send untyped message, this.OperationDescription would still be null
        if (this.Parent.OperationDescription == null)
        {
            Fx.Assert(this.IsOneWay, "We can only reach here when we are one-way send Message!");
            this.Parent.OperationDescription = ContractInferenceHelper.CreateOneWayOperationDescription(this.Parent);
        }

        cd = ContractInferenceHelper.CreateContractFromOperation(this.ServiceContractName, this.Parent.OperationDescription);
    }
    else
    {
        // Create ContractDescription using Fixed MessageIn/MessageOut contract
        // If IOutputChannel, we create a Contract with name IOutputChannel and OperationDescription "Send"
        // else, Contract name is IRequestChannel with OperationDescription "Request"
        if (this.IsOneWay)
        {
            cd = ContractInferenceHelper.CreateOutputChannelContractDescription(this.ServiceContractName, this.Parent.ProtectionLevel);
        }
        else
        {
            cd = ContractInferenceHelper.CreateRequestChannelContractDescription(this.ServiceContractName, this.Parent.ProtectionLevel);
        }
    }

    if (this.ServiceContractName != null)
    {
        cd.ConfigurationName = this.ServiceContractName.LocalName;
    }

    return cd;
}

// Resolves the address to send to. A Uri supplied through the EndpointAddress argument
// overrides the Uri of the address taken from the cached endpoint or from the Endpoint object
// model, while identity and headers are kept. Returns null when no address source is available.
EndpointAddress CreateEndpointAddress(NativeActivityContext context)
{
    ServiceEndpoint endpoint = this.GetCachedServiceEndpoint();
    Uri endpointAddressUri = (this.EndpointAddress != null) ? this.EndpointAddress.Get(context) : null;

    if (endpoint != null && endpoint.Address != null)
    {
        return endpointAddressUri == null
            ? endpoint.Address
            : new EndpointAddressBuilder(endpoint.Address) { Uri = endpointAddressUri }.ToEndpointAddress();
    }
    else if (this.Endpoint != null)
    {
        return endpointAddressUri == null
            ? this.Endpoint.GetAddress()
            : new EndpointAddress(endpointAddressUri, this.Endpoint.Identity, this.GetCachedEndpointHeaders());
    }
    else
    {
        return null;
    }
}

// Combines the Uri of the callback address with the identity and headers taken from the
// Endpoint object model or, failing that, from the cached (config-based) endpoint. When
// neither supplies identity or headers the callback address is returned unchanged.
EndpointAddress CreateEndpointAddressFromCallback(EndpointAddress CallbackAddress)
{
    Fx.Assert(CallbackAddress != null, "CallbackAddress cannot be null");

    EndpointIdentity endpointIdentity = null;
    AddressHeaderCollection headers = null;
    EndpointAddress endpointAddress;

    if (this.Endpoint != null)
    {
        // we honor Identity and Headers on the Endpoint OM even when the AddressUri is null
        endpointIdentity = this.Endpoint.Identity;
        headers = this.GetCachedEndpointHeaders();
    }
    else
    {
        // this could be from config
        ServiceEndpoint endpoint = this.GetCachedServiceEndpoint();
        Fx.Assert(endpoint != null, " endpoint cannot be null");
        if (endpoint.Address != null)
        {
            endpointIdentity = endpoint.Address.Identity;
            headers = endpoint.Address.Headers;
        }
    }

    if (endpointIdentity != null || headers != null)
    {
        Uri callbackUri = CallbackAddress.Uri;
        endpointAddress = new EndpointAddress(callbackUri, endpointIdentity, headers);
    }
    else
    {
        endpointAddress = CallbackAddress;
    }

    return endpointAddress;
}

// Reports whether the channel factory may be shared through the cache. The answer is
// computed once and then reused.
bool IsEndpointSettingsSafeForCache()
{
    if (!this.configVerified)
    {
        // let's set isConfigSettingsSecure flag to false if we use endpointConfiguration,
        // this is used to decide if we cache factory or not
        this.isConfigSettingsSecure = this.Endpoint != null;
        this.configVerified = true;
    }
    return this.isConfigSettingsSecure;
}

// Declares the activity's arguments, implementation variables and implementation children.
protected override void CacheMetadata(NativeActivityMetadata metadata)
{
    if (ShouldPersistBeforeSend)
    {
        if (this.persist == null)
        {
            this.persist = new Persist();
        }
        metadata.AddImplementationChild(this.persist);
    }

    RuntimeArgument endpointAddressArgument = new RuntimeArgument(Constants.EndpointAddress, Constants.UriType, ArgumentDirection.In);
    metadata.Bind(this.EndpointAddress, endpointAddressArgument);
    metadata.AddArgument(endpointAddressArgument);

    RuntimeArgument correlatesWithArgument = new RuntimeArgument(Constants.CorrelatesWith, Constants.CorrelationHandleType, ArgumentDirection.In);
    metadata.Bind(this.CorrelatesWith, correlatesWithArgument);
    metadata.AddArgument(correlatesWithArgument);

    if (this.correlationInitializers != null)
    {
        // Only initializers that have a handle get an argument; count numbers those arguments.
        int count = 0;
        foreach (CorrelationInitializer correlation in this.correlationInitializers)
        {
            if (correlation.CorrelationHandle != null)
            {
                RuntimeArgument argument = new RuntimeArgument(Constants.Parameter + count,
                    correlation.CorrelationHandle.ArgumentType, correlation.CorrelationHandle.Direction, true);
                metadata.Bind(correlation.CorrelationHandle, argument);
                metadata.AddArgument(argument);
                count++;
            }
        }
    }

    RuntimeArgument requestMessageArgument = new RuntimeArgument(Constants.RequestMessage, Constants.MessageType, ArgumentDirection.In);
    metadata.Bind(this.Message, requestMessageArgument);
    metadata.AddArgument(requestMessageArgument);

    if (this.MessageOut != null)
    {
        RuntimeArgument requestMessageReference = new RuntimeArgument("MessageReference", Constants.MessageType, ArgumentDirection.Out);
        metadata.Bind(this.MessageOut, requestMessageReference);
        metadata.AddArgument(requestMessageReference);
    }

    metadata.AddImplementationVariable(this.sendMessageInstance);
    metadata.AddImplementationVariable(this.noPersistHandle);
    metadata.AddImplementationChild(this.channelCorrelationCompletionWaiter);
    metadata.AddImplementationChild(this.openChannelFactory);
    metadata.AddImplementationChild(this.openChannelAndSendMessage);
    metadata.AddDefaultExtensionProvider(SendMessageChannelCache.DefaultExtensionProvider);
}

protected override void Cancel(NativeActivityContext context)
{
    // Do nothing.  InternalSendMessage cannot be canceled since
    // the individual parts of the process cannot be canceled.
}

// Disposes the SendMessageInstance held in the implementation variable, if there is one.
protected override void Abort(NativeActivityAbortContext context)
{
    VolatileSendMessageInstance volatileInstance = this.sendMessageInstance.Get(context);

    if (volatileInstance != null)
    {
        CleanupResources(volatileInstance.Instance);
    }
}

// Disposes the instance; a null instance is ignored.
void CleanupResources(SendMessageInstance instance)
{
    if (instance != null)
    {
        instance.Dispose();
    }
}

// Entry point: enters the no-persist zone, creates the SendMessageInstance and dispatches to
// the client-request or server-response path depending on instance.RequestContext.
protected override void Execute(NativeActivityContext context)
{
    // The entire InternalSendMessage runs in a no persist zone
    NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
    noPersistHandle.Enter(context);

    // Set up the SendMessageInstance, which will
    // setup an AsyncOperationBlock under the hood and thus block persistence
    // until the message has been sent and we return to the workflow thread
    SendMessageInstance instance = new SendMessageInstance(this, context);
    SetSendMessageInstance(context, instance);

    if (instance.RequestContext != null)
    {
        ExecuteClientRequest(context, instance);
    }
    else
    {
        ExecuteServerResponse(context, instance);
    }
}

// Wraps the instance in a VolatileSendMessageInstance and stores it in the implementation variable.
void SetSendMessageInstance(NativeActivityContext context, SendMessageInstance instance)
{
    VolatileSendMessageInstance volatileInstance = new VolatileSendMessageInstance { Instance = instance };
    this.sendMessageInstance.Set(context, volatileInstance);
}

// Reads the SendMessageInstance back out of the implementation variable.
SendMessageInstance GetSendMessageInstance(ActivityContext context)
{
    VolatileSendMessageInstance volatileInstance = this.sendMessageInstance.Get(context);
    Fx.Assert(volatileInstance != null, "This should never be null.");
    return volatileInstance.Instance;
}

// Used for server-side send (replies).
// We don't have any async code here since the
// Dispatcher handles any completions
void ExecuteServerResponse(NativeActivityContext context, SendMessageInstance instance)
{
    Fx.Assert(instance.ResponseContext != null, "only valid for responses");
    Fx.Assert(instance.ResponseContext.WorkflowOperationContext != null, "The WorkflowOperationContext is required on the CorrelationResponseContext");

    instance.OperationContext = instance.ResponseContext.WorkflowOperationContext.OperationContext;

    // now that we have our op-context, invoke the callback that user might have added in the AEC in the previous activity
    // e.g. distributed compensation activity will add this so that they can convert an execution property
    // to an message properties, as will Transaction Flow
    instance.ProcessMessagePropertyCallbacks();

    ProcessSendMessageTrace(context, instance, false);

    // retrieve the correct CorrelationQueryBehavior from the ChannelExtensions collection
    CorrelationQueryBehavior correlationBehavior = null;
    Collection<CorrelationQueryBehavior> correlationQueryBehaviors = instance.OperationContext.Channel.Extensions.FindAll<CorrelationQueryBehavior>();
    foreach (CorrelationQueryBehavior cqb in correlationQueryBehaviors)
    {
        if (cqb.ServiceContractName == this.ServiceContractName)
        {
            correlationBehavior = cqb;
            break;
        }
    }

    // set the reply
    instance.RequestOrReply = this.Message.Get(context);

    if (correlationBehavior != null)
    {
        EnsureCorrelationBehaviorScopeName(context, correlationBehavior);
        instance.RegisterCorrelationBehavior(correlationBehavior);

        if (instance.CorrelationKeyCalculator != null)
        {
            if (correlationBehavior.SendNames != null && correlationBehavior.SendNames.Count > 0)
            {
                if (correlationBehavior.SendNames.Count == 1 && (correlationBehavior.SendNames.Contains(ContextExchangeCorrelationHelper.CorrelationName)))
                {
                    // Contextchannel is the only channel participating in correlation
                    // Since we already have the instance id, we don't have to wait for the context channel to call us back to initialize
                    // the correlation - InstanceId can be retrieved directly from ContextMessageProperty through Operation context.
                    ContextMessageProperty contextProperties = null;
                    if (ContextMessageProperty.TryGet(instance.OperationContext.OutgoingMessageProperties, out contextProperties))
                    {
                        CorrelationDataMessageProperty.AddData(instance.RequestOrReply,
                            ContextExchangeCorrelationHelper.CorrelationName,
                            () => ContextExchangeCorrelationHelper.GetContextCorrelationData(instance.OperationContext));
                    }

                    // Initialize correlations right away without waiting for the context channel to call us back
                    InitializeCorrelations(context, instance);
                }
                else
                {
                    // Initialize correlations through channel callback
                    instance.OperationContext.OutgoingMessageProperties.Add(CorrelationCallbackMessageProperty.Name,
                        new MessageCorrelationCallbackMessageProperty(correlationBehavior.SendNames ?? new string[0], instance));
                    instance.CorrelationSynchronizer = new CorrelationSynchronizer();
                }
            }
            else
            {
                // there are no channel based queries, we can initialize correlations right away
                InitializeCorrelations(context, instance);
            }
        }
    }

    // For exception case: Always call WorkflowOperationContext.SendFault to either send back the fault in the request/reply case
    // or make sure the error handler extension gets a chance to handle this fault;
    if (instance.ResponseContext.Exception != null)
    {
        try
        {
            instance.ResponseContext.WorkflowOperationContext.SendFault(instance.ResponseContext.Exception);
        }
        catch (Exception e)
        {
            if (Fx.IsFatal(e))
            {
                throw;
            }
            // Non-fatal failures are recorded on the response context rather than thrown.
            instance.ResponseContext.Exception = e;
        }
    }
    else
    {
        try
        {
            instance.ResponseContext.WorkflowOperationContext.SendReply(instance.RequestOrReply);
        }
        catch (Exception e)
        {
            if (Fx.IsFatal(e))
            {
                throw;
            }
            // Non-fatal failures are recorded on the response context rather than thrown.
            instance.ResponseContext.Exception = e;
        }
    }

    if (TraceUtility.ActivityTracing)
    {
        if (instance.AmbientActivityId != Trace.CorrelationManager.ActivityId)
        {
            if (TD.StopSignpostEventIsEnabled())
            {
                TD.StopSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3)
                {
                    { MessagingActivityHelper.ActivityName, this.DisplayName },
                    { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                    { MessagingActivityHelper.ActivityInstanceId, context.ActivityInstanceId }
                }));
            }
            FxTrace.Trace.SetAndTraceTransfer(instance.AmbientActivityId, true);
            instance.AmbientActivityId = Guid.Empty;
        }
    }

    if (instance.CorrelationSynchronizer == null)
    {
        // We aren't doing any correlation work so we just
        // finalize the send.
        context.SetValue(this.Message, null);
        context.SetValue(this.MessageOut, null);

        if (ShouldPersistBeforeSend)
        {
            // Need to allow persistence.
            NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
            noPersistHandle.Exit(context);

            context.ScheduleActivity(this.persist, new CompletionCallback(OnPersistCompleted));
        }
        else
        {
            FinalizeSendMessageCore(instance);
        }
    }
    else
    {
        // We're doing correlation.  Either the work is already
        // done or we need to synchronize with the channel stack.
        if (instance.CorrelationSynchronizer.IsChannelWorkComplete)
        {
            // No need to schedule our completion waiter
            OnChannelCorrelationCompleteCore(context, instance);
        }
        else
        {
            context.ScheduleActivity(this.channelCorrelationCompletionWaiter, OnChannelCorrelationComplete, null);
        }

        // We notify that we're done with the send.  If the
        // correlation processing has already completed then
        // we'll finalize the send.
        if (instance.CorrelationSynchronizer.NotifySendComplete())
        {
            FinalizeSendMessageCore(instance);
        }
    }
}

// Emits the tracking record and the activity-tracing events for a send when message flow
// tracing is on. Non-fatal exceptions raised while tracing are traced as information and
// swallowed.
void ProcessSendMessageTrace(NativeActivityContext context, SendMessageInstance instance, bool isClient)
{
    if (TraceUtility.MessageFlowTracing)
    {
        try
        {
            if (TraceUtility.ActivityTracing)
            {
                instance.AmbientActivityId = Trace.CorrelationManager.ActivityId;
            }

            if (isClient)
            {
                // We need to emit a transfer from WF instance ID to the id set in the TLS
                instance.E2EActivityId = Trace.CorrelationManager.ActivityId;
                if (instance.E2EActivityId == Guid.Empty)
                {
                    instance.E2EActivityId = Guid.NewGuid();
                }

                if (context.WorkflowInstanceId != instance.E2EActivityId)
                {
                    DiagnosticTrace.ActivityId = context.WorkflowInstanceId;
                    FxTrace.Trace.SetAndTraceTransfer(instance.E2EActivityId, true);
                }
            }
            else
            {
                DiagnosticTrace.ActivityId = context.WorkflowInstanceId;
                // transfer is taken care by WorkflowOperationContext
                instance.E2EActivityId = instance.ResponseContext.WorkflowOperationContext.E2EActivityId;
            }

            context.Track(
                new SendMessageRecord(MessagingActivityHelper.MessageCorrelationSendRecord)
                {
                    E2EActivityId = instance.E2EActivityId
                });

            if (TraceUtility.ActivityTracing)
            {
                if (TD.StartSignpostEventIsEnabled())
                {
                    TD.StartSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3)
                    {
                        { MessagingActivityHelper.ActivityName, this.DisplayName },
                        { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                        { MessagingActivityHelper.ActivityInstanceId, context.ActivityInstanceId }
                    }));
                }
            }
        }
        catch (Exception ex)
        {
            if (Fx.IsFatal(ex))
            {
                throw;
            }
            FxTrace.Exception.AsInformation(ex);
        }
    }
}

// Completion callback of the channelCorrelationCompletionWaiter child activity.
void OnChannelCorrelationComplete(NativeActivityContext context, ActivityInstance completedInstance)
{
    SendMessageInstance instance = GetSendMessageInstance(context);
    Fx.Assert(instance != null, "The instance cannot be null here.");

    OnChannelCorrelationCompleteCore(context, instance);
}

// Initializes correlations, hands the resulting message to the CorrelationSynchronizer and
// clears the message arguments. Then either schedules the persist activity or resumes a
// non-blocking bookmark that completes the correlation processing.
void OnChannelCorrelationCompleteCore(NativeActivityContext context, SendMessageInstance instance)
{
    Message message = InitializeCorrelations(context, instance);
    instance.CorrelationSynchronizer.NotifyMessageUpdatedByWorkflow(message);

    context.SetValue(this.Message, null);
    context.SetValue(this.MessageOut, null);

    if (this.ShouldPersistBeforeSend && instance.RequestContext == null)
    {
        // Need to allow persistence.
        NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
        noPersistHandle.Exit(context);

        context.ScheduleActivity(this.persist, new CompletionCallback(OnPersistCompleted));
    }
    else
    {
        // Create a bookmark to complete the callback, this is to ensure that the InstanceKey does get saved in the PPD
        // by the time the bookmark is resumed. The instancekey is not getting saved in the PPD till workflow gets to
        // the next idle state.
        Bookmark completeCorrelationBookmark = context.CreateBookmark(CompleteCorrelationCallback, BookmarkOptions.NonBlocking);
        context.ResumeBookmark(completeCorrelationBookmark, null);
    }
}

// Bookmark callback resumed from OnChannelCorrelationCompleteCore.
void CompleteCorrelationCallback(NativeActivityContext context, Bookmark bookmark, object value)
{
    SendMessageInstance instance = GetSendMessageInstance(context);
    Fx.Assert(instance != null, "The instance cannot be null here.");

    if (instance.CorrelationSynchronizer.NotifyWorkflowCorrelationProcessingComplete())
    {
        // The send complete notification has already occurred
        // so it is up to us to finalize the send.
        FinalizeSendMessageCore(instance);
    }
}

// Completion callback of the persist child activity.
void OnPersistCompleted(NativeActivityContext context, ActivityInstance completedInstance)
{
    // We can reenter no persist now
    NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
    noPersistHandle.Enter(context);

    // We might get back a null here because we've allowed persistence.
    // If that is the case we'll just ignore it ... we don't have any more
    // meaningful work to do.
    SendMessageInstance instance = GetSendMessageInstance(context);

    if (instance != null)
    {
        // Do it with or without correlation
        if (instance.CorrelationSynchronizer == null || instance.CorrelationSynchronizer.NotifyWorkflowCorrelationProcessingComplete())
        {
            // The send complete notification has already occurred
            // so it is up to us to finalize the send.
            FinalizeSendMessageCore(instance);
        }
    }
}

// Client-side send: resolves the target address, obtains a cached or new channel factory
// reference, and continues once that factory is open.
// NOTE(review): the generic arguments on ObjectCache / ObjectCacheItem / KeyValuePair were lost
// in extraction and are restored from how the values are used below - confirm.
void ExecuteClientRequest(NativeActivityContext context, SendMessageInstance instance)
{
    // This is the client send request: we need to figure out the channel and request message first
    // Get the Extension for the ChannelSettings
    instance.CacheExtension = context.GetExtension<SendMessageChannelCache>();
    Fx.Assert(instance.CacheExtension != null, "channelCacheExtension must exist.");

    // Send.ChannelCacheEnabled must be set before we call CreateEndpointAddress
    // because CreateEndpointAddress will cache description and description resolution depends on the value of ChannelCacheEnabled
    this.Parent.InitializeChannelCacheEnabledSetting(instance.CacheExtension);

    // if there is a correlatesWith handle with callbackcontext(Durable Duplex case), use the callback address and context from
    // there. The handle could be an explicit 'CorrelatesWith' handle or an ambient handle.
    if (instance.CorrelatesWith != null)
    {
        if (instance.CorrelatesWith.CallbackContext != null)
        {
            instance.CorrelationCallbackContext = instance.CorrelatesWith.CallbackContext;

            // construct EndpointAddress based on the ListenAddress from callback and the identity and headers from Endpoint or from Config
            instance.EndpointAddress = CreateEndpointAddressFromCallback(instance.CorrelationCallbackContext.ListenAddress.ToEndpointAddress());
        }

        if (instance.CorrelatesWith.Context != null)
        {
            instance.CorrelationContext = instance.CorrelatesWith.Context;
        }
    }

    // Request is always of Type Message. Message Argument will be set by Send using the appropriate formatter
    instance.RequestOrReply = this.Message.Get(context);

    if (instance.EndpointAddress == null)
    {
        // try to get it from endpoint or config
        instance.EndpointAddress = CreateEndpointAddress(context);
    }

    if (instance.EndpointAddress == null)
    {
        throw FxTrace.Exception.AsError(new ValidationException(SR.EndpointAddressNotSetInEndpoint(this.OperationName)));
    }

    // Configname to be used for the FactoryCacheKey,
    // if endpoint is defined, we use the settings from endpoint and ignore the endpointConfigurationName
    // if endpoint is not defined we use the endpointConfigurationName
    string configName = (this.Endpoint != null) ? null : this.EndpointConfigurationName;

    ProcessSendMessageTrace(context, instance, true);

    // Get ChannelFactory from the cache
    ObjectCache<FactoryCacheKey, ChannelFactoryReference> channelFactoryCache = null;
    ObjectCacheItem<ChannelFactoryReference> cacheItem = null;
    ChannelCacheSettings channelCacheSettings;

    // retrieve the FactoryCacheKey and cache it so that we could use it later
    if (this.cachedFactoryCacheKey == null)
    {
        ServiceEndpoint targetEndpoint = this.GetCachedServiceEndpoint();
        this.cachedFactoryCacheKey = new FactoryCacheKey(this.Endpoint, configName, this.IsOneWay, this.TokenImpersonationLevel,
            targetEndpoint.Contract, this.correlationQueries);
    }

    // let's decide if we can share the cache from the extension
    // cache can be share if AllowUnsafeSharing is true or it is safe to share
    if (instance.CacheExtension.AllowUnsafeCaching || this.IsEndpointSettingsSafeForCache())
    {
        channelFactoryCache = instance.CacheExtension.GetFactoryCache();
        Fx.Assert(channelFactoryCache != null, "factory cache should be initialized either from the extension or from the globalcache");

        channelCacheSettings = instance.CacheExtension.ChannelSettings;

        // Get a ChannelFactoryReference (either cached or brand new)
        KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache> localLastUsedCacheItem = this.lastUsedFactoryCacheItem;
        if (object.ReferenceEquals(localLastUsedCacheItem.Value, instance.CacheExtension))
        {
            if (localLastUsedCacheItem.Key != null && localLastUsedCacheItem.Key.TryAddReference())
            {
                cacheItem = localLastUsedCacheItem.Key;
            }
            else
            {
                // the item is invalid
                this.lastUsedFactoryCacheItem = new KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache>(null, null);
            }
        }

        if (cacheItem == null)
        {
            // try retrieving the factoryreference directly from the factory cache
            cacheItem = channelFactoryCache.Take(this.cachedFactoryCacheKey);
        }
    }
    else
    {
        // not safe to share cache, do not cache anything
        channelCacheSettings = ChannelCacheSettings.EmptyCacheSettings;
    }

    ChannelFactoryReference newFactoryReference = null;

    if (cacheItem == null)
    {
        // nothing in our cache, we'll have to setup a new factory reference, which ClientSendAsyncResult will open asynchronously
        ServiceEndpoint targetEndpoint = this.CreateServiceEndpoint();

        // create a new ChannelFactoryReference that holds the channelfactory and a cache for its channels,
        // cache settings are based on the channelcachesettings provided through the extension
        newFactoryReference = new ChannelFactoryReference(this.cachedFactoryCacheKey, targetEndpoint, channelCacheSettings);
    }

    instance.SetupFactoryReference(cacheItem, newFactoryReference, channelFactoryCache);

    if (this.onSendFailure == null)
    {
        this.onSendFailure = new FaultCallback(OnSendFailure);
    }

    if (instance.FactoryReference.NeedsOpen)
    {
        context.ScheduleActivity(this.openChannelFactory, OnChannelFactoryOpened, this.onSendFailure);
    }
    else
    {
        OnChannelFactoryOpenedCore(context, instance);
    }
}

void OnSendFailure(NativeActivityFaultContext context, Exception propagatedException, ActivityInstance propagatedFrom)
{
    // We throw the exception because we want this activity to abort
    // as well.  The abort path will take care of performing resource
    // clean-up (see Abort(NativeActivityAbortContext)).
throw FxTrace.Exception.AsError(propagatedException);
}

// Completion callback for the openChannelFactory child activity: looks up the volatile
// SendMessageInstance again and continues with channel setup and the send.
void OnChannelFactoryOpened(NativeActivityContext context, ActivityInstance completedInstance)
{
    SendMessageInstance instance = GetSendMessageInstance(context);
    Fx.Assert(instance != null, "Must have a SendMessageInstance here.");

    OnChannelFactoryOpenedCore(context, instance);
}

// Runs once the channel factory is known to be open. Takes a client channel, builds the
// OperationContext, stamps context/callback-context properties on the outgoing message,
// initializes (or defers) correlations and finally schedules the actual send.
void OnChannelFactoryOpenedCore(NativeActivityContext context, SendMessageInstance instance)
{
    // now that we know the factory is open, setup our client channel and pool reference
    instance.PopulateClientChannel();

    IContextChannel contextChannel = instance.ClientSendChannel as IContextChannel;
    instance.OperationContext = (contextChannel == null) ? null : new OperationContext(contextChannel);

    // Retrieve the CorrelationQueryBehavior from the serviceEndpoint that we used for ChannelFactoryCreation
    // we later look for CorrelationQueryBehavior.SendNames which actually gets initialized during ChannelFactory creation
    //
    CorrelationQueryBehavior correlationQueryBehavior = instance.FactoryReference.CorrelationQueryBehavior;
    if (correlationQueryBehavior != null)
    {
        EnsureCorrelationBehaviorScopeName(context, correlationQueryBehavior);
        instance.RegisterCorrelationBehavior(correlationQueryBehavior);
    }

    // now that we have our op-context, invoke the callback that user might have added in the AEC in the previous activity
    // e.g. distributed compensation activity will add this so that they can convert an execution property
    // to a message property, as will Transaction Flow
    instance.ProcessMessagePropertyCallbacks();

    // Add the ContextMessage Property if either CallBackContextMessageProperty or ContextMessageProperty is set
    // if both are set validate that the context is the same in both of them
    ContextMessageProperty contextMessageProperty = null;
    if (instance.CorrelationCallbackContext != null && instance.CorrelationContext != null)
    {
        // validate if the context is equivalent
        if (MessagingActivityHelper.CompareContextEquality(instance.CorrelationCallbackContext.Context, instance.CorrelationContext.Context))
        {
            contextMessageProperty = new ContextMessageProperty(instance.CorrelationCallbackContext.Context);
        }
        else
        {
            throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ContextMismatchInContextAndCallBackContext));
        }
    }
    else if (instance.CorrelationCallbackContext != null)
    {
        contextMessageProperty = new ContextMessageProperty(instance.CorrelationCallbackContext.Context);
    }
    else if (instance.CorrelationContext != null)
    {
        contextMessageProperty = new ContextMessageProperty(instance.CorrelationContext.Context);
    }

    if (contextMessageProperty != null)
    {
        contextMessageProperty.AddOrReplaceInMessage(instance.RequestOrReply);
    }

    // Add callback context Message property with instance id.
    // If binding contains ContextBindingElement with listenaddress set, the callback context message property will flow on the wire
    // Pull the instanceId from the CorrelationHandle, if it is already initialized, else create a new GUID.
    // we want to send the callback context only for the first message and when there is a FollowingContextCorrelation defined ( i.e., we are expecting a
    // receive message back) or when there is an ambient handle and the handle is not initialized. We will never use CorrelatesWith handle to initialize
    // FollowingContext, since CorrelatesWith on the client side should always be used for a following correlation
    String contextValue;
    CorrelationHandle followingContextHandle = instance.ContextBasedCorrelationHandle != null ? instance.ContextBasedCorrelationHandle : instance.AmbientHandle;
    if (followingContextHandle != null && (followingContextHandle.Scope == null || followingContextHandle.Scope.IsInitialized == false))
    {
        // we are creating a new GUID for the context. As a practice, we don't want to send the WorkflowInstanceId over the wire
        contextValue = Guid.NewGuid().ToString();
        Dictionary<string, string> contextValues = new Dictionary<string, string>(1)
        {
            { ContextMessageProperty.InstanceIdKey, contextValue }
        };
        new CallbackContextMessageProperty(contextValues).AddOrReplaceInMessage(instance.RequestOrReply);
    }

    // verify if we can complete Correlation initialization now
    if (instance.CorrelationSendNames != null)
    {
        // we're going to initialize request correlations later
        instance.RequestOrReply.Properties.Add(CorrelationCallbackMessageProperty.Name, new MessageCorrelationCallbackMessageProperty(instance.CorrelationSendNames, instance));
        instance.CorrelationSynchronizer = new CorrelationSynchronizer();
    }
    else
    {
        InitializeCorrelations(context, instance);
    }

    // When correlation is deferred to the channel, a waiter activity is scheduled alongside the send
    if (instance.CorrelationSynchronizer != null)
    {
        context.ScheduleActivity(this.channelCorrelationCompletionWaiter, OnChannelCorrelationComplete, this.onSendFailure);
    }

    context.ScheduleActivity(this.openChannelAndSendMessage, OnClientSendComplete, this.onSendFailure);
}

// Completion callback for the openChannelAndSendMessage child activity.
void OnClientSendComplete(NativeActivityContext context, ActivityInstance completedInstance)
{
    SendMessageInstance instance = GetSendMessageInstance(context);

    if (instance.CorrelationSynchronizer == null || instance.CorrelationSynchronizer.NotifySendComplete())
    {
        // Either there was no correlation or the send completed
        // after the correlation processing so we need to do the
        // finalize
        FinalizeSendMessageCore(instance);
    }
}
// Performs the workflow-side correlation initialization for the outgoing message and
// returns the (possibly replaced) message stored in instance.RequestOrReply.
Message InitializeCorrelations(NativeActivityContext context, SendMessageInstance instance)
{
    if (instance.CorrelationKeyCalculator != null)
    {
        // first setup the key-based correlations, pass in the Correlation Initializers and the AmbientHandle
        // for associating the keys.
        // For content based correlation, we will never initialize correlation with a selectHandle. It has to be either specified in a CorrelationInitializer
        // or should be an ambient handle
        // For context based correlation, selectHandle will be callbackHandle in case of Send and contextHandle in case of SendReply
        instance.RequestOrReply = MessagingActivityHelper.InitializeCorrelationHandles(context,
            instance.ContextBasedCorrelationHandle, instance.AmbientHandle, this.correlationInitializers,
            instance.CorrelationKeyCalculator, instance.RequestOrReply);
    }

    // then setup any channel based correlations as necessary
    //
    if (instance.RequestContext != null)
    {
        // first check for an explicit association
        CorrelationHandle channelCorrelationHandle = instance.GetExplicitChannelCorrelationHandle(context, this.correlationInitializers);
        if (channelCorrelationHandle != null)
        {
            if (!channelCorrelationHandle.TryRegisterRequestContext(context, instance.RequestContext))
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryRegisterRequestContextFailed));
            }
        }
        else // if that fails, use the ambient handle. We do not use the CorrelatesWith handle for RequestReply correlation
        {
            if (!this.IsOneWay)
            {
                // we have already validated this in SendMessageInstanceConstructor, just assert here
                Fx.Assert(instance.AmbientHandle != null, "For two way send we need to have either a RequestReply correlation handle or an ambient handle");
                if (!instance.AmbientHandle.TryRegisterRequestContext(context, instance.RequestContext))
                {
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryRegisterRequestContextFailed));
                }
            }
        }
    }

    return instance.RequestOrReply;
}

// Surfaces any exception that was stashed on the request/response context during the send.
void FinalizeSendMessageCore(SendMessageInstance instance)
{
    Exception completionException = instance.GetCompletionException();

    if (completionException != null)
    {
        throw FxTrace.Exception.AsError(completionException);
    }
}

// Child activity that asynchronously opens the channel factory held by the SendMessageInstance.
class OpenChannelFactory : AsyncCodeActivity
{
    public OpenChannelFactory()
    {
    }

    public InArgument<VolatileSendMessageInstance> Instance
    {
        get;
        set;
    }

    protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);

        return new OpenChannelFactoryAsyncResult(volatileInstance.Instance, callback, state);
    }

    protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        OpenChannelFactoryAsyncResult.End(result);
    }

    class OpenChannelFactoryAsyncResult : AsyncResult
    {
        static AsyncCompletion channelFactoryOpenCompletion = new AsyncCompletion(ChannelFactoryOpenCompletion);

        SendMessageInstance instance;

        public OpenChannelFactoryAsyncResult(SendMessageInstance instance, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.instance = instance;

            bool completeSelf = false;

            if (this.instance.FactoryReference.NeedsOpen)
            {
                IAsyncResult result = this.instance.FactoryReference.BeginOpen(PrepareAsyncCompletion(channelFactoryOpenCompletion), this);
                if (result.CompletedSynchronously)
                {
                    completeSelf = OnNewChannelFactoryOpened(result);
                }
            }
            else
            {
                completeSelf = true;
            }

            if (completeSelf)
            {
                Complete(true);
            }
        }

        public static void End(IAsyncResult result)
        {
            AsyncResult.End<OpenChannelFactoryAsyncResult>(result);
        }

        static bool ChannelFactoryOpenCompletion(IAsyncResult result)
        {
            OpenChannelFactoryAsyncResult thisPtr = (OpenChannelFactoryAsyncResult)result.AsyncState;
            return thisPtr.OnNewChannelFactoryOpened(result);
        }

        // Ends the factory open and records the cache item returned for the new factory.
        bool OnNewChannelFactoryOpened(IAsyncResult result)
        {
            ObjectCacheItem<ChannelFactoryReference> newCacheItem = this.instance.FactoryReference.EndOpen(result, this.instance.FactoryCache);
            this.instance.RegisterNewCacheItem(newCacheItem);
            return true;
        }
    }
}

// Child activity that opens the client channel (if it is not already open) and sends the
// message, using BeginRequest for two-way operations and BeginSend for one-way operations.
class OpenChannelAndSendMessage : AsyncCodeActivity
{
    public OpenChannelAndSendMessage()
    {
    }

    public InArgument<VolatileSendMessageInstance> Instance
    {
        get;
        set;
    }

    public InternalSendMessage InternalSendMessage
    {
        get;
        set;
    }

    protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);

        // Pick up the ambient workflow transaction, if any, so it can be applied to the channel calls
        Transaction transaction = null;
        RuntimeTransactionHandle handle = context.GetProperty<RuntimeTransactionHandle>();
        if (handle != null)
        {
            transaction = handle.GetCurrentTransaction(context);
        }

        return new OpenChannelAndSendMessageAsyncResult(InternalSendMessage, volatileInstance.Instance, transaction, callback, state);
    }

    protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        OpenChannelAndSendMessageAsyncResult.End(result);
    }

    class OpenChannelAndSendMessageAsyncResult : AsyncResult
    {
        static AsyncCompletion onChannelOpened = new AsyncCompletion(OnChannelOpened);
        static AsyncCompletion onChannelSendComplete = new AsyncCompletion(OnChannelSendComplete);
        static AsyncCallback onChannelReceiveReplyCompleted = Fx.ThunkCallback(OnChannelReceiveReplyComplete);

        SendMessageInstance instance;
        InternalSendMessage internalSendMessage;
        IChannel channel;
        Transaction currentTransactionContext;
        Guid ambientActivityId;

        //This is used to create a blocking dependent clone to synchronize the transaction commit processing with the completion of the aborting clone
        //that is created in this async result.
        DependentTransaction dependentClone;

        public OpenChannelAndSendMessageAsyncResult(InternalSendMessage internalSendMessage, SendMessageInstance instance, Transaction currentTransactionContext, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.internalSendMessage = internalSendMessage;
            this.instance = instance;
            this.channel = this.instance.ClientSendChannel;
            this.currentTransactionContext = currentTransactionContext;

            bool completeSelf = false;

            //channel is still in created state, we need to open it
            if (this.channel.State == CommunicationState.Created)
            {
                // Disable ContextManager before channel is opened
                IContextManager contextManager = this.channel.GetProperty<IContextManager>();
                if (contextManager != null)
                {
                    contextManager.Enabled = false;
                }

                IAsyncResult result = this.channel.BeginOpen(PrepareAsyncCompletion(onChannelOpened), this);
                if (result.CompletedSynchronously)
                {
                    completeSelf = OnChannelOpened(result);
                }
            }
            else
            {
                // channel already opened & retrieved from cache
                // we don't have to do anything with ChannelOpen
                completeSelf = BeginSendMessage();
            }

            if (completeSelf)
            {
                Complete(true);
            }
        }

        public static void End(IAsyncResult result)
        {
            AsyncResult.End<OpenChannelAndSendMessageAsyncResult>(result);
        }

        static bool OnChannelOpened(IAsyncResult result)
        {
            OpenChannelAndSendMessageAsyncResult thisPtr = (OpenChannelAndSendMessageAsyncResult)result.AsyncState;

            thisPtr.channel.EndOpen(result);

            return thisPtr.BeginSendMessage();
        }

        // Starts the request (two-way) or send (one-way). Returns true when this async result
        // can be completed by the caller.
        bool BeginSendMessage()
        {
            IAsyncResult result = null;
            bool requestSucceeded = false;
            OperationContext oldContext = OperationContext.Current;

            // two-way operations go through IRequestChannel.BeginRequest with a thunked callback
            bool asyncSend = !this.internalSendMessage.IsOneWay;

            try
            {
                OperationContext.Current = this.instance.OperationContext;

                if (TraceUtility.MessageFlowTracingOnly)
                {
                    //set the E2E activity ID
                    DiagnosticTrace.ActivityId = this.instance.E2EActivityId;
                }

                using (PrepareTransactionalCall(this.currentTransactionContext))
                {
                    if (asyncSend)
                    {
                        //If there is a transaction that we could be flowing out then we create this blocking clone to sync with the commit processing.
                        if (this.currentTransactionContext != null)
                        {
                            this.dependentClone = this.currentTransactionContext.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                        }

                        this.instance.RequestContext.EnsureAsyncWaitHandle();
                        result = ((IRequestChannel)this.channel).BeginRequest(this.instance.RequestOrReply, onChannelReceiveReplyCompleted, this);
                        if (result.CompletedSynchronously)
                        {
                            Message reply = ((IRequestChannel)this.channel).EndRequest(result);
                            this.instance.RequestContext.ReceiveReply(this.instance.OperationContext, reply);
                        }
                    }
                    else
                    {
                        result = ((IOutputChannel)this.channel).BeginSend(this.instance.RequestOrReply, PrepareAsyncCompletion(onChannelSendComplete), this);
                        if (result.CompletedSynchronously)
                        {
                            ((IOutputChannel)this.channel).EndSend(result);
                        }
                    }

                    requestSucceeded = true;
                }
            }
            finally
            {
                OperationContext.Current = oldContext;

                if (!requestSucceeded)
                {
                    //if we did not succeed, complete the blocking clone anyway if we created it
                    if (this.dependentClone != null)
                    {
                        this.dependentClone.Complete();
                        this.dependentClone = null;
                    }

                    this.channel.Abort();
                }

                if (result != null && result.CompletedSynchronously)
                {
                    //if we are done synchronously, we need to complete a blocking dependent clone if we created one (asyncSend case)
                    if (this.dependentClone != null)
                    {
                        this.dependentClone.Complete();
                        this.dependentClone = null;
                    }

                    this.internalSendMessage.CleanupResources(this.instance);
                }
            }

            if (asyncSend)
            {
                return true;
            }
            else
            {
                return SyncContinue(result);
            }
        }

        // Callback for IRequestChannel.BeginRequest; only does work for asynchronous completions
        // (the synchronous case is handled inline in BeginSendMessage).
        static void OnChannelReceiveReplyComplete(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            OpenChannelAndSendMessageAsyncResult thisPtr = (OpenChannelAndSendMessageAsyncResult)result.AsyncState;

            OperationContext oldContext = OperationContext.Current;
            Message reply = null;
            bool requestSucceeded = false;
            try
            {
                OperationContext.Current = thisPtr.instance.OperationContext;
                thisPtr.TraceActivityData();

                System.Transactions.TransactionScope scope = Fx.CreateTransactionScope(thisPtr.currentTransactionContext);
                try
                {
                    Fx.Assert(thisPtr.channel is IRequestChannel, "Channel must be of IRequestChannel type!");

                    reply = ((IRequestChannel)thisPtr.channel).EndRequest(result);
                    //
                    thisPtr.instance.RequestContext.ReceiveAsyncReply(thisPtr.instance.OperationContext, reply, null);
                    requestSucceeded = true;
                }
                finally
                {
                    Fx.CompleteTransactionScope(ref scope);
                }
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }

                // hand the failure to the request context instead of the reply
                thisPtr.instance.RequestContext.ReceiveAsyncReply(thisPtr.instance.OperationContext, null, exception);
            }
            finally
            {
                //Complete the blocking dependent clone created before the async call was made.
                if (thisPtr.dependentClone != null)
                {
                    thisPtr.dependentClone.Complete();
                    thisPtr.dependentClone = null;
                }

                OperationContext.Current = oldContext;

                if (!requestSucceeded)
                {
                    thisPtr.channel.Abort();
                }

                thisPtr.internalSendMessage.CleanupResources(thisPtr.instance);
            }
        }

        // Completion for IOutputChannel.BeginSend; only does work for asynchronous completions.
        static bool OnChannelSendComplete(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return true;
            }

            OpenChannelAndSendMessageAsyncResult thisPtr = (OpenChannelAndSendMessageAsyncResult)result.AsyncState;

            OperationContext oldContext = OperationContext.Current;
            try
            {
                OperationContext.Current = thisPtr.instance.OperationContext;
                thisPtr.TraceActivityData();

                System.Transactions.TransactionScope scope = Fx.CreateTransactionScope(thisPtr.currentTransactionContext);
                try
                {
                    Fx.Assert(thisPtr.channel is IOutputChannel, "Channel must be of IOutputChannel type!");
                    ((IOutputChannel)thisPtr.channel).EndSend(result);
                }
                finally
                {
                    Fx.CompleteTransactionScope(ref scope);
                }
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }

                // stash away the exception to be retrieved in FinalizeSendMessageCore
                thisPtr.instance.RequestContext.Exception = exception;
            }
            finally
            {
                OperationContext.Current = oldContext;
                thisPtr.internalSendMessage.CleanupResources(thisPtr.instance);
            }

            return true;
        }

        // Emits the stop-signpost trace event (when enabled) and transfers back to the stored activity id.
        void TraceActivityData()
        {
            if (TraceUtility.ActivityTracing)
            {
                if (TD.StopSignpostEventIsEnabled())
                {
                    TD.StopSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3)
                    {
                        { MessagingActivityHelper.ActivityName, this.instance.Activity.DisplayName },
                        { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                        { MessagingActivityHelper.ActivityInstanceId, this.instance.ActivityInstanceId }
                    }));
                }

                FxTrace.Trace.SetAndTraceTransfer(this.ambientActivityId, true);
                this.ambientActivityId = Guid.Empty;
            }
        }
    }
}

// Child activity that completes once the channel stack has handed the request over for
// correlation processing (tracked through the CorrelationSynchronizer).
class WaitOnChannelCorrelation : AsyncCodeActivity
{
    public WaitOnChannelCorrelation()
    {
    }

    public InArgument<VolatileSendMessageInstance> Instance
    {
        get;
        set;
    }

    protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);
        Fx.Assert(volatileInstance.Instance != null, "This should not have gone through a persistence episode yet.");

        return new WaitOnChannelCorrelationAsyncResult(volatileInstance.Instance.CorrelationSynchronizer, callback, state);
    }

    protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        WaitOnChannelCorrelationAsyncResult.End(result);
    }

    class WaitOnChannelCorrelationAsyncResult : AsyncResult
    {
        CorrelationSynchronizer synchronizer;

        public WaitOnChannelCorrelationAsyncResult(CorrelationSynchronizer synchronizer, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.synchronizer = synchronizer;

            if (synchronizer.IsChannelWorkComplete)
            {
                Complete(true);
            }
            else
            {
                if (synchronizer.SetWorkflowNotificationCallback(new Action(OnChannelCorrelationComplete)))
                {
                    // The bool flipped just before we set the action so
                    // we're actually complete. The contract is that the
                    // action will never be raised if Set returns true.
Complete(true);
                }
            }
        }

        public static void End(IAsyncResult result)
        {
            AsyncResult.End<WaitOnChannelCorrelationAsyncResult>(result);
        }

        void OnChannelCorrelationComplete()
        {
            Complete(false);
        }
    }
}

// Coordinates the channel thread (which hands over the request for correlation processing)
// with the workflow thread, and decides which of "send complete" / "correlation complete"
// arrived second and therefore has to finalize the send.
class CorrelationSynchronizer
{
    Action onRequestSetByChannel;
    Action<Message> onWorkflowCorrelationProcessingComplete;
    object thisLock;
    Completion completion;

    public CorrelationSynchronizer()
    {
        this.thisLock = new object();
    }

    public bool IsChannelWorkComplete
    {
        get;
        private set;
    }

    public Message UpdatedMessage
    {
        get;
        private set;
    }

    // Called from the channel side: records the continuation to invoke once the workflow has
    // finished correlation processing and raises the workflow notification if one was registered.
    public void NotifyRequestSetByChannel(Action<Message> onWorkflowCorrelationProcessingComplete)
    {
        Fx.Assert(onWorkflowCorrelationProcessingComplete != null, "Must have a non-null callback.");

        Action toCall = null;

        lock (this.thisLock)
        {
            this.IsChannelWorkComplete = true;
            this.onWorkflowCorrelationProcessingComplete = onWorkflowCorrelationProcessingComplete;
            toCall = this.onRequestSetByChannel;
        }

        // invoke outside the lock
        if (toCall != null)
        {
            toCall();
        }
    }

    public void NotifyMessageUpdatedByWorkflow(Message message)
    {
        this.UpdatedMessage = message;
    }

    // Returns true when the send had already completed, i.e. the caller must finalize the send.
    public bool NotifyWorkflowCorrelationProcessingComplete()
    {
        Fx.Assert(this.onWorkflowCorrelationProcessingComplete != null, "This must be set before this can be called.");

        bool result = false;

        lock (this.thisLock)
        {
            if (this.completion == Completion.SendComplete)
            {
                // The send has already completed so we are responsible for
                // making sure FinalizeSendMessage is called.
                result = true;
            }
            else
            {
                Fx.Assert(this.completion == Completion.None, "We should be the first one to complete.");
                this.completion = Completion.CorrelationComplete;
            }
        }

        this.onWorkflowCorrelationProcessingComplete(this.UpdatedMessage);

        return result;
    }

    // Returns true when correlation processing had already completed, i.e. the caller must finalize the send.
    public bool NotifySendComplete()
    {
        bool result = false;

        lock (this.thisLock)
        {
            if (this.completion == Completion.CorrelationComplete)
            {
                // The correlation has already finished so we are responsible for
                // making sure that FinalizeSendMessage is called.
                result = true;
            }
            else
            {
                Fx.Assert(this.completion == Completion.None, "We should be the first one to complete.");
                this.completion = Completion.SendComplete;
            }
        }

        return result;
    }

    // Returns true if the channel work is actually done. If this
    // returns true then the passed in Action will never be called.
    public bool SetWorkflowNotificationCallback(Action onRequestSetByChannel)
    {
        Fx.Assert(onRequestSetByChannel != null, "Must have a non-null callback.");

        bool result = false;

        lock (this.thisLock)
        {
            result = this.IsChannelWorkComplete;
            this.onRequestSetByChannel = onRequestSetByChannel;
        }

        return result;
    }

    // This three state enum allows us to determine whether
    // we are the first or second code path. The second
    // code path needs to finalize the send.
    enum Completion
    {
        None,
        SendComplete,
        CorrelationComplete
    }
}

// This class defines the instance data that is used to store intermediate states
// during the volatile async operation of sending a message.
class SendMessageInstance
{
    CorrelationHandle explicitChannelCorrelationHandle;
    IList<ISendMessageCallback> sendMessageCallbacks;

    ChannelFactoryReference factoryReference;
    ObjectCacheItem<ChannelFactoryReference> cacheItem;
    ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache;

    readonly InternalSendMessage parent;
    bool isUsingCacheFromExtension;

    // needed so that we can return our ClientSendChannel to the pool under Dispose
    // NOTE(review): the generic arguments were lost in extraction; Pool<IChannel> is presumed - confirm against ChannelFactoryReference.TakeChannel
    ObjectCacheItem<Pool<IChannel>> clientChannelPool;

    public SendMessageInstance(InternalSendMessage parent, NativeActivityContext context)
    {
        this.parent = parent;

        // setup both our following state as well as any anonymous response information
        CorrelationHandle correlatesWith = (parent.CorrelatesWith == null) ? null : parent.CorrelatesWith.Get(context);

        if (correlatesWith != null && !correlatesWith.IsInitalized())
        {
            // if send or sendReply has a correlatesWith, it should always be initialized with either content or with callbackcontext, context or
            // ResponseContext
            throw FxTrace.Exception.AsError(new ValidationException(SR.SendWithUninitializedCorrelatesWith(this.parent.OperationName ?? string.Empty)));
        }

        if (correlatesWith == null)
        {
            // fall back to the ambient handle when no explicit CorrelatesWith was supplied
            this.AmbientHandle = context.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
            correlatesWith = this.AmbientHandle;
        }

        this.CorrelatesWith = correlatesWith;

        if (!parent.IsSendReply)
        {
            // we're a client-side request

            // Validate correlation handle
            CorrelationHandle channelCorrelationHandle = GetExplicitChannelCorrelationHandle(context, parent.correlationInitializers);
            if (parent.IsOneWay)
            {
                if (channelCorrelationHandle != null)
                {
                    // this is a one-way send, we should not have a RequestReply Correlation initializer
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.RequestReplyHandleShouldNotBePresentForOneWay));
                }
            }
            else // two-way send
            {
                if (channelCorrelationHandle == null && this.AmbientHandle == null)
                {
                    this.AmbientHandle = context.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
                    if (this.AmbientHandle == null)
                    {
                        // we neither have a channelHandle nor an ambientHandle
                        throw FxTrace.Exception.AsError(new InvalidOperationException(
                            SR.SendMessageNeedsToPairWithReceiveMessageForTwoWayContract(parent.OperationName ?? string.Empty)));
                    }
                }
            }

            // Formatter and OperationContract should be removed from CorrelationRequestContext
            // This will be done when SendMessage/ReceiveMessage is completely removed from the code base
            this.RequestContext = new CorrelationRequestContext();

            // callback correlationHandle is used for initializing context based correlation
            this.ContextBasedCorrelationHandle = CorrelationHandle.GetExplicitCallbackCorrelation(context, parent.correlationInitializers);

            // by default we use the channel factory cache from the extension
            isUsingCacheFromExtension = true;
        }
        else
        {
            // we are a server-side following send
            CorrelationResponseContext responseContext;
            if (correlatesWith == null || !correlatesWith.TryAcquireResponseContext(context, out responseContext))
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.CorrelatedContextRequiredForAnonymousSend));
            }

            // Contract inference logic should validate that the Receive and Following send do not have conflicting data(e.g., OperationName)
            this.ResponseContext = responseContext;

            // in case of Context based correlation, we use context handle to initialize correlation
            this.ContextBasedCorrelationHandle = CorrelationHandle.GetExplicitContextCorrelation(context, parent.correlationInitializers);
        }

        this.sendMessageCallbacks = MessagingActivityHelper.GetCallbacks<ISendMessageCallback>(context.Properties);

        if (TraceUtility.MessageFlowTracing)
        {
            this.ActivityInstanceId = context.ActivityInstanceId;
        }
    }

    public InternalSendMessage Activity
    {
        get { return this.parent; }
    }

    public CorrelationHandle CorrelatesWith
    {
        get;
        private set;
    }

    public CorrelationHandle AmbientHandle
    {
        get;
        private set;
    }

    public CorrelationHandle ContextBasedCorrelationHandle
    {
        get;
        private set;
    }

    public EndpointAddress EndpointAddress
    {
        get;
        set;
    }

    public IChannel ClientSendChannel
    {
        get;
        private set;
    }

    public CorrelationSynchronizer CorrelationSynchronizer
    {
        get;
        set;
    }

    public Message RequestOrReply
    {
        get;
        set;
    }

    public OperationContext OperationContext
    {
        get;
        set;
    }

    public CorrelationRequestContext RequestContext
    {
        get;
        private set;
    }

    // This is required for adding the ChannelFactory to the cache once it is opened
    public ObjectCache<FactoryCacheKey, ChannelFactoryReference> FactoryCache
    {
        get { return this.factoryCache; }
    }

    // This is required for adding the ChannelFactory to the cache once it is opened
    public SendMessageChannelCache CacheExtension
    {
        get;
        set;
    }

    //This is required for returning it to the cache after use
    public ChannelFactoryReference FactoryReference
    {
        get { return this.factoryReference; }
    }

    public CorrelationResponseContext ResponseContext
    {
        get;
        private set;
    }

    public CorrelationKeyCalculator CorrelationKeyCalculator
    {
        get;
        private set;
    }

    public CorrelationCallbackContext CorrelationCallbackContext
    {
        get;
        set;
    }

    public CorrelationContext CorrelationContext
    {
        get;
        set;
    }

    public Guid AmbientActivityId
    {
        get;
        set;
    }

    public ICollection<string> CorrelationSendNames
    {
        get;
        private set;
    }

    public Guid E2EActivityId
    {
        get;
        set;
    }

    public string ActivityInstanceId
    {
        get;
        private set;
    }

    public bool IsCorrelationInitialized
    {
        get;
        set;
    }

    // Records either the cached factory reference (cacheItem) or a brand new one
    // (newFactoryReference); exactly one of the two is expected to be non-null.
    public void SetupFactoryReference(ObjectCacheItem<ChannelFactoryReference> cacheItem, ChannelFactoryReference newFactoryReference, ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache)
    {
        this.factoryCache = factoryCache;

        // no factory cache means the caller decided not to share the extension's cache
        if (this.factoryCache == null)
        {
            isUsingCacheFromExtension = false;
        }

        if (cacheItem != null)
        {
            // we found the item in our cache
            Fx.Assert(newFactoryReference == null, "need one of cacheItem or newFactoryReference");
            Fx.Assert(cacheItem.Value != null, "should have valid value");
            this.cacheItem = cacheItem;
            this.factoryReference = cacheItem.Value;
        }
        else
        {
            Fx.Assert(newFactoryReference != null, "need one of cacheItem or newFactoryReference");
            this.factoryReference = newFactoryReference;
        }
    }

    public void RegisterNewCacheItem(ObjectCacheItem<ChannelFactoryReference> newCacheItem)
    {
        Fx.Assert(this.cacheItem == null, "should only be called for new cache items");
        this.cacheItem = newCacheItem;
    }

    // Resolves the explicit channel (request-reply) correlation handle once and caches it.
    public CorrelationHandle GetExplicitChannelCorrelationHandle(NativeActivityContext context, Collection<CorrelationInitializer> additionalCorrelations)
    {
        if (this.explicitChannelCorrelationHandle == null)
        {
            this.explicitChannelCorrelationHandle = CorrelationHandle.GetExplicitChannelCorrelation(context, additionalCorrelations);
        }

        return this.explicitChannelCorrelationHandle;
    }

    public void RegisterCorrelationBehavior(CorrelationQueryBehavior correlationBehavior)
    {
        Fx.Assert(correlationBehavior != null, "caller must verify");

        if (correlationBehavior.ScopeName != null)
        {
            CorrelationKeyCalculator keyCalculator = correlationBehavior.GetKeyCalculator();
            if (keyCalculator != null)
            {
                this.CorrelationKeyCalculator = keyCalculator;
                if (this.RequestContext != null)
                {
                    this.RequestContext.CorrelationKeyCalculator = keyCalculator;

                    // for requests, determine if we should be using the correlation callback
                    if (correlationBehavior.SendNames != null && correlationBehavior.SendNames.Count > 0)
                    {
                        this.CorrelationSendNames = correlationBehavior.SendNames;
                    }
                }
            }
        }
    }

    // Invokes every registered ISendMessageCallback with the current OperationContext.
    public void ProcessMessagePropertyCallbacks()
    {
        if (this.sendMessageCallbacks != null)
        {
            foreach (ISendMessageCallback sendMessageCallback in this.sendMessageCallbacks)
            {
                sendMessageCallback.OnSendMessage(this.OperationContext);
            }
        }
    }

    public void PopulateClientChannel()
    {
        Fx.Assert(this.ClientSendChannel == null && this.clientChannelPool == null, "should only be called once per instance");
        this.ClientSendChannel = this.FactoryReference.TakeChannel(this.EndpointAddress, out this.clientChannelPool);
    }

    // Returns the channel to its pool and releases the reference held on the factory cache item.
    public void Dispose()
    {
        if (this.ClientSendChannel != null)
        {
            Fx.Assert(this.FactoryReference != null, "Must have a factory reference.");
            this.FactoryReference.ReturnChannel(this.ClientSendChannel, this.clientChannelPool);
            this.ClientSendChannel = null;
            this.clientChannelPool = null;
        }

        if (this.cacheItem != null)
        {
            this.cacheItem.ReleaseReference();

            // if we are using the FactoryCache from the extension, store the last used cacheItem and extension
            if (this.isUsingCacheFromExtension)
            {
                this.parent.lastUsedFactoryCacheItem = new KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache>(this.cacheItem, this.CacheExtension);
            }

            this.cacheItem = null;
        }
    }

    public Exception GetCompletionException()
    {
        if (this.RequestContext != null)
        {
            // We got an exception trying to send message or receive a reply
            // Scenario: ContractFilterMismatch at serverside if the message action is not matched correctly
            return this.RequestContext.Exception;
        }
        else
        {
            return this.ResponseContext.Exception;
        }
    }
}

// Message property attached to the outgoing request when correlation has to be finalized by
// the channel stack; it hands the message to the workflow through the CorrelationSynchronizer.
class MessageCorrelationCallbackMessageProperty : CorrelationCallbackMessageProperty
{
    public MessageCorrelationCallbackMessageProperty(ICollection<string> neededData, SendMessageInstance instance)
        : base(neededData)
    {
        this.Instance = instance;
    }

    protected MessageCorrelationCallbackMessageProperty(MessageCorrelationCallbackMessageProperty callback)
        : base(callback)
    {
        this.Instance = callback.Instance;
    }

    public SendMessageInstance Instance
    {
        get;
        private set;
    }

    public override IMessageProperty CreateCopy()
    {
        return new MessageCorrelationCallbackMessageProperty(this);
    }

    protected override IAsyncResult OnBeginFinalizeCorrelation(Message message, TimeSpan timeout, AsyncCallback callback, object state)
    {
        return new FinalizeCorrelationAsyncResult(this, message, callback, state);
    }

    protected override Message OnEndFinalizeCorrelation(IAsyncResult result)
    {
        return FinalizeCorrelationAsyncResult.End(result);
    }

    protected override Message OnFinalizeCorrelation(Message message, TimeSpan timeout)
    {
        return OnEndFinalizeCorrelation(OnBeginFinalizeCorrelation(message, timeout, null, null));
    }

    class FinalizeCorrelationAsyncResult : AsyncResult
    {
        Message message;
        Completion completion;
        object thisLock;

        public FinalizeCorrelationAsyncResult(MessageCorrelationCallbackMessageProperty property, Message message, AsyncCallback callback, object state)
            : base(callback, state)
        {
            bool completeSelf = false;

            if (property.Instance.IsCorrelationInitialized)
            {
                // we do not modify the message since correlation is not calculated again
                this.message = message;
                completeSelf = true;
            }
            else
            {
                property.Instance.IsCorrelationInitialized = true;
                this.thisLock = new object();

                property.Instance.RequestOrReply = message;
                property.Instance.CorrelationSynchronizer.NotifyRequestSetByChannel(new Action<Message>(OnWorkflowCorrelationProcessingComplete));

                // We have to do this dance with the lock because
                // we aren't sure if we've been running sync or not.
                // NOTE: It is possible for us to go async and
                // still decide we're completing sync. This is fine
                // as it does not violate the async pattern since
                // the work is done by the time Begin completes.
                completeSelf = false;

                lock (this.thisLock)
                {
                    if (completion == Completion.WorkflowCorrelationProcessingComplete)
                    {
                        completeSelf = true;
                    }
                    else
                    {
                        Fx.Assert(this.completion == Completion.None, "We must be not ready then.");
                        this.completion = Completion.ConstructorComplete;
                    }
                }
            }

            if (completeSelf)
            {
                Complete(true);
            }
        }

        void OnWorkflowCorrelationProcessingComplete(Message updatedMessage)
        {
            this.message = updatedMessage;

            // We have to do this dance with the lock because
            // we aren't sure if we've been running sync or not.
            // NOTE: It is possible for us to go async and
            // still decide we're completing sync. This is fine
            // as it does not violate the async pattern since
            // the work is done by the time Begin completes.
            bool completeSelf = false;

            lock (this.thisLock)
            {
                if (this.completion == Completion.ConstructorComplete)
                {
                    completeSelf = true;
                }
                else
                {
                    Fx.Assert(this.completion == Completion.None, "We must be not ready then.");
                    this.completion = Completion.WorkflowCorrelationProcessingComplete;
                }
            }

            if (completeSelf)
            {
                Complete(false);
            }
        }

        public static Message End(IAsyncResult result)
        {
            FinalizeCorrelationAsyncResult thisPtr = AsyncResult.End<FinalizeCorrelationAsyncResult>(result);
            return thisPtr.message;
        }

        // This three state enum allows us to determine whether
        // we are the first or second code path. The second
        // code path needs to complete the async result.
        enum Completion
        {
            None,
            ConstructorComplete,
            WorkflowCorrelationProcessingComplete
        }
    }
}

// Serializable wrapper whose payload is intentionally not serialized.
[DataContract]
class VolatileSendMessageInstance
{
    public VolatileSendMessageInstance()
    {
    }

    // Note that we do not mark this DataMember since we don't want it to be serialized
    public SendMessageInstance Instance { get; set; }
}

// Represents an item in our object cache. Stores a ChannelFactory and an associated pool of channels
// NOTE(review): the generic arguments on the members below were lost in extraction; Pool<IChannel> and
// the EndpointAddress cache key are presumed - confirm against the rest of this class.
internal sealed class ChannelFactoryReference : IDisposable
{
    static Action<Pool<IChannel>> disposeChannelPool = new Action<Pool<IChannel>>(DisposeChannelPool);
    static AsyncCallback onDisposeCommunicationObject = Fx.ThunkCallback(new AsyncCallback(OnDisposeCommunicationObject));

    readonly FactoryCacheKey factoryKey;
    readonly ServiceEndpoint targetEndpoint;
    ChannelFactory channelFactory;
    ObjectCache<EndpointAddress, Pool<IChannel>> channelCache;
    CorrelationQueryBehavior correlationQueryBehavior;
    Func<Pool<IChannel>> createChannelCacheItem;

    public ChannelFactoryReference(FactoryCacheKey factoryKey, ServiceEndpoint targetEndpoint, ChannelCacheSettings channelCacheSettings)
    {
        Fx.Assert(channelCacheSettings != null, " channelCacheSettings should not be null");
        Fx.Assert(factoryKey != null, " factoryKey should not be null");
        Fx.Assert(targetEndpoint != null, " targetEndpoint should not be null");

        this.factoryKey = factoryKey;
        this.targetEndpoint = targetEndpoint;

        // one-way contracts send through IOutputChannel, two-way contracts through IRequestChannel
        if (factoryKey.IsOperationContractOneWay)
        {
            this.channelFactory = new ChannelFactory<IOutputChannel>(targetEndpoint);
        }
        else
        {
            this.channelFactory = new ChannelFactory<IRequestChannel>(targetEndpoint);
        }

        this.channelFactory.UseActiveAutoClose = true;
        this.channelFactory.Credentials.Windows.AllowedImpersonationLevel = factoryKey.TokenImpersonationLevel;

        ObjectCacheSettings channelSettings = new ObjectCacheSettings
        {
            CacheLimit = channelCacheSettings.MaxItemsInCache,
            IdleTimeout = channelCacheSettings.IdleTimeout,
            LeaseTimeout = channelCacheSettings.LeaseTimeout
        };

        // our channel cache is keyed solely on endpoint since we don't allow the via to be dynamic
        // for a ChannelFactoryReference
        this.channelCache
= new ObjectCache<EndpointAddress, Pool<IChannel>>(channelSettings)
                {
                    DisposeItemCallback = disposeChannelPool
                };

                // Each cache entry is a pool of channels sized by the same MaxItemsInCache setting.
                this.createChannelCacheItem = () => new Pool<IChannel>(channelCacheSettings.MaxItemsInCache);
            }

            // Looks up the CorrelationQueryBehavior on the target endpoint on first use and caches it.
            public CorrelationQueryBehavior CorrelationQueryBehavior
            {
                get
                {
                    if (this.correlationQueryBehavior == null)
                    {
                        this.correlationQueryBehavior = this.targetEndpoint.Behaviors.Find<CorrelationQueryBehavior>();
                    }

                    return this.correlationQueryBehavior;
                }
            }

            // As a perf optimization, we provide this property to avoid async result/callback creations
            public bool NeedsOpen
            {
                get
                {
                    return this.channelFactory.State == CommunicationState.Created;
                }
            }

            public IAsyncResult BeginOpen(AsyncCallback callback, object state)
            {
                Fx.Assert(NeedsOpen, "caller should check NeedsOpen first");
                return this.channelFactory.BeginOpen(callback, state);
            }

            // after open we should be added to a cache if one is provided
            // Returns the cache item produced by factoryCache.Add, or null when no cache was supplied.
            public ObjectCacheItem<ChannelFactoryReference> EndOpen(IAsyncResult result, ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache)
            {
                this.channelFactory.EndOpen(result);

                ObjectCacheItem<ChannelFactoryReference> cacheItem = null;
                if (factoryCache != null)
                {
                    cacheItem = factoryCache.Add(this.factoryKey, this);
                }

                return cacheItem;
            }

            [SuppressMessage(FxCop.Category.Usage, FxCop.Rule.DisposableFieldsShouldBeDisposed,
                Justification = "disposable field is being disposed using DisposeCommunicationObject")]
            public void Dispose()
            {
                DisposeCommunicationObject(this.channelFactory);
            }

            // Takes a pooled channel for the given address, or creates a new one when the pool is
            // empty or the pooled channel is no longer opened. channelPool receives the cache item
            // that must later be handed back through ReturnChannel.
            public IChannel TakeChannel(EndpointAddress endpointAddress, out ObjectCacheItem<Pool<IChannel>> channelPool)
            {
                channelPool = this.channelCache.Take(endpointAddress, this.createChannelCacheItem);
                Fx.Assert(channelPool != null, "Take with delegate should always return a valid Item");

                IChannel result;
                lock (channelPool.Value)
                {
                    result = channelPool.Value.Take();
                }

                // make an effort to kill stale channels
                ServiceChannel serviceChannel = result as ServiceChannel;
                if (result != null &&
                    (result.State != CommunicationState.Opened ||
                    (serviceChannel != null && serviceChannel.Binder.Channel.State != CommunicationState.Opened)))
                {
                    result.Abort();
                    result = null;
                }

                if (result == null)
                {
                    Uri via = null;

                    // service endpoint always sets the ListenUri, which will break default callback-context behavior
                    if (this.targetEndpoint.Address != null && this.targetEndpoint.Address.Uri != this.targetEndpoint.ListenUri)
                    {
                        via = this.targetEndpoint.ListenUri;
                    }

                    // The cast must match the factory type chosen in the constructor.
                    if (this.factoryKey.IsOperationContractOneWay)
                    {
                        result = ((ChannelFactory<IOutputChannel>)this.channelFactory).CreateChannel(endpointAddress, via);
                    }
                    else
                    {
                        result = ((ChannelFactory<IRequestChannel>)this.channelFactory).CreateChannel(endpointAddress, via);
                    }
                }

                if (!(result is ServiceChannel))
                {
                    result = ServiceChannelFactory.GetServiceChannel(result);
                }

                return result;
            }

            // Hands a channel back: an opened channel is offered to the pool, and any channel that
            // is not opened or that the pool refuses is closed. The pool's cache reference is
            // released in every case.
            public void ReturnChannel(IChannel channel, ObjectCacheItem<Pool<IChannel>> channelPool)
            {
                bool shouldDispose = channel.State != CommunicationState.Opened;

                // channel is in open state, try returning it to the pool
                if (!shouldDispose)
                {
                    lock (channelPool.Value)
                    {
                        shouldDispose = !channelPool.Value.Return(channel);
                    }
                }

                if (shouldDispose)
                {
                    // not caching the channel, so we need to close it
                    DisposeCommunicationObject(channel);
                }

                // and return our cache item
                channelPool.ReleaseReference();
            }

            static void DisposeChannelPool(Pool<IChannel> channelPool)
            {
                IChannel channel;

                // we don't need to lock the Take from the Pool here since no one will be accessing this anymore
                // Dispose will be called under a lock from the ObjectCacheItem
                while ((channel = channelPool.Take()) != null)
                {
                    DisposeCommunicationObject(channel);
                }
            }

            // Starts an asynchronous close of an opened communication object. Anything that is not
            // opened, or whose close fails with a communication/timeout exception, is aborted.
            static void DisposeCommunicationObject(ICommunicationObject communicationObject)
            {
                bool success = false;
                try
                {
                    if (communicationObject.State == CommunicationState.Opened)
                    {
                        IAsyncResult result = communicationObject.BeginClose(ServiceDefaults.CloseTimeout, onDisposeCommunicationObject, communicationObject);
                        if (result.CompletedSynchronously)
                        {
                            communicationObject.EndClose(result);
                        }

                        // When the close is still pending, OnDisposeCommunicationObject finishes it.
                        success = true;
                    }
                }
                catch (CommunicationException)
                {
                    // expected, we'll abort
                }
                catch (TimeoutException)
                {
                    // expected, we'll abort
                }
                finally
                {
                    if (!success)
                    {
                        communicationObject.Abort();
                    }
                }
            }

            // Completion callback for the asynchronous close started in DisposeCommunicationObject.
            static void OnDisposeCommunicationObject(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    // The initiating thread already called EndClose.
                    return;
                }

                ICommunicationObject communicationObject = (ICommunicationObject)result.AsyncState;
                bool success = false;
                try
                {
                    communicationObject.EndClose(result);
                    success = true;
                }
                catch (CommunicationException)
                {
                    // expected, we'll abort
                }
                catch (TimeoutException)
                {
                    // expected, we'll abort
                }
                finally
                {
                    if (!success)
                    {
                        communicationObject.Abort();
                    }
                }
            }
        }

        // Key used to decide whether two senders can share a cached channel factory.
        internal class FactoryCacheKey : IEquatable<FactoryCacheKey>
        {
            Endpoint endpoint;
            bool isOperationContractOneWay;
            TokenImpersonationLevel tokenImpersonationLevel;
            ContractDescription contract;
            Collection<CorrelationQuery> correlationQueries;
            string endpointConfigurationName;

            public FactoryCacheKey(Endpoint endpoint, string endpointConfigurationName, bool isOperationOneway,
                TokenImpersonationLevel tokenImpersonationLevel, ContractDescription contractDescription,
                ICollection<CorrelationQuery> correlationQueries)
            {
                this.endpoint = endpoint;
                this.endpointConfigurationName = endpointConfigurationName;
                this.isOperationContractOneWay = isOperationOneway;
                this.tokenImpersonationLevel = tokenImpersonationLevel;
                this.contract = contractDescription;

                // Copy the queries so the key keeps its own collection; a null input leaves the field null.
                if (correlationQueries != null)
                {
                    this.correlationQueries = new Collection<CorrelationQuery>();
                    foreach (CorrelationQuery query in correlationQueries)
                    {
                        this.correlationQueries.Add(query);
                    }
                }
            }

            public bool IsOperationContractOneWay
            {
                get
                {
                    return this.isOperationContractOneWay;
                }
            }

            public TokenImpersonationLevel TokenImpersonationLevel
            {
                get
                {
                    return this.tokenImpersonationLevel;
                }
            }

            public bool Equals(FactoryCacheKey other)
            {
                if (object.ReferenceEquals(this, other))
                {
                    return true;
                }

                if (other == null)
                {
                    // this means only one of them is null
                    return false;
                }

                // 1) Compare Endpoint/EndpointConfigurationName
                if ((this.endpoint == null && other.endpoint != null) ||
                    (other.endpoint == null && this.endpoint != null))
                {
                    return false;
                }

                // if endpoint is not null we compare the endpoint, else we compare the endpointconfiguration
                if (this.endpoint != null)
                {
if (!object.ReferenceEquals(this.endpoint, other.endpoint))
                    {
                        // Binding -
                        // We are comparing by ref here, can we compare binding elements instead
                        if (this.endpoint.Binding != other.endpoint.Binding)
                        {
                            return false;
                        }
                    }
                }
                else if (this.endpointConfigurationName != other.endpointConfigurationName)
                {
                    return false;
                }

                // (2) TokenImpersonationlevel
                if (this.TokenImpersonationLevel != other.TokenImpersonationLevel)
                {
                    return false;
                }

                // (3) OperationContract.IsOneWay to decide if the ChannelFactory needs to be of type RequestChannel or OutputChannel
                if (this.IsOperationContractOneWay != other.IsOperationContractOneWay)
                {
                    return false;
                }

                // (4) Verify if the ContractDescriptions are equivalent
                if (!ContractDescriptionComparerHelper.IsContractDescriptionEquivalent(this.contract, other.contract))
                {
                    return false;
                }

                // (5) Verify the correlationquery collection
                // For now, we verify each query by ref, so that loop scenarios would work
                // Can we do a value comparison here?
                if (!ContractDescriptionComparerHelper.EqualsUnordered(this.correlationQueries, other.correlationQueries))
                {
                    return false;
                }

                return true;
            }

            // Keeps object-based equality consistent with IEquatable<FactoryCacheKey> and GetHashCode.
            public override bool Equals(object obj)
            {
                return this.Equals(obj as FactoryCacheKey);
            }

            // Combines the contract name and the binding reference. Keys that are equal per Equals
            // share both, so they hash alike; other fields are deliberately left out.
            public override int GetHashCode()
            {
                int hashCode = 0;

                if (this.contract != null && this.contract.Name != null)
                {
                    //using ContractName as the hashcode
                    hashCode ^= this.contract.Name.GetHashCode();
                }

                if (this.endpoint != null && this.endpoint.Binding != null)
                {
                    //we compare binding by ref
                    hashCode ^= this.endpoint.Binding.GetHashCode();
                }

                return hashCode;
            }

            // Structural comparison helpers used by FactoryCacheKey.Equals.
            static class ContractDescriptionComparerHelper
            {
                // Compares two collections by reference equality of their items, ignoring order.
                public static bool EqualsUnordered<T>(Collection<T> left, Collection<T> right) where T : class
                {
                    return EqualsUnordered(left, right, (t1, t2) => t1 == t2);
                }

                // Returns true when both contracts are the same instance, or when both have a
                // ContractType and match on every compared property and on their operations.
                public static bool IsContractDescriptionEquivalent(ContractDescription c1, ContractDescription c2)
                {
                    if (c1 == c2)
                    {
                        return true;
                    }

                    // Exactly one of them is null here; guard before touching any member.
                    if (c1 == null || c2 == null)
                    {
                        return false;
                    }

                    // if the contract is not one of the default contracts that we use, we only do a byref comparison
                    // fully inferred contracts always have null ContractType
                    if (c1.ContractType == null || c2.ContractType == null)
                    {
                        return false;
                    }

                    //compare contractname
                    return (c1.Name == c2.Name &&
                        c1.Namespace == c2.Namespace &&
                        c1.ConfigurationName == c2.ConfigurationName &&
                        c1.ProtectionLevel == c2.ProtectionLevel &&
                        c1.SessionMode == c2.SessionMode &&
                        c1.ContractType == c2.ContractType &&
                        c1.Behaviors.Count == c2.Behaviors.Count && //we have no way to verify each one
                        EqualsUnordered<OperationDescription>(c1.Operations, c2.Operations, (o1, o2) => IsOperationDescriptionEquivalent(o1, o2)));
                }

                // Position-by-position comparison. A null list is treated as equal to an empty one.
                static bool EqualsOrdered<T>(IList<T> left, IList<T> right, Func<T, T, bool> equals)
                {
                    if (left == null)
                    {
                        return (right == null || right.Count == 0);
                    }
                    else if (right == null)
                    {
                        return left.Count == 0;
                    }

                    if (left.Count != right.Count)
                    {
                        return false;
                    }

                    for (int i = 0; i < left.Count; i++)
                    {
                        if (!equals(left[i], right[i]))
                        {
                            return false;
                        }
                    }

                    return true;
                }

                // Order-insensitive comparison. A null collection is treated as equal to an empty one.
                static bool EqualsUnordered<T>(Collection<T> left, Collection<T> right, Func<T, T, bool> equals)
                {
                    if (left == null)
                    {
                        return (right == null || right.Count == 0);
                    }
                    else if (right == null)
                    {
                        return left.Count == 0;
                    }

                    // This check ensures that the lists have the same contents, but does not verify that they have the same
                    // quantity of each item if they are duplicates.
                    return left.Count == right.Count &&
                        left.All(leftItem => right.Any(rightItem => equals(leftItem, rightItem))) &&
                        right.All(rightItem => left.Any(leftItem => equals(leftItem, rightItem)));
                }

                static bool IsOperationDescriptionEquivalent(OperationDescription o1, OperationDescription o2)
                {
                    if (o1 == o2)
                    {
                        return true;
                    }

                    // Exactly one of them is null here.
                    if (o1 == null || o2 == null)
                    {
                        return false;
                    }

                    return (o1.Name == o2.Name &&
                        o1.ProtectionLevel == o2.ProtectionLevel &&
                        o1.IsOneWay == o2.IsOneWay &&
                        IsTransactionBehaviorEquivalent(o1, o2) && //we are verifying only the TransactionFlowBehavior
                        EqualsOrdered(o1.Messages, o2.Messages, (m1, m2) => IsMessageDescriptionEquivalent(m1, m2)));
                }

                static bool IsMessageDescriptionEquivalent(MessageDescription m1, MessageDescription m2)
                {
                    if (m1 == m2)
                    {
                        return true;
                    }

                    // Exactly one of them is null here.
                    if (m1 == null || m2 == null)
                    {
                        return false;
                    }

                    //we are comparing only action and direction
                    return (m1.Action == m2.Action && m1.Direction == m2.Direction);
                }

                // Two operations match when they have the same number of behaviors and agree on
                // the presence and TransactionFlowOption of their TransactionFlowAttribute.
                static bool IsTransactionBehaviorEquivalent(OperationDescription o1, OperationDescription o2)
                {
                    if (o1 == null || o2 == null)
                    {
                        // Equivalent only when both are null; never dereference a null operand.
                        return o1 == o2;
                    }

                    if (o1.Behaviors.Count != o2.Behaviors.Count)
                    {
                        return false;
                    }

                    //we are only going to check the TransactionFlowAttribute
                    TransactionFlowAttribute t1 = o1.Behaviors.Find<TransactionFlowAttribute>();
                    TransactionFlowAttribute t2 = o2.Behaviors.Find<TransactionFlowAttribute>();

                    if (t1 == null || t2 == null)
                    {
                        // Matches only when neither operation carries the attribute.
                        return t1 == null && t2 == null;
                    }

                    //verify if both have the same value for TransactionFlowOption
                    return t1.Transactions == t2.Transactions;
                }
            }
        }
    }
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- UserNameServiceElement.cs
- DefaultAsyncDataDispatcher.cs
- XmlSubtreeReader.cs
- SymbolType.cs
- DataGridViewHitTestInfo.cs
- SqlCacheDependencyDatabaseCollection.cs
- WebPartHeaderCloseVerb.cs
- GridViewColumnCollection.cs
- ExpandableObjectConverter.cs
- ToolStripScrollButton.cs
- ExpressionBuilder.cs
- MultiDataTrigger.cs
- WebPartConnection.cs
- TraceXPathNavigator.cs
- NativeActivityContext.cs
- DynamicDocumentPaginator.cs
- SystemUdpStatistics.cs
- _Connection.cs
- SecurityRuntime.cs
- Rules.cs
- PathGeometry.cs
- IncrementalHitTester.cs
- dataSvcMapFileLoader.cs
- PrePostDescendentsWalker.cs
- TextInfo.cs
- MenuStrip.cs
- UserMapPath.cs
- WsdlExporter.cs
- StackBuilderSink.cs
- WindowsContainer.cs
- RootCodeDomSerializer.cs
- BinaryUtilClasses.cs
- HeaderUtility.cs
- SqlSupersetValidator.cs
- ToolStripItemClickedEventArgs.cs
- DesignTimeResourceProviderFactoryAttribute.cs
- StyleTypedPropertyAttribute.cs
- FirstMatchCodeGroup.cs
- ProviderUtil.cs
- DataGridViewRowPostPaintEventArgs.cs
- ScriptControlDescriptor.cs
- BufferBuilder.cs
- XpsFixedPageReaderWriter.cs
- DataConnectionHelper.cs
- CodeAttributeArgumentCollection.cs
- LingerOption.cs
- BinaryUtilClasses.cs
- ToolStripSplitStackLayout.cs
- ModuleBuilder.cs
- CodeDelegateInvokeExpression.cs
- DesigntimeLicenseContext.cs
- XpsFixedPageReaderWriter.cs
- AssertSection.cs
- UserPersonalizationStateInfo.cs
- ContainerControl.cs
- TextSelectionHighlightLayer.cs
- HostnameComparisonMode.cs
- listitem.cs
- UserPreferenceChangingEventArgs.cs
- TimeSpanValidator.cs
- UndirectedGraph.cs
- SourceLineInfo.cs
- VariableQuery.cs
- FixedFlowMap.cs
- CustomPopupPlacement.cs
- ScrollBar.cs
- TemplatingOptionsDialog.cs
- XslException.cs
- SiblingIterators.cs
- NumberSubstitution.cs
- SimpleWorkerRequest.cs
- SQLInt32Storage.cs
- WizardStepBase.cs
- Pkcs9Attribute.cs
- ProjectionPathSegment.cs
- Preprocessor.cs
- ComAdminWrapper.cs
- BrowserDefinition.cs
- FormViewDesigner.cs
- DataGridViewTextBoxColumn.cs
- PathGeometry.cs
- Base64Encoding.cs
- TransformConverter.cs
- ProjectionCamera.cs
- ArgumentNullException.cs
- EditingCoordinator.cs
- Geometry3D.cs
- Exception.cs
- XmlExtensionFunction.cs
- HtmlInputReset.cs
- ThemeableAttribute.cs
- ParenthesizePropertyNameAttribute.cs
- BinaryParser.cs
- Decorator.cs
- FontSizeConverter.cs
- BlobPersonalizationState.cs
- ObservableCollection.cs
- InvalidStoreProtectionKeyException.cs
- SafeCryptContextHandle.cs
- Point4D.cs