Merge pull request #1142 from dsander/fix-twitter-agent-restart-leak

Do check restarting LongRunnable::Worker for liveliness

Dominik Sander 9 years ago
parent
commit
28983a7378

+ 21 - 5
app/concerns/long_runnable.rb

@@ -51,12 +51,13 @@ module LongRunnable
51 51
   end
52 52
 
53 53
   class Worker
54
-    attr_reader :thread, :id, :agent, :config, :mutex, :scheduler
54
+    attr_reader :thread, :id, :agent, :config, :mutex, :scheduler, :restarting
55 55
 
56 56
     def initialize(options = {})
57 57
       @id = options[:id]
58 58
       @agent = options[:agent]
59 59
       @config = options[:config]
60
+      @restarting = false
60 61
     end
61 62
 
62 63
     def run
@@ -65,6 +66,7 @@ module LongRunnable
65 66
 
66 67
     def run!
67 68
       @thread = Thread.new do
69
+        Thread.current[:name] = "#{id}-#{Time.now}"
68 70
         begin
69 71
           run
70 72
         rescue SignalException, SystemExit
@@ -90,14 +92,21 @@ module LongRunnable
90 92
       if respond_to?(:stop)
91 93
         stop
92 94
       else
93
-        thread.terminate
95
+        terminate_thread!
94 96
       end
95 97
     end
96 98
 
99
+    def terminate_thread!
100
+      thread.terminate
101
+      thread.wakeup if thread.status == 'sleep'
102
+    end
103
+
97 104
     def restart!
98
-      stop!
99
-      setup!(scheduler, mutex)
100
-      run!
105
+      without_alive_check do
106
+        stop!
107
+        setup!(scheduler, mutex)
108
+        run!
109
+      end
101 110
     end
102 111
 
103 112
     def every(*args, &blk)
@@ -120,5 +129,12 @@ module LongRunnable
120 129
     def schedule(method, args, &blk)
121 130
       @scheduler.send(method, *args, tag: id, &blk)
122 131
     end
132
+
133
+    def without_alive_check(&blk)
134
+      @restarting = true
135
+      yield
136
+    ensure
137
+      @restarting = false
138
+    end
123 139
   end
124 140
 end

+ 1 - 1
app/models/agents/twitter_stream_agent.rb

@@ -176,7 +176,7 @@ module Agents
176 176
 
177 177
       def stop
178 178
         EventMachine.stop_event_loop if EventMachine.reactor_running?
179
-        thread.terminate
179
+        terminate_thread!
180 180
       end
181 181
 
182 182
       private

+ 1 - 1
lib/agent_runner.rb

@@ -100,7 +100,7 @@ class AgentRunner
100 100
 
101 101
   def restart_dead_workers
102 102
     @workers.each_pair do |id, worker|
103
-      if worker.thread && !worker.thread.alive?
103
+      if !worker.restarting && worker.thread && !worker.thread.alive?
104 104
         puts "Restarting #{id.to_s}"
105 105
         @workers[id].run!
106 106
       end

+ 8 - 0
spec/concerns/long_runnable_spec.rb

@@ -76,6 +76,14 @@ describe LongRunnable do
76 76
     context "#stop!" do
77 77
       it "terminates the thread" do
78 78
         mock(@worker.thread).terminate
79
+        mock(@worker.thread).status { 'run' }
80
+        @worker.stop!
81
+      end
82
+
83
+      it "wakes up sleeping threads after termination" do
84
+        mock(@worker.thread).terminate
85
+        mock(@worker.thread).status { 'sleep' }
86
+        mock(@worker.thread).wakeup
79 87
         @worker.stop!
80 88
       end
81 89
 

+ 1 - 0
spec/models/agents/twitter_stream_agent_spec.rb

@@ -193,6 +193,7 @@ describe Agents::TwitterStreamAgent do
193 193
     context "#stop" do
194 194
       it "stops the thread" do
195 195
         mock(@worker.thread).terminate
196
+        mock(@worker.thread).status
196 197
         @worker.stop
197 198
       end
198 199
     end