// Code:
// / WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Dispatcher / ChannelHandler.cs / 1 / ChannelHandler.cs
//------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------------------------
namespace System.ServiceModel.Dispatcher
{
    using System;
    using System.Diagnostics;
    using System.ServiceModel;
    using System.ServiceModel.Description;
    using System.ServiceModel.Diagnostics;
    using System.Globalization;
    using System.Runtime.CompilerServices;
    using System.ServiceModel.Channels;
    using System.Threading;
    using System.Transactions;
    using System.Xml;
    using System.Security;
    using SessionIdleManager = System.ServiceModel.Channels.ServiceChannel.SessionIdleManager;

    // Receives RequestContexts from an IChannelBinder (through an ErrorHandlingReceiver)
    // and hands them to the dispatch runtime, coordinating the receive "pump",
    // throttles and error/fault replies.
    class ChannelHandler
    {
        // Time budget used when closing the channel/binder after a fault reply was sent.
        public static readonly TimeSpan CloseAfterFaultTimeout = TimeSpan.FromSeconds(10);

        // Per-request lookup state; reset via Cleanup() before every receive.
        RequestInfo requestInfo;
        readonly IChannelBinder binder;
        ServiceChannel channel;
        bool doneReceiving;
        readonly DuplexChannelBinder duplexBinder;
        bool hasRegisterBeenCalled;
        bool hasSession;
        readonly ServiceHostBase host;
        readonly bool incrementedActivityCountInConstructor;
        readonly bool isCallback;
        bool isPumpAcquired;
        bool isChannelTerminated;
        bool isConcurrent;
        bool isManualAddressing;
        readonly ListenerHandler listener;
        MessageVersion messageVersion;

        // Cached delegates for the static callback entry points of this class.
        static AsyncCallback onAsyncReceiveComplete = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(ChannelHandler.OnAsyncReceiveComplete));
        static WaitCallback onContinueAsyncReceive = new WaitCallback(ChannelHandler.OnContinueAsyncReceive);
        static WaitCallback onStartSyncMessagePump = new WaitCallback(ChannelHandler.OnStartSyncMessagePump);
        static WaitCallback onStartAsyncMessagePump = new WaitCallback(ChannelHandler.OnStartAsyncMessagePump);
        static WaitCallback onStartSingleTransactedBatch = new WaitCallback(ChannelHandler.OnStartSingleTransactedBatch);
        static WaitCallback openAndEnsurePump = new WaitCallback(ChannelHandler.OpenAndEnsurePump);

        ErrorHandlingReceiver receiver;
        bool receiveSynchronously;
        bool
receiveWithTransaction;

        // Receive timeout passed to TryReceive by the non-batched transacted receive path.
        static TimeSpan transactionalTimeout { get { return TimeSpan.FromSeconds(5); } }

        // Last request that Reply() was attempted for; used to reply at most once per request.
        RequestContext replied;
        // Request parked while a throttle acquisition is pending.
        RequestContext requestWaitingForThrottle;
        WrappedTransaction acceptTransaction;
        readonly ServiceThrottle throttle;
        readonly bool wasChannelThrottled;
        ServiceThrottle instanceContextThrottle = null;
        SharedTransactedBatchContext sharedTransactedBatchContext = null;
        TransactedBatchContext transactedBatchContext = null;
        bool isMainTransactedBatchHandler;
        readonly SessionIdleManager idleManager;

        // Creates a handler over an already existing ServiceChannel.
        // This constructor marks the handler as a callback handler (isCallback = true)
        // and as concurrent; no listener, host or throttle is set.
        internal ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceChannel channel)
        {
            ClientRuntime clientRuntime = channel.ClientRuntime;

            this.messageVersion = messageVersion;
            this.isManualAddressing = clientRuntime.ManualAddressing;
            this.binder = binder;
            this.channel = channel;

            this.isConcurrent = true;
            this.duplexBinder = binder as DuplexChannelBinder;
            this.hasSession = binder.HasSession;
            this.isCallback = true;

            // The receiver gets the ChannelDispatcher of the callback dispatch runtime when there is one.
            DispatchRuntime dispatchRuntime = clientRuntime.DispatchRuntime;
            if (dispatchRuntime == null)
            {
                this.receiver = new ErrorHandlingReceiver(binder, null);
            }
            else
            {
                this.receiver = new ErrorHandlingReceiver(binder, dispatchRuntime.ChannelDispatcher);
            }
            this.requestInfo = new RequestInfo(this);
        }

        // Creates a handler for a binder that belongs to the given listener.
        // Settings (manual addressing, synchronous receive, concurrency, transacted receive)
        // are read from listener.ChannelDispatcher.
        internal ChannelHandler(MessageVersion messageVersion, IChannelBinder binder, ServiceThrottle throttle, ListenerHandler listener, bool wasChannelThrottled, WrappedTransaction acceptTransaction, SessionIdleManager idleManager)
        {
            ChannelDispatcher channelDispatcher = listener.ChannelDispatcher;

            this.messageVersion = messageVersion;
            this.isManualAddressing = channelDispatcher.ManualAddressing;
            this.binder = binder;
            this.throttle = throttle;
            this.listener = listener;
            this.wasChannelThrottled = wasChannelThrottled;

            this.host = listener.Host;
            this.receiveSynchronously = channelDispatcher.ReceiveSynchronously;
            this.duplexBinder = binder as DuplexChannelBinder;
            this.hasSession = binder.HasSession;
            this.isConcurrent =
ConcurrencyBehavior.IsConcurrent(channelDispatcher, this.hasSession);

            this.receiver = new ErrorHandlingReceiver(binder, channelDispatcher);
            this.idleManager = idleManager;
            DiagnosticUtility.DebugAssert((this.idleManager != null) == (this.binder.HasSession && this.listener.ChannelDispatcher.DefaultCommunicationTimeouts.ReceiveTimeout != TimeSpan.MaxValue), "idle manager is present only when there is a session with a finite receive timeout");

            if (channelDispatcher.IsTransactedReceive)
            {
                // Transacted receive always uses the synchronous, transactional pump.
                receiveSynchronously = true;
                receiveWithTransaction = true;

                if (channelDispatcher.MaxTransactedBatchSize > 0)
                {
                    // Allow more than one concurrent batch only when the throttle permits more than
                    // one concurrent call AND every endpoint uses ConcurrencyMode.Multiple.
                    int maxConcurrentBatches = 1;
                    if (null != throttle && throttle.MaxConcurrentCalls > 1)
                    {
                        maxConcurrentBatches = throttle.MaxConcurrentCalls;
                        foreach (EndpointDispatcher endpointDispatcher in channelDispatcher.Endpoints)
                        {
                            if (ConcurrencyMode.Multiple != endpointDispatcher.DispatchRuntime.ConcurrencyMode)
                            {
                                maxConcurrentBatches = 1;
                                break;
                            }
                        }
                    }

                    this.sharedTransactedBatchContext = new SharedTransactedBatchContext(this, channelDispatcher, maxConcurrentBatches);
                    this.isMainTransactedBatchHandler = true;
                    // In batched mode this handler does not use the service throttle.
                    this.throttle = null;
                }
            }
            this.acceptTransaction = acceptTransaction;
            this.requestInfo = new RequestInfo(this);

            // Sessionless handler on an opened listener: count it as activity on the dispatcher's
            // channel collection and remember that, so the count can be decremented later.
            if (!this.hasSession)
            {
                if (this.listener.State == CommunicationState.Opened)
                {
                    this.listener.ChannelDispatcher.Channels.IncrementActivityCount();
                    this.incrementedActivityCountInConstructor = true;
                }
            }
        }

        // Creates an additional handler for a transacted batch. Copies its configuration from
        // an existing handler, always receives synchronously with a transaction, and has no throttle.
        internal ChannelHandler(ChannelHandler handler, TransactedBatchContext context)
        {
            this.messageVersion = handler.messageVersion;
            this.isManualAddressing = handler.isManualAddressing;
            this.binder = handler.binder;
            this.throttle = null;
            this.listener = handler.listener;
            this.wasChannelThrottled = handler.wasChannelThrottled;

            this.host = handler.host;
            this.receiveSynchronously = true;
            this.receiveWithTransaction = true;
            this.duplexBinder = handler.duplexBinder;
            this.hasSession = handler.hasSession;
            this.isConcurrent = handler.isConcurrent;
            this.receiver = handler.receiver;
this.sharedTransactedBatchContext = context.Shared;
            this.transactedBatchContext = context;
            this.isMainTransactedBatchHandler = false;
            this.requestInfo = new RequestInfo(this);
        }

        // The binder this handler receives from.
        internal IChannelBinder Binder
        {
            get { return this.binder; }
        }

        // The ServiceChannel associated with this handler; may be null.
        internal ServiceChannel Channel
        {
            get { return this.channel; }
        }

        // True once Register() has run for this handler.
        internal bool HasRegisterBeenCalled
        {
            get { return this.hasRegisterBeenCalled; }
        }

        // True while the binder's channel is in the Opened state.
        bool IsOpen
        {
            get { return CommunicationState.Opened == this.binder.Channel.State; }
        }

        // Local address of the binder's channel when it is an input or reply channel; otherwise null.
        EndpointAddress LocalAddress
        {
            get
            {
                if (this.binder == null)
                {
                    return null;
                }

                IInputChannel inputChannel = this.binder.Channel as IInputChannel;
                if (inputChannel != null)
                {
                    return inputChannel.LocalAddress;
                }

                IReplyChannel replyChannel = this.binder.Channel as IReplyChannel;
                return (replyChannel != null) ? replyChannel.LocalAddress : null;
            }
        }

        // Static entry point that forwards to the instance Register().
        internal static void Register(ChannelHandler handler)
        {
            handler.Register();
        }

        // Marks the handler as registered and starts receiving. A channel that is still in the
        // Created state is opened on a scheduled callback first; otherwise the pump is started directly.
        void Register()
        {
            this.hasRegisterBeenCalled = true;

            if (this.binder.Channel.State != CommunicationState.Created)
            {
                this.EnsurePump();
                return;
            }

            IOThreadScheduler.ScheduleCallback(openAndEnsurePump, this);
        }

        // InstanceContext of the associated ServiceChannel, or null when there is no channel.
        internal InstanceContext InstanceContext
        {
            get
            {
                return (this.channel != null) ?
this.channel.InstanceContext : null;
            }
        }

        // Throttle to be copied onto service channels created or used by this handler.
        internal ServiceThrottle InstanceContextServiceThrottle
        {
            get
            {
                return this.instanceContextThrottle;
            }
            set
            {
                this.instanceContextThrottle = value;
            }
        }

        // Lock object for this handler's state (the handler instance itself).
        object ThisLock
        {
            get { return this; }
        }

        // Starts an asynchronous receive and, if it completed synchronously,
        // continues pumping on the current thread.
        void AsyncMessagePump()
        {
            IAsyncResult result = this.BeginTryReceive();

            if ((result != null) && result.CompletedSynchronously)
            {
                this.AsyncMessagePump(result);
            }
        }

        // Pump loop: completes the given receive, handles the request, and keeps going for as
        // long as receives complete synchronously. Returns as soon as a receive goes asynchronous
        // (the completion callback resumes the loop), HandleRequest returns false, or the pump
        // cannot be reacquired.
        void AsyncMessagePump(IAsyncResult result)
        {
            for (; ; )
            {
                RequestContext request;

                // EndTryReceive returning false means no request was produced; start another receive.
                while (!this.EndTryReceive(result, out request))
                {
                    result = this.BeginTryReceive();

                    if ((result == null) || !result.CompletedSynchronously)
                    {
                        return;
                    }
                }

                if (!HandleRequest(request, null))
                {
                    break;
                }

                if (!TryAcquirePump())
                {
                    break;
                }

                result = this.BeginTryReceive();

                if (result == null || !result.CompletedSynchronously)
                {
                    break;
                }
            }
        }

        // Resets per-request state and begins an asynchronous receive with no timeout.
        IAsyncResult BeginTryReceive()
        {
            this.requestInfo.Cleanup();
            return this.receiver.BeginTryReceive(TimeSpan.MaxValue, ChannelHandler.onAsyncReceiveComplete, this);
        }

        // Builds a MessageRpc for the request, releases the pump and dispatches the operation.
        // Returns the result of the operation dispatch, the result of HandleError when an
        // exception is caught, or true when there is no channel/dispatch runtime to dispatch to.
        bool DispatchAndReleasePump(RequestContext request, bool cleanThread, OperationContext currentOperationContext)
        {
            ServiceChannel channel = this.requestInfo.Channel;
            EndpointDispatcher endpoint = this.requestInfo.Endpoint;
            bool releasedPump = false;

            try
            {
                Message message = request.RequestMessage;
                DispatchRuntime dispatchBehavior = this.requestInfo.DispatchRuntime;

                if (channel == null || dispatchBehavior == null)
                {
                    DiagnosticUtility.DebugAssert("System.ServiceModel.Dispatcher.ChannelHandler.Dispatch(): (channel == null || dispatchBehavior == null)");
                    return true;
                }

                DispatchOperationRuntime operation = dispatchBehavior.GetOperation(ref message);
                if (operation == null)
                {
                    DiagnosticUtility.DebugAssert("ChannelHandler.Dispatch (operation == null)");
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(String.Format(CultureInfo.InvariantCulture, "No DispatchOperationRuntime found to process message.")));
                }

                if (MessageLogger.LoggingEnabled)
                {
                    MessageLogger.LogMessage(ref
message, (operation.IsOneWay ? MessageLoggingSource.ServiceLevelReceiveDatagram : MessageLoggingSource.ServiceLevelReceiveRequest) | MessageLoggingSource.LastChance);
                }

                // A terminating operation on a session marks the channel as terminated.
                if (operation.IsTerminating && this.hasSession)
                {
                    this.isChannelTerminated = true;
                }

                // Reuse the caller's OperationContext when one was supplied; otherwise create one.
                bool hasOperationContextBeenSet;
                if (currentOperationContext != null)
                {
                    hasOperationContextBeenSet = true;
                    currentOperationContext.ReInit(request, message, channel);
                }
                else
                {
                    hasOperationContextBeenSet = false;
                    currentOperationContext = new OperationContext(request, message, channel, this.host);
                }

                if (currentOperationContext.EndpointDispatcher == null && this.listener != null)
                {
                    currentOperationContext.EndpointDispatcher = endpoint;
                }

                MessageRpc rpc = new MessageRpc(request, message, operation, channel, this.host, this, cleanThread, currentOperationContext, this.requestInfo.ExistingInstanceContext);
                rpc.TransactedBatchContext = this.transactedBatchContext;

                // passing responsibility for call throttle to MessageRpc
                // (MessageRpc implicitly owns this throttle once it's created)
                this.requestInfo.ChannelHandlerOwnsCallThrottle = false;
                // explicitly passing responsibility for instance throttle to MessageRpc
                rpc.MessageRpcOwnsInstanceContextThrottle = this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle;
                this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = false;

                // These need to happen before Dispatch but after accessing any ChannelHandler
                // state, because we go multi-threaded after this until we reacquire pump mutex.
this.ReleasePump();
                releasedPump = true;

                return operation.Parent.Dispatch(ref rpc, hasOperationContextBeenSet);
            }
            catch (Exception e)
            {
                if (DiagnosticUtility.IsFatal(e))
                {
                    throw;
                }
                return this.HandleError(e, request, channel);
            }
            finally
            {
                // Make sure the pump is released on every exit path.
                if (!releasedPump)
                {
                    this.ReleasePump();
                }
            }
        }

        // Releases the call throttle (if this handler has a throttle).
        internal void DispatchDone()
        {
            if (this.throttle != null)
                this.throttle.DeactivateCall();
        }

        // Completes an asynchronous receive; when it produced a result, runs receive-complete handling.
        bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
        {
            bool valid = this.receiver.EndTryReceive(result, out requestContext);

            if (valid)
            {
                this.HandleReceiveComplete(requestContext);
            }

            return valid;
        }

        // Resolves the ServiceChannel, EndpointDispatcher and DispatchRuntime for the request and
        // stores them in requestInfo. When no channel can be found, the sender is answered with the
        // matching "filter did not match" failure and the request is closed; requestInfo.Channel
        // is left null in that case.
        void EnsureChannelAndEndpoint(RequestContext request)
        {
            this.requestInfo.Channel = this.channel;

            if (this.requestInfo.Channel == null)
            {
                bool addressMatched;
                if (this.hasSession)
                {
                    this.requestInfo.Channel = this.GetSessionChannel(request.RequestMessage, out this.requestInfo.Endpoint, out addressMatched);
                }
                else
                {
                    this.requestInfo.Channel = this.GetDatagramChannel(request.RequestMessage, out this.requestInfo.Endpoint, out addressMatched);
                }

                if (this.requestInfo.Channel == null)
                {
                    this.host.RaiseUnknownMessageReceived(request.RequestMessage);
                    if (addressMatched)
                    {
                        this.ReplyContractFilterDidNotMatch(request);
                    }
                    else
                    {
                        this.ReplyAddressFilterDidNotMatch(request);
                    }
                }
            }
            else
            {
                this.requestInfo.Endpoint = this.requestInfo.Channel.EndpointDispatcher;

                //For sessionful contracts, the InstanceContext throttle is not copied over to the channel
                //as we create the channel before acquiring the lock
                if (this.InstanceContextServiceThrottle != null && this.requestInfo.Channel.InstanceContextServiceThrottle == null)
                {
                    this.requestInfo.Channel.InstanceContextServiceThrottle = this.InstanceContextServiceThrottle;
                }
            }

            this.requestInfo.EndpointLookupDone = true;

            if (this.requestInfo.Channel == null)
            {
                // SFx drops a message here
                TraceUtility.TraceDroppedMessage(request.RequestMessage, this.requestInfo.Endpoint);
                request.Close();
                return;
            }

            // Sessionful and callback channels use the channel's own DispatchRuntime;
            // otherwise the endpoint's DispatchRuntime is used.
            if (this.requestInfo.Channel.HasSession ||
this.isCallback)
            {
                this.requestInfo.DispatchRuntime = this.requestInfo.Channel.DispatchRuntime;
            }
            else
            {
                this.requestInfo.DispatchRuntime = this.requestInfo.Endpoint.DispatchRuntime;
            }
        }

        // Starts (or restarts) receiving. Non-main transacted-batch handlers schedule a single
        // batch; every other handler must first acquire the pump and then starts either the
        // synchronous or the asynchronous message pump.
        void EnsurePump()
        {
            if (null == this.sharedTransactedBatchContext || this.isMainTransactedBatchHandler)
            {
                if (TryAcquirePump())
                {
                    if (this.receiveSynchronously)
                    {
                        IOThreadScheduler.ScheduleCallback(ChannelHandler.onStartSyncMessagePump, this);
                    }
                    else
                    {
                        if (Thread.CurrentThread.IsThreadPoolThread)
                        {
                            IAsyncResult result = this.BeginTryReceive();
                            if ((result != null) && result.CompletedSynchronously)
                            {
                                IOThreadScheduler.ScheduleCallback(ChannelHandler.onContinueAsyncReceive, result);
                            }
                        }
                        else
                        {
                            // Since this is not a threadpool thread, we don't know if this thread will exit
                            // while the IO is still pending (which would cancel the IO), so we have to get
                            // over to a threadpool thread which we know will not exit while there is pending IO.
                            IOThreadScheduler.ScheduleCallback(ChannelHandler.onStartAsyncMessagePump, this);
                        }
                    }
                }
            }
            else
            {
                IOThreadScheduler.ScheduleCallback(ChannelHandler.onStartSingleTransactedBatch, this);
            }
        }

        // Finds the endpoint for the message and returns its datagram ServiceChannel, creating and
        // initializing that channel under the listener's lock the first time it is needed.
        // Returns null when no endpoint matches.
        ServiceChannel GetDatagramChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)
        {
            addressMatched = false;
            endpoint = this.GetEndpointDispatcher(message, out addressMatched);

            if (endpoint == null)
            {
                return null;
            }

            // Check, lock, check again: the channel is created at most once per endpoint.
            if (endpoint.DatagramChannel == null)
            {
                lock (this.listener.ThisLock)
                {
                    if (endpoint.DatagramChannel == null)
                    {
                        endpoint.DatagramChannel = new ServiceChannel(this.binder, endpoint, this.listener.ChannelDispatcher, this.idleManager);
                        this.InitializeServiceChannel(endpoint.DatagramChannel);
                    }
                }
            }

            return endpoint.DatagramChannel;
        }

        // Looks the message up in the listener's endpoint table.
        EndpointDispatcher GetEndpointDispatcher(Message message, out bool addressMatched)
        {
            return this.listener.Endpoints.Lookup(message, out addressMatched);
        }

        // Returns this handler's session ServiceChannel, creating it from the endpoint that
        // matches the message when it does not exist yet.
        ServiceChannel GetSessionChannel(Message message, out EndpointDispatcher endpoint, out bool addressMatched)
        {
            addressMatched = false;

            if (this.channel
== null)
            {
                lock (this.ThisLock)
                {
                    // Re-check under the lock so the channel is created only once.
                    if (this.channel == null)
                    {
                        endpoint = this.GetEndpointDispatcher(message, out addressMatched);
                        if (endpoint != null)
                        {
                            this.channel = new ServiceChannel(this.binder, endpoint, this.listener.ChannelDispatcher, this.idleManager);
                            this.InitializeServiceChannel(this.channel);
                        }
                    }
                }
            }

            // The reported endpoint always comes from the channel that is actually returned.
            if (this.channel == null)
            {
                endpoint = null;
            }
            else
            {
                endpoint = this.channel.EndpointDispatcher;
            }
            return this.channel;
        }

        // Prepares a newly created ServiceChannel: copies throttles onto it, creates its proxy
        // when the client runtime declares a contract type, lets the ChannelDispatcher initialize
        // it (listener-owned handlers only), and opens it.
        void InitializeServiceChannel(ServiceChannel channel)
        {
            if (this.wasChannelThrottled)
            {
                channel.ServiceThrottle = this.throttle;
            }

            if (this.InstanceContextServiceThrottle != null)
            {
                channel.InstanceContextServiceThrottle = this.InstanceContextServiceThrottle;
            }

            ClientRuntime clientRuntime = channel.ClientRuntime;
            if (clientRuntime != null)
            {
                Type contractType = clientRuntime.ContractClientType;
                Type callbackType = clientRuntime.CallbackClientType;

                if (contractType != null)
                {
                    channel.Proxy = ServiceChannelFactory.CreateProxy(contractType, callbackType, MessageDirection.Output, channel);
                }
            }

            if (this.listener != null)
            {
                this.listener.ChannelDispatcher.InitializeChannel((IClientChannel)channel.Proxy);
            }

            ((IChannel)channel).Open();
        }

        // Asks the responsible ChannelDispatcher to fill faultInfo for the given exception.
        void ProvideFault(Exception e, ref ErrorHandlerFaultInfo faultInfo)
        {
            if (this.listener != null)
            {
                this.listener.ChannelDispatcher.ProvideFault(e, this.requestInfo.Channel==null ?
this.binder.Channel.GetProperty<FaultConverter>() : this.requestInfo.Channel.GetProperty<FaultConverter>(), ref faultInfo);
            }
            else if (this.channel != null)
            {
                DispatchRuntime dispatchBehavior = this.channel.ClientRuntime.CallbackDispatchRuntime;
                dispatchBehavior.ChannelDispatcher.ProvideFault(e, this.channel.GetProperty<FaultConverter>(), ref faultInfo);
            }
        }

        // Reports the exception to the error handlers using a throw-away fault info.
        // Returns the value produced by the ChannelDispatcher's HandleError, or false when this
        // handler has neither a listener nor a channel.
        internal bool HandleError(Exception e)
        {
            ErrorHandlerFaultInfo dummy = new ErrorHandlerFaultInfo();
            return this.HandleError(e, ref dummy);
        }

        // Reports the exception to the ChannelDispatcher that owns this handler (the listener's,
        // or the callback dispatch runtime's). Throws InvalidOperationException when e is null.
        bool HandleError(Exception e, ref ErrorHandlerFaultInfo faultInfo)
        {
            if (!(e != null))
            {
                // SR.SFxNonExceptionThrown is the resource key; look it up once, like every other
                // SR.GetString call in this class.
                DiagnosticUtility.DebugAssert(SR.GetString(SR.SFxNonExceptionThrown));
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SFxNonExceptionThrown)));
            }
            if (this.listener != null)
            {
                return listener.ChannelDispatcher.HandleError(e, ref faultInfo);
            }
            else if (this.channel != null)
            {
                return this.channel.ClientRuntime.CallbackDispatchRuntime.ChannelDispatcher.HandleError(e, ref faultInfo);
            }
            else
            {
                return false;
            }
        }

        // Handles an exception raised while processing a request: tries to send a fault reply,
        // closes (if replied) or aborts the request, reports the error, and - when the error was
        // not handled and there is a session - shuts down the channel and binder. Always returns true.
        bool HandleError(Exception e, RequestContext request, ServiceChannel channel)
        {
            ErrorHandlerFaultInfo faultInfo = new ErrorHandlerFaultInfo(this.messageVersion.Addressing.DefaultFaultAction);
            bool replied;

            ProvideFaultAndReplyFailure(request, e, ref faultInfo, out replied);

            if (replied)
            {
                try
                {
                    request.Close();
                }
                catch (Exception e1)
                {
                    if (DiagnosticUtility.IsFatal(e1))
                    {
                        throw;
                    }
                    this.HandleError(e1);
                }
            }
            else
            {
                request.Abort();
            }

            if (!this.HandleError(e, ref faultInfo) && this.hasSession)
            {
                if (channel != null)
                {
                    if (replied)
                    {
                        // Graceful shutdown; both closes share one CloseAfterFaultTimeout budget.
                        TimeoutHelper timeoutHelper = new TimeoutHelper(CloseAfterFaultTimeout);
                        try
                        {
                            channel.Close(timeoutHelper.RemainingTime());
                        }
                        catch (Exception e2)
                        {
                            if (DiagnosticUtility.IsFatal(e2))
                            {
                                throw;
                            }
                            this.HandleError(e2);
                        }
                        try
                        {
                            this.binder.CloseAfterFault(timeoutHelper.RemainingTime());
                        }
                        catch (Exception e3)
                        {
                            if (DiagnosticUtility.IsFatal(e3))
                            {
                                throw;
                            }
                            this.HandleError(e3);
                        }
                    }
                    else
                    {
                        channel.Abort();
this.binder.Abort();
                    }
                }
                else
                {
                    if (replied)
                    {
                        try
                        {
                            this.binder.CloseAfterFault(CloseAfterFaultTimeout);
                        }
                        catch (Exception e4)
                        {
                            if (DiagnosticUtility.IsFatal(e4))
                            {
                                throw;
                            }
                            this.HandleError(e4);
                        }
                    }
                    else
                    {
                        this.binder.Abort();
                    }
                }
            }

            return true;
        }

        // Bookkeeping after a receive produced a result. A null context means the channel has no
        // more requests: the activity count taken in the constructor is released, and - for a
        // session without a ServiceChannel - the receiver is closed, the idle timer cancelled and
        // the channel throttle released, exactly once.
        void HandleReceiveComplete(RequestContext context)
        {
            if ((context == null) && this.incrementedActivityCountInConstructor)
            {
                this.listener.ChannelDispatcher.Channels.DecrementActivityCount();
            }

            if (this.channel != null)
            {
                this.channel.HandleReceiveComplete(context);
            }
            else
            {
                if (context == null && this.hasSession)
                {
                    // doneReceiving guards the shutdown so that it only runs the first time.
                    bool close;
                    lock (this.ThisLock)
                    {
                        close = !this.doneReceiving;
                        this.doneReceiving = true;
                    }

                    if (close)
                    {
                        this.receiver.Close();

                        if (this.idleManager != null)
                        {
                            this.idleManager.CancelTimer();
                        }

                        ServiceThrottle throttle = this.throttle;
                        if (throttle != null)
                            throttle.DeactivateChannel();
                    }
                }
            }
        }

        // Processes one received request. Returns true when the caller may keep pumping, and
        // false when pumping must stop (end of channel) or will be resumed by a later callback.
        bool HandleRequest(RequestContext request, OperationContext currentOperationContext)
        {
            if (request == null)
            {
                // channel EOF, stop receiving
                return false;
            }

            ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ?
TraceUtility.ExtractActivity(request.RequestMessage) : null;
            using (ServiceModelActivity.BoundOperation(activity))
            {
                // A message consumed by the duplex binder as a reply needs no dispatch.
                if (this.HandleRequestAsReply(request))
                {
                    this.ReleasePump();
                    return true;
                }

                // After a terminating operation, further requests are answered with a fault.
                if (this.isChannelTerminated)
                {
                    this.ReleasePump();
                    this.ReplyChannelTerminated(request);
                    return true;
                }

                if (!this.TryAcquireCallThrottle(request))
                {
                    // this.ThrottleAcquiredForCall will be called to continue
                    return false;
                }

                // NOTE: from here on down, ensure that this code is the same as ThrottleAcquiredForCall (see 55460)
                if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
                {
                    DiagnosticUtility.DebugAssert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsCallThrottle");
                }
                this.requestInfo.ChannelHandlerOwnsCallThrottle = true;

                this.TryRetrievingInstanceContext(request);

                if (this.requestInfo.Channel == null || this.requestInfo.DispatchRuntime == null)
                {
                    //TryRetrievingInstanceContext would already have replied/close the RequestContext and also released the pump.
                    return true;
                }

                this.requestInfo.Channel.CompletedIOOperation();

                //Only acquire InstanceContext throttle if one doesnt already exist.
if (!this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
                {
                    // this.ThrottleAcquired will be called to continue
                    return false;
                }
                if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
                {
                    DiagnosticUtility.DebugAssert("ChannelHandler.HandleRequest: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
                }
                this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);

                if (!this.DispatchAndReleasePump(request, true, currentOperationContext))
                {
                    // this.DispatchDone will be called to continue
                    return false;
                }
            }
            return true;
        }

        // Offers the message to the duplex binder (if any); true when the binder consumed it as a reply.
        bool HandleRequestAsReply(RequestContext request)
        {
            if (this.duplexBinder != null)
            {
                if (this.duplexBinder.HandleRequestAsReply(request.RequestMessage))
                {
                    return true;
                }
            }
            return false;
        }

        // Scheduler callback: state is the ChannelHandler whose async pump should start.
        static void OnStartAsyncMessagePump(object state)
        {
            ((ChannelHandler)state).AsyncMessagePump();
        }

        // Scheduler callback: state is the ChannelHandler whose synchronous pump should start.
        static void OnStartSyncMessagePump(object state)
        {
            // Direct cast (as in the sibling callbacks): a wrong state object fails here with
            // InvalidCastException instead of a NullReferenceException on the next line.
            ChannelHandler handler = (ChannelHandler)state;

            if (handler.receiveWithTransaction)
                handler.SyncTransactionalMessagePump();
            else
                handler.SyncMessagePump();
        }

        // Scheduler callback: state is the ChannelHandler that should run one transacted batch.
        static void OnStartSingleTransactedBatch(object state)
        {
            ChannelHandler handler = (ChannelHandler)state;
            handler.TransactedBatchLoop();
        }

        // Receive-completion callback. Synchronous completions are handled by the code that
        // started the receive, so only asynchronous completions resume the pump here.
        static void OnAsyncReceiveComplete(IAsyncResult result)
        {
            if (!result.CompletedSynchronously)
            {
                ((ChannelHandler)result.AsyncState).AsyncMessagePump(result);
            }
        }

        // Scheduler callback: state is a completed IAsyncResult whose AsyncState is the handler.
        static void OnContinueAsyncReceive(object state)
        {
            IAsyncResult result = (IAsyncResult)state;
            ((ChannelHandler)result.AsyncState).AsyncMessagePump(result);
        }

        // Scheduler callback: state is the ChannelHandler whose channel should be opened.
        static void OpenAndEnsurePump(object state)
        {
            ((ChannelHandler)state).OpenAndEnsurePump();
        }

        // Opens the binder's channel and starts the pump. When the open fails with a non-fatal
        // exception, the failure is traced and reported and the resources held on behalf of the
        // channel are released instead.
        void OpenAndEnsurePump()
        {
            Exception exception = null;
            try
            {
                this.binder.Channel.Open();
            }
            catch (Exception e)
            {
                if (DiagnosticUtility.IsFatal(e))
                {
                    throw;
                }
                exception = e;
            }

            if (exception != null)
            {
                if (DiagnosticUtility.ShouldTraceWarning)
                {
                    TraceUtility.TraceEvent(System.Diagnostics.TraceEventType.Warning,
TraceCode.FailedToOpenIncomingChannel, SR.GetString(SR.TraceCodeFailedToOpenIncomingChannel));
                }

                SessionIdleManager idleManager = this.idleManager;
                if (idleManager != null)
                {
                    idleManager.CancelTimer();
                }

                if ((this.throttle != null) && this.hasSession)
                {
                    this.throttle.DeactivateChannel();
                }

                this.HandleError(exception);

                if (this.incrementedActivityCountInConstructor)
                {
                    this.listener.ChannelDispatcher.Channels.DecrementActivityCount();
                }
            }
            else
            {
                this.EnsurePump();
            }
        }

        // Synchronous receive; when it produced a result, runs receive-complete handling.
        bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
        {
            bool valid = this.receiver.TryReceive(timeout, out requestContext);

            if (valid)
            {
                this.HandleReceiveComplete(requestContext);
            }

            return valid;
        }

        // Replies with a DestinationUnreachable sender fault naming the request's To header.
        void ReplyAddressFilterDidNotMatch(RequestContext request)
        {
            FaultCode code = FaultCode.CreateSenderFaultCode(AddressingStrings.DestinationUnreachable, this.messageVersion.Addressing.Namespace);
            string reason = SR.GetString(SR.SFxNoEndpointMatchingAddress, request.RequestMessage.Headers.To);

            ReplyFailure(request, code, reason);
        }

        // Replies with an ActionNotSupported sender fault. When the addressing version requires an
        // Action header and the request has none, a MessageHeaderException is thrown instead.
        void ReplyContractFilterDidNotMatch(RequestContext request)
        {
            // By default, the contract filter is just a filter over the set of initiating actions in
            // the contract, so we do error messages accordingly
            AddressingVersion addressingVersion = this.messageVersion.Addressing;
            if (addressingVersion != AddressingVersion.None && request.RequestMessage.Headers.Action == null)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                    new MessageHeaderException(
                    SR.GetString(SR.SFxMissingActionHeader, addressingVersion.Namespace), AddressingStrings.Action, addressingVersion.Namespace));
            }
            else
            {
                // some of this code is duplicated in DispatchRuntime.UnhandledActionInvoker
                // ideally both places would use FaultConverter and ActionNotSupportedException
                FaultCode code = FaultCode.CreateSenderFaultCode(AddressingStrings.ActionNotSupported, this.messageVersion.Addressing.Namespace);
                string reason = SR.GetString(SR.SFxNoEndpointMatchingContract,
request.RequestMessage.Headers.Action);

                ReplyFailure(request, code, reason, this.messageVersion.Addressing.FaultAction);
            }
        }

        // Replies with a SessionTerminated sender fault.
        void ReplyChannelTerminated(RequestContext request)
        {
            FaultCode code = FaultCode.CreateSenderFaultCode(FaultCodeConstants.Codes.SessionTerminated, FaultCodeConstants.Namespaces.NetDispatch);
            string reason = SR.GetString(SR.SFxChannelTerminated0);
            string action = FaultCodeConstants.Actions.NetDispatcher;
            Message fault = Message.CreateMessage(this.messageVersion, code, reason, action);

            ReplyFailure(request, fault, action, reason, code);
        }

        // Replies with a fault that uses the addressing version's default fault action.
        void ReplyFailure(RequestContext request, FaultCode code, string reason)
        {
            string action = this.messageVersion.Addressing.DefaultFaultAction;
            ReplyFailure(request, code, reason, action);
        }

        // Builds the fault message for the given code/reason/action and replies with it.
        void ReplyFailure(RequestContext request, FaultCode code, string reason, string action)
        {
            Message fault = Message.CreateMessage(this.messageVersion, code, reason, action);
            ReplyFailure(request, fault, action, reason, code);
        }

        // Sends the prepared fault (subject to ProvideFaultAndReplyFailure's checks) and reports
        // a matching FaultException to the error handlers.
        void ReplyFailure(RequestContext request, Message fault, string action, string reason, FaultCode code)
        {
            FaultException exception = new FaultException(reason, code);
            ErrorBehavior.ThrowAndCatch(exception);
            ErrorHandlerFaultInfo faultInfo = new ErrorHandlerFaultInfo(action);
            faultInfo.Fault = fault;
            bool replied;
            ProvideFaultAndReplyFailure(request, exception, ref faultInfo, out replied);
            this.HandleError(exception, ref faultInfo);
        }

        // Lets the dispatcher provide a fault for the exception and tries to send it as the reply.
        // Nothing is sent when the request itself is a fault or when faults are not enabled.
        // replied reports whether a reply was actually attempted.
        void ProvideFaultAndReplyFailure(RequestContext request, Exception exception, ref ErrorHandlerFaultInfo faultInfo, out bool replied)
        {
            replied = false;
            bool requestMessageIsFault = false;
            try
            {
                requestMessageIsFault = request.RequestMessage.IsFault;
            }
#pragma warning suppress 56500 // covered by FxCOP
            catch (Exception e)
            {
                if (DiagnosticUtility.IsFatal(e))
                    throw;
                // ignore it
            }

            bool enableFaults = false;
            if (this.listener != null)
            {
                enableFaults = this.listener.ChannelDispatcher.EnableFaults;
            }
            else if (this.channel != null && this.channel.IsClient)
            {
                enableFaults =
this.channel.ClientRuntime.EnableFaults;
            }

            if ((!requestMessageIsFault) && enableFaults)
            {
                this.ProvideFault(exception, ref faultInfo);
                if (faultInfo.Fault != null)
                {
                    Message reply = faultInfo.Fault;
                    replied = this.TryReply(request, reply);

                    // The fault message is closed whether or not it could be sent.
                    try
                    {
                        reply.Close();
                    }
#pragma warning suppress 56500 // covered by FxCOP
                    catch (Exception e)
                    {
                        if (DiagnosticUtility.IsFatal(e))
                            throw;
                        this.HandleError(e);
                    }
                }
            }
        }

        // return whether we actually tried to send a reply
        bool Reply(RequestContext request, Message reply)
        {
            // Ensure we only reply once (we may hit the same error multiple times)
            if (this.replied == request)
            {
                return false;
            }
            this.replied = request;

            bool canSendReply = true;

            Message requestMessage = null;
            try
            {
                requestMessage = request.RequestMessage;
            }
#pragma warning suppress 56500 // covered by FxCOP
            catch (Exception e)
            {
                if (DiagnosticUtility.IsFatal(e))
                    throw;
                // ignore it
            }
            if (!object.ReferenceEquals(requestMessage, null))
            {
                UniqueId requestID = null;
                try
                {
                    requestID = requestMessage.Headers.MessageId;
                }
                catch (MessageHeaderException)
                {
                    // ignore it - we don't need to correlate the reply if the MessageId header is bad
                }
                if (!object.ReferenceEquals(requestID, null) && !this.isManualAddressing)
                {
                    System.ServiceModel.Channels.RequestReplyCorrelator.PrepareReply(reply, requestID);
                }
                if (!this.hasSession && !this.isManualAddressing)
                {
                    try
                    {
                        canSendReply = System.ServiceModel.Channels.RequestReplyCorrelator.AddressReply(reply, requestMessage);
                    }
                    catch (MessageHeaderException)
                    {
                        // ignore it - we don't need to address the reply if the FaultTo header is bad
                    }
                }
            }

            // ObjectDisposeException can happen
            // if the channel is closed in a different
            // thread. 99% this check will avoid false
            // exceptions.
if (this.IsOpen && canSendReply)
            {
                request.Reply(reply);
                return true;
            }

            return false;
        }

        // Releases the pump; a no-op for non-concurrent handlers, which never contend for it.
        void ReleasePump()
        {
            if (this.isConcurrent)
            {
                this.ReleasePumpCore();
            }
        }

        // Clears the pump-acquired flag under the handler lock.
        void ReleasePumpCore()
        {
            lock (this.ThisLock)
            {
                this.isPumpAcquired = false;
            }
        }

        // Synchronous pump: receives and handles requests on the current thread until
        // HandleRequest returns false or the pump cannot be reacquired. One OperationContext is
        // installed as OperationContext.Current for the loop and recycled between requests; the
        // previous value of OperationContext.Current is restored on exit.
        void SyncMessagePump()
        {
            OperationContext existingOperationContext = OperationContext.Current;
            try
            {
                OperationContext currentOperationContext = new OperationContext(this.host);
                OperationContext.Current = currentOperationContext;

                for (; ; )
                {
                    RequestContext request;

                    this.requestInfo.Cleanup();

                    // Keep retrying until TryReceive reports a result.
                    while (!TryReceive(TimeSpan.MaxValue, out request))
                    {
                    }

                    if (!HandleRequest(request, currentOperationContext))
                    {
                        break;
                    }

                    if (!TryAcquirePump())
                    {
                        break;
                    }

                    currentOperationContext.Recycle();
                }
            }
            finally
            {
                OperationContext.Current = existingOperationContext;
            }
        }

        // Transactional pump: repeats the (batched or non-batched) transacted loop for as long as
        // that loop returns true.
        [MethodImpl(MethodImplOptions.NoInlining)]
        void SyncTransactionalMessagePump()
        {
            for (; ; )
            {
                bool completedSynchronously;
                if (null == sharedTransactedBatchContext)
                    completedSynchronously = TransactedLoop();
                else
                    completedSynchronously = TransactedBatchLoop();

                if (! completedSynchronously)
                    return;
            }
        }

        // Non-batched transacted receive loop: waits for a message, then receives and handles
        // requests, each under a transaction obtained from CreateOrGetAttachedTransaction.
        bool TransactedLoop()
        {
            try
            {
                this.receiver.WaitForMessage();
            }
            catch (Exception ex)
            {
                if (DiagnosticUtility.IsFatal(ex))
                    throw;

                // Rethrow only when the error handlers did not handle the exception.
                if (!this.HandleError(ex))
                    throw;
            }

            RequestContext request;
            Transaction tx = CreateOrGetAttachedTransaction();
            OperationContext existingOperationContext = OperationContext.Current;

            try
            {
                OperationContext currentOperationContext = new OperationContext(this.host);
                OperationContext.Current = currentOperationContext;

                for ( ; ;)
                {
                    this.requestInfo.Cleanup();

                    bool received = TryTransactionalReceive(tx, out request);

                    if (!
received)
                    {
                        // Nothing received: the caller loops again only while the channel is open.
                        return IsOpen;
                    }

                    if (null == request)
                    {
                        return false;
                    }

                    TransactionMessageProperty.Set(tx, request.RequestMessage);

                    if (!HandleRequest(request, currentOperationContext))
                    {
                        return false;
                    }

                    if (!TryAcquirePump())
                    {
                        return false;
                    }

                    // Obtain the transaction for the next receive and reuse the OperationContext.
                    tx = CreateOrGetAttachedTransaction();
                    currentOperationContext.Recycle();
                }
            }
            finally
            {
                OperationContext.Current = existingOperationContext;
            }
        }

        // Batched transacted receive loop: receives and handles requests inside the current
        // TransactedBatchContext while that context is active. Returns true when the caller should
        // run another batch and false when this handler should stop.
        bool TransactedBatchLoop()
        {
            if (null != this.transactedBatchContext)
            {
                // A context still flagged InDispatch on entry is rolled back.
                if (this.transactedBatchContext.InDispatch)
                {
                    this.transactedBatchContext.ForceRollback();
                    this.transactedBatchContext.InDispatch = false;
                }

                // An inactive context ends a secondary handler; the main handler drops it and
                // creates a new one below.
                if (! this.transactedBatchContext.IsActive)
                {
                    if (! this.isMainTransactedBatchHandler)
                        return false;
                    this.transactedBatchContext = null;
                }
            }

            if (null == this.transactedBatchContext)
            {
                try
                {
                    this.receiver.WaitForMessage();
                }
                catch (Exception ex)
                {
                    if (DiagnosticUtility.IsFatal(ex))
                        throw;

                    if (!this.HandleError(ex))
                        throw;
                }
                this.transactedBatchContext = this.sharedTransactedBatchContext.CreateTransactedBatchContext();
            }

            OperationContext existingOperationContext = OperationContext.Current;

            try
            {
                OperationContext currentOperationContext = new OperationContext(this.host);
                OperationContext.Current = currentOperationContext;

                RequestContext request;

                while (this.transactedBatchContext.IsActive)
                {
                    this.requestInfo.Cleanup();

                    bool valid = TryTransactionalReceive(this.transactedBatchContext.Transaction, out request);

                    if (! valid)
                    {
                        // Nothing received: commit the batch if the channel is still open,
                        // otherwise roll it back and stop.
                        if (this.IsOpen)
                        {
                            this.transactedBatchContext.ForceCommit();
                            return true;
                        }
                        else
                        {
                            this.transactedBatchContext.ForceRollback();
                            return false;
                        }
                    }

                    if (null == request)
                    {
                        this.transactedBatchContext.ForceRollback();
                        return false;
                    }

                    TransactionMessageProperty.Set(this.transactedBatchContext.Transaction, request.RequestMessage);

                    this.transactedBatchContext.InDispatch = true;

                    if (!
HandleRequest(request, currentOperationContext))
                    {
                        return false;
                    }

                    // InDispatch still set after HandleRequest returned: roll the batch back and
                    // let the caller start another one.
                    if (this.transactedBatchContext.InDispatch)
                    {
                        this.transactedBatchContext.ForceRollback();
                        this.transactedBatchContext.InDispatch = false;
                        return true;
                    }

                    if (!TryAcquirePump())
                    {
                        DiagnosticUtility.DebugAssert("System.ServiceModel.Dispatcher.ChannelHandler.TransactedBatchLoop(): (TryAcquiredPump returned false)");
                        return false;
                    }

                    currentOperationContext.Recycle();
                }
            }
            finally
            {
                OperationContext.Current = existingOperationContext;
            }
            return true;
        }

        // Returns the transaction to receive under: the accept transaction (handed out once),
        // else the transaction attached to the InstanceContext, else a newly created transaction
        // using the ChannelDispatcher's isolation level and timeout.
        Transaction CreateOrGetAttachedTransaction()
        {
            if (null != this.acceptTransaction)
            {
                lock(ThisLock)
                {
                    // Re-check under the lock so the accept transaction is consumed only once.
                    if (null != this.acceptTransaction)
                    {
                        Transaction tx = this.acceptTransaction.Transaction;
                        this.acceptTransaction = null;
                        return tx;
                    }
                }
            }

            if (null != this.InstanceContext && this.InstanceContext.HasTransaction)
                return InstanceContext.Transaction.Attached;
            else
                return TransactionBehavior.CreateTransaction(
                    this.listener.ChannelDispatcher.TransactionIsolationLevel,
                    TransactionBehavior.NormalizeTimeout(this.listener.ChannelDispatcher.TransactionTimeout));
        }

        // calls receive on the channel; returns false if no message during the "short timeout"
        bool TryTransactionalReceive(Transaction tx, out RequestContext request)
        {
            request = null;
            bool received = false;

            try
            {
                using (TransactionScope scope = new TransactionScope(tx))
                {
                    if (null != this.sharedTransactedBatchContext)
                    {
                        // Batched mode: receives are serialized on the shared ReceiveLock and
                        // do not wait (TimeSpan.Zero).
                        lock (this.sharedTransactedBatchContext.ReceiveLock)
                        {
                            if (this.transactedBatchContext.AboutToExpire)
                                return false;

                            received = this.TryReceive(TimeSpan.Zero, out request);
                        }
                    }
                    else
                    {
                        received = this.TryReceive(transactionalTimeout, out request);
                    }
                    scope.Complete();
                }
            }
            catch (ObjectDisposedException ex) // thrown from the transaction
            {
                this.HandleError(ex);
                request = null;
                return false;
            }
            catch (TransactionException ex)
            {
                this.HandleError(ex);
                request = null;
                return false;
            }
            catch (Exception ex)
            {
                if (DiagnosticUtility.IsFatal(ex))
                    throw;

                if (!this.HandleError(ex))
                    throw;
            }

            return received;
        }
