SynchronizedDispatch.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ FX-1434 / FX-1434 / 1.0 / untmp / whidbey / REDBITS / ndp / clr / src / BCL / System / Runtime / Remoting / SynchronizedDispatch.cs / 1 / SynchronizedDispatch.cs

                            // ==++== 
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--== 
//
//   Synchronization Property for URT Contexts. Uses the ThreadPool API. 
//   An instance of this property in a context enforces a synchronization 
//   domain for the context (and all contexts that share the same instance).
//   This means that at any instant, at most 1 thread could be executing 
//   in all contexts that share an instance of this property.
//
//   This is done by contributing sinks that intercept and serialize in-coming
//   calls for the respective contexts. 
//
//   If the property is marked for re-entrancy, then call-outs are 
//   intercepted too. The call-out interception allows other waiting threads 
//   to enter the synchronization domain for maximal throughput.
// 
namespace System.Runtime.Remoting.Contexts {
    using System.Threading;
    using System.Runtime.Remoting;
    using System.Runtime.Remoting.Messaging; 
    using System.Runtime.Remoting.Activation;
    using System.Security.Permissions; 
    using System; 
    using Queue = System.Collections.Queue;
    using ArrayList = System.Collections.ArrayList; 
    [Serializable]
    [AttributeUsage(AttributeTargets.Class)]
    [SecurityPermissionAttribute(SecurityAction.LinkDemand, Flags=SecurityPermissionFlag.Infrastructure)]
    [SecurityPermissionAttribute(SecurityAction.InheritanceDemand, Flags=SecurityPermissionFlag.Infrastructure)] 
[System.Runtime.InteropServices.ComVisible(true)]
    public class SynchronizationAttribute 
        : ContextAttribute, IContributeServerContextSink, 
                    IContributeClientContextSink
    { 
        // The class should not be instantiated in a context that has Synchronization
        public const int NOT_SUPPORTED  = 0x00000001;

        // The class does not care if the context has Synchronization or not 
        public const int SUPPORTED      = 0x00000002;
 
        // The class should be instantiated in a context that has Synchronization 
        public const int REQUIRED    = 0x00000004;
 
        // The class should be instantiated in a context with a new instance of
        // Synchronization property each time
        public const int REQUIRES_NEW = 0x00000008;
 
        private const String PROPERTY_NAME = "Synchronization";
 
        private static readonly UInt32 _timeOut = (UInt32)0x7fffffff; 
        // event that releases a thread-pool worker thread
        [NonSerialized()]internal AutoResetEvent _asyncWorkEvent; 
        [NonSerialized()]private RegisteredWaitHandle _waitHandle;

        // queue of work items.
        [NonSerialized()]internal Queue _workItemQueue; 
        // flag for the domain lock (access always synchronized on the _workItemQueue)
        [NonSerialized()]internal bool _locked; 
        // flag to indicate if the lock should be released during call-outs 
        internal bool _bReEntrant;
        // flag for use as an attribute on types 
        internal int _flavor;

        [NonSerialized()]private SynchronizationAttribute _cliCtxAttr;
        // Logical call id (used only in non-reentrant case for deadlock avoidance) 
        [NonSerialized()]private String _syncLcid;
        [NonSerialized()]private ArrayList _asyncLcidList; 
 

        public virtual bool Locked {get { return _locked;} set { _locked=value; } } 
        public virtual bool IsReEntrant { get { return _bReEntrant;} }

        internal String SyncCallOutLCID
        { 
            get
            { 
                BCLDebug.Assert( 
                    !_bReEntrant,
                    "Should not use this for the reentrant case"); 

                return _syncLcid;
            }
 
            set
            { 
                BCLDebug.Assert( 
                    !_bReEntrant,
                    "Should not use this for the reentrant case"); 

                BCLDebug.Assert(
                    _syncLcid==null
                        || (_syncLcid!=null && value==null) 
                        || _syncLcid.Equals(value),
                    "context can be associated with one logical call at a time"); 
 
                _syncLcid = value;
            } 
        }

        internal ArrayList AsyncCallOutLCIDList
        { 
            get { return _asyncLcidList; }
        } 
 
        internal bool IsKnownLCID(IMessage reqMsg)
        { 
            String msgLCID =
                ((LogicalCallContext)reqMsg.Properties[Message.CallContextKey])
                    .RemotingData.LogicalCallID;
            return ( msgLCID.Equals(_syncLcid) 
                    || _asyncLcidList.Contains(msgLCID));
 
        } 

 
        /*
        *   Constructor for the synchronized dispatch property
        */
        public SynchronizationAttribute() 

            : this(REQUIRED, false) { 
        } 

        /* 
        *   Constructor.
        *   If reEntrant is true, we allow other calls to come in
        *   if the currently running call leaves the domain for a call-out.
        */ 
        public SynchronizationAttribute(bool reEntrant)
 
            : this(REQUIRED, reEntrant) { 
        }
 
        public SynchronizationAttribute(int flag)

            : this(flag, false) {
        } 

        public SynchronizationAttribute(int flag, bool reEntrant) 
 
            // Invoke ContextProperty ctor!
            : base(PROPERTY_NAME) { 

            _bReEntrant = reEntrant;

            switch (flag) 
            {
            case NOT_SUPPORTED: 
            case SUPPORTED: 
            case REQUIRED:
            case REQUIRES_NEW: 
                _flavor = flag;
                break;
            default:
                throw new ArgumentException(Environment.GetResourceString("Argument_InvalidFlag"), "flag"); 
            }
        } 
 
        // Dispose off the WaitHandle registered in Initialization
        internal void Dispose() 
        {
            //Unregister the RegisteredWaitHandle
            if (_waitHandle != null)
                _waitHandle.Unregister(null); 
        }
 
        // Override ContextAttribute's implementation of IContextAttribute::IsContextOK 
[System.Runtime.InteropServices.ComVisible(true)]
        public override bool IsContextOK(Context ctx, IConstructionCallMessage msg) 
        {
            if (ctx == null)
                throw new ArgumentNullException("ctx");
            if (msg == null) 
                throw new ArgumentNullException("msg");
 
            // < 

            bool isOK = true; 
            if (_flavor == REQUIRES_NEW)
            {
                isOK = false;
                // Each activation request instantiates a new attribute class. 
                // We are relying on that for the REQUIRES_NEW case!
                BCLDebug.Assert(ctx.GetProperty(PROPERTY_NAME) != this, 
                    "ctx.GetProperty(PROPERTY_NAME) != this"); 
            }
            else 
            {
                SynchronizationAttribute syncProp = (SynchronizationAttribute) ctx.GetProperty(PROPERTY_NAME);
                if (   ( (_flavor == NOT_SUPPORTED)&&(syncProp != null) )
                    || ( (_flavor == REQUIRED)&&(syncProp == null) ) 
                    )
                { 
                    isOK = false; 
                }
 
                if (_flavor == REQUIRED)
                {
                    // pick up the property from the current context
                    _cliCtxAttr = syncProp; 
                }
            } 
            return isOK; 
        }
 
        // Override ContextAttribute's impl. of IContextAttribute::GetPropForNewCtx
[System.Runtime.InteropServices.ComVisible(true)]
        public override void GetPropertiesForNewContext(IConstructionCallMessage ctorMsg)
        { 
            if ( (_flavor==NOT_SUPPORTED) || (_flavor==SUPPORTED) || (null == ctorMsg) )
            { 
                return ; 
            }
 
            if (_cliCtxAttr != null)
            {
                BCLDebug.Assert(_flavor == REQUIRED,"Use cli-ctx property only for the REQUIRED flavor");
                ctorMsg.ContextProperties.Add((IContextProperty)_cliCtxAttr); 
                _cliCtxAttr = null;
            } 
            else 
            {
                ctorMsg.ContextProperties.Add((IContextProperty)this); 
            }
        }

        // We need this to make the use of the property as an attribute 
        // light-weight. This allows us to delay initialize everything we
        // need to fully function as a ContextProperty. 
        internal virtual void InitIfNecessary() 
        {
            lock(this) 
            {
                if (_asyncWorkEvent == null)
                {
                    // initialize thread pool event to non-signaled state. 
                    _asyncWorkEvent = new AutoResetEvent(false);
 
                    _workItemQueue = new Queue(); 
                    _asyncLcidList = new ArrayList();
 
                    WaitOrTimerCallback callBackDelegate =
                        new WaitOrTimerCallback(this.DispatcherCallBack);

                    // Register a callback to be executed by the thread-pool 
                    // each time the event is signaled.
                    _waitHandle = ThreadPool.RegisterWaitForSingleObject( 
                                    _asyncWorkEvent, 
                                    callBackDelegate,
                                    null, // state info 
                                    _timeOut,
                                    false); // bExecuteOnlyOnce
                }
            } 
        }
 
        /* 
        * Call back function -- executed for each work item that
        * was enqueued. This is invoked by a thread-pool thread for 
        * async work items and the caller thread for sync items.
        */
        private void DispatcherCallBack(Object stateIgnored, bool ignored)
        { 
            // This function should be called by only one thread at a time. We will
            // ensure this by releasing exactly one waiting thread to go work on 
            // a WorkItem 

            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- In DispatherCallBack "); 

            BCLDebug.Assert(_locked==true,"_locked==true");
            WorkItem work;
            // get the work item out of the queue. 
            lock (_workItemQueue)
            { 
                work = (WorkItem) _workItemQueue.Dequeue(); 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- Dequeued Work for: " + work._thread.GetHashCode());
            } 
            BCLDebug.Assert(work!=null,"work!=null");
            BCLDebug.Assert(work.IsSignaled() && !(work.IsDummy()),"work.IsSignaled() && !(work.IsDummy())");
            // execute the work item (WorkItem.Execute will switch to the proper context)
            ExecuteWorkItem(work); 
            HandleWorkCompletion();
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] --- CallBack finished for: " + work._thread.GetHashCode()); 
        } 

