当前位置: 首页 > 图灵资讯 > 技术篇> java 使用 mysql-binlog-connector-java 同步Mysql数据

java 使用 mysql-binlog-connector-java 同步Mysql数据

来源:图灵教育
时间:2023-05-12 10:06:55
一、前言

关于Mysql的数据同步,一般的方法是读取Mysql的日志binlog文件,以获取数据变更和同步。

在目前的开源项目中,监控mysql binlog有很多工具:

  • mysql-binlog-connector-java
  • canal

在这篇文章中,小编将向大家介绍 mysql-binlog-connector-java 的使用。

二、业务背景

在最近的项目中,用户系统重构之后仍然存在一些不稳定的地方。为了减轻用户系统的压力,我们把一些查询频繁的接口抽取到公共服务中,并把用户系统的部分热点数据缓存到MemSQL和Redis中。

然后通过数据同步,持续监听用户系统的RDS,把数据变更同步到公共服务的Redis和MemSQL。

三、mysql-binlog-connector-java 介绍

mysql-binlog-connector-java是github上的开源项目。是二进制日志监听器。

作为MySQL的日志同步工具,它具有以下优点:

  • 支持解析binlog文件和GTID(全局事务ID)
  • 支持重连
  • 支持设置故障转移策略
  • 支持通过TLS协议进行安全通信
  • JMX-friendly
  • 实时监控状态
  • 没有第三方依赖
三、使用引入依赖
<dependency>            <groupId>com.github.shyiko</groupId>            <artifactId>mysql-binlog-connector-java</artifactId>            <version>0.13.0</version>        </dependency>
Code

实体类DataConfig:封装数据库连接信息

package com.xx.common.component;public class DataConfig {    private String javaHost;    private int javaPort;    private String javaUserName;    private String javaPassword;    private Long javaPosition;    private String javaFileName;    public String getJavaHost() {        return javaHost;    }    public void setJavaHost(String javaHost) {        this.javaHost = javaHost;    }    public int getJavaPort() {        return javaPort;    }    public void setJavaPort(int javaPort) {        this.javaPort = javaPort;    }    public String getJavaUserName() {        return javaUserName;    }    public void setJavaUserName(String javaUserName) {        this.javaUserName = javaUserName;    }    public String getJavaPassword() {        return javaPassword;    }    public void setJavaPassword(String javaPassword) {        this.javaPassword = javaPassword;    }    public Long getJavaPosition() {        return javaPosition;    }    public void setJavaPosition(Long javaPosition) {        this.javaPosition = javaPosition;    }    public String getJavaFileName() {        return javaFileName;    }    public void setJavaFileName(String javaFileName) {        this.javaFileName = javaFileName;    }    @Override    public String toString() {        return "DataConfig{" +                "javaHost='" + javaHost + '\'' +                ", javaPort=" + javaPort +                ", javaUserName='" + javaUserName + '\'' +                ", javaPassword='" + javaPassword + '\'' +                ", javaPosition=" + javaPosition +                ", javaFileName='" + javaFileName + '\'' +                '}';    }}

记录导入前binlog的文件名和位置(作为增量同步的起点)

