diff --git a/lib/vmpooler/api/helpers.rb b/lib/vmpooler/api/helpers.rb index c7be9ba..0f216ab 100644 --- a/lib/vmpooler/api/helpers.rb +++ b/lib/vmpooler/api/helpers.rb @@ -6,108 +6,61 @@ module Vmpooler module Helpers + def tracer + @tracer ||= OpenTelemetry.tracer_provider.tracer('api', Vmpooler::VERSION) + end + def has_token? request.env['HTTP_X_AUTH_TOKEN'].nil? ? false : true end def valid_token?(backend) - return false unless has_token? + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + return false unless has_token? - backend.exists?("vmpooler__token__#{request.env['HTTP_X_AUTH_TOKEN']}") ? true : false + backend.exists?("vmpooler__token__#{request.env['HTTP_X_AUTH_TOKEN']}") ? true : false + end end def validate_token(backend) - if valid_token?(backend) - backend.hset("vmpooler__token__#{request.env['HTTP_X_AUTH_TOKEN']}", 'last', Time.now) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + if valid_token?(backend) + backend.hset("vmpooler__token__#{request.env['HTTP_X_AUTH_TOKEN']}", 'last', Time.now) - return true + return true + end + + content_type :json + + result = { 'ok' => false } + + headers['WWW-Authenticate'] = 'Basic realm="Authentication required"' + halt 401, JSON.pretty_generate(result) end - - content_type :json - - result = { 'ok' => false } - - headers['WWW-Authenticate'] = 'Basic realm="Authentication required"' - halt 401, JSON.pretty_generate(result) end def validate_auth(backend) - return if authorized? + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + return if authorized? - content_type :json + content_type :json - result = { 'ok' => false } + result = { 'ok' => false } - headers['WWW-Authenticate'] = 'Basic realm="Authentication required"' - halt 401, JSON.pretty_generate(result) + headers['WWW-Authenticate'] = 'Basic realm="Authentication required"' + halt 401, JSON.pretty_generate(result) + end end def authorized? - @auth ||= Rack::Auth::Basic::Request.new(request.env) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + @auth ||= Rack::Auth::Basic::Request.new(request.env) - if @auth.provided? and @auth.basic? and @auth.credentials - username, password = @auth.credentials + if @auth.provided? and @auth.basic? and @auth.credentials + username, password = @auth.credentials - if authenticate(Vmpooler::API.settings.config[:auth], username, password) - return true - end - end - - return false - end - - def authenticate_ldap(port, host, encryption_hash, user_object, base, username_str, password_str) - ldap = Net::LDAP.new( - :host => host, - :port => port, - :encryption => encryption_hash, - :base => base, - :auth => { - :method => :simple, - :username => "#{user_object}=#{username_str},#{base}", - :password => password_str - } - ) - - return true if ldap.bind - - return false - end - - def authenticate(auth, username_str, password_str) - case auth['provider'] - when 'dummy' - return (username_str != password_str) - when 'ldap' - ldap_base = auth[:ldap]['base'] - ldap_port = auth[:ldap]['port'] || 389 - ldap_user_obj = auth[:ldap]['user_object'] - ldap_host = auth[:ldap]['host'] - ldap_encryption_hash = auth[:ldap]['encryption'] || { - :method => :start_tls, - :tls_options => { :ssl_version => 'TLSv1' } - } - - unless ldap_base.is_a? Array - ldap_base = ldap_base.split - end - - unless ldap_user_obj.is_a? Array - ldap_user_obj = ldap_user_obj.split - end - - ldap_base.each do |search_base| - ldap_user_obj.each do |search_user_obj| - result = authenticate_ldap( - ldap_port, - ldap_host, - ldap_encryption_hash, - search_user_obj, - search_base, - username_str, - password_str - ) - return true if result + if authenticate(Vmpooler::API.settings.config[:auth], username, password) + return true end end @@ -115,27 +68,108 @@ module Vmpooler end end - def export_tags(backend, hostname, tags) - backend.pipelined do - tags.each_pair do |tag, value| - next if value.nil? or value.empty? + def authenticate_ldap(port, host, encryption_hash, user_object, base, username_str, password_str) + tracer.in_span( + "Vmpooler::API::Helpers.#{__method__}", + attributes: { + 'net.peer.name' => host, + 'net.peer.port' => port, + 'net.transport' => 'ip_tcp', + 'enduser.id' => username_str + }, + kind: :client + ) do + ldap = Net::LDAP.new( + :host => host, + :port => port, + :encryption => encryption_hash, + :base => base, + :auth => { + :method => :simple, + :username => "#{user_object}=#{username_str},#{base}", + :password => password_str + } + ) - backend.hset("vmpooler__vm__#{hostname}", "tag:#{tag}", value) - backend.hset("vmpooler__tag__#{Date.today}", "#{hostname}:#{tag}", value) + return true if ldap.bind + + return false + end + end + + def authenticate(auth, username_str, password_str) + tracer.in_span( + "Vmpooler::API::Helpers.#{__method__}", + attributes: { + 'enduser.id' => username_str + } + ) do + case auth['provider'] + when 'dummy' + return (username_str != password_str) + when 'ldap' + ldap_base = auth[:ldap]['base'] + ldap_port = auth[:ldap]['port'] || 389 + ldap_user_obj = auth[:ldap]['user_object'] + ldap_host = auth[:ldap]['host'] + ldap_encryption_hash = auth[:ldap]['encryption'] || { + :method => :start_tls, + :tls_options => { :ssl_version => 'TLSv1' } + } + + unless ldap_base.is_a? Array + ldap_base = ldap_base.split + end + + unless ldap_user_obj.is_a? Array + ldap_user_obj = ldap_user_obj.split + end + + ldap_base.each do |search_base| + ldap_user_obj.each do |search_user_obj| + result = authenticate_ldap( + ldap_port, + ldap_host, + ldap_encryption_hash, + search_user_obj, + search_base, + username_str, + password_str + ) + return true if result + end + end + + return false + end + end + end + + def export_tags(backend, hostname, tags) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + backend.pipelined do + tags.each_pair do |tag, value| + next if value.nil? or value.empty? + + backend.hset("vmpooler__vm__#{hostname}", "tag:#{tag}", value) + backend.hset("vmpooler__tag__#{Date.today}", "#{hostname}:#{tag}", value) + end end end end def filter_tags(tags) - return unless Vmpooler::API.settings.config[:tagfilter] + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + return unless Vmpooler::API.settings.config[:tagfilter] - tags.each_pair do |tag, value| - next unless filter = Vmpooler::API.settings.config[:tagfilter][tag] + tags.each_pair do |tag, value| + next unless filter = Vmpooler::API.settings.config[:tagfilter][tag] - tags[tag] = value.match(filter).captures.join if value.match(filter) + tags[tag] = value.match(filter).captures.join if value.match(filter) + end + + tags end - - tags end def mean(list) @@ -156,301 +190,321 @@ module Vmpooler end def get_task_times(backend, task, date_str) - backend.hvals("vmpooler__#{task}__" + date_str).map(&:to_f) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + backend.hvals("vmpooler__#{task}__" + date_str).map(&:to_f) + end end # Takes the pools and a key to run scard on # returns an integer for the total count def get_total_across_pools_redis_scard(pools, key, backend) - # using pipelined is much faster than querying each of the pools and adding them - # as we get the result. - res = backend.pipelined do - pools.each do |pool| - backend.scard(key + pool['name']) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + # using pipelined is much faster than querying each of the pools and adding them + # as we get the result. + res = backend.pipelined do + pools.each do |pool| + backend.scard(key + pool['name']) + end end + res.inject(0) { |m, x| m + x }.to_i end - res.inject(0) { |m, x| m + x }.to_i end # Takes the pools and a key to run scard on # returns a hash with each pool name as key and the value being the count as integer def get_list_across_pools_redis_scard(pools, key, backend) - # using pipelined is much faster than querying each of the pools and adding them - # as we get the result. - temp_hash = {} - res = backend.pipelined do - pools.each do |pool| - backend.scard(key + pool['name']) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + # using pipelined is much faster than querying each of the pools and adding them + # as we get the result. + temp_hash = {} + res = backend.pipelined do + pools.each do |pool| + backend.scard(key + pool['name']) + end end + pools.each_with_index do |pool, i| + temp_hash[pool['name']] = res[i].to_i + end + temp_hash end - pools.each_with_index do |pool, i| - temp_hash[pool['name']] = res[i].to_i - end - temp_hash end # Takes the pools and a key to run hget on # returns a hash with each pool name as key and the value as string def get_list_across_pools_redis_hget(pools, key, backend) - # using pipelined is much faster than querying each of the pools and adding them - # as we get the result. - temp_hash = {} - res = backend.pipelined do - pools.each do |pool| - backend.hget(key, pool['name']) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + # using pipelined is much faster than querying each of the pools and adding them + # as we get the result. + temp_hash = {} + res = backend.pipelined do + pools.each do |pool| + backend.hget(key, pool['name']) + end end + pools.each_with_index do |pool, i| + temp_hash[pool['name']] = res[i].to_s + end + temp_hash end - pools.each_with_index do |pool, i| - temp_hash[pool['name']] = res[i].to_s - end - temp_hash end def get_capacity_metrics(pools, backend) - capacity = { - current: 0, - total: 0, - percent: 0 - } + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + capacity = { + current: 0, + total: 0, + percent: 0 + } - pools.each do |pool| - capacity[:total] += pool['size'].to_i + pools.each do |pool| + capacity[:total] += pool['size'].to_i + end + + capacity[:current] = get_total_across_pools_redis_scard(pools, 'vmpooler__ready__', backend) + + if capacity[:total] > 0 + capacity[:percent] = (capacity[:current].fdiv(capacity[:total]) * 100.0).round(1) + end + + capacity end - - capacity[:current] = get_total_across_pools_redis_scard(pools, 'vmpooler__ready__', backend) - - if capacity[:total] > 0 - capacity[:percent] = (capacity[:current].fdiv(capacity[:total]) * 100.0).round(1) - end - - capacity end def get_queue_metrics(pools, backend) - queue = { - pending: 0, - cloning: 0, - booting: 0, - ready: 0, - running: 0, - completed: 0, - total: 0 - } + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + queue = { + pending: 0, + cloning: 0, + booting: 0, + ready: 0, + running: 0, + completed: 0, + total: 0 + } - queue[:pending] = get_total_across_pools_redis_scard(pools, 'vmpooler__pending__', backend) - queue[:ready] = get_total_across_pools_redis_scard(pools, 'vmpooler__ready__', backend) - queue[:running] = get_total_across_pools_redis_scard(pools, 'vmpooler__running__', backend) - queue[:completed] = get_total_across_pools_redis_scard(pools, 'vmpooler__completed__', backend) + queue[:pending] = get_total_across_pools_redis_scard(pools, 'vmpooler__pending__', backend) + queue[:ready] = get_total_across_pools_redis_scard(pools, 'vmpooler__ready__', backend) + queue[:running] = get_total_across_pools_redis_scard(pools, 'vmpooler__running__', backend) + queue[:completed] = get_total_across_pools_redis_scard(pools, 'vmpooler__completed__', backend) - queue[:cloning] = backend.get('vmpooler__tasks__clone').to_i + backend.get('vmpooler__tasks__ondemandclone').to_i - queue[:booting] = queue[:pending].to_i - queue[:cloning].to_i - queue[:booting] = 0 if queue[:booting] < 0 - queue[:total] = queue[:pending].to_i + queue[:ready].to_i + queue[:running].to_i + queue[:completed].to_i + queue[:cloning] = backend.get('vmpooler__tasks__clone').to_i + backend.get('vmpooler__tasks__ondemandclone').to_i + queue[:booting] = queue[:pending].to_i - queue[:cloning].to_i + queue[:booting] = 0 if queue[:booting] < 0 + queue[:total] = queue[:pending].to_i + queue[:ready].to_i + queue[:running].to_i + queue[:completed].to_i - queue + queue + end end def get_tag_metrics(backend, date_str, opts = {}) - opts = {:only => false}.merge(opts) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + opts = {:only => false}.merge(opts) - tags = {} + tags = {} - backend.hgetall("vmpooler__tag__#{date_str}").each do |key, value| - hostname = 'unknown' - tag = 'unknown' + backend.hgetall("vmpooler__tag__#{date_str}").each do |key, value| + hostname = 'unknown' + tag = 'unknown' - if key =~ /:/ - hostname, tag = key.split(':', 2) + if key =~ /:/ + hostname, tag = key.split(':', 2) + end + + next if opts[:only] && tag != opts[:only] + + tags[tag] ||= {} + tags[tag][value] ||= 0 + tags[tag][value] += 1 + + tags[tag]['total'] ||= 0 + tags[tag]['total'] += 1 end - next if opts[:only] && tag != opts[:only] - - tags[tag] ||= {} - tags[tag][value] ||= 0 - tags[tag][value] += 1 - - tags[tag]['total'] ||= 0 - tags[tag]['total'] += 1 + tags end - - tags end def get_tag_summary(backend, from_date, to_date, opts = {}) - opts = {:only => false}.merge(opts) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + opts = {:only => false}.merge(opts) - result = { - tag: {}, - daily: [] - } - - (from_date..to_date).each do |date| - daily = { - date: date.to_s, - tag: get_tag_metrics(backend, date.to_s, opts) + result = { + tag: {}, + daily: [] } - result[:daily].push(daily) - end - result[:daily].each do |daily| - daily[:tag].each_key do |tag| - result[:tag][tag] ||= {} + (from_date..to_date).each do |date| + daily = { + date: date.to_s, + tag: get_tag_metrics(backend, date.to_s, opts) + } + result[:daily].push(daily) + end - daily[:tag][tag].each do |key, value| - result[:tag][tag][key] ||= 0 - result[:tag][tag][key] += value + result[:daily].each do |daily| + daily[:tag].each_key do |tag| + result[:tag][tag] ||= {} + + daily[:tag][tag].each do |key, value| + result[:tag][tag][key] ||= 0 + result[:tag][tag][key] += value + end end end - end - result + result + end end def get_task_metrics(backend, task_str, date_str, opts = {}) - opts = {:bypool => false, :only => false}.merge(opts) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + opts = {:bypool => false, :only => false}.merge(opts) - task = { - duration: { - average: 0, - min: 0, - max: 0, - total: 0 - }, - count: { - total: 0 - } - } + task = { + duration: { + average: 0, + min: 0, + max: 0, + total: 0 + }, + count: { + total: 0 + } + } - task[:count][:total] = backend.hlen("vmpooler__#{task_str}__#{date_str}").to_i + task[:count][:total] = backend.hlen("vmpooler__#{task_str}__#{date_str}").to_i - if task[:count][:total] > 0 - if opts[:bypool] == true - task_times_bypool = {} + if task[:count][:total] > 0 + if opts[:bypool] == true + task_times_bypool = {} - task[:count][:pool] = {} - task[:duration][:pool] = {} + task[:count][:pool] = {} + task[:duration][:pool] = {} - backend.hgetall("vmpooler__#{task_str}__#{date_str}").each do |key, value| - pool = 'unknown' - hostname = 'unknown' + backend.hgetall("vmpooler__#{task_str}__#{date_str}").each do |key, value| + pool = 'unknown' + hostname = 'unknown' - if key =~ /:/ - pool, hostname = key.split(':') - else - hostname = key + if key =~ /:/ + pool, hostname = key.split(':') + else + hostname = key + end + + task[:count][:pool][pool] ||= {} + task[:duration][:pool][pool] ||= {} + + task_times_bypool[pool] ||= [] + task_times_bypool[pool].push(value.to_f) end - task[:count][:pool][pool] ||= {} - task[:duration][:pool][pool] ||= {} + task_times_bypool.each_key do |pool| + task[:count][:pool][pool][:total] = task_times_bypool[pool].length - task_times_bypool[pool] ||= [] - task_times_bypool[pool].push(value.to_f) + task[:duration][:pool][pool][:total] = task_times_bypool[pool].reduce(:+).to_f + task[:duration][:pool][pool][:average] = (task[:duration][:pool][pool][:total] / task[:count][:pool][pool][:total]).round(1) + task[:duration][:pool][pool][:min], task[:duration][:pool][pool][:max] = task_times_bypool[pool].minmax + end end - task_times_bypool.each_key do |pool| - task[:count][:pool][pool][:total] = task_times_bypool[pool].length + task_times = get_task_times(backend, task_str, date_str) - task[:duration][:pool][pool][:total] = task_times_bypool[pool].reduce(:+).to_f - task[:duration][:pool][pool][:average] = (task[:duration][:pool][pool][:total] / task[:count][:pool][pool][:total]).round(1) - task[:duration][:pool][pool][:min], task[:duration][:pool][pool][:max] = task_times_bypool[pool].minmax + task[:duration][:total] = task_times.reduce(:+).to_f + task[:duration][:average] = (task[:duration][:total] / task[:count][:total]).round(1) + task[:duration][:min], task[:duration][:max] = task_times.minmax + end + + if opts[:only] + task.each_key do |key| + task.delete(key) unless key.to_s == opts[:only] end end - task_times = get_task_times(backend, task_str, date_str) - - task[:duration][:total] = task_times.reduce(:+).to_f - task[:duration][:average] = (task[:duration][:total] / task[:count][:total]).round(1) - task[:duration][:min], task[:duration][:max] = task_times.minmax + task end - - if opts[:only] - task.each_key do |key| - task.delete(key) unless key.to_s == opts[:only] - end - end - - task end def get_task_summary(backend, task_str, from_date, to_date, opts = {}) - opts = {:bypool => false, :only => false}.merge(opts) + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + opts = {:bypool => false, :only => false}.merge(opts) - task_sym = task_str.to_sym + task_sym = task_str.to_sym - result = { - task_sym => {}, - daily: [] - } - - (from_date..to_date).each do |date| - daily = { - date: date.to_s, - task_sym => get_task_metrics(backend, task_str, date.to_s, opts) + result = { + task_sym => {}, + daily: [] } - result[:daily].push(daily) - end - daily_task = {} - daily_task_bypool = {} if opts[:bypool] == true + (from_date..to_date).each do |date| + daily = { + date: date.to_s, + task_sym => get_task_metrics(backend, task_str, date.to_s, opts) + } + result[:daily].push(daily) + end - result[:daily].each do |daily| - daily[task_sym].each_key do |type| - result[task_sym][type] ||= {} - daily_task[type] ||= {} + daily_task = {} + daily_task_bypool = {} if opts[:bypool] == true - ['min', 'max'].each do |key| - if daily[task_sym][type][key] - daily_task[type][:data] ||= [] - daily_task[type][:data].push(daily[task_sym][type][key]) + result[:daily].each do |daily| + daily[task_sym].each_key do |type| + result[task_sym][type] ||= {} + daily_task[type] ||= {} + + ['min', 'max'].each do |key| + if daily[task_sym][type][key] + daily_task[type][:data] ||= [] + daily_task[type][:data].push(daily[task_sym][type][key]) + end + end + + result[task_sym][type][:total] ||= 0 + result[task_sym][type][:total] += daily[task_sym][type][:total] + + if opts[:bypool] == true + result[task_sym][type][:pool] ||= {} + daily_task_bypool[type] ||= {} + + next unless daily[task_sym][type][:pool] + + daily[task_sym][type][:pool].each_key do |pool| + result[task_sym][type][:pool][pool] ||= {} + daily_task_bypool[type][pool] ||= {} + + ['min', 'max'].each do |key| + if daily[task_sym][type][:pool][pool][key.to_sym] + daily_task_bypool[type][pool][:data] ||= [] + daily_task_bypool[type][pool][:data].push(daily[task_sym][type][:pool][pool][key.to_sym]) + end + end + + result[task_sym][type][:pool][pool][:total] ||= 0 + result[task_sym][type][:pool][pool][:total] += daily[task_sym][type][:pool][pool][:total] + end end end + end - result[task_sym][type][:total] ||= 0 - result[task_sym][type][:total] += daily[task_sym][type][:total] + result[task_sym].each_key do |type| + if daily_task[type][:data] + result[task_sym][type][:min], result[task_sym][type][:max] = daily_task[type][:data].minmax + result[task_sym][type][:average] = mean(daily_task[type][:data]) + end if opts[:bypool] == true - result[task_sym][type][:pool] ||= {} - daily_task_bypool[type] ||= {} - - next unless daily[task_sym][type][:pool] - - daily[task_sym][type][:pool].each_key do |pool| - result[task_sym][type][:pool][pool] ||= {} - daily_task_bypool[type][pool] ||= {} - - ['min', 'max'].each do |key| - if daily[task_sym][type][:pool][pool][key.to_sym] - daily_task_bypool[type][pool][:data] ||= [] - daily_task_bypool[type][pool][:data].push(daily[task_sym][type][:pool][pool][key.to_sym]) + result[task_sym].each_key do |type| + result[task_sym][type][:pool].each_key do |pool| + if daily_task_bypool[type][pool][:data] + result[task_sym][type][:pool][pool][:min], result[task_sym][type][:pool][pool][:max] = daily_task_bypool[type][pool][:data].minmax + result[task_sym][type][:pool][pool][:average] = mean(daily_task_bypool[type][pool][:data]) end end - - result[task_sym][type][:pool][pool][:total] ||= 0 - result[task_sym][type][:pool][pool][:total] += daily[task_sym][type][:pool][pool][:total] end end end + + result end - - result[task_sym].each_key do |type| - if daily_task[type][:data] - result[task_sym][type][:min], result[task_sym][type][:max] = daily_task[type][:data].minmax - result[task_sym][type][:average] = mean(daily_task[type][:data]) - end - - if opts[:bypool] == true - result[task_sym].each_key do |type| - result[task_sym][type][:pool].each_key do |pool| - if daily_task_bypool[type][pool][:data] - result[task_sym][type][:pool][pool][:min], result[task_sym][type][:pool][pool][:max] = daily_task_bypool[type][pool][:data].minmax - result[task_sym][type][:pool][pool][:average] = mean(daily_task_bypool[type][pool][:data]) - end - end - end - end - end - - result end def pool_index(pools) @@ -464,11 +518,13 @@ module Vmpooler end def template_ready?(pool, backend) - prepared_template = backend.hget('vmpooler__template__prepared', pool['name']) - return false if prepared_template.nil? - return true if pool['template'] == prepared_template + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + prepared_template = backend.hget('vmpooler__template__prepared', pool['name']) + return false if prepared_template.nil? + return true if pool['template'] == prepared_template - return false + return false + end end def is_integer?(x) @@ -479,26 +535,39 @@ module Vmpooler end def open_socket(host, domain = nil, timeout = 1, port = 22, &_block) - Timeout.timeout(timeout) do - target_host = host - target_host = "#{host}.#{domain}" if domain - sock = TCPSocket.new target_host, port - begin - yield sock if block_given? - ensure - sock.close + tracer.in_span( + "Vmpooler::API::Helpers.#{__method__}", + attributes: { + 'net.peer.port' => port, + 'net.transport' => 'ip_tcp' + }, + kind: :client + ) do + Timeout.timeout(timeout) do + target_host = host + target_host = "#{host}.#{domain}" if domain + span = OpenTelemetry::Trace.current_span + span.set_attribute('net.peer.name', target_host) + sock = TCPSocket.new target_host, port + begin + yield sock if block_given? + ensure + sock.close + end end end end def vm_ready?(vm_name, domain = nil) - begin - open_socket(vm_name, domain) - rescue StandardError => _e - return false - end + tracer.in_span("Vmpooler::API::Helpers.#{__method__}") do + begin + open_socket(vm_name, domain) + rescue StandardError => _e + return false + end - true + true + end end end end diff --git a/lib/vmpooler/api/v1.rb b/lib/vmpooler/api/v1.rb index f209de2..c11b2cc 100644 --- a/lib/vmpooler/api/v1.rb +++ b/lib/vmpooler/api/v1.rb @@ -49,13 +49,15 @@ module Vmpooler end def get_template_aliases(template) - result = [] - aliases = Vmpooler::API.settings.config[:alias] - if aliases - result += aliases[template] if aliases[template].is_a?(Array) - template_backends << aliases[template] if aliases[template].is_a?(String) + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + result = [] + aliases = Vmpooler::API.settings.config[:alias] + if aliases + result += aliases[template] if aliases[template].is_a?(Array) + template_backends << aliases[template] if aliases[template].is_a?(String) + end + result end - result end def get_pool_weights(template_backends) @@ -109,398 +111,463 @@ module Vmpooler end def fetch_single_vm(template) - template_backends = [template] - aliases = Vmpooler::API.settings.config[:alias] - if aliases - template_backends += aliases[template] if aliases[template].is_a?(Array) - template_backends << aliases[template] if aliases[template].is_a?(String) - pool_index = pool_index(pools) - weighted_pools = {} - template_backends.each do |t| - next unless pool_index.key? t + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + template_backends = [template] + aliases = Vmpooler::API.settings.config[:alias] + if aliases + template_backends += aliases[template] if aliases[template].is_a?(Array) + template_backends << aliases[template] if aliases[template].is_a?(String) + pool_index = pool_index(pools) + weighted_pools = {} + template_backends.each do |t| + next unless pool_index.key? t - index = pool_index[t] - clone_target = pools[index]['clone_target'] || config['clone_target'] - next unless config.key?('backend_weight') + index = pool_index[t] + clone_target = pools[index]['clone_target'] || config['clone_target'] + next unless config.key?('backend_weight') - weight = config['backend_weight'][clone_target] - if weight - weighted_pools[t] = weight - end - end - - if weighted_pools.count == template_backends.count - pickup = Pickup.new(weighted_pools) - selection = pickup.pick - template_backends.delete(selection) - template_backends.unshift(selection) - else - first = template_backends.sample - template_backends.delete(first) - template_backends.unshift(first) - end - end - - checkoutlock.synchronize do - template_backends.each do |template_backend| - vms = backend.smembers("vmpooler__ready__#{template_backend}") - next if vms.empty? - - vms.reverse.each do |vm| - ready = vm_ready?(vm, config['domain']) - if ready - smoved = backend.smove("vmpooler__ready__#{template_backend}", "vmpooler__running__#{template_backend}", vm) - if smoved - return [vm, template_backend, template] - else - metrics.increment("checkout.smove.failed.#{template_backend}") - return [nil, nil, nil] - end - else - backend.smove("vmpooler__ready__#{template_backend}", "vmpooler__completed__#{template_backend}", vm) - metrics.increment("checkout.nonresponsive.#{template_backend}") + weight = config['backend_weight'][clone_target] + if weight + weighted_pools[t] = weight end end + + if weighted_pools.count == template_backends.count + pickup = Pickup.new(weighted_pools) + selection = pickup.pick + template_backends.delete(selection) + template_backends.unshift(selection) + else + first = template_backends.sample + template_backends.delete(first) + template_backends.unshift(first) + end + end + + checkoutlock.synchronize do + template_backends.each do |template_backend| + vms = backend.smembers("vmpooler__ready__#{template_backend}") + next if vms.empty? + + vms.reverse.each do |vm| + ready = vm_ready?(vm, config['domain']) + if ready + smoved = backend.smove("vmpooler__ready__#{template_backend}", "vmpooler__running__#{template_backend}", vm) + if smoved + return [vm, template_backend, template] + else + metrics.increment("checkout.smove.failed.#{template_backend}") + return [nil, nil, nil] + end + else + backend.smove("vmpooler__ready__#{template_backend}", "vmpooler__completed__#{template_backend}", vm) + metrics.increment("checkout.nonresponsive.#{template_backend}") + end + end + end + [nil, nil, nil] end - [nil, nil, nil] end end def return_vm_to_ready_state(template, vm) - backend.srem("vmpooler__migrating__#{template}", vm) - backend.hdel("vmpooler__active__#{template}", vm) - backend.hdel("vmpooler__vm__#{vm}", 'checkout', 'token:token', 'token:user') - backend.smove("vmpooler__running__#{template}", "vmpooler__ready__#{template}", vm) + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + backend.srem("vmpooler__migrating__#{template}", vm) + backend.hdel("vmpooler__active__#{template}", vm) + backend.hdel("vmpooler__vm__#{vm}", 'checkout', 'token:token', 'token:user') + backend.smove("vmpooler__running__#{template}", "vmpooler__ready__#{template}", vm) + end end def account_for_starting_vm(template, vm) - user = backend.hget("vmpooler__token__#{request.env['HTTP_X_AUTH_TOKEN']}", 'user') - has_token_result = has_token? - backend.sadd("vmpooler__migrating__#{template}", vm) - backend.hset("vmpooler__active__#{template}", vm, Time.now) - backend.hset("vmpooler__vm__#{vm}", 'checkout', Time.now) + tracer.in_span("Vmpooler::API::V1.#{__method__}") do |span| + user = backend.hget("vmpooler__token__#{request.env['HTTP_X_AUTH_TOKEN']}", 'user') + span.set_attribute('enduser.id', user) + has_token_result = has_token? + backend.sadd("vmpooler__migrating__#{template}", vm) + backend.hset("vmpooler__active__#{template}", vm, Time.now) + backend.hset("vmpooler__vm__#{vm}", 'checkout', Time.now) - if Vmpooler::API.settings.config[:auth] and has_token_result - backend.hset("vmpooler__vm__#{vm}", 'token:token', request.env['HTTP_X_AUTH_TOKEN']) - backend.hset("vmpooler__vm__#{vm}", 'token:user', user) + if Vmpooler::API.settings.config[:auth] and has_token_result + backend.hset("vmpooler__vm__#{vm}", 'token:token', request.env['HTTP_X_AUTH_TOKEN']) + backend.hset("vmpooler__vm__#{vm}", 'token:user', user) - if config['vm_lifetime_auth'].to_i > 0 - backend.hset("vmpooler__vm__#{vm}", 'lifetime', config['vm_lifetime_auth'].to_i) + if config['vm_lifetime_auth'].to_i > 0 + backend.hset("vmpooler__vm__#{vm}", 'lifetime', config['vm_lifetime_auth'].to_i) + end end end end def update_result_hosts(result, template, vm) - result[template] ||= {} - if result[template]['hostname'] - result[template]['hostname'] = Array(result[template]['hostname']) - result[template]['hostname'].push(vm) - else - result[template]['hostname'] = vm + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + result[template] ||= {} + if result[template]['hostname'] + result[template]['hostname'] = Array(result[template]['hostname']) + result[template]['hostname'].push(vm) + else + result[template]['hostname'] = vm + end end end def atomically_allocate_vms(payload) - result = { 'ok' => false } - failed = false - vms = [] + tracer.in_span("Vmpooler::API::V1.#{__method__}") do |span| + result = { 'ok' => false } + failed = false + vms = [] - validate_token(backend) if Vmpooler::API.settings.config[:auth] and has_token? + validate_token(backend) if Vmpooler::API.settings.config[:auth] and has_token? - payload.each do |requested, count| - count.to_i.times do |_i| - vmname, vmpool, vmtemplate = fetch_single_vm(requested) - if vmname - account_for_starting_vm(vmpool, vmname) - vms << [vmpool, vmname, vmtemplate] - metrics.increment("checkout.success.#{vmpool}") - update_user_metrics('allocate', vmname) if Vmpooler::API.settings.config[:config]['usage_stats'] - else - failed = true - metrics.increment("checkout.empty.#{requested}") - break + payload.each do |requested, count| + count.to_i.times do |_i| + vmname, vmpool, vmtemplate = fetch_single_vm(requested) + if vmname + account_for_starting_vm(vmpool, vmname) + vms << [vmpool, vmname, vmtemplate] + metrics.increment("checkout.success.#{vmpool}") + update_user_metrics('allocate', vmname) if Vmpooler::API.settings.config[:config]['usage_stats'] + else + failed = true + metrics.increment("checkout.empty.#{requested}") + break + end end end - end - if failed - vms.each do |(vmpool, vmname, _vmtemplate)| - return_vm_to_ready_state(vmpool, vmname) - end - status 503 - else - vms.each do |(_vmpool, vmname, vmtemplate)| - update_result_hosts(result, vmtemplate, vmname) + if failed + vms.each do |(vmpool, vmname, _vmtemplate)| + return_vm_to_ready_state(vmpool, vmname) + end + span.add_event('error', attributes: { + 'error.type' => 'Vmpooler::API::V1.atomically_allocate_vms', + 'error.message' => '503 due to failing to allocate one or more vms' + }) + status 503 + else + vm_names = [] + vms.each do |(_vmpool, vmname, vmtemplate)| + update_result_hosts(result, vmtemplate, vmname) + vm_names.append(vmname) + end + + span.set_attribute('vmpooler.vm_names', vm_names.join(',')) unless vm_names.empty? + + result['ok'] = true + result['domain'] = config['domain'] if config['domain'] end - result['ok'] = true - result['domain'] = config['domain'] if config['domain'] + result end - - result end def component_to_test(match, labels_string) - return if labels_string.nil? + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + return if labels_string.nil? - labels_string_parts = labels_string.split(',') - labels_string_parts.each do |part| - key, value = part.split('=') - next if value.nil? - return value if key == match + labels_string_parts = labels_string.split(',') + labels_string_parts.each do |part| + key, value = part.split('=') + next if value.nil? + return value if key == match + end + 'none' end - 'none' end def update_user_metrics(operation, vmname) - backend.multi - backend.hget("vmpooler__vm__#{vmname}", 'tag:jenkins_build_url') - backend.hget("vmpooler__vm__#{vmname}", 'token:user') - backend.hget("vmpooler__vm__#{vmname}", 'template') - jenkins_build_url, user, poolname = backend.exec - poolname = poolname.gsub('.', '_') + tracer.in_span("Vmpooler::API::V1.#{__method__}") do |span| + begin + backend.multi + backend.hget("vmpooler__vm__#{vmname}", 'tag:jenkins_build_url') + backend.hget("vmpooler__vm__#{vmname}", 'token:user') + backend.hget("vmpooler__vm__#{vmname}", 'template') + jenkins_build_url, user, poolname = backend.exec + poolname = poolname.gsub('.', '_') - if user - user = user.gsub('.', '_') - else - user = 'unauthenticated' - end - metrics.increment("user.#{user}.#{operation}.#{poolname}") + if user + user = user.gsub('.', '_') + else + user = 'unauthenticated' + end + metrics.increment("user.#{user}.#{operation}.#{poolname}") - if jenkins_build_url - if jenkins_build_url.include? 'litmus' - # Very simple filter for Litmus jobs - just count them coming through for the moment. - metrics.increment("usage_litmus.#{user}.#{operation}.#{poolname}") - return + if jenkins_build_url + if jenkins_build_url.include? 'litmus' + # Very simple filter for Litmus jobs - just count them coming through for the moment. + metrics.increment("usage_litmus.#{user}.#{operation}.#{poolname}") + else + url_parts = jenkins_build_url.split('/')[2..-1] + jenkins_instance = url_parts[0].gsub('.', '_') + value_stream_parts = url_parts[2].split('_') + value_stream_parts = value_stream_parts.map { |s| s.gsub('.', '_') } + value_stream = value_stream_parts.shift + branch = value_stream_parts.pop + project = value_stream_parts.shift + job_name = value_stream_parts.join('_') + build_metadata_parts = url_parts[3] + component_to_test = component_to_test('RMM_COMPONENT_TO_TEST_NAME', build_metadata_parts) + + metrics.increment("usage_jenkins_instance.#{jenkins_instance}.#{value_stream}.#{operation}.#{poolname}") + metrics.increment("usage_branch_project.#{branch}.#{project}.#{operation}.#{poolname}") + metrics.increment("usage_job_component.#{job_name}.#{component_to_test}.#{operation}.#{poolname}") + end + end + rescue StandardError => e + puts 'd', "[!] [#{poolname}] failed while evaluating usage labels on '#{vmname}' with an error: #{e}" + span.record_exception(e) + span.status = OpenTelemetry::Trace::Status.error(e.to_s) + span.add_event('log', attributes: { + 'log.severity' => 'debug', + 'log.message' => "[#{poolname}] failed while evaluating usage labels on '#{vmname}' with an error: #{e}" + }) end - - url_parts = jenkins_build_url.split('/')[2..-1] - jenkins_instance = url_parts[0].gsub('.', '_') - value_stream_parts = url_parts[2].split('_') - value_stream_parts = value_stream_parts.map { |s| s.gsub('.', '_') } - value_stream = value_stream_parts.shift - branch = value_stream_parts.pop - project = value_stream_parts.shift - job_name = value_stream_parts.join('_') - build_metadata_parts = url_parts[3] - component_to_test = component_to_test('RMM_COMPONENT_TO_TEST_NAME', build_metadata_parts) - - metrics.increment("usage_jenkins_instance.#{jenkins_instance}.#{value_stream}.#{operation}.#{poolname}") - metrics.increment("usage_branch_project.#{branch}.#{project}.#{operation}.#{poolname}") - metrics.increment("usage_job_component.#{job_name}.#{component_to_test}.#{operation}.#{poolname}") end - rescue StandardError => e - puts 'd', "[!] [#{poolname}] failed while evaluating usage labels on '#{vmname}' with an error: #{e}" end def reset_pool_size(poolname) - result = { 'ok' => false } + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + result = { 'ok' => false } - pool_index = pool_index(pools) + pool_index = pool_index(pools) - pools_updated = 0 - sync_pool_sizes + pools_updated = 0 + sync_pool_sizes - pool_size_now = pools[pool_index[poolname]]['size'].to_i - pool_size_original = pools_at_startup[pool_index[poolname]]['size'].to_i - result['pool_size_before_reset'] = pool_size_now - result['pool_size_before_overrides'] = pool_size_original + pool_size_now = pools[pool_index[poolname]]['size'].to_i + pool_size_original = pools_at_startup[pool_index[poolname]]['size'].to_i + result['pool_size_before_reset'] = pool_size_now + result['pool_size_before_overrides'] = pool_size_original - unless pool_size_now == pool_size_original - pools[pool_index[poolname]]['size'] = pool_size_original - backend.hdel('vmpooler__config__poolsize', poolname) - backend.sadd('vmpooler__pool__undo_size_override', poolname) - pools_updated += 1 - status 201 + unless pool_size_now == pool_size_original + pools[pool_index[poolname]]['size'] = pool_size_original + backend.hdel('vmpooler__config__poolsize', poolname) + backend.sadd('vmpooler__pool__undo_size_override', poolname) + pools_updated += 1 + status 201 + end + + status 200 unless pools_updated > 0 + result['ok'] = true + result end - - status 200 unless pools_updated > 0 - result['ok'] = true - result end def update_pool_size(payload) - result = { 'ok' => false } + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + result = { 'ok' => false } - pool_index = pool_index(pools) - pools_updated = 0 - sync_pool_sizes + pool_index = pool_index(pools) + pools_updated = 0 + sync_pool_sizes - payload.each do |poolname, size| - unless pools[pool_index[poolname]]['size'] == size.to_i - pools[pool_index[poolname]]['size'] = size.to_i - backend.hset('vmpooler__config__poolsize', poolname, size) - pools_updated += 1 - status 201 + payload.each do |poolname, size| + unless pools[pool_index[poolname]]['size'] == size.to_i + pools[pool_index[poolname]]['size'] = size.to_i + backend.hset('vmpooler__config__poolsize', poolname, size) + pools_updated += 1 + status 201 + end end + status 200 unless pools_updated > 0 + result['ok'] = true + result end - status 200 unless pools_updated > 0 - result['ok'] = true - result end def reset_pool_template(poolname) - result = { 'ok' => false } + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + result = { 'ok' => false } - pool_index_live = pool_index(pools) - pool_index_original = pool_index(pools_at_startup) + pool_index_live = pool_index(pools) + pool_index_original = pool_index(pools_at_startup) - pools_updated = 0 - sync_pool_templates + pools_updated = 0 + sync_pool_templates - template_now = pools[pool_index_live[poolname]]['template'] - template_original = pools_at_startup[pool_index_original[poolname]]['template'] - result['template_before_reset'] = template_now - result['template_before_overrides'] = template_original + template_now = pools[pool_index_live[poolname]]['template'] + template_original = pools_at_startup[pool_index_original[poolname]]['template'] + result['template_before_reset'] = template_now + result['template_before_overrides'] = template_original - unless template_now == template_original - pools[pool_index_live[poolname]]['template'] = template_original - backend.hdel('vmpooler__config__template', poolname) - backend.sadd('vmpooler__pool__undo_template_override', poolname) - pools_updated += 1 - status 201 + unless template_now == template_original + pools[pool_index_live[poolname]]['template'] = template_original + backend.hdel('vmpooler__config__template', poolname) + backend.sadd('vmpooler__pool__undo_template_override', poolname) + pools_updated += 1 + status 201 + end + + status 200 unless pools_updated > 0 + result['ok'] = true + result end - - status 200 unless pools_updated > 0 - result['ok'] = true - result end def update_pool_template(payload) - result = { 'ok' => false } + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + result = { 'ok' => false } - pool_index = pool_index(pools) - pools_updated = 0 - sync_pool_templates + pool_index = pool_index(pools) + pools_updated = 0 + sync_pool_templates - payload.each do |poolname, template| - unless pools[pool_index[poolname]]['template'] == template - pools[pool_index[poolname]]['template'] = template - backend.hset('vmpooler__config__template', poolname, template) - pools_updated += 1 - status 201 + payload.each do |poolname, template| + unless pools[pool_index[poolname]]['template'] == template + pools[pool_index[poolname]]['template'] = template + backend.hset('vmpooler__config__template', poolname, template) + pools_updated += 1 + status 201 + end end + status 200 unless pools_updated > 0 + result['ok'] = true + result end - status 200 unless pools_updated > 0 - result['ok'] = true - result end def reset_pool(payload) - result = { 'ok' => false } + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + result = { 'ok' => false } - payload.each do |poolname, _count| - backend.sadd('vmpooler__poolreset', poolname) + payload.each do |poolname, _count| + backend.sadd('vmpooler__poolreset', poolname) + end + status 201 + result['ok'] = true + result end - status 201 - result['ok'] = true - result end def update_clone_target(payload) - result = { 'ok' => false } + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + result = { 'ok' => false } - pool_index = pool_index(pools) - pools_updated = 0 - sync_clone_targets + pool_index = pool_index(pools) + pools_updated = 0 + sync_clone_targets - payload.each do |poolname, clone_target| - unless pools[pool_index[poolname]]['clone_target'] == clone_target - pools[pool_index[poolname]]['clone_target'] = clone_target - backend.hset('vmpooler__config__clone_target', poolname, clone_target) - pools_updated += 1 - status 201 + payload.each do |poolname, clone_target| + unless pools[pool_index[poolname]]['clone_target'] == clone_target + pools[pool_index[poolname]]['clone_target'] = clone_target + backend.hset('vmpooler__config__clone_target', poolname, clone_target) + pools_updated += 1 + status 201 + end end + status 200 unless pools_updated > 0 + result['ok'] = true + result end - status 200 unless pools_updated > 0 - result['ok'] = true - result end def sync_pool_templates - pool_index = pool_index(pools) - template_configs = backend.hgetall('vmpooler__config__template') - template_configs&.each do |poolname, template| - next unless pool_index.include? poolname + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + pool_index = pool_index(pools) + template_configs = backend.hgetall('vmpooler__config__template') + template_configs&.each do |poolname, template| + next unless pool_index.include? poolname - pools[pool_index[poolname]]['template'] = template + pools[pool_index[poolname]]['template'] = template + end end end def sync_pool_sizes - pool_index = pool_index(pools) - poolsize_configs = backend.hgetall('vmpooler__config__poolsize') - poolsize_configs&.each do |poolname, size| - next unless pool_index.include? poolname + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + pool_index = pool_index(pools) + poolsize_configs = backend.hgetall('vmpooler__config__poolsize') + poolsize_configs&.each do |poolname, size| + next unless pool_index.include? poolname - pools[pool_index[poolname]]['size'] = size.to_i + pools[pool_index[poolname]]['size'] = size.to_i + end end end def sync_clone_targets - pool_index = pool_index(pools) - clone_target_configs = backend.hgetall('vmpooler__config__clone_target') - clone_target_configs&.each do |poolname, clone_target| - next unless pool_index.include? poolname + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + pool_index = pool_index(pools) + clone_target_configs = backend.hgetall('vmpooler__config__clone_target') + clone_target_configs&.each do |poolname, clone_target| + next unless pool_index.include? poolname - pools[pool_index[poolname]]['clone_target'] = clone_target + pools[pool_index[poolname]]['clone_target'] = clone_target + end end end def too_many_requested?(payload) - payload&.each do |poolname, count| - next unless count.to_i > config['max_ondemand_instances_per_request'] + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + payload&.each do |poolname, count| + next unless count.to_i > config['max_ondemand_instances_per_request'] - metrics.increment("ondemandrequest_fail.toomanyrequests.#{poolname}") - return true + metrics.increment("ondemandrequest_fail.toomanyrequests.#{poolname}") + return true + end + false end - false end def generate_ondemand_request(payload) - result = { 'ok': false } + tracer.in_span("Vmpooler::API::V1.#{__method__}") do |span| + result = { 'ok': false } - requested_instances = payload.reject { |k, _v| k == 'request_id' } - if too_many_requested?(requested_instances) - result['message'] = "requested amount of instances exceeds the maximum #{config['max_ondemand_instances_per_request']}" - status 403 - return result + requested_instances = payload.reject { |k, _v| k == 'request_id' } + if too_many_requested?(requested_instances) + e_message = "requested amount of instances exceeds the maximum #{config['max_ondemand_instances_per_request']}" + result['message'] = e_message + status 403 + span.add_event('error', attributes: { + 'error.type' => 'Vmpooler::API::V1.generate_ondemand_request', + 'error.message' => "403 due to #{e_message}" + }) + return result + end + + score = Time.now.to_i + request_id = payload['request_id'] + request_id ||= generate_request_id + result['request_id'] = request_id + span.set_attribute('vmpooler.request_id', request_id) + + if backend.exists?("vmpooler__odrequest__#{request_id}") + e_message = "request_id '#{request_id}' has already been created" + result['message'] = e_message + status 409 + span.add_event('error', attributes: { + 'error.type' => 'Vmpooler::API::V1.generate_ondemand_request', + 'error.message' => "409 due to #{e_message}" + }) + metrics.increment('ondemandrequest_generate.duplicaterequests') + return result + end + + status 201 + + platforms_with_aliases = [] + requested_instances.each do |poolname, count| + selection = evaluate_template_aliases(poolname, count) + selection.map { |selected_pool, selected_pool_count| platforms_with_aliases << "#{poolname}:#{selected_pool}:#{selected_pool_count}" } + end + platforms_string = platforms_with_aliases.join(',') + + return result unless backend.zadd('vmpooler__provisioning__request', score, request_id) + + backend.hset("vmpooler__odrequest__#{request_id}", 'requested', platforms_string) + if Vmpooler::API.settings.config[:auth] and has_token? + token_token = request.env['HTTP_X_AUTH_TOKEN'] + token_user = backend.hget("vmpooler__token__#{token_token}", 'user') + backend.hset("vmpooler__odrequest__#{request_id}", 'token:token', token_token) + backend.hset("vmpooler__odrequest__#{request_id}", 'token:user', token_user) + span.set_attribute('enduser.id', token_user) + end + + result['domain'] = config['domain'] if config['domain'] + result[:ok] = true + metrics.increment('ondemandrequest_generate.success') + result end - - score = Time.now.to_i - request_id = payload['request_id'] - request_id ||= generate_request_id - result['request_id'] = request_id - - if backend.exists?("vmpooler__odrequest__#{request_id}") - result['message'] = "request_id '#{request_id}' has already been created" - status 409 - metrics.increment('ondemandrequest_generate.duplicaterequests') - return result - end - - status 201 - - platforms_with_aliases = [] - requested_instances.each do |poolname, count| - selection = evaluate_template_aliases(poolname, count) - selection.map { |selected_pool, selected_pool_count| platforms_with_aliases << "#{poolname}:#{selected_pool}:#{selected_pool_count}" } - end - platforms_string = platforms_with_aliases.join(',') - - return result unless backend.zadd('vmpooler__provisioning__request', score, request_id) - - backend.hset("vmpooler__odrequest__#{request_id}", 'requested', platforms_string) - if Vmpooler::API.settings.config[:auth] and has_token? - backend.hset("vmpooler__odrequest__#{request_id}", 'token:token', request.env['HTTP_X_AUTH_TOKEN']) - backend.hset("vmpooler__odrequest__#{request_id}", 'token:user', - backend.hget("vmpooler__token__#{request.env['HTTP_X_AUTH_TOKEN']}", 'user')) - end - - result['domain'] = config['domain'] if config['domain'] - result[:ok] = true - metrics.increment('ondemandrequest_generate.success') - result end def generate_request_id @@ -813,6 +880,8 @@ module Vmpooler data = backend.hgetall(key) if data['user'] == Rack::Auth::Basic::Request.new(request.env).username + span = OpenTelemetry::Trace.current_span + span.set_attribute('enduser.id', data['user']) token = key.split('__').last result[token] ||= {} @@ -899,6 +968,8 @@ module Vmpooler backend.hset("vmpooler__token__#{result['token']}", 'user', @auth.username) backend.hset("vmpooler__token__#{result['token']}", 'created', Time.now) + span = OpenTelemetry::Trace.current_span + span.set_attribute('enduser.id', @auth.username) status 200 result['ok'] = true @@ -946,6 +1017,8 @@ module Vmpooler status 404 end rescue JSON::ParserError + span = OpenTelemetry::Trace.current_span + span.status = OpenTelemetry::Trace::Status.error('JSON payload could not be parsed') status 400 result = { 'ok' => false, @@ -1031,134 +1104,160 @@ module Vmpooler end def extract_templates_from_query_params(params) - payload = {} + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + payload = {} - params.split('+').each do |template| - payload[template] ||= 0 - payload[template] += 1 + params.split('+').each do |template| + payload[template] ||= 0 + payload[template] += 1 + end + + payload end - - payload end def invalid_templates(payload) - invalid = [] - payload.keys.each do |template| - invalid << template unless pool_exists?(template) + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + invalid = [] + payload.keys.each do |template| + invalid << template unless pool_exists?(template) + end + invalid end - invalid end def invalid_template_or_size(payload) - invalid = [] - payload.each do |pool, size| - invalid << pool unless pool_exists?(pool) - unless is_integer?(size) - invalid << pool - next + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + invalid = [] + payload.each do |pool, size| + invalid << pool unless pool_exists?(pool) + unless is_integer?(size) + invalid << pool + next + end + invalid << pool unless Integer(size) >= 0 end - invalid << pool unless Integer(size) >= 0 + invalid end - invalid end def invalid_template_or_path(payload) - invalid = [] - payload.each do |pool, template| - invalid << pool unless pool_exists?(pool) - invalid << pool unless template.include? '/' - invalid << pool if template[0] == '/' - invalid << pool if template[-1] == '/' + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + invalid = [] + payload.each do |pool, template| + invalid << pool unless pool_exists?(pool) + invalid << pool unless template.include? '/' + invalid << pool if template[0] == '/' + invalid << pool if template[-1] == '/' + end + invalid end - invalid end def invalid_pool(payload) - invalid = [] - payload.each do |pool, _clone_target| - invalid << pool unless pool_exists?(pool) + tracer.in_span("Vmpooler::API::V1.#{__method__}") do + invalid = [] + payload.each do |pool, _clone_target| + invalid << pool unless pool_exists?(pool) + end + invalid end - invalid end def check_ondemand_request(request_id) - result = { 'ok' => false } - request_hash = backend.hgetall("vmpooler__odrequest__#{request_id}") - if request_hash.empty? - result['message'] = "no request found for request_id '#{request_id}'" - return result - end + tracer.in_span("Vmpooler::API::V1.#{__method__}") do |span| + span.set_attribute('vmpooler.request_id', request_id) + result = { 'ok' => false } + request_hash = backend.hgetall("vmpooler__odrequest__#{request_id}") + if request_hash.empty? + e_message = "no request found for request_id '#{request_id}'" + result['message'] = e_message + span.add_event('error', attributes: { + 'error.type' => 'Vmpooler::API::V1.check_ondemand_request', + 'error.message' => e_message + }) + return result + end - result['request_id'] = request_id - result['ready'] = false - result['ok'] = true - status 202 + result['request_id'] = request_id + result['ready'] = false + result['ok'] = true + status 202 - case request_hash['status'] - when 'ready' - result['ready'] = true - Parsing.get_platform_pool_count(request_hash['requested']) do |platform_alias, pool, _count| - instances = backend.smembers("vmpooler__#{request_id}__#{platform_alias}__#{pool}") + case request_hash['status'] + when 'ready' + result['ready'] = true + Parsing.get_platform_pool_count(request_hash['requested']) do |platform_alias, pool, _count| + instances = backend.smembers("vmpooler__#{request_id}__#{platform_alias}__#{pool}") - if result.key?(platform_alias) - result[platform_alias][:hostname] = result[platform_alias][:hostname] + instances - else - result[platform_alias] = { 'hostname': instances } + if result.key?(platform_alias) + result[platform_alias][:hostname] = result[platform_alias][:hostname] + instances + else + result[platform_alias] = { 'hostname': instances } + end + end + result['domain'] = config['domain'] if config['domain'] + status 200 + when 'failed' + result['message'] = "The request failed to provision instances within the configured ondemand_request_ttl '#{config['ondemand_request_ttl']}'" + status 200 + when 'deleted' + result['message'] = 'The request has been deleted' + status 200 + else + Parsing.get_platform_pool_count(request_hash['requested']) do |platform_alias, pool, count| + instance_count = backend.scard("vmpooler__#{request_id}__#{platform_alias}__#{pool}") + instances_pending = count.to_i - instance_count.to_i + + if result.key?(platform_alias) && result[platform_alias].key?(:ready) + result[platform_alias][:ready] = (result[platform_alias][:ready].to_i + instance_count).to_s + result[platform_alias][:pending] = (result[platform_alias][:pending].to_i + instances_pending).to_s + else + result[platform_alias] = { + 'ready': instance_count.to_s, + 'pending': instances_pending.to_s + } + end end end - result['domain'] = config['domain'] if config['domain'] - status 200 - when 'failed' - result['message'] = "The request failed to provision instances within the configured ondemand_request_ttl '#{config['ondemand_request_ttl']}'" - status 200 - when 'deleted' - result['message'] = 'The request has been deleted' - status 200 - else - Parsing.get_platform_pool_count(request_hash['requested']) do |platform_alias, pool, count| - instance_count = backend.scard("vmpooler__#{request_id}__#{platform_alias}__#{pool}") - instances_pending = count.to_i - instance_count.to_i - if result.key?(platform_alias) && result[platform_alias].key?(:ready) - result[platform_alias][:ready] = (result[platform_alias][:ready].to_i + instance_count).to_s - result[platform_alias][:pending] = (result[platform_alias][:pending].to_i + instances_pending).to_s - else - result[platform_alias] = { - 'ready': instance_count.to_s, - 'pending': instances_pending.to_s - } - end - end + result end - - result end def delete_ondemand_request(request_id) - result = { 'ok' => false } + tracer.in_span("Vmpooler::API::V1.#{__method__}") do |span| + span.set_attribute('vmpooler.request_id', request_id) + result = { 'ok' => false } - platforms = backend.hget("vmpooler__odrequest__#{request_id}", 'requested') - unless platforms - result['message'] = "no request found for request_id '#{request_id}'" - return result - end - - if backend.hget("vmpooler__odrequest__#{request_id}", 'status') == 'deleted' - result['message'] = 'the request has already been deleted' - else - backend.hset("vmpooler__odrequest__#{request_id}", 'status', 'deleted') - - Parsing.get_platform_pool_count(platforms) do |platform_alias, pool, _count| - backend.smembers("vmpooler__#{request_id}__#{platform_alias}__#{pool}")&.each do |vm| - backend.smove("vmpooler__running__#{pool}", "vmpooler__completed__#{pool}", vm) - end - backend.del("vmpooler__#{request_id}__#{platform_alias}__#{pool}") + platforms = backend.hget("vmpooler__odrequest__#{request_id}", 'requested') + unless platforms + e_message = "no request found for request_id '#{request_id}'" + result['message'] = e_message + span.add_event('error', attributes: { + 'error.type' => 'Vmpooler::API::V1.delete_ondemand_request', + 'error.message' => e_message + }) + return result end - backend.expire("vmpooler__odrequest__#{request_id}", 129_600_0) + + if backend.hget("vmpooler__odrequest__#{request_id}", 'status') == 'deleted' + result['message'] = 'the request has already been deleted' + else + backend.hset("vmpooler__odrequest__#{request_id}", 'status', 'deleted') + + Parsing.get_platform_pool_count(platforms) do |platform_alias, pool, _count| + backend.smembers("vmpooler__#{request_id}__#{platform_alias}__#{pool}")&.each do |vm| + backend.smove("vmpooler__running__#{pool}", "vmpooler__completed__#{pool}", vm) + end + backend.del("vmpooler__#{request_id}__#{platform_alias}__#{pool}") + end + backend.expire("vmpooler__odrequest__#{request_id}", 129_600_0) + end + status 200 + result['ok'] = true + result end - status 200 - result['ok'] = true - result end post "#{api_prefix}/vm/:template/?" do @@ -1303,7 +1402,10 @@ module Vmpooler if backend.exists?("vmpooler__vm__#{params[:hostname]}") begin jdata = JSON.parse(request.body.read) - rescue StandardError + rescue StandardError => e + span = OpenTelemetry::Trace.current_span + span.record_exception(e) + span.status = OpenTelemetry::Trace::Status.error(e.to_s) halt 400, JSON.pretty_generate(result) end @@ -1559,6 +1661,9 @@ module Vmpooler status 404 end rescue JSON::ParserError + span = OpenTelemetry::Trace.current_span + span.record_exception(e) + span.status = OpenTelemetry::Trace::Status.error('JSON payload could not be parsed') status 400 result = { 'ok' => false,