3 from __future__
import print_function
8 from subprocess
import Popen
, STDOUT
, PIPE
9 from select
import select
11 # Pseudo-TTY and terminal manipulation
12 import pty
, array
, fcntl
, termios
# True when running under Python 3 or later. The I/O code in this file uses
# this flag to decide whether data must be converted between bytes and str.
# Compare with >= so a future major version is not mistaken for Python 2.
IS_PY_3 = sys.version_info[0] >= 3
21 debug_file
.write(data
)
24 def log(data
, end
='\n'):
26 log_file
.write(data
+ end
)
31 # TODO: do we need to support '\n' too
# Command-line interface of the test runner.
parser = argparse.ArgumentParser(
    description="Run a test file against a Mal implementation")
parser.add_argument('--rundir',
                    help="change to the directory before running tests")
parser.add_argument('--start-timeout', default=10, type=int,
                    help="default timeout for initial prompt")
parser.add_argument('--test-timeout', default=20, type=int,
                    help="default timeout for each individual test action")
parser.add_argument('--pre-eval', default=None, type=str,
                    help="Mal code to evaluate prior to running the test")
parser.add_argument('--no-pty', action='store_true',
                    help="Use direct pipes instead of pseudo-tty")
parser.add_argument('--log-file', type=str,
                    help="Write messages to the named file in addition to the screen")
parser.add_argument('--debug-file', type=str,
                    help="Write all test interaction to the named file")
parser.add_argument('--hard', action='store_true',
                    help="Turn soft tests (soft, deferrable, optional) into hard failures")

# Control whether deferrable and optional tests are executed.
# Each pair of flags writes to the same destination; both default to True.
parser.add_argument('--deferrable', dest='deferrable', action='store_true',
                    help="Enable deferrable tests that follow a ';>>> deferrable=True'")
parser.add_argument('--no-deferrable', dest='deferrable', action='store_false',
                    help="Disable deferrable tests that follow a ';>>> deferrable=True'")
parser.set_defaults(deferrable=True)
parser.add_argument('--optional', dest='optional', action='store_true',
                    help="Enable optional tests that follow a ';>>> optional=True'")
parser.add_argument('--no-optional', dest='optional', action='store_false',
                    help="Disable optional tests that follow a ';>>> optional=True'")
parser.set_defaults(optional=True)

# Positionals: the test file, then zero or more words of the command to run.
parser.add_argument('test_file', type=str,
                    help="a test file formatted as with mal test data")
parser.add_argument('mal_cmd', nargs="*",
                    help="Mal implementation command line. Use '--' to "
                         "specify a Mal command line with dashed options.")
parser.add_argument('--crlf', dest='crlf', action='store_true',
                    help="Write \\r\\n instead of \\n to the input")
