// InternalSendMessage.cs - source code for the .NET Framework 4.0, in C#.
//
// Original location:
// 4.0/4.0/untmp/DEVDIV_TFS/Dev10/Releases/RTMRel/ndp/cdf/src/NetFx40/System.ServiceModel.Activities/System/ServiceModel/Activities/InternalSendMessage.cs/1480445/InternalSendMessage.cs

                            //------------------------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------

namespace System.ServiceModel.Activities 
{
    using System; 
    using System.Activities; 
    using System.Activities.Statements;
    using System.Collections.Generic; 
    using System.Collections.ObjectModel;
    using System.Diagnostics;
    using System.Diagnostics.CodeAnalysis;
    using System.Linq; 
    using System.Runtime;
    using System.Runtime.Collections; 
    using System.Runtime.Diagnostics; 
    using System.Runtime.Serialization;
    using System.Security.Principal; 
    using System.ServiceModel.Activities.Description;
    using System.ServiceModel.Activities.Dispatcher;
    using System.ServiceModel.Activities.Tracking;
    using System.ServiceModel.Channels; 
    using System.ServiceModel.Description;
    using System.ServiceModel.Diagnostics; 
    using System.Transactions; 
    using System.Xml.Linq;
 

    // InternalSendMessage encapsulates both the server and client send.  For the server
    // send it provides the ability to persist after correlations have been initialized
    // but before the send has actually been completed by the channel stack.  This is not 
    // supported by client send.
 
    class InternalSendMessage : NativeActivity 
    {
        // Full type name of RuntimeTransactionHandle, computed once for the type.
        static string runtimeTransactionHandlePropertyName = typeof(RuntimeTransactionHandle).FullName;

        // Explicit correlation OM. Both collections are created lazily by their property getters.
        Collection<CorrelationInitializer> correlationInitializers;
        Collection<CorrelationQuery> replyCorrelationQueries;

        // Query collection of the endpoint's CorrelationQueryBehavior; assigned by
        // EnsureCorrelationQueryBehavior once such a behavior exists.
        ICollection<CorrelationQuery> correlationQueries;

        // Lazily resolved from the cached endpoint's binding (see GetMessageVersion).
        MessageVersion messageVersion;

        // Values cached so they are not rebuilt on every send.
        ContractDescription cachedContract;
        ServiceEndpoint cachedServiceEndpoint;
        AddressHeaderCollection cachedEndpointHeaderCollection;
        FactoryCacheKey cachedFactoryCacheKey;

        // Result of IsEndpointSettingsSafeForCache and the flag recording that it has been computed.
        bool isConfigSettingsSecure;
        bool configVerified;

        // NOTE(review): the first type argument was missing from this declaration and has been
        // reconstructed as ObjectCacheItem<ChannelFactoryReference>; confirm against the code that
        // reads and writes this field.
        KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache> lastUsedFactoryCacheItem;


        // this will be scheduled if ShouldPersistBeforeSend is set to true
        Activity persist;

        // Implementation children and variables; all of them are registered in CacheMetadata.
        WaitOnChannelCorrelation channelCorrelationCompletionWaiter;
        Variable<VolatileSendMessageInstance> sendMessageInstance;
        Variable<NoPersistHandle> noPersistHandle;

        OpenChannelFactory openChannelFactory;
        OpenChannelAndSendMessage openChannelAndSendMessage;

        FaultCallback onSendFailure;

        // Creates the activity with its implementation variables and child activities.
        // The child activities share the sendMessageInstance variable so that they all
        // operate on the same SendMessageInstance.
        public InternalSendMessage()
        {
            this.TokenImpersonationLevel = TokenImpersonationLevel.Identification;

            // Variable is abstract; the element types are the ones Abort/Execute read back
            // from these variables (VolatileSendMessageInstance and NoPersistHandle).
            this.sendMessageInstance = new Variable<VolatileSendMessageInstance>();
            this.channelCorrelationCompletionWaiter = new WaitOnChannelCorrelation { Instance = this.sendMessageInstance };

            this.noPersistHandle = new Variable<NoPersistHandle>();

            this.openChannelFactory = new OpenChannelFactory { Instance = this.sendMessageInstance };
            this.openChannelAndSendMessage = new OpenChannelAndSendMessage { Instance = this.sendMessageInstance, InternalSendMessage = this, };
        }
 
        // Token impersonation level; the constructor initializes it to Identification.
        public TokenImpersonationLevel TokenImpersonationLevel { get; set; }
 
        // The Endpoint describes the service to talk to, while EndpointAddress is used to
        // set the Uri at runtime (for example in the duplex scenario).
        public Endpoint Endpoint { get; set; }

        // Name of the endpoint configuration. CreateServiceEndpoint hands it to
        // InitializeEndpoint (null becomes String.Empty) when no Endpoint is set.
        public string EndpointConfigurationName { get; set; }
 
        // Needed for the callback case: optional address that CreateEndpointAddress reads
        // at runtime and prefers over the endpoint's own Uri.
        // NOTE(review): CreateEndpointAddress assigns the argument's value to a Uri, so the
        // declared type was presumably InArgument<Uri> before its type argument was lost - confirm.
        public InArgument EndpointAddress { get; set; }
 
        // Correlation handle argument; CacheMetadata binds it to an In argument of
        // Constants.CorrelationHandleType.
        // NOTE(review): the generic type argument of InArgument appears to be missing - confirm.
        public InArgument CorrelatesWith { get; set; }
 
        // Operation name; passed to ContractInferenceHelper.EnsureTransactionFlowOnContract.
        public string OperationName { get; set; }

        // Action; passed to ContractInferenceHelper.EnsureTransactionFlowOnContract.
        public string Action { get; set; }
 
        // Cache for the internal implementation; this should be set by the Send.
        // It should only be used in the initiating send, and should be used
        // instead of OperationContract.IsOneWay.
        public bool IsOneWay { get; set; }
 
        // Always true for this activity.
        protected override bool CanInduceIdle
        {
            get { return true; }
        }
 
        // Flag for Send/SendReply: tells whether this is the client-side send
        // or the receive-side send reply.
        internal bool IsSendReply { get; set; }

        // Used for cleaning up the Message variable. When set, CacheMetadata binds it to
        // an Out argument named "MessageReference".
        // NOTE(review): the generic type argument of OutArgument appears to be missing - confirm.
        internal OutArgument MessageOut { get; set; }

        // Decides whether to persist before sending the message. When true, CacheMetadata
        // registers the Persist child activity.
        internal bool ShouldPersistBeforeSend
        {
            get;
            set;
        }
 
        // Correlation initializers for this send. The backing collection is created on
        // first access, so the getter never returns null.
        public Collection<CorrelationInitializer> CorrelationInitializers
        {
            get
            {
                if (this.correlationInitializers == null)
                {
                    // Collection<T> has no non-generic form; the element type is the one
                    // CacheMetadata enumerates (CorrelationInitializer).
                    this.correlationInitializers = new Collection<CorrelationInitializer>();
                }
                return this.correlationInitializers;
            }
        }
 
        // Passed in from the parent Send activity; EnsureCorrelationQueryBehavior adds it
        // to the endpoint's CorrelationQueryBehavior when it is not null.
        public CorrelationQuery CorrelationQuery { get; set; }
 
        // This needs to be set by the ReceiveReply, we assume that this is unique.
        // The backing collection is created on first access, so the getter never returns null.
        internal ICollection<CorrelationQuery> ReplyCorrelationQueries
        {
            get
            {
                if (this.replyCorrelationQueries == null)
                {
                    // The element type is the one EnsureCorrelationQueryBehavior enumerates
                    // (CorrelationQuery).
                    this.replyCorrelationQueries = new Collection<CorrelationQuery>();
                }

                return this.replyCorrelationQueries;
            }
        }

        // On the server side the ContractName is set during ContractInference and is used
        // for retrieving the correct CorrelationQueryBehavior, so on the server side it can
        // differ from what is set on the OM.
        public XName ServiceContractName { get; set; }

        // The message to send; CacheMetadata binds it to an In argument of Constants.MessageType.
        // NOTE(review): the generic type argument of InArgument appears to be missing - confirm.
        public InArgument Message { get; set; }
 
        // The owning Send activity; consulted for ProtectionLevel, ChannelCacheEnabled,
        // OperationUsesMessageContract and OperationDescription.
        internal Send Parent { get; set; }
 
        // The ServiceEndpoint is cached for perf reasons, so that the endpoint address,
        // contract etc. can be retrieved without creating a new ServiceEndpoint each time.
        // Note: the cached instance must not be passed to the ChannelFactory, because a
        // distinct instance is needed per factory.
        ServiceEndpoint GetCachedServiceEndpoint()
        {
            if (this.cachedServiceEndpoint != null)
            {
                return this.cachedServiceEndpoint;
            }

            // First request: build the endpoint and remember it.
            this.cachedServiceEndpoint = CreateServiceEndpoint();
            return this.cachedServiceEndpoint;
        }

        // Returns the AddressHeaderCollection built from Endpoint.Headers, creating it on the
        // first call. Callers must make sure that Endpoint is not null.
        AddressHeaderCollection GetCachedEndpointHeaders()
        {
            Fx.Assert(this.Endpoint != null, "Endpoint should not be null");

            if (this.cachedEndpointHeaderCollection != null)
            {
                return this.cachedEndpointHeaderCollection;
            }

            this.cachedEndpointHeaderCollection = new AddressHeaderCollection(this.Endpoint.Headers);
            return this.cachedEndpointHeaderCollection;
        }

        // Completes the endpoint from configuration. A standard endpoint found under
        // configurationName replaces serviceEndpoint entirely; otherwise the channel
        // behaviors are loaded into serviceEndpoint unless it is already fully configured.
        void InitializeEndpoint(ref ServiceEndpoint serviceEndpoint, string configurationName)
        {
            ServiceEndpoint standardEndpoint = null;
            if (configurationName != null)
            {
                // load the standard endpoint from the config
                standardEndpoint = ConfigLoader.LookupEndpoint(configurationName, null, serviceEndpoint.Contract);
            }

            if (standardEndpoint != null)
            {
                // standard endpoint case: it can completely override the endpoint
                serviceEndpoint = standardEndpoint;
                return;
            }

            // normal endpoint case
            if (serviceEndpoint.IsFullyConfigured)
            {
                return;
            }

            new ConfigLoader().LoadChannelBehaviors(serviceEndpoint, configurationName);
        }
 
        // Used to create ChannelFactoryReference instances. The serviceEndpoint is not cached
        // directly because a distinct instance is needed per factory, so it is cached behind
        // the scenes as part of the ChannelFactoryReference.
        //
        // Builds a new ServiceEndpoint from the (cached or freshly inferred) contract, fills in
        // binding and address from the Endpoint OM or from configuration, and makes sure the
        // correlation query behavior is in place.
        ServiceEndpoint CreateServiceEndpoint()
        {
            // TransactionFlow only needs verifying when the contract has not been cached yet;
            // a cached contract is fixed for the workflow definition.
            bool ensureTransactionFlow = this.cachedContract == null;

            ContractDescription contract;
            if (ensureTransactionFlow)
            {
                contract = this.GetContractDescription();
            }
            else
            {
                contract = this.cachedContract;
            }

            ServiceEndpoint endpoint = new ServiceEndpoint(contract);

            if (this.Endpoint == null)
            {
                // This method is only called on the client side, so without an Endpoint the
                // binding and behaviors are loaded from config with endpointConfigurationName
                // (null is translated to String.Empty).
                if (this.ServiceContractName != null)
                {
                    endpoint.Contract.ConfigurationName = this.ServiceContractName.LocalName;
                }
                InitializeEndpoint(ref endpoint, this.EndpointConfigurationName ?? string.Empty);
            }
            else
            {
                endpoint.Binding = this.Endpoint.Binding;
                if (this.Endpoint.AddressUri != null)
                {
                    endpoint.Address = new EndpointAddress(this.Endpoint.AddressUri, this.Endpoint.Identity, this.GetCachedEndpointHeaders());
                }
            }

            if (ensureTransactionFlow)
            {
                EnsureTransactionFlowOnContract(ref endpoint);
                this.cachedContract = endpoint.Contract;
            }

            EnsureCorrelationQueryBehavior(endpoint);
            return endpoint;
        }
 
        // Makes sure the endpoint carries a CorrelationQueryBehavior whenever correlation
        // queries apply, merges the queries of this Send and of its ReceiveReplies into that
        // behavior, and stores the behavior's query collection in this.correlationQueries.
        // When no behavior exists and none is needed, this.correlationQueries is left untouched.
        void EnsureCorrelationQueryBehavior(ServiceEndpoint serviceEndpoint)
        {
            // Find<T>() requires its type argument; it is fixed by the declared type of the local.
            CorrelationQueryBehavior correlationQueryBehavior = serviceEndpoint.Behaviors.Find<CorrelationQueryBehavior>();
            if (correlationQueryBehavior == null)
            {
                // Add CorrelationQueryBehavior if either Binding has queries or if either Send or ReceiveReplies
                // have correlation query associated with them
                if (CorrelationQueryBehavior.BindingHasDefaultQueries(serviceEndpoint.Binding)
                    || this.CorrelationQuery != null
                    || this.ReplyCorrelationQueries.Count > 0)
                {
                    // Starts out empty; the queries are added below.
                    correlationQueryBehavior = new CorrelationQueryBehavior(new Collection<CorrelationQuery>());
                    serviceEndpoint.Behaviors.Add(correlationQueryBehavior);
                }
            }
            if (correlationQueryBehavior != null)
            {
                // add CorrelationQuery from Send
                if (this.CorrelationQuery != null && !correlationQueryBehavior.CorrelationQueries.Contains(this.CorrelationQuery))
                {
                    correlationQueryBehavior.CorrelationQueries.Add(this.CorrelationQuery);
                }

                // add ReplyCorrelationQueries from ReceiveReply (there could be multiple ReceiveReplies
                // for a Send, hence the collection)
                foreach (CorrelationQuery query in this.ReplyCorrelationQueries)
                {
                    // Filter out duplicate CorrelationQueries in the collection.
                    // Currently, we only do reference comparison and Where message filter comparison.
                    if (!correlationQueryBehavior.CorrelationQueries.Contains(query))
                    {
                        correlationQueryBehavior.CorrelationQueries.Add(query);
                    }
                    else
                    {
                        // Duplicates are skipped and only traced.
                        if (TD.DuplicateCorrelationQueryIsEnabled())
                        {
                            TD.DuplicateCorrelationQuery(query.Where.ToString());
                        }
                    }
                }

                this.correlationQueries = correlationQueryBehavior.CorrelationQueries;
            }
        }
 
