BufferedReceiveManager.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Activities / System / ServiceModel / Activities / Dispatcher / BufferedReceiveManager.cs / 1407647 / BufferedReceiveManager.cs

                            //---------------------------------------------------------------- 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//---------------------------------------------------------------
namespace System.ServiceModel.Activities.Dispatcher
{ 
    using System.Activities.Hosting;
    using System.Collections.Generic; 
    using System.Collections.ObjectModel; 
    using System.Runtime;
    using System.Runtime.DurableInstancing; 
    using System.ServiceModel.Channels;
    using System.Threading;

    sealed class BufferedReceiveManager : IExtension 
    {
        static AsyncCallback onEndAbandon; 
        Dictionary> bufferedProperties; 
        PendingMessageThrottle throttle;
        WorkflowServiceHost host; 

        int initialized;

        [Fx.Tag.SynchronizationObject(Blocking = false)] 
        object thisLock;
 
        public BufferedReceiveManager(int maxPendingMessagesPerChannel) 
        {
            this.throttle = new PendingMessageThrottle(maxPendingMessagesPerChannel); 
            this.thisLock = new object();
        }

        public bool BufferReceive(OperationContext operationContext, ReceiveContext receiveContext, string bookmarkName, BufferedReceiveState state, bool retry) 
        {
            Fx.Assert(receiveContext != null, "ReceiveContext must be present in order to perform buffering"); 
 
            bool success = false;
 
            BufferedReceiveMessageProperty property = null;
            if (BufferedReceiveMessageProperty.TryGet(operationContext.IncomingMessageProperties, out property))
            {
                CorrelationMessageProperty correlation = null; 
                if (CorrelationMessageProperty.TryGet(operationContext.IncomingMessageProperties, out correlation))
                { 
                    InstanceKey instanceKey = correlation.CorrelationKey; 
                    int channelKey = operationContext.Channel.GetHashCode();
                    if (this.throttle.Acquire(channelKey)) 
                    {
                        try
                        {
                            // Tag the property with identifying data to be used during later processing 
                            if (UpdateProperty(property, receiveContext, channelKey, bookmarkName, state))
                            { 
                                // Cleanup if we are notified the ReceiveContext faulted underneath us 
                                receiveContext.Faulted += delegate(object sender, EventArgs e)
                                { 
                                    lock (this.thisLock)
                                    {
                                        if (this.bufferedProperties.ContainsKey(instanceKey))
                                        { 
                                            if (this.bufferedProperties[instanceKey].Remove(property))
                                            { 
                                                try 
                                                {
                                                    property.RequestContext.DelayClose(false); 
                                                    property.RequestContext.Abort();
                                                }
                                                catch (Exception exception)
                                                { 
                                                    if (Fx.IsFatal(exception))
                                                    { 
                                                        throw; 
                                                    }
 
                                                    // ---- these exceptions as we are already on the error path
                                                }

                                                this.throttle.Release(channelKey); 
                                            }
                                        } 
                                    } 
                                };
 
                                // Actual Buffering
                                lock (this.thisLock)
                                {
                                    // Optimistic state check in case we just ----d with the receiveContext 
                                    // faulting. If the receiveContext still faults after the state check, the above
                                    // cleanup routine will handle things correctly. In both cases, a double-release 
                                    // of the throttle is protected. 
                                    if (receiveContext.State == ReceiveContextState.Received)
                                    { 
                                        bool found = false;
                                        // if the exception indicates retry-able (such as RetryException),
                                        // we will simply retry.  This happens when racing with abort and
                                        // WF informing the client to retry (BufferedReceiveManager is a 
                                        // client in this case).
                                        if (retry) 
                                        { 
                                            property.RequestContext.DelayClose(true);
                                            property.RegisterForReplay(operationContext); 
                                            property.ReplayRequest();
                                            property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
                                            found = true;
                                        } 
                                        else
                                        { 
                                            ReadOnlyCollection bookmarks = this.host.DurableInstanceManager.PersistenceProviderDirectory.GetBookmarksForInstance(instanceKey); 
                                            // Retry in case match the existing bookmark
                                            if (bookmarks != null) 
                                            {
                                                for (int i = 0; i < bookmarks.Count; ++i)
                                                {
                                                    BookmarkInfo bookmark = bookmarks[i]; 
                                                    if (bookmark.BookmarkName == bookmarkName)
                                                    { 
                                                        // Found it so retry... 
                                                        property.RequestContext.DelayClose(true);
                                                        property.RegisterForReplay(operationContext); 
                                                        property.ReplayRequest();
                                                        property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
                                                        found = true;
                                                        break; 
                                                    }
                                                } 
                                            } 
                                        }
 
                                        if (!found)
                                        {
                                            List properties;
                                            if (!this.bufferedProperties.TryGetValue(instanceKey, out properties)) 
                                            {
                                                properties = new List(); 
                                                this.bufferedProperties.Add(instanceKey, properties); 
                                            }
                                            property.RequestContext.DelayClose(true); 
                                            property.RegisterForReplay(operationContext);
                                            properties.Add(property);
                                        }
                                        else 
                                        {
                                            this.throttle.Release(channelKey); 
                                        } 
                                        success = true;
                                    } 
                                }
                            }
                        }
                        finally 
                        {
                            if (!success) 
                            { 
                                this.throttle.Release(channelKey);
                            } 
                        }
                    }
                }
            } 

            return success; 
        } 

