# copyright 2004-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of rql.
#
# rql is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# rql is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with rql. If not, see <http://www.gnu.org/licenses/>.
"""RQL library (implementation independant)."""
from math import log
import sys
import threading
if sys.version_info < (3, 10):
from importlib_metadata import version
else:
from importlib.metadata import version
from io import StringIO
from rql._exceptions import ( # noqa
UsesReservedWord,
RQLSyntaxError,
RQLException,
MissingType,
UsesReservedWord,
RQLSyntaxError,
TypeResolverException,
BadRQLQuery,
CoercionError,
)
from rql.analyze import ETypeResolver
from typing import TYPE_CHECKING, Optional, Dict, List, Any, Set, cast
__docformat__: str = "restructuredtext en"
__version__: str = version("rql")
if TYPE_CHECKING:
import rql
# REQUIRED_TYPES = ['String', 'Float', 'Int', 'Boolean', 'Date']
[docs]class RQLHelper:
"""Helper class for RQL handling
give access to methods for :
- parsing RQL strings
- variables type resolving
- comparison of two queries
"""
def __init__(
self,
schema: "rql.interfaces.ISchema",
uid_func_mapping: "rql.analyze.UIDFuncMapping" = None,
special_relations: Dict[str, str] = {},
backend: Optional[Any] = None,
) -> None:
# chech schema
# for e_type in REQUIRED_TYPES:
# if not schema.has_entity(e_type):
# raise MissingType(e_type)
# create helpers
from rql.stcheck import RQLSTChecker, RQLSTAnnotator
if uid_func_mapping:
for key in uid_func_mapping:
special_relations[key] = "uid"
self._checker = RQLSTChecker(schema, special_relations, backend)
self._annotator = RQLSTAnnotator(schema, special_relations)
self._analyser_lock = threading.Lock()
self._analyser = ETypeResolver(schema, uid_func_mapping)
# IgnoreTypeRestriction analyser
from rql.analyze import ETypeResolverIgnoreTypeRestriction
self._itr_analyser_lock = threading.Lock()
self._itr_analyser = ETypeResolverIgnoreTypeRestriction(
schema, uid_func_mapping
)
self.set_schema(schema)
[docs] def set_schema(self, schema: "rql.interfaces.ISchema") -> None:
from rql.utils import is_keyword
for etype in schema.entities():
etype = str(etype)
if is_keyword(etype) or etype.capitalize() == "Any":
raise UsesReservedWord(etype)
for rtype in schema.relations():
rtype = str(rtype)
if is_keyword(rtype):
raise UsesReservedWord(rtype)
self._checker.schema = schema
self._annotator.schema = schema
self._analyser.set_schema(schema)
[docs] def get_backend(self) -> Any:
return self._checker.backend
[docs] def set_backend(self, backend: Any) -> None:
self._checker.backend = backend
backend = property(get_backend, set_backend)
[docs] def parse(self, rqlstring: str, annotate: bool = True) -> "rql.stmts.AnyStatement":
"""Return a syntax tree created from a RQL string."""
rqlst = parse(rqlstring, False)
self._checker.check(rqlst)
if annotate:
self.annotate(rqlst)
rqlst.schema = self._annotator.schema
return rqlst
[docs] def annotate(self, rqlst: "rql.stmts.Statement") -> None:
self._annotator.annotate(rqlst)
[docs] def compute_solutions(
self,
rqlst: "rql.stmts.AnyStatement",
uid_func_mapping: "rql.analyze.UIDFuncMapping" = None,
kwargs: Optional[Dict] = None,
debug: bool = False,
) -> Set:
"""Set solutions for variables of the syntax tree.
Each solution is a dictionary with variable's name as key and
variable's type as value.
"""
with self._analyser_lock:
return self._analyser.visit(rqlst, uid_func_mapping, kwargs, debug)
[docs] def compute_all_solutions(
self, rqlst, uid_func_mapping=None, kwargs=None, debug=False
):
"""compute syntax tree solutions with all types restriction (eg
is/instance_of relations) ignored
"""
with self._itr_analyser_lock:
self._itr_analyser.visit(rqlst, uid_func_mapping, kwargs, debug)
[docs] def simplify(self, rqlst: "rql.stmts.Union") -> None:
"""Simplify `rqlst` by rewriting non-final variables associated to a const
node (if annotator say we can...)
The tree is modified in-place.
"""
# print('simplify', rqlst.as_string())
# seems like only Union as TYPE attribute, I should double check
if rqlst.TYPE == "select":
for select in rqlst.children:
self._simplify(cast("rql.stmts.Select", select))
def _simplify(self, select: "rql.stmts.Select") -> None:
# recurse on subqueries first
for subquery in select.with_:
for subselect in subquery.query.children:
self._simplify(cast("rql.stmts.Select", subselect))
rewritten = False
for var in list(select.defined_vars.values()):
stinfo = var.stinfo
if (
stinfo["constnode"]
and not stinfo.get("blocsimplification")
and not var.has_attribute_or_function_var_references()
):
uidrel = stinfo["uidrel"]
var = uidrel.children[0].variable
vconsts: List["rql.nodes.Constant"] = []
rhs = uidrel.children[1].children[0]
for vref in var.references():
rel = vref.relation()
if rel is None:
term = vref
while term.parent is not select:
term = term.parent
if any(term.is_equivalent(t) for t in select.selection):
rhs = copy_uid_node(select, rhs, vconsts)
if vref is term:
index = next(
i
for i, var in enumerate(select.selection)
if vref.is_equivalent(var)
)
select.selection[index] = rhs
rhs.parent = select
else:
vref.parent.replace(vref, rhs)
elif any(term.is_equivalent(o) for o in select.orderby):
# remove from orderby
select.remove(term)
elif not select.having:
# remove from groupby if no HAVING clause
select.remove(term)
else:
rhs = copy_uid_node(select, rhs, vconsts)
# "Iterable[Any]" has no attribute "index" [attr-defined]
select.groupby = cast(List, select.groupby)
select.groupby[select.groupby.index(vref)] = rhs
rhs.parent = select
elif rel is uidrel:
uidrel.parent.remove(uidrel)
elif rel.is_types_restriction():
stinfo["typerel"] = None
rel.parent.remove(rel)
else:
rhs = copy_uid_node(select, rhs, vconsts)
vref.parent.replace(vref, rhs)
del select.defined_vars[var.name]
stinfo["uidrel"] = None
rewritten = True
if vconsts:
select.stinfo["rewritten"][var.name] = vconsts
if rewritten and select.solutions:
select.clean_solutions()
[docs]def copy_uid_node(
select: "rql.stmts.Select",
node: "rql.nodes.Constant",
vconsts: List["rql.nodes.Constant"],
) -> "rql.nodes.Constant":
# Incompatible types in assignment
# (expression has type "BaseNode", variable has type "Constant")
node = node.copy(select) # type:ignore[assignment]
node.uid = True
vconsts.append(node)
return node
[docs]def parse(rqlstring: str, print_errors: bool = True) -> "rql.stmts.AnyStatement":
"""Return a syntax tree created from a RQL string."""
from yapps.runtime import print_error, SyntaxError, NoMoreTokens
from rql.parser import Hercule, HerculeScanner
# make sure rql string ends with a semi-colon
rqlstring = rqlstring.strip()
if rqlstring and not rqlstring.endswith(";"):
rqlstring += ";"
# parse the RQL string
parser = Hercule(HerculeScanner(rqlstring))
try:
return parser.goal()
except SyntaxError as ex:
if not print_errors:
if ex.pos is not None:
multi_lines_rql = rqlstring.splitlines()
nb_lines = len(multi_lines_rql)
if nb_lines > 5:
width = log(nb_lines, 10) + 1
template = " %%%ii: %%s" % width
rqlstring = "\n".join(
template % (idx + 1, line)
for idx, line in enumerate(multi_lines_rql)
)
padding = 30
_, line, column = ex.pos
rql_line = multi_lines_rql[line - 1]
if len(rql_line) <= (padding * 2):
around = rql_line
here_left_padding = len("around: ") + column
elif column < padding:
around = rql_line[: padding * 2] + "..."
here_left_padding = len("around: ") + column
elif (len(rql_line) - padding) < column:
around = "..." + rql_line[-(padding * 2) :]
here_left_padding = len("around: ...") + (
column - (len(rql_line) - (padding * 2))
)
else:
around = (
"..." + rql_line[column - padding : column + padding] + "..."
)
here_left_padding = len("around: ...") + padding
here = " " * (here_left_padding - 1) + "^ here"
msg = "%s in:\n%s\n\nat line %s and column %s\naround: %s\n%s" % (
ex.msg,
rqlstring,
line,
column,
around,
here,
)
else:
msg = f"{rqlstring}\n{ex.msg}"
exc = RQLSyntaxError(msg)
exc.__traceback__ = sys.exc_info()[-1]
raise exc
# try to get error message from yapps
try:
out = sys.stderr
sys.stderr = stream = StringIO()
try:
print_error(ex, parser._scanner)
finally:
sys.stderr = out
exc = RQLSyntaxError(stream.getvalue())
exc.__traceback__ = sys.exc_info()[-1]
raise exc
except ImportError: # duh?
sys.stderr = out
exc = RQLSyntaxError(
"Syntax Error",
ex.msg,
"on line",
1 + out.count("\n", 0, ex.pos), # type:ignore
)
exc.__traceback__ = sys.exc_info()[-1]
raise exc
except NoMoreTokens:
msg = "Could not complete parsing; stopped around here: \n%s"
exc = RQLSyntaxError(msg % parser._scanner)
exc.__traceback__ = sys.exc_info()[-1]
raise exc