package com.xxx.common.service;import com.github.shyiko.mysql.binlog.network.AuthenticationException;import com.github.shyiko.mysql.binlog.network.ServerException;import com.github.shyiko.mysql.binlog.network.SocketFactory;import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;import com.xxx.common.logger.DushuLogger;import com.xxxx.common.component.DataConfig;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.io.IOException;import java.net.InetSocketAddress;import java.net.Socket;import java.util.Arrays;import java.util.LinkedList;import java.util.List;import java.util.concurrent.TimeUnit;@Servicepublic class BinLogService {    @Resource(name="dataConfig")    public DataConfig dataConfig;    public void getBinLogPosition() throws Exception{        ////链接数据库服务器        SocketFactory socketFactory = null;        Socket socket = socketFactory != null ? 
socketFactory.createSocket() : new Socket();        DushuLogger.info("==========dataConfig=======",dataConfig.toString());                socket.connect(new InetSocketAddress((dataConfig.getJavaHost()), dataConfig.getJavaPort()), (int) TimeUnit.SECONDS.toMillis(3));        PacketChannel channel = new PacketChannel(socket);        byte[] initialHandshakePacket = channel.read();        if (initialHandshakePacket[0] == (byte) 0xFF /* error */) {            byte[] bytes = Arrays.copyOfRange(initialHandshakePacket, 1, initialHandshakePacket.length);            ErrorPacket errorPacket = new ErrorPacket(bytes);            throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),                    errorPacket.getSqlState());        }        //认证        GreetingPacket greetingPacket = new GreetingPacket(initialHandshakePacket);        int collation = greetingPacket.getServerCollation();        int packetNumber = 1;        AuthenticateCommand authenticateCommand = new AuthenticateCommand(null, dataConfig.getJavaUserName(), dataConfig.getJavaPassword(),                greetingPacket.getScramble());        authenticateCommand.setCollation(collation);        channel.write(authenticateCommand, packetNumber);        byte[] authenticationResult = channel.read();        if (authenticationResult[0] != (byte) 0x00 /* ok */) {            if (authenticationResult[0] == (byte) 0xFF /* error */) {                byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length);                ErrorPacket errorPacket = new ErrorPacket(bytes);                throw new AuthenticationException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),                        errorPacket.getSqlState());            }            throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")");        }        //执行mysql命令show执行 master status        ResultSetRowPacket[] result;        channel.write(new 
QueryCommand("show master status"));        List<ResultSetRowPacket> resultSet = new LinkedList<ResultSetRowPacket>();        byte[] statementResult = channel.read();        if (statementResult[0] == (byte) 0xFF /* error */) {            byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length);            ErrorPacket errorPacket = new ErrorPacket(bytes);            throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),                    errorPacket.getSqlState());        }        while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ }        for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) {            resultSet.add(new ResultSetRowPacket(bytes));        }        result = resultSet.toArray(new ResultSetRowPacket[resultSet.size()]);        if (result.length == 0) {            throw new IOException("Failed to determine binlog filename/position");        }        ResultSetRowPacket resultSetRow = result[0];        String binlogFilename = resultSetRow.getValue(0);        long binlogPosition = Long.parseLong(resultSetRow.getValue(1));        DushuLogger.info(“当前binlog文件名称”+binlogFilename+",位置:"+binlogPosition);        dataConfig.setJavaFileName(binlogFilename);        dataConfig.setJavaPosition(binlogPosition);        if (socket != null && != null && !socket.isClosed()) {            socket.close();        }    }}

启动监听

这里主要通过监听binlog的各种事件来做相应的处理。

package com.xxx.common.service;import com.github.shyiko.mysql.binlog.BinaryLogClient;import com.github.shyiko.mysql.binlog.event.*;import com.xxxxx.common.component.DataConfig;import com.xxxx.common.logger.DushuLogger;import com.xxxxx.common.util.DataMapManager;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.io.IOException;@Servicepublic class IncrementSyncUserService {    @Resource(name="dataConfig")    public DataConfig dataConfig;    @Autowired    private JavaUserService userService;    @Autowired    private JavaUserInfoService userInfoService;    @Autowired    private JavaUserWealthService userWealthService;    @Autowired    private JavaUserTokenService userTokenService;    public void increaseSyncUserData() {        Thread t = new Thread(new Runnable() {            @Override            public void run() {                DushuLogger.info(”增量数据监听启动...");                DataMapManager manager = DataMapManager.getInstance();//添加对表的监控                manager.register("t_user", userService);                manager.register("t_user_info", userInfoService);                manager.register("t_user_wealth", userWealthService);                manager.register("t_user_token", userTokenService);                BinaryLogClient client = new BinaryLogClient(dataConfig.getJavaHost(), dataConfig.getJavaPort(), dataConfig.getJavaUserName(), dataConfig.getJavaPassword());                if (dataConfig.getJavaPosition() != null && dataConfig.getJavaFileName() != null) {                    ///增量需要指定binlog信息                    client.setBinlogFilename(dataConfig.getJavaFileName());                    client.setBinlogPosition(dataConfig.getJavaPosition());                }                DushuLogger.info("将从"+client.getBinlogFilename()+“文件”+client.getBinlogPosition()+开始注册事件的位置监听);                client.registerEventListener(new BinaryLogClient.EventListener() {                    
@Override                    public void onEvent(Event event) {                        EventData data = event.getData();                        if(data instanceof WriteRowsEventData){                            WriteRowsEventData writerData = (WriteRowsEventData)data;                            try {                                manager.executeInsert(writerData);                            } catch (Exception e) {                                DushuLogger.error(e,e);                            }                        }                        if(data instanceof UpdateRowsEventData){                            UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData)data;                            try {                                manager.executeUpdate(updateRowsEventData);                            } catch (Exception e) {                                DushuLogger.error(e,e);                            }                        }                        //用户信息没有删除逻辑,delete不需要实现                        if(data instanceof DeleteRowsEventData){                            DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;                            try {                                manager.executeDelete(deleteRowsEventData);                            } catch (Exception e) {                                DushuLogger.error(e,e);                            }                        }                        if(data instanceof TableMapEventData){                            TableMapEventData tableMapEventData = (TableMapEventData) data;                            manager.updateTableMap(tableMapEventData);                        }                    }                });                try {                    client.connect();                } catch (IOException e) {                    DushuLogger.error(e,e);                }            }        });        t.start();    }}

DataMapManager

package com.xxxx.common.util;import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;import com.github.shyiko.mysql.binlog.event.TableMapEventData;import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;import java.util.HashMap;import java.util.Map;public class DataMapManager {    private static final DataMapManager manager = new DataMapManager();    public static DataMapManager getInstance() {        return new DataMapManager();    }    private Map<String, DataMapCallback<?>> registerInfo = new HashMap<>();    private Map<Long, String> tableMap = new HashMap<>();    public void register(String tableName, String[] tableRows, DataMapCallback<?> callback) {        if (registerInfo.containsKey(tableName)) {            throw new RuntimeException(“该表已注册”);        }        registerInfo.put(tableName, callback);    }    public void register(String tableName, DataMapCallback<?> callback) {        if (registerInfo.containsKey(tableName)) {            throw new RuntimeException(“该表已注册”);        }        registerInfo.put(tableName, callback);    }    public void executeInsert(WriteRowsEventData data) {        String tableName = tableMap.get(data.getTableId());        if (tableName != null) {            DataMapCallback<?= null) {            DataMapCallback<?> callback = registerInfo.get(tableName);            if (callback != null) {                callback.doInsertCallback(data.getRows());            }        }    }    public void executeDelete(DeleteRowsEventData data) {        String tableName = tableMap.get(data.getTableId());        if (tableName != null) {            DataMapCallback<?= null) {            DataMapCallback<?> callback = registerInfo.get(tableName);            if (callback != null) {                callback.doDeleteCallback(data.getRows());            }        }    }    public void executeUpdate(UpdateRowsEventData data) {        String tableName = tableMap.get(data.getTableId());  
      if (tableName != null) {            DataMapCallback<?= null) {            DataMapCallback<?> callback = registerInfo.get(tableName);            if (callback != null) {                callback.doUpdateCallback(data.getRows());            }        }    }    public void updateTableMap(TableMapEventData data) {        if (!tableMap.containsKey(data.getTableId())) {            tableMap.put(data.getTableId(), data.getTable());        }    }}

DataMapCallback

package com.xxxxx.common.util;

import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

/**
 * Base class for handlers of binlog row changes of one table.
 *
 * <p>Raw rows are converted into beans of type {@code T} through
 * {@link DataMapUtil} and handed one at a time to the abstract
 * {@code execute*} methods. {@code T} is discovered reflectively from the
 * subclass declaration, e.g. {@code class Foo extends DataMapCallback<Bar>}.
 *
 * @param <T> bean type a row is mapped to
 */
public abstract class DataMapCallback<T> {

    /** Lazily resolved bean class. */
    private Class<T> rawType;

    /** Lazily resolved column names, taken from the bean's fields. */
    private String[] rowNames;

    /** Handles one inserted row. */
    public abstract void executeInsert(T data);

    /** Handles one updated row (old, new and changed-only values). */
    public abstract void executeUpdate(UpdateBean<T> data);

    /** Handles one deleted row. */
    public abstract void executeDelete(T data);

    /**
     * Returns the names used to label row columns, computed once from the
     * bean class via {@link DataMapUtil#getRowNameList}.
     */
    public String[] getRowNames() {
        if (rowNames == null) {
            rowNames = DataMapUtil.getRowNameList(getRawType());
        }
        return rowNames;
    }

    /** Maps each inserted row to a bean and calls {@link #executeInsert}. */
    public void doInsertCallback(List<Serializable[]> rows) {
        for (T bean : DataMapUtil.loadInsertData(rows, getRawType(), getRowNames())) {
            executeInsert(bean);
        }
    }

    /**
     * Maps each deleted row to a bean and calls {@link #executeDelete}.
     * Deleted rows have the same shape as inserted rows, so the same loader
     * is used.
     */
    public void doDeleteCallback(List<Serializable[]> rows) {
        for (T bean : DataMapUtil.loadInsertData(rows, getRawType(), getRowNames())) {
            executeDelete(bean);
        }
    }

    /** Maps each before/after pair to an {@link UpdateBean} and calls {@link #executeUpdate}. */
    public void doUpdateCallback(List<Map.Entry<Serializable[], Serializable[]>> rows) {
        for (UpdateBean<T> change : DataMapUtil.loadUpdateData(rows, getRawType(), getRowNames())) {
            executeUpdate(change);
        }
    }

    /**
     * Returns the bean class bound to {@code T}.
     *
     * @throws IllegalStateException if no class in the hierarchy binds
     *                               {@code T} to a concrete class
     */
    public Class<T> getRawType() {
        if (rawType == null) {
            rawType = resolveRawType();
        }
        return rawType;
    }

    /**
     * Walks up from the runtime class until it finds the class that directly
     * extends {@code DataMapCallback<...>}.
     *
     * <p>Looking only at the immediate superclass breaks as soon as the
     * runtime class is a subclass of the declaring class (for example a
     * generated proxy subclass), because that superclass is then a plain
     * class rather than a parameterized type.
     */
    @SuppressWarnings("unchecked") // the argument is checked to be a Class before the cast
    private Class<T> resolveRawType() {
        for (Class<?> current = getClass(); current != null; current = current.getSuperclass()) {
            Type parent = current.getGenericSuperclass();
            if (!(parent instanceof ParameterizedType)) {
                continue;
            }
            ParameterizedType parameterized = (ParameterizedType) parent;
            if (parameterized.getRawType() != DataMapCallback.class) {
                continue;
            }
            Type argument = parameterized.getActualTypeArguments()[0];
            if (argument instanceof Class) {
                return (Class<T>) argument;
            }
            break;
        }
        throw new IllegalStateException(
                "Cannot resolve the bean type of " + getClass().getName()
                        + "; declare it as 'extends DataMapCallback<ConcreteType>'");
    }
}

DataMapUtil

package com.xxxx.common.util;import com.xxxxx.common.logger.DushuLogger;import com.xxxxx.common.utils.basic.JsonUtil;import org.springframework.util.ObjectUtils;import java.io.Serializable;import java.lang.reflect.Field;import java.text.SimpleDateFormat;import java.util.*;public class DataMapUtil {    public static <T> List<T> loadInsertData(List<Serializable[]> rows, Class<T> clazz, String[] rowNames) {        List<T> result = new ArrayList<>();        for (Serializable[] data : rows) {            T cee = getMapData(data, clazz, rowNames);            result.add(cee);        }        return result;    }    public static <T> T getMapData(Serializable[] data, Class<T> clazz, String[] rowNames) {        StringBuilder sb = new StringBuilder("{");        int index = 0;        for (int i = 0; i < rowNames.length; i++) {            String name = rowNames[i];            Object d = data[i];            if (d != null) {                if (index++ == 0) {                    sb.append("\"" + name + "\":");                } else {                    sb.append(",\"" + name + "\":");                }                if (BitSet.class.isAssignableFrom(d.getClass())) {                    BitSet bitSet = (BitSet) d;                    sb.append("\"" + bitSet.get(0) + "\"");                } else if (Date.class.isAssignableFrom(d.getClass())) {                    Date date = (Date) d;                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");                    simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));                    String formatDate = simpleDateFormat.format(date);                    sb.append("\"" + formatDate + "\"");                } else {                    sb.append("\"" + d + "\"");                }            }        }        sb.append("}");        T cee = JsonUtil.fromJSON(sb.toString(), clazz);        return cee;    }    public static <T> T diffUpdateData(Map.Entry<Serializable[], Serializable[]> entry, Class<T> 
clazz, String[] rowNames) throws InstantiationException, IllegalAccessException {        StringBuilder sb = new StringBuilder("{");        int index = 0;        Serializable[] before = entry.getKey();        Serializable[] after = entry.getValue();        for (int i = 0; i < rowNames.length; i++) {            String name = rowNames[i];            Object beforeData = before[i];            Object afterData = after[i];            if (!ObjectUtils.nullSafeEquals(beforeData, afterData)) {                if (afterData != null) {                    if (index++ == 0) {                        sb.append("\"" + name + "\":");                    } else {                        sb.append(",\"" + name + "\":");                    }                    if (BitSet.class.isAssignableFrom(afterData.getClass())) {                        BitSet bitSet = (BitSet) afterData;                        sb.append("\"" + bitSet.get(0) + "\"");                    } else if (Date.class.isAssignableFrom(afterData.getClass())) {                        Date date = (Date) afterData;                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");                        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));                        String formatDate = simpleDateFormat.format(date);                        sb.append("\"" + formatDate + "\"");                    } else {                        sb.append("\"" + afterData + "\"");                    }                }            }        }        sb.append("}");        T cee = JsonUtil.fromJSON(sb.toString(), clazz);        return cee;    }    public static <T> List<UpdateBean<T>> loadUpdateData(List<Map.Entry<Serializable[], Serializable[]>> rows, Class<T> clazz, String[] rowNames) {        List<UpdateBean<T>> result = new ArrayList<>();        for (Map.Entry<Serializable[], Serializable[]> entry : rows) {            result.add(getUpdateData(entry, clazz, rowNames));        }        return result;  
  }    public static <T> UpdateBean<T> getUpdateData(Map.Entry<Serializable[], Serializable[]> entry, Class<T> clazz, String[] rowNames) {        UpdateBean<T> updateBean = new UpdateBean<>();        Serializable[] before = entry.getKey();        Serializable[] after = entry.getValue();        T beforeBean = getMapData(before, clazz, rowNames);        T afterBean = getMapData(after, clazz, rowNames);        try {            T diffBean = diffUpdateData(entry, clazz, rowNames);            updateBean.setDiffValue(diffBean);        } catch (InstantiationException e) {            DushuLogger.error(e,e);        } catch (IllegalAccessException e) {            DushuLogger.error(e,e);        }        updateBean.setOldValue(beforeBean);        updateBean.setNewValue(afterBean);        return updateBean;    }    public static String[] getRowNameList(Class<?> clazz) {        Field[] fieldList = clazz.getDeclaredFields();        String[] names = new String[fieldList.length];        for (int i = 0; i < fieldList.length; i++) {            names[i] = fieldList[i].getName();        }        return names;    }}

UpdateBean

package com.xxxx.common.util;public class UpdateBean<T> {    private T oldValue;        private T newValue;        private T diffValue;    public T getOldValue() {        return oldValue;    }    public void setOldValue(T oldValue) {        this.oldValue = oldValue;    }    public T getNewValue() {        return newValue;    }    public void setNewValue(T newValue) {        this.newValue = newValue;    }    public T getDiffValue() {        return diffValue;    }    public void setDiffValue(T diffValue) {        this.diffValue = diffValue;    }}

userService,执行具体的同步操作,同步redis,同步memsql。

package com.xxx.common.service;import com.xxxx.common.dto.JavaUserDTO;import com.xxxx.common.dto.UserDTO;import com.xxxx.common.logger.DushuLogger;import com.xxx.common.mapper.UserSystemMapper;import com.xxxx.common.util.DataMapCallback;import com.xxxxx.common.util.UpdateBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class JavaUserService extends DataMapCallback<JavaUserDTO> {    @Autowired    private UserSystemMapper userSystemMapper;//    @Autowired//    private UserMapper userMapper;    @Autowired    private UserRedisService userRedisService;    @Override    public void executeInsert(JavaUserDTO data) {        Integer userId = data.getUserNo();        UserDTO userDTO = userSystemMapper.getUserById(userId);        if (userDTO == null) {            return;        }//        userMapper.insertByBatch(Arrays.asList(userDTO));        userRedisService.insertToRedis(userDTO);        DushuLogger.info(Insert事件被触发,新增用户,数据对象[” + userDTO.toString() + "]");    }    @Override    public void executeUpdate(UpdateBean<JavaUserDTO> data) {        JavaUserDTO oldDto = data.getOldValue();        JavaUserDTO newDto = data.getNewValue();        UserDTO userDTO = userSystemMapper.getUserById(newDto.getUserNo());        if (userDTO == null) {            return;        }//        userMapper.updateWithTUser(userDTO);        userRedisService.updateWithUserNo(userDTO);        userRedisService.updateWithMobile(oldDto.getAreaCode(), oldDto.getMobile(), userDTO);        DushuLogger.info(Update事件被触发,用户更新,数据对象[” + userDTO.toStringWithJavaUser() + "]");    }    @Override    public void executeDelete(JavaUserDTO data) {    }}
四、小结

目前使用下来还算稳定,但当某个时间点有大量数据并发变更时,数据同步会出现延迟。这个问题目前还没有优化,留待后续改进。