// Code:
// / WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / SingletonConnectionReader.cs / 1 / SingletonConnectionReader.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{
using System;
using System.ServiceModel;
using System.ServiceModel.Activation;
using System.ServiceModel.Dispatcher;
using System.ServiceModel.Description;
using System.IO;
using System.IdentityModel.Claims;
using System.IdentityModel.Policy;
using System.ServiceModel.Security;
using System.Threading;
using System.ServiceModel.Diagnostics;
using System.Xml;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Net;
// Callback that receives a ServerSingletonPreambleConnectionReader; the reader's ReadAndDispatch
// invokes a delegate of this type with itself once the via and transport settings are resolved.
delegate void ServerSingletonPreambleCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
// Callback that maps a preamble reader to an ISingletonChannelListener.
// NOTE(review): presumably used by a demuxer to pick the listener for a connection - confirm against callers.
delegate ISingletonChannelListener SingletonPreambleDemuxCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
// Contract for a listener that can accept singleton (one request per connection) requests.
interface ISingletonChannelListener
{
// Receive timeout exposed by the listener.
TimeSpan ReceiveTimeout { get; }
// Hands a request context to the listener together with a dequeue callback.
// NOTE(review): canDispatchOnThisThread presumably tells the listener whether it may process the
// request on the calling thread - confirm against implementations.
void ReceiveRequest(RequestContext requestContext, ItemDequeuedCallback callback, bool canDispatchOnThisThread);
}
class ServerSingletonPreambleConnectionReader : InitialServerConnectionReader
{
ServerSingletonDecoder decoder;
ServerSingletonPreambleCallback callback;
WaitCallback onAsyncReadComplete;
IConnectionOrientedTransportFactorySettings transportSettings;
TransportSettingsCallback transportSettingsCallback;
SecurityMessageProperty security;
Uri via;
IConnection rawConnection;
byte[] connectionBuffer;
bool isReadPending;
int offset;
int size;
TimeoutHelper receiveTimeoutHelper;
OnViaDelegate viaDelegate;
// Builds a preamble reader over 'connection'. 'offset' and 'size' describe bytes already
// available in the connection's read buffer; 'streamPosition' seeds the framing decoder.
// 'transportSettingsCallback' is consulted with the decoded via, and 'callback' receives this
// reader once the preamble has progressed far enough to hand off.
public ServerSingletonPreambleConnectionReader(IConnection connection, ItemDequeuedCallback connectionDequeuedCallback,
    long streamPosition, int offset, int size, TransportSettingsCallback transportSettingsCallback,
    ConnectionClosedCallback closedCallback, ServerSingletonPreambleCallback callback)
    : base(connection, closedCallback)
{
    this.decoder = new ServerSingletonDecoder(streamPosition, MaxViaSize, MaxContentTypeSize);

    // Remember the un-upgraded connection and the hand-off delegates.
    this.rawConnection = connection;
    this.callback = callback;
    this.transportSettingsCallback = transportSettingsCallback;

    // Window of not-yet-decoded bytes in the connection buffer.
    this.offset = offset;
    this.size = size;

    this.ConnectionDequeuedCallback = connectionDequeuedCallback;
}
// Offset of the first undecoded byte in the connection buffer.
public int BufferOffset
{
    get
    {
        return offset;
    }
}

// Number of undecoded bytes remaining in the connection buffer.
public int BufferSize
{
    get
    {
        return size;
    }
}

// Framing decoder that has consumed the preamble so far.
public ServerSingletonDecoder Decoder
{
    get
    {
        return decoder;
    }
}

// The connection this reader was constructed with.
public IConnection RawConnection
{
    get
    {
        return rawConnection;
    }
}

// Via URI taken from the decoder once it reaches PreUpgradeStart; null before that.
public Uri Via
{
    get
    {
        return via;
    }
}

// Settings returned by the transport settings callback for the via; null until resolved.
public IConnectionOrientedTransportFactorySettings TransportSettings
{
    get
    {
        return transportSettings;
    }
}

// Remote security obtained from a security stream upgrade, if one took place.
public SecurityMessageProperty Security
{
    get
    {
        return security;
    }
}
// Time left on the receive timeout that StartReading established.
TimeSpan GetRemainingTimeout()
{
    TimeSpan remaining = this.receiveTimeoutHelper.RemainingTime();
    return remaining;
}
// Drives the preamble state machine: decodes buffered bytes, starts an asynchronous read when
// the buffer is empty, and once the decoder reaches PreUpgradeStart validates the via, resolves
// the transport settings and passes this reader to the callback.
// Any exit that leaves 'success' false (an exception, or one of the bare 'return's below) calls
// Abort() in the finally block; a 'break' out of the loop counts as success.
void ReadAndDispatch()
{
bool success = false;
try
{
// Continue while there are bytes to decode or no read is outstanding, until the reader is closed.
while ((size > 0 || !isReadPending) && !IsClosed)
{
if (size == 0)
{
// Buffer exhausted: start a read into the connection's buffer.
isReadPending = true;
if (onAsyncReadComplete == null)
{
onAsyncReadComplete = new WaitCallback(OnAsyncReadComplete);
}
if (Connection.BeginRead(0, connectionBuffer.Length, GetRemainingTimeout(),
onAsyncReadComplete, null) == AsyncReadResult.Queued)
{
// The read is pending; OnAsyncReadComplete resumes the loop later.
break;
}
// The read finished inline; pick up its result and keep decoding.
HandleReadComplete();
}
int bytesRead = decoder.Decode(connectionBuffer, offset, size);
if (bytesRead > 0)
{
offset += bytesRead;
size -= bytesRead;
}
if (decoder.CurrentState == ServerSingletonDecoder.State.PreUpgradeStart)
{
this.via = decoder.Via;
if (!Connection.Validate(via))
{
// This goes through the failure path (Abort) even though it doesn't throw.
return;
}
if (viaDelegate != null)
{
try
{
viaDelegate(via);
}
catch (ServiceActivationException e)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
// return fault and close connection
SendFault(FramingEncodingString.ServiceActivationFailedFault);
// 'break' (not 'return'): success is set below, so Abort() is not called on this path.
break;
}
}
this.transportSettings = transportSettingsCallback(via);
if (transportSettings == null)
{
// No settings for this via: trace, fault the client, and leave through the Abort() path.
EndpointNotFoundException e = new EndpointNotFoundException(SR.GetString(SR.EndpointNotFound, decoder.Via));
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(e, TraceEventType.Information);
}
SendFault(FramingEncodingString.EndpointNotFoundFault);
return;
}
// we have enough information to hand off to a channel. Our job is done
callback(this);
break;
}
}
success = true;
}
catch (CommunicationException exception)
{
// Traced and swallowed; the finally block aborts because success is still false.
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
}
}
catch (TimeoutException exception)
{
// Traced and swallowed; the finally block aborts because success is still false.
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
}
}
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
// Rethrow unless the transport exception handler reports the exception as handled.
if (!ExceptionHandler.HandleTransportExceptionHelper(e))
{
throw;
}
// containment -- we abort ourselves for any error, no extra containment needed
}
finally
{
if (!success)
{
Abort();
}
}
}
// Sends 'faultString' to the client, bounded by the reader's own receive timeout.
public void SendFault(string faultString)
{
    SendFault(faultString, ref receiveTimeoutHelper);
}

// Sends 'faultString' over the current connection using the time remaining on 'timeoutHelper';
// the shared InitialServerConnectionReader helper is given TransportDefaults.MaxDrainSize.
void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
{
    InitialServerConnectionReader.SendFault(
        Connection,
        faultString,
        connectionBuffer,
        timeoutHelper.RemainingTime(),
        TransportDefaults.MaxDrainSize);
}
// Finishes the preamble synchronously: checks the content type, processes any upgrade
// requests, and writes the ACK once the decoder reaches Start.
// Returns the connection to use from here on (the upgraded connection if an upgrade happened).
// Throws ProtocolException (after sending the matching fault) for an unsupported content type
// or an unacceptable upgrade request, and the decoder's premature-EOF exception if the peer
// closes before the preamble is complete.
public IConnection CompletePreamble(TimeSpan timeout)
{
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
if (!transportSettings.MessageEncoderFactory.Encoder.IsContentTypeSupported(decoder.ContentType))
{
SendFault(FramingEncodingString.ContentTypeInvalidFault, ref timeoutHelper);
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(
SR.ContentTypeMismatch, decoder.ContentType, transportSettings.MessageEncoderFactory.Encoder.ContentType)));
}
// An acceptor exists only when the transport settings supply an upgrade provider.
StreamUpgradeAcceptor upgradeAcceptor = null;
StreamUpgradeProvider upgrade = transportSettings.Upgrade;
if (upgrade != null)
{
upgradeAcceptor = upgrade.CreateUpgradeAcceptor();
}
IConnection currentConnection = this.Connection;
// Outer loop: refill the buffer with a blocking read whenever it is empty.
for (; ; )
{
if (size == 0)
{
offset = 0;
size = currentConnection.Read(connectionBuffer, 0, connectionBuffer.Length, timeoutHelper.RemainingTime());
if (size == 0)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
}
}
// Inner loop: decode buffered bytes until the buffer is drained or the preamble completes.
for (; ; )
{
int bytesRead = decoder.Decode(connectionBuffer, offset, size);
if (bytesRead > 0)
{
offset += bytesRead;
size -= bytesRead;
}
switch (decoder.CurrentState)
{
case ServerSingletonDecoder.State.UpgradeRequest:
if (upgradeAcceptor == null)
{
SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
new ProtocolException(SR.GetString(SR.UpgradeRequestToNonupgradableService, decoder.Upgrade)));
}
if (!upgradeAcceptor.CanUpgrade(decoder.Upgrade))
{
SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(SR.UpgradeProtocolNotSupported, decoder.Upgrade)));
}
// accept upgrade
currentConnection.Write(ServerSingletonEncoder.UpgradeResponseBytes, 0, ServerSingletonEncoder.UpgradeResponseBytes.Length, true, timeoutHelper.RemainingTime());
IConnection connectionToUpgrade = currentConnection;
if (this.size > 0)
{
// Bytes already read past the upgrade request are replayed to the upgrade through a PreReadConnection.
// NOTE(review): this.size / this.offset are not reset after the leftover bytes are handed over,
// while connectionBuffer is replaced below - confirm this is intentional.
connectionToUpgrade = new PreReadConnection(connectionToUpgrade, this.connectionBuffer, this.offset, this.size);
}
try
{
currentConnection = InitialServerConnectionReader.UpgradeConnection(connectionToUpgrade, upgradeAcceptor, transportSettings);
// Subsequent reads and decodes use the upgraded connection's buffer.
connectionBuffer = currentConnection.AsyncReadBuffer;
}
#pragma warning suppress 56500
catch (Exception exception)
{
if (DiagnosticUtility.IsFatal(exception))
throw;
// Audit Authentication Failure
WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception);
throw;
}
break;
case ServerSingletonDecoder.State.Start:
SetupSecurityIfNecessary(upgradeAcceptor);
// we've finished the preamble. Ack and return.
currentConnection.Write(ServerSessionEncoder.AckResponseBytes, 0,
ServerSessionEncoder.AckResponseBytes.Length, true, timeoutHelper.RemainingTime());
return currentConnection;
}
if (size == 0)
{
// Buffer drained; go back to the outer loop for another read.
break;
}
}
}
}
// If the upgrade acceptor is a security acceptor, captures the remote security it negotiated
// and writes a success audit event. Throws ProtocolException when the security acceptor has
// no remote security to report. Does nothing for null or non-security acceptors.
void SetupSecurityIfNecessary(StreamUpgradeAcceptor upgradeAcceptor)
{
    StreamSecurityUpgradeAcceptor securityAcceptor = upgradeAcceptor as StreamSecurityUpgradeAcceptor;
    if (securityAcceptor == null)
    {
        return;
    }

    this.security = securityAcceptor.GetRemoteSecurity();
    if (this.security == null)
    {
        Exception notNegotiated = new ProtocolException(
            SR.GetString(SR.RemoteSecurityNotNegotiatedOnStreamUpgrade, this.Via));
        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(notNegotiated);
    }

    // Audit Authentication Success
    WriteAuditEvent(securityAcceptor, AuditLevel.Success, null);
}
#region Transport Security Auditing
// Writes an authentication-failure audit event for 'exception' on a best-effort basis:
// a non-fatal exception raised while auditing is traced at Error level and swallowed,
// so it does not replace the original failure the caller is about to rethrow.
void WriteAuditFailure(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, Exception exception)
{
try
{
WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Failure, exception);
}
#pragma warning suppress 56500 // covered by FxCop
catch (Exception auditException)
{
// Fatal exceptions are never swallowed.
if (DiagnosticUtility.IsFatal(auditException))
{
throw;
}
DiagnosticUtility.ExceptionUtility.TraceHandledException(auditException, TraceEventType.Error);
}
}
// Writes a transport authentication audit event (success or failure) when the configured
// MessageAuthenticationAuditLevel includes 'auditLevel' and a security acceptor is present.
// The primary identity is taken from the acceptor's remote security when available,
// otherwise it is reported as an empty string.
void WriteAuditEvent(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, AuditLevel auditLevel, Exception exception)
{
    bool levelEnabled =
        (this.transportSettings.AuditBehavior.MessageAuthenticationAuditLevel & auditLevel) == auditLevel;
    if (!levelEnabled || securityUpgradeAcceptor == null)
    {
        return;
    }

    SecurityMessageProperty clientSecurity = securityUpgradeAcceptor.GetRemoteSecurity();
    String primaryIdentity = (clientSecurity != null)
        ? GetIdentityNameFromContext(clientSecurity)
        : String.Empty;

    ServiceSecurityAuditBehavior auditBehavior = this.transportSettings.AuditBehavior;
    if (auditLevel != AuditLevel.Success)
    {
        SecurityAuditHelper.WriteTransportAuthenticationFailureEvent(auditBehavior.AuditLogLocation,
            auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity, exception);
        return;
    }

    SecurityAuditHelper.WriteTransportAuthenticationSuccessEvent(auditBehavior.AuditLogLocation,
        auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity);
}
// Resolves the identity names for the authorization context carried by 'clientSecurity'.
// Kept out-of-line via NoInlining.
[MethodImpl(MethodImplOptions.NoInlining)]
static string GetIdentityNameFromContext(SecurityMessageProperty clientSecurity)
{
    AuthorizationContext authorizationContext = clientSecurity.ServiceSecurityContext.AuthorizationContext;
    return SecurityUtils.GetIdentityNamesFromContext(authorizationContext);
}
#endregion
// Collects the result of the outstanding read: resets the buffer window to the newly read
// bytes and clears the pending flag. A zero-byte read means the peer closed before the
// preamble was complete, which is reported through the decoder's premature-EOF exception.
void HandleReadComplete()
{
    this.offset = 0;
    this.size = Connection.EndRead();
    this.isReadPending = false;

    if (this.size != 0)
    {
        return;
    }

    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
}
// Completion callback for the asynchronous read started by ReadAndDispatch: collects the read
// result and resumes the dispatch loop. Communication and timeout exceptions are traced and
// swallowed; any path that does not reach 'success = true' calls Abort().
void OnAsyncReadComplete(object state)
{
bool success = false;
try
{
HandleReadComplete();
ReadAndDispatch();
success = true;
}
catch (CommunicationException exception)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
}
}
catch (TimeoutException exception)
{
if (DiagnosticUtility.ShouldTraceInformation)
{
DiagnosticUtility.ExceptionUtility.TraceHandledException(exception, TraceEventType.Information);
}
}
catch (Exception e)
{
if (DiagnosticUtility.IsFatal(e))
{
throw;
}
// Rethrow unless the transport exception handler reports the exception as handled.
if (!ExceptionHandler.HandleTransportExceptionHelper(e))
{
throw;
}
// containment -- we abort ourselves for any error, no extra containment needed
}
finally
{
if (!success)
{
Abort();
}
}
}
// Entry point: records the via delegate, starts the receive timeout, binds to the
// connection's read buffer, and begins decoding the preamble.
public void StartReading(OnViaDelegate viaDelegate, TimeSpan timeout)
{
    this.viaDelegate = viaDelegate;
    receiveTimeoutHelper = new TimeoutHelper(timeout);
    connectionBuffer = Connection.AsyncReadBuffer;

    ReadAndDispatch();
}
}
// Server-side singleton reader that takes over from a completed preamble reader: it reuses the
// preamble's decoder and buffer window, and hands the raw connection back to the demuxer on close.
class ServerSingletonConnectionReader : SingletonConnectionReader
{
    ConnectionDemuxer connectionDemuxer;
    ServerSingletonDecoder decoder;
    IConnection rawConnection;
    string contentType;

    public ServerSingletonConnectionReader(ServerSingletonPreambleConnectionReader preambleReader,
        IConnection upgradedConnection, ConnectionDemuxer connectionDemuxer)
        : base(upgradedConnection, preambleReader.BufferOffset, preambleReader.BufferSize,
        preambleReader.Security, preambleReader.TransportSettings, preambleReader.Via)
    {
        this.connectionDemuxer = connectionDemuxer;
        this.rawConnection = preambleReader.RawConnection;
        this.decoder = preambleReader.Decoder;
        // Content type as decoded during the preamble.
        this.contentType = this.decoder.ContentType;
    }

    protected override string ContentType
    {
        get
        {
            return contentType;
        }
    }

    protected override long StreamPosition
    {
        get
        {
            return decoder.StreamPosition;
        }
    }

    // Feeds buffered bytes to the decoder. Returns true once the envelope start is reached;
    // returns false with isAtEof set when the decoder reaches End, or false when the buffer
    // runs out first.
    protected override bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof)
    {
        while (size > 0)
        {
            int consumed = decoder.Decode(buffer, offset, size);
            if (consumed > 0)
            {
                offset += consumed;
                size -= consumed;
            }

            ServerSingletonDecoder.State state = decoder.CurrentState;
            if (state == ServerSingletonDecoder.State.EnvelopeStart)
            {
                // we're at the envelope
                return true;
            }
            if (state == ServerSingletonDecoder.State.End)
            {
                isAtEof = true;
                return false;
            }
        }
        return false;
    }

    protected override void OnClose(TimeSpan timeout)
    {
        TimeoutHelper closeTimeout = new TimeoutHelper(timeout);
        byte[] endBytes = SingletonEncoder.EndBytes;

        // send back EOF and then recycle the connection
        this.Connection.Write(endBytes, 0, SingletonEncoder.EndBytes.Length, true, closeTimeout.RemainingTime());
        this.connectionDemuxer.ReuseConnection(this.rawConnection, closeTimeout.RemainingTime());
    }

    protected override void PrepareMessage(Message message)
    {
        base.PrepareMessage(message);

        IPEndPoint remoteEndPoint = this.rawConnection.RemoteIPEndPoint;
        if (remoteEndPoint == null)
        {
            // pipes will return null
            return;
        }

        RemoteEndpointMessageProperty endpointProperty = new RemoteEndpointMessageProperty(remoteEndPoint);
        message.Properties.Add(RemoteEndpointMessageProperty.Name, endpointProperty);
    }
}
abstract class SingletonConnectionReader
{
IConnection connection;
bool doneReceiving;
bool doneSending;
bool isAtEof;
bool isClosed;
SecurityMessageProperty security;
object thisLock = new object();
int offset;
int size;
IConnectionOrientedTransportFactorySettings transportSettings;
Uri via;
Stream inputStream;
// Captures the connection, the window (offset/size) of bytes already read into its buffer,
// and the per-connection context (security, transport settings, via) applied to the message.
protected SingletonConnectionReader(IConnection connection, int offset, int size, SecurityMessageProperty security,
    IConnectionOrientedTransportFactorySettings transportSettings, Uri via)
{
    this.connection = connection;
    this.transportSettings = transportSettings;

    this.offset = offset;
    this.size = size;

    this.via = via;
    this.security = security;
}
// Connection this reader reads from and writes to.
protected IConnection Connection
{
    get { return connection; }
}

// Lock object guarding the closed flag.
protected object ThisLock
{
    get { return thisLock; }
}

// Content type passed to the encoder when reading a message; null unless overridden.
protected virtual string ContentType
{
    get
    {
        return null;
    }
}

// Position handed to the message decoder created for the input stream.
protected abstract long StreamPosition { get; }
// Aborts the underlying connection.
public void Abort()
{
    Connection.Abort();
}
// Marks the receive side finished, using the transport's close timeout if a close follows.
public void DoneReceiving(bool atEof)
{
    TimeSpan closeTimeout = this.transportSettings.CloseTimeout;
    DoneReceiving(atEof, closeTimeout);
}

// Marks the receive side finished (first call only) and records whether the stream ended at
// EOF. If the send side has already finished, closes the reader.
void DoneReceiving(bool atEof, TimeSpan timeout)
{
    if (this.doneReceiving)
    {
        return;
    }

    this.isAtEof = atEof;
    this.doneReceiving = true;

    if (this.doneSending)
    {
        Close(timeout);
    }
}
// Closes the reader once: drains any unread input until EOF, then runs OnClose.
// Only the first caller proceeds; if draining or OnClose throws, the connection is aborted.
public void Close(TimeSpan timeout)
{
    lock (ThisLock)
    {
        if (isClosed)
        {
            return;
        }
        isClosed = true;
    }

    TimeoutHelper closeTimeout = new TimeoutHelper(timeout);
    bool closedCleanly = false;
    try
    {
        // first drain our stream if necessary
        if (inputStream != null)
        {
            byte[] drainBuffer = DiagnosticUtility.Utility.AllocateByteArray(transportSettings.ConnectionBufferSize);
            while (!isAtEof)
            {
                inputStream.ReadTimeout = TimeoutHelper.ToMilliseconds(closeTimeout.RemainingTime());
                int drained = inputStream.Read(drainBuffer, 0, drainBuffer.Length);
                if (drained == 0)
                {
                    isAtEof = true;
                }
            }
        }

        OnClose(closeTimeout.RemainingTime());
        closedCleanly = true;
    }
    finally
    {
        if (!closedCleanly)
        {
            Abort();
        }
    }
}
// Subclass hook run by Close after the input has been drained.
protected abstract void OnClose(TimeSpan timeout);

// Marks the send side finished; if the receive side has already finished, closes the reader.
public void DoneSending(TimeSpan timeout)
{
    doneSending = true;
    if (!doneReceiving)
    {
        return;
    }
    Close(timeout);
}
// Subclass hook used by Receive: consumes framing bytes from 'buffer', advancing offset/size.
// Receive treats a true result as "ready to read a message" and checks isAtEof otherwise.
protected abstract bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof);

