Skip to content

logs

Work with a variety of Zeek logs useful for troubleshooting and performance analysis.

Currently supported logs:

  • broker.log
  • cluster.log
  • reporter.log
  • stats.log

To import...

from dynamite_nsm.services.zeek import logs as zeek_logs

BrokerEntry

A single line item entry for Zeek's broker.log

__init__(self, entry_raw) special

A single line item entry in the broker.log

Parameters:

Name Type Description Default
entry_raw str

A JSON serializable string representing a single line item entry in the broker.log

required
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, entry_raw: str):
    """
    A single line item entry in the broker.log

    Keeps the raw entry, resets every parsed field to None and then hands over to
    self._parse_entry() (defined elsewhere on the class; presumably it fills the fields in).

    Args:
        entry_raw: A JSON serializable string representing a single line item entry in the broker.log
    """
    self.entry_raw = entry_raw
    # Parsed fields start out empty
    self.time = self.timestamp = None
    self.category = self.event = None
    self.peer_address = self.peer_port = None
    self.message = None
    self._parse_entry()

BrokerLog

Provides an interface for working with Zeek's broker.log

__init__(self, log_sample_size=500, include_archived_logs=False) special

Work with Zeek's broker.log

Parameters:

Name Type Description Default
log_sample_size Optional[int]

The maximum number of log entries to load into memory

500
include_archived_logs Optional[bool]

If True, include gzipped archive logs

False
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, log_sample_size: Optional[int] = 500, include_archived_logs: Optional[bool] = False):
    """
    Work with Zeek's broker.log

    The log is looked up under <ZEEK_HOME>/logs/current/, where ZEEK_HOME is read from the
    dictionary returned by utilities.get_environment_file_dict().

    Args:
        log_sample_size: The maximum number of log entries to load into memory
        include_archived_logs: If True, include gzipped archive logs
    """
    log_name = 'broker.log'
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.log_path = os.path.join(self.zeek_home, 'logs', 'current', log_name)

    logs.LogFile.__init__(self, log_path=self.log_path, log_sample_size=log_sample_size)
    if not include_archived_logs:
        return
    # Swap the entries for the ones gathered by the archive-aware proxy
    archive_proxy = ZeekLogsProxy(log_name, log_sample_size=log_sample_size)
    self.entries = archive_proxy.entries

iter_entries(self, start=None, end=None)

Iterate through BrokerEntries while providing some basic filtering options

Parameters:

Name Type Description Default
start Optional[datetime]

UTC start time

None
end Optional[datetime]

UTC end time

None

Returns:

Type Description
Generator[BrokerEntry]

yields a BrokerEntry for every iteration

Source code in dynamite_nsm/services/zeek/logs.py
def iter_entries(self, start: Optional[datetime] = None, end: Optional[datetime] = None) -> Generator[BrokerEntry]:
    """Iterate through BrokerEntries while providing some basic filtering options

    Only entries whose time lies strictly between start and end are produced.

    Args:
        start: UTC start time; when omitted, 365 days before now
        end: UTC end time; when omitted, now
    Returns:
         yields a BrokerEntry for every iteration
    """
    upper_bound = end if end else datetime.utcnow()
    lower_bound = start if start else datetime.utcnow() - timedelta(days=365)
    for raw_entry in self.entries:
        parsed = BrokerEntry(raw_entry)
        if lower_bound < parsed.time < upper_bound:
            yield parsed

tail(self, pretty_print=True)

Tail and follow a log to console

Parameters:

Name Type Description Default
pretty_print Optional[bool]

Print the log entry in a nice tabular view

True

Returns:

Type Description
None

None