        /* 
        *   This is used by the call-out (client context) sinks to notify
        *   the domain manager that the thread is leaving
        */
        internal virtual void HandleThreadExit() 
        {
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~~ Thread EXIT ~~~~"); 
            // For now treat this as if the work was completed! 
            BCLDebug.Assert(_locked==true,"_locked==true");
            HandleWorkCompletion(); 
        }

        /*
        *   This is used by a returning call-out thread to request 
        *   that it be queued for re-entry into the domain.
        */ 
        internal virtual void HandleThreadReEntry() 
        {
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~~ Thread REQUEST REENTRY ~~~~"); 
            // Treat this as if a new work item needs to be done
            // <

            WorkItem work = new WorkItem(null, null, null); 
            work.SetDummy();
            HandleWorkRequest(work); 
        } 

        /* 
        *   This gets called at the end of work.Execute and from
        *   HandleThreadExit() in the re-entrant scenario.
        *   This is the point where we decide what to do next!
        */ 
        internal virtual void HandleWorkCompletion()
        { 
            // We should still have the lock held for the workItem that just completed 
            BCLDebug.Assert(_locked==true,"_locked==true");
            // Now we check the queue to see if we need to release any one? 
            WorkItem nextWork = null;
            bool bNotify = false;
            lock (_workItemQueue)
            { 
                if (_workItemQueue.Count >= 1)
                { 
                    nextWork = (WorkItem) _workItemQueue.Peek(); 
                    bNotify = true;
                    nextWork.SetSignaled(); 
                }
                else
                {
                    // We set locked to false only in the case there is no 
                    // next work to be done.
                    // NOTE: this is the only place _locked in ever set to false! 
                    //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] Domain UNLOCKED!"); 
                    _locked = false;
                } 
            }
            // See if we found a non-signaled work item at the head.
            if (bNotify)
            { 
                // In both sync and async cases we just hand off the _locked state to
                // the next thread which will execute. 
                if (nextWork.IsAsync()) 
                {
                    // Async-WorkItem: signal ThreadPool event to release one thread 
                    _asyncWorkEvent.Set();
                    //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Signal " + nextWork._thread.GetHashCode() + (nextWork.IsDummy()?" DUMMY ":" REAL "));
                }
                else 
                {
                    // Sync-WorkItem: notify the waiting sync-thread. 
                    lock(nextWork) 
                    {
                        Monitor.Pulse(nextWork); 
                        //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Notify " + nextWork._thread.GetHashCode() + (nextWork.IsDummy()?" DUMMY ":" REAL ") );
                    }
                }
            } 
        }
 
