UdpSocketReceiveManager.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Discovery / System / ServiceModel / Channels / UdpSocketReceiveManager.cs / 1305376 / UdpSocketReceiveManager.cs

                            //---------------------------------------------------------------- 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//---------------------------------------------------------------

namespace System.ServiceModel.Channels 
{
    using System; 
    using System.Collections.Generic; 
    using System.Net;
    using System.Net.Sockets; 
    using System.Runtime;
    using System.ServiceModel.Discovery;
    using System.Threading;
 
    sealed class UdpSocketReceiveManager
    { 
        BufferManager bufferManager; 
        Action continueReceivingCallback;
        int maxPendingReceivesPerSocket; 
        AsyncCallback onReceiveFrom;
        Action onStartReceiving;
        int openCount;
        IUdpReceiveHandler receiveHandler; 
        UdpSocket[] receiveSockets;
        Action onMessageDequeued; 
        object thisLock; 

        internal UdpSocketReceiveManager(UdpSocket[] receiveSockets, int maxPendingReceivesPerSocket, BufferManager bufferManager, IUdpReceiveHandler receiveHandler) 
        {
            Fx.Assert(receiveSockets != null, "receiveSockets parameter is null");
            Fx.Assert(receiveSockets.Length > 0, "receiveSockets parameter is empty");
            Fx.Assert(maxPendingReceivesPerSocket > 0, "maxPendingReceivesPerSocket can't be <= 0"); 
            Fx.Assert(receiveHandler.MaxReceivedMessageSize > 0, "maxReceivedMessageSize must be > 0");
            Fx.Assert(receiveHandler.MaxReceivedMessageSize <= UdpConstants.MaxMessageSizeOverIPv4, "maxReceivedMessageSize should be less than the largest possible UDP packet.  Actual Value=" + receiveHandler.MaxReceivedMessageSize); 
            Fx.Assert(bufferManager != null, "bufferManager argument should not be null"); 
            Fx.Assert(receiveHandler != null, "receiveHandler should not be null");
 
            this.receiveHandler = receiveHandler;
            this.thisLock = new object();
            this.bufferManager = bufferManager;
            this.receiveSockets = receiveSockets; 
            this.maxPendingReceivesPerSocket = maxPendingReceivesPerSocket;
        } 
 
        bool IsDisposed
        { 
            get
            {
                return this.openCount < 0;
            } 
        }
 
        public void SetReceiveHandler(IUdpReceiveHandler handler) 
        {
            Fx.Assert(handler != null, "IUdpReceiveHandler can't be null"); 
            Fx.Assert(handler.MaxReceivedMessageSize == this.receiveHandler.MaxReceivedMessageSize, "new receive handler's max message size doesn't match");
            Fx.Assert(this.openCount > 0, "SetReceiveHandler called on a closed UdpSocketReceiveManager");
            this.receiveHandler = handler;
        } 

        public void Close() 
        { 
            lock (this.thisLock)
            { 
                if (this.IsDisposed)
                {
                    return;
                } 

                this.openCount--; 
 
                if (this.openCount == 0)
                { 
                    this.openCount = -1;
                    for (int i = 0; i < this.receiveSockets.Length; i++)
                    {
                        this.receiveSockets[i].Close(); 
                    }
 
                    this.bufferManager.Clear(); 
                }
            } 
        }

        public void Open()
        { 
            lock (this.thisLock)
            { 
                ThrowIfDisposed(); 

                this.openCount++; 

                if (this.openCount == 1)
                {
                    for (int i = 0; i < this.receiveSockets.Length; i++) 
                    {
                        this.receiveSockets[i].Open(); 
                    } 

                    this.onMessageDequeued = new Action(OnMessageDequeued); 
                    this.onReceiveFrom = Fx.ThunkCallback(new AsyncCallback(OnReceiveFrom));
                    this.continueReceivingCallback = new Action(ContinueReceiving);
                }
            } 

 
            try 
            {
                if (Thread.CurrentThread.IsThreadPoolThread) 
                {
                    EnsureReceiving();
                }
                else 
                {
                    if (this.onStartReceiving == null) 
                    { 
                        this.onStartReceiving = new Action(OnStartReceiving);
                    } 

                    ActionItem.Schedule(this.onStartReceiving, this);
                }
            } 
            catch (Exception ex)
            { 
                if (!TryHandleException(ex)) 
                {
                    throw; 
                }
            }
        }
 
        static void OnStartReceiving(object state)
        { 
            UdpSocketReceiveManager thisPtr = (UdpSocketReceiveManager)state; 

            try 
            {
                if (thisPtr.IsDisposed)
                {
                    return; 
                }
 
                thisPtr.EnsureReceiving(); 
            }
            catch (Exception ex) 
            {
                if (!thisPtr.TryHandleException(ex))
                {
                    throw; 
                }
            } 
        } 

        void OnMessageDequeued() 
        {
            try
            {
                EnsureReceiving(); 
            }
            catch (Exception ex) 
            { 
                if (!TryHandleException(ex))
                { 
                    throw;
                }
            }
        } 

        void ContinueReceiving(object socket) 
        { 
            try
            { 
                while (StartAsyncReceive(socket as UdpSocket))
                {
                    Fx.Assert(Thread.CurrentThread.IsThreadPoolThread, "Receive loop is running on a non-threadpool thread.  If this thread disappears while a completion port operation is outstanding, then the operation will get canceled.");
                } 
            }
            catch (Exception ex) 
            { 
                if (!TryHandleException(ex))
                { 
                    throw;
                }
            }
        } 

