Flink CDC 基本使用

Flink CDC 基本使用

简介

Flink CDC(Change Data Capture)是一种用于实时数据流处理的技术。它可以监控关系型数据库的变化,将变化的数据抽取出来并转化成可以被Flink处理的数据流。
这样,我们就可以在Flink中对数据进行实时的处理和分析。

准备 MySQL 数据库

init.sql 文件

CREATE DATABASE mydb;
USE mydb;
CREATE TABLE users
(
    id    INT PRIMARY KEY AUTO_INCREMENT,
    name  VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL
);
INSERT INTO users (name, email)
VALUES ('John Doe', 'john@example.com');

docker-compose 文件

services:
  mysql:
    image: mysql:8
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - TZ=Asia/Shanghai
    volumes:
      - db_data:/var/lib/mysql
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
volumes:
  db_data:

创建一个Spring Boot项目

使用 IDEA 创建一个 Spring Boot 项目,选择 Gradle 作为构建工具

添加依赖

build.gradle 中添加 Flink CDC 的依赖

implementation group: 'com.ververica', name: 'flink-connector-mysql-cdc', version: '2.3.0'
implementation group: 'org.apache.flink', name: 'flink-connector-base', version: '1.16.1'
implementation group: 'org.apache.flink', name: 'flink-table-runtime', version: '1.16.1'
implementation group: 'org.apache.flink', name: 'flink-runtime-web', version: '1.16.1'

写一个自定义的 Sink

package com.whb.flinkcdcdemo;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

@Slf4j
public class MySink extends RichSinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info("MySink invoke: {}", value);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        log.info("MySink open");
    }

    @Override
    public void close() throws Exception {
        log.info("MySink close");
    }
}
package com.whb.flinkcdcdemo;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkCdcDemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcDemoApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 创建数据源
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.users")
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // 包含表结构变更
                .includeSchemaChanges(true)
                .build();

        Configuration config = new Configuration();
        // 设置web ui端口
        config.setInteger(RestOptions.PORT, 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        // 指定检查点之间的时间间隔(毫秒)
        env.enableCheckpointing(3000)
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // 数据源并行度 默认为CPU的核数
                .setParallelism(2)
                .addSink(new MySink())
                // 并行任务数 默认为CPU的核数
                .setParallelism(4)
                .name("MySink");

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

运行项目

./gradlew bootRun
INSERT INTO mydb.users (name, email)
VALUES ('whb', 'whb@qq.com')
  • 插入数据日志
MySink invoke: {"before":null,"after":{"id":2,"name":"whb","email":"whb@qq.com"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1679285096000,"snapshot":"false","db":"mydb","sequence":null,"table":"users","server_id":1,"gtid":null,"file":"binlog.000002","pos":372,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1679285096301,"transaction":null}
  • 修改数据
UPDATE mydb.users
SET email = 'whb@whb.com'
WHERE id = 2
  • 修改数据日志
MySink invoke: {"before":{"id":2,"name":"whb","email":"whb@qq.com"},"after":{"id":2,"name":"whb","email":"whb@whb.com"},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1679285399000,"snapshot":"false","db":"mydb","sequence":null,"table":"users","server_id":1,"gtid":null,"file":"binlog.000002","pos":685,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1679285399799,"transaction":null}
  • 删除数据
DELETE
FROM mydb.users
WHERE id = 2
  • 删除数据日志
MySink invoke: {"before":{"id":2,"name":"whb","email":"whb@whb.com"},"after":null,"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1679285515000,"snapshot":"false","db":"mydb","sequence":null,"table":"users","server_id":1,"gtid":null,"file":"binlog.000002","pos":1013,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1679285515884,"transaction":null}

这是一个 Flink Debezium CDC(Change Data Capture)源的数据格式,表示一个 MySQL 数据库表中的一条数据变更记录。下面是每个字段的含义:

  • before 字段:表示变更前的数据记录,是一个 JSON 对象。
  • after 字段:表示变更后的数据记录,是一个 JSON 对象。如果这是一条删除操作,则该字段为 null。
  • source 字段:表示变更的来源信息,是一个 JSON 对象。包含以下字段:
    • version:Debezium 版本号。
    • connector:连接器类型,这里是 MySQL。
    • name:连接器名称。
    • ts_ms:变更事件的时间戳,以毫秒为单位。
    • snapshot:是否是快照事件。
    • db:数据库名称。
    • sequence:事件序列号。
    • table:表名称。
    • server_id:MySQL 服务器 ID。
    • gtid:全局事务 ID。
    • file:MySQL binlog 文件名。
    • pos:binlog 文件中的位置。
    • row:binlog 文件中的行号。
    • thread:处理事件的线程信息。
    • query:执行的 SQL 语句。
  • op 字段:表示操作类型,包括插入(c)、更新(u)和删除(d)。
  • ts_ms 字段:事件时间戳,以毫秒为单位。
  • transaction 字段:表示事务信息,是一个 JSON 对象。包含以下字段:
    • id:事务 ID。
    • total_order:事务总序列号。
    • snapshot:是否是快照事件。
    • data_collection_order:数据采集序列号。

总结

通过 Flink CDC,我们可以方便地将关系型数据库中的数据实时同步到 Flink 流处理引擎中,并进行实时处理。本文介绍了 Flink CDC
的基本使用方法,希望能对大家有所帮助。

参考链接

https://ververica.github.io/flink-cdc-connectors/master/index.html
https://github.com/ververica/flink-cdc-connectors
https://nightlies.apache.org/flink/flink-docs-master/