# logs

Work with a variety of Zeek logs useful for troubleshooting and performance analysis.

Currently supports:

- broker.log
- cluster.log
- reporter.log
- stats.log

To import:

```python
from dynamite_nsm.services.zeek import logs as zeek_logs
```
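As a quick orientation, here is a minimal usage sketch, assuming a DynamiteNSM-managed Zeek installation whose `ZEEK_HOME` is resolvable from the environment file:

```python
from dynamite_nsm.services.zeek import logs as zeek_logs

# Load up to 100 recent broker.log entries (current log only; archives excluded).
broker_log = zeek_logs.BrokerLog(log_sample_size=100)

# Print the parsed time and message of each entry.
for entry in broker_log.iter_entries():
    print(entry.time, entry.message)
```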
## BrokerEntry

A single line item entry for Zeek's broker.log

### __init__(self, entry_raw) special

A single line item entry in the broker.log

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| entry_raw | str | A JSON serializable string representing a single line item entry in the broker.log | required |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, entry_raw: str):
    """
    A single line item entry in the broker.log
    Args:
        entry_raw: A JSON serializable string representing a single line item entry in the broker.log
    """
    self.entry_raw = entry_raw
    self.time = None
    self.timestamp = None
    self.category = None
    self.event = None
    self.peer_address = None
    self.peer_port = None
    self.message = None
    self._parse_entry()
```
## BrokerLog

Provides an interface for working with Zeek's broker.log

### __init__(self, log_sample_size=500, include_archived_logs=False) special

Work with Zeek's broker.log

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| log_sample_size | Optional[int] | The maximum number of log entries to load into memory | 500 |
| include_archived_logs | Optional[bool] | If True, include gzipped archive logs | False |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, log_sample_size: Optional[int] = 500, include_archived_logs: Optional[bool] = False):
    """
    Work with Zeek's broker.log
    Args:
        log_sample_size: The maximum number of log entries to load into memory
        include_archived_logs: If True, include gzipped archive logs
    """
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.log_path = os.path.join(self.zeek_home, 'logs', 'current', 'broker.log')
    logs.LogFile.__init__(self,
                          log_path=self.log_path,
                          log_sample_size=log_sample_size)
    if include_archived_logs:
        self.entries = ZeekLogsProxy('broker.log', log_sample_size=log_sample_size).entries
```
### iter_entries(self, start=None, end=None)

Iterate through BrokerEntries while providing some basic filtering options

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| start | Optional[datetime] | UTC start time | None |
| end | Optional[datetime] | UTC end time | None |

Returns:

| Type | Description |
|---|---|
| Generator[BrokerEntry] | yields a BrokerEntry for every iteration |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def iter_entries(self, start: Optional[datetime] = None, end: Optional[datetime] = None) -> Generator[BrokerEntry]:
    """Iterate through BrokerEntries while providing some basic filtering options
    Args:
        start: UTC start time
        end: UTC end time
    Returns:
        yields a BrokerEntry for every iteration
    """

    def filter_entries(s: Optional[datetime], e: Optional[datetime] = None):
        if not e:
            e = datetime.utcnow()
        if not s:
            s = datetime.utcnow() - timedelta(days=365)
        for en in self.entries:
            en = BrokerEntry(en)
            if s < en.time < e:
                yield en

    for log_entry in filter_entries(start, end):
        yield log_entry
```
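For example, a sketch of time-window filtering, assuming broker.log activity within the last hour:

```python
from datetime import datetime, timedelta

from dynamite_nsm.services.zeek import logs as zeek_logs

broker_log = zeek_logs.BrokerLog(log_sample_size=500)

# Yield only entries whose parsed time falls within the last hour.
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
for entry in broker_log.iter_entries(start=one_hour_ago):
    print(f'{entry.time} [{entry.category}] {entry.peer_address}:{entry.peer_port} {entry.message}')
```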
### tail(self, pretty_print=True)

Tail and follow a log to console

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pretty_print | Optional[bool] | Print the log entry in a nice tabular view | True |

Returns:

| Type | Description |
|---|---|
| None | None |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def tail(self, pretty_print: Optional[bool] = True) -> None:
    """Tail and follow a log to console
    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    visited = []
    start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            end = datetime.utcnow()
            self.refresh()
            for entry in self.iter_entries(start=start, end=end):
                if entry.timestamp not in visited:
                    visited.append(entry.timestamp)
                    if not pretty_print:
                        print(json.dumps(json.loads(str(entry)), indent=1))
                    else:
                        status_table = [
                            ['Time', 'Category', 'Peer', 'Message'],
                            [entry.time, entry.category, f'{entry.peer_address}:{entry.peer_port}', entry.message]
                        ]
                        print(tabulate.tabulate(status_table, tablefmt='fancy_grid'))
            if len(visited) > 100:
                visited = []
                start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))
```
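Typical interactive use is a one-liner; tail blocks until interrupted with Ctrl+C:

```python
from dynamite_nsm.services.zeek import logs as zeek_logs

# Follow broker.log in a tabular view; press Ctrl+C to stop.
zeek_logs.BrokerLog(log_sample_size=100).tail(pretty_print=True)
```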
## ClusterEntry

A single line item entry for Zeek's cluster.log

### __init__(self, entry_raw) special

A single line item entry in the cluster.log

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| entry_raw | str | A JSON serializable string representing a single line item entry in the cluster.log | required |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, entry_raw: str):
    """
    A single line item entry in the cluster.log
    Args:
        entry_raw: A JSON serializable string representing a single line item entry in the cluster.log
    """
    self.entry_raw = entry_raw
    self.time = None
    self.timestamp = None
    self.message = None
    self._parse_entry()
```
## ClusterLog

Provides an interface for working with Zeek's cluster.log

### __init__(self, log_sample_size=500, include_archived_logs=False) special

Work with Zeek's cluster.log

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| log_sample_size | Optional[int] | The maximum number of log entries to load into memory | 500 |
| include_archived_logs | Optional[bool] | If True, include gzipped archive logs | False |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, log_sample_size: Optional[int] = 500, include_archived_logs: Optional[bool] = False):
    """Work with Zeek's cluster.log
    Args:
        log_sample_size: The maximum number of log entries to load into memory
        include_archived_logs: If True, include gzipped archive logs
    """
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.log_path = os.path.join(self.zeek_home, 'logs', 'current', 'cluster.log')
    logs.LogFile.__init__(self,
                          log_path=self.log_path,
                          log_sample_size=log_sample_size)
    if include_archived_logs:
        self.entries = ZeekLogsProxy('cluster.log', log_sample_size=log_sample_size).entries
```
### iter_entries(self, start=None, end=None)

Iterate through ClusterEntries while providing some basic filtering options

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| start | Optional[datetime] | UTC start time | None |
| end | Optional[datetime] | UTC end time | None |

Returns:

| Type | Description |
|---|---|
| Generator[ClusterEntry] | yields a ClusterEntry for every iteration |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def iter_entries(self, start: Optional[datetime] = None, end: Optional[datetime] = None) -> Generator[ClusterEntry]:
    """Iterate through ClusterEntries while providing some basic filtering options
    Args:
        start: UTC start time
        end: UTC end time
    Returns:
        yields a ClusterEntry for every iteration
    """

    def filter_entries(s: Optional[datetime], e: Optional[datetime] = None):
        if not e:
            e = datetime.utcnow()
        if not s:
            s = datetime.utcnow() - timedelta(days=365)
        for en in self.entries:
            en = ClusterEntry(en)
            if s < en.time < e:
                yield en

    for log_entry in filter_entries(start, end):
        yield log_entry
```
### tail(self, pretty_print=True)

Tail and follow a log to console

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pretty_print | Optional[bool] | Print the log entry in a nice tabular view | True |

Returns:

| Type | Description |
|---|---|
| None | None |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def tail(self, pretty_print: Optional[bool] = True) -> None:
    """Tail and follow a log to console
    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    visited = []
    start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            end = datetime.utcnow()
            self.refresh()
            for entry in self.iter_entries(start=start, end=end):
                if entry.timestamp not in visited:
                    visited.append(entry.timestamp)
                    if not pretty_print:
                        print(json.dumps(json.loads(str(entry)), indent=1))
                    else:
                        status_table = [
                            ['Time', 'Node', 'Message'],
                            [entry.time, entry.node, entry.message]
                        ]
                        print(tabulate.tabulate(status_table, tablefmt='fancy_grid'))
            if len(visited) > 100:
                visited = []
                start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))
```
## InvalidZeekBrokerLogEntry

Thrown when a Zeek broker.log entry is improperly formatted

### __init__(self, message) special

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| message | | A more specific error message | required |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, message):
    """
    Args:
        message: A more specific error message
    """
    msg = f'Zeek broker log entry is invalid: {message}'
    super(InvalidZeekBrokerLogEntry, self).__init__(msg)
```
## InvalidZeekClusterLogEntry

Thrown when a Zeek cluster.log entry is improperly formatted

### __init__(self, message) special

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| message | | A more specific error message | required |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, message):
    """
    Args:
        message: A more specific error message
    """
    msg = f'Zeek cluster log entry is invalid: {message}'
    super(InvalidZeekClusterLogEntry, self).__init__(msg)
```
## InvalidZeekReporterLogEntry

Thrown when a Zeek reporter.log entry is improperly formatted

### __init__(self, message) special

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| message | | A more specific error message | required |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, message):
    """
    Args:
        message: A more specific error message
    """
    msg = f'Zeek reporter log entry is invalid: {message}'
    super(InvalidZeekReporterLogEntry, self).__init__(msg)
```
## InvalidZeekStatusLogEntry

Thrown when a Zeek stats.log entry is improperly formatted

### __init__(self, message) special

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| message | | A more specific error message | required |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, message):
    """
    Args:
        message: A more specific error message
    """
    msg = f'Zeek status log entry is invalid: {message}'
    super(InvalidZeekStatusLogEntry, self).__init__(msg)
```
## MetricsEntry

A single Zeek metrics entry for a specific time interval

### __init__(self, entry) special

A metrics entry derived from the stats.log

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| entry | Dict | A dictionary containing a variety of analyzable fields within a line item metrics entry | required |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, entry: Dict):
    """
    A metrics entry derived from the stats.log
    Args:
        entry: A dictionary containing a variety of analyzable fields within a line item metrics entry
    """
    self.entry_raw = entry
    self.timestamp = entry.get('ts')
    self.time = parse_zeek_datetime(self.timestamp)
    self.peer = entry.get('peer')
    self.peers = [self.peer]
    self.memory = entry.get('mem', 0)
    self.packets_processed = entry.get('pkts_proc', 0)
    self.bytes_received = entry.get('bytes_recv', 0)
    self.packets_dropped = entry.get('pkts_dropped', 0)
    self.packets_link = entry.get('pkts_link', 0)
    self.packet_lag = entry.get('pkt_lag', 0)
    self.events_processed = entry.get('events_proc', 0)
    self.events_queued = entry.get('events_queued', 0)
    self.active_tcp_connections = entry.get('active_tcp_conns', 0)
    self.active_udp_connections = entry.get('active_udp_conns', 0)
    self.active_icmp_connections = entry.get('active_icmp_conns', 0)
    self.tcp_connections = entry.get('tcp_conns', 0)
    self.udp_connections = entry.get('udp_conns', 0)
    self.icmp_connections = entry.get('icmp_conns', 0)
    self.timers = entry.get('timers', 0)
    self.files = entry.get('files', 0)
    self.active_files = entry.get('active_files', 0)
    self.dns_requests = entry.get('dns_requests', 0)
    self.active_dns_requests = entry.get('active_dns_requests', 0)
    self.reassembly_tcp_size = entry.get('reassem_tcp_size', 0)
    self.reassembly_file_size = entry.get('reassem_file_size', 0)
    self.reassembly_fragment_size = entry.get('reassem_frag_size', 0)
    self.reassembly_unknown_size = entry.get('reassem_unknown_size', 0)
    self.packets_dropped_percentage = 0
    if self.packets_link > 0:
        self.packets_dropped_percentage = round(self.packets_dropped / self.packets_link, 2)
```
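As a worked example of the drop-rate calculation (hypothetical field values; treating `ts` as an epoch float is an assumption, since parsing is delegated to `parse_zeek_datetime`):

```python
from dynamite_nsm.services.zeek import logs as zeek_logs

# Hypothetical stats.log record for a single worker (ts as an epoch float is assumed).
record = {'ts': 1619043298.871, 'peer': 'worker-1', 'mem': 215,
          'pkts_proc': 100000, 'pkts_dropped': 2040, 'pkts_link': 102000}
metric = zeek_logs.MetricsEntry(record)

# packets_dropped_percentage is a ratio rounded to two places: round(2040 / 102000, 2) == 0.02
print(metric.packets_dropped_percentage)
```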
### merge_metric_entry(self, metric_entry)

Merge another metrics entry into this one

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metric_entry | MetricsEntry | The MetricsEntry you wish to merge in | required |

Returns:

| Type | Description |
|---|---|
| None | None |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def merge_metric_entry(self, metric_entry: MetricsEntry) -> None:
    """Merge another metrics entry into this one
    Args:
        metric_entry: The MetricsEntry you wish to merge in
    Returns:
        None
    """
    self.peer = None
    self.peers.append(metric_entry.peer)
    self.memory = self.memory + metric_entry.memory
    self.packets_processed = self.packets_processed + metric_entry.packets_processed
    self.bytes_received = self.bytes_received + metric_entry.bytes_received
    self.packets_dropped = self.packets_dropped + metric_entry.packets_dropped
    self.packets_link = self.packets_link + metric_entry.packets_link
    self.packet_lag = self.packet_lag + metric_entry.packet_lag
    self.events_processed = self.events_processed + metric_entry.events_processed
    self.events_queued = self.events_queued + metric_entry.events_queued
    self.active_tcp_connections = self.active_tcp_connections + metric_entry.active_tcp_connections
    self.active_udp_connections = self.active_udp_connections + metric_entry.active_udp_connections
    self.active_icmp_connections = self.active_icmp_connections + metric_entry.active_icmp_connections
    self.tcp_connections = self.tcp_connections + metric_entry.tcp_connections
    self.udp_connections = self.udp_connections + metric_entry.udp_connections
    self.icmp_connections = self.icmp_connections + metric_entry.icmp_connections
    self.timers = self.timers + metric_entry.timers
    self.files = self.files + metric_entry.files
    self.active_files = self.active_files + metric_entry.active_files
    self.dns_requests = self.dns_requests + metric_entry.dns_requests
    self.active_dns_requests = self.active_dns_requests + metric_entry.active_dns_requests
    self.reassembly_tcp_size = self.reassembly_tcp_size + metric_entry.reassembly_tcp_size
    self.reassembly_file_size = self.reassembly_file_size + metric_entry.reassembly_file_size
    self.reassembly_fragment_size = self.reassembly_fragment_size + metric_entry.reassembly_fragment_size
    self.reassembly_unknown_size = self.reassembly_unknown_size + metric_entry.reassembly_unknown_size
    if self.packets_processed > 0:
        self.packets_dropped_percentage = round(self.packets_dropped / self.packets_link, 6)
```
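A short sketch of merging per-peer metrics into a cluster-wide rollup (essentially what StatusLog.iter_aggregated_metrics does internally), assuming at least one stats.log entry exists in the default window:

```python
from dynamite_nsm.services.zeek import logs as zeek_logs

status_log = zeek_logs.StatusLog(log_sample_size=500)

# Fold every per-peer entry into a single combined MetricsEntry.
combined = None
for metric in status_log.iter_metrics():
    if combined is None:
        combined = metric
    else:
        combined.merge_metric_entry(metric)

if combined is not None:
    print(combined.peers, combined.packets_processed, combined.packets_dropped)
```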
## ReporterEntry

A single line item entry for Zeek's reporter.log

### __init__(self, entry_raw) special

A single line item entry in the reporter.log

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| entry_raw | str | A JSON serializable string representing a single line item entry in the reporter.log | required |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, entry_raw: str):
    """
    A single line item entry in the reporter.log
    Args:
        entry_raw: A JSON serializable string representing a single line item entry in the reporter.log
    """
    self.entry_raw = entry_raw
    self.time = None
    self.timestamp = None
    self.log_level = None
    self.message = None
    self.location = None
    self._parse_entry()
```
## ReporterLog

Provides an interface for working with Zeek's reporter.log

### __init__(self, log_sample_size=500, include_archived_logs=False) special

Work with Zeek's reporter.log

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| log_sample_size | Optional[int] | The maximum number of log entries to load into memory | 500 |
| include_archived_logs | Optional[bool] | If True, include gzipped archive logs content | False |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, log_sample_size: Optional[int] = 500, include_archived_logs: Optional[bool] = False):
    """Work with Zeek's reporter.log
    Args:
        log_sample_size: The maximum number of log entries to load into memory
        include_archived_logs: If True, include gzipped archive logs content
    """
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.log_path = os.path.join(self.zeek_home, 'logs', 'current', 'reporter.log')
    logs.LogFile.__init__(self,
                          log_path=self.log_path,
                          log_sample_size=log_sample_size)
    if include_archived_logs:
        self.entries = ZeekLogsProxy('reporter.log', log_sample_size=log_sample_size).entries
```
### iter_entries(self, start=None, end=None)

Iterate through ReporterEntries while providing some basic filtering options

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| start | Optional[datetime] | UTC start time | None |
| end | Optional[datetime] | UTC end time | None |

Returns:

| Type | Description |
|---|---|
| Generator[ReporterEntry] | yields a ReporterEntry for every iteration |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def iter_entries(self, start: Optional[datetime] = None,
                 end: Optional[datetime] = None) -> Generator[ReporterEntry]:
    """Iterate through ReporterEntries while providing some basic filtering options
    Args:
        start: UTC start time
        end: UTC end time
    Returns:
        yields a ReporterEntry for every iteration
    """

    def filter_entries(s: Optional[datetime], e: Optional[datetime] = None):
        if not e:
            e = datetime.utcnow()
        if not s:
            s = datetime.utcnow() - timedelta(days=365)
        for en in self.entries:
            en = ReporterEntry(en)
            if s < en.time < e:
                yield en

    for log_entry in filter_entries(start, end):
        yield log_entry
```
### tail(self, pretty_print=True)

Tail and follow a log to console

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pretty_print | Optional[bool] | Print the log entry in a nice tabular view | True |

Returns:

| Type | Description |
|---|---|
| None | None |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def tail(self, pretty_print: Optional[bool] = True) -> None:
    """Tail and follow a log to console
    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    visited = []
    start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            end = datetime.utcnow()
            self.refresh()
            for entry in self.iter_entries(start=start, end=end):
                if entry.timestamp not in visited:
                    visited.append(entry.timestamp)
                    if not pretty_print:
                        print(json.dumps(json.loads(str(entry)), indent=1))
                    else:
                        status_table = [
                            ['Time', 'Log Level', 'Location', 'Message'],
                            [entry.time, entry.log_level, entry.location, entry.message]
                        ]
                        print(tabulate.tabulate(status_table, tablefmt='fancy_grid'))
            if len(visited) > 100:
                visited = []
                start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))
```
## StatusLog

Provides an interface for working with Zeek's stats.log

### __init__(self, log_sample_size=500, include_archived_logs=False) special

Work with Zeek's stats.log

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| log_sample_size | Optional[int] | The maximum number of log entries to load into memory | 500 |
| include_archived_logs | Optional[bool] | If True, include gzipped archive logs content | False |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, log_sample_size: Optional[int] = 500, include_archived_logs: Optional[bool] = False):
    """Work with Zeek's stats.log
    Args:
        log_sample_size: The maximum number of log entries to load into memory
        include_archived_logs: If True, include gzipped archive logs content
    """
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.log_path = os.path.join(self.zeek_home, 'logs', 'current', 'stats.log')
    logs.LogFile.__init__(self,
                          log_path=self.log_path,
                          log_sample_size=log_sample_size)
    if include_archived_logs:
        self.entries = ZeekLogsProxy('stats.log', log_sample_size=log_sample_size).entries
```
### iter_aggregated_metrics(self, start=None, end=None, tolerance_seconds=60)

Aggregate peers and combine events within tolerance_seconds into the same entry.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| start | Optional[datetime] | UTC start time | None |
| end | Optional[datetime] | UTC end time | None |
| tolerance_seconds | Optional[int] | Specifies the maximum time distance between entries to combine them | 60 |

Returns:

| Type | Description |
|---|---|
| Generator[MetricsEntry] | yields a MetricsEntry for every iteration |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def iter_aggregated_metrics(self, start: Optional[datetime] = None, end: Optional[datetime] = None,
                            tolerance_seconds: Optional[int] = 60) -> Generator[MetricsEntry]:
    """Aggregate peers and combine events within tolerance_seconds into the same entry.
    Args:
        start: UTC start time
        end: UTC end time
        tolerance_seconds: Specifies the maximum time distance between entries to combine them
    Returns:
        yields a MetricsEntry for every iteration
    """
    sorted_by_time = [metric for metric in self.iter_metrics(start, end)]
    if not sorted_by_time:
        return
    sorted_by_time = sorted(sorted_by_time, key=lambda x: x.timestamp)
    start = sorted_by_time[0].time
    for name, group in itertools.groupby(
            sorted_by_time, lambda x: int((x.time - start).total_seconds() // tolerance_seconds + 1)):
        aggregated_entry = None
        for entry in group:
            if not aggregated_entry:
                aggregated_entry = entry
            else:
                aggregated_entry.merge_metric_entry(entry)
        yield aggregated_entry
```
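For instance, a sketch that reports cluster-wide packet drops per five-minute bucket, assuming stats.log entries exist within the window:

```python
from datetime import datetime, timedelta

from dynamite_nsm.services.zeek import logs as zeek_logs

status_log = zeek_logs.StatusLog(log_sample_size=1000)
start = datetime.utcnow() - timedelta(hours=6)

# Each yielded MetricsEntry merges all peers within a 300-second bucket.
for metric in status_log.iter_aggregated_metrics(start=start, tolerance_seconds=300):
    print(metric.time, metric.peers, metric.packets_dropped, metric.packets_dropped_percentage)
```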
### iter_metrics(self, start=None, end=None)

Iterate through metrics entries individually. Metrics are given for each individual Zeek peer.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| start | Optional[datetime] | UTC start time | None |
| end | Optional[datetime] | UTC end time | None |

Returns:

| Type | Description |
|---|---|
| Generator[MetricsEntry] | yields a MetricsEntry for every iteration |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def iter_metrics(self, start: Optional[datetime] = None, end: Optional[datetime] = None) -> Generator[MetricsEntry]:
    """Iterate through metrics entries individually. Metrics are given for each individual Zeek peer.
    Args:
        start: UTC start time
        end: UTC end time
    Returns:
        yields a MetricsEntry for every iteration
    """

    def filter_metrics(s: Optional[datetime] = None, e: Optional[datetime] = None):
        if not e:
            e = datetime.utcnow()
        if not s:
            s = datetime.utcnow() - timedelta(minutes=60)
        for en in self.entries:
            en = MetricsEntry(json.loads(en))
            if s < en.time < e:
                yield en

    for log_entry in filter_metrics(start, end):
        yield log_entry
```
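A per-peer sketch, useful for spotting a single overloaded worker (note the default window is the last 60 minutes when start is omitted):

```python
from dynamite_nsm.services.zeek import logs as zeek_logs

status_log = zeek_logs.StatusLog(log_sample_size=500)

# Flag any peer dropping packets in the default (last hour) window.
for metric in status_log.iter_metrics():
    if metric.packets_dropped > 0:
        print(f'{metric.peer} dropped {metric.packets_dropped} of {metric.packets_link} link packets')
```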
### tail(self, pretty_print=True)

Tail and follow a log to console

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pretty_print | Optional[bool] | Print the log entry in a nice tabular view | True |

Returns:

| Type | Description |
|---|---|
| None | None |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def tail(self, pretty_print: Optional[bool] = True) -> None:
    """Tail and follow a log to console
    Args:
        pretty_print: Print the log entry in a nice tabular view
    Returns:
        None
    """
    visited = []
    start = datetime.utcnow() - timedelta(days=365)
    try:
        while True:
            end = datetime.utcnow()
            self.refresh()
            for metric in self.iter_aggregated_metrics(start=start, end=end):
                if metric.timestamp not in visited:
                    visited.append(metric.timestamp)
                    if not pretty_print:
                        print(json.dumps(json.loads(str(metric)), indent=1))
                    else:
                        status_table = [
                            ['Time', 'Memory', 'Timers', 'Packets on Link', 'Packets Processed', 'Packets Dropped',
                             'Peers'],
                            [metric.time, metric.memory, metric.timers, metric.packets_link,
                             metric.packets_processed, metric.packets_dropped, '\n'.join(metric.peers)]
                        ]
                        print(tabulate.tabulate(status_table, tablefmt='fancy_grid'))
            if len(visited) > 100:
                visited = []
                start = datetime.utcnow() - timedelta(seconds=60)
            time.sleep(5)
    except KeyboardInterrupt:
        print(utilities.PrintDecorations.colorize('OK', 'green'))
```
## ZeekLogsProxy

A convenience class providing access to Zeek's archived logs; supports gzipped content

### __init__(self, log_name, log_sample_size=1000) special

Access a log and all of its corresponding archived logs until log_sample_size is reached.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| log_name | str | The name of the Zeek log to retrieve | required |
| log_sample_size | Optional[int] | The max number of log entries to retrieve | 1000 |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def __init__(self, log_name: str, log_sample_size: Optional[int] = 1000):
    """Access a log and all of its corresponding archived logs until log_sample_size is reached.
    Args:
        log_name: The name of the Zeek log to retrieve
        log_sample_size: The max number of log entries to retrieve
    """
    self.entries = []
    self.log_name = log_name
    self.log_sample_size = log_sample_size
    self.env_file = os.path.join(const.CONFIG_PATH, 'environment')
    self.env_dict = utilities.get_environment_file_dict()
    self.zeek_home = self.env_dict.get('ZEEK_HOME')
    self.current_log_path = os.path.join(self.zeek_home, 'logs', 'current', log_name)
    self.log_archive_directory = os.path.join(self.zeek_home, 'logs')
    self.load_all_logs()
```
### load_all_logs(self)

Load all logs into memory up to self.log_sample_size

Returns:

| Type | Description |
|---|---|
| None | None |

Source code in dynamite_nsm/services/zeek/logs.py

```python
def load_all_logs(self) -> None:
    """
    Load all logs into memory up to self.log_sample_size
    Returns:
        None
    """
    archive_directories = []
    sorted_log_paths = []
    for log_archive_directory in os.listdir(self.log_archive_directory):
        try:
            archive_directories.append(
                (log_archive_directory, datetime.strptime(log_archive_directory, '%Y-%m-%d')))
        except ValueError:
            pass
    sorted_archive_directories = sorted(archive_directories, key=lambda x: x[1])
    for archive_dir_name, _ in sorted_archive_directories:
        relevant_log_names = [fname
                              for fname in os.listdir(os.path.join(self.log_archive_directory, archive_dir_name))
                              if fname.startswith(self.log_name.replace('.log', '')) and fname.endswith('.gz')
                              ]
        for log_archive_file_name in relevant_log_names:
            log_rotate_time = log_archive_file_name.split('.')[1].split('-')[0]
            sorted_log_paths.append(
                (os.path.join(self.log_archive_directory, archive_dir_name, log_archive_file_name),
                 datetime.strptime(archive_dir_name + ' ' + log_rotate_time, '%Y-%m-%d %H:%M:%S'))
            )
    sorted_log_paths = sorted(sorted_log_paths, key=lambda x: x[1], reverse=True)
    current_log_file = logs.LogFile(log_path=self.current_log_path, log_sample_size=self.log_sample_size,
                                    gzip_decode=False)
    self.entries.extend(current_log_file.entries)
    for log_path, log_rotate_date in sorted_log_paths:
        archived_log_file = logs.LogFile(log_path, log_sample_size=self.log_sample_size, gzip_decode=True)
        remaining_entries_available = self.log_sample_size - len(self.entries)
        if remaining_entries_available > 0:
            self.entries.extend(archived_log_file.entries[0: remaining_entries_available])
        else:
            break
```
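A minimal usage sketch, assuming archived logs exist under dated subdirectories of ZEEK_HOME/logs; the current log is read first, then the newest archives, until the sample size is reached:

```python
from dynamite_nsm.services.zeek import logs as zeek_logs

# Gather up to 2000 raw broker.log lines spanning the current log and gzipped archives.
proxy = zeek_logs.ZeekLogsProxy('broker.log', log_sample_size=2000)
for raw_line in proxy.entries:
    entry = zeek_logs.BrokerEntry(raw_line)
    print(entry.time, entry.message)
```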