当前位置: 首页 > 图灵资讯 > 技术篇> Canal+Kafka实现MySQL与Redis数据同步(一)

Canal+Kafka实现MySQL与Redis数据同步(一)

来源:图灵教育
时间:2023-11-19 17:30:32

Canal+Kafka实现MySQL与Redis数据同步(1)

img

前言

在许多业务条件下,我们将在系统中添加redis缓存进行查询优化。

如果数据库数据更新,则需要在业务代码中同步更新redis代码。

这种数据同步代码与业务代码结合在一起会不是很优雅。答案是肯定的,可以抽出这些数据同步代码形成独立的模块。

架构图

canal是伪装成slave订阅mysql的binlog,实现数据同步的中间件。

使用canal最简单的方法是tcp模式。

事实上,canal支持直接发送到MQ。目前,最新版本支持三种主流MQ:Kafka、RocketMQ、RabbitMQ。目前canal的RabitMQ模式有一定的bug,所以一般使用Kafka或RocketMQ。

img

Kafka用于实现Redis与MySQL的数据同步。架构图如下:

img

通过架构图,我们清楚地知道要使用的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。

搭建Kafka

首先,在官网下载安装包:

img

解压,打开/config/server.properties配置文件,修改日志目录:

首先启动Zookeeper,我用3.6.1版本:

img

然后启动Kafka,在Kafkabin目录下打开cmd,输入命令:

kafka-server-start.bat ../../config/server.properties

在Zookeeeper上注册了Kafka相关配置信息:

img

然后创建一个接收canal传输的数据的队列,并使用命令:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic

创建的队列名是canaltopic

img

配置Cannal Server

canal官网下载相关安装包:

img

找到canal.deployer-1.1.4/conf目录下的canal.properties配置文件:

# tcp, kafka, RocketMQ 这里选择kafka模式canalal.serverMode = kafka# 分析器的线程数,打开此配置,不打开就会堵塞或不进行分析.instance.parser.parallelThreadSize = 16# 配置MQ服务地址,这里配置了kafka对应的地址和端口canal.mq.servers = 127.0.0.1:9092# 配置instance,在conf目录中有example同名目录,可配置多个canal.destinations = example

然后配置instance,找到/conf/example/instance.properties配置文件:

## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0# position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql中执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canall.mq.topic=canaltopic#单队列模式的分区下标canal.mq.partition=0

配置完成后,可以启动canal。

测试

这时可以打开kafka的消费者窗口,测试kafka是否收到消息。

监控消费使用命令:

kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic

这里使用win10系统的cmd命令行,win10系统的默认编码是GBK,而canal Server是UTF-8编码,所以控制台会出现乱码:

img

在执行cmd命令之前,切换到UTF-8编码,使用命令:chcp 65001

然后执行打开kafka消费端的命令,不乱码:

img

下一步是启动Redis,将数据同步到Redis。

Redis客户端封装

环境建设完成后,我们可以编写代码。

首先引入Kafka和Redis的maven依赖:

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-data-redis</artifactId></dependency>

application.添加以下配置:yml文件:

spring:    redis:    host: 127.0.0.1    port: 6379    database: 0    password: 123456

操作Redis的工具类包装:

@Componentpublic class RedisClient {    /**     * 获取redis模板     */    @Resource    private StringRedisTemplate stringRedisTemplate;    /**     * 设置redis的key-value     */    public void setString(String key, String value) {        setString(key, value, null);    }    /**     * 设置redis的key-value,带过期时间     */    public void setString(String key, String value, Long timeOut) {        stringRedisTemplate.opsForValue().set(key, value);        if (timeOut != null) {            stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);        }    }    /**     * 在redis中获得key对应的值     */    public String getString(String key) {        return stringRedisTemplate.opsForValue().get(key);    }    /**     * 在redis中删除key对应的值     */    public Boolean deleteKey(String key) {        return stringRedisTemplate.delete(key);    }}

本文由博客群发等运营工具平台 OpenWrite 发布