虽然呢连接状态本身很有趣,但在连接状态发生变化时及时得知可能更有意义。大多数(如果不是全部的话)NATS客户端库都提供了监听与连接及其状态相关的事件的方法。
这些监听器的实际API取决于语言,以下示例展示了几个更常见的用例。有关更具体的说明,请参阅您正在使用的客户端库的API文档。
连接事件可能包括连接被关闭、断开或重新连接。重新连接涉及断开、连接,但根据库的实现方式,也可能包括服务器尝试找到服务器时的多次断开,或者服务器重启时的多次断开。
{% tabs %} {% tab title="Go" %}
// 在NATS Go客户端中没有单一的连接事件监听器。
// 相反,您可以使用以下方法给每个事件单独设置处理程序:
nc, err := nats.Connect("demo.nats.io",
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
log.Printf("client disconnected: %v", err)
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
log.Printf("client reconnected")
}),
nats.ClosedHandler(func(_ *nats.Conn) {
log.Printf("client closed")
}))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
DisconnectHandler(cb ConnHandler)
ReconnectHandler(cb ConnHandler)
ClosedHandler(cb ConnHandler)
DiscoveredServersHandler(cb ConnHandler)
ErrorHandler(cb ErrHandler){% endtab %}
{% tab title="Java" %}
class MyConnectionListener implements ConnectionListener {
public void connectionEvent(Connection natsConnection, Events event) {
System.out.println("Connection event - " + event);
}
}
public class SetConnectionListener {
public static void main(String[] args) {
try {
Options options = new Options.Builder()
.server("nats://demo.nats.io:4222")
.connectionListener(new MyConnectionListener()) // 设置监听器
.build();
Connection nc = Nats.connect(options);
// 用连接做点事情
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}{% endtab %}
{% tab title="JavaScript" %}
const nc = await connect({ servers: ["demo.nats.io"] });
nc.closed().then(() => {
t.log("the connection closed!");
});
(async () => {
for await (const s of nc.status()) {
switch (s.type) {
case Events.Disconnect:
t.log(`客户端断开了连接 - ${s.data}`);
break;
case Events.LDM:
t.log("客户端被要求重连");
break;
case Events.Update:
t.log(`客户端收到了集群更新 - ${s.data}`);
break;
case Events.Reconnect:
t.log(`客户端重新连接成功 - ${s.data}`);
break;
case Events.Error:
t.log("客户端遇到了权限错误");
break;
case DebugEvents.Reconnecting:
t.log("客户端正在尝试重新连接");
break;
case DebugEvents.StaleConnection:
t.log("客户端有一个过时的连接");
break;
default:
t.log(`遇到了未知状态 ${s.type}`);
}
}
})().then();{% endtab %}
{% tab title="Python" %}
# Asyncio NATS客户端可以定义多个事件回调
async def disconnected_cb():
print("Got disconnected!")
async def reconnected_cb():
# 查看重新连接后连接到哪个服务器
print("已重连到 {url}".format(url=nc.connected_url.netloc))
async def error_cb(e):
print("发生了一个错误:{}".format(e))
async def closed_cb():
print("连接已关闭")
# 设置断开连接和重新连接时的回调
options["disconnected_cb"] = disconnected_cb
options["reconnected_cb"] = reconnected_cb
# 设置发生错误或连接关闭时的回调
options["error_cb"] = error_cb
options["closed_cb"] = closed_cb
await nc.connect(**options){% endtab %}
{% tab title="C#" %}
// dotnet add package NATS.Net
using NATS.Net;
await using var client = new NatsClient();
client.Connection.ConnectionDisconnected += async (sender, args) =>
{
Console.WriteLine($"Disconnected: {args.Message}");
};
client.Connection.ConnectionOpened += async (sender, args) =>
{
Console.WriteLine($"Connected: {args.Message}");
};
client.Connection.ReconnectFailed += async (sender, args) =>
{
Console.WriteLine($"Reconnect Failed: {args.Message}");
};
await client.ConnectAsync();{% endtab %}
{% tab title="Ruby" %}
# 在Ruby NATS客户端中没有单一的连接事件监听器。
# 相比之下,您可以使用以下方法设置单独的事件处理程序:
NATS.on_disconnect do
end
NATS.on_reconnect do
end
NATS.on_close do
end
NATS.on_error do
end{% endtab %}
{% tab title="C" %}
static void
disconnectedCB(natsConnection *conn, void *closure)
{
// Do something
printf("Connection disconnected\n");
}
static void
reconnectedCB(natsConnection *conn, void *closure)
{
// Do something
printf("Connection reconnected\n");
}
static void
closedCB(natsConnection *conn, void *closure)
{
// Do something
printf("Connection closed\n");
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetDisconnectedCB(opts, disconnectedCB, NULL);
if (s == NATS_OK)
s = natsOptions_SetReconnectedCB(opts, reconnectedCB, NULL);
if (s == NATS_OK)
s = natsOptions_SetClosedCB(opts, closedCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// 销毁已创建的对象
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);{% endtab %} {% endtabs %}
当使用集群时,可能有服务器添加或更改。某些客户端允许您监听此变化。 {% tabs %} {% tab title="Go" %}
// 当有新服务器加入集群时收到通知。
// 打印所有已知服务器、仅发现的服务器。
nc, err := nats.Connect("demo.nats.io",
nats.DiscoveredServersHandler(func(nc *nats.Conn) {
log.Printf("Known servers: %v\n", nc.Servers())
log.Printf("Discovered servers: %v\n", nc.DiscoveredServers())
}))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// 用连接做点事情{% endtab %}
{% tab title="Java" %}
class ServersAddedListener implements ConnectionListener {
public void connectionEvent(Connection nc, Events event) {
if (event == Events.DISCOVERED_SERVERS) {
for (String server : nc.getServers()) {
System.out.println("Known server: "+server);
}
}
}
}
public class ListenForNewServers {
public static void main(String[] args) {
try {
Options options = new Options.Builder().
server("nats://demo.nats.io:4222").
connectionListener(new ServersAddedListener()). // 设置监听器
build();
Connection nc = Nats.connect(options);
// 用连接做点事情
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}{% endtab %}
{% tab title="JavaScript" %}
const nc = await connect({ servers: ["demo.nats.io:4222"] });
(async () => {
for await (const s of nc.status()) {
switch (s.type) {
case Status.Update:
t.log(`servers added - ${s.data.added}`);
t.log(`servers deleted - ${s.data.deleted}`);
break;
default:
}
}
})().then();{% endtab %}
{% tab title="Python" %}
# Asyncio NATS 客户端尚未支持监听新服务器事件{% endtab %}
{% tab title="C#" %}
// NATS .NET 客户端尚未支持监听新服务器事件{% endtab %}
{% tab title="Ruby" %}
# Ruby NATS 客户端尚未支持监听新服务器事件{% endtab %}
{% tab title="C" %}
static void
discoveredServersCB(natsConnection *conn, void *closure)
{
natsStatus s = NATS_OK;
char **servers = NULL;
int count = 0;
s = natsConnection_GetDiscoveredServers(conn, &servers, &count);
if (s == NATS_OK)
{
int i;
// 做一些操作...
for (i=0; i<count; i++)
printf("Discovered server: %s\n", servers[i]);
// 释放已分配的内存
for (i=0; i<count; i++)
free(servers[i]);
free(servers);
}
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetDiscoveredServersCB(opts, discoveredServersCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// 销毁已创建的对象
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);{% endtab %} {% endtabs %}
客户端库可能会将 从服务器发到客户端的错误 与事件分离。许多服务器事件不会被应用程序代码处理,从而导致连接关闭。监听错误对于调试问题非常有用。
{% tabs %} {% tab title="Go" %}
// 设置当异步错误发生时将被调用的回调函数。
nc, err := nats.Connect("demo.nats.io",
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
log.Printf("Error: %v", err)
}))
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// 用连接做点事情{% endtab %}
{% tab title="Java" %}
class MyErrorListener implements ErrorListener {
public void errorOccurred(Connection conn, String error)
{
System.out.println("The server notificed the client with: "+error);
}
public void exceptionOccurred(Connection conn, Exception exp) {
System.out.println("The connection handled an exception: "+exp.getLocalizedMessage());
}
public void slowConsumerDetected(Connection conn, Consumer consumer) {
System.out.println("A slow consumer was detected.");
}
}
public class SetErrorListener {
public static void main(String[] args) {
try {
Options options = new Options.Builder().
server("nats://demo.nats.io:4222").
errorListener(new MyErrorListener()). // 设置监听器
build();
Connection nc = Nats.connect(options);
// 用连接做点事情
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}{% endtab %}
{% tab title="JavaScript" %}
const nc = await connect({ servers: ["demo.nats.io"] });
// 如果客户端因错误而关闭,您可以在关闭处理程序中捕获这种情况:
nc.closed().then((err) => {
if (err) {
t.log(`the connection closed with an error ${err.message}`);
} else {
t.log(`the connection closed.`);
}
});
// 如果您有状态监听器,它也会收到通知
(async () => {
for await (const s of nc.status()) {
switch (s.type) {
case Status.Error:
// 通常如果收到这个,NATS连接将会关闭
t.log("客户端从服务器收到了异步错误");
break;
default:
t.log(`遇到了未知状态 ${s.type}`);
}
}
})().then();{% endtab %}
{% tab title="Python" %}
nc = NATS()
async def error_cb(e):
print("Error: ", e)
await nc.connect(
servers=["nats://demo.nats.io:4222"],
reconnect_time_wait=10,
error_cb=error_cb,
)
# 用连接做点事情.{% endtab %}
{% tab title="C#" %}
// dotnet add package NATS.Net
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
using NATS.Net;
// NATS .NET客户端目前不支持错误处理程序
// 相反,您可以使用日志记录器,因为服务器错误会以错误级别和事件ID 1005(协议日志事件)记录。
await using var client = new NatsClient(new NatsOpts
{
LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()),
});{% endtab %}
{% tab title="Ruby" %}
require 'nats/client'
NATS.start(servers:["nats://demo.nats.io:4222"]) do |nc|
nc.on_error do |e|
puts "Error: #{e}"
end
nc.close
end{% endtab %}
{% tab title="C" %}
static void
errorCB(natsConnection *conn, natsSubscription *sub, natsStatus s, void *closure)
{
// Do something
printf("Error: %d - %s\n", s, natsStatus_GetText(s));
}
(...)
natsConnection *conn = NULL;
natsOptions *opts = NULL;
natsStatus s = NATS_OK;
s = natsOptions_Create(&opts);
if (s == NATS_OK)
s = natsOptions_SetErrorHandler(opts, errorCB, NULL);
if (s == NATS_OK)
s = natsConnection_Connect(&conn, opts);
(...)
// 销毁已创建的对象
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);{% endtab %} {% endtabs %}