Fix RuboCop style violations

Mahima Singh 2025-12-19 13:33:43 +05:30
parent a83916a0a4
commit 6d6e998bf4

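As context for the hunks below, here is a minimal, standalone sketch of the two literal-formatting patterns this commit converges on. The cop names (Style/NumericLiterals and Style/StringLiterals) are my inference from the visible changes, not stated anywhere in the commit, and none of this code is part of the diff itself.

# Hypothetical illustration only.
# Underscore separators are purely visual; the numeric values are identical.
raise 'numeric mismatch' unless 10_000 == 10000 && 86_400 == 86400

# Single- and double-quoted strings are equal when no interpolation is used;
# RuboCop's default style prefers the single-quoted form.
raise 'string mismatch' unless 'purge.orphaned.count' == "purge.orphaned.count"

puts 'literal style checks passed'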

@@ -162,12 +162,12 @@ module Vmpooler
         pool_alias = redis.hget("vmpooler__vm__#{vm}", 'pool_alias') if request_id
         open_socket_error = redis.hget("vmpooler__vm__#{vm}", 'open_socket_error')
         retry_count = redis.hget("vmpooler__odrequest__#{request_id}", 'retry_count').to_i if request_id
         # Move to DLQ before moving to completed queue
         move_to_dlq(vm, pool, 'pending', 'Timeout',
                     open_socket_error || 'VM timed out during pending phase',
                     redis, request_id: request_id, pool_alias: pool_alias, retry_count: retry_count)
         redis.smove("vmpooler__pending__#{pool}", "vmpooler__completed__#{pool}", vm)
         if request_id
           ondemandrequest_hash = redis.hgetall("vmpooler__odrequest__#{request_id}")
@@ -234,12 +234,12 @@ module Vmpooler
         open_socket_error = redis.hget("vmpooler__vm__#{vm_name}", 'open_socket_error')
         request_id = redis.hget("vmpooler__vm__#{vm_name}", 'request_id')
         pool_alias = redis.hget("vmpooler__vm__#{vm_name}", 'pool_alias')
         # Move to DLQ before moving to completed queue
         move_to_dlq(vm_name, pool_name, 'ready', e.class.name,
                     open_socket_error || 'VM became unreachable in ready queue',
                     redis, request_id: request_id, pool_alias: pool_alias)
         move_vm_queue(pool_name, vm_name, 'ready', 'completed', redis, "removed from 'ready' queue. vm unreachable with error: #{open_socket_error}")
       end
@@ -382,7 +382,7 @@ module Vmpooler
     end

     def dlq_max_entries
-      ($config[:config] && $config[:config]['dlq_max_entries']) || 10000
+      ($config[:config] && $config[:config]['dlq_max_entries']) || 10_000
     end

     def move_to_dlq(vm, pool, queue_type, error_class, error_message, redis, request_id: nil, pool_alias: nil, retry_count: 0, skip_metrics: false)
@@ -566,11 +566,11 @@ module Vmpooler
           ondemandrequest_hash = redis.hgetall("vmpooler__odrequest__#{request_id}")
           retry_count = ondemandrequest_hash['retry_count'].to_i if ondemandrequest_hash
         end
         # Move to DLQ before removing from pending queue
         move_to_dlq(new_vmname, pool_name, 'clone', e.class.name, e.message,
                     redis, request_id: request_id, pool_alias: pool_alias, retry_count: retry_count)
         redis.pipelined do |pipeline|
           pipeline.srem("vmpooler__pending__#{pool_name}", new_vmname)
           expiration_ttl = $config[:redis]['data_ttl'].to_i * 60 * 60
@@ -676,7 +676,7 @@ module Vmpooler
     end

     def max_ready_age
-      ($config[:config] && $config[:config]['max_ready_age']) || 86400 # default 24 hours in seconds
+      ($config[:config] && $config[:config]['max_ready_age']) || 86_400 # default 24 hours in seconds
     end

     def max_completed_age
@@ -684,7 +684,7 @@ module Vmpooler
     end

     def max_orphaned_age
-      ($config[:config] && $config[:config]['max_orphaned_age']) || 86400 # default 24 hours in seconds
+      ($config[:config] && $config[:config]['max_orphaned_age']) || 86_400 # default 24 hours in seconds
     end

     def purge_stale_queue_entries
@@ -694,31 +694,31 @@ module Vmpooler
       begin
         $logger.log('d', '[*] [purge] Starting stale queue entry purge cycle')
         purge_start = Time.now
         @redis.with_metrics do |redis|
           total_purged = 0
           # Purge stale entries from each pool
           $config[:pools].each do |pool|
             pool_name = pool['name']
             # Purge pending queue
             purged_pending = purge_pending_queue(pool_name, redis)
             total_purged += purged_pending
             # Purge ready queue
             purged_ready = purge_ready_queue(pool_name, redis)
             total_purged += purged_ready
             # Purge completed queue
             purged_completed = purge_completed_queue(pool_name, redis)
             total_purged += purged_completed
           end
           # Purge orphaned VM metadata
           purged_orphaned = purge_orphaned_metadata(redis)
           total_purged += purged_orphaned
           purge_duration = Time.now - purge_start
           $logger.log('s', "[*] [purge] Completed purge cycle in #{purge_duration.round(2)}s: #{total_purged} entries purged")
           $metrics.timing('purge.cycle.duration', purge_duration)
@@ -734,37 +734,37 @@ module Vmpooler
       queue_key = "vmpooler__pending__#{pool_name}"
       vms = redis.smembers(queue_key)
       purged_count = 0
       vms.each do |vm|
         begin
           clone_time_str = redis.hget("vmpooler__vm__#{vm}", 'clone')
           next unless clone_time_str
           clone_time = Time.parse(clone_time_str)
           age = Time.now - clone_time
           if age > max_pending_age
             request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
             pool_alias = redis.hget("vmpooler__vm__#{vm}", 'pool_alias')
             purged_count += 1
             if purge_dry_run?
               $logger.log('d', "[*] [purge][dry-run] Would purge stale pending VM '#{vm}' (age: #{age.round(0)}s, max: #{max_pending_age}s)")
             else
               # Move to DLQ before removing (skip DLQ metric since we're tracking purge metric)
               move_to_dlq(vm, pool_name, 'pending', 'Purge',
                           "Stale pending VM (age: #{age.round(0)}s > max: #{max_pending_age}s)",
                           redis, request_id: request_id, pool_alias: pool_alias, skip_metrics: true)
               redis.srem(queue_key, vm)
               # Set expiration on VM metadata if data_ttl is configured
               if $config[:redis] && $config[:redis]['data_ttl']
                 expiration_ttl = $config[:redis]['data_ttl'].to_i * 60 * 60
                 redis.expire("vmpooler__vm__#{vm}", expiration_ttl)
               end
               $logger.log('d', "[!] [purge] Purged stale pending VM '#{vm}' from '#{pool_name}' (age: #{age.round(0)}s)")
               $metrics.increment("purge.pending.#{pool_name}.count")
             end
@@ -773,7 +773,7 @@ module Vmpooler
           $logger.log('d', "[!] [purge] Error checking pending VM '#{vm}': #{e}")
         end
       end
       purged_count
     end
@@ -781,15 +781,15 @@ module Vmpooler
       queue_key = "vmpooler__ready__#{pool_name}"
       vms = redis.smembers(queue_key)
       purged_count = 0
       vms.each do |vm|
         begin
           ready_time_str = redis.hget("vmpooler__vm__#{vm}", 'ready')
           next unless ready_time_str
           ready_time = Time.parse(ready_time_str)
           age = Time.now - ready_time
           if age > max_ready_age
             if purge_dry_run?
               $logger.log('d', "[*] [purge][dry-run] Would purge stale ready VM '#{vm}' (age: #{age.round(0)}s, max: #{max_ready_age}s)")
@@ -804,7 +804,7 @@ module Vmpooler
           $logger.log('d', "[!] [purge] Error checking ready VM '#{vm}': #{e}")
         end
       end
       purged_count
     end
@@ -812,20 +812,20 @@ module Vmpooler
       queue_key = "vmpooler__completed__#{pool_name}"
       vms = redis.smembers(queue_key)
       purged_count = 0
       vms.each do |vm|
         begin
           # Check destroy time or last activity time
           destroy_time_str = redis.hget("vmpooler__vm__#{vm}", 'destroy')
           checkout_time_str = redis.hget("vmpooler__vm__#{vm}", 'checkout')
           # Use the most recent timestamp
           timestamp_str = destroy_time_str || checkout_time_str
           next unless timestamp_str
           timestamp = Time.parse(timestamp_str)
           age = Time.now - timestamp
           if age > max_completed_age
             if purge_dry_run?
               $logger.log('d', "[*] [purge][dry-run] Would purge stale completed VM '#{vm}' (age: #{age.round(0)}s, max: #{max_completed_age}s)")
@@ -840,7 +840,7 @@ module Vmpooler
           $logger.log('d', "[!] [purge] Error checking completed VM '#{vm}': #{e}")
         end
       end
       purged_count
     end
@@ -848,31 +848,31 @@ module Vmpooler
       # Find VM metadata that doesn't belong to any queue
       all_vm_keys = redis.keys('vmpooler__vm__*')
       purged_count = 0
       all_vm_keys.each do |vm_key|
         begin
           vm = vm_key.sub('vmpooler__vm__', '')
           # Check if VM exists in any queue
           pool_name = redis.hget(vm_key, 'pool')
           next unless pool_name
           in_pending = redis.sismember("vmpooler__pending__#{pool_name}", vm)
           in_ready = redis.sismember("vmpooler__ready__#{pool_name}", vm)
           in_running = redis.sismember("vmpooler__running__#{pool_name}", vm)
           in_completed = redis.sismember("vmpooler__completed__#{pool_name}", vm)
           in_discovered = redis.sismember("vmpooler__discovered__#{pool_name}", vm)
           in_migrating = redis.sismember("vmpooler__migrating__#{pool_name}", vm)
           # VM is orphaned if not in any queue
           unless in_pending || in_ready || in_running || in_completed || in_discovered || in_migrating
             # Check age
             clone_time_str = redis.hget(vm_key, 'clone')
             next unless clone_time_str
             clone_time = Time.parse(clone_time_str)
             age = Time.now - clone_time
             if age > max_orphaned_age
               if purge_dry_run?
                 $logger.log('d', "[*] [purge][dry-run] Would purge orphaned metadata for '#{vm}' (age: #{age.round(0)}s, max: #{max_orphaned_age}s)")
@@ -880,7 +880,7 @@ module Vmpooler
                 expiration_ttl = 3600 # 1 hour
                 redis.expire(vm_key, expiration_ttl)
                 $logger.log('d', "[!] [purge] Set expiration on orphaned metadata for '#{vm}' (age: #{age.round(0)}s)")
-                $metrics.increment("purge.orphaned.count")
+                $metrics.increment('purge.orphaned.count')
               end
               purged_count += 1
             end
@@ -889,7 +889,7 @@ module Vmpooler
           $logger.log('d', "[!] [purge] Error checking orphaned metadata '#{vm_key}': #{e}")
         end
       end
       purged_count
     end
@@ -904,11 +904,11 @@ module Vmpooler
         'ready_queue_max' => 500,
         'dlq_max_warning' => 100,
         'dlq_max_critical' => 1000,
         'stuck_vm_age_threshold' => 7200, # 2 hours
         'stuck_vm_max_warning' => 10,
         'stuck_vm_max_critical' => 50
       }
       if $config[:config] && $config[:config]['health_thresholds']
         defaults.merge($config[:config]['health_thresholds'])
       else
@@ -923,23 +923,23 @@ module Vmpooler
       begin
         $logger.log('d', '[*] [health] Running queue health check')
         health_start = Time.now
         @redis.with_metrics do |redis|
           health_metrics = calculate_health_metrics(redis)
           health_status = determine_health_status(health_metrics)
           # Store health metrics in Redis for API consumption
           redis.hmset('vmpooler__health', *health_metrics.to_a.flatten)
           redis.hset('vmpooler__health', 'status', health_status)
           redis.hset('vmpooler__health', 'last_check', Time.now.iso8601)
           redis.expire('vmpooler__health', 3600) # Expire after 1 hour
           # Log health summary
           log_health_summary(health_metrics, health_status)
           # Push metrics
           push_health_metrics(health_metrics, health_status)
           health_duration = Time.now - health_start
           $metrics.timing('health.check.duration', health_duration)
         end
@@ -955,55 +955,55 @@ module Vmpooler
         'tasks' => {},
         'errors' => {}
       }
       total_stuck_vms = 0
       total_dlq_size = 0
       thresholds = health_thresholds
       # Check each pool's queues
       $config[:pools].each do |pool|
         pool_name = pool['name']
         metrics['queues'][pool_name] = {}
         # Pending queue metrics
         pending_key = "vmpooler__pending__#{pool_name}"
         pending_vms = redis.smembers(pending_key)
         pending_ages = calculate_queue_ages(pending_vms, 'clone', redis)
         stuck_pending = pending_ages.count { |age| age > thresholds['stuck_vm_age_threshold'] }
         total_stuck_vms += stuck_pending
         metrics['queues'][pool_name]['pending'] = {
           'size' => pending_vms.size,
           'oldest_age' => pending_ages.max || 0,
           'avg_age' => pending_ages.empty? ? 0 : (pending_ages.sum / pending_ages.size).round(0),
           'stuck_count' => stuck_pending
         }
         # Ready queue metrics
         ready_key = "vmpooler__ready__#{pool_name}"
         ready_vms = redis.smembers(ready_key)
         ready_ages = calculate_queue_ages(ready_vms, 'ready', redis)
         metrics['queues'][pool_name]['ready'] = {
           'size' => ready_vms.size,
           'oldest_age' => ready_ages.max || 0,
           'avg_age' => ready_ages.empty? ? 0 : (ready_ages.sum / ready_ages.size).round(0)
         }
         # Completed queue metrics
         completed_key = "vmpooler__completed__#{pool_name}"
         completed_size = redis.scard(completed_key)
         metrics['queues'][pool_name]['completed'] = { 'size' => completed_size }
       end
       # Task queue metrics
       clone_active = redis.get('vmpooler__tasks__clone').to_i
       ondemand_active = redis.get('vmpooler__tasks__ondemandclone').to_i
       odcreate_pending = redis.zcard('vmpooler__odcreate__task')
       metrics['tasks']['clone'] = { 'active' => clone_active }
       metrics['tasks']['ondemand'] = { 'active' => ondemand_active, 'pending' => odcreate_pending }
       # DLQ metrics
       if dlq_enabled?
         dlq_keys = redis.keys('vmpooler__dlq__*')
@@ -1015,15 +1015,15 @@ module Vmpooler
           metrics['queues']['dlq'][queue_type] = { 'size' => dlq_size }
         end
       end
       # Error metrics
       metrics['errors']['dlq_total_size'] = total_dlq_size
       metrics['errors']['stuck_vm_count'] = total_stuck_vms
       # Orphaned metadata count
       orphaned_count = count_orphaned_metadata(redis)
       metrics['errors']['orphaned_metadata_count'] = orphaned_count
       metrics
     end
@@ -1033,7 +1033,7 @@ module Vmpooler
         begin
           timestamp_str = redis.hget("vmpooler__vm__#{vm}", timestamp_field)
           next unless timestamp_str
           timestamp = Time.parse(timestamp_str)
           age = (Time.now - timestamp).to_i
           ages << age
@@ -1047,88 +1047,117 @@ module Vmpooler
     def count_orphaned_metadata(redis)
       all_vm_keys = redis.keys('vmpooler__vm__*')
       orphaned_count = 0
       all_vm_keys.each do |vm_key|
         begin
           vm = vm_key.sub('vmpooler__vm__', '')
           pool_name = redis.hget(vm_key, 'pool')
           next unless pool_name
           in_any_queue = redis.sismember("vmpooler__pending__#{pool_name}", vm) ||
                          redis.sismember("vmpooler__ready__#{pool_name}", vm) ||
                          redis.sismember("vmpooler__running__#{pool_name}", vm) ||
                          redis.sismember("vmpooler__completed__#{pool_name}", vm) ||
                          redis.sismember("vmpooler__discovered__#{pool_name}", vm) ||
                          redis.sismember("vmpooler__migrating__#{pool_name}", vm)
           orphaned_count += 1 unless in_any_queue
         rescue StandardError
           # Skip on error
         end
       end
       orphaned_count
     end

     def determine_health_status(metrics)
       thresholds = health_thresholds
       # Check DLQ size
       dlq_size = metrics['errors']['dlq_total_size']
       return 'unhealthy' if dlq_size > thresholds['dlq_max_critical']
       # Check stuck VM count
       stuck_count = metrics['errors']['stuck_vm_count']
       return 'unhealthy' if stuck_count > thresholds['stuck_vm_max_critical']
       # Check queue sizes
       metrics['queues'].each do |pool_name, queues|
         next if pool_name == 'dlq'
-        pending_size = queues['pending']['size'] rescue 0
-        ready_size = queues['ready']['size'] rescue 0
+        pending_size = begin
+          queues['pending']['size']
+        rescue StandardError
+          0
+        end
+        ready_size = begin
+          queues['ready']['size']
+        rescue StandardError
+          0
+        end
         return 'unhealthy' if pending_size > thresholds['pending_queue_max'] * 2
         return 'unhealthy' if ready_size > thresholds['ready_queue_max'] * 2
       end
       # Check for degraded conditions
       return 'degraded' if dlq_size > thresholds['dlq_max_warning']
       return 'degraded' if stuck_count > thresholds['stuck_vm_max_warning']
       metrics['queues'].each do |pool_name, queues|
         next if pool_name == 'dlq'
-        pending_size = queues['pending']['size'] rescue 0
-        ready_size = queues['ready']['size'] rescue 0
+        pending_size = begin
+          queues['pending']['size']
+        rescue StandardError
+          0
+        end
+        ready_size = begin
+          queues['ready']['size']
+        rescue StandardError
+          0
+        end
         return 'degraded' if pending_size > thresholds['pending_queue_max']
         return 'degraded' if ready_size > thresholds['ready_queue_max']
       end
       'healthy'
     end

     def log_health_summary(metrics, status)
       summary = "[*] [health] Status: #{status.upcase}"
       # Queue summary
       total_pending = 0
       total_ready = 0
       total_completed = 0
       metrics['queues'].each do |pool_name, queues|
         next if pool_name == 'dlq'
-        total_pending += queues['pending']['size'] rescue 0
-        total_ready += queues['ready']['size'] rescue 0
-        total_completed += queues['completed']['size'] rescue 0
+
+        total_pending += begin
+          queues['pending']['size']
+        rescue StandardError
+          0
+        end
+        total_ready += begin
+          queues['ready']['size']
+        rescue StandardError
+          0
+        end
+        total_completed += begin
+          queues['completed']['size']
+        rescue StandardError
+          0
+        end
       end
       summary += " | Queues: P=#{total_pending} R=#{total_ready} C=#{total_completed}"
       summary += " | DLQ=#{metrics['errors']['dlq_total_size']}"
       summary += " | Stuck=#{metrics['errors']['stuck_vm_count']}"
       summary += " | Orphaned=#{metrics['errors']['orphaned_metadata_count']}"
       log_level = status == 'healthy' ? 's' : 'd'
       $logger.log(log_level, summary)
     end
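The rewrites in this hunk replace Ruby's modifier rescue with an explicit begin/rescue StandardError block. Below is a small self-contained sketch, not taken from the file, showing why the two forms return the same value here; attributing the change to the Style/RescueModifier cop is my assumption.

# Hypothetical illustration only: looking up ['size'] on a missing queue entry
# returns nil, and nil['size'] raises NoMethodError (a StandardError), which
# both forms rescue into a default of 0.
queues = { 'pending' => { 'size' => 3 } }

size_modifier = queues['ready']['size'] rescue 0

size_expanded = begin
  queues['ready']['size']
rescue StandardError
  0
end

puts size_modifier              # => 0
puts size_expanded              # => 0
puts queues['pending']['size']  # => 3 (no rescue needed when the key exists)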
@@ -1138,33 +1167,31 @@ module Vmpooler
       $metrics.gauge('health.dlq.total_size', metrics['errors']['dlq_total_size'])
       $metrics.gauge('health.stuck_vms.count', metrics['errors']['stuck_vm_count'])
       $metrics.gauge('health.orphaned_metadata.count', metrics['errors']['orphaned_metadata_count'])
       # Push per-pool queue metrics
       metrics['queues'].each do |pool_name, queues|
         next if pool_name == 'dlq'
         $metrics.gauge("health.queue.#{pool_name}.pending.size", queues['pending']['size'])
         $metrics.gauge("health.queue.#{pool_name}.pending.oldest_age", queues['pending']['oldest_age'])
         $metrics.gauge("health.queue.#{pool_name}.pending.stuck_count", queues['pending']['stuck_count'])
         $metrics.gauge("health.queue.#{pool_name}.ready.size", queues['ready']['size'])
         $metrics.gauge("health.queue.#{pool_name}.ready.oldest_age", queues['ready']['oldest_age'])
         $metrics.gauge("health.queue.#{pool_name}.completed.size", queues['completed']['size'])
       end
       # Push DLQ metrics
-      if metrics['queues']['dlq']
-        metrics['queues']['dlq'].each do |queue_type, dlq_metrics|
-          $metrics.gauge("health.dlq.#{queue_type}.size", dlq_metrics['size'])
-        end
-      end
+      metrics['queues']['dlq']&.each do |queue_type, dlq_metrics|
+        $metrics.gauge("health.dlq.#{queue_type}.size", dlq_metrics['size'])
+      end
       # Push task metrics
       $metrics.gauge('health.tasks.clone.active', metrics['tasks']['clone']['active'])
       $metrics.gauge('health.tasks.ondemand.active', metrics['tasks']['ondemand']['active'])
       $metrics.gauge('health.tasks.ondemand.pending', metrics['tasks']['ondemand']['pending'])
       # Push status last (0=healthy, 1=degraded, 2=unhealthy)
       status_value = { 'healthy' => 0, 'degraded' => 1, 'unhealthy' => 2 }[status] || 2
       $metrics.gauge('health.status', status_value)
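The last hunk collapses an explicit nil guard into the safe navigation operator. A hypothetical standalone sketch of the equivalence follows; calling this the Style/SafeNavigation cop is my inference, and none of these variable names come from the diff.

# Hypothetical illustration only: &.each is nil-safe, so the surrounding
# `if` guard becomes redundant. With the key absent, neither shape iterates.
metrics = { 'queues' => {} } # no 'dlq' entry yet

# Old shape: guard first, then iterate.
if metrics['queues']['dlq']
  metrics['queues']['dlq'].each { |queue_type, dlq| puts "#{queue_type}=#{dlq['size']}" }
end

# New shape: safe navigation folds the guard into the call.
metrics['queues']['dlq']&.each { |queue_type, dlq| puts "#{queue_type}=#{dlq['size']}" }

# With data present, both shapes behave identically.
metrics['queues']['dlq'] = { 'clone' => { 'size' => 2 } }
metrics['queues']['dlq']&.each { |queue_type, dlq| puts "#{queue_type}=#{dlq['size']}" }
# => clone=2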