Add CsvAgent which parses CSV and serializes data to CSV

Dominik Sander 9 gadi atpakaļ
vecāks
revīzija
74c4a2d5b8
2 mainītis faili ar 439 papildinājumiem un 0 dzēšanām
  1. 195 0
      app/models/agents/csv_agent.rb
  2. 244 0
      spec/models/agents/csv_agent_spec.rb

+ 195 - 0
app/models/agents/csv_agent.rb

@@ -0,0 +1,195 @@
1
+module Agents
2
+  class CsvAgent < Agent
3
+    include FormConfigurable
4
+    include FileHandling
5
+
6
+    cannot_be_scheduled!
7
+    consumes_file_pointer!
8
+
9
+    def default_options
10
+      {
11
+        'mode' => 'parse',
12
+        'separator' => ',',
13
+        'use_fields' => '',
14
+        'output' => 'event_per_row',
15
+        'with_header' => 'true',
16
+        'data_path' => '$.data',
17
+        'data_key' => 'data'
18
+      }
19
+    end
20
+
21
+    description do
22
+      <<-MD
23
+        The `CsvAgent` parses or serializes CSV data. When parsing, events can either be emitted for the entire CSV, or one per row.
24
+
25
+        Set `mode` to `parse` to parse CSV from incoming event, when set to `serialize` the agent serilizes the data of events to CSV.
26
+
27
+        ### Universal options
28
+
29
+        Specify the `separator` which is used to seperate the fields from each other (default is `,`).
30
+
31
+        `data_key` sets the key which contains the serialized CSV or parsed CSV data in emitted events.
32
+
33
+        ### Parsing
34
+
35
+        If `use_fields` is set to a comma seperated string and the CSV file contains field headers the agent will only extract the specified fields.
36
+
37
+        `output` determines wheather one event per row is emitted or one event that includes all the rows.
38
+
39
+        Set `with_header` to `true` if first line of the CSV file are field names.
40
+
41
+        #{receiving_file_handling_agent_description}
42
+
43
+        When receiving the CSV data in a regular event use [JSONPath](http://goessner.net/articles/JsonPath/) to select the path in `data_path`. `data_path` is only used when the received event does not contain a 'file pointer'.
44
+
45
+        ### Serializing
46
+
47
+        If `use_fields` is set to a comma seperated string and the first received event has a object at the specified `data_path` the generated CSV will only include the given fields.
48
+
49
+        Set `with_header` to `true` to include a field header in the CSV.
50
+
51
+        Use [JSONPath](http://goessner.net/articles/JsonPath/) in `data_path` to select with part of the received events should be serialized.
52
+      MD
53
+    end
54
+
55
+    event_description do
56
+      "Events will looks like this:\n\n    %s" % if interpolated['mode'] == 'parse'
57
+        rows = if boolify(interpolated['with_header'])
58
+          [{'column' => 'row1 value1', 'column2' => 'row1 value2'}, {'column' => 'row2 value3', 'column2' => 'row2 value4'}]
59
+        else
60
+          [['row1 value1', 'row1 value2'], ['row2 value1', 'row2 value2']]
61
+        end
62
+        if interpolated['output'] == 'event_per_row'
63
+          Utils.pretty_print(interpolated['data_key'] => rows[0])
64
+        else
65
+          Utils.pretty_print(interpolated['data_key'] => rows)
66
+        end
67
+      else
68
+        Utils.pretty_print(interpolated['data_key'] => '"generated","csv","data"' + "\n" + '"column1","column2","column3"')
69
+      end
70
+    end
71
+
72
+    form_configurable :mode, type: :array, values: %w(parse serialize)
73
+    form_configurable :separator, type: :string
74
+    form_configurable :data_key, type: :string
75
+    form_configurable :with_header, type: :boolean
76
+    form_configurable :use_fields, type: :string
77
+    form_configurable :output, type: :array, values: %w(event_per_row event_per_file)
78
+    form_configurable :data_path, type: :string
79
+
80
+    def validate_options
81
+      if options['with_header'].blank? || ![true, false].include?(boolify(options['with_header']))
82
+        errors.add(:base, "The 'with_header' options is required and must be set to 'true' or 'false'")
83
+      end
84
+      if options['mode'] == 'serialize' && options['data_path'].blank?
85
+        errors.add(:base, "When mode is set to serialize data_path has to be present.")
86
+      end
87
+    end
88
+
89
+    def working?
90
+      received_event_without_error?
91
+    end
92
+
93
+    def receive(incoming_events)
94
+      case options['mode']
95
+      when 'parse'
96
+        parse(incoming_events)
97
+      when 'serialize'
98
+        serialize(incoming_events)
99
+      end
100
+    end
101
+
102
+    private
103
+    def serialize(incoming_events)
104
+      mo = interpolated(incoming_events.first)
105
+      rows = rows_from_events(incoming_events, mo)
106
+      csv = CSV.generate(col_sep: separator(mo), force_quotes: true ) do |csv|
107
+        if boolify(mo['with_header']) && rows.first.is_a?(Hash)
108
+          if mo['use_fields'].present?
109
+            csv << extract_options(mo)
110
+          else
111
+            csv << rows.first.keys
112
+          end
113
+        end
114
+        rows.each do |data|
115
+          if data.is_a?(Hash)
116
+            if mo['use_fields'].present?
117
+              csv << data.extract!(*extract_options(mo)).values
118
+            else
119
+              csv << data.values
120
+            end
121
+          else
122
+            csv << data
123
+          end
124
+        end
125
+      end
126
+      create_event payload: { mo['data_key'] => csv }
127
+    end
128
+
129
+    def rows_from_events(incoming_events, mo)
130
+      [].tap do |rows|
131
+        incoming_events.each do |event|
132
+          data = Utils.value_at(event.payload, mo['data_path'])
133
+          if data.is_a?(Array) && (data[0].is_a?(Array) || data[0].is_a?(Hash))
134
+            data.each { |row| rows << row }
135
+          else
136
+            rows << data
137
+          end
138
+        end
139
+      end
140
+    end
141
+
142
+    def parse(incoming_events)
143
+      incoming_events.each do |event|
144
+        mo = interpolated(event)
145
+        next unless io = local_get_io(event)
146
+        if mo['output'] == 'event_per_row'
147
+          parse_csv(io, mo) do |payload|
148
+            create_event payload: { mo['data_key'] => payload }
149
+          end
150
+        else
151
+          create_event payload: { mo['data_key'] => parse_csv(io, mo, []) }
152
+        end
153
+      end
154
+    end
155
+
156
+    def local_get_io(event)
157
+      if io = get_io(event)
158
+        io
159
+      else
160
+        Utils.value_at(event.payload, interpolated['data_path'])
161
+      end
162
+    end
163
+
164
+    def parse_csv(io, mo, array = nil)
165
+      CSV.new(io, col_sep: separator(mo), headers: boolify(mo['with_header'])).each do |row|
166
+        if block_given?
167
+          yield get_payload(row, mo)
168
+        else
169
+          array << get_payload(row, mo)
170
+        end
171
+      end
172
+      array
173
+    end
174
+
175
+    def separator(mo)
176
+      mo['separator'] == '\\t' ? "\t" : mo['separator']
177
+    end
178
+
179
+    def get_payload(row, mo)
180
+      if boolify(mo['with_header'])
181
+        if mo['use_fields'].present?
182
+          row.to_hash.extract!(*extract_options(mo))
183
+        else
184
+          row.to_hash
185
+        end
186
+      else
187
+        row
188
+      end
189
+    end
190
+
191
+    def extract_options(mo)
192
+      mo['use_fields'].split(',').map(&:strip)
193
+    end
194
+  end
195
+end

+ 244 - 0
spec/models/agents/csv_agent_spec.rb

@@ -0,0 +1,244 @@
1
+require 'rails_helper'
2
+
3
+describe Agents::CsvAgent do
4
+  before(:each) do
5
+    @valid_params = {
6
+                      'mode' => 'parse',
7
+                      'separator' => ',',
8
+                      'use_fields' => '',
9
+                      'output' => 'event_per_row',
10
+                      'with_header' => 'true',
11
+                      'data_path' => '$.data',
12
+                      'data_key' => 'data'
13
+                    }
14
+
15
+    @checker = Agents::CsvAgent.new(:name => 'somename', :options => @valid_params)
16
+    @checker.user = users(:jane)
17
+    @checker.save!
18
+    @lfa = Agents::LocalFileAgent.new(name: 'local', options: {path: '{{}}', watch: 'false', append: 'false', mode: 'read'})
19
+    @lfa.user = users(:jane)
20
+    @lfa.save!
21
+  end
22
+
23
+  it_behaves_like 'FileHandlingConsumer'
24
+
25
+  context '#validate_options' do
26
+    it 'is valid with the given options' do
27
+      expect(@checker).to be_valid
28
+    end
29
+
30
+    it "requires with_header to be either 'true' or 'false'" do
31
+      @checker.options['with_header'] = 'true'
32
+      expect(@checker).to be_valid
33
+      @checker.options['with_header'] = 'false'
34
+      expect(@checker).to be_valid
35
+      @checker.options['with_header'] = 'test'
36
+      expect(@checker).not_to be_valid
37
+    end
38
+
39
+    it "data_path has to be set in serialize mode" do
40
+      @checker.options['mode'] = 'serialize'
41
+      @checker.options['data_path'] = ''
42
+      expect(@checker).not_to be_valid
43
+    end
44
+  end
45
+
46
+  context '#working' do
47
+    it 'is not working without having received an event' do
48
+      expect(@checker).not_to be_working
49
+    end
50
+
51
+    it 'is working after receiving an event without error' do
52
+      @checker.last_receive_at = Time.now
53
+      expect(@checker).to be_working
54
+    end
55
+  end
56
+
57
+  context '#receive' do
58
+    after(:all) do
59
+      FileUtils.rm(File.join(Rails.root, 'tmp', 'csv'))
60
+    end
61
+
62
+    def event_with_contents(contents)
63
+      path = File.join(Rails.root, 'tmp', 'csv')
64
+      File.open(path, 'w') do |f|
65
+        f.write(contents)
66
+      end
67
+      Event.new(payload: { 'file_pointer' => {'agent_id' => @lfa.id, 'file' => path } }, user_id: @checker.user_id)
68
+    end
69
+
70
+    context "agent options" do
71
+      let(:with_headers) { event_with_contents("one,two\n1,2\n2,3") }
72
+      let(:without_headers) { event_with_contents("1,2\n2,3") }
73
+
74
+      context "output" do
75
+        it "creates one event per row" do
76
+          @checker.options['output'] = 'event_per_row'
77
+          expect { @checker.receive([with_headers]) }.to change(Event, :count).by(2)
78
+          expect(Event.last.payload).to eq(@checker.options['data_key'] => {'one' => '2', 'two' => '3'})
79
+        end
80
+
81
+        it "creates one event per file" do
82
+          @checker.options['output'] = 'event_per_file'
83
+          expect { @checker.receive([with_headers]) }.to change(Event, :count).by(1)
84
+          expect(Event.last.payload).to eq(@checker.options['data_key'] => [{"one"=>"1", "two"=>"2"}, {"one"=>"2", "two"=>"3"}])
85
+        end
86
+      end
87
+
88
+      context "with_header" do
89
+        it "works without headers" do
90
+          @checker.options['with_header'] = 'false'
91
+          expect { @checker.receive([without_headers]) }.to change(Event, :count).by(2)
92
+          expect(Event.last.payload).to eq({@checker.options['data_key']=>["2", "3"]})
93
+        end
94
+
95
+        it "works without headers and event_per_file" do
96
+          @checker.options['with_header'] = 'false'
97
+          @checker.options['output'] = 'event_per_file'
98
+          expect { @checker.receive([without_headers]) }.to change(Event, :count).by(1)
99
+          expect(Event.last.payload).to eq({@checker.options['data_key']=>[['1', '2'], ["2", "3"]]})
100
+        end
101
+      end
102
+
103
+      context "use_fields" do
104
+        it "extracts the specified columns" do
105
+          @checker.options['use_fields'] = 'one'
106
+          expect { @checker.receive([with_headers]) }.to change(Event, :count).by(2)
107
+          expect(Event.last.payload).to eq(@checker.options['data_key'] => {'one' => '2'})
108
+        end
109
+      end
110
+
111
+      context "data_path" do
112
+        it "can receive the CSV via a regular event" do
113
+          @checker.options['data_path'] = '$.data'
114
+          event = Event.new(payload: {'data' => "one,two\r\n1,2\r\n2,3"})
115
+          expect { @checker.receive([event]) }.to change(Event, :count).by(2)
116
+          expect(Event.last.payload).to eq(@checker.options['data_key'] => {'one' => '2', 'two' => '3'})
117
+        end
118
+      end
119
+    end
120
+
121
+    context "handling different CSV formats" do
122
+      it "works with windows line endings" do
123
+        event = event_with_contents("one,two\r\n1,2\r\n2,3")
124
+        expect { @checker.receive([event]) }.to change(Event, :count).by(2)
125
+        expect(Event.last.payload).to eq(@checker.options['data_key'] => {'one' => '2', 'two' => '3'})
126
+      end
127
+
128
+      it "works with OSX line endings" do
129
+        event = event_with_contents("one,two\r1,2\r2,3")
130
+        expect { @checker.receive([event]) }.to change(Event, :count).by(2)
131
+        expect(Event.last.payload).to eq(@checker.options['data_key'] => {'one' => '2', 'two' => '3'})
132
+      end
133
+
134
+      it "handles quotes correctly" do
135
+        event = event_with_contents("\"one\",\"two\"\n1,2\n\"\"2, two\",3")
136
+        expect { @checker.receive([event]) }.to change(Event, :count).by(2)
137
+        expect(Event.last.payload).to eq(@checker.options['data_key'] => {'one' => '"2, two', 'two' => '3'})
138
+      end
139
+
140
+      it "works with tab seperated csv" do
141
+        event = event_with_contents("one\ttwo\r\n1\t2\r\n2\t3")
142
+        @checker.options['separator'] = '\\t'
143
+        expect { @checker.receive([event]) }.to change(Event, :count).by(2)
144
+        expect(Event.last.payload).to eq(@checker.options['data_key'] => {'one' => '2', 'two' => '3'})
145
+      end
146
+    end
147
+
148
+    context "serializing" do
149
+      before(:each) do
150
+        @checker.options['mode'] = 'serialize'
151
+        @checker.options['data_path'] = '$.data'
152
+        @checker.options['data_key'] = 'data'
153
+      end
154
+
155
+      it "writes headers when with_header is true" do
156
+        event = Event.new(payload: { 'data' => {'key' => 'value', 'key2' => 'value2', 'key3' => 'value3'} })
157
+        expect { @checker.receive([event])}.to change(Event, :count).by(1)
158
+        expect(Event.last.payload).to eq('data' => "\"key\",\"key2\",\"key3\"\n\"value\",\"value2\",\"value3\"\n")
159
+      end
160
+
161
+      it "writes one row per received event" do
162
+        event = Event.new(payload: { 'data' => {'key' => 'value', 'key2' => 'value2', 'key3' => 'value3'} })
163
+        event2 = Event.new(payload: { 'data' => {'key' => '2value', 'key2' => '2value2', 'key3' => '2value3'} })
164
+        expect { @checker.receive([event, event2])}.to change(Event, :count).by(1)
165
+        expect(Event.last.payload).to eq('data' => "\"key\",\"key2\",\"key3\"\n\"value\",\"value2\",\"value3\"\n\"2value\",\"2value2\",\"2value3\"\n")
166
+      end
167
+
168
+      it "accepts multiple rows per event" do
169
+        event = Event.new(payload: { 'data' => [{'key' => 'value', 'key2' => 'value2', 'key3' => 'value3'}, {'key' => '2value', 'key2' => '2value2', 'key3' => '2value3'}] })
170
+        expect { @checker.receive([event])}.to change(Event, :count).by(1)
171
+        expect(Event.last.payload).to eq('data' => "\"key\",\"key2\",\"key3\"\n\"value\",\"value2\",\"value3\"\n\"2value\",\"2value2\",\"2value3\"\n")
172
+      end
173
+
174
+      it "does not write the headers when with_header is false" do
175
+        @checker.options['with_header'] = 'false'
176
+        event = Event.new(payload: { 'data' => {'key' => 'value', 'key2' => 'value2', 'key3' => 'value3'} })
177
+        expect { @checker.receive([event])}.to change(Event, :count).by(1)
178
+        expect(Event.last.payload).to eq('data' => "\"value\",\"value2\",\"value3\"\n")
179
+      end
180
+
181
+      it "only serialize the keys specified in use_fields" do
182
+        @checker.options['use_fields'] = 'key2, key3'
183
+        event = Event.new(payload: { 'data' => {'key' => 'value', 'key2' => 'value2', 'key3' => 'value3'} })
184
+        expect { @checker.receive([event])}.to change(Event, :count).by(1)
185
+        expect(Event.last.payload).to eq('data' => "\"key2\",\"key3\"\n\"value2\",\"value3\"\n")
186
+      end
187
+
188
+      it "respects the order of use_fields" do
189
+        @checker.options['use_fields'] = 'key3, key'
190
+        event = Event.new(payload: { 'data' => {'key' => 'value', 'key2' => 'value2', 'key3' => 'value3'} })
191
+        expect { @checker.receive([event])}.to change(Event, :count).by(1)
192
+        expect(Event.last.payload).to eq('data' => "\"key3\",\"key\"\n\"value3\",\"value\"\n")
193
+      end
194
+
195
+      it "respects use_fields and writes no header" do
196
+        @checker.options['with_header'] = 'false'
197
+        @checker.options['use_fields'] = 'key2, key3'
198
+        event = Event.new(payload: { 'data' => {'key' => 'value', 'key2' => 'value2', 'key3' => 'value3'} })
199
+        expect { @checker.receive([event])}.to change(Event, :count).by(1)
200
+        expect(Event.last.payload).to eq('data' => "\"value2\",\"value3\"\n")
201
+      end
202
+
203
+      context "arrays" do
204
+        it "does not write a header" do
205
+          @checker.options['with_header'] = 'false'
206
+          event = Event.new(payload: { 'data' => ['value1', 'value2'] })
207
+          event2 = Event.new(payload: { 'data' => ['value3', 'value4'] })
208
+          expect { @checker.receive([event, event2])}.to change(Event, :count).by(1)
209
+          expect(Event.last.payload).to eq('data' => "\"value1\",\"value2\"\n\"value3\",\"value4\"\n")
210
+        end
211
+
212
+        it "handles nested arrays" do
213
+          event = Event.new(payload: { 'data' => [['value1', 'value2'], ['value3', 'value4']] })
214
+          expect { @checker.receive([event])}.to change(Event, :count).by(1)
215
+          expect(Event.last.payload).to eq('data' => "\"value1\",\"value2\"\n\"value3\",\"value4\"\n")
216
+        end
217
+      end
218
+    end
219
+  end
220
+
221
+  context '#event_description' do
222
+    it "works with event_per_row and headers" do
223
+      @checker.options['output'] = 'event_per_row'
224
+      @checker.options['with_header'] = 'true'
225
+      description = @checker.event_description
226
+      expect(description).not_to match(/\n\s+\[\n/)
227
+      expect(description).to include(": {\n")
228
+    end
229
+
230
+    it "works with event_per_file and without headers" do
231
+      @checker.options['output'] = 'event_per_file'
232
+      @checker.options['with_header'] = 'false'
233
+      description = @checker.event_description
234
+      expect(description).to match(/\n\s+\[\n/)
235
+      expect(description).not_to include(": {\n")
236
+    end
237
+
238
+    it "shows dummy CSV when in serialize mode" do
239
+      @checker.options['mode'] = 'serialize'
240
+      description = @checker.event_description
241
+      expect(description).to include('"generated\",\"csv')
242
+    end
243
+  end
244
+end