Merge branch 'master' of github.com:cantino/huginn

Andrew Cantino 11 年之前
父節點
當前提交
35cab738eb

+ 1 - 0
CHANGES.md

@@ -1,5 +1,6 @@
1 1
 # Changes
2 2
 
3
+* 0.2 (Nov 6, 2013)    - PeakDetectorAgent now uses `window_duration_in_days` and `min_peak_spacing_in_days`.  Additionally, peaks trigger when the time series rises over the standard deviation multiple, not after it starts to fall.
3 4
 * June 29, 2013        - Removed rails\_admin because it was causing deployment issues. Better to have people install their favorite admin tool if they want one.
4 5
 * June, 2013           - A number of new agents have been contributed, including interfaces to Weibo, Twitter, and Twilio, as well as Agents for translation, sentiment analysis, and for posting and receiving webhooks.
5 6
 * March 24, 2013 (0.1) - Refactored loading of Agents for `check` and `receive` to use ids instead of full objects.  This should fix the too-large delayed_job issues.  Added `system_timer` and `fastercsv` to the Gemfile for the Ruby 1.8 platform.

+ 1 - 1
VERSION

@@ -1 +1 @@
1
-0.1
1
+0.2

+ 2 - 3
app/concerns/twitter_concern.rb

@@ -19,12 +19,11 @@ module TwitterConcern
19 19
     Twitter.configure do |config|
20 20
       config.consumer_key = options[:consumer_key]
21 21
       config.consumer_secret = options[:consumer_secret]
22
-      config.oauth_token = options[:oauth_token]
23
-      config.oauth_token_secret = options[:oauth_token_secret]
22
+      config.oauth_token = options[:oauth_token] || options[:access_key]
23
+      config.oauth_token_secret = options[:oauth_token_secret] || options[:access_secret]
24 24
     end
25 25
   end
26 26
 
27 27
   module ClassMethods
28
-
29 28
   end
30 29
 end

+ 24 - 22
app/models/agent.rb

@@ -199,30 +199,32 @@ class Agent < ActiveRecord::Base
199 199
     end
200 200
 
201 201
     def receive!
202
-      sql = Agent.
203
-              select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
204
-              joins("JOIN links ON (links.receiver_id = agents.id)").
205
-              joins("JOIN agents AS sources ON (links.source_id = sources.id)").
206
-              joins("JOIN events ON (events.agent_id = sources.id)").
207
-              where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
208
-
209
-      agents_to_events = {}
210
-      Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
211
-        agents_to_events[receiver_agent_id] ||= []
212
-        agents_to_events[receiver_agent_id] << event_id
213
-      end
214
-
215
-      event_ids = agents_to_events.values.flatten.uniq.compact
216
-
217
-      Agent.where(:id => agents_to_events.keys).each do |agent|
218
-        agent.update_attribute :last_checked_event_id, event_ids.max
219
-        Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
220
-      end
221
-
222
-      {
202
+      Agent.transaction do
203
+        sql = Agent.
204
+                select("agents.id AS receiver_agent_id, sources.id AS source_agent_id, events.id AS event_id").
205
+                joins("JOIN links ON (links.receiver_id = agents.id)").
206
+                joins("JOIN agents AS sources ON (links.source_id = sources.id)").
207
+                joins("JOIN events ON (events.agent_id = sources.id)").
208
+                where("agents.last_checked_event_id IS NULL OR events.id > agents.last_checked_event_id").to_sql
209
+
210
+        agents_to_events = {}
211
+        Agent.connection.select_rows(sql).each do |receiver_agent_id, source_agent_id, event_id|
212
+          agents_to_events[receiver_agent_id] ||= []
213
+          agents_to_events[receiver_agent_id] << event_id
214
+        end
215
+
216
+        event_ids = agents_to_events.values.flatten.uniq.compact
217
+
218
+        Agent.where(:id => agents_to_events.keys).each do |agent|
219
+          agent.update_attribute :last_checked_event_id, event_ids.max
220
+          Agent.async_receive(agent.id, agents_to_events[agent.id].uniq)
221
+        end
222
+
223
+        {
223 224
           :agent_count => agents_to_events.keys.length,
224 225
           :event_count => event_ids.length
225
-      }
226
+        }
227
+      end
226 228
     end
227 229
 
228 230
     # Given an Agent id and an array of Event ids, load the Agent, call #receive on it with the Event objects, and then

+ 24 - 21
app/models/agents/peak_detector_agent.rb

@@ -11,9 +11,9 @@ module Agents
11 11
 
12 12
       Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.
13 13
 
14
-      You may set `window_duration` to change the default memory window length of two weeks,
15
-      `peak_spacing` to change the default minimum peak spacing of two days, and
16
-      `std_multiple` to change the default standard deviation threshold multiple of 3.
14
+      You may set `window_duration_in_days` to change the default memory window length of `14` days,
15
+      `min_peak_spacing_in_days` to change the default minimum peak spacing of `2` days (peaks closer together will be ignored), and
16
+      `std_multiple` to change the default standard deviation threshold multiple of `3`.
17 17
     MD
18 18
 
19 19
     event_description <<-MD
@@ -61,27 +61,22 @@ module Agents
61 61
       memory[:peaks][group] ||= []
62 62
 
63 63
       if memory[:data][group].length > 4 && (memory[:peaks][group].empty? || memory[:peaks][group].last < event.created_at.to_i - peak_spacing)
64
-        average_value, standard_deviation = stats_for(group, :skip_last => 2)
65
-        newest_value = memory[:data][group][-1].first.to_f
66
-        second_newest_value, second_newest_time = memory[:data][group][-2].map(&:to_f)
67
-
68
-        #pp({:newest_value => newest_value,
69
-        #    :second_newest_value => second_newest_value,
70
-        #    :average_value => average_value,
71
-        #    :standard_deviation => standard_deviation,
72
-        #    :threshold => average_value + std_multiple * standard_deviation })
73
-
74
-        if newest_value < second_newest_value && second_newest_value > average_value + std_multiple * standard_deviation
75
-          memory[:peaks][group] << second_newest_time
76
-          memory[:peaks][group].reject! { |p| p <= second_newest_time - window_duration }
77
-          create_event :payload => {:message => options[:message], :peak => second_newest_value, :peak_time => second_newest_time, :grouped_by => group.to_s}
64
+        average_value, standard_deviation = stats_for(group, :skip_last => 1)
65
+        newest_value, newest_time = memory[:data][group][-1].map(&:to_f)
66
+
67
+        #p [newest_value, average_value, average_value + std_multiple * standard_deviation, standard_deviation]
68
+
69
+        if newest_value > average_value + std_multiple * standard_deviation
70
+          memory[:peaks][group] << newest_time
71
+          memory[:peaks][group].reject! { |p| p <= newest_time - window_duration }
72
+          create_event :payload => {:message => options[:message], :peak => newest_value, :peak_time => newest_time, :grouped_by => group.to_s}
78 73
         end
79 74
       end
80 75
     end
81 76
 
82 77
     def stats_for(group, options = {})
83 78
       data = memory[:data][group].map { |d| d.first.to_f }
84
-      data = data[0...(memory[:data][group].length - (options[:skip_last] || 0))]
79
+      data = data[0...(data.length - (options[:skip_last] || 0))]
85 80
       length = data.length.to_f
86 81
       mean = 0
87 82
       mean_variance = 0
@@ -99,15 +94,23 @@ module Agents
99 94
     end
100 95
 
101 96
     def window_duration
102
-      (options[:window_duration].present? && options[:window_duration].to_i) || 2.weeks
97
+      if options[:window_duration].present? # The older option
98
+        options[:window_duration].to_i
99
+      else
100
+        (options[:window_duration_in_days] || 14).to_f.days
101
+      end
103 102
     end
104 103
 
105 104
     def std_multiple
106
-      (options[:std_multiple].present? && options[:std_multiple].to_i) || 3
105
+      (options[:std_multiple] || 3).to_f
107 106
     end
108 107
 
109 108
     def peak_spacing
110
-      (options[:peak_spacing].present? && options[:peak_spacing].to_i) || 2.days
109
+      if options[:peak_spacing].present? # The older option
110
+        options[:peak_spacing].to_i
111
+      else
112
+        (options[:min_peak_spacing_in_days] || 2).to_f.days
113
+      end
111 114
     end
112 115
 
113 116
     def group_for(event)

+ 46 - 21
app/models/agents/twitter_stream_agent.rb

@@ -1,13 +1,16 @@
1 1
 module Agents
2 2
   class TwitterStreamAgent < Agent
3
+    include TwitterConcern
3 4
     cannot_receive_events!
4 5
 
5 6
     description <<-MD
6 7
       The TwitterStreamAgent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.