76 """List process currently open FDs and their target """
77 if sys
.platform
!= 'linux2':
78 raise NotImplementedError('Unsupported platform: %s' % sys
.platform
)
81 base
= '/proc/self/fd'
82 for num
in os
.listdir(base
):
85 path
= os
.readlink(os
.path
.join(base
, num
))
86 except OSError as err
:
87 # Last FD is always the "listdir" one (which may be closed)
88 if err
.errno
!= errno
.ENOENT
:
95 def __init__(self
, args
, no_pty
=False, line_break
="\n"):
96 #print "args: %s" % repr(args)
99 # Cleanup child process on exit
100 atexit
.register(self
.cleanup
)
105 env
['INPUTRC'] = '/dev/null'
106 env
['PERL_RL'] = 'false'
107 print("FDS before: %s" % list_fds())
109 self
.p
= Popen(args
, bufsize
=0,
110 stdin
=PIPE
, stdout
=PIPE
, stderr
=STDOUT
,
111 preexec_fn
=os
.setsid
,
112 env
=env
, close_fds
=True)
113 self
.stdin
= self
.p
.stdin
114 self
.stdout
= self
.p
.stdout
116 # provide tty to get 'interactive' readline to work
117 master
, slave
= pty
.openpty()
119 # Set terminal size large so that readline will not send
120 # ANSI/VT escape codes when the lines are long.
121 buf
= array
.array('h', [100, 200, 0, 0])
122 fcntl
.ioctl(master
, termios
.TIOCSWINSZ
, buf
, True)
124 self
.p
= Popen(args
, bufsize
=0,
125 stdin
=slave
, stdout
=slave
, stderr
=STDOUT
,
126 preexec_fn
=os
.setsid
,
127 env
=env
, close_fds
=True)
128 # Now close slave so that we will get an exception from
129 # read when the child exits early
130 # http://stackoverflow.com/questions/11165521
132 self
.stdin
= os
.fdopen(master
, 'r+b', 0)
133 self
.stdout
= self
.stdin
135 print("FDS after: %s" % list_fds())
139 self
.last_prompt
= ""
141 self
.line_break
= line_break
143 def read_to_prompt(self
, prompts
, timeout
):
144 end_time
= time
.time() + timeout
145 while time
.time() < end_time
:
146 [outs
,_
,_
] = select([self
.stdout
], [], [], 1)
147 if self
.stdout
in outs
:
148 new_data
= self
.stdout
.read(1)
149 new_data
= new_data
.decode("utf-8") if IS_PY_3
else new_data
150 #print("new_data: '%s'" % new_data)
152 # Perform newline cleanup
154 self
.buf
+= new_data
.replace("\n", "\r\n")
157 self
.buf
= self
.buf
.replace("\r\r", "\r")
158 # Remove ANSI codes generally
159 #ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')
160 # Remove rustyline ANSI CSI codes:
161 # - [6C - CR + cursor forward
162 # - [6K - CR + erase in line
163 ansi_escape
= re
.compile(r
'\r\x1B\[[0-9]*[CK]')
164 self
.buf
= ansi_escape
.sub('', self
.buf
)
165 for prompt
in prompts
:
166 regexp
= re
.compile(prompt
)
167 match
= regexp
.search(self
.buf
)
170 buf
= self
.buf
[0:match
.start()]
171 self
.buf
= self
.buf
[end
:]
172 self
.last_prompt
= prompt
173 return buf
.replace("^M", "\r")
176 def writeline(self
, str):
178 return bytes(s
, "utf-8") if IS_PY_3
else s
180 self
.stdin
.write(_to_bytes(str.replace('\r', '\x16\r') + self
.line_break
))
186 os
.killpg(self
.p
.pid
, signal
.SIGTERM
)
192 def __init__(self
, test_file
):
194 f
= open(test_file
, newline
='') if IS_PY_3
else open(test_file
)
195 self
.data
= f
.read().split('\n')
197 self
.deferrable
= False
198 self
.optional
= False
208 line
= self
.data
.pop(0)
209 if re
.match(r
"^\s*$", line
): # blank line
211 elif line
[0:3] == ";;;": # ignore comment
213 elif line
[0:2] == ";;": # output comment
216 elif line
[0:5] == ";>>> ": # settings/commands
218 exec(line
[5:], {}, settings
)
219 if 'soft' in settings
:
220 self
.soft
= settings
['soft']
221 if 'deferrable' in settings
and settings
['deferrable']:
222 self
.deferrable
= "\nSkipping deferrable and optional tests"
224 if 'optional' in settings
and settings
['optional']:
225 self
.optional
= "\nSkipping optional tests"
228 elif line
[0:1] == ";": # unexpected comment
229 raise Exception("Test data error at line %d:\n%s" % (self
.line_num
, line
))
230 self
.form
= line
# the line is a form to send
232 # Now find the output and return value
235 if line
[0:3] == ";=>":
240 elif line
[0:2] == ";/":
241 self
.out
= self
.out
+ line
[2:] + sep
247 if self
.ret
!= None: break
249 if self
.out
[-2:] == sep
and not self
.ret
:
250 # If there is no return value, output should not end in
252 self
.out
= self
.out
[0:-2]
# Parse the command line into the module-level ``args`` namespace.
args = parser.parse_args(sys.argv[1:])
# Workaround argparse issue with two '--' on command line: take everything
# after the first '--' verbatim as the Mal command.
if '--' in sys.argv:
    args.mal_cmd = sys.argv[sys.argv.index('--')+1:]

if args.rundir:
    os.chdir(args.rundir)

# Both files are opened in append mode.
if args.log_file:
    log_file = open(args.log_file, "a")
if args.debug_file:
    debug_file = open(args.debug_file, "a")

# r drives the implementation under test; t reads the test file.
r = Runner(
    args.mal_cmd,
    no_pty=args.no_pty,
    line_break="\r\n" if args.crlf else "\n",
)
t = TestReader(args.test_file)
269 def assert_prompt(runner
, prompts
, timeout
):
270 # Wait for the initial prompt
271 header
= runner
.read_to_prompt(prompts
, timeout
=timeout
)
272 if not header
== None:
274 log("Started with:\n%s" % header
)
276 log("Did not receive one of following prompt(s): %s" % repr(prompts
))
277 log(" Got : %s" % repr(r
.buf
))
281 # Wait for the initial prompt
283 assert_prompt(r
, ['[^\s()<>]+> '], args
.start_timeout
)
285 _
, exc
, _
= sys
.exc_info()
286 log("\nException: %s" % repr(exc
))
287 log("Output before exception:\n%s" % r
.buf
)
290 # Send the pre-eval code if any
292 sys
.stdout
.write("RUNNING pre-eval: %s" % args
.pre_eval
)
293 r
.writeline(args
.pre_eval
)
294 assert_prompt(r
, ['[^\s()<>]+> '], args
.test_timeout
)
303 if args
.deferrable
== False and t
.deferrable
:
307 if args
.optional
== False and t
.optional
:
315 if t
.form
== None: continue
317 log("TEST: %s -> [%s,%s]" % (repr(t
.form
), repr(t
.out
), t
.ret
), end
='')
319 # The repeated form is to get around an occasional OS X issue
320 # where the form is repeated.
321 # https://github.com/kanaka/mal/issues/30
322 expects
= ["%s%s%s%s" % (re
.escape(t
.form
), sep
,
323 t
.out
, re
.escape(t
.ret
)),
324 "%s%s%s%s%s%s" % (re
.escape(t
.form
), sep
,
325 re
.escape(t
.form
), sep
,
326 t
.out
, re
.escape(t
.ret
))]
331 res
= r
.read_to_prompt(['\r\n[^\s()<>]+> ', '\n[^\s()<>]+> '],
332 timeout
=args
.test_timeout
)
333 #print "%s,%s,%s" % (idx, repr(p.before), repr(p.after))
334 if (t
.ret
== "" and t
.out
== ""):
335 log(" -> SUCCESS (result ignored)")
337 elif (re
.search(expects
[0], res
, re
.S
) or
338 re
.search(expects
[1], res
, re
.S
)):
342 if t
.soft
and not args
.hard
:
343 log(" -> SOFT FAIL (line %d):" % t
.line_num
)
347 log(" -> FAIL (line %d):" % t
.line_num
)
350 log(" Expected : %s" % repr(expects
[0]))
351 log(" Got : %s" % repr(res
))
352 failed_test
= """%sFAILED TEST (line %d): %s -> [%s,%s]:
354 Got : %s""" % (fail_type
, t
.line_num
, t
.form
, repr(t
.out
),
355 t
.ret
, repr(expects
[0]), repr(res
))
356 failures
.append(failed_test
)
358 _
, exc
, _
= sys
.exc_info()
359 log("\nException: %s" % repr(exc
))
360 log("Output before exception:\n%s" % r
.buf
)
363 if len(failures
) > 0:
369 TEST RESULTS (for %s):
370 %3d: soft failing tests
374 """ % (args
.test_file
, soft_fail_cnt
, fail_cnt
,
378 debug("\n") # add some separate to debug log