Install

docker安装并运行broker

1
2
3
4
5
docker pull docker.m.daocloud.io/nats:latest
docker run -itd -p 4222:4222 docker.m.daocloud.io/nats:latest

# -js 指的是 jetstream 模式
docker run -itd --name jetstream -p 4222:4222 docker.m.daocloud.io/nats:latest -js

或者直接下载所有可执行文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# 分别是:Broker, 客户端命令行,监控工具,go-SDK
https://github.com/nats-io/nats-server
https://github.com/nats-io/natscli
https://github.com/nats-io/nats-top
https://github.com/nats-io/nats.go

# 得到可执行文件后,运行JetStream
./nats-server -js -m 8222
[INF] Starting nats-server
[INF] Version:  2.10.21
[INF] Starting http monitor on 0.0.0.0:8222
[INF] Starting JetStream
[INF] Listening for client connections on 0.0.0.0:4222
[INF] Server is ready

# 运行监控
./nats-top 

nats消息队列有三款产品,nats,nats-stream,nats jetstream

  1. nats:第一代基于最多交付一次模型设计的系统
  2. nats-stream:nats不支持持久化等功能,为了应对这里提供了nats-stream系统
  3. jetStream:nats升级到2.0后,将nats-stream后续弃用,官方建议使用jetStream,jetStream 分布式安全、多租户和水平扩展能力。

性能测试工具

1
2
3
4
5
6
7
8
# 解压nats.go对应的包之后,还可以进入源代码,自己编辑得到nats-bench性能测试工具
cd ./nats.go-1.37.0/examples/nats-bench/
go get .
go build main.go
mv main nats-bench

# 比如在上面broker运行的情况下,测试一把看看吞吐量
./nats-bench -n 100000000 -np 10 -ms 1 a

工作模式

发布-订阅(一对多)

NATS 实现了一对多发布订阅消息模型。当 publisher 往 subject 上发布一条消息后,此 subject 上所有 subscriber 都能收到 此消息,属于一种广播。

1
2
3
4
5
6
7
# 1
nats sub topic1
# 2
nats sub topic1
# 3
nats pub topic1 "{{Random 10 10}}"
nats pub topic1 "{{Random 10 10}}" --count=2

分组队列(负载均衡)

如果我们把 subscriber 分组,那么当 publisher 往 subject 上发布一条消息后,同一组里只有一个 subscriber 会收到此消息,从而实现了负载均衡。

1
2
3
4
5
6
# 1
nats sub topic1 --queue group1
# 2
nats sub topic1 --queue group1
# 3
nats pub topic1 "{{Random 10 10}}" --count=10

请求-应答(微服务调用)

一般来说,MQ是以异步的形式工作,也就是说,publisher 往 topic上发布一条消息后,并不在意 subscriber 的 reply 是什么。如果 publisher 在意 subscriber 的 reply 是什么的话,那么消息系统就应该以同步的形式工作,而这会显著降低MQ的性能。

NATS在具体实现中,是通过两次发布订阅来完成的:当 publisher 发布消息后,它会订阅一个特定的 topic,当 subscriber 处理完消息后,它会把 reply 发布到这个特定的 topic。当然,整个过程对使用者是透明的。

1
2
3
4
5
6
# 1
nats reply 'weather.>' --command "curl -s wttr.in/{{1}}?format=1" --queue gpA
# 2
nats reply 'weather.>' --command "curl -s wttr.in/{{1}}?format=3" --queue gpA
# 3
nats request weather.beijing '{{Random 5 5}}' --count=10

功能非常强大。#3>request通过负载均衡的方式轮流调用了#1和#2做处理,传递了数据,也得到了反馈。

PS:这里有个简单获取天气预报的网址https://wttr.in/beijing?format=3

Go开发示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package nets

import (
	"fmt"
	"github.com/nats-io/nats.go"
	"time"
)

const serverUrl = "nats://x.x.x.x:4222"

// 发布订阅模型:订阅1
func NatsSub11() {
	// Connect to a server
	nc, _ := nats.Connect(serverUrl)
	defer nc.Close()

	// Simple Async Subscriber
	if _, err := nc.Subscribe("foo", func(m *nats.Msg) {
		fmt.Printf("Sub11: %s\n", m.Data)
	}); err != nil {
		fmt.Println(err)
		return
	}
	time.Sleep(120 * time.Second)
	fmt.Printf("Sub11 exit, bye bye...\n")
}

// 发布订阅模型:订阅2
func NatsSub12() {
	// Connect to a server
	nc, _ := nats.Connect(serverUrl)
	defer nc.Close()

	// Simple Async Subscriber
	if _, err := nc.Subscribe("foo", func(m *nats.Msg) {
		fmt.Printf("Sub12: %s\n", m.Data)
	}); err != nil {
		fmt.Println(err)
		return
	}
	time.Sleep(120 * time.Second)
	fmt.Printf("Sub12 exit, bye bye...\n")
}

// 发布订阅模型:发布
func NatsPub1() {
	// Connect to a server
	nc, _ := nats.Connect(serverUrl)
	defer nc.Close()

	for i := 0; i < 100; i++ {
		// Simple Publisher
		if err := nc.Publish("foo", []byte(fmt.Sprintf("hello %d", i))); err != nil {
			fmt.Println(err)
			return
		}
		time.Sleep(1 * time.Second)
	}
	fmt.Printf("Pub1 finished, bye bye...\n")
}

func NatsDemoStart() {
	go NatsSub11()
	go NatsSub12()
	go NatsPub1()
	time.Sleep(125 * time.Second)
	fmt.Printf("NatsDemo finished, bye bye...\n")
}

监控

说到监控,除了前面提到的 nats-top 之外,还有诸如 natsboard 之类的 UI 可供选择。

Linux中查看网络包可以用tcpdump:

1
2
3
4
5
tcpdump -vv -i ens33 port 4222

# -A 选项来以 ASCII 的形式打印出所有的网络包内容
# -w 选项将所有内容写入 packets.txt 文件
tcpdump -vv -i ens33 port 4222 -A -w packets.txt

(完)