python coroutines

协程

由程序负责任务切换,可以减少线程/进程上下文切换的消耗。用户态实现任务切换,无需进入内核态。

用途

虽然 Python 有多线程的概念,但是由于 GIL 的存在,并不能利用多核资源。如果易不能充分利用单进程资源,可能会带来严重的性能问题。

相关

EventLoop

python 默认只为主线程创建 loop。如下 tornado 代码实现了自动为创建 loop 的功能,使用 asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) 来生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
if sys.platform == "win32" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
# "Any thread" and "selector" should be orthogonal, but there's not a clean
# interface for composing policies so pick the right base.
_BasePolicy = asyncio.WindowsSelectorEventLoopPolicy # type: ignore
else:
_BasePolicy = asyncio.DefaultEventLoopPolicy


class AnyThreadEventLoopPolicy(_BasePolicy): # type: ignore
"""Event loop policy that allows loop creation on any thread.

The default `asyncio` event loop policy only automatically creates
event loops in the main threads. Other threads must create event
loops explicitly or `asyncio.get_event_loop` (and therefore
`.IOLoop.current`) will fail. Installing this policy allows event
loops to be created automatically on any thread, matching the
behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2).

Usage::

asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())

.. versionadded:: 5.0

"""

def get_event_loop(self) -> asyncio.AbstractEventLoop:
try:
return super().get_event_loop()
except (RuntimeError, AssertionError):
# This was an AssertionError in Python 3.4.2 (which ships with Debian Jessie)
# and changed to a RuntimeError in 3.4.3.
# "There is no current event loop in thread %r"
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop

示例

定时器

下面是使用协程实现的定时器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# coding: utf-8
import asyncio
import threading
import time
from datetime import datetime
from typing import Callable


class Scheduler:
cache: set[str] = set()

@classmethod
async def _do_schedule(cls, name: str, delay: int, interval: int, cb: Callable, args, kwargs):
await asyncio.sleep(delay)
while name in cls.cache:
try:
cb(*args, **kwargs)
except Exception as e:
print('execute target failed, e=', e)
await asyncio.sleep(interval)

@classmethod
def _schedule_wrapper(cls, name: str, delay: int, interval: int, cb: Callable, args, kwargs):
asyncio.run(cls._do_schedule(name, delay, interval, cb, args, kwargs))

@classmethod
def schedule(cls, name: str, delay: int, interval: int, cb: Callable, *args, **kwargs):
assert name not in cls.cache, 'duplicate scheduler with name ' + name
threading.Thread(target=cls._schedule_wrapper,
args=(name, delay, interval, cb, args, kwargs),
daemon=True).start()

cls.cache.add(name)

@classmethod
def stop(cls, name: str):
if name in cls.cache:
cls.cache.remove(name)


def cbk(a, b, c):
print('execute at', datetime.now(), 'with args:', (a, b, c))


if __name__ == '__main__':
Scheduler.schedule('first', 1, 1, cbk, 'a', 'b', c='c')
Scheduler.schedule('second', 1, 1, cbk, 'd', 'e', c='f')
time.sleep(3)
Scheduler.stop('first')
try:
while True:
pass
except KeyboardInterrupt:
pass

异常

loop argument must agree with Future

下看下抛出异常的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
if loop is None:
loop = events.get_event_loop()
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
elif compat.PY35 and inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
else:
raise TypeError('A Future, a coroutine or an awaitable is required')

参考

curator x discovery

概念

  • ServiceDiscovery ,创建 ServiceProvider 对象,首先需要有 ServiceDiscovery ;所有请求直接访问 zk
  • ServiceProvider, 特定服务发现的封装,并集成了负载均衡策略;集成了 ServiceCache ,有节点监听和缓存
  • ServiceCache ,会在本地内存缓存,并使用 watcher 来保持数据最新

说明

ServiceDiscoveryServiceProvider 需要调用 start 方法后可用。

注册

  • 使用 ServiceDiscoveryregisterService 注册服务后,只要 ServiceDiscoverystop ,会一直保持节点注册
  • 服务被强制 stop ,没有及时调用 unregisterService 接口来取消注册,zk 节点会保存一段时间(几秒),然后由 zk 摘除

查询

  • ServiceProvider 的接口,会实时调用 zk 查询数据,

监听

ServiceCacheListener 有两个方法。

  • cacheChanged 当服务节点变化时,会调用该方法
  • stateChanged 当 zk 连接状态变化时,会调用该方法

grpc java

Channel

gRPC 代码实现中,Channel 是一个虚拟类,是物理连接的逻辑概念。ManagedChannelImpl 和 ManagedChannelImpl2 继承了该类,并实现了 newCall 和 authority 接口。

SubChannel

SubChannel 是 LoadBalancer 的内部类,在了解 SubChannel 之前,需要先了解 SocketAddress 和 EquivalentAddressGroup。

SocketAddress

SocketAddress 是一个虚拟类,代表一个不包含协议信息的 Socket 地址。

