@@ -159,7 +159,7 @@ module Agents |
||
| 159 | 159 |
@filter_to_agent_map = @config[:filter_to_agent_map] |
| 160 | 160 |
|
| 161 | 161 |
schedule_in RELOAD_TIMEOUT do |
| 162 |
- puts "--> Restarting TwitterStream #{id}"
|
|
| 162 |
+ puts "--> Restarting TwitterStream #{id} at #{Time.now} <--"
|
|
| 163 | 163 |
restart! |
| 164 | 164 |
end |
| 165 | 165 |
end |
@@ -199,16 +199,16 @@ module Agents |
||
| 199 | 199 |
end |
| 200 | 200 |
|
| 201 | 201 |
stream.on_error do |message| |
| 202 |
- STDERR.puts " --> Twitter error: #{message} <--"
|
|
| 202 |
+ STDERR.puts " --> Twitter error: #{message} at #{Time.now} <--"
|
|
| 203 | 203 |
end |
| 204 | 204 |
|
| 205 | 205 |
stream.on_no_data do |message| |
| 206 |
- STDERR.puts " --> Got no data for awhile; trying to reconnect." |
|
| 206 |
+ STDERR.puts " --> Got no data for awhile; trying to reconnect at #{Time.now} <--"
|
|
| 207 | 207 |
restart! |
| 208 | 208 |
end |
| 209 | 209 |
|
| 210 | 210 |
stream.on_max_reconnects do |timeout, retries| |
| 211 |
- STDERR.puts " --> Oops, tried too many times! <--" |
|
| 211 |
+ STDERR.puts " --> Oops, tried too many times! at #{Time.now} <--"
|
|
| 212 | 212 |
sleep 60 |
| 213 | 213 |
restart! |
| 214 | 214 |
end |
@@ -222,20 +222,18 @@ module Agents |
||
| 222 | 222 |
status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ') |
| 223 | 223 |
|
| 224 | 224 |
if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash) |
| 225 |
- puts "Skipping retweet: #{status["text"]}"
|
|
| 226 | 225 |
return |
| 227 | 226 |
elsif @recent_tweets.include?(status["id_str"]) |
| 228 |
- puts "Skipping duplicate tweet: #{status["text"]}"
|
|
| 227 |
+ puts "(#{Time.now}) Skipping duplicate tweet: #{status["text"]}"
|
|
| 229 | 228 |
return |
| 230 | 229 |
end |
| 231 | 230 |
|
| 232 | 231 |
@recent_tweets << status["id_str"] |
| 233 | 232 |
@recent_tweets.shift if @recent_tweets.length > DUPLICATE_DETECTION_LENGTH |
| 234 |
- puts status["text"] |
|
| 235 | 233 |
@filter_to_agent_map.keys.each do |filter| |
| 236 | 234 |
next unless (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson |
| 237 | 235 |
@filter_to_agent_map[filter].each do |agent| |
| 238 |
- puts " -> #{agent.name}"
|
|
| 236 |
+ puts "(#{Time.now}) #{agent.name} received: #{status["text"]}"
|
|
| 239 | 237 |
AgentRunner.with_connection do |
| 240 | 238 |
agent.process_tweet(filter, status) |
| 241 | 239 |
end |
@@ -222,20 +222,20 @@ describe Agents::TwitterStreamAgent do |
||
| 222 | 222 |
context "callback handling" do |
| 223 | 223 |
it "logs error messages" do |
| 224 | 224 |
stub_without(:on_error).on_error.yields('woups')
|
| 225 |
- mock(STDERR).puts(" --> Twitter error: woups <--")
|
|
| 225 |
+ mock(STDERR).puts(anything) { |text| expect(text).to match(/woups/) }
|
|
| 226 | 226 |
@worker.send(:stream!, ['agent'], @agent) |
| 227 | 227 |
end |
| 228 | 228 |
|
| 229 | 229 |
it "stop when no data was received"do |
| 230 | 230 |
stub_without(:on_no_data).on_no_data.yields |
| 231 | 231 |
mock(@worker).restart! |
| 232 |
- mock(STDERR).puts(" --> Got no data for awhile; trying to reconnect.")
|
|
| 232 |
+ mock(STDERR).puts(anything) |
|
| 233 | 233 |
@worker.send(:stream!, ['agent'], @agent) |
| 234 | 234 |
end |
| 235 | 235 |
|
| 236 | 236 |
it "sleeps for 60 seconds on_max_reconnects" do |
| 237 | 237 |
stub_without(:on_max_reconnects).on_max_reconnects.yields |
| 238 |
- mock(STDERR).puts(" --> Oops, tried too many times! <--")
|
|
| 238 |
+ mock(STDERR).puts(anything) |
|
| 239 | 239 |
mock(@worker).sleep(60) |
| 240 | 240 |
mock(@worker).restart! |
| 241 | 241 |
@worker.send(:stream!, ['agent'], @agent) |
@@ -252,22 +252,21 @@ describe Agents::TwitterStreamAgent do |
||
| 252 | 252 |
|
| 253 | 253 |
context "#handle_status" do |
| 254 | 254 |
it "skips retweets" do |
| 255 |
- mock.instance_of(IO).puts('Skipping retweet: retweet')
|
|
| 256 |
- @worker.send(:handle_status, {'text' => 'retweet', 'retweeted_status' => {one: true}})
|
|
| 255 |
+ @worker.send(:handle_status, {'text' => 'retweet', 'retweeted_status' => {one: true}, 'id_str' => '123' })
|
|
| 256 |
+ expect(@worker.instance_variable_get(:'@recent_tweets')).not_to include('123')
|
|
| 257 | 257 |
end |
| 258 | 258 |
|
| 259 | 259 |
it "deduplicates tweets" do |
| 260 |
- mock.instance_of(IO).puts("dup")
|
|
| 261 |
- @worker.send(:handle_status, {'text' => 'dup', 'id_str' => 1})
|
|
| 262 |
- mock.instance_of(IO).puts("Skipping duplicate tweet: dup")
|
|
| 263 |
- @worker.send(:handle_status, {'text' => 'dup', 'id_str' => 1})
|
|
| 260 |
+ @worker.send(:handle_status, {'text' => 'dup', 'id_str' => '1'})
|
|
| 261 |
+ @worker.send(:handle_status, {'text' => 'dup', 'id_str' => '1'})
|
|
| 262 |
+ expect(@worker.instance_variable_get(:'@recent_tweets').select { |str| str == '1' }.length).to eq 1
|
|
| 264 | 263 |
end |
| 265 | 264 |
|
| 266 | 265 |
it "calls the agent to process the tweet" do |
| 267 |
- stub.instance_of(IO).puts |
|
| 268 | 266 |
mock(@mock_agent).name { 'mock' }
|
| 269 | 267 |
mock(@mock_agent).process_tweet('agent', {'text' => 'agent'})
|
| 270 |
- @worker.send(:handle_status, {'text' => 'agent'})
|
|
| 268 |
+ @worker.send(:handle_status, {'text' => 'agent', 'id_str' => '123'})
|
|
| 269 |
+ expect(@worker.instance_variable_get(:'@recent_tweets')).to include('123')
|
|
| 271 | 270 |
end |
| 272 | 271 |
end |
| 273 | 272 |
end |