> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-home-button.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

> AvroConfluent 格式文档

# AvroConfluent

| 输入 | 输出 | 别名 |
| -- | -- | -- |
| ✔  | ✔  |    |

<div id="description">
  ## 描述
</div>

[Apache Avro](https://avro.apache.org/) 是一种面向行的序列化格式，使用二进制编码来高效处理数据。`AvroConfluent` 格式支持借助 [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/index.html) (或与其 API 兼容的服务) 读取和写入采用 Avro 编码的消息。

每条消息都使用 Confluent 传输格式：一个魔数字节 (`0x00`) ，后跟一个 4 字节大端序的 schema ID，再后跟 Avro 二进制数据。读取时，ClickHouse 会通过查询 registry 来解析 schema ID。写入时，ClickHouse 会注册根据输出列推导出的 schema，并将生成的 ID 添加到每一行前面。为获得最佳性能，schema 会被缓存。

<a id="data-types-matching" />

<div id="data-type-mapping">
  ## 数据类型映射
</div>

下表列出了 Apache Avro format 支持的所有 data types，以及它们在 `INSERT` 和 `SELECT` queries 中对应的 ClickHouse [data types](/zh/reference/data-types)。

| Avro data type `INSERT`                     | ClickHouse data type                                                                                  | Avro data type `SELECT`          |
| ------------------------------------------- | ----------------------------------------------------------------------------------------------------- | -------------------------------- |
| `boolean`, `int`, `long`, `float`, `double` | [Int(8\16\32)](/zh/reference/data-types/int-uint), [UInt(8\16\32)](/zh/reference/data-types/int-uint) | `int`                            |
| `boolean`, `int`, `long`, `float`, `double` | [Int64](/zh/reference/data-types/int-uint), [UInt64](/zh/reference/data-types/int-uint)               | `long`                           |
| `boolean`, `int`, `long`, `float`, `double` | [Float32](/zh/reference/data-types/float)                                                             | `float`                          |
| `boolean`, `int`, `long`, `float`, `double` | [Float64](/zh/reference/data-types/float)                                                             | `double`                         |
| `bytes`, `string`, `fixed`, `enum`          | [String](/zh/reference/data-types/string)                                                             | `bytes` 或 `string` \*            |
| `bytes`, `string`, `fixed`                  | [FixedString(N)](/zh/reference/data-types/fixedstring)                                                | `fixed(N)`                       |
| `enum`                                      | [Enum(8\16)](/zh/reference/data-types/enum)                                                           | `enum`                           |
| `array(T)`                                  | [Array(T)](/zh/reference/data-types/array)                                                            | `array(T)`                       |
| `map(V, K)`                                 | [Map(V, K)](/zh/reference/data-types/map)                                                             | `map(string, K)`                 |
| `union(null, T)`, `union(T, null)`          | [Nullable(T)](/zh/reference/data-types/date)                                                          | `union(null, T)`                 |
| `union(T1, T2, …)` \*\*                     | [Variant(T1, T2, …)](/zh/reference/data-types/variant)                                                | `union(T1, T2, …)` \*\*          |
| `null`                                      | [Nullable(Nothing)](/zh/reference/data-types/special-data-types/nothing)                              | `null`                           |
| `int (date)` \*\*\*                         | [Date](/zh/reference/data-types/date), [Date32](/zh/reference/data-types/date32)                      | `int (date)` \*\*\*              |
| `long (timestamp-millis)` \*\*\*            | [DateTime64(3)](/zh/reference/data-types/datetime)                                                    | `long (timestamp-millis)` \*\*\* |
| `long (timestamp-micros)` \*\*\*            | [DateTime64(6)](/zh/reference/data-types/datetime)                                                    | `long (timestamp-micros)` \*\*\* |
| `bytes (decimal)`  \*\*\*                   | [DateTime64(N)](/zh/reference/data-types/datetime)                                                    | `bytes (decimal)`  \*\*\*        |
| `int`                                       | [IPv4](/zh/reference/data-types/ipv4)                                                                 | `int`                            |
| `fixed(16)`                                 | [IPv6](/zh/reference/data-types/ipv6)                                                                 | `fixed(16)`                      |
| `bytes (decimal)` \*\*\*                    | [Decimal(P, S)](/zh/reference/data-types/decimal)                                                     | `bytes (decimal)` \*\*\*         |
| `string (uuid)` \*\*\*                      | [UUID](/zh/reference/data-types/uuid)                                                                 | `string (uuid)` \*\*\*           |
| `fixed(16)`                                 | [Int128/UInt128](/zh/reference/data-types/int-uint)                                                   | `fixed(16)`                      |
| `fixed(32)`                                 | [Int256/UInt256](/zh/reference/data-types/int-uint)                                                   | `fixed(32)`                      |
| `record`                                    | [Tuple](/zh/reference/data-types/tuple)                                                               | `record`                         |

* `bytes` is default, controlled by setting [`output_format_avro_string_column_pattern`](/zh/reference/settings/formats#output_format_avro_string_column_pattern)

\*\*  [Variant 类型](/zh/reference/data-types/variant) 会隐式地接受 `null` 作为字段值，因此，例如 Avro 的 `union(T1, T2, null)` 会被转换为 `Variant(T1, T2)`。
因此，当从 ClickHouse 生成 Avro 时，我们必须始终在 Avro 的 `union` 类型集合中包含 `null` 类型，因为在进行 schema inference 时，我们无法判断是否有某个值实际为 `null`。

\*\*\* [Avro 逻辑类型](https://avro.apache.org/docs/current/spec.html#Logical+Types)

不受支持的 Avro 逻辑类型：

* `time-millis`
* `time-micros`
* `duration`

<div id="format-settings">
  ## 格式设置
</div>

[//]: # "NOTE 这些设置也可以在会话级别设置，但这种用法并不常见，过于突出地说明这一点可能会让用户感到困惑。"

| Setting                                          | Description                                                                   | Default |
| ------------------------------------------------ | ----------------------------------------------------------------------------- | ------- |
| `input_format_avro_allow_missing_fields`         | 当 schema 中找不到某个字段时，是否使用默认值而不是报错。                                              | `0`     |
| `input_format_avro_null_as_default`              | 当向不可为空的列中插入 `null` 值时，是否使用默认值而不是报错。                                           | `0`     |
| `format_avro_schema_registry_url`                | Confluent Schema Registry 的 URL。对于基本身份验证，可以将经过 URL 编码的凭据直接包含在 URL 路径中。        |         |
| `format_avro_schema_registry_connection_timeout` | Schema Registry HTTP 客户端的连接超时时间 (秒) ，用于 schema 拉取和注册。必须大于 0 且小于 600 (10 分钟) 。 | `1`     |
| `format_avro_schema_registry_send_timeout`       | Schema Registry HTTP 客户端的发送超时时间 (秒) 。必须大于 0 且小于 600 (10 分钟) 。                 | `1`     |
| `format_avro_schema_registry_receive_timeout`    | Schema Registry HTTP 客户端的接收超时时间 (秒) 。必须大于 0 且小于 600 (10 分钟) 。                 | `1`     |
| `output_format_avro_confluent_subject`           | 用于输出：schema 在 Schema Registry 中注册时使用的 subject 名称。写入时必填。                       |         |
| `output_format_avro_string_column_pattern`       | 用于输出：将 String 列序列化为 Avro `string` 的正则表达式 (默认为 `bytes`) 。                      |         |

<div id="examples">
  ## 示例
</div>

<div id="reading-from-kafka">
  ### 从 Kafka 读取
</div>

要使用 [Kafka 表引擎](/zh/reference/engines/table-engines/integrations/kafka) 读取采用 Avro 编码的 Kafka topic，请使用 `format_avro_schema_registry_url` 设置指定 Schema Registry 的 URL。

```sql theme={null}
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;
```

<div id="writing-to-kafka">
  ### 写入 Kafka
</div>

要将 AvroConfluent 消息写入 Kafka topic，需要同时设置 Schema Registry URL 和 subject 名称。首次写入时，schema 会自动注册到 registry。

```sql theme={null}
CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');
```

<div id="using-basic-authentication">
  #### 使用基本身份验证
</div>

如果您的 Schema Registry 需要使用基本身份验证 (例如您使用的是 Confluent Cloud) ，可以在 `format_avro_schema_registry_url` 设置中提供经过 URL 编码的凭据。

```sql theme={null}
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
```

<div id="troubleshooting">
  ## 故障排除
</div>

要监控摄取进度并排查 Kafka 消费者错误，可以查询 [`system.kafka_consumers` 系统表](/zh/reference/system-tables/kafka_consumers)。如果你的部署包含多个副本 (例如 ClickHouse Cloud) ，则必须使用 [`clusterAllReplicas`](/zh/reference/functions/table-functions/cluster) 表函数。

```sql theme={null}
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
```

如果遇到 schema 解析方面的问题，可以使用 [kafkacat](https://github.com/edenhill/kafkacat) 配合 [clickhouse-local](/zh/concepts/features/tools-and-utilities/clickhouse-local) 进行故障排查：

```bash theme={null}
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
```
