Source code for titli.fe.base_feature_extractor
"""Base feature extractor for network traffic analysis."""
import json
import pickle
from abc import ABC, abstractmethod
from io import TextIOWrapper
from pathlib import Path
import numpy as np
from scapy.all import PcapReader
class JSONEncoder(json.JSONEncoder):
"""Custom JSON encoder for handling Path, TextIOWrapper, and numpy types."""
def default(self, obj):
"""Convert non-serializable objects to JSON-serializable format.
Args:
obj: Object to serialize
Returns:
JSON-serializable representation of the object
"""
if isinstance(obj, Path):
return str(obj)
if isinstance(obj, TextIOWrapper):
return obj.name
if isinstance(obj, np.float32):
return float(obj)
return super().default(obj)
[docs]
class BaseTrafficFeatureExtractor(ABC):
"""Abstract base class for network traffic feature extraction.
This class provides the framework for extracting features from network traffic
captured in PCAP files. Subclasses must implement the abstract methods to define
specific feature extraction logic.
Attributes:
file_path (str): Path to the input PCAP file
state: Optional pre-existing state to continue from previous extraction
feature_file: Output file handle for extracted features
meta_file: Output file handle for traffic vector metadata
count (int): Number of packets successfully processed
skipped (int): Number of packets skipped during processing
"""
[docs]
def __init__(self, file_path, dataset_name=None, state=None, **kwargs):
"""Initialize the feature extractor.
Args:
file_path (str): Path to the PCAP file to process
dataset_name (str, optional): Name of the dataset (deprecated, not used)
state (NetStat, optional): Pre-existing state to continue from. If None,
starts fresh extraction
**kwargs: Additional arguments for subclass customization
"""
self.file_path = file_path
self.state = state
[docs]
@abstractmethod
def update(self, traffic_vector):
"""Update the feature extractor with a new traffic vector.
This method processes a traffic vector and updates the internal state
of the feature extractor, returning the computed features.
Args:
traffic_vector (np.ndarray): Traffic vector extracted from packet(s)
Returns:
np.ndarray: Extracted features corresponding to the traffic vector
"""
pass
[docs]
@abstractmethod
def peek(self, traffic_vectors):
"""Simulate feature extraction without updating internal state.
This method performs a "dry run" of feature extraction without modifying
the extractor's state. Useful for adversarial attacks or what-if analysis.
Args:
traffic_vectors (list): List of traffic vectors to process
Returns:
list: List of features corresponding to each traffic vector
"""
pass
[docs]
@abstractmethod
def get_traffic_vector(self, packet):
"""Extract traffic vector from a raw network packet.
Args:
packet (scapy.packet.Packet): Input packet to process
Returns:
np.ndarray or None: Extracted traffic vector, or None if packet should be skipped
"""
pass
[docs]
def setup(self, output_path=None):
"""Set up the feature extractor for processing.
Opens the input PCAP file, creates output CSV files for features and metadata,
and initializes processing counters and state management flags.
Args:
output_path (str or Path, optional): Custom path for the output feature file.
If None, creates the feature file in the same directory as the input PCAP
with a .csv extension. The metadata file will be created with a '_meta.csv'
suffix in the same directory.
Side Effects:
- Opens input PCAP file for reading
- Creates and opens feature and metadata CSV files for writing
- Initializes count, skipped counters to 0
- Sets state management flags based on whether pre-existing state was provided
"""
self.path = Path(self.file_path)
if output_path is not None:
feature_file = Path(output_path)
meta_file = feature_file.parent / (feature_file.stem + "_meta.csv")
else:
feature_file = self.path.with_suffix(".csv")
meta_file = self.path.parent / (self.path.stem + "_meta.csv")
self.feature_file = open(feature_file, "w")
self.meta_file = open(meta_file, "w")
self.feature_file.write(",".join(self.get_headers()) + "\n")
self.meta_file.write(",".join(self.get_meta_headers()) + "\n")
self.count = 0
self.skipped = 0
self.input_pcap = PcapReader(str(self.path))
if self.state is not None:
self.reset_state = False
self.save_state = False
self.offset_timestamp = True
else:
self.reset_state = True
self.save_state = True
self.offset_timestamp = False
self.offset_time = None
[docs]
@abstractmethod
def get_headers(self):
"""Get the column names for the feature CSV file.
Returns:
list[str]: List of feature column names
"""
pass
[docs]
@abstractmethod
def get_meta_headers(self):
"""Get the column names for the metadata/traffic vector CSV file.
Returns:
list[str]: List of metadata column names
"""
pass
[docs]
def teardown(self):
"""Clean up resources and finalize feature extraction.
Closes all open files (PCAP input, feature output, metadata output),
prints processing statistics, and saves the extractor state if configured.
Side Effects:
- Closes all open file handles
- Prints processing statistics (skipped, processed, written counts)
- Saves state to 'state.pkl' in the PCAP directory if save_state is True
"""
self.meta_file.close()
self.feature_file.close()
self.input_pcap.close()
print(
f"skipped: {self.skipped} processed: {self.count+self.skipped} written: {self.count}"
)
if self.save_state:
state_path = self.path.parent / "state.pkl"
state_path.parent.mkdir(parents=True, exist_ok=True)
with open(state_path, "wb") as pf:
pickle.dump(self.state, pf)
[docs]
@abstractmethod
def extract_features(self):
"""Main entry point for feature extraction from PCAP file.
This method should implement the complete feature extraction pipeline:
reading packets from the input PCAP, extracting traffic vectors,
computing features, and writing results to output files.
Must call setup() before and teardown() after processing.
"""
pass