Add circuit breaker and adaptive timeout for provider resilience

Mahima Singh 2025-12-26 17:01:38 +05:30
parent 76eb62577b
commit efc31a3280
6 changed files with 566 additions and 0 deletions

@@ -0,0 +1,130 @@
# frozen_string_literal: true
module Vmpooler
# Adaptive timeout that adjusts based on observed connection performance
# to optimize between responsiveness and reliability.
#
# Tracks recent connection durations and adjusts timeout to p95 + buffer,
# reducing timeout on failures to fail faster during outages.
class AdaptiveTimeout
attr_reader :current_timeout
# Initialize adaptive timeout
#
# @param name [String] Name for logging (e.g., "vsphere_connections")
# @param logger [Object] Logger instance
# @param metrics [Object] Metrics instance
# @param min [Integer] Minimum timeout in seconds
# @param max [Integer] Maximum timeout in seconds
# @param initial [Integer] Initial timeout in seconds
# @param max_samples [Integer] Number of recent samples to track
def initialize(name:, logger:, metrics:, min: 5, max: 60, initial: 30, max_samples: 100)
@name = name
@logger = logger
@metrics = metrics
@min_timeout = min
@max_timeout = max
@current_timeout = initial
@initial_timeout = initial
@recent_durations = []
@max_samples = max_samples
@mutex = Mutex.new
end
# Get current timeout value (thread-safe)
# @return [Integer] Current timeout in seconds
def timeout
@mutex.synchronize { @current_timeout }
end
# Record a successful operation duration
# @param duration [Float] Duration in seconds
def record_success(duration)
@mutex.synchronize do
@recent_durations << duration
@recent_durations.shift if @recent_durations.size > @max_samples
# Adjust timeout based on recent performance
adjust_timeout if @recent_durations.size >= 10
end
end
# Record a failure (timeout or error)
# Reduces current timeout to fail faster on subsequent attempts
def record_failure
@mutex.synchronize do
old_timeout = @current_timeout
# Reduce timeout by 20% on failure, but don't go below minimum
@current_timeout = [(@current_timeout * 0.8).round, @min_timeout].max
if old_timeout != @current_timeout
@logger.log('d', "[*] [adaptive_timeout] '#{@name}' reduced timeout #{old_timeout}s → #{@current_timeout}s after failure")
@metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout)
end
end
end
# Reset to initial timeout (useful after recovery)
def reset
@mutex.synchronize do
@recent_durations.clear
old_timeout = @current_timeout
@current_timeout = [@initial_timeout, @max_timeout].min
@logger.log('d', "[*] [adaptive_timeout] '#{@name}' reset timeout #{old_timeout}s → #{@current_timeout}s")
@metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout)
end
end
# Get statistics about recent durations
# @return [Hash] Statistics including min, max, avg, p95
def stats
@mutex.synchronize do
return { samples: 0 } if @recent_durations.empty?
sorted = @recent_durations.sort
{
samples: sorted.size,
min: sorted.first.round(2),
max: sorted.last.round(2),
avg: (sorted.sum / sorted.size.to_f).round(2),
p50: percentile(sorted, 0.50).round(2),
p95: percentile(sorted, 0.95).round(2),
p99: percentile(sorted, 0.99).round(2),
current_timeout: @current_timeout
}
end
end
private
def adjust_timeout
return if @recent_durations.empty?
sorted = @recent_durations.sort
p95_duration = percentile(sorted, 0.95)
# Set timeout to p95 + 50% buffer, bounded by min/max
new_timeout = (p95_duration * 1.5).round
new_timeout = [[new_timeout, @min_timeout].max, @max_timeout].min
# Only adjust if change is significant (> 5 seconds)
if (new_timeout - @current_timeout).abs > 5
old_timeout = @current_timeout
@current_timeout = new_timeout
@logger.log('d', "[*] [adaptive_timeout] '#{@name}' adjusted timeout #{old_timeout}s → #{@current_timeout}s (p95: #{p95_duration.round(2)}s)")
@metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout)
@metrics.gauge("adaptive_timeout.p95.#{@name}", p95_duration)
end
end
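# Nearest-rank percentile over a pre-sorted sample array.
# e.g. with 20 samples, p95 picks index (20 * 0.95).ceil - 1 = 18, the 19th value.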
def percentile(sorted_array, percentile)
return 0 if sorted_array.empty?
index = (sorted_array.size * percentile).ceil - 1
index = [index, 0].max
index = [index, sorted_array.size - 1].min
sorted_array[index]
end
end
end
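A rough sketch of how a caller might drive this class: time each connection attempt, feed the result back, and let the timeout converge on the p95 latency plus a 50% buffer (a p95 of roughly 12s would settle near 18s, clamped to the 5-60s bounds). The open_vsphere_connection helper below is hypothetical.

require 'timeout'

adaptive = Vmpooler::AdaptiveTimeout.new(name: 'vsphere_connections', logger: logger, metrics: metrics)

begin
  started = Time.now
  # Bound the attempt by the current adaptive value
  conn = Timeout.timeout(adaptive.timeout) { open_vsphere_connection }
  # Feed the observed duration back so the timeout tracks recent p95 latency
  adaptive.record_success(Time.now - started)
rescue StandardError
  # Shrinks the current timeout by 20% (never below the minimum) so retries fail faster
  adaptive.record_failure
  raise
end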

@@ -0,0 +1,193 @@
# frozen_string_literal: true
module Vmpooler
# Circuit breaker pattern implementation to prevent cascading failures
# when a provider becomes unresponsive or experiences repeated failures.
#
# States:
# - CLOSED: Normal operation, requests flow through
# - OPEN: Provider is failing, reject requests immediately (fail fast)
# - HALF_OPEN: Testing if provider has recovered with limited requests
class CircuitBreaker
STATES = [:closed, :open, :half_open].freeze
class CircuitOpenError < StandardError; end
attr_reader :state, :failure_count, :success_count
# Initialize a new circuit breaker
#
# @param name [String] Name for logging/metrics (e.g., "vsphere_provider")
# @param logger [Object] Logger instance
# @param metrics [Object] Metrics instance
# @param failure_threshold [Integer] Number of failures before opening circuit
# @param timeout [Integer] Seconds to wait in open state before testing (half-open)
# @param half_open_attempts [Integer] Number of successful test requests needed to close
def initialize(name:, logger:, metrics:, failure_threshold: 5, timeout: 30, half_open_attempts: 3)
@name = name
@logger = logger
@metrics = metrics
@failure_threshold = failure_threshold
@timeout = timeout
@half_open_attempts = half_open_attempts
@state = :closed
@failure_count = 0
@success_count = 0
@last_failure_time = nil
@mutex = Mutex.new
end
# Execute a block with circuit breaker protection
#
# @yield Block to execute if circuit allows
# @return Result of the block
# @raise CircuitOpenError if circuit is open and timeout hasn't elapsed
def call
check_state
begin
result = yield
on_success
result
rescue StandardError => e
on_failure(e)
raise
end
end
# Check if circuit allows requests
# @return [Boolean] true if circuit is closed or half-open
def allow_request?
@mutex.synchronize do
case @state
when :closed
true
when :half_open
true
when :open
if should_attempt_reset?
true
else
false
end
end
end
end
# Get current circuit breaker status
# @return [Hash] Status information
def status
@mutex.synchronize do
{
name: @name,
state: @state,
failure_count: @failure_count,
success_count: @success_count,
last_failure_time: @last_failure_time,
next_retry_time: next_retry_time
}
end
end
private
def check_state
@mutex.synchronize do
case @state
when :open
if should_attempt_reset?
transition_to_half_open
else
time_remaining = (@timeout - (Time.now - @last_failure_time)).round(1)
raise CircuitOpenError, "Circuit breaker '#{@name}' is open (#{@failure_count} failures, retry in #{time_remaining}s)"
end
when :half_open
# Half-open: let requests through so recovery can be tested
when :closed
# Normal operation
end
end
end
def should_attempt_reset?
return false unless @last_failure_time
Time.now - @last_failure_time >= @timeout
end
def next_retry_time
return nil unless @last_failure_time && @state == :open
@last_failure_time + @timeout
end
def on_success
@mutex.synchronize do
case @state
when :closed
# Reset failure count on success in closed state
@failure_count = 0 if @failure_count > 0
when :half_open
@success_count += 1
@failure_count = 0
@logger.log('d', "[+] [circuit_breaker] '#{@name}' successful test request (#{@success_count}/#{@half_open_attempts})")
if @success_count >= @half_open_attempts
transition_to_closed
end
when :open
# Should not happen, but reset if we somehow get a success
transition_to_closed
end
end
end
def on_failure(error)
@mutex.synchronize do
@failure_count += 1
@last_failure_time = Time.now
case @state
when :closed
@logger.log('d', "[!] [circuit_breaker] '#{@name}' failure #{@failure_count}/#{@failure_threshold}: #{error.class}")
if @failure_count >= @failure_threshold
transition_to_open
end
when :half_open
@logger.log('d', "[!] [circuit_breaker] '#{@name}' failed during half-open test")
transition_to_open
when :open
# Already open, just log
@logger.log('d', "[!] [circuit_breaker] '#{@name}' additional failure while open")
end
end
end
def transition_to_open
@state = :open
@success_count = 0
@logger.log('s', "[!] [circuit_breaker] '#{@name}' OPENED after #{@failure_count} failures (will retry in #{@timeout}s)")
@metrics.increment("circuit_breaker.opened.#{@name}")
@metrics.gauge("circuit_breaker.state.#{@name}", 1) # 1 = open
end
def transition_to_half_open
@state = :half_open
@success_count = 0
@failure_count = 0
@logger.log('s', "[*] [circuit_breaker] '#{@name}' HALF-OPEN, testing provider health")
@metrics.increment("circuit_breaker.half_open.#{@name}")
@metrics.gauge("circuit_breaker.state.#{@name}", 0.5) # 0.5 = half-open
end
def transition_to_closed
@state = :closed
@failure_count = 0
@success_count = 0
@logger.log('s', "[+] [circuit_breaker] '#{@name}' CLOSED, provider recovered")
@metrics.increment("circuit_breaker.closed.#{@name}")
@metrics.gauge("circuit_breaker.state.#{@name}", 0) # 0 = closed
end
end
end
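A minimal usage sketch, assuming a hypothetical provider_connection.list_vms call: wrapping each provider operation in #call lets the breaker record the outcome, and once the circuit opens, callers get an immediate CircuitOpenError instead of waiting on a dead backend.

breaker = Vmpooler::CircuitBreaker.new(name: 'vsphere_provider', logger: logger, metrics: metrics)

begin
  vms = breaker.call { provider_connection.list_vms } # success/failure is recorded by the breaker
rescue Vmpooler::CircuitBreaker::CircuitOpenError => e
  # Fail fast: the provider is treated as down until the open timeout (default 30s) elapses
  logger.log('d', "[!] skipping provider call: #{e.message}")
  vms = []
end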

@@ -34,6 +34,34 @@ module Vmpooler
end
end
end
# Get connection pool health status
# @return [Hash] Health status including utilization and queue depth
def health_status
{
size: @size,
available: @available.length,
in_use: @size - @available.length,
utilization: ((@size - @available.length).to_f / @size * 100).round(2),
waiting_threads: (@queue.respond_to?(:length) ? @queue.length : 0),
state: determine_health_state
}
end
private
def determine_health_state
utilization = ((@size - @available.length).to_f / @size * 100)
waiting = @queue.respond_to?(:length) ? @queue.length : 0
if utilization >= 90 || waiting > 5
:critical # Pool exhausted or many waiting threads
elsif utilization >= 70 || waiting > 2
:warning # Pool under stress
else
:healthy # Normal operation
end
end
end
end
end
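To make the thresholds concrete: a pool of size 10 with 8 connections checked out is at 80% utilization, which crosses the 70% warning line but not the 90% critical one; more than 2 waiting threads also escalates to :warning and more than 5 to :critical. An illustrative health_status result for that case:

# Illustrative health_status output (values chosen for the example above)
{
  size: 10,
  available: 2,
  in_use: 8,
  utilization: 80.0,
  waiting_threads: 0,
  state: :warning
}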

@@ -1287,6 +1287,63 @@ module Vmpooler
$metrics.gauge('vmpooler_health.status', status_value)
end
# Monitor connection pool health across all providers
def monitor_connection_pools
return unless $providers
$providers.each do |provider_name, provider|
begin
next unless provider.respond_to?(:connection_pool)
pool = provider.connection_pool
next unless pool.respond_to?(:health_status)
health = pool.health_status
# Push metrics
$metrics.gauge("connection_pool.#{provider_name}.available", health[:available])
$metrics.gauge("connection_pool.#{provider_name}.in_use", health[:in_use])
$metrics.gauge("connection_pool.#{provider_name}.utilization", health[:utilization])
$metrics.gauge("connection_pool.#{provider_name}.waiting", health[:waiting_threads])
# Log warnings for unhealthy states
if health[:state] == :critical
$logger.log('s', "[!] [connection_pool] '#{provider_name}' CRITICAL: #{health[:utilization]}% used, #{health[:waiting_threads]} waiting")
$metrics.increment("connection_pool.#{provider_name}.critical")
elsif health[:state] == :warning
$logger.log('d', "[*] [connection_pool] '#{provider_name}' WARNING: #{health[:utilization]}% used, #{health[:waiting_threads]} waiting")
$metrics.increment("connection_pool.#{provider_name}.warning")
end
# Check circuit breaker status
if provider.respond_to?(:circuit_breaker) && provider.circuit_breaker
cb_status = provider.circuit_breaker.status
state_value = { closed: 0, half_open: 0.5, open: 1 }[cb_status[:state]] || 1
$metrics.gauge("circuit_breaker.state.#{provider_name}", state_value)
$metrics.gauge("circuit_breaker.failures.#{provider_name}", cb_status[:failure_count])
end
# Log adaptive timeout stats
if provider.respond_to?(:adaptive_timeout) && provider.adaptive_timeout
timeout_stats = provider.adaptive_timeout.stats
if timeout_stats[:samples] > 0
$metrics.gauge("adaptive_timeout.current.#{provider_name}", timeout_stats[:current_timeout])
$metrics.gauge("adaptive_timeout.p95.#{provider_name}", timeout_stats[:p95])
end
end
rescue StandardError => e
$logger.log('d', "[!] [connection_pool_monitor] Failed to monitor '#{provider_name}': #{e}")
end
end
end
def connection_pool_monitor_enabled?
global_config = $config[:config] || {}
enabled = global_config['connection_pool_monitor_enabled']
enabled = true if enabled.nil? # Default to enabled
enabled
end
def create_vm_disk(pool_name, vm, disk_size, provider)
Thread.new do
begin
@@ -2525,6 +2582,27 @@ module Vmpooler
end
end
# Connection pool monitoring thread
if connection_pool_monitor_enabled?
monitor_interval = ($config[:config] && $config[:config]['connection_pool_monitor_interval']) || 10 # default 10 seconds
if !$threads['connection_pool_monitor']
$threads['connection_pool_monitor'] = Thread.new do
loop do
monitor_connection_pools
sleep(monitor_interval)
end
end
elsif !$threads['connection_pool_monitor'].alive?
$logger.log('d', '[!] [connection_pool_monitor] worker thread died, restarting')
$threads['connection_pool_monitor'] = Thread.new do
loop do
monitor_connection_pools
sleep(monitor_interval)
end
end
end
end
sleep(loop_delay)
unless maxloop == 0
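Taken together, each monitoring pass publishes a small, fixed set of gauges per provider. For a provider registered as 'vsphere' (the name and values here are illustrative only), a single pass boils down to roughly:

# Pool health, from the provider's connection_pool.health_status
$metrics.gauge('connection_pool.vsphere.available', 2)
$metrics.gauge('connection_pool.vsphere.in_use', 8)
$metrics.gauge('connection_pool.vsphere.utilization', 80.0)
$metrics.gauge('connection_pool.vsphere.waiting', 0)
# Circuit breaker state (0 = closed, 0.5 = half-open, 1 = open) and failure count
$metrics.gauge('circuit_breaker.state.vsphere', 0)
$metrics.gauge('circuit_breaker.failures.vsphere', 0)
# Adaptive timeout gauges, emitted once at least one duration sample exists
$metrics.gauge('adaptive_timeout.current.vsphere', 18)
$metrics.gauge('adaptive_timeout.p95.vsphere', 12.4)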

@@ -13,6 +13,10 @@ module Vmpooler
attr_reader :metrics
# Provider options passed in during initialization
attr_reader :provider_options
# Circuit breaker for provider resilience
attr_reader :circuit_breaker
# Adaptive timeout for connections
attr_reader :adaptive_timeout
def initialize(config, logger, metrics, redis_connection_pool, name, options)
@config = config
@@ -30,6 +34,11 @@ module Vmpooler
@provider_options = options
logger.log('s', "[!] Creating provider '#{name}'")
# Initialize circuit breaker if enabled
initialize_circuit_breaker if circuit_breaker_enabled?
# Initialize adaptive timeout if enabled
initialize_adaptive_timeout if adaptive_timeout_enabled?
end
# Helper Methods
@@ -271,6 +280,86 @@ module Vmpooler
logger.log('s', '[!] purge_unconfigured_folders was renamed to purge_unconfigured_resources, please update your provider implementation')
purge_unconfigured_resources(allowlist)
end
private
# Circuit breaker configuration and initialization
def circuit_breaker_enabled?
global_config = @config[:config] || {}
provider_cfg = provider_config || {}
# Check provider-specific setting first, then global
enabled = provider_cfg['circuit_breaker_enabled']
enabled = global_config['circuit_breaker_enabled'] if enabled.nil?
enabled = true if enabled.nil? # Default to enabled
enabled
end
def initialize_circuit_breaker
require 'vmpooler/circuit_breaker'
global_config = @config[:config] || {}
provider_cfg = provider_config || {}
# Get circuit breaker settings (provider-specific overrides global)
failure_threshold = provider_cfg['circuit_breaker_failure_threshold'] ||
global_config['circuit_breaker_failure_threshold'] || 5
timeout = provider_cfg['circuit_breaker_timeout'] ||
global_config['circuit_breaker_timeout'] || 30
half_open_attempts = provider_cfg['circuit_breaker_half_open_attempts'] ||
global_config['circuit_breaker_half_open_attempts'] || 3
@circuit_breaker = Vmpooler::CircuitBreaker.new(
name: @provider_name,
logger: @logger,
metrics: @metrics,
failure_threshold: failure_threshold.to_i,
timeout: timeout.to_i,
half_open_attempts: half_open_attempts.to_i
)
@logger.log('d', "[*] [#{@provider_name}] Circuit breaker initialized (threshold: #{failure_threshold}, timeout: #{timeout}s)")
end
def adaptive_timeout_enabled?
global_config = @config[:config] || {}
provider_cfg = provider_config || {}
# Check provider-specific setting first, then global
enabled = provider_cfg['adaptive_timeout_enabled']
enabled = global_config['adaptive_timeout_enabled'] if enabled.nil?
enabled = true if enabled.nil? # Default to enabled
enabled
end
def initialize_adaptive_timeout
require 'vmpooler/adaptive_timeout'
global_config = @config[:config] || {}
provider_cfg = provider_config || {}
# Get adaptive timeout settings (provider-specific overrides global)
min = provider_cfg['connection_pool_timeout_min'] ||
global_config['connection_pool_timeout_min'] || 5
max = provider_cfg['connection_pool_timeout_max'] ||
global_config['connection_pool_timeout_max'] || 60
initial = provider_cfg['connection_pool_timeout_initial'] ||
global_config['connection_pool_timeout_initial'] || 30
@adaptive_timeout = Vmpooler::AdaptiveTimeout.new(
name: "#{@provider_name}_connections",
logger: @logger,
metrics: @metrics,
min: min.to_i,
max: max.to_i,
initial: initial.to_i
)
@logger.log('d', "[*] [#{@provider_name}] Adaptive timeout initialized (min: #{min}s, max: #{max}s, initial: #{initial}s)")
end
end
end
end
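Because the base class only constructs the two objects and exposes them via attr_reader, each provider decides how to apply them. One possible pattern, sketched here with a hypothetical with_resilience helper and an illustrative get_vm call (neither is part of the base class), is to time the work inside the breaker and skip the adaptive-failure bookkeeping when the circuit rejects the request outright:

def with_resilience
  started = Time.now
  # Route the work through the breaker when it exists; the ivars are nil when disabled
  result = circuit_breaker ? circuit_breaker.call { yield } : yield
  adaptive_timeout&.record_success(Time.now - started)
  result
rescue Vmpooler::CircuitBreaker::CircuitOpenError
  # The request never reached the backend, so it should not skew the timeout samples
  raise
rescue StandardError
  adaptive_timeout&.record_failure
  raise
end

# e.g. with_resilience { get_vm(pool_name, vm_name) }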

@@ -522,6 +522,54 @@
# for example, the first time is 2 seconds, then 4, 8, 16 etc. until it reaches check_loop_delay_max.
# This value must be greater than 1.0.
#
# - circuit_breaker_enabled (optional; default: true)
# Enable the circuit breaker pattern for provider connections to prevent cascading failures.
# When a provider experiences repeated failures, the circuit breaker "opens" and rejects
# requests immediately (fail fast) rather than waiting for timeouts, allowing the provider
# to recover while protecting the system from thread exhaustion.
#
# - circuit_breaker_failure_threshold (optional; default: 5)
# Number of consecutive failures before the circuit breaker opens.
# Lower values make the circuit breaker more sensitive to failures.
#
# - circuit_breaker_timeout (optional; default: 30)
# How long, in seconds, to keep the circuit breaker open before testing for recovery.
# After this timeout the circuit enters the "half-open" state to check whether the provider has recovered.
#
# - circuit_breaker_half_open_attempts (optional; default: 3)
# Number of successful test requests required in the half-open state before the circuit closes again.
#
# - adaptive_timeout_enabled (optional; default: true)
# Enable adaptive timeouts that adjust connection timeouts based on observed performance.
# The timeout tracks the p95 connection latency plus a 50% buffer, bounded by the min/max values below.
# On failures the timeout is reduced (by 20% per failure, down to the minimum) so subsequent attempts fail faster.
#
# - connection_pool_timeout_min (optional; default: 5)
# Minimum connection timeout, in seconds, for the adaptive timeout mechanism.
#
# - connection_pool_timeout_max (optional; default: 60)
# Maximum connection timeout, in seconds, for the adaptive timeout mechanism.
#
# - connection_pool_timeout_initial (optional; default: 30)
# Initial connection timeout, in seconds, before adaptation begins.
#
# - connection_pool_monitor_enabled (optional; default: true)
# Enable monitoring of connection pool health across all providers.
# Emits metrics for pool utilization, waiting threads, and circuit breaker status.
#
# - connection_pool_monitor_interval (optional; default: 10)
# How often, in seconds, to check connection pool health and emit metrics.
#
# - manage_host_selection (Only affects vSphere Provider)
# Allow host selection to be determined by vmpooler
# Hosts are selected based on current CPU utilization and cycled between when there are multiple targets