
Commit 66fb66c

ShenYuhan authored and nepeplwu committed

add single process app() (#212)

* add single_app()

1 parent 1a598b6 commit 66fb66c

File tree

6 files changed: +538 -44 lines changed

paddlehub/commands/serving.py (+68 -11)

@@ -18,13 +18,12 @@
 from __future__ import print_function
 
 import argparse
-import subprocess
-import shlex
 import os
+import platform
+import socket
 import json
 import paddlehub as hub
 from paddlehub.commands.base_command import BaseCommand, ENTRY
-from paddlehub.serving import app
 
 
 class ServingCommand(BaseCommand):
@@ -41,33 +40,56 @@ def __init__(self, name):
             usage='%(prog)s',
             add_help=True)
         self.parser.add_argument("command")
+        self.parser.add_argument("sub_command")
         self.sub_parse = self.parser.add_mutually_exclusive_group(
             required=False)
-        self.sub_parse.add_argument("--start", action="store_true")
         self.parser.add_argument(
             "--use_gpu", action="store_true", default=False)
+        self.parser.add_argument(
+            "--use_multiprocess", action="store_true", default=False)
         self.parser.add_argument("--modules", "-m", nargs="+")
         self.parser.add_argument("--config", "-c", nargs="+")
-        self.parser.add_argument("--port", "-p", nargs="+", default=[8888])
+        self.parser.add_argument("--port", "-p", nargs="+", default=[8866])
+
+    @staticmethod
+    def is_port_occupied(ip, port):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            s.connect((ip, int(port)))
+            s.shutdown(2)
+            return True
+        except:
+            return False
 
     @staticmethod
     def preinstall_modules(modules):
         configs = []
+        module_exist = {}
         if modules is not None:
             for module in modules:
                 module_name = module if "==" not in module else \
                     module.split("==")[0]
                 module_version = None if "==" not in module else \
                     module.split("==")[1]
+                if module_exist.get(module_name, "") != "":
+                    print(module_name, "==", module_exist.get(module_name),
+                          " will be ignored cause new version is specified.")
+                    configs.pop()
+                module_exist.update({module_name: module_version})
                 try:
                     m = hub.Module(name=module_name, version=module_version)
+                    method_name = m.desc.attr.map.data['default_signature'].s
+                    if method_name == "":
+                        raise RuntimeError("{} cannot be use for "
+                                           "predicting".format(module_name))
                     configs.append({
                         "module": module_name,
                         "version": m.version,
                         "category": str(m.type).split("/")[0].upper()
                     })
                 except Exception as err:
-                    pass
+                    print(err, ", start Hub-Serving unsuccessfully.")
+                    exit(1)
         return configs
 
     @staticmethod
@@ -78,8 +100,24 @@ def start_serving(args):
             if os.path.exists(config_file):
                 with open(config_file, "r") as fp:
                     configs = json.load(fp)
+                    use_multiprocess = configs.get("use_multiprocess", False)
+                    if use_multiprocess is True:
+                        if platform.system() == "Windows":
+                            print(
+                                "Warning: Windows cannot use multiprocess working "
+                                "mode, Hub-Serving will switch to single process mode"
+                            )
+                            from paddlehub.serving import app_single as app
+                        else:
+                            from paddlehub.serving import app
+                    else:
+                        from paddlehub.serving import app_single as app
                     use_gpu = configs.get("use_gpu", False)
-                    port = configs.get("port", 8888)
+                    port = configs.get("port", 8866)
+                    if ServingCommand.is_port_occupied("127.0.0.1",
+                                                       port) is True:
+                        print("Port %s is occupied, please change it." % (port))
+                        return False
                     configs = configs.get("modules_info")
                     module = [
                         str(i["module"]) + "==" + str(i["version"])
@@ -92,10 +130,23 @@ def start_serving(args):
             else:
                 print("config_file ", config_file, "not exists.")
         else:
+            if args.use_multiprocess is True:
+                if platform.system() == "Windows":
+                    print(
+                        "Warning: Windows cannot use multiprocess working "
+                        "mode, Hub-Serving will switch to single process mode")
+                    from paddlehub.serving import app_single as app
+                else:
+                    from paddlehub.serving import app
+            else:
+                from paddlehub.serving import app_single as app
             module = args.modules
             if module is not None:
                 use_gpu = args.use_gpu
                 port = args.port[0]
+                if ServingCommand.is_port_occupied("127.0.0.1", port) is True:
+                    print("Port %s is occupied, please change it." % (port))
+                    return False
                 module_info = ServingCommand.preinstall_modules(module)
                 [
                     item.update({
@@ -111,9 +162,10 @@
     def show_help():
         str = "serving <option>\n"
         str += "\tManage PaddleHub-Serving.\n"
-        str += "option:\n"
-        str += "--start\n"
+        str += "sub command:\n"
+        str += "start\n"
         str += "\tStart PaddleHub-Serving if specifies this parameter.\n"
+        str += "option:\n"
        str += "--modules/-m [module1==version, module2==version...]\n"
         str += "\tPre-install modules via this parameter list.\n"
         str += "--port/-p XXXX\n"
@@ -126,8 +178,13 @@ def show_help():
         print(str)
 
     def execute(self, argv):
-        args = self.parser.parse_args()
-        if args.start is True:
+        try:
+            args = self.parser.parse_args()
+        except:
+            print("Please refer to the instructions below.")
+            ServingCommand.show_help()
+            return False
+        if args.sub_command == "start":
             ServingCommand.start_serving(args)
         else:
             ServingCommand.show_help()
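In short, this file now starts serving through a positional start sub-command instead of the old --start flag, makes the single-process app (app_single) the default working mode, adds --use_multiprocess to opt into the multiprocess app (with an automatic fallback to single process on Windows), and moves the default port from 8888 to 8866. A hypothetical invocation based on the options listed in show_help() above, where hub is PaddleHub's command-line entry point and the lac module name is only an example, not part of this commit:

    hub serving start -m lac -p 8866
    hub serving start --use_multiprocess -m lac

With the new is_port_occupied() check, either command fails fast with "Port ... is occupied, please change it." if something is already listening on the chosen port, instead of crashing later when the app binds.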

paddlehub/module/module.py (+7 -5)

@@ -155,18 +155,20 @@ def _init_with_name(self, name, version=None):
             module_name=name, module_version=version, extra=extra)
         if not result:
             logger.error(tips)
-            exit(1)
-        logger.info(tips)
-        self._init_with_module_file(module_dir[0])
+            raise RuntimeError(tips)
+        else:
+            logger.info(tips)
+            self._init_with_module_file(module_dir[0])
 
     def _init_with_url(self, url):
         utils.check_url(url)
         result, tips, module_dir = default_downloader.download_file_and_uncompress(
             url, save_path=".")
         if not result:
             logger.error(tips)
-            exit(1)
-        self._init_with_module_file(module_dir)
+            raise RuntimeError(tips)
+        else:
+            self._init_with_module_file(module_dir)
 
     def _dump_processor(self):
         import inspect
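The module.py change replaces hard exit(1) calls with raise RuntimeError(tips), so a failed module download or lookup no longer kills the whole process. ServingCommand.preinstall_modules() above depends on this: it catches the exception, prints its own message, and exits on its own terms. A minimal sketch of the new contract (the module name and version are illustrative, not part of this commit):

    import paddlehub as hub

    try:
        # Lookup/download failures now surface as RuntimeError(tips)
        # instead of terminating the interpreter with exit(1).
        m = hub.Module(name="lac", version="1.0.0")
    except RuntimeError as err:
        print(err)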

paddlehub/serving/app.py (+53 -17)

@@ -185,24 +185,51 @@ def index():
 
 @app_instance.before_request
 def before_request():
-    request.data = {"id": str(time.time())}
+    request.data = {"id": utils.md5(request.remote_addr + str(time.time()))}
+    print(request.remote_addr)
     pass
 
+@app_instance.route("/get/modules", methods=["GET", "POST"])
+def get_modules_info():
+    global nlp_module, cv_module
+    module_info = {}
+    if len(nlp_module) > 0:
+        module_info.update({"nlp_module": [{"Choose...": "Choose..."}]})
+        for item in nlp_module:
+            module_info["nlp_module"].append({item: item})
+    if len(cv_module) > 0:
+        module_info.update({"cv_module": [{"Choose...": "Choose..."}]})
+        for item in cv_module:
+            module_info["cv_module"].append({item: item})
+    module_info.update({"Choose...": [{"请先选择分类": "Choose..."}]})
+    return {"module_info": module_info}
+
 @app_instance.route("/predict/image/<module_name>", methods=["POST"])
 def predict_iamge(module_name):
     global results_dict
     req_id = request.data.get("id")
-    img_base64 = request.form.get("input_img", "")
-    received_file_name = request.form.get("input_file", "")
-    ext = received_file_name.split(".")[-1]
-    if ext == "":
-        return {"result": "Unrecognized file type"}
+
+    img_base64 = request.form.get("image", "")
+    if img_base64 != "":
+        img_base64 = request.form.get("image", "")
+        ext = img_base64.split(";")[0].split("/")[-1]
+        if ext not in ["jpeg", "jpg", "png"]:
+            return {"result": "Unrecognized file type"}
+        filename = utils.md5(str(time.time()) + str(img_base64)) + "." + ext
+        base64_head = img_base64.split(',')[0]
+        img_data = base64.b64decode(img_base64.split(',')[-1])
+        with open(filename, "wb") as fp:
+            fp.write(img_data)
+    else:
+        file = request.files["image"]
+        filename = file.filename
+        ext = file.filename.split(".")[-1]
+        if ext not in ["jpeg", "jpg", "png"]:
+            return {"result": "Unrecognized file type"}
+        base64_head = "data:image/" + ext + ";base64"
+        filename = utils.md5(filename) + '.' + ext
+        file.save(filename)
     score = time.time()
-    filename = utils.md5(str(time.time()) + str(img_base64)) + "." + ext
-    base64_head = img_base64.split(',')[0]
-    img_data = base64.b64decode(img_base64.split(',')[-1])
-    with open(filename, "wb") as fp:
-        fp.write(img_data)
     file_list = [filename]
     if queues_dict[module_name].qsize(
     ) + 1 > queues_dict[module_name].get_attribute("maxsize"):
@@ -211,9 +238,14 @@ def predict_iamge(module_name):
     data_num = len(file_list)
     results = []
     result_len = 0
+    start_time = time.time()
     while result_len != data_num:
         result_len = len(results_dict.get(req_id, []))
+        if time.time() - start_time > time_out:
+            results_dict.pop(req_id, None)
+            return {"result": "Request time out."}
     results = results_dict.get(req_id)
+    results_dict.pop(req_id, None)
     results = [i[1] for i in sorted(results, key=lambda k: k[0])]
     filename = results[0].get("path")
     ext = filename.split(".")[-1]
@@ -225,7 +257,7 @@ def predict_iamge(module_name):
     os.remove(filename)
     os.remove(output_file)
     results = {
-        "border":
+        "desc":
         str(results[0]["data"]),
         "output_img":
         base64_head + "," + str(output_img_base64).replace(
@@ -244,7 +276,7 @@ def data_2_item(data_list, req_id, score, module_name):
 def predict_text(module_name):
     global results_dict, queues_dict
     req_id = request.data.get("id")
-    data_list = request.form.get("input_text")
+    data_list = request.form.get("text")
     score = time.time()
     data_list = data_list.splitlines()
     data_temp = []
@@ -261,14 +293,17 @@
     if data_num + queues_dict[module_name].qsize(
     ) > queues_dict[module_name].get_attribute("maxsize"):
         return {"result": "Too many visitors now, please come back later."}
-    start = time.time()
-
     data_2_item(data_list, req_id, score, module_name)
     results = []
     result_len = 0
+    start_time = time.time()
     while result_len != data_num:
         result_len = len(results_dict.get(req_id, []))
+        if time.time() - start_time > time_out:
+            results_dict.pop(req_id, None)
+            return {"result": "Request time out."}
     results = results_dict.get(req_id)
+    results_dict.pop(req_id, None)
     results = [i[1] for i in sorted(results, key=lambda k: k[0])]
     return {"result": results}
 
@@ -302,8 +337,9 @@ def config_with_file(configs):
         queue_name_list.append(item["module"])
 
 
-def run(is_use_gpu=False, configs=None, port=8888):
-    global use_gpu
+def run(is_use_gpu=False, configs=None, port=8866, timeout=60):
+    global use_gpu, time_out
+    time_out = timeout
     use_gpu = is_use_gpu
     if configs is not None:
         config_with_file(configs)
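On the app side, the request form fields are renamed (input_text becomes text; input_img/input_file become a single image field accepting either a base64 data URI or an uploaded file), finished results are popped from results_dict instead of accumulating, and run() gains a timeout parameter (default 60 seconds) after which a still-waiting request receives {"result": "Request time out."}. A minimal client sketch against the renamed text field, assuming a text module named lac is being served locally on the new default port 8866 (the module name and host are assumptions, and requests is a third-party HTTP library):

    import requests

    # predict_text() now reads the "text" form field (previously "input_text");
    # each line of the value is treated as one input sample.
    resp = requests.post(
        "http://127.0.0.1:8866/predict/text/lac",
        data={"text": "One sentence per line to analyze."})
    print(resp.json())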