// Stamps the received message with the via and a copy of the security property (or null).
protected virtual void PrepareMessage(Message message)
{
    message.Properties.Via = this.via;

    if (this.security != null)
    {
        message.Properties.Security = (SecurityMessageProperty)this.security.CreateCopy();
    }
    else
    {
        message.Properties.Security = null;
    }
}
// Receives one message and wraps it in a request context bound to this reader.
public RequestContext ReceiveRequest(TimeSpan timeout)
{
    return new StreamedFramingRequestContext(this, Receive(timeout));
}

// Starts a receive whose work is done by ReceiveAsyncResult.
public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
    ReceiveAsyncResult asyncResult = new ReceiveAsyncResult(this, timeout, callback, state);
    return asyncResult;
}

// Completes a receive started by BeginReceive and returns its message.
public virtual Message EndReceive(IAsyncResult result)
{
    Message received = ReceiveAsyncResult.End(result);
    return received;
}
// Synchronously receives one message from the connection.
// Decodes framing bytes (reading more from the connection as needed) until DecodeBytes reports
// that a message is ready, then builds the input stream and lets the message encoder read the
// message from it. Returns null when the framing ends, or the connection reaches EOF, before
// a message starts. An XmlException from the encoder is rethrown wrapped in a ProtocolException.
public Message Receive(TimeSpan timeout)
{
byte[] buffer = DiagnosticUtility.Utility.AllocateByteArray(connection.AsyncReadBufferSize);
if (size > 0)
{
// Carry over bytes that were already read into the connection's buffer, at the same offset.
Buffer.BlockCopy(connection.AsyncReadBuffer, offset, buffer, offset, size);
}
TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
for (; ; )
{
if (DecodeBytes(buffer, ref offset, ref size, ref isAtEof))
{
break;
}
if (this.isAtEof)
{
// The decoder reported the end of the stream: no message to return.
DoneReceiving(true, timeoutHelper.RemainingTime());
return null;
}
if (size == 0)
{
// Buffer exhausted: block for more bytes; a zero-byte read is treated as EOF.
offset = 0;
size = connection.Read(buffer, 0, buffer.Length, timeoutHelper.RemainingTime());
if (size == 0)
{
DoneReceiving(true, timeoutHelper.RemainingTime());
return null;
}
}
}
// we're ready to read a message
IConnection singletonConnection = this.connection;
if (size > 0)
{
// Bytes read past the framing header are replayed to the message stream through a PreReadConnection.
byte[] initialData = DiagnosticUtility.Utility.AllocateByteArray(size);
Buffer.BlockCopy(buffer, offset, initialData, 0, size);
singletonConnection = new PreReadConnection(singletonConnection, initialData);
}
// The input stream is wrapped in MaxMessageSizeStream, constructed with MaxReceivedMessageSize.
Stream connectionStream = new SingletonInputConnectionStream(this, singletonConnection, this.transportSettings);
this.inputStream = new MaxMessageSizeStream(connectionStream, transportSettings.MaxReceivedMessageSize);
using (ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? ServiceModelActivity.CreateBoundedActivity(true) : null)
{
if (DiagnosticUtility.ShouldUseActivity)
{
ServiceModelActivity.Start(activity, SR.GetString(SR.ActivityProcessingMessage, TraceUtility.RetrieveMessageNumber()), ActivityType.ProcessMessage);
}
Message message = null;
try
{
message = transportSettings.MessageEncoderFactory.Encoder.ReadMessage(
this.inputStream, transportSettings.MaxBufferSize, this.ContentType);
}
catch (XmlException xmlException)
{
throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
new ProtocolException(SR.GetString(SR.MessageXmlProtocolError), xmlException));
}
if (DiagnosticUtility.ShouldUseActivity)
{
TraceUtility.TransferFromTransport(message);
}
// Attach the via and security properties (and whatever the subclass adds).
PrepareMessage(message);
return message;
}
}
// Async wrapper around SingletonConnectionReader.Receive: the constructor schedules the
// synchronous Receive through IOThreadScheduler, and the result completes (never
// synchronously) with either the message or the captured exception.
class ReceiveAsyncResult : AsyncResult
{
    static WaitCallback onReceiveScheduled = new WaitCallback(OnReceiveScheduled);
    Message message;
    SingletonConnectionReader parent;
    TimeSpan timeout;

    public ReceiveAsyncResult(SingletonConnectionReader parent, TimeSpan timeout, AsyncCallback callback,
        object state)
        : base(callback, state)
    {
        this.parent = parent;
        this.timeout = timeout;
        //
        IOThreadScheduler.ScheduleCallback(onReceiveScheduled, this);
    }

    // Completes the operation and returns the received message (null if none was received).
    public static Message End(IAsyncResult result)
    {
        // The type argument cannot be inferred from the IAsyncResult parameter, so it must be
        // stated explicitly for the result to be assignable to ReceiveAsyncResult.
        ReceiveAsyncResult receiveAsyncResult = AsyncResult.End<ReceiveAsyncResult>(result);
        return receiveAsyncResult.message;
    }

    // Scheduled callback: runs the blocking receive and completes the async result.
    static void OnReceiveScheduled(object state)
    {
        ReceiveAsyncResult thisPtr = (ReceiveAsyncResult)state;
        Exception completionException = null;
        try
        {
            thisPtr.message = thisPtr.parent.Receive(thisPtr.timeout);
        }
#pragma warning suppress 56500 // [....], transferring exception to another thread
        catch (Exception exception)
        {
            if (DiagnosticUtility.IsFatal(exception))
            {
                throw;
            }
            // Non-fatal failures are handed to the caller through Complete.
            completionException = exception;
        }
        thisPtr.Complete(false, completionException);
    }
}
// Request context for a singleton connection: replies are written straight to the parent
// reader's connection, and close/abort are forwarded to the parent reader.
class StreamedFramingRequestContext : RequestContextBase
{
    IConnection connection;
    SingletonConnectionReader parent;
    IConnectionOrientedTransportFactorySettings settings;
    TimeoutHelper timeoutHelper;

    public StreamedFramingRequestContext(SingletonConnectionReader parent, Message requestMessage)
        : base(requestMessage, parent.transportSettings.CloseTimeout, parent.transportSettings.SendTimeout)
    {
        this.parent = parent;
        this.settings = parent.transportSettings;
        this.connection = parent.connection;
    }

    protected override void OnAbort()
    {
        parent.Abort();
    }

    protected override void OnClose(TimeSpan timeout)
    {
        parent.Close(timeout);
    }

    // Writes the reply (as a response, not a request) and then marks the send side done with
    // whatever time is left.
    protected override void OnReply(Message message, TimeSpan timeout)
    {
        this.timeoutHelper = new TimeoutHelper(timeout);
        StreamingConnectionHelper.WriteMessage(message, connection, false, settings, ref this.timeoutHelper);
        this.parent.DoneSending(this.timeoutHelper.RemainingTime());
    }

    // Starts the asynchronous reply; the timeout helper is kept for OnEndReply.
    protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
    {
        this.timeoutHelper = new TimeoutHelper(timeout);
        return StreamingConnectionHelper.BeginWriteMessage(message, connection, false, settings,
            ref this.timeoutHelper, callback, state);
    }

    protected override void OnEndReply(IAsyncResult result)
    {
        StreamingConnectionHelper.EndWriteMessage(result);
        this.parent.DoneSending(this.timeoutHelper.RemainingTime());
    }
}
// ensures that the reader is notified at end-of-stream, and takes care of the framing chunk headers
class SingletonInputConnectionStream : ConnectionStream
{
SingletonMessageDecoder decoder;
SingletonConnectionReader reader;
bool atEof;
byte[] chunkBuffer; // used for when we have overflow
int chunkBufferOffset;
int chunkBufferSize;
int chunkBytesRemaining;
// Creates the de-chunking stream over 'connection'. The message decoder starts at the
// reader's current stream position, and the carry-over buffer is sized to hold one encoded
// chunk-size integer.
public SingletonInputConnectionStream(SingletonConnectionReader reader, IConnection connection,
    IDefaultCommunicationTimeouts defaultTimeouts)
    : base(connection, defaultTimeouts)
{
    this.reader = reader;
    this.chunkBuffer = new byte[IntEncoder.MaxEncodedSize];
    this.chunkBytesRemaining = 0;
    this.decoder = new SingletonMessageDecoder(reader.StreamPosition);
}
// Aborts the owning reader (and with it the connection).
void AbortReader()
{
    reader.Abort();
}

// Closing the stream does not close the connection; it tells the reader that receiving is
// finished and whether the stream reached EOF.
public override void Close()
{
    bool reachedEof = this.atEof;
    reader.DoneReceiving(reachedEof);
}
// Runs chunk payload bytes through the decoder so its state stays in step with what has been
// handed to the caller. The decoder is expected to remain inside the envelope, or to land
// exactly on the end of the chunk.
void DecodeData(byte[] buffer, int offset, int size)
{
    int position = offset;
    int remaining = size;
    while (remaining > 0)
    {
        int consumed = decoder.Decode(buffer, position, remaining);
        position += consumed;
        remaining -= consumed;
        DiagnosticUtility.DebugAssert(decoder.CurrentState == SingletonMessageDecoder.State.ReadingEnvelopeBytes || decoder.CurrentState == SingletonMessageDecoder.State.ChunkEnd, "");
    }
}
// Decodes framing bytes until the next chunk's size is known (ChunkStart) or the message
// ends (End). On ChunkStart the chunk size is recorded, and any bytes left over in a caller
// supplied buffer are stashed in chunkBuffer for the next Read. offset/size are advanced past
// the bytes consumed.
void DecodeSize(byte[] buffer, ref int offset, ref int size)
{
    while (size > 0)
    {
        int consumed = decoder.Decode(buffer, offset, size);
        if (consumed > 0)
        {
            offset += consumed;
            size -= consumed;
        }

        SingletonMessageDecoder.State state = decoder.CurrentState;
        if (state == SingletonMessageDecoder.State.End)
        {
            ProcessEof();
            return;
        }
        if (state != SingletonMessageDecoder.State.ChunkStart)
        {
            continue;
        }

        this.chunkBytesRemaining = decoder.ChunkSize;

        // if we have overflow and we're not decoding out of our buffer, copy over
        bool decodingOwnBuffer = object.ReferenceEquals(buffer, this.chunkBuffer);
        if (size > 0 && !decodingOwnBuffer)
        {
            DiagnosticUtility.DebugAssert(size <= this.chunkBuffer.Length, "");
            Buffer.BlockCopy(buffer, offset, this.chunkBuffer, 0, size);
            this.chunkBufferOffset = 0;
            this.chunkBufferSize = size;
        }
        return;
    }
}
// Reads raw bytes from the underlying connection stream.
// A zero-byte read is routed to ProcessEof. If base.Read throws, bytesRead still holds the
// -1 sentinel when the finally block runs and the reader is aborted before the exception
// propagates. An exception thrown by ProcessEof does not trigger the abort, because bytesRead
// has already been assigned 0 by then.
int ReadCore(byte[] buffer, int offset, int count)
{
int bytesRead = -1;
try
{
bytesRead = base.Read(buffer, offset, count);
if (bytesRead == 0)
{
ProcessEof();
}
}
finally
{
if (bytesRead == -1) // there was an exception
{
AbortReader();
}
}
return bytesRead;
}
// Reads de-chunked envelope bytes into 'buffer', stripping the per-chunk size headers.
// Each loop iteration handles one of three situations:
//   1. leftover bytes are waiting in chunkBuffer  -> copy payload from there;
//   2. we are in the middle of a chunk            -> read from the connection and return;
//   3. a new chunk is starting                    -> read and decode its size, then loop.
// Returns the number of payload bytes placed in 'buffer'; the count may be less than
// requested, and is 0 only when 'count' is 0 or the stream is at EOF.
public override int Read(byte[] buffer, int offset, int count)
{
    int result = 0;
    while (true)
    {
        if (count == 0)
        {
            return result;
        }
        if (this.atEof)
        {
            return result;
        }

        // first deal with any residual carryover
        if (this.chunkBufferSize > 0)
        {
            int bytesToCopy = Math.Min(chunkBytesRemaining,
                Math.Min(this.chunkBufferSize, count));
            Buffer.BlockCopy(this.chunkBuffer, this.chunkBufferOffset, buffer, offset, bytesToCopy);
            // keep decoder up to date
            DecodeData(this.chunkBuffer, this.chunkBufferOffset, bytesToCopy);
            this.chunkBufferOffset += bytesToCopy;
            this.chunkBufferSize -= bytesToCopy;
            this.chunkBytesRemaining -= bytesToCopy;
            if (this.chunkBytesRemaining == 0 && this.chunkBufferSize > 0)
            {
                // The carryover also holds (part of) the next chunk's size header.
                DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
            }
            result += bytesToCopy;
            offset += bytesToCopy;
            count -= bytesToCopy;
        }
        else if (chunkBytesRemaining > 0)
        {
            // We're in the middle of a chunk. Try and include the next chunk size as well.
            // The chunk size comes from the peer and may be close to int.MaxValue, so the
            // addition is only performed when it cannot overflow; otherwise 'count' (which
            // is necessarily smaller than the true sum) is used as the read size.
            int bytesToRead = count;
            if (int.MaxValue - chunkBytesRemaining >= IntEncoder.MaxEncodedSize)
            {
                bytesToRead = Math.Min(count, chunkBytesRemaining + IntEncoder.MaxEncodedSize);
            }
            int bytesRead = ReadCore(buffer, offset, bytesToRead);
            // keep decoder up to date
            DecodeData(buffer, offset, Math.Min(bytesRead, this.chunkBytesRemaining));
            if (bytesRead > chunkBytesRemaining)
            {
                result += this.chunkBytesRemaining;
                int overflowCount = bytesRead - chunkBytesRemaining;
                int overflowOffset = offset + chunkBytesRemaining;
                this.chunkBytesRemaining = 0;
                // read at least part of the next chunk, and put any overflow in this.chunkBuffer
                DecodeSize(buffer, ref overflowOffset, ref overflowCount);
            }
            else
            {
                result += bytesRead;
                this.chunkBytesRemaining -= bytesRead;
            }
            return result;
        }
        else
        {
            // Final case: we have a new chunk. Read the size, and loop around again
            if (count < IntEncoder.MaxEncodedSize)
            {
                // we don't have space for MaxEncodedSize, so it's worth the copy cost to read into a temp buffer
                this.chunkBufferOffset = 0;
                this.chunkBufferSize = ReadCore(this.chunkBuffer, 0, this.chunkBuffer.Length);
                DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
            }
            else
            {
                int bytesRead = ReadCore(buffer, offset, IntEncoder.MaxEncodedSize);
                int sizeOffset = offset;
                DecodeSize(buffer, ref sizeOffset, ref bytesRead);
            }
        }
    }
}
// Handles end-of-stream exactly once. If bytes are still buffered, a chunk is unfinished, or
// the decoder has not reached End, the EOF is premature and the decoder's exception is
// thrown; otherwise the reader is told that receiving finished at EOF.
void ProcessEof()
{
    if (this.atEof)
    {
        return;
    }
    this.atEof = true;

    bool endedEarly = this.chunkBufferSize > 0
        || this.chunkBytesRemaining > 0
        || decoder.CurrentState != SingletonMessageDecoder.State.End;
    if (endedEarly)
    {
        throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
    }

    this.reader.DoneReceiving(true);
}
// Begins a read; the work is performed by ReadAsyncResult's constructor.
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
    ReadAsyncResult asyncResult = new ReadAsyncResult(this, buffer, offset, count, callback, state);
    return asyncResult;
}

// Completes a read started by BeginRead and returns the number of bytes read.
public override int EndRead(IAsyncResult result)
{
    int bytesRead = ReadAsyncResult.End(result);
    return bytesRead;
}
// IAsyncResult for SingletonInputConnectionStream.BeginRead. The read is carried out
// synchronously in the constructor, so the result is always completed synchronously.
public class ReadAsyncResult : AsyncResult
{
    SingletonInputConnectionStream parent;
    int result;

    public ReadAsyncResult(SingletonInputConnectionStream parent,
        byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        : base(callback, state)
    {
        this.parent = parent;
        //
        this.result = this.parent.Read(buffer, offset, count);
        base.Complete(true);
    }

    // Completes the operation and returns the number of bytes read.
    public static int End(IAsyncResult result)
    {
        // The type argument cannot be inferred from the IAsyncResult parameter, so it must be
        // stated explicitly for the result to be assignable to ReadAsyncResult.
        ReadAsyncResult thisPtr = AsyncResult.End<ReadAsyncResult>(result);
        return thisPtr.result;
    }
}
}
}
static class StreamingConnectionHelper
{
// Synchronously writes 'message' to 'connection' using singleton framing.
//   message     - message to send; when null only the end bytes (if any) are written.
//   isRequest   - selects the request or response transfer mode and the end bytes: a request
//                 ends the framing as well as the envelope, a response ends only the envelope,
//                 and a null response writes nothing.
//   settings    - supplies the encoder, transfer mode and buffer manager.
//   timeoutHelper - bounds every write performed here.
public static void WriteMessage(Message message, IConnection connection, bool isRequest,
    IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper)
{
    byte[] endBytes = null;
    if (message != null)
    {
        MessageEncoder messageEncoder = settings.MessageEncoderFactory.Encoder;
        byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
        bool writeStreamed;
        if (isRequest)
        {
            endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
            writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
        }
        else
        {
            endBytes = SingletonEncoder.EnvelopeEndBytes;
            writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
        }

        if (writeStreamed)
        {
            // Streamed: write the envelope start, then let the encoder write through a stream
            // that prefixes each write with its chunk size and honours the timeout.
            connection.Write(envelopeStartBytes, 0, envelopeStartBytes.Length, false, timeoutHelper.RemainingTime());
            Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
            Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
            messageEncoder.WriteMessage(message, writeTimeoutStream);
        }
        else
        {
            // Buffered: encode with room reserved in front of the payload for the envelope
            // start bytes and the encoded size, then send everything in a single write.
            ArraySegment<byte> messageData = messageEncoder.WriteMessage(message,
                int.MaxValue, settings.BufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
            messageData = SingletonEncoder.EncodeMessageFrame(messageData);
            Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                envelopeStartBytes.Length);
            connection.Write(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(), settings.BufferManager);
        }
    }
    else if (isRequest) // context handles response end bytes
    {
        endBytes = SingletonEncoder.EndBytes;
    }

    if (endBytes != null)
    {
        connection.Write(endBytes, 0, endBytes.Length,
            true, timeoutHelper.RemainingTime());
    }
}
// Asynchronous counterpart of WriteMessage; the work is driven by WriteMessageAsyncResult.
public static IAsyncResult BeginWriteMessage(Message message, IConnection connection, bool isRequest,
    IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
    AsyncCallback callback, object state)
{
    WriteMessageAsyncResult asyncResult = new WriteMessageAsyncResult(
        message, connection, isRequest, settings, ref timeoutHelper, callback, state);
    return asyncResult;
}

// Completes a write started by BeginWriteMessage.
public static void EndWriteMessage(IAsyncResult result)
{
    WriteMessageAsyncResult.End(result);
}
// ConnectionStream that frames its output: every non-empty write is preceded by the encoded
// size of that write.
class StreamingOutputConnectionStream : ConnectionStream
{
    byte[] encodedSize;

    public StreamingOutputConnectionStream(IConnection connection, IDefaultCommunicationTimeouts timeouts)
        : base(connection, timeouts)
    {
        encodedSize = new byte[IntEncoder.MaxEncodedSize];
    }

    // Writes the encoded chunk size directly to the connection; nothing is written for a
    // size of zero or less.
    void WriteChunkSize(int size)
    {
        if (size <= 0)
        {
            return;
        }

        int bytesEncoded = IntEncoder.Encode(size, encodedSize, 0);
        base.Connection.Write(encodedSize, 0, bytesEncoded, false, TimeSpan.FromMilliseconds(this.WriteTimeout));
    }

    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
    {
        // The size header is written synchronously before the asynchronous payload write begins.
        WriteChunkSize(count);
        IAsyncResult payloadWrite = base.BeginWrite(buffer, offset, count, callback, state);
        return payloadWrite;
    }

