require 'pp'

module Agents
  # Watches a stream of numeric values (optionally partitioned into groups)
  # and emits an Event when the second-newest value of a group is a local
  # maximum that exceeds the group's mean by `std_multiple` standard
  # deviations.
  #
  # State kept in `memory`:
  #   memory[:data][group]  => array of [value, unix_time] pairs inside the window
  #   memory[:peaks][group] => array of times at which peaks were reported
  class PeakDetectorAgent < Agent
    cannot_be_scheduled!

    description <<-MD
      Use a PeakDetectorAgent to watch for peaks in an event stream.  When a peak is detected, the resulting Event will have a payload message of `message`.  You can include extractions in the message, for example: `I saw a bar of: `

      The `value_path` value is a [JSONPaths](http://goessner.net/articles/JsonPath/) to the value of interest.  `group_by_path` is a hash path that will be used to group values, if present.

      Set `expected_receive_period_in_days` to the maximum amount of time that you'd expect to pass between Events being received by this Agent.

      You may set `window_duration` to change the default memory window length of two weeks, `peak_spacing` to change the default minimum peak spacing of two days, and `std_multiple` to change the default standard deviation threshold multiple of 3.
    MD

    event_description <<-MD
      Events look like:

          {
            "message": "Your message",
            "peak": 6,
            "peak_time": 3456789242,
            "grouped_by": "something"
          }
    MD

    # Adds a base error unless all three mandatory options are present.
    def validate_options
      required = [:expected_receive_period_in_days, :message, :value_path]
      return if required.all? { |key| options[key].present? }

      errors.add(:base, "expected_receive_period_in_days, value_path, and message are required")
    end

    # Option values a freshly created agent starts with.
    def default_options
      {
        :expected_receive_period_in_days => "2",
        :group_by_path => "filter",
        :value_path => "count",
        :message => "A peak was found"
      }
    end

    # The agent is considered working when it has received an event within
    # the expected period and has no recent error logs.
    def working?
      last_receive_at &&
        last_receive_at > options[:expected_receive_period_in_days].to_i.days.ago &&
        !recent_error_logs?
    end

    # Records each incoming event (oldest first, so the stored series stays
    # in chronological order) and checks its group for a new peak.
    def receive(incoming_events)
      incoming_events.sort_by(&:created_at).each do |event|
        group = group_for(event)
        remember group, event
        check_for_peak group, event
      end
    end

    private

    # Emits an Event when the second-newest value in +group+ is a peak.
    #
    # A peak requires: more than four remembered values, no peak reported for
    # the group within `peak_spacing` seconds of +event+, the newest value
    # being lower than the second-newest (i.e. the series has turned down),
    # and the second-newest value exceeding mean + std_multiple * stddev,
    # where the statistics exclude the two newest values.
    def check_for_peak(group, event)
      memory[:peaks] ||= {}
      memory[:peaks][group] ||= []

      peaks = memory[:peaks][group]
      series = memory[:data][group]

      return unless series.length > 4
      return unless peaks.empty? || peaks.last < event.created_at.to_i - peak_spacing

      average_value, standard_deviation = stats_for(group, :skip_last => 2)
      newest_value = series[-1].first.to_f
      second_newest_value, second_newest_time = series[-2].map(&:to_f)
      threshold = average_value + std_multiple * standard_deviation

      # Debugging aid:
      # pp({:newest_value => newest_value,
      #     :second_newest_value => second_newest_value,
      #     :average_value => average_value,
      #     :standard_deviation => standard_deviation,
      #     :threshold => threshold })

      return unless newest_value < second_newest_value && second_newest_value > threshold

      peaks << second_newest_time
      # Forget peaks that have fallen out of the memory window.
      peaks.reject! { |peak_time| peak_time <= second_newest_time - window_duration }

      create_event :payload => {
        :message => options[:message],
        :peak => second_newest_value,
        :peak_time => second_newest_time,
        :grouped_by => group.to_s
      }
    end

    # Returns [mean, population standard deviation] of the values remembered
    # for +group+, ignoring the last `options[:skip_last]` entries (default 0).
    #
    # Note: the +options+ parameter here is the method's own hash and shadows
    # the agent-level `options` inside this method.
    def stats_for(group, options = {})
      values = memory[:data][group].map { |value, _time| value.to_f }
      values = values[0...(values.length - (options[:skip_last] || 0))]
      count = values.length.to_f

      mean = values.inject(0) { |total, value| total + value } / count
      mean_variance = values.inject(0) { |total, value| total + (value - mean)**2 } / count

      [mean, Math.sqrt(mean_variance)]
    end

    # Length of the memory window in seconds (default: two weeks).
    def window_duration
      options[:window_duration].present? ? options[:window_duration].to_i : 2.weeks
    end

    # Number of standard deviations above the mean a value must exceed to be
    # a peak (default: 3). Parsed as a float so that fractional settings such
    # as "2.5" are honoured instead of being truncated to an integer.
    def std_multiple
      options[:std_multiple].present? ? options[:std_multiple].to_f : 3
    end

    # Minimum number of seconds between two reported peaks of the same group
    # (default: two days).
    def peak_spacing
      options[:peak_spacing].present? ? options[:peak_spacing].to_i : 2.days
    end

    # Returns the group key (a Symbol) for +event+: the value found at
    # `group_by_path` in the payload, or :no_group when the option is unset
    # or the lookup yields nothing.
    def group_for(event)
      group = options[:group_by_path].present? &&
              Utils.value_at(event.payload, options[:group_by_path])

      # to_s first: payload values may be numbers, which do not respond to to_sym.
      (group || 'no_group').to_s.to_sym
    end

    # Appends [value, unix_time] for +event+ to the group's series and trims
    # entries that have left the memory window.
    def remember(group, event)
      memory[:data] ||= {}
      memory[:data][group] ||= []
      memory[:data][group] << [Utils.value_at(event.payload, options[:value_path]), event.created_at.to_i]
      cleanup group
    end

    # Drops entries of +group+ that are at least `window_duration` seconds
    # older than the group's newest entry.
    def cleanup(group)
      newest_time = memory[:data][group].last.last
      memory[:data][group].reject! { |_value, time| time <= newest_time - window_duration }
    end
  end
end