Skip to content

Commit 2f30632

Browse files
authored
Merge pull request #639 from esythan/new_trainer
New online trainer
2 parents bfd8724 + b6c6845 commit 2f30632

11 files changed

+567
-207
lines changed

models/rank/slot_dnn/config_offline_infer.yaml

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ runner:
2828
reader_type: "InmemoryDataset" # DataLoader / QueueDataset / RecDataset
2929
pipe_command: "python3 inmemorydataset_reader.py"
3030

31-
init_model_path: "output_model/20190720/inference_model_6"
31+
init_model_path: "output_model/20190720/6"
32+
model_mode: 0
3233

3334
dataset_debug: False
3435
parse_ins_id: True
@@ -52,3 +53,44 @@ hyper_parameters:
5253
slot_num: 300
5354
layer_sizes: [512, 256, 128]
5455
distributed_embedding: 0
56+
57+
table_parameters:
58+
embedding:
59+
table_class: "MemorySparseTable"
60+
shard_num: 10
61+
accessor:
62+
accessor_class: "CtrCommonAccessor"
63+
fea_dim: 11
64+
embedx_dim: 8
65+
embedx_threshold: 10
66+
embed_sgd_param:
67+
name: "SparseAdaGradSGDRule"
68+
adagrad:
69+
learning_rate: 0.05
70+
initial_g2sum: 1.0
71+
initial_range: 0.0001
72+
weight_bounds: [-10.0, 10.0]
73+
embedx_sgd_param:
74+
name: "SparseAdaGradSGDRule"
75+
adagrad:
76+
learning_rate: 0.05
77+
initial_g2sum: 3.0
78+
initial_range: 0.0001
79+
weight_bounds: [-10.0, 10.0]
80+
ctr_accessor_param:
81+
nonclk_coeff: 0.1
82+
click_coeff: 1.0
83+
# base_threshold: 1.5
84+
# delta_threshold: 0.25
85+
base_threshold: 0
86+
delta_threshold: 0
87+
delta_keep_days: 16.0
88+
show_click_decay_rate: 0.98
89+
delete_threshold: 0.8
90+
delete_after_unseen_days: 30.0
91+
ssd_unseenday_threshold: 1
92+
# table_accessor_save_param:
93+
# num: 2
94+
# param: [1, 2]
95+
# converter: ""
96+
# deconverter: ""

models/rank/slot_dnn/config_online.yaml

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,43 @@ runner:
22
use_gloo: True
33
# train_data_dir: "afs:/xxx"
44
train_data_dir: "./data"
5-
train_reader_path: "criteo_reader" # importlib format
65
use_gpu: False
7-
use_auc: True
86
train_batch_size: 32
9-
epochs: 3
107
print_interval: 1
118
model_save_path: "output_model"
12-
checkpoint_per_pass: 6
13-
save_delta_frequency: 6
9+
# model_save_path: "afs:/user/paddle/wangguanqun/pscore_output_model"
10+
checkpoint_per_pass: 1
11+
save_delta_frequency: 1
1412

1513
train_thread_num: 3
14+
shuffle_thread_num: 12
1615
reader_type: "InMemoryDataset" # DataLoader / QueueDataset / RecDataset
1716
pipe_command: "python3.7 queuedataset_reader.py"
1817
dataset_debug: False
19-
split_file_list: False
2018
# data_donefile: "data.done"
2119
data_sleep_second: 1
2220
sync_mode: "async"
2321

24-
split_interval: 5
22+
split_interval: 30
2523
split_per_pass: 2
2624
start_day: "20190720"
2725
end_day: "20190722"
2826
infer_batch_size: 32
2927
infer_thread_num: 1
30-
infer_reader_path: "criteo_reader" # importlib format
3128
infer_data_dir: "data/"
32-
infer_load_path: "output_model"
33-
infer_start_epoch: 0
34-
infer_end_epoch: 3
35-
use_inference: True
3629

3730
# need_train_dump: True
3831
# need_infer_dump: True
3932
train_dump_fields_dir: "./train_dump_data"
4033
infer_dump_fields_dir: "./infer_dump_data"
34+
35+
use_hadoop: False
4136
# fs_client:
4237
# uri: "afs://xxx"
4338
# user: "xxx"
4439
# passwd: "xxx"
45-
# hadoop_bin: "hadoop"
40+
# hadoop_bin: "$HADOOP_HOME/bin/hadoop"
41+
4642
# hyper parameters of user-defined network
4743
hyper_parameters:
4844
# optimizer config
@@ -57,3 +53,43 @@ hyper_parameters:
5753
slot_num: 300
5854
layer_sizes: [512, 256, 128]
5955
distributed_embedding: 0
56+
# adam_d2sum: False
57+
58+
table_parameters:
59+
embedding:
60+
table_class: "MemorySparseTable"
61+
shard_num: 10
62+
accessor:
63+
accessor_class: "CtrCommonAccessor"
64+
fea_dim: 11
65+
embedx_dim: 8
66+
embedx_threshold: 10
67+
embed_sgd_param:
68+
name: "SparseAdaGradSGDRule"
69+
adagrad:
70+
learning_rate: 0.05
71+
initial_g2sum: 3.0
72+
initial_range: 0.0001
73+
weight_bounds: [-10.0, 10.0]
74+
embedx_sgd_param:
75+
name: "SparseAdaGradSGDRule"
76+
adagrad:
77+
learning_rate: 0.05
78+
initial_g2sum: 3.0
79+
initial_range: 0.0001
80+
weight_bounds: [-10.0, 10.0]
81+
ctr_accessor_param:
82+
nonclk_coeff: 0.1
83+
click_coeff: 1.0
84+
base_threshold: 1.5
85+
delta_threshold: 0.25
86+
delta_keep_days: 16.0
87+
show_click_decay_rate: 0.98
88+
delete_threshold: 0.8
89+
delete_after_unseen_days: 30.0
90+
ssd_unseenday_threshold: 1
91+
# table_accessor_save_param:
92+
# num: 2
93+
# param: [1, 2]
94+
# converter: ""
95+
# deconverter: ""

models/rank/slot_dnn/inmemorydataset_reader.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ def line_process(self, line):
7878
output[self.slot2index[i]][1].extend([self.padding])
7979
else:
8080
self.visit[slot] = False
81+
82+
# add show
83+
output = [("0", ["1"])] + output
8184
output = [("ins_id", [ins_id])] + output
8285
return output
8386
#return [label] + sparse_feature + [dense_feature]

models/rank/slot_dnn/net.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import paddle.nn as nn
1717
import paddle.nn.functional as F
1818
import math
19+
import paddle.fluid as fluid
1920

2021

2122
class BenchmarkDNNLayer(nn.Layer):
@@ -33,15 +34,7 @@ def __init__(self,
3334
self.layer_sizes = layer_sizes
3435
self._init_range = 0.2
3536

36-
# to do
37-
#self.embedding = paddle.nn.Embedding(
38-
# self.dict_dim,
39-
# self.emb_dim,
40-
# sparse=True,
41-
# weight_attr=paddle.ParamAttr(
42-
# name="embedding",
43-
# initializer=paddle.nn.initializer.XavierNormal()))
44-
#initializer=paddle.nn.initializer.Uniform()))
37+
self.entry = paddle.distributed.ShowClickEntry("show", "click")
4538

4639
sizes = [emb_dim * slot_num] + self.layer_sizes + [1]
4740
acts = ["relu" for _ in range(len(self.layer_sizes))] + [None]
@@ -70,7 +63,10 @@ def forward(self, slot_inputs):
7063
emb = paddle.static.nn.sparse_embedding(
7164
input=s_input,
7265
size=[self.dict_dim, self.emb_dim],
66+
padding_idx=0,
67+
entry=self.entry,
7368
param_attr=paddle.ParamAttr(name="embedding"))
69+
7470
self.inference_feed_vars.append(emb)
7571

7672
bow = paddle.fluid.layers.sequence_pool(input=emb, pool_type='sum')

models/rank/slot_dnn/queuedataset_reader.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,10 @@ def line_process(self, line):
7878
else:
7979
self.visit[slot] = False
8080

81+
# add show
82+
output = [("0", [1])] + output
8183
return output
82-
#return [label] + sparse_feature + [dense_feature]
84+
8385
def generate_sample(self, line):
8486
"Dataset Generator"
8587

models/rank/slot_dnn/static_model.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,17 @@ def create_feeds(self, is_infer=False):
4949
for i in range(2, self.slot_num + 2)
5050
]
5151

