Source code for eql.walkers

"""EQL walker classes."""
import re
from collections import defaultdict, deque
from contextlib import contextmanager

from .ast import TimeUnit
from .schema import Schema
from .types import NodeInfo, TypeHint
from .utils import is_string, to_unicode


__all__ = (
    "Walker",
    "RecursiveWalker",
    "ConfigurableWalker",
    "DepthFirstWalker",
)


DEFAULT_TIME_UNIT = 10000000  # Windows FileTime 0.1 microseconds


[docs]class Walker(object): """Base class that provides functionality for walking abstract syntax trees of eql.BaseNode.""" __camelcache = {} def __init__(self): """Create the AST walker.""" object.__init__(self) self._method_cache = defaultdict(dict) self.event_stack = [] self.in_pipes = [] self.base_event_types = [] self.node_stack = [] self.output_event_types = []
[docs] def register_func(self, node_cls, func, prefix="_walk_"): """Register a callback function.""" camelized = self.camelized(node_cls) method_name = prefix + camelized setattr(self, method_name, func)
[docs] def iter_node(self, node): """Iterate through a syntax tree.""" if isinstance(node, BaseNode): yield node for descendant in self.iter_node([v for v in node.iter_slots()]): yield descendant elif isinstance(node, (list, tuple)): for n in node: for descendant in self.iter_node(n): yield descendant elif isinstance(node, dict): for n in self.iter_node(node.items()): yield n
[docs] @classmethod def camelized(cls, node_cls): """Get the camelized name for the class.""" if is_string(node_cls): class_name = node_cls else: if not isinstance(node_cls, type): node_cls = type(node_cls) class_name = node_cls.__name__ if class_name not in cls.__camelcache: pass1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', class_name) pass2 = re.sub('([a-z0-9])([A-Z])', r'\1_\2', pass1) cls.__camelcache[class_name] = to_unicode(pass2.lower()) return cls.__camelcache[class_name]
@property def current_event_type(self): """Get the active event type while walking.""" if self.event_stack: return self.event_stack[-1] def _enter(self, node): self.event_stack.append(node.event_type) def _enter_event_query(self, node): self.event_stack.append(node.event_type) def _enter_piped_query(self, node): # type: (PipedQuery) -> None self.output_event_types = [] self.base_event_types = [] if isinstance(node.first, EventQuery): self.base_event_types.append(node.first.event_type) else: self.base_event_types.extend(q.query.event_type for q in node.first.queries) self.output_event_types = self.base_event_types[:] def _enter_pipe_command(self, node): self.in_pipes = True def _enter_subquery_by(self, node): self.event_stack.append(node.query.event_type) def _exit_subquery_by(self, node): self.event_stack.pop() def _exit_event_query(self, node): self.event_stack.pop() def _exit_piped_query(self, node): self.base_event_types = [] def _exit_pipe_command(self, node): """Update the output schemas as they change through each pipe.""" self.in_pipes = False incoming_schema = [Schema({event_type: {}}) for event_type in self.output_event_types] output_schemas = node.output_schemas([NodeInfo(a, TypeHint.Unknown) for a in node.arguments], incoming_schema) self.output_event_types = [next(iter(s.schema.keys())) for s in output_schemas] def _walk_default(self, node, *args, **kwargs): return node
[docs] def get_node_method(self, node_cls, prefix): # type: (BaseNode, str) -> callable """Get the walk method for a node.""" if not isinstance(node_cls, type): node_cls = type(node_cls) if node_cls in self._method_cache[prefix]: return self._method_cache[prefix][node_cls] queue = deque([node_cls]) method = None while queue: next_cls = queue.popleft() method_name = prefix + self.camelized(next_cls) method = getattr(self, method_name, None) if callable(method): break queue.extend(next_cls.__bases__) method = method or getattr(self, prefix + "default", None) self._method_cache[prefix][node_cls] = method return method
@property def active_node(self): """Get the active context.""" return self.node_stack[-1] @property def parent_node(self): """Get the parent context.""" return self.node_stack[-2]
[docs] @contextmanager def set_context(self, node): """Push a node onto the context stack.""" enter_method = self.get_node_method(node, prefix="_enter_") exit_method = self.get_node_method(node, prefix="_exit_") if callable(enter_method): enter_method(node) self.node_stack.append(node) try: yield node finally: self.node_stack.pop() if callable(exit_method): exit_method(node)
[docs] def autowalk(self, node, *args, **kwargs): """Automatically walk built-in containers.""" with self.set_context(node): if isinstance(node, list): return [self.walk(n, *args, **kwargs) for n in node] if isinstance(node, tuple): return tuple(self.walk(n, *args, **kwargs) for n in node) if isinstance(node, dict): return dict({self.walk(k, *args, **kwargs): self.walk(v, *args, **kwargs) for k, v in node.items()})
[docs] def walk(self, node, *args, **kwargs): """Walk the syntax tree top-down.""" rv = self.autowalk(node, *args, **kwargs) if rv is not None: return rv method = self.get_node_method(node, "_walk_") if callable(method): with self.set_context(node): return method(node, *args, **kwargs)
[docs]class RecursiveWalker(Walker): """Walker that will recursively walk and transform a tree.""" def _walk_base_node(self, node, *args, **kwargs): # type: (BaseNode) -> BaseNode cls = type(node) slots = self.walk([v for k, v in node.iter_slots()], *args, **kwargs) return cls(*slots) def copy_node(self, node): """Create a copy of a node.""" return self.walk(node)
[docs]class DepthFirstWalker(Walker): """Walk an AST bottom up.""" def walk(self, node, *args, **kwargs): """Walk the syntax tree top-down.""" rv = self.autowalk(node, *args, **kwargs) if rv is not None: return rv method = self.get_node_method(node, "_walk_") if callable(method): with self.set_context(node): if isinstance(node, BaseNode): slots = [self.walk(v, *args, **kwargs) for name, v in node.iter_slots()] node = type(node)(*slots) return method(node, *args, **kwargs) def copy_node(self, node): """Create a copy of a node.""" return RecursiveWalker().walk(node)
class ConfigurableWalker(RecursiveWalker): """Subclass for adding configurations to an walkers.""" def __init__(self, config=None): """Create the walker with optional configuration.""" self.config = config or {} self.stack = [] self.time_unit = self.get_config('time_unit', DEFAULT_TIME_UNIT) # type: int self._schema = None if self.get_config('schema', None) is not None: self._schema = Schema(**self.get_config('schema')) super(ConfigurableWalker, self).__init__() def convert_time_range(self, node): # type: (eql.ast.TimeRange) -> (int|float) """Convert a time range to a timestamp delta.""" tick_rate = TimeUnit.Seconds.as_milliseconds() node_ms = node.as_milliseconds() // tick_rate if not isinstance(self.time_unit, float) and self.time_unit > tick_rate and self.time_unit % tick_rate == 0: # strictly use integer math if we can safely divide the engine's rate by TimeUnits return self.time_unit * node_ms else: # if it doesn't evenly divide, resort to floating point math return float(self.time_unit) * node_ms @property def schema(self): """Get the current engine schema.""" if self._schema is None: return Schema.current() return self._schema def get_config(self, name, default=None): """Get a property from the config dict.""" return self.config.get(name, default) # circular dependency from .ast import BaseNode, EventQuery # noqa: E402