csv
`csv` 处理器将事件中的逗号分隔值(CSV)解析为列。
配置
下表描述了可用于配置 `csv` 选项的选项。
选项 | 必需 | 类型 | 描述 |
---|---|---|---|
source | 否 | 字符串 | 事件中将被解析的字段。默认值为 `message`。 |
quote_character | 否 | 字符串 | 用作单个数据列文本限定符的字符。默认值为 `"`。 |
delimiter | 否 | 字符串 | 分隔每个列的字符。默认值为 `,`。 |
delete_header | 否 | 布尔型 | 如果指定,事件头(`column_names_source_key`)在事件解析后被删除。如果没有事件头,则不执行任何操作。默认值为 true。 |
column_names_source_key | 否 | 字符串 | 事件中指定 CSV 列名的字段,该字段将自动检测。如果需要额外的列名,则根据其索引自动生成列名。如果也定义了 `column_names`,则 `column_names_source_key` 中的标题也可以用于生成事件字段。如果此字段中指定的列太少,则自动生成剩余的列名。如果此字段中指定的列名太多,则 CSV 处理器会忽略多余的列名。 |
column_names | 否 | 列表 | 用户为 CSV 列指定名称。如果 CSV 记录中没有数据列且未定义 `column_names_source_key`,则默认值为 `[column1, column2, ..., columnN]`。如果定义了 `column_names_source_key`,则 `column_names_source_key` 中的标题会生成事件字段。如果此字段中指定的列太少,则自动生成剩余的列名。如果此字段中指定的列名太多,则 CSV 处理器会忽略多余的列名。 |
用法
根据 CSV 列的格式,将以下示例添加到 `pipelines.yaml` 文件中。
用户指定的列名
以下 `pipelines.yaml` 示例配置指向名为 `ingest.csv` 的文件作为源。然后,`csv` 处理器使用 `column_names` 设置中指定的列名解析 `.csv` 文件中的数据,如下例所示
csv-pipeline:
source:
file:
path: "/full/path/to/ingest.csv"
record_type: "event"
processor:
- csv:
column_names: ["col1", "col2"]
sink:
- stdout:
运行时,处理器将解析消息。尽管处理器设置中只指定了两个列名,但会自动生成第三个列名,因为 `ingest.csv` 中包含的数据包括三列:`1,2,3`
{"message": "1,2,3", "col1": "1", "col2": "2", "column3": "3"}
自动检测列名
以下配置会自动检测通过 `s3 source` 摄入的 CSV 文件的标题
csv-s3-pipeline:
source:
s3:
notification_type: "sqs"
codec:
newline:
skip_lines: 1
header_destination: "header"
compression: none
sqs:
queue_url: "https://sqs.<region>.amazonaws.com/<account id>/<queue name>"
aws:
region: "<region>"
processor:
- csv:
column_names_source_key: "header"
sink:
- stdout:
例如,如果附加了 Amazon Simple Queue Service (SQS) 队列的 Amazon Simple Storage Service (Amazon S3) 存储桶中的 `ingest.csv` 文件包含以下数据
Should,skip,this,line
a,b,c
1,2,3
然后 `csv` 处理器将处理以下事件
{"header": "a,b,c", "message": "1,2,3"}
然后,处理器将事件解析为以下输出。由于 `delete_header` 默认为 `true`,因此标题 `a,b,c` 将从输出中删除
{"message": "1,2,3", "a": "1", "b": "2", "c": "3"}
指标
下表描述了常见的抽象处理器指标。
指标名称 | 类型 | 描述 |
---|---|---|
recordsIn | 计数器 | 表示记录进入管道组件的指标。 |
recordsOut | 计数器 | 表示记录从管道组件流出的指标。 |
timeElapsed | 计时器 | 表示管道组件执行期间经过时间的指标。 |
`csv` 处理器包含以下自定义指标。
计数器
`csv` 处理器包含以下计数器指标
- `csvInvalidEvents`:无效事件的数量,通常是由于事件本身中未闭合的引号引起的。当解析无效事件时,OpenSearch Data Prepper 会抛出异常。