消息中间件小记

1. 消息中间件的价值

传统意义上来说,消息中间件为我们带来了异步的特性,同时解耦了系统。

假设我们做一个登录系统,那么可能需要有短信服务、安全系统…等等,这让系统变得越来越复杂,我们从新考虑可以发现,登录系统只要判断用户名和密码,而其他能力与原系统应该是可解耦的部分,因此通过消息中间件来解耦,我们不用关心有多少系统知道登录这件事,我们现在需要保证仅仅是一个消息发送到消息中间件,当然我们可以这样做:

通过数据库的记录,我们可以保证一条登录消息一定被消费,登录系统负责写,用到这个状态的系统自己来查,杜宇感知状态的应用还需要对数据库轮询,这是一种解决但是比较简单,同时问题也比较多:增加业务数据库负担、依赖复杂且不安全、扩展性不好。

2. 互联网时代的消息中间件

2.1 发送一致性

所谓发送一致性是说产生消息的业务动作与消息发送的一致。如果业务操作成功,那么这个操作产生的消息一定发送,否则消息丢失。

1
2
3
4
5
6
7
8
9
10
void fool() {
// 业务操作
// DB 、调用服务等
// 发送消息
}
void fool2() {
// 发送消息
// 业务操作
// DB 、调用服务等
}

我们可能会有这两种的操作,但是都是不可靠的,这两种方式都不能选择。

在 JMS 中我们一般是这样发送消息的:

1
2
3
ConnectionFactory -> Connection -> Session -> Message
Destination + Session -> MessageProducer
Destination + Session -> MessageConsumer

同时在 JMS 中的发送消息与业务操作之间事务保证是用 XA 系列接口保证的,这增加了复杂性,同时要求业务操作的资源必须支持 XA 协议。

我们需要一种方案对正常流程影响尽可能小:

  1. 业务应用首先把消息发送给消息中间件,标记状态为待处理
  2. 消息中间件存储消息,并不投递该消息;
  3. 消息中间件返回该消息已经入库,结果成功或失败;
  4. 业务应用收到结果处理:
    1. 如果失败,则放弃业务处理,结束;
    2. 如果成功,则进行业务处理。
  5. 业务处理完成,把处理结果发送给消息中间件;
  6. 消息中间件收到业务处理结果,根据结果处理:
    1. 如果业务失败,则删除消息存储的的信息,结束;
    2. 如果消息成功,把消息存储的状态改为可发送,调度进行消息投递。

开始考虑上述 6 步,最后 2 步是可能出现『不一致』的,如果第 5 步出现问题,那么消息中间件不知道怎样处理该消息;如果第 6 步没有更新状态,也没有了下一步操作。

梳理异常情况可以找到以下 3 种状态:

  • 业务操作未进行,消息未存储;
  • 业务操作未进行,消息已存储,状态为待处理;
  • 业务操作已进行,消息已存储,状态为待处理。

这三种状态中第一种不需要额外处理,另外两种需要根据业务操作的结果来处理。

这里有补偿流程:

  1. 由消息中间件主动询问业务应用获取待处理消息对应的业务操作的结果;
  2. 业务应用对业务操作进行结果查询;
  3. 业务应用将业务结果发送到消息中间件(失败、成功、等待);
  4. 消息中间件根据业务应用返回结果更新消息状态。

这里的 4 步操作是为了确认业务处理操作的结果,前面 3 步失败便失败,最后一步如果失败进行重试即可。

将正向和反向流程结合就是解决业务操作与发送消息一致性的方案了。

正向角度来看我们仅仅是增加了一次网络操作与一次更新 DB 的操作,开销并不大,而且还能优化。

一致性方案下的业务代码:

1
2
3
4
5
6
7
8
9
10
11
Result postMessage(Message, PostMessageCallback) {
// 发送消息到消息中间件
// 获取返回结果
if (result == false) {
return FAIL;
}
// 进行业务操作
// 获取业务操作结果
// 发送业务操作结果到消息中间件
// 返回处理结果
}

在开发中处理提供发送一致性的消息外,还应该提供一个传统的发送消息的接口,为了适应其他场景,也会提供独立的接口把流程的控制权交给业务应用自身。

2.2 消息中间件与使用者的强依赖

在一致性的要求下,可能消息中间件会成为业务系统的强依赖,为了解决这个问题有三种思路:

  1. 提供消息中间件系统的可靠性,但是不是 100% 可靠;
  2. 对于消息中间件系统中影响业务操作的部分,使其可靠性与业务自身的可靠性相同;
  3. 提供弱依赖支持。

第一种方案中既然不能 100% 保证,自然不能入选;第二种方案中『相同的可靠性』就是保证业务能成功就需要消息能够入库;结合第二点和第三点可以得出如下架构:

现在消息表、业务表在一块,写入消息作为一个本地事务来完成,然后通知消息中间件有消息可以发送了,这就处理了一致性问题。

虚线表示这是一个不必要的操作、依赖。消息中间件轮询业务数据库,找到需要发送的消息,取出消息发送即可。

