utils.py
5.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import os
import sys
import stat
import select
import time
import errno
try:
InterruptedError
except NameError:
# Alias Python2 exception to Python3
InterruptedError = select.error
if sys.version_info[0] >= 3:
string_types = (str,)
else:
string_types = (unicode, str)
def is_executable_file(path):
"""Checks that path is an executable regular file, or a symlink towards one.
This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``.
"""
# follow symlinks,
fpath = os.path.realpath(path)
if not os.path.isfile(fpath):
# non-files (directories, fifo, etc.)
return False
mode = os.stat(fpath).st_mode
if (sys.platform.startswith('sunos')
and os.getuid() == 0):
# When root on Solaris, os.X_OK is True for *all* files, irregardless
# of their executability -- instead, any permission bit of any user,
# group, or other is fine enough.
#
# (This may be true for other "Unix98" OS's such as HP-UX and AIX)
return bool(mode & (stat.S_IXUSR |
stat.S_IXGRP |
stat.S_IXOTH))
return os.access(fpath, os.X_OK)
def which(filename, env=None):
'''This takes a given filename; tries to find it in the environment path;
then checks if it is executable. This returns the full path to the filename
if found and executable. Otherwise this returns None.'''
# Special case where filename contains an explicit path.
if os.path.dirname(filename) != '' and is_executable_file(filename):
return filename
if env is None:
env = os.environ
p = env.get('PATH')
if not p:
p = os.defpath
pathlist = p.split(os.pathsep)
for path in pathlist:
ff = os.path.join(path, filename)
if is_executable_file(ff):
return ff
return None
def split_command_line(command_line):
'''This splits a command line into a list of arguments. It splits arguments
on spaces, but handles embedded quotes, doublequotes, and escaped
characters. It's impossible to do this with a regular expression, so I
wrote a little state machine to parse the command line. '''
arg_list = []
arg = ''
# Constants to name the states we can be in.
state_basic = 0
state_esc = 1
state_singlequote = 2
state_doublequote = 3
# The state when consuming whitespace between commands.
state_whitespace = 4
state = state_basic
for c in command_line:
if state == state_basic or state == state_whitespace:
if c == '\\':
# Escape the next character
state = state_esc
elif c == r"'":
# Handle single quote
state = state_singlequote
elif c == r'"':
# Handle double quote
state = state_doublequote
elif c.isspace():
# Add arg to arg_list if we aren't in the middle of whitespace.
if state == state_whitespace:
# Do nothing.
None
else:
arg_list.append(arg)
arg = ''
state = state_whitespace
else:
arg = arg + c
state = state_basic
elif state == state_esc:
arg = arg + c
state = state_basic
elif state == state_singlequote:
if c == r"'":
state = state_basic
else:
arg = arg + c
elif state == state_doublequote:
if c == r'"':
state = state_basic
else:
arg = arg + c
if arg != '':
arg_list.append(arg)
return arg_list
def select_ignore_interrupts(iwtd, owtd, ewtd, timeout=None):
'''This is a wrapper around select.select() that ignores signals. If
select.select raises a select.error exception and errno is an EINTR
error then it is ignored. Mainly this is used to ignore sigwinch
(terminal resize). '''
# if select() is interrupted by a signal (errno==EINTR) then
# we loop back and enter the select() again.
if timeout is not None:
end_time = time.time() + timeout
while True:
try:
return select.select(iwtd, owtd, ewtd, timeout)
except InterruptedError:
err = sys.exc_info()[1]
if err.args[0] == errno.EINTR:
# if we loop back we have to subtract the
# amount of time we already waited.
if timeout is not None:
timeout = end_time - time.time()
if timeout < 0:
return([], [], [])
else:
# something else caused the select.error, so
# this actually is an exception.
raise
def poll_ignore_interrupts(fds, timeout=None):
'''Simple wrapper around poll to register file descriptors and
ignore signals.'''
if timeout is not None:
end_time = time.time() + timeout
poller = select.poll()
for fd in fds:
poller.register(fd, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
while True:
try:
timeout_ms = None if timeout is None else timeout * 1000
results = poller.poll(timeout_ms)
return [afd for afd, _ in results]
except InterruptedError:
err = sys.exc_info()[1]
if err.args[0] == errno.EINTR:
# if we loop back we have to subtract the
# amount of time we already waited.
if timeout is not None:
timeout = end_time - time.time()
if timeout < 0:
return []
else:
# something else caused the select.error, so
# this actually is an exception.
raise