        // Gives the correlation behavior a ScopeName when it has none yet, taking it from the
        // CorrelationExtension registered with the context. Nothing happens when the behavior
        // already has a scope name or when no such extension is available.
        // The caller must pass a non-null correlationBehavior.
        static void EnsureCorrelationBehaviorScopeName(ActivityContext context, CorrelationQueryBehavior correlationBehavior)
        {
            Fx.Assert(correlationBehavior != null, "caller must verify");
            if (correlationBehavior.ScopeName == null)
            {
                // GetExtension<T>() requires its type argument; it is fixed by the declared
                // type of the local.
                CorrelationExtension extension = context.GetExtension<CorrelationExtension>();
                if (extension != null)
                {
                    correlationBehavior.ScopeName = extension.ScopeName;
                }
            }
        }

        // For a non-one-way send whose binding contains a TransactionFlowBindingElement with
        // Transactions enabled, asks ContractInferenceHelper to ensure transaction flow on the
        // endpoint's contract. One-way sends and bindings without transaction flow are left as is.
        void EnsureTransactionFlowOnContract(ref ServiceEndpoint serviceEndpoint)
        {
            if (!this.IsOneWay)
            {
                BindingElementCollection elementCollection = serviceEndpoint.Binding.CreateBindingElements();
                // Find<T>() requires its type argument; it is fixed by the declared type of the local.
                TransactionFlowBindingElement bindingElement = elementCollection.Find<TransactionFlowBindingElement>();
                bool flowTransaction = ((bindingElement != null) && (bindingElement.Transactions));
                if (flowTransaction)
                {
                    ContractInferenceHelper.EnsureTransactionFlowOnContract(ref serviceEndpoint,
                        this.ServiceContractName, this.OperationName, this.Action, this.Parent.ProtectionLevel);
                }
            }
        }

        // Returns the MessageVersion of the cached endpoint's binding, remembering it once it
        // has been resolved. Returns null while the endpoint or its binding is unavailable.
        internal MessageVersion GetMessageVersion()
        {
            if (this.messageVersion != null)
            {
                return this.messageVersion;
            }

            ServiceEndpoint cachedEndpoint = this.GetCachedServiceEndpoint();
            if (cachedEndpoint != null && cachedEndpoint.Binding != null)
            {
                this.messageVersion = cachedEndpoint.Binding.MessageVersion;
            }
            else
            {
                this.messageVersion = null;
            }

            return this.messageVersion;
        }
 
        // Produces the ContractDescription for this send.
        // When the channel cache is disabled or the operation uses a message contract, the
        // fully inferred description is used; otherwise a fixed description is used to
        // increase channel cache hits.
        ContractDescription GetContractDescription()
        {
            ContractDescription description;
            bool useFixedContract = this.Parent.ChannelCacheEnabled && !this.Parent.OperationUsesMessageContract;

            if (useFixedContract)
            {
                // Fixed MessageIn/MessageOut contract: for IOutputChannel the contract is named
                // IOutputChannel with OperationDescription "Send"; otherwise it is IRequestChannel
                // with OperationDescription "Request".
                if (this.IsOneWay)
                {
                    description = ContractInferenceHelper.CreateOutputChannelContractDescription(this.ServiceContractName, this.Parent.ProtectionLevel);
                }
                else
                {
                    description = ContractInferenceHelper.CreateRequestChannelContractDescription(this.ServiceContractName, this.Parent.ProtectionLevel);
                }
            }
            else
            {
                // For a one-way send of an untyped message the OperationDescription is still null
                if (this.Parent.OperationDescription == null)
                {
                    Fx.Assert(this.IsOneWay, "We can only reach here when we are one-way send Message!");
                    this.Parent.OperationDescription = ContractInferenceHelper.CreateOneWayOperationDescription(this.Parent);
                }

                description = ContractInferenceHelper.CreateContractFromOperation(this.ServiceContractName, this.Parent.OperationDescription);
            }

            if (this.ServiceContractName != null)
            {
                description.ConfigurationName = this.ServiceContractName.LocalName;
            }

            return description;
        }

        // Works out the address to send to. A Uri supplied through the EndpointAddress argument
        // replaces the Uri of the configured address; without one the configured address is
        // used as is. Returns null when neither the cached service endpoint nor the Endpoint OM
        // can provide an address.
        EndpointAddress CreateEndpointAddress(NativeActivityContext context)
        {
            ServiceEndpoint cachedEndpoint = this.GetCachedServiceEndpoint();

            Uri runtimeUri = null;
            if (this.EndpointAddress != null)
            {
                runtimeUri = this.EndpointAddress.Get(context);
            }

            if (cachedEndpoint != null && cachedEndpoint.Address != null)
            {
                if (runtimeUri == null)
                {
                    return cachedEndpoint.Address;
                }

                // Keep everything from the cached address except the Uri.
                EndpointAddressBuilder builder = new EndpointAddressBuilder(cachedEndpoint.Address);
                builder.Uri = runtimeUri;
                return builder.ToEndpointAddress();
            }

            if (this.Endpoint == null)
            {
                return null;
            }

            if (runtimeUri == null)
            {
                return this.Endpoint.GetAddress();
            }

            return new EndpointAddress(runtimeUri, this.Endpoint.Identity, this.GetCachedEndpointHeaders());
        }
 
        // Builds the address for a callback. The Uri always comes from CallbackAddress; identity
        // and headers come from the Endpoint OM or, without one, from the address of the cached
        // service endpoint. When neither identity nor headers are available, CallbackAddress
        // itself is returned.
        EndpointAddress CreateEndpointAddressFromCallback(EndpointAddress CallbackAddress)
        {
            Fx.Assert(CallbackAddress != null, "CallbackAddress cannot be null");

            EndpointIdentity identity = null;
            AddressHeaderCollection addressHeaders = null;

            if (this.Endpoint == null)
            {
                // this could be from config
                ServiceEndpoint configuredEndpoint = this.GetCachedServiceEndpoint();
                Fx.Assert(configuredEndpoint != null, " endpoint cannot be null");
                if (configuredEndpoint.Address != null)
                {
                    identity = configuredEndpoint.Address.Identity;
                    addressHeaders = configuredEndpoint.Address.Headers;
                }
            }
            else
            {
                // Identity and Headers on the Endpoint OM are honored even when the AddressUri is null
                identity = this.Endpoint.Identity;
                addressHeaders = this.GetCachedEndpointHeaders();
            }

            if (identity == null && addressHeaders == null)
            {
                return CallbackAddress;
            }

            return new EndpointAddress(CallbackAddress.Uri, identity, addressHeaders);
        }
 

        // Tells whether the endpoint settings are safe for factory caching. The answer is
        // computed on the first call and reused afterwards: it is false when the endpoint
        // configuration is used (no Endpoint set) and true otherwise.
        bool IsEndpointSettingsSafeForCache()
        {
            if (this.configVerified)
            {
                return this.isConfigSettingsSecure;
            }

            this.isConfigSettingsSecure = this.Endpoint != null;
            this.configVerified = true;
            return this.isConfigSettingsSecure;
        }
 
        // Registers this activity's arguments, implementation variables and implementation
        // children with the runtime metadata, plus the default extension provider for
        // SendMessageChannelCache.
        protected override void CacheMetadata(NativeActivityMetadata metadata)
        {
            // The Persist child only exists (and is only registered) when persisting before
            // the send has been requested; it is created on first use.
            if (ShouldPersistBeforeSend)
            {
                if (this.persist == null)
                {
                    this.persist = new Persist();
                }
                metadata.AddImplementationChild(this.persist);
            }

            RuntimeArgument endpointAddressArgument = new RuntimeArgument(Constants.EndpointAddress, Constants.UriType, ArgumentDirection.In);
            metadata.Bind(this.EndpointAddress, endpointAddressArgument);
            metadata.AddArgument(endpointAddressArgument);

            RuntimeArgument correlatesWithArgument = new RuntimeArgument(Constants.CorrelatesWith, Constants.CorrelationHandleType, ArgumentDirection.In);
            metadata.Bind(this.CorrelatesWith, correlatesWithArgument);
            metadata.AddArgument(correlatesWithArgument);

            // Checks the backing field rather than the property, so the lazily created
            // collection is not allocated here.
            if (this.correlationInitializers != null)
            {
                // One runtime argument per initializer that has a CorrelationHandle; the names
                // are Constants.Parameter followed by a running index that only advances for
                // initializers that actually have a handle.
                int count = 0;
                foreach (CorrelationInitializer correlation in this.correlationInitializers)
                {
                    if (correlation.CorrelationHandle != null)
                    {
                        RuntimeArgument argument = new RuntimeArgument(Constants.Parameter + count,
                            correlation.CorrelationHandle.ArgumentType, correlation.CorrelationHandle.Direction, true);
                        metadata.Bind(correlation.CorrelationHandle, argument);
                        metadata.AddArgument(argument);
                        count++;
                    }
                }
            }

            RuntimeArgument requestMessageArgument = new RuntimeArgument(Constants.RequestMessage, Constants.MessageType, ArgumentDirection.In);
            metadata.Bind(this.Message, requestMessageArgument);
            metadata.AddArgument(requestMessageArgument);

            // MessageOut is optional; it is only bound when it has been set.
            if (this.MessageOut != null)
            {
                RuntimeArgument requestMessageReference = new RuntimeArgument("MessageReference", Constants.MessageType, ArgumentDirection.Out);
                metadata.Bind(this.MessageOut, requestMessageReference);
                metadata.AddArgument(requestMessageReference);
            }

            metadata.AddImplementationVariable(this.sendMessageInstance);
            metadata.AddImplementationVariable(this.noPersistHandle);

            metadata.AddImplementationChild(this.channelCorrelationCompletionWaiter);
            metadata.AddImplementationChild(this.openChannelFactory);
            metadata.AddImplementationChild(this.openChannelAndSendMessage);

            metadata.AddDefaultExtensionProvider(SendMessageChannelCache.DefaultExtensionProvider);
        }

        // Cancellation request from the runtime. Intentionally empty: the override replaces
        // the base implementation with a no-op.
        protected override void Cancel(NativeActivityContext context)
        {
            // Do nothing.  InternalSendMessage cannot be canceled since
            // the individual parts of the process cannot be canceled.
        }

        // On abort, disposes the SendMessageInstance stored for this activity instance, if
        // one has been stored.
        protected override void Abort(NativeActivityAbortContext context)
        {
            VolatileSendMessageInstance holder = this.sendMessageInstance.Get(context);
            if (holder == null)
            {
                // Nothing was stored, so there is nothing to clean up.
                return;
            }

            CleanupResources(holder.Instance);
        }
 
        // Disposes the given instance; a null instance is ignored.
        void CleanupResources(SendMessageInstance instance)
        {
            if (instance == null)
            {
                return;
            }

            instance.Dispose();
        }

        // Entry point of the activity: enters the no-persist zone, creates and stores the
        // SendMessageInstance, then dispatches to the client-request path when the instance
        // has a RequestContext and to the server-response path otherwise.
        protected override void Execute(NativeActivityContext context)
        {
            // The entire InternalSendMessage runs in a no persist zone
            NoPersistHandle persistenceBlocker = this.noPersistHandle.Get(context);
            persistenceBlocker.Enter(context);

            // The SendMessageInstance sets up an AsyncOperationBlock under the hood and thus
            // blocks persistence until the message has been sent and we return to the
            // workflow thread
            SendMessageInstance sendInstance = new SendMessageInstance(this, context);
            SetSendMessageInstance(context, sendInstance);

            if (sendInstance.RequestContext == null)
            {
                ExecuteServerResponse(context, sendInstance);
                return;
            }

            ExecuteClientRequest(context, sendInstance);
        }
 
        // Wraps the instance in a VolatileSendMessageInstance and stores the wrapper in the
        // sendMessageInstance variable.
        void SetSendMessageInstance(NativeActivityContext context, SendMessageInstance instance)
        {
            VolatileSendMessageInstance holder = new VolatileSendMessageInstance();
            holder.Instance = instance;

            this.sendMessageInstance.Set(context, holder);
        }

        // Reads the SendMessageInstance back out of the sendMessageInstance variable.
        // The wrapper is expected to have been stored already (see SetSendMessageInstance).
        SendMessageInstance GetSendMessageInstance(ActivityContext context)
        {
            VolatileSendMessageInstance holder = this.sendMessageInstance.Get(context);
            Fx.Assert(holder != null, "This should never be null.");

            return holder.Instance;
        }

