Implement auto-scaling, rate-based provisioning, and queue monitoring

This commit is contained in:
Mahima Singh 2025-12-24 19:24:01 +05:30
parent 1a6b08ab81
commit 8abb389712
8 changed files with 881 additions and 3 deletions

View file

@ -3,7 +3,7 @@
module Vmpooler
class API < Sinatra::Base
# Load API components
%w[helpers dashboard v3 request_logger healthcheck].each do |lib|
%w[helpers dashboard v3 request_logger healthcheck queue_monitor].each do |lib|
require "vmpooler/api/#{lib}"
end
# Load dashboard components
@ -53,6 +53,7 @@ module Vmpooler
use Vmpooler::Dashboard
use Vmpooler::API::Dashboard
use Vmpooler::API::V3
use Vmpooler::API::QueueMonitor
end
# Get thee started O WebServer

View file

@ -0,0 +1,197 @@
# frozen_string_literal: true

require 'vmpooler/api'

module Vmpooler
  class API
    # Queue monitoring endpoints for tracking pool queue depths and health.
    #
    # Exposes read-only JSON endpoints under /api/v1/queue/status that report,
    # per pool: ready/running/pending VM counts, pending request backlog, the
    # age of the oldest pending request, and a derived health status.
    class QueueMonitor < Sinatra::Base
      api_version = '1'
      api_prefix = "/api/v#{api_version}"

      helpers do
        include Vmpooler::API::Helpers
      end

      # GET /api/v1/queue/status[?pool=NAME]
      # Queue status for every pool, or only the pool named by the optional
      # ?pool= query parameter. A failure for one pool is reported inline so
      # it does not break the rest of the response.
      get "#{api_prefix}/queue/status/?" do
        content_type :json

        result = {
          ok: true,
          timestamp: Time.now.to_i,
          pools: {}
        }

        pool_filter = params[:pool]
        pools = pool_filter ? [pool_filter] : list_pools

        pools.each do |pool_name|
          result[:pools][pool_name] = get_queue_metrics(pool_name)
        rescue StandardError => e
          result[:pools][pool_name] = { error: e.message }
        end

        JSON.pretty_generate(result)
      end

      # GET /api/v1/queue/status/:pool
      # Detailed queue metrics for a single pool; 404 when the pool is
      # unknown, 500 with the error message when metric collection fails.
      get "#{api_prefix}/queue/status/:pool/?" do
        content_type :json
        pool_name = params[:pool]

        unless pool_exists?(pool_name)
          halt 404, JSON.pretty_generate({
            ok: false,
            error: "Pool '#{pool_name}' not found"
          })
        end

        begin
          result = {
            ok: true,
            timestamp: Time.now.to_i,
            pool: pool_name,
            metrics: get_queue_metrics(pool_name)
          }
          JSON.pretty_generate(result)
        rescue StandardError => e
          status 500
          JSON.pretty_generate({
            ok: false,
            error: e.message
          })
        end
      end

      # Collect the full metrics hash for one pool: raw queue counts,
      # request backlog, pool configuration, derived percentages and health.
      def get_queue_metrics(pool_name)
        redis = redis_connection_pool
        metrics = redis.with_metrics do |conn|
          {
            ready: conn.llen("vmpooler__ready__#{pool_name}") || 0,
            running: conn.scard("vmpooler__running__#{pool_name}") || 0,
            pending: conn.llen("vmpooler__pending__#{pool_name}") || 0
          }
        end

        pending_requests = get_pending_requests_for_pool(pool_name, redis)
        oldest_pending = get_oldest_pending_request(pool_name, redis)
        pool_config = get_pool_config(pool_name, redis)

        total_vms = metrics[:ready] + metrics[:running] + metrics[:pending]
        ready_percentage = total_vms.positive? ? (metrics[:ready].to_f / total_vms * 100).round(2) : 0
        capacity_percentage = pool_config[:size].positive? ? ((metrics[:ready] + metrics[:pending]).to_f / pool_config[:size] * 100).round(2) : 0
        health_status = determine_health_status(metrics, pending_requests, pool_config)

        {
          ready: metrics[:ready],
          running: metrics[:running],
          pending: metrics[:pending],
          total: total_vms,
          pending_requests: pending_requests,
          oldest_pending_age_seconds: oldest_pending,
          pool_size: pool_config[:size],
          ready_percentage: ready_percentage,
          capacity_percentage: capacity_percentage,
          health: health_status
        }
      end

      # Sum of pending request counts attributed to pool_name across all
      # vmpooler__request__* hashes. Uses SCAN (cursor-based iteration)
      # rather than KEYS so the Redis server is never blocked by a full
      # keyspace walk on a request-serving path.
      def get_pending_requests_for_pool(pool_name, redis)
        redis.with_metrics do |conn|
          pending_count = 0
          conn.scan_each(match: 'vmpooler__request__*') do |key|
            request_data = conn.hgetall(key)
            next unless request_data['status'] == 'pending' && request_data.key?(pool_name)

            pending_count += request_data[pool_name].to_i
          end
          pending_count
        end
      end

      # Age in seconds of the oldest pending request that references
      # pool_name, or 0 when the pool has no pending requests.
      # Uses SCAN rather than KEYS (see get_pending_requests_for_pool).
      def get_oldest_pending_request(pool_name, redis)
        redis.with_metrics do |conn|
          oldest_timestamp = nil
          conn.scan_each(match: 'vmpooler__request__*') do |key|
            request_data = conn.hgetall(key)
            next unless request_data['status'] == 'pending' && request_data.key?(pool_name)

            requested_at = request_data['requested_at']&.to_i
            oldest_timestamp = requested_at if requested_at && (oldest_timestamp.nil? || requested_at < oldest_timestamp)
          end
          oldest_timestamp ? Time.now.to_i - oldest_timestamp : 0
        end
      end

      # Pool configuration stored in the vmpooler__pool__<name> hash;
      # defaults to size 0 / template 'unknown' when fields are absent.
      def get_pool_config(pool_name, redis)
        redis.with_metrics do |conn|
          config = conn.hgetall("vmpooler__pool__#{pool_name}")
          {
            size: config['size']&.to_i || 0,
            template: config['template'] || 'unknown'
          }
        end
      end

      # Map raw metrics to 'critical' / 'warning' / 'healthy'.
      def determine_health_status(metrics, pending_requests, pool_config)
        if metrics[:ready].zero? && pending_requests.positive?
          'critical' # No ready VMs and users are waiting
        elsif metrics[:ready].zero? && metrics[:pending].positive?
          'warning' # No ready VMs but some are being created
        elsif metrics[:ready] < (pool_config[:size] * 0.2).ceil
          'warning' # Less than 20% ready VMs
        elsif pending_requests.positive?
          'warning' # Users waiting for VMs
        else
          'healthy'
        end
      end

      # True when pool_name is a member of the vmpooler__pools set.
      def pool_exists?(pool_name)
        redis = redis_connection_pool
        redis.with_metrics do |conn|
          conn.sismember('vmpooler__pools', pool_name)
        end
      end

      # All known pool names (empty array when the set is missing).
      def list_pools
        redis = redis_connection_pool
        redis.with_metrics do |conn|
          conn.smembers('vmpooler__pools') || []
        end
      end
    end
  end
end

View file

@ -3,6 +3,8 @@
require 'vmpooler/dns'
require 'vmpooler/providers'
require 'vmpooler/util/parsing'
require 'vmpooler/pool_manager/auto_scaler'
require 'vmpooler/pool_manager/rate_provisioner'
require 'spicy-proton'
require 'resolv' # ruby standard lib
@ -41,6 +43,12 @@ module Vmpooler
# Name generator for generating host names
@name_generator = Spicy::Proton.new
# Auto-scaler for dynamic pool sizing
@auto_scaler = AutoScaler.new(@redis, logger, metrics)
# Rate provisioner for dynamic clone concurrency
@rate_provisioner = RateProvisioner.new(@redis, logger, metrics)
# load specified providers from config file
load_used_providers
@ -1457,6 +1465,17 @@ module Vmpooler
dns_plugin = get_dns_plugin_class_for_pool(pool_name)
# Get the pool configuration to check for rate-based provisioning
pool_config = $config[:pools].find { |p| p['name'] == pool_name }
# Determine clone concurrency based on demand
# Use rate-based provisioning if enabled, otherwise use default task_limit
max_concurrent_clones = if pool_config && @rate_provisioner.enabled_for_pool?(pool_config)
@rate_provisioner.get_clone_concurrency(pool_config, pool_name)
else
$config[:config]['task_limit'].to_i
end
unless pool_size == 0
if redis.get("vmpooler__empty__#{pool_name}")
redis.del("vmpooler__empty__#{pool_name}") unless ready == 0
@ -1467,7 +1486,8 @@ module Vmpooler
end
(pool_size - total.to_i).times do
if redis.get('vmpooler__tasks__clone').to_i < $config[:config]['task_limit'].to_i
# Use dynamic concurrency limit from rate provisioner
if redis.get('vmpooler__tasks__clone').to_i < max_concurrent_clones
begin
redis.incr('vmpooler__tasks__clone')
pool_check_response[:cloned_vms] += 1
@ -1526,6 +1546,9 @@ module Vmpooler
# Additionally, a pool will drain ready and pending instances
update_clone_target(pool)
# Apply auto-scaling if enabled for this pool
@auto_scaler.apply_auto_scaling(pool)
repopulate_pool_vms(pool['name'], provider, pool_check_response, pool['size'])
# Remove VMs in excess of the configured pool size

