linux commands misc

netstat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 参数
-a 列出所有连接
-t TCP连接
-u UDP连接
-t 禁用反向DNS查找,提高输出速度
-l 只列出正在监听的端口
-p 列出PID和程序名称
-e 列出程序所属用户
-s 打印网络数据(接受、发送包统计等数据)
-r 打印路由信息
-i 打印网络接口信息
-c 持续打印网络信息

# 常用
$ netstat -ct # 获取持续输出
$ netstat -atnp # 获取所有活动的TCP连接
$ netstat -ie # 打印用户友好的网络接口信息

watch

1
2
3
4
5
6
7
8
9
# 参数
-n 设置间隔时间
-d 高亮显示变化区域
-t 关闭顶部的时间间隔、命令、当前时间信息

# 示例
$ watch -n 1 -d netstat -ant # 观察每秒网络连接变化
$ watch -n 1 -d 'pstree|grep http' # 观察每秒http链接的变化
$ watch -n 10 'cat /proc/loadavg' # 每10秒输出一次系统平均负载

awk

1
2
# 示例
$ awk '{ print $1 }' # 打印首列

sed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 格式
$ sed [-Ealn] [-e command] [-f command_file] [-i extension] [file ...]
$ sed [-Ealn] command [file ...]
# command
[address[,address]]function[arguments]
# 示例
$ 1,20s/old/new/g
# 参数
-n slilent模式,是输出处理行
-e 通过命令行参数附加编辑操作
-i inplace 修改文件
-f 指定sed命令文件
# funciont
a 新增(后)
i 插入(前)
c 替换
d 删除
p 打印选择数据
s 取代
# 匹配
## () 匹配模式,\1, \2 使用模式值
$ echo "http://localhost:8080/uri/path?p=v" | sed -e 's/^\([^:]*\):\/\/\([^:]*\):\([0-9]*\)\(.*\)$/protocol=[\1] host=[\2] port=[\3] pathAndParams=[\4]/g'
protocol=[http] host=[localhost] port=[8080] pathAndParams=[/uri/path?p=v]

# 示例
$ 1,20s/old/new/g 替换1~20行内的old为new
$ 2,5d 删除2~5行
$ 3,$d 删除第三行至结尾数据

cut

1
2
3
4
# 示例
$ cut -d ' ' -f3,5 # 打印第3,5列
$ echo "localhost:8080" | cut -d ':' -f1 # 提取host
localhost

tr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# tr: translate characters, 转换和删除字符
# 格式
tr [-Ccsu] string1 string2 # 替换 string1 中字符为 string2 中位置对应的字符
tr [-Ccu] -d string1
tr [-Ccu] -s string1
tr [-Ccu] -ds string1 string2
# 参数
-d 删除指令字符
-c 反选指令字符串
-C 类似-c,反选指令集中字符

# class
[:class:] Represents all characters belonging to the defined character class. alnum <alphanumeric characters>
alpha <alphabetic characters>
blank <whitespace characters>
cntrl <control characters>
digit <numeric characters>
graph <graphic characters>
ideogram <ideographic characters>
lower <lower-case alphabetic characters>
phonogram <phonographic characters>
print <printable characters>
punct <punctuation characters>
rune <valid characters>
space <space characters>
special <special characters>
upper <upper-case characters>
xdigit <hexadecimal characters>

# 示例
$ echo "What a cute dog" | tr a-z A-Z
WHAT A CUTE DOG
$ $ echo "What a cute dog" | tr [:lower:] [:upper:]
WHAT A CUTE DOG

sort

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 排序
# 格式
sort [-bcCdfghiRMmnrsuVz] [-k field1[,field2]] [-S memsize] [-T dir] [-t char] [-o output] [file ...]

# 参数
-u 删除重复key
-s 稳定排序
-b 忽略开头空白符
-d 字典序输出
-i 忽略不可打印字符
-R 乱序输出
-n 数字排序
-t 指定分隔符
-k 指定排序字段

# 示例
$ cat t2
10.0.0.1:8080
10.0.0.2:8080
10.0.0.1:8090
10.0.0.3:8070
10.0.0.1:8060
$ cat t2 | sort -t ':' -k 1
10.0.0.1:8060
10.0.0.1:8080
10.0.0.1:8090
10.0.0.2:8080
10.0.0.3:8070
$ cat t2 | sort -t ':' -k 2
10.0.0.1:8060
10.0.0.3:8070
10.0.0.1:8080
10.0.0.2:8080
10.0.0.1:8090

uniq

1
# 删除重复行,一般与sort结合使用

date

1
2
3
4
5
6
7
8
9
10
# format
date +"%Y%m%d"
# 分钟
date +"%M"

# minus
-d "-1 days"

# 前一台日期
date -d "-1 days" +"%Y-%m-%d"

json

1
2
3
4
5
6
7
8
# 美化 json 字符串
$ echo '{"data":{"name":"wii","age":18}}' | python -m json.tool
{
"data": {
"age": 18,
"name": "wii"
}
}

time

1
2
3
4
5
6
# 统计程序运行时间
time <program args>
...
real 0m0.003s
user 0m0.001s
sys 0m0.002s

python coroutines

协程

由程序负责任务切换,可以减少线程/进程上下文切换的消耗。用户态实现任务切换,无需进入内核态。

用途

虽然 Python 有多线程的概念,但是由于 GIL 的存在,并不能利用多核资源。如果易不能充分利用单进程资源,可能会带来严重的性能问题。

相关

EventLoop

python 默认只为主线程创建 loop。如下 tornado 代码实现了自动为创建 loop 的功能,使用 asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) 来生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
if sys.platform == "win32" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
# "Any thread" and "selector" should be orthogonal, but there's not a clean
# interface for composing policies so pick the right base.
_BasePolicy = asyncio.WindowsSelectorEventLoopPolicy # type: ignore
else:
_BasePolicy = asyncio.DefaultEventLoopPolicy


class AnyThreadEventLoopPolicy(_BasePolicy): # type: ignore
"""Event loop policy that allows loop creation on any thread.

The default `asyncio` event loop policy only automatically creates
event loops in the main threads. Other threads must create event
loops explicitly or `asyncio.get_event_loop` (and therefore
`.IOLoop.current`) will fail. Installing this policy allows event
loops to be created automatically on any thread, matching the
behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2).

Usage::

asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())

.. versionadded:: 5.0

"""

def get_event_loop(self) -> asyncio.AbstractEventLoop:
try:
return super().get_event_loop()
except (RuntimeError, AssertionError):
# This was an AssertionError in Python 3.4.2 (which ships with Debian Jessie)
# and changed to a RuntimeError in 3.4.3.
# "There is no current event loop in thread %r"
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop

示例

定时器

下面是使用协程实现的定时器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# coding: utf-8
import asyncio
import threading
import time
from datetime import datetime
from typing import Callable


class Scheduler:
cache: set[str] = set()

@classmethod
async def _do_schedule(cls, name: str, delay: int, interval: int, cb: Callable, args, kwargs):
await asyncio.sleep(delay)
while name in cls.cache:
try:
cb(*args, **kwargs)
except Exception as e:
print('execute target failed, e=', e)
await asyncio.sleep(interval)

@classmethod
def _schedule_wrapper(cls, name: str, delay: int, interval: int, cb: Callable, args, kwargs):
asyncio.run(cls._do_schedule(name, delay, interval, cb, args, kwargs))

@classmethod
def schedule(cls, name: str, delay: int, interval: int, cb: Callable, *args, **kwargs):
assert name not in cls.cache, 'duplicate scheduler with name ' + name
threading.Thread(target=cls._schedule_wrapper,
args=(name, delay, interval, cb, args, kwargs),
daemon=True).start()

cls.cache.add(name)

@classmethod
def stop(cls, name: str):
if name in cls.cache:
cls.cache.remove(name)


def cbk(a, b, c):
print('execute at', datetime.now(), 'with args:', (a, b, c))


if __name__ == '__main__':
Scheduler.schedule('first', 1, 1, cbk, 'a', 'b', c='c')
Scheduler.schedule('second', 1, 1, cbk, 'd', 'e', c='f')
time.sleep(3)
Scheduler.stop('first')
try:
while True:
pass
except KeyboardInterrupt:
pass

异常

loop argument must agree with Future

下看下抛出异常的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
if loop is None:
loop = events.get_event_loop()
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
elif compat.PY35 and inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
else:
raise TypeError('A Future, a coroutine or an awaitable is required')

参考

curator x discovery

概念

  • ServiceDiscovery ,创建 ServiceProvider 对象,首先需要有 ServiceDiscovery ;所有请求直接访问 zk
  • ServiceProvider, 特定服务发现的封装,并集成了负载均衡策略;集成了 ServiceCache ,有节点监听和缓存
  • ServiceCache ,会在本地内存缓存,并使用 watcher 来保持数据最新

说明

ServiceDiscoveryServiceProvider 需要调用 start 方法后可用。

注册

  • 使用 ServiceDiscoveryregisterService 注册服务后,只要 ServiceDiscoverystop ,会一直保持节点注册
  • 服务被强制 stop ,没有及时调用 unregisterService 接口来取消注册,zk 节点会保存一段时间(几秒),然后由 zk 摘除

查询

  • ServiceProvider 的接口,会实时调用 zk 查询数据,

监听

ServiceCacheListener 有两个方法。

  • cacheChanged 当服务节点变化时,会调用该方法
  • stateChanged 当 zk 连接状态变化时,会调用该方法

grpc java

Channel

gRPC 代码实现中,Channel 是一个虚拟类,是物理连接的逻辑概念。ManagedChannelImpl 和 ManagedChannelImpl2 继承了该类,并实现了 newCall 和 authority 接口。

SubChannel

SubChannel 是 LoadBalancer 的内部类,在了解 SubChannel 之前,需要先了解 SocketAddress 和 EquivalentAddressGroup。

SocketAddress

SocketAddress 是一个虚拟类,代表一个不包含协议信息的 Socket 地址。

EquivalentAddressGroup

EquivalentAddressGroup 是一组 SocketAddress,在 Channel 创建连接时,其包含的 SocketAddress 被视为无差异的。

SubChannel

再回到 SubChannel,他表示和一个服务器或者 EquivalentAddressGroup 表示的一组等价服务器的 一个物理连接,这里需要注意的是,他至多有一个物理连接。在发起新的 RPCs 时,如果没有激活的 transport,在被安排 RPC 请求时,会创建 transport。调用 requestConnection ,会请求 Subchannel 在没有 transport 的情况下创建 transport。

SubChannel 有一个 List<EquivalentAddressGroup> 属性,可以通过 setAddresses(List<EquivalentAddressGroup> addrs)setAddresses(EquivalentAddressGroup addrs) 设置。

InternalSubchannel

InternalSubchannel 表示一个 SocketAddress 的 Transports 。他实现了 TransportProvider 接口,定义了 obtainActiveTransport 方法,该方法如果没有激活的 transports,则会调用 startNewTransport 进行创建。

获取 SocketAddress

在创建 transports 时,需要先获取 SocketAddress。在创建 InternalSubChannel 时,会传入 List<EquivalentAddressGroup>需要注意的是,InternalSubChannel 默认使用 第一个 EquivalentAddressGroup 的 第一个 SocketAddress ,只有在 transport 被关闭时,才会尝试下一个服务地址。

尝试完所有的地址,全部失败后,此时 InternalSubChannel 处于 TRANSIENT_FAILURE 状态,等待一个 delay 时间后,重新尝试。

NameResolver

NameResolver 是一个可插拔的组件(pluggable component),代码层面是一个接口,用来解析一个 target(URI),并返回给调用方一个地址列表,gRPC 内置了 DnsNameResolver。

地址的返回是基于 Listener 机制,NameResolver 的实现类,需要定义 start 方法,方法会传入一个 Listener,当服务列表发生变化时,调用 Listener 的 onResult 方法,通知 NameResolver 的持有方。

LoadBalancer

LoadBalancer 是一个可插拔的组件,接受 NameResolver 拿到的服务方列表,提供可用的 SubChannel。

RoundRobinLoadBalancer

从 RoundRobinLoadBalancer 的 handleResolvedAddresses 实现可以发现。

  • 每次刷新时

    • 对新增服务方创建 SubChannel
    • 对于删掉的服务方进行剔除
    • 对于可用的服务方,不会重新创建 SubChannel

ManagedChannelImpl

+Channel

Channel 去执行一次远程调用,是通过 newCall 方法,传入 **MethodDescriptor ** 和 CallOptions。对于 ManagedChannelImpl,其实现 Channel 接口,newCall 方法转而调用 InterceptorChannel 的 newCall,先简单说下 InterceptorChannel。

managedChannelImpl 字段是调用 ClientInterceptors.intercept(channel, interceptors) 构造,先说 InterceptorChannel 再说 ClientInterceptors。

InterceptorChannel 将 Interceptor 和 Channel 的结合,由 channel + interceptor 构造,调用 channel 的 newCall 时,会执行 interceptor 的 interceptCall,该调用会传入 channel。对于一个原始 channel 和 多个 interceptor,先将 interceptor 倒序,然后依次创建 InterceptorChannel,进行包装。

1
2
3
for (ClientInterceptor interceptor : interceptors) {
channel = new InterceptorChannel(channel, interceptor);
}

相比之下,ClientInterceptors 只是一个工具类。

接着,怎么用上 NameSolver 的。在构造 interceptorChannel 时,传入一个 channel。这个channel 是一个 RealChannel 对象。

RealChannel 实现了 Channel 接口。

+SubChannel

这里需要再提一下,Channel 是逻辑连接,SubChannel 是物理连接。ManagedChannelImpl 实现了 Channel 接口,同时,有一个内部类 SubchannelImpl 实现 SubChannel。

创建物理连接

首先调用 SubchannelImpl 的 requestConnection ,在方法内会调用 InternalSubchannelobtainActiveTransport 创建和 SocketAddress 的连接。

+NameResolver

在 ManagedChannelImpl 内,定义了 NameResolverListener,实现了NameResolver.Listener 接口,在 NameResolverListener 内做了 LoadBalancer 和 NameResolver 的结合。

NameResolver + LoadBalancer

在 NameResolverListener 的 onResult 方法内,当服务器地址变更时会执行改方法。首先,会从参数中获取服务端列 表List<EquivalentAddressGroup>,接下来调用 AutoConfiguredLoadBalancer 的 tryHandleResolvedAddresses 方法,再调用 LoadBalancer 的 handleResolvedAddresses。整个调用实现比较绕,直接了解内置的 LB 即可。

yEd

自适应大小

动态调整node至label大小

File > Preferences > Editor > Dynamically Adjust Node Size to Label Size

新建节点时,node 大小根据 label 大小动态调整。

不启用效果

启用效果

调整格式

调整node适应label

Tools > Fit Node To Label

假设,有如下 node。

调整后效果如下。

rsync

安装

1
$ brew install rsync

参数

1
2
P	等价于 --partial --progress
a archive 模式

同步文件夹

1
2
3
4
5
$ rsync -Pav <local> <user@ip:remote-dist>

# 使用特定 id_rsa
$ rsync -Pav -e "ssh -i ~/.ssh/id_rsa_sensors" <local> <user@ip:remote-dist>

实时同步

使用工具 fswatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 安装依赖
$ brew install fswatch

# watch 文件变动
$ fswatch . | xargs -n1 -I{} <do-something>

# 定义同步方法
function dosyn() {
if [ ! -e .RSYNCING ]; then
touch .RSYNCING
echo "begin to sync"
rsync -Pav -e "ssh -i ~/.ssh/<secret-id>" <local> <user@ip:remote-dist> # 修改这里
echo "rsync done at $(date), sleep 30 seconds"
sleep 30
rm .RSYNCING
echo "sleep done at $(date)"
else
echo "syncing OR sleeping ..."
fi
}

[ -e .RSYNCING ] && rm .RSYNCING
export -f dosyn
fswatch -e .RSYNCING -ro . | xargs -P0 -n1 -I{} bash -c 'dosyn'

dubbo

模块

/dev-guide/images/dubbo-modules.jpg

  • dubbo-common : Util 类和通用模型
  • dubbo-remoting : 远程通讯模块,提供通用的客户端和服务端的通讯能力;相当于 dubbo 协议的实现,如果 RPC 使用 RMI 协议则不需要此包;最小原则,可以只看 dubbo-remoting-api + dubbo-remoting-netty4dubbo-remoting-zookeeper
  • dubbo-rpc : 远程调用模块,抽象各种协议,以及动态代理,只包含一对一调用,不关心集群管理;从包图可以看出,dubbo-rpc 是 dubbo 的中心
    • dubbo-rpc-api : 抽象各种协议以及动态代理,实现一对一调用
  • dubbo-cluster : 集群模块,提供多个服务方伪装为一个提供方能力,包括:负载均衡、容错、路由等,集群的地址列表可以静态配置,也可以由注册中心下发
    • 容错:Cluster 接口 + cluster.support 包;Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker(Java 动态代理),对上层透明;伪装过程中包含了容错逻辑
    • 目录(directory):Directory 接口 + cluster.directory 包;Directory 代表了多个 Invoker,可以看做动态变化的 Invoker List,每个 Invoker 对应一个服务端节点
    • 路由(router):Router 接口 + cluster.router 包;负责从多个 Invoker 中按路由规则选出子集,用于调用
    • 配置:Configurator 接口 + cluster.configurator
    • 负载均衡(loadbalance):LoadBalance 接口和 cluster.loadbalance 包;负责从多个 Invoker 中根据负载均衡算法,选出具体的一个用于本次调用
    • 合并结果(merger):Merger 接口 + cluster.merger 包;合并返回结果,用于分组聚合

集群容错

  • dubbo-registry : 注册中心模块,基于注册中心下发地址的集群方式,以及对各种注册中心的抽象
  • dubbo-monitor : 监控模块,统计服务调用次数,调用时间,调用链路跟踪的服务
  • dubbo-config : 配置模块,是 dubbo 对外的 API,用户通过 Config 使用 dubbo,隐藏 dubbo 的所有细节
  • dubbo-container : 容器模块,是一个 Standlone 容器,以简单的 Main 加载 Spring 启动(用于代替容器启动),因为服务通常不需要 Tomcat、JBoss 等Web 容器的特性,没必要用 Web 容器取加载服务
  • dubbo-filter过滤器模块,提供了内置过滤器;
    • doubbo-filter-cache :缓存过滤器
    • dubbo-filter-validation :参数验证过滤器
  • dubbo-plugin :插件模块,提供内置的插件
    • dubbo-qos :提供在线运维命令

BOM

BOM(Bill Of Materials),为了防止用 Maven 管理 Spring 项目时,不同的项目依赖了不同版本的 Spring,可以使用 Maven BOM 来解决这一问题。

doubbo-dependencies-bom

dubbo-dependencies-bom/pom.xml统一定义了 Dubbo 依赖的三方库的版本号。

dubbo-dependencies-bom 文件dubbo-parent ,会引入该 BOM。

引入 dubbo-dependencies-bom 文件

dubbo-bom

dubbo-bom/pom.xml统一定义了 Dubbo 的版本号。

dubbo-bom 文件

dubbo-demodubbo-test 会引入该 BOM 。以 dubbo-demo 为例。

引入 dubbo-bom 文件

dubbo-parent

Dubbo 的 Maven 模块,都会引入该 pom 文件。以 dubbo-cluster 为例。

引入 dubbo-parent 文件

dubbo-all

dubbo/all/pom.xml ,定义了 dubbo 的打包脚本。我们在使用 dubbo 库时,引入该 pom 文件。

引入关系

流程

/dev-guide/images/dubbo-framework.jpg

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

各层说明

  • config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类;由 dubbo-config 模块实现
  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
  • protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
  • serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

关系说明

  • 在 RPC 中,Protocol 是核心层,只要 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上添加 Filter 拦截点
  • Cluster 是外围概念,所以 Cluster 的目的是将多个 Invoker 伪装成一个 Invoker,这样其它人只要关注 Protocol 层 Invoker 即可
  • Proxy 层封装了所有接口的透明化代理,而在其它层都以 Invoker 为中心,只有到了暴露给用户使用时,才用 Proxy 将 Invoker 转成接口,或将接口实现转成 Invoker

依赖关系

/dev-guide/images/dubbo-relation.jpg

  • 图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互。
  • 图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点。
  • 图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用。
  • 图中只包含 RPC 的层,不包含 Remoting 的层,Remoting 整体都隐含在 Protocol 中。

调用链

/dev-guide/images/dubbo-extension.jpg

  • 垂直分层如下:
    • 下方 淡蓝背景( Consumer ):服务消费方使用的接口
    • 上方 淡绿色背景( Provider ):服务提供方使用的接口
    • 中间 粉色背景( Remoting ):通信部分的接口
  • 自 LoadBalance 向上,每一行分成了多个相同的 Interface ,指的是负载均衡后,向 Provider 发起调用。
  • 左边 括号 部分,代表了垂直部分更细化的分层,依次是:Common、Remoting、RPC、Interface 。
  • 右边 蓝色虚线( Init ) 为初始化过程,通过对应的组件进行初始化。例如,ProxyFactory 初始化出 Proxy 。

暴露服务时序

展开总设计图左边服务提供方暴露服务的蓝色初始化链,时序图如下。

/dev-guide/images/dubbo-export.jpg

引用服务时序

展开总设计图右边服务消费方引用服务的蓝色初始化链,时序图如下。

/dev-guide/images/dubbo-refer.jpg

领域模型

Invoker

Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

满眼都是 Invoker

类图。

Invoker 子类

Invocation

Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。

Invocation 子类

Result

Result 是会话域,它持有调用过程中返回值,异常等。

Invocation 子类

Filter

过滤器接口,和我们平时理解的 javax.servlet.Filter 基本一致。

Filter 子类

ProxyFactory

代理工厂接口。服务消费着引用服务的 主过程 如下图。

服务消费着引用服务的主过程

从图中我们可以看出,方法的 invoker 参数,通过 Protocol 将 Service接口 创建出 Invoker 。通过创建 Service 的 Proxy ,实现我们在业务代理调用 Service 的方法时,透明的内部转换成调用 Invoker 的 #invoke(Invocation) 方法。

服务提供者暴露服务的 主过程 如下图。

服务提供者暴露服务的主过程

从图中我们可以看出,该方法创建的 Invoker ,下一步会提交给 Protocol ,从 Invoker 转换到 Exporter 。

类图如下。

ProxyFactory 子类

从图中,我们可以看出 Dubbo 支持 Javassist 和 JDK Proxy 两种方式生成代理。

Protocol

Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口。它负责 Invoker 的生命周期管理。

Dubbo 处理服务暴露的关键就在 Invoker 转换到 Exporter 的过程。下面我们以 Dubbo 和 RMI 这两种典型协议的实现来进行说明。

  • Dubbo 的实现
    Dubbo 协议的 Invoker 转为 Exporter 发生在 DubboProtocol 类的 export 方法,它主要是打开 socket 侦听服务,并接收客户端发来的各种请求,通讯细节由 Dubbo 自己实现。
  • RMI 的实现
    RMI 协议的 Invoker 转为 Exporter 发生在 RmiProtocol 类的 export 方法,它通过 Spring 或 Dubbo 或 JDK 来实现 RMI 服务,通讯细节这一块由 JDK 底层来实现,这就省了不少工作量。

Protocol 子类

Exporter

Exporter ,Invoker 暴露服务在 Protocol 上的对象。

Exporter 子类

InvokerListener

Invoker 监听器。

InvokerListener 子类

ExporterListener

Exporter 监听器。

ExporterListener 子类

配置

dubbo 支持多种配置方式:

  • API 配置
  • 属性配置
  • XML 配置
  • 注解配置

所有配置项分为三大类。

  • 服务发现:表示该配置项用于服务的注册与发现,目的是让消费方找到提供方。
  • 服务治理:表示该配置项用于治理服务间的关系,或为开发测试提供便利条件。
  • 性能调优:表示该配置项用于调优性能,不同的选项对性能会产生影响。

所有配置最终都将转换为 Dubbo URL 表示,并由服务提供方生成,经注册中心传递给消费方,各属性对应 URL 的参数,参见配置项一览表中的对应URL参数列。

首先看下 dubbo-config-api 项目结构。

dubbo-config-api 项目结构

整理下配制间的关系。

配置之间的关系

配置可分为四部分。

  • application-shared

  • provider-side

  • consumer-side

  • sub-config

配置类关系如下。

配置类关系

扩展

做框架应该考虑的一些问题。

  • 避免不必要的引用
  • 依赖接口,而不是具体的实现

扩展点

系统通过扩展机制,来横向扩展系统功能,只要符合对应扩展规范,扩展的过程无需修改系统代码。所谓扩展点,是系统定义的,可以支持扩展的点(流程的某个步骤、代码的某个点)。

dubbo 提供了大量扩展点。

  • 协议扩展
  • 调用拦截扩展
  • 引用监听扩展
  • 暴露监听扩展
  • 集群扩展
  • 路由扩展
  • 负载均衡扩展
  • 合并结果扩展
  • 注册中心扩展
  • 监控中心扩展
  • 扩展点加载扩展
  • 动态代理扩展
  • 编译器扩展
  • 消息派发扩展
  • 线程池扩展
  • 序列化扩展
  • 网络传输扩展
  • 消息交换扩展
  • 组网扩展
  • Telnet 命令扩展
  • 容器扩展
  • 页面扩展
  • 缓存扩展
  • 验证扩展
  • 日志适配扩展

改进

dubbo 借鉴 JDK 标准的 SPI(Service Provider Interface)扩展点机制,重新实现了一套 SPI 机制,做了如下改进。

  • JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。

  • 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK 标准的 ScriptEngine,通过 getName() 获取脚本类型的名称,但如果 RubyScriptEngine 因为所依赖的 jruby.jar 不存在,导致 RubyScriptEngine 类加载失败,这个失败原因被吃掉了,和 ruby 对应不起来,当用户执行 ruby 脚本时,会报不支持 ruby,而不是真正失败的原因。

  • 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。

代码结构

ubbo SPI 在 dubbo-commonextension 包实现,如下图所示。

代码结构

注解

@SPI

扩展点接口的标识。

@Adaptive

自适应拓展信息的标记。

可添加方法上,分别代表了两种不同的使用方式。标记在拓展接口的方法上,代表自动生成代码实现该接口的 Adaptive 拓展实现类;标记在上,代表手动实现它是一个拓展接口的 Adaptive 拓展实现类。

一个拓展接口,有且仅有一个 Adaptive 拓展实现类。

@Active

自动激活条件的标记。

说明

bazel - usage

查看所有 Targets

1
$ bazel query //... --output label_kind

修改输出路径

1
2
3
# 仍会在项目创建 bazel-* 软链, 一般无需修改
$ bazel --output_base=output build //... # 修改输出路径
$ bazel --output_user_root=output build //... # 修改输出 & 安装路径

