Implement redis connection pooling, multi, and pipelines.

This commit is contained in:
kirby@puppetlabs.com 2020-04-24 15:14:47 -07:00
parent 52bf6c4c89
commit 9973ed878f
6 changed files with 234 additions and 204 deletions

View file

@ -2,6 +2,7 @@
require 'vmpooler/providers'
require 'spicy-proton'
require 'redis'
module Vmpooler
class PoolManager
@ -9,7 +10,7 @@ module Vmpooler
CHECK_LOOP_DELAY_MAX_DEFAULT = 60
CHECK_LOOP_DELAY_DECAY_DEFAULT = 2.0
def initialize(config, logger, metrics)
def initialize(config, logger, redis_connection_pool, metrics)
$config = config
# Load logger library
@ -19,24 +20,18 @@ module Vmpooler
$metrics = metrics
# Redis connection pool
@redis = ConnectionPool.new(size: 10) {
Vmpooler.redis_connection(
$config[:config][:redis]['server'],
$config[:config][:redis]['port'],
$config[:config][:redis]['password']
)
}
@redis = redis_connection_pool
# VM Provider objects
$providers = {}
$providers = Concurrent::Hash.new
# Our thread-tracker object
$threads = {}
$threads = Concurrent::Hash.new
# Pool mutex
@reconfigure_pool = {}
@reconfigure_pool = Concurrent::Hash.new
@vm_mutex = {}
@vm_mutex = Concurrent::Hash.new
# Name generator for generating host names
@name_generator = Spicy::Proton.new
@ -85,7 +80,10 @@ module Vmpooler
_check_pending_vm(vm, pool, timeout, provider)
rescue StandardError => e
$logger.log('s', "[!] [#{pool}] '#{vm}' #{timeout} #{provider} errored while checking a pending vm : #{e}")
fail_pending_vm(vm, pool, timeout)
@redis.with do |redis|
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
fail_pending_vm(vm, pool, timeout, redis, request_id=request_id)
end
raise
end
end
@ -96,11 +94,13 @@ module Vmpooler
return if mutex.locked?
mutex.synchronize do
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
if provider.vm_ready?(pool, vm)
move_pending_vm_to_ready(vm, pool, request_id, redis)
else
fail_pending_vm(vm, pool, timeout, redis, request_id = request_id)
@redis.with do |redis|
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
if provider.vm_ready?(pool, vm)
move_pending_vm_to_ready(vm, pool, request_id, redis)
else
fail_pending_vm(vm, pool, timeout, redis, request_id = request_id)
end
end
end
end
@ -114,10 +114,14 @@ module Vmpooler
clone_stamp = redis.hget("vmpooler__vm__#{vm}", 'clone')
return true unless clone_stamp
# if clone_stamp == 'QUEUED'
# $logger.log('s', "Waiting for clone_stamp. Got 'QUEUED'.")
# return true
# end
time_since_clone = (Time.now - Time.parse(clone_stamp)) / 60
if time_since_clone > timeout
if exists
pool_alias = get_alias_for_request(pool, request_id, redis) if $request_id
pool_alias = redis.hget("vmpooler__vm__#{vm}", 'pool_alias') if $request_id
redis.multi
redis.smove('vmpooler__pending__' + pool, 'vmpooler__completed__' + pool, vm)
redis.zadd('vmpooler__odcreate__task', 1, "#{pool_alias}:#{pool_name}:1:#{request_id}") if request_id
@ -136,32 +140,36 @@ module Vmpooler
def move_pending_vm_to_ready(vm, pool, request_id, redis)
clone_time = redis.hget('vmpooler__vm__' + vm, 'clone')
finish = format('%<time>.2f', time: Time.now - Time.parse(clone_time)) if clone_time
# return false unless clone_time
# if clone_time == 'QUEUED'
# $logger.log('s', "Waiting for clone_time. Got 'QUEUED'.")
# return false
# end
finish = format('%<time>.2f', time: Time.now - Time.parse(clone_time))
if request_id
fulfilled = redis.hget("vmpooler__odrequest__#{request_id}", pool)
fulfilled = "#{fulfilled}:#{vm}" if fulfilled
fulfilled ||= vm
redis.multi
redis.hset("vmpooler__active__#{pool}", vm, Time.now)
redis.hset("vmpooler__vm__#{vm}", 'checkout', Time.now)
redis.hset("vmpooler__odrequest__#{request_id}", pool, fulfilled)
pool_alias = redis.hget("vmpooler__vm__#{vm}", 'pool_alias')
redis.pipelined do
redis.hset("vmpooler__active__#{pool}", vm, Time.now)
redis.hset("vmpooler__vm__#{vm}", 'checkout', Time.now)
redis.sadd("vmpooler__#{request_id}__#{pool_alias}__#{pool}", vm)
end
move_vm_queue(pool, vm, 'pending', 'running', redis)
else
redis.multi
redis.smove('vmpooler__pending__' + pool, 'vmpooler__ready__' + pool, vm)
end
redis.hset('vmpooler__boot__' + Date.today.to_s, pool + ':' + vm, finish) # maybe remove as this is never used by vmpooler itself?
redis.hset("vmpooler__vm__#{vm}", 'ready', Time.now)
redis.pipelined do
redis.hset('vmpooler__boot__' + Date.today.to_s, pool + ':' + vm, finish) # maybe remove as this is never used by vmpooler itself?
redis.hset("vmpooler__vm__#{vm}", 'ready', Time.now)
# last boot time is displayed in API, and used by alarming script
redis.hset('vmpooler__lastboot', pool, Time.now)
redis.exec
# last boot time is displayed in API, and used by alarming script
redis.hset('vmpooler__lastboot', pool, Time.now)
end
$metrics.timing("time_to_ready_state.#{pool}", finish)
$logger.log('s', "[>] [#{pool}] '#{vm}' moved from 'pending' to 'ready' queue")
$logger.log('s', "[>] [#{pool}] '#{vm}' moved from 'pending' to 'ready' queue") unless request_id
$logger.log('s', "[>] [#{pool}] '#{vm}' is 'ready' for request '#{request_id}'") if request_id
end
def vm_still_ready?(pool_name, vm_name, provider, redis)
@ -173,10 +181,10 @@ module Vmpooler
move_vm_queue(pool_name, vm_name, 'ready', 'completed', redis, "is unreachable, removed from 'ready' queue")
end
def check_ready_vm(vm, pool_name, ttl, provider, redis)
def check_ready_vm(vm, pool_name, ttl, provider)
Thread.new do
begin
_check_ready_vm(vm, pool_name, ttl, provider, redis)
_check_ready_vm(vm, pool_name, ttl, provider)
rescue StandardError => e
$logger.log('s', "[!] [#{pool_name}] '#{vm}' failed while checking a ready vm : #{e}")
raise
@ -184,36 +192,38 @@ module Vmpooler
end
end
def _check_ready_vm(vm, pool_name, ttl, provider, redis)
def _check_ready_vm(vm, pool_name, ttl, provider)
# Periodically check that the VM is available
mutex = vm_mutex(vm)
return if mutex.locked?
mutex.synchronize do
check_stamp = redis.hget('vmpooler__vm__' + vm, 'check')
return if check_stamp && (((Time.now - Time.parse(check_stamp)) / 60) <= $config[:config]['vm_checktime'])
@redis.with do |redis|
check_stamp = redis.hget('vmpooler__vm__' + vm, 'check')
return if check_stamp && (((Time.now - Time.parse(check_stamp)) / 60) <= $config[:config]['vm_checktime'])
redis.hset('vmpooler__vm__' + vm, 'check', Time.now)
# Check if the hosts TTL has expired
if ttl > 0
# if 'boottime' is nil, set bootime to beginning of unix epoch, forces TTL to be assumed expired
boottime = redis.hget("vmpooler__vm__#{vm}", 'ready')
if boottime
boottime = Time.parse(boottime)
else
boottime = Time.at(0)
end
if ((Time.now - boottime) / 60).to_s[/^\d+\.\d{1}/].to_f > ttl
redis.smove('vmpooler__ready__' + pool_name, 'vmpooler__completed__' + pool_name, vm)
redis.hset('vmpooler__vm__' + vm, 'check', Time.now)
# Check if the hosts TTL has expired
if ttl > 0
# if 'boottime' is nil, set bootime to beginning of unix epoch, forces TTL to be assumed expired
boottime = redis.hget("vmpooler__vm__#{vm}", 'ready')
if boottime
boottime = Time.parse(boottime)
else
boottime = Time.at(0)
end
if ((Time.now - boottime) / 60).to_s[/^\d+\.\d{1}/].to_f > ttl
redis.smove('vmpooler__ready__' + pool_name, 'vmpooler__completed__' + pool_name, vm)
$logger.log('d', "[!] [#{pool_name}] '#{vm}' reached end of TTL after #{ttl} minutes, removed from 'ready' queue")
return
$logger.log('d', "[!] [#{pool_name}] '#{vm}' reached end of TTL after #{ttl} minutes, removed from 'ready' queue")
return
end
end
return if mismatched_hostname?(vm, pool_name, provider)
vm_still_ready?(pool_name, vm, provider, redis)
end
return if mismatched_hostname?(vm, pool_name, provider)
vm_still_ready?(pool_name, vm, provider, redis)
end
end
@ -248,10 +258,10 @@ module Vmpooler
true
end
def check_running_vm(vm, pool, ttl, provider, redis)
def check_running_vm(vm, pool, ttl, provider)
Thread.new do
begin
_check_running_vm(vm, pool, ttl, provider, redis)
_check_running_vm(vm, pool, ttl, provider)
rescue StandardError => e
$logger.log('s', "[!] [#{pool}] '#{vm}' failed while checking VM with an error: #{e}")
raise
@ -259,31 +269,39 @@ module Vmpooler
end
end
def _check_running_vm(vm, pool, ttl, provider, redis)
def _check_running_vm(vm, pool, ttl, provider)
mutex = vm_mutex(vm)
return if mutex.locked?
mutex.synchronize do
# Check that VM is within defined lifetime
checkouttime = redis.hget('vmpooler__active__' + pool, vm)
if checkouttime
running = (Time.now - Time.parse(checkouttime)) / 60 / 60
@redis.with do |redis|
# Check that VM is within defined lifetime
checkouttime = redis.hget('vmpooler__active__' + pool, vm)
# return if checkouttime == 'QUEUED'
if checkouttime
# if checkouttime == 'QUEUED'
# $logger.log('s', "checkouttime is #{checkouttime}")
# return
# end
time_since_checkout = Time.now - Time.parse(checkouttime)
running = time_since_checkout / 60 / 60
if (ttl.to_i > 0) && (running.to_i >= ttl.to_i)
move_vm_queue(pool, vm, 'running', 'completed', redis, "reached end of TTL after #{ttl} hours")
return
if (ttl.to_i > 0) && (running.to_i >= ttl.to_i)
move_vm_queue(pool, vm, 'running', 'completed', redis, "reached end of TTL after #{ttl} hours")
return
end
end
end
if provider.vm_ready?(pool, vm)
return
else
host = provider.get_vm(pool, vm)
if host
if provider.vm_ready?(pool, vm)
return
else
move_vm_queue(pool, vm, 'running', 'completed', redis, 'is no longer in inventory, removing from running')
host = provider.get_vm(pool, vm)
if host
return
else
move_vm_queue(pool, vm, 'running', 'completed', redis, 'is no longer in inventory, removing from running')
end
end
end
end
@ -295,15 +313,13 @@ module Vmpooler
end
# Clone a VM
def clone_vm(pool_name, provider, request_id = nil)
def clone_vm(pool_name, provider, request_id = nil, pool_alias = nil)
Thread.new do
begin
_clone_vm(pool_name, provider, request_id)
_clone_vm(pool_name, provider, request_id, pool_alias)
rescue StandardError => e
if request_id
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM for request #{request_id} with an error: #{e}")
pool_alias = get_alias_for_request(pool, request_id)
$logger.log('s', "Pool_alias is #{pool_alias}")
@redis.with do |redis|
redis.zadd('vmpooler__odcreate__task', 1, "#{pool_alias}:#{pool_name}:1:#{request_id}")
end
@ -350,59 +366,58 @@ module Vmpooler
hostname
end
def _clone_vm(pool_name, provider, request_id = nil)
def _clone_vm(pool_name, provider, request_id = nil, pool_alias = nil)
@redis.with do |redis|
new_vmname = find_unique_hostname(pool_name, redis)
mutex = vm_mutex(new_vmname)
mutex.synchronize do
# Add VM to Redis inventory ('pending' pool)
redis.multi
redis.sadd('vmpooler__pending__' + pool_name, new_vmname)
redis.hset('vmpooler__vm__' + new_vmname, 'clone', Time.now)
redis.hset('vmpooler__vm__' + new_vmname, 'template', pool_name)
redis.hset('vmpooler__vm__' + new_vmname, 'request_id', request_id)
redis.exec
end
begin
$logger.log('d', "[ ] [#{pool_name}] Starting to clone '#{new_vmname}'")
start = Time.now
provider.create_vm(pool_name, new_vmname)
finish = format('%<time>.2f', time: Time.now - start)
@redis.with do |redis|
# Add VM to Redis inventory ('pending' pool)
redis.multi
redis.hset('vmpooler__clone__' + Date.today.to_s, pool_name + ':' + new_vmname, finish)
redis.hset('vmpooler__vm__' + new_vmname, 'clone_time', finish)
redis.sadd('vmpooler__pending__' + pool_name, new_vmname)
redis.hset('vmpooler__vm__' + new_vmname, 'clone', Time.now)
redis.hset('vmpooler__vm__' + new_vmname, 'template', pool_name) # This value is used to represent the pool.
redis.hset('vmpooler__vm__' + new_vmname, 'pool', pool_name)
redis.hset('vmpooler__vm__' + new_vmname, 'request_id', request_id) if request_id
redis.hset('vmpooler__vm__' + new_vmname, 'pool_alias', pool_alias) if pool_alias
redis.exec
end
$logger.log('s', "[+] [#{pool_name}] '#{new_vmname}' cloned in #{finish} seconds")
$metrics.timing("clone.#{pool_name}", finish)
rescue StandardError
@redis.with do |redis|
redis.multi
redis.srem("vmpooler__pending__#{pool_name}", new_vmname)
expiration_ttl = $config[:redis]['data_ttl'].to_i * 60 * 60
redis.expire("vmpooler__vm__#{new_vmname}", expiration_ttl)
redis.exec
end
raise
ensure
if request_id
@tasks['ondemand_clone_count'] -= 1 if request_id
else
@redis.with do |redis|
redis.decr('vmpooler__tasks__clone') unless request_id
begin
$logger.log('d', "[ ] [#{pool_name}] Starting to clone '#{new_vmname}'")
start = Time.now
provider.create_vm(pool_name, new_vmname)
finish = format('%<time>.2f', time: Time.now - start)
redis.pipelined do
redis.hset('vmpooler__clone__' + Date.today.to_s, pool_name + ':' + new_vmname, finish)
redis.hset('vmpooler__vm__' + new_vmname, 'clone_time', finish)
end
$logger.log('s', "[+] [#{pool_name}] '#{new_vmname}' cloned in #{finish} seconds")
$metrics.timing("clone.#{pool_name}", finish)
rescue StandardError
redis.pipelined do
redis.srem("vmpooler__pending__#{pool_name}", new_vmname)
expiration_ttl = $config[:redis]['data_ttl'].to_i * 60 * 60
redis.expire("vmpooler__vm__#{new_vmname}", expiration_ttl)
end
raise
ensure
if request_id
@tasks['ondemand_clone_count'] -= 1
else
redis.decr('vmpooler__tasks__clone')
end
end
end
end
end
# Destroy a VM
def destroy_vm(vm, pool, provider, redis)
def destroy_vm(vm, pool, provider)
Thread.new do
begin
_destroy_vm(vm, pool, provider, redis)
_destroy_vm(vm, pool, provider)
rescue StandardError => e
$logger.log('d', "[!] [#{pool}] '#{vm}' failed while destroying the VM with an error: #{e}")
raise
@ -410,27 +425,29 @@ module Vmpooler
end
end
def _destroy_vm(vm, pool, provider, redis)
def _destroy_vm(vm, pool, provider)
mutex = vm_mutex(vm)
return if mutex.locked?
mutex.synchronize do
redis.hdel('vmpooler__active__' + pool, vm)
redis.hset('vmpooler__vm__' + vm, 'destroy', Time.now)
@redis.with do |redis|
redis.hdel('vmpooler__active__' + pool, vm)
redis.hset('vmpooler__vm__' + vm, 'destroy', Time.now)
# Auto-expire metadata key
redis.expire('vmpooler__vm__' + vm, ($config[:redis]['data_ttl'].to_i * 60 * 60))
# Auto-expire metadata key
redis.expire('vmpooler__vm__' + vm, ($config[:redis]['data_ttl'].to_i * 60 * 60))
start = Time.now
start = Time.now
provider.destroy_vm(pool, vm)
provider.destroy_vm(pool, vm)
redis.srem('vmpooler__completed__' + pool, vm)
redis.srem('vmpooler__completed__' + pool, vm)
finish = format('%<time>.2f', time: Time.now - start)
$logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
$metrics.timing("destroy.#{pool}", finish)
get_vm_usage_labels(vm, redis)
finish = format('%<time>.2f', time: Time.now - start)
$logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
$metrics.timing("destroy.#{pool}", finish)
get_vm_usage_labels(vm, redis)
end
end
dereference_mutex(vm)
end
@ -439,14 +456,13 @@ module Vmpooler
return unless $config[:config]['usage_stats']
redis.multi
checkout = redis.hget("vmpooler__vm__#{vm}", 'checkout')
redis.hget("vmpooler__vm__#{vm}", 'checkout')
redis.hget("vmpooler__vm__#{vm}", 'tag:jenkins_build_url')
redis.hget("vmpooler__vm__#{vm}", 'token:user') || 'unauthenticated'
redis.hget("vmpooler__vm__#{vm}", 'template')
checkout, jenkins_build_url, user, poolname = redis.exec
return if checkout.nil?
jenkins_build_url = redis.hget("vmpooler__vm__#{vm}", 'tag:jenkins_build_url')
user = redis.hget("vmpooler__vm__#{vm}", 'token:user') || 'unauthenticated'
poolname = redis.hget("vmpooler__vm__#{vm}", 'template')
redis.exec
unless jenkins_build_url
user = user.gsub('.', '_')
$metrics.increment("usage.#{user}.#{poolname}")
@ -928,9 +944,9 @@ module Vmpooler
def sync_pool_template(pool)
@redis.with do |redis|
pool_template = redis.hget('vmpooler__config__template', pool['name'])
end
if pool_template
pool['template'] = pool_template unless pool['template'] == pool_template
if pool_template
pool['template'] = pool_template unless pool['template'] == pool_template
end
end
end
@ -1025,17 +1041,20 @@ module Vmpooler
def remove_excess_vms(pool)
@redis.with do |redis|
ready = redis.scard("vmpooler__ready__#{pool['name']}")
total = redis.scard("vmpooler__pending__#{pool['name']}") + ready
redis.multi
redis.scard("vmpooler__ready__#{pool['name']}")
redis.scard("vmpooler__pending__#{pool['name']}")
ready, pending = redis.exec
total = pending.to_i + total.to_i
return if total.nil?
return if total == 0
mutex = pool_mutex(pool['name'])
return if mutex.locked?
return unless ready > pool['size']
return unless ready.to_i > pool['size']
mutex.synchronize do
difference = ready - pool['size']
difference = ready.to_i - pool['size']
difference.times do
next_vm = redis.spop("vmpooler__ready__#{pool['name']}")
move_vm_queue(pool['name'], next_vm, 'ready', 'completed', redis)
@ -1118,7 +1137,7 @@ module Vmpooler
begin
vm_lifetime = redis.hget('vmpooler__vm__' + vm, 'lifetime') || $config[:config]['vm_lifetime'] || 12
pool_check_response[:checked_running_vms] += 1
check_running_vm(vm, pool_name, vm_lifetime, provider, redis)
check_running_vm(vm, pool_name, vm_lifetime, provider)
rescue StandardError => e
$logger.log('d', "[!] [#{pool_name}] _check_pool with an error while evaluating running VMs: #{e}")
end
@ -1153,7 +1172,7 @@ module Vmpooler
if inventory[vm]
begin
pool_check_response[:checked_pending_vms] += 1
check_pending_vm(vm, pool_name, pool_timeout, provider, redis)
check_pending_vm(vm, pool_name, pool_timeout, provider)
rescue StandardError => e
$logger.log('d', "[!] [#{pool_name}] _check_pool failed with an error while evaluating pending VMs: #{e}")
end
@ -1170,22 +1189,22 @@ module Vmpooler
if inventory[vm]
begin
pool_check_response[:destroyed_vms] += 1
destroy_vm(vm, pool_name, provider, redis)
destroy_vm(vm, pool_name, provider)
rescue StandardError => e
redis.multi
redis.srem("vmpooler__completed__#{pool_name}", vm)
redis.hdel("vmpooler__active__#{pool_name}", vm)
redis.del("vmpooler__vm__#{vm}")
redis.exec
redis.pipelined do
redis.srem("vmpooler__completed__#{pool_name}", vm)
redis.hdel("vmpooler__active__#{pool_name}", vm)
redis.del("vmpooler__vm__#{vm}")
end
$logger.log('d', "[!] [#{pool_name}] _check_pool failed with an error while evaluating completed VMs: #{e}")
end
else
$logger.log('s', "[!] [#{pool_name}] '#{vm}' not found in inventory, removed from 'completed' queue")
redis.multi
redis.srem("vmpooler__completed__#{pool_name}", vm)
redis.hdel("vmpooler__active__#{pool_name}", vm)
redis.del("vmpooler__vm__#{vm}")
redis.exec
redis.pipelined do
redis.srem("vmpooler__completed__#{pool_name}", vm)
redis.hdel("vmpooler__active__#{pool_name}", vm)
redis.del("vmpooler__vm__#{vm}")
end
end
end
end
@ -1228,10 +1247,11 @@ module Vmpooler
@redis.with do |redis|
redis.multi
ready = redis.scard("vmpooler__ready__#{pool_name}")
total = redis.scard("vmpooler__pending__#{pool_name}") + ready
running = redis.scard("vmpooler__running__#{pool_name}")
redis.exec
redis.scard("vmpooler__ready__#{pool_name}")
redis.scard("vmpooler__pending__#{pool_name}")
redis.scard("vmpooler__running__#{pool_name}")
ready, pending, running = redis.exec
total = pending.to_i + ready.to_i
$metrics.gauge("ready.#{pool_name}", ready)
$metrics.gauge("running.#{pool_name}", running)
@ -1243,7 +1263,7 @@ module Vmpooler
$logger.log('s', "[!] [#{pool_name}] is empty")
end
(pool_size - total).times do
(pool_size - total.to_i).times do
if redis.get('vmpooler__tasks__clone').to_i < $config[:config]['task_limit'].to_i
begin
redis.incr('vmpooler__tasks__clone')
@ -1369,8 +1389,8 @@ module Vmpooler
requests&.map { |request_id| create_ondemand_vms(request_id, redis) }
provisioning_tasks = process_ondemand_vms(redis)
requests_ready = check_ondemand_requests_ready(redis)
requests.length + provisioning_tasks + requests_ready
end
requests.length + provisioning_tasks + requests_ready
end
def create_ondemand_vms(request_id, redis)
@ -1383,43 +1403,43 @@ module Vmpooler
score = redis.zscore('vmpooler__provisioning__request', request_id)
requested = requested.split(',')
redis.multi
requested.each do |request|
redis.zadd('vmpooler__odcreate__task', Time.now.to_i, "#{request}:#{request_id}")
redis.pipelined do
requested.each do |request|
redis.zadd('vmpooler__odcreate__task', Time.now.to_i, "#{request}:#{request_id}")
end
redis.zrem('vmpooler__provisioning__request', request_id)
redis.zadd('vmpooler__provisioning__processing', score, request_id)
end
redis.zrem('vmpooler__provisioning__request', request_id)
redis.zadd('vmpooler__provisioning__processing', score, request_id)
redis.exec
end
def process_ondemand_vms(redis)
queue_key = 'vmpooler__odcreate__task'
queue = redis.zrange(queue_key, 0, -1)
queue = redis.zrange(queue_key, 0, -1, :with_scores => true)
ondemand_clone_limit = $config[:config]['ondemand_clone_limit']
@tasks['ondemand_clone_count'] = 0 unless @tasks['ondemand_clone_count']
queue.each do |request|
queue.each do |request, score|
if @tasks['ondemand_clone_count'] < ondemand_clone_limit
requested_platform, fulfilled_platform, count, request_id = request.split(':')
pool_alias, pool, count, request_id = request.split(':')
count = count.to_i
provider = get_provider_for_pool(fulfilled_platform)
delta = ondemand_clone_limit - @tasks['ondemand_clone_count']
if delta >= count
provider = get_provider_for_pool(pool)
slots = ondemand_clone_limit - @tasks['ondemand_clone_count']
return if slots == 0
if slots >= count
count.times do
@tasks['ondemand_clone_count'] += 1
clone_vm(fulfilled_platform, provider, request_id)
clone_vm(pool, provider, request_id, pool_alias)
end
redis.zrem(queue_key, request)
else
available_slots = delta - count
available_slots.times do
remaining_count = count - slots
slots.times do
@tasks['ondemand_clone_count'] += 1
clone_vm(fulfilled_platform, provider, request_id)
clone_vm(pool, provider, request_id, pool_alias)
end
redis.pipelined do
redis.zrem(queue_key, request)
redis.zadd(queue_key, score, "#{pool_alias}:#{pool}:#{remaining_count}:#{request_id}")
end
score = redis.zscore(queue_key, request_id)
redis.multi
redis.zrem(queue_key, request)
redis.zadd(queue_key, score, "#{requested_platform}:#{fulfilled_platform}:#{count - delta}:#{request_id}")
redis.exec
end
else
break
@ -1428,25 +1448,14 @@ module Vmpooler
queue.length
end
def get_alias_for_request(pool, request_id, redis)
# Identify the alias associated with a request_id for retries to get tagged properly
requested = redis.hget("vmpooler__odrequest__#{request_id}", 'requested')
requested.split(',').each do |request|
pool_alias, pool_name, count = request.split(':')
return pool_alias if pool_name == pool
end
end
def vms_ready?(request_id, redis)
catch :request_not_ready do
request_hash = redis.hgetall("vmpooler__odrequest__#{request_id}")
requested_platforms = request_hash['requested'].split(',')
requested_platforms.each do |platform|
_platform_alias, pool, count = platform.split(':')
pools_filled = request_hash[pool]
throw :request_not_ready unless pools_filled
fulfilled_count = request_hash[pool].split(':').size
throw :request_not_ready unless fulfilled_count == count.to_i
platform_alias, pool, count = platform.split(':')
pools_filled = redis.scard("vmpooler__#{request_id}__#{platform_alias}__#{pool}")
throw :request_not_ready unless pools_filled.to_i == count.to_i
end
return true
end
@ -1457,6 +1466,7 @@ module Vmpooler
in_progress_requests = redis.zrange('vmpooler__provisioning__processing', 0, -1)
in_progress_requests&.each do |request_id|
next unless vms_ready?(request_id, redis)
$logger.log('s', 'vms are ready')
redis.multi
redis.hset("vmpooler__odrequest__#{request_id}", 'status', 'ready')