Source code for rql.nodes

# copyright 2004-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of rql.
#
# rql is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# rql is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with rql. If not, see <http://www.gnu.org/licenses/>.
"""RQL syntax tree nodes.

This module defines all the nodes we can find in a RQL Syntax tree, except
root nodes, defined in the `stmts` module.
"""
from decimal import Decimal
from datetime import datetime, date, time, timedelta

from rql import CoercionError, RQLException
from rql.base import BaseNode, Node, BinaryNode, LeafNode
from rql.utils import function_description, uquote, common_parent, VisitableMixIn

from typing import (
    Iterable,
    FrozenSet,
    Optional,
    Mapping,
    Union as Union_,
    Type,
    TypeVar,
    Callable,
    Any,
    Iterator,
    TYPE_CHECKING,
    cast,
    Tuple,
    List,
    Dict,
)
from typing_extensions import Protocol

__docformat__: str = "restructuredtext en"

if TYPE_CHECKING:
    import rql
    import logilab


CONSTANT_TYPES: FrozenSet[Optional[str]] = frozenset(
    (
        None,
        "Date",
        "Datetime",
        "Boolean",
        "Float",
        "Int",
        "String",
        "Substitute",
        "etype",
    )
)

EtypePyobjMapType = Mapping[
    Union_[
        Type[bool],
        Type[int],
        Type[float],
        Type[Decimal],
        Type[str],
        Type[datetime],
        Type[date],
        Type[time],
        Type[timedelta],
    ],
    str,
]

ETYPE_PYOBJ_MAP: EtypePyobjMapType = {
    bool: "Boolean",
    int: "Int",
    float: "Float",
    Decimal: "Decimal",
    str: "String",
    datetime: "Datetime",
    date: "Date",
    time: "Time",
    timedelta: "Interval",
}

KeywordMapType = Mapping[str, Callable[[], Any]]
KEYWORD_MAP: KeywordMapType = {"NOW": datetime.now, "TODAY": date.today}

# definitely not clear enough
ValueType = Union_[bool, int, float, Decimal, str, datetime, date, time, timedelta]


