SynchronizedPool.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / SynchronizedPool.cs / 1 / SynchronizedPool.cs

                            //------------------------------------------------------------ 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel
{ 
    using System.Collections.Generic;
    using System.Threading; 
    using System.Security; 
    using System.Security.Permissions;
 
    // A simple synchronized pool would simply lock a stack and push/pop on return/take.
    //
    // This implementation tries to reduce locking by exploiting the case where an item
    // is taken and returned by the same thread, which turns out to be common in our 
    // scenarios.
    // 
    // Initially, all the quota is allocated to a global (non-thread-specific) pool, 
    // which takes locks.  As different threads take and return values, we record their IDs,
    // and if we detect that a thread is taking and returning "enough" on the same thread, 
    // then we decide to "promote" the thread.  When a thread is promoted, we decrease the
    // quota of the global pool by one, and allocate a thread-specific entry for the thread
    // to store its value.  Once this entry is allocated, the thread can take and return
    // its value from that entry without taking any locks.  Not only does this avoid 
    // locks, but it affinitizes pooled items to a particular thread.
    // 
    // There are a couple of additional things worth noting: 
    //
    // It is possible for a thread that we have reserved an entry for to exit.  This means 
    // we will still have an entry allocated for it, but the pooled item stored there
    // will never be used.  After a while, we could end up with a number of these, and
    // as a result we would begin to exhaust the quota of the overall pool.  To mitigate this
    // case, we throw away the entire per-thread pool, and return all the quota back to 
    // the global pool if we are unable to promote a thread (due to lack of space).  Then
    // the set of active threads will be re-promoted as they take and return items. 
    // 
    // You may notice that the code does not immediately promote a thread, and does not
    // immediately throw away the entire per-thread pool when it is unable to promote a 
    // thread.  Instead, it uses counters (based on the number of calls to the pool)
    // and a threshold to figure out when to do these operations.  In the case where the
    // pool is misconfigured to have too few items for the workload, this avoids constant
    // promoting and rebuilding of the per thread entries. 
    //
    // You may also notice that we do not use interlocked methods when adjusting statistics. 
    // Since the statistics are a heuristic as to how often something is happening, they 
    // do not need to be perfect.
    // 
    /// <summary>
    /// A bounded pool of reusable <typeparamref name="T"/> instances.  Items live either
    /// in a lock-protected global stack or in a per-thread slot that the owning thread
    /// reads and writes without taking a lock.
    /// </summary>
    /// <typeparam name="T">
    /// Reference type of the pooled items.  A null reference is used throughout to mean
    /// "no item available".
    /// </typeparam>
    class SynchronizedPool<T> where T : class
    {
        // Size at which the pending table is discarded and restarted instead of doubled.
        const int maxPendingEntries = 128;
        // Number of returns to the global pool a thread must make before it is promoted.
        const int maxReturnsBeforePromotion = 64;
        // Number of failed promotions tolerated before the per-thread table is rebuilt.
        const int maxPromotionFailures = 64;
        // Contributes to the upper bound on the number of per-thread slots (see constructor).
        const int maxThreadItemsPerProcessor = 16;

        // Per-thread slots; a threadID of 0 marks an unused slot.
        Entry[] entries;
        // Threads that have taken from the global pool and are candidates for promotion.
        PendingEntry[] pending;
        readonly GlobalPool globalPool;
        // Total quota of the pool (global pool plus per-thread slots).
        readonly int maxCount;
        // Heuristic counter; deliberately updated without interlocked operations.
        int promotionFailures;

        /// <summary>
        /// Creates a pool whose global stack initially owns the entire quota.
        /// </summary>
        /// <param name="maxCount">Maximum number of items the pool may hold.</param>
        public SynchronizedPool(int maxCount)
        {
            int threadCount = maxCount;
            // NOTE(review): the constant's name suggests a per-processor multiplier, but the
            // cap is computed as a sum.  Kept as-is to preserve behavior - confirm intent.
            int maxThreadCount = maxThreadItemsPerProcessor + SynchronizedPoolHelper.ProcessorCount;
            if (threadCount > maxThreadCount)
            {
                threadCount = maxThreadCount;
            }

            this.maxCount = maxCount;
            this.entries = new Entry[threadCount];
            this.pending = new PendingEntry[4];
            this.globalPool = new GlobalPool(maxCount);
        }

        object ThisLock
        {
            get { return this; }
        }

        /// <summary>
        /// Drops every pooled item, both from the per-thread slots and from the global pool.
        /// Thread-to-slot assignments are left in place.
        /// </summary>
        public void Clear()
        {
            // Work on a snapshot of the array reference; the field may be replaced concurrently.
            Entry[] entries = this.entries;

            for (int i = 0; i < entries.Length; i++)
            {
                entries[i].value = null;
            }

            globalPool.Clear();
        }

        /// <summary>
        /// Called when a thread qualified for promotion but no per-thread slot was free.
        /// After <c>maxPromotionFailures</c> such failures the per-thread table is thrown
        /// away, the whole quota is handed back to the global pool, and the calling thread
        /// is promoted into the fresh table.
        /// </summary>
        /// <param name="thisThreadID">Managed thread ID of the calling thread.</param>
        void HandlePromotionFailure(int thisThreadID)
        {
            int newPromotionFailures = this.promotionFailures + 1;

            if (newPromotionFailures >= maxPromotionFailures)
            {
                lock (ThisLock)
                {
                    this.entries = new Entry[this.entries.Length];

                    globalPool.MaxCount = maxCount;

                    // Start counting again from zero.  Without this reset the counter stays
                    // one below the threshold forever and every subsequent failure would
                    // rebuild the table, defeating the purpose of the threshold.
                    this.promotionFailures = 0;
                }

                PromoteThread(thisThreadID);
            }
            else
            {
                this.promotionFailures = newPromotionFailures;
            }
        }

        /// <summary>
        /// Assigns a per-thread slot to the given thread, moving one unit of quota from the
        /// global pool to that slot.
        /// </summary>
        /// <param name="thisThreadID">Managed thread ID of the thread to promote.</param>
        /// <returns>
        /// true if the thread already owned a slot or one was assigned; false if every slot
        /// is owned by another thread.
        /// </returns>
        bool PromoteThread(int thisThreadID)
        {
            lock (ThisLock)
            {
                for (int i = 0; i < this.entries.Length; i++)
                {
                    int threadID = this.entries[i].threadID;

                    if (threadID == thisThreadID)
                    {
                        return true;
                    }
                    else if (threadID == 0)
                    {
                        // First unused slot: claim it and shrink the global quota by one.
                        globalPool.DecrementMaxCount();
                        this.entries[i].threadID = thisThreadID;
                        return true;
                    }
                }
            }

            return false;
        }

        /// <summary>
        /// Counts a return to the global pool against the calling thread's pending entry and
        /// attempts to promote the thread once it has made <c>maxReturnsBeforePromotion</c>
        /// returns.  Threads without a pending entry are ignored.
        /// </summary>
        /// <param name="thisThreadID">Managed thread ID of the calling thread.</param>
        void RecordReturnToGlobalPool(int thisThreadID)
        {
            PendingEntry[] localPending = this.pending;

            for (int i = 0; i < localPending.Length; i++)
            {
                int threadID = localPending[i].threadID;

                if (threadID == thisThreadID)
                {
                    int newReturnCount = localPending[i].returnCount + 1;

                    if (newReturnCount >= maxReturnsBeforePromotion)
                    {
                        localPending[i].returnCount = 0;

                        if (!PromoteThread(thisThreadID))
                        {
                            HandlePromotionFailure(thisThreadID);
                        }
                    }
                    else
                    {
                        localPending[i].returnCount = newReturnCount;
                    }
                    break;
                }
                else if (threadID == 0)
                {
                    // Used entries are packed at the front, so an unused one ends the search.
                    break;
                }
            }
        }

        /// <summary>
        /// Registers the calling thread in the pending table the first time it takes from the
        /// global pool.  If the table has no room it is replaced: doubled in size (keeping the
        /// existing entries) while below <c>maxPendingEntries</c>, otherwise swapped for an
        /// empty table of the same size.  In either replacement case the calling thread is
        /// not registered by this call.
        /// </summary>
        /// <param name="thisThreadID">Managed thread ID of the calling thread.</param>
        void RecordTakeFromGlobalPool(int thisThreadID)
        {
            PendingEntry[] localPending = this.pending;

            for (int i = 0; i < localPending.Length; i++)
            {
                int threadID = localPending[i].threadID;

                if (threadID == thisThreadID)
                {
                    return;
                }
                else if (threadID == 0)
                {
                    lock (localPending)
                    {
                        // Re-check under the lock: another thread may have claimed this
                        // entry in the meantime, in which case the scan moves on.
                        if (localPending[i].threadID == 0)
                        {
                            localPending[i].threadID = thisThreadID;
                            return;
                        }
                    }
                }
            }

            if (localPending.Length >= maxPendingEntries)
            {
                this.pending = new PendingEntry[localPending.Length];
            }
            else
            {
                PendingEntry[] newPending = new PendingEntry[localPending.Length * 2];
                Array.Copy(localPending, newPending, localPending.Length);
                this.pending = newPending;
            }
        }

        /// <summary>
        /// Gives an item back to the pool, preferring the calling thread's own slot and
        /// falling back to the global pool.
        /// </summary>
        /// <param name="value">The item to return.</param>
        /// <returns>true if the pool kept the item; false if it was not stored.</returns>
        public bool Return(T value)
        {
            int thisThreadID = Thread.CurrentThread.ManagedThreadId;

            // 0 is the "unused slot" marker in the tables, so it cannot be used as a key.
            if (thisThreadID == 0)
            {
                return false;
            }

            if (ReturnToPerThreadPool(thisThreadID, value))
            {
                return true;
            }

            return ReturnToGlobalPool(thisThreadID, value);
        }

        /// <summary>
        /// Stores the item in the calling thread's slot without taking a lock.
        /// </summary>
        /// <param name="thisThreadID">Managed thread ID of the calling thread.</param>
        /// <param name="value">The item to store.</param>
        /// <returns>
        /// true if the item was stored; false if the thread has no slot or its slot is
        /// already occupied.
        /// </returns>
        bool ReturnToPerThreadPool(int thisThreadID, T value)
        {
            Entry[] entries = this.entries;

            for (int i = 0; i < entries.Length; i++)
            {
                int threadID = entries[i].threadID;

                if (threadID == thisThreadID)
                {
                    if (entries[i].value == null)
                    {
                        entries[i].value = value;
                        return true;
                    }
                    else
                    {
                        return false;
                    }
                }
                else if (threadID == 0)
                {
                    break;
                }
            }

            return false;
        }

        /// <summary>
        /// Records the return for promotion statistics and pushes the item to the global pool.
        /// </summary>
        /// <param name="thisThreadID">Managed thread ID of the calling thread.</param>
        /// <param name="value">The item to return.</param>
        /// <returns>true if the global pool accepted the item; otherwise false.</returns>
        bool ReturnToGlobalPool(int thisThreadID, T value)
        {
            RecordReturnToGlobalPool(thisThreadID);

            return globalPool.Return(value);
        }

        /// <summary>
        /// Takes an item from the pool, preferring the calling thread's own slot and falling
        /// back to the global pool.
        /// </summary>
        /// <returns>A pooled item, or null if none is available.</returns>
        public T Take()
        {
            int thisThreadID = Thread.CurrentThread.ManagedThreadId;

            // 0 is the "unused slot" marker in the tables, so it cannot be used as a key.
            if (thisThreadID == 0)
            {
                return null;
            }

            T value = TakeFromPerThreadPool(thisThreadID);

            if (value != null)
            {
                return value;
            }

            return TakeFromGlobalPool(thisThreadID);
        }

        /// <summary>
        /// Removes and returns the item held in the calling thread's slot without taking a lock.
        /// </summary>
        /// <param name="thisThreadID">Managed thread ID of the calling thread.</param>
        /// <returns>The item, or null if the thread has no slot or the slot is empty.</returns>
        T TakeFromPerThreadPool(int thisThreadID)
        {
            Entry[] entries = this.entries;

            for (int i = 0; i < entries.Length; i++)
            {
                int threadID = entries[i].threadID;

                if (threadID == thisThreadID)
                {
                    T value = entries[i].value;

                    if (value != null)
                    {
                        entries[i].value = null;
                        return value;
                    }
                    else
                    {
                        return null;
                    }
                }
                else if (threadID == 0)
                {
                    break;
                }
            }

            return null;
        }

        /// <summary>
        /// Records the take for promotion statistics and pops an item from the global pool.
        /// </summary>
        /// <param name="thisThreadID">Managed thread ID of the calling thread.</param>
        /// <returns>A pooled item, or null if the global pool is empty.</returns>
        T TakeFromGlobalPool(int thisThreadID)
        {
            RecordTakeFromGlobalPool(thisThreadID);

            return globalPool.Take();
        }

        // A per-thread slot: the owning thread's ID (0 = unused) and the item it holds, if any.
        struct Entry
        {
            public int threadID;
            public T value;
        }

        // Promotion bookkeeping for a thread that uses the global pool (threadID 0 = unused).
        struct PendingEntry
        {
            public int threadID;
            public int returnCount;
        }

        /// <summary>
        /// A stack of items with an adjustable capacity limit; all mutations happen under a lock.
        /// </summary>
        class GlobalPool
        {
            readonly Stack<T> items;
            int maxCount;

            public GlobalPool(int maxCount)
            {
                this.items = new Stack<T>();
                this.maxCount = maxCount;
            }

            /// <summary>
            /// Capacity limit of the stack.  Setting a smaller limit discards items until the
            /// stack fits within it.
            /// </summary>
            public int MaxCount
            {
                get
                {
                    return maxCount;
                }
                set
                {
                    lock (ThisLock)
                    {
                        while (items.Count > value)
                        {
                            items.Pop();
                        }
                        maxCount = value;
                    }
                }
            }

            object ThisLock
            {
                get { return this; }
            }

            /// <summary>
            /// Lowers the capacity limit by one, discarding one item first if the stack is
            /// currently full so that it still fits.
            /// </summary>
            public void DecrementMaxCount()
            {
                lock (ThisLock)
                {
                    if (items.Count == maxCount)
                    {
                        items.Pop();
                    }
                    maxCount--;
                }
            }

            /// <summary>
            /// Pops an item from the stack.
            /// </summary>
            /// <returns>An item, or null if the stack is empty.</returns>
            public T Take()
            {
                // Unlocked pre-check avoids taking the lock when the stack looks empty;
                // the condition is re-evaluated under the lock before popping.
                if (this.items.Count > 0)
                {
                    lock (ThisLock)
                    {
                        if (this.items.Count > 0)
                        {
                            return this.items.Pop();
                        }
                    }
                }
                return null;
            }

            /// <summary>
            /// Pushes an item onto the stack if the capacity limit allows it.
            /// </summary>
            /// <param name="value">The item to store.</param>
            /// <returns>true if the item was stored; false if the stack is full.</returns>
            public bool Return(T value)
            {
                // Unlocked pre-check avoids taking the lock when the stack looks full;
                // the condition is re-evaluated under the lock before pushing.
                if (this.items.Count < this.MaxCount)
                {
                    lock (ThisLock)
                    {
                        if (this.items.Count < this.MaxCount)
                        {
                            this.items.Push(value);
                            return true;
                        }
                    }
                }
                return false;
            }

            /// <summary>
            /// Discards every item in the stack; the capacity limit is unchanged.
            /// </summary>
            public void Clear()
            {
                lock (ThisLock)
                {
                    this.items.Clear();
                }
            }
        }
    }

    /// <summary>
    /// Caches the machine's processor count so it is read from the environment only once.
    /// </summary>
    static class SynchronizedPoolHelper
    {
        // Evaluated once, when the type is initialized.
        public static readonly int ProcessorCount = GetProcessorCount();

        /// <summary>
        /// Reads <see cref="Environment.ProcessorCount"/>.
        /// </summary>
        /// <SecurityNote>
        /// Critical - Asserts in order to get the processor count from the environment
        /// Safe - this data isn't actually protected so it's ok to leak
        /// </SecurityNote>
        [SecurityCritical, SecurityTreatAsSafe]
        [EnvironmentPermission(SecurityAction.Assert, Read = "NUMBER_OF_PROCESSORS")]
        static int GetProcessorCount()
        {
            return Environment.ProcessorCount;
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK