Source code for rql.parser

"""yapps input grammar for RQL.

:organization: Logilab
:copyright: 2003-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: --

Select statement grammar

query = <squery> | <union>

union = (<squery>) UNION (<squery>) [UNION (<squery>)]*

squery = Any <selection>
        [GROUPBY <variables>]
        [ORDERBY <sortterms>]
        [LIMIT <nb> OFFSET <nb>]
        [WHERE <restriction>]
        [HAVING <aggregat restriction>]
        [WITH <subquery> [,<subquery]*]

subquery = <variables> BEING (<query>)

variables = <variable> [, <variable>]*

Abbreviations in this code

* rel -> relation
* decl -> declaration
* expr -> expression
* restr -> restriction
* var -> variable
* func -> function
* const -> constant
* cmp -> comparison

* R -> syntax tree root
* S -> select node
* P -> parent node


# Begin -- grammar generated by Yapps
from __future__ import print_function
import sys, re
from yapps import runtime

[docs]class HerculeScanner(runtime.Scanner): patterns = [ ("'IN'", re.compile("IN")), ("'\\)'", re.compile("\\)")), ("'\\('", re.compile("\\(")), ("','", re.compile(",")), ('r"\\)"', re.compile("\\)")), ('r"\\("', re.compile("\\(")), ('":"', re.compile(":")), ("';'", re.compile(";")), ("\\s+", re.compile("\\s+")), ("/\\*(?:[^*]|\\*(?!/))*\\*/", re.compile("/\\*(?:[^*]|\\*(?!/))*\\*/")), ("DELETE", re.compile("(?i)DELETE")), ("SET", re.compile("(?i)SET")), ("INSERT", re.compile("(?i)INSERT")), ("UNION", re.compile("(?i)UNION")), ("DISTINCT", re.compile("(?i)DISTINCT")), ("WITH", re.compile("(?i)WITH")), ("WHERE", re.compile("(?i)WHERE")), ("BEING", re.compile("(?i)BEING")), ("OR", re.compile("(?i)OR")), ("AND", re.compile("(?i)AND")), ("NOT", re.compile("(?i)NOT")), ("GROUPBY", re.compile("(?i)GROUPBY")), ("HAVING", re.compile("(?i)HAVING")), ("ORDERBY", re.compile("(?i)ORDERBY")), ("SORT_ASC", re.compile("(?i)ASC")), ("SORT_DESC", re.compile("(?i)DESC")), ("SORT_NULLSFIRST", re.compile("(?i)NULLSFIRST")), ("SORT_NULLSLAST", re.compile("(?i)NULLSLAST")), ("LIMIT", re.compile("(?i)LIMIT")), ("OFFSET", re.compile("(?i)OFFSET")), ("DATE", re.compile("(?i)TODAY")), ("DATETIME", re.compile("(?i)NOW")), ("TRUE", re.compile("(?i)TRUE")), ("FALSE", re.compile("(?i)FALSE")), ("NULL", re.compile("(?i)NULL")), ("EXISTS", re.compile("(?i)EXISTS")), ("CMP_OP", re.compile("(?i)<=|<|>=|>|!=|=|~=|LIKE|ILIKE|REGEXP")), ("ADD_OP", re.compile("\\+|-|\\||#")), ("MUL_OP", re.compile("\\*|/|%|&")), ("POW_OP", re.compile("\\^|>>|<<")), ("UNARY_OP", re.compile("-|~")), ("FUNCTION", re.compile("[A-Za-z_]+\\s*(?=\\()")), ("R_TYPE", re.compile("[a-z_][a-z0-9_]*")), ("E_TYPE", re.compile("[A-Z][A-Za-z0-9]*[a-z]+[A-Z0-9]*")), ("VARIABLE", re.compile("[A-Z][A-Z0-9_]*")), ("QMARK", re.compile("\\?")), ("FUNCTION_CALL", re.compile("[A-Z][A-Z0-9_]*\\.[a-z_][a-zA-Z0-9_]+")), ( "STRING", re.compile("'([^\\'\\\\]|\\\\.)*'|\\\"([^\\\\\\\"\\\\]|\\\\.)*\\\""), ), ("FLOAT", re.compile("-?\\d+\\.\\d*")), ("INT", re.compile("-?\\d+")), ("SUBSTITUTE", re.compile("%\\([A-Za-z_0-9]+\\)s")), ] def __init__(self, str, *args, **kw): runtime.Scanner.__init__( self, None, { "\\s+": None, "/\\*(?:[^*]|\\*(?!/))*\\*/": None, }, str, *args, **kw )
[docs]class Hercule(runtime.Parser): Context = runtime.Context
[docs] def goal(self, _parent=None): _context = self.Context(_parent, self._scanner, "goal", []) _token = self._peek( "DELETE", "INSERT", "SET", 'r"\\("', "DISTINCT", "E_TYPE", context=_context ) if _token == "DELETE": DELETE = self._scan("DELETE", context=_context) _delete = self._delete(Delete(), _context) self._scan("';'", context=_context) return _delete elif _token == "INSERT": INSERT = self._scan("INSERT", context=_context) _insert = self._insert(Insert(), _context) self._scan("';'", context=_context) return _insert elif _token == "SET": SET = self._scan("SET", context=_context) update = self.update(Set(), _context) self._scan("';'", context=_context) return update else: # in ['r"\\("', 'DISTINCT', 'E_TYPE'] union = self.union(Union(), _context) self._scan("';'", context=_context) return union
def _delete(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "_delete", [R]) _token = self._peek("E_TYPE", "VARIABLE", context=_context) if _token == "VARIABLE": decl_rels = self.decl_rels(R, _context) where = self.where(R, _context) having = self.having(R, _context) return R else: # == 'E_TYPE' decl_vars = self.decl_vars(R, _context) where = self.where(R, _context) having = self.having(R, _context) return R def _insert(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "_insert", [R]) decl_vars = self.decl_vars(R, _context) insert_rels = self.insert_rels(R, _context) return R
[docs] def insert_rels(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "insert_rels", [R]) _token = self._peek('":"', "';'", context=_context) if _token == '":"': self._scan('":"', context=_context) decl_rels = self.decl_rels(R, _context) where = self.where(R, _context) having = self.having(R, _context) return R else: # == "';'" pass
[docs] def update(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "update", [R]) decl_rels = self.decl_rels(R, _context) where = self.where(R, _context) having = self.having(R, _context) return R
[docs] def union(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "union", [R]) _token = self._peek('r"\\("', "DISTINCT", "E_TYPE", context=_context) if _token != 'r"\\("': select =, _context) R.append(select) return R else: # == 'r"\\("' self._scan('r"\\("', context=_context) select =, _context) self._scan('r"\\)"', context=_context) R.append(select) while self._peek("UNION", "';'", 'r"\\)"', context=_context) == "UNION": UNION = self._scan("UNION", context=_context) self._scan('r"\\("', context=_context) select =, _context) self._scan('r"\\)"', context=_context) R.append(select) return R
[docs] def select(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "select", [S]) _token = self._peek("DISTINCT", "E_TYPE", context=_context) if _token == "DISTINCT": DISTINCT = self._scan("DISTINCT", context=_context) select_ = self.select_(S, _context) S.distinct = True return S else: # == 'E_TYPE' select_ = self.select_(S, _context) return S
[docs] def select_(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "select_", [S]) E_TYPE = self._scan("E_TYPE", context=_context) selection = self.selection(S, _context) groupby = self.groupby(S, _context) orderby = self.orderby(S, _context) limit_offset = self.limit_offset(S, _context) where = self.where(S, _context) having = self.having(S, _context) with_ = self.with_(S, _context) S.set_statement_type(E_TYPE) return S
[docs] def selection(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "selection", [S]) selection_function = self.selection_function(S, _context) while ( self._peek( "','", "GROUPBY", "ORDERBY", "WHERE", "LIMIT", "OFFSET", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) == "','" ): self._scan("','", context=_context) selection_function = self.selection_function(S, _context)
[docs] def selection_function(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "selection_function", [S]) _token = self._peek( "FUNCTION_CALL", "UNARY_OP", 'r"\\("', "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", "VARIABLE", "E_TYPE", "FUNCTION", context=_context, ) if _token != "FUNCTION_CALL": expr_add = self.expr_add(S, _context) S.append_selected(expr_add) else: # == 'FUNCTION_CALL' FUNCTION_CALL = self._scan("FUNCTION_CALL", context=_context) _token = self._peek( "'\\('", "','", "GROUPBY", "ORDERBY", "WHERE", "LIMIT", "OFFSET", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) if _token == "'\\('": self._scan("'\\('", context=_context) function_args = self.function_args(_context) self._scan("'\\)'", context=_context) S.append_selected_method_call(FUNCTION_CALL, function_args) else: S.append_selected_attribute_access(FUNCTION_CALL)
[docs] def function_args(self, _parent=None): _context = self.Context(_parent, self._scanner, "function_args", []) _token = self._peek( "'\\)'", "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", context=_context, ) if _token != "'\\)'": const = self.const(_context) args = [const] while self._peek("','", "'\\)'", context=_context) == "','": self._scan("','", context=_context) const = self.const(_context) args.append(const) return args else: # == "'\\)'" return []
[docs] def groupby(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "groupby", [S]) _token = self._peek( "GROUPBY", "ORDERBY", "WHERE", "LIMIT", "OFFSET", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) if _token == "GROUPBY": GROUPBY = self._scan("GROUPBY", context=_context) nodes = [] expr_add = self.expr_add(S, _context) nodes.append(expr_add) while ( self._peek( "','", "ORDERBY", "WHERE", "LIMIT", "OFFSET", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) == "','" ): self._scan("','", context=_context) expr_add = self.expr_add(S, _context) nodes.append(expr_add) S.set_groupby(nodes) return True else: pass
[docs] def having(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "having", [S]) _token = self._peek("HAVING", "WITH", "';'", 'r"\\)"', context=_context) if _token == "HAVING": HAVING = self._scan("HAVING", context=_context) logical_expr = self.logical_expr(S, _context) S.set_having([logical_expr]) else: # in ['WITH', "';'", 'r"\\)"'] pass
[docs] def orderby(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "orderby", [S]) _token = self._peek( "ORDERBY", "WHERE", "LIMIT", "OFFSET", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) if _token == "ORDERBY": ORDERBY = self._scan("ORDERBY", context=_context) nodes = [] sort_term = self.sort_term(S, _context) nodes.append(sort_term) while ( self._peek( "','", "WHERE", "LIMIT", "OFFSET", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) == "','" ): self._scan("','", context=_context) sort_term = self.sort_term(S, _context) nodes.append(sort_term) S.set_orderby(nodes) return True else: pass
[docs] def with_(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "with_", [S]) _token = self._peek("WITH", 'r"\\)"', "';'", context=_context) if _token == "WITH": WITH = self._scan("WITH", context=_context) nodes = [] subquery = self.subquery(S, _context) nodes.append(subquery) while self._peek("','", 'r"\\)"', "';'", context=_context) == "','": self._scan("','", context=_context) subquery = self.subquery(S, _context) nodes.append(subquery) S.set_with(nodes) else: # in ['r"\\)"', "';'"] pass
[docs] def subquery(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "subquery", [S]) variables = self.variables(S, _context) node = SubQuery() node.set_aliases(variables) BEING = self._scan("BEING", context=_context) self._scan('r"\\("', context=_context) union = self.union(Union(), _context) self._scan('r"\\)"', context=_context) node.set_query(union) return node
[docs] def sort_term(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "sort_term", [S]) expr_add = self.expr_add(S, _context) sort_meth = self.sort_meth(_context) sort_meth_nulls = self.sort_meth_nulls(_context) return SortTerm(expr_add, sort_meth, sort_meth_nulls)
[docs] def sort_meth(self, _parent=None): _context = self.Context(_parent, self._scanner, "sort_meth", []) _token = self._peek( "SORT_DESC", "SORT_ASC", "SORT_NULLSFIRST", "SORT_NULLSLAST", "','", "WHERE", "LIMIT", "OFFSET", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) if _token == "SORT_DESC": SORT_DESC = self._scan("SORT_DESC", context=_context) return 0 elif _token == "SORT_ASC": SORT_ASC = self._scan("SORT_ASC", context=_context) return 1 else: return 1 # default to SORT_ASC
[docs] def sort_meth_nulls(self, _parent=None): _context = self.Context(_parent, self._scanner, "sort_meth_nulls", []) _token = self._peek( "SORT_NULLSFIRST", "SORT_NULLSLAST", "','", "WHERE", "LIMIT", "OFFSET", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) if _token == "SORT_NULLSFIRST": SORT_NULLSFIRST = self._scan("SORT_NULLSFIRST", context=_context) return 1 elif _token == "SORT_NULLSLAST": SORT_NULLSLAST = self._scan("SORT_NULLSLAST", context=_context) return 2 else: return 0 # default
[docs] def limit_offset(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "limit_offset", [R]) limit = self.limit(R, _context) offset = self.offset(R, _context) return limit or offset
[docs] def limit(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "limit", [R]) _token = self._peek( "LIMIT", "OFFSET", "WHERE", "HAVING", "WITH", "';'", 'r"\\)"', context=_context, ) if _token == "LIMIT": LIMIT = self._scan("LIMIT", context=_context) INT = self._scan("INT", context=_context) R.set_limit(int(INT)) return True else: # in ['OFFSET', 'WHERE', 'HAVING', 'WITH', "';'", 'r"\\)"'] pass
[docs] def offset(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "offset", [R]) _token = self._peek( "OFFSET", "WHERE", "HAVING", "WITH", "';'", 'r"\\)"', context=_context ) if _token == "OFFSET": OFFSET = self._scan("OFFSET", context=_context) INT = self._scan("INT", context=_context) R.set_offset(int(INT)) return True else: # in ['WHERE', 'HAVING', 'WITH', "';'", 'r"\\)"'] pass
[docs] def where(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "where", [S]) _token = self._peek( "WHERE", "HAVING", "WITH", "';'", 'r"\\)"', context=_context ) if _token == "WHERE": WHERE = self._scan("WHERE", context=_context) restriction = self.restriction(S, _context) S.set_where(restriction) else: # in ['HAVING', 'WITH', "';'", 'r"\\)"'] pass
[docs] def restriction(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "restriction", [S]) rels_or = self.rels_or(S, _context) node = rels_or while ( self._peek("','", 'r"\\)"', "HAVING", "WITH", "';'", context=_context) == "','" ): self._scan("','", context=_context) rels_or = self.rels_or(S, _context) node = And(node, rels_or) return node
[docs] def rels_or(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "rels_or", [S]) rels_and = self.rels_and(S, _context) node = rels_and while ( self._peek("OR", "','", 'r"\\)"', "HAVING", "WITH", "';'", context=_context) == "OR" ): OR = self._scan("OR", context=_context) rels_and = self.rels_and(S, _context) node = Or(node, rels_and) return node
[docs] def rels_and(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "rels_and", [S]) rels_not = self.rels_not(S, _context) node = rels_not while ( self._peek( "AND", "OR", "','", 'r"\\)"', "HAVING", "WITH", "';'", context=_context ) == "AND" ): AND = self._scan("AND", context=_context) rels_not = self.rels_not(S, _context) node = And(node, rels_not) return node
[docs] def rels_not(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "rels_not", [S]) _token = self._peek("NOT", 'r"\\("', "EXISTS", "VARIABLE", context=_context) if _token == "NOT": NOT = self._scan("NOT", context=_context) rel = self.rel(S, _context) return Not(rel) else: # in ['r"\\("', 'EXISTS', 'VARIABLE'] rel = self.rel(S, _context) return rel
[docs] def rel(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "rel", [S]) _token = self._peek('r"\\("', "EXISTS", "VARIABLE", context=_context) if _token != 'r"\\("': rel_base = self.rel_base(S, _context) return rel_base else: # == 'r"\\("' self._scan('r"\\("', context=_context) restriction = self.restriction(S, _context) self._scan('r"\\)"', context=_context) return restriction
[docs] def rel_base(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "rel_base", [S]) _token = self._peek("EXISTS", "VARIABLE", context=_context) if _token == "VARIABLE": var = self.var(S, _context) opt_left = self.opt_left(S, _context) rtype = self.rtype(_context) rtype.append(var) rtype.set_optional(opt_left) expr = self.expr(S, _context) opt_right = self.opt_right(S, _context) rtype.append(expr) rtype.set_optional(opt_right) return rtype else: # == 'EXISTS' EXISTS = self._scan("EXISTS", context=_context) self._scan('r"\\("', context=_context) restriction = self.restriction(S, _context) self._scan('r"\\)"', context=_context) return Exists(restriction)
[docs] def rtype(self, _parent=None): _context = self.Context(_parent, self._scanner, "rtype", []) R_TYPE = self._scan("R_TYPE", context=_context) return Relation(R_TYPE)
[docs] def opt_left(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "opt_left", [S]) _token = self._peek("QMARK", "R_TYPE", "CMP_OP", "'IN'", context=_context) if _token == "QMARK": QMARK = self._scan("QMARK", context=_context) return "left" else: # in ['R_TYPE', 'CMP_OP', "'IN'"] pass
[docs] def opt_right(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "opt_right", [S]) _token = self._peek( "QMARK", "AND", "OR", "','", 'r"\\)"', "WITH", "';'", "HAVING", context=_context, ) if _token == "QMARK": QMARK = self._scan("QMARK", context=_context) return "right" else: # in ['AND', 'OR', "','", 'r"\\)"', 'WITH', "';'", 'HAVING'] pass
[docs] def logical_expr(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "logical_expr", [S]) exprs_or = self.exprs_or(S, _context) node = exprs_or while self._peek("','", 'r"\\)"', "WITH", "';'", context=_context) == "','": self._scan("','", context=_context) exprs_or = self.exprs_or(S, _context) node = And(node, exprs_or) return node
[docs] def exprs_or(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "exprs_or", [S]) exprs_and = self.exprs_and(S, _context) node = exprs_and while ( self._peek("OR", "','", 'r"\\)"', "WITH", "';'", context=_context) == "OR" ): OR = self._scan("OR", context=_context) exprs_and = self.exprs_and(S, _context) node = Or(node, exprs_and) return node
[docs] def exprs_and(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "exprs_and", [S]) exprs_not = self.exprs_not(S, _context) node = exprs_not while ( self._peek("AND", "OR", "','", 'r"\\)"', "WITH", "';'", context=_context) == "AND" ): AND = self._scan("AND", context=_context) exprs_not = self.exprs_not(S, _context) node = And(node, exprs_not) return node
[docs] def exprs_not(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "exprs_not", [S]) _token = self._peek( "NOT", 'r"\\("', "UNARY_OP", "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", "VARIABLE", "E_TYPE", "FUNCTION", context=_context, ) if _token == "NOT": NOT = self._scan("NOT", context=_context) balanced_expr = self.balanced_expr(S, _context) return Not(balanced_expr) else: balanced_expr = self.balanced_expr(S, _context) return balanced_expr
[docs] def balanced_expr(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "balanced_expr", [S]) _token = self._peek( 'r"\\("', "UNARY_OP", "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", "VARIABLE", "E_TYPE", "FUNCTION", context=_context, ) if _token == 'r"\\("': self._scan('r"\\("', context=_context) logical_expr = self.logical_expr(S, _context) self._scan('r"\\)"', context=_context) return logical_expr elif 1: expr_add = self.expr_add(S, _context) opt_left = self.opt_left(S, _context) expr_op = self.expr_op(S, _context) opt_right = self.opt_right(S, _context) expr_op.insert(0, expr_add) expr_op.set_optional(opt_left, opt_right) return expr_op else: raise runtime.SyntaxError(_token[0], "Could not match balanced_expr")
[docs] def expr_op(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "expr_op", [S]) _token = self._peek("CMP_OP", "'IN'", context=_context) if _token == "CMP_OP": CMP_OP = self._scan("CMP_OP", context=_context) expr_add = self.expr_add(S, _context) return Comparison(CMP_OP.upper(), expr_add) else: # == "'IN'" in_expr = self.in_expr(S, _context) return Comparison("=", in_expr)
[docs] def variables(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "variables", [S]) vars = [] var = self.var(S, _context) vars.append(var) while self._peek("','", "BEING", context=_context) == "','": self._scan("','", context=_context) var = self.var(S, _context) vars.append(var) return vars
[docs] def decl_vars(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "decl_vars", [R]) E_TYPE = self._scan("E_TYPE", context=_context) var = self.var(R, _context) while ( self._peek( "','", "R_TYPE", "QMARK", "WHERE", '":"', "CMP_OP", "HAVING", "'IN'", "';'", "POW_OP", "BEING", "WITH", "MUL_OP", 'r"\\)"', "ADD_OP", "SORT_DESC", "SORT_ASC", "SORT_NULLSFIRST", "SORT_NULLSLAST", "GROUPBY", "ORDERBY", "LIMIT", "OFFSET", "AND", "OR", context=_context, ) == "','" ): R.add_main_variable(E_TYPE, var) self._scan("','", context=_context) E_TYPE = self._scan("E_TYPE", context=_context) var = self.var(R, _context) R.add_main_variable(E_TYPE, var)
[docs] def decl_rels(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "decl_rels", [R]) simple_rel = self.simple_rel(R, _context) while ( self._peek( "','", "WHERE", "HAVING", "WITH", "';'", 'r"\\)"', context=_context ) == "','" ): R.add_main_relation(simple_rel) self._scan("','", context=_context) simple_rel = self.simple_rel(R, _context) R.add_main_relation(simple_rel)
[docs] def simple_rel(self, R, _parent=None): _context = self.Context(_parent, self._scanner, "simple_rel", [R]) var = self.var(R, _context) R_TYPE = self._scan("R_TYPE", context=_context) e = Relation(R_TYPE) e.append(var) expr_add = self.expr_add(R, _context) e.append(Comparison("=", expr_add)) return e
[docs] def expr(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "expr", [S]) _token = self._peek( "CMP_OP", "UNARY_OP", 'r"\\("', "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", "VARIABLE", "E_TYPE", "FUNCTION", context=_context, ) if _token == "CMP_OP": CMP_OP = self._scan("CMP_OP", context=_context) expr_add = self.expr_add(S, _context) return Comparison(CMP_OP.upper(), expr_add) else: expr_add = self.expr_add(S, _context) return Comparison("=", expr_add)
[docs] def expr_add(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "expr_add", [S]) _token = self._peek( "UNARY_OP", 'r"\\("', "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", "VARIABLE", "E_TYPE", "FUNCTION", context=_context, ) if _token != "UNARY_OP": expr_mul = self.expr_mul(S, _context) node = expr_mul while ( self._peek( "ADD_OP", "QMARK", 'r"\\)"', "','", "SORT_DESC", "SORT_ASC", "SORT_NULLSFIRST", "SORT_NULLSLAST", "CMP_OP", "R_TYPE", "'IN'", "GROUPBY", "ORDERBY", "WHERE", "HAVING", "LIMIT", "OFFSET", "WITH", "';'", "AND", "OR", context=_context, ) == "ADD_OP" ): ADD_OP = self._scan("ADD_OP", context=_context) expr_mul = self.expr_mul(S, _context) node = MathExpression(ADD_OP, node, expr_mul) return node else: # == 'UNARY_OP' UNARY_OP = self._scan("UNARY_OP", context=_context) expr_mul = self.expr_mul(S, _context) node = UnaryExpression(UNARY_OP, expr_mul) while ( self._peek( "ADD_OP", "QMARK", 'r"\\)"', "','", "SORT_DESC", "SORT_ASC", "SORT_NULLSFIRST", "SORT_NULLSLAST", "CMP_OP", "R_TYPE", "'IN'", "GROUPBY", "ORDERBY", "WHERE", "HAVING", "LIMIT", "OFFSET", "WITH", "';'", "AND", "OR", context=_context, ) == "ADD_OP" ): ADD_OP = self._scan("ADD_OP", context=_context) expr_mul = self.expr_mul(S, _context) node = MathExpression(ADD_OP, node, expr_mul) return node
[docs] def expr_mul(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "expr_mul", [S]) expr_pow = self.expr_pow(S, _context) node = expr_pow while ( self._peek( "MUL_OP", "ADD_OP", "QMARK", 'r"\\)"', "','", "SORT_DESC", "SORT_ASC", "SORT_NULLSFIRST", "SORT_NULLSLAST", "CMP_OP", "R_TYPE", "'IN'", "GROUPBY", "ORDERBY", "WHERE", "HAVING", "LIMIT", "OFFSET", "WITH", "';'", "AND", "OR", context=_context, ) == "MUL_OP" ): MUL_OP = self._scan("MUL_OP", context=_context) expr_pow = self.expr_pow(S, _context) node = MathExpression(MUL_OP, node, expr_pow) return node
[docs] def expr_pow(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "expr_pow", [S]) expr_base = self.expr_base(S, _context) node = expr_base while ( self._peek( "POW_OP", "MUL_OP", "ADD_OP", "QMARK", 'r"\\)"', "','", "SORT_DESC", "SORT_ASC", "SORT_NULLSFIRST", "SORT_NULLSLAST", "CMP_OP", "R_TYPE", "'IN'", "GROUPBY", "ORDERBY", "WHERE", "HAVING", "LIMIT", "OFFSET", "WITH", "';'", "AND", "OR", context=_context, ) == "POW_OP" ): POW_OP = self._scan("POW_OP", context=_context) expr_base = self.expr_base(S, _context) node = MathExpression(MUL_OP, node, expr_base) return node
[docs] def expr_base(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "expr_base", [S]) _token = self._peek( 'r"\\("', "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", "VARIABLE", "E_TYPE", "FUNCTION", context=_context, ) if _token not in ['r"\\("', "VARIABLE", "E_TYPE", "FUNCTION"]: const = self.const(_context) return const elif _token == "VARIABLE": var = self.var(S, _context) return var elif _token == "E_TYPE": etype = self.etype(S, _context) return etype elif _token == "FUNCTION": func = self.func(S, _context) return func else: # == 'r"\\("' self._scan('r"\\("', context=_context) expr_add = self.expr_add(S, _context) self._scan('r"\\)"', context=_context) return expr_add
[docs] def func(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "func", [S]) FUNCTION = self._scan("FUNCTION", context=_context) self._scan('r"\\("', context=_context) F = Function(FUNCTION) if ( self._peek( "UNARY_OP", 'r"\\)"', 'r"\\("', "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", "VARIABLE", "E_TYPE", "FUNCTION", context=_context, ) != 'r"\\)"' ): expr_add = self.expr_add(S, _context) while ( self._peek( "','", "QMARK", 'r"\\)"', "SORT_DESC", "SORT_ASC", "SORT_NULLSFIRST", "SORT_NULLSLAST", "CMP_OP", "R_TYPE", "'IN'", "GROUPBY", "ORDERBY", "WHERE", "HAVING", "LIMIT", "OFFSET", "WITH", "';'", "AND", "OR", context=_context, ) == "','" ): F.append(expr_add) self._scan("','", context=_context) expr_add = self.expr_add(S, _context) F.append(expr_add) self._scan('r"\\)"', context=_context) return F
[docs] def in_expr(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "in_expr", [S]) self._scan("'IN'", context=_context) self._scan('r"\\("', context=_context) F = Function("IN") if ( self._peek( "UNARY_OP", 'r"\\)"', 'r"\\("', "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", "VARIABLE", "E_TYPE", "FUNCTION", context=_context, ) != 'r"\\)"' ): expr_add = self.expr_add(S, _context) while ( self._peek( "','", "QMARK", 'r"\\)"', "SORT_DESC", "SORT_ASC", "SORT_NULLSFIRST", "SORT_NULLSLAST", "CMP_OP", "R_TYPE", "'IN'", "GROUPBY", "ORDERBY", "WHERE", "HAVING", "LIMIT", "OFFSET", "WITH", "';'", "AND", "OR", context=_context, ) == "','" ): F.append(expr_add) self._scan("','", context=_context) expr_add = self.expr_add(S, _context) F.append(expr_add) self._scan('r"\\)"', context=_context) return F
[docs] def var(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "var", [S]) VARIABLE = self._scan("VARIABLE", context=_context) return VariableRef(S.get_variable(VARIABLE))
[docs] def etype(self, S, _parent=None): _context = self.Context(_parent, self._scanner, "etype", [S]) E_TYPE = self._scan("E_TYPE", context=_context) return S.get_etype(E_TYPE)
[docs] def const(self, _parent=None): _context = self.Context(_parent, self._scanner, "const", []) _token = self._peek( "NULL", "DATE", "DATETIME", "TRUE", "FALSE", "FLOAT", "INT", "STRING", "SUBSTITUTE", context=_context, ) if _token == "NULL": NULL = self._scan("NULL", context=_context) return Constant(None, None) elif _token == "DATE": DATE = self._scan("DATE", context=_context) return Constant(DATE.upper(), "Date") elif _token == "DATETIME": DATETIME = self._scan("DATETIME", context=_context) return Constant(DATETIME.upper(), "Datetime") elif _token == "TRUE": TRUE = self._scan("TRUE", context=_context) return Constant(True, "Boolean") elif _token == "FALSE": FALSE = self._scan("FALSE", context=_context) return Constant(False, "Boolean") elif _token == "FLOAT": FLOAT = self._scan("FLOAT", context=_context) return Constant(float(FLOAT), "Float") elif _token == "INT": INT = self._scan("INT", context=_context) return Constant(int(INT), "Int") elif _token == "STRING": STRING = self._scan("STRING", context=_context) return Constant(unquote(STRING), "String") else: # == 'SUBSTITUTE' SUBSTITUTE = self._scan("SUBSTITUTE", context=_context) return Constant(SUBSTITUTE[2:-2], "Substitute")
[docs]def parse(rule, text): P = Hercule(HerculeScanner(text)) return runtime.wrap_error_reporter(P, rule)
# End -- grammar generated by Yapps from warnings import warn from rql.stmts import Union, Select, Delete, Insert, Set from rql.nodes import ( SubQuery, And, Or, Not, Exists, Relation, Comparison, MathExpression, UnaryExpression, Function, Constant, VariableRef, SortTerm, )
[docs]def unquote(string): """Remove quotes from a string.""" if string.startswith('"'): return string[1:-1].replace("\\\\", "\\").replace('\\"', '"') elif string.startswith("'"): return string[1:-1].replace("\\\\", "\\").replace("\\'", "'")