Skip to content

API Reference

Synchronous datasets

ProfiledDataset

apairo.core.profiled_dataset.ProfiledDataset

Bases: SynchronousDataset, ConfigurableDataset

Synchronous dataset driven by a YAML structural profile.

Subclasses declare a _profile class attribute pointing to a YAML file (relative to apairo/dataset/profiles/ or an absolute path). The profile describes the directory layout, file extensions, dtypes, and any type transformations. All file discovery, loading, split filtering, and derived key resolution are handled automatically.

Example

Minimal subclass::

class MyDataset(ProfiledDataset):
    _profile = "my_dataset.yaml"

Usage::

ds = MyDataset("/data/my_dataset", keys=["lidar", "labels"], split="train")
sample = ds[0]
# sample.data["lidar"]  -> np.ndarray
# sample.data["labels"] -> np.ndarray

Attributes:

Name Type Description
available_keys FrozenSet[str]

Frozenset of key names declared in the profile. Populated at class definition time from the YAML file.

See Also

See [YAML Profiles](https://apairo-robotics.github.io/apairo/datasets/yaml-profiles/) for the full profile specification.

Source code in apairo/core/profiled_dataset.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
class ProfiledDataset(SynchronousDataset, ConfigurableDataset):
    """Synchronous dataset driven by a YAML structural profile.

    Subclasses declare a `_profile` class attribute pointing to a YAML file
    (relative to `apairo/dataset/profiles/` or an absolute path).  The profile
    describes the directory layout, file extensions, dtypes, and any type
    transformations.  All file discovery, loading, split filtering, and derived
    key resolution are handled automatically.

    Example:
        Minimal subclass::

            class MyDataset(ProfiledDataset):
                _profile = "my_dataset.yaml"

        Usage::

            ds = MyDataset("/data/my_dataset", keys=["lidar", "labels"], split="train")
            sample = ds[0]
            # sample.data["lidar"]  -> np.ndarray
            # sample.data["labels"] -> np.ndarray

    Attributes:
        available_keys: Frozenset of key names declared in the profile's
            ``modalities`` section.  Populated at class definition time from
            the YAML file (only if the file exists at that moment).

    See Also:
        `YAML Profiles <https://apairo-robotics.github.io/apairo/datasets/yaml-profiles/>`_
        for the full profile specification.
    """

    _profile: str

    @staticmethod
    def _resolve_profile_path(profile: str | Path) -> Path:
        """Return the filesystem path of a profile.

        Absolute paths are used as-is; relative ones are resolved against
        ``_PROFILES_DIR``.
        """
        path = Path(profile)
        return path if path.is_absolute() else _PROFILES_DIR / path

    @staticmethod
    def _read_profile(profile_path: Path) -> dict:
        """Parse a YAML profile, treating an empty document as an empty mapping."""
        with open(profile_path, encoding="utf-8") as f:
            return yaml.safe_load(f) or {}

    def __init_subclass__(cls, **kwargs: object) -> None:
        super().__init_subclass__(**kwargs)
        # Only a `_profile` declared on this class itself is read; a subclass
        # that merely inherits one keeps the inherited `available_keys`.
        profile_attr = cls.__dict__.get("_profile")
        if profile_attr:
            profile_path = cls._resolve_profile_path(profile_attr)
            if profile_path.exists():
                raw = cls._read_profile(profile_path)
                cls.available_keys = frozenset((raw.get("modalities") or {}).keys())

    def __init__(
        self,
        root_dir: str | Path,
        keys: list[str] | None = None,
        split: str | None = None,
        sequence_ids: list[str] | None = None,
    ) -> None:
        """Parse the profile, discover files on disk and build per-key loaders.

        Args:
            root_dir: Dataset root directory.
            keys: Keys to load.  ``None`` selects every raw channel recorded in
                ``.apairo`` that the profile declares as non-optional.
            split: Optional split name.  For LST-based splits it restricts the
                frames to the list declared in the profile.  For profiles with
                a ``split`` layer it keeps only files whose relative path
                contains a component with that name.
            sequence_ids: Optional whitelist of sequence directory names.

        Raises:
            ValueError: If ``split`` is not declared by an LST-based profile,
                if the profile has no ``modality`` layer, or if the raw keys
                have different frame counts.
            KeyError: If a requested key is neither in ``.apairo`` nor in the
                profile.
            FileNotFoundError: If a non-optional raw key or any derived key
                has no files on disk.
        """
        raw = self._read_profile(self._resolve_profile_path(self._profile))

        self._modalities: dict[str, ModalitySpec] = {
            k: ModalitySpec.from_dict(k, v) for k, v in raw["modalities"].items()
        }
        self._layers: list[LayerSpec] = _parse_layers(raw["layers"])

        layer_types = [layer.type for layer in self._layers]
        # `.index` raises ValueError when the profile lacks a "modality" layer.
        self._modality_layer_idx: int = layer_types.index("modality")
        # Number of parent hops from a file to its sequence directory; without an
        # explicit "sequence" layer, the last layer is used.
        seq_idx = (
            layer_types.index("sequence")
            if "sequence" in layer_types
            else len(self._layers) - 1
        )
        self._seq_depth: int = len(self._layers) - seq_idx

        self._root = Path(root_dir)
        self._split_filter = split
        self._sequence_ids_filter: frozenset[str] | None = (
            frozenset(sequence_ids) if sequence_ids is not None else None
        )

        self._splits_spec: SplitSpec | None = _parse_splits_spec(raw.get("splits", {}))
        self._frame_filter: set[tuple[str, str]] | None = None
        if (
            split is not None
            and self._splits_spec is not None
            and self._splits_spec.type == "lst"
        ):
            lst_rel = self._splits_spec.files.get(split)
            if lst_rel is None:
                available = list(self._splits_spec.files.keys())
                raise ValueError(
                    f"Split '{split}' not declared in profile. Available: {available}"
                )
            self._frame_filter = _read_lst_frame_set(self._root / lst_rel)

        # .apairo is the source of truth: raw channels present + preprocessed channels created.
        config = self._load_or_create_config(self._root)
        channels: dict = config.get("channels", {})

        if keys is None:
            # Default to required raw channels.  Raw entries in .apairo that the
            # profile no longer declares are skipped instead of raising KeyError.
            keys = [
                k
                for k, v in channels.items()
                if v.get("kind", "raw") == "raw"
                and k in self._modalities
                and not self._modalities[k].optional
            ]

        # Classify each requested key.
        raw_keys: list[str] = []
        derived_keys: list[str] = []
        for k in keys:
            ch = channels.get(k)
            if ch is None:
                # Not in .apairo — allow if it is a profile key (raw, not yet scanned).
                if k not in self._modalities:
                    raise KeyError(
                        f"Key '{k}' is not available in '{self._root}'. "
                        f"Available: {sorted(channels)}. "
                        f"Register preprocessed channels with "
                        f"{type(self).__name__}.register_channel()."
                    )
                raw_keys.append(k)
            elif ch.get("kind", "raw") == "raw":
                raw_keys.append(k)
            else:
                derived_keys.append(k)

        self._set_keys(list(keys))
        self._files: dict[str, list[Path]] = {}
        self._loaders: dict[str, _PerFrameLoader | TXTLoader] = {}
        self._ref_key: str | None = None

        for key in raw_keys:
            spec = self._modalities[key]
            if spec.is_sequence_file:
                # One file per sequence (e.g. poses.txt) expanded to per-frame rows.
                paths = self._discover_sequence_files(key)
                if not paths and not spec.optional:
                    raise FileNotFoundError(
                        f"Key '{key}': no '{self._mapped_name(key)}{spec.ext}' "
                        f"files found under {self._root}."
                    )
                if paths:
                    self._loaders[key] = TXTLoader(paths, spec.reshape)
            else:
                paths = self._discover_native(key)
                if not paths and not spec.optional:
                    raise FileNotFoundError(
                        f"Key '{key}' declared in profile but no files found under {self._root}."
                    )
                if paths:
                    self._files[key] = paths
                    self._loaders[key] = _PerFrameLoader(paths, spec)
                    # The first per-frame raw key anchors path arithmetic.
                    if self._ref_key is None:
                        self._ref_key = key

        # NOTE(review): derived loaders are added below and are not part of this
        # check, so a derived channel with a different file count is not detected.
        frame_counts = {k: len(v) for k, v in self._loaders.items()}
        if len(set(frame_counts.values())) > 1:
            raise ValueError(f"Mismatched frame counts per key: {frame_counts}")

        self._modality_idx: int = self._modality_layer_idx
        if self._ref_key and self._files.get(self._ref_key):
            self._set_modality_idx_from(self._ref_key)

        for key in derived_keys:
            loader = channels[key]["loader"]
            ext = "npy" if loader in ("npys", "npys_img", "npy") else loader
            paths = self._discover_derived(key, ext)
            spec = ModalitySpec(ext=f".{ext}", loader=ext)
            self._loaders[key] = _PerFrameLoader(paths, spec)

        # If no native key was loaded (e.g. preprocessing a derived channel),
        # fall back to the first derived key as the path reference so that
        # derived_path() can resolve output locations.
        if self._ref_key is None:
            for key in derived_keys:
                loader = self._loaders.get(key)
                if loader is not None and loader.paths:
                    self._files[key] = loader.paths
                    self._ref_key = key
                    self._set_modality_idx_from(key)
                    break

        # Drop optional keys for which no loader could be built.
        self._set_keys([k for k in keys if k in self._loaders])

        # Group global frame indices by the name of their sequence directory.
        self._seq_groups: dict[str, list[int]] = {}
        anchor = (
            self._files.get(self._ref_key)
            if self._ref_key
            else next(
                (
                    v.paths
                    for v in self._loaders.values()
                    if isinstance(v, _PerFrameLoader)
                ),
                None,
            )
        )
        if anchor:
            for i, path in enumerate(anchor):
                seq_name = self._seq_root(path).name
                self._seq_groups.setdefault(seq_name, []).append(i)

    def _set_modality_idx_from(self, key: str) -> None:
        """Set ``_modality_idx`` to the path component naming ``key``'s folder.

        Inspects the first file registered for ``key`` in ``_files``; the
        current value is kept if the mapped folder name is not in that path.
        """
        first = self._files[key][0]
        rel_parts = first.relative_to(self._root).parts
        mapped = self._mapped_name(key)
        if mapped in rel_parts:
            self._modality_idx = rel_parts.index(mapped)

    def _seq_root(self, path: Path) -> Path:
        """Walk ``_seq_depth`` parents up from ``path`` to its sequence directory."""
        d = path
        for _ in range(self._seq_depth):
            d = d.parent
        return d

    def derived_path(self, idx: int, key: str, ext: str) -> Path:
        """Return the output path of derived channel ``key`` for frame ``idx``.

        Mirrors the reference key's file path, replacing the modality folder
        component(s) with ``key`` and the extension with ``ext``.
        """
        ref = self._files[self._ref_key][idx]
        rel = ref.relative_to(self._root)
        parts = list(rel.parts)
        src_spec = self._modalities.get(self._ref_key)
        n = len(src_spec.effective_subpath(self._ref_key)) if src_spec else 1
        parts[self._modality_idx : self._modality_idx + n] = [key]
        parts[-1] = f"{ref.stem}.{ext}"
        return self._root / Path(*parts)

    def _native_pattern(self, key: str) -> str:
        """Glob pattern, relative to a dataset root, for per-frame files of ``key``."""
        spec = self._modalities[key]
        mapped = self._mapped_name(key)
        fixed_parts = [layer.value for layer in self._layers if layer.type == "fixed"]
        if fixed_parts:
            prefix = Path(*fixed_parts)
            return str(prefix / "**" / mapped / f"*{spec.ext}")
        return f"**/{mapped}/**/*{spec.ext}"

    def _is_present(self, root_dir: Path, key: str) -> bool:
        """Return True if at least one file for raw key ``key`` exists under ``root_dir``."""
        spec = self._modalities[key]
        if spec.is_sequence_file:
            mapped = self._mapped_name(key)
            return any(root_dir.glob(f"**/{mapped}{spec.ext}"))
        return any(root_dir.glob(self._native_pattern(key)))

    def _bootstrap_config(self, root_dir: Path) -> dict:
        """Build an initial ``.apairo`` config listing the raw channels found on disk."""
        channels = {}
        for key in sorted(self.available_keys):
            if self._is_present(root_dir, key):
                spec = self._modalities[key]
                loader = spec.loader or _EXT_TO_LOADER.get(spec.ext, "bin")
                channels[key] = {"loader": loader, "has_timestamps": False}
        return {"version": 1, "channels": channels}

    def _mapped_name(self, key: str) -> str:
        """Return the on-disk folder/file name for ``key`` (profile mapping or the key)."""
        layer = self._layers[self._modality_layer_idx]
        if isinstance(layer.value, dict):
            return layer.value.get(key, key)
        return key

    def _apply_path_filters(self, files: list[Path]) -> list[Path]:
        """Apply the split-directory, sequence-id and LST frame filters to ``files``.

        The split-directory filter only runs when the profile declares a
        ``split`` layer; LST-based splits are enforced through ``_frame_filter``.
        """
        if self._split_filter and any(
            layer.type == "split" for layer in self._layers
        ):
            files = [
                f
                for f in files
                if self._split_filter in f.relative_to(self._root).parts
            ]
        if self._sequence_ids_filter is not None:
            files = [
                f for f in files if self._seq_root(f).name in self._sequence_ids_filter
            ]
        if self._frame_filter is not None:
            files = [
                f
                for f in files
                if (self._seq_root(f).name, f.stem) in self._frame_filter
            ]
        return files

    def _discover_sequence_files(self, key: str) -> list[Path]:
        """Find sequence-level files (one per sequence, not per frame)."""
        spec = self._modalities[key]
        fixed_parts = [layer.value for layer in self._layers if layer.type == "fixed"]
        mapped = self._mapped_name(key)

        if fixed_parts:
            prefix = Path(*fixed_parts)
            pattern = str(prefix / f"**/{mapped}{spec.ext}")
        else:
            pattern = f"**/{mapped}{spec.ext}"

        paths = sorted(self._root.glob(pattern))
        if self._sequence_ids_filter is not None:
            paths = [p for p in paths if p.parent.name in self._sequence_ids_filter]
        return paths

    def _discover_derived(self, key: str, ext: str) -> list[Path]:
        """Find preprocessed files for ``key`` and apply the same filters as raw keys.

        Raises:
            FileNotFoundError: If no matching file remains after filtering.
        """
        fixed_parts = [layer.value for layer in self._layers if layer.type == "fixed"]
        if fixed_parts:
            prefix = Path(*fixed_parts)
            pattern = str(prefix / "**" / key / f"*.{ext}")
        else:
            pattern = f"**/{key}/**/*.{ext}"

        # Same filtering as native files: without a split layer (e.g. LST splits)
        # the split name is not a path component, so filtering on it would drop
        # every derived file.
        files = self._apply_path_filters(sorted(self._root.glob(pattern)))
        if not files:
            raise FileNotFoundError(
                f"Derived key '{key}': no .{ext} files found under '{self._root}'. "
                f"Run run_preprocess(...) to generate them."
            )
        return files

    def _discover_native(self, key: str) -> list[Path]:
        """Find per-frame raw files for ``key``, filtered by split/sequence/frame."""
        files = sorted(self._root.glob(self._native_pattern(key)))
        return self._apply_path_filters(files)

    @property
    def loaders(self) -> dict:
        """Per-channel loaders, indexed by global frame index."""
        return self._loaders

    def __len__(self) -> int:
        if not self._loaders:
            return 0
        return len(next(iter(self._loaders.values())))

    def describe(self, sequence_id: str | None = None) -> dict:
        """Describe available channels for this dataset.

        Probes the filesystem for every raw channel declared in the profile and,
        when a ``.apairo`` file exists at the dataset root, lists the
        preprocessed channels registered in it.  The summary is printed and
        also returned.

        Args:
            sequence_id: Optional sequence identifier -- used as the display
                label only. Channel availability is dataset-wide.

        Returns:
            ``{"raw": {"present": [...], "missing": [...]}, "preprocess": {...}}``
            where ``present``/``missing`` are sorted key lists and
            ``preprocess`` maps each channel name to its ``.apairo`` entry.

        Example::

            ds = Rellis3DDataset("/data/RELLIS")
            ds.describe("00000")
        """
        from apairo.core.config import config_exists, read_config

        # Raw channels: probe the filesystem directly (one glob per key) so the
        # report reflects what is on disk now, not what .apairo recorded.
        presence = {k: self._is_present(self._root, k) for k in self.available_keys}
        raw_present = sorted(k for k, found in presence.items() if found)
        raw_missing = sorted(k for k, found in presence.items() if not found)

        preprocess = {}
        if config_exists(self._root):
            config = read_config(self._root)
            preprocess = {
                k: v
                for k, v in config.get("channels", {}).items()
                if v.get("kind") == "preprocess"
            }

        label = sequence_id if sequence_id is not None else self._root.name
        print(f"\n{type(self).__name__} -- {label}")
        print("─" * 50)
        print("Raw channels")
        if raw_present:
            print("  present  :", ", ".join(raw_present))
        if raw_missing:
            print("  missing  :", ", ".join(raw_missing))
        if not raw_present and not raw_missing:
            print("  (none)")
        print("Preprocessed channels")
        if preprocess:
            for key, meta in sorted(preprocess.items()):
                ts_info = (
                    f"<- timestamps from {meta['timestamps_from']}"
                    if "timestamps_from" in meta
                    else "<- own timestamps"
                )
                src_info = (
                    f"  sources: {meta['sources']}" if meta.get("sources") else ""
                )
                # Tolerate entries without a "loader" field instead of crashing.
                print(f"  {key:<20} {meta.get('loader', '?'):<6} {ts_info}{src_info}")
        else:
            print("  (none)")
        print()
        return {
            "raw": {"present": raw_present, "missing": raw_missing},
            "preprocess": preprocess,
        }

    @property
    def splits(self) -> list[str]:
        """Split names declared by the profile (LST files or a split layer's values)."""
        if self._splits_spec is not None:
            return list(self._splits_spec.files.keys())
        for layer in self._layers:
            if layer.type == "split" and isinstance(layer.value, list):
                return list(layer.value)
        return []

    def split(self, name: str) -> "ProfiledDataset":
        """Return a new dataset instance filtered to the named split.

        The current keys and sequence-id filter are carried over.  An empty
        sequence filter is preserved rather than being dropped.
        """
        return type(self)(
            self._root,
            keys=list(self._keys),
            split=name,
            sequence_ids=sorted(self._sequence_ids_filter)
            if self._sequence_ids_filter is not None
            else None,
        )

    @property
    def sequence_ids(self) -> list[str]:
        """Sequence directory names, in order of first appearance."""
        return list(self._seq_groups.keys())

    @property
    def frame_sequence_ids(self) -> np.ndarray:
        """Sequence ID for every frame, indexed by global frame index.

        Returns a string array of shape ``(len(self),)`` where
        ``frame_sequence_ids[i]`` is the sequence ID that frame ``i`` belongs
        to.  Combined with :attr:`FilteredView.indices`, this lets you split a
        pre-filtered dataset by sequence without a second disk sweep::

            ds_filtered = ds.filter("trav_gt", HasMinPositives(min_pos))
            seq_ids = ds.frame_sequence_ids[ds_filtered.indices]

            for train_seqs, val_seqs in folds:
                train_idx = np.where(np.isin(seq_ids, train_seqs))[0]
                val_idx   = np.where(np.isin(seq_ids, val_seqs))[0]
                ds_train  = ds_filtered.filter(train_idx)
                ds_val    = ds_filtered.filter(val_idx)
        """
        result = np.empty(len(self), dtype=object)
        for seq_id, indices in self._seq_groups.items():
            result[indices] = seq_id
        return result

    @property
    def frame_stems(self) -> np.ndarray:
        """Filename stem for every frame, indexed by global frame index."""
        result = np.empty(len(self), dtype=object)
        anchor = self._files.get(self._ref_key)
        if anchor:
            for i, path in enumerate(anchor):
                result[i] = path.stem
        return result

    def filter_split(self, name: str) -> "AbstractDataset":
        """Return a FilteredView restricted to the named predefined split.

        Applies the split without re-instantiating the dataset — registered
        transforms are preserved.  Replaces :meth:`split` for use mid-chain::

            ds.transform("lidar", RobotFilter())
            ds_train = ds.filter_split("train")  # transforms kept
        """
        return _apply_lst_filter(self, self._lst_frame_filter(name))

    def _lst_frame_filter(self, name: str) -> "set[tuple[str, str]]":
        """Read the ``(sequence, stem)`` frame set for LST split ``name``.

        Raises:
            ValueError: If the profile has no LST splits or ``name`` is unknown.
        """
        if self._splits_spec is None or self._splits_spec.type != "lst":
            raise ValueError(f"{type(self).__name__} has no LST-based splits defined.")
        lst_rel = self._splits_spec.files.get(name)
        if lst_rel is None:
            available = list(self._splits_spec.files)
            raise ValueError(f"Split '{name}' not found. Available: {available}")
        return _read_lst_frame_set(self._root / lst_rel)

    def sequences(self) -> "list[SequenceView]":
        """Return one SequenceView per sequence, in ``sequence_ids`` order."""
        from apairo.core.sequence_view import SequenceView  # noqa: F401

        return [self.sequence(sid) for sid in self.sequence_ids]

    def sequence(self, seq_id: str) -> "SequenceView":
        """Return a SequenceView over the frames of sequence ``seq_id``.

        Raises:
            KeyError: If ``seq_id`` is not a known sequence.
        """
        if seq_id not in self._seq_groups:
            raise KeyError(
                f"Sequence '{seq_id}' not found. " f"Available: {self.sequence_ids}"
            )
        from apairo.core.sequence_view import SequenceView

        return SequenceView(self, self._seq_groups[seq_id], seq_id)

    def _load(self, idx) -> Sample:
        """Load one frame by global index or by ``(sequence_id, local_index)``.

        Raises:
            IndexError: If a global index is outside ``[0, len(self))``.
        """
        if isinstance(idx, tuple):
            seq_id, local_idx = idx
            view = self.sequence(seq_id)
            return self._load(view._indices[local_idx])
        if not 0 <= idx < len(self):
            raise IndexError(f"Index {idx} out of range [0, {len(self)})")
        return Sample(data={key: self._loaders[key][idx] for key in self._keys})

loaders property

loaders: dict

Per-channel loaders, indexed by global frame index.

frame_sequence_ids property

frame_sequence_ids: ndarray

Sequence ID for every frame, indexed by global frame index.

Returns a string array of shape (len(self),) where frame_sequence_ids[i] is the sequence ID that frame i belongs to. Combined with `FilteredView.indices`, this lets you split a pre-filtered dataset by sequence without a second disk sweep:

ds_filtered = ds.filter("trav_gt", HasMinPositives(min_pos))
seq_ids = ds.frame_sequence_ids[ds_filtered.indices]

for train_seqs, val_seqs in folds:
    train_idx = np.where(np.isin(seq_ids, train_seqs))[0]
    val_idx   = np.where(np.isin(seq_ids, val_seqs))[0]
    ds_train  = ds_filtered.filter(train_idx)
    ds_val    = ds_filtered.filter(val_idx)

frame_stems property

frame_stems: ndarray

Filename stem for every frame, indexed by global frame index.

describe

describe(sequence_id: str | None = None) -> dict

Describe available channels for this dataset.

Probes the filesystem for each raw channel declared in the profile and reports which are present or missing. If a .apairo file exists at the dataset root, it also lists the preprocessed channels registered there.

Parameters:

Name Type Description Default
sequence_id str | None

Optional sequence identifier -- used as the display label only. Channel availability is dataset-wide.

None

Returns:

Type Description
dict

{"raw": {"present": [...], "missing": [...]}, "preprocess": {...}}

Example::

ds = Rellis3DDataset("/data/RELLIS")
ds.describe("00000")
Source code in apairo/core/profiled_dataset.py
def describe(self, sequence_id: str | None = None) -> dict:
    """Describe available channels for this dataset.

    Probes the filesystem for every raw channel declared in the profile and,
    when a ``.apairo`` file exists at the dataset root, lists the
    preprocessed channels registered in it.  The summary is printed and
    also returned.

    Args:
        sequence_id: Optional sequence identifier -- used as the display
            label only. Channel availability is dataset-wide.

    Returns:
        ``{"raw": {"present": [...], "missing": [...]}, "preprocess": {...}}``
        where ``present``/``missing`` are sorted key lists and
        ``preprocess`` maps each channel name to its ``.apairo`` entry.

    Example::

        ds = Rellis3DDataset("/data/RELLIS")
        ds.describe("00000")
    """
    from apairo.core.config import config_exists, read_config

    # Raw channels: probe the filesystem directly (one glob per key) so the
    # report reflects what is on disk now, not what .apairo recorded.
    presence = {k: self._is_present(self._root, k) for k in self.available_keys}
    raw_present = sorted(k for k, found in presence.items() if found)
    raw_missing = sorted(k for k, found in presence.items() if not found)

    preprocess = {}
    if config_exists(self._root):
        config = read_config(self._root)
        preprocess = {
            k: v
            for k, v in config.get("channels", {}).items()
            if v.get("kind") == "preprocess"
        }

    label = sequence_id if sequence_id is not None else self._root.name
    print(f"\n{type(self).__name__} -- {label}")
    print("─" * 50)
    print("Raw channels")
    if raw_present:
        print("  present  :", ", ".join(raw_present))
    if raw_missing:
        print("  missing  :", ", ".join(raw_missing))
    if not raw_present and not raw_missing:
        print("  (none)")
    print("Preprocessed channels")
    if preprocess:
        for key, meta in sorted(preprocess.items()):
            ts_info = (
                f"<- timestamps from {meta['timestamps_from']}"
                if "timestamps_from" in meta
                else "<- own timestamps"
            )
            src_info = (
                f"  sources: {meta['sources']}" if meta.get("sources") else ""
            )
            # Tolerate entries without a "loader" field instead of crashing.
            print(f"  {key:<20} {meta.get('loader', '?'):<6} {ts_info}{src_info}")
    else:
        print("  (none)")
    print()
    return {
        "raw": {"present": raw_present, "missing": raw_missing},
        "preprocess": preprocess,
    }

split

split(name: str) -> 'ProfiledDataset'

Return a new dataset instance filtered to the named split.

Source code in apairo/core/profiled_dataset.py
def split(self, name: str) -> "ProfiledDataset":
    """Return a new dataset instance filtered to the named split.

    The current keys and sequence-id filter are carried over.  An empty
    sequence filter is preserved instead of being silently dropped, and the
    ids are passed in sorted order so the call is deterministic.
    """
    return type(self)(
        self._root,
        keys=list(self._keys),
        split=name,
        sequence_ids=sorted(self._sequence_ids_filter)
        if self._sequence_ids_filter is not None
        else None,
    )

filter_split

filter_split(name: str) -> 'AbstractDataset'

Return a FilteredView restricted to the named predefined split.

Applies the split without re-instantiating the dataset, so registered transforms are preserved. Use it instead of `split()` in the middle of a chain:

ds.transform("lidar", RobotFilter())
ds_train = ds.filter_split("train")  # transforms kept
Source code in apairo/core/profiled_dataset.py
def filter_split(self, name: str) -> "AbstractDataset":
    """Restrict this dataset to a predefined split without rebuilding it.

    Unlike :meth:`split`, no new instance is created, so transforms already
    registered on ``self`` stay in effect.  That makes it suitable in the
    middle of a chain::

        ds.transform("lidar", RobotFilter())
        ds_train = ds.filter_split("train")  # transforms kept

    Returns:
        A FilteredView containing only the frames listed for split ``name``.
    """
    frames_in_split = self._lst_frame_filter(name)
    return _apply_lst_filter(self, frames_in_split)

SynchronousDataset

apairo.core.synchronous_dataset.SynchronousDataset

Bases: AbstractDataset

Base class for datasets where index i returns a complete synchronous frame.

All modalities at index i are co-captured -- no timestamps, no interleaving. sample.timestamp is always None. Random access and standard PyTorch DataLoader shuffling work without any additional wrappers.

Subclasses must implement __len__ and _load.

For new synchronous datasets, prefer extending `ProfiledDataset` (`apairo.core.profiled_dataset.ProfiledDataset`) with a YAML profile rather than subclassing this class directly.

Attributes:

Name Type Description
timestamps

Always None -- marks this dataset as synchronous.

Source code in apairo/core/synchronous_dataset.py
class SynchronousDataset(AbstractDataset):
    """Base class for datasets in which index ``i`` yields one co-captured frame.

    Every modality returned for index ``i`` comes from the same capture.  There
    are therefore no timestamps and no interleaving, and ``sample.timestamp`` is
    always ``None``.  Random access and standard PyTorch ``DataLoader``
    shuffling work without extra wrappers.

    Subclasses must implement ``__len__`` and ``_load``.  New synchronous
    datasets should usually extend
    :class:`~apairo.core.profiled_dataset.ProfiledDataset` with a YAML profile
    instead of subclassing this class directly.

    Attributes:
        timestamps: Always ``None``; this is what marks the dataset as synchronous.
    """

    # Class-level marker: synchronous datasets carry no timestamp array.
    timestamps = None

    @property
    def root_dir(self) -> Path:
        """Dataset root directory.

        Reads ``self._root``, which the concrete subclass must set
        (``ProfiledDataset`` does so in its ``__init__``).
        """
        return self._root

    def _seq_root(self, path: Path) -> Path:
        """Map a native file path to the directory of the sequence it belongs to.

        The default assumes exactly one modality folder between the sequence
        directory and the file (``seq/<modality>/file``), i.e. two parent hops.
        Datasets nested deeper (e.g. ``seq/lidar/scan/file.bin``) must override
        this to climb the right number of levels.
        """
        modality_dir = path.parent
        return modality_dir.parent

    def derived_path(self, idx: int, key: str, ext: str) -> Path:
        """Return the location of derived channel ``key`` for frame ``idx``.

        The frame's file is taken from the first list in ``self._files``.  The
        result is ``<sequence root>/<key>/<stem>.<ext>``.
        """
        reference_files = next(iter(self._files.values()))
        source = reference_files[idx]
        return self._seq_root(source) / key / f"{source.stem}.{ext}"

    @abstractmethod
    def __len__(self) -> int:
        """Return the number of frames in the dataset."""

SemanticKittiDataset

apairo.dataset.semantic_kitti.SemanticKittiDataset

Bases: ProfiledDataset

SemanticKITTI dataset -- driving LiDAR with dense semantic labels.

Keys: lidar (float32, shape (N, 4)), labels (int64, lower 16 bits = semantic class).

Example::

ds = SemanticKittiDataset("/data/kitti/dataset", keys=["lidar", "labels"])
sample = ds[0]
# sample.data["lidar"]  -> np.ndarray (N, 4)
# sample.data["labels"] -> np.ndarray (N,)
Source code in apairo/dataset/semantic_kitti/dataset.py
class SemanticKittiDataset(ProfiledDataset):
    """SemanticKITTI -- driving LiDAR scans with dense per-point semantic labels.

    Keys:
        ``lidar``: float32 array of shape (N, 4).
        ``labels``: int64 array; the lower 16 bits hold the semantic class.

    The directory layout and dtypes come from the ``semantic_kitti.yaml``
    profile.

    Example::

        ds = SemanticKittiDataset("/data/kitti/dataset", keys=["lidar", "labels"])
        sample = ds[0]
        # sample.data["lidar"]  -> np.ndarray (N, 4)
        # sample.data["labels"] -> np.ndarray (N,)
    """

    # Resolved relative to the bundled profiles directory.
    _profile = "semantic_kitti.yaml"

Goose3DDataset

apairo.dataset.goose.Goose3DDataset

Bases: ProfiledDataset

GOOSE 3D dataset -- outdoor off-road LiDAR with traversability labels.

Keys: lidar (float32, shape (N, 4)), labels (int64). Split: "train", "val", or "test".

Example::

ds = Goose3DDataset("/data/GOOSE_3D", keys=["lidar", "labels"], split="train")
sample = ds[0]
# sample.data["lidar"]  -> np.ndarray (N, 4)
# sample.data["labels"] -> np.ndarray (N,)
Source code in apairo/dataset/goose/dataset.py
class Goose3DDataset(ProfiledDataset):
    """GOOSE 3D: outdoor off-road LiDAR scans with traversability labels.

    Everything except the profile name is inherited from
    :class:`ProfiledDataset`, driven by ``goose.yaml``.

    Keys:
        ``lidar``: float32 point array of shape (N, 4).
        ``labels``: int64 per-point array.

    Splits:
        ``"train"``, ``"val"`` or ``"test"``.

    Example::

        ds = Goose3DDataset("/data/GOOSE_3D", keys=["lidar", "labels"], split="train")
        sample = ds[0]
        # sample.data["lidar"]  -> np.ndarray (N, 4)
        # sample.data["labels"] -> np.ndarray (N,)
    """

    _profile = "goose.yaml"

Rellis3DDataset

apairo.dataset.rellis.Rellis3DDataset

Bases: ProfiledDataset

RELLIS-3D dataset -- off-road LiDAR with semantic labels.

Keys: lidar (float32, shape (N, 4)), labels (int64). Optional: poses (float64, shape (3, 4)) -- one 3x4 pose matrix per frame, loaded from a per-sequence poses.txt file (one row of 12 floats per frame).

Example:

ds = Rellis3DDataset("/data/RELLIS", keys=["lidar", "labels"])
sample = ds[0]
# sample.data["lidar"]  -> np.ndarray (N, 4)
# sample.data["labels"] -> np.ndarray (N,)

ds = Rellis3DDataset("/data/RELLIS", keys=["lidar", "poses"])
sample = ds[0]
# sample.data["poses"]  -> np.ndarray (3, 4)
Source code in apairo/dataset/rellis/dataset.py
class Rellis3DDataset(ProfiledDataset):
    """RELLIS-3D dataset -- off-road LiDAR with semantic labels.

    Layout and loading are driven by the ``rellis.yaml`` profile through
    :class:`ProfiledDataset`.

    Keys: ``lidar`` (float32, shape (N, 4)), ``labels`` (int64).
    Optional: ``poses`` (float64, shape (3, 4)) -- one 3x4 pose matrix per frame,
    loaded from a per-sequence ``poses.txt`` file (one row of 12 floats per frame).

    Example::

        ds = Rellis3DDataset("/data/RELLIS", keys=["lidar", "labels"])
        sample = ds[0]
        # sample.data["lidar"]  -> np.ndarray (N, 4)
        # sample.data["labels"] -> np.ndarray (N,)

        ds = Rellis3DDataset("/data/RELLIS", keys=["lidar", "poses"])
        sample = ds[0]
        # sample.data["poses"]  -> np.ndarray (3, 4)
    """

    _profile = "rellis.yaml"

Asynchronous datasets

RawDataset

apairo.dataset.raw.RawDataset

Bases: RootSequenceMixin, AsyncLayoutDataset, ConfigurableDataset

Generic channels.yaml-driven dataset; single sequence or dataset root.

Parameters:

Name Type Description Default
directory str | Path

A sequence directory (has .apairo/channels.yaml) or a dataset root directory (has .apairo/dataset.yaml or sequence subdirectories) -- auto-detected.

required
keys Optional[List[str]]

Channels to load. None -> every channel declared in channels.yaml.

None

Example:

RawDataset.init(seq_dir)                      # once: write .apairo
ds = RawDataset(root, keys=["lidar", "imu"])  # whole dataset
ds.run_preprocess(MyLabeler())                # persisted as a new channel
Source code in apairo/dataset/raw/dataset.py
class RawDataset(RootSequenceMixin, AsyncLayoutDataset, ConfigurableDataset):
    r"""Generic ``channels.yaml``-driven dataset; single sequence or dataset root.

    Args:
        directory: A sequence directory (has ``.apairo/channels.yaml``) **or** a
            dataset root directory (has ``.apairo/dataset.yaml`` or sequence
            subdirectories) -- auto-detected.
        keys: Channels to load. ``None`` -> every channel declared in
            ``channels.yaml``.

    Raises:
        FileNotFoundError: *directory* is neither a sequence nor a dataset
            root, or a root has no initialised sequences.

    Example::

        RawDataset.init(seq_dir)                      # once: write .apairo
        ds = RawDataset(root, keys=["lidar", "imu"])  # whole dataset
        ds.run_preprocess(MyLabeler())                # persisted as a new channel
    """

    def __init__(
        self,
        directory: str | Path,
        keys: Optional[List[str]] = None,
    ) -> None:
        path = Path(directory)

        if config_exists(path):
            # Single sequence: its own .apairo config is present.
            self._is_root = False
            self._sequence_dir = path
            self._name = path.name
            super().__init__(path, keys=keys)
        elif _is_dataset_root(path):
            self._init_raw_root(path, keys)
        else:
            raise FileNotFoundError(
                f"'{path}' is neither a sequence (no {CONFIG_DIR}/channels.yaml) nor a "
                f"dataset root (no {CONFIG_DIR}/{_MANIFEST_FILE} and no sequence "
                f"subdirectories). Initialize a sequence with RawDataset.init(<seq>)."
            )

    # ------------------------------------------------------------------ init

    @classmethod
    def init(
        cls,
        directory: str | Path,
        *,
        merge: bool = False,
        overwrite: bool = False,
        name: Optional[str] = None,
    ) -> Path:
        """Write the ``.apairo`` sidecar(s) by scanning *directory*. Root-aware.

        A **sequence** directory (its sub-directories hold data files) gets a
        ``.apairo/channels.yaml`` with loaders inferred per channel. A **root**
        directory (its sub-directories are sequences) gets each sequence
        initialised, then a ``.apairo/dataset.yaml`` manifest (name + sequence
        order + channel union).

        Args:
            directory: Sequence or dataset-root directory (auto-detected).
            merge: Add newly detected channels without touching existing ones.
            overwrite: Discard existing ``.apairo`` and rebuild from scratch.
            name: Dataset name for the root manifest (default: directory name).

        Returns:
            Path of the file written -- ``channels.yaml`` for a sequence, or
            ``dataset.yaml`` for a root.

        Raises:
            FileNotFoundError: *directory* does not exist, or it has neither
                channels nor sequence sub-directories.
            NotADirectoryError: *directory* exists but is not a directory.
        """
        path = Path(directory)

        # Fail early with a clear message; otherwise the first iterdir() in
        # _is_sequence_layout raises a bare OS error that does not say what
        # init expected.
        if not path.exists():
            raise FileNotFoundError(
                f"'{path}' does not exist. Point init at a sequence or a dataset root."
            )
        if not path.is_dir():
            raise NotADirectoryError(
                f"'{path}' is not a directory. Point init at a sequence or a dataset root."
            )

        if cls._is_sequence_layout(path):
            AsyncLayoutDataset.init(path, overwrite=overwrite, merge=merge)
            return path / CONFIG_DIR / CHANNELS_FILE

        seq_dirs: list[Path] = []
        for d in sorted(path.iterdir()):
            if not d.is_dir() or d.name.startswith("."):
                continue
            if cls._is_sequence_layout(d):
                try:
                    AsyncLayoutDataset.init(d, overwrite=overwrite, merge=merge)
                except (FileExistsError, ValueError):
                    # Already initialised (no overwrite/merge), or merge found
                    # nothing new -- either way the sequence is ready. Idempotent.
                    pass
                seq_dirs.append(d)
            elif config_exists(d):
                # Initialised earlier but no recognizable raw channel now.
                seq_dirs.append(d)

        if not seq_dirs:
            raise FileNotFoundError(
                f"'{path}' has no channels and no sequence sub-directories to "
                f"initialise. Point init at a sequence or a dataset root."
            )
        return cls._write_manifest(path, name=name)

    @staticmethod
    def _is_sequence_layout(path: Path) -> bool:
        """True when *path*'s own sub-directories include a recognizable channel."""
        return any(
            _detect_loader(d) is not None
            for d in path.iterdir()
            if d.is_dir() and not d.name.startswith(".")
        )

    @classmethod
    def _write_manifest(cls, root: str | Path, *, name: Optional[str] = None) -> Path:
        """(Re)write ``<root>/.apairo/dataset.yaml`` from the sequences on disk.

        Sequences are the non-hidden sub-directories that already have a
        config, listed in sorted order. The channel table is the union of
        every sequence's channels. When a key appears in several sequences,
        the first sequence's ``kind`` wins (default ``"raw"``).
        """
        root = Path(root)
        sequences = sorted(
            d.name
            for d in root.iterdir()
            if d.is_dir() and not d.name.startswith(".") and config_exists(d)
        )
        channels: dict = {}
        for seq in sequences:
            for key, meta in read_config(root / seq).get("channels", {}).items():
                channels.setdefault(key, {"kind": meta.get("kind", "raw")})

        manifest = {
            "version": 1,
            "name": name or root.name,
            "sequences": sequences,
            "channels": channels,
        }
        apairo_dir = root / CONFIG_DIR
        apairo_dir.mkdir(exist_ok=True)
        path = apairo_dir / _MANIFEST_FILE
        with open(path, "w", encoding="utf-8") as f:
            yaml.dump(manifest, f, default_flow_style=False, sort_keys=True)
        return path

    # ------------------------------------------------------------------ root

    def _init_raw_root(self, root: Path, keys: Optional[List[str]]) -> None:
        """Initialise as a dataset root made of per-sequence ``RawDataset``\\s."""
        manifest = _read_manifest(root)
        # ``or`` (not a .get default) so an explicit ``name: null`` in the
        # manifest still yields a string, matching _write_manifest.
        self._name = manifest.get("name") or root.name

        # Sequence order: manifest order if given, else sorted discovery.
        if manifest.get("sequences"):
            seq_dirs = [
                root / s for s in manifest["sequences"] if config_exists(root / s)
            ]
        else:
            seq_dirs = sorted(
                d
                for d in root.iterdir()
                if d.is_dir() and not d.name.startswith(".") and config_exists(d)
            )
        if not seq_dirs:
            raise FileNotFoundError(f"No sequences found under '{root}'.")

        # Each sequence is opened as its own single-sequence RawDataset.
        super()._init_root(
            root, seq_dirs, lambda d: RawDataset(d, keys=keys), build_index=True
        )

    # ------------------------------------------------------------------ hooks

    def _single_available(self) -> frozenset:
        """Channels this single sequence can load (the keys of ``self._profile``)."""
        return frozenset(self._profile)

    def _set_single_keys(self, keys) -> None:
        """Select channels on a single sequence.

        Accepts ``"all"``, a single channel name, or an iterable of names.
        """
        if keys == "all":
            keys = sorted(self._profile)
        elif isinstance(keys, str):
            # A lone channel name; without this, list() would split it into
            # characters.
            keys = [keys]
        # Delegate to the layout base's setter (validates + re-inits loaders).
        AsyncLayoutDataset.keys.fset(self, list(keys))

    # ------------------------------------------------------------------ public

    @property
    def name(self) -> str:
        """Dataset name (manifest ``name``, else the directory name)."""
        return self._name

    @property
    def calibration(self) -> dict:
        """Static extrinsics from ``.apairo/calibration.yaml`` (e.g. written from
        ``/tf_static``). On a root, the sequences' tables are merged in sequence
        order, so later sequences override earlier ones on key collisions."""
        if not self._is_root:
            return read_calibration(self._sequence_dir)
        merged: dict = {}
        for seq in self._sequences:
            merged.update(read_calibration(seq._sequence_dir))
        return merged

    # ------------------------------------------------------------------ helpers

    def derived_path(self, idx: int, key: str, ext: str) -> Path:
        """Path for derived sample *idx* of channel *key*: ``<seq>/<key>/NNNNNN.<ext>``."""
        return self._sequence_dir / key / f"{idx:06d}.{ext}"

    def _bootstrap_config(self, sequence_dir: Path) -> dict:
        """ConfigurableDataset hook: detect raw channels when .apairo is absent."""
        channels: dict = {}
        for key in sorted(get_files(str(sequence_dir))):
            loader = _detect_loader(Path(sequence_dir) / key)
            if loader is None:
                continue
            has_ts = (Path(sequence_dir) / key / "timestamps.txt").exists()
            channels[key] = {"loader": loader, "kind": "raw", "has_timestamps": has_ts}
        return {"version": 1, "channels": channels}

name property

name: str

Dataset name (manifest name, else the directory name).

calibration property

calibration: dict

Static extrinsics from .apairo/calibration.yaml (e.g. written from /tf_static). On a root, the sequences' tables are merged in sequence order, so later sequences override earlier ones when keys collide.

init classmethod

init(directory: str | Path, *, merge: bool = False, overwrite: bool = False, name: Optional[str] = None) -> Path

Write the .apairo sidecar(s) by scanning directory. Root-aware.

A sequence directory (its sub-directories hold data files) gets a .apairo/channels.yaml with loaders inferred per channel. A root directory (its sub-directories are sequences) gets each sequence initialised, then a .apairo/dataset.yaml manifest (name + sequence order + channel union).

Parameters:

Name Type Description Default
directory str | Path

Sequence or dataset-root directory (auto-detected).

required
merge bool

Add newly detected channels without touching existing ones.

False
overwrite bool

Discard existing .apairo and rebuild from scratch.

False
name Optional[str]

Dataset name for the root manifest (default: directory name).

None

Returns:

Type Description
Path

Path of the file written -- channels.yaml for a sequence, or dataset.yaml for a root.

Source code in apairo/dataset/raw/dataset.py
@classmethod
def init(
    cls,
    directory: str | Path,
    *,
    merge: bool = False,
    overwrite: bool = False,
    name: Optional[str] = None,
) -> Path:
    """Write the ``.apairo`` sidecar(s) by scanning *directory*. Root-aware.

    A **sequence** directory (its sub-directories hold data files) gets a
    ``.apairo/channels.yaml`` with loaders inferred per channel. A **root**
    directory (its sub-directories are sequences) gets each sequence
    initialised, then a ``.apairo/dataset.yaml`` manifest (name + sequence
    order + channel union).

    Args:
        directory: Sequence or dataset-root directory (auto-detected).
        merge: Add newly detected channels without touching existing ones.
        overwrite: Discard existing ``.apairo`` and rebuild from scratch.
        name: Dataset name for the root manifest (default: directory name).

    Returns:
        Path of the file written -- ``channels.yaml`` for a sequence, or
        ``dataset.yaml`` for a root.
    """
    path = Path(directory)

    if cls._is_sequence_layout(path):
        AsyncLayoutDataset.init(path, overwrite=overwrite, merge=merge)
        return path / CONFIG_DIR / CHANNELS_FILE

    seq_dirs: list[Path] = []
    for d in sorted(path.iterdir()):
        if not d.is_dir() or d.name.startswith("."):
            continue
        if cls._is_sequence_layout(d):
            try:
                AsyncLayoutDataset.init(d, overwrite=overwrite, merge=merge)
            except (FileExistsError, ValueError):
                # Already initialised (no overwrite/merge), or merge found
                # nothing new -- either way the sequence is ready. Idempotent.
                pass
            seq_dirs.append(d)
        elif config_exists(d):
            seq_dirs.append(d)

    if not seq_dirs:
        raise FileNotFoundError(
            f"'{path}' has no channels and no sequence sub-directories to "
            f"initialise. Point init at a sequence or a dataset root."
        )
    return cls._write_manifest(path, name=name)

TartanKittiDataset

apairo.dataset.tartan_kitti.TartanKittiDataset

Bases: RootSequenceMixin, AsyncLayoutDataset, ConfigurableDataset

TartanDrive v2 dataset (asynchronous layout, fixed channel profile).

A profiled member of the asynchronous layout family: the on-disk format is handled by AsyncLayoutDataset (apairo.dataset.kitti.AsyncLayoutDataset), the multi-sequence root behaviour by RootSequenceMixin (apairo.core.root_sequence.RootSequenceMixin), and this class pins the fixed TartanDrive channel set via profile.yaml. (Datasets with a dynamic channel set use RawDataset (apairo.dataset.raw.RawDataset) instead.)

Accepts either a single sequence directory or a root directory that contains multiple sequences -- the structure is auto-detected.

Single sequence:

ds = TartanKittiDataset(seq_dir, keys=["velodyne_0", "cmd"])

Root directory (all sequences, flat access):

ds = TartanKittiDataset(root_dir, keys=["velodyne_0"])
len(ds)           # total events across all sequences
ds.sequences      # list[TartanKittiDataset], one per sequence

Lazy init -- inspect before loading:

ds = TartanKittiDataset(root_or_seq_dir)
ds.available                 # frozenset of available channels
ds.keys = ["velodyne_0"]     # initialize loaders
ds.keys = "all"              # or load everything

Parameters:

Name Type Description Default
path str | Path

Single sequence directory or root directory.

required
keys Optional[List[str] | str]

Channels to load. None -> lazy (no loaders). "all" -> all channels present in .apairo.

None
Source code in apairo/dataset/tartan_kitti/dataset.py
class TartanKittiDataset(RootSequenceMixin, AsyncLayoutDataset, ConfigurableDataset):
    r"""TartanDrive v2 dataset (asynchronous layout, fixed channel profile).

    A profiled member of the asynchronous layout family: the on-disk format is
    handled by :class:`~apairo.dataset.kitti.AsyncLayoutDataset`, the
    multi-sequence root behaviour by
    :class:`~apairo.core.root_sequence.RootSequenceMixin`, and this class pins
    the *fixed* TartanDrive channel set via ``profile.yaml``.  (Datasets with a
    *dynamic* channel set use :class:`~apairo.dataset.raw.RawDataset` instead.)

    Accepts either a single sequence directory or a root directory that contains
    multiple sequences -- the structure is auto-detected.

    Single sequence::

        ds = TartanKittiDataset(seq_dir, keys=["velodyne_0", "cmd"])

    Root directory (all sequences, flat access)::

        ds = TartanKittiDataset(root_dir, keys=["velodyne_0"])
        len(ds)           # total events across all sequences
        ds.sequences      # list[TartanKittiDataset], one per sequence

    Lazy init -- inspect before loading::

        ds = TartanKittiDataset(root_or_seq_dir)
        ds.available                 # frozenset of available channels
        ds.keys = ["velodyne_0"]     # initialize loaders
        ds.keys = "all"              # or load everything

    Args:
        path: Single sequence directory **or** root directory.
        keys: Channels to load. ``None`` -> lazy (no loaders). ``"all"`` -> all
            channels present in ``.apairo``. Any other single string is taken
            as one channel name (e.g. ``"velodyne_0"``).
    """

    # Every channel name the TartanDrive v2 profile declares (computed once at
    # class-definition time), regardless of what a given sequence contains.
    available_keys: frozenset = frozenset(load_profile(_PROFILE_PATH).keys())

    def __init__(
        self,
        path: str | Path,
        keys: Optional[List[str] | str] = None,
    ) -> None:
        path = Path(path)
        raw_profile = load_profile(_PROFILE_PATH)

        if _is_sequence_dir(path, raw_profile):
            self._is_root = False
            self._init_sequence(path, keys, raw_profile)
        else:
            self._init_root(path, keys, raw_profile)

    # ---------------------------------------------------------------- sequence

    def _init_sequence(self, sequence_dir: Path, keys, raw_profile: dict) -> None:
        """Set up a single sequence; with ``keys=None`` stay lazy (no loaders)."""
        config = self._load_or_create_config(sequence_dir)
        channels: dict = config.get("channels", {})

        if not channels:
            raise FileNotFoundError(
                f"No recognized channels found in '{sequence_dir}'. "
                f"Expected subdirectories matching the TartanDrive v2 profile "
                f"(e.g. velodyne_0, image_left, cmd). "
                f"Verify that the path points to a valid sequence directory."
            )

        self._sequence_dir = sequence_dir
        self._available_channels = channels
        self._effective_profile: dict[str, str] = {
            k: v["loader"] for k, v in channels.items()
        }
        # Legacy derived channels that borrow another channel's timestamps.
        self._timestamp_aliases: dict[str, str] = {
            k: v["timestamps_from"]
            for k, v in channels.items()
            if "timestamps_from" in v
        }

        if keys is None:
            # Lazy: store enough state for _init() to run later via keys setter.
            self._keys = []
            self._profile = raw_profile
            self._files = get_files(str(sequence_dir))
        else:
            super().__init__(
                directory=sequence_dir,
                keys=self._resolve_keys(keys),
                dataset_profile=_PROFILE_PATH,
            )

    def _resolve_keys(self, keys) -> list[str]:
        """Expand and validate *keys* against this sequence's declared channels.

        Needs ``self._available_channels`` and ``self._sequence_dir`` to be set.

        Args:
            keys: ``"all"``, a single channel name, or an iterable of names.

        Returns:
            The requested channel names as a list (``"all"`` -> sorted names).

        Raises:
            KeyError: A key is not declared in ``.apairo``.
            FileNotFoundError: A key's channel directory is missing on disk.
        """
        if keys == "all":
            keys = sorted(self._available_channels)
        elif isinstance(keys, str):
            # A lone channel name; iterating it would yield its characters.
            keys = [keys]
        keys = list(keys)

        unknown = set(keys) - set(self._available_channels)
        if unknown:
            raise KeyError(
                f"Keys {unknown} are not declared in .apairo. "
                f"Register preprocessed channels with "
                f"{type(self).__name__}.register_channel()."
            )
        missing_dirs = [k for k in keys if not (self._sequence_dir / k).is_dir()]
        if missing_dirs:
            raise FileNotFoundError(
                f"Channel directories missing on disk: {missing_dirs}"
            )
        return keys

    # ---------------------------------------------------------------- root

    def _init_root(self, root_dir: Path, keys, raw_profile: dict) -> None:
        """Open every valid sequence sub-directory of *root_dir* as a child dataset."""
        seq_dirs = sorted(
            d
            for d in root_dir.iterdir()
            if d.is_dir()
            and not d.name.startswith(".")
            and _is_sequence_dir(d, raw_profile)
        )
        if not seq_dirs:
            raise FileNotFoundError(
                f"No TartanDrive sequences found in '{root_dir}'. "
                f"Expected subdirectories that are valid sequence directories."
            )
        super()._init_root(
            root_dir,
            seq_dirs,
            lambda d: TartanKittiDataset(d, keys=keys),
            # Lazy roots (keys=None) defer index building until keys are set.
            build_index=keys is not None,
        )

    # ---------------------------------------------------------------- hooks

    def _single_available(self) -> frozenset:
        """Channels declared in this sequence's ``.apairo``."""
        return frozenset(self._available_channels)

    def _set_single_keys(self, keys) -> None:
        """Validate *keys*, then (re)initialise loaders for a single sequence."""
        self._set_keys(self._resolve_keys(keys))
        self._init()

    # ---------------------------------------------------------------- preprocessing

    def derived_path(self, idx: int, key: str, ext: str) -> Path:
        """Path for derived sample *idx* of channel *key*: ``<seq>/<key>/NNNNNN.<ext>``."""
        return self._sequence_dir / key / f"{idx:06d}.{ext}"

    # ---------------------------------------------------------------- dunder

    def __len__(self) -> int:
        if not self._is_root and not self._keys:
            raise RuntimeError("No keys loaded. Set ds.keys = [...] first.")
        return super().__len__()

    def _load(self, idx):
        if not self._is_root and not isinstance(idx, tuple) and not self._keys:
            raise RuntimeError("No keys loaded. Set ds.keys = [...] first.")
        return super()._load(idx)

    # ---------------------------------------------------------------- bootstrap

    def _bootstrap_config(self, sequence_dir: Path) -> dict:
        """Build a config from the profile channels present in *sequence_dir*."""
        raw_profile = load_profile(_PROFILE_PATH)
        available = get_files(str(sequence_dir))
        channels: dict = {}
        for key in sorted(available):
            if key not in raw_profile:
                continue
            has_ts = (sequence_dir / key / "timestamps.txt").exists()
            channels[key] = {"loader": raw_profile[key], "has_timestamps": has_ts}
        return {"version": 1, "channels": channels}

    # ---------------------------------------------------------------- loaders

    def _init_loaders(self) -> None:
        """Create one loader per requested key and load its timestamps.

        ``self.timestamps`` ends up with exactly the requested keys. A legacy
        alias channel reuses its source's timestamps without the source being
        added under its own name.
        """
        self.loaders = {
            key: str_to_loader[self._effective_profile[key]](self._files[key])
            for key in self._keys
        }

        self.timestamps: dict[str, np.ndarray] = {}
        raw_fallback: list[str] = []

        for key in self._keys:
            ts_path = Path(self._files[key]) / "timestamps.txt"
            if ts_path.exists():
                self.timestamps[key] = np.loadtxt(ts_path)
            elif key in self._timestamp_aliases:
                # Backward compat: derived channel created before own-timestamps convention.
                src = self._timestamp_aliases[key]
                src_ts = self.timestamps.get(src)
                if src_ts is None:
                    src_path = Path(self._files[src]) / "timestamps.txt"
                    if not src_path.exists():
                        raise ValueError(
                            f"'{key}' has no timestamps.txt and its alias source "
                            f"'{src}' has no timestamps.txt either. "
                            f"Re-run run_preprocess() to regenerate."
                        )
                    # Kept local: inserting it as self.timestamps[src] would
                    # add a channel that has no loader when src is not requested.
                    src_ts = np.loadtxt(src_path)
                self.timestamps[key] = src_ts
            else:
                raw_fallback.append(key)

        if raw_fallback:
            self.timestamps.update(loads_timestamps(raw_fallback, self._files))

        self.end_of_time: float = get_end_of_time(self.timestamps) + 1.0

    # ---------------------------------------------------------------- describe

    @classmethod
    def describe(cls, path: str | Path) -> dict:
        """Describe available channels -- auto-detects root vs sequence directory.

        Root directory: lists each sequence with its raw and preprocessed channels.
        Sequence directory: shows raw present/missing and preprocessed channels.
        """
        path = Path(path)
        raw_profile = load_profile(_PROFILE_PATH)

        if _is_sequence_dir(path, raw_profile):
            return super().describe(path)

        seq_dirs = sorted(
            d
            for d in path.iterdir()
            if d.is_dir()
            and not d.name.startswith(".")
            and _is_sequence_dir(d, raw_profile)
        )
        if not seq_dirs:
            print(f"No TartanDrive sequences found in '{path}'.")
            return {}

        n = len(seq_dirs)
        print(f"\n{cls.__name__} -- {path.name} ({n} sequence{'s' if n > 1 else ''})")
        print("─" * 50)

        result = {}
        for seq_dir in seq_dirs:
            if config_exists(seq_dir):
                config = read_config(seq_dir)
            else:
                # Bare instance (no __init__) just to reuse the bootstrap hook.
                instance = cls.__new__(cls)
                config = instance._bootstrap_config(seq_dir)

            channels = config.get("channels", {})
            raw = sorted(
                k for k, v in channels.items() if v.get("kind", "raw") == "raw"
            )
            preproc = {
                k: v for k, v in channels.items() if v.get("kind") == "preprocess"
            }

            raw_str = ", ".join(raw) if raw else "(none)"
            preproc_str = f" + {len(preproc)} preprocessed" if preproc else ""
            print(f"  {seq_dir.name:<20} {raw_str}{preproc_str}")

            result[seq_dir.name] = {"raw": raw, "preprocess": preproc}

        print()
        return result

describe classmethod

describe(path: str | Path) -> dict

Describe available channels -- auto-detects root vs sequence directory.

Root directory: lists each sequence with its raw and preprocessed channels. Sequence directory: shows raw present/missing and preprocessed channels.

Source code in apairo/dataset/tartan_kitti/dataset.py
@classmethod
def describe(cls, path: str | Path) -> dict:
    """Summarise the channels under *path*, for either a sequence or a root.

    A sequence directory is handed to the parent implementation unchanged.
    For a root, each non-hidden sub-directory recognised as a TartanDrive
    sequence gets one printed line (raw channels plus a preprocessed count).

    Returns:
        For a root, ``{sequence_name: {"raw": [...], "preprocess": {...}}}``,
        or ``{}`` when no sequence is found. For a sequence directory, whatever
        the parent ``describe`` returns.
    """
    path = Path(path)
    profile = load_profile(_PROFILE_PATH)

    # Sequence directories are fully handled upstream.
    if _is_sequence_dir(path, profile):
        return super().describe(path)

    seq_dirs = []
    for entry in path.iterdir():
        if entry.name.startswith(".") or not entry.is_dir():
            continue
        if _is_sequence_dir(entry, profile):
            seq_dirs.append(entry)
    seq_dirs.sort()

    if not seq_dirs:
        print(f"No TartanDrive sequences found in '{path}'.")
        return {}

    n = len(seq_dirs)
    print(f"\n{cls.__name__} -- {path.name} ({n} sequence{'s' if n > 1 else ''})")
    print("─" * 50)

    summary = {}
    for seq_dir in seq_dirs:
        # Prefer the persisted .apairo config; otherwise infer one through a
        # bare instance (created without __init__, so nothing is loaded).
        config = (
            read_config(seq_dir)
            if config_exists(seq_dir)
            else cls.__new__(cls)._bootstrap_config(seq_dir)
        )

        raw = []
        preproc = {}
        for channel, meta in config.get("channels", {}).items():
            if meta.get("kind", "raw") == "raw":
                raw.append(channel)
            elif meta.get("kind") == "preprocess":
                preproc[channel] = meta
        raw.sort()

        raw_str = ", ".join(raw) if raw else "(none)"
        preproc_str = f" + {len(preproc)} preprocessed" if preproc else ""
        print(f"  {seq_dir.name:<20} {raw_str}{preproc_str}")

        summary[seq_dir.name] = {"raw": raw, "preprocess": preproc}

    print()
    return summary

AsyncLayoutDataset

Abstract per-channel layout base for the asynchronous family. Internal base class — subclass it to add a fixed-channel async dataset; end users load through RawDataset or TartanKittiDataset.

apairo.dataset.kitti.AsyncLayoutDataset

Bases: AbstractDataset

Abstract asynchronous layout loader (one subdirectory per channel).

This is the format primitive of the asynchronous dataset family -- it is not the KITTI dataset (no real KITTI dataset uses it; see SemanticKittiDataset (apairo.dataset.semantic_kitti.SemanticKittiDataset), which is a synchronous ProfiledDataset (apairo.core.profiled_dataset.ProfiledDataset)).

It describes how channels are stored, never which channels exist: each channel is a subdirectory with its own timestamps.txt and data files in a format known to the loader registry (npys, npy, bin, img, zarr). The set of channels is per-instance state, read from .apairo/channels.yaml (or an explicit dataset_profile). Datasets with a fixed channel set layer a profile on top (e.g. TartanKittiDataset in apairo.dataset.tartan_kitti); datasets with dynamic channels (e.g. apairo-extractor output) use RawDataset (apairo.dataset.raw.RawDataset), which reads the channel set from .apairo with no profile.

Usage with an explicit profile (original API):

ds = AsyncLayoutDataset(seq_dir, keys=["lidar", "cam"], dataset_profile="my.yaml")

Usage with .apairo (after init() has been called):

AsyncLayoutDataset.init(seq_dir)          # once, auto-detects channels
ds = AsyncLayoutDataset(seq_dir)          # keys and loaders come from .apairo
ds = AsyncLayoutDataset(seq_dir, keys=["lidar"])  # restrict to a subset

Parameters:

Name Type Description Default
directory str | Path

Path to the dataset root / sequence directory.

required
keys Optional[List[str]]

Modality names to load. None → all channels declared in .apairo (requires .apairo to exist).

None
dataset_profile Optional[str | Path]

YAML profile filename or absolute Path mapping keys to loader types. None → loaders are read from .apairo (requires .apairo to exist).

None
Source code in apairo/dataset/kitti/dataset.py
class AsyncLayoutDataset(AbstractDataset):
    r"""Abstract *asynchronous layout* loader (one subdirectory per channel).

    This is the format primitive of the asynchronous dataset family -- it is
    **not** the KITTI dataset (no real KITTI dataset uses it; see
    :class:`~apairo.dataset.semantic_kitti.SemanticKittiDataset`, which is a
    synchronous :class:`~apairo.core.profiled_dataset.ProfiledDataset`).

    It describes *how* channels are stored, never *which* channels exist: each
    channel is a subdirectory with its own ``timestamps.txt`` and data files in
    a format known to the loader registry (``npys``, ``npy``, ``bin``, ``img``,
    ``zarr``). The set of channels is per-instance state, read from
    ``.apairo/channels.yaml`` (or an explicit ``dataset_profile``). Datasets
    with a *fixed* channel set layer a profile on top (e.g.
    :class:`~apairo.dataset.tartan_kitti.TartanKittiDataset`); datasets with
    *dynamic* channels (e.g. ``apairo-extractor`` output) use
    :class:`~apairo.dataset.raw.RawDataset`, which reads the channel set from
    ``.apairo`` with no profile.

    **Usage with an explicit profile (original API)**::

        ds = AsyncLayoutDataset(seq_dir, keys=["lidar", "cam"], dataset_profile="my.yaml")

    **Usage with** ``.apairo`` **(after** :meth:`init` **has been called)**::

        AsyncLayoutDataset.init(seq_dir)          # once, auto-detects channels
        ds = AsyncLayoutDataset(seq_dir)          # keys and loaders come from .apairo
        ds = AsyncLayoutDataset(seq_dir, keys=["lidar"])  # restrict to a subset

    Args:
        directory: Path to the dataset root / sequence directory.
        keys: Modality names to load.  ``None`` → every channel declared in
            ``.apairo`` that has a ``loader`` entry (requires ``.apairo`` to
            exist).
        dataset_profile: YAML profile filename **or** absolute Path mapping keys
            to loader types.  ``None`` → loaders are read from ``.apairo``
            (requires ``.apairo`` to exist).

    Raises:
        FileNotFoundError: If no ``dataset_profile`` is given and no
            ``.apairo`` exists in ``directory``.
        ValueError: If ``dataset_profile`` is given without ``keys``.
        KeyError: If a requested key has no data in ``directory`` or no loader
            type in the profile / ``.apairo``.
    """

    synchronous: bool = False

    def __init__(
        self,
        directory: str | Path,
        keys: Optional[List[str]] = None,
        dataset_profile: Optional[str | Path] = None,
    ) -> None:
        directory = Path(directory)

        if dataset_profile is not None:
            self._profile: Dict[str, str] = load_profile(dataset_profile)
        elif config_exists(directory):
            channels = read_config(directory).get("channels", {})
            self._profile = {
                k: v["loader"] for k, v in channels.items() if "loader" in v
            }
            if keys is None:
                # Default to the channels that can actually be opened: a
                # channel without a "loader" entry has no entry in
                # self._profile and would fail in _init_loaders.
                keys = sorted(self._profile)
        else:
            raise FileNotFoundError(
                f"No dataset_profile given and no .apairo found in '{directory}'. "
                f"Either pass dataset_profile=..., or initialize with "
                f"{type(self).__name__}.init('{directory}')."
            )

        if keys is None:
            raise ValueError(
                "keys must be specified when dataset_profile is given. "
                "Pass keys=[...] or use .apairo (call init() first)."
            )

        self._files: Dict[str, str] = get_files(str(directory))
        self._check_keys(keys)

        self._keys: List[str] = []
        self._set_keys(list(keys))
        self._init()

    @classmethod
    def init(
        cls,
        directory: str | Path,
        *,
        raw_keys: Optional[List[str]] = None,
        overwrite: bool = False,
        merge: bool = False,
    ) -> None:
        """Scan a KITTI-layout directory and write ``.apairo/channels.yaml``.

        All detected subdirectories are registered as raw channels.  Loader
        type is inferred from file extensions:

        * ``.bin`` → ``bin``
        * ``.png`` / ``.jpg`` / … → ``img``
        * multiple ``.npy`` files → ``npys``
        * single ``.npy`` file → ``npy``

        For ambiguous cases (e.g. a single-frame ``.npy`` that is actually
        per-frame), call :func:`~apairo.core.config.register_raw_channel`
        afterwards to override the detected loader.

        Args:
            directory: KITTI root / sequence directory to initialize.
            raw_keys: Subdirectory names to include.  ``None`` → all detected
                subdirectories with recognizable file types.
            overwrite: Discard the existing ``.apairo`` and rebuild from
                scratch.  Incompatible with ``merge``.
            merge: Add newly detected raw channels to an existing ``.apairo``
                without touching channels already declared (raw or
                preprocessed).  If ``.apairo`` does not yet exist, behaves
                like a normal init.  Incompatible with ``overwrite``.

        Raises:
            ValueError: If both ``overwrite`` and ``merge`` are ``True``.
            FileExistsError: If ``.apairo`` already exists and both
                ``overwrite`` and ``merge`` are ``False``.
            ValueError: If no new recognizable channels are found.
        """
        if overwrite and merge:
            raise ValueError("overwrite and merge are mutually exclusive.")

        directory = Path(directory)
        detail = f" (checked: {raw_keys})" if raw_keys else ""

        def recognized_channels(skip=()):
            """Yield ``(channel_dir, loader)`` for each usable subdirectory.

            Skips non-directories, hidden entries (such as ``.apairo``
            itself), names outside *raw_keys* when it is given, names in
            *skip*, and directories whose loader cannot be detected.
            Entries are visited in sorted order.
            """
            for channel_dir in sorted(directory.iterdir()):
                name = channel_dir.name
                if not channel_dir.is_dir() or name.startswith("."):
                    continue
                if raw_keys is not None and name not in raw_keys:
                    continue
                if name in skip:
                    continue
                loader = _detect_loader(channel_dir)
                if loader is not None:
                    yield channel_dir, loader

        if merge and config_exists(directory):
            # Register only channels that are not declared yet; existing
            # entries (raw or preprocessed) are left untouched.
            existing = read_config(directory).get("channels", {})
            added = 0
            for channel_dir, loader in recognized_channels(skip=existing):
                _register_raw_channel(directory, channel_dir.name, loader)
                added += 1
            if added == 0:
                raise ValueError(
                    f"No new recognizable channels found in '{directory}'{detail}."
                )
            return

        if config_exists(directory) and not overwrite:
            raise FileExistsError(
                f".apairo already exists in '{directory}'. "
                f"Pass overwrite=True to reinitialize, or merge=True to add new channels."
            )

        channels: dict = {
            channel_dir.name: {
                "has_timestamps": (channel_dir / "timestamps.txt").exists(),
                "kind": "raw",
                "loader": loader,
            }
            for channel_dir, loader in recognized_channels()
        }

        if not channels:
            raise ValueError(
                f"No recognizable channels found in '{directory}'{detail}. "
                f"Expected subdirectories containing .bin, .npy, or image files."
            )

        write_config(directory, {"version": 1, "channels": channels})

    # ------------------------------------------------------------------ keys

    def _check_keys(self, keys: List[str]) -> None:
        """Raise ``KeyError`` unless every key has data files and a loader type."""
        missing = set(keys) - set(self._files)
        if missing:
            raise KeyError(f"Keys not found in dataset directory: {missing}")
        # _init_loaders looks every key up in self._profile; report the
        # problem here with a readable message instead of a bare KeyError.
        unprofiled = set(keys) - set(self._profile)
        if unprofiled:
            raise KeyError(f"No loader declared for keys: {unprofiled}")

    @property
    def keys(self) -> List[str]:
        """Names of the channels currently loaded."""
        return self._keys

    @keys.setter
    def keys(self, keys: List[str]) -> None:
        self._check_keys(keys)
        self._set_keys(list(keys))
        self._init()

    # ----------------------------------------------------------------- shape

    @property
    def shape(self) -> Dict[str, Tuple[int, ...]]:
        """Per-key ``shape`` as reported by each channel's loader."""
        return {key: self.loaders[key].shape for key in self.keys}

    # ----------------------------------------------------------------- init

    def _init(self) -> None:
        """(Re)build loaders and the merged timeline for the current keys."""
        if not self._keys:
            # Reset per-key state so an emptied key set yields an empty
            # dataset instead of serving the previous key set's timeline.
            self.loaders = {}
            self.timestamps = {}
            self._tl_key_idxs = np.empty(0, dtype=np.intp)
            self._tl_frame_idxs = np.empty(0, dtype=np.intp)
            return
        self._init_loaders()
        self._init_timeline()

    def _init_loaders(self) -> None:
        """Open one loader per key and read each key's timestamps."""
        self.loaders: Dict[str, AbstractLoader] = {
            key: str_to_loader[self._profile[key]](self._files[key])
            for key in self._keys
        }
        self.timestamps: Dict[str, np.ndarray] = loads_timestamps(
            self._keys, self._files
        )
        self.end_of_time: float = get_end_of_time(self.timestamps) + 1.0

    def _init_timeline(self) -> None:
        """Build the interleaved timeline as two parallel numpy arrays."""
        from apairo.utils.timestamps import merge_timeline
        self._tl_key_idxs, self._tl_frame_idxs = merge_timeline(
            self.timestamps, self._keys
        )

    # ------------------------------------------------------------ dunder

    def __len__(self) -> int:
        return len(self._tl_key_idxs)

    def _load(self, idx: int) -> Sample:
        """Return the *idx*-th event of the merged timeline as a one-key sample."""
        if not 0 <= idx < len(self):
            raise IndexError(f"Index {idx} out of range [0, {len(self)})")
        key = self._keys[self._tl_key_idxs[idx]]
        frame = int(self._tl_frame_idxs[idx])
        return Sample(
            data={key: self.loaders[key][frame]},
            timestamp=float(self.timestamps[key][frame]),
        )

init classmethod

init(directory: str | Path, *, raw_keys: Optional[List[str]] = None, overwrite: bool = False, merge: bool = False) -> None

Scan a KITTI-layout directory and write .apairo/channels.yaml.

All detected subdirectories are registered as raw channels. Loader type is inferred from file extensions:

  • .bin → bin
  • .png / .jpg / … → img
  • multiple .npy files → npys
  • single .npy file → npy

For ambiguous cases (e.g. a single-frame .npy that is actually per-frame), call register_raw_channel (from apairo.core.config) afterwards to override the detected loader.

Parameters:

Name Type Description Default
directory str | Path

KITTI root / sequence directory to initialize.

required
raw_keys Optional[List[str]]

Subdirectory names to include. None → all detected subdirectories with recognizable file types.

None
overwrite bool

Discard the existing .apairo and rebuild from scratch. Incompatible with merge.

False
merge bool

Add newly detected raw channels to an existing .apairo without touching channels already declared (raw or preprocessed). If .apairo does not yet exist, behaves like a normal init. Incompatible with overwrite.

False

Raises:

Type Description
ValueError

If both overwrite and merge are True.

FileExistsError

If .apairo already exists and both overwrite and merge are False.

ValueError

If no new recognizable channels are found.

Source code in apairo/dataset/kitti/dataset.py
@classmethod
def init(
    cls,
    directory: str | Path,
    *,
    raw_keys: Optional[List[str]] = None,
    overwrite: bool = False,
    merge: bool = False,
) -> None:
    """Scan a KITTI-layout directory and write ``.apairo/channels.yaml``.

    All detected subdirectories are registered as raw channels.  Loader
    type is inferred from file extensions:

    * ``.bin`` → ``bin``
    * ``.png`` / ``.jpg`` / … → ``img``
    * multiple ``.npy`` files → ``npys``
    * single ``.npy`` file → ``npy``

    For ambiguous cases (e.g. a single-frame ``.npy`` that is actually
    per-frame), call :func:`~apairo.core.config.register_raw_channel`
    afterwards to override the detected loader.

    Args:
        directory: KITTI root / sequence directory to initialize.
        raw_keys: Subdirectory names to include.  ``None`` → all detected
            subdirectories with recognizable file types.
        overwrite: Discard the existing ``.apairo`` and rebuild from
            scratch.  Incompatible with ``merge``.
        merge: Add newly detected raw channels to an existing ``.apairo``
            without touching channels already declared (raw or
            preprocessed).  If ``.apairo`` does not yet exist, behaves
            like a normal init.  Incompatible with ``overwrite``.

    Raises:
        ValueError: If both ``overwrite`` and ``merge`` are ``True``.
        FileExistsError: If ``.apairo`` already exists and both
            ``overwrite`` and ``merge`` are ``False``.
        ValueError: If no new recognizable channels are found.
    """
    if overwrite and merge:
        raise ValueError("overwrite and merge are mutually exclusive.")

    directory = Path(directory)

    if merge and config_exists(directory):
        existing = read_config(directory).get("channels", {})
        added = 0
        for channel_dir in sorted(directory.iterdir()):
            if not channel_dir.is_dir() or channel_dir.name.startswith("."):
                continue
            if raw_keys is not None and channel_dir.name not in raw_keys:
                continue
            if channel_dir.name in existing:
                continue
            loader = _detect_loader(channel_dir)
            if loader is None:
                continue
            _register_raw_channel(directory, channel_dir.name, loader)
            added += 1
        if added == 0:
            detail = f" (checked: {raw_keys})" if raw_keys else ""
            raise ValueError(
                f"No new recognizable channels found in '{directory}'{detail}."
            )
        return

    if config_exists(directory) and not overwrite:
        raise FileExistsError(
            f".apairo already exists in '{directory}'. "
            f"Pass overwrite=True to reinitialize, or merge=True to add new channels."
        )

    channels: dict = {}
    for channel_dir in sorted(directory.iterdir()):
        if not channel_dir.is_dir() or channel_dir.name.startswith("."):
            continue
        if raw_keys is not None and channel_dir.name not in raw_keys:
            continue
        loader = _detect_loader(channel_dir)
        if loader is None:
            continue
        channels[channel_dir.name] = {
            "has_timestamps": (channel_dir / "timestamps.txt").exists(),
            "kind": "raw",
            "loader": loader,
        }

    if not channels:
        detail = f" (checked: {raw_keys})" if raw_keys else ""
        raise ValueError(
            f"No recognizable channels found in '{directory}'{detail}. "
            f"Expected subdirectories containing .bin, .npy, or image files."
        )

    write_config(directory, {"version": 1, "channels": channels})

StreamDataset

apairo.dataset.stream.StreamDataset

Bases: AbstractDataset

In-memory asynchronous dataset built from timestamped event streams.

The bridge between live or freshly-decoded data (ROS messages, queue items, arrays in RAM) and the apairo API: give it one (timestamps, items) pair per channel and it behaves exactly like a file-backed asynchronous dataset — merged timeline, single-event samples, and above all AbstractDataset.synchronize():

ds = StreamDataset({
    "image": (img_ts, img_msgs),      # any indexable items
    "lidar": (lidar_ts, lidar_msgs),
    "odom":  (odom_ts,  odom_msgs),
})

ds[0]                                  # one event, timestamp-ordered
frames = ds.synchronize(reference=clock, method="latest")

Items are stored as given -- they can be numpy arrays, ROS messages, or any Python objects; apairo never copies or converts them.

Parameters:

Name Type Description Default
streams Dict[str, Tuple[ndarray, Sequence]]

{channel: (timestamps, items)}. Timestamps must be ascending 1-D arrays; len(items) must match.

required

Raises:

Type Description
ValueError

On empty streams, length mismatch, or non-ascending timestamps.

Source code in apairo/dataset/stream.py
class StreamDataset(AbstractDataset):
    """In-memory asynchronous dataset built from timestamped event streams.

    The bridge between live or freshly-decoded data (ROS messages, queue
    items, arrays in RAM) and the apairo API: give it one ``(timestamps,
    items)`` pair per channel and it behaves exactly like a file-backed
    asynchronous dataset -- merged timeline, single-event samples, and above
    all :meth:`~apairo.core.abstract_dataset.AbstractDataset.synchronize`::

        ds = StreamDataset({
            "image": (img_ts, img_msgs),      # any indexable items
            "lidar": (lidar_ts, lidar_msgs),
            "odom":  (odom_ts,  odom_msgs),
        })

        ds[0]                                  # one event, timestamp-ordered
        frames = ds.synchronize(reference=clock, method="latest")

    Items are stored as given -- they can be numpy arrays, ROS messages, or
    any Python objects; apairo never copies or converts them.

    Args:
        streams: ``{channel: (timestamps, items)}``.  Timestamps must be
            finite, ascending 1-D arrays; ``len(items)`` must match.

    Raises:
        ValueError: On empty streams, length mismatch, non-finite or
            non-ascending timestamps.
    """

    # Declared explicitly, like the file-backed AsyncLayoutDataset: every
    # sample carries a single channel's event.
    synchronous: bool = False

    def __init__(
        self,
        streams: Dict[str, Tuple[np.ndarray, Sequence]],
    ) -> None:
        if not streams:
            raise ValueError("StreamDataset requires at least one stream.")

        self.loaders: Dict[str, Sequence] = {}
        self.timestamps: Dict[str, np.ndarray] = {}
        for key, (ts, items) in streams.items():
            ts = np.asarray(ts, dtype=np.float64)
            if ts.ndim != 1 or len(ts) == 0:
                raise ValueError(
                    f"Stream {key!r}: timestamps must be a non-empty 1-D "
                    f"array, got shape {ts.shape}."
                )
            if len(ts) != len(items):
                raise ValueError(
                    f"Stream {key!r}: {len(ts)} timestamps for "
                    f"{len(items)} items."
                )
            # NaN compares False with everything, so it would slip through
            # the ascending check below and corrupt the merged timeline.
            if not np.all(np.isfinite(ts)):
                raise ValueError(f"Stream {key!r}: timestamps must be finite.")
            if np.any(np.diff(ts) < 0):
                raise ValueError(f"Stream {key!r}: timestamps must be ascending.")
            self.timestamps[key] = ts
            self.loaders[key] = items

        self._set_keys(list(streams))

        from apairo.utils.timestamps import merge_timeline
        self._tl_key_idxs, self._tl_frame_idxs = merge_timeline(
            self.timestamps, self._keys
        )

    def __len__(self) -> int:
        return len(self._tl_key_idxs)

    def _load(self, idx: int) -> Sample:
        """Return the *idx*-th event of the merged timeline as a one-key sample."""
        if not 0 <= idx < len(self):
            raise IndexError(f"Index {idx} out of range [0, {len(self)})")
        key = self._keys[self._tl_key_idxs[idx]]
        frame = int(self._tl_frame_idxs[idx])
        return Sample(
            data={key: self.loaders[key][frame]},
            timestamp=float(self.timestamps[key][frame]),
        )

    def __repr__(self) -> str:
        sizes = {k: len(v) for k, v in self.loaders.items()}
        return f"StreamDataset(events={len(self)}, streams={sizes})"

SynchronizedView

apairo.core.synchronized_view.SynchronizedView

Bases: AbstractDataset

A synchronous view over an asynchronous dataset.

Created by AbstractDataset.synchronize(). Index i returns a complete sample built around the i-th tick of the reference clock: every channel contributes either an existing event matched by timestamp, or a value synthesized at the tick by an Interpolator. The matching is a pure index computation (one np.searchsorted per channel at construction time) — no data is read until access.

Because the view is synchronous (timestamps is None), the full chaining API applies: .filter(), .select(), .cache(), .join(), and map-style PyTorch DataLoader with shuffling.

Note: the view reads channel data directly from the parent's loaders — transforms registered on the parent are not applied (they were written for single-event samples). Register transforms on the view instead:

    ds_sync = ds.synchronize().transform("velodyne_0", RangeFilter(50))

Parameters:

Name Type Description Default
parent AbstractDataset

Asynchronous dataset exposing per-channel timestamps and loaders.

required
reference str | ndarray | None

The clock to resample onto. Three forms:

  • channel name -- that channel's timestamps drive the view;
  • None -- the lowest-frequency channel is used;
  • array of timestamps -- an external clock. Enables fixed-rate resampling (np.arange(t0, t1, 1/hz)) or distance-based resampling (see clock_from_distance in apairo.utils.timestamps).
None
method ChannelStrategy | dict[str, ChannelStrategy]

Strategy applied to every channel, or a dict mapping channel names to per-channel strategies (unlisted channels default to "latest"). A strategy is one of:

  • "latest" -- last event with t <= t_ref (zero-order hold);
  • "nearest" -- event closest in time to t_ref;
  • a callable (channel_ts, ref_ts) -> indices returning, for each reference tick, the event index to use (negative = no match, the frame is dropped);
  • an Interpolator -- the value is synthesized at t_ref from the two bracketing events (continuous signals only: poses, IMU, commands).
'latest'
tolerance float | None

Maximum |t - t_ref| in seconds. For interpolated channels, both bracketing events must lie within tolerance. Reference ticks where any channel has no match are dropped.

None

Example:

from apairo_transform.interp import Se3Interp

ds = TartanKittiDataset(seq, keys=["velodyne_0", "gicp_poses"])
ds_sync = ds.synchronize(
    reference="velodyne_0",
    method={"gicp_poses": Se3Interp()},   # velodyne_0 -> "latest"
    tolerance=0.05,
)

s = ds_sync[0]
s.data["gicp_poses"]    # pose interpolated at s.timestamp
Source code in apairo/core/synchronized_view.py
class SynchronizedView(AbstractDataset):
    """A synchronous view over an asynchronous dataset.

    Created by :meth:`~apairo.core.abstract_dataset.AbstractDataset.synchronize`.
    Index ``i`` returns a complete sample built around the *i*-th tick of the
    reference clock: every channel contributes either an existing event
    matched by timestamp, or a value synthesized at the tick by an
    :class:`~apairo.core.interpolator.Interpolator`.  The matching is a pure
    index computation (one ``np.searchsorted`` per channel at construction
    time) -- no data is read until access.

    Because the view is synchronous (``timestamps`` is ``None``), the full
    chaining API applies: ``.filter()``, ``.select()``, ``.cache()``,
    ``.join()``, and map-style PyTorch ``DataLoader`` with shuffling.

    .. note::
        The view reads channel data directly from the parent's loaders --
        transforms registered on the *parent* are not applied (they were
        written for single-event samples).  Register transforms on the view::

            ds_sync = ds.synchronize().transform("velodyne_0", RangeFilter(50))

    Args:
        parent: Asynchronous dataset exposing per-channel ``timestamps`` and
            ``loaders``.
        reference: The clock to resample onto.  Three forms:

            * channel name -- that channel's timestamps drive the view;
            * ``None`` -- the lowest-frequency channel is used;
            * **array of timestamps** -- an external clock.  Enables
              fixed-rate resampling (``np.arange(t0, t1, 1/hz)``) or
              distance-based resampling (see
              :func:`~apairo.utils.timestamps.clock_from_distance`).
        method: Strategy applied to every channel, or a dict mapping channel
            names to per-channel strategies (unlisted channels default to
            ``"latest"``).  A strategy is one of:

            * ``"latest"`` -- last event with ``t <= t_ref`` (zero-order hold);
            * ``"nearest"`` -- event closest in time to ``t_ref``;
            * a **callable** ``(channel_ts, ref_ts) -> indices`` returning,
              for each reference tick, the event index to use (negative = no
              match, the frame is dropped);
            * an :class:`~apairo.core.interpolator.Interpolator` -- the value
              is synthesized at ``t_ref`` from the two bracketing events
              (continuous signals only: poses, IMU, commands).
        tolerance: Maximum ``|t - t_ref|`` in seconds (must be non-negative).
            For interpolated channels, *both* bracketing events must lie
            within tolerance.  Reference ticks where any channel has no match
            are dropped.

    Raises:
        ValueError: If *parent* has no per-channel timestamps, *tolerance* is
            negative or NaN, a strategy is not recognized, a custom method
            returns the wrong shape, or an external clock is not a non-empty,
            finite, ascending 1-D array.
        KeyError: If *reference* or a key of *method* names an unknown
            channel.

    Example::

        from apairo_transform.interp import Se3Interp

        ds = TartanKittiDataset(seq, keys=["velodyne_0", "gicp_poses"])
        ds_sync = ds.synchronize(
            reference="velodyne_0",
            method={"gicp_poses": Se3Interp()},   # velodyne_0 -> "latest"
            tolerance=0.05,
        )

        s = ds_sync[0]
        s.data["gicp_poses"]    # pose interpolated at s.timestamp
    """

    timestamps = None  # the view itself is synchronous

    def __init__(
        self,
        parent: AbstractDataset,
        reference: str | np.ndarray | None = None,
        method: ChannelStrategy | dict[str, ChannelStrategy] = "latest",
        tolerance: float | None = None,
    ) -> None:
        parent_ts = getattr(parent, "timestamps", None)
        if not isinstance(parent_ts, dict) or not parent_ts:
            raise ValueError(
                f"synchronize() requires an asynchronous dataset with "
                f"per-channel timestamps; {parent.__class__.__name__} is "
                f"already synchronous."
            )
        # ``not >= 0`` also rejects NaN, which would otherwise silently drop
        # every frame (all comparisons against NaN are False).
        if tolerance is not None and not tolerance >= 0:
            raise ValueError(
                f"tolerance must be a non-negative number, got {tolerance!r}."
            )

        keys = list(parent.keys)
        strategies = self._resolve_strategies(method, keys)
        ref_name, ref_ts = self._resolve_clock(reference, parent_ts, keys)

        valid = np.ones(len(ref_ts), dtype=bool)
        index_map: dict[str, np.ndarray] = {}
        channel_ts: dict[str, np.ndarray] = {}

        for key in keys:
            ts = np.asarray(parent_ts[key], dtype=float)
            channel_ts[key] = ts
            idx, ok = self._match(strategies[key], ts, ref_ts, tolerance)
            valid &= ok
            index_map[key] = idx

        keep = np.where(valid)[0]
        self._parent = parent
        self._reference = ref_name
        self._method = method
        self._strategies = strategies
        self._tolerance = tolerance
        self._ref_timestamps = ref_ts[keep]
        self._index_map = {
            k: v[keep].astype(np.intp) for k, v in index_map.items()
        }
        self._channel_ts = channel_ts
        self._keys = keys

    # ------------------------------------------------------------- resolution

    @staticmethod
    def _resolve_strategies(method, keys: list[str]) -> dict[str, ChannelStrategy]:
        """Normalize *method* into one validated strategy per channel."""
        if isinstance(method, dict):
            unknown = set(method) - set(keys)
            if unknown:
                raise KeyError(
                    f"method maps unknown channels {sorted(unknown)}; "
                    f"dataset keys are {keys}."
                )
            strategies = {k: method.get(k, "latest") for k in keys}
        else:
            strategies = {k: method for k in keys}

        for key, strat in strategies.items():
            if isinstance(strat, Interpolator) or callable(strat):
                continue
            # isinstance guard: ``in`` on an array-like would compare
            # element-wise and raise an ambiguous-truth error instead of
            # the explanatory message below.
            if not (isinstance(strat, str) and strat in ("latest", "nearest")):
                raise ValueError(
                    f"Strategy for {key!r} must be 'latest', 'nearest', a "
                    f"callable (channel_ts, ref_ts) -> indices, or an "
                    f"Interpolator, got {strat!r}"
                )
        return strategies

    @staticmethod
    def _resolve_clock(reference, parent_ts: dict, keys: list[str]):
        """Resolve *reference* into ``(name_or_None, timestamp_array)``.

        Accepts a channel name, ``None`` (lowest-frequency channel), or an
        explicit array of timestamps — an external clock (e.g. fixed-rate
        ticks or distance-based ticks from odometry).
        """
        if reference is None:
            from apairo.utils.timestamps import get_reference_timestamps
            name = get_reference_timestamps({k: parent_ts[k] for k in keys})
            return name, np.asarray(parent_ts[name], dtype=float)

        if isinstance(reference, str):
            if reference not in keys:
                raise KeyError(
                    f"Reference channel {reference!r} not in dataset keys {keys}."
                )
            return reference, np.asarray(parent_ts[reference], dtype=float)

        ref_ts = np.asarray(reference, dtype=float)
        if ref_ts.ndim != 1 or len(ref_ts) == 0:
            raise ValueError(
                f"An external clock must be a non-empty 1-D array of "
                f"timestamps, got shape {ref_ts.shape}."
            )
        # NaN passes the ascending check (comparisons with NaN are False),
        # so reject non-finite ticks explicitly.
        if not np.all(np.isfinite(ref_ts)):
            raise ValueError("External clock timestamps must be finite.")
        if np.any(np.diff(ref_ts) < 0):
            raise ValueError("External clock timestamps must be ascending.")
        return None, ref_ts

    # --------------------------------------------------------------- matching

    @staticmethod
    def _match(
        strat: ChannelStrategy,
        ts: np.ndarray,
        ref_ts: np.ndarray,
        tolerance: float | None,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Compute event indices and per-tick validity for one channel.

        Returns ``(idx, valid)`` where ``idx`` has shape ``(N,)`` for matching
        strategies or ``(N, 2)`` (bracketing pair) for interpolators.  A
        channel with no events matches no tick (``valid`` is all ``False``).
        """
        n_ticks = len(ref_ts)
        if len(ts) == 0:
            # Clipping to len(ts) - 1 == -1 below would index an empty array.
            shape = (n_ticks, 2) if isinstance(strat, Interpolator) else (n_ticks,)
            return np.zeros(shape, dtype=np.intp), np.zeros(n_ticks, dtype=bool)

        right = np.searchsorted(ts, ref_ts, side="right")
        latest = right - 1  # last event with t <= t_ref; -1 when none yet

        if isinstance(strat, Interpolator):
            i0 = np.clip(latest, 0, len(ts) - 1)
            i1 = np.clip(right, 0, len(ts) - 1)
            # exact matches collapse to a single index -- the stored value is
            # returned directly, the interpolator is never called
            i1 = np.where(ts[i0] == ref_ts, i0, i1)
            # bracketed: an event at or before the tick, and one at or after
            valid = (latest >= 0) & (ts[i1] >= ref_ts)
            if tolerance is not None:
                valid &= np.maximum(ref_ts - ts[i0], ts[i1] - ref_ts) <= tolerance
            return np.stack([i0, i1], axis=1), valid

        if callable(strat):
            idx = np.asarray(strat(ts, ref_ts))
            if idx.shape != ref_ts.shape:
                raise ValueError(
                    f"Custom method returned shape {idx.shape}, expected "
                    f"{ref_ts.shape} (one index per reference tick; "
                    f"negative = no match)."
                )
            valid = (idx >= 0) & (idx < len(ts))
        elif strat == "latest":
            idx = latest
            valid = latest >= 0
        else:  # nearest
            prev = np.clip(latest, 0, len(ts) - 1)
            nxt = np.clip(right, 0, len(ts) - 1)
            idx = np.where(
                np.abs(ts[prev] - ref_ts) <= np.abs(ts[nxt] - ref_ts),
                prev,
                nxt,
            )
            valid = np.ones(n_ticks, dtype=bool)

        idx = np.clip(idx, 0, len(ts) - 1)
        if tolerance is not None:
            valid = valid & (np.abs(ts[idx] - ref_ts) <= tolerance)
        return idx, valid

    # ------------------------------------------------------------- properties

    @property
    def reference(self) -> str | None:
        """Channel providing the clock, or ``None`` for an external clock."""
        return self._reference

    @property
    def reference_timestamps(self) -> np.ndarray:
        """Timestamp of each frame in the view (reference clock)."""
        return self._ref_timestamps

    @property
    def frame_indices(self) -> dict[str, np.ndarray]:
        """Per-channel event indices backing each frame.

        Shape ``(n,)`` for matched channels (the event used), ``(n, 2)`` for
        interpolated channels (the bracketing pair).
        """
        return self._index_map

    def time_offsets(self, key: str) -> np.ndarray:
        """Return, per frame, how far *key*'s event lies from the reference tick.

        The offset is ``t_event - t_ref`` in seconds (positive when the event
        comes after the tick).  Channels resolved by an interpolator yield an
        all-zero array, because their values are built exactly at ``t_ref``.
        """
        strategy = self._strategies[key]
        if not isinstance(strategy, Interpolator):
            event_times = self._channel_ts[key][self._index_map[key]]
            return event_times - self._ref_timestamps
        return np.zeros(len(self), dtype=float)

    # ----------------------------------------------------------------- access

    def __len__(self) -> int:
        return len(self._ref_timestamps)

    def _load(self, idx: int) -> "Sample":
        """Assemble the *idx*-th frame: one value per channel at its tick."""
        from apairo.core.sample import Sample
        if not 0 <= idx < len(self):
            raise IndexError(f"Index {idx} out of range [0, {len(self)})")

        t_ref = float(self._ref_timestamps[idx])
        data = {}
        for key in self._keys:
            strat = self._strategies[key]
            if isinstance(strat, Interpolator):
                i0, i1 = (int(i) for i in self._index_map[key][idx])
                v0 = self._parent.loaders[key][i0]
                if i0 == i1:  # exact match -- no synthesis needed
                    data[key] = v0
                else:
                    ts = self._channel_ts[key]
                    data[key] = strat(
                        t_ref,
                        float(ts[i0]), v0,
                        float(ts[i1]), self._parent.loaders[key][i1],
                    )
            else:
                data[key] = self._parent.loaders[key][int(self._index_map[key][idx])]
        return Sample(data=data, timestamp=t_ref)

    def __repr__(self) -> str:
        ref = self._reference if self._reference is not None else "<external clock>"
        if isinstance(self._method, dict):
            method = "per-channel"
        else:
            method = getattr(self._method, "__name__", self._method)
        return (
            f"SynchronizedView(n={len(self)}, reference={ref!r}, "
            f"method={method!r}, keys={self._keys})"
        )

reference property

reference: str | None

Channel providing the clock, or None for an external clock.

reference_timestamps property

reference_timestamps: ndarray

Timestamp of each frame in the view (reference clock).

frame_indices property

frame_indices: dict[str, ndarray]

Per-channel event indices backing each frame.

Shape (n,) for matched channels (the event used), (n, 2) for interpolated channels (the bracketing pair).

time_offsets

time_offsets(key: str) -> np.ndarray

Signed t_event - t_ref per frame for key, in seconds.

Interpolated channels return zeros: their values are synthesized at the reference instant.

Source code in apairo/core/synchronized_view.py
def time_offsets(self, key: str) -> np.ndarray:
    """Per-frame signed offset ``t_event - t_ref`` (seconds) for channel *key*.

    Interpolated channels yield an all-zero array: their values are
    synthesized exactly at each reference instant.
    """
    strategy = self._strategies[key]
    if not isinstance(strategy, Interpolator):
        # Timestamp of the event chosen for each frame, minus the frame's clock.
        event_times = self._channel_ts[key][self._index_map[key]]
        return event_times - self._ref_timestamps
    return np.zeros(len(self), dtype=float)

Interpolator

apairo.core.interpolator.Interpolator

Bases: ABC

Synthesize a channel value at the reference instant from its two bracketing events.

This is the value-level counterpart of the index-level matching strategies ("latest", "nearest"): instead of picking an existing event, an interpolator builds a new value at t_ref. Use it for continuous signals -- poses, IMU, commands -- never for data that cannot be blended (point clouds, images).

Contract, as orchestrated by `SynchronizedView` (`apairo.core.synchronized_view.SynchronizedView`):

  • a channel whose strategy is an Interpolator receives, for each reference tick t, its two bracketing events (t0, v0) and (t1, v1) with t0 <= t <= t1 and t0 < t1;
  • ticks not bracketed by two events (before the first event or after the last) are dropped from the view;
  • exact matches (t == t0) bypass the interpolator -- the stored value is returned directly, so implementations never see t0 == t1;
  • with tolerance, both neighbours must lie within tolerance of t (max(t - t0, t1 - t) <= tolerance).

Concrete implementations (LinearInterp, Se3Interp, ...) live in apairo_transform.interp.

Example::

class LinearInterp(Interpolator):
    """Linearly blend the two bracketing values at time ``t``."""

    def __call__(self, t, t0, v0, t1, v1):
        # Fraction of the way from t0 to t1 at which t lies.
        weight = (t - t0) / (t1 - t0)
        return (1.0 - weight) * v0 + weight * v1
Source code in apairo/core/interpolator.py
class Interpolator(ABC):
    """Build a channel value at the reference instant from the two events
    that bracket it.

    Where the index-level matching strategies (``"latest"``, ``"nearest"``)
    pick one of the recorded events, an interpolator creates a *new* value at
    ``t_ref``.  Meant for continuous signals -- poses, IMU, commands -- and
    never for data that cannot be blended (point clouds, images).

    Contract, as orchestrated by
    :class:`~apairo.core.synchronized_view.SynchronizedView`:

    * for each reference tick ``t``, a channel whose strategy is an
      ``Interpolator`` receives the bracketing events ``(t0, v0)`` and
      ``(t1, v1)``, with ``t0 <= t <= t1`` and ``t0 < t1``;
    * ticks not bracketed by two events (before the first event or after the
      last) are dropped from the view;
    * exact matches (``t == t0``) skip the interpolator and return the stored
      value directly, so implementations never see ``t0 == t1``;
    * with ``tolerance``, *both* neighbours must be within tolerance of ``t``
      (``max(t - t0, t1 - t) <= tolerance``).

    Concrete implementations (``LinearInterp``, ``Se3Interp``, ...) live in
    ``apairo_transform.interp``.

    Example::

        class LinearInterp(Interpolator):
            def __call__(self, t, t0, v0, t1, v1):
                a = (t - t0) / (t1 - t0)
                return (1.0 - a) * v0 + a * v1
    """

    @abstractmethod
    def __call__(self, t: float, t0: float, v0: Any, t1: float, v1: Any) -> Any:
        """Value at time *t*; callers guarantee ``t0 <= t <= t1`` and ``t0 < t1``."""
        ...

__call__ abstractmethod

__call__(t: float, t0: float, v0: Any, t1: float, v1: Any) -> Any

Return the channel value at time t, t0 <= t <= t1, t0 < t1.

Source code in apairo/core/interpolator.py
@abstractmethod
def __call__(self, t: float, t0: float, v0: Any, t1: float, v1: Any) -> Any:
    """Synthesize the channel value at instant *t*.

    The caller guarantees ``t0 <= t <= t1`` and ``t0 < t1``.
    """
    return None

Dataset composition

ConcatDataset

apairo.dataset.concat.ConcatDataset

Bases: AbstractDataset

Concatenates multiple dataset instances into one.

Takes the intersection of keys across all datasets so every index returns the same set of modalities regardless of which underlying dataset is hit. Indexing is O(log n) via binary search over cumulative lengths.

Parameters:

Name Type Description Default
datasets List[AbstractDataset]

Non-empty list of dataset instances to concatenate.

required

Example::

sequences = [
    SemanticKittiDataset(f"/data/kitti/seq_{i:02d}", keys=["lidar", "labels"])
    for i in range(11)
]
combined = ConcatDataset(sequences)
sample = combined[0]

Raises:

Type Description
ValueError

If datasets is empty.

Source code in apairo/dataset/concat.py
class ConcatDataset(AbstractDataset):
    """Concatenates multiple dataset instances into one.

    Only the keys present in *every* member are exposed, so each index yields
    the same set of modalities whichever member serves it.  A flat index is
    mapped to its member by binary search over the running totals of member
    lengths, i.e. O(log n) in the number of members.

    Args:
        datasets: Non-empty list of dataset instances to concatenate.

    Example::

        sequences = [
            SemanticKittiDataset(f"/data/kitti/seq_{i:02d}", keys=["lidar", "labels"])
            for i in range(11)
        ]
        combined = ConcatDataset(sequences)
        sample = combined[0]

    Raises:
        ValueError: If ``datasets`` is empty.
    """

    def __init__(self, datasets: List[AbstractDataset]) -> None:
        if not datasets:
            raise ValueError("datasets must be non-empty")
        self.datasets = datasets
        self._resolve_keys()
        # Running totals: _cumulative[i] is one past the last flat index of member i.
        self._lengths = np.array([len(member) for member in self.datasets], dtype=np.intp)
        self._cumulative = np.cumsum(self._lengths)

    def _resolve_keys(self) -> None:
        """Store in ``_keys`` the sorted keys shared by all members."""
        first, *rest = self.datasets
        shared = set(first.keys).intersection(*(member.keys for member in rest))
        self._keys = sorted(shared)

    @property
    def keys(self) -> List[str]:
        """Keys exposed by every sample (intersection across members)."""
        return self._keys

    @keys.setter
    def keys(self, keys) -> None:
        self._set_keys(list(keys))
        # The concatenated timestamps are built per key: drop the cached value.
        self.__dict__.pop("timestamps", None)

    @functools.cached_property
    def timestamps(self) -> dict[str, np.ndarray] | None:
        """None for synchronous datasets, concatenated arrays for temporal ones."""
        if self.datasets[0].timestamps is None:
            return None
        member_ts = [member.timestamps for member in self.datasets]
        return {k: np.concatenate([ts[k] for ts in member_ts]) for k in self._keys}

    @property
    def is_synchronous(self) -> bool:
        """True when the first member reports no timestamps."""
        return self.datasets[0].timestamps is None

    def _dataset_idx_and_offset(self, idx: int) -> tuple[int, int]:
        """Return ``(member index, flat index of that member's first sample)``."""
        total = self._cumulative[-1]
        if idx < 0 or idx >= total:
            raise IndexError(f"Index {idx} out of range [0, {total})")
        # side="right" steps past members whose running total equals idx,
        # which also skips empty members.
        member = int(np.searchsorted(self._cumulative, idx, side="right"))
        start = int(self._cumulative[member - 1]) if member else 0
        return member, start

    def __len__(self) -> int:
        return int(self._cumulative[-1])

    def _load(self, idx: int) -> Sample:
        member, start = self._dataset_idx_and_offset(idx)
        sample = self.datasets[member][idx - start]
        # Restrict to the shared keys so every member yields the same modalities.
        kept = {k: sample.data[k] for k in self._keys if k in sample.data}
        return Sample(data=kept, timestamp=sample.timestamp)

timestamps cached property

timestamps: dict[str, ndarray] | None

None for synchronous datasets, concatenated arrays for temporal ones.


split_sequences

apairo.dataset.split_sequences

split_sequences(datasets: list, ratios: tuple[float, float, float] = (0.8, 0.1, 0.1)) -> tuple[list, list, list]

Split datasets into train/val/test at the sequence level.

Splitting at sequence level avoids temporal leakage between splits.

Parameters:

Name Type Description Default
datasets list

Ordered list (one entry per recording session).

required
ratios tuple[float, float, float]

(train, val, test) fractions, must sum to 1.0.

(0.8, 0.1, 0.1)
Source code in apairo/dataset/__init__.py
def split_sequences(
    datasets: list,
    ratios: tuple[float, float, float] = (0.8, 0.1, 0.1),
) -> tuple[list, list, list]:
    """Split datasets into train/val/test at the sequence level.

    Splitting at sequence level avoids temporal leakage between splits.
    The split is contiguous and order-preserving: the first sequences go to
    train, the next ones to val, and the remainder to test.  Train and val
    sizes are rounded down, so any rounding leftover lands in test.

    Args:
        datasets: Ordered list (one entry per recording session).
        ratios: (train, val, test) fractions; must be non-negative and sum
            to 1.0.

    Returns:
        ``(train, val, test)`` slices of *datasets*.

    Raises:
        ValueError: If the ratios do not sum to 1.0 or any ratio is negative.
    """
    if abs(sum(ratios) - 1.0) > 1e-6:
        raise ValueError(f"Ratios must sum to 1.0, got {sum(ratios):.4f}")
    if any(r < 0 for r in ratios):
        # Negative fractions can still sum to 1.0 but yield nonsensical slices.
        raise ValueError(f"Ratios must be non-negative, got {tuple(ratios)}")
    n = len(datasets)
    # Floor with a small tolerance: n * r is computed in floating point, so a
    # mathematically exact product such as 100 * 0.29 can land just below 29
    # and would otherwise truncate to 28.
    eps = 1e-9
    i1 = int(n * ratios[0] + eps)
    i2 = i1 + int(n * ratios[1] + eps)
    return datasets[:i1], datasets[i1:i2], datasets[i2:]

Extensibility

ConfigurableDataset

apairo.core.configurable_dataset.ConfigurableDataset

Mixin for datasets that support preprocessed-channel extensibility via .apairo.

Any dataset class that wants to be extensible at runtime (i.e. allow users to register new preprocessed channels without touching source code) should inherit from this mixin alongside its normal base class.

Concrete subclasses must implement `_bootstrap_config`, which describes how to auto-discover the dataset's raw channels when .apairo does not yet exist.

Usage pattern for preprocessing scripts::

MyDataset.register_channel(
    seq_dir, "my_channel", "npys",
    timestamps_from="lidar",
    sources=["lidar"],
)

Usage in dataset __init__::

config = self._load_or_create_config(sequence_dir)
Source code in apairo/core/configurable_dataset.py
class ConfigurableDataset:
    """Mixin that makes a dataset extensible through its ``.apairo`` config.

    Inherit from it next to the dataset's regular base class to let users
    register new preprocessed channels at runtime, without editing source.

    Concrete subclasses must implement :meth:`_bootstrap_config`, which
    builds the initial config by auto-discovering the dataset's raw channels
    when ``.apairo`` does not exist yet.

    Usage pattern for preprocessing scripts::

        MyDataset.register_channel(
            seq_dir, "my_channel", "npys",
            timestamps_from="lidar",
            sources=["lidar"],
        )

    Usage in dataset ``__init__``::

        config = self._load_or_create_config(sequence_dir)
    """

    @classmethod
    def register_channel(
        cls,
        sequence_dir: str | Path,
        key: str,
        loader: str,
        *,
        timestamps_from: Optional[str] = None,
        sources: Optional[list[str]] = None,
    ) -> None:
        """Record a preprocessed channel in ``sequence_dir/.apairo``.

        Thin wrapper around the module-level ``_register_channel`` helper.

        Args:
            sequence_dir: Dataset sequence directory.
            key: Channel name -- must match its subdirectory name.
            loader: Data format: ``"npy"``, ``"npys"``, ``"bin"``, or ``"img"``.
            timestamps_from: Channel whose timestamps are shared when this
                channel has no ``timestamps.txt`` of its own.
            sources: Provenance -- channels this channel was derived from.
        """
        options = {"timestamps_from": timestamps_from, "sources": sources}
        _register_channel(sequence_dir, key, loader, **options)

    @abstractmethod
    def _bootstrap_config(self, sequence_dir: Path) -> dict:
        """Produce the initial ``.apairo`` config for this dataset.

        Invoked when no ``.apairo`` exists yet.  Implementations should
        auto-discover every raw channel present in ``sequence_dir`` and
        return a config dict shaped like::

            {
                "version": 1,
                "channels": {
                    "channel_name": {"loader": "npys", "has_timestamps": True},
                    ...
                },
            }
        """
        ...

    run_preprocess = _RunPreprocessDescriptor()
    """Run a preprocessor and persist the output channel.

    Can be called on the class or on an existing instance:

    **Class form** -- root_dir required::

        Goose3DDataset.run_preprocess(preprocessor, "/data/GOOSE_3D", split="train")

    **Instance form** -- root_dir inferred from the dataset::

        ds = Goose3DDataset("/data/GOOSE_3D", keys=["lidar"], split="train")
        ds.run_preprocess(preprocessor)

    Extra keyword arguments are forwarded to the dataset constructor (class form)
    or ignored (instance form, since the instance is already configured).
    ``overwrite=True`` recomputes even if the output already exists.
    """

    @classmethod
    def describe(cls, sequence_dir: str | Path) -> dict:
        """Summarise the channels configured for a sequence directory.

        Reads ``.apairo``; when it is absent, an instance of the class is
        built to create it.  The config is compared with the class's
        :attr:`available_keys` and split three ways:

        - **raw / present** -- channels whose ``kind`` is ``"raw"`` (the
          default when ``kind`` is unset)
        - **raw / missing** -- ``available_keys`` entries with no config entry
        - **preprocess** -- channels whose ``kind`` is ``"preprocess"``

        Prints a human-readable summary and returns the breakdown as a dict.

        Example::

            MyDataset.describe("/data/my_dataset/sequence_01")
        """
        sequence_dir = Path(sequence_dir)
        if config_exists(sequence_dir):
            config = read_config(sequence_dir)
        else:
            config = cls(sequence_dir)._load_or_create_config(sequence_dir)
        channels = config.get("channels", {})

        raw_present = sorted(
            name for name, meta in channels.items() if meta.get("kind", "raw") == "raw"
        )
        preprocess = {
            name: meta
            for name, meta in channels.items()
            if meta.get("kind") == "preprocess"
        }
        known_keys = getattr(cls, "available_keys", frozenset())
        raw_missing = sorted(name for name in known_keys if name not in channels)

        # --- pretty print ---
        print(f"\n{cls.__name__} -- {sequence_dir.name}")
        print("─" * 50)

        print("Raw channels")
        for label, names in (("  present  :", raw_present), ("  missing  :", raw_missing)):
            if names:
                print(label, ", ".join(names))
        if not (raw_present or raw_missing):
            print("  (none)")

        print("Preprocessed channels")
        if not preprocess:
            print("  (none)")
        for key in sorted(preprocess):
            meta = preprocess[key]
            if "timestamps_from" in meta:
                ts_info = f"<- timestamps from {meta['timestamps_from']}"
            else:
                ts_info = "<- own timestamps"
            src_info = f"  sources: {meta['sources']}" if meta.get("sources") else ""
            print(f"  {key:<20} {meta['loader']:<6} {ts_info}{src_info}")
        print()

        return {
            "raw": {"present": raw_present, "missing": raw_missing},
            "preprocess": preprocess,
        }

    @classmethod
    def register_raw_channel(
        cls,
        sequence_dir: str | Path,
        key: str,
        loader: str,
        *,
        has_timestamps: Optional[bool] = None,
    ) -> None:
        """Add or override a raw channel declaration in ``sequence_dir/.apairo``.

        Useful to correct a declaration by hand, e.g. after :meth:`init`
        picked the wrong loader type.

        Args:
            sequence_dir: Dataset sequence directory.
            key: Channel name -- must match its subdirectory name.
            loader: Data format: ``"npy"``, ``"npys"``, ``"bin"``, or ``"img"``.
            has_timestamps: Whether the channel has ``timestamps.txt``;
                ``None`` leaves it to auto-detection.
        """
        _register_raw_channel(
            sequence_dir,
            key,
            loader,
            has_timestamps=has_timestamps,
        )

    @classmethod
    def verify(cls, sequence_dir: str | Path) -> bool:
        """Check that ``.apairo`` agrees with what is on disk.

        Every issue found is printed.

        Args:
            sequence_dir: Dataset sequence directory containing ``.apairo``.

        Returns:
            ``True`` when no issue was found, ``False`` otherwise.

        Example::

            ok = MyDataset.verify("/data/my_dataset/seq_01")
        """
        issues = _verify_config(sequence_dir)
        location = Path(sequence_dir)
        if issues:
            print(f"{len(issues)} issue(s) in {location}/.apairo :")
            for issue in issues:
                print(f"  - {issue}")
            return False
        print(f"OK  {location}/.apairo")
        return True

    def _load_or_create_config(self, root_dir: Path) -> dict:
        """Return the stored config, bootstrapping and writing it on first use."""
        if config_exists(root_dir):
            return read_config(root_dir)
        config = self._bootstrap_config(root_dir)
        write_config(root_dir, config)
        return config

run_preprocess class-attribute instance-attribute

run_preprocess = _RunPreprocessDescriptor()

Run a preprocessor and persist the output channel.

Can be called on the class or on an existing instance:

Class form -- root_dir required::

Goose3DDataset.run_preprocess(preprocessor, "/data/GOOSE_3D", split="train")

Instance form -- root_dir inferred from the dataset::

ds = Goose3DDataset("/data/GOOSE_3D", keys=["lidar"], split="train")
ds.run_preprocess(preprocessor)

Extra keyword arguments are forwarded to the dataset constructor (class form) or ignored (instance form, since the instance is already configured). overwrite=True recomputes even if the output already exists.

register_channel classmethod

register_channel(sequence_dir: str | Path, key: str, loader: str, *, timestamps_from: Optional[str] = None, sources: Optional[list[str]] = None) -> None

Register a preprocessed channel in sequence_dir/.apairo.

Parameters:

Name Type Description Default
sequence_dir str | Path

Dataset sequence directory.

required
key str

Channel name -- must match its subdirectory name.

required
loader str

Data format: "npy", "npys", "bin", or "img".

required
timestamps_from Optional[str]

Channel whose timestamps to share when this channel has no timestamps.txt of its own.

None
sources Optional[list[str]]

Provenance -- channels this channel was derived from.

None
Source code in apairo/core/configurable_dataset.py
@classmethod
def register_channel(
    cls,
    sequence_dir: str | Path,
    key: str,
    loader: str,
    *,
    timestamps_from: Optional[str] = None,
    sources: Optional[list[str]] = None,
) -> None:
    """Record a preprocessed channel in ``sequence_dir/.apairo``.

    Thin wrapper around the module-level ``_register_channel`` helper.

    Args:
        sequence_dir: Dataset sequence directory.
        key: Channel name -- must match its subdirectory name.
        loader: Data format: ``"npy"``, ``"npys"``, ``"bin"``, or ``"img"``.
        timestamps_from: Channel whose timestamps are shared when this
            channel has no ``timestamps.txt`` of its own.
        sources: Provenance -- channels this channel was derived from.
    """
    options = {"timestamps_from": timestamps_from, "sources": sources}
    _register_channel(sequence_dir, key, loader, **options)

describe classmethod

describe(sequence_dir: str | Path) -> dict

Describe what is available in a sequence directory.

Reads .apairo (creating it if absent) and cross-references it with the class's `available_keys` attribute to produce a three-way breakdown:

  • raw / present -- raw channels on disk and registered
  • raw / missing -- raw channels known from the profile but not on disk
  • preprocess -- channels produced by a preprocessing pipeline

Returns the breakdown as a dict and prints a human-readable summary.

Example::

MyDataset.describe("/data/my_dataset/sequence_01")
Source code in apairo/core/configurable_dataset.py
@classmethod
def describe(cls, sequence_dir: str | Path) -> dict:
    """Summarise the channels configured for a sequence directory.

    Reads ``.apairo``; when it is absent, an instance of the class is built
    to create it.  The config is compared with the class's
    :attr:`available_keys` and split three ways:

    - **raw / present** -- channels whose ``kind`` is ``"raw"`` (the default
      when ``kind`` is unset)
    - **raw / missing** -- ``available_keys`` entries with no config entry
    - **preprocess** -- channels whose ``kind`` is ``"preprocess"``

    Prints a human-readable summary and returns the breakdown as a dict.

    Example::

        MyDataset.describe("/data/my_dataset/sequence_01")
    """
    sequence_dir = Path(sequence_dir)
    if config_exists(sequence_dir):
        config = read_config(sequence_dir)
    else:
        config = cls(sequence_dir)._load_or_create_config(sequence_dir)
    channels = config.get("channels", {})

    raw_present = sorted(
        name for name, meta in channels.items() if meta.get("kind", "raw") == "raw"
    )
    preprocess = {
        name: meta
        for name, meta in channels.items()
        if meta.get("kind") == "preprocess"
    }
    known_keys = getattr(cls, "available_keys", frozenset())
    raw_missing = sorted(name for name in known_keys if name not in channels)

    # --- pretty print ---
    print(f"\n{cls.__name__} -- {sequence_dir.name}")
    print("─" * 50)

    print("Raw channels")
    for label, names in (("  present  :", raw_present), ("  missing  :", raw_missing)):
        if names:
            print(label, ", ".join(names))
    if not (raw_present or raw_missing):
        print("  (none)")

    print("Preprocessed channels")
    if not preprocess:
        print("  (none)")
    for key in sorted(preprocess):
        meta = preprocess[key]
        if "timestamps_from" in meta:
            ts_info = f"<- timestamps from {meta['timestamps_from']}"
        else:
            ts_info = "<- own timestamps"
        src_info = f"  sources: {meta['sources']}" if meta.get("sources") else ""
        print(f"  {key:<20} {meta['loader']:<6} {ts_info}{src_info}")
    print()

    return {
        "raw": {"present": raw_present, "missing": raw_missing},
        "preprocess": preprocess,
    }

register_raw_channel classmethod

register_raw_channel(sequence_dir: str | Path, key: str, loader: str, *, has_timestamps: Optional[bool] = None) -> None

Declare a raw channel in sequence_dir/.apairo.

Use this to manually add or override a raw channel declaration, for example after `init` detected the wrong loader type.

Parameters:

Name Type Description Default
sequence_dir str | Path

Dataset sequence directory.

required
key str

Channel name -- must match its subdirectory name.

required
loader str

Data format: "npy", "npys", "bin", or "img".

required
has_timestamps Optional[bool]

Whether the channel has timestamps.txt. Auto-detected when None.

None
Source code in apairo/core/configurable_dataset.py
@classmethod
def register_raw_channel(
    cls,
    sequence_dir: str | Path,
    key: str,
    loader: str,
    *,
    has_timestamps: Optional[bool] = None,
) -> None:
    """Add or override a raw channel declaration in ``sequence_dir/.apairo``.

    Useful to correct a declaration by hand, e.g. after :meth:`init` picked
    the wrong loader type.

    Args:
        sequence_dir: Dataset sequence directory.
        key: Channel name -- must match its subdirectory name.
        loader: Data format: ``"npy"``, ``"npys"``, ``"bin"``, or ``"img"``.
        has_timestamps: Whether the channel has ``timestamps.txt``;
            ``None`` leaves it to auto-detection.
    """
    _register_raw_channel(
        sequence_dir,
        key,
        loader,
        has_timestamps=has_timestamps,
    )

verify classmethod

verify(sequence_dir: str | Path) -> bool

Verify that .apairo is coherent with what is on disk.

Prints any issues found and returns True when the config is clean.

Parameters:

Name Type Description Default
sequence_dir str | Path

Dataset sequence directory containing .apairo.

required

Returns:

Type Description
bool

True if no issues were found, False otherwise.

Example::

ok = MyDataset.verify("/data/my_dataset/seq_01")
Source code in apairo/core/configurable_dataset.py
@classmethod
def verify(cls, sequence_dir: str | Path) -> bool:
    """Check that ``.apairo`` agrees with what is on disk.

    Every issue found is printed.

    Args:
        sequence_dir: Dataset sequence directory containing ``.apairo``.

    Returns:
        ``True`` when no issue was found, ``False`` otherwise.

    Example::

        ok = MyDataset.verify("/data/my_dataset/seq_01")
    """
    issues = _verify_config(sequence_dir)
    location = Path(sequence_dir)
    if issues:
        print(f"{len(issues)} issue(s) in {location}/.apairo :")
        for issue in issues:
            print(f"  - {issue}")
        return False
    print(f"OK  {location}/.apairo")
    return True

RootSequenceMixin

Shared single-sequence vs. dataset-root handling for the asynchronous family (flat indexing, per-sequence access, per-sequence synchronize + concat). Reused by RawDataset and TartanKittiDataset.

apairo.core.root_sequence.RootSequenceMixin

Flat-indexed root over several same-typed sequence datasets.

Source code in apairo/core/root_sequence.py
class RootSequenceMixin:
    """Flat-indexed root over several same-typed sequence datasets."""

    _is_root: bool = False

    # ------------------------------------------------------------------ build

    def _init_root(
        self,
        root: str | Path,
        seq_dirs: List[Path],
        make_sequence: Callable[[Path], "RootSequenceMixin"],
        *,
        build_index: bool = True,
    ) -> None:
        """Populate the root from *seq_dirs*, one sub-dataset per directory.

        Args:
            root: The dataset root directory.
            seq_dirs: Sequence directories, in load order.
            make_sequence: Factory building one single-sequence instance of the
                concrete dataset class from a sequence directory.
            build_index: Build the flat index now.  Pass ``False`` for lazy
                datasets whose sequences have no keys loaded yet (the index is
                built later, when ``keys`` is set).
        """
        self._is_root = True
        self._root_dir = Path(root)
        self._sequences = [make_sequence(d) for d in seq_dirs]
        if build_index:
            self._build_flat_index()

    def _build_flat_index(self) -> None:
        lengths = [len(s) for s in self._sequences]
        self._cumulative_lengths = np.array([0, *np.cumsum(lengths)], dtype=np.intp)

    # ------------------------------------------------------------ subclass hooks

    def _single_available(self) -> frozenset:
        """Channels available in a single sequence -- implemented by subclasses."""
        raise NotImplementedError

    def _set_single_keys(self, keys) -> None:
        """Apply *keys* to a single sequence -- implemented by subclasses."""
        raise NotImplementedError

    # ------------------------------------------------------------- public API

    @property
    def root_dir(self) -> Path:
        return self._root_dir if self._is_root else self._sequence_dir

    @property
    def available(self) -> frozenset:
        """Channels available -- intersection across sequences for a root dataset."""
        if not self._is_root:
            return self._single_available()
        if not self._sequences:
            return frozenset()
        common = frozenset(self._sequences[0].available)
        for seq in self._sequences[1:]:
            common &= frozenset(seq.available)
        return common

    @property
    def sequences(self) -> list:
        """Per-sequence datasets (root datasets only)."""
        if not self._is_root:
            raise AttributeError("'sequences' is only available on root datasets.")
        return self._sequences

    @property
    def sequence_ids(self) -> List[str]:
        """Sequence directory names, in load order (root datasets only)."""
        if not self._is_root:
            raise AttributeError("'sequence_ids' is only available on root datasets.")
        return [seq._sequence_dir.name for seq in self._sequences]

    def sequence(self, seq_id: str) -> "SequenceView":
        """Return a :class:`~apairo.core.sequence_view.SequenceView` for *seq_id*."""
        if not self._is_root:
            raise AttributeError("'sequence()' is only available on root datasets.")
        from apairo.core.sequence_view import SequenceView

        for seq in self._sequences:
            if seq._sequence_dir.name == seq_id:
                return SequenceView(seq, range(len(seq)), seq_id)
        raise KeyError(f"Sequence '{seq_id}' not found. Available: {self.sequence_ids}")

    def synchronize(self, reference=None, method="latest", tolerance=None):
        """Resample onto a reference clock -- see :meth:`AbstractDataset.synchronize`.

        On a root dataset each sequence is synchronized independently (clocks are
        not comparable across recordings) and the results concatenated, so an
        external clock array is only valid on a single sequence.
        """
        if not self._is_root:
            return super().synchronize(
                reference=reference, method=method, tolerance=tolerance
            )
        if reference is not None and not isinstance(reference, str):
            raise ValueError(
                "An external clock array cannot be applied to a root dataset: each "
                "sequence has its own time base. Synchronize sequences individually "
                "(ds.sequences[i].synchronize(...)) and concat the results."
            )
        from apairo.dataset.concat import ConcatDataset

        return ConcatDataset([
            seq.synchronize(reference=reference, method=method, tolerance=tolerance)
            for seq in self._sequences
        ])

    # ------------------------------------------------------------------ keys

    @property
    def keys(self) -> List[str]:
        if self._is_root:
            return self._sequences[0].keys if self._sequences else []
        return super().keys

    @keys.setter
    def keys(self, keys) -> None:
        if not self._is_root:
            self._set_single_keys(keys)
            return
        if keys == "all":
            keys = sorted(self.available)
        for seq in self._sequences:
            seq.keys = list(keys)
        self._build_flat_index()

    # ------------------------------------------------------------------ dunder

    def __len__(self) -> int:
        if not self._is_root:
            return super().__len__()
        if not hasattr(self, "_cumulative_lengths"):
            raise RuntimeError("No keys loaded. Set ds.keys = [...] first.")
        return int(self._cumulative_lengths[-1])

    def _load(self, idx):
        if isinstance(idx, tuple):
            seq_id, local_idx = idx
            view = self.sequence(seq_id)
            return self._load(view._indices[local_idx])
        if not self._is_root:
            return super()._load(idx)
        if not hasattr(self, "_cumulative_lengths"):
            raise RuntimeError("No keys loaded. Set ds.keys = [...] first.")
        if not 0 <= idx < len(self):
            raise IndexError(f"Index {idx} out of range [0, {len(self)})")
        seq_idx = int(np.searchsorted(self._cumulative_lengths[1:], idx, side="right"))
        local_idx = idx - int(self._cumulative_lengths[seq_idx])
        return self._sequences[seq_idx]._load(local_idx)

available property

available: frozenset

Channels available -- intersection across sequences for a root dataset.

sequences property

sequences: list

Per-sequence datasets (root datasets only).

sequence_ids property

sequence_ids: List[str]

Sequence directory names, in load order (root datasets only).

sequence

sequence(seq_id: str) -> 'SequenceView'

Return a `SequenceView` (`apairo.core.sequence_view.SequenceView`) for `seq_id`.

Source code in apairo/core/root_sequence.py
def sequence(self, seq_id: str) -> "SequenceView":
    """Look up one recording of a root dataset by its directory name.

    Args:
        seq_id: Name of the sequence directory.

    Returns:
        A :class:`~apairo.core.sequence_view.SequenceView` spanning every
        sample of the matching sequence.

    Raises:
        AttributeError: If called on a non-root dataset.
        KeyError: If no sequence directory is named *seq_id*.
    """
    if not self._is_root:
        raise AttributeError("'sequence()' is only available on root datasets.")
    from apairo.core.sequence_view import SequenceView

    match = next(
        (candidate for candidate in self._sequences
         if candidate._sequence_dir.name == seq_id),
        None,
    )
    if match is None:
        raise KeyError(f"Sequence '{seq_id}' not found. Available: {self.sequence_ids}")
    return SequenceView(match, range(len(match)), seq_id)

synchronize

synchronize(reference=None, method='latest', tolerance=None)

Resample onto a reference clock; see `AbstractDataset.synchronize()`.

On a root dataset each sequence is synchronized independently (clocks are not comparable across recordings) and the results concatenated, so an external clock array is only valid on a single sequence.

Source code in apairo/core/root_sequence.py
def synchronize(self, reference=None, method="latest", tolerance=None):
    """Resample onto a reference clock (see :meth:`AbstractDataset.synchronize`).

    A non-root dataset defers to the parent implementation.  A root dataset
    synchronizes every sequence on its own, because recordings do not share a
    time base, and concatenates the results.  For the same reason, an external
    clock array (anything other than ``None`` or a channel-name string) is
    rejected on a root dataset.

    Raises:
        ValueError: If a non-string ``reference`` is passed to a root dataset.
    """
    if self._is_root:
        uses_external_clock = reference is not None and not isinstance(reference, str)
        if uses_external_clock:
            raise ValueError(
                "An external clock array cannot be applied to a root dataset: each "
                "sequence has its own time base. Synchronize sequences individually "
                "(ds.sequences[i].synchronize(...)) and concat the results."
            )
        from apairo.dataset.concat import ConcatDataset

        synced_parts = [
            part.synchronize(reference=reference, method=method, tolerance=tolerance)
            for part in self._sequences
        ]
        return ConcatDataset(synced_parts)
    return super().synchronize(reference=reference, method=method, tolerance=tolerance)

register_channel

apairo.core.config.register_channel

register_channel(root_dir: str | Path, key: str, loader: str, *, timestamps_from: Optional[str] = None, sources: Optional[list[str]] = None, frame: Optional[str] = None) -> None

Register a preprocessed channel in root_dir/.apairo/channels.yaml.

This is the low-level standalone function. Most users will prefer the classmethod `ConfigurableDataset.register_channel()`, because the call site then names the dataset type explicitly.

Existing channels (raw or preprocessed) are preserved -- only key is updated.

Parameters:

Name Type Description Default
root_dir str | Path

Dataset root directory.

required
key str

Channel name -- must match its subdirectory name.

required
loader str

Data format: "npy", "npys", "bin", or "img".

required
timestamps_from Optional[str]

Source channel whose timestamps this channel shares (provenance only -- the channel always has its own timestamps.txt).

None
sources Optional[list[str]]

Provenance -- raw channels this channel was derived from.

None
frame Optional[str]

Coordinate frame the channel's data is expressed in (descriptive metadata only; apairo does not apply transforms).

None
Source code in apairo/core/config.py
def register_channel(
    root_dir: str | Path,
    key: str,
    loader: str,
    *,
    timestamps_from: Optional[str] = None,
    sources: Optional[list[str]] = None,
    frame: Optional[str] = None,
) -> None:
    """Register a preprocessed channel in ``root_dir/.apairo/channels.yaml``.

    This is the low-level standalone function.  Most users will prefer the
    classmethod :meth:`ConfigurableDataset.register_channel` so that the call
    site names the dataset type explicitly.

    Existing channels (raw or preprocessed) are preserved -- only ``key`` is
    updated.  Its entry is replaced wholesale, records ``kind: preprocess``,
    and sets ``has_timestamps`` according to whether
    ``root_dir/<key>/timestamps.txt`` exists at call time.

    Args:
        root_dir: Dataset root directory.
        key: Channel name -- must match its subdirectory name.
        loader: Data format: ``"npy"``, ``"npys"``, ``"bin"``, or ``"img"``.
        timestamps_from: Source channel whose timestamps this channel shares
            (provenance only -- the channel always has its own ``timestamps.txt``).
        sources: Provenance -- raw channels this channel was derived from.
            Omitted from the entry when empty.
        frame: Coordinate frame the channel's data is expressed in (descriptive
            metadata only; apairo does not apply transforms).
    """
    root_dir = Path(root_dir)
    # Read existing config to preserve all other channels (raw + preprocessed).
    config = (
        read_config(root_dir)
        if config_exists(root_dir)
        else {"version": 1, "channels": {}}
    )
    # An older or hand-edited config may lack a ``channels`` mapping, or may
    # contain ``channels:`` with no value (loaded as None); create the mapping
    # instead of failing with KeyError/TypeError.
    channels = config.get("channels")
    if channels is None:
        channels = config["channels"] = {}

    has_ts = (root_dir / key / "timestamps.txt").exists()
    entry: dict = {"has_timestamps": has_ts, "kind": "preprocess", "loader": loader}
    if timestamps_from is not None:
        entry["timestamps_from"] = timestamps_from
    if sources:
        entry["sources"] = list(sources)
    if frame is not None:
        entry["frame"] = frame

    channels[key] = entry
    write_config(root_dir, config)

WRITERS

Format writers used by the preprocessing runner, keyed by loader name ("npy", "npys", "bin", "zarr", "img").

# Look up the writer class registered for a loader name, instantiate it, and
# write one array.  ``my_array`` is a placeholder for your data; the snippet
# assumes ``from pathlib import Path``.
from apairo import WRITERS

writer = WRITERS["npy"]()
writer.write(my_array, Path("/data/output/000000.npy"))

apairo.writer.WRITERS module-attribute

# "npy" and "npys" both map to NPYWriter.
WRITERS: dict[str, type] = {'npy': NPYWriter, 'npys': NPYWriter, 'bin': BINWriter, 'zarr': ZarrWriter, 'img': TarImageWriter}

DERIVED_LOADERS

File-level loaders for derived/preprocessed keys, keyed by loader name ("npy", "bin", "img"). Each entry is a Callable[[Path], np.ndarray].

from apairo import DERIVED_LOADERS

# Per the DERIVED_LOADERS annotation, the result is a numpy ndarray.
tensor = DERIVED_LOADERS["npy"](Path("/data/output/000000.npy"))

apairo.loader.DERIVED_LOADERS module-attribute

# "bin" files are read as flat float32 arrays; no "npys" or "pt" entry exists.
DERIVED_LOADERS: dict[str, Callable[[Path], ndarray]] = {'npy': lambda path: load(path), 'bin': lambda path: fromfile(path, dtype=float32), 'img': _load_img}

Preprocessing

FramePreprocessor

apairo.core.preprocessor.FramePreprocessor

Bases: Preprocessor

Preprocessor that operates frame-by-frame.

The runner calls `process()` once per input frame. Use this for per-scan operations (label inference, feature extraction, …).

Output is stored as one file per frame (000000.npy, 000001.npy, …) when output_loader is "npys" or "bin".

Example::

# Per-frame example: process() is called once for every input frame.
class TravLabel(FramePreprocessor):
    output_key    = "trav_label"
    output_loader = "npys"
    input_keys    = ["velodyne_0"]
    timestamps_from = "velodyne_0"   # no own timestamps.txt

    def process(self, sample: Sample) -> np.ndarray:
        pts = sample.data["velodyne_0"]
        return my_model(pts)
Source code in apairo/core/preprocessor.py
class FramePreprocessor(Preprocessor):
    """Frame-wise preprocessor: :meth:`process` is invoked once per input frame.

    Suited to per-scan work such as label inference or feature extraction.
    When ``output_loader`` is ``"npys"`` or ``"bin"`` the result is written as
    one file per frame (``000000.npy``, ``000001.npy``, …).

    Example::

        class TravLabel(FramePreprocessor):
            output_key    = "trav_label"
            output_loader = "npys"
            input_keys    = ["velodyne_0"]
            timestamps_from = "velodyne_0"   # no own timestamps.txt

            def process(self, sample: Sample) -> np.ndarray:
                pts = sample.data["velodyne_0"]
                return my_model(pts)
    """

    @abstractmethod
    def process(self, sample: Sample) -> Any:
        """Compute the output for a single frame.

        Args:
            sample: A :class:`~apairo.core.sample.Sample` whose ``data`` dict
                holds exactly the keys listed in :attr:`input_keys`.

        Returns:
            A ``numpy.ndarray`` with this frame's output.
        """

process abstractmethod

process(sample: Sample) -> Any

Process one frame.

Parameters:

Name Type Description Default
sample Sample

A `Sample` (`apairo.core.sample.Sample`) whose `data` dict contains exactly the keys declared in `input_keys`.

required

Returns:

Type Description
Any

A numpy.ndarray representing the output for this frame.

Source code in apairo/core/preprocessor.py
@abstractmethod
def process(self, sample: Sample) -> Any:
    """Compute the output for a single frame.

    Args:
        sample: A :class:`~apairo.core.sample.Sample` whose ``data`` dict
            holds exactly the keys listed in :attr:`input_keys`.

    Returns:
        A ``numpy.ndarray`` with this frame's output.
    """

SequencePreprocessor

apairo.core.preprocessor.SequencePreprocessor

Bases: Preprocessor

Preprocessor that operates on the full sequence at once.

The runner calls `process()` once, with an iterator over all input frames. Use this for algorithms that need global context (ICP, trajectory smoothing, …).

Output is stored as a single {output_key}.npy file when output_loader is "npy".

Example::

# Sequence-level example: process() receives one iterator over every frame.
class GICPPoses(SequencePreprocessor):
    output_key    = "gicp_poses"
    output_loader = "npy"
    input_keys    = ["velodyne_0"]
    sources       = ["velodyne_0"]   # has its own timestamps.txt

    def process(self, frames: Iterator[Sample]) -> np.ndarray:
        poses = []
        for sample in frames:
            pts = sample.data["velodyne_0"]
            poses.append(register(pts))
        return np.stack(poses)           # (N, 4, 4)
Source code in apairo/core/preprocessor.py
class SequencePreprocessor(Preprocessor):
    """Whole-sequence preprocessor: :meth:`process` sees every frame in one call.

    Suited to algorithms that need global context, such as ICP or trajectory
    smoothing.  When ``output_loader`` is ``"npy"`` the result is written as a
    single ``{output_key}.npy`` file.

    Example::

        class GICPPoses(SequencePreprocessor):
            output_key    = "gicp_poses"
            output_loader = "npy"
            input_keys    = ["velodyne_0"]
            sources       = ["velodyne_0"]   # has its own timestamps.txt

            def process(self, frames: Iterator[Sample]) -> np.ndarray:
                poses = []
                for sample in frames:
                    pts = sample.data["velodyne_0"]
                    poses.append(register(pts))
                return np.stack(poses)           # (N, 4, 4)
    """

    @abstractmethod
    def process(self, frames: Iterator[Sample]) -> Any:
        """Compute the output for an entire sequence.

        Args:
            frames: Iterator yielding :class:`~apairo.core.sample.Sample`
                objects, one per frame.

        Returns:
            A ``numpy.ndarray`` of shape ``(N, ...)``.
        """

process abstractmethod

process(frames: Iterator[Sample]) -> Any

Process all frames.

Parameters:

Name Type Description Default
frames Iterator[Sample]

Iterator of `Sample` (`apairo.core.sample.Sample`) objects.

required

Returns:

Type Description
Any

A numpy.ndarray of shape (N, ...).

Source code in apairo/core/preprocessor.py
@abstractmethod
def process(self, frames: Iterator[Sample]) -> Any:
    """Compute the output for an entire sequence.

    Args:
        frames: Iterator yielding :class:`~apairo.core.sample.Sample`
            objects, one per frame.

    Returns:
        A ``numpy.ndarray`` of shape ``(N, ...)``.
    """

Data structures

Sample

apairo.core.sample.Sample dataclass

A single dataset sample -- one timeline event or one complete frame.

For temporal datasets: data has one key, timestamp is set. For synchronous datasets: data has all modality keys, timestamp is None.

Source code in apairo/core/sample.py
@dataclass
class Sample:
    """A single dataset sample -- one timeline event or one complete frame.

    For temporal datasets: data has one key, timestamp is set.
    For synchronous datasets: data has all modality keys, timestamp is None.
    """

    data: dict[str, Any]
    timestamp: float | None = None

ModalitySpec

apairo.core.profiled_dataset.ModalitySpec dataclass

Source code in apairo/core/profiled_dataset.py
@dataclass
class ModalitySpec:
    """Loading specification for one key, parsed from a YAML structural profile."""

    # File extension; ``from_dict`` normalises it to start with "." (or "").
    ext: str
    dtype: Optional[str] = None
    reshape: Optional[list] = None
    mask: Optional[int] = None
    torch_dtype: Optional[str] = None
    loader: Optional[str] = None
    # Path components; when empty, ``effective_subpath`` falls back to [key].
    subpath: list[str] = field(default_factory=list)
    optional: bool = False
    # Looked up from ``torch_dtype`` in ``_NUMPY_DTYPE``; excluded from eq/repr.
    resolved_dtype: Optional[type] = field(default=None, compare=False, repr=False)

    @classmethod
    def from_dict(cls, key: str, d: dict) -> "ModalitySpec":
        """Build a spec from one key's entry in a profile.

        Args:
            key: Key name (not used by this method).
            d: Raw mapping for the key as loaded from the YAML profile.

        Returns:
            A new :class:`ModalitySpec`.
        """
        # ``ext: null`` in YAML loads as None; treat it like a missing extension
        # so the ``str`` field never holds None.
        ext = d.get("ext") or ""
        if ext and not ext.startswith("."):
            ext = f".{ext}"
        subpath = d.get("subpath") or []
        if isinstance(subpath, str):
            # A scalar ``subpath: cam0`` names one directory; wrap it so the
            # field is really a list[str] rather than a string of characters.
            subpath = [subpath]
        torch_dtype = d.get("torch_dtype")
        return cls(
            ext=ext,
            dtype=d.get("dtype"),
            reshape=d.get("reshape"),
            mask=d.get("mask"),
            torch_dtype=torch_dtype,
            loader=d.get("loader"),
            # Copy so the spec never aliases the profile's own list.
            subpath=list(subpath),
            optional=d.get("optional", False),
            resolved_dtype=_NUMPY_DTYPE.get(torch_dtype) if torch_dtype else None,
        )

    @property
    def is_sequence_file(self) -> bool:
        """True when ``loader`` is one of ``_SEQUENCE_LOADERS``."""
        return self.loader in _SEQUENCE_LOADERS

    def effective_subpath(self, key: str) -> list[str]:
        """Return ``subpath`` if non-empty, otherwise ``[key]``."""
        return self.subpath if self.subpath else [key]