logs

Work with Filebeat's filebeat.log.

Currently supports:

  • filebeat.log

To import:

from dynamite_nsm.services.filebeat import logs as filebeat_logs

InvalidFilebeatStatusLogEntry

__init__(self, message) special

Thrown when a Filebeat log entry is improperly formatted

Parameters:

  • message: A more specific error message. Required.

Returns:

  None

Source code in dynamite_nsm/services/filebeat/logs.py
def __init__(self, message):
    """Thrown when a Filebeat log entry is improperly formatted
    Args:
        message: A more specific error message
    Returns:
        None
    """
    msg = "FileBeat log entry is invalid: {}".format(message)
    super(InvalidFilebeatStatusLogEntry, self).__init__(msg)
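
For example, when parsing raw log lines yourself, malformed entries can be skipped by catching this exception. A minimal sketch (the raw line below is a placeholder, not a real Filebeat entry):

from dynamite_nsm.services.filebeat import logs as filebeat_logs

raw_line = 'not-a-valid-filebeat-entry'
try:
    entry = filebeat_logs.StatusEntry(raw_line)
except filebeat_logs.InvalidFilebeatStatusLogEntry as err:
    # Skip (or log) malformed lines rather than aborting the whole parse
    print(err)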

MetricsEntry

A single Filebeat metrics entry for a specific time-interval

__init__(self, monitoring_payload, time) special

Initialize metrics entry object

Parameters:

  • monitoring_payload (Dict): The serialized JSON for "monitoring" status types. Required.
  • time (datetime): A datetime object representing when the metrics entry was written. Required.
Source code in dynamite_nsm/services/filebeat/logs.py
def __init__(self, monitoring_payload: Dict, time: datetime):
    """Initialize metrics entry object
    Args:
        monitoring_payload: The serialized JSON for "monitoring" status types
        time: A datetime object representing when the metrics entry was written
    """

    self.monitoring_payload = monitoring_payload
    metrics = self.monitoring_payload["monitoring"]["metrics"]
    self.time = time
    self.open_file_handles = metrics.get("beat", {}).get("handles", {}).get("open", 0)
    self.memory_allocated = metrics.get("beat", {}).get("memstats", {}).get("memory_alloc", 0)
    self.harvester_open_files = metrics.get("filebeat", {}).get("harvester", {}).get("open_files", 0)
    self.harvester_running_files = metrics.get("filebeat", {}).get("harvester", {}).get("running_files", 0)
    self.write_bytes = metrics.get("libbeat", {}).get("output", {}).get("write", {}).get("bytes", 0)
    self.read_bytes = metrics.get("libbeat", {}).get("output", {}).get("read", {}).get("bytes", 0)
    self.active_events = metrics.get("libbeat", {}).get("pipeline", {}).get("events", {}).get("active", 0)
    self.published_events = metrics.get("libbeat", {}).get("pipeline", {}).get("events", {}).get("published", 0)
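
Given the lookups above, a minimal "monitoring" payload is enough to build an entry by hand. A sketch with made-up values; keys absent from the payload simply default to 0:

from datetime import datetime
from dynamite_nsm.services.filebeat.logs import MetricsEntry

payload = {
    'monitoring': {
        'metrics': {
            'beat': {'handles': {'open': 12}, 'memstats': {'memory_alloc': 4096}},
            'libbeat': {'output': {'write': {'bytes': 2048}, 'read': {'bytes': 512}}}
        }
    }
}
entry = MetricsEntry(payload, datetime.utcnow())
print(entry.open_file_handles, entry.write_bytes)  # 12 2048
print(entry.harvester_open_files)                  # 0 (key absent, defaults)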

merge_metric_entry(self, metric_entry)

Merge another metrics entry into this one

Parameters:

  • metric_entry (MetricsEntry): The MetricsEntry you wish to merge in. Required.

Returns:

  None

Source code in dynamite_nsm/services/filebeat/logs.py
def merge_metric_entry(self, metric_entry: MetricsEntry) -> None:
    """Merge another metrics entry into this one
    Args:
        metric_entry: The MetricsEntry you wish to merge in
    Returns:
        None
    """
    self.open_file_handles = math.ceil((self.open_file_handles + metric_entry.open_file_handles) / 2)
    self.memory_allocated = math.ceil((self.memory_allocated + metric_entry.memory_allocated) / 2)
    self.harvester_open_files = math.ceil((self.harvester_open_files + metric_entry.harvester_open_files) / 2)
    self.harvester_running_files = math.ceil(
        (self.harvester_running_files + metric_entry.harvester_running_files) / 2)
    self.write_bytes += metric_entry.write_bytes
    self.read_bytes += metric_entry.read_bytes
    self.active_events += metric_entry.active_events
    self.published_events += metric_entry.published_events
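
Note the asymmetry: gauge-like fields (file handles, memory, harvester counts) are averaged and rounded up, while counters (bytes, events) are summed. A quick sketch using a hypothetical helper to build minimal entries:

from datetime import datetime
from dynamite_nsm.services.filebeat.logs import MetricsEntry

def make_entry(open_handles, write_bytes):
    # Hypothetical helper: wraps values in a minimal monitoring payload
    payload = {'monitoring': {'metrics': {
        'beat': {'handles': {'open': open_handles}},
        'libbeat': {'output': {'write': {'bytes': write_bytes}}}}}}
    return MetricsEntry(payload, datetime.utcnow())

a, b = make_entry(10, 1000), make_entry(5, 500)
a.merge_metric_entry(b)
print(a.open_file_handles)  # 8, i.e. ceil((10 + 5) / 2): averaged
print(a.write_bytes)        # 1500: summed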

StatusEntry

An entry from Filebeat's main log; metrics entries are automatically parsed out into dedicated MetricsEntry objects

__init__(self, entry_raw, include_json_payload=False) special

A status entry

Parameters:

  • entry_raw (str): A line item representing a single entry within the Filebeat log. Required.
  • include_json_payload (Optional[bool]): If True, the metrics payload will be included in its raw JSON form. Default: False.
Source code in dynamite_nsm/services/filebeat/logs.py
def __init__(self, entry_raw: str, include_json_payload: Optional[bool] = False):
    """A status entry
    Args:
        entry_raw: A line item representing a single entry within the Filebeat log
        include_json_payload: If True, the metrics payload will be included in its raw JSON form
    """

    self.include_json_payload = include_json_payload
    self.entry_raw = entry_raw
    self.json_payload = False
    self.payload = None
    self.metrics = None
    self.message = None
    self.description = None
    self.category = None
    self.timestamp = None
    self.log_level = None
    self._parse_entry()
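
Once a line parses successfully, these attributes expose the entry's fields directly. A sketch, assuming entry is a StatusEntry parsed from a real filebeat.log line:

# Assuming `entry` was parsed from a real filebeat.log line
print(entry.timestamp, entry.log_level, entry.category)
print(entry.message)
if entry.metrics:
    # Only "monitoring" entries carry a parsed MetricsEntry
    print(entry.metrics.published_events)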

StatusLog

Provides an interface for working with Filebeat's main log

__init__(self, log_sample_size=500, include_json_payloads=False) special

Work with Filebeat's filebeat.log

Parameters:

  • log_sample_size (Optional[int]): The maximum number of entries to parse. Default: 500.
  • include_json_payloads (Optional[bool]): If True, metrics payloads will be included in their raw JSON form. Default: False.
Source code in dynamite_nsm/services/filebeat/logs.py
def __init__(self, log_sample_size: Optional[int] = 500, include_json_payloads: Optional[bool] = False):
    """Work with Filebeat's filebeat.log
    Args:
        log_sample_size: The maximum number of entries to parse
        include_json_payloads: If True, metrics payloads will be included in their raw JSON form
    """

    self.include_json_payloads = include_json_payloads
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.filebeat_home = self.env_dict.get('FILEBEAT_HOME')
    self.log_path = os.path.join(self.filebeat_home, 'logs', 'filebeat')

    logs.LogFile.__init__(self,
                          log_path=self.log_path,
                          log_sample_size=log_sample_size)
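
A minimal sketch of standing up the interface; this assumes a DynamiteNSM environment file defining FILEBEAT_HOME, since the constructor reads it to locate the log:

from dynamite_nsm.services.filebeat import logs as filebeat_logs

status_log = filebeat_logs.StatusLog(log_sample_size=200)
# With no start/end, iter_entries defaults to roughly the last hour
for entry in status_log.iter_entries():
    print(entry.time, entry.log_level, entry.message)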

iter_aggregated_metrics(self, start=None, end=None, tolerance_seconds=60)

Iterates through metric entries, aggregating entries that fall within the same tolerance_seconds window into a single MetricsEntry

Parameters:

  • start (Optional[datetime]): UTC start time. Default: None.
  • end (Optional[datetime]): UTC end time. Default: None.
  • tolerance_seconds (Optional[int]): The maximum number of seconds between entries for them to be considered part of the same group and aggregated. Default: 60.

Returns:

  Yields a MetricsEntry for every iteration.

Source code in dynamite_nsm/services/filebeat/logs.py
def iter_aggregated_metrics(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
                            tolerance_seconds: Optional[int] = 60):
    """Iterates through metric entries, while aggregating entries together that are within the same tolerance_seconds into a single MetricsEntry
    Args:
        start: UTC start time
        end: UTC end time
        tolerance_seconds: Specifies the maximum numbers seconds between entries to consider them common, and therefore aggregate.
    Returns:
         yields a MetricsEntry for every iteration
    """

    # Sort all metric entries chronologically, then bucket them into
    # tolerance_seconds-wide windows relative to the earliest entry
    sorted_by_time = [metric for metric in self.iter_metrics(start, end)]
    if not sorted_by_time:
        return
    sorted_by_time = sorted(sorted_by_time, key=lambda x: x.time)
    start = sorted_by_time[0].time
    for name, group in itertools.groupby(
            sorted_by_time, lambda x: int((x.time - start).total_seconds() // tolerance_seconds + 1)):
        aggregated_entry = None
        for entry in group:
            if not aggregated_entry:
                aggregated_entry = entry
            else:
                aggregated_entry.merge_metric_entry(entry)
        yield aggregated_entry
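
For example, to roll the last hour of metrics into five-minute buckets (a sketch, reusing the status_log instance from the StatusLog example above):

from datetime import datetime, timedelta

end = datetime.utcnow()
start = end - timedelta(hours=1)
for metric in status_log.iter_aggregated_metrics(start=start, end=end, tolerance_seconds=300):
    print(metric.time, metric.active_events, metric.published_events)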

iter_entries(self, start=None, end=None, log_level=None, category=None)

Iterate through StatusEntries while providing some basic filtering options

Parameters:

  • start (Optional[datetime]): UTC start time. Default: None.
  • end (Optional[datetime]): UTC end time. Default: None.
  • log_level: One of DEBUG, INFO, WARN, ERROR, CRITICAL. Default: None.
  • category: Defaults to all if none specified; valid categories are beat, cfgwarn, crawler, harvester, monitoring, publisher, registrar, seccomp. Default: None.

Returns:

  Yields a StatusEntry for every iteration.

Source code in dynamite_nsm/services/filebeat/logs.py
def iter_entries(self, start: Optional[datetime] = None, end: Optional[datetime] = None, log_level=None,
                 category=None):
    """Iterate through StatusEntries while providing some basic filtering options
    Args:
        start: UTC start time
        end: UTC end time
        log_level: DEBUG, INFO, WARN, ERROR, CRITICAL
        category: Defaults to all if none specified; valid categories are beat, cfgwarn, crawler, harvester, monitoring, publisher, registrar, seccomp
    Returns:
         yields a StatusEntry for every iteration
    """

    def filter_entries(s: Optional[datetime], e: Optional[datetime] = None):
        if not e:
            e = datetime.utcnow()
        if not s:
            s = datetime.utcnow() - timedelta(minutes=60)
        for en in self.entries:
            try:
                en = StatusEntry(en, include_json_payload=self.include_json_payloads)
            except InvalidFilebeatStatusLogEntry:
                continue
            if s < en.time < e:
                yield en

    for log_entry in filter_entries(start, end):
        if log_level:
            if log_entry.log_level.lower() != log_level.lower():
                continue
        if category:
            if log_entry.category.lower() != category.lower():
                continue
        yield log_entry
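
A sketch that pulls only harvester-related errors from the last 24 hours, reusing the status_log instance from above:

from datetime import datetime, timedelta

start = datetime.utcnow() - timedelta(hours=24)
for entry in status_log.iter_entries(start=start, log_level='ERROR', category='harvester'):
    print(entry.time, entry.message)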

iter_metrics(self, start=None, end=None)

Iterate through metrics entries individually

Parameters:

  • start (Optional[datetime]): UTC start time. Default: None.
  • end (Optional[datetime]): UTC end time. Default: None.

Returns:

  Yields a MetricsEntry for every iteration.

Source code in dynamite_nsm/services/filebeat/logs.py
def iter_metrics(self, start: Optional[datetime] = None, end: Optional[datetime] = None):
    """Iterate through metrics entries individually
    Args:
        start: UTC start time
        end: UTC end time
    Returns:
         yields a MetricsEntry for every iteration
    """
    for entry in self.iter_entries(start, end):
        if entry.metrics:
            yield entry.metrics
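
Because this simply yields each entry's MetricsEntry, totals are easy to derive. A sketch, reusing the status_log instance from above:

total_written = sum(metric.write_bytes for metric in status_log.iter_metrics())
print('bytes written across sampled entries:', total_written)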

tail_entries(self, pretty_print=True)

Tail and follow a log to console

Parameters:

  • pretty_print (Optional[bool]): Print the log entry in a nice tabular view. Default: True.

Returns:

  None

Source code in dynamite_nsm/services/filebeat/logs.py
def tail_entries(self, pretty_print: Optional[bool] = True):
    """Tail and follow a log to console
    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    visited = []
    start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            end = datetime.utcnow()
            self.refresh()
            for entry in self.iter_entries(start=start, end=end):
                if entry.timestamp not in visited:
                    visited.append(entry.timestamp)
                    if not pretty_print:
                        print(json.dumps(json.loads(str(entry)), indent=1))
                    else:
                        status_table = [
                            ['Time', 'Log Level', 'Category', 'Message'],
                            [entry.time, entry.log_level, entry.category, entry.message]
                        ]
                        print(tabulate.tabulate(status_table, tablefmt='fancy_grid'))
                if len(visited) > 100:
                    visited = []
            start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))
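
Typical usage is a single blocking call, reusing the status_log instance from above; Ctrl+C (KeyboardInterrupt) exits cleanly:

# Pass pretty_print=False to emit raw JSON instead of the table view
status_log.tail_entries(pretty_print=True)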

tail_metrics(self, pretty_print=True)

Tail and follow a metrics log to console

Parameters:

  • pretty_print (Optional[bool]): Print the log entry in a nice tabular view. Default: True.

Returns:

  None

Source code in dynamite_nsm/services/filebeat/logs.py
def tail_metrics(self, pretty_print: Optional[bool] = True):
    """Tail and follow a metrics log to console
    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    visited = []
    start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            end = datetime.utcnow()
            self.refresh()
            for metric in self.iter_aggregated_metrics(start=start, end=end):
                if metric.time.timestamp() not in visited:
                    visited.append(metric.time.timestamp())
                    if not pretty_print:
                        print(json.dumps(json.loads(str(metric)), indent=1))
                    else:
                        status_table = [
                            ['Time', 'Memory Allocated', 'Read (Bytes)',
                             'Write (Bytes)', 'Open Files', 'Active Events', 'Published Events'],
                            [metric.time, metric.memory_allocated, metric.read_bytes,
                             metric.write_bytes, metric.open_file_handles, metric.active_events,
                             metric.published_events]
                        ]
                        print(tabulate.tabulate(status_table, tablefmt='fancy_grid'))
                if len(visited) > 100:
                    visited = []
            start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))

parse_filebeat_datetime(t)

Parse a common filebeat timestamp string

Parameters:

  • t (str): A '%Y-%m-%dT%H:%M:%S.%f' formatted string. Required.

Returns:

  datetime: A datetime object normalized to UTC.

Source code in dynamite_nsm/services/filebeat/logs.py
def parse_filebeat_datetime(t: str) -> datetime:
    """Parse a common filebeat timestamp string
    Args:
        t: A '%Y-%m-%dT%H:%M:%S.%f' formatted string

    Returns:
        A datetime object
    """
    # The first 23 characters cover 'YYYY-MM-DDTHH:MM:SS.mmm'
    ret = datetime.strptime(t[0:23], '%Y-%m-%dT%H:%M:%S.%f')
    # Apply the trailing UTC offset (assumed to be of the form '+HH:MM'
    # or '-HH:MM') so the returned datetime is normalized to UTC
    if t[23] == '+':
        ret -= timedelta(hours=int(t[24:26]), minutes=int(t[27:]))
    elif t[23] == '-':
        ret += timedelta(hours=int(t[24:26]), minutes=int(t[27:]))
    return ret
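
For instance, a timestamp carrying a '-05:00' offset is shifted forward five hours so the result is in UTC (a sketch):

from dynamite_nsm.services.filebeat.logs import parse_filebeat_datetime

ts = parse_filebeat_datetime('2021-06-04T16:49:34.206-05:00')
print(ts)  # 2021-06-04 21:49:34.206000 (normalized to UTC)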