使用 github 依赖

在 WORKSPACE 中指定

1
2
3
4
5
http_archive(
name = "com_google_googleapis",
strip_prefix = "googleapis-8b976f7c6187f7f68956207b9a154bc278e11d7e",
urls = ["https://github.com/googleapis/googleapis/archive/8b976f7c6187f7f68956207b9a154bc278e11d7e.tar.gz"],
)

这里的 8b976f7c6187f7f68956207b9a154bc278e11d7e 为 commit id,下载 url 为 https://github.com/{user}/{repo}/archive/{commit}.tar.gz

BUILD 文件中引用

1
2
3
4
5
6
7
8
9
10
11
12
13
proto_library(
name = "person_proto",
srcs = ["person.proto"],
deps = [
"@com_google_googleapis//google/api:annotations_proto", # 这里引用
"@com_google_googleapis//google/api:client_proto",
"@com_google_googleapis//google/api:field_behavior_proto",
"@com_google_googleapis//google/api:resource_proto",
"@com_google_protobuf//:empty_proto",
"@com_google_protobuf//:field_mask_proto",
"@com_google_protobuf//:any_proto",
],
)

proto 中引用

1
2
3
4
import "google/api/annotations.proto";
import "google/api/client.proto";
import "google/api/field_behavior.proto";
import "google/api/resource.proto";

部署 jar 包

WORKSPACE 中添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
RULES_JVM_EXTERNAL_TAG = "4.0"
RULES_JVM_EXTERNAL_SHA = "31701ad93dbfe544d597dbe62c9a1fdd76d81d8a9150c2bf1ecf928ecdf97169"

http_archive(
name = "rules_jvm_external",
strip_prefix = "rules_jvm_external-%s" % RULES_JVM_EXTERNAL_TAG,
sha256 = RULES_JVM_EXTERNAL_SHA,
url = "https://github.com/bazelbuild/rules_jvm_external/archive/%s.zip" % RULES_JVM_EXTERNAL_TAG,
)

load("@rules_jvm_external//:defs.bzl", "maven_install")

maven_install(
artifacts = [
"junit:junit:4.12",
"androidx.test.espresso:espresso-core:3.1.1",
"org.hamcrest:hamcrest-library:1.3",
],
repositories = [
# Private repositories are supported through HTTP Basic auth
# "http://test:Test1234@localhost:8081/artifactory/maven-repo-demo",
"https://maven.google.com",
"https://repo1.maven.org/maven2",
],
)

load("@rules_jvm_external//:repositories.bzl", "rules_jvm_external_deps")
rules_jvm_external_deps()
load("@rules_jvm_external//:setup.bzl", "rules_jvm_external_setup")
rules_jvm_external_setup()

BUILD 文件添加 exporter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java_gapic_library(
name = "java_cook_gapic",
srcs = [":cook_proto"],
gapic_yaml = "cook_gapic.yaml",
grpc_service_config = "cook_gapic_service_config.json",
deps = [
":java_cook_proto",
],
)

# 定义 exporter
java_export(
name = "java_cook_gapic_export",
maven_coordinates = "pub.wii.cook:cook-v1:0.0.1",
# pom_template = "pom.tmpl", # You can omit this
# srcs = glob(["*.java"]), # 如果是 java 项目, 这里部署的是 gapic 生成的 jar 包, 先注释掉
runtime_deps = [
":java_cook_gapic"
],
)

执行 exporter

1
2
3
4
5
# 部署到本地
$ bazel run --define "maven_repo=file://$HOME/.m2/repository" //cook/v1:java_cook_gapic_export.publish

# 部署到 maven 仓库, 注意这里要用 https 协议, 不支持 http
$ bazel run --define "maven_repo=https://127.0.0.1:8081/artifactory/maven-repo-demo" --define "maven_user=test" --define "maven_password=Test1234" //cook/v1:java_cook_gapic_export.publish