缓存与数据库一致性问题如何解决

最近不是正好在研究 canal 嘛,刚巧前两天看了一篇关于解决缓存与数据库一致性问题的文章,里边提到了一种解决方案是结合 canal 来操作的,所以阿q就想趁热打铁,手动来实现一下。
架构文中提到的思想是:
采用先更新数据库,后删除缓存的方式来解决并发引发的一致性问题;采用异步重试的方式来保证“更新数据库、删除缓存”这两步都能执行成功;可以采用订阅变更日志的方式来清除 redis 中的缓存;基于这种思想,阿q脑海中搭建了以下架构
app 从 redis 中查询信息,将数据的更新写入 mysql 数据库中;canal 向 mysql 发送 dump 协议,接收 binlog 推送的数据;canal 将接收到的数据投递给 mq 消息队列;mq 消息队列消费消息,同时删除 redis 中对应数据的缓存;环境准备这篇文章中有 mysql 的安装教程:mysql 安装
这篇文章中有 canal 的安装教程以及对 mysql 的相关配置:canal安装
考虑到我们服务器之前安装过 rabbitmq ,所以我们就用 rabbitmq 来充当消息队列吧。
canal 配置修改 conf/canal.properties 配置
# 指定模式canal.servermode = rabbitmq# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2canal.destinations = example # rabbitmq 服务端 iprabbitmq.host = 127.0.0.1# rabbitmq 虚拟主机 rabbitmq.virtual.host = / # rabbitmq 交换机 rabbitmq.exchange = xxx# rabbitmq 用户名rabbitmq.username = xxx# rabbitmq 密码rabbitmq.password = xxxrabbitmq.deliverymode =修改实例配置文件 conf/example/instance.properties
#配置 slaveid,自定义,不等于 mysql 的 server id 即可canal.instance.mysql.slaveid=10 # 数据库地址:配置自己的ip和端口canal.instance.master.address=ip:port # 数据库用户名和密码 canal.instance.dbusername=xxx canal.instance.dbpassword=xxx # 指定库和表canal.instance.filter.regex=.*\\\\..* // 这里的 .* 表示 canal.instance.master.address 下面的所有数据库 # mq config# rabbitmq 的 routing keycanal.mq.topic=xxx然后重启 canal 服务。
数据库建表语句
create table `product_info` ( `id` bigint(20) not null auto_increment, `name` varchar(255) default null, `price` decimal(10,4) default null, `create_date` datetime default null, `update_date` datetime default null, primary key (`id`)) engine=innodb auto_increment=1 default charset=utf8数据初始化
insert into cheetah.product_info(id, name, price, create_date, update_date)values(1, '从你的全世界路过', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39');insert into cheetah.product_info(id, name, price, create_date, update_date)values(2, '乔布斯传', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42');insert into cheetah.product_info(id, name, price, create_date, update_date)values(3, 'java开发', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');实战项目引入的依赖比较多,为了不占用过多的篇幅,大家可以在公众号【阿q说代码】后台回复“canal”获取项目源码!
mysql 和 redis 的相关配置在此不再赘述,有不懂的可以私聊阿q:qingqing-4132;
rabbitmq 配置@configurationpublic class rabbitmqconfig { public static final string canal_queue = canal_queue;//队列 public static final string direct_exchange = canal;//交换机,要与canal中配置的相同 public static final string routing_key = routingkey;//routing-key,要与canal中配置的相同 /** * 定义队列 **/ @bean public queue canalqueue(){ return new queue(canal_queue,true); } /** * 定义直连交换机 **/ @bean public directexchange directexchange(){ return new directexchange(direct_exchange); } /** * 队列和交换机绑定 **/ @bean public binding orderbinding() { return bindingbuilder.bind(canalqueue()).to(directexchange()).with(routing_key); }}商品信息入缓存/** * 获取商品信息: * 先从缓存中查,如果不存在再去数据库中查,然后将数据保存到缓存中 * @param productinfoid * @return */@overridepublic productinfo findproductinfo(long productinfoid) { //1.从缓存中获取商品信息 object object = redistemplate.opsforvalue().get(redis_product_key + productinfoid); if(objectutil.isnotempty(object)){ return (productinfo)object; } //2.如果缓存中不存在,从数据库获取信息 productinfo productinfo = this.basemapper.selectbyid(productinfoid); if(productinfo != null){ //3.将商品信息缓存 redistemplate.opsforvalue().set(redis_product_key+productinfoid, productinfo, redis_product_key_expire, timeunit.seconds); return productinfo; } return null;}执行方法后,查看 redis 客户端是否有数据存入
更新数据入mq/** * 更新商品信息 * @param productinfo * @return */@postmapping(/update)public ajaxresult update(@requestbody productinfo productinfo){ productinfoservice.updatebyid(productinfo); return ajaxresult.success();}当我执行完 update 方法的时候,去rabbitmq management 查看,发现并没有消息进入队列。
问题描述通过排查之后我在服务器中 canal 下的 /usr/local/logs/example/example.log文件里发现了问题所在。
原因就是meta.dat中保存的位点信息和数据库的位点信息不一致导致 canal 抓取不到数据库的动作。
于是我找到 canal 的 conf/example/instance.properties 实例配置文件,发现没有将canal.instance.master.address=127.0.0.1:3306设置成自己的数据库地址。
解决方案先停止 canal 服务的运行;删除meta.dat文件;再重启 canal,问题解决;再次执行 update 方法,会发现 rabbitmq management中已经有我们想要的数据了。
mq接收数据编写 rabbitmq 消费代码的逻辑
@rabbitlistener(queues = canal_queue)//监听队列名称public void getmsg(message message, channel channel, string msg) throws ioexception { long deliverytag = message.getmessageproperties().getdeliverytag(); try { log.info(消费的队列消息来自: + message.getmessageproperties().getconsumerqueue()); //删除reids中对应的key productinfodetail productinfodetail = json.parseobject(msg, productinfodetail.class); log.info(库名:+ productinfodetail.getdatabase()); log.info(表名: + productinfodetail.gettable()); if(productinfodetail!=null && productinfodetail.getdata()!=null){ list当我们再次调用 update接口时,控制台会打印以下信息
从图中打印的信息可以看出就是我们的库和表以及消息队列,redis 客户端中缓存的信息也被删除了。
拓展看到这,你肯定会问:rabbitmq 是阅后即焚的机制,它确认消息被消费者消费后会立刻删除,如果此时我们的业务还没有跑完,没来的及删除 redis 中的缓存就宕机了,岂不是缓存一直都得不到更新了吗?
首先我们要明确的是 rabbitmq 是通过消费者回执来确认消费者是否成功处理消息的,即消费者获取消息后,应该向 rabbitmq 发送 ack 回执,表明自己已经处理消息了。
为了不让上述问题出现,消费者返回 ack 回执的时机就显得非常重要了, 而 springamqp 也为我们提供了三种可选的确认模式:
manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack;auto:自动 ack ,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack,抛出异常则返回 nack;none:关闭 ack,mq 假定消费者获取消息后会成功处理,因此消息投递后立即被删除;由此可知在 none 模式下消息投递最不可靠,可能会丢失消息;在默认的 auto 模式下如果出现服务器宕机的情况也是会丢失消息的,本次实战中,阿q为了防止消息丢失采用的是 manual 这种模式,配置信息如下:
spring: rabbitmq: listener: simple: acknowledge-mode: manual #开启手动确认所以在代码中也就出现了
//用于肯定确认channel.basicack(deliverytag, true);//用于否定确认channel.basicreject(deliverytag ,true);当然此种模式虽然不会丢失消息,但是会导致效率变低。

电工最实用的经验公式
华为荣耀9、努比亚Z17、小米6、OPPOR11都是颜值担当你会怎么选?华为荣耀9、努比亚Z17、小米6、OPPOR11区别对比评
机电伺服系统一般由哪些环节组成_机电伺服系统的主要特点
低功耗蓝牙模块常见天线输出方式及选型建议
诺基亚新一代产品X7测评,AI智能场景侦测功能加持,拍暗叫绝
缓存与数据库一致性问题如何解决
一文看懂码灵半导体CFW32C7UL系列MPU产品应用(八):高端扫码设备应用(下)
小米无人机发布了,性价比如何?
针对单片机串行通信技术全解析
Switch便携底座氮化镓GaN充电器配件扩展坞拆解
沪电全力备战 5G建网,并规划年底完成黄石二厂建设
如何完成Mysql主从复制的在线配置
台湾5G频谱最终拍卖总价约合328.61亿人民币 创下全球单一频宽最高价
利用IBIS模型完善信号完整性计算和PCB设计
智能电动机保护器自动检测系统设计
美国一项顶尖新技术打破芯片体积物理极限
诚迈科技已成为国产OS希望 今年股价较去年大涨20多倍
一家味精加工厂就能卡住了全世界芯片企业?
小米6什么时候上市?小米6最新消息:小米6和小米mix2将会在4月19同时亮相
激光雷达第一股禾赛科技拟科创板上市募资20亿