Merge pull request #688 from puppetlabs/P4DEVOPS-8567

Add DLQ, auto-purge, and health checks for Redis queues
Commit 4656d8bd4a by Mahima Singh, 2025-12-26 15:30:15 +05:30 (committed by GitHub)
7 changed files with 1338 additions and 14 deletions


@@ -3,11 +3,11 @@ source ENV['GEM_SOURCE'] || 'https://rubygems.org'
gemspec
# Evaluate Gemfile.local if it exists
-if File.exists? "#{__FILE__}.local"
+if File.exist? "#{__FILE__}.local"
instance_eval(File.read("#{__FILE__}.local"))
end
# Evaluate ~/.gemfile if it exists
-if File.exists?(File.join(Dir.home, '.gemfile'))
+if File.exist?(File.join(Dir.home, '.gemfile'))
instance_eval(File.read(File.join(Dir.home, '.gemfile')))
end
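Note: File.exists? was deprecated in Ruby 2.1 and removed in Ruby 3.2; File.exist? is the supported predicate, so this change keeps the Gemfile working on current Rubies. For example:

File.exist?("#{__FILE__}.local") # => true or false; File.exists? raises NoMethodError on Ruby >= 3.2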


@@ -197,6 +197,7 @@ GEM
PLATFORMS
arm64-darwin-22
arm64-darwin-23
arm64-darwin-25
universal-java-11
universal-java-17
x86_64-darwin-22


@@ -329,6 +329,30 @@ module Vmpooler
buckets: REDIS_CONNECT_BUCKETS,
docstring: 'vmpooler redis connection wait time',
param_labels: %i[type provider]
},
vmpooler_health: {
mtype: M_GAUGE,
torun: %i[manager],
docstring: 'vmpooler health check metrics',
param_labels: %i[metric_path]
},
vmpooler_purge: {
mtype: M_GAUGE,
torun: %i[manager],
docstring: 'vmpooler purge metrics',
param_labels: %i[metric_path]
},
vmpooler_destroy: {
mtype: M_GAUGE,
torun: %i[manager],
docstring: 'vmpooler destroy metrics',
param_labels: %i[poolname]
},
vmpooler_clone: {
mtype: M_GAUGE,
torun: %i[manager],
docstring: 'vmpooler clone metrics',
param_labels: %i[poolname]
}
}
end
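These definitions back the renamed $metrics calls in the pool manager below. A minimal usage sketch (values illustrative; the label mapping — first dot-separated segment selects the table entry, the remainder fills its declared param_labels — is as implied by the declarations above):

$metrics.gauge('vmpooler_destroy.test-pool', 12.34) # vmpooler_destroy{poolname="test-pool"}
$metrics.gauge('vmpooler_health.status', 0)         # vmpooler_health{metric_path="status"}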


@@ -161,6 +161,13 @@ module Vmpooler
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
pool_alias = redis.hget("vmpooler__vm__#{vm}", 'pool_alias') if request_id
open_socket_error = redis.hget("vmpooler__vm__#{vm}", 'open_socket_error')
retry_count = redis.hget("vmpooler__odrequest__#{request_id}", 'retry_count').to_i if request_id
# Move to DLQ before moving to completed queue
move_to_dlq(vm, pool, 'pending', 'Timeout',
open_socket_error || 'VM timed out during pending phase',
redis, request_id: request_id, pool_alias: pool_alias, retry_count: retry_count)
clone_error = redis.hget("vmpooler__vm__#{vm}", 'clone_error')
clone_error_class = redis.hget("vmpooler__vm__#{vm}", 'clone_error_class')
redis.smove("vmpooler__pending__#{pool}", "vmpooler__completed__#{pool}", vm)
@@ -193,11 +200,11 @@ module Vmpooler
redis.hset("vmpooler__odrequest__#{request_id}", 'status', 'failed')
redis.hset("vmpooler__odrequest__#{request_id}", 'failure_reason', failure_reason)
$logger.log('s', "[!] [#{pool}] '#{vm}' permanently failed: #{failure_reason}")
$metrics.increment("errors.permanently_failed.#{pool}")
$metrics.increment("vmpooler_errors.permanently_failed.#{pool}")
end
end
end
$metrics.increment("errors.markedasfailed.#{pool}")
$metrics.increment("vmpooler_errors.markedasfailed.#{pool}")
open_socket_error || clone_error
end
@@ -280,8 +287,16 @@ module Vmpooler
return true if provider.vm_ready?(pool_name, vm_name, redis)
raise("VM #{vm_name} is not ready")
-rescue StandardError
+rescue StandardError => e
open_socket_error = redis.hget("vmpooler__vm__#{vm_name}", 'open_socket_error')
request_id = redis.hget("vmpooler__vm__#{vm_name}", 'request_id')
pool_alias = redis.hget("vmpooler__vm__#{vm_name}", 'pool_alias')
# Move to DLQ before moving to completed queue
move_to_dlq(vm_name, pool_name, 'ready', e.class.name,
open_socket_error || 'VM became unreachable in ready queue',
redis, request_id: request_id, pool_alias: pool_alias)
move_vm_queue(pool_name, vm_name, 'ready', 'completed', redis, "removed from 'ready' queue. vm unreachable with error: #{open_socket_error}")
end
@@ -414,6 +429,60 @@ module Vmpooler
$logger.log('d', "[!] [#{pool}] '#{vm}' #{msg}") if msg
end
# Dead-Letter Queue (DLQ) helper methods
def dlq_enabled?
$config[:config] && $config[:config]['dlq_enabled'] == true
end
def dlq_ttl
($config[:config] && $config[:config]['dlq_ttl']) || 168 # default: 168 hours (7 days)
end
def dlq_max_entries
($config[:config] && $config[:config]['dlq_max_entries']) || 10_000
end
def move_to_dlq(vm, pool, queue_type, error_class, error_message, redis, request_id: nil, pool_alias: nil, retry_count: 0, skip_metrics: false)
return unless dlq_enabled?
dlq_key = "vmpooler__dlq__#{queue_type}"
timestamp = Time.now.to_i
# Build DLQ entry
dlq_entry = {
'vm' => vm,
'pool' => pool,
'queue_from' => queue_type,
'error_class' => error_class.to_s,
'error_message' => error_message.to_s,
'failed_at' => Time.now.iso8601,
'retry_count' => retry_count,
'request_id' => request_id,
'pool_alias' => pool_alias
}.compact
# Use sorted set with timestamp as score for easy age-based queries and TTL
dlq_entry_json = dlq_entry.to_json
redis.zadd(dlq_key, timestamp, "#{vm}:#{timestamp}:#{dlq_entry_json}")
# Enforce max entries limit by removing oldest entries
current_size = redis.zcard(dlq_key)
if current_size > dlq_max_entries
remove_count = current_size - dlq_max_entries
redis.zremrangebyrank(dlq_key, 0, remove_count - 1)
$logger.log('d', "[!] [dlq] Trimmed #{remove_count} oldest entries from #{dlq_key}")
end
# Set expiration on the entire DLQ (will be refreshed on next write)
ttl_seconds = dlq_ttl * 3600
redis.expire(dlq_key, ttl_seconds)
$metrics.increment("vmpooler_dlq.#{queue_type}.count") unless skip_metrics
$logger.log('d', "[!] [dlq] Moved '#{vm}' from '#{queue_type}' queue to DLQ: #{error_message}")
rescue StandardError => e
$logger.log('s', "[!] [dlq] Failed to move '#{vm}' to DLQ: #{e}")
end
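# For reference, a minimal sketch of reading DLQ entries back (member format is
# "<vm>:<epoch>:<json>" per the zadd above; the one-hour window is illustrative):
#
#   require 'json'
#   @redis.with_metrics do |redis|
#     cutoff = Time.now.to_i - 3600
#     redis.zrangebyscore('vmpooler__dlq__pending', cutoff, '+inf').each do |member|
#       _vm, _ts, json = member.split(':', 3) # limit 3 keeps colons inside the JSON intact
#       entry = JSON.parse(json)
#       puts "#{entry['vm']} failed: #{entry['error_message']}"
#     end
#   end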
# Clone a VM
def clone_vm(pool_name, provider, dns_plugin, request_id = nil, pool_alias = nil)
Thread.new do
@@ -482,10 +551,10 @@ module Vmpooler
hostname_retries += 1
if !hostname_available
$metrics.increment("errors.duplicatehostname.#{pool_name}")
$metrics.increment("vmpooler_errors.duplicatehostname.#{pool_name}")
$logger.log('s', "[!] [#{pool_name}] Generated hostname #{fqdn} was not unique (attempt \##{hostname_retries} of #{max_hostname_retries})")
elsif !dns_available
$metrics.increment("errors.staledns.#{pool_name}")
$metrics.increment("vmpooler_errors.staledns.#{pool_name}")
$logger.log('s', "[!] [#{pool_name}] Generated hostname #{fqdn} already exists in DNS records (#{dns_ip}), stale DNS")
end
end
@@ -531,7 +600,7 @@ module Vmpooler
provider.create_vm(pool_name, new_vmname)
finish = format('%<time>.2f', time: Time.now - start)
$logger.log('s', "[+] [#{pool_name}] '#{new_vmname}' cloned in #{finish} seconds")
$metrics.timing("clone.#{pool_name}", finish)
$metrics.gauge("vmpooler_clone.#{pool_name}", finish)
$logger.log('d', "[ ] [#{pool_name}] Obtaining IP for '#{new_vmname}'")
ip_start = Time.now
@@ -555,6 +624,17 @@ module Vmpooler
rescue StandardError => e
# Store error details for retry decision making
@redis.with_metrics do |redis|
# Get retry count before moving to DLQ
retry_count = 0
if request_id
ondemandrequest_hash = redis.hgetall("vmpooler__odrequest__#{request_id}")
retry_count = ondemandrequest_hash['retry_count'].to_i if ondemandrequest_hash
end
# Move to DLQ before removing from pending queue
move_to_dlq(new_vmname, pool_name, 'clone', e.class.name, e.message,
redis, request_id: request_id, pool_alias: pool_alias, retry_count: retry_count)
redis.pipelined do |pipeline|
pipeline.srem("vmpooler__pending__#{pool_name}", new_vmname)
pipeline.hset("vmpooler__vm__#{new_vmname}", 'clone_error', e.message)
@@ -634,7 +714,7 @@ module Vmpooler
finish = format('%<time>.2f', time: Time.now - start)
$logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
$metrics.timing("destroy.#{pool}", finish)
$metrics.gauge("vmpooler_destroy.#{pool}", finish)
end
end
dereference_mutex(vm)
@@ -670,6 +750,543 @@ module Vmpooler
provider.purge_unconfigured_resources(allowlist)
end
# Auto-purge stale queue entries
def purge_enabled?
$config[:config] && $config[:config]['purge_enabled'] == true
end
def purge_dry_run?
$config[:config] && $config[:config]['purge_dry_run'] == true
end
def max_pending_age
($config[:config] && $config[:config]['max_pending_age']) || 7200 # default: 7200 seconds (2 hours)
end
def max_ready_age
($config[:config] && $config[:config]['max_ready_age']) || 86_400 # default: 86_400 seconds (24 hours)
end
def max_completed_age
($config[:config] && $config[:config]['max_completed_age']) || 3600 # default: 3600 seconds (1 hour)
end
def max_orphaned_age
($config[:config] && $config[:config]['max_orphaned_age']) || 86_400 # default: 86_400 seconds (24 hours)
end
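# With no overrides, the effective defaults and actions (per the purge methods below) are:
#   max_pending_age    7200s  (2h)  - stale pending VMs are DLQ'd and removed
#   max_ready_age     86_400s (24h) - stale ready VMs are moved to completed
#   max_completed_age  3_600s (1h)  - stale completed VMs are dropped from the queue
#   max_orphaned_age  86_400s (24h) - orphaned VM metadata is expired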
def purge_stale_queue_entries
return unless purge_enabled?
Thread.new do
begin
$logger.log('d', '[*] [purge] Starting stale queue entry purge cycle')
purge_start = Time.now
@redis.with_metrics do |redis|
total_purged = 0
# Purge stale entries from each pool
$config[:pools].each do |pool|
pool_name = pool['name']
# Purge pending queue
purged_pending = purge_pending_queue(pool_name, redis)
total_purged += purged_pending
# Purge ready queue
purged_ready = purge_ready_queue(pool_name, redis)
total_purged += purged_ready
# Purge completed queue
purged_completed = purge_completed_queue(pool_name, redis)
total_purged += purged_completed
end
# Purge orphaned VM metadata
purged_orphaned = purge_orphaned_metadata(redis)
total_purged += purged_orphaned
purge_duration = Time.now - purge_start
$logger.log('s', "[*] [purge] Completed purge cycle in #{purge_duration.round(2)}s: #{total_purged} entries purged")
$metrics.gauge('vmpooler_purge.cycle.duration', purge_duration)
$metrics.gauge('vmpooler_purge.total.count', total_purged)
end
rescue StandardError => e
$logger.log('s', "[!] [purge] Failed during purge cycle: #{e}")
end
end
end
def purge_pending_queue(pool_name, redis)
queue_key = "vmpooler__pending__#{pool_name}"
vms = redis.smembers(queue_key)
purged_count = 0
vms.each do |vm|
begin
clone_time_str = redis.hget("vmpooler__vm__#{vm}", 'clone')
next unless clone_time_str
clone_time = Time.parse(clone_time_str)
age = Time.now - clone_time
if age > max_pending_age
request_id = redis.hget("vmpooler__vm__#{vm}", 'request_id')
pool_alias = redis.hget("vmpooler__vm__#{vm}", 'pool_alias')
purged_count += 1
if purge_dry_run?
$logger.log('d', "[*] [purge][dry-run] Would purge stale pending VM '#{vm}' (age: #{age.round(0)}s, max: #{max_pending_age}s)")
else
# Move to DLQ before removing (skip DLQ metric since we're tracking purge metric)
move_to_dlq(vm, pool_name, 'pending', 'Purge',
"Stale pending VM (age: #{age.round(0)}s > max: #{max_pending_age}s)",
redis, request_id: request_id, pool_alias: pool_alias, skip_metrics: true)
redis.srem(queue_key, vm)
# Set expiration on VM metadata if data_ttl is configured
if $config[:redis] && $config[:redis]['data_ttl']
expiration_ttl = $config[:redis]['data_ttl'].to_i * 60 * 60
redis.expire("vmpooler__vm__#{vm}", expiration_ttl)
end
$logger.log('d', "[!] [purge] Purged stale pending VM '#{vm}' from '#{pool_name}' (age: #{age.round(0)}s)")
$metrics.increment("vmpooler_purge.pending.#{pool_name}.count")
end
end
rescue StandardError => e
$logger.log('d', "[!] [purge] Error checking pending VM '#{vm}': #{e}")
end
end
purged_count
end
def purge_ready_queue(pool_name, redis)
queue_key = "vmpooler__ready__#{pool_name}"
vms = redis.smembers(queue_key)
purged_count = 0
vms.each do |vm|
begin
ready_time_str = redis.hget("vmpooler__vm__#{vm}", 'ready')
next unless ready_time_str
ready_time = Time.parse(ready_time_str)
age = Time.now - ready_time
if age > max_ready_age
if purge_dry_run?
$logger.log('d', "[*] [purge][dry-run] Would purge stale ready VM '#{vm}' (age: #{age.round(0)}s, max: #{max_ready_age}s)")
else
redis.smove(queue_key, "vmpooler__completed__#{pool_name}", vm)
$logger.log('d', "[!] [purge] Moved stale ready VM '#{vm}' from '#{pool_name}' to completed (age: #{age.round(0)}s)")
$metrics.increment("vmpooler_purge.ready.#{pool_name}.count")
end
purged_count += 1
end
rescue StandardError => e
$logger.log('d', "[!] [purge] Error checking ready VM '#{vm}': #{e}")
end
end
purged_count
end
def purge_completed_queue(pool_name, redis)
queue_key = "vmpooler__completed__#{pool_name}"
vms = redis.smembers(queue_key)
purged_count = 0
vms.each do |vm|
begin
# Check destroy time or last activity time
destroy_time_str = redis.hget("vmpooler__vm__#{vm}", 'destroy')
checkout_time_str = redis.hget("vmpooler__vm__#{vm}", 'checkout')
# Use the most recent timestamp
timestamp_str = destroy_time_str || checkout_time_str
next unless timestamp_str
timestamp = Time.parse(timestamp_str)
age = Time.now - timestamp
if age > max_completed_age
if purge_dry_run?
$logger.log('d', "[*] [purge][dry-run] Would purge stale completed VM '#{vm}' (age: #{age.round(0)}s, max: #{max_completed_age}s)")
else
redis.srem(queue_key, vm)
$logger.log('d', "[!] [purge] Removed stale completed VM '#{vm}' from '#{pool_name}' (age: #{age.round(0)}s)")
$metrics.increment("vmpooler_purge.completed.#{pool_name}.count")
end
purged_count += 1
end
rescue StandardError => e
$logger.log('d', "[!] [purge] Error checking completed VM '#{vm}': #{e}")
end
end
purged_count
end
def purge_orphaned_metadata(redis)
# Find VM metadata that doesn't belong to any queue
all_vm_keys = redis.keys('vmpooler__vm__*')
purged_count = 0
all_vm_keys.each do |vm_key|
begin
vm = vm_key.sub('vmpooler__vm__', '')
# Check if VM exists in any queue
pool_name = redis.hget(vm_key, 'pool')
next unless pool_name
in_pending = redis.sismember("vmpooler__pending__#{pool_name}", vm)
in_ready = redis.sismember("vmpooler__ready__#{pool_name}", vm)
in_running = redis.sismember("vmpooler__running__#{pool_name}", vm)
in_completed = redis.sismember("vmpooler__completed__#{pool_name}", vm)
in_discovered = redis.sismember("vmpooler__discovered__#{pool_name}", vm)
in_migrating = redis.sismember("vmpooler__migrating__#{pool_name}", vm)
# VM is orphaned if not in any queue
unless in_pending || in_ready || in_running || in_completed || in_discovered || in_migrating
# Check age
clone_time_str = redis.hget(vm_key, 'clone')
next unless clone_time_str
clone_time = Time.parse(clone_time_str)
age = Time.now - clone_time
if age > max_orphaned_age
if purge_dry_run?
$logger.log('d', "[*] [purge][dry-run] Would purge orphaned metadata for '#{vm}' (age: #{age.round(0)}s, max: #{max_orphaned_age}s)")
else
expiration_ttl = 3600 # 1 hour
redis.expire(vm_key, expiration_ttl)
$logger.log('d', "[!] [purge] Set expiration on orphaned metadata for '#{vm}' (age: #{age.round(0)}s)")
$metrics.increment('vmpooler_purge.orphaned.count')
end
purged_count += 1
end
end
rescue StandardError => e
$logger.log('d', "[!] [purge] Error checking orphaned metadata '#{vm_key}': #{e}")
end
end
purged_count
end
# Health checks for Redis queues
def health_check_enabled?
$config[:config] && $config[:config]['health_check_enabled'] == true
end
def health_thresholds
defaults = {
'pending_queue_max' => 100,
'ready_queue_max' => 500,
'dlq_max_warning' => 100,
'dlq_max_critical' => 1000,
'stuck_vm_age_threshold' => 7200, # 2 hours
'stuck_vm_max_warning' => 10,
'stuck_vm_max_critical' => 50
}
if $config[:config] && $config[:config]['health_thresholds']
defaults.merge($config[:config]['health_thresholds'])
else
defaults
end
end
def check_queue_health
return unless health_check_enabled?
Thread.new do
begin
$logger.log('d', '[*] [health] Running queue health check')
health_start = Time.now
@redis.with_metrics do |redis|
health_metrics = calculate_health_metrics(redis)
health_status = determine_health_status(health_metrics)
# Store health metrics in Redis for API consumption
# Convert nested hash to JSON for storage
require 'json'
redis.hset('vmpooler__health', 'metrics', health_metrics.to_json)
redis.hset('vmpooler__health', 'status', health_status)
redis.hset('vmpooler__health', 'last_check', Time.now.iso8601)
redis.expire('vmpooler__health', 3600) # Expire after 1 hour
# Log health summary
log_health_summary(health_metrics, health_status)
# Push metrics
push_health_metrics(health_metrics, health_status)
health_duration = Time.now - health_start
$metrics.gauge('vmpooler_health.check.duration', health_duration)
end
rescue StandardError => e
$logger.log('s', "[!] [health] Failed during health check: #{e}")
end
end
end
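# The snapshot written above can be read back by a consumer (an API endpoint,
# for example); a minimal sketch using the keys set in check_queue_health:
#
#   status  = redis.hget('vmpooler__health', 'status')     # "healthy" | "degraded" | "unhealthy"
#   metrics = JSON.parse(redis.hget('vmpooler__health', 'metrics'))
#   checked = redis.hget('vmpooler__health', 'last_check') # ISO8601 timestamp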
def calculate_health_metrics(redis)
metrics = {
'queues' => {},
'tasks' => {},
'errors' => {}
}
total_stuck_vms = 0
total_dlq_size = 0
thresholds = health_thresholds
# Check each pool's queues
$config[:pools].each do |pool|
pool_name = pool['name']
metrics['queues'][pool_name] = {}
# Pending queue metrics
pending_key = "vmpooler__pending__#{pool_name}"
pending_vms = redis.smembers(pending_key)
pending_ages = calculate_queue_ages(pending_vms, 'clone', redis)
stuck_pending = pending_ages.count { |age| age > thresholds['stuck_vm_age_threshold'] }
total_stuck_vms += stuck_pending
metrics['queues'][pool_name]['pending'] = {
'size' => pending_vms.size,
'oldest_age' => pending_ages.max || 0,
'avg_age' => pending_ages.empty? ? 0 : (pending_ages.sum / pending_ages.size).round(0),
'stuck_count' => stuck_pending
}
# Ready queue metrics
ready_key = "vmpooler__ready__#{pool_name}"
ready_vms = redis.smembers(ready_key)
ready_ages = calculate_queue_ages(ready_vms, 'ready', redis)
metrics['queues'][pool_name]['ready'] = {
'size' => ready_vms.size,
'oldest_age' => ready_ages.max || 0,
'avg_age' => ready_ages.empty? ? 0 : (ready_ages.sum / ready_ages.size).round(0)
}
# Completed queue metrics
completed_key = "vmpooler__completed__#{pool_name}"
completed_size = redis.scard(completed_key)
metrics['queues'][pool_name]['completed'] = { 'size' => completed_size }
end
# Task queue metrics
clone_active = redis.get('vmpooler__tasks__clone').to_i
ondemand_active = redis.get('vmpooler__tasks__ondemandclone').to_i
odcreate_pending = redis.zcard('vmpooler__odcreate__task')
metrics['tasks']['clone'] = { 'active' => clone_active }
metrics['tasks']['ondemand'] = { 'active' => ondemand_active, 'pending' => odcreate_pending }
# DLQ metrics
if dlq_enabled?
dlq_keys = redis.keys('vmpooler__dlq__*')
dlq_keys.each do |dlq_key|
queue_type = dlq_key.sub('vmpooler__dlq__', '')
dlq_size = redis.zcard(dlq_key)
total_dlq_size += dlq_size
metrics['queues']['dlq'] ||= {}
metrics['queues']['dlq'][queue_type] = { 'size' => dlq_size }
end
end
# Error metrics
metrics['errors']['dlq_total_size'] = total_dlq_size
metrics['errors']['stuck_vm_count'] = total_stuck_vms
# Orphaned metadata count
orphaned_count = count_orphaned_metadata(redis)
metrics['errors']['orphaned_metadata_count'] = orphaned_count
metrics
end
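# Shape of the returned hash (values illustrative):
#   {
#     'queues' => { 'test-pool' => { 'pending'   => { 'size' => 3, 'oldest_age' => 120, 'avg_age' => 60, 'stuck_count' => 0 },
#                                    'ready'     => { 'size' => 5, 'oldest_age' => 900, 'avg_age' => 400 },
#                                    'completed' => { 'size' => 1 } },
#                   'dlq'       => { 'pending' => { 'size' => 2 } } },
#     'tasks'  => { 'clone' => { 'active' => 1 }, 'ondemand' => { 'active' => 0, 'pending' => 2 } },
#     'errors' => { 'dlq_total_size' => 2, 'stuck_vm_count' => 0, 'orphaned_metadata_count' => 0 }
#   }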
def calculate_queue_ages(vms, timestamp_field, redis)
ages = []
vms.each do |vm|
begin
timestamp_str = redis.hget("vmpooler__vm__#{vm}", timestamp_field)
next unless timestamp_str
timestamp = Time.parse(timestamp_str)
age = (Time.now - timestamp).to_i
ages << age
rescue StandardError
# Skip VMs with invalid timestamps
end
end
ages
end
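# e.g. calculate_queue_ages(%w[vm-1 vm-2], 'clone', redis) #=> [3600, 7200]
# Ages are in seconds, in input order; VMs with missing or unparsable timestamps are skipped.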
def count_orphaned_metadata(redis)
all_vm_keys = redis.keys('vmpooler__vm__*')
orphaned_count = 0
all_vm_keys.each do |vm_key|
begin
vm = vm_key.sub('vmpooler__vm__', '')
pool_name = redis.hget(vm_key, 'pool')
next unless pool_name
in_any_queue = redis.sismember("vmpooler__pending__#{pool_name}", vm) ||
redis.sismember("vmpooler__ready__#{pool_name}", vm) ||
redis.sismember("vmpooler__running__#{pool_name}", vm) ||
redis.sismember("vmpooler__completed__#{pool_name}", vm) ||
redis.sismember("vmpooler__discovered__#{pool_name}", vm) ||
redis.sismember("vmpooler__migrating__#{pool_name}", vm)
orphaned_count += 1 unless in_any_queue
rescue StandardError
# Skip on error
end
end
orphaned_count
end
def determine_health_status(metrics)
thresholds = health_thresholds
# Check DLQ size
dlq_size = metrics['errors']['dlq_total_size']
return 'unhealthy' if dlq_size > thresholds['dlq_max_critical']
# Check stuck VM count
stuck_count = metrics['errors']['stuck_vm_count']
return 'unhealthy' if stuck_count > thresholds['stuck_vm_max_critical']
# Check queue sizes
metrics['queues'].each do |pool_name, queues|
next if pool_name == 'dlq'
pending_size = begin
queues['pending']['size']
rescue StandardError
0
end
ready_size = begin
queues['ready']['size']
rescue StandardError
0
end
return 'unhealthy' if pending_size > thresholds['pending_queue_max'] * 2
return 'unhealthy' if ready_size > thresholds['ready_queue_max'] * 2
end
# Check for degraded conditions
return 'degraded' if dlq_size > thresholds['dlq_max_warning']
return 'degraded' if stuck_count > thresholds['stuck_vm_max_warning']
metrics['queues'].each do |pool_name, queues|
next if pool_name == 'dlq'
pending_size = begin
queues['pending']['size']
rescue StandardError
0
end
ready_size = begin
queues['ready']['size']
rescue StandardError
0
end
return 'degraded' if pending_size > thresholds['pending_queue_max']
return 'degraded' if ready_size > thresholds['ready_queue_max']
end
'healthy'
end
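# Decision summary: 'unhealthy' when any critical limit is breached (the DLQ or
# stuck-VM critical thresholds, or a queue at double its configured max);
# 'degraded' when only a warning threshold is breached; 'healthy' otherwise.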
def log_health_summary(metrics, status)
summary = "[*] [health] Status: #{status.upcase}"
# Queue summary
total_pending = 0
total_ready = 0
total_completed = 0
metrics['queues'].each do |pool_name, queues|
next if pool_name == 'dlq'
total_pending += begin
queues['pending']['size']
rescue StandardError
0
end
total_ready += begin
queues['ready']['size']
rescue StandardError
0
end
total_completed += begin
queues['completed']['size']
rescue StandardError
0
end
end
summary += " | Queues: P=#{total_pending} R=#{total_ready} C=#{total_completed}"
summary += " | DLQ=#{metrics['errors']['dlq_total_size']}"
summary += " | Stuck=#{metrics['errors']['stuck_vm_count']}"
summary += " | Orphaned=#{metrics['errors']['orphaned_metadata_count']}"
log_level = status == 'healthy' ? 's' : 'd'
$logger.log(log_level, summary)
end
def push_health_metrics(metrics, status)
# Push error metrics first
$metrics.gauge('vmpooler_health.dlq.total_size', metrics['errors']['dlq_total_size'])
$metrics.gauge('vmpooler_health.stuck_vms.count', metrics['errors']['stuck_vm_count'])
$metrics.gauge('vmpooler_health.orphaned_metadata.count', metrics['errors']['orphaned_metadata_count'])
# Push per-pool queue metrics
metrics['queues'].each do |pool_name, queues|
next if pool_name == 'dlq'
$metrics.gauge("vmpooler_health.queue.#{pool_name}.pending.size", queues['pending']['size'])
$metrics.gauge("vmpooler_health.queue.#{pool_name}.pending.oldest_age", queues['pending']['oldest_age'])
$metrics.gauge("vmpooler_health.queue.#{pool_name}.pending.stuck_count", queues['pending']['stuck_count'])
$metrics.gauge("vmpooler_health.queue.#{pool_name}.ready.size", queues['ready']['size'])
$metrics.gauge("vmpooler_health.queue.#{pool_name}.ready.oldest_age", queues['ready']['oldest_age'])
$metrics.gauge("vmpooler_health.queue.#{pool_name}.completed.size", queues['completed']['size'])
end
# Push DLQ metrics
metrics['queues']['dlq']&.each do |queue_type, dlq_metrics|
$metrics.gauge("vmpooler_health.dlq.#{queue_type}.size", dlq_metrics['size'])
end
# Push task metrics
$metrics.gauge('vmpooler_health.tasks.clone.active', metrics['tasks']['clone']['active'])
$metrics.gauge('vmpooler_health.tasks.ondemand.active', metrics['tasks']['ondemand']['active'])
$metrics.gauge('vmpooler_health.tasks.ondemand.pending', metrics['tasks']['ondemand']['pending'])
# Push status last (0=healthy, 1=degraded, 2=unhealthy)
status_value = { 'healthy' => 0, 'degraded' => 1, 'unhealthy' => 2 }[status] || 2
$metrics.gauge('vmpooler_health.status', status_value)
end
def create_vm_disk(pool_name, vm, disk_size, provider)
Thread.new do
begin
@@ -1629,6 +2246,15 @@ module Vmpooler
redis.zrem('vmpooler__provisioning__request', request_id)
return
end
# Check if request was already marked as failed (e.g., by delete endpoint)
request_status = redis.hget("vmpooler__odrequest__#{request_id}", 'status')
if request_status == 'failed'
$logger.log('s', "Request '#{request_id}' already marked as failed, skipping VM creation")
redis.zrem('vmpooler__provisioning__request', request_id)
return
end
score = redis.zscore('vmpooler__provisioning__request', request_id)
requested = requested.split(',')
@@ -1852,6 +2478,48 @@ module Vmpooler
check_ondemand_requests(check_loop_delay_min, check_loop_delay_max, check_loop_delay_decay)
end
# Queue purge thread
if purge_enabled?
purge_interval = ($config[:config] && $config[:config]['purge_interval']) || 3600 # default 1 hour
if !$threads['queue_purge']
$threads['queue_purge'] = Thread.new do
loop do
purge_stale_queue_entries
sleep(purge_interval)
end
end
elsif !$threads['queue_purge'].alive?
$logger.log('d', '[!] [queue_purge] worker thread died, restarting')
$threads['queue_purge'] = Thread.new do
loop do
purge_stale_queue_entries
sleep(purge_interval)
end
end
end
end
# Health check thread
if health_check_enabled?
health_interval = ($config[:config] && $config[:config]['health_check_interval']) || 300 # default 5 minutes
if !$threads['health_check']
$threads['health_check'] = Thread.new do
loop do
check_queue_health
sleep(health_interval)
end
end
elsif !$threads['health_check'].alive?
$logger.log('d', '[!] [health_check] worker thread died, restarting')
$threads['health_check'] = Thread.new do
loop do
check_queue_health
sleep(health_interval)
end
end
end
end
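# Both workers follow the thread-supervision pattern used elsewhere in this
# loop: start the thread on the first pass, and recreate it on a later pass if
# it has died.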
sleep(loop_delay)
unless maxloop == 0


@@ -1107,7 +1107,8 @@ EOT
context 'with no errors during cloning' do
before(:each) do
allow(metrics).to receive(:timing)
-expect(metrics).to receive(:timing).with(/clone\./,/0/)
+allow(metrics).to receive(:gauge)
+expect(metrics).to receive(:gauge).with(/vmpooler_clone\./,/0/)
expect(provider).to receive(:create_vm).with(pool, String)
allow(provider).to receive(:get_vm_ip_address).and_return(1)
allow(subject).to receive(:get_domain_for_pool).and_return('example.com')
@@ -1158,7 +1159,8 @@ EOT
context 'with a failure to get ip address after cloning' do
it 'should log a message that it completed being cloned' do
allow(metrics).to receive(:timing)
-expect(metrics).to receive(:timing).with(/clone\./,/0/)
+allow(metrics).to receive(:gauge)
+expect(metrics).to receive(:gauge).with(/vmpooler_clone\./,/0/)
expect(provider).to receive(:create_vm).with(pool, String)
allow(provider).to receive(:get_vm_ip_address).and_return(nil)
@@ -1217,7 +1219,8 @@ EOT
context 'with request_id' do
before(:each) do
allow(metrics).to receive(:timing)
-expect(metrics).to receive(:timing).with(/clone\./,/0/)
+allow(metrics).to receive(:gauge)
+expect(metrics).to receive(:gauge).with(/vmpooler_clone\./,/0/)
expect(provider).to receive(:create_vm).with(pool, String)
allow(provider).to receive(:get_vm_ip_address).with(vm,pool).and_return(1)
allow(subject).to receive(:get_dns_plugin_class_name_for_pool).and_return(dns_plugin)
@@ -1255,7 +1258,7 @@ EOT
resolv = class_double("Resolv").as_stubbed_const(:transfer_nested_constants => true)
expect(subject).to receive(:generate_and_check_hostname).exactly(3).times.and_return([vm_name, true]) #skip this, make it available all times
expect(resolv).to receive(:getaddress).exactly(3).times.and_return("1.2.3.4")
expect(metrics).to receive(:increment).with("errors.staledns.#{pool}").exactly(3).times
expect(metrics).to receive(:increment).with("vmpooler_errors.staledns.#{pool}").exactly(3).times
expect{subject._clone_vm(pool,provider,dns_plugin)}.to raise_error(/Unable to generate a unique hostname after/)
end
it 'should be successful if DNS does not exist' do
@@ -1353,7 +1356,8 @@ EOT
it 'should emit a timing metric' do
allow(subject).to receive(:get_vm_usage_labels)
allow(metrics).to receive(:timing)
expect(metrics).to receive(:timing).with("destroy.#{pool}", String)
allow(metrics).to receive(:gauge)
expect(metrics).to receive(:gauge).with("vmpooler_destroy.#{pool}", String)
subject._destroy_vm(vm,pool,provider,dns_plugin)
end
@@ -5174,6 +5178,44 @@ EOT
end
end
context 'when request is already marked as failed' do
let(:request_string) { "#{pool}:#{pool}:1" }
before(:each) do
redis_connection_pool.with do |redis|
create_ondemand_request_for_test(request_id, current_time.to_i, request_string, redis)
set_ondemand_request_status(request_id, 'failed', redis)
end
end
it 'logs that the request is already failed' do
redis_connection_pool.with do |redis|
expect(logger).to receive(:log).with('s', "Request '#{request_id}' already marked as failed, skipping VM creation")
subject.create_ondemand_vms(request_id, redis)
end
end
it 'removes the request from provisioning__request queue' do
redis_connection_pool.with do |redis|
subject.create_ondemand_vms(request_id, redis)
expect(redis.zscore('vmpooler__provisioning__request', request_id)).to be_nil
end
end
it 'does not create VM tasks' do
redis_connection_pool.with do |redis|
subject.create_ondemand_vms(request_id, redis)
expect(redis.zcard('vmpooler__odcreate__task')).to eq(0)
end
end
it 'does not add to provisioning__processing queue' do
redis_connection_pool.with do |redis|
subject.create_ondemand_vms(request_id, redis)
expect(redis.zscore('vmpooler__provisioning__processing', request_id)).to be_nil
end
end
end
context 'with a request that has data' do
let(:request_string) { "#{pool}:#{pool}:1" }
before(:each) do


@@ -0,0 +1,497 @@
# frozen_string_literal: true
require 'spec_helper'
require 'vmpooler/pool_manager'
describe 'Vmpooler::PoolManager - Queue Reliability Features' do
let(:logger) { MockLogger.new }
let(:redis_connection_pool) { ConnectionPool.new(size: 1) { redis } }
let(:metrics) { Vmpooler::Metrics::DummyStatsd.new }
let(:config) { YAML.load(<<~EOT
---
:config:
  task_limit: 10
  vm_checktime: 1
  vm_lifetime: 12
  prefix: 'pooler-'
  dlq_enabled: true
  dlq_ttl: 168
  dlq_max_entries: 100
  purge_enabled: true
  purge_dry_run: false
  max_pending_age: 7200
  max_ready_age: 86400
  max_completed_age: 3600
  health_check_enabled: true
  health_check_interval: 300
  health_thresholds:
    pending_queue_max: 100
    ready_queue_max: 500
    dlq_max_warning: 100
    dlq_max_critical: 1000
    stuck_vm_age_threshold: 7200
:providers:
  :dummy: {}
:pools:
  - name: 'test-pool'
    size: 5
    provider: 'dummy'
EOT
)
}
subject { Vmpooler::PoolManager.new(config, logger, redis_connection_pool, metrics) }
describe 'Dead-Letter Queue (DLQ)' do
let(:vm) { 'vm-abc123' }
let(:pool) { 'test-pool' }
let(:error_class) { 'StandardError' }
let(:error_message) { 'template does not exist' }
let(:request_id) { 'req-123' }
let(:pool_alias) { 'test-alias' }
before(:each) do
redis_connection_pool.with do |redis_connection|
allow(redis_connection).to receive(:zadd)
allow(redis_connection).to receive(:zcard).and_return(0)
allow(redis_connection).to receive(:expire)
end
end
describe '#dlq_enabled?' do
it 'returns true when dlq_enabled is true in config' do
expect(subject.dlq_enabled?).to be true
end
it 'returns false when dlq_enabled is false in config' do
config[:config]['dlq_enabled'] = false
expect(subject.dlq_enabled?).to be false
end
end
describe '#dlq_ttl' do
it 'returns configured TTL' do
expect(subject.dlq_ttl).to eq(168)
end
it 'returns default TTL when not configured' do
config[:config].delete('dlq_ttl')
expect(subject.dlq_ttl).to eq(168)
end
end
describe '#dlq_max_entries' do
it 'returns configured max entries' do
expect(subject.dlq_max_entries).to eq(100)
end
it 'returns default max entries when not configured' do
config[:config].delete('dlq_max_entries')
expect(subject.dlq_max_entries).to eq(10000)
end
end
describe '#move_to_dlq' do
context 'when DLQ is enabled' do
it 'adds entry to DLQ sorted set' do
redis_connection_pool.with do |redis_connection|
dlq_key = 'vmpooler__dlq__pending'
expect(redis_connection).to receive(:zadd).with(dlq_key, anything, anything)
expect(redis_connection).to receive(:expire).with(dlq_key, anything)
subject.move_to_dlq(vm, pool, 'pending', error_class, error_message,
redis_connection, request_id: request_id, pool_alias: pool_alias)
end
end
it 'includes error details in DLQ entry' do
redis_connection_pool.with do |redis_connection|
expect(redis_connection).to receive(:zadd) do |_key, _score, entry|
expect(entry).to include(vm)
expect(entry).to include(error_message)
expect(entry).to include(error_class)
end
subject.move_to_dlq(vm, pool, 'pending', error_class, error_message, redis_connection)
end
end
it 'increments DLQ metrics' do
redis_connection_pool.with do |redis_connection|
expect(metrics).to receive(:increment).with('vmpooler_dlq.pending.count')
subject.move_to_dlq(vm, pool, 'pending', error_class, error_message, redis_connection)
end
end
it 'enforces max entries limit' do
redis_connection_pool.with do |redis_connection|
allow(redis_connection).to receive(:zcard).and_return(150)
expect(redis_connection).to receive(:zremrangebyrank).with(anything, 0, 49)
subject.move_to_dlq(vm, pool, 'pending', error_class, error_message, redis_connection)
end
end
end
context 'when DLQ is disabled' do
before { config[:config]['dlq_enabled'] = false }
it 'does not add entry to DLQ' do
redis_connection_pool.with do |redis_connection|
expect(redis_connection).not_to receive(:zadd)
subject.move_to_dlq(vm, pool, 'pending', error_class, error_message, redis_connection)
end
end
end
end
end
describe 'Auto-Purge' do
describe '#purge_enabled?' do
it 'returns true when purge_enabled is true in config' do
expect(subject.purge_enabled?).to be true
end
it 'returns false when purge_enabled is false in config' do
config[:config]['purge_enabled'] = false
expect(subject.purge_enabled?).to be false
end
end
describe '#purge_dry_run?' do
it 'returns false when purge_dry_run is false in config' do
expect(subject.purge_dry_run?).to be false
end
it 'returns true when purge_dry_run is true in config' do
config[:config]['purge_dry_run'] = true
expect(subject.purge_dry_run?).to be true
end
end
describe '#max_pending_age' do
it 'returns configured max age' do
expect(subject.max_pending_age).to eq(7200)
end
it 'returns default max age when not configured' do
config[:config].delete('max_pending_age')
expect(subject.max_pending_age).to eq(7200)
end
end
describe '#purge_pending_queue' do
let(:pool) { 'test-pool' }
let(:old_vm) { 'vm-old' }
let(:new_vm) { 'vm-new' }
before(:each) do
redis_connection_pool.with do |redis_connection|
# Old VM (3 hours old, exceeds 2 hour threshold)
redis_connection.sadd("vmpooler__pending__#{pool}", old_vm)
redis_connection.hset("vmpooler__vm__#{old_vm}", 'clone', (Time.now - 10800).to_s)
# New VM (30 minutes old, within threshold)
redis_connection.sadd("vmpooler__pending__#{pool}", new_vm)
redis_connection.hset("vmpooler__vm__#{new_vm}", 'clone', (Time.now - 1800).to_s)
end
end
context 'when not in dry-run mode' do
it 'purges stale pending VMs' do
redis_connection_pool.with do |redis_connection|
purged_count = subject.purge_pending_queue(pool, redis_connection)
expect(purged_count).to eq(1)
expect(redis_connection.sismember("vmpooler__pending__#{pool}", old_vm)).to be false
expect(redis_connection.sismember("vmpooler__pending__#{pool}", new_vm)).to be true
end
end
it 'moves purged VMs to DLQ' do
redis_connection_pool.with do |redis_connection|
expect(subject).to receive(:move_to_dlq).with(
old_vm, pool, 'pending', 'Purge', anything, redis_connection, anything
)
subject.purge_pending_queue(pool, redis_connection)
end
end
it 'increments purge metrics' do
redis_connection_pool.with do |redis_connection|
expect(metrics).to receive(:increment).with("vmpooler_purge.pending.#{pool}.count")
subject.purge_pending_queue(pool, redis_connection)
end
end
end
context 'when in dry-run mode' do
before { config[:config]['purge_dry_run'] = true }
it 'detects but does not purge stale VMs' do
redis_connection_pool.with do |redis_connection|
purged_count = subject.purge_pending_queue(pool, redis_connection)
expect(purged_count).to eq(1)
expect(redis_connection.sismember("vmpooler__pending__#{pool}", old_vm)).to be true
end
end
it 'does not move to DLQ' do
redis_connection_pool.with do |redis_connection|
expect(subject).not_to receive(:move_to_dlq)
subject.purge_pending_queue(pool, redis_connection)
end
end
end
end
describe '#purge_ready_queue' do
let(:pool) { 'test-pool' }
let(:old_vm) { 'vm-old-ready' }
let(:new_vm) { 'vm-new-ready' }
before(:each) do
redis_connection_pool.with do |redis_connection|
# Old VM (25 hours old, exceeds 24 hour threshold)
redis_connection.sadd("vmpooler__ready__#{pool}", old_vm)
redis_connection.hset("vmpooler__vm__#{old_vm}", 'ready', (Time.now - 90000).to_s)
# New VM (2 hours old, within threshold)
redis_connection.sadd("vmpooler__ready__#{pool}", new_vm)
redis_connection.hset("vmpooler__vm__#{new_vm}", 'ready', (Time.now - 7200).to_s)
end
end
it 'moves stale ready VMs to completed queue' do
redis_connection_pool.with do |redis_connection|
purged_count = subject.purge_ready_queue(pool, redis_connection)
expect(purged_count).to eq(1)
expect(redis_connection.sismember("vmpooler__ready__#{pool}", old_vm)).to be false
expect(redis_connection.sismember("vmpooler__completed__#{pool}", old_vm)).to be true
expect(redis_connection.sismember("vmpooler__ready__#{pool}", new_vm)).to be true
end
end
end
describe '#purge_completed_queue' do
let(:pool) { 'test-pool' }
let(:old_vm) { 'vm-old-completed' }
let(:new_vm) { 'vm-new-completed' }
before(:each) do
redis_connection_pool.with do |redis_connection|
# Old VM (2 hours old, exceeds 1 hour threshold)
redis_connection.sadd("vmpooler__completed__#{pool}", old_vm)
redis_connection.hset("vmpooler__vm__#{old_vm}", 'destroy', (Time.now - 7200).to_s)
# New VM (30 minutes old, within threshold)
redis_connection.sadd("vmpooler__completed__#{pool}", new_vm)
redis_connection.hset("vmpooler__vm__#{new_vm}", 'destroy', (Time.now - 1800).to_s)
end
end
it 'removes stale completed VMs' do
redis_connection_pool.with do |redis_connection|
purged_count = subject.purge_completed_queue(pool, redis_connection)
expect(purged_count).to eq(1)
expect(redis_connection.sismember("vmpooler__completed__#{pool}", old_vm)).to be false
expect(redis_connection.sismember("vmpooler__completed__#{pool}", new_vm)).to be true
end
end
end
end
describe 'Health Checks' do
describe '#health_check_enabled?' do
it 'returns true when health_check_enabled is true in config' do
expect(subject.health_check_enabled?).to be true
end
it 'returns false when health_check_enabled is false in config' do
config[:config]['health_check_enabled'] = false
expect(subject.health_check_enabled?).to be false
end
end
describe '#health_thresholds' do
it 'returns configured thresholds' do
thresholds = subject.health_thresholds
expect(thresholds['pending_queue_max']).to eq(100)
expect(thresholds['stuck_vm_age_threshold']).to eq(7200)
end
it 'merges with defaults when partially configured' do
config[:config]['health_thresholds'] = { 'pending_queue_max' => 200 }
thresholds = subject.health_thresholds
expect(thresholds['pending_queue_max']).to eq(200)
expect(thresholds['ready_queue_max']).to eq(500) # default
end
end
describe '#calculate_queue_ages' do
let(:pool) { 'test-pool' }
let(:vm1) { 'vm-1' }
let(:vm2) { 'vm-2' }
let(:vm3) { 'vm-3' }
before(:each) do
redis_connection_pool.with do |redis_connection|
redis_connection.hset("vmpooler__vm__#{vm1}", 'clone', (Time.now - 3600).to_s)
redis_connection.hset("vmpooler__vm__#{vm2}", 'clone', (Time.now - 7200).to_s)
redis_connection.hset("vmpooler__vm__#{vm3}", 'clone', (Time.now - 1800).to_s)
end
end
it 'calculates ages for all VMs' do
redis_connection_pool.with do |redis_connection|
vms = [vm1, vm2, vm3]
ages = subject.calculate_queue_ages(vms, 'clone', redis_connection)
expect(ages.length).to eq(3)
expect(ages[0]).to be_within(5).of(3600)
expect(ages[1]).to be_within(5).of(7200)
expect(ages[2]).to be_within(5).of(1800)
end
end
it 'skips VMs with missing timestamps' do
redis_connection_pool.with do |redis_connection|
vms = [vm1, 'vm-nonexistent', vm3]
ages = subject.calculate_queue_ages(vms, 'clone', redis_connection)
expect(ages.length).to eq(2)
end
end
end
describe '#determine_health_status' do
let(:base_metrics) do
{
'queues' => {
'test-pool' => {
'pending' => { 'size' => 10, 'stuck_count' => 2 },
'ready' => { 'size' => 50 }
}
},
'errors' => {
'dlq_total_size' => 50,
'stuck_vm_count' => 2
}
}
end
it 'returns healthy when all metrics are within thresholds' do
status = subject.determine_health_status(base_metrics)
expect(status).to eq('healthy')
end
it 'returns degraded when DLQ size exceeds warning threshold' do
metrics = base_metrics.dup
metrics['errors']['dlq_total_size'] = 150
status = subject.determine_health_status(metrics)
expect(status).to eq('degraded')
end
it 'returns unhealthy when DLQ size exceeds critical threshold' do
metrics = base_metrics.dup
metrics['errors']['dlq_total_size'] = 1500
status = subject.determine_health_status(metrics)
expect(status).to eq('unhealthy')
end
it 'returns degraded when pending queue exceeds warning threshold' do
metrics = base_metrics.dup
metrics['queues']['test-pool']['pending']['size'] = 120
status = subject.determine_health_status(metrics)
expect(status).to eq('degraded')
end
it 'returns unhealthy when pending queue exceeds critical threshold' do
metrics = base_metrics.dup
metrics['queues']['test-pool']['pending']['size'] = 250
status = subject.determine_health_status(metrics)
expect(status).to eq('unhealthy')
end
it 'returns unhealthy when stuck VM count exceeds critical threshold' do
metrics = base_metrics.dup
metrics['errors']['stuck_vm_count'] = 60
status = subject.determine_health_status(metrics)
expect(status).to eq('unhealthy')
end
end
describe '#push_health_metrics' do
let(:metrics_data) do
{
'queues' => {
'test-pool' => {
'pending' => { 'size' => 10, 'oldest_age' => 3600, 'stuck_count' => 2 },
'ready' => { 'size' => 50, 'oldest_age' => 7200 },
'completed' => { 'size' => 5 }
}
},
'tasks' => {
'clone' => { 'active' => 3 },
'ondemand' => { 'active' => 2, 'pending' => 5 }
},
'errors' => {
'dlq_total_size' => 25,
'stuck_vm_count' => 2,
'orphaned_metadata_count' => 3
}
}
end
it 'pushes status metric' do
allow(metrics).to receive(:gauge)
expect(metrics).to receive(:gauge).with('vmpooler_health.status', 0)
subject.push_health_metrics(metrics_data, 'healthy')
end
it 'pushes error metrics' do
allow(metrics).to receive(:gauge)
expect(metrics).to receive(:gauge).with('vmpooler_health.dlq.total_size', 25)
expect(metrics).to receive(:gauge).with('vmpooler_health.stuck_vms.count', 2)
expect(metrics).to receive(:gauge).with('vmpooler_health.orphaned_metadata.count', 3)
subject.push_health_metrics(metrics_data, 'healthy')
end
it 'pushes per-pool queue metrics' do
allow(metrics).to receive(:gauge)
expect(metrics).to receive(:gauge).with('vmpooler_health.queue.test-pool.pending.size', 10)
expect(metrics).to receive(:gauge).with('vmpooler_health.queue.test-pool.pending.oldest_age', 3600)
expect(metrics).to receive(:gauge).with('vmpooler_health.queue.test-pool.pending.stuck_count', 2)
expect(metrics).to receive(:gauge).with('vmpooler_health.queue.test-pool.ready.size', 50)
subject.push_health_metrics(metrics_data, 'healthy')
end
it 'pushes task metrics' do
allow(metrics).to receive(:gauge)
expect(metrics).to receive(:gauge).with('vmpooler_health.tasks.clone.active', 3)
expect(metrics).to receive(:gauge).with('vmpooler_health.tasks.ondemand.active', 2)
expect(metrics).to receive(:gauge).with('vmpooler_health.tasks.ondemand.pending', 5)
subject.push_health_metrics(metrics_data, 'healthy')
end
end
end
end

vmpooler.yml.example (new file, +92 lines)

@@ -0,0 +1,92 @@
---
# VMPooler Configuration Example with Dead-Letter Queue, Auto-Purge, and Health Checks

# Redis Configuration
:redis:
  server: 'localhost'
  port: 6379
  data_ttl: 168 # hours - how long to keep VM metadata in Redis

# Application Configuration
:config:
  # ... other existing config ...

  # Dead-Letter Queue (DLQ) - Optional, defaults shown
  # (the pool manager reads these from :config:, not :redis:)
  dlq_enabled: false # Set to true to enable DLQ
  dlq_ttl: 168 # hours (7 days) - how long to keep DLQ entries
  dlq_max_entries: 10000 # maximum entries per DLQ queue before trimming

  # Auto-Purge Stale Queue Entries
  purge_enabled: false # Set to true to enable auto-purge
  purge_interval: 3600 # seconds (1 hour) - how often to run purge cycle
  purge_dry_run: false # Set to true to log what would be purged without actually purging

  # Auto-Purge Age Thresholds (in seconds)
  max_pending_age: 7200 # 2 hours - VMs stuck in pending
  max_ready_age: 86400 # 24 hours - VMs idle in ready queue
  max_completed_age: 3600 # 1 hour - VMs in completed queue
  max_orphaned_age: 86400 # 24 hours - orphaned VM metadata
  max_request_age: 86400 # 24 hours - stale on-demand requests

  # Health Checks
  health_check_enabled: false # Set to true to enable health checks
  health_check_interval: 300 # seconds (5 minutes) - how often to run health checks

  # Health Check Thresholds
  health_thresholds:
    pending_queue_max: 100 # Warning threshold for pending queue size
    ready_queue_max: 500 # Warning threshold for ready queue size
    dlq_max_warning: 100 # Warning threshold for DLQ size
    dlq_max_critical: 1000 # Critical threshold for DLQ size
    stuck_vm_age_threshold: 7200 # 2 hours - age at which VM is considered "stuck"
    stuck_vm_max_warning: 10 # Warning threshold for stuck VM count
    stuck_vm_max_critical: 50 # Critical threshold for stuck VM count

# Pool Configuration
:pools:
  - name: 'centos-7-x86_64'
    size: 5
    provider: 'vsphere'
    # ... other pool settings ...

# Provider Configuration
:providers:
  :vsphere:
    server: 'vcenter.example.com'
    username: 'vmpooler'
    password: 'secret'
    # ... other provider settings ...

# Example: Production Configuration
# For production use, you might want:
# :config:
#   dlq_enabled: true
#   dlq_ttl: 168 # Keep failed VMs for a week
#
#   purge_enabled: true
#   purge_interval: 1800 # Run every 30 minutes
#   purge_dry_run: false
#   max_pending_age: 3600 # Purge pending VMs after 1 hour
#   max_ready_age: 172800 # Purge ready VMs after 2 days
#
#   health_check_enabled: true
#   health_check_interval: 300 # Check every 5 minutes

# Example: Development Configuration
# For development/testing, you might want:
# :config:
#   dlq_enabled: true
#   dlq_ttl: 24 # Keep failed VMs for a day
#
#   purge_enabled: true
#   purge_interval: 600 # Run every 10 minutes
#   purge_dry_run: true # Test mode - log but don't actually purge
#   max_pending_age: 1800 # More aggressive - 30 minutes
#
#   health_check_enabled: true
#   health_check_interval: 60 # Check every minute