33import threading
44import errno
55import logging
6- from typing import Any , Dict , List , Mapping , Text , TypeVar , Union
6+ import select
7+ import os
78
9+ import cStringIO
10+ from cStringIO import StringIO
11+ from typing import Any , Dict , List , Mapping , Text , TypeVar , Union
12+ from pkg_resources import resource_stream
813
914class JavascriptException (Exception ):
1015 pass
1116
1217_logger = logging .getLogger ("cwltool" )
1318
14- JSON = Union [Dict [Any ,Any ], List [Any ], Text , int , long , float , bool , None ]
19+ JSON = Union [Dict [Text ,Any ], List [Any ], Text , int , long , float , bool , None ]
20+
21+ localdata = threading .local ()
1522
1623have_node_slim = False
1724
18- def execjs (js , jslib , timeout = None ): # type: (Union[Mapping, Text], Any, int) -> JSON
25+ def new_js_proc ():
26+ # type: () -> subprocess.Popen
27+
28+ res = resource_stream (__name__ , 'cwlNodeEngine.js' )
29+ nodecode = res .read ()
30+
1931 nodejs = None
2032 trynodes = ("nodejs" , "node" )
2133 for n in trynodes :
2234 try :
23- nodejs = subprocess .Popen ([n ], stdin = subprocess .PIPE , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
35+ nodejs = subprocess .Popen ([n , "--eval" , nodecode ], stdin = subprocess .PIPE , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
2436 break
2537 except OSError as e :
2638 if e .errno == errno .ENOENT :
@@ -39,7 +51,7 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -
3951 nodejs = subprocess .Popen (["docker" , "run" ,
4052 "--attach=STDIN" , "--attach=STDOUT" , "--attach=STDERR" ,
4153 "--sig-proxy=true" , "--interactive" ,
42- "--rm" , nodeimg ],
54+ "--rm" , nodeimg , "node" , "--eval" , nodecode ],
4355 stdin = subprocess .PIPE , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
4456 except OSError as e :
4557 if e .errno == errno .ENOENT :
@@ -55,15 +67,24 @@ def execjs(js, jslib, timeout=None): # type: (Union[Mapping, Text], Any, int) -
5567 "expressions, but couldn't find it. Tried %s, docker run "
5668 "node:slim" % u", " .join (trynodes ))
5769
70+ return nodejs
71+
72+
73+ def execjs (js , jslib , timeout = None ): # type: (Union[Mapping, Text], Any, int) -> JSON
74+
75+ if not hasattr (localdata , "proc" ) or localdata .proc .poll () is not None :
76+ localdata .proc = new_js_proc ()
77+
78+ nodejs = localdata .proc
79+
5880 fn = u"\" use strict\" ;\n %s\n (function()%s)()" % (jslib , js if isinstance (js , basestring ) and len (js ) > 1 and js [0 ] == '{' else ("{return (%s);}" % js ))
59- script = u"console.log(JSON.stringify(require(\" vm\" ).runInNewContext(%s, {})));\n " % json .dumps (fn )
6081
6182 killed = []
6283
6384 def term ():
6485 try :
65- nodejs .kill ()
6686 killed .append (True )
87+ nodejs .kill ()
6788 except OSError :
6889 pass
6990
@@ -73,17 +94,44 @@ def term():
7394 tm = threading .Timer (timeout , term )
7495 tm .start ()
7596
76- stdoutdata , stderrdata = nodejs .communicate (script )
97+ stdin_buf = StringIO (json .dumps (fn )+ "\n " )
98+ stdout_buf = StringIO ()
99+ stderr_buf = StringIO ()
100+
101+ completed = [] # type: List[Union[cStringIO.InputType, cStringIO.OutputType]]
102+ while len (completed ) < 3 :
103+ rready , wready , _ = select .select ([nodejs .stdout , nodejs .stderr ], [nodejs .stdin ], [])
104+ if nodejs .stdin in wready :
105+ b = stdin_buf .read (select .PIPE_BUF )
106+ if b :
107+ os .write (nodejs .stdin .fileno (), b )
108+ elif stdin_buf not in completed :
109+ completed .append (stdin_buf )
110+ for pipes in ((nodejs .stdout , stdout_buf ), (nodejs .stderr , stderr_buf )):
111+ if pipes [0 ] in rready :
112+ b = os .read (pipes [0 ].fileno (), select .PIPE_BUF )
113+ if b :
114+ pipes [1 ].write (b )
115+ elif pipes [1 ] not in completed :
116+ completed .append (pipes [1 ])
117+ if stdout_buf .getvalue ().endswith ("\n " ):
118+ for buf in (stdout_buf , stderr_buf ):
119+ if buf not in completed :
120+ completed .append (buf )
77121 tm .cancel ()
78122
123+ stdin_buf .close ()
124+ stdoutdata = stdout_buf .getvalue ()
125+ stderrdata = stderr_buf .getvalue ()
126+
79127 def fn_linenum (): # type: () -> Text
80128 return u"\n " .join (u"%04i %s" % (i + 1 , b ) for i , b in enumerate (fn .split ("\n " )))
81129
82- if killed :
83- raise JavascriptException ( u"Long-running script killed after %s seconds. \n script was: \n %s \n " % ( timeout , fn_linenum ()))
84-
85- if nodejs . returncode != 0 :
86- raise JavascriptException (u"Returncode was: %s\n script was:\n %s\n stdout was: '%s'\n stderr was: '%s'\n " % (nodejs .returncode , fn_linenum (), stdoutdata , stderrdata ))
130+ if nodejs . poll () not in ( None , 0 ) :
131+ if killed :
132+ raise JavascriptException ( u"Long-running script killed after %s seconds. \n script was: \n %s \n " % ( timeout , fn_linenum ()))
133+ else :
134+ raise JavascriptException (u"Returncode was: %s\n script was:\n %s\n stdout was: '%s'\n stderr was: '%s'\n " % (nodejs .returncode , fn_linenum (), stdoutdata , stderrdata ))
87135 else :
88136 try :
89137 return json .loads (stdoutdata )
0 commit comments