Skip to content

Commit aac0160

Browse files
committed
add doc
1 parent 952be8e commit aac0160

File tree

6 files changed

+136
-36
lines changed

6 files changed

+136
-36
lines changed

doc/online_trainer.md

+98-22
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# 流式训练
2-
推荐系统在服务的过程中,会不断产生可用于训练CTR模型的日志数据,流式训练是指数据不是一次性放入训练系统中,而是随着时间流式地加入到训练过程中去。每接收一个分片的数据,模型会对它进行预测,并利用该分片数据增量训练模型,同时按一定的频率保存全量或增量模型。
3-
本教程以[slot_dnn](../models/rank/slot_dnn/README.md)模型使用demo数据为例进行介绍
2+
推荐系统在服务的过程中,会不断产生可用于训练CTR模型的日志数据,流式训练是指数据不是一次性放入训练系统中,而是随着时间流式地加入到训练过程中去。每接收一个分片的数据,模型会对它进行预测,并利用该分片数据增量训练模型,同时按一定的频率保存全量或增量模型。
3+
本教程以[slot_dnn](../models/rank/slot_dnn/README.md)模型使用demo数据为例进行介绍
44

55
## 配置
66
流式训练配置参见models/rank/slot_dnn/config_online.yaml,新增配置及作用如下:
@@ -16,8 +16,8 @@
1616

