MqttAgent: Ignore a retained message previously received.

This should fix the problem where MqttAgent creates an event from the
same last message every time it runs if it is retained by the server.

Akinori MUSHA 10 years ago
parent
commit
ca05d40498
3 changed files with 57 additions and 17 deletions
  1. 19 4
      app/models/agents/mqtt_agent.rb
  2. 8 2
      spec/models/agents/mqtt_agent_spec.rb
  3. 30 11
      spec/support/fake_mqtt_server.rb

+ 19 - 4
app/models/agents/mqtt_agent.rb

@@ -115,15 +115,26 @@ module Agents
115 115
 
116 116
 
117 117
     def check
118
+      last_message = memory['last_message']
119
+
118 120
       mqtt_client.connect do |c|
119 121
         begin
120 122
           Timeout.timeout((interpolated['max_read_time'].presence || 15).to_i) {
121
-            c.get(interpolated['topic']) do |topic, message|
123
+            c.get_packet(interpolated['topic']) do |packet|
124
+              topic, payload = message = [packet.topic, packet.payload]
125
+
126
+              # Ignore a message if it is previously received
127
+              next if (packet.retain || packet.duplicate) && message == last_message
122 128
 
123
-              # A lot of services generate JSON. Try that first
124
-              payload = JSON.parse(message) rescue message
129
+              last_message = message
125 130
 
126
-              create_event :payload => {
131
+              # A lot of services generate JSON, so try that.
132
+              begin
133
+                payload = JSON.parse(payload)
134
+              rescue
135
+              end
136
+
137
+              create_event payload: {
127 138
                 'topic' => topic,
128 139
                 'message' => payload,
129 140
                 'time' => Time.now.to_i
@@ -133,6 +144,10 @@ module Agents
133 144
         rescue Timeout::Error
134 145
         end
135 146
       end
147
+
148
+      # Remember the last original (non-retain, non-duplicate) message
149
+      self.memory['last_message'] = last_message
150
+      save!
136 151
     end
137 152
 
138 153
   end

+ 8 - 2
spec/models/agents/mqtt_agent_spec.rb

@@ -8,7 +8,6 @@ describe Agents::MqttAgent do
8 8
     @error_log = StringIO.new
9 9
 
10 10
     @server = MQTT::FakeServer.new(41234, '127.0.0.1')
11
-    @server.just_one = true
12 11
     @server.logger = Logger.new(@error_log)
13 12
     @server.logger.level = Logger::DEBUG
14 13
     @server.start
@@ -34,7 +33,14 @@ describe Agents::MqttAgent do
34 33
   end
35 34
 
36 35
   describe "#check" do
37
-    it "should check that initial run creates an event" do
36
+    it "should create events in the initial run" do
37
+      expect { @checker.check }.to change { Event.count }.by(2)
38
+    end
39
+
40
+    it "should ignore retained messages that are previously received" do
41
+      expect { @checker.check }.to change { Event.count }.by(2)
42
+      expect { @checker.check }.to change { Event.count }.by(1)
43
+      expect { @checker.check }.to change { Event.count }.by(1)
38 44
       expect { @checker.check }.to change { Event.count }.by(2)
39 45
     end
40 46
   end

+ 30 - 11
spec/support/fake_mqtt_server.rb

@@ -52,7 +52,9 @@ class MQTT::FakeServer
52 52
     @port = @socket.addr[1]
53 53
     @thread ||= Thread.new do
54 54
       logger.info "Started a fake MQTT server on #{@address}:#{@port}"
55
+      @times = 0
55 56
       loop do
57
+        @times += 1
56 58
         # Wait for a client to connect
57 59
         client = @socket.accept
58 60
         @pings_received = 0
@@ -103,16 +105,33 @@ class MQTT::FakeServer
103 105
             :granted_qos => 0
104 106
           )
105 107
           topic = packet.topics[0][0]
106
-          client.write MQTT::Packet::Publish.new(
107
-            :topic => topic,
108
-            :payload => "hello #{topic}",
109
-            :retain => true
110
-          )
111
-          client.write MQTT::Packet::Publish.new(
112
-            :topic => topic,
113
-            :payload => "did you know about #{topic}",
114
-            :retain => true
115
-          )
108
+          case @times
109
+          when 1, ->x { x >= 3 }
110
+            # Deliver retained messages
111
+            client.write MQTT::Packet::Publish.new(
112
+              :topic => topic,
113
+              :payload => "did you know about #{topic}",
114
+              :retain => true
115
+            )
116
+            client.write MQTT::Packet::Publish.new(
117
+              :topic => topic,
118
+              :payload => "hello #{topic}",
119
+              :retain => true
120
+            )
121
+          when 2
122
+            # Deliver a still retained message
123
+            client.write MQTT::Packet::Publish.new(
124
+              :topic => topic,
125
+              :payload => "hello #{topic}",
126
+              :retain => true
127
+            )
128
+            # Deliver a fresh message
129
+            client.write MQTT::Packet::Publish.new(
130
+              :topic => topic,
131
+              :payload => "did you know about #{topic}",
132
+              :retain => false
133
+            )
134
+          end
116 135
 
117 136
         when MQTT::Packet::Pingreq
118 137
           client.write MQTT::Packet::Pingresp.new
@@ -134,4 +153,4 @@ if __FILE__ == $0
134 153
   server = MQTT::FakeServer.new(MQTT::DEFAULT_PORT)
135 154
   server.logger.level = Logger::DEBUG
136 155
   server.run
137
-end
156
+end