Source code for rql.stcheck

# copyright 2004-2022 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact --
# This file is part of rql.
# rql is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
# rql is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
# You should have received a copy of the GNU Lesser General Public License along
# with rql. If not, see <>.
"""RQL Syntax tree annotator"""

from logilab.common.graph import has_path
from logilab.database import UnknownFunction

from rql._exceptions import BadRQLQuery
from rql.utils import function_description
from rql.nodes import (
from rql.base import BaseNode

from typing import (
    Union as Union_,

__docformat__: str = "restructuredtext en"

    # Cannot find implementation or library stub for module named "unittest_analyze"
    from unittest_analyze import DummySchema  # type:ignore[import]
    import rql

def _var_graphid(
    subvarname: str, trmap: Dict[str, str], select: "rql.stmts.Select"
) -> str:
        return trmap[subvarname]
    except KeyError:
        return subvarname + str(id(select))

[docs]def bloc_simplification( variable: Union_["rql.nodes.ColumnAlias", "rql.nodes.Variable"], term: "rql.nodes.Relation", ) -> None: try: variable.stinfo["blocsimplification"].add(term) except KeyError: variable.stinfo["blocsimplification"] = set((term,))
[docs]class GoTo(Exception): """Exception used to control the visit of the tree.""" def __init__(self, node: "rql.nodes.Relation") -> None: self.node = node
VAR_SELECTED: int = 1 VAR_HAS_TYPE_REL: int = 2 VAR_HAS_UID_REL: int = 4 VAR_HAS_REL: int = 8
[docs]class STCheckState: def __init__(self): self.errors: List[str] = [] self.under_not: List[bool] = [] self.var_info: Dict[ Union_["rql.nodes.ColumnAlias", "rql.nodes.Variable"], int ] = {}
[docs] def error(self, msg: str) -> None: self.errors.append(msg)
[docs] def add_var_info( self, var: Union_["rql.nodes.ColumnAlias", "rql.nodes.Variable"], vi: int ) -> None: try: self.var_info[var] |= vi except KeyError: self.var_info[var] = vi
[docs]class RQLSTChecker: """Check a RQL syntax tree for errors not detected on parsing. Some simple rewriting of the tree may be done too: * if a OR is used on a symmetric relation * IN function with a single child use assertions for internal error but specific `BadRQLQuery` exception for errors due to a bad rql input """ def __init__( self, schema: "DummySchema", # in practice this is a yams Schema special_relations: Optional[Dict[str, str]] = None, backend: Optional[Any] = None, ) -> None: self.schema = schema self.special_relations = special_relations or {} self.backend = backend
[docs] def check(self, node: BaseNode) -> None: state = STCheckState() self._visit(node, state) if state.errors: raise BadRQLQuery("%s\n** %s" % (node, "\n** ".join(state.errors)))
# if node.TYPE == 'select' and \ # not node.defined_vars and not node.get_restriction(): # result = [] # for term in node.selected_terms(): # result.append(term.eval(kwargs)) def _visit(self, node: BaseNode, state: Optional["STCheckState"]) -> None: try: node.accept(self, state) except GoTo as ex: self._visit(ex.node, state) else: for c in node.children: self._visit(c, state) node.leave(self, state) def _visit_selectedterm( self, node: Union_[ "rql.stmts.Set", "rql.stmts.Insert", "rql.stmts.Delete", "rql.stmts.Select" ], state: "STCheckState", ) -> None: for i, term in enumerate(node.selection): # selected terms are not included by the default visit, # accept manually each of them self._visit(term, state) def _check_selected( self, term: "rql.nodes.VariableRef", termtype: str, state: "STCheckState", ) -> None: """check that variables referenced in the given term are selected""" for variable_ref in variable_refs(term): # no stinfo yet, use references # Item "BaseNode" of "Union[BaseNode, VariableRef]" has no attribute "variable" variable_ref = cast("rql.nodes.VariableRef", variable_ref) for other_variable_ref in variable_ref.variable.references(): relation: Optional["rql.nodes.Relation"] = other_variable_ref.relation() if relation is not None: break else: message: str = ( "variable %s used in %s is not referenced by any relation" ) state.error(message % (, termtype)) # statement nodes #########################################################
[docs] def visit_union(self, node: "rql.stmts.Union", state: "STCheckState") -> None: # "BaseNode" has no attribute "selection" [attr-defined] nbselected: int = len(cast("rql.stmts.Select", node.children[0]).selection) for select in node.children[1:]: select = cast("rql.stmts.Select", select) if not len(select.selection) == nbselected: state.error( "when using union, all subqueries should have " "the same number of selected terms" )
[docs] def leave_union(self, node, state): pass
[docs] def visit_select(self, node: "rql.stmts.Select", state: "STCheckState") -> None: node.vargraph = {} # graph representing links between variable # "Select" has no attribute "aggregated" node.aggregated = set() # type:ignore[attr-defined] self._visit_selectedterm(node, state)
[docs] def leave_select(self, node: "rql.stmts.Select", state: "STCheckState") -> None: selected = node.selection # check selected variable are used in restriction if node.where is not None or len(selected) > 1: for term in selected: self._check_selected(term, "selection", state) for vref in term.iget_nodes(VariableRef): state.add_var_info(vref.variable, VAR_SELECTED) for var in node.defined_vars.values(): vinfo: int = state.var_info.get(var, 0) if ( not (vinfo & VAR_HAS_REL) and (vinfo & VAR_HAS_TYPE_REL) and not (vinfo & VAR_SELECTED) ): raise BadRQLQuery(f"unbound variable {} ({selected})") if node.groupby: # check that selected variables are used in groups for var in node.selection: if isinstance(var, VariableRef) and not any( var.is_equivalent(g) for g in node.groupby ): state.error(f"variable {var} should be grouped") for group in node.groupby: self._check_selected(group, "group", state) if node.distinct and node.orderby: # check that variables referenced in the given term are reachable from # a selected variable with only ?1 cardinality selected selectidx = frozenset( for term in selected for vref in term.get_nodes(VariableRef) ) for sortterm in node.orderby: for vref in sortterm.term.get_nodes(VariableRef): if in selectidx: continue for vname in selectidx: try: if self.has_unique_value_path(node, vname, break except KeyError: continue # unlinked variable (usually from a subquery) else: msg = ( "can't sort on variable %s which is linked to a" " variable in the selection but may have different" " values for a resulting row" ) state.error(msg %
[docs] def has_unique_value_path( self, select: "rql.stmts.Select", fromvar: str, tovar: str ) -> bool: graph = select.vargraph path: Optional[List[str]] = has_path( graph, fromvar, tovar # type:ignore[arg-type] ) if path is None: return False for var in path: try: rtype = graph[(fromvar, var)] cardidx: int = 0 except KeyError: rtype = graph[(var, fromvar)] cardidx = 1 relation_schema_for: Any = self.schema.relation_schema_for(rtype) for ( relation_definition ) in relation_schema_for.relation_definitions.values(): # XXX aggregats handling needs much probably some enhancements... # "Select" has no attribute "aggregated" if not ( var in select.aggregated # type:ignore[attr-defined] or ( relation_definition.cardinality[cardidx] in "?1" and (var == tovar or not ) ): return False fromvar = var return True
[docs] def visit_insert( self, insert: Union_[ "rql.stmts.Set", "rql.stmts.Insert", "rql.stmts.Delete", "rql.stmts.Select" ], state: "STCheckState", ) -> None: self._visit_selectedterm(insert, state)
[docs] def leave_insert(self, node, state): pass
[docs] def visit_delete(self, delete, state): self._visit_selectedterm(delete, state)
[docs] def leave_delete(self, node, state): pass
[docs] def visit_set( self, update: Union_[ "rql.stmts.Set", "rql.stmts.Insert", "rql.stmts.Delete", "rql.stmts.Select" ], state: "STCheckState", ) -> None: self._visit_selectedterm(update, state)
[docs] def leave_set(self, node, state): pass
# tree nodes ##############################################################
[docs] def visit_exists(self, node, state): pass
[docs] def leave_exists(self, node, state): pass
[docs] def visit_subquery(self, node, state): pass
[docs] def leave_subquery(self, node, state): # copy graph information we're interested in pgraph = node.parent.vargraph for select in node.query.children: # map subquery variable names to outer query variable names trmap = {} for i, vref in enumerate(node.aliases): try: subvref = select.selection[i] except IndexError: state.error( 'subquery "%s" has only %s selected terms, needs %s' % (select, len(select.selection), len(node.aliases)) ) continue if isinstance(subvref, VariableRef): trmap[] = elif ( isinstance(subvref, Function) and subvref.descr().aggregat and len(subvref.children) == 1 and isinstance(subvref.children[0], VariableRef) ): # XXX ok for MIN, MAX, but what about COUNT, AVG... trmap[subvref.children[0].name] = node.parent.aggregated.add( for key, val in select.vargraph.items(): if isinstance(key, tuple): key = ( _var_graphid(key[0], trmap, select), _var_graphid(key[1], trmap, select), ) pgraph[key] = val else: values = pgraph.setdefault(_var_graphid(key, trmap, select), []) values += [_var_graphid(v, trmap, select) for v in val]
[docs] def visit_sortterm(self, sortterm, state): term = sortterm.term if isinstance(term, Constant): for select in sortterm.root.children: if len(select.selection) < term.value: state.error(f"order column out of bound {term.value}") else: stmt = term.stmt for tvref in variable_refs(term): for vref in tvref.variable.references(): if vref.relation() or any( vref.is_equivalent(s) for s in stmt.selection ): break else: msg = "sort variable %s is not referenced any where else" state.error(msg %
[docs] def leave_sortterm(self, node, state): pass
[docs] def visit_and(self, et, state): pass # assert len(et.children) == 2, len(et.children)
[docs] def leave_and(self, node, state): pass
[docs] def visit_or(self, ou, state): # assert len(ou.children) == 2, len(ou.children) # simplify Ored expression of a symmetric relation r1, r2 = ou.children[0], ou.children[1] try: r1type = r1.r_type r2type = r2.r_type except AttributeError: return # can't be if r1type == r2type and self.schema.relation_schema_for(r1type).symmetric: lhs1, rhs1 = r1.get_variable_parts() lhs2, rhs2 = r2.get_variable_parts() try: if lhs1.variable is rhs2.variable and rhs1.variable is lhs2.variable: ou.parent.replace(ou, r1) for vref in r2.get_nodes(VariableRef): vref.unregister_reference() raise GoTo(r1) except AttributeError: pass
[docs] def leave_or(self, node, state): pass
[docs] def visit_not(self, not_, state): state.under_not.append(True)
[docs] def leave_not(self, not_, state): state.under_not.pop() # NOT normalization child = not_.children[0] if self._should_wrap_by_exists(child): not_.replace(child, Exists(child))
def _should_wrap_by_exists(self, child): if isinstance(child, Exists): return False if not isinstance(child, Relation): return True if child.r_type == "identity": return False relation_schema_for = self.schema.relation_schema_for(child.r_type) if return False # XXX no exists for `inlined` relation (allow IS NULL optimization) # unless the lhs variable is only referenced from this neged relation, # in which case it's *not* in the statement's scope, hence EXISTS should # be added anyway if relation_schema_for.inlined: references = child.children[0].variable.references() valuable = 0 for vref in references: rel = vref.relation() if rel is None or not rel.is_types_restriction(): if valuable: return False valuable = 1 return True return not child.is_types_restriction()
[docs] def visit_relation(self, relation, state): if relation.optional and state.under_not: state.error( f"can't use optional relation under NOT ({relation.as_string()})" ) lhsvar = relation.children[0].variable if relation.is_types_restriction(): if relation.optional: state.error(f'can\'t use optional relation on "{relation.as_string()}"') if state.var_info.get(lhsvar, 0) & VAR_HAS_TYPE_REL: state.error( "can only one type restriction per variable (use " "IN for %s if desired)" % ) else: state.add_var_info(lhsvar, VAR_HAS_TYPE_REL) # special case "C is NULL" # if relation.children[1].operator == 'IS': # lhs, rhs = relation.children # #assert isinstance(lhs, VariableRef), lhs # #assert isinstance(rhs.children[0], Constant) # #assert rhs.operator == 'IS', rhs.operator # #assert rhs.children[0].type == None else: state.add_var_info(lhsvar, VAR_HAS_REL) rtype = relation.r_type try: relation_schema_for = self.schema.relation_schema_for(rtype) except KeyError: state.error(f"unknown relation `{rtype}`") else: if and relation.optional not in ( None, "right", ): state.error( "optional may only be set on the rhs on final relation `%s`" % relation.r_type ) if ( self.special_relations.get(rtype) == "uid" and relation.operator() == "=" ): if state.var_info.get(lhsvar, 0) & VAR_HAS_UID_REL: state.error( "can only one uid restriction per variable " "(use IN for %s if desired)" % ) else: state.add_var_info(lhsvar, VAR_HAS_UID_REL) for vref in relation.children[1].get_nodes(VariableRef): state.add_var_info(vref.variable, VAR_HAS_REL) try: vargraph = relation.stmt.vargraph rhsvarname = relation.children[1].children[0] except AttributeError: pass else: vargraph.setdefault(, []).append(rhsvarname) vargraph.setdefault(rhsvarname, []).append( vargraph[(, rhsvarname)] = relation.r_type
[docs] def leave_relation(self, relation, state): pass
# assert isinstance(lhs, VariableRef), '%s: %s' % (lhs.__class__, # relation)
[docs] def visit_comparison(self, comparison, state): pass # assert len(comparison.children) in (1,2), len(comparison.children)
[docs] def leave_comparison(self, node, state): pass
[docs] def visit_mathexpression(self, mathexpr, state): pass # assert len(mathexpr.children) == 2, len(mathexpr.children)
[docs] def leave_mathexpression(self, node, state): pass
[docs] def visit_unaryexpression(self, unaryexpr, state): pass # assert len(unaryexpr.children) == 2, len(unaryexpr.children)
[docs] def leave_unaryexpression(self, node, state): pass
[docs] def visit_function(self, function, state): try: funcdescr = function_description( except UnknownFunction: state.error(f'unknown function "{}"') else: try: funcdescr.check_nbargs(len(function.children)) except BadRQLQuery as ex: state.error(str(ex)) if self.backend is not None: try: funcdescr.st_check_backend(self.backend, function) except BadRQLQuery as ex: state.error(str(ex)) if funcdescr.aggregat: if ( isinstance(function.children[0], Function) and function.children[0].descr().aggregat ): state.error("can't nest aggregat functions") if == "IN": # assert function.parent.operator == '=' if len(function.children) == 1: function.parent.append(function.children[0]) function.parent.remove(function)
# else: # assert len(function.children) >= 1
[docs] def leave_function(self, node, state): pass
[docs] def visit_variableref(self, variableref, state): # assert len(variableref.children)==0 # assert not variableref.parent is variableref pass
[docs] def visit_variablerefattributeaccess(self, variableref, state): pass
[docs] def visit_variablerefmethodcall(self, variableref, state): pass
[docs] def leave_variableref(self, node, state): pass
[docs] def leave_variablerefattributeaccess(self, node, state): pass
[docs] def leave_variablerefmethodcall(self, node, state): pass
[docs] def visit_constant(self, constant, state): if constant.type != "etype": return if constant.value not in self.schema: state.error(f"unknown entity type {constant.value}") if isinstance(constant.parent, Function) and == "CAST": return rel = constant.relation() if rel is not None and rel.r_type in ("is", "is_instance_of"): return state.error( "Entity types can only be used inside a CAST() " 'or with "is" relation' )
[docs] def leave_constant(self, node, state): pass
def _check_aggregat_rec(node: "rql.base.BaseNode") -> bool: for func in node.iget_nodes(Function): if func.descr().aggregat: return True for child in node.children: has_aggregat = _check_aggregat_rec(child) if has_aggregat: return True return False
[docs]class RQLSTAnnotator: """Annotate RQL syntax tree to ease further code generation from it. If an optional variable is shared among multiple scopes, it's rewritten to use identity relation. """ def __init__(self, schema, special_relations=None): self.schema = schema self.special_relations = special_relations or {}
[docs] def annotate(self, node): # assert not node.annotated node.accept(self) node.annotated = True
def _visit_stmt(self, node): for var in node.defined_vars.values(): var.prepare_annotation() for i, term in enumerate(node.selection): if _check_aggregat_rec(term): node.has_aggregat = True # register the selection column index for vref in term.get_nodes(VariableRef): vref.variable.stinfo["selected"].add(i) vref.variable.set_scope(node) if node.where is not None: node.where.accept(self, node) visit_insert = visit_delete = visit_set = _visit_stmt
[docs] def visit_union(self, node): for select in node.children: self.visit_select(select)
[docs] def visit_select(self, node): for var in node.aliases.values(): var.prepare_annotation() if node.with_ is not None: for subquery in node.with_: self.visit_union(subquery.query) subquery.query.schema = node.root.schema node.has_aggregat = False self._visit_stmt(node) if node.having: # if there is a having clause, bloc simplification of variables used in GROUPBY for term in node.groupby: for vref in term.get_nodes(VariableRef): bloc_simplification(vref.variable, term) try: vargraph = node.vargraph except AttributeError: vargraph = None # XXX node.having is a list of size 1 assert len(node.having) == 1 for term in node.having[0].get_nodes(Comparison): lhsvariables = set( vref.variable for vref in term.children[0].get_nodes(VariableRef) ) rhsvariables = set( vref.variable for vref in term.children[1].get_nodes(VariableRef) ) for var in lhsvariables | rhsvariables: var.stinfo.setdefault("having", []).append(term) if vargraph is not None: for v1 in lhsvariables: v1 = for v2 in rhsvariables: v2 = if v1 != v2: vargraph.setdefault(v1, []).append(v2) vargraph.setdefault(v2, []).append(v1) if term.optional in ("left", "both"): for var in lhsvariables: if var.stinfo["attrvar"] is not None: optcomps = var.stinfo["attrvar"].stinfo.setdefault( "optcomparisons", set() ) optcomps.add(term) if term.optional in ("right", "both"): for var in rhsvariables: if var.stinfo["attrvar"] is not None: optcomps = var.stinfo["attrvar"].stinfo.setdefault( "optcomparisons", set() ) optcomps.add(term)
[docs] def rewrite_shared_optional(self, exists, var, identity_rel_scope=None): """if variable is shared across multiple scopes, need some tree rewriting """ # allocate a new variable newvar = var.stmt.make_variable() newvar.prepare_annotation() for vref in var.references(): if vref.scope is exists: rel = vref.relation() vref.unregister_reference() newvref = VariableRef(newvar) vref.parent.replace(vref, newvref) stinfo = var.stinfo # update stinfo structure which may have already been # partially processed if rel in stinfo["rhsrelations"]: lhs, rhs = rel.get_parts() if ( vref is rhs.children[0] and self.schema.relation_schema_for(rel.r_type).final ): update_attrvars(newvar, rel, lhs) lhsvar = getattr(lhs, "variable", None) stinfo["attrvars"].remove((lhsvar, rel.r_type)) if stinfo["attrvar"] is lhsvar: if stinfo["attrvars"]: stinfo["attrvar"] = next(iter(stinfo["attrvars"])) else: stinfo["attrvar"] = None stinfo["rhsrelations"].remove(rel) newvar.stinfo["rhsrelations"].add(rel) try: stinfo["relations"].remove(rel) newvar.stinfo["relations"].add(rel) except KeyError: pass try: stinfo["optrelations"].remove(rel) newvar.add_optional_relation(rel) except KeyError: pass try: stinfo["blocsimplification"].remove(rel) bloc_simplification(newvar, rel) except KeyError: pass if stinfo["uidrel"] is rel: newvar.stinfo["uidrel"] = rel stinfo["uidrel"] = None if stinfo["typerel"] is rel: newvar.stinfo["typerel"] = rel stinfo["typerel"] = None # shared references newvar.stinfo["constnode"] = var.stinfo["constnode"] if # solutions already computed newvar.stinfo["possibletypes"] = var.stinfo["possibletypes"] for sol in sol[] = sol[] if identity_rel_scope is None: rel = exists.add_relation(var, "identity", newvar) identity_rel_scope = exists else: rel = make_relation(var, "identity", (newvar,), VariableRef) exists.parent.replace(exists, And(exists, Exists(rel))) # we have to force visit of the introduced relation self.visit_relation(rel, identity_rel_scope) return newvar
# tree nodes ##############################################################
[docs] def visit_exists(self, node, scope): node.children[0].accept(self, node)
[docs] def visit_not(self, node, scope): node.children[0].accept(self, scope)
[docs] def visit_and(self, node, scope): node.children[0].accept(self, scope) node.children[1].accept(self, scope)
visit_or = visit_and
[docs] def visit_relation(self, relation, scope): # assert relation.parent, repr(relation) lhs, rhs = relation.get_parts() # may be a constant once rqlst has been simplified lhsvar = getattr(lhs, "variable", None) if relation.is_types_restriction(): if lhsvar is not None: lhsvar.stinfo["typerel"] = relation return if relation.optional is not None: exists = relation.scope if not isinstance(exists, Exists): exists = None if lhsvar is not None: if exists is not None and lhsvar.scope is lhsvar.stmt: lhsvar = self.rewrite_shared_optional(exists, lhsvar) bloc_simplification(lhsvar, relation) if relation.optional == "both": lhsvar.add_optional_relation(relation) elif relation.optional == "left": lhsvar.add_optional_relation(relation) try: rhsvar = rhs.children[0].variable if exists is not None and rhsvar.scope is rhsvar.stmt: rhsvar = self.rewrite_shared_optional(exists, rhsvar) bloc_simplification(rhsvar, relation) if relation.optional == "right": rhsvar.add_optional_relation(relation) elif relation.optional == "both": rhsvar.add_optional_relation(relation) except AttributeError: # may have been rewritten as well pass rtype = relation.r_type relation_schema_for = self.schema.relation_schema_for(rtype) if lhsvar is not None: lhsvar.set_scope(scope) lhsvar.stinfo["relations"].add(relation) if rtype in self.special_relations: key = f"{self.special_relations[rtype]}rels" if key == "uidrels": constnode = relation.get_variable_parts()[1] if not ( relation.operator() != "=" # XXX use state to detect relation under NOT/OR # + check variable's scope or isinstance(relation.parent, Not) or relation.parent.ored() # NOT EXISTS should be treated like NOT or ( isinstance(relation.parent, Exists) and isinstance(relation.parent.parent, Not) ) ): if isinstance(constnode, Constant): lhsvar.stinfo["constnode"] = constnode if not isinstance(constnode, VariableRef): lhsvar.stinfo["uidrel"] = relation else: lhsvar.stinfo.setdefault(key, set()).add(relation) elif or relation_schema_for.inlined: bloc_simplification(lhsvar, relation) for vref in rhs.get_nodes(VariableRef): var = vref.variable var.set_scope(scope) var.stinfo["relations"].add(relation) var.stinfo["rhsrelations"].add(relation) if vref is rhs.children[0] and update_attrvars(var, relation, lhs)
[docs]def update_attrvars(var, relation, lhs): if var.stinfo["relations"] - var.stinfo["rhsrelations"]: raise BadRQLQuery( "variable %s should not be used as rhs of attribute relation %s" % (, relation) ) # stinfo['attrvars'] is set of couple (lhs variable name, relation name) # where the `var` attribute variable is used lhsvar = getattr(lhs, "variable", None) try: var.stinfo["attrvars"].add((lhsvar, relation.r_type)) except KeyError: var.stinfo["attrvars"] = set([(lhsvar, relation.r_type)]) # give priority to variable which is not in an EXISTS as # "main" attribute variable if var.stinfo["attrvar"] is None or not isinstance(relation.scope, Exists): var.stinfo["attrvar"] = lhsvar or lhs