From efc31a32800440bc3ea6b44fdbeee4bd0ecbfc3b Mon Sep 17 00:00:00 2001 From: Mahima Singh <105724608+smahima27@users.noreply.github.com> Date: Fri, 26 Dec 2025 17:01:38 +0530 Subject: [PATCH] Add circuit breaker and adaptive timeout for provider resilience --- lib/vmpooler/adaptive_timeout.rb | 130 ++++++++++++++++ lib/vmpooler/circuit_breaker.rb | 193 ++++++++++++++++++++++++ lib/vmpooler/generic_connection_pool.rb | 28 ++++ lib/vmpooler/pool_manager.rb | 78 ++++++++++ lib/vmpooler/providers/base.rb | 89 +++++++++++ vmpooler.yaml.example | 48 ++++++ 6 files changed, 566 insertions(+) create mode 100644 lib/vmpooler/adaptive_timeout.rb create mode 100644 lib/vmpooler/circuit_breaker.rb diff --git a/lib/vmpooler/adaptive_timeout.rb b/lib/vmpooler/adaptive_timeout.rb new file mode 100644 index 0000000..2bb47fb --- /dev/null +++ b/lib/vmpooler/adaptive_timeout.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +module Vmpooler + # Adaptive timeout that adjusts based on observed connection performance + # to optimize between responsiveness and reliability. + # + # Tracks recent connection durations and adjusts timeout to p95 + buffer, + # reducing timeout on failures to fail faster during outages. + class AdaptiveTimeout + attr_reader :current_timeout + + # Initialize adaptive timeout + # + # @param name [String] Name for logging (e.g., "vsphere_connections") + # @param logger [Object] Logger instance + # @param metrics [Object] Metrics instance + # @param min [Integer] Minimum timeout in seconds + # @param max [Integer] Maximum timeout in seconds + # @param initial [Integer] Initial timeout in seconds + # @param max_samples [Integer] Number of recent samples to track + def initialize(name:, logger:, metrics:, min: 5, max: 60, initial: 30, max_samples: 100) + @name = name + @logger = logger + @metrics = metrics + @min_timeout = min + @max_timeout = max + @current_timeout = initial + @recent_durations = [] + @max_samples = max_samples + @mutex = Mutex.new + end + + # Get current timeout value (thread-safe) + # @return [Integer] Current timeout in seconds + def timeout + @mutex.synchronize { @current_timeout } + end + + # Record a successful operation duration + # @param duration [Float] Duration in seconds + def record_success(duration) + @mutex.synchronize do + @recent_durations << duration + @recent_durations.shift if @recent_durations.size > @max_samples + + # Adjust timeout based on recent performance + adjust_timeout if @recent_durations.size >= 10 + end + end + + # Record a failure (timeout or error) + # Reduces current timeout to fail faster on subsequent attempts + def record_failure + @mutex.synchronize do + old_timeout = @current_timeout + # Reduce timeout by 20% on failure, but don't go below minimum + @current_timeout = [(@current_timeout * 0.8).round, @min_timeout].max + + if old_timeout != @current_timeout + @logger.log('d', "[*] [adaptive_timeout] '#{@name}' reduced timeout #{old_timeout}s → #{@current_timeout}s after failure") + @metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout) + end + end + end + + # Reset to initial timeout (useful after recovery) + def reset + @mutex.synchronize do + @recent_durations.clear + old_timeout = @current_timeout + @current_timeout = [@max_timeout, 30].min + + @logger.log('d', "[*] [adaptive_timeout] '#{@name}' reset timeout #{old_timeout}s → #{@current_timeout}s") + @metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout) + end + end + + # Get statistics about recent durations + # @return [Hash] Statistics 
including min, max, avg, p95 + def stats + @mutex.synchronize do + return { samples: 0 } if @recent_durations.empty? + + sorted = @recent_durations.sort + { + samples: sorted.size, + min: sorted.first.round(2), + max: sorted.last.round(2), + avg: (sorted.sum / sorted.size.to_f).round(2), + p50: percentile(sorted, 0.50).round(2), + p95: percentile(sorted, 0.95).round(2), + p99: percentile(sorted, 0.99).round(2), + current_timeout: @current_timeout + } + end + end + + private + + def adjust_timeout + return if @recent_durations.empty? + + sorted = @recent_durations.sort + p95_duration = percentile(sorted, 0.95) + + # Set timeout to p95 + 50% buffer, bounded by min/max + new_timeout = (p95_duration * 1.5).round + new_timeout = [[new_timeout, @min_timeout].max, @max_timeout].min + + # Only adjust if change is significant (> 5 seconds) + if (new_timeout - @current_timeout).abs > 5 + old_timeout = @current_timeout + @current_timeout = new_timeout + + @logger.log('d', "[*] [adaptive_timeout] '#{@name}' adjusted timeout #{old_timeout}s → #{@current_timeout}s (p95: #{p95_duration.round(2)}s)") + @metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout) + @metrics.gauge("adaptive_timeout.p95.#{@name}", p95_duration) + end + end + + def percentile(sorted_array, percentile) + return 0 if sorted_array.empty? + + index = (sorted_array.size * percentile).ceil - 1 + index = [index, 0].max + index = [index, sorted_array.size - 1].min + sorted_array[index] + end + end +end diff --git a/lib/vmpooler/circuit_breaker.rb b/lib/vmpooler/circuit_breaker.rb new file mode 100644 index 0000000..48407c9 --- /dev/null +++ b/lib/vmpooler/circuit_breaker.rb @@ -0,0 +1,193 @@ +# frozen_string_literal: true + +module Vmpooler + # Circuit breaker pattern implementation to prevent cascading failures + # when a provider becomes unresponsive or experiences repeated failures. 
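+  #
+  # Usage sketch (illustrative; connection.find_vm stands in for any
+  # provider call and is not part of this class):
+  #
+  #   breaker = Vmpooler::CircuitBreaker.new(name: 'vsphere', logger: logger, metrics: metrics)
+  #   begin
+  #     breaker.call { connection.find_vm(vm_name) }
+  #   rescue Vmpooler::CircuitBreaker::CircuitOpenError
+  #     # provider treated as down; fail fast instead of waiting on a timeout
+  #   end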
+ # + # States: + # - CLOSED: Normal operation, requests flow through + # - OPEN: Provider is failing, reject requests immediately (fail fast) + # - HALF_OPEN: Testing if provider has recovered with limited requests + class CircuitBreaker + STATES = [:closed, :open, :half_open].freeze + + class CircuitOpenError < StandardError; end + + attr_reader :state, :failure_count, :success_count + + # Initialize a new circuit breaker + # + # @param name [String] Name for logging/metrics (e.g., "vsphere_provider") + # @param logger [Object] Logger instance + # @param metrics [Object] Metrics instance + # @param failure_threshold [Integer] Number of failures before opening circuit + # @param timeout [Integer] Seconds to wait in open state before testing (half-open) + # @param half_open_attempts [Integer] Number of successful test requests needed to close + def initialize(name:, logger:, metrics:, failure_threshold: 5, timeout: 30, half_open_attempts: 3) + @name = name + @logger = logger + @metrics = metrics + @failure_threshold = failure_threshold + @timeout = timeout + @half_open_attempts = half_open_attempts + + @state = :closed + @failure_count = 0 + @success_count = 0 + @last_failure_time = nil + @mutex = Mutex.new + end + + # Execute a block with circuit breaker protection + # + # @yield Block to execute if circuit allows + # @return Result of the block + # @raise CircuitOpenError if circuit is open and timeout hasn't elapsed + def call + check_state + + begin + result = yield + on_success + result + rescue StandardError => e + on_failure(e) + raise + end + end + + # Check if circuit allows requests + # @return [Boolean] true if circuit is closed or half-open + def allow_request? + @mutex.synchronize do + case @state + when :closed + true + when :half_open + true + when :open + if should_attempt_reset? + true + else + false + end + end + end + end + + # Get current circuit breaker status + # @return [Hash] Status information + def status + @mutex.synchronize do + { + name: @name, + state: @state, + failure_count: @failure_count, + success_count: @success_count, + last_failure_time: @last_failure_time, + next_retry_time: next_retry_time + } + end + end + + private + + def check_state + @mutex.synchronize do + case @state + when :open + if should_attempt_reset? + transition_to_half_open + else + time_remaining = (@timeout - (Time.now - @last_failure_time)).round(1) + raise CircuitOpenError, "Circuit breaker '#{@name}' is open (#{@failure_count} failures, retry in #{time_remaining}s)" + end + when :half_open + # Allow limited requests through for testing + when :closed + # Normal operation + end + end + end + + def should_attempt_reset? 
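+        # Only attempt a reset once the open-state timeout has elapsed since the last failure.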
+ return false unless @last_failure_time + + Time.now - @last_failure_time >= @timeout + end + + def next_retry_time + return nil unless @last_failure_time && @state == :open + + @last_failure_time + @timeout + end + + def on_success + @mutex.synchronize do + case @state + when :closed + # Reset failure count on success in closed state + @failure_count = 0 if @failure_count > 0 + when :half_open + @success_count += 1 + @failure_count = 0 + @logger.log('d', "[+] [circuit_breaker] '#{@name}' successful test request (#{@success_count}/#{@half_open_attempts})") + + if @success_count >= @half_open_attempts + transition_to_closed + end + when :open + # Should not happen, but reset if we somehow get a success + transition_to_closed + end + end + end + + def on_failure(error) + @mutex.synchronize do + @failure_count += 1 + @last_failure_time = Time.now + + case @state + when :closed + @logger.log('d', "[!] [circuit_breaker] '#{@name}' failure #{@failure_count}/#{@failure_threshold}: #{error.class}") + if @failure_count >= @failure_threshold + transition_to_open + end + when :half_open + @logger.log('d', "[!] [circuit_breaker] '#{@name}' failed during half-open test") + transition_to_open + when :open + # Already open, just log + @logger.log('d', "[!] [circuit_breaker] '#{@name}' additional failure while open") + end + end + end + + def transition_to_open + @state = :open + @success_count = 0 + @logger.log('s', "[!] [circuit_breaker] '#{@name}' OPENED after #{@failure_count} failures (will retry in #{@timeout}s)") + @metrics.increment("circuit_breaker.opened.#{@name}") + @metrics.gauge("circuit_breaker.state.#{@name}", 1) # 1 = open + end + + def transition_to_half_open + @state = :half_open + @success_count = 0 + @failure_count = 0 + @logger.log('s', "[*] [circuit_breaker] '#{@name}' HALF-OPEN, testing provider health") + @metrics.increment("circuit_breaker.half_open.#{@name}") + @metrics.gauge("circuit_breaker.state.#{@name}", 0.5) # 0.5 = half-open + end + + def transition_to_closed + @state = :closed + @failure_count = 0 + @success_count = 0 + @logger.log('s', "[+] [circuit_breaker] '#{@name}' CLOSED, provider recovered") + @metrics.increment("circuit_breaker.closed.#{@name}") + @metrics.gauge("circuit_breaker.state.#{@name}", 0) # 0 = closed + end + end +end diff --git a/lib/vmpooler/generic_connection_pool.rb b/lib/vmpooler/generic_connection_pool.rb index 5299b23..56a0b1f 100644 --- a/lib/vmpooler/generic_connection_pool.rb +++ b/lib/vmpooler/generic_connection_pool.rb @@ -34,6 +34,34 @@ module Vmpooler end end end + + # Get connection pool health status + # @return [Hash] Health status including utilization and queue depth + def health_status + { + size: @size, + available: @available.length, + in_use: @size - @available.length, + utilization: ((@size - @available.length).to_f / @size * 100).round(2), + waiting_threads: (@queue.respond_to?(:length) ? @queue.length : 0), + state: determine_health_state + } + end + + private + + def determine_health_state + utilization = ((@size - @available.length).to_f / @size * 100) + waiting = @queue.respond_to?(:length) ? 
@queue.length : 0 + + if utilization >= 90 || waiting > 5 + :critical # Pool exhausted or many waiting threads + elsif utilization >= 70 || waiting > 2 + :warning # Pool under stress + else + :healthy # Normal operation + end + end end end end diff --git a/lib/vmpooler/pool_manager.rb b/lib/vmpooler/pool_manager.rb index 9c6def6..fedc697 100644 --- a/lib/vmpooler/pool_manager.rb +++ b/lib/vmpooler/pool_manager.rb @@ -1287,6 +1287,63 @@ module Vmpooler $metrics.gauge('vmpooler_health.status', status_value) end + # Monitor connection pool health across all providers + def monitor_connection_pools + return unless $providers + + $providers.each do |provider_name, provider| + begin + next unless provider.respond_to?(:connection_pool) + + pool = provider.connection_pool + next unless pool.respond_to?(:health_status) + + health = pool.health_status + + # Push metrics + $metrics.gauge("connection_pool.#{provider_name}.available", health[:available]) + $metrics.gauge("connection_pool.#{provider_name}.in_use", health[:in_use]) + $metrics.gauge("connection_pool.#{provider_name}.utilization", health[:utilization]) + $metrics.gauge("connection_pool.#{provider_name}.waiting", health[:waiting_threads]) + + # Log warnings for unhealthy states + if health[:state] == :critical + $logger.log('s', "[!] [connection_pool] '#{provider_name}' CRITICAL: #{health[:utilization]}% used, #{health[:waiting_threads]} waiting") + $metrics.increment("connection_pool.#{provider_name}.critical") + elsif health[:state] == :warning + $logger.log('d', "[*] [connection_pool] '#{provider_name}' WARNING: #{health[:utilization]}% used, #{health[:waiting_threads]} waiting") + $metrics.increment("connection_pool.#{provider_name}.warning") + end + + # Check circuit breaker status + if provider.respond_to?(:circuit_breaker) && provider.circuit_breaker + cb_status = provider.circuit_breaker.status + state_value = { closed: 0, half_open: 0.5, open: 1 }[cb_status[:state]] || 1 + $metrics.gauge("circuit_breaker.state.#{provider_name}", state_value) + $metrics.gauge("circuit_breaker.failures.#{provider_name}", cb_status[:failure_count]) + end + + # Log adaptive timeout stats + if provider.respond_to?(:adaptive_timeout) && provider.adaptive_timeout + timeout_stats = provider.adaptive_timeout.stats + if timeout_stats[:samples] > 0 + $metrics.gauge("adaptive_timeout.current.#{provider_name}", timeout_stats[:current_timeout]) + $metrics.gauge("adaptive_timeout.p95.#{provider_name}", timeout_stats[:p95]) + end + end + rescue StandardError => e + $logger.log('d', "[!] [connection_pool_monitor] Failed to monitor '#{provider_name}': #{e}") + end + end + end + + def connection_pool_monitor_enabled? + global_config = $config[:config] || {} + enabled = global_config['connection_pool_monitor_enabled'] + enabled = true if enabled.nil? # Default to enabled + enabled + end + def create_vm_disk(pool_name, vm, disk_size, provider) Thread.new do begin @@ -2525,6 +2582,27 @@ module Vmpooler end end + # Connection pool monitoring thread + if connection_pool_monitor_enabled? + monitor_interval = ($config[:config] && $config[:config]['connection_pool_monitor_interval']) || 10 # default 10 seconds + if !$threads['connection_pool_monitor'] + $threads['connection_pool_monitor'] = Thread.new do + loop do + monitor_connection_pools + sleep(monitor_interval) + end + end + elsif !$threads['connection_pool_monitor'].alive? + $logger.log('d', '[!] 
[connection_pool_monitor] worker thread died, restarting') + $threads['connection_pool_monitor'] = Thread.new do + loop do + monitor_connection_pools + sleep(monitor_interval) + end + end + end + end + sleep(loop_delay) unless maxloop == 0 diff --git a/lib/vmpooler/providers/base.rb b/lib/vmpooler/providers/base.rb index e5d458e..10b89a3 100644 --- a/lib/vmpooler/providers/base.rb +++ b/lib/vmpooler/providers/base.rb @@ -13,6 +13,10 @@ module Vmpooler attr_reader :metrics # Provider options passed in during initialization attr_reader :provider_options + # Circuit breaker for provider resilience + attr_reader :circuit_breaker + # Adaptive timeout for connections + attr_reader :adaptive_timeout def initialize(config, logger, metrics, redis_connection_pool, name, options) @config = config @@ -30,6 +34,11 @@ module Vmpooler @provider_options = options logger.log('s', "[!] Creating provider '#{name}'") + + # Initialize circuit breaker if enabled + initialize_circuit_breaker if circuit_breaker_enabled? + # Initialize adaptive timeout if enabled + initialize_adaptive_timeout if adaptive_timeout_enabled? end # Helper Methods @@ -271,6 +280,86 @@ module Vmpooler logger.log('s', '[!] purge_unconfigured_folders was renamed to purge_unconfigured_resources, please update your provider implementation') purge_unconfigured_resources(allowlist) end + + private + + # Circuit breaker configuration and initialization + + def circuit_breaker_enabled? + global_config = @config[:config] || {} + provider_cfg = provider_config || {} + + # Check provider-specific setting first, then global + enabled = provider_cfg['circuit_breaker_enabled'] + enabled = global_config['circuit_breaker_enabled'] if enabled.nil? + enabled = true if enabled.nil? # Default to enabled + + enabled + end + + def initialize_circuit_breaker + require 'vmpooler/circuit_breaker' + + global_config = @config[:config] || {} + provider_cfg = provider_config || {} + + # Get circuit breaker settings (provider-specific overrides global) + failure_threshold = provider_cfg['circuit_breaker_failure_threshold'] || + global_config['circuit_breaker_failure_threshold'] || 5 + timeout = provider_cfg['circuit_breaker_timeout'] || + global_config['circuit_breaker_timeout'] || 30 + half_open_attempts = provider_cfg['circuit_breaker_half_open_attempts'] || + global_config['circuit_breaker_half_open_attempts'] || 3 + + @circuit_breaker = Vmpooler::CircuitBreaker.new( + name: @provider_name, + logger: @logger, + metrics: @metrics, + failure_threshold: failure_threshold.to_i, + timeout: timeout.to_i, + half_open_attempts: half_open_attempts.to_i + ) + + @logger.log('d', "[*] [#{@provider_name}] Circuit breaker initialized (threshold: #{failure_threshold}, timeout: #{timeout}s)") + end + + def adaptive_timeout_enabled? + global_config = @config[:config] || {} + provider_cfg = provider_config || {} + + # Check provider-specific setting first, then global + enabled = provider_cfg['adaptive_timeout_enabled'] + enabled = global_config['adaptive_timeout_enabled'] if enabled.nil? + enabled = true if enabled.nil? 
# Default to enabled + + enabled + end + + def initialize_adaptive_timeout + require 'vmpooler/adaptive_timeout' + + global_config = @config[:config] || {} + provider_cfg = provider_config || {} + + # Get adaptive timeout settings (provider-specific overrides global) + min = provider_cfg['connection_pool_timeout_min'] || + global_config['connection_pool_timeout_min'] || 5 + max = provider_cfg['connection_pool_timeout_max'] || + global_config['connection_pool_timeout_max'] || 60 + initial = provider_cfg['connection_pool_timeout_initial'] || + global_config['connection_pool_timeout_initial'] || 30 + + @adaptive_timeout = Vmpooler::AdaptiveTimeout.new( + name: "#{@provider_name}_connections", + logger: @logger, + metrics: @metrics, + min: min.to_i, + max: max.to_i, + initial: initial.to_i + ) + + @logger.log('d', "[*] [#{@provider_name}] Adaptive timeout initialized (min: #{min}s, max: #{max}s, initial: #{initial}s)") + end end end end diff --git a/vmpooler.yaml.example b/vmpooler.yaml.example index f05ded2..fd3fd22 100644 --- a/vmpooler.yaml.example +++ b/vmpooler.yaml.example @@ -522,6 +522,54 @@ # for example, the first time is 2 seconds, then 4, 8, 16 etc. until it reaches check_loop_delay_max. # This value must be greater than 1.0. # +# - circuit_breaker_enabled (optional; default: true) +# Enable circuit breaker pattern for provider connections to prevent cascading failures. +# When a provider experiences repeated failures, the circuit breaker will "open" and reject +# requests immediately (fail-fast) rather than waiting for timeouts, allowing the provider +# to recover while protecting the system from thread exhaustion. +# (optional; default: true) +# +# - circuit_breaker_failure_threshold (optional; default: 5) +# Number of consecutive failures before opening the circuit breaker. +# Lower values make the circuit breaker more sensitive to failures. +# (optional; default: 5) +# +# - circuit_breaker_timeout (optional; default: 30) seconds +# How long to keep the circuit breaker open before attempting to test recovery. +# After this timeout, the circuit enters "half-open" state to test if the provider has recovered. +# (optional; default: 30) +# +# - circuit_breaker_half_open_attempts (optional; default: 3) +# Number of successful test requests required in half-open state before closing the circuit. +# (optional; default: 3) +# +# - adaptive_timeout_enabled (optional; default: true) +# Enable adaptive timeout that adjusts connection timeouts based on observed performance. +# The timeout will adapt to p95 latency + 50% buffer, bounded by min/max values. +# On failures, timeout is reduced to fail faster on subsequent attempts. +# (optional; default: true) +# +# - connection_pool_timeout_min (optional; default: 5) seconds +# Minimum connection timeout for adaptive timeout mechanism. +# (optional; default: 5) +# +# - connection_pool_timeout_max (optional; default: 60) seconds +# Maximum connection timeout for adaptive timeout mechanism. +# (optional; default: 60) +# +# - connection_pool_timeout_initial (optional; default: 30) seconds +# Initial connection timeout before adaptation begins. +# (optional; default: 30) +# +# - connection_pool_monitor_enabled (optional; default: true) +# Enable monitoring of connection pool health across all providers. +# Emits metrics for pool utilization, waiting threads, and circuit breaker status. +# (optional; default: true) +# +# - connection_pool_monitor_interval (optional; default: 10) seconds +# How often to check connection pool health and emit metrics. 
+# (optional; default: 10) +# # - manage_host_selection (Only affects vSphere Provider) # Allow host selection to be determined by vmpooler # Hosts are selected based on current CPU utilization and cycled between when there are multiple targets
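
Below is an illustrative sketch of how a concrete provider could combine the two mechanisms around a connection-bound lookup. It is not part of this patch: get_vm_object, ensure_connected, and find_vm_by_name are hypothetical helper names; circuit_breaker and adaptive_timeout are the accessors added to providers/base.rb; connection_pool is assumed to be the provider's GenericConnectionPool exposing its with_metrics checkout helper.

  require 'timeout'

  def get_vm_object(pool_name, vm_name)
    # Raises Vmpooler::CircuitBreaker::CircuitOpenError immediately while the
    # breaker is open; otherwise the block's outcome is recorded on the breaker.
    circuit_breaker.call do
      started = Time.now
      begin
        result = Timeout.timeout(adaptive_timeout.timeout) do
          connection_pool.with_metrics do |pool_object|
            connection = ensure_connected(pool_object) # hypothetical helper
            find_vm_by_name(connection, vm_name)       # hypothetical helper
          end
        end
        adaptive_timeout.record_success(Time.now - started)
        result
      rescue Timeout::Error
        adaptive_timeout.record_failure
        raise
      end
    end
  end

Because the Timeout::Error is re-raised inside the block, a slow provider counts against both mechanisms: the adaptive timeout shrinks toward connection_pool_timeout_min, and the repeated failures eventually open the circuit breaker.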