Parallel processing

This commit is contained in:
Kálmán Tarnay 2010-03-30 02:21:00 +02:00
parent ecfc4af6a3
commit a3a31a7e93
2 changed files with 274 additions and 10 deletions

262
src/trimage/ThreadPool.py Normal file
View file

@ -0,0 +1,262 @@
'''
ThreadPool Implementation
@author: Morten Holdflod Moeller - morten@holdflod.dk
@license: LGPL v3
(A buggy line commented out by Kalman Tarnay...
see: http://code.google.com/p/pythonthreadpool/issues/detail?id=4 )
'''
from __future__ import with_statement
from threading import Thread, RLock
from time import sleep
from Queue import Queue, Empty
import logging
class ThreadPoolMixIn:
    """Mix-in class to handle each request in a worker thread of a ThreadPool.

    The host class is expected to provide ``finish_request``,
    ``close_request`` and ``handle_error`` (the methods called by
    ``process_request_thread`` below).

    NOTE(review): ``shutdown`` defined here may shadow a ``shutdown`` method
    of the server class this is mixed into, depending on the MRO -- confirm
    against the concrete server class.
    """

    def __init__(self, threadpool=None):
        """Attach a thread pool to the server.

        threadpool -- pool used to run requests. When omitted, a private
                      ThreadPool is created and is shut down again by
                      ``shutdown``; a pool passed in by the caller is
                      never shut down by this class.
        """
        # Remember who owns the pool so shutdown() only stops our own.
        self.__private_threadpool = threadpool is None
        if threadpool is None:
            threadpool = ThreadPool()
        self.__threadpool = threadpool

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here. The request is
        closed exactly once, even when ``handle_error`` itself fails.
        """
        try:
            self.finish_request(request, client_address)
        except: #IGNORE:W0702
            # Deliberately broad: this runs on a pool worker thread, and
            # every failure should be reported through handle_error.
            self.handle_error(request, client_address)
        finally:
            # Always release the request, whatever happened above.
            self.close_request(request)

    def process_request(self, request, client_address):
        """Queue the request on the thread pool instead of handling it inline."""
        self.__threadpool.add_job(self.process_request_thread, [request, client_address])

    def shutdown(self):
        """Shut down the thread pool, but only if this instance created it."""
        if self.__private_threadpool:
            self.__threadpool.shutdown()
class AddJobException(Exception):
    '''
    Exception raised when a job could not be added
    to the queue.

    msg -- human readable reason for the failure
    '''

    def __init__(self, msg):
        super(AddJobException, self).__init__(msg)
class ThreadPool:
    '''
    The class implementing the ThreadPool.
    Instantiate and add jobs using add_job(func, args_list)

    Worker threads are started on demand by add_job (up to max_workers)
    and terminate on their own once get_job returns None.
    '''

    class Job: #IGNORE:R0903
        '''
        Class encapsulating a job to be handled
        by ThreadPool workers
        '''

        def __init__(self, function, args, return_callback=None):
            '''
            function        -- callable to run
            args            -- positional arguments, unpacked into the call
            return_callback -- optional callable receiving the return value
            '''
            self.callable = function
            self.arguments = args
            self.return_callback = return_callback

        def execute(self):
            '''
            Called to execute the function.

            Exceptions raised by the function or by the return callback
            are logged (with traceback) and swallowed, so a failing job
            never takes its worker thread down. The callback is not
            invoked when the function itself failed.
            '''
            try:
                return_value = self.callable(*self.arguments) #IGNORE:W0142
            except Exception: #IGNORE:W0703
                logger = logging.getLogger("threadpool.worker")
                # exc_info attaches the exception and traceback; building
                # the message with '+' on the exception object would itself
                # raise TypeError and kill the worker.
                logger.warning("A job in the ThreadPool raised an exception", exc_info=True)
                # else do nothing cause we don't know what to do...
                return

            if self.return_callback is None:
                return

            try:
                self.return_callback(return_value)
            except Exception: #IGNORE:W0703 everything could go wrong...
                logger = logging.getLogger('threadpool')
                logger.warning('Error while delivering return value to callback function', exc_info=True)

    class Worker(Thread):
        '''
        A worker thread handling jobs in the thread pool
        job queue
        '''

        def __init__(self, pool):
            '''
            Create the worker for the given pool and start it immediately.

            Raises TypeError when pool is not a ThreadPool instance.
            '''
            Thread.__init__(self)
            if not isinstance(pool, ThreadPool):
                raise TypeError("pool is not a ThreadPool instance")
            self.pool = pool
            self.alive = True
            self.start()

        def run(self):
            '''
            The workers main-loop getting jobs from queue
            and executing them
            '''
            try:
                while self.alive:
                    job = self.pool.get_job()
                    if job is not None:
                        job.execute()
                    else:
                        # No job available: the pool wants this worker gone.
                        self.alive = False
            finally:
                # Always report back, otherwise the pool's worker count
                # would stay too high forever.
                self.pool.punch_out()

    def __init__(self, max_workers = 5, kill_workers_after = 3):
        '''
        max_workers        -- upper bound on concurrently running workers,
                              must be an int >= 1
        kill_workers_after -- how long an idle worker waits for a job, as
                              passed to Queue.get by get_job: a positive
                              value is a timeout, 0 means "do not wait",
                              a negative value means "wait forever"

        Raises TypeError / ValueError on invalid arguments.
        '''
        if not isinstance(max_workers, int):
            raise TypeError("max_workers is not an int")
        if max_workers < 1:
            raise ValueError('max_workers must be >= 1')
        if not isinstance(kill_workers_after, int):
            raise TypeError("kill_workers_after is not an int")

        self.__max_workers = max_workers
        self.__kill_workers_after = kill_workers_after

        # This Queue is assumed Thread Safe
        self.__jobs = Queue()

        # Guards __active_workers, __shutting_down and __max_workers.
        self.__active_workers_lock = RLock()
        self.__active_workers = 0

        self.__shutting_down = False
        logger = logging.getLogger('threadpool')
        logger.info('started')

    def shutdown(self, wait_for_workers_period = 1, clean_shutdown_reties = 5):
        '''
        Stop accepting jobs and wait for the workers to finish.

        wait_for_workers_period -- int >= 0, pause between two checks
                                   (passed to time.sleep)
        clean_shutdown_reties   -- int >= 0, maximum number of checks

        Returns True when no worker was active any more at the end,
        False when the wait was given up with workers still active.
        Raises TypeError / ValueError on invalid arguments.

        NOTE(review): a worker already blocked in get_job keeps waiting
        with the timeout it started with; with a negative
        kill_workers_after it presumably never wakes up -- confirm.
        '''
        if not isinstance(clean_shutdown_reties, int):
            raise TypeError("clean_shutdown_reties is not an int")
        if not clean_shutdown_reties >= 0:
            raise ValueError('clean_shutdown_reties must be >= 0')
        if not isinstance(wait_for_workers_period, int):
            raise TypeError("wait_for_workers_period is not an int")
        if not wait_for_workers_period >= 0:
            raise ValueError('wait_for_workers_period must be >= 0')

        logger = logging.getLogger("threadpool")
        logger.info("shutting down")

        with self.__active_workers_lock:
            self.__shutting_down = True
            # No new workers, and idle workers give up immediately.
            self.__max_workers = 0
            self.__kill_workers_after = 0

        retries_left = clean_shutdown_reties
        while retries_left > 0:
            with self.__active_workers_lock:
                logger.info("waiting for workers to shut down (%i), %i workers left",
                            retries_left, self.__active_workers)
                if self.__active_workers == 0:
                    # Everybody is gone; do not waste a sleep period.
                    break
            retries_left -= 1
            sleep(wait_for_workers_period)

        with self.__active_workers_lock:
            clean_shutdown = self.__active_workers == 0
            if not clean_shutdown:
                logger.warning("shutdown stopped waiting. Still %i active workers",
                               self.__active_workers)

        logger.info("shutdown complete")
        return clean_shutdown

    def punch_out(self):
        '''
        Called by worker to update worker count
        when the worker is shutting down
        '''
        with self.__active_workers_lock:
            self.__active_workers -= 1
            # A job may have been queued after this worker decided to quit
            # but before it got here, at a moment when add_job saw the pool
            # as full and started nobody. Hand over to a fresh worker so
            # the job is not stranded in the queue.
            if (not self.__shutting_down
                    and self.__active_workers < self.__max_workers
                    and not self.__jobs.empty()):
                self.__new_worker()

    def __new_worker(self):
        '''
        Adding a new worker thread to the thread pool
        '''
        with self.__active_workers_lock:
            # Worker starts itself in its constructor; count it only
            # once it was created successfully.
            ThreadPool.Worker(self)
            self.__active_workers += 1

    def add_job(self, function, args = None, return_callback=None):
        '''
        Put new job into queue

        function        -- callable to execute
        args            -- list of positional arguments (or None for none)
        return_callback -- optional callable receiving the return value

        Raises TypeError on invalid arguments and AddJobException when the
        pool is shutting down or the job could not be queued.
        '''
        if not callable(function):
            raise TypeError("function is not a callable")
        if not (args is None or isinstance(args, list)):
            raise TypeError("args is not a list")
        if not (return_callback is None or callable(return_callback)):
            raise TypeError("return_callback is not a callable")

        if args is None:
            args = []

        job = ThreadPool.Job(function, args, return_callback)

        with self.__active_workers_lock:
            if self.__shutting_down:
                raise AddJobException("ThreadPool is shutting down")

            try:
                # Start a worker for every job while below the limit.
                # The upstream condition
                #   (active_workers == 0 or not jobs.empty())
                # was dropped on purpose, see
                # http://code.google.com/p/pythonthreadpool/issues/detail?id=4
                start_new_worker = self.__active_workers < self.__max_workers

                self.__jobs.put(job)

                if start_new_worker:
                    self.__new_worker()
            except Exception:
                raise AddJobException("Could not add job")

    def get_job(self):
        '''
        Retrieve next job from queue
        workers die (and should) when
        returning None
        '''
        wait = self.__kill_workers_after
        try:
            if wait < 0:
                # Block until a job arrives.
                return self.__jobs.get(True)
            if wait == 0:
                # Do not wait at all.
                return self.__jobs.get(False)
            # Wait at most `wait` for a job.
            return self.__jobs.get(True, wait)
        except Empty:
            return None

View file

@ -13,6 +13,8 @@ from hurry.filesize import *
from imghdr import what as determinetype from imghdr import what as determinetype
from Queue import Queue from Queue import Queue
from ThreadPool import ThreadPool
from multiprocessing import cpu_count
from ui import Ui_trimage from ui import Ui_trimage
@ -308,23 +310,23 @@ class Image:
shell = True, stdout=PIPE) shell = True, stdout=PIPE)
if retcode == 0: if retcode == 0:
self.newfilesize = QFile(self.fullpath).size() self.newfilesize = QFile(self.fullpath).size()
return not retcode self.retcode=retcode
return self
class Worker(QThread): class Worker(QThread):
def __init__(self, parent=None): def __init__(self, parent=None):
QThread.__init__(self, parent) QThread.__init__(self, parent)
self.exiting = False self.toDisplay=Queue()
self.toProcess=Queue() self.threadpool = ThreadPool(max_workers=cpu_count())
def __del__(self): def __del__(self):
self.exiting = True self.threadpool.shutdown()
self.wait()
def compress_file(self, images, showapp, verbose, imagelist): def compress_file(self, images, showapp, verbose, imagelist):
"""Start the worker thread.""" """Start the worker thread."""
for image in images: for image in images:
self.toProcess.put(image) self.threadpool.add_job(image.compress, None, return_callback=self.toDisplay.put)
self.showapp = showapp self.showapp = showapp
self.verbose = verbose self.verbose = verbose
self.imagelist = imagelist self.imagelist = imagelist
@ -332,10 +334,10 @@ class Worker(QThread):
def run(self): def run(self):
"""Compress the given file, get data from it and call update_table.""" """Compress the given file, get data from it and call update_table."""
while self.showapp or not self.toProcess.empty(): tp = self.threadpool
image = self.toProcess.get() while self.showapp or not (tp.__active_workers==0 and tp.__jobs.empty()):
success=image.compress() image = self.toDisplay.get()
if success: if image.retcode==0:
#calculate ratio and make a nice string #calculate ratio and make a nice string
oldfilesizestr = size(image.oldfilesize, system=alternative) oldfilesizestr = size(image.oldfilesize, system=alternative)
newfilesizestr = size(image.newfilesize, system=alternative) newfilesizestr = size(image.newfilesize, system=alternative)