C++实现简易RPC

原文:https://zhuanlan.zhihu.com/p/662574190

好久没有在知乎写随笔捏.. 因为这些天在玩Octopath Traveler和学习MIT 6.5840课程的说

MIT 6.5840课程lab中构建分布式系统时,节点之间的通信都是通过远程过程调用(Remote Procedure call)实现的,简单地说就是,RPC客户端请求远程服务端执行指定函数,并将计算结果通过网络传回

无论是lab1使用的Go语言标准库的net/rpc/还是之后lab中课程组模拟的带RPC功能的网络,RPC的大概流程都是

  1. 客户端将实参对象序列化到字节,发送到服务端
  2. 服务端要根据客户端请求正确路由到指定函数,当然函数名可以放到RPC传输所依赖的网络协议里,比如HTTP或自己定义的。在我的实现里,我是利用HTTP协议完成RPC的,直接把函数名放在HTTP url里了.. 然后在服务端写了个url拦截
  3. 服务端反序列化得到形参对象,把返回值对象序列化到字节回复客户端,客户端再反序列化

更多关于RPC的详情,这篇带图的文章讲的很清楚!

综上,为了能在C++实现RPC的方法(不用其他第三方库,除了fmt因为我写时用的C++ 17所以没有std::format),我们得写这几部分代码

  • 对象序列化/反序列化
  • 服务端函数注册和函数的路由
  • 客户端

当然第1, 3部分是很简单啦,但在服务端侧处理RPC函数时,我们要实现运行期函数多态,这里我是利用std::function实现,当然用函数指针肯定也可以

首先是对象序列化,搜了一下很多轮子都是用Protobuf 实现对象序列化的,好像是个根据配置文件自动生成类型序列化,反序列化代码的框架

我用的是之前写的一个静态反射宏 + 内部嵌套模板的序列化库,本质上也是代码生成器罢了,只不过是C++标准自带的而已。这样我们摆脱了对第三方框架的依赖,坏处是编译器会累一点

DEF_DATA_CLASS(Person, (unsigned short) age, (std::string) name,
               (std::vector<int>) scores);
Person p{24,  "alice", {3, 4, 89}}; 
cout << Json{p} << endl;
cout << Json{Json{p}.to_type<Person>().value()} << endl;
//{"age":24,"name":"alice","scores":[3,4,89]}
//{"age":24,"name":"alice","scores":[3,4,89]}

用法大概就是这样,现在我们虽然可以(反)序列化对象了,但是我这个实现很naive,从代码可以看出我使用的序列化协议是JSON,使用JSON的话,就会有这些处理与传输开销:转义字符的处理、JSON自描述(每次都要传输字段名字)、不好压缩空字段、JSON文本的递归下降解析

所以其实像ProtobufGonet/rpc使用的序列化方案gob都是使用二进制的方案提高性能,所以优化你的rpc时,可以考虑设计一个高效的二进制序列化方案,然后传输在TCP协议上

举栗一个我做MIT 6.5840时遇到的bug说明Go语言中gob对数据空字段的优化

[Worker 1116111] mapping, Task={7 pg-tom_sawyer.txt 0 8 10}
[Worker 1116111] reducing, Task={0  1 8 10}

这里我是同步顺序调用两个RPC,期望的两个Task返回值如上

[Worker 1117085] mapping, Task={7 pg-tom_sawyer.txt 0 8 10}
[Worker 1117085] reducing, Task={7 pg-tom_sawyer.txt 1 8 10}

但实际结果是这样的,这里的病灶是,客户端内两个RPC接受返回值的Task对象是同一个,但是前两个字段没有在第二次RPC更新为0"",不用想都想得到这里因为是intstring的默认值,服务端应该没有把这两个字段序列化到TCP字节流里。所以说net/rpc假设了客户端是用一个Go中的默认对象(Type{})来接受返回值的

第二是选择RPC底层的网络协议,认真写的话最好用TCP,多套一层HTTP有额外开销嘛,反正我因为之前用Linux Epoll写了一个HTTP服务器,所以选择在HTTP之上传输,单纯是懒得再写一个应用层协议了..

当我说应用层协议时,我想说的是我们必须要能分辨出RPC数据的边界,有实践过网络编程的话就会知道TCP/UDP传输的是字节流,服务端缓冲区单次读取可能读取的是不完整的RPC请求,或是多个RPC请求

所以使用TCP的话,至少请求起始处要有长度字段标明数据长度给服务端解析

第三是服务端函数注册与路由

namespace my::net::rpc {
    static std::string_view default_route_prefix = "/rpc/";
    using HandlerType = std::function<std::string(std::string_view)>;
}

这里我的函数名存在HTTP url中,前缀默认为/rpc/

RPC函数的约束我参考的Go rpc,形参数量必须为1,而且形参和返回值类型必须能被我的Json库序列化,对于每一个RPC函数,创建一个rpc::HandlerType,它是一个输入std::string_view(指向HTTP内存缓冲区内的JSON),输出std::string(序列化的RPC返回值)的std::function

namespace my::net::http {
    
using RpcFuncTable = std::unordered_map<std::string, rpc::HandlerType>;
    // ...
	RpcFuncTable m_rpc_funcs;
    // ...
    
template <typename Func>
  bool rpc_register(std::string const &name, Func func) {
    auto url = std::string{rpc::default_route_prefix} + name;
    // $"/rpc/{name}"
    if (m_rpc_funcs.find(url) != m_rpc_funcs.end())
      return false;
    m_rpc_funcs[url] = [f = func](std::string_view content) -> std::string {
      using namespace MyJson;
      auto j = Json::from_json_text(content);
      // ...
      using ArgT = std::remove_cv_t<std::remove_reference_t<rpc::FirstArgT<Func>>>; 
      auto arg = j.value().to_type<ArgT>();
      // ...
      return Json{f(arg.value())}.to_json_text();
    };
    return true;
  }
}

所以服务端维护一个函数名到相应HandlerType的哈希m_rpc_funcs就可以

当通过rpc_register<Func>注册一个类型为Func的新函数(C++函数或函数指针或函数对象)时,利用lambda写一个函数装饰器,生成的闭包也输入string_view,返回string,所以可以在哈希表里构造一个对应的rpc::HandlerType(也就是std::function<std::string(std::string_view)>

闭包内部反序列化、调用注册的函数、序列化就完成了RPC函数的多态

其中有一个rpc::FirstArgT<Func>模板,是用于获取函数那单个形参类型的,如果函数是一个C++函数对象f,比如lambda,那我们调用f时实际调用的是f.operator(),所以这个模板对函数指针和函数对象作了不同的特化

template <typename T> struct arg_type;
// function
template <typename R, typename Arg> struct arg_type<R(Arg)> { 
   using type = Arg; };
// function pointer
template <typename R, typename Arg> struct arg_type<R (*)(Arg)> { 
   using type = Arg; };
// member function pointer
template <typename C, typename R, typename Arg> struct arg_type<R (C::*)(Arg)> { 
   using type = Arg; };
// const member function pointer
template <typename C, typename R, typename Arg> struct arg_type<R (C::*)(Arg) const> {
   using type = Arg;};
template <typename F, typename = void> struct first_type { 
   using type = typename arg_type<F>::type; };
template <typename F> struct first_type<F, std::enable_if_t<std::is_class_v<F>>> {
   using type = typename arg_type<decltype(&F::operator())>::type;}; 

template <typename F> using FirstArgT = typename detail::first_type<F>::type;

接下来只要让服务端正确路由RPC请求到相应的rpc::HandlerType,然后返回HTTP报文

auto rpc_url = std::string{request.url};
if (rpc_table.find(rpc_url) != rpc_table.end()) {
    auto res = rpc_table[rpc_url](request.content);
    m_keep_alive = request.keep_alive;
    std::stringstream ss;
    ss << "HTTP/1.1 200 OK\r\n";
    ss << "Connection: " << (m_keep_alive ? "keep-alive" : "close") << "\r\n";
    ss << "Content-Length: " << res.size() << "\r\n\r\n";
    ss << res;
    m_response_buffer.s = ss.str();
}

第四是写客户端,到这部分就是纯搬砖了..

Client cli;
cli.dial("/*server_addr*/:/*server_port*/");
cout << cli.call<string, string>("echo", "i am client, rpc okay").value() << endl;

类似Go rpc先写一个dial函数指定RPC服务端IP

然后写一个模板函数call<ArgT, ResT>指明RPC调用的输入输出类型,这里call使用短连接的话,每次connect一次服务端发送HTTP请求

bool dial(const std::string &address);

template <typename ArgT, typename ResT>
std::optional<ResT> call(std::string const &name, ArgT const &arg)

测试一下,先定义数据类型

namespace MyTypeList {
    DEF_DATA_CLASS(Arg, (float) a, (int) b)
}

然后在服务器上注册两个RPC函数,分别是lambda和普通函数

using namespace my; 
using namespace std;
using namespace MyTypeList; 
string f(Arg a) {
  return string {"[Server Add]"} 
    + std::to_string(a.a) + " + "
    + std::to_string(a.b) + " = "
    + std::to_string(a.b + a.a);
}
auto main(int argc, char *argv[]) -> int {
  // ...
  auto server = net::http::Reactor(config);
  server.rpc_register("echo", [](const std::string &x) {
    return std::string{"[Server Echo] "} + x;
  });

  server.rpc_register("calculate", f);
  server.run();
}

我的服务端与客户端Log分别成功打印了

[********:49632][200](RPC) method="POST", url="/rpc/echo", version="HTTP/1.1", host="", keep-alive=false, content-length=23, content="i am client, rpc okay"
[********:49640][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=20, content={"a":7.59506,"b":19}
[********:49650][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=20, content={"a":1.42983,"b":15}
[********:49652][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=19, content={"a":11.4494,"b":2}
[********:49668][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=19, content={"a":12.5055,"b":3}
[********:49682][200](RPC) method="POST", url="/rpc/calculate", version="HTTP/1.1", host="", keep-alive=false, content-length=20, content={"a":3.33604,"b":15}

[Server Echo] i am client, rpc okay
[Server Add]7.595060 + 19 = 26.595060
[Server Add]1.429830 + 15 = 16.429831
[Server Add]11.449400 + 2 = 13.449400
[Server Add]12.505500 + 3 = 15.505500
[Server Add]3.336040 + 15 = 18.336040

Full Code: [server, rpc_client]