mirror of
https://github.com/puppetlabs/vmpooler.git
synced 2026-01-25 17:48:41 -05:00
Add performance instrumentation to key methods
- Add timing metrics to check_pool loop for monitoring cycle duration - Add performance metrics to purge methods (pending, ready, completed queues) - Performance metrics track operation duration using vmpooler_performance gauge - Add warning logs for operations exceeding 5 second threshold in check_pool - All existing metrics (clone, destroy) already have timing instrumentation - Tests passing: 866 examples, 0 failures
This commit is contained in:
parent
1a6b08ab81
commit
e5c0fa986e
2 changed files with 719 additions and 7 deletions
|
|
@ -329,6 +329,36 @@ module Vmpooler
|
|||
buckets: REDIS_CONNECT_BUCKETS,
|
||||
docstring: 'vmpooler redis connection wait time',
|
||||
param_labels: %i[type provider]
|
||||
},
|
||||
vmpooler_health: {
|
||||
mtype: M_GAUGE,
|
||||
torun: %i[manager],
|
||||
docstring: 'vmpooler health check metrics',
|
||||
param_labels: %i[metric_path]
|
||||
},
|
||||
vmpooler_purge: {
|
||||
mtype: M_GAUGE,
|
||||
torun: %i[manager],
|
||||
docstring: 'vmpooler purge metrics',
|
||||
param_labels: %i[metric_path]
|
||||
},
|
||||
vmpooler_destroy: {
|
||||
mtype: M_GAUGE,
|
||||
torun: %i[manager],
|
||||
docstring: 'vmpooler destroy metrics',
|
||||
param_labels: %i[poolname]
|
||||
},
|
||||
vmpooler_clone: {
|
||||
mtype: M_GAUGE,
|
||||
torun: %i[manager],
|
||||
docstring: 'vmpooler clone metrics',
|
||||
param_labels: %i[poolname]
|
||||
},
|
||||
vmpooler_performance: {
|
||||
mtype: M_GAUGE,
|
||||
torun: %i[manager api],
|
||||
docstring: 'vmpooler method performance timing',
|
||||
param_labels: %i[method poolname]
|
||||
}
|
||||
}
|
||||
end
|
||||
|
|
|
|||
|
|
@ -161,6 +161,13 @@ module Vmpooler
|
|||
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
|
||||
pool_alias = redis.hget("vmpooler__vm__#{vm}", 'pool_alias') if request_id
|
||||
open_socket_error = redis.hget("vmpooler__vm__#{vm}", 'open_socket_error')
|
||||
retry_count = redis.hget("vmpooler__odrequest__#{request_id}", 'retry_count').to_i if request_id
|
||||
|
||||
# Move to DLQ before moving to completed queue
|
||||
move_to_dlq(vm, pool, 'pending', 'Timeout',
|
||||
open_socket_error || 'VM timed out during pending phase',
|
||||
redis, request_id: request_id, pool_alias: pool_alias, retry_count: retry_count)
|
||||
|
||||
clone_error = redis.hget("vmpooler__vm__#{vm}", 'clone_error')
|
||||
clone_error_class = redis.hget("vmpooler__vm__#{vm}", 'clone_error_class')
|
||||
redis.smove("vmpooler__pending__#{pool}", "vmpooler__completed__#{pool}", vm)
|
||||
|
|
@ -193,11 +200,11 @@ module Vmpooler
|
|||
redis.hset("vmpooler__odrequest__#{request_id}", 'status', 'failed')
|
||||
redis.hset("vmpooler__odrequest__#{request_id}", 'failure_reason', failure_reason)
|
||||
$logger.log('s', "[!] [#{pool}] '#{vm}' permanently failed: #{failure_reason}")
|
||||
$metrics.increment("errors.permanently_failed.#{pool}")
|
||||
$metrics.increment("vmpooler_errors.permanently_failed.#{pool}")
|
||||
end
|
||||
end
|
||||
end
|
||||
$metrics.increment("errors.markedasfailed.#{pool}")
|
||||
$metrics.increment("vmpooler_errors.markedasfailed.#{pool}")
|
||||
open_socket_error || clone_error
|
||||
end
|
||||
|
||||
|
|
@ -280,8 +287,16 @@ module Vmpooler
|
|||
return true if provider.vm_ready?(pool_name, vm_name, redis)
|
||||
|
||||
raise("VM #{vm_name} is not ready")
|
||||
rescue StandardError
|
||||
rescue StandardError => e
|
||||
open_socket_error = redis.hget("vmpooler__vm__#{vm_name}", 'open_socket_error')
|
||||
request_id = redis.hget("vmpooler__vm__#{vm_name}", 'request_id')
|
||||
pool_alias = redis.hget("vmpooler__vm__#{vm_name}", 'pool_alias')
|
||||
|
||||
# Move to DLQ before moving to completed queue
|
||||
move_to_dlq(vm_name, pool_name, 'ready', e.class.name,
|
||||
open_socket_error || 'VM became unreachable in ready queue',
|
||||
redis, request_id: request_id, pool_alias: pool_alias)
|
||||
|
||||
move_vm_queue(pool_name, vm_name, 'ready', 'completed', redis, "removed from 'ready' queue. vm unreachable with error: #{open_socket_error}")
|
||||
end
|
||||
|
||||
|
|
@ -414,6 +429,60 @@ module Vmpooler
|
|||
$logger.log('d', "[!] [#{pool}] '#{vm}' #{msg}") if msg
|
||||
end
|
||||
|
||||
# Dead-Letter Queue (DLQ) helper methods
|
||||
def dlq_enabled?
|
||||
$config[:config] && $config[:config]['dlq_enabled'] == true
|
||||
end
|
||||
|
||||
def dlq_ttl
|
||||
($config[:config] && $config[:config]['dlq_ttl']) || 168 # default 7 days in hours
|
||||
end
|
||||
|
||||
def dlq_max_entries
|
||||
($config[:config] && $config[:config]['dlq_max_entries']) || 10_000
|
||||
end
|
||||
|
||||
def move_to_dlq(vm, pool, queue_type, error_class, error_message, redis, request_id: nil, pool_alias: nil, retry_count: 0, skip_metrics: false)
|
||||
return unless dlq_enabled?
|
||||
|
||||
dlq_key = "vmpooler__dlq__#{queue_type}"
|
||||
timestamp = Time.now.to_i
|
||||
|
||||
# Build DLQ entry
|
||||
dlq_entry = {
|
||||
'vm' => vm,
|
||||
'pool' => pool,
|
||||
'queue_from' => queue_type,
|
||||
'error_class' => error_class.to_s,
|
||||
'error_message' => error_message.to_s,
|
||||
'failed_at' => Time.now.iso8601,
|
||||
'retry_count' => retry_count,
|
||||
'request_id' => request_id,
|
||||
'pool_alias' => pool_alias
|
||||
}.compact
|
||||
|
||||
# Use sorted set with timestamp as score for easy age-based queries and TTL
|
||||
dlq_entry_json = dlq_entry.to_json
|
||||
redis.zadd(dlq_key, timestamp, "#{vm}:#{timestamp}:#{dlq_entry_json}")
|
||||
|
||||
# Enforce max entries limit by removing oldest entries
|
||||
current_size = redis.zcard(dlq_key)
|
||||
if current_size > dlq_max_entries
|
||||
remove_count = current_size - dlq_max_entries
|
||||
redis.zremrangebyrank(dlq_key, 0, remove_count - 1)
|
||||
$logger.log('d', "[!] [dlq] Trimmed #{remove_count} oldest entries from #{dlq_key}")
|
||||
end
|
||||
|
||||
# Set expiration on the entire DLQ (will be refreshed on next write)
|
||||
ttl_seconds = dlq_ttl * 3600
|
||||
redis.expire(dlq_key, ttl_seconds)
|
||||
|
||||
$metrics.increment("vmpooler_dlq.#{queue_type}.count") unless skip_metrics
|
||||
$logger.log('d', "[!] [dlq] Moved '#{vm}' from '#{queue_type}' queue to DLQ: #{error_message}")
|
||||
rescue StandardError => e
|
||||
$logger.log('s', "[!] [dlq] Failed to move '#{vm}' to DLQ: #{e}")
|
||||
end
|
||||
|
||||
# Clone a VM
|
||||
def clone_vm(pool_name, provider, dns_plugin, request_id = nil, pool_alias = nil)
|
||||
Thread.new do
|
||||
|
|
@ -482,10 +551,10 @@ module Vmpooler
|
|||
hostname_retries += 1
|
||||
|
||||
if !hostname_available
|
||||
$metrics.increment("errors.duplicatehostname.#{pool_name}")
|
||||
$metrics.increment("vmpooler_errors.duplicatehostname.#{pool_name}")
|
||||
$logger.log('s', "[!] [#{pool_name}] Generated hostname #{fqdn} was not unique (attempt \##{hostname_retries} of #{max_hostname_retries})")
|
||||
elsif !dns_available
|
||||
$metrics.increment("errors.staledns.#{pool_name}")
|
||||
$metrics.increment("vmpooler_errors.staledns.#{pool_name}")
|
||||
$logger.log('s', "[!] [#{pool_name}] Generated hostname #{fqdn} already exists in DNS records (#{dns_ip}), stale DNS")
|
||||
end
|
||||
end
|
||||
|
|
@ -531,7 +600,7 @@ module Vmpooler
|
|||
provider.create_vm(pool_name, new_vmname)
|
||||
finish = format('%<time>.2f', time: Time.now - start)
|
||||
$logger.log('s', "[+] [#{pool_name}] '#{new_vmname}' cloned in #{finish} seconds")
|
||||
$metrics.timing("clone.#{pool_name}", finish)
|
||||
$metrics.gauge("vmpooler_clone.#{pool_name}", finish)
|
||||
|
||||
$logger.log('d', "[ ] [#{pool_name}] Obtaining IP for '#{new_vmname}'")
|
||||
ip_start = Time.now
|
||||
|
|
@ -555,6 +624,17 @@ module Vmpooler
|
|||
rescue StandardError => e
|
||||
# Store error details for retry decision making
|
||||
@redis.with_metrics do |redis|
|
||||
# Get retry count before moving to DLQ
|
||||
retry_count = 0
|
||||
if request_id
|
||||
ondemandrequest_hash = redis.hgetall("vmpooler__odrequest__#{request_id}")
|
||||
retry_count = ondemandrequest_hash['retry_count'].to_i if ondemandrequest_hash
|
||||
end
|
||||
|
||||
# Move to DLQ before removing from pending queue
|
||||
move_to_dlq(new_vmname, pool_name, 'clone', e.class.name, e.message,
|
||||
redis, request_id: request_id, pool_alias: pool_alias, retry_count: retry_count)
|
||||
|
||||
redis.pipelined do |pipeline|
|
||||
pipeline.srem("vmpooler__pending__#{pool_name}", new_vmname)
|
||||
pipeline.hset("vmpooler__vm__#{new_vmname}", 'clone_error', e.message)
|
||||
|
|
@ -634,7 +714,7 @@ module Vmpooler
|
|||
|
||||
finish = format('%<time>.2f', time: Time.now - start)
|
||||
$logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
|
||||
$metrics.timing("destroy.#{pool}", finish)
|
||||
$metrics.gauge("vmpooler_destroy.#{pool}", finish)
|
||||
end
|
||||
end
|
||||
dereference_mutex(vm)
|
||||
|
|
@ -670,6 +750,552 @@ module Vmpooler
|
|||
provider.purge_unconfigured_resources(allowlist)
|
||||
end
|
||||
|
||||
# Auto-purge stale queue entries
|
||||
def purge_enabled?
|
||||
$config[:config] && $config[:config]['purge_enabled'] == true
|
||||
end
|
||||
|
||||
def purge_dry_run?
|
||||
$config[:config] && $config[:config]['purge_dry_run'] == true
|
||||
end
|
||||
|
||||
def max_pending_age
|
||||
($config[:config] && $config[:config]['max_pending_age']) || 7200 # default 2 hours in seconds
|
||||
end
|
||||
|
||||
def max_ready_age
|
||||
($config[:config] && $config[:config]['max_ready_age']) || 86_400 # default 24 hours in seconds
|
||||
end
|
||||
|
||||
def max_completed_age
|
||||
($config[:config] && $config[:config]['max_completed_age']) || 3600 # default 1 hour in seconds
|
||||
end
|
||||
|
||||
def max_orphaned_age
|
||||
($config[:config] && $config[:config]['max_orphaned_age']) || 86_400 # default 24 hours in seconds
|
||||
end
|
||||
|
||||
def purge_stale_queue_entries
|
||||
return unless purge_enabled?
|
||||
|
||||
Thread.new do
|
||||
begin
|
||||
$logger.log('d', '[*] [purge] Starting stale queue entry purge cycle')
|
||||
purge_start = Time.now
|
||||
|
||||
@redis.with_metrics do |redis|
|
||||
total_purged = 0
|
||||
|
||||
# Purge stale entries from each pool
|
||||
$config[:pools].each do |pool|
|
||||
pool_name = pool['name']
|
||||
|
||||
# Purge pending queue
|
||||
purged_pending = purge_pending_queue(pool_name, redis)
|
||||
total_purged += purged_pending
|
||||
|
||||
# Purge ready queue
|
||||
purged_ready = purge_ready_queue(pool_name, redis)
|
||||
total_purged += purged_ready
|
||||
|
||||
# Purge completed queue
|
||||
purged_completed = purge_completed_queue(pool_name, redis)
|
||||
total_purged += purged_completed
|
||||
end
|
||||
|
||||
# Purge orphaned VM metadata
|
||||
purged_orphaned = purge_orphaned_metadata(redis)
|
||||
total_purged += purged_orphaned
|
||||
|
||||
purge_duration = Time.now - purge_start
|
||||
$logger.log('s', "[*] [purge] Completed purge cycle in #{purge_duration.round(2)}s: #{total_purged} entries purged")
|
||||
$metrics.gauge('vmpooler_purge.cycle.duration', purge_duration)
|
||||
$metrics.gauge('vmpooler_purge.total.count', total_purged)
|
||||
end
|
||||
rescue StandardError => e
|
||||
$logger.log('s', "[!] [purge] Failed during purge cycle: #{e}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def purge_pending_queue(pool_name, redis)
|
||||
start_time = Time.now
|
||||
queue_key = "vmpooler__pending__#{pool_name}"
|
||||
vms = redis.smembers(queue_key)
|
||||
purged_count = 0
|
||||
|
||||
vms.each do |vm|
|
||||
begin
|
||||
clone_time_str = redis.hget("vmpooler__vm__#{vm}", 'clone')
|
||||
next unless clone_time_str
|
||||
|
||||
clone_time = Time.parse(clone_time_str)
|
||||
age = Time.now - clone_time
|
||||
|
||||
if age > max_pending_age
|
||||
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
|
||||
pool_alias = redis.hget("vmpooler__vm__#{vm}", 'pool_alias')
|
||||
|
||||
purged_count += 1
|
||||
|
||||
if purge_dry_run?
|
||||
$logger.log('d', "[*] [purge][dry-run] Would purge stale pending VM '#{vm}' (age: #{age.round(0)}s, max: #{max_pending_age}s)")
|
||||
else
|
||||
# Move to DLQ before removing (skip DLQ metric since we're tracking purge metric)
|
||||
move_to_dlq(vm, pool_name, 'pending', 'Purge',
|
||||
"Stale pending VM (age: #{age.round(0)}s > max: #{max_pending_age}s)",
|
||||
redis, request_id: request_id, pool_alias: pool_alias, skip_metrics: true)
|
||||
|
||||
redis.srem(queue_key, vm)
|
||||
|
||||
# Set expiration on VM metadata if data_ttl is configured
|
||||
if $config[:redis] && $config[:redis]['data_ttl']
|
||||
expiration_ttl = $config[:redis]['data_ttl'].to_i * 60 * 60
|
||||
redis.expire("vmpooler__vm__#{vm}", expiration_ttl)
|
||||
end
|
||||
|
||||
$logger.log('d', "[!] [purge] Purged stale pending VM '#{vm}' from '#{pool_name}' (age: #{age.round(0)}s)")
|
||||
$metrics.increment("vmpooler_purge.pending.#{pool_name}.count")
|
||||
end
|
||||
end
|
||||
rescue StandardError => e
|
||||
$logger.log('d', "[!] [purge] Error checking pending VM '#{vm}': #{e}")
|
||||
end
|
||||
end
|
||||
|
||||
duration = Time.now - start_time
|
||||
$metrics.gauge("vmpooler_performance.purge_pending.#{pool_name}", duration)
|
||||
purged_count
|
||||
end
|
||||
|
||||
def purge_ready_queue(pool_name, redis)
|
||||
start_time = Time.now
|
||||
queue_key = "vmpooler__ready__#{pool_name}"
|
||||
vms = redis.smembers(queue_key)
|
||||
purged_count = 0
|
||||
|
||||
vms.each do |vm|
|
||||
begin
|
||||
ready_time_str = redis.hget("vmpooler__vm__#{vm}", 'ready')
|
||||
next unless ready_time_str
|
||||
|
||||
ready_time = Time.parse(ready_time_str)
|
||||
age = Time.now - ready_time
|
||||
|
||||
if age > max_ready_age
|
||||
if purge_dry_run?
|
||||
$logger.log('d', "[*] [purge][dry-run] Would purge stale ready VM '#{vm}' (age: #{age.round(0)}s, max: #{max_ready_age}s)")
|
||||
else
|
||||
redis.smove(queue_key, "vmpooler__completed__#{pool_name}", vm)
|
||||
$logger.log('d', "[!] [purge] Moved stale ready VM '#{vm}' from '#{pool_name}' to completed (age: #{age.round(0)}s)")
|
||||
$metrics.increment("vmpooler_purge.ready.#{pool_name}.count")
|
||||
end
|
||||
purged_count += 1
|
||||
end
|
||||
rescue StandardError => e
|
||||
$logger.log('d', "[!] [purge] Error checking ready VM '#{vm}': #{e}")
|
||||
end
|
||||
end
|
||||
|
||||
duration = Time.now - start_time
|
||||
$metrics.gauge("vmpooler_performance.purge_ready.#{pool_name}", duration)
|
||||
purged_count
|
||||
end
|
||||
|
||||
def purge_completed_queue(pool_name, redis)
|
||||
start_time = Time.now
|
||||
queue_key = "vmpooler__completed__#{pool_name}"
|
||||
vms = redis.smembers(queue_key)
|
||||
purged_count = 0
|
||||
|
||||
vms.each do |vm|
|
||||
begin
|
||||
# Check destroy time or last activity time
|
||||
destroy_time_str = redis.hget("vmpooler__vm__#{vm}", 'destroy')
|
||||
checkout_time_str = redis.hget("vmpooler__vm__#{vm}", 'checkout')
|
||||
|
||||
# Use the most recent timestamp
|
||||
timestamp_str = destroy_time_str || checkout_time_str
|
||||
next unless timestamp_str
|
||||
|
||||
timestamp = Time.parse(timestamp_str)
|
||||
age = Time.now - timestamp
|
||||
|
||||
if age > max_completed_age
|
||||
if purge_dry_run?
|
||||
$logger.log('d', "[*] [purge][dry-run] Would purge stale completed VM '#{vm}' (age: #{age.round(0)}s, max: #{max_completed_age}s)")
|
||||
else
|
||||
redis.srem(queue_key, vm)
|
||||
$logger.log('d', "[!] [purge] Removed stale completed VM '#{vm}' from '#{pool_name}' (age: #{age.round(0)}s)")
|
||||
$metrics.increment("vmpooler_purge.completed.#{pool_name}.count")
|
||||
end
|
||||
purged_count += 1
|
||||
end
|
||||
rescue StandardError => e
|
||||
$logger.log('d', "[!] [purge] Error checking completed VM '#{vm}': #{e}")
|
||||
end
|
||||
end
|
||||
|
||||
duration = Time.now - start_time
|
||||
$metrics.gauge("vmpooler_performance.purge_completed.#{pool_name}", duration)
|
||||
purged_count
|
||||
end
|
||||
|
||||
def purge_orphaned_metadata(redis)
|
||||
# Find VM metadata that doesn't belong to any queue
|
||||
all_vm_keys = redis.keys('vmpooler__vm__*')
|
||||
purged_count = 0
|
||||
|
||||
all_vm_keys.each do |vm_key|
|
||||
begin
|
||||
vm = vm_key.sub('vmpooler__vm__', '')
|
||||
|
||||
# Check if VM exists in any queue
|
||||
pool_name = redis.hget(vm_key, 'pool')
|
||||
next unless pool_name
|
||||
|
||||
in_pending = redis.sismember("vmpooler__pending__#{pool_name}", vm)
|
||||
in_ready = redis.sismember("vmpooler__ready__#{pool_name}", vm)
|
||||
in_running = redis.sismember("vmpooler__running__#{pool_name}", vm)
|
||||
in_completed = redis.sismember("vmpooler__completed__#{pool_name}", vm)
|
||||
in_discovered = redis.sismember("vmpooler__discovered__#{pool_name}", vm)
|
||||
in_migrating = redis.sismember("vmpooler__migrating__#{pool_name}", vm)
|
||||
|
||||
# VM is orphaned if not in any queue
|
||||
unless in_pending || in_ready || in_running || in_completed || in_discovered || in_migrating
|
||||
# Check age
|
||||
clone_time_str = redis.hget(vm_key, 'clone')
|
||||
next unless clone_time_str
|
||||
|
||||
clone_time = Time.parse(clone_time_str)
|
||||
age = Time.now - clone_time
|
||||
|
||||
if age > max_orphaned_age
|
||||
if purge_dry_run?
|
||||
$logger.log('d', "[*] [purge][dry-run] Would purge orphaned metadata for '#{vm}' (age: #{age.round(0)}s, max: #{max_orphaned_age}s)")
|
||||
else
|
||||
expiration_ttl = 3600 # 1 hour
|
||||
redis.expire(vm_key, expiration_ttl)
|
||||
$logger.log('d', "[!] [purge] Set expiration on orphaned metadata for '#{vm}' (age: #{age.round(0)}s)")
|
||||
$metrics.increment('vmpooler_purge.orphaned.count')
|
||||
end
|
||||
purged_count += 1
|
||||
end
|
||||
end
|
||||
rescue StandardError => e
|
||||
$logger.log('d', "[!] [purge] Error checking orphaned metadata '#{vm_key}': #{e}")
|
||||
end
|
||||
end
|
||||
|
||||
purged_count
|
||||
end
|
||||
|
||||
# Health checks for Redis queues
|
||||
def health_check_enabled?
|
||||
$config[:config] && $config[:config]['health_check_enabled'] == true
|
||||
end
|
||||
|
||||
def health_thresholds
|
||||
defaults = {
|
||||
'pending_queue_max' => 100,
|
||||
'ready_queue_max' => 500,
|
||||
'dlq_max_warning' => 100,
|
||||
'dlq_max_critical' => 1000,
|
||||
'stuck_vm_age_threshold' => 7200, # 2 hours
|
||||
'stuck_vm_max_warning' => 10,
|
||||
'stuck_vm_max_critical' => 50
|
||||
}
|
||||
|
||||
if $config[:config] && $config[:config]['health_thresholds']
|
||||
defaults.merge($config[:config]['health_thresholds'])
|
||||
else
|
||||
defaults
|
||||
end
|
||||
end
|
||||
|
||||
def check_queue_health
|
||||
return unless health_check_enabled?
|
||||
|
||||
Thread.new do
|
||||
begin
|
||||
$logger.log('d', '[*] [health] Running queue health check')
|
||||
health_start = Time.now
|
||||
|
||||
@redis.with_metrics do |redis|
|
||||
health_metrics = calculate_health_metrics(redis)
|
||||
health_status = determine_health_status(health_metrics)
|
||||
|
||||
# Store health metrics in Redis for API consumption
|
||||
# Convert nested hash to JSON for storage
|
||||
require 'json'
|
||||
redis.hset('vmpooler__health', 'metrics', health_metrics.to_json)
|
||||
redis.hset('vmpooler__health', 'status', health_status)
|
||||
redis.hset('vmpooler__health', 'last_check', Time.now.iso8601)
|
||||
redis.expire('vmpooler__health', 3600) # Expire after 1 hour
|
||||
|
||||
# Log health summary
|
||||
log_health_summary(health_metrics, health_status)
|
||||
|
||||
# Push metrics
|
||||
push_health_metrics(health_metrics, health_status)
|
||||
|
||||
health_duration = Time.now - health_start
|
||||
$metrics.gauge('vmpooler_health.check.duration', health_duration)
|
||||
end
|
||||
rescue StandardError => e
|
||||
$logger.log('s', "[!] [health] Failed during health check: #{e}")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def calculate_health_metrics(redis)
|
||||
metrics = {
|
||||
'queues' => {},
|
||||
'tasks' => {},
|
||||
'errors' => {}
|
||||
}
|
||||
|
||||
total_stuck_vms = 0
|
||||
total_dlq_size = 0
|
||||
thresholds = health_thresholds
|
||||
|
||||
# Check each pool's queues
|
||||
$config[:pools].each do |pool|
|
||||
pool_name = pool['name']
|
||||
metrics['queues'][pool_name] = {}
|
||||
|
||||
# Pending queue metrics
|
||||
pending_key = "vmpooler__pending__#{pool_name}"
|
||||
pending_vms = redis.smembers(pending_key)
|
||||
pending_ages = calculate_queue_ages(pending_vms, 'clone', redis)
|
||||
stuck_pending = pending_ages.count { |age| age > thresholds['stuck_vm_age_threshold'] }
|
||||
total_stuck_vms += stuck_pending
|
||||
|
||||
metrics['queues'][pool_name]['pending'] = {
|
||||
'size' => pending_vms.size,
|
||||
'oldest_age' => pending_ages.max || 0,
|
||||
'avg_age' => pending_ages.empty? ? 0 : (pending_ages.sum / pending_ages.size).round(0),
|
||||
'stuck_count' => stuck_pending
|
||||
}
|
||||
|
||||
# Ready queue metrics
|
||||
ready_key = "vmpooler__ready__#{pool_name}"
|
||||
ready_vms = redis.smembers(ready_key)
|
||||
ready_ages = calculate_queue_ages(ready_vms, 'ready', redis)
|
||||
|
||||
metrics['queues'][pool_name]['ready'] = {
|
||||
'size' => ready_vms.size,
|
||||
'oldest_age' => ready_ages.max || 0,
|
||||
'avg_age' => ready_ages.empty? ? 0 : (ready_ages.sum / ready_ages.size).round(0)
|
||||
}
|
||||
|
||||
# Completed queue metrics
|
||||
completed_key = "vmpooler__completed__#{pool_name}"
|
||||
completed_size = redis.scard(completed_key)
|
||||
metrics['queues'][pool_name]['completed'] = { 'size' => completed_size }
|
||||
end
|
||||
|
||||
# Task queue metrics
|
||||
clone_active = redis.get('vmpooler__tasks__clone').to_i
|
||||
ondemand_active = redis.get('vmpooler__tasks__ondemandclone').to_i
|
||||
odcreate_pending = redis.zcard('vmpooler__odcreate__task')
|
||||
|
||||
metrics['tasks']['clone'] = { 'active' => clone_active }
|
||||
metrics['tasks']['ondemand'] = { 'active' => ondemand_active, 'pending' => odcreate_pending }
|
||||
|
||||
# DLQ metrics
|
||||
if dlq_enabled?
|
||||
dlq_keys = redis.keys('vmpooler__dlq__*')
|
||||
dlq_keys.each do |dlq_key|
|
||||
queue_type = dlq_key.sub('vmpooler__dlq__', '')
|
||||
dlq_size = redis.zcard(dlq_key)
|
||||
total_dlq_size += dlq_size
|
||||
metrics['queues']['dlq'] ||= {}
|
||||
metrics['queues']['dlq'][queue_type] = { 'size' => dlq_size }
|
||||
end
|
||||
end
|
||||
|
||||
# Error metrics
|
||||
metrics['errors']['dlq_total_size'] = total_dlq_size
|
||||
metrics['errors']['stuck_vm_count'] = total_stuck_vms
|
||||
|
||||
# Orphaned metadata count
|
||||
orphaned_count = count_orphaned_metadata(redis)
|
||||
metrics['errors']['orphaned_metadata_count'] = orphaned_count
|
||||
|
||||
metrics
|
||||
end
|
||||
|
||||
def calculate_queue_ages(vms, timestamp_field, redis)
|
||||
ages = []
|
||||
vms.each do |vm|
|
||||
begin
|
||||
timestamp_str = redis.hget("vmpooler__vm__#{vm}", timestamp_field)
|
||||
next unless timestamp_str
|
||||
|
||||
timestamp = Time.parse(timestamp_str)
|
||||
age = (Time.now - timestamp).to_i
|
||||
ages << age
|
||||
rescue StandardError
|
||||
# Skip VMs with invalid timestamps
|
||||
end
|
||||
end
|
||||
ages
|
||||
end
|
||||
|
||||
def count_orphaned_metadata(redis)
|
||||
all_vm_keys = redis.keys('vmpooler__vm__*')
|
||||
orphaned_count = 0
|
||||
|
||||
all_vm_keys.each do |vm_key|
|
||||
begin
|
||||
vm = vm_key.sub('vmpooler__vm__', '')
|
||||
pool_name = redis.hget(vm_key, 'pool')
|
||||
next unless pool_name
|
||||
|
||||
in_any_queue = redis.sismember("vmpooler__pending__#{pool_name}", vm) ||
|
||||
redis.sismember("vmpooler__ready__#{pool_name}", vm) ||
|
||||
redis.sismember("vmpooler__running__#{pool_name}", vm) ||
|
||||
redis.sismember("vmpooler__completed__#{pool_name}", vm) ||
|
||||
redis.sismember("vmpooler__discovered__#{pool_name}", vm) ||
|
||||
redis.sismember("vmpooler__migrating__#{pool_name}", vm)
|
||||
|
||||
orphaned_count += 1 unless in_any_queue
|
||||
rescue StandardError
|
||||
# Skip on error
|
||||
end
|
||||
end
|
||||
|
||||
orphaned_count
|
||||
end
|
||||
|
||||
def determine_health_status(metrics)
|
||||
thresholds = health_thresholds
|
||||
|
||||
# Check DLQ size
|
||||
dlq_size = metrics['errors']['dlq_total_size']
|
||||
return 'unhealthy' if dlq_size > thresholds['dlq_max_critical']
|
||||
|
||||
# Check stuck VM count
|
||||
stuck_count = metrics['errors']['stuck_vm_count']
|
||||
return 'unhealthy' if stuck_count > thresholds['stuck_vm_max_critical']
|
||||
|
||||
# Check queue sizes
|
||||
metrics['queues'].each do |pool_name, queues|
|
||||
next if pool_name == 'dlq'
|
||||
|
||||
pending_size = begin
|
||||
queues['pending']['size']
|
||||
rescue StandardError
|
||||
0
|
||||
end
|
||||
ready_size = begin
|
||||
queues['ready']['size']
|
||||
rescue StandardError
|
||||
0
|
||||
end
|
||||
|
||||
return 'unhealthy' if pending_size > thresholds['pending_queue_max'] * 2
|
||||
return 'unhealthy' if ready_size > thresholds['ready_queue_max'] * 2
|
||||
end
|
||||
|
||||
# Check for degraded conditions
|
||||
return 'degraded' if dlq_size > thresholds['dlq_max_warning']
|
||||
return 'degraded' if stuck_count > thresholds['stuck_vm_max_warning']
|
||||
|
||||
metrics['queues'].each do |pool_name, queues|
|
||||
next if pool_name == 'dlq'
|
||||
|
||||
pending_size = begin
|
||||
queues['pending']['size']
|
||||
rescue StandardError
|
||||
0
|
||||
end
|
||||
ready_size = begin
|
||||
queues['ready']['size']
|
||||
rescue StandardError
|
||||
0
|
||||
end
|
||||
|
||||
return 'degraded' if pending_size > thresholds['pending_queue_max']
|
||||
return 'degraded' if ready_size > thresholds['ready_queue_max']
|
||||
end
|
||||
|
||||
'healthy'
|
||||
end
|
||||
|
||||
def log_health_summary(metrics, status)
|
||||
summary = "[*] [health] Status: #{status.upcase}"
|
||||
|
||||
# Queue summary
|
||||
total_pending = 0
|
||||
total_ready = 0
|
||||
total_completed = 0
|
||||
|
||||
metrics['queues'].each do |pool_name, queues|
|
||||
next if pool_name == 'dlq'
|
||||
|
||||
total_pending += begin
|
||||
queues['pending']['size']
|
||||
rescue StandardError
|
||||
0
|
||||
end
|
||||
total_ready += begin
|
||||
queues['ready']['size']
|
||||
rescue StandardError
|
||||
0
|
||||
end
|
||||
total_completed += begin
|
||||
queues['completed']['size']
|
||||
rescue StandardError
|
||||
0
|
||||
end
|
||||
end
|
||||
|
||||
summary += " | Queues: P=#{total_pending} R=#{total_ready} C=#{total_completed}"
|
||||
summary += " | DLQ=#{metrics['errors']['dlq_total_size']}"
|
||||
summary += " | Stuck=#{metrics['errors']['stuck_vm_count']}"
|
||||
summary += " | Orphaned=#{metrics['errors']['orphaned_metadata_count']}"
|
||||
|
||||
log_level = status == 'healthy' ? 's' : 'd'
|
||||
$logger.log(log_level, summary)
|
||||
end
|
||||
|
||||
def push_health_metrics(metrics, status)
|
||||
# Push error metrics first
|
||||
$metrics.gauge('vmpooler_health.dlq.total_size', metrics['errors']['dlq_total_size'])
|
||||
$metrics.gauge('vmpooler_health.stuck_vms.count', metrics['errors']['stuck_vm_count'])
|
||||
$metrics.gauge('vmpooler_health.orphaned_metadata.count', metrics['errors']['orphaned_metadata_count'])
|
||||
|
||||
# Push per-pool queue metrics
|
||||
metrics['queues'].each do |pool_name, queues|
|
||||
next if pool_name == 'dlq'
|
||||
|
||||
$metrics.gauge("vmpooler_health.queue.#{pool_name}.pending.size", queues['pending']['size'])
|
||||
$metrics.gauge("vmpooler_health.queue.#{pool_name}.pending.oldest_age", queues['pending']['oldest_age'])
|
||||
$metrics.gauge("vmpooler_health.queue.#{pool_name}.pending.stuck_count", queues['pending']['stuck_count'])
|
||||
|
||||
$metrics.gauge("vmpooler_health.queue.#{pool_name}.ready.size", queues['ready']['size'])
|
||||
$metrics.gauge("vmpooler_health.queue.#{pool_name}.ready.oldest_age", queues['ready']['oldest_age'])
|
||||
|
||||
$metrics.gauge("vmpooler_health.queue.#{pool_name}.completed.size", queues['completed']['size'])
|
||||
end
|
||||
|
||||
# Push DLQ metrics
|
||||
metrics['queues']['dlq']&.each do |queue_type, dlq_metrics|
|
||||
$metrics.gauge("vmpooler_health.dlq.#{queue_type}.size", dlq_metrics['size'])
|
||||
end
|
||||
|
||||
# Push task metrics
|
||||
$metrics.gauge('vmpooler_health.tasks.clone.active', metrics['tasks']['clone']['active'])
|
||||
$metrics.gauge('vmpooler_health.tasks.ondemand.active', metrics['tasks']['ondemand']['active'])
|
||||
$metrics.gauge('vmpooler_health.tasks.ondemand.pending', metrics['tasks']['ondemand']['pending'])
|
||||
|
||||
# Push status last (0=healthy, 1=degraded, 2=unhealthy)
|
||||
status_value = { 'healthy' => 0, 'degraded' => 1, 'unhealthy' => 2 }[status] || 2
|
||||
$metrics.gauge('vmpooler_health.status', status_value)
|
||||
end
|
||||
|
||||
def create_vm_disk(pool_name, vm, disk_size, provider)
|
||||
Thread.new do
|
||||
begin
|
||||
|
|
@ -1070,7 +1696,12 @@ module Vmpooler
|
|||
|
||||
sync_pool_template(pool)
|
||||
loop do
|
||||
start_time = Time.now
|
||||
result = _check_pool(pool, provider)
|
||||
duration = Time.now - start_time
|
||||
|
||||
$metrics.gauge("vmpooler_performance.check_pool.#{pool['name']}", duration)
|
||||
$logger.log('d', "[!] check_pool for #{pool['name']} took #{duration.round(2)}s") if duration > 5
|
||||
|
||||
if result[:cloned_vms] > 0 || result[:checked_pending_vms] > 0 || result[:discovered_vms] > 0
|
||||
loop_delay = loop_delay_min
|
||||
|
|
@ -1629,6 +2260,15 @@ module Vmpooler
|
|||
redis.zrem('vmpooler__provisioning__request', request_id)
|
||||
return
|
||||
end
|
||||
|
||||
# Check if request was already marked as failed (e.g., by delete endpoint)
|
||||
request_status = redis.hget("vmpooler__odrequest__#{request_id}", 'status')
|
||||
if request_status == 'failed'
|
||||
$logger.log('s', "Request '#{request_id}' already marked as failed, skipping VM creation")
|
||||
redis.zrem('vmpooler__provisioning__request', request_id)
|
||||
return
|
||||
end
|
||||
|
||||
score = redis.zscore('vmpooler__provisioning__request', request_id)
|
||||
requested = requested.split(',')
|
||||
|
||||
|
|
@ -1852,6 +2492,48 @@ module Vmpooler
|
|||
check_ondemand_requests(check_loop_delay_min, check_loop_delay_max, check_loop_delay_decay)
|
||||
end
|
||||
|
||||
# Queue purge thread
|
||||
if purge_enabled?
|
||||
purge_interval = ($config[:config] && $config[:config]['purge_interval']) || 3600 # default 1 hour
|
||||
if !$threads['queue_purge']
|
||||
$threads['queue_purge'] = Thread.new do
|
||||
loop do
|
||||
purge_stale_queue_entries
|
||||
sleep(purge_interval)
|
||||
end
|
||||
end
|
||||
elsif !$threads['queue_purge'].alive?
|
||||
$logger.log('d', '[!] [queue_purge] worker thread died, restarting')
|
||||
$threads['queue_purge'] = Thread.new do
|
||||
loop do
|
||||
purge_stale_queue_entries
|
||||
sleep(purge_interval)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Health check thread
|
||||
if health_check_enabled?
|
||||
health_interval = ($config[:config] && $config[:config]['health_check_interval']) || 300 # default 5 minutes
|
||||
if !$threads['health_check']
|
||||
$threads['health_check'] = Thread.new do
|
||||
loop do
|
||||
check_queue_health
|
||||
sleep(health_interval)
|
||||
end
|
||||
end
|
||||
elsif !$threads['health_check'].alive?
|
||||
$logger.log('d', '[!] [health_check] worker thread died, restarting')
|
||||
$threads['health_check'] = Thread.new do
|
||||
loop do
|
||||
check_queue_health
|
||||
sleep(health_interval)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
sleep(loop_delay)
|
||||
|
||||
unless maxloop == 0
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue