# website_agent.rb
  1. require 'nokogiri'
  2. require 'date'
  3. module Agents
  4. class WebsiteAgent < Agent
  5. include WebRequestConcern
  6. can_dry_run!
  7. default_schedule "every_12h"
  8. UNIQUENESS_LOOK_BACK = 200
  9. UNIQUENESS_FACTOR = 3
  # User-facing Markdown documentation for the agent.
  #
  # This is an interpolating heredoc: `#{...}` is evaluated when the class is
  # loaded and backslash escapes are processed, so a literal backslash must be
  # written as `\\` (e.g. `\\A` renders as `\A`). The JSON snippets are kept
  # valid JSON (double-quoted keys and strings) so they can be pasted as-is.
  description <<-MD
    The WebsiteAgent scrapes a website, XML document, or JSON feed and creates Events based on the results.

    Specify a `url` and select a `mode` for when to create Events based on the scraped data, either `all` or `on_change`.

    `url` can be a single url, or an array of urls (for example, for multiple pages with the exact same structure but different content to scrape)

    The `type` value can be `xml`, `html`, `json`, or `text`.

    To tell the Agent how to parse the content, specify `extract` as a hash with keys naming the extractions and values of hashes.

    When parsing HTML or XML, these sub-hashes specify how each extraction should be done. The Agent first selects a node set from the document for each extraction key by evaluating either a CSS selector in `css` or an XPath expression in `xpath`. It then evaluates an XPath expression in `value` on each node in the node set, converting the result into string. Here's an example:

        "extract": {
          "url": { "css": "#comic img", "value": "@src" },
          "title": { "css": "#comic img", "value": "@title" },
          "body_text": { "css": "div.main", "value": ".//text()" }
        }

    "@_attr_" is the XPath expression to extract the value of an attribute named _attr_ from a node, and ".//text()" is to extract all the enclosed texts. You can also use [XPath functions](http://www.w3.org/TR/xpath/#section-String-Functions) like `normalize-space` to strip and squeeze whitespace, `substring-after` to extract part of a text, and `translate` to remove comma from a formatted number, etc. Note that these functions take a string, not a node set, so what you may think would be written as `normalize-space(.//text())` should actually be `normalize-space(.)`.

    Beware that when parsing an XML document (i.e. `type` is `xml`) using `xpath` expressions all namespaces are stripped from the document unless a toplevel option `use_namespaces` is set to true.

    When parsing JSON, these sub-hashes specify [JSONPaths](http://goessner.net/articles/JsonPath/) to the values that you care about. For example:

        "extract": {
          "title": { "path": "results.data[*].title" },
          "description": { "path": "results.data[*].description" }
        }

    When parsing text, each sub-hash should contain a `regexp` and `index`. Output text is matched against the regular expression repeatedly from the beginning through to the end, collecting a captured group specified by `index` in each match. Each index should be either an integer or a string name which corresponds to <code>(?&lt;<em>name</em>&gt;...)</code>. For example, to parse lines of <code><em>word</em>: <em>definition</em></code>, the following should work:

        "extract": {
          "word": { "regexp": "^(.+?): (.+)$", "index": 1 },
          "definition": { "regexp": "^(.+?): (.+)$", "index": 2 }
        }

    Or if you prefer names to numbers for index:

        "extract": {
          "word": { "regexp": "^(?<word>.+?): (?<definition>.+)$", "index": "word" },
          "definition": { "regexp": "^(?<word>.+?): (?<definition>.+)$", "index": "definition" }
        }

    To extract the whole content as one event:

        "extract": {
          "content": { "regexp": "\\A(?m:.)*\\z", "index": 0 }
        }

    Beware that `.` does not match the newline character (LF) unless the `m` flag is in effect, and `^`/`$` basically match every line beginning/end. See [this document](http://ruby-doc.org/core-#{RUBY_VERSION}/doc/regexp_rdoc.html) to learn the regular expression variant used in this service.

    Note that for all of the formats, whatever you extract MUST have the same number of matches for each extractor. E.g., if you're extracting rows, all extractors must match all rows. For generating CSS selectors, something like [SelectorGadget](http://selectorgadget.com) may be helpful.

    Can be configured to use HTTP basic auth by including the `basic_auth` parameter with `"username:password"`, or `["username", "password"]`.

    Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent. This is only used to set the "working" status.

    Set `uniqueness_look_back` to limit the number of events checked for uniqueness (typically for performance). This defaults to the larger of #{UNIQUENESS_LOOK_BACK} or #{UNIQUENESS_FACTOR}x the number of detected received results.

    Set `force_encoding` to an encoding name if the website does not return a Content-Type header with a proper charset.

    Set `user_agent` to a custom User-Agent name if the website does not like the default value (`#{default_user_agent}`).

    The `headers` field is optional. When present, it should be a hash of headers to send with the request.

    Set `disable_ssl_verification` to `true` to disable ssl verification.

    Set `unzip` to `gzip` to inflate the resource using gzip.

    The WebsiteAgent can also scrape based on incoming events. It will scrape the url contained in the `url` key of the incoming event payload. If you specify `merge` as the mode, it will retain the old payload and update it with the new values.

    In Liquid templating, the following variable is available:

    * `_response_`: A response object with the following keys:

        * `status`: HTTP status as integer. (Almost always 200)

        * `headers`: Response headers; for example, `{{ _response_.headers.Content-Type }}` expands to the value of the Content-Type header. Keys are insensitive to cases and -/_.
  MD
  # Describes the shape of the events this agent emits.
  #
  # When `extract` is configured, lists one placeholder entry per extraction
  # key. `extract` may legitimately be absent (validate_options only requires it
  # for non-JSON types), in which case the whole parsed JSON document becomes
  # the event payload, so a generic sentence is returned instead of raising
  # NoMethodError on nil.
  event_description do
    extract = options['extract']
    if extract.is_a?(Hash) && !extract.empty?
      "Events will have the following fields:\n\n %s" % [
        Utils.pretty_print(Hash[extract.keys.map { |key|
          [key, "..."]
        }])
      ]
    else
      "Events will be the raw JSON returned by the URL."
    end
  end
  # Health check: an event must have been created within the configured
  # `expected_update_period_in_days`, and there must be no recent error logs.
  # The error-log check is skipped when no recent event exists.
  def working?
    update_window = interpolated['expected_update_period_in_days']
    produced_recently = event_created_within?(update_window)
    produced_recently && !recent_error_logs?
  end
  # Options pre-filled for a new agent: scrape the xkcd front page as HTML in
  # `on_change` mode, pulling three attributes off the comic image.
  def default_options
    # Every default extractor targets the same node and differs only in the
    # attribute it reads.
    attribute_by_field = { 'url' => "@src", 'title' => "@alt", 'hovertext' => "@title" }
    extractors = {}
    attribute_by_field.each do |field, attribute|
      extractors[field] = { 'css' => "#comic img", 'value' => attribute }
    end

    {
      'expected_update_period_in_days' => "2",
      'url' => "http://xkcd.com",
      'type' => "html",
      'mode' => "on_change",
      'extract' => extractors
    }
  end
  # Validates the agent's raw options, recording each problem on `errors`
  # under :base. Ends by delegating to the web-request and extract-specific
  # validators.
  def validate_options
    # Check for required fields
    unless options['expected_update_period_in_days'].present? && options['url'].present?
      errors.add(:base, "url and expected_update_period_in_days are required")
    end

    # JSON may omit `extract`; every other type needs it.
    unless options['extract'].present? || extraction_type == "json"
      errors.add(:base, "extract is required for all types except json")
    end

    # Check for optional fields
    mode = options['mode']
    if mode.present? && !%w[on_change all merge].include?(mode)
      errors.add(:base, "mode must be set to on_change, all or merge")
    end

    # Both numeric options share the same check and message template.
    %w[expected_update_period_in_days uniqueness_look_back].each do |key|
      value = options[key]
      next unless value.present?
      errors.add(:base, "Invalid #{key} format") unless is_positive_integer?(value)
    end

    forced = options['force_encoding']
    if forced.present?
      if forced.is_a?(String)
        begin
          # Encoding.find raises ArgumentError for names Ruby does not know.
          Encoding.find(forced)
        rescue ArgumentError
          errors.add(:base, "Unknown encoding: #{forced.inspect}")
        end
      else
        errors.add(:base, "force_encoding must be a string")
      end
    end

    validate_web_request_options!
    validate_extract_options!
  end
  # For JSON extraction with a hash `extract`, every extractor must itself be
  # a hash carrying a non-blank `path`; otherwise one error is added on :base.
  # Other types, and a non-hash `extract`, are left unchecked here.
  def validate_extract_options!
    return unless extraction_type == "json"

    extractors = interpolated['extract']
    return unless extractors.is_a?(Hash)

    malformed = extractors.any? { |_name, details| !(details.is_a?(Hash) && details['path'].present?) }
    errors.add(:base, 'When type is json, all extractions must have a path attribute.') if malformed
  end
  # Scheduled entry point: scrapes whatever is configured under `url`.
  def check
    configured_urls = interpolated['url']
    check_urls(configured_urls)
  end
  # Scrapes each URL in +in_url+ (a single URL or an array of them).
  # Does nothing and returns nil when +in_url+ is blank.
  def check_urls(in_url)
    Array(in_url).each { |target| check_url(target) } if in_url.present?
  end
  # Fetches +url+, parses the response according to the extraction type and
  # creates one event per extracted row.
  #
  # url     - the URL to request.
  # payload - a hash merged underneath every created event's payload
  #           (receive passes the incoming event's payload in `merge` mode).
  #
  # Any StandardError raised while fetching or extracting is rescued and
  # reported through `error`; nothing is re-raised to the caller.
  def check_url(url, payload = {})
    log "Fetching #{url}"
    response = faraday.get(url)
    raise "Failed: #{response.inspect}" unless response.success?

    interpolation_context.stack {
      # Expose the response to Liquid templates while this page is processed.
      interpolation_context['_response_'] = ResponseDrop.new(response)
      body = response.body
      # Inflate before transcoding: re-encoding the still-compressed bytes
      # would corrupt the gzip stream and make decompression fail.
      if interpolated['unzip'] == "gzip"
        body = ActiveSupport::Gzip.decompress(body)
      end
      if (encoding = interpolated['force_encoding']).present?
        body = body.encode(Encoding::UTF_8, encoding)
      end
      doc = parse(body)

      # JSON without extractors: the whole document is the event payload.
      if extract_full_json?
        if store_payload!(previous_payloads(1), doc)
          log "Storing new result for '#{name}': #{doc.inspect}"
          create_event payload: payload.merge(doc)
        end
        return
      end

      output =
        case extraction_type
        when 'json'
          extract_json(doc)
        when 'text'
          extract_text(doc)
        else
          extract_xml(doc)
        end

      # Every extractor must yield the same number of matches so the values
      # can be zipped into rows. Block variables are called `key` so they do
      # not shadow the agent's `name` used in the log lines.
      num_unique_lengths = interpolated['extract'].keys.map { |key| output[key].length }.uniq
      if num_unique_lengths.length != 1
        raise "Got an uneven number of matches for #{interpolated['name']}: #{interpolated['extract'].inspect}"
      end

      old_events = previous_payloads num_unique_lengths.first
      num_unique_lengths.first.times do |index|
        result = {}
        interpolated['extract'].keys.each do |key|
          result[key] = output[key][index]
          if key.to_s == 'url'
            # Resolve a possibly relative link against the URL that was fetched.
            result[key] = (response.env[:url] + result[key]).to_s
          end
        end

        if store_payload!(old_events, result)
          log "Storing new parsed result for '#{name}': #{result.inspect}"
          create_event payload: payload.merge(result)
        end
      end
    }
  rescue => e
    error "Error when fetching url: #{e.message}\n#{e.backtrace.join("\n")}"
  end
  # Scrapes the URL found under `url` in each incoming event's payload.
  #
  # Only values that begin with http:// or https:// are fetched. The pattern
  # is anchored with \A rather than ^, because ^ also matches after an embedded
  # newline and would let a value such as "file:///x\nhttp://..." through.
  # In `merge` mode the incoming payload is passed along so that the created
  # events extend it; otherwise events start from an empty payload.
  def receive(incoming_events)
    incoming_events.each do |event|
      interpolate_with(event) do
        url_to_scrape = event.payload['url']
        next unless url_to_scrape.to_s =~ %r{\Ahttps?://}i
        check_url(url_to_scrape,
                  interpolated['mode'].to_s == "merge" ? event.payload : {})
      end
    end
  end
  191. private
  # This method returns true if the result should be stored as a new event.
  # If mode is set to 'on_change', this method may return false and update an existing
  # event to expire further in the future.
  #
  # A blank or missing mode is treated like 'all'. `presence` turns blank
  # values into nil, so nil has to be listed in the branch; matching only ''
  # would make an unset mode (which validate_options permits) raise.
  #
  # Raises RuntimeError for any other mode value.
  def store_payload!(old_events, result)
    case interpolated['mode'].presence
    when 'on_change'
      result_json = result.to_json
      duplicate = old_events.find { |event| event.payload.to_json == result_json }
      return true unless duplicate

      # Already seen: keep the existing event alive instead of duplicating it.
      duplicate.update!(expires_at: new_event_expiration_date)
      false
    when 'all', 'merge', nil
      true
    else
      raise "Illegal options[mode]: #{interpolated['mode']}"
    end
  end
  # Returns the most recent events to compare against for duplicate
  # detection, or nil when the mode is not "on_change".
  #
  # num_events - how many results the current scrape produced; it sizes the
  #              default window when `uniqueness_look_back` is not set.
  def previous_payloads(num_events)
    configured = interpolated['uniqueness_look_back']
    look_back =
      if configured.present?
        configured.to_i
      else
        # Larger of UNIQUENESS_FACTOR * num_events and UNIQUENESS_LOOK_BACK
        [UNIQUENESS_FACTOR * num_events, UNIQUENESS_LOOK_BACK].max
      end

    return unless interpolated['mode'] == "on_change"
    events.order("id desc").limit(look_back)
  end
  # True when no extractors are configured and the content type is JSON,
  # i.e. the whole parsed document should become the event payload.
  def extract_full_json?
    return false if interpolated['extract'].present?
    extraction_type == "json"
  end
  # Returns the content type used for parsing, as a String.
  #
  # An explicit non-blank `type` option wins. Otherwise the type is guessed
  # from the URL's file extension, falling back to "html". A blank `type`
  # (e.g. "") counts as unset rather than being returned as an unusable empty
  # type. `url` may be an array of same-structured pages, in which case the
  # first entry drives the guess.
  def extraction_type
    configured = interpolated['type'].to_s
    return configured unless configured.strip.empty?

    case Array(interpolated['url']).first
    when /\.(rss|xml)$/i
      "xml"
    when /\.json$/i
      "json"
    when /\.(txt|text)$/i
      "text"
    else
      "html"
    end
  end
  # Whether XML namespaces are kept when parsing.
  #
  # An explicit `use_namespaces` option is honoured (run through `boolify`).
  # Without it, namespaces are kept only when no extractor has an `xpath` key.
  def use_namespaces?
    return boolify(interpolated['use_namespaces']) if interpolated.key?('use_namespaces')

    interpolated['extract'].none? { |_name, details| details.key?('xpath') }
  end
  # Calls the block once per configured extractor, passing its details hash,
  # and returns a hash of extractor name => block result.
  def extract_each(&block)
    pairs = interpolated['extract'].map { |name, details| [name, block.call(details)] }
    Hash[pairs]
  end
  # Applies every extractor's `path` to the parsed JSON +doc+ via
  # Utils.values_at, logging each result. Returns extractor name => values.
  def extract_json(doc)
    extract_each do |details|
      json_path = details['path']
      matches = Utils.values_at(doc, json_path)
      log "Extracting #{extraction_type} at #{json_path}: #{matches}"
      matches
    end
  end
  # Scans the text +doc+ with every extractor's `regexp`, collecting the
  # capture group selected by `index` from each match. Returns extractor
  # name => array of captured strings.
  #
  # `index` may be an Integer, a group name, or an all-digit string such as
  # "1". The digit string is converted to an Integer first, because MatchData#[]
  # would otherwise treat it as a (non-existent) group name and raise IndexError.
  def extract_text(doc)
    extract_each { |extraction_details|
      regexp = Regexp.new(extraction_details['regexp'])
      index = extraction_details['index']
      index = index.to_i if index.is_a?(String) && index =~ /\A\d+\z/
      result = []
      doc.scan(regexp) {
        result << Regexp.last_match[index]
      }
      log "Extracting #{extraction_type} at #{regexp}: #{result}"
      result
    }
  end
  # Runs every extractor against the parsed HTML/XML +doc+: selects nodes
  # with `css` (preferred) or `xpath`, evaluates the `value` XPath expression
  # on each node and stringifies the result. Returns extractor name => array
  # of strings.
  #
  # Raises RuntimeError when an extractor has neither `css` nor `xpath`, or
  # when the selection is not a Nokogiri::XML::NodeSet.
  def extract_xml(doc)
    extract_each { |extraction_details|
      css = extraction_details['css']
      xpath = extraction_details['xpath']
      nodes =
        if css
          doc.css(css)
        elsif xpath
          doc.xpath(xpath)
        else
          raise '"css" or "xpath" is required for HTML or XML extraction'
        end

      unless nodes.is_a?(Nokogiri::XML::NodeSet)
        raise "The result of HTML/XML extraction was not a NodeSet"
      end

      result = nodes.map { |node|
        value = node.xpath(extraction_details['value'])
        # Node#xpath() returns any numeric value as float;
        # convert it to integer as appropriate. NaN and Infinity (e.g. from
        # number() on non-numeric text) are left alone: Float#to_i would
        # raise FloatDomainError on them.
        if value.is_a?(Float) && value.finite? && value.to_i == value
          value = value.to_i
        end
        value.to_s
      }
      log "Extracting #{extraction_type} at #{css || xpath}: #{result}"
      result
    }
  end
  # Turns the raw response body +data+ into the document form expected by
  # the extractor for the current extraction type.
  #
  # Raises RuntimeError for an unrecognised type.
  def parse(data)
    type = extraction_type
    case type
    when "json"
      JSON.parse(data)
    when "text"
      data
    when "html"
      Nokogiri::HTML(data)
    when "xml"
      xml = Nokogiri::XML(data)
      # ignore xmlns, useful when parsing atom feeds
      xml.remove_namespaces! unless use_namespaces?
      xml
    else
      raise "Unknown extraction type: #{type}"
    end
  end
  # True when +value+ converts strictly to an Integer (via Kernel#Integer)
  # that is not negative. Despite the name, zero is accepted. Anything that
  # cannot be converted yields false.
  def is_positive_integer?(value)
    parsed = Integer(value)
    !(parsed < 0)
  rescue StandardError
    false
  end
  # Wraps Faraday::Response
  #
  # Exposed to Liquid templates as `_response_` (see check_url).
  # NOTE(review): `@object` is presumably the wrapped response set by
  # LiquidDroppable::Drop -- confirm against that class.
  class ResponseDrop < LiquidDroppable::Drop
    # Integer value of HTTP status
    def status
      @object.status
    end

    # Response headers, wrapped in a HeaderDrop.
    def headers
      raw_headers = @object.headers
      HeaderDrop.new(raw_headers)
    end
  end
  # Wraps Faraday::Utils::Headers
  class HeaderDrop < LiquidDroppable::Drop
    # Looks up the header for +name+ after turning underscores into dashes,
    # so `Content_Type` reads the `Content-Type` entry.
    # NOTE(review): presumably invoked by Liquid for unknown keys -- confirm.
    def before_method(name)
      header_name = name.tr('_', '-')
      @object[header_name]
    end
  end
  338. end
  339. end