Source code for eql.walkers

"""EQL walker classes."""
import re
from collections import defaultdict, deque
from contextlib import contextmanager

from .schema import Schema
from .utils import is_string, to_unicode


# Public API of this module: the names exported by ``from eql.walkers import *``.
__all__ = (
    "Walker",
    "RecursiveWalker",
    "ConfigurableWalker",
    "DepthFirstWalker",
)


# Fallback for the ``time_unit`` config option read by ConfigurableWalker.
# NOTE(review): with 0.1 microsecond ticks, 10,000,000 ticks add up to one
# second - presumably this is "ticks per second"; confirm against the engine.
DEFAULT_TIME_UNIT = 10000000  # Windows FileTime 0.1 microseconds


class Walker(object):
    """Base class that provides functionality for walking abstract syntax trees of eql.BaseNode.

    Dispatch is name based. For a node whose class is ``FooBar`` the walker looks
    up ``<prefix>foo_bar`` on itself, where the prefix is ``_walk_``, ``_enter_``
    or ``_exit_``. When that attribute is missing the node's base classes are
    searched breadth-first, and finally ``<prefix>default`` is used.
    """

    # Shared cache of class name -> snake_case name, filled by camelized()
    __camelcache = {}

    def __init__(self):
        """Create the AST walker."""
        object.__init__(self)
        # prefix -> {node class -> resolved method (or None)}
        self._method_cache = defaultdict(dict)
        self.event_stack = []
        # Boolean flag toggled by _enter_pipe_command / _exit_pipe_command
        self.in_pipes = False
        self.base_event_types = []
        self.node_stack = []

    def register_func(self, node_cls, func, prefix="_walk_"):
        """Register a callback function.

        :param node_cls: Node class, node instance or class name to register for
        :param func: Callable stored on this walker as ``prefix + camelized(node_cls)``
        :param str prefix: Method name prefix, ``_walk_`` by default
        """
        method_name = prefix + self.camelized(node_cls)
        setattr(self, method_name, func)
        # Lookups resolved before this registration (for this class or any of
        # its subclasses) may point at an older method or at the default, so
        # drop everything cached for this prefix.
        self._method_cache.pop(prefix, None)

    def iter_node(self, node):
        """Iterate through a syntax tree.

        Yields every ``BaseNode`` found in ``node``, descending into node slots,
        lists, tuples and dicts (both keys and values). Any other value yields
        nothing.
        """
        if isinstance(node, BaseNode):
            yield node
            for slot in node.iter_slots():
                for descendant in self.iter_node(slot):
                    yield descendant
        elif isinstance(node, (list, tuple)):
            for item in node:
                for descendant in self.iter_node(item):
                    yield descendant
        elif isinstance(node, dict):
            # Iterate the pairs directly: on Python 3 ``dict.items()`` is a view
            # that is neither a list nor a tuple, so passing it back into
            # iter_node() would silently yield nothing.
            for pair in node.items():
                for descendant in self.iter_node(pair):
                    yield descendant

    @classmethod
    def camelized(cls, node_cls):
        """Get the camelized name for the class.

        :param node_cls: A class name string, a class, or an instance of a class
        :return: The snake_case form of the class name, e.g. ``FooBar`` -> ``foo_bar``
        """
        if is_string(node_cls):
            class_name = node_cls
        else:
            if not isinstance(node_cls, type):
                node_cls = type(node_cls)
            class_name = node_cls.__name__

        if class_name not in cls.__camelcache:
            pass1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', class_name)
            pass2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', pass1)
            cls.__camelcache[class_name] = to_unicode(pass2.lower())
        return cls.__camelcache[class_name]

    @property
    def current_event_type(self):
        """Get the active event type while walking, or ``None`` when the stack is empty."""
        if self.event_stack:
            return self.event_stack[-1]
        return None

    def _enter(self, node):
        self.event_stack.append(node.event_type)

    def _enter_event_query(self, node):
        self.event_stack.append(node.event_type)

    def _enter_piped_query(self, node):
        # type: (PipedQuery) -> None
        self.base_event_types = []
        if isinstance(node.first, EventQuery):
            self.base_event_types.append(node.first.event_type)
        else:
            self.base_event_types.extend(q.query.event_type for q in node.first.queries)

    def _enter_pipe_command(self, node):
        self.in_pipes = True

    def _enter_subquery_by(self, node):
        self.event_stack.append(node.query.event_type)

    def _exit_subquery_by(self, node):
        self.event_stack.pop()

    def _exit_event_query(self, node):
        self.event_stack.pop()

    def _exit_piped_query(self, node):
        self.base_event_types = []

    def _exit_pipe_command(self, node):
        self.in_pipes = False

    def _walk_default(self, node, *args, **kwargs):
        return node

    def get_node_method(self, node_cls, prefix):
        # type: (BaseNode, str) -> callable
        """Get the walk method for a node.

        :param node_cls: A node class or an instance of one
        :param str prefix: Method name prefix such as ``_walk_``
        :return: The first callable ``prefix + camelized(cls)`` found while
            searching the class and its bases breadth-first, otherwise the
            ``prefix + "default"`` attribute, otherwise ``None``
        """
        if not isinstance(node_cls, type):
            node_cls = type(node_cls)

        cache = self._method_cache[prefix]
        if node_cls in cache:
            return cache[node_cls]

        method = None
        queue = deque([node_cls])
        while queue:
            next_cls = queue.popleft()
            candidate = getattr(self, prefix + self.camelized(next_cls), None)
            if callable(candidate):
                method = candidate
                break
            queue.extend(next_cls.__bases__)

        # Only callables found by the search are kept; a non-callable attribute
        # that happens to share the name must not shadow the default handler.
        if method is None:
            method = getattr(self, prefix + "default", None)

        cache[node_cls] = method
        return method

    def _walk_list(self, nodes, *args, **kwargs):
        return [self.walk(n, *args, **kwargs) for n in nodes]

    def _walk_tuple(self, nodes, *args, **kwargs):
        return tuple(self.walk(n, *args, **kwargs) for n in nodes)

    def _walk_dict(self, nodes, *args, **kwargs):
        return {self.walk(k, *args, **kwargs): self.walk(v, *args, **kwargs) for k, v in nodes.items()}

    @property
    def active_node(self):
        """Get the active context (raises ``IndexError`` when nothing is being walked)."""
        return self.node_stack[-1]

    @property
    def parent_node(self):
        """Get the parent context (raises ``IndexError`` when fewer than two nodes are active)."""
        return self.node_stack[-2]

    @contextmanager
    def set_context(self, node):
        """Push a node onto the context stack.

        Calls the matching ``_enter_*`` method (if any) before pushing the node,
        and the matching ``_exit_*`` method (if any) after popping it, even when
        the body of the ``with`` block raises.
        """
        enter_method = self.get_node_method(node, prefix="_enter_")
        exit_method = self.get_node_method(node, prefix="_exit_")

        if callable(enter_method):
            enter_method(node)

        self.node_stack.append(node)
        try:
            yield node
        finally:
            self.node_stack.pop()
            if callable(exit_method):
                exit_method(node)

    def walk(self, node, *args, **kwargs):
        """Walk the syntax tree top-down.

        Dispatches ``node`` to its ``_walk_*`` method inside ``set_context`` and
        returns that method's result; returns ``None`` if no method resolves.
        """
        method = self.get_node_method(node, "_walk_")
        if callable(method):
            with self.set_context(node):
                return method(node, *args, **kwargs)
        return None
