Merge branch 'parallel' of git://github.com/kalmi/Trimage into parallel

This commit is contained in:
Kilian Valkhof 2010-04-01 13:13:44 +02:00
commit a5da8a9a58
2 changed files with 42 additions and 23 deletions

View file

@ -3,9 +3,6 @@ ThreadPool Implementation
@author: Morten Holdflod Moeller - morten@holdflod.dk
@license: LGPL v3
(A buggy line commented out by Kalman Tarnay...
see: http://code.google.com/p/pythonthreadpool/issues/detail?id=4 )
'''
from __future__ import with_statement
@ -13,6 +10,17 @@ from threading import Thread, RLock
from time import sleep
from Queue import Queue, Empty
import logging
import sys
class NullHandler(logging.Handler):
def emit(self, record):
pass
h = sys.stderr
logging.getLogger('threadpool').addHandler(h)
logging.getLogger('threadpool.worker').addHandler(h)
class ThreadPoolMixIn:
"""Mix-in class to handle each request in a new thread from the ThreadPool."""
@ -114,9 +122,12 @@ class ThreadPool:
and executing them
'''
while self.alive:
#print self.pool.__active_worker_count, self.pool.__worker_count
job = self.pool.get_job()
if (job != None):
self.pool.worker_active()
job.execute()
self.pool.worker_inactive()
else:
self.alive = False
@ -137,8 +148,9 @@ class ThreadPool:
# This Queue is assumed Thread Safe
self.__jobs = Queue()
self.__active_workers_lock = RLock()
self.__active_workers = 0
self.__worker_count_lock = RLock()
self.__worker_count = 0
self.__active_worker_count = 0
self.__shutting_down = False
logger = logging.getLogger('threadpool')
@ -158,7 +170,7 @@ class ThreadPool:
logger = logging.getLogger("threadpool")
logger.info("shutting down")
with self.__active_workers_lock:
with self.__worker_count_lock:
self.__shutting_down = True
self.__max_workers = 0
self.__kill_workers_after = 0
@ -166,9 +178,9 @@ class ThreadPool:
retries_left = clean_shutdown_reties
while (retries_left > 0):
with self.__active_workers_lock:
logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__active_workers))
if (self.__active_workers > 0):
with self.__worker_count_lock:
logger.info("waiting for workers to shut down (%i), %i workers left"%(retries_left, self.__worker_count))
if (self.__worker_count > 0):
retries_left -= 1
else:
retries_left = 0
@ -176,9 +188,9 @@ class ThreadPool:
sleep(wait_for_workers_period)
with self.__active_workers_lock:
if (self.__active_workers > 0):
logger.warning("shutdown stopped waiting. Still %i active workers"%self.__active_workers)
with self.__worker_count_lock:
if (self.__worker_count > 0):
logger.warning("shutdown stopped waiting. Still %i active workers"%self.__worker_count)
clean_shutdown = False
else:
clean_shutdown = True
@ -192,16 +204,24 @@ class ThreadPool:
Called by worker to update worker count
when the worker is shutting down
'''
with self.__active_workers_lock:
self.__active_workers -= 1
with self.__worker_count_lock:
self.__worker_count -= 1
def __new_worker(self):
'''
Adding a new worker thread to the thread pool
'''
with self.__active_workers_lock:
with self.__worker_count_lock:
ThreadPool.Worker(self)
self.__active_workers += 1
self.__worker_count += 1
def worker_active(self):
with self.__worker_count_lock:
self.__active_worker_count = self.__active_worker_count + 1
def worker_inactive(self):
with self.__worker_count_lock:
self.__active_worker_count = self.__active_worker_count - 1
def add_job(self, function, args = None, return_callback=None):
'''
@ -220,16 +240,14 @@ class ThreadPool:
job = ThreadPool.Job(function, args, return_callback)
with self.__active_workers_lock:
with self.__worker_count_lock:
if (self.__shutting_down):
raise AddJobException("ThreadPool is shutting down")
try:
start_new_worker = False
if (self.__active_workers < self.__max_workers):
#DIY fixed.... FIXME
#http://code.google.com/p/pythonthreadpool/issues/detail?id=4
#if (self.__active_workers == 0 or not self.__jobs.empty()):
if (self.__worker_count < self.__max_workers):
if (self.__active_worker_count == self.__worker_count):
start_new_worker = True
self.__jobs.put(job)

View file

@ -1,5 +1,5 @@
#!/usr/bin/python
import time
import sys
import errno
from os import listdir
@ -329,6 +329,7 @@ class Worker(QThread):
def compress_file(self, images, showapp, verbose, imagelist):
"""Start the worker thread."""
for image in images:
time.sleep(0.05) #FIXME: Workaround http://code.google.com/p/pythonthreadpool/issues/detail?id=5
self.threadpool.add_job(image.compress, None, return_callback=self.toDisplay.put)
self.showapp = showapp
self.verbose = verbose