> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-home-button.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

> AvroConfluent フォーマットに関するドキュメント

# AvroConfluent

| 入力 | 出力 | エイリアス |
| -- | -- | ----- |
| ✔  | ✔  |       |

<div id="description">
  ## 説明
</div>

[Apache Avro](https://avro.apache.org/) は、効率的なデータ処理のためにバイナリエンコーディングを使用する、行指向のシリアライゼーションフォーマットです。`AvroConfluent` フォーマットは、[Confluent スキーマレジストリ](https://docs.confluent.io/current/schema-registry/index.html) (または API 互換のサービス) を使用した、Avro エンコード済みメッセージの読み書きをサポートします。

各メッセージは Confluent のワイヤ形式に従います。これは、マジックバイト (`0x00`) に続いて 4 バイトのビッグエンディアンのスキーマ ID が配置され、その後に Avro のバイナリデータが続く形式です。読み取り時には、ClickHouse はレジストリに問い合わせてスキーマ ID を解決します。書き込み時には、ClickHouse は出力カラムから導出したスキーマを登録し、生成された ID を各行の先頭に付加します。最適なパフォーマンスを得るため、スキーマはキャッシュされます。

<a id="data-types-matching" />

<div id="data-type-mapping">
  ## データ型マッピング
</div>

次の表は、Apache Avro フォーマットでサポートされるすべてのデータ型と、`INSERT` および `SELECT` クエリでそれぞれに対応する ClickHouse の[データ型](/ja/reference/data-types)を示しています。

| Avro data type `INSERT`                     | ClickHouse data type                                                                                  | Avro data type `SELECT`          |
| ------------------------------------------- | ----------------------------------------------------------------------------------------------------- | -------------------------------- |
| `boolean`, `int`, `long`, `float`, `double` | [Int(8\16\32)](/ja/reference/data-types/int-uint), [UInt(8\16\32)](/ja/reference/data-types/int-uint) | `int`                            |
| `boolean`, `int`, `long`, `float`, `double` | [Int64](/ja/reference/data-types/int-uint), [UInt64](/ja/reference/data-types/int-uint)               | `long`                           |
| `boolean`, `int`, `long`, `float`, `double` | [Float32](/ja/reference/data-types/float)                                                             | `float`                          |
| `boolean`, `int`, `long`, `float`, `double` | [Float64](/ja/reference/data-types/float)                                                             | `double`                         |
| `bytes`, `string`, `fixed`, `enum`          | [String](/ja/reference/data-types/string)                                                             | `bytes` または `string` \*          |
| `bytes`, `string`, `fixed`                  | [FixedString(N)](/ja/reference/data-types/fixedstring)                                                | `fixed(N)`                       |
| `enum`                                      | [Enum(8\16)](/ja/reference/data-types/enum)                                                           | `enum`                           |
| `array(T)`                                  | [Array(T)](/ja/reference/data-types/array)                                                            | `array(T)`                       |
| `map(V, K)`                                 | [Map(V, K)](/ja/reference/data-types/map)                                                             | `map(string, K)`                 |
| `union(null, T)`, `union(T, null)`          | [Nullable(T)](/ja/reference/data-types/date)                                                          | `union(null, T)`                 |
| `union(T1, T2, …)` \*\*                     | [Variant(T1, T2, …)](/ja/reference/data-types/variant)                                                | `union(T1, T2, …)` \*\*          |
| `null`                                      | [Nullable(Nothing)](/ja/reference/data-types/special-data-types/nothing)                              | `null`                           |
| `int (date)` \*\*\*                         | [Date](/ja/reference/data-types/date), [Date32](/ja/reference/data-types/date32)                      | `int (date)` \*\*\*              |
| `long (timestamp-millis)` \*\*\*            | [DateTime64(3)](/ja/reference/data-types/datetime)                                                    | `long (timestamp-millis)` \*\*\* |
| `long (timestamp-micros)` \*\*\*            | [DateTime64(6)](/ja/reference/data-types/datetime)                                                    | `long (timestamp-micros)` \*\*\* |
| `bytes (decimal)`  \*\*\*                   | [DateTime64(N)](/ja/reference/data-types/datetime)                                                    | `bytes (decimal)`  \*\*\*        |
| `int`                                       | [IPv4](/ja/reference/data-types/ipv4)                                                                 | `int`                            |
| `fixed(16)`                                 | [IPv6](/ja/reference/data-types/ipv6)                                                                 | `fixed(16)`                      |
| `bytes (decimal)` \*\*\*                    | [Decimal(P, S)](/ja/reference/data-types/decimal)                                                     | `bytes (decimal)` \*\*\*         |
| `string (uuid)` \*\*\*                      | [UUID](/ja/reference/data-types/uuid)                                                                 | `string (uuid)` \*\*\*           |
| `fixed(16)`                                 | [Int128/UInt128](/ja/reference/data-types/int-uint)                                                   | `fixed(16)`                      |
| `fixed(32)`                                 | [Int256/UInt256](/ja/reference/data-types/int-uint)                                                   | `fixed(32)`                      |
| `record`                                    | [Tuple](/ja/reference/data-types/tuple)                                                               | `record`                         |

* `bytes` がデフォルトで、設定 [`output_format_avro_string_column_pattern`](/ja/reference/settings/formats#output_format_avro_string_column_pattern) で制御されます

\*\*  [Variant 型](/ja/reference/data-types/variant) は `null` をフィールド値として暗黙的に受け入れるため、たとえば Avro の `union(T1, T2, null)` は `Variant(T1, T2)` に変換されます。
そのため、ClickHouse から Avro を生成する際には、Avro の `union` 型の候補に常に `null` 型を含める必要があります。これは、スキーマ推論の時点では、実際に `null` となる値があるかどうか分からないためです。

\*\*\* [Avro 論理型](https://avro.apache.org/docs/current/spec.html#Logical+Types)

サポートされていない Avro 論理データ型:

* `time-millis`
* `time-micros`
* `duration`

<div id="format-settings">
  ## フォーマット設定
</div>

[//]: # "NOTE これらの設定はセッションレベルでも設定できますが、一般的ではないため、あまり目立つように記載するとユーザーを混乱させる可能性があります。"

| Setting                                          | Description                                                                           | Default |
| ------------------------------------------------ | ------------------------------------------------------------------------------------- | ------- |
| `input_format_avro_allow_missing_fields`         | フィールドがスキーマ内に見つからない場合に、エラーを発生させる代わりにデフォルト値を使用するかどうか。                                   | `0`     |
| `input_format_avro_null_as_default`              | `null` 値を非 Nullable のカラムに挿入する際に、エラーを発生させる代わりにデフォルト値を使用するかどうか。                         | `0`     |
| `format_avro_schema_registry_url`                | Confluent スキーマレジストリ の URL。基本認証では、URL エンコードされた認証情報を URL パスに直接含めることができます。               |         |
| `format_avro_schema_registry_connection_timeout` | スキーマレジストリ HTTP クライアントの接続タイムアウト (スキーマ取得と登録の両方で使用) の秒数。0 より大きく、600 (10 分) 未満である必要があります。 | `1`     |
| `format_avro_schema_registry_send_timeout`       | スキーマレジストリ HTTP クライアントの送信タイムアウトの秒数。0 より大きく、600 (10 分) 未満である必要があります。                    | `1`     |
| `format_avro_schema_registry_receive_timeout`    | スキーマレジストリ HTTP クライアントの受信タイムアウトの秒数。0 より大きく、600 (10 分) 未満である必要があります。                    | `1`     |
| `output_format_avro_confluent_subject`           | 出力用: スキーマレジストリでスキーマを登録する subject 名。書き込み時に必須です。                                        |         |
| `output_format_avro_string_column_pattern`       | 出力用: Avro `string` としてシリアライズする String カラムの正規表現 (デフォルトは `bytes`) 。                     |         |

<div id="examples">
  ## 例
</div>

<div id="reading-from-kafka">
  ### Kafka からの読み込み
</div>

[Kafka table engine](/ja/reference/engines/table-engines/integrations/kafka) を使用して Avro でエンコードされた Kafka トピックを読み込むには、`format_avro_schema_registry_url` 設定でスキーマレジストリの URL を指定します。

```sql theme={null}
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url';

SELECT * FROM topic1_stream;
```

<div id="writing-to-kafka">
  ### Kafka への書き込み
</div>

AvroConfluent メッセージを Kafka トピック に書き込むには、スキーマレジストリの URL と subject 名の両方を設定します。スキーマは最初の書き込み時に、自動的にレジストリへ登録されます。

```sql theme={null}
CREATE TABLE topic1_sink
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'http://schema-registry-url',
output_format_avro_confluent_subject = 'topic1-value';

INSERT INTO topic1_sink VALUES ('hello', 'world');
```

<div id="using-basic-authentication">
  #### Basic認証を使用する
</div>

スキーマレジストリでBasic認証が必要な場合 (例: Confluent Cloud を使用している場合) 、`format_avro_schema_registry_url` 設定に URL エンコードされた認証情報を指定できます。

```sql theme={null}
CREATE TABLE topic1_stream
(
    field1 String,
    field2 String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-broker',
kafka_topic_list = 'topic1',
kafka_group_name = 'group1',
kafka_format = 'AvroConfluent',
format_avro_schema_registry_url = 'https://<username>:<password>@schema-registry-url';
```

<div id="troubleshooting">
  ## トラブルシューティング
</div>

インジェストの進行状況を監視し、Kafkaコンシューマーのエラーをデバッグするには、[`system.kafka_consumers` システムテーブル](/ja/reference/system-tables/kafka_consumers)をクエリします。デプロイ環境に複数のレプリカがある場合 (例: ClickHouse Cloud) は、[`clusterAllReplicas`](/ja/reference/functions/table-functions/cluster) テーブル関数を使用する必要があります。

```sql theme={null}
SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
ORDER BY assignments.partition_id ASC;
```

スキーマ解決で問題が発生した場合は、[kafkacat](https://github.com/edenhill/kafkacat) と [clickhouse-local](/ja/concepts/features/tools-and-utilities/clickhouse-local) を使ってトラブルシューティングできます。

```bash theme={null}
$ kafkacat -b kafka-broker  -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local   --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String"  -q 'select *  from table'
1 a
2 b
3 c
```
