Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / BufferedOutputAsyncStream.cs / 1 / BufferedOutputAsyncStream.cs
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//-----------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System;
    using System.Diagnostics;
    using System.Globalization;
    using System.IO;
    using System.ServiceModel;
    using System.Collections;
    using System.Collections.Generic;
    using System.Threading;

    /// <summary>
    /// BufferedOutputAsyncStream is used for writing streamed response.
    /// For performance reasons, the behavior we want is chunk, chunk, chunk,.. terminating chunk without a delay.
    /// We call BeginWrite,BeginWrite,BeginWrite and Close()(close sends the terminating chunk) without
    /// waiting for all outstanding BeginWrites to complete.
    ///
    /// BufferedOutputAsyncStream is not a general-purpose stream wrapper, it requires that the base stream
    ///     1. allow concurrent IO (for multiple BeginWrite calls)
    ///     2. support the BeginWrite,BeginWrite,BeginWrite,.. Close() calling pattern.
    ///
    /// Currently BufferedOutputAsyncStream only used to wrap the System.Net.HttpResponseStream, which satisfy both requirements.
    /// </summary>
    class BufferedOutputAsyncStream : Stream
    {
        // Underlying stream that receives the buffered data via BeginWrite.
        Stream stream;
        // Size, in bytes, of every ByteBuffer allocated by this stream.
        int bufferSize;
        // Maximum number of ByteBuffers that may be allocated before they are reused round-robin.
        int bufferLimit;
        // Buffers allocated so far; grows lazily up to bufferLimit entries.
        List<ByteBuffer> buffers;
        // Index into 'buffers' of the buffer currently being filled.
        int currentIndex;

        /// <summary>
        /// Wraps <paramref name="stream"/> and allocates the first buffer.
        /// </summary>
        /// <param name="stream">The stream that buffered data is written to.</param>
        /// <param name="bufferSize">Size in bytes of each buffer.</param>
        /// <param name="bufferLimit">Maximum number of buffers to allocate.</param>
        internal BufferedOutputAsyncStream(Stream stream, int bufferSize, int bufferLimit)
        {
            this.stream = stream;
            this.bufferSize = bufferSize;
            this.bufferLimit = bufferLimit;
            this.buffers = new List<ByteBuffer>();
            this.buffers.Add(new ByteBuffer(this.bufferSize, stream));
            this.currentIndex = 0;
        }

        /// <summary>Always false; this stream is write-only.</summary>
        public override bool CanRead
        {
            get { return false; }
        }

        /// <summary>Always false; seeking is not supported.</summary>
        public override bool CanSeek
        {
            get { return false; }
        }

        /// <summary>Mirrors CanWrite of the wrapped stream.</summary>
        public override bool CanWrite
        {
            get { return stream.CanWrite; }
        }

        // The buffer that Write/WriteByte are currently filling.
        ByteBuffer CurrentBuffer
        {
            get { return buffers[currentIndex]; }
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override long Length
        {
            get
            {
#pragma warning suppress 56503 // [....], required by the Stream.Length contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
            }
        }

        /// <summary>Not supported; the getter and the setter always throw NotSupportedException.</summary>
        public override long Position
        {
            get
            {
#pragma warning suppress 56503 // [....], required by the Stream.Position contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
            }
            set
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
            }
        }

        /// <summary>
        /// Flushes the current buffer, closes the wrapped stream and then waits for every
        /// outstanding buffer write to finish. The wrapped stream is closed BEFORE waiting,
        /// matching the BeginWrite,.. Close() calling pattern described on the class.
        /// </summary>
        public override void Close()
        {
            CurrentBuffer.Flush();
            stream.Close();
            // Complete all outstanding writes
            WaitForAllWritesToComplete();
        }

        /// <summary>
        /// Starts a write of whatever the current buffer holds and flushes the wrapped stream.
        /// </summary>
        public override void Flush()
        {
            CurrentBuffer.Flush();
            stream.Flush();
        }

        // Advances to the next buffer. A brand-new buffer is allocated while fewer than
        // bufferLimit exist; otherwise an existing buffer is reused after waiting for its
        // pending write to complete.
        void NextBuffer()
        {
            currentIndex++;
            if (currentIndex == buffers.Count)
            {
                if (buffers.Count < bufferLimit)
                {
                    // allocate new buffer; it has no pending write, so no wait is needed
                    buffers.Add(new ByteBuffer(bufferSize, stream));
                    return;
                }
                currentIndex = 0;
            }

            DiagnosticUtility.DebugAssert(currentIndex >= 0 && currentIndex < buffers.Count,
                string.Format(CultureInfo.InvariantCulture, "The value ({0}) must be greater than or equal to zero and less than {1}", currentIndex, buffers.Count));

            // Make Sure that the buffer is ready to receive data
            CurrentBuffer.WaitForWriteComplete();
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override int ReadByte()
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        }

        /// <summary>Not supported; always throws NotSupportedException.</summary>
        public override void SetLength(long value)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        }

        // Blocks until no allocated buffer has a write in flight. Rethrows (via
        // WaitForWriteComplete) the first recorded write failure it encounters.
        void WaitForAllWritesToComplete()
        {
            // Complete all outstanding writes
            for (int i = 0; i < buffers.Count; i++)
            {
                buffers[i].WaitForWriteComplete();
            }
        }

        /// <summary>
        /// Copies <paramref name="count"/> bytes into the internal buffers, starting an
        /// asynchronous write of each buffer as soon as it becomes full.
        /// </summary>
        /// <param name="buffer">Source array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to copy.</param>
        /// <param name="count">Number of bytes to copy.</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            while (count > 0)
            {
                // The current buffer is still being written out; move on to another one.
                if (CurrentBuffer.IsWritePending)
                {
                    NextBuffer();
                }

                int freeBytes = CurrentBuffer.FreeBytes;   // space left in the CurrentBuffer
                if (freeBytes > 0)
                {
                    if (freeBytes > count)
                    {
                        freeBytes = count;
                    }

                    CurrentBuffer.CopyData(buffer, offset, freeBytes);
                    offset += freeBytes;
                    count -= freeBytes;
                }

                // Buffer is full: kick off its write.
                if (CurrentBuffer.FreeBytes == 0)
                {
                    CurrentBuffer.Flush();
                }
            }
        }

        /// <summary>
        /// Appends a single byte, starting an asynchronous write if that fills the buffer.
        /// </summary>
        /// <param name="value">The byte to append.</param>
        public override void WriteByte(byte value)
        {
            if (CurrentBuffer.IsWritePending)
            {
                NextBuffer();
            }

            CurrentBuffer.CopyData(value);

            if (CurrentBuffer.FreeBytes == 0)
            {
                CurrentBuffer.Flush();
            }
        }

        // A fixed-size byte array plus the bookkeeping for one in-flight BeginWrite on it.
        class ByteBuffer
        {
            byte[] bytes;
            // Number of bytes currently stored in 'bytes'.
            int position = 0;
            Stream stream;
            static AsyncCallback writeCallback = DiagnosticUtility.ThunkAsyncCallback(new AsyncCallback(WriteCallback));
            // True from the moment a write is started until it has been completed.
            bool writePending = false;
            // True while a thread is blocked in WaitForWriteComplete; lets the callback skip Pulse.
            bool waiting = false;
            // Non-fatal exception raised by EndWrite in the callback; rethrown by WaitForWriteComplete.
            Exception completionException;

            internal ByteBuffer(int bufferSize, Stream stream)
            {
                this.bytes = DiagnosticUtility.Utility.AllocateByteArray(bufferSize);
                this.stream = stream;
            }

            internal bool IsWritePending
            {
                get { return writePending; }
            }

            object ThisLock
            {
                get { return this; }
            }

            // Remaining capacity of this buffer in bytes.
            internal int FreeBytes
            {
                get { return this.bytes.Length - this.position; }
            }

            // Starts writing the buffered bytes to the stream; no-op when the buffer is empty.
            // A synchronously completed write is ended here; otherwise WriteCallback ends it.
            internal void Flush()
            {
                if (this.position <= 0)
                {
                    return;
                }

                int bytesToWrite = this.position;
                this.writePending = true;
                this.position = 0;

                bool writeIssued = false;
                try
                {
                    IAsyncResult asyncResult = this.stream.BeginWrite(this.bytes, 0, bytesToWrite, writeCallback, this);
                    if (asyncResult.CompletedSynchronously)
                    {
                        this.stream.EndWrite(asyncResult);
                        this.writePending = false;
                    }
                    writeIssued = true;
                }
                finally
                {
                    if (!writeIssued)
                    {
                        // BeginWrite or the synchronous EndWrite threw. WriteCallback ignores
                        // synchronous completions, so nothing else would ever clear the flag and
                        // a later WaitForWriteComplete would block forever.
                        this.writePending = false;
                    }
                }
            }

            // Completion callback for asynchronous writes started by Flush.
            static void WriteCallback(IAsyncResult result)
            {
                // Synchronous completions are ended by Flush itself.
                if (result.CompletedSynchronously)
                {
                    return;
                }

                // Fetch our state information: ByteBuffer
                ByteBuffer buffer = (ByteBuffer)result.AsyncState;

                try
                {
                    buffer.stream.EndWrite(result);
                }
#pragma warning suppress 56500 // [....], transferring exception to another thread
                catch (Exception e)
                {
                    if (DiagnosticUtility.IsFatal(e))
                    {
                        throw;
                    }
                    buffer.completionException = e;
                }

                // Tell the main thread we've finished.
                lock (buffer.ThisLock)
                {
                    buffer.writePending = false;

                    // Do not Pulse if no one is waiting, to avoid the overhead of Pulse
                    if (!buffer.waiting)
                    {
                        return;
                    }

                    Monitor.Pulse(buffer.ThisLock);
                }
            }

            // Blocks until this buffer has no write in flight, then rethrows any exception
            // recorded by WriteCallback.
            internal void WaitForWriteComplete()
            {
                lock (ThisLock)
                {
                    if (this.writePending)
                    {
                        // Wait until the async write of this buffer is finished.
                        this.waiting = true;
                        // Re-check the condition after every wake-up instead of trusting a single Pulse.
                        while (this.writePending)
                        {
                            Monitor.Wait(ThisLock);
                        }
                        this.waiting = false;
                    }
                }

                // Raise exception if necessary
                if (this.completionException != null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(completionException);
                }
            }

            // Appends 'count' bytes from 'buffer' starting at 'offset'. The caller must ensure
            // the bytes fit and that no write is pending (checked by debug asserts only).
            internal void CopyData(byte[] buffer, int offset, int count)
            {
                DiagnosticUtility.DebugAssert(this.position + count <= this.bytes.Length, string.Format(CultureInfo.InvariantCulture, "Chunk is too big to fit in this buffer. Chunk size={0}, free space={1}", count, this.bytes.Length - this.position));
                DiagnosticUtility.DebugAssert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                Buffer.BlockCopy(buffer, offset, this.bytes, this.position, count);
                this.position += count;
            }

            // Appends a single byte. The caller must ensure there is room and that no write
            // is pending (checked by debug asserts only).
            internal void CopyData(byte value)
            {
                DiagnosticUtility.DebugAssert(this.position < this.bytes.Length, "Buffer is full");
                DiagnosticUtility.DebugAssert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                this.bytes[this.position++] = value;
            }
        }
    }
}

