服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Mysql - MySQL特定表全量、增量数据同步到消息队列-解决方案

MySQL特定表全量、增量数据同步到消息队列-解决方案

2022-01-12 18:14李雷 Mysql

mysql要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改、删除也要对应,下面就为大家分享一下

1、原始需求

既要同步原始全量数据,也要实时同步mysql特定库的特定表增量数据,同时对应的修改、删除也要对应。

数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。

应用场景:数据etl同步、降低业务服务器压力。

2、解决方案

MySQL特定表全量、增量数据同步到消息队列-解决方案

3、canal介绍、安装

canal是阿里巴巴旗下的一款开源项目,纯java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql(也支持mariadb)。

工作原理:mysql主备复制实现

MySQL特定表全量、增量数据同步到消息队列-解决方案

从上层来看,复制分成三步:

  1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  2. slave将master的binary log events拷贝到它的中继日志(relay log);
  3. slave重做中继日志中的事件,将改变反映它自己的数据。

canal的工作原理

MySQL特定表全量、增量数据同步到消息队列-解决方案

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

架构

MySQL特定表全量、增量数据同步到消息队列-解决方案

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1..n个instance)

instance模块:

  • eventparser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventsink (parser和store链接器,进行数据过滤,加工,分发的工作)
  • eventstore (数据存储)
  • metamanager (增量订阅&消费信息管理器)

安装

1、mysql、kafka环境准备

2、canal下载:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3、解压:tar -zxvf canal.deployer-1.1.3.tar.gz

4、对目录conf里文件参数配置

对canal.properties配置:

MySQL特定表全量、增量数据同步到消息队列-解决方案

MySQL特定表全量、增量数据同步到消息队列-解决方案

进入conf/example里,对instance.properties配置:

MySQL特定表全量、增量数据同步到消息队列-解决方案

MySQL特定表全量、增量数据同步到消息队列-解决方案

5、启动:bin/startup.sh

6、日志查看:

MySQL特定表全量、增量数据同步到消息队列-解决方案

4、验证

1、开发对应的kafka消费者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package org.kafka;
 
import java.util.arrays;
import java.util.properties;
import org.apache.kafka.clients.consumer.consumerrecord;
import org.apache.kafka.clients.consumer.consumerrecords;
import org.apache.kafka.clients.consumer.kafkaconsumer;
import org.apache.kafka.common.serialization.stringdeserializer;
 
 
/**
 *
 * title: kafkaconsumertest
 * description:
 *  kafka消费者 demo
 * version:1.0.0
 * @author pancm
 * @date 2018年1月26日
 */
public class kafkaconsumertest implements runnable {
 
    private final kafkaconsumer<string, string> consumer;
    private consumerrecords<string, string> msglist;
    private final string topic;
    private static final string groupid = "groupa";
 
    public kafkaconsumertest(string topicname) {
        properties props = new properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", groupid);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", stringdeserializer.class.getname());
        props.put("value.deserializer", stringdeserializer.class.getname());
        this.consumer = new kafkaconsumer<string, string>(props);
        this.topic = topicname;
        this.consumer.subscribe(arrays.aslist(topic));
    }
 
    @override
    public void run() {
        int messageno = 1;
        system.out.println("---------开始消费---------");
        try {
            for (; ; ) {
                msglist = consumer.poll(1000);
                if (null != msglist && msglist.count() > 0) {
                    for (consumerrecord<string, string> record : msglist) {
                        //消费100条就打印 ,但打印的数据不一定是这个规律的
 
                            system.out.println(messageno + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
 
 
//                            string v = decodeunicode(record.value());
 
//                            system.out.println(v);
 
                        //当消费了1000条就退出
                        if (messageno % 1000 == 0) {
                            break;
                        }
                        messageno++;
                    }
                } else {
                    thread.sleep(11);
                }
            }
        } catch (interruptedexception e) {
            e.printstacktrace();
        } finally {
            consumer.close();
        }
    }
 
    public static void main(string args[]) {
        kafkaconsumertest test1 = new kafkaconsumertest("sample-data");
        thread thread1 = new thread(test1);
        thread1.start();
    }
 
 
    /*
     * 中文转unicode编码
     */
    public static string gbencoding(final string gbstring) {
        char[] utfbytes = gbstring.tochararray();
        string unicodebytes = "";
        for (int i = 0; i < utfbytes.length; i++) {
            string hexb = integer.tohexstring(utfbytes[i]);
            if (hexb.length() <= 2) {
                hexb = "00" + hexb;
            }
            unicodebytes = unicodebytes + "\\u" + hexb;
        }
        return unicodebytes;
    }
 
    /*
     * unicode编码转中文
     */
    public static string decodeunicode(final string datastr) {
        int start = 0;
        int end = 0;
        final stringbuffer buffer = new stringbuffer();
        while (start > -1) {
            end = datastr.indexof("\\u", start + 2);
            string charstr = "";
            if (end == -1) {
                charstr = datastr.substring(start + 2, datastr.length());
            } else {
                charstr = datastr.substring(start + 2, end);
            }
            char letter = (char) integer.parseint(charstr, 16); // 16进制parse整形字符串。
            buffer.append(new character(letter).tostring());
            start = end;
        }
        return buffer.tostring();
 
    }
}