Source code in dynamite_nsm/services/zeek/logs.py
def tail(self, pretty_print: Optional[bool] = True) -> None:
    """Tail and follow a log to console

    The first pass covers the last 365 days; each later pass (every 5 seconds) covers the last
    60 seconds. Entries are de-duplicated by timestamp, and the list of seen timestamps is
    emptied once it holds more than 100 items. Stops on Ctrl+C.

    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    seen_timestamps = []
    window_start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            window_end = datetime.utcnow()
            self.refresh()
            for log_entry in self.iter_entries(start=window_start, end=window_end):
                if log_entry.timestamp not in seen_timestamps:
                    seen_timestamps.append(log_entry.timestamp)
                    if pretty_print:
                        header = ['Time', 'Category', 'Peer', 'Message']
                        row = [log_entry.time, log_entry.category,
                               f'{log_entry.peer_address}:{log_entry.peer_port}', log_entry.message]
                        print(tabulate.tabulate([header, row], tablefmt='fancy_grid'))
                    else:
                        print(json.dumps(json.loads(str(log_entry)), indent=1))
                # Checked for every entry, printed or not
                if len(seen_timestamps) > 100:
                    seen_timestamps = []
            window_start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))

ClusterEntry

A single line item entry for Zeek's cluster.log

__init__(self, entry_raw) special

A single line item entry in the cluster.log

Parameters:

Name Type Description Default
entry_raw str

A JSON serializable string representing a single line item entry in the cluster.log

required
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, entry_raw: str):
    """
    A single line item entry in the cluster.log

    Keeps the raw entry, resets every parsed field to None and then calls self._parse_entry()
    (defined elsewhere on the class; presumably it fills the fields in).

    Args:
        entry_raw: A JSON serializable string representing a single line item entry in the cluster.log
    """
    self.entry_raw = entry_raw
    self.time = None
    self.timestamp = None
    # ClusterLog.tail() reads entry.node, so make sure the attribute always exists
    self.node = None
    self.message = None
    self._parse_entry()

ClusterLog

Provides an interface for working with Zeek's cluster.log

__init__(self, log_sample_size=500, include_archived_logs=False) special

Work with Zeek's cluster.log

  • log_sample_size: The maximum number of log entries to load into memory
  • include_archived_logs: If True, include gzipped archive logs

Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, log_sample_size: Optional[int] = 500, include_archived_logs: Optional[bool] = False):
    """
    Work with Zeek's cluster.log

    The log is looked up under <ZEEK_HOME>/logs/current/, where ZEEK_HOME is read from the
    dictionary returned by utilities.get_environment_file_dict().

    Args:
        log_sample_size: The maximum number of log entries to load into memory
        include_archived_logs: If True, include gzipped archive logs
    """
    log_name = 'cluster.log'
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.log_path = os.path.join(self.zeek_home, 'logs', 'current', log_name)

    logs.LogFile.__init__(self, log_path=self.log_path, log_sample_size=log_sample_size)
    if not include_archived_logs:
        return
    # Swap the entries for the ones gathered by the archive-aware proxy
    archive_proxy = ZeekLogsProxy(log_name, log_sample_size=log_sample_size)
    self.entries = archive_proxy.entries

iter_entries(self, start=None, end=None)

Iterate through ClusterEntries while providing some basic filtering options

Parameters:

Name Type Description Default
start Optional[datetime]

UTC start time

None
end Optional[datetime]

UTC end time

None

Returns:

Type Description
Generator[ClusterEntry]

yields a ClusterEntry for every iteration

Source code in dynamite_nsm/services/zeek/logs.py
def iter_entries(self, start: Optional[datetime] = None, end: Optional[datetime] = None) -> Generator[ClusterEntry]:
    """Iterate through ClusterEntries while providing some basic filtering options

    Only entries whose time lies strictly between start and end are produced.

    Args:
        start: UTC start time; when omitted, 365 days before now
        end: UTC end time; when omitted, now
    Returns:
         yields a ClusterEntry for every iteration
    """
    upper_bound = end if end else datetime.utcnow()
    lower_bound = start if start else datetime.utcnow() - timedelta(days=365)
    for raw_entry in self.entries:
        parsed = ClusterEntry(raw_entry)
        if lower_bound < parsed.time < upper_bound:
            yield parsed

tail(self, pretty_print=True)

Tail and follow a log to console

Parameters:

Name Type Description Default
pretty_print Optional[bool]

Print the log entry in a nice tabular view

True

Returns:

Type Description
None

None

Source code in dynamite_nsm/services/zeek/logs.py
def tail(self, pretty_print: Optional[bool] = True) -> None:
    """Tail and follow a log to console

    The first pass covers the last 365 days; each later pass (every 5 seconds) covers the last
    60 seconds. Entries are de-duplicated by timestamp, and the list of seen timestamps is
    emptied once it holds more than 100 items. Stops on Ctrl+C.

    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    seen_timestamps = []
    window_start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            window_end = datetime.utcnow()
            self.refresh()
            for log_entry in self.iter_entries(start=window_start, end=window_end):
                if log_entry.timestamp not in seen_timestamps:
                    seen_timestamps.append(log_entry.timestamp)
                    if pretty_print:
                        header = ['Time', 'Node', 'Message']
                        row = [log_entry.time, log_entry.node, log_entry.message]
                        print(tabulate.tabulate([header, row], tablefmt='fancy_grid'))
                    else:
                        print(json.dumps(json.loads(str(log_entry)), indent=1))
                # Checked for every entry, printed or not
                if len(seen_timestamps) > 100:
                    seen_timestamps = []
            window_start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))

InvalidZeekBrokerLogEntry

Thrown when a Zeek broker.log entry is improperly formatted

__init__(self, message) special

Parameters:

Name Type Description Default
message

A more specific error message

required
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, message):
    """
    Build the exception text by prefixing the given detail with a broker.log marker

    Args:
        message: A more specific error message
    """
    super(InvalidZeekBrokerLogEntry, self).__init__(f'Zeek broker log entry is invalid: {message}')

InvalidZeekClusterLogEntry

Thrown when a Zeek cluster.log entry is improperly formatted

__init__(self, message) special

Parameters:

Name Type Description Default
message

A more specific error message

required
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, message):
    """
    Build the exception text by prefixing the given detail with a cluster.log marker

    Args:
        message: A more specific error message
    """
    super(InvalidZeekClusterLogEntry, self).__init__(f'Zeek cluster log entry is invalid: {message}')

InvalidZeekReporterLogEntry

Thrown when a Zeek reporter.log entry is improperly formatted

__init__(self, message) special

Parameters:

Name Type Description Default
message

A more specific error message

required
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, message):
    """
    Build the exception text by prefixing the given detail with a reporter.log marker

    Args:
        message: A more specific error message
    """
    super(InvalidZeekReporterLogEntry, self).__init__(f'Zeek reporter log entry is invalid: {message}')

InvalidZeekStatusLogEntry

Thrown when a Zeek stats.log entry is improperly formatted

__init__(self, message) special

Parameters:

Name Type Description Default
message

A more specific error message

required
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, message):
    """
    Build the exception text by prefixing the given detail with a status-log marker

    Args:
        message: A more specific error message
    """
    super(InvalidZeekStatusLogEntry, self).__init__(f'Zeek status log entry is invalid: {message}')

MetricsEntry

A single Zeek metrics entry (one stats.log line item) for a specific time interval

__init__(self, entry) special

A metrics entry derived from the stats.log

Parameters:

Name Type Description Default
entry Dict

A dictionary containing a variety of analyzable fields within a line item metrics entry

required
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, entry: Dict):
    """
    A metrics entry derived from the stats.log

    Every counter is read from ``entry`` with a default of 0 when the key is absent.
    ``packets_dropped_percentage`` is stored as the ratio packets_dropped / packets_processed
    (0 when nothing was processed), rounded to 6 decimals so that it has the same precision
    as the value recomputed by merge_metric_entry().

    Args:
        entry: A dictionary containing a variety of analyzable fields within a line item metrics entry
    """
    self.entry_raw = entry
    self.timestamp = entry.get('ts')
    # NOTE(review): parse_zeek_datetime is defined elsewhere; presumably it needs 'ts' to be present
    self.time = parse_zeek_datetime(self.timestamp)
    self.peer = entry.get('peer')
    # List form of the peer so that merged entries can accumulate several peers
    self.peers = [self.peer]
    self.memory = entry.get('mem', 0)
    self.packets_processed = entry.get('pkts_proc', 0)
    self.bytes_received = entry.get('bytes_recv', 0)
    self.packets_dropped = entry.get('pkts_dropped', 0)
    self.packets_link = entry.get('pkts_link', 0)
    self.packet_lag = entry.get('pkt_lag', 0)
    self.events_processed = entry.get('events_proc', 0)
    self.events_queued = entry.get('events_queued', 0)
    self.active_tcp_connections = entry.get('active_tcp_conns', 0)
    self.active_udp_connections = entry.get('active_udp_conns', 0)
    self.active_icmp_connections = entry.get('active_icmp_conns', 0)
    self.tcp_connections = entry.get('tcp_conns', 0)
    self.udp_connections = entry.get('udp_conns', 0)
    self.icmp_connections = entry.get('icmp_conns', 0)
    self.timers = entry.get('timers', 0)
    self.files = entry.get('files', 0)
    self.active_files = entry.get('active_files', 0)
    self.dns_requests = entry.get('dns_requests', 0)
    self.active_dns_requests = entry.get('active_dns_requests', 0)
    self.reassembly_tcp_size = entry.get('reassem_tcp_size', 0)
    self.reassembly_file_size = entry.get('reassem_file_size', 0)
    self.reassembly_fragment_size = entry.get('reassem_frag_size', 0)
    self.reassembly_unknown_size = entry.get('reassem_unknown_size', 0)
    self.packets_dropped_percentage = 0
    if self.packets_processed > 0:
        # Same precision as merge_metric_entry(); 2 decimals would flatten sub-1% drop ratios to 0.0
        self.packets_dropped_percentage = round(self.packets_dropped / self.packets_processed, 6)

