peak_detector_agent.rb 4.5KB

    require 'pp' module Agents class PeakDetectorAgent < Agent cannot_be_scheduled! description <<-MD Use a PeakDetectorAgent to watch for peaks in an event stream. When a peak is detected, the resulting Event will have a payload message of `message`. You can include extractions in the message, for example: `I saw a bar of: <foo.bar>` The `value_path` value is a [JSONPaths](http://goessner.net/articles/JsonPath/) to the value of interest. `group_by_path` is a hash path that will be used to group values, if present. Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent. You may set `window_duration_in_days` to change the default memory window length of `14` days, `min_peak_spacing_in_days` to change the default minimum peak spacing of `2` days (peaks closer together will be ignored), and `std_multiple` to change the default standard deviation threshold multiple of `3`. MD event_description <<-MD Events look like: { "message": "Your message", "peak": 6, "peak_time": 3456789242, "grouped_by": "something" } MD def validate_options unless options[:expected_receive_period_in_days].present? && options[:message].present? && options[:value_path].present? errors.add(:base, "expected_receive_period_in_days, value_path, and message are required") end end def default_options { :expected_receive_period_in_days => "2", :group_by_path => "filter", :value_path => "count", :message => "A peak was found" } end def working? last_receive_at && last_receive_at > options[:expected_receive_period_in_days].to_i.days.ago && !recent_error_logs? end def receive(incoming_events) incoming_events.sort_by(&:created_at).each do |event| group = group_for(event) remember group, event check_for_peak group, event end end private def check_for_peak(group, event) memory[:peaks] ||= {} memory[:peaks][group] ||= [] if memory[:data][group].length > 4 && (memory[:peaks][group].empty? || memory[:peaks][group].last < event.created_at.to_i - peak_spacing) average_value, standard_deviation = stats_for(group, :skip_last => 1) newest_value, newest_time = memory[:data][group][-1].map(&:to_f) #p [newest_value, average_value, average_value + std_multiple * standard_deviation, standard_deviation] if newest_value > average_value + std_multiple * standard_deviation memory[:peaks][group] << newest_time memory[:peaks][group].reject! { |p| p <= newest_time - window_duration } create_event :payload => {:message => options[:message], :peak => newest_value, :peak_time => newest_time, :grouped_by => group.to_s} end end end def stats_for(group, options = {}) data = memory[:data][group].map { |d| d.first.to_f } data = data[0...(data.length - (options[:skip_last] || 0))] length = data.length.to_f mean = 0 mean_variance = 0 data.each do |value| mean += value end mean /= length data.each do |value| variance = (value - mean)**2 mean_variance += variance end mean_variance /= length standard_deviation = Math.sqrt(mean_variance) [mean, standard_deviation] end def window_duration if options[:window_duration].present? # The older option options[:window_duration].to_i else (options[:window_duration_in_days] || 14).to_f.days end end def std_multiple (options[:std_multiple] || 3).to_f end def peak_spacing if options[:peak_spacing].present? # The older option options[:peak_spacing].to_i else (options[:min_peak_spacing_in_days] || 2).to_f.days end end def group_for(event) ((options[:group_by_path].present? && Utils.value_at(event.payload, options[:group_by_path])) || 'no_group').to_sym end def remember(group, event) memory[:data] ||= {} memory[:data][group] ||= [] memory[:data][group] << [Utils.value_at(event.payload, options[:value_path]), event.created_at.to_i] cleanup group end def cleanup(group) newest_time = memory[:data][group].last.last memory[:data][group].reject! { |value, time| time <= newest_time - window_duration } end end end