重温中间件

消息中间件

Q1: 为什么使用消息中间件

从项目的业务场景看:

  1. 解耦。分属于两个平台的业务,一个平台需要监听另外一个平台数据的变动,前提是要求解耦
  2. 异步。我们这一侧在监听到变动后有一系列的业务逻辑处理,速率不一样。
  3. 削峰。如上所说,生产和消费速率不一致

缺点如下:

  1. 系统可用性降低,增加了依赖
  2. 系统复杂度提高,需要保证消息不被重复消费,需要处理消息丢失的情况,需要处理重新消费的问题,需要保证消息的顺序性
  3. 一致性问题

Q2: Kafka 和 MetaQ 区别

  • 存储形式

    • Kafka采用 partition,每个 topic 的每个 partition 对应一个文件。顺序写入,定时刷盘。但一旦单个 broker 的 partition 过多,则顺序写将退化为随机写,Page Cache 脏⻚过多,频繁触发缺⻚中断, 性能大幅下降。
    • RocketMQ 采用 CommitLog + ConsumeQueue,单个 broker 所有 topic 在 CommitLog 中顺序写, Page Cache 只需保持最新的⻚面即可。同时每个 topic 下的每个 queue 都有一个对应的 ConsumeQueue 文件作为索引。ConsumeQueue 占用 Page Cache 极少,刷盘影响较小。
  • 存储可靠性

    • RocketMQ支持异步刷盘,同步刷盘,同步Replication,异步Replication。
    • Kafka使用异步刷盘,异步Replication。
  • 顺序消息

    • Kafka和RocketMQ都仅支持单topic分区有序。
    • RocketMQ官方虽宣称支持严格有序,但方式为使用单个分区。
  • 延时消息

    • RocketMQ支持固定延时等级的延时消息,等级可配置。

    • kafka 没有完全支持延时消息,但是有延时操作「延时生产、延时拉取、延时数据删除」,可以自己实现延时消息,下文我有介绍两个方法来实现。

      补充:

      1. 延时拉取:Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给 follower 副本,如果没有延时消费,那么当 leader 副本没有新消息时,follower 副本一直拉取,是很浪费资源的。
      2. 延时生产:在使用生产者客户端发送消息的时候将 acks 参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。这部分就是延时生产操作。
  • 消息过滤

    • RocketMQ执行过滤是在Broker端,支持tag过滤及自定义过滤逻辑。
    • Kafka不支持Broker端的消息过滤,需要在消费端自定义实现。
  • 消息失败重试

    • RocketMQ支持定时重试,每次重试间隔逐渐增加。
    • Kafka不支持重试。
  • 消息投递实时性

    • RocketMQ 使用长轮询。
    • Kafka 使用短轮询。
  • DLQ(dead letter queue) 死信队列

    • RocketMQ通过DLQ来记录所有消费失败的消息。
    • Kafka无DLQ。Spring等第三方工具有实现,方式为将失败消息写入一个专⻔的topic。
  • 回溯消费

    • RocketMQ支持按照时间回溯消费,实现原理与Kafka相同。
    • Kafka需要先根据时间戳找到offset,然后从offset开始消费。
  • 事务

    • RocketMQ支持事务消息,采用二阶段提交+ broker 定时回查。但也只能保证生产者与broker的一 致性,broker与消费者之间只能单向重试。即保证的是最终一致性。
    • Kafka从0.11版本开始支持事务消息,除支持最终一致性外,还实现了消息Exactly Once语义(单个 partition)。
  • 服务发现

    • RocketMQ自己实现了namesrv。
    • Kafka使用ZooKeeper。
  • 高可用

    • RocketMQ在高可用设计上粒度只控制在Broker。其保证高可用是通过master-slave主从复制来解决的。
    • Kafka控制高可用的粒度是放在分区上。每个topic的leader分区和replica分区都可以在所有broker 上负载均衡的存储。

    Kafka的这种设计相比RocketMQ这种主从复制的设计有以下好处:

    Kafka中不需要设置从broker,所有的broker都可以收发消息。负载均衡也做的更好。 Kafka的分区选举是自动做的,RocketMQ需要自己指定主从关系。 Kafka分区的复制份数指定为N,则可以容忍N-1个节点的故障。发生故障只需要分区leader 选举下即可,效率很高。

Q3:如何保证高可用

kafka 通过多副本机制来实现高可用,其中所有的副本的集合称为 AR(Assigned Replicas),AR = ISR(In-Sync Replicas)+ OSR(Out-of-Sync Replicas),其中 ISR 是指与 leader 副本保持一定程度的同步「可忍受的滞后范围」的 follower 副本的集合,当 leader 副本出现问题时,会在 ISR 集合中选举出一个新的 leader 来保证集群可用。

其中,ISR 中的一定程度的同步的数值,是可以设置的,滞后范围是通过 HW(高水位) 和 LEO(日志结束偏移量) 来确定的,HW 即当前 ISR 中已同步的偏移量,客户端只能获取到HW及之前的偏移量的消息,LEO 即当前日志下一条待写入消息的 offset。

最后,值得一提的是,kafka 的主从间的复制机制,并不是完全的同步机制(所有的 follower 同步完 leader 的消息之后才确认为已成功提交,性能很低),也不是完全的异步机制(leader 副本写入就认为已经成功提交,leader 一旦突然宕机就会造成数据丢失),而是使用这种 ISR 的方式,有效的权衡了数据可靠性和性能之间的关系。

Q4:如何保证消息不被重复消费

  • 通过Mysql的唯一键约束,例如新增场景下,可以先查询数据库判断是否有该条数据
  • 加个唯一标识,在内存中记录下来

Q5:如何保证消息的可靠性传输,不丢失

  • 生产消息丢失:解决方案为生产者生产消息可以通过comfirm配置ack=all解决,只有当 到达 broker ,并且所有副本都落库数据,才返回 ack = all
  • Broker同步过程中leader宕机可以通过配置ISR副本+重试解决
  • 消费者丢失可以关闭自动提交offset功能,系统处理完成时提交offset

Q6:如何保证消息的顺序性

  • 单 topic,单partition,单 consumer,单线程消费,吞吐量低,不推荐

  • 如只需保证单 key 有序,为每个key申请单独内存 queue,每个线程分别消费一个内存 queue 即可,这样就能保

    证单key(例如用户id、活动id)顺序性,还是使用 kafka 中对于 分区 partition 是有序的功能。

Q7:如何解决消息堆积问题

解决消费者速率跟不上生产者速率的问题:

  • 如果是poll太慢,可以定义消费者组,一个消费者组的消费者可以并发消费数据
  • 如果是消费者的业务逻辑导致速率过慢,可以学习NIO中的 Reactor模型,一个消费者负责监听消息,另其其他线程去进行 work

Q8:如何解决消息延时消费问题

一般这种情况下都是消费者消费出现了问题导致消息延时,消息堆积严重,遇到这种特殊 case,直接扩容,将topic 下的partition进行扩容,然后使用消费者组进行多线程消费,恢复后,恢复原先部署的架构重新用原先的 consumer 机器来消费消息

如何做到延时:

kafka支持延时操作「延时生产、延时拉取、延时数据删除」,有延时队列 DelayQueue

Q9:开放性问题,自己如果要设计一个消息队列,如何设计

  • 需要支持快速水平扩容,broker+partition,partition放不同的机器上,增加机器时将数据根据topic做迁移,
  • 分布式: 需要考虑一致性、可用性、分区容错性
  • 一致性: 生产者的消息确认、消费者的幂等性、Broker的数据同步
  • 可用性: 数据如何保证不丢不重、数据如何持久化、持久化时如何读写
  • 分区容错: 采用何种选举机制、如何进行多副本同步
  • 海量数据: 如何解决消息积压、海量Topic性能下降,性能上,可以借鉴时间轮、零拷贝、IO多路复用、顺序读写、压缩批处理

Q10:metaQ 是否保证消息不重复

不保证,不支持 “Exactly Only one”特性,原因如下:

  • 发送消息阶段,会存在分布式环境下典型的超时问题,即发送消息阶段不能保证消息不重复
  • 订阅消息阶段,由于涉及集群订阅,多个订阅者需要 Rebalance 方式订阅,在 Rebalance 短暂不一致时,会导致消息重复
  • 订阅者意外宕机,消息进度未及时存储,也会产生消息重复

解决方案:

  • 应用方收到消息后,可通过 Tair、DB去重
  • 应用方可通过主动拉的方式保证消息绝不重复

Dubbo/HSF

Q1:请简要介绍Dubbo的核心概念和组件

Dubbo的核心概念包括提供者(Provider)、消费者(Consumer)、注册中心(Registry)和监控中心(Monitor)。Dubbo的组件包括服务接口(Service Interface)、服务提供者(Service Provider)、服务消费者(Service Consumer)、注册中心(Registry Center)、监控中心(Monitor Center)和远程调用(Remote Invocation)。

Q2:Dubbo支持的负载均衡策略有哪些

