Link Search Menu Expand Document Documentation Menu

Logstash

Logstash 是一个实时事件处理引擎。它是 OpenSearch 堆栈的一部分,该堆栈包括 OpenSearch、Beats 和 OpenSearch Dashboards。

您可以将事件从许多不同来源发送到 Logstash。Logstash 处理事件并将其发送到一个或多个目的地。例如,您可以将 Web 服务器的访问日志发送到 Logstash。Logstash 从每个日志中提取有用信息,并将其发送到 OpenSearch 等目的地。

将事件发送到 Logstash 允许您将事件处理与您的应用程序解耦。您的应用程序只需将事件发送到 Logstash,无需了解事件后续的任何处理。

开源社区最初构建 Logstash 是为了处理日志数据,但现在您可以处理任何类型的事件,包括 XML 或 JSON 格式的事件。

管道结构

Logstash 的工作方式是配置一个包含三个阶段的管道:输入、过滤器和输出。

每个阶段使用一个或多个插件。Logstash 拥有 200 多个内置插件,因此您很有可能找到所需的功能。除了内置插件,您还可以使用社区插件,甚至编写自己的插件。

管道结构如下

input {
  input_plugin => {}
}

filter {
  filter_plugin => {}
}

output {
  output_plugin => {}
}

其中

  • input(输入)同时接收来自多个来源的事件,如日志。Logstash 支持多种输入插件,用于 TCP/UDP、文件、syslog、Microsoft Windows EventLogs、stdin、HTTP 等。您还可以使用名为 Beats 的开源输入工具集合来收集事件。输入插件将事件发送到过滤器。
  • filter(过滤器)以某种方式解析和丰富事件。Logstash 拥有大量的过滤器插件,可以修改事件并将其传递给输出。例如,grok 过滤器将非结构化事件解析为字段,而 mutate 过滤器则更改字段。过滤器按顺序执行。
  • output(输出)将过滤后的事件发送到一个或多个目的地。Logstash 支持广泛的输出插件,用于 OpenSearch、TCP/UDP、电子邮件、文件、stdout、HTTP、Nagios 等目的地。

输入和输出阶段都支持编解码器,以在事件进入或退出管道时对其进行处理。一些流行的编解码器是 jsonmultilinejson 编解码器处理 JSON 格式的数据,而 multiline 编解码器将多行事件合并为单行。

您还可以在管道配置中编写条件语句,以便在满足特定条件时执行某些操作。

安装 Logstash

要在 OpenSearch 上安装 Logstash,请首先在您的集群上安装 Logstash,然后安装 OpenSearch Logstash 插件,具体步骤如下所述。

Tarball

确保已安装 Java Development Kit (JDK) 版本 8 或 11。

  1. Logstash 下载页面下载 Logstash tarball。

  2. 在终端中导航到下载的文件夹并解压文件。确保您的 Logstash 版本和平台与下载的文件匹配

      tar -zxvf logstash-8.8.2-linux-x86_64.tar.gz
    
  3. 导航到 logstash-8.8.2 目录。

  4. 使用以下命令安装插件

      bin/logstash-plugin install logstash-output-opensearch
    

    您将收到以下输出

      Validating logstash-output-opensearch
      Resolving mixin dependencies
      Updating mixin dependencies logstash-mixin-ecs_compatibility_support
      Bundler attempted to update logstash-mixin-ecs_compatibility_support but its version stayed the same
      Installing logstash-output-opensearch
      Installation successful
    

您可以将管道配置添加到 config 目录。Logstash 将来自插件的所有数据保存在 data 目录中。bin 目录包含用于启动 Logstash 和管理插件的二进制文件。

Docker

  1. 按照 Logstash 下载页面所述,拉取最新的 Logstash 镜像。

     docker pull docker.elastic.co/logstash/logstash:8.8.2
    
  2. 创建 Docker 网络

     docker network create test
    
  3. 使用此网络启动 OpenSearch

     docker run -p 9200:9200 -p 9600:9600 --name opensearch --net test -e "discovery.type=single-node" opensearchproject/opensearch:1.2.0
    
  4. 启动 Logstash

     docker run -it --rm --name logstash --net test opensearchproject/logstash-oss-with-opensearch-output-plugin:7.16.2 -e 'input { stdin { } } output {
       opensearch {
         hosts => ["https://opensearch:9200"]
         index => "opensearch-logstash-docker-%{+YYYY.MM.dd}"
         user => "admin"
         password => "admin"
         ssl => true
         ssl_certificate_verification => false
       }
     }'
    

从终端处理文本

您可以定义一个管道,监听 stdin 上的事件并输出 stdout 上的事件。stdinstdout 指的是您运行 Logstash 的终端。

要在终端中输入一些文本并查看输出中的事件数据

  1. 使用 -e 参数将管道配置直接传递给 Logstash 二进制文件。在这种情况下,stdin 是输入插件,stdout 是输出插件

     bin/logstash -e "input { stdin { } } output { stdout { } }"
    

    添加 —debug 标志以查看更详细的输出。

  2. 在终端中输入“hello world”。Logstash 处理文本并将其输出回终端

     {
      "message" => "hello world",
      "host" => "a483e711a548.ant.amazon.com",
      "@timestamp" => 2021-05-30T05:15:56.816Z,
      "@version" => "1"
     }
    

    message 字段包含您的原始输入。当您不在本地运行 Logstash 时,host 字段是 IP 地址。@timestamp 显示事件处理的日期和时间。Logstash 使用 @version 字段进行内部处理。

  3. Ctrl + C 关闭 Logstash。

故障排除

如果您已经有一个 Logstash 进程正在运行,您将收到错误。要解决此问题:

  1. data 目录中删除 .lock 文件

     cd data
     rm -rf .lock
    
  2. 重启 Logstash。

处理 JSON 或 HTTP 输入并将其输出到文件

定义一个处理 JSON 请求的管道

  1. 在您喜欢的任何文本编辑器中打开 config/pipeline.conf 文件。您可以创建带有任何扩展名的管道配置文件,.conf 扩展名是 Logstash 的约定。添加 json 编解码器以接受 JSON 作为输入,并添加 file 插件将处理后的事件输出到 .txt 文件

     input {
       stdin {
         codec => json
       }
     }
     output {
       file {
         path => "output.txt"
       }
     }
    

    要处理文件中的输入,请将输入文件添加到 events-data 目录,然后将其路径传递给输入中的 file 插件

     input {
       file {
         path => "events-data/input_data.log"
       }
     }
    
  2. 启动 Logstash

     $ bin/logstash -f config/pipeline.conf
    

    config/pipeline.confpipeline.conf 文件的相对路径。您也可以使用绝对路径。

  3. 在终端中添加一个 JSON 对象

     { "amount": 10, "quantity": 2}
    

    管道仅处理单行输入。如果您粘贴跨越多行的 JSON,则会收到错误。

  4. 检查 JSON 对象中的字段是否已添加到 output.txt 文件中。

     $ cat output.txt
    
     {
       "@version": "1",
       "@timestamp": "2021-05-30T05:52:52.421Z",
       "host": "a483e711a548.ant.amazon.com",
       "amount": 10,
       "quantity": 2
     }
    

    如果您输入了无效的 JSON 作为输入,您将看到 JSON 解析错误。Logstash 不会丢弃无效的 JSON,因为您可能仍然想对其进行处理。例如,您可以触发电子邮件或向 Slack 频道发送通知。

定义一个处理 HTTP 请求的管道

  1. 使用 http 插件通过 HTTP 将事件发送到 Logstash

     input {
       http {
         host => "127.0.0.1"
         port => 8080
       }
     }
    
     output {
       file {
         path => "output.txt"
       }
     }
    

    如果不指定任何选项,http 插件将绑定到 localhost 并监听 8080 端口。

  2. 启动 Logstash

     $ bin/logstash -f config/pipeline.conf
    
  3. 使用 Postman 发送 HTTP 请求。将 Content-Type 设置为值为 application/json 的 HTTP 头

     PUT 127.0.0.1:8080
    
     {
       "amount": 10,
       "quantity": 2
     }
    

    或者,您可以使用 curl 命令

     curl -XPUT -H "Content-Type: application/json" -d ' {"amount": 7, "quantity": 3 }' https://:8080 (https://:8080/)
    

    即使我们没有将 json 插件添加到输入中,管道配置仍然有效,因为 HTTP 插件会根据 Content-Type 头自动应用相应的编解码器。如果您指定值为 applications/json,Logstash 会将请求正文解析为 JSON。

    headers 字段包含 Logstash 收到的 HTTP 头。

     {
       "host": "127.0.0.1",
       "quantity": "3",
       "amount": 10,
       "@timestamp": "2021-05-30T06:05:48.135Z",
       "headers": {
         "http_version": "HTTP/1.1",
         "request_method": "PUT",
         "http_user_agent": "PostmanRuntime/7.26.8",
         "connection": "keep-alive",
         "postman_token": "c6cd29cf-1b37-4420-8db3-9faec66b9e7e",
         "http_host": "127.0.0.1:8080",
         "cache_control": "no-cache",
         "request_path": "/",
         "content_type": "application/json",
         "http_accept": "*/*",
         "content_length": "41",
         "accept_encoding": "gzip, deflate, br"
       },
     "@version": "1"
     }
    

自动重新加载管道配置

您可以将 Logstash 配置为检测管道配置文件或输入日志文件的任何更改,并自动重新加载配置。

stdin 插件不支持自动重新加载。

  1. 将名为 start_position 的选项及其值 beginning 添加到输入插件

     input {
       file {
         path => "/Users/<user>/Desktop/logstash7-12.1/events-data/input_file.log"
         start_position => "beginning"
       }
     }
    

    Logstash 只处理添加到输入文件中的任何新事件,并忽略已处理的事件,以避免在重启时多次处理同一事件。

    Logstash 将其进度记录在一个名为 sinceDB 的文件中。Logstash 为每个监视更改的文件创建一个 sinceDB 文件。

  2. 打开 sinceDB 文件以检查输入文件的处理进度。

     cd data/plugins/inputs/file/
     ls -al
    
     -rw-r--r--  1 user  staff   0 Jun 13 10:50 .sincedb_9e484f2a9e6c0d1bdfe6f23ac107ffc5
    
     cat .sincedb_9e484f2a9e6c0d1bdfe6f23ac107ffc5
    
     51575938 1 4 7727
    

    sinceDB 文件中的最后一个数字 (7727) 是已处理的最后一个已知事件的字节偏移量。

  3. 要从头开始处理输入文件,请删除 sinceDB 文件

     rm .sincedb_*
    
  4. 使用 —-config.reload.automatic 参数启动 Logstash。

     bin/logstash -f config/pipeline.conf --config.reload.automatic
    

    reload 选项仅在您在管道配置文件的末尾添加新行时才重新加载。

    示例输出

     {
        "message" => "216.243.171.38 - - [20/Sep/2017:19:11:52 +0200] \"GET /products/view/123 HTTP/1.1\" 200 12798 \"https://codingexplained.com/products\" \"Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)\"",
        "@version" => "1",
           "host" => "a483e711a548.ant.amazon.com",
           "path" => "/Users/kumarjao/Desktop/odfe1/logstash-7.12.1/events-data/input_file.log",
        "@timestamp" => 2021-06-13T18:03:30.423Z
     }
     {
        "message" => "91.59.108.75 - - [20/Sep/2017:20:11:43 +0200] \"GET /js/main.js HTTP/1.1\" 200 588 \"https://codingexplained.com/products/view/863\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:45.0) Gecko/20100101 Firefox/45.0\"",
       "@version" => "1",
           "host" => "a483e711a548.ant.amazon.com",
           "path" => "/Users/kumarjao/Desktop/odfe1/logstash-7.12.1/events-data/input_file.log",
     "@timestamp" => 2021-06-13T18:03:30.424Z
     }
    
  5. 在输入文件中添加新行。
    • Logstash 立即检测到更改并将新行作为事件处理。
  6. pipeline.conf 文件进行更改。
    • Logstash 立即检测到更改并重新加载修改后的管道。

相关文章

剩余 350 字符

有问题?

想做贡献?