twitter_stream_agent.rb 4.4KB

    module Agents
      # Agent that matches tweets against the configured `filters` and, depending
      # on the `generate` option, either emits one event per matching tweet
      # ("events") or accumulates per-filter counts in memory that are flushed as
      # summary events whenever `check` runs ("counts").
      class TwitterStreamAgent < Agent
        include TwitterConcern

        cannot_receive_events!

        description <<-MD
          The TwitterStreamAgent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.

          To follow the Twitter stream, provide an array of `filters`.  Multiple words in a filter must all show up in a tweet, but are independent of order.
          If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.

          Twitter credentials must be supplied as either [credentials](/user_credentials) called
          `twitter_consumer_key`, `twitter_consumer_secret`, `twitter_oauth_token`, and `twitter_oauth_token_secret`,
          or as options to this Agent called `consumer_key`, `consumer_secret`, `oauth_token`, and `oauth_token_secret`.

          To get oAuth credentials for Twitter, [follow these instructions](https://github.com/cantino/huginn/wiki/Getting-a-twitter-oauth-token).

          Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.

          `generate` should be either `events` or `counts`.  If set to `counts`, it will output event summaries whenever the Agent is scheduled.
        MD

        event_description <<-MD
          When in `counts` mode, TwitterStreamAgent events look like:

              {
                "filter": "hello world",
                "count": 25,
                "time": 3456785456
              }

          When in `events` mode, TwitterStreamAgent events look like:

              {
                "filter": "selectorgadget",
                 ... every Tweet field, including ...
                "text": "something",
                "user": {
                  "name": "Mr. Someone",
                  "screen_name": "Someone",
                  "location": "Vancouver BC Canada",
                  "description":  "...",
                  "followers_count": 486,
                  "friends_count": 1983,
                  "created_at": "Mon Aug 29 23:38:14 +0000 2011",
                  "time_zone": "Pacific Time (US & Canada)",
                  "statuses_count": 3807,
                  "lang": "en"
                },
                "retweet_count": 0,
                "entities": ...
                "lang": "en"
              }
        MD

        default_schedule "11pm"

        # Adds a base error unless `filters`, `expected_update_period_in_days`
        # and `generate` are all present in the options.
        def validate_options
          required_keys = %w[filters expected_update_period_in_days generate]
          unless required_keys.all? { |key| options[key].present? }
            errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
          end
        end

        # True when an event was created within the configured period and there
        # are no recent error logs (both helpers are defined outside this file).
        def working?
          event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
        end

        # Option values used for a newly created agent.
        def default_options
          {
            'filters' => %w[keyword1 keyword2],
            'expected_update_period_in_days' => "2",
            'generate' => "events"
          }
        end

        # Handles one tweet that matched `filter`.
        #
        # filter - the matched filter text; resolved to its primary name through
        #          lookup_filter. Tweets whose filter is not configured are ignored.
        # status - the tweet data; `merge` is called on it, so presumably a Hash
        #          (TODO confirm against the caller).
        #
        # In "counts" mode the per-filter counter stored in the agent's memory is
        # incremented and saved; otherwise an event carrying the tweet plus its
        # `filter` is created.
        def process_tweet(filter, status)
          filter = lookup_filter(filter)
          return unless filter

          if interpolated['generate'] == "counts"
            # Avoid memory pollution by reloading the Agent.
            agent = Agent.find(id)
            agent.memory['filter_counts'] ||= {}
            agent.memory['filter_counts'][filter] ||= 0
            agent.memory['filter_counts'][filter] += 1
            remove_unused_keys!(agent, 'filter_counts')
            agent.save!
          else
            create_event :payload => status.merge('filter' => filter)
          end
        end

        # In "counts" mode, emits one summary event per filter with a recorded
        # count. The stored counts are reset afterwards regardless of mode.
        def check
          if interpolated['generate'] == "counts" && memory['filter_counts'].present?
            memory['filter_counts'].each do |filter, count|
              create_event :payload => { 'filter' => filter, 'count' => count, 'time' => Time.now.to_i }
            end
          end
          memory['filter_counts'] = {}
        end

        protected

        # Resolves `filter` against the configured filters.
        #
        # Returns `filter` itself when it equals a configured entry, the first
        # element of a configured Array entry when `filter` is one of its members
        # (alias -> primary name), or nil when nothing matches.
        def lookup_filter(filter)
          interpolated['filters'].each do |known_filter|
            return filter if known_filter == filter
            return known_filter.first if known_filter.is_a?(Array) && known_filter.include?(filter)
          end

          # Without this explicit nil the method would return the value of
          # Array#each (the whole filters array, which is truthy), so callers
          # could never detect an unknown filter.
          nil
        end

        # Deletes entries of agent.memory[base] whose key is not the (stringified)
        # primary name of any currently configured filter.
        def remove_unused_keys!(agent, base)
          return unless agent.memory[base]

          primary_names = agent.interpolated['filters'].map do |known_filter|
            known_filter.is_a?(Array) ? known_filter.first.to_s : known_filter.to_s
          end

          (agent.memory[base].keys - primary_names).each do |removed_key|
            agent.memory[base].delete(removed_key)
          end
        end
      end
    end