Link Search Menu Expand Document Documentation Menu

csv

`csv` 处理器将事件中的逗号分隔值(CSV)解析为列。

配置

下表描述了可用于配置 `csv` 选项的选项。

选项 必需 类型 描述
source 字符串 事件中将被解析的字段。默认值为 `message`。
quote_character 字符串 用作单个数据列文本限定符的字符。默认值为 `"`。
delimiter 字符串 分隔每个列的字符。默认值为 `,`。
delete_header 布尔型 如果指定,事件头(`column_names_source_key`)在事件解析后被删除。如果没有事件头,则不执行任何操作。默认值为 true。
column_names_source_key 字符串 事件中指定 CSV 列名的字段,该字段将自动检测。如果需要额外的列名,则根据其索引自动生成列名。如果也定义了 `column_names`,则 `column_names_source_key` 中的标题也可以用于生成事件字段。如果此字段中指定的列太少,则自动生成剩余的列名。如果此字段中指定的列名太多,则 CSV 处理器会忽略多余的列名。
column_names 列表 用户为 CSV 列指定名称。如果 CSV 记录中没有数据列且未定义 `column_names_source_key`,则默认值为 `[column1, column2, ..., columnN]`。如果定义了 `column_names_source_key`,则 `column_names_source_key` 中的标题会生成事件字段。如果此字段中指定的列太少,则自动生成剩余的列名。如果此字段中指定的列名太多,则 CSV 处理器会忽略多余的列名。

用法

根据 CSV 列的格式,将以下示例添加到 `pipelines.yaml` 文件中。

用户指定的列名

以下 `pipelines.yaml` 示例配置指向名为 `ingest.csv` 的文件作为源。然后,`csv` 处理器使用 `column_names` 设置中指定的列名解析 `.csv` 文件中的数据,如下例所示

csv-pipeline:
  source:
    file:
      path: "/full/path/to/ingest.csv"
      record_type: "event"
  processor:
    - csv:
        column_names: ["col1", "col2"]
  sink:
    - stdout:

运行时,处理器将解析消息。尽管处理器设置中只指定了两个列名,但会自动生成第三个列名,因为 `ingest.csv` 中包含的数据包括三列:`1,2,3`

{"message": "1,2,3", "col1": "1", "col2": "2", "column3": "3"}

自动检测列名

以下配置会自动检测通过 `s3 source` 摄入的 CSV 文件的标题

csv-s3-pipeline:
  source:
    s3:
      notification_type: "sqs"
      codec:
        newline:
          skip_lines: 1
          header_destination: "header"
      compression: none
      sqs:
        queue_url: "https://sqs.<region>.amazonaws.com/<account id>/<queue name>"
      aws:
        region: "<region>"
  processor:
    - csv:
        column_names_source_key: "header"
  sink:
    - stdout:

例如,如果附加了 Amazon Simple Queue Service (SQS) 队列的 Amazon Simple Storage Service (Amazon S3) 存储桶中的 `ingest.csv` 文件包含以下数据

Should,skip,this,line
a,b,c
1,2,3

然后 `csv` 处理器将处理以下事件

{"header": "a,b,c", "message": "1,2,3"}

然后,处理器将事件解析为以下输出。由于 `delete_header` 默认为 `true`,因此标题 `a,b,c` 将从输出中删除

{"message": "1,2,3", "a": "1", "b": "2", "c": "3"}

指标

下表描述了常见的抽象处理器指标。

指标名称 类型 描述
recordsIn 计数器 表示记录进入管道组件的指标。
recordsOut 计数器 表示记录从管道组件流出的指标。
timeElapsed 计时器 表示管道组件执行期间经过时间的指标。

`csv` 处理器包含以下自定义指标。

计数器

`csv` 处理器包含以下计数器指标

  • `csvInvalidEvents`:无效事件的数量,通常是由于事件本身中未闭合的引号引起的。当解析无效事件时,OpenSearch Data Prepper 会抛出异常。
剩余 350 字符

有问题?

想贡献?