create worker thread for client
ensure tests user worker thread
This commit is contained in:
parent
d215389cb5
commit
baec6a395d
|
@ -2,13 +2,14 @@
|
|||
|
||||
require 'json'
|
||||
require 'socket'
|
||||
require 'thread'
|
||||
|
||||
class PrometheusExporter::Client
|
||||
|
||||
MAX_SOCKET_AGE = 25
|
||||
MAX_QUEUE_SIZE = 10_000
|
||||
|
||||
def initialize(host:, port:, max_queue_size: nil, manual_mode: false)
|
||||
def initialize(host:, port:, max_queue_size: nil, thread_sleep: 0.5)
|
||||
@queue = Queue.new
|
||||
@socket = nil
|
||||
@socket_started = nil
|
||||
|
@ -23,7 +24,9 @@ class PrometheusExporter::Client
|
|||
@max_queue_size = max_queue_size
|
||||
@host = host
|
||||
@port = port
|
||||
@manual_mode = manual_mode
|
||||
@worker_thread = nil
|
||||
@mutex = Mutex.new
|
||||
@thread_sleep = thread_sleep
|
||||
|
||||
end
|
||||
|
||||
|
@ -33,6 +36,8 @@ class PrometheusExporter::Client
|
|||
STDERR.puts "Prometheus Exporter client is dropping message cause queue is full"
|
||||
@queue.pop
|
||||
end
|
||||
|
||||
ensure_worker_thread!
|
||||
end
|
||||
|
||||
def process_queue
|
||||
|
@ -46,7 +51,7 @@ class PrometheusExporter::Client
|
|||
@socket.write(message)
|
||||
@socket.write("\r\n")
|
||||
rescue
|
||||
@queue.unshift message
|
||||
STDERR.puts "Prometheus Exporter is dropping a message cause queue is full"
|
||||
@socket = nil
|
||||
raise
|
||||
end
|
||||
|
@ -54,11 +59,41 @@ class PrometheusExporter::Client
|
|||
end
|
||||
|
||||
def stop
|
||||
@mutex.synchronize do
|
||||
@worker_thread&.kill
|
||||
while @worker_thread.alive?
|
||||
sleep 0.001
|
||||
end
|
||||
@worker_thread = nil
|
||||
end
|
||||
|
||||
close_socket!
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def worker_loop
|
||||
close_socket_if_old!
|
||||
process_queue
|
||||
sleep @thread_sleep
|
||||
rescue => e
|
||||
STDERR.puts "Prometheus Exporter, failed to send message #{e}"
|
||||
end
|
||||
|
||||
def ensure_worker_thread!
|
||||
unless @worker_thread&.alive?
|
||||
@mutex.synchronize do
|
||||
return if @worker_thread&.alive?
|
||||
|
||||
@worker_thread = Thread.new do
|
||||
while true
|
||||
worker_loop
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def close_socket!
|
||||
if @socket
|
||||
@socket.write("0\r\n")
|
||||
|
@ -70,10 +105,15 @@ class PrometheusExporter::Client
|
|||
end
|
||||
end
|
||||
|
||||
def ensure_socket!
|
||||
def close_socket_if_old!
|
||||
if @socket && ((@socket_started + MAX_SOCKET_AGE) > Time.now.to_f)
|
||||
close_socket!
|
||||
end
|
||||
end
|
||||
|
||||
def ensure_socket!
|
||||
close_socket_if_old!
|
||||
|
||||
@socket = TCPSocket.new @host, @port
|
||||
|
||||
@socket.write("POST /send-metrics HTTP/1.1\r\n")
|
||||
|
|
|
@ -45,13 +45,11 @@ class PrometheusExporterTest < Minitest::Test
|
|||
server = PrometheusExporter::Server::WebServer.new port: port, collector: collector
|
||||
server.start
|
||||
|
||||
client = PrometheusExporter::Client.new host: "localhost", port: port, manual_mode: true
|
||||
client = PrometheusExporter::Client.new host: "localhost", port: port, thread_sleep: 0.001
|
||||
client.send "type" => "mem metric", "value" => 150
|
||||
client.send "type" => "mem metric", "value" => 199
|
||||
|
||||
client.process_queue
|
||||
|
||||
TestHelper.wait_for(1) do
|
||||
TestHelper.wait_for(2) do
|
||||
collector.prometheus_metrics_text =~ /199/
|
||||
end
|
||||
|
||||
|
@ -60,8 +58,19 @@ class PrometheusExporterTest < Minitest::Test
|
|||
body = Net::HTTP.get(URI("http://localhost:#{port}/metrics"))
|
||||
assert_match(/199/, body)
|
||||
|
||||
one_minute = Time.now + 60
|
||||
Time.stub(:now, one_minute) do
|
||||
client.send "type" => "mem metric", "value" => 200.1
|
||||
|
||||
TestHelper.wait_for(2) do
|
||||
collector.prometheus_metrics_text =~ /200.1/
|
||||
end
|
||||
|
||||
assert_match(/200.1/, collector.prometheus_metrics_text)
|
||||
end
|
||||
|
||||
ensure
|
||||
client.stop
|
||||
server.stop
|
||||
client.stop rescue nil
|
||||
server.stop rescue nil
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue