From 3b244469975b80f1acc20042eb713d2d40f615a6 Mon Sep 17 00:00:00 2001 From: "kirby@puppetlabs.com" Date: Wed, 12 Apr 2017 09:20:56 -0700 Subject: [PATCH] (POOLER-52) Enforce task limit on providers This commit changes the model of how provider connections are managed in pool_manager. Instead of creating a connection per pool a set of connections are used, limited by the existing task_limit parameter. Without this change pool_manager runs a nested loop for each pool, maintaining a provider connection. --- lib/vmpooler/pool_manager.rb | 83 +++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 25 deletions(-) diff --git a/lib/vmpooler/pool_manager.rb b/lib/vmpooler/pool_manager.rb index c8c033b..d134bba 100644 --- a/lib/vmpooler/pool_manager.rb +++ b/lib/vmpooler/pool_manager.rb @@ -26,6 +26,11 @@ module Vmpooler end end + def slots_available?(slots, max_slots) + # Returns slots in use unless max is reached + return slots unless slots >= max_slots + end + def open_socket(host, domain=nil, timeout=5, port=22, &block) Timeout.timeout(timeout) do target_host = host @@ -569,23 +574,38 @@ module Vmpooler finish end - def check_pool(pool, maxloop = 0, loop_delay = 5) - $logger.log('d', "[*] [#{pool['name']}] starting worker thread") - - $providers[pool['name']] ||= Vmpooler::VsphereHelper.new $config, $metrics - - $threads[pool['name']] = Thread.new do - loop_count = 1 - loop do - _check_pool(pool, $providers[pool['name']]) - sleep(loop_delay) - - unless maxloop.zero? - break if loop_count >= maxloop - loop_count += 1 - end + def evaluate_pool(pool, slots, maxloop = 0, loop_delay = 1) + return if $redis.hget("vmpooler__pool__#{pool['name']}", 'slot') + checking_pools = $redis.smembers('vmpooler__check__pool__pending') + return if checking_pools.include? pool['name'] + $redis.sadd('vmpooler__check__pool__pending', pool['name']) + loop_count = 1 + while (slots_available?($redis.smembers('vmpooler__check__pool').count, slots.to_i).nil?) + sleep(loop_delay * 3) + unless maxloop.zero? + return if loop_count >= maxloop + loop_count += 1 end end + slots_free = slots_available?($redis.smembers('vmpooler__check__pool').count, slots) + next_slot = (slots_free.to_i + 1).to_s + check_pool(pool, next_slot) + rescue => err + $logger.log('d', "[!] [#{pool['name']}] evaluating failed with an error: #{err}") + end + + def check_pool(pool, slot) + $redis.sadd('vmpooler__check__pool', pool['name']) + $redis.srem('vmpooler__check__pool__pending', pool['name']) + $redis.hset("vmpooler__pool__#{pool['name']}", 'slot', slot) + + $providers[slot] ||= Vmpooler::VsphereHelper.new $config, $metrics + + $threads[slot] = Thread.new do + _check_pool(pool, $providers[slot]) + end + rescue => err + $logger.log('d', "[!] [#{pool['name']}] checking failed with an error: #{err}") end def _check_pool(pool, provider) @@ -731,18 +751,36 @@ module Vmpooler end end end + $redis.srem('vmpooler__check__pool', pool['name']) + $redis.hdel("vmpooler__pool__#{pool['name']}", 'slot') rescue => err $logger.log('d', "[!] [#{pool['name']}] _check_pool failed with an error: #{err}") + $redis.srem('vmpooler__check__pool', pool['name']) + $redis.hdel("vmpooler__pool__#{pool['name']}", 'slot') raise end + def clean_stale_redis_queues + # Clear out the tasks manager, as we don't know about any tasks at this point + $redis.set('vmpooler__tasks__clone', 0) + # Clear out vmpooler__migrations since stale entries may be left after a restart + $redis.del('vmpooler__migration') + # Clear out pool entries used to track slot usage + to_clear = $redis.smembers('vmpooler__check__pool') + to_clear.each do |pool_name| + $redis.del("vmpooler__pool__#{pool_name}") + end + # Clear out vmpooler__check__pool entries to ensure starting with a clean state + $redis.del('vmpooler__check__pool') + $redis.del('vmpooler__check__pool__pending') + end + def execute!(maxloop = 0, loop_delay = 1) $logger.log('d', 'starting vmpooler') - # Clear out the tasks manager, as we don't know about any tasks at this point - $redis.set('vmpooler__tasks__clone', 0) - # Clear out vmpooler__migrations since stale entries may be left after a restart - $redis.del('vmpooler__migration') + clean_stale_redis_queues + + task_limit = $config[:config]['task_limit'] loop_count = 1 loop do @@ -761,12 +799,7 @@ module Vmpooler end $config[:pools].each do |pool| - if ! $threads[pool['name']] - check_pool(pool) - elsif ! $threads[pool['name']].alive? - $logger.log('d', "[!] [#{pool['name']}] worker thread died, restarting") - check_pool(pool) - end + evaluate_pool(pool, task_limit, maxloop, loop_delay) end sleep(loop_delay)