# copyright 2004-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of rql.
#
# rql is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# rql is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with rql. If not, see <http://www.gnu.org/licenses/>.
"""Construction and manipulation of RQL syntax trees.
This module defines only first level nodes (i.e. statements). Child nodes are
defined in the nodes module
"""
from copy import deepcopy
from warnings import warn
from logilab.common.deprecation import callable_deprecated
from rql import BadRQLQuery, CoercionError, nodes
from rql.base import BaseNode, Node
from rql.utils import rqlvar_maker
from typing import (
TYPE_CHECKING,
Dict,
List,
Union as Union_,
Any,
Iterable,
Optional,
cast,
Set as Set_,
Tuple,
Iterator,
Sequence,
)
# Sentinel meaning "argument not supplied" (distinct from an explicit None).
_MARKER: object = object()
__docformat__: str = "restructuredtext en"

if TYPE_CHECKING:
    # Only needed to resolve the string annotations used in this module.
    import rql

# One solution maps a variable name to an entity type name.
Solution = Dict[str, str]
# All the solutions attached to a statement.
SolutionsList = List[Solution]

# Graph whose keys are either a single name or a pair of names and whose
# values are lists of names; used, for now, for Select.vargraph.
Graph = Dict[Union_[Tuple[str, str], str], List[str]]
def _check_references(
defined: Dict[str, Union_["rql.nodes.Variable", "rql.nodes.ColumnAlias"]],
varrefs: Iterable[Union_["rql.nodes.VariableRef", "rql.base.BaseNode"]],
) -> bool:
refs = {}
for var in defined.values():
for vref in var.references():
# be careful, Variable and VariableRef define __cmp__
if not [v for v in varrefs if v is vref]:
raise AssertionError(f"vref {vref!r} is not in the tree")
refs[id(vref)] = 1
for vref in varrefs:
if id(vref) not in refs:
raise AssertionError(f"vref {vref!r} is not referenced ({vref.stmt!r})")
return True
class undo_modification:
    """Context manager saving a statement's state on entry and recovering it
    on exit.

    `select` may be any object exposing ``save_state()`` and ``recover()``.
    """

    def __init__(self, select):
        self.select = select

    def __enter__(self):
        self.select.save_state()

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # The ``with`` statement always calls __exit__ with the three
        # exception arguments; they default to None so that a direct
        # ``__exit__()`` call keeps working.
        # Nothing is returned, hence an exception raised in the body still
        # propagates once the state has been recovered.
        self.select.recover()
class ScopeNode(BaseNode):
    """Node owning a variable scope: defined variables, sub-queries, type
    solutions and the where / having clauses.
    """

    def __init__(self):
        # dictionary of defined variables in the original RQL syntax tree
        self.defined_vars: Dict[str, "rql.nodes.Variable"] = {}
        self.with_: List["rql.nodes.SubQuery"] = []
        # list of possible solutions for used variables
        self.solutions: SolutionsList = []
        self._varmaker = None  # variable names generator, built when necessary
        self.where: Optional["rql.base.Node"] = None  # where clause node
        self.having: Iterable["rql.base.Node"] = ()  # XXX now a single node
        # declared here so that type checkers know about the attribute
        self.schema: Optional[Any] = None
        # declared here so that type checkers know about the attribute
        self.aliases: Dict[str, "rql.nodes.ColumnAlias"] = {}

    @property
    def undo_manager(self):
        """Return the ``SelectionManager`` of this node, built on first access."""
        try:
            return self._undo_manager
        except AttributeError:
            # imported here rather than at module level
            from rql.undo import SelectionManager

            self._undo_manager = SelectionManager(self)
            return self._undo_manager

    @property
    def should_register_op(self):
        """Falsy at this level: modifications are not recorded for undo."""
        return None

    def get_selected_variables(self):
        """Return ``self.selected_terms()``.

        NOTE(review): ``selected_terms`` is not defined in this class; it is
        presumably provided by a base or sub class -- confirm.
        """
        return self.selected_terms()

    def set_where(self, node: "rql.base.Node") -> None:
        """Install `node` as the where clause and make self its parent."""
        self.where = node
        node.parent = self

    def set_having(self, terms: Iterable["rql.base.Node"]) -> None:
        """Install `terms` as the having clause and make self their parent.

        The previous value is recorded in the undo manager when
        ``should_register_op`` is true.
        """
        if self.should_register_op:
            from rql.undo import SetHavingOperation

            self.undo_manager.add_operation(SetHavingOperation(self, self.having))
        self.having = terms
        for node in terms:
            node.parent = self

    # Signature of "copy" incompatible with supertype "BaseNode"
    # either add a parameter that won't be used, either ignore
    def copy(
        self,
        stmt: Optional["Statement"] = None,
        copy_solutions: bool = True,
        solutions: Optional[SolutionsList] = None,
    ) -> "rql.base.BaseNode":
        """Return a new, empty instance of the same class.

        :param stmt: unused here
        :param copy_solutions: deep-copy ``self.solutions`` on the new node
          (only considered when `solutions` is None)
        :param solutions: when given, set as is on the new node (not copied)

        The schema, when set, is shared with the copy.
        """
        new = self.__class__()
        if self.schema is not None:
            new.schema = self.schema
        if solutions is not None:
            new.solutions = solutions
        elif copy_solutions and self.solutions:
            new.solutions = deepcopy(self.solutions)
        return new

    # construction helper methods #############################################

    def get_etype(self, name: str) -> "rql.nodes.Constant":
        """return an ``etype`` constant node for the given entity's type name

        No check of `name` is done at this level.
        """
        return nodes.Constant(name, "etype")

    def get_variable(
        self, name: str
    ) -> Union_["rql.nodes.Variable", "rql.nodes.ColumnAlias"]:
        """get a variable instance from its name

        the variable is created if it doesn't exist yet
        """
        try:
            return self.defined_vars[name]
        except KeyError:
            # unknown name: define it and attach it to this statement
            self.defined_vars[name] = var = nodes.Variable(name)
            var.stmt = self
            return var

    def allocate_varname(self) -> str:
        """return an yet undefined variable name"""
        if self._varmaker is None:
            self._varmaker = rqlvar_maker(
                defined=self.defined_vars,
                # XXX only on Select node
                aliases=getattr(self, "aliases", None),
            )
        return next(self._varmaker)

    def make_variable(self) -> "rql.nodes.Variable":
        """create a new variable with an unique name for this tree"""
        var = self.get_variable(self.allocate_varname())
        if self.should_register_op:
            from rql.undo import MakeVarOperation

            self.undo_manager.add_operation(MakeVarOperation(var))
        return cast("rql.nodes.Variable", var)

    def set_possible_types(
        self,
        solutions: SolutionsList,
        kwargs: Optional[Union_[object, Dict[str, str]]] = _MARKER,
        key: str = "possibletypes",
    ) -> None:
        """Store, for each defined variable, the set of types it takes in
        `solutions` under ``var.stinfo[key]``.

        ``self.solutions`` is replaced only when `key` is "possibletypes".
        `kwargs` is not used at this level.

        :raise KeyError: if a solution lacks a defined variable's name
        """
        if key == "possibletypes":
            self.solutions = solutions
        for var in self.defined_vars.values():
            possible = var.stinfo[key] = set()
            for solution in solutions:
                possible.add(solution[var.name])
        # for debugging
        # for sol in solutions:
        #     for vname in sol:
        #         assert vname in self.defined_vars or vname in self.aliases

    def check_references(self) -> bool:
        """test function

        Check the sub-queries first, then this node's own variable
        references. On failure, ``repr(self)`` is printed before the
        exception is re-raised.
        """
        try:
            defined = cast(
                Dict[str, Union_["rql.nodes.ColumnAlias", "rql.nodes.Variable"]],
                self.aliases.copy(),
            )
        except AttributeError:
            # no usable `aliases` attribute: only consider defined variables
            defined = cast(
                Dict[str, Union_["rql.nodes.ColumnAlias", "rql.nodes.Variable"]],
                self.defined_vars.copy(),
            )
        else:
            defined.update(self.defined_vars)
        for subq in self.with_:
            subq.query.check_references()
        # only keep references belonging to this very statement
        varrefs = [
            vref for vref in self.get_nodes(nodes.VariableRef) if vref.stmt is self
        ]
        try:
            _check_references(defined, varrefs)
        except Exception:
            print(repr(self))
            raise
        return True
