(POOLER-112) Ensure a VM is only destroyed once

This commit implements a vm_mutex hash to allow synchronizing VM operations that should only happen once across threads. Without this change pool_manager will try to evaluate or destroy a VM multiple times, which results in an error being thrown by one of the destroy attempts as only one can succeed and a duplication of resources unnecessarily when there are no errors.
This commit is contained in:
kirby@puppetlabs.com 2018-05-25 11:54:46 -07:00
parent 89e1f17738
commit 3a0f0880e7
2 changed files with 172 additions and 70 deletions

View file

@ -24,6 +24,8 @@ module Vmpooler
# Pool mutex # Pool mutex
@reconfigure_pool = {} @reconfigure_pool = {}
@vm_mutex = {}
end end
def config def config
@ -44,6 +46,9 @@ module Vmpooler
end end
def _check_pending_vm(vm, pool, timeout, provider) def _check_pending_vm(vm, pool, timeout, provider)
mutex = vm_mutex(vm)
return if mutex.locked?
mutex.synchronize do
host = provider.get_vm(pool, vm) host = provider.get_vm(pool, vm)
unless host unless host
fail_pending_vm(vm, pool, timeout, false) fail_pending_vm(vm, pool, timeout, false)
@ -55,6 +60,7 @@ module Vmpooler
fail_pending_vm(vm, pool, timeout) fail_pending_vm(vm, pool, timeout)
end end
end end
end
def remove_nonexistent_vm(vm, pool) def remove_nonexistent_vm(vm, pool)
$redis.srem("vmpooler__pending__#{pool}", vm) $redis.srem("vmpooler__pending__#{pool}", vm)
@ -114,6 +120,9 @@ module Vmpooler
def _check_ready_vm(vm, pool, ttl, provider) def _check_ready_vm(vm, pool, ttl, provider)
# Periodically check that the VM is available # Periodically check that the VM is available
mutex = vm_mutex(vm)
return if mutex.locked?
mutex.synchronize do
check_stamp = $redis.hget('vmpooler__vm__' + vm, 'check') check_stamp = $redis.hget('vmpooler__vm__' + vm, 'check')
return if check_stamp && (((Time.now - Time.parse(check_stamp)) / 60) <= $config[:config]['vm_checktime']) return if check_stamp && (((Time.now - Time.parse(check_stamp)) / 60) <= $config[:config]['vm_checktime'])
@ -162,6 +171,7 @@ module Vmpooler
end end
end end
end end
end
def check_running_vm(vm, pool, ttl, provider) def check_running_vm(vm, pool, ttl, provider)
Thread.new do Thread.new do
@ -175,6 +185,9 @@ module Vmpooler
end end
def _check_running_vm(vm, pool, ttl, provider) def _check_running_vm(vm, pool, ttl, provider)
mutex = vm_mutex(vm)
return if mutex.locked?
mutex.synchronize do
host = provider.get_vm(pool, vm) host = provider.get_vm(pool, vm)
if host if host
@ -189,6 +202,7 @@ module Vmpooler
end end
end end
end end
end
def move_vm_queue(pool, vm, queue_from, queue_to, msg = nil) def move_vm_queue(pool, vm, queue_from, queue_to, msg = nil)
$redis.smove("vmpooler__#{queue_from}__#{pool}", "vmpooler__#{queue_to}__#{pool}", vm) $redis.smove("vmpooler__#{queue_from}__#{pool}", "vmpooler__#{queue_to}__#{pool}", vm)
@ -251,6 +265,9 @@ module Vmpooler
end end
def _destroy_vm(vm, pool, provider) def _destroy_vm(vm, pool, provider)
mutex = vm_mutex(vm)
return if mutex.locked?
mutex.synchronize do
$redis.srem('vmpooler__completed__' + pool, vm) $redis.srem('vmpooler__completed__' + pool, vm)
$redis.hdel('vmpooler__active__' + pool, vm) $redis.hdel('vmpooler__active__' + pool, vm)
$redis.hset('vmpooler__vm__' + vm, 'destroy', Time.now) $redis.hset('vmpooler__vm__' + vm, 'destroy', Time.now)
@ -266,6 +283,7 @@ module Vmpooler
$logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds") $logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
$metrics.timing("destroy.#{pool}", finish) $metrics.timing("destroy.#{pool}", finish)
end end
end
def create_vm_disk(pool_name, vm, disk_size, provider) def create_vm_disk(pool_name, vm, disk_size, provider)
Thread.new do Thread.new do
@ -467,8 +485,11 @@ module Vmpooler
def migrate_vm(vm_name, pool_name, provider) def migrate_vm(vm_name, pool_name, provider)
Thread.new do Thread.new do
begin begin
mutex = vm_mutex(vm_name)
mutex.synchronize do
$redis.srem("vmpooler__migrating__#{pool_name}", vm_name) $redis.srem("vmpooler__migrating__#{pool_name}", vm_name)
provider.migrate_vm(pool_name, vm_name) provider.migrate_vm(pool_name, vm_name)
end
rescue => err rescue => err
$logger.log('s', "[x] [#{pool_name}] '#{vm_name}' migration failed with an error: #{err}") $logger.log('s', "[x] [#{pool_name}] '#{vm_name}' migration failed with an error: #{err}")
end end
@ -579,6 +600,10 @@ module Vmpooler
@reconfigure_pool[poolname] || @reconfigure_pool[poolname] = Mutex.new @reconfigure_pool[poolname] || @reconfigure_pool[poolname] = Mutex.new
end end
def vm_mutex(vmname)
@vm_mutex[vmname] || @vm_mutex[vmname] = Mutex.new
end
def sync_pool_template(pool) def sync_pool_template(pool)
pool_template = $redis.hget('vmpooler__config__template', pool['name']) pool_template = $redis.hget('vmpooler__config__template', pool['name'])
if pool_template if pool_template

