Source code for eql.engine

"""EQL engine in native python."""
from __future__ import print_function

import json
import functools
from collections import defaultdict, deque, OrderedDict, namedtuple

from .ast import *  # noqa: F403
from .errors import EqlCompileError
from .pipes import *  # noqa: F403
from .transpilers import BaseEngine, BaseTranspiler
from .events import Event, AnalyticOutput
from .schema import EVENT_TYPE_ANY, EVENT_TYPE_GENERIC
from .utils import is_string, is_array, is_number, get_type_converter, fold_case

PIPE_EOF = object()


class Scope(namedtuple('Scope', ['events', 'variables'])):
    """Used for passing variables that may be referenced by nested callback functions."""

    @property
    def event(self):
        """Get the first event."""
        return self.events[0]

    def call(self, fn, *args):
        """Call a function by temporarily adding variables to the scope."""
        size = len(self.variables)
        self.variables.extend(args)
        status = fn(self)
        self.variables[size:] = []
        return status


[docs]class PythonEngine(BaseEngine, BaseTranspiler): """Converter from EQL to Python callbacks.""" def __init__(self, config=None): """Create a python engine for EQL.""" super(PythonEngine, self).__init__(config) self._output_hooks = [] self._any_event_hooks = [] # Any new list of hooks will automatically inherit from the global list self._event_hooks = defaultdict(lambda: list(self._any_event_hooks)) self._functions = {} self._query_multiple_events = True self._in_pipe = False self._query_pipes = [] self._reducer_hooks = defaultdict(list) self.host_key = self.get_config('host_key', 'hostname') self.pid_key = self.get_config('pid_key', 'pid') self.ppid_key = self.get_config('ppid_key', 'ppid') if self.get_config('data_source') == 'endgame': self.process_subtype = "opcode" self.create_values = (1, 3, 9) self.terminate_values = (2, 4) else: self.process_subtype = "subtype" self.create_values = ["create", "fork"] self.terminate_values = ["terminate"] self._scoped = [] for name, fn in self.get_config('functions', {}).items(): self.add_custom_function(name, fn) self._output_hooks.extend(self.get_config('hooks', [])) self.flatten = self.get_config('flatten', False) if self.get_config('print', False): self._default_emitter = self.print_events elif self.flatten: self._default_emitter = self.output_single_events else: self._default_emitter = self.get_result_emitter() def print_event(self, event): # type: (Event) -> None """Print an event to stdout.""" print(json.dumps(event.data, sort_keys=True)) def print_events(self, events): """Print an array of events to stdout.""" if events is not PIPE_EOF: for event in events: self.print_event(event) def _to_hooks(self, item): for hook in self._output_hooks: hook(item) def output_single_events(self, events): """Output a list of events to all callbacks.""" if events is not PIPE_EOF: for event in events: self._to_hooks(event) def get_result_emitter(self, analytic_id=None, next_pipe=None): """Get a function that returns results for an analytic.""" if next_pipe is None: next_pipe = self._to_hooks def flatten_with_id(events): if events is not PIPE_EOF: events = [event.copy() for event in events] for event in events: event.data['analytic_id'] = analytic_id self.output_single_events(events) def output_results(events): # type: (list[Event]) -> None if events is not PIPE_EOF: result = AnalyticOutput(analytic_id, events) next_pipe(result) if self.flatten: if not analytic_id: return self.output_single_events else: return flatten_with_id else: return output_results @classmethod def null_checked(cls, f): """Helper decorator for checking null values.""" @functools.wraps(f) def decorated(*args): # null can only be compared with IsNull/IsNotNull if any(arg is None for arg in args): return return f(*args) return decorated @classmethod def check_types(cls, f): """Helper decorator for performing type checks before evaluating comparisons.""" @functools.wraps(f) def decorated(a, b): if a is None or b is None: return if is_string(a) and is_string(b) or \ is_number(a) and is_number(b) or \ type(a) == type(b): return f(a, b) return decorated @classmethod def lowercase(cls, f, always=True): """Decorator function for lowercasing values to perform case-insensitive comparisons.""" @functools.wraps(f) def decorated(a, b): if is_string(a) and is_string(b): return f(fold_case(a), fold_case(b)) if not always: return f(a, b) return decorated def convert(self, node, *args, **kwargs): """Convert an eql AST to a python callback function. :param EqlNode node: The eql AST :param bool piped: Create a pipe callback for multiple events :param bool scoped: Wrap the callback with variable scoping :return A python callback function that takes an event. :rtype: (Event|Scope|list[Event]) -> object """ piped = kwargs.pop("piped", False) scoped = kwargs.pop("scoped", False) method = self.get_node_method(node, "_convert_") cb = method(node, *args, **kwargs) if not scoped: return cb elif piped: @functools.wraps(cb) def wrapped(events): return cb(Scope(events, [])) return wrapped else: @functools.wraps(cb) def wrapped(event): return cb(Scope([event], [])) return wrapped @classmethod def _remove_case(cls, key): if is_string(key): return fold_case(key) elif is_array(key): return tuple(cls._remove_case(k) for k in key) else: return key def _convert_key(self, args, scoped=True, piped=False, insensitive=True): """Convert a tuple of AST nodes to a callback function that returns a key. :param list[Event] args: :param bool piped: Create a pipe callback for multiple events :param bool scoped: Wrap the callback with variable scoping :rtype: (Scope|Event|list[Event]) -> tuple[object] """ remove_case = self._remove_case if len(args) == 0: return lambda e: None elif len(args) == 1: callback = self.convert(args[0], scoped=scoped, piped=piped) if insensitive: return lambda e: remove_case(callback(e)) return callback callbacks = [self.convert(arg, scoped=scoped, piped=piped) for arg in args] if insensitive: return lambda e: tuple(remove_case(cb(e)) for cb in callbacks) else: return lambda e: tuple(cb(e) for cb in callbacks) def _convert_tuple(self, args): """Convert a tuple of AST nodes to a callback function that returns a tuple of values. :param list[EqlNode] args: :rtype: (eql.engines.base.Event) -> tuple[object] """ callbacks = [self.convert(arg) for arg in args] if len(callbacks) == 0: tup = tuple() return lambda e: tup def to_tuple_callback(value): return tuple(callback(value) for callback in callbacks) return to_tuple_callback def convert_pipe(self, node, next_pipe): """Convert an EQL pipe into a callback function. :param PipeCommand node: The original EQL pipe :param (list[eql.engines.base.Event]) -> None next_pipe: An already converted pipe :rtype: (list[eql.engines.base.Event]) -> None """ return self.convert(node, next_pipe) def convert_reducer(self, node, next_pipe): """Convert an EQL reducer into a callback function. :param PipeCommand node: The original EQL pipe :param (list[eql.engines.base.Event]) -> None next_pipe: An already converted reducer :rtype: (list[eql.engines.base.Event]) -> None """ method = self.get_node_method(node, "_reduce_") or self.get_node_method(node, "_convert_") return method(node, next_pipe) def _convert_not(self, node): # type: (Not) -> callable get_value = self.convert(node.term) def negate(scope): # type: (Scope) -> bool value = get_value(scope) if value is not None: return not value return negate def _convert_literal(self, node): # type: (Literal) -> callable literal_value = node.value return lambda scope: literal_value def _convert_field(self, node): # type: (Field) -> callable def walk_path(value): for key in node.path: if value is None: break elif is_string(value) and is_string(key): # expand subtype.create -> subtype == "create" # if there's a string field called "subtype" value = (value == key) elif isinstance(value, dict): value = value.get(key) elif isinstance(key, int) and is_array(value) and key < len(value): value = value[key] else: return return value # if the variable is a dynamic variable, then find it from the list of named arguments if node.base in self._scoped: for pos, name in enumerate(self._scoped): if name == node.base: index = pos def dynamic_func(scope): # type: (Scope) -> object return walk_path(scope.variables[index]) return dynamic_func if self._in_pipe: # Check if this is querying for any events if self._query_multiple_events: index, node = node.query_multiple_events() else: index = 0 def pipe_callback(scope): # type: (Scope) -> object event = scope.events[index] return walk_path(event.data.get(node.base)) return pipe_callback else: def query_event_callback(scope): # type: (Scope) -> object return walk_path(scope.event.data.get(node.base)) return query_event_callback def _create_custom_callback(self, arguments, body): """Convert an EQL callback with named arguments. :param list[Field] arguments: List of named arguments for the callback function :param Expression body: List of named arguments for the callback function :rtype: (Scope, object) -> object """ names = [arg.base for arg in arguments] size = len(self._scoped) self._scoped.extend(names) callback = self.convert(body) self._scoped[size:] = [] return callback def _function_safe(self, arguments): get_value = self.convert(arguments[0]) def callback(scope): try: return get_value(scope) except Exception: pass return callback def _function_array_count(self, arguments): node = FunctionCall('arrayCount', arguments) if len(arguments) == 3 and self.is_variable(arguments[1]): array, name, body = arguments get_array = self.convert(array) callback = self._create_custom_callback([name], body) def walk_array(scope): # type: (Scope) -> bool array = get_array(scope) count = 0 if isinstance(array, list): for item in array: if scope.call(callback, item): count = count + 1 return count return walk_array raise EqlCompileError(u"Invalid signature {}".format(node)) def _function_array_search(self, arguments): node = FunctionCall('arraySearch', arguments) if len(arguments) == 3 and self.is_variable(arguments[1]): array, name, body = arguments get_array = self.convert(array) callback = self._create_custom_callback([name], body) def walk_array(scope): # type: (Scope) -> bool array = get_array(scope) if isinstance(array, list): for item in array: if scope.call(callback, item): return True return False return walk_array raise EqlCompileError(u"Invalid signature {}".format(node)) def _convert_function_call(self, node): # type: (FunctionCall) -> callable name = node.name method = getattr(self, "_function_{}".format(self.camelized(name)), None) if method: return method(node.arguments) # if a function isn't found, pull a specific callback in from the function registry func = self._functions.get(node.name, node.signature) # if it's a function signature, then get the methods if hasattr(func, "get_callback"): func = func.get_callback(*node.arguments) if not callable(func): raise EqlCompileError("Unknown function {}".format(node.name)) get_arguments = self._convert_tuple(node.arguments) def wrapped_function(scope): # type: (Scope) -> bool return func(*get_arguments(scope)) return wrapped_function def _convert_in_set(self, node): # type: (InSet) -> callable if all(isinstance(item, Literal) for item in node.container): values = set() for item in node.container: value = item.value values.add(fold_case(value)) get_value = self.convert(node.expression) def callback(scope): # type: (Scope) -> bool check_value = get_value(scope) if check_value is None: return return fold_case(check_value) in values return callback else: return self.convert(node.synonym) def _convert_is_null(self, node): # type: (IsNull) -> callable get_value = self.convert(node.expr) return lambda x: get_value(x) is None def _convert_is_not_null(self, node): # type: (IsNotNull) -> callable get_value = self.convert(node.expr) return lambda x: get_value(x) is not None def _convert_comparison(self, node): # type: (Comparison) -> callable get_left = self.convert(node.left) get_right = self.convert(node.right) # if left and right could be strings, then we should normalize case possible_string_types = FunctionCall, String, Field if isinstance(node.left, possible_string_types) and isinstance(node.right, possible_string_types): compare = self.check_types(self.lowercase(node.function, always=False)) else: compare = self.check_types(node.function) def callback(scope): # type: (Scope) -> bool left = get_left(scope) right = get_right(scope) return compare(left, right) return callback def _convert_math_operation(self, node): # type: (MathOperation) -> callable return self.convert(node.to_function_call()) @staticmethod def to_bool_or_null(obj): # type: (object) -> bool """Helper function for aggregating boolean logic.""" if obj is not None: return bool(obj) def _convert_and(self, node): # type: (And) -> callable get_terms = [self.convert(term) for term in node.terms] first = get_terms[0] rest = get_terms[1:] def and_terms(scope): # type: (Scope) -> bool aggregate = self.to_bool_or_null(first(scope)) if aggregate is False: return False for term in rest: value = self.to_bool_or_null(term(scope)) if value is False: return False elif value is None: aggregate = None return aggregate return and_terms def _convert_or(self, node): # type: (Or) -> callable get_terms = [self.convert(term) for term in node.terms] first = get_terms[0] rest = get_terms[1:] def or_terms(scope): # type: (Scope) -> bool aggregate = self.to_bool_or_null(first(scope)) if aggregate is True: return True for term in rest: value = self.to_bool_or_null(term(scope)) if value is True: return True elif value is None: aggregate = None return aggregate return or_terms def _convert_count_pipe(self, node, next_pipe): # type: (CountPipe, callable) -> callable host_key = self.host_key if len(node.arguments) == 0: # Counting only the total summary = {'key': 'totals', 'count': 0} hosts = set() def count_total_callback(events): if events is PIPE_EOF: if len(hosts): summary['total_hosts'] = len(hosts) summary['hosts'] = list(sorted(hosts)) next_pipe([Event(EVENT_TYPE_GENERIC, 0, summary)]) next_pipe(PIPE_EOF) else: summary['count'] += 1 if host_key in events[0].data: hosts.add(events[0].data[host_key]) return count_total_callback else: get_key = self._convert_key(node.arguments, scoped=True, piped=True, insensitive=False) # we want to aggregate counts for keys insensitively, but need to keep the case of the first one we see key_lookup = {} count_table = defaultdict(lambda: {'count': 0, 'hosts': set()}) remove_case = self._remove_case def count_tuple_callback(events): # type: (list[Event]) -> None if events is PIPE_EOF: # This may seem a little tricky, but we need to effectively learn the type(s) to perform comparison # Python 3 doesn't allow you to use a key function that returns various types case_sensitive_table = {key_lookup[k]: v for k, v in count_table.items()} converter = get_type_converter(case_sensitive_table) converted_count_table = {converter(k): v for k, v in case_sensitive_table.items()} total = sum(tbl['count'] for tbl in converted_count_table.values()) for key, details in sorted(converted_count_table.items(), key=lambda kv: (kv[1]['count'], kv[0])): hosts = details.pop('hosts') if len(hosts): details['hosts'] = list(sorted(hosts)) details['total_hosts'] = len(hosts) details['key'] = key details['percent'] = float(details['count']) / total next_pipe([Event(EVENT_TYPE_GENERIC, 0, details)]) next_pipe(PIPE_EOF) else: key = get_key(events) insensitive_key = remove_case(key) key_lookup.setdefault(insensitive_key, key) count_table[insensitive_key]['count'] += 1 if host_key in events[0].data: count_table[insensitive_key]['hosts'].add(events[0].data[host_key]) return count_tuple_callback def _convert_filter_pipe(self, node, next_pipe): # type: (FilterPipe, callable) -> callable check_filter = self.convert(node.expression, piped=True, scoped=True) def filter_callback(events): # type: (list[Event]) -> None if events is PIPE_EOF: next_pipe(PIPE_EOF) elif check_filter(events): next_pipe(events) return filter_callback def _convert_head_pipe(self, node, next_pipe): # type: (HeadPipe, callable) -> callable totals = [0] # has to be mutable because of python scoping max_count = node.count def head_callback(events): if totals[0] < max_count: if events is PIPE_EOF: next_pipe(PIPE_EOF) else: totals[0] += 1 next_pipe(events) if totals[0] == max_count: next_pipe(PIPE_EOF) return head_callback def _convert_tail_pipe(self, node, next_pipe): # type: (TailPipe, callable) -> callable output_buffer = deque(maxlen=node.count) def tail_callback(events): if events is PIPE_EOF: for output in output_buffer: next_pipe(output) next_pipe(PIPE_EOF) else: output_buffer.append(events) return tail_callback def _convert_sort_pipe(self, node, next_pipe): # type: (SortPipe, callable) -> callable output_buffer = [] sort_key = self._convert_key(node.arguments, scoped=True, piped=True) def sort_callback(events): if events is PIPE_EOF: converter = get_type_converter(sort_key(buf) for buf in output_buffer) def get_converted_key(buffer_events): return converter(sort_key(buffer_events)) output_buffer.sort(key=get_converted_key) for output in output_buffer: next_pipe(output) next_pipe(PIPE_EOF) else: output_buffer.append(events) return sort_callback def _convert_unique_pipe(self, node, next_pipe): # type: (UniquePipe, callable) -> callable seen = set() get_unique_key = self._convert_key(node.arguments, scoped=True, piped=True) def unique_callback(events): if events is PIPE_EOF: next_pipe(PIPE_EOF) else: key = get_unique_key(events) if key not in seen: seen.add(key) next_pipe(events) return unique_callback def _convert_unique_count_pipe(self, node, next_pipe): # type: (CountPipe) -> callable """Aggregate counts coming into the pipe.""" host_key = self.host_key get_unique_key = self._convert_key(node.arguments, scoped=True, piped=True) results = OrderedDict() def count_unique_callback(events): # type: (list[Event]) -> None if events is PIPE_EOF: # Calculate the total total = sum(result[0].data['count'] for result in results.values()) for result in results.values(): hosts = result[0].data.pop('hosts') # type: set if len(hosts) > 0: result[0].data['hosts'] = list(sorted(hosts)) result[0].data['total_hosts'] = len(hosts) result[0].data['percent'] = float(result[0].data['count']) / total next_pipe(result) next_pipe(PIPE_EOF) else: # Create a copy of these, because they can be modified events = [events[0].copy()] + events[1:] piece = events[0].data key = get_unique_key(events) hosts = piece.pop('hosts', []) host = piece.pop(host_key, None) count = piece.pop('count', 1) if key not in results: results[key] = events match = piece match['hosts'] = set() match['count'] = count else: match = results[key][0].data match['count'] += count if host: match['hosts'].add(host) else: match['hosts'].update(hosts) return count_unique_callback def _reduce_count_pipe(self, node, next_pipe): # type: (CountPipe) -> callable """Aggregate counts coming into the pipe.""" host_key = self.host_key if len(node.arguments) == 0: # Counting only the total result = {'key': 'totals', 'count': 0, 'hosts': set()} def count_total_aggregates(events): # type: (list[Event]) -> None if events is PIPE_EOF: hosts = result.pop('hosts') # type: set if len(hosts) > 0: result['hosts'] = list(sorted(hosts)) result['total_hosts'] = len(hosts) next_pipe([Event(EVENT_TYPE_GENERIC, 0, result)]) next_pipe(PIPE_EOF) else: piece = events[0].data result['count'] += piece['count'] if host_key in piece: result['hosts'].add(piece[host_key]) elif 'hosts' in piece: results['hosts'].update(piece['hosts']) return count_total_aggregates else: results = defaultdict(lambda: {'count': 0, 'hosts': set()}) def count_tuple_callback(events): # type: (list[Event]) -> None if events is PIPE_EOF: converter = get_type_converter(results) converted_results = {converter(k): v for k, v in results.items()} total = sum(result['count'] for result in converted_results.values()) for key, result in sorted(converted_results.items(), key=lambda kr: (kr[1]['count'], kr[0])): hosts = result.pop('hosts') # type: set if len(hosts) > 0: result['hosts'] = list(sorted(hosts)) result['total_hosts'] = len(hosts) result['key'] = key result['percent'] = float(result['count']) / total next_pipe([Event(EVENT_TYPE_GENERIC, 0, result)]) next_pipe(PIPE_EOF) else: piece = events[0].data key = events[0].data['key'] key = tuple(key) if len(node.arguments) > 1 else key results[key]['count'] += piece['count'] if host_key in piece: results[key]['hosts'].add(piece[host_key]) elif 'hosts' in piece: results[key]['hosts'].update(piece['hosts']) return count_tuple_callback def _convert_named_subquery(self, node): # type: (NamedSubquery) -> callable if node.query_type == NamedSubquery.DESCENDANT: return self._get_descendant_of(node.query) elif node.query_type == NamedSubquery.CHILD: return self._get_child_of(node.query) elif node.query_type == NamedSubquery.EVENT: return self._get_event_of(node.query) else: raise EqlCompileError("Unknown query type {}".format(node.query_type)) def _get_descendant_of(self, node): # type: (EventQuery) -> callable sources = set() descendants = set() dead_processes = set() process_subtype = self.process_subtype creates = self.create_values terminates = self.terminate_values @self.event_callback("process") def update_descendants(event): # type: (Event) -> None ppid = event.data.get(self.ppid_key) pid = event.data.get(self.pid_key) subtype = event.data.get(process_subtype) for pending_pid in dead_processes: if pending_pid in descendants: descendants.remove(pending_pid) if pending_pid in sources: sources.remove(pending_pid) dead_processes.clear() if subtype in creates and pid == 4 and event.data.get('process_name') == "System": # Reset all state on a sensor or machine boot up descendants.clear() sources.clear() if subtype in creates and (ppid in descendants or ppid in sources): # Check if the parent matches descendants.add(pid) elif subtype in terminates: dead_processes.add(pid) ancestor_match = self.convert(node.query, scoped=True) @self.event_callback(node.event_type) def check_ancestor(event): # type: (Event) -> None pid = event.data.get('pid', 0) pid_key = event.data.get(self.pid_key) if pid != 0 and ancestor_match(event) and pid_key: sources.add(pid_key) def check_if_descendant(scope): # type: (Scope) -> bool return scope.event.data.get(self.pid_key) in descendants return check_if_descendant def _get_child_of(self, node): # type: (EventQuery) -> callable parents = set() children = set() dead_processes = set() process_subtype = self.process_subtype creates = self.create_values terminates = self.terminate_values @self.event_callback("process") def update_children(event): # type: (Event) -> None ppid = event.data.get(self.ppid_key) pid = event.data.get(self.pid_key) subtype = event.data.get(process_subtype) for pending_pid in dead_processes: if pending_pid in children: children.remove(pending_pid) if pending_pid in parents: parents.remove(pending_pid) dead_processes.clear() if subtype in creates and pid == 4 and event.data.get('process_name') == "System": # Reset all state on a sensor or machine boot up children.clear() parents.clear() if subtype in creates and (ppid in parents): # Check if the parent matches children.add(pid) elif subtype in terminates: dead_processes.add(pid) process_match = self.convert(node.query, scoped=True) @self.event_callback(node.event_type) def match_processes(event): # type: (Event) -> None pid = event.data.get('pid', 0) pid_key = event.data.get(self.pid_key) if pid != 0 and process_match(event) and pid_key: parents.add(pid_key) def check_if_child(scope): # type: (Scope) -> bool return scope.event.data.get(self.pid_key) in children return check_if_child def _get_event_of(self, node): # type: (EventQuery) -> callable processes = set() dead_processes = set() process_subtype = self.process_subtype creates = self.create_values terminates = self.terminate_values @self.event_callback("process") def purge_on_terminate(event): # type: (Event) -> None pid = event.data.get(self.pid_key) subtype = event.data.get(process_subtype) for pending_pid in dead_processes: if pending_pid in processes: processes.remove(pending_pid) dead_processes.clear() if subtype in creates and pid == 4 and event.data.get('process_name') == "System": # Reset all state on a sensor or machine boot up processes.clear() elif subtype in terminates: dead_processes.add(pid) process_match = self.convert(node.query, scoped=True) @self.event_callback(node.event_type) def match_processes(event): # type: (Event) -> None pid = event.data.get('pid', 0) pid_key = event.data.get(self.pid_key) if pid != 0 and process_match(event) and pid_key: processes.add(pid_key) def check_for_match(scope): # type: (Scope) -> bool return scope.event.data.get(self.pid_key) in processes return check_for_match def _convert_event_query(self, node): # type: (EventQuery) -> callable check_match = self.convert(node.query, scoped=True) expected_type = node.event_type def match_event_callback(event): # type: (Event) -> bool return expected_type == event.type and check_match(event) if expected_type == EVENT_TYPE_ANY: return check_match else: return match_event_callback def _convert_join(self, node, next_pipe): # type: (Join, callable) -> callable size = len(node.queries) lookup = defaultdict(lambda: [None] * size) # type: dict[object, list[Event]] def convert_join_term(subquery, position): # type: (SubqueryBy, int) -> callable check_event = self.convert(subquery.query) get_join_value = self._convert_key(subquery.join_values, scoped=True) @self.event_callback(subquery.query.event_type) def join_event_callback(event): # type: (Event) -> None if check_event(event): join_value = get_join_value(event) if lookup[join_value][position] is None: lookup[join_value][position] = event if all(event is not None for event in lookup[join_value]): next_pipe(lookup[join_value]) lookup.pop(join_value) if node.close: check_close_event = self.convert(node.close.query) close_join_value = self._convert_key(node.close.join_values, scoped=True) @self.event_callback(node.close.query.event_type) def close_join_callback(event): # type: (Event) -> None if check_close_event(event): join_value = close_join_value(event) lookup.pop(join_value, None) for pos, query in enumerate(node.queries): convert_join_term(query, pos) def _convert_sample_term(self, subquery, size, samples, next_pipe=None): check_event = self.convert(subquery.query) # Determine if there's a join_value present has_join_value = True if subquery.join_values else False # If there's a join value, get it. get_join_value = self._convert_key(subquery.join_values, scoped=True) if has_join_value else None @self.event_callback(subquery.query.event_type) def sample_callback(event): # type: (Event) -> None if check_event(event): if has_join_value: # The regular case where join values exist join_value = get_join_value(event) if join_value not in samples: samples[join_value] = [] samples[join_value].append(event) if len(samples[join_value]) == size: next_pipe(samples[join_value]) samples.pop(join_value) else: # The case where no join values exist samples.append(event) if len(samples) == size: # Pass a copy to the next_pipe to avoid mutation issues next_pipe(samples[:]) samples.clear() def _convert_sample(self, node, next_pipe): # type: (Sample, callable) -> callable # Check if there's a join value for any subquery has_join_value_for_any_subquery = any(subquery.join_values for subquery in node.queries) # Initialize samples based on the presence of join values samples = {} if has_join_value_for_any_subquery else [] size = len(node.queries) for _, query in reversed(list(enumerate(node.queries))): # Create these in reverse order, so one event can't hit multiple callbacks to be propagated self._convert_sample_term(query, size, samples, next_pipe) def _convert_sequence_term(self, subquery, position, size, lookups, next_pipe=None): # type: (SubqueryBy, int, int, list[dict[object, list[Event]]], callable) -> callable check_event = self.convert(subquery.query) get_join_value = self._convert_key(subquery.join_values, scoped=True) last_position = size - 1 fork = bool(subquery.params.kv.get('fork', Boolean(False)).value) if position == 0: @self.event_callback(subquery.query.event_type) def start_sequence_callback(event): # type: (Event) -> None if check_event(event): join_value = get_join_value(event) sequence = [event] lookups[1][join_value] = sequence elif position < last_position: next_position = position + 1 @self.event_callback(subquery.query.event_type) def continue_sequence_callback(event): # type: (Event) -> None if len(lookups[position]) and check_event(event): join_value = get_join_value(event) if join_value in lookups[position]: if fork: sequence = list(lookups[position].get(join_value)) else: sequence = lookups[position].pop(join_value) sequence.append(event) lookups[next_position][join_value] = sequence else: @self.event_callback(subquery.query.event_type) def finish_sequence(event): # type: (Event) -> None if len(lookups[position]) and check_event(event): join_value = get_join_value(event) if join_value in lookups[position]: if fork: sequence = list(lookups[position].get(join_value)) else: sequence = lookups[position].pop(join_value) sequence.append(event) next_pipe(sequence) def _convert_sequence(self, node, next_pipe): # type: (Sequence, callable) -> callable # Two lookups can help avoid unnecessary calls size = len(node.queries) lookups = [{} for _ in range(size)] # type: list[dict[object, list[Event]]] if node.max_span is not None: max_span = self.convert_time_range(node.max_span) event_types = set(q.query.event_type for q in node.queries) @self.event_callback(*event_types) def check_timeout(event): # type: (Event) -> None minimum_start = event.time - max_span for sub_lookup in lookups: for join_key, sequence in list(sub_lookup.items()): if sequence[0].time < minimum_start: sub_lookup.pop(join_key) if node.close: check_close_event = self.convert(node.close.query) get_close_join_value = self._convert_key(node.close.join_values, scoped=True) @self.event_callback(node.close.query.event_type) def close_sequences(event): # type: (Event) -> None if check_close_event(event): join_value = get_close_join_value(event) for sub_lookup in lookups: if join_value in sub_lookup: sub_lookup.pop(join_value) for pos, query in reversed(list(enumerate(node.queries))): # Create these in reverse order, so one event can't hit multiple callbacks to be propagated self._convert_sequence_term(query, pos, len(node.queries), lookups, next_pipe) def _get_pipe_chain(self, pipes, output_pipe=None, query_multiple=True): # type: (list[PipeCommand], callable) -> callable """Get a chain of pipes.""" prev_query_value = self._query_multiple_events self._query_multiple_events = query_multiple output_pipe = output_pipe or self._default_emitter self._in_pipe = True for pipe in reversed(pipes): output_pipe = self.convert_pipe(pipe, output_pipe) self._in_pipe = False self._query_multiple_events = prev_query_value return output_pipe def _get_pipe_reducers(self, pipes, output_pipe=None, query_multiple=True): # type: (list[PipeCommand], callable, bool) -> callable """Get a chain of pipes.""" prev_query_value = self._query_multiple_events self._query_multiple_events = query_multiple output_pipe = output_pipe or self._default_emitter self._in_pipe = True for pipe in reversed(pipes): output_pipe = self.convert_reducer(pipe, output_pipe) if isinstance(pipe, (CountPipe, UniqueCountPipe)): break else: # Sort these events by time next_pipe = output_pipe results = [] def sort_results(events): # type: (list[Event]) -> None if events is not PIPE_EOF: results.append(events) else: results.sort(key=lambda result: (max(event.time for event in result), max(event.data.get('serial_event_id') for event in result))) for result in results: next_pipe(result) next_pipe(PIPE_EOF) output_pipe = sort_results self._in_pipe = False self._query_multiple_events = prev_query_value return output_pipe def _convert_piped_query(self, node, output_pipe=None): # type: (PipedQuery, callable) -> callable base_query = node.first query_multiple = not isinstance(base_query, EventQuery) output_pipe = self._get_pipe_chain(node.pipes, output_pipe=output_pipe, query_multiple=query_multiple) self.register_output_pipe(output_pipe) if isinstance(base_query, EventQuery): event_query = base_query check_match = self._convert_event_query(event_query) @self.event_callback(event_query.event_type) def callback(event): # type: (Event) -> None if check_match(event): output_pipe([event]) elif isinstance(base_query, Join): self._convert_join(base_query, output_pipe) elif isinstance(base_query, Sequence): self._convert_sequence(base_query, output_pipe) elif isinstance(base_query, Sample): self._convert_sample(base_query, output_pipe) else: raise EqlCompileError("Unsupported {}".format(type(base_query).__name__)) def _convert_analytic(self, analytic): # type: (EqlAnalytic) -> callable analytic_id = analytic.id or analytic.name self._convert_piped_query(analytic.query, self.get_result_emitter(analytic_id))
[docs] def add_custom_function(self, name, func): # type: (str, function) -> None """Load a python function into the EQL engine.""" self._functions[name] = func
[docs] def add_analytic(self, analytic): # type: (EqlAnalytic) -> None """Convert an analytic and load into the engine.""" expanded_analytic = self.preprocessor.expand(analytic) self._convert_analytic(expanded_analytic)
[docs] def add_query(self, query): # type: (PipedQuery | EqlAnalytic) -> None """Convert an analytic and load into the engine.""" query = self.preprocessor.expand(query) self._convert_piped_query(query)
[docs] def add_queries(self, queries): """Add multiple queries to the engine.""" for query in queries: self.add_query(query)
def add_post_processor(self, query, analytic_id=None, output_pipe=None, query_multiple=False): # type: (PipedQuery, str, callable, bool) -> None """Register a query post-processor to perform additional filtering of results.""" chain = self._get_pipe_chain(query.pipes, output_pipe, query_multiple=query_multiple) self._reducer_hooks[analytic_id].append(chain) def add_reducer(self, query, analytic_id=None, output_pipe=None): """Reduce the output from multiple queries. :param PipedQuery|EqlAnalytic query: The analytic to extra the reduce logic from :param str analytic_id: Optional analytic_id to add to AnalyticOutput results :param callable output_pipe: Next pipe to reduce to """ if isinstance(query, EqlAnalytic): analytic_id = query.id or analytic_id output_pipe = self.get_result_emitter(query.id, output_pipe) query = query.query query_multiple = not isinstance(query.first, EventQuery) reduce_pipe_chain = self._get_pipe_reducers(query.pipes, output_pipe, query_multiple=query_multiple) # At this point output_pipe is the entry point to the reducer self._reducer_hooks[analytic_id].append(reduce_pipe_chain)
[docs] def stream_event(self, event): # type: (Event) -> None """Stream a single :class:`~Event` through the engine.""" for hook in self._event_hooks[event.type]: hook(event)
[docs] def finalize(self): """Send the engine an EOF signal, so that aggregating pipes can finish.""" for pipe in self._query_pipes: pipe(PIPE_EOF) for analytic_id, reducers in self._reducer_hooks.items(): for reducer in reducers: reducer(PIPE_EOF)
[docs] def stream_events(self, events, finalize=True): """Stream :class:`~Event` objects through the engine.""" for event in events: if not isinstance(event, Event): event = Event.from_data(event) self.stream_event(event) if finalize: self.finalize()
def reduce_events(self, inputs, analytic_id=None, finalize=True): """Run an event through the reducers registered with :meth:`~add_reducer` and :meth:`~add_post_processor`. :param AnalyticOutput|Event|dict inputs: Mapped results to reduce :param str analytic_id: Optional analytic id to add to generated AnalyticOutput results :param bool finalize: Send the finalize signal when input is exhausted. """ for data in inputs: if isinstance(data, AnalyticOutput): analytic_id = data.analytic_id or analytic_id events = data.events elif isinstance(data, Event): events = [data] elif isinstance(data, dict): events = [Event.from_data(data)] else: raise EqlCompileError("Unable to reduce {}".format(data)) for reducer in self._reducer_hooks[analytic_id]: reducer(events) if finalize: self.finalize() def add_event_callback(self, event_type, f): # type: (int, callable) -> None """Register a callback for incoming events.""" if event_type == EVENT_TYPE_ANY: # Note that if querying over all events, we need to preserve the order the hooks were created # So append them to all existing hook arrays self._any_event_hooks.append(f) for _, event_hooks in self._event_hooks.items(): event_hooks.append(f) else: self._event_hooks[event_type].append(f) def event_callback(self, *event_types): """Get a decorator that registers a function as an event callback in the engine.""" assert all(is_string(e) for e in event_types) def event_callback_decorator(f): for event_type in event_types: self.add_event_callback(event_type, f) return f return event_callback_decorator def register_output_pipe(self, f): """"Register a pipe, so that it can get called when the engine is closing.""" self._query_pipes.append(f) def output_pipe(self, f): """"Decorator that registers a pipe, so that it can get called when the engine is closing.""" self.register_output_pipe(f) return f
[docs] def add_output_hook(self, f): """Register a callback to receive events as they are output from the engine.""" self._output_hooks.append(f)
__all__ = ( "PythonEngine", )