misc.py
Miscellaneous configuration objects for Suricata
To import...
from dynamite_nsm.services.base.config_objects.suricata import misc as suricata_config_misc
AfPacketInterface
__init__(self, interface_name, cluster_id=None, cluster_type=None, bpf_filter=None, threads=None)
special
Suricata AF_PACKET interface
Parameters:
Name | Type | Description | Default |
---|---|---|---|
interface_name |
str |
The name of a network interface to monitor |
required |
cluster_id |
Optional[int] |
A unique integer associated with this worker; maps to af_packet_fanout_id |
None |
cluster_type |
Optional[str] |
The algorithm used to spread traffic between sockets. |
None |
bpf_filter |
Optional[str] |
A filter that can be used to drop packets before they are analyzed |
None |
threads |
Union[int, str] |
The number of threads dedicated to monitoring this network interface |
None |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def __init__(self, interface_name: str, cluster_id: Optional[int] = None, cluster_type: Optional[str] = None,
             bpf_filter: Optional[str] = None,
             threads: Optional[Union[int, str]] = None):
    """Suricata AF_PACKET interface

    Args:
        interface_name: The name of a network interface to monitor
        cluster_id: A unique integer associated with this worker; maps to af_packet_fanout_id
        cluster_type: The algorithm used to spread traffic between sockets. A leading 'AF_Packet::' marker is
            stripped and the remainder is translated through AF_PACKET_FANOUT_MODE_TO_CLUSTER_TYPE_MAP when an
            entry exists; names without an entry are stored as given. A falsy value selects 'cluster_qm'.
        bpf_filter: A filter that can be used to drop packets before they are analyzed
        threads: The number of threads dedicated to monitoring this network interface. A falsy value
            (None, 0 or '') selects 'auto'.
    """
    self.interface = interface_name
    self.cluster_id = cluster_id
    if cluster_type:
        fanout_mode = cluster_type.replace('AF_Packet::', '')
        # Single lookup: fall back to the stripped name itself when the map has no translation for it.
        self.cluster_type = AF_PACKET_FANOUT_MODE_TO_CLUSTER_TYPE_MAP.get(fanout_mode, fanout_mode)
    else:
        self.cluster_type = 'cluster_qm'
    self.bpf_filter = bpf_filter
    self.threads = threads if threads else 'auto'
get_raw(self)
Get a raw representation of this AfPacketInterface.
Returns:
Type | Description |
---|---|
Dict |
A dictionary that can be serialized to YAML then inserted into the `suricata.yaml` file |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def get_raw(self) -> Dict:
    """Build the plain-dictionary form of this AfPacketInterface.

    Returns:
        A dictionary that can be serialized to YAML then inserted into the `suricata.yaml` file. Settings whose
        value is None or an empty string are left out.
    """
    settings = (
        ('interface', self.interface),
        ('cluster-id', self.cluster_id),
        ('cluster-type', self.cluster_type),
        ('bpf-filter', self.bpf_filter),
        ('threads', self.threads),
    )
    raw = {}
    for option, value in settings:
        # Unset options are omitted entirely rather than written out as null/empty entries.
        if value is None or value == '':
            continue
        raw[option] = value
    return raw
AfPacketInterfaces
__init__(self, interfaces=None)
special
A collection of AfPacketInterfaces.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
interfaces |
Optional[List[dynamite_nsm.services.base.config_objects.suricata.misc.AfPacketInterface]] |
A list of AfPacketInterface objects |
None |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def __init__(self, interfaces: Optional[List[AfPacketInterface]] = None):
    """A collection of AfPacketInterfaces.

    Args:
        interfaces: A list of AfPacketInterface objects; a falsy value (None or an empty list) results in a
            fresh empty list
    """
    # Counter initialised to zero; its consumer is not visible in this block.
    self._idx = 0
    self.interfaces = interfaces if interfaces else []
add(self, interface)
Add a new AfPacketInterface
Parameters:
Name | Type | Description | Default |
---|---|---|---|
interface |
AfPacketInterface |
An AfPacketInterface object |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def add(self, interface: AfPacketInterface) -> None:
    """Append an AfPacketInterface to the end of this collection.

    No check is made for an existing entry with the same interface name.

    Args:
        interface: The AfPacketInterface object to store
    Returns:
        None
    """
    self.interfaces.append(interface)
