mirror of
https://github.com/puppetlabs/vmpooler.git
synced 2026-01-27 02:18:41 -05:00
Replace connection_pool with vmpooler generic_connection_pool. Fix tests for changes.
This commit is contained in:
parent
b32b88b753
commit
90e09bfe1c
13 changed files with 808 additions and 509 deletions
|
|
@ -48,7 +48,7 @@ module Vmpooler
|
|||
|
||||
# Place pool configuration in redis so an API instance can discover running pool configuration
|
||||
def load_pools_to_redis
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
previously_configured_pools = redis.smembers('vmpooler__pools')
|
||||
currently_configured_pools = []
|
||||
config[:pools].each do |pool|
|
||||
|
|
@ -80,7 +80,7 @@ module Vmpooler
|
|||
_check_pending_vm(vm, pool, timeout, provider)
|
||||
rescue StandardError => e
|
||||
$logger.log('s', "[!] [#{pool}] '#{vm}' #{timeout} #{provider} errored while checking a pending vm : #{e}")
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
|
||||
fail_pending_vm(vm, pool, timeout, redis, request_id=request_id)
|
||||
end
|
||||
|
|
@ -94,10 +94,10 @@ module Vmpooler
|
|||
return if mutex.locked?
|
||||
|
||||
mutex.synchronize do
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
|
||||
if provider.vm_ready?(pool, vm)
|
||||
move_pending_vm_to_ready(vm, pool, request_id, redis)
|
||||
move_pending_vm_to_ready(vm, pool, redis, request_id)
|
||||
else
|
||||
fail_pending_vm(vm, pool, timeout, redis, request_id = request_id)
|
||||
end
|
||||
|
|
@ -138,13 +138,8 @@ module Vmpooler
|
|||
false
|
||||
end
|
||||
|
||||
def move_pending_vm_to_ready(vm, pool, request_id, redis)
|
||||
def move_pending_vm_to_ready(vm, pool, redis, request_id = nil)
|
||||
clone_time = redis.hget('vmpooler__vm__' + vm, 'clone')
|
||||
# return false unless clone_time
|
||||
# if clone_time == 'QUEUED'
|
||||
# $logger.log('s', "Waiting for clone_time. Got 'QUEUED'.")
|
||||
# return false
|
||||
# end
|
||||
finish = format('%<time>.2f', time: Time.now - Time.parse(clone_time))
|
||||
|
||||
if request_id
|
||||
|
|
@ -198,7 +193,7 @@ module Vmpooler
|
|||
return if mutex.locked?
|
||||
|
||||
mutex.synchronize do
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
check_stamp = redis.hget('vmpooler__vm__' + vm, 'check')
|
||||
return if check_stamp && (((Time.now - Time.parse(check_stamp)) / 60) <= $config[:config]['vm_checktime'])
|
||||
|
||||
|
|
@ -220,7 +215,7 @@ module Vmpooler
|
|||
end
|
||||
end
|
||||
|
||||
return if mismatched_hostname?(vm, pool_name, provider)
|
||||
return if mismatched_hostname?(vm, pool_name, provider, redis)
|
||||
|
||||
vm_still_ready?(pool_name, vm, provider, redis)
|
||||
end
|
||||
|
|
@ -274,7 +269,7 @@ module Vmpooler
|
|||
return if mutex.locked?
|
||||
|
||||
mutex.synchronize do
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
# Check that VM is within defined lifetime
|
||||
checkouttime = redis.hget('vmpooler__active__' + pool, vm)
|
||||
# return if checkouttime == 'QUEUED'
|
||||
|
|
@ -320,7 +315,7 @@ module Vmpooler
|
|||
rescue StandardError => e
|
||||
if request_id
|
||||
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM for request #{request_id} with an error: #{e}")
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.zadd('vmpooler__odcreate__task', 1, "#{pool_alias}:#{pool_name}:1:#{request_id}")
|
||||
end
|
||||
else
|
||||
|
|
@ -367,7 +362,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def _clone_vm(pool_name, provider, request_id = nil, pool_alias = nil)
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
new_vmname = find_unique_hostname(pool_name, redis)
|
||||
mutex = vm_mutex(new_vmname)
|
||||
mutex.synchronize do
|
||||
|
|
@ -430,7 +425,7 @@ module Vmpooler
|
|||
return if mutex.locked?
|
||||
|
||||
mutex.synchronize do
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.hdel('vmpooler__active__' + pool, vm)
|
||||
redis.hset('vmpooler__vm__' + vm, 'destroy', Time.now)
|
||||
|
||||
|
|
@ -581,7 +576,7 @@ module Vmpooler
|
|||
finish = format('%<time>.2f', time: Time.now - start)
|
||||
|
||||
if result
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
rdisks = redis.hget('vmpooler__vm__' + vm_name, 'disk')
|
||||
disks = rdisks ? rdisks.split(':') : []
|
||||
disks.push("+#{disk_size}gb")
|
||||
|
|
@ -616,7 +611,7 @@ module Vmpooler
|
|||
finish = format('%<time>.2f', time: Time.now - start)
|
||||
|
||||
if result
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.hset('vmpooler__vm__' + vm_name, 'snapshot:' + snapshot_name, Time.now.to_s)
|
||||
end
|
||||
$logger.log('s', "[+] [snapshot_manager] '#{vm_name}' snapshot created in #{finish} seconds")
|
||||
|
|
@ -708,7 +703,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def _check_disk_queue
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
task_detail = redis.spop('vmpooler__tasks__disk')
|
||||
unless task_detail.nil?
|
||||
begin
|
||||
|
|
@ -746,7 +741,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def _check_snapshot_queue
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
task_detail = redis.spop('vmpooler__tasks__snapshot')
|
||||
|
||||
unless task_detail.nil?
|
||||
|
|
@ -788,7 +783,7 @@ module Vmpooler
|
|||
begin
|
||||
mutex = vm_mutex(vm_name)
|
||||
mutex.synchronize do
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.srem("vmpooler__migrating__#{pool_name}", vm_name)
|
||||
end
|
||||
provider.migrate_vm(pool_name, vm_name)
|
||||
|
|
@ -823,7 +818,7 @@ module Vmpooler
|
|||
wakeup_by = Time.now + wakeup_period
|
||||
return if time_passed?(:exit_by, exit_by)
|
||||
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
initial_ready_size = redis.scard("vmpooler__ready__#{options[:poolname]}") if options[:pool_size_change]
|
||||
|
||||
initial_clone_target = redis.hget("vmpooler__pool__#{options[:poolname]}", options[:clone_target]) if options[:clone_target_change]
|
||||
|
|
@ -942,7 +937,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def sync_pool_template(pool)
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
pool_template = redis.hget('vmpooler__config__template', pool['name'])
|
||||
if pool_template
|
||||
pool['template'] = pool_template unless pool['template'] == pool_template
|
||||
|
|
@ -966,7 +961,7 @@ module Vmpooler
|
|||
|
||||
def evaluate_template(pool, provider)
|
||||
mutex = pool_mutex(pool['name'])
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
prepared_template = redis.hget('vmpooler__template__prepared', pool['name'])
|
||||
configured_template = redis.hget('vmpooler__config__template', pool['name'])
|
||||
return if mutex.locked?
|
||||
|
|
@ -1013,10 +1008,10 @@ module Vmpooler
|
|||
pool['template'] = configured_template
|
||||
$logger.log('s', "[*] [#{pool['name']}] template updated from #{prepared_template} to #{configured_template}")
|
||||
# Remove all ready and pending VMs so new instances are created from the new template
|
||||
drain_pool(pool['name'])
|
||||
drain_pool(pool['name'], redis)
|
||||
# Prepare template for deployment
|
||||
$logger.log('s', "[*] [#{pool['name']}] preparing pool template for deployment")
|
||||
prepare_template(pool, provider)
|
||||
prepare_template(pool, provider, redis)
|
||||
$logger.log('s', "[*] [#{pool['name']}] is ready for use")
|
||||
end
|
||||
|
||||
|
|
@ -1024,7 +1019,7 @@ module Vmpooler
|
|||
mutex = pool_mutex(pool['name'])
|
||||
return if mutex.locked?
|
||||
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
clone_target = redis.hget('vmpooler__config__clone_target', pool['name'])
|
||||
return if clone_target.nil?
|
||||
return if clone_target == pool['clone_target']
|
||||
|
|
@ -1040,7 +1035,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def remove_excess_vms(pool)
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.multi
|
||||
redis.scard("vmpooler__ready__#{pool['name']}")
|
||||
redis.scard("vmpooler__pending__#{pool['name']}")
|
||||
|
|
@ -1072,7 +1067,7 @@ module Vmpooler
|
|||
mutex = pool_mutex(pool['name'])
|
||||
return if mutex.locked?
|
||||
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
poolsize = redis.hget('vmpooler__config__poolsize', pool['name'])
|
||||
return if poolsize.nil?
|
||||
|
||||
|
|
@ -1087,7 +1082,7 @@ module Vmpooler
|
|||
|
||||
def reset_pool(pool)
|
||||
poolname = pool['name']
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
return unless redis.sismember('vmpooler__poolreset', poolname)
|
||||
|
||||
redis.srem('vmpooler__poolreset', poolname)
|
||||
|
|
@ -1104,7 +1099,7 @@ module Vmpooler
|
|||
begin
|
||||
mutex = pool_mutex(pool['name'])
|
||||
mutex.synchronize do
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
provider.vms_in_pool(pool['name']).each do |vm|
|
||||
if !redis.sismember('vmpooler__running__' + pool['name'], vm['name']) &&
|
||||
!redis.sismember('vmpooler__ready__' + pool['name'], vm['name']) &&
|
||||
|
|
@ -1131,7 +1126,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def check_running_pool_vms(pool_name, provider, pool_check_response, inventory)
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.smembers("vmpooler__running__#{pool_name}").each do |vm|
|
||||
if inventory[vm]
|
||||
begin
|
||||
|
|
@ -1149,12 +1144,12 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def check_ready_pool_vms(pool_name, provider, pool_check_response, inventory, pool_ttl = 0)
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.smembers("vmpooler__ready__#{pool_name}").each do |vm|
|
||||
if inventory[vm]
|
||||
begin
|
||||
pool_check_response[:checked_ready_vms] += 1
|
||||
check_ready_vm(vm, pool_name, pool_ttl || 0, provider, redis)
|
||||
check_ready_vm(vm, pool_name, pool_ttl || 0, provider)
|
||||
rescue StandardError => e
|
||||
$logger.log('d', "[!] [#{pool_name}] _check_pool failed with an error while evaluating ready VMs: #{e}")
|
||||
end
|
||||
|
|
@ -1167,7 +1162,7 @@ module Vmpooler
|
|||
|
||||
def check_pending_pool_vms(pool_name, provider, pool_check_response, inventory, pool_timeout = nil)
|
||||
pool_timeout ||= $config[:config]['timeout'] || 15
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.smembers("vmpooler__pending__#{pool_name}").reverse.each do |vm|
|
||||
if inventory[vm]
|
||||
begin
|
||||
|
|
@ -1184,7 +1179,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def check_completed_pool_vms(pool_name, provider, pool_check_response, inventory)
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.smembers("vmpooler__completed__#{pool_name}").each do |vm|
|
||||
if inventory[vm]
|
||||
begin
|
||||
|
|
@ -1211,7 +1206,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def check_discovered_pool_vms(pool_name)
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.smembers("vmpooler__discovered__#{pool_name}").reverse.each do |vm|
|
||||
%w[pending ready running completed].each do |queue|
|
||||
if redis.sismember("vmpooler__#{queue}__#{pool_name}", vm)
|
||||
|
|
@ -1228,7 +1223,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def check_migrating_pool_vms(pool_name, provider, pool_check_response, inventory)
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.smembers("vmpooler__migrating__#{pool_name}").reverse.each do |vm|
|
||||
if inventory[vm]
|
||||
begin
|
||||
|
|
@ -1245,7 +1240,7 @@ module Vmpooler
|
|||
def repopulate_pool_vms(pool_name, provider, pool_check_response, pool_size)
|
||||
return if pool_mutex(pool_name).locked?
|
||||
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
redis.multi
|
||||
redis.scard("vmpooler__ready__#{pool_name}")
|
||||
redis.scard("vmpooler__pending__#{pool_name}")
|
||||
|
|
@ -1384,7 +1379,7 @@ module Vmpooler
|
|||
end
|
||||
|
||||
def process_ondemand_requests
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
requests = redis.zrange('vmpooler__provisioning__request', 0, -1)
|
||||
requests&.map { |request_id| create_ondemand_vms(request_id, redis) }
|
||||
provisioning_tasks = process_ondemand_vms(redis)
|
||||
|
|
@ -1479,7 +1474,7 @@ module Vmpooler
|
|||
def execute!(maxloop = 0, loop_delay = 1)
|
||||
$logger.log('d', 'starting vmpooler')
|
||||
|
||||
@redis.with do |redis|
|
||||
@redis.with_metrics do |redis|
|
||||
# Clear out the tasks manager, as we don't know about any tasks at this point
|
||||
redis.set('vmpooler__tasks__clone', 0)
|
||||
# Clear out vmpooler__migrations since stale entries may be left after a restart
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue