Skip to content

misc.py

Miscellaneous configuration objects for Suricata

To import...

from dynamite_nsm.services.base.config_objects.suricata import misc as suricata_config_misc

AfPacketInterface

__init__(self, interface_name, cluster_id=None, cluster_type=None, bpf_filter=None, threads=None) special

Suricata AF_PACKET interface

Parameters:

Name Type Description Default
interface_name str

The name of a network interface to monitor

required
cluster_id Optional[int]

A unique integer associated with this worker maps to af_packet_fanout_id

None
cluster_type Optional[str]

The algorithm used to spread traffic between sockets.

None
bpf_filter Optional[str]

A filter that can be used to drop packets before they are analyzed

None
threads Union[int, str]

The number of threads dedicated to monitoring this network interface

None
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def __init__(self, interface_name: str, cluster_id: Optional[int] = None, cluster_type: Optional[str] = None,
             bpf_filter: Optional[str] = None,
             threads: Union[int, str] = None):

    """Suricata AF_PACKET interface
    Args:
        interface_name: The name of a network interface to monitor
        cluster_id: A unique integer associated with this worker maps to af_packet_fanout_id
        cluster_type: The algorithm used to spread traffic between sockets.
        bpf_filter: A filter that can be used to drop packets before they are analyzed
        threads: The number of threads dedicated to monitoring this network interface
    """
    self.interface = interface_name
    self.cluster_id = cluster_id
    if cluster_type:
        self.cluster_type = cluster_type.replace('AF_Packet::', '')
        if self.cluster_type in AF_PACKET_FANOUT_MODE_TO_CLUSTER_TYPE_MAP.keys():
            self.cluster_type = AF_PACKET_FANOUT_MODE_TO_CLUSTER_TYPE_MAP.get(self.cluster_type)
    else:
        self.cluster_type = 'cluster_qm'

    self.bpf_filter = bpf_filter
    self.threads = threads
    if not threads:
        self.threads = 'auto'

get_raw(self)

Get a raw representation of this AfPacketInterface.

Returns:

Type Description
Dict

A dictionary that can be serialized to YAML then inserted into the suricata.yaml file.

Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def get_raw(self) -> Dict:
    """Get a raw representation of this AfPacketInterface.

    Returns:
        A dictionary that can be serialized to YAML then inserted into the `suricata.yaml` file.
    """
    orig_raw = {
        'interface': self.interface,
        'cluster-id': self.cluster_id,
        'cluster-type': self.cluster_type,
        'bpf-filter': self.bpf_filter,
        'threads': self.threads
    }
    orig_raw = {k: v for k, v in orig_raw.items() if v is not None and v != ''}
    return orig_raw

AfPacketInterfaces

__init__(self, interfaces=None) special

A collection of AfPacketInterfaces.

Parameters:

Name Type Description Default
interfaces Optional[List[dynamite_nsm.services.base.config_objects.suricata.misc.AfPacketInterface]]

A list of AfPacketInterface objects

None
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def __init__(self, interfaces: Optional[List[AfPacketInterface]] = None):
    """A collection of AfPacketInterfaces.
    Args:
        interfaces: A list of AfPacketInterface objects
    """
    self._idx = 0
    self.interfaces = interfaces
    if not self.interfaces:
        self.interfaces = []

add(self, interface)

Add a new AfPacketInterface

Parameters:

Name Type Description Default
interface AfPacketInterface

An AfPacketInterface object

required

Returns:

Type Description
None

None

Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def add(self, interface: AfPacketInterface) -> None:
    """Add a new AfPacketInterface
    Args:
        interface: An AfPacketInterface object

    Returns:
        None
    """
    self.interfaces.append(interface)

get(self, interface_name)

Given the name of an interface retrieve the corresponding AfPacketInterface object

Parameters:

Name Type Description Default
interface_name str

The name of the network interface.

required

Returns:

