Code:
/ 4.0 / 4.0 / DEVDIV_TFS / Dev10 / Releases / RTMRel / ndp / clr / src / ManagedLibraries / Remoting / Channels / CORE / RequestQueue.cs / 1305376 / RequestQueue.cs
// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // // Request Queue // queues up the requests to avoid thread pool starvation, // making sure that there are always available threads to process requests //// This code has been taken from the XSP code base and modified for Remoting. // namespace System.Runtime.Remoting.Channels { using System.Threading; using System.Collections; internal class RequestQueue { // configuration params private int _minExternFreeThreads; private int _minLocalFreeThreads; private int _queueLimit; // two queues -- one for local requests, one for external private Queue _localQueue = new Queue(); private Queue _externQueue = new Queue(); // total count private int _count; // work items queued to pick up new work private WaitCallback _workItemCallback; private int _workItemCount; private const int _workItemLimit = 2; // helpers private static bool IsLocal(SocketHandler sh) { return sh.IsLocal(); } private void QueueRequest(SocketHandler sh, bool isLocal) { lock (this) { if (isLocal) _localQueue.Enqueue(sh); else _externQueue.Enqueue(sh); _count++; } } private SocketHandler DequeueRequest(bool localOnly) { Object sh = null; if (_count > 0) { lock (this) { if (_localQueue.Count > 0) { sh = _localQueue.Dequeue(); _count--; } else if (!localOnly && _externQueue.Count > 0) { sh = _externQueue.Dequeue(); _count--; } } } return (SocketHandler)sh; } // ctor internal RequestQueue(int minExternFreeThreads, int minLocalFreeThreads, int queueLimit) { _minExternFreeThreads = minExternFreeThreads; _minLocalFreeThreads = minLocalFreeThreads; _queueLimit = queueLimit; _workItemCallback = new WaitCallback(this.WorkItemCallback); } // method called to process the next request internal void ProcessNextRequest(SocketHandler sh) { sh = GetRequestToExecute(sh); if (sh != null) sh.ProcessRequestNow(); } // ProcessNextRequest // method called when data arrives for incoming requests internal SocketHandler GetRequestToExecute(SocketHandler sh) { int workerThreads, ioThreads; ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads); int freeThreads = (ioThreads > workerThreads) ? workerThreads : ioThreads; // fast path when there are threads available and nothing queued if (freeThreads >= _minExternFreeThreads && _count == 0) return sh; bool isLocal = IsLocal(sh); // fast path when there are threads for local requests available and nothing queued if (isLocal && freeThreads >= _minLocalFreeThreads && _count == 0) return sh; // reject if queue limit exceeded if (_count >= _queueLimit) { sh.RejectRequestNowSinceServerIsBusy(); return null; } // can't execute the current request on the current thread -- need to queue QueueRequest(sh, isLocal); // maybe can execute a request previously queued if (freeThreads >= _minExternFreeThreads) { sh = DequeueRequest(false); // enough threads to process even external requests } else if (freeThreads >= _minLocalFreeThreads) { sh = DequeueRequest(true); // enough threads to process only local requests } else{ sh = null; } if (sh == null){ // not enough threads -> do nothing on this thread ScheduleMoreWorkIfNeeded(); // try to schedule to worker thread } return sh; } // method called from SocketHandler at the end of request internal void ScheduleMoreWorkIfNeeded() { // is queue empty? if (_count == 0) return; // already scheduled enough work items if (_workItemCount >= _workItemLimit) return; // queue the work item Interlocked.Increment(ref _workItemCount); ThreadPool.UnsafeQueueUserWorkItem(_workItemCallback, null); } // method called to pick up more work private void WorkItemCallback(Object state) { Interlocked.Decrement(ref _workItemCount); // is queue empty? if (_count == 0) return; int workerThreads, ioThreads; ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads); bool bHandledRequest = false; // service another request if enough worker threads are available if (workerThreads >= _minLocalFreeThreads) { // pick up request from the queue SocketHandler sh = DequeueRequest(workerThreads < _minExternFreeThreads); if (sh != null) { sh.ProcessRequestNow(); bHandledRequest = true; } } if (!bHandledRequest){ Thread.Sleep(250); ScheduleMoreWorkIfNeeded(); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007. // ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // // Request Queue // queues up the requests to avoid thread pool starvation, // making sure that there are always available threads to process requests //// This code has been taken from the XSP code base and modified for Remoting. // namespace System.Runtime.Remoting.Channels { using System.Threading; using System.Collections; internal class RequestQueue { // configuration params private int _minExternFreeThreads; private int _minLocalFreeThreads; private int _queueLimit; // two queues -- one for local requests, one for external private Queue _localQueue = new Queue(); private Queue _externQueue = new Queue(); // total count private int _count; // work items queued to pick up new work private WaitCallback _workItemCallback; private int _workItemCount; private const int _workItemLimit = 2; // helpers private static bool IsLocal(SocketHandler sh) { return sh.IsLocal(); } private void QueueRequest(SocketHandler sh, bool isLocal) { lock (this) { if (isLocal) _localQueue.Enqueue(sh); else _externQueue.Enqueue(sh); _count++; } } private SocketHandler DequeueRequest(bool localOnly) { Object sh = null; if (_count > 0) { lock (this) { if (_localQueue.Count > 0) { sh = _localQueue.Dequeue(); _count--; } else if (!localOnly && _externQueue.Count > 0) { sh = _externQueue.Dequeue(); _count--; } } } return (SocketHandler)sh; } // ctor internal RequestQueue(int minExternFreeThreads, int minLocalFreeThreads, int queueLimit) { _minExternFreeThreads = minExternFreeThreads; _minLocalFreeThreads = minLocalFreeThreads; _queueLimit = queueLimit; _workItemCallback = new WaitCallback(this.WorkItemCallback); } // method called to process the next request internal void ProcessNextRequest(SocketHandler sh) { sh = GetRequestToExecute(sh); if (sh != null) sh.ProcessRequestNow(); } // ProcessNextRequest // method called when data arrives for incoming requests internal SocketHandler GetRequestToExecute(SocketHandler sh) { int workerThreads, ioThreads; ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads); int freeThreads = (ioThreads > workerThreads) ? workerThreads : ioThreads; // fast path when there are threads available and nothing queued if (freeThreads >= _minExternFreeThreads && _count == 0) return sh; bool isLocal = IsLocal(sh); // fast path when there are threads for local requests available and nothing queued if (isLocal && freeThreads >= _minLocalFreeThreads && _count == 0) return sh; // reject if queue limit exceeded if (_count >= _queueLimit) { sh.RejectRequestNowSinceServerIsBusy(); return null; } // can't execute the current request on the current thread -- need to queue QueueRequest(sh, isLocal); // maybe can execute a request previously queued if (freeThreads >= _minExternFreeThreads) { sh = DequeueRequest(false); // enough threads to process even external requests } else if (freeThreads >= _minLocalFreeThreads) { sh = DequeueRequest(true); // enough threads to process only local requests } else{ sh = null; } if (sh == null){ // not enough threads -> do nothing on this thread ScheduleMoreWorkIfNeeded(); // try to schedule to worker thread } return sh; } // method called from SocketHandler at the end of request internal void ScheduleMoreWorkIfNeeded() { // is queue empty? if (_count == 0) return; // already scheduled enough work items if (_workItemCount >= _workItemLimit) return; // queue the work item Interlocked.Increment(ref _workItemCount); ThreadPool.UnsafeQueueUserWorkItem(_workItemCallback, null); } // method called to pick up more work private void WorkItemCallback(Object state) { Interlocked.Decrement(ref _workItemCount); // is queue empty? if (_count == 0) return; int workerThreads, ioThreads; ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads); bool bHandledRequest = false; // service another request if enough worker threads are available if (workerThreads >= _minLocalFreeThreads) { // pick up request from the queue SocketHandler sh = DequeueRequest(workerThreads < _minExternFreeThreads); if (sh != null) { sh.ProcessRequestNow(); bHandledRequest = true; } } if (!bHandledRequest){ Thread.Sleep(250); ScheduleMoreWorkIfNeeded(); } } } } // File provided for Reference Use Only by Microsoft Corporation (c) 2007.
Link Menu