        void OnReceiveFrom(IAsyncResult result) 
        { 
            if (result.CompletedSynchronously)
            { 
                return;
            }

            UdpSocketReceiveState state = (UdpSocketReceiveState)result.AsyncState; 

            ArraySegment messageBytes; 
            bool continueReceiving = true; 

            try 
            {
                lock (this.thisLock)
                {
                    if (this.IsDisposed) 
                    {
                        return; 
                    } 

                    messageBytes = EndReceiveFrom(result, state); 
                }

                continueReceiving = this.receiveHandler.HandleDataReceived(messageBytes, state.RemoteEndPoint, state.Socket.InterfaceIndex, this.onMessageDequeued);
            } 
            catch (Exception ex)
            { 
                if (!TryHandleException(ex)) 
                {
                    throw; 
                }
            }
            finally
            { 
                if (!this.IsDisposed && continueReceiving)
                { 
                    ContinueReceiving(state.Socket); 
                }
            } 
        }

        //returns true if receive completed synchronously, false otherwise
        bool StartAsyncReceive(UdpSocket socket) 
        {
            Fx.Assert(socket != null, "UdpSocketReceiveManager.StartAsyncReceive: Socket should never be null"); 
 
            byte[] buffer = null;
            bool throwing = true; 
            bool completedSync = false;

            try
            { 
                ArraySegment messageBytes = default(ArraySegment);
                UdpSocketReceiveState state = null; 
 
                lock (this.thisLock)
                { 
                    if (!this.IsDisposed && socket.PendingReceiveCount < this.maxPendingReceivesPerSocket)
                    {
                        buffer = this.bufferManager.TakeBuffer(this.receiveHandler.MaxReceivedMessageSize);
 
                        state = new UdpSocketReceiveState(socket);
                        EndPoint remoteEndpoint = socket.CreateIPAnyEndPoint(); 
 
                        IAsyncResult result = socket.BeginReceiveFrom(buffer, 0, buffer.Length, ref remoteEndpoint, onReceiveFrom, state);
                        if (result.CompletedSynchronously) 
                        {
                            completedSync = true;
                            messageBytes = EndReceiveFrom(result, state);
 
                        }
                        throwing = false; 
                    } 
                }
 
                //when receiveHandler.HandleDataReceived is called (whether now or later), it will return the buffer to the buffer manager.
                if (completedSync && !this.receiveHandler.HandleDataReceived(messageBytes, state.RemoteEndPoint, state.Socket.InterfaceIndex, this.onMessageDequeued))
                {
                    //if HandleDataReceived returns false, it means that the max pending message count was hit. 
                    return false;
                } 
            } 
            finally
            { 
                if (buffer != null && throwing)
                {
                    this.bufferManager.ReturnBuffer(buffer);
                } 
            }
 
            return completedSync; 
        }
 
        void EnsureReceiving()
        {
            for (int i = 0; i < this.receiveSockets.Length; i++)
            { 
                UdpSocket socket = this.receiveSockets[i];
 
                while(!this.IsDisposed && socket.PendingReceiveCount < this.maxPendingReceivesPerSocket) 
                {
                    bool jumpThreads = false; 
                    try
                    {
                        if (StartAsyncReceive(socket) && !Thread.CurrentThread.IsThreadPoolThread)
                        { 
                            jumpThreads = true;
                        } 
                    } 
                    catch (CommunicationException ex)
                    { 
                        //message too big, ICMP errors, etc, are translated by the socket into a CommunicationException derived exception.
                        //These should not be fatal to the receive loop, so we need to continue receiving.
                        this.receiveHandler.HandleAsyncException(ex);
                        jumpThreads = !Thread.CurrentThread.IsThreadPoolThread; 
                    }
 
                    if (jumpThreads) 
                    {
                        ActionItem.Schedule(this.continueReceivingCallback, socket); 
                        break; //while loop.
                    }
                }
            } 
        }
 
        void ThrowIfDisposed() 
        {
            if (this.IsDisposed) 
            {
                throw FxTrace.Exception.AsError(new ObjectDisposedException("SocketReceiveManager"));
            }
        } 

        bool TryHandleException(Exception ex) 
        { 
            if (Fx.IsFatal(ex))
            { 
                return false;
            }

            this.receiveHandler.HandleAsyncException(ex); 
            return true;
        } 
 
        //call under a lock
        ArraySegment EndReceiveFrom(IAsyncResult result, UdpSocketReceiveState state) 
        {
            EndPoint remoteEndpoint = null;
            ArraySegment messageBytes = state.Socket.EndReceiveFrom(result, ref remoteEndpoint);
            state.RemoteEndPoint = remoteEndpoint; 

            return messageBytes; 
        } 

        internal class UdpSocketReceiveState 
        {
            public UdpSocketReceiveState(UdpSocket socket)
            {
                Fx.Assert(socket != null, "UdpSocketReceiveState.ctor: socket should not be null"); 

                this.Socket = socket; 
            } 

            public EndPoint RemoteEndPoint 
            {
                get;
                set;
            } 

            internal UdpSocket Socket 
            { 
                get;
                private set; 
            }
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK