@@ -188,7 +188,7 @@ GEM |
||
188 | 188 |
mime-types (1.25.1) |
189 | 189 |
mini_portile (0.6.0) |
190 | 190 |
minitest (5.4.0) |
191 |
- mqtt (0.2.0) |
|
191 |
+ mqtt (0.3.1) |
|
192 | 192 |
multi_json (1.10.1) |
193 | 193 |
multi_xml (0.5.5) |
194 | 194 |
multipart-post (2.0.0) |
@@ -110,31 +110,44 @@ module Agents |
||
110 | 110 |
incoming_events.each do |event| |
111 | 111 |
c.publish(interpolated(event)['topic'], event) |
112 | 112 |
end |
113 |
- |
|
114 |
- c.disconnect |
|
115 | 113 |
end |
116 | 114 |
end |
117 | 115 |
|
118 | 116 |
|
119 | 117 |
def check |
120 |
- mqtt_client.connect do |c| |
|
121 |
- |
|
122 |
- Timeout::timeout((interpolated['max_read_time'].presence || 15).to_i) { |
|
123 |
- c.get(interpolated['topic']) do |topic, message| |
|
124 |
- |
|
125 |
- # A lot of services generate JSON. Try that first |
|
126 |
- payload = JSON.parse(message) rescue message |
|
118 |
+ last_message = memory['last_message'] |
|
127 | 119 |
|
128 |
- create_event :payload => { |
|
129 |
- 'topic' => topic, |
|
130 |
- 'message' => payload, |
|
131 |
- 'time' => Time.now.to_i |
|
132 |
- } |
|
133 |
- end |
|
134 |
- } rescue TimeoutError |
|
135 |
- |
|
136 |
- c.disconnect |
|
120 |
+ mqtt_client.connect do |c| |
|
121 |
+ begin |
|
122 |
+ Timeout.timeout((interpolated['max_read_time'].presence || 15).to_i) { |
|
123 |
+ c.get_packet(interpolated['topic']) do |packet| |
|
124 |
+ topic, payload = message = [packet.topic, packet.payload] |
|
125 |
+ |
|
126 |
+ # Ignore a message if it is previously received |
|
127 |
+ next if (packet.retain || packet.duplicate) && message == last_message |
|
128 |
+ |
|
129 |
+ last_message = message |
|
130 |
+ |
|
131 |
+ # A lot of services generate JSON, so try that. |
|
132 |
+ begin |
|
133 |
+ payload = JSON.parse(payload) |
|
134 |
+ rescue |
|
135 |
+ end |
|
136 |
+ |
|
137 |
+ create_event payload: { |
|
138 |
+ 'topic' => topic, |
|
139 |
+ 'message' => payload, |
|
140 |
+ 'time' => Time.now.to_i |
|
141 |
+ } |
|
142 |
+ end |
|
143 |
+ } |
|
144 |
+ rescue Timeout::Error |
|
145 |
+ end |
|
137 | 146 |
end |
147 |
+ |
|
148 |
+ # Remember the last original (non-retain, non-duplicate) message |
|
149 |
+ self.memory['last_message'] = last_message |
|
150 |
+ save! |
|
138 | 151 |
end |
139 | 152 |
|
140 | 153 |
end |
@@ -8,7 +8,6 @@ describe Agents::MqttAgent do |
||
8 | 8 |
@error_log = StringIO.new |
9 | 9 |
|
10 | 10 |
@server = MQTT::FakeServer.new(41234, '127.0.0.1') |
11 |
- @server.just_one = true |
|
12 | 11 |
@server.logger = Logger.new(@error_log) |
13 | 12 |
@server.logger.level = Logger::DEBUG |
14 | 13 |
@server.start |
@@ -34,7 +33,14 @@ describe Agents::MqttAgent do |
||
34 | 33 |
end |
35 | 34 |
|
36 | 35 |
describe "#check" do |
37 |
- it "should check that initial run creates an event" do |
|
36 |
+ it "should create events in the initial run" do |
|
37 |
+ expect { @checker.check }.to change { Event.count }.by(2) |
|
38 |
+ end |
|
39 |
+ |
|
40 |
+ it "should ignore retained messages that are previously received" do |
|
41 |
+ expect { @checker.check }.to change { Event.count }.by(2) |
|
42 |
+ expect { @checker.check }.to change { Event.count }.by(1) |
|
43 |
+ expect { @checker.check }.to change { Event.count }.by(1) |
|
38 | 44 |
expect { @checker.check }.to change { Event.count }.by(2) |
39 | 45 |
end |
40 | 46 |
end |
@@ -52,7 +52,9 @@ class MQTT::FakeServer |
||
52 | 52 |
@port = @socket.addr[1] |
53 | 53 |
@thread ||= Thread.new do |
54 | 54 |
logger.info "Started a fake MQTT server on #{@address}:#{@port}" |
55 |
+ @times = 0 |
|
55 | 56 |
loop do |
57 |
+ @times += 1 |
|
56 | 58 |
# Wait for a client to connect |
57 | 59 |
client = @socket.accept |
58 | 60 |
@pings_received = 0 |
@@ -103,16 +105,33 @@ class MQTT::FakeServer |
||
103 | 105 |
:granted_qos => 0 |
104 | 106 |
) |
105 | 107 |
topic = packet.topics[0][0] |
106 |
- client.write MQTT::Packet::Publish.new( |
|
107 |
- :topic => topic, |
|
108 |
- :payload => "hello #{topic}", |
|
109 |
- :retain => true |
|
110 |
- ) |
|
111 |
- client.write MQTT::Packet::Publish.new( |
|
112 |
- :topic => topic, |
|
113 |
- :payload => "did you know about #{topic}", |
|
114 |
- :retain => true |
|
115 |
- ) |
|
108 |
+ case @times |
|
109 |
+ when 1, ->x { x >= 3 } |
|
110 |
+ # Deliver retained messages |
|
111 |
+ client.write MQTT::Packet::Publish.new( |
|
112 |
+ :topic => topic, |
|
113 |
+ :payload => "did you know about #{topic}", |
|
114 |
+ :retain => true |
|
115 |
+ ) |
|
116 |
+ client.write MQTT::Packet::Publish.new( |
|
117 |
+ :topic => topic, |
|
118 |
+ :payload => "hello #{topic}", |
|
119 |
+ :retain => true |
|
120 |
+ ) |
|
121 |
+ when 2 |
|
122 |
+ # Deliver a still retained message |
|
123 |
+ client.write MQTT::Packet::Publish.new( |
|
124 |
+ :topic => topic, |
|
125 |
+ :payload => "hello #{topic}", |
|
126 |
+ :retain => true |
|
127 |
+ ) |
|
128 |
+ # Deliver a fresh message |
|
129 |
+ client.write MQTT::Packet::Publish.new( |
|
130 |
+ :topic => topic, |
|
131 |
+ :payload => "did you know about #{topic}", |
|
132 |
+ :retain => false |
|
133 |
+ ) |
|
134 |
+ end |
|
116 | 135 |
|
117 | 136 |
when MQTT::Packet::Pingreq |
118 | 137 |
client.write MQTT::Packet::Pingresp.new |
@@ -134,4 +153,4 @@ if __FILE__ == $0 |
||
134 | 153 |
server = MQTT::FakeServer.new(MQTT::DEFAULT_PORT) |
135 | 154 |
server.logger.level = Logger::DEBUG |
136 | 155 |
server.run |
137 |
-end |
|
156 |
+end |