diff --git a/src/trimage/ThreadPool.py b/src/trimage/ThreadPool.py index c7387d8..5c64409 100644 --- a/src/trimage/ThreadPool.py +++ b/src/trimage/ThreadPool.py @@ -3,9 +3,6 @@ ThreadPool Implementation @author: Morten Holdflod Moeller - morten@holdflod.dk @license: LGPL v3 - -(A buggy line commented out by Kalman Tarnay... -see: http://code.google.com/p/pythonthreadpool/issues/detail?id=4 ) ''' from __future__ import with_statement @@ -13,6 +10,17 @@ from threading import Thread, RLock from time import sleep from Queue import Queue, Empty import logging +import sys + +class NullHandler(logging.Handler): + def emit(self, record): + pass + +h = sys.stderr +logging.getLogger('threadpool').addHandler(h) +logging.getLogger('threadpool.worker').addHandler(h) + + class ThreadPoolMixIn: """Mix-in class to handle each request in a new thread from the ThreadPool.""" @@ -114,9 +122,12 @@ class ThreadPool: and executing them ''' while self.alive: + #print self.pool.__active_worker_count, self.pool.__worker_count job = self.pool.get_job() if (job != None): + self.pool.worker_active() job.execute() + self.pool.worker_inactive() else: self.alive = False @@ -137,8 +148,9 @@ class ThreadPool: # This Queue is assumed Thread Safe self.__jobs = Queue() - self.__active_workers_lock = RLock() - self.__active_workers = 0 + self.__worker_count_lock = RLock() + self.__worker_count = 0 + self.__active_worker_count = 0 self.__shutting_down = False logger = logging.getLogger('threadpool') @@ -158,7 +170,7 @@ class ThreadPool: logger = logging.getLogger("threadpool") logger.info("shutting down") - with self.__active_workers_lock: + with self.__worker_count_lock: self.__shutting_down = True self.__max_workers = 0 self.__kill_workers_after = 0 @@ -166,9 +178,9 @@ class ThreadPool: retries_left = clean_shutdown_reties while (retries_left > 0): - with self.__active_workers_lock: - logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__active_workers)) - if (self.__active_workers > 0): + with self.__worker_count_lock: + logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__worker_count)) + if (self.__worker_count > 0): retries_left -= 1 else: retries_left = 0 @@ -176,9 +188,9 @@ class ThreadPool: sleep(wait_for_workers_period) - with self.__active_workers_lock: - if (self.__active_workers > 0): - logger.warning("shutdown stopped waiting. Still %i active workers"%self.__active_workers) + with self.__worker_count_lock: + if (self.__worker_count > 0): + logger.warning("shutdown stopped waiting. Still %i active workers"%self.__worker_count) clean_shutdown = False else: clean_shutdown = True @@ -192,16 +204,24 @@ class ThreadPool: Called by worker to update worker count when the worker is shutting down ''' - with self.__active_workers_lock: - self.__active_workers -= 1 + with self.__worker_count_lock: + self.__worker_count -= 1 def __new_worker(self): ''' Adding a new worker thread to the thread pool ''' - with self.__active_workers_lock: + with self.__worker_count_lock: ThreadPool.Worker(self) - self.__active_workers += 1 + self.__worker_count += 1 + + def worker_active(self): + with self.__worker_count_lock: + self.__active_worker_count = self.__active_worker_count + 1 + + def worker_inactive(self): + with self.__worker_count_lock: + self.__active_worker_count = self.__active_worker_count - 1 def add_job(self, function, args = None, return_callback=None): ''' @@ -220,16 +240,14 @@ class ThreadPool: job = ThreadPool.Job(function, args, return_callback) - with self.__active_workers_lock: + with self.__worker_count_lock: if (self.__shutting_down): raise AddJobException("ThreadPool is shutting down") - + try: start_new_worker = False - if (self.__active_workers < self.__max_workers): - #DIY fixed.... FIXME - #http://code.google.com/p/pythonthreadpool/issues/detail?id=4 - #if (self.__active_workers == 0 or not self.__jobs.empty()): + if (self.__worker_count < self.__max_workers): + if (self.__active_worker_count == self.__worker_count): start_new_worker = True self.__jobs.put(job)