Deep Dive ~20 min read

Stream Wrapper

Deep analysis of stream_wrapper.py — the core decorator module that transforms static datasets into dynamically evolving feature-space data streams.

src/openmoa/stream/stream_wrapper.py

1. Overview & Purpose

stream_wrapper.py is the core decorator module of the OpenMOA stream processing framework. It provides a suite of Stream Wrappers that convert static, fixed-dimensional datasets into dynamic data streams with evolving feature spaces — simulating real-world feature drift scenarios for online learning benchmarks.

Problems Solved

Label Sorting Bias: ShuffledStream randomizes instance order to remove inherent label ordering.
Varying Dimensionality: OpenFeatureStream outputs variable-length vectors with a global index mapping.
Gradual Feature Arrival (TDS): TrapezoidalStream simulates features "born" progressively over time.
Random Feature Absence (CDS): CapriciousStream simulates stochastic feature availability via Bernoulli trials.
Feature Segment Evolution (EDS): EvolvableStream replaces entire feature partitions across time segments.
Index Shift Problem: feature_indices metadata lets learners map local vector positions to global feature IDs.

2. Architecture Position

Stream wrappers sit between the base stream layer and the learner layer, acting as transparent decorators that enrich the data pipeline:

                      ┌───────────────────────────┐
                      │         Datasets          │
                      │   (UCI / MOA / Custom)    │
                      └────────────┬──────────────┘
                                  │
                     ┌────────────▼──────────────┐
                     │      Base Streams         │
                     │  ARFF / CSV / LibSVM /    │
                     │  Numpy / MOA Generator    │
                     └────────────┬──────────────┘
                                  │
              ┌───────────────────▼───────────────────┐
              │                                       │
              │         Stream Wrappers  ★            │
              │                                       │
              │   ShuffledStream   OpenFeatureStream   │
              │   TrapezoidalStream  CapriciousStream  │
              │   EvolvableStream                      │
              │                                       │
              └───────────────────┬───────────────────┘
                                  │
                    ┌─────────────┼─────────────┐
                    │             │             │
           ┌───────▼───────┐ ┌───▼────┐ ┌──────▼──────┐
           │  MOA Learners │ │ Python │ │   Sklearn   │
           │  (Java/JPype) │ │Learners│ │   Wrappers  │
           └───────┬───────┘ └───┬────┘ └──────┬──────┘
                    │             │             │
                    └─────────────┼─────────────┘
                                  │
                     ┌────────────▼──────────────┐
                     │       Evaluation          │
                     │  Prequential / Holdout /  │
                     │  Classification / Drift   │
                     └───────────────────────────┘
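The decorator stacking shown above can be sketched with toy stand-in classes (ListStream and DoublingWrapper are illustrative names, not part of OpenMOA):

```python
# Minimal sketch of the decorator pattern used by the stream wrappers.
# These toy classes stand in for the real OpenMOA Stream hierarchy.
class ListStream:
    """A trivial base stream over an in-memory list."""
    def __init__(self, items):
        self._items = list(items)
        self._pos = 0

    def has_more_instances(self):
        return self._pos < len(self._items)

    def next_instance(self):
        item = self._items[self._pos]
        self._pos += 1
        return item


class DoublingWrapper:
    """A stand-in wrapper: same stream interface, transformed output."""
    def __init__(self, base_stream):
        self._base = base_stream

    def has_more_instances(self):
        return self._base.has_more_instances()

    def next_instance(self):
        return self._base.next_instance() * 2


# Wrappers nest freely, just like OpenFeatureStream(ShuffledStream(...)):
stream = DoublingWrapper(DoublingWrapper(ListStream([1, 2, 3])))
out = [stream.next_instance() for _ in range(3)]
print(out)  # [4, 8, 12]
```

Because every wrapper exposes the same interface as the base stream, the learner layer never needs to know how deep the stack is.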

3. Class Reference

The module defines five wrapper classes, all inheriting from the Stream abstract base class:

OpenFeatureStream

extends Stream

Variable-length output wrapper supporting 6 evolution modes (pyramid, incremental, decremental, TDS, CDS, EDS). Attaches feature_indices metadata to each instance.

Lines 7–288

TrapezoidalStream

extends Stream

Fixed-dimension + NaN-fill TDS simulator. Features appear gradually based on a priority ranking; inactive positions filled with np.nan.

Lines 290–457

CapriciousStream

extends Stream

Fixed-dimension + NaN-fill CDS simulator. Each feature undergoes independent Bernoulli trial at every timestep to determine availability.

Lines 460–572

EvolvableStream

extends Stream

Fixed-dimension + NaN-fill EDS simulator. Feature space is partitioned and activated in segments with configurable overlap transitions.

Lines 576–772

ShuffledStream

extends Stream

In-memory buffer + random shuffle wrapper. Loads all instances at init, then serves them in randomized order to eliminate label sorting bias.

Lines 776–836

Two Approaches: Variable vs. Fixed Dimension

OpenFeatureStream (variable dimension)

Output dimension: number of active features
Missing features: not present in the vector at all
feature_indices: attached as metadata
Target algorithms: sparse-aware (FOBOS, ORF3V)
Memory efficiency: higher (short vectors)

TrapezoidalStream / CapriciousStream / EvolvableStream (fixed dimension)

Output dimension: d_max (constant)
Missing features: np.nan placeholder
feature_indices: not provided
Target algorithms: NaN-handling (OVFM, OSLMF)
Memory efficiency: lower (full-length vectors)
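To make the contrast concrete, here is a small sketch (with made-up values) of the same underlying instance under both representations, assuming only global features 1, 3, and 5 are active:

```python
import numpy as np

# One 6-dimensional instance; only global features 1, 3, 5 are active.
x_full = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6])
active = np.array([1, 3, 5])

# Variable-dimension style (OpenFeatureStream): short vector + global IDs.
x_var = x_full[active]            # [0.2, 0.4, 0.6] -- only 3 elements
feature_indices = active          # metadata attached for the learner

# Fixed-dimension style (Trapezoidal/Capricious/Evolvable): NaN placeholders.
x_fixed = np.full_like(x_full, np.nan)
x_fixed[active] = x_full[active]  # [nan, 0.2, nan, 0.4, nan, 0.6]

print(x_var.shape, int(np.isnan(x_fixed).sum()))  # (3,) 3
```

The variable-dimension form saves memory when few features are active, at the cost of requiring the learner to understand feature_indices.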

4. Evolution Patterns

Six distinct evolution patterns are supported, each modelling a different real-world feature drift scenario:

4.1 Dimension-Schedule Patterns

These three patterns use a pre-computed dimension schedule — the number of active features varies deterministically with time.

Pyramid

Features increase then decrease symmetrically
Dimension over time:
d_max ─      ╱╲
            ╱  ╲
d_min ─ ___╱    ╲___
        0   T/2   T

Incremental

Features appear monotonically over time
Dimension over time:
d_max ─           ___
                 ╱
               ╱
d_min ─ _____╱
        0           T

Decremental

Features disappear monotonically over time
Dimension over time:
d_max ─ ___
           ╲
             ╲
d_min ─       ╲_____
        0           T

For each timestep, the target dimension d(t) is computed via np.linspace, and the specific feature subset is determined by the feature_selection strategy:

Strategy  Selection Rule              Example (d=3, d_max=10)
prefix    First d features            [0, 1, 2]
suffix    Last d features             [7, 8, 9]
random    Random d features (seeded)  [2, 5, 8]
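A minimal sketch of how the dimension schedule and the three selection strategies could be computed (standalone helpers assuming np.linspace scheduling as described above; not the module's actual internals):

```python
import numpy as np

def dimension_schedule(pattern, d_min, d_max, total):
    """Per-timestep target dimension d(t) via np.linspace."""
    if pattern == "incremental":
        return np.linspace(d_min, d_max, total).astype(int)
    if pattern == "decremental":
        return np.linspace(d_max, d_min, total).astype(int)
    # pyramid: rise to d_max at T/2, then fall back to d_min
    half = total // 2
    up = np.linspace(d_min, d_max, half)
    down = np.linspace(d_max, d_min, total - half)
    return np.concatenate([up, down]).astype(int)

def select_features(strategy, d, d_max, rng):
    """Pick which d of the d_max features are active."""
    if strategy == "prefix":
        return np.arange(d)                               # first d features
    if strategy == "suffix":
        return np.arange(d_max - d, d_max)                # last d features
    return np.sort(rng.choice(d_max, d, replace=False))   # seeded random

sched = dimension_schedule("incremental", 2, 10, 1000)
print(sched[0], sched[-1])                                # 2 10
print(select_features("suffix", 3, 10, np.random.RandomState(42)))  # [7 8 9]
```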

4.2 Trapezoidal Data Stream (TDS)

Each feature is assigned a birth time. Features become active once the stream passes their birth time, and remain active permanently — so dimensionality increases monotonically.

TDS Ordered Mode (d_max=10, total_instances=1000)

Feature:     0    1    2    3    4    5    6    7    8    9
Birth time:  0    0    0   100  100  200  200  300  300  400

Timeline:
t = 0     active = [0, 1, 2]                          3 features
t = 150   active = [0, 1, 2, 3, 4]                    5 features
t = 350   active = [0, 1, 2, 3, 4, 5, 6, 7, 8]        9 features
t = 500   active = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]    all 10

Two TDS modes are available: random shuffles the feature birth order, while ordered assigns birth times sequentially by feature index. Both divide the timeline into 10 buckets and distribute features evenly across them.
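The activation rule can be sketched directly from the birth times in the example above (the birth-time assignment itself is internal to the real implementation; here it is supplied as data):

```python
import numpy as np

# Birth times from the ordered-mode example (d_max=10, total=1000).
birth_time = np.array([0, 0, 0, 100, 100, 200, 200, 300, 300, 400])

def active_features(t):
    # A feature is active once the stream has passed its birth time,
    # and stays active forever -- so dimensionality only grows.
    return np.where(birth_time <= t)[0]

print(len(active_features(0)))    # 3
print(len(active_features(150)))  # 5
print(len(active_features(250)))  # 7
print(len(active_features(500)))  # 10
```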

4.3 Capricious Data Stream (CDS)

At each timestep, every feature independently undergoes a Bernoulli trial with probability missing_ratio of being absent. This produces truly stochastic feature availability:

# For each timestep t:
rng_t = np.random.RandomState(random_seed + t)  # deterministic per t
mask = rng_t.rand(d_max) > missing_ratio         # Bernoulli trial
indices = np.where(mask)[0]                     # active features
if len(indices) == 0:
    indices = np.array([0])                     # safety: at least 1 feature

Reproducibility: each timestep uses random_seed + t as its seed, so the same pattern recurs across runs while consecutive timesteps remain independent of one another.
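The seeding scheme can be checked in isolation with a standalone sketch of the same logic:

```python
import numpy as np

def cds_indices(random_seed, t, d_max=10, missing_ratio=0.4):
    """Active feature IDs at timestep t under the per-t seeding scheme."""
    rng_t = np.random.RandomState(random_seed + t)   # deterministic per t
    mask = rng_t.rand(d_max) > missing_ratio         # Bernoulli trial
    indices = np.where(mask)[0]
    if len(indices) == 0:
        indices = np.array([0])                      # safety: at least one
    return indices

# Same (seed, t) pair -> identical mask on every run:
a = cds_indices(42, t=5)
b = cds_indices(42, t=5)
assert np.array_equal(a, b)
```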

4.4 Evolvable Data Stream (EDS)

The feature space is partitioned into n_segments groups. The stream alternates between stable periods (single partition active) and overlap transitions (two adjacent partitions active simultaneously):

EDS (n_segments=3, overlap_ratio=1.0, total=1000)
L = 1000 / (3 + 1.0 * 2) = 200

Stage 0  (t = 0–199)   P0 active        [0, 1, 2, 3]
Stage 1  (t = 200–399) P0 ∪ P1 active   [0, 1, 2, 3, 4, 5, 6]   ← overlap
Stage 2  (t = 400–599) P1 active        [4, 5, 6]
Stage 3  (t = 600–799) P1 ∪ P2 active   [4, 5, 6, 7, 8, 9]      ← overlap
Stage 4  (t = 800–999) P2 active        [7, 8, 9]

Visual timeline:
  P0    ████████
  P0∪P1         ████████
  P1                    ████████
  P1∪P2                         ████████
  P2                                    ████████
        --------+--------+--------+--------+--------→ t
        0      200      400      600      800     1000

The overlap period length is controlled by overlap_ratio relative to the stable period length L:

# Stable period length calculation:
L = total_instances / (n_segments + overlap_ratio * (n_segments - 1))
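Plugging the formula into code reproduces the stage layout of the example above (a standalone sketch; the rounding of boundaries to integers is an assumption):

```python
def eds_boundaries(total_instances, n_segments, overlap_ratio):
    """Stable period length L and the end time of each EDS stage."""
    # Stable period length from the formula above.
    L = total_instances / (n_segments + overlap_ratio * (n_segments - 1))
    boundaries = []
    t = 0.0
    for stage in range(2 * n_segments - 1):
        # Even stages are stable periods, odd stages are overlaps.
        t += L if stage % 2 == 0 else overlap_ratio * L
        boundaries.append(int(round(t)))
    return L, boundaries

L, bounds = eds_boundaries(1000, 3, 1.0)
print(L, bounds)  # 200.0 [200, 400, 600, 800, 1000]
```

With overlap_ratio=1.0 the overlap periods are as long as the stable ones, which is why all five stages in the example span exactly 200 instances.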

5. Data Flow

The typical pipeline stacks ShuffledStream for order randomization, then OpenFeatureStream for feature evolution:

Complete Data Flow (TDS mode example)

  Disk (.arff/.csv/.libsvm)
         │
         ▼
  ┌─────────────────────────────────────────────────────────┐
  │  openmoa.datasets.Magic04()                             │
  │  → instance.x = [v0, v1, v2, ..., v9]   (fixed 10-d)  │
  └──────────────────────┬──────────────────────────────────┘
                         │
  ┌──────────────────────▼──────────────────────────────────┐
  │  ShuffledStream(base, random_seed=42)                   │
  │  → Loads ALL instances into memory                      │
  │  → Shuffles index array                                 │
  │  → Output: same instances, random order                 │
  └──────────────────────┬──────────────────────────────────┘
                         │
  ┌──────────────────────▼──────────────────────────────────┐
  │  OpenFeatureStream(base, evolution_pattern="tds", ...)  │
  │                                                         │
  │  t=0:   x=[v0,v1]          feature_indices=[0,1]       │
  │  t=300: x=[v0,v1,..,v4]    feature_indices=[0,1,2,3,4] │
  │  t=900: x=[v0,v1,..,v9]    feature_indices=[0,..,9]    │
  └──────────────────────┬──────────────────────────────────┘
                         │
  ┌──────────────────────▼──────────────────────────────────┐
  │  FOBOSClassifier                                        │
  │                                                         │
  │  _get_sparse_x(instance):                               │
  │    → detects instance.feature_indices                   │
  │    → returns (indices=[0,1,2,3,4], values=[v0,..,v4])   │
  │                                                         │
  │  train():                                               │
  │    → W[[0,1,2,3,4]] -= eta * grad * values              │
  │    → sparse update: only touched rows change            │
  └─────────────────────────────────────────────────────────┘

Instance Transformation Detail

Here is what happens inside OpenFeatureStream.next_instance() for a single call:

# Input from base_stream:
x_full = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]  # 6-dimensional
y_index = 1

# Step 1: Determine active features at time t
active_indices = _get_active_indices(t=100)  # → [1, 3, 5]  (TDS mode)

# Step 2: Slice the feature vector
x_subset = x_full[active_indices]             # → [0.2, 0.4, 0.6]  (3-d)

# Step 3: Build new instance with metadata
new_instance.x = [0.2, 0.4, 0.6]              # physical 3-d vector
new_instance.y_index = 1
new_instance.feature_indices = [1, 3, 5]     # ← global IDs for learner

Index Shift Problem: although the physical vector [0.2, 0.4, 0.6] occupies local positions 0, 1, 2, the attached feature_indices=[1, 3, 5] tells the learner that these values correspond to global features 1, 3, and 5. Weight updates therefore target W[[1, 3, 5]], not W[[0, 1, 2]].
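The payoff on the learner side can be sketched as a toy gradient step (illustrative values and a simplified update, not the actual FOBOS rule):

```python
import numpy as np

# Global weight vector over the full feature space (d_max = 6).
W = np.zeros(6)

# Instance produced by OpenFeatureStream: short vector + global IDs.
x = np.array([0.2, 0.4, 0.6])          # physical 3-d vector
feature_indices = np.array([1, 3, 5])  # global feature IDs (metadata)
eta, grad = 0.1, 1.0                   # toy learning rate and gradient

# Sparse update: only the global rows named by feature_indices change.
W[feature_indices] -= eta * grad * x
print(W)  # [ 0.   -0.02  0.   -0.04  0.   -0.06]
```

Indexing W with the local positions [0, 1, 2] instead would silently train the wrong weights once the active feature set shifts.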

6. Core Logic Walkthrough

6.1 OpenFeatureStream._get_active_indices(t)

This is the decision engine of the entire module. Given a timestep t, it returns the array of active global feature IDs:

def _get_active_indices(self, t: int) -> np.ndarray:
    # 1. Deterministic patterns: direct table lookup
    if self.evolution_pattern in ["pyramid", "incremental", "decremental"]:
        return self._feature_indices_cache[t]

    # 2. TDS: all features whose birth_time ≤ t
    elif self.evolution_pattern == "tds":
        return np.where(self._feature_offsets <= t)[0]

    # 3. CDS: Bernoulli trial per feature
    elif self.evolution_pattern == "cds":
        rng_t = np.random.RandomState(self.random_seed + t)
        mask = rng_t.rand(self.d_max) > self.missing_ratio
        indices = np.where(mask)[0]
        if len(indices) == 0:
            indices = np.array([0])  # safety fallback
        return indices

    # 4. EDS: find current stage, return partition(s)
    elif self.evolution_pattern == "eds":
        stage_idx = 0
        for i, boundary in enumerate(self._eds_boundaries):
            if t < boundary:
                stage_idx = i
                break
        if stage_idx % 2 == 0:       # even = stable
            return self._eds_partitions[stage_idx // 2]
        else:                        # odd  = overlap
            prev = (stage_idx - 1) // 2
            return np.sort(np.concatenate([
                self._eds_partitions[prev],
                self._eds_partitions[prev + 1]
            ]))

Pure function: for identical t and constructor parameters, this method always returns the same result, which makes the entire stream fully reproducible.

6.2 OpenFeatureStream Constructor Parameters

base_stream (Stream): the underlying data source to wrap
d_min (int, range [1, d_max], default 2): minimum number of active features
d_max (int, range [d_min, original_d], default None = original_d): maximum feature-space dimension
evolution_pattern (Literal: pyramid / incremental / decremental / tds / cds / eds, default "pyramid"): evolution strategy
total_instances (int, > 0, default 10000): stream length for timeline calculation
feature_selection (Literal: prefix / suffix / random, default "prefix"): how features are selected in deterministic modes
missing_ratio (float, range [0.0, 1.0), default 0.0): CDS feature absence probability
tds_mode (Literal: random / ordered, default "random"): TDS feature birth ordering
n_segments (int, ≥ 2, default 2): EDS partition count
overlap_ratio (float, ≥ 0, default 1.0): EDS overlap-to-stable period ratio
random_seed (int, default 42): random seed for reproducibility

6.3 Method Reference — OpenFeatureStream

Method                        Visibility  Description
__init__                      public      Validate parameters, pre-compute schedules per pattern
_generate_dimension_schedule  private     Generate the per-timestep target dimension array
_generate_feature_indices     private     Pre-compute the feature index list for each timestep
_generate_tds_offsets         private     Assign a "birth time" to each feature (TDS)
_generate_eds_partitions      private     Divide the feature space into n disjoint subsets (EDS)
_calculate_eds_boundaries     private     Compute EDS stage time boundaries
_get_active_indices           private     Core engine: return active feature IDs for time t
next_instance                 public      Return the next evolved instance with metadata
has_more_instances            public      Check whether more instances are available
restart                       public      Reset stream state to the beginning
get_schema                    public      Return the original Schema
get_moa_stream                public      Return None (pure Python implementation)

7. Design Patterns & Technical Highlights

Decorator Pattern

All wrappers accept a base_stream and can be nested: OpenFeatureStream(ShuffledStream(Magic04())). Extends behaviour without modifying the source.

Strategy Pattern

The evolution_pattern parameter selects algorithms at init time. Both initialization and runtime logic dispatch based on the chosen strategy.

Template Method

All wrappers share the Stream ABC interface: next_instance(), has_more_instances(), get_schema(), restart().

Pre-computation

Dimension schedules and index caches are computed during __init__, reducing per-instance cost to O(1) table lookups for deterministic patterns.
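A minimal sketch of this pre-computation idea, including the min(t, T-1) safe time indexing mentioned below (a toy prefix-selection version, not the module's actual cache):

```python
import numpy as np

# Build everything once, up front (what __init__ would do).
total, d_min, d_max = 1000, 2, 10
dims = np.linspace(d_min, d_max, total).astype(int)  # dimension schedule
cache = [np.arange(d) for d in dims]                 # prefix-selected indices

def get_active_indices(t):
    # Per-instance cost is a single O(1) list lookup; min(t, T-1)
    # guards against reading past the end of the schedule.
    return cache[min(t, total - 1)]

print(len(get_active_indices(0)), len(get_active_indices(999)))  # 2 10
```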

Dynamic Attributes

feature_indices is attached to instances at runtime via Python's dynamic attribute mechanism; downstream learners detect it with hasattr().

Defensive Programming

Parameter validation, out-of-bounds guards, empty-result fallbacks (at least 1 feature), and safe time indexing with min(t, T-1).

Reproducibility Mechanism

# Global random state (for shuffle, TDS offsets, EDS partitions)
self._rng = np.random.RandomState(random_seed)

# Per-timestep local state (for CDS mode — independent across t)
rng_t = np.random.RandomState(self.random_seed + t)
mask = rng_t.rand(self.d_max) > self.missing_ratio

Using np.random.RandomState instead of global np.random isolates the wrapper from external random calls, ensuring deterministic reproducibility.
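The isolation property is easy to verify: consuming the global np.random state does not perturb a private RandomState's draw sequence.

```python
import numpy as np

# Two private generators with the same seed produce identical draws...
rng = np.random.RandomState(42)
first = rng.rand(3)

# ...even if unrelated code hammers the GLOBAL random state in between.
np.random.seed(0)
np.random.rand(100)

rng2 = np.random.RandomState(42)
second = rng2.rand(3)
assert np.array_equal(first, second)  # isolated from global np.random
```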

Performance Characteristics

Operation            Init Cost      Per-Instance Cost   Memory
Pyramid / Inc / Dec  O(T)           O(1) lookup         O(T · d_max)
TDS                  O(d_max)       O(d_max) scan       O(d_max)
CDS                  O(1)           O(d_max) Bernoulli  O(1)
EDS                  O(d_max + n)   O(n) boundary scan  O(d_max)
ShuffledStream       O(N) load all  O(1) index          O(N) instances

8. Usage Examples

8.1 TDS Mode — Online Learning Pipeline

tds_pipeline.py
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, OpenFeatureStream
from openmoa.classifier._fobos_classifier import FOBOSClassifier

# 1. Load dataset and shuffle
base = ShuffledStream(Magic04(), random_seed=42)
n_total = base.get_num_instances()  # 19020

# 2. Wrap with TDS evolution
stream = OpenFeatureStream(
    base_stream=base,
    evolution_pattern="tds",
    tds_mode="ordered",
    d_min=2,
    total_instances=n_total,
    random_seed=42
)

# 3. Initialize learner
schema = stream.get_schema()
learner = FOBOSClassifier(
    schema=schema,
    alpha=1.0,
    lambda_=0.0001,
    regularization="l1"
)

# 4. Test-Then-Train loop
correct = 0
total = 0
while stream.has_more_instances():
    instance = stream.next_instance()

    # Observe feature evolution
    print(f"t={total}: dim={len(instance.x)}, indices={instance.feature_indices}")

    pred = learner.predict(instance)
    if pred == instance.y_index:
        correct += 1
    learner.train(instance)
    total += 1

print(f"Accuracy: {correct / total:.4f}")

8.2 CDS Mode — Random Feature Absence

cds_example.py
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, OpenFeatureStream

stream = OpenFeatureStream(
    base_stream=ShuffledStream(Magic04()),
    evolution_pattern="cds",
    missing_ratio=0.4,          # 40% feature absence
    total_instances=19020,
    random_seed=42
)

# Observe the stochastic missing pattern
for i in range(5):
    inst = stream.next_instance()
    all_features = set(range(10))
    present = set(inst.feature_indices.tolist())
    print(f"t={i}: present={sorted(present)}, missing={sorted(all_features - present)}")

# Output example:
# t=0: present=[0,2,4,5,7,9], missing=[1,3,6,8]
# t=1: present=[1,2,3,6,8,9], missing=[0,4,5,7]
# t=2: present=[0,1,3,5,6,7,8], missing=[2,4,9]

8.3 EDS Mode — Segmented Evolution

eds_example.py
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, OpenFeatureStream

stream = OpenFeatureStream(
    base_stream=ShuffledStream(Magic04()),
    evolution_pattern="eds",
    n_segments=3,           # 3 feature partitions
    overlap_ratio=0.5,      # overlap = 0.5 × stable period
    total_instances=1000,
    random_seed=42
)

# Track stage transitions
prev_key = None
for i in range(1000):
    inst = stream.next_instance()
    if inst is None:
        break
    key = tuple(inst.feature_indices)
    if key != prev_key:
        print(f"t={i}: New stage → features={key}")
        prev_key = key

8.4 NaN-based Approach (TrapezoidalStream)

nan_approach.py
import numpy as np
from openmoa.datasets import Magic04
from openmoa.stream import ShuffledStream, TrapezoidalStream

# Fixed dimension + NaN approach
stream = TrapezoidalStream(
    base_stream=ShuffledStream(Magic04()),
    evolution_mode="ordered",
    total_instances=1000
)

inst = stream.next_instance()
print(f"Vector:   {inst.x}")           # Full 10-d, some NaN
print(f"NaN count: {np.isnan(inst.x).sum()}")
print(f"Has feature_indices: {hasattr(inst, 'feature_indices')}")  # False

9. Known Issues & Improvement Suggestions

ShuffledStream Memory Consumption

Medium Risk

All instances are loaded into memory at init. For GB-scale datasets, this will cause out-of-memory errors. Consider reservoir sampling or chunk-based shuffling for large-scale use.

# Current: loads everything
while base_stream.has_more_instances():
    self._instances.append(base_stream.next_instance())

# Suggestion: reservoir sampling or memory-mapped approach
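One possible shape for the suggested fix is classic reservoir sampling (Algorithm R): keep at most k instances in memory, with every stream element equally likely to be retained. The sketch below is a generic standalone version (parameter names are illustrative, not an OpenMOA API):

```python
import random

def reservoir_sample(iterable, k, seed=42):
    """Keep a uniform random sample of k items from a stream of unknown length."""
    rng = random.Random(seed)
    reservoir = []
    for i, item in enumerate(iterable):
        if i < k:
            reservoir.append(item)            # fill the reservoir first
        else:
            j = rng.randint(0, i)             # inclusive bounds
            if j < k:
                reservoir[j] = item           # replace with probability k/(i+1)
    rng.shuffle(reservoir)                    # randomize order of kept items
    return reservoir

sample = reservoir_sample(range(10_000), k=100)
print(len(sample))  # 100
```

Memory stays O(k) regardless of stream length, at the cost of serving a sample rather than a full permutation of the dataset.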

CDS Fallback Bias

Low Risk

When missing_ratio is close to 1.0, the safety fallback always selects feature 0. This over-emphasizes a single feature.

# Current:
if len(indices) == 0:
    indices = np.array([0])          # always feature 0

# Suggested:
if len(indices) == 0:
    indices = np.array([rng_t.randint(0, self.d_max)])  # random fallback

Schema Inconsistency

Low Risk

get_schema() returns the original d_max-dimensional Schema, but OpenFeatureStream may output vectors as small as d_min. Downstream code relying on Schema dimensions may encounter mismatches.

Magic Number in TDS

Low Risk

The TDS implementation hard-codes 10 as the number of time buckets. Consider extracting this as a named constant or parameter for better readability and configurability.

# Current:
time_step = self.total_instances // 10   # why 10?

# Suggested:
TDS_NUM_BUCKETS = 10
time_step = self.total_instances // TDS_NUM_BUCKETS

10. Quick Reference

Terminology

Term           Full Name                Meaning
TDS            Trapezoidal Data Stream  Features appear gradually; dimension grows monotonically
CDS            Capricious Data Stream   Features are randomly present or absent at each timestep
EDS            Evolvable Data Stream    Feature space is replaced in segments with overlap periods
Feature Drift                           Changes in the available feature dimensions over time
Index Shift                             Mismatch between local vector position and global feature ID
Prequential    Test-Then-Train          Evaluate on each instance before using it for training

Related Files

File          Path                                         Role
Stream ABC    src/openmoa/stream/_stream.py                Base class definition
Schema        src/openmoa/stream/_stream.py                Stream structure descriptor
Instance      src/openmoa/instance.py                      Data instance types
FOBOS         src/openmoa/classifier/_fobos_classifier.py  Sparse-aware learner example
Package init  src/openmoa/stream/__init__.py               Public exports
Benchmarks    demo/demo_fobos_benchmark_binary.py          Full usage examples

Dependencies

numpy ≥ 1.20: array operations, random state
typing (built-in): Literal, Optional, List
openmoa.stream: Stream ABC base class
openmoa.instance: LabeledInstance, RegressionInstance