Dubbo支持的负载均衡策略包括随机(Random)、轮询(Round Robin)、最少活跃调用数(Least Active)、一致性哈希(Consistent Hashing)和权重(Weighted)。每种策略的特点如下:

  • 随机:随机选择一个可用的服务提供者进行调用。
  • 轮询:按照顺序依次选择可用的服务提供者进行调用。
  • 最少活跃调用数:选择活跃调用数最小的服务提供者进行调用,适用于并发较高的场景。
  • 一致性哈希:根据请求的键值进行哈希计算,选择哈希值最接近的服务提供者进行调用,适用于需要保持稳定调用关系的场景。
  • 权重:根据服务提供者的权重值进行选择,权重越高的服务提供者被选择的概率越大。

Q3:Dubbo的服务治理包括哪些方面?请简要说明每个方面的作用

  • 服务注册与发现:通过注册中心实现服务的注册与发现,使得服务消费者能够动态获取可用的服务列表。
  • 负载均衡:通过负载均衡策略选择合适的服务提供者,平衡请求的分布,提高系统的稳定性和性能。
  • 集群容错:通过集群容错机制处理服务提供者的故障,保证系统的可靠性和可用性。
  • 服务路由:根据路由规则将请求路由到指定的服务提供者,实现灰度发布、A/B测试等功能。
  • 服务降级:在高并发或故障情况下,根据配置降低服务的负载或返回默认值,保护核心业务的可用性。
  • 服务限流:对服务进行流量控制,防止恶意请求或异常情况导致系统过载。
  • 监控与统计:对服务的调用进行监控和统计,包括调用次数、响应时间等指标,帮助分析和优化系统性能。

Q4:Dubbo的服务注册与发现机制是如何实现的?

Dubbo的服务注册与发现机制是通过注册中心来实现的。在Dubbo中,服务提供者在启动时会将自己提供的服务信息注册到注册中心,包括服务接口、IP地址、端口等。而服务消费者在调用服务时会向注册中心查询可用的服务列表,然后根据负载均衡策略选择一个服务提供者进行调用。注册中心负责管理服务提供者的注册信息,维护可用服务列表,并提供相应的查询接口供服务消费者使用。

Dubbo支持多种注册中心,如ZooKeeper、Consul、Etcd等。这些注册中心提供了高可用、分布式的服务注册与发现能力,可以在集群环境中保证服务的可用性和弹性。

Q5:Dubbo的序列化支持哪些方式?如何选择适合的序列化方式?

Dubbo支持多种序列化方式,包括Hessian、Java原生序列化、JSON、Protobuf等。不同的序列化方式具有不同的特点和适用场景。

  • Hessian:Hessian是一种基于二进制的高性能序列化方式,支持跨语言调用。它在性能和序列化体积方面都有较好的表现,适合在Java环境中使用。
  • Java原生序列化:Java原生序列化是Java标准库提供的序列化方式。它简单易用,但在性能和序列化体积方面不如其他序列化方式高效,通常不推荐在生产环境中使用。
  • JSON:JSON是一种文本格式的序列化方式,具有良好的可读性和跨语言支持。它适用于与非Java语言进行通信或在Web环境中使用。
  • Protobuf:Protobuf是Google开发的一种高效的二进制序列化方式。它具有很高的性能和较小的序列化体积,适合在高性能、大规模数据传输的场景中使用。

选择适合的序列化方式需要考虑性能、序列化体积、跨语言支持和可读性等因素,并根据具体的业务需求和环境进行评估和选择。

衡量一个序列化框架,主要有以下几个指标:

  • 性能。包括时间开销方面的性能和空间开销方面的性能。时间开销,是指序列化反序列化解析的时间,空间开销,则是指相同的对象在序列化后所占的字节数。
  • 通用性。是否支持跨平台,跨语言。
  • 可扩展性、兼容性。如果序列化协议具有良好的可扩展性,支持自动增加新的业务字段,而不影响老的服务,这将大大提供系统的灵活度。

序列化框架总结

Q6:Dubbo的集群容错机制是如何处理服务提供者的故障

Dubbo提供了多种集群容错机制来处理服务提供者的故障,确保系统的可靠性和可用性。

  • 失败自动切换(Failover):当调用失败时,Dubbo自动切换到其他可用的服务提供者进行重试,直到调用成功或达到最大重试次数。适用于对服务的可用性要求较高的关键业务场景。
  • 快速失败(Failfast):快速失败机制在发生错误时立即返回失败结果,不进行重试。适用于对响应时间敏感的非关键业务场景,如页面渲染等。
  • 失败安全(Failsafe):失败安全机制会忽略调用的错误,直接返回默认值,保证调用的安全性和稳定性。适用于监控、统计等对结果实时性要求不高的场景。
  • 失败定位(Failback):失败定位机制会记录失败的请求,并在后台定时重发,尝试恢复失败的调用。适用于需要保证调用可靠性的场景。
  • 广播(Broadcast):广播机制会将请求发送给所有提供者并收集结果,通常用于通知所有提供者更新缓存等场景。

Q7:dubbo 工作原理

  • 第一层:service 层,接口层,给服务提供者和消费者来实现的
  • 第二层:config 层,配置层,主要是对 dubbo 进行各种配置的
  • 第三层:proxy 层,服务代理层,无论是 consumer 还是 provider,dubbo 都会给你生成代理,代理之间进行网络通信
  • 第四层:registry 层,服务注册层,负责服务的注册与发现
  • 第五层:cluster 层,集群层,封装多个服务提供者的路由以及负载均衡,将多个实例组合成一个服务
  • 第六层:monitor 层,监控层,对 rpc 接口的调用次数和调用时间进行监控
  • 第七层:protocal 层,远程调用层,封装 rpc 调用
  • 第八层:exchange 层,信息交换层,封装请求响应模式,同步转异步
  • 第九层:transport 层,网络传输层,抽象 mina 和 netty 为统一接口
  • 第十层:serialize 层,数据序列化层

工作流程

  • 第一步:provider 向注册中心去注册
  • 第二步:consumer 从注册中心订阅服务,注册中心会通知 consumer 注册好的服务
  • 第三步:consumer 调用 provider
  • 第四步:consumer 和 provider 都异步通知监控中心

Q8:分布式服务接口的幂等性如何设计(比如不能重复扣款)?

其实保证幂等性主要是三点:

  • 对于每个请求必须有一个唯一的标识,举个栗子:订单支付请求,肯定得包含订单 id,一个订单 id 最多支付一次,对吧。
  • 每次处理完请求之后,必须有一个记录标识这个请求处理过了。常见的方案是在 mysql 中记录个状态啥的,比如支付之前记录一条这个订单的支付流水。
  • 每次接收请求需要进行判断,判断之前是否处理过。比如说,如果有一个订单已经支付了,就已经有了一条支付流水,那么如果重复发送这个请求,则此时先插入支付流水,orderId 已经存在了,唯一键约束生效,报错插入不进去的。然后你就不用再扣款了。

Q9:分布式服务接口请求的顺序性如何保证?

首先,一般来说,个人建议是,你们从业务逻辑上设计的这个系统最好是不需要这种顺序性的保证,因为一旦引入顺序性保障,比如使用分布式锁,会导致系统复杂度上升,而且会带来效率低下,热点数据压力过大等问题。

下面我给个我们用过的方案吧,简单来说,首先你得用 dubbo 的一致性 hash 负载均衡策略,将比如某一个订单 id 对应的请求都给分发到某个机器上去,接着就是在那个机器上,因为可能还是多线程并发执行的,你可能得立即将某个订单 id 对应的请求扔一个内存队列里去,强制排队,这样来确保他们的顺序性。

Q10:如何设计一个类似 Dubbo 的 RPC 框架

  1. 通信协议:选择合适的通信协议用于服务提供者和服务消费者之间的通信。常见的选择包括基于TCP的自定义协议、HTTP协议或WebSocket等。协议的选择应考虑性能、可靠性、跨语言支持等因素。
  2. 服务注册与发现:设计一个注册中心用于服务提供者注册服务信息,并提供服务发现功能供服务消费者查询可用的服务列表。可以考虑使用类似于ZooKeeper、Etcd或Consul等的分布式协调服务作为注册中心。
  3. 序列化与反序列化:实现对象的序列化与反序列化,将方法调用的请求和响应进行编码和解码。常用的序列化方式有JSON、XML、Protobuf等,选择合适的序列化方式可以考虑性能、序列化体积、跨语言支持等因素。
  4. 负载均衡:实现负载均衡策略,选择合适的服务提供者进行请求的路由和分发。常见的负载均衡策略包括随机、轮询、最少活跃调用数等。
  5. 容错机制:设计容错机制来处理服务提供者的故障和异常情况。常见的容错策略有失败自动切换、快速失败、失败安全、失败定位等。
  6. 线程池和并发控制:管理请求的线程池,控制并发请求的数量,避免系统过载和资源浪费。
  7. 监控与统计:收集和统计服务的调用次数、响应时间等指标,提供可视化的监控界面和报告。
  8. 安全机制:考虑服务的安全性,包括身份验证、权限控制和数据加密等。
  9. 扩展机制:提供可扩展的插件机制,允许用户根据自己的需求扩展框架的功能。
Thank you for your accept. mua!
-------------本文结束感谢您的阅读-------------