Core Concepts
Understand streams, online learning, and drift detection — the foundations of OpenMOA.
OpenMOA is a machine learning framework designed for data streams. Unlike traditional batch learning, where all data is available upfront, stream learning processes data that arrives one instance at a time, cannot be revisited, and may change in distribution over time. OpenMOA is built around four core abstractions: Stream, Instance, Learner, and Evaluator.
1. Stream — Data Stream
The Stream is the fundamental abstraction in OpenMOA. All data sources implement the Stream interface, providing iterator-based access to instances one at a time.
Core Interface Methods:
```python
from __future__ import annotations  # Instance/Schema are OpenMOA types; keeps this listing importable
from abc import ABC

class Stream(ABC):
    def next_instance(self) -> Instance:
        """Return the next instance from the stream"""

    def has_more_instances(self) -> bool:
        """Check if more instances are available"""

    def get_schema(self) -> Schema:
        """Return stream schema (num features, classes, etc.)"""

    def restart(self):
        """Reset stream to the beginning"""
```
Stream also implements Python's iterator protocol, so you can write `for instance in stream:` directly.
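To make the contract concrete, here is a minimal in-memory stream written against the four methods above plus `__iter__`/`__next__`. It is a hypothetical sketch: `ToyListStream` and `ToyInstance` are illustrative names, not OpenMOA classes, and `get_schema()` returns a plain dict where OpenMOA returns a Schema object.

```python
from dataclasses import dataclass


@dataclass
class ToyInstance:
    x: list        # feature vector
    y_index: int   # class index


class ToyListStream:
    """Hypothetical in-memory stream mimicking the Stream interface (not an OpenMOA class)."""

    def __init__(self, rows, num_classes):
        self._rows = list(rows)  # (features, label_index) pairs
        self._num_classes = num_classes
        self._pos = 0            # index of the next instance to emit

    def next_instance(self):
        x, y = self._rows[self._pos]
        self._pos += 1
        return ToyInstance(x=list(x), y_index=y)

    def has_more_instances(self):
        return self._pos < len(self._rows)

    def get_schema(self):
        # Plain dict standing in for a real Schema object
        num_features = len(self._rows[0][0]) if self._rows else 0
        return {"num_attributes": num_features, "num_classes": self._num_classes}

    def restart(self):
        self._pos = 0

    # Iterator protocol, built on has_more_instances / next_instance
    def __iter__(self):
        return self

    def __next__(self):
        if not self.has_more_instances():
            raise StopIteration
        return self.next_instance()


stream = ToyListStream([([0.1, 0.2], 0), ([0.9, 0.8], 1), ([0.2, 0.1], 0)], num_classes=2)
labels = [inst.y_index for inst in stream]
print(labels)  # [0, 1, 0]
stream.restart()
print(stream.next_instance().y_index)  # 0
```

In this sketch `__iter__` returns the stream itself, so a finished loop yields nothing more until `restart()` is called; check how OpenMOA's own streams behave before relying on that.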
Built-in Stream Types:
| Type | Description | Example |
|---|---|---|
| ARFFStream | Load from ARFF files | ARFFStream("data.arff") |
| CSVStream | Load from CSV files | CSVStream("data.csv") |
| LibsvmStream | Load from LibSVM format (sparse) | LibsvmStream("data.libsvm") |
| NumpyStream | Create from NumPy arrays | NumpyStream(X, y) |
| MOAStream | Wrap MOA Java stream objects | For MOA generators |
| ConcatStream | Concatenate multiple streams | ConcatStream([s1, s2]) |
Built-in Datasets (Auto-download):
```python
from openmoa.datasets import Electricity, RCV1, Magic04

stream = Electricity()  # Auto-downloads on first use
```
The bundled datasets include binary classification (Australian, Spambase, Adult, RCV1), multi-class classification (DryBean, Optdigits), and regression (Bike, Fried) tasks.
2. Schema — Stream Structure
The Schema describes metadata about a stream — it's the bridge between streams and learners.
```python
schema = stream.get_schema()
schema.get_num_attributes()  # Number of features (excluding target)
schema.get_num_classes()     # Number of classes (classification)
schema.is_classification()   # Is this a classification task?
schema.is_regression()       # Is this a regression task?
schema.get_label_values()    # List of possible class labels
```
Schema is passed to learners during initialization to ensure model structure matches the data.
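The sketch below illustrates why learners take a schema at construction time: the model's shape (here, one weight vector per class) can be allocated before any instance arrives. `ToySchema` and `ToyLinearClassifier` are hypothetical; the method names mirror the Schema calls listed above, and treating an empty label list as a regression task is an assumption of this sketch.

```python
from dataclasses import dataclass, field

import numpy as np


@dataclass
class ToySchema:
    """Hypothetical stand-in for a stream Schema, not the OpenMOA class."""

    num_attributes: int
    label_values: list = field(default_factory=list)  # empty list: treated as regression here

    def get_num_attributes(self):
        return self.num_attributes

    def get_num_classes(self):
        return len(self.label_values)

    def is_classification(self):
        return len(self.label_values) > 0

    def is_regression(self):
        return not self.is_classification()

    def get_label_values(self):
        return list(self.label_values)


class ToyLinearClassifier:
    """Sizes its parameters from the schema before seeing any data."""

    def __init__(self, schema):
        if not schema.is_classification():
            raise ValueError("ToyLinearClassifier needs a classification schema")
        # One weight vector per class, one weight per feature
        self.weights = np.zeros((schema.get_num_classes(), schema.get_num_attributes()))


schema = ToySchema(num_attributes=4, label_values=["negative", "positive"])
model = ToyLinearClassifier(schema)
print(model.weights.shape)  # (2, 4)
```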
3. Instance — Data Point
Each data point read from a stream is an Instance object.
Classification Instance (LabeledInstance):
```python
instance = stream.next_instance()
instance.x        # numpy.ndarray: feature vector
instance.y_index  # int: class index (0, 1, 2, ...)
instance.y_label  # str: class label ("positive", "negative", ...)
```
Regression Instance (RegressionInstance):
```python
instance.x        # numpy.ndarray: feature vector
instance.y_value  # float: continuous target value
```
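A compact way to picture the two instance types is as plain records. The dataclasses below are hypothetical mirrors of the attributes listed above, not OpenMOA's classes, and they assume that `y_index` is the position of `y_label` in the schema's list of label values.

```python
from dataclasses import dataclass

import numpy as np


@dataclass
class ToyLabeledInstance:
    """Hypothetical mirror of the LabeledInstance attributes."""
    x: np.ndarray
    y_index: int
    y_label: str


@dataclass
class ToyRegressionInstance:
    """Hypothetical mirror of the RegressionInstance attributes."""
    x: np.ndarray
    y_value: float


def make_labeled(features, label, label_values):
    # Assumption: y_index is the label's position in the schema's label list
    return ToyLabeledInstance(
        x=np.asarray(features, dtype=float),
        y_index=label_values.index(label),
        y_label=label,
    )


labels = ["negative", "positive"]
inst = make_labeled([1.0, 2.5], "positive", labels)
print(inst.y_index, inst.y_label)  # 1 positive
reg = ToyRegressionInstance(x=np.array([0.3, 0.7]), y_value=12.5)
```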
4. Learner — Online Learner
All learners follow a unified train + predict interface.
Classifier Base Class:
```python
from __future__ import annotations  # Schema/Instance types come from OpenMOA
from abc import ABC
import numpy as np

class Classifier(ABC):
    def __init__(self, schema: Schema, random_seed: int = 1): ...

    def train(self, instance: LabeledInstance) -> None:
        """Incrementally train on a single instance"""

    def predict(self, instance: Instance) -> int:
        """Return predicted class index"""

    def predict_proba(self, instance: Instance) -> np.ndarray:
        """Return probability distribution over classes"""
```
Regressor Base Class:
```python
from __future__ import annotations
from abc import ABC

class Regressor(ABC):
    def train(self, instance: RegressionInstance) -> None: ...
    def predict(self, instance: RegressionInstance) -> float: ...
```
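As an example of the classifier contract, here is a majority-class baseline that updates in constant time per instance. It is a hypothetical sketch: it takes `num_classes` directly instead of a Schema, uses a namedtuple `Inst` as a stand-in for LabeledInstance, and ignores the feature vector entirely.

```python
from collections import namedtuple

import numpy as np

Inst = namedtuple("Inst", ["x", "y_index"])  # hypothetical minimal instance


class MajorityClassClassifier:
    """Predicts the most frequent class seen so far (sketch, not an OpenMOA learner)."""

    def __init__(self, num_classes):
        self.counts = np.zeros(num_classes)  # per-class label counts

    def train(self, instance):
        # Incremental update: one instance, O(1) work
        self.counts[instance.y_index] += 1

    def predict_proba(self, instance):
        total = self.counts.sum()
        if total == 0:  # uniform distribution before any training
            return np.full(self.counts.shape, 1.0 / self.counts.size)
        return self.counts / total

    def predict(self, instance):
        return int(np.argmax(self.predict_proba(instance)))


clf = MajorityClassClassifier(num_classes=3)
for y in [2, 0, 2, 1, 2]:
    clf.train(Inst(x=None, y_index=y))
query = Inst(x=None, y_index=None)
print(clf.predict(query))        # 2
print(clf.predict_proba(query))  # [0.2 0.2 0.6]
```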
Learner Sources:
| Type | Description | Examples |
|---|---|---|
| MOA Algorithms | Java implementation via JPype | HoeffdingTree, NaiveBayes |
| Native Python | Pure Python/NumPy implementation | FOBOSClassifier, FTRLClassifier |
| Scikit-learn Wrappers | Wrap sklearn models with partial_fit | SKClassifier(SGDClassifier()) |
| Batch Learners | Support mini-batch training | Inherit BatchClassifier |
5. Online Learning Protocol
OpenMOA uses the Prequential (Test-Then-Train) evaluation protocol. For each arriving instance:
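```python
# Assumes `stream`, `learner`, and a ClassificationEvaluator `evaluator` already exist
for instance in stream:
    # ① Predict with the current model
    prediction = learner.predict(instance)
    # ② Record the result against the true class index
    evaluator.update(instance.y_index, prediction)
    # ③ Train on the instance
    learner.train(instance)
```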
This ensures each instance is evaluated before being used for training — the standard evaluation paradigm for stream learning.
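The ordering matters because training first would let the model see each label before predicting it. The self-contained sketch below runs both orders with a hypothetical no-change classifier (it predicts the last label it was trained on); `NoChangeClassifier` and `prequential_accuracy` are illustrative names, and instances are plain `(x, y)` tuples rather than OpenMOA Instance objects.

```python
class NoChangeClassifier:
    """Hypothetical baseline: predicts the most recent label it was trained on."""

    def __init__(self, default=0):
        self.last = default

    def predict(self, x):
        return self.last

    def train(self, x, y):
        self.last = y


def prequential_accuracy(rows, learner, test_first=True):
    correct = 0
    for x, y in rows:
        if test_first:
            pred = learner.predict(x)  # 1) test on the unseen instance
            learner.train(x, y)        # 2) then train on it
        else:
            learner.train(x, y)        # wrong order: the label leaks into the prediction
            pred = learner.predict(x)
        correct += int(pred == y)
    return correct / len(rows)


rows = [(None, y) for y in [0, 0, 1, 1, 1, 0]]
honest = prequential_accuracy(rows, NoChangeClassifier())
leaky = prequential_accuracy(rows, NoChangeClassifier(), test_first=False)
print(honest, leaky)
```

On this sequence, test-then-train yields 4 correct out of 6, while the reversed order reports a perfect score that says nothing about how the model handles unseen instances.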
Using the Built-in Evaluation Function:
```python
from openmoa.evaluation import prequential_evaluation

results = prequential_evaluation(
    stream=stream,
    learner=learner,
    max_instances=10000,
    window_size=1000,  # Sliding window size
)
print(results.cumulative.accuracy())  # Cumulative accuracy
print(results.windowed.accuracy())    # Sliding window accuracy
```
6. Evaluator
ClassificationEvaluator tracks the following metrics:
- accuracy(): accuracy
- kappa(): Cohen's kappa, which corrects accuracy for chance agreement and is therefore more informative under class imbalance
- precision() / recall() / f1_score(): precision, recall, and F1 score
- metrics_dict(): all metrics at once
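To show what the kappa correction does, the sketch below keeps an incremental confusion matrix and computes Cohen's kappa as (p_o - p_e) / (1 - p_e), where p_o is the observed agreement (accuracy) and p_e is the agreement expected by chance from the true and predicted class frequencies. `RunningKappa` is a hypothetical helper, not OpenMOA's ClassificationEvaluator, and returning 0.0 when p_e equals 1 is a convention chosen for this sketch.

```python
import numpy as np


class RunningKappa:
    """Incremental confusion matrix with accuracy and Cohen's kappa (sketch)."""

    def __init__(self, num_classes):
        self.cm = np.zeros((num_classes, num_classes))  # rows: true class, cols: predicted

    def update(self, y_true, y_pred):
        self.cm[y_true, y_pred] += 1

    def accuracy(self):
        return np.trace(self.cm) / self.cm.sum()

    def kappa(self):
        n = self.cm.sum()
        p_o = np.trace(self.cm) / n
        # Chance agreement: product of true and predicted marginals, summed over classes
        p_e = np.sum(self.cm.sum(axis=1) * self.cm.sum(axis=0)) / n**2
        if p_e == 1.0:
            return 0.0  # kappa is undefined here; this sketch reports 0
        return (p_o - p_e) / (1.0 - p_e)


# A model that always predicts class 0 on a 9:1 imbalanced stream
ev = RunningKappa(num_classes=2)
for y_true in [0] * 9 + [1]:
    ev.update(y_true, 0)
print(ev.accuracy(), ev.kappa())  # accuracy 0.9, kappa 0.0
```

The always-majority model scores 90% accuracy but zero kappa, because every one of its correct answers is explained by the class frequencies alone.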
Two Evaluation Modes:
| Mode | Description |
|---|---|
| Cumulative | Overall metrics computed from the first instance |
| Windowed | Metrics based on only the last N instances — reflects recent performance |
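The difference between the two modes shows up clearly right after a drift. The hypothetical `AccuracyTracker` below keeps a running total for cumulative accuracy and a fixed-size deque for the last-N window described in the table. Some tools instead report one value per non-overlapping window, so treat this as an illustration of the idea rather than of OpenMOA's evaluator.

```python
from collections import deque


class AccuracyTracker:
    """Cumulative and last-N (sliding window) accuracy; a sketch, not OpenMOA's evaluator."""

    def __init__(self, window_size):
        self.correct = 0
        self.seen = 0
        self.window = deque(maxlen=window_size)  # oldest outcomes drop off automatically

    def update(self, y_true, y_pred):
        hit = int(y_true == y_pred)
        self.correct += hit
        self.seen += 1
        self.window.append(hit)

    def cumulative_accuracy(self):
        return self.correct / self.seen

    def windowed_accuracy(self):
        return sum(self.window) / len(self.window)


tracker = AccuracyTracker(window_size=10)
# Perfect for 10 instances, then always wrong after a simulated drift
for t in range(20):
    tracker.update(y_true=1, y_pred=1 if t < 10 else 0)
print(tracker.cumulative_accuracy())  # 0.5
print(tracker.windowed_accuracy())    # 0.0
```

The cumulative figure still credits the model with its pre-drift record, while the windowed figure shows that it is currently failing.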
RegressionEvaluator metrics: MSE, RMSE, MAE, R²
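These regression metrics can all be maintained in constant memory from a handful of running sums, which is what makes them practical on a stream. The sketch below (a hypothetical `RunningRegressionMetrics`, not OpenMOA's RegressionEvaluator) computes R² as 1 - SS_res / SS_tot, with SS_tot taken around the mean of the targets seen so far.

```python
import math


class RunningRegressionMetrics:
    """Streaming MSE, RMSE, MAE and R^2 from running sums (sketch)."""

    def __init__(self):
        self.n = 0
        self.sum_sq_err = 0.0   # SS_res
        self.sum_abs_err = 0.0
        self.sum_y = 0.0
        self.sum_y_sq = 0.0

    def update(self, y_true, y_pred):
        err = y_true - y_pred
        self.n += 1
        self.sum_sq_err += err * err
        self.sum_abs_err += abs(err)
        self.sum_y += y_true
        self.sum_y_sq += y_true * y_true

    def mse(self):
        return self.sum_sq_err / self.n

    def rmse(self):
        return math.sqrt(self.mse())

    def mae(self):
        return self.sum_abs_err / self.n

    def r2(self):
        # SS_tot: squared deviations of the targets around their running mean
        ss_tot = self.sum_y_sq - self.sum_y ** 2 / self.n
        return 1.0 - self.sum_sq_err / ss_tot if ss_tot > 0 else float("nan")


metrics = RunningRegressionMetrics()
for y, y_hat in zip([1.0, 2.0, 3.0, 4.0], [1.0, 2.0, 3.0, 5.0]):
    metrics.update(y, y_hat)
print(metrics.mse(), metrics.rmse(), metrics.mae(), metrics.r2())
```

Computing SS_tot from raw sums can lose precision when targets are large and nearly constant; Welford's algorithm is the usual, more stable alternative.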
7. Drift Detection
In real data streams, the statistical distribution may change over time (concept drift). OpenMOA provides drift detectors to monitor these changes.
Drift Detector Interface:
```python
from abc import ABC

class BaseDriftDetector(ABC):
    def add_element(self, element: float) -> None:
        """Input an observation (usually prediction error: 0 or 1)"""

    def detected_change(self) -> bool:
        """Has drift been detected?"""

    def detected_warning(self) -> bool:
        """Is the detector in warning zone?"""
```
Built-in Detectors: drift detectors are provided in openmoa.drift.detectors; the example below uses ADWIN.
Usage Example:
```python
from openmoa.drift.detectors import ADWIN

# Assumes `stream` and `learner` already exist; create_new_learner() is a
# placeholder for your own code that builds a fresh, untrained learner.
detector = ADWIN(delta=0.002)
for instance in stream:
    prediction = learner.predict(instance)
    error = 0.0 if prediction == instance.y_index else 1.0
    detector.add_element(error)  # Input error (1 = misclassified)
    if detector.detected_change():
        print("Drift detected! Consider resetting the learner.")
        learner = create_new_learner()  # Reset model
    learner.train(instance)
```
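The detector interface is small enough that a complete detector fits in a few dozen lines. The sketch below follows the Drift Detection Method (DDM) of Gama et al. rather than ADWIN: it tracks the running error rate p and its standard deviation s = sqrt(p(1 - p) / n), remembers the lowest p + s seen (as p_min and s_min), and signals a warning when p + s rises above p_min + 2 * s_min or a change above p_min + 3 * s_min. `SimpleDDM` and its parameters are illustrative, not OpenMOA's implementation.

```python
import math


class SimpleDDM:
    """DDM-style detector with add_element / detected_change / detected_warning (sketch)."""

    def __init__(self, min_instances=30, warning_level=2.0, drift_level=3.0):
        self.min_instances = min_instances  # no decisions before this many observations
        self.warning_level = warning_level
        self.drift_level = drift_level
        self._reset()

    def _reset(self):
        self.n = 0
        self.p = 0.0  # running error rate
        self.p_min = math.inf
        self.s_min = math.inf
        self._change = False
        self._warning = False

    def add_element(self, element):
        if self._change:  # start from scratch after a signalled change
            self._reset()
        self.n += 1
        self.p += (element - self.p) / self.n  # incremental mean of 0/1 errors
        s = math.sqrt(self.p * (1.0 - self.p) / self.n)
        self._change = self._warning = False
        if self.n < self.min_instances:
            return
        if self.p + s < self.p_min + self.s_min:  # new lowest error level
            self.p_min, self.s_min = self.p, s
        level = self.p + s
        if level > self.p_min + self.drift_level * self.s_min:
            self._change = True
        elif level > self.p_min + self.warning_level * self.s_min:
            self._warning = True

    def detected_change(self):
        return self._change

    def detected_warning(self):
        return self._warning


detector = SimpleDDM()
first_drift = None
for i in range(400):
    if i < 200:
        error = 1.0 if i % 10 == 0 else 0.0  # about 10% errors before the drift
    else:
        error = 0.0 if i % 10 == 0 else 1.0  # about 90% errors after it
    detector.add_element(error)
    if detector.detected_change():
        first_drift = i
        break
print(first_drift)  # an index shortly after 200
```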
8. Stream Wrappers — Feature Space Evolution
A distinctive feature of OpenMOA is its ability to simulate feature space drift, where the set of available features changes over time, through Stream Wrappers.
Two Approaches:
| Approach | Output Dimension | Missing Representation | Suitable Algorithms |
|---|---|---|---|
| OpenFeatureStream | Variable length | Not in vector | Sparse-aware (FOBOS, ORF3V) |
| TrapezoidalStream / CapriciousStream / EvolvableStream | Fixed length | NaN placeholder | Missing value handling (OVFM) |
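The two representations differ only in what happens to a feature that is absent at a given timestep. The hypothetical helpers below take a full feature vector plus a boolean availability mask and produce each view. Returning the surviving feature indices alongside the variable-length values is an assumption of this sketch, made so that a sparse-aware learner could still tell which feature each value belongs to.

```python
import numpy as np


def to_open_feature(x, mask):
    """Variable-length view: keep only observed values and their original indices."""
    idx = np.flatnonzero(mask)
    return x[idx], idx


def to_fixed_with_nan(x, mask):
    """Fixed-length view: same dimension as x, unobserved entries set to NaN."""
    out = x.astype(float).copy()
    out[~mask] = np.nan
    return out


x = np.array([0.5, 1.2, -0.3, 2.0])
mask = np.array([True, False, True, False])  # which features exist at this timestep
values, indices = to_open_feature(x, mask)
fixed = to_fixed_with_nan(x, mask)
print(values, indices)
print(fixed)
```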
Three Evolution Patterns:
| Pattern | Full Name | Behavior |
|---|---|---|
| TDS | Trapezoidal Data Stream | Features "born" gradually, dimension monotonically increases |
| CDS | Capricious Data Stream | Features randomly appear/disappear at each timestep |
| EDS | Evolvable Data Stream | Feature space replaces in segments with transition periods |
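One way to picture the three patterns is as a per-timestep boolean mask over the feature indices. The functions below are a hypothetical sketch, not OpenMOA's wrapper logic: `tds_mask` reveals features in index order (roughly what an "ordered" mode suggests), `cds_mask` drops each feature independently with probability `missing_ratio`, and `eds_mask` models a single old-to-new transition with one overlap period, whereas a real EDS stream may chain several segments.

```python
import math

import numpy as np


def tds_mask(t, total_instances, num_features):
    """TDS: features appear in index order; the visible count never decreases."""
    k = max(1, math.ceil(num_features * (t + 1) / total_instances))
    mask = np.zeros(num_features, dtype=bool)
    mask[:k] = True
    return mask


def cds_mask(rng, num_features, missing_ratio=0.5):
    """CDS: each feature is independently present or absent at every timestep."""
    mask = rng.random(num_features) >= missing_ratio
    if not mask.any():  # keep at least one feature so the instance is not empty
        mask[rng.integers(num_features)] = True
    return mask


def eds_mask(t, segment_length, overlap, num_old, num_new):
    """EDS (single transition): old block, then both blocks, then only the new block."""
    mask = np.zeros(num_old + num_new, dtype=bool)
    if t < segment_length:
        mask[:num_old] = True
    elif t < segment_length + overlap:
        mask[:] = True
    else:
        mask[num_old:] = True
    return mask


rng = np.random.default_rng(42)
tds_counts = [int(tds_mask(t, 100, 8).sum()) for t in range(100)]
cds_rows = [cds_mask(rng, 8) for _ in range(100)]
eds_phases = [eds_mask(t, 50, 10, 4, 4) for t in (0, 55, 70)]
print(tds_counts[0], tds_counts[-1])  # 1 8
```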
Combined Usage:
```python
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, OpenFeatureStream

# Layer 1: Shuffle to remove label ordering bias
base = ShuffledStream(Magic04(), random_seed=42)

# Layer 2: Inject TDS feature evolution (Magic04 has 19,020 instances)
stream = OpenFeatureStream(
    base, evolution_pattern="tds", tds_mode="ordered",
    total_instances=19020, random_seed=42,
)
```
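The layering works because each wrapper consumes a stream and is itself a stream. The generator-based sketch below imitates that composition with two hypothetical layers, a seeded shuffle and an ordered TDS-style reveal. Buffering the whole source before shuffling is a simplifying assumption that only suits finite datasets; it is not a claim about how ShuffledStream is implemented.

```python
import random


def shuffled(rows, seed):
    """Layer 1 (sketch): buffer the source, then emit it in a seeded random order."""
    buffer = list(rows)
    random.Random(seed).shuffle(buffer)
    yield from buffer


def tds_layer(rows, total_instances):
    """Layer 2 (sketch): reveal a growing, index-ordered prefix of the features."""
    for t, (x, y) in enumerate(rows):
        k = max(1, -(-len(x) * (t + 1) // total_instances))  # ceiling division
        yield x[:k], y


rows = [([i, i + 1, i + 2], i % 2) for i in range(6)]
stream = tds_layer(shuffled(rows, seed=42), total_instances=6)
out = list(stream)
print([len(x) for x, _ in out])  # [1, 1, 2, 2, 3, 3]
```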
9. Architecture Overview
```
               ┌─────────────────────┐
               │ Datasets            │
               │ (Auto-download)     │
               └────────┬────────────┘
                        │
               ┌────────▼────────────┐
               │ Stream Layer        │
               │ ARFF/CSV/LibSVM/    │
               │ Numpy/MOA Generator │
               └────────┬────────────┘
                        │
               ┌────────▼────────────┐
               │ Stream Wrappers     │
               │ Shuffle / TDS /     │
               │ CDS / EDS           │
               └────────┬────────────┘
                        │
         ┌──────────────┼──────────────┐
         │              │              │
┌────────▼──────┐  ┌────▼─────┐ ┌──────▼───────┐
│ MOA Learners  │  │ Python   │ │ Sklearn      │
│ (Java/JPype)  │  │ Learners │ │ Wrappers     │
└────────┬──────┘  └────┬─────┘ └──────┬───────┘
         │              │              │
         └──────────────┼──────────────┘
                        │
               ┌────────▼────────────┐
               │ Evaluation          │
               │ Prequential / Drift │
               │ Classification /    │
               │ Regression          │
               └─────────────────────┘
```