This book is available now!
Buy at Amazon US or
Buy at Amazon UK
- Throw.cs
- BamlReader.cs
- PauseStoryboard.cs
- XmlAttributeCollection.cs
- ClientSideQueueItem.cs
- UnitControl.cs
- DataGridViewAutoSizeModeEventArgs.cs
- LinqDataSourceHelper.cs
- CommonProperties.cs
- AbstractExpressions.cs
- ShapingEngine.cs
- MultiTouchSystemGestureLogic.cs
- WinEventQueueItem.cs
- JournalEntry.cs
- IRCollection.cs
- ComplexPropertyEntry.cs
- AllMembershipCondition.cs
- SqlProviderUtilities.cs
- _AcceptOverlappedAsyncResult.cs
- ObjectPropertyMapping.cs
- RecognizedWordUnit.cs
- JournalEntryStack.cs
- XmlSchemaIdentityConstraint.cs
- FontCacheLogic.cs
- ReadWriteSpinLock.cs
- ContextMenuStripActionList.cs
- TokenBasedSet.cs
- RegionIterator.cs
- AppDomainFactory.cs
- SymDocumentType.cs
- BaseAsyncResult.cs
- XmlSchemaAnyAttribute.cs
- CodeActivityMetadata.cs
- SortQueryOperator.cs
- DesignTimeResourceProviderFactoryAttribute.cs
- PrimitiveCodeDomSerializer.cs
- SolidColorBrush.cs
- TimeManager.cs
- GeneratedCodeAttribute.cs
- ListViewUpdateEventArgs.cs
- ThreadAbortException.cs
- PresentationTraceSources.cs
- SQLDoubleStorage.cs
- QueryStringHandler.cs
- _Semaphore.cs
- DataGridLength.cs
- RightsManagementEncryptedStream.cs
- Context.cs
- EntityContainerEntitySet.cs
- ContextConfiguration.cs
- Sql8ConformanceChecker.cs
- QueryRewriter.cs
- SignatureToken.cs
- XmlDocumentSerializer.cs
- MessageBuffer.cs
- TrackingValidationObjectDictionary.cs
- SystemIcmpV6Statistics.cs
- ContainerUtilities.cs
- ConfigsHelper.cs
- GenericNameHandler.cs
- ResolveDuplex11AsyncResult.cs
- XmlBoundElement.cs
- UidPropertyAttribute.cs
- ObjectNotFoundException.cs
- sqlstateclientmanager.cs
- HttpDebugHandler.cs
- BrowserCapabilitiesCodeGenerator.cs
- WpfXamlMember.cs
- MessageQueuePermissionAttribute.cs
- MenuAdapter.cs
- OdbcConnectionPoolProviderInfo.cs
- NetworkInformationPermission.cs
- EncryptedData.cs
- DataBinder.cs
- MasterPageCodeDomTreeGenerator.cs
- TypefaceMetricsCache.cs
- BindingContext.cs
- CapabilitiesState.cs
- WindowsSecurityTokenAuthenticator.cs
- AssertSection.cs
- SchemaMapping.cs
- InvalidWMPVersionException.cs
- GeometryDrawing.cs
- HelpInfo.cs
- UnsafeNativeMethods.cs
- RuntimeHelpers.cs
- TextReader.cs
- SoapInteropTypes.cs
- ImageFormat.cs
- ClientProxyGenerator.cs
- SoapReflectionImporter.cs
- IPipelineRuntime.cs
- HitTestParameters.cs
- DetailsViewDeletedEventArgs.cs
- NativeMethodsOther.cs
- Keywords.cs
- SoapFault.cs
- MultiByteCodec.cs
- WebPartActionVerb.cs
- PathTooLongException.cs