This commit is contained in:
mattkirby 2017-04-15 01:48:03 +00:00 committed by GitHub
commit faf35db358
2 changed files with 167 additions and 83 deletions

View file

@ -26,6 +26,11 @@ module Vmpooler
end
end
def slots_available(slots, max_slots)
# Returns slots in use unless max is reached
return slots unless slots >= max_slots
end
def open_socket(host, domain=nil, timeout=5, port=22, &block)
Timeout.timeout(timeout) do
target_host = host
@ -569,23 +574,38 @@ module Vmpooler
finish
end
def check_pool(pool, maxloop = 0, loop_delay = 5)
$logger.log('d', "[*] [#{pool['name']}] starting worker thread")
$providers[pool['name']] ||= Vmpooler::VsphereHelper.new $config, $metrics
$threads[pool['name']] = Thread.new do
loop_count = 1
loop do
_check_pool(pool, $providers[pool['name']])
sleep(loop_delay)
unless maxloop.zero?
break if loop_count >= maxloop
loop_count += 1
end
def evaluate_pool(pool, slots, maxloop = 0, loop_delay = 1)
return if $redis.hget("vmpooler__pool__#{pool['name']}", 'slot')
checking_pools = $redis.smembers('vmpooler__check__pool__pending')
return if checking_pools.include? pool['name']
$redis.sadd('vmpooler__check__pool__pending', pool['name'])
loop_count = 1
while (slots_available($redis.smembers('vmpooler__check__pool').count, slots.to_i).nil?)
sleep(loop_delay)
unless maxloop.zero?
return if loop_count >= maxloop
loop_count += 1
end
end
slots_free = slots_available($redis.smembers('vmpooler__check__pool').count, slots)
next_slot = (slots_free.to_i + 1).to_s
check_pool(pool, next_slot)
rescue => err
$logger.log('d', "[!] [#{pool['name']}] evaluating failed with an error: #{err}")
end
def check_pool(pool, slot)
$redis.sadd('vmpooler__check__pool', pool['name'])
$redis.srem('vmpooler__check__pool__pending', pool['name'])
$redis.hset("vmpooler__pool__#{pool['name']}", 'slot', slot)
$providers[slot] ||= Vmpooler::VsphereHelper.new $config, $metrics
$threads[pool['name']] = Thread.new do
_check_pool(pool, $providers[slot])
end
rescue => err
$logger.log('d', "[!] [#{pool['name']}] checking failed with an error: #{err}")
end
def _check_pool(pool, provider)
@ -731,18 +751,36 @@ module Vmpooler
end
end
end
$redis.srem('vmpooler__check__pool', pool['name'])
$redis.hdel("vmpooler__pool__#{pool['name']}", 'slot')
rescue => err
$logger.log('d', "[!] [#{pool['name']}] _check_pool failed with an error: #{err}")
$redis.srem('vmpooler__check__pool', pool['name'])
$redis.hdel("vmpooler__pool__#{pool['name']}", 'slot')
raise
end
def clean_stale_redis_queues
# Clear out the tasks manager, as we don't know about any tasks at this point
$redis.set('vmpooler__tasks__clone', 0)
# Clear out vmpooler__migrations since stale entries may be left after a restart
$redis.del('vmpooler__migration')
# Clear out pool entries used to track slot usage
to_clear = $redis.smembers('vmpooler__check__pool')
to_clear.each do |pool_name|
$redis.del("vmpooler__pool__#{pool_name}")
end
# Clear out vmpooler__check__pool entries to ensure starting with a clean state
$redis.del('vmpooler__check__pool')
$redis.del('vmpooler__check__pool__pending')
end
def execute!(maxloop = 0, loop_delay = 1)
$logger.log('d', 'starting vmpooler')
# Clear out the tasks manager, as we don't know about any tasks at this point
$redis.set('vmpooler__tasks__clone', 0)
# Clear out vmpooler__migrations since stale entries may be left after a restart
$redis.del('vmpooler__migration')
clean_stale_redis_queues
task_limit = $config[:config]['task_limit']
loop_count = 1
loop do
@ -761,12 +799,7 @@ module Vmpooler
end
$config[:pools].each do |pool|
if ! $threads[pool['name']]
check_pool(pool)
elsif ! $threads[pool['name']].alive?
$logger.log('d', "[!] [#{pool['name']}] worker thread died, restarting")
check_pool(pool)
end
evaluate_pool(pool, task_limit, maxloop, loop_delay)
end
sleep(loop_delay)