View file

@ -0,0 +1,160 @@
# frozen_string_literal: true

module Vmpooler
  class PoolManager
    # Auto-scaling module for dynamically adjusting pool sizes based on demand.
    #
    # The decision input is the percentage of "ready" VMs among all VMs in a
    # pool: scale up when it drops below scale_up_threshold, scale down when
    # it rises above scale_down_threshold (and nobody is waiting for a VM).
    # A per-pool cooldown window prevents scaling thrash.
    class AutoScaler
      attr_reader :logger, :redis, :metrics

      # @param redis_connection_pool [#with_metrics] pooled Redis handle
      # @param logger [#log] vmpooler logger
      # @param metrics [#increment] metrics sink
      def initialize(redis_connection_pool, logger, metrics)
        @redis = redis_connection_pool
        @logger = logger
        @metrics = metrics
        # pool_name => Time of the last scaling action (thread-safe map)
        @last_scale_time = Concurrent::Hash.new
      end

      # True when the pool config opts in to auto-scaling.
      def enabled_for_pool?(pool)
        return false unless pool['auto_scale']

        pool['auto_scale']['enabled'] == true
      end

      # Compute the target size for the pool, honoring thresholds, bounds and
      # the cooldown period. Returns the current size when no scaling action
      # is warranted.
      def calculate_target_size(pool, pool_name)
        auto_scale_config = pool['auto_scale']
        min_size = auto_scale_config['min_size'] || pool['size']
        max_size = auto_scale_config['max_size'] || pool['size'] * 5
        scale_up_threshold = auto_scale_config['scale_up_threshold'] || 20
        scale_down_threshold = auto_scale_config['scale_down_threshold'] || 80
        cooldown_period = auto_scale_config['cooldown_period'] || 300

        # Respect the cooldown window between consecutive scaling actions
        last_scale = @last_scale_time[pool_name]
        if last_scale && (Time.now - last_scale) < cooldown_period
          logger.log('d', "[~] [#{pool_name}] Auto-scaling in cooldown period (#{cooldown_period}s)")
          return pool['size']
        end

        # BUGFIX: this local must NOT be named `metrics` — that shadowed the
        # `metrics` attr_reader, so the increment calls below were sent to a
        # plain Hash and raised NoMethodError on every scaling decision.
        pool_metrics = get_pool_metrics(pool_name)
        current_size = pool['size']
        ready_count = pool_metrics[:ready]
        running_count = pool_metrics[:running]
        pending_count = pool_metrics[:pending]

        total_vms = ready_count + running_count + pending_count
        total_vms = 1 if total_vms.zero? # Avoid division by zero
        ready_percentage = (ready_count.to_f / total_vms * 100).round(2)
        logger.log('d', "[~] [#{pool_name}] Metrics: ready=#{ready_count}, running=#{running_count}, pending=#{pending_count}, ready%=#{ready_percentage}")

        if ready_percentage < scale_up_threshold
          # Scale up: increase pool size
          new_size = calculate_scale_up_size(current_size, max_size, ready_percentage, scale_up_threshold)
          if new_size > current_size
            logger.log('s', "[+] [#{pool_name}] Scaling UP: #{current_size} -> #{new_size} (ready: #{ready_percentage}% < #{scale_up_threshold}%)")
            @last_scale_time[pool_name] = Time.now
            metrics.increment("scale_up.#{pool_name}")
            return new_size
          end
        elsif ready_percentage > scale_down_threshold
          # Scale down: decrease pool size (only if no pending requests)
          pending_requests = get_pending_requests_count(pool_name)
          if pending_requests.zero?
            new_size = calculate_scale_down_size(current_size, min_size, ready_percentage, scale_down_threshold)
            if new_size < current_size
              logger.log('s', "[-] [#{pool_name}] Scaling DOWN: #{current_size} -> #{new_size} (ready: #{ready_percentage}% > #{scale_down_threshold}%)")
              @last_scale_time[pool_name] = Time.now
              metrics.increment("scale_down.#{pool_name}")
              return new_size
            end
          else
            logger.log('d', "[~] [#{pool_name}] Not scaling down: #{pending_requests} pending requests")
          end
        end

        current_size
      end

      # Raw ready/running/pending counts for the pool from Redis.
      def get_pool_metrics(pool_name)
        @redis.with_metrics do |redis|
          {
            ready: redis.llen("vmpooler__ready__#{pool_name}") || 0,
            running: redis.scard("vmpooler__running__#{pool_name}") || 0,
            pending: redis.llen("vmpooler__pending__#{pool_name}") || 0
          }
        end
      end

      # Count of pending VM requests that reference this pool. Uses SCAN
      # (cursor-based iteration) instead of the blocking KEYS command so the
      # Redis server is never stalled by a full keyspace walk.
      def get_pending_requests_count(pool_name)
        @redis.with_metrics do |redis|
          pending_count = 0
          redis.scan_each(match: 'vmpooler__request__*') do |key|
            request_data = redis.hgetall(key)
            next unless request_data['status'] == 'pending' && request_data.key?(pool_name)

            pending_count += request_data[pool_name].to_i
          end
          pending_count
        end
      end

      # New size when scaling up: double (or +10, whichever is larger) when
      # critically low on ready VMs, +50% otherwise; capped at max_size.
      def calculate_scale_up_size(current_size, max_size, ready_percentage, threshold)
        # Float division so odd thresholds keep their exact midpoint
        new_size = if ready_percentage < threshold / 2.0
                     [current_size * 2, current_size + 10].max
                   else
                     (current_size * 1.5).ceil
                   end
        [new_size, max_size].min
      end

      # New size when scaling down: conservative 25% reduction, floored at
      # min_size. (The percentage/threshold args are accepted for signature
      # parity with calculate_scale_up_size but are not needed here.)
      def calculate_scale_down_size(current_size, min_size, _ready_percentage, _threshold)
        [(current_size * 0.75).floor, min_size].max
      end

      # Entry point: recompute the target size and, when it differs, mutate
      # pool['size'] in place and persist the new size to Redis. Errors are
      # logged and swallowed so a scaling failure never kills the pool loop.
      def apply_auto_scaling(pool)
        return unless enabled_for_pool?(pool)

        pool_name = pool['name']
        target_size = calculate_target_size(pool, pool_name)
        if target_size != pool['size']
          pool['size'] = target_size
          update_pool_size_in_redis(pool_name, target_size)
        end
      rescue StandardError => e
        logger.log('s', "[!] [#{pool_name}] Auto-scaling error: #{e.message}")
        logger.log('s', e.backtrace.join("\n")) if logger.level == 'debug'
      end

      # Persist the new pool size in the vmpooler__pool__<name> hash.
      def update_pool_size_in_redis(pool_name, new_size)
        @redis.with_metrics do |redis|
          redis.hset("vmpooler__pool__#{pool_name}", 'size', new_size)
        end
      end
    end
  end