    public override void WriteByte(byte value)
    {
        const int singleByte = 1;
        WriteChunkSize(singleByte);
        base.WriteByte(value);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteChunkSize(count);
        base.Write(buffer, offset, count);
    }
}
// Asynchronous operation that writes one message to a connection using the singleton
// framing helpers (SingletonEncoder), or - when no message is supplied - writes only the
// terminating bytes. The message is sent either streamed (start bytes, then the encoder
// writes into a chunking stream) or buffered (start bytes + framed message in one write),
// depending on the transfer mode in the supplied settings. In both cases the configured
// end bytes, if any, are written last.
class WriteMessageAsyncResult : AsyncResult
{
    IConnection connection;
    MessageEncoder encoder;
    BufferManager bufferManager;
    Message message;

    // Callback delegates shared by all instances. The first three are created on first use;
    // onWriteEndBytes is created eagerly.
    static AsyncCallback onWriteBufferedMessage;
    static AsyncCallback onWriteStartBytes;
    static WaitCallback onWriteStartBytesScheduled;
    static AsyncCallback onWriteEndBytes =
        DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteEndBytes));

    // Buffer produced by the encoder on the buffered path; returned to bufferManager by Cleanup.
    byte[] bufferToFree;
    IConnectionOrientedTransportFactorySettings settings;
    TimeoutHelper timeoutHelper;

    // Bytes written after the message body. Null means there is nothing more to write.
    byte[] endBytes;

    // Starts the write.
    //   message       - message to send; null means "send only the end bytes (if any)".
    //   connection    - connection the bytes are written to.
    //   isRequest     - selects which end bytes are used and whether the request or the
    //                   response direction of settings.TransferMode decides streaming.
    //   settings      - supplies the message encoder, buffer manager and transfer mode.
    //   timeoutHelper - remaining-time budget applied to every connection write.
    //   callback/state - standard async-pattern completion callback and user state.
    // If the whole operation finishes inside the constructor, the result is completed
    // synchronously before the constructor returns.
    public WriteMessageAsyncResult(Message message, IConnection connection, bool isRequest,
        IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
        AsyncCallback callback, object state)
        : base(callback, state)
    {
        this.connection = connection;
        this.encoder = settings.MessageEncoderFactory.Encoder;
        this.bufferManager = settings.BufferManager;
        this.timeoutHelper = timeoutHelper;
        this.message = message;
        this.settings = settings;

        // 'throwing' stays true until the try block runs to its end, so the finally
        // clause can tell an exceptional exit from a normal one.
        bool throwing = true;
        bool completeSelf = false;
        if (message == null)
        {
            if (isRequest) // context takes care of the end bytes on Close/reader.EOF
            {
                this.endBytes = SingletonEncoder.EndBytes;
            }
            completeSelf = WriteEndBytes();
        }
        else
        {
            try
            {
                byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
                bool writeStreamed;
                if (isRequest)
                {
                    this.endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
                    writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
                }
                else
                {
                    this.endBytes = SingletonEncoder.EnvelopeEndBytes;
                    writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
                }

                if (writeStreamed)
                {
                    if (onWriteStartBytes == null)
                    {
                        onWriteStartBytes = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteStartBytes));
                    }

                    IAsyncResult writeStartBytesResult = connection.BeginWrite(envelopeStartBytes, 0, envelopeStartBytes.Length, true,
                        timeoutHelper.RemainingTime(), onWriteStartBytes, this);

                    if (writeStartBytesResult.CompletedSynchronously)
                    {
                        // The continuation is handed to the IOThreadScheduler rather than run
                        // inline; OnWriteStartBytes ignores synchronous completions for this reason.
                        // NOTE(review): presumably this keeps the synchronous streamed body write
                        // in HandleWriteStartBytes off the caller's thread - confirm.
                        if (onWriteStartBytesScheduled == null)
                        {
                            onWriteStartBytesScheduled = new WaitCallback(OnWriteStartBytesScheduled);
                        }
                        IOThreadScheduler.ScheduleCallback(onWriteStartBytesScheduled, writeStartBytesResult);
                    }
                }
                else
                {
                    // Encode the whole message into a pooled buffer, reserving room in front of
                    // it for the envelope start bytes plus the largest possible encoded size.
                    ArraySegment<byte> messageData = settings.MessageEncoderFactory.Encoder.WriteMessage(message,
                        int.MaxValue, this.bufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
                    messageData = SingletonEncoder.EncodeMessageFrame(messageData);
                    this.bufferToFree = messageData.Array;

                    // Place the start bytes immediately before the framed message so that both
                    // can go out in a single connection write.
                    Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                        envelopeStartBytes.Length);

                    if (onWriteBufferedMessage == null)
                    {
                        onWriteBufferedMessage = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(OnWriteBufferedMessage));
                    }

                    IAsyncResult writeBufferedResult =
                        connection.BeginWrite(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
                        messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(),
                        onWriteBufferedMessage, this);

                    if (writeBufferedResult.CompletedSynchronously)
                    {
                        completeSelf = HandleWriteBufferedMessage(writeBufferedResult);
                    }
                }
                throwing = false;
            }
            finally
            {
                if (throwing)
                {
                    // Return the pooled buffer (if one was taken) before the exception propagates.
                    Cleanup();
                }
            }
        }

        if (completeSelf)
        {
            base.Complete(true);
        }
    }

    // Waits for / finishes the operation via AsyncResult.End, which is where a stored
    // completion exception would surface to the caller.
    public static void End(IAsyncResult result)
    {
        AsyncResult.End(result);
    }

    // Returns the encoder's buffer to the buffer manager, if the buffered path took one.
    void Cleanup()
    {
        if (bufferToFree != null)
        {
            this.bufferManager.ReturnBuffer(bufferToFree);
        }
    }

    // Streamed path continuation: finishes the start-bytes write, lets the encoder write the
    // message synchronously into a chunking stream over the connection (bounded by the
    // timeout helper), then moves on to the end bytes.
    // Returns true when the whole operation is finished and the result should be completed.
    bool HandleWriteStartBytes(IAsyncResult result)
    {
        connection.EndWrite(result);
        Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
        Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
        this.encoder.WriteMessage(message, writeTimeoutStream);
        return WriteEndBytes();
    }

    // Buffered path continuation: finishes the combined start-bytes + message write, then
    // moves on to the end bytes. Returns true when the whole operation is finished.
    bool HandleWriteBufferedMessage(IAsyncResult result)
    {
        this.connection.EndWrite(result);
        return WriteEndBytes();
    }

    // Writes the end bytes, if any. Returns true when the operation is finished (nothing to
    // write, or the write completed synchronously); returns false when the write is still
    // pending, in which case OnWriteEndBytes completes the operation later.
    bool WriteEndBytes()
    {
        if (this.endBytes == null)
        {
            Cleanup();
            return true;
        }

        IAsyncResult result = connection.BeginWrite(endBytes, 0,
            endBytes.Length, true, timeoutHelper.RemainingTime(), onWriteEndBytes, this);

        if (!result.CompletedSynchronously)
        {
            return false;
        }

        return HandleWriteEndBytes(result);
    }

    // Finishes the end-bytes write and releases the pooled buffer. Always returns true:
    // this is the last step of the operation.
    bool HandleWriteEndBytes(IAsyncResult result)
    {
        this.connection.EndWrite(result);
        Cleanup();
        return true;
    }

    // Connection callback for the start-bytes write. Synchronous completions are skipped
    // here because the constructor schedules OnWriteStartBytesScheduled for them instead.
    static void OnWriteStartBytes(IAsyncResult result)
    {
        if (result.CompletedSynchronously)
        {
            return;
        }

        OnWriteStartBytesCallbackHelper(result);
    }

    // Scheduler entry point used when the start-bytes write completed synchronously;
    // 'state' is the IAsyncResult of that write.
    static void OnWriteStartBytesScheduled(object state)
    {
        OnWriteStartBytesCallbackHelper((IAsyncResult) state);
    }

    // Shared continuation for the start-bytes write. Runs the streamed body write and
    // completes the operation, capturing any non-fatal exception so it is delivered
    // through Complete rather than thrown on this thread.
    static void OnWriteStartBytesCallbackHelper(IAsyncResult result)
    {
        WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState;
        Exception completionException = null;
        bool completeSelf = false;
        bool throwing = true;
        try
        {
            completeSelf = thisPtr.HandleWriteStartBytes(result);
            throwing = false;
        }
#pragma warning suppress 56500 // [....], transferring exception to another thread
        catch (Exception e)
        {
            if (DiagnosticUtility.IsFatal(e))
            {
                throw;
            }
            completeSelf = true;
            completionException = e;
        }
        finally
        {
            // 'throwing' is still true on any exceptional exit (handled or fatal).
            if (throwing)
            {
                thisPtr.Cleanup();
            }
        }

        if (completeSelf)
        {
            thisPtr.Complete(false, completionException);
        }
    }

    // Connection callback for the buffered message write. Synchronous completions are
    // handled inline by the constructor, so they are ignored here.
    static void OnWriteBufferedMessage(IAsyncResult result)
    {
        if (result.CompletedSynchronously)
        {
            return;
        }

        WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState;
        Exception completionException = null;
        bool completeSelf = false;
        bool throwing = true;
        try
        {
            completeSelf = thisPtr.HandleWriteBufferedMessage(result);
            throwing = false;
        }
#pragma warning suppress 56500 // [....], transferring exception to another thread
        catch (Exception e)
        {
            if (DiagnosticUtility.IsFatal(e))
            {
                throw;
            }
            completeSelf = true;
            completionException = e;
        }
        finally
        {
            // 'throwing' is still true on any exceptional exit (handled or fatal).
            if (throwing)
            {
                thisPtr.Cleanup();
            }
        }

        if (completeSelf)
        {
            thisPtr.Complete(false, completionException);
        }
    }

    // Connection callback for the end-bytes write. Synchronous completions are handled
    // inline by WriteEndBytes, so they are ignored here.
    static void OnWriteEndBytes(IAsyncResult result)
    {
        if (result.CompletedSynchronously)
        {
            return;
        }

        WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)result.AsyncState;
        Exception completionException = null;
        bool completeSelf = false;
        bool success = false;
        try
        {
            completeSelf = thisPtr.HandleWriteEndBytes(result);
            success = true;
        }