class Statement:
    """base class for statement nodes"""

    # Class-level defaults for optional instance attributes; an instance
    # gets its own value only once the attribute is actually used.
    schema: Optional["rql.interfaces.ISchema"] = None
    annotated: bool = False  # set by the annotator

    if TYPE_CHECKING:
        # Declared for type checkers only: not defined at runtime.
        def get_variable(self, name, column=None):
            raise NotImplementedError()

    # navigation helper methods #############################################

    @property
    def root(self):
        """return the root node of the tree (the statement itself here)"""
        return self

    @property
    def stmt(self):
        """return the statement itself"""
        return self

    @property
    def scope(self):
        """return the statement itself"""
        return self

    def ored(
        self, traverse_scope: bool = False, _fromnode: Optional["rql.nodes.And"] = None
    ) -> Optional["rql.nodes.Or"]:
        """Always return None, whatever the arguments are."""
        return None

    def neged(
        self, traverse_scope: bool = False, _fromnode: Optional["rql.nodes.Or"] = None
    ) -> Optional["rql.nodes.Not"]:
        """Always return None, whatever the arguments are."""
        return None
class Union(Statement, Node):
    """the select node is the root of the syntax tree for selection statement
    using UNION
    """

    TYPE: str = "select"
    # default values for optional instance attributes, set on the instance when
    # used
    undoing: bool = False  # used to prevent from memorizing when undoing !
    memorizing: int = 0  # recoverable modification attributes

    def wrap_selects(self) -> None:
        """Move every select of this union into a sub-query of a new select,
        which becomes the only child of this union.

        The union is modified in place, nothing is returned.
        """
        child = Union()
        for select in self.children[:]:
            child.append(select)
            self.remove_select(cast("Select", select))
        newselect: "Select" = Select()
        # one alias per term selected by the last moved select
        aliases: List["rql.nodes.VariableRef"] = [
            nodes.VariableRef(newselect.make_variable())
            for _ in range(len(cast("Select", select).selection))
        ]
        newselect.add_subquery(nodes.SubQuery(aliases, child), check=False)
        for vref in aliases:
            newselect.append_selected(nodes.VariableRef(vref.variable))
        self.append_select(newselect)

    def _get_offset(self) -> int:
        """Deprecated accessor: return the offset of the last select."""
        warn("offset is now a Select node attribute", DeprecationWarning, stacklevel=2)
        last_children = cast("Select", self.children[-1])
        return last_children.offset

    def set_offset(self, offset: int) -> None:
        """Set the offset of the whole union.

        With a single select the offset is set on it directly, otherwise the
        selects are first wrapped into a new root select.
        """
        if len(self.children) == 1:
            last_children = cast("Select", self.children[-1])
            last_children.set_offset(offset)
            # done: without this return the select would be wrapped and the
            # offset applied a second time (see set_limit)
            return None
        # we have to introduce a new root
        # XXX not undoable since a new root has to be introduced
        self.wrap_selects()
        first_child = cast("Select", self.children[0])
        first_child.set_offset(offset)
        return None

    offset = property(_get_offset, set_offset)

    def _get_limit(self):
        """Deprecated accessor: return the limit of the last select."""
        warn("limit is now a Select node attribute", DeprecationWarning, stacklevel=2)
        return self.children[-1].limit

    def set_limit(self, limit: int) -> None:
        """Set the limit of the whole union.

        With a single select the limit is set on it directly, otherwise the
        selects are first wrapped into a new root select.
        """
        if len(self.children) == 1:
            cast("Select", self.children[-1]).set_limit(limit)
            return None
        self.wrap_selects()
        cast("Select", self.children[0]).set_limit(limit)
        return None

    limit = property(_get_limit, set_limit)

    @property
    def root(self):
        """return the root node of the tree"""
        if self.parent is None:
            return self
        return self.parent.root

    def get_description(
        self,
        mainindex: Optional[int] = None,
        tr: Optional["rql.nodes.TranslationFunction"] = None,
    ) -> List[List[str]]:
        """Return the description of each select of the union, in order.

        `mainindex`:
          selection index to consider as main column, useful to get smarter
          results
        `tr`:
          optional translation function taking a string as argument and
          returning a string
        """
        if tr is None:
            # default to an identity "translation"
            def tr(msg, context=None):
                return msg

        return [
            c.get_description(mainindex, tr)
            for c in cast(Sequence["Select"], self.children)
        ]

    # repr / as_string / copy #################################################

    def __repr__(self) -> str:
        return "\nUNION\n".join(repr(select) for select in self.children)

    def as_string(self, kwargs: Optional[Dict] = None) -> str:
        """return the tree as an encoded rql string

        Each select is parenthesized, unless there is only one.
        """
        strings: List[str] = [
            select.as_string(kwargs=kwargs) for select in self.children
        ]
        if len(strings) == 1:
            return strings[0]
        return " UNION ".join(f"({part})" for part in strings)

    def copy(
        self, stmt: Optional["Statement"] = None, copy_children: bool = True
    ) -> "Union":
        """Return a new union sharing the schema, with copied children unless
        `copy_children` is false. `stmt` is unused.
        """
        new: "Union" = Union()
        if self.schema is not None:
            new.schema = self.schema
        if copy_children:
            for child in self.children:
                new.append(child.copy())
                assert new.children[-1].parent is new
        return new

    # union specific methods ##################################################

    # XXX for bw compat, should now use get_variable_indices (cw > 3.8.4)
    def get_variable_variables(self) -> Set_[str]:
        """Return the names of the variable references found, in the first
        select, under the selection indexes given by get_variable_indices().
        """
        change: Set_[str] = set()
        for idx in self.get_variable_indices():
            first_child = self.children[0]
            vrefs = (
                cast("Select", first_child).selection[idx].iget_nodes(nodes.VariableRef)
            )
            for vref in vrefs:
                change.add(vref.name)
        return change

    def get_variable_indices(self) -> Set_[int]:
        """return the set of selection indexes which take different types
        according to the solutions
        """
        # selection index -> set of types seen at this index
        values: Dict[int, Set_] = {}
        for select in self.children:
            for descr in cast("Select", select).get_selection_solutions():
                for i, etype in enumerate(descr):
                    values.setdefault(i, set()).add(etype)
        return {idx for idx, etypes in values.items() if len(etypes) > 1}

    def _locate_subquery(
        self, col: int, etype: str, kwargs: Optional[Dict[Any, Any]] = None
    ) -> Tuple:
        """Uncached implementation of locate_subquery().

        :raise Exception: if no select yields `etype` at column `col`
        """
        first_child = self.children[0]
        single_select = len(self.children) == 1
        without_subquery = not cast("Select", first_child).with_
        if single_select and without_subquery:
            # nothing to search for
            return self.children[0], col
        for select in cast(Sequence["Select"], self.children):
            term = select.selection[col]
            try:
                if term.name in select.aliases:
                    # column coming from a sub-query: search into it
                    alias = select.aliases[term.name]
                    return alias.query._locate_subquery(alias.colnum, etype, kwargs)
            except AttributeError:
                # term has no 'name' attribute
                pass
            for solution in select.solutions:
                if term.get_type(solution, kwargs) == etype:
                    return select, col
        raise Exception(f"internal error, {etype} not found on col {col}")

    def locate_subquery(
        self, col: int, etype: str, kwargs: Optional[Dict] = None
    ) -> Any:
        """return a select node and associated selection index where root
        variable at column `col` is of type `etype`

        Results are cached per (`col`, `etype`); `kwargs` is not part of the
        cache key.
        """
        try:
            # Cannot determine type of '_subq_cache' [has-type]
            return self._subq_cache[(col, etype)]  # type: ignore[has-type]
        except AttributeError:
            # first call: the cache does not exist yet
            self._subq_cache = {}
        except KeyError:
            pass
        self._subq_cache[(col, etype)] = self._locate_subquery(col, etype, kwargs)
        return self._subq_cache[(col, etype)]

    def subquery_selection_index(self, subselect: Any, col: int) -> int:
        """given a select sub-query and a column index in the root query, return
        the selection index for this column in the sub-query
        """
        # enclosing selects, outermost first
        selectpath: List = []
        while subselect.parent.parent is not None:
            subq = subselect.parent.parent
            subselect = subq.parent
            selectpath.insert(0, subselect)
        for select in selectpath:
            col = select.selection[col].variable.colnum
        return col

    # recoverable modification methods ########################################

    # don't use @cached: we want to be able to disable it while this must still
    # be cached
    @property
    def undo_manager(self) -> "rql.undo.SelectionManager":
        """Return the ``SelectionManager`` of this union, built when needed."""
        from rql.undo import SelectionManager

        undo_manager = getattr(self, "_undo_manager", None)
        if undo_manager:
            return undo_manager
        self._undo_manager = SelectionManager(self)
        return self._undo_manager

    @property
    def should_register_op(self):
        """Truthy when modifications must be recorded, i.e. while memorizing
        and not undoing.
        """
        return self.memorizing and not self.undoing

    def undo_modification(self) -> "undo_modification":
        """Return an ``undo_modification`` object bound to this union."""
        return undo_modification(self)

    def save_state(self) -> None:
        """save the current tree"""
        self.undo_manager.push_state()
        self.memorizing += 1

    def recover(self) -> None:
        """reverts the tree as it was when save_state() was last called"""
        self.memorizing -= 1
        assert self.memorizing >= 0
        self.undo_manager.recover()

    def check_references(self) -> bool:
        """test function"""
        for select in cast(Sequence["Select"], self.children):
            select.check_references()
        return True

    def append_select(self, select: "Select") -> None:
        """Append `select` to the children, recording the operation when
        ``should_register_op`` is true.
        """
        if self.should_register_op:
            from rql.undo import AppendSelectOperation

            self.undo_manager.add_operation(AppendSelectOperation(self, select))
        self.children.append(select)

    def remove_select(self, select: "Select") -> None:
        """Remove `select` from the children, recording the operation when
        ``should_register_op`` is true.

        :raise ValueError: if `select` is not found among the children
          (assuming ``children`` is a list -- TODO confirm)
        """
        idx: int = self.children.index(select)
        if self.should_register_op:
            from rql.undo import RemoveSelectOperation

            self.undo_manager.add_operation(RemoveSelectOperation(self, select, idx))
        self.children.pop(idx)
[docs]class Select(Statement, nodes.EditableMixIn, ScopeNode):
"""the select node is the base statement of the syntax tree for selection
statement, always child of a UNION root.
"""
vargraph: Graph = {}
parent = None
distinct: bool = False
# limit / offset
limit: Optional[int] = None
offset: int = 0
# already defined inside ScopeNode right?
# But py3 fails when I change anything here
# RecursionError: maximum recursion depth excedeed
# set by the annotator
has_aggregat: bool = False
def __init__(self):
Statement.__init__(self)
ScopeNode.__init__(self)
self.selection: List = []
# subqueries alias
self.aliases: Dict[str, "rql.nodes.ColumnAlias"] = {}
# syntax tree meta-information
self.stinfo: Dict[str, Dict] = {"rewritten": {}}
# select clauses
self.groupby: List[Any] = []
self.orderby: List[Any] = []
@property
def root(self):
"""return the root node of the tree"""
return self.parent
[docs] def get_description(
self,
mainindex: Optional[int] = None,
tr: Optional["rql.nodes.TranslationFunction"] = None,
) -> List[str]:
"""return the list of types or relations (if not found) associated to
selected variables.
mainindex is an optional selection index which should be considered has
'pivot' entity.
"""
descr: List[str] = []
for term in self.selection:
# don't translate Any
try:
descr.append(term.get_description(mainindex, tr) or "Any")
except CoercionError:
descr.append("Any")
return descr
@property
def children(self):
children = self.selection[:]
if self.groupby:
children += self.groupby
if self.orderby:
children += self.orderby
if self.where:
children.append(self.where)
if self.having:
children += self.having
if self.with_:
children += self.with_
return children
# repr / as_string / copy #################################################
def __repr__(self) -> str:
return self.as_string(userepr=True)
[docs] def as_string(self, kwargs: Optional[Dict] = None, userepr: bool = False) -> str:
"""return the tree as an encoded rql string"""
if userepr:
as_string = repr
else:
def as_string(x):
return x.as_string(kwargs=kwargs)
s = [",".join(as_string(term) for term in self.selection)]
if self.groupby:
s.append("GROUPBY " + ",".join(as_string(term) for term in self.groupby))
if self.orderby:
s.append("ORDERBY " + ",".join(as_string(term) for term in self.orderby))
if self.limit is not None:
s.append(f"LIMIT {self.limit}")
if self.offset:
s.append(f"OFFSET {self.offset}")
if self.where is not None:
s.append("WHERE " + as_string(self.where))
if self.having:
s.append("HAVING " + ",".join(as_string(term) for term in self.having))
if self.with_:
s.append("WITH " + ",".join(as_string(term) for term in self.with_))
if self.distinct:
return "DISTINCT Any " + " ".join(s)
return "Any " + " ".join(s)
[docs] def copy(
self,
stmt: Optional["Statement"] = None,
copy_solutions: bool = True,
solutions: Optional[SolutionsList] = None,
) -> "Select":
new = super().copy(self, copy_solutions, solutions)
# "ScopeNode" has no attribute .... [attr-defined]
new = cast("Select", new)
if self.with_:
new.set_with([sq.copy(new) for sq in self.with_], check=False)
for child in self.selection:
new_var_ref = child.copy(new)
new.append_selected(new_var_ref)
if self.groupby:
new.set_groupby([sq.copy(new) for sq in self.groupby])
if self.orderby:
new.set_orderby([sq.copy(new) for sq in self.orderby])
# Argument 1 to "set_where" of "ScopeNode" has incompatible type "Node"
# expected "Union[Or, Not, And, Relation]" [arg-type]
if self.where:
new.set_where(self.where.copy(new))
if self.having:
new.set_having([sq.copy(new) for sq in self.having])
new.distinct = self.distinct
new.limit = self.limit
new.offset = self.offset
new.vargraph = self.vargraph
return new
# select specific methods #################################################
[docs] def set_possible_types(
self,
solutions: SolutionsList,
kwargs: Optional[Union_[object, Dict[str, str]]] = _MARKER,
key: str = "possibletypes",
) -> None:
super(Select, self).set_possible_types(solutions, kwargs, key)
for ca in self.aliases.values():
ca.stinfo[key] = capt = set()
for solution in solutions:
capt.add(solution[ca.name])
if kwargs is _MARKER:
continue
# propagage to subqueries in case we're introducing additional
# type constraints
for stmt in ca.query.children[:]:
# better type for term
term: Any = stmt.selection[ca.colnum]
sols: List = [
sol for sol in stmt.solutions if term.get_type(sol, kwargs) in capt
]
if not sols:
ca.query.remove_select(stmt)
else:
stmt.set_possible_types(sols)
[docs] def set_statement_type(self, etype: str) -> None:
"""set the statement type for this selection
this method must be called last (i.e. once selected variables has been
added)
"""
assert self.selection
# Person P -> Any P where P is Person
if etype != "Any":
variables: List["rql.nodes.VariableRef"] = list(
self.get_selected_variables()
)
if not variables:
raise BadRQLQuery(
"Setting type in selection is only allowed "
"when some variable is selected"
)
for var in variables:
self.add_type_restriction(var.variable, etype)
[docs] def set_distinct(self, value: bool) -> None:
"""mark DISTINCT query"""
if self.should_register_op and value != self.distinct:
from rql.undo import SetDistinctOperation
self.undo_manager.add_operation(SetDistinctOperation(self.distinct, self))
self.distinct = value
[docs] def set_limit(self, limit: int) -> None:
if limit is not None and (not isinstance(limit, int) or limit <= 0):
raise BadRQLQuery(f"bad limit {limit}")
if self.should_register_op and limit != self.limit:
from rql.undo import SetLimitOperation
self.undo_manager.add_operation(SetLimitOperation(self.limit, self))
self.limit = limit
[docs] def set_offset(self, offset: int) -> None:
if offset is not None and (not isinstance(offset, int) or offset < 0):
raise BadRQLQuery(f"bad offset {offset}")
if self.should_register_op and offset != self.offset:
from rql.undo import SetOffsetOperation
self.undo_manager.add_operation(SetOffsetOperation(self.offset, self))
self.offset = offset
[docs] def set_orderby(self, terms: List["rql.nodes.SortTerm"]) -> None:
self.orderby = terms
for node in terms:
node.parent = self
[docs] def set_groupby(
self, terms: List[Union_["rql.nodes.Function", "rql.nodes.VariableRef"]]
) -> None:
self.groupby = terms
for node in terms:
node.parent = self
[docs] def set_with(self, terms: List["rql.nodes.SubQuery"], check: bool = True) -> None:
self.with_ = []
for node in terms:
self.add_subquery(node, check)
[docs] def add_subquery(self, node: "rql.nodes.SubQuery", check: bool = True) -> None:
assert node.query
node.parent = self
self.with_.append(node)
# "BaseNode" has no attribute "selection"
if check and len(node.aliases) != len(
cast("Select", node.query.children[0]).selection
):
raise BadRQLQuery(
"Should have the same number of aliases than "
"selected terms in sub-query"
)
for i, alias in enumerate(node.aliases):
if check and alias.name in self.aliases:
raise BadRQLQuery(f"Duplicated alias {alias}")
ca = self.get_variable(alias.name, i)
# "Variable" has no attribute "query"
# classes with query attribute: ColumnAlias, Exists, SubQuery
ca = cast("rql.nodes.ColumnAlias", ca)
ca.query = node.query
[docs] def remove_subquery(self, node: "rql.nodes.SubQuery") -> None:
self.with_.remove(node)
node.parent = None
for i, alias in enumerate(node.aliases):
del self.aliases[alias.name]
# Signature of "get_variable" incompatible with supertype "ScopeNode" [override]
[docs] def get_variable(
self, name: str, colnum: Optional[int] = None
) -> Union_["rql.nodes.Variable", "rql.nodes.ColumnAlias"]:
"""get a variable instance from its name
the variable is created if it doesn't exist yet
"""
if name in self.aliases.keys():
return self.aliases[name]
if colnum is not None: # take care, may be 0
self.aliases[name] = calias = nodes.ColumnAlias(name, colnum)
calias.stmt = self
# alias may already have been used as a regular variable, replace it
if name in self.defined_vars:
var = self.defined_vars.pop(name)
calias.stinfo["references"] = var.stinfo["references"]
for vref in var.references():
vref.variable = calias
return self.aliases[name]
return super(Select, self).get_variable(name)
[docs] def clean_solutions(self, solutions: Optional[SolutionsList] = None) -> None:
"""when a rqlst has been extracted from another, this method returns
solutions which make sense for this sub syntax tree
"""
if solutions is None:
solutions = self.solutions
# this may occurs with rql optimization, for instance on
# 'Any X WHERE X eid 12' query
if not (self.defined_vars or self.aliases):
self.solutions = [{}]
else:
newsolutions: SolutionsList = []
for origsol in solutions:
asol: Solution = {}
for var in self.defined_vars:
asol[var] = origsol[var]
for var in self.aliases:
asol[var] = origsol[var]
if asol not in newsolutions:
newsolutions.append(asol)
self.solutions = newsolutions
[docs] def get_selection_solutions(self) -> Set_[Tuple[str, ...]]:
"""return the set of variable names which take different type according
to the solutions
"""
descriptions: Set_ = set()
for solution in self.solutions:
descr: List = []
for term in self.selection:
try:
descr.append(term.get_type(solution=solution))
except CoercionError:
pass
descriptions.add(tuple(descr))
return descriptions
# quick accessors #########################################################
[docs] def get_selected_variables(self) -> Iterator["rql.nodes.VariableRef"]:
"""returns all selected variables, including those used in aggregate
functions
"""
for term in self.selection:
for node in term.iget_nodes(nodes.VariableRef):
yield node
# construction helper methods #############################################
[docs] def save_state(self) -> None:
"""save the current tree"""
assert self.parent is not None
self.parent.save_state()
[docs] def recover(self) -> None:
"""reverts the tree as it was when save_state() was last called"""
assert self.parent is not None
self.parent.recover()
[docs] def append_selected_method_call(
self, function_call: str, args: List[nodes.Constant] = []
) -> None:
varname, function = function_call.split(".")
term = nodes.VariableRefMethodCall(self.get_variable(varname), function, args)
self.append_selected(term)
[docs] def append_selected_attribute_access(
self,
get_attribute: str,
) -> None:
varname, attribute = get_attribute.split(".")
term = nodes.VariableRefAttributeAccess(self.get_variable(varname), attribute)
self.append_selected(term)
[docs] def append_selected(
self,
term: Union_[
"rql.nodes.Function",
"rql.nodes.MathExpression",
"rql.nodes.VariableRef",
"rql.nodes.Constant",
],
) -> None:
if isinstance(term, nodes.Constant) and term.type == "etype":
raise BadRQLQuery("Entity type are not allowed in selection")
term.parent = self
self.selection.append(term)
# XXX proprify edition, we should specify if we want:
# * undo support
# * references handling
# skipped here
[docs] def replace(
self, oldnode: "rql.base.BaseNode", newnode: "rql.base.BaseNode"
) -> Tuple["rql.base.BaseNode", "rql.base.BaseNode", Optional[int]]:
if oldnode is self.where:
self.where = cast("rql.base.Node", newnode)
elif any(oldnode.is_equivalent(s) for s in self.selection):
index = next(
i for i, s in enumerate(self.selection) if oldnode.is_equivalent(s)
)
self.selection[index] = newnode
elif any(oldnode.is_equivalent(o) for o in self.orderby):
index = next(
i for i, o in enumerate(self.orderby) if oldnode.is_equivalent(o)
)
self.orderby[index] = newnode
elif any(oldnode.is_equivalent(g) for g in self.groupby):
index = next(
i for i, g in enumerate(self.groupby) if oldnode.is_equivalent(g)
)
self.groupby[index] = newnode
elif any(oldnode.is_equivalent(h) for h in self.having):
index = next(
i for i, h in enumerate(self.having) if oldnode.is_equivalent(h)
)
# Unsupported target for indexed assignment ("Iterable[Node]") [index]
self.having = cast(List, self.having)
self.having[index] = newnode
else:
raise Exception(f"duh XXX {oldnode}")
# XXX no undo/reference support 'by design' (i.e. breaks things if you add
# it...)
oldnode.parent = None
newnode.parent = self
return oldnode, self, None
[docs] def remove(
self, node: "rql.nodes.SortTerm"
) -> Tuple["BaseNode", Optional["BaseNode"], Optional[int]]:
if node is self.where:
self.where = None
elif any(node.is_equivalent(o) for o in self.orderby):
self.remove_sort_term(node)
elif any(node.is_equivalent(g) for g in self.groupby):
self.remove_group_term(node)
elif any(node.is_equivalent(h) for h in self.having):
# "Iterable[Node]" has no attribute "remove" [attr-defined]
self.having = cast(List, self.having)
self.having.remove(node)
# XXX selection
else:
raise Exception("duh XXX")
node.parent = None
return node, self, None
[docs] def undefine_variable(self, var: "rql.nodes.Variable") -> None:
"""undefine the given variable and remove all relations where it appears"""
# remove relations where this variable is referenced
for vref in var.references():
rel = vref.relation()
if rel is not None:
self.remove_node(rel)
# XXX may have other nodes between vref and the sort term
elif isinstance(vref.parent, nodes.SortTerm):
self.remove_sort_term(vref.parent)
elif vref in self.groupby:
self.remove_group_term(vref)
else: # selected variable
self.remove_selected(vref)
# effective undefine operation
if self.should_register_op:
from rql.undo import UndefineVarOperation
solutions = [d.copy() for d in self.solutions]
self.undo_manager.add_operation(UndefineVarOperation(var, self, solutions))
for sol in self.solutions:
sol.pop(var.name, None)
del self.defined_vars[var.name]
def _var_index(
self, var: Union_["rql.nodes.Variable", "rql.nodes.VariableRef"]
) -> int:
"""get variable index in the list using identity (Variable and VariableRef
define __cmp__
"""
for i, term in enumerate(self.selection):
if term is var:
return i
raise IndexError()
[docs] def remove_selected(
self, var: Union_["rql.nodes.Variable", "rql.nodes.VariableRef"]
) -> None:
"""deletes var from selection variable"""
# assert isinstance(var, VariableRef)
index = self._var_index(var)
if self.should_register_op:
from rql.undo import UnselectVarOperation
self.undo_manager.add_operation(UnselectVarOperation(var, index))
for vref in self.selection.pop(index).iget_nodes(nodes.VariableRef):
vref.unregister_reference()
[docs] def add_selected(
self,
term: Union_["rql.nodes.Variable", "rql.nodes.VariableRef"],
index: Optional[int] = None,
) -> None:
"""override Select.add_selected to memoize modification when needed"""
if isinstance(term, nodes.Variable):
term = nodes.VariableRef(term, noautoref=1)
term.register_reference()
else:
for var in term.iget_nodes(nodes.VariableRef):
var = nodes.variable_ref(var)
var.register_reference()
if index is not None:
self.selection.insert(index, term)
term.parent = self
else:
self.append_selected(term)
if self.should_register_op:
from rql.undo import SelectVarOperation
self.undo_manager.add_operation(SelectVarOperation(term))
[docs] def add_group_var(
self,
var: Union_["rql.nodes.Variable", "rql.nodes.VariableRef"],
index: Optional[int] = None,
):
"""add var in 'orderby' constraints
asc is a boolean indicating the group order (ascendent or descendent)
"""
vref = nodes.variable_ref(var)
vref.register_reference()
if index is None:
self.groupby.append(vref)
else:
self.groupby.insert(index, vref)
vref.parent = self
if self.should_register_op:
from rql.undo import AddGroupOperation
self.undo_manager.add_operation(AddGroupOperation(vref))
[docs] def remove_group_term(self, term: "rql.base.BaseNode") -> None:
"""remove the group variable and the group node if necessary"""
if self.should_register_op:
from rql.undo import RemoveGroupOperation
self.undo_manager.add_operation(RemoveGroupOperation(term))
for vref in term.iget_nodes(nodes.VariableRef):
vref.unregister_reference()
index = next(i for i, g in enumerate(self.groupby) if term.is_equivalent(g))
del self.groupby[index]
remove_group_var = callable_deprecated("[rql 0.29] use remove_group_term instead")(
remove_group_term
)
[docs] def remove_groups(self) -> None:
for vref in self.groupby[:]:
self.remove_group_term(vref)
[docs] def add_sort_var(
self, var: "rql.nodes.Variable", asc: Optional[bool] = True
) -> None:
"""add var in 'orderby' constraints
asc is a boolean indicating the sort order (ascendent or descendent)
"""
vref = nodes.variable_ref(var)
vref.register_reference()
term = nodes.SortTerm(vref, cast(int, asc)) # cast bool to int ?
self.add_sort_term(term)
[docs] def add_sort_term(
self, term: "rql.nodes.SortTerm", index: Optional[int] = None
) -> None:
if index is None:
self.orderby.append(term)
else:
self.orderby.insert(index, term)
term.parent = self
for vref in term.iget_nodes(nodes.VariableRef):
try:
vref.register_reference()
except AssertionError:
pass # already referenced
if self.should_register_op:
from rql.undo import AddSortOperation
self.undo_manager.add_operation(AddSortOperation(term))
[docs] def remove_sort_terms(self) -> None:
if self.orderby:
for term in self.orderby[:]:
self.remove_sort_term(term)
def remove_sort_term(self, term: "rql.nodes.SortTerm") -> None:
    """Remove `term` from the 'orderby' terms of this statement.

    When ``should_register_op`` is true, a ``RemoveSortOperation`` for
    `term` is handed to the undo manager first. Every ``VariableRef`` found
    under `term` is then unregistered and `term` is removed from
    ``self.orderby``.
    """
    if self.should_register_op:
        from rql.undo import RemoveSortOperation

        # built before the term is actually taken out of the orderby list
        undo_op = RemoveSortOperation(term)
        self.undo_manager.add_operation(undo_op)
    for reference in term.iget_nodes(nodes.VariableRef):
        reference.unregister_reference()
    self.orderby.remove(term)
def select_only_variables(self) -> None:
    """Replace the selection by the variable references it contains.

    The selected terms are walked in order; each ``VariableRef`` found under
    them is kept unless an equivalent reference was already kept. Kept
    references get this statement as parent and become the new selection.
    """
    kept: List["rql.nodes.VariableRef"] = []
    for selected_term in self.selection:
        for candidate in selected_term.iget_nodes(nodes.VariableRef):
            if any(candidate.is_equivalent(seen) for seen in kept):
                # an equivalent reference is already selected
                continue
            candidate.parent = self
            kept.append(candidate)
    self.selection = kept
class Delete(Statement, ScopeNode):
    """the Delete node is the root of the syntax tree for deletion statement

    ``main_variables`` holds the ``(etype, variable reference)`` pairs of the
    deleted entities and ``main_relations`` the deleted relations. The
    ``where`` and ``having`` restrictions are read from the instance; `copy`
    sets them through ``set_where`` / ``set_having``.
    """

    TYPE = "delete"

    def __init__(self):
        Statement.__init__(self)
        ScopeNode.__init__(self)
        # (etype, VariableRef) pairs, in the order they were added
        self.main_variables = []
        # relation nodes, in the order they were added
        self.main_relations = []

    @property
    def children(self):
        """Child nodes: selection, main relations, then where / having."""
        children = self.selection[:]
        children += self.main_relations
        if self.where:
            children.append(self.where)
        if self.having:
            children += self.having
        return children

    @property
    def selection(self) -> List["rql.nodes.VariableRef"]:
        """Variable references of the deleted entities (new list each call)."""
        return [vref for et, vref in self.main_variables]

    def add_main_variable(self, etype: str, vref: "rql.nodes.VariableRef") -> None:
        """add a variable to the list of deleted variables"""
        # NOTE: the check below is disabled, so the "Any" etype is accepted
        # if etype == 'Any':
        #    raise BadRQLQuery('"Any" is not supported in DELETE statement')
        vref.parent = self
        self.main_variables.append((etype, vref))

    def add_main_relation(self, relation):
        """add a relation to the list of deleted relations

        The relation must have a variable reference as first child and a
        comparison holding a variable reference as second child.
        """
        assert isinstance(relation.children[0], nodes.VariableRef)
        assert isinstance(relation.children[1], nodes.Comparison)
        assert isinstance(relation.children[1].children[0], nodes.VariableRef)
        relation.parent = self
        self.main_relations.append(relation)

    # repr / as_string / copy #################################################

    def __repr__(self) -> str:
        result = ["DELETE"]
        if self.main_variables:
            result.append(
                ", ".join([f"{etype!r} {var!r}" for etype, var in self.main_variables])
            )
        if self.main_relations:
            if self.main_variables:
                result.append(",")
            result.append(", ".join([repr(rel) for rel in self.main_relations]))
        if self.where is not None:
            # label the restriction like as_string() does, so that it is not
            # fused onto the relation list in the debug output
            result.append("WHERE " + repr(self.where))
        if self.having:
            result.append("HAVING " + ",".join(repr(term) for term in self.having))
        return " ".join(result)

    def as_string(self, kwargs: Optional[Dict] = None) -> str:
        """return the tree as an encoded rql string

        `kwargs` is forwarded unchanged to the ``as_string`` method of the
        relations and of the where / having nodes.
        """
        result = ["DELETE"]
        if self.main_variables:
            result.append(
                ", ".join([f"{etype} {var}" for etype, var in self.main_variables])
            )
        if self.main_relations:
            if self.main_variables:
                # separates the variable declarations from the relations
                result.append(",")
            result.append(
                ", ".join([rel.as_string(kwargs=kwargs) for rel in self.main_relations])
            )
        if self.where is not None:
            result.append("WHERE " + self.where.as_string(kwargs=kwargs))
        if self.having:
            result.append(
                "HAVING "
                + ",".join(term.as_string(kwargs=kwargs) for term in self.having)
            )
        return " ".join(result)

    # Signature of "copy" incompatible with supertype "ScopeNode"  [override]
    # Signature of "copy" incompatible with supertype "BaseNode"  [override]
    def copy(self) -> "Delete":  # type:ignore[override]
        """Return a new Delete statement rebuilt from this one.

        Variable references are recreated from variables obtained from the
        new statement by name; relations, where and having nodes are copied
        with the new statement passed to their ``copy`` method.
        """
        new = Delete()
        for etype, var in self.main_variables:
            vref = nodes.VariableRef(new.get_variable(var.name))
            new.add_main_variable(etype, vref)
        for child in self.main_relations:
            new.add_main_relation(child.copy(new))
        if self.where:
            new.set_where(self.where.copy(new))
        if self.having:
            new.set_having([sq.copy(new) for sq in self.having])
        return new
