# twitter_stream_agent.rb (4.3KB)

    module Agents
      # Follows the Twitter stream for the configured `filters` and either
      # emits one Event per matching tweet (`generate: "events"`) or keeps
      # per-filter counts in memory that are flushed as summary Events each
      # time the Agent is scheduled (`generate: "counts"`).
      #
      # NOTE(review): the streaming connection is not opened in this class;
      # presumably an external process calls #process_tweet for each tweet —
      # confirm against the caller.
      class TwitterStreamAgent < Agent
        include TwitterConcern

        cannot_receive_events!

        description <<-MD
          The TwitterStreamAgent follows the Twitter stream in real time, watching for certain keywords, or filters, that you provide.

          You must provide an oAuth `consumer_key`, `consumer_secret`, `oauth_token`, and `oauth_token_secret`, as well as an array of `filters`.  Multiple words in a filter must all show up in a tweet, but are independent of order.
          If you provide an array instead of a filter, the first entry will be considered primary and any additional values will be treated as aliases.

          To get oAuth credentials for Twitter, [follow these instructions](https://github.com/cantino/huginn/wiki/Getting-a-twitter-oauth-token).

          Set `expected_update_period_in_days` to the maximum amount of time that you'd expect to pass between Events being created by this Agent.

          `generate` should be either `events` or `counts`.  If set to `counts`, it will output event summaries whenever the Agent is scheduled.
        MD

        event_description <<-MD
          When in `counts` mode, TwitterStreamAgent events look like:

              {
                "filter": "hello world",
                "count": 25,
                "time": 3456785456
              }

          When in `events` mode, TwitterStreamAgent events look like:

              {
                "filter": "selectorgadget",
                  ... every Tweet field, including ...
                "text": "something",
                "user": {
                  "name": "Mr. Someone",
                  "screen_name": "Someone",
                  "location": "Vancouver BC Canada",
                  "description": "...",
                  "followers_count": 486,
                  "friends_count": 1983,
                  "created_at": "Mon Aug 29 23:38:14 +0000 2011",
                  "time_zone": "Pacific Time (US & Canada)",
                  "statuses_count": 3807,
                  "lang": "en"
                },
                "retweet_count": 0,
                "entities": ...
                "lang": "en"
              }
        MD

        default_schedule "11pm"

        # Requires `filters`, `expected_update_period_in_days` and `generate`
        # to be present; adds a single error to :base otherwise.
        def validate_options
          unless options[:filters].present? &&
                 options[:expected_update_period_in_days].present? &&
                 options[:generate].present?
            errors.add(:base, "expected_update_period_in_days, generate, and filters are required fields")
          end
        end

        # The Agent is considered healthy when it created an Event within the
        # configured period and has no recent error logs.
        def working?
          event_created_within(options[:expected_update_period_in_days]) && !recent_error_logs?
        end

        # Option template shown for a new Agent; credentials are placeholders.
        def default_options
          {
            :consumer_key => "---",
            :consumer_secret => "---",
            :oauth_token => "---",
            :oauth_token_secret => "---",
            :filters => %w[keyword1 keyword2],
            :expected_update_period_in_days => "2",
            :generate => "events"
          }
        end

        # Handles one tweet that matched +filter+.
        #
        # +filter+ is first resolved to its primary name via #lookup_filter.
        # Tweets for filters that are not configured are ignored.
        # In `counts` mode the primary filter's counter is incremented in the
        # Agent's persisted memory. Otherwise an Event is created whose payload
        # is +status+ with a `filter` key added.
        #
        # @param filter [String] the filter text the tweet matched
        # @param status [Hash] the tweet fields (must respond to #merge)
        def process_tweet(filter, status)
          filter = lookup_filter(filter)
          return unless filter

          if options[:generate] == "counts"
            # Avoid memory pollution by reloading the Agent: the counter is
            # updated on a freshly loaded record rather than on this instance's
            # in-memory copy, and saved immediately.
            agent = Agent.find(id)
            agent.memory[:filter_counts] ||= {}
            agent.memory[:filter_counts][filter.to_sym] ||= 0
            agent.memory[:filter_counts][filter.to_sym] += 1
            remove_unused_keys!(agent, :filter_counts)
            agent.save!
          else
            create_event :payload => status.merge(:filter => filter.to_s)
          end
        end

        # Scheduled entry point.
        #
        # In `counts` mode, emits one summary Event per filter with its count
        # and the current Unix time. The accumulated counts are then cleared
        # (the reset runs in every mode).
        def check
          if options[:generate] == "counts" && memory[:filter_counts].present?
            memory[:filter_counts].each do |filter, count|
              create_event :payload => { :filter => filter.to_s, :count => count, :time => Time.now.to_i }
            end
          end
          memory[:filter_counts] = {}
        end

        protected

        # Maps an incoming filter string to the configured filter it belongs to.
        #
        # A plain configured filter that equals +filter+ is returned as-is.
        # For an Array entry (primary plus aliases), a match on any element
        # returns the primary (first) element.
        #
        # @return [String, nil] the primary filter, or nil when +filter+ is not
        #   configured. The explicit nil matters: falling off the end of #each
        #   would return the (truthy) filters array itself.
        def lookup_filter(filter)
          options[:filters].each do |known_filter|
            return filter if known_filter == filter
            return known_filter.first if known_filter.is_a?(Array) && known_filter.include?(filter)
          end
          nil
        end

        # Deletes entries from +agent.memory[base]+ whose keys no longer
        # correspond to a configured primary filter, e.g. after a filter is
        # removed from the options.
        #
        # Keys are compared as strings, so the check works whether the memory
        # hash comes back with Symbol or String keys.
        def remove_unused_keys!(agent, base)
          counts = agent.memory[base]
          return unless counts

          known = agent.options[:filters].map { |f| (f.is_a?(Array) ? f.first : f).to_s }
          # #keys returns a copy, so deleting inside the loop is safe.
          counts.keys.each do |key|
            counts.delete(key) unless known.include?(key.to_s)
          end
        end
      end
    end