bitmap

bitmap

bitmap(位图),是一个位数组,物理层面是内存中的一块连续区域,形式上可以表示为 01010001011

用途

  • 布隆过滤器

roaring bitmap

对位图进行空间优化,思想是高位用于分桶,只存储低位。比如,如果存储一个 32 位整数,使用高 16 位获取分桶,每个分桶是一个 bitmap,低 16 位用来确定 bitmap 中的一个 bit,用 bit 值的 0/1 来标识数据是否存在。这样实际存储过程中,可以省略每个整数的高 16 位,理想情况下占用空间减少 50%。

用途

  • lucene 存储倒排索引(其倒排索引 id 递增)

参考

elastic search dev tool

索引

查询所有索引

1
2
GET /_cat/indices
{}

搜索

所有数据

1
2
3
4
5
6
7
GET /<index>/_search
{
"query": {
"match_all": {},
"track_total_hits": true
}
}

track_total_hits=true 返回准确的数量,不然 total.value 最大返回 10000

搜索字段

1
2
3
4
5
6
7
8
GET /<index>/_search
{
"query": {
"match": {
"<filed>": "<value>"
}
}
}

正则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
GET /<index>/_search
{
"query": {
"regexp": {
"<filed>": "<regepx>"
}
}
}

# 不为空
{
"query": {
"regexp": {
"<filed>": ".+"
}
}
}

有值

1
2
3
4
5
6
7
8
GET /_search
{
"query": {
"exists": {
"field": "<field>"
}
}
}

更新数据

1
2
3
4
5
6
POST /<index>/_update/<id>
{
"doc": {
"<field>": "<value>"
}
}

索引

创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT /<new-index>
{
"settings": {
"index.store.type": "mmapfs",
"index.refresh_interval": "1s",
"index.number_of_replicas": "5",
"index.number_of_shards": "2",
"index.max_result_window": "5000000"
},
"mappings": {
"date_detection": true
}
}

删除索引

1
2
3
DELETE /<index>
{
}

重建索引

1
2
3
4
5
6
7
8
9
POST _reindex
{
"source": {
"index": "<source-index>"
},
"dest": {
"index": "<dest-index>"
}
}

python common notes

package

1
2
3
4
5
6
import pathlib
import sys
from os.path import dirname, abspath

PROJECT_BASE_PATH = dirname(abspath(pathlib.Path(__file__).absolute()))
sys.path.append(PROJECT_BASE_PATH)

path

判断路径是否存在

1
os.path.exists('path')

获取 Home 路径

1
2
from pathlib import Path
Path.home() # 用户目录

json

序列化类

1
json.dumps(obj, default=vars)  # 使用 vars 序列化对象, 打印对象的所有属性

multiprocessing

共享自定义对象

1
2
3
4
5
6
7
8
9
class A():
pass

AProxy = MakeProxyType('A', public_methods(A))
setattr(multiprocessing.managers, 'A', AProxy)
SyncManager.register('A', A, AProxy)
manager = SyncManager()
manager.start()
shared_a = manager.A()

python argparse

创建解析对象

1
parser = argparse.ArgumentParser()

kv

1
2
parser.add_argument('-a', '--age', required=True)      # 必选
parser.add_argument('-p', '--product', required=False) # 非必选

example

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument('-a', '--age', required=True)
...
>>> parser.add_argument('-p', '--product', required=False)
...
>>> parser.parse_args('-a 10'.split())
Namespace(age='10', product=None)
>>> parser.parse_args('-a 10 -p wii'.split())
Namespace(age='10', product='wii')
>>> parser.parse_args('-p wii'.split())
usage: [-h] -a AGE [-p PRODUCT]
: error: the following arguments are required: -a/--age

标记

1
2
3
parser.add_argument('-a', '--age', action='store_true')         # 如果指定则为 True, 否则为 False
parser.add_argument('-n', '--name', action='store_false') # 如果指定则为 False, 否则为 True
parser.add_argument('-m', '--price', action='store_const', const=10) # 如果指定则为 const 的值, 否则为 None

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument('-a', '--age', action='store_true')
...
>>> parser.add_argument('-n', '--name', action='store_false')
...
>>> parser.add_argument('-m', '--price', action='store_const', const=10)
...
>>> parser.parse_args('-anm'.split())
Namespace(age=True, name=False, price=10)
>>> parser.parse_args('-a'.split())
Namespace(age=True, name=True, price=None)
>>> parser.parse_args('-m'.split())
Namespace(age=False, name=True, price=10)
>>> parser.parse_args('-n'.split())
Namespace(age=False, name=False, price=None)
>>> parser.parse_args(''.split())
Namespace(age=False, name=True, price=None)

ubuntu user manager

1
2
3
4
5
6
7
8
9
10
11
# 添加 group
$ groupadd {groupname}

# 添加用户
$ useradd -d /home/{username} -m -s /bin/bash -g {username} {groupname}

# 添加 sudo
$ sudo usermod -aG sudo {username}

# 设置密码
$ sudo passwd {username}
1
2
3
4
groupadd wii
useradd -d /home/wii -m -s /bin/bash -g wii wii
sudo usermod -aG sudo wii
sudo passwd wii

ubuntu - vnc

安装 Desktop

1
2
3
4
5
6
sudo apt update && sudo apt upgrade 
sudo apt install tasksel -y
sudo tasksel # 选择安装 ubuntu-desktop
sudo systemctl set-default graphical.target
# 重启电脑
sudo reboot

安装 TigerVNCServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sudo apt install tigervnc-standalone-server tigervnc-common

# 设置 vnc 密码
vncpasswd

# 设置 vnc server
sudo vi ~/.vnc/xstartup
# 内容如下
#!/bin/sh
xrdb $HOME/.Xresources
vncconfig -iconic &
dbus-launch --exit-with-session gnome-session &

# 启动 vnc server
vncserver -localhost no

操作

1
2
3
4
5
# 列出启动的 session
vncserver -list

# kill session
vncserver -kill :<session-no> # 比如 :2

参考

java lock example

场景

等待条件发生

比如,有这样一个场景,在 spring boot 工程里面,有一个 controller,他会接受数据并把数据写入 kafka,然后返回写入 kafka 的结果。在调用 send 方法后,会得到一个 ListenableFuture 对象,这个对象可以传入 callback 对象,这是一个异步的过程,需要等待回调执行后,才能将结果返回给客户端。

我们就需要一种机制等待回调事件,这里用的模式如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Object mutex = new Object();

onSuccess/onFailure[callback] {
synchronized (mutex) {
// 发送消息成功后, 唤醒在等待的线程
mutex.notify();
}
}

// 程序会先走到这里, 并等待 mutex 唤醒
synchronized (mutex) {
mutex.wait();
}

return ...;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Controller
@RequestMapping("kafka")
public class KafkaController {
static class Keeper {
String result;
}

@Resource
KafkaTemplate<String, String> template;

@RequestMapping(value = "put", method = RequestMethod.POST,
produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public ResponseEntity<String> check(@RequestBody Message message) throws InterruptedException {
ListenableFuture<SendResult<String, String>> f = template.send("example", message.getKey(), message.getMessage());
Object mutex = new Object();
final Keeper keeper = new Keeper();
keeper.result = "unknown";
f.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(@SuppressWarnings("NullableProblems") Throwable throwable) {
keeper.result = "send message failed";
synchronized (mutex) {
mutex.notify();
}
}

@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
keeper.result = "send message success";
synchronized (mutex) {
mutex.notify();
} }
});

synchronized (mutex) {
mutex.wait();
}

return ResponseEntity.ok(keeper.result);
}
}

kafka attentions

  • 传输大文件
  • 每个partition的一个消息只能被一个consumer group中的一个consumer消费
  • consumer group 中的一个consumer可以消费多个partition
  • 一个consumer group 中的cunsomer processes必须小于partition的数量
  • each message is delivered exactly to one consumer from a group (with a same group id)

kafka notes

基础

  • 架构

    • 介绍
    • Rebalance
      • Why
        • 是Kafka集群的一个保护设置,用于剔除掉无法消费或过慢的消费者
        • 负载均衡
      • When
        • new consumer
        • consumer offline / exit / dead / unsubscribe
        • 消费者订阅的topic出现分区数量变化
      • 影响
        • 重复消费
        • Rebalance扩散到整个ConsumerGroup,一个Consumer的退出,导致Group进行Rebalance,影响面大
        • 频繁的Rebalance导致重复消费及Rebalance占用大量时间
        • 数据不能及时消费,会累计lag(消费滞后),在Kafka的TTL之后会丢弃数据
      • improve
        • 关闭 auto commit,手动管理offset和心跳
      • Rebalance Listener
      • more
  • 名词

    • Broker
      • 消息中间件处理节点
      • 一个Kafka节点就是一个broker
      • 一个或多个Kafka节点组成Kafka集群
    • Topic
      • Kafka根据Topic对消息进行归类
      • 发布到Kafka集群的每条消息都需要指定一个topic
    • Producer
      • 参数
        • Topic
        • Partition(Optional)
        • Key(Optional)
        • Value
    • Consumer
      • 一个Consumer可以消费一个或多个partition
    • ConsumerGroup
      • 每个Consumer属于一个特定的ConsumerGroup
      • 一条消息可以发送到多个不同的ConsumerGroup
      • 一个ConsumerGroup中的一条消息只被一个Consumer处理
      • 仅是用来对消费者进行分组来消费topic的消息
    • Partition
      • 物理上的概念
      • 一个topic的信息可以划分到多个partition
      • 每个partition内部是有序的
      • 每个partition在存储层面是append log文件
      • 顺序写磁盘,效率非常高,这是Kafka高吞吐量的重要保证
      • partition是broker属性,不影响producer
    • Segment
      • partition细分的物理概念
      • 包括:
        • .index文件
        • .log文件
      • Ref
    • Offset
      • 发布到partition的消息被追加到log文件的尾部
      • 每条消息在partition文件中的位置成为offset
      • offset是一个整形数字,唯一标记一条消息
    • Leader & Follower
      • 为了提高消息可靠性,Kafka为每个topic的partition设置N个副本
      • N 副本中的一个选举为Leader,其他为Follower
      • Leader处理partition的所有读写请求,follower定期复制leader上的数据
      • 负责维护和跟踪ISR(副本同步队列)中所有follower滞后的状态
      • producer发送一条消息到broker后,leader写入消息并复制到所有follower
  • 如何提高可靠性

    • 通过request.required.acks参数设置数据可靠性级别
      • 1:producer接受到leader成功收到数据并得到确认后发送下一条数据
        • 如果leader宕机,消息未同步到follower,可能会造成数据丢失
      • 0:producer无需等待来自broker的确认而继续发送下一条消息,可靠性最低
      • -1:producer等待ISR中所有follower都确认接收到数据才算一次发送成功,可靠性最高
        • 如果设置副本为1,也就是说只有leader,此时和设置request.required.acks等效,如果leader宕机,也会发生数据丢失
    • 保证高可靠性
      • topic的配置:replication.factor>=3,即副本数至少是个;2<=min.insync.replicas<=replication.factor
      • broker的配置:leader的选举条件unclean.leader.election.enable=false
      • producer的配置:request.required.acks=-1(all),producer.type=sync
  • 应用

    • 日志收集
    • 业务数据收集
    • page view
  • Ref

读取消息示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val props = new Properties()
props.setProperty("group.id", "-")
props.setProperty("bootstrap.servers", "-")
props.setProperty("auto.offset.reset", "-")
props.put("key.deserializer", classOf[StringDeserializer])
props.put("value.deserializer", classOf[StringDeserializer])
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic))
while (true) {
val results = consumer.poll(2000)
for (record <- results.iterator()) {
print(s"${record.key()} - ${record.value()}")
}
}