merge_metric_entry(self, metric_entry)

Merge another metrics entry into this one

Parameters:

Name Type Description Default
metric_entry MetricsEntry

The MetricsEntry you wish to merge in

required

Returns:

Type Description
None

None

Source code in dynamite_nsm/services/zeek/logs.py
def merge_metric_entry(self, metric_entry: MetricsEntry) -> None:
    """Merge another metrics entry into this one

    Every numeric counter of ``metric_entry`` is added to the matching counter of this entry,
    ``peer`` is cleared (the result no longer describes a single peer), all peers represented
    by ``metric_entry`` are appended to ``peers`` and the drop ratio is recomputed.

    Args:
        metric_entry: The MetricsEntry you wish to merge in
    Returns:
        None
    """
    summed_fields = (
        'memory', 'packets_processed', 'bytes_received', 'packets_dropped', 'packets_link', 'packet_lag',
        'events_processed', 'events_queued',
        'active_tcp_connections', 'active_udp_connections', 'active_icmp_connections',
        'tcp_connections', 'udp_connections', 'icmp_connections',
        'timers', 'files', 'active_files', 'dns_requests', 'active_dns_requests',
        'reassembly_tcp_size', 'reassembly_file_size', 'reassembly_fragment_size', 'reassembly_unknown_size',
    )
    self.peer = None
    # Take the other entry's whole peer list: if it is itself an aggregate its ``peer`` is None,
    # and appending that would both lose its real peers and put a None into ``peers``.
    self.peers.extend(metric_entry.peers)
    for field_name in summed_fields:
        setattr(self, field_name, getattr(self, field_name) + getattr(metric_entry, field_name))
    if self.packets_processed > 0:
        self.packets_dropped_percentage = round(self.packets_dropped / self.packets_processed, 6)

ReporterEntry

A single line item entry for Zeek's reporter.log

__init__(self, entry_raw) special

A single line item entry in the reporter.log

Parameters:

Name Type Description Default
entry_raw str

A JSON serializable string representing a single line item entry in the reporter.log

required
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, entry_raw: str):
    """
    A single line item entry in the reporter.log

    Keeps the raw entry, resets every parsed field to None and then hands over to
    self._parse_entry() (defined elsewhere on the class; presumably it fills the fields in).

    Args:
        entry_raw: A JSON serializable string representing a single line item entry in the reporter.log
    """
    self.entry_raw = entry_raw
    # Parsed fields start out empty
    self.time = self.timestamp = None
    self.log_level = self.message = self.location = None
    self._parse_entry()

ReporterLog

Provides an interface for working with Zeek's reporter.log

__init__(self, log_sample_size=500, include_archived_logs=False) special

Work with Zeek's reporter.log

Parameters:

Name Type Description Default
log_sample_size Optional[int]

The maximum number of log entries to load into memory

500
include_archived_logs Optional[bool]

If True, include gzipped archive logs content

False
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, log_sample_size: Optional[int] = 500, include_archived_logs: Optional[bool] = False):
    """Work with Zeek's reporter.log

    The log is looked up under <ZEEK_HOME>/logs/current/, where ZEEK_HOME is read from the
    dictionary returned by utilities.get_environment_file_dict().

    Args:
        log_sample_size: The maximum number of log entries to load into memory
        include_archived_logs: If True, include gzipped archive logs content
    """
    log_name = 'reporter.log'
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.log_path = os.path.join(self.zeek_home, 'logs', 'current', log_name)

    logs.LogFile.__init__(self, log_path=self.log_path, log_sample_size=log_sample_size)
    if not include_archived_logs:
        return
    # Swap the entries for the ones gathered by the archive-aware proxy
    archive_proxy = ZeekLogsProxy(log_name, log_sample_size=log_sample_size)
    self.entries = archive_proxy.entries

iter_entries(self, start=None, end=None)

Iterate through ReporterEntries while providing some basic filtering options

Parameters:

Name Type Description Default
start Optional[datetime]

UTC start time

None
end Optional[datetime]

UTC end time

None

Returns:

Type Description
Generator[ReporterEntry]

yields a ReporterEntry for every iteration

Source code in dynamite_nsm/services/zeek/logs.py
def iter_entries(self, start: Optional[datetime] = None,
                 end: Optional[datetime] = None) -> Generator[ReporterEntry]:
    """Iterate through ReporterEntries while providing some basic filtering options

    Only entries whose time lies strictly between start and end are produced.

    Args:
        start: UTC start time; when omitted, 365 days before now
        end: UTC end time; when omitted, now
    Returns:
         yields a ReporterEntry for every iteration
    """
    upper_bound = end if end else datetime.utcnow()
    lower_bound = start if start else datetime.utcnow() - timedelta(days=365)
    for raw_entry in self.entries:
        parsed = ReporterEntry(raw_entry)
        if lower_bound < parsed.time < upper_bound:
            yield parsed

tail(self, pretty_print=True)

Tail and follow a log to console

Parameters:

Name Type Description Default
pretty_print Optional[bool]

Print the log entry in a nice tabular view

True

Returns:

Type Description
None

None

