website_agent.rb 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. require 'nokogiri'
  2. require 'date'
  3. module Agents
  4. class WebsiteAgent < Agent
  5. include WebRequestConcern
  6. default_schedule "every_12h"
  7. UNIQUENESS_LOOK_BACK = 200
  8. UNIQUENESS_FACTOR = 3
  9. description <<-MD
  10. The WebsiteAgent scrapes a website, XML document, or JSON feed and creates Events based on the results.
  11. Specify a `url` and select a `mode` for when to create Events based on the scraped data, either `all` or `on_change`.
  12. `url` can be a single url, or an array of urls (for example, for multiple pages with the exact same structure but different content to scrape)
  13. The `type` value can be `xml`, `html`, `json`, or `text`.
  14. To tell the Agent how to parse the content, specify `extract` as a hash with keys naming the extractions and values of hashes.
  15. When parsing HTML or XML, these sub-hashes specify how each extraction should be done. The Agent first selects a node set from the document for each extraction key by evaluating either a CSS selector in `css` or an XPath expression in `xpath`. It then evaluates an XPath expression in `value` on each node in the node set, converting the result into string. Here's an example:
  16. "extract": {
  17. "url": { "css": "#comic img", "value": "@src" },
  18. "title": { "css": "#comic img", "value": "@title" },
  19. "body_text": { "css": "div.main", "value": ".//text()" }
  20. }
  21. "@_attr_" is the XPath expression to extract the value of an attribute named _attr_ from a node, and ".//text()" is to extract all the enclosed texts. You can also use [XPath functions](http://www.w3.org/TR/xpath/#section-String-Functions) like `normalize-space` to strip and squeeze whitespace, `substring-after` to extract part of a text, and `translate` to remove comma from a formatted number, etc. Note that these functions take a string, not a node set, so what you may think would be written as `normalize-text(.//text())` should actually be `normalize-text(.)`.
  22. When parsing JSON, these sub-hashes specify [JSONPaths](http://goessner.net/articles/JsonPath/) to the values that you care about. For example:
  23. "extract": {
  24. "title": { "path": "results.data[*].title" },
  25. "description": { "path": "results.data[*].description" }
  26. }
  27. When parsing text, each sub-hash should contain a `regexp` and `index`. Output text is matched against the regular expression repeatedly from the beginning through to the end, collecting a captured group specified by `index` in each match. Each index should be either an integer or a string name which corresponds to `(?<_name_>...)`. For example, to parse lines of `_word_: _definition_`, the following should work:
  28. "extract": {
  29. "word": { "regexp": "^(.+?): (.+)$", index: 1 },
  30. "definition": { "regexp": "^(.+?): (.+)$", index: 2 }
  31. }
  32. Or if you prefer names to numbers for index:
  33. "extract": {
  34. "word": { "regexp": "^(?<word>.+?): (?<definition>.+)$", index: 'word' },
  35. "definition": { "regexp": "^(?<word>.+?): (?<definition>.+)$", index: 'definition' }
  36. }
  37. To extract the whole content as one event:
  38. "extract": {
  39. "content": { "regexp": "\A(?m:.)*\z", index: 0 }
  40. }
  41. Beware that `.` does not match the newline character (LF) unless the `m` flag is in effect, and `^`/`$` basically match every line beginning/end. See [this document](http://ruby-doc.org/core-#{RUBY_VERSION}/doc/regexp_rdoc.html) to learn the regular expression variant used in this service.
  42. Note that for all of the formats, whatever you extract MUST have the same number of matches for each extractor. E.g., if you're extracting rows, all extractors must match all rows. For generating CSS selectors, something like [SelectorGadget](http://selectorgadget.com) may be helpful.
  43. Can be configured to use HTTP basic auth by including the `basic_auth` parameter with `"username:password"`, or `["username", "password"]`.
  44. Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent. This is only used to set the "working" status.
  45. Set `uniqueness_look_back` to limit the number of events checked for uniqueness (typically for performance). This defaults to the larger of #{UNIQUENESS_LOOK_BACK} or #{UNIQUENESS_FACTOR}x the number of detected received results.
  46. Set `force_encoding` to an encoding name if the website does not return a Content-Type header with a proper charset.
  47. Set `user_agent` to a custom User-Agent name if the website does not like the default value (`#{default_user_agent}`).
  48. The `headers` field is optional. When present, it should be a hash of headers to send with the request.
  49. The WebsiteAgent can also scrape based on incoming events. It will scrape the url contained in the `url` key of the incoming event payload.
  50. MD
  51. event_description do
  52. "Events will have the following fields:\n\n %s" % [
  53. Utils.pretty_print(Hash[options['extract'].keys.map { |key|
  54. [key, "..."]
  55. }])
  56. ]
  57. end
  58. def working?
  59. event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
  60. end
  61. def default_options
  62. {
  63. 'expected_update_period_in_days' => "2",
  64. 'url' => "http://xkcd.com",
  65. 'type' => "html",
  66. 'mode' => "on_change",
  67. 'extract' => {
  68. 'url' => { 'css' => "#comic img", 'value' => "@src" },
  69. 'title' => { 'css' => "#comic img", 'value' => "@alt" },
  70. 'hovertext' => { 'css' => "#comic img", 'value' => "@title" }
  71. }
  72. }
  73. end
  74. def validate_options
  75. # Check for required fields
  76. errors.add(:base, "url and expected_update_period_in_days are required") unless options['expected_update_period_in_days'].present? && options['url'].present?
  77. if !options['extract'].present? && extraction_type != "json"
  78. errors.add(:base, "extract is required for all types except json")
  79. end
  80. # Check for optional fields
  81. if options['mode'].present?
  82. errors.add(:base, "mode must be set to on_change or all") unless %w[on_change all].include?(options['mode'])
  83. end
  84. if options['expected_update_period_in_days'].present?
  85. errors.add(:base, "Invalid expected_update_period_in_days format") unless is_positive_integer?(options['expected_update_period_in_days'])
  86. end
  87. if options['uniqueness_look_back'].present?
  88. errors.add(:base, "Invalid uniqueness_look_back format") unless is_positive_integer?(options['uniqueness_look_back'])
  89. end
  90. if (encoding = options['force_encoding']).present?
  91. case encoding
  92. when String
  93. begin
  94. Encoding.find(encoding)
  95. rescue ArgumentError
  96. errors.add(:base, "Unknown encoding: #{encoding.inspect}")
  97. end
  98. else
  99. errors.add(:base, "force_encoding must be a string")
  100. end
  101. end
  102. validate_web_request_options!
  103. end
  104. def check
  105. check_url interpolated['url']
  106. end
  107. def check_url(in_url)
  108. return unless in_url.present?
  109. Array(in_url).each do |url|
  110. log "Fetching #{url}"
  111. response = faraday.get(url)
  112. if response.success?
  113. body = response.body
  114. if (encoding = interpolated['force_encoding']).present?
  115. body = body.encode(Encoding::UTF_8, encoding)
  116. end
  117. doc = parse(body)
  118. if extract_full_json?
  119. if store_payload!(previous_payloads(1), doc)
  120. log "Storing new result for '#{name}': #{doc.inspect}"
  121. create_event :payload => doc
  122. end
  123. next
  124. end
  125. output =
  126. case extraction_type
  127. when 'json'
  128. extract_json(doc)
  129. when 'text'
  130. extract_text(doc)
  131. else
  132. extract_xml(doc)
  133. end
  134. num_unique_lengths = interpolated['extract'].keys.map { |name| output[name].length }.uniq
  135. if num_unique_lengths.length != 1
  136. raise "Got an uneven number of matches for #{interpolated['name']}: #{interpolated['extract'].inspect}"
  137. end
  138. old_events = previous_payloads num_unique_lengths.first
  139. num_unique_lengths.first.times do |index|
  140. result = {}
  141. interpolated['extract'].keys.each do |name|
  142. result[name] = output[name][index]
  143. if name.to_s == 'url'
  144. result[name] = (response.env[:url] + result[name]).to_s
  145. end
  146. end
  147. if store_payload!(old_events, result)
  148. log "Storing new parsed result for '#{name}': #{result.inspect}"
  149. create_event :payload => result
  150. end
  151. end
  152. else
  153. raise "Failed: #{response.inspect}"
  154. end
  155. end
  156. rescue => e
  157. error e.message
  158. end
  159. def receive(incoming_events)
  160. incoming_events.each do |event|
  161. Thread.current[:current_event] = event
  162. url_to_scrape = event.payload['url']
  163. check_url(url_to_scrape) if url_to_scrape =~ /^https?:\/\//i
  164. end
  165. ensure
  166. Thread.current[:current_event] = nil
  167. end
  168. def interpolated(event = Thread.current[:current_event])
  169. super
  170. end
  171. private
  172. # This method returns true if the result should be stored as a new event.
  173. # If mode is set to 'on_change', this method may return false and update an existing
  174. # event to expire further in the future.
  175. def store_payload!(old_events, result)
  176. case interpolated['mode'].presence
  177. when 'on_change'
  178. result_json = result.to_json
  179. old_events.each do |old_event|
  180. if old_event.payload.to_json == result_json
  181. old_event.expires_at = new_event_expiration_date
  182. old_event.save!
  183. return false
  184. end
  185. end
  186. true
  187. when 'all', ''
  188. true
  189. else
  190. raise "Illegal options[mode]: #{interpolated['mode']}"
  191. end
  192. end
  193. def previous_payloads(num_events)
  194. if interpolated['uniqueness_look_back'].present?
  195. look_back = interpolated['uniqueness_look_back'].to_i
  196. else
  197. # Larger of UNIQUENESS_FACTOR * num_events and UNIQUENESS_LOOK_BACK
  198. look_back = UNIQUENESS_FACTOR * num_events
  199. if look_back < UNIQUENESS_LOOK_BACK
  200. look_back = UNIQUENESS_LOOK_BACK
  201. end
  202. end
  203. events.order("id desc").limit(look_back) if interpolated['mode'] == "on_change"
  204. end
  205. def extract_full_json?
  206. !interpolated['extract'].present? && extraction_type == "json"
  207. end
  208. def extraction_type
  209. (interpolated['type'] || begin
  210. case interpolated['url']
  211. when /\.(rss|xml)$/i
  212. "xml"
  213. when /\.json$/i
  214. "json"
  215. when /\.(txt|text)$/i
  216. "text"
  217. else
  218. "html"
  219. end
  220. end).to_s
  221. end
  222. def extract_each(doc, &block)
  223. interpolated['extract'].each_with_object({}) { |(name, extraction_details), output|
  224. output[name] = block.call(extraction_details)
  225. }
  226. end
  227. def extract_json(doc)
  228. extract_each(doc) { |extraction_details|
  229. result = Utils.values_at(doc, extraction_details['path'])
  230. log "Extracting #{extraction_type} at #{extraction_details['path']}: #{result}"
  231. result
  232. }
  233. end
  234. def extract_text(doc)
  235. extract_each(doc) { |extraction_details|
  236. regexp = Regexp.new(extraction_details['regexp'])
  237. result = []
  238. doc.scan(regexp) {
  239. result << Regexp.last_match[extraction_details['index']]
  240. }
  241. log "Extracting #{extraction_type} at #{regexp}: #{result}"
  242. result
  243. }
  244. end
  245. def extract_xml(doc)
  246. extract_each(doc) { |extraction_details|
  247. case
  248. when css = extraction_details['css']
  249. nodes = doc.css(css)
  250. when xpath = extraction_details['xpath']
  251. doc.remove_namespaces! # ignore xmlns, useful when parsing atom feeds
  252. nodes = doc.xpath(xpath)
  253. else
  254. raise '"css" or "xpath" is required for HTML or XML extraction'
  255. end
  256. case nodes
  257. when Nokogiri::XML::NodeSet
  258. result = nodes.map { |node|
  259. case value = node.xpath(extraction_details['value'])
  260. when Float
  261. # Node#xpath() returns any numeric value as float;
  262. # convert it to integer as appropriate.
  263. value = value.to_i if value.to_i == value
  264. end
  265. value.to_s
  266. }
  267. else
  268. raise "The result of HTML/XML extraction was not a NodeSet"
  269. end
  270. log "Extracting #{extraction_type} at #{xpath || css}: #{result}"
  271. result
  272. }
  273. end
  274. def parse(data)
  275. case extraction_type
  276. when "xml"
  277. Nokogiri::XML(data)
  278. when "json"
  279. JSON.parse(data)
  280. when "html"
  281. Nokogiri::HTML(data)
  282. when "text"
  283. data
  284. else
  285. raise "Unknown extraction type #{extraction_type}"
  286. end
  287. end
  288. def is_positive_integer?(value)
  289. Integer(value) >= 0
  290. rescue
  291. false
  292. end
  293. end
  294. end