通常我们写程序时需要封装,需要函数调用,函数调用通常在本程序内完成。随着计算机技术的发展和需求场景的变化,有时就需要从一台计算机上执行另外一台计算机上的程序的需求,因此发展出来了RPC技术。特别是目前随着互联网技术的快速迭代和发展,用户和需求几乎都是以指数式的方式在高速增长,这个时候绝大多数情况下程序都是部署在多台机器上,就需要在调用其他物理机器上的程序的情况。

什么是RPC

远程过程调用(Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。 如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用。

用通俗易懂的语言描述就是:RPC允许跨机器、跨语言调用计算机程序方法。打个比方,我用Go语言写了个获取用户信息的方法GetUserInfo,并把Go程序部署在阿里云服务器上面,现在我有一个部署在腾讯云上面的Node.js项目,需要调用Go的GetUserInfo方法获取用户信息,Node.js跨机器调用Go方法的过程就是RPC调用。

RPC基本原理

RPC 首要解决的是通讯的问题,主流的 RPC 框架分为基于 HTTP 和基于 TCP 的两种。

基于 HTTP 的 RPC 调用很简单,就和我们访问网页一样,只是它的返回结果很单一(JSON 或 XML)。它的优点在于实现简单,标准化和跨语言,比较适合对外提供 OpenAPI 的场景,而它的缺点是 HTTP 协议传输效率较低、短连接开销较大(HTTP 2.0 后有很大改进,异步,协议压缩)。

基于 TCP 的 RPC 调用,由于 TCP 协议处于协议栈的下层,能够更加灵活地对协议字段进行定制,减少网络开销,提高性能,实现更大的吞吐量和并发数。但是需要更多地关注底层复杂的细节,跨语言和跨平台难度大,实现的代价更高,它比较适合内部系统之间追求极致性能的场景。

后面我们讲 RPC 都是基于 TCP 的,因为它是目前业界主流 RPC 框架支持的方式。首先来看看 RPC 调用的基本流程,以便大家对它有宏观的认识:

  1. 调用方(Client)通过本地的 RPC 代理(Proxy)调用相应的接口
  2. 本地代理将 RPC 的服务名,方法名和参数等等信息转换成一个标准的 RPC Request 对象交给 RPC 框架
  3. RPC 框架采用 RPC 协议(RPC Protocol)将 RPC Request 对象序列化成二进制形式,然后通过 TCP 通道传递给服务提供方 (Server)
  4. 服务端(Server)收到二进制数据后,将它反序列化成 RPC Request 对象
  5. 服务端(Server)根据 RPC Request 中的信息找到本地对应的方法,传入参数执行,得到结果,并将结果封装成 RPC Response 交给 RPC 框架
  6. RPC 框架通过 RPC 协议(RPC Protocol)将 RPC Response 对象序列化成二进制形式,然后通过 TCP 通道传递给服务调用方(Client)
  7. 调用方(Client)收到二进制数据后,将它反序列化成 RPC Response 对象,并且将结果通过本地代理(Proxy)返回给业务代码

下图将这个过程表现的很形象。

image-20210121081943604

标准库RPC

Go 标准库net/rpc提供了一个简单、强大且高性能的 RPC 实现。仅需编写很少的代码就能实现 RPC 服务。

HTTP RPC

下面看看例子:Server端Server.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package drpc

import (
	"errors"
	"log"
	"net/http"
	"net/rpc"
)

type Args struct {
	A, B int
}

type Quotient struct {
	Quo, Rem int
}

type Arity int

func (t *Arity) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

func (t *Arity) Divide(args *Args, quo *Quotient) error {
	if args.B == 0 {
		return errors.New("divide by 0")
	}

	quo.Quo = args.A / args.B
	quo.Rem = args.A % args.B
	return nil
}

func RPCServer() {
	arity := new(Arity)
	_ = rpc.Register(arity)
	rpc.HandleHTTP()
	if err := http.ListenAndServe(":1234", nil); err != nil {
		log.Fatal("serve error: ", err)
	}
}

我们定义了一个Arith类型,为它编写了两个方法MultiplyDivide。创建Arith类型的对象arith,调用rpc.Register(arith)会注册这两个方法。rpc库对注册的方法有一定的限制,方法必须满足签名func (t *T) MethodName(argType T1, replyType *T2) error

  • 首先,方法必须是导出的(名字首字母大写);
  • 其次,方法接受两个参数,必须是导出的或内置类型。第一个参数表示客户端传递过来的请求参数,第二个是需要返回给客户端的响应。第二个参数必须为指针类型(需要修改);
  • 最后,方法必须返回一个error类型的值。返回非nil的值,表示调用出错。

rpc.HandleHTTP()注册 HTTP 路由;http.ListenAndServe(":1234", nil)在端口1234上启动一个 HTTP 服务,请求 rpc 方法会交给rpc内部路由处理。

下面看看Client端client.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package drpc

import (
	"fmt"
	"log"
	"net/rpc"
)

func RPCClient() {
	client, err := rpc.DialHTTP("tcp", ":1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	args := &Args{7, 8}
	var reply int
	err = client.Call("Arity.Multiply", args, &reply)
	if err != nil {
		log.Fatal("Multiply error:", err)
	}
	fmt.Printf("Multiply: %d * %d = %d\n", args.A, args.B, reply)

	args = &Args{15, 6}
	var quo Quotient
	err = client.Call("Arity.Divide", args, &quo)
	if err != nil {
		log.Fatal("Divide error:", err)
	}
	fmt.Printf("Divide: %d / %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
}

我们使用rpc.DialHTTP("tcp", ":1234")连接到服务端的监听地址,返回一个 rpc 的客户端对象。后续就可以调用该对象的Call()方法调用服务端对象的对应方法,依次传入方法名(需要加上接收者类型限定)、参数、一个指针(用于接收返回值)。

分别运行server端和client端,在client端看到如下结果:

Multiply: 7 * 8 = 56
Divide: 15 / 6 = 2...3

是不是很简单啊,就这样基于HTTP协议的Go标准库RPC就实现了。看看Server是如何实现一个HTTP Web Server 的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// server
func (server *Server) HandleHTTP(rpcPath, debugPath string) {
	http.Handle(rpcPath, server)
	http.Handle(debugPath, debugHTTP{server})
}
func HandleHTTP() {
	DefaultServer.HandleHTTP(DefaultRPCPath, DefaultDebugPath)
}
// client
func DialHTTP(network, address string) (*Client, error) {
	return DialHTTPPath(network, address, DefaultRPCPath)
}
// rpc/server.go
const (
	// Defaults used by HandleHTTP
	DefaultRPCPath   = "/_goRPC_"
	DefaultDebugPath = "/debug/rpc"
)

看源代码就知道了,标准库启动一个Web Server,然后设置了两个路由,默认所有请求走/_goRPC_;这里也可以看出来还有一个供调试的路由/debug/rpc,我们看看是啥内容(不仅显示暴露的函数个数,还显示了被调用次数):

image-20210121094734187

上面的Client是同步调用Server的,如果Server阻塞,客户端也会跟着阻塞。

如何异步调用呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func AsyncRPCClient() {
	client, err := rpc.DialHTTP("tcp", ":1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	args1 := &Args{7, 8}
	var reply int
	multiplyReply := client.Go("Arity.Multiply", args1, &reply, nil)

	args2 := &Args{15, 6}
	var quo Quotient
	divideReply := client.Go("Arity.Divide", args2, &quo, nil)

	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	var multiplyReplied, divideReplied bool
	for !multiplyReplied || !divideReplied {
		select {
		case replyCall := <-multiplyReply.Done:
			if err := replyCall.Error; err != nil {
				fmt.Println("Multiply error:", err)
			} else {
				fmt.Printf("Multiply: %d*%d=%d\n", args1.A, args1.B, reply)
			}
			multiplyReplied = true
		case replyCall := <-divideReply.Done:
			if err := replyCall.Error; err != nil {
				fmt.Println("Divide error:", err)
			} else {
				fmt.Printf("Divide: %d/%d=%d...%d\n", args2.A, args2.B, quo.Quo, quo.Rem)
			}
			divideReplied = true
		case <-ticker.C:
			fmt.Println("tick")
		}
	}
}

异步调用使用client.Go()方法,参数与同步调用基本一样。它返回一个rpc.Call对象。

定制方法名

默认情况下,rpc.Register()将方法接收者(receiver)的类型名作为方法名前缀。我们也可以自己设置。这时需要调用RegisterName(name string, rcvr interface{}) error方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// server.go
func RenameRPCServer() {
	arity := new(Arity)
	_ = rpc.RegisterName("math", arity)
	_ = rpc.Register(arity)
	rpc.HandleHTTP()
	if err := http.ListenAndServe(":1234", nil); err != nil {
		log.Fatal("serve error: ", err)
	}
}

// client.go
func RenameRPCClient() {
	client, err := rpc.DialHTTP("tcp", ":1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	args := &Args{7, 8}
	var reply int
	err = client.Call("math.Multiply", args, &reply)
	if err != nil {
		log.Fatal("Multiply error:", err)
	}
	fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

TCP RPC

上面我们都是使用 HTTP 协议来实现 rpc 服务的,rpc库也支持直接使用 TCP 协议,使用起来非常简单。

首先,服务端先调用net.Listen("tcp", ":1234")创建一个监听某个 TCP 端口的监听器(Accepter),然后使用rpc.Accept(xxx)在此监听器上接受连接并处理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// server.go
func TCPRPCServer() {
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("listen error:", err)
	}

	arity := new(Arity)
	_ = rpc.Register(arity)
	rpc.Accept(listener)
}

// client.go
func TCPRPCClient() {
	client, err := rpc.Dial("tcp", ":1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	args := &Args{7, 8}
	var reply int
	err = client.Call("Arity.Multiply", args, &reply)
	if err != nil {
		log.Fatal("Multiply error:", err)
	}
	fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

自己处理连接

在服务器端可以自己先获取TCP连接请求,然后在此连接上应用RPC协议来解析数据、处理、并返回:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func CustomTCPRPCServer() {
	l, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("listen error:", err)
	}

	arity := new(Arity)
	_ = rpc.Register(arity)

	for {
		conn, err := l.Accept()
		if err != nil {
			log.Fatal("accept error:", err)
		}

		go rpc.ServeConn(conn)
	}
}

自定义编码格式

Go标准库默认客户端与服务端之间的数据使用gob编码,由于其他语言不支持gob编解码方式,所以使用net/rpc库实现的RPC方法没办法进行跨语言调用。不过我们可以使用其它的格式来编码,在服务端我们要实现rpc.ServerCodec接口。

下面我们自定义一个JSON格式的:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// json_codec.go
// server.go
package drpc

import (
	"bufio"
	"encoding/json"
	"io"
	"log"
	"net/rpc"
)

type JsonServerCodec struct {
	rwc    io.ReadWriteCloser
	dec    *json.Decoder
	enc    *json.Encoder
	encBuf *bufio.Writer
	closed bool
}

func NewJsonServerCodec(conn io.ReadWriteCloser) *JsonServerCodec {
	buf := bufio.NewWriter(conn)
	return &JsonServerCodec{conn, json.NewDecoder(conn), json.NewEncoder(buf), buf, false}
}

func (c *JsonServerCodec) ReadRequestHeader(r *rpc.Request) error {
	return c.dec.Decode(r)
}

func (c *JsonServerCodec) ReadRequestBody(body interface{}) error {
	return c.dec.Decode(body)
}

func (c *JsonServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
	if err = c.enc.Encode(r); err != nil {
		if c.encBuf.Flush() == nil {
			log.Println("rpc: json error encoding response:", err)
			c.Close()
		}
		return
	}
	if err = c.enc.Encode(body); err != nil {
		if c.encBuf.Flush() == nil {
			log.Println("rpc: json error encoding body:", err)
			c.Close()
		}
		return
	}
	return c.encBuf.Flush()
}

func (c *JsonServerCodec) Close() error {
	if c.closed {
		return nil
	}
	c.closed = true
	return c.rwc.Close()
}

// client +++++++++++++++++++++++++++++++++++++++
type JsonClientCodec struct {
	rwc    io.ReadWriteCloser
	dec    *json.Decoder
	enc    *json.Encoder
	encBuf *bufio.Writer
}

func NewJsonClientCodec(conn io.ReadWriteCloser) *JsonClientCodec {
	encBuf := bufio.NewWriter(conn)
	return &JsonClientCodec{conn, json.NewDecoder(conn), json.NewEncoder(encBuf), encBuf}
}

func (c *JsonClientCodec) WriteRequest(r *rpc.Request, body interface{}) (err error) {
	if err = c.enc.Encode(r); err != nil {
		return
	}
	if err = c.enc.Encode(body); err != nil {
		return
	}
	return c.encBuf.Flush()
}

func (c *JsonClientCodec) ReadResponseHeader(r *rpc.Response) error {
	return c.dec.Decode(r)
}

func (c *JsonClientCodec) ReadResponseBody(body interface{}) error {
	return c.dec.Decode(body)
}

func (c *JsonClientCodec) Close() error {
	return c.rwc.Close()
}


// client.go
func CusJsonTCPRPCClient() {
	conn, err := net.Dial("tcp", ":1234")
	if err != nil {
		log.Fatal("dial error:", err)
	}

	client := rpc.NewClientWithCodec(NewJsonClientCodec(conn))

	args := &Args{7, 8}
	var reply int
	err = client.Call("Arity.Multiply", args, &reply)
	if err != nil {
		log.Fatal("Multiply error:", err)
	}
	fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
}

// server.go
func CusJsonTCPRPCServer() {
	l, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("listen error:", err)
	}

	arity := new(Arity)
	rpc.Register(arity)

	for {
		conn, err := l.Accept()
		if err != nil {
			log.Fatal("accept error:", err)
		}
		go rpc.ServeCodec(NewJsonServerCodec(conn))
	}
}

总结

本文介绍了 Go 标准库中的rpc,它使用非常简单,性能异常强大。很多rpc的第三方库都是对rpc的封装。