from __future__ import absolute_import import itertools import lit.util from lit.ShCommands import Command, GlobItem, Pipeline, Seq class ShLexer: def __init__(self, data, win32Escapes=False): self.data = data self.pos = 0 self.end = len(data) self.win32Escapes = win32Escapes def eat(self): c = self.data[self.pos] self.pos += 1 return c def look(self): return self.data[self.pos] def maybe_eat(self, c): """ maybe_eat(c) - Consume the character c if it is the next character, returning True if a character was consumed.""" if self.data[self.pos] == c: self.pos += 1 return True return False def lex_arg_fast(self, c): # Get the leading whitespace free section. chunk = self.data[self.pos - 1 :].split(None, 1)[0] # If it has special characters, the fast path failed. if ( "|" in chunk or "&" in chunk or "<" in chunk or ">" in chunk or "'" in chunk or '"' in chunk or ";" in chunk or "\\" in chunk ): return None self.pos = self.pos - 1 + len(chunk) return GlobItem(chunk) if "*" in chunk or "?" in chunk else chunk def lex_arg_slow(self, c): if c in "'\"": str = self.lex_arg_quoted(c) else: str = c unquoted_glob_char = False quoted_glob_char = False while self.pos != self.end: c = self.look() if c.isspace() or c in "|&;": break elif c in "><": # This is an annoying case; we treat '2>' as a single token so # we don't have to track whitespace tokens. # If the parse string isn't an integer, do the usual thing. if not str.isdigit(): break # Otherwise, lex the operator and convert to a redirection # token. num = int(str) tok = self.lex_one_token() assert isinstance(tok, tuple) and len(tok) == 1 return (tok[0], num) elif c == '"' or c == "'": self.eat() quoted_arg = self.lex_arg_quoted(c) if "*" in quoted_arg or "?" in quoted_arg: quoted_glob_char = True str += quoted_arg elif not self.win32Escapes and c == "\\": # Outside of a string, '\\' escapes everything. self.eat() if self.pos == self.end: lit.util.warning( "escape at end of quoted argument in: %r" % self.data ) return str str += self.eat() elif c in "*?": unquoted_glob_char = True str += self.eat() else: str += self.eat() # If a quote character is present, lex_arg_quoted will remove the quotes # and append the argument directly. This causes a problem when the # quoted portion contains a glob character, as the character will no # longer be treated literally. If glob characters occur *only* inside # of quotes, then we can handle this by not globbing at all, and if # glob characters occur *only* outside of quotes, we can still glob just # fine. But if a glob character occurs both inside and outside of # quotes this presents a problem. In practice this is such an obscure # edge case that it doesn't seem worth the added complexity to support. # By adding an assertion, it means some bot somewhere will catch this # and flag the user of a non-portable test (which could almost certainly # be re-written to work correctly without triggering this). assert not (quoted_glob_char and unquoted_glob_char) return GlobItem(str) if unquoted_glob_char else str def lex_arg_quoted(self, delim): str = "" while self.pos != self.end: c = self.eat() if c == delim: return str elif c == "\\" and delim == '"': # Inside a '"' quoted string, '\\' only escapes the quote # character and backslash, otherwise it is preserved. if self.pos == self.end: lit.util.warning( "escape at end of quoted argument in: %r" % self.data ) return str c = self.eat() if c == '"': # str += '"' elif c == "\\": str += "\\" else: str += "\\" + c else: str += c lit.util.warning("missing quote character in %r" % self.data) return str def lex_arg_checked(self, c): pos = self.pos res = self.lex_arg_fast(c) end = self.pos self.pos = pos reference = self.lex_arg_slow(c) if res is not None: if res != reference: raise ValueError("Fast path failure: %r != %r" % (res, reference)) if self.pos != end: raise ValueError("Fast path failure: %r != %r" % (self.pos, end)) return reference def lex_arg(self, c): return self.lex_arg_fast(c) or self.lex_arg_slow(c) def lex_one_token(self): """ lex_one_token - Lex a single 'sh' token.""" c = self.eat() if c == ";": return (c,) if c == "|": if self.maybe_eat("|"): return ("||",) return (c,) if c == "&": if self.maybe_eat("&"): return ("&&",) if self.maybe_eat(">"): return ("&>",) return (c,) if c == ">": if self.maybe_eat("&"): return (">&",) if self.maybe_eat(">"): return (">>",) return (c,) if c == "<": if self.maybe_eat("&"): return ("<&",) if self.maybe_eat(">"): return ("<<",) return (c,) return self.lex_arg(c) def lex(self): while self.pos != self.end: if self.look().isspace(): self.eat() else: yield self.lex_one_token() ### class ShParser: def __init__(self, data, win32Escapes=False, pipefail=False): self.data = data self.pipefail = pipefail self.tokens = ShLexer(data, win32Escapes=win32Escapes).lex() def lex(self): for item in self.tokens: return item return None def look(self): token = self.lex() if token is not None: self.tokens = itertools.chain([token], self.tokens) return token def parse_command(self): tok = self.lex() if not tok: raise ValueError("empty command!") if isinstance(tok, tuple): raise ValueError("syntax error near unexpected token %r" % tok[0]) args = [tok] redirects = [] while 1: tok = self.look() # EOF? if tok is None: break # If this is an argument, just add it to the current command. if isinstance(tok, (str, GlobItem)): args.append(self.lex()) continue # Otherwise see if it is a terminator. assert isinstance(tok, tuple) if tok[0] in ("|", ";", "&", "||", "&&"): break # Otherwise it must be a redirection. op = self.lex() arg = self.lex() if not arg: raise ValueError("syntax error near token %r" % op[0]) redirects.append((op, arg)) return Command(args, redirects) def parse_pipeline(self): negate = False commands = [self.parse_command()] while self.look() == ("|",): self.lex() commands.append(self.parse_command()) return Pipeline(commands, negate, self.pipefail) def parse(self): lhs = self.parse_pipeline() while self.look(): operator = self.lex() assert isinstance(operator, tuple) and len(operator) == 1 if not self.look(): raise ValueError("missing argument to operator %r" % operator[0]) # FIXME: Operator precedence!! lhs = Seq(lhs, operator[0], self.parse_pipeline()) return lhs