mirror of
https://github.com/puppetlabs/vmpooler.git
synced 2026-01-27 02:18:41 -05:00
(POOLER-52) Enforce task limit on providers
This commit changes the model of how provider connections are managed in pool_manager. Instead of creating a connection per pool a set of connections are used, limited by the existing task_limit parameter. Without this change pool_manager runs a nested loop for each pool, maintaining a provider connection.
This commit is contained in:
parent
d4a50e5e56
commit
3b24446997
1 changed files with 58 additions and 25 deletions
|
|
@ -26,6 +26,11 @@ module Vmpooler
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def slots_available?(slots, max_slots)
|
||||||
|
# Returns slots in use unless max is reached
|
||||||
|
return slots unless slots >= max_slots
|
||||||
|
end
|
||||||
|
|
||||||
def open_socket(host, domain=nil, timeout=5, port=22, &block)
|
def open_socket(host, domain=nil, timeout=5, port=22, &block)
|
||||||
Timeout.timeout(timeout) do
|
Timeout.timeout(timeout) do
|
||||||
target_host = host
|
target_host = host
|
||||||
|
|
@ -569,23 +574,38 @@ module Vmpooler
|
||||||
finish
|
finish
|
||||||
end
|
end
|
||||||
|
|
||||||
def check_pool(pool, maxloop = 0, loop_delay = 5)
|
def evaluate_pool(pool, slots, maxloop = 0, loop_delay = 1)
|
||||||
$logger.log('d', "[*] [#{pool['name']}] starting worker thread")
|
return if $redis.hget("vmpooler__pool__#{pool['name']}", 'slot')
|
||||||
|
checking_pools = $redis.smembers('vmpooler__check__pool__pending')
|
||||||
$providers[pool['name']] ||= Vmpooler::VsphereHelper.new $config, $metrics
|
return if checking_pools.include? pool['name']
|
||||||
|
$redis.sadd('vmpooler__check__pool__pending', pool['name'])
|
||||||
$threads[pool['name']] = Thread.new do
|
loop_count = 1
|
||||||
loop_count = 1
|
while (slots_available?($redis.smembers('vmpooler__check__pool').count, slots.to_i).nil?)
|
||||||
loop do
|
sleep(loop_delay * 3)
|
||||||
_check_pool(pool, $providers[pool['name']])
|
unless maxloop.zero?
|
||||||
sleep(loop_delay)
|
return if loop_count >= maxloop
|
||||||
|
loop_count += 1
|
||||||
unless maxloop.zero?
|
|
||||||
break if loop_count >= maxloop
|
|
||||||
loop_count += 1
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
slots_free = slots_available?($redis.smembers('vmpooler__check__pool').count, slots)
|
||||||
|
next_slot = (slots_free.to_i + 1).to_s
|
||||||
|
check_pool(pool, next_slot)
|
||||||
|
rescue => err
|
||||||
|
$logger.log('d', "[!] [#{pool['name']}] evaluating failed with an error: #{err}")
|
||||||
|
end
|
||||||
|
|
||||||
|
def check_pool(pool, slot)
|
||||||
|
$redis.sadd('vmpooler__check__pool', pool['name'])
|
||||||
|
$redis.srem('vmpooler__check__pool__pending', pool['name'])
|
||||||
|
$redis.hset("vmpooler__pool__#{pool['name']}", 'slot', slot)
|
||||||
|
|
||||||
|
$providers[slot] ||= Vmpooler::VsphereHelper.new $config, $metrics
|
||||||
|
|
||||||
|
$threads[slot] = Thread.new do
|
||||||
|
_check_pool(pool, $providers[slot])
|
||||||
|
end
|
||||||
|
rescue => err
|
||||||
|
$logger.log('d', "[!] [#{pool['name']}] checking failed with an error: #{err}")
|
||||||
end
|
end
|
||||||
|
|
||||||
def _check_pool(pool, provider)
|
def _check_pool(pool, provider)
|
||||||
|
|
@ -731,18 +751,36 @@ module Vmpooler
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
$redis.srem('vmpooler__check__pool', pool['name'])
|
||||||
|
$redis.hdel("vmpooler__pool__#{pool['name']}", 'slot')
|
||||||
rescue => err
|
rescue => err
|
||||||
$logger.log('d', "[!] [#{pool['name']}] _check_pool failed with an error: #{err}")
|
$logger.log('d', "[!] [#{pool['name']}] _check_pool failed with an error: #{err}")
|
||||||
|
$redis.srem('vmpooler__check__pool', pool['name'])
|
||||||
|
$redis.hdel("vmpooler__pool__#{pool['name']}", 'slot')
|
||||||
raise
|
raise
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def clean_stale_redis_queues
|
||||||
|
# Clear out the tasks manager, as we don't know about any tasks at this point
|
||||||
|
$redis.set('vmpooler__tasks__clone', 0)
|
||||||
|
# Clear out vmpooler__migrations since stale entries may be left after a restart
|
||||||
|
$redis.del('vmpooler__migration')
|
||||||
|
# Clear out pool entries used to track slot usage
|
||||||
|
to_clear = $redis.smembers('vmpooler__check__pool')
|
||||||
|
to_clear.each do |pool_name|
|
||||||
|
$redis.del("vmpooler__pool__#{pool_name}")
|
||||||
|
end
|
||||||
|
# Clear out vmpooler__check__pool entries to ensure starting with a clean state
|
||||||
|
$redis.del('vmpooler__check__pool')
|
||||||
|
$redis.del('vmpooler__check__pool__pending')
|
||||||
|
end
|
||||||
|
|
||||||
def execute!(maxloop = 0, loop_delay = 1)
|
def execute!(maxloop = 0, loop_delay = 1)
|
||||||
$logger.log('d', 'starting vmpooler')
|
$logger.log('d', 'starting vmpooler')
|
||||||
|
|
||||||
# Clear out the tasks manager, as we don't know about any tasks at this point
|
clean_stale_redis_queues
|
||||||
$redis.set('vmpooler__tasks__clone', 0)
|
|
||||||
# Clear out vmpooler__migrations since stale entries may be left after a restart
|
task_limit = $config[:config]['task_limit']
|
||||||
$redis.del('vmpooler__migration')
|
|
||||||
|
|
||||||
loop_count = 1
|
loop_count = 1
|
||||||
loop do
|
loop do
|
||||||
|
|
@ -761,12 +799,7 @@ module Vmpooler
|
||||||
end
|
end
|
||||||
|
|
||||||
$config[:pools].each do |pool|
|
$config[:pools].each do |pool|
|
||||||
if ! $threads[pool['name']]
|
evaluate_pool(pool, task_limit, maxloop, loop_delay)
|
||||||
check_pool(pool)
|
|
||||||
elsif ! $threads[pool['name']].alive?
|
|
||||||
$logger.log('d', "[!] [#{pool['name']}] worker thread died, restarting")
|
|
||||||
check_pool(pool)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
sleep(loop_delay)
|
sleep(loop_delay)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue