Table of contents
No headers
/*
 * MindTouch Dream - a distributed REST framework 
 * Copyright (C) 2006-2009 MindTouch, Inc.
 * www.mindtouch.com  oss@mindtouch.com
 *
 * For community documentation and downloads visit wiki.developer.mindtouch.com;
 * please review the licensing section.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Threading;
using log4net;
using MindTouch.Collections;
using MindTouch.Tasking;

namespace MindTouch.Threading {

    /// <summary>
    /// ElasticPriorityThreadPool provides a thread pool that can have a variable number of threads going from a minimum number of reserved
    /// threads to a maximum number of parallel threads.
    /// </summary>
    /// <remarks>
    /// The threads are obtained from the DispatchThreadScheduler and shared across all other clients of the DispatchThreadScheduler.
    /// Obtained threads are released automatically if the thread pool is idle for long enough.  Reserved threads are never released.
    /// </remarks>
    public class ElasticPriorityThreadPool : IDispatchHost, IDisposable {

        //--- Constants ---

        /// <summary>
        /// Maximum number of threads that can be reserved by a single instance.
        /// </summary>
        /// <remarks>
        /// The constructor clamps its <c>minReservedThreads</c> argument to this value.
        /// </remarks>
        public const int MAX_RESERVED_THREADS = 1000;

        //--- Types ---
        /// <summary>
        /// Dispatch queue facade that binds a fixed priority level to the owning thread pool.
        /// </summary>
        private class PrioritizedThreadPool : IDispatchQueue {

            //--- Fields ---
            private readonly ElasticPriorityThreadPool _pool;
            private readonly int _priority;

            //--- Constructors ---

            /// <summary>
            /// Creates a facade for one priority level of the given pool.
            /// </summary>
            /// <param name="priority">Priority level passed along with every queued work-item.</param>
            /// <param name="pool">Thread pool that receives the work-items.</param>
            public PrioritizedThreadPool(int priority, ElasticPriorityThreadPool pool) {
                _pool = pool;
                _priority = priority;
            }

            //--- Properties ---

            /// <summary>
            /// Priority level this queue was created for.
            /// </summary>
            public int Priority {
                get {
                    return _priority;
                }
            }

            //--- Methods ---

            /// <summary>
            /// Forwards the work-item to the owning pool at this queue's priority.
            /// </summary>
            /// <param name="callback">Work-item to queue.</param>
            /// <returns>Result of the owning pool's attempt to queue the work-item.</returns>
            public bool TryQueueWorkItem(Action callback) {
                return _pool.TryQueueWorkItem(_priority, callback);
            }

            /// <summary>
            /// Queues the work-item and fails loudly if it was not accepted.
            /// </summary>
            /// <param name="callback">Work-item to queue.</param>
            /// <exception cref="NotSupportedException">The work-item was not accepted.</exception>
            public void QueueWorkItem(Action callback) {
                if(TryQueueWorkItem(callback)) {
                    return;
                }
                throw new NotSupportedException("TryQueueWorkItem failed");
            }
        }

        //--- Class Fields ---
        // incremented once per constructed instance to produce the instance id (_id)
        private static int _instanceCounter;
        private static readonly ILog _log = LogUtils.CreateLog();

        //--- Fields ---

        // lock taken around thread registration/unregistration and around every write to _threadVelocity
        private readonly object _syncRoot = new object();

        // instance number; only used to identify the pool in ToString()
        private readonly int _id = Interlocked.Increment(ref _instanceCounter);

        // shared queue of pending work-items, organized by priority
        private readonly IThreadsafePriorityQueue<Action> _inbox;

        // one dispatch queue facade per priority level; the array index is the priority
        private readonly PrioritizedThreadPool[] _prioritizedInbox;

        // idle threads kept for reuse, each paired with the result object used to hand it its next work-item
        private readonly IThreadsafeStack<KeyValuePair<DispatchThread, Result<DispatchWorkItem>>> _reservedThreads = new LockFreeStack<KeyValuePair<DispatchThread, Result<DispatchWorkItem>>>();
        private readonly int _minReservedThreads;
        private readonly int _maxParallelThreads;

        // number of threads currently registered in _activeThreads
        private int _threadCount;

        // positive: a new thread has been requested; zero: steady; negative: threads should be decommissioned
        private int _threadVelocity;

        // slots for registered threads (null marks a free slot); replaced by a larger array when all slots are taken
        private DispatchThread[] _activeThreads;
        private bool _disposed;

        //--- Constructors ---

        /// <summary>
        /// Creates a new ElasticPriorityThreadPool instance.
        /// </summary>
        /// <param name="minReservedThreads">Minimum number of threads to reserve for the thread pool; clamped to the range 0 to <see cref="MAX_RESERVED_THREADS"/>.</param>
        /// <param name="maxParallelThreads">Maximum number of parallel threads used by the thread pool; raised to at least 1 and to at least the <paramref name="minReservedThreads"/> value as passed in.</param>
        /// <param name="maxPriority">Number of priority levels; one prioritized dispatch queue is created for each priority from 0 to <paramref name="maxPriority"/> - 1. The same value is handed to the shared priority queue.</param>
        /// <exception cref="InsufficientResourcesException">The ElasticPriorityThreadPool instance was unable to obtain the minimum reserved threads.</exception>
        public ElasticPriorityThreadPool(int minReservedThreads, int maxParallelThreads, int maxPriority) {

            // clamp the requested limits
            int reserved = Math.Min(minReservedThreads, MAX_RESERVED_THREADS);
            _minReservedThreads = Math.Max(0, reserved);
            int parallelFloor = Math.Max(1, minReservedThreads);
            _maxParallelThreads = Math.Max(parallelFloor, maxParallelThreads);

            // create the shared inbox and one dispatch queue facade per priority level
            _inbox = new LockFreePriorityQueue<Action>(maxPriority);
            PrioritizedThreadPool[] queues = new PrioritizedThreadPool[maxPriority];
            for(int level = 0; level < queues.Length; ++level) {
                queues[level] = new PrioritizedThreadPool(level, this);
            }
            _prioritizedInbox = queues;

            // size the initial slot array: room for the reserved threads or up to 16 threads, never more than the parallel limit
            int initialSlots = Math.Max(_minReservedThreads, Math.Min(16, _maxParallelThreads));
            _activeThreads = new DispatchThread[Math.Min(_maxParallelThreads, initialSlots)];

            // request the reserved threads before registering with the scheduler
            if(_minReservedThreads > 0) {
                DispatchThreadScheduler.RequestThread(_minReservedThreads, AddThread);
            }
            DispatchThreadScheduler.RegisterHost(this);
            _log.DebugFormat("Create @{0}", this);
        }

        //--- Properties ---

        /// <summary>
        /// Minimum number of reserved threads; reports 0 once the thread pool has been disposed.
        /// </summary>
        public int MinReservedThreads {
            get {
                if(_disposed) {
                    return 0;
                }
                return _minReservedThreads;
            }
        }

        /// <summary>
        /// Maximum number of parallel threads, as computed by the constructor.
        /// </summary>
        public int MaxParallelThreads {
            get {
                return _maxParallelThreads;
            }
        }

        /// <summary>
        /// Number of threads currently registered with the thread pool.
        /// </summary>
        public int ThreadCount {
            get {
                return _threadCount;
            }
        }

        /// <summary>
        /// Number of items pending for execution: the items in the shared inbox plus the pending items of every registered thread.
        /// </summary>
        public int WorkItemCount {
            get {
                int total = _inbox.Count;

                // work on a snapshot of the slot array since the field may be swapped for a larger array
                DispatchThread[] snapshot = _activeThreads;
                for(int i = 0; i < snapshot.Length; ++i) {
                    DispatchThread current = snapshot[i];
                    if(current == null) {
                        continue;
                    }
                    total += current.PendingWorkItemCount;
                }
                return total;
            }
        }

        /// <summary>
        /// Max priority for work items, as reported by the shared inbox.
        /// </summary>
        public int MaxPriority {
            get {
                return _inbox.MaxPriority;
            }
        }

        /// <summary>
        /// Accessor for prioritized dispatch queue.
        /// </summary>
        /// <param name="priority">Dispatch queue priority level; must be a valid index into the per-priority queues created by the constructor.</param>
        /// <returns>Prioritized dispatch queue.</returns>
        /// <exception cref="ObjectDisposedException">The thread pool has already been disposed.</exception>
        public IDispatchQueue this[int priority] {
            get {
                if(_disposed) {

                    // NOTE: the single-string constructor interprets its argument as the object name, not as the message;
                    //       use the (objectName, message) overload so both are reported correctly
                    throw new ObjectDisposedException(GetType().FullName, "ElasticPriorityThreadPool has already been disposed");
                }
                return _prioritizedInbox[priority];
            }
        }

        //--- Methods ---

        /// <summary>
        /// Shutdown the ElasticPriorityThreadPool instance.  This method blocks until no thread is registered with the pool anymore,
        /// then releases the reserved threads and unregisters the pool from the scheduler.
        /// </summary>
        public void Dispose() {

            // only the first call does any work
            if(_disposed) {
                return;
            }
            _disposed = true;
            _log.DebugFormat("Dispose @{0}", this);

            // TODO (steveb): make dispose more reliable
            // 1) we can't wait indefinitely!
            // 2) we should progressively sleep longer and longer to avoid unnecessary overhead
            // 3) this pattern feels useful enough to be captured into a helper method

            // poll until every registered thread has been decommissioned
            for(int remaining = ThreadCount; remaining > 0; remaining = ThreadCount) {
                Thread.Sleep(100);
            }

            // hand all reserved threads back to the scheduler
            KeyValuePair<DispatchThread, Result<DispatchWorkItem>> idle;
            while(_reservedThreads.TryPop(out idle)) {
                DispatchThreadScheduler.ReleaseThread(idle.Key, idle.Value);
            }
            DispatchThreadScheduler.UnregisterHost(this);
        }

        /// <summary>
        /// Describes the thread pool: instance id, thread counts, velocity, limits, pending items, and max priority.
        /// </summary>
        /// <returns>String.</returns>
        public override string ToString() {

            // values are listed in placeholder-index order, not in the order they appear in the text
            object[] values = new object[] {
                _id,
                _threadCount,
                _threadVelocity,
                _minReservedThreads,
                _maxParallelThreads,
                _reservedThreads.Count,
                WorkItemCount,
                _inbox.MaxPriority
            };
            return string.Format("ElasticPriorityThreadPool @{0} (current: {1}, reserve: {5}, velocity: {2}, min: {3}, max: {4}, items: {6}, max-priority: {7})", values);
        }

        /// <summary>
        /// Attempts to schedule a work-item at the given priority.
        /// </summary>
        /// <remarks>
        /// The work-item is offered, in order, to the current dispatch thread, to a reserved thread, and finally to the shared inbox.
        /// </remarks>
        /// <param name="priority">Priority level of the work-item; selects the prioritized dispatch queue and the inbox priority.</param>
        /// <param name="callback">Work-item to execute.</param>
        /// <returns><c>true</c> if the work-item was accepted; <c>false</c> if the shared inbox rejected it.</returns>
        /// <exception cref="ObjectDisposedException">The thread pool has already been disposed.</exception>
        private bool TryQueueWorkItem(int priority, Action callback) {
            if(_disposed) {

                // NOTE: the single-string constructor interprets its argument as the object name, not as the message;
                //       use the (objectName, message) overload and name the correct class
                throw new ObjectDisposedException(GetType().FullName, "ElasticPriorityThreadPool has already been disposed");
            }

            // check if we can enqueue work-item into current dispatch thread
            IDispatchQueue queue = this[priority];
            if(DispatchThread.TryQueueWorkItem(queue, callback)) {
                return true;
            }

            // check if there are available threads to which the work-item can be given to
            KeyValuePair<DispatchThread, Result<DispatchWorkItem>> entry;
            if(_reservedThreads.TryPop(out entry)) {
                lock(_syncRoot) {
                    RegisterThread("new item", entry.Key);
                }

                // found an available thread, let's resume it with the work-item
                entry.Value.Return(new DispatchWorkItem(callback, queue));
                return true;
            }

            // no threads available, keep work-item for later
            if(!_inbox.TryEnqueue(priority, callback)) {
                return false;
            }

            // check if we need to request a thread to kick things off
            if(ThreadCount == 0) {
                ((IDispatchHost)this).IncreaseThreadCount("request first thread");
            }
            return true;
        }

        /// <summary>
        /// Receives a thread (from the scheduler or from the reserve) and either resumes it with a pending work-item
        /// or gives it up again when there is nothing for it to do.
        /// </summary>
        /// <param name="keyvalue">The thread paired with the result object used to hand it a work-item.</param>
        private void AddThread(KeyValuePair<DispatchThread, Result<DispatchWorkItem>> keyvalue) {
            DispatchThread thread = keyvalue.Key;
            Result<DispatchWorkItem> result = keyvalue.Value;

            // a negative velocity means the pool is trying to shed threads; don't take on a new one
            if(_threadVelocity >= 0) {
                lock(_syncRoot) {
                    _threadVelocity = 0;

                    // check if an item is available for dispatch
                    int priority;
                    Action callback;
                    if(TryRequestItem(null, out priority, out callback)) {
                        RegisterThread("new thread", thread);

                        // dispatch work-item
                        // NOTE: index the queue array directly instead of using the public indexer; the indexer throws once
                        //       the pool is being disposed, which would drop the dequeued item and leave the thread
                        //       registered forever, preventing Dispose() from completing
                        result.Return(new DispatchWorkItem(callback, _prioritizedInbox[priority]));
                        return;
                    }
                }
            }

            // we have no need for this thread
            RemoveThread("insufficient work for new thread", thread, result);
        }

        /// <summary>
        /// Takes a thread out of the set of registered threads and either parks it in the reserve or returns it to the scheduler.
        /// </summary>
        /// <param name="reason">Description of why the thread is removed; passed along to the unregistration step.</param>
        /// <param name="thread">Thread to remove; must not have pending work-items.</param>
        /// <param name="result">Result object used to hand the thread its next work-item.</param>
        /// <exception cref="ArgumentNullException"><paramref name="thread"/> or <paramref name="result"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="thread"/> still has pending work-items.</exception>
        /// <exception cref="NotSupportedException">The thread could not be pushed onto the reserve.</exception>
        private void RemoveThread(string reason, DispatchThread thread, Result<DispatchWorkItem> result) {

            // validate arguments
            if(thread == null) {
                throw new ArgumentNullException("thread");
            }
            if(result == null) {
                throw new ArgumentNullException("result");
            }
            if(thread.PendingWorkItemCount != 0) {
                string message = string.Format("thread #{1} still has work-items in queue (items: {0})", thread.PendingWorkItemCount, thread.Id);
                throw new ArgumentException(message, "thread");
            }

            // the thread no longer counts as allocated; also reset the velocity
            lock(_syncRoot) {
                _threadVelocity = 0;
                UnregisterThread(reason, thread);
            }

            // reserve is full (or pool is disposed): return thread to resource manager
            if(_reservedThreads.Count >= MinReservedThreads) {
                DispatchThreadScheduler.ReleaseThread(thread, result);
                return;
            }

            // otherwise keep the thread around for later reuse
            KeyValuePair<DispatchThread, Result<DispatchWorkItem>> idle = new KeyValuePair<DispatchThread, Result<DispatchWorkItem>>(thread, result);
            if(!_reservedThreads.TryPush(idle)) {
                throw new NotSupportedException("TryPush failed");
            }
        }

        /// <summary>
        /// Looks for a work-item to run: first in the shared inbox, then by stealing from another registered thread,
        /// and finally in the shared inbox once more.
        /// </summary>
        /// <param name="thread">Thread asking for work; it is skipped when stealing. May be null.</param>
        /// <param name="priority">Priority of the work-item that was found.</param>
        /// <param name="callback">Work-item that was found.</param>
        /// <returns><c>true</c> if a work-item was found; otherwise <c>false</c>.</returns>
        private bool TryRequestItem(DispatchThread thread, out int priority, out Action callback) {

            // the shared queue is the first place to look
            if(_inbox.TryDequeue(out priority, out callback)) {
                return true;
            }

            // next, try stealing from the other threads; iterate over a snapshot since the array may be swapped for a larger one
            DispatchThread[] snapshot = _activeThreads;
            for(int i = 0; i < snapshot.Length; ++i) {
                DispatchThread candidate = snapshot[i];

                // skip empty slots and the requesting thread itself
                if((candidate == null) || ReferenceEquals(candidate, thread)) {
                    continue;
                }
                if(candidate.TryStealWorkItem(out callback)) {

                    // a stolen item inherits the priority of the queue the victim thread is working for
                    PrioritizedThreadPool source = (PrioritizedThreadPool)candidate.DispatchQueue;
                    priority = source.Priority;
                    return true;
                }
            }

            // a new item may have arrived in the shared queue while we were trying to steal; look one last time
            return _inbox.TryDequeue(out priority, out callback);
        }

        /// <summary>
        /// Marks a thread as belonging to this pool and stores it in a free slot, growing the slot array when it is full.
        /// </summary>
        /// <param name="reason">Description of why the thread is added; only used for EXTRA_DEBUG logging.</param>
        /// <param name="thread">Thread to register.</param>
        private void RegisterThread(string reason, DispatchThread thread) {
            ++_threadCount;
            thread.Host = this;

            // scan for the first free slot
            DispatchThread[] slots = _activeThreads;
            int free = 0;
            while((free < slots.Length) && (slots[free] != null)) {
                ++free;
            }
            if(free < slots.Length) {

                // found a free slot; take it
                slots[free] = thread;
            } else {

                // all slots are taken: build an array twice as large, carry over the existing entries,
                // add the new thread, and only then publish the new array
                DispatchThread[] grown = new DispatchThread[2 * slots.Length];
                Array.Copy(slots, grown, slots.Length);
                grown[free] = thread;
                _activeThreads = grown;
            }
#if EXTRA_DEBUG
            _log.DebugFormat("AddThread: {1} - {0}", this, reason);
#endif
        }

        /// <summary>
        /// Detaches a thread from this pool and frees its slot, if it occupies one.
        /// </summary>
        /// <param name="reason">Description of why the thread is removed; only used for EXTRA_DEBUG logging.</param>
        /// <param name="thread">Thread to unregister.</param>
        private void UnregisterThread(string reason, DispatchThread thread) {
            thread.Host = null;

            // locate the slot holding the thread
            DispatchThread[] slots = _activeThreads;
            int slot = 0;
            while((slot < slots.Length) && !ReferenceEquals(slots[slot], thread)) {
                ++slot;
            }

            // thread was never registered; the thread count stays as it is
            if(slot == slots.Length) {
                return;
            }
            --_threadCount;
            slots[slot] = null;
#if EXTRA_DEBUG
            _log.DebugFormat("RemoveThread: {1} - {0}", this, reason);
#endif
        }

        //--- IDispatchHost Members ---
        // number of pending work-items, widened from WorkItemCount
        long IDispatchHost.PendingWorkItemCount {
            get {
                return WorkItemCount;
            }
        }

        // minimum thread count is the number of reserved threads (0 once disposed)
        int IDispatchHost.MinThreadCount {
            get {
                return MinReservedThreads;
            }
        }

        // maximum thread count is the parallel thread limit
        int IDispatchHost.MaxThreadCount {
            get {
                return _maxParallelThreads;
            }
        }

        /// <summary>
        /// Called on behalf of a thread of this pool that has run out of work. The thread is either resumed with another
        /// work-item or removed from the pool.
        /// </summary>
        /// <param name="thread">Thread requesting work; must belong to this pool and have no pending work-items.</param>
        /// <param name="result">Result object used to hand the thread its next work-item.</param>
        /// <exception cref="ArgumentNullException"><paramref name="thread"/> or <paramref name="result"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="thread"/> still has pending work-items.</exception>
        /// <exception cref="InvalidOperationException"><paramref name="thread"/> is allocated to another host.</exception>
        void IDispatchHost.RequestWorkItem(DispatchThread thread, Result<DispatchWorkItem> result) {
            if(thread == null) {
                throw new ArgumentNullException("thread");
            }
            if(thread.PendingWorkItemCount > 0) {
                throw new ArgumentException(string.Format("thread #{1} still has work-items in queue (items: {0})", thread.PendingWorkItemCount, thread.Id), "thread");
            }
            if(!ReferenceEquals(thread.Host, this)) {
                throw new InvalidOperationException(string.Format("thread is allocated to another queue: received {0}, expected: {1}", thread.Host, this));
            }
            if(result == null) {
                throw new ArgumentNullException("result");
            }

            // check if we need to decommission threads without causing starvation
            if(_threadVelocity < 0) {
                RemoveThread("system saturation", thread, result);
                return;
            }

            // check if we found a work-item
            int priority;
            Action callback;
            if(TryRequestItem(thread, out priority, out callback)) {

                // dispatch work-item
                // NOTE: index the queue array directly instead of using the public indexer; the indexer throws once
                //       the pool is being disposed, which would drop the dequeued item and keep the thread registered,
                //       so the pending items could never drain and Dispose() could never complete
                result.Return(new DispatchWorkItem(callback, _prioritizedInbox[priority]));
            } else {

                // relinquish thread; it's not required anymore
                RemoveThread("insufficient work", thread, result);
            }
        }

        /// <summary>
        /// Asks the pool to take on one more thread, unless a request is already outstanding or the pool is at its parallel thread limit.
        /// </summary>
        /// <param name="reason">Description of why more threads are wanted; only used for EXTRA_DEBUG logging.</param>
        void IDispatchHost.IncreaseThreadCount(string reason) {

            // check if thread pool is already awaiting another thread
            // (unlocked read; a positive velocity marks an outstanding request)
            if(_threadVelocity > 0) {
                return;
            }
            lock(_syncRoot) {

                // mark the request as outstanding; AddThread/RemoveThread reset the velocity to 0 when the thread arrives
                _threadVelocity = 1;

                // check if thread pool has enough threads
                if(_threadCount >= _maxParallelThreads) {
                    _threadVelocity = 0;
                    return;
                }
#if EXTRA_DEBUG
                _log.DebugFormat("IncreaseThreadCount: {1} - {0}", this, reason);
#endif
            }

            // check if there are threads in the reserve
            // (a reserved thread is handed to AddThread directly; otherwise the scheduler is asked to deliver one to AddThread)
            KeyValuePair<DispatchThread, Result<DispatchWorkItem>> reservedThread;
            if(_reservedThreads.TryPop(out reservedThread)) {
                AddThread(reservedThread);
            } else {

                // NOTE(review): the first argument is 0 here while the constructor passes the reserved thread count; presumably it is a minimum/guaranteed count -- confirm against DispatchThreadScheduler
                DispatchThreadScheduler.RequestThread(0, AddThread);
            }
        }

        /// <summary>
        /// Asks the pool to neither grow nor shrink by resetting the thread velocity to zero.
        /// </summary>
        /// <param name="reason">Description of why the thread count should hold steady; only used for EXTRA_DEBUG logging.</param>
        void IDispatchHost.MaintainThreadCount(string reason) {

            // only take the lock when the pool is not already steady
            if(_threadVelocity != 0) {
                lock(_syncRoot) {
                    _threadVelocity = 0;
#if EXTRA_DEBUG
                    _log.DebugFormat("MaintainThreadCount: {1} - {0}", this, reason);
#endif
                }
            }
        }

        /// <summary>
        /// Asks the pool to shed threads by setting the thread velocity to a negative value.
        /// </summary>
        /// <param name="reason">Description of why threads should be discarded; only used for EXTRA_DEBUG logging.</param>
        void IDispatchHost.DecreaseThreadCount(string reason) {

            // only take the lock when the pool is not already discarding threads
            if(_threadVelocity >= 0) {
                lock(_syncRoot) {
                    _threadVelocity = -1;
#if EXTRA_DEBUG
                    _log.DebugFormat("DecreaseThreadCount: {1} - {0}", this, reason);
#endif
                }
            }
        }
    }
}
Tag page (Edit tags)
    You must login to post a comment.