> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-home-button.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

> Distributed エンジンを使用するテーブルは自身のデータを保存しませんが、 複数のサーバーにまたがる分散クエリ処理を可能にします。読み取りは自動的に 並列化されます。読み取り時には、存在する場合、リモートサーバー上のテーブルの 索引が使用されます。

# Distributed テーブルエンジン

<Warning>
  **Cloud の Distributed エンジン**

  ClickHouse Cloud で Distributed テーブルエンジンを作成するには、[`remote` と `remoteSecure`](/ja/reference/functions/table-functions/remote) テーブル関数を使用できます。
  ClickHouse Cloud では `Distributed(...)` 構文は使用できません。
</Warning>

Distributed エンジンを使用するテーブルは自身のデータを保存しませんが、複数のサーバーにまたがる分散クエリ処理を可能にします。
読み取りは自動的に並列化されます。読み取り時には、存在する場合、リモートサーバー上のテーブルの索引が使用されます。

<div id="distributed-creating-a-table">
  ## テーブルの作成
</div>

```sql theme={null}
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]
```

<div id="distributed-from-a-table">
  ### テーブルから
</div>

`Distributed` テーブルが現在のサーバー上のテーブルを参照している場合は、そのテーブルのスキーマをそのまま利用できます。

```sql theme={null}
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]
```

<div id="distributed-parameters">
  ### Distributed パラメーター
</div>

| パラメーター                    | 説明                                                                                                                                                                                                                                                                                                                    |
| ------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `cluster`                 | サーバーの設定ファイル内のクラスター名                                                                                                                                                                                                                                                                                                   |
| `database`                | リモートデータベースの名前                                                                                                                                                                                                                                                                                                         |
| `table`                   | リモートテーブルの名前                                                                                                                                                                                                                                                                                                           |
| `sharding_key` (Optional) | シャーディングキー。<br /> `sharding_key` の指定は、次の場合に必要です。 <ul><li>分散テーブルへの `INSERT` の場合 (テーブルエンジンがデータをどのように分割するかを判断するために `sharding_key` を必要とするため) 。ただし、`insert_distributed_one_random_shard` 設定が有効な場合、`INSERT` にシャーディングキーは不要です。</li><li>`optimize_skip_unused_shards` を使用する場合。クエリ対象にする分片を判断するために `sharding_key` が必要です</li></ul> |
| `policy_name` (Optional)  | ポリシー名。バックグラウンド送信用の一時ファイルの保存に使用されます                                                                                                                                                                                                                                                                                    |

**関連項目**

* [distributed\_foreground\_insert](/ja/reference/settings/session-settings#distributed_foreground_insert) 設定
* 例については [MergeTree](/ja/reference/engines/table-engines/mergetree-family/mergetree#table_engine-mergetree-multiple-volumes) を参照してください

<div id="distributed-settings">
  ### Distributed の設定
</div>

| 設定                                         | 説明                                                                                                                                                           | デフォルト値  |
| ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------- |
| `fsync_after_insert`                       | Distributed へのバックグラウンド `INSERT` 後に、ファイルデータに対して `fsync` を実行します。OS が **イニシエーター ノード** のディスク上のファイルに、挿入されたデータ全体を確実にフラッシュしたことを保証します。                               | `false` |
| `fsync_directories`                        | ディレクトリに対して `fsync` を実行します。Distributed テーブルへのバックグラウンド `INSERT` に関連する操作 (`INSERT` 後、データを分片へ送信した後など) のあとで、OS がディレクトリのメタデータを確実に更新したことを保証します。                     | `false` |
| `skip_unavailable_shards`                  | true の場合、ClickHouse は利用できない分片を自動的にスキップします。分片は次の場合に利用不可とマークされます。1) 接続障害により分片に到達できない。2) DNS で分片を名前解決できない。3) その分片上にテーブルが存在しない。                                  | `false` |
| `bytes_to_throw_insert`                    | バックグラウンド `INSERT` 用に保留される圧縮済みバイト数がこの値を超える場合、例外がスローされます。`0` の場合はスローしません。                                                                                      | `0`     |
| `bytes_to_delay_insert`                    | バックグラウンド `INSERT` 用に保留される圧縮済みバイト数がこの値を超える場合、クエリは遅延されます。`0` の場合は遅延しません。                                                                                       | `0`     |
| `max_delay_to_insert`                      | バックグラウンド送信の保留バイト数が多い場合に、Distributed テーブルへデータを `INSERT` する際の最大遅延時間 (秒) 。                                                                                      | `60`    |
| `background_insert_batch`                  | [`distributed_background_insert_batch`](/ja/reference/settings/session-settings#distributed_background_insert_batch) と同じです                                   | `0`     |
| `background_insert_split_batch_on_failure` | [`distributed_background_insert_split_batch_on_failure`](/ja/reference/settings/session-settings#distributed_background_insert_split_batch_on_failure) と同じです | `0`     |
| `background_insert_sleep_time_ms`          | [`distributed_background_insert_sleep_time_ms`](/ja/reference/settings/session-settings#distributed_background_insert_sleep_time_ms) と同じです                   | `0`     |
| `background_insert_max_sleep_time_ms`      | [`distributed_background_insert_max_sleep_time_ms`](/ja/reference/settings/session-settings#distributed_background_insert_max_sleep_time_ms) と同じです           | `0`     |
| `flush_on_detach`                          | `DETACH` / `DROP` / サーバーのシャットダウン時に、データをリモートノードへフラッシュします。                                                                                                     | `true`  |

<Note>
  **耐久性設定** (`fsync_...`):

  * 影響するのはバックグラウンド `INSERT` (つまり `distributed_foreground_insert=false`) のみです。これは、データが最初にイニシエーター ノードのディスクに保存され、その後バックグラウンドで分片へ送信される場合です。
  * `INSERT` のパフォーマンスが大幅に低下する可能性があります
  * Distributed テーブルのフォルダ内に保存されたデータを、**挿入を受け付けたノード** に書き込む処理に影響します。基盤となる MergeTree テーブルへの書き込み保証が必要な場合は、`system.merge_tree_settings` の耐久性設定 (`...fsync...`) を参照してください

  **挿入制限設定** (`..._insert`) については、以下も参照してください:

  * [`distributed_foreground_insert`](/ja/reference/settings/session-settings#distributed_foreground_insert) 設定
  * [`prefer_localhost_replica`](/ja/reference/settings/session-settings#prefer_localhost_replica) 設定
  * `bytes_to_throw_insert` は `bytes_to_delay_insert` より先に処理されるため、`bytes_to_delay_insert` より小さい値に設定しないでください
</Note>

**例**

```sql theme={null}
CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;
```

データは、`logs` クラスター内のすべてのサーバーにある `default.hits` テーブルから読み取られます。データは読み取られるだけでなく、可能な範囲でリモートサーバー上で部分的に処理も行われます。たとえば、`GROUP BY` を含むクエリでは、データはリモートサーバー上で集計され、集約関数の中間状態がリクエスト元のサーバーに送信されます。その後、データはさらに集計されます。

データベース名の代わりに、文字列を返す定数式を使用できます。例: `currentDatabase()`。

<div id="distributed-clusters">
  ## クラスター
</div>

クラスターは、[サーバー設定ファイル](/ja/concepts/features/configuration/server-config/configuration-files)で構成します。

```xml theme={null}
<remote_servers>
    <logs>
        <!-- 分散クエリ用のサーバー間クラスター単位のシークレット
             デフォルト: シークレットなし（認証は実行されません）

             設定した場合、分散クエリは分片上で検証されます。少なくとも以下の条件を満たす必要があります:
             - 該当クラスターが分片上に存在すること、
             - 該当クラスターが同じシークレットを持つこと。

             また（より重要な点として）、initial_user が
             クエリの実行ユーザーとして使用されます。
        -->
        <!-- <secret></secret> -->
        
        <!-- 省略可能。このクラスターで 分散 DDL クエリ（ON CLUSTER 句）を許可するかどうか。デフォルト: true（許可）。-->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->
        
        <shard>
            <!-- 省略可能。データ書き込み時の分片の重み。デフォルト: 1。-->
            <weight>1</weight>
            <!-- 省略可能。分片の名前。空でなく、クラスター内の分片間で一意である必要があります。指定しない場合は空になります。-->
            <name>shard_01</name>
            <!-- 省略可能。データをいずれか1つのレプリカにのみ書き込むかどうか。デフォルト: false（すべてのレプリカにデータを書き込む）。-->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- 省略可能。負荷分散におけるレプリカの優先度（load_balancing 設定も参照）。デフォルト: 1（値が小さいほど優先度が高い）。-->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>
```

ここでは、`logs` という名前のクラスターを定義しています。このクラスターは 2 つの分片で構成され、各分片には 2 つのレプリカがあります。分片とは、データの異なる部分を保持するサーバーのことです (すべてのデータを読み取るには、すべての分片にアクセスする必要があります) 。レプリカは同じデータを持つ複製サーバーです (すべてのデータを読み取る場合、各分片についていずれか 1 つのレプリカにアクセスすれば十分です) 。

クラスター名にドットを含めることはできません。

各サーバーには、`host`、`port`、および必要に応じて `user`、`password`、`secure`、`compression`、`bind_host` を指定します。

| パラメータ         | 説明                                                                                                                                                                       | デフォルト値    |
| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | --------- |
| `host`        | リモートサーバーのアドレスです。ドメイン、IPv4 アドレス、IPv6 アドレスのいずれかを使用できます。ドメインを指定すると、サーバーは起動時に DNS リクエストを実行し、その結果はサーバーの稼働中保持されます。DNS リクエストに失敗すると、サーバーは起動しません。DNS レコードを変更した場合は、サーバーを再起動してください。 | -         |
| `port`        | メッセージ交換用の TCP ポートです (config 内の `tcp_port`。通常は 9000 に設定されます) 。`http_port` と混同しないでください。                                                                                    | -         |
| `user`        | リモートサーバーへの接続に使用するユーザー名です。このユーザーには、指定したサーバーに接続するためのアクセス権が必要です。アクセスは `users.xml` ファイルで設定します。詳しくは、[アクセス権](/ja/concepts/features/security/access-rights) のセクションを参照してください。    | `default` |
| `password`    | リモートサーバーへの接続に使用するパスワードです (マスクされません) 。                                                                                                                                    | ''        |
| `secure`      | セキュアな SSL/TLS 接続を使用するかどうかを指定します。通常はポートの指定も必要です (デフォルトのセキュアポートは `9440` です) 。サーバーは `<tcp_port_secure>9440</tcp_port_secure>` で待ち受け、正しい証明書が設定されている必要があります。                  | `false`   |
| `compression` | データ圧縮を使用します。                                                                                                                                                             | `true`    |
| `bind_host`   | このノードからリモートサーバーへ接続する際に使用する送信元アドレスです。サポートされるのは IPv4 アドレスのみです。ClickHouse の分散クエリで使用される送信元 IP アドレスの設定が必要な、高度なデプロイメントのユースケースを想定しています。                                         | -         |

レプリカを指定すると、読み取り時には各分片ごとに利用可能なレプリカのうち 1 つが選択されます。負荷分散アルゴリズム (どのレプリカにアクセスするかの優先度) は設定できます。詳しくは [load\_balancing](/ja/reference/settings/session-settings#load_balancing) 設定を参照してください。サーバーとの接続が確立できない場合は、短いタイムアウトで接続が試行されます。接続に失敗すると次のレプリカが選択され、すべてのレプリカに対して同様に処理されます。すべてのレプリカへの接続試行が失敗した場合は、同じ方法で数回再試行されます。これはレジリエンスの向上には有効ですが、完全な耐障害性を保証するものではありません。リモートサーバーが接続を受け付けても、動作しなかったり、正常に動作しなかったりする可能性があるためです。

指定する分片は 1 つだけでもかまいません (この場合、クエリ処理は 分散 ではなく remote と呼ぶのが適切です) 。また、分片はいくつでも指定できます。各分片には、1 つ以上の任意の数のレプリカを指定できます。分片ごとに異なる数のレプリカを指定することもできます。

設定には、必要な数だけクラスターを指定できます。

クラスターを確認するには、`system.clusters` テーブルを使用します。

`Distributed` エンジンを使うと、クラスターをローカルサーバーのように扱えます。ただし、クラスターの設定を動的に指定することはできず、サーバー設定ファイルで設定する必要があります。通常、クラスター内のすべてのサーバーは同じクラスター設定を持ちます (必須ではありません) 。設定ファイル内のクラスターは、サーバーを再起動せずに動的に更新されます。

毎回、未知の分片とレプリカの集合にクエリを送る必要がある場合は、`Distributed` テーブルを作成する必要はありません。代わりに `remote` テーブル関数を使用してください。[Table functions](/ja/reference/functions/table-functions) セクションを参照してください。

<div id="distributed-writing-data">
  ## データの書き込み
</div>

クラスターにデータを書き込む方法は 2 つあります。

1 つ目は、どのデータをどのサーバーに書き込むかを定義し、各分片に対して直接書き込みを行う方法です。つまり、`Distributed` テーブルが参照しているクラスター内のリモートテーブルに対して、直接 `INSERT` ステートメントを実行します。これは最も柔軟な方法であり、対象領域の要件によって複雑になるような場合でも、任意のシャーディング方式を使用できます。また、データを異なる分片に完全に独立して書き込めるため、最も効率的な方法でもあります。

2 つ目は、`Distributed` テーブルに対して `INSERT` ステートメントを実行する方法です。この場合、テーブル自体が挿入されたデータを各サーバーに分散します。`Distributed` テーブルに書き込むには、`sharding_key` パラメータが設定されている必要があります (分片が 1 つしかない場合を除く) 。

各分片には、設定ファイルで `<weight>` を定義できます。デフォルトでは、重みは `1` です。データは、分片の重みに比例した量で各分片に分散されます。まずすべての分片の重みを合計し、次に各分片の重みを合計値で割って、それぞれの分片の比率を決定します。たとえば、2 つの分片があり、1 つ目の重みが 1、2 つ目の重みが 2 の場合、1 つ目には挿入された行の 3 分の 1 (1 / 3) 、2 つ目には 3 分の 2 (2 / 3) が送られます。

各分片には、設定ファイルで `internal_replication` パラメータを定義できます。このパラメータが `true` に設定されている場合、書き込み操作では最初に正常なレプリカが選択され、そこにデータが書き込まれます。`Distributed` テーブルの基になるテーブルがレプリケートテーブル (たとえば `Replicated*MergeTree` テーブルエンジンのいずれか) である場合は、これを使用してください。テーブルレプリカの 1 つが書き込みを受け取り、その後ほかのレプリカへ自動的にレプリケーションされます。

`internal_replication` が `false` (デフォルト) に設定されている場合、データはすべてのレプリカに書き込まれます。この場合、`Distributed` テーブル自体がデータをレプリケートします。これはレプリケートテーブルを使用するよりも劣ります。レプリカ間の整合性が確認されないため、時間の経過とともに、それぞれに含まれるデータがわずかに異なってくるからです。

データの 1 行が送られる分片を選択するには、シャーディング式を評価し、それを分片の総重みで割った余りを取ります。その行は、`prev_weights` から `prev_weights + weight` までの余りに対応する半区間に該当する分片へ送られます。ここで、`prev_weights` は番号の小さい分片の総重み、`weight` はこの分片の重みです。たとえば、2 つの分片があり、1 つ目の重みが 9、2 つ目の重みが 10 の場合、行は範囲 \[0, 9) の余りに対しては 1 つ目の分片に、範囲 \[9, 19) の余りに対しては 2 つ目の分片に送られます。

シャーディング式には、整数を返す、定数およびテーブルのカラムから成る任意の式を使用できます。たとえば、データをランダムに分散するために `rand()` を、ユーザー ID を割った余りで分散するために `UserID` を使用できます (この場合、1 人のユーザーのデータは 1 つの分片に配置されるため、ユーザー単位で `IN` や `JOIN` を実行しやすくなります) 。いずれかのカラムの分布が十分に均一でない場合は、`intHash64(UserID)` のようにハッシュ関数で包むことができます。

単純な除算の余りによる方法は、シャーディングの手法としては限定的であり、常に適切とは限りません。これは中規模から大規模のデータ量 (数十台のサーバー) では機能しますが、非常に大規模なデータ量 (数百台以上のサーバー) には適しません。後者の場合は、`Distributed` テーブルのエントリを使うのではなく、対象領域に必要なシャーディング方式を使用してください。

次のような場合には、シャーディング方式を慎重に検討する必要があります。

* 特定のキーによるデータの結合 (`IN` または `JOIN`) が必要なクエリを使用する場合。データがこのキーでシャーディングされていれば、`GLOBAL IN` や `GLOBAL JOIN` の代わりにローカルの `IN` や `JOIN` を使用でき、はるかに効率的です。
* 多数のサーバー (数百台以上) を使用し、かつ多数の小さなクエリを処理する場合。たとえば、個々のクライアント (Web サイト、広告主、またはパートナーなど) のデータに対するクエリです。小さなクエリがクラスター全体に影響しないようにするには、1 つのクライアントのデータを 1 つの分片に配置するのが合理的です。あるいは、二段階シャーディングを設定することもできます。つまり、クラスター全体を「レイヤー」に分割し、各レイヤーは複数の分片で構成される場合があります。1 つのクライアントのデータは 1 つのレイヤー上に配置されますが、必要に応じてそのレイヤーに分片を追加でき、データはその中でランダムに分散されます。各レイヤーごとに `Distributed` テーブルを作成し、グローバルクエリ用に 1 つの共有分散テーブルを作成します。

データはバックグラウンドで書き込まれます。テーブルに挿入されると、データブロックはローカルファイルシステムに書き込まれるだけです。データはその後、できるだけ早くバックグラウンドでリモートサーバーに送信されます。データ送信の間隔は、[distributed\_background\_insert\_sleep\_time\_ms](/ja/reference/settings/session-settings#distributed_background_insert_sleep_time_ms) および [distributed\_background\_insert\_max\_sleep\_time\_ms](/ja/reference/settings/session-settings#distributed_background_insert_max_sleep_time_ms) の設定で管理されます。`Distributed` エンジンは、挿入されたデータを含む各ファイルを個別に送信しますが、[distributed\_background\_insert\_batch](/ja/reference/settings/session-settings#distributed_background_insert_batch) 設定を使うと、ファイルのバッチ送信を有効にできます。この設定により、ローカルサーバーとネットワークのリソースをより効率的に活用できるため、クラスターのパフォーマンスが向上します。データが正常に送信されているかどうかは、テーブルディレクトリ `/var/lib/clickhouse/data/database/table/` 内のファイル一覧 (送信待ちのデータ) を確認してください。バックグラウンドタスクを実行するスレッド数は、[background\_distributed\_schedule\_pool\_size](/ja/reference/settings/server-settings/settings#background_distributed_schedule_pool_size) 設定で指定できます。

`Distributed` テーブルへの `INSERT` 後にサーバーが停止した、または異常終了した場合 (たとえばハードウェア障害による場合) 、挿入されたデータが失われる可能性があります。テーブルディレクトリで破損したデータパートが検出されると、それは `broken` サブディレクトリに移され、以後は使用されません。

<div id="distributed-reading-data">
  ## データの読み取り
</div>

`Distributed` テーブルに対してクエリを実行すると、`SELECT` クエリはすべての分片に送信され、データが分片間でどのように分散されていても機能します (完全にランダムに分散されていても問題ありません) 。新しい分片を追加しても、古いデータをその分片に移行する必要はありません。代わりに、より大きな重みを設定して新しいデータを書き込めます。データの分布はやや不均一になりますが、クエリは正しく効率的に動作します。

`max_parallel_replicas` オプションを有効にすると、1 つの分片内のすべてのレプリカにまたがってクエリ処理が並列化されます。詳しくは、[max\_parallel\_replicas](/ja/reference/settings/session-settings#max_parallel_replicas) のセクションを参照してください。

分散 `in` および `global in` クエリがどのように処理されるかについて詳しくは、[こちら](/ja/reference/statements/in#distributed-subqueries) のドキュメントを参照してください。

<div id="virtual-columns">
  ## 仮想カラム
</div>

<div id="_shard_num">
  #### \_Shard\_num
</div>

`_shard_num` — テーブル `system.clusters` の `shard_num` の値が格納されています。型: [UInt32](/ja/reference/data-types/int-uint)。

<Note>
  [`remote`](/ja/reference/functions/table-functions/remote) および \[`cluster](../../../sql-reference/table-functions/cluster.md) テーブル関数は内部で一時的な Distributed テーブルを作成するため、`\_shard\_num\` はそれらでも利用できます。
</Note>

**関連項目**

* [仮想カラム](/ja/reference/engines/table-engines#table_engines-virtual_columns) の説明
* [`background_distributed_schedule_pool_size`](/ja/reference/settings/server-settings/settings#background_distributed_schedule_pool_size) 設定
* [`shardNum()`](/ja/reference/functions/regular-functions/other-functions#shardNum) および [`shardCount()`](/ja/reference/functions/regular-functions/other-functions#shardCount) 関数
