原文:https://zhuanlan.zhihu.com/p/662574190
好久没有在知乎写随笔捏.. 因为这些天在玩Octopath Traveler和学习MIT 6.5840
课程的说
在MIT 6.5840
课程lab
中构建分布式系统时,节点之间的通信都是通过远程过程调用(Remote Procedure call
)实现的,简单地说就是,RPC
客户端请求远程服务端执行指定函数,并将计算结果通过网络传回
无论是lab1
使用的Go
语言标准库的net/rpc/
还是之后lab
中课程组模拟的带RPC
功能的网络,RPC
的大概流程都是
- 客户端将实参对象序列化到字节,发送到服务端
- 服务端要根据客户端请求正确路由到指定函数,当然函数名可以放到
RPC
传输所依赖的网络协议里,比如HTTP
或自己定义的。在我的实现里,我是利用HTTP
协议完成RPC
的,直接把函数名放在HTTP url
里了.. 然后在服务端写了个url
拦截 - 服务端反序列化得到形参对象,把返回值对象序列化到字节回复客户端,客户端再反序列化
更多关于RPC
的详情,这篇带图的文章讲的很清楚!
综上,为了能在C++
实现RPC
的方法(不用其他第三方库,除了fmt
因为我写时用的C++ 17
所以没有std::format
),我们得写这几部分代码
- 对象序列化/反序列化
- 服务端函数注册和函数的路由
- 客户端
当然第1, 3
部分是很简单啦,但在服务端侧处理RPC
函数时,我们要实现运行期函数多态,这里我是利用std::function
实现,当然用函数指针肯定也可以
首先是对象序列化,搜了一下很多轮子都是用Protobuf
实现对象序列化的,好像是个根据配置文件自动生成类型序列化,反序列化代码的框架
我用的是之前写的一个静态反射宏 + 内部嵌套模板的序列化库,本质上也是代码生成器罢了,只不过是C++
标准自带的而已。这样我们摆脱了对第三方框架的依赖,坏处是编译器会累一点
DEF_DATA_CLASS(Person, (unsigned short) age, (std::string) name,
(std::vector<int>) scores);
Person p{24, "alice", {3, 4, 89}};
cout << Json{p} << endl;
cout << Json{Json{p}.to_type<Person>().value()} << endl;
//{"age":24,"name":"alice","scores":[3,4,89]}
//{"age":24,"name":"alice","scores":[3,4,89]}
用法大概就是这样,现在我们虽然可以(反)序列化对象了,但是我这个实现很naive
,从代码可以看出我使用的序列化协议是JSON
,使用JSON
的话,就会有这些处理与传输开销:转义字符的处理、JSON
自描述(每次都要传输字段名字)、不好压缩空字段、JSON
文本的递归下降解析
所以其实像Protobuf
与Go
中net/rpc
使用的序列化方案gob
都是使用二进制的方案提高性能,所以优化你的rpc
时,可以考虑设计一个高效的二进制序列化方案,然后传输在TCP
协议上
举栗一个我做MIT 6.5840
时遇到的bug
说明Go
语言中gob
对数据空字段的优化
[Worker 1116111] mapping, Task={7 pg-tom_sawyer.txt 0 8 10}
[Worker 1116111] reducing, Task={0 1 8 10}
这里我是同步顺序调用两个RPC
,期望的两个Task
返回值如上
[Worker 1117085] mapping, Task={7 pg-tom_sawyer.txt 0 8 10}
[Worker 1117085] reducing, Task={7 pg-tom_sawyer.txt 1 8 10}
但实际结果是这样的,这里的病灶是,客户端内两个RPC
接受返回值的Task
对象是同一个,但是前两个字段没有在第二次RPC
更新为0
和""
,不用想都想得到这里因为是int
和string
的默认值,服务端应该没有把这两个字段序列化到TCP
字节流里。所以说net/rpc
假设了客户端是用一个Go
中的默认对象(Type{}
)来接受返回值的
第二是选择RPC
底层的网络协议,认真写的话最好用TCP
,多套一层HTTP
有额外开销嘛,反正我因为之前用Linux Epoll
写了一个HTTP
服务器,所以选择在HTTP
之上传输,单纯是懒得再写一个应用层协议了..
当我说应用层协议时,我想说的是我们必须要能分辨出RPC
数据的边界,有实践过网络编程的话就会知道TCP/UDP
传输的是字节流,服务端缓冲区单次读取可能读取的是不完整的RPC
请求,或是多个RPC
请求
所以使用TCP
的话,至少请求起始处要有长度字段标明数据长度给服务端解析
第三是服务端函数注册与路由
namespace my::net::rpc {
static std::string_view default_route_prefix = "/rpc/";
using HandlerType = std::function<std::string(std::string_view)>;
}
这里我的函数名存在HTTP url
中,前缀默认为/rpc/
而RPC
函数的约束我参考的Go rpc
,形参数量必须为1
,而且形参和返回值类型必须能被我的Json
库序列化,对于每一个RPC
函数,创建一个rpc::HandlerType
,它是一个输入std::string_view
(指向HTTP
内存缓冲区内的JSON
),输出std::string
(序列化的RPC
返回值)的std::function
namespace my::net::http {
using RpcFuncTable = std::unordered_map<std::string, rpc::HandlerType>;
// ...
RpcFuncTable m_rpc_funcs;
// ...
template <typename Func>
bool rpc_register(std::string const &name, Func func) {
auto url = std::string{rpc::default_route_prefix} + name;
// $"/rpc/{name}"
if (m_rpc_funcs.find(url) != m_rpc_funcs.end())
return false;
m_rpc_funcs[url] = [f = func](std::string_view content) -> std::string {
using namespace MyJson;
auto j = Json::from_json_text(content);
// ...
using ArgT = std::remove_cv_t<std::remove_reference_t<rpc::FirstArgT<Func>>>;
auto arg = j.value().to_type<ArgT>();
// ...
return Json{f(arg.value())}.to_json_text();
};
return true;
}
}
所以服务端维护一个函数名到相应HandlerType
的哈希m_rpc_funcs
就可以
当通过rpc_register<Func>
注册一个类型为Func
的新函数(C++
函数或函数指针或函数对象)时,利用lambda
写一个函数装饰器,生成的闭包也输入string_view
,返回string
,所以可以在哈希表里构造一个对应的rpc::HandlerType
(也就是std::function<std::string(std::string_view)>
)
闭包内部反序列化、调用注册的函数、序列化就完成了RPC
函数的多态
其中有一个rpc::FirstArgT<Func>
模板,是用于获取函数那单个形参类型的,如果函数是一个C++
函数对象f
,比如lambda
,那我们调用f
时实际调用的是f.operator()
,所以这个模板对函数指针和函数对象作了不同的特化
template <typename T> struct arg_type;
// function
template <typename R, typename Arg> struct arg_type<R(Arg)> {
using type = Arg; };
// function pointer
template <typename R, typename Arg> struct arg_type<R (*)(Arg)> {
using type = Arg; };
// member function pointer
template <typename C, typename R, typename Arg> struct arg_type<R (C::*)(Arg)> {
using type = Arg; };
// const member function pointer
template <typename C, typename R, typename Arg> struct arg_type<R (C::*)(Arg) const> {
using type = Arg;};
template <typename F, typename = void> struct first_type {
using type = typename arg_type<F>::type; };
template <typename F> struct first_type<F, std::enable_if_t<std::is_class_v<F>>> {
using type = typename arg_type<decltype(&F::operator())>::type;};
template <typename F> using FirstArgT = typename detail::first_type<F>::type;
接下来只要让服务端正确路由RPC
请求到相应的rpc::HandlerType
,然后返回HTTP
报文
auto rpc_url = std::string{request.url};
if (rpc_table.find(rpc_url) != rpc_table.end()) {
auto res = rpc_table[rpc_url](request.content);
m_keep_alive = request.keep_alive;
std::stringstream ss;
ss << "HTTP/1.1 200 OK\r\n";
ss << "Connection: " << (m_keep_alive ? "keep-alive" : "close") << "\r\n";
ss << "Content-Length: " << res.size() << "\r\n\r\n";
ss << res;
m_response_buffer.s = ss.str();
}
第四是写客户端,到这部分就是纯搬砖了..
Client cli;
cli.dial("/*server_addr*/:/*server_port*/");
cout << cli.call<string, string>("echo", "i am client, rpc okay").value() << endl;
类似Go rpc
先写一个dial
函数指定RPC
服务端IP
然后写一个模板函数call<ArgT, ResT>
指明RPC
调用的输入输出类型,这里call
使用短连接的话,每次connect
一次服务端发送HTTP
请求
bool dial(const std::string &address);
template <typename ArgT, typename ResT>
std::optional<ResT> call(std::string const &name, ArgT const &arg)
测试一下,先定义数据类型
namespace MyTypeList {
DEF_DATA_CLASS(Arg, (float) a, (int) b)
}
然后在服务器上注册两个RPC
函数,分别是lambda
和普通函数
using namespace my;
using namespace std;
using namespace MyTypeList;
string f(Arg a) {
return string {"[Server Add]"}
+ std::to_string(a.a) + " + "
+ std::to_string(a.b) + " = "
+ std::to_string(a.b + a.a);
}
auto main(int argc, char *argv[]) -> int {
// ...
auto server = net::http::Reactor(config);
server.rpc_register("echo", [](const std::string &x) {
return std::string{"[Server Echo] "} + x;
});
server.rpc_register("calculate", f);
server.run();
}
我的服务端与客户端Log
分别成功打印了
[********:49632][200](RPC) method="POST", url="/rpc/echo", version="HTTP/1.1", host="", keep-alive=false, content-length=23, content="i am client, rpc okay"
[********:49640][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=20, content={"a":7.59506,"b":19}
[********:49650][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=20, content={"a":1.42983,"b":15}
[********:49652][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=19, content={"a":11.4494,"b":2}
[********:49668][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=19, content={"a":12.5055,"b":3}
[********:49682][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=20, content={"a":3.33604,"b":15}
和
[Server Echo] i am client, rpc okay
[Server Add]7.595060 + 19 = 26.595060
[Server Add]1.429830 + 15 = 16.429831
[Server Add]11.449400 + 2 = 13.449400
[Server Add]12.505500 + 3 = 15.505500
[Server Add]3.336040 + 15 = 18.336040
Full Code: [server, rpc_client]