UdpChannelListener.cs source code in C# .NET

Source code for the .NET framework in C#

                        

Code:

/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Discovery / System / ServiceModel / Channels / UdpChannelListener.cs / 1484997 / UdpChannelListener.cs

                            //---------------------------------------------------------------- 
// Copyright (c) Microsoft Corporation.  All rights reserved.
//---------------------------------------------------------------

namespace System.ServiceModel.Channels 
{
    using System; 
    using System.Collections.Generic; 
    using System.Net;
    using System.Net.NetworkInformation; 
    using System.Net.Sockets;
    using System.Runtime;
    using System.ServiceModel.Description;
    using System.ServiceModel.Discovery; 
    using System.Threading;
 
    class UdpChannelListener 
        : ChannelListenerBase, IUdpReceiveHandler
    { 
        //Pool used when decoding received datagrams. Set to null by OnClosing once the
        //channel instance has taken ownership of the receive manager.
        BufferManager bufferManager;
        //The duplex channel created by this listener; null until the first message is
        //dispatched and again after that channel closes or the listener starts closing.
        ServerUdpDuplexChannel channelInstance;
        //Queue served by the Accept/WaitForChannel methods. The element type has to be
        //ServerUdpDuplexChannel because OnEndAcceptChannel passes that type as an out argument.
        InputQueue<ServerUdpDuplexChannel> channelQueue;
        //Only created when settings.DuplicateMessageHistoryLength > 0; null otherwise.
        DuplicateMessageDetector duplicateDetector;
        //Set by InitSockets when the listen address is a multicast address.
        bool isMulticast;
        //Sockets created by InitSockets; CreateOrRetrieveChannel copies them into a UdpSocket[].
        List<UdpSocket> listenSockets;
        //Final listen address computed by InitUri (and port-adjusted by InitSockets).
        Uri listenUri;
        MessageEncoderFactory messageEncoderFactory;
        //Cached delegate for OnChannelClosed so it can be subscribed and unsubscribed.
        EventHandler onChannelClosed;
        IUdpTransportSettings settings;
        //Created in OnOpened; may be handed over to the channel instance in OnClosing.
        UdpSocketReceiveManager socketReceiveManager;
        //Incremented by Cleanup so that the teardown work runs at most once.
        int cleanedUp;
 

        //Builds a listener from the transport settings and binding context: validates the
        //settings, resolves the listen uri and creates the listen sockets.
        //Throws ArgumentOutOfRange when SocketReceiveBufferSize is smaller than
        //MaxReceivedMessageSize; InitUri and InitSockets throw for an unusable listen address.
        internal UdpChannelListener(IUdpTransportSettings settings, BindingContext context)
            : base(context.Binding)
        {
            Fx.Assert(settings != null, "settings can't be null");
            //context was already dereferenced by the base constructor call above, so this
            //assert only documents the expectation.
            Fx.Assert(context != null, "BindingContext parameter can't be null");

            this.settings = settings;
            this.cleanedUp = 0;
            this.duplicateDetector = null;

            //Validate before allocating anything so that a bad configuration does not
            //leave half-initialized state behind.
            if (this.settings.SocketReceiveBufferSize < this.settings.MaxReceivedMessageSize)
            {
                throw FxTrace.Exception.ArgumentOutOfRange("SocketReceiveBufferSize", this.settings.SocketReceiveBufferSize,
                    SR.Property1LessThanOrEqualToProperty2("MaxReceivedMessageSize", this.settings.MaxReceivedMessageSize,
                    "SocketReceiveBufferSize", this.settings.SocketReceiveBufferSize));
            }

            this.bufferManager = BufferManager.CreateBufferManager(settings.MaxBufferPoolSize, (int)settings.MaxReceivedMessageSize);
            this.onChannelClosed = new EventHandler(OnChannelClosed);

            this.messageEncoderFactory = UdpUtility.GetEncoder(context);

            InitUri(context);

            //Note: because we are binding the sockets in InitSockets, we can start receiving data immediately.
            //If there is a delay between the Building of the listener and the call to Open, stale data could build up
            //inside the Winsock buffer.  We have decided that making sure the port is updated correctly in the listen uri
            //(e.g. in the ListenUriMode.Unique case) before leaving the build step is more important than the
            //potential for stale data.
            InitSockets(context.ListenUriMode == ListenUriMode.Unique);

            Fx.Assert(!this.listenUri.IsDefaultPort, "Listen Uri's port should never be the default port: " + this.listenUri);

            //Created last: the detector is disposable (see Cleanup), and every step above can
            //throw. Allocating it only after they succeed means a failed construction
            //never strands an undisposed detector.
            if (settings.DuplicateMessageHistoryLength > 0)
            {
                this.duplicateDetector = new DuplicateMessageDetector(settings.DuplicateMessageHistoryLength);
            }
        }
 
        //Encoder factory obtained from the binding context when the listener was built.
        public MessageEncoderFactory MessageEncoderFactory
        {
            get { return this.messageEncoderFactory; }
        }

        //The address this listener is listening on (this.listenUri).
        public override Uri Uri
        {
            get { return this.listenUri; }
        }
 
        //Receive timeout taken from the UDP transport defaults.
        protected override TimeSpan DefaultReceiveTimeout
        {
            get { return UdpConstants.Defaults.ReceiveTimeout; }
        }
 
        //Send timeout taken from the UDP transport defaults.
        protected override TimeSpan DefaultSendTimeout
        {
            get { return UdpConstants.Defaults.SendTimeout; }
        }

        //Largest message size accepted, as configured in the transport settings and
        //narrowed to int for the receive handler contract.
        int IUdpReceiveHandler.MaxReceivedMessageSize
        {
            get
            {
                int maxMessageSize = (int)this.settings.MaxReceivedMessageSize;
                return maxMessageSize;
            }
        }

        //Uri scheme this listener accepts; InitUri compares listen addresses against it.
        string Scheme
        {
            get { return UdpConstants.Scheme; }
        }
 
        //Resolves a typed property for callers walking the channel stack.
        //Lookup order: the message encoder first, then the encoder's MessageVersion when
        //T is MessageVersion, and finally the base listener implementation.
        //The type parameter <T> is required on the declaration and on both calls;
        //without it T is undefined and the method cannot compile.
        public override T GetProperty<T>()
        {
            T messageEncoderProperty = this.MessageEncoderFactory.Encoder.GetProperty<T>();
            if (messageEncoderProperty != null)
            {
                return messageEncoderProperty;
            }

            if (typeof(T) == typeof(MessageVersion))
            {
                //double cast through object because T is only known to be MessageVersion at run time
                return (T)(object)this.MessageEncoderFactory.Encoder.MessageVersion;
            }

            return base.GetProperty<T>();
        }
 
        //Receive-handler entry point for exceptions raised by the receive loop; forwards to
        //HandleReceiveException, which decides where the exception ends up.
        void IUdpReceiveHandler.HandleAsyncException(Exception ex)
        {
            this.HandleReceiveException(ex);
        }

        //Decodes a received datagram and dispatches it to the channel instance.
        //returns false if the message was dropped because the max pending message count was hit,
        //or if both the listener and its channel are shutting down (the receive loop should stop).
        //Non-fatal exceptions are routed to HandleReceiveException; fatal ones are rethrown.
        bool IUdpReceiveHandler.HandleDataReceived(ArraySegment<byte> data, EndPoint remoteEndpoint, int interfaceIndex, Action onMessageDequeuedCallback)
        {
            //Snapshot both fields once: OnClosing sets them to null when ownership moves to the
            //channel instance, so re-reading the fields later could observe null mid-method.
            BufferManager bufferManager = this.bufferManager;
            DuplicateMessageDetector duplicateDetector = this.duplicateDetector;
            bool returnBuffer = true;
            string messageHash = null;
            Message message = null;
            bool continueReceiving = true;

            try
            {
                IPEndPoint remoteIPEndPoint = (IPEndPoint)remoteEndpoint;
                if (bufferManager != null)
                {
                    message = UdpUtility.DecodeMessage(duplicateDetector, this.messageEncoderFactory.Encoder,
                        bufferManager, data, remoteIPEndPoint, interfaceIndex, true, out messageHash);

                    if (message != null)
                    {
                        continueReceiving = Dispatch(message, onMessageDequeuedCallback);
                        //a successfully enqueued message keeps its buffer; a rejected one is cleaned up below
                        returnBuffer = !continueReceiving;
                    }
                }
                else
                {
                    Fx.Assert(this.State != CommunicationState.Opened, "buffer manager should only be null when closing down and the channel instance has taken control of the receive manager.");

                    IUdpReceiveHandler receiveHandler = (IUdpReceiveHandler)this.channelInstance;

                    if (receiveHandler != null)
                    {
                        returnBuffer = false; //let the channel instance take care of the buffer
                        continueReceiving = receiveHandler.HandleDataReceived(data, remoteEndpoint, interfaceIndex, onMessageDequeuedCallback);
                    }
                    else
                    {
                        //both channel and listener are shutting down, so drop the message and stop the receive loop
                        continueReceiving = false;
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    returnBuffer = false;
                    throw;
                }

                HandleReceiveException(e);
            }
            finally
            {
                if (returnBuffer)
                {
                    if (message != null)
                    {
                        if (duplicateDetector != null)
                        {
                            Fx.Assert(messageHash != null, "message hash should always be available if duplicate detector is enabled");
                            //the message was never delivered, so forget it to allow a retransmission through
                            duplicateDetector.RemoveEntry(messageHash);
                        }

                        message.Close(); // implicitly returns the buffer
                    }
                    else if (bufferManager != null)
                    {
                        bufferManager.ReturnBuffer(data.Array);
                    }
                    //else: this listener no longer holds a buffer manager (shutdown path above), so
                    //there is no pool to return the buffer to. Dereferencing the null snapshot here
                    //would throw a NullReferenceException out of the finally block.
                }
            }

            return continueReceiving;
        }

        //Reports the runtime type of this listener to the communication-object base class.
        protected override Type GetCommunicationObjectType()
        {
            Type runtimeType = this.GetType();
            return runtimeType;
        }


        //Abort shares the same teardown as Close; Cleanup guards against running twice.
        protected override void OnAbort()
        {
            this.Cleanup();
        }
 
        //Blocking accept: after ThrowPending and the negative-timeout check, takes the
        //next item from the channel queue, waiting up to the given timeout.
        protected override IDuplexChannel OnAcceptChannel(TimeSpan timeout)
        {
            ThrowPending();
            TimeoutHelper.ThrowIfNegativeArgument(timeout);

            IDuplexChannel acceptedChannel = this.channelQueue.Dequeue(timeout);
            return acceptedChannel;
        }
 
        //Asynchronous accept: performs the same up-front checks as OnAcceptChannel and
        //then starts an asynchronous dequeue on the channel queue.
        protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowPending();
            TimeoutHelper.ThrowIfNegativeArgument(timeout);

            IAsyncResult pendingDequeue = this.channelQueue.BeginDequeue(timeout, callback, state);
            return pendingDequeue;
        }
 

        //Close is performed synchronously here, so the work is done up front and an
        //already-completed async result is handed back.
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnClose(timeout);

            IAsyncResult completed = new CompletedAsyncResult(callback, state);
            return completed;
        }

        //Open is performed synchronously here, so the work is done up front and an
        //already-completed async result is handed back.
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            OnOpen(timeout);

            IAsyncResult completed = new CompletedAsyncResult(callback, state);
            return completed;
        }

        //Asynchronous counterpart of OnWaitForChannel: starts waiting for an item to
        //appear in the channel queue without removing it.
        protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowPending();
            TimeoutHelper.ThrowIfNegativeArgument(timeout);

            IAsyncResult pendingWait = this.channelQueue.BeginWaitForItem(timeout, callback, state);
            return pendingWait;
        }

        //Before the listener closes, tries to hand the receive manager and duplicate detector
        //over to the live channel instance (if any) and then detaches from that channel.
        protected override void OnClosing()
        {
            //cheap unlocked check first; the decision itself is re-made under the lock
            if (this.channelInstance != null)
            {
                lock (ThisLock)
                {
                    ServerUdpDuplexChannel liveChannel = this.channelInstance;

                    bool ownershipTransferred = liveChannel != null
                        && liveChannel.TransferReceiveManagerOwnership(this.socketReceiveManager, this.duplicateDetector);

                    if (ownershipTransferred)
                    {
                        //these objects now belong to the channel instance, so drop our references
                        //to keep Cleanup from tearing them down
                        this.socketReceiveManager = null;
                        this.duplicateDetector = null;
                        this.bufferManager = null;
                    }

                    this.channelInstance = null;
                }
            }

            base.OnClosing();
        }

        //Graceful close releases everything the listener still owns; the timeout is not used.
        protected override void OnClose(TimeSpan timeout)
        {
            this.Cleanup();
        }
 

        //Completes an asynchronous accept. Returns the dequeued channel, or throws a
        //TimeoutException when the dequeue reports that nothing was obtained.
        protected override IDuplexChannel OnEndAcceptChannel(IAsyncResult result)
        {
            ServerUdpDuplexChannel acceptedChannel;
            bool dequeued = this.channelQueue.EndDequeue(result, out acceptedChannel);

            if (!dequeued)
            {
                throw FxTrace.Exception.AsError(new TimeoutException());
            }

            return acceptedChannel;
        }
 
        //Completes the async result that OnBeginClose produced.
        protected override void OnEndClose(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        //Completes the async result that OnBeginOpen produced.
        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        //Completes an asynchronous wait started by OnBeginWaitForChannel and returns the
        //queue's answer.
        protected override bool OnEndWaitForChannel(IAsyncResult result)
        {
            bool itemAvailable = this.channelQueue.EndWaitForItem(result);
            return itemAvailable;
        }
 
 
        //Intentionally empty: the sockets are created in the constructor and the receive
        //manager is created and started in OnOpened.
        protected override void OnOpen(TimeSpan timeout)
        {
        }

        //Creates the channel queue and the socket receive manager, moves the listener to
        //the Opened state and only then starts the receive loop.
        protected override void OnOpened()
        {
            //element type matches the out argument used by OnEndAcceptChannel
            this.channelQueue = new InputQueue<ServerUdpDuplexChannel>();

            Fx.Assert(this.socketReceiveManager == null, "receive manager shouldn't be initialized yet");

            this.socketReceiveManager = new UdpSocketReceiveManager(this.listenSockets.ToArray(),
                UdpConstants.PendingReceiveCountPerProcessor * Environment.ProcessorCount,
                this.bufferManager,
                this);

            //do the state change to CommunicationState.Opened before starting the receive loop.
            //this avoids a race between transitioning state and processing messages that are
            //already in the socket receive buffer.
            base.OnOpened();

            this.socketReceiveManager.Open();
        }
 
        //Blocking wait for an item to appear in the channel queue (the item is not removed).
        protected override bool OnWaitForChannel(TimeSpan timeout)
        {
            ThrowPending();
            TimeoutHelper.ThrowIfNegativeArgument(timeout);

            bool itemAvailable = this.channelQueue.WaitForItem(timeout);
            return itemAvailable;
        }

        //Releases whatever the listener still owns. Shared by OnAbort and OnClose; the
        //interlocked counter lets only the first caller do the work.
        void Cleanup()
        {
            if (Interlocked.Increment(ref this.cleanedUp) != 1)
            {
                //somebody else already ran (or is running) the teardown
                return;
            }

            lock (this.ThisLock)
            {
                if (this.socketReceiveManager != null)
                {
                    this.socketReceiveManager.Close();
                    this.socketReceiveManager = null;
                }

                if (this.listenSockets != null)
                {
                    //don't close the sockets...we don't open them, socket manager does.
                    //closing them would cause ref counts to get out of sync.
                    this.listenSockets.Clear();
                    this.listenSockets = null;
                }
            }

            //the remaining members are null here when OnClosing handed them to the channel instance
            //(bufferManager, duplicateDetector) or when the listener was never opened (channelQueue)
            if (this.bufferManager != null)
            {
                this.bufferManager.Clear();
            }

            if (this.channelQueue != null)
            {
                this.channelQueue.Close();
            }

            if (this.duplicateDetector != null)
            {
                this.duplicateDetector.Dispose();
            }
        }

        //must be called under a lock
        //Hands back the current channel instance, creating and registering one when none exists.
        //Returns true only when a new channel was created by this call.
        bool CreateOrRetrieveChannel(out ServerUdpDuplexChannel channel)
        {
            channel = this.channelInstance;

            if (channel != null)
            {
                return false;
            }

            UdpSocket[] sendSockets = this.listenSockets.ToArray();
            EndpointAddress localAddress = new EndpointAddress(this.listenUri);

            channel = new ServerUdpDuplexChannel(this, sendSockets, localAddress, this.listenUri, this.isMulticast);
            this.channelInstance = channel;

            //OnChannelClosed clears this.channelInstance again when the channel goes away
            channel.Closed += this.onChannelClosed;

            return true;
        }

        //Routes a decoded message to the channel instance, creating the channel (and making it
        //available to Accept callers) on first use. Returns false when the listener is no longer
        //opened, otherwise whatever the channel's EnqueueMessage returns.
        bool Dispatch(Message message, Action onMessageDequeuedCallback)
        {
            ServerUdpDuplexChannel targetChannel;
            bool isNewChannel;

            lock (this.ThisLock)
            {
                bool isOpened = this.State == CommunicationState.Opened;

                if (!isOpened)
                {
                    Fx.Assert(this.State > CommunicationState.Opened, "DispatchMessage called when object is not fully opened.  This would indicate that the receive loop started before transitioning to CommunicationState.Opened, which should not happen.");

                    //Shutting down - the message will get closed by the caller (IUdpReceiveHandler.OnMessageReceivedCallback)
                    return false;
                }

                isNewChannel = CreateOrRetrieveChannel(out targetChannel);
            }

            //queue operations happen outside the lock
            if (isNewChannel)
            {
                this.channelQueue.EnqueueAndDispatch(targetChannel, null, false);
            }

            bool enqueued = targetChannel.EnqueueMessage(message, onMessageDequeuedCallback);
            return enqueued;
        }
 
        //Tries to enqueue this async exception onto the channel instance if possible,
        //puts it onto the local exception queue otherwise.
        //Exceptions that CanIgnoreException accepts are only traced as warnings.
        void HandleReceiveException(Exception ex)
        {
            UdpDuplexChannel activeChannel = this.channelInstance;

            if (activeChannel != null)
            {
                activeChannel.HandleReceiveException(ex);
                return;
            }

            if (ServerUdpDuplexChannel.CanIgnoreException(ex))
            {
                FxTrace.Exception.AsWarning(ex);
                return;
            }

            this.channelQueue.EnqueueAndDispatch(UdpUtility.WrapAsyncException(ex), null, false);
        }

        //ListenUriMode.Explicit: the base address must carry a concrete, non-zero port.
        //The listen uri is the base address with the relative address appended.
        void InitExplicitUri(Uri listenUriBaseAddress, string relativeAddress)
        {
            bool hasExplicitPort = !listenUriBaseAddress.IsDefaultPort && listenUriBaseAddress.Port != 0;

            if (!hasExplicitPort)
            {
                throw FxTrace.Exception.ArgumentOutOfRange("context.ListenUriBaseAddress", listenUriBaseAddress, SR.ExplicitListenUriModeRequiresPort);
            }

            Uri combined = UdpUtility.AppendRelativePath(listenUriBaseAddress, relativeAddress);
            this.listenUri = combined;
        }
 
        //Creates the listen sockets for this.listenUri and stores them in this.listenSockets.
        //  - literal IPv4/IPv6 multicast address: one socket per usable adapter (AddMulticastListenSockets)
        //  - literal IPv4/IPv6 unicast address: a single socket on that address
        //  - host name: sockets on the wildcard addresses (AddWildcardListenSockets)
        //When updateListenPort is true and the sockets ended up on a different port than the
        //one in the listen uri, the listen uri is rewritten to carry the actual port.
        void InitSockets(bool updateListenPort)
        {
            bool ipV4;
            bool ipV6;

            UdpUtility.CheckSocketSupport(out ipV4, out ipV6);

            Fx.Assert(this.listenSockets == null, "listen sockets should only be initialized once");

            this.listenSockets = new List<UdpSocket>();

            //0 asks the socket helpers to pick a port; they report the result back through 'ref port'
            int port = (this.listenUri.IsDefaultPort ? 0 : this.listenUri.Port);

            if (this.listenUri.HostNameType == UriHostNameType.IPv6 ||
                this.listenUri.HostNameType == UriHostNameType.IPv4)
            {
                IPAddress address = IPAddress.Parse(this.listenUri.DnsSafeHost);

                if (UdpUtility.IsMulticastAddress(address))
                {
                    AddMulticastListenSockets(address, ref port);
                }
                else
                {
                    //unicast - only sends on the default adapter...
                    this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(address, ref port,
                        this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
                }
            }
            else
            {
                AddWildcardListenSockets(ipV4, ipV6, ref port);
            }

            if (updateListenPort && port != this.listenUri.Port)
            {
                UriBuilder uriBuilder = new UriBuilder(this.listenUri);
                uriBuilder.Port = port;
                this.listenUri = uriBuilder.Uri;
            }
        }

        //Adds one listen socket for every operational multicast adapter that can serve the
        //given multicast address. Throws when multicast is disabled in the settings or when
        //no adapter produced a socket.
        void AddMulticastListenSockets(IPAddress address, ref int port)
        {
            if (this.settings.EnableMulticast == false)
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.UdpMulticastNotEnabled(this.listenUri)));
            }

            this.isMulticast = true;

            NetworkInterface[] adapters = UdpUtility.GetMulticastInterfaces(this.settings.MulticastInterfaceId);

            //if listening on a specific adapter, don't disable multicast loopback on that adapter.
            bool allowMulticastLoopback = !string.IsNullOrEmpty(this.settings.MulticastInterfaceId);
            bool isIPv6Address = this.listenUri.HostNameType == UriHostNameType.IPv6;

            for (int i = 0; i < adapters.Length; i++)
            {
                NetworkInterface adapter = adapters[i];

                if (adapter.OperationalStatus != OperationalStatus.Up)
                {
                    continue;
                }

                IPInterfaceProperties properties = adapter.GetIPProperties();
                bool isLoopbackAdapter = adapter.NetworkInterfaceType == NetworkInterfaceType.Loopback;

                if (isLoopbackAdapter)
                {
                    //the loopback adapter's index is resolved by a dedicated helper
                    this.listenSockets.Add(UdpUtility.CreateListenSocket(address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive,
                        UdpUtility.GetLoopbackInterfaceIndex(adapter, address.AddressFamily == AddressFamily.InterNetwork),
                        allowMulticastLoopback, isLoopbackAdapter));
                }
                else if (isIPv6Address)
                {
                    if (adapter.Supports(NetworkInterfaceComponent.IPv6))
                    {
                        IPv6InterfaceProperties v6Properties = properties.GetIPv6Properties();

                        if (v6Properties != null)
                        {
                            this.listenSockets.Add(UdpUtility.CreateListenSocket(address, ref port, this.settings.SocketReceiveBufferSize,
                                this.settings.TimeToLive, v6Properties.Index, allowMulticastLoopback, isLoopbackAdapter));
                        }
                    }
                }
                else
                {
                    if (adapter.Supports(NetworkInterfaceComponent.IPv4))
                    {
                        IPv4InterfaceProperties v4Properties = properties.GetIPv4Properties();

                        if (v4Properties != null)
                        {
                            this.listenSockets.Add(UdpUtility.CreateListenSocket(address, ref port, this.settings.SocketReceiveBufferSize,
                                this.settings.TimeToLive, v4Properties.Index, allowMulticastLoopback, isLoopbackAdapter));
                        }
                    }
                }
            }

            if (this.listenSockets.Count == 0)
            {
                throw FxTrace.Exception.AsError(new ArgumentException(SR.UdpFailedToFindMulticastAdapter(this.listenUri)));
            }
        }

        //Adds listen sockets bound to the wildcard address of each supported address family.
        //Used when the listen uri names a host rather than a literal IP address.
        void AddWildcardListenSockets(bool ipV4, bool ipV6, ref int port)
        {
            IPAddress v4Address = IPAddress.Any;
            IPAddress v6Address = IPAddress.IPv6Any;

            if (ipV4 && ipV6)
            {
                if (port == 0)
                {
                    //port 0 is only allowed when ListenUriMode == ListenUriMode.Unique
                    UdpSocket ipv4Socket, ipv6Socket;
                    port = UdpUtility.CreateListenSocketsOnUniquePort(v4Address, v6Address, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive, out ipv4Socket, out ipv6Socket);

                    this.listenSockets.Add(ipv4Socket);
                    this.listenSockets.Add(ipv6Socket);
                }
                else
                {
                    this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v4Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
                    this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v6Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
                }
            }
            else if (ipV4)
            {
                this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v4Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
            }
            else if (ipV6)
            {
                this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v6Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
            }
        }
 
        //ListenUriMode.Unique: the listen uri is the base address plus the relative address
        //plus a freshly generated GUID path segment.
        void InitUniqueUri(Uri listenUriBaseAddress, string relativeAddress)
        {
            Fx.Assert(listenUriBaseAddress != null, "listenUriBaseAddress parameter should have been verified before now");

            Uri withRelativePath = UdpUtility.AppendRelativePath(listenUriBaseAddress, relativeAddress);
            string uniqueSegment = Guid.NewGuid().ToString();

            this.listenUri = UdpUtility.AppendRelativePath(withRelativePath, uniqueSegment);
        }
 
        //Validates (or, for ListenUriMode.Unique without a base address, generates) the listen
        //base address on the binding context and then computes this.listenUri for the
        //requested ListenUriMode. Invalid base addresses are reported as argument exceptions
        //against "context.ListenUriBaseAddress".
        void InitUri(BindingContext context)
        {
            const string parameterName = "context.ListenUriBaseAddress";

            Uri baseAddress = context.ListenUriBaseAddress;
            bool generateBaseAddress = context.ListenUriMode == ListenUriMode.Unique && baseAddress == null;

            if (generateBaseAddress)
            {
                //no base address supplied: build one from the machine name and a GUID path,
                //and publish it back onto the context
                UriBuilder generated = new UriBuilder(this.Scheme, DnsCache.MachineName);
                generated.Path = Guid.NewGuid().ToString();
                baseAddress = generated.Uri;
                context.ListenUriBaseAddress = baseAddress;
            }
            else
            {
                if (baseAddress == null)
                {
                    throw FxTrace.Exception.ArgumentNull(parameterName);
                }

                if (!baseAddress.IsAbsoluteUri)
                {
                    throw FxTrace.Exception.Argument(parameterName, SR.RelativeUriNotAllowed(baseAddress));
                }

                //a unique listen uri gets its port assigned later, so an explicit port is rejected
                if (context.ListenUriMode == ListenUriMode.Unique && !baseAddress.IsDefaultPort)
                {
                    throw FxTrace.Exception.Argument(parameterName, SR.DefaultPortRequiredForListenUriModeUnique(baseAddress));
                }

                if (!baseAddress.Scheme.Equals(this.Scheme, StringComparison.OrdinalIgnoreCase))
                {
                    throw FxTrace.Exception.Argument(parameterName, SR.UriSchemeNotSupported(baseAddress.Scheme));
                }

                if (!UdpUtility.IsSupportedHostNameType(baseAddress.HostNameType))
                {
                    throw FxTrace.Exception.Argument(parameterName, SR.UnsupportedUriHostNameType(baseAddress.Host, baseAddress.HostNameType));
                }
            }

            ListenUriMode mode = context.ListenUriMode;

            if (mode == ListenUriMode.Explicit)
            {
                InitExplicitUri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
            }
            else if (mode == ListenUriMode.Unique)
            {
                InitUniqueUri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
            }
            else
            {
                Fx.AssertAndThrow("Unhandled ListenUriMode encountered: " + mode);
            }
        }

        //Closed-event handler for the channel instance: unsubscribes from the channel and
        //forgets it so that the next dispatched message creates a new channel.
        void OnChannelClosed(object sender, EventArgs args)
        {
            ServerUdpDuplexChannel closedChannel = (ServerUdpDuplexChannel)sender;
            closedChannel.Closed -= this.onChannelClosed;

            //cleared under the lock because code that already holds the lock
            //relies on the instance not turning null underneath it.
            lock (this.ThisLock)
            {
                this.channelInstance = null;
            }
        }

        //Server-side duplex channel handed out by UdpChannelListener. It differs from the base
        //channel in that it ignores serialization exceptions and a small set of socket errors.
        sealed class ServerUdpDuplexChannel : UdpDuplexChannel
        {
            //the listener's buffer manager is used, but the channel won't clear it unless
            //UdpChannelListener.OnClosing successfully transfers ownership to the channel instance.
            public ServerUdpDuplexChannel(UdpChannelListener listener, UdpSocket[] sendSockets, EndpointAddress localAddress, Uri via, bool isMulticast)
                : base(listener,
                listener.MessageEncoderFactory.Encoder,
                listener.bufferManager,
                sendSockets,
                listener.settings.RetransmissionSettings,
                listener.settings.MaxPendingMessageCount,
                localAddress,
                via,
                isMulticast,
                (int)listener.settings.MaxReceivedMessageSize)
            {
            }

            //the server side always opts in to ignoring serialization exceptions
            protected override bool IgnoreSerializationException
            {
                get { return true; }
            }

            //there are some errors on the server side that we should just ignore because the server will not need
            //to change its behavior if it sees the exception.  These errors are not ignored on the client
            //because it may need to adjust settings (e.g. TTL, send smaller messages, double check server address for correctness)
            internal static bool CanIgnoreException(Exception ex)
            {
                SocketError error;

                if (!UdpUtility.TryGetSocketError(ex, out error))
                {
                    //not a socket error at all, so it cannot be one of the ignorable ones
                    return false;
                }

                return error == SocketError.ConnectionReset //"ICMP Destination Unreachable" error - client closed the socket
                    || error == SocketError.NetworkReset    //ICMP: Time Exceeded (TTL expired)
                    || error == SocketError.MessageSize;    //client sent a message larger than the max message size allowed.
            }

            //ignorable exceptions are traced as warnings; everything else goes to the base channel
            internal override void HandleReceiveException(Exception ex)
            {
                if (!CanIgnoreException(ex))
                {
                    //base implementation will wrap the exception and enqueue it.
                    base.HandleReceiveException(ex);
                    return;
                }

                FxTrace.Exception.AsWarning(ex);
            }
        }
    }
} 

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.


                        

Link Menu

Network programming in C#, Network Programming in VB.NET, Network Programming in .NET
This book is available now!
Buy at Amazon US or
Buy at Amazon UK