        /* 
        *   This is called by any new incoming thread or from
        *   HandleThreadReEntry() when a call-out thread wants to 
        *   re-enter the domain.
        *   In the latter case, the WorkItem is a dummy item, it
        *   just serves the purpose of something to block on till
        *   the thread is given a green signal to re-enter. 
        */
        internal virtual void HandleWorkRequest(WorkItem work) 
        { 
            // <
 

            bool bQueued;

            // Check for nested call backs 
            if (!IsNestedCall(work._reqMsg))
            { 
                // See what type of work it is 
                if (work.IsAsync())
                { 
                    // Async work is always queued.
                    bQueued = true;
                    // Enqueue the workItem
                    lock (_workItemQueue) 
                    {
                        //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Async Item EnQueue " + work._thread.GetHashCode()); 
                        work.SetWaiting(); 
                        _workItemQueue.Enqueue(work);
                        // If this is the only work item in the queue we will 
                        // have to trigger the thread-pool event ourselves
                        if ( (!_locked) && (_workItemQueue.Count == 1) )
                        {
                            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Async Signal Self: " + work._thread.GetHashCode()); 
                            work.SetSignaled();
                            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Domain Locked!"); 
                            _locked = true; 
                            _asyncWorkEvent.Set();
                        } 
                    }
                }
                else
                { 
                    // Sync work is queued only if there are other items
                    // already in the queue. 
                    lock(work) 
                    {
                        // Enqueue if we need to 
                        lock(_workItemQueue)
                        {
                            if ((!_locked) && (_workItemQueue.Count == 0))
                            { 
                                _locked = true;
                                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ### Domain Locked!"); 
                                bQueued = false; 
                            }
                            else 
                            {
                                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ ENQUEUE Sync!" + (work.IsDummy()?" DUMMY ":" REAL ") + work._thread);
                                bQueued = true;
                                work.SetWaiting(); 
                                _workItemQueue.Enqueue(work);
                            } 
                        } 

                        if (bQueued == true) 
                        {
                            // If we queued a work item we must wait for some
                            // other thread to peek at it and Notify us.
 
                            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ WORK::WAIT" + work._thread);
                            Monitor.Wait(work); 
                            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ FINISH Work::WAIT" + work._thread); 
                            BCLDebug.Assert(_locked==true,"_locked==true");
                            // Our turn to complete the work! 
                            // Execute the callBack only if this is real work
                            if (!work.IsDummy())
                            {
                                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Invoke DispatcherCallBack " + work._thread); 
                                // We invoke the callback here that does exactly
                                // what we need to do ... dequeue work, execute, checkForMore 
                                DispatcherCallBack(null, true); 
                            }
                            else 
                            {
                                // DummyWork is just use to block/unblock a returning call.
                                // Throw away our dummy WorkItem.
                                lock(_workItemQueue) 
                                {
                                    _workItemQueue.Dequeue(); 
                                } 
                                // We don't check for more work here since we are already
                                // in the midst of an executing WorkItem (at the end of which 
                                // the check will be performed)
                            }
                        }
                        else 
                        {
                            // We did not queue the work item. 
                            if (!work.IsDummy()) 
                            {
                                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Execute direct" + work._thread); 
                                // Execute the work.
                                BCLDebug.Assert(_locked==true,"_locked==true");
                                work.SetSignaled();
                                ExecuteWorkItem(work); 
                                // Check for more work
                                HandleWorkCompletion(); 
                            } 
                        }
                    } 
                }
            }
            else
            { 
                // We allow the nested calls to execute directly
 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] ~~~ Execute Nested Call direct" + work._thread); 
                // Execute the work.
                BCLDebug.Assert(_locked==true,"_locked==true"); 
                work.SetSignaled();
                work.Execute();
                // We are still inside the top level call ...
                // so after work.Execute finishes we don't check for more work 
                // or unlock the domain as we do elsewhere.
            } 
        } 

        internal void ExecuteWorkItem(WorkItem work) 
        {
            work.Execute();
        }
 
        internal bool IsNestedCall(IMessage reqMsg)
        { 
            // This returns TRUE only if it is a non-reEntrant context 
            // AND
            // (the LCID of the reqMsg matches that of 
            // the top level sync call lcid associated with the context.
            //  OR
            // it matches one of the async call out lcids)
 
            bool bNested = false;
            if (!IsReEntrant) 
            { 
                String lcid = SyncCallOutLCID;
                if (lcid != null) 
                {
                    // This means we are inside a top level call
                    LogicalCallContext callCtx =
                        (LogicalCallContext)reqMsg.Properties[Message.CallContextKey]; 

                    if ( callCtx!=null && 
                        lcid.Equals(callCtx.RemotingData.LogicalCallID)) 
                    {
                        // This is a nested call (we made a call out during 
                        // the top level call and eventually that has resulted
                        // in an incoming call with the same lcid)
                        bNested = true;
                    } 
                }
                if (!bNested && AsyncCallOutLCIDList.Count>0) 
                { 
                    // This means we are inside a top level call
                    LogicalCallContext callCtx = 
                        (LogicalCallContext)reqMsg.Properties[Message.CallContextKey];
                    if (AsyncCallOutLCIDList.Contains(callCtx.RemotingData.LogicalCallID))
                    {
                        bNested = true; 
                    }
                } 
            } 
            return bNested;
        } 


        /*
        *   Implements IContributeServerContextSink::GetServerContextSink 
        *   Create a SynchronizedDispatch sink and return it.
        */ 
        public virtual IMessageSink GetServerContextSink(IMessageSink nextSink) 
        {
            InitIfNecessary(); 

            SynchronizedServerContextSink propertySink =
                new SynchronizedServerContextSink(
                            this, 
                            nextSink);
 
            return (IMessageSink) propertySink; 
        }
 
        /*
        *   Implements IContributeClientContextSink::GetClientContextSink
        *   Create a CallOut sink and return it.
        */ 
        public virtual IMessageSink GetClientContextSink(IMessageSink nextSink)
        { 
            InitIfNecessary(); 

            SynchronizedClientContextSink propertySink = 
                new SynchronizedClientContextSink(
                            this,
                            nextSink);
 
            return (IMessageSink) propertySink;
        } 
 
    }
 
    /*************************************** SERVER SINK ********************************/
    /*
    *   Implements the sink contributed by the Synch-Dispatch
    *   Property. The sink holds a back pointer to the property. 
    *   The sink intercepts incoming calls to objects resident in
    *   the Context and co-ordinates with the property to enforce 
    *   the domain policy. 
    */
    internal class SynchronizedServerContextSink 
            : InternalSink, IMessageSink
    {
        internal IMessageSink   _nextSink;
        internal SynchronizationAttribute _property; 

        internal SynchronizedServerContextSink(SynchronizationAttribute prop, IMessageSink nextSink) 
        { 
            _property = prop;
            _nextSink = nextSink; 
        }

        ~SynchronizedServerContextSink()
        { 
            _property.Dispose();
        } 
 
        /*
        * Implements IMessageSink::SyncProcessMessage 
        */
        public virtual IMessage SyncProcessMessage(IMessage reqMsg)
        {
            // 1. Create a work item 
            WorkItem work = new WorkItem(reqMsg,
                                        _nextSink, 
                                        null /* replySink */); 

            // 2. Notify the property to handle the WorkItem 
            // The work item may get put in a Queue or may execute directly
            // if the domain is free.
            _property.HandleWorkRequest(work);
 
            // 3. Pick up retMsg from the WorkItem and return
            return work.ReplyMessage; 
        } 

        /* 
        *   Implements IMessageSink::AsyncProcessMessage
        */
        public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink)
        { 
            // 1. Create a work item
            WorkItem work = new WorkItem(reqMsg, 
                                        _nextSink, 
                                        replySink);
            work.SetAsync(); 
            // 2. We always queue the work item in async case
            _property.HandleWorkRequest(work);
            // 3. Return an IMsgCtrl
            return null; 
        }
 
        /* 
        * Implements IMessageSink::GetNextSink
        */ 
        public IMessageSink NextSink
        {
            get
            { 
                return _nextSink;
            } 
        } 
    }
 
    //*************************************** WORK ITEM ********************************//
    /*
    *   A work item holds the info about a call to Sync or
    *   Async-ProcessMessage. 
    */
    internal class WorkItem 
    { 
        private const int FLG_WAITING  = 0x0001;
        private const int FLG_SIGNALED = 0x0002; 
        private const int FLG_ASYNC      = 0x0004;
        private const int FLG_DUMMY     = 0x0008;

        internal int _flags; 
        internal IMessage _reqMsg;
        internal IMessageSink _nextSink; 
        // ReplySink will be null for an sync work item. 
        internal IMessageSink _replySink;
        // ReplyMsg is set once the sync call is completed 
        internal IMessage _replyMsg;

        // Context in which the work should execute.
        internal Context _ctx; 

        internal LogicalCallContext _callCtx; 
        internal static InternalCrossContextDelegate _xctxDel = new InternalCrossContextDelegate(ExecuteCallback); 

        //DBGDBG 
        //internal int _thread;

        internal WorkItem(IMessage reqMsg, IMessageSink nextSink, IMessageSink replySink)
        { 
            _reqMsg = reqMsg;
            _replyMsg = null; 
            _nextSink = nextSink; 
            _replySink = replySink;
            _ctx = Thread.CurrentContext; 
            _callCtx = CallContext.GetLogicalCallContext();
            //DBGDBG
            //_thread = Thread.CurrentThread.GetHashCode();
        } 

        // To mark a work item being enqueued 
        internal virtual void SetWaiting() 
        {
            BCLDebug.Assert(!IsWaiting(),"!IsWaiting()"); 
            _flags |= FLG_WAITING;
        }

        internal virtual bool IsWaiting() 
        {
            return (_flags&FLG_WAITING) == FLG_WAITING; 
        } 

        // To mark a work item that has been given the green light! 
        internal virtual void SetSignaled()
        {
            BCLDebug.Assert(!IsSignaled(),"!IsSignaled()");
            _flags |= FLG_SIGNALED; 
        }
 
        internal virtual bool IsSignaled() 
        {
            return (_flags & FLG_SIGNALED) == FLG_SIGNALED; 
        }

        internal virtual void SetAsync()
        { 
            _flags |= FLG_ASYNC;
        } 
 
        internal virtual bool IsAsync()
        { 
            return (_flags & FLG_ASYNC) == FLG_ASYNC;
        }

        internal virtual void SetDummy() 
        {
            _flags |= FLG_DUMMY; 
        } 

        internal virtual bool IsDummy() 
        {
            return (_flags & FLG_DUMMY) == FLG_DUMMY;
        }
 

        internal static Object ExecuteCallback(Object[] args) 
        { 
            WorkItem This = (WorkItem) args[0];
 
            if (This.IsAsync())
            {
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] AsyncWork.Execute");
                This._nextSink.AsyncProcessMessage(This._reqMsg, This._replySink); 
            }
            else if (This._nextSink != null) 
            { 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] SyncWork.Execute");
                This._replyMsg = This._nextSink.SyncProcessMessage(This._reqMsg); 
            }
            return null;
        }
 
        /*
        *   Execute is called to complete a work item (sync or async). 
        *   Execute assumes that the context is set correctly and the lock 
        *   is taken (i.e. it makes no policy decisions)
        * 
        *   It is called from the following 3 points:
        *       1. thread pool thread executing the callback for an async item
        *       2. calling thread executing the callback for a queued sync item
        *       3. calling thread directly calling Execute for a non-queued sync item 
        */
        internal virtual void Execute() 
        { 
            // Execute should be called with the domain policy enforced
            // i.e. a Synchronization domain should be locked etc ... 
            BCLDebug.Assert(IsSignaled(),"IsSignaled()");

            Thread.CurrentThread.InternalCrossContextCallback(_ctx, _xctxDel, new Object[] { this } );
        } 
        internal virtual IMessage ReplyMessage { get {return _replyMsg;}}
    } 
 
    //*************************************** CLIENT SINK ********************************//
 
    /*
    *   Implements the client context sink contributed by the
    *   Property. The sink holds a back pointer to the property.
    *   The sink intercepts outgoing calls from objects the Context 
    *   and co-ordinates with the property to enforce the domain policy.
    */ 
    internal class SynchronizedClientContextSink 
            : InternalSink, IMessageSink
    { 
        internal IMessageSink   _nextSink;
        internal SynchronizationAttribute _property;

        internal SynchronizedClientContextSink(SynchronizationAttribute prop, IMessageSink nextSink) 
        {
            _property = prop; 
            _nextSink = nextSink; 
        }
 
        ~SynchronizedClientContextSink()
        {
            _property.Dispose();
        } 

        /* 
        *   Implements IMessageSink::SyncProcessMessage for the call-out sink 
        */
        public virtual IMessage SyncProcessMessage(IMessage reqMsg) 
        {
            BCLDebug.Assert(_property.Locked == true,"_property.Locked == true");
            IMessage replyMsg;
            if (_property.IsReEntrant) 
            {
                // In this case we are required to let anybody waiting for 
                // the domain to enter and execute 
                // Notify the property that we are leaving
                _property.HandleThreadExit(); 

                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] R: Sync call-out");
                replyMsg = _nextSink.SyncProcessMessage(reqMsg);
 
                // We will just block till we are given permission to re-enter
                // Notify the property that we wish to re-enter the domain. 
                // This will block the thread here if someone is in the domain. 
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] R: Sync call-out returned, waiting for lock");
                _property.HandleThreadReEntry(); 
                BCLDebug.Assert(_property.Locked == true,"_property.Locked == true");
            }
            else
            { 
                // In the non-reentrant case we are just a pass-through sink
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] NR: Sync call-out (pass through)"); 
                // We should mark the domain with our LCID so that call-backs are allowed to enter.. 
                LogicalCallContext cctx =
                    (LogicalCallContext) reqMsg.Properties[Message.CallContextKey]; 

                String lcid = cctx.RemotingData.LogicalCallID;
                bool bClear = false;
                if (lcid == null) 
                {
                    // We used to assign call-ids in RemotingProxy.cs at the 
                    // start of each Invoke. As an optimization we now do it 
                    // here in a delayed fashion... since currently only
                    // Synchronization needs it 
                    // Note that for Sync-calls we would just inherit an LCID
                    // if the call has one, if not we create one. However for
                    // async calls we always generate a new LCID.
                    lcid = Identity.GetNewLogicalCallID(); 
                    cctx.RemotingData.LogicalCallID = lcid;
                    bClear = true; 
 
                    BCLDebug.Assert(
                        _property.SyncCallOutLCID == null, 
                        "Synchronization domain is already in a callOut state");
                }

                bool bTopLevel=false; 
                if (_property.SyncCallOutLCID==null)
                { 
                    _property.SyncCallOutLCID = lcid; 
                    bTopLevel = true;
                } 

                BCLDebug.Assert(lcid.Equals(_property.SyncCallOutLCID), "Bad synchronization domain state!");

                replyMsg = _nextSink.SyncProcessMessage(reqMsg); 

                // if a top level call out returned we clear the callId in the domain 
                if (bTopLevel) 
                {
                    _property.SyncCallOutLCID = null; 

                    // The sync callOut is done, we do not need the lcid
                    // that was associated with the call any more.
                    // (clear it only if we added one to the reqMsg) 
                    if (bClear)
                    { 
                        // Note that we make changes to the callCtx in 
                        // the reply message ... since this is the one that
                        // will get installed back on the thread that called 
                        // the proxy.
                        LogicalCallContext cctxRet =
                            (LogicalCallContext) replyMsg.Properties[Message.CallContextKey];
                        BCLDebug.Assert( 
                            cctxRet != null,
                            "CallContext should be non-null"); 
                        cctxRet.RemotingData.LogicalCallID = null; 
                    }
                } 

                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] NR: Sync call-out returned");
            }
            return replyMsg; 
        }
 
        /* 
        *   Implements IMessageSink::AsyncProcessMessage for the call-out sink
        */ 
        public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink)
        {
            IMessageCtrl msgCtrl = null;
 
            BCLDebug.Assert(_property.Locked == true,"_property.Locked == true");
 
            if (!_property.IsReEntrant) 
            {
                // In this case new calls are not allowed to enter the domain 
                // We need to track potentially more than one async-call-outs
                // and allow the completion notifications to come in for those

                LogicalCallContext cctx = 
                    (LogicalCallContext) reqMsg.Properties[Message.CallContextKey];
                // We used to generate a new lcid automatically in RemotingProxy 
                // Invoke at the start of each Async call. 
                // However now we do it here as an optimization (since only
                // Synchronization needs it) 
                // RemotingProxy invoke code does Clone() the callContext at
                // the start of each Async call so we don't have to worry
                // about stomping someone else's lcid here.
 

                String lcid =  Identity.GetNewLogicalCallID(); 
                cctx.RemotingData.LogicalCallID = lcid; 

 
                BCLDebug.Assert(
                    _property.SyncCallOutLCID == null,
                    "Cannot handle async call outs when already in a top-level sync call out");
                //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] NR: Async CallOut: adding to lcidList: " + lcid); 
                _property.AsyncCallOutLCIDList.Add(lcid);
            } 
            // We will call AsyncProcessMessage directly on this thread 
            // since the thread should not block much. However, we will
            // have to intercept the callback on the replySink chain for 
            // which we wrap the caller provided replySink into our sink.
            AsyncReplySink mySink = new AsyncReplySink(replySink, _property);

            // NOTE: we will need to yield the Synchronization Domain at 
            // some time or another to get our own callBack to complete.
 
            // Note that for the Async call-outs we have to provide an interception 
            // sink whether we are re-entrant or not since we want
            // the replySink.SyncProcessMessage call to be wait for the lock just like 
            // any other call-in.
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] Async call-out");

            msgCtrl = _nextSink.AsyncProcessMessage(reqMsg, (IMessageSink)mySink); 
            //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] Async call-out AsyncPM returned, reply to come separately");
 
            return msgCtrl; 
        }
 
        /*
        *   Implements IMessageSink::GetNextSink
        */
        public IMessageSink NextSink 
        {
            get 
            { 
                return _nextSink;
            } 

        }

        /* 
        *   This class just implements the CallBack sink we provide to
        *   intercept the callback of an Async out-call. The CallBack sink 
        *   ensures that arbitrary threads do not enter our Synchronization 
        *   Domain without asking us if it is Ok!
        */ 
        internal class AsyncReplySink : IMessageSink
        {
            internal IMessageSink _nextSink;
            internal SynchronizationAttribute _property; 
            internal AsyncReplySink(IMessageSink nextSink, SynchronizationAttribute prop)
            { 
                _nextSink = nextSink; 
                _property = prop;
            } 

            public virtual IMessage SyncProcessMessage(IMessage reqMsg)
            {
 
                // We handle this as a regular new Sync workItem
                // 1. Create a work item 
                WorkItem work = new WorkItem(reqMsg, 
                                            _nextSink,
                                            null /* replySink */); 

                // 2. Notify the property to handle the WorkItem
                // The work item may get put in a Queue or may execute right away.
                _property.HandleWorkRequest(work); 

                if (!_property.IsReEntrant) 
                { 
                    // Remove the async lcid we had added to the call out list.
                    //DBGConsole.WriteLine(Thread.CurrentThread.GetHashCode()+"] NR: InterceptionSink::SyncPM Removing async call-out lcid: " + ((LogicalCallContext)reqMsg.Properties[Message.CallContextKey]).RemotingData.LogicalCallID); 
                    _property.AsyncCallOutLCIDList.Remove(
                        ((LogicalCallContext)reqMsg.Properties[Message.CallContextKey]).RemotingData.LogicalCallID);
                }
 
                // 3. Pick up retMsg from the WorkItem and return
                return work.ReplyMessage; 
            } 

            public virtual IMessageCtrl AsyncProcessMessage(IMessage reqMsg, IMessageSink replySink) 
            {
                throw new NotSupportedException();
            }
 
            /*
            * Implements IMessageSink::GetNextSink 
            */ 
            public IMessageSink NextSink
            { 
                get
                {
                    return _nextSink;
                } 
            }
        } 
    } 

} 


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK