Code:
/ Dotnetfx_Vista_SP2 / Dotnetfx_Vista_SP2 / 8.0.50727.4016 / DEVDIV / depot / DevDiv / releases / whidbey / NetFxQFE / ndp / fx / src / Net / System / Net / _StreamFramer.cs / 1 / _StreamFramer.cs
/*++ Copyright (c) 2000 Microsoft Corporation Module Name: _StreamFramer.cs Abstract: Author: Mauro Ottaviani original implementation Alexei Vopilov 20-Jul-2002 made it generic enough (still not perfect, consider IStreamFramer interface) Revision History: --*/ namespace System.Net { using System; using System.IO; using System.Runtime.InteropServices; using System.Threading; using System.ComponentModel; using System.Globalization; using System.Net; using System.Net.Sockets; internal class StreamFramer { private Stream m_Transport; private bool m_Eof; private FrameHeader m_WriteHeader = new FrameHeader(); private FrameHeader m_CurReadHeader = new FrameHeader(); private FrameHeader m_ReadVerifier = new FrameHeader(FrameHeader.IgnoreValue, FrameHeader.IgnoreValue, FrameHeader.IgnoreValue); //private const int c_DefaultBufferSize = 1024; //private int m_BufferSize = c_DefaultBufferSize; //private byte[] m_ReadBuffer = new byte[FrameHeader.SizeOf + m_BufferSize]; //private int m_CurReadOffset; private byte[] m_ReadHeaderBuffer; private byte[] m_WriteHeaderBuffer; private readonly AsyncCallback m_ReadFrameCallback; private readonly AsyncCallback m_BeginWriteCallback; private NetworkStream m_NetworkStream; //optimizing writes public StreamFramer(Stream Transport) { if (Transport == null || Transport == Stream.Null) { throw new ArgumentNullException("Transport"); } m_Transport = Transport; if(m_Transport.GetType() == typeof(NetworkStream)){ m_NetworkStream = Transport as NetworkStream; } m_ReadHeaderBuffer = new byte[m_CurReadHeader.Size]; m_WriteHeaderBuffer = new byte[m_WriteHeader.Size]; m_ReadFrameCallback = new AsyncCallback(ReadFrameCallback); m_BeginWriteCallback = new AsyncCallback(BeginWriteCallback); } /* // Consider removing. public FrameHeader m_ReadVerifierHeader { get { return m_ReadVerifier; } // May not be called while IO is in progress set { m_ReadVerifier = value; m_CurReadHeader = m_ReadVerifier.Clone(); m_ReadHeaderBuffer = new byte[m_CurReadHeader.Size]; } } */ public FrameHeader ReadHeader { get { return m_CurReadHeader; } } public FrameHeader WriteHeader { get { return m_WriteHeader; } /* // Consider removing. // May not be called while IO is in progress set { m_WriteHeader = value; m_WriteHeaderBuffer = new byte[m_WriteHeader.Size]; } */ } public Stream Transport { get { return m_Transport; } } /* // Consider removing. public bool EndOfFile { get { return m_Eof; } } */ /* // Consider removing. public bool CanRead { get { return Transport.CanRead; } } */ /* // Consider removing. public bool CanWrite { get { return Transport.CanWrite; } } */ public byte[] ReadMessage() { if (m_Eof) { return null; } int offset = 0; byte[] buffer = m_ReadHeaderBuffer; int bytesRead; while (offset < buffer.Length) { bytesRead = Transport.Read(buffer, offset, buffer.Length - offset); if (bytesRead == 0) { if (offset == 0) { // m_Eof, return null m_Eof = true; return null; } else { throw new IOException(SR.GetString(SR.net_io_readfailure, SR.GetString(SR.net_io_connectionclosed))); } } offset += bytesRead; } m_CurReadHeader.CopyFrom(buffer, 0, m_ReadVerifier); if (m_CurReadHeader.PayloadSize > m_CurReadHeader.MaxMessageSize) { throw new InvalidOperationException(SR.GetString(SR.net_frame_size, m_CurReadHeader.MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), m_CurReadHeader.PayloadSize.ToString(NumberFormatInfo.InvariantInfo))); } buffer = new byte[m_CurReadHeader.PayloadSize]; offset = 0; while (offset < buffer.Length) { bytesRead = Transport.Read(buffer, offset, buffer.Length - offset); if (bytesRead == 0) { throw new IOException(SR.GetString(SR.net_io_readfailure, SR.GetString(SR.net_io_connectionclosed))); } offset += bytesRead; } return buffer; } public IAsyncResult BeginReadMessage(AsyncCallback asyncCallback, object stateObject) { WorkerAsyncResult workerResult; if (m_Eof){ workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, null, 0, 0); workerResult.InvokeCallback(-1); return workerResult; } workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, m_ReadHeaderBuffer, 0, m_ReadHeaderBuffer.Length); IAsyncResult result = Transport.BeginRead(m_ReadHeaderBuffer, 0, m_ReadHeaderBuffer.Length, m_ReadFrameCallback, workerResult); if (result.CompletedSynchronously) { ReadFrameComplete(result); } return workerResult; } private void ReadFrameCallback(IAsyncResult transportResult) { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::ReadFrameCallback|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.GetType().FullName); if (transportResult.CompletedSynchronously) { return; } WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; try { ReadFrameComplete(transportResult); } catch (Exception e) { if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) { throw; } if (!(e is IOException)) { e = new System.IO.IOException(SR.GetString(SR.net_io_readfailure, e.Message), e); } // Let's call user callback and he call us back and we will throw workerResult.InvokeCallback(e); } catch { Exception e1 = new System.IO.IOException(SR.GetString(SR.net_io_readfailure, string.Empty), new Exception(SR.GetString(SR.net_nonClsCompliantException))); // Let's call user callback and he call us back and we will throw workerResult.InvokeCallback(e1); } } // IO COMPLETION CALLBACK // // This callback is responsible for getting complete protocol frame // First, it reads the header // Second, it determines the frame size // Third, loops while not all frame received or an error. // private void ReadFrameComplete(IAsyncResult transportResult) { do { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::ReadFrameComplete|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.GetType().FullName); WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; int bytesRead = Transport.EndRead(transportResult); workerResult.Offset += bytesRead; GlobalLog.Assert(workerResult.Offset <= workerResult.End, "StreamFramer::ReadFrameCallback|WRONG: offset - end = {0}", workerResult.Offset - workerResult.End); if (bytesRead <= 0) { // (by design) This indicates the stream has receives EOF // If we are in the middle of a Frame - fail, otherwise - produce EOF object result = null; if (!workerResult.HeaderDone && workerResult.Offset == 0) { result = (object)-1; } else { result = new System.IO.IOException(SR.GetString(SR.net_frame_read_io)); } workerResult.InvokeCallback(result); return; } if (workerResult.Offset >= workerResult.End) { if (!workerResult.HeaderDone) { workerResult.HeaderDone = true; // This indicates the header has been read succesfully m_CurReadHeader.CopyFrom(workerResult.Buffer, 0, m_ReadVerifier); int payloadSize = m_CurReadHeader.PayloadSize; if (payloadSize < 0) { // Let's call user callback and he call us back and we will throw workerResult.InvokeCallback(new System.IO.IOException(SR.GetString(SR.net_frame_read_size))); } if (payloadSize == 0) { // report emtpy frame (NOT eof!) to the caller, he might be interested in workerResult.InvokeCallback(0); return; } if (payloadSize > m_CurReadHeader.MaxMessageSize) { throw new InvalidOperationException(SR.GetString(SR.net_frame_size, m_CurReadHeader.MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), payloadSize.ToString(NumberFormatInfo.InvariantInfo))); } // Start reading the remaining frame data (note header does not count) byte[] frame = new byte[payloadSize]; // Save the ref of the data block workerResult.Buffer = frame; workerResult.End = frame.Length; workerResult.Offset = 0; // Transport.BeginRead below will pickup those changes } else { workerResult.HeaderDone = false; //reset for optional object reuse workerResult.InvokeCallback(workerResult.End); return; } } // This means we need more data to complete the data block transportResult = Transport.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset, m_ReadFrameCallback, workerResult); } while(transportResult.CompletedSynchronously); } // // User will call this when workerResult gets signalled // // On Beginread User always gets back our WorkerAsyncResult // The Result property represents either a number of bytes read or an // exception put by our async state machine // public byte[] EndReadMessage(IAsyncResult asyncResult) { if (asyncResult==null) { throw new ArgumentNullException("asyncResult"); } WorkerAsyncResult workerResult = asyncResult as WorkerAsyncResult; if (workerResult == null) { throw new ArgumentException(SR.GetString(SR.net_io_async_result, typeof(WorkerAsyncResult).FullName), "asyncResult"); } if (!workerResult.InternalPeekCompleted) { workerResult.InternalWaitForCompletion(); } if (workerResult.Result is Exception) { throw (Exception)(workerResult.Result); } int size = (int)workerResult.Result; if (size == -1) { m_Eof = true; return null; } else if (size == 0) { //empty frame return new byte[0]; } return workerResult.Buffer; } // // // // public void WriteMessage(byte[] message) { if (message == null) { throw new ArgumentNullException("message"); } m_WriteHeader.PayloadSize = message.Length; m_WriteHeader.CopyTo(m_WriteHeaderBuffer, 0); if (m_NetworkStream != null && message.Length != 0) { BufferOffsetSize[] buffers = new BufferOffsetSize[2]; buffers[0] = new BufferOffsetSize(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, false); buffers[1] = new BufferOffsetSize(message, 0, message.Length, false); m_NetworkStream.MultipleWrite(buffers); } else { Transport.Write(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length); if (message.Length==0) { return; } Transport.Write(message, 0, message.Length); } } // // // // public IAsyncResult BeginWriteMessage(byte[] message, AsyncCallback asyncCallback, object stateObject) { if (message == null) { throw new ArgumentNullException("message"); } m_WriteHeader.PayloadSize = message.Length; m_WriteHeader.CopyTo(m_WriteHeaderBuffer, 0); if (m_NetworkStream != null && message.Length != 0) { BufferOffsetSize[] buffers = new BufferOffsetSize[2]; buffers[0] = new BufferOffsetSize(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, false); buffers[1] = new BufferOffsetSize(message, 0, message.Length, false); return m_NetworkStream.BeginMultipleWrite(buffers, asyncCallback, stateObject); } if (message.Length == 0) { return Transport.BeginWrite(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, asyncCallback, stateObject); } //Will need two async writes // Prepare the second WorkerAsyncResult workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, message, 0, message.Length); // Charge the first IAsyncResult result = Transport.BeginWrite(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, m_BeginWriteCallback, workerResult); if (result.CompletedSynchronously) { BeginWriteComplete(result); } return workerResult; } private void BeginWriteCallback(IAsyncResult transportResult) { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::BeginWriteCallback|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.AsyncState.GetType().FullName); if (transportResult.CompletedSynchronously) { return; } WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; try { BeginWriteComplete(transportResult); } catch (Exception e) { if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) { throw; } workerResult.InvokeCallback(e); } catch { workerResult.InvokeCallback(new Exception(SR.GetString(SR.net_nonClsCompliantException))); } } // IO COMPLETION CALLBACK // // Called when user IO request was wrapped to do several underlined IO // private void BeginWriteComplete(IAsyncResult transportResult) { do { WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState; //First, complete the previous portion write Transport.EndWrite(transportResult); //Check on exit criterion if (workerResult.Offset == workerResult.End) { workerResult.InvokeCallback(); return; } //setup exit criterion workerResult.Offset = workerResult.End; //Write next portion (frame body) using Async IO transportResult = Transport.BeginWrite(workerResult.Buffer, 0, workerResult.End, m_BeginWriteCallback, workerResult); } while (transportResult.CompletedSynchronously); } public void EndWriteMessage(IAsyncResult asyncResult) { if (asyncResult==null) { throw new ArgumentNullException("asyncResult"); } WorkerAsyncResult workerResult = asyncResult as WorkerAsyncResult; if (workerResult != null) { if (!workerResult.InternalPeekCompleted) { workerResult.InternalWaitForCompletion(); } if (workerResult.Result is Exception) { throw (Exception)(workerResult.Result); } } else { Transport.EndWrite(asyncResult); } } /* // Consider removing. public void Close() { Transport.Close(); } */ } // // This class wraps an Async IO request // It is based on our internal LazyAsyncResult helper // - If ParentResult is not null then the base class (LazyAsyncResult) methods must not be used // // - If ParentResult == null, then real user IO request is wrapped // /* // Consider removing. internal delegate void WorkerCallback(WorkerAsyncResult result); */ internal class WorkerAsyncResult : LazyAsyncResult { public byte[] Buffer; public int Offset; public int End; public bool IsWrite; public WorkerAsyncResult ParentResult; /* // Consider removing. public WorkerCallback StepDoneCallback; */ public bool HeaderDone; // This migth be reworked so we read both header and frame in one chunk public bool HandshakeDone; public WorkerAsyncResult(object asyncObject, object asyncState, AsyncCallback savedAsyncCallback, byte[] buffer, int offset, int end) : base( asyncObject, asyncState, savedAsyncCallback) { Buffer = buffer; Offset = offset; End = end; } /* // Consider removing. public WorkerAsyncResult(WorkerAsyncResult parentResult, byte[] buffer, int offset, int end) : base(null, null, null) { ParentResult = parentResult; Buffer = buffer; Offset = offset; End = end; } */ } // This guy describes the header used in framing of the stream data. internal class FrameHeader { public const int IgnoreValue = -1; public const int HandshakeDoneId= 20; public const int HandshakeErrId = 21; public const int HandshakeId = 22; public const int DefaultMajorV = 1; public const int DefaultMinorV = 0; private int _MessageId; private int _MajorV; private int _MinorV; private int _PayloadSize; public FrameHeader () { _MessageId = HandshakeId; _MajorV = DefaultMajorV; _MinorV = DefaultMinorV; _PayloadSize = -1; } public FrameHeader (int messageId, int majorV, int minorV) { _MessageId = messageId; _MajorV = majorV; _MinorV = minorV; _PayloadSize = -1; } /* // Consider removing. public FrameHeader Clone() { return new FrameHeader(_MessageId, _MajorV, _MinorV); } */ public int Size { get { return 5; } } public int MaxMessageSize { get { return 0xFFFF; } } public int MessageId { get { return _MessageId; } set { _MessageId = value; } } public int MajorV { get { return _MajorV; } } public int MinorV { get { return _MinorV; } } public int PayloadSize { get { return _PayloadSize; } set { if (value > MaxMessageSize) { throw new ArgumentException(SR.GetString(SR.net_frame_max_size, MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), value.ToString(NumberFormatInfo.InvariantInfo)), "PayloadSize"); } _PayloadSize = value; } } public void CopyTo(byte[] dest, int start) { dest[start++] = (byte)_MessageId; dest[start++] = (byte)_MajorV; dest[start++] = (byte)_MinorV; dest[start++] = (byte)((_PayloadSize >> 8) & 0xFF); dest[start] = (byte)(_PayloadSize & 0xFF); } public void CopyFrom(byte[] bytes, int start, FrameHeader verifier) { _MessageId = bytes[start++]; _MajorV = bytes[start++]; _MinorV = bytes[start++]; _PayloadSize = (int) ((bytes[start++]<<8) | bytes[start]); if (verifier.MessageId != FrameHeader.IgnoreValue && MessageId != verifier.MessageId) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MessageId", MessageId, verifier.MessageId)); } if (verifier.MajorV != FrameHeader.IgnoreValue && MajorV != verifier.MajorV) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MajorV", MajorV, verifier.MajorV)); } if (verifier.MinorV != FrameHeader.IgnoreValue && MinorV != verifier.MinorV) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MinorV", MinorV, verifier.MinorV)); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved. /*++ Copyright (c) 2000 Microsoft Corporation Module Name: _StreamFramer.cs Abstract: Author: Mauro Ottaviani original implementation Alexei Vopilov 20-Jul-2002 made it generic enough (still not perfect, consider IStreamFramer interface) Revision History: --*/ namespace System.Net { using System; using System.IO; using System.Runtime.InteropServices; using System.Threading; using System.ComponentModel; using System.Globalization; using System.Net; using System.Net.Sockets; internal class StreamFramer { private Stream m_Transport; private bool m_Eof; private FrameHeader m_WriteHeader = new FrameHeader(); private FrameHeader m_CurReadHeader = new FrameHeader(); private FrameHeader m_ReadVerifier = new FrameHeader(FrameHeader.IgnoreValue, FrameHeader.IgnoreValue, FrameHeader.IgnoreValue); //private const int c_DefaultBufferSize = 1024; //private int m_BufferSize = c_DefaultBufferSize; //private byte[] m_ReadBuffer = new byte[FrameHeader.SizeOf + m_BufferSize]; //private int m_CurReadOffset; private byte[] m_ReadHeaderBuffer; private byte[] m_WriteHeaderBuffer; private readonly AsyncCallback m_ReadFrameCallback; private readonly AsyncCallback m_BeginWriteCallback; private NetworkStream m_NetworkStream; //optimizing writes public StreamFramer(Stream Transport) { if (Transport == null || Transport == Stream.Null) { throw new ArgumentNullException("Transport"); } m_Transport = Transport; if(m_Transport.GetType() == typeof(NetworkStream)){ m_NetworkStream = Transport as NetworkStream; } m_ReadHeaderBuffer = new byte[m_CurReadHeader.Size]; m_WriteHeaderBuffer = new byte[m_WriteHeader.Size]; m_ReadFrameCallback = new AsyncCallback(ReadFrameCallback); m_BeginWriteCallback = new AsyncCallback(BeginWriteCallback); } /* // Consider removing. public FrameHeader m_ReadVerifierHeader { get { return m_ReadVerifier; } // May not be called while IO is in progress set { m_ReadVerifier = value; m_CurReadHeader = m_ReadVerifier.Clone(); m_ReadHeaderBuffer = new byte[m_CurReadHeader.Size]; } } */ public FrameHeader ReadHeader { get { return m_CurReadHeader; } } public FrameHeader WriteHeader { get { return m_WriteHeader; } /* // Consider removing. // May not be called while IO is in progress set { m_WriteHeader = value; m_WriteHeaderBuffer = new byte[m_WriteHeader.Size]; } */ } public Stream Transport { get { return m_Transport; } } /* // Consider removing. public bool EndOfFile { get { return m_Eof; } } */ /* // Consider removing. public bool CanRead { get { return Transport.CanRead; } } */ /* // Consider removing. public bool CanWrite { get { return Transport.CanWrite; } } */ public byte[] ReadMessage() { if (m_Eof) { return null; } int offset = 0; byte[] buffer = m_ReadHeaderBuffer; int bytesRead; while (offset < buffer.Length) { bytesRead = Transport.Read(buffer, offset, buffer.Length - offset); if (bytesRead == 0) { if (offset == 0) { // m_Eof, return null m_Eof = true; return null; } else { throw new IOException(SR.GetString(SR.net_io_readfailure, SR.GetString(SR.net_io_connectionclosed))); } } offset += bytesRead; } m_CurReadHeader.CopyFrom(buffer, 0, m_ReadVerifier); if (m_CurReadHeader.PayloadSize > m_CurReadHeader.MaxMessageSize) { throw new InvalidOperationException(SR.GetString(SR.net_frame_size, m_CurReadHeader.MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), m_CurReadHeader.PayloadSize.ToString(NumberFormatInfo.InvariantInfo))); } buffer = new byte[m_CurReadHeader.PayloadSize]; offset = 0; while (offset < buffer.Length) { bytesRead = Transport.Read(buffer, offset, buffer.Length - offset); if (bytesRead == 0) { throw new IOException(SR.GetString(SR.net_io_readfailure, SR.GetString(SR.net_io_connectionclosed))); } offset += bytesRead; } return buffer; } public IAsyncResult BeginReadMessage(AsyncCallback asyncCallback, object stateObject) { WorkerAsyncResult workerResult; if (m_Eof){ workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, null, 0, 0); workerResult.InvokeCallback(-1); return workerResult; } workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, m_ReadHeaderBuffer, 0, m_ReadHeaderBuffer.Length); IAsyncResult result = Transport.BeginRead(m_ReadHeaderBuffer, 0, m_ReadHeaderBuffer.Length, m_ReadFrameCallback, workerResult); if (result.CompletedSynchronously) { ReadFrameComplete(result); } return workerResult; } private void ReadFrameCallback(IAsyncResult transportResult) { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::ReadFrameCallback|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.GetType().FullName); if (transportResult.CompletedSynchronously) { return; } WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; try { ReadFrameComplete(transportResult); } catch (Exception e) { if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) { throw; } if (!(e is IOException)) { e = new System.IO.IOException(SR.GetString(SR.net_io_readfailure, e.Message), e); } // Let's call user callback and he call us back and we will throw workerResult.InvokeCallback(e); } catch { Exception e1 = new System.IO.IOException(SR.GetString(SR.net_io_readfailure, string.Empty), new Exception(SR.GetString(SR.net_nonClsCompliantException))); // Let's call user callback and he call us back and we will throw workerResult.InvokeCallback(e1); } } // IO COMPLETION CALLBACK // // This callback is responsible for getting complete protocol frame // First, it reads the header // Second, it determines the frame size // Third, loops while not all frame received or an error. // private void ReadFrameComplete(IAsyncResult transportResult) { do { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::ReadFrameComplete|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.GetType().FullName); WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; int bytesRead = Transport.EndRead(transportResult); workerResult.Offset += bytesRead; GlobalLog.Assert(workerResult.Offset <= workerResult.End, "StreamFramer::ReadFrameCallback|WRONG: offset - end = {0}", workerResult.Offset - workerResult.End); if (bytesRead <= 0) { // (by design) This indicates the stream has receives EOF // If we are in the middle of a Frame - fail, otherwise - produce EOF object result = null; if (!workerResult.HeaderDone && workerResult.Offset == 0) { result = (object)-1; } else { result = new System.IO.IOException(SR.GetString(SR.net_frame_read_io)); } workerResult.InvokeCallback(result); return; } if (workerResult.Offset >= workerResult.End) { if (!workerResult.HeaderDone) { workerResult.HeaderDone = true; // This indicates the header has been read succesfully m_CurReadHeader.CopyFrom(workerResult.Buffer, 0, m_ReadVerifier); int payloadSize = m_CurReadHeader.PayloadSize; if (payloadSize < 0) { // Let's call user callback and he call us back and we will throw workerResult.InvokeCallback(new System.IO.IOException(SR.GetString(SR.net_frame_read_size))); } if (payloadSize == 0) { // report emtpy frame (NOT eof!) to the caller, he might be interested in workerResult.InvokeCallback(0); return; } if (payloadSize > m_CurReadHeader.MaxMessageSize) { throw new InvalidOperationException(SR.GetString(SR.net_frame_size, m_CurReadHeader.MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), payloadSize.ToString(NumberFormatInfo.InvariantInfo))); } // Start reading the remaining frame data (note header does not count) byte[] frame = new byte[payloadSize]; // Save the ref of the data block workerResult.Buffer = frame; workerResult.End = frame.Length; workerResult.Offset = 0; // Transport.BeginRead below will pickup those changes } else { workerResult.HeaderDone = false; //reset for optional object reuse workerResult.InvokeCallback(workerResult.End); return; } } // This means we need more data to complete the data block transportResult = Transport.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset, m_ReadFrameCallback, workerResult); } while(transportResult.CompletedSynchronously); } // // User will call this when workerResult gets signalled // // On Beginread User always gets back our WorkerAsyncResult // The Result property represents either a number of bytes read or an // exception put by our async state machine // public byte[] EndReadMessage(IAsyncResult asyncResult) { if (asyncResult==null) { throw new ArgumentNullException("asyncResult"); } WorkerAsyncResult workerResult = asyncResult as WorkerAsyncResult; if (workerResult == null) { throw new ArgumentException(SR.GetString(SR.net_io_async_result, typeof(WorkerAsyncResult).FullName), "asyncResult"); } if (!workerResult.InternalPeekCompleted) { workerResult.InternalWaitForCompletion(); } if (workerResult.Result is Exception) { throw (Exception)(workerResult.Result); } int size = (int)workerResult.Result; if (size == -1) { m_Eof = true; return null; } else if (size == 0) { //empty frame return new byte[0]; } return workerResult.Buffer; } // // // // public void WriteMessage(byte[] message) { if (message == null) { throw new ArgumentNullException("message"); } m_WriteHeader.PayloadSize = message.Length; m_WriteHeader.CopyTo(m_WriteHeaderBuffer, 0); if (m_NetworkStream != null && message.Length != 0) { BufferOffsetSize[] buffers = new BufferOffsetSize[2]; buffers[0] = new BufferOffsetSize(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, false); buffers[1] = new BufferOffsetSize(message, 0, message.Length, false); m_NetworkStream.MultipleWrite(buffers); } else { Transport.Write(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length); if (message.Length==0) { return; } Transport.Write(message, 0, message.Length); } } // // // // public IAsyncResult BeginWriteMessage(byte[] message, AsyncCallback asyncCallback, object stateObject) { if (message == null) { throw new ArgumentNullException("message"); } m_WriteHeader.PayloadSize = message.Length; m_WriteHeader.CopyTo(m_WriteHeaderBuffer, 0); if (m_NetworkStream != null && message.Length != 0) { BufferOffsetSize[] buffers = new BufferOffsetSize[2]; buffers[0] = new BufferOffsetSize(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, false); buffers[1] = new BufferOffsetSize(message, 0, message.Length, false); return m_NetworkStream.BeginMultipleWrite(buffers, asyncCallback, stateObject); } if (message.Length == 0) { return Transport.BeginWrite(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, asyncCallback, stateObject); } //Will need two async writes // Prepare the second WorkerAsyncResult workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, message, 0, message.Length); // Charge the first IAsyncResult result = Transport.BeginWrite(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, m_BeginWriteCallback, workerResult); if (result.CompletedSynchronously) { BeginWriteComplete(result); } return workerResult; } private void BeginWriteCallback(IAsyncResult transportResult) { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::BeginWriteCallback|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.AsyncState.GetType().FullName); if (transportResult.CompletedSynchronously) { return; } WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; try { BeginWriteComplete(transportResult); } catch (Exception e) { if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) { throw; } workerResult.InvokeCallback(e); } catch { workerResult.InvokeCallback(new Exception(SR.GetString(SR.net_nonClsCompliantException))); } } // IO COMPLETION CALLBACK // // Called when user IO request was wrapped to do several underlined IO // private void BeginWriteComplete(IAsyncResult transportResult) { do { WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState; //First, complete the previous portion write Transport.EndWrite(transportResult); //Check on exit criterion if (workerResult.Offset == workerResult.End) { workerResult.InvokeCallback(); return; } //setup exit criterion workerResult.Offset = workerResult.End; //Write next portion (frame body) using Async IO transportResult = Transport.BeginWrite(workerResult.Buffer, 0, workerResult.End, m_BeginWriteCallback, workerResult); } while (transportResult.CompletedSynchronously); } public void EndWriteMessage(IAsyncResult asyncResult) { if (asyncResult==null) { throw new ArgumentNullException("asyncResult"); } WorkerAsyncResult workerResult = asyncResult as WorkerAsyncResult; if (workerResult != null) { if (!workerResult.InternalPeekCompleted) { workerResult.InternalWaitForCompletion(); } if (workerResult.Result is Exception) { throw (Exception)(workerResult.Result); } } else { Transport.EndWrite(asyncResult); } } /* // Consider removing. public void Close() { Transport.Close(); } */ } // // This class wraps an Async IO request // It is based on our internal LazyAsyncResult helper // - If ParentResult is not null then the base class (LazyAsyncResult) methods must not be used // // - If ParentResult == null, then real user IO request is wrapped // /* // Consider removing. internal delegate void WorkerCallback(WorkerAsyncResult result); */ internal class WorkerAsyncResult : LazyAsyncResult { public byte[] Buffer; public int Offset; public int End; public bool IsWrite; public WorkerAsyncResult ParentResult; /* // Consider removing. public WorkerCallback StepDoneCallback; */ public bool HeaderDone; // This migth be reworked so we read both header and frame in one chunk public bool HandshakeDone; public WorkerAsyncResult(object asyncObject, object asyncState, AsyncCallback savedAsyncCallback, byte[] buffer, int offset, int end) : base( asyncObject, asyncState, savedAsyncCallback) { Buffer = buffer; Offset = offset; End = end; } /* // Consider removing. public WorkerAsyncResult(WorkerAsyncResult parentResult, byte[] buffer, int offset, int end) : base(null, null, null) { ParentResult = parentResult; Buffer = buffer; Offset = offset; End = end; } */ } // This guy describes the header used in framing of the stream data. internal class FrameHeader { public const int IgnoreValue = -1; public const int HandshakeDoneId= 20; public const int HandshakeErrId = 21; public const int HandshakeId = 22; public const int DefaultMajorV = 1; public const int DefaultMinorV = 0; private int _MessageId; private int _MajorV; private int _MinorV; private int _PayloadSize; public FrameHeader () { _MessageId = HandshakeId; _MajorV = DefaultMajorV; _MinorV = DefaultMinorV; _PayloadSize = -1; } public FrameHeader (int messageId, int majorV, int minorV) { _MessageId = messageId; _MajorV = majorV; _MinorV = minorV; _PayloadSize = -1; } /* // Consider removing. public FrameHeader Clone() { return new FrameHeader(_MessageId, _MajorV, _MinorV); } */ public int Size { get { return 5; } } public int MaxMessageSize { get { return 0xFFFF; } } public int MessageId { get { return _MessageId; } set { _MessageId = value; } } public int MajorV { get { return _MajorV; } } public int MinorV { get { return _MinorV; } } public int PayloadSize { get { return _PayloadSize; } set { if (value > MaxMessageSize) { throw new ArgumentException(SR.GetString(SR.net_frame_max_size, MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), value.ToString(NumberFormatInfo.InvariantInfo)), "PayloadSize"); } _PayloadSize = value; } } public void CopyTo(byte[] dest, int start) { dest[start++] = (byte)_MessageId; dest[start++] = (byte)_MajorV; dest[start++] = (byte)_MinorV; dest[start++] = (byte)((_PayloadSize >> 8) & 0xFF); dest[start] = (byte)(_PayloadSize & 0xFF); } public void CopyFrom(byte[] bytes, int start, FrameHeader verifier) { _MessageId = bytes[start++]; _MajorV = bytes[start++]; _MinorV = bytes[start++]; _PayloadSize = (int) ((bytes[start++]<<8) | bytes[start]); if (verifier.MessageId != FrameHeader.IgnoreValue && MessageId != verifier.MessageId) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MessageId", MessageId, verifier.MessageId)); } if (verifier.MajorV != FrameHeader.IgnoreValue && MajorV != verifier.MajorV) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MajorV", MajorV, verifier.MajorV)); } if (verifier.MinorV != FrameHeader.IgnoreValue && MinorV != verifier.MinorV) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MinorV", MinorV, verifier.MinorV)); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- Invariant.cs
- ManagedWndProcTracker.cs
- RetrieveVirtualItemEventArgs.cs
- Evaluator.cs
- Translator.cs
- Parameter.cs
- Base64Encoder.cs
- CornerRadius.cs
- StringUtil.cs
- Pen.cs
- BinaryQueryOperator.cs
- Accessible.cs
- NonVisualControlAttribute.cs
- WebPartDisplayModeCancelEventArgs.cs
- FunctionCommandText.cs
- ExpressionVisitor.cs
- FixedTextContainer.cs
- SubqueryRules.cs
- DataGridCellsPresenter.cs
- DropSource.cs
- DesignerTransactionCloseEvent.cs
- Int16KeyFrameCollection.cs
- SqlDataSourceCustomCommandEditor.cs
- ScrollableControl.cs
- LogicalTreeHelper.cs
- CodeCatchClause.cs
- HttpCapabilitiesEvaluator.cs
- ChtmlTextWriter.cs
- Int64AnimationUsingKeyFrames.cs
- SqlServer2KCompatibilityAnnotation.cs
- _NTAuthentication.cs
- IMembershipProvider.cs
- Mappings.cs
- TextSchema.cs
- TagPrefixCollection.cs
- SoapIgnoreAttribute.cs
- ConstructorExpr.cs
- RelationshipWrapper.cs
- ToolStripContentPanelRenderEventArgs.cs
- IsolationInterop.cs
- DictionaryBase.cs
- TextureBrush.cs
- ObservableCollection.cs
- TempFiles.cs
- Material.cs
- ProtocolsConfiguration.cs
- PageRanges.cs
- Win32SafeHandles.cs
- XmlObjectSerializerReadContextComplex.cs
- IncrementalReadDecoders.cs
- XmlTextWriter.cs
- MarkupExtensionReturnTypeAttribute.cs
- XmlBinaryWriterSession.cs
- SqlDataSourceSummaryPanel.cs
- RowType.cs
- SimpleTextLine.cs
- ExpressionWriter.cs
- ListViewItem.cs
- OdbcConnectionOpen.cs
- entityreference_tresulttype.cs
- GlyphTypeface.cs
- ApplicationInfo.cs
- PageEventArgs.cs
- GridEntry.cs
- GenerateHelper.cs
- InternalBase.cs
- FilterElement.cs
- Section.cs
- RuntimeIdentifierPropertyAttribute.cs
- PageResolution.cs
- ConsoleCancelEventArgs.cs
- CacheHelper.cs
- As.cs
- AsyncResult.cs
- PrivilegedConfigurationManager.cs
- COM2ICategorizePropertiesHandler.cs
- SelectionProviderWrapper.cs
- StreamUpgradeProvider.cs
- WmfPlaceableFileHeader.cs
- SafeArrayRankMismatchException.cs
- ColorPalette.cs
- MeshGeometry3D.cs
- EntityDataSourceQueryBuilder.cs
- MultiPropertyDescriptorGridEntry.cs
- PlatformCulture.cs
- UTF32Encoding.cs
- StringExpressionSet.cs
- XmlSchemaSimpleType.cs
- ApplicationId.cs
- TrackingMemoryStream.cs
- MenuItemStyle.cs
- ColorTransform.cs
- Paragraph.cs
- XamlGridLengthSerializer.cs
- WebAdminConfigurationHelper.cs
- ImageFormatConverter.cs
- RecommendedAsConfigurableAttribute.cs
- ComponentCommands.cs
- ListControlBoundActionList.cs
- DataSetMappper.cs