Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / cdf / src / NetFx40 / System.ServiceModel.Discovery / System / ServiceModel / Channels / UdpChannelListener.cs / 1484997 / UdpChannelListener.cs
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//---------------------------------------------------------------

namespace System.ServiceModel.Channels
{
    using System;
    using System.Collections.Generic;
    using System.Net;
    using System.Net.NetworkInformation;
    using System.Net.Sockets;
    using System.Runtime;
    using System.ServiceModel.Description;
    using System.ServiceModel.Discovery;
    using System.Threading;

    /// <summary>
    /// Channel listener for the UDP transport. It binds the listen sockets while it is being
    /// constructed, creates at most one <see cref="ServerUdpDuplexChannel"/> at a time, and routes
    /// every decoded datagram to that channel.
    /// </summary>
    class UdpChannelListener : ChannelListenerBase<IDuplexChannel>, IUdpReceiveHandler
    {
        BufferManager bufferManager;
        ServerUdpDuplexChannel channelInstance;
        InputQueue<ServerUdpDuplexChannel> channelQueue;
        DuplicateMessageDetector duplicateDetector;
        bool isMulticast;
        List<UdpSocket> listenSockets;
        Uri listenUri;
        MessageEncoderFactory messageEncoderFactory;
        EventHandler onChannelClosed;
        IUdpTransportSettings settings;
        UdpSocketReceiveManager socketReceiveManager;
        int cleanedUp; // 0 until Cleanup() has run once; guarded with Interlocked

        /// <summary>
        /// Validates the transport settings, computes the listen URI and binds the listen sockets.
        /// </summary>
        /// <param name="settings">UDP transport settings (buffer sizes, TTL, multicast options, ...).</param>
        /// <param name="context">Binding context supplying the binding, listen URI base/mode and encoder.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// SocketReceiveBufferSize is smaller than MaxReceivedMessageSize.
        /// </exception>
        internal UdpChannelListener(IUdpTransportSettings settings, BindingContext context)
            : base(context.Binding)
        {
            Fx.Assert(settings != null, "settings can't be null");
            Fx.Assert(context != null, "BindingContext parameter can't be null");

            this.settings = settings;
            this.cleanedUp = 0;

            this.duplicateDetector = null;
            if (settings.DuplicateMessageHistoryLength > 0)
            {
                this.duplicateDetector = new DuplicateMessageDetector(settings.DuplicateMessageHistoryLength);
            }

            this.bufferManager = BufferManager.CreateBufferManager(settings.MaxBufferPoolSize, (int)settings.MaxReceivedMessageSize);
            this.onChannelClosed = new EventHandler(OnChannelClosed);

            if (this.settings.SocketReceiveBufferSize < this.settings.MaxReceivedMessageSize)
            {
                throw FxTrace.Exception.ArgumentOutOfRange(
                    "SocketReceiveBufferSize",
                    this.settings.SocketReceiveBufferSize,
                    SR.Property1LessThanOrEqualToProperty2(
                        "MaxReceivedMessageSize",
                        this.settings.MaxReceivedMessageSize,
                        "SocketReceiveBufferSize",
                        this.settings.SocketReceiveBufferSize));
            }

            this.messageEncoderFactory = UdpUtility.GetEncoder(context);

            InitUri(context);

            //Note: because we are binding the sockets in InitSockets, we can start receiving data immediately.
            //If there is a delay between the Building of the listener and the call to Open, stale data could build up
            //inside the Winsock buffer. We have decided that making sure the port is updated correctly in the listen uri
            //(e.g. in the ListenUriMode.Unique case) before leaving the build step is more important than the
            //potential for stale data.
            InitSockets(context.ListenUriMode == ListenUriMode.Unique);

            Fx.Assert(!this.listenUri.IsDefaultPort, "Listen Uri's port should never be the default port: " + this.listenUri);
        }

        /// <summary>Encoder factory obtained from the binding context at construction time.</summary>
        public MessageEncoderFactory MessageEncoderFactory
        {
            get { return this.messageEncoderFactory; }
        }

        /// <summary>The URI this listener is listening on (port filled in by InitSockets when unique).</summary>
        public override Uri Uri
        {
            get { return this.listenUri; }
        }

        protected override TimeSpan DefaultReceiveTimeout
        {
            get { return UdpConstants.Defaults.ReceiveTimeout; }
        }

