ThreadPool.py r15

bugfix release from ThreadPool.py's author
http://code.google.com/p/pythonthreadpool/issues/detail?id=4
This commit is contained in:
Kálmán Tarnay 2010-03-31 22:54:56 +02:00
parent a3a31a7e93
commit 305198e09e

View file

@ -3,9 +3,6 @@ ThreadPool Implementation
@author: Morten Holdflod Moeller - morten@holdflod.dk @author: Morten Holdflod Moeller - morten@holdflod.dk
@license: LGPL v3 @license: LGPL v3
(A buggy line commented out by Kalman Tarnay...
see: http://code.google.com/p/pythonthreadpool/issues/detail?id=4 )
''' '''
from __future__ import with_statement from __future__ import with_statement
@ -13,6 +10,17 @@ from threading import Thread, RLock
from time import sleep from time import sleep
from Queue import Queue, Empty from Queue import Queue, Empty
import logging import logging
import sys
class NullHandler(logging.Handler):
def emit(self, record):
pass
h = sys.stderr
logging.getLogger('threadpool').addHandler(h)
logging.getLogger('threadpool.worker').addHandler(h)
class ThreadPoolMixIn: class ThreadPoolMixIn:
"""Mix-in class to handle each request in a new thread from the ThreadPool.""" """Mix-in class to handle each request in a new thread from the ThreadPool."""
@ -114,9 +122,12 @@ class ThreadPool:
and executing them and executing them
''' '''
while self.alive: while self.alive:
#print self.pool.__active_worker_count, self.pool.__worker_count
job = self.pool.get_job() job = self.pool.get_job()
if (job != None): if (job != None):
self.pool.worker_active()
job.execute() job.execute()
self.pool.worker_inactive()
else: else:
self.alive = False self.alive = False
@ -137,8 +148,9 @@ class ThreadPool:
# This Queue is assumed Thread Safe # This Queue is assumed Thread Safe
self.__jobs = Queue() self.__jobs = Queue()
self.__active_workers_lock = RLock() self.__worker_count_lock = RLock()
self.__active_workers = 0 self.__worker_count = 0
self.__active_worker_count = 0
self.__shutting_down = False self.__shutting_down = False
logger = logging.getLogger('threadpool') logger = logging.getLogger('threadpool')
@ -158,7 +170,7 @@ class ThreadPool:
logger = logging.getLogger("threadpool") logger = logging.getLogger("threadpool")
logger.info("shutting down") logger.info("shutting down")
with self.__active_workers_lock: with self.__worker_count_lock:
self.__shutting_down = True self.__shutting_down = True
self.__max_workers = 0 self.__max_workers = 0
self.__kill_workers_after = 0 self.__kill_workers_after = 0
@ -166,9 +178,9 @@ class ThreadPool:
retries_left = clean_shutdown_reties retries_left = clean_shutdown_reties
while (retries_left > 0): while (retries_left > 0):
with self.__active_workers_lock: with self.__worker_count_lock:
logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__active_workers)) logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__worker_count))
if (self.__active_workers > 0): if (self.__worker_count > 0):
retries_left -= 1 retries_left -= 1
else: else:
retries_left = 0 retries_left = 0
@ -176,9 +188,9 @@ class ThreadPool:
sleep(wait_for_workers_period) sleep(wait_for_workers_period)
with self.__active_workers_lock: with self.__worker_count_lock:
if (self.__active_workers > 0): if (self.__worker_count > 0):
logger.warning("shutdown stopped waiting. Still %i active workers"%self.__active_workers) logger.warning("shutdown stopped waiting. Still %i active workers"%self.__worker_count)
clean_shutdown = False clean_shutdown = False
else: else:
clean_shutdown = True clean_shutdown = True
@ -192,16 +204,24 @@ class ThreadPool:
Called by worker to update worker count Called by worker to update worker count
when the worker is shutting down when the worker is shutting down
''' '''
with self.__active_workers_lock: with self.__worker_count_lock:
self.__active_workers -= 1 self.__worker_count -= 1
def __new_worker(self): def __new_worker(self):
''' '''
Adding a new worker thread to the thread pool Adding a new worker thread to the thread pool
''' '''
with self.__active_workers_lock: with self.__worker_count_lock:
ThreadPool.Worker(self) ThreadPool.Worker(self)
self.__active_workers += 1 self.__worker_count += 1
def worker_active(self):
with self.__worker_count_lock:
self.__active_worker_count = self.__active_worker_count + 1
def worker_inactive(self):
with self.__worker_count_lock:
self.__active_worker_count = self.__active_worker_count - 1
def add_job(self, function, args = None, return_callback=None): def add_job(self, function, args = None, return_callback=None):
''' '''
@ -220,16 +240,14 @@ class ThreadPool:
job = ThreadPool.Job(function, args, return_callback) job = ThreadPool.Job(function, args, return_callback)
with self.__active_workers_lock: with self.__worker_count_lock:
if (self.__shutting_down): if (self.__shutting_down):
raise AddJobException("ThreadPool is shutting down") raise AddJobException("ThreadPool is shutting down")
try: try:
start_new_worker = False start_new_worker = False
if (self.__active_workers < self.__max_workers): if (self.__worker_count < self.__max_workers):
#DIY fixed.... FIXME if (self.__active_worker_count == self.__worker_count):
#http://code.google.com/p/pythonthreadpool/issues/detail?id=4
#if (self.__active_workers == 0 or not self.__jobs.empty()):
start_new_worker = True start_new_worker = True
self.__jobs.put(job) self.__jobs.put(job)