Java消息队列任务的平滑关闭

摘要

 

对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?

 

1.问题背景

 

对于消息队列的监听,我们一般使用Java写一个独立的程序,在Linux服务器上运行。程序启动后,通过消息队列客户端接收消息,放入一个线程池进行异步处理,并发的快速处理。

 

那么问题来了,当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?

 

正常来说,订阅者程序关闭后,消息会在发送者队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。唯一可能丢失的消息,就是在关闭的一瞬间,已经从队列中取出但还没有处理完毕的消息。

 

因此我们需要一套平滑关闭的机制,保证在重启的时候,消息可以正常处理完成。

 

2.问题分析

 

平滑关闭的思路如下:

 

1.在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中

 

2.关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)

 

3.程序退出

 

关闭消息订阅:一般消息队列的客户端都提供关闭连接的方法,具体可以自行查看api

 

关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated(),方法判断线程池是否已经关闭。

 

那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?

 

在Linux中,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l查看kill 命令的其它信号量,比如使用 12) SIGUSR2 信号量

我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。

 

伪代码如下

 

下面通过一个demo模拟相关逻辑操作

 

首先模拟一个生产者,每秒生产5个消息

 

然后模拟一个订阅者,收到消息后交给线程池进行处理,线程池固定4个线程,每个消息处理时间1秒,这样线程池每秒会积压1个消息。(详细代码请点击“阅读原文”)


 

当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数

另打开一个终端,通过ps命令查看进程号,或者通过nohup启动Java进程拿到进程id

当我们执行kill -12 pid的时候 可以看到关闭业务逻辑

 

3.问题总结

 

在部门的实际业务中,消息队列的消息量还是挺大的,某些业务高峰时每秒有几百的消息量,因此对消息的处理要保证速度,避免消息积压,也可以通过负载解决单个订阅节点的压力。

 

在某些业务场景中,对消息的完整性要求不那么高,那么就不用考虑重启时的一点损耗。反之,就需要好好思考和设计了。

 

 

  1. da shang
    donate-alipay
               donate-weixin weixinpay

发表评论↓↓