        protected override TimeSpan DefaultSendTimeout
        {
            get { return UdpConstants.Defaults.SendTimeout; }
        }

        int IUdpReceiveHandler.MaxReceivedMessageSize
        {
            get { return (int)this.settings.MaxReceivedMessageSize; }
        }

        string Scheme
        {
            get { return UdpConstants.Scheme; }
        }

        /// <summary>
        /// Resolves a property by asking the message encoder first, then answering
        /// <see cref="MessageVersion"/> directly, and finally deferring to the base class.
        /// </summary>
        public override T GetProperty<T>()
        {
            T messageEncoderProperty = this.MessageEncoderFactory.Encoder.GetProperty<T>();
            if (messageEncoderProperty != null)
            {
                return messageEncoderProperty;
            }

            if (typeof(T) == typeof(MessageVersion))
            {
                return (T)(object)this.MessageEncoderFactory.Encoder.MessageVersion;
            }

            return base.GetProperty<T>();
        }

        void IUdpReceiveHandler.HandleAsyncException(Exception ex)
        {
            HandleReceiveException(ex);
        }

        /// <summary>
        /// Decodes a received datagram and dispatches it to the channel instance. When this listener
        /// no longer holds a buffer manager, the raw data is forwarded to the channel instance instead.
        /// </summary>
        /// <returns>
        /// false if the message was dropped because the max pending message count was hit
        /// (or because listener and channel are both shutting down); true otherwise.
        /// </returns>
        bool IUdpReceiveHandler.HandleDataReceived(ArraySegment<byte> data, EndPoint remoteEndpoint, int interfaceIndex, Action onMessageDequeuedCallback)
        {
            // Snapshot the field: OnClosing sets it to null when ownership moves to the channel instance.
            BufferManager bufferManager = this.bufferManager;
            bool returnBuffer = true;
            string messageHash = null;
            Message message = null;
            bool continueReceiving = true;

            try
            {
                IPEndPoint remoteIPEndPoint = (IPEndPoint)remoteEndpoint;

                if (bufferManager != null)
                {
                    message = UdpUtility.DecodeMessage(
                        this.duplicateDetector,
                        this.messageEncoderFactory.Encoder,
                        bufferManager,
                        data,
                        remoteIPEndPoint,
                        interfaceIndex,
                        true,
                        out messageHash);

                    if (message != null)
                    {
                        continueReceiving = Dispatch(message, onMessageDequeuedCallback);
                        returnBuffer = !continueReceiving;
                    }
                }
                else
                {
                    Fx.Assert(this.State != CommunicationState.Opened, "buffer manager should only be null when closing down and the channel instance has taken control of the receive manager.");

                    IUdpReceiveHandler receiveHandler = (IUdpReceiveHandler)this.channelInstance;

                    if (receiveHandler != null)
                    {
                        returnBuffer = false; //let the channel instance take care of the buffer
                        continueReceiving = receiveHandler.HandleDataReceived(data, remoteEndpoint, interfaceIndex, onMessageDequeuedCallback);
                    }
                    else
                    {
                        //both channel and listener are shutting down, so drop the message and stop the receive loop
                        continueReceiving = false;
                    }
                }
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    returnBuffer = false;
                    throw;
                }

                HandleReceiveException(e);
            }
            finally
            {
                if (returnBuffer)
                {
                    if (message != null)
                    {
                        if (this.duplicateDetector != null)
                        {
                            Fx.Assert(messageHash != null, "message hash should always be available if duplicate detector is enabled");
                            this.duplicateDetector.RemoveEntry(messageHash);
                        }

                        message.Close(); // implicitly returns the buffer
                    }
                    else if (bufferManager != null)
                    {
                        // The snapshot is null once OnClosing has given the buffer manager away; in
                        // that case there is no pool to return the raw buffer to, so it is simply
                        // dropped instead of throwing NullReferenceException from this finally block.
                        bufferManager.ReturnBuffer(data.Array);
                    }
                }
            }

            return continueReceiving;
        }

        protected override Type GetCommunicationObjectType()
        {
            return this.GetType();
        }

        protected override void OnAbort()
        {
            Cleanup();
        }

        protected override IDuplexChannel OnAcceptChannel(TimeSpan timeout)
        {
            ThrowPending();
            TimeoutHelper.ThrowIfNegativeArgument(timeout);
            return this.channelQueue.Dequeue(timeout);
        }

        protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowPending();
            TimeoutHelper.ThrowIfNegativeArgument(timeout);
            return this.channelQueue.BeginDequeue(timeout, callback, state);
        }

        // Close completes synchronously; the async pair just wraps OnClose.
        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            this.OnClose(timeout);
            return new CompletedAsyncResult(callback, state);
        }

        // Open completes synchronously; the async pair just wraps OnOpen.
        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            this.OnOpen(timeout);
            return new CompletedAsyncResult(callback, state);
        }

        protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowPending();
            TimeoutHelper.ThrowIfNegativeArgument(timeout);
            return this.channelQueue.BeginWaitForItem(timeout, callback, state);
        }

        /// <summary>
        /// Offers the receive manager and duplicate detector to the live channel instance (if any)
        /// so it can keep receiving after the listener closes, then forgets the channel instance.
        /// </summary>
        protected override void OnClosing()
        {
            if (this.channelInstance != null)
            {
                lock (ThisLock)
                {
                    if (this.channelInstance != null)
                    {
                        if (this.channelInstance.TransferReceiveManagerOwnership(this.socketReceiveManager, this.duplicateDetector))
                        {
                            //don't clean these objects up, they now belong to the channel instance
                            this.socketReceiveManager = null;
                            this.duplicateDetector = null;
                            this.bufferManager = null;
                        }
                    }

                    this.channelInstance = null;
                }
            }

            base.OnClosing();
        }

        protected override void OnClose(TimeSpan timeout)
        {
            Cleanup();
        }

        protected override IDuplexChannel OnEndAcceptChannel(IAsyncResult result)
        {
            ServerUdpDuplexChannel channel;
            if (this.channelQueue.EndDequeue(result, out channel))
            {
                return channel;
            }
            else
            {
                throw FxTrace.Exception.AsError(new TimeoutException());
            }
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            CompletedAsyncResult.End(result);
        }

        protected override bool OnEndWaitForChannel(IAsyncResult result)
        {
            return this.channelQueue.EndWaitForItem(result);
        }

        protected override void OnOpen(TimeSpan timeout)
        {
        }

        /// <summary>
        /// Creates the channel queue and the socket receive manager, transitions to Opened and only
        /// then starts the receive loop.
        /// </summary>
        protected override void OnOpened()
        {
            this.channelQueue = new InputQueue<ServerUdpDuplexChannel>();

            Fx.Assert(this.socketReceiveManager == null, "receive manager shouldn't be initialized yet");

            this.socketReceiveManager = new UdpSocketReceiveManager(
                this.listenSockets.ToArray(),
                UdpConstants.PendingReceiveCountPerProcessor * Environment.ProcessorCount,
                this.bufferManager,
                this);

            //do the state change to CommunicationState.Opened before starting the receive loop.
            //this avoids a race between transitioning state and processing messages that are
            //already in the socket receive buffer.
            base.OnOpened();

            this.socketReceiveManager.Open();
        }

        protected override bool OnWaitForChannel(TimeSpan timeout)
        {
            ThrowPending();
            TimeoutHelper.ThrowIfNegativeArgument(timeout);
            return this.channelQueue.WaitForItem(timeout);
        }

        /// <summary>
        /// Releases whatever the listener still owns. Only the first call does any work; later
        /// calls (e.g. Abort after Close) are no-ops.
        /// </summary>
        void Cleanup()
        {
            if (Interlocked.Increment(ref this.cleanedUp) == 1)
            {
                lock (this.ThisLock)
                {
                    if (this.socketReceiveManager != null)
                    {
                        this.socketReceiveManager.Close();
                        this.socketReceiveManager = null;
                    }

                    if (this.listenSockets != null)
                    {
                        //don't close the sockets...we don't open them, socket manager does.
                        //closing them would cause ref counts to get out of sync.
                        this.listenSockets.Clear();
                        this.listenSockets = null;
                    }
                }

                if (this.bufferManager != null)
                {
                    this.bufferManager.Clear();
                }

                if (this.channelQueue != null)
                {
                    this.channelQueue.Close();
                }

                if (this.duplicateDetector != null)
                {
                    this.duplicateDetector.Dispose();
                }
            }
        }

        /// <summary>
        /// Returns the current channel instance, creating it if there is none.
        /// Must be called under a lock.
        /// </summary>
        /// <returns>true if a new channel was created by this call.</returns>
        bool CreateOrRetrieveChannel(out ServerUdpDuplexChannel channel)
        {
            bool channelCreated = false;

            channel = this.channelInstance;
            if (channel == null)
            {
                channelCreated = true;

                UdpSocket[] sendSockets = this.listenSockets.ToArray();

                channel = new ServerUdpDuplexChannel(this, sendSockets, new EndpointAddress(this.listenUri), this.listenUri, this.isMulticast);
                this.channelInstance = channel;

                channel.Closed += this.onChannelClosed;
            }

            return channelCreated;
        }

        /// <summary>
        /// Hands a decoded message to the channel instance, publishing a newly created channel on
        /// the accept queue first.
        /// </summary>
        /// <returns>false when the listener is not Opened or the channel refused the message.</returns>
        bool Dispatch(Message message, Action onMessageDequeuedCallback)
        {
            ServerUdpDuplexChannel channel;
            bool channelCreated;

            lock (this.ThisLock)
            {
                if (this.State != CommunicationState.Opened)
                {
                    Fx.Assert(this.State > CommunicationState.Opened, "DispatchMessage called when object is not fully opened. This would indicate that the receive loop started before transitioning to CommunicationState.Opened, which should not happen.");

                    //Shutting down - the message will get closed by the caller (IUdpReceiveHandler.OnMessageReceivedCallback)
                    return false;
                }

                channelCreated = CreateOrRetrieveChannel(out channel);
            }

            if (channelCreated)
            {
                this.channelQueue.EnqueueAndDispatch(channel, null, false);
            }

            return channel.EnqueueMessage(message, onMessageDequeuedCallback);
        }

        //Tries to enqueue this async exception onto the channel instance if possible,
        //puts it onto the local exception queue otherwise.
        void HandleReceiveException(Exception ex)
        {
            UdpDuplexChannel channel = this.channelInstance;

            if (channel != null)
            {
                channel.HandleReceiveException(ex);
            }
            else
            {
                if (ServerUdpDuplexChannel.CanIgnoreException(ex))
                {
                    FxTrace.Exception.AsWarning(ex);
                }
                else
                {
                    this.channelQueue.EnqueueAndDispatch(UdpUtility.WrapAsyncException(ex), null, false);
                }
            }
        }

        // Explicit mode requires the caller to name a real port; the relative address is appended as-is.
        void InitExplicitUri(Uri listenUriBaseAddress, string relativeAddress)
        {
            if (listenUriBaseAddress.IsDefaultPort || listenUriBaseAddress.Port == 0)
            {
                throw FxTrace.Exception.ArgumentOutOfRange("context.ListenUriBaseAddress", listenUriBaseAddress, SR.ExplicitListenUriModeRequiresPort);
            }

            this.listenUri = UdpUtility.AppendRelativePath(listenUriBaseAddress, relativeAddress);
        }

        /// <summary>
        /// Creates the listen sockets for the listen URI: one per usable adapter for a multicast
        /// address, a single socket for a literal unicast address, or IPv4/IPv6 "any" sockets when
        /// the host is not an IP literal.
        /// </summary>
        /// <param name="updateListenPort">
        /// When true, the port actually bound is written back into the listen URI.
        /// </param>
        void InitSockets(bool updateListenPort)
        {
            bool ipV4;
            bool ipV6;

            UdpUtility.CheckSocketSupport(out ipV4, out ipV6);

            Fx.Assert(this.listenSockets == null, "listen sockets should only be initialized once");

            this.listenSockets = new List<UdpSocket>();

            int port = (this.listenUri.IsDefaultPort ? 0 : this.listenUri.Port);

            if (this.listenUri.HostNameType == UriHostNameType.IPv6 ||
                this.listenUri.HostNameType == UriHostNameType.IPv4)
            {
                IPAddress address = IPAddress.Parse(this.listenUri.DnsSafeHost);

                if (UdpUtility.IsMulticastAddress(address))
                {
                    if (this.settings.EnableMulticast == false)
                    {
                        throw FxTrace.Exception.AsError(new InvalidOperationException(SR.UdpMulticastNotEnabled(this.listenUri)));
                    }

                    this.isMulticast = true;

                    NetworkInterface[] adapters = UdpUtility.GetMulticastInterfaces(settings.MulticastInterfaceId);

                    //if listening on a specific adapter, don't disable multicast loopback on that adapter.
                    bool allowMulticastLoopback = !string.IsNullOrEmpty(this.settings.MulticastInterfaceId);

                    for (int i = 0; i < adapters.Length; i++)
                    {
                        if (adapters[i].OperationalStatus == OperationalStatus.Up)
                        {
                            IPInterfaceProperties properties = adapters[i].GetIPProperties();
                            bool isLoopbackAdapter = adapters[i].NetworkInterfaceType == NetworkInterfaceType.Loopback;

                            if (isLoopbackAdapter)
                            {
                                listenSockets.Add(UdpUtility.CreateListenSocket(
                                    address,
                                    ref port,
                                    this.settings.SocketReceiveBufferSize,
                                    this.settings.TimeToLive,
                                    UdpUtility.GetLoopbackInterfaceIndex(adapters[i], address.AddressFamily == AddressFamily.InterNetwork),
                                    allowMulticastLoopback,
                                    isLoopbackAdapter));
                            }
                            else if (this.listenUri.HostNameType == UriHostNameType.IPv6)
                            {
                                if (adapters[i].Supports(NetworkInterfaceComponent.IPv6))
                                {
                                    IPv6InterfaceProperties v6Properties = properties.GetIPv6Properties();

                                    if (v6Properties != null)
                                    {
                                        listenSockets.Add(UdpUtility.CreateListenSocket(
                                            address,
                                            ref port,
                                            this.settings.SocketReceiveBufferSize,
                                            this.settings.TimeToLive,
                                            v6Properties.Index,
                                            allowMulticastLoopback,
                                            isLoopbackAdapter));
                                    }
                                }
                            }
                            else
                            {
                                if (adapters[i].Supports(NetworkInterfaceComponent.IPv4))
                                {
                                    IPv4InterfaceProperties v4Properties = properties.GetIPv4Properties();

                                    if (v4Properties != null)
                                    {
                                        listenSockets.Add(UdpUtility.CreateListenSocket(
                                            address,
                                            ref port,
                                            this.settings.SocketReceiveBufferSize,
                                            this.settings.TimeToLive,
                                            v4Properties.Index,
                                            allowMulticastLoopback,
                                            isLoopbackAdapter));
                                    }
                                }
                            }
                        }
                    }

                    if (listenSockets.Count == 0)
                    {
                        throw FxTrace.Exception.AsError(new ArgumentException(SR.UdpFailedToFindMulticastAdapter(this.listenUri)));
                    }
                }
                else
                {
                    //unicast - only sends on the default adapter...
                    this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(
                        address,
                        ref port,
                        this.settings.SocketReceiveBufferSize,
                        this.settings.TimeToLive));
                }
            }
            else
            {
                IPAddress v4Address = IPAddress.Any;
                IPAddress v6Address = IPAddress.IPv6Any;

                if (ipV4 && ipV6)
                {
                    if (port == 0)
                    {
                        //port 0 is only allowed when ListenUriMode == ListenUriMode.Unique
                        UdpSocket ipv4Socket, ipv6Socket;
                        port = UdpUtility.CreateListenSocketsOnUniquePort(
                            v4Address,
                            v6Address,
                            this.settings.SocketReceiveBufferSize,
                            this.settings.TimeToLive,
                            out ipv4Socket,
                            out ipv6Socket);

                        this.listenSockets.Add(ipv4Socket);
                        this.listenSockets.Add(ipv6Socket);
                    }
                    else
                    {
                        this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v4Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
                        this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v6Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
                    }
                }
                else if (ipV4)
                {
                    this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v4Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
                }
                else if (ipV6)
                {
                    this.listenSockets.Add(UdpUtility.CreateUnicastListenSocket(v6Address, ref port, this.settings.SocketReceiveBufferSize, this.settings.TimeToLive));
                }
            }

            if (updateListenPort && port != this.listenUri.Port)
            {
                UriBuilder uriBuilder = new UriBuilder(this.listenUri);
                uriBuilder.Port = port;
                this.listenUri = uriBuilder.Uri;
            }
        }

        // Unique mode: base + relative address + a fresh GUID path segment.
        void InitUniqueUri(Uri listenUriBaseAddress, string relativeAddress)
        {
            Fx.Assert(listenUriBaseAddress != null, "listenUriBaseAddress parameter should have been verified before now");

            listenUriBaseAddress = UdpUtility.AppendRelativePath(listenUriBaseAddress, relativeAddress);

            this.listenUri = UdpUtility.AppendRelativePath(listenUriBaseAddress, Guid.NewGuid().ToString());
        }

        /// <summary>
        /// Validates (or, for Unique mode without a base address, synthesizes) the listen URI base
        /// address and then builds the listen URI according to the ListenUriMode.
        /// </summary>
        void InitUri(BindingContext context)
        {
            Uri listenUriBase = context.ListenUriBaseAddress;

            if (context.ListenUriMode == ListenUriMode.Unique && listenUriBase == null)
            {
                // No base address supplied: build one from the machine name and a GUID path,
                // and publish it back onto the context.
                UriBuilder uriBuilder = new UriBuilder(this.Scheme, DnsCache.MachineName);
                uriBuilder.Path = Guid.NewGuid().ToString();
                listenUriBase = uriBuilder.Uri;
                context.ListenUriBaseAddress = listenUriBase;
            }
            else
            {
                if (listenUriBase == null)
                {
                    throw FxTrace.Exception.ArgumentNull("context.ListenUriBaseAddress");
                }

                if (!listenUriBase.IsAbsoluteUri)
                {
                    throw FxTrace.Exception.Argument("context.ListenUriBaseAddress", SR.RelativeUriNotAllowed(listenUriBase));
                }

                if (context.ListenUriMode == ListenUriMode.Unique && !listenUriBase.IsDefaultPort)
                {
                    throw FxTrace.Exception.Argument("context.ListenUriBaseAddress", SR.DefaultPortRequiredForListenUriModeUnique(listenUriBase));
                }

                if (listenUriBase.Scheme.Equals(this.Scheme, StringComparison.OrdinalIgnoreCase) == false)
                {
                    throw FxTrace.Exception.Argument("context.ListenUriBaseAddress", SR.UriSchemeNotSupported(listenUriBase.Scheme));
                }

                if (!UdpUtility.IsSupportedHostNameType(listenUriBase.HostNameType))
                {
                    throw FxTrace.Exception.Argument("context.ListenUriBaseAddress", SR.UnsupportedUriHostNameType(listenUriBase.Host, listenUriBase.HostNameType));
                }
            }

            switch (context.ListenUriMode)
            {
                case ListenUriMode.Explicit:
                    InitExplicitUri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
                    break;
                case ListenUriMode.Unique:
                    InitUniqueUri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
                    break;
                default:
                    Fx.AssertAndThrow("Unhandled ListenUriMode encountered: " + context.ListenUriMode);
                    break;
            }
        }

        // Unhooks from the closed channel and clears the instance so the next message creates a new one.
        void OnChannelClosed(object sender, EventArgs args)
        {
            ServerUdpDuplexChannel closingChannel = (ServerUdpDuplexChannel)sender;
            closingChannel.Closed -= this.onChannelClosed;

            lock (ThisLock)
            {
                //set to null within a lock because other code
                //assumes that the instance will not suddenly become null
                //if it already holds the lock.
                this.channelInstance = null;
            }
        }

        /// <summary>
        /// Server-side duplex channel created by the listener; it ignores a small set of socket
        /// errors that do not require the server to change its behavior.
        /// </summary>
        sealed class ServerUdpDuplexChannel : UdpDuplexChannel
        {
            //the listener's buffer manager is used, but the channel won't clear it unless
            //UdpChannelListener.OnClosing successfully transfers ownership to the channel instance.
            public ServerUdpDuplexChannel(UdpChannelListener listener, UdpSocket[] sendSockets, EndpointAddress localAddress, Uri via, bool isMulticast)
                : base(listener,
                       listener.MessageEncoderFactory.Encoder,
                       listener.bufferManager,
                       sendSockets,
                       listener.settings.RetransmissionSettings,
                       listener.settings.MaxPendingMessageCount,
                       localAddress,
                       via,
                       isMulticast,
                       (int)listener.settings.MaxReceivedMessageSize)
            {
            }

            protected override bool IgnoreSerializationException
            {
                get { return true; }
            }

            //there are some errors on the server side that we should just ignore because the server will not need
            //to change its behavior if it sees the exception. These errors are not ignored on the client
            //because it may need to adjust settings (e.g. TTL, send smaller messages, double check server address for correctness)
            internal static bool CanIgnoreException(Exception ex)
            {
                SocketError error;
                if (UdpUtility.TryGetSocketError(ex, out error))
                {
                    switch (error)
                    {
                        case SocketError.ConnectionReset: //"ICMP Destination Unreachable" error - client closed the socket
                        case SocketError.NetworkReset: //ICMP: Time Exceeded (TTL expired)
                        case SocketError.MessageSize: //client sent a message larger than the max message size allowed.
                            return true;
                    }
                }

                return false;
            }

            internal override void HandleReceiveException(Exception ex)
            {
                if (CanIgnoreException(ex))
                {
                    FxTrace.Exception.AsWarning(ex);
                }
                else
                {
                    //base implementation will wrap the exception and enqueue it.
                    base.HandleReceiveException(ex);
                }
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- BindStream.cs
- SaveWorkflowAsyncResult.cs
- Events.cs
- TrackingMemoryStreamFactory.cs
- AsymmetricKeyExchangeDeformatter.cs
- ArrowControl.xaml.cs
- MSHTMLHostUtil.cs
- ExpressionNode.cs
- OdbcInfoMessageEvent.cs
- XmlHierarchyData.cs
- CompiledRegexRunnerFactory.cs
- HtmlInputHidden.cs
- RightsManagementEncryptionTransform.cs
- UserControl.cs
- GCHandleCookieTable.cs
- CoTaskMemSafeHandle.cs
- AudioStateChangedEventArgs.cs
- AppSettingsExpressionBuilder.cs
- KeyedQueue.cs
- ReadOnlyDictionary.cs
- ColumnMapVisitor.cs
- HuffModule.cs
- MasterPageBuildProvider.cs
- StorageAssociationSetMapping.cs
- DataTransferEventArgs.cs
- FileDialogCustomPlacesCollection.cs
- Emitter.cs
- SecondaryViewProvider.cs
- CodeSubDirectoriesCollection.cs
- Soap.cs
- GridViewSortEventArgs.cs
- EventSetter.cs
- OdbcParameterCollection.cs
- BitmapEffectGeneralTransform.cs
- WpfKnownTypeInvoker.cs
- WebPartConnectionsConnectVerb.cs
- XmlDataLoader.cs
- TypeContext.cs
- DurableOperationAttribute.cs
- OpacityConverter.cs
- _DigestClient.cs
- OperationAbortedException.cs
- XhtmlTextWriter.cs
- _NegotiateClient.cs
- MemberPath.cs
- DoubleLinkList.cs
- GenericParameterDataContract.cs
- HttpProtocolImporter.cs
- safelinkcollection.cs
- XmlSchemaRedefine.cs
- DbParameterCollectionHelper.cs
- ParsedAttributeCollection.cs
- NamedPipeTransportManager.cs
- XmlSchemaAll.cs
- formatter.cs
- OrderedDictionary.cs
- MimeTypePropertyAttribute.cs
- DaylightTime.cs
- DataGridViewRowHeightInfoNeededEventArgs.cs
- WebFormDesignerActionService.cs
- DesignerTransactionCloseEvent.cs
- ConversionContext.cs
- ACL.cs
- DependencyObjectPropertyDescriptor.cs
- CompensationHandlingFilter.cs
- InheritanceContextChangedEventManager.cs
- WizardStepBase.cs
- UrlMapping.cs
- ToolTipAutomationPeer.cs
- Triangle.cs
- SignatureDescription.cs
- SwitchElementsCollection.cs
- DataRelation.cs
- Vector3DCollectionConverter.cs
- TextEncodedRawTextWriter.cs
- Int64AnimationBase.cs
- FilteredSchemaElementLookUpTable.cs
- StringTraceRecord.cs
- DesignerRegionMouseEventArgs.cs
- GridViewDeleteEventArgs.cs
- OverloadGroupAttribute.cs
- WebPartMenu.cs
- DataGridViewToolTip.cs
- HeaderLabel.cs
- serverconfig.cs
- ProfileGroupSettings.cs
- TranslateTransform3D.cs
- BoundField.cs
- RelatedPropertyManager.cs
- SchemaImporterExtensionsSection.cs
- RewritingPass.cs
- DateBoldEvent.cs
- DataTableCollection.cs
- XmlUrlEditor.cs
- DataContractSerializerFaultFormatter.cs
- FlowDocumentScrollViewerAutomationPeer.cs
- CheckoutException.cs
- Error.cs
- PermissionRequestEvidence.cs
- WebPartMenuStyle.cs