Source code for maltoolbox.attackgraph.analyzers.apriori

"""
MAL-Toolbox Attack Graph Apriori Analyzer Submodule

This submodule contains analyzers that are relevant before attackers are even
connected to the attack graph.
Currently these are:
- Viability = Determine if a node can be traversed under any circumstances or
  if the model structure makes it unviable.
- Necessity = Determine if a node is necessary for the attacker or if the
  model structure means it is not needed(it behaves as if it were already
  compromised) to compromise children attack steps.
"""

from __future__ import annotations
from typing import Optional, TYPE_CHECKING
import logging

if TYPE_CHECKING:
    from ..attackgraph import AttackGraph
    from ..node import AttackGraphNode

logger = logging.getLogger(__name__)

[docs] def propagate_viability_from_node(node: AttackGraphNode) -> None: """ Arguments: node - the attack graph node from which to propagate the viable status """ logger.debug( 'Propagate viability from "%s"(%d) with viability status %s.', node.full_name, node.id, node.is_viable ) for child in node.children: original_value = child.is_viable if child.type == 'or': child.is_viable = False for parent in child.parents: child.is_viable = child.is_viable or parent.is_viable if child.type == 'and': child.is_viable = False if child.is_viable != original_value: propagate_viability_from_node(child)
[docs] def propagate_necessity_from_node(node: AttackGraphNode) -> None: """ Arguments: node - the attack graph node from which to propagate the necessary status """ logger.debug( 'Propagate necessity from "%s"(%d) with necessity status %s.', node.full_name, node.id, node.is_necessary ) for child in node.children: if child.ttc and child.ttc.get('name', None) not in ['Enabled', 'Disabled', 'Instant']: # Do not propagate unnecessary state from nodes that have a TTC # probability distribution associated with them. # TODO: Evaluate this more carefully, how do we want to have TTCs # impact necessity and viability. # TODO: Have this condition be any probability that has a # Bernoulli component continue original_value = child.is_necessary if child.type == 'or': child.is_necessary = False if child.type == 'and': child.is_necessary = False for parent in child.parents: child.is_necessary = child.is_necessary or parent.is_necessary # TODO: Update TTC for child attack step before if it is not necessary # before propagating it further. if child.is_necessary != original_value: propagate_necessity_from_node(child)
[docs] def evaluate_viability(node: AttackGraphNode) -> None: """ Arguments: graph - the node to evaluate viability for. """ match (node.type): case 'exist': assert isinstance(node.existence_status, bool), \ f'Existence status not defined for {node.full_name}.' node.is_viable = node.existence_status case 'notExist': assert isinstance(node.existence_status, bool), \ f'Existence status not defined for {node.full_name}.' node.is_viable = not node.existence_status case 'defense': assert node.defense_status is not None and \ 0.0 <= node.defense_status <= 1.0, \ f'{node.full_name} defense status invalid: {node.defense_status}.' node.is_viable = node.defense_status != 1.0 case 'or': node.is_viable = False for parent in node.parents: node.is_viable = node.is_viable or parent.is_viable case 'and': node.is_viable = True for parent in node.parents: node.is_viable = node.is_viable and parent.is_viable case _: msg = ('Evaluate viability was provided node "%s"(%d) which ' 'is of unknown type "%s"') logger.error(msg, node.full_name, node.id, node.type) raise ValueError(msg % (node.full_name, node.id, node.type))
[docs] def evaluate_necessity(node: AttackGraphNode) -> None: """ Arguments: graph - the node to evaluate necessity for. """ match (node.type): case 'exist': assert isinstance(node.existence_status, bool), \ f'Existence status not defined for {node.full_name}.' node.is_necessary = not node.existence_status case 'notExist': assert isinstance(node.existence_status, bool), \ f'Existence status not defined for {node.full_name}.' node.is_necessary = bool(node.existence_status) case 'defense': assert node.defense_status is not None and \ 0.0 <= node.defense_status <= 1.0, \ f'{node.full_name} defense status invalid: {node.defense_status}.' node.is_necessary = node.defense_status != 0.0 case 'or': node.is_necessary = True for parent in node.parents: node.is_necessary = node.is_necessary and parent.is_necessary case 'and': node.is_necessary = False for parent in node.parents: node.is_necessary = node.is_necessary or parent.is_necessary case _: msg = ('Evaluate necessity was provided node "%s"(%d) which ' 'is of unknown type "%s"') logger.error(msg, node.full_name, node.id, node.type) raise ValueError(msg % (node.full_name, node.id, node.type))
[docs] def evaluate_viability_and_necessity(node: AttackGraphNode) -> None: """ Arguments: graph - the node to evaluate viability and necessity for. """ evaluate_viability(node) evaluate_necessity(node)
[docs] def calculate_viability_and_necessity(graph: AttackGraph) -> None: """ Arguments: graph - the attack graph for which we wish to determine the viability and necessity statuses for the nodes. """ for node in graph.nodes.values(): if node.type in ['exist', 'notExist', 'defense']: evaluate_viability_and_necessity(node) if not node.is_viable: propagate_viability_from_node(node) if not node.is_necessary: propagate_necessity_from_node(node)
[docs] def prune_unviable_and_unnecessary_nodes(graph: AttackGraph) -> None: """ Arguments: graph - the attack graph for which we wish to remove the the nodes which are not viable or necessary. """ logger.debug( 'Prune unviable and unnecessary nodes from the attack graph.') nodes_to_remove = set() for node in graph.nodes.values(): if node.type in ('or', 'and') and \ (not node.is_viable or not node.is_necessary): nodes_to_remove.add(node) # Do the removal separatly so we don't remove # nodes from a set we are looping over for node in nodes_to_remove: graph.remove_node(node)
[docs] def propagate_viability_from_unviable_node( unviable_node: AttackGraphNode, ) -> set[AttackGraphNode]: """ Update viability of nodes affected by newly enabled defense `unviable_node` in the graph and return any attack steps that are no longer viable because of it. Propagate recursively via children as long as changes occur. Arguments: unviable_node - the node to propagate viability from Returns: attack_steps_made_unviable - set of the attack steps that have been made unviable by a defense enabled in the current step. Builds up recursively. """ attack_steps_made_unviable = set() logger.debug( 'Update viability for node "%s"(%d)', unviable_node.full_name, unviable_node.id ) assert not unviable_node.is_viable, ( "propagate_viability_from_unviable_node should not be called" f" on viable node {unviable_node.full_name}" ) if unviable_node.type in ('and', 'or'): attack_steps_made_unviable.add(unviable_node) for child in unviable_node.children: original_value = child.is_viable if child.type == 'or': child.is_viable = False for parent in child.parents: child.is_viable = child.is_viable or parent.is_viable if child.type == 'and': child.is_viable = False if child.is_viable != original_value: attack_steps_made_unviable |= \ propagate_viability_from_unviable_node(child) return attack_steps_made_unviable