7 from subprocess
import Popen
, STDOUT
, PIPE
8 from select
import select
10 # Pseudo-TTY and terminal manipulation
11 import pty
, array
, fcntl
, termios
13 IS_PY_3
= sys
.version_info
[0] == 3
15 # TODO: do we need to support '\n' too
20 parser
= argparse
.ArgumentParser(
21 description
="Run a test file against a Mal implementation")
22 parser
.add_argument('--rundir',
23 help="change to the directory before running tests")
24 parser
.add_argument('--start-timeout', default
=10, type=int,
25 help="default timeout for initial prompt")
26 parser
.add_argument('--test-timeout', default
=20, type=int,
27 help="default timeout for each individual test action")
28 parser
.add_argument('--pre-eval', default
=None, type=str,
29 help="Mal code to evaluate prior to running the test")
30 parser
.add_argument('--no-pty', action
='store_true',
31 help="Use direct pipes instead of pseudo-tty")
32 parser
.add_argument('--log-file', type=str,
33 help="Write all test interaction the named file")
34 parser
.add_argument('--soft', action
='store_true',
35 help="Report but do not fail tests after ';>>> soft=True'")
37 parser
.add_argument('test_file', type=argparse
.FileType('r'),
38 help="a test file formatted as with mal test data")
39 parser
.add_argument('mal_cmd', nargs
="*",
40 help="Mal implementation command line. Use '--' to "
41 "specify a Mal command line with dashed options.")
44 def __init__(self
, args
, no_pty
=False, log_file
=None):
45 #print "args: %s" % repr(args)
48 if log_file
: self
.logf
= open(log_file
, "a")
49 else: self
.logf
= None
51 # Cleanup child process on exit
52 atexit
.register(self
.cleanup
)
57 env
['INPUTRC'] = '/dev/null'
58 env
['PERL_RL'] = 'false'
60 self
.p
= Popen(args
, bufsize
=0,
61 stdin
=PIPE
, stdout
=PIPE
, stderr
=STDOUT
,
64 self
.stdin
= self
.p
.stdin
65 self
.stdout
= self
.p
.stdout
67 # provide tty to get 'interactive' readline to work
68 master
, slave
= pty
.openpty()
70 # Set terminal size large so that readline will not send
71 # ANSI/VT escape codes when the lines are long.
72 buf
= array
.array('h', [100, 200, 0, 0])
73 fcntl
.ioctl(master
, termios
.TIOCSWINSZ
, buf
, True)
75 self
.p
= Popen(args
, bufsize
=0,
76 stdin
=slave
, stdout
=slave
, stderr
=STDOUT
,
79 # Now close slave so that we will get an exception from
80 # read when the child exits early
81 # http://stackoverflow.com/questions/11165521
83 self
.stdin
= os
.fdopen(master
, 'r+b', 0)
84 self
.stdout
= self
.stdin
90 def read_to_prompt(self
, prompts
, timeout
):
91 end_time
= time
.time() + timeout
92 while time
.time() < end_time
:
93 [outs
,_
,_
] = select([self
.stdout
], [], [], 1)
94 if self
.stdout
in outs
:
95 new_data
= self
.stdout
.read(1)
96 new_data
= new_data
.decode("utf-8") if IS_PY_3
else new_data
97 #print "new_data: '%s'" % new_data
100 self
.buf
+= new_data
.replace("\n", "\r\n")
103 for prompt
in prompts
:
104 regexp
= re
.compile(prompt
)
105 match
= regexp
.search(self
.buf
)
108 buf
= self
.buf
[0:end
-len(prompt
)]
109 self
.buf
= self
.buf
[end
:]
110 self
.last_prompt
= prompt
116 self
.logf
.write(data
)
120 def writeline(self
, str):
122 return bytes(s
, "utf-8") if IS_PY_3
else s
124 self
.stdin
.write(_to_bytes(str + "\n"))
130 os
.killpg(self
.p
.pid
, signal
.SIGTERM
)
136 def __init__(self
, test_file
):
138 self
.data
= test_file
.read().split('\n')
148 line
= self
.data
.pop(0)
149 if re
.match(r
"^\s*$", line
): # blank line
151 elif line
[0:3] == ";;;": # ignore comment
153 elif line
[0:2] == ";;": # output comment
156 elif line
[0:5] == ";>>> ": # settings/commands
158 exec(line
[5:], {}, settings
)
159 if 'soft' in settings
: self
.soft
= True
161 elif line
[0:1] == ";": # unexpected comment
162 print("Test data error at line %d:\n%s" % (self
.line_num
, line
))
164 self
.form
= line
# the line is a form to send
166 # Now find the output and return value
169 if line
[0:3] == ";=>":
170 self
.ret
= line
[3:].replace('\\r', '\r').replace('\\n', '\n')
174 elif line
[0:2] == "; ":
175 self
.out
= self
.out
+ line
[2:] + sep
185 args
= parser
.parse_args(sys
.argv
[1:])
186 # Workaround argparse issue with two '--' on command line
187 if sys
.argv
.count('--') > 0:
188 args
.mal_cmd
= sys
.argv
[sys
.argv
.index('--')+1:]
190 if args
.rundir
: os
.chdir(args
.rundir
)
192 r
= Runner(args
.mal_cmd
, no_pty
=args
.no_pty
, log_file
=args
.log_file
)
193 t
= TestReader(args
.test_file
)
196 def assert_prompt(runner
, prompts
, timeout
):
197 # Wait for the initial prompt
198 header
= runner
.read_to_prompt(prompts
, timeout
=timeout
)
199 if not header
== None:
201 print("Started with:\n%s" % header
)
203 print("Did not one of following prompt(s): %s" % repr(prompts
))
204 print(" Got : %s" % repr(r
.buf
))
208 # Wait for the initial prompt
209 assert_prompt(r
, ['user> ', 'mal-user> '], args
.start_timeout
)
211 # Send the pre-eval code if any
213 sys
.stdout
.write("RUNNING pre-eval: %s" % args
.pre_eval
)
214 p
.write(args
.pre_eval
)
215 assert_prompt(args
.test_timeout
)
221 sys
.stdout
.write("TEST: %s -> [%s,%s]" % (t
.form
, repr(t
.out
), t
.ret
))
224 # The repeated form is to get around an occasional OS X issue
225 # where the form is repeated.
226 # https://github.com/kanaka/mal/issues/30
227 expected
= ["%s%s%s%s" % (t
.form
, sep
, t
.out
, t
.ret
),
228 "%s%s%s%s%s%s" % (t
.form
, sep
, t
.form
, sep
, t
.out
, t
.ret
)]
232 res
= r
.read_to_prompt(['\r\nuser> ', '\nuser> ',
233 '\r\nmal-user> ', '\nmal-user> '],
234 timeout
=args
.test_timeout
)
235 #print "%s,%s,%s" % (idx, repr(p.before), repr(p.after))
236 if t
.ret
== "*" or res
in expected
:
239 if args
.soft
and t
.soft
:
240 print(" -> SOFT FAIL (line %d):" % t
.line_num
)
243 print(" -> FAIL (line %d):" % t
.line_num
)
245 print(" Expected : %s" % repr(expected
))
246 print(" Got : %s" % repr(res
))
248 _
, exc
, _
= sys
.exc_info()
249 print("\nException: %s" % repr(exc
))
250 print("Output before exception:\n%s" % r
.buf
)
253 if soft_fail_cnt
> 0:
254 print("SOFT FAILURES: %d" % soft_fail_cnt
)
256 print("FAILURES: %d" % fail_cnt
)