// File provided for Reference Use Only by Microsoft Corporation (c) 2007.
// Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- EditorPartCollection.cs
- Overlapped.cs
- CodeTypeOfExpression.cs
- ScrollProperties.cs
- XhtmlBasicObjectListAdapter.cs
- DefaultEvaluationContext.cs
- ConsumerConnectionPointCollection.cs
- DropShadowBitmapEffect.cs
- CompilerScope.cs
- SerializationInfoEnumerator.cs
- ImplicitInputBrush.cs
- ArrangedElementCollection.cs
- GridViewColumnCollectionChangedEventArgs.cs
- IntSecurity.cs
- DeliveryRequirementsAttribute.cs
- ExcCanonicalXml.cs
- WebBrowserContainer.cs
- ParameterToken.cs
- HtmlInputText.cs
- IHttpResponseInternal.cs
- GeneralTransformGroup.cs
- ProcessModelSection.cs
- DesignerHierarchicalDataSourceView.cs
- DynamicDocumentPaginator.cs
- PrePrepareMethodAttribute.cs
- KeyedQueue.cs
- SqlBooleanMismatchVisitor.cs
- PiiTraceSource.cs
- SelectionWordBreaker.cs
- KeyConstraint.cs
- DataSourceGroupCollection.cs
- InstanceKeyView.cs
- PackWebRequest.cs
- AmbientProperties.cs
- OdbcEnvironmentHandle.cs
- UnsupportedPolicyOptionsException.cs
- PointCollectionConverter.cs
- SessionEndingCancelEventArgs.cs
- DataListItem.cs
- SyntaxCheck.cs
- GenericsInstances.cs
- InstanceKey.cs
- TableLayout.cs
- ControlBindingsCollection.cs
- MULTI_QI.cs
- Application.cs
- HttpHeaderCollection.cs
- wmiutil.cs
- HttpPostedFileWrapper.cs
- DataMemberAttribute.cs
- StorageBasedPackageProperties.cs
- ProxyWebPartManagerDesigner.cs
- SingleTagSectionHandler.cs
- querybuilder.cs
- RoutedEvent.cs
- BamlLocalizerErrorNotifyEventArgs.cs
- InProcStateClientManager.cs
- ComponentGlyph.cs
- EntityDataSourceWrapperPropertyDescriptor.cs
- DecoratedNameAttribute.cs
- TextFragmentEngine.cs
- ConfigurationSection.cs
- Processor.cs
- ForceCopyBuildProvider.cs
- FocusManager.cs
- QilInvokeLateBound.cs
- FontNamesConverter.cs
- CompilerLocalReference.cs
- Event.cs
- SoapIgnoreAttribute.cs
- Model3DCollection.cs
- _DomainName.cs
- EncoderParameter.cs
- TransformationRules.cs
- SerialPinChanges.cs
- XmlExtensionFunction.cs
- AttributeSetAction.cs
- NativeObjectSecurity.cs
- OutputCacheEntry.cs
- streamingZipPartStream.cs
- NextPreviousPagerField.cs
- TableLayoutStyleCollection.cs
- DeadCharTextComposition.cs
- DynamicValueConverter.cs
- PrinterResolution.cs
- WebPartVerbCollection.cs
- WithParamAction.cs
- FrameworkElementAutomationPeer.cs
- DateTimeSerializationSection.cs
- BitmapEffectInputData.cs
- TrackingRecord.cs
- TextRangeAdaptor.cs
- _KerberosClient.cs
- ServiceReflector.cs
- NullableBoolConverter.cs
- x509utils.cs
- Keywords.cs
- WebServiceErrorEvent.cs
- RangeContentEnumerator.cs
- SecurityIdentifierElement.cs