# twitter_stream.rb 4.1KB

  require 'cgi'
  require 'json'
  require 'em-http-request'
  require 'pp'
  # Streams statuses from Twitter through Twitter::JSONStream inside an
  # EventMachine reactor and hands each matching status to the agents whose
  # filters it satisfies (via +agent.process_tweet+).
  #
  # NOTE(review): this file was recovered from a listing in which several
  # tokens and two whole lines were lost. Every reconstructed spot is marked
  # with a NOTE(review) comment below and should be confirmed against the
  # upstream source.
  class TwitterStream
    # Seconds between forced restarts of the reactor, so the agent list is
    # re-read. Written as plain arithmetic (same numeric value as the previous
    # `10.minutes`) so the class loads without ActiveSupport's Integer#minutes.
    RELOAD_TIMEOUT = 10 * 60

    # Maximum number of recent tweet ids remembered for duplicate detection.
    # NOTE(review): the defining line was missing from the recovered file while
    # the constant is still referenced; 1000 is an assumed value - confirm.
    DUPLICATE_DETECTION_LENGTH = 1000

    # Splits filters and tweet text into comparable words.
    SEPARATOR = /[^\w_\-]+/

    def initialize
      @running = true
    end

    # Asks #run to leave its loop; the 1-second timer installed by #run
    # notices the flag and stops the reactor.
    def stop
      @running = false
    end

    # Opens one streaming connection and yields every usable status to +block+.
    #
    # @param filters [Array<String>, nil] track terms; when nil or empty the
    #   "sample" endpoint is used instead of "filter"
    # @param agent [#twitter_consumer_key, #twitter_consumer_secret,
    #   #twitter_oauth_token, #twitter_oauth_token_secret] credential source
    # @yieldparam status [Hash] parsed status whose "text" has had &lt;/&gt;
    #   unescaped and tabs/newlines replaced by spaces
    def stream!(filters, agent, &block)
      tracking = filters && !filters.empty?
      endpoint = tracking ? 'filter' : 'sample'
      # The receiver and method of this block were lost in the recovered file;
      # `filters.map` is the only reading consistent with the block and `.join`.
      query = tracking ? "?track=#{filters.map { |f| CGI.escape(f) }.join(",")}" : ''

      stream = Twitter::JSONStream.connect(
        :path => "/1/statuses/#{endpoint}.json#{query}",
        :ssl => true,
        :oauth => {
          :consumer_key => agent.twitter_consumer_key,
          :consumer_secret => agent.twitter_consumer_secret,
          :access_key => agent.twitter_oauth_token,
          :access_secret => agent.twitter_oauth_token_secret
        }
      )

      stream.each_item do |status|
        status = JSON.parse(status) if status.is_a?(String)
        next unless status
        next if status.has_key?('delete')
        next unless status['text']
        status['text'] = status['text'].gsub(/&lt;/, "<").gsub(/&gt;/, ">").gsub(/[\t\n\r]/, ' ')
        # NOTE(review): this call was missing from the recovered file; without
        # it the &block parameter is never used and no status reaches callers.
        block.call(status)
      end

      stream.on_error do |message|
        STDERR.puts " --> Twitter error: #{message} <--"
      end

      stream.on_no_data do |message|
        STDERR.puts " --> Got no data for awhile; trying to reconnect."
        EventMachine::stop_event_loop
      end

      stream.on_max_reconnects do |timeout, retries|
        STDERR.puts " --> Oops, tried too many times! <--"
        EventMachine::stop_event_loop
      end
    end

    # Groups +agents+ by OAuth token, opens one stream per group using the
    # first agent's credentials, and dispatches each new, non-retweet status
    # to every agent that has a filter whose words all occur in the text.
    #
    # @param agents [Enumerable] objects responding to +twitter_oauth_token+,
    #   +options+ and +process_tweet(filter, status)+
    def load_and_run(agents)
      agents.group_by { |agent| agent.twitter_oauth_token }.each do |_oauth_token, token_agents|
        filter_to_agent_map = {}
        token_agents.each do |agent|
          filters_for(agent).each do |filter|
            (filter_to_agent_map[filter] ||= []) << agent
          end
        end

        recent_tweets = []

        stream!(filter_to_agent_map.keys, token_agents.first) do |status|
          if retweet?(status)
            puts "Skipping retweet: #{status["text"]}"
          elsif recent_tweets.include?(status["id_str"])
            puts "Skipping duplicate tweet: #{status["text"]}"
          else
            recent_tweets << status["id_str"]
            recent_tweets.shift if recent_tweets.length > DUPLICATE_DETECTION_LENGTH
            puts status["text"]

            # Split the tweet once instead of once per filter.
            tweet_words = status["text"].downcase.split(SEPARATOR)
            filter_to_agent_map.each do |filter, matching_agents|
              next unless filter_matches?(filter, tweet_words)

              matching_agents.each do |agent|
                # NOTE(review): the interpolated expression was empty in the
                # recovered file; agent.name is assumed - confirm.
                puts " -> #{agent.name}"
                agent.process_tweet(filter, status)
              end
            end
          end
        end
      end
    end

    # Main loop: until #stop is called, load the agents, run a reactor that
    # streams for them, and restart it every RELOAD_TIMEOUT seconds. Returns
    # immediately (after printing a notice) when the Twitter dependencies are
    # reported missing.
    def run
      if Agents::TwitterStreamAgent.dependencies_missing?
        STDERR.puts Agents::TwitterStreamAgent.twitter_dependencies_missing
        STDERR.flush
        return
      end

      # Loaded lazily, only once the dependency check above has passed.
      require 'twitter/json_stream'

      while @running
        begin
          # NOTE(review): the right-hand side of this assignment was lost in
          # the recovered file; loading every TwitterStreamAgent is assumed -
          # confirm whether a narrower scope was intended.
          agents = Agents::TwitterStreamAgent.all

          EventMachine::run do
            EventMachine.add_periodic_timer(1) {
              EventMachine::stop_event_loop unless @running
            }

            EventMachine.add_periodic_timer(RELOAD_TIMEOUT) {
              puts "Reloading EventMachine and all Agents..."
              EventMachine::stop_event_loop
            }

            if agents.length == 0
              puts "No agents found. Will look again in a minute."
              EventMachine.add_timer(60) {
                EventMachine::stop_event_loop
              }
            else
              puts "Found #{agents.length} agent(s). Loading them now..."
              load_and_run agents
            end
          end
        rescue SignalException, SystemExit
          @running = false
          EventMachine::stop_event_loop if EventMachine.reactor_running?
        rescue StandardError => e
          STDERR.puts "\nException #{e.message}:\n#{e.backtrace.join("\n")}\n\n"
          STDERR.puts "Waiting for a couple of minutes..."
          sleep 120
        end
      end
    end

    private

    # Track terms configured on +agent+, flattened, stripped and de-duplicated.
    # NOTE(review): the original normalisation chain was lost in the recovered
    # file; this assumes options[:filters] holds strings (possibly nested in
    # arrays) - confirm.
    def filters_for(agent)
      Array(agent.options[:filters]).flatten.compact.map(&:strip).uniq
    end

    # True when the status carries a non-empty "retweeted_status" hash.
    def retweet?(status)
      retweeted = status["retweeted_status"]
      retweeted.is_a?(Hash) && !retweeted.empty?
    end

    # True when every word of +filter+ appears in +tweet_words+ (which the
    # caller has already downcased and split on SEPARATOR).
    def filter_matches?(filter, tweet_words)
      (filter.downcase.split(SEPARATOR) - tweet_words).reject(&:empty?).empty?
    end
  end