# delay_agent.rb
  module Agents
    # Buffers the ids of received Events in `memory['event_ids']` and, on every
    # scheduled `check`, emits a copy of each buffered Event's payload before
    # emptying the buffer.
    #
    # NOTE(review): `options`, `interpolated`, `memory`, `errors`,
    # `last_receive_at`, `recent_error_logs?`, `received_events` and
    # `create_event` are not defined in this file; presumably they come from
    # `Agent` -- confirm against the base class.
    class DelayAgent < Agent
      default_schedule "every_12h"

      description <<-MD
        The DelayAgent stores received Events and emits copies of them on a schedule. Use this as a buffer or queue of Events.
        `max_events` should be set to the maximum number of events that you'd like to hold in the buffer. When this number is
        reached, new events will either be ignored, or will displace the oldest event already in the buffer, depending on
        whether you set `keep` to `newest` or `oldest`.
        `expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days
        that you anticipate passing without this Agent receiving an incoming Event.
      MD

      # Option values used when none are supplied.
      #
      # @return [Hash{String => String}]
      def default_options
        {
          'expected_receive_period_in_days' => "10",
          'max_events' => "100",
          'keep' => 'newest'
        }
      end

      # Adds an error on :base for every option that is missing or invalid:
      # both numeric options must be present and convert (via `to_i`) to a value
      # greater than zero, and `keep` must be exactly 'newest' or 'oldest'.
      def validate_options
        receive_period = options['expected_receive_period_in_days']
        if receive_period.blank? || receive_period.to_i <= 0
          errors.add(:base, "Please provide 'expected_receive_period_in_days' to indicate how many days can pass before this Agent is considered to be not working")
        end

        # nil and blank values are not in the list, so no separate presence check is needed.
        unless %w[newest oldest].include?(options['keep'])
          errors.add(:base, "The 'keep' option is required and must be set to 'oldest' or 'newest'")
        end

        max_events = options['max_events']
        if max_events.blank? || max_events.to_i <= 0
          errors.add(:base, "The 'max_events' option is required and must be an integer greater than 0")
        end
      end

      # Truthy when an Event has been received within the last
      # `expected_receive_period_in_days` days and `recent_error_logs?` is false.
      def working?
        last_receive_at &&
          last_receive_at > options['expected_receive_period_in_days'].to_i.days.ago &&
          !recent_error_logs?
      end

      # Appends the id of each incoming Event to the buffer and trims the buffer
      # back down to `max_events` entries.
      #
      # The whole overflow is removed, not just one id, so a buffer that is
      # already over the limit (e.g. after `max_events` was lowered) shrinks to
      # the configured size the next time an Event arrives.
      #
      # @param incoming_events [Enumerable] objects responding to `id`
      def receive(incoming_events)
        memory['event_ids'] ||= []
        # Re-read the stored array instead of using the value of `||=`, so the
        # local always refers to the object actually held in `memory`.
        buffer = memory['event_ids']
        limit = interpolated['max_events'].to_i
        keep_newest = interpolated['keep'] == 'newest'

        incoming_events.each do |event|
          buffer << event.id
          overflow = buffer.length - limit
          next unless overflow > 0

          if keep_newest
            buffer.shift(overflow) # drop from the front: the oldest ids
          else
            buffer.pop(overflow) # drop from the back: the most recently added ids
          end
        end
      end

      # Emits one new Event per buffered id, in ascending `events.id` order,
      # copying the stored Event's payload, then empties the buffer.
      # Does nothing when the buffer is missing or empty.
      def check
        buffered_ids = memory['event_ids']
        return if buffered_ids.blank?

        received_events.where(id: buffered_ids).reorder('events.id asc').each do |event|
          create_event payload: event.payload
        end
        memory['event_ids'] = []
      end
    end
  end