View file

@ -37,6 +37,27 @@ describe 'Pool Manager' do
end
end
describe '#slots_available' do
let(:slots) { 1 }
let(:max_slots) { 2 }
before do
expect(subject).not_to be_nil
end
it 'returns the next number of slots in use if less than the max' do
expect(subject.slots_available slots, max_slots).to eq(1)
end
it 'returns 0 when no slots are in use' do
expect(subject.slots_available 0, max_slots).to eq(0)
end
it 'returns nil when all slots are in use' do
expect(subject.slots_available 2, max_slots).to be_nil
end
end
describe '#open_socket' do
let(:TCPSocket) { double('tcpsocket') }
let(:socket) { double('tcpsocket') }
@ -1673,6 +1694,8 @@ EOT
let(:config) {
YAML.load(<<-EOT
---
:config:
task_limit: 10
:pools:
- name: #{pool}
EOT
@ -1689,6 +1712,7 @@ EOT
before(:each) do
allow(subject).to receive(:check_disk_queue)
allow(subject).to receive(:check_snapshot_queue)
allow(subject).to receive(:evaluate_pool)
allow(subject).to receive(:check_pool)
expect(logger).to receive(:log).with('d', 'starting vmpooler')
end
@ -1705,6 +1729,25 @@ EOT
expect(redis.get('vmpooler__migration')).to be_nil
end
it 'should clear check pool tasks' do
redis.sadd('vmpooler__check__pool', pool['name'])
subject.execute!(1,0)
expect(redis.get('vmpooler__check__pool')).to be_nil
end
it 'should clear check pool pending tasks' do
redis.sadd('vmpooler__check__pool__pending', pool['name'])
subject.execute!(1,0)
expect(redis.get('vmpooler__check__pool__pending')).to be_nil
end
it 'should clear all slot allocation' do
redis.sadd('vmpooler__check__pool', pool['name'])
redis.hset("vmpooler__pool__#{pool['name']}", 'slot', 1)
subject.execute!(1,0)
expect(redis.keys('vmpooler__pool*')).to eq([])
end
it 'should run the check_disk_queue method' do
expect(subject).to receive(:check_disk_queue)
@ -1718,7 +1761,7 @@ EOT
end
it 'should check the pools in the config' do
expect(subject).to receive(:check_pool).with(a_pool_with_name_of(pool))
expect(subject).to receive(:evaluate_pool).with(a_pool_with_name_of(pool), 10, 1, 0)
subject.execute!(1,0)
end
@ -1744,6 +1787,7 @@ EOT
subject.execute!(1,0)
end
end
context 'with dead snapshot_manager thread' do
@ -1779,15 +1823,6 @@ EOT
# Reset the global variable - Note this is a code smell
$threads = nil
end
it 'should run the check_pool method and log a message' do
expect(thread).to receive(:alive?).and_return(false)
expect(subject).to receive(:check_pool).with(a_pool_with_name_of(pool))
expect(logger).to receive(:log).with('d', "[!] [#{pool}] worker thread died, restarting")
$threads[pool] = thread
subject.execute!(1,0)
end
end
context 'delays between loops' do
@ -1833,7 +1868,6 @@ EOT
it 'should run per thread tasks 5 times when threads are not remembered' do
expect(subject).to receive(:check_disk_queue).exactly(maxloop).times
expect(subject).to receive(:check_snapshot_queue).exactly(maxloop).times
expect(subject).to receive(:check_pool).exactly(maxloop).times
subject.execute!(maxloop,0)
end
@ -1841,10 +1875,8 @@ EOT
it 'should not run per thread tasks when threads are alive' do
expect(subject).to receive(:check_disk_queue).exactly(0).times
expect(subject).to receive(:check_snapshot_queue).exactly(0).times
expect(subject).to receive(:check_pool).exactly(0).times
allow(thread).to receive(:alive?).and_return(true)
$threads[pool] = thread
$threads['disk_manager'] = thread
$threads['snapshot_manager'] = thread
@ -1853,13 +1885,71 @@ EOT
end
end
describe "#check_pool" do
describe "#evaluate_pool" do
let(:task_limit) { 1 }
before do
expect(subject).not_to be_nil
end
context 'on evaluating pool' do
before(:each) do
allow(subject).to receive(:check_pool)
end
it 'should run check_pool' do
expect(subject).to receive(:check_pool).with(a_pool_with_name_of(pool), '1')
subject.evaluate_pool({'name' => pool}, task_limit)
end
it 'should not run check_pool when current pool is being checked' do
redis.hset("vmpooler__pool__#{pool}", 'slot', 1)
expect(subject).to_not receive(:check_pool)
subject.evaluate_pool({'name' => pool}, task_limit, 1, 0)
end
it 'should not run check_pool when no slot is free' do
redis.sadd('vmpooler__check__pool', pool)
expect(subject).to_not receive(:check_pool)
subject.evaluate_pool({'name' => pool}, task_limit, 1, 0)
end
it 'should not run check_pool when pending pools include requested pool' do
redis.sadd('vmpooler__check__pool__pending', pool)
expect(subject).to_not receive(:check_pool)
subject.evaluate_pool({'name' => pool}, task_limit)
end
it 'should not run check_pool when no slots are free' do
redis.sadd('vmpooler__check__pool', pool)
expect(subject).to_not receive(:check_pool)
subject.evaluate_pool({'name' => "#{pool}0"}, task_limit, 1, 0)
end
it 'should allocate the second slot when the first is in use and task_limit is 2' do
redis.sadd('vmpooler__check__pool', pool)
expect(subject).to receive(:check_pool).with(a_pool_with_name_of("#{pool}0"), '2')
subject.evaluate_pool({'name' => "#{pool}0"}, 2, 1, 0)
end
it 'should add pool to pending queue when no slots are free' do
redis.sadd('vmpooler__check__pool', pool)
expect(subject).to_not receive(:check_pool)
subject.evaluate_pool({'name' => "#{pool}0"}, task_limit, 1, 0)
expect(redis.sismember('vmpooler__check__pool__pending', "#{pool}0")).to be true
end
end
end
describe "#check_pool" do
let(:threads) {{}}
let(:provider) {{}}
let(:config) {
YAML.load(<<-EOT
---
:config:
task_limit: 10
:pools:
- name: #{pool}
EOT
@ -1878,7 +1968,6 @@ EOT
before(:each) do
# Note the Vmpooler::VsphereHelper is not mocked
allow(subject).to receive(:_check_pool)
expect(logger).to receive(:log).with('d', "[*] [#{pool}] starting worker thread")
end
after(:each) do
@ -1887,22 +1976,19 @@ EOT
$providers = nil
end
it 'should log a message the worker thread is starting' do
subject.check_pool(pool_object,1,0)
end
it 'should populate the providers global variable' do
subject.check_pool(pool_object,1,0)
subject.check_pool(pool_object,1)
expect($providers[pool]).to_not be_nil
expect($providers).to_not be_nil
end
it 'should populate the threads global variable' do
subject.check_pool(pool_object,1,0)
subject.check_pool(pool_object,1)
# Unable to test for nil as the Thread is mocked
expect($threads.keys.include?(pool))
end
end
context 'delays between loops' do
@ -1922,43 +2008,8 @@ EOT
$provider = nil
end
it 'when a non-default loop delay is specified' do
start_time = Time.now
subject.check_pool(pool_object,maxloop,loop_delay)
finish_time = Time.now
# Use a generous delta to take into account various CPU load etc.
expect(finish_time - start_time).to be_within(0.75).of(maxloop * loop_delay)
end
end
context 'loops specified number of times (5)' do
let(:maxloop) { 5 }
# Note a maxloop of zero can not be tested as it never terminates
before(:each) do
allow(logger).to receive(:log)
# Note the Vmpooler::VsphereHelper is not mocked
allow(subject).to receive(:_check_pool)
end
after(:each) do
# Reset the global variable - Note this is a code smell
$threads = nil
$provider = nil
end
it 'should run startup tasks only once' do
expect(logger).to receive(:log).with('d', "[*] [#{pool}] starting worker thread")
subject.check_pool(pool_object,maxloop,0)
end
it 'should run per thread tasks 5 times' do
expect(subject).to receive(:_check_pool).exactly(maxloop).times
subject.check_pool(pool_object,maxloop,0)
end
end
end
describe '#remove_vmpooler_migration_vm' do