Canal + Kafka for MySQL-to-Redis Data Synchronization (2): MQ Consumer Synchronization
Add the Kafka configuration to the application.yml file:
```yaml
spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # Default consumer group
      group-id: consumer-group1
      # Key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # Key/value serializers (note: Serializer, not Deserializer, on the producer side)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Batch size in bytes
      batch-size: 65536
      # Buffer memory in bytes
      buffer-memory: 524288
```
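For reference, a row-change message that Canal writes to the canaltopic topic looks roughly like the following. This is only an illustrative example: the values are made up, but the field names match the CanalBean defined below (sqlType holds JDBC type codes, e.g. 12 for VARCHAR and 4 for INTEGER).

```json
{
  "data": [
    {
      "id": "3e71a81fd8071eaed600163e046c3",
      "commodity_name": "Pork bun",
      "commodity_price": "3.99",
      "number": "3",
      "description": "Big pork bun"
    }
  ],
  "database": "canaldb",
  "es": 1614754612000,
  "id": 1,
  "isDdl": false,
  "mysqlType": {
    "id": "varchar(32)",
    "commodity_name": "varchar(512)",
    "commodity_price": "varchar(36)",
    "number": "int(10)",
    "description": "varchar(2048)"
  },
  "old": null,
  "pkNames": ["id"],
  "sql": "",
  "sqlType": {
    "id": 12,
    "commodity_name": 12,
    "commodity_price": 12,
    "number": 4,
    "description": 12
  },
  "table": "tb_commodity_info",
  "ts": 1614754613001,
  "type": "INSERT"
}
```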
From the messages shown by the Kafka console consumer, we know the structure of the JSON data, so we can create a CanalBean class to receive it:
```java
import java.util.List;

public class CanalBean {
    // Row data
    private List<TbCommodityInfo> data;
    // Database name
    private String database;
    private long es;
    // Incrementing id, starting from 1
    private int id;
    // Whether this is a DDL statement
    private boolean isDdl;
    // Column types of the table
    private MysqlType mysqlType;
    // Old values, for UPDATE statements
    private String old;
    // Primary key names
    private List<String> pkNames;
    // SQL statement
    private String sql;
    private SqlType sqlType;
    // Table name
    private String table;
    private long ts;
    // INSERT, UPDATE, DELETE, ERASE (drop table), etc.
    private String type;
    // getters and setters
}

public class MysqlType {
    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;
    // getters and setters
}

public class SqlType {
    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;
}
```
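CanalBean also references a TbCommodityInfo entity that mirrors the columns of the tb_commodity_info table shown later. Its source is not included in the original article, so the following is a minimal sketch assuming one String field per column, named exactly like the table columns so that fastjson can map them:

```java
public class TbCommodityInfo {
    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;

    // Getter used by the consumer below; the remaining getters and setters are omitted for brevity.
    public String getId() {
        return id;
    }
    // getters and setters ...
}
```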
Finally, create a CanalConsumer to consume the messages:
```java
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

@Component
public class CanalConsumer {
    // Logger
    private static final Logger log = LoggerFactory.getLogger(CanalConsumer.class);
    // Redis helper
    @Resource
    private RedisClient redisClient;

    // Listen on the topic named "canaltopic"
    @KafkaListener(topics = "canaltopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic: {}, key: {}, partition: {}, offset: {}, value: {}",
                consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);
        // Convert the JSON message into a CanalBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
        // Is this a DDL statement?
        boolean isDdl = canalBean.getIsDdl();
        // Event type
        String type = canalBean.getType();
        // Only DML statements are handled
        if (!isDdl) {
            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
            // Expiration time: 10 minutes
            long TIME_OUT = 600L;
            if ("INSERT".equals(type)) {
                // Insert: write to Redis with a 10-minute TTL
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else if ("UPDATE".equals(type)) {
                // Update: overwrite in Redis with a 10-minute TTL
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else {
                // Delete: remove from Redis
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    redisClient.deleteKey(id);
                }
            }
        }
    }
}
```
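The RedisClient used above is a thin wrapper around Spring Data Redis whose implementation is not shown in the original article. A minimal sketch, assuming a StringRedisTemplate and only the two methods the consumer calls (setString with a TTL in seconds, and deleteKey), might look like this:

```java
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisClient {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    // Write a string value with an expiration time in seconds.
    public void setString(String key, String value, long timeoutSeconds) {
        stringRedisTemplate.opsForValue().set(key, value, timeoutSeconds, TimeUnit.SECONDS);
    }

    // Delete a key.
    public void deleteKey(String key) {
        stringRedisTemplate.delete(key);
    }
}
```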
Testing MySQL-to-Redis synchronization

The corresponding MySQL table structure is as follows:
```sql
CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT 'commodity name',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT 'commodity price',
  `number` int(10) DEFAULT '0' COMMENT 'commodity quantity',
  `description` varchar(2048) DEFAULT '' COMMENT 'commodity description',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='commodity information table';
```
First create the table in MySQL, then start the project and insert a new row:
```sql
INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`)
VALUES ('3e71a81fd8071eaed600163e046c3', 'Pork bun', '3.99', '3', 'Big pork bun, love it');
```
Querying the tb_commodity_info table shows the new row.

The corresponding data can also be found in Redis, which proves the synchronization succeeded!
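Besides checking in a Redis GUI, you can read the value back by its primary key. This is a hypothetical check: it assumes an injected StringRedisTemplate in the surrounding class and reuses the id from the INSERT above.

```java
// Hypothetical check, assuming a StringRedisTemplate is injected in the surrounding class.
String cached = stringRedisTemplate.opsForValue().get("3e71a81fd8071eaed600163e046c3");
log.info("cached commodity: {}", cached); // should print the JSON written by CanalConsumer
```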
What about updates? Let's try an UPDATE statement:
```sql
UPDATE `canaldb`.`tb_commodity_info`
SET `commodity_name` = 'Vegetable bun', `description` = 'Cheap vegetable bun'
WHERE `id` = '3e71a81fd8071eaed600163e046cc3';
```
No problems there either!
To summarize, Canal has some drawbacks:

- Canal can only synchronize incremental data.
- Synchronization is near-real-time rather than real-time.
- It has some bugs, but the community is active, so reported bugs tend to get fixed promptly.
- Message ordering in the MQ can be an issue; see the note below, and more detailed discussions can be found online.
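On the ordering point: if changes to the same row can land in different Kafka partitions, a consumer may see an UPDATE before the INSERT it depends on. A common mitigation is to key each message by the row's primary key so that all changes for one row go to the same partition and are consumed in order (Canal can also be configured to hash-partition by primary key). The sketch below is hypothetical, it assumes you control the producer side, which in this architecture is normally Canal itself:

```java
import javax.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderedChangePublisher {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Hypothetical helper: keying by the row id makes Kafka route every change
    // for the same row to the same partition, preserving per-row order.
    public void publish(String rowId, String changeJson) {
        kafkaTemplate.send("canaltopic", rowId, changeJson);
    }
}
```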
Despite these drawbacks, no technology (or product) is perfect; what matters most is choosing the one that fits your scenario.