# twitter_stream.rb (4.2 KB)
#!/usr/bin/env ruby
# This process is used by TwitterStreamAgents to watch the Twitter stream in real time. It periodically reloads
# all TwitterStreamAgents, so new or changed agents are picked up, and follows the stream on their behalf. It is
# typically run by foreman via the included Procfile.
  5. unless defined?(Rails)
  6. puts
  7. puts "Please run me with rails runner, for example:"
  8. puts " RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb"
  9. puts
  10. exit 1
  11. end
  12. require 'cgi'
  13. require 'json'
  14. require 'twitter/json_stream'
  15. require 'em-http-request'
  16. require 'pp'
  17. def stream!(filters, options = {}, &block)
  18. stream = Twitter::JSONStream.connect(
  19. :path => "/1/statuses/#{(filters && filters.length > 0) ? 'filter' : 'sample'}.json#{"?track=#{filters.map {|f| CGI::escape(f) }.join(",")}" if filters && filters.length > 0}",
  20. :oauth => {
  21. :consumer_key => options[:consumer_key],
  22. :consumer_secret => options[:consumer_secret],
  23. :access_key => options[:oauth_token] || options[:access_key],
  24. :access_secret => options[:oauth_token_secret] || options[:access_secret]
  25. },
  26. :ssl => true
  27. )
  28. stream.each_item do |status|
  29. status = JSON.parse(status) if status.is_a?(String)
  30. next unless status
  31. next if status.has_key?('delete')
  32. next unless status['text']
  33. status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ')
  34. block.call(status)
  35. end
  36. stream.on_error do |message|
  37. STDERR.puts " --> Twitter error: #{message} <--"
  38. end
  39. stream.on_no_data do |message|
  40. STDERR.puts " --> Got no data for awhile; trying to reconnect."
  41. EventMachine::stop_event_loop
  42. end
  43. stream.on_max_reconnects do |timeout, retries|
  44. STDERR.puts " --> Oops, tried too many times! <--"
  45. EventMachine::stop_event_loop
  46. end
  47. end
  48. def load_and_run(agents)
  49. agents.group_by { |agent| agent.options[:twitter_username] }.each do |twitter_username, agents|
  50. filter_to_agent_map = agents.map { |agent| agent.options[:filters] }.flatten.uniq.compact.map(&:strip).inject({}) { |m, f| m[f] = []; m }
  51. agents.each do |agent|
  52. agent.options[:filters].flatten.uniq.compact.map(&:strip).each do |filter|
  53. filter_to_agent_map[filter] << agent
  54. end
  55. end
  56. options = agents.first.options.slice(:consumer_key, :consumer_secret, :access_key, :oauth_token, :access_secret, :oauth_token_secret)
  57. recent_tweets = []
  58. stream!(filter_to_agent_map.keys, options) do |status|
  59. if status["retweeted_status"].present? && status["retweeted_status"].is_a?(Hash)
  60. puts "Skipping retweet: #{status["text"]}"
  61. elsif recent_tweets.include?(status["id_str"])
  62. puts "Skipping duplicate tweet: #{status["text"]}"
  63. else
  64. recent_tweets << status["id_str"]
  65. recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
  66. puts status["text"]
  67. filter_to_agent_map.keys.each do |filter|
  68. if (filter.downcase.split(SEPARATOR) - status["text"].downcase.split(SEPARATOR)).reject(&:empty?) == [] # Hacky McHackerson
  69. filter_to_agent_map[filter].each do |agent|
  70. puts " -> #{agent.name}"
  71. agent.process_tweet(filter, status)
  72. end
  73. end
  74. end
  75. end
  76. end
  77. end
  78. end
  79. RELOAD_TIMEOUT = 10.minutes
  80. DUPLICATE_DETECTION_LENGTH = 1000
  81. SEPARATOR = /[^\w_\-]+/
  82. while true
  83. begin
  84. agents = Agents::TwitterStreamAgent.all
  85. EventMachine::run do
  86. EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
  87. puts "Reloading EventMachine and all Agents..."
  88. EventMachine::stop_event_loop
  89. }
  90. if agents.length == 0
  91. puts "No agents found. Will look again in a minute."
  92. sleep 60
  93. EventMachine::stop_event_loop
  94. else
  95. puts "Found #{agents.length} agent(s). Loading them now..."
  96. load_and_run agents
  97. end
  98. end
  99. print "Pausing..."; STDOUT.flush
  100. sleep 5
  101. puts "done."
  102. rescue SignalException, SystemExit
  103. EventMachine::stop_event_loop if EventMachine.reactor_running?
  104. exit
  105. rescue StandardError => e
  106. STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
  107. STDERR.puts "Waiting for a couple of minutes..."
  108. sleep 120
  109. end
  110. end