Commit a9bb991

Merge pull request #811 from ziyoujiyi/fl-rec

add ncf fl-trainer

2 parents e694b2c + 0f2192e, commit a9bb991

File tree: 11 files changed, +495 −9 lines changed
@@ -0,0 +1,91 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import argparse


def gen_heter_files(data_load_path, splitted_data_path, file_nums):
    # Evenly split the full training set into `file_nums` csv files.
    data = pd.read_csv(data_load_path)
    total_sample_num = data.shape[0]
    print("total sample num is: {}".format(total_sample_num))
    sample_num_per_file = int(total_sample_num / file_nums)
    for i in range(0, file_nums):
        save_data = data.iloc[i * sample_num_per_file + 1:(i + 1) *
                              sample_num_per_file + 1]
        file_name = splitted_data_path + '/' + str(i) + '.csv'
        save_data.to_csv(file_name, index=False)
    print("files splitted done, num is {}, saved in path: {}".format(
        file_nums, splitted_data_path))


def get_zipcode_dict():
    # Map user_id -> zipcode (first five digits) from the ml-1m users.dat file.
    filename = '/home/wangbin/the_one_ps/ziyoujiyi_PaddleRec/MovieLens-1M/ml-1m/users.dat'
    zipcode_dict = {}
    with open(filename, "r") as f:
        line = f.readline()
        while line is not None and line != "":
            arr = line.split("::")
            user_id, sex, age, occupation, zip_code = int(arr[0]), str(arr[
                1]), int(arr[2]), int(arr[3]), str(arr[4])
            zip_code = int(zip_code[0:5])
            zipcode_dict[user_id] = zip_code
            line = f.readline()
    return zipcode_dict


def shuffle_data_by_zipcode(data_load_path, splitted_data_path, file_nums,
                            zipcode_dict):
    # Shard samples into 10 files keyed by the first digit of the user's zipcode.
    data = pd.read_csv(data_load_path)
    total_sample_num = data.shape[0]
    print("total sample num is: {}".format(total_sample_num))
    data_list = data.values.tolist()
    sharded_data = [(idx, []) for idx in range(10)]
    for data_row in data_list:
        user_id = data_row[0]
        zipcode = zipcode_dict[user_id + 1]
        shard_id = int(zipcode / 10000)
        sharded_data[shard_id][1].extend([data_row])
    for (shard_id, sample) in sharded_data:
        print("zipcode start with {}: {}".format(shard_id, len(sample)))
        file_name = splitted_data_path + '/' + str(shard_id) + '.csv'
        d = pd.DataFrame(data=sample)
        d.to_csv(file_name, index=False)
    print("files splitted by zipcode done, saved in path: {}".format(
        splitted_data_path))


def parse_args():
    parser = argparse.ArgumentParser(description="Run GMF.")
    parser.add_argument(
        '--full_train_data_path',
        type=str,
        default="../big_train/train_data.csv",
        help='full_train_data_path')
    parser.add_argument(
        '--splitted_data_path',
        type=str,
        default="fl_train_data",
        help='splitted_data_path')
    parser.add_argument(
        '--file_nums', type=int, default=10, help='fl clients num')
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    #gen_heter_files(args.full_train_data_path, args.splitted_data_path, args.file_nums)
    zipcode_dict = get_zipcode_dict()
    shuffle_data_by_zipcode(args.full_train_data_path, args.splitted_data_path,
                            args.file_nums, zipcode_dict)
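
The script above (presumably the gen_heter_data.py referred to in fl_ps_help.md below) shards samples by the first digit of the user's zipcode. A minimal sketch of that mapping for a single users.dat record; the record shown here is only an illustrative example:

```
# Illustrative sketch only: how one ml-1m users.dat record maps to a shard id
# in shuffle_data_by_zipcode above.
line = "1::F::1::10::48067"         # user_id::sex::age::occupation::zipcode
arr = line.split("::")
user_id, zip_code = int(arr[0]), str(arr[4])
zip_code = int(zip_code[0:5])       # 48067
shard_id = int(zip_code / 10000)    # 4 -> this user's samples land in 4.csv
print(user_id, zip_code, shard_id)  # 1 48067 4
```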

models/rank/dnn/config.yaml

+3

@@ -37,6 +37,9 @@ runner:
   sync_mode: "async"
   split_file_list: False
   thread_num: 1
+  reader_type: "QueueDataset"  # DataLoader / QueueDataset / RecDataset
+  pipe_command: "python queuedataset_reader.py"  # data pipe command in QueueDataset mode
+  dataset_debug: False  # profiler switch in QueueDataset mode


 # hyper parameters of user-defined network

models/rank/slot_dnn/config_queuedataset.yaml

+1

@@ -19,6 +19,7 @@
 runner:
   train_data_dir: "data/"
   train_reader_path: "criteo_reader"  # importlib format
+  sync_mode: "async"
   use_gpu: False
   use_auc: True
   train_batch_size: 2

models/recall/ncf/config_fl.yaml

+58

@@ -0,0 +1,58 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

runner:
  sync_mode: "geo"  # optional, string: sync / async / geo
  #with_coodinator: 1
  geo_step: 100  # optional, int, number of local iterations per round in geo mode
  split_file_list: True  # optional, bool, set to True if every node holds the full dataset
  thread_num: 1  # number of threads

  # reader type; QueueDataset is recommended for distributed training
  reader_type: "QueueDataset"  # DataLoader / QueueDataset / RecDataset
  pipe_command: "python queuedataset_reader.py"  # data pipe command in QueueDataset mode
  dataset_debug: False  # profiler switch in QueueDataset mode

  train_data_dir: "../../../datasets/movielens_pinterest_NCF/fl_data/fl_train_data"
  train_reader_path: "movielens_reader"  # importlib format
  train_batch_size: 512
  model_save_path: "output_model_ncf"

  use_gpu: False
  epochs: 2
  print_interval: 50

  test_data_dir: "../../../datasets/movielens_pinterest_NCF/fl_data/fl_test_data"
  infer_reader_path: "movielens_reader"  # importlib format
  infer_batch_size: 1
  infer_load_path: "output_model_ncf"
  infer_start_epoch: 2
  infer_end_epoch: 3

  need_dump: True
  dump_fields_path: "/home/wangbin/the_one_ps/ziyoujiyi_PaddleRec/PaddleRec/models/recall/ncf"
  dump_fields: ['item_input', 'user_input']
  dump_param: []
  local_sparse: ['embedding_0.w_0']
  remote_sparse: ['embedding_1.w_0']

hyper_parameters:
  optimizer:
    class: adam
    learning_rate: 0.001
  num_users: 6040
  num_items: 3706
  mf_dim: 8
  mode: "NCF_MLP"  # optional: NCF_NeuMF, NCF_GMF, NCF_MLP
  fc_layers: [64, 32, 16, 8]

models/recall/ncf/fl_ps_help.md

+58

@@ -0,0 +1,58 @@
# 1. Features
An FL-PS built on top of GEO-PS, with Coordinator support:
* builds heterogeneous sample data on each worker
* prints training metrics (loss, auc) during every training epoch
* runs inference on the test set after every epoch

# 2. Preparing the samples
* In PaddleRec/datasets/movielens_pinterest_NCF run `sh run.sh` to obtain the preprocessed training data (big_train) and test data (test_data)
* Download the ml-1m dataset from the MovieLens website to get users.dat (the storage path is up to you, but it must match the path used in gen_heter_data.py); it is used later to build the heterogeneous datasets (split by the first digit of the zipcode)
* Create the directories fl_test_data and fl_train_data under PaddleRec/datasets/movielens_pinterest_NCF/fl_data to hold each client's training and test data
* In PaddleRec/datasets/movielens_pinterest_NCF/fl_data run `python gen_heter_data.py` to generate 10 training-data shards
    * total sample count 4970844 (negatives added at a 1:4 ratio), per shard: 0 - 518095, 1 - 520165, 2 - 373605, 3 - 315550, 4 - 483779, 5 - 495635, 6 - 402810, 7 - 354590, 8 - 262710, 9 - 1243905
    * each line of the sample data is: item id, user id, label

# 3. Run commands
1. Without coordinator
* In the directory of this file run: fleetrun --worker_num=10 --server_num=1 ../../../tools/static_fl_trainer.py -m config_fl.yaml
2. With coordinator
* In the directory of this file run: fleetrun --worker_num=10 --server_num=1 --coordinator_num=1 ../../../tools/static_fl_trainer.py -m config_fl.yaml
(see fl_run.sh for reference)

# 4. Further development
## System level
1. Code repos
* Paddle: https://github.com/ziyoujiyi/Paddle/tree/fl_ps
* PaddleRec: https://github.com/ziyoujiyi/PaddleRec/tree/fl-rec
2. Build and install
```
1) At https://www.paddlepaddle.org.cn/install/quick?docurl=/documentation/docs/zh/develop/install/compile/linux-compile.html find the develop/Linux/compile-from-source/CPU development image and develop inside that docker container
2) Create a build directory under the Paddle root directory
3) cd build
4) cmake .. -DPY_VERSION=3.7 -DWITH_GPU=OFF -DCMAKE_BUILD_TYPE=Release -DWITH_DISTRIBUTE=ON -DWITH_PSCORE=ON -DWITH_AVX=OFF -DWITH_TESTING=OFF -DWITH_FLPS=ON
5) make -j
6) python -m pip install python/dist/paddlepaddle-0.0.0-cp37-cp37m-linux_x86_64.whl -U
```
3. Modules open for user extension
* Paddle:
    * Paddle/python/paddle/distributed/ps/coordinator.py
* Model definition files: see PaddleRec/models/recall/ncf/net.py; new model files should carry the "fl_" prefix
* Datasets: if the dataset already exists in PaddleRec, add fl_test_data and fl_train_data directories under it; otherwise add a new dataset under PaddleRec/datasets
    * for custom heterogeneous dataset construction, see gen_heter_data.py
* For building model inputs, see PaddleRec/models/recall/ncf/queuedataset_reader.py
4. Coding conventions
* Style checks: pip install pre-commit, then run pre-commit install in the git root directory
* Follow the existing style

## Strategy level
1. Edge task scheduling
* user network: DDPG
* on the Python side, the user calls the _pull_dense interface to pull dense parameters from the PS and then reads them from the scope
* the user decides which parameters (fields) each client uploads to the coordinator after every training round
2. New loss function design
* directly reuse the training script of the version without coordinator
3. Knowledge distillation (a minimal soft-target loss sketch follows this file)
* the user trains the student model, prints its logits and uploads them to the coordinator; the teacher model is trained on the coordinator side
* the coordinator broadcasts global soft targets
4. Model compression
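
The knowledge-distillation item above only outlines the message flow (clients upload student logits, the coordinator trains the teacher and broadcasts global soft targets); this commit does not fix a particular loss. Below is a minimal numpy sketch of one common soft-target loss, assuming the coordinator's soft targets arrive as per-class probabilities; the names, values, and temperature are illustrative assumptions, not part of the repo:

```
import numpy as np

def softened_probs(logits, T=2.0):
    # temperature-softened softmax over the last axis
    z = np.asarray(logits, dtype=np.float64) / T
    z = z - z.max(axis=-1, keepdims=True)
    e = np.exp(z)
    return e / e.sum(axis=-1, keepdims=True)

def soft_target_loss(student_logits, teacher_soft_targets, T=2.0):
    # KL(teacher || student) averaged over the batch -- one common distillation loss
    p = np.asarray(teacher_soft_targets, dtype=np.float64)
    q = softened_probs(student_logits, T)
    return float(np.mean(np.sum(p * (np.log(p + 1e-12) - np.log(q + 1e-12)), axis=-1)))

# toy usage: a client scores its student logits against the global soft targets
student_logits = np.array([[2.0, 0.5], [0.1, 1.5]])
global_soft_targets = np.array([[0.7, 0.3], [0.2, 0.8]])
print(soft_target_loss(student_logits, global_soft_targets))
```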

models/recall/ncf/fl_run.sh

+5

@@ -0,0 +1,5 @@
ps -ef | grep python | awk '{print $2}' | xargs kill -9

#fleetrun --worker_num=10 --server_num=1 ../../../tools/static_fl_trainer.py -m config_fl.yaml
#fleetrun --worker_num=10 --server_num=1 --coordinator_num=1 ../../../tools/static_fl_trainer.py -m config_fl.yaml
fleetrun --worker_num=10 --workers="127.0.0.1:9000,127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,127.0.0.1:9007,127.0.0.1:9008,127.0.0.1:9009" --server_num=1 --servers="127.0.0.1:10000" --coordinator_num=1 --coordinators="127.0.0.1:10001" ../../../tools/static_fl_trainer.py -m config_fl.yaml
+64

@@ -0,0 +1,64 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import yaml
import six
import os
import copy
import paddle.distributed.fleet as fleet
import logging
import numpy as np

logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)


class Reader(fleet.MultiSlotDataGenerator):
    def init(self, config):
        self.slots = ['user_id', 'item_id', 'label']
        logger.info("pipe init success")

    def line_process(self, line):
        features = line.strip().split(',')
        user_input = [int(features[0])]
        item_input = [int(features[1])]
        label = [int(features[2])]
        output_list = [(i, []) for i in self.slots]
        output_list[0][1].extend(user_input)
        output_list[1][1].extend(item_input)
        output_list[2][1].extend(label)
        return output_list

    def generate_sample(self, line):
        r"Dataset Generator"

        def reader():
            output_dict = self.line_process(line)
            yield output_dict

        return reader


if __name__ == "__main__":
    yaml_path = sys.argv[1]
    utils_path = sys.argv[2]
    sys.path.append(utils_path)
    import common
    yaml_helper = common.YamlHelper()
    config = yaml_helper.load_yaml(yaml_path)

    r = Reader()
    r.init(config)
    r.run_from_stdin()
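
This reader (it appears to be the queuedataset_reader.py named in pipe_command above) is launched by QueueDataset as a pipe command and consumes raw csv lines on stdin. A minimal sketch of what line_process produces, assuming paddlepaddle is installed and the Reader class above is importable; the input line is hypothetical:

```
# Illustrative sketch only: one hypothetical csv line through Reader.line_process.
r = Reader()
r.init(config=None)                # init only records the slot names
print(r.line_process("123,456,1"))
# -> [('user_id', [123]), ('item_id', [456]), ('label', [1])]
```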

models/recall/ncf/static_model.py

+11 −2

@@ -56,13 +56,22 @@ def net(self, input, is_infer=False):
                                   self.mf_dim, self.layers)

         prediction = ncf_model.forward(input)
+        predict_2d = paddle.concat(x=[1 - prediction, prediction], axis=1)
+        label_input = input[2]
+
+        auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos,
+                         stat_neg] = paddle.static.auc(input=predict_2d,
+                                                       label=label_input,
+                                                       num_thresholds=2**12,
+                                                       slide_steps=0)

         self.inference_target_var = prediction
         if is_infer:
             fetch_dict = {
                 "user": input[0],
                 'prediction': prediction,
-                "label": input[2]
+                "label": input[2],
+                'auc': auc
             }
             return fetch_dict
         cost = F.log_loss(
@@ -71,7 +80,7 @@ def net(self, input, is_infer=False):
         avg_cost = paddle.mean(x=cost)
         # print(avg_cost)
         self._cost = avg_cost
-        fetch_dict = {'Loss': avg_cost}
+        fetch_dict = {'Loss': avg_cost, 'Auc': auc}
         return fetch_dict

     def create_optimizer(self, strategy=None):
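
A note on the added metric: paddle.static.auc consumes a two-column probability input (the second column is treated as the positive-class probability) plus the integer label, which is why the diff builds predict_2d from the single-column sigmoid output. A small numpy sketch of that reshaping, with made-up prediction values:

```
import numpy as np

prediction = np.array([[0.9], [0.2], [0.7]])  # sigmoid output of the NCF net, shape [batch, 1]
predict_2d = np.concatenate([1 - prediction, prediction], axis=1)
print(predict_2d)
# (approximately)
# [[0.1 0.9]
#  [0.8 0.2]
#  [0.3 0.7]]
```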