1717
## 数据
1818
### 数据落盘目录格式
19-
在train_data_dir目录下,再建立两层目录,第一层目录对应训练数据的日期(8位),第二层目录对应训练数据的具体时间(4位,前两位为小时,后两位为分钟),并且需要与配置文件中的split_interval配置对应。
20-
例如:train_data_dir配置为“data”目录,split_interval配置为5,则具体的目录结构如下:
19+
在train_data_dir目录下,再建立两层目录,第一层目录对应训练数据的日期(8位),第二层目录对应训练数据的具体时间(4位,前两位为小时,后两位为分钟),并且需要与配置文件中的split_interval配置对应。
20+
例如:train_data_dir配置为“data”目录,split_interval配置为5,则具体的目录结构如下:
2121
```txt
2222
├── data
2323
├── 20190720 # 训练数据的日期
@@ -40,30 +40,106 @@
4040

4141
## 模型
4242
流式训练采用静态图参数服务器方式训练,在组网时需要注意几点:
43-
1. embedding层需使用paddle.static.nn.sparse_embedding,其中size参数的第一维可指定任意值,第二维为embedding向量的维度。
44-
2. 在组网中增加inference_feed_vars、inference_target_var两个变量的赋值,指明inference_model的输入和输出,供在线推理使用。
45-
3. 在组网中增加all_vars变量的赋值,可用于在线离线一致性检查。
46-
4. 如果希望在训练过程中dump出组网中的变量和网络参数(主要用于训练中的调试和异常检查),请赋值train_dump_fields和train_dump_params;如果希望在预测过程中dump出组网中的变量(主要用于线上预测所需特征的离线灌库),请赋值infer_dump_fields。
43+
1、embedding层需使用paddle.static.nn.sparse_embedding,其中size参数的第一维可指定任意值,第二维为embedding向量的维度。
44+
2、为了记录特征展现(show)和点击(click)的次数,需要在网络中定义两个变量,指明特征是否展现和点击,取值均为0或者1,sparse_embedding中通过entry参数传入一个ShowClickEntry,指明这两个变量(show和click)的名字。
45+
```python
46+
# net.py
47+
# 构造ShowClickEntry,指明展现和点击对应的变量名
48+
self.entry = paddle.distributed.ShowClickEntry("show", "click")
49+
emb = paddle.static.nn.sparse_embedding(
50+
input=s_input,
51+
size=[self.dict_dim, self.emb_dim],
52+
padding_idx=0,
53+
entry=self.entry, # 在sparse_embedding中传入entry
54+
param_attr=paddle.ParamAttr(name="embedding"))
55+
56+
# static_model.py
57+
# 构造show/click对应的data,变量名需要与entry中的名称一致
58+
show = paddle.static.data(
59+
name="show", shape=[None, 1], dtype="int64", lod_level=1)
60+
label = paddle.static.data(
61+
name="click", shape=[None, 1], dtype="int64", lod_level=1)
62+
```
4763

4864
## 训练
49-
请在models/rank/slot_dnn目录下执行如下命令,启动流式训练。
65+
请在models/rank/slot_dnn目录下执行如下命令,启动流式训练。
5066
```bash
5167
fleetrun --server_num=1 --worker_num=1 ../../../tools/static_ps_online_trainer.py -m config_online.yaml
5268
```
53-
启动后,可以在该目录下的log/workerlog.0文件中查看训练日志。
54-
正确的训练过程应该包含以下几个部分:
55-
1. 参数初始化:打印config_online.yaml中配置的参数。
56-
2. 获取之前已经训练好的模型并加载模型,如果之前没有保存模型,则跳过加载模型这一步。
57-
3. 循环训练每个pass的数据,其中包括获取训练数据(建立训练数据处理pipe);利用上个pass的模型预测当前pass的数据,并获取预测AUC;训练当前pass的数据,并获取训练AUC。
58-
4. 保存模型:根据checkpoint_per_pass配置,在固定pass数据训练完成之后,保存模型。
69+
启动后,可以在该目录下的log/workerlog.0文件中查看训练日志。
70+
正确的训练过程应该包含以下几个部分:
71+
1参数初始化:打印config_online.yaml中配置的参数。
72+
2获取之前已经训练好的模型并加载模型,如果之前没有保存模型,则跳过加载模型这一步。
73+
3循环训练每个pass的数据,其中包括获取训练数据(建立训练数据处理pipe);利用上个pass的模型预测当前pass的数据,并获取预测AUC;训练当前pass的数据,并获取训练AUC。
74+
4保存模型:根据checkpoint_per_pass配置,在固定pass数据训练完成之后,保存模型。
5975

6076
## 模型
61-
目前流式训练支持保存两种格式的模型。
77+
目前流式训练支持保存几种格式的模型:
6278
### 全量模型
6379
全量模型(checkpoint)用于流式训练的热启,具体目录为model_save_path/{$day}/{$pass_id}。
64-
其中model_save_path为config_online.yaml中的配置,day对应8位日期,pass_id对应流式训练过程中的第几个pass。
65-
目录下的embedding.shard目录为sparse特征对应的embedding,其中.txt文件为具体的embedding值和优化方法需要的统计量,.meta文件指明.txt文件的具体schema。
66-
目录下的其他文件为dense参数,文件名即为这些参数在组网中对应的var_name。
67-
### inference_model
68-
用于在线推理的模型,保存于model_save_path/day/inference_model_{$pass_id}中,分为model、sparse、dense三个部分。
69-
其中sparse和dense参数与checkpoint模型类似,多出一个名为“__model__”的文件,保存的是在线服务使用的组网(可能经过裁剪),线上服务可以直接加载。
80+
其中model_save_path为config_online.yaml中的配置,day对应8位日期,pass_id对应流式训练过程中的第几个pass。
81+
目录下存在两个文件夹,其中000为模型中的dense参数,001为模型中的sparse参数
82+
### batch model
83+
与checkpoint模型类似,一般在每天数据训练结束后保存,保存前调用shrink函数删除掉长久不见的sparse特征,节省空间。
84+
### base/delta model
85+
base/delta模型一般用于线上预测,与全量模型相比,在保存过程中去掉了部分出现频率不高的特征,降低模型保存的磁盘占用及耗时。
86+
这两个模型一般指sparse_embedding中的参数,因此需要搭配dnn_plugin(模型文件和dense参数文件)才能实现线上完整预测。
87+
base模型具体保存路径为model_save_path/{$day}/base,每天数据训练结束后保存,保存前调用shrink函数。
88+
delta模型具体保存路径为model_save_path/{$day}/delta_{$pass_id},每一个delta模型都是在上一个base/delta模型基础上进行保存的增量模型。
89+
90+
## 高级功能
91+
为进一步提升模型效果,降低存储空间,提供了一系列高级功能,下面逐一进行介绍相关的功能和配置。
92+
具体配置详情可参考config_online.yaml中的table_parameters部分。
93+
为使用高级功能,需要配置相应的table及accessor:
94+
| 名称 | 类型 | 取值 | 是否必须 | 作用描述 |
95+
| :---------------------------: | :----------: | :-------------------------------------------------------: | :------: | :------------------------------------------------------------------: |
96+
| table_class | string | MemorySparseTable || 存储embedding的table名称 |
97+
| accessor_class | string | CtrCommonAccessor || 获取embedding的accessor名称 |
98+
### 特征频次score计算
99+
server端会根据特征的show和click计算一个频次得分,用于判断该特征embedding是否可以扩展、保存等,具体涉及到的配置如下:
100+
| 名称 | 类型 | 取值 | 是否必须 | 作用描述 |
101+
| :---------------------------: | :----------: | :-------------------------------------------------------: | :------: | :------------------------------------------------------------------: |
102+
| nonclk_coeff | float | 任意 || 特征展现但未点击对应系数 |
103+
| click_coeff | float | 任意 || 特征点击对应系数 |
104+
具体频次score计算公式如下:
105+
score = click_coeff * click + noclick_coeff * (click - show)
106+
### 特征embedding扩展
107+
特征embedding初始情况下,只会生成一维embedding,其余维度均为0,当特征的频次score大于等于扩展阈值时,才会扩展出剩余维度,具体涉及到的配置如下:
108+
| 名称 | 类型 | 取值 | 是否必须 | 作用描述 |
109+
| :---------------------------: | :----------: | :-------------------------------------------------------: | :------: | :------------------------------------------------------------------: |
110+
| embedx_dim | int | 任意 || 特征embedding扩展维度 |
111+
| embedx_threshold | int | 任意 || 特征embedding扩展阈值 |
112+
| fea_dim | int | 任意 || 特征embedding总维度 |
113+
需要注意的是:
114+
1、特征embedding的实际维度为1 + embedx_dim,即一维初始embedding + 扩展embedding。
115+
2、特征总维度包括show和click,因此fea_dim = embedx_dim + 3。
116+
### 特征embedding保存
117+
为降低模型保存的磁盘占用及耗时,在保存base/delta模型时,可以去掉部分出现频率不高的特征,具体涉及到的配置如下:
118+
| 名称 | 类型 | 取值 | 是否必须 | 作用描述 |
119+
| :---------------------------: | :----------: | :-------------------------------------------------------: | :------: | :------------------------------------------------------------------: |
120+
| base_threshold | float | 任意 || 特征频次score大于等于该阈值才会在base模型中保存 |
121+
| delta_threshold | float | 任意 || 从上一个delta模型到当前delta模型,特征频次score大于等于该阈值才会在delta模型中保存 |
122+
| delta_keep_days | int | 任意 || 特征未出现天数小于等于该阈值才会在delta模型中保存 |
123+
| converter | string | 任意 || base/delta模型转换器 |
124+
| deconverter | string | 任意 || base/delta模型解压器 |
125+
### 特征embedding淘汰
126+
一般每天的数据训练完成后,会调用shrink函数删除掉一些长久不出现或者出现频率极低的特征,具体涉及到的配置如下:
127+
| 名称 | 类型 | 取值 | 是否必须 | 作用描述 |
128+
| :---------------------------: | :----------: | :-------------------------------------------------------: | :------: | :------------------------------------------------------------------: |
129+
| show_click_decay_rate | float | 任意 || 调用shrink函数时,show和click会根据该配置进行衰减 |
130+
| delete_threshold | float | 任意 || 特征频次score小于该阈值时,删除该特征 |
131+
| delete_after_unseen_days | int | 任意 || 特征未出现天数大于该阈值时,删除该特征 |
132+
### 参数优化算法
133+
稀疏参数(sparse_embedding)优化算法配置,分为一维embedding的优化算法(embed_sgd_param)和扩展embedding的优化算法(embedx_sgd_param):
134+
| 名称 | 类型 | 取值 | 是否必须 | 作用描述 |
135+
| :---------------------------: | :----------: | :-------------------------------------------------------: | :------: | :------------------------------------------------------------------: |
136+
| name | string | SparseAdaGradSGDRule/SparseNaiveSGDRule/SparseAdamSGDRule/StdAdaGradSGDRule || 优化算法名称 |
137+
| learning_rate | float | 任意 || 学习率 |
138+
| initial_g2sum | float | 任意 || g2sum初始值 |
139+
| initial_range | float | 任意 || embedding初始化范围[-initial_range, initial_range] |
140+
| weight_bounds | list(float) | 任意 || embedding在训练过程中的范围 |
141+
142+
稠密参数优化算法配置:
143+
| 名称 | 类型 | 取值 | 是否必须 | 作用描述 |
144+
| :---------------------------: | :----------: | :-------------------------------------------------------: | :------: | :------------------------------------------------------------------: |
145+
| adam_d2sum | bool | 任意 || 是否使用新的稠密参数优化算法 |

models/rank/slot_dnn/config_online.yaml

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
runner:
2-
use_gloo: True
3-
# train_data_dir: "afs:/xxx"
42
train_data_dir: "./data"
3+
# train_data_dir: "afs:/xxx"
54
use_gpu: False
65
train_batch_size: 32
76
print_interval: 1
87
model_save_path: "output_model"
9-
# model_save_path: "afs:/user/paddle/wangguanqun/pscore_output_model"
8+
# model_save_path: "afs:/xxx"
109
checkpoint_per_pass: 1
1110
save_delta_frequency: 1
1211

@@ -32,7 +31,6 @@ runner:
3231
train_dump_fields_dir: "./train_dump_data"
3332
infer_dump_fields_dir: "./infer_dump_data"
3433

35-
use_hadoop: False
3634
# fs_client:
3735
# uri: "afs://xxx"
3836
# user: "xxx"

tools/feature_importance.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def __init__(self, config):
6666
self.train_result_dict["speed"] = []
6767

6868
def run(self):
69-
fleet.init()
69+
self.init_fleet_with_gloo()
7070
self.network()
7171
if fleet.is_server():
7272
self.run_server()
@@ -76,6 +76,14 @@ def run(self):
7676
# self.record_result()
7777
logger.info("Run Success, Exit.")
7878

79+
def init_fleet_with_gloo(use_gloo=True):
80+
if use_gloo:
81+
os.environ["PADDLE_WITH_GLOO"] = "1"
82+
role = role_maker.PaddleCloudRoleMaker()
83+
fleet.init(role)
84+
else:
85+
fleet.init()
86+
7987
def network(self):
8088
self.model = get_model(self.config)
8189
self.input_data = self.model.create_feeds()

tools/static_ps_offline_infer.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def __init__(self, config):
6464
self.train_result_dict["speed"] = []
6565

6666
def run(self):
67-
fleet.init()
67+
self.init_fleet_with_gloo()
6868
self.network()
6969
if fleet.is_server():
7070
self.run_server()
@@ -74,6 +74,14 @@ def run(self):
7474
# self.record_result()
7575
logger.info("Run Success, Exit.")
7676

77+
def init_fleet_with_gloo(use_gloo=True):
78+
if use_gloo:
79+
os.environ["PADDLE_WITH_GLOO"] = "1"
80+
role = role_maker.PaddleCloudRoleMaker()
81+
fleet.init(role)
82+
else:
83+
fleet.init()
84+
7785
def network(self):
7886
model = get_model(self.config)
7987
self.input_data = model.create_feeds()

tools/static_ps_online_trainer.py

+9-7
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ def __init__(self, config):
6262
self.metrics = {}
6363
self.config = config
6464
self.exe = None
65-
self.use_gloo = config.get("runner.use_gloo", False)
6665
self.reader_type = config.get("runner.reader_type", "InMemoryDataset")
6766
self.split_interval = config.get("runner.split_interval", 5)
6867
self.split_per_pass = config.get("runner.split_per_pass", 1)
@@ -92,12 +91,7 @@ def __init__(self, config):
9291
self.hadoop_client = None
9392

9493
def run(self):
95-
if self.use_gloo:
96-
os.environ["PADDLE_WITH_GLOO"] = "1"
97-
role = role_maker.PaddleCloudRoleMaker(init_gloo=True)
98-
fleet.init(role)
99-
else:
100-
fleet.init()
94+
self.init_fleet_with_gloo()
10195
self.init_network()
10296
if fleet.is_server():
10397
self.run_server()
@@ -107,6 +101,14 @@ def run(self):
107101
# self.record_result()
108102
logger.info("Run Success, Exit.")
109103

104+
def init_fleet_with_gloo(use_gloo=True):
105+
if use_gloo:
106+
os.environ["PADDLE_WITH_GLOO"] = "1"
107+
role = role_maker.PaddleCloudRoleMaker()
108+
fleet.init(role)
109+
else:
110+
fleet.init()
111+
110112
def init_network(self):
111113
model = get_model(self.config)
112114
self.input_data = model.create_feeds()

tools/static_ps_trainer.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def __init__(self, config):
7979
self.pure_bf16 = self.config['pure_bf16']
8080

8181
def run(self):
82-
fleet.init()
82+
self.init_fleet_with_gloo()
8383
self.network()
8484
if fleet.is_server():
8585
self.run_server()
@@ -89,6 +89,14 @@ def run(self):
8989
self.record_result()
9090
logger.info("Run Success, Exit.")
9191

92+
def init_fleet_with_gloo(use_gloo=True):
93+
if use_gloo:
94+
os.environ["PADDLE_WITH_GLOO"] = "1"
95+
role = role_maker.PaddleCloudRoleMaker()
96+
fleet.init(role)
97+
else:
98+
fleet.init()
99+
92100
def network(self):
93101
self.model = get_model(self.config)
94102
self.input_data = self.model.create_feeds()

0 commit comments

Comments
 (0)