huginn_scheduler.rb 4.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. require 'rufus/scheduler'
  # Extensions to Rufus::Scheduler that keep one cron job per SchedulerAgent.
  # Every such job carries the SCHEDULER_AGENT_TAG tag and stores the ID of
  # its agent in the job-local variable :scheduler_agent_id.
  class Rufus::Scheduler
    # Tag attached to every job created for a SchedulerAgent.
    SCHEDULER_AGENT_TAG = Agents::SchedulerAgent.name

    class Job
      # Store an ID of SchedulerAgent in this job.
      def scheduler_agent_id=(id)
        self[:scheduler_agent_id] = id
      end

      # Extract an ID of SchedulerAgent if any.
      def scheduler_agent_id
        self[:scheduler_agent_id]
      end

      # Return the SchedulerAgent tied to this job.  Return nil if no ID is
      # stored in the job, or if no matching record is found within the
      # `active` scope of SchedulerAgents.
      def scheduler_agent
        agent_id = scheduler_agent_id
        return nil unless agent_id

        Agent.of_type(Agents::SchedulerAgent).active.find_by(id: agent_id)
      end
    end

    # Get all jobs tied to any SchedulerAgent.
    def scheduler_agent_jobs
      jobs(tag: SCHEDULER_AGENT_TAG)
    end

    # Get the job tied to a given SchedulerAgent, or nil if there is none.
    def scheduler_agent_job(agent)
      scheduler_agent_jobs.find { |job| job.scheduler_agent_id == agent.id }
    end

    # Schedule or reschedule a job for a given SchedulerAgent and return
    # the running job.  Return nil if unscheduled.
    #
    # An existing job is kept as is when agent.memory['scheduled_at'] equals
    # the job's scheduled_at timestamp; otherwise it is replaced by a new
    # cron job built from agent.options['schedule'].
    def schedule_scheduler_agent(agent)
      job = scheduler_agent_job(agent)

      if agent.disabled?
        if job
          puts "Unscheduling SchedulerAgent##{agent.id} (disabled)"
          job.unschedule
        end
        return nil
      end

      if job
        return job if agent.memory['scheduled_at'] == job.scheduled_at.to_i
        puts "Rescheduling SchedulerAgent##{agent.id}"
        job.unschedule
      else
        puts "Scheduling SchedulerAgent##{agent.id}"
      end

      # Capture only the ID so the callback reloads the agent on every run
      # instead of holding on to this (possibly stale) record.
      agent_id = agent.id

      job = schedule_cron agent.options['schedule'], tag: SCHEDULER_AGENT_TAG do |running_job|
        running_job.scheduler_agent_id = agent_id

        scheduler_agent = running_job.scheduler_agent
        if scheduler_agent
          scheduler_agent.check!
        else
          puts "Unscheduling SchedulerAgent##{running_job.scheduler_agent_id} (disabled or deleted)"
          running_job.unschedule
        end
      end

      # Associate the job with its agent right away.  If this were left to
      # the callback alone, the job would carry no agent ID until it first
      # fired: scheduler_agent_job could not find it, and every pass of
      # schedule_scheduler_agents would schedule a duplicate and unschedule
      # this one as orphaned.
      job.scheduler_agent_id = agent_id

      agent.memory['scheduled_at'] = job.scheduled_at.to_i
      agent.save

      job
    end

    # Schedule or reschedule jobs for all SchedulerAgents and unschedule
    # orphaned jobs if any.
    def schedule_scheduler_agents
      scheduled_jobs = Agent.of_type(Agents::SchedulerAgent).map { |scheduler_agent|
        schedule_scheduler_agent(scheduler_agent)
      }.compact

      # Whatever is still tagged but was not returned above belongs to no
      # current, enabled SchedulerAgent.
      (scheduler_agent_jobs - scheduled_jobs).each do |job|
        puts "Unscheduling SchedulerAgent##{job.scheduler_agent_id} (orphaned)"
        job.unschedule
      end
    end
  end
  # Registers Huginn's recurring work (event propagation, event cleanup,
  # interval and time-of-day agent schedules, SchedulerAgent sync) on a
  # Rufus::Scheduler instance and blocks on it in #run!.
  class HuginnScheduler
    # Rails-style time zone name used when ENV['TIMEZONE'] is unset or blank.
    DEFAULT_TIMEZONE = "Pacific Time (US & Canada)".freeze

    # Lock held while a task is being queued (see #with_mutex).
    attr_accessor :mutex

    def initialize
      @rufus_scheduler = Rufus::Scheduler.new
      # Create the lock up front so the queuing methods below can be called
      # even before #run! has been invoked.
      @mutex = Mutex.new
    end

    # Stop the underlying Rufus scheduler.
    def stop
      @rufus_scheduler.stop
    end

    # Queue Agent.run_schedule for the given schedule name (e.g. "every_1m",
    # "noon") through Agent.delay.
    def run_schedule(time)
      with_mutex do
        puts "Queuing schedule for #{time}"
        Agent.delay.run_schedule(time)
      end
    end

    # Queue Agent.receive! through Agent.delay.
    def propagate!
      with_mutex do
        puts "Queuing event propagation"
        Agent.delay.receive!
      end
    end

    # Queue Event.cleanup_expired! through Event.delay.
    def cleanup_expired_events!
      with_mutex do
        puts "Running event cleanup"
        Event.delay.cleanup_expired!
      end
    end

    # Yield with an ActiveRecord connection checked out from the pool and
    # while holding #mutex.
    def with_mutex
      ActiveRecord::Base.connection_pool.with_connection do
        mutex.synchronize do
          yield
        end
      end
    end

    # Register every recurring job and then join the scheduler, which blocks
    # the calling thread.
    def run!
      # Start each run with a fresh lock.
      self.mutex = Mutex.new

      timezone = tzinfo_friendly_timezone

      # Schedule event propagation.
      @rufus_scheduler.every '1m' do
        propagate!
      end

      # Schedule event cleanup.
      @rufus_scheduler.cron "0 0 * * * #{timezone}" do
        cleanup_expired_events!
      end

      # Schedule repeating events.
      %w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
        @rufus_scheduler.every schedule do
          run_schedule "every_#{schedule}"
        end
      end

      # Schedule events for specific times.
      # Times are interpreted in the zone resolved above (Pacific by
      # default).  Can store a user#timezone later.
      24.times do |hour|
        @rufus_scheduler.cron "0 #{hour} * * * #{timezone}" do
          run_schedule schedule_name_for_hour(hour)
        end
      end

      # Schedule Scheduler Agents
      @rufus_scheduler.every '1m' do
        @rufus_scheduler.schedule_scheduler_agents
      end

      @rufus_scheduler.join
    end

    private

    # Resolve the zone to append to cron lines.  ENV['TIMEZONE'] (or the
    # default) is looked up in ActiveSupport::TimeZone::MAPPING; a name that
    # is not a key there is passed through unchanged, so a tzinfo identifier
    # such as "America/New_York" works instead of yielding nil and breaking
    # the cron string.
    def tzinfo_friendly_timezone
      name = ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : DEFAULT_TIMEZONE
      ActiveSupport::TimeZone::MAPPING.fetch(name, name)
    end

    # Map an hour of the day (0..23) to its schedule name:
    # 0 => "midnight", 1..11 => "1am".."11am", 12 => "noon",
    # 13..23 => "1pm".."11pm".
    def schedule_name_for_hour(hour)
      if hour == 0
        "midnight"
      elsif hour < 12
        "#{hour}am"
      elsif hour == 12
        "noon"
      else
        "#{hour - 12}pm"
      end
    end
  end