diff --git a/lib/vmpooler/pool_manager.rb b/lib/vmpooler/pool_manager.rb index fc384f1..dab18f8 100644 --- a/lib/vmpooler/pool_manager.rb +++ b/lib/vmpooler/pool_manager.rb @@ -1354,7 +1354,20 @@ module Vmpooler loop_delay_max = CHECK_LOOP_DELAY_MAX_DEFAULT, loop_delay_decay = CHECK_LOOP_DELAY_DECAY_DEFAULT) - # Use the pool setings if they exist + $logger.log('d', '[*] [ondemand_provisioner] starting worker thread') + + $threads['ondemand_provisioner'] = Thread.new do + puts "running #{maxloop} #{loop_delay_min}" + _check_ondemand_requests(maxloop, loop_delay_min, loop_delay_max, loop_delay_decay) + end + end + + def _check_ondemand_requests(maxloop = 0, + loop_delay_min = CHECK_LOOP_DELAY_MIN_DEFAULT, + loop_delay_max = CHECK_LOOP_DELAY_MAX_DEFAULT, + loop_delay_decay = CHECK_LOOP_DELAY_DECAY_DEFAULT) + + puts 'jumped over' loop_delay_min = $config[:config]['check_loop_delay_min'] unless $config[:config]['check_loop_delay_min'].nil? loop_delay_max = $config[:config]['check_loop_delay_max'] unless $config[:config]['check_loop_delay_max'].nil? loop_delay_decay = $config[:config]['check_loop_delay_decay'] unless $config[:config]['check_loop_delay_decay'].nil? @@ -1367,6 +1380,7 @@ module Vmpooler loop do result = process_ondemand_requests + puts result loop_delay = (loop_delay * loop_delay_decay).to_i loop_delay = loop_delay_min if result > 0 @@ -1416,31 +1430,28 @@ module Vmpooler ondemand_clone_limit = $config[:config]['ondemand_clone_limit'] @tasks['ondemand_clone_count'] = 0 unless @tasks['ondemand_clone_count'] queue.each do |request, score| - if @tasks['ondemand_clone_count'] < ondemand_clone_limit - pool_alias, pool, count, request_id = request.split(':') - count = count.to_i - provider = get_provider_for_pool(pool) - slots = ondemand_clone_limit - @tasks['ondemand_clone_count'] - return if slots == 0 - if slots >= count - count.times do - @tasks['ondemand_clone_count'] += 1 - clone_vm(pool, provider, request_id, pool_alias) - end - redis.zrem(queue_key, request) - else - remaining_count = count - slots - slots.times do - @tasks['ondemand_clone_count'] += 1 - clone_vm(pool, provider, request_id, pool_alias) - end - redis.pipelined do - redis.zrem(queue_key, request) - redis.zadd(queue_key, score, "#{pool_alias}:#{pool}:#{remaining_count}:#{request_id}") - end + break unless @tasks['ondemand_clone_count'] < ondemand_clone_limit + pool_alias, pool, count, request_id = request.split(':') + count = count.to_i + provider = get_provider_for_pool(pool) + slots = ondemand_clone_limit - @tasks['ondemand_clone_count'] + return if slots == 0 + if slots >= count + count.times do + @tasks['ondemand_clone_count'] += 1 + clone_vm(pool, provider, request_id, pool_alias) end + redis.zrem(queue_key, request) else - break + remaining_count = count - slots + slots.times do + @tasks['ondemand_clone_count'] += 1 + clone_vm(pool, provider, request_id, pool_alias) + end + redis.pipelined do + redis.zrem(queue_key, request) + redis.zadd(queue_key, score, "#{pool_alias}:#{pool}:#{remaining_count}:#{request_id}") + end end end queue.length @@ -1464,7 +1475,6 @@ module Vmpooler in_progress_requests = redis.zrange('vmpooler__provisioning__processing', 0, -1) in_progress_requests&.each do |request_id| next unless vms_ready?(request_id, redis) - $logger.log('s', 'vms are ready') redis.multi redis.hset("vmpooler__odrequest__#{request_id}", 'status', 'ready') diff --git a/spec/helpers.rb b/spec/helpers.rb index daf3dc8..6547b12 100644 --- a/spec/helpers.rb +++ b/spec/helpers.rb @@ -142,3 +142,11 @@ end def create_ondemand_vm(vmname, request_id, pool, pool_alias, redis) redis.sadd("vmpooler__#{request_id}__#{pool_alias}__#{pool}", vmname) end + +def create_ondemand_creationtask(request_string, score, redis) + redis.zadd('vmpooler__odcreate__task', score, request_string) +end + +def create_ondemand_processing(request_id, score, redis) + redis.zadd('vmpooler__provisioning__processing', score, request_id) +end diff --git a/spec/unit/pool_manager_spec.rb b/spec/unit/pool_manager_spec.rb index e46de2a..34d663e 100644 --- a/spec/unit/pool_manager_spec.rb +++ b/spec/unit/pool_manager_spec.rb @@ -2856,6 +2856,12 @@ EOT subject.execute!(1,0) end + it 'should run the check_ondemand_requests method' do + expect(subject).to receive(:check_ondemand_requests) + + subject.execute!(1,0) + end + context 'creating Providers' do let(:vsphere_provider) { double('vsphere_provider') } let(:config) { @@ -3053,6 +3059,24 @@ EOT end end + context 'with dead ondemand provisioner thread' do + let(:ondemand_provisioner_thread) { double('thread', :alive? => false) } + let(:default_check_loop_delay_min) { 5 } + let(:default_check_loop_delay_max) { 60 } + let(:default_check_loop_delay_decay) { 2.0 } + before(:each) do + # Reset the global variable - Note this is a code smell + $threads = {} + $threads['ondemand_provisioner'] = ondemand_provisioner_thread + end + + it 'should run the process_ondemand_requests method' do + expect(subject).to receive(:check_ondemand_requests).with(default_check_loop_delay_min, default_check_loop_delay_max, default_check_loop_delay_decay) + subject.execute!(1,0) + end + + end + context 'with check_loop_delay_xxx settings' do let(:pool_thread) { double('thread', :alive? => false) } let(:check_loop_delay_min) { 7 } @@ -3098,6 +3122,7 @@ EOT allow(subject).to receive(:check_disk_queue) allow(subject).to receive(:check_snapshot_queue) allow(subject).to receive(:check_pool) + allow(subject).to receive(:check_ondemand_requests) end it 'when a non-default loop delay is specified' do @@ -3127,6 +3152,7 @@ EOT expect(subject).to receive(:check_disk_queue).exactly(maxloop).times expect(subject).to receive(:check_snapshot_queue).exactly(maxloop).times expect(subject).to receive(:check_pool).exactly(maxloop).times + expect(subject).to receive(:check_ondemand_requests).exactly(maxloop).times subject.execute!(maxloop,0) end @@ -3135,10 +3161,12 @@ EOT expect(subject).to receive(:check_disk_queue).exactly(0).times expect(subject).to receive(:check_snapshot_queue).exactly(0).times expect(subject).to receive(:check_pool).exactly(0).times + expect(subject).to receive(:check_ondemand_requests).exactly(0).times $threads[pool] = alive_thread $threads['disk_manager'] = alive_thread $threads['snapshot_manager'] = alive_thread + $threads['ondemand_provisioner'] = alive_thread subject.execute!(maxloop,0) end @@ -4641,11 +4669,9 @@ EOT describe '#create_ondemand_vms' do context 'when requested does not have corresponding data' do - #end it 'logs an error' do redis_connection_pool.with do |redis| expect(logger).to receive(:log).with('s', "Failed to find odrequest for request_id '1111'") - #expect(redis).to receive(:zrem).with('vmpooler__provisioning__request', '1111') subject.create_ondemand_vms('1111', redis) end end @@ -4685,5 +4711,235 @@ EOT expect(result).to eq(0) end end + + context 'with a request to create a single vm' do + let(:request_string) { "#{pool}:#{pool}:1" } + let(:pool_alias) { pool } + before(:each) do + config[:config]['ondemand_clone_limit'] = 10 + expect(subject).to receive(:get_provider_for_pool).and_return(provider) + redis_connection_pool.with do |redis| + create_ondemand_creationtask("#{request_string}:#{request_id}", current_time.to_i, redis) + end + end + + it 'creates the vm' do + redis_connection_pool.with do |redis| + expect(subject).to receive(:clone_vm).with(pool, provider, request_id, pool_alias) + subject.process_ondemand_vms(redis) + end + end + end + + context 'with a request to create more instances than the limit' do + let(:request_string) { "#{pool}:#{pool}:5" } + let(:request_string_remaining) { "#{pool}:#{pool}:2" } + let(:pool_alias) { pool } + before(:each) do + config[:config]['ondemand_clone_limit'] = 3 + expect(subject).to receive(:get_provider_for_pool).and_return(provider) + redis_connection_pool.with do |redis| + create_ondemand_creationtask("#{request_string}:#{request_id}", current_time.to_i, redis) + end + end + + it 'should create the maximum number of vms' do + redis_connection_pool.with do |redis| + expect(subject).to receive(:clone_vm).with(pool, provider, request_id, pool_alias).exactly(3).times + subject.process_ondemand_vms(redis) + end + end + + it 'should add the remaining number back as a new create task with the same score' do + redis_connection_pool.with do |redis| + expect(redis).to receive(:zadd).with('vmpooler__odcreate__task', current_time.to_i, "#{request_string_remaining}:#{request_id}") + subject.process_ondemand_vms(redis) + end + end + + it 'should return the number of requests processed' do + redis_connection_pool.with do |redis| + result = subject.process_ondemand_vms(redis) + expect(result).to eq(1) + end + end + end + + context 'when the limit has been reached' do + let(:clone_count) { { 'ondemand_clone_count' => 3 } } + before(:each) do + config[:config]['ondemand_clone_limit'] = 3 + subject.instance_variable_set(:@tasks, clone_count) + end + + it 'does not create any instances' do + redis_connection_pool.with do |redis| + expect(subject).to_not receive(:clone_vm) + subject.process_ondemand_vms(redis) + end + end + end + end + + describe '#vms_ready?' do + let(:request_string) { "#{pool}:#{pool}:5" } + let(:platform_alias) { pool } + before(:each) do + redis_connection_pool.with do |redis| + create_ondemand_request_for_test(request_id, current_time.to_i, request_string, redis) + end + end + + it 'returns false when vms for request_id are not ready' do + redis_connection_pool.with do |redis| + result = subject.vms_ready?(request_id, redis) + expect(result).to be false + end + end + + context 'with a request that has all instances ready' do + before(:each) do + redis_connection_pool.with do |redis| + ['vm1','vm2','vm3','vm4','vm5'].each do |v| + redis.sadd("vmpooler__#{request_id}__#{platform_alias}__#{pool}", v) + end + end + end + + it 'returns true' do + redis_connection_pool.with do |redis| + result = subject.vms_ready?(request_id, redis) + expect(result).to be true + end + end + end + + context 'with a request that has some instances ready' do + let(:request_string) { "#{pool}:#{pool}:3,#{pool}2:#{pool}2:3" } + before(:each) do + redis_connection_pool.with do |redis| + create_ondemand_request_for_test(request_id, current_time.to_i, request_string, redis) + ['vm1','vm2','vm3'].each do |v| + redis.sadd("vmpooler__#{request_id}__#{platform_alias}__#{pool}", v) + end + end + end + + it 'returns false' do + redis_connection_pool.with do |redis| + result = subject.vms_ready?(request_id, redis) + expect(result).to be false + end + end + end + end + + describe '#check_ondemand_requests_ready' do + it 'returns 0 when no provisoning requests are in progress' do + redis_connection_pool.with do |redis| + result = subject.check_ondemand_requests_ready(redis) + expect(result).to eq(0) + end + end + + context 'with requests in progress' do + before(:each) do + redis_connection_pool.with do |redis| + create_ondemand_processing(request_id, current_time, redis) + end + end + + it 'returns the number of requests processed' do + expect(subject).to receive(:vms_ready?).and_return(false) + redis_connection_pool.with do |redis| + result = subject.check_ondemand_requests_ready(redis) + expect(result).to eq(1) + end + end + + context 'when the request is ready' do + before(:each) do + expect(subject).to receive(:vms_ready?).and_return(true) + end + + it 'sets the request as ready' do + redis_connection_pool.with do |redis| + expect(redis).to receive(:hset).with("vmpooler__odrequest__#{request_id}", 'status', 'ready') + subject.check_ondemand_requests_ready(redis) + end + end + + it 'removes the request from processing' do + redis_connection_pool.with do |redis| + expect(redis).to receive(:zrem).with('vmpooler__provisioning__processing', request_id) + subject.check_ondemand_requests_ready(redis) + end + end + end + end + end + + describe 'check_ondemand_requests' do + let(:threads) {[]} + let(:maxloop) { 0 } + let(:loop_delay_min) { 5 } + let(:loop_delay_max) { 60 } + let(:loop_delay_decay) { 2.0 } + + before(:each) do + expect(Thread).to receive(:new).and_yield + end + + it 'should log the ondemand provisioner is starting' do + expect(subject).to receive(:_check_ondemand_requests).with(maxloop, loop_delay_min, loop_delay_max, loop_delay_decay) + expect(logger).to receive(:log).with('d', "[*] [ondemand_provisioner] starting worker thread") + + expect($threads.count).to be(0) + subject.check_ondemand_requests + expect($threads.count).to be(1) + end + + context' delays between loops' do + let(:maxloop) { 2 } + let(:loop_delay) { 1 } + + it 'when a non-default loop delay is specified' do + expect(subject).to receive(:sleep_with_wakeup_events).with(loop_delay, Numeric, Hash).exactly(maxloop).times + + subject.check_ondemand_requests(maxloop,loop_delay,loop_delay) + end + end + + context 'delays between loops with a specified min and max value' do + let(:maxloop) { 5 } + let(:loop_delay_min) { 1 } + let(:loop_delay_max) { 60 } + let(:loop_decay) { 3.0 } + + it 'delay values increase with a decay' do + expect(subject).to receive(:sleep_with_wakeup_events).with(3, Numeric, Hash).once + expect(subject).to receive(:sleep_with_wakeup_events).with(9, Numeric, Hash).once + expect(subject).to receive(:sleep_with_wakeup_events).with(27, Numeric, Hash).once + expect(subject).to receive(:sleep_with_wakeup_events).with(60, Numeric, Hash).twice + + subject.check_ondemand_requests(maxloop,loop_delay_min,loop_delay_max,loop_decay) + end + end + + context 'loops specified number of times (5)' do + let(:maxloop) { 5 } + + it 'should run startup tasks only once' do + expect(logger).to receive(:log).with('d', "[*] [ondemand_provisioner] starting worker thread") + + subject.check_ondemand_requests(maxloop,0) + end + + it 'should run per thread tasks 5 times' do + expect(subject).to receive(:process_ondemand_requests).and_return(0).exactly(maxloop).times + + subject.check_ondemand_requests(maxloop,0) + end + end end end