首页 > 图灵资讯 > 技术篇>正文
java 使用 mysql-binlog-connector-java 同步Mysql数据
2023-05-12 10:06:55
一、前言
关于Mysql的数据同步,一般的方法是读取Mysql的日志binlog文件,以获取数据变更和同步。
在目前的开源项目中,监控mysql binlog有很多工具:
- mysql-binlog-connector-java
- canal
在这篇文章中,小编将向大家介绍,mysql-binlog-connector-java 的使用。
二、谈生意在最近的项目中,在重建用户系统后,用户系统发现仍然存在一些不稳定的地方。为了减轻用户系统的压力,一些频繁查询的接口被提取到公共服务中,用户系统的一些热点数据被缓存为MEMSQL和redis。
然后使用数据同步,始终监控用户系统的RDS,同步公共服务的Redis和Memsql。
三、mysql-binlog-connector-java 介绍mysql-binlog-connector-java是github上的开源项目。是二进制日志监听器。
日志同步作为mysql,具有以下优点:
- 支持binlog文件的分析,GTID的分析(全局事务id)
- 支持重连
- 支持设置故障转移策略
- 安全使用TLS协议
- JMX-friendly
- 实时监控状态
- 没有第三方依赖
<dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.13.0</version> </dependency>
Code
实体DataConfigg连接到封装数据库
package com.xx.common.component;public class DataConfig { private String javaHost; private int javaPort; private String javaUserName; private String javaPassword; private Long javaPosition; private String javaFileName; public String getJavaHost() { return javaHost; } public void setJavaHost(String javaHost) { this.javaHost = javaHost; } public int getJavaPort() { return javaPort; } public void setJavaPort(int javaPort) { this.javaPort = javaPort; } public String getJavaUserName() { return javaUserName; } public void setJavaUserName(String javaUserName) { this.javaUserName = javaUserName; } public String getJavaPassword() { return javaPassword; } public void setJavaPassword(String javaPassword) { this.javaPassword = javaPassword; } public Long getJavaPosition() { return javaPosition; } public void setJavaPosition(Long javaPosition) { this.javaPosition = javaPosition; } public String getJavaFileName() { return javaFileName; } public void setJavaFileName(String javaFileName) { this.javaFileName = javaFileName; } @Override public String toString() { return "DataConfig{" + "javaHost='" + javaHost + '\'' + ", javaPort=" + javaPort + ", javaUserName='" + javaUserName + '\'' + ", javaPassword='" + javaPassword + '\'' + ", javaPosition=" + javaPosition + ", javaFileName='" + javaFileName + '\'' + '}'; }}
导入前binlog的增量位置
package com.xxx.common.service;import com.github.shyiko.mysql.binlog.network.AuthenticationException;import com.github.shyiko.mysql.binlog.network.ServerException;import com.github.shyiko.mysql.binlog.network.SocketFactory;import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;import com.xxx.common.logger.DushuLogger;import com.xxxx.common.component.DataConfig;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.io.IOException;import java.net.InetSocketAddress;import java.net.Socket;import java.util.Arrays;import java.util.LinkedList;import java.util.List;import java.util.concurrent.TimeUnit;@Servicepublic class BinLogService { @Resource(name="dataConfig") public DataConfig dataConfig; public void getBinLogPosition() throws Exception{ ////链接数据库服务器 SocketFactory socketFactory = null; Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket(); DushuLogger.info("==========dataConfig=======",dataConfig.toString()); socket.connect(new InetSocketAddress((dataConfig.getJavaHost()), dataConfig.getJavaPort()), (int) TimeUnit.SECONDS.toMillis(3)); PacketChannel channel = new PacketChannel(socket); byte[] initialHandshakePacket = channel.read(); if (initialHandshakePacket[0] == (byte) 0xFF /* error */) { byte[] bytes = Arrays.copyOfRange(initialHandshakePacket, 1, initialHandshakePacket.length); ErrorPacket errorPacket = new ErrorPacket(bytes); throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState()); } //认证 GreetingPacket greetingPacket = new GreetingPacket(initialHandshakePacket); int collation = greetingPacket.getServerCollation(); int packetNumber = 1; AuthenticateCommand authenticateCommand = new AuthenticateCommand(null, dataConfig.getJavaUserName(), dataConfig.getJavaPassword(), greetingPacket.getScramble()); authenticateCommand.setCollation(collation); channel.write(authenticateCommand, packetNumber); byte[] authenticationResult = channel.read(); if (authenticationResult[0] != (byte) 0x00 /* ok */) { if (authenticationResult[0] == (byte) 0xFF /* error */) { byte[] bytes = Arrays.copyOfRange(authenticationResult, 1, authenticationResult.length); ErrorPacket errorPacket = new ErrorPacket(bytes); throw new AuthenticationException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState()); } throw new AuthenticationException("Unexpected authentication result (" + authenticationResult[0] + ")"); } //执行mysql命令show执行 master status ResultSetRowPacket[] result; channel.write(new QueryCommand("show master status")); List<ResultSetRowPacket> resultSet = new LinkedList<ResultSetRowPacket>(); byte[] statementResult = channel.read(); if (statementResult[0] == (byte) 0xFF /* error */) { byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); ErrorPacket errorPacket = new ErrorPacket(bytes); throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState()); } while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ } for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) { resultSet.add(new ResultSetRowPacket(bytes)); } result = resultSet.toArray(new ResultSetRowPacket[resultSet.size()]); if (result.length == 0) { throw new IOException("Failed to determine binlog filename/position"); } ResultSetRowPacket resultSetRow = result[0]; String binlogFilename = resultSetRow.getValue(0); long binlogPosition = Long.parseLong(resultSetRow.getValue(1)); DushuLogger.info(“当前binlog文件名称”+binlogFilename+",位置:"+binlogPosition); dataConfig.setJavaFileName(binlogFilename); dataConfig.setJavaPosition(binlogPosition); if (socket != null && != null && !socket.isClosed()) { socket.close(); } }}
启动监听
这里主要通过监控binlog的各种事件来处理先关
package com.xxx.common.service;import com.github.shyiko.mysql.binlog.BinaryLogClient;import com.github.shyiko.mysql.binlog.event.*;import com.xxxxx.common.component.DataConfig;import com.xxxx.common.logger.DushuLogger;import com.xxxxx.common.util.DataMapManager;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.io.IOException;@Servicepublic class IncrementSyncUserService { @Resource(name="dataConfig") public DataConfig dataConfig; @Autowired private JavaUserService userService; @Autowired private JavaUserInfoService userInfoService; @Autowired private JavaUserWealthService userWealthService; @Autowired private JavaUserTokenService userTokenService; public void increaseSyncUserData() { Thread t = new Thread(new Runnable() { @Override public void run() { DushuLogger.info(”增量数据监听启动..."); DataMapManager manager = DataMapManager.getInstance();//添加对表的监控 manager.register("t_user", userService); manager.register("t_user_info", userInfoService); manager.register("t_user_wealth", userWealthService); manager.register("t_user_token", userTokenService); BinaryLogClient client = new BinaryLogClient(dataConfig.getJavaHost(), dataConfig.getJavaPort(), dataConfig.getJavaUserName(), dataConfig.getJavaPassword()); if (dataConfig.getJavaPosition() != null && dataConfig.getJavaFileName() != null) { ///增量需要指定binlog信息 client.setBinlogFilename(dataConfig.getJavaFileName()); client.setBinlogPosition(dataConfig.getJavaPosition()); } DushuLogger.info("将从"+client.getBinlogFilename()+“文件”+client.getBinlogPosition()+开始注册事件的位置监听); client.registerEventListener(new BinaryLogClient.EventListener() { @Override public void onEvent(Event event) { EventData data = event.getData(); if(data instanceof WriteRowsEventData){ WriteRowsEventData writerData = (WriteRowsEventData)data; try { manager.executeInsert(writerData); } catch (Exception e) { DushuLogger.error(e,e); } } if(data instanceof UpdateRowsEventData){ UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData)data; try { manager.executeUpdate(updateRowsEventData); } catch (Exception e) { DushuLogger.error(e,e); } } //用户信息没有删除逻辑,delete不需要实现 if(data instanceof DeleteRowsEventData){ DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data; try { manager.executeDelete(deleteRowsEventData); } catch (Exception e) { DushuLogger.error(e,e); } } if(data instanceof TableMapEventData){ TableMapEventData tableMapEventData = (TableMapEventData) data; manager.updateTableMap(tableMapEventData); } } }); try { client.connect(); } catch (IOException e) { DushuLogger.error(e,e); } } }); t.start(); }}
DataMapManager
package com.xxxx.common.util;import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;import com.github.shyiko.mysql.binlog.event.TableMapEventData;import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;import java.util.HashMap;import java.util.Map;public class DataMapManager { private static final DataMapManager manager = new DataMapManager(); public static DataMapManager getInstance() { return new DataMapManager(); } private Map<String, DataMapCallback<?>> registerInfo = new HashMap<>(); private Map<Long, String> tableMap = new HashMap<>(); public void register(String tableName, String[] tableRows, DataMapCallback<?> callback) { if (registerInfo.containsKey(tableName)) { throw new RuntimeException(“该表已注册”); } registerInfo.put(tableName, callback); } public void register(String tableName, DataMapCallback<?> callback) { if (registerInfo.containsKey(tableName)) { throw new RuntimeException(“该表已注册”); } registerInfo.put(tableName, callback); } public void executeInsert(WriteRowsEventData data) { String tableName = tableMap.get(data.getTableId()); if (tableName != null) { DataMapCallback<?= null) { DataMapCallback<?> callback = registerInfo.get(tableName); if (callback != null) { callback.doInsertCallback(data.getRows()); } } } public void executeDelete(DeleteRowsEventData data) { String tableName = tableMap.get(data.getTableId()); if (tableName != null) { DataMapCallback<?= null) { DataMapCallback<?> callback = registerInfo.get(tableName); if (callback != null) { callback.doDeleteCallback(data.getRows()); } } } public void executeUpdate(UpdateRowsEventData data) { String tableName = tableMap.get(data.getTableId()); if (tableName != null) { DataMapCallback<?= null) { DataMapCallback<?> callback = registerInfo.get(tableName); if (callback != null) { callback.doUpdateCallback(data.getRows()); } } } public void updateTableMap(TableMapEventData data) { if (!tableMap.containsKey(data.getTableId())) { tableMap.put(data.getTableId(), data.getTable()); } }}
DataMapCallback
package com.xxxxx.common.util;import java.io.Serializable;import java.lang.reflect.ParameterizedType;import java.util.List;import java.util.Map;public abstract class DataMapCallback<T> { private Class<T> rawType; private String[] rows; public abstract void executeInsert(T data); public abstract void executeUpdate(UpdateBean<T> data); public abstract void executeDelete(T data); public String[] getRowNames() { if (rows == null) { rows = DataMapUtil.getRowNameList(getRawType()); } return rows; } public void doInsertCallback(List<Serializable[]> rows) { List<T> list = (List<T>) DataMapUtil.loadInsertData(rows, getRawType(), getRowNames()); for (T t : list) { executeInsert(t); } } public void doDeleteCallback(List<Serializable[]> rows) { List<T> list = (List<T>) DataMapUtil.loadInsertData(rows, getRawType(), getRowNames()); for (T t : list) { executeDelete(t); } } public void doUpdateCallback(List<Map.Entry<Serializable[], Serializable[]>> rows) { List<UpdateBean<T>> list = DataMapUtil.loadUpdateData(rows, getRawType(), getRowNames()); for (UpdateBean<T> t : list) { executeUpdate(t); } } public Class<T> getRawType() { if (rawType == null) { rawType = (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return rawType; }}
DataMapUtil
package com.xxxx.common.util;import com.xxxxx.common.logger.DushuLogger;import com.xxxxx.common.utils.basic.JsonUtil;import org.springframework.util.ObjectUtils;import java.io.Serializable;import java.lang.reflect.Field;import java.text.SimpleDateFormat;import java.util.*;public class DataMapUtil { public static <T> List<T> loadInsertData(List<Serializable[]> rows, Class<T> clazz, String[] rowNames) { List<T> result = new ArrayList<>(); for (Serializable[] data : rows) { T cee = getMapData(data, clazz, rowNames); result.add(cee); } return result; } public static <T> T getMapData(Serializable[] data, Class<T> clazz, String[] rowNames) { StringBuilder sb = new StringBuilder("{"); int index = 0; for (int i = 0; i < rowNames.length; i++) { String name = rowNames[i]; Object d = data[i]; if (d != null) { if (index++ == 0) { sb.append("\"" + name + "\":"); } else { sb.append(",\"" + name + "\":"); } if (BitSet.class.isAssignableFrom(d.getClass())) { BitSet bitSet = (BitSet) d; sb.append("\"" + bitSet.get(0) + "\""); } else if (Date.class.isAssignableFrom(d.getClass())) { Date date = (Date) d; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); String formatDate = simpleDateFormat.format(date); sb.append("\"" + formatDate + "\""); } else { sb.append("\"" + d + "\""); } } } sb.append("}"); T cee = JsonUtil.fromJSON(sb.toString(), clazz); return cee; } public static <T> T diffUpdateData(Map.Entry<Serializable[], Serializable[]> entry, Class<T> clazz, String[] rowNames) throws InstantiationException, IllegalAccessException { StringBuilder sb = new StringBuilder("{"); int index = 0; Serializable[] before = entry.getKey(); Serializable[] after = entry.getValue(); for (int i = 0; i < rowNames.length; i++) { String name = rowNames[i]; Object beforeData = before[i]; Object afterData = after[i]; if (!ObjectUtils.nullSafeEquals(beforeData, afterData)) { if (afterData != null) { if (index++ == 0) { sb.append("\"" + name + "\":"); } else { sb.append(",\"" + name + "\":"); } if (BitSet.class.isAssignableFrom(afterData.getClass())) { BitSet bitSet = (BitSet) afterData; sb.append("\"" + bitSet.get(0) + "\""); } else if (Date.class.isAssignableFrom(afterData.getClass())) { Date date = (Date) afterData; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); String formatDate = simpleDateFormat.format(date); sb.append("\"" + formatDate + "\""); } else { sb.append("\"" + afterData + "\""); } } } } sb.append("}"); T cee = JsonUtil.fromJSON(sb.toString(), clazz); return cee; } public static <T> List<UpdateBean<T>> loadUpdateData(List<Map.Entry<Serializable[], Serializable[]>> rows, Class<T> clazz, String[] rowNames) { List<UpdateBean<T>> result = new ArrayList<>(); for (Map.Entry<Serializable[], Serializable[]> entry : rows) { result.add(getUpdateData(entry, clazz, rowNames)); } return result; } public static <T> UpdateBean<T> getUpdateData(Map.Entry<Serializable[], Serializable[]> entry, Class<T> clazz, String[] rowNames) { UpdateBean<T> updateBean = new UpdateBean<>(); Serializable[] before = entry.getKey(); Serializable[] after = entry.getValue(); T beforeBean = getMapData(before, clazz, rowNames); T afterBean = getMapData(after, clazz, rowNames); try { T diffBean = diffUpdateData(entry, clazz, rowNames); updateBean.setDiffValue(diffBean); } catch (InstantiationException e) { DushuLogger.error(e,e); } catch (IllegalAccessException e) { DushuLogger.error(e,e); } updateBean.setOldValue(beforeBean); updateBean.setNewValue(afterBean); return updateBean; } public static String[] getRowNameList(Class<?> clazz) { Field[] fieldList = clazz.getDeclaredFields(); String[] names = new String[fieldList.length]; for (int i = 0; i < fieldList.length; i++) { names[i] = fieldList[i].getName(); } return names; }}
UpdateBean
package com.xxxx.common.util;public class UpdateBean<T> { private T oldValue; private T newValue; private T diffValue; public T getOldValue() { return oldValue; } public void setOldValue(T oldValue) { this.oldValue = oldValue; } public T getNewValue() { return newValue; } public void setNewValue(T newValue) { this.newValue = newValue; } public T getDiffValue() { return diffValue; } public void setDiffValue(T diffValue) { this.diffValue = diffValue; }}
userService,执行具体的同步操作,同步redis,同步memsql。
package com.xxx.common.service;import com.xxxx.common.dto.JavaUserDTO;import com.xxxx.common.dto.UserDTO;import com.xxxx.common.logger.DushuLogger;import com.xxx.common.mapper.UserSystemMapper;import com.xxxx.common.util.DataMapCallback;import com.xxxxx.common.util.UpdateBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Servicepublic class JavaUserService extends DataMapCallback<JavaUserDTO> { @Autowired private UserSystemMapper userSystemMapper;// @Autowired// private UserMapper userMapper; @Autowired private UserRedisService userRedisService; @Override public void executeInsert(JavaUserDTO data) { Integer userId = data.getUserNo(); UserDTO userDTO = userSystemMapper.getUserById(userId); if (userDTO == null) { return; }// userMapper.insertByBatch(Arrays.asList(userDTO)); userRedisService.insertToRedis(userDTO); DushuLogger.info(Insert事件被触发,新增用户,数据对象[” + userDTO.toString() + "]"); } @Override public void executeUpdate(UpdateBean<JavaUserDTO> data) { JavaUserDTO oldDto = data.getOldValue(); JavaUserDTO newDto = data.getNewValue(); UserDTO userDTO = userSystemMapper.getUserById(newDto.getUserNo()); if (userDTO == null) { return; }// userMapper.updateWithTUser(userDTO); userRedisService.updateWithUserNo(userDTO); userRedisService.updateWithMobile(oldDto.getAreaCode(), oldDto.getMobile(), userDTO); DushuLogger.info(Update事件被触发,用户更新,数据对象[” + userDTO.toStringWithJavaUser() + "]"); } @Override public void executeDelete(JavaUserDTO data) { }}
四、小结
目前,使用仍然稳定,但当事件点有大量数据并发时,数据同步将被延迟。这个错误仍然没有得到优化。以后再进行优化。