Reintroduce missing changes

This commit is contained in:
Hugo Posnic 2017-11-18 16:25:49 +01:00
parent 648f78c139
commit 461ae9fd57

View file

@ -2,13 +2,13 @@
ThreadPool Implementation ThreadPool Implementation
@author: Morten Holdflod Moeller - morten@holdflod.dk @author: Morten Holdflod Moeller - morten@holdflod.dk
@license: LGPL v3 @license: LGPL v3
''' '''
from __future__ import with_statement from __future__ import with_statement
from threading import Thread, RLock from threading import Thread, RLock
from time import sleep from time import sleep
from Queue import Queue, Empty from queue import Queue, Empty
import logging import logging
import sys import sys
@ -31,7 +31,7 @@ class ThreadPoolMixIn:
self.__private_threadpool = True self.__private_threadpool = True
else: else:
self.__private_threadpool = False self.__private_threadpool = False
self.__threadpool = threadpool self.__threadpool = threadpool
def process_request_thread(self, request, client_address): def process_request_thread(self, request, client_address):
@ -48,11 +48,11 @@ class ThreadPoolMixIn:
self.close_request(request) self.close_request(request)
def process_request(self, request, client_address): def process_request(self, request, client_address):
self.__threadpool.add_job(self.process_request_thread, [request, client_address]) self.__threadpool.add_job(self.process_request_thread, [request, client_address])
def shutdown(self): def shutdown(self):
if (self.__private_threadpool): self.__threadpool.shutdown() if (self.__private_threadpool): self.__threadpool.shutdown()
class AddJobException(Exception): class AddJobException(Exception):
''' '''
@ -61,64 +61,64 @@ class AddJobException(Exception):
''' '''
def __init__(self, msg): def __init__(self, msg):
Exception.__init__(self, msg) Exception.__init__(self, msg)
class ThreadPool: class ThreadPool:
''' '''
The class implementing the ThreadPool. The class implementing the ThreadPool.
Instantiate and add jobs using add_job(func, args_list) Instantiate and add jobs using add_job(func, args_list)
''' '''
class Job: #IGNORE:R0903 class Job: #IGNORE:R0903
''' '''
Class encapsulating a job to be handled Class encapsulating a job to be handled
by ThreadPool workers by ThreadPool workers
''' '''
def __init__(self, function, args, return_callback=None): def __init__(self, function, args, return_callback=None):
self.callable = function self.callable = function
self.arguments = args self.arguments = args
self.return_callback = return_callback self.return_callback = return_callback
def execute(self): def execute(self):
''' '''
Called to execute the function Called to execute the function
''' '''
try: try:
return_value = self.callable(*self.arguments) #IGNORE:W0142 return_value = self.callable(*self.arguments) #IGNORE:W0142
except Exception, excep: #IGNORE:W0703 except Exception as excep: #IGNORE:W0703
logger = logging.getLogger("threadpool.worker") logger = logging.getLogger("threadpool.worker")
logger.warning("A job in the ThreadPool raised an exception: " + excep) logger.warning("A job in the ThreadPool raised an exception: " + excep)
#else do nothing cause we don't know what to do... #else do nothing cause we don't know what to do...
return return
try: try:
if (self.return_callback != None): if (self.return_callback != None):
self.return_callback(return_value) self.return_callback(return_value)
except Exception, _: #IGNORE:W0703 everything could go wrong... except Exception as _: #IGNORE:W0703 everything could go wrong...
logger = logging.getLogger('threadpool') logger = logging.getLogger('threadpool')
logger.warning('Error while delivering return value to callback function') logger.warning('Error while delivering return value to callback function')
class Worker(Thread): class Worker(Thread):
''' '''
A worker thread handling jobs in the thread pool A worker thread handling jobs in the thread pool
job queue job queue
''' '''
def __init__(self, pool): def __init__(self, pool):
Thread.__init__(self) Thread.__init__(self)
if (not isinstance(pool, ThreadPool)): if (not isinstance(pool, ThreadPool)):
raise TypeError("pool is not a ThreadPool instance") raise TypeError("pool is not a ThreadPool instance")
self.pool = pool self.pool = pool
self.alive = True self.alive = True
self.start() self.start()
def run(self): def run(self):
''' '''
The workers main-loop getting jobs from queue The workers main-loop getting jobs from queue
and executing them and executing them
''' '''
while self.alive: while self.alive:
@ -130,83 +130,83 @@ class ThreadPool:
self.pool.worker_inactive() self.pool.worker_inactive()
else: else:
self.alive = False self.alive = False
self.pool.punch_out() self.pool.punch_out()
def __init__(self, max_workers = 5, kill_workers_after = 3): def __init__(self, max_workers = 5, kill_workers_after = 3):
if (not isinstance(max_workers, int)): if (not isinstance(max_workers, int)):
raise TypeError("max_workers is not an int") raise TypeError("max_workers is not an int")
if (max_workers < 1): if (max_workers < 1):
raise ValueError('max_workers must be >= 1') raise ValueError('max_workers must be >= 1')
if (not isinstance(kill_workers_after, int)): if (not isinstance(kill_workers_after, int)):
raise TypeError("kill_workers_after is not an int") raise TypeError("kill_workers_after is not an int")
self.__max_workers = max_workers self.__max_workers = max_workers
self.__kill_workers_after = kill_workers_after self.__kill_workers_after = kill_workers_after
# This Queue is assumed Thread Safe # This Queue is assumed Thread Safe
self.__jobs = Queue() self.__jobs = Queue()
self.__worker_count_lock = RLock() self.__worker_count_lock = RLock()
self.__worker_count = 0 self.__worker_count = 0
self.__active_worker_count = 0 self.__active_worker_count = 0
self.__shutting_down = False self.__shutting_down = False
logger = logging.getLogger('threadpool') logger = logging.getLogger('threadpool')
logger.info('started') logger.info('started')
def shutdown(self, wait_for_workers_period = 1, clean_shutdown_reties = 5): def shutdown(self, wait_for_workers_period = 1, clean_shutdown_reties = 5):
if (not isinstance(clean_shutdown_reties, int)): if (not isinstance(clean_shutdown_reties, int)):
raise TypeError("clean_shutdown_reties is not an int") raise TypeError("clean_shutdown_reties is not an int")
if (not clean_shutdown_reties >= 0): if (not clean_shutdown_reties >= 0):
raise ValueError('clean_shutdown_reties must be >= 0') raise ValueError('clean_shutdown_reties must be >= 0')
if (not isinstance(wait_for_workers_period, int)): if (not isinstance(wait_for_workers_period, int)):
raise TypeError("wait_for_workers_period is not an int") raise TypeError("wait_for_workers_period is not an int")
if (not wait_for_workers_period >= 0): if (not wait_for_workers_period >= 0):
raise ValueError('wait_for_workers_period must be >= 0') raise ValueError('wait_for_workers_period must be >= 0')
logger = logging.getLogger("threadpool") logger = logging.getLogger("threadpool")
logger.info("shutting down") logger.info("shutting down")
with self.__worker_count_lock: with self.__worker_count_lock:
self.__shutting_down = True self.__shutting_down = True
self.__max_workers = 0 self.__max_workers = 0
self.__kill_workers_after = 0 self.__kill_workers_after = 0
retries_left = clean_shutdown_reties retries_left = clean_shutdown_reties
while (retries_left > 0): while (retries_left > 0):
with self.__worker_count_lock: with self.__worker_count_lock:
logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__worker_count)) logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__worker_count))
if (self.__worker_count > 0): if (self.__worker_count > 0):
retries_left -= 1 retries_left -= 1
else: else:
retries_left = 0 retries_left = 0
sleep(wait_for_workers_period) sleep(wait_for_workers_period)
with self.__worker_count_lock: with self.__worker_count_lock:
if (self.__worker_count > 0): if (self.__worker_count > 0):
logger.warning("shutdown stopped waiting. Still %i active workers"%self.__worker_count) logger.warning("shutdown stopped waiting. Still %i active workers"%self.__worker_count)
clean_shutdown = False clean_shutdown = False
else: else:
clean_shutdown = True clean_shutdown = True
logger.info("shutdown complete") logger.info("shutdown complete")
return clean_shutdown return clean_shutdown
def punch_out(self): def punch_out(self):
''' '''
Called by worker to update worker count Called by worker to update worker count
when the worker is shutting down when the worker is shutting down
''' '''
with self.__worker_count_lock: with self.__worker_count_lock:
self.__worker_count -= 1 self.__worker_count -= 1
def __new_worker(self): def __new_worker(self):
''' '''
Adding a new worker thread to the thread pool Adding a new worker thread to the thread pool
@ -214,58 +214,58 @@ class ThreadPool:
with self.__worker_count_lock: with self.__worker_count_lock:
ThreadPool.Worker(self) ThreadPool.Worker(self)
self.__worker_count += 1 self.__worker_count += 1
def worker_active(self): def worker_active(self):
with self.__worker_count_lock: with self.__worker_count_lock:
self.__active_worker_count = self.__active_worker_count + 1 self.__active_worker_count = self.__active_worker_count + 1
def worker_inactive(self): def worker_inactive(self):
with self.__worker_count_lock: with self.__worker_count_lock:
self.__active_worker_count = self.__active_worker_count - 1 self.__active_worker_count = self.__active_worker_count - 1
def add_job(self, function, args = None, return_callback=None): def add_job(self, function, args = None, return_callback=None):
''' '''
Put new job into queue Put new job into queue
''' '''
if (not callable(function)): if (not callable(function)):
raise TypeError("function is not a callable") raise TypeError("function is not a callable")
if (not ( args == None or isinstance(args, list))): if (not ( args == None or isinstance(args, list))):
raise TypeError("args is not a list") raise TypeError("args is not a list")
if (not (return_callback == None or callable(return_callback))): if (not (return_callback == None or callable(return_callback))):
raise TypeError("return_callback is not a callable") raise TypeError("return_callback is not a callable")
if (args == None): if (args == None):
args = [] args = []
job = ThreadPool.Job(function, args, return_callback) job = ThreadPool.Job(function, args, return_callback)
with self.__worker_count_lock: with self.__worker_count_lock:
if (self.__shutting_down): if (self.__shutting_down):
raise AddJobException("ThreadPool is shutting down") raise AddJobException("ThreadPool is shutting down")
try: try:
start_new_worker = False start_new_worker = False
if (self.__worker_count < self.__max_workers): if (self.__worker_count < self.__max_workers):
if (self.__active_worker_count == self.__worker_count): if (self.__active_worker_count == self.__worker_count):
start_new_worker = True start_new_worker = True
self.__jobs.put(job) self.__jobs.put(job)
if (start_new_worker): if (start_new_worker):
self.__new_worker() self.__new_worker()
except Exception: except Exception:
raise AddJobException("Could not add job") raise AddJobException("Could not add job")
def get_job(self): def get_job(self):
''' '''
Retrieve next job from queue Retrieve next job from queue
workers die (and should) when workers die (and should) when
returning None returning None
''' '''
job = None job = None
try: try:
if (self.__kill_workers_after < 0): if (self.__kill_workers_after < 0):
@ -276,5 +276,5 @@ class ThreadPool:
job = self.__jobs.get(True, self.__kill_workers_after) job = self.__jobs.get(True, self.__kill_workers_after)
except Empty: except Empty:
job = None job = None
return job return job