# copyright 2004-2021 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of rql.
#
# rql is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# rql is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with rql. If not, see <http://www.gnu.org/licenses/>.
"""Manages undos on RQL syntax trees."""
from rql.nodes import Exists, VariableRef, BinaryNode
from rql.stmts import Select
from typing import (
List,
Any,
Optional,
TYPE_CHECKING,
Union as Union_,
)
if TYPE_CHECKING:
import rql
# Markup format used by the docstrings of this module.
__docformat__: str = "restructuredtext en"
# Type alias covering the undo-operation classes defined in this module.
# The members are written as strings (forward references) because the
# classes are defined further down; the alias is used to annotate the
# content of ``SelectionManager.op_list``.
Op = Union_[
"NodeOperation",
"MakeVarOperation",
"UndefineVarOperation",
"SelectVarOperation",
"UnselectVarOperation",
"AddNodeOperation",
"ReplaceNodeOperation",
"RemoveNodeOperation",
"AddSortOperation",
"RemoveSortOperation",
"AddGroupOperation",
"RemoveGroupOperation",
"SetDistinctOperation",
"SetOffsetOperation",
"SetLimitOperation",
"SetOptionalOperation",
"SetHavingOperation",
"AppendSelectOperation",
"RemoveSelectOperation",
]
[docs]class SelectionManager:
"""Manage the operation stacks."""
def __init__(self, selection: Union_["rql.stmts.ScopeNode", "rql.stmts.Union"]):
self._selection = selection # The selection tree
self.op_list: List[Op] = [] # The operations we'll have to undo
self.state_stack: List[int] = [] # The save_state()'s index stack
[docs] def push_state(self) -> None:
"""defines current state as the new 'start' state"""
self.state_stack.append(len(self.op_list))
[docs] def recover(self) -> None:
"""recover to the latest pushed state"""
last_state_index: int = self.state_stack.pop()
# if last_index == 0, then there's no intermediate state => undo all !
for i in self.op_list[:-last_state_index] or self.op_list[:]:
self.undo()
[docs] def add_operation(self, operation: Op) -> None:
"""add an operation to the current ones"""
# stores operations in reverse order :
self.op_list.insert(0, operation)
[docs] def undo(self) -> None:
"""undo the latest operation"""
assert len(self.op_list) > 0
op: Op = self.op_list.pop(0)
# Item "ScopeNode" of "Union[ScopeNode, rql.stmts.Union]"
# has no attribute "undoing"
# ignoring for now. Maybe we can add the attribute with the classes
self._selection.undoing = 1 # type:ignore[union-attr]
op.undo(self._selection)
self._selection.undoing = 0 # type:ignore[union-attr]
[docs] def flush(self) -> None:
"""flush the current operations"""
self.op_list = []
class NodeOperation:
    """Base class of the undo records that are attached to one node.

    It stores the node and the statement it is related to; subclasses
    provide the actual :meth:`undo` implementation.
    """

    def __init__(
        self,
        node: Union_[
            "rql.nodes.ColumnAlias",
            "rql.nodes.Variable",
            "rql.nodes.VariableRef",
            "rql.nodes.Relation",
            "rql.nodes.SortTerm",
        ],
        stmt: Optional["rql.stmts.Statement"] = None,
    ):
        """Record ``node`` and ``stmt``.

        When ``stmt`` is omitted (or None), ``node.stmt`` is used instead.
        """
        self.node = node
        self.stmt = node.stmt if stmt is None else stmt

    def __str__(self) -> str:
        """Return the name of the concrete class followed by the node."""
        return f"{self.__class__.__name__} {self.node}"

    # and define this as abstract method?
    def undo(self, selection: Any) -> None:
        """Undo the operation on the selection; subclasses must override."""
        raise NotImplementedError
# Undo for variable manipulation operations ##################################
class MakeVarOperation(NodeOperation):
    """Undo record for make_variable()."""

    def undo(self, selection):
        """Undo the operation on the selection.

        Calls ``undefine_variable`` on the recorded statement with the
        recorded node; ``selection`` itself is not used.
        """
        stmt, variable = self.stmt, self.node
        stmt.undefine_variable(variable)
class UndefineVarOperation(NodeOperation):
    """Undo record for undefine_variable()."""

    def __init__(self, node, stmt, solutions):
        """Record the node, its statement and the solutions to restore."""
        super().__init__(node, stmt)
        self.solutions = solutions

    def undo(self, selection):
        """Undo the operation on the selection.

        Puts the node back into the statement's ``defined_vars`` mapping
        under its name, then restores the statement's ``solutions``.
        """
        stmt = self.stmt
        variable = self.node
        stmt.defined_vars[variable.name] = variable
        stmt.solutions = self.solutions