52+
show = paddle.static.data(
53+
name="show", shape=[None, 1], dtype="int64", lod_level=1)
5254
label = paddle.static.data(
5355
name="click", shape=[None, 1], dtype="int64", lod_level=1)
5456

55-
feeds_list = [label] + slot_ids
57+
feeds_list = [show, label] + slot_ids
5658
return feeds_list
5759

5860
def net(self, input, is_infer=False):
59-
self.label_input = input[0]
60-
self.slot_inputs = input[1:]
61+
self.label_input = input[1]
62+
self.slot_inputs = input[2:]
6163

6264
dnn_model = BenchmarkDNNLayer(
6365
self.dict_dim,
@@ -74,12 +76,23 @@ def net(self, input, is_infer=False):
7476
predict_2d = paddle.concat(x=[1 - self.predict, self.predict], axis=1)
7577
#label_int = paddle.cast(self.label, 'int64')
7678

77-
auc, batch_auc_var, self.auc_stat_list = paddle.static.auc(
79+
auc, batch_auc_var, auc_stat_list = paddle.static.auc(
7880
input=predict_2d, label=self.label_input, slide_steps=0)
79-
self.metric_list = fluid.contrib.layers.ctr_metric_bundle(
81+
metric_list = fluid.contrib.layers.ctr_metric_bundle(
8082
self.predict,
8183
fluid.layers.cast(
8284
x=self.label_input, dtype='float32'))
85+
86+
self.thread_stat_var_names = [
87+
auc_stat_list[2].name, auc_stat_list[3].name
88+
]
89+
self.thread_stat_var_names += [i.name for i in metric_list]
90+
self.thread_stat_var_names = list(set(self.thread_stat_var_names))
91+
92+
self.metric_list = list(auc_stat_list) + list(metric_list)
93+
self.metric_types = ["int64"] * len(auc_stat_list) + ["float32"] * len(
94+
metric_list)
95+
8396
self.inference_feed_vars = dnn_model.inference_feed_vars
8497
self.inference_target_var = self.predict
8598

tools/feature_importance.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ def run_offline_infer(self):
134134

135135
self.exe.run(paddle.static.default_startup_program())
136136
fleet.init_worker()
137-
fleet.load_model(init_model_path, mode=0)
137+
if fleet.is_first_worker():
138+
fleet.load_model(init_model_path, mode=0)
139+
fleet.barrier_worker()
138140

139141
logger.info("Prepare Dataset Begin.")
140142
prepare_data_start_time = time.time()

tools/static_ps_offline_infer.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def parse_args():
4646
args = parser.parse_args()
4747
args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml))
4848
yaml_helper = YamlHelper()
49-
config = yaml_helper.load_yaml(args.config_yaml)
49+
config = yaml_helper.load_yaml(args.config_yaml, ["table_parameters"])
5050
config["yaml_path"] = args.config_yaml
5151
config["config_abs_dir"] = args.abs_dir
5252
yaml_helper.print_yaml(config)
@@ -121,7 +121,10 @@ def run_offline_infer(self):
121121
fleet.init_worker()
122122

123123
init_model_path = config.get("runner.init_model_path")
124-
fleet.load_model(init_model_path, mode=0)
124+
model_mode = config.get("runner.model_mode", 0)
125+
if fleet.is_first_worker():
126+
fleet.load_model(init_model_path, mode=model_mode)
127+
fleet.barrier_worker()
125128

126129
logger.info("Prepare Dataset Begin.")
127130
prepare_data_start_time = time.time()
@@ -147,7 +150,6 @@ def dataset_offline_infer(self, cur_dataset):
147150
"dump_fields_path": dump_fields_path,
148151
"dump_fields": dump_fields
149152
})
150-
print(paddle.static.default_main_program()._fleet_opt)
151153

152154
self.exe.infer_from_dataset(
153155
program=paddle.static.default_main_program(),

0 commit comments

Comments (0)