class Insert(Statement, ScopeNode):
    """the Insert node is the root of the syntax tree for insertion statement

    ``main_variables`` holds the ``(etype, variable reference)`` pairs of the
    inserted entities, ``main_relations`` the inserted relations and
    ``inserted_variables`` is keyed by the variables declared for insertion.
    """

    TYPE = "insert"

    def __init__(self):
        Statement.__init__(self)
        ScopeNode.__init__(self)
        # (etype, VariableRef) pairs, in the order they were added
        self.main_variables = []
        # relation nodes, in the order they were added
        self.main_relations = []
        # variables declared through add_main_variable (dict used as a set)
        self.inserted_variables = {}

    @property
    def children(self):
        """Child nodes: selection, main relations, then where / having."""
        child_nodes = [*self.selection, *self.main_relations]
        if self.where:
            child_nodes.append(self.where)
        if self.having:
            child_nodes.extend(self.having)
        return child_nodes

    @property
    def selection(self):
        """Variable references of the inserted entities (new list each call)."""
        return [reference for _etype, reference in self.main_variables]

    def add_main_variable(self, etype: str, vref: "rql.nodes.VariableRef") -> None:
        """Declare `vref` as a variable of type `etype` to insert.

        :raises BadRQLQuery: if `etype` is "Any"
        """
        if etype == "Any":
            raise BadRQLQuery('"Any" is not supported in INSERT statement')
        declaration = (etype, vref)
        self.main_variables.append(declaration)
        vref.parent = self
        self.inserted_variables[vref.variable] = 1

    def add_main_relation(self, relation: "rql.nodes.Relation") -> None:
        """Add `relation` to the list of inserted relations.

        :raises BadRQLQuery: if the relation's second child is a constant
          while its subject variable was not declared for insertion
        """
        subject = cast("rql.nodes.VariableRef", relation.children[0]).variable
        right_hand_side = relation.children[1]
        undeclared = subject not in self.inserted_variables
        if undeclared and isinstance(right_hand_side, nodes.Constant):
            template = (
                "Using variable %s in declaration but %s is not an "
                "insertion variable"
            )
            raise BadRQLQuery(template % (subject, subject))
        relation.parent = self
        self.main_relations.append(relation)

    # repr / as_string / copy #################################################

    def __repr__(self) -> str:
        declarations = ", ".join(
            f"{etype!r} {var!r}" for etype, var in self.main_variables
        )
        parts = ["INSERT", declarations]
        if self.main_relations:
            parts.append(":")
            parts.append(", ".join(map(repr, self.main_relations)))
        if self.where is not None:
            parts.append(f"WHERE {self.where!r}")
        if self.having:
            parts.append("HAVING " + ",".join(map(repr, self.having)))
        return " ".join(parts)

    def as_string(self, kwargs: Optional[Any] = None) -> str:
        """Return the tree as an rql string.

        `kwargs` is forwarded unchanged to the ``as_string`` method of the
        relations and of the where / having nodes.
        """
        declarations = ", ".join(
            f"{etype} {var}" for etype, var in self.main_variables
        )
        parts = ["INSERT", declarations]
        if self.main_relations:
            relations = [rel.as_string(kwargs=kwargs) for rel in self.main_relations]
            parts.append(":")
            parts.append(", ".join(relations))
        if self.where is not None:
            parts.append("WHERE " + self.where.as_string(kwargs=kwargs))
        if self.having:
            having_terms = [term.as_string(kwargs=kwargs) for term in self.having]
            parts.append("HAVING " + ",".join(having_terms))
        return " ".join(parts)

    # Signature of "copy" incompatible with supertype "ScopeNode"  [override]
    # Signature of "copy" incompatible with supertype "BaseNode"  [override]
    def copy(self) -> "Insert":  # type:ignore[override]
        """Return a new Insert statement rebuilt from this one.

        Variable references are recreated from variables obtained from the
        new statement by name; relations, where and having nodes are copied
        with the new statement passed to their ``copy`` method.
        """
        duplicate = Insert()
        for etype, reference in self.main_variables:
            fresh_variable = duplicate.get_variable(reference.name)
            duplicate.add_main_variable(etype, nodes.VariableRef(fresh_variable))
        for relation in self.main_relations:
            duplicate.add_main_relation(relation.copy(duplicate))
        if self.where:
            duplicate.set_where(self.where.copy(duplicate))
        if self.having:
            duplicate.set_having([term.copy(duplicate) for term in self.having])
        return duplicate
