|
module Agents
class CsvAgent < Agent
include FormConfigurable
include FileHandling
cannot_be_scheduled!
consumes_file_pointer!
def default_options
{
'mode' => 'parse',
'separator' => ',',
'use_fields' => '',
'output' => 'event_per_row',
'with_header' => 'true',
'data_path' => '$.data',
'data_key' => 'data'
}
end
description do
<<-MD
The `CsvAgent` parses or serializes CSV data. When parsing, events can either be emitted for the entire CSV, or one per row.
Set `mode` to `parse` to parse CSV from incoming event, when set to `serialize` the agent serilizes the data of events to CSV.
### Universal options
Specify the `separator` which is used to seperate the fields from each other (default is `,`).
`data_key` sets the key which contains the serialized CSV or parsed CSV data in emitted events.
### Parsing
If `use_fields` is set to a comma seperated string and the CSV file contains field headers the agent will only extract the specified fields.
`output` determines wheather one event per row is emitted or one event that includes all the rows.
Set `with_header` to `true` if first line of the CSV file are field names.
#{receiving_file_handling_agent_description}
When receiving the CSV data in a regular event use [JSONPath](http://goessner.net/articles/JsonPath/) to select the path in `data_path`. `data_path` is only used when the received event does not contain a 'file pointer'.
### Serializing
If `use_fields` is set to a comma seperated string and the first received event has a object at the specified `data_path` the generated CSV will only include the given fields.
Set `with_header` to `true` to include a field header in the CSV.
Use [JSONPath](http://goessner.net/articles/JsonPath/) in `data_path` to select with part of the received events should be serialized.
MD
end
event_description do
"Events will looks like this:\n\n %s" % if interpolated['mode'] == 'parse'
rows = if boolify(interpolated['with_header'])
[{'column' => 'row1 value1', 'column2' => 'row1 value2'}, {'column' => 'row2 value3', 'column2' => 'row2 value4'}]
else
[['row1 value1', 'row1 value2'], ['row2 value1', 'row2 value2']]
end
if interpolated['output'] == 'event_per_row'
Utils.pretty_print(interpolated['data_key'] => rows[0])
else
Utils.pretty_print(interpolated['data_key'] => rows)
end
else
Utils.pretty_print(interpolated['data_key'] => '"generated","csv","data"' + "\n" + '"column1","column2","column3"')
end
end
form_configurable :mode, type: :array, values: %w(parse serialize)
form_configurable :separator, type: :string
form_configurable :data_key, type: :string
form_configurable :with_header, type: :boolean
form_configurable :use_fields, type: :string
form_configurable :output, type: :array, values: %w(event_per_row event_per_file)
form_configurable :data_path, type: :string
def validate_options
if options['with_header'].blank? || ![true, false].include?(boolify(options['with_header']))
errors.add(:base, "The 'with_header' options is required and must be set to 'true' or 'false'")
end
if options['mode'] == 'serialize' && options['data_path'].blank?
errors.add(:base, "When mode is set to serialize data_path has to be present.")
end
end
def working?
received_event_without_error?
end
def receive(incoming_events)
case options['mode']
when 'parse'
parse(incoming_events)
when 'serialize'
serialize(incoming_events)
end
end
private
def serialize(incoming_events)
mo = interpolated(incoming_events.first)
rows = rows_from_events(incoming_events, mo)
csv = CSV.generate(col_sep: separator(mo), force_quotes: true ) do |csv|
if boolify(mo['with_header']) && rows.first.is_a?(Hash)
if mo['use_fields'].present?
csv << extract_options(mo)
else
csv << rows.first.keys
end
end
rows.each do |data|
if data.is_a?(Hash)
if mo['use_fields'].present?
csv << data.extract!(*extract_options(mo)).values
else
csv << data.values
end
else
csv << data
end
end
end
create_event payload: { mo['data_key'] => csv }
end
def rows_from_events(incoming_events, mo)
[].tap do |rows|
incoming_events.each do |event|
data = Utils.value_at(event.payload, mo['data_path'])
if data.is_a?(Array) && (data[0].is_a?(Array) || data[0].is_a?(Hash))
data.each { |row| rows << row }
else
rows << data
end
end
end
end
def parse(incoming_events)
incoming_events.each do |event|
mo = interpolated(event)
next unless io = local_get_io(event)
if mo['output'] == 'event_per_row'
parse_csv(io, mo) do |payload|
create_event payload: { mo['data_key'] => payload }
end
else
create_event payload: { mo['data_key'] => parse_csv(io, mo, []) }
end
end
end
def local_get_io(event)
if io = get_io(event)
io
else
Utils.value_at(event.payload, interpolated['data_path'])
end
end
def parse_csv(io, mo, array = nil)
CSV.new(io, col_sep: separator(mo), headers: boolify(mo['with_header'])).each do |row|
if block_given?
yield get_payload(row, mo)
else
array << get_payload(row, mo)
end
end
array
end
def separator(mo)
mo['separator'] == '\\t' ? "\t" : mo['separator']
end
def get_payload(row, mo)
if boolify(mo['with_header'])
if mo['use_fields'].present?
row.to_hash.extract!(*extract_options(mo))
else
row.to_hash
end
else
row
end
end
def extract_options(mo)
mo['use_fields'].split(',').map(&:strip)
end
end
end
|