Code:
/ 4.0 / 4.0 / untmp / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / fx / src / Services / Monitoring / system / Diagnosticts / AsyncStreamReader.cs / 1305376 / AsyncStreamReader.cs
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
/*============================================================
**
** Class: AsyncStreamReader
**
** Purpose: For reading text from streams using a particular
** encoding in an asychronous manner used by the process class
**
**
===========================================================*/
namespace System.Diagnostics {
using System;
using System.IO;
using System.Text;
using System.Runtime.InteropServices;
using System.Threading;
using System.Collections;
internal delegate void UserCallBack(String data);
internal class AsyncStreamReader : IDisposable
{
internal const int DefaultBufferSize = 1024; // Byte buffer size
private const int MinBufferSize = 128;
private Stream stream;
private Encoding encoding;
private Decoder decoder;
private byte[] byteBuffer;
private char[] charBuffer;
// Record the number of valid bytes in the byteBuffer, for a few checks.
// This is the maximum number of chars we can get from one call to
// ReadBuffer. Used so ReadBuffer can tell when to copy data into
// a user's char[] directly, instead of our internal char[].
private int _maxCharsPerBuffer;
// Store a backpointer to the process class, to check for user callbacks
private Process process;
// Delegate to call user function.
private UserCallBack userCallBack;
// Internal Cancel operation
private bool cancelOperation;
private ManualResetEvent eofEvent;
private Queue messageQueue;
private StringBuilder sb;
private bool bLastCarriageReturn;
internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding)
: this(process, stream, callback, encoding, DefaultBufferSize) {
}
// Creates a new AsyncStreamReader for the given stream. The
// character encoding is set by encoding and the buffer size,
// in number of 16-bit characters, is set by bufferSize.
//
internal AsyncStreamReader(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize)
{
Debug.Assert (process != null && stream !=null && encoding !=null && callback != null, "Invalid arguments!");
Debug.Assert(stream.CanRead, "Stream must be readable!");
Debug.Assert(bufferSize > 0, "Invalid buffer size!");
Init(process, stream, callback, encoding, bufferSize);
messageQueue = new Queue();
}
private void Init(Process process, Stream stream, UserCallBack callback, Encoding encoding, int bufferSize) {
this.process = process;
this.stream = stream;
this.encoding = encoding;
this.userCallBack = callback;
decoder = encoding.GetDecoder();
if (bufferSize < MinBufferSize) bufferSize = MinBufferSize;
byteBuffer = new byte[bufferSize];
_maxCharsPerBuffer = encoding.GetMaxCharCount(bufferSize);
charBuffer = new char[_maxCharsPerBuffer];
cancelOperation = false;
eofEvent = new ManualResetEvent(false);
sb = null;
this.bLastCarriageReturn = false;
}
public virtual void Close()
{
Dispose(true);
}
void IDisposable.Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing) {
if (stream != null)
stream.Close();
}
if (stream != null) {
stream = null;
encoding = null;
decoder = null;
byteBuffer = null;
charBuffer = null;
}
if( eofEvent != null) {
eofEvent.Close();
eofEvent = null;
}
}
public virtual Encoding CurrentEncoding {
get { return encoding; }
}
public virtual Stream BaseStream {
get { return stream; }
}
// User calls BeginRead to start the asynchronous read
internal void BeginReadLine() {
if( cancelOperation) {
cancelOperation = false;
}
if( sb == null ) {
sb = new StringBuilder(DefaultBufferSize);
stream.BeginRead(byteBuffer, 0 , byteBuffer.Length, new AsyncCallback(ReadBuffer), null);
}
else {
FlushMessageQueue();
}
}
internal void CancelOperation() {
cancelOperation = true;
}
// This is the async callback function. Only one thread could/should call this.
private void ReadBuffer(IAsyncResult ar) {
int byteLen;
try {
byteLen = stream.EndRead(ar);
}
catch (IOException ) {
// We should ideally consume errors from operations getting cancelled
// so that we don't crash the unsuspecting parent with an unhandled exc.
// This seems to come in 2 forms of exceptions (depending on platform and scenario),
// namely OperationCanceledException and IOException (for errorcode that we don't
// map explicitly).
byteLen = 0; // Treat this as EOF
}
catch (OperationCanceledException ) {
// We should consume any OperationCanceledException from child read here
// so that we don't crash the parent with an unhandled exc
byteLen = 0; // Treat this as EOF
}
if (byteLen == 0) {
// We're at EOF, we won't call this function again from here on.
lock(messageQueue) {
if( sb.Length != 0) {
messageQueue.Enqueue(sb.ToString());
sb.Length = 0;
}
messageQueue.Enqueue(null);
}
try {
// UserCallback could throw, we should still set the eofEvent
FlushMessageQueue();
}
finally {
eofEvent.Set();
}
} else {
int charLen = decoder.GetChars(byteBuffer, 0, byteLen, charBuffer, 0);
sb.Append(charBuffer, 0, charLen);
GetLinesFromStringBuilder();
stream.BeginRead(byteBuffer, 0 , byteBuffer.Length, new AsyncCallback(ReadBuffer), null);
}
}
// Read lines stored in StringBuilder and the buffer we just read into.
// A line is defined as a sequence of characters followed by
// a carriage return ('\r'), a line feed ('\n'), or a carriage return
// immediately followed by a line feed. The resulting string does not
// contain the terminating carriage return and/or line feed. The returned
// value is null if the end of the input stream has been reached.
//
private void GetLinesFromStringBuilder() {
int i = 0;
int lineStart = 0;
int len = sb.Length;
// skip a beginning '\n' character of new block if last block ended
// with '\r'
if (bLastCarriageReturn && (len > 0) && sb[0] == '\n')
{
i = 1;
lineStart = 1;
bLastCarriageReturn = false;
}
while(i < len) {
char ch = sb[i];
// Note the following common line feed chars:
// \n - UNIX \r\n - DOS \r - Mac
if (ch == '\r' || ch == '\n') {
string s = sb.ToString(lineStart, i - lineStart);
lineStart = i + 1;
// skip the "\n" character following "\r" character
if ((ch == '\r') && (lineStart < len) && (sb[lineStart] == '\n'))
{
lineStart++;
i++;
}
lock(messageQueue) {
messageQueue.Enqueue(s);
}
}
i++;
}
if (sb[len - 1] == '\r') {
bLastCarriageReturn = true;
}
// Keep the rest characaters which can't form a new line in string builder.
if( lineStart < len) {
sb.Remove(0, lineStart);
}
else {
sb.Length = 0;
}
FlushMessageQueue();
}
private void FlushMessageQueue() {
while(true) {
// When we call BeginReadLine, we also need to flush the queue
// So there could be a ---- between the ReadBuffer and BeginReadLine
// We need to take lock before DeQueue.
if( messageQueue.Count > 0) {
lock(messageQueue) {
if( messageQueue.Count > 0) {
string s = (string)messageQueue.Dequeue();
// skip if the read is the read is cancelled
// this might happen inside UserCallBack
// However, continue to drain the queue
if (!cancelOperation)
{
userCallBack(s);
}
}
}
}
else {
break;
}
}
}
// Wait until we hit EOF. This is called from Process.WaitForExit
// We will lose some information if we don't do this.
internal void WaitUtilEOF() {
if( eofEvent != null) {
eofEvent.WaitOne();
eofEvent.Close();
eofEvent = null;
}
}
}
}
// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- DBSchemaTable.cs
- SecurityIdentifierConverter.cs
- UserInitiatedNavigationPermission.cs
- _TransmitFileOverlappedAsyncResult.cs
- HtmlInputSubmit.cs
- CodeTryCatchFinallyStatement.cs
- BasicSecurityProfileVersion.cs
- Error.cs
- MappingMetadataHelper.cs
- NestedContainer.cs
- DataGridViewCellErrorTextNeededEventArgs.cs
- ViewKeyConstraint.cs
- TrackBar.cs
- InvalidComObjectException.cs
- InvokeProviderWrapper.cs
- UserPersonalizationStateInfo.cs
- WebEventTraceProvider.cs
- SessionStateSection.cs
- WindowsAltTab.cs
- BaseParser.cs
- SuppressMergeCheckAttribute.cs
- GeometryDrawing.cs
- ItemTypeToolStripMenuItem.cs
- Matrix.cs
- UnmanagedMemoryStreamWrapper.cs
- TransformerInfo.cs
- BigInt.cs
- ComponentEvent.cs
- SiteMap.cs
- TextCompositionEventArgs.cs
- ContextStaticAttribute.cs
- securitycriticaldataClass.cs
- HMACSHA256.cs
- SafeFileMapViewHandle.cs
- mactripleDES.cs
- DataObjectMethodAttribute.cs
- SettingsAttributes.cs
- SemanticResultValue.cs
- DependencyPropertyConverter.cs
- CodeConstructor.cs
- CommonDialog.cs
- HttpCapabilitiesSectionHandler.cs
- ConnectionProviderAttribute.cs
- ServiceReference.cs
- RawUIStateInputReport.cs
- MobileRedirect.cs
- CodeDirectoryCompiler.cs
- DataBindingExpressionBuilder.cs
- coordinator.cs
- GridViewDeletedEventArgs.cs
- DesignTimeVisibleAttribute.cs
- TableChangeProcessor.cs
- HtmlInputImage.cs
- X509CertificateValidator.cs
- DataViewManager.cs
- XmlDataLoader.cs
- X509RawDataKeyIdentifierClause.cs
- XmlDataSourceView.cs
- DialogWindow.cs
- CornerRadiusConverter.cs
- LocatorPart.cs
- TimeSpanSecondsConverter.cs
- ColorAnimationUsingKeyFrames.cs
- SqlBooleanMismatchVisitor.cs
- OrCondition.cs
- BaseParser.cs
- MetadataArtifactLoaderResource.cs
- StringDictionary.cs
- IdentityReference.cs
- DictationGrammar.cs
- ScalarConstant.cs
- URLIdentityPermission.cs
- WinFormsUtils.cs
- IgnoreFileBuildProvider.cs
- StylusPointPropertyInfoDefaults.cs
- datacache.cs
- DataGridState.cs
- CacheDependency.cs
- SemanticAnalyzer.cs
- Atom10FormatterFactory.cs
- MenuAdapter.cs
- FixedPageStructure.cs
- IdentityNotMappedException.cs
- AnonymousIdentificationSection.cs
- UserControlCodeDomTreeGenerator.cs
- XmlNamespaceMapping.cs
- DeclarativeConditionsCollection.cs
- QuarticEase.cs
- LineUtil.cs
- ColumnResult.cs
- CollectionViewGroup.cs
- ComplexTypeEmitter.cs
- EntryWrittenEventArgs.cs
- InfoCardTrace.cs
- ServerIdentity.cs
- DataObjectCopyingEventArgs.cs
- HttpValueCollection.cs
- KnownColorTable.cs
- CapabilitiesSection.cs
- ToolStripDropDownItem.cs