mirror of
https://github.com/puppetlabs/vmpooler.git
synced 2026-01-27 02:18:41 -05:00
[QENG-4075] Unify graphite and statsd for the pool manager
Prior to this, the `pool_manager.rb` library could take handles for both graphite and statsd endpoints (which were considered mutually exclusive), and then would use one. There was a bevy of conditional logic around sending metrics to the graphite/statsd handles (and actually at least one bug of omission). Here we refactor more, building on earlier work: - Our graphite class comes into line with the API of our Statsd and DummyStatsd classes - In `pool_manager.rb` we now accept a single "metrics" handle, and we drop all the conditional logic around statsd vs. graphite - We move the inconsistent error handling out of the calling classes and into our metrics classes, actually logging to `$stderr` when we can't publish metrics - We unify the setup code to use `config` to determine whether statsd, graphite, or a dummy metrics handle should be used, and make that happen. - Cleaned up some tests. We could probably stand to do a bit more work in this area.
This commit is contained in:
parent
f26e0f9bd7
commit
218f098800
6 changed files with 70 additions and 69 deletions
|
|
@ -54,10 +54,6 @@ module Vmpooler
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
if parsed_config[:graphite]['server']
|
|
||||||
parsed_config[:graphite]['prefix'] ||= 'vmpooler'
|
|
||||||
end
|
|
||||||
|
|
||||||
if parsed_config[:tagfilter]
|
if parsed_config[:tagfilter]
|
||||||
parsed_config[:tagfilter].keys.each do |tag|
|
parsed_config[:tagfilter].keys.each do |tag|
|
||||||
parsed_config[:tagfilter][tag] = Regexp.new(parsed_config[:tagfilter][tag])
|
parsed_config[:tagfilter][tag] = Regexp.new(parsed_config[:tagfilter][tag])
|
||||||
|
|
@ -65,7 +61,6 @@ module Vmpooler
|
||||||
end
|
end
|
||||||
|
|
||||||
parsed_config[:uptime] = Time.now
|
parsed_config[:uptime] = Time.now
|
||||||
|
|
||||||
parsed_config
|
parsed_config
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -77,19 +72,16 @@ module Vmpooler
|
||||||
Vmpooler::Logger.new logfile
|
Vmpooler::Logger.new logfile
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.new_graphite(server)
|
def self.new_metrics(params)
|
||||||
if server.nil? or server.empty? or server.length == 0
|
if config[:statsd]
|
||||||
nil
|
Vmpooler::Statsd.new(config[:statsd])
|
||||||
|
elsif config[:graphite]
|
||||||
|
Vmpooler::Graphite.new(config[:graphite])
|
||||||
else
|
else
|
||||||
Vmpooler::Graphite.new server
|
Vmpooler::DummyStatsd.new
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.new_statsd(params)
|
|
||||||
return DummyStatsd.new unless params
|
|
||||||
Statsd.new params
|
|
||||||
end
|
|
||||||
|
|
||||||
def self.pools(conf)
|
def self.pools(conf)
|
||||||
conf[:pools]
|
conf[:pools]
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -2,18 +2,41 @@ require 'rubygems' unless defined?(Gem)
|
||||||
|
|
||||||
module Vmpooler
|
module Vmpooler
|
||||||
class Graphite
|
class Graphite
|
||||||
def initialize(
|
attr_reader :server, :port, :prefix
|
||||||
s = 'graphite'
|
|
||||||
)
|
def initialize(params = {})
|
||||||
@server = s
|
if params[:server].nil? || params[:server].empty?
|
||||||
|
raise ArgumentError, "Graphite server is required. Config: #{params.inspect}"
|
||||||
|
end
|
||||||
|
|
||||||
|
@server = params[:server]
|
||||||
|
@port = params[:port] || 2003
|
||||||
|
@prefix = params[:prefix] || "vmpooler"
|
||||||
|
end
|
||||||
|
|
||||||
|
def increment(label)
|
||||||
|
log label, 1
|
||||||
|
end
|
||||||
|
|
||||||
|
def gauge(label, value)
|
||||||
|
log label, value
|
||||||
|
end
|
||||||
|
|
||||||
|
def timing(label, duration)
|
||||||
|
log label, duration
|
||||||
end
|
end
|
||||||
|
|
||||||
def log(path, value)
|
def log(path, value)
|
||||||
Thread.new do
|
Thread.new do
|
||||||
socket = TCPSocket.new(@server, 2003)
|
socket = TCPSocket.new(server, port)
|
||||||
socket.puts "#{path} #{value} #{Time.now.to_i}"
|
begin
|
||||||
|
socket.puts "#{prefix}.#{path} #{value} #{Time.now.to_i}"
|
||||||
|
ensure
|
||||||
socket.close
|
socket.close
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
rescue => err
|
||||||
|
$stderr.puts "Failure logging #{path} to graphite server [#{server}:#{port}]: #{err}"
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,13 @@
|
||||||
module Vmpooler
|
module Vmpooler
|
||||||
class PoolManager
|
class PoolManager
|
||||||
def initialize(config, logger, redis, graphite = nil, statsd = nil)
|
def initialize(config, logger, redis, metrics = nil)
|
||||||
$config = config
|
$config = config
|
||||||
|
|
||||||
# Load logger library
|
# Load logger library
|
||||||
$logger = logger
|
$logger = logger
|
||||||
|
|
||||||
# statsd and graphite are mutex in the context of vmpooler
|
# metrics logging handle
|
||||||
if statsd
|
$metrics = metrics
|
||||||
$statsd = statsd
|
|
||||||
elsif graphite
|
|
||||||
$graphite = graphite
|
|
||||||
end
|
|
||||||
|
|
||||||
# Connect to Redis
|
# Connect to Redis
|
||||||
$redis = redis
|
$redis = redis
|
||||||
|
|
@ -66,7 +62,7 @@ module Vmpooler
|
||||||
(host.summary.guest.hostName == vm)
|
(host.summary.guest.hostName == vm)
|
||||||
|
|
||||||
begin
|
begin
|
||||||
Socket.getaddrinfo(vm, nil)
|
Socket.getaddrinfo(vm, nil) # WTF?
|
||||||
rescue
|
rescue
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -260,11 +256,7 @@ module Vmpooler
|
||||||
|
|
||||||
$redis.decr('vmpooler__tasks__clone')
|
$redis.decr('vmpooler__tasks__clone')
|
||||||
|
|
||||||
begin
|
$metrics.timing("clone.#{vm['template']}", finish)
|
||||||
$statsd.timing($config[:statsd]['prefix'] + ".clone.#{vm['template']}", finish) if $statsd
|
|
||||||
$graphite.log($config[:graphite]['prefix'] + ".clone.#{vm['template']}", finish) if $graphite
|
|
||||||
rescue
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -297,8 +289,7 @@ module Vmpooler
|
||||||
finish = '%.2f' % (Time.now - start)
|
finish = '%.2f' % (Time.now - start)
|
||||||
|
|
||||||
$logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
|
$logger.log('s', "[-] [#{pool}] '#{vm}' destroyed in #{finish} seconds")
|
||||||
|
$metrics.log("destroy.#{pool}", finish)
|
||||||
$graphite.log($config[:graphite]['prefix'] + ".destroy.#{pool}", finish) if $graphite
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
@ -568,16 +559,8 @@ module Vmpooler
|
||||||
ready = $redis.scard('vmpooler__ready__' + pool['name'])
|
ready = $redis.scard('vmpooler__ready__' + pool['name'])
|
||||||
total = $redis.scard('vmpooler__pending__' + pool['name']) + ready
|
total = $redis.scard('vmpooler__pending__' + pool['name']) + ready
|
||||||
|
|
||||||
begin
|
$metrics.gauge('ready.' + pool['name'], $redis.scard('vmpooler__ready__' + pool['name']))
|
||||||
if $statsd
|
$metrics.gauge('running.' + pool['name'], $redis.scard('vmpooler__running__' + pool['name']))
|
||||||
$statsd.gauge($config[:statsd]['prefix'] + '.ready.' + pool['name'], $redis.scard('vmpooler__ready__' + pool['name']))
|
|
||||||
$statsd.gauge($config[:statsd]['prefix'] + '.running.' + pool['name'], $redis.scard('vmpooler__running__' + pool['name']))
|
|
||||||
elsif $graphite
|
|
||||||
$graphite.log($config[:graphite]['prefix'] + '.ready.' + pool['name'], $redis.scard('vmpooler__ready__' + pool['name']))
|
|
||||||
$graphite.log($config[:graphite]['prefix'] + '.running.' + pool['name'], $redis.scard('vmpooler__running__' + pool['name']))
|
|
||||||
end
|
|
||||||
rescue
|
|
||||||
end
|
|
||||||
|
|
||||||
if $redis.get('vmpooler__empty__' + pool['name'])
|
if $redis.get('vmpooler__empty__' + pool['name'])
|
||||||
unless ready == 0
|
unless ready == 0
|
||||||
|
|
|
||||||
|
|
@ -10,21 +10,27 @@ module Vmpooler
|
||||||
end
|
end
|
||||||
|
|
||||||
host = params[:server]
|
host = params[:server]
|
||||||
port = params[:port] || 8125
|
@port = params[:port] || 8125
|
||||||
@prefix = params[:prefix] || 'vmpooler'
|
@prefix = params[:prefix] || 'vmpooler'
|
||||||
@server = Statsd.new(host, port)
|
@server = Statsd.new(host, @port)
|
||||||
end
|
end
|
||||||
|
|
||||||
def increment(label)
|
def increment(label)
|
||||||
server.increment(prefix + "." + label)
|
server.increment(prefix + "." + label)
|
||||||
|
rescue => err
|
||||||
|
$stderr.puts "Failure incrementing #{prefix}.#{label} on statsd server [#{server}:#{port}]: #{err}"
|
||||||
end
|
end
|
||||||
|
|
||||||
def gauge(label, value)
|
def gauge(label, value)
|
||||||
server.gauge(prefix + "." + label, value)
|
server.gauge(prefix + "." + label, value)
|
||||||
|
rescue => err
|
||||||
|
$stderr.puts "Failure updating gauge #{prefix}.#{label} on statsd server [#{server}:#{port}]: #{err}"
|
||||||
end
|
end
|
||||||
|
|
||||||
def timing(label, duration)
|
def timing(label, duration)
|
||||||
server.timing(prefix + "." + label, duration)
|
server.timing(prefix + "." + label, duration)
|
||||||
|
rescue => err
|
||||||
|
$stderr.puts "Failure updating timing #{prefix}.#{label} on statsd server [#{server}:#{port}]: #{err}"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,14 @@ require 'time'
|
||||||
describe 'Pool Manager' do
|
describe 'Pool Manager' do
|
||||||
let(:logger) { double('logger') }
|
let(:logger) { double('logger') }
|
||||||
let(:redis) { double('redis') }
|
let(:redis) { double('redis') }
|
||||||
|
let(:metrics) { Vmpooler::DummyStatsd.new }
|
||||||
let(:config) { {} }
|
let(:config) { {} }
|
||||||
let(:pool) { 'pool1' }
|
let(:pool) { 'pool1' }
|
||||||
let(:vm) { 'vm1' }
|
let(:vm) { 'vm1' }
|
||||||
let(:timeout) { 5 }
|
let(:timeout) { 5 }
|
||||||
let(:host) { double('host') }
|
let(:host) { double('host') }
|
||||||
|
|
||||||
subject { Vmpooler::PoolManager.new(config, logger, redis) }
|
subject { Vmpooler::PoolManager.new(config, logger, redis, metrics) }
|
||||||
|
|
||||||
describe '#_check_pending_vm' do
|
describe '#_check_pending_vm' do
|
||||||
let(:pool_helper) { double('pool') }
|
let(:pool_helper) { double('pool') }
|
||||||
|
|
@ -190,9 +191,7 @@ describe 'Pool Manager' do
|
||||||
|
|
||||||
subject._check_running_vm(vm, pool, timeout)
|
subject._check_running_vm(vm, pool, timeout)
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
describe '#move_running_to_completed' do
|
describe '#move_running_to_completed' do
|
||||||
|
|
@ -239,22 +238,21 @@ describe 'Pool Manager' do
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'logging' do
|
context 'logging' do
|
||||||
|
|
||||||
it 'logs empty pool' do
|
it 'logs empty pool' do
|
||||||
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
||||||
allow(redis).to receive(:scard).with('vmpooler__ready__pool1').and_return(0)
|
allow(redis).to receive(:scard).with('vmpooler__ready__pool1').and_return(0)
|
||||||
|
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(0)
|
||||||
|
|
||||||
expect(logger).to receive(:log).with('s', "[!] [pool1] is empty")
|
expect(logger).to receive(:log).with('s', "[!] [pool1] is empty")
|
||||||
subject._check_pool(config[:pools][0])
|
subject._check_pool(config[:pools][0])
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe '#_stats_running_ready' do
|
describe '#_stats_running_ready' do
|
||||||
let(:pool_helper) { double('pool') }
|
let(:pool_helper) { double('pool') }
|
||||||
let(:vsphere) { {pool => pool_helper} }
|
let(:vsphere) { {pool => pool_helper} }
|
||||||
let(:graphite) { double('graphite') }
|
let(:graphite) { Vmpooler::DummyStatsd.new }
|
||||||
let(:config) { {
|
let(:config) { {
|
||||||
config: { task_limit: 10 },
|
config: { task_limit: 10 },
|
||||||
pools: [ {'name' => 'pool1', 'size' => 5} ],
|
pools: [ {'name' => 'pool1', 'size' => 5} ],
|
||||||
|
|
@ -273,7 +271,6 @@ describe 'Pool Manager' do
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'graphite' do
|
context 'graphite' do
|
||||||
let(:graphite) { double('graphite') }
|
|
||||||
subject { Vmpooler::PoolManager.new(config, logger, redis, graphite) }
|
subject { Vmpooler::PoolManager.new(config, logger, redis, graphite) }
|
||||||
|
|
||||||
it 'increments graphite when enabled and statsd disabled' do
|
it 'increments graphite when enabled and statsd disabled' do
|
||||||
|
|
@ -282,8 +279,8 @@ describe 'Pool Manager' do
|
||||||
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
||||||
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(5)
|
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(5)
|
||||||
|
|
||||||
expect(graphite).to receive(:log).with('vmpooler.ready.pool1', 1)
|
expect(graphite).to receive(:gauge).with('ready.pool1', 1)
|
||||||
expect(graphite).to receive(:log).with('vmpooler.running.pool1', 5)
|
expect(graphite).to receive(:gauge).with('running.pool1', 5)
|
||||||
subject._check_pool(config[:pools][0])
|
subject._check_pool(config[:pools][0])
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -293,20 +290,20 @@ describe 'Pool Manager' do
|
||||||
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
||||||
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(5)
|
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(5)
|
||||||
|
|
||||||
expect(graphite).to receive(:log).with('vmpooler.ready.pool1', 0)
|
expect(graphite).to receive(:gauge).with('ready.pool1', 0)
|
||||||
expect(graphite).to receive(:log).with('vmpooler.running.pool1', 5)
|
expect(graphite).to receive(:gauge).with('running.pool1', 5)
|
||||||
subject._check_pool(config[:pools][0])
|
subject._check_pool(config[:pools][0])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'statsd' do
|
context 'statsd' do
|
||||||
let(:statsd) { double('statsd') }
|
let(:statsd) { Vmpooler::DummyStatsd.new }
|
||||||
let(:config) { {
|
let(:config) { {
|
||||||
config: { task_limit: 10 },
|
config: { task_limit: 10 },
|
||||||
pools: [ {'name' => 'pool1', 'size' => 5} ],
|
pools: [ {'name' => 'pool1', 'size' => 5} ],
|
||||||
statsd: { 'prefix' => 'vmpooler' }
|
statsd: { 'prefix' => 'vmpooler' }
|
||||||
} }
|
} }
|
||||||
subject { Vmpooler::PoolManager.new(config, logger, redis, graphite, statsd) }
|
subject { Vmpooler::PoolManager.new(config, logger, redis, statsd) }
|
||||||
|
|
||||||
it 'increments statsd when configured' do
|
it 'increments statsd when configured' do
|
||||||
allow(redis).to receive(:scard).with('vmpooler__ready__pool1').and_return(1)
|
allow(redis).to receive(:scard).with('vmpooler__ready__pool1').and_return(1)
|
||||||
|
|
@ -314,8 +311,8 @@ describe 'Pool Manager' do
|
||||||
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
||||||
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(5)
|
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(5)
|
||||||
|
|
||||||
expect(statsd).to receive(:gauge).with('vmpooler.ready.pool1', 1)
|
expect(statsd).to receive(:gauge).with('ready.pool1', 1)
|
||||||
expect(statsd).to receive(:gauge).with('vmpooler.running.pool1', 5)
|
expect(statsd).to receive(:gauge).with('running.pool1', 5)
|
||||||
subject._check_pool(config[:pools][0])
|
subject._check_pool(config[:pools][0])
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -323,9 +320,9 @@ describe 'Pool Manager' do
|
||||||
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(1)
|
allow(redis).to receive(:scard).with('vmpooler__running__pool1').and_return(1)
|
||||||
allow(redis).to receive(:scard).with('vmpooler__ready__pool1').and_return(0)
|
allow(redis).to receive(:scard).with('vmpooler__ready__pool1').and_return(0)
|
||||||
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
allow(redis).to receive(:scard).with('vmpooler__pending__pool1').and_return(0)
|
||||||
allow(statsd).to receive(:gauge).with('vmpooler.running.pool1', 1)
|
allow(statsd).to receive(:gauge).with('running.pool1', 1)
|
||||||
|
|
||||||
expect(statsd).to receive(:gauge).with('vmpooler.ready.pool1', 0)
|
expect(statsd).to receive(:gauge).with('ready.pool1', 0)
|
||||||
subject._check_pool(config[:pools][0])
|
subject._check_pool(config[:pools][0])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
8
vmpooler
8
vmpooler
|
|
@ -8,11 +8,12 @@ require 'lib/vmpooler'
|
||||||
config = Vmpooler.config
|
config = Vmpooler.config
|
||||||
redis_host = config[:redis]['server']
|
redis_host = config[:redis]['server']
|
||||||
logger_file = config[:config]['logfile']
|
logger_file = config[:config]['logfile']
|
||||||
graphite = config[:graphite]['server'] ? config[:graphite]['server'] : nil
|
|
||||||
|
metrics = Vmpooler.new_metrics(config)
|
||||||
|
|
||||||
api = Thread.new {
|
api = Thread.new {
|
||||||
thr = Vmpooler::API.new
|
thr = Vmpooler::API.new
|
||||||
thr.helpers.configure(config, Vmpooler.new_redis(redis_host), Vmpooler.new_statsd(config[:statsd]))
|
thr.helpers.configure(config, Vmpooler.new_redis(redis_host), metrics)
|
||||||
thr.helpers.execute!
|
thr.helpers.execute!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -21,8 +22,7 @@ manager = Thread.new {
|
||||||
config,
|
config,
|
||||||
Vmpooler.new_logger(logger_file),
|
Vmpooler.new_logger(logger_file),
|
||||||
Vmpooler.new_redis(redis_host),
|
Vmpooler.new_redis(redis_host),
|
||||||
Vmpooler.new_graphite(graphite),
|
metrics)
|
||||||
Vmpooler.new_statsd(config[:statsd])
|
|
||||||
).execute!
|
).execute!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue