Basic Usage of Flink CDC
Introduction
Flink CDC (Change Data Capture) is a technology for capturing database changes as a real-time stream. It monitors a relational database, extracts the changed rows, and turns them into a data stream that Flink can consume, so the data can be processed and analyzed in Flink in real time.
Using Flink CDC in Spring Boot
Prepare the MySQL database
The init.sql file
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE users
(
    id    INT PRIMARY KEY AUTO_INCREMENT,
    name  VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL
);
INSERT INTO users (name, email)
VALUES ('John Doe', 'john@example.com');
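This demo connects as root for simplicity. In a real deployment, the Flink CDC / Debezium documentation recommends a dedicated MySQL user with replication privileges; a sketch (the user name and password here are placeholders):

```sql
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'cdc_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
```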
The docker-compose file
services:
  mysql:
    image: mysql:8
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - TZ=Asia/Shanghai
    volumes:
      - db_data:/var/lib/mysql
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
volumes:
  db_data:
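The MySQL CDC connector reads the server's binlog. MySQL 8 enables binary logging in row format by default, but it is worth verifying once the container is up:

```sql
SHOW VARIABLES LIKE 'log_bin';        -- should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW
```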
创建一个Spring Boot项目
使用 IDEA 创建一个 Spring Boot 项目,选择 Gradle 作为构建工具
添加依赖
在 build.gradle
中添加 Flink CDC 的依赖
implementation group: 'com.ververica', name: 'flink-connector-mysql-cdc', version: '2.3.0'
implementation group: 'org.apache.flink', name: 'flink-connector-base', version: '1.16.1'
implementation group: 'org.apache.flink', name: 'flink-table-runtime', version: '1.16.1'
implementation group: 'org.apache.flink', name: 'flink-runtime-web', version: '1.16.1'
Write a custom Sink
package com.whb.flinkcdcdemo;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

@Slf4j
public class MySink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info("MySink invoke: {}", value);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        log.info("MySink open");
    }

    @Override
    public void close() throws Exception {
        log.info("MySink close");
    }
}
Write a Flink job
package com.whb.flinkcdcdemo;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkCdcDemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcDemoApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Build the CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.users")
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Also capture schema (DDL) changes
                .includeSchemaChanges(true)
                .build();

        Configuration config = new Configuration();
        // Port for the Flink web UI
        config.setInteger(RestOptions.PORT, 8081);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        // Interval between checkpoints, in milliseconds
        env.enableCheckpointing(3000)
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // Source parallelism (defaults to the number of CPU cores)
                .setParallelism(2)
                .addSink(new MySink())
                // Sink parallelism (defaults to the number of CPU cores)
                .setParallelism(4)
                .name("MySink");

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
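enableCheckpointing(3000) only sets the checkpoint interval. In practice a few more checkpoint options are usually tuned as well; a configuration sketch against the Flink 1.16 CheckpointConfig API (the specific values here are illustrative, not recommendations from the job above):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 3 seconds, as in the job above
        env.enableCheckpointing(3000);
        // Exactly-once is already the default mode; stated here for clarity
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Discard a checkpoint that takes longer than 60 seconds
        env.getCheckpointConfig().setCheckpointTimeout(60_000);
        // Require at least 500 ms between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    }
}
```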
Run the project
./gradlew bootRun
Open http://localhost:8081 in a browser; you will see the Flink dashboard.
Insert data
INSERT INTO mydb.users (name, email)
VALUES ('whb', 'whb@qq.com')
Log output for the insert
MySink invoke: {"before":null,"after":{"id":2,"name":"whb","email":"whb@qq.com"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1679285096000,"snapshot":"false","db":"mydb","sequence":null,"table":"users","server_id":1,"gtid":null,"file":"binlog.000002","pos":372,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1679285096301,"transaction":null}
Update data
UPDATE mydb.users
SET email = 'whb@whb.com'
WHERE id = 2
Log output for the update
MySink invoke: {"before":{"id":2,"name":"whb","email":"whb@qq.com"},"after":{"id":2,"name":"whb","email":"whb@whb.com"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1679285399000,"snapshot":"false","db":"mydb","sequence":null,"table":"users","server_id":1,"gtid":null,"file":"binlog.000002","pos":685,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1679285399799,"transaction":null}
Delete data
DELETE
FROM mydb.users
WHERE id = 2
Log output for the delete
MySink invoke: {"before":{"id":2,"name":"whb","email":"whb@whb.com"},"after":null,"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1679285515000,"snapshot":"false","db":"mydb","sequence":null,"table":"users","server_id":1,"gtid":null,"file":"binlog.000002","pos":1013,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1679285515884,"transaction":null}
This is the Debezium CDC (Change Data Capture) event format used by the Flink source: each message represents one row-level change in a MySQL table. The fields mean the following:
- before: the row image before the change, as a JSON object; null for inserts.
- after: the row image after the change, as a JSON object; null for deletes.
- source: metadata about where the change came from, as a JSON object containing:
  - version: the Debezium version.
  - connector: the connector type, here mysql.
  - name: the logical name of the connector.
  - ts_ms: the timestamp of the change in the database, in milliseconds.
  - snapshot: whether the event comes from the initial snapshot phase.
  - db: the database name.
  - sequence: the event sequence number.
  - table: the table name.
  - server_id: the MySQL server ID.
  - gtid: the global transaction ID.
  - file: the binlog file name.
  - pos: the position within the binlog file.
  - row: the row number within the binlog event.
  - thread: the ID of the MySQL thread that executed the statement.
  - query: the SQL statement, if query logging is enabled.
- op: the operation type: insert (c), update (u), or delete (d).
- ts_ms: the timestamp at which the connector processed the event, in milliseconds.
- transaction: transaction metadata, as a JSON object containing:
  - id: the transaction ID.
  - total_order: the event's position within the whole transaction.
  - data_collection_order: the event's position within the transaction for this table.
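In a real sink you would typically parse this JSON instead of just logging it. A minimal sketch using Jackson (which Spring Boot already ships); the class name and the summary format are our own, not part of any Flink or Debezium API:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CdcEventParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Summarizes one Debezium change event as "<op> on <table>: id=<id>". */
    public static String summarize(String json) throws Exception {
        JsonNode root = MAPPER.readTree(json);
        String op = root.path("op").asText();
        String table = root.path("source").path("table").asText();
        // For deletes the row image is in "before"; otherwise it is in "after"
        JsonNode row = "d".equals(op) ? root.path("before") : root.path("after");
        return op + " on " + table + ": id=" + row.path("id").asInt();
    }

    public static void main(String[] args) throws Exception {
        String event = "{\"before\":null,\"after\":{\"id\":2,\"name\":\"whb\",\"email\":\"whb@qq.com\"},"
                + "\"source\":{\"table\":\"users\"},\"op\":\"c\"}";
        System.out.println(summarize(event)); // c on users: id=2
    }
}
```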
Summary
With Flink CDC we can easily stream changes from a relational database into the Flink stream processing engine and handle them in real time. This article covered the basics of using Flink CDC; I hope it is helpful.
References
https://ververica.github.io/flink-cdc-connectors/master/index.html
https://github.com/ververica/flink-cdc-connectors
https://nightlies.apache.org/flink/flink-docs-master/