PyFlink 流处理基础实例 MySQL CDC方式实时备份
时间:2022-08-11 00:30:00
01 JDBC SQL 连接器
JDBC 允许使用连接器 JDBC 驱动读取或写入任何类型的关系数据库。
如果在 DDL 主键中定义,JDBC sink 将以 upsert 与外部系统交换模式 UPDATE/DELETE 消息;否则,它将以 append 与外部系统交换模式消息且不支持消费 UPDATE/DELETE 消息。
1.1 下载依赖包
实现关系型数据库 Flink 通过建立 JDBC 执行连接器 SQL 查询,下载 flink-connector-jdbc
下载地址为依赖包 flink-connector-jdbc_2.11-1.14.0.jar
在连接到具体数据库时,也需要对应的驱动依赖,MySql的驱动 jar 包下载地址为 mysql-connector-java
1.2 创建 JDBC 表
在操作中加入上述依赖包后使用 Table API 的 JDBC table 定义如下:
-- 在 Flink SQL 中注册一张 MySQL 表 'users' CREATE TABLE MyUserTable ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'users' ); -- 从另一张表 "T" 写入数据 JDBC 表中 INSERT INTO MyUserTable SELECT id, name, age, status FROM T; -- 查看 JDBC 表中的数据 SELECT id, name, age, status FROM MyUserTable; -- JDBC 作为时态表关联中的维表 SELECT * FROM myTopic LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime ON myTopic.key = MyUserTable.id;
JDBC SQL 连接器的一些参数和含义如下:
connector
:这里应该指定使用哪种类型的连接器。jdbc’。url
:JDBC 数据库 url。table-name
:连接到 JDBC 表的名称。driver
:用于连接此 URL 的 JDBC 如果不设置驱动类名,将自动从 URL 中推导。username
:JDBC 如果指定了用户名 ‘username’ 和 ‘password’ 两者都必须指定任何参数。password
:JDBC 密码。
02 JAR 依赖管理
第三方在作业中需要使用的话 jar 包,Python Table API 提供以下说明 jar 包
# 方法1:本地上传 table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") # 方法2:运行时加载 table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
本地上传方式,通过将本地文件 URL 指向的 jar 包上传到 Flink 集合。通过配置 pipeline.jars
指定 jar包的 URL 列表,不同 jar 分号用于包之间 ;
隔开。
指定操作时的加载方式,以确保 URL 可访问客户端和集群 jar 包路径规则,作业运行时根据路径加载指定 jar 包。
Python DataStream API 提供了类似的方法指示 jar 包:本地上传 add_jars()
运行时加载 add_classpaths()
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar") stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
03 实时 MySql CDC数据备份实例
本实例展示了如何使用 Flink 进行 MySQL 实时同步数据库。通过监控实例 当前 MySQL 数据库的 binlog 然后根据数据变更 binlog 日志实时同步数据变更 MySQL 数据库中。
3.1 Flink CDC 连接器
CDC 简介及用途
CDC (change data capture)
,也就是说,变化数据捕获是备份数据库的一种方式,常用于备份大量数据。
CDC 分为入侵式和非入侵式:
本实例基于非侵入性 MySql 的 binlog 备份日志。
因此,有必要实现基于日志的基础 CDC 数据备份, MySQL 就是要开启 binlog 选项。 MySQL 8.0.X 是默认开启 binlog 是的,您可以执行以下命令并查看它 log_bin
变量值是否为 ON
来检查 MySQL 是否开启 binlog。
show variables like '%log_bin%';
使用 Flink CDC 连接器
首先下载 MySql CDC 连接器 jar 包,下载地址为 flink-sql-connector-mysql-cdc
使用 Table API 可创建如下 mysql cdc table, 通过 MySQL-CDC 连接器从 MySQL 的 binlog 里提取更改
-- creates a mysql cdc table source CREATE TABLE mysql_binlog ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3) ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
);
-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;
3.2 实时 MySql CDC数据备份实例
(1)初始化流处理环境并指定依赖 jar 包
为了在 Flink 中实现 MySQL 的 CDC,我们需要准备下面 3 个 jar 包:
flink-connector-jdbc_2.11-1.14.0.jar
:通过 JDBC 连接器来从 MySQL 里读取或写入数据;用于创建 sink 表,定义连接器为 jdbcmysql-connector-java-8.0.16.jar
:JDBC 连接器的驱动( 帮助 java 连接 MySQL )flink-sql-connector-mysql-cdc-2.0.2.jar
:通过 MySQL-CDC 连接器从 MySQL 的 binlog 里提取更改;用于创建 source 表,定义连接器为 mysql-cdc
JAR 名称 | 下载地址 |
---|---|
flink-connector-jdbc_2.11-1.14.0.jar | 下载地址 |
mysql-connector-java-8.0.16.jar | 下载地址 |
flink-sql-connector-mysql-cdc-2.0.2.jar | 下载地址 |
初始化流处理环境并指定依赖 jar 包
# 初始化流处理环境
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
# 编辑 jar 包 URLs
jars = []
for file in os.listdir(os.path.abspath(os.path.dirname(__file__))):
if file.endswith('.jar'):
jars.append(os.path.abspath(file))
str_jars = ';'.join(['file://' + jar for jar in jars])
# 指定 jar 依赖
t_env.get_config().get_configuration().set_string("pipeline.jars", str_jars)
(2)创建 MySql CDC 源表
# 创建 MySql CDC 源表
mysql_cdc_ddl = """ CREATE TABLE source ( id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3307', 'database-name' = 'flink', 'table-name' = 'user', 'username' = 'root', 'password' = 'root' ) """
t_env.execute_sql(mysql_cdc_ddl)
(3)创建数据同步的 JDBC 输出表
将数据写入外部数据库时,Flink 使用 DDL 中定义的主键。
如果定义了主键,则连接器将在 upsert 模式下运行,否则,连接器将在追加模式下运行。
- 在 upsert 模式下,Flink 将根据主键插入新行或更新现有行,Flink 可以通过这种方式确保幂等性。
- 在追加模式下,Flink 将所有记录解释为 INSERT 消息,如果在基础数据库中发生主键或唯一约束冲突,则 INSERT 操作可能会失败。
# 创建数据同步的 JDBC 输出表
jdbc_sink_ddl = """ CREATE TABLE sink ( id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED -- 需要定义主键,让连接器在 upsert 模式下运行 ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://127.0.0.1:3308/flink', 'driver' = 'com.mysql.cj.jdbc.Driver', 'table-name' = 'user', 'username' = 'root', 'password' = 'root' ) """
t_env.execute_sql(jdbc_sink_ddl)
(4)运行实例
指向 MySql CDC 实时数据备份作业之前,我们需要准备实验环境:两个 mysql 数据库。
我们用 docker 容器快速构建实验环境,除了两个 mysql 数据库,还创建一个 adminer 容器:
- mysql1 容器作为待同步的数据源
- mysql2 容器作为备份的数仓
- adminer 容器允许我们在本地没有安装 mysql 客户端的情况下,使用网页来查看和操作 mysql 容器
2 个 mysql 容器 + 1 个 adminer 容器,编写容器编排 docker-compose.yml
如下:
version: "3.5"
services:
mysql1:
image: mysql:8.0.22 # 5.7 版本本地连接不上
command: [
'--default-authentication-plugin=mysql_native_password',
'--character-set-server=utf8mb4',
'--collation-server=utf8mb4_unicode_ci',
'--log-bin=mysql-bin',
]
ports:
- 3307:3306
environment:
MYSQL_ROOT_PASSWORD: root
volumes:
- ./examples/mysql:/docker-entrypoint-initdb.d
container_name: mysql1
mysql2:
image: mysql:8.0.22 # 5.7 版本本地连接不上
command: [
'--default-authentication-plugin=mysql_native_password',
'--character-set-server=utf8mb4',
'--collation-server=utf8mb4_unicode_ci'
]
ports:
- 3308:3306 # 第二个数据库的端口是 3307
environment:
MYSQL_ROOT_PASSWORD: root
volumes:
- ./examples/mysql:/docker-entrypoint-initdb.d
container_name: mysql2
adminer:
image: adminer
ports:
- 8089:8080
container_name: adminer
使用 docker-compose up
命令启动容器环境,如果出现错误请检查容器端口映射是否冲突。
实验环境启动之后执行 MySql CDC 实时数据备份作业代码:
python flink_mysql_cdc.py
如果本地有 MySQL 客户端,可以同时连接 3307 和 3308 这两个端口(账密都是 root)。
在 3307 对应的实例上的 flink 数据库的 user 这张表里做相应的增删改,然后看看 3308 对应的同名的表里,是否已经实时同步了数据。
(5)实例完整代码
参考资料
Flink 官方文档 JDBC SQL 连接器
Flink 官方文档 依赖管理
flink-cdc-connectors 官方文档
PyFlink 从入门到精通