"""EQL syntax tree nodes/schema."""
from __future__ import unicode_literals
import re
from collections import OrderedDict
from operator import lt, le, eq, ne, ge, gt
from string import Template
from enum import Enum
from .errors import EqlError, EqlCompileError
from .signatures import SignatureMixin
from .types import TypeHint, NodeInfo # noqa: F401
from .utils import to_unicode, is_string, is_number, ParserConfig, fold_case, chr_compat
__all__ = (
# base classes
"BaseNode",
"Expression",
"EqlNode",
# Literals
"Literal",
"String",
"Number",
"Null",
"Boolean",
"TimeRange",
"TimeUnit",
# fields and subfields
"Field",
# boolean logic
"Comparison",
"InSet",
"IsNotNull",
"IsNull",
"And",
"Or",
"Not",
"FunctionCall",
"MathOperation",
# queries
"EventQuery",
"NamedSubquery",
"NamedParams",
"SubqueryBy",
"Join",
"Sequence",
"Sample",
# pipes
"PipeCommand",
# full queries
"PipedQuery",
"EqlAnalytic",
# macros
"Definition",
"BaseMacro",
"CustomMacro",
"Macro",
"Constant",
"PreProcessor",
)
[docs]class BaseNode(object):
"""This is the base class for all AST nodes."""
__slots__ = ()
template = None # type: Template
delims = {}
precedence = None
def iter_slots(self):
# type: () -> list
"""Enumerate over all of the slots and their values."""
for key in self.__slots__:
yield key, getattr(self, key, None)
def children(self): # type: () -> list[BaseNode]
"""Collect over all child nodes."""
children = []
def recurse(descendant):
if isinstance(descendant, BaseNode):
children.append(descendant)
elif hasattr(descendant, "__iter__"):
for c in descendant:
recurse(c)
recurse(c for c in self.iter_slots())
return children
def optimize(self, recursive=False):
"""Optimize an AST."""
return Optimizer(recursive=recursive).walk(self)
def __eq__(self, other):
"""Check if two ASTs are equivalent."""
return type(self) == type(other) and list(self.iter_slots()) == list(other.iter_slots())
def __ne__(self, other):
"""Check if two ASTs are not equivalent."""
return not self == other
[docs] def render(self, precedence=None, **kwargs):
"""Render the AST in the target language."""
if not self.template:
raise NotImplementedError()
dicted = {}
for name, value in self.iter_slots():
if isinstance(value, (list, tuple)):
delim = self.delims[name]
value = [v.render(self.precedence, **kwargs) if isinstance(v, BaseNode) else v for v in value]
value = delim.join(v for v in value)
elif isinstance(value, BaseNode):
value = value.render(self.precedence, **kwargs)
dicted[name] = value
return self.template.substitute(dicted)
def __repr__(self):
"""Python representation of the AST."""
return "{}({})".format(type(self).__name__, ", ".join('{}={}'.format(name, repr(slot))
for name, slot in self.iter_slots()))
def __iter__(self):
"""Iterate recursively through all nodes in the tree."""
return Walker().iter_node(self)
def __unicode__(self):
"""Render the AST back as a valid EQL string."""
return self.render()
def __str__(self):
"""Render the AST back as a valid EQL string."""
unicoded = self.__unicode__()
# Python 2.7
if not isinstance(unicoded, str):
unicoded = unicoded.encode('utf-8')
return unicoded
# noinspection PyAbstractClass
[docs]class EqlNode(BaseNode):
"""The base class for all nodes within the event query language."""
TAB = ' '
precedence = None
def indent(self, text, depth=1):
"""Indent by EQL default tab."""
delim = self.TAB * depth
return '\n'.join(delim + line.rstrip() for line in text.splitlines())
def _render(self):
# Render the template if defined
return super(EqlNode, self).render()
def render(self, precedence=None, **kwargs):
"""Render an EQL node and add parentheses to support orders of operation."""
rendered = self._render(**kwargs)
if precedence is not None and self.precedence is not None and self.precedence > precedence:
return '({})'.format(rendered)
return rendered
# noinspection PyAbstractClass
[docs]class Expression(EqlNode):
"""Base class for expressions."""
precedence = 0
def foldable(self):
"""Determine if an expression can be folded."""
return all(c.foldable() for c in self.children())
def fold(self):
"""Fold an expression."""
optimized = self.optimize(recursive=True)
if isinstance(optimized, Literal):
return optimized.value
raise EqlError("Unable to fold expression: {}".format(self))
def __and__(self, other):
"""Boolean AND between two AST nodes."""
if self == Boolean(False) or other == Boolean(False):
return Boolean(False)
elif self == Boolean(True):
return other
elif other == Boolean(True):
return self
if self == Null() or other == Null():
return Null()
# flatten out the terms in the And
terms = []
for t in (self, other):
if isinstance(t, And):
terms.extend(t.terms)
else:
terms.append(t)
return And(terms)
def __or__(self, other):
""""Boolean OR between two AST nodes."""
if self == Boolean(True) or other == Boolean(True):
return Boolean(True)
elif self == Boolean(False):
return other
elif other == Boolean(False):
return self
if self == Null() or other == Null():
return Null()
# flatten out the terms in the Or
terms = []
for t in (self, other):
if isinstance(t, Or):
terms.extend(t.terms)
else:
terms.append(t)
return Or(terms)
def __invert__(self):
"""Negate an expression with Not."""
return Not(self)
[docs]class Literal(Expression):
"""Static value."""
__slots__ = 'value',
precedence = Expression.precedence + 1
type_hint = None # type: TypeHint
def __init__(self, value):
"""Create an EQL value from a python value."""
if type(self) is Literal:
raise TypeError("Literal AST nodes can't be created directly. Try Literal.from_python")
self.value = value
@classmethod
def find_type(cls, python_value):
"""Find the corresponding AST node type for a python value."""
if python_value is None:
return Null
elif python_value is True or python_value is False:
return Boolean
elif is_number(python_value):
return Number
elif is_string(python_value):
return String
else:
raise TypeError("Unable to convert {} to a literal.".format(python_value))
@classmethod
def from_python(cls, python_value):
"""Convert a python value to a literal."""
subcls = cls.find_type(python_value)
return subcls(python_value)
def __invert__(self):
"""Negate a static value."""
return Boolean(not self.value)
class Boolean(Literal):
"""Boolean literal."""
type_hint = TypeHint.Boolean
def _render(self):
return 'true' if self.value else 'false'
class Null(Literal):
"""Null literal."""
type_hint = TypeHint.Null
def __init__(self, value=None):
"""Null literal value."""
super(Null, self).__init__(None)
def __invert__(self):
"""Null values can't be inverted."""
return Null()
def _render(self):
return 'null'
class Number(Literal):
"""Numeric literal."""
type_hint = TypeHint.Numeric
def _render(self):
return to_unicode(self.value)
class String(Literal):
"""String literal."""
escape_patterns = {
'\\': '\\\\',
'\b': '\\b',
'\t': '\\t',
'\r': '\\r',
'\n': '\\n',
'\f': '\\f',
'\"': '\\\"',
'\'': '\\\'',
}
reverse_patterns = {v: k for k, v in escape_patterns.items()}
escape_re = r'[{}]'.format('|'.join(escape_patterns.values()))
type_hint = TypeHint.String
@classmethod
def escape(cls, s):
"""Escape known patterns in a string."""
def replace_callback(sub):
return cls.escape_patterns.get(sub.group(), sub.group())
return re.sub(cls.escape_re, replace_callback, s)
@classmethod
def unescape(cls, s):
"""Unescape an EQL rendered string."""
def replace_callback(sub):
original = sub.group()
if original.startswith("\\u{"):
hex_digits = original[3:-1]
as_hex = int(hex_digits, 16)
return chr_compat(as_hex)
return cls.reverse_patterns[sub.group()]
return re.sub(r"\\u\{.+\}|\\[^u]", replace_callback, s)
def _render(self):
return '"{}"'.format(self.escape(self.value))
class TimeUnit(Enum):
"""Elasticsearch compatible time units."""
Milliseconds = "ms"
Seconds = "s"
Minutes = "m"
Hours = "h"
Days = "d"
@classmethod
def get_units(cls):
"""Return all time units in order of precision."""
return [cls.Milliseconds, cls.Seconds, cls.Minutes, cls.Hours, cls.Days]
def as_milliseconds(self):
"""Convert a time unit to a number of milliseconds."""
if self == TimeUnit.Milliseconds:
return 1
elif self == TimeUnit.Seconds:
return 1000
elif self == TimeUnit.Minutes:
return 1000 * 60
elif self == TimeUnit.Hours:
return 1000 * 60 * 60
elif self == TimeUnit.Days:
return 1000 * 60 * 60 * 24
else:
raise ValueError("Unknown time unit {}".format(repr(self)))
def __repr__(self):
"""Override the default __repr__ method."""
return "{}({})".format(type(self).__name__, repr(self.value))
[docs]class TimeRange(Expression):
"""EQL node for an interval of time."""
__slots__ = 'quantity', 'unit',
precedence = Expression.precedence + 1
def __init__(self, quantity, unit): # type: (int, TimeUnit) -> None
"""EQL time interval."""
self.quantity = quantity
self.unit = unit
def as_milliseconds(self):
"""Convert a time range to milliseconds."""
return self.quantity * self.unit.as_milliseconds()
def _render(self):
return '{:d}{:s}'.format(self.quantity, self.unit.value)
[docs]class Field(Expression):
"""Variables and paths in scope of the event."""
EVENTS = 'events'
__slots__ = 'base', 'path',
precedence = Expression.precedence + 1
field_re = re.compile("^[_A-Za-z][_A-Za-z0-9]*$")
def __init__(self, base, path=None, as_var=False):
"""Query the event for the field expression.
:param str base: The root field
:param list[str|int] path: The sub fields and array positions
:param bool as_var: Render the node as a variable
"""
self.base = base
self.path = path or []
self.as_var = as_var
def query_multiple_events(self): # type: () -> (int, Field)
"""Get the index into the event array and query."""
if self.base == Field.EVENTS and len(self.path) >= 2:
if is_number(self.path[0]) and is_string(self.path[1]):
return self.path[0], Field(self.path[1], self.path[2:])
return 0, self
@property
def full_path(self): # type: () -> list[str]
"""Get the full path for a field."""
return [self.base] + self.path
@classmethod
def escape_ident(cls, key):
"""Escape identifiers that are keywords."""
from .parser import keywords
if key in keywords or cls.field_re.match(key) is None:
return "`{key}`".format(key=key)
return key
def _render(self):
text = []
if self.as_var:
text.append("$")
text.append(self.escape_ident(self.base))
for key in self.path:
if is_number(key):
text.append("[{}]".format(key))
else:
text.append(".{}".format(self.escape_ident(key)))
return "".join(text)
[docs]class FunctionCall(Expression):
"""A call into a user-defined function by name and a list of arguments."""
__slots__ = 'name', 'arguments', 'as_method'
precedence = Literal.precedence + 1
template = Template('$name($arguments)')
delims = {'arguments': ', '}
def __init__(self, name, arguments, as_method=False):
"""Call the function by name.
:param str name: The name of the user-defined function
:param list[Expression] arguments: Arguments to pass into the function.
"""
self.name = name
self.arguments = arguments or []
self.as_method = as_method
@property
def callback(self):
"""Get the callback for this node."""
return self.signature.get_callback(*self.arguments)
@property
def signature(self):
"""Get the matching function signature."""
return get_function(self.name)
def _render(self):
"""Determine the precedence by checking if it's called as a method."""
if self.as_method:
return '{base}:{name}({remaining})'.format(
base=self.arguments[0].render(self.precedence), name=self.name,
remaining=", ".join(arg.render(self.precedence) for arg in self.arguments[1:]))
return super(FunctionCall, self)._render()
def render(self, precedence=None):
"""Convert wildcards back to the short hand syntax."""
if self.signature:
alternate_render = self.signature.alternate_render(self.arguments, precedence)
if alternate_render:
return alternate_render
return super(FunctionCall, self).render()
def __or__(self, other):
"""Optimize OR comparisons between matching variadic binary functions."""
if isinstance(other, FunctionCall) and \
self.name in ("wildcard", "match", "matchLite") and other.name == self.name:
# wildcard(x, ABC...) or wildcard(x, DEF...) ==> wildcard(x, ABC..., DEF...)
if self.arguments[0] == other.arguments[0]:
return FunctionCall(self.name, self.arguments + other.arguments[1:])
return super(FunctionCall, self).__or__(other)
[docs]class NamedSubquery(Expression):
"""Named of queries perform a subquery with a specific type and returns true if the current event is related.
Query Types:
- descendant: Returns true if the pid/unique_pid of the event is a descendant of the subquery process
- child: Returns true if the pid/unique_pid of the event is a child of the subquery process
- event: Returns true if the pid/unique_pid of the event matches the subquery process
"""
__slots__ = 'query_type', 'query'
precedence = FunctionCall.precedence
DESCENDANT = 'descendant'
EVENT = 'event'
CHILD = 'child'
supported_types = (DESCENDANT, EVENT, CHILD)
template = Template('$query_type of [$query]')
def __init__(self, query_type, query):
"""Init.
:param str query_type: The type of subquery to relate by
:param EventQuery query: Query applied to the process' ancestor(s)
"""
self.query_type = query_type
self.query = query
class MathOperation(Expression):
"""Mathematical operation between two numeric values."""
__slots__ = 'left', 'operator', 'right'
OPERATORS = ('*', '/', '%', '+', '-')
func_lookup = {"*": "multiply", "+": "add", "-": "subtract", "%": "modulo", "/": "divide"}
min_precedence = NamedSubquery.precedence + 1
max_precedence = min_precedence + 1
full_template = Template('$left $operator $right')
negative_template = Template('$operator$right')
def __init__(self, left, operator, right): # type: (Expression, str, Expression) -> None
"""Mathematical operation between two numeric values."""
self.left = left
self.operator = operator
self.right = right
def to_function_call(self):
"""Convert a math operator to an EQL function call."""
return FunctionCall(self.func_lookup[self.operator], [self.left, self.right])
@property
def precedence(self):
"""Get the precedence depending on the operator."""
if self.operator in "*/%":
return self.min_precedence
else:
return self.max_precedence
@property
def template(self):
"""Make the template dynamic."""
return self.negative_template if self.left == Number(0) else self.full_template
[docs]class Comparison(Expression):
"""Represents a comparison between two values, as in ``<expr> <comparator> <expr>``.
Comparison operators include ``==``, ``!=``, ``<``, ``<=``, ``>=``, and ``>``.
"""
__slots__ = 'left', 'comparator', 'right'
LT, LE, EQ, NE, GE, GT = ('<', '<=', '==', '!=', '>=', '>')
func_lookup = {LT: lt, LE: le, EQ: eq, NE: ne, GE: ge, GT: gt}
precedence = MathOperation.max_precedence + 1
template = Template('$left $comparator $right')
def __init__(self, left, comparator, right):
# type: (Expression, str, Expression) -> None
"""Compare two fields or values to each other."""
self.left = left
self.comparator = comparator
self.right = right
self.function = self.func_lookup[comparator]
def __invert__(self):
"""Convert a comparison by flipping the operators."""
if self.comparator == self.EQ:
return Comparison(self.left, Comparison.NE, self.right).optimize()
elif self.comparator == self.NE:
return Comparison(self.left, Comparison.EQ, self.right).optimize()
return super(Comparison, self).__invert__()
def __or__(self, other):
"""Check for one field being compared to multiple values, and switch to a set."""
if self.comparator == Comparison.EQ and isinstance(self.right, Literal):
if isinstance(other, Comparison) and self.left == other.left and other.comparator == Comparison.EQ:
if isinstance(other.right, Literal):
return InSet(self.left, [self.right, other.right])
elif isinstance(other, InSet) and self.left == other.expression and other.is_literal():
container = [self.right]
container.extend(other.container)
return InSet(self.left, container)
return super(Comparison, self).__or__(other)
def __and__(self, other):
"""Check if a comparison is ANDed to a set."""
if self.comparator == Comparison.EQ and isinstance(other, InSet) and self.left == other.expression:
return InSet(self.left, [self.right]) & other
return super(Comparison, self).__and__(other)
class IsNull(Expression):
"""Node for checking if values are null."""
template = Template("$expr == null")
precedence = Comparison.precedence
__slots__ = "expr",
def __init__(self, expr): # type: (Expression) -> None
"""Check if a value is null."""
self.expr = expr
class IsNotNull(Expression):
"""Node for checking if values are null."""
template = Template("$expr != null")
precedence = Comparison.precedence
__slots__ = "expr",
def __init__(self, expr): # type: (Expression) -> None
"""Check if a value is not null."""
self.expr = expr
[docs]class InSet(Expression):
"""Check if the value of a field within an event matches a list of values."""
__slots__ = 'expression', 'container'
precedence = Comparison.precedence
def __init__(self, expression, container):
# type: (Expression, list[Expression]) -> None
"""Check if a value is in a list of possible values."""
self.expression = expression
self.container = container
def is_literal(self):
"""Check if a set contains only literal values."""
return all(isinstance(v, Literal) for v in self.container)
def is_dynamic(self):
"""Check if a set contains only dynamic values."""
return all(not isinstance(v, Literal) for v in self.container)
def get_literals(self):
"""Get the values in the set."""
values = OrderedDict()
for literal in self.container: # type: Literal
if not isinstance(literal, Literal):
continue
k = literal.value
if isinstance(literal, String):
values.setdefault(fold_case(k), literal)
else:
values[k] = literal
return values
def __and__(self, other):
"""Perform an intersection between two sets for boolean AND."""
if isinstance(other, InSet) and self.expression == other.expression:
if self.is_literal() and other.is_literal():
container1 = self.get_literals()
container2 = other.get_literals()
reduced = [v for k, v in container1.items() if k in container2]
return InSet(self.expression, reduced).optimize()
elif isinstance(other, Not):
if isinstance(other.term, InSet) and self.expression == other.term.expression:
# Check if one set is being subtracted from another
if self.is_literal() and other.term.is_literal():
container1 = self.get_literals()
container2 = other.term.get_literals()
reduced = [v for k, v in container1.items() if k not in container2]
return InSet(self.expression, reduced).optimize()
elif isinstance(other, Comparison) and other.comparator == Comparison.EQ and self.expression == other.left:
if self.is_literal() and isinstance(other.right, Literal):
return super(InSet, self).__and__(InSet(other.left, [other.right])).optimize()
elif isinstance(other, Comparison) and other.comparator == Comparison.NE and self.expression == other.left:
if self.is_literal() and isinstance(other.right, Literal):
return super(InSet, self).__and__(~ InSet(other.left, [other.right])).optimize()
return super(InSet, self).__and__(other)
def __or__(self, other):
"""Perform a union between two sets for boolean OR."""
if isinstance(other, InSet) and self.expression == other.expression:
if self.is_literal() and other.is_literal():
container = self.get_literals()
for k, v in other.get_literals().items():
container.setdefault(k, v)
union = [v for v in container.values()]
return InSet(self.expression, union).optimize()
elif isinstance(other, Comparison) and self.expression == other.left:
if self.is_literal() and isinstance(other.right, Literal):
return self.__or__(InSet(other.left, [other.right]))
return super(InSet, self).__or__(other)
def split_literals(self):
"""Split the set lookup into static values and dynamic values."""
if self.is_dynamic() or self.is_literal():
return self
literals = InSet(self.expression, [])
dynamic = InSet(self.expression, [])
for item in self.container:
if isinstance(item, Literal):
literals.container.append(item)
else:
dynamic.container.append(item)
return literals.optimize() | dynamic.optimize()
@property
def synonym(self):
"""Get an equivalent node that does performs multiple comparisons with 'or' and '=='."""
return Or([Comparison(self.expression, Comparison.EQ, v) for v in self.container])
def _render(self, negate=False):
values = [v.render() for v in self.container]
expr = self.expression.render(self.precedence)
operator = 'not in' if negate else 'in'
if len(self.container) > 3 and sum(len(v) for v in values) > 40:
delim = ',\n'
return '{lhs} {op} (\n{rhs}\n)'.format(lhs=expr, op=operator, rhs=self.indent(delim.join(values)))
else:
delim = ', '
return '{lhs} {op} ({rhs})'.format(lhs=expr, op=operator, rhs=delim.join(values))
class BaseCompound(Expression):
"""Combine multiple expressions with a single operator."""
__slots__ = 'terms',
operator = None # type: str
def __init__(self, terms):
"""Combine multiple expressions with an operator.
:param list[Expression] terms: List of terms
"""
self.terms = terms
def _render(self):
scoped_terms = [term.render(self.precedence) for term in self.terms]
if len(scoped_terms) == 1:
return scoped_terms[0]
if len(self.terms) > 4 or any(isinstance(t, (BaseCompound, NamedSubquery, InSet)) for t in self.terms):
delim = ' {}\n'.format(self.operator)
indented = [self.indent(t) for t in scoped_terms]
return delim.join(indented).lstrip()
else:
delim = ' {} '.format(self.operator)
return delim.join(scoped_terms).lstrip()
[docs]class Not(Expression):
"""Negate a boolean expression."""
__slots__ = 'term',
precedence = Comparison.precedence + 1
template = Template('not $term')
def __init__(self, term):
"""Init.
:param Expression term: The query node to negate
"""
self.term = term
def demorgans(self):
"""Apply DeMorgan's law."""
if isinstance(self.term, Or):
return And([(~ t).optimize() for t in self.term.terms]).optimize()
elif isinstance(self.term, And):
return Or([(~ t).optimize() for t in self.term.terms]).optimize()
else:
return ~ self.term.optimize()
def __invert__(self):
"""Convert ``not not X`` to X."""
return self.term.optimize()
def render(self, precedence=None):
"""Convert wildcard functions back to the short hand syntax."""
if isinstance(self.term, InSet):
return self.term.render(precedence, negate=True)
if isinstance(self.term, FunctionCall) and self.term.name == 'wildcard':
if len(self.term.arguments) == 2 and isinstance(self.term.arguments[1], String):
lhs, rhs = self.term.arguments
return Comparison(lhs, Comparison.NE, rhs).render(precedence)
return super(Not, self).render(precedence)
[docs]class And(BaseCompound):
"""Perform a boolean ``and`` on a list of expressions."""
precedence = Not.precedence + 1
operator = 'and'
[docs]class Or(BaseCompound):
"""Perform a boolean ``or`` on a list of expressions."""
precedence = And.precedence + 1
operator = 'or'
[docs]class EventQuery(EqlNode):
"""Query over a specific event type with a boolean condition."""
__slots__ = 'event_type', 'query'
template = Template('$event_type where $query')
def __init__(self, event_type, query):
"""Init.
:param str event_type: One of the event types in the repo sensor/eventing_schema
:param query: The query scoped to the event type
"""
self.event_type = event_type
self.query = query
def _render(self):
query_text = self.query.render()
if '\n' in query_text:
return '{} where\n{}'.format(self.event_type, self.indent(query_text))
return super(EventQuery, self)._render()
[docs]class NamedParams(EqlNode):
"""An EQL node for key-value named parameters."""
__slots__ = 'kv',
def __init__(self, kv=None):
"""Key value store for EQL parameters.
:param dict[str, Expression] kv: The named key-value parameters.
"""
self.kv = kv or {}
def _render(self):
return ' '.join('{}={}'.format(k, v.render(Literal.precedence)) for k, v in self.kv.items())
[docs]class SubqueryBy(EqlNode):
"""Node for holding the :class:`~EventQuery` and parameters to join on."""
__slots__ = 'query', 'join_values', 'fork',
def __init__(self, query, join_values=None, fork=None):
"""Init.
:param EventQuery query: The event query enclosed in the term
:param list[Expression] join_values: The field to join values on
:param bool fork: Toggle for copying instead of moving a sequence on match
"""
self.query = query
self.join_values = join_values or []
self.fork = fork
@property
def params(self):
"""Keep params for backwards compatibility."""
params = {}
if self.fork is not None:
params["fork"] = Boolean(self.fork)
return NamedParams(params)
def _render(self):
text = "[{}]".format(self.query.render())
params = self.params.render()
if len(params):
text += ' ' + params
if len(self.join_values):
text += ' by {}'.format(', '.join(jv.render() for jv in self.join_values))
return text
[docs]class Join(EqlNode):
"""Another boolean query that can join multiple events that share common values."""
__slots__ = 'queries', 'close'
def __init__(self, queries, close=None):
"""Init.
:param list[SubqueryBy] queries:
:param SubqueryBy close: The condition to purge all join state.
"""
self.queries = queries
self.close = close
def _render(self):
text = 'join\n'
text += self.indent('\n'.join(query.render() for query in self.queries))
if self.close:
text += '\nuntil\n' + self.indent(self.close.render())
return text
class Sample(EqlNode):
"""Sample finds events matching the defined filters, regardless of their temporal order.
Sample supports defining one or more join keys.
"""
__slots__ = 'queries',
def __init__(self, queries):
"""Create a Sample of multiple events.
:param list[SubqueryBy] queries: List of queries to be sampled
"""
self.queries = queries
def _render(self):
text = 'sample'
text += self.indent('\n'.join(query.render() for query in self.queries))
return text
[docs]class Sequence(EqlNode):
"""Sequence is very similar to join, but enforces an ordering.
Sequence supports the ``until`` keyword, which indicates an event that causes it to terminate early.
"""
__slots__ = 'queries', 'max_span', 'close'
def __init__(self, queries, max_span=None, close=None):
"""Create a Sequence of multiple events.
:param list[SubqueryBy] queries: List of queries to be sequenced
:param TimeUnit max_span: Dictionary of timing parameters for the sequence.
:param SubqueryBy close: An optional query that causes all sequence state to expire
"""
self.queries = queries
self.max_span = max_span
self.close = close
@property
def params(self):
"""Keep params for backwards compatibility."""
params = {}
if self.max_span is not None:
params["maxspan"] = self.max_span
return NamedParams(params)
def _render(self):
text = 'sequence'
if self.max_span:
text += ' with maxspan={}'.format(self.max_span.render())
text += '\n'
text += self.indent('\n'.join(query.render() for query in self.queries))
if self.close:
text += '\nuntil\n' + self.indent(self.close.render())
return text
# noinspection PyAbstractClass
[docs]class PipeCommand(EqlNode, SignatureMixin):
"""Base class for an EQL pipe."""
__slots__ = 'arguments',
name = None # type: str
lookup = {} # type: dict[str, PipeCommand|type]
def __init__(self, arguments=None): # type: (list[Expression]) -> None
"""Create a pipe with optional arguments."""
self.arguments = arguments or []
super(PipeCommand, self).__init__()
@classmethod
def register(cls, name):
"""Register a pipe class by name."""
def decorator(pipe_class):
pipe_class.name = name
if name in cls.lookup:
raise KeyError("Pipe {} already registered as {}".format(cls.lookup[name], name))
cls.lookup[name] = pipe_class
return pipe_class
return decorator
@classmethod
def output_schemas(cls, arguments, event_schemas):
# type: (list[NodeInfo], list[Schema]) -> list[Schema]
"""Output a list of schemas for each event in the pipe."""
return event_schemas
def _render(self):
if len(self.arguments) == 0:
return self.name
return self.name + ' ' + ', '.join(arg.render() for arg in self.arguments)
[docs]class PipedQuery(EqlNode):
"""List of all the pipes."""
__slots__ = 'first', 'pipes'
def __init__(self, first, pipes=None):
"""Init.
:param EventQuery|Join|Sequence first: first query
:param list[PipeCommand] pipes: List of all of the following pipes
"""
self.first = first
self.pipes = pipes or []
def _render(self):
all_pipes = [self.first] + self.pipes
return '\n| '.join(pipe.render() for pipe in all_pipes)
[docs]class EqlAnalytic(EqlNode):
"""Analytics are the top-level nodes for matching and returning events."""
__slots__ = 'query', 'metadata'
def __init__(self, query, metadata=None):
"""Init.
:param PipedQuery query: Analytic query
:param dict metadata: Metadata for the analytic
"""
self.query = query
self.metadata = metadata or {}
@property
def id(self):
"""Return the ID from metadata."""
return self.metadata.get('id')
@property
def name(self):
"""Return the name from metadata."""
return self.metadata.get('name')
def __unicode__(self):
"""Print a string instead of the dictionary that render returns."""
return self.query.__unicode__()
def __str__(self):
"""Print a string instead of the dictionary that render returns."""
return self.query.__str__()
def _render(self):
return {'metadata': self.metadata, 'query': self.query.render()}
class Definition(object):
"""EQL definitions used for pre-processor expansion."""
__slots__ = 'name',
def __init__(self, name):
"""Create a generic definition with a name.
:param str name: The name of the macro
"""
self.name = name
super(Definition, self).__init__()
class Constant(Definition, EqlNode):
"""EQL constant which binds a literal to a name."""
__slots__ = 'name', 'value',
template = Template('const $name = $value')
def __init__(self, name, value): # type: (str, Literal) -> None
"""Create an EQL literal constant."""
super(Constant, self).__init__(name)
self.value = value
class BaseMacro(Definition):
"""Base macro class."""
def expand(self, arguments):
"""Expand a macro with a set of arguments."""
raise NotImplementedError
class CustomMacro(BaseMacro):
"""Custom macro class to use Python callbacks to transform trees."""
def __init__(self, name, callback):
"""Python macro to allow for more dynamic or sophisticated macros.
:param str name: The name of the macro.
:param (list[EqlNode]) -> EqlNode callback: A callback to expand out the macro.
"""
super(CustomMacro, self).__init__(name)
self.callback = callback
def expand(self, arguments):
"""Make the callback do the dirty work for expanding the AST."""
node = self.callback(arguments)
return node.optimize(recursive=True)
@classmethod
def from_name(cls, name):
"""Decorator to convert a function into a :class:`~CustomMacro` object."""
def decorator(f):
return CustomMacro(name, f)
return decorator
class Macro(BaseMacro, EqlNode):
"""Class for a macro on a node, to allow for client-side expansion."""
__slots__ = 'name', 'parameters', 'expression'
template = Template('macro $name($parameters) $expression')
delims = {'parameters': ', '}
def __init__(self, name, parameters, expression):
"""Create a named macro that takes a list of arguments and returns a paramaterized expression.
:param str name: The name of the macro.
:param list[str]: The names of the parameters.
:param Expression expression: The parameterized expression to return.
"""
BaseMacro.__init__(self, name)
EqlNode.__init__(self)
self.parameters = parameters
self.expression = expression
def expand(self, arguments):
"""Expand a node.
:param list[BaseNode node] arguments: The arguments the macro is called with
:param Walker walker: An optional syntax tree walker.
:param bool optimize: Return an optimized copy of the AST
:rtype: BaseNode
"""
if len(arguments) != len(self.parameters):
raise ValueError("Macro {} expected {} arguments but received {}".format(
self.name, len(self.parameters), len(arguments)))
lookup = dict(zip(self.parameters, arguments))
def _walk_field(node):
# type: (Field) -> Field
if node.base in lookup:
argument_value = lookup[node.base]
if node.path:
if not isinstance(argument_value, Field):
raise EqlCompileError("Invalid expansion: {} ({}={})".format(node, node.base, argument_value))
# extend the attributes being retrieved
return Field(argument_value.base, list(argument_value.path) + list(node.path))
return argument_value
return node
walker = RecursiveWalker()
walker.register_func(Field, _walk_field)
expanded = walker.walk(self.expression)
return expanded.optimize(recursive=True)
def _render(self):
expr = self.expression.render()
if '\n' in expr or len(expr) > 40:
expr = '\n' + self.indent(expr)
return self.template.substitute(name=self.name, parameters=', '.join(self.parameters), expression=expr)
return super(Macro, self)._render()
class PreProcessor(ParserConfig):
"""An EQL preprocessor stores definitions and is used for macro expansion and constants."""
def __init__(self, definitions=None):
"""Initialize a preprocessor environment that can load definitions."""
self.constants = OrderedDict() # type: dict[str, Constant]
self.macros = OrderedDict() # type: dict[str, BaseMacro|CustomMacro|Maco]
class PreProcessorWalker(RecursiveWalker):
"""Custom walker class for this preprocessor."""
preprocessor = self
def _walk_field(self, node, *args, **kwargs):
if node.base in self.preprocessor.constants and not node.path:
return self.preprocessor.constants[node.base].value
return self._walk_base_node(node, *args, **kwargs)
def _walk_function_call(self, node, *args, **kwargs):
if node.name in self.preprocessor.macros:
macro = self.preprocessor.macros[node.name]
arguments = [self.walk(arg, *args, **kwargs) for arg in node.arguments]
return macro.expand(arguments)
return self._walk_base_node(node, *args, **kwargs)
self.walker_cls = PreProcessorWalker
ParserConfig.__init__(self, preprocessor=self)
self.add_definitions(definitions or [])
def add_definitions(self, definitions):
"""Add a list of definitions."""
for definition in definitions:
self.add_definition(definition)
def add_definition(self, definition): # type: (BaseMacro|Constant) -> None
"""Add a named definition to the preprocessor."""
name = definition.name
if isinstance(definition, BaseMacro):
# The macro may call into other macros so it should be expanded
expanded_macro = self.expand(definition)
self.macros[name] = expanded_macro
elif isinstance(definition, Constant):
if name in self.constants:
raise KeyError("Constant {} already defined".format(name))
self.constants[name] = definition
def expand(self, root):
"""Expand the function calls that match registered macros.
:param EqlNode root: The input node, macro, expression, etc.
:param bool optimize: Toggle AST optimizations while expanding
:rtype: EqlNode
"""
if not self.constants and not self.macros:
return root
return self.walker_cls().walk(root)
def copy(self):
"""Create a shallow copy of a preprocessor."""
preprocessor = PreProcessor()
preprocessor.constants.update(self.constants)
preprocessor.macros.update(self.macros)
return preprocessor
# circular dependency
from .walkers import Walker, RecursiveWalker # noqa: E402
from .functions import get_function # noqa: E402
from .optimizer import Optimizer # noqa: E402