Stream Wrapper
Deep analysis of stream_wrapper.py — the core decorator module that transforms static datasets into dynamically evolving feature-space data streams.
1. Overview & Purpose
stream_wrapper.py is the core decorator module of the OpenMOA stream processing framework. It provides a suite of Stream Wrappers that convert static, fixed-dimensional datasets into dynamic data streams with evolving feature spaces — simulating real-world feature drift scenarios for online learning benchmarks.
Problems Solved
ShuffledStream randomizes instance order to remove inherent label ordering
OpenFeatureStream outputs variable-length vectors with global index mapping
TrapezoidalStream simulates features "born" progressively over time
CapriciousStream simulates stochastic feature availability via Bernoulli trials
EvolvableStream replaces entire feature partitions across time segments
feature_indices metadata ensures learners map local positions to global feature IDs
2. Architecture Position
Stream wrappers sit between the base stream layer and the learner layer, acting as transparent decorators that enrich the data pipeline:
┌───────────────────────────┐
│         Datasets          │
│   (UCI / MOA / Custom)    │
└────────────┬──────────────┘
│
┌────────────▼──────────────┐
│ Base Streams │
│ ARFF / CSV / LibSVM / │
│ Numpy / MOA Generator │
└────────────┬──────────────┘
│
┌───────────────────▼───────────────────┐
│ │
│ Stream Wrappers ★ │
│ │
│ ShuffledStream OpenFeatureStream │
│ TrapezoidalStream CapriciousStream │
│ EvolvableStream │
│ │
└───────────────────┬───────────────────┘
│
┌─────────────┼─────────────┐
│ │ │
┌───────▼───────┐ ┌───▼────┐ ┌──────▼──────┐
│ MOA Learners │ │ Python │ │ Sklearn │
│ (Java/JPype) │ │Learners│ │ Wrappers │
└───────┬───────┘ └───┬────┘ └──────┬──────┘
│ │ │
└─────────────┼─────────────┘
│
┌────────────▼──────────────┐
│ Evaluation │
│ Prequential / Holdout / │
│ Classification / Drift │
└───────────────────────────┘
3. Class Reference
The module defines five wrapper classes, all inheriting from the Stream abstract base class:
OpenFeatureStream
Variable-length output wrapper supporting 6 evolution modes (pyramid, incremental, decremental, TDS, CDS, EDS). Attaches feature_indices metadata to each instance.
TrapezoidalStream
Fixed-dimension + NaN-fill TDS simulator. Features appear gradually based on a priority ranking; inactive positions filled with np.nan.
CapriciousStream
Fixed-dimension + NaN-fill CDS simulator. Each feature undergoes independent Bernoulli trial at every timestep to determine availability.
EvolvableStream
Fixed-dimension + NaN-fill EDS simulator. Feature space is partitioned and activated in segments with configurable overlap transitions.
ShuffledStream
In-memory buffer + random shuffle wrapper. Loads all instances at init, then serves them in randomized order to eliminate label sorting bias.
Two Approaches: Variable vs. Fixed Dimension
OpenFeatureStream (Variable Dimension)

| Aspect | Behaviour |
|---|---|
| Output dimension | = number of active features |
| Missing features | Not in vector at all |
| feature_indices | Attached as metadata |
| Target algorithms | Sparse-aware (FOBOS, ORF3V) |
| Memory efficiency | Higher (short vectors) |
Trapezoidal / Capricious / Evolvable (Fixed Dimension)

| Aspect | Behaviour |
|---|---|
| Output dimension | = d_max (constant) |
| Missing features | np.nan placeholder |
| feature_indices | Not provided |
| Target algorithms | NaN-handling (OVFM, OSLMF) |
| Memory efficiency | Lower (full-length vectors) |
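A learner can dispatch between the two approaches at runtime. The sketch below is illustrative: only the feature_indices attribute and the np.nan convention come from this module; the extract_features helper is hypothetical.

```python
import numpy as np

def extract_features(instance_x, feature_indices=None):
    """Return (global_indices, values) for either wrapper style."""
    x = np.asarray(instance_x, dtype=float)
    if feature_indices is not None:
        # Variable-dimension style (OpenFeatureStream): the vector is already
        # compact; the metadata maps local positions to global feature IDs.
        return np.asarray(feature_indices), x
    # Fixed-dimension style (Trapezoidal / Capricious / Evolvable):
    # active features are the non-NaN positions of the full-length vector.
    idx = np.where(~np.isnan(x))[0]
    return idx, x[idx]

# Variable-dimension instance: 3 values at global features 1, 3, 5
print(extract_features([0.2, 0.4, 0.6], feature_indices=[1, 3, 5]))
# Fixed-dimension instance: NaN placeholders mark inactive features
print(extract_features([0.1, np.nan, 0.3, np.nan]))
```

In practice, this is the hasattr-style check mentioned in Section 7: sparse-aware learners use the metadata branch, NaN-handling learners the placeholder branch.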
4. Evolution Patterns
Six distinct evolution patterns are supported, each modelling a different real-world feature drift scenario:
4.1 Dimension-Schedule Patterns
These three patterns use a pre-computed dimension schedule — the number of active features varies deterministically with time.
Pyramid
Dimension over time:
d_max ─ ╱╲
╱ ╲
d_min ─ ___╱ ╲___
0 T/2 T
Incremental
Dimension over time:
d_max ─ ___
╱
╱
d_min ─ _____╱
0 T
Decremental
Dimension over time:
d_max ─ ___
╲
╲
d_min ─ ╲_____
0 T
For each timestep, the target dimension d(t) is computed via np.linspace, and the specific feature subset is determined by the feature_selection strategy:
| Strategy | Selection Rule | Example (d=3, d_max=10) |
|---|---|---|
| prefix | First d features | [0, 1, 2] |
| suffix | Last d features | [7, 8, 9] |
| random | Random d features (seeded) | [2, 5, 8] |
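The schedule and selection logic can be sketched as follows. This is a simplified reconstruction of the behaviour described above, not the module's exact code:

```python
import numpy as np

def dimension_schedule(d_min, d_max, total, pattern):
    """Per-timestep target dimension d(t) for the deterministic patterns."""
    if pattern == "incremental":
        return np.linspace(d_min, d_max, total).astype(int)
    if pattern == "decremental":
        return np.linspace(d_max, d_min, total).astype(int)
    if pattern == "pyramid":  # rise to d_max at T/2, then fall back
        half = total // 2
        up = np.linspace(d_min, d_max, half)
        down = np.linspace(d_max, d_min, total - half)
        return np.concatenate([up, down]).astype(int)
    raise ValueError(pattern)

def select_features(d, d_max, strategy, rng):
    """Pick which d of the d_max global features are active."""
    if strategy == "prefix":
        return np.arange(d)
    if strategy == "suffix":
        return np.arange(d_max - d, d_max)
    return np.sort(rng.choice(d_max, size=d, replace=False))  # "random"

rng = np.random.RandomState(42)
sched = dimension_schedule(2, 10, 1000, "pyramid")
print(sched[0], sched.max(), sched[-1])       # 2 10 2
print(select_features(3, 10, "suffix", rng))  # [7 8 9]
```

Pre-computing the schedule once at init is what makes the per-instance cost an O(1) lookup for these patterns.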
4.2 Trapezoidal Data Stream (TDS)
Each feature is assigned a birth time. Features become active once the stream passes their birth time, and remain active permanently — so dimensionality increases monotonically.
TDS Ordered Mode (d_max=10, total_instances=1000)
Feature: 0 1 2 3 4 5 6 7 8 9
Birth time: 0 0 0 100 100 200 200 300 300 400
|________|________|________|________|
Bucket 0 Bucket 1 Bucket 2 Bucket 3
Timeline:
t = 0 active = [0, 1, 2] 3 features
t = 150 active = [0, 1, 2, 3, 4] 5 features
t = 350 active = [0, 1, 2, 3, 4, 5, 6, 7] 8 features
t = 500 active = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] all 10
In random mode, the feature birth order is shuffled randomly; in ordered mode, births are assigned sequentially by feature index. Both modes divide the timeline into 10 buckets and distribute features evenly across them.
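The birth-time assignment can be sketched as below. The exact bucket grouping in stream_wrapper.py may differ from this reconstruction (the diagram above, for example, places three features in bucket 0); the active-set rule births <= t is the essential part:

```python
import numpy as np

def tds_birth_times(d_max, total_instances, mode, seed=42, n_buckets=10):
    """Assign each feature a birth time by spreading features over
    n_buckets equal time slices (a sketch, not the module's exact code)."""
    time_step = total_instances // n_buckets
    # bucket index for each feature, spread as evenly as possible
    buckets = (np.arange(d_max) * n_buckets) // d_max
    births = buckets * time_step
    if mode == "random":
        np.random.RandomState(seed).shuffle(births)
    return births

births = tds_birth_times(d_max=10, total_instances=1000, mode="ordered")
print(births)                    # births at 0, 100, ..., 900
print(np.where(births <= 350)[0])  # features already born at t=350
```

Because births only accumulate, the active set grows monotonically, which is exactly the trapezoidal shape.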
4.3 Capricious Data Stream (CDS)
At each timestep, every feature independently undergoes a Bernoulli trial with probability missing_ratio of being absent. This produces truly stochastic feature availability:
# For each timestep t:
rng_t = np.random.RandomState(random_seed + t) # deterministic per t
mask = rng_t.rand(d_max) > missing_ratio # Bernoulli trial
indices = np.where(mask)[0] # active features
if len(indices) == 0:
indices = np.array([0]) # safety: at least 1 feature
Each timestep constructs its own generator with random_seed + t as the seed, ensuring the same pattern across runs while keeping consecutive timesteps independent.
4.4 Evolvable Data Stream (EDS)
The feature space is partitioned into n_segments groups. The stream alternates between stable periods (single partition active) and overlap transitions (two adjacent partitions active simultaneously):
EDS (n_segments=3, overlap_ratio=1.0, total=1000)

L = 1000 / (3 + 1.0 * 2) = 200

Stage 0 (t = 0–199)    P0 active       [0, 1, 2, 3]
Stage 1 (t = 200–399)  P0 ∪ P1 active  [0, 1, 2, 3, 4, 5, 6]  ← overlap
Stage 2 (t = 400–599)  P1 active       [4, 5, 6]
Stage 3 (t = 600–799)  P1 ∪ P2 active  [4, 5, 6, 7, 8, 9]     ← overlap
Stage 4 (t = 800–999)  P2 active       [7, 8, 9]

Visual timeline:

P0     ████████
P0∪P1          ████████
P1                     ████████
P1∪P2                          ████████
P2                                     ████████
       +-------+-------+-------+-------+-------→ t
       0      200     400     600     800    1000
The overlap period length is controlled by overlap_ratio relative to the stable period length L:
# Stable period length calculation:
L = total_instances / (n_segments + overlap_ratio * (n_segments - 1))
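The boundary computation implied by this formula can be sketched directly; eds_boundaries is an illustrative helper, not the module's private method:

```python
def eds_boundaries(total_instances, n_segments, overlap_ratio):
    """Stage end-times: stable periods of length L alternate with
    overlap periods of length overlap_ratio * L."""
    L = total_instances / (n_segments + overlap_ratio * (n_segments - 1))
    lengths = []
    for i in range(n_segments):
        lengths.append(L)                      # stable period
        if i < n_segments - 1:
            lengths.append(overlap_ratio * L)  # overlap transition
    # cumulative end-time of each stage
    bounds, acc = [], 0.0
    for ln in lengths:
        acc += ln
        bounds.append(round(acc))
    return L, bounds

L, bounds = eds_boundaries(1000, n_segments=3, overlap_ratio=1.0)
print(L, bounds)  # 200.0 [200, 400, 600, 800, 1000]
```

With n_segments=3 and overlap_ratio=1.0 this reproduces the five stages of the example above: 2n - 1 stages in total, alternating stable and overlap.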
5. Data Flow
The typical pipeline stacks ShuffledStream for order randomization, then OpenFeatureStream for feature evolution:
Complete Data Flow (TDS mode example)
Disk (.arff/.csv/.libsvm)
│
▼
┌─────────────────────────────────────────────────────────┐
│ openmoa.datasets.Magic04() │
│ → instance.x = [v0, v1, v2, ..., v9] (fixed 10-d) │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────┐
│ ShuffledStream(base, random_seed=42) │
│ → Loads ALL instances into memory │
│ → Shuffles index array │
│ → Output: same instances, random order │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────┐
│ OpenFeatureStream(base, evolution_pattern="tds", ...) │
│ │
│ t=0: x=[v0,v1] feature_indices=[0,1] │
│ t=300: x=[v0,v1,..,v4] feature_indices=[0,1,2,3,4] │
│ t=900: x=[v0,v1,..,v9] feature_indices=[0,..,9] │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────────────────▼──────────────────────────────────┐
│ FOBOSClassifier │
│ │
│ _get_sparse_x(instance): │
│ → detects instance.feature_indices │
│ → returns (indices=[0,1,2,3,4], values=[v0,..,v4]) │
│ │
│ train(): │
│ → W[[0,1,2,3,4]] -= eta * grad * values │
│ → sparse update: only touched rows change │
└─────────────────────────────────────────────────────────┘
Instance Transformation Detail
Here is what happens inside OpenFeatureStream.next_instance() for a single call:
# Input from base_stream:
x_full = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6] # 6-dimensional
y_index = 1
# Step 1: Determine active features at time t
active_indices = _get_active_indices(t=100) # → [1, 3, 5] (TDS mode)
# Step 2: Slice the feature vector
x_subset = x_full[active_indices] # → [0.2, 0.4, 0.6] (3-d)
# Step 3: Build new instance with metadata
new_instance.x = [0.2, 0.4, 0.6] # physical 3-d vector
new_instance.y_index = 1
new_instance.feature_indices = [1, 3, 5] # ← global IDs for learner
The physical vector stores [0.2, 0.4, 0.6] at positions 0, 1, 2; the attached feature_indices=[1,3,5] tells the learner these correspond to global features 1, 3, 5, so weight updates target W[[1,3,5]] instead of W[[0,1,2]].
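The sparse update that consumes this metadata can be sketched as a simplified linear-model step (an illustration, not FOBOS's exact rule):

```python
import numpy as np

d_max, eta = 6, 0.1
W = np.zeros(d_max)                     # one weight per global feature ID

x = np.array([0.2, 0.4, 0.6])           # physical 3-d vector from the wrapper
feature_indices = np.array([1, 3, 5])   # global IDs attached as metadata

grad = -1.0                             # illustrative scalar gradient
W[feature_indices] -= eta * grad * x    # only rows 1, 3, 5 are touched

print(W)  # zeros everywhere except positions 1, 3, 5
```

Without the metadata, the same update would incorrectly write to W[[0,1,2]], silently corrupting the model whenever the active set shifts.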
6. Core Logic Walkthrough
6.1 OpenFeatureStream._get_active_indices(t)
This is the decision engine of the entire module. Given a timestep t, it returns the array of active global feature IDs:
def _get_active_indices(self, t: int) -> np.ndarray:
# 1. Deterministic patterns: direct table lookup
if self.evolution_pattern in ["pyramid", "incremental", "decremental"]:
return self._feature_indices_cache[t]
# 2. TDS: all features whose birth_time ≤ t
elif self.evolution_pattern == "tds":
return np.where(self._feature_offsets <= t)[0]
# 3. CDS: Bernoulli trial per feature
elif self.evolution_pattern == "cds":
rng_t = np.random.RandomState(self.random_seed + t)
mask = rng_t.rand(self.d_max) > self.missing_ratio
indices = np.where(mask)[0]
if len(indices) == 0:
indices = np.array([0]) # safety fallback
return indices
# 4. EDS: find current stage, return partition(s)
elif self.evolution_pattern == "eds":
stage_idx = 0
for i, boundary in enumerate(self._eds_boundaries):
if t < boundary:
stage_idx = i
break
if stage_idx % 2 == 0: # even = stable
return self._eds_partitions[stage_idx // 2]
else: # odd = overlap
prev = (stage_idx - 1) // 2
return np.sort(np.concatenate([
self._eds_partitions[prev],
self._eds_partitions[prev + 1]
]))
Given only the timestep t and the constructor parameters, this method always returns the same result — making the entire stream fully reproducible.
6.2 OpenFeatureStream Constructor Parameters
| Parameter | Type | Range | Default | Description |
|---|---|---|---|---|
| base_stream | Stream | — | — | The underlying data source to wrap |
| d_min | int | [1, d_max] | 2 | Minimum number of active features |
| d_max | int | [d_min, original_d] | None = original_d | Maximum feature space dimension |
| evolution_pattern | Literal | pyramid / incremental / decremental / tds / cds / eds | "pyramid" | Evolution strategy |
| total_instances | int | > 0 | 10000 | Stream length for timeline calculation |
| feature_selection | Literal | prefix / suffix / random | "prefix" | How to select features in deterministic modes |
| missing_ratio | float | [0.0, 1.0) | 0.0 | CDS feature absence probability |
| tds_mode | Literal | random / ordered | "random" | TDS feature birth ordering |
| n_segments | int | ≥ 2 | 2 | EDS partition count |
| overlap_ratio | float | ≥ 0 | 1.0 | EDS overlap-to-stable period ratio |
| random_seed | int | — | 42 | Random seed for reproducibility |
6.3 Method Reference — OpenFeatureStream
| Method | Visibility | Description |
|---|---|---|
| __init__ | Public | Validate params, pre-compute schedules per pattern |
| _generate_dimension_schedule | Private | Generate per-timestep target dimension array |
| _generate_feature_indices | Private | Pre-compute feature index list for each timestep |
| _generate_tds_offsets | Private | Assign "birth time" to each feature (TDS) |
| _generate_eds_partitions | Private | Divide feature space into n disjoint subsets (EDS) |
| _calculate_eds_boundaries | Private | Compute EDS stage time boundaries |
| _get_active_indices | Private | Core engine: return active feature IDs for time t |
| next_instance | Public | Return next evolved instance with metadata |
| has_more_instances | Public | Check if more instances are available |
| restart | Public | Reset stream state to beginning |
| get_schema | Public | Return original Schema |
| get_moa_stream | Public | Returns None (pure Python implementation) |
7. Design Patterns & Technical Highlights
Decorator Pattern
All wrappers accept a base_stream and can be nested: OpenFeatureStream(ShuffledStream(Magic04())). Extends behaviour without modifying the source.
Strategy Pattern
The evolution_pattern parameter selects algorithms at init time. Both initialization and runtime logic dispatch based on the chosen strategy.
Template Method
All wrappers share the Stream ABC interface: next_instance(), has_more_instances(), get_schema(), restart().
Pre-computation
Dimension schedules and index caches are computed during __init__, reducing per-instance cost to O(1) table lookups for deterministic patterns.
Dynamic Attributes
feature_indices is attached to instances at runtime via Python’s dynamic attribute mechanism. Downstream checks via hasattr().
Defensive Programming
Parameter validation, out-of-bounds guards, empty-result fallbacks (at least 1 feature), and safe time indexing with min(t, T-1).
Reproducibility Mechanism
# Global random state (for shuffle, TDS offsets, EDS partitions)
self._rng = np.random.RandomState(random_seed)
# Per-timestep local state (for CDS mode — independent across t)
rng_t = np.random.RandomState(self.random_seed + t)
mask = rng_t.rand(self.d_max) > self.missing_ratio
Using np.random.RandomState instead of global np.random isolates the wrapper from external random calls, ensuring deterministic reproducibility.
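The isolation claim is directly checkable. The cds_mask helper below is illustrative, built from the per-timestep seeding pattern shown above:

```python
import numpy as np

seed, d_max, missing_ratio = 42, 10, 0.4

def cds_mask(t):
    # Per-timestep RandomState: the same t always yields the same mask,
    # regardless of any other random calls made by the process.
    rng_t = np.random.RandomState(seed + t)
    return rng_t.rand(d_max) > missing_ratio

a = cds_mask(7)
np.random.rand(100)   # unrelated global randomness in between
b = cds_mask(7)
print((a == b).all())  # True: isolated from the global np.random state
```

This is why two runs of the same experiment (or two learners compared on the same wrapped stream) see identical missing patterns.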
Performance Characteristics
| Operation | Init Cost | Per-Instance Cost | Memory |
|---|---|---|---|
| Pyramid / Inc / Dec | O(T) | O(1) lookup | O(T · d_max) |
| TDS | O(d_max) | O(d_max) scan | O(d_max) |
| CDS | O(1) | O(d_max) Bernoulli | O(1) |
| EDS | O(d_max + n) | O(n) boundary scan | O(d_max) |
| ShuffledStream | O(N) load all | O(1) index | O(N) instances |
8. Usage Examples
8.1 TDS Mode — Online Learning Pipeline
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, OpenFeatureStream
from openmoa.classifier._fobos_classifier import FOBOSClassifier
# 1. Load dataset and shuffle
base = ShuffledStream(Magic04(), random_seed=42)
n_total = base.get_num_instances() # 19020
# 2. Wrap with TDS evolution
stream = OpenFeatureStream(
base_stream=base,
evolution_pattern="tds",
tds_mode="ordered",
d_min=2,
total_instances=n_total,
random_seed=42
)
# 3. Initialize learner
schema = stream.get_schema()
learner = FOBOSClassifier(
schema=schema,
alpha=1.0,
lambda_=0.0001,
regularization="l1"
)
# 4. Test-Then-Train loop
correct = 0
total = 0
while stream.has_more_instances():
instance = stream.next_instance()
# Observe feature evolution
print(f"t={total}: dim={len(instance.x)}, indices={instance.feature_indices}")
pred = learner.predict(instance)
if pred == instance.y_index:
correct += 1
learner.train(instance)
total += 1
print(f"Accuracy: {correct / total:.4f}")
8.2 CDS Mode — Random Feature Absence
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, OpenFeatureStream
stream = OpenFeatureStream(
base_stream=ShuffledStream(Magic04()),
evolution_pattern="cds",
missing_ratio=0.4, # 40% feature absence
total_instances=19020,
random_seed=42
)
# Observe the stochastic missing pattern
for i in range(5):
inst = stream.next_instance()
all_features = set(range(10))
present = set(inst.feature_indices.tolist())
print(f"t={i}: present={sorted(present)}, missing={sorted(all_features - present)}")
# Output example:
# t=0: present=[0,2,4,5,7,9], missing=[1,3,6,8]
# t=1: present=[1,2,3,6,8,9], missing=[0,4,5,7]
# t=2: present=[0,1,3,5,6,7,8], missing=[2,4,9]
8.3 EDS Mode — Segmented Evolution
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, OpenFeatureStream
stream = OpenFeatureStream(
base_stream=ShuffledStream(Magic04()),
evolution_pattern="eds",
n_segments=3, # 3 feature partitions
overlap_ratio=0.5, # overlap = 0.5 × stable period
total_instances=1000,
random_seed=42
)
# Track stage transitions
prev_key = None
for i in range(1000):
inst = stream.next_instance()
if inst is None:
break
key = tuple(inst.feature_indices)
if key != prev_key:
print(f"t={i}: New stage → features={key}")
prev_key = key
8.4 NaN-based Approach (TrapezoidalStream)
import numpy as np
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, TrapezoidalStream
# Fixed dimension + NaN approach
stream = TrapezoidalStream(
base_stream=ShuffledStream(Magic04()),
evolution_mode="ordered",
total_instances=1000
)
inst = stream.next_instance()
print(f"Vector: {inst.x}") # Full 10-d, some NaN
print(f"NaN count: {np.isnan(inst.x).sum()}")
print(f"Has feature_indices: {hasattr(inst, 'feature_indices')}") # False
9. Known Issues & Improvement Suggestions
ShuffledStream Memory Consumption
Medium Risk. All instances are loaded into memory at init; for GB-scale datasets this will cause out-of-memory errors. Consider reservoir sampling or chunk-based shuffling for large-scale use.
# Current: loads everything
while base_stream.has_more_instances():
self._instances.append(base_stream.next_instance())
# Suggestion: reservoir sampling or memory-mapped approach
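One possible shape for the chunk-based alternative is a bounded-buffer streaming shuffle. This is a sketch of the suggestion only, not code from the module; buffered_shuffle and its parameters are hypothetical:

```python
import random

def buffered_shuffle(stream, buffer_size=1000, seed=42):
    """Approximate streaming shuffle in O(buffer_size) memory: keep a
    buffer and emit a randomly chosen element whenever it fills up."""
    rng = random.Random(seed)
    buf = []
    for item in stream:
        buf.append(item)
        if len(buf) >= buffer_size:
            j = rng.randrange(len(buf))
            buf[j], buf[-1] = buf[-1], buf[j]  # move random element to end
            yield buf.pop()
    rng.shuffle(buf)  # drain the remainder in random order
    yield from buf

shuffled = list(buffered_shuffle(range(10), buffer_size=4))
print(sorted(shuffled) == list(range(10)))  # True: same items, new order
```

The trade-off: items can only move about buffer_size positions from their original place, so this weakens but does not fully remove ordering bias; a full shuffle still requires O(N) memory or an on-disk pass.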
CDS Fallback Bias
Low Risk. When missing_ratio is close to 1.0, the safety fallback always selects feature 0, over-emphasizing a single feature.
# Current:
if len(indices) == 0:
indices = np.array([0]) # always feature 0
# Suggested:
if len(indices) == 0:
indices = np.array([rng_t.randint(0, self.d_max)]) # random fallback
Schema Inconsistency
Low Risk. get_schema() returns the original d_max-dimensional Schema, but OpenFeatureStream may output vectors as small as d_min, so downstream code relying on Schema dimensions may encounter mismatches.
Magic Number in TDS
Low Risk. The TDS implementation hard-codes 10 as the number of time buckets. Consider extracting this as a named constant or parameter for better readability and configurability.
# Current:
time_step = self.total_instances // 10 # why 10?
# Suggested:
TDS_NUM_BUCKETS = 10
time_step = self.total_instances // TDS_NUM_BUCKETS
10. Quick Reference
Terminology
| Term | Full Name | Meaning |
|---|---|---|
| TDS | Trapezoidal Data Stream | Features appear gradually; dimension grows monotonically |
| CDS | Capricious Data Stream | Features randomly present/absent at each timestep |
| EDS | Evolvable Data Stream | Feature space replaces in segments with overlap periods |
| Feature Drift | — | Changes in available feature dimensions over time |
| Index Shift | — | Mismatch between local vector position and global feature ID |
| Prequential | Test-Then-Train | Evaluate on each instance before using it for training |
Related Files
| File | Path | Role |
|---|---|---|
| Stream ABC | src/openmoa/stream/_stream.py | Base class definition |
| Schema | src/openmoa/stream/_stream.py | Stream structure descriptor |
| Instance | src/openmoa/instance.py | Data instance types |
| FOBOS | src/openmoa/classifier/_fobos_classifier.py | Sparse-aware learner example |
| Package init | src/openmoa/stream/__init__.py | Public exports |
| Benchmarks | demo/demo_fobos_benchmark_binary.py | Full usage examples |
Dependencies
numpy ≥ 1.20 — array ops, random state
typing (built-in) — Literal, Optional, List
openmoa.stream.Stream — ABC base class
openmoa.instance — LabeledInstance, RegressionInstance