bazel - deps

google-apis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
http_archive(
name = "com_google_googleapis",
strip_prefix = "googleapis-8b976f7c6187f7f68956207b9a154bc278e11d7e",
urls = ["https://github.com/googleapis/googleapis/archive/8b976f7c6187f7f68956207b9a154bc278e11d7e.tar.gz"],
)

load("@com_google_googleapis//:repository_rules.bzl", "switched_rules_by_language")

switched_rules_by_language(
name = "com_google_googleapis_imports",
gapic = True,
grpc = True,
java = True,
python = True,
)

google api common protos

1
2
3
4
5
6
com_google_googleapis 包含了 common protos; 暂时保留
http_archive(
name = "com_google_api_common_protos",
strip_prefix = "api-common-protos-1db64f2e971e7ea0e15769164a67643539f0994f",
urls = ["https://github.com/googleapis/api-common-protos/archive/1db64f2e971e7ea0e15769164a67643539f0994f.tar.gz"],
)

linux commands misc

netstat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 参数
-a 列出所有连接
-t TCP连接
-u UDP连接
-t 禁用反向DNS查找,提高输出速度
-l 只列出正在监听的端口
-p 列出PID和程序名称
-e 列出程序所属用户
-s 打印网络数据(接受、发送包统计等数据)
-r 打印路由信息
-i 打印网络接口信息
-c 持续打印网络信息

# 常用
$ netstat -ct # 获取持续输出
$ netstat -atnp # 获取所有活动的TCP连接
$ netstat -ie # 打印用户友好的网络接口信息

watch

1
2
3
4
5
6
7
8
9
# 参数
-n 设置间隔时间
-d 高亮显示变化区域
-t 关闭顶部的时间间隔、命令、当前时间信息

# 示例
$ watch -n 1 -d netstat -ant # 观察每秒网络连接变化
$ watch -n 1 -d 'pstree|grep http' # 观察每秒http链接的变化
$ watch -n 10 'cat /proc/loadavg' # 每10秒输出一次系统平均负载

awk

1
2
# 示例
$ awk '{ print $1 }' # 打印首列

sed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 格式
$ sed [-Ealn] [-e command] [-f command_file] [-i extension] [file ...]
$ sed [-Ealn] command [file ...]
# command
[address[,address]]function[arguments]
# 示例
$ 1,20s/old/new/g
# 参数
-n slilent模式,是输出处理行
-e 通过命令行参数附加编辑操作
-i inplace 修改文件
-f 指定sed命令文件
# funciont
a 新增(后)
i 插入(前)
c 替换
d 删除
p 打印选择数据
s 取代
# 匹配
## () 匹配模式,\1, \2 使用模式值
$ echo "http://localhost:8080/uri/path?p=v" | sed -e 's/^\([^:]*\):\/\/\([^:]*\):\([0-9]*\)\(.*\)$/protocol=[\1] host=[\2] port=[\3] pathAndParams=[\4]/g'
protocol=[http] host=[localhost] port=[8080] pathAndParams=[/uri/path?p=v]

# 示例
$ 1,20s/old/new/g 替换1~20行内的old为new
$ 2,5d 删除2~5行
$ 3,$d 删除第三行至结尾数据

cut

1
2
3
4
# 示例
$ cut -d ' ' -f3,5 # 打印第3,5列
$ echo "localhost:8080" | cut -d ':' -f1 # 提取host
localhost

tr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# tr: translate characters, 转换和删除字符
# 格式
tr [-Ccsu] string1 string2 # 替换 string1 中字符为 string2 中位置对应的字符
tr [-Ccu] -d string1
tr [-Ccu] -s string1
tr [-Ccu] -ds string1 string2
# 参数
-d 删除指令字符
-c 反选指令字符串
-C 类似-c,反选指令集中字符

# class
[:class:] Represents all characters belonging to the defined character class. alnum <alphanumeric characters>
alpha <alphabetic characters>
blank <whitespace characters>
cntrl <control characters>
digit <numeric characters>
graph <graphic characters>
ideogram <ideographic characters>
lower <lower-case alphabetic characters>
phonogram <phonographic characters>
print <printable characters>
punct <punctuation characters>
rune <valid characters>
space <space characters>
special <special characters>
upper <upper-case characters>
xdigit <hexadecimal characters>

# 示例
$ echo "What a cute dog" | tr a-z A-Z
WHAT A CUTE DOG
$ $ echo "What a cute dog" | tr [:lower:] [:upper:]
WHAT A CUTE DOG

sort

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 排序
# 格式
sort [-bcCdfghiRMmnrsuVz] [-k field1[,field2]] [-S memsize] [-T dir] [-t char] [-o output] [file ...]

# 参数
-u 删除重复key
-s 稳定排序
-b 忽略开头空白符
-d 字典序输出
-i 忽略不可打印字符
-R 乱序输出
-n 数字排序
-t 指定分隔符
-k 指定排序字段

# 示例
$ cat t2
10.0.0.1:8080
10.0.0.2:8080
10.0.0.1:8090
10.0.0.3:8070
10.0.0.1:8060
$ cat t2 | sort -t ':' -k 1
10.0.0.1:8060
10.0.0.1:8080
10.0.0.1:8090
10.0.0.2:8080
10.0.0.3:8070
$ cat t2 | sort -t ':' -k 2
10.0.0.1:8060
10.0.0.3:8070
10.0.0.1:8080
10.0.0.2:8080
10.0.0.1:8090

uniq

1
# 删除重复行,一般与sort结合使用

date

1
2
3
4
5
6
7
8
9
10
# format
date +"%Y%m%d"
# 分钟
date +"%M"

# minus
-d "-1 days"

# 前一台日期
date -d "-1 days" +"%Y-%m-%d"

json

1
2
3
4
5
6
7
8
# 美化 json 字符串
$ echo '{"data":{"name":"wii","age":18}}' | python -m json.tool
{
"data": {
"age": 18,
"name": "wii"
}
}

time

1
2
3
4
5
6
# 统计程序运行时间
time <program args>
...
real 0m0.003s
user 0m0.001s
sys 0m0.002s

python coroutines

协程

由程序负责任务切换,可以减少线程/进程上下文切换的消耗。用户态实现任务切换,无需进入内核态。

用途

虽然 Python 有多线程的概念,但是由于 GIL 的存在,并不能利用多核资源。如果易不能充分利用单进程资源,可能会带来严重的性能问题。

相关

EventLoop

python 默认只为主线程创建 loop。如下 tornado 代码实现了自动为创建 loop 的功能,使用 asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) 来生效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
if sys.platform == "win32" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
# "Any thread" and "selector" should be orthogonal, but there's not a clean
# interface for composing policies so pick the right base.
_BasePolicy = asyncio.WindowsSelectorEventLoopPolicy # type: ignore
else:
_BasePolicy = asyncio.DefaultEventLoopPolicy


class AnyThreadEventLoopPolicy(_BasePolicy): # type: ignore
"""Event loop policy that allows loop creation on any thread.

The default `asyncio` event loop policy only automatically creates
event loops in the main threads. Other threads must create event
loops explicitly or `asyncio.get_event_loop` (and therefore
`.IOLoop.current`) will fail. Installing this policy allows event
loops to be created automatically on any thread, matching the
behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2).

Usage::

asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())

.. versionadded:: 5.0

"""

def get_event_loop(self) -> asyncio.AbstractEventLoop:
try:
return super().get_event_loop()
except (RuntimeError, AssertionError):
# This was an AssertionError in Python 3.4.2 (which ships with Debian Jessie)
# and changed to a RuntimeError in 3.4.3.
# "There is no current event loop in thread %r"
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop

示例

定时器

下面是使用协程实现的定时器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# coding: utf-8
import asyncio
import threading
import time
from datetime import datetime
from typing import Callable


class Scheduler:
cache: set[str] = set()

@classmethod
async def _do_schedule(cls, name: str, delay: int, interval: int, cb: Callable, args, kwargs):
await asyncio.sleep(delay)
while name in cls.cache:
try:
cb(*args, **kwargs)
except Exception as e:
print('execute target failed, e=', e)
await asyncio.sleep(interval)

@classmethod
def _schedule_wrapper(cls, name: str, delay: int, interval: int, cb: Callable, args, kwargs):
asyncio.run(cls._do_schedule(name, delay, interval, cb, args, kwargs))

@classmethod
def schedule(cls, name: str, delay: int, interval: int, cb: Callable, *args, **kwargs):
assert name not in cls.cache, 'duplicate scheduler with name ' + name
threading.Thread(target=cls._schedule_wrapper,
args=(name, delay, interval, cb, args, kwargs),
daemon=True).start()

cls.cache.add(name)

@classmethod
def stop(cls, name: str):
if name in cls.cache:
cls.cache.remove(name)


def cbk(a, b, c):
print('execute at', datetime.now(), 'with args:', (a, b, c))


if __name__ == '__main__':
Scheduler.schedule('first', 1, 1, cbk, 'a', 'b', c='c')
Scheduler.schedule('second', 1, 1, cbk, 'd', 'e', c='f')
time.sleep(3)
Scheduler.stop('first')
try:
while True:
pass
except KeyboardInterrupt:
pass

异常

loop argument must agree with Future

下看下抛出异常的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
if loop is None:
loop = events.get_event_loop()
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
elif compat.PY35 and inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
else:
raise TypeError('A Future, a coroutine or an awaitable is required')

参考

curator x discovery

概念

  • ServiceDiscovery ,创建 ServiceProvider 对象,首先需要有 ServiceDiscovery ;所有请求直接访问 zk
  • ServiceProvider, 特定服务发现的封装,并集成了负载均衡策略;集成了 ServiceCache ,有节点监听和缓存
  • ServiceCache ,会在本地内存缓存,并使用 watcher 来保持数据最新

说明

ServiceDiscoveryServiceProvider 需要调用 start 方法后可用。

注册

  • 使用 ServiceDiscoveryregisterService 注册服务后,只要 ServiceDiscoverystop ,会一直保持节点注册
  • 服务被强制 stop ,没有及时调用 unregisterService 接口来取消注册,zk 节点会保存一段时间(几秒),然后由 zk 摘除

查询

  • ServiceProvider 的接口,会实时调用 zk 查询数据,

监听

ServiceCacheListener 有两个方法。

  • cacheChanged 当服务节点变化时,会调用该方法
  • stateChanged 当 zk 连接状态变化时,会调用该方法

grpc java

Channel

gRPC 代码实现中,Channel 是一个虚拟类,是物理连接的逻辑概念。ManagedChannelImpl 和 ManagedChannelImpl2 继承了该类,并实现了 newCall 和 authority 接口。

SubChannel

SubChannel 是 LoadBalancer 的内部类,在了解 SubChannel 之前,需要先了解 SocketAddress 和 EquivalentAddressGroup。

SocketAddress

SocketAddress 是一个虚拟类,代表一个不包含协议信息的 Socket 地址。

EquivalentAddressGroup

EquivalentAddressGroup 是一组 SocketAddress,在 Channel 创建连接时,其包含的 SocketAddress 被视为无差异的。

SubChannel

再回到 SubChannel,他表示和一个服务器或者 EquivalentAddressGroup 表示的一组等价服务器的 一个物理连接,这里需要注意的是,他至多有一个物理连接。在发起新的 RPCs 时,如果没有激活的 transport,在被安排 RPC 请求时,会创建 transport。调用 requestConnection ,会请求 Subchannel 在没有 transport 的情况下创建 transport。

SubChannel 有一个 List<EquivalentAddressGroup> 属性,可以通过 setAddresses(List<EquivalentAddressGroup> addrs)setAddresses(EquivalentAddressGroup addrs) 设置。

InternalSubchannel

InternalSubchannel 表示一个 SocketAddress 的 Transports 。他实现了 TransportProvider 接口,定义了 obtainActiveTransport 方法,该方法如果没有激活的 transports,则会调用 startNewTransport 进行创建。

获取 SocketAddress

在创建 transports 时,需要先获取 SocketAddress。在创建 InternalSubChannel 时,会传入 List<EquivalentAddressGroup>需要注意的是,InternalSubChannel 默认使用 第一个 EquivalentAddressGroup 的 第一个 SocketAddress ,只有在 transport 被关闭时,才会尝试下一个服务地址。

尝试完所有的地址,全部失败后,此时 InternalSubChannel 处于 TRANSIENT_FAILURE 状态,等待一个 delay 时间后,重新尝试。

NameResolver

NameResolver 是一个可插拔的组件(pluggable component),代码层面是一个接口,用来解析一个 target(URI),并返回给调用方一个地址列表,gRPC 内置了 DnsNameResolver。

地址的返回是基于 Listener 机制,NameResolver 的实现类,需要定义 start 方法,方法会传入一个 Listener,当服务列表发生变化时,调用 Listener 的 onResult 方法,通知 NameResolver 的持有方。

LoadBalancer

LoadBalancer 是一个可插拔的组件,接受 NameResolver 拿到的服务方列表,提供可用的 SubChannel。

RoundRobinLoadBalancer

从 RoundRobinLoadBalancer 的 handleResolvedAddresses 实现可以发现。

  • 每次刷新时

    • 对新增服务方创建 SubChannel
    • 对于删掉的服务方进行剔除
    • 对于可用的服务方,不会重新创建 SubChannel

ManagedChannelImpl

+Channel

Channel 去执行一次远程调用,是通过 newCall 方法,传入 **MethodDescriptor ** 和 CallOptions。对于 ManagedChannelImpl,其实现 Channel 接口,newCall 方法转而调用 InterceptorChannel 的 newCall,先简单说下 InterceptorChannel。

managedChannelImpl 字段是调用 ClientInterceptors.intercept(channel, interceptors) 构造,先说 InterceptorChannel 再说 ClientInterceptors。

InterceptorChannel 将 Interceptor 和 Channel 的结合,由 channel + interceptor 构造,调用 channel 的 newCall 时,会执行 interceptor 的 interceptCall,该调用会传入 channel。对于一个原始 channel 和 多个 interceptor,先将 interceptor 倒序,然后依次创建 InterceptorChannel,进行包装。

1
2
3
for (ClientInterceptor interceptor : interceptors) {
channel = new InterceptorChannel(channel, interceptor);
}

相比之下,ClientInterceptors 只是一个工具类。

接着,怎么用上 NameSolver 的。在构造 interceptorChannel 时,传入一个 channel。这个channel 是一个 RealChannel 对象。

RealChannel 实现了 Channel 接口。

+SubChannel

这里需要再提一下,Channel 是逻辑连接,SubChannel 是物理连接。ManagedChannelImpl 实现了 Channel 接口,同时,有一个内部类 SubchannelImpl 实现 SubChannel。

创建物理连接

首先调用 SubchannelImpl 的 requestConnection ,在方法内会调用 InternalSubchannelobtainActiveTransport 创建和 SocketAddress 的连接。

+NameResolver

在 ManagedChannelImpl 内,定义了 NameResolverListener,实现了NameResolver.Listener 接口,在 NameResolverListener 内做了 LoadBalancer 和 NameResolver 的结合。

NameResolver + LoadBalancer

在 NameResolverListener 的 onResult 方法内,当服务器地址变更时会执行改方法。首先,会从参数中获取服务端列 表List<EquivalentAddressGroup>,接下来调用 AutoConfiguredLoadBalancer 的 tryHandleResolvedAddresses 方法,再调用 LoadBalancer 的 handleResolvedAddresses。整个调用实现比较绕,直接了解内置的 LB 即可。

yEd

自适应大小

动态调整node至label大小

File > Preferences > Editor > Dynamically Adjust Node Size to Label Size

新建节点时,node 大小根据 label 大小动态调整。

不启用效果

启用效果

调整格式

调整node适应label

Tools > Fit Node To Label

假设,有如下 node。

调整后效果如下。

rsync

安装

1
$ brew install rsync

参数

1
2
P	等价于 --partial --progress
a archive 模式

同步文件夹

1
2
3
4
5
$ rsync -Pav <local> <user@ip:remote-dist>

# 使用特定 id_rsa
$ rsync -Pav -e "ssh -i ~/.ssh/id_rsa_sensors" <local> <user@ip:remote-dist>

实时同步

使用工具 fswatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 安装依赖
$ brew install fswatch

# watch 文件变动
$ fswatch . | xargs -n1 -I{} <do-something>

# 定义同步方法
function dosyn() {
if [ ! -e .RSYNCING ]; then
touch .RSYNCING
echo "begin to sync"
rsync -Pav -e "ssh -i ~/.ssh/<secret-id>" <local> <user@ip:remote-dist> # 修改这里
echo "rsync done at $(date), sleep 30 seconds"
sleep 30
rm .RSYNCING
echo "sleep done at $(date)"
else
echo "syncing OR sleeping ..."
fi
}

[ -e .RSYNCING ] && rm .RSYNCING
export -f dosyn
fswatch -e .RSYNCING -ro . | xargs -P0 -n1 -I{} bash -c 'dosyn'