2、对表bak1进行增加数据

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
create table `bak1` (
  `vin` varchar(20) not null,
  `p1` double default null,
  `p2` double default null,
  `p3` double default null,
  `p4` double default null,
  `p5` double default null,
  `p6` double default null,
  `p7` double default null,
  `p8` double default null,
  `p9` double default null,
  `p0` double default null
) engine=innodb default charset=utf8mb4
 
show create table bak1;
 
insert into bak1 select '李雷abcv',
  `p1` ,
  `p2` ,
  `p3` ,
  `p4` ,
  `p5` ,
  `p6` ,
  `p7` ,
  `p8` ,
  `p9` ,
  `p0`  from moci limit 10

3、查看输出结果:

MySQL特定表全量、增量数据同步到消息队列-解决方案

到此这篇关于mysql特定表全量、增量数据同步到消息队列-解决方案的文章就介绍到这了,更多相关mysql特定表数据同步内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/lilei2blog/p/15608206.html

延伸 · 阅读

精彩推荐
  • MysqlMySQL 数据备份与还原的示例代码

    MySQL 数据备份与还原的示例代码

    这篇文章主要介绍了MySQL 数据备份与还原的相关知识,本文通过示例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下...

    逆心2972019-06-23
  • MysqlMySQL数据库varchar的限制规则说明

    MySQL数据库varchar的限制规则说明

    本文我们主要介绍了MySQL数据库中varchar的限制规则,并以一个实际的例子对限制规则进行了说明,希望能够对您有所帮助。 ...

    mysql技术网4192019-11-23
  • Mysqlmysql 不能插入中文问题

    mysql 不能插入中文问题

    当向mysql5.5插入中文时,会出现类似错误 ERROR 1366 (HY000): Incorrect string value: '\xD6\xD0\xCE\xC4' for column ...

    MYSQL教程网5722019-11-25
  • Mysql详解MySQL中的分组查询与连接查询语句

    详解MySQL中的分组查询与连接查询语句

    这篇文章主要介绍了MySQL中的分组查询与连接查询语句,同时还介绍了一些统计函数的用法,需要的朋友可以参考下 ...

    GALAXY_ZMY5442020-06-03
  • Mysql解决MySQl查询不区分大小写的方法讲解

    解决MySQl查询不区分大小写的方法讲解

    今天小编就为大家分享一篇关于解决MySQl查询不区分大小写的方法讲解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起...

    Veir_dev5592019-06-25
  • Mysql浅谈mysql 树形结构表设计与优化

    浅谈mysql 树形结构表设计与优化

    在诸多的管理类,办公类等系统中,树形结构展示随处可见,本文主要介绍了mysql 树形结构表设计与优化,具有一定的参考价值,感兴趣的小伙伴们可以参...

    小码农叔叔5242021-11-16
  • MysqlMySQL锁的知识点总结

    MySQL锁的知识点总结

    在本篇文章里小编给大家整理了关于MySQL锁的知识点总结以及实例内容,需要的朋友们学习下。...

    别人放弃我坚持吖4362020-12-14
  • MysqlERROR: Error in Log_event::read_log_event()

    ERROR: Error in Log_event::read_log_event()

    ERROR: Error in Log_event::read_log_event(): read error, data_len: 438, event_type: 2 ...

    MYSQL教程网6412020-03-13