Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / BufferedOutputAsyncStream.cs / 1 / BufferedOutputAsyncStream.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System;
    using System.Diagnostics;
    using System.Globalization;
    using System.IO;
    using System.ServiceModel;
    using System.Collections;
    using System.Collections.Generic;
    using System.Threading;

    /// <summary>
    ///
    /// BufferedOutputAsyncStream is used for writing streamed responses.
    /// For performance reasons, the behavior we want is chunk, chunk, chunk,.. terminating chunk without a delay.
    /// We call BeginWrite,BeginWrite,BeginWrite and Close() (Close sends the terminating chunk) without
    /// waiting for all outstanding BeginWrites to complete.
    ///
    /// BufferedOutputAsyncStream is not a general-purpose stream wrapper; it requires that the base stream
    ///     1. allows concurrent IO (for multiple BeginWrite calls)
    ///     2. supports the BeginWrite,BeginWrite,BeginWrite,.. Close() calling pattern.
    ///
    /// Currently BufferedOutputAsyncStream is only used to wrap the System.Net.HttpResponseStream, which satisfies both requirements.
    ///
    /// </summary>
    class BufferedOutputAsyncStream : Stream
    {
        // The wrapped stream that every buffer is asynchronously written to.
        Stream stream;
        // Size in bytes of each individual buffer.
        int bufferSize;
        // Maximum number of buffers that NextBuffer will allocate before reusing existing ones.
        int bufferLimit;
        // Buffers allocated so far; used in round-robin order.
        List<ByteBuffer> buffers;
        // Index into 'buffers' of the buffer currently being filled.
        int currentIndex;

        /// <summary>
        /// Wraps <paramref name="stream"/> and allocates the first buffer.
        /// </summary>
        /// <param name="stream">The stream that buffered data is written to.</param>
        /// <param name="bufferSize">The size in bytes of each buffer.</param>
        /// <param name="bufferLimit">The maximum number of buffers to allocate.</param>
        internal BufferedOutputAsyncStream(Stream stream, int bufferSize, int bufferLimit)
        {
            this.stream = stream;
            this.bufferSize = bufferSize;
            this.bufferLimit = bufferLimit;
            this.buffers = new List<ByteBuffer>();
            this.buffers.Add(new ByteBuffer(this.bufferSize, stream));
            this.currentIndex = 0;
        }

        /// <summary>Always false; this stream is write-only.</summary>
        public override bool CanRead
        {
            get { return false; }
        }

        /// <summary>Always false; seeking is not supported.</summary>
        public override bool CanSeek
        {
            get { return false; }
        }

        /// <summary>Reports whether the wrapped stream can be written to.</summary>
        public override bool CanWrite
        {
            get { return stream.CanWrite; }
        }

        // The buffer that Write/WriteByte are currently filling.
        ByteBuffer CurrentBuffer
        {
            get { return buffers[currentIndex]; }
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override long Length
        {
            get
            {
#pragma warning suppress 56503 // [....], required by the Stream.Length contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
            }
        }

        /// <summary>Not supported; both accessors always throw NotSupportedException.</summary>
        public override long Position
        {
            get
            {
#pragma warning suppress 56503 // [....], required by the Stream.Position contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
            }
            set
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
            }
        }

        /// <summary>
        /// Flushes the current buffer, closes the wrapped stream, and then waits for every
        /// outstanding buffer write to finish. The wrapped stream is closed before waiting,
        /// matching the BeginWrite,.. Close() calling pattern described on the class.
        /// </summary>
        public override void Close()
        {
            CurrentBuffer.Flush();
            stream.Close();
            // Complete all outstanding writes
            WaitForAllWritesToComplete();
        }

        /// <summary>
        /// Starts writing any data held in the current buffer and flushes the wrapped stream.
        /// </summary>
        public override void Flush()
        {
            CurrentBuffer.Flush();
            stream.Flush();
        }

        // Advances to the next buffer. A new buffer is allocated while fewer than bufferLimit
        // exist; otherwise an existing buffer is reused, after blocking until any write that
        // is still pending on it has completed.
        void NextBuffer()
        {
            currentIndex++;
            if (currentIndex == buffers.Count)
            {
                if (buffers.Count < bufferLimit)
                {
                    // allocate new buffer
                    buffers.Add(new ByteBuffer(bufferSize, stream));
                    return;
                }
                currentIndex = 0;
            }

            DiagnosticUtility.DebugAssert(currentIndex >= 0 && currentIndex < buffers.Count,
                string.Format(CultureInfo.InvariantCulture, "The value ({0}) must be greater than or equal to zero and less than {1}", currentIndex, buffers.Count));

            // Make Sure that the buffer is ready to receive data
            CurrentBuffer.WaitForWriteComplete();
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override int ReadByte()
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override void SetLength(long value)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        }

        // Blocks until no buffer has a write pending. Rethrows (wrapped by ThrowHelperError)
        // the first recorded write failure it encounters.
        void WaitForAllWritesToComplete()
        {
            // Complete all outstanding writes
            for (int i = 0; i < buffers.Count; i++)
            {
                buffers[i].WaitForWriteComplete();
            }
        }

        /// <summary>
        /// Copies <paramref name="count"/> bytes into the internal buffers, starting an
        /// asynchronous write of each buffer as soon as it becomes full.
        /// </summary>
        /// <param name="buffer">The source of the bytes to write.</param>
        /// <param name="offset">The index in <paramref name="buffer"/> of the first byte to copy.</param>
        /// <param name="count">The number of bytes to copy.</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            while (count > 0)
            {
                // The current buffer is still being sent; move on to one that can accept data.
                if (CurrentBuffer.IsWritePending)
                {
                    NextBuffer();
                }

                int freeBytes = CurrentBuffer.FreeBytes;   // space left in the CurrentBuffer
                if (freeBytes > 0)
                {
                    if (freeBytes > count)
                        freeBytes = count;

                    CurrentBuffer.CopyData(buffer, offset, freeBytes);
                    offset += freeBytes;
                    count -= freeBytes;
                }

                if (CurrentBuffer.FreeBytes == 0)
                {
                    CurrentBuffer.Flush();
                }
            }
        }

        /// <summary>
        /// Appends a single byte, starting an asynchronous write of the buffer if that fills it.
        /// </summary>
        /// <param name="value">The byte to write.</param>
        public override void WriteByte(byte value)
        {
            if (CurrentBuffer.IsWritePending)
            {
                NextBuffer();
            }

            CurrentBuffer.CopyData(value);

            if (CurrentBuffer.FreeBytes == 0)
            {
                CurrentBuffer.Flush();
            }
        }

        // A fixed-size byte array together with the state of the asynchronous write
        // (if any) that is currently sending its contents to the wrapped stream.
        class ByteBuffer
        {
            byte[] bytes;
            // Number of bytes copied into 'bytes' that have not yet been handed to BeginWrite.
            int position = 0;
            Stream stream;
            static AsyncCallback writeCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(WriteCallback));
            // True from the moment Flush starts a write until that write has been completed.
            bool writePending = false;
            // True while a thread is blocked in WaitForWriteComplete; lets the callback skip Pulse.
            bool waiting = false;
            // Failure captured by the write callback, rethrown from WaitForWriteComplete.
            Exception completionException;

            internal ByteBuffer(int bufferSize, Stream stream)
            {
                this.bytes = DiagnosticUtility.Utility.AllocateByteArray(bufferSize);
                this.stream = stream;
            }

            internal bool IsWritePending
            {
                get { return writePending; }
            }

            object ThisLock
            {
                get { return this; }
            }

            internal int FreeBytes
            {
                get { return this.bytes.Length - this.position; }
            }

            // Starts an asynchronous write of the buffered bytes; does nothing when the buffer is empty.
            internal void Flush()
            {
                if (this.position <= 0)
                    return;

                int bytesToWrite = this.position;
                this.writePending = true;
                this.position = 0;

                // If BeginWrite throws, no callback will ever clear writePending, and a later
                // WaitForWriteComplete would block with nothing to wake it. Clear the flag on
                // that path so the exception propagates without leaving the buffer stuck.
                IAsyncResult asyncResult;
                bool writeStarted = false;
                try
                {
                    asyncResult = this.stream.BeginWrite(this.bytes, 0, bytesToWrite, writeCallback, this);
                    writeStarted = true;
                }
                finally
                {
                    if (!writeStarted)
                    {
                        this.writePending = false;
                    }
                }

                if (asyncResult.CompletedSynchronously)
                {
                    // WriteCallback ignores synchronous completions, so this is the only place
                    // the flag is cleared for them; do it even when EndWrite throws.
                    try
                    {
                        this.stream.EndWrite(asyncResult);
                    }
                    finally
                    {
                        this.writePending = false;
                    }
                }
            }

            // Completion callback for writes that did not complete synchronously.
            static void WriteCallback(IAsyncResult result)
            {
                // Synchronous completions are finished by Flush itself.
                if (result.CompletedSynchronously)
                    return;

                // Fetch our state information: ByteBuffer
                ByteBuffer buffer = (ByteBuffer)result.AsyncState;

                try
                {
                    buffer.stream.EndWrite(result);
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    }
                    buffer.completionException = e;
                }

                // Tell the main thread we've finished.
                lock (buffer.ThisLock)
                {
                    buffer.writePending = false;

                    // Do not Pulse if no one is waiting, to avoid the overhead of Pulse
                    if (!buffer.waiting)
                        return;

                    Monitor.Pulse(buffer.ThisLock);
                }
            }

            // Blocks until this buffer has no write pending, then rethrows (wrapped by
            // ThrowHelperError) any failure recorded by the write callback.
            internal void WaitForWriteComplete()
            {
                lock (ThisLock)
                {
                    // Re-check the flag after every wake-up instead of assuming that
                    // returning from Wait means the write has completed.
                    while (this.writePending)
                    {
                        // Wait until the async write of this buffer is finished.
                        this.waiting = true;
                        Monitor.Wait(ThisLock);
                        this.waiting = false;
                    }
                }

                // Raise exception if necessary
                if (this.completionException != null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(completionException);
                }
            }

            // Appends 'count' bytes from 'buffer' starting at 'offset'. The caller must ensure
            // the bytes fit in the remaining space and that no write is pending (debug-asserted).
            internal void CopyData(byte[] buffer, int offset, int count)
            {
                DiagnosticUtility.DebugAssert(this.position + count <= this.bytes.Length,
                    string.Format(CultureInfo.InvariantCulture, "Chunk is too big to fit in this buffer. Chunk size={0}, free space={1}", count, this.bytes.Length - this.position));
                DiagnosticUtility.DebugAssert(!this.writePending,
                    string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                Buffer.BlockCopy(buffer, offset, this.bytes, this.position, count);
                this.position += count;
            }

            // Appends a single byte. The caller must ensure there is free space and that
            // no write is pending (debug-asserted).
            internal void CopyData(byte value)
            {
                DiagnosticUtility.DebugAssert(this.position < this.bytes.Length, "Buffer is full");
                DiagnosticUtility.DebugAssert(!this.writePending,
                    string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                this.bytes[this.position++] = value;
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- Item.cs
- InputProviderSite.cs
- RefreshEventArgs.cs
- DataGridView.cs
- ApplicationFileCodeDomTreeGenerator.cs
- DomainUpDown.cs
- UxThemeWrapper.cs
- ShaderEffect.cs
- WorkflowWebService.cs
- XmlEncoding.cs
- TextPatternIdentifiers.cs
- FileReader.cs
- SubclassTypeValidatorAttribute.cs
- TrackingMemoryStreamFactory.cs
- SynchronizedChannelCollection.cs
- DataRecordObjectView.cs
- WindowsListViewScroll.cs
- SourceFilter.cs
- WebPartZone.cs
- coordinatorfactory.cs
- WmpBitmapDecoder.cs
- Literal.cs
- ProtocolElementCollection.cs
- HttpStreamXmlDictionaryReader.cs
- SystemResourceKey.cs
- WinCategoryAttribute.cs
- MessageDescriptionCollection.cs
- DesigntimeLicenseContextSerializer.cs
- EntityConnectionStringBuilder.cs
- QueryContext.cs
- XPathChildIterator.cs
- DragStartedEventArgs.cs
- ValidationRuleCollection.cs
- HttpResponseHeader.cs
- GrammarBuilderWildcard.cs
- DesignerEditorPartChrome.cs
- FreezableOperations.cs
- WebRequestModuleElement.cs
- CompilerState.cs
- StateInitialization.cs
- HttpListenerException.cs
- EntityDataSourceEntityTypeFilterItem.cs
- ComponentManagerBroker.cs
- ColorTransformHelper.cs
- GroupBoxRenderer.cs
- ResXResourceReader.cs
- LinkGrep.cs
- XmlSerializer.cs
- EventWaitHandle.cs
- EntityDataSourceDesignerHelper.cs
- DesignBinding.cs
- CallbackHandler.cs
- SafePEFileHandle.cs
- TypeConstant.cs
- SafePointer.cs
- EdmComplexTypeAttribute.cs
- SqlTrackingWorkflowInstance.cs
- LicenseContext.cs
- DependencyPropertyConverter.cs
- Math.cs
- xmlsaver.cs
- _emptywebproxy.cs
- QuaternionRotation3D.cs
- PointAnimationClockResource.cs
- EntityConnection.cs
- EndpointInfoCollection.cs
- figurelengthconverter.cs
- AppearanceEditorPart.cs
- SafeTimerHandle.cs
- MissingFieldException.cs
- WindowsFormsSectionHandler.cs
- SessionStateContainer.cs
- DebuggerAttributes.cs
- DbMetaDataFactory.cs
- SharedStatics.cs
- BamlLocalizabilityResolver.cs
- ContentElement.cs
- SafeHandles.cs
- BoundingRectTracker.cs
- StringStorage.cs
- MatcherBuilder.cs
- IgnoreSection.cs
- MSAAWinEventWrap.cs
- _SSPIWrapper.cs
- SafeMILHandle.cs
- SqlDataAdapter.cs
- PropertyMapper.cs
- DetailsViewPagerRow.cs
- TraceLevelStore.cs
- PolyBezierSegment.cs
- XPathChildIterator.cs
- QilVisitor.cs
- TextParaLineResult.cs
- JoinTreeNode.cs
- TdsEnums.cs
- MembershipSection.cs
- ValidatingReaderNodeData.cs
- HtmlControl.cs
- Animatable.cs
- ToolStripContextMenu.cs