diff --git a/src/trimage/ThreadPool.py b/src/trimage/ThreadPool.py deleted file mode 100644 index 7f92344..0000000 --- a/src/trimage/ThreadPool.py +++ /dev/null @@ -1,280 +0,0 @@ -''' -ThreadPool Implementation - -@author: Morten Holdflod Moeller - morten@holdflod.dk -@license: LGPL v3 -''' - -from __future__ import with_statement -from threading import Thread, RLock -from time import sleep -from queue import Queue, Empty -import logging -import sys - -class NullHandler(logging.Handler): - def emit(self, record): - pass - -h = sys.stderr -logging.getLogger('threadpool').addHandler(h) -logging.getLogger('threadpool.worker').addHandler(h) - - - -class ThreadPoolMixIn: - """Mix-in class to handle each request in a new thread from the ThreadPool.""" - - def __init__(self, threadpool=None): - if (threadpool == None): - threadpool = ThreadPool() - self.__private_threadpool = True - else: - self.__private_threadpool = False - - self.__threadpool = threadpool - - def process_request_thread(self, request, client_address): - """Same as in BaseServer but as a thread. - - In addition, exception handling is done here. - - """ - try: - self.finish_request(request, client_address) - self.close_request(request) - except: - self.handle_error(request, client_address) #IGNORE:W0702 - self.close_request(request) - - def process_request(self, request, client_address): - self.__threadpool.add_job(self.process_request_thread, [request, client_address]) - - def shutdown(self): - if (self.__private_threadpool): self.__threadpool.shutdown() - - -class AddJobException(Exception): - ''' - Exceptoion raised when a Job could not be added - to the queue - ''' - def __init__(self, msg): - Exception.__init__(self, msg) - - -class ThreadPool: - ''' - The class implementing the ThreadPool. - - Instantiate and add jobs using add_job(func, args_list) - ''' - - class Job: #IGNORE:R0903 - ''' - Class encapsulating a job to be handled - by ThreadPool workers - ''' - def __init__(self, function, args, return_callback=None): - self.callable = function - self.arguments = args - self.return_callback = return_callback - - def execute(self): - ''' - Called to execute the function - ''' - try: - return_value = self.callable(*self.arguments) #IGNORE:W0142 - except Exception as excep: #IGNORE:W0703 - logger = logging.getLogger("threadpool.worker") - logger.warning("A job in the ThreadPool raised an exception: " + excep) - #else do nothing cause we don't know what to do... - return - - try: - if (self.return_callback != None): - self.return_callback(return_value) - except Exception as _: #IGNORE:W0703 everything could go wrong... - logger = logging.getLogger('threadpool') - logger.warning('Error while delivering return value to callback function') - - class Worker(Thread): - ''' - A worker thread handling jobs in the thread pool - job queue - ''' - - def __init__(self, pool): - Thread.__init__(self) - - if (not isinstance(pool, ThreadPool)): - raise TypeError("pool is not a ThreadPool instance") - - self.pool = pool - - self.alive = True - self.start() - - def run(self): - ''' - The workers main-loop getting jobs from queue - and executing them - ''' - while self.alive: - #print self.pool.__active_worker_count, self.pool.__worker_count - job = self.pool.get_job() - if (job != None): - self.pool.worker_active() - job.execute() - self.pool.worker_inactive() - else: - self.alive = False - - self.pool.punch_out() - - def __init__(self, max_workers = 5, kill_workers_after = 3): - if (not isinstance(max_workers, int)): - raise TypeError("max_workers is not an int") - if (max_workers < 1): - raise ValueError('max_workers must be >= 1') - - if (not isinstance(kill_workers_after, int)): - raise TypeError("kill_workers_after is not an int") - - self.__max_workers = max_workers - self.__kill_workers_after = kill_workers_after - - # This Queue is assumed Thread Safe - self.__jobs = Queue() - - self.__worker_count_lock = RLock() - self.__worker_count = 0 - self.__active_worker_count = 0 - - self.__shutting_down = False - logger = logging.getLogger('threadpool') - logger.info('started') - - def shutdown(self, wait_for_workers_period = 1, clean_shutdown_reties = 5): - if (not isinstance(clean_shutdown_reties, int)): - raise TypeError("clean_shutdown_reties is not an int") - if (not clean_shutdown_reties >= 0): - raise ValueError('clean_shutdown_reties must be >= 0') - - if (not isinstance(wait_for_workers_period, int)): - raise TypeError("wait_for_workers_period is not an int") - if (not wait_for_workers_period >= 0): - raise ValueError('wait_for_workers_period must be >= 0') - - logger = logging.getLogger("threadpool") - logger.info("shutting down") - - with self.__worker_count_lock: - self.__shutting_down = True - self.__max_workers = 0 - self.__kill_workers_after = 0 - - retries_left = clean_shutdown_reties - while (retries_left > 0): - - with self.__worker_count_lock: - logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__worker_count)) - if (self.__worker_count > 0): - retries_left -= 1 - else: - retries_left = 0 - - sleep(wait_for_workers_period) - - - with self.__worker_count_lock: - if (self.__worker_count > 0): - logger.warning("shutdown stopped waiting. Still %i active workers"%self.__worker_count) - clean_shutdown = False - else: - clean_shutdown = True - - logger.info("shutdown complete") - - return clean_shutdown - - def punch_out(self): - ''' - Called by worker to update worker count - when the worker is shutting down - ''' - with self.__worker_count_lock: - self.__worker_count -= 1 - - def __new_worker(self): - ''' - Adding a new worker thread to the thread pool - ''' - with self.__worker_count_lock: - ThreadPool.Worker(self) - self.__worker_count += 1 - - def worker_active(self): - with self.__worker_count_lock: - self.__active_worker_count = self.__active_worker_count + 1 - - def worker_inactive(self): - with self.__worker_count_lock: - self.__active_worker_count = self.__active_worker_count - 1 - - def add_job(self, function, args = None, return_callback=None): - ''' - Put new job into queue - ''' - - if (not callable(function)): - raise TypeError("function is not a callable") - if (not ( args == None or isinstance(args, list))): - raise TypeError("args is not a list") - if (not (return_callback == None or callable(return_callback))): - raise TypeError("return_callback is not a callable") - - if (args == None): - args = [] - - job = ThreadPool.Job(function, args, return_callback) - - with self.__worker_count_lock: - if (self.__shutting_down): - raise AddJobException("ThreadPool is shutting down") - - try: - start_new_worker = False - if (self.__worker_count < self.__max_workers): - if (self.__active_worker_count == self.__worker_count): - start_new_worker = True - - self.__jobs.put(job) - - if (start_new_worker): - self.__new_worker() - - except Exception: - raise AddJobException("Could not add job") - - - def get_job(self): - ''' - Retrieve next job from queue - workers die (and should) when - returning None - ''' - - job = None - try: - if (self.__kill_workers_after < 0): - job = self.__jobs.get(True) - elif (self.__kill_workers_after == 0): - job = self.__jobs.get(False) - else: - job = self.__jobs.get(True, self.__kill_workers_after) - except Empty: - job = None - - return job