mirror of
https://github.com/puppetlabs/vmpooler.git
synced 2026-01-26 10:08:40 -05:00
Provisioning on demand initially functioning prototype.
This commit is contained in:
parent
edd9d9e62c
commit
70f0482d33
3 changed files with 123 additions and 40 deletions
|
|
@ -59,6 +59,7 @@ module Vmpooler
|
||||||
|
|
||||||
# Set some configuration defaults
|
# Set some configuration defaults
|
||||||
parsed_config[:config]['task_limit'] = string_to_int(ENV['TASK_LIMIT']) || parsed_config[:config]['task_limit'] || 10
|
parsed_config[:config]['task_limit'] = string_to_int(ENV['TASK_LIMIT']) || parsed_config[:config]['task_limit'] || 10
|
||||||
|
parsed_config[:config]['ondemand_clone_limit'] = string_to_int(ENV['ONDEMAND_CLONE_LIMIT']) || parsed_config[:config]['ondemand_clone_limit'] || 10
|
||||||
parsed_config[:config]['migration_limit'] = string_to_int(ENV['MIGRATION_LIMIT']) if ENV['MIGRATION_LIMIT']
|
parsed_config[:config]['migration_limit'] = string_to_int(ENV['MIGRATION_LIMIT']) if ENV['MIGRATION_LIMIT']
|
||||||
parsed_config[:config]['vm_checktime'] = string_to_int(ENV['VM_CHECKTIME']) || parsed_config[:config]['vm_checktime'] || 1
|
parsed_config[:config]['vm_checktime'] = string_to_int(ENV['VM_CHECKTIME']) || parsed_config[:config]['vm_checktime'] || 1
|
||||||
parsed_config[:config]['vm_lifetime'] = string_to_int(ENV['VM_LIFETIME']) || parsed_config[:config]['vm_lifetime'] || 24
|
parsed_config[:config]['vm_lifetime'] = string_to_int(ENV['VM_LIFETIME']) || parsed_config[:config]['vm_lifetime'] || 24
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,7 @@ module Vmpooler
|
||||||
weighted_pools = get_pool_weights(template_backends)
|
weighted_pools = get_pool_weights(template_backends)
|
||||||
|
|
||||||
pickup = Pickup.new(weighted_pools) if weighted_pools.count == template_backends.count
|
pickup = Pickup.new(weighted_pools) if weighted_pools.count == template_backends.count
|
||||||
count.times do
|
count.to_i.times do
|
||||||
if pickup
|
if pickup
|
||||||
selection << pickup.pick
|
selection << pickup.pick
|
||||||
else
|
else
|
||||||
|
|
@ -95,12 +95,12 @@ module Vmpooler
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
count.times do
|
count.to_i.times do
|
||||||
selection << template
|
selection << template
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
return count_selection(selection)
|
count_selection(selection)
|
||||||
end
|
end
|
||||||
|
|
||||||
def fetch_single_vm(template)
|
def fetch_single_vm(template)
|
||||||
|
|
@ -342,17 +342,22 @@ module Vmpooler
|
||||||
|
|
||||||
status 201
|
status 201
|
||||||
|
|
||||||
requested = payload[:requested]
|
|
||||||
platforms_with_aliases = []
|
platforms_with_aliases = []
|
||||||
requested.each do |poolname, count|
|
payload.delete('request_id')
|
||||||
|
payload.each do |poolname, count|
|
||||||
selection = evaluate_template_aliases(poolname, count)
|
selection = evaluate_template_aliases(poolname, count)
|
||||||
selection.map { |aliasname, count| platforms_with_aliases << "#{poolname}:#{aliasname}:#{count}" }
|
selection.map { |alias_name, alias_count| platforms_with_aliases << "#{poolname}:#{alias_name}:#{alias_count}" }
|
||||||
end
|
end
|
||||||
|
|
||||||
platforms_string = platforms_with_aliases.join(',')
|
platforms_string = platforms_with_aliases.join(',')
|
||||||
backend.hset("vmpooler__odrequest__#{request_id}", 'requested', platforms_string)
|
backend.hset("vmpooler__odrequest__#{request_id}", 'requested', platforms_string)
|
||||||
|
if Vmpooler::API.settings.config[:auth] and has_token?
|
||||||
|
backend.hset("vmpooler__odrequest__#{request_id}", 'token:token', request.env['HTTP_X_AUTH_TOKEN'])
|
||||||
|
backend.hset("vmpooler__odrequest__#{request_id}", 'token:user',
|
||||||
|
backend.hget('vmpooler__token__' + request.env['HTTP_X_AUTH_TOKEN'], 'user'))
|
||||||
|
end
|
||||||
|
|
||||||
result['ok'] = true
|
result[:ok] = true
|
||||||
result
|
result
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -772,7 +777,7 @@ module Vmpooler
|
||||||
JSON.pretty_generate(result)
|
JSON.pretty_generate(result)
|
||||||
end
|
end
|
||||||
|
|
||||||
post "#{api_prefix}/ondemandrequest/?" do
|
post "#{api_prefix}/ondemandvm/?" do
|
||||||
content_type :json
|
content_type :json
|
||||||
result = { 'ok' => false }
|
result = { 'ok' => false }
|
||||||
|
|
||||||
|
|
@ -796,13 +801,12 @@ module Vmpooler
|
||||||
JSON.pretty_generate(result)
|
JSON.pretty_generate(result)
|
||||||
end
|
end
|
||||||
|
|
||||||
get "#{api_prefix}/ondemandrequest/:requestid/?" do
|
get "#{api_prefix}/ondemandvm/:requestid/?" do
|
||||||
content_type :json
|
content_type :json
|
||||||
|
result = { 'ok' => false }
|
||||||
result = {'ok': false}
|
|
||||||
|
|
||||||
status 404
|
status 404
|
||||||
result = check_ondemand_request(payload)
|
result = check_ondemand_request(params[:requestid])
|
||||||
|
|
||||||
JSON.pretty_generate(result)
|
JSON.pretty_generate(result)
|
||||||
end
|
end
|
||||||
|
|
@ -882,23 +886,26 @@ module Vmpooler
|
||||||
invalid
|
invalid
|
||||||
end
|
end
|
||||||
|
|
||||||
def check_ondemand_request(payload)
|
def check_ondemand_request(request_id)
|
||||||
result = {'ok': false}
|
result = { 'ok' => false }
|
||||||
request_id = payload[:request_id]
|
result['request_id'] = request_id
|
||||||
|
result['ready'] = false
|
||||||
request_hash = backend.hgetall("vmpooler__odrequest__#{request_id}")
|
request_hash = backend.hgetall("vmpooler__odrequest__#{request_id}")
|
||||||
if request_hash.empty?
|
if request_hash.empty?
|
||||||
result['message'] = "no request found for request_id '#{request_id}'"
|
result['message'] = "no request found for request_id '#{request_id}'"
|
||||||
return result
|
return result
|
||||||
end
|
end
|
||||||
|
|
||||||
|
status 202
|
||||||
|
result['ok'] = true
|
||||||
|
|
||||||
if request_hash['status'] == 'ready'
|
if request_hash['status'] == 'ready'
|
||||||
result['status'] = 'ready'
|
result['ready'] = true
|
||||||
instances = {}
|
platform_parts = request_hash['requested'].split(',')
|
||||||
platforms = request_hash['requested'].split(',').map( { |r| r.split(':')[0] } )
|
platform_parts.each do |platform|
|
||||||
platforms.each do |platform|
|
pool_alias, pool, _count = platform.split(':')
|
||||||
instances[platform] = request_hash[platform].split(':')
|
result[pool_alias] = { 'hostname': request_hash[pool].split(':') }
|
||||||
end
|
end
|
||||||
result['instances'] = instances
|
|
||||||
status 200
|
status 200
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -88,11 +88,11 @@ module Vmpooler
|
||||||
return if mutex.locked?
|
return if mutex.locked?
|
||||||
|
|
||||||
mutex.synchronize do
|
mutex.synchronize do
|
||||||
request_id = $redis.hget("vmpooler__vm__#{vm}", request_id)
|
request_id = $redis.hget("vmpooler__vm__#{vm}", 'request_id')
|
||||||
if provider.vm_ready?(pool, vm)
|
if provider.vm_ready?(pool, vm)
|
||||||
move_pending_vm_to_ready(vm, pool, request_id)
|
move_pending_vm_to_ready(vm, pool, request_id)
|
||||||
else
|
else
|
||||||
fail_pending_vm(vm, pool, timeout, request_id=request_id)
|
fail_pending_vm(vm, pool, timeout, request_id = request_id)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
@ -109,9 +109,10 @@ module Vmpooler
|
||||||
time_since_clone = (Time.now - Time.parse(clone_stamp)) / 60
|
time_since_clone = (Time.now - Time.parse(clone_stamp)) / 60
|
||||||
if time_since_clone > timeout
|
if time_since_clone > timeout
|
||||||
if exists
|
if exists
|
||||||
|
pool_alias = get_alias_for_request(pool, request_id) if $request_id
|
||||||
$redis.multi
|
$redis.multi
|
||||||
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__completed__' + pool, vm)
|
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__completed__' + pool, vm)
|
||||||
$redis.zadd('vmpooler__odcreate__task', 1, "#{pool_name}:1:#{request_id}") if request_id
|
$redis.zadd('vmpooler__odcreate__task', 1, "#{pool_alias}:#{pool_name}:1:#{request_id}") if request_id
|
||||||
$redis.exec
|
$redis.exec
|
||||||
$metrics.increment("errors.markedasfailed.#{pool}")
|
$metrics.increment("errors.markedasfailed.#{pool}")
|
||||||
$logger.log('d', "[!] [#{pool}] '#{vm}' marked as 'failed' after #{timeout} minutes")
|
$logger.log('d', "[!] [#{pool}] '#{vm}' marked as 'failed' after #{timeout} minutes")
|
||||||
|
|
@ -131,11 +132,14 @@ module Vmpooler
|
||||||
|
|
||||||
if request_id
|
if request_id
|
||||||
fulfilled = $redis.hget("vmpooler__odrequest__#{request_id}", pool)
|
fulfilled = $redis.hget("vmpooler__odrequest__#{request_id}", pool)
|
||||||
fulfilled = "#{fulfilled}:vm" if fulfilled
|
fulfilled = "#{fulfilled}:#{vm}" if fulfilled
|
||||||
fulfilled = vm unless fulfilled
|
fulfilled ||= vm
|
||||||
|
|
||||||
$redis.multi
|
$redis.multi
|
||||||
|
$redis.hset("vmpooler__active__#{pool}", vm, Time.now)
|
||||||
|
$redis.hset("vmpooler__vm__#{vm}", 'checkout', Time.now)
|
||||||
$redis.hset("vmpooler__odrequest__#{request_id}", pool, fulfilled)
|
$redis.hset("vmpooler__odrequest__#{request_id}", pool, fulfilled)
|
||||||
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__running__' + pool, vm)
|
move_vm_queue(pool, vm, 'pending', 'running')
|
||||||
else
|
else
|
||||||
$redis.multi
|
$redis.multi
|
||||||
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__ready__' + pool, vm)
|
$redis.smove('vmpooler__pending__' + pool, 'vmpooler__ready__' + pool, vm)
|
||||||
|
|
@ -289,8 +293,10 @@ module Vmpooler
|
||||||
_clone_vm(pool_name, provider, request_id)
|
_clone_vm(pool_name, provider, request_id)
|
||||||
rescue StandardError => e
|
rescue StandardError => e
|
||||||
if request_id
|
if request_id
|
||||||
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM for request #{request_id} with an error: #{e}") if request_id
|
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM for request #{request_id} with an error: #{e}")
|
||||||
$redis.zadd('vmpooler__odcreate__task', 1, "#{pool_name}:1:#{request_id}") if request_id
|
pool_alias = get_alias_for_request(pool, request_id)
|
||||||
|
$logger.log('s', "Pool_alias is #{pool_alias}")
|
||||||
|
$redis.zadd('vmpooler__odcreate__task', 1, "#{pool_alias}:#{pool_name}:1:#{request_id}")
|
||||||
else
|
else
|
||||||
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM with an error: #{e}")
|
$logger.log('s', "[!] [#{pool_name}] failed while cloning VM with an error: #{e}")
|
||||||
end
|
end
|
||||||
|
|
@ -806,6 +812,17 @@ module Vmpooler
|
||||||
break if $redis.sismember('vmpooler__poolreset', options[:poolname])
|
break if $redis.sismember('vmpooler__poolreset', options[:poolname])
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if options[:pending_vm]
|
||||||
|
pending = $redis.scard("vmpooler__pending__#{options[:poolname]}")
|
||||||
|
break if pending
|
||||||
|
end
|
||||||
|
|
||||||
|
if options[:ondemand_request]
|
||||||
|
break if $redis.zcard('vmpooler__provisioning__request')
|
||||||
|
break if $redis.zcard('vmpooler__provisioning__processing')
|
||||||
|
break if $redis.zcard('vmpooler__odcreate__task')
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
break if time_passed?(:exit_by, exit_by)
|
break if time_passed?(:exit_by, exit_by)
|
||||||
|
|
@ -844,7 +861,7 @@ module Vmpooler
|
||||||
loop_delay = (loop_delay * loop_delay_decay).to_i
|
loop_delay = (loop_delay * loop_delay_decay).to_i
|
||||||
loop_delay = loop_delay_max if loop_delay > loop_delay_max
|
loop_delay = loop_delay_max if loop_delay > loop_delay_max
|
||||||
end
|
end
|
||||||
sleep_with_wakeup_events(loop_delay, loop_delay_min, pool_size_change: true, poolname: pool['name'], pool_template_change: true, clone_target_change: true, pool_reset: true)
|
sleep_with_wakeup_events(loop_delay, loop_delay_min, pool_size_change: true, poolname: pool['name'], pool_template_change: true, clone_target_change: true, pending_vm: true, pool_reset: true)
|
||||||
|
|
||||||
unless maxloop == 0
|
unless maxloop == 0
|
||||||
break if loop_count >= maxloop
|
break if loop_count >= maxloop
|
||||||
|
|
@ -1277,45 +1294,58 @@ module Vmpooler
|
||||||
end
|
end
|
||||||
|
|
||||||
def process_ondemand_requests
|
def process_ondemand_requests
|
||||||
requests = $redis.zrange('vmpooler__provisioning__requests', 0, -1)
|
requests = $redis.zrange('vmpooler__provisioning__request', 0, -1)
|
||||||
|
|
||||||
requests&.map { |request_id| create_ondemand_vms(request_id) }
|
requests&.map { |request_id| create_ondemand_vms(request_id) }
|
||||||
|
|
||||||
provisioning_tasks = process_ondemand_vms
|
provisioning_tasks = process_ondemand_vms
|
||||||
|
|
||||||
return requests.length + provisioning_tasks
|
requests_ready = check_ondemand_requests_ready
|
||||||
|
|
||||||
|
requests.length + provisioning_tasks + requests_ready
|
||||||
end
|
end
|
||||||
|
|
||||||
def create_ondemand_vms(request_id)
|
def create_ondemand_vms(request_id)
|
||||||
requested = $redis.hget("vmpooler__odrequest__#{request_id}", 'requested')
|
requested = $redis.hget("vmpooler__odrequest__#{request_id}", 'requested')
|
||||||
|
if ! requested
|
||||||
|
$logger.log('s', "Failed to find odrequest for request_id '#{request_id}'")
|
||||||
|
$redis.zrem('vmpooler__provisioning__request', request_id)
|
||||||
|
return
|
||||||
|
end
|
||||||
|
score = $redis.zscore('vmpooler__provisioning__request', request_id)
|
||||||
requested = requested.split(',')
|
requested = requested.split(',')
|
||||||
|
|
||||||
$redis.multi
|
$redis.multi
|
||||||
requested.map { |request| $redis.zadd('vmpooler__odcreate__task', Time.now, "#{request}:#{request_id}") }
|
requested.each do |request|
|
||||||
$redis.zrem('vmpooler__provisioning__requests', request_id)
|
$redis.zadd('vmpooler__odcreate__task', Time.now.to_i, "#{request}:#{request_id}")
|
||||||
|
end
|
||||||
|
$redis.zrem('vmpooler__provisioning__request', request_id)
|
||||||
|
$redis.zadd('vmpooler__provisioning__processing', score, request_id)
|
||||||
$redis.exec
|
$redis.exec
|
||||||
end
|
end
|
||||||
|
|
||||||
def process_ondemand_vms
|
def process_ondemand_vms
|
||||||
queue_key = 'vmpooler__odcreate__task'
|
queue_key = 'vmpooler__odcreate__task'
|
||||||
queue = $redis.zrange(queue_key, 0, -1)
|
queue = $redis.zrange(queue_key, 0, -1)
|
||||||
ondemand_clone_limit = $config[:config]['ondemand_clone_limit'].to_i
|
ondemand_clone_limit = $config[:config]['ondemand_clone_limit']
|
||||||
|
@tasks['ondemand_clone_count'] = 0 unless @tasks['ondemand_clone_count']
|
||||||
queue.each do |request|
|
queue.each do |request|
|
||||||
requested_platform, fulfilled_platform, count, request_id = request.split(':')
|
|
||||||
if @tasks['ondemand_clone_count'] < ondemand_clone_limit
|
if @tasks['ondemand_clone_count'] < ondemand_clone_limit
|
||||||
provider = get_provider_for_pool(platform)
|
requested_platform, fulfilled_platform, count, request_id = request.split(':')
|
||||||
|
count = count.to_i
|
||||||
|
provider = get_provider_for_pool(fulfilled_platform)
|
||||||
delta = ondemand_clone_limit - @tasks['ondemand_clone_count']
|
delta = ondemand_clone_limit - @tasks['ondemand_clone_count']
|
||||||
if delta >= count
|
if delta >= count
|
||||||
count.times do
|
count.times do
|
||||||
@tasks['ondemand_clone_count'] += 1
|
@tasks['ondemand_clone_count'] += 1
|
||||||
clone_vm(platform, provider, request_id)
|
clone_vm(fulfilled_platform, provider, request_id)
|
||||||
end
|
end
|
||||||
$redis.zrem(queue_key, request_id)
|
$redis.zrem(queue_key, request)
|
||||||
else
|
else
|
||||||
available_slots = delta - count
|
available_slots = delta - count
|
||||||
available_slots.times do
|
available_slots.times do
|
||||||
@tasks['ondemand_clone_count'] += 1
|
@tasks['ondemand_clone_count'] += 1
|
||||||
clone_vm(platform, provider, request_id)
|
clone_vm(fulfilled_platform, provider, request_id)
|
||||||
end
|
end
|
||||||
score = $redis.zscore(queue_key, request_id)
|
score = $redis.zscore(queue_key, request_id)
|
||||||
$redis.multi
|
$redis.multi
|
||||||
|
|
@ -1327,6 +1357,51 @@ module Vmpooler
|
||||||
break
|
break
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
queue.length
|
||||||
|
end
|
||||||
|
|
||||||
|
def get_alias_for_request(pool, request_id)
|
||||||
|
# Identify the alias associated with a request_id for retries to get tagged properly
|
||||||
|
requested = $redis.hget("vmpooler__odrequest__#{request_id}", 'requested')
|
||||||
|
requested.split(',').each do |request|
|
||||||
|
pool_alias, pool_name, count = request.split(':')
|
||||||
|
return pool_alias if pool_name == pool
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def vms_ready?(request_id)
|
||||||
|
catch :request_not_ready do
|
||||||
|
request_hash = $redis.hgetall("vmpooler__odrequest__#{request_id}")
|
||||||
|
requested_platforms = request_hash['requested'].split(',')
|
||||||
|
requested_platforms.each do |platform|
|
||||||
|
_platform_alias, pool, count = platform.split(':')
|
||||||
|
pools_filled = request_hash[pool]
|
||||||
|
throw :request_not_ready unless pools_filled
|
||||||
|
fulfilled_count = request_hash[pool].split(':').size
|
||||||
|
throw :request_not_ready unless fulfilled_count == count.to_i
|
||||||
|
end
|
||||||
|
return true
|
||||||
|
end
|
||||||
|
false
|
||||||
|
end
|
||||||
|
|
||||||
|
def check_ondemand_requests_ready
|
||||||
|
in_progress_requests = $redis.zrange('vmpooler__provisioning__processing', 0, -1)
|
||||||
|
in_progress_requests&.each do |request_id|
|
||||||
|
next unless vms_ready?(request_id)
|
||||||
|
|
||||||
|
$redis.multi
|
||||||
|
$redis.hset("vmpooler__odrequest__#{request_id}", 'status', 'ready')
|
||||||
|
$redis.zrem('vmpooler__provisioning__processing', request_id)
|
||||||
|
$redis.exec
|
||||||
|
end
|
||||||
|
in_progress_requests.length
|
||||||
|
end
|
||||||
|
|
||||||
|
def move_vm_to_running(vm, pool, request_id)
|
||||||
|
$redis.multi
|
||||||
|
$redis.exec
|
||||||
|
# Need to add auth data here
|
||||||
end
|
end
|
||||||
|
|
||||||
def execute!(maxloop = 0, loop_delay = 1)
|
def execute!(maxloop = 0, loop_delay = 1)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue