de_duplication_agent.rb 2.4KB

    module Agents
      # Re-emits each incoming event unless its de-duplication key has already
      # been seen among the keys remembered in memory['properties'].
      #
      # The key is the interpolated `property` option. When that option is
      # blank, the key is the JSON dump of the whole payload. Keys longer than
      # 10 characters are stored as their CRC32 (see #get_hash). At most
      # `lookback` keys are remembered (0 = unlimited).
      class DeDuplicationAgent < Agent
        include FormConfigurable

        cannot_be_scheduled!

        description <<-MD
          The De-duplication Agent receives a stream of events and remits the event if it is not a duplicate.

          `property` the value that should be used to determine the uniqueness of the event (empty to use the whole payload)

          `lookback` amount of past Events to compare the value to (0 for unlimited)

          `expected_update_period_in_days` is used to determine if the Agent is working.
        MD

        event_description <<-MD
          The DeDuplicationAgent just reemits events it received.
        MD

        def default_options
          {
            'property' => '{{value}}',
            'lookback' => 100,
            'expected_update_period_in_days' => 1
          }
        end

        form_configurable :property
        form_configurable :lookback
        form_configurable :expected_update_period_in_days

        after_initialize :initialize_memory

        # Ensures memory['properties'] is an array before any event is handled.
        def initialize_memory
          memory['properties'] ||= []
        end

        # Requires both `lookback` and `expected_update_period_in_days`.
        # `property` may be blank (meaning "use the whole payload").
        def validate_options
          if options['lookback'].blank? || options['expected_update_period_in_days'].blank?
            errors.add(:base, "The lookback and expected_update_period_in_days fields are all required.")
          end
        end

        # Working when an event was created within the expected update period
        # and there are no recent error logs.
        def working?
          event_created_within?(interpolated['expected_update_period_in_days']) && !recent_error_logs?
        end

        # Handles each incoming event with the options interpolated against it.
        def receive(incoming_events)
          incoming_events.each do |event|
            handle(interpolated(event), event)
          end
        end

        private

        # Emits a copy of +event+'s payload when its key is new, then records the key.
        #
        # @param opts [Hash] options interpolated against +event+
        # @param event [Event] the incoming event (required despite the nil default)
        def handle(opts, event = nil)
          # The blank check uses the raw option, so a blank `property` means
          # "de-duplicate on the entire payload".
          raw_key = options['property'].blank? ? JSON.dump(event.payload) : opts['property']
          property = get_hash(raw_key)

          if is_unique?(property)
            create_event payload: event.payload
            log("Propagating new event as '#{property}' is a new unique property.", inbound_event: event)
            update_memory(property, opts['lookback'].to_i)
          else
            log("Not propagating as incoming event is a duplicate.", inbound_event: event)
          end
        end

        # Returns the value to store in memory for +property+.
        # Values whose string form is longer than 10 characters are replaced by
        # their CRC32 checksum (as a String) to keep memory small. Shorter values
        # are returned unchanged.
        #
        # NOTE(review): CRC32 is only 32 bits, so distinct long values can collide
        # and be treated as duplicates. It is kept so keys already stored in
        # memory remain comparable.
        def get_hash(property)
          text = property.to_s
          if text.length > 10
            # Hash the String form: Zlib.crc32 raises TypeError for non-Strings.
            Zlib.crc32(text).to_s
          else
            property
          end
        end

        # True when +property+ is not among the remembered keys.
        def is_unique?(property)
          !memory['properties'].include?(property)
        end

        # Appends +property+ to the remembered keys, dropping the oldest ones
        # so that at most +amount+ keys are kept.
        # An +amount+ of 0 (or a negative value) keeps keys without limit.
        def update_memory(property, amount)
          properties = memory['properties']
          # Use `while ... >=` rather than a single `== amount` check so that
          # lowering `lookback` also shrinks an already-larger memory.
          properties.shift while amount > 0 && properties.length >= amount
          properties.push(property)
        end
      end
    end