#pragma warning suppress 56500 // [....], transferring exception to another thread
        catch (Exception e)
        {
            if (DiagnosticUtility.IsFatal(e))
            {
                throw;
            }
            completeSelf = true;
            completionException = e;
        }
        finally
        {
            // HandleWriteEndBytes cleans up on success; only failures need it here.
            if (!success)
            {
                thisPtr.Cleanup();
            }
        }

        if (completeSelf)
        {
            thisPtr.Complete(false, completionException);
        }
    }
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- ConfigXmlAttribute.cs
- mediaclock.cs
- AtomicFile.cs
- ProviderConnectionPointCollection.cs
- PrimarySelectionAdorner.cs
- ServerIdentity.cs
- Vector3DConverter.cs
- PartialClassGenerationTask.cs
- DisplayMemberTemplateSelector.cs
- EntityStoreSchemaGenerator.cs
- EventLogEntry.cs
- EntityExpressionVisitor.cs
- MaterialGroup.cs
- ClosureBinding.cs
- BuildManagerHost.cs
- BeginStoryboard.cs
- OptimizerPatterns.cs
- DashStyles.cs
- ProfileService.cs
- Debug.cs
- XamlTreeBuilderBamlRecordWriter.cs
- LogSwitch.cs
- Section.cs
- PlainXmlSerializer.cs
- datacache.cs
- recordstatefactory.cs
- ParserContext.cs
- ListControlBoundActionList.cs
- XmlSchemaDatatype.cs
- ClientOperationFormatterProvider.cs
- StyleModeStack.cs
- wgx_render.cs
- SafeMemoryMappedViewHandle.cs
- ScrollPattern.cs
- Expr.cs
- CustomExpressionEventArgs.cs
- PermissionRequestEvidence.cs
- Label.cs
- ProxyWebPartConnectionCollection.cs
- TypeUsage.cs
- HotSpot.cs
- arabicshape.cs
- WinEventWrap.cs
- CheckoutException.cs
- XmlAttributes.cs
- VisualState.cs
- NotifyParentPropertyAttribute.cs
- WithParamAction.cs
- Effect.cs
- Brushes.cs
- InputBuffer.cs
- ThicknessAnimation.cs
- dsa.cs
- MouseGesture.cs
- EpmTargetTree.cs
- PropertyInfoSet.cs
- SymbolType.cs
- XMLUtil.cs
- ScriptManager.cs
- cookieexception.cs
- ImportContext.cs
- SuppressIldasmAttribute.cs
- StatementContext.cs
- WebPartDisplayModeEventArgs.cs
- PinnedBufferMemoryStream.cs
- _UriSyntax.cs
- ConcurrencyMode.cs
- XmlSchemaSimpleContentRestriction.cs
- CodeNamespaceImportCollection.cs
- TableAdapterManagerGenerator.cs
- DataGridViewSelectedRowCollection.cs
- DialogResultConverter.cs
- BindingUtils.cs
- JoinTreeSlot.cs
- TcpTransportSecurityElement.cs
- Dictionary.cs
- HttpContextWrapper.cs
- ArrangedElement.cs
- DataGridAddNewRow.cs
- Decimal.cs
- ButtonChrome.cs
- Mapping.cs
- EntityDesignerDataSourceView.cs
- Pair.cs
- DesignTimeDataBinding.cs
- EtwTrace.cs
- RequestQueue.cs
- AlternateView.cs
- EntityContainerEmitter.cs
- HttpsHostedTransportConfiguration.cs
- UInt32Storage.cs
- OletxTransactionManager.cs
- WebControlsSection.cs
- DropAnimation.xaml.cs
- TextRangeEditTables.cs
- SecurityDescriptor.cs
- BlobPersonalizationState.cs
- WebHttpSecurityModeHelper.cs
- DocumentXmlWriter.cs
- ScrollItemPattern.cs