class RecursiveWalker(Walker):
    """Walker that will recursively walk and transform a tree."""

    def _walk_base_node(self, node, *args, **kwargs):
        # type: (BaseNode) -> BaseNode
        """Rebuild ``node`` from its walked slot values and return the optimized result."""
        node_cls = type(node)
        walked = []
        for _, value in node.iter_slots():
            walked.append(self.walk(value, *args, **kwargs))
        rebuilt = node_cls(*walked)
        return rebuilt.optimize()

    def copy_node(self, node):
        """Create a copy of a node by walking it with this walker."""
        return self.walk(node)
class DepthFirstWalker(Walker):
    """Walk an AST bottom up."""

    def walk(self, node, *args, **kwargs):
        """Walk the syntax tree bottom-up.

        For a ``BaseNode`` the slot values are walked first and a new node of
        the same class is built from the results; that rebuilt node is then
        passed to the resolved ``_walk_*`` method. Other values go straight to
        their ``_walk_*`` method. Returns ``None`` if no method resolves.
        """
        method = self.get_node_method(node, "_walk_")
        if not callable(method):
            return None

        # The context is entered with the original node, not the rebuilt one
        with self.set_context(node):
            if isinstance(node, BaseNode):
                children = []
                for _, value in node.iter_slots():
                    children.append(self.walk(value, *args, **kwargs))
                node = type(node)(*children)
            return method(node, *args, **kwargs)

    def copy_node(self, node):
        """Create a copy of a node using a fresh ``RecursiveWalker``."""
        copier = RecursiveWalker()
        return copier.walk(node)
class ConfigurableWalker(RecursiveWalker):
    """Subclass for adding configuration to walkers."""

    def __init__(self, config=None):
        """Create the walker with optional configuration.

        :param dict config: Optional settings; the ``time_unit`` and ``schema``
            keys are read here, and ``schema`` (when not ``None``) is expanded
            as keyword arguments to ``Schema``
        """
        self.config = config if config else {}
        self.stack = []
        self.time_unit = self.get_config('time_unit', DEFAULT_TIME_UNIT)  # type: int

        self._schema = None
        schema_kwargs = self.get_config('schema', None)
        if schema_kwargs is not None:
            self._schema = Schema(**schema_kwargs)

        super(ConfigurableWalker, self).__init__()

    @property
    def schema(self):
        """Get the current engine schema, falling back to ``Schema.current()``."""
        return Schema.current() if self._schema is None else self._schema

    def get_config(self, name, default=None):
        """Get a property from the config dict."""
        return self.config.get(name, default)


# circular dependency
from .ast import BaseNode, EventQuery  # noqa: E402