@@ -148,75 +148,89 @@ module Agents
     end
 
     def check
-      check_url interpolated['url']
+      check_urls(interpolated['url'])
     end
 
-    def check_url(in_url)
+    def check_urls(in_url)
       return unless in_url.present?
 
       Array(in_url).each do |url|
-        log "Fetching #{url}"
-        response = faraday.get(url)
-        raise "Failed: #{response.inspect}" unless response.success?
-
-        interpolation_context.stack {
-          interpolation_context['_response_'] = ResponseDrop.new(response)
-          body = response.body
-          if (encoding = interpolated['force_encoding']).present?
-            body = body.encode(Encoding::UTF_8, encoding)
-          end
-          doc = parse(body)
+        check_url(url).map do |doc|
+          create_event payload: doc
+        end
+      end
+    end
 
-          if extract_full_json?
-            if store_payload!(previous_payloads(1), doc)
-              log "Storing new result for '#{name}': #{doc.inspect}"
-              create_event :payload => doc
-            end
-            next
-          end
+    def check_url(url)
+      log "Fetching #{url}"
+      response = faraday.get(url)
+      raise "Failed: #{response.inspect}" unless response.success?
 
-          output =
-            case extraction_type
-            when 'json'
-              extract_json(doc)
-            when 'text'
-              extract_text(doc)
-            else
-              extract_xml(doc)
-            end
+      interpolation_context.stack {
+        interpolation_context['_response_'] = ResponseDrop.new(response)
+        body = response.body
+        if (encoding = interpolated['force_encoding']).present?
+          body = body.encode(Encoding::UTF_8, encoding)
+        end
+        doc = parse(body)
 
-          num_unique_lengths = interpolated['extract'].keys.map { |name| output[name].length }.uniq
+        results = []
+        if extract_full_json?
+          if store_payload!(previous_payloads(1), doc)
+            log "Storing new result for '#{name}': #{doc.inspect}"
+            results << doc
+          end
+          return results
+        end
 
-          if num_unique_lengths.length != 1
-            raise "Got an uneven number of matches for #{interpolated['name']}: #{interpolated['extract'].inspect}"
+        output =
+          case extraction_type
+          when 'json'
+            extract_json(doc)
+          when 'text'
+            extract_text(doc)
+          else
+            extract_xml(doc)
           end
 
-          old_events = previous_payloads num_unique_lengths.first
-          num_unique_lengths.first.times do |index|
-            result = {}
-            interpolated['extract'].keys.each do |name|
-              result[name] = output[name][index]
-              if name.to_s == 'url'
-                result[name] = (response.env[:url] + result[name]).to_s
-              end
-            end
+        num_unique_lengths = interpolated['extract'].keys.map { |name| output[name].length }.uniq
 
-            if store_payload!(old_events, result)
-              log "Storing new parsed result for '#{name}': #{result.inspect}"
-              create_event :payload => result
+        if num_unique_lengths.length != 1
+          raise "Got an uneven number of matches for #{interpolated['name']}: #{interpolated['extract'].inspect}"
+        end
+
+        old_events = previous_payloads num_unique_lengths.first
+        num_unique_lengths.first.times do |index|
+          result = {}
+          interpolated['extract'].keys.each do |name|
+            result[name] = output[name][index]
+            if name.to_s == 'url'
+              result[name] = (response.env[:url] + result[name]).to_s
             end
           end
-        }
-      end
+
+          if store_payload!(old_events, result)
+            log "Storing new parsed result for '#{name}': #{result.inspect}"
+            results << result
+          end
+        end
+
+        results
+      }
     rescue => e
       error "Error when fetching url: #{e.message}\n#{e.backtrace.join("\n")}"
+      return []
     end
 
     def receive(incoming_events)
       incoming_events.each do |event|
         interpolate_with(event) do
           url_to_scrape = event.payload['url']
-          check_url(url_to_scrape) if url_to_scrape =~ /^https?:\/\//i
+          valid_url = url_to_scrape =~ /^https?:\/\//i
+          docs = valid_url ? check_url(url_to_scrape) : []
+          docs.each do |doc|
+            create_event payload: doc
+          end
         end
       end
     end
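
Net effect of this refactor: check_url no longer creates events as a side effect; it returns an array of payloads (and [] on fetch errors via the new "return []" in the rescue), so both callers (check via the new check_urls, and receive) decide when to call create_event. A minimal standalone sketch of that shape, with a hypothetical Fetcher class and fetch/create_event stand-ins, not Huginn code:

# Sketch only: mirrors the diff's "return payloads, let callers emit
# events" pattern. Fetcher, fetch, and create_event are stand-ins.
class Fetcher
  def check_urls(urls)
    Array(urls).each do |url|
      check_url(url).each { |doc| create_event(payload: doc) }
    end
  end

  # Returns an array of result payloads; [] when the fetch fails,
  # matching the diff's "rescue => e ... return []".
  def check_url(url)
    [{ 'url' => url, 'body' => fetch(url) }]
  rescue => e
    warn "Error when fetching url: #{e.message}"
    []
  end

  private

  def fetch(url)
    raise ArgumentError, 'not an http(s) url' unless url =~ /^https?:\/\//i
    '<html>stub</html>' # stand-in for faraday.get(url).body
  end

  def create_event(payload:)
    puts "event: #{payload.inspect}"
  end
end

Fetcher.new.check_urls(['https://example.com', 'nope'])
# Prints one event for example.com; the bad url warns and yields no event.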