发送结构化数据

一些客户端库提供了发送结构化数据的辅助工具,而另一些则依赖应用程序自行进行编码和解码,并仅接受字节数组进行发送。以下示例展示了如何发送 JSON 数据,但也可以轻松修改为发送协议缓冲区(protocol buffer)、YAML 或其他格式。JSON 是一种文本格式,因此在大多数语言中我们还需要将字符串编码为字节。我们使用 UTF-8,这是 JSON 的标准编码方式。

以一个简单的 股票行情 为例,它会发送每只股票的代码和价格:

{% tabs %} {% tab title="Go" %}

nc, err := nats.Connect("demo.nats.io")
if err != nil {
    log.Fatal(err)
}
defer nc.Close()

ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
if err != nil {
    log.Fatal(err)
}
defer ec.Close()

// 定义对象
type stock struct {
    Symbol string
    Price  int
}

// 发布消息
if err := ec.Publish("updates", &stock{Symbol: "GOOG", Price: 1200}); err != nil {
    log.Fatal(err)
}

{% endtab %}

{% tab title="Java" %}

class StockForJsonPub {
    public String symbol;
    public float price;
}

public class PublishJSON {
    public static void main(String[] args) {
        try {
            Connection nc = Nats.connect("nats://demo.nats.io:4222");

            // 创建数据对象
            StockForJsonPub stk = new StockForJsonPub();
            stk.symbol="GOOG";
            stk.price=1200;

            // 使用 Gson 将对象编码为 JSON
            GsonBuilder builder = new GsonBuilder();
            Gson gson = builder.create();
            String json = gson.toJson(stk);

            // 发布消息
            nc.publish("updates", json.getBytes(StandardCharsets.UTF_8));

            // 确保消息发送完成后再关闭连接
            nc.flush(Duration.ZERO);
            nc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

{% endtab %}

{% tab title="JavaScript" %}

nc.publish("updates", JSON.stringify({ ticker: "GOOG", price: 2868.87 }));

{% endtab %}

{% tab title="Python" %}

nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

await nc.publish("updates", json.dumps({"symbol": "GOOG", "price": 1200 }).encode())

{% endtab %}

{% tab title="C#" %}

// dotnet add package NATS.Net
using NATS.Net;

await using var client = new NatsClient();

using var cts = new CancellationTokenSource();

Task process = Task.Run(async () =>
{
    // 我们可以将消息反序列化为 UTF-8 字符串,以便在控制台中查看发布的序列化输出
    await foreach (var msg in client.SubscribeAsync<string>("updates", cancellationToken: cts.Token))
    {
        Console.WriteLine($"Received: {msg.Data}");
    }
});

// 等待订阅任务准备就绪
await Task.Delay(1000);

var stock = new Stock { Symbol = "MSFT", Price = 123.45 };

// 默认的序列化器使用 System.Text.Json 对象进行序列化
await client.PublishAsync<Stock>("updates", stock);

// 定义对象
public record Stock {
    public string Symbol { get; set; }
    public double Price { get; set; }
}

// 输出:
// Received: {"Symbol":"MSFT","Price":123.45}

{% endtab %}

{% tab title="Ruby" %}

require 'nats/client'
require 'json'

NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  nc.publish("updates", {"symbol": "GOOG", "price": 1200}.to_json)
end

{% endtab %}

{% tab title="C" %}

// C NATS 客户端中无法配置结构化数据。

{% endtab %} {% endtabs %}