end

View file

@ -0,0 +1,95 @@
# frozen_string_literal: true

module Vmpooler
  class PoolManager
    # Rate-based provisioning: selects the clone concurrency for a pool based
    # on current demand. When many requests are pending — or users are waiting
    # while no VMs are ready — the pool switches to a higher concurrency.
    class RateProvisioner
      attr_reader :logger, :redis, :metrics

      # @param redis_connection_pool [#with_metrics] pooled Redis handle
      # @param logger [#log] vmpooler logger
      # @param metrics [#increment] metrics sink
      def initialize(redis_connection_pool, logger, metrics)
        @redis = redis_connection_pool
        @logger = logger
        @metrics = metrics
        # pool_name => :normal | :high_demand (thread-safe map)
        @current_mode = Concurrent::Hash.new
      end

      # True when the pool config opts in to rate-based provisioning.
      def enabled_for_pool?(pool)
        return false unless pool['rate_provisioning']

        pool['rate_provisioning']['enabled'] == true
      end

      # Clone concurrency to use for the pool right now. Falls back to
      # pool['clone_target_concurrency'] (default 2) when rate provisioning is
      # disabled; otherwise returns the normal or high-demand concurrency from
      # the pool's rate_provisioning config, logging mode transitions.
      def get_clone_concurrency(pool, pool_name)
        return pool['clone_target_concurrency'] || 2 unless enabled_for_pool?(pool)

        rate_config = pool['rate_provisioning']
        normal_concurrency = rate_config['normal_concurrency'] || 2
        high_demand_concurrency = rate_config['high_demand_concurrency'] || 5
        threshold = rate_config['queue_depth_threshold'] || 5

        ready_count = get_ready_count(pool_name)
        pending_requests = get_pending_requests_count(pool_name)

        # High demand = backlog at/over threshold, OR users waiting while no
        # VMs are ready at all.
        high_demand = (pending_requests >= threshold) || (ready_count.zero? && pending_requests.positive?)
        new_mode = high_demand ? :high_demand : :normal
        old_mode = @current_mode[pool_name] || :normal

        # Log and record only actual mode transitions
        if new_mode != old_mode
          concurrency = new_mode == :high_demand ? high_demand_concurrency : normal_concurrency
          logger.log('s', "[~] [#{pool_name}] Provisioning mode: #{old_mode} -> #{new_mode} (concurrency: #{concurrency}, pending: #{pending_requests}, ready: #{ready_count})")
          @current_mode[pool_name] = new_mode
          metrics.increment("provisioning_mode_change.#{pool_name}.#{new_mode}")
        end

        new_mode == :high_demand ? high_demand_concurrency : normal_concurrency
      end

      # Number of ready VMs in the pool.
      def get_ready_count(pool_name)
        @redis.with_metrics do |redis|
          redis.llen("vmpooler__ready__#{pool_name}") || 0
        end
      end

      # Count of pending VM requests for the pool. Uses SCAN (cursor-based
      # iteration) rather than the blocking KEYS command so the Redis server
      # is never stalled by a full keyspace walk. Also considers the pool's
      # own pending queue depth and returns the larger of the two.
      def get_pending_requests_count(pool_name)
        @redis.with_metrics do |redis|
          pending_count = 0
          redis.scan_each(match: 'vmpooler__request__*') do |key|
            request_data = redis.hgetall(key)
            next unless request_data['status'] == 'pending' && request_data.key?(pool_name)

            pending_count += request_data[pool_name].to_i
          end
          # Also check the queue itself for waiting allocations
          queue_depth = redis.llen("vmpooler__pending__#{pool_name}") || 0
          [pending_count, queue_depth].max
        end
      end

      # Current provisioning mode for a pool (:normal when never set).
      def get_current_mode(pool_name)
        @current_mode[pool_name] || :normal
      end

      # Force reset to normal mode (useful for testing or recovery).
      def reset_to_normal(pool_name)
        @current_mode[pool_name] = :normal
        logger.log('d', "[~] [#{pool_name}] Provisioning mode reset to normal")
      end
    end
  end
end