From 3a0f0880e741009ad2775fdde49d3ecc68653e08 Mon Sep 17 00:00:00 2001
From: "kirby@puppetlabs.com" <kirby@puppetlabs.com>
Date: Fri, 25 May 2018 11:54:46 -0700
Subject: [PATCH] (POOLER-112) Ensure a VM is only destroyed once

This commit implements a vm_mutex hash so that VM operations which
should only happen once can be synchronized across threads. Without
this change pool_manager will try to evaluate or destroy a VM multiple
times: only one destroy attempt can succeed, so the others raise
errors, and even when no error is raised the duplicated work wastes
resources.
---
 lib/vmpooler/pool_manager.rb   | 163 +++++++++++++++++++--------
 spec/unit/pool_manager_spec.rb |  79 +++++++++++++++-
 2 files changed, 172 insertions(+), 70 deletions(-)

diff --git a/lib/vmpooler/pool_manager.rb b/lib/vmpooler/pool_manager.rb
index 866cf36..a00acb4 100644
--- a/lib/vmpooler/pool_manager.rb
+++ b/lib/vmpooler/pool_manager.rb
@@ -24,6 +24,8 @@ module Vmpooler

       # Pool mutex
       @reconfigure_pool = {}
+
+      @vm_mutex = {}
     end

     def config
@@ -44,15 +46,19 @@ module Vmpooler
     end

     def _check_pending_vm(vm, pool, timeout, provider)
-      host = provider.get_vm(pool, vm)
-      unless host
-        fail_pending_vm(vm, pool, timeout, false)
-        return
-      end
-      if provider.vm_ready?(pool, vm)
-        move_pending_vm_to_ready(vm, pool, host)
-      else
-        fail_pending_vm(vm, pool, timeout)
+      mutex = vm_mutex(vm)
+      return if mutex.locked?
+      mutex.synchronize do
+        host = provider.get_vm(pool, vm)
+        unless host
+          fail_pending_vm(vm, pool, timeout, false)
+          return
+        end
+        if provider.vm_ready?(pool, vm)
+          move_pending_vm_to_ready(vm, pool, host)
+        else
+          fail_pending_vm(vm, pool, timeout)
+        end
       end
     end

@@ -114,51 +120,55 @@ module Vmpooler

     def _check_ready_vm(vm, pool, ttl, provider)
       # Periodically check that the VM is available
-      check_stamp = $redis.hget('vmpooler__vm__' + vm, 'check')
-      return if check_stamp && (((Time.now - Time.parse(check_stamp)) / 60) <= $config[:config]['vm_checktime'])
+      mutex = vm_mutex(vm)
+      return if mutex.locked?
+      mutex.synchronize do
+        check_stamp = $redis.hget('vmpooler__vm__' + vm, 'check')
+        return if check_stamp && (((Time.now - Time.parse(check_stamp)) / 60) <= $config[:config]['vm_checktime'])

-      host = provider.get_vm(pool, vm)
-      # Check if the host even exists
-      unless host
-        $redis.srem('vmpooler__ready__' + pool, vm)
-        $logger.log('s', "[!] [#{pool}] '#{vm}' not found in inventory, removed from 'ready' queue")
-        return
-      end
-
-      $redis.hset('vmpooler__vm__' + vm, 'check', Time.now)
-      # Check if the VM is not powered on, before checking TTL
-      unless host['powerstate'].casecmp('poweredon').zero?
-        $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm)
-        $logger.log('d', "[!] [#{pool}] '#{vm}' appears to be powered off, removed from 'ready' queue")
-        return
-      end
-
-      # Check if the hosts TTL has expired
-      if ttl > 0
-        # host['boottime'] may be nil if host is not powered on
-        if ((Time.now - host['boottime']) / 60).to_s[/^\d+\.\d{1}/].to_f > ttl
-          $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm)
-
-          $logger.log('d', "[!] [#{pool}] '#{vm}' reached end of TTL after #{ttl} minutes, removed from 'ready' queue")
+        host = provider.get_vm(pool, vm)
+        # Check if the host even exists
+        unless host
+          $redis.srem('vmpooler__ready__' + pool, vm)
+          $logger.log('s', "[!] [#{pool}] '#{vm}' not found in inventory, removed from 'ready' queue")
           return
         end
-      end

-      # Check if the hostname has magically changed from underneath Pooler
-      if host['hostname'] != vm
-        $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm)
-        $logger.log('d', "[!] [#{pool}] '#{vm}' has mismatched hostname, removed from 'ready' queue")
-        return
-      end
+        $redis.hset('vmpooler__vm__' + vm, 'check', Time.now)
+        # Check if the VM is not powered on, before checking TTL
+        unless host['powerstate'].casecmp('poweredon').zero?
+          $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm)
+          $logger.log('d', "[!] [#{pool}] '#{vm}' appears to be powered off, removed from 'ready' queue")
+          return
+        end

