
Commit ccce04f

Merge pull request #278 from Yancey1989/fitaline_ft

Run fit-a-line demo with fault tolerant mode

2 parents: 3e232f3 + 93969de

File tree

5 files changed: +104 −15 lines

demo/fit_a_line/train_ft.py (new file, +66 lines)

@@ -0,0 +1,66 @@
import paddle.v2 as paddle
import os
import gzip
from paddle.v2.reader.creator import cloud_reader
import paddle.v2.dataset.uci_housing as uci_housing

# the master pod serves etcd; trainers and pservers discover each other through it
etcd_ip = os.getenv("ETCD_IP")
etcd_endpoint = "http://" + etcd_ip + ":2379"
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))


def main():
    # init
    paddle.init()

    # network config
    x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(13))
    y_predict = paddle.layer.fc(input=x, size=1, act=paddle.activation.Linear())
    y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))
    cost = paddle.layer.mse_cost(input=y_predict, label=y)

    # create parameters
    parameters = paddle.parameters.create(cost)

    # create optimizer
    optimizer = paddle.optimizer.Momentum(momentum=0)

    # remote training: discover pservers through etcd instead of a static list
    trainer = paddle.trainer.SGD(
        cost=cost,
        parameters=parameters,
        update_equation=optimizer,
        is_local=False,
        pserver_spec=etcd_endpoint,
        use_etcd=True)

    feeding = {'x': 0, 'y': 1}

    # event_handler to print training and testing info
    def event_handler(event):
        if isinstance(event, paddle.event.EndIteration):
            if event.batch_id % 100 == 0:
                print "Pass %d, Batch %d, Cost %f" % (
                    event.pass_id, event.batch_id, event.cost)

        if isinstance(event, paddle.event.EndPass):
            result = trainer.test(
                reader=paddle.batch(uci_housing.test(), batch_size=2),
                feeding=feeding)
            print "Test %d, Cost %f" % (event.pass_id, result.cost)
            # only trainer 0 snapshots the parameters after each pass
            if trainer_id == 0:
                with gzip.open("fit-a-line_pass_%05d.tar.gz" % event.pass_id,
                               "w") as f:
                    parameters.to_tar(f)

    # training: read the sharded dataset from PFS through the etcd endpoint
    trainer.train(
        reader=paddle.batch(
            paddle.reader.shuffle(cloud_reader(
                ["/pfs/dlnel/public/dataset/uci_housing/uci_housing_train-*"],
                etcd_endpoint), buf_size=500),
            batch_size=2),
        feeding=feeding,
        event_handler=event_handler,
        num_passes=30)


if __name__ == '__main__':
    main()
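train_ft.py reads its etcd address and trainer id from the environment; on a cluster those variables are exported by paddle_k8s start_new_trainer (see docker/paddle_k8s below). A minimal sketch of a manual run, assuming an etcd instance is already reachable; the IP below is hypothetical:

export ETCD_IP=192.168.1.10          # normally the master pod's IP (etcd on :2379)
export PADDLE_INIT_TRAINER_ID=0      # trainer 0 also snapshots parameters each pass
python train_ft.py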

docker/k8s_tools.py (+4 −9)

@@ -42,15 +42,10 @@ def fetch_pserver_ips():
     return ",".join(pserver_ips)

 def fetch_master_ip():
-    while True:
-        label_selector = "paddle-job-master=%s" % PADDLE_JOB_NAME
-        pod_list = fetch_pods_info(label_selector)
-        master_ip = ""
-        if len(pod_list) >= 1:
-            master_ip = pod_list[0][1]
-        if master_ip:
-            return master_ip
-        time.sleep(5)
+    label_selector = "paddle-job-master=%s" % PADDLE_JOB_NAME
+    pod_list = fetch_pods_info(label_selector)
+    master_ips = [item[1] for item in pod_list]
+    return master_ips[0]

 def fetch_trainer_id():
     label_selector = "paddle-job=%s" % PADDLE_JOB_NAME
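Note that fetch_master_ip no longer polls until a master pod appears; callers are now expected to wait for the master pod themselves. The sketch below mirrors the calls added to docker/paddle_k8s in the next file:

# wait for exactly one paddle-job-master pod, then resolve its IP
stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-master=${PADDLE_JOB_NAME} 1
export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip)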

docker/paddle_k8s (+28 −1)

@@ -12,12 +12,13 @@ start_pserver() {
 }

 start_new_pserver() {
+  stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-master=${PADDLE_JOB_NAME} 1
   export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip)
   stdbuf -oL /usr/bin/pserver \
   -port=$PADDLE_INIT_PORT \
   -num-pservers=$PSERVERS \
   -log-level=debug \
-  -etcd-endpoint=http://$PADDLE_INIT_MASTER_IP:2379
+  -etcd-endpoint=http://$MASTER_IP:2379
 }

 start_master() {
@@ -43,6 +44,27 @@ check_trainer_ret() {
   exit $ret
 }

+start_new_trainer() {
+  stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-master=${PADDLE_JOB_NAME} 1
+  # FIXME: use etcd lock instead of trainer id
+  stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job=${PADDLE_JOB_NAME} ${TRAINERS}
+
+  export MASTER_IP=$(python /root/k8s_tools.py fetch_master_ip)
+  export ETCD_IP="$MASTER_IP"
+  export PADDLE_INIT_TRAINER_ID=$(python /root/k8s_tools.py fetch_trainer_id)
+
+  # NOTE: $TRAINER_PACKAGE may be large, do not copy
+  export PYTHONPATH=$TRAINER_PACKAGE:$PYTHONPATH
+  cd $TRAINER_PACKAGE
+
+  stdbuf -oL echo "Starting training job: " $TRAINER_PACKAGE, "num_gradient_servers:" \
+    $PADDLE_INIT_NUM_GRADIENT_SERVERS, "trainer_id: " $PADDLE_INIT_TRAINER_ID, \
+    "version: " $1
+
+  stdbuf -oL sh -c "${ENTRY}"
+  check_trainer_ret $?
+}
+
 start_trainer() {
   stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job-pserver=${PADDLE_JOB_NAME} ${PSERVERS}
   stdbuf -oL python /root/k8s_tools.py wait_pods_running paddle-job=${PADDLE_JOB_NAME} ${TRAINERS}
@@ -98,6 +120,8 @@ usage() {
   echo "usage: paddle_k8s [<args>]:"
   echo "  start_trainer [v1|v2]  Start a trainer process with v1 or v2 API"
   echo "  start_pserver          Start a pserver process"
+  echo "  start_new_pserver      Start a new pserver process"
+  echo "  start_new_trainer      Start a new trainer process"
 }

 case "$1" in
@@ -107,6 +131,9 @@ case "$1" in
   start_trainer)
     start_trainer $2
     ;;
+  start_new_trainer)
+    start_new_trainer
+    ;;
   start_new_pserver)
     start_new_pserver
     ;;
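For reference, the two new subcommands are meant to be the container entrypoints of the fault-tolerant pods; the pod specs that invoke them are generated by paddle_job.py in the next file. A sketch of the invocations:

paddle_k8s start_new_pserver   # pserver resolves the master's etcd and registers there
paddle_k8s start_new_trainer   # trainer fetches its id, then runs ${ENTRY}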

paddlecloud/paddlejob/paddle_job.py (+5 −4)

@@ -27,7 +27,7 @@ def __init__(self,
                  volumes=[],
                  registry_secret=None,
                  envs = {},
-                 new_pserver=True,
+                 fault_tolerant=False,
                  etcd_image="quay.io/coreos/etcd:v3.2.1"):

         self._ports_num=1
@@ -54,7 +54,7 @@ def __init__(self,
         self._mastercpu = 1
         self._mastermemory = "300Mi"
         # use new pserver for fault tolerance
-        self._new_pserver = new_pserver
+        self._fault_tolerant = fault_tolerant
         self._etcd_image = etcd_image

     @property
@@ -92,7 +92,6 @@ def get_env(self):
         envs.append({"name":"PADDLE_INIT_PORTS_NUM_FOR_SPARSE", "value":str(self._ports_num_for_sparse)})
         envs.append({"name":"PADDLE_INIT_NUM_GRADIENT_SERVERS", "value":str(self._num_gradient_servers)})
         envs.append({"name":"PADDLE_INIT_NUM_PASSES", "value":str(self._passes)})
-
         if self._gpu:
             envs.append({"name":"PADDLE_INIT_USE_GPU", "value":str("1")})
             # HACK: add nvidia lib LD_LIBRARY_PATH for all pods
@@ -131,13 +130,15 @@ def _get_master_entrypoint(self):
         return ["paddle_k8s", "start_master"]

     def _get_pserver_entrypoint(self):
-        if not self._new_pserver:
+        if not self._fault_tolerant:
             return ["paddle_k8s", "start_pserver"]
         else:
             return ["paddle_k8s", "start_new_pserver"]

     def _get_trainer_entrypoint(self):
         if self._entry:
+            if self._fault_tolerant:
+                return ["paddle_k8s", "start_new_trainer"]
             return ["paddle_k8s", "start_trainer", "v2"]
         return ["paddle_k8s", "start_trainer", "v1"]

paddlecloud/paddlejob/views.py (+1 −1)

@@ -146,7 +146,7 @@ def post(self, request, format=None):
             registry_secret = registry_secret,
             volumes = volumes,
             envs = envs,
-            new_pserver = fault_tolerant,
+            fault_tolerant = fault_tolerant,
             etcd_image = settings.ETCD_IMAGE
         )
         # ========== submit master ReplicaSet if using fault_tolerant feature ==
