Merge pull request #579 from knu/mqtt_agent-dedup

MqttAgent: Ignore a retained message previously received.

Akinori MUSHA 9 years ago
parent
commit
f4478d2be3
4 changed files with 70 additions and 32 deletions
  1. 1 1
      Gemfile.lock
  2. 31 18
      app/models/agents/mqtt_agent.rb
  3. 8 2
      spec/models/agents/mqtt_agent_spec.rb
  4. 30 11
      spec/support/fake_mqtt_server.rb

+ 1 - 1
Gemfile.lock

@@ -184,7 +184,7 @@ GEM
184 184
     mime-types (1.25.1)
185 185
     mini_portile (0.6.0)
186 186
     minitest (5.4.0)
187
-    mqtt (0.2.0)
187
+    mqtt (0.3.1)
188 188
     multi_json (1.10.1)
189 189
     multi_xml (0.5.5)
190 190
     multipart-post (2.0.0)

+ 31 - 18
app/models/agents/mqtt_agent.rb

@@ -110,31 +110,44 @@ module Agents
110 110
         incoming_events.each do |event|
111 111
           c.publish(interpolated(event)['topic'], event)
112 112
         end
113
-
114
-        c.disconnect
115 113
       end
116 114
     end
117 115
 
118 116
 
119 117
     def check
120
-      mqtt_client.connect do |c|
121
-
122
-        Timeout::timeout((interpolated['max_read_time'].presence || 15).to_i) {
123
-          c.get(interpolated['topic']) do |topic, message|
124
-
125
-            # A lot of services generate JSON. Try that first
126
-            payload = JSON.parse(message) rescue message
118
+      last_message = memory['last_message']
127 119
 
128
-            create_event :payload => { 
129
-              'topic' => topic, 
130
-              'message' => payload, 
131
-              'time' => Time.now.to_i 
132
-            }
133
-          end
134
-        } rescue TimeoutError
135
-
136
-        c.disconnect   
120
+      mqtt_client.connect do |c|
121
+        begin
122
+          Timeout.timeout((interpolated['max_read_time'].presence || 15).to_i) {
123
+            c.get_packet(interpolated['topic']) do |packet|
124
+              topic, payload = message = [packet.topic, packet.payload]
125
+
126
+              # Ignore a message if it is previously received
127
+              next if (packet.retain || packet.duplicate) && message == last_message
128
+
129
+              last_message = message
130
+
131
+              # A lot of services generate JSON, so try that.
132
+              begin
133
+                payload = JSON.parse(payload)
134
+              rescue
135
+              end
136
+
137
+              create_event payload: {
138
+                'topic' => topic,
139
+                'message' => payload,
140
+                'time' => Time.now.to_i
141
+              }
142
+            end
143
+          }
144
+        rescue Timeout::Error
145
+        end
137 146
       end
147
+
148
+      # Remember the last original (non-retain, non-duplicate) message
149
+      self.memory['last_message'] = last_message
150
+      save!
138 151
     end
139 152
 
140 153
   end

+ 8 - 2
spec/models/agents/mqtt_agent_spec.rb

@@ -8,7 +8,6 @@ describe Agents::MqttAgent do
8 8
     @error_log = StringIO.new
9 9
 
10 10
     @server = MQTT::FakeServer.new(41234, '127.0.0.1')
11
-    @server.just_one = true
12 11
     @server.logger = Logger.new(@error_log)
13 12
     @server.logger.level = Logger::DEBUG
14 13
     @server.start
@@ -34,7 +33,14 @@ describe Agents::MqttAgent do
34 33
   end
35 34
 
36 35
   describe "#check" do
37
-    it "should check that initial run creates an event" do
36
+    it "should create events in the initial run" do
37
+      expect { @checker.check }.to change { Event.count }.by(2)
38
+    end
39
+
40
+    it "should ignore retained messages that are previously received" do
41
+      expect { @checker.check }.to change { Event.count }.by(2)
42
+      expect { @checker.check }.to change { Event.count }.by(1)
43
+      expect { @checker.check }.to change { Event.count }.by(1)
38 44
       expect { @checker.check }.to change { Event.count }.by(2)
39 45
     end
40 46
   end

+ 30 - 11
spec/support/fake_mqtt_server.rb

@@ -52,7 +52,9 @@ class MQTT::FakeServer
52 52
     @port = @socket.addr[1]
53 53
     @thread ||= Thread.new do
54 54
       logger.info "Started a fake MQTT server on #{@address}:#{@port}"
55
+      @times = 0
55 56
       loop do
57
+        @times += 1
56 58
         # Wait for a client to connect
57 59
         client = @socket.accept
58 60
         @pings_received = 0
@@ -103,16 +105,33 @@ class MQTT::FakeServer
103 105
             :granted_qos => 0
104 106
           )
105 107
           topic = packet.topics[0][0]
106
-          client.write MQTT::Packet::Publish.new(
107
-            :topic => topic,
108
-            :payload => "hello #{topic}",
109
-            :retain => true
110
-          )
111
-          client.write MQTT::Packet::Publish.new(
112
-            :topic => topic,
113
-            :payload => "did you know about #{topic}",
114
-            :retain => true
115
-          )
108
+          case @times
109
+          when 1, ->x { x >= 3 }
110
+            # Deliver retained messages
111
+            client.write MQTT::Packet::Publish.new(
112
+              :topic => topic,
113
+              :payload => "did you know about #{topic}",
114
+              :retain => true
115
+            )
116
+            client.write MQTT::Packet::Publish.new(
117
+              :topic => topic,
118
+              :payload => "hello #{topic}",
119
+              :retain => true
120
+            )
121
+          when 2
122
+            # Deliver a still retained message
123
+            client.write MQTT::Packet::Publish.new(
124
+              :topic => topic,
125
+              :payload => "hello #{topic}",
126
+              :retain => true
127
+            )
128
+            # Deliver a fresh message
129
+            client.write MQTT::Packet::Publish.new(
130
+              :topic => topic,
131
+              :payload => "did you know about #{topic}",
132
+              :retain => false
133
+            )
134
+          end
116 135
 
117 136
         when MQTT::Packet::Pingreq
118 137
           client.write MQTT::Packet::Pingresp.new
@@ -134,4 +153,4 @@ if __FILE__ == $0
134 153
   server = MQTT::FakeServer.new(MQTT::DEFAULT_PORT)
135 154
   server.logger.level = Logger::DEBUG
136 155
   server.run
137
-end
156
+end