@@ -1,76 +1,37 @@ |
||
| 1 | 1 |
require 'rufus/scheduler' |
| 2 | 2 |
|
| 3 | 3 |
class HuginnScheduler |
| 4 |
+ FAILED_JOBS_TO_KEEP = 100 |
|
| 4 | 5 |
attr_accessor :mutex |
| 5 | 6 |
|
| 6 | 7 |
def initialize |
| 7 | 8 |
@rufus_scheduler = Rufus::Scheduler.new |
| 9 |
+ self.mutex = Mutex.new |
|
| 8 | 10 |
end |
| 9 | 11 |
|
| 10 | 12 |
def stop |
| 11 | 13 |
@rufus_scheduler.stop |
| 12 | 14 |
end |
| 13 | 15 |
|
| 14 |
- def run_schedule(time) |
|
| 15 |
- with_mutex do |
|
| 16 |
- puts "Queuing schedule for #{time}"
|
|
| 17 |
- Agent.delay.run_schedule(time) |
|
| 18 |
- end |
|
| 19 |
- end |
|
| 20 |
- |
|
| 21 |
- def propagate! |
|
| 22 |
- with_mutex do |
|
| 23 |
- puts "Queuing event propagation" |
|
| 24 |
- Agent.delay.receive! |
|
| 25 |
- end |
|
| 26 |
- end |
|
| 27 |
- |
|
| 28 |
- def cleanup_expired_events! |
|
| 29 |
- with_mutex do |
|
| 30 |
- puts "Running event cleanup" |
|
| 31 |
- Event.delay.cleanup_expired! |
|
| 32 |
- end |
|
| 33 |
- end |
|
| 34 |
- |
|
| 35 |
- def cleanup_failed_jobs! |
|
| 36 |
- first_to_delete = Delayed::Job.where.not(failed_at: nil).order("failed_at DESC").offset(ENV['FAILED_JOBS_TO_KEEP'].try(:to_i) || 100).limit(ENV['FAILED_JOBS_TO_KEEP'].try(:to_i) || 100).pluck(:failed_at).first
|
|
| 37 |
- Delayed::Job.where(["failed_at <= ?", first_to_delete]).delete_all if first_to_delete.present? |
|
| 38 |
- end |
|
| 39 |
- |
|
| 40 |
- def with_mutex |
|
| 41 |
- ActiveRecord::Base.connection_pool.with_connection do |
|
| 42 |
- mutex.synchronize do |
|
| 43 |
- yield |
|
| 44 |
- end |
|
| 45 |
- end |
|
| 46 |
- end |
|
| 47 |
- |
|
| 48 | 16 |
def run! |
| 49 |
- self.mutex = Mutex.new |
|
| 50 |
- |
|
| 51 | 17 |
tzinfo_friendly_timezone = ActiveSupport::TimeZone::MAPPING[ENV['TIMEZONE'].present? ? ENV['TIMEZONE'] : "Pacific Time (US & Canada)"] |
| 52 | 18 |
|
| 53 | 19 |
# Schedule event propagation. |
| 54 |
- |
|
| 55 | 20 |
@rufus_scheduler.every '1m' do |
| 56 | 21 |
propagate! |
| 57 | 22 |
end |
| 58 | 23 |
|
| 59 | 24 |
# Schedule event cleanup. |
| 60 |
- |
|
| 61 | 25 |
@rufus_scheduler.cron "0 0 * * * " + tzinfo_friendly_timezone do |
| 62 | 26 |
cleanup_expired_events! |
| 63 | 27 |
end |
| 64 | 28 |
|
| 65 | 29 |
# Schedule failed job cleanup. |
| 66 |
- |
|
| 67 | 30 |
@rufus_scheduler.every '1h' do |
| 68 | 31 |
cleanup_failed_jobs! |
| 69 | 32 |
end |
| 70 | 33 |
|
| 71 |
- |
|
| 72 | 34 |
# Schedule repeating events. |
| 73 |
- |
|
| 74 | 35 |
%w[1m 2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule| |
| 75 | 36 |
@rufus_scheduler.every schedule do |
| 76 | 37 |
run_schedule "every_#{schedule}"
|
@@ -78,22 +39,60 @@ class HuginnScheduler |
||
| 78 | 39 |
end |
| 79 | 40 |
|
| 80 | 41 |
# Schedule events for specific times. |
| 81 |
- |
|
| 82 |
- # Times are assumed to be in PST for now. Can store a user#timezone later. |
|
| 83 | 42 |
24.times do |hour| |
| 84 | 43 |
@rufus_scheduler.cron "0 #{hour} * * * " + tzinfo_friendly_timezone do
|
| 85 |
- if hour == 0 |
|
| 86 |
- run_schedule "midnight" |
|
| 87 |
- elsif hour < 12 |
|
| 88 |
- run_schedule "#{hour}am"
|
|
| 89 |
- elsif hour == 12 |
|
| 90 |
- run_schedule "noon" |
|
| 91 |
- else |
|
| 92 |
- run_schedule "#{hour - 12}pm"
|
|
| 93 |
- end |
|
| 44 |
+ run_schedule hour_to_schedule_name(hour) |
|
| 94 | 45 |
end |
| 95 | 46 |
end |
| 96 | 47 |
|
| 97 | 48 |
@rufus_scheduler.join |
| 98 | 49 |
end |
| 50 |
+ |
|
| 51 |
+ private |
|
| 52 |
+ def run_schedule(time) |
|
| 53 |
+ with_mutex do |
|
| 54 |
+ puts "Queuing schedule for #{time}"
|
|
| 55 |
+ Agent.delay.run_schedule(time) |
|
| 56 |
+ end |
|
| 57 |
+ end |
|
| 58 |
+ |
|
| 59 |
+ def propagate! |
|
| 60 |
+ with_mutex do |
|
| 61 |
+ puts "Queuing event propagation" |
|
| 62 |
+ Agent.delay.receive! |
|
| 63 |
+ end |
|
| 64 |
+ end |
|
| 65 |
+ |
|
| 66 |
+ def cleanup_expired_events! |
|
| 67 |
+ with_mutex do |
|
| 68 |
+ puts "Running event cleanup" |
|
| 69 |
+ Event.delay.cleanup_expired! |
|
| 70 |
+ end |
|
| 71 |
+ end |
|
| 72 |
+ |
|
| 73 |
+ def cleanup_failed_jobs! |
|
| 74 |
+ num_to_keep = (ENV['FAILED_JOBS_TO_KEEP'].presence || FAILED_JOBS_TO_KEEP).to_i |
|
| 75 |
+ first_to_delete = Delayed::Job.where.not(failed_at: nil).order("failed_at DESC").offset(num_to_keep).limit(num_to_keep).pluck(:failed_at).first
|
|
| 76 |
+ Delayed::Job.where(["failed_at <= ?", first_to_delete]).delete_all if first_to_delete.present? |
|
| 77 |
+ end |
|
| 78 |
+ |
|
| 79 |
+ def hour_to_schedule_name(hour) |
|
| 80 |
+ if hour == 0 |
|
| 81 |
+ "midnight" |
|
| 82 |
+ elsif hour < 12 |
|
| 83 |
+ "#{hour}am"
|
|
| 84 |
+ elsif hour == 12 |
|
| 85 |
+ "noon" |
|
| 86 |
+ else |
|
| 87 |
+ "#{hour - 12}pm"
|
|
| 88 |
+ end |
|
| 89 |
+ end |
|
| 90 |
+ |
|
| 91 |
+ def with_mutex |
|
| 92 |
+ ActiveRecord::Base.connection_pool.with_connection do |
|
| 93 |
+ mutex.synchronize do |
|
| 94 |
+ yield |
|
| 95 |
+ end |
|
| 96 |
+ end |
|
| 97 |
+ end |
|
| 99 | 98 |
end |
@@ -3,3 +3,4 @@ TWITTER_OAUTH_KEY=twitteroauthkey |
||
| 3 | 3 |
TWITTER_OAUTH_SECRET=twitteroauthsecret |
| 4 | 4 |
THIRTY_SEVEN_SIGNALS_OAUTH_KEY=TESTKEY |
| 5 | 5 |
THIRTY_SEVEN_SIGNALS_OAUTH_SECRET=TESTSECRET |
| 6 |
+FAILED_JOBS_TO_KEEP=2 |
@@ -0,0 +1,77 @@ |
||
| 1 |
+require 'spec_helper' |
|
| 2 |
+ |
|
| 3 |
+describe HuginnScheduler do |
|
| 4 |
+ before(:each) do |
|
| 5 |
+ @scheduler = HuginnScheduler.new |
|
| 6 |
+ stub |
|
| 7 |
+ end |
|
| 8 |
+ |
|
| 9 |
+ it "should stop the scheduler" do |
|
| 10 |
+ mock.instance_of(Rufus::Scheduler).stop |
|
| 11 |
+ @scheduler.stop |
|
| 12 |
+ end |
|
| 13 |
+ |
|
| 14 |
+ it "schould register the schedules with the rufus scheduler and run" do |
|
| 15 |
+ mock.instance_of(Rufus::Scheduler).join |
|
| 16 |
+ @scheduler.run! |
|
| 17 |
+ end |
|
| 18 |
+ |
|
| 19 |
+ it "should run scheduled agents" do |
|
| 20 |
+ mock(Agent).run_schedule('every_1h')
|
|
| 21 |
+ mock.instance_of(IO).puts('Queuing schedule for every_1h')
|
|
| 22 |
+ @scheduler.send(:run_schedule, 'every_1h') |
|
| 23 |
+ end |
|
| 24 |
+ |
|
| 25 |
+ it "should propagate events" do |
|
| 26 |
+ mock(Agent).receive! |
|
| 27 |
+ stub.instance_of(IO).puts |
|
| 28 |
+ @scheduler.send(:propagate!) |
|
| 29 |
+ end |
|
| 30 |
+ |
|
| 31 |
+ it "schould clean up expired events" do |
|
| 32 |
+ mock(Event).cleanup_expired! |
|
| 33 |
+ stub.instance_of(IO).puts |
|
| 34 |
+ @scheduler.send(:cleanup_expired_events!) |
|
| 35 |
+ end |
|
| 36 |
+ |
|
| 37 |
+ describe "#hour_to_schedule_name" do |
|
| 38 |
+ it "for 0h" do |
|
| 39 |
+ @scheduler.send(:hour_to_schedule_name, 0).should == 'midnight' |
|
| 40 |
+ end |
|
| 41 |
+ |
|
| 42 |
+ it "for the forenoon" do |
|
| 43 |
+ @scheduler.send(:hour_to_schedule_name, 6).should == '6am' |
|
| 44 |
+ end |
|
| 45 |
+ |
|
| 46 |
+ it "for 12h" do |
|
| 47 |
+ @scheduler.send(:hour_to_schedule_name, 12).should == 'noon' |
|
| 48 |
+ end |
|
| 49 |
+ |
|
| 50 |
+ it "for the afternoon" do |
|
| 51 |
+ @scheduler.send(:hour_to_schedule_name, 17).should == '5pm' |
|
| 52 |
+ end |
|
| 53 |
+ end |
|
| 54 |
+ |
|
| 55 |
+ describe "cleanup_failed_jobs!" do |
|
| 56 |
+ before do |
|
| 57 |
+ 3.times do |i| |
|
| 58 |
+ Delayed::Job.create(failed_at: Time.now - i.minutes) |
|
| 59 |
+ end |
|
| 60 |
+ @keep = Delayed::Job.order(:failed_at)[1] |
|
| 61 |
+ end |
|
| 62 |
+ |
|
| 63 |
+ it "work with set FAILED_JOBS_TO_KEEP env variable", focus: true do |
|
| 64 |
+ expect { @scheduler.send(:cleanup_failed_jobs!) }.to change(Delayed::Job, :count).by(-1)
|
|
| 65 |
+ expect { @scheduler.send(:cleanup_failed_jobs!) }.to change(Delayed::Job, :count).by(0)
|
|
| 66 |
+ @keep.id.should == Delayed::Job.order(:failed_at)[0].id |
|
| 67 |
+ end |
|
| 68 |
+ |
|
| 69 |
+ |
|
| 70 |
+ it "work without the FAILED_JOBS_TO_KEEP env variable" do |
|
| 71 |
+ old = ENV['FAILED_JOBS_TO_KEEP'] |
|
| 72 |
+ ENV['FAILED_JOBS_TO_KEEP'] = nil |
|
| 73 |
+ expect { @scheduler.send(:cleanup_failed_jobs!) }.to change(Delayed::Job, :count).by(0)
|
|
| 74 |
+ ENV['FAILED_JOBS_TO_KEEP'] = old |
|
| 75 |
+ end |
|
| 76 |
+ end |
|
| 77 |
+end |