class Set(Statement, ScopeNode):
    """the Set node is the root of the syntax tree for update statement

    ``main_relations`` holds the modified relations; the statement has no
    selection.
    """

    TYPE = "set"

    def __init__(self):
        Statement.__init__(self)
        ScopeNode.__init__(self)
        # relation nodes, in the order they were added
        self.main_relations = []

    @property
    def children(self):
        """Child nodes: main relations, then where / having."""
        child_nodes = list(self.main_relations)
        if self.where:
            child_nodes.append(self.where)
        if self.having:
            child_nodes.extend(self.having)
        return child_nodes

    @property
    def selection(self):
        """Always an empty list (new list each call)."""
        return []

    def add_main_relation(self, relation: "rql.nodes.Relation") -> None:
        """Add `relation` to the list of modified relations."""
        relation.parent = self
        self.main_relations.append(relation)

    # repr / as_string / copy #################################################

    def __repr__(self) -> str:
        parts = ["SET", ", ".join(map(repr, self.main_relations))]
        if self.where is not None:
            parts.append(f"WHERE {self.where!r}")
        if self.having:
            parts.append("HAVING " + ",".join(map(repr, self.having)))
        return " ".join(parts)

    def as_string(self, kwargs: Optional[Any] = None) -> str:
        """Return the tree as an rql string.

        `kwargs` is forwarded unchanged to the ``as_string`` method of the
        relations and of the where / having nodes.
        """
        relations = [rel.as_string(kwargs=kwargs) for rel in self.main_relations]
        parts = ["SET", ", ".join(relations)]
        if self.where is not None:
            parts.append("WHERE " + self.where.as_string(kwargs=kwargs))
        if self.having:
            having_terms = [term.as_string(kwargs=kwargs) for term in self.having]
            parts.append("HAVING " + ",".join(having_terms))
        return " ".join(parts)

    # Signature of "copy" incompatible with supertype "ScopeNode"  [override]
    # Signature of "copy" incompatible with supertype "BaseNode"  [override]
    def copy(self) -> "Set":  # type:ignore[override]
        """Return a new Set statement rebuilt from this one.

        Relations, where and having nodes are copied with the new statement
        passed to their ``copy`` method.
        """
        duplicate = Set()
        for relation in self.main_relations:
            duplicate.add_main_relation(relation.copy(duplicate))
        if self.where:
            duplicate.set_where(self.where.copy(duplicate))
        if self.having:
            duplicate.set_having([term.copy(duplicate) for term in self.having])
        return duplicate
# Type alias: any one of the Set, Delete, Union or Insert nodes.
AnyStatement = Union_[Set, Delete, Union, Insert]

# Type alias: any one of the Set, Delete, Insert or Select nodes.
AnyScopeNode = Union_[Set, Delete, Insert, Select]