Add concurrent-ruby and configure additional parts for provisioning on demand

This commit is contained in:
kirby@puppetlabs.com 2020-04-14 16:43:06 -07:00
parent 63712741a0
commit 11c5107279
7 changed files with 151 additions and 35 deletions

View file

@ -1,6 +1,7 @@
# frozen_string_literal: true
module Vmpooler
require 'concurrent'
require 'date'
require 'json'
require 'net/ldap'

View file

@ -42,26 +42,73 @@ module Vmpooler
Vmpooler::API.settings.checkoutlock
end
def fetch_single_vm(template)
template_backends = [template]
def get_template_aliases(template)
result = []
aliases = Vmpooler::API.settings.config[:alias]
if aliases
template_backends += aliases[template] if aliases[template].is_a?(Array)
result += aliases[template] if aliases[template].is_a?(Array)
template_backends << aliases[template] if aliases[template].is_a?(String)
pool_index = pool_index(pools)
weighted_pools = {}
template_backends.each do |t|
next unless pool_index.key? t
end
result
end
index = pool_index[t]
clone_target = pools[index]['clone_target'] || config['clone_target']
next unless config.key?('backend_weight')
def get_pool_weights(template_backends)
pool_index = pool_index(pools)
weighted_pools = {}
template_backends.each do |t|
next unless pool_index.key? t
weight = config['backend_weight'][clone_target]
if weight
weighted_pools[t] = weight
index = pool_index[t]
clone_target = pools[index]['clone_target'] || config['clone_target']
next unless config.key?('backend_weight')
weight = config['backend_weight'][clone_target]
if weight
weighted_pools[t] = weight
end
end
weighted_pools
end
def count_selection(selection)
result = {}
selection.uniq.each do |poolname|
result[poolname] = selection.count(poolname)
end
result
end
def evaluate_template_aliases(template, count)
template_backends = [template]
selection = []
aliases = get_template_aliases(template)
if aliases
template_backends += aliases
weighted_pools = get_pool_weights(template_backends)
pickup = Pickup.new(weighted_pools) if weighted_pools.count == template_backends.count
count.times do
if pickup
selection << pickup.pick
else
selection << template_backends.sample
end
end
else
count.times do
selection << template
end
end
return count_selection(selection)
end
def fetch_single_vm(template)
template_backends = [template]
aliases = get_template_aliases(template)
if aliases
template_backends += aliases
weighted_pools = get_pool_weights(template_backends)
if weighted_pools.count == template_backends.count
pickup = Pickup.new(weighted_pools)
@ -295,8 +342,15 @@ module Vmpooler
status 201
requested = payload[:requested].map { |poolname, count| "#{poolname}:#{count}" }.join(',')
backend.hset("vmpooler__odrequest__#{request_id}", 'requested', requested)
requested = payload[:requested]
platforms_with_aliases = []
requested.each do |poolname, count|
selection = evaluate_template_aliases(poolname, count)
selection.map { |aliasname, count| platforms_with_aliases << "#{poolname}:#{aliasname}:#{count}" }
end
platforms_string = platforms_with_aliases.join(',')
backend.hset("vmpooler__odrequest__#{request_id}", 'requested', platforms_string)
result['ok'] = true
result

View file

