misc.py
Miscellaneous configuration objects for Suricata
To import...
from dynamite_nsm.services.base.config_objects.suricata import misc as suricata_config_misc
        AfPacketInterface
    
__init__(self, interface_name, cluster_id=None, cluster_type=None, bpf_filter=None, threads=None)
  
      special
  
    Suricata AF_PACKET interface
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| interface_name | str | The name of a network interface to monitor | required | 
| cluster_id | Optional[int] | A unique integer associated with this worker; maps to af_packet_fanout_id | None | 
| cluster_type | Optional[str] | The algorithm used to spread traffic between sockets. | None | 
| bpf_filter | Optional[str] | A filter that can be used to drop packets before they are analyzed | None | 
| threads | Union[int, str] | The number of threads dedicated to monitoring this network interface | None | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def __init__(self, interface_name: str, cluster_id: Optional[int] = None, cluster_type: Optional[str] = None,
                       bpf_filter: Optional[str] = None,
                       threads: Union[int, str] = None):
              """Describe one network interface that Suricata monitors through AF_PACKET.

              Args:
                  interface_name: The name of a network interface to monitor
                  cluster_id: A unique integer associated with this worker; maps to af_packet_fanout_id
                  cluster_type: The algorithm used to spread traffic between sockets. An
                      'AF_Packet::' qualifier is stripped, and a name found in
                      AF_PACKET_FANOUT_MODE_TO_CLUSTER_TYPE_MAP is translated through that map.
                      When omitted or empty, 'cluster_qm' is used.
                  bpf_filter: A filter that can be used to drop packets before they are analyzed
                  threads: The number of threads dedicated to monitoring this network interface;
                      any falsy value (None, 0, '') is stored as 'auto'.
              """
              self.interface = interface_name
              self.cluster_id = cluster_id
              self.bpf_filter = bpf_filter
              self.threads = threads if threads else 'auto'

              # No cluster type supplied: fall back to the default and stop here.
              if not cluster_type:
                  self.cluster_type = 'cluster_qm'
                  return

              normalized = cluster_type.replace('AF_Packet::', '')
              # Names known to the fanout-mode map are translated; anything else is kept verbatim.
              if normalized in AF_PACKET_FANOUT_MODE_TO_CLUSTER_TYPE_MAP:
                  normalized = AF_PACKET_FANOUT_MODE_TO_CLUSTER_TYPE_MAP[normalized]
              self.cluster_type = normalized
get_raw(self)
    Get a raw representation of this AfPacketInterface.
Returns:
| Type | Description | 
|---|---|
| Dict | A dictionary that can be serialized to YAML then inserted into the `suricata.yaml` file | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def get_raw(self) -> Dict:
              """Build the dictionary form of this AfPacketInterface.

              Options whose value is None or an empty string are left out of the result.

              Returns:
                  A dictionary that can be serialized to YAML then inserted into the `suricata.yaml` file.
              """
              # Ordered (yaml key, value) pairs; the order here is the key order of the result.
              candidates = (
                  ('interface', self.interface),
                  ('cluster-id', self.cluster_id),
                  ('cluster-type', self.cluster_type),
                  ('bpf-filter', self.bpf_filter),
                  ('threads', self.threads),
              )
              raw = {}
              for option, value in candidates:
                  # Only None and '' count as unset; other falsy values such as 0 are kept.
                  if value is not None and value != '':
                      raw[option] = value
              return raw
        AfPacketInterfaces
    
__init__(self, interfaces=None)
  
      special
  
    A collection of AfPacketInterfaces.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| interfaces | Optional[List[dynamite_nsm.services.base.config_objects.suricata.misc.AfPacketInterface]] | A list of AfPacketInterface objects | None | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def __init__(self, interfaces: Optional[List[AfPacketInterface]] = None):
              """Create a container of AfPacketInterface objects.

              Args:
                  interfaces: A list of AfPacketInterface objects; None or an empty list
                      results in a fresh empty list being stored.
              """
              self.interfaces = interfaces if interfaces else []
              self._idx = 0
add(self, interface)
    Add a new AfPacketInterface
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| interface | AfPacketInterface | An AfPacketInterface object | required | 
Returns:
| Type | Description | 
|---|---|
| None | None | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def add(self, interface: AfPacketInterface) -> None:
              """Append an AfPacketInterface to the end of this collection.

              Args:
                  interface: The AfPacketInterface object to store

              Returns:
                  None
              """
              members = self.interfaces
              members.append(interface)