这样做有以下三点问题:

  1. 业务数据库承载消息数据;
  2. 消息中间件访问业务数据库;
  3. 业务操作的数据库能支持消息中间件的需求。

我们进行一定的变通,可以让消息中间件的轮询操作也由业务应用完成,让消息中间件不与业务应用直接打交道。

绝大部分问题解决了,但是这样架构需要业务操作是支持事务的数据库操作,具有一定的局限性。

更改思路是:可以将本地磁盘作为存储消息的地方,等待消息回复后再把消息发送给消息中间件,所有投递、重试等管理仍然在消息中间件中进行,本地磁盘作为业务应用上发送消息一定成功的一个保证。

image-20200415161417881

风险是:如果消息中间件不可用,并且本地磁盘坏了,那么消息就彻底丢失了。从业务数据上进行消息补发是最彻底的容灾手段。

将本地磁盘作为消息存储的方式有两种做法:

  1. 作为一致性发送消息的解决方案的容灾手段,平时不用,只有出问题了才切换到这种方式;
  2. 直接使用该方式,可以控制业务操作本身调用发送消息的接口的处理时间,此外也有机会在业务应用与消息中间件之间做一些批处理的工作。

2.3 消息模型对消息接收的影响

JMS 中有 Queue 点对点和 Topic 发布订阅两种模式。

Queue 模式下发送端不确定消息被哪个应用消费,被称为 Peer To Peer PTP 方式。

Topic 模式中,接收消息的应用都能收到所有 Topic 的消息,也被称为 Pub/Sub 方式。

在使用 JMS 时,每个 Connection 都有一个唯一的 ClientId ,用于标记连接的唯一性,我们是默认一个接收应用只用了一个连接,可以看一下多连接的情况:

在 Queue 下应用 3 与 应用 4 是两个不同的应用,应用 3 与 JMS 建立了两个连接,而应用 4 与 JMS 建立了一个连接,每个连接收到的消息条数以及收到消息的顺序则不是固定的。

在 Topic 下,应用 3 与应用 4 会受到所有发送到 Topic 的消息。

现在思考我们需要的消息模型应该是什么样的?它需要满足以下需求:

  • 消息发送方和接收方都是集群;
  • 同一个消息的接收方可能有多个集群进行消息的处理;
  • 不同集群对于同一条消息的处理不能相互干扰。

假设现在有两个集群和八条消息,每个集群两台机器,每个集群的机器应该分别处理所有的消息,不能遗漏或重复。

如果使用 JMS Queue 模型,那么集群 A 和集群 B 收到的消息都不完整。

如果使用 JMS Topic 的情况,每一台机器都可以收到所有消息,这种方式负担太大。

正确的做法应该是:

  • 集群之间使用 Topic 模型;
  • 集群内部使用 Queue 模型;
  • 用 ClusterId 标识不同集群。

如果单纯使用 JMS 的话,需要用上图思路将 Topic 与 Queue 进行级联。

用 JMS 的方式级联工作相对繁重,需要实现中转者,推荐抛弃 JMS 的束缚自己实现。

2.4 消息的订阅方式

消息的订阅分为持久与非持久两种方式,非持久是说如果接收消息者关闭则接收不到消息。

持久消息是即使接收者关闭了,还是会接收到消息(下一次启动时投递),只能选择显示关闭。

image-20200428021935265

因此我们的目标就是实现持久订阅。

2.5 消息可靠性

消息从发送端到接收端,中间有三个阶段需要保证可靠性:

  • 消息发送者发送消息到中间件;
  • 中间件把消息存储;
  • 中间件发送消息到消息接收者。

第一步只需要确定消息发送者与消息中间件之间的返回结果。

2.5.1 存储可靠性

  1. 基于文件的消息存储
  2. 采用数据库进行消息存储
  3. 双机内存的消息存储

在实际开发中应该根据实际业务场景进行选择。比如如果当前的使用场景是:

很高的消息吞吐量,写入速度很快,灵活检索,同时消息本身不大(1.5K),对消息的顺序不敏感。

那么可以选择关系型数据库来进行消息存储(比如参考 ActiveMQ 中的 Kaha Persistence 实现)。因为分布式文件系统的稳定性和性能还有待改善,同时也不支持灵活的检索;NoSQL的迁移、扩容在当前场景不重要。可以自己实现一个单机存储,难点有四个:

  • 完全重写一个单机的存储引擎;
  • 各个场景的测试没有遇到问题不代表没有问题,可靠性挑战大;
  • 由于关注吞吐量而不是消息顺序,会导致原本连续的消息有些不需要了,有些则必须保留,会造成文件的缺失;
    • 可以通过保留的信息顺序写入来弥补缺失,空缺的部分不再理会;
  • 检索处理会考虑索引对内存的消耗,必须考虑索引不能完全加载到内存的情况,涉及内存与磁盘的交换;

可以发现完全自主实现一个单机存储引擎很麻烦,可以采用现有数据库。使用现有数据库后下一步操作就是数据库的设计。对于消息中间件而言,也要考虑到数据冗余、级联查询的问题,当然希望一个消息只用一个单行的数据来解决,对于一个消息而言,可以分成三个部分:

  • 消息的 Header 信息;
    • 如消息 Id 、创建时间、投递次数、优先级、自定义键值对等;
  • 消息的 Body;
    • 消息的具体内容;
  • 消息的投递目标;
    • 指单条消息要投递到的目标集群的 ClusterId ;

消息表示、投递表示可以放到一张表中。

同时需要注意两个问题:

  • 投递列表字段有长度限制,一个变通是将投递列表转换成多个字段;
  • 无法按照单独的接收者来进行消息的调度,整合以后根据 ClusterId 进行调度无法直接做到,无法直接给 ClusterId 建立索引。

合并投递表和消息表后,如果订阅集群挂了,我们再去统一发送消息是比较低效的,此时可以在消息调度外增加一个针对特定集群的调度支持,也就是为需要尽快调度的集群建一个调度表,看上去不太优雅,但是比较好用。

此外还需要考虑到安全问题,单机不考虑;多机数据复制有延迟,如果需要低延迟且异地容灾可以用异步复制;也可以用应用实现双写,但是会让应用变得复杂。这些需要针对具体业务权衡。

在使用文件系统、数据库存储的时候因为磁盘 IO 的原因性能会受到限制。一个改进是采用混合方式进行存储的管理,可以用双机的内存来保证数据的可靠,如图所示正常情况下消息持久化是不工作的,而是基于内存存储信息来提供高吞吐,一旦机器故障,则停止另一台数据的写操作,并把当前数据落库。

正常机器把数据落库保证安全,只要不遇到两台机器都故障消息是安全的。

2.5.2 消息系统扩容

如图为消息中间件与消息存储之间的关系,不同的消息中间件可能会共用存储,而同一个消息中间件机器可能使用不同的存储。

  • 消息中间件自身扩容
    • 软负载动态感知新加入的中间件机器到集群;
    • 同一个存储区分消息的中间件来源,可以给每条数据增加一个 server 标识,新的消息中间件加入会有新的 server 标识;
    • 需要注意如果有消息中间件长期不可用,需要把这些标识的消息给其他机器处理。
  • 消息存储的扩容
    • 不保证消息顺序,push 方式,这两点令存储变得相对简单;
    • 消息发送到中间件,中间件将消息入库,这时中间件知道消息在哪儿,不会出现主动搜索的场景(比如通过消息的 Id 查询),消息的索引信息都会在内存中;
    • 通过 push 绕过了主动查询的动作,使得存储扩容变得容易(不用担心分库分表找某一条会复杂)。

2.5.3 消息投递可靠性

这里有两个优化点:

首先是消息投递一定要采用多线程方式,如果某一个消息订阅者集群有一个很慢的订阅者会有堵塞,也可以把消息返回的处理工作放到另一个线程池中,保证投递的环节不会堵死。

其次在如图场景中,一个应用中可能有多个订阅者订阅了相同的消息,消息中间件会向机器发送多次同样的消息。

  • 单机多订阅者共享连接;
  • 消息只发送一次,然后单机的多订阅者生成多个实例。

2.6 订阅者视角消息的重复产生

有两大类原因会导致产生重复消息。

  • 消息发送端应用的消息重复发送;
    • 消息中间件收到消息并成功存储,但是消息中间件出现问题(应用异常);
    • 中间件收到消息,但是存储异常(超时),导致发送端重试,此时网络恢复,导致消息重复;
    • 总结:消息到达存储因各种原因没收到成功的返回,并且又有重试机制导致消息重复;
    • 处理:重试使用相同的消息 Id 。
  • 消息到达消息存储,由消息中间件进行向外投递时产生重复;
    • 消息成功到达接收者,接收者处理完成出问题(网络异常、处理超时、应用异常),消息中间件不知道处理结果再次投递;
    • 接收者处理完,中间件收到结果,但是此时消息存储异常,导致消息中间件再次向接收者投递;
    • 总结:消息接收者处理完成,消息中间件不能及时更新投递状态;
    • 处理:分布式事务,但是成本高,或者让接收者来处理重复,进行幂等操作。

2.7 消息投递的其他属性支持

  • 消息优先级;
  • 订阅者消息处理顺序与分级订阅;
    • 如果希望消息在消费者中的处理有顺序的,可以用字段定义优先级,也可以分级订阅;
    • 将消息接收者分成一般接收者与优先接收者,做到先后顺序;
  • 自定义属性;
  • 局部顺序;
    • 众多消息中与某件事相关的消息是有序的,而多件事之间是无序的;

2.8 保证顺序的消息队列

在中间件内部,有多个物理上的队列,每一个队列严格按照顺序被接收,而消息中间件内部的队列之间是互不影响的。

具体实现中,每一个接收者在队列上有一个当前消费消息的位置,这个位置是已经完成消费的消息。同一个队列中,不同接收着维护各自的指针,该方式下,就能完成接收端的自主控制。

2.9 Push 与 Pull 方式

push 动作来自服务端,pull 动作来自消费端。因此实时性强的操作应该用 push ,实时性不强的可以选用 pull 方式。