慢速消费者

为支持弹性和高可用性,NATS 提供了内置机制,可自动修剪用于跟踪订阅者的注册监听器兴趣图,包括慢速消费者和懒惰监听者。NATS 会自动处理慢速消费者。如果客户端处理消息不够快,NATS 服务器会将其切断。为支持扩展,NATS 提供了客户端连接的自动修剪功能。如果订阅者在 ping-pong 间隔 内未响应服务器的 ping 请求,客户端将被切断(断开连接)。客户端需要具备重连逻辑,以便重新连上服务器。

在 Core NATS 中,无法跟上速度(can't keep up)的消费者的处理方式与许多其他消息系统不同:NATS 倾向于采取保护系统整体的方法,而非迁就特定消费者以确保消息传递。

什么是慢速消费者?

慢速消费者是指无法跟上 NATS 服务器发送的消息流的订阅者。在分布式系统中,这是常见情况,因为生成数据通常比处理数据更容易。当消费者无法足够快地处理数据时,会对系统其余部分产生背压。NATS 提供了减少这种背压的机制。

NATS 可在客户端或服务器中识别慢速消费者,并通过注册的回调、日志消息和服务器监控端点中的统计信息提供通知。

慢速消费者会怎样?

在客户端检测到时,应用程序会收到通知,并且消息会被丢弃,以便让消费者继续并减少潜在的背压。在服务器检测到时,服务器将断开与慢速消费者的连接,以保护自身和消息系统的完整性。

客户端识别的慢速消费者

客户端可以检测到自己是否成为慢速消费者,并通过异步错误回调通知应用程序。最好在客户端本地捕获慢速消费者,而不是让服务器检测到这种情况。以下示例演示如何定义和注册一个异步错误处理程序来处理慢速消费者错误。

func natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) {
    fmt.Printf("error: %v\n", natsErr)
    if natsErr == nats.ErrSlowConsumer {
        pendingMsgs, _, err := sub.Pending()
        if err != nil {
            fmt.Printf("couldn't get pending messages: %v", err)
            return
        }
        fmt.Printf("Falling behind with %d pending messages on subject %q.\n",
            pendingMsgs, sub.Subject)
        // 记录错误,通知运维人员...
    }
    // 检查其他错误
}

// 在创建连接时设置错误处理程序。
nc, err := nats.Connect("nats://localhost:4222",
  nats.ErrorHandler(natsErrHandler))

使用此示例代码和默认设置,慢速消费者错误将生成类似以下输出:

error: nats: slow consumer, messages dropped Falling behind with 65536 pending messages on subject "foo".

请注意,如果您使用同步订阅者,Subscription.NextMsg(timeout time.Duration) 也会返回一个错误,指示存在慢速消费者并且消息已被丢弃。

服务器识别的慢速消费者

当客户端处理消息不够快时,服务器会将消息缓冲到与客户端的出站连接中。当这种情况发生且服务器无法足够快地向客户端写入数据时,为了保护自身,服务器会将订阅者指定为"慢速消费者",并可能丢弃相关连接。

当服务器发起慢速消费者错误时,您将在服务器输出中看到以下内容:

[54083] 2017/09/28 14:45:18.001357 [INF] ::1:63283 - cid:7 - Slow Consumer Detected

服务器还会记录遇到的慢速消费者错误的数量,可通过监控 varz 端点中的 slow_consumers 字段获取。

解决慢速消费者问题

除了使用 JetStream 或优化您的消费应用程序外,还有几种选择:扩展、计量或根据环境调整 NATS。

使用队列订阅进行扩展

如果您不依赖消息顺序,这是理想的选择。确保您的 NATS 订阅属于一个 队列组,然后根据需要创建更多服务或应用程序实例来进行扩展。这对于微服务来说是一个很好的方法——您的微服务的每个实例将接收一部分消息进行处理,只需添加更多服务实例即可扩展。无需代码更改、配置更改或任何停机时间。

创建可扩展的主题命名空间

通过事先设计,您可以通过主题命名空间进一步分配工作。如果您需要保持消息顺序,这种方法很有用。一般思想是发布到深层主题命名空间,并使用通配符订阅进行消费,同时为将来的扩展和分配工作留出空间。

举一个简单的例子,如果您有一个服务接收来自遍布城市的物联网设备的遥测数据,您可以发布到像 Sensors.NorthSensors.SouthSensors.EastSensors.West 这样的主题命名空间。最初,您将订阅 Sensors.> 在一个消费者中处理所有内容。随着企业的发展和数据流速率超过一个消费者所能处理的范围,您可以用四个消费应用程序替换单个消费者,每个应用程序订阅代表数据较小部分的主题。请注意,您的发布应用程序保持不变。

计量发布者

一个不太理想的选择可能是计量发布者。有几种方法可以做到这一点,从简单地减慢发布者速度到更复杂的方法,如定期发出阻塞请求-回复以匹配订阅者速率。

通过配置调整 NATS

NATS 服务器可以调整以确定在消费者被视为慢速之前可以缓冲多少数据,并且一些官方支持的客户端允许调整缓冲区大小。减小缓冲区大小将使您更快地识别慢速消费者。增加缓冲区大小通常不推荐,除非您正在处理临时数据突发。通常,增加缓冲区容量只会 推迟 慢速消费者问题。

服务器配置

NATS 服务器有一个写入截止时间,用于写入连接。当超过此写入截止时间时,客户端被视为慢速消费者。如果您在服务器中遇到慢速消费者错误,可以增加写入截止时间来缓冲更多数据。

NATS 服务器配置文件中的 write_deadline 配置选项可以调整此设置:

write_deadline: 2s

当您有数据突发需要容纳时,调整此参数是理想的。请确保您不仅仅是在推迟慢速消费者错误。

客户端配置

大多数官方支持的客户端都有一个待处理消息的内部缓冲区,如果本地订阅跟不上,将通过异步错误回调通知您的应用程序。在本地收到错误并不一定意味着服务器已将订阅识别为慢速消费者。

此缓冲区可以通过在创建订阅后设置待处理限制来配置:

if err := sub.SetPendingLimits(1024*500, 1024*5000); err != nil {
  log.Fatalf("Unable to set pending limits: %v", err)
}

默认的订阅者待处理消息限制是 65536,默认的订阅者待处理字节限制是 65536*1024

如果客户端达到此内部限制,它将丢弃消息并继续处理新消息。这与 NATS 的至多一次传递保证保持一致。您的应用程序需要检测丢失的消息并从此状态恢复。