get(self, interface_name)
    Given the name of an interface retrieve the corresponding AfPacketInterface object
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| interface_name | str | The name of the network interface. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[dynamite_nsm.services.base.config_objects.suricata.misc.AfPacketInterface] | An AfPacketInterface if found, otherwise `None` | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def get(self, interface_name: str) -> Optional[AfPacketInterface]:
              """Look up the AfPacketInterface whose `interface` attribute equals the given name.

              Args:
                  interface_name: The name of the network interface.

              Returns:
                  The first matching AfPacketInterface, otherwise `None`
              """
              matches = (candidate for candidate in self.interfaces if candidate.interface == interface_name)
              # next() with a default yields the first match, or None when the generator is empty.
              return next(matches, None)
get_raw(self)
    Get a raw representation of AfPacketInterfaces that can be serialized and inserted into suricata.yaml file
Returns:
| Type | Description | 
|---|---|
| List[Dict] | A list of dictionaries representing individual AfPacketInterface configurations | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def get_raw(self) -> List[Dict]:
              """Collect the raw form of every stored interface, in insertion order.

              Returns:
                  A list with the result of each stored interface's `get_raw()`, suitable for
                  serializing into the `suricata.yaml` file
              """
              members = self.interfaces
              return [member.get_raw() for member in members]
remove(self, interface_name)
    Given the name of an interface delete it
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| interface_name | str | The name of the network interface. | required | 
Returns:
| Type | Description | 
|---|---|
| None | None | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def remove(self, interface_name: str) -> None:
              """Drop every stored interface whose name equals `interface_name`.

              The stored list is replaced by a new list; a name that matches nothing leaves
              the contents unchanged.

              Args:
                  interface_name: The name of the network interface.

              Returns:
                  None
              """
              self.interfaces = [kept for kept in self.interfaces if kept.interface != interface_name]
        PcapInterfaces
    
__init__(self, interface_names)
  
      special
  
    :param interface_names: A list of network interface names
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def __init__(self, interface_names: List[str]):
    """Hold a list of network interface names.

    Args:
        interface_names: A list of network interface names; stored as given (the same
            list object, not a copy) on `self.interfaces`.
    """
    self.interfaces = interface_names
        Threading
    
__init__(self, management_cpu_set=None, receive_cpu_set=None, worker_cpu_set=None)
  
      special
  
    The threading configuration for Suricata
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| management_cpu_set | Optional[Set] | A set of integers representing CPU cores dedicated to management tasks | None | 
| receive_cpu_set | Optional[Set] | A set of integers representing CPU cores dedicated to packet acquisition | None | 
| worker_cpu_set | Optional[Set] | A set of integers representing CPU cores dedicated to analysis | None | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def __init__(self, management_cpu_set: Optional[Set] = None, receive_cpu_set: Optional[Set] = None,
             worker_cpu_set: Optional[Set] = None):
    """The threading configuration for Suricata

    Each argument is stored unchanged on the attribute of the same name; no validation
    or copying is performed here.

    Args:
        management_cpu_set: A set of integers representing CPU cores dedicated to management tasks
        receive_cpu_set: A set of integers representing CPU cores dedicated to packet acquisition
        worker_cpu_set: A set of integers representing CPU cores dedicated to analysis
    """
    self.management_cpu_set = management_cpu_set
    self.receive_cpu_set = receive_cpu_set
    self.worker_cpu_set = worker_cpu_set
get_raw(self)
    Get a raw representation of Threading that can be serialized and inserted into suricata.yaml file
Returns:
| Type | Description | 
|---|---|
| Dict | A dictionary containing the threading families | 
Source code in dynamite_nsm/services/base/config_objects/suricata/misc.py
          def get_raw(self) -> Dict:
              """Build the threading section for the `suricata.yaml` file.

              A family entry is emitted only for a CPU set that is non-empty. The worker family
              is additionally given mode 'exclusive' and a thread count equal to the number of
              CPUs in its set.

              Returns:
                  A dictionary containing the threading families
              """
              families = []

              # Management and receive families share the same shape: just a list of CPUs.
              simple_families = (
                  ('management-cpu-set', self.management_cpu_set),
                  ('receive-cpu-set', self.receive_cpu_set),
              )
              for family_name, cpu_set in simple_families:
                  if cpu_set:
                      families.append({family_name: {'cpu': list(cpu_set)}})

              workers = self.worker_cpu_set
              if workers:
                  worker_options = {
                      'cpu': list(workers),
                      'mode': 'exclusive',
                      'threads': len(workers),
                  }
                  families.append({'worker-cpu-set': worker_options})

              return {'set-cpu-affinity': True, 'cpu-affinity': families}