-      # Check if the VM is still ready/available
-      begin
-        raise("VM #{vm} is not ready") unless provider.vm_ready?(pool, vm)
-      rescue
-        if $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm)
-          $logger.log('d', "[!] [#{pool}] '#{vm}' is unreachable, removed from 'ready' queue")
-        else
-          $logger.log('d', "[!] [#{pool}] '#{vm}' is unreachable, and failed to remove from 'ready' queue")
+        # Check if the hosts TTL has expired
+        if ttl > 0
+          # host['boottime'] may be nil if host is not powered on
+          if ((Time.now - host['boottime']) / 60).to_s[/^\d+\.\d{1}/].to_f > ttl
+            $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm)
+
+            $logger.log('d', "[!] [#{pool}] '#{vm}' reached end of TTL after #{ttl} minutes, removed from 'ready' queue")
+            return
+          end
+        end
+
+        # Check if the hostname has magically changed from underneath Pooler
+        if host['hostname'] != vm
+          $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm)
+          $logger.log('d', "[!] [#{pool}] '#{vm}' has mismatched hostname, removed from 'ready' queue")
+          return
+        end
+
+        # Check if the VM is still ready/available
+        begin
+          raise("VM #{vm} is not ready") unless provider.vm_ready?(pool, vm)
+        rescue
+          if $redis.smove('vmpooler__ready__' + pool, 'vmpooler__completed__' + pool, vm)
+            $logger.log('d', "[!] [#{pool}] '#{vm}' is unreachable, removed from 'ready' queue")
+          else
+            $logger.log('d', "[!] [#{pool}] '#{vm}' is unreachable, and failed to remove from 'ready' queue")
+          end
         end
       end
     end
@@ -175,16 +185,20 @@ module Vmpooler
     end

     def _check_running_vm(vm, pool, ttl, provider)
-      host = provider.get_vm(pool, vm)
+      mutex = vm_mutex(vm)
+      return if mutex.locked?
+      mutex.synchronize do
+        host = provider.get_vm(pool, vm)

-      if host
-        # Check that VM is within defined lifetime
-        checkouttime = $redis.hget('vmpooler__active__' + pool, vm)
-        if checkouttime
-          running = (Time.now - Time.parse(checkouttime)) / 60 / 60
+        if host
+          # Check that VM is within defined lifetime
+          checkouttime = $redis.hget('vmpooler__active__' + pool, vm)
+          if checkouttime
+            running = (Time.now - Time.parse(checkouttime)) / 60 / 60

-          if (ttl.to_i > 0) && (running.to_i >= ttl.to_i)
-            move_vm_queue(pool, vm, 'running', 'completed', "reached end of TTL after #{ttl} hours")
+            if (ttl.to_i > 0) && (running.to_i >= ttl.to_i)
+              move_vm_queue(pool, vm, 'running', 'completed', "reached end of TTL after #{ttl} hours")
+            end
           end
         end
       end
@@ -251,20 +265,24 @@ module Vmpooler
     end

     def _destroy_vm(vm, pool, provider)
-      $redis.srem('vmpooler__completed__' + pool, vm)
-      $redis.hdel('vmpooler__active__' + pool, vm)
-      $redis.hset('vmpooler__vm__' + vm, 'destroy', Time.now)
+      mutex = vm_mutex(vm)
+      return if mutex.locked?
+      mutex.synchronize do
+        $redis.srem('vmpooler__completed__' + pool, vm)
+        $redis.hdel('vmpooler__active__' + pool, vm)
+        $redis.hset('vmpooler__vm__' + vm, 'destroy', Time.now)

-      # Auto-expire metadata key
-      $redis.expire('vmpooler__vm__' + vm, ($config[:redis]['data_ttl'].to_i * 60 * 60))
+        # Auto-expire metadata key
+        $redis.expire('vmpooler__vm__' + vm, ($config[:redis]['data_ttl'].to_i * 60 * 60))

-      start = Time.now
+        start = Time.now

-      provider.destroy_vm(pool, vm)
+        provider.destroy_vm(pool, vm)

-      finish = format('%.2f', Time.now - start)
-      $logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
-      $metrics.timing("destroy.#{pool}", finish)
+        finish = format('%.2f', Time.now - start)
+        $logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
+        $metrics.timing("destroy.#{pool}", finish)
+      end
     end

     def create_vm_disk(pool_name, vm, disk_size, provider)
@@ -467,8 +485,11 @@ module Vmpooler
     def migrate_vm(vm_name, pool_name, provider)
       Thread.new do
         begin
-          $redis.srem("vmpooler__migrating__#{pool_name}", vm_name)
-          provider.migrate_vm(pool_name, vm_name)
+          mutex = vm_mutex(vm_name)
+          mutex.synchronize do
+            $redis.srem("vmpooler__migrating__#{pool_name}", vm_name)
+            provider.migrate_vm(pool_name, vm_name)
+          end
         rescue => err
           $logger.log('s', "[x] [#{pool_name}] '#{vm_name}' migration failed with an error: #{err}")
         end
@@ -579,6 +600,10 @@ module Vmpooler
       @reconfigure_pool[poolname] || @reconfigure_pool[poolname] = Mutex.new
     end

+    def vm_mutex(vmname)
+      @vm_mutex[vmname] || @vm_mutex[vmname] = Mutex.new
+    end
+
     def sync_pool_template(pool)
       pool_template = $redis.hget('vmpooler__config__template', pool['name'])
       if pool_template
diff --git a/spec/unit/pool_manager_spec.rb b/spec/unit/pool_manager_spec.rb
index 19f6c3f..be176d6 100644
--- a/spec/unit/pool_manager_spec.rb
+++ b/spec/unit/pool_manager_spec.rb
@@ -92,6 +92,19 @@ EOT
         subject._check_pending_vm(vm, pool, timeout, provider)
       end
     end
+
+    context 'with a locked vm mutex' do
+      let(:mutex) { Mutex.new }
+      before(:each) do
+        mutex.lock
+      end
+
+      it 'should return' do
+        expect(subject).to receive(:vm_mutex).and_return(mutex)
+
+        expect(subject._check_pending_vm(vm, pool, timeout, provider)).to be_nil
+      end
+    end
   end

   describe '#remove_nonexistent_vm' do
@@ -404,6 +417,19 @@ EOT
         end
       end
     end
+
+    context 'with a locked vm mutex' do
+      let(:mutex) { Mutex.new }
+      before(:each) do
+        mutex.lock
+      end
+
+      it 'should return' do
+        expect(subject).to receive(:vm_mutex).and_return(mutex)
+
+        expect(subject._check_ready_vm(vm, pool, ttl, provider)).to be_nil
+      end
+    end
   end

   describe '#check_running_vm' do
@@ -479,6 +505,19 @@ EOT
         expect(redis.sismember("vmpooler__completed__#{pool}", vm)).to be(true)
       end
     end
+
+    context 'with a locked vm mutex' do
+      let(:mutex) { Mutex.new }
+      before(:each) do
+        mutex.lock
+      end
+
+      it 'should return' do
+        expect(subject).to receive(:vm_mutex).and_return(mutex)
+
+        expect(subject._check_running_vm(vm, pool, timeout, provider)).to be_nil
+      end
+    end
   end

   describe '#move_vm_queue' do
@@ -681,7 +720,7 @@ EOT
       before(:each) do
         config[:redis] = nil
       end
-      
+
       it 'should raise an error' do
         expect{ subject._destroy_vm(vm,pool,provider) }.to raise_error(NoMethodError)
       end
@@ -732,6 +771,19 @@ EOT
         expect{ subject._destroy_vm(vm,pool,provider) }.to raise_error(/MockError/)
       end
     end
+
+    context 'when the VM mutex is locked' do
+      let(:mutex) { Mutex.new }
+      before(:each) do
+        mutex.lock
+      end
+
+      it 'should return' do
+        expect(subject).to receive(:vm_mutex).with(vm).and_return(mutex)
+
+        expect(subject._destroy_vm(vm,pool,provider)).to eq(nil)
+      end
+    end
   end

   describe '#create_vm_disk' do
@@ -1501,6 +1553,31 @@ EOT
         subject.migrate_vm(vm, pool, provider)
       end
     end
+
+    context 'with a locked vm mutex' do
+      let(:mutex) { Mutex.new }
+      before(:each) do
+        mutex.lock
+      end
+
+      it 'should return' do
+        expect(subject).to receive(:vm_mutex).and_return(mutex)
+
+        expect(subject.migrate_vm(vm, pool, provider)).to be_nil
+      end
+    end
+  end
+
+  describe '#vm_mutex' do
+    it 'should return a mutex' do
+      expect(subject.vm_mutex(vm)).to be_a(Mutex)
+    end
+
+    it 'should return the same mutex when called twice' do
+      first = subject.vm_mutex(vm)
+      second = subject.vm_mutex(vm)
+      expect(first).to be(second)
+    end
   end

   describe 'sync_pool_template' do