add a transaction around event receiving

Andrew Cantino 10 years ago
parent
commit
f32c6f0f7c
2 changed files with 68 additions and 59 deletions
  1. 24 22
      app/models/agent.rb
  2. 44 37
      bin/schedule.rb

+ 24 - 22
app/models/agent.rb

@@ -199,30 +199,32 @@ class Agent < ActiveRecord::Base
199 199
     end
200 200
 
201 201
     def receive!
202
-      sql = Agent.
203
-              select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
204
-              joins("JOIN links ON (links.receiver_id = agents.id)").
205
-              joins("JOIN agents AS sources ON (links.source_id = sources.id)").
206
-              joins("JOIN events ON (events.agent_id = sources.id)").
207
-              where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
208
-
209
-      agents_to_events = {}
210
-      Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
211
-        agents_to_events[receiver_agent_id] ||= []
212
-        agents_to_events[receiver_agent_id] << event_id
213
-      end
214
-
215
-      event_ids = agents_to_events.values.flatten.uniq.compact
216
-
217
-      Agent.where(:id => agents_to_events.keys).each do |agent|
218
-        agent.update_attribute :last_checked_event_id, event_ids.max
219
-        Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
220
-      end
221
-
222
-      {
202
+      Agent.transaction do
203
+        sql = Agent.
204
+                select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
205
+                joins("JOIN links ON (links.receiver_id = agents.id)").
206
+                joins("JOIN agents AS sources ON (links.source_id = sources.id)").
207
+                joins("JOIN events ON (events.agent_id = sources.id)").
208
+                where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
209
+
210
+        agents_to_events = {}
211
+        Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
212
+          agents_to_events[receiver_agent_id] ||= []
213
+          agents_to_events[receiver_agent_id] << event_id
214
+        end
215
+
216
+        event_ids = agents_to_events.values.flatten.uniq.compact
217
+
218
+        Agent.where(:id => agents_to_events.keys).each do |agent|
219
+          agent.update_attribute :last_checked_event_id, event_ids.max
220
+          Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
221
+        end
222
+
223
+        {
223 224
           :agent_count => agents_to_events.keys.length,
224 225
           :event_count => event_ids.length
225
-      }
226
+        }
227
+      end
226 228
     end
227 229
 
228 230
     # Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then

+ 44 - 37
bin/schedule.rb

@@ -10,57 +10,64 @@ end
10 10
 
11 11
 require 'rufus/scheduler'
12 12
 
13
-def run_schedule(time, mutex)
14
-  ActiveRecord::Base.connection_pool.with_connection do
15
-    mutex.synchronize do
16
-      puts "Queuing schedule for #{time}"
17
-      Agent.delay.run_schedule(time)
13
+class HuginnScheduler
14
+  def run_schedule(time, mutex)
15
+    ActiveRecord::Base.connection_pool.with_connection do
16
+      mutex.synchronize do
17
+        puts "Queuing schedule for #{time}"
18
+        Agent.delay.run_schedule(time)
19
+      end
18 20
     end
19 21
   end
20
-end
21 22
 
22
-def propogate!(mutex)
23
-  ActiveRecord::Base.connection_pool.with_connection do
24
-    mutex.synchronize do
25
-      puts "Queuing event propagation"
26
-      Agent.delay.receive!
23
+  def propagate!(mutex)
24
+    ActiveRecord::Base.connection_pool.with_connection do
25
+      mutex.synchronize do
26
+        puts "Queuing event propagation"
27
+        Agent.delay.receive!
28
+      end
27 29
     end
28 30
   end
29
-end
30 31
 
31
-mutex = Mutex.new
32
+  def run!
33
+    mutex = Mutex.new
32 34
 
33
-scheduler = Rufus::Scheduler.new
35
+    rufus_scheduler = Rufus::Scheduler.new
34 36
 
35
-# Schedule event propagation.
37
+    # Schedule event propagation.
36 38
 
37
-scheduler.every '5m' do
38
-  propogate!(mutex)
39
-end
39
+    rufus_scheduler.every '1m' do
40
+      propagate!(mutex)
41
+    end
40 42
 
41
-# Schedule repeating events.
43
+    # Schedule repeating events.
42 44
 
43
-%w[2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
44
-  scheduler.every schedule do
45
-    run_schedule "every_#{schedule}", mutex
46
-  end
47
-end
45
+    %w[2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
46
+      rufus_scheduler.every schedule do
47
+        run_schedule "every_#{schedule}", mutex
48
+      end
49
+    end
48 50
 
49
-# Schedule events for specific times.
51
+    # Schedule events for specific times.
50 52
 
51
-# Times are assumed to be in PST for now.  Can store a user#timezone later.
52
-24.times do |hour|
53
-  scheduler.cron "0 #{hour} * * * America/Los_Angeles" do
54
-    if hour == 0
55
-      run_schedule "midnight", mutex
56
-    elsif hour < 12
57
-      run_schedule "#{hour}am", mutex
58
-    elsif hour == 12
59
-      run_schedule "noon", mutex
60
-    else
61
-      run_schedule "#{hour - 12}pm", mutex
53
+    # Times are assumed to be in PST for now.  Can store a user#timezone later.
54
+    24.times do |hour|
55
+      rufus_scheduler.cron "0 #{hour} * * * America/Los_Angeles" do
56
+        if hour == 0
57
+          run_schedule "midnight", mutex
58
+        elsif hour < 12
59
+          run_schedule "#{hour}am", mutex
60
+        elsif hour == 12
61
+          run_schedule "noon", mutex
62
+        else
63
+          run_schedule "#{hour - 12}pm", mutex
64
+        end
65
+      end
62 66
     end
67
+
68
+    rufus_scheduler.join
63 69
   end
64 70
 end
65 71
 
66
-scheduler.join
72
+scheduler = HuginnScheduler.new
73
+scheduler.run!