33.flink cdc 实时数据同步利器
创始人
2024-04-28 00:14:12
0

什么是flink cdc?

对很多初入门的人来说是无法理解cdc到底是什么个东西。 有这样一个需求,比如在mysql数据库中存在很多数据,但是公司要把mysql中的数据同步到数据仓库(starrocks), 数据仓库你可以理解为存储了各种各样来自不同数据库中表。
数据的同步目前对mysql来说比较常见是方式是使用:datax 和 canal配合, 为什么需要这两个框架配合呢?
因为datax不支持实时的同步, datax只能定义一个范围去同步,而且同步结束后程序就结束了。但是我想要的是数据仓库中的数据近乎实时的和mysql中的数据保持一致又该怎么办? 答案是再加上canal, canal和datax相反,它只支持指定一个binlog同步,然后会一直同步到现在,并且程序不会结束,会一直同步。 这样datax+canal就可以达到实时同步的功能。
这是业界比较常用的同步方式,datax同步历史数据,canal+kafka同步最新的数据,而且还要有一个程序去读取kafka中的binlog json数据(可以用flink或者spark又或者是flume)。可以看到这个链路比较长,不是很好。
下面是目前常见的cdc同步方案以及对比:
在这里插入图片描述

  1. DataX 不支持增量同步,Canal 不支持全量同步。虽然两者都是非常流行的数据同步工具,但
    在场景支持上仍不完善。
  2. 在全量+增量一体化同步方面,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
  3. 在架构方面,Apache Flink 是一个非常优秀的分布式流处理框架,因此 Flink CDC 作为
    Apache Flink 的一个组件具有非常灵活的水平扩展能力。而 DataX 和 Canal 是个单机架构,
    在大数据场景下容易面临性能瓶颈的问题。
  4. 在数据加工的能力上,CDC 工具是否能够方便地对数据做一些清洗、过滤、聚合,甚至关联打
    宽? Flink CDC 依托强大的 Flink SQL 流式计算能力,可以非常方便地对数据进行加工。而
    Debezium 等则需要通过复杂的 Java 代码才能完成,使用门槛比较高。
  5. 另外,在生态方面,这里指的是上下游存储的支持。Flink CDC 上下游非常丰富,支持对接
    MySQL、PostgreSQL 等数据源,还支持写入到 TiDB、HBase、Kafka、Hudi 等各种存储系统
    中,也支持灵活的自定义 connector。
  6. 我们看到flink cdc 是比较友好的方案, 其内部实现上用的是Debezium去采集binlong, 而且可通过参数scan.startup.mode 来控制同步行为:
  1. initial (默认):在第一次启动时对受监视的数据库表执行全量同步,并继续读取最新的 binlog。
  2. earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
  3. latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  4. specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
  5. timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

一个demo

对flink_01 和flink_02 进行两个分表进行同步合并到:flink_merge


CREATE TABLE `flink_01` (`indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',`indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',`indicator_code` int NOT NULL COMMENT '指标编码',`table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',`window_start` datetime NOT NULL COMMENT '窗口开始时间',`window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',`create_time` datetime DEFAULT NULL COMMENT '创建更新时间',`indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '52', 0, 'app_login_log', '2022-12-14 00:00:00', '2022-12-15 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '49', 0, 'app_login_log', '2022-12-15 00:00:00', '2022-12-16 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '62', 0, 'app_login_log', '2022-12-16 00:00:00', '2022-12-17 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '3', 0, 'app_login_log', '2022-12-17 00:00:00', '2022-12-18 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '37', 0, 'app_login_log', '2022-12-19 00:00:00', '2022-12-20 00:00:00', '2022-12-20 11:22:02', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '20', 0, 'app_login_log', '2022-12-20 00:00:00', '2022-12-21 00:00:00', '2022-12-21 10:41:24', '登录用户数');
INSERT INTO `test`.`flink_01`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '19', 0, 'app_login_log', '2022-12-21 00:00:00', '2022-12-21 15:19:00', '2022-12-21 15:46:27', '登录用户数');CREATE TABLE `flink_02` (`indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',`indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',`indicator_code` int NOT NULL COMMENT '指标编码',`table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',`window_start` datetime NOT NULL COMMENT '窗口开始时间',`window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',`create_time` datetime DEFAULT NULL COMMENT '创建更新时间',`indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '52', 0, 'app_login_log', '2022-12-14 00:00:00', '2022-12-15 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '49', 0, 'app_login_log', '2022-12-15 00:00:00', '2022-12-16 00:00:00', '2022-12-19 18:09:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '62', 0, 'app_login_log', '2022-12-16 00:00:00', '2022-12-17 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '3', 0, 'app_login_log', '2022-12-17 00:00:00', '2022-12-18 00:00:00', '2022-12-19 18:09:25', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '37', 0, 'app_login_log', '2022-12-19 00:00:00', '2022-12-20 00:00:00', '2022-12-20 11:22:02', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '20', 0, 'app_login_log', '2022-12-20 00:00:00', '2022-12-21 00:00:00', '2022-12-21 10:41:24', '登录用户数');
INSERT INTO `test`.`flink_02`(`indicator_name`, `indicator_value`, `indicator_code`, `table_name`, `window_start`, `window_end`, `create_time`, `indicator_description`) VALUES ('all_login_num', '19', 1, 'app_login_log', '2022-12-21 00:00:00', '2022-12-21 15:19:00', '2022-12-21 15:46:27', '登录用户数');CREATE TABLE `flink_merge` (`indicator_name` varchar(255) DEFAULT NULL COMMENT '指标名称',`indicator_value` varchar(255) DEFAULT NULL COMMENT '指标值',`indicator_code` int NOT NULL COMMENT '指标编码',`table_name` varchar(255) NOT NULL COMMENT '指标计算上游表名',`window_start` datetime NOT NULL COMMENT '窗口开始时间',`window_end` datetime DEFAULT NULL COMMENT '窗口截止时间',`create_time` datetime DEFAULT NULL COMMENT '创建更新时间',`indicator_description` varchar(255) DEFAULT NULL COMMENT '指标描述',PRIMARY KEY (`indicator_code`,`table_name`,`window_start`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

所需要的依赖jar:

  1. mysql 的驱动请自行下载
  2. flink-sql 的连接器
    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/ 在这里下载flinksql 连接器
org.apache.flinkflink-connector-jdbc_2.111.13.6
  1. flink-cdc 依赖
    https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc%28ZH%29.html#a-name-id-002-a 在这里下载
    在这里插入图片描述

下载后的jar统一放在flink安装目录下的lib目录下即可。

运行程序

package com.test.demo.table.sql;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class flinkcdc {public static void main(String[] args) {EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()
//                .inBatchMode().build();TableEnvironment tableEnv = TableEnvironment.create(settings);// 'table-name' = 'flink.*' 意思是读取tablename以flink开头的所有的表tableEnv.executeSql("CREATE TABLE `source_table`\n" +"(\n" +"    `indicator_name`        STRING,\n" +"    `indicator_value`       STRING,\n" +"    `indicator_code`        INT,\n" +"    `table_name`            STRING,\n" +"    `window_start`          TIMESTAMP(0),\n" +"    `window_end`            TIMESTAMP(0),\n" +"    `create_time`           TIMESTAMP,\n" +"    `indicator_description` STRING,\n" +"    PRIMARY KEY (`indicator_code`, `table_name`, `window_start`) NOT ENFORCED\n" +") WITH (\n" +"   'connector' = 'mysql-cdc',\n" +"   'hostname' = '172.18.3.135',\n" +"   'scan.startup.mode' = 'initial',\n" +"   'port' = '3306',\n" +"   'username' = 'root',\n" +"   'password' = '123456',\n" +"   'database-name' = 'test',\n" +"   'table-name' = 'flink.*'\n" +")");//        tableEnv.sqlQuery("select * from MyTable").execute().print();
//查询的时候定义event_time窗口tableEnv.executeSql("CREATE TABLE `flink_merge`\n" +"(\n" +"    `indicator_name`        STRING,\n" +"    `indicator_value`       STRING,\n" +"    `indicator_code`        INT,\n" +"    `table_name`            STRING,\n" +"    `window_start`          TIMESTAMP(0),\n" +"    `window_end`            TIMESTAMP(0),\n" +"    `create_time`           TIMESTAMP,\n" +"    `indicator_description` STRING,\n" +"    PRIMARY KEY (`indicator_code`, `table_name`, `window_start`) NOT ENFORCED\n" +") WITH (\n" +"      'connector' = 'jdbc',\n" +"      'url' = 'jdbc:mysql://172.18.3.135:3306/test',\n" +"      'driver' = 'com.mysql.cj.jdbc.Driver',\n" +"      'username' = 'root',\n" +"      'password' = '123456',\n" +"      'table-name' = 'flink_merge'\n" +")");//直接sql查询tableEnv.executeSql("insert into flink_merge select * from source_table");}
}

总结

按照上面的步骤就可以进行实时同步了, 如果你要在生产环境用建议配置上savepoint 和checkpoint, 这样可以达到断点续传的功能。 文件比较简短适合有一定flink基础的人快速开发,如果你对flink还不是很了解建议先去学下flink相关的知识,再来进行cdc的实验。 flink cdc可以说是以后数据同步的主流,和其他方式相比架构比较简单,而且通过参数控制是否是全量同步,十分友好。
多说一句,目前对flinksql我们公司已不用写代码进行开发了,而是用的streamx框架,streamx框架可以很方便配置savepoint/chekpoints, 以及启动参数,而且可以在web页面启动flinksql 不需要在控制台写一堆参数提交到yarn上,很方便。

相关内容

热门资讯

sony电视安卓系统慢,畅享流... 你有没有遇到过这种情况?家里的索尼电视用着用着,安卓系统突然变得慢吞吞的,就像老牛拉车一样,让人心里...
安卓系统美化软件app,个性化... 你知道吗?手机对于我们现代人来说,简直就是生活的必需品。而手机里最核心的部分,莫过于那个陪伴我们每天...
安卓系统内存如何更改,Andr... 你有没有发现,手机用久了,就像人一样,会变得“臃肿”起来?尤其是安卓系统,内存不够用,简直让人头疼!...
安卓原声系统录屏,还原操作过程 你有没有想过,手机屏幕上的每一个动作,都能被记录下来,变成一段段有趣的视频呢?没错,这就是安卓原声系...
安卓系统底层垃圾清理,Andr... 手机用久了是不是感觉越来越慢了?别急,今天就来给你揭秘安卓系统底层垃圾清理的奥秘,让你的手机焕发新生...
安卓系统怎么恢复页面,轻松找回... 手机里的安卓系统突然间页面乱糟糟的,是不是让你头疼不已?别急,今天就来手把手教你如何恢复安卓系统的页...
安卓如何系统更新软件,轻松掌握... 亲爱的安卓用户们,你是否也和我一样,时不时地收到系统更新通知,心里痒痒的想要知道这些更新到底有什么新...
安卓系统英语单词,Explor... 你知道吗?在智能手机的世界里,有一个系统可是无人不知、无人不晓,那就是安卓系统!它就像一位全能的魔术...
领克升级安卓系统,畅享科技新篇... 你知道吗?最近我的小领克车又升级了安卓系统,简直就像给它换了个新灵魂,让我对它爱不释手。今天就来给你...
安卓手机开机系统优化,解锁流畅... 你那安卓手机是不是最近有点儿慢吞吞的?别急,今天就来给你支几招,让你的安卓手机焕发新生,速度飞快,就...
奇葩软件推荐安卓系统,盘点那些... 你知道吗?在安卓系统的世界里,竟然隐藏着这么些奇葩软件,它们不仅功能独特,而且让人眼前一亮。今天,就...
安卓系统壁纸清除软件,还原手机... 手机里的壁纸是不是已经看腻了?想要换一换心情,却又发现那些可爱的壁纸怎么也清除不掉?别急,今天就来给...
还原原来的安卓系统,深度解析系... 你有没有发现,手机用久了,系统越来越卡,功能也越来越不人性化?别急,今天就来教你一招,还原原来的安卓...
miyou是安卓系统吗,安卓系... 亲爱的读者,你是否曾好奇过,那些在手机上如影随形的APP,它们背后的操作系统究竟是怎样的呢?今天,我...
安卓系统查询器,系统查询器功能... 你有没有想过,你的安卓手机里藏着多少秘密?别惊讶,今天就要带你一探究竟,揭开安卓系统查询器的神秘面纱...
系统之家的安卓手机,畅享科技魅... 你有没有发现,最近身边的朋友都换上了新手机?没错,说的就是那些性能强大、设计时尚的安卓手机。今天,就...
美版系统升级安卓,畅享智能生活... 你知道吗?最近手机界可是热闹非凡呢!尤其是那些安卓用户,他们的手机系统又要迎来一场大变身——美版系统...
安卓系统方位键在哪,方位键布局... 你是不是在用安卓手机的时候,突然发现方位键不见了?别急,别急,让我来给你详细地介绍一下安卓系统方位键...
安卓系统如何设置快,安卓系统快... 你有没有发现,手机用久了,操作起来总觉得有点慢吞吞的?别急,今天就来教你怎么给安卓系统来个“加速大法...
安卓系统源自于什么,安卓系统的... 你有没有想过,我们每天离不开的安卓系统,它究竟是从哪里来的呢?是不是觉得它就像是从天而降的神奇宝贝,...