config
Configuration wrappers for the main Suricata configuration file.
To import...
from dynamite_nsm.services.suricata import config as suricata_config
ConfigManager
Manage Suricata.yaml configuration
__init__(self, configuration_directory, verbose=False, stdout=True)
special
Configuration Manager for suricata.yaml file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration_directory |
str |
The path to the Suricata configuration directory (E.G /etc/dynamite/suricata) |
required |
Instance Variables:
Directories and Files:
suricata_log_output_file
- Directory where logs are writtendefault_rules_directory
- Directory where rules liveclassification_file
- The file (path) that maps severity to various [class]types (classification.config)reference_config_file
- The file (path) to the reference.config file
Network Interface Setup:
af_packet_interfaces
- Alist
ofmisc.AfPacketInterfaces
representing suricata monitored interfacespcap_interfaces
- Alist
ofmisc.PcapInterfaces
(libpcap support if af_packet isn't possible)
Rules:
rule_files
- Alist
of suricatarules.Rules
(rulesets)
Address Groups:
home_net
external_net
http_servers
sql_servers
dns_servers
telnet_servers
aim_servers
dc_servers
dnp3_servers
modbus_servers
enip_server
Port Groups:
http_ports
shellcode_ports
oracle_ports
ssh_ports
dnp3_ports
modbus_ports
file_data_ports
ftp_ports
Source code in dynamite_nsm/services/suricata/config.py
def __init__(self, configuration_directory: str, verbose: Optional[bool] = False, stdout: Optional[bool] = True):
"""Configuration Manager for suricata.yaml file
Args:
configuration_directory: The path to the Suricata configuration directory (E.G /etc/dynamite/suricata)
___
# Instance Variables:
## Directories and Files:
- `suricata_log_output_file` - Directory where logs are written
- `default_rules_directory` - Directory where rules live
- `classification_file` - The file (path) that maps severity to various [class]types (classification.config)
- `reference_config_file` - The file (path) to the reference.config file
## Network Interface Setup:
- `af_packet_interfaces` - A `list` of `misc.AfPacketInterfaces` representing suricata monitored interfaces
- `pcap_interfaces` - A `list` of `misc.PcapInterfaces` (libpcap support if af_packet isn't possible)
## Rules:
- `rule_files` - A `list` of suricata `rules.Rules` (rulesets)
## Address Groups:
> <sup>[See syntax.](https://suricata.readthedocs.io/en/suricata-6.0.0/configuration/suricata-yaml.html#rule-vars)</sup>
- `home_net`
- `external_net`
- `http_servers`
- `sql_servers`
- `dns_servers`
- `telnet_servers`
- `aim_servers`
- `dc_servers`
- `dnp3_servers`
- `modbus_servers`
- `enip_server`
## Port Groups:
> <sup>[See syntax.](https://suricata.readthedocs.io/en/suricata-6.0.0/configuration/suricata-yaml.html#rule-vars)</sup>
- `http_ports`
- `shellcode_ports`
- `oracle_ports`
- `ssh_ports`
- `dnp3_ports`
- `modbus_ports`
- `file_data_ports`
- `ftp_ports`
"""
extract_tokens = {
'runmode': ('runmode',),
'home_net': ('vars', 'address-groups', 'HOME_NET'),
'external_net': ('vars', 'address-groups', 'EXTERNAL_NET'),
'http_servers': ('vars', 'address-groups', 'HTTP_SERVERS'),
'sql_servers': ('vars', 'address-groups', 'SQL_SERVERS'),
'dns_servers': ('vars', 'address-groups', 'DNS_SERVERS'),
'telnet_servers': ('vars', 'address-groups', 'TELNET_SERVERS'),
'aim_servers': ('vars', 'address-groups', 'AIM_SERVERS'),
'dc_servers': ('vars', 'address-groups', 'DC_SERVERS'),
'dnp3_servers': ('vars', 'address-groups', 'DNP3_SERVERS'),
'modbus_client': ('vars', 'address-groups', 'MODBUS_CLIENT'),
'modbus_server': ('vars', 'address-groups', 'MODBUS_SERVER'),
'enip_client': ('vars', 'address-groups', 'ENIP_CLIENT'),
'enip_server': ('vars', 'address-groups', 'ENIP_SERVER'),
'http_ports': ('vars', 'port-groups', 'HTTP_PORTS'),
'shellcode_ports': ('vars', 'port-groups', 'SHELLCODE_PORTS'),
'oracle_ports': ('vars', 'port-groups', 'ORACLE_PORTS'),
'ssh_ports': ('vars', 'port-groups', 'SSH_PORTS'),
'dnp3_ports': ('vars', 'port-groups', 'DNP3_PORTS'),
'modbus_ports': ('vars', 'port-groups', 'MODBUS_PORTS'),
'file_data_ports': ('vars', 'port-groups', 'FILE_DATA_PORTS'),
'ftp_ports': ('vars', 'port-groups', 'FTP_PORTS'),
'default_log_directory': ('default-log-dir',),
'suricata_log_output_file': ('logging', 'outputs', 'file', 'filename'),
'default_rules_directory': ('default-rule-path',),
'classification_file': ('classification-file',),
'reference_config_file': ('reference-config-file',),
'_af_packet_interfaces_raw': ('af-packet',),
'_rule_files_raw': ('rule-files',),
'_threading_raw': ('threading',)
}
self.configuration_directory = configuration_directory
self.config_data = None
self.runmode = None
self.home_net = None
self.external_net = None
self.http_servers = None
self.sql_servers = None
self.dns_servers = None
self.telnet_servers = None
self.aim_servers = None
self.dc_servers = None
self.modbus_client = None
self.modbus_server = None
self.enip_client = None
self.enip_server = None
self.http_ports = None
self.shellcode_ports = None
self.oracle_ports = None
self.ssh_ports = None
self.dnp3_ports = None
self.modbus_ports = None
self.ftp_ports = None
self.file_data_ports = None
self.default_log_directory = None
self.suricata_log_output_file = None
self.default_rules_directory = None
self.classification_file = None
self.reference_config_file = None
self._af_packet_interfaces_raw = []
self._rule_files_raw = []
self._threading_raw = {}
self.suricata_config_file = os.path.join(self.configuration_directory, 'suricata.yaml')
try:
with open(self.suricata_config_file, 'r') as configyaml:
self.config_data_raw = load(configyaml, Loader=Loader)
except (IOError, ValueError):
raise general_exceptions.ReadConfigError(f'Failed to read or parse {self.suricata_config_file}.')
super().__init__(self.config_data_raw, name='suricata.config', verbose=verbose, stdout=stdout, **extract_tokens)
self.parse_yaml_file()
self.rules = rules.Rules()
for rule_name in self.list_available_rule_names():
if rule_name in self._rule_files_raw:
self.rules.add(rules.Rule(rule_name, enabled=True))
else:
self.rules.add(rules.Rule(rule_name, enabled=False))
self.af_packet_interfaces = misc.AfPacketInterfaces(
[misc.AfPacketInterface(
cluster_id=af_packet_interface_raw.get('cluster-id'),
cluster_type=af_packet_interface_raw.get('cluster-type'),
interface_name=af_packet_interface_raw.get('interface'),
bpf_filter=af_packet_interface_raw.get('bpf-filter'),
threads=af_packet_interface_raw.get('threads')
) for af_packet_interface_raw in self._af_packet_interfaces_raw]
)
thread_families = self._threading_raw.get('cpu-affinity', [])
management_cpu_set, receive_cpu_set, worker_cpu_set = None, None, None
for thread_family in thread_families:
if 'management-cpu-set' in thread_family.keys():
management_cpu_set = thread_family.get('management-cpu-set', {}).get('cpu', [])
elif 'receive-cpu-set' in thread_family.keys():
receive_cpu_set = thread_family.get('receive-cpu-set', {}).get('cpu', [])
elif 'worker-cpu-set' in thread_family.keys():
worker_cpu_set = thread_family.get('worker-cpu-set', {}).get('cpu', [])
self.threading = misc.Threading(management_cpu_set, receive_cpu_set, worker_cpu_set)
commit(self, out_file_path=None, backup_directory=None, top_text=None)
Write out an updated configuration file, and optionally backup the old one.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
out_file_path |
Optional[str] |
The path to the output file; if none given overwrites existing |
None |
backup_directory |
Optional[str] |
The path to the backup directory |
None |
top_text |
Optional[str] |
If specified, the first line of the configuration file will be set to the value of your choosing. |
None |
Returns:
Type | Description |
---|---|
None |
None |
Source code in dynamite_nsm/services/suricata/config.py
def commit(self, out_file_path: Optional[str] = None, backup_directory: Optional[str] = None,
top_text: Optional[str] = None) -> None:
"""Write out an updated configuration file, and optionally backup the old one.
Args:
out_file_path: The path to the output file; if none given overwrites existing
backup_directory: The path to the backup directory
top_text: If specified, the first line of the configuration file will be set to the value of your choosing.
Returns:
None
"""
if not out_file_path:
out_file_path = f'{self.configuration_directory}/suricata.yaml'
if not top_text:
top_text = '%YAML 1.1\n---'
self._rule_files_raw = self.rules.get_raw()
self._af_packet_interfaces_raw = self.af_packet_interfaces.get_raw()
self._threading_raw = self.threading.get_raw()
super(ConfigManager, self).commit(out_file_path, backup_directory, top_text=top_text)
from_raw_text(raw_text, configuration_directory=None)
classmethod
Alternative method for creating configuration file from raw text
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raw_text |
str |
The string representing the configuration file |
required |
configuration_directory |
Optional[str] |
The configuration directory for Suricata |
None |
Returns:
Type | Description |
---|---|
ConfigManager |
An instance of ConfigManager |
Source code in dynamite_nsm/services/suricata/config.py
@classmethod
def from_raw_text(cls, raw_text: str, configuration_directory: Optional[str] = None) -> ConfigManager:
"""Alternative method for creating configuration file from raw text
Args:
raw_text: The string representing the configuration file
configuration_directory: The configuration directory for Suricata
Returns:
An instance of ConfigManager
"""
tmp_dir = f'{const.CONFIG_PATH}/.tmp'
tmp_config = f'{tmp_dir}/suricata.yaml'
utilities.makedirs(tmp_dir)
with open(tmp_config, 'w') as out_f:
out_f.write(raw_text)
c = cls(configuration_directory=tmp_dir)
if configuration_directory:
c.configuration_directory = configuration_directory
return c
list_available_rule_names(self)
List the names of all available Suricata rules.
Returns:
Type | Description |
---|---|
List[str] |
A list of Suricata rule names that can be enabled |
Source code in dynamite_nsm/services/suricata/config.py
def list_available_rule_names(self) -> List[str]:
"""List the names of all available Suricata rules.
Returns:
A list of Suricata rule names that can be enabled
"""
return [rule for rule in os.listdir(f'{self.configuration_directory}/rules') if rule.endswith('.rules')]
reset(self, inspect_interfaces, out_file_path=None, default_config_path=None)
Reset a configuration file back to its default
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inspect_interfaces |
List[str] |
A list of network interfaces to capture on (E.G ["mon0", "mon1"]) |
required |
out_file_path |
Optional[str] |
The path to the output file |
None |
default_config_path |
Optional[str] |
The path to the default configuration |
None |
Returns:
Type | Description |
---|---|
|
None |
Source code in dynamite_nsm/services/suricata/config.py
def reset(self, inspect_interfaces: List[str], out_file_path: Optional[str] = None,
default_config_path: Optional[str] = None):
"""Reset a configuration file back to its default
Args:
inspect_interfaces: A list of network interfaces to capture on (E.G ["mon0", "mon1"])
out_file_path: The path to the output file
default_config_path: The path to the default configuration
Returns:
None
"""
if not install.BaseInstallManager.validate_inspect_interfaces(inspect_interfaces):
raise install.NetworkInterfaceNotFound(inspect_interfaces)
if not out_file_path:
out_file_path = f'{self.configuration_directory}/suricata.yaml'
if not default_config_path:
default_config_path = f'{const.DEFAULT_CONFIGS}/suricata/suricata.yaml'
super(ConfigManager, self).reset(out_file_path, default_config_path)
self.af_packet_interfaces = misc.AfPacketInterfaces()
for interface in inspect_interfaces:
self.af_packet_interfaces.add(
misc.AfPacketInterface(
interface_name=interface, threads='auto', cluster_id=random.randint(1, 50000),
cluster_type='cluster_qm'
)
)
self.commit(out_file_path=out_file_path)
lookup_rule_definition(rule_id)
Return the definition, categories, and friendly_name of a given script
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rule_id |
str |
A unique identifier representing a Suricata rule. |
required |
Returns:
Type | Description |
---|---|
Dict |
A dictionary of the format {"friendly_name" |
Source code in dynamite_nsm/services/suricata/config.py
def lookup_rule_definition(rule_id: str) -> Dict:
"""Return the definition, categories, and friendly_name of a given script
Args:
rule_id: A unique identifier representing a Suricata rule.
Returns:
A dictionary of the format {"friendly_name" <str>, "description" <str>, "categories" <list>}
"""
try:
suricata_rule_defs = os.path.join(const.DEFAULT_CONFIGS, 'suricata', 'suricata_rule_definitions.json')
with open(suricata_rule_defs) as f:
suricata_defs = json.load(f)
except FileNotFoundError:
suricata_defs = {}
definition = suricata_defs.get(str(rule_id))
return definition