当前位置: 首页 > 图灵资讯 > 技术篇> Canal+Kafka实现MySQL与Redis数据同步(二)

Canal+Kafka实现MySQL与Redis数据同步(二)

来源:图灵教育
时间:2023-11-19 17:22:01

Canal+Kafka实现MySQL与Redis数据同步(2)MQ消费者同步

application.kafka配置信息加上yml配置文件:

spring:  kafka:      # Kafka服务地址    bootstrap-servers: 127.0.0.1:9092    consumer:      # 指定默认组名      group-id: consumer-group1      #序列化反序列化      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    producer:      key-serializer: org.apache.kafka.common.serialization.StringDeserializer      value-serializer: org.apache.kafka.common.serialization.StringDeserializer      # 批量抓取      batch-size: 65536      # 缓存容量      buffer-memory: 524288

根据上述kafka消费命令,我们知道json数据的结构,可以创建canalbean接收对象:

public class CanalBean {    //数据    private List<TbCommodityInfo> data;    //数据库名称    private String database;    private long es;    //递增,从1开始    private int id;    ///是否是DDL语句    private boolean isDdl;    ///表结构的字段类型    private MysqlType mysqlType;    ///UPDATE语句,旧数据    private String old;    ///主键名称    private List<String> pkNames;    ///sql语句    private String sql;    private SqlType sqlType;    //表名    private String table;    private long ts;    ///(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等    private String type;    //getter、setter方法}public class MysqlType {    private String id;    private String commodity_name;    private String commodity_price;    private String number;    private String description;    //getter、setter方法}public class SqlType {    private int id;    private int commodity_name;    private int commodity_price;    private int number;    private int description;}

最后,可以创建消费者canalconsumer进行消费:

@Componentpublic class CanalConsumer {    //日志记录    private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);    ////redis操作工具    @Resource    private RedisClient redisClient;    ///监听队列名称为:canaltopic    @KafkaListener(topics = "canaltopic")    public void receive(ConsumerRecord<?, ?> consumer) {        String value = (String) consumer.value();        log.info("topic名称:{},key:分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);        //转换为javaBean        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);        ///是否获得DDL语句?        boolean isDdl = canalBean.getIsDdl();        //获取类型        String type = canalBean.getType();        ///不是DDL语句        if (!isDdl) {            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();            //过期时间            long TIME_OUT = 600L;            if ("INSERT".equals(type)) {                ///新句                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                    String id = tbCommodityInfo.getId();                    ///添加到redis中,过期时间为10分钟                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);                }            } else if ("UPDATE".equals(type)) {                ///更新句子                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                    String id = tbCommodityInfo.getId();                    ///更新到redis,到期时间为10分钟                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);                }            } else {                ///删除句子                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {                    String id = tbCommodityInfo.getId();                    ///从redis中删除                    redisClient.deleteKey(id);                }            }        }    }}
同步测试MySQL和Redis

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (  `id` varchar(32) NOT NULL,  `commodity_name` varchar(512) DEFAULT NULL COMMENT "商品名称",  `commodity_price` varchar(36) DEFAULT '0' COMMENT 商品价格,  `number` int(10) DEFAULT '0' COMMENT "商品数量",  `description` varchar(2048) DEFAULT '' COMMENT 商品描述,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=“商品信息表”;

首先在MySQL中创建表格。然后启动项目,然后添加一个新的数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES (3e71a81fd8071eaed600163e046c3, '叉包', '3.99', '3', 大叉包,老喜欢”;

tb_commodity_info表查找新数据:

img

Redis还找到了相应的数据,证明同步成功!

img

如果更新?试试Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`="青菜包",`description`=便宜的青菜包’ WHERE `id`=3e71a81fd8071eaed600163e046cc3;

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,而是准实时同步。
  3. 有一些bug,但社区活动较高,可以及时修复提出的bug。
  4. MQ顺序性问题。网络回答,请参考img

虽然有一些缺点,但毕竟没有完美的技术(产品),合适是最重要的。本文由博客群发等运营工具平台发送 OpenWrite 发布