Functions
prepare
from capymoa.datasets import Electricity
from capymoa.evaluation import prequential_evaluation
from capymoa.classifier import (
    FESLClassifier,   # NIPS'2017
    FOBOSClassifier,  # JMLR'2009
    FTRLClassifier,   # AISTATS'2011
    OASFClassifier,   # BigData'2024
    RSOLClassifier,   # SDM'2024
    ORF3VClassifier,  # AAAI'2022
    OVFMClassifier,   # ICDM'2021
    OSLMFClassifier,  # AAAI'2022
    OLD3SClassifier,  # TKDE'2023
    OWSSClassifier,   # ICDM'2024
)
Demo
"""Demo script for FESL Classifier - Simplified Version"""
import sys
import os
sys.path.insert(0, os.path.abspath('./src'))
from capymoa.datasets import Electricity
from capymoa.evaluation import prequential_evaluation
from capymoa.classifier import FESLClassifier
# Load stream
elec_stream = Electricity()
schema = elec_stream.schema
print(f"Number of features: {schema.get_num_attributes()}")
# Create FESL learner
fesl_learner = FESLClassifier(
    schema=schema,
    s1_feature_indices=[0, 1, 2, 3],
    s2_feature_indices=[2, 3, 4, 5],
    overlap_size=100,
    switch_point=5000,
    ensemble_method="selection",
    learning_rate_scale=0.1,
    random_seed=42,
)
# Run evaluation with progress bar
results = prequential_evaluation(
    stream=elec_stream,
    learner=fesl_learner,
    max_instances=10000,
    window_size=100,
    progress_bar=True,  # enable the progress bar
)
# Print final results
print(f"Ensemble Method: {fesl_learner.ensemble_method}")
print(f"\nFinal Accuracy: {results['cumulative'].accuracy():.2f}%")
print(f"Kappa: {results['cumulative'].kappa():.4f}")
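The demo relies on prequential (test-then-train) evaluation: each instance is first used to test the learner and only then to train it. The sketch below illustrates that idea in plain Python. It is not CapyMOA's implementation; `MajorityClassLearner`, `prequential_accuracy`, and the toy stream are hypothetical names introduced only for illustration.

```python
from collections import deque


class MajorityClassLearner:
    """Toy learner (hypothetical): predicts the most frequent label seen so far."""

    def __init__(self):
        self.counts = {}

    def predict(self, x):
        if not self.counts:
            return None  # nothing seen yet
        return max(self.counts, key=self.counts.get)

    def train(self, x, y):
        self.counts[y] = self.counts.get(y, 0) + 1


def prequential_accuracy(stream, learner, max_instances=None, window_size=100):
    """Test-then-train loop; returns cumulative and per-window accuracy in percent."""
    correct, seen = 0, 0
    window = deque(maxlen=window_size)
    windowed = []
    for x, y in stream:
        if max_instances is not None and seen >= max_instances:
            break
        hit = learner.predict(x) == y  # test first ...
        learner.train(x, y)            # ... then train on the same instance
        correct += hit
        seen += 1
        window.append(hit)
        if seen % window_size == 0:
            windowed.append(100.0 * sum(window) / len(window))
    cumulative = 100.0 * correct / seen if seen else 0.0
    return cumulative, windowed


# Toy stream: label 0 on every fourth instance, label 1 otherwise.
toy_stream = [((i,), 1 if i % 4 else 0) for i in range(1000)]
cumulative, windowed = prequential_accuracy(toy_stream, MajorityClassLearner())
print(f"Cumulative accuracy: {cumulative:.2f}%")
```

Because every instance is scored before the learner sees its label, the cumulative figure is an honest estimate of online performance without a separate hold-out set.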
data loading functions
1. openmoa.data.load_real()
Function: Load a real-world data stream.
Return: (X, y, feat_info)
- X: scipy.sparse.csr_matrix with one sample per row; the number of columns grows over time;
- y: np.ndarray of 0/1 labels, where 1 marks an anomaly;
- feat_info: dict recording the timestamps at which features are added or removed.
- Example:
X, y, feat_info = load_real(real_dataset)
2. openmoa.data.load_synthetic()
Function: Load a synthetic data stream.
Return: (X, y, feat_info)
- X: scipy.sparse.csr_matrix with one sample per row; the number of columns grows over time;
- y: np.ndarray of 0/1 labels, where 1 marks an anomaly;
- feat_info: dict recording the timestamps at which features are added or removed.
- Example:
X, y, feat_info = load_synthetic(synthetic_dataset)
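Both loaders return the same triple, so downstream code mainly needs to know which features exist at a given time. The sketch below shows one way such a triple might be consumed. The exact layout of `feat_info` is not specified here, so the `{feature index: (added_at, removed_at)}` layout, the dict-based sparse rows, and the helper `active_features` are all assumptions made for illustration.

```python
def active_features(feat_info, t):
    """Return the sorted feature indices that exist at row index t."""
    return sorted(
        j
        for j, (added, removed) in feat_info.items()
        if added <= t and (removed is None or t < removed)
    )


# Assumed layout: feature index -> (row where it appears, row where it disappears or None).
feat_info = {0: (0, None), 1: (0, 3), 2: (2, None)}

# Sparse rows as {feature index: value}; a stand-in for the rows of a CSR matrix.
rows = [{0: 1.0, 1: 0.5}, {0: 0.2}, {0: 0.1, 2: 3.0}, {2: 1.5}]
labels = [0, 0, 1, 0]

for t, (row, label) in enumerate(zip(rows, labels)):
    cols = active_features(feat_info, t)
    dense = [row.get(j, 0.0) for j in cols]  # densify over the active features only
    print(t, cols, dense, label)
```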
data preprocessing functions
3. openmoa.preprocess.datastream_select()
Function: Select a data-stream feature space and process the original dataset accordingly.
Return: (X, y, feat_info)
- X: scipy.sparse.csr_matrix with one sample per row; the number of columns grows over time;
- y: np.ndarray of 0/1 labels, where 1 marks an anomaly;
- feat_info: dict recording the timestamps at which features are added or removed.
- Example:
X, y, feat_info = datastream_select(dataset)
4. openmoa.preprocess.open_scaler()
Function: Stream-based robust standardizer (zero mean, unit variance) that updates incrementally and automatically expands its mean/variance vectors as the feature dimension changes.
Source: General-purpose module; used as the default preprocessing step in the robustness experiments of RSOL (SDM'24).
API:
scaler = open_scaler(with_centering=True, clip_outliers=3.0)
for batch in stream:
    X_batch = scaler.partial_fit_transform(batch)
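The idea of incremental standardization with statistics that grow alongside the feature space can be sketched with Welford's running mean/variance update. This is a minimal illustration, not the code behind `open_scaler`: the class `ExpandingScaler` and its dict-based rows are hypothetical, and only observed entries update the statistics.

```python
import math


class ExpandingScaler:
    """Per-feature running mean/variance (Welford) that grows as new feature indices appear."""

    def __init__(self, clip=3.0):
        self.clip = clip
        self.n = {}     # feature index -> number of observed values
        self.mean = {}  # feature index -> running mean
        self.m2 = {}    # feature index -> running sum of squared deviations

    def partial_fit_transform(self, row):
        """row: {feature index: value}. Update the statistics, then return the scaled row."""
        out = {}
        for j, v in row.items():
            # Unseen features start from empty statistics (the "expansion" step).
            n = self.n.get(j, 0) + 1
            mean = self.mean.get(j, 0.0)
            delta = v - mean
            mean += delta / n
            m2 = self.m2.get(j, 0.0) + delta * (v - mean)
            self.n[j], self.mean[j], self.m2[j] = n, mean, m2

            std = math.sqrt(m2 / n) if n > 1 else 0.0
            z = (v - mean) / std if std > 0 else 0.0
            out[j] = max(-self.clip, min(self.clip, z))  # clip outliers
        return out


scaler = ExpandingScaler(clip=3.0)
stream = [{0: 1.0}, {0: 3.0}, {0: 5.0, 1: 10.0}]  # feature 1 first appears in the third row
scaled = [scaler.partial_fit_transform(row) for row in stream]
print(scaled)
```

A feature seen for the first time is mapped to 0.0 until a second value makes its variance defined, which is one simple way to handle the cold start.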
5. openmoa.preprocess.elastic_projection()
Function: Elastic sparse projection. As new features appear, a projection matrix learned online compresses the original high-dimensional space to a fixed k dimensions while retaining anomaly-discriminative power.
Source: Elastic Sparse Projection module of IJCAI'25 SOAD.
API:
proj = elastic_projection(out_dim=128, l1_penalty=1e-3)
for X_batch in stream:
    Z_batch = proj.partial_fit_transform(X_batch)  # Z ∈ R^{n×128}
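To make the "growing input, fixed output dimension" contract concrete, the sketch below uses a simpler stand-in technique: a fixed, hashed random-sign sparse projection instead of the learned, L1-penalized projection described above. The function `project_row` and its parameters are hypothetical and are not part of `elastic_projection`.

```python
import zlib


def project_row(row, out_dim=8, nonzeros_per_feature=2):
    """Map a sparse row {feature index: value} to a dense vector of length out_dim.

    Each input feature is assigned a few output coordinates and signs by hashing
    its index, so a feature that first appears later needs no stored state.
    """
    z = [0.0] * out_dim
    for j, v in row.items():
        for r in range(nonzeros_per_feature):
            h = zlib.crc32(f"{j}:{r}".encode())
            k = h % out_dim                        # output coordinate
            sign = 1.0 if (h >> 16) & 1 else -1.0  # pseudo-random sign
            z[k] += sign * v
    return z


early = project_row({0: 1.0, 5: 2.0})        # early in the stream: few features
late = project_row({0: 1.0, 1_000_000: 0.5})  # later: a previously unseen feature index
print(len(early), len(late))
```

The output width never changes, whatever feature indices arrive. A learned projection would additionally adapt its entries to keep anomalies separable, which this stand-in does not attempt.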
model functions
6. openmoa.model.SOADLearner()
Function: Sparse active online anomaly detector implementing the core algorithm of the IJCAI'25 paper:
- Integrates active selection (uncertainty + diversity + budget);
- Supports incremental sparse weight updates in open feature spaces;
- Provides the interfaces .partial_fit(X, y=None) and .query(batch-budget).
Core parameters:
soad = SOADLearner(
    budget_per_round=50,  # query up to 50 labels per round
    l1_reg=1e-4,          # sparsity regularization
    forget_rate=0.01,     # forgetting rate, counteracts concept drift
)
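How the three parameters interact can be sketched with a small linear model: a per-round label budget, soft-thresholding for sparsity, and weight decay for forgetting. This is an illustrative simplification, not the SOAD algorithm. It selects by uncertainty only and omits the diversity criterion, and the class `SparseActiveLearner` with its `lr` parameter is hypothetical.

```python
import math


class SparseActiveLearner:
    """Linear logistic model over {feature index: value} rows with budgeted label queries."""

    def __init__(self, budget_per_round=50, l1_reg=1e-4, forget_rate=0.01, lr=0.1):
        self.budget = budget_per_round
        self.l1_reg = l1_reg
        self.forget_rate = forget_rate
        self.lr = lr
        self.w = {}  # sparse weights; unseen features are implicitly zero

    def score(self, row):
        return sum(self.w.get(j, 0.0) * v for j, v in row.items())

    def query(self, batch):
        """Return indices of the most uncertain rows (smallest |score|), at most `budget`."""
        order = sorted(range(len(batch)), key=lambda i: abs(self.score(batch[i])))
        return order[: self.budget]

    def partial_fit(self, row, y):
        """One logistic-loss gradient step on a labelled row, with y in {0, 1}."""
        for j in list(self.w):  # forgetting: decay old weights
            self.w[j] *= 1.0 - self.forget_rate
        s = max(-30.0, min(30.0, self.score(row)))  # clamp to avoid overflow in exp
        p = 1.0 / (1.0 + math.exp(-s))
        for j, v in row.items():
            self.w[j] = self.w.get(j, 0.0) - self.lr * (p - y) * v
        threshold = self.lr * self.l1_reg
        for j in list(self.w):  # soft-thresholding keeps the weight vector sparse
            shrunk = math.copysign(max(abs(self.w[j]) - threshold, 0.0), self.w[j])
            if shrunk == 0.0:
                del self.w[j]
            else:
                self.w[j] = shrunk


learner = SparseActiveLearner(budget_per_round=2)
batch = [{0: 1.0}, {1: 1.0}, {0: 1.0, 2: 1.0}]
labels = [0, 1, 0]  # oracle labels, revealed only for queried rows
queried = learner.query(batch)
for i in queried:
    learner.partial_fit(batch[i], labels[i])
print(queried, learner.w)
```

Only the queried rows consume labels, so the annotation cost per round is bounded by the budget regardless of batch size.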
7. openmoa.model.OCURSketch()
Function: Online CUR row-and-column sketch, based on the ℓ1,∞ mixed-norm CUR algorithm of SDM'24:
- Row-sparsity constraint and variable column dimension;
- .compress(rank=r, budget=b) returns (C, U, R) in one step.
Example:
cur = OCURSketch(rank=50, row_sparsity=5)
for M_batch in matrix_stream:
    C, U, R = cur.partial_compress(M_batch)
'''Dataset Loading (openmoa.dataset.*)'''
stream_loader()
Function: Return a named streaming dataset as an infinite iterator, with a configurable feature drift strategy.
Example:
ds = om.dataset.stream_loader('synthetic_open', n_samples=1e6, feature_pace=500)
file_stream(path, fmt='csv|jsonl|parquet')
Function: Read files that are appended to in real time, from local storage or S3/HDFS.
Example:
ds = om.dataset.file_stream('s3://bucket/log.parquet')
kafka_stream(topic, brokers, schema)
Function: Consume directly from Kafka topics.
Example:
ds = om.dataset.kafka_stream('iot-sensor', brokers='kafka:9092')
arxiv_open_citation_stream()
Function: Reserved interface for an open-feature-drift stream built from real academic graphs.
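The notion of a stream whose feature space grows at a fixed pace can be sketched with a plain generator. This is only an illustration of the idea behind the `feature_pace` argument, not the behaviour of `stream_loader`. The function `open_feature_stream`, its parameters, and the labelling rule are hypothetical.

```python
import itertools
import random


def open_feature_stream(initial_features=3, feature_pace=500, seed=0):
    """Yield (row, label) forever; one new feature index appears every `feature_pace` samples."""
    rng = random.Random(seed)
    for t in itertools.count():
        n_features = initial_features + t // feature_pace
        row = {j: rng.gauss(0.0, 1.0) for j in range(n_features)}
        label = int(row[0] > 0)  # arbitrary toy labelling rule
        yield row, label


# A small pace makes the growth visible within a few samples.
stream = open_feature_stream(feature_pace=2)
widths = [len(row) for row, _ in itertools.islice(stream, 6)]
print(widths)
```

Since the generator never terminates on its own, consumers bound it explicitly, here with `itertools.islice`.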
'''Pre-processing (openmoa.preprocess.*)'''
adaptive_standardize()
Function: Online mean/variance standardization, with cold-start support for new features.
Example:
ds = om.preprocess.adaptive_standardize(ds, alpha=0.01)
drift_detector()
Function: Feature-space drift detection (KL divergence / Maximum Mean Discrepancy).
Example:
flag = om.preprocess.drift_detector(window=1000)
feature_hashing(n_buckets)
Function: Hash-based dimensionality reduction for when the feature space explodes.
Example:
ds = om.preprocess.feature_hashing(ds, n_buckets=2**20)
missing_value_imputer(strategy='mean|median|zero')
Function: Fill in missing values online.
Example:
ds = om.preprocess.missing_value_imputer(strategy='zero')
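As an illustration of KL-based feature-space drift detection, the sketch below compares how often each feature occurs in a reference window and in a current window. It is a simplified stand-in, not the logic of `drift_detector`. The helper names, the additive smoothing, and the threshold of 0.1 are assumptions chosen for the example.

```python
import math
from collections import Counter


def feature_frequencies(rows, vocabulary, smoothing=1.0):
    """Smoothed relative frequency with which each feature index occurs in `rows`."""
    counts = Counter(j for row in rows for j in row)
    total = sum(counts[j] + smoothing for j in vocabulary)
    return {j: (counts[j] + smoothing) / total for j in vocabulary}


def kl_divergence(p, q):
    """KL(p || q) for two distributions over the same keys (all probabilities > 0)."""
    return sum(p[j] * math.log(p[j] / q[j]) for j in p)


def feature_space_drift(reference, current, threshold=0.1):
    """Flag drift when the feature-occurrence distributions of the two windows diverge."""
    vocabulary = {j for row in reference + current for j in row}
    p = feature_frequencies(current, vocabulary)
    q = feature_frequencies(reference, vocabulary)
    return kl_divergence(p, q) > threshold


reference = [{0: 1.0, 1: 1.0}] * 50  # window where features 0 and 1 are active
unchanged = [{0: 0.3, 1: 2.0}] * 50  # same feature space, different values
shifted = [{2: 1.0, 3: 1.0}] * 50    # features 0 and 1 replaced by 2 and 3

print(feature_space_drift(reference, unchanged))
print(feature_space_drift(reference, shifted))
```

Smoothing keeps every probability positive, so the divergence stays finite even when a feature is present in only one of the two windows.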