7 8
 
8
-      You must provide an oAuth `consumer_key`, `consumer_secret`, `access_key`, and `access_secret`, as well as an array of `filters`.  Multiple words in a filter
9
+      You must provide an oAuth `consumer_key`, `consumer_secret`, `oauth_token`, and `oauth_token_secret`, as well as an array of `filters`.  Multiple words in a filter
9 10
       must all show up in a tweet, but are independent of order.
10 11
 
12
+      If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.
13
+
11 14
       To get oAuth credentials for Twitter, [follow these instructions](https://github.com/cantino/huginn/wiki/Getting-a-twitter-oauth-token).
12 15
 
13 16
       Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.
@@ -51,14 +54,10 @@ module Agents
51 54
     default_schedule "11pm"
52 55
 
53 56
     def validate_options
54
-      unless options[:consumer_key].present? &&
55
-             options[:consumer_secret].present? &&
56
-             options[:access_key].present? &&
57
-             options[:access_secret].present? &&
58
-             options[:filters].present? &&
57
+      unless options[:filters].present? &&
59 58
              options[:expected_update_period_in_days].present? &&
60 59
              options[:generate].present?
61
-        errors.add(:base, "expected_update_period_in_days, generate, consumer_key, consumer_secret, access_key, access_secret, and filters are required fields")
60
+        errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
62 61
       end
63 62
     end
64 63
 
@@ -70,8 +69,8 @@ module Agents
70 69
       {
71 70
           :consumer_key => "---",
72 71
           :consumer_secret => "---",
73
-          :access_key => "---",
74
-          :access_secret => "---",
72
+          :oauth_token => "---",
73
+          :oauth_token_secret => "---",
75 74
           :filters => %w[keyword1 keyword2],
76 75
           :expected_update_period_in_days => "2",
77 76
           :generate => "events"
@@ -79,25 +78,51 @@ module Agents
79 78
     end
80 79
 
81 80
     def process_tweet(filter, status)
82
-      if options[:generate] == "counts"
83
-        # Avoid memory pollution
84
-        me = Agent.find(id)
85
-        me.memory[:filter_counts] ||= {}
86
-        me.memory[:filter_counts][filter.to_sym] ||= 0
87
-        me.memory[:filter_counts][filter.to_sym] += 1
88
-        me.save!
89
-      else
90
-        create_event :payload => status.merge(:filter => filter.to_s)
81
+      filter = lookup_filter(filter)
82
+
83
+      if filter
84
+        if options[:generate] == "counts"
85
+          # Avoid memory pollution by reloading the Agent.
86
+          agent = Agent.find(id)
87
+          agent.memory[:filter_counts] ||= {}
88
+          agent.memory[:filter_counts][filter.to_sym] ||= 0
89
+          agent.memory[:filter_counts][filter.to_sym] += 1
90
+          remove_unused_keys!(agent, :filter_counts)
91
+          agent.save!
92
+        else
93
+          create_event :payload => status.merge(:filter => filter.to_s)
94
+        end
91 95
       end
92 96
     end
93 97
 
94 98
     def check
95
-      if memory[:filter_counts] && memory[:filter_counts].length > 0
99
+      if options[:generate] == "counts" && memory[:filter_counts] && memory[:filter_counts].length > 0
96 100
         memory[:filter_counts].each do |filter, count|
97 101
           create_event :payload => { :filter => filter.to_s, :count => count, :time => Time.now.to_i }
98 102
         end
99
-        memory[:filter_counts] = {}
100
-        save!
103
+      end
104
+      memory[:filter_counts] = {}
105
+    end
106
+
107
+    protected
108
+
109
+    def lookup_filter(filter)
110
+      options[:filters].each do |known_filter|
111
+        if known_filter == filter
112
+          return filter
113
+        elsif known_filter.is_a?(Array)
114
+          if known_filter.include?(filter)
115
+            return known_filter.first
116
+          end
117
+        end
118
+      end
119
+    end
120
+
121
+    def remove_unused_keys!(agent, base)
122
+      if agent.memory[base]
123
+        (agent.memory[base].keys - agent.options[:filters].map {|f| f.is_a?(Array) ? f.first.to_sym : f.to_sym }).each do |removed_key|
124
+          agent.memory[base].delete(removed_key)
125
+        end
101 126
       end
102 127
     end
103 128
   end

+ 5 - 1
app/models/event.rb

@@ -1,5 +1,5 @@
1 1
 class Event < ActiveRecord::Base
2
-  attr_accessible :lat, :lng, :payload, :user_id, :user
2
+  attr_accessible :lat, :lng, :payload, :user_id, :user, :expires_at
3 3
 
4 4
   acts_as_mappable
5 5
 
@@ -21,4 +21,8 @@ class Event < ActiveRecord::Base
21 21
   def reemit!
22 22
     agent.create_event :payload => payload, :lat => lat, :lng => lng
23 23
   end
24
+
25
+  def self.cleanup_expired!
26
+    Event.where("expires_at IS NOT NULL AND expires_at < ?", Time.now).delete_all
27
+  end
24 28
 end

+ 60 - 34
bin/schedule.rb

@@ -10,57 +10,83 @@ end
10 10
 
11 11
 require 'rufus/scheduler'
12 12
 
13
-def run_schedule(time, mutex)
14
-  ActiveRecord::Base.connection_pool.with_connection do
15
-    mutex.synchronize do
13
+class HuginnScheduler
14
+  attr_accessor :mutex
15
+
16
+  def run_schedule(time)
17
+    with_mutex do
16 18
       puts "Queuing schedule for #{time}"
17 19
       Agent.delay.run_schedule(time)
18 20
     end
19 21
   end
20
-end
21 22
 
22
-def propogate!(mutex)
23
-  ActiveRecord::Base.connection_pool.with_connection do
24
-    mutex.synchronize do
23
+  def propagate!
24
+    with_mutex do
25 25
       puts "Queuing event propagation"
26 26
       Agent.delay.receive!
27 27
     end
28 28
   end
29
-end
30 29
 
31
-mutex = Mutex.new
30
+  def cleanup_expired_events!
31
+    with_mutex do
32
+      puts "Running event cleanup"
33
+      Event.delay.cleanup_expired!
34
+    end
35
+  end
32 36
 
33
-scheduler = Rufus::Scheduler.new
37
+  def with_mutex
38
+    ActiveRecord::Base.connection_pool.with_connection do
39
+      mutex.synchronize do
40
+        yield
41
+      end
42
+    end
43
+  end
34 44
 
35
-# Schedule event propagation.
45
+  def run!
46
+    self.mutex = Mutex.new
36 47
 
37
-scheduler.every '5m' do
38
-  propogate!(mutex)
39
-end
48
+    rufus_scheduler = Rufus::Scheduler.new
40 49
 
41
-# Schedule repeating events.
50
+    # Schedule event propagation.
42 51
 
43
-%w[2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
44
-  scheduler.every schedule do
45
-    run_schedule "every_#{schedule}", mutex
46
-  end
47
-end
52
+    rufus_scheduler.every '1m' do
53
+      propagate!
54
+    end
55
+
56
+    # Schedule event cleanup.
57
+
58
+    rufus_scheduler.cron "0 0 * * * America/Los_Angeles" do
59
+      cleanup_expired_events!
60
+    end
61
+
62
+    # Schedule repeating events.
48 63
 
49
-# Schedule events for specific times.
50
-
51
-# Times are assumed to be in PST for now.  Can store a user#timezone later.
52
-24.times do |hour|
53
-  scheduler.cron "0 #{hour} * * * America/Los_Angeles" do
54
-    if hour == 0
55
-      run_schedule "midnight", mutex
56
-    elsif hour < 12
57
-      run_schedule "#{hour}am", mutex
58
-    elsif hour == 12
59
-      run_schedule "noon", mutex
60
-    else
61
-      run_schedule "#{hour - 12}pm", mutex
64
+    %w[2m 5m 10m 30m 1h 2h 5h 12h 1d 2d 7d].each do |schedule|
65
+      rufus_scheduler.every schedule do
66
+        run_schedule "every_#{schedule}"
67
+      end
62 68
     end
69
+
70
+    # Schedule events for specific times.
71
+
72
+    # Times are assumed to be in PST for now.  Can store a user#timezone later.
73
+    24.times do |hour|
74
+      rufus_scheduler.cron "0 #{hour} * * * America/Los_Angeles" do
75
+        if hour == 0
76
+          run_schedule "midnight"
77
+        elsif hour < 12
78
+          run_schedule "#{hour}am"
79
+        elsif hour == 12
80
+          run_schedule "noon"
81
+        else
82
+          run_schedule "#{hour - 12}pm"
83
+        end
84
+      end
85
+    end
86
+
87
+    rufus_scheduler.join
63 88
   end
64 89
 end
65 90
 
66
-scheduler.join
91
+scheduler = HuginnScheduler.new
92
+scheduler.run!

+ 6 - 6
bin/twitter_stream.rb

@@ -17,11 +17,11 @@ require 'pp'
17 17
 def stream!(filters, options = {}, &block)
18 18
   stream = Twitter::JSONStream.connect(
19 19
     :path    => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
20
-    :oauth => {
20
+    :oauth   => {
21 21
       :consumer_key    => options[:consumer_key],
22 22
       :consumer_secret => options[:consumer_secret],
23
-      :access_key      => options[:access_key],
24
-      :access_secret   => options[:access_secret]
23
+      :access_key      => options[:oauth_token] || options[:access_key],
24
+      :access_secret   => options[:oauth_token_secret] || options[:access_secret]
25 25
     },
26 26
     :ssl     => true
27 27
   )
@@ -52,15 +52,15 @@ end
52 52
 
53 53
 def load_and_run(agents)
54 54
   agents.group_by { |agent| agent.options[:twitter_username] }.each do |twitter_username, agents|
55
-    filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.inject({}) { |m, f| m[f] = []; m }
55
+    filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
56 56
 
57 57
     agents.each do |agent|
58
-      agent.options[:filters].uniq.map(&:strip).each do |filter|
58
+      agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
59 59
         filter_to_agent_map[filter] << agent
60 60
       end
61 61
     end
62 62
 
63
-    options = agents.first.options.slice(:consumer_key, :consumer_secret, :access_key, :access_secret)
63
+    options = agents.first.options.slice(:consumer_key, :consumer_secret, :access_key, :oauth_token, :access_secret, :oauth_token_secret)
64 64
 
65 65
     recent_tweets = []
66 66
 

+ 6 - 0
db/migrate/20131105063248_add_expires_at_to_events.rb

@@ -0,0 +1,6 @@
1
+class AddExpiresAtToEvents < ActiveRecord::Migration
2
+  def change
3
+    add_column :events, :expires_at, :datetime
4
+    add_index :events, :expires_at
5
+  end
6
+end

+ 3 - 1
db/schema.rb

@@ -11,7 +11,7 @@
11 11
 #
12 12
 # It's strongly recommended to check this file into your version control system.
13 13
 
14
-ActiveRecord::Schema.define(:version => 20130819160603) do
14
+ActiveRecord::Schema.define(:version => 20131105063248) do
15 15
 
16 16
   create_table "agent_logs", :force => true do |t|
17 17
     t.integer  "agent_id",                         :null => false
@@ -67,9 +67,11 @@ ActiveRecord::Schema.define(:version => 20130819160603) do
67 67
     t.text     "payload",    :limit => 16777215
68 68
     t.datetime "created_at",                                                     :null => false
69 69
     t.datetime "updated_at",                                                     :null => false
70
+    t.datetime "expires_at"
70 71
   end
71 72
 
72 73
   add_index "events", ["agent_id", "created_at"], :name => "index_events_on_agent_id_and_created_at"
74
+  add_index "events", ["expires_at"], :name => "index_events_on_expires_at"
73 75
   add_index "events", ["user_id", "created_at"], :name => "index_events_on_user_id_and_created_at"
74 76
 
75 77
   create_table "links", :force => true do |t|

+ 1 - 1
spec/models/agent_spec.rb

@@ -186,7 +186,7 @@ describe Agent do
186 186
         }
187 187
         Agent.async_check(agents(:bob_weather_agent).id)
188 188
         lambda {
189
-          Agent.receive!
189
+          Agent.async_receive(agents(:bob_rain_notifier_agent).id, [agents(:bob_weather_agent).events.last.id])
190 190
         }.should raise_error
191 191
         log = agents(:bob_rain_notifier_agent).logs.first
192 192
         log.message.should =~ /Exception/

+ 6 - 7
spec/models/agents/peak_detector_agent_spec.rb

@@ -36,7 +36,7 @@ describe Agents::PeakDetectorAgent do
36 36
     end
37 37
 
38 38
     it "keeps a rolling window of data" do
39
-      @agent.options[:window_duration] = 5.hours
39
+      @agent.options[:window_duration_in_days] = 5/24.0
40 40
       @agent.receive build_events(:keys => [:count],
41 41
                                   :values => [1, 2, 3, 4, 5, 6, 7, 8].map {|i| [i]},
42 42
                                   :pattern => { :filter => "something" })
@@ -47,14 +47,14 @@ describe Agents::PeakDetectorAgent do
47 47
       build_events(:keys => [:count],
48 48
                    :values => [5, 6,
49 49
                                4, 5,
50
-                               8, 11,
50
+                               4, 5,
51 51
                                15, 11, # peak
52
-                               8, 5,
52
+                               8, 50, # ignored because it's too close to the first peak
53 53
                                4, 5].map {|i| [i]},
54 54
                    :pattern => { :filter => "something" }).each.with_index do |event, index|
55 55
         lambda {
56 56
           @agent.receive([event])
57
-        }.should change { @agent.events.count }.by( index == 7 ? 1 : 0 )
57
+        }.should change { @agent.events.count }.by( index == 6 ? 1 : 0 )
58 58
       end
59 59
 
60 60
       @agent.events.last.payload[:peak].should == 15.0
@@ -62,10 +62,9 @@ describe Agents::PeakDetectorAgent do
62 62
     end
63 63
 
64 64
     it "keeps a rolling window of peaks" do
65
-      @agent.options[:window_duration] = 5.hours
66
-      @agent.options[:peak_spacing] = 1.hour
65
+      @agent.options[:min_peak_spacing_in_days] = 1/24.0
67 66
       @agent.receive build_events(:keys => [:count],
68
-                                  :values => [1, 1, 1, 1, 1, 1, 10, 1, 1, 1, 10, 1, 1, 1, 10, 1].map {|i| [i]},
67
+                                  :values => [1, 1, 1, 1, 1, 1, 10, 1, 1, 1, 1, 1, 1, 1, 10, 1].map {|i| [i]},
69 68
                                   :pattern => { :filter => "something" })
70 69
       @agent.memory[:peaks][:something].length.should == 2
71 70
     end

+ 127 - 0
spec/models/agents/twitter_stream_agent_spec.rb

@@ -0,0 +1,127 @@
1
+require 'spec_helper'
2
+
3
+describe Agents::TwitterStreamAgent do
4
+  before do
5
+    @opts = {
6
+      :consumer_key => "---",
7
+      :consumer_secret => "---",
8
+      :oauth_token => "---",
9
+      :oauth_token_secret => "---",
10
+      :filters => %w[keyword1 keyword2],
11
+      :expected_update_period_in_days => "2",
12
+      :generate => "events"
13
+    }
14
+
15
+    @agent = Agents::TwitterStreamAgent.new(:name => "HuginnBot", :options => @opts)
16
+    @agent.user = users(:bob)
17
+    @agent.save!
18
+  end
19
+
20
+  describe '#process_tweet' do
21
+    context "when generate is set to 'counts'" do
22
+      before do
23
+        @agent.options[:generate] = 'counts'
24
+      end
25
+
26
+      it 'records counts' do
27
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
28
+        @agent.process_tweet('keyword2', {:text => "something", :user => {:name => "Mr. Someone"}})
29
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
30
+
31
+        @agent.reload
32
+        @agent.memory[:filter_counts][:keyword1].should == 2
33
+        @agent.memory[:filter_counts][:keyword2].should == 1
34
+      end
35
+
36
+      it 'records counts for keyword sets as well' do
37
+        @agent.options[:filters][0] = %w[keyword1-1 keyword1-2 keyword1-3]
38
+        @agent.save!
39
+
40
+        @agent.process_tweet('keyword2', {:text => "something", :user => {:name => "Mr. Someone"}})
41
+        @agent.process_tweet('keyword2', {:text => "something", :user => {:name => "Mr. Someone"}})
42
+        @agent.process_tweet('keyword1-1', {:text => "something", :user => {:name => "Mr. Someone"}})
43
+        @agent.process_tweet('keyword1-2', {:text => "something", :user => {:name => "Mr. Someone"}})
44
+        @agent.process_tweet('keyword1-3', {:text => "something", :user => {:name => "Mr. Someone"}})
45
+        @agent.process_tweet('keyword1-1', {:text => "something", :user => {:name => "Mr. Someone"}})
46
+
47
+        @agent.reload
48
+        @agent.memory[:filter_counts][:'keyword1-1'].should == 4 # it stores on the first keyword
49
+        @agent.memory[:filter_counts][:keyword2].should == 2
50
+      end
51
+
52
+      it 'removes unused keys' do
53
+        @agent.memory[:filter_counts] = {:keyword1 => 2, :keyword2 => 3, :keyword3 => 4}
54
+        @agent.save!
55
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
56
+        @agent.reload.memory[:filter_counts].should == {:keyword1 => 3, :keyword2 => 3}
57
+      end
58
+    end
59
+
60
+    context "when generate is set to 'events'" do
61
+      it 'emits events immediately' do
62
+        lambda {
63
+          @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
64
+        }.should change { @agent.events.count }.by(1)
65
+
66
+        @agent.events.last.payload.should == {
67
+          :filter => 'keyword1',
68
+          :text => "something",
69
+          :user => {:name => "Mr. Someone"}
70
+        }
71
+      end
72
+
73
+      it 'handles keyword sets too' do
74
+        @agent.options[:filters][0] = %w[keyword1-1 keyword1-2 keyword1-3]
75
+        @agent.save!
76
+
77
+        lambda {
78
+          @agent.process_tweet('keyword1-2', {:text => "something", :user => {:name => "Mr. Someone"}})
79
+        }.should change { @agent.events.count }.by(1)
80
+
81
+        @agent.events.last.payload.should == {
82
+          :filter => 'keyword1-1',
83
+          :text => "something",
84
+          :user => {:name => "Mr. Someone"}
85
+        }
86
+      end
87
+    end
88
+  end
89
+
90
+  describe '#check' do
91
+    context "when generate is set to 'counts'" do
92
+      before do
93
+        @agent.options[:generate] = 'counts'
94
+        @agent.save!
95
+      end
96
+
97
+      it 'emits events' do
98
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
99
+        @agent.process_tweet('keyword2', {:text => "something", :user => {:name => "Mr. Someone"}})
100
+        @agent.process_tweet('keyword1', {:text => "something", :user => {:name => "Mr. Someone"}})
101
+
102
+        lambda {
103
+          @agent.reload.check
104
+        }.should change { @agent.events.count }.by(2)
105
+
106
+        @agent.events[-1].payload[:filter].should == 'keyword1'
107
+        @agent.events[-1].payload[:count].should == 2
108
+
109
+        @agent.events[-2].payload[:filter].should == 'keyword2'
110
+        @agent.events[-2].payload[:count].should == 1
111
+
112
+        @agent.memory[:filter_counts].should == {}
113
+      end
114
+    end
115
+
116
+    context "when generate is not set to 'counts'" do
117
+      it 'does nothing' do
118
+        @agent.memory[:filter_counts] = { :keyword1 => 2 }
119
+        @agent.save!
120
+        lambda {
121
+          @agent.reload.check
122
+        }.should_not change { Event.count }
123
+        @agent.memory[:filter_counts].should == {}
124
+      end
125
+    end
126
+  end
127
+end

+ 34 - 0
spec/models/event_spec.rb

@@ -16,4 +16,38 @@ describe Event do
16 16
       Event.last.created_at.should be_within(1).of(Time.now)
17 17
     end
18 18
   end
19
+
20
+  describe ".cleanup_expired!" do
21
+    it "removes any Events whose expired_at date is non-null and in the past" do
22
+      event = agents(:jane_weather_agent).create_event :expires_at => 2.hours.from_now
23
+
24
+      current_time = Time.now
25
+      stub(Time).now { current_time }
26
+
27
+      Event.cleanup_expired!
28
+      Event.find_by_id(event.id).should_not be_nil
29
+      current_time = 119.minutes.from_now
30
+      Event.cleanup_expired!
31
+      Event.find_by_id(event.id).should_not be_nil
32
+      current_time = 2.minutes.from_now
33
+      Event.cleanup_expired!
34
+      Event.find_by_id(event.id).should be_nil
35
+    end
36
+
37
+    it "doesn't touch Events with no expired_at" do
38
+      event = Event.new
39
+      event.agent = agents(:jane_weather_agent)
40
+      event.expires_at = nil
41
+      event.save!
42
+
43
+      current_time = Time.now
44
+      stub(Time).now { current_time }
45
+
46
+      Event.cleanup_expired!
47
+      Event.find_by_id(event.id).should_not be_nil
48
+      current_time = 2.days.from_now
49
+      Event.cleanup_expired!
50
+      Event.find_by_id(event.id).should_not be_nil
51
+    end
52
+  end
19 53
 end