@ -35,6 +35,8 @@ module Vmpooler
# Name generator for generating host names
@name_generator = Spicy::Proton.new
@tasks = Concurrent::Hash.new
# load specified providers from config file
load_used_providers
end
@ -86,10 +88,11 @@ module Vmpooler
return if mutex.locked?
mutex.synchronize do
request_id = $redis.hget("vmpooler__vm__#{vm}", request_id)
if provider.vm_ready?(pool, vm)
move_pending_vm_to_ready(vm, pool)
move_pending_vm_to_ready(vm, pool, request_id)
else
fail_pending_vm(vm, pool, timeout)
fail_pending_vm(vm, pool, timeout, request_id=request_id)
end
end
end
@ -99,14 +102,17 @@ module Vmpooler
$logger.log('d', "[!] [#{pool}] '#{vm}' no longer exists. Removing from pending.")
end
def fail_pending_vm(vm, pool, timeout, exists = true)
def fail_pending_vm(vm, pool, timeout, exists = true, request_id = nil)
clone_stamp = $redis.hget("vmpooler__vm__#{vm}", 'clone')
return true unless clone_stamp
time_since_clone = (Time.now - Time.parse(clone_stamp)) / 60
if time_since_clone > timeout
if exists
$redis.multi
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__completed__' + pool, vm)
$redis.zadd('vmpooler__odcreate__task', 1, "#{pool_name}:1:#{request_id}") if request_id
$redis.exec
$metrics.increment("errors.markedasfailed.#{pool}")
$logger.log('d', "[!] [#{pool}] '#{vm}' marked as 'failed' after #{timeout} minutes")
else
@ -119,16 +125,28 @@ module Vmpooler
false
end
def move_pending_vm_to_ready(vm, pool)
def move_pending_vm_to_ready(vm, pool, request_id)
clone_time = $redis.hget('vmpooler__vm__' + vm, 'clone')
finish = format('%<time>.2f', time: Time.now - Time.parse(clone_time)) if clone_time
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__ready__' + pool, vm)
if request_id
fulfilled = $redis.hget("vmpooler__odrequest__#{request_id}", pool)
fulfilled = "#{fulfilled}:vm" if fulfilled
fulfilled = vm unless fulfilled
$redis.multi
$redis.hset("vmpooler__odrequest__#{request_id}", pool, fulfilled)
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__running__' + pool, vm)
else
$redis.multi
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__ready__' + pool, vm)
end
$redis.hset('vmpooler__boot__' + Date.today.to_s, pool + ':' + vm, finish) # maybe remove as this is never used by vmpooler itself?
$redis.hset("vmpooler__vm__#{vm}", 'ready', Time.now)
# last boot time is displayed in API, and used by alarming script
$redis.hset('vmpooler__lastboot', pool, Time.now)
$redis.exec
$metrics.timing("time_to_ready_state.#{pool}", finish)
$logger.log('s', "[>] [#{pool}] '#{vm}' moved from 'pending' to 'ready' queue")
@ -265,12 +283,17 @@ module Vmpooler
end
# Clone a VM
def clone_vm(pool_name, provider)
def clone_vm(pool_name, provider, request_id = nil)
Thread.new do
begin
_clone_vm(pool_name, provider)
_clone_vm(pool_name, provider, request_id)
rescue StandardError => e
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM with an error: #{e}")
if request_id
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM for request #{request_id} with an error: #{e}") if request_id
$redis.zadd('vmpooler__odcreate__task', 1, "#{pool_name}:1:#{request_id}") if request_id
else
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM with an error: #{e}")
end
raise
end
end
@ -311,13 +334,16 @@ module Vmpooler
hostname
end
def _clone_vm(pool_name, provider)
def _clone_vm(pool_name, provider, request_id = nil)
new_vmname = find_unique_hostname(pool_name)
# Add VM to Redis inventory ('pending' pool)
$redis.multi
$redis.sadd('vmpooler__pending__' + pool_name, new_vmname)
$redis.hset('vmpooler__vm__' + new_vmname, 'clone', Time.now)
$redis.hset('vmpooler__vm__' + new_vmname, 'template', pool_name)
$redis.hset('vmpooler__vm__' + new_vmname, 'request_id', request_id)
$redis.exec
begin
$logger.log('d', "[ ] [#{pool_name}] Starting to clone '#{new_vmname}'")
@ -325,18 +351,23 @@ module Vmpooler
provider.create_vm(pool_name, new_vmname)
finish = format('%<time>.2f', time: Time.now - start)
$redis.multi
$redis.hset('vmpooler__clone__' + Date.today.to_s, pool_name + ':' + new_vmname, finish)
$redis.hset('vmpooler__vm__' + new_vmname, 'clone_time', finish)
$redis.exec
$logger.log('s', "[+] [#{pool_name}] '#{new_vmname}' cloned in #{finish} seconds")
$metrics.timing("clone.#{pool_name}", finish)
rescue StandardError
$redis.multi
$redis.srem("vmpooler__pending__#{pool_name}", new_vmname)
expiration_ttl = $config[:redis]['data_ttl'].to_i * 60 * 60
$redis.expire("vmpooler__vm__#{new_vmname}", expiration_ttl)
$redis.exec
raise
ensure
$redis.decr('vmpooler__tasks__clone')
$redis.decr('vmpooler__tasks__clone') unless request_id
@tasks['ondemand_clone_count'] -= 1 if request_id
end
end
@ -1219,9 +1250,9 @@ module Vmpooler
loop_delay_decay = CHECK_LOOP_DELAY_DECAY_DEFAULT)
# Use the pool setings if they exist
loop_delay_min = pool['check_loop_delay_min'] unless pool['check_loop_delay_min'].nil?
loop_delay_max = pool['check_loop_delay_max'] unless pool['check_loop_delay_max'].nil?
loop_delay_decay = pool['check_loop_delay_decay'] unless pool['check_loop_delay_decay'].nil?
loop_delay_min = $config[:config]['check_loop_delay_min'] unless $config[:config]['check_loop_delay_min'].nil?
loop_delay_max = $config[:config]['check_loop_delay_max'] unless $config[:config]['check_loop_delay_max'].nil?
loop_delay_decay = $config[:config]['check_loop_delay_decay'] unless $config[:config]['check_loop_delay_decay'].nil?
loop_delay_decay = 2.0 if loop_delay_decay <= 1.0
loop_delay_max = loop_delay_min if loop_delay_max.nil? || loop_delay_max < loop_delay_min
@ -1247,19 +1278,43 @@ module Vmpooler
def process_ondemand_requests
requests = $redis.zrange('vmpooler__provisioning__requests', 0, -1)
return 0 if requests.empty?
if ! requests.empty?
requests.each do |request_id|
create_ondemand_vms(request_id)
$redis.zrem('vmpooler__provisioning__requests', request_id)
end
requests.each do |request_id|
create_ondemand_vms(request_id)
$redis.zrem('vmpooler__provisioning__requests', request_id)
end
return requests.length
provisioning_tasks = $redis.zrange('vmpooler__odcreate__task', 0, -1)
if ! requests.empty?
process_ondemand_vms(provisioning_tasks) unless provisioning_tasks.empty?
end
return requests.length + provisioning_tasks.length
end
def create_ondemand_vms(request_id)
requested = $redis.hget("vmpooler__odrequest__#{request_id}", 'requested')
requested = requested.split(',')
$redis.multi
requested.map { |request| $redis.zadd('vmpooler__odcreate__task', Time.now, "#{request}:#{request_id}") }
$redis.exec
end
def process_ondemand_vms(queue)
ondemand_clone_limit = $config[:config]['ondemand_clone_limit'].to_i
queue.each do |request|
platform, count, request_id = request.split(':')
if @tasks['ondemand_clone_count'] + count <= ondemand_clone_limit
provider = get_provider_for_pool(platform)
count.times do
@tasks['ondemand_clone_count'] += 1
clone_vm(platform, provider, request_id)
end
end
end
end
def execute!(maxloop = 0, loop_delay = 1)
@ -1356,7 +1411,7 @@ module Vmpooler
check_ondemand_requests
elsif !$threads['ondemand_provisioner'].alive?
$logger.log('d', '[!] [ondemand_provisioner] worker thread died, restarting')
check_ondemand_requests
check_ondemand_requests(check_loop_delay_min, check_loop_delay_max, check_loop_delay_decay)
end
sleep(loop_delay)