Canal+Kafka实现MySQL与Redis数据同步(1)
前言在许多业务条件下,我们将在系统中添加redis缓存进行查询优化。
如果数据库数据更新,则需要在业务代码中同步更新redis代码。
这种数据同步代码与业务代码结合在一起会不是很优雅。答案是肯定的,可以抽出这些数据同步代码形成独立的模块。
架构图canal是伪装成slave订阅mysql的binlog,实现数据同步的中间件。
使用canal最简单的方法是tcp模式。
事实上,canal支持直接发送到MQ。目前,最新版本支持三种主流MQ:Kafka、RocketMQ、RabbitMQ。目前canal的RabitMQ模式有一定的bug,所以一般使用Kafka或RocketMQ。
Kafka用于实现Redis与MySQL的数据同步。架构图如下:
通过架构图,我们清楚地知道要使用的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。
搭建Kafka首先,在官网下载安装包:
解压,打开/config/server.properties配置文件,修改日志目录:
首先启动Zookeeper,我用3.6.1版本:
然后启动Kafka,在Kafkabin目录下打开cmd,输入命令:
kafka-server-start.bat ../../config/server.properties
在Zookeeeper上注册了Kafka相关配置信息:
然后创建一个接收canal传输的数据的队列,并使用命令:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic
创建的队列名是canaltopic
。
canal官网下载相关安装包:
找到canal.deployer-1.1.4/conf目录下的canal.properties配置文件:
# tcp, kafka, RocketMQ 这里选择kafka模式canalal.serverMode = kafka# 分析器的线程数,打开此配置,不打开就会堵塞或不进行分析.instance.parser.parallelThreadSize = 16# 配置MQ服务地址,这里配置了kafka对应的地址和端口canal.mq.servers = 127.0.0.1:9092# 配置instance,在conf目录中有example同名目录,可配置多个canal.destinations = example
然后配置instance,找到/conf/example/instance.properties配置文件:
## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0# position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql中执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canall.mq.topic=canaltopic#单队列模式的分区下标canal.mq.partition=0
配置完成后,可以启动canal。
测试这时可以打开kafka的消费者窗口,测试kafka是否收到消息。
监控消费使用命令:
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic
这里使用win10系统的cmd命令行,win10系统的默认编码是GBK,而canal Server是UTF-8编码,所以控制台会出现乱码:
在执行cmd命令之前,切换到UTF-8编码,使用命令:chcp 65001
然后执行打开kafka消费端的命令,不乱码:
下一步是启动Redis,将数据同步到Redis。
Redis客户端封装环境建设完成后,我们可以编写代码。
首先引入Kafka和Redis的maven依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId></dependency>
application.添加以下配置:yml文件:
spring: redis: host: 127.0.0.1 port: 6379 database: 0 password: 123456
操作Redis的工具类包装:
@Componentpublic class RedisClient { /** * 获取redis模板 */ @Resource private StringRedisTemplate stringRedisTemplate; /** * 设置redis的key-value */ public void setString(String key, String value) { setString(key, value, null); } /** * 设置redis的key-value,带过期时间 */ public void setString(String key, String value, Long timeOut) { stringRedisTemplate.opsForValue().set(key, value); if (timeOut != null) { stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS); } } /** * 在redis中获得key对应的值 */ public String getString(String key) { return stringRedisTemplate.opsForValue().get(key); } /** * 在redis中删除key对应的值 */ public Boolean deleteKey(String key) { return stringRedisTemplate.delete(key); }}
本文由博客群发等运营工具平台 OpenWrite 发布