Source code in dynamite_nsm/services/zeek/logs.py
def tail(self, pretty_print: Optional[bool] = True) -> None:
    """Tail and follow a log to console

    The first pass covers the last 365 days; each later pass (every 5 seconds) covers the last
    60 seconds. Entries are de-duplicated by timestamp, and the list of seen timestamps is
    emptied once it holds more than 100 items. Stops on Ctrl+C.

    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    seen_timestamps = []
    window_start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            window_end = datetime.utcnow()
            self.refresh()
            for log_entry in self.iter_entries(start=window_start, end=window_end):
                if log_entry.timestamp not in seen_timestamps:
                    seen_timestamps.append(log_entry.timestamp)
                    if pretty_print:
                        header = ['Time', 'Log Level', 'Location', 'Message']
                        row = [log_entry.time, log_entry.log_level, log_entry.location, log_entry.message]
                        print(tabulate.tabulate([header, row], tablefmt='fancy_grid'))
                    else:
                        print(json.dumps(json.loads(str(log_entry)), indent=1))
                # Checked for every entry, printed or not
                if len(seen_timestamps) > 100:
                    seen_timestamps = []
            window_start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))

StatusLog

Provides an interface for working with Zeek's stats.log

__init__(self, log_sample_size=500, include_archived_logs=False) special

Work with Zeek's stats.log

Parameters:

Name Type Description Default
log_sample_size Optional[int]

The maximum number of log entries to load into memory

500
include_archived_logs Optional[bool]

If True, include gzipped archive logs content

False
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, log_sample_size: Optional[int] = 500, include_archived_logs: Optional[bool] = False):
    """Work with Zeek's stats.log

    The log is looked up under <ZEEK_HOME>/logs/current/, where ZEEK_HOME is read from the
    dictionary returned by utilities.get_environment_file_dict().

    Args:
        log_sample_size: The maximum number of log entries to load into memory
        include_archived_logs: If True, include gzipped archive logs content
    """
    log_name = 'stats.log'
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.log_path = os.path.join(self.zeek_home, 'logs', 'current', log_name)

    logs.LogFile.__init__(self, log_path=self.log_path, log_sample_size=log_sample_size)
    if not include_archived_logs:
        return
    # Swap the entries for the ones gathered by the archive-aware proxy
    archive_proxy = ZeekLogsProxy(log_name, log_sample_size=log_sample_size)
    self.entries = archive_proxy.entries

iter_aggregated_metrics(self, start=None, end=None, tolerance_seconds=60)

Aggregate peers and combine events within tolerance_seconds into the same entry.

Parameters:

Name Type Description Default
start Optional[datetime]

UTC start time

None
end Optional[datetime]

UTC end time

None
tolerance_seconds Optional[int]

Specifies the maximum time distance between entries to combine them

60

Returns:

Type Description
Generator[MetricsEntry]

yields a MetricsEntry for every iteration

Source code in dynamite_nsm/services/zeek/logs.py
def iter_aggregated_metrics(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
                            tolerance_seconds: Optional[int] = 60) -> Generator[MetricsEntry]:

    """Aggregate peers and combine events within tolerance_seconds into the same entry.

    Metrics from iter_metrics() are ordered by timestamp and split into consecutive windows of
    tolerance_seconds, counted from the time of the earliest metric. The first metric of each
    window absorbs the others through merge_metric_entry() and is then yielded.

    Args:
        start: UTC start time
        end: UTC end time
        tolerance_seconds: Specifies the maximum time distance between entries to combine them
    Returns:
         yields a MetricsEntry for every iteration
    """
    ordered_metrics = sorted(self.iter_metrics(start, end), key=lambda metric: metric.timestamp)
    if not ordered_metrics:
        return
    window_origin = ordered_metrics[0].time

    def window_index(metric):
        # 1-based number of the tolerance window this metric falls into
        return int((metric.time - window_origin).total_seconds() // tolerance_seconds + 1)

    for _, window_members in itertools.groupby(ordered_metrics, window_index):
        combined = next(window_members)
        for member in window_members:
            combined.merge_metric_entry(member)
        yield combined

iter_metrics(self, start=None, end=None)

Iterate through metrics entries individually. Metrics are given for each individual Zeek peer.

Parameters:

Name Type Description Default
start Optional[datetime]

UTC start time

None
end Optional[datetime]

UTC end time

None

Returns:

Type Description
Generator[MetricsEntry]

yields a MetricsEntry for every iteration

Source code in dynamite_nsm/services/zeek/logs.py
def iter_metrics(self, start: Optional[datetime] = None, end: Optional[datetime] = None) -> Generator[MetricsEntry]:
    """Iterate through metrics entries individually. Metrics are given for each individual Zeek peer.

    Each raw entry is decoded as JSON and wrapped in a MetricsEntry; only those whose time lies
    strictly between start and end are produced.

    Args:
        start: UTC start time; when omitted, 60 minutes before now
        end: UTC end time; when omitted, now
    Returns:
         yields a MetricsEntry for every iteration
    """
    upper_bound = end if end else datetime.utcnow()
    lower_bound = start if start else datetime.utcnow() - timedelta(minutes=60)
    for raw_entry in self.entries:
        metric = MetricsEntry(json.loads(raw_entry))
        if lower_bound < metric.time < upper_bound:
            yield metric

tail(self, pretty_print=True)

Tail and follow a log to console

Parameters:

Name Type Description Default
pretty_print Optional[bool]

Print the log entry in a nice tabular view

True

Returns:

Type Description
None

None

Source code in dynamite_nsm/services/zeek/logs.py
def tail(self, pretty_print: Optional[bool] = True) -> None:
    """Tail and follow a log to console

    Works on aggregated metrics. The first pass covers the last 365 days; each later pass
    (every 5 seconds) covers the last 60 seconds. Metrics are de-duplicated by timestamp, and
    the list of seen timestamps is emptied once it holds more than 100 items. Stops on Ctrl+C.

    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    seen_timestamps = []
    window_start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            window_end = datetime.utcnow()
            self.refresh()
            for aggregate in self.iter_aggregated_metrics(start=window_start, end=window_end):
                if aggregate.timestamp not in seen_timestamps:
                    seen_timestamps.append(aggregate.timestamp)
                    if pretty_print:
                        header = ['Time', 'Memory', 'Timers', 'Packets on Link', 'Packets Processed',
                                  'Packets Dropped', 'Peers']
                        row = [aggregate.time, aggregate.memory, aggregate.timers, aggregate.packets_link,
                               aggregate.packets_processed, aggregate.packets_dropped,
                               '\n'.join(aggregate.peers)]
                        print(tabulate.tabulate([header, row], tablefmt='fancy_grid'))
                    else:
                        print(json.dumps(json.loads(str(aggregate)), indent=1))
                # Checked for every metric, printed or not
                if len(seen_timestamps) > 100:
                    seen_timestamps = []
            window_start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))