[docs]class TranslationFunction(Protocol): def __call__(self, msg: str, context: Optional[str] = None) -> str: ...
[docs]def etype_from_pyobj(value: ValueType) -> str: """guess yams type from python value""" # note: # * Password is not selectable so no problem # * use type(value) and not value.__class__ since C instances may have no # __class__ attribute return ETYPE_PYOBJ_MAP[type(value)]
[docs]def variable_ref(var: Union_["Variable", "rql.nodes.VariableRef"]) -> "VariableRef": """get a VariableRef""" if isinstance(var, Variable): return VariableRef(var, noautoref=1) assert isinstance(var, VariableRef) return var
# Incompatible types in "yield" (actual type "BaseNode", expected type "VariableRef") [misc] # We may need to update the iget_nodes signature
[docs]def variable_refs( node: "VariableRef", ) -> Iterator[Union_["rql.base.BaseNode", "VariableRef"]]: for vref in node.iget_nodes(VariableRef): vref = cast(VariableRef, vref) if isinstance(vref.variable, Variable): yield vref
_N = TypeVar("_N", bound="BaseNode") class _HasOperatorAttribute(Protocol): operator: str def iget_nodes(self, klass: Type["_N"]) -> Iterator["_N"]: ... def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None, ) -> str: ...
[docs]class OperatorExpressionMixin:
[docs] def initargs( self: _HasOperatorAttribute, stmt: Optional["rql.stmts.Statement"] ) -> Tuple[str, Optional[str]]: """return list of arguments to give to __init__ to clone this node""" return (self.operator, None)
[docs] def is_equivalent(self: _HasOperatorAttribute, other: Any) -> bool: if not Node.is_equivalent(self, other): return False return self.operator == other.operator
[docs] def get_description( self: _HasOperatorAttribute, mainindex: int, tr: TranslationFunction ) -> Optional[str]: """if there is a variable in the math expr used as rhs of a relation, return the name of this relation, else return the type of the math expression """ try: description = tr(self.get_type()) except CoercionError: for vref in self.iget_nodes(VariableRef): vtype = vref.get_description(mainindex, tr) if vtype is not None and vtype != "Any": description = tr(vtype) return description
class _HasRelationAttribute(Protocol): def relation( self, ) -> Optional["Relation"]: ... class _HasHand(Protocol): parent: _HasRelationAttribute def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None, ) -> str: ...
[docs]class HSMixin: """mixin class for classes which may be the lhs or rhs of an expression""" __slots__: Iterable[str] = ()
[docs] def relation(self: _HasHand) -> Optional["Relation"]: """return the parent relation where self occurs or None""" try: return self.parent.relation() except AttributeError: return None
[docs] def get_description( self: _HasHand, mainindex: int, tr: TranslationFunction ) -> Optional[str]: mytype = self.get_type() if mytype != "Any": return tr(mytype) return "Any"
# rql st edition utilities ####################################################
[docs]def make_relation( var: Union_["Variable", "ColumnAlias"], rel: str, rhsargs: Tuple[Any, ...], rhsclass: type, operator: str = "=", ) -> "Relation": """build an relation equivalent to '<var> rel = <cst>'""" cmpop = Comparison(operator) cmpop.append(rhsclass(*rhsargs)) relation = Relation(rel) if hasattr(var, "variable"): # "Variable" has no attribute "variable" [union-attr] var = var.variable # type: ignore[union-attr] relation.append(VariableRef(var)) relation.append(cmpop) return relation
[docs]def make_constant_restriction( var: Union_["Variable", "ColumnAlias"], rtype: str, value: str, ctype: str, operator: str = "=", ) -> "Relation": if ctype is None: ctype = etype_from_pyobj(value) if isinstance(value, (set, frozenset, tuple, list, dict)): if len(value) > 1: rel = make_relation(var, rtype, ("IN",), Function, operator) infunc = rel.children[1].children[0] for atype in sorted(value): infunc.append(Constant(atype, ctype)) return rel value = next(iter(value)) return make_relation(var, rtype, (value, ctype), Constant, operator)
[docs]class EditableMixIn: """mixin class to add edition functionalities to some nodes, eg root nodes (statement) and Exists nodes """ __slots__: Iterable[str] = () @property def undo_manager(self) -> "rql.undo.SelectionManager": # "EditableMixIn" has no attribute "root" [attr-defined] return self.root.undo_manager # type: ignore[attr-defined] @property def should_register_op(self) -> bool: # "EditableMixIn" has no attribute "root" [attr-defined] root = self.root # type: ignore[attr-defined] # root is None during parsing return root is not None and root.should_register_op
[docs] def remove_node(self, node: Any, undefine: bool = False) -> None: """remove the given node from the tree USE THIS METHOD INSTEAD OF .remove to get correct variable references handling """ # unregister variable references in the removed subtree parent = node.parent # Item "None" of "Optional[BaseNode]" has no attribute "stmt" [union-attr] stmt = parent.stmt for varref in node.iget_nodes(VariableRef): # "BaseNode" has no attribute "unregister_reference" [attr-defined] varref.unregister_reference() # Item "None" of "Optional[Variable]" has no attribute "stinfo" [union-attr] if undefine and not varref.variable.stinfo["references"]: # Item "Statement" of "Union[Statement, None, Any]" has no # attribute "undefine_variable" stmt.undefine_variable(varref.variable) # remove return actually removed node and its parent # Item "BaseNode" of "Optional[BaseNode]" has no attribute "remove" [union-attr] node, parent, index = parent.remove(node) if self.should_register_op: from rql.undo import RemoveNodeOperation self.undo_manager.add_operation( RemoveNodeOperation(node, parent, stmt, index) )
[docs] def add_restriction(self, relation: "Relation") -> "rql.nodes.Relation": """add a restriction relation""" # "EditableMixIn" has no attribute "where" [attr-defined] r = self.where # type: ignore[attr-defined] if r is not None: newnode = And(r, relation) # "EditableMixIn" has no attribute "set_where" [attr-defined] self.set_where(newnode) # type: ignore[attr-defined] if self.should_register_op: from rql.undo import ReplaceNodeOperation self.undo_manager.add_operation(ReplaceNodeOperation(r, newnode)) else: # "EditableMixIn" has no attribute "set_where" [attr-defined] self.set_where(relation) # type: ignore[attr-defined] if self.should_register_op: from rql.undo import AddNodeOperation self.undo_manager.add_operation(AddNodeOperation(relation)) return relation
[docs] def add_constant_restriction( self, var: Union_["Variable", "ColumnAlias"], rtype: str, value: str, ctype: str, operator: str = "=", ) -> "Relation": """builds a restriction node to express a constant restriction: variable rtype = value """ restr = make_constant_restriction(var, rtype, value, ctype, operator) return self.add_restriction(restr)
[docs] def add_relation( self, lhsvar: "Variable", rtype: str, rhsvar: "rql.nodes.Variable" ) -> "Relation": """builds a restriction node to express '<var> eid <eid>'""" return self.add_restriction( make_relation(lhsvar, rtype, (rhsvar,), VariableRef) )
[docs] def add_eid_restriction( self, var: "Variable", eid: str, c_type: str = "Int" ) -> "Relation": """builds a restriction node to express '<var> eid <eid>'""" assert c_type in ( "Int", "Substitute", ), f"Error got c_type={c_type!r} in eid restriction" return self.add_constant_restriction(var, "eid", eid, c_type)
[docs] def add_type_restriction( self, var: Union_["Variable", "ColumnAlias"], etype: str ) -> "Relation": """builds a restriction node to express : variable is etype""" # what is stinfo? typerel = var.stinfo.get("typerel", None) if typerel: if typerel.r_type == "is": istarget = typerel.children[1].children[0] if isinstance(istarget, Constant): etypes: List[Any] = [ istarget.value, ] else: # Function (IN) etypes = [et.value for et in istarget.children] if isinstance(etype, str): restr_etypes = {etype} else: restr_etypes = set(etype) if restr_etypes - set(etypes): raise RQLException(f"{restr_etypes!r} not a subset of {etypes!r}") if len(etypes) > 1: # iterate a copy of children because it's modified inplace for child in istarget.children[:]: if child.value not in restr_etypes: typerel.stmt.remove_node(child) return typerel else: assert typerel.r_type == "is_instance_of" typerel.stmt.remove_node(typerel) return self.add_constant_restriction(var, "is", etype, "etype")
# base RQL nodes ##############################################################
[docs]class SubQuery(BaseNode): """WITH clause""" __slots__: Iterable[str] = ("aliases", "query") def __init__( self, aliases: Optional[List["VariableRef"]] = None, query: Optional["rql.stmts.Union"] = None, ) -> None: if aliases is not None: self.set_aliases(aliases) if query is not None: self.set_query(query)
[docs] def set_aliases(self, aliases: List["VariableRef"]) -> None: self.aliases = aliases for node in aliases: node.parent = self
[docs] def set_query(self, node: "rql.stmts.Union") -> None: self.query = node node.parent = self
[docs] def copy(self, stmt: Optional["rql.stmts.Statement"] = None) -> "SubQuery": # error: List comprehension has incompatible type List[BaseNode]; # expected List[VariableRef] [misc] return SubQuery( [cast(VariableRef, v.copy(stmt)) for v in self.aliases], self.query.copy() )
@property def children(self): return self.aliases + [self.query]
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: return "%s BEING (%s)" % ( ",".join(v.name for v in self.aliases), self.query.as_string(kwargs=kwargs), )
def __repr__(self) -> str: return f"{','.join(repr(v) for v in self.aliases)} BEING ({repr(self.query)})"
[docs]class And(BinaryNode): """a logical AND node (binary)""" __slots__: Iterable[str] = ()
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string""" return "%s, %s" % ( self.children[0].as_string(kwargs=kwargs), self.children[1].as_string(kwargs=kwargs), )
def __repr__(self) -> str: return f"{repr(self.children[0])} AND {repr(self.children[1])}"
[docs] def ored(self, traverse_scope: bool = False, _fromnode: Any = None) -> "Or": # Item "BaseNode" of "Optional[BaseNode]" # has no attribute "ored" [union-attr] return self.parent.ored(traverse_scope, _fromnode or self) # type: ignore[union-attr]
[docs] def neged(self, traverse_scope: bool = False, _fromnode: Any = None) -> "Not": # Item "BaseNode" of "Optional[BaseNode]" # has no attribute "neged" [union-attr] return self.parent.neged(traverse_scope, _fromnode or self) # type: ignore[union-attr]
[docs]class Or(BinaryNode): """a logical OR node (binary)""" __slots__: Iterable[str] = ()
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: return "(%s) OR (%s)" % ( self.children[0].as_string(kwargs=kwargs), self.children[1].as_string(kwargs=kwargs), )
def __repr__(self) -> str: return f"{repr(self.children[0])} OR {repr(self.children[1])}"
[docs] def ored(self, traverse_scope: bool = False, _fromnode: Any = None) -> "Or": return self
[docs] def neged(self, traverse_scope: bool = False, _fromnode: Any = None) -> "Not": # Item "BaseNode" of "Optional[BaseNode]" has no attribute "neged" [union-attr] return self.parent.neged(traverse_scope, _fromnode or self) # type: ignore[union-attr]
[docs]class Not(Node): """a logical NOT node (unary)""" __slots__: Iterable[str] = () def __init__(self, expr: Any = None): Node.__init__(self) if expr is not None: self.append(expr)
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: if isinstance(self.children[0], (Exists, Relation)): return f"NOT {self.children[0].as_string(kwargs=kwargs)}" return f"NOT ({self.children[0].as_string(kwargs=kwargs)})"
def __repr__( self, encoding: Optional[str] = None, kwargs: Optional[Dict] = None ) -> str: return f"NOT ({repr(self.children[0])})"
[docs] def ored(self, traverse_scope: bool = False, _fromnode: Any = None) -> "Or": # XXX consider traverse_scope ? # Item "BaseNode" of "Optional[BaseNode]" has no attribute "ored" [union-attr] return self.parent.ored(traverse_scope, _fromnode or self) # type: ignore[union-attr]
[docs] def neged( self, traverse_scope: bool = False, _fromnode: Any = None, strict: bool = False ) -> "Not": return self
[docs] def remove( self, child: "rql.base.BaseNode" ) -> Tuple["rql.base.BaseNode", Optional["rql.base.BaseNode"], Optional[int]]: # Item "BaseNode" of "Optional[BaseNode]" has no attribute "remove" [union-attr] return self.parent.remove(self) # type: ignore[union-attr]
# def parent_scope_property(attr): # def _get_parent_attr(self, attr=attr): # return getattr(self.parent.scope, attr) # return property(_get_parent_attr) # # editable compatibility # for method in ('remove_node', 'add_restriction', 'add_constant_restriction', # 'add_relation', 'add_eid_restriction', 'add_type_restriction'): # setattr(Not, method, parent_scope_property(method))
[docs]class Exists(EditableMixIn, BaseNode): """EXISTS sub query""" __slots__: Iterable[str] = ("query",) def __init__(self, restriction: Any = None): if restriction is not None: self.set_where(restriction) else: self.query: Any = None
[docs] def copy(self, stmt: Optional["rql.stmts.Statement"] = None) -> "Exists": # def copy(self, stmt: "rql.stmts.Statement") -> "SubQuery": new: Any = self.query.copy(stmt) return Exists(new)
@property def children(self): return (self.query,)
[docs] def append(self, node: "rql.base.BaseNode") -> None: assert self.query is None self.query = node node.parent = self
[docs] def is_equivalent(self, other): if self is other: return True raise NotImplementedError
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: content = self.query and self.query.as_string(kwargs=kwargs) return f"EXISTS({content})"
def __repr__(self) -> str: return f"EXISTS({repr(self.query)})"
[docs] def set_where(self, node: "rql.base.BaseNode") -> None: self.query = node node.parent = self
@property def where(self): return self.query
[docs] def replace( self, oldnode: "rql.base.BaseNode", newnode: "rql.base.BaseNode" ) -> Tuple["rql.base.BaseNode", "rql.base.BaseNode", Optional[int]]: assert oldnode is self.query self.query = newnode newnode.parent = self return oldnode, self, None
[docs] def remove( self, child: "rql.base.BaseNode" ) -> Tuple["rql.base.BaseNode", Optional["rql.base.BaseNode"], Optional[int]]: # Item "BaseNode" of "Optional[BaseNode]" has no attribute "remove" [union-attr] return self.parent.remove(self) # type: ignore[union-attr]
@property def scope(self): return self
[docs] def ored( self, traverse_scope: bool = False, _fromnode: Any = None ) -> Union_["Or", bool]: if not traverse_scope: if _fromnode is not None: # stop here return False return self.parent.ored(traverse_scope, self) # type: ignore[union-attr] return self.parent.ored(traverse_scope, _fromnode) # type: ignore[union-attr]
[docs] def neged( self, traverse_scope: bool = False, _fromnode: Any = None, strict: bool = None ) -> Union_["Not", bool]: if not traverse_scope: if _fromnode is not None: # stop here return False return self.parent.neged(self) # type: ignore[union-attr] elif strict: return isinstance(self.parent, Not) return self.parent.neged(traverse_scope, _fromnode) # type: ignore[union-attr]
[docs]class Relation(Node): """a RQL relation""" __slots__: Iterable[str] = ( "r_type", "optional", "_q_sqltable", "_q_needcast", ) # XXX cubicweb specific def __init__(self, r_type: str, optional: Optional[str] = None) -> None: Node.__init__(self) self.r_type = r_type self.optional: Optional[str] = None self.set_optional(optional)
[docs] def initargs( self, stmt: Optional["rql.stmts.Statement"] ) -> Tuple[str, Optional[str]]: """return list of arguments to give to __init__ to clone this node""" return self.r_type, self.optional
[docs] def is_equivalent(self, other: "Relation") -> bool: if not Node.is_equivalent(self, other): return False if self.r_type != other.r_type: return False return True
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string""" try: lhs: str = self.children[0].as_string(kwargs=kwargs) if self.optional in ("left", "both"): lhs += "?" rhs: str = self.children[1].as_string(kwargs=kwargs) if self.optional in ("right", "both"): rhs += "?" except IndexError: return repr(self) # not fully built relation return f"{lhs} {self.r_type} {rhs}"
def __repr__(self) -> str: if self.optional: rtype: str = f"{self.r_type}[{self.optional}]" else: rtype = self.r_type try: return f"Relation({self.children[0]!r} {rtype} {self.children[1]!r})" except IndexError: return f"Relation({self.r_type})"
[docs] def set_optional(self, optional: Optional[str]) -> None: assert optional in (None, "left", "right") if optional is not None: if self.optional and self.optional != optional: self.optional = "both" else: self.optional = optional
[docs] def relation(self): """return the parent relation where self occurs or None""" return self
[docs] def ored(self, traverse_scope: bool = False, _fromnode: Any = None) -> "Or": # Item "BaseNode" of "Optional[BaseNode]" has no attribute "ored" [union-attr] return self.parent.ored(traverse_scope, _fromnode or self) # type: ignore[union-attr]
[docs] def neged( self, traverse_scope: bool = False, _fromnode: Any = None, strict: bool = None ) -> Union_["Not", bool]: if strict: return isinstance(self.parent, Not) return self.parent.neged(traverse_scope, _fromnode or self) # type: ignore[union-attr]
[docs] def is_types_restriction(self) -> bool: if self.r_type not in ("is", "is_instance_of"): return False rhs = self.children[1] if isinstance(rhs, Comparison): rhs = rhs.children[0] # else: relation used in SET OR DELETE selection return (isinstance(rhs, Constant) and rhs.type == "etype") or ( isinstance(rhs, Function) and rhs.name == "IN" )
[docs] def operator(self) -> str: """return the operator of the relation <, <=, =, >=, > and LIKE (relations used in SET, INSERT and DELETE definitions don't have an operator as rhs) """ rhs = self.children[1] if isinstance(rhs, Comparison): return rhs.operator return "="
[docs] def get_parts(self) -> Tuple[Any, Any]: """return the left hand side and the right hand side of this relation""" lhs = self.children[0] rhs = self.children[1] return lhs, rhs
[docs] def get_variable_parts(self) -> Tuple[Any, Any]: """return the left hand side and the right hand side of this relation, ignoring comparison """ lhs = self.children[0] rhs = self.children[1].children[0] return lhs, rhs
[docs] def change_optional(self, value: str) -> None: root = self.root # error: "BaseNode" has no attribute "should_register_op" [attr-defined] if ( root is not None and root.should_register_op # type: ignore[attr-defined] and value != self.optional ): from rql.undo import SetOptionalOperation # error: "BaseNode" has no attribute "undo_manager" [attr-defined] root.undo_manager.add_operation( # type: ignore[attr-defined] SetOptionalOperation(self, self.optional) ) self.set_optional(value)
CMP_OPERATORS: FrozenSet[Optional[str]] = frozenset( ("=", "!=", "<", "<=", ">=", ">", "ILIKE", "LIKE", "REGEXP") )
[docs]class Comparison(HSMixin, Node): """handle comparisons: <, <=, =, >=, > LIKE and ILIKE operators have a unique children. """ __slots__: Iterable[str] = ("operator", "optional") def __init__( self, operator: str, value: Union_[ "Function", "Constant", "Constant", "VariableRef", "MathExpression", None, ] = None, optional: Optional[str] = None, ) -> None: Node.__init__(self) if operator == "~=": operator = "ILIKE" assert operator in CMP_OPERATORS, operator self.operator = operator self.optional = optional if value is not None: self.append(value)
[docs] def initargs( self, stmt: Optional["rql.stmts.Statement"] ) -> Tuple[str, Optional[Any], Optional[str]]: """return list of arguments to give to __init__ to clone this node""" return (self.operator, None, self.optional)
[docs] def set_optional(self, left: Optional[Any], right: Optional[Any]) -> None: if left and right: self.optional = "both" elif left: self.optional = "left" elif right: self.optional = "right"
[docs] def is_equivalent(self, other: Any) -> bool: if not Node.is_equivalent(self, other): return False return self.operator == other.operator
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string""" if len(self.children) == 0: return self.operator if len(self.children) == 2: lhsopt = rhsopt = "" if self.optional in ("left", "both"): lhsopt = "?" if self.optional in ("right", "both"): rhsopt = "?" return "%s%s %s %s%s" % ( self.children[0].as_string(kwargs=kwargs), lhsopt, self.operator, self.children[1].as_string(kwargs=kwargs), rhsopt, ) if self.operator == "=": return self.children[0].as_string(kwargs=kwargs) return f"{self.operator} {self.children[0].as_string(kwargs=kwargs)}"
def __repr__(self) -> str: return f"{self.operator} {', '.join(repr(c) for c in self.children)}"
[docs]class MathExpression(OperatorExpressionMixin, HSMixin, BinaryNode): """Mathematical Operators""" __slots__: Iterable[str] = ("operator",) def __init__( self, operator: str, lhs: Optional["Node"] = None, rhs: Optional["Node"] = None, ) -> None: BinaryNode.__init__(self, lhs, rhs) self.operator = operator
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string""" return "(%s %s %s)" % ( self.children[0].as_string(kwargs=kwargs), self.operator, self.children[1].as_string(kwargs=kwargs), )
def __repr__(self) -> str: return f"({self.children[0]!r} {self.operator} {self.children[1]!r})"
[docs] def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None ) -> str: """return the type of object returned by this function if known solution is an optional variable/etype mapping """ # "BaseNode" has no attribute "get_type" [attr-defined] lhstype = self.children[0].get_type(solution, kwargs) # type: ignore[attr-defined] rhstype = self.children[1].get_type(solution, kwargs) # type: ignore[attr-defined] key: Tuple[str, str, str] = (self.operator, lhstype, rhstype) try: return { ("-", "Date", "Datetime"): "Interval", ("-", "Date", "TZDatetime"): "Interval", ("-", "Date", "Date"): "Interval", ("-", "Datetime", "Datetime"): "Interval", ("-", "Datetime", "TZDatetime"): "Interval", ("-", "Datetime", "Date"): "Interval", ("-", "TZDatetime", "Datetime"): "Interval", ("-", "TZDatetime", "TZDatetime"): "Interval", ("-", "TZDatetime", "Date"): "Interval", ("-", "Date", "Interval"): "Datetime", ("+", "Date", "Interval"): "Datetime", ("-", "Datetime", "Interval"): "Datetime", ("+", "Datetime", "Interval"): "Datetime", ("-", "TZDatetime", "Interval"): "TZDatetime", ("+", "TZDatetime", "Interval"): "TZDatetime", }[key] except KeyError: if lhstype == rhstype and "Date" not in str(lhstype): return rhstype if sorted((lhstype, rhstype)) == ["Float", "Int"]: return "Float" raise CoercionError(key)
[docs]class UnaryExpression(OperatorExpressionMixin, Node): """Unary Operators""" __slots__: Iterable[str] = ("operator",) def __init__(self, operator: str, child: Optional[Any] = None) -> None: Node.__init__(self) self.operator = operator if child is not None: self.append(child)
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string""" return f"{self.operator}{self.children[0].as_string(kwargs=kwargs)}"
def __repr__(self) -> str: return f"{self.operator}{self.children[0]!r}"
[docs] def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None ) -> str: """return the type of object returned by this expression if known solution is an optional variable/etype mapping """ # error: "BaseNode" has no attribute "get_type" [attr-defined] return self.children[0].get_type(solution, kwargs) # type: ignore[attr-defined]
[docs]class Function(HSMixin, Node): """Class used to deal with aggregat functions (sum, min, max, count, avg) and latter upper(), lower() and other RQL transformations functions """ __slots__: Iterable[str] = ("name",) def __init__(self, name: str) -> None: Node.__init__(self) self.name = name.strip().upper()
[docs] def initargs(self, stmt: Optional["rql.stmts.Statement"]) -> Tuple[str]: """return list of arguments to give to __init__ to clone this node""" return (self.name,)
[docs] def is_equivalent(self, other: Any) -> bool: if not Node.is_equivalent(self, other): return False return self.name == other.name
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string""" return "%s(%s)" % ( self.name, ", ".join(c.as_string(kwargs=kwargs) for c in self.children), )
def __repr__(self) -> str: return f"{self.name}({', '.join(repr(c) for c in self.children)})"
[docs] def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None ) -> str: """return the type of object returned by this function if known solution is an optional variable/etype mapping """ func_descr = self.descr() rtype = func_descr.rql_return_type(self) if rtype is None: # XXX support one variable ref child try: # error: "BaseNode" has no attribute "name" [attr-defined] rtype = solution and solution.get( self.children[0].name # type: ignore[attr-defined] ) except AttributeError: pass return rtype or "Any"
[docs] def get_description(self, mainindex: int, tr: TranslationFunction) -> Optional[str]: return self.descr().st_description(self, mainindex, tr)
[docs] def descr(self) -> "logilab.database.FunctionDescr": """return the type of object returned by this function if known""" return function_description(self.name)
[docs]class Constant(HSMixin, LeafNode): """String, Int, TRUE, FALSE, TODAY, NULL...""" __slots__: Iterable[str] = ("value", "type", "uid", "uidtype") def __init__( self, value: Union_[float, int, bool, str, None], c_type: str, _uid: bool = False, _uidtype: str = "", ) -> None: assert c_type in CONSTANT_TYPES, "Error got c_type=" + repr(c_type) LeafNode.__init__(self) # don't care about Node attributes self.value = value self.type = c_type # updated by the annotator/analyzer if necessary self.uid = _uid self.uidtype = _uidtype
[docs] def initargs( self, stmt: Optional["rql.stmts.Statement"] ) -> Tuple[Union_[float, int, bool, str, None], str, bool, Optional[str]]: """return list of arguments to give to __init__ to clone this node""" return (self.value, self.type, self.uid, self.uidtype)
[docs] def is_equivalent(self, other: Any) -> bool: if not LeafNode.is_equivalent(self, other): return False return self.type == other.type and self.value == other.value
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string (an unicode string is returned if encoding is None) """ if self.type is None: return "NULL" if self.type in ("etype", "Date", "Datetime", "Int", "Float"): return str(self.value) if self.type == "Boolean": return self.value and "TRUE" or "FALSE" if self.type == "Substitute": # XXX could get some type information from self.root().schema() # and linked relation if kwargs is not None: value = kwargs.get(self.value, "???") if isinstance(value, str): value = uquote(value) else: value = repr(value) return value return "%%(%s)s" % self.value if isinstance(self.value, str): return uquote(self.value) return repr(self.value)
def __repr__(self) -> str: s = self.as_string() return s
[docs] def eval(self, kwargs): if self.type == "Substitute": try: return kwargs[self.value] except (IndexError, KeyError): raise RQLException( f"{self.value} isn't provided in the query arguments: {kwargs}" ) if self.type in ("Date", "Datetime"): # TODAY, NOW return KEYWORD_MAP[self.value]() return self.value
[docs] def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None ) -> str: if self.uid: return self.uidtype if self.type == "Substitute": if kwargs is not None: return etype_from_pyobj(self.eval(kwargs)) return "String" return self.type
[docs]class VariableRef(HSMixin, LeafNode): """a reference to a variable in the syntax tree""" __slots__: Iterable[str] = ("variable", "name") def __init__( self, variable: Union_["Variable", "ColumnAlias"], noautoref: Optional[int] = None, ) -> None: LeafNode.__init__(self) # don't care about Node attributes self.variable = variable self.name: str = variable.name if noautoref is None: self.register_reference() def _copy_variable(self, stmt: Optional["rql.stmts.Statement"]) -> "Variable": var = self.variable if isinstance(var, ColumnAlias): assert stmt is not None newvar = stmt.get_variable(self.name, var.colnum) else: assert stmt is not None newvar = stmt.get_variable(self.name) newvar.init_copy(var) return newvar
[docs] def copy(self, stmt: Optional["rql.stmts.Statement"] = None) -> "VariableRef": return VariableRef(self._copy_variable(stmt))
[docs] def is_equivalent(self, other: Any) -> bool: if not LeafNode.is_equivalent(self, other): return False return self.name == other.name
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string""" return self.name
def __repr__(self) -> str: return f"VarRef({self.variable!r})"
[docs] def register_reference(self) -> None: self.variable.register_reference(self)
[docs] def unregister_reference(self) -> None: self.variable.unregister_reference(self)
[docs] def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None ) -> str: return self.variable.get_type(solution, kwargs)
[docs] def get_description(self, mainindex: int, tr: TranslationFunction) -> Optional[str]: return self.variable.get_description(mainindex, tr)
[docs] def root_selection_index(self) -> Optional[int]: """return the index of this variable reference *in the root selection* if it's selected, else None """ myidx = self.variable.selected_index() if myidx is None: return None stmt = self.stmt # Item "Statement" of "Optional[Statement]" has no attribute "parent" [union-attr] union = stmt.parent # type:ignore[union-attr] if union.parent is None: return myidx # first .parent is the SubQuery node, we want the Select node parentselect = union.parent.parent for ca in parentselect.aliases.values(): if ca.query is union and ca.colnum == myidx: caidx = ca.selected_index() if caidx is None: return None return parentselect.selection[caidx].root_selection_index() return None
[docs]class VariableRefMethodCall(VariableRef): def __init__( self, variable: Union_["Variable", "ColumnAlias"], method_to_call: str, args: List[Constant], noautoref: Optional[int] = None, ) -> None: VariableRef.__init__(self, variable=variable, noautoref=noautoref) self.method_to_call = method_to_call self.args_for_method_to_call = args
[docs] def copy( self, stmt: Optional["rql.stmts.Statement"] = None ) -> "VariableRefMethodCall": return VariableRefMethodCall( self._copy_variable(stmt), self.method_to_call, self.args_for_method_to_call )
def __repr__(self) -> str: return ( f"VarRef({self.variable})." f"{self.method_to_call}" f"({self.args_for_method_to_call})" )
[docs]class VariableRefAttributeAccess(VariableRef): def __init__( self, variable: Union_["Variable", "ColumnAlias"], attribute: str, noautoref: Optional[int] = None, ) -> None: VariableRef.__init__(self, variable=variable, noautoref=noautoref) self.attribute_to_access = attribute
[docs] def copy( self, stmt: Optional["rql.stmts.Statement"] = None ) -> "VariableRefAttributeAccess": return VariableRefAttributeAccess( self._copy_variable(stmt), self.attribute_to_access )
def __repr__(self) -> str: return f"VarRef({self.variable})." f"{self.attribute_to_access}"
[docs]class SortTerm(Node): """a sort term bind a variable to the boolean <asc> if <asc> ascendant sort else descendant sort """ __slots__: Iterable[str] = ("asc",) def __init__( self, variable: Any, asc: int = 1, nulls_sort: int = 0, copy: Optional[Any] = None, ) -> None: Node.__init__(self) self.asc = asc self.nulls_sort = nulls_sort if copy is None: self.append(variable)
[docs] def initargs( self, stmt: Optional["rql.stmts.Statement"] ) -> Tuple[None, int, int, bool]: """return list of arguments to give to __init__ to clone this node""" return (None, self.asc, self.nulls_sort, True)
[docs] def is_equivalent(self, other: Any) -> bool: if not Node.is_equivalent(self, other): return False return self.asc == other.asc and self.nulls_sort == other.nulls_sort
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: if self.asc: return f"{self.term}{self._nulls_sort_as_string}" return f"{self.term} DESC{self._nulls_sort_as_string}"
def __repr__(self) -> str: if self.asc: return f"{self.term} ASC{self._nulls_sort_as_string}" return f"{self.term} DESC{self._nulls_sort_as_string}" @property def term(self): return self.children[0] @property def _nulls_sort_as_string(self): if self.nulls_sort == 0: return "" elif self.nulls_sort == 1: return " NULLSFIRST" elif self.nulls_sort == 2: return " NULLSLAST"
###############################################################################
[docs]class Referenceable(VisitableMixIn): __slots__: Iterable[str] = ("name", "stinfo", "stmt") def __init__(self, name): self.name = name.strip() # used to collect some global information about the syntax tree self.stinfo = { # link to VariableReference objects in the syntax tree "references": set(), } # reference to the selection self.stmt = None @property def schema(self): return self.stmt.root.schema
[docs] def init_copy(self, old): # should copy variable's possibletypes on copy if not self.stinfo.get("possibletypes"): self.stinfo["possibletypes"] = old.stinfo.get("possibletypes")
[docs] def as_string(self, kwargs: Optional[Dict] = None) -> str: """return the tree as an encoded rql string""" return self.name
[docs] def register_reference(self, vref: "VariableRef") -> None: """add a reference to this variable""" self.stinfo["references"].add(vref)
[docs] def unregister_reference(self, vref: "VariableRef") -> None: """remove a reference to this variable""" try: self.stinfo["references"].remove(vref) except KeyError: # this may occur on hairy undoing pass
[docs] def references(self) -> Tuple[Any, ...]: """return all references on this variable""" return tuple(self.stinfo["references"])
[docs] def prepare_annotation(self): self.stinfo.update( { "scope": None, # relations where this variable is used on the lhs/rhs "relations": set(), "rhsrelations": set(), # selection indexes if any "selected": set(), # type restriction (e.g. "is" / "is_instance_of") where this # variable is used on the lhs "typerel": None, # uid relations (e.g. "eid") where this variable is used on the lhs "uidrel": None, # if this variable is an attribute variable (ie final entity), link # to the (prefered) attribute owner variable "attrvar": None, # constant node linked to an uid variable if any "constnode": None, } ) # remove optional st infos for key in ("optrelations", "blocsimplification", "ftirels"): self.stinfo.pop(key, None)
def _set_scope(self, key, scopenode): if scopenode is self.stmt or self.stinfo[key] is None: self.stinfo[key] = scopenode elif not (self.stinfo[key] is self.stmt or scopenode is self.stinfo[key]): self.stinfo[key] = common_parent(self.stinfo[key], scopenode).scope
[docs] def set_scope(self, scopenode): self._set_scope("scope", scopenode)
[docs] def get_scope(self): return self.stinfo["scope"]
[docs] def has_attribute_or_function_var_references(self): return any( [ varref for varref in self.references() if isinstance( varref, (VariableRefAttributeAccess, VariableRefMethodCall), ) ] )
scope = property(get_scope, set_scope)
[docs] def add_optional_relation(self, relation): try: self.stinfo["optrelations"].add(relation) except KeyError: self.stinfo["optrelations"] = set((relation,))
[docs] def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None ) -> str: """return entity type of this object, 'Any' if not found""" if solution: return solution[self.name] if self.stinfo["typerel"]: rhs = self.stinfo["typerel"].children[1].children[0] if isinstance(rhs, Constant): return str(rhs.value) schema = self.schema if schema is not None: for rel in self.stinfo["rhsrelations"]: try: lhstype = rel.children[0].get_type(solution, kwargs) return schema.entity_schema_for(lhstype).destination(rel.r_type) except Exception: # CoertionError, AssertionError :( pass return "Any"
[docs] def get_description( self, mainindex: int, tr: TranslationFunction, none_allowed: bool = False ) -> Optional[str]: """return : * the name of a relation where this variable is used as lhs, * the entity type of this object if specified by a 'is' relation, * 'Any' if nothing nicer has been found... give priority to relation name """ if mainindex is not None: if mainindex in self.stinfo["selected"]: return ", ".join( sorted(tr(etype) for etype in self.stinfo["possibletypes"]) ) rtype = frtype = None schema = self.schema for rel in self.stinfo["relations"]: if schema is not None: rschema = schema.relation_schema_for(rel.r_type) if rschema.final: if self.name == rel.children[0].name: # ignore final relation where this variable is used as subject continue # final relation where this variable is used as object frtype = rel.r_type rtype = rel.r_type lhs, rhs = rel.get_variable_parts() # use getattr, may not be a variable ref (rewritten, constant...) rhsvar = getattr(rhs, "variable", None) if mainindex is not None: # relation to the main variable, stop searching lhsvar = getattr(lhs, "variable", None) context = None if lhsvar is not None and mainindex in lhsvar.stinfo["selected"]: if len(lhsvar.stinfo["possibletypes"]) == 1: context = next(iter(lhsvar.stinfo["possibletypes"])) return tr(rtype, context=context) if rhsvar is not None and mainindex in rhsvar.stinfo["selected"]: if len(rhsvar.stinfo["possibletypes"]) == 1: context = next(iter(rhsvar.stinfo["possibletypes"])) if schema is not None and rschema.symmetric: return tr(rtype, context=context) return tr(rtype + "_object", context=context) if rhsvar is self: rtype += "_object" if frtype is not None: return tr(frtype) if mainindex is None and rtype is not None: return tr(rtype) if none_allowed: return None return ", ".join(sorted(tr(etype) for etype in self.stinfo["possibletypes"]))
[docs] def selected_index(self): """return the index of this variable in the selection if it's selected, else None """ if not self.stinfo["selected"]: return None return next(iter(self.stinfo["selected"]))
[docs] def main_relation(self): """Return the relation where this variable is used in the rhs. It is useful for cases where this variable is final and we are looking for the entity to which it belongs. """ for ref in self.references(): rel = ref.relation() if rel is None: continue if rel.r_type != "is" and self.name != rel.children[0].name: return rel return None
[docs] def valuable_references(self): """return the number of "valuable" references : references is in selection or in a non type (is) relations """ stinfo = self.stinfo return len(stinfo["selected"]) + len(stinfo["relations"])
[docs]class ColumnAlias(Referenceable): __slots__: Iterable[str] = ( "colnum", "query", "_q_sql", "_q_sqltable", ) # XXX cubicweb specific def __init__(self, alias, colnum, query=None): super(ColumnAlias, self).__init__(alias) self.colnum = int(colnum) self.query = query def __repr__(self) -> str: return f"alias {self.name}"
[docs] def get_type( self, solution: Optional[Dict[str, str]] = None, kwargs: Optional[Dict] = None ) -> str: """return entity type of this object, 'Any' if not found""" vtype = super(ColumnAlias, self).get_type(solution, kwargs) if vtype == "Any": for select in self.query.children: vtype = select.selection[self.colnum].get_type(solution, kwargs) if vtype != "Any": return vtype return vtype
[docs] def get_description( self, mainindex: int, tr: TranslationFunction, none_allowed: bool = False ) -> Optional[str]: """return entity type of this object, 'Any' if not found""" vtype = super(ColumnAlias, self).get_description( mainindex, tr, none_allowed=True ) if vtype is None: vtypes = set() for select in self.query.children: vtype = select.selection[self.colnum].get_description(mainindex, tr) if vtype is not None: vtypes.add(vtype) if vtypes: return ", ".join(sorted(vtype for vtype in vtypes)) return vtype
[docs]class Variable(Referenceable): """ a variable definition, should not be directly added to the syntax tree (use VariableRef instead) collects information about a variable use in a syntax tree """ __slots__: Iterable[str] = ( "_q_invariant", "_q_sql", "_q_sqltable", ) # XXX ginco specific def __repr__(self) -> str: return f"{self.name}"