# twitter_stream.rb (3.6 KB)

  #!/usr/bin/env ruby

  # This script needs the Rails environment: it reads Rails models
  # (Agents::TwitterStreamAgent) and uses ActiveSupport helpers. When it is
  # started with plain `ruby`, explain how to launch it and exit with status 1.
  unless defined?(Rails)
    puts "",
         "Please run me with rails runner, for example:",
         " RAILS_ENV=production bundle exec rails runner bin/twitter_stream.rb",
         ""
    exit 1
  end

  # Standard library first in the original load order, then the streaming gems.
  require 'cgi'
  require 'json'
  require 'twitter/json_stream'
  require 'em-http-request'
  require 'pp'
  # Connects to the Twitter streaming API and yields every usable status.
  #
  # username, password - account credentials, sent as "username:password" auth.
  # filters            - Array of track terms. When nil or empty, the public
  #                      sample stream is used instead of the filter stream.
  # block              - called with each status Hash that carries a 'text'.
  #
  # Before yielding, the &lt; / &gt; / &amp; entities in the text are decoded
  # and tabs/newlines are replaced with spaces. Delete notices, items without
  # text, and items that are not JSON objects are skipped. When the stream goes
  # quiet or runs out of reconnect attempts, the EventMachine loop is stopped so
  # the caller can start over.
  def stream!(username, password, filters, &block)
    path =
      if filters && !filters.empty?
        track = filters.map { |f| CGI.escape(f) }.join(",")
        "/1/statuses/filter.json?track=#{track}"
      else
        "/1/statuses/sample.json"
      end

    stream = Twitter::JSONStream.connect(
      :path => path,
      :auth => "#{username}:#{password}",
      :ssl => true
    )

    stream.each_item do |item|
      begin
        status = item.is_a?(String) ? JSON.parse(item) : item
      rescue JSON::ParserError => e
        # One malformed item must not propagate out of the callback and stop the reactor.
        STDERR.puts " --> Skipping unparseable stream item: #{e.message} <--"
        next
      end

      # Only JSON objects can be tweets; also drop delete notices and text-less items.
      next unless status.is_a?(Hash)
      next if status.key?('delete')
      next unless status['text']

      # &amp; is decoded last so an escaped literal such as "&amp;lt;" ends up
      # as the text "&lt;" rather than "<".
      text = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">")
      status['text'] = text.gsub(/&amp;/, "&").tr("\t\n\r", " ")
      block.call(status)
    end

    stream.on_error do |message|
      STDERR.puts " --> Twitter error: #{message} <--"
    end

    stream.on_no_data do
      STDERR.puts " --> Got no data for awhile; trying to reconnect."
      EventMachine.stop_event_loop
    end

    stream.on_max_reconnects do |_timeout, _retries|
      STDERR.puts " --> Oops, tried too many times! <--"
      EventMachine.stop_event_loop
    end
  end
  # Opens one stream per Twitter account and routes each incoming tweet to every
  # agent whose filter matches it.
  #
  # agents - collection of agent objects responding to #options (a Hash with
  #          :twitter_username, :twitter_password and :filters), #name and
  #          #process_tweet(filter, status).
  #
  # A filter matches when every word in it (lower-cased, split on SEPARATOR)
  # also appears among the tweet's words. Retweets are skipped, and so are
  # tweets whose id_str is among the last DUPLICATE_DETECTION_LENGTH ids seen
  # on the same stream.
  def load_and_run(agents)
    agents.group_by { |agent| agent.options[:twitter_username] }.each do |username, group|
      filter_to_agent_map = Hash.new { |map, key| map[key] = [] }
      group.each do |agent|
        # Strip before de-duplicating so "foo" and "foo " register the agent only
        # once, and key the map by the stripped filter so the tracked terms and
        # the lookup keys always agree. Missing/nil/blank filters are ignored.
        Array(agent.options[:filters]).compact.map(&:strip).reject(&:empty?).uniq.each do |filter|
          filter_to_agent_map[filter] << agent
        end
      end

      # Each filter's word list is fixed for the life of the stream; compute it once.
      filter_words = filter_to_agent_map.keys.each_with_object({}) do |filter, words|
        words[filter] = filter.downcase.split(SEPARATOR)
      end

      password = group.first.options[:twitter_password]
      recent_tweets = []

      stream!(username, password, filter_to_agent_map.keys) do |status|
        retweet = status["retweeted_status"]
        if retweet.present? && retweet.is_a?(Hash)
          puts "Skipping retweet: #{status["text"]}"
        elsif recent_tweets.include?(status["id_str"])
          puts "Skipping duplicate tweet: #{status["text"]}"
        else
          recent_tweets << status["id_str"]
          recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
          puts status["text"]

          text_words = status["text"].downcase.split(SEPARATOR)
          filter_to_agent_map.each do |filter, filter_agents|
            # Every word of the filter must occur somewhere in the tweet.
            next unless (filter_words[filter] - text_words).empty?
            filter_agents.each do |agent|
              puts " -> #{agent.name}"
              agent.process_tweet(filter, status)
            end
          end
        end
      end
    end
  end
  # Period after which the reactor is stopped so the main loop reloads the agent list.
  RELOAD_TIMEOUT = 10.minutes

  # How many recent tweet ids each stream remembers to suppress duplicates.
  DUPLICATE_DETECTION_LENGTH = 1_000

  # Word separator for filter matching: any run of characters other than word
  # characters and hyphens (\w already includes the underscore).
  SEPARATOR = /[^\w\-]+/
  # Supervisor loop. Each pass loads every TwitterStreamAgent and runs their
  # streams inside an EventMachine reactor until something stops the reactor:
  # the RELOAD_TIMEOUT timer, the stream's no-data / max-reconnect handlers, or
  # the one-minute retry when there are no agents. It then pauses briefly and
  # starts over. Unexpected errors are logged and retried after two minutes;
  # signals and exit requests stop the reactor and end the process.
  loop do
    begin
      agents = Agents::TwitterStreamAgent.all
      agent_count = agents.length

      EventMachine.run do
        EventMachine.add_periodic_timer(RELOAD_TIMEOUT) do
          puts "Reloading EventMachine and all Agents..."
          EventMachine.stop_event_loop
        end

        if agent_count.zero?
          puts "No agents found. Will look again in a minute."
          # Use a reactor timer instead of `sleep`, which would block the reactor thread.
          EventMachine.add_timer(60) { EventMachine.stop_event_loop }
        else
          puts "Found #{agent_count} agent(s). Loading them now..."
          load_and_run agents
        end
      end

      print "Pausing..."
      STDOUT.flush
      sleep 5
      puts "done."
    rescue SignalException, SystemExit
      EventMachine.stop_event_loop if EventMachine.reactor_running?
      exit
    rescue StandardError => e
      STDERR.puts "\nException #{e.message}:\n#{Array(e.backtrace).join("\n")}\n\n"
      STDERR.puts "Waiting for a couple of minutes..."
      sleep 120
    end
  end