        public void Retry(HashSet associatedInstances, ReadOnlyCollection availableBookmarks) 
        {
            List bookmarks = new List(availableBookmarks);
            foreach (InstanceKey instanceKey in associatedInstances)
            { 
                lock (this.thisLock)
                { 
                    if (this.bufferedProperties.ContainsKey(instanceKey)) 
                    {
                        List properties = this.bufferedProperties[instanceKey]; 
                        int index = 0;

                        while (index < properties.Count && bookmarks.Count > 0)
                        { 
                            BufferedReceiveMessageProperty property = properties[index];
 
                            // Determine if this property is now ready to be processed 
                            int channelKey = 0;
                            bool found = false; 
                            for (int i = 0 ; i < bookmarks.Count ; ++i)
                            {
                                BookmarkInfo bookmark = (BookmarkInfo)bookmarks[i];
                                PropertyData data = (PropertyData)property.UserState; 
                                if (bookmark.BookmarkName == data.BookmarkName)
                                { 
                                    // Found it so retry... 
                                    bookmarks.RemoveAt(i);
                                    channelKey = data.ChannelKey; 
                                    property.ReplayRequest();
                                    property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext);
                                    found = true;
                                    break; 
                                }
                            } 
 
                            if (!found)
                            { 
                                index++;
                            }
                            else
                            { 
                                properties.RemoveAt(index);
                                this.throttle.Release(channelKey); 
                            } 
                        }
                    } 
                }

                if (bookmarks.Count == 0)
                { 
                    break;
                } 
            } 
        }
 
        public void AbandonBufferedReceives(HashSet associatedInstances)
        {
            foreach (InstanceKey instanceKey in associatedInstances)
            { 
                lock (this.thisLock)
                { 
                    if (this.bufferedProperties.ContainsKey(instanceKey)) 
                    {
                        foreach (BufferedReceiveMessageProperty property in this.bufferedProperties[instanceKey]) 
                        {
                            PropertyData data = (PropertyData)property.UserState;
                            AbandonReceiveContext(data.ReceiveContext);
                            this.throttle.Release(data.ChannelKey); 
                        }
 
                        this.bufferedProperties.Remove(instanceKey); 
                    }
                } 
            }
        }

        // clean up any remaining buffered receives as part of ServiceHost close. 
        internal void AbandonBufferedReceives()
        { 
            lock (this.thisLock) 
            {
                foreach (List value in this.bufferedProperties.Values) 
                {
                    foreach (BufferedReceiveMessageProperty property in value)
                    {
                        PropertyData data = (PropertyData)property.UserState; 
                        AbandonReceiveContext(data.ReceiveContext);
                        this.throttle.Release(data.ChannelKey); 
                    } 
                }
                this.bufferedProperties.Clear(); 
            }
        }

        // Best-effort to abandon the receiveContext 
        internal static void AbandonReceiveContext(ReceiveContext receiveContext)
        { 
            if (receiveContext != null) 
            {
                if (onEndAbandon == null) 
                {
                    onEndAbandon = Fx.ThunkCallback(new AsyncCallback(OnEndAbandon));
                }
 
                try
                { 
                    IAsyncResult result = receiveContext.BeginAbandon( 
                        TimeSpan.MaxValue, onEndAbandon, receiveContext);
                    if (result.CompletedSynchronously) 
                    {
                        HandleEndAbandon(result);
                    }
                } 
                catch (Exception exception)
                { 
                    if (Fx.IsFatal(exception)) 
                    {
                        throw; 
                    }

                    // We ---- any Abandon exception - best effort.
                    FxTrace.Exception.AsWarning(exception); 
                }
            } 
        } 

        static bool HandleEndAbandon(IAsyncResult result) 
        {
            ReceiveContext receiveContext = (ReceiveContext)result.AsyncState;
            receiveContext.EndAbandon(result);
            return true; 
        }
 
        static void OnEndAbandon(IAsyncResult result) 
        {
            if (result.CompletedSynchronously) 
            {
                return;
            }
 
            try
            { 
                HandleEndAbandon(result); 
            }
            catch (Exception exception) 
            {
                if (Fx.IsFatal(exception))
                {
                    throw; 
                }
 
                // We ---- any Abandon exception - best effort. 
                FxTrace.Exception.AsWarning(exception);
            } 
        }

        void IExtension.Attach(ServiceHostBase owner)
        { 
            if (owner == null)
            { 
                throw FxTrace.Exception.AsError(new ArgumentNullException("owner")); 
            }
 
            if (Interlocked.CompareExchange(ref this.initialized, 1, 0) != 0)
            {
                throw FxTrace.Exception.AsError(
                    new InvalidOperationException(SR.BufferedReceiveBehaviorMultipleUse)); 
            }
 
            owner.ThrowIfClosedOrOpened(); 

            Fx.Assert(owner is WorkflowServiceHost, "owner must be of WorkflowServiceHost type!"); 
            this.host = (WorkflowServiceHost)owner;
            Initialize();
        }
 
        void IExtension.Detach(ServiceHostBase owner)
        { 
        } 

        bool UpdateProperty(BufferedReceiveMessageProperty property, ReceiveContext receiveContext, int channelKey, string bookmarkName, BufferedReceiveState state) 
        {
            // If there's data already there make sure the state is allowed
            if (property.UserState == null)
            { 
                property.UserState = new PropertyData()
                { 
                    ReceiveContext = receiveContext, 
                    ChannelKey = channelKey,
                    BookmarkName = bookmarkName, 
                    State = state
                };
            }
            else 
            {
                PropertyData data = (PropertyData)property.UserState; 
 
                // We should not buffer twice at the same state
                if (data.State == state) 
                {
                    return false;
                }
 
                data.State = state;
            } 
 
            return true;
        } 

        void Initialize()
        {
            this.bufferedProperties = new Dictionary>(); 
        }
 
        class PendingMessageThrottle 
        {
            [Fx.Tag.SynchronizationObject(Blocking = false)] 
            Dictionary pendingMessages;

            int maxPendingMessagesPerChannel;
            int warningRestoreLimit; 

            public PendingMessageThrottle(int maxPendingMessagesPerChannel) 
            { 
                this.maxPendingMessagesPerChannel = maxPendingMessagesPerChannel;
                this.warningRestoreLimit = (int)Math.Floor(0.7 * (double)maxPendingMessagesPerChannel); 
                this.pendingMessages = new Dictionary();
            }

            public bool Acquire(int channelKey) 
            {
                lock (this.pendingMessages) 
                { 
                    if (!this.pendingMessages.ContainsKey(channelKey))
                    { 
                        this.pendingMessages.Add(channelKey, new ThrottleEntry());
                    }

                    ThrottleEntry entry = this.pendingMessages[channelKey]; 
                    if (entry.Count < this.maxPendingMessagesPerChannel)
                    { 
                        entry.Count++; 
                        return true;
                    } 
                    else
                    {
                        if (TD.MaxPendingMessagesPerChannelExceededIsEnabled())
                        { 
                            if (!entry.WarningIssued)
                            { 
                                TD.MaxPendingMessagesPerChannelExceeded(this.maxPendingMessagesPerChannel); 
                                entry.WarningIssued = true;
                            } 
                        }

                        return false;
                    } 
                }
            } 
 
            public void Release(int channelKey)
            { 
                lock (this.pendingMessages)
                {
                    ThrottleEntry entry = this.pendingMessages[channelKey];
                    Fx.Assert(entry.Count > 0, "The pending message throttle was released too many times"); 

                    entry.Count--; 
                    if (entry.Count == 0) 
                    {
                        this.pendingMessages.Remove(channelKey); 
                    }
                    else if (entry.Count < this.warningRestoreLimit)
                    {
                        entry.WarningIssued = false; 
                    }
                } 
            } 

            class ThrottleEntry 
            {
                public ThrottleEntry()
                {
                } 

                public bool WarningIssued 
                { 
                    get;
                    set; 
                }

                public int Count
                { 
                    get;
                    set; 
                } 
            }
        } 

        class PropertyData
        {
            public PropertyData() 
            {
            } 
 
            public ReceiveContext ReceiveContext
            { 
                get;
                set;
            }
 
            public int ChannelKey
            { 
                get; 
                set;
            } 

            public string BookmarkName
            {
                get; 
                set;
            } 
 
            public BufferedReceiveState State
            { 
                get;
                set;
            }
        } 
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK