Logstash
Logstash 是一个实时事件处理引擎。它是 OpenSearch 堆栈的一部分,该堆栈包括 OpenSearch、Beats 和 OpenSearch Dashboards。
您可以将事件从许多不同来源发送到 Logstash。Logstash 处理事件并将其发送到一个或多个目的地。例如,您可以将 Web 服务器的访问日志发送到 Logstash。Logstash 从每个日志中提取有用信息,并将其发送到 OpenSearch 等目的地。
将事件发送到 Logstash 允许您将事件处理与您的应用程序解耦。您的应用程序只需将事件发送到 Logstash,无需了解事件后续的任何处理。
开源社区最初构建 Logstash 是为了处理日志数据,但现在您可以处理任何类型的事件,包括 XML 或 JSON 格式的事件。
管道结构
Logstash 的工作方式是配置一个包含三个阶段的管道:输入、过滤器和输出。
每个阶段使用一个或多个插件。Logstash 拥有 200 多个内置插件,因此您很有可能找到所需的功能。除了内置插件,您还可以使用社区插件,甚至编写自己的插件。
管道结构如下
input {
input_plugin => {}
}
filter {
filter_plugin => {}
}
output {
output_plugin => {}
}
其中
input
(输入)同时接收来自多个来源的事件,如日志。Logstash 支持多种输入插件,用于 TCP/UDP、文件、syslog、Microsoft Windows EventLogs、stdin、HTTP 等。您还可以使用名为 Beats 的开源输入工具集合来收集事件。输入插件将事件发送到过滤器。filter
(过滤器)以某种方式解析和丰富事件。Logstash 拥有大量的过滤器插件,可以修改事件并将其传递给输出。例如,grok
过滤器将非结构化事件解析为字段,而mutate
过滤器则更改字段。过滤器按顺序执行。output
(输出)将过滤后的事件发送到一个或多个目的地。Logstash 支持广泛的输出插件,用于 OpenSearch、TCP/UDP、电子邮件、文件、stdout、HTTP、Nagios 等目的地。
输入和输出阶段都支持编解码器,以在事件进入或退出管道时对其进行处理。一些流行的编解码器是 json
和 multiline
。json
编解码器处理 JSON 格式的数据,而 multiline
编解码器将多行事件合并为单行。
您还可以在管道配置中编写条件语句,以便在满足特定条件时执行某些操作。
安装 Logstash
要在 OpenSearch 上安装 Logstash,请首先在您的集群上安装 Logstash,然后安装 OpenSearch Logstash 插件,具体步骤如下所述。
Tarball
确保已安装 Java Development Kit (JDK) 版本 8 或 11。
-
从 Logstash 下载页面下载 Logstash tarball。
-
在终端中导航到下载的文件夹并解压文件。确保您的 Logstash 版本和平台与下载的文件匹配
tar -zxvf logstash-8.8.2-linux-x86_64.tar.gz
-
导航到
logstash-8.8.2
目录。 -
使用以下命令安装插件
bin/logstash-plugin install logstash-output-opensearch
您将收到以下输出
Validating logstash-output-opensearch Resolving mixin dependencies Updating mixin dependencies logstash-mixin-ecs_compatibility_support Bundler attempted to update logstash-mixin-ecs_compatibility_support but its version stayed the same Installing logstash-output-opensearch Installation successful
您可以将管道配置添加到 config
目录。Logstash 将来自插件的所有数据保存在 data
目录中。bin
目录包含用于启动 Logstash 和管理插件的二进制文件。
Docker
-
按照 Logstash 下载页面所述,拉取最新的 Logstash 镜像。
docker pull docker.elastic.co/logstash/logstash:8.8.2
-
创建 Docker 网络
docker network create test
-
使用此网络启动 OpenSearch
docker run -p 9200:9200 -p 9600:9600 --name opensearch --net test -e "discovery.type=single-node" opensearchproject/opensearch:1.2.0
-
启动 Logstash
docker run -it --rm --name logstash --net test opensearchproject/logstash-oss-with-opensearch-output-plugin:7.16.2 -e 'input { stdin { } } output { opensearch { hosts => ["https://opensearch:9200"] index => "opensearch-logstash-docker-%{+YYYY.MM.dd}" user => "admin" password => "admin" ssl => true ssl_certificate_verification => false } }'
从终端处理文本
您可以定义一个管道,监听 stdin
上的事件并输出 stdout
上的事件。stdin
和 stdout
指的是您运行 Logstash 的终端。
要在终端中输入一些文本并查看输出中的事件数据
-
使用
-e
参数将管道配置直接传递给 Logstash 二进制文件。在这种情况下,stdin
是输入插件,stdout
是输出插件bin/logstash -e "input { stdin { } } output { stdout { } }"
添加
—debug
标志以查看更详细的输出。 -
在终端中输入“hello world”。Logstash 处理文本并将其输出回终端
{ "message" => "hello world", "host" => "a483e711a548.ant.amazon.com", "@timestamp" => 2021-05-30T05:15:56.816Z, "@version" => "1" }
message
字段包含您的原始输入。当您不在本地运行 Logstash 时,host
字段是 IP 地址。@timestamp
显示事件处理的日期和时间。Logstash 使用@version
字段进行内部处理。 -
按
Ctrl + C
关闭 Logstash。
故障排除
如果您已经有一个 Logstash 进程正在运行,您将收到错误。要解决此问题:
-
从
data
目录中删除.lock
文件cd data rm -rf .lock
-
重启 Logstash。
处理 JSON 或 HTTP 输入并将其输出到文件
定义一个处理 JSON 请求的管道
-
在您喜欢的任何文本编辑器中打开
config/pipeline.conf
文件。您可以创建带有任何扩展名的管道配置文件,.conf
扩展名是 Logstash 的约定。添加json
编解码器以接受 JSON 作为输入,并添加file
插件将处理后的事件输出到.txt
文件input { stdin { codec => json } } output { file { path => "output.txt" } }
要处理文件中的输入,请将输入文件添加到
events-data
目录,然后将其路径传递给输入中的file
插件input { file { path => "events-data/input_data.log" } }
-
启动 Logstash
$ bin/logstash -f config/pipeline.conf
config/pipeline.conf
是pipeline.conf
文件的相对路径。您也可以使用绝对路径。 -
在终端中添加一个 JSON 对象
{ "amount": 10, "quantity": 2}
管道仅处理单行输入。如果您粘贴跨越多行的 JSON,则会收到错误。
-
检查 JSON 对象中的字段是否已添加到
output.txt
文件中。$ cat output.txt { "@version": "1", "@timestamp": "2021-05-30T05:52:52.421Z", "host": "a483e711a548.ant.amazon.com", "amount": 10, "quantity": 2 }
如果您输入了无效的 JSON 作为输入,您将看到 JSON 解析错误。Logstash 不会丢弃无效的 JSON,因为您可能仍然想对其进行处理。例如,您可以触发电子邮件或向 Slack 频道发送通知。
定义一个处理 HTTP 请求的管道
-
使用
http
插件通过 HTTP 将事件发送到 Logstashinput { http { host => "127.0.0.1" port => 8080 } } output { file { path => "output.txt" } }
如果不指定任何选项,
http
插件将绑定到localhost
并监听 8080 端口。 -
启动 Logstash
$ bin/logstash -f config/pipeline.conf
-
使用 Postman 发送 HTTP 请求。将
Content-Type
设置为值为application/json
的 HTTP 头PUT 127.0.0.1:8080 { "amount": 10, "quantity": 2 }
或者,您可以使用
curl
命令curl -XPUT -H "Content-Type: application/json" -d ' {"amount": 7, "quantity": 3 }' https://:8080 (https://:8080/)
即使我们没有将
json
插件添加到输入中,管道配置仍然有效,因为 HTTP 插件会根据Content-Type
头自动应用相应的编解码器。如果您指定值为applications/json
,Logstash 会将请求正文解析为 JSON。headers
字段包含 Logstash 收到的 HTTP 头。{ "host": "127.0.0.1", "quantity": "3", "amount": 10, "@timestamp": "2021-05-30T06:05:48.135Z", "headers": { "http_version": "HTTP/1.1", "request_method": "PUT", "http_user_agent": "PostmanRuntime/7.26.8", "connection": "keep-alive", "postman_token": "c6cd29cf-1b37-4420-8db3-9faec66b9e7e", "http_host": "127.0.0.1:8080", "cache_control": "no-cache", "request_path": "/", "content_type": "application/json", "http_accept": "*/*", "content_length": "41", "accept_encoding": "gzip, deflate, br" }, "@version": "1" }
自动重新加载管道配置
您可以将 Logstash 配置为检测管道配置文件或输入日志文件的任何更改,并自动重新加载配置。
stdin
插件不支持自动重新加载。
-
将名为
start_position
的选项及其值beginning
添加到输入插件input { file { path => "/Users/<user>/Desktop/logstash7-12.1/events-data/input_file.log" start_position => "beginning" } }
Logstash 只处理添加到输入文件中的任何新事件,并忽略已处理的事件,以避免在重启时多次处理同一事件。
Logstash 将其进度记录在一个名为
sinceDB
的文件中。Logstash 为每个监视更改的文件创建一个sinceDB
文件。 -
打开
sinceDB
文件以检查输入文件的处理进度。cd data/plugins/inputs/file/ ls -al -rw-r--r-- 1 user staff 0 Jun 13 10:50 .sincedb_9e484f2a9e6c0d1bdfe6f23ac107ffc5 cat .sincedb_9e484f2a9e6c0d1bdfe6f23ac107ffc5 51575938 1 4 7727
sinceDB
文件中的最后一个数字 (7727) 是已处理的最后一个已知事件的字节偏移量。 -
要从头开始处理输入文件,请删除
sinceDB
文件rm .sincedb_*
-
使用
—-config.reload.automatic
参数启动 Logstash。bin/logstash -f config/pipeline.conf --config.reload.automatic
reload
选项仅在您在管道配置文件的末尾添加新行时才重新加载。示例输出
{ "message" => "216.243.171.38 - - [20/Sep/2017:19:11:52 +0200] \"GET /products/view/123 HTTP/1.1\" 200 12798 \"https://codingexplained.com/products\" \"Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)\"", "@version" => "1", "host" => "a483e711a548.ant.amazon.com", "path" => "/Users/kumarjao/Desktop/odfe1/logstash-7.12.1/events-data/input_file.log", "@timestamp" => 2021-06-13T18:03:30.423Z } { "message" => "91.59.108.75 - - [20/Sep/2017:20:11:43 +0200] \"GET /js/main.js HTTP/1.1\" 200 588 \"https://codingexplained.com/products/view/863\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:45.0) Gecko/20100101 Firefox/45.0\"", "@version" => "1", "host" => "a483e711a548.ant.amazon.com", "path" => "/Users/kumarjao/Desktop/odfe1/logstash-7.12.1/events-data/input_file.log", "@timestamp" => 2021-06-13T18:03:30.424Z }
- 在输入文件中添加新行。
- Logstash 立即检测到更改并将新行作为事件处理。
- 对
pipeline.conf
文件进行更改。- Logstash 立即检测到更改并重新加载修改后的管道。