// This callback always occurs async and always on a dirty thread
        // Resumes the request that was parked waiting for the call throttle; the steps mirror
        // the part of HandleRequest that follows TryAcquireCallThrottle.
        internal void ThrottleAcquiredForCall()
        {
            RequestContext request = this.requestWaitingForThrottle;
            this.requestWaitingForThrottle = null;
            if (this.requestInfo.ChannelHandlerOwnsCallThrottle)
            {
                DiagnosticUtility.DebugAssert("ChannelHandler.ThrottleAcquiredForCall: this.requestInfo.ChannelHandlerOwnsCallThrottle");
            }
            this.requestInfo.ChannelHandlerOwnsCallThrottle = true;

            this.TryRetrievingInstanceContext(request);

            if (this.requestInfo.Channel == null || this.requestInfo.DispatchRuntime == null)
            {
                //Should reply/close request and also close the pump
                return;
            }

            this.requestInfo.Channel.CompletedIOOperation();

            if (this.TryAcquireThrottle(request, (this.requestInfo.ExistingInstanceContext == null)))
            {
                if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle)
                {
                    DiagnosticUtility.DebugAssert("ChannelHandler.ThrottleAcquiredForCall: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle");
                }
                this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null);

                if (this.DispatchAndReleasePump(request, false, null))
                {
                    this.EnsurePump();
                }
            }
        }

        // Resolves channel/endpoint for the request (if not done yet) and asks the
        // InstanceContextProvider for an existing InstanceContext. The pump is released on every
        // path except a successful lookup; failures are reported through HandleError.
        void TryRetrievingInstanceContext(RequestContext request)
        {
            bool releasePump = true;
            try
            {
                if (!this.requestInfo.EndpointLookupDone)
                {
                    this.EnsureChannelAndEndpoint(request);
                }

                if (this.requestInfo.Channel == null)
                {
                    return;
                }

                if (this.requestInfo.DispatchRuntime != null)
                {
                    IContextChannel transparentProxy = this.requestInfo.Channel.Proxy as IContextChannel;
                    try
                    {
                        this.requestInfo.ExistingInstanceContext = this.requestInfo.DispatchRuntime.InstanceContextProvider.GetExistingInstanceContext(request.RequestMessage, transparentProxy);
                        releasePump = false;
                    }
                    catch (Exception e)
                    {
                        if (DiagnosticUtility.IsFatal(e))
                        {
                            throw;
                        }
                        // Code that calls this method uses Channel==null to determine
                        // if we should dispatch. If ICP throws, we should not dispatch.
// When this class is refactored, we need to change this to be explicit // about this rather than setting Channel to signal failure. this.requestInfo.Channel = null; this.HandleError(e, request, channel); } } else { // This can happen if we are pumping for an async client, // and we receive a bogus reply. In that case, there is no // DispatchRuntime, because we are only expecting replies. // // One possible fix for this would be in DuplexChannelBinder // to drop all messages with a RelatesTo that do not match a // pending request. // // However, that would not fix: // (a) we could get a valid request message with a // RelatesTo that we should try to process. // (b) we could get a reply message that does not have // a RelatesTo. // // So we do the null check here. // // SFx drops a message here TraceUtility.TraceDroppedMessage(request.RequestMessage, this.requestInfo.Endpoint); request.Close(); } } catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) { throw; } this.HandleError(e, request, channel); } finally { if (releasePump) { this.ReleasePump(); } } } // This callback always occurs async and always on a dirty thread internal void ThrottleAcquired() { RequestContext request = this.requestWaitingForThrottle; this.requestWaitingForThrottle = null; if (this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle) { DiagnosticUtility.DebugAssert("ChannelHandler.ThrottleAcquired: this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle"); } this.requestInfo.ChannelHandlerOwnsInstanceContextThrottle = (this.requestInfo.ExistingInstanceContext == null); if (this.DispatchAndReleasePump(request, false, null)) { this.EnsurePump(); } } bool TryAcquireThrottle(RequestContext request, bool acquireInstanceContextThrottle) { ServiceThrottle throttle = this.throttle; if ((throttle != null) && (throttle.IsActive)) { this.requestWaitingForThrottle = request; if (throttle.AcquireInstanceContextAndDynamic(this, acquireInstanceContextThrottle)) { this.requestWaitingForThrottle = 
null; return true; } else { return false; } } else { return true; } } bool TryAcquireCallThrottle(RequestContext request) { ServiceThrottle throttle = this.throttle; if ((throttle != null) && (throttle.IsActive)) { this.requestWaitingForThrottle = request; if (throttle.AcquireCall(this)) { this.requestWaitingForThrottle = null; return true; } else { return false; } } else { return true; } } bool TryAcquirePump() { if (this.isConcurrent) { return TryAcquirePumpCore(); } else { return true; } } bool TryAcquirePumpCore() { lock (this.ThisLock) { if (!this.isPumpAcquired) { this.isPumpAcquired = true; return true; } else { return false; } } } bool TryReply(RequestContext request, Message reply) { try { return this.Reply(request, reply); } #pragma warning suppress 56500 // covered by FxCOP catch (Exception e) { if (DiagnosticUtility.IsFatal(e)) throw; this.HandleError(e); } return false; } struct RequestInfo { public EndpointDispatcher Endpoint; public InstanceContext ExistingInstanceContext; public ServiceChannel Channel; public bool EndpointLookupDone; public DispatchRuntime DispatchRuntime; public ChannelHandler ChannelHandler; public bool ChannelHandlerOwnsCallThrottle; // if true, we are responsible for call throttle public bool ChannelHandlerOwnsInstanceContextThrottle; // if true, we are responsible for instance/dynamic throttle public RequestInfo(ChannelHandler channelHandler) { this.Endpoint = null; this.ExistingInstanceContext = null; this.Channel = null; this.EndpointLookupDone = false; this.DispatchRuntime = null; this.ChannelHandler = channelHandler; this.ChannelHandlerOwnsCallThrottle = false; this.ChannelHandlerOwnsInstanceContextThrottle = false; } public void Cleanup() { this.Endpoint = null; this.ExistingInstanceContext = null; this.Channel = null; this.DispatchRuntime = null; this.EndpointLookupDone = false; if (this.ChannelHandlerOwnsCallThrottle) { this.ChannelHandler.DispatchDone(); this.ChannelHandlerOwnsCallThrottle = false; } if 
(this.ChannelHandlerOwnsInstanceContextThrottle) { this.ChannelHandler.throttle.DeactivateInstanceContext(); this.ChannelHandlerOwnsInstanceContextThrottle = false; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- XslException.cs
- MulticastDelegate.cs
- QuaternionIndependentAnimationStorage.cs
- RankException.cs
- ToolStripPanel.cs
- ListBoxItem.cs
- ItemCollectionEditor.cs
- XpsImage.cs
- sqlcontext.cs
- XmlSchemaAny.cs
- SharedStatics.cs
- XmlDomTextWriter.cs
- PageHandlerFactory.cs
- SelectedGridItemChangedEvent.cs
- ExpandedWrapper.cs
- GenericUriParser.cs
- AssemblySettingAttributes.cs
- TextRenderingModeValidation.cs
- OperationGenerator.cs
- ReadOnlyActivityGlyph.cs
- base64Transforms.cs
- processwaithandle.cs
- TCEAdapterGenerator.cs
- BinHexDecoder.cs
- WebPartExportVerb.cs
- StringDictionary.cs
- WebZone.cs
- ShaderEffect.cs
- TableRow.cs
- DefaultAsyncDataDispatcher.cs
- TokenBasedSet.cs
- FrameworkContentElementAutomationPeer.cs
- NestPullup.cs
- DataBindingList.cs
- CodeExporter.cs
- HtmlControlAdapter.cs
- SQLInt16.cs
- ProviderCollection.cs
- GradientBrush.cs
- DataPagerFieldItem.cs
- LineProperties.cs
- HttpPostedFile.cs
- XmlAutoDetectWriter.cs
- MouseEventArgs.cs
- ClientConvert.cs
- ExternalFile.cs
- BatchWriter.cs
- AmbientLight.cs
- WaitHandleCannotBeOpenedException.cs
- DelegateTypeInfo.cs
- diagnosticsswitches.cs
- XsltSettings.cs
- CodeAccessSecurityEngine.cs
- InputMethodStateChangeEventArgs.cs
- CapabilitiesUse.cs
- TextTreeObjectNode.cs
- TrailingSpaceComparer.cs
- FileRecordSequence.cs
- AbstractSvcMapFileLoader.cs
- DataPointer.cs
- EtwTrackingBehavior.cs
- RegexNode.cs
- PrintingPermissionAttribute.cs
- RequestCacheValidator.cs
- TypeElement.cs
- CellParagraph.cs
- PersistChildrenAttribute.cs
- BaseCollection.cs
- ForeignKeyConstraint.cs
- Knowncolors.cs
- GrammarBuilderDictation.cs
- QilIterator.cs
- DataGridViewTextBoxEditingControl.cs
- GraphicsPath.cs
- PenLineCapValidation.cs
- CodeGotoStatement.cs
- XmlnsCache.cs
- JavaScriptSerializer.cs
- Page.cs
- BindingBase.cs
- EnumBuilder.cs
- Axis.cs
- XmlSchemaCompilationSettings.cs
- EditorZone.cs
- VoiceChangeEventArgs.cs
- HttpProfileGroupBase.cs
- InputLanguageCollection.cs
- ConnectionManagementSection.cs
- ScrollViewer.cs
- LineBreak.cs
- CancellationHandler.cs
- ControlIdConverter.cs
- AppDomainManager.cs
- X509UI.cs
- AuthenticationException.cs
- GeometryCombineModeValidation.cs
- FormatterServices.cs
- FullTrustAssembly.cs
- GridEntry.cs
- GenericXmlSecurityTokenAuthenticator.cs