Merge pull request #1277 from dsander/twitter-stream-fix

TwitterStreamAgent prevent connection in use and restart issues

Dominik Sander vor 8 Jahren
Ursprung
Commit
652739f18e

+ 3 - 1
app/concerns/long_runnable.rb

@@ -98,13 +98,15 @@ module LongRunnable
98 98
 
99 99
     def terminate_thread!
100 100
       if thread
101
-        thread.terminate
101
+        thread.instance_eval { ActiveRecord::Base.connection_pool.release_connection }
102 102
         thread.wakeup if thread.status == 'sleep'
103
+        thread.terminate
103 104
       end
104 105
     end
105 106
 
106 107
     def restart!
107 108
       without_alive_check do
109
+        puts "--> Restarting #{id} at #{Time.now} <--"
108 110
         stop!
109 111
         setup!(scheduler, mutex)
110 112
         run!

+ 6 - 5
app/models/agents/twitter_stream_agent.rb

@@ -157,16 +157,14 @@ module Agents
157 157
       def setup
158 158
         require 'twitter/json_stream'
159 159
         @filter_to_agent_map = @config[:filter_to_agent_map]
160
-
161
-        schedule_in RELOAD_TIMEOUT do
162
-          puts "--> Restarting TwitterStream #{id} at #{Time.now} <--"
163
-          restart!
164
-        end
165 160
       end
166 161
 
167 162
       def run
168 163
         @recent_tweets = []
169 164
         EventMachine.run do
165
+          EventMachine.add_periodic_timer(RELOAD_TIMEOUT) do
166
+            restart!
167
+          end
170 168
           stream!(@filter_to_agent_map.keys, @agent) do |status|
171 169
             handle_status(status)
172 170
           end
@@ -200,6 +198,9 @@ module Agents
200 198
 
201 199
         stream.on_error do |message|
202 200
           STDERR.puts " --> Twitter error: #{message} at #{Time.now} <--"
201
+          STDERR.puts " --> Sleeping for 15 seconds"
202
+          sleep 15
203
+          restart!
203 204
         end
204 205
 
205 206
         stream.on_no_data do |message|

+ 1 - 0
spec/concerns/long_runnable_spec.rb

@@ -118,6 +118,7 @@ describe LongRunnable do
118 118
         mock(@worker).stop!
119 119
         mock(@worker).setup!(@worker.scheduler, @worker.mutex)
120 120
         mock(@worker).run!
121
+        mock(@worker).puts(anything) { |text| expect(text).to match(/Restarting/) }
121 122
         @worker.restart!
122 123
       end
123 124
     end

+ 11 - 3
spec/models/agents/twitter_stream_agent_spec.rb

@@ -169,20 +169,23 @@ describe Agents::TwitterStreamAgent do
169 169
       @config = {agent: @agent, config: {filter_to_agent_map: {'agent' => [@mock_agent]}}}
170 170
       @worker = Agents::TwitterStreamAgent::Worker.new(@config)
171 171
       @worker.instance_variable_set(:@recent_tweets, [])
172
-      mock(@worker).schedule_in(Agents::TwitterStreamAgent::Worker::RELOAD_TIMEOUT)
172
+      #mock(@worker).schedule_in(Agents::TwitterStreamAgent::Worker::RELOAD_TIMEOUT)
173 173
       @worker.setup!(nil, Mutex.new)
174 174
     end
175 175
 
176 176
     context "#run" do
177
-      it "starts the stream" do
177
+      before(:each) do
178 178
         mock(EventMachine).run.yields
179
+        mock(EventMachine).add_periodic_timer(3600)
180
+      end
181
+
182
+      it "starts the stream" do
179 183
         mock(@worker).stream!(['agent'], @agent)
180 184
         mock(Thread).stop
181 185
         @worker.run
182 186
       end
183 187
 
184 188
       it "yields received tweets" do
185
-        mock(EventMachine).run.yields
186 189
         mock(@worker).stream!(['agent'], @agent).yields('status' => 'hello')
187 190
         mock(@worker).handle_status('status' => 'hello')
188 191
         mock(Thread).stop
@@ -222,6 +225,9 @@ describe Agents::TwitterStreamAgent do
222 225
         it "logs error messages" do
223 226
           stub_without(:on_error).on_error.yields('woups')
224 227
           mock(STDERR).puts(anything) { |text| expect(text).to match(/woups/) }
228
+          mock(STDERR).puts(anything) { |text| expect(text).to match(/Sleeping/) }
229
+          mock(@worker).sleep(15)
230
+          mock(@worker).restart!
225 231
           @worker.send(:stream!, ['agent'], @agent)
226 232
         end
227 233
 
@@ -257,6 +263,7 @@ describe Agents::TwitterStreamAgent do
257 263
 
258 264
       it "deduplicates tweets" do
259 265
         @worker.send(:handle_status, {'text' => 'dup', 'id_str' => '1'})
266
+        mock(@worker).puts(anything) { |text| expect(text).to match(/Skipping/) }
260 267
         @worker.send(:handle_status, {'text' => 'dup', 'id_str' => '1'})
261 268
         expect(@worker.instance_variable_get(:'@recent_tweets').select { |str| str == '1' }.length).to eq 1
262 269
       end
@@ -264,6 +271,7 @@ describe Agents::TwitterStreamAgent do
264 271
       it "calls the agent to process the tweet" do
265 272
         mock(@mock_agent).name { 'mock' }
266 273
         mock(@mock_agent).process_tweet('agent', {'text' => 'agent'})
274
+        mock(@worker).puts(anything) { |text| expect(text).to match(/received/) }
267 275
         @worker.send(:handle_status, {'text' => 'agent', 'id_str' => '123'})
268 276
         expect(@worker.instance_variable_get(:'@recent_tweets')).to include('123')
269 277
       end