ZeekLogsProxy

A convenience class providing accessibility to Zeek's archived logs; supports gzipped content

__init__(self, log_name, log_sample_size=1000) special

Access a log and all of its corresponding archived logs until log_sample_size is reached.

Parameters:

Name Type Description Default
log_name str

The name of the Zeek log to retrieve

required
log_sample_size Optional[int]

The max number of log entries to retrieve

1000
Source code in dynamite_nsm/services/zeek/logs.py
def __init__(self, log_name: str, log_sample_size: Optional[int] = 1000):
    """Access a log and all of its corresponding archived logs until log_sample_size is reached.

    Paths are derived from ZEEK_HOME (read from the dictionary returned by
    utilities.get_environment_file_dict()); loading happens immediately via load_all_logs().

    Args:
        log_name: The name of the Zeek log to retrieve
        log_sample_size: The max number of log entries to retrieve
    """
    self.log_name = log_name
    self.log_sample_size = log_sample_size
    self.entries = []
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    # <ZEEK_HOME>/logs holds both the 'current' directory and the archive directories
    logs_root = os.path.join(self.zeek_home, 'logs')
    self.log_archive_directory = logs_root
    self.current_log_path = os.path.join(logs_root, 'current', log_name)
    self.load_all_logs()

load_all_logs(self)

Load all logs into memory up to self.log_sample_size

Returns:

Type Description
None

None

Source code in dynamite_nsm/services/zeek/logs.py
def load_all_logs(self) -> None:
    """
    Load all logs into memory up to self.log_sample_size

    The current log is loaded first. Gzipped archives of the same log are then read, most
    recently rotated first, until self.entries holds self.log_sample_size entries. Archive
    directories are the children of self.log_archive_directory whose name parses as YYYY-MM-DD;
    the rotation time is taken from the second dot-separated part of the archive file name
    (the text before its first '-', parsed as HH:MM:SS).

    Returns:
        None
    """
    archive_directories = []
    for directory_name in os.listdir(self.log_archive_directory):
        try:
            archive_directories.append((directory_name, datetime.strptime(directory_name, '%Y-%m-%d')))
        except ValueError:
            # Not a dated archive directory (e.g. 'current'); ignore it
            continue
    archive_directories.sort(key=lambda x: x[1])

    log_prefix = self.log_name.replace('.log', '')
    archived_log_paths = []
    for archive_dir_name, _ in archive_directories:
        archive_dir_path = os.path.join(self.log_archive_directory, archive_dir_name)
        for file_name in os.listdir(archive_dir_path):
            if not (file_name.startswith(log_prefix) and file_name.endswith('.gz')):
                continue
            log_rotate_time = file_name.split('.')[1].split('-')[0]
            archived_log_paths.append(
                (os.path.join(archive_dir_path, file_name),
                 datetime.strptime(archive_dir_name + ' ' + log_rotate_time, '%Y-%m-%d %H:%M:%S'))
            )
    # Sort once, after every directory has been scanned: newest rotation first
    archived_log_paths.sort(key=lambda x: x[1], reverse=True)

    current_log_file = logs.LogFile(log_path=self.current_log_path, log_sample_size=self.log_sample_size,
                                    gzip_decode=False)
    self.entries.extend(current_log_file.entries)
    for log_path, _ in archived_log_paths:
        remaining_entries_available = self.log_sample_size - len(self.entries)
        if remaining_entries_available <= 0:
            # Sample is full; do not open (and decompress) any further archive
            break
        archived_log_file = logs.LogFile(log_path, log_sample_size=self.log_sample_size, gzip_decode=True)
        self.entries.extend(archived_log_file.entries[0: remaining_entries_available])