那么问题来了,当我们修改程序后,需要重新启动任务的时候,如何保证消息的不丢失呢?
正常来说,订阅者程序关闭后,消息会在发送者队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。唯一可能丢失的消息,就是在关闭的一瞬间,已经从队列中取出但还没有处理完毕的消息。
因此我们需要一套平滑关闭的机制,保证在重启的时候,消息可以正常处理完成。
1.在关闭程序时,首先关闭消息订阅,这个时候消息都在发送者队列中
2.关闭本地消息处理线程池(等待本地线程池中的消息处理完毕)
3.程序退出
关闭线程池:Java的ThreadPoolExecutor线程池提供shutdown()和shutdownNow()两个方法,区别是前者会等待线程池中的消息都处理完毕,后者直接停止线程的执行并返回list集合。因为我们需要使用shutdown()方法进行关闭,并通过isTerminated(),方法判断线程池是否已经关闭。
那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?
在Linux中,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l查看kill 命令的其它信号量,比如使用 12) SIGUSR2 信号量
我们可以在Java程序启动时,注册对应的信号量,对信号量进行监听,在收到对应的kill操作时,执行相关的业务操作。
首先模拟一个生产者,每秒生产5个消息
然后模拟一个订阅者,收到消息后交给线程池进行处理,线程池固定4个线程,每个消息处理时间1秒,这样线程池每秒会积压1个消息。(详细代码请点击“阅读原文”)
在某些业务场景中,对消息的完整性要求不那么高,那么就不用考虑重启时的一点损耗。反之,就需要好好思考和设计了。