local_file_agent.rb 6.1KB

    module Agents class LocalFileAgent < Agent include LongRunnable include FormConfigurable include FileHandling emits_file_pointer! default_schedule 'every_1h' def self.should_run? ENV['ENABLE_INSECURE_AGENTS'] == "true" end description do <<-MD The LocalFileAgent can watch a file/directory for changes or emit an event for every file in that directory. When receiving an event it writes the received data into a file. `mode` determines if the agent is emitting events for (changed) files or writing received event data to disk. ### Reading When `watch` is set to `true` the LocalFileAgent will watch the specified `path` for changes, the schedule is ignored and the file system is watched continuously. An event will be emitted for every detected change. When `watch` is set to `false` the agent will emit an event for every file in the directory on each scheduled run. #{emitting_file_handling_agent_description} ### Writing Every event will be writting into a file at `path`, Liquid interpolation is possible to change the path per event. When `append` is true the received data will be appended to the file. Use [Liquid](https://github.com/cantino/huginn/wiki/Formatting-Events-using-Liquid) templating in `data` to specify which part of the received event should be written. *Warning*: This type of Agent can read and write any file the user that runs the Huginn server has access to, and is #{Agents::LocalFileAgent.should_run? ? "**currently enabled**" : "**currently disabled**"}. Only enable this Agent if you trust everyone using your Huginn installation. You can enable this Agent in your .env file by setting `ENABLE_INSECURE_AGENTS` to `true`. MD end event_description do "Events will looks like this:\n\n %s" % if boolify(interpolated['watch']) Utils.pretty_print( "file_pointer" => { "file" => "/tmp/test/filename", "agent_id" => id }, "event_type" => "modified/added/removed" ) else Utils.pretty_print( "file_pointer" => { "file" => "/tmp/test/filename", "agent_id" => id } ) end end def default_options { 'mode' => 'read', 'watch' => 'true', 'append' => 'false', 'path' => "", 'data' => '{{ data }}' } end form_configurable :mode, type: :array, values: %w(read write) form_configurable :watch, type: :array, values: %w(true false) form_configurable :path, type: :string form_configurable :append, type: :boolean form_configurable :data, type: :string def validate_options if options['mode'].blank? || !['read', 'write'].include?(options['mode']) errors.add(:base, "The 'mode' option is required and must be set to 'read' or 'write'") end if options['watch'].blank? || ![true, false].include?(boolify(options['watch'])) errors.add(:base, "The 'watch' option is required and must be set to 'true' or 'false'") end if options['append'].blank? || ![true, false].include?(boolify(options['append'])) errors.add(:base, "The 'append' option is required and must be set to 'true' or 'false'") end if options['path'].blank? errors.add(:base, "The 'path' option is required.") end end def working? should_run?(false) && ((interpolated['mode'] == 'read' && check_path_existance && checked_without_error?) || (interpolated['mode'] == 'write' && received_event_without_error?)) end def check return if interpolated['mode'] != 'read' || boolify(interpolated['watch']) || !should_run? return unless check_path_existance(true) if File.directory?(expanded_path) Dir.glob(File.join(expanded_path, '*')).select { |f| File.file?(f) } else [expanded_path] end.each do |file| create_event payload: get_file_pointer(file) end end def receive(incoming_events) return if interpolated['mode'] != 'write' || !should_run? incoming_events.each do |event| mo = interpolated(event) File.open(File.expand_path(mo['path']), boolify(mo['append']) ? 'a' : 'w') do |file| file.write(mo['data']) end end end def start_worker? interpolated['mode'] == 'read' && boolify(interpolated['watch']) && should_run? && check_path_existance end def check_path_existance(log = true) if !File.exist?(expanded_path) error("File or directory '#{expanded_path}' does not exist") if log return false end true end def get_io(file) File.open(file, 'r') end def expanded_path @expanded_path ||= File.expand_path(interpolated['path']) end private def should_run?(log = true) if self.class.should_run? true else error("Unable to run because insecure agents are not enabled. Set ENABLE_INSECURE_AGENTS to true in the Huginn .env configuration.") if log false end end class Worker < LongRunnable::Worker def setup require 'listen' @listener = Listen.to(*listen_options, &method(:callback)) end def run sleep unless agent.check_path_existance(true) @listener.start sleep end def stop @listener.stop end private def callback(*changes) AgentRunner.with_connection do changes.zip([:modified, :added, :removed]).each do |files, event_type| files.each do |file| agent.create_event payload: agent.get_file_pointer(file).merge(event_type: event_type) end end agent.touch(:last_check_at) end end def listen_options if File.directory?(agent.expanded_path) [agent.expanded_path, ignore!: [] ] else [File.dirname(agent.expanded_path), { ignore!: [], only: /\A#{Regexp.escape(File.basename(agent.expanded_path))}\z/ } ] end end end end end