Type Description
Optional[dynamite_nsm.services.base.config_objects.suricata.misc.AfPacketInterface]

An AfPacketInterface if found, otherwise None

Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def get(self, interface_name: str) -> Optional[AfPacketInterface]:
    """Given the name of an interface retrieve the corresponding AfPacketInterface object
    Args:
        interface_name: The name of the network interface.
    Returns:
        An AfPacketInterface if found, otherwise `None`
    """
    for interface in self.interfaces:
        if interface.interface == interface_name:
            return interface
    return None

get_raw(self)

Get a raw representation of AfPacketInterfaces that can be serialized and inserted into suricata.yaml file

Returns:

Type Description
List[Dict]

A list of dictionaries representing individual AfPacketInterface configurations

Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def get_raw(self) -> List[Dict]:
    """Get a raw representation of AfPacketInterfaces that can be serialized and inserted into `suricata.yaml` file
    Returns:
        A list of dictionaries representing individual AfPacketInterface configurations
    """
    return [interface.get_raw() for interface in self.interfaces]

remove(self, interface_name)

Given the name of an interface delete it

Parameters:

Name Type Description Default
interface_name str

The name of the network interface.

required

Returns:

Type Description
None

None

Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def remove(self, interface_name: str) -> None:
    """Given the name of an interface delete it
    Args:
        interface_name: The name of the network interface.
    Returns:
        None
    """
    temp_interfaces = []
    for interface in self.interfaces:
        if interface.interface == interface_name:
            continue
        temp_interfaces.append(interface)
    self.interfaces = temp_interfaces

PcapInterfaces

__init__(self, interface_names) special

:param interface_names: A list of network interface names

Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def __init__(self, interface_names: List[str]):
    """
    :param interface_names: A list of network interface names
    """
    self.interfaces = interface_names

Threading

__init__(self, management_cpu_set=None, receive_cpu_set=None, worker_cpu_set=None) special

The threading configuration for Suricata

Parameters:

Name Type Description Default
management_cpu_set Optional[Set]

A set of integers representing CPU cores dedicated to management tasks

None
receive_cpu_set Optional[Set]

A set of integers representing CPU cores dedicated to packet acquisition

None
worker_cpu_set Optional[Set]

A set of integers representing CPU cores dedicated to analysis

None
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def __init__(self, management_cpu_set: Optional[Set] = None, receive_cpu_set: Optional[Set] = None,
             worker_cpu_set: Optional[Set] = None):

    """The threading configuration for Suricata
    Args:
        management_cpu_set: A set of integers representing CPU cores dedicated to management tasks
        receive_cpu_set: A set of integers representing CPU cores dedicated to packet acquisition
        worker_cpu_set: A set of integers representing CPU cores dedicated to analysis
    """

    self.management_cpu_set = management_cpu_set
    self.receive_cpu_set = receive_cpu_set
    self.worker_cpu_set = worker_cpu_set

get_raw(self)

Get a raw representation of Threading that can be serialized and inserted into suricata.yaml file

Returns:

Type Description
Dict

A dictionary containing the threading families

Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
def get_raw(self) -> Dict:
    """Get a raw representation of Threading that can be serialized and inserted into `suricata.yaml` file
    Returns:
        A dictionary containing the threading families
    """
    thread_families = []
    if self.management_cpu_set:
        thread_families.append(
            {
                'management-cpu-set': {
                    'cpu': list(self.management_cpu_set)
                }
            }
        )
    if self.receive_cpu_set:
        thread_families.append(
            {
                'receive-cpu-set': {
                    'cpu': list(self.receive_cpu_set)
                }
            }
        )
    if self.worker_cpu_set:
        thread_families.append(
            {
                'worker-cpu-set': {
                    'cpu': list(self.worker_cpu_set),
                    'mode': 'exclusive',
                    'threads': len(self.worker_cpu_set)
                }
            }
        )
    return {
        'set-cpu-affinity': True,
        'cpu-affinity': thread_families
    }