Update spec test script (#1227)
This commit is contained in:
@ -57,26 +57,25 @@ rundir = None
|
||||
|
||||
class Runner():
|
||||
def __init__(self, args, no_pty=False):
|
||||
#print "args: %s" % repr(args)
|
||||
self.no_pty = no_pty
|
||||
|
||||
# Cleanup child process on exit
|
||||
atexit.register(self.cleanup)
|
||||
|
||||
self.p = None
|
||||
self.process = None
|
||||
env = os.environ
|
||||
env['TERM'] = 'dumb'
|
||||
env['INPUTRC'] = '/dev/null'
|
||||
env['PERL_RL'] = 'false'
|
||||
if no_pty:
|
||||
self.p = Popen(args, bufsize=0,
|
||||
self.process = Popen(args, bufsize=0,
|
||||
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
|
||||
preexec_fn=os.setsid,
|
||||
env=env)
|
||||
self.stdin = self.p.stdin
|
||||
self.stdout = self.p.stdout
|
||||
self.stdin = self.process.stdin
|
||||
self.stdout = self.process.stdout
|
||||
else:
|
||||
# provide tty to get 'interactive' readline to work
|
||||
# Use tty to setup an interactive environment
|
||||
master, slave = pty.openpty()
|
||||
|
||||
# Set terminal size large so that readline will not send
|
||||
@ -84,7 +83,7 @@ class Runner():
|
||||
buf = array.array('h', [100, 200, 0, 0])
|
||||
fcntl.ioctl(master, termios.TIOCSWINSZ, buf, True)
|
||||
|
||||
self.p = Popen(args, bufsize=0,
|
||||
self.process = Popen(args, bufsize=0,
|
||||
stdin=slave, stdout=slave, stderr=STDOUT,
|
||||
preexec_fn=os.setsid,
|
||||
env=env)
|
||||
@ -95,55 +94,55 @@ class Runner():
|
||||
self.stdin = os.fdopen(master, 'r+b', 0)
|
||||
self.stdout = self.stdin
|
||||
|
||||
#print "started"
|
||||
self.buf = ""
|
||||
self.last_prompt = ""
|
||||
|
||||
def read_to_prompt(self, prompts, timeout):
|
||||
end_time = time.time() + timeout
|
||||
while time.time() < end_time:
|
||||
wait_until = time.time() + timeout
|
||||
while time.time() < wait_until:
|
||||
[outs,_,_] = select([self.stdout], [], [], 1)
|
||||
if self.stdout in outs:
|
||||
new_data = self.stdout.read(1)
|
||||
if not new_data:
|
||||
read_byte = self.stdout.read(1)
|
||||
if not read_byte:
|
||||
# EOF on macOS ends up here.
|
||||
break
|
||||
new_data = new_data.decode("utf-8") if IS_PY_3 else new_data
|
||||
#print("new_data: '%s'" % new_data)
|
||||
debug(new_data)
|
||||
read_byte = read_byte.decode('utf-8') if IS_PY_3 else read_byte
|
||||
|
||||
debug(read_byte)
|
||||
if self.no_pty:
|
||||
self.buf += new_data.replace("\n", "\r\n")
|
||||
self.buf += read_byte.replace('\n', '\r\n')
|
||||
else:
|
||||
self.buf += new_data
|
||||
self.buf = self.buf.replace("\r\r", "\r")
|
||||
self.buf += read_byte
|
||||
self.buf = self.buf.replace('\r\r', '\r')
|
||||
|
||||
# filter the prompts
|
||||
for prompt in prompts:
|
||||
regexp = re.compile(prompt)
|
||||
match = regexp.search(self.buf)
|
||||
pattern = re.compile(prompt)
|
||||
match = pattern.search(self.buf)
|
||||
if match:
|
||||
end = match.end()
|
||||
buf = self.buf[0:end-len(prompt)]
|
||||
self.buf = self.buf[end:]
|
||||
self.last_prompt = prompt
|
||||
return buf
|
||||
return None
|
||||
|
||||
def writeline(self, str):
|
||||
def _to_bytes(s):
|
||||
return bytes(s, "utf-8") if IS_PY_3 else s
|
||||
str_to_write = str + '\n'
|
||||
str_to_write = bytes(
|
||||
str_to_write, 'utf-8') if IS_PY_3 else str_to_write
|
||||
|
||||
self.stdin.write(_to_bytes(str + "\n"))
|
||||
self.stdin.write(str_to_write)
|
||||
|
||||
def cleanup(self):
|
||||
if self.p:
|
||||
if self.process:
|
||||
try:
|
||||
self.writeline("__exit__")
|
||||
time.sleep(.020)
|
||||
os.killpg(self.p.pid, signal.SIGTERM)
|
||||
self.process.kill()
|
||||
except OSError:
|
||||
pass
|
||||
except IOError:
|
||||
pass
|
||||
self.p = None
|
||||
self.process = None
|
||||
self.stdin.close()
|
||||
if self.stdin != self.stdout:
|
||||
self.stdout.close()
|
||||
|
||||
Reference in New Issue
Block a user