EquivalentAddressGroup

EquivalentAddressGroup 是一组 SocketAddress,在 Channel 创建连接时,其包含的 SocketAddress 被视为无差异的。

SubChannel

再回到 SubChannel,他表示和一个服务器或者 EquivalentAddressGroup 表示的一组等价服务器的 一个物理连接,这里需要注意的是,他至多有一个物理连接。在发起新的 RPCs 时,如果没有激活的 transport,在被安排 RPC 请求时,会创建 transport。调用 requestConnection ,会请求 Subchannel 在没有 transport 的情况下创建 transport。

SubChannel 有一个 List<EquivalentAddressGroup> 属性,可以通过 setAddresses(List<EquivalentAddressGroup> addrs)setAddresses(EquivalentAddressGroup addrs) 设置。

InternalSubchannel

InternalSubchannel 表示一个 SocketAddress 的 Transports 。他实现了 TransportProvider 接口,定义了 obtainActiveTransport 方法,该方法如果没有激活的 transports,则会调用 startNewTransport 进行创建。

获取 SocketAddress

在创建 transports 时,需要先获取 SocketAddress。在创建 InternalSubChannel 时,会传入 List<EquivalentAddressGroup>需要注意的是,InternalSubChannel 默认使用 第一个 EquivalentAddressGroup 的 第一个 SocketAddress ,只有在 transport 被关闭时,才会尝试下一个服务地址。

尝试完所有的地址,全部失败后,此时 InternalSubChannel 处于 TRANSIENT_FAILURE 状态,等待一个 delay 时间后,重新尝试。

NameResolver

NameResolver 是一个可插拔的组件(pluggable component),代码层面是一个接口,用来解析一个 target(URI),并返回给调用方一个地址列表,gRPC 内置了 DnsNameResolver。

地址的返回是基于 Listener 机制,NameResolver 的实现类,需要定义 start 方法,方法会传入一个 Listener,当服务列表发生变化时,调用 Listener 的 onResult 方法,通知 NameResolver 的持有方。

LoadBalancer

LoadBalancer 是一个可插拔的组件,接受 NameResolver 拿到的服务方列表,提供可用的 SubChannel。

RoundRobinLoadBalancer

从 RoundRobinLoadBalancer 的 handleResolvedAddresses 实现可以发现。

  • 每次刷新时

    • 对新增服务方创建 SubChannel
    • 对于删掉的服务方进行剔除
    • 对于可用的服务方,不会重新创建 SubChannel

ManagedChannelImpl

+Channel

Channel 去执行一次远程调用,是通过 newCall 方法,传入 **MethodDescriptor ** 和 CallOptions。对于 ManagedChannelImpl,其实现 Channel 接口,newCall 方法转而调用 InterceptorChannel 的 newCall,先简单说下 InterceptorChannel。

managedChannelImpl 字段是调用 ClientInterceptors.intercept(channel, interceptors) 构造,先说 InterceptorChannel 再说 ClientInterceptors。

InterceptorChannel 将 Interceptor 和 Channel 的结合,由 channel + interceptor 构造,调用 channel 的 newCall 时,会执行 interceptor 的 interceptCall,该调用会传入 channel。对于一个原始 channel 和 多个 interceptor,先将 interceptor 倒序,然后依次创建 InterceptorChannel,进行包装。

1
2
3
for (ClientInterceptor interceptor : interceptors) {
channel = new InterceptorChannel(channel, interceptor);
}

相比之下,ClientInterceptors 只是一个工具类。

接着,怎么用上 NameSolver 的。在构造 interceptorChannel 时,传入一个 channel。这个channel 是一个 RealChannel 对象。

RealChannel 实现了 Channel 接口。

+SubChannel

这里需要再提一下,Channel 是逻辑连接,SubChannel 是物理连接。ManagedChannelImpl 实现了 Channel 接口,同时,有一个内部类 SubchannelImpl 实现 SubChannel。

创建物理连接

首先调用 SubchannelImpl 的 requestConnection ,在方法内会调用 InternalSubchannelobtainActiveTransport 创建和 SocketAddress 的连接。

+NameResolver

在 ManagedChannelImpl 内,定义了 NameResolverListener,实现了NameResolver.Listener 接口,在 NameResolverListener 内做了 LoadBalancer 和 NameResolver 的结合。

NameResolver + LoadBalancer

在 NameResolverListener 的 onResult 方法内,当服务器地址变更时会执行改方法。首先,会从参数中获取服务端列 表List<EquivalentAddressGroup>,接下来调用 AutoConfiguredLoadBalancer 的 tryHandleResolvedAddresses 方法,再调用 LoadBalancer 的 handleResolvedAddresses。整个调用实现比较绕,直接了解内置的 LB 即可。

yEd

自适应大小

动态调整node至label大小

File > Preferences > Editor > Dynamically Adjust Node Size to Label Size

新建节点时,node 大小根据 label 大小动态调整。

不启用效果

启用效果

调整格式

调整node适应label

