Code:
/ WCF / WCF / 3.5.30729.1 / untmp / Orcas / SP / ndp / cdf / src / WCF / ServiceModel / System / ServiceModel / Channels / InputQueue.cs / 1 / InputQueue.cs
//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------- namespace System.ServiceModel.Channels { using System.Collections.Generic; using System.Diagnostics; using System.ServiceModel.Diagnostics; using System.Threading; // ItemDequeuedCallback is called as an item is dequeued from the InputQueue. The // InputQueue lock is not held during the callback. However, the user code will // not be notified of the item being available until the callback returns. If you // are not sure if the callback will block for a long time, then first call // IOThreadScheduler.ScheduleCallback to get to a "safe" thread. delegate void ItemDequeuedCallback(); class InputQueue: IDisposable where T : class { ItemQueue itemQueue; QueueState queueState; Queue readerQueue; List waiterList; static WaitCallback onInvokeDequeuedCallback; static WaitCallback onDispatchCallback; static WaitCallback completeOutstandingReadersCallback; static WaitCallback completeWaitersFalseCallback; static WaitCallback completeWaitersTrueCallback; enum QueueState { Open, Shutdown, Closed } public InputQueue() { itemQueue = new ItemQueue(); readerQueue = new Queue (); waiterList = new List (); queueState = QueueState.Open; } public int PendingCount { get { lock (ThisLock) { return itemQueue.ItemCount; } } } object ThisLock { get { return itemQueue; } } public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state) { Item item = default(Item); lock (ThisLock) { if (queueState == QueueState.Open) { if (itemQueue.HasAvailableItem) { item = itemQueue.DequeueAvailableItem(); } else { AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state); readerQueue.Enqueue(reader); return reader; } } else if (queueState == QueueState.Shutdown) { if (itemQueue.HasAvailableItem) { item = itemQueue.DequeueAvailableItem(); } else if (itemQueue.HasAnyItem) { AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state); readerQueue.Enqueue(reader); return reader; } } } InvokeDequeuedCallback(item.DequeuedCallback); return new TypedCompletedAsyncResult (item.GetValue(), callback, state); } public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state) { lock (ThisLock) { if (queueState == QueueState.Open) { if (!itemQueue.HasAvailableItem) { AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state); waiterList.Add(waiter); return waiter; } } else if (queueState == QueueState.Shutdown) { if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem) { AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state); waiterList.Add(waiter); return waiter; } } } return new TypedCompletedAsyncResult (true, callback, state); } static void CompleteOutstandingReadersCallback(object state) { IQueueReader[] outstandingReaders = (IQueueReader[])state; for (int i = 0; i < outstandingReaders.Length; i++) { outstandingReaders[i].Set(default(Item)); } } static void CompleteWaitersFalseCallback(object state) { CompleteWaiters(false, (IQueueWaiter[])state); } static void CompleteWaitersTrueCallback(object state) { CompleteWaiters(true, (IQueueWaiter[])state); } static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters) { for (int i=0; i 0) { waiters = waiterList.ToArray(); waiterList.Clear(); } else { waiters = null; } } public void Close() { ((IDisposable)this).Dispose(); } public void Shutdown() { this.Shutdown(null); } public void Shutdown(CommunicationObject communicationObject) { IQueueReader[] outstandingReaders = null; lock (ThisLock) { if (queueState == QueueState.Shutdown) return; if (queueState == QueueState.Closed) return; this.queueState = QueueState.Shutdown; if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0) { outstandingReaders = new IQueueReader[readerQueue.Count]; readerQueue.CopyTo(outstandingReaders, 0); readerQueue.Clear(); } } if (outstandingReaders != null) { for (int i = 0; i < outstandingReaders.Length; i++) { Exception exception = (communicationObject == null) ? null : communicationObject.GetPendingException(); outstandingReaders[i].Set(new Item(exception, null)); } } } public T Dequeue(TimeSpan timeout) { T value; if (!this.Dequeue(timeout, out value)) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException(SR.GetString(SR.TimeoutInputQueueDequeue1, timeout))); } return value; } public bool Dequeue(TimeSpan timeout, out T value) { WaitQueueReader reader = null; Item item = new Item(); lock (ThisLock) { if (queueState == QueueState.Open) { if (itemQueue.HasAvailableItem) { item = itemQueue.DequeueAvailableItem(); } else { reader = new WaitQueueReader(this); readerQueue.Enqueue(reader); } } else if (queueState == QueueState.Shutdown) { if (itemQueue.HasAvailableItem) { item = itemQueue.DequeueAvailableItem(); } else if (itemQueue.HasAnyItem) { reader = new WaitQueueReader(this); readerQueue.Enqueue(reader); } else { value = default(T); return true; } } else // queueState == QueueState.Closed { value = default(T); return true; } } if (reader != null) { return reader.Wait(timeout, out value); } else { InvokeDequeuedCallback(item.DequeuedCallback); value = item.GetValue(); return true; } } public bool WaitForItem(TimeSpan timeout) { WaitQueueWaiter waiter = null; bool itemAvailable = false; lock (ThisLock) { if (queueState == QueueState.Open) { if (itemQueue.HasAvailableItem) { itemAvailable = true; } else { waiter = new WaitQueueWaiter(); waiterList.Add(waiter); } } else if (queueState == QueueState.Shutdown) { if (itemQueue.HasAvailableItem) { itemAvailable = true; } else if (itemQueue.HasAnyItem) { waiter = new WaitQueueWaiter(); waiterList.Add(waiter); } else { return true; } } else // queueState == QueueState.Closed { return true; } } if (waiter != null) { return waiter.Wait(timeout); } else { return itemAvailable; } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected void Dispose(bool disposing) { if (disposing) { bool dispose = false; lock (ThisLock) { if (queueState != QueueState.Closed) { queueState = QueueState.Closed; dispose = true; } } if (dispose) { while (readerQueue.Count > 0) { IQueueReader reader = readerQueue.Dequeue(); reader.Set(default(Item)); } while (itemQueue.HasAnyItem) { Item item = itemQueue.DequeueAnyItem(); item.Dispose(); InvokeDequeuedCallback(item.DequeuedCallback); } } } } public bool EndDequeue(IAsyncResult result, out T value) { TypedCompletedAsyncResult typedResult = result as TypedCompletedAsyncResult ; if (typedResult != null) { value = TypedCompletedAsyncResult .End(result); return true; } return AsyncQueueReader.End(result, out value); } public T EndDequeue(IAsyncResult result) { T value; if (!this.EndDequeue(result, out value)) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException()); } return value; } public bool EndWaitForItem(IAsyncResult result) { TypedCompletedAsyncResult typedResult = result as TypedCompletedAsyncResult ; if (typedResult != null) return TypedCompletedAsyncResult .End(result); return AsyncQueueWaiter.End(result); } public void Dispatch() { IQueueReader reader = null; Item item = new Item(); IQueueReader[] outstandingReaders = null; IQueueWaiter[] waiters = null; bool itemAvailable = true; lock (ThisLock) { itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown)); this.GetWaiters(out waiters); if (queueState != QueueState.Closed) { itemQueue.MakePendingItemAvailable(); if (readerQueue.Count > 0) { item = itemQueue.DequeueAvailableItem(); reader = readerQueue.Dequeue(); if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0) { outstandingReaders = new IQueueReader[readerQueue.Count]; readerQueue.CopyTo(outstandingReaders, 0); readerQueue.Clear(); itemAvailable = false; } } } } if (outstandingReaders != null) { if (completeOutstandingReadersCallback == null) completeOutstandingReadersCallback = new WaitCallback(CompleteOutstandingReadersCallback); IOThreadScheduler.ScheduleCallback(completeOutstandingReadersCallback, outstandingReaders); } if (waiters != null) { CompleteWaitersLater(itemAvailable, waiters); } if (reader != null) { InvokeDequeuedCallback(item.DequeuedCallback); reader.Set(item); } } public void EnqueueAndDispatch(T item) { EnqueueAndDispatch(item, null); } public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback) { EnqueueAndDispatch(item, dequeuedCallback, true); } public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread) { DiagnosticUtility.DebugAssert(exception != null, "exception parameter should not be null"); EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread); } public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread) { DiagnosticUtility.DebugAssert(item != null, "item parameter should not be null"); EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread); } void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread) { bool disposeItem = false; IQueueReader reader = null; bool dispatchLater = false; IQueueWaiter[] waiters = null; bool itemAvailable = true; lock (ThisLock) { itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown)); this.GetWaiters(out waiters); if (queueState == QueueState.Open) { if (canDispatchOnThisThread) { if (readerQueue.Count == 0) { itemQueue.EnqueueAvailableItem(item); } else { reader = readerQueue.Dequeue(); } } else { if (readerQueue.Count == 0) { itemQueue.EnqueueAvailableItem(item); } else { itemQueue.EnqueuePendingItem(item); dispatchLater = true; } } } else // queueState == QueueState.Closed || queueState == QueueState.Shutdown { disposeItem = true; } } if (waiters != null) { if (canDispatchOnThisThread) { CompleteWaiters(itemAvailable, waiters); } else { CompleteWaitersLater(itemAvailable, waiters); } } if (reader != null) { InvokeDequeuedCallback(item.DequeuedCallback); reader.Set(item); } if (dispatchLater) { if (onDispatchCallback == null) { onDispatchCallback = new WaitCallback(OnDispatchCallback); } IOThreadScheduler.ScheduleCallback(onDispatchCallback, this); } else if (disposeItem) { InvokeDequeuedCallback(item.DequeuedCallback); item.Dispose(); } } public bool EnqueueWithoutDispatch(T item, ItemDequeuedCallback dequeuedCallback) { DiagnosticUtility.DebugAssert(item != null, "EnqueueWithoutDispatch: item parameter should not be null"); return EnqueueWithoutDispatch(new Item(item, dequeuedCallback)); } public bool EnqueueWithoutDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback) { DiagnosticUtility.DebugAssert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null"); return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback)); } // This will not block, however, Dispatch() must be called later if this function // returns true. bool EnqueueWithoutDispatch(Item item) { lock (ThisLock) { // Open if (queueState != QueueState.Closed && queueState != QueueState.Shutdown) { if (readerQueue.Count == 0) { itemQueue.EnqueueAvailableItem(item); return false; } else { itemQueue.EnqueuePendingItem(item); return true; } } } item.Dispose(); InvokeDequeuedCallbackLater(item.DequeuedCallback); return false; } static void OnDispatchCallback(object state) { ((InputQueue )state).Dispatch(); } static void InvokeDequeuedCallbackLater(ItemDequeuedCallback dequeuedCallback) { if (dequeuedCallback != null) { if (onInvokeDequeuedCallback == null) { onInvokeDequeuedCallback = OnInvokeDequeuedCallback; } IOThreadScheduler.ScheduleCallback(onInvokeDequeuedCallback, dequeuedCallback); } } static void InvokeDequeuedCallback(ItemDequeuedCallback dequeuedCallback) { if (dequeuedCallback != null) { dequeuedCallback(); } } static void OnInvokeDequeuedCallback(object state) { DiagnosticUtility.DebugAssert(state != null, "InputQueue.OnInvokeDequeuedCallback: (state != null)"); ItemDequeuedCallback dequeuedCallback = (ItemDequeuedCallback)state; dequeuedCallback(); } // Used for timeouts. The InputQueue must remove readers from its reader queue to prevent // dispatching items to timed out readers. bool RemoveReader(IQueueReader reader) { DiagnosticUtility.DebugAssert(reader != null, "InputQueue.RemoveReader: (reader != null)"); lock (ThisLock) { if (queueState == QueueState.Open || queueState == QueueState.Shutdown) { bool removed = false; for (int i = readerQueue.Count; i > 0; i--) { IQueueReader temp = readerQueue.Dequeue(); if (Object.ReferenceEquals(temp, reader)) { removed = true; } else { readerQueue.Enqueue(temp); } } return removed; } } return false; } interface IQueueReader { void Set(Item item); } interface IQueueWaiter { void Set(bool itemAvailable); } class WaitQueueReader : IQueueReader { Exception exception; InputQueue inputQueue; T item; ManualResetEvent waitEvent; public WaitQueueReader(InputQueue inputQueue) { this.inputQueue = inputQueue; waitEvent = new ManualResetEvent(false); } public void Set(Item item) { lock (this) { DiagnosticUtility.DebugAssert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)"); DiagnosticUtility.DebugAssert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)"); this.exception = item.Exception; this.item = item.Value; waitEvent.Set(); } } public bool Wait(TimeSpan timeout, out T value) { bool isSafeToClose = false; try { if (!TimeoutHelper.WaitOne(waitEvent, timeout, false)) { if (this.inputQueue.RemoveReader(this)) { value = default(T); isSafeToClose = true; return false; } else { waitEvent.WaitOne(); } } isSafeToClose = true; } finally { if (isSafeToClose) { waitEvent.Close(); } } if (this.exception != null) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.exception); } value = item; return true; } } class AsyncQueueReader : TraceAsyncResult, IQueueReader { static WaitCallback timerCallback = new WaitCallback(AsyncQueueReader.TimerCallback); bool expired; InputQueue inputQueue; T item; IOThreadTimer timer; public AsyncQueueReader(InputQueue inputQueue, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.inputQueue = inputQueue; if (timeout != TimeSpan.MaxValue) { this.timer = new IOThreadTimer(timerCallback, this, false); this.timer.Set(timeout); } } public static bool End(IAsyncResult result, out T value) { AsyncQueueReader readerResult = AsyncResult.End (result); if (readerResult.expired) { value = default(T); return false; } else { value = readerResult.item; return true; } } static void TimerCallback(object state) { AsyncQueueReader thisPtr = (AsyncQueueReader)state; if (thisPtr.inputQueue.RemoveReader(thisPtr)) { thisPtr.expired = true; thisPtr.Complete(false); } } public void Set(Item item) { this.item = item.Value; if (this.timer != null) { this.timer.Cancel(); } Complete(false, item.Exception); } } struct Item { T value; Exception exception; ItemDequeuedCallback dequeuedCallback; public Item(T value, ItemDequeuedCallback dequeuedCallback) : this(value, null, dequeuedCallback) { } public Item(Exception exception, ItemDequeuedCallback dequeuedCallback) : this(null, exception, dequeuedCallback) { } Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback) { this.value = value; this.exception = exception; this.dequeuedCallback = dequeuedCallback; } public Exception Exception { get { return this.exception; } } public T Value { get { return value; } } public ItemDequeuedCallback DequeuedCallback { get { return dequeuedCallback; } } public void Dispose() { if (value != null) { if (value is IDisposable) ((IDisposable)value).Dispose(); else if (value is ICommunicationObject) ((ICommunicationObject)value).Abort(); } } public T GetValue() { if (this.exception != null) { throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.exception); } return this.value; } } class WaitQueueWaiter : IQueueWaiter { bool itemAvailable; ManualResetEvent waitEvent; public WaitQueueWaiter() { waitEvent = new ManualResetEvent(false); } public void Set(bool itemAvailable) { lock (this) { this.itemAvailable = itemAvailable; waitEvent.Set(); } } public bool Wait(TimeSpan timeout) { if (!TimeoutHelper.WaitOne(waitEvent, timeout, false)) { return false; } return this.itemAvailable; } } class AsyncQueueWaiter : AsyncResult, IQueueWaiter { static WaitCallback timerCallback = new WaitCallback(AsyncQueueWaiter.TimerCallback); IOThreadTimer timer; bool itemAvailable; object thisLock = new object(); public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { if (timeout != TimeSpan.MaxValue) { this.timer = new IOThreadTimer(timerCallback, this, false); this.timer.Set(timeout); } } object ThisLock { get { return this.thisLock; } } public static bool End(IAsyncResult result) { AsyncQueueWaiter waiterResult = AsyncResult.End (result); return waiterResult.itemAvailable; } static void TimerCallback(object state) { AsyncQueueWaiter _this = (AsyncQueueWaiter)state; _this.Complete(false); } public void Set(bool itemAvailable) { bool timely; lock (ThisLock) { timely = (this.timer == null) || this.timer.Cancel(); this.itemAvailable = itemAvailable; } if (timely) Complete(false); } } class ItemQueue { Item[] items; int head; int pendingCount; int totalCount; public ItemQueue() { items = new Item[1]; } public Item DequeueAvailableItem() { if (totalCount == pendingCount) { DiagnosticUtility.DebugAssert("ItemQueue does not contain any available items"); throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false); } return DequeueItemCore(); } public Item DequeueAnyItem() { if (pendingCount == totalCount) pendingCount--; return DequeueItemCore(); } void EnqueueItemCore(Item item) { if (totalCount == items.Length) { Item[] newItems = new Item[items.Length * 2]; for (int i = 0; i < totalCount; i++) newItems[i] = items[(head + i) % items.Length]; head = 0; items = newItems; } int tail = (head + totalCount) % items.Length; items[tail] = item; totalCount++; } Item DequeueItemCore() { if (totalCount == 0) { DiagnosticUtility.DebugAssert("ItemQueue does not contain any items"); throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false); } Item item = items[head]; items[head] = new Item(); totalCount--; head = (head + 1) % items.Length; return item; } public void EnqueuePendingItem(Item item) { EnqueueItemCore(item); pendingCount++; } public void EnqueueAvailableItem(Item item) { EnqueueItemCore(item); } public void MakePendingItemAvailable() { if (pendingCount == 0) { DiagnosticUtility.DebugAssert("ItemQueue does not contain any pending items"); throw DiagnosticUtility.ExceptionUtility.ThrowHelperInternal(false); } pendingCount--; } public bool HasAvailableItem { get { return totalCount > pendingCount; } } public bool HasAnyItem { get { return totalCount > 0; } } public int ItemCount { get { return totalCount; } } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // Copyright (c) Microsoft Corporation. All rights reserved.
Link Menu
This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- WebPartUserCapability.cs
- MarginsConverter.cs
- GridViewSelectEventArgs.cs
- EntityChangedParams.cs
- DesignTimeVisibleAttribute.cs
- MultiBindingExpression.cs
- securitycriticaldataformultiplegetandset.cs
- TextStore.cs
- BulletChrome.cs
- SpecularMaterial.cs
- NotConverter.cs
- COAUTHIDENTITY.cs
- ExpressionEditorSheet.cs
- WizardPanel.cs
- Win32Exception.cs
- OnOperation.cs
- Int64Animation.cs
- InvalidAsynchronousStateException.cs
- SerialReceived.cs
- DesignOnlyAttribute.cs
- SyndicationSerializer.cs
- SeparatorAutomationPeer.cs
- DoubleLink.cs
- PerformanceCounterLib.cs
- DesignBindingPicker.cs
- RoleGroup.cs
- XmlComplianceUtil.cs
- SmtpTransport.cs
- DrawTreeNodeEventArgs.cs
- BamlVersionHeader.cs
- mansign.cs
- StrongNameMembershipCondition.cs
- MethodBuilder.cs
- GeometryGroup.cs
- GenericTypeParameterBuilder.cs
- RenderingEventArgs.cs
- ContentPlaceHolder.cs
- ObjectQueryState.cs
- SqlInternalConnection.cs
- WebPartsSection.cs
- ConfigsHelper.cs
- ComplexType.cs
- BitmapScalingModeValidation.cs
- AccessKeyManager.cs
- BezierSegment.cs
- PropertyEmitterBase.cs
- EntityDataReader.cs
- IItemContainerGenerator.cs
- HttpRuntimeSection.cs
- TypeDelegator.cs
- DmlSqlGenerator.cs
- ProxyWebPart.cs
- ProcessInfo.cs
- ResourceDictionaryCollection.cs
- ResolveDuplexCD1AsyncResult.cs
- ListViewHitTestInfo.cs
- StackBuilderSink.cs
- Set.cs
- StyleBamlTreeBuilder.cs
- WebHttpEndpoint.cs
- PersianCalendar.cs
- RowsCopiedEventArgs.cs
- PreDigestedSignedInfo.cs
- KnownTypes.cs
- SplineQuaternionKeyFrame.cs
- BuildTopDownAttribute.cs
- UpdateProgress.cs
- _HeaderInfo.cs
- DiscardableAttribute.cs
- VisualTarget.cs
- GeneralTransform3DGroup.cs
- Debugger.cs
- TrackingStringDictionary.cs
- Image.cs
- ProxyManager.cs
- OverlappedAsyncResult.cs
- HttpApplication.cs
- CacheHelper.cs
- ClientCultureInfo.cs
- StringReader.cs
- DetailsViewPagerRow.cs
- ApplicationSecurityManager.cs
- TemplateNameScope.cs
- GridViewColumn.cs
- WebPartsPersonalization.cs
- JavascriptXmlWriterWrapper.cs
- BaseCollection.cs
- ProcessModelSection.cs
- ListViewItem.cs
- ArglessEventHandlerProxy.cs
- _RegBlobWebProxyDataBuilder.cs
- CodeChecksumPragma.cs
- TransportOutputChannel.cs
- ValueTypeFieldReference.cs
- ComponentEvent.cs
- ResourceWriter.cs
- CodePageUtils.cs
- RightsManagementEncryptionTransform.cs
- BooleanToVisibilityConverter.cs
- WebPartConnectionCollection.cs