diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..445ef0ac3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*.o +output +.idea/ +paddlerec.egg-info/ +*~ +*.pyc diff --git a/README.md b/README.md index 3dfbf8d39..74fdd2ce6 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,219 @@ -# PaddleRec -推荐算法,大规模并行训练支持 +

+ +

+ +

+
+ Release + License + Slack +
+

+ + +

什么是PaddleRec

+ +

+ +

+ +- 源于飞桨生态的搜索推荐模型**一站式开箱即用工具** +- 适合初学者,开发者,研究者从调研,训练到预测部署的全流程解决方案 +- 包含语义理解、召回、粗排、精排、多任务学习、融合等多个任务的推荐搜索算法库 +- 配置**yaml**自定义选项,即可快速上手使用单机训练、大规模分布式训练、离线预测、在线部署 + + +

PaddleRec概览

+ +

+ +

+ + +

推荐系统-流程概览

+ +

+ +

+ +

便捷安装

+ +### 环境要求 +* Python 2.7/ 3.5 / 3.6 / 3.7 +* PaddlePaddle >= 1.7.2 +* 操作系统: Windows/Mac/Linux + + > Windows下目前仅提供单机训练,建议使用Linux + +### 安装命令 + +- 安装方法一: + ```bash + python -m pip install paddle-rec + ``` + +- 安装方法二 + + 源码编译安装 + 1. 安装飞桨 **注:需要用户安装版本 >1.7.2 的飞桨** + + ```shell + python -m pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple + ``` + + 2. 源码安装PaddleRec + + ``` + git clone https://github.com/PaddlePaddle/PaddleRec/ + cd PaddleRec + python setup.py install + ``` + + +

快速启动

+ +### 启动内置模型的默认配置 + +目前框架内置了多个模型,一行命令即可使用内置模型开始单机训练和本地模拟分布式训练。 + > 本地模拟分布式(`local_cluster`)为`1个server + 1个trainer`的参数服务器模式 + + +我们以排序模型中的`dnn`模型为例介绍PaddleRec的简单使用。训练数据来源为[Criteo数据集](https://www.kaggle.com/c/criteo-display-ad-challenge/),我们从中截取了100条方便您快速上手体验完整的PaddleRec流程。 + +```bash +# 使用CPU进行单机训练 +python -m paddlerec.run -m paddlerec.models.rank.dnn +``` + +### 启动内置模型的自定配置 + +若您复用内置模型,对**yaml**配置文件进行了修改,如更改超参,重新配置数据后,可以直接使用paddlerec运行该yaml文件。 + +我们以dnn模型为例,在paddlerec代码目录下: +```bash +cd paddlerec +``` + +修改dnn模型的[超参配置](./models/rank/dnn/config.yaml),例如将迭代训练轮数从10轮修改为5轮: +```yaml +train: + # epochs: 10 + epochs: 5 +``` + +在Linux环境下,可以使用`vim`等文本编辑工具修改yaml文件: + +```bash +vim ./models/rank/dnn/config.yaml +# 键入 i, 进入编辑模式 +# 修改yaml文件配置 +# 完成修改后,点击esc,退出编辑模式 +# 键入 :wq 保存文件并退出 +``` + +完成dnn模型`models/rank/dnn/config.yaml`的配置修改后,运行`dnn`模型: +```bash +# 使用自定配置进行训练 +python -m paddlerec.run -m ./models/rank/dnn/config.yaml +``` + +### 分布式训练 + +分布式训练需要配置`config.yaml`,加入或修改`engine`选项为`cluster`或`local_cluster`,以进行分布式训练,或本地模拟分布式训练。 + +#### 本地模拟分布式训练 + +我们以dnn模型为例,在paddlerec代码目录下,修改dnn模型的`config.yaml`文件: + +```yaml +train: + #engine: single + engine: local_cluster +``` +然后启动paddlerec训练: + +```bash +# 进行本地模拟分布式训练 +python -m paddlerec.run -m ./models/rank/dnn/config.yaml +``` + +#### 集群分布式训练 + +我们以dnn模型为例,在paddlerec代码目录下,首先修改dnn模型`config.yaml`文件: + +```yaml +train: + #engine: single + engine: cluster +``` +再添加分布式启动配置文件`backend.yaml`,具体配置规则在[分布式训练](doc/distributed_train.md)教程中介绍。最后启动paddlerec训练: + +```bash +# 配置好 mpi/k8s/paddlecloud集群环境后 +python -m paddlerec.run -m ./models/rank/dnn/config.yaml -b backend.yaml +``` + + +

支持模型列表

+ + +| 方向 | 模型 | 单机CPU训练 | 单机GPU训练 | 分布式CPU训练 | +| :------: | :-----------------------------------------------------------------------: | :---------: | :---------: | :-----------: | +| 内容理解 | [Text-Classifcation](models/contentunderstanding/classification/model.py) | ✓ | x | ✓ | +| 内容理解 | [TagSpace](models/contentunderstanding/tagspace/model.py) | ✓ | x | ✓ | +| 召回 | [DSSM](models/match/dssm/model.py) | ✓ | x | ✓ | +| 召回 | [MultiView-Simnet](models/match/multiview-simnet/model.py) | ✓ | x | ✓ | +| 召回 | [TDM](models/treebased/tdm/model.py) | ✓ | x | ✓ | +| 召回 | [Word2Vec](models/recall/word2vec/model.py) | ✓ | x | ✓ | +| 召回 | [SSR](models/recall/ssr/model.py) | ✓ | ✓ | ✓ | +| 召回 | [Gru4Rec](models/recall/gru4rec/model.py) | ✓ | ✓ | ✓ | +| 召回 | [Youtube_dnn](models/recall/youtube_dnn/model.py) | ✓ | ✓ | ✓ | +| 召回 | [NCF](models/recall/ncf/model.py) | ✓ | ✓ | ✓ | +| 排序 | [Dnn](models/rank/dnn/model.py) | ✓ | x | ✓ | +| 排序 | [DeepFM](models/rank/deepfm/model.py) | ✓ | x | ✓ | +| 排序 | [xDeepFM](models/rank/xdeepfm/model.py) | ✓ | x | ✓ | +| 排序 | [DIN](models/rank/din/model.py) | ✓ | x | ✓ | +| 排序 | [Wide&Deep](models/rank/wide_deep/model.py) | ✓ | x | ✓ | +| 多任务 | [ESMM](models/multitask/esmm/model.py) | ✓ | ✓ | ✓ | +| 多任务 | [MMOE](models/multitask/mmoe/model.py) | ✓ | ✓ | ✓ | +| 多任务 | [ShareBottom](models/multitask/share-bottom/model.py) | ✓ | ✓ | ✓ | + + + + +

文档

+ +### 背景介绍 +* [推荐系统介绍](doc/rec_background.md) +* [分布式深度学习介绍](doc/ps_background.md) + +### 新手教程 +* [环境要求](#环境要求) +* [安装命令](#安装命令) +* [快速开始](#启动内置模型的默认配置) + +### 进阶教程 +* [自定义数据集及Reader](doc/custom_dataset_reader.md) +* [分布式训练](doc/distributed_train.md) + +### 开发者教程 +* [PaddleRec设计文档](doc/design.md) + +### 关于PaddleRec性能 +* [Benchmark](doc/benchmark.md) + +### FAQ +* [常见问题FAQ](doc/faq.md) + + +

社区

+ +### 反馈 +如有意见、建议及使用中的BUG,欢迎在`GitHub Issue`提交 + +### 版本历史 +- 2020.5.14 - PaddleRec v0.1 + +### 许可证书 +本项目的发布受[Apache 2.0 license](LICENSE)许可认证。 + diff --git a/__init__.py b/__init__.py new file mode 100755 index 000000000..abf198b97 --- /dev/null +++ b/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/core/__init__.py b/core/__init__.py new file mode 100755 index 000000000..e69de29bb diff --git a/core/engine/__init__.py b/core/engine/__init__.py new file mode 100755 index 000000000..e69de29bb diff --git a/core/engine/cluster/__init__.py b/core/engine/cluster/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/core/engine/cluster/cloud/__init__.py b/core/engine/cluster/cloud/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/core/engine/cluster/cloud/cluster.sh b/core/engine/cluster/cloud/cluster.sh new file mode 100644 index 000000000..1a0605fd9 --- /dev/null +++ b/core/engine/cluster/cloud/cluster.sh @@ -0,0 +1,95 @@ +#!/bin/bash +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +################################################### +# Usage: submit.sh +# Description: run mpi submit client implement +################################################### + +# ---------------------------------------------------------------------------- # +# variable define # +# ---------------------------------------------------------------------------- # + +#----------------------------------------------------------------------------------------------------------------- +#fun : package +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function package_hook() { + g_run_stage="package" + package +} + +#----------------------------------------------------------------------------------------------------------------- +#fun : before hook submit to cluster +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function _before_submit() { + echo "before_submit" + before_submit_hook +} + +#----------------------------------------------------------------------------------------------------------------- +#fun : after hook submit to cluster +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function _after_submit() { + echo "after_submit" + after_submit_hook +} + 
+#----------------------------------------------------------------------------------------------------------------- +#fun : submit to cluster +#param : N/A +#return : 0 -- success; not 0 -- failure +#----------------------------------------------------------------------------------------------------------------- +function _submit() { + g_run_stage="submit" + + cd ${engine_temp_path} + + paddlecloud job --ak ${engine_submit_ak} --sk ${engine_submit_sk} train --cluster-name ${engine_submit_cluster} \ + --job-version ${engine_submit_version} \ + --mpi-priority ${engine_submit_priority} \ + --mpi-wall-time 300:59:00 \ + --mpi-nodes ${engine_submit_nodes} --is-standalone 0 \ + --mpi-memory 110Gi \ + --job-name ${engine_submit_jobname} \ + --start-cmd "${g_run_cmd}" \ + --group-name ${engine_submit_group} \ + --job-conf ${engine_submit_config} \ + --files ${g_submitfiles} \ + --json + + cd - +} + +function submit_hook() { + _before_submit + _submit + _after_submit +} + +function main() { + source ${engine_submit_scrpit} + + package_hook + submit_hook +} + +main diff --git a/core/engine/cluster/cluster.py b/core/engine/cluster/cluster.py new file mode 100644 index 000000000..6dfcec3a9 --- /dev/null +++ b/core/engine/cluster/cluster.py @@ -0,0 +1,60 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function +from __future__ import unicode_literals + +import copy +import os +import subprocess + +from paddlerec.core.engine.engine import Engine +from paddlerec.core.factory import TrainerFactory +from paddlerec.core.utils import envs + + +class ClusterEngine(Engine): + def __init_impl__(self): + abs_dir = os.path.dirname(os.path.abspath(__file__)) + backend = envs.get_runtime_environ("engine_backend") + if backend == "PaddleCloud": + self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh") + else: + raise ValueError("{} can not be supported now".format(backend)) + + def start_worker_procs(self): + trainer = TrainerFactory.create(self.trainer) + trainer.run() + + def start_master_procs(self): + default_env = os.environ.copy() + current_env = copy.copy(default_env) + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + cmd = ("bash {}".format(self.submit_script)).split(" ") + proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) + proc.wait() + + def run(self): + role = envs.get_runtime_environ("engine_role") + + if role == "MASTER": + self.start_master_procs() + + elif role == "WORKER": + self.start_worker_procs() + + else: + raise ValueError("role {} error, must in MASTER/WORKER".format(role)) diff --git a/core/engine/engine.py b/core/engine/engine.py new file mode 100755 index 000000000..492bf8e1c --- /dev/null +++ b/core/engine/engine.py @@ -0,0 +1,31 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + + +class Engine: + __metaclass__ = abc.ABCMeta + + def __init__(self, envs, trainer): + self.envs = envs + self.trainer = trainer + self.__init_impl__() + + def __init_impl__(self): + pass + + @abc.abstractmethod + def run(self): + pass diff --git a/core/engine/local_cluster.py b/core/engine/local_cluster.py new file mode 100755 index 000000000..4cf614f02 --- /dev/null +++ b/core/engine/local_cluster.py @@ -0,0 +1,104 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function +from __future__ import unicode_literals + +import copy +import os +import sys +import subprocess + +from paddlerec.core.engine.engine import Engine +from paddlerec.core.utils import envs + + +class LocalClusterEngine(Engine): + def start_procs(self): + worker_num = self.envs["worker_num"] + server_num = self.envs["server_num"] + ports = [self.envs["start_port"]] + logs_dir = self.envs["log_dir"] + + default_env = os.environ.copy() + current_env = copy.copy(default_env) + current_env["CLUSTER_INSTANCE"] = "1" + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + procs = [] + log_fns = [] + + for i in range(server_num - 1): + while True: + new_port = envs.find_free_port() + if new_port not in ports: + ports.append(new_port) + break + user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) + user_endpoints_ips = [x.split(":")[0] + for x in user_endpoints.split(",")] + user_endpoints_port = [x.split(":")[1] + for x in user_endpoints.split(",")] + + factory = "paddlerec.core.factory" + cmd = [sys.executable, "-u", "-m", factory, self.trainer] + + for i in range(server_num): + current_env.update({ + "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints, + "PADDLE_PORT": user_endpoints_port[i], + "TRAINING_ROLE": "PSERVER", + "PADDLE_TRAINERS_NUM": str(worker_num), + "POD_IP": user_endpoints_ips[i] + }) + + os.system("mkdir -p {}".format(logs_dir)) + fn = open("%s/server.%d" % (logs_dir, i), "w") + log_fns.append(fn) + proc = subprocess.Popen( + cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd()) + procs.append(proc) + + for i in range(worker_num): + current_env.update({ + "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints, + "PADDLE_TRAINERS_NUM": str(worker_num), + "TRAINING_ROLE": "TRAINER", + "PADDLE_TRAINER_ID": str(i) + }) + + os.system("mkdir -p {}".format(logs_dir)) + fn = open("%s/worker.%d" % (logs_dir, i), "w") + log_fns.append(fn) + proc = subprocess.Popen( + cmd, env=current_env, 
stdout=fn, stderr=fn, cwd=os.getcwd()) + procs.append(proc) + + # only wait worker to finish here + for i, proc in enumerate(procs): + if i < server_num: + continue + procs[i].wait() + if len(log_fns) > 0: + log_fns[i].close() + + for i in range(server_num): + if len(log_fns) > 0: + log_fns[i].close() + procs[i].terminate() + print("all workers already completed, you can view logs under the `{}` directory".format(logs_dir), + file=sys.stderr) + + def run(self): + self.start_procs() diff --git a/core/engine/local_mpi.py b/core/engine/local_mpi.py new file mode 100755 index 000000000..49db821fe --- /dev/null +++ b/core/engine/local_mpi.py @@ -0,0 +1,57 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import print_function +from __future__ import unicode_literals + +import copy +import os +import sys +import subprocess + +from paddlerec.core.engine.engine import Engine + + +class LocalMPIEngine(Engine): + def start_procs(self): + logs_dir = self.envs["log_dir"] + + default_env = os.environ.copy() + current_env = copy.copy(default_env) + current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + procs = [] + log_fns = [] + + factory = "paddlerec.core.factory" + cmd = "mpirun -npernode 2 -timestamp-output -tag-output".split(" ") + cmd.extend([sys.executable, "-u", "-m", factory, self.trainer]) + + if logs_dir is not None: + os.system("mkdir -p {}".format(logs_dir)) + fn = open("%s/job.log" % logs_dir, "w") + log_fns.append(fn) + proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd()) + else: + proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) + procs.append(proc) + + for i in range(len(procs)): + if len(log_fns) > 0: + log_fns[i].close() + procs[i].wait() + print("all workers and parameter servers already completed", file=sys.stderr) + + def run(self): + self.start_procs() diff --git a/core/factory.py b/core/factory.py new file mode 100755 index 000000000..4c08f1f6b --- /dev/null +++ b/core/factory.py @@ -0,0 +1,88 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import sys + +import yaml + +from paddlerec.core.utils import envs + +trainer_abs = os.path.join(os.path.dirname( + os.path.abspath(__file__)), "trainers") +trainers = {} + + +def trainer_registry(): + trainers["SingleTrainer"] = os.path.join( + trainer_abs, "single_trainer.py") + trainers["ClusterTrainer"] = os.path.join( + trainer_abs, "cluster_trainer.py") + trainers["CtrCodingTrainer"] = os.path.join( + trainer_abs, "ctr_coding_trainer.py") + trainers["CtrModulTrainer"] = os.path.join( + trainer_abs, "ctr_modul_trainer.py") + trainers["TDMSingleTrainer"] = os.path.join( + trainer_abs, "tdm_single_trainer.py") + trainers["TDMClusterTrainer"] = os.path.join( + trainer_abs, "tdm_cluster_trainer.py") + + +trainer_registry() + + +class TrainerFactory(object): + def __init__(self): + pass + + @staticmethod + def _build_trainer(yaml_path): + print(envs.pretty_print_envs(envs.get_global_envs())) + + train_mode = envs.get_trainer() + trainer_abs = trainers.get(train_mode, None) + + if trainer_abs is None: + if not os.path.isfile(train_mode): + raise IOError( + "trainer {} can not be recognized".format(train_mode)) + trainer_abs = train_mode + train_mode = "UserDefineTrainer" + + trainer_class = envs.lazy_instance_by_fliename(trainer_abs, train_mode) + trainer = trainer_class(yaml_path) + return trainer + + @staticmethod + def create(config): + _config = None + if os.path.isfile(config): + with open(config, 'r') as rb: + _config = yaml.load(rb.read(), Loader=yaml.FullLoader) + else: + raise ValueError("paddlerec's config only support yaml") + + envs.set_global_envs(_config) + envs.update_workspace() + + trainer = TrainerFactory._build_trainer(config) + return trainer + + +# server num, worker num +if __name__ == "__main__": + if len(sys.argv) != 2: + raise ValueError("need a yaml file path argv") + trainer = TrainerFactory.create(sys.argv[1]) + trainer.run() diff --git a/core/layer.py b/core/layer.py new file mode 100755 index 000000000..07c058c77 --- 
/dev/null +++ b/core/layer.py @@ -0,0 +1,32 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + + +class Layer(object): + """R + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, config): + """R + """ + pass + + @abc.abstractmethod + def generate(self, param): + """R + """ + pass diff --git a/core/metric.py b/core/metric.py new file mode 100755 index 000000000..e0f6b24e7 --- /dev/null +++ b/core/metric.py @@ -0,0 +1,61 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import abc + + +class Metric(object): + """R + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, config): + """ """ + pass + + @abc.abstractmethod + def clear(self, scope, params): + """ + clear current value + Args: + scope: value container + params: extend varilable for clear + """ + pass + + @abc.abstractmethod + def calculate(self, scope, params): + """ + calculate result + Args: + scope: value container + params: extend varilable for clear + """ + pass + + @abc.abstractmethod + def get_result(self): + """ + Return: + result(dict) : calculate result + """ + pass + + @abc.abstractmethod + def __str__(self): + """ + Return: + result(string) : calculate result with string format, for output + """ + pass diff --git a/core/metrics/__init__.py b/core/metrics/__init__.py new file mode 100755 index 000000000..abf198b97 --- /dev/null +++ b/core/metrics/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/core/metrics/auc_metrics.py b/core/metrics/auc_metrics.py new file mode 100755 index 000000000..5dd16cc07 --- /dev/null +++ b/core/metrics/auc_metrics.py @@ -0,0 +1,211 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import math + +import numpy as np +import paddle.fluid as fluid + +from paddlerec.core.metric import Metric + + +class AUCMetric(Metric): + """ + Metric For Paddle Model + """ + + def __init__(self, config, fleet): + """ """ + self.config = config + self.fleet = fleet + + def clear(self, scope, params): + """ + Clear current metric value, usually set to zero + Args: + scope : paddle runtime var container + params(dict) : + label : a group name for metric + metric_dict : current metric_items in group + Return: + None + """ + self._label = params['label'] + self._metric_dict = params['metric_dict'] + self._result = {} + place = fluid.CPUPlace() + for metric_name in self._metric_dict: + metric_config = self._metric_dict[metric_name] + if scope.find_var(metric_config['var'].name) is None: + continue + metric_var = scope.var(metric_config['var'].name).get_tensor() + data_type = 'float32' + if 'data_type' in metric_config: + data_type = metric_config['data_type'] + data_array = np.zeros(metric_var._get_dims()).astype(data_type) + metric_var.set(data_array, place) + + def get_metric(self, scope, metric_name): + """ + reduce metric named metric_name from all worker + Return: + metric reduce result + """ + metric = np.array(scope.find_var(metric_name).get_tensor()) + old_metric_shape = np.array(metric.shape) + metric = metric.reshape(-1) + global_metric = np.copy(metric) * 0 + self.fleet._role_maker._node_type_comm.Allreduce(metric, global_metric) + global_metric = global_metric.reshape(old_metric_shape) + return global_metric[0] + + def get_global_metrics(self, 
scope, metric_dict): + """ + reduce all metric in metric_dict from all worker + Return: + dict : {matric_name : metric_result} + """ + self.fleet._role_maker._barrier_worker() + result = {} + for metric_name in metric_dict: + metric_item = metric_dict[metric_name] + if scope.find_var(metric_item['var'].name) is None: + result[metric_name] = None + continue + result[metric_name] = self.get_metric(scope, metric_item['var'].name) + return result + + def calculate_auc(self, global_pos, global_neg): + """R + """ + num_bucket = len(global_pos) + area = 0.0 + pos = 0.0 + neg = 0.0 + new_pos = 0.0 + new_neg = 0.0 + total_ins_num = 0 + for i in range(num_bucket): + index = num_bucket - 1 - i + new_pos = pos + global_pos[index] + total_ins_num += global_pos[index] + new_neg = neg + global_neg[index] + total_ins_num += global_neg[index] + area += (new_neg - neg) * (pos + new_pos) / 2 + pos = new_pos + neg = new_neg + auc_value = None + if pos * neg == 0 or total_ins_num == 0: + auc_value = 0.5 + else: + auc_value = area / (pos * neg) + return auc_value + + def calculate_bucket_error(self, global_pos, global_neg): + """R + """ + num_bucket = len(global_pos) + last_ctr = -1.0 + impression_sum = 0.0 + ctr_sum = 0.0 + click_sum = 0.0 + error_sum = 0.0 + error_count = 0.0 + click = 0.0 + show = 0.0 + ctr = 0.0 + adjust_ctr = 0.0 + relative_error = 0.0 + actual_ctr = 0.0 + relative_ctr_error = 0.0 + k_max_span = 0.01 + k_relative_error_bound = 0.05 + for i in range(num_bucket): + click = global_pos[i] + show = global_pos[i] + global_neg[i] + ctr = float(i) / num_bucket + if abs(ctr - last_ctr) > k_max_span: + last_ctr = ctr + impression_sum = 0.0 + ctr_sum = 0.0 + click_sum = 0.0 + impression_sum += show + ctr_sum += ctr * show + click_sum += click + if impression_sum == 0: + continue + adjust_ctr = ctr_sum / impression_sum + if adjust_ctr == 0: + continue + relative_error = \ + math.sqrt((1 - adjust_ctr) / (adjust_ctr * impression_sum)) + if relative_error < 
k_relative_error_bound: + actual_ctr = click_sum / impression_sum + relative_ctr_error = abs(actual_ctr / adjust_ctr - 1) + error_sum += relative_ctr_error * impression_sum + error_count += impression_sum + last_ctr = -1 + + bucket_error = error_sum / error_count if error_count > 0 else 0.0 + return bucket_error + + def calculate(self, scope, params): + """ """ + self._label = params['label'] + self._metric_dict = params['metric_dict'] + self.fleet._role_maker._barrier_worker() + result = self.get_global_metrics(scope, self._metric_dict) + if result['total_ins_num'] == 0: + self._result = result + self._result['auc'] = 0 + self._result['bucket_error'] = 0 + self._result['actual_ctr'] = 0 + self._result['predict_ctr'] = 0 + self._result['mae'] = 0 + self._result['rmse'] = 0 + self._result['copc'] = 0 + self._result['mean_q'] = 0 + return self._result + if 'stat_pos' in result and 'stat_neg' in result: + result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg']) + result['bucket_error'] = self.calculate_bucket_error(result['stat_pos'], result['stat_neg']) + if 'pos_ins_num' in result: + result['actual_ctr'] = result['pos_ins_num'] / result['total_ins_num'] + if 'abserr' in result: + result['mae'] = result['abserr'] / result['total_ins_num'] + if 'sqrerr' in result: + result['rmse'] = math.sqrt(result['sqrerr'] / result['total_ins_num']) + if 'prob' in result: + result['predict_ctr'] = result['prob'] / result['total_ins_num'] + if abs(result['predict_ctr']) > 1e-6: + result['copc'] = result['actual_ctr'] / result['predict_ctr'] + + if 'q' in result: + result['mean_q'] = result['q'] / result['total_ins_num'] + self._result = result + return result + + def get_result(self): + """ """ + return self._result + + def __str__(self): + """ """ + result = self.get_result() + result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f " \ + "Actural_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \ + (self._label, result['auc'], 
result['bucket_error'], result['mae'], result['rmse'], + result['actual_ctr'], + result['predict_ctr'], result['copc'], result['mean_q'], result['total_ins_num']) + return result_str diff --git a/core/model.py b/core/model.py new file mode 100755 index 000000000..212db44c8 --- /dev/null +++ b/core/model.py @@ -0,0 +1,129 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + +import paddle.fluid as fluid + +from paddlerec.core.utils import envs + + +class Model(object): + """R + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, config): + """R + """ + self._cost = None + self._metrics = {} + self._data_var = [] + self._infer_data_var = [] + self._infer_results = {} + self._data_loader = None + self._infer_data_loader = None + self._fetch_interval = 20 + self._namespace = "train.model" + self._platform = envs.get_platform() + + def _init_slots(self): + sparse_slots = envs.get_global_env("sparse_slots", None, "train.reader") + dense_slots = envs.get_global_env("dense_slots", None, "train.reader") + + if sparse_slots is not None or dense_slots is not None: + sparse_slots = sparse_slots.strip().split(" ") + dense_slots = dense_slots.strip().split(" ") + dense_slots_shape = [[int(j) for j in i.split(":")[1].strip("[]").split(",")] for i in dense_slots] + dense_slots = [i.split(":")[0] for i in dense_slots] + self._dense_data_var = [] + for i in range(len(dense_slots)): + l = 
fluid.layers.data(name=dense_slots[i], shape=dense_slots_shape[i], dtype="float32") + self._data_var.append(l) + self._dense_data_var.append(l) + self._sparse_data_var = [] + for name in sparse_slots: + l = fluid.layers.data(name=name, shape=[1], lod_level=1, dtype="int64") + self._data_var.append(l) + self._sparse_data_var.append(l) + + dataset_class = envs.get_global_env("dataset_class", None, "train.reader") + if dataset_class == "DataLoader": + self._init_dataloader() + + def _init_dataloader(self): + self._data_loader = fluid.io.DataLoader.from_generator( + feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False) + + def get_inputs(self): + return self._data_var + + def get_infer_inputs(self): + return self._infer_data_var + + def get_infer_results(self): + return self._infer_results + + def get_avg_cost(self): + """R + """ + return self._cost + + def get_metrics(self): + """R + """ + return self._metrics + + def get_fetch_period(self): + return self._fetch_interval + + def _build_optimizer(self, name, lr): + name = name.upper() + optimizers = ["SGD", "ADAM", "ADAGRAD"] + if name not in optimizers: + raise ValueError( + "configured optimizer can only supported SGD/Adam/Adagrad") + + if name == "SGD": + reg = envs.get_global_env( + "hyper_parameters.reg", 0.0001, self._namespace) + optimizer_i = fluid.optimizer.SGD( + lr, regularization=fluid.regularizer.L2DecayRegularizer(reg)) + elif name == "ADAM": + optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True) + elif name == "ADAGRAD": + optimizer_i = fluid.optimizer.Adagrad(lr) + else: + raise ValueError( + "configured optimizer can only supported SGD/Adam/Adagrad") + + return optimizer_i + + def optimizer(self): + learning_rate = envs.get_global_env( + "hyper_parameters.learning_rate", None, self._namespace) + optimizer = envs.get_global_env( + "hyper_parameters.optimizer", None, self._namespace) + print(">>>>>>>>>>>.learnig rate: %s" % learning_rate) + return 
self._build_optimizer(optimizer, learning_rate) + + @abc.abstractmethod + def train_net(self): + """R + """ + pass + + @abc.abstractmethod + def infer_net(self): + pass diff --git a/core/modules/__init__.py b/core/modules/__init__.py new file mode 100755 index 000000000..e69de29bb diff --git a/core/modules/coding/__init__.py b/core/modules/coding/__init__.py new file mode 100755 index 000000000..e69de29bb diff --git a/core/modules/coding/layers.py b/core/modules/coding/layers.py new file mode 100755 index 000000000..e69de29bb diff --git a/core/modules/modul/__init__.py b/core/modules/modul/__init__.py new file mode 100755 index 000000000..e69de29bb diff --git a/core/modules/modul/build.py b/core/modules/modul/build.py new file mode 100755 index 000000000..0263cbf60 --- /dev/null +++ b/core/modules/modul/build.py @@ -0,0 +1,203 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import copy + +import paddle.fluid as fluid +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet +import yaml + +from paddlerec.core.model import Model +from paddlerec.core.utils import table + + +def create(config): + """ + Create a model instance by config + Args: + config(dict) : desc model type and net + Return: + Model Instance + """ + model = None + if config['mode'] == 'fluid': + model = YamlModel(config) + model.train_net() + return model + + +class YamlModel(Model): + """R + """ + + def __init__(self, config): + """R + """ + Model.__init__(self, config) + self._config = config + self._name = config['name'] + f = open(config['layer_file'], 'r') + self._build_nodes = yaml.safe_load(f.read()) + self._build_phase = ['input', 'param', 'summary', 'layer'] + self._build_param = {'layer': {}, 'inner_layer': {}, 'layer_extend': {}, 'model': {}} + self._inference_meta = {'dependency': {}, 'params': {}} + + def train_net(self): + """R + build a fluid model with config + Return: + modle_instance(dict) + train_program + startup_program + inference_param : all params name list + table: table-meta to ps-server + """ + for layer in self._build_nodes['layer']: + self._build_param['inner_layer'][layer['name']] = layer + + self._build_param['table'] = {} + self._build_param['model']['train_program'] = fluid.Program() + self._build_param['model']['startup_program'] = fluid.Program() + with fluid.program_guard(self._build_param['model']['train_program'], \ + self._build_param['model']['startup_program']): + with fluid.unique_name.guard(): + for phase in self._build_phase: + if self._build_nodes[phase] is None: + continue + for node in self._build_nodes[phase]: + exec("""layer=layer.{}(node)""".format(node['class'])) + layer_output, extend_output = layer.generate(self._config['mode'], self._build_param) + self._build_param['layer'][node['name']] = layer_output + self._build_param['layer_extend'][node['name']] = extend_output + if extend_output is None: + 
continue + if 'loss' in extend_output: + if self._cost is None: + self._cost = extend_output['loss'] + else: + self._cost += extend_output['loss'] + if 'data_var' in extend_output: + self._data_var += extend_output['data_var'] + if 'metric_label' in extend_output and extend_output['metric_label'] is not None: + self._metrics[extend_output['metric_label']] = extend_output['metric_dict'] + + if 'inference_param' in extend_output: + inference_param = extend_output['inference_param'] + param_name = inference_param['name'] + if param_name not in self._build_param['table']: + self._build_param['table'][param_name] = {'params': []} + table_meta = table.TableMeta.alloc_new_table(inference_param['table_id']) + self._build_param['table'][param_name]['_meta'] = table_meta + self._build_param['table'][param_name]['params'] += inference_param['params'] + pass + + @classmethod + def build_optimizer(self, params): + """R + """ + optimizer_conf = params['optimizer_conf'] + strategy = None + if 'strategy' in optimizer_conf: + strategy = optimizer_conf['strategy'] + stat_var_names = [] + metrics = params['metrics'] + for name in metrics: + model_metrics = metrics[name] + stat_var_names += [model_metrics[metric]['var'].name for metric in model_metrics] + strategy['stat_var_names'] = list(set(stat_var_names)) + optimizer_generator = 'optimizer = fluid.optimizer.' 
+ optimizer_conf['class'] + \
+            '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')'
+        optimizer = getattr(fluid.optimizer, optimizer_conf['class'])(learning_rate=optimizer_conf['learning_rate'])  # exec() cannot bind a function local on Python 3
+        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+        return optimizer
+
+    def dump_model_program(self, path):
+        """Dump the train/startup programs as protobuf text files for debugging.
+        """
+        with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout:
+            fout.write(str(self._build_param['model']['train_program']))  # `print >> fout` is Python-2-only syntax
+        with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout:
+            fout.write(str(self._build_param['model']['startup_program']))
+        pass
+
+    def shrink(self, params):
+        """Apply decay-based shrink to every dense table registered by this model.
+        """
+        scope = params['scope']
+        decay = params['decay']
+        for param_table in self._build_param['table']:
+            table_id = self._build_param['table'][param_table]['_meta']._table_id
+            fleet.shrink_dense_table(decay, scope=scope, table_id=table_id)
+
+    def dump_inference_program(self, inference_layer, path):
+        """Placeholder: dumping a standalone inference program is not implemented yet.
+        """
+        pass
+
+    def dump_inference_param(self, params):
+        """Pull dense params from the PS and save the vars needed by each inference layer.
+        """
+        scope = params['scope']
+        executor = params['executor']
+        program = self._build_param['model']['train_program']
+        for table_name, table in self._build_param['table'].items():
+            fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params'])
+        for infernce_item in params['inference_list']:
+            params_name_list = self.inference_params(infernce_item['layer_name'])
+            params_var_list = [program.global_block().var(i) for i in params_name_list]
+            params_file_name = infernce_item['save_file_name']
+            with fluid.scope_guard(scope):
+                if params['save_combine']:
+                    fluid.io.save_vars(executor, "./", \
+                                       program, vars=params_var_list, filename=params_file_name)
+                else:
+                    fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list)
+
+    def inference_params(self, inference_layer):
+        """
+        get params name for inference_layer
+        Args:
+            inference_layer(str): layer for inference
+        Return:
+            params(list): params name list that for inference layer
+        """
+        layer = inference_layer
+        if layer in 
self._inference_meta['params']: + return self._inference_meta['params'][layer] + + self._inference_meta['params'][layer] = [] + self._inference_meta['dependency'][layer] = self.get_dependency(self._build_param['inner_layer'], layer) + for node in self._build_nodes['layer']: + if node['name'] not in self._inference_meta['dependency'][layer]: + continue + if 'inference_param' in self._build_param['layer_extend'][node['name']]: + self._inference_meta['params'][layer] += \ + self._build_param['layer_extend'][node['name']]['inference_param']['params'] + return self._inference_meta['params'][layer] + + def get_dependency(self, layer_graph, dest_layer): + """ + get model of dest_layer depends on + Args: + layer_graph(dict) : all model in graph + Return: + depend_layers(list) : sub-graph model for calculate dest_layer + """ + dependency_list = [] + if dest_layer in layer_graph: + dependencys = copy.deepcopy(layer_graph[dest_layer]['input']) + dependency_list = copy.deepcopy(dependencys) + for dependency in dependencys: + dependency_list = dependency_list + self.get_dependency(layer_graph, dependency) + return list(set(dependency_list)) diff --git a/core/modules/modul/layers.py b/core/modules/modul/layers.py new file mode 100755 index 000000000..060c023ff --- /dev/null +++ b/core/modules/modul/layers.py @@ -0,0 +1,262 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import paddle.fluid as fluid + +from paddlerec.core.layer import Layer + + +class EmbeddingFuseLayer(Layer): + """R + """ + + def __init__(self, config): + """R + """ + self._cvm = config['cvm'] + self._name = config['name'] + self._slots = [str(slot) for slot in config['slots']] + self._mf_dim = config['mf_dim'] + self._backward = config['backward'] + self._emb_dim = self._mf_dim + 3 # append show ctr lr + self._emb_layers = [] + + def generate(self, param): + """R + """ + show_clk = fluid.layers.concat( + [param['layer']['show'], param['layer']['click']], axis=1) + show_clk.stop_gradient = True + data_var = [] + for slot in self._slots: + l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1) + data_var.append(l) + emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], \ + is_sparse=True, is_distributed=True, + param_attr=fluid.ParamAttr(name="embedding")) + emb = fluid.layers.sequence_pool(input=emb, pool_type='sum') + emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm) + self._emb_layers.append(emb) + output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name) + return output, {'data_var': data_var} + + +class LabelInputLayer(Layer): + """R + """ + + def __init__(self, config): + """R + """ + self._name = config['name'] + self._dim = config.get('dim', 1) + self._data_type = config.get('data_type', "int64") + self._label_idx = config['label_idx'] + + def generate(self, param): + """R + """ + label = fluid.layers.data(name=self._name, shape=[-1, self._dim], \ + dtype=self._data_type, lod_level=0, append_batch_size=False) + cast_label = fluid.layers.cast(label, dtype='float32') + cast_label.stop_gradient = True + return cast_label, {'data_var': [label]} + + +class TagInputLayer(Layer): + """R + """ + + def __init__(self, config): + """R + """ + self._name = config['name'] + self._tag = config['tag'] + self._dim = config.get('dim', 1) + self._data_type = config['data_type'] + + def generate(self, 
param): + """R + """ + output = fluid.layers.data(name=self._name, shape=[-1, self._dim], \ + dtype=self._data_type, lod_level=0, append_batch_size=False, stop_gradient=True) + return output, {'data_var': [output]} + + +class ParamLayer(Layer): + """R + """ + + def __init__(self, config): + """R + """ + self._name = config['name'] + self._coln = config['coln'] + self._table_id = config.get('table_id', -1) + self._init_range = config.get('init_range', 1) + self._data_type = config.get('data_type', 'float32') + self._config = config + + def generate(self, param): + """R + """ + return self._config, {'inference_param': {'name': 'param', 'params': [], 'table_id': self._table_id}} + + +class SummaryLayer(Layer): + """R + """ + + def __init__(self, config): + """R + """ + self._name = config['name'] + self._table_id = config.get('table_id', -1) + self._data_type = config.get('data_type', 'float32') + self._config = config + + def generate(self, param): + """R + """ + return self._config, {'inference_param': {'name': 'summary', 'params': [], 'table_id': self._table_id}} + + +class NormalizationLayer(Layer): + """R + """ + + def __init__(self, config): + """R + """ + self._name = config['name'] + self._input = config['input'] + self._summary = config['summary'] + self._table_id = config.get('table_id', -1) + + def generate(self, param): + """R + """ + input_layer = param['layer'][self._input[0]] + summary_layer = param['layer'][self._summary] + if len(self._input) > 0: + input_list = [param['layer'][i] for i in self._input] + input_layer = fluid.layers.concat(input=input_list, axis=1) + bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={ + "batch_size": 1e4, "batch_sum_default": 0.0, "batch_square": 1e4}) + inference_param = [self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum'] + return bn, {'inference_param': {'name': 'summary', \ + 'params': inference_param, 'table_id': 
summary_layer.get('table_id', -1)}} + + +class FCLayer(Layer): + """R + """ + + def __init__(self, config): + """R + """ + self._name = config['name'] + self._param = config['param'] + self._input = config['input'] + self._bias = config.get('bias', True) + self._act_func = config.get('act_func', None) + + def generate(self, param): + """R + """ + param_layer = param['layer'][self._param] + input_layer = param['layer'][self._input[0]] + if len(self._input) > 0: + input_list = [param['layer'][i] for i in self._input] + input_layer = fluid.layers.concat(input=input_list, axis=1) + input_coln = input_layer.shape[1] + scale = param_layer['init_range'] / (input_coln ** 0.5) + bias = None + if self._bias: + bias = fluid.ParamAttr(learning_rate=1.0, + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)) + fc = fluid.layers.fc( + name=self._name, + input=input_layer, + size=param_layer['coln'], + act=self._act_func, + param_attr= \ + fluid.ParamAttr(learning_rate=1.0, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)), + bias_attr=bias) + inference_param = [self._name + '.w_0', self._name + '.b_0'] + return fc, {'inference_param': {'name': 'param', 'params': inference_param, \ + 'table_id': param_layer.get('table_id', -1)}} + + +class LogLossLayer(Layer): + """R + """ + + def __init__(self, config): + """R + """ + self._name = config['name'] + self._label = config['label'] + self._input = config['input'] + self._weight = config.get('weight', None) + self._metric_label = config.get('metric_label', None) + self._bound = config.get('bound', [-15.0, 15.0]) + self._extend_output = { + 'metric_label': self._metric_label, + 'metric_dict': { + 'auc': {'var': None}, + 'batch_auc': {'var': None}, + 'stat_pos': {'var': None, 'data_type': 'int64'}, + 'stat_neg': {'var': None, 'data_type': 'int64'}, + 'batch_stat_pos': {'var': None, 'data_type': 'int64'}, + 'batch_stat_neg': {'var': None, 'data_type': 'int64'}, + 'pos_ins_num': {'var': None}, + 
'abserr': {'var': None}, + 'sqrerr': {'var': None}, + 'prob': {'var': None}, + 'total_ins_num': {'var': None}, + 'q': {'var': None} + } + } + + def generate(self, param): + """R + """ + input_layer = param['layer'][self._input[0]] + label_layer = param['layer'][self._label] + output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name=self._name) + norm = fluid.layers.sigmoid(output, name=self._name) + output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32')) + if self._weight: + weight_layer = param['layer'][self._weight] + output = fluid.layers.elementwise_mul(output, weight_layer) + output = fluid.layers.mean(x=output) + self._extend_output['loss'] = output + + # For AUC Metric + metric = self._extend_output['metric_dict'] + binary_predict = fluid.layers.concat( + input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1) + metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \ + metric['batch_stat_neg']['var'], metric['stat_pos']['var'], + metric['stat_neg']['var']] = \ + fluid.layers.auc(input=binary_predict, label=fluid.layers.cast(x=label_layer, dtype='int64'), \ + curve='ROC', num_thresholds=32) + + metric['sqrerr']['var'], metric['abserr']['var'], metric['prob']['var'], metric['q']['var'], \ + metric['pos_ins_num']['var'], metric['total_ins_num']['var'] = \ + fluid.contrib.layers.ctr_metric_bundle(norm, fluid.layers.cast(x=label_layer, dtype='float32')) + + return norm, self._extend_output diff --git a/core/reader.py b/core/reader.py new file mode 100755 index 000000000..01502761e --- /dev/null +++ b/core/reader.py @@ -0,0 +1,102 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function
+
+import sys
+import abc
+import os
+
+import paddle.fluid.incubate.data_generator as dg
+import yaml
+
+from paddlerec.core.utils import envs
+
+
+class Reader(dg.MultiSlotDataGenerator):
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, config):
+        dg.MultiSlotDataGenerator.__init__(self)
+
+        if os.path.isfile(config):
+            with open(config, 'r') as rb:
+                _config = yaml.load(rb.read(), Loader=yaml.FullLoader)
+        else:
+            raise ValueError("reader config only support yaml")
+
+        envs.set_global_envs(_config)
+        envs.update_workspace()
+
+    @abc.abstractmethod
+    def init(self):
+        pass
+
+    @abc.abstractmethod
+    def generate_sample(self, line):
+        pass
+
+
+class SlotReader(dg.MultiSlotDataGenerator):
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, config):
+        dg.MultiSlotDataGenerator.__init__(self)
+        if os.path.isfile(config):
+            with open(config, 'r') as rb:
+                _config = yaml.load(rb.read(), Loader=yaml.FullLoader)
+        else:
+            raise ValueError("reader config only support yaml")
+        envs.set_global_envs(_config)
+        envs.update_workspace()
+
+    def init(self, sparse_slots, dense_slots, padding=0):
+        from functools import reduce; from operator import mul  # reduce is not a builtin on Python 3
+        self.sparse_slots = sparse_slots.strip().split(" ")
+        self.dense_slots = dense_slots.strip().split(" ")
+        self.dense_slots_shape = [reduce(mul, [int(j) for j in i.split(":")[1].strip("[]").split(",")]) for i in self.dense_slots]
+        self.dense_slots = [i.split(":")[0] for i in self.dense_slots]
+        self.slots = self.dense_slots + self.sparse_slots
+        self.slot2index = {}
+        self.visit = {}
+        for i in 
range(len(self.slots)): + self.slot2index[self.slots[i]] = i + self.visit[self.slots[i]] = False + self.padding = padding + + def generate_sample(self, l): + def reader(): + line = l.strip().split(" ") + output = [(i, []) for i in self.slots] + for i in line: + slot_feasign = i.split(":") + slot = slot_feasign[0] + if slot not in self.slots: + continue + if slot in self.sparse_slots: + feasign = int(slot_feasign[1]) + else: + feasign = float(slot_feasign[1]) + output[self.slot2index[slot]][1].append(feasign) + self.visit[slot] = True + for i in self.visit: + slot = i + if not self.visit[slot]: + if i in self.dense_slots: + output[self.slot2index[i]][1].extend([self.padding] * self.dense_slots_shape[self.slot2index[i]]) + else: + output[self.slot2index[i]][1].extend([self.padding]) + else: + self.visit[slot] = False + yield output + return reader diff --git a/core/trainer.py b/core/trainer.py new file mode 100755 index 000000000..40fc35de9 --- /dev/null +++ b/core/trainer.py @@ -0,0 +1,100 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import abc +import os +import time +import sys +import yaml + +from paddle import fluid + +from paddlerec.core.utils import envs + + +class Trainer(object): + """R + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, config=None): + self._status_processor = {} + self._place = fluid.CPUPlace() + self._exe = fluid.Executor(self._place) + self._exector_context = {} + self._context = {'status': 'uninit', 'is_exit': False} + self._config_yaml = config + + with open(config, 'r') as rb: + self._config = yaml.load(rb.read(), Loader=yaml.FullLoader) + + def regist_context_processor(self, status_name, processor): + """ + regist a processor for specify status + """ + self._status_processor[status_name] = processor + + def context_process(self, context): + """ + select a processor to deal specify context + Args: + context : context with status + Return: + None : run a processor for this status + """ + if context['status'] in self._status_processor: + self._status_processor[context['status']](context) + else: + self.other_status_processor(context) + + def other_status_processor(self, context): + """ + if no processor match context.status, use defalut processor + Return: + None, just sleep in base + """ + print('unknow context_status:%s, do nothing' % context['status']) + time.sleep(60) + + def reload_train_context(self): + """ + context maybe update timely, reload for update + """ + pass + + def run(self): + """ + keep running by statu context. 
+ """ + while True: + self.reload_train_context() + self.context_process(self._context) + if self._context['is_exit']: + break + + +def user_define_engine(engine_yaml): + with open(engine_yaml, 'r') as rb: + _config = yaml.load(rb.read(), Loader=yaml.FullLoader) + assert _config is not None + + envs.set_runtime_environs(_config) + + train_location = envs.get_global_env("engine.file") + train_dirname = os.path.dirname(train_location) + base_name = os.path.splitext(os.path.basename(train_location))[0] + sys.path.append(train_dirname) + trainer_class = envs.lazy_instance_by_fliename( + base_name, "UserDefineTraining") + return trainer_class diff --git a/core/trainers/__init__.py b/core/trainers/__init__.py new file mode 100755 index 000000000..cd9c9db5e --- /dev/null +++ b/core/trainers/__init__.py @@ -0,0 +1,26 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +trainer implement. + + ↗ (single/cluster) CtrTrainer +Trainer + ↗ (for single training) SingleTrainer/TDMSingleTrainer + ↘ TranspilerTrainer → (for cluster training) ClusterTrainer/TDMClusterTrainer + ↘ (for online learning training) OnlineLearningTrainer + +""" + + diff --git a/core/trainers/cluster_trainer.py b/core/trainers/cluster_trainer.py new file mode 100755 index 000000000..faa960359 --- /dev/null +++ b/core/trainers/cluster_trainer.py @@ -0,0 +1,181 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Training use fluid with one node only. +""" + +from __future__ import print_function + +import os +import time + +import paddle.fluid as fluid +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker + +from paddlerec.core.utils import envs +from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer + + +class ClusterTrainer(TranspileTrainer): + def processor_register(self): + role = PaddleCloudRoleMaker() + fleet.init(role) + + if fleet.is_server(): + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('init_pass', self.init) + self.regist_context_processor('server_pass', self.server) + else: + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('init_pass', self.init) + self.regist_context_processor('startup_pass', self.startup) + if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader": + self.regist_context_processor('train_pass', self.dataset_train) + else: + self.regist_context_processor( + 'train_pass', self.dataloader_train) + self.regist_context_processor('infer_pass', self.infer) + self.regist_context_processor('terminal_pass', self.terminal) + + 
def build_strategy(self): + mode = envs.get_runtime_environ("train.trainer.strategy") + assert mode in ["async", "geo", "sync", "half_async"] + + strategy = None + + if mode == "async": + strategy = StrategyFactory.create_async_strategy() + elif mode == "geo": + push_num = envs.get_global_env("train.strategy.mode.push_num", 100) + strategy = StrategyFactory.create_geo_strategy(push_num) + elif mode == "sync": + strategy = StrategyFactory.create_sync_strategy() + elif mode == "half_async": + strategy = StrategyFactory.create_half_async_strategy() + + assert strategy is not None + + self.strategy = strategy + return strategy + + def init(self, context): + self.model.train_net() + optimizer = self.model.optimizer() + optimizer_name = envs.get_global_env( + "hyper_parameters.optimizer", None, "train.model") + if optimizer_name not in ["", "sgd", "SGD", "Sgd"]: + os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0' + + strategy = self.build_strategy() + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(self.model.get_avg_cost()) + + if fleet.is_server(): + context['status'] = 'server_pass' + else: + self.fetch_vars = [] + self.fetch_alias = [] + self.fetch_period = self.model.get_fetch_period() + + metrics = self.model.get_metrics() + if metrics: + self.fetch_vars = metrics.values() + self.fetch_alias = metrics.keys() + context['status'] = 'startup_pass' + + def server(self, context): + fleet.init_server() + fleet.run_server() + context['is_exit'] = True + + def startup(self, context): + self._exe.run(fleet.startup_program) + context['status'] = 'train_pass' + + def dataloader_train(self, context): + fleet.init_worker() + + reader = self._get_dataloader() + epochs = envs.get_global_env("train.epochs") + + program = fluid.compiler.CompiledProgram( + fleet.main_program).with_data_parallel( + loss_name=self.model.get_avg_cost().name, + build_strategy=self.strategy.get_build_strategy(), + exec_strategy=self.strategy.get_execute_strategy()) 
+ + metrics_varnames = [] + metrics_format = [] + + metrics_format.append("{}: {{}}".format("epoch")) + metrics_format.append("{}: {{}}".format("batch")) + + for name, var in self.model.get_metrics().items(): + metrics_varnames.append(var.name) + metrics_format.append("{}: {{}}".format(name)) + + metrics_format = ", ".join(metrics_format) + + for epoch in range(epochs): + reader.start() + batch_id = 0 + try: + while True: + metrics_rets = self._exe.run( + program=program, + fetch_list=metrics_varnames) + + metrics = [epoch, batch_id] + metrics.extend(metrics_rets) + + if batch_id % self.fetch_period == 0 and batch_id != 0: + print(metrics_format.format(*metrics)) + batch_id += 1 + except fluid.core.EOFException: + reader.reset() + self.save(epoch, "train", is_fleet=True) + + fleet.stop_worker() + context['status'] = 'infer_pass' + + def dataset_train(self, context): + fleet.init_worker() + + dataset = self._get_dataset() + ins = self._get_dataset_ins() + + epochs = envs.get_global_env("train.epochs") + + for i in range(epochs): + begin_time = time.time() + self._exe.train_from_dataset(program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) + end_time = time.time() + times = end_time-begin_time + print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times)) + + self.save(i, "train", is_fleet=True) + fleet.stop_worker() + context['status'] = 'infer_pass' + + def terminal(self, context): + for model in self.increment_models: + print("epoch :{}, dir: {}".format(model[0], model[1])) + context['is_exit'] = True diff --git a/core/trainers/ctr_coding_trainer.py b/core/trainers/ctr_coding_trainer.py new file mode 100755 index 000000000..3bfec28cf --- /dev/null +++ b/core/trainers/ctr_coding_trainer.py @@ -0,0 +1,137 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import numpy as np +import paddle.fluid as fluid +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet +from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker + +from paddlerec.core.utils import envs +from paddlerec.core.trainer import Trainer + + +class CtrTrainer(Trainer): + """R + """ + + def __init__(self, config): + """R + """ + Trainer.__init__(self, config) + + self.global_config = config + self._metrics = {} + self.processor_register() + + def processor_register(self): + role = MPISymetricRoleMaker() + fleet.init(role) + + if fleet.is_server(): + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('init_pass', self.init) + self.regist_context_processor('server_pass', self.server) + else: + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('init_pass', self.init) + self.regist_context_processor('train_pass', self.train) + self.regist_context_processor('terminal_pass', self.terminal) + + def _get_dataset(self): + namespace = "train.reader" + + inputs = self.model.get_inputs() + threads = envs.get_global_env("train.threads", None) + batch_size = envs.get_global_env("batch_size", None, namespace) + reader_class = envs.get_global_env("class", None, namespace) + abs_dir = os.path.dirname(os.path.abspath(__file__)) + reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') + pipe_cmd = 
"python {} {} {} {}".format(reader, reader_class, "TRAIN", self._config_yaml) + train_data_path = envs.get_global_env("train_data_path", None, namespace) + + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_use_var(inputs) + dataset.set_pipe_command(pipe_cmd) + dataset.set_batch_size(batch_size) + dataset.set_thread(threads) + file_list = [ + os.path.join(train_data_path, x) + for x in os.listdir(train_data_path) + ] + + dataset.set_filelist(file_list) + return dataset + + def instance(self, context): + models = envs.get_global_env("train.model.models") + model_class = envs.lazy_instance_by_fliename(models, "Model") + self.model = model_class(None) + context['status'] = 'init_pass' + + def init(self, context): + """R + """ + self.model.train_net() + optimizer = self.model.optimizer() + + optimizer = fleet.distributed_optimizer(optimizer, strategy={"use_cvm": False}) + optimizer.minimize(self.model.get_avg_cost()) + + if fleet.is_server(): + context['status'] = 'server_pass' + else: + self.fetch_vars = [] + self.fetch_alias = [] + self.fetch_period = self.model.get_fetch_period() + + metrics = self.model.get_metrics() + if metrics: + self.fetch_vars = metrics.values() + self.fetch_alias = metrics.keys() + context['status'] = 'train_pass' + + def server(self, context): + fleet.run_server() + fleet.stop_worker() + context['is_exit'] = True + + def train(self, context): + self._exe.run(fluid.default_startup_program()) + fleet.init_worker() + + dataset = self._get_dataset() + + shuf = np.array([fleet.worker_index()]) + gs = shuf * 0 + fleet._role_maker._node_type_comm.Allreduce(shuf, gs) + + print("trainer id: {}, trainers: {}, gs: {}".format(fleet.worker_index(), fleet.worker_num(), gs)) + + epochs = envs.get_global_env("train.epochs") + + for i in range(epochs): + self._exe.train_from_dataset(program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) + + 
context['status'] = 'terminal_pass' + fleet.stop_worker() + + def terminal(self, context): + print("terminal ended.") + context['is_exit'] = True diff --git a/core/trainers/ctr_modul_trainer.py b/core/trainers/ctr_modul_trainer.py new file mode 100755 index 000000000..7b3bd7874 --- /dev/null +++ b/core/trainers/ctr_modul_trainer.py @@ -0,0 +1,460 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import datetime +import json +import sys +import time + +import numpy as np +import paddle.fluid as fluid +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet +from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + + +from paddlerec.core.utils import fs as fs +from paddlerec.core.utils import util as util +from paddlerec.core.metrics.auc_metrics import AUCMetric +from paddlerec.core.modules.modul import build as model_basic +from paddlerec.core.utils import dataset +from paddlerec.core.trainer import Trainer + + +def wroker_numric_opt(value, env, opt): + """ + numric count opt for workers + Args: + value: value for count + env: mpi/gloo + opt: count operator, SUM/MAX/MIN/AVG + Return: + count result + """ + local_value = np.array([value]) + global_value = np.copy(local_value) * 0 + fleet._role_maker.all_reduce_worker(local_value, global_value, opt) + return global_value[0] + + +def worker_numric_sum(value, env="mpi"): + """R + """ + return wroker_numric_opt(value, env, 
"sum") + + +def worker_numric_avg(value, env="mpi"): + """R + """ + return worker_numric_sum(value, env) / fleet.worker_num() + + +def worker_numric_min(value, env="mpi"): + """R + """ + return wroker_numric_opt(value, env, "min") + + +def worker_numric_max(value, env="mpi"): + """R + """ + return wroker_numric_opt(value, env, "max") + + +class CtrTrainer(Trainer): + """R + """ + + def __init__(self, config): + """R + """ + Trainer.__init__(self, config) + config['output_path'] = util.get_absolute_path( + config['output_path'], config['io']['afs']) + + self.global_config = config + self._metrics = {} + + self._path_generator = util.PathGenerator({ + 'templates': [ + {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'}, + {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'}, + {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'}, + {'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'}, + {'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'} + ] + }) + if 'path_generator' in config: + self._path_generator.add_path_template(config['path_generator']) + + self.regist_context_processor('uninit', self.init) + self.regist_context_processor('startup', self.startup) + self.regist_context_processor('begin_day', self.begin_day) + self.regist_context_processor('train_pass', self.train_pass) + self.regist_context_processor('end_day', self.end_day) + + def init(self, context): + """R + """ + role_maker = None + if self.global_config.get('process_mode', 'mpi') == 'brilliant_cpu': + afs_config = self.global_config['io']['afs'] + role_maker = GeneralRoleMaker( + hdfs_name=afs_config['fs_name'], hdfs_ugi=afs_config['fs_ugi'], + path=self.global_config['output_path'] + "/gloo", + init_timeout_seconds=1200, run_timeout_seconds=1200) + fleet.init(role_maker) + data_var_list = [] + data_var_name_dict = {} + 
runnnable_scope = [] + runnnable_cost_op = [] + context['status'] = 'startup' + + for executor in self.global_config['executor']: + scope = fluid.Scope() + self._exector_context[executor['name']] = {} + self._exector_context[executor['name']]['scope'] = scope + self._exector_context[executor['name']]['model'] = model_basic.create(executor) + model = self._exector_context[executor['name']]['model'] + self._metrics.update(model.get_metrics()) + runnnable_scope.append(scope) + runnnable_cost_op.append(model.get_avg_cost()) + for var in model._data_var: + if var.name in data_var_name_dict: + continue + data_var_list.append(var) + data_var_name_dict[var.name] = var + + optimizer = model_basic.YamlModel.build_optimizer({ + 'metrics': self._metrics, + 'optimizer_conf': self.global_config['optimizer'] + }) + optimizer.minimize(runnnable_cost_op, runnnable_scope) + for executor in self.global_config['executor']: + scope = self._exector_context[executor['name']]['scope'] + model = self._exector_context[executor['name']]['model'] + program = model._build_param['model']['train_program'] + if not executor['is_update_sparse']: + program._fleet_opt["program_configs"][str(id(model.get_avg_cost().block.program))]["push_sparse"] = [] + if 'train_thread_num' not in executor: + executor['train_thread_num'] = self.global_config['train_thread_num'] + with fluid.scope_guard(scope): + self._exe.run(model._build_param['model']['startup_program']) + model.dump_model_program('./') + + # server init done + if fleet.is_server(): + return 0 + + self._dataset = {} + for dataset_item in self.global_config['dataset']['data_list']: + dataset_item['data_vars'] = data_var_list + dataset_item.update(self.global_config['io']['afs']) + dataset_item["batch_size"] = self.global_config['batch_size'] + self._dataset[dataset_item['name']] = dataset.FluidTimeSplitDataset(dataset_item) + # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass: + 
# util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3) + fleet.init_worker() + pass + + def print_log(self, log_str, params): + """R + """ + params['index'] = fleet.worker_index() + if params['master']: + if fleet.worker_index() == 0: + print(log_str) + sys.stdout.flush() + else: + print(log_str) + if 'stdout' in params: + params['stdout'] += str(datetime.datetime.now()) + log_str + + def print_global_metrics(self, scope, model, monitor_data, stdout_str): + """R + """ + metrics = model.get_metrics() + metric_calculator = AUCMetric(None) + for metric in metrics: + metric_param = {'label': metric, 'metric_dict': metrics[metric]} + metric_calculator.calculate(scope, metric_param) + metric_result = metric_calculator.get_result_to_string() + self.print_log(metric_result, {'master': True, 'stdout': stdout_str}) + monitor_data += metric_result + metric_calculator.clear(scope, metric_param) + + def save_model(self, day, pass_index, base_key): + """R + """ + cost_printer = util.CostPrinter(util.print_cost, + {'master': True, 'log_format': 'save model cost %s sec'}) + model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index}) + save_mode = 0 # just save all + if pass_index < 1: # batch_model + save_mode = 3 # unseen_day++, save all + util.rank0_print("going to save_model %s" % model_path) + fleet.save_persistables(None, model_path, mode=save_mode) + if fleet._role_maker.is_first_worker(): + self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True) + cost_printer.done() + return model_path + + def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data): + """R + """ + stdout_str = "" + xbox_patch_id = str(int(time.time())) + util.rank0_print("begin save delta model") + + model_path = "" + xbox_model_donefile = "" + cost_printer = util.CostPrinter(util.print_cost, {'master': True, \ + 'log_format': 'save xbox model 
cost %s sec', + 'stdout': stdout_str}) + if pass_index < 1: + save_mode = 2 + xbox_patch_id = xbox_base_key + model_path = self._path_generator.generate_path('xbox_base', {'day': day}) + xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day}) + else: + save_mode = 1 + model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index}) + xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day}) + total_save_num = fleet.save_persistables(None, model_path, mode=save_mode) + cost_printer.done() + + cost_printer = util.CostPrinter(util.print_cost, {'master': True, + 'log_format': 'save cache model cost %s sec', + 'stdout': stdout_str}) + model_file_handler = fs.FileHandler(self.global_config['io']['afs']) + if self.global_config['save_cache_model']: + cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode) + model_file_handler.write( + "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num, + model_path + '/000_cache/sparse_cache.meta', 'w') + cost_printer.done() + util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num) + + save_env_param = { + 'executor': self._exe, + 'save_combine': True + } + cost_printer = util.CostPrinter(util.print_cost, {'master': True, + 'log_format': 'save dense model cost %s sec', + 'stdout': stdout_str}) + if fleet._role_maker.is_first_worker(): + for executor in self.global_config['executor']: + if 'layer_for_inference' not in executor: + continue + executor_name = executor['name'] + model = self._exector_context[executor_name]['model'] + save_env_param['inference_list'] = executor['layer_for_inference'] + save_env_param['scope'] = self._exector_context[executor_name]['scope'] + model.dump_inference_param(save_env_param) + for dnn_layer in executor['layer_for_inference']: + model_file_handler.cp(dnn_layer['save_file_name'], + model_path + '/dnn_plugin/' + dnn_layer['save_file_name']) + 
fleet._role_maker._barrier_worker() + cost_printer.done() + + xbox_done_info = { + "id": xbox_patch_id, + "key": xbox_base_key, + "ins_path": "", + "ins_tag": "feasign", + "partition_type": "2", + "record_count": "111111", + "monitor_data": monitor_data, + "mpi_size": str(fleet.worker_num()), + "input": model_path.rstrip("/") + "/000", + "job_id": util.get_env_value("JOB_ID"), + "job_name": util.get_env_value("JOB_NAME") + } + if fleet._role_maker.is_first_worker(): + model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a') + if pass_index > 0: + self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False) + fleet._role_maker._barrier_worker() + return stdout_str + + def run_executor(self, executor_config, dataset, stdout_str): + """R + """ + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + xbox_base_key = self._train_pass._base_key + executor_name = executor_config['name'] + scope = self._exector_context[executor_name]['scope'] + model = self._exector_context[executor_name]['model'] + with fluid.scope_guard(scope): + util.rank0_print("Begin " + executor_name + " pass") + begin = time.time() + program = model._build_param['model']['train_program'] + self._exe.train_from_dataset(program, dataset, scope, + thread=executor_config['train_thread_num'], debug=self.global_config['debug']) + end = time.time() + local_cost = (end - begin) / 60.0 + avg_cost = worker_numric_avg(local_cost) + min_cost = worker_numric_min(local_cost) + max_cost = worker_numric_max(local_cost) + util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost)) + self._exector_context[executor_name]['cost'] = max_cost + + monitor_data = "" + self.print_global_metrics(scope, model, monitor_data, stdout_str) + util.rank0_print("End " + executor_name + " pass") + if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']: + stdout_str += 
self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data) + fleet._role_maker._barrier_worker() + + def startup(self, context): + """R + """ + if fleet.is_server(): + fleet.run_server() + context['status'] = 'wait' + return + stdout_str = "" + self._train_pass = util.TimeTrainPass(self.global_config) + if not self.global_config['cold_start']: + cost_printer = util.CostPrinter(util.print_cost, + {'master': True, 'log_format': 'load model cost %s sec', + 'stdout': stdout_str}) + self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True}) + # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date() + # and config.reqi_dnn_plugin_pass >= self._pass_id: + # fleet.load_one_table(0, self._train_pass._checkpoint_model_path) + # else: + fleet.init_server(self._train_pass._checkpoint_model_path, mode=0) + cost_printer.done() + if self.global_config['save_first_base']: + self.print_log("save_first_base=True", {'master': True}) + self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str}) + self._train_pass._base_key = int(time.time()) + stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "") + context['status'] = 'begin_day' + + def begin_day(self, context): + """R + """ + stdout_str = "" + if not self._train_pass.next(): + context['is_exit'] = True + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str}) + if pass_id == self._train_pass.max_pass_num_day(): + context['status'] = 'end_day' + else: + context['status'] = 'train_pass' + + def end_day(self, context): + """R + """ + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + xbox_base_key = int(time.time()) + context['status'] = 'begin_day' + + util.rank0_print("shrink table") + cost_printer = util.CostPrinter(util.print_cost, + {'master': True, 
'log_format': 'shrink table done, cost %s sec'}) + fleet.shrink_sparse_table() + for executor in self._exector_context: + self._exector_context[executor]['model'].shrink({ + 'scope': self._exector_context[executor]['scope'], + 'decay': self.global_config['optimizer']['dense_decay_rate'] + }) + cost_printer.done() + + next_date = self._train_pass.date(delta_day=1) + util.rank0_print("going to save xbox base model") + self.save_xbox_model(next_date, 0, xbox_base_key, "") + util.rank0_print("going to save batch model") + self.save_model(next_date, 0, xbox_base_key) + self._train_pass._base_key = xbox_base_key + fleet._role_maker._barrier_worker() + + def train_pass(self, context): + """R + """ + stdout_str = "" + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + base_key = self._train_pass._base_key + pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M") + self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str}) + train_begin_time = time.time() + + cost_printer = util.CostPrinter(util.print_cost, \ + {'master': True, 'log_format': 'load into memory done, cost %s sec', + 'stdout': stdout_str}) + current_dataset = {} + for name in self._dataset: + current_dataset[name] = self._dataset[name].load_dataset({ + 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), + 'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass + }) + fleet._role_maker._barrier_worker() + cost_printer.done() + + util.rank0_print("going to global shuffle") + cost_printer = util.CostPrinter(util.print_cost, { + 'master': True, 'stdout': stdout_str, + 'log_format': 'global shuffle done, cost %s sec'}) + for name in current_dataset: + current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread']) + cost_printer.done() + # str(dataset.get_shuffle_data_size(fleet)) + fleet._role_maker._barrier_worker() + + if self.global_config['prefetch_data']: + 
next_pass_time = (self._train_pass._current_train_time + + datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M") + for name in self._dataset: + self._dataset[name].preload_dataset({ + 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), + 'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass + }) + + fleet._role_maker._barrier_worker() + pure_train_begin = time.time() + for executor in self.global_config['executor']: + self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str) + cost_printer = util.CostPrinter(util.print_cost, \ + {'master': True, 'log_format': 'release_memory cost %s sec'}) + for name in current_dataset: + current_dataset[name].release_memory() + pure_train_cost = time.time() - pure_train_begin + + if self._train_pass.is_checkpoint_pass(pass_id): + self.save_model(day, pass_id, base_key) + + train_end_time = time.time() + train_cost = train_end_time - train_begin_time + other_cost = train_cost - pure_train_cost + log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost) + for executor in self._exector_context: + log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']' + log_str += '[other_cost:' + str(other_cost) + ']' + util.rank0_print(log_str) + stdout_str += util.now_time_str() + log_str + sys.stdout.write(stdout_str) + fleet._role_maker._barrier_worker() + stdout_str = "" + if pass_id == self._train_pass.max_pass_num_day(): + context['status'] = 'end_day' + return + elif not self._train_pass.next(): + context['is_exit'] = True diff --git a/core/trainers/online_learning_trainer.py b/core/trainers/online_learning_trainer.py new file mode 100755 index 000000000..0303e96ac --- /dev/null +++ b/core/trainers/online_learning_trainer.py @@ -0,0 +1,185 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. 
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Online-learning training use fluid in parameter-server mode: workers keep
+consuming time-partitioned (daily/hourly) data instead of fixed epochs.
+"""
+
+from __future__ import print_function
+
+import datetime
+import os
+import time
+
+import paddle.fluid as fluid
+from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
+from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
+
+from paddlerec.core.utils import envs
+from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
+
+
+class OnlineLearningTrainer(TranspileTrainer):
+    # Same server/worker split as ClusterTrainer; only the train pass differs
+    # (time-windowed data instead of fixed epochs).
+    def processor_register(self):
+        role = PaddleCloudRoleMaker()
+        fleet.init(role)
+
+        if fleet.is_server():
+            self.regist_context_processor('uninit', self.instance)
+            self.regist_context_processor('init_pass', self.init)
+            self.regist_context_processor('server_pass', self.server)
+        else:
+            self.regist_context_processor('uninit', self.instance)
+            self.regist_context_processor('init_pass', self.init)
+            self.regist_context_processor('startup_pass', self.startup)
+            # Dataset (pipe-command) training is Linux-only; on other
+            # platforms dataloader_train runs and immediately terminates.
+            if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader":
+                self.regist_context_processor('train_pass', self.dataset_train)
+            else:
+                self.regist_context_processor(
+                    'train_pass', self.dataloader_train)
+            self.regist_context_processor('infer_pass', self.infer)
+            
self.regist_context_processor('terminal_pass', self.terminal) + + def build_strategy(self): + mode = envs.get_runtime_environ("train.trainer.strategy") + assert mode in ["async", "geo", "sync", "half_async"] + + strategy = None + + if mode == "async": + strategy = StrategyFactory.create_async_strategy() + elif mode == "geo": + push_num = envs.get_global_env("train.strategy.mode.push_num", 100) + strategy = StrategyFactory.create_geo_strategy(push_num) + elif mode == "sync": + strategy = StrategyFactory.create_sync_strategy() + elif mode == "half_async": + strategy = StrategyFactory.create_half_async_strategy() + + assert strategy is not None + + self.strategy = strategy + return strategy + + def init(self, context): + self.model.train_net() + optimizer = self.model.optimizer() + strategy = self.build_strategy() + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(self.model.get_avg_cost()) + + if fleet.is_server(): + context['status'] = 'server_pass' + else: + self.fetch_vars = [] + self.fetch_alias = [] + self.fetch_period = self.model.get_fetch_period() + + metrics = self.model.get_metrics() + if metrics: + self.fetch_vars = metrics.values() + self.fetch_alias = metrics.keys() + context['status'] = 'startup_pass' + + def server(self, context): + fleet.init_server() + fleet.run_server() + context['is_exit'] = True + + def startup(self, context): + self._exe.run(fleet.startup_program) + context['status'] = 'train_pass' + + def dataloader_train(self, context): + print("online learning can only support LINUX only") + context['status'] = 'terminal_pass' + + def _get_dataset(self, state="TRAIN", hour=None): + if state == "TRAIN": + inputs = self.model.get_inputs() + namespace = "train.reader" + train_data_path = envs.get_global_env( + "train_data_path", None, namespace) + else: + inputs = self.model.get_infer_inputs() + namespace = "evaluate.reader" + train_data_path = envs.get_global_env( + "test_data_path", None, namespace) + + threads 
= int(envs.get_runtime_environ("train.trainer.threads")) + batch_size = envs.get_global_env("batch_size", None, namespace) + reader_class = envs.get_global_env("class", None, namespace) + abs_dir = os.path.dirname(os.path.abspath(__file__)) + reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') + pipe_cmd = "python {} {} {} {}".format( + reader, reader_class, state, self._config_yaml) + + if train_data_path.startswith("paddlerec::"): + package_base = envs.get_runtime_environ("PACKAGE_BASE") + assert package_base is not None + train_data_path = os.path.join( + package_base, train_data_path.split("::")[1]) + + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_use_var(inputs) + dataset.set_pipe_command(pipe_cmd) + dataset.set_batch_size(batch_size) + dataset.set_thread(threads) + + if hour is not None: + train_data_path = os.path.join(train_data_path, hour) + + file_list = [ + os.path.join(train_data_path, x) + for x in os.listdir(train_data_path) + ] + + self.files = file_list + dataset.set_filelist(self.files) + return dataset + + def dataset_train(self, context): + fleet.init_worker() + + days = envs.get_global_env("train.days") + begin_day = datetime.datetime.strptime("begin_day_d", '%Y%m%d') + + for day in range(days): + for hour in range(24): + day = begin_day + datetime.timedelta(days=day, hours=hour) + day_s = day.strftime('%Y%m%d/%H') + i = day.strftime('%Y%m%d_%H') + + dataset = self._get_dataset(hour=day_s) + ins = self._get_dataset_ins() + + begin_time = time.time() + self._exe.train_from_dataset(program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) + end_time = time.time() + times = end_time-begin_time + print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times)) + self.save(i, "train", is_fleet=True) + + fleet.stop_worker() + context['status'] = 'infer_pass' + + def terminal(self, context): + for model in 
self.increment_models: + print("epoch :{}, dir: {}".format(model[0], model[1])) + context['is_exit'] = True diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py new file mode 100755 index 000000000..8079377ba --- /dev/null +++ b/core/trainers/single_trainer.py @@ -0,0 +1,135 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Training use fluid with one node only. +""" + +from __future__ import print_function + +import time +import logging + +import paddle.fluid as fluid + +from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer +from paddlerec.core.utils import envs + +logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger("fluid") +logger.setLevel(logging.INFO) + + +class SingleTrainer(TranspileTrainer): + def processor_register(self): + self.regist_context_processor('uninit', self.instance) + self.regist_context_processor('init_pass', self.init) + self.regist_context_processor('startup_pass', self.startup) + if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, + "train.reader") != "DataLoader": + self.regist_context_processor('train_pass', self.dataset_train) + else: + self.regist_context_processor('train_pass', self.dataloader_train) + + self.regist_context_processor('infer_pass', self.infer) + self.regist_context_processor('terminal_pass', self.terminal) + + def init(self, 
context): + self.model.train_net() + optimizer = self.model.optimizer() + optimizer.minimize((self.model.get_avg_cost())) + + self.fetch_vars = [] + self.fetch_alias = [] + self.fetch_period = self.model.get_fetch_period() + + metrics = self.model.get_metrics() + if metrics: + self.fetch_vars = metrics.values() + self.fetch_alias = metrics.keys() + evaluate_only = envs.get_global_env( + 'evaluate_only', False, namespace='evaluate') + if evaluate_only: + context['status'] = 'infer_pass' + else: + context['status'] = 'startup_pass' + + def startup(self, context): + self._exe.run(fluid.default_startup_program()) + context['status'] = 'train_pass' + + def dataloader_train(self, context): + reader = self._get_dataloader("TRAIN") + epochs = envs.get_global_env("train.epochs") + + program = fluid.compiler.CompiledProgram( + fluid.default_main_program()).with_data_parallel( + loss_name=self.model.get_avg_cost().name) + + metrics_varnames = [] + metrics_format = [] + + metrics_format.append("{}: {{}}".format("epoch")) + metrics_format.append("{}: {{}}".format("batch")) + + for name, var in self.model.get_metrics().items(): + metrics_varnames.append(var.name) + metrics_format.append("{}: {{}}".format(name)) + + metrics_format = ", ".join(metrics_format) + + for epoch in range(epochs): + reader.start() + batch_id = 0 + try: + while True: + metrics_rets = self._exe.run( + program=program, + fetch_list=metrics_varnames) + + metrics = [epoch, batch_id] + metrics.extend(metrics_rets) + + if batch_id % self.fetch_period == 0 and batch_id != 0: + print(metrics_format.format(*metrics)) + batch_id += 1 + except fluid.core.EOFException: + reader.reset() + self.save(epoch, "train", is_fleet=False) + + context['status'] = 'infer_pass' + + def dataset_train(self, context): + dataset = self._get_dataset("TRAIN") + ins = self._get_dataset_ins() + + epochs = envs.get_global_env("train.epochs") + for i in range(epochs): + begin_time = time.time() + 
self._exe.train_from_dataset(program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) + end_time = time.time() + times = end_time - begin_time + print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins / times)) + + self.save(i, "train", is_fleet=False) + context['status'] = 'infer_pass' + + def terminal(self, context): + for model in self.increment_models: + print("epoch :{}, dir: {}".format(model[0], model[1])) + context['is_exit'] = True diff --git a/core/trainers/tdm_cluster_trainer.py b/core/trainers/tdm_cluster_trainer.py new file mode 100755 index 000000000..3bd1ad336 --- /dev/null +++ b/core/trainers/tdm_cluster_trainer.py @@ -0,0 +1,123 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Training use fluid with one node only. 
+""" + +from __future__ import print_function + +import logging + +import numpy as np +import paddle.fluid as fluid +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet + +from paddlerec.core.utils import envs +from paddlerec.core.trainers.cluster_trainer import ClusterTrainer + +logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger("fluid") +logger.setLevel(logging.INFO) +special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"] + + +class TDMClusterTrainer(ClusterTrainer): + def server(self, context): + namespace = "train.startup" + init_model_path = envs.get_global_env( + "cluster.init_model_path", "", namespace) + assert init_model_path != "", "Cluster train must has init_model for TDM" + fleet.init_server(init_model_path) + logger.info("TDM: load model from {}".format(init_model_path)) + fleet.run_server() + context['is_exit'] = True + + def startup(self, context): + self._exe.run(fleet.startup_program) + + namespace = "train.startup" + load_tree = envs.get_global_env( + "tree.load_tree", True, namespace) + self.tree_layer_path = envs.get_global_env( + "tree.tree_layer_path", "", namespace) + self.tree_travel_path = envs.get_global_env( + "tree.tree_travel_path", "", namespace) + self.tree_info_path = envs.get_global_env( + "tree.tree_info_path", "", namespace) + + save_init_model = envs.get_global_env( + "cluster.save_init_model", False, namespace) + init_model_path = envs.get_global_env( + "cluster.init_model_path", "", namespace) + + if load_tree: + # covert tree to tensor, set it into Fluid's variable. 
+ for param_name in special_param: + param_t = fluid.global_scope().find_var(param_name).get_tensor() + param_array = self._tdm_prepare(param_name) + param_t.set(param_array.astype('int32'), self._place) + + if save_init_model: + logger.info("Begin Save Init model.") + fluid.io.save_persistables( + executor=self._exe, dirname=init_model_path) + logger.info("End Save Init model.") + + context['status'] = 'train_pass' + + def _tdm_prepare(self, param_name): + if param_name == "TDM_Tree_Travel": + travel_array = self._tdm_travel_prepare() + return travel_array + elif param_name == "TDM_Tree_Layer": + layer_array, _ = self._tdm_layer_prepare() + return layer_array + elif param_name == "TDM_Tree_Info": + info_array = self._tdm_info_prepare() + return info_array + else: + raise " {} is not a special tdm param name".format(param_name) + + def _tdm_travel_prepare(self): + """load tdm tree param from npy/list file""" + travel_array = np.load(self.tree_travel_path) + logger.info("TDM Tree leaf node nums: {}".format( + travel_array.shape[0])) + return travel_array + + def _tdm_layer_prepare(self): + """load tdm tree param from npy/list file""" + layer_list = [] + layer_list_flat = [] + with open(self.tree_layer_path, 'r') as fin: + for line in fin.readlines(): + l = [] + layer = (line.split('\n'))[0].split(',') + for node in layer: + if node: + layer_list_flat.append(node) + l.append(node) + layer_list.append(l) + layer_array = np.array(layer_list_flat) + layer_array = layer_array.reshape([-1, 1]) + logger.info("TDM Tree max layer: {}".format(len(layer_list))) + logger.info("TDM Tree layer_node_num_list: {}".format( + [len(i) for i in layer_list])) + return layer_array, layer_list + + def _tdm_info_prepare(self): + """load tdm tree param from list file""" + info_array = np.load(self.tree_info_path) + return info_array diff --git a/core/trainers/tdm_single_trainer.py b/core/trainers/tdm_single_trainer.py new file mode 100755 index 000000000..21be66a67 --- /dev/null +++ 
b/core/trainers/tdm_single_trainer.py @@ -0,0 +1,139 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Training use fluid with one node only. +""" + +from __future__ import print_function +import logging + +import numpy as np +import paddle.fluid as fluid +from paddlerec.core.trainers.single_trainer import SingleTrainer +from paddlerec.core.utils import envs + +logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger("fluid") +logger.setLevel(logging.INFO) +special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", + "TDM_Tree_Info", "TDM_Tree_Emb"] + + +class TDMSingleTrainer(SingleTrainer): + def startup(self, context): + namespace = "train.startup" + load_persistables = envs.get_global_env( + "single.load_persistables", False, namespace) + persistables_model_path = envs.get_global_env( + "single.persistables_model_path", "", namespace) + + load_tree = envs.get_global_env( + "tree.load_tree", False, namespace) + self.tree_layer_path = envs.get_global_env( + "tree.tree_layer_path", "", namespace) + self.tree_travel_path = envs.get_global_env( + "tree.tree_travel_path", "", namespace) + self.tree_info_path = envs.get_global_env( + "tree.tree_info_path", "", namespace) + self.tree_emb_path = envs.get_global_env( + "tree.tree_emb_path", "", namespace) + + save_init_model = envs.get_global_env( + "single.save_init_model", False, namespace) + init_model_path = 
envs.get_global_env( + "single.init_model_path", "", namespace) + self._exe.run(fluid.default_startup_program()) + + if load_persistables: + # 从paddle二进制模型加载参数 + fluid.io.load_persistables( + executor=self._exe, + dirname=persistables_model_path, + main_program=fluid.default_main_program()) + logger.info("Load persistables from \"{}\"".format( + persistables_model_path)) + + if load_tree: + # covert tree to tensor, set it into Fluid's variable. + for param_name in special_param: + param_t = fluid.global_scope().find_var(param_name).get_tensor() + param_array = self._tdm_prepare(param_name) + if param_name == 'TDM_Tree_Emb': + param_t.set(param_array.astype('float32'), self._place) + else: + param_t.set(param_array.astype('int32'), self._place) + + if save_init_model: + logger.info("Begin Save Init model.") + fluid.io.save_persistables( + executor=self._exe, dirname=init_model_path) + logger.info("End Save Init model.") + + context['status'] = 'train_pass' + + def _tdm_prepare(self, param_name): + if param_name == "TDM_Tree_Travel": + travel_array = self._tdm_travel_prepare() + return travel_array + elif param_name == "TDM_Tree_Layer": + layer_array, _ = self._tdm_layer_prepare() + return layer_array + elif param_name == "TDM_Tree_Info": + info_array = self._tdm_info_prepare() + return info_array + elif param_name == "TDM_Tree_Emb": + emb_array = self._tdm_emb_prepare() + return emb_array + else: + raise " {} is not a special tdm param name".format(param_name) + + def _tdm_travel_prepare(self): + """load tdm tree param from npy/list file""" + travel_array = np.load(self.tree_travel_path) + logger.info("TDM Tree leaf node nums: {}".format( + travel_array.shape[0])) + return travel_array + + def _tdm_emb_prepare(self): + """load tdm tree param from npy/list file""" + emb_array = np.load(self.tree_emb_path) + logger.info("TDM Tree node nums from emb: {}".format( + emb_array.shape[0])) + return emb_array + + def _tdm_layer_prepare(self): + """load tdm tree param from 
npy/list file""" + layer_list = [] + layer_list_flat = [] + with open(self.tree_layer_path, 'r') as fin: + for line in fin.readlines(): + l = [] + layer = (line.split('\n'))[0].split(',') + for node in layer: + if node: + layer_list_flat.append(node) + l.append(node) + layer_list.append(l) + layer_array = np.array(layer_list_flat) + layer_array = layer_array.reshape([-1, 1]) + logger.info("TDM Tree max layer: {}".format(len(layer_list))) + logger.info("TDM Tree layer_node_num_list: {}".format( + [len(i) for i in layer_list])) + return layer_array, layer_list + + def _tdm_info_prepare(self): + """load tdm tree param from list file""" + info_array = np.load(self.tree_info_path) + return info_array diff --git a/core/trainers/transpiler_trainer.py b/core/trainers/transpiler_trainer.py new file mode 100755 index 000000000..a67d4759b --- /dev/null +++ b/core/trainers/transpiler_trainer.py @@ -0,0 +1,296 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 

"""
Training use fluid with DistributeTranspiler
"""
import os

import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

from paddlerec.core.trainer import Trainer
from paddlerec.core.utils import envs
from paddlerec.core.utils import dataloader_instance
from paddlerec.core.reader import SlotReader


class TranspileTrainer(Trainer):
    """Base trainer for transpiler-style (single / cluster) training.

    Subclasses register context processors (init/startup/train/infer/
    terminal) that the Trainer state machine drives via ``context['status']``.
    """

    def __init__(self, config=None):
        Trainer.__init__(self, config)
        device = envs.get_global_env("train.device", "cpu")
        if device == 'gpu':
            # NOTE(review): for the default cpu device, _place/_exe are
            # presumably initialized by Trainer.__init__ — confirm in
            # core/trainer.py; only the gpu case overrides them here.
            self._place = fluid.CUDAPlace(0)
            self._exe = fluid.Executor(self._place)
        self.processor_register()
        self.model = None
        # (epoch_id, dirname) tuples recorded by save().
        self.inference_models = []
        self.increment_models = []

    def processor_register(self):
        # Subclasses must override and register their context processors.
        print("Need implement by trainer, `self.regist_context_processor('uninit', self.instance)` must be the first")

    def _get_dataloader(self, state="TRAIN"):
        """Build and attach a DataLoader feeder for TRAIN or evaluate state."""
        if state == "TRAIN":
            dataloader = self.model._data_loader
            namespace = "train.reader"
            class_name = "TrainReader"
        else:
            dataloader = self.model._infer_data_loader
            namespace = "evaluate.reader"
            class_name = "EvaluateReader"

        # When slot config is present, the generic SlotReader is used
        # instead of a user-defined reader class.
        sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
        dense_slots = envs.get_global_env("dense_slots", None, namespace)

        batch_size = envs.get_global_env("batch_size", None, namespace)
        print("batch_size: {}".format(batch_size))

        if sparse_slots is None and dense_slots is None:
            reader_class = envs.get_global_env("class", None, namespace)
            reader = dataloader_instance.dataloader(
                reader_class, state, self._config_yaml)
            reader_class = envs.lazy_instance_by_fliename(reader_class, class_name)
            reader_ins = reader_class(self._config_yaml)
        else:
            reader = dataloader_instance.slotdataloader("", state, self._config_yaml)
            reader_ins = SlotReader(self._config_yaml)

        # Batch-producing readers feed whole sample lists; per-sample
        # readers are batched by the DataLoader itself.
        if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
            dataloader.set_sample_list_generator(reader)
        else:
            dataloader.set_sample_generator(reader, batch_size)

        debug_mode = envs.get_global_env("reader_debug_mode", False, namespace)
        if debug_mode:
            print("--- DataLoader Debug Mode Begin , show pre 10 data ---")
            for idx, line in enumerate(reader()):
                print(line)
                if idx >= 9:
                    break
            print("--- DataLoader Debug Mode End , show pre 10 data ---")
            exit(0)
        return dataloader

    def _get_dataset_ins(self):
        """Return the total number of data lines in self.files."""
        count = 0
        for f in self.files:
            for _, _ in enumerate(open(f, 'r')):
                count += 1
        return count

    def _get_dataset(self, state="TRAIN"):
        """Build a fluid Dataset whose pipe command runs dataset_instance.py."""
        if state == "TRAIN":
            inputs = self.model.get_inputs()
            namespace = "train.reader"
            train_data_path = envs.get_global_env(
                "train_data_path", None, namespace)
        else:
            inputs = self.model.get_infer_inputs()
            namespace = "evaluate.reader"
            train_data_path = envs.get_global_env(
                "test_data_path", None, namespace)

        sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
        dense_slots = envs.get_global_env("dense_slots", None, namespace)

        threads = int(envs.get_runtime_environ("train.trainer.threads"))
        batch_size = envs.get_global_env("batch_size", None, namespace)
        reader_class = envs.get_global_env("class", None, namespace)
        abs_dir = os.path.dirname(os.path.abspath(__file__))
        # The out-of-process reader script fed to Dataset's pipe command.
        reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')

        if sparse_slots is None and dense_slots is None:
            pipe_cmd = "python {} {} {} {}".format(
                reader, reader_class, state, self._config_yaml)
        else:
            # Spaces cannot survive shell argument splitting, so slot
            # lists are passed with '#' in place of ' '.
            padding = envs.get_global_env("padding", 0, namespace)
            pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
                reader, "slot", "slot", self._config_yaml, namespace, \
                sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))

        if train_data_path.startswith("paddlerec::"):
            # Expand package-relative data paths against PACKAGE_BASE.
            package_base = envs.get_runtime_environ("PACKAGE_BASE")
            assert package_base is not None
            train_data_path = os.path.join(
                package_base, train_data_path.split("::")[1])

        dataset = fluid.DatasetFactory().create_dataset()
        dataset.set_use_var(inputs)
        dataset.set_pipe_command(pipe_cmd)
        dataset.set_batch_size(batch_size)
        dataset.set_thread(threads)
        file_list = [
            os.path.join(train_data_path, x)
            for x in os.listdir(train_data_path)
        ]
        self.files = file_list
        dataset.set_filelist(self.files)

        debug_mode = envs.get_global_env("reader_debug_mode", False, namespace)
        if debug_mode:
            print(
                "--- Dataset Debug Mode Begin , show pre 10 data of {}---".format(file_list[0]))
            os.system("cat {} | {} | head -10".format(file_list[0], pipe_cmd))
            print(
                "--- Dataset Debug Mode End , show pre 10 data of {}---".format(file_list[0]))
            exit(0)

        return dataset

    def save(self, epoch_id, namespace, is_fleet=False):
        """Save increment (persistables) and inference models for epoch_id,
        gated by the configured epoch intervals."""
        def need_save(epoch_id, epoch_interval, is_last=False):
            if is_last:
                return True

            if epoch_id == -1:
                return False

            # NOTE(review): with the default interval of -1,
            # `epoch_id % -1 == 0` is always true, so saving is gated only
            # by the dirname/varnames config below — confirm intended.
            return epoch_id % epoch_interval == 0

        def save_inference_model():
            save_interval = envs.get_global_env(
                "save.inference.epoch_interval", -1, namespace)

            if not need_save(epoch_id, save_interval, False):
                return

            feed_varnames = envs.get_global_env(
                "save.inference.feed_varnames", None, namespace)
            fetch_varnames = envs.get_global_env(
                "save.inference.fetch_varnames", None, namespace)
            if feed_varnames is None or fetch_varnames is None:
                return

            fetch_vars = [fluid.default_main_program().global_block().vars[varname]
                          for varname in fetch_varnames]
            dirname = envs.get_global_env(
                "save.inference.dirname", None, namespace)

            assert dirname is not None
            dirname = os.path.join(dirname, str(epoch_id))

            if is_fleet:
                fleet.save_inference_model(
                    self._exe, dirname, feed_varnames, fetch_vars)
            else:
                fluid.io.save_inference_model(
                    dirname, feed_varnames, fetch_vars, self._exe)
            self.inference_models.append((epoch_id, dirname))

        def save_persistables():
            save_interval = envs.get_global_env(
                "save.increment.epoch_interval", -1, namespace)

            if not need_save(epoch_id, save_interval, False):
                return

            dirname = envs.get_global_env(
                "save.increment.dirname", None, namespace)

            assert dirname is not None
            dirname = os.path.join(dirname, str(epoch_id))

            if is_fleet:
                fleet.save_persistables(self._exe, dirname)
            else:
                fluid.io.save_persistables(self._exe, dirname)
            self.increment_models.append((epoch_id, dirname))

        save_persistables()
        save_inference_model()

    def instance(self, context):
        """Instantiate the user model class and advance to init_pass."""
        models = envs.get_global_env("train.model.models")
        model_class = envs.lazy_instance_by_fliename(models, "Model")
        self.model = model_class(None)
        context['status'] = 'init_pass'

    def init(self, context):
        # Must be overridden by subclasses.
        print("Need to be implement")
        context['is_exit'] = True

    def dataloader_train(self, context):
        # Must be overridden by subclasses.
        print("Need to be implement")
        context['is_exit'] = True

    def dataset_train(self, context):
        # Must be overridden by subclasses.
        print("Need to be implement")
        context['is_exit'] = True

    def infer(self, context):
        """Run inference over saved models (or a configured model path when
        evaluate_only is set) and print metrics every other batch."""
        infer_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.unique_name.guard():
            with fluid.program_guard(infer_program, startup_program):
                self.model.infer_net()

        if self.model._infer_data_loader is None:
            context['status'] = 'terminal_pass'
            return

        # "Evaluate" != "TRAIN", so this selects the evaluate reader branch.
        reader = self._get_dataloader("Evaluate")

        metrics_varnames = []
        metrics_format = []

        metrics_format.append("{}: {{}}".format("epoch"))
        metrics_format.append("{}: {{}}".format("batch"))

        for name, var in self.model.get_infer_results().items():
            metrics_varnames.append(var.name)
            metrics_format.append("{}: {{}}".format(name))

        metrics_format = ", ".join(metrics_format)
        self._exe.run(startup_program)

        model_list = self.increment_models

        evaluate_only = envs.get_global_env(
            'evaluate_only', False, namespace='evaluate')
        if evaluate_only:
            model_list = [(0, envs.get_global_env(
                'evaluate_model_path', "", namespace='evaluate'))]

        is_return_numpy = envs.get_global_env(
            'is_return_numpy', True, namespace='evaluate')

        for (epoch, model_dir) in model_list:
            print("Begin to infer No.{} model, model_dir: {}".format(
                epoch, model_dir))
            program = infer_program.clone()
            fluid.io.load_persistables(self._exe, model_dir, program)
            reader.start()
            batch_id = 0
            try:
                while True:
                    metrics_rets = self._exe.run(
                        program=program,
                        fetch_list=metrics_varnames,
                        return_numpy=is_return_numpy)

                    metrics = [epoch, batch_id]
                    metrics.extend(metrics_rets)

                    if batch_id % 2 == 0 and batch_id != 0:
                        print(metrics_format.format(*metrics))
                    batch_id += 1
            except fluid.core.EOFException:
                # DataLoader signals exhaustion via EOFException.
                reader.reset()

        context['status'] = 'terminal_pass'

    def terminal(self, context):
        print("clean up and exit")
        context['is_exit'] = True
diff --git a/core/utils/__init__.py b/core/utils/__init__.py
new file mode 100755
index 000000000..abf198b97
--- /dev/null
+++ b/core/utils/__init__.py
@@ -0,0 +1,13 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
diff --git a/core/utils/dataloader_instance.py b/core/utils/dataloader_instance.py
new file mode 100755
index 000000000..8d4db6f82
--- /dev/null
+++ b/core/utils/dataloader_instance.py
@@ -0,0 +1,111 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import print_function + +import os + +from paddlerec.core.utils.envs import lazy_instance_by_fliename +from paddlerec.core.utils.envs import get_global_env +from paddlerec.core.utils.envs import get_runtime_environ +from paddlerec.core.reader import SlotReader + + +def dataloader(readerclass, train, yaml_file): + if train == "TRAIN": + reader_name = "TrainReader" + namespace = "train.reader" + data_path = get_global_env("train_data_path", None, namespace) + else: + reader_name = "EvaluateReader" + namespace = "evaluate.reader" + data_path = get_global_env("test_data_path", None, namespace) + + if data_path.startswith("paddlerec::"): + package_base = get_runtime_environ("PACKAGE_BASE") + assert package_base is not None + data_path = os.path.join(package_base, data_path.split("::")[1]) + + files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + + reader_class = lazy_instance_by_fliename(readerclass, reader_name) + reader = reader_class(yaml_file) + reader.init() + + def gen_reader(): + for file in files: + with open(file, 'r') as f: + for line in f: + line = line.rstrip('\n') + iter = reader.generate_sample(line) + for parsed_line in iter(): + if parsed_line is None: + continue + else: + values = [] + for pased in parsed_line: + values.append(pased[1]) + yield values + + def gen_batch_reader(): + return reader.generate_batch_from_trainfiles(files) + + if hasattr(reader, 'generate_batch_from_trainfiles'): + return gen_batch_reader() + return gen_reader + + +def slotdataloader(readerclass, train, yaml_file): + if train == "TRAIN": 
+ reader_name = "SlotReader" + namespace = "train.reader" + data_path = get_global_env("train_data_path", None, namespace) + else: + reader_name = "SlotReader" + namespace = "evaluate.reader" + data_path = get_global_env("test_data_path", None, namespace) + + if data_path.startswith("paddlerec::"): + package_base = get_runtime_environ("PACKAGE_BASE") + assert package_base is not None + data_path = os.path.join(package_base, data_path.split("::")[1]) + + files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + + sparse = get_global_env("sparse_slots", None, namespace) + dense = get_global_env("dense_slots", None, namespace) + padding = get_global_env("padding", 0, namespace) + reader = SlotReader(yaml_file) + reader.init(sparse, dense, int(padding)) + + def gen_reader(): + for file in files: + with open(file, 'r') as f: + for line in f: + line = line.rstrip('\n') + iter = reader.generate_sample(line) + for parsed_line in iter(): + if parsed_line is None: + continue + else: + values = [] + for pased in parsed_line: + values.append(pased[1]) + yield values + + def gen_batch_reader(): + return reader.generate_batch_from_trainfiles(files) + + if hasattr(reader, 'generate_batch_from_trainfiles'): + return gen_batch_reader() + return gen_reader diff --git a/core/utils/dataset_holder.py b/core/utils/dataset_holder.py new file mode 100755 index 000000000..cd1954503 --- /dev/null +++ b/core/utils/dataset_holder.py @@ -0,0 +1,190 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import datetime +import time + +import paddle.fluid as fluid + +from paddlerec.core.utils import fs as fs +from paddlerec.core.utils import util as util + + +class DatasetHolder(object): + """ + Dataset Base + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, config): + """ + """ + self._datasets = {} + self._config = config + + @abc.abstractmethod + def check_ready(self, params): + """ + check data ready or not + Return: + True/False + """ + pass + + @abc.abstractmethod + def load_dataset(self, params): + """R + """ + pass + + @abc.abstractmethod + def preload_dataset(self, params): + """R + """ + pass + + @abc.abstractmethod + def release_dataset(self, params): + """R + """ + pass + + +class TimeSplitDatasetHolder(DatasetHolder): + """ + Dataset with time split dir. root_path/$DAY/$HOUR + """ + + def __init__(self, config): + """ + init data root_path, time_split_interval, data_path_format + """ + Dataset.__init__(self, config) + if 'data_donefile' not in config or config['data_donefile'] is None: + config['data_donefile'] = config['data_path'] + "/to.hadoop.done" + self._path_generator = util.PathGenerator({'templates': [ + {'name': 'data_path', 'template': config['data_path']}, + {'name': 'donefile_path', 'template': config['data_donefile']} + ]}) + self._split_interval = config['split_interval'] # data split N mins per dir + self._data_file_handler = fs.FileHandler(config) + + def _format_data_time(self, daytime_str, time_window_mins): + """ """ + data_time = util.make_datetime(daytime_str) + mins_of_day = data_time.hour * 60 + data_time.minute + begin_stage = mins_of_day / self._split_interval + end_stage = (mins_of_day + time_window_mins) / self._split_interval + if begin_stage == end_stage and mins_of_day % self._split_interval != 0: + return None, 0 + + if mins_of_day % self._split_interval != 0: + skip_mins = self._split_interval 
- (mins_of_day % self._split_interval) + data_time = data_time + datetime.timedelta(minutes=skip_mins) + time_window_mins = time_window_mins - skip_mins + return data_time, time_window_mins + + def check_ready(self, daytime_str, time_window_mins): + """ + data in [daytime_str, daytime_str + time_window_mins] is ready or not + Args: + daytime_str: datetime with str format, such as "202001122200" meanings "2020-01-12 22:00" + time_window_mins(int): from daytime_str to daytime_str + time_window_mins + Return: + True/False + """ + is_ready = True + data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) + while time_window_mins > 0: + file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time}) + if not self._data_file_handler.is_exist(file_path): + is_ready = False + break + time_window_mins = time_window_mins - self._split_interval + data_time = data_time + datetime.timedelta(minutes=self._split_interval) + return is_ready + + def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0): + """ + data in [daytime_str, daytime_str + time_window_mins], random shard to node_num, return shard[node_idx] + Args: + daytime_str: datetime with str format, such as "202001122200" meanings "2020-01-12 22:00" + time_window_mins(int): from daytime_str to daytime_str + time_window_mins + node_num(int): data split shard num + node_idx(int): shard_idx + Return: + list, data_shard[node_idx] + """ + data_file_list = [] + data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) + while time_window_mins > 0: + file_path = self._path_generator.generate_path('data_path', {'time_format': data_time}) + sub_file_list = self._data_file_handler.ls(file_path) + for sub_file in sub_file_list: + sub_file_name = self._data_file_handler.get_file_name(sub_file) + if not sub_file_name.startswith(self._config['filename_prefix']): + continue + if hash(sub_file_name) % node_num == node_idx: + 
data_file_list.append(sub_file) + time_window_mins = time_window_mins - self._split_interval + data_time = data_time + datetime.timedelta(minutes=self._split_interval) + return data_file_list + + def _alloc_dataset(self, file_list): + """ """ + dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type']) + dataset.set_batch_size(self._config['batch_size']) + dataset.set_thread(self._config['load_thread']) + dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi']) + dataset.set_pipe_command(self._config['data_converter']) + dataset.set_filelist(file_list) + dataset.set_use_var(self._config['data_vars']) + # dataset.set_fleet_send_sleep_seconds(2) + # dataset.set_fleet_send_batch_size(80000) + return dataset + + def load_dataset(self, params): + """ """ + begin_time = params['begin_time'] + windown_min = params['time_window_min'] + if begin_time not in self._datasets: + while self.check_ready(begin_time, windown_min) == False: + print("dataset not ready, time:" + begin_time) + time.sleep(30) + file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) + self._datasets[begin_time] = self._alloc_dataset(file_list) + self._datasets[begin_time].load_into_memory() + else: + self._datasets[begin_time].wait_preload_done() + return self._datasets[begin_time] + + def preload_dataset(self, params): + """ """ + begin_time = params['begin_time'] + windown_min = params['time_window_min'] + if begin_time not in self._datasets: + if self.check_ready(begin_time, windown_min): + file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) + self._datasets[begin_time] = self._alloc_dataset(file_list) + self._datasets[begin_time].preload_into_memory(self._config['preload_thread']) + return True + return False + + def release_dataset(self, params): + """ """ + begin_time = params['begin_time'] + windown_min = params['time_window_min'] + if begin_time in self._datasets: + 
self._datasets[begin_time].release_memory() diff --git a/core/utils/dataset_instance.py b/core/utils/dataset_instance.py new file mode 100755 index 000000000..f5175c48d --- /dev/null +++ b/core/utils/dataset_instance.py @@ -0,0 +1,48 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import sys + +from paddlerec.core.utils.envs import lazy_instance_by_fliename +from paddlerec.core.reader import SlotReader +from paddlerec.core.utils import envs + +if len(sys.argv) < 4: + raise ValueError("reader only accept 3 argument: 1. 
reader_class 2.train/evaluate/slotreader 3.yaml_abs_path") + +reader_package = sys.argv[1] + +if sys.argv[2].upper() == "TRAIN": + reader_name = "TrainReader" +elif sys.argv[2].upper() == "EVALUATE": + reader_name = "EvaluateReader" +else: + reader_name = "SlotReader" + namespace = sys.argv[4] + sparse_slots = sys.argv[5].replace("#", " ") + dense_slots = sys.argv[6].replace("#", " ") + padding = int(sys.argv[7]) + +yaml_abs_path = sys.argv[3] + +if reader_name != "SlotReader": + reader_class = lazy_instance_by_fliename(reader_package, reader_name) + reader = reader_class(yaml_abs_path) + reader.init() + reader.run_from_stdin() +else: + reader = SlotReader(yaml_abs_path) + reader.init(sparse_slots, dense_slots, padding) + reader.run_from_stdin() diff --git a/core/utils/envs.py b/core/utils/envs.py new file mode 100755 index 000000000..7093d897e --- /dev/null +++ b/core/utils/envs.py @@ -0,0 +1,198 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 

from contextlib import closing
import copy
import os
import socket
import sys

# flattened "a.b.c" -> value view of the yaml config, shared module-wide
global_envs = {}


def flatten_environs(envs, separator="."):
    """Flatten a nested dict into {"a<sep>b<sep>c": str(value)}."""
    flatten_dict = {}
    assert isinstance(envs, dict)

    def fatten_env_namespace(namespace_nests, local_envs):
        if not isinstance(local_envs, dict):
            global_k = separator.join(namespace_nests)
            flatten_dict[global_k] = str(local_envs)
        else:
            for k, v in local_envs.items():
                if isinstance(v, dict):
                    nests = copy.deepcopy(namespace_nests)
                    nests.append(k)
                    fatten_env_namespace(nests, v)
                else:
                    global_k = separator.join(namespace_nests + [k])
                    flatten_dict[global_k] = str(v)

    for k, v in envs.items():
        fatten_env_namespace([k], v)

    return flatten_dict


def set_runtime_environs(environs):
    """Export every entry of environs into os.environ (values stringified)."""
    for k, v in environs.items():
        os.environ[k] = str(v)


def get_runtime_environ(key):
    """Return os.environ[key], or None when unset."""
    return os.getenv(key, None)


def get_trainer():
    """Return the trainer class name chosen at runtime."""
    train_mode = get_runtime_environ("train.trainer.trainer")
    return train_mode


def set_global_envs(envs):
    """Flatten the yaml config dict into the module-level global_envs."""
    assert isinstance(envs, dict)

    def fatten_env_namespace(namespace_nests, local_envs):
        for k, v in local_envs.items():
            if isinstance(v, dict):
                nests = copy.deepcopy(namespace_nests)
                nests.append(k)
                fatten_env_namespace(nests, v)
            else:
                global_k = ".".join(namespace_nests + [k])
                global_envs[global_k] = v

    for k, v in envs.items():
        fatten_env_namespace([k], v)


def get_global_env(env_name, default_value=None, namespace=None):
    """
    get os environment value
    """
    _env_name = env_name if namespace is None else ".".join(
        [namespace, env_name])
    return global_envs.get(_env_name, default_value)


def get_global_envs():
    """Return the raw flattened config dict."""
    return global_envs


def path_adapter(path):
    """Expand a 'paddlerec.' prefixed path into the installed package base."""
    if path.startswith("paddlerec."):
        package = get_runtime_environ("PACKAGE_BASE")
        l_p = path.split("paddlerec.")[1].replace(".", "/")
        return os.path.join(package, l_p)
    else:
        return path


def windows_path_converter(path):
    """Normalize the path separators for the current platform."""
    if get_platform() == "WINDOWS":
        return path.replace("/", "\\")
    else:
        return path.replace("\\", "/")


def update_workspace():
    """Substitute {workspace} in every env value with the real workspace dir."""
    workspace = global_envs.get("train.workspace", None)
    if not workspace:
        return
    workspace = path_adapter(workspace)

    for name, value in global_envs.items():
        if isinstance(value, str):
            value = value.replace("{workspace}", workspace)
            value = windows_path_converter(value)
            global_envs[name] = value


def pretty_print_envs(envs, header=None):
    """Render envs as an aligned two-column table string for logging."""
    spacing = 5
    max_k = 45
    max_v = 50

    for k, v in envs.items():
        max_k = max(max_k, len(k))

    h_format = "{{:^{}s}}{}{{:<{}s}}\n".format(max_k, " " * spacing, max_v)
    l_format = "{{:<{}s}}{{}}{{:<{}s}}\n".format(max_k, max_v)
    length = max_k + max_v + spacing

    border = "".join(["="] * length)
    line = "".join(["-"] * length)

    draws = ""
    draws += border + "\n"

    if header:
        draws += h_format.format(header[0], header[1])
    else:
        draws += h_format.format("paddlerec Global Envs", "Value")

    draws += line + "\n"

    for k, v in envs.items():
        if isinstance(v, str) and len(v) >= max_v:
            # long values keep only their tail, which is the informative part
            str_v = "... " + v[-46:]
        else:
            str_v = v

        draws += l_format.format(k, " " * spacing, str(str_v))

    draws += border

    _str = "\n{}\n".format(draws)
    return _str


def lazy_instance_by_package(package, class_name):
    """Import package and return its attribute class_name."""
    # NOTE(fix): removed an unused `get_global_env("train.model.models")`
    # lookup whose result was never read.
    model_package = __import__(package, globals(), locals(),
                               package.split("."))
    instance = getattr(model_package, class_name)
    return instance


def lazy_instance_by_fliename(abs, class_name):
    """Import the module at file path `abs` and return attribute class_name."""
    dirname = os.path.dirname(abs)
    sys.path.append(dirname)
    package = os.path.splitext(os.path.basename(abs))[0]

    model_package = __import__(package, globals(), locals(),
                               package.split("."))
    instance = getattr(model_package, class_name)
    return instance


def get_platform():
    """Return "LINUX"/"DARWIN"/"WINDOWS" (implicitly None elsewhere)."""
    import platform
    plats = platform.platform()
    if 'Linux' in plats:
        return "LINUX"
    if 'Darwin' in plats:
        return "DARWIN"
    if 'Windows' in plats:
        return "WINDOWS"


def find_free_port():
    """Ask the OS for a currently unused TCP port number."""

    def __free_port():
        with closing(socket.socket(socket.AF_INET,
                                   socket.SOCK_STREAM)) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    new_port = __free_port()
    return new_port

# diff --git a/core/utils/fs.py b/core/utils/fs.py
# new file mode 100755
# index 000000000..836c6f598
# --- /dev/null
# +++ b/core/utils/fs.py
# @@ -0,0 +1,178 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient


def is_afs_path(path):
    """Return True when path points to a remote afs/hdfs location."""
    if path.startswith("afs") or path.startswith("hdfs"):
        return True
    return False


class LocalFSClient(object):
    """
    Util for local disk file_system io
    """

    def __init__(self):
        """R
        """
        pass

    def write(self, content, path, mode):
        """
        write to file
        Args:
            content(string)
            path(string)
            mode(string): w/a w:clear_write a:append_write
        """
        temp_dir = os.path.dirname(path)
        # NOTE(fix): a bare filename has an empty dirname; os.makedirs("")
        # would raise, so only create the directory when there is one.
        if temp_dir and not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        f = open(path, mode)
        f.write(content)
        f.flush()
        f.close()

    def cp(self, org_path, dest_path):
        """Recursively copy org_path to dest_path on local disk."""
        temp_dir = os.path.dirname(dest_path)
        if temp_dir and not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        return os.system("cp -r " + org_path + " " + dest_path)

    def cat(self, file_path):
        """Return the entire content of file_path."""
        f = open(file_path)
        content = f.read()
        f.close()
        return content

    def mkdir(self, dir_name):
        """Recursively create dir_name."""
        os.makedirs(dir_name)

    def remove(self, path):
        """Best-effort removal of path (file or directory), like rm -rf."""
        os.system("rm -rf " + path)

    def is_exist(self, path):
        """Return True when path exists.

        NOTE(fix): replaced `os.system("ls " + path) == 0` — no shell spawn,
        no stdout noise. (The old form also expanded shell wildcards; callers
        pass concrete paths.)
        """
        return os.path.exists(path)

    def ls(self, path):
        """Return the entries of directory path."""
        files = os.listdir(path)
        return files


class FileHandler(object):
    """
    A Smart file handler. auto judge local/afs by path
    """

    def __init__(self, config):
        """Create the hdfs client only when config carries afs settings."""
        if 'fs_name' in config:
            hadoop_home = "$HADOOP_HOME"
            hdfs_configs = {
                "hadoop.job.ugi": config['fs_ugi'],
                "fs.default.name": config['fs_name']
            }
            self._hdfs_client = HDFSClient(hadoop_home, hdfs_configs)
        self._local_fs_client = LocalFSClient()

    def is_exist(self, path):
        """Existence check, dispatched to hdfs or local by path scheme."""
        if is_afs_path(path):
            return self._hdfs_client.is_exist(path)
        else:
            return self._local_fs_client.is_exist(path)

    def get_file_name(self, path):
        """Return the last path component."""
        sub_paths = path.split('/')
        return sub_paths[-1]

    def write(self, content, dest_path, mode='w'):
        """Write (or append) content to dest_path, local or remote.

        Remote writes go through a local temp file plus an atomic-ish
        tmp/bak rename dance, because fleet's hdfs_client only uploads.
        """
        if is_afs_path(dest_path):
            file_name = self.get_file_name(dest_path)
            temp_local_file = "./tmp/" + file_name
            self._local_fs_client.remove(temp_local_file)
            org_content = ""
            if mode.find('a') >= 0:
                org_content = self._hdfs_client.cat(dest_path)
            # NOTE(fix): append means old content first, new content last;
            # the original concatenated `content + org_content`, which
            # prepended the new data instead of appending it.
            content = org_content + content
            self._local_fs_client.write(
                content, temp_local_file,
                mode)  # fleet hdfs_client only support upload, so write tmp file
            self._hdfs_client.delete(dest_path + ".tmp")
            self._hdfs_client.upload(dest_path + ".tmp", temp_local_file)
            self._hdfs_client.delete(dest_path + ".bak")
            self._hdfs_client.rename(dest_path, dest_path + '.bak')
            self._hdfs_client.rename(dest_path + ".tmp", dest_path)
        else:
            self._local_fs_client.write(content, dest_path, mode)

    def cat(self, path):
        """Read the whole file, dispatched to hdfs or local by path scheme."""
        if is_afs_path(path):
            hdfs_cat = self._hdfs_client.cat(path)
            return hdfs_cat
        else:
            return self._local_fs_client.cat(path)

    def ls(self, path):
        """List directory entries as absolute paths."""
        files = []
        if is_afs_path(path):
            files = self._hdfs_client.ls(path)
            files = [path + '/' + self.get_file_name(fi)
                     for fi in files]  # absolute path
        else:
            files = self._local_fs_client.ls(path)
            files = [path + '/' + fi for fi in files]  # absolute path
        return files

    def cp(self, org_path, dest_path):
        """Copy between local and afs; hdfs-to-hdfs is not supported."""
        org_is_afs = is_afs_path(org_path)
        dest_is_afs = is_afs_path(dest_path)
        if not org_is_afs and not dest_is_afs:
            return self._local_fs_client.cp(org_path, dest_path)
        if not org_is_afs and dest_is_afs:
            return self._hdfs_client.upload(dest_path, org_path)
        if org_is_afs and not dest_is_afs:
            return self._hdfs_client.download(org_path, dest_path)
        print("Not support hdfs-to-hdfs cp currently")

# diff --git a/core/utils/table.py b/core/utils/table.py
# new file mode 100755
# index 000000000..558cd26d6
# --- /dev/null
# +++ b/core/utils/table.py
# @@ -0,0 +1,40 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class TableMeta(object):
    """
    Simple ParamTable Meta, Contain table_id
    """
    # class-level counter: the next id handed out by alloc_new_table
    TableId = 1

    @staticmethod
    def alloc_new_table(table_id):
        """
        create table with table_id
        Args:
            table_id(int): pass a negative value to auto-allocate the next id
        Return:
            table(TableMeta) : a TableMeta instance with table_id
        """
        if table_id < 0:
            table_id = TableMeta.TableId
        if table_id >= TableMeta.TableId:
            # keep the counter ahead of every explicitly requested id
            TableMeta.TableId += 1
        table = TableMeta(table_id)
        return table

    def __init__(self, table_id):
        """Store the allocated id."""
        self._table_id = table_id

# diff --git a/core/utils/util.py b/core/utils/util.py
# new file mode 100755
# index 000000000..bd6328487
# --- /dev/null
# +++ b/core/utils/util.py
# @@ -0,0 +1,327 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os
import time

from paddle import fluid

from paddlerec.core.utils import fs as fs


def save_program_proto(path, program=None):
    """Serialize program (default: the default main program) desc to path."""
    if program is None:
        _program = fluid.default_main_program()
    else:
        _program = program

    with open(path, "wb") as f:
        f.write(_program.desc.serialize_to_string())


def str2bool(v):
    """Parse common yes/no spellings into a bool; raise on anything else."""
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise ValueError('Boolean value expected.')


def run_which(command):
    """Return the resolved path of command, or None when `which` fails."""
    regex = "/usr/bin/which: no {} in"
    ret = run_shell_cmd("which {}".format(command))
    if ret.startswith(regex.format(command)):
        return None
    else:
        return ret


def run_shell_cmd(command):
    """Run command in a shell and return its stripped stdout."""
    assert command is not None and isinstance(command, str)
    return os.popen(command).read().strip()


def get_env_value(env_name):
    """
    get os environment value
    """
    return os.popen("echo -n ${" + env_name + "}").read().strip()


def now_time_str():
    """
    get current format str_time
    """
    return "\n" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "[0]:"


def get_absolute_path(path, params):
    """Prepend params['fs_name'] to afs/hdfs paths that lack a host part."""
    if path.startswith('afs:') or path.startswith('hdfs:'):
        # 'afs://host:port/...' already carries a host -> ':' in the remainder
        sub_path = path.split('fs:')[1]
        if ':' in sub_path:  # such as afs://xxx:prot/xxxx
            return path
        elif 'fs_name' in params:
            return params['fs_name'] + sub_path
    else:
        return path


def make_datetime(date_str, fmt=None):
    """
    create a datetime instance by date_string
    Args:
        date_str: such as 2020-01-14
        fmt: e.g. "%Y-%m-%d"; when None the format is guessed from length
    Return:
        datetime
    """
    if fmt is None:
        if len(date_str) == 8:  # %Y%m%d
            return datetime.datetime.strptime(date_str, '%Y%m%d')
        if len(date_str) == 12:  # %Y%m%d%H%M
            return datetime.datetime.strptime(date_str, '%Y%m%d%H%M')
    return datetime.datetime.strptime(date_str, fmt)


def rank0_print(log_str):
    """Print log_str on the master node only."""
    # NOTE(review): print_log is not defined in this patch chunk — it is
    # expected to be provided elsewhere in this module; confirm on merge.
    print_log(log_str, {'master': True})


def print_cost(cost, params):
    """Format cost with params['log_format'], print it and return the line."""
    log_str = params['log_format'] % cost
    print_log(log_str, params)
    return log_str


class CostPrinter(object):
    """
    For count cost time && print cost log
    """

    def __init__(self, callback, callback_params):
        """R
        """
        self.reset(callback, callback_params)

    def __del__(self):
        """Flush a pending measurement when the printer is dropped."""
        if not self._done:
            self.done()

    def reset(self, callback, callback_params):
        """Restart the timer with a new callback."""
        self._done = False
        self._callback = callback
        self._callback_params = callback_params
        self._begin_time = time.time()

    def done(self):
        """Stop the timer, invoke the callback, return (cost, log_str)."""
        cost = time.time() - self._begin_time
        log_str = self._callback(cost, self._callback_params)  # cost(s)
        self._done = True
        return cost, log_str


class PathGenerator(object):
    """
    generate path with template & runtime variables
    """

    def __init__(self, config):
        """R
        """
        self._templates = {}
        self.add_path_template(config)

    def add_path_template(self, config):
        """Register every {name: template} pair from config['templates']."""
        if 'templates' in config:
            for template in config['templates']:
                self._templates[template['name']] = template['template']

    def generate_path(self, template_name, param):
        """Render the named template; return '' for an unknown template."""
        if template_name in self._templates:
            if 'time_format' in param:
                # strftime first so %Y/%m/%d tokens expand before str.format
                # fills the remaining {placeholders}
                rendered = param['time_format'].strftime(
                    self._templates[template_name])
                return rendered.format(**param)
            return self._templates[template_name].format(**param)
        else:
            return ""


class TimeTrainPass(object):
    """
    timely pass
    define pass time_interval && start_time && end_time
    """

    def __init__(self, global_config):
        """R
        """
        self._config = global_config['epoch']
        if '+' in self._config['days']:
            day_str = self._config['days'].replace(' ', '')
            day_fields = day_str.split('+')
            self._begin_day = make_datetime(day_fields[0].strip())
            if len(day_fields) == 1 or len(day_fields[1]) == 0:
                # 100 years, meaning to continuous running
                self._end_day = self._begin_day + datetime.timedelta(
                    days=36500)
            else:
                # example: 2020212+10
                run_day = int(day_fields[1].strip())
                self._end_day = self._begin_day + datetime.timedelta(
                    days=run_day)
        else:
            # example: {20191001..20191031} — expanded by the shell's echo
            days = os.popen("echo -n " + self._config['days']).read().split(
                " ")
            self._begin_day = make_datetime(days[0])
            self._end_day = make_datetime(days[len(days) - 1])
        self._checkpoint_interval = self._config['checkpoint_interval']
        self._dump_inference_interval = self._config[
            'dump_inference_interval']
        self._interval_per_pass = self._config[
            'train_time_interval']  # train N min data per pass

        self._pass_id = 0
        self._inference_pass_id = 0
        self._pass_donefile_handler = None
        if 'pass_donefile_name' in self._config:
            self._train_pass_donefile = global_config[
                'output_path'] + '/' + self._config['pass_donefile_name']
            if fs.is_afs_path(self._train_pass_donefile):
                self._pass_donefile_handler = fs.FileHandler(
                    global_config['io']['afs'])
            else:
                self._pass_donefile_handler = fs.FileHandler(
                    global_config['io']['local_fs'])

            # resume training progress from the last donefile record
            last_done = self._pass_donefile_handler.cat(
                self._train_pass_donefile).strip().split('\n')[-1]
            done_fields = last_done.split('\t')
            if len(done_fields) > 4:
                self._base_key = done_fields[1]
                self._checkpoint_model_path = done_fields[2]
                self._checkpoint_pass_id = int(done_fields[3])
                self._inference_pass_id = int(done_fields[4])
                self.init_pass_by_id(done_fields[0],
                                     self._checkpoint_pass_id)

    def max_pass_num_day(self):
        """Number of passes in one day."""
        # NOTE(fix): floor division — under Python 3, `/` would return a
        # float and break comparisons against integer pass ids.
        return 24 * 60 // self._interval_per_pass

    def save_train_progress(self, day, pass_id, base_key, model_path,
                            is_checkpoint):
        """Append one record describing the finished pass to the donefile."""
        if is_checkpoint:
            self._checkpoint_pass_id = pass_id
            self._checkpoint_model_path = model_path
        done_content = "%s\t%s\t%s\t%s\t%d\n" % (
            day, base_key, self._checkpoint_model_path,
            self._checkpoint_pass_id, pass_id)
        self._pass_donefile_handler.write(done_content,
                                          self._train_pass_donefile, 'a')

    def init_pass_by_id(self, date_str, pass_id):
        """
        init pass context with pass_id
        Args:
            date_str: example "20200110"
            pass_id(int): pass_id of date
        """
        date_time = make_datetime(date_str)
        if pass_id < 1:
            pass_id = 0
        if (date_time - self._begin_day).total_seconds() > 0:
            self._begin_day = date_time
        self._pass_id = pass_id
        mins = self._interval_per_pass * (pass_id - 1)
        self._current_train_time = date_time + datetime.timedelta(
            minutes=mins)

    def init_pass_by_time(self, datetime_str):
        """
        init pass context with datetime
        Args:
            datetime_str: example "202001100000" -> "%Y%m%d%H%M"
        """
        self._current_train_time = make_datetime(datetime_str)
        minus = self._current_train_time.hour * 60 + \
            self._current_train_time.minute
        # NOTE(fix): floor division — pass ids are integers; `/` under
        # Python 3 would make _pass_id a float.
        self._pass_id = minus // self._interval_per_pass + 1

    def current_pass(self):
        """R
        """
        return self._pass_id

    def next(self):
        """Advance to the next pass; return False once past the end day."""
        has_next = True
        old_pass_id = self._pass_id
        if self._pass_id < 1:
            self.init_pass_by_time(self._begin_day.strftime("%Y%m%d%H%M"))
        else:
            next_time = self._current_train_time + datetime.timedelta(
                minutes=self._interval_per_pass)
            if (next_time - self._end_day).total_seconds() > 0:
                has_next = False
            else:
                self.init_pass_by_time(next_time.strftime("%Y%m%d%H%M"))
        if has_next and (self._inference_pass_id < self._pass_id or
                         self._pass_id < old_pass_id):
            self._inference_pass_id = self._pass_id - 1
        return has_next

    def is_checkpoint_pass(self, pass_id):
        """Whether a checkpoint should be saved after pass_id."""
        if pass_id < 1:
            return True
        if pass_id == self.max_pass_num_day():
            return False
        if pass_id % self._checkpoint_interval == 0:
            return True
        return False

    def need_dump_inference(self, pass_id):
        """Whether an inference model should be dumped after pass_id."""
        return self._inference_pass_id < pass_id and \
            pass_id % self._dump_inference_interval == 0

    def date(self, delta_day=0):
        """
        get train date
        Args:
            delta_day(int): n day after current_train_date
        Return:
            date(current_train_time + delta_day)
        """
        return (self._current_train_time +
                datetime.timedelta(days=delta_day)).strftime("%Y%m%d")

    def timestamp(self, delta_day=0):
        """Unix timestamp of current_train_time + delta_day.

        NOTE(review): datetime.timestamp() is Python 3 only — confirm this
        helper is never reached on the Python 2.7 path.
        """
        return (self._current_train_time +
                datetime.timedelta(days=delta_day)).timestamp()

# diff --git a/doc/.DS_Store b/doc/.DS_Store
# new file mode 100644
# index 000000000..5e5553cf5
# Binary files /dev/null and b/doc/.DS_Store differ
# diff --git a/doc/__init__.py b/doc/__init__.py
# new file mode 100755
# index 000000000..e69de29bb
# diff --git a/doc/benchmark.md b/doc/benchmark.md
# new file mode 100644
# index 000000000..b16e26c71
# --- /dev/null
# +++ b/doc/benchmark.md
# @@ -0,0 +1,2 @@
# +# PaddleRec Benchmark
# +> 占位
# \ No newline at end of file
# diff --git a/doc/contribute.md b/doc/contribute.md
# new file mode 100644
# index 000000000..a9bd19100
# --- /dev/null
# +++ b/doc/contribute.md
# @@ -0,0 +1,2 @@
# +# PaddleRec 贡献代码
# +> 占位
# \ No newline at end of file
# diff --git a/doc/custom_dataset_reader.md b/doc/custom_dataset_reader.md
# new file mode 100644
# index 000000000..82b0fd12d
# --- /dev/null
# +++ b/doc/custom_dataset_reader.md
# @@ -0,0 +1,432 @@
# +# PaddleRec 推荐数据集格式
# +
# +当你的数据集格式为[slot:feasign]*这种模式，或者可以预处理为这种格式时，可以直接使用PaddleRec内置的Reader。
# +好处是不用自己写Reader了，各个model之间的数据格式也都可以统一成一样的格式。
# +
# +## 数据格式说明
# +
# +假如你的原始数据格式为
# +
# +```bash