class SelectVarOperation(NodeOperation):
    """Undo record for add_selected()."""

    def undo(self, selection):
        """Undo the operation on the selection.

        Calls ``remove_selected`` on the recorded statement with the
        recorded node; ``selection`` itself is not used.
        """
        stmt, term = self.stmt, self.node
        stmt.remove_selected(term)
class UnselectVarOperation(NodeOperation):
    """Undo record for unselect_var()."""

    def __init__(self, var, pos):
        """Record the unselected ``var`` and the position ``pos`` it had."""
        super().__init__(var)
        self.index = pos

    def undo(self, selection):
        """Undo the operation on the selection.

        Calls ``add_selected`` on the recorded statement with the node and
        the recorded index.
        """
        stmt = self.stmt
        stmt.add_selected(self.node, self.index)
# Undo for node operations ####################################################
class AddNodeOperation(NodeOperation):
    """Undo record for add_node()."""

    def undo(self, selection):
        """Undo the operation on the selection.

        Calls ``remove_node`` on the recorded statement with the recorded
        node; ``selection`` itself is not used.
        """
        stmt, added = self.stmt, self.node
        stmt.remove_node(added)
class ReplaceNodeOperation:
    """Undo record for a 'replace node' operation."""

    def __init__(self, old_node, new_node):
        """Record the node that was replaced and the node that replaced it."""
        self.old_node, self.new_node = old_node, new_node

    def undo(self, selection):
        """Undo the operation on the selection.

        The variable references found in the inserted node are unregistered,
        those found in the removed node are registered again, and finally
        the inserted node's parent is asked to put the removed node back.
        """
        inserted, removed = self.new_node, self.old_node
        # drop the references held by the node that is going away
        for ref in inserted.iget_nodes(VariableRef):
            ref.unregister_reference()
        # restore the references held by the node that comes back
        for ref in removed.iget_nodes(VariableRef):
            ref.register_reference()
        inserted.parent.replace(inserted, removed)

    def __str__(self):
        """Return a description naming both nodes."""
        return f"ReplaceNodeOperation {self.old_node} by {self.new_node}"
class RemoveNodeOperation(NodeOperation):
    """Undo record for remove_node()."""

    def __init__(self, node, parent, stmt, index):
        """Record the removed node, its former parent and its position.

        ``index`` may be None only when ``parent`` is an ``Exists`` or a
        ``Select`` (checked with an assertion).
        """
        super().__init__(node, stmt)
        self.node_parent = parent
        if index is None:
            assert isinstance(parent, (Exists, Select)), (node, parent)
        self.index = index
        # XXX FIXME : find a better way to do that
        self.binary_remove = isinstance(node, BinaryNode)

    def undo(self, selection):
        """Undo the operation on the selection.

        Re-attaches the removed node to its former parent, then registers
        again every variable reference found in the removed node.
        """
        removed = self.node
        parent = self.node_parent
        position = self.index
        if position is None:
            # no position: the node was the root restriction of its parent
            if isinstance(parent, Select):
                parent.where = removed
            else:  # Exists
                parent.query = removed
        if self.binary_remove:
            # The removed node is a BinaryNode: put it back in the slot that
            # currently holds its former sibling.
            # WARNING : that sibling's parent is no longer the removed node,
            # so it has to be reparented manually.
            if position is None:
                sibling = removed
            else:
                sibling = parent.children[position]
                parent.children[position] = removed
            sibling.parent = removed
        elif position is not None:
            parent.insert(position, removed)
        # register reference from the removed node
        removed.parent = parent
        for ref in removed.iget_nodes(VariableRef):
            ref.register_reference()
class AddSortOperation(NodeOperation):
    """Undo record for an 'add sort' operation."""

    def undo(self, selection):
        """Undo the operation on the selection.

        Calls ``remove_sort_term`` on the recorded statement with the
        recorded node; ``selection`` itself is not used.
        """
        stmt, term = self.stmt, self.node
        stmt.remove_sort_term(term)
class RemoveSortOperation(NodeOperation):
    """Undo record for a 'remove sort' operation."""

    def __init__(self, node):
        """Record the node and its current position in ``stmt.orderby``."""
        super().__init__(node)
        sort_terms = self.stmt.orderby
        self.index = sort_terms.index(self.node)

    def undo(self, selection):
        """Undo the operation on the selection.

        Calls ``add_sort_term`` on the recorded statement with the node and
        the position recorded at construction time.
        """
        stmt = self.stmt
        stmt.add_sort_term(self.node, self.index)
class AddGroupOperation(NodeOperation):
    """Undo record for an 'add group' operation."""

    def undo(self, selection):
        """Undo the operation on the selection.

        Calls ``remove_group_term`` on the recorded statement with the
        recorded node; ``selection`` itself is not used.
        """
        stmt, term = self.stmt, self.node
        stmt.remove_group_term(term)
class RemoveGroupOperation(NodeOperation):
    """Undo record for a 'remove group' operation."""

    def __init__(self, node):
        """Record the node and its current position in ``stmt.groupby``."""
        super().__init__(node)
        group_terms = self.stmt.groupby
        self.index = group_terms.index(self.node)

    def undo(self, selection):
        """Undo the operation on the selection.

        Calls ``add_group_var`` on the recorded statement with the node and
        the position recorded at construction time.
        """
        stmt = self.stmt
        stmt.add_group_var(self.node, self.index)
# misc operations #############################################################
class ChangeValueOperation:
    """Base class of the undo records that restore a previous value."""

    def __init__(self, previous_value, node=None):
        """Record the value to restore and, optionally, the node it is for."""
        self.node = node
        self.value = previous_value
class SetDistinctOperation(ChangeValueOperation):
    """Undo record for 'set_distinct'."""

    def undo(self, selection):
        """Undo the operation on the selection.

        Assigns the recorded value back to the node's ``distinct``
        attribute; ``selection`` itself is not used.
        """
        target = self.node
        target.distinct = self.value
class SetOffsetOperation(ChangeValueOperation):
    """Undo record for 'set_offset'."""

    def undo(self, selection):
        """Undo the operation on the selection.

        Assigns the recorded value back to the node's ``offset``
        attribute; ``selection`` itself is not used.
        """
        target = self.node
        target.offset = self.value
class SetLimitOperation(ChangeValueOperation):
    """Undo record for 'set_limit'."""

    def undo(self, selection):
        """Undo the operation on the selection.

        Assigns the recorded value back to the node's ``limit``
        attribute; ``selection`` itself is not used.
        """
        target = self.node
        target.limit = self.value
class SetOptionalOperation(ChangeValueOperation):
    """Undo record for a change of a relation's ``optional`` attribute."""

    def __init__(self, rel, previous_value):
        """Record the relation and the ``optional`` value to restore.

        The base initializer is not called: only ``rel`` and ``value`` are
        set on the instance.
        """
        self.value = previous_value
        self.rel = rel

    def undo(self, selection):
        """Undo the operation on the selection.

        Assigns the recorded value back to the relation's ``optional``
        attribute; ``selection`` itself is not used.
        """
        relation = self.rel
        relation.optional = self.value
class SetHavingOperation:
    """Undo record for 'set_having'."""

    def __init__(self, select, previous_value):
        """Record the select and the ``having`` value to restore."""
        self.select, self.value = select, previous_value

    def undo(self, selection):
        """Undo the operation on the selection.

        Unregisters the variable references found in the current ``having``
        terms of the select, then restores the recorded ``having`` value.
        """
        select = self.select
        # Unregister any VariableRef in the HAVING clause which would
        # otherwise be attempted to be undefined whereas they are not
        # actually defined.
        current_refs = (
            ref for term in select.having for ref in term.iget_nodes(VariableRef)
        )
        for ref in current_refs:
            ref.unregister_reference()
        select.having = self.value
# Union operations ############################################################
class AppendSelectOperation:
    """Undo record for append_select()."""

    def __init__(self, union, select):
        """Record the union and the select that was appended to it."""
        self.union, self.select = union, select

    def undo(self, selection):
        """Undo the operation on the union's children.

        Sets the select's parent to the union, then removes the select from
        the union's ``children`` list.
        """
        union, select = self.union, self.select
        select.parent = union
        union.children.remove(select)
class RemoveSelectOperation(AppendSelectOperation):
    """Undo record for the removal of a select from a union."""

    def __init__(self, union, select, origindex):
        """Record the union, the removed select and its original index."""
        super().__init__(union, select)
        self.origindex = origindex

    def undo(self, selection):
        """Undo the operation on the union's children.

        Calls ``insert`` on the union to put the select back at its
        original index.
        """
        union = self.union
        union.insert(self.origindex, self.select)
# Names exported by ``from <this module> import *``.
# NOTE(review): RemoveSortOperation, RemoveGroupOperation,
# SetOffsetOperation, SetLimitOperation, SetHavingOperation,
# AppendSelectOperation and RemoveSelectOperation are defined in this module
# but are not listed here -- confirm whether the omission is intentional.
__all__ = (
"SelectionManager",
"MakeVarOperation",
"UndefineVarOperation",
"SelectVarOperation",
"UnselectVarOperation",
"AddNodeOperation",
"ReplaceNodeOperation",
"RemoveNodeOperation",
"AddSortOperation",
"AddGroupOperation",
"SetOptionalOperation",
"SetDistinctOperation",
)