get(self, interface_name)
Given the name of an interface retrieve the corresponding AfPacketInterface object
Parameters:
Name | Type | Description | Default |
---|---|---|---|
interface_name |
str |
The name of the network interface. |
required |
Returns:
Type | Description |
---|---|
Optional[dynamite_nsm.services.base.config_objects.suricata.misc.AfPacketInterface] |
An AfPacketInterface if found, otherwise `None` |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def get(self, interface_name: str) -> Optional[AfPacketInterface]:
    """Look up an AfPacketInterface by the name of its network interface.

    Args:
        interface_name: The name of the network interface.
    Returns:
        The first stored AfPacketInterface whose name matches, otherwise `None`
    """
    matches = (candidate for candidate in self.interfaces if candidate.interface == interface_name)
    # next() with a default stops at the first hit and yields None when nothing matches.
    return next(matches, None)
get_raw(self)
Get a raw representation of AfPacketInterfaces that can be serialized and inserted into suricata.yaml
file
Returns:
Type | Description |
---|---|
List[Dict] |
A list of dictionaries representing individual AfPacketInterface configurations |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def get_raw(self) -> List[Dict]:
    """Collect the raw form of every stored AfPacketInterface for insertion into the `suricata.yaml` file.

    Returns:
        A list of dictionaries, one per AfPacketInterface, in the order the interfaces are stored
    """
    raw_configs = [af_packet.get_raw() for af_packet in self.interfaces]
    return raw_configs
remove(self, interface_name)
Given the name of an interface delete it
Parameters:
Name | Type | Description | Default |
---|---|---|---|
interface_name |
str |
The name of the network interface. |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def remove(self, interface_name: str) -> None:
    """Drop every stored interface with the given name.

    The collection is rebuilt as a new list; an unknown name leaves its contents unchanged.

    Args:
        interface_name: The name of the network interface.
    Returns:
        None
    """
    self.interfaces = [kept for kept in self.interfaces if kept.interface != interface_name]
PcapInterfaces
__init__(self, interface_names)
special
:param interface_names: A list of network interface names
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def __init__(self, interface_names: List[str]):
    """A collection of network interfaces identified by name only.

    Args:
        interface_names: A list of network interface names; stored as given, without copying
    """
    self.interfaces = interface_names
Threading
__init__(self, management_cpu_set=None, receive_cpu_set=None, worker_cpu_set=None)
special
The threading configuration for Suricata
Parameters:
Name | Type | Description | Default |
---|---|---|---|
management_cpu_set |
Optional[Set] |
A set of integers representing CPU cores dedicated to management tasks |
None |
receive_cpu_set |
Optional[Set] |
A set of integers representing CPU cores dedicated to packet acquisition |
None |
worker_cpu_set |
Optional[Set] |
A set of integers representing CPU cores dedicated to analysis |
None |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def __init__(self, management_cpu_set: Optional[Set] = None, receive_cpu_set: Optional[Set] = None,
             worker_cpu_set: Optional[Set] = None):
    """The threading configuration for Suricata

    Each argument is stored as given; no validation or copying takes place.

    Args:
        management_cpu_set: A set of integers representing CPU cores dedicated to management tasks
        receive_cpu_set: A set of integers representing CPU cores dedicated to packet acquisition
        worker_cpu_set: A set of integers representing CPU cores dedicated to analysis
    """
    self.management_cpu_set = management_cpu_set
    self.receive_cpu_set = receive_cpu_set
    self.worker_cpu_set = worker_cpu_set
get_raw(self)
Get a raw representation of Threading that can be serialized and inserted into suricata.yaml
file
Returns:
Type | Description |
---|---|
Dict |
A dictionary containing the threading families |
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def get_raw(self) -> Dict:
    """Build the raw threading section that can be serialized and inserted into the `suricata.yaml` file.

    Returns:
        A dictionary with 'set-cpu-affinity' fixed to True and 'cpu-affinity' holding one entry per thread
        family, in the order management, receive, worker. A family whose CPU set is None or empty is omitted.
    """
    cpu_affinity = []
    simple_families = (
        ('management-cpu-set', self.management_cpu_set),
        ('receive-cpu-set', self.receive_cpu_set),
    )
    for family_name, cpu_set in simple_families:
        if cpu_set:
            cpu_affinity.append({family_name: {'cpu': list(cpu_set)}})
    if self.worker_cpu_set:
        # Workers additionally carry the 'exclusive' mode and one thread per listed CPU.
        worker_family = {
            'cpu': list(self.worker_cpu_set),
            'mode': 'exclusive',
            'threads': len(self.worker_cpu_set),
        }
        cpu_affinity.append({'worker-cpu-set': worker_family})
    return {'set-cpu-affinity': True, 'cpu-affinity': cpu_affinity}