View file

@ -92,6 +92,19 @@ EOT
subject._check_pending_vm(vm, pool, timeout, provider) subject._check_pending_vm(vm, pool, timeout, provider)
end end
end end
context 'with a locked vm mutex' do
let(:mutex) { Mutex.new }
before(:each) do
mutex.lock
end
it 'should return' do
expect(subject).to receive(:vm_mutex).and_return(mutex)
expect(subject._check_pending_vm(vm, pool, timeout, provider)).to be_nil
end
end
end end
describe '#remove_nonexistent_vm' do describe '#remove_nonexistent_vm' do
@ -404,6 +417,19 @@ EOT
end end
end end
end end
context 'with a locked vm mutex' do
let(:mutex) { Mutex.new }
before(:each) do
mutex.lock
end
it 'should return' do
expect(subject).to receive(:vm_mutex).and_return(mutex)
expect(subject._check_ready_vm(vm, pool, ttl, provider)).to be_nil
end
end
end end
describe '#check_running_vm' do describe '#check_running_vm' do
@ -479,6 +505,19 @@ EOT
expect(redis.sismember("vmpooler__completed__#{pool}", vm)).to be(true) expect(redis.sismember("vmpooler__completed__#{pool}", vm)).to be(true)
end end
end end
context 'with a locked vm mutex' do
let(:mutex) { Mutex.new }
before(:each) do
mutex.lock
end
it 'should return' do
expect(subject).to receive(:vm_mutex).and_return(mutex)
expect(subject._check_running_vm(vm, pool, timeout, provider)).to be_nil
end
end
end end
describe '#move_vm_queue' do describe '#move_vm_queue' do
@ -732,6 +771,19 @@ EOT
expect{ subject._destroy_vm(vm,pool,provider) }.to raise_error(/MockError/) expect{ subject._destroy_vm(vm,pool,provider) }.to raise_error(/MockError/)
end end
end end
context 'when the VM mutex is locked' do
let(:mutex) { Mutex.new }
before(:each) do
mutex.lock
end
it 'should return' do
expect(subject).to receive(:vm_mutex).with(vm).and_return(mutex)
expect(subject._destroy_vm(vm,pool,provider)).to eq(nil)
end
end
end end
describe '#create_vm_disk' do describe '#create_vm_disk' do
@ -1501,6 +1553,31 @@ EOT
subject.migrate_vm(vm, pool, provider) subject.migrate_vm(vm, pool, provider)
end end
end end
context 'with a locked vm mutex' do
let(:mutex) { Mutex.new }
before(:each) do
mutex.lock
end
it 'should return' do
expect(subject).to receive(:vm_mutex).and_return(mutex)
expect(subject.migrate_vm(vm, pool, provider)).to be_nil
end
end
end
describe '#vm_mutex' do
it 'should return a mutex' do
expect(subject.vm_mutex(vm)).to be_a(Mutex)
end
it 'should return the same mutex when called twice' do
first = subject.vm_mutex(vm)
second = subject.vm_mutex(vm)
expect(first).to be(second)
end
end end
describe 'sync_pool_template' do describe 'sync_pool_template' do