        // Used for server-side send (replies). We don't have any async code here since the
        // Dispatcher handles any completions
        //
        // Steps, in order: pick up the OperationContext from the response context, run the
        // message-property callbacks, trace, locate the CorrelationQueryBehavior for this
        // contract, set the reply message, initialize correlations (directly or through a
        // channel callback), send the reply or the fault, stop activity tracing, and finally
        // either finalize the send, schedule the persist activity, or synchronize with the
        // channel-based correlation work.
        void ExecuteServerResponse(NativeActivityContext context, SendMessageInstance instance)
        {
            Fx.Assert(instance.ResponseContext != null, "only valid for responses");
            Fx.Assert(instance.ResponseContext.WorkflowOperationContext != null, "The WorkflowOperationContext is required on the CorrelationResponseContext");
            instance.OperationContext = instance.ResponseContext.WorkflowOperationContext.OperationContext;

            // now that we have our op-context, invoke the callback that user might have added in the AEC in the previous activity
            // e.g. distributed compensation activity will add this so that they can convert an execution property
            // to an message properties, as will Transaction Flow
            instance.ProcessMessagePropertyCallbacks();

            ProcessSendMessageTrace(context, instance, false);

            // retrieve the correct CorrelationQueryBehavior from the ChannelExtensions collection
            // (the first one whose ServiceContractName matches this activity's wins)
            // NOTE(review): both the Collection declaration and the FindAll() call appear to have
            // lost a type argument (presumably CorrelationQueryBehavior, as the foreach below
            // suggests) - confirm.
            CorrelationQueryBehavior correlationBehavior = null;
            Collection correlationQueryBehaviors = instance.OperationContext.Channel.Extensions.FindAll();
            foreach (CorrelationQueryBehavior cqb in correlationQueryBehaviors)
            {
                if (cqb.ServiceContractName == this.ServiceContractName)
                {
                    correlationBehavior = cqb;
                    break;
                }
            }

            //set the reply
            instance.RequestOrReply = this.Message.Get(context);

            if (correlationBehavior != null)
            {
                EnsureCorrelationBehaviorScopeName(context, correlationBehavior);
                instance.RegisterCorrelationBehavior(correlationBehavior);

                if (instance.CorrelationKeyCalculator != null)
                {
                    if (correlationBehavior.SendNames != null && correlationBehavior.SendNames.Count > 0)
                    {
                        if (correlationBehavior.SendNames.Count == 1 && (correlationBehavior.SendNames.Contains(ContextExchangeCorrelationHelper.CorrelationName)))
                        {
                            // Contextchannel is the only channel participating in correlation
                            // Since we already have the instance id, we don't have to wait for the context channel to call us back to initialize
                            // the correlation - InstanceId can be retrieved directly from ContextMessageProperty through Operation context.
                            ContextMessageProperty contextProperties = null;
                            if (ContextMessageProperty.TryGet(instance.OperationContext.OutgoingMessageProperties, out contextProperties))
                            {
                                // The correlation data is supplied through a delegate; only its
                                // registration happens here.

                                CorrelationDataMessageProperty.AddData(instance.RequestOrReply, ContextExchangeCorrelationHelper.CorrelationName, () => ContextExchangeCorrelationHelper.GetContextCorrelationData(instance.OperationContext));
                            }
                            // Initialize correlations right away without waiting for the context channel to call us back
                            InitializeCorrelations(context, instance);
                        }
                        else
                        {
                            // Initialize correlations through channel callback
                            // (SendNames is already known to be non-null in this branch, so the
                            // ?? fallback below is never taken.)
                            instance.OperationContext.OutgoingMessageProperties.Add(CorrelationCallbackMessageProperty.Name,
                                new MessageCorrelationCallbackMessageProperty(correlationBehavior.SendNames ?? new string[0], instance));
                            // Creating the synchronizer is what routes the end of this method
                            // into the correlation branch.
                            instance.CorrelationSynchronizer = new CorrelationSynchronizer();
                        }
                    }
                    else
                    {
                        // there are no channel based queries, we can initialize correlations right away
                        InitializeCorrelations(context, instance);
                    }
                }
            }

            // For exception case: Always call WorkflowOperationContext.SendFault to either send back the fault in the request/reply case
            // or make sure the error handler extension gets a chance to handle this fault;
            // In both branches a non-fatal exception raised by the send itself is not rethrown;
            // it is stored on the ResponseContext instead.
            if (instance.ResponseContext.Exception != null)
            {
                try
                {
                    instance.ResponseContext.WorkflowOperationContext.SendFault(instance.ResponseContext.Exception);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    instance.ResponseContext.Exception = e;
                }
            }
            else
            {
                try
                {
                    instance.ResponseContext.WorkflowOperationContext.SendReply(instance.RequestOrReply);
                }
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }
                    instance.ResponseContext.Exception = e;
                }
            }

            // When activity tracing is on and the current activity id differs from the ambient
            // one recorded on the instance, emit the stop signpost, transfer back to the
            // ambient id and clear it.
            if (TraceUtility.ActivityTracing)
            {
                if (instance.AmbientActivityId != Trace.CorrelationManager.ActivityId)
                {
                    if (TD.StopSignpostEventIsEnabled())
                    {
                        // NOTE(review): the Dictionary below appears to have lost its type
                        // arguments - confirm against DictionaryTraceRecord's constructor.
                        TD.StopSignpostEvent(new DictionaryTraceRecord(new Dictionary(3) {
                                                    { MessagingActivityHelper.ActivityName, this.DisplayName },
                                                    { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                                                    { MessagingActivityHelper.ActivityInstanceId, context.ActivityInstanceId }
                            }));
                    }
                    FxTrace.Trace.SetAndTraceTransfer(instance.AmbientActivityId, true);
                    instance.AmbientActivityId = Guid.Empty;
                }
            }

            if (instance.CorrelationSynchronizer == null)
            {
                // We aren't doing any correlation work so we just
                // finalize the send.
                // Both message arguments are cleared first.
                context.SetValue(this.Message, null);
                context.SetValue(this.MessageOut, null);

                if (ShouldPersistBeforeSend)
                {
                    // Need to allow persistence.
                    NoPersistHandle noPersistHandle = this.noPersistHandle.Get(context);
                    noPersistHandle.Exit(context);

                    // The send is not finalized on this path; OnPersistCompleted is the
                    // completion callback of the persist activity.
                    context.ScheduleActivity(this.persist, new CompletionCallback(OnPersistCompleted));
                }
                else
                {
                    FinalizeSendMessageCore(instance);
                }
            }
            else
            {
                // We're doing correlation.  Either the work is already
                // done or we need to synchronize with the channel stack.
                if (instance.CorrelationSynchronizer.IsChannelWorkComplete)
                {
                    // No need to schedule our completion waiter
                    OnChannelCorrelationCompleteCore(context, instance);
                }
                else
                {
                    context.ScheduleActivity(this.channelCorrelationCompletionWaiter, OnChannelCorrelationComplete, null);
                }

                // We notify that we're done with the send.  If the
                // correlation processing has already completed then
                // we'll finalize the send.
                if (instance.CorrelationSynchronizer.NotifySendComplete())
                {
                    FinalizeSendMessageCore(instance);
                }
            }
        }
 
        // Emits the tracing that accompanies a send and stores the ids involved on the
        // SendMessageInstance.  Nothing happens unless TraceUtility.MessageFlowTracing is on.
        //   context  - supplies WorkflowInstanceId / ActivityInstanceId and receives the tracking record.
        //   instance - receives AmbientActivityId and E2EActivityId.
        //   isClient - true for the client request path, false for the server reply path.
        // Non-fatal exceptions raised while tracing are reported as informational and are
        // not rethrown; fatal ones propagate.
        // NOTE(review): "new Dictionary(3)" below looks like it lost its generic type
        // arguments in this listing - confirm against the original source.
        void ProcessSendMessageTrace(NativeActivityContext context, SendMessageInstance instance, bool isClient)
        {
            if (TraceUtility.MessageFlowTracing)
            {
                try
                {
                    if (TraceUtility.ActivityTracing)
                    {
                        // Remember the activity id that is current as the send starts.
                        instance.AmbientActivityId = Trace.CorrelationManager.ActivityId;
                    }

                    if (isClient)
                    {
                        //We need to emit a transfer from WF instance ID to the id set in the TLS
                        instance.E2EActivityId = Trace.CorrelationManager.ActivityId;
                        if (instance.E2EActivityId == Guid.Empty)
                        {
                            // No current activity id, so make one up for this send.
                            instance.E2EActivityId = Guid.NewGuid();
                        }

                        if (context.WorkflowInstanceId != instance.E2EActivityId)
                        {
                            // Make the workflow instance id current first, then trace the
                            // transfer to the end-to-end id.
                            DiagnosticTrace.ActivityId = context.WorkflowInstanceId;
                            FxTrace.Trace.SetAndTraceTransfer(instance.E2EActivityId, true);
                        }
                    }
                    else
                    {
                        DiagnosticTrace.ActivityId = context.WorkflowInstanceId;
                        //transfer is taken care by WorkflowOperationContext
                        instance.E2EActivityId = instance.ResponseContext.WorkflowOperationContext.E2EActivityId;
                    }

                    // Record the end-to-end id in a tracking record for this send.
                    context.Track(
                        new SendMessageRecord(MessagingActivityHelper.MessageCorrelationSendRecord)
                        {
                            E2EActivityId = instance.E2EActivityId
                        });

                    if (TraceUtility.ActivityTracing)
                    {
                        if (TD.StartSignpostEventIsEnabled())
                        {
                            // Start signpost carrying this activity's display name, type and instance id.
                            TD.StartSignpostEvent(new DictionaryTraceRecord(new Dictionary(3) {
                                                    { MessagingActivityHelper.ActivityName, this.DisplayName },
                                                    { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
                                                    { MessagingActivityHelper.ActivityInstanceId, context.ActivityInstanceId }
                            }));
                        }
                    }
                }
                catch (Exception ex)
                {
                    if (Fx.IsFatal(ex))
                    {
                        throw;
                    }
                    // A tracing failure must not fail the send itself; just report it.
                    FxTrace.Exception.AsInformation(ex);
                }
            }
        }

        // Completion callback scheduled together with channelCorrelationCompletionWaiter.
        // Looks up the SendMessageInstance for this activity and hands off to
        // OnChannelCorrelationCompleteCore.
        void OnChannelCorrelationComplete(NativeActivityContext context, ActivityInstance completedInstance)
        {
            SendMessageInstance sendInstance = GetSendMessageInstance(context);
            Fx.Assert(sendInstance != null, "The instance cannot be null here.");
            OnChannelCorrelationCompleteCore(context, sendInstance);
        }

        // Runs the workflow side of correlation once the channel stack has finished its part:
        // initializes correlations, hands the resulting message to the synchronizer, clears the
        // Message / MessageOut arguments and then either schedules the persist activity or
        // completes through a non-blocking bookmark.
        void OnChannelCorrelationCompleteCore(NativeActivityContext context, SendMessageInstance instance)
        {
            Message correlatedMessage = InitializeCorrelations(context, instance);
            instance.CorrelationSynchronizer.NotifyMessageUpdatedByWorkflow(correlatedMessage);

            context.SetValue(this.Message, null);
            context.SetValue(this.MessageOut, null);

            bool persistFirst = this.ShouldPersistBeforeSend && instance.RequestContext == null;
            if (!persistFirst)
            {
                // Complete through a bookmark rather than inline.  This ensures the InstanceKey
                // has been saved in the PPD by the time the bookmark is resumed; the key is not
                // saved in the PPD until the workflow reaches its next idle state.
                Bookmark completionBookmark = context.CreateBookmark(CompleteCorrelationCallback, BookmarkOptions.NonBlocking);
                context.ResumeBookmark(completionBookmark, null);
                return;
            }

            // Leave the no-persist zone so the persist activity scheduled below can run;
            // OnPersistCompleted enters it again.
            this.noPersistHandle.Get(context).Exit(context);
            context.ScheduleActivity(this.persist, new CompletionCallback(OnPersistCompleted));
        }

        // Bookmark callback created by OnChannelCorrelationCompleteCore.  Tells the
        // synchronizer that workflow-side correlation processing is finished and, when the
        // synchronizer answers true, finalizes the send here.
        void CompleteCorrelationCallback(NativeActivityContext context, Bookmark bookmark, object value)
        {
            SendMessageInstance sendInstance = GetSendMessageInstance(context);
            Fx.Assert(sendInstance != null, "The instance cannot be null here.");

            bool sendAlreadyCompleted = sendInstance.CorrelationSynchronizer.NotifyWorkflowCorrelationProcessingComplete();
            if (!sendAlreadyCompleted)
            {
                // The send-complete notification has not happened yet, so the send
                // completion path is the one that finalizes.
                return;
            }

            FinalizeSendMessageCore(sendInstance);
        }
 
        // Completion callback for the persist activity.  Re-enters the no-persist zone and,
        // if the SendMessageInstance is still available, finalizes the send when there is no
        // correlation synchronizer or when the synchronizer reports that the send-complete
        // notification has already occurred.
        void OnPersistCompleted(NativeActivityContext context, ActivityInstance completedInstance)
        {
            // Persistence is over, so it is safe to block it again.
            this.noPersistHandle.Get(context).Enter(context);

            SendMessageInstance sendInstance = GetSendMessageInstance(context);
            if (sendInstance == null)
            {
                // Because persistence was allowed the instance may be gone; in that case
                // there is no more meaningful work to do.
                return;
            }

            // Same rule with or without correlation.
            bool shouldFinalize = sendInstance.CorrelationSynchronizer == null
                || sendInstance.CorrelationSynchronizer.NotifyWorkflowCorrelationProcessingComplete();
            if (shouldFinalize)
            {
                FinalizeSendMessageCore(sendInstance);
            }
        }

        // Prepares a client-side send.  In order, it:
        //   1. fetches the channel cache extension and applies the channel-cache-enabled setting,
        //   2. picks up callback context / context from the CorrelatesWith handle, if any,
        //   3. reads the request message and resolves the endpoint address,
        //   4. emits send tracing,
        //   5. obtains a cached channel factory reference or prepares a new one,
        //   6. schedules openChannelFactory when the factory still needs opening, otherwise
        //      continues directly with OnChannelFactoryOpenedCore.
        // Throws a ValidationException when no endpoint address could be determined.
        // NOTE(review): several declarations below ("context.GetExtension()", "ObjectCache",
        // "ObjectCacheItem", "KeyValuePair, SendMessageChannelCache>") look like they lost their
        // generic type arguments in this listing - confirm against the original source.
        void ExecuteClientRequest(NativeActivityContext context, SendMessageInstance instance)
        {
            // This is the client send request: we need to figure out the channel and request message first

            // Get the Extension for the ChannelSettings
            instance.CacheExtension = context.GetExtension();
            Fx.Assert(instance.CacheExtension != null, "channelCacheExtension must exist.");

            // Send.ChannelCacheEnabled must be set before we call CreateEndpointAddress
            // because CreateEndpointAddress will cache description and description resolution depends on the value of ChannelCacheEnabled
            this.Parent.InitializeChannelCacheEnabledSetting(instance.CacheExtension);

            // if there is a correlatesWith handle with callbackcontext(Durable Duplex case), use the callback address and context from
            // there. The handle could be an explicit 'CorrelatesWith' handle or an ambient handle.
            if (instance.CorrelatesWith != null)
            {
                if (instance.CorrelatesWith.CallbackContext != null)
                {
                    instance.CorrelationCallbackContext = instance.CorrelatesWith.CallbackContext;

                    // construct EndpointAddress based on the ListenAddress from callback and the identity and headers from Endpoint or from Config
                    instance.EndpointAddress = CreateEndpointAddressFromCallback(instance.CorrelationCallbackContext.ListenAddress.ToEndpointAddress());
                }

                if (instance.CorrelatesWith.Context != null)
                {
                    instance.CorrelationContext = instance.CorrelatesWith.Context;
                }
            }
            // Request is always of Type Message. Message Argument will be set by Send using the appropriate formatter
            instance.RequestOrReply = this.Message.Get(context);

            if (instance.EndpointAddress == null)
            {
                //try to get it from endpoint or config
                instance.EndpointAddress = CreateEndpointAddress(context);
            }

            if (instance.EndpointAddress == null)
            {
                // Neither the callback context nor the endpoint/config produced an address.
                throw FxTrace.Exception.AsError(new ValidationException(SR.EndpointAddressNotSetInEndpoint(this.OperationName)));
            }

            // Configname to be used for the FactoryCacheKey,
            // if endpoint is defined, we use the settings from endpoint and ignore the endpointConfigurationName
            // if endpoint is not defined we use the endpointConfigurationName
            string configName = (this.Endpoint != null) ? null : this.EndpointConfigurationName;

            ProcessSendMessageTrace(context, instance, true);

            // Get ChannelFactory from the cache
            ObjectCache channelFactoryCache = null;
            ObjectCacheItem cacheItem = null;
            ChannelCacheSettings channelCacheSettings;

            // retrieve the FactoryCacheKey and cache it so that we could use it later
            if (this.cachedFactoryCacheKey == null)
            {
                ServiceEndpoint targetEndpoint = this.GetCachedServiceEndpoint();
                this.cachedFactoryCacheKey = new FactoryCacheKey(this.Endpoint, configName, this.IsOneWay, this.TokenImpersonationLevel,
                    targetEndpoint.Contract, this.correlationQueries);
            }

            // let's decide if we can share the cache from the extension
            // cache can be shared if AllowUnsafeCaching is true or it is safe to share
            if (instance.CacheExtension.AllowUnsafeCaching || this.IsEndpointSettingsSafeForCache())
            {
                channelFactoryCache = instance.CacheExtension.GetFactoryCache();
                Fx.Assert(channelFactoryCache != null, "factory cache should be initialized either from the extension or from the globalcache");

                channelCacheSettings = instance.CacheExtension.ChannelSettings;


                // Get a ChannelFactoryReference (either cached or brand new)
                // Work on a local copy of the field so both halves of the pair are read together.
                KeyValuePair, SendMessageChannelCache> localLastUsedCacheItem = this.lastUsedFactoryCacheItem;
                if (object.ReferenceEquals(localLastUsedCacheItem.Value, instance.CacheExtension))
                {
                    // The remembered item belongs to this cache extension; reuse it if a
                    // reference can still be taken on it.
                    if (localLastUsedCacheItem.Key != null && localLastUsedCacheItem.Key.TryAddReference())
                    {
                        cacheItem = localLastUsedCacheItem.Key;
                    }
                    else
                    {
                        // the item is invalid
                        this.lastUsedFactoryCacheItem = new KeyValuePair, SendMessageChannelCache>(null, null);
                    }
                }

                if (cacheItem == null)
                {
                    // try retrieving the factoryreference directly from the factory cache
                    cacheItem = channelFactoryCache.Take(this.cachedFactoryCacheKey);
                }
            }
            else
            {
                // not safe to share cache, do not cache anything
                channelCacheSettings = ChannelCacheSettings.EmptyCacheSettings;
            }

            ChannelFactoryReference newFactoryReference = null;
            if (cacheItem == null)
            {
                // nothing in our cache, we'll have to setup a new factory reference, which ClientSendAsyncResult will open asynchronously
                ServiceEndpoint targetEndpoint = this.CreateServiceEndpoint();
                // create a new ChannelFactoryReference that holds the channelfactory and a cache for its channels,
                // cache settings are based on the channelcachesettings provided through the extension
                newFactoryReference = new ChannelFactoryReference(this.cachedFactoryCacheKey, targetEndpoint, channelCacheSettings);
            }

            // At this point either cacheItem or newFactoryReference is set.
            instance.SetupFactoryReference(cacheItem, newFactoryReference, channelFactoryCache);

            // Create the fault callback once and reuse it for later scheduling.
            if (this.onSendFailure == null)
            {
                this.onSendFailure = new FaultCallback(OnSendFailure);
            }

            if (instance.FactoryReference.NeedsOpen)
            {
                // Open the factory through the child activity; OnChannelFactoryOpened continues the send.
                context.ScheduleActivity(this.openChannelFactory, OnChannelFactoryOpened, this.onSendFailure);
            }
            else
            {
                // Factory is already usable, so continue inline.
                OnChannelFactoryOpenedCore(context, instance);
            }
        }
 
        // Fault callback used for the child activities scheduled by the client send path.
        // It rethrows the propagated exception so that this activity aborts as well; resource
        // clean-up is then done by the abort path (see Abort(NativeActivityAbortContext)).
        void OnSendFailure(NativeActivityFaultContext context, Exception propagatedException, ActivityInstance propagatedFrom)
        {
            Exception failure = FxTrace.Exception.AsError(propagatedException);
            throw failure;
        }

        // Completion callback for the openChannelFactory child activity.  Looks up the
        // SendMessageInstance for this activity and continues with OnChannelFactoryOpenedCore.
        void OnChannelFactoryOpened(NativeActivityContext context, ActivityInstance completedInstance)
        {
            SendMessageInstance sendInstance = GetSendMessageInstance(context);
            Fx.Assert(sendInstance != null, "Must have a SendMessageInstance here.");
            OnChannelFactoryOpenedCore(context, sendInstance);
        }

        // Continues the client send once the channel factory is open.  In order, it:
        //   1. populates the client channel and builds an OperationContext for it,
        //   2. registers the factory's CorrelationQueryBehavior (if any) with the instance,
        //   3. runs the message property callbacks,
        //   4. stamps a ContextMessageProperty and/or CallbackContextMessageProperty on the request,
        //   5. either defers correlation initialization to the channel stack (CorrelationSendNames
        //      set) or performs it immediately,
        //   6. schedules the correlation waiter (when deferring) and the send activity.
        // Throws InvalidOperationException when the callback context and the context disagree.
        // NOTE(review): "Dictionary contextValues = new Dictionary(1)" below looks like it lost
        // its generic type arguments in this listing - confirm against the original source.
        void OnChannelFactoryOpenedCore(NativeActivityContext context, SendMessageInstance instance)
        {
            // now that we know the factory is open, setup our client channel and pool reference
            instance.PopulateClientChannel();

            // Only channels that implement IContextChannel get an OperationContext.
            IContextChannel contextChannel = instance.ClientSendChannel as IContextChannel;
            instance.OperationContext = (contextChannel == null) ? null : new OperationContext(contextChannel);

            // Retrieve the CorrelationQueryBehavior from the serviceEndpoint that we used for ChannelFactoryCreation
            // we later look for CorrelationQueryBehavior.SendNames which actually gets initialized during ChannelFactory creation
            //
            CorrelationQueryBehavior correlationQueryBehavior = instance.FactoryReference.CorrelationQueryBehavior;

            if (correlationQueryBehavior != null)
            {
                EnsureCorrelationBehaviorScopeName(context, correlationQueryBehavior);
                instance.RegisterCorrelationBehavior(correlationQueryBehavior);
            }

            // now that we have our op-context, invoke the callback that user might have added in the AEC in the previous activity
            // e.g. distributed compensation activity will add this so that they can convert an execution property
            // to message properties, as will Transaction Flow
            instance.ProcessMessagePropertyCallbacks();

            // Add the ContextMessage Property if either CallBackContextMessageProperty or ContextMessageProperty is set
            // if both are set validate that the context is the same in both of them
            ContextMessageProperty contextMessageProperty = null;
            if (instance.CorrelationCallbackContext != null && instance.CorrelationContext != null)
            {
                // validate if the context is equivalent
                if (MessagingActivityHelper.CompareContextEquality(instance.CorrelationCallbackContext.Context, instance.CorrelationContext.Context))
                {
                    contextMessageProperty = new ContextMessageProperty(instance.CorrelationCallbackContext.Context);
                }
                else
                {
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ContextMismatchInContextAndCallBackContext));
                }
            }
            else if (instance.CorrelationCallbackContext != null)
            {
                contextMessageProperty = new ContextMessageProperty(instance.CorrelationCallbackContext.Context);
            }
            else if (instance.CorrelationContext != null)
            {
                contextMessageProperty = new ContextMessageProperty(instance.CorrelationContext.Context);
            }

