# peak_detector_agent.rb (4.6KB)
  require 'pp'

  module Agents
    # Agent that watches the events it receives for peaks in a numeric value.
    #
    # Each received event contributes one sample (the value found at
    # `value_path`) to a per-group window kept in `memory`. A peak is reported
    # when the second-newest sample is higher than the newest one AND exceeds
    # mean + std_multiple * standard deviation of the earlier samples.
    #
    # Memory layout used by this class:
    #   memory[:data][group]  => array of [value, unix_time] pairs
    #   memory[:peaks][group] => array of unix times at which peaks were reported
    class PeakDetectorAgent < Agent
      cannot_be_scheduled!

      description <<-MD
        Use a PeakDetectorAgent to watch for peaks in an event stream. When a peak is detected, the resulting Event will have a payload message of `message`. You can include extractions in the message, for example: `I saw a bar of: <foo.bar>`
        The `value_path` value is a [JSONPaths](http://goessner.net/articles/JsonPath/) to the value of interest. `group_by_path` is a hash path that will be used to group values, if present.
        Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.
        You may set `window_duration` to change the default memory window length of two weeks,
        `peak_spacing` to change the default minimum peak spacing of two days, and
        `std_multiple` to change the default standard deviation threshold multiple of 3.
      MD

      event_description <<-MD
        Events look like:
        {
        "message": "Your message",
        "peak": 6,
        "peak_time": 3456789242,
        "grouped_by": "something"
        }
      MD

      # Adds a base error unless all three mandatory options are present.
      def validate_options
        required = [:expected_receive_period_in_days, :message, :value_path]
        return if required.all? { |key| options[key].present? }

        errors.add(:base, "expected_receive_period_in_days, value_path, and message are required")
      end

      # Option values offered for a freshly created agent.
      def default_options
        {
          :expected_receive_period_in_days => "2",
          :group_by_path => "filter",
          :value_path => "count",
          :message => "A peak was found"
        }
      end

      # Truthy when an event has been received within the last
      # `expected_receive_period_in_days` days.
      def working?
        last_receive_at && last_receive_at > options[:expected_receive_period_in_days].to_i.days.ago
      end

      # Records every incoming event (oldest first) and checks whether it
      # completes a peak in its group.
      def receive(incoming_events)
        incoming_events.sort_by(&:created_at).each do |event|
          group = group_for(event)
          remember group, event
          check_for_peak group, event
        end
      end

      private

      # Emits a peak event for `group` when the second-newest sample is a peak.
      #
      # Nothing happens unless the group has more than 4 samples and the last
      # reported peak (if any) is older than `peak_spacing` seconds relative to
      # `event`. The two newest samples are excluded from the statistics so the
      # candidate peak does not inflate its own threshold.
      def check_for_peak(group, event)
        memory[:peaks] ||= {}
        memory[:peaks][group] ||= []
        peaks = memory[:peaks][group]
        samples = memory[:data][group]

        return unless samples.length > 4
        return unless peaks.empty? || peaks.last < event.created_at.to_i - peak_spacing

        average_value, standard_deviation = stats_for(group, :skip_last => 2)
        newest_value = samples[-1].first.to_f
        second_newest_value = samples[-2].first.to_f
        # Keep the timestamp an integer so `peak_time` matches the documented
        # event format instead of being emitted as a float.
        second_newest_time = samples[-2].last.to_i
        threshold = average_value + std_multiple * standard_deviation

        # pp({:newest_value => newest_value,
        #     :second_newest_value => second_newest_value,
        #     :average_value => average_value,
        #     :standard_deviation => standard_deviation,
        #     :threshold => threshold })

        return unless newest_value < second_newest_value && second_newest_value > threshold

        peaks << second_newest_time
        # Forget peaks that have fallen out of the memory window.
        peaks.reject! { |peak_time| peak_time <= second_newest_time - window_duration }
        create_event :payload => {:message => options[:message], :peak => second_newest_value, :peak_time => second_newest_time, :grouped_by => group.to_s}
      end

      # Returns [mean, population standard deviation] of the values stored for
      # `group`, ignoring the newest `opts[:skip_last]` samples (default 0).
      # With no samples left the result is [NaN, NaN].
      def stats_for(group, opts = {})
        values = memory[:data][group].map { |sample| sample.first.to_f }
        values = values[0...(values.length - (opts[:skip_last] || 0))]
        count = values.length.to_f

        mean = values.inject(0.0) { |total, value| total + value } / count
        variance = values.inject(0.0) { |total, value| total + (value - mean)**2 } / count

        [mean, Math.sqrt(variance)]
      end

      # Length of the memory window in seconds (default: two weeks).
      def window_duration
        (options[:window_duration].present? && options[:window_duration].to_i) || 2.weeks
      end

      # Standard deviation multiple used as the peak threshold (default: 3).
      # Parsed as a float so fractional multiples such as "2.5" are honoured.
      def std_multiple
        (options[:std_multiple].present? && options[:std_multiple].to_f) || 3
      end

      # Minimum spacing between reported peaks in seconds (default: two days).
      def peak_spacing
        (options[:peak_spacing].present? && options[:peak_spacing].to_i) || 2.days
      end

      # Group key for `event`: the payload value at `group_by_path`, or
      # :no_group when the option is blank or the lookup yields nothing.
      # The value goes through `to_s` first so numeric or boolean payload
      # values do not raise on `to_sym`.
      # NOTE(review): this symbolizes payload-derived data; confirm that is
      # acceptable for how `memory` is persisted.
      def group_for(event)
        path = options[:group_by_path]
        value = path.present? && Utils.value_at(event.payload, path)
        (value || 'no_group').to_s.to_sym
      end

      # Appends [value, unix_time] for `event` to the group's samples and then
      # trims samples that have left the window.
      def remember(group, event)
        memory[:data] ||= {}
        memory[:data][group] ||= []
        memory[:data][group] << [Utils.value_at(event.payload, options[:value_path]), event.created_at.to_i]
        cleanup group
      end

      # Drops samples of `group` that are `window_duration` or more seconds
      # older than the newest stored sample.
      def cleanup(group)
        newest_time = memory[:data][group].last.last
        memory[:data][group].reject! { |_value, time| time <= newest_time - window_duration }
      end
    end
  end