Boot/Cloud使用阿里canal.client组件让数据同步到Redis

蚊子 2023年03月10日 355次浏览

前言

需要的工具请前往:https://www.0po.cn/archives/30

还需要阿里canal.client软件,这里提供1.1.5版本:https://wwba.lanzoum.com/ie2UG0prasbc

作用:数据库修改了某一个东西,立即同步到Redis,如果Redis原来没有这个数据,也可以同步,有一点延迟

所有路径都不能有中文


Mysql

打开Mysql的my.ini,划到最后一行,添加三行代码,注意看注解,添加完要重启Mysql

#阿里组件需要的
log-bin=D:\BtSoft\mysql\MySQL5.7\data   #自己电脑的mysql路径
binlog-format=ROW
server_id=1

image-1678432280315


阿里canal.client软件

打开\conf\example\instance.properties文件,修改成这里Mysql的账号密码
image-1678432729432

准备Lua文件

作用:数据库修改了某一个东西,立即同步到Redis,如果Redis原来没有这个数据,也可以同步,有一点延迟

注意看注释

ngx.header.content_type="application/json;charset=utf8"
--同步
local function close_redis(red)
    if not red then
        return
    end
    -- 释放连接(连接池实现),毫秒
    local pool_max_idle_time = 10000 
    -- 连接池大小
    local pool_size = 100 
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    local log = ngx_log
    if not ok then
        log(ngx_ERR, "set redis keepalive error : ", err)
    end
end

local uri_args = ngx.req.get_uri_args()
local cid = uri_args['cid']

local mysqlModel = require("resty.mysql")
local db = mysqlModel:new()
db:set_timeout(1000)
--数据库配置
local ok = db:connect{
	host="127.0.0.1",
	port=3306,
	database="shop_content",
	user="root",
	password="ok"
}

if not ok then
	ngx.say('链接失败')
	db:close()
	return false;
end 
--查询语句
res = db:query("SELECT * FROM `tb_content` WHERE category_id="..cid)

local cjson = require("cjson")
--ngx.say(cjson.encode(res))

db:close()


local redisModel = require("resty.redis")
local redis = redisModel.new()
redis:set_timeout(1000)
--Redis连接
local ok = redis:connect('127.0.0.1',6379)

if not ok then
	ngx.say('链接redis失败')
	return close_redis(redis)
end 

--切换第一个Redis表
redis:select(0)

redis:set("content:"..cid,cjson.encode(res))


close_redis(redis)
ngx.say("{'flag':'success'}")

Java文件

单线程版

没说要动的,就不要动

结构图
image-1678699369484

依赖

   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client-adapter</artifactId>
        <version>1.1.2</version>
        <type>pom</type>
    </dependency>

代码,57-72行注意看,67行要换成自已的lua

package com.zb.util;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class ClientSample {

    @Autowired
    private RestTemplate restTemplate;

    public void main() {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            //创建连接
            connector.connect();
            //监听mysql所有的库和表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            boolean flag = true;
            while (flag) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                //用户没有更改数据库中的数据
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //获取修改的每一条记录
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
            }
        } finally {
            connector.disconnect();
        }
    }

    public void updateContentSync(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("category_id")) {
                System.out.println("数据同步准备完毕,请开始操作");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String cid = column.getValue();
                String url = "http://localhost:9098/mysave?cid=" + cid;
                String data = restTemplate.getForObject(url, String.class);
                System.out.println(data);
            }
        }
    }

    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            //检查到当前执行的代码是事物操作, 跳转下次
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            //代码固定,获取rowChage对象
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            //rowChage getEventType 获取事件类型对象
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            if (entry.getHeader().getSchemaName().equals("shop_content") && entry.getHeader().getTableName().equals("tb_content")) {
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        //rowData.getBeforeColumnsList()获取删除之前的数据
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        //rowData.getAfterColumnsList()获取添加之后的数据
                        //asyncProccess.updateContentSync(rowData.getBeforeColumnsList());
                    } else {
                        System.out.println("1---");
                        updateContentSync(rowData.getBeforeColumnsList());
                        System.out.println("3---");
                    }
                }
            }

        }
    }


    private void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "  update=" + column.getUpdated());
        }
    }
}

启动类

package com.zb;

import com.zb.util.ClientSample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@EnableAsync //开启异步
public class ContentApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(ContentApplication.class, args);
        ClientSample bean = run.getBean(ClientSample.class);
        bean.main();
    }
    @Bean
    public RestTemplate createRestTemplate() {
        return new RestTemplate();
    }
}

Java多线程版:

没说要动,不要动

结构:
image-1678433402367
新增依赖

    <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client-adapter</artifactId>
            <version>1.1.2</version>
            <type>pom</type>
        </dependency>

application.yml

无需额外新增

AsyncProccess工具类
28行需要改动

package com.zb.util;

import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.util.List;

@Component
public class AsyncProccess {

    @Autowired
    private RestTemplate restTemplate;

    @Async
    public void updateContentSync(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("category_id")) {
                System.out.println("数据同步准备完毕,请开始操作");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String cid = column.getValue();
                String url = "http://localhost:9090/mysave?cid=" + cid;
                String data = restTemplate.getForObject(url, String.class);
                System.out.println(data);
            }
        }
    }
}

ClientSample工具类

package com.zb.util;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class ClientSample {

    @Autowired
    private AsyncProccess asyncProccess;

    public void main() {
        System.out.println("开启同步");
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            //创建连接
            connector.connect();
            //监听mysql所有的库和表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            boolean flag = true;
            while (flag) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                //用户没有更改数据库中的数据
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //获取修改的每一条记录
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
            }
        } finally {
            connector.disconnect();
        }
    }


    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            //检查到当前执行的代码是事物操作, 跳转下次
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            //代码固定,获取rowChage对象
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            //rowChage getEventType 获取事件类型对象
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            if (entry.getHeader().getSchemaName().equals("shop_content") && entry.getHeader().getTableName().equals("tb_content")) {
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        //rowData.getBeforeColumnsList()获取删除之前的数据
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        //rowData.getAfterColumnsList()获取添加之后的数据
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("1---");
                        asyncProccess.updateContentSync(rowData.getBeforeColumnsList());
                        System.out.println("3---");
                    }
                }
            }

        }
    }


    private void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "  update=" + column.getUpdated());
        }
    }
}

启动类

package com.zb;

import com.zb.util.ClientSample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync //开启异步
public class ContentApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(ContentApplication.class, args);
        ClientSample bean = run.getBean(ClientSample.class);
        bean.main();
    }

    @Bean
    public RestTemplate createRestTemplate() {
        return new RestTemplate();
    }


}


恭喜你,一切都已完成