long_runnable.rb 3.2KB

    =begin Usage Example: class Agents::ExampleAgent < Agent include LongRunnable # Optional # Override this method if you need to group multiple agents based on an API key, # or server they connect to. # Have a look at the TwitterStreamAgent for an example. def self.setup_worker; end class Worker < LongRunnable::Worker # Optional # Called after initialization of the Worker class, use this method as an initializer. def setup; end # Required # Put your agent logic in here, it must not return. If it does your agent will be restarted. def run; end # Optional # Use this method the gracefully stop your agent but make sure the run method return, or # terminate the thread. def stop; end end end =end module LongRunnable extend ActiveSupport::Concern included do |base| AgentRunner.register(base) end def start_worker? true end def worker_id(config = nil) "#{self.class.to_s}-#{id}-#{Digest::SHA1.hexdigest((config.presence || options).to_json)}" end module ClassMethods def setup_worker active.map do |agent| next unless agent.start_worker? self::Worker.new(id: agent.worker_id, agent: agent) end.compact end end class Worker attr_reader :thread, :id, :agent, :config, :mutex, :scheduler, :restarting def initialize(options = {}) @id = options[:id] @agent = options[:agent] @config = options[:config] @restarting = false end def run raise StandardError, 'Override LongRunnable::Worker#run in your agent Worker subclass.' end def run! @thread = Thread.new do Thread.current[:name] = "#{id}-#{Time.now}" begin run rescue SignalException, SystemExit stop! rescue StandardError => e message = "#{id} Exception #{e.message}:\n#{e.backtrace.first(10).join("\n")}" AgentRunner.with_connection do agent.error(message) end end end end def setup!(scheduler, mutex) @scheduler = scheduler @mutex = mutex setup if respond_to?(:setup) end def stop! @scheduler.jobs(tag: id).each(&:unschedule) if respond_to?(:stop) stop else terminate_thread! end end def terminate_thread! if thread thread.instance_eval { ActiveRecord::Base.connection_pool.release_connection } thread.wakeup if thread.status == 'sleep' thread.terminate end end def restart! without_alive_check do puts "--> Restarting #{id} at #{Time.now} <--" stop! setup!(scheduler, mutex) run! end end def every(*args, &blk) schedule(:every, args, &blk) end def cron(*args, &blk) schedule(:cron, args, &blk) end def schedule_in(*args, &blk) schedule(:schedule_in, args, &blk) end def boolify(value) agent.send(:boolify, value) end private def schedule(method, args, &blk) @scheduler.send(method, *args, tag: id, &blk) end def without_alive_check(&blk) @restarting = true yield ensure @restarting = false end end end