            if (contextMessageProperty != null)
            {
                contextMessageProperty.AddOrReplaceInMessage(instance.RequestOrReply);
            }

            // Add callback context Message property with instance id.
            // If binding contains ContextBindingElement with listenaddress set, the callback context message property will flow on the wire

            // Pull the instanceId from the CorrelationHandle, if it is already initialized, else create a new GUID.
            // we want to send the callback context only for the first message and when there is a FollowingContextCorrelation defined ( i.e., we are expecting a
            // receive message back) or when there is an ambient handle and the handle is not initialized. We will never use CorrelatesWith handle to initialize
            // FollowingContext, since CorrelatesWith on the client side should always be used for a following correlation
            String contextValue;
            // Prefer the explicit context-based handle; fall back to the ambient one.
            CorrelationHandle followingContextHandle = instance.ContextBasedCorrelationHandle != null ? instance.ContextBasedCorrelationHandle : instance.AmbientHandle;

            if (followingContextHandle != null && (followingContextHandle.Scope == null || followingContextHandle.Scope.IsInitialized == false))
            {
                // we are creating a new GUID for the context. As a practice, we don't want to send the WorkflowInstanceId over the wire
                contextValue = Guid.NewGuid().ToString();
                Dictionary contextValues = new Dictionary(1)
                    {
                        {  ContextMessageProperty.InstanceIdKey, contextValue }
                    };
                new CallbackContextMessageProperty(contextValues).AddOrReplaceInMessage(instance.RequestOrReply);
            }

            // verify if we can complete Correlation initialization now
            if (instance.CorrelationSendNames != null)
            {
                // we're going to initialize request correlations later
                instance.RequestOrReply.Properties.Add(CorrelationCallbackMessageProperty.Name,
                    new MessageCorrelationCallbackMessageProperty(instance.CorrelationSendNames, instance));

                // The synchronizer's presence is what marks this send as using deferred correlation.
                instance.CorrelationSynchronizer = new CorrelationSynchronizer();
            }
            else
            {
                InitializeCorrelations(context, instance);
            }

            if (instance.CorrelationSynchronizer != null)
            {
                // NOTE(review): the waiter is scheduled before the send activity; the relative
                // order of these two ScheduleActivity calls presumably matters - confirm before reordering.
                context.ScheduleActivity(this.channelCorrelationCompletionWaiter, OnChannelCorrelationComplete, this.onSendFailure);
            }

            context.ScheduleActivity(this.openChannelAndSendMessage, OnClientSendComplete, this.onSendFailure);
        }

        // Completion callback for the openChannelAndSendMessage child activity.  Finalizes the
        // send unless a correlation synchronizer exists and reports that correlation
        // processing has not finished yet.
        void OnClientSendComplete(NativeActivityContext context, ActivityInstance completedInstance)
        {
            SendMessageInstance sendInstance = GetSendMessageInstance(context);

            bool correlationStillPending = sendInstance.CorrelationSynchronizer != null
                && !sendInstance.CorrelationSynchronizer.NotifySendComplete();
            if (correlationStillPending)
            {
                // Correlation processing finishes later and finalizes at that point.
                return;
            }

            // Either there was no correlation, or the send completed after the correlation
            // processing, so the finalize happens here.
            FinalizeSendMessageCore(sendInstance);
        }

        // Applies key-based correlation to the outgoing message and, when the instance carries
        // a RequestContext, registers that context with a correlation handle.
        // Returns instance.RequestOrReply, which the key-based step may have replaced.
        // Throws InvalidOperationException when the request context cannot be registered.
        Message InitializeCorrelations(NativeActivityContext context, SendMessageInstance instance)
        {
            if (instance.CorrelationKeyCalculator != null)
            {
                // Key-based correlations come first; the correlation initializers and the ambient
                // handle are passed in for associating the keys.
                // Content-based correlation is never initialized with a select handle: the handle
                // is either named in a CorrelationInitializer or is the ambient handle.
                // For context-based correlation the select handle is the callback handle for Send
                // and the context handle for SendReply.
                instance.RequestOrReply = MessagingActivityHelper.InitializeCorrelationHandles(
                    context,
                    instance.ContextBasedCorrelationHandle,
                    instance.AmbientHandle,
                    this.correlationInitializers,
                    instance.CorrelationKeyCalculator,
                    instance.RequestOrReply);
            }

            // Channel-based correlation only applies when there is a request context.
            if (instance.RequestContext == null)
            {
                return instance.RequestOrReply;
            }

            // An explicit association wins when one exists.
            CorrelationHandle registrationHandle = instance.GetExplicitChannelCorrelationHandle(context, this.correlationInitializers);
            if (registrationHandle == null)
            {
                if (this.IsOneWay)
                {
                    // One-way sends have nothing to register without an explicit handle.
                    return instance.RequestOrReply;
                }

                // Otherwise use the ambient handle.  The CorrelatesWith handle is not used for
                // RequestReply correlation.  This was already validated in the
                // SendMessageInstance constructor, so only assert here.
                Fx.Assert(instance.AmbientHandle != null, "For two way send we need to have either a RequestReply correlation handle or an ambient handle");
                registrationHandle = instance.AmbientHandle;
            }

            if (!registrationHandle.TryRegisterRequestContext(context, instance.RequestContext))
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.TryRegisterRequestContextFailed));
            }

            return instance.RequestOrReply;
        }

        // Final step of a send: if the instance recorded a completion exception, throw it;
        // otherwise there is nothing left to do.
        void FinalizeSendMessageCore(SendMessageInstance instance)
        {
            Exception failure = instance.GetCompletionException();
            if (failure == null)
            {
                return;
            }

            throw FxTrace.Exception.AsError(failure);
        }

        // Child activity that asynchronously opens the channel factory referenced by a
        // SendMessageInstance (instance.FactoryReference).
        // NOTE(review): "InArgument Instance", "AsyncResult.End(result)" and "ObjectCacheItem"
        // below look like they lost their generic type arguments in this listing - confirm
        // against the original source.
        class OpenChannelFactory : AsyncCodeActivity
        {
            public OpenChannelFactory()
            {
            }

            // Carries the VolatileSendMessageInstance wrapper whose Instance is operated on.
            public InArgument Instance
            {
                get;
                set;
            }

            // Starts the open by creating the async result for the wrapped SendMessageInstance.
            protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
            {
                VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);

                return new OpenChannelFactoryAsyncResult(volatileInstance.Instance, callback, state);
            }

            // Ends the async result created in BeginExecute.
            protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
            {
                OpenChannelFactoryAsyncResult.End(result);
            }

            // Async result that opens the factory reference and registers the resulting cache
            // item with the SendMessageInstance.
            class OpenChannelFactoryAsyncResult : AsyncResult
            {
                // Shared completion delegate; the per-call state travels in IAsyncResult.AsyncState.
                static AsyncCompletion channelFactoryOpenCompletion = new AsyncCompletion(ChannelFactoryOpenCompletion);

                SendMessageInstance instance;

                public OpenChannelFactoryAsyncResult(SendMessageInstance instance, AsyncCallback callback, object state)
                    : base(callback, state)
                {
                    this.instance = instance;

                    bool completeSelf = false;

                    if (this.instance.FactoryReference.NeedsOpen)
                    {
                        IAsyncResult result = this.instance.FactoryReference.BeginOpen(PrepareAsyncCompletion(channelFactoryOpenCompletion), this);
                        if (result.CompletedSynchronously)
                        {
                            // The open finished inline, so finish the work on this thread.
                            completeSelf = OnNewChannelFactoryOpened(result);
                        }
                    }
                    else
                    {
                        // Nothing to open.
                        completeSelf = true;
                    }

                    if (completeSelf)
                    {
                        Complete(true);
                    }
                }

                public static void End(IAsyncResult result)
                {
                    AsyncResult.End(result);
                }

                // Static completion entry point: recovers the async result from AsyncState and
                // forwards to the instance method.
                static bool ChannelFactoryOpenCompletion(IAsyncResult result)
                {
                    OpenChannelFactoryAsyncResult thisPtr = (OpenChannelFactoryAsyncResult)result.AsyncState;
                    return thisPtr.OnNewChannelFactoryOpened(result);
                }

                // Ends the factory open, registers the cache item it returns with the
                // SendMessageInstance, and always returns true.
                bool OnNewChannelFactoryOpened(IAsyncResult result)
                {
                    ObjectCacheItem newCacheItem =
                        this.instance.FactoryReference.EndOpen(result, this.instance.FactoryCache);
                    this.instance.RegisterNewCacheItem(newCacheItem);

                    return true;
                }
            }
        }
 
        class OpenChannelAndSendMessage : AsyncCodeActivity
        {
            public OpenChannelAndSendMessage()
            { 
            }
 
            public InArgument Instance 
            {
                get; 
                set;
            }

            public InternalSendMessage InternalSendMessage 
            {
                get; 
                set; 
            }
 
            protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
            {
                VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);
                Transaction transaction = null; 

                RuntimeTransactionHandle handle = context.GetProperty(); 
                if (handle != null) 
                {
                    transaction = handle.GetCurrentTransaction(context); 
                }

                return new OpenChannelAndSendMessageAsyncResult(InternalSendMessage, volatileInstance.Instance, transaction, callback, state);
            } 

            protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result) 
            { 
                OpenChannelAndSendMessageAsyncResult.End(result);
            } 

            class OpenChannelAndSendMessageAsyncResult : AsyncResult
            {
                static AsyncCompletion onChannelOpened = new AsyncCompletion(OnChannelOpened); 
                static AsyncCompletion onChannelSendComplete = new AsyncCompletion(OnChannelSendComplete);
                static AsyncCallback onChannelReceiveReplyCompleted = Fx.ThunkCallback(OnChannelReceiveReplyComplete); 
 
                SendMessageInstance instance;
                InternalSendMessage internalSendMessage; 
                IChannel channel;
                Transaction currentTransactionContext;
                Guid ambientActivityId;
 
                //This is used to create a blocking dependent clone to synchronize the transaction commit processing with the completion of the aborting clone
                //that is created in this async result. 
                DependentTransaction dependentClone; 

                public OpenChannelAndSendMessageAsyncResult(InternalSendMessage internalSendMessage, SendMessageInstance instance, Transaction currentTransactionContext, AsyncCallback callback, object state) 
                    : base(callback, state)
                {
                    this.internalSendMessage = internalSendMessage;
                    this.instance = instance; 
                    this.channel = this.instance.ClientSendChannel;
                    this.currentTransactionContext = currentTransactionContext; 
 
                    bool completeSelf = false;
 
                    //channel is still in created state, we need to open it
                    if (this.channel.State == CommunicationState.Created)
                    {
                        // Disable ContextManager before channel is opened 
                        IContextManager contextManager = this.channel.GetProperty();
                        if (contextManager != null) 
                        { 
                            contextManager.Enabled = false;
                        } 

                        IAsyncResult result = this.channel.BeginOpen(PrepareAsyncCompletion(onChannelOpened), this);
                        if (result.CompletedSynchronously)
                        { 
                            completeSelf = OnChannelOpened(result);
                        } 
                    } 
                    else
                    { 
                        // channel already opened & retrieved from cache
                        // we don't have to do anything with ChannelOpen
                        completeSelf = BeginSendMessage();
                    } 

                    if (completeSelf) 
                    { 
                        Complete(true);
                    } 
                }

                public static void End(IAsyncResult result)
                { 
                    AsyncResult.End(result);
                } 
 
                static bool OnChannelOpened(IAsyncResult result)
                { 
                    OpenChannelAndSendMessageAsyncResult thisPtr = (OpenChannelAndSendMessageAsyncResult)result.AsyncState;

                    thisPtr.channel.EndOpen(result);
                    return thisPtr.BeginSendMessage(); 
                }
 
                // Sends this.instance.RequestOrReply over this.channel while the instance's
                // OperationContext is installed as OperationContext.Current.
                //   - Not one-way: issues IRequestChannel.BeginRequest with the
                //     onChannelReceiveReplyCompleted callback. If the request completes synchronously
                //     the reply is handed to the request context right here. Always returns true.
                //   - One-way: issues IOutputChannel.BeginSend through PrepareAsyncCompletion and
                //     returns SyncContinue(result).
                // If issuing the call throws, the finally block completes any dependent clone and
                // aborts the channel before the exception propagates.
                bool BeginSendMessage() 
                {
                    IAsyncResult result = null; 
                    bool requestSucceeded = false;
                    // Remember the caller's context so it can be restored in the finally block.
                    OperationContext oldContext = OperationContext.Current;
                    // Despite the name, this flag means "request/reply" (i.e. not one-way).
                    bool asyncSend = !this.internalSendMessage.IsOneWay;
 
                    try
                    { 
                        OperationContext.Current = this.instance.OperationContext; 

                        if (TraceUtility.MessageFlowTracingOnly) 
                        {
                            //set the E2E activity ID
                            DiagnosticTrace.ActivityId = this.instance.E2EActivityId;
                        } 

                        using (PrepareTransactionalCall(this.currentTransactionContext)) 
                        { 
                            if (asyncSend)
                            { 
                                //If there is a transaction that we could be flowing out then we create this blocking clone to sync with the commit processing.
                                if (this.currentTransactionContext != null)
                                {
                                    this.dependentClone = this.currentTransactionContext.DependentClone(DependentCloneOption.BlockCommitUntilComplete); 
                                }
 
                                this.instance.RequestContext.EnsureAsyncWaitHandle(); 

                                result = ((IRequestChannel)this.channel).BeginRequest(this.instance.RequestOrReply, onChannelReceiveReplyCompleted, this); 
                                if (result.CompletedSynchronously)
                                {
                                    // Synchronous completion: end the request and deliver the reply here.
                                    Message reply = ((IRequestChannel)this.channel).EndRequest(result);
                                    this.instance.RequestContext.ReceiveReply(this.instance.OperationContext, reply); 
                                }
                            } 
                            else 
                            {
                                result = ((IOutputChannel)this.channel).BeginSend(this.instance.RequestOrReply, PrepareAsyncCompletion(onChannelSendComplete), this); 
                                if (result.CompletedSynchronously)
                                {
                                    ((IOutputChannel)this.channel).EndSend(result);
                                } 
                            }
 
                            // Reached only when neither Begin* nor the synchronous End* threw.
                            requestSucceeded = true; 
                        }
                    } 
                    finally
                    {
                        OperationContext.Current = oldContext;
 
                        if (!requestSucceeded)
                        { 
                            //if we did not succeed, complete the blocking clone anyway if we created it 
                            if (this.dependentClone != null)
                            { 
                                this.dependentClone.Complete();
                                this.dependentClone = null;
                            }
                            this.channel.Abort(); 
                        }
 
                        // result is null when Begin* itself threw; in that case no cleanup happens here.
                        if (result != null && result.CompletedSynchronously) 
                        {
                            //if we are done synchronously, we need to complete a blocking dependent clone if we created one (asyncSend case) 
                            if (this.dependentClone != null)
                            {
                                this.dependentClone.Complete();
                                this.dependentClone = null; 
                            }
                            this.internalSendMessage.CleanupResources(this.instance); 
                        } 
                    }
 
                    if (asyncSend)
                    {
                        // Request/reply: the reply is delivered through the request context, so this
                        // async result reports completion regardless of how BeginRequest completed.
                        return true;
                    } 
                    else
                    { 
                        return SyncContinue(result); 
                    }
                } 

                // Callback for the asynchronous completion of IRequestChannel.BeginRequest.
                // Ends the request under the instance's OperationContext and a transaction scope,
                // then passes either the reply or the (non-fatal) exception to the request context
                // through ReceiveAsyncReply. Synchronous completions are ignored here because
                // BeginSendMessage handles them inline.
                static void OnChannelReceiveReplyComplete(IAsyncResult result)
                {
                    if (result.CompletedSynchronously) 
                    {
                        return; 
                    } 

                    OpenChannelAndSendMessageAsyncResult thisPtr = (OpenChannelAndSendMessageAsyncResult)result.AsyncState; 

                    // Saved so the finally block can restore the thread's previous context.
                    OperationContext oldContext = OperationContext.Current;

                    Message reply = null; 
                    bool requestSucceeded = false;
 
                    try 
                    {
                        OperationContext.Current = thisPtr.instance.OperationContext; 

                        thisPtr.TraceActivityData();

                        System.Transactions.TransactionScope scope = Fx.CreateTransactionScope(thisPtr.currentTransactionContext); 
                        try
                        { 
                            Fx.Assert(thisPtr.channel is IRequestChannel, "Channel must be of IRequestChannel type!"); 

                            reply = ((IRequestChannel)thisPtr.channel).EndRequest(result); 

                            // Success path: deliver the reply with no exception.
                            thisPtr.instance.RequestContext.ReceiveAsyncReply(thisPtr.instance.OperationContext, reply, null);
 
                            requestSucceeded = true;
                        } 
                        finally 
                        {
                            Fx.CompleteTransactionScope(ref scope); 
                        }
                    }
                    catch (Exception exception)
                    { 
                        if (Fx.IsFatal(exception))
                        { 
                            throw; 
                        }
 
                        // Failure path: report the exception to the request context instead of a reply.
                        thisPtr.instance.RequestContext.ReceiveAsyncReply(thisPtr.instance.OperationContext, null, exception);
                    }
                    finally
                    { 
                        //Complete the blocking dependent clone created before the async call was made.
                        if (thisPtr.dependentClone != null) 
                        { 
                            thisPtr.dependentClone.Complete();
                            thisPtr.dependentClone = null; 
                        }

                        OperationContext.Current = oldContext;
 
                        // requestSucceeded stays false if EndRequest or ReceiveAsyncReply threw.
                        if (!requestSucceeded)
                        { 
                            thisPtr.channel.Abort(); 
                        }
 
                        thisPtr.internalSendMessage.CleanupResources(thisPtr.instance);
                    }
                }
 
                // Completion step for the one-way IOutputChannel.BeginSend.
                // For an asynchronous completion it ends the send under the instance's
                // OperationContext and a transaction scope, records any non-fatal failure on the
                // request context, and cleans up the instance's resources. A synchronous completion
                // needs no work here because BeginSendMessage already ended the send.
                // Always returns true.
                static bool OnChannelSendComplete(IAsyncResult result)
                { 
                    if (!result.CompletedSynchronously)
                    {
                        var self = (OpenChannelAndSendMessageAsyncResult)result.AsyncState;
                        OperationContext previousContext = OperationContext.Current;

                        try
                        {
                            OperationContext.Current = self.instance.OperationContext;
                            self.TraceActivityData();

                            System.Transactions.TransactionScope transactionScope = Fx.CreateTransactionScope(self.currentTransactionContext);
                            try
                            {
                                Fx.Assert(self.channel is IOutputChannel, "Channel must be of IOutputChannel type!");

                                IOutputChannel outputChannel = (IOutputChannel)self.channel;
                                outputChannel.EndSend(result);
                            }
                            finally
                            {
                                Fx.CompleteTransactionScope(ref transactionScope);
                            }
                        }
                        catch (Exception sendFailure)
                        {
                            if (Fx.IsFatal(sendFailure))
                            {
                                throw;
                            }

                            // Kept on the request context so FinalizeSendMessageCore can pick it up later.
                            self.instance.RequestContext.Exception = sendFailure;
                        }
                        finally
                        {
                            OperationContext.Current = previousContext;
                            self.internalSendMessage.CleanupResources(self.instance);
                        }
                    }

                    return true;
                } 
 
                // When activity tracing is on: emits the stop-signpost event (if that event is
                // enabled) describing this activity, then calls FxTrace.Trace.SetAndTraceTransfer
                // with the stored ambient activity id and resets that id to Guid.Empty.
                void TraceActivityData()
                { 
                    if (TraceUtility.ActivityTracing)
                    {
                        if (TD.StopSignpostEventIsEnabled())
                        { 
                            // Dictionary needs its type arguments to compile; the values written here
                            // (DisplayName, ActivityInstanceId) are strings.
                            // NOTE(review): the MessagingActivityHelper keys are assumed to be strings too - confirm.
                            TD.StopSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3) {
                                                    { MessagingActivityHelper.ActivityName, this.instance.Activity.DisplayName }, 
                                                    { MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution }, 
                                                    { MessagingActivityHelper.ActivityInstanceId, this.instance.ActivityInstanceId }
                                })); 
                        }
                        FxTrace.Trace.SetAndTraceTransfer(this.ambientActivityId, true);
                        this.ambientActivityId = Guid.Empty;
                    } 
                }
            } 
        } 

        // Async activity that completes once the CorrelationSynchronizer of the supplied send
        // instance reports that the channel work is complete (IsChannelWorkComplete).
        class WaitOnChannelCorrelation : AsyncCodeActivity 
        {
            public WaitOnChannelCorrelation()
            {
            } 

            // The volatile send instance whose CorrelationSynchronizer is waited on.
            // The argument must be typed: BeginExecute assigns Instance.Get(context) directly
            // to a VolatileSendMessageInstance local.
            public InArgument<VolatileSendMessageInstance> Instance 
            { 
                get;
                set; 
            }

            // Starts the wait by wrapping the instance's CorrelationSynchronizer in an async result.
            protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
            { 
                VolatileSendMessageInstance volatileInstance = this.Instance.Get(context);
 
                Fx.Assert(volatileInstance.Instance != null, "This should not have gone through a persistence episode yet."); 

                return new WaitOnChannelCorrelationAsyncResult(volatileInstance.Instance.CorrelationSynchronizer, callback, state); 
            }

            // Ends the wait started by BeginExecute.
            protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
            { 
                WaitOnChannelCorrelationAsyncResult.End(result);
            } 
 
            // Async result that completes when the synchronizer signals that the channel work is done.
            class WaitOnChannelCorrelationAsyncResult : AsyncResult
            { 
                CorrelationSynchronizer synchronizer;

                public WaitOnChannelCorrelationAsyncResult(CorrelationSynchronizer synchronizer, AsyncCallback callback, object state)
                    : base(callback, state) 
                {
                    this.synchronizer = synchronizer; 
 
                    if (synchronizer.IsChannelWorkComplete)
                    { 
                        // Already done: complete synchronously without registering a callback.
                        Complete(true);
                    }
                    else
                    { 
                        if (synchronizer.SetWorkflowNotificationCallback(new Action(OnChannelCorrelationComplete)))
                        { 
                            // The bool flipped just before we set the action so 
                            // we're actually complete.  The contract is that the
                            // action will never be raised if Set returns true. 
                            Complete(true);
                        }
                    }
                } 

                public static void End(IAsyncResult result) 
                { 
                    // The type argument cannot be inferred from the IAsyncResult parameter.
                    AsyncResult.End<WaitOnChannelCorrelationAsyncResult>(result);
                } 

                // Raised by the synchronizer once the channel work completes after registration.
                void OnChannelCorrelationComplete()
                {
                    Complete(false); 
                }
            } 
        } 

        // Coordinates the channel side and the workflow side of a send whose correlation is
        // processed by the workflow:
        //   - the channel announces that the request is available (NotifyRequestSetByChannel),
        //   - the workflow may replace the message (NotifyMessageUpdatedByWorkflow) and then
        //     signals that correlation processing is done (NotifyWorkflowCorrelationProcessingComplete),
        //   - the send completion is reported through NotifySendComplete.
        // The Completion state records which of "correlation complete" / "send complete" happened
        // first; the second one to arrive gets 'true' back.
        class CorrelationSynchronizer 
        {
            // Invoked (at most once, from NotifyRequestSetByChannel) when the channel work completes.
            Action onRequestSetByChannel;
            // Invoked with the possibly updated message once the workflow finishes correlation
            // processing. It receives a Message (see NotifyWorkflowCorrelationProcessingComplete),
            // hence Action<Message>.
            Action<Message> onWorkflowCorrelationProcessingComplete;
            readonly object thisLock; 
            Completion completion;
 
            public CorrelationSynchronizer() 
            {
                this.thisLock = new object(); 
            }

            // True once NotifyRequestSetByChannel has been called.
            public bool IsChannelWorkComplete
            { 
                get;
                private set; 
            } 

            // The message last supplied through NotifyMessageUpdatedByWorkflow (null if never called).
            public Message UpdatedMessage 
            {
                get;
                private set;
            } 

            // Called by the channel side: marks the channel work complete, stores the callback to
            // raise when the workflow finishes correlation processing, and raises the workflow
            // notification callback if one was already registered.
            public void NotifyRequestSetByChannel(Action<Message> onWorkflowCorrelationProcessingComplete) 
            { 
                Fx.Assert(onWorkflowCorrelationProcessingComplete != null, "Must have a non-null callback.");
                Action toCall = null; 

                lock (this.thisLock)
                {
                    this.IsChannelWorkComplete = true; 
                    this.onWorkflowCorrelationProcessingComplete = onWorkflowCorrelationProcessingComplete;
 
                    toCall = this.onRequestSetByChannel; 
                }
 
                // Invoke outside the lock so the callback cannot run while the lock is held.
                if (toCall != null)
                {
                    toCall();
                } 
            }
 
            // Records the message the workflow wants to be used from here on.
            public void NotifyMessageUpdatedByWorkflow(Message message) 
            {
                this.UpdatedMessage = message; 
            }

            // Called by the workflow side when correlation processing is finished. Raises the
            // stored completion callback with UpdatedMessage.
            // Returns true if the send had already completed, in which case the caller is
            // responsible for making sure FinalizeSendMessage is called.
            public bool NotifyWorkflowCorrelationProcessingComplete()
            { 
                Fx.Assert(this.onWorkflowCorrelationProcessingComplete != null, "This must be set before this can be called.");
 
                bool result = false; 

                lock (this.thisLock) 
                {
                    if (this.completion == Completion.SendComplete)
                    {
                        // The send has already completed so we are responsible for 
                        // making sure FinalizeSendMessage is called.
                        result = true; 
                    } 
                    else
                    { 
                        Fx.Assert(this.completion == Completion.None, "We should be the first one to complete.");

                        this.completion = Completion.CorrelationComplete;
                    } 
                }
 
                this.onWorkflowCorrelationProcessingComplete(this.UpdatedMessage); 

                return result; 
            }

            // Called when the send has completed.
            // Returns true if correlation processing had already completed, in which case the
            // caller is responsible for making sure FinalizeSendMessage is called.
            public bool NotifySendComplete()
            { 
                bool result = false;
                lock (this.thisLock) 
                { 
                    if (this.completion == Completion.CorrelationComplete)
                    { 
                        // The correlation has already finished so we are responsible for
                        // making sure that FinalizeSendMessage is called.
                        result = true;
                    } 
                    else
                    { 
                        Fx.Assert(this.completion == Completion.None, "We should be the first one to complete."); 

                        this.completion = Completion.SendComplete; 
                    }
                }

                return result; 
            }
 
            // Returns true if the channel work is actually done.  If this 
            // returns true then the passed in Action will never be called.
            public bool SetWorkflowNotificationCallback(Action onRequestSetByChannel) 
            {
                Fx.Assert(onRequestSetByChannel != null, "Must have a non-null callback.");

                bool result = false; 
                lock (this.thisLock)
                { 
                    result = this.IsChannelWorkComplete; 
                    this.onRequestSetByChannel = onRequestSetByChannel;
                } 

                return result;
            }
 
            // This three state enum allows us to determine whether
            // we are the first or second code path.  The second 
            // code path needs to finalize the send. 
            enum Completion
            { 
                None,
                SendComplete,
                CorrelationComplete
            } 
        }
 
        // This class defines the instance data that is used to store intermediate state 
        // during the volatile async operation of sending a message.
        class SendMessageInstance 
        {
            CorrelationHandle explicitChannelCorrelationHandle;
            // Enumerated as ISendMessageCallback in ProcessMessagePropertyCallbacks.
            IList<ISendMessageCallback> sendMessageCallbacks;
            ChannelFactoryReference factoryReference; 
            // cacheItem.Value is assigned to factoryReference, so the item holds a ChannelFactoryReference.
            ObjectCacheItem<ChannelFactoryReference> cacheItem;
            // NOTE(review): the key type (FactoryCacheKey) is not visible in this part of the file - confirm.
            ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache; 
            readonly InternalSendMessage parent; 
            bool isUsingCacheFromExtension;
 
            // needed so that we can return our ClientSendChannel to the pool under Dispose
            // NOTE(review): must match the out parameter of ChannelFactoryReference.TakeChannel - confirm.
            ObjectCacheItem<Pool<IChannel>> clientChannelPool;

            // Captures the correlation handles and request/response context needed for this send.
            // Throws ValidationException when CorrelatesWith is set but not initialized, and
            // InvalidOperationException when the required correlation information is missing.
            public SendMessageInstance(InternalSendMessage parent, NativeActivityContext context) 
            {
                this.parent = parent; 
 
                // setup both our following state as well as any anonymous response information
                CorrelationHandle correlatesWith = (parent.CorrelatesWith == null) ? null : parent.CorrelatesWith.Get(context); 
                if (correlatesWith != null && !correlatesWith.IsInitalized())
                {
                    // if send or sendReply has a correlatesWith, it should always be initialized with either content or with callbackcontext, context or
                    // ResponseContext 
                    throw FxTrace.Exception.AsError(new ValidationException(SR.SendWithUninitializedCorrelatesWith(this.parent.OperationName ?? string.Empty)));
                } 
 
                if (correlatesWith == null)
                { 
                    // No explicit handle: fall back to the ambient handle from the execution properties.
                    this.AmbientHandle = context.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
                    correlatesWith = this.AmbientHandle;
                }
 
                this.CorrelatesWith = correlatesWith;
 
                if (!parent.IsSendReply) 
                {
                    // we're a client-side request 

                    // Validate correlation handle
                    CorrelationHandle channelCorrelationHandle = GetExplicitChannelCorrelationHandle(context, parent.correlationInitializers);
                    if (parent.IsOneWay) 
                    {
                        if (channelCorrelationHandle != null) 
                        { 
                            // this is a one-way send , we should not have a RequestReply Correlation initializer
                            throw FxTrace.Exception.AsError(new InvalidOperationException(SR.RequestReplyHandleShouldNotBePresentForOneWay)); 

                        }
                    }
                    else // two-way send 
                    {
                        if (channelCorrelationHandle == null && this.AmbientHandle == null) 
                        { 
                            // The ambient handle was not looked up above when an explicit
                            // CorrelatesWith was supplied, so look it up now.
                            this.AmbientHandle = context.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
                            if (this.AmbientHandle == null) 
                            {
                                // we neither have a channelHandle nor an ambientHandle
                                throw FxTrace.Exception.AsError(new InvalidOperationException(
                                    SR.SendMessageNeedsToPairWithReceiveMessageForTwoWayContract(parent.OperationName ?? string.Empty))); 
                            }
                        } 
                    } 

                    // Formatter and OperationContract should be  removed from CorrelationRequestContext 
                    // This will be done when SendMessage/ReceiveMessage is completely removed from the code base
                    this.RequestContext = new CorrelationRequestContext();

                    // callback correlationHandle is used for initializing context based correlation 
                    this.ContextBasedCorrelationHandle = CorrelationHandle.GetExplicitCallbackCorrelation(context, parent.correlationInitializers);
 
                    // by default we use the channel factory cache from the extension 
                    isUsingCacheFromExtension = true;
                } 
                else
                {
                    // we are a server-side following send
                    CorrelationResponseContext responseContext; 
                    if (correlatesWith == null || !correlatesWith.TryAcquireResponseContext(context, out responseContext))
                    { 
                        throw FxTrace.Exception.AsError(new InvalidOperationException(SR.CorrelatedContextRequiredForAnonymousSend)); 
                    }
 
                    // Contract inference logic should validate that the Receive and Following send do not have conflicting data(e.g., OperationName)

                    this.ResponseContext = responseContext;
 
                    // in case of Context based correlation, we use context handle to initialize correlation
                    this.ContextBasedCorrelationHandle = CorrelationHandle.GetExplicitContextCorrelation(context, parent.correlationInitializers); 
                } 

                this.sendMessageCallbacks = MessagingActivityHelper.GetCallbacks(context.Properties); 

                if (TraceUtility.MessageFlowTracing)
                {
                    this.ActivityInstanceId = context.ActivityInstanceId; 
                }
            } 
 
            // The InternalSendMessage activity this instance belongs to.
            public InternalSendMessage Activity
            { 
                get
                {
                    return this.parent;
                } 
            }
 
            // The explicit CorrelatesWith handle, or the ambient handle when none was supplied.
            public CorrelationHandle CorrelatesWith 
            {
                get; 
                private set;
            }

            // The CorrelationHandle found in the execution properties, when it was looked up.
            public CorrelationHandle AmbientHandle 
            {
                get; 
                private set; 
            }
 
            // Callback correlation handle (client request) or context correlation handle (server reply).
            public CorrelationHandle ContextBasedCorrelationHandle
            {
                get;
                private set; 
            }
 
            public EndpointAddress EndpointAddress 
            {
                get; 
                set;
            }

            // Channel taken from the factory reference by PopulateClientChannel; returned in Dispose.
            public IChannel ClientSendChannel 
            {
                get; 
                private set; 
            }
 
            public CorrelationSynchronizer CorrelationSynchronizer
            {
                get;
                set; 
            }
 
            public Message RequestOrReply 
            {
                get; 
                set;
            }

            public OperationContext OperationContext 
            {
                get; 
                set; 
            }
 
            // Set only for client-side requests (null for a server-side reply).
            public CorrelationRequestContext RequestContext
            {
                get;
                private set; 
            }
 
            // This is required for adding the ChannelFactory to the cache once it is opened 
            public ObjectCache<FactoryCacheKey, ChannelFactoryReference> FactoryCache
            { 
                get
                {
                    return this.factoryCache;
                } 
            }
 
            // The cache extension; recorded together with the last used cache item in Dispose 
            public SendMessageChannelCache CacheExtension
            { 
                get;
                set;
            }
 
            //This is required for returning it to the cache after use
            public ChannelFactoryReference FactoryReference 
            { 
                get
                { 
                    return this.factoryReference;
                }
            }
 
            // Set only for a server-side reply (null for client-side requests).
            public CorrelationResponseContext ResponseContext
            { 
                get; 
                private set;
            } 

            public CorrelationKeyCalculator CorrelationKeyCalculator
            {
                get; 
                private set;
            } 
 
            public CorrelationCallbackContext CorrelationCallbackContext
            { 
                get;
                set;
            }
 
            public CorrelationContext CorrelationContext
            { 
                get; 
                set;
            } 

            public Guid AmbientActivityId
            {
                get; 
                set;
            } 
 
            // Set by RegisterCorrelationBehavior when the behavior has a non-empty SendNames collection.
            // NOTE(review): element type assumed to be string - confirm against CorrelationQueryBehavior.SendNames.
            public ICollection<string> CorrelationSendNames
            { 
                get;
                private set;
            }
 
            public Guid E2EActivityId
            { 
                get; 
                set;
            } 

            // Populated in the constructor only when message flow tracing is enabled.
            public string ActivityInstanceId
            {
                get; 
                private set;
            } 
 
            public bool IsCorrelationInitialized
            { 
                get;
                set;
            }
 
            // Records the factory cache and the factory reference to use. Exactly one of cacheItem
            // (found in the cache) or newFactoryReference (freshly created) is expected to be non-null.
            public void SetupFactoryReference(ObjectCacheItem<ChannelFactoryReference> cacheItem, ChannelFactoryReference newFactoryReference, ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache)
            { 
                this.factoryCache = factoryCache; 
                if (this.factoryCache == null)
                { 
                    isUsingCacheFromExtension = false;
                }
                if (cacheItem != null)
                { 
                    // we found the item in our cache
                    Fx.Assert(newFactoryReference == null, "need one of cacheItem or newFactoryReference"); 
                    Fx.Assert(cacheItem.Value != null, "should have valid value"); 
                    this.cacheItem = cacheItem;
                    this.factoryReference = cacheItem.Value; 
                }
                else
                {
                    Fx.Assert(newFactoryReference != null, "need one of cacheItem or newFactoryReference"); 
                    this.factoryReference = newFactoryReference;
                } 
            } 

            // Stores the cache item created for a new factory reference so Dispose can release it.
            public void RegisterNewCacheItem(ObjectCacheItem<ChannelFactoryReference> newCacheItem) 
            {
                Fx.Assert(this.cacheItem == null, "should only be called for new cache items");
                this.cacheItem = newCacheItem;
            } 

            // Returns the explicit channel correlation handle, resolving it on first use.
            // A null result is not cached, so the lookup is repeated on the next call.
            public CorrelationHandle GetExplicitChannelCorrelationHandle(NativeActivityContext context, Collection<CorrelationInitializer> additionalCorrelations) 
            { 
                if (this.explicitChannelCorrelationHandle == null)
                { 
                    this.explicitChannelCorrelationHandle = CorrelationHandle.GetExplicitChannelCorrelation(context, additionalCorrelations);
                }
                return this.explicitChannelCorrelationHandle;
            } 

            // Picks up the key calculator from the correlation behavior (only when the behavior
            // has a scope name) and, for requests, the names that require the correlation callback.
            public void RegisterCorrelationBehavior(CorrelationQueryBehavior correlationBehavior) 
            { 
                Fx.Assert(correlationBehavior != null, "caller must verify");
                if (correlationBehavior.ScopeName != null) 
                {
                    CorrelationKeyCalculator keyCalculator = correlationBehavior.GetKeyCalculator();
                    if (keyCalculator != null)
                    { 
                        this.CorrelationKeyCalculator = keyCalculator;
                        if (this.RequestContext != null) 
                        { 
                            this.RequestContext.CorrelationKeyCalculator = keyCalculator;
                            // for requests, determine if we should be using the correlation callback 
                            if (correlationBehavior.SendNames != null && correlationBehavior.SendNames.Count > 0)
                            {
                                this.CorrelationSendNames = correlationBehavior.SendNames;
                            } 
                        }
                    } 
                } 
            }
 
            // Gives every registered ISendMessageCallback a chance to act on the OperationContext.
            public void ProcessMessagePropertyCallbacks()
            {
                if (this.sendMessageCallbacks != null)
                { 
                    foreach (ISendMessageCallback sendMessageCallback in this.sendMessageCallbacks)
                    { 
                        sendMessageCallback.OnSendMessage(this.OperationContext); 
                    }
                } 
            }

            // Takes a channel for EndpointAddress from the factory reference and remembers the
            // pool it came from so Dispose can return it.
            public void PopulateClientChannel()
            { 
                Fx.Assert(this.ClientSendChannel == null && this.clientChannelPool == null, "should only be called once per instance");
                this.ClientSendChannel = this.FactoryReference.TakeChannel(this.EndpointAddress, out this.clientChannelPool); 
            } 

            // Returns the client channel to its pool and releases the factory cache item reference.
            // Both steps null their fields afterwards, so a repeated call does nothing.
            public void Dispose() 
            {
                if (this.ClientSendChannel != null)
                {
                    Fx.Assert(this.FactoryReference != null, "Must have a factory reference."); 
                    this.FactoryReference.ReturnChannel(this.ClientSendChannel, this.clientChannelPool);
                    this.ClientSendChannel = null; 
                    this.clientChannelPool = null; 
                }
 
                if (this.cacheItem != null)
                {
                    this.cacheItem.ReleaseReference();
 
                    // if we are using the FactoryCache from the extension, store the last used cacheItem and extension
                    if (this.isUsingCacheFromExtension) 
                    { 
                        this.parent.lastUsedFactoryCacheItem = new KeyValuePair<ObjectCacheItem<ChannelFactoryReference>, SendMessageChannelCache>(this.cacheItem, this.CacheExtension);
                    } 
                    this.cacheItem = null;
                }
            }
 
            // Returns the exception recorded on the request context (client request) or on the
            // response context (server reply); null when nothing was recorded.
            public Exception GetCompletionException()
            { 
                if (this.RequestContext != null) 
                {
                    // We got an exception trying to send message or receive a reply 
                    // Scenario: ContractFilterMismatch at serverside if the message action is not matched correctly
                    return this.RequestContext.Exception;
                }
                else 
                {
                    return this.ResponseContext.Exception; 
                } 
            }
        } 

        class MessageCorrelationCallbackMessageProperty : CorrelationCallbackMessageProperty
        {
            // Creates the property for the given send instance. 'neededData' is passed straight
            // to the base class; the base constructor takes a collection of strings.
            public MessageCorrelationCallbackMessageProperty(ICollection<string> neededData, SendMessageInstance instance) 
                : base(neededData)
            { 
                this.Instance = instance; 
            }
 
            // Copy constructor used by CreateCopy: the copy shares the source's send instance.
            protected MessageCorrelationCallbackMessageProperty(MessageCorrelationCallbackMessageProperty callback)
                : base(callback)
            {
                SendMessageInstance sharedInstance = callback.Instance;
                this.Instance = sharedInstance;
            }
 
            // The send instance this property reports correlation work to.
            public SendMessageInstance Instance { get; private set; }

            // Produces a copy through the copy constructor.
            public override IMessageProperty CreateCopy() 
            {
                MessageCorrelationCallbackMessageProperty copy = new MessageCorrelationCallbackMessageProperty(this);
                return copy;
            } 

            // Starts finalizing correlation for 'message'. The timeout is not used by this
            // implementation; all work is driven by FinalizeCorrelationAsyncResult.
            protected override IAsyncResult OnBeginFinalizeCorrelation(Message message, TimeSpan timeout, AsyncCallback callback, object state) 
            {
                FinalizeCorrelationAsyncResult pending = new FinalizeCorrelationAsyncResult(this, message, callback, state);
                return pending;
            }
 
            // Finishes the operation started by OnBeginFinalizeCorrelation and returns the
            // message produced by FinalizeCorrelationAsyncResult.End.
            protected override Message OnEndFinalizeCorrelation(IAsyncResult result)
            { 
                Message finalized = FinalizeCorrelationAsyncResult.End(result);
                return finalized;
            }
 
            // Synchronous variant: runs the Begin/End pair back to back with no callback or state.
            protected override Message OnFinalizeCorrelation(Message message, TimeSpan timeout)
            {
                IAsyncResult pending = OnBeginFinalizeCorrelation(message, timeout, null, null);
                return OnEndFinalizeCorrelation(pending);
            } 

            class FinalizeCorrelationAsyncResult : AsyncResult 
            { 
                Message message;
                Completion completion; 

                object thisLock;

                // Starts correlation finalization for 'message'.
                // If the instance's correlation is already initialized the message is kept as is
                // and the result completes immediately. Otherwise the message is handed to the
                // instance, the CorrelationSynchronizer is notified, and the result completes
                // either here or from OnWorkflowCorrelationProcessingComplete, depending on which
                // of the two reaches the lock last.
                public FinalizeCorrelationAsyncResult(MessageCorrelationCallbackMessageProperty property, Message message, 
                    AsyncCallback callback, object state)
                    : base(callback, state) 
                { 
                    bool completeSelf = false;
                    if (property.Instance.IsCorrelationInitialized) 
                    {
                        // we do not modify the message since correlation is not calculated again
                        this.message = message;
                        completeSelf = true; 
                    }
                    else 
                    { 
                        property.Instance.IsCorrelationInitialized = true;
                        // Created before the synchronizer is notified because the callback below
                        // takes this lock.
                        this.thisLock = new object(); 

                        property.Instance.RequestOrReply = message;

                        // The callback receives the (possibly updated) message, so the delegate
                        // type is Action<Message>.
                        property.Instance.CorrelationSynchronizer.NotifyRequestSetByChannel(new Action<Message>(OnWorkflowCorrelationProcessingComplete)); 

                        // We have to do this dance with the lock because 
                        // we aren't sure if we've been running sync or not. 
                        // NOTE: It is possible for us to go async and
                        // still decide we're completing sync.  This is fine 
                        // as it does not violate the async pattern since
                        // the work is done by the time Begin completes.
                        completeSelf = false;
 
                        lock (this.thisLock)
                        { 
                            if (this.completion == Completion.WorkflowCorrelationProcessingComplete) 
                            {
                                // The workflow callback already ran, so this constructor completes the result.
                                completeSelf = true; 
                            }
                            else
                            {
                                Fx.Assert(this.completion == Completion.None, "We must be not ready then."); 

                                this.completion = Completion.ConstructorComplete; 
                            } 
                        }
                    } 
                    if (completeSelf)
                    {
                        Complete(true);
                    } 

                } 
 
                void OnWorkflowCorrelationProcessingComplete(Message updatedMessage)
                { 
                    this.message = updatedMessage;

                    // We have to do this dance with the lock because
                    // we aren't sure if we've been running [....] or not. 
                    // NOTE: It is possible for us to go async and
                    // still decide we're completing [....].  This is fine 
                    // as it does not violate the async pattern since 
                    // the work is done by the time Begin completes.
                    bool completeSelf = false; 

                    lock (this.thisLock)
                    {
                        if (this.completion == Completion.ConstructorComplete) 
                        {
                            completeSelf = true; 
                        } 
                        else
                        { 
                            Fx.Assert(this.completion == Completion.None, "We must be not ready then.");

                            this.completion = Completion.WorkflowCorrelationProcessingComplete;
                        } 
                    }
 
                    if (completeSelf) 
                    {
                        Complete(false); 
                    }
                }

                public static Message End(IAsyncResult result) 
                {
                    FinalizeCorrelationAsyncResult thisPtr = AsyncResult.End(result); 
                    return thisPtr.message; 
                }
 
                // This three state enum allows us to determine whether
                // we are the first or second code path.  The second
                // code path needs to complete the async result.
                enum Completion 
                {
                    None, 
                    ConstructorComplete, 
                    WorkflowCorrelationProcessingComplete
                } 
            }
        }

        // Data-contract wrapper around a SendMessageInstance. The wrapped instance is
        // exposed as a plain property that is not part of the data contract.
        [DataContract]
        class VolatileSendMessageInstance
        {
            // Deliberately not marked as a DataMember since we don't want it to be serialized
            public SendMessageInstance Instance { get; set; }

            public VolatileSendMessageInstance()
            {
            }
        }

        // Represents an item in our object cache. Stores a ChannelFactory and an associated pool of channels.
        //
        // The factory is an output-channel factory for one-way operations and a request-channel
        // factory otherwise (see the constructor). Channels are pooled per EndpointAddress in
        // channelCache; TakeChannel/ReturnChannel hand channels out of and back into those pools.
        internal sealed class ChannelFactoryReference : IDisposable
        {
            // Callback given to the channel cache so that evicted pools get their channels closed.
            static readonly Action<Pool<IChannel>> disposeChannelPool = new Action<Pool<IChannel>>(DisposeChannelPool);
            static readonly AsyncCallback onDisposeCommunicationObject = Fx.ThunkCallback(new AsyncCallback(OnDisposeCommunicationObject));
            readonly FactoryCacheKey factoryKey;
            readonly ServiceEndpoint targetEndpoint;
            readonly ChannelFactory channelFactory;
            readonly ObjectCache<EndpointAddress, Pool<IChannel>> channelCache;
            readonly Func<Pool<IChannel>> createChannelCacheItem;

            // Looked up lazily from the target endpoint's behaviors; see CorrelationQueryBehavior.
            CorrelationQueryBehavior correlationQueryBehavior;

            // Creates the (unopened) channel factory for targetEndpoint and an empty channel cache
            // configured from channelCacheSettings. All three arguments are asserted non-null.
            public ChannelFactoryReference(FactoryCacheKey factoryKey, ServiceEndpoint targetEndpoint, ChannelCacheSettings channelCacheSettings)
            {
                Fx.Assert(channelCacheSettings != null, " channelCacheSettings should not be null");
                Fx.Assert(factoryKey != null, " factoryKey should not be null");
                Fx.Assert(targetEndpoint != null, " targetEndpoint should not be null");

                this.factoryKey = factoryKey;
                this.targetEndpoint = targetEndpoint;

                // The channel shape depends on whether the operation is one-way.
                if (factoryKey.IsOperationContractOneWay)
                {
                    this.channelFactory = new ChannelFactory<IOutputChannel>(targetEndpoint);
                }
                else
                {
                    this.channelFactory = new ChannelFactory<IRequestChannel>(targetEndpoint);
                }
                this.channelFactory.UseActiveAutoClose = true;
                this.channelFactory.Credentials.Windows.AllowedImpersonationLevel = factoryKey.TokenImpersonationLevel;

                ObjectCacheSettings channelSettings = new ObjectCacheSettings
                {
                    CacheLimit = channelCacheSettings.MaxItemsInCache,
                    IdleTimeout = channelCacheSettings.IdleTimeout,
                    LeaseTimeout = channelCacheSettings.LeaseTimeout
                };

                // our channel cache is keyed solely on endpoint since we don't allow the via to be dynamic
                // for a ChannelFactoryReference
                this.channelCache = new ObjectCache<EndpointAddress, Pool<IChannel>>(channelSettings)
                {
                    DisposeItemCallback = disposeChannelPool
                };
                this.createChannelCacheItem = () => new Pool<IChannel>(channelCacheSettings.MaxItemsInCache);
            }

            // The CorrelationQueryBehavior found on the target endpoint, or null if there is none.
            // The lookup is repeated on each access until a non-null behavior has been found.
            public CorrelationQueryBehavior CorrelationQueryBehavior
            {
                get
                {
                    if (this.correlationQueryBehavior == null)
                    {
                        this.correlationQueryBehavior = this.targetEndpoint.Behaviors.Find<CorrelationQueryBehavior>();
                    }

                    return this.correlationQueryBehavior;
                }
            }

            // As a perf optimization, we provide this property to avoid async result/callback creations
            public bool NeedsOpen
            {
                get
                {
                    return this.channelFactory.State == CommunicationState.Created;
                }
            }

            // Starts opening the underlying channel factory. Callers are expected to check NeedsOpen first.
            public IAsyncResult BeginOpen(AsyncCallback callback, object state)
            {
                Fx.Assert(NeedsOpen, "caller should check NeedsOpen first");
                return this.channelFactory.BeginOpen(callback, state);
            }

            // after open we should be added to a cache if one is provided
            // Returns the cache item produced by adding this reference under its factory key,
            // or null when no factoryCache was supplied.
            public ObjectCacheItem<ChannelFactoryReference> EndOpen(IAsyncResult result, ObjectCache<FactoryCacheKey, ChannelFactoryReference> factoryCache)
            {
                this.channelFactory.EndOpen(result);

                ObjectCacheItem<ChannelFactoryReference> cacheItem = null;
                if (factoryCache != null)
                {
                    cacheItem = factoryCache.Add(this.factoryKey, this);
                }

                return cacheItem;
            }

            // Closes (or aborts) the channel factory via DisposeCommunicationObject.
            [SuppressMessage(FxCop.Category.Usage, FxCop.Rule.DisposableFieldsShouldBeDisposed,
                Justification = "disposable field is being disposed using DisposeCommunicationObject")]
            public void Dispose()
            {
                DisposeCommunicationObject(this.channelFactory);
            }

            // Gets a channel for endpointAddress: a pooled one when an opened channel is available,
            // otherwise a newly created one. channelPool receives the cache item for the pool the
            // channel belongs to; it is the item ReturnChannel later releases.
            public IChannel TakeChannel(EndpointAddress endpointAddress, out ObjectCacheItem<Pool<IChannel>> channelPool)
            {
                channelPool = this.channelCache.Take(endpointAddress, this.createChannelCacheItem);
                Fx.Assert(channelPool != null, "Take with delegate should always return a valid Item");

                IChannel result;
                lock (channelPool.Value)
                {
                    result = channelPool.Value.Take();
                }

                // make an effort to kill stale channels
                ServiceChannel serviceChannel = result as ServiceChannel;
                if (result != null && (result.State != CommunicationState.Opened || (serviceChannel != null && serviceChannel.Binder.Channel.State != CommunicationState.Opened)))
                {
                    result.Abort();
                    result = null;
                }

                if (result == null)
                {
                    Uri via = null;

                    // service endpoint always sets the ListenUri, which will break default callback-context behavior
                    if (this.targetEndpoint.Address != null && this.targetEndpoint.Address.Uri != this.targetEndpoint.ListenUri)
                    {
                        via = this.targetEndpoint.ListenUri;
                    }

                    // The cast must match the factory type chosen in the constructor.
                    if (this.factoryKey.IsOperationContractOneWay)
                    {
                        result = ((ChannelFactory<IOutputChannel>)this.channelFactory).CreateChannel(endpointAddress, via);
                    }
                    else
                    {
                        result = ((ChannelFactory<IRequestChannel>)this.channelFactory).CreateChannel(endpointAddress, via);
                    }
                }

                if (!(result is ServiceChannel))
                {
                    result = ServiceChannelFactory.GetServiceChannel(result);
                }

                return result;
            }

            // Hands a channel back. An opened channel is offered to the pool; a channel that is
            // not opened, or that the pool declines, is closed. The pool's cache item reference
            // is released in every case.
            public void ReturnChannel(IChannel channel, ObjectCacheItem<Pool<IChannel>> channelPool)
            {
                bool shouldDispose = channel.State != CommunicationState.Opened;

                // channel is in open state, try returning it to the pool
                if (!shouldDispose)
                {
                    lock (channelPool.Value)
                    {
                        shouldDispose = !channelPool.Value.Return(channel);
                    }
                }

                if (shouldDispose)
                {
                    // not caching the channel, so we need to close it
                    DisposeCommunicationObject(channel);
                }

                // and return our cache item
                channelPool.ReleaseReference();
            }

            // Drains the pool, closing every channel it still holds.
            static void DisposeChannelPool(Pool<IChannel> channelPool)
            {
                IChannel channel;

                // we don't need to lock the Take from the Pool here since no one will be accessing this anymore
                // Dispose will be called under a lock from the ObjectCacheItem
                while ((channel = channelPool.Take()) != null)
                {
                    DisposeCommunicationObject(channel);
                }
            }

            // Best-effort shutdown: an opened object gets an asynchronous close; anything that is
            // not opened, or whose close throws, is aborted instead.
            static void DisposeCommunicationObject(ICommunicationObject communicationObject)
            {
                bool success = false;
                try
                {
                    if (communicationObject.State == CommunicationState.Opened)
                    {
                        IAsyncResult result = communicationObject.BeginClose(ServiceDefaults.CloseTimeout, onDisposeCommunicationObject, communicationObject);
                        if (result.CompletedSynchronously)
                        {
                            communicationObject.EndClose(result);
                        }
                        // When the close went async, OnDisposeCommunicationObject finishes the job.
                        success = true;
                    }
                }
                catch (CommunicationException)
                {
                    // expected, we'll abort
                }
                catch (TimeoutException)
                {
                    // expected, we'll abort
                }
                finally
                {
                    if (!success)
                    {
                        communicationObject.Abort();
                    }
                }
            }

            // Completion callback for the BeginClose issued by DisposeCommunicationObject.
            // Synchronous completions are ended by the caller, so only the async case is handled here.
            static void OnDisposeCommunicationObject(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }
                ICommunicationObject communicationObject = (ICommunicationObject)result.AsyncState;

                bool success = false;
                try
                {
                    communicationObject.EndClose(result);
                    success = true;
                }
                catch (CommunicationException)
                {
                    // expected, we'll abort
                }
                catch (TimeoutException)
                {
                    // expected, we'll abort
                }
                finally
                {
                    if (!success)
                    {
                        communicationObject.Abort();
                    }
                }
            }
        }

        // Key identifying a cached channel factory. Two keys are equal when they agree on
        // endpoint binding (or endpoint configuration name when no endpoint is set), token
        // impersonation level, one-way-ness, an equivalent contract description and the same
        // set of correlation queries (compared by reference).
        internal class FactoryCacheKey : IEquatable<FactoryCacheKey>
        {
            readonly Endpoint endpoint;
            readonly bool isOperationContractOneWay;
            readonly TokenImpersonationLevel tokenImpersonationLevel;
            readonly ContractDescription contract;

            // Private copy of the queries passed to the constructor; stays null when none were passed.
            readonly Collection<CorrelationQuery> correlationQueries;
            readonly string endpointConfigurationName;

            public FactoryCacheKey(Endpoint endpoint, string endpointConfigurationName, bool isOperationOneway,
                TokenImpersonationLevel tokenImpersonationLevel, ContractDescription contractDescription,
                ICollection<CorrelationQuery> correlationQueries)
            {
                this.endpoint = endpoint;
                this.endpointConfigurationName = endpointConfigurationName;
                this.isOperationContractOneWay = isOperationOneway;
                this.tokenImpersonationLevel = tokenImpersonationLevel;
                this.contract = contractDescription;

                if (correlationQueries != null)
                {
                    // Copy the query references so later changes to the caller's collection
                    // do not alter this key.
                    this.correlationQueries = new Collection<CorrelationQuery>();
                    foreach (CorrelationQuery query in correlationQueries)
                    {
                        this.correlationQueries.Add(query);
                    }
                }
            }

            public bool IsOperationContractOneWay
            {
                get
                {
                    return this.isOperationContractOneWay;
                }
            }

            public TokenImpersonationLevel TokenImpersonationLevel
            {
                get
                {
                    return this.tokenImpersonationLevel;
                }
            }

            // Keeps object.Equals consistent with IEquatable<FactoryCacheKey>.Equals and with
            // GetHashCode; anything that is not a FactoryCacheKey compares as not equal.
            public override bool Equals(object obj)
            {
                return Equals(obj as FactoryCacheKey);
            }

            public bool Equals(FactoryCacheKey other)
            {
                if (object.ReferenceEquals(this, other))
                {
                    return true;
                }

                if (other == null)
                {
                    // this means only one of them is null
                    return false;
                }

                // 1) Compare Endpoint/EndpointConfigurationName
                if ((this.endpoint == null) != (other.endpoint == null))
                {
                    return false;
                }

                // if endpoint is not null we compare the endpoint, else we compare the endpointconfiguration
                if (this.endpoint != null)
                {
                    if (!object.ReferenceEquals(this.endpoint, other.endpoint))
                    {
                        // Binding -
                        // We are comparing by ref here, can we compare binding elements instead
                        if (this.endpoint.Binding != other.endpoint.Binding)
                        {
                            return false;
                        }
                    }
                }
                else if (this.endpointConfigurationName != other.endpointConfigurationName)
                {
                    return false;
                }

                // (2) TokenImpersonationlevel
                if (this.TokenImpersonationLevel != other.TokenImpersonationLevel)
                {
                    return false;
                }

                // (3) OperationContract.IsOneWay to decide if the ChannelFactory needs to be of type RequestChannel or OutputChannel
                if (this.IsOperationContractOneWay != other.IsOperationContractOneWay)
                {
                    return false;
                }

                // (4) Verify if the ContractDescriptions are equivalent
                if (!ContractDescriptionComparerHelper.IsContractDescriptionEquivalent(this.contract, other.contract))
                {
                    return false;
                }

                // (5) Verify the correlationquery collection
                //  For now, we verify each query by ref, so that loop scenarios would work
                //  Can we do a value comparison here?
                if (!ContractDescriptionComparerHelper.EqualsUnordered(this.correlationQueries, other.correlationQueries))
                {
                    return false;
                }

                return true;
            }

            // Hash built only from the contract name and the binding reference. Both inputs are
            // also compared by Equals, so equal keys produce equal hashes.
            public override int GetHashCode()
            {
                int hashCode = 0;

                if (this.contract != null && this.contract.Name != null)
                {
                    //using ContractName as the hashcode
                    hashCode ^= this.contract.Name.GetHashCode();
                }

                if (this.endpoint != null && this.endpoint.Binding != null)
                {
                    //we compare binding by ref
                    hashCode ^= this.endpoint.Binding.GetHashCode();
                }

                return hashCode;
            }

            // Structural comparison helpers used by Equals.
            static class ContractDescriptionComparerHelper
            {
                // Unordered comparison of two collections using reference equality on the items.
                public static bool EqualsUnordered<T>(Collection<T> left, Collection<T> right) where T : class
                {
                    return EqualsUnordered(left, right, (t1, t2) => t1 == t2);
                }

                // Two contracts are equivalent when they are the same reference, or when both have
                // a ContractType and agree on the identifying properties and operations listed below.
                public static bool IsContractDescriptionEquivalent(ContractDescription c1, ContractDescription c2)
                {
                    if (c1 == c2)
                    {
                        return true;
                    }

                    // Both-null was handled above, so a null here means exactly one side is
                    // missing. This check must come before any member access on c1/c2.
                    if (c1 == null || c2 == null)
                    {
                        return false;
                    }

                    // if the contract is not one of the default contracts that we use, we only do a byref comparison
                    // fully inferred contracts always have null ContractType
                    if (c1.ContractType == null || c2.ContractType == null)
                    {
                        return false;
                    }

                    //compare contractname
                    return (c1.Name == c2.Name &&
                            c1.Namespace == c2.Namespace &&
                            c1.ConfigurationName == c2.ConfigurationName &&
                            c1.ProtectionLevel == c2.ProtectionLevel &&
                            c1.SessionMode == c2.SessionMode &&
                            c1.ContractType == c2.ContractType &&
                            c1.Behaviors.Count == c2.Behaviors.Count && //we have no way to verify each one
                            EqualsUnordered(c1.Operations, c2.Operations, (o1, o2) => IsOperationDescriptionEquivalent(o1, o2)));
                }

                // Position-by-position comparison. A null list is treated like an empty one.
                static bool EqualsOrdered<T>(IList<T> left, IList<T> right, Func<T, T, bool> equals)
                {
                    if (left == null)
                    {
                        return (right == null || right.Count == 0);
                    }
                    else if (right == null)
                    {
                        return left.Count == 0;
                    }
                    if (left.Count != right.Count)
                    {
                        return false;
                    }
                    for (int i = 0; i < left.Count; i++)
                    {
                        if (!equals(left[i], right[i]))
                        {
                            return false;
                        }
                    }
                    return true;
                }

                // Order-insensitive comparison. A null collection is treated like an empty one.
                static bool EqualsUnordered<T>(Collection<T> left, Collection<T> right, Func<T, T, bool> equals)
                {
                    if (left == null)
                    {
                        return (right == null || right.Count == 0);
                    }
                    else if (right == null)
                    {
                        return left.Count == 0;
                    }
                    // This check ensures that the lists have the same contents, but does not verify that they have the same
                    // quantity of each item if they are duplicates.
                    return left.Count == right.Count &&
                        left.All(leftItem => right.Any(rightItem => equals(leftItem, rightItem))) &&
                        right.All(rightItem => left.Any(leftItem => equals(leftItem, rightItem)));
                }

                static bool IsOperationDescriptionEquivalent(OperationDescription o1, OperationDescription o2)
                {
                    if (o1 == o2)
                    {
                        return true;
                    }

                    return (o1.Name == o2.Name &&
                            o1.ProtectionLevel == o2.ProtectionLevel &&
                            o1.IsOneWay == o2.IsOneWay &&
                            IsTransactionBehaviorEquivalent(o1, o2) && //we are verifying only the TransactionFlowBehavior
                            EqualsOrdered(o1.Messages, o2.Messages, (m1, m2) => IsMessageDescriptionEquivalent(m1, m2)));
                }

                static bool IsMessageDescriptionEquivalent(MessageDescription m1, MessageDescription m2)
                {
                    if (m1 == m2)
                    {
                        return true;
                    }

                    //we are comparing only action and direction
                    return (m1.Action == m2.Action && m1.Direction == m2.Direction);
                }

                // Compares the operation behaviors by count and by TransactionFlowAttribute only.
                static bool IsTransactionBehaviorEquivalent(OperationDescription o1, OperationDescription o2)
                {
                    // Two nulls are equivalent; a single null is not (and must not be dereferenced).
                    if (o1 == null || o2 == null)
                    {
                        return o1 == o2;
                    }

                    if (o1.Behaviors.Count != o2.Behaviors.Count)
                    {
                        return false;
                    }

                    //we are only going to check the TransactionFlowAttribute
                    TransactionFlowAttribute t1 = o1.Behaviors.Find<TransactionFlowAttribute>();
                    TransactionFlowAttribute t2 = o2.Behaviors.Find<TransactionFlowAttribute>();
                    if (t1 == null || t2 == null)
                    {
                        // equivalent only when neither operation carries the attribute
                        return t1 == t2;
                    }

                    //verify if both have the same value for TransactionFlowOption
                    return t1.Transactions == t2.Transactions;
                }
            }
        }
    } 
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK