首页>>互联网>>大数据->一文讲清楚FusionInsight MRS CDL如何使用

一文讲清楚FusionInsight MRS CDL如何使用

时间:2023-11-29 本站 点击:1

摘要:CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

CDL服务包含了两个重要的角色:CDLConnector和CDLService。CDLConnector是具体执行数据抓取任务的实例,CDLService是负责管理和创建任务的实例。

本此实践介绍以mysql作为数据源进行数据抓取。

前提条件

MRS集群已安装CDL服务。

MySQL数据库需要开启mysql的bin log功能(默认情况下是开启的)。

查看MySQL是否开启bin log:

使用工具或者命令行连接MySQL数据库(本示例使用navicat工具连接),执行show variables like 'log_%'命令查看。

例如在navicat工具选择"File> New Query"新建查询,输入如下SQL命令,单击"Run"在结果中"log_bin"显示为"ON"则表示开启成功。

show variableslike 'log_%'

工具准备

现在cdl只能使用rest api的方式进行命令提交,所以需要提前安装工具进行调试。本文使用VSCode工具。

完成之后安装rest client插件:

完成之后创建一个cdl.http的文件进行编辑:

创建CDL任务

CDL任务创建的流程图如下所示:

说明:需要先创建一个MySQL link, 在创建一个Kafka link, 然后再创建一个CDL同步任务并启动。

MySQL link部分rest请求代码

@hostname=172.16.9.113@port=21495@host={{hostname}}:{{port}}@bootstrap="172.16.9.113:21007"@bootstrap_normal="172.16.9.113:21005"@mysql_host="172.16.2.118"@mysql_port="3306"@mysql_database="hudi"@mysql_user="root"@mysql_password="Huawei@123"###getlinksgethttps://{{host}}/api/v1/cdl/link###mysqllinkvalidateposthttps://{{host}}/api/v1/cdl/link?validate=truecontent-type:application/json{"name":"MySQL_link",//link名,全局唯一,不能重复"description":"MySQLconnection",//link描述"link-type":"mysql",//link的类型"enabled":"true","link-config-values":{"inputs":[{"name":"host","value":{{mysql_host}}},//数据库安装节点的ip{"name":"port","value":{{mysql_port}}},//数据库监听的端口{"name":"database.name","value":{{mysql_database}}},//连接的数据库名{"name":"user","value":{{mysql_user}}},//用户{"name":"password","value":{{mysql_password}}},//密码{"name":"schema","value":{{mysql_database}}}//同数据库名]}}###mysqllinkcreateposthttps://{{host}}/api/v1/cdl/linkcontent-type:application/json{"name":"MySQL_link",//link名,全局唯一,不能重复"description":"MySQLconnection",//link描述"link-type":"mysql",//link的类型"enabled":"true","link-config-values":{"inputs":[{"name":"host","value":{{mysql_host}}},//数据库安装节点的ip{"name":"port","value":{{mysql_port}}},//数据库监听的端口{"name":"database.name","value":{{mysql_database}}},//连接的数据库名{"name":"user","value":{{mysql_user}}},//用户{"name":"password","value":{{mysql_password}}},//密码{"name":"schema","value":{{mysql_database}}}//同数据库名]}}###mysqllinkupdateputhttps://{{host}}/api/v1/cdl/link/MySQL_linkcontent-type:application/json{"name":"MySQL_link",//link名,全局唯一,不能重复"description":"MySQLconnection",//link描述"link-type":"mysql",//link的类型"enabled":"true","link-config-values":{"inputs":[{"name":"host","value":{{mysql_host}}},//数据库安装节点的ip{"name":"port","value":{{mysql_port}}},//数据库监听的端口{"name":"database.name","value":{{mysql_database}}},//连接的数据库名{"name":"user","value":{{mysql_user}}},//用户{"name":"password","value":{{mysql_password}}},//密码{"name":"schema","value":{{mysql_database}}}//同数据库名]}}

Kafka link部分rest请求代码

###getlinksgethttps://{{host}}/api/v1/cdl/link###kafkalinkvalidateposthttps://{{host}}/api/v1/cdl/link?validate=truecontent-type:application/json{"name":"kafka_link","description":"testkafkalink","link-type":"kafka","enabled":"true","link-config-values":{"inputs":[{"name":"bootstrap.servers","value":"172.16.9.113:21007"},{"name":"sasl.kerberos.service.name","value":"kafka"},{"name":"security.protocol","value":"SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT]}}###kafkalinkcreateposthttps://{{host}}/api/v1/cdl/linkcontent-type:application/json{"name":"kafka_link","description":"testkafkalink","link-type":"kafka","enabled":"true","link-config-values":{"inputs":[{"name":"bootstrap.servers","value":"172.16.9.113:21007"},{"name":"sasl.kerberos.service.name","value":"kafka"},{"name":"security.protocol","value":"SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT]}}###kafkalinkupdateputhttps://{{host}}/api/v1/cdl/link/kafka_linkcontent-type:application/json{"name":"kafka_link","description":"testkafkalink","link-type":"kafka","enabled":"true","link-config-values":{"inputs":[{"name":"bootstrap.servers","value":"172.16.9.113:21007"},{"name":"sasl.kerberos.service.name","value":"kafka"},{"name":"security.protocol","value":"SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT]}}

CDL任务命令部分rest请求代码

@hostname=172.16.9.113@port=21495@host={{hostname}}:{{port}}@bootstrap="172.16.9.113:21007"@bootstrap_normal="172.16.9.113:21005"@mysql_host="172.16.2.118"@mysql_port="3306"@mysql_database="hudi"@mysql_user="root"@mysql_password="Huawei@123"###createjobposthttps://{{host}}/api/v1/cdl/jobcontent-type:application/json{"job_type":"CDL_JOB",//job类型,目前只支持CDL_JOB这一种"name":"mysql_to_kafka",//job名称"description":"mysql_to_kafka",//job描述"from-link-name":"MySQL_link",//数据源Link"to-link-name":"kafka_link",//目标源Link"from-config-values":{"inputs":[{"name":"connector.class","value":"com.huawei.cdc.connect.mysql.MysqlSourceConnector"},{"name":"schema","value":"hudi"},{"name":"db.name.alias","value":"hudi"},{"name":"whitelist","value":"hudisource"},{"name":"tables","value":"hudisource"},{"name":"tasks.max","value":"10"},{"name":"mode","value":"insert,update,delete"},{"name":"parse.dml.data","value":"true"},{"name":"schema.auto.creation","value":"false"},{"name":"errors.tolerance","value":"all"},{"name":"multiple.topic.partitions.enable","value":"false"},{"name":"topic.table.mapping","value":"[{\"topicName\":\"huditableout\",\"tableName\":\"hudisource\"}]"},{"name":"producer.override.security.protocol","value":"SASL_PLAINTEXT"},//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT{"name":"consumer.override.security.protocol","value":"SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT]},"to-config-values":{"inputs":[]},"job-config-values":{"inputs":[{"name":"global.topic","value":"demo"}]}}###getalljobgethttps://{{host}}/api/v1/cdl/job###submitjobputhttps://{{host}}/api/v1/cdl/job/mysql_to_kafka/start###getjobstatusgethttps://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka###stopjobputhttps://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop###deletejobDELETEhttps://{{host}}/api/v1/cdl/job/mysql_to_kafka

场景验证

生产库MySQL原始数据如下:

提交CDL任务之后

增加操作: insert into hudi.hudisource values (11,“蒋语堂”,38,“女”,“图”,“播放器”,28732);

对应kafka消息体:

更改操作: UPDATE hudi.hudisource SET uname=‘AnneMarie333’ WHERE uid=11;

对应kafka消息体:

删除操作:delete from hudi.hudisource where uid=11;

对应kafka消息体:

本文分享自华为云社区《华为FusionInsight MRS CDL使用指南》,作者:晋红轻。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/BigData/1244.html