Tools > Fit Node To Label

假设,有如下 node。

调整后效果如下。

rsync

安装

1
$ brew install rsync

参数

1
2
P	等价于 --partial --progress
a archive 模式

同步文件夹

1
2
3
4
5
$ rsync -Pav <local> <user@ip:remote-dist>

# 使用特定 id_rsa
$ rsync -Pav -e "ssh -i ~/.ssh/id_rsa_sensors" <local> <user@ip:remote-dist>

实时同步

使用工具 fswatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 安装依赖
$ brew install fswatch

# watch 文件变动
$ fswatch . | xargs -n1 -I{} <do-something>

# 定义同步方法
function dosyn() {
if [ ! -e .RSYNCING ]; then
touch .RSYNCING
echo "begin to sync"
rsync -Pav -e "ssh -i ~/.ssh/<secret-id>" <local> <user@ip:remote-dist> # 修改这里
echo "rsync done at $(date), sleep 30 seconds"
sleep 30
rm .RSYNCING
echo "sleep done at $(date)"
else
echo "syncing OR sleeping ..."
fi
}

[ -e .RSYNCING ] && rm .RSYNCING
export -f dosyn
fswatch -e .RSYNCING -ro . | xargs -P0 -n1 -I{} bash -c 'dosyn'

dubbo

模块

/dev-guide/images/dubbo-modules.jpg

  • dubbo-common : Util 类和通用模型
  • dubbo-remoting : 远程通讯模块,提供通用的客户端和服务端的通讯能力;相当于 dubbo 协议的实现,如果 RPC 使用 RMI 协议则不需要此包;最小原则,可以只看 dubbo-remoting-api + dubbo-remoting-netty4dubbo-remoting-zookeeper
  • dubbo-rpc : 远程调用模块,抽象各种协议,以及动态代理,只包含一对一调用,不关心集群管理;从包图可以看出,dubbo-rpc 是 dubbo 的中心
    • dubbo-rpc-api : 抽象各种协议以及动态代理,实现一对一调用
  • dubbo-cluster : 集群模块,提供多个服务方伪装为一个提供方能力,包括:负载均衡、容错、路由等,集群的地址列表可以静态配置,也可以由注册中心下发
    • 容错:Cluster 接口 + cluster.support 包;Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker(Java 动态代理),对上层透明;伪装过程中包含了容错逻辑
    • 目录(directory):Directory 接口 + cluster.directory 包;Directory 代表了多个 Invoker,可以看做动态变化的 Invoker List,每个 Invoker 对应一个服务端节点
    • 路由(router):Router 接口 + cluster.router 包;负责从多个 Invoker 中按路由规则选出子集,用于调用
    • 配置:Configurator 接口 + cluster.configurator
    • 负载均衡(loadbalance):LoadBalance 接口和 cluster.loadbalance 包;负责从多个 Invoker 中根据负载均衡算法,选出具体的一个用于本次调用
    • 合并结果(merger):Merger 接口 + cluster.merger 包;合并返回结果,用于分组聚合

集群容错

  • dubbo-registry : 注册中心模块,基于注册中心下发地址的集群方式,以及对各种注册中心的抽象
  • dubbo-monitor : 监控模块,统计服务调用次数,调用时间,调用链路跟踪的服务
  • dubbo-config : 配置模块,是 dubbo 对外的 API,用户通过 Config 使用 dubbo,隐藏 dubbo 的所有细节
  • dubbo-container : 容器模块,是一个 Standlone 容器,以简单的 Main 加载 Spring 启动(用于代替容器启动),因为服务通常不需要 Tomcat、JBoss 等Web 容器的特性,没必要用 Web 容器取加载服务
  • dubbo-filter过滤器模块,提供了内置过滤器;
    • doubbo-filter-cache :缓存过滤器
    • dubbo-filter-validation :参数验证过滤器
  • dubbo-plugin :插件模块,提供内置的插件
    • dubbo-qos :提供在线运维命令

BOM

BOM(Bill Of Materials),为了防止用 Maven 管理 Spring 项目时,不同的项目依赖了不同版本的 Spring,可以使用 Maven BOM 来解决这一问题。

doubbo-dependencies-bom

dubbo-dependencies-bom/pom.xml统一定义了 Dubbo 依赖的三方库的版本号。

dubbo-dependencies-bom 文件dubbo-parent ,会引入该 BOM。

引入 dubbo-dependencies-bom 文件

dubbo-bom

dubbo-bom/pom.xml统一定义了 Dubbo 的版本号。

dubbo-bom 文件

dubbo-demodubbo-test 会引入该 BOM 。以 dubbo-demo 为例。

引入 dubbo-bom 文件

dubbo-parent

Dubbo 的 Maven 模块,都会引入该 pom 文件。以 dubbo-cluster 为例。

引入 dubbo-parent 文件

dubbo-all

dubbo/all/pom.xml ,定义了 dubbo 的打包脚本。我们在使用 dubbo 库时,引入该 pom 文件。

引入关系

流程

/dev-guide/images/dubbo-framework.jpg

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

各层说明

  • config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类;由 dubbo-config 模块实现
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
  • protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

关系说明

  • 在 RPC 中,Protocol 是核心层,只要 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上添加 Filter 拦截点
  • Cluster 是外围概念,所以 Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,这样其它人只要关注 Protocol 层 Invoker 即可
  • Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker

依赖关系

/dev-guide/images/dubbo-relation.jpg

  • 图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互。
  • 图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点。
  • 图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用。
  • 图中只包含 RPC 的层,不包含 Remoting 的层,Remoting 整体都隐含在 Protocol 中。

调用链

/dev-guide/images/dubbo-extension.jpg

  • 垂直分层如下:
    • 下方 淡蓝背景( Consumer ):服务消费方使用的接口
    • 上方 淡绿色背景( Provider ):服务提供方使用的接口
    • 中间 粉色背景( Remoting ):通信部分的接口
  • 自 LoadBalance 向上,每一行分成了多个相同的 Interface ,指的是负载均衡后,向 Provider 发起调用。
  • 左边 括号 部分,代表了垂直部分更细化的分层,依次是:Common、Remoting、RPC、Interface 。
  • 右边 蓝色虚线( Init ) 为初始化过程,通过对应的组件进行初始化。例如,ProxyFactory 初始化出 Proxy 。

暴露服务时序

展开总设计图左边服务提供方暴露服务的蓝色初始化链,时序图如下。

/dev-guide/images/dubbo-export.jpg

引用服务时序

展开总设计图右边服务消费方引用服务的蓝色初始化链,时序图如下。

/dev-guide/images/dubbo-refer.jpg

领域模型

Invoker

Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

满眼都是 Invoker

类图。

Invoker 子类

Invocation

Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。

Invocation 子类

Result

Result 是会话域,它持有调用过程中返回值,异常等。

Invocation 子类

Filter

过滤器接口,和我们平时理解的 javax.servlet.Filter 基本一致。

Filter 子类

ProxyFactory

代理工厂接口。服务消费着引用服务的 主过程 如下图。

服务消费着引用服务的主过程

从图中我们可以看出,方法的 invoker 参数,通过 Protocol 将 Service接口 创建出 Invoker 。通过创建 Service 的 Proxy ,实现我们在业务代理调用 Service 的方法时,透明的内部转换成调用 Invoker 的 #invoke(Invocation) 方法。

服务提供者暴露服务的 主过程 如下图。

服务提供者暴露服务的主过程

从图中我们可以看出,该方法创建的 Invoker ,下一步会提交给 Protocol ,从 Invoker 转换到 Exporter 。

类图如下。

ProxyFactory 子类

从图中,我们可以看出 Dubbo 支持 Javassist 和 JDK Proxy 两种方式生成代理。

Protocol

Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口。它负责 Invoker 的生命周期管理。

Dubbo 处理服务暴露的关键就在 Invoker 转换到 Exporter 的过程。下面我们以 Dubbo 和 RMI 这两种典型协议的实现来进行说明。

  • Dubbo 的实现
    Dubbo 协议的 Invoker 转为 Exporter 发生在 DubboProtocol 类的 export 方法,它主要是打开 socket 侦听服务,并接收客户端发来的各种请求,通讯细节由 Dubbo 自己实现。
  • RMI 的实现
    RMI 协议的 Invoker 转为 Exporter 发生在 RmiProtocol 类的 export 方法,它通过 Spring 或 Dubbo 或 JDK 来实现 RMI 服务,通讯细节这一块由 JDK 底层来实现,这就省了不少工作量。

Protocol 子类

Exporter

Exporter ,Invoker 暴露服务在 Protocol 上的对象。

Exporter 子类

InvokerListener

Invoker 监听器。

InvokerListener 子类

ExporterListener

Exporter 监听器。

ExporterListener 子类

配置

dubbo 支持多种配置方式:

  • API 配置
  • 属性配置
  • XML 配置
  • 注解配置

所有配置项分为三大类。

  • 服务发现:表示该配置项用于服务的注册与发现,目的是让消费方找到提供方。
  • 服务治理:表示该配置项用于治理服务间的关系,或为开发测试提供便利条件。
  • 性能调优:表示该配置项用于调优性能,不同的选项对性能会产生影响。

所有配置最终都将转换为 Dubbo URL 表示,并由服务提供方生成,经注册中心传递给消费方,各属性对应 URL 的参数,参见配置项一览表中的对应URL参数列。

首先看下 dubbo-config-api 项目结构。

dubbo-config-api 项目结构

整理下配制间的关系。

配置之间的关系

配置可分为四部分。

  • application-shared

  • provider-side

  • consumer-side

  • sub-config

配置类关系如下。

配置类关系

扩展

做框架应该考虑的一些问题。

  • 避免不必要的引用
  • 依赖接口,而不是具体的实现

扩展点

系统通过扩展机制,来横向扩展系统功能,只要符合对应扩展规范,扩展的过程无需修改系统代码。所谓扩展点,是系统定义的,可以支持扩展的点(流程的某个步骤、代码的某个点)。

dubbo 提供了大量扩展点。

  • 协议扩展
  • 调用拦截扩展
  • 引用监听扩展
  • 暴露监听扩展
  • 集群扩展
  • 路由扩展
  • 负载均衡扩展
  • 合并结果扩展
  • 注册中心扩展
  • 监控中心扩展
  • 扩展点加载扩展
  • 动态代理扩展
  • 编译器扩展
  • 消息派发扩展
  • 线程池扩展
  • 序列化扩展
  • 网络传输扩展
  • 消息交换扩展
  • 组网扩展
  • Telnet 命令扩展
  • 容器扩展
  • 页面扩展
  • 缓存扩展
  • 验证扩展
  • 日志适配扩展

改进

dubbo 借鉴 JDK 标准的 SPI(Service Provider Interface)扩展点机制,重新实现了一套 SPI 机制,做了如下改进。

  • JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。

  • 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK 标准的 ScriptEngine,通过 getName() 获取脚本类型的名称,但如果 RubyScriptEngine 因为所依赖的 jruby.jar 不存在,导致 RubyScriptEngine 类加载失败,这个失败原因被吃掉了,和 ruby 对应不起来,当用户执行 ruby 脚本时,会报不支持 ruby,而不是真正失败的原因。

  • 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。

代码结构

ubbo SPI 在 dubbo-commonextension 包实现,如下图所示。

代码结构

注解

@SPI

扩展点接口的标识。

@Adaptive

自适应拓展信息的标记。

可添加方法上,分别代表了两种不同的使用方式。标记在拓展接口的方法上,代表自动生成代码实现该接口的 Adaptive 拓展实现类;标记在上,代表手动实现它是一个拓展接口的 Adaptive 拓展实现类。

一个拓展接口,有且仅有一个 Adaptive 拓展实现类。

@Active

自动激活条件的标记。

说明

bazel - usage

查看所有 Targets

1
$ bazel query //... --output label_kind

修改输出路径

1
2
3
# 仍会在项目创建 bazel-* 软链, 一般无需修改
$ bazel --output_base=output build //... # 修改输出路径
$ bazel --output_user_root=output build //... # 修改输出 & 安装路径

使用 github 依赖

在 WORKSPACE 中指定

1
2
3
4
5
http_archive(
name = "com_google_googleapis",
strip_prefix = "googleapis-8b976f7c6187f7f68956207b9a154bc278e11d7e",
urls = ["https://github.com/googleapis/googleapis/archive/8b976f7c6187f7f68956207b9a154bc278e11d7e.tar.gz"],
)

这里的 8b976f7c6187f7f68956207b9a154bc278e11d7e 为 commit id,下载 url 为 https://github.com/{user}/{repo}/archive/{commit}.tar.gz

BUILD 文件中引用

1
2
3
4
5
6
7
8
9
10
11
12
13
proto_library(
name = "person_proto",
srcs = ["person.proto"],
deps = [
"@com_google_googleapis//google/api:annotations_proto", # 这里引用
"@com_google_googleapis//google/api:client_proto",
"@com_google_googleapis//google/api:field_behavior_proto",
"@com_google_googleapis//google/api:resource_proto",
"@com_google_protobuf//:empty_proto",
"@com_google_protobuf//:field_mask_proto",
"@com_google_protobuf//:any_proto",
],
)

proto 中引用

1
2
3
4
import "google/api/annotations.proto";
import "google/api/client.proto";
import "google/api/field_behavior.proto";
import "google/api/resource.proto";

部署 jar 包

WORKSPACE 中添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
RULES_JVM_EXTERNAL_TAG = "4.0"
RULES_JVM_EXTERNAL_SHA = "31701ad93dbfe544d597dbe62c9a1fdd76d81d8a9150c2bf1ecf928ecdf97169"

http_archive(
name = "rules_jvm_external",
strip_prefix = "rules_jvm_external-%s" % RULES_JVM_EXTERNAL_TAG,
sha256 = RULES_JVM_EXTERNAL_SHA,
url = "https://github.com/bazelbuild/rules_jvm_external/archive/%s.zip" % RULES_JVM_EXTERNAL_TAG,
)

load("@rules_jvm_external//:defs.bzl", "maven_install")

maven_install(
artifacts = [
"junit:junit:4.12",
"androidx.test.espresso:espresso-core:3.1.1",
"org.hamcrest:hamcrest-library:1.3",
],
repositories = [
# Private repositories are supported through HTTP Basic auth
# "http://test:Test1234@localhost:8081/artifactory/maven-repo-demo",
"https://maven.google.com",
"https://repo1.maven.org/maven2",
],
)

load("@rules_jvm_external//:repositories.bzl", "rules_jvm_external_deps")
rules_jvm_external_deps()
load("@rules_jvm_external//:setup.bzl", "rules_jvm_external_setup")
rules_jvm_external_setup()

BUILD 文件添加 exporter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java_gapic_library(
name = "java_cook_gapic",
srcs = [":cook_proto"],
gapic_yaml = "cook_gapic.yaml",
grpc_service_config = "cook_gapic_service_config.json",
deps = [
":java_cook_proto",
],
)

# 定义 exporter
java_export(
name = "java_cook_gapic_export",
maven_coordinates = "pub.wii.cook:cook-v1:0.0.1",
# pom_template = "pom.tmpl", # You can omit this
# srcs = glob(["*.java"]), # 如果是 java 项目, 这里部署的是 gapic 生成的 jar 包, 先注释掉
runtime_deps = [
":java_cook_gapic"
],
)

执行 exporter

1
2
3
4
5
# 部署到本地
$ bazel run --define "maven_repo=file://$HOME/.m2/repository" //cook/v1:java_cook_gapic_export.publish

# 部署到 maven 仓库, 注意这里要用 https 协议, 不支持 http
$ bazel run --define "maven_repo=https://127.0.0.1:8081/artifactory/maven-repo-demo" --define "maven_user=test" --define "maven_password=Test1234" //cook/v1:java_cook_gapic_export.publish

gapis

Introduction

打包

使用 gRPC + protocol buffers 时,会面临三种打包方式,proto librarygrpc librarygapic library 。不同的打包方式需要使用不同的工具/插件,以如下定义为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
syntax = "proto3";

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}

proto library

首先是打包 proto library,需要下载 protoc 程序(在 protobuf 的 release 页面下载)。

1
$ protoc --java_out=jv greeter.proto

这个命令只会生成数据结构,会忽略 service 定义。

gRPC library

然后,是打包 gRPC library,需要下载对应的 generator,工具下载 / 本地编译步骤参考这里,另外 gRPC 相关的库及代码在这里,命令参考如下。

1
$ protoc --plugin=protoc-gen-grpc-java=/Users/wii/Downloads/protoc-gen-grpc-java-1.38.0-osx-x86_64.exe --grpc-java_out=jv --java_out=jv greeter.proto

gRPC library 和 proto library 的区别是,会生成用于服务调用的 client 和 server 端代码。

gapic library

最后,是打包 gapic library,同样需要下载对应的 generator,以 Java 为例,工具参考gapic-generator-javagapic-generator。顺便,artman 作为一个可选的生成工具,已经停止维护。要打包 java 的 gapic library ,首先需要编译插件,从gapic-generator 下载 release 包,运行命令 ./gradlew fatjar 进行编译。

首先,生成 description 文件。

1
$ protoc --include_imports --include_source_info -o greeter.description greeter.proto

接着,生成 proto message classes。

1
2
$ mkdir java-proto-lib-out
$ protoc --java_out=java-proto-lib-out greeter.proto

然后,生成 gRPC stub 。

1
2
$ mkdir java-grpc-lib-out
$ protoc --plugin=protoc-gen-grpc=/Users/wii/Downloads/protoc-gen-grpc-java-1.38.0-osx-x86_64.exe --grpc_out=java-grpc-lib-out greeter.proto

进一步,新建服务描述文件。

1
2
3
4
5
type: google.api.Service
config_version: 3
name: greeter.gapic.cook.pub.wii

title: Example Library API

接着,生成客户端配置文件。

1
2
3
4
5
6
java -XX:-OmitStackTraceInFastThrow -cp /Users/wii/Downloads/gapic-generator-2.11.1/build/libs/gapic-generator-2.11.1-fatjar.jar \
com.google.api.codegen.GeneratorMain GAPIC_CONFIG \
--descriptor_set=greeter.proto.pb \
--service_yaml=greeter-service.yaml \
-o=/Users/wii/Data/tmp/grpc/greeter-gapic-config.yaml
# 输出文件使用绝对路径

然后,创建 package 的 metadata 配置文件。

1
2
3
4
5
6
7
artifact_type: GAPIC
proto_deps:
- google-common-protos
api_name: greeter
api_version: v1
organization_name: solo-kingdom
proto_path: wii/cook/gapic/greeter/v1

最后,生成代码。

java

1
2
3
4
5
6
7
8
java -cp /Users/wii/Downloads/gapic-generator-2.11.1/build/libs/gapic-generator-2.11.1-fatjar.jar \
com.google.api.codegen.GeneratorMain LEGACY_GAPIC_AND_PACKAGE \
--descriptor_set=greeter.proto.pb \
--service_yaml=greeter-service.yaml \
--gapic_yaml=greeter-gapic-config.yaml \
--package_yaml2=greeter-meta-config.yaml \
--language=java \
--o=java-code-gen

python

1
2
3
4
5
6
7
8
java -cp /Users/wii/Downloads/gapic-generator-2.11.1/build/libs/gapic-generator-2.11.1-fatjar.jar \
com.google.api.codegen.GeneratorMain LEGACY_GAPIC_AND_PACKAGE \
--descriptor_set=greeter.proto.pb \
--service_yaml=greeter-service.yaml \
--gapic_yaml=greeter-gapic-config.yaml \
--language=python \
--package_yaml2=greeter-meta-config.yaml \
--o=python-code-gen

顺便,这里提供了一个示例。

最后,在使用 gapic 时,可能需要引用一些 google 的 proto 文件,比如 google/api/annotation.proto ,这些文件定义在这里

Bazel

rules_proto

获取 sha256

1
2
3
4
$ COMMIT_ID=734b8d41d39a903c70132828616f26cb2c7f908c
$ wget https://github.com/stackb/rules_proto/archive/${COMMIT_ID}.tar.gz
$ shasum -a 256 ${COMMIT_ID}.tar.gz
c89348b73f4bc59c0add4074cc0c620a5a2a08338eb4ef207d57eaa8453b82e8

Artman

1
2
# install
$ pip3 install googleapis-artman

配置文件

生成 gapic 库时,指定的配置文件。

1
2
3
4
5
6
7
8
9
10
java_gapic_library(
name = "java_cook_gapic",
srcs = [":cook_proto"],
gapic_yaml = "cook_gapic.yaml", # 这里指定的文件
grpc_service_config = "cook_gapic_service_config.json",
deps = [
":java_cook_proto",
":java_cook_grpc"
],
)

配置参考 auth.proto 内定义的 Authentication 结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// `Authentication` defines the authentication configuration for API methods
// provided by an API service.
//
// Example:
//
// name: calendar.googleapis.com
// authentication:
// providers:
// - id: google_calendar_auth
// jwks_uri: https://www.googleapis.com/oauth2/v1/certs
// issuer: https://securetoken.google.com
// rules:
// - selector: "*"
// requirements:
// provider_id: google_calendar_auth
// - selector: google.calendar.Delegate
// oauth:
// canonical_scopes: https://www.googleapis.com/auth/calendar.read
message Authentication {
// A list of authentication rules that apply to individual API methods.
//
// **NOTE:** All service configuration rules follow "last one wins" order.
repeated AuthenticationRule rules = 3;

// Defines a set of authentication providers that a service supports.
repeated AuthProvider providers = 4;
}

可以看到,有两个字段,rules & providers。对于 rules,数据结构为 AuthenticationRule。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Authentication rules for the service.
//
// By default, if a method has any authentication requirements, every request
// must include a valid credential matching one of the requirements.
// It's an error to include more than one kind of credential in a single
// request.
//
// If a method doesn't have any auth requirements, request credentials will be
// ignored.
message AuthenticationRule {
// Selects the methods to which this rule applies.
//
// Refer to [selector][google.api.DocumentationRule.selector] for syntax details.
string selector = 1;

// The requirements for OAuth credentials.
OAuthRequirements oauth = 2;

// If true, the service accepts API keys without any other credential.
// This flag only applies to HTTP and gRPC requests.
bool allow_without_credential = 5;

// Requirements for additional authentication providers.
repeated AuthRequirement requirements = 7;
}

示例如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type: google.api.Service
config_version: 3
name: analyticsadmin.googleapis.com
title: Google Analytics Admin API

# 横线开头定义数组
apis:
- name: google.analytics.admin.v1alpha.AnalyticsAdminService

authentication:
rules:
- selector: 'google.analytics.admin.v1alpha.AnalyticsAdminService.*'
oauth:
canonical_scopes: |-
https://www.googleapis.com/auth/analytics.edit
- selector: '*'
allow_without_credential: true # 无需认证

Reference

gRPC

Introduction

gRPC 是谷歌开源的一款 RPC 框架,protocal buffers 作为它的 IDL(Interface Definition Language,接口定义语言),也可以作为其底层数据交换格式。

Overview

在 gRPC,一个客户端应用可以直接调用不同机器上服务端的方法。在一些 RPC 系统中,gRPC 是基于以下思路,定义一个服务接口,指定可以远程调用的方法,机器参数和返回类型;在服务端,实现这个接口,并且运行一个 gRPC 服务,来处理客户端的请求;在客户端,保存一个存根(stub,在一些语言中简称为客户端)提供和服务端相同的方法。

Concept Diagram

Protocol Buffers

默认情况下,gRPC 使用 Protocol Buffers (Protobuffer,PB,谷歌开源的且成熟的序列化结构化数据的机制/工具)。使用 PB ,首先需要在 proto 文件中定义一个需要序列化的结构。

1
2
3
4
5
message Person {
string name = 1;
int32 id = 2;
bool has_ponycopter = 3;
}

如果需要定义一个 Service。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// file: greeter.proto

syntax = "proto3";
// The greeter service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

如下命令。

1
$ protoc --java_out=jv greeter.proto

并不会生成用于 RPC 的 client 和 server,需要使用如下命令。

1
$ protoc --java_out=jv --grpc-java_out=jv greeter.proto

在使用之前,需要安装工具 protoc-gen-grpc-java,工具下载 / 本地编译步骤参考这里。至于下载,可以移步 这里,选择一个版本并点击链接,在介绍表格部分有 Files 字段,点击 View all ,现在指定平台的可执行文件即可,然后运行如下命令。

1
$ protoc --plugin=protoc-gen-grpc-java=/Users/wii/Downloads/protoc-gen-grpc-java-1.38.0-osx-x86_64.exe --grpc-java_out=jv --java_out=jv greeter.proto 

注意 指定 plugin 时,使用决定路径。

查看生成结果。

1
2
$ ls jv
GreeterGrpc.java GreeterOuterClass.java

Core Concepts

服务定义

gRPC 默认使用 Protocol Buffers 作为服务定义语言,如果需要也可以使用其他可选项。

1
2
3
4
5
6
7
8
9
10
11
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}

gRPC 允许定义多种类型的服务方法。

  • 一元 RPCs(Unary RPCs),客户端发送一次请求,得到一次响应,和通常的方法调用类似

    1
    rpc SayHello(HelloRequest) returns (HelloResponse);
  • 服务端流 RPCs(Server streaming RPCs),客户端向服务端发送一次请求,得到一个流来读取一系列的消息。客户端从返回的流中读取数据,直到没有更多信息。gRPC 保证一次独立调用中消息的顺序

    1
    rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
  • 客户端流 RPCs(Client streaming RPCs ),客户端使用流,向服务端发送一系列的消息。客户端结束写入后,等待服务端读取并返回一个响应。同样,gRPC 保证一次独立调用中消息的顺序

    1
    rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
  • 双向流 RPCs(Bidirectional streaming RPCs),客户端和服务端通过读写流发送消息序列,两个流操作独立,所以客户端和服务端可以以任意顺序读写。比如,服务端可以等待接受到所有消息后返回响应消息序列;也可以每接受一个消息,返回一个响应消息。

    1
    rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

使用 API

首先,在 .proto 文件中定义服务(service),gRPC 提供 PB(protocol buffer)编译插件,来生成客户端和服务端代码。gRPC 用户通常在客户端调用这些API,在服务端实现这些 API。

  • 在服务端,实现服务接口定义的方法,并且运行一个 gRPC 服务来处理客户端的请求。gRPC 基础设施解码进来的请求,执行服务方法,并编码服务响应
  • 在客户端,有一个本地对象,称作 stub(对于一些语言,术语为 client),其实现和服务端相同的方法。client 可以在本地对象上调用这些方法,使用合适的 pb 消息类型包装调用的参数,gRPC接着传送请求给服务端,并且返回服务端的 pb 格式消息响应。

同步 vs 异步

gRPC 在大部分语言中,具备异步和同步风格的接口。

RPC 声明周期

Unary RPC

  • 当客户端调用 stub 的方法时,服务端会被告知这次调用的 metadata,包括方法的名称,请求的超时时间等
  • 服务端可以马上返回自己的初始 metadata(在响应前必须传送),也可以等待客户端的请求消息,具体的行为由应用程序指定
  • 一但服务器得到了完整的客户端请求消息,会做对应的工作并构造一个响应消息。接着返回响应,及详细的状态(status)和可选的尾部 metadata。
  • 如果返回状态是 OK,客户端会得到这个响应,这就完成了客户端的调用。

Server streaming RPC

server streaming RPC 和 unary RPC 类似,不同的是服务端在响应中返回一个消息流(a stream of message)。在所有的消息发送完后,详细的状态码和可选的尾部 metadata 会被传送给客户端。

Client streaming RPC

服务端返回单个消息的实际,通常是接受到所有消息后,但并非必须如此。

Bidirectional streaming RPC

双向流RPC,服务端接受到的客户端的 metadata、请求方法、deadline。

Deadlines / Timeouts

gRPC 允许客户端指定超时时间。服务端可以知道一个指定 RPC 调用是否超时,或者剩余处理时间。

终止 RPC

可能存在服务端成功返回,但是客户端请求超时的情况。也有可能,服务端在客户端还在发送请求时来结束本次RPC。

取消 RPC

服务端和客户端都可以在任意时间取消一次 RPC 请求。

Metadata

Metadata 包含一次特定 RPC 请求的信息,数据格式是 k-v 对的形式,key为string,value可以是string,也有可能是二进制数据。

Channels

一个 gRPC 通道提供一个和指定host和port的 gRPC 服务器的连接,当创建一个客户端 stub 时会用到。客户端可以指定通道参数,来修改 gPRC 的默认行为,比如控制消息压缩开关。一个 channel 包含状态,包括 connectedidle

gRPC 怎么处理关闭 channel 的流程,语言间有所不同。

Appendix

thrift VS protocol buffer

  • pb 生成 service 的 client 和 server 代码,需要额外的插件

负载均衡

关于 gRPC 的负载均衡,这里 列举了主要的几种方式,胖客户端、Lookaside LB、Proxy。

参考