> ## Documentation Index
> Fetch the complete documentation index at: https://private-7c7dfe99-home-button.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

> 处理重复行和已删除的行。

# 去重策略（使用 CDC）

export const Image = ({img, alt, size}) => {
  return <Frame>
      <img src={img} alt={alt} />
    </Frame>;
};

由于 ClickHouse 的数据存储结构和复制机制，从 Postgres 复制到 ClickHouse 的更新和删除操作会在 ClickHouse 中产生重复行。本文将介绍其原因，以及在 ClickHouse 中处理重复数据的策略。

<div id="how-does-data-get-replicated">
  ## 数据如何复制？
</div>

<div id="PostgreSQL-logical-decoding">
  ### PostgreSQL 逻辑解码
</div>

ClickPipes 使用 [Postgres Logical Decoding](https://www.pgedge.com/blog/logical-replication-evolution-in-chronological-order-clustering-solution-built-around-logical-replication) 来实时捕获 Postgres 中发生的变更。Postgres 中的 Logical Decoding 过程使 ClickPipes 这类客户端能够以便于人类阅读的格式接收这些变更，也就是一系列 INSERT、UPDATE 和 DELETE。

<div id="replacingmergetree">
  ### ReplacingMergeTree
</div>

ClickPipes 使用 [ReplacingMergeTree](/zh/reference/engines/table-engines/mergetree-family/replacingmergetree) 引擎将 Postgres 表映射到 ClickHouse。ClickHouse 在仅追加型工作负载下表现最佳，因此不建议频繁执行 UPDATE。这正是 ReplacingMergeTree 特别适用的场景。

使用 ReplacingMergeTree 时，更新会表示为插入该行的一个较新版本 (`_peerdb_version`) ；而删除则表示为插入该行的一个较新版本，并将 `_peerdb_is_deleted` 标记为 true。ReplacingMergeTree 引擎会在后台对数据进行去重和合并，并为给定主键 (id) 保留该行的最新版本，从而能够以版本化插入的方式高效处理 UPDATE 和 DELETE。

下面是 ClickPipes 在 ClickHouse 中创建表时执行的 CREATE TABLE 语句示例。

```sql theme={null}
CREATE TABLE users
(
    `id` Int32,
    `reputation` String,
    `creationdate` DateTime64(6),
    `displayname` String,
    `lastaccessdate` DateTime64(6),
    `aboutme` String,
    `views` Int32,
    `upvotes` Int32,
    `downvotes` Int32,
    `websiteurl` String,
    `location` String,
    `accountid` Int32,
    `_peerdb_synced_at` DateTime64(9) DEFAULT now64(),
    `_peerdb_is_deleted` Int8,
    `_peerdb_version` Int64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
PRIMARY KEY id
ORDER BY id;
```

<div id="illustrative-example">
  ### 示例说明
</div>

下图通过一个基础示例说明了如何使用 ClickPipes 在 PostgreSQL 和 ClickHouse 之间同步 `users` 表。

<Image img="https://mintcdn.com/private-7c7dfe99-home-button/u7z0gNe6GWCJXTn9/images/integrations/data-ingestion/clickpipes/postgres/postgres-cdc-initial-load.png?fit=max&auto=format&n=u7z0gNe6GWCJXTn9&q=85&s=39574171b75f3ba03ebbde2d6d27f41a" alt="ClickPipes 初始加载" size="lg" width="3840" height="2160" data-path="images/integrations/data-ingestion/clickpipes/postgres/postgres-cdc-initial-load.png" />

**步骤 1** 展示了 PostgreSQL 中 2 行数据的初始快照，以及 ClickPipes 将这 2 行数据初始加载到 ClickHouse 的过程。可以看到，这两行数据都会原样复制到 ClickHouse。

**步骤 2** 展示了对 `users` 表执行的三个操作：插入一行新数据、更新一行现有数据，以及删除另一行数据。

**步骤 3** 展示了 ClickPipes 如何将 INSERT、UPDATE 和 DELETE 操作以带版本的插入方式复制到 ClickHouse。UPDATE 会表现为 ID 2 对应行的一个新版本，而 DELETE 会表现为 ID 1 的一个新版本，并通过 `_is_deleted` 标记为 true。因此，与 PostgreSQL 相比，ClickHouse 中会多出三行数据。

因此，运行类似 `SELECT count(*) FROM users;` 这样的简单查询时，ClickHouse 和 PostgreSQL 中的结果可能会不同。根据 [ClickHouse merge 文档](/zh/concepts/core-concepts/merges#replacing-merges)，过期的行版本最终会在合并过程中被丢弃。但是，合并发生的时机无法预测，这意味着在合并完成之前，ClickHouse 中的查询可能会返回不一致的结果。

如何确保 ClickHouse 和 PostgreSQL 中的查询结果完全一致？

<div id="deduplicate-using-final-keyword">
  ### 使用 FINAL 关键字去重
</div>

在 ClickHouse 查询中，推荐的数据去重方式是使用 [FINAL 修饰符。](/zh/reference/statements/select/from#final-modifier) 这样可确保只返回去重后的行。

下面来看看如何将它应用到三种不同的查询中。

*请注意以下查询中的 WHERE 子句，它用于过滤掉已删除的行。*

* **简单计数查询**：统计 posts 的数量。

这是你可以运行的最简单查询，用于检查同步是否已顺利完成。这两个查询应返回相同的计数结果。

```sql theme={null}
-- PostgreSQL
SELECT count(*) FROM posts;

-- ClickHouse 
SELECT count(*) FROM posts FINAL WHERE _peerdb_is_deleted=0;
```

* **使用 JOIN 的简单聚合**：累计浏览量最高的前 10 位用户。

这是一个针对单表进行聚合的示例。如果这里存在重复数据，会严重影响 sum 函数的结果。

```sql highlight={8,22} theme={null}
-- PostgreSQL 
SELECT
    sum(p.viewcount) AS viewcount,
    p.owneruserid AS user_id,
    u.displayname AS display_name
FROM posts p
LEFT JOIN users u ON u.id = p.owneruserid
WHERE p.owneruserid > 0
GROUP BY user_id, display_name
ORDER BY viewcount DESC
LIMIT 10;

-- ClickHouse 
SELECT
    sum(p.viewcount) AS viewcount,
    p.owneruserid AS user_id,
    u.displayname AS display_name
FROM posts AS p
FINAL
LEFT JOIN users AS u
FINAL ON (u.id = p.owneruserid) AND (u._peerdb_is_deleted = 0)
WHERE (p.owneruserid > 0) AND (p._peerdb_is_deleted = 0)
GROUP BY
    user_id,
    display_name
ORDER BY viewcount DESC
LIMIT 10
```

<div id="final-setting">
  #### FINAL 设置
</div>

你无需在查询中为每个表名都添加 FINAL 修饰符，而是可以使用 [FINAL 设置](/zh/reference/settings/session-settings#final)，将其自动应用到查询中的所有表。

此设置既可按查询应用，也可应用于整个会话。

```sql theme={null}
-- 按查询设置 FINAL
SELECT count(*) FROM posts SETTINGS FINAL = 1;

-- 为会话设置 FINAL
SET final = 1;
SELECT count(*) FROM posts; 
```

<div id="row-policy">
  #### ROW policy
</div>

隐藏冗余的 `_peerdb_is_deleted = 0` 过滤器，一个简单的方法是使用 [ROW policy。](/zh/concepts/features/security/access-rights#row-policy-management) 下面是一个示例：创建行策略，从而在对表 votes 的所有查询中排除已删除的行。

```sql theme={null}
-- 对所有用户应用行策略
CREATE ROW POLICY cdc_policy ON votes FOR SELECT USING _peerdb_is_deleted = 0 TO ALL;
```

> 行策略会应用于一组用户和角色。在此示例中，它会应用于所有用户和角色。也可以将其调整为仅应用于特定用户或角色。

<div id="query-like-with-postgres">
  ### 像在 Postgres 中那样查询
</div>

将分析型数据集从 PostgreSQL 迁移到 ClickHouse 时，通常需要修改应用程序中的查询，以适应数据处理和查询执行上的差异。

本节将介绍如何在保持原有查询不变的情况下对数据进行去重。

<div id="views">
  #### 视图
</div>

[视图](/zh/reference/statements/create/view#normal-view) 是隐藏查询中 FINAL 关键字的一个好方法，因为它们不存储任何数据，只是在每次访问时从另一张表读取。

下面是一个示例，展示如何在 ClickHouse 中为数据库中的每张表创建包含 FINAL 关键字并过滤已删除行的视图。

```sql theme={null}
CREATE VIEW posts_view AS SELECT * FROM posts FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW users_view AS SELECT * FROM users FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW votes_view AS SELECT * FROM votes FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW comments_view AS SELECT * FROM comments FINAL WHERE _peerdb_is_deleted=0;
```

然后，我们可以使用与在 PostgreSQL 中相同的查询语句来查询这些视图。

```sql theme={null}
-- 浏览量最高的帖子
SELECT
    sum(viewcount) AS viewcount,
    owneruserid
FROM posts_view
WHERE owneruserid > 0
GROUP BY owneruserid
ORDER BY viewcount DESC
LIMIT 10
```

<div id="refreshable-material-view">
  #### 可刷新materialized view
</div>

另一种方法是使用[可刷新materialized view](/zh/concepts/features/materialized-views/refreshable-materialized-view)，它支持按计划执行查询，对行去重，并将结果存储到目标表中。每次按计划刷新时，目标表都会被最新的查询结果替换。

这种方法的主要优势在于，带有 FINAL 关键字的查询只会在刷新时执行一次，因此后续针对目标表的查询无需再使用 FINAL。

不过，它的缺点是目标表中的数据只能保持在最近一次刷新的状态。也就是说，对于许多用例来说，几分钟到几小时的刷新间隔可能已经足够。

```sql theme={null}
-- 创建去重后的 posts 表 
CREATE TABLE deduplicated_posts AS posts;

-- 创建 Materialized view 并设置为每小时刷新一次
CREATE MATERIALIZED VIEW deduplicated_posts_mv REFRESH EVERY 1 HOUR TO deduplicated_posts AS 
SELECT * FROM posts FINAL WHERE _peerdb_is_deleted=0 
```

然后，您就可以像平常一样查询 `deduplicated_posts` 表了。

```sql theme={null}
SELECT
    sum(viewcount) AS viewcount,
    owneruserid
FROM deduplicated_posts
WHERE owneruserid > 0
GROUP BY owneruserid
ORDER BY viewcount DESC
LIMIT 10;
```
