kafka configuration

Common

配置项 单位 说明
bootstrap.servers - brokers 列表,多个服务器 , 分隔
metadata.max.age.ms 毫秒 本地缓存的 meta 信息最大有效时间
send.buffer.bytes 字节(bytes) TCP 发送缓存大小,-1 使用系统默认值
receive.buffer.bytes 字节(bytes) TCP 接受缓存大小,-1 使用系统默认值

Producer

配置项 单位 说明
client.id - 向服务端发请求的客户端标识,主要用于 kafka 服务端落日志及 debug
reconnect.backoff.ms 毫秒 客户端重连 broker 的最小等待时间
reconnect.backoff.max.ms 毫秒 每次重连失败会增加等待时间,直到此最大等待时间
retries - 消息发送失败重试次数;设置大于 0 的值,会触发客户端遇到错误后重新发送消息;设置 retries 且 max.in.flight.requests.per.connection > 1 可能会导致消息重排序
retry.backoff.ms 毫秒 消息重发等待时间
connections.max.idle.ms 毫秒 限制连接最大空闲时间
request.timeout.ms 毫秒 请求超时时间;应大于 replica.lag.time.max.ms (broker配置) 的值
batch.size 字节(bytes) 设置批量发送消息时每条消息最大字节数,设置 0 禁用批量发送消息
acks - 设置 kafka 集群 leader 需要多少 ack 才认为消息发送成功;设置 0,producer 不等待确认,消息写入发送缓存,这种情况下不保证服务端接收到消息,retries 配置不生效,offset 返回值总是 -1;设置 1,kafka leader 消息写入日志文件后发送确认消息,不等待 followers 数据写入确认,如果 leader down 掉可能会出现消息丢失;设置 all 或 -1,kafka leader 等待所有 follower 确认写入后,再发送确认消息。
linger.ms 毫秒 设置徘徊等待时间,消息到达后不是立即发送给消费者,而是等待一段时间,以便聚合更到消息,批量发送;如果达到 batch.size 的限制,则会忽略该配置立即发送;默认是0,不启用消息徘徊等待
max.request.size 字节(bytes) 一个请求的最大字节数;服务器可能用不同的方式来计算 batch size
max.block.ms 毫秒 控制 KafkaProducer.send()KafkaProducer.partitionsFor() 的阻塞时间,这两个方法可能会因为缓存满或元数据不可用阻塞
buffer.memory 字节(bytes) producer 用于发送消息缓存的全部字节数,不仅缓存消息,还有处理中的请求、压缩后的消息
compression.type - 压缩类型;none、gzip、snappy、lz4
max.in.flight.requests.per.connection - 每个连接允许的最大正在处理(未收到确认)请求数,超过之后会阻塞;如果设置的值大于 1,那么存在因为消息发送失败及启用 retries 导致的消息重排序的问题(比如 message1 先发送且失败,message2 后发送且成功,message1 会因为重试机制再次发送且后于 message2 确认)
key.serializer - key 的序列化类,实现 org.apache.kafka.common.serialization.Serializer 接口
value.serializer - value 的序列化类,实现 org.apache.kafka.common.serialization.Serializer 接口
partitioner.class - 分区类,实现 org.apache.kafka.clients.producer.Partitioner 接口
interceptor.classes - 拦截器类,实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口;允许在发送给 cluster 前拦截 producer 的消息;默认没有拦截器
enable.idempotence - 幂等性设置;设置 true 会保证消息只被写入一次,前提配置是 enable.idempotence <= 5,retries > 0,acks = all;如果设为 false,则可能会出现一条消息因为重试写入多次
transaction.timeout.ms 毫秒 事务协作者等待事务状态更新的超时时间
transactional.id - 用于事务投递,如果配置,则必须开启 enable.idempotence,默认是 null,不能使用事务

Consumer

配置项 单位 说明
group.id - 消费组
max.poll.records - 单次调用 poll() 返回的最大消息条数
max.poll.interval.ms 毫秒 使用消费组管理时,触发poll() 的最大间隔时间
session.timeout.ms 毫秒 消费者存活周期;使用组管理设施时,消费者定期发送心跳延长 broker 保存的存活时间,如果超过该项设置的时间没有收到心跳请求,broker 会移除 consumer 并 rebalance;值必须是 broker 允许的值,在 group.min.session.timeout.msgroup.max.session.timeout.ms (broker 配置)之间
heartbeat.interval.ms 毫秒 心跳请求发送周期;该值一般不应大于 session.timeout.ms 的 1/3
enable.auto.commit - 如果为 true,消费者的 offset 会在后台定期提交
auto.commit.interval.ms 毫秒 消费者定期提交 offset 的间隔
partition.assignment.strategy - 类名,消费者使用的分区分配策略
auto.offset.reset - 没有初始的 offset 信息时消息消费策略;earliest,从最早的 offset 消费;latest,从新的消息开始消费;none,如果没有设置消费组 offset 则抛出异常
fetch.min.bytes 字节 拉取请求 server 返回的最小字节数;不足则等待足够的消息再返回,默认为1,只要有消息则立即返回
fetch.max.bytes 字节 拉取请求 server 返回的最大字节数;并不是一个绝对最大值,还受其他配置影响,比如 max.message.bytes,默认 50 MB
fetch.max.wait.ms 毫秒 服务响应拉取请求的最大阻塞时间
max.partition.fetch.bytes 毫秒 拉取请求,每个分区返回的最大字节数,默认 1 MB
client.id - 向服务端发请求的客户端标识,主要用于 kafka 服务端落日志及 debug
check.crcs - 开启 CRC32 检查
key.deserializer - key 反序列化类,实现 org.apache.kafka.common.serialization.Deserializer 接口
value.deserializer - key 反序列化类,实现 org.apache.kafka.common.serialization.Deserializer 接口
default.api.timeout.ms 毫秒 consumer API 的默认超时时间
interceptor.classes - 拦截器类,实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口
exclude.internal.topics - 内部 topic 消息是否暴露给消费者;如果为 true,唯一可以从内部 topic 获取消息的方式是订阅它
internal.leave.group.on.close - consumer 关闭时是否退出消费组;如果为 false,则消费者关闭后不会触发 rebalance,知道 session.timeout.ms 过期
isolation.level - 控制读取事务消息的方式;read_committed,consumer.poll() 只返回提交了的事务消息;read_uncommitted,返回所有消息,包括被废弃的事务消息;对于非事务消息,两种方式会无条件返回;默认是 read_uncommitted

其他

幂等性

producer 设置 enable.idempotence 为 true,kafka 会保证消息按顺序且不重复的送达。

原理

每个 producer 被分配一个 PID(producer id),向 broker 发送消息时会带上,且每个消息有一个单调递增的序列号,生产者为主题的每个分区维护一个序列号,broker 同样记录 producer 的最大消息序列号,只接受 +1 的消息。

参考

grpc python3

安装

1
$ pip3 install grpcio

示例

1
2
3
4
5
import grpc
channel = grpc.insecure_channel('localhost:8000')
grpc.channel_ready_future(self.channel).result(10) # 等待 channel ready, 超时 10s
stub = EchoServiceGRPC.EchoServiceStub(self.channel) # 创建 stub
stub.ping('pong') # 远程调用

bitmap

bitmap

bitmap(位图),是一个位数组,物理层面是内存中的一块连续区域,形式上可以表示为 01010001011

用途

  • 布隆过滤器

roaring bitmap

对位图进行空间优化,思想是高位用于分桶,只存储低位。比如,如果存储一个 32 位整数,使用高 16 位获取分桶,每个分桶是一个 bitmap,低 16 位用来确定 bitmap 中的一个 bit,用 bit 值的 0/1 来标识数据是否存在。这样实际存储过程中,可以省略每个整数的高 16 位,理想情况下占用空间减少 50%。

用途

  • lucene 存储倒排索引(其倒排索引 id 递增)

参考

elastic search dev tool

索引

查询所有索引

1
2
GET /_cat/indices
{}

搜索

所有数据

1
2
3
4
5
6
7
GET /<index>/_search
{
"query": {
"match_all": {},
"track_total_hits": true
}
}

track_total_hits=true 返回准确的数量,不然 total.value 最大返回 10000

搜索字段

1
2
3
4
5
6
7
8
GET /<index>/_search
{
"query": {
"match": {
"<filed>": "<value>"
}
}
}

正则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GET /<index>/_search
{
"query": {
"regexp": {
"<filed>": "<regepx>"
}
}
}

# 不为空
{
"query": {
"regexp": {
"<filed>": ".+"
}
}
}

有值

1
2
3
4
5
6
7
8
GET /_search
{
"query": {
"exists": {
"field": "<field>"
}
}
}

更新数据

1
2
3
4
5
6
POST /<index>/_update/<id>
{
"doc": {
"<field>": "<value>"
}
}

索引

创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT /<new-index>
{
"settings": {
"index.store.type": "mmapfs",
"index.refresh_interval": "1s",
"index.number_of_replicas": "5",
"index.number_of_shards": "2",
"index.max_result_window": "5000000"
},
"mappings": {
"date_detection": true
}
}

删除索引

1
2
3
DELETE /<index>
{
}

重建索引

1
2
3
4
5
6
7
8
9
POST _reindex
{
"source": {
"index": "<source-index>"
},
"dest": {
"index": "<dest-index>"
}
}

python common notes

package

1
2
3
4
5
6
import pathlib
import sys
from os.path import dirname, abspath

PROJECT_BASE_PATH = dirname(abspath(pathlib.Path(__file__).absolute()))
sys.path.append(PROJECT_BASE_PATH)

path

判断路径是否存在

1
os.path.exists('path')

获取 Home 路径

1
2
from pathlib import Path
Path.home() # 用户目录

json

序列化类

1
json.dumps(obj, default=vars)  # 使用 vars 序列化对象, 打印对象的所有属性

multiprocessing

共享自定义对象

1
2
3
4
5
6
7
8
9
class A():
pass

AProxy = MakeProxyType('A', public_methods(A))
setattr(multiprocessing.managers, 'A', AProxy)
SyncManager.register('A', A, AProxy)
manager = SyncManager()
manager.start()
shared_a = manager.A()

python argparse

创建解析对象

1
parser = argparse.ArgumentParser()

kv

1
2
parser.add_argument('-a', '--age', required=True)      # 必选
parser.add_argument('-p', '--product', required=False) # 非必选

example

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument('-a', '--age', required=True)
...
>>> parser.add_argument('-p', '--product', required=False)
...
>>> parser.parse_args('-a 10'.split())
Namespace(age='10', product=None)
>>> parser.parse_args('-a 10 -p wii'.split())
Namespace(age='10', product='wii')
>>> parser.parse_args('-p wii'.split())
usage: [-h] -a AGE [-p PRODUCT]
: error: the following arguments are required: -a/--age

标记

1
2
3
parser.add_argument('-a', '--age', action='store_true')         # 如果指定则为 True, 否则为 False
parser.add_argument('-n', '--name', action='store_false') # 如果指定则为 False, 否则为 True
parser.add_argument('-m', '--price', action='store_const', const=10) # 如果指定则为 const 的值, 否则为 None

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument('-a', '--age', action='store_true')
...
>>> parser.add_argument('-n', '--name', action='store_false')
...
>>> parser.add_argument('-m', '--price', action='store_const', const=10)
...
>>> parser.parse_args('-anm'.split())
Namespace(age=True, name=False, price=10)
>>> parser.parse_args('-a'.split())
Namespace(age=True, name=True, price=None)
>>> parser.parse_args('-m'.split())
Namespace(age=False, name=True, price=10)
>>> parser.parse_args('-n'.split())
Namespace(age=False, name=False, price=None)
>>> parser.parse_args(''.split())
Namespace(age=False, name=True, price=None)

ubuntu user manager

1
2
3
4
5
6
7
8
9
10
11
# 添加 group
$ groupadd {groupname}

# 添加用户
$ useradd -d /home/{username} -m -s /bin/bash -g {username} {groupname}

# 添加 sudo
$ sudo usermod -aG sudo {username}

# 设置密码
$ sudo passwd {username}
1
2
3
4
groupadd wii
useradd -d /home/wii -m -s /bin/bash -g wii wii
sudo usermod -aG sudo wii
sudo passwd wii

ubuntu - vnc

安装 Desktop

1
2
3
4
5
6
sudo apt update && sudo apt upgrade 
sudo apt install tasksel -y
sudo tasksel # 选择安装 ubuntu-desktop
sudo systemctl set-default graphical.target
# 重启电脑
sudo reboot

安装 TigerVNCServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sudo apt install tigervnc-standalone-server tigervnc-common

# 设置 vnc 密码
vncpasswd

# 设置 vnc server
sudo vi ~/.vnc/xstartup
# 内容如下
#!/bin/sh
xrdb $HOME/.Xresources
vncconfig -iconic &
dbus-launch --exit-with-session gnome-session &

# 启动 vnc server
vncserver -localhost no

操作

1
2
3
4
5
# 列出启动的 session
vncserver -list

# kill session
vncserver -kill :<session-no> # 比如 :2

参考

java lock example

场景

等待条件发生

比如,有这样一个场景,在 spring boot 工程里面,有一个 controller,他会接受数据并把数据写入 kafka,然后返回写入 kafka 的结果。在调用 send 方法后,会得到一个 ListenableFuture 对象,这个对象可以传入 callback 对象,这是一个异步的过程,需要等待回调执行后,才能将结果返回给客户端。

我们就需要一种机制等待回调事件,这里用的模式如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Object mutex = new Object();

onSuccess/onFailure[callback] {
synchronized (mutex) {
// 发送消息成功后, 唤醒在等待的线程
mutex.notify();
}
}

// 程序会先走到这里, 并等待 mutex 唤醒
synchronized (mutex) {
mutex.wait();
}

return ...;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Controller
@RequestMapping("kafka")
public class KafkaController {
static class Keeper {
String result;
}

@Resource
KafkaTemplate<String, String> template;

@RequestMapping(value = "put", method = RequestMethod.POST,
produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public ResponseEntity<String> check(@RequestBody Message message) throws InterruptedException {
ListenableFuture<SendResult<String, String>> f = template.send("example", message.getKey(), message.getMessage());
Object mutex = new Object();
final Keeper keeper = new Keeper();
keeper.result = "unknown";
f.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(@SuppressWarnings("NullableProblems") Throwable throwable) {
keeper.result = "send message failed";
synchronized (mutex) {
mutex.notify();
}
}

@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
keeper.result = "send message success";
synchronized (mutex) {
mutex.notify();
} }
});

synchronized (mutex) {
mutex.wait();
}

return ResponseEntity.ok(keeper.result);
}
}