Install
docker安装并运行broker
1
2
3
4
5
|
docker pull docker.m.daocloud.io/nats:latest
docker run -itd -p 4222:4222 docker.m.daocloud.io/nats:latest
# -js 指的是 jetstream 模式
docker run -itd --name jetstream -p 4222:4222 docker.m.daocloud.io/nats:latest -js
|
或者直接下载所有可执行文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# 分别是:Broker, 客户端命令行,监控工具,go-SDK
https://github.com/nats-io/nats-server
https://github.com/nats-io/natscli
https://github.com/nats-io/nats-top
https://github.com/nats-io/nats.go
# 得到可执行文件后,运行JetStream
./nats-server -js -m 8222
[INF] Starting nats-server
[INF] Version: 2.10.21
[INF] Starting http monitor on 0.0.0.0:8222
[INF] Starting JetStream
[INF] Listening for client connections on 0.0.0.0:4222
[INF] Server is ready
# 运行监控
./nats-top
|
nats消息队列有三款产品,nats,nats-stream,nats jetstream
- nats:第一代基于最多交付一次模型设计的系统
- nats-stream:nats不支持持久化等功能,为了应对这里提供了nats-stream系统
- jetStream:nats升级到2.0后,将nats-stream后续弃用,官方建议使用jetStream,jetStream 分布式安全、多租户和水平扩展能力。
性能测试工具
1
2
3
4
5
6
7
8
|
# 解压nats.go对应的包之后,还可以进入源代码,自己编辑得到nats-bench性能测试工具
cd ./nats.go-1.37.0/examples/nats-bench/
go get .
go build main.go
mv main nats-bench
# 比如在上面broker运行的情况下,测试一把看看吞吐量
./nats-bench -n 100000000 -np 10 -ms 1 a
|
工作模式
发布-订阅(一对多)
NATS 实现了一对多发布订阅消息模型。当 publisher 往 subject 上发布一条消息后,此 subject 上所有 subscriber 都能收到 此消息,属于一种广播。
1
2
3
4
5
6
7
|
# 1
nats sub topic1
# 2
nats sub topic1
# 3
nats pub topic1 "{{Random 10 10}}"
nats pub topic1 "{{Random 10 10}}" --count=2
|
分组队列(负载均衡)
如果我们把 subscriber 分组,那么当 publisher 往 subject 上发布一条消息后,同一组里只有一个 subscriber 会收到此消息,从而实现了负载均衡。
1
2
3
4
5
6
|
# 1
nats sub topic1 --queue group1
# 2
nats sub topic1 --queue group1
# 3
nats pub topic1 "{{Random 10 10}}" --count=10
|
请求-应答(微服务调用)
一般来说,MQ是以异步的形式工作,也就是说,publisher 往 topic上发布一条消息后,并不在意 subscriber 的 reply 是什么。如果 publisher 在意 subscriber 的 reply 是什么的话,那么消息系统就应该以同步的形式工作,而这会显著降低MQ的性能。
NATS在具体实现中,是通过两次发布订阅来完成的:当 publisher 发布消息后,它会订阅一个特定的 topic,当 subscriber 处理完消息后,它会把 reply 发布到这个特定的 topic。当然,整个过程对使用者是透明的。
1
2
3
4
5
6
|
# 1
nats reply 'weather.>' --command "curl -s wttr.in/{{1}}?format=1" --queue gpA
# 2
nats reply 'weather.>' --command "curl -s wttr.in/{{1}}?format=3" --queue gpA
# 3
nats request weather.beijing '{{Random 5 5}}' --count=10
|
功能非常强大。#3>request通过负载均衡的方式轮流调用了#1和#2做处理,传递了数据,也得到了反馈。
PS:这里有个简单获取天气预报的网址https://wttr.in/beijing?format=3
Go开发示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
package nets
import (
"fmt"
"github.com/nats-io/nats.go"
"time"
)
const serverUrl = "nats://x.x.x.x:4222"
// 发布订阅模型:订阅1
func NatsSub11() {
// Connect to a server
nc, _ := nats.Connect(serverUrl)
defer nc.Close()
// Simple Async Subscriber
if _, err := nc.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Sub11: %s\n", m.Data)
}); err != nil {
fmt.Println(err)
return
}
time.Sleep(120 * time.Second)
fmt.Printf("Sub11 exit, bye bye...\n")
}
// 发布订阅模型:订阅2
func NatsSub12() {
// Connect to a server
nc, _ := nats.Connect(serverUrl)
defer nc.Close()
// Simple Async Subscriber
if _, err := nc.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Sub12: %s\n", m.Data)
}); err != nil {
fmt.Println(err)
return
}
time.Sleep(120 * time.Second)
fmt.Printf("Sub12 exit, bye bye...\n")
}
// 发布订阅模型:发布
func NatsPub1() {
// Connect to a server
nc, _ := nats.Connect(serverUrl)
defer nc.Close()
for i := 0; i < 100; i++ {
// Simple Publisher
if err := nc.Publish("foo", []byte(fmt.Sprintf("hello %d", i))); err != nil {
fmt.Println(err)
return
}
time.Sleep(1 * time.Second)
}
fmt.Printf("Pub1 finished, bye bye...\n")
}
func NatsDemoStart() {
go NatsSub11()
go NatsSub12()
go NatsPub1()
time.Sleep(125 * time.Second)
fmt.Printf("NatsDemo finished, bye bye...\n")
}
|
监控
说到监控,除了前面提到的 nats-top 之外,还有诸如 natsboard 之类的 UI 可供选择。
Linux中查看网络包可以用tcpdump:
1
2
3
4
5
|
tcpdump -vv -i ens33 port 4222
# -A 选项来以 ASCII 的形式打印出所有的网络包内容
# -w 选项将所有内容写入 packets.txt 文件
tcpdump -vv -i ens33 port 4222 -A -w packets.txt
|
(完)