Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Net / System / Net / _StreamFramer.cs / 1305376 / _StreamFramer.cs
/*++ Copyright (c) 2000 Microsoft Corporation Module Name: _StreamFramer.cs Abstract: Author: Mauro Ottaviani original implementation Alexei Vopilov 20-Jul-2002 made it generic enough (still not perfect, consider IStreamFramer interface) Revision History: --*/ namespace System.Net { using System; using System.IO; using System.Runtime.InteropServices; using System.Threading; using System.ComponentModel; using System.Globalization; using System.Net; using System.Net.Sockets; internal class StreamFramer { private Stream m_Transport; private bool m_Eof; private FrameHeader m_WriteHeader = new FrameHeader(); private FrameHeader m_CurReadHeader = new FrameHeader(); private FrameHeader m_ReadVerifier = new FrameHeader(FrameHeader.IgnoreValue, FrameHeader.IgnoreValue, FrameHeader.IgnoreValue); //private const int c_DefaultBufferSize = 1024; //private int m_BufferSize = c_DefaultBufferSize; //private byte[] m_ReadBuffer = new byte[FrameHeader.SizeOf + m_BufferSize]; //private int m_CurReadOffset; private byte[] m_ReadHeaderBuffer; private byte[] m_WriteHeaderBuffer; private readonly AsyncCallback m_ReadFrameCallback; private readonly AsyncCallback m_BeginWriteCallback; private NetworkStream m_NetworkStream; //optimizing writes public StreamFramer(Stream Transport) { if (Transport == null || Transport == Stream.Null) { throw new ArgumentNullException("Transport"); } m_Transport = Transport; if(m_Transport.GetType() == typeof(NetworkStream)){ m_NetworkStream = Transport as NetworkStream; } m_ReadHeaderBuffer = new byte[m_CurReadHeader.Size]; m_WriteHeaderBuffer = new byte[m_WriteHeader.Size]; m_ReadFrameCallback = new AsyncCallback(ReadFrameCallback); m_BeginWriteCallback = new AsyncCallback(BeginWriteCallback); } /* // Consider removing. public FrameHeader m_ReadVerifierHeader { get { return m_ReadVerifier; } // May not be called while IO is in progress set { m_ReadVerifier = value; m_CurReadHeader = m_ReadVerifier.Clone(); m_ReadHeaderBuffer = new byte[m_CurReadHeader.Size]; } } */ public FrameHeader ReadHeader { get { return m_CurReadHeader; } } public FrameHeader WriteHeader { get { return m_WriteHeader; } /* // Consider removing. // May not be called while IO is in progress set { m_WriteHeader = value; m_WriteHeaderBuffer = new byte[m_WriteHeader.Size]; } */ } public Stream Transport { get { return m_Transport; } } /* // Consider removing. public bool EndOfFile { get { return m_Eof; } } */ /* // Consider removing. public bool CanRead { get { return Transport.CanRead; } } */ /* // Consider removing. public bool CanWrite { get { return Transport.CanWrite; } } */ public byte[] ReadMessage() { if (m_Eof) { return null; } int offset = 0; byte[] buffer = m_ReadHeaderBuffer; int bytesRead; while (offset < buffer.Length) { bytesRead = Transport.Read(buffer, offset, buffer.Length - offset); if (bytesRead == 0) { if (offset == 0) { // m_Eof, return null m_Eof = true; return null; } else { throw new IOException(SR.GetString(SR.net_io_readfailure, SR.GetString(SR.net_io_connectionclosed))); } } offset += bytesRead; } m_CurReadHeader.CopyFrom(buffer, 0, m_ReadVerifier); if (m_CurReadHeader.PayloadSize > m_CurReadHeader.MaxMessageSize) { throw new InvalidOperationException(SR.GetString(SR.net_frame_size, m_CurReadHeader.MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), m_CurReadHeader.PayloadSize.ToString(NumberFormatInfo.InvariantInfo))); } buffer = new byte[m_CurReadHeader.PayloadSize]; offset = 0; while (offset < buffer.Length) { bytesRead = Transport.Read(buffer, offset, buffer.Length - offset); if (bytesRead == 0) { throw new IOException(SR.GetString(SR.net_io_readfailure, SR.GetString(SR.net_io_connectionclosed))); } offset += bytesRead; } return buffer; } public IAsyncResult BeginReadMessage(AsyncCallback asyncCallback, object stateObject) { WorkerAsyncResult workerResult; if (m_Eof){ workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, null, 0, 0); workerResult.InvokeCallback(-1); return workerResult; } workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, m_ReadHeaderBuffer, 0, m_ReadHeaderBuffer.Length); IAsyncResult result = Transport.BeginRead(m_ReadHeaderBuffer, 0, m_ReadHeaderBuffer.Length, m_ReadFrameCallback, workerResult); if (result.CompletedSynchronously) { ReadFrameComplete(result); } return workerResult; } private void ReadFrameCallback(IAsyncResult transportResult) { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::ReadFrameCallback|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.GetType().FullName); if (transportResult.CompletedSynchronously) { return; } WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; try { ReadFrameComplete(transportResult); } catch (Exception e) { if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) { throw; } if (!(e is IOException)) { e = new System.IO.IOException(SR.GetString(SR.net_io_readfailure, e.Message), e); } // Let's call user callback and he call us back and we will throw workerResult.InvokeCallback(e); } } // IO COMPLETION CALLBACK // // This callback is responsible for getting complete protocol frame // First, it reads the header // Second, it determines the frame size // Third, loops while not all frame received or an error. // private void ReadFrameComplete(IAsyncResult transportResult) { do { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::ReadFrameComplete|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.GetType().FullName); WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; int bytesRead = Transport.EndRead(transportResult); workerResult.Offset += bytesRead; GlobalLog.Assert(workerResult.Offset <= workerResult.End, "StreamFramer::ReadFrameCallback|WRONG: offset - end = {0}", workerResult.Offset - workerResult.End); if (bytesRead <= 0) { // (by design) This indicates the stream has receives EOF // If we are in the middle of a Frame - fail, otherwise - produce EOF object result = null; if (!workerResult.HeaderDone && workerResult.Offset == 0) { result = (object)-1; } else { result = new System.IO.IOException(SR.GetString(SR.net_frame_read_io)); } workerResult.InvokeCallback(result); return; } if (workerResult.Offset >= workerResult.End) { if (!workerResult.HeaderDone) { workerResult.HeaderDone = true; // This indicates the header has been read succesfully m_CurReadHeader.CopyFrom(workerResult.Buffer, 0, m_ReadVerifier); int payloadSize = m_CurReadHeader.PayloadSize; if (payloadSize < 0) { // Let's call user callback and he call us back and we will throw workerResult.InvokeCallback(new System.IO.IOException(SR.GetString(SR.net_frame_read_size))); } if (payloadSize == 0) { // report emtpy frame (NOT eof!) to the caller, he might be interested in workerResult.InvokeCallback(0); return; } if (payloadSize > m_CurReadHeader.MaxMessageSize) { throw new InvalidOperationException(SR.GetString(SR.net_frame_size, m_CurReadHeader.MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), payloadSize.ToString(NumberFormatInfo.InvariantInfo))); } // Start reading the remaining frame data (note header does not count) byte[] frame = new byte[payloadSize]; // Save the ref of the data block workerResult.Buffer = frame; workerResult.End = frame.Length; workerResult.Offset = 0; // Transport.BeginRead below will pickup those changes } else { workerResult.HeaderDone = false; //reset for optional object reuse workerResult.InvokeCallback(workerResult.End); return; } } // This means we need more data to complete the data block transportResult = Transport.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset, m_ReadFrameCallback, workerResult); } while(transportResult.CompletedSynchronously); } // // User will call this when workerResult gets signalled // // On Beginread User always gets back our WorkerAsyncResult // The Result property represents either a number of bytes read or an // exception put by our async state machine // public byte[] EndReadMessage(IAsyncResult asyncResult) { if (asyncResult==null) { throw new ArgumentNullException("asyncResult"); } WorkerAsyncResult workerResult = asyncResult as WorkerAsyncResult; if (workerResult == null) { throw new ArgumentException(SR.GetString(SR.net_io_async_result, typeof(WorkerAsyncResult).FullName), "asyncResult"); } if (!workerResult.InternalPeekCompleted) { workerResult.InternalWaitForCompletion(); } if (workerResult.Result is Exception) { throw (Exception)(workerResult.Result); } int size = (int)workerResult.Result; if (size == -1) { m_Eof = true; return null; } else if (size == 0) { //empty frame return new byte[0]; } return workerResult.Buffer; } // // // // public void WriteMessage(byte[] message) { if (message == null) { throw new ArgumentNullException("message"); } m_WriteHeader.PayloadSize = message.Length; m_WriteHeader.CopyTo(m_WriteHeaderBuffer, 0); if (m_NetworkStream != null && message.Length != 0) { BufferOffsetSize[] buffers = new BufferOffsetSize[2]; buffers[0] = new BufferOffsetSize(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, false); buffers[1] = new BufferOffsetSize(message, 0, message.Length, false); m_NetworkStream.MultipleWrite(buffers); } else { Transport.Write(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length); if (message.Length==0) { return; } Transport.Write(message, 0, message.Length); } } // // // // public IAsyncResult BeginWriteMessage(byte[] message, AsyncCallback asyncCallback, object stateObject) { if (message == null) { throw new ArgumentNullException("message"); } m_WriteHeader.PayloadSize = message.Length; m_WriteHeader.CopyTo(m_WriteHeaderBuffer, 0); if (m_NetworkStream != null && message.Length != 0) { BufferOffsetSize[] buffers = new BufferOffsetSize[2]; buffers[0] = new BufferOffsetSize(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, false); buffers[1] = new BufferOffsetSize(message, 0, message.Length, false); return m_NetworkStream.BeginMultipleWrite(buffers, asyncCallback, stateObject); } if (message.Length == 0) { return Transport.BeginWrite(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, asyncCallback, stateObject); } //Will need two async writes // Prepare the second WorkerAsyncResult workerResult = new WorkerAsyncResult(this, stateObject, asyncCallback, message, 0, message.Length); // Charge the first IAsyncResult result = Transport.BeginWrite(m_WriteHeaderBuffer, 0, m_WriteHeaderBuffer.Length, m_BeginWriteCallback, workerResult); if (result.CompletedSynchronously) { BeginWriteComplete(result); } return workerResult; } private void BeginWriteCallback(IAsyncResult transportResult) { GlobalLog.Assert(transportResult.AsyncState is WorkerAsyncResult, "StreamFramer::BeginWriteCallback|The state expected to be WorkerAsyncResult, received:{0}.", transportResult.AsyncState.GetType().FullName); if (transportResult.CompletedSynchronously) { return; } WorkerAsyncResult workerResult = (WorkerAsyncResult) transportResult.AsyncState; try { BeginWriteComplete(transportResult); } catch (Exception e) { if (e is ThreadAbortException || e is StackOverflowException || e is OutOfMemoryException) { throw; } workerResult.InvokeCallback(e); } } // IO COMPLETION CALLBACK // // Called when user IO request was wrapped to do several underlined IO // private void BeginWriteComplete(IAsyncResult transportResult) { do { WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState; //First, complete the previous portion write Transport.EndWrite(transportResult); //Check on exit criterion if (workerResult.Offset == workerResult.End) { workerResult.InvokeCallback(); return; } //setup exit criterion workerResult.Offset = workerResult.End; //Write next portion (frame body) using Async IO transportResult = Transport.BeginWrite(workerResult.Buffer, 0, workerResult.End, m_BeginWriteCallback, workerResult); } while (transportResult.CompletedSynchronously); } public void EndWriteMessage(IAsyncResult asyncResult) { if (asyncResult==null) { throw new ArgumentNullException("asyncResult"); } WorkerAsyncResult workerResult = asyncResult as WorkerAsyncResult; if (workerResult != null) { if (!workerResult.InternalPeekCompleted) { workerResult.InternalWaitForCompletion(); } if (workerResult.Result is Exception) { throw (Exception)(workerResult.Result); } } else { Transport.EndWrite(asyncResult); } } /* // Consider removing. public void Close() { Transport.Close(); } */ } // // This class wraps an Async IO request // It is based on our internal LazyAsyncResult helper // - If ParentResult is not null then the base class (LazyAsyncResult) methods must not be used // // - If ParentResult == null, then real user IO request is wrapped // /* // Consider removing. internal delegate void WorkerCallback(WorkerAsyncResult result); */ internal class WorkerAsyncResult : LazyAsyncResult { public byte[] Buffer; public int Offset; public int End; public bool IsWrite; public WorkerAsyncResult ParentResult; /* // Consider removing. public WorkerCallback StepDoneCallback; */ public bool HeaderDone; // This migth be reworked so we read both header and frame in one chunk public bool HandshakeDone; public WorkerAsyncResult(object asyncObject, object asyncState, AsyncCallback savedAsyncCallback, byte[] buffer, int offset, int end) : base( asyncObject, asyncState, savedAsyncCallback) { Buffer = buffer; Offset = offset; End = end; } /* // Consider removing. public WorkerAsyncResult(WorkerAsyncResult parentResult, byte[] buffer, int offset, int end) : base(null, null, null) { ParentResult = parentResult; Buffer = buffer; Offset = offset; End = end; } */ } // This guy describes the header used in framing of the stream data. internal class FrameHeader { public const int IgnoreValue = -1; public const int HandshakeDoneId= 20; public const int HandshakeErrId = 21; public const int HandshakeId = 22; public const int DefaultMajorV = 1; public const int DefaultMinorV = 0; private int _MessageId; private int _MajorV; private int _MinorV; private int _PayloadSize; public FrameHeader () { _MessageId = HandshakeId; _MajorV = DefaultMajorV; _MinorV = DefaultMinorV; _PayloadSize = -1; } public FrameHeader (int messageId, int majorV, int minorV) { _MessageId = messageId; _MajorV = majorV; _MinorV = minorV; _PayloadSize = -1; } /* // Consider removing. public FrameHeader Clone() { return new FrameHeader(_MessageId, _MajorV, _MinorV); } */ public int Size { get { return 5; } } public int MaxMessageSize { get { return 0xFFFF; } } public int MessageId { get { return _MessageId; } set { _MessageId = value; } } public int MajorV { get { return _MajorV; } } public int MinorV { get { return _MinorV; } } public int PayloadSize { get { return _PayloadSize; } set { if (value > MaxMessageSize) { throw new ArgumentException(SR.GetString(SR.net_frame_max_size, MaxMessageSize.ToString(NumberFormatInfo.InvariantInfo), value.ToString(NumberFormatInfo.InvariantInfo)), "PayloadSize"); } _PayloadSize = value; } } public void CopyTo(byte[] dest, int start) { dest[start++] = (byte)_MessageId; dest[start++] = (byte)_MajorV; dest[start++] = (byte)_MinorV; dest[start++] = (byte)((_PayloadSize >> 8) & 0xFF); dest[start] = (byte)(_PayloadSize & 0xFF); } public void CopyFrom(byte[] bytes, int start, FrameHeader verifier) { _MessageId = bytes[start++]; _MajorV = bytes[start++]; _MinorV = bytes[start++]; _PayloadSize = (int) ((bytes[start++]<<8) | bytes[start]); if (verifier.MessageId != FrameHeader.IgnoreValue && MessageId != verifier.MessageId) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MessageId", MessageId, verifier.MessageId)); } if (verifier.MajorV != FrameHeader.IgnoreValue && MajorV != verifier.MajorV) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MajorV", MajorV, verifier.MajorV)); } if (verifier.MinorV != FrameHeader.IgnoreValue && MinorV != verifier.MinorV) { throw new InvalidOperationException(SR.GetString(SR.net_io_header_id, "MinorV", MinorV, verifier.MinorV)); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- JsonServiceDocumentSerializer.cs
- InstalledVoice.cs
- WebBodyFormatMessageProperty.cs
- GridPattern.cs
- XmlLoader.cs
- TextDocumentView.cs
- ParentControlDesigner.cs
- CommittableTransaction.cs
- MethodCallExpression.cs
- TcpSocketManager.cs
- FillRuleValidation.cs
- Cursor.cs
- _SSPISessionCache.cs
- ObjectStateEntryOriginalDbUpdatableDataRecord.cs
- ServiceChannelFactory.cs
- RayHitTestParameters.cs
- _TransmitFileOverlappedAsyncResult.cs
- WindowsPen.cs
- RC2.cs
- LayoutExceptionEventArgs.cs
- MouseButton.cs
- ComponentResourceKey.cs
- _SslSessionsCache.cs
- BaseParser.cs
- CookielessHelper.cs
- CollectionViewGroup.cs
- OleDbMetaDataFactory.cs
- precedingquery.cs
- PropertyIDSet.cs
- EventPrivateKey.cs
- RadioButton.cs
- MethodBuilder.cs
- XmlQualifiedNameTest.cs
- PathFigureCollectionConverter.cs
- FileAuthorizationModule.cs
- RuntimeWrappedException.cs
- ProfileProvider.cs
- IntSecurity.cs
- MenuItemStyle.cs
- X509ServiceCertificateAuthenticationElement.cs
- CounterCreationData.cs
- SQLConvert.cs
- PolyLineSegment.cs
- SmiEventSink_Default.cs
- RecommendedAsConfigurableAttribute.cs
- CodeSnippetStatement.cs
- TargetConverter.cs
- LinkUtilities.cs
- GotoExpression.cs
- SafeRegistryKey.cs
- BufferModeSettings.cs
- SchemaTypeEmitter.cs
- XhtmlBasicValidationSummaryAdapter.cs
- CodeTryCatchFinallyStatement.cs
- URLString.cs
- DataGridParentRows.cs
- RenderDataDrawingContext.cs
- SelectionEditor.cs
- ReadOnlyCollection.cs
- TokenizerHelper.cs
- MULTI_QI.cs
- xsdvalidator.cs
- FileFormatException.cs
- TextContainer.cs
- BinaryFormatterWriter.cs
- AQNBuilder.cs
- DelegatedStream.cs
- DataGridViewCellStateChangedEventArgs.cs
- mediaeventargs.cs
- ConstraintCollection.cs
- AnimationException.cs
- ConsoleCancelEventArgs.cs
- IList.cs
- MenuScrollingVisibilityConverter.cs
- XmlSchemaComplexContent.cs
- PrintPreviewControl.cs
- VirtualPathProvider.cs
- ArgIterator.cs
- HostedAspNetEnvironment.cs
- LocatorPart.cs
- StatusStrip.cs
- FontStretch.cs
- Control.cs
- RootBuilder.cs
- TablePattern.cs
- Path.cs
- CodeGeneratorOptions.cs
- HtmlInputCheckBox.cs
- CopyNamespacesAction.cs
- NamespaceCollection.cs
- ReflectPropertyDescriptor.cs
- PackagePartCollection.cs
- DesignerLabelAdapter.cs
- StrongNamePublicKeyBlob.cs
- DPTypeDescriptorContext.cs
- AccessDataSourceView.cs
- DataGridViewColumnEventArgs.cs
- ConvertTextFrag.cs
- CuspData.cs
- ShaderEffect.cs