|
require 'cgi'
require 'json'
require 'rufus-scheduler'
require 'pp'
require 'twitter'
class AgentRunner
@@agents = []
def initialize(options = {})
@workers = {}
@signal_queue = []
@options = options
@options[:only] = [@options[:only]].flatten if @options[:only]
@options[:except] = [@options[:except]].flatten if @options[:except]
@mutex = Mutex.new
@scheduler = Rufus::Scheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)
@scheduler.every 5 do
restart_dead_workers if @running
end
@scheduler.every 60 do
run_workers if @running
end
set_traps
end
def stop
puts "Stopping AgentRunner..."
@running = false
@workers.each_pair do |_, w| w.stop! end
@scheduler.stop
end
def run
@running = true
run_workers
while @running
if signal = @signal_queue.shift
handle_signal(signal)
end
sleep 0.25
end
@scheduler.join
end
def set_traps
%w(INT TERM QUIT).each do |signal|
Signal.trap(signal) { @signal_queue << signal }
end
end
def self.register(agent)
@@agents << agent unless @@agents.include?(agent)
end
def self.with_connection
ActiveRecord::Base.connection_pool.with_connection do
yield
end
end
private
def run_workers
workers = load_workers
new_worker_ids = workers.keys
current_worker_ids = @workers.keys
(current_worker_ids - new_worker_ids).each do |outdated_worker_id|
puts "Killing #{outdated_worker_id}"
@workers[outdated_worker_id].stop!
@workers.delete(outdated_worker_id)
end
(new_worker_ids - current_worker_ids).each do |new_worker_id|
puts "Starting #{new_worker_id}"
@workers[new_worker_id] = workers[new_worker_id]
@workers[new_worker_id].setup!(@scheduler, @mutex)
@workers[new_worker_id].run!
end
end
def load_workers
workers = {}
@@agents.each do |klass|
next if @options[:only] && !@options[:only].include?(klass)
next if @options[:except] && @options[:except].include?(klass)
AgentRunner.with_connection do
(klass.setup_worker || [])
end.each do |agent_worker|
workers[agent_worker.id] = agent_worker
end
end
workers
end
def restart_dead_workers
@workers.each_pair do |id, worker|
if worker.thread && !worker.thread.alive?
puts "Restarting #{id.to_s}"
@workers[id].run!
end
end
end
def handle_signal(signal)
case signal
when 'INT', 'TERM', 'QUIT'
stop
end
end
end
require 'agents/twitter_stream_agent'
require 'agents/jabber_agent'
require 'huginn_scheduler'
require 'delayed_job_worker'
|