protobuf - c++

Descriptor & Reflection

message

syntax = "proto3";

message Sea {
  string name = 1;
}

message World {
  int64 age = 1;
  repeated string tag = 2;
  Sea sea = 3; // used by mutable_sea() in the reflection example below
}

Descriptor & Reflection

World wd{};

// Field descriptors: via the generated static accessor...
auto age = World::descriptor()->FindFieldByName("age");
auto tag = World::descriptor()->FindFieldByName("tag");
// ...or via an instance
// auto age = wd.GetDescriptor()->FindFieldByName("age");

const auto* refl = wd.GetReflection();

// Set a singular field
refl->SetInt64(&wd, age, 10);
// Set a repeated field
refl->AddString(&wd, tag, "a");
refl->AddString(&wd, tag, "b");
refl->SetRepeatedString(&wd, tag, 1, "c");

// Set a submessage (assumes World has a field `Sea sea = 3;`)
wd.mutable_sea()->set_name("pacific");

Json

#include <google/protobuf/util/json_util.h>

// message -> json
std::string output;
google::protobuf::util::MessageToJsonString(message, &output);

// json -> message
SomeMessage msg;
std::string input;
google::protobuf::util::JsonStringToMessage(input, &msg);

cmake

Ad Recommendation

On an ad exchange, advertisers can choose the billing model and metrics that fit their needs to optimize ad delivery.

Bidding Models

Different ad systems choose different bidding (buying) models depending on the business. Common models supported by programmatic ad exchanges include CPI, CPC, CPM, deep-conversion bidding, Target-CPE, and Target-ROAS; video ad platforms may additionally offer models such as CPV.

  • CPI, Cost Per Install
  • CPC, Cost Per Click
  • CPM, Cost Per Mille (cost per thousand impressions)
  • Dual bidding / deep-conversion bidding
  • Target-CPE, Target Cost Per Engagement: billed per engagement
  • Target-ROAS, Target Return on Advertising Spend: billed against a target return rate
  • CPA, Cost Per Acquisition
  • CPV, Cost Per View: mainly used for video ads

Metrics

  • eCPM, the expected value of the current request, used to estimate how much a request is worth

    • profit margins can be controlled based on eCPM
  • Fill rate: filled requests (requests answered with an ad) / total requests

  • ROI, Return on Investment: profit / spend

  • CTR, click-through rate (clicks / impressions)
  • CVR, conversion rate (conversions / clicks)
  • IVR, impression-to-conversion rate (conversions / impressions)

eCPM = ivr * price * 1000

  • LTV, Lifetime Value: the total value or revenue a user generates over the entire lifetime of their relationship with the business
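These metrics are simple ratios, and the eCPM formula above follows directly from them. A quick sketch (the traffic numbers are made up for illustration):

```python
def ctr(clicks: int, impressions: int) -> float:
    """Click-through rate: clicks / impressions."""
    return clicks / impressions

def cvr(conversions: int, clicks: int) -> float:
    """Conversion rate: conversions / clicks."""
    return conversions / clicks

def ivr(conversions: int, impressions: int) -> float:
    """Impression-to-conversion rate (= CTR * CVR)."""
    return conversions / impressions

def fill_rate(filled: int, total: int) -> float:
    """Filled requests / total requests."""
    return filled / total

def ecpm(ivr_value: float, price: float) -> float:
    """Expected revenue per 1000 impressions: eCPM = ivr * price * 1000."""
    return ivr_value * price * 1000

# With 10000 impressions, 200 clicks, 10 conversions and a payout of 2.0
# per conversion:
#   CTR = 0.02, CVR = 0.05, IVR = 0.001, eCPM = 0.001 * 2.0 * 1000 = 2.0
```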

Terminology

  • Offer: in the ad industry, what an advertiser runs is an Offer that developers accept; the exchange's job is to match developers with advertisers' Offers
  • Campaign: an ad campaign; one Offer can be split into multiple Campaigns
  • Supply: the supply side, i.e. the media / publishers
  • Demand: the demand side, i.e. the advertisers
  • Advertiser
  • PMP, Private Marketplace: private buying of premium media inventory
  • IAA, In-App Advertising: earning revenue by showing ads inside an app
  • IAP, In-App Purchases: a revenue source for developers
  • EGR, Engagement Rate: the rate of user interaction with an ad or marketing campaign
  • SKAN: rankings built on postback data from SKAdNetwork, Apple's official attribution solution, helping advertisers measure iOS campaign performance; a form of deterministic attribution

Services

DMP

A Data Management Platform (DMP) stores per-device user data such as interests, install lists, and user profiles.

Miscellaneous

  • PCOC, Predict Click Over Click: the ratio of the calibrated predicted CTR to the posterior CTR (an approximation of the true probability)

logrotate

Configuration

Example

rotate.conf

/home/wii/www/logs/*.log {
    daily
    rotate 30
    missingok
    nocompress
    copytruncate
    notifempty
    dateext
}

Rotate

Triggering a rotate

logrotate [options] [config-file]

logrotate -v -s /home/wii/www/logs/rotate-status rotate.conf

-v: verbose output
-s: path of the status file; defaults to /var/lib/logrotate/, where you may not have write permission

Notes

  • logrotate uses the status file to decide whether a rotate is due. On the first run it may only create the status file without rotating any logs. To force a rotate, edit the last-rotate timestamp recorded in the status file.
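The status-file trick above can be scripted. The sketch below assumes the version-2 status-file layout, where each line after the header holds a quoted path followed by a `Y-m-d-H:M:S` timestamp (verify against your own status file); it rewrites every recorded timestamp to 24 hours ago so the next logrotate run treats the logs as due:

```python
import re
from datetime import datetime, timedelta

# Assumed line shape inside a version-2 status file, e.g.:
#   "/home/wii/www/logs/app.log" 2021-4-9-3:0:0
STAMP = re.compile(r'(" )\d{4}-\d{1,2}-\d{1,2}(-\d{1,2}:\d{1,2}:\d{1,2})?')

def backdate_status(path: str) -> None:
    """Rewrite every recorded rotate time in `path` to 24 hours ago."""
    stamp = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d-%H:%M:%S')
    with open(path) as f:
        text = f.read()
    with open(path, 'w') as f:
        # Keep the quoted path (group 1), replace only the timestamp.
        f.write(STAMP.sub(lambda m: m.group(1) + stamp, text))
```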

c++ threads

Thread

thread

#include <thread>

std::thread trd(function, args...);
trd.join(); // a joinable thread must be joined (or detached) before destruction

Thread-local data (pthread)

// Create a thread-specific data key; the second argument is a destructor
// invoked with the stored value when the thread exits
int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));
// Set the thread-specific value; the key comes from pthread_key_create
int pthread_setspecific(pthread_key_t key, const void *pointer);
// Get the thread-specific value
void *pthread_getspecific(pthread_key_t key);
// Delete the key
int pthread_key_delete(pthread_key_t key);

Mutex

#include <mutex>
#include <shared_mutex>

gunicorn logging

The multi-process logging problem

Starting a Python service with gunicorn means running multiple worker processes. If every process keeps its own logger writing to the same log file, time-based rotation (TimedRotatingFileHandler) causes conflicts: several processes reopen the log file at the same moment. The resulting symptoms are messy, e.g. lost log lines, or logging that works at first and then stops writing after a while. As explained here, gunicorn creates its workers with os.fork, so they can share the log file's offset and handle, but multiple processes writing to the same file can still go wrong; the solutions described here are to use a QueueHandler or a SocketHandler.

Another approach is to write everything to a single log file and let the logrotate tool split it. This is the recommended approach: drop the built-in TimedRotatingFileHandler, at the cost of one extra logrotate config plus a crontab entry.

There are of course other ways, such as subclassing TimedRotatingFileHandler and using a lock to avoid the race; here and here are examples.
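The QueueHandler idea can be sketched in a single process as follows. Only the listener owns the file handler, so every worker merely enqueues records; in a gunicorn deployment each worker would attach the QueueHandler while one dedicated process runs the listener. The logger name 'serving' and the format string mirror the config used later in this section; the file path is illustrative:

```python
import logging
import logging.handlers
import multiprocessing

def make_queue_logging(logfile):
    """Route records through a queue so only the listener writes the file."""
    queue = multiprocessing.Queue(-1)

    # The listener is the only component that touches the log file.
    file_handler = logging.FileHandler(logfile)
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s-%(levelname)s: %(message)s'))
    listener = logging.handlers.QueueListener(queue, file_handler)

    # Workers attach only a QueueHandler to their logger.
    logger = logging.getLogger('serving')
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.handlers.QueueHandler(queue))
    return logger, listener

logger, listener = make_queue_logging('./server.log')
listener.start()
logger.info('hello from a worker')
listener.stop()  # drains the queue before shutdown
```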

logrotate

# /home/ubuntu/server/config/server-logrotate.conf
/home/ubuntu/server/logs/server.log {
    daily
    rotate 30
    missingok
    nocompress
    copytruncate
    notifempty
    dateext
    dateyesterday
}
# crontab
0 0 * * * cd /home/ubuntu/server; logrotate -v -s logs/logrotate-status configs/logrotate.conf >> logs/logrotate.log 2>&1 &

Log Server

One log server can serve multiple Python services; only the config file needs changing. Processes and threads send their log records to the log server over the network, and the server alone writes them to disk, which avoids multiple processes operating on the same log file.

import logging
import logging.config
import logging.handlers
import pickle
import socketserver
import struct

logging.config.fileConfig('config/log_config.ini')


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)


class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='127.0.0.1',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort


def run(port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT):
    tcpserver = LogRecordSocketReceiver(port=port)
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()


if __name__ == '__main__':
    run()

Log configuration

; declarations
[loggers]
keys = root,serving,webhook

[handlers]
keys = generic,serving,webhook

[formatters]
keys = generic

; loggers
[logger_root]
level = INFO
handlers = generic

[logger_serving]
level = INFO
handlers = serving
propagate = 0
qualname = serving

[logger_webhook]
level = INFO
handlers = webhook
propagate = 0
qualname = webhook

; handlers
[handler_generic]
class = handlers.TimedRotatingFileHandler
args = ('./logs/generic.log', 'midnight', 1, 30,)
level = INFO
formatter = generic

[handler_serving]
class = handlers.TimedRotatingFileHandler
args = ('./logs/serving.log', 'midnight', 1, 30,)
level = INFO
formatter = generic

[handler_webhook]
class = handlers.TimedRotatingFileHandler
args = ('./logs/webhook.log', 'midnight', 1, 30,)
level = INFO
formatter = generic

; formatters
[formatter_generic]
format = %(asctime)s-%(levelname)s: %(message)s
datefmt = %Y-%m-%d %H:%M:%S%z
class = logging.Formatter

Usage

def get_logger(name, port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT, level=logging.INFO):
    log = logging.getLogger(name)
    log.setLevel(level)
    # socket handler
    sh = logging.handlers.SocketHandler('127.0.0.1', port)
    formatter = logging.Formatter('%(asctime)s-%(levelname)s: %(message)s')
    # console handler
    ch = logging.StreamHandler()
    ch.setLevel(level)
    ch.setFormatter(formatter)

    log.addHandler(sh)
    log.addHandler(ch)
    return log


logger = get_logger('serving')
webhook_logger = get_logger('webhook')

mongo - cxx driver

Installation

# brew
brew install mongo-cxx-driver

For other platforms, see here.

Examples

See the official examples here.

Connecting to the database

mongocxx::instance instance{};
mongocxx::uri uri("mongodb://192.168.4.13:27017");
mongocxx::client client(uri);

Reading an array

auto arr_element = doc["contribs"];
if (arr_element && arr_element.type() == bsoncxx::type::k_array) {
    bsoncxx::array::view arr(arr_element.get_array().value);
    std::cout << arr.length() << std::endl; // 67; the byte length of the view, not the element count
    for (auto e : arr) {
        std::cout << "E " << e.get_utf8().value.to_string() << std::endl;
    }
}

grafana

Installation

Packages can be downloaded from the Tsinghua mirror.

# centos
wget https://mirrors.tuna.tsinghua.edu.cn/grafana/yum/rpm/grafana-7.5.3-1.x86_64.rpm
sudo yum localinstall grafana-7.5.3-1.x86_64.rpm

Plugins

Grafana Image Renderer


# dependencies that need to be installed first
yum install -y libXcomposite libXdamage libXtst cups libXScrnSaver pango atk adwaita-cursor-theme adwaita-icon-theme at at-spi2-atk at-spi2-core cairo-gobject colord-libs dconf desktop-file-utils ed emacs-filesystem gdk-pixbuf2 glib-networking gnutls gsettings-desktop-schemas gtk-update-icon-cache gtk3 hicolor-icon-theme jasper-libs json-glib libappindicator-gtk3 libdbusmenu libdbusmenu-gtk3 libepoxy liberation-fonts liberation-narrow-fonts liberation-sans-fonts liberation-serif-fonts libgusb libindicator-gtk3 libmodman libproxy libsoup libwayland-cursor libwayland-egl libxkbcommon m4 mailx nettle patch psmisc redhat-lsb-core redhat-lsb-submod-security rest spax time trousers xdg-utils xkeyboard-config alsa-lib

Installation

grafana-cli plugins install grafana-image-renderer

seastar with docker

Creating the container

docker run ... --sysctl net.core.somaxconn=10240 ...

Settings

# Temporarily change aio-max-nr
# Note: image-name is the name of the image, not of the container
docker run --privileged <image-name> sh -c 'echo 1048576 > /proc/sys/fs/aio-max-nr'

# Alternatively
docker run --privileged -it zhenkai.sun/external-with-openssh bash
sysctl -w fs.aio-max-nr=1048576

# Verify
cat /proc/sys/fs/aio-max-nr
cat /proc/sys/net/core/somaxconn

docker run --privileged --sysctl net.core.somaxconn=10240 zhenkai.sun/external-with-openssh bash