Source code for aidsorb.data

# This file is part of AIdsorb.
# Copyright (C) 2024 Antonios P. Sarikas

# AIdsorb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

r"""
Helper functions and classes for creating datasets and handling point clouds of
variable sizes.
"""

import json
import os
from collections.abc import Callable, Sequence
from pathlib import Path

import numpy as np
import torch
from torch import Tensor
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, random_split

from ._internal import pd
from .transforms import upsample_pcd


[docs] def prepare_data( source: str, split_ratio: Sequence | None = None, seed: int = 1, ) -> None: r""" Split point clouds into train, validation and test sets. Each ``.json`` file that is created, stores the names of the point clouds that will be used for training, validation and testing. .. warning:: * All ``.json`` files are stored under the parent directory of ``source``. * Splitting doesn't support stratification. If your dataset is small and you want to perform classification, consider using `train_test_split`_. .. _train_test_split: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html Parameters ---------- source : str Absolute or relative path to the directory holding the point clouds. split_ratio : sequence, default=None Absolute sizes or fractions of splits of the form ``(train, val, test)``. If :obj:`None`, it is set to ``(0.8, 0.1, 0.1)``. seed : int, default=1 Controls randomness of the ``rng`` used for splitting. Examples -------- Before the split:: project_root └── source ├── foo.npy ├── ... └── bar.npy >>> prepare_data('path/to/source') # doctest: +SKIP After the split:: project_root ├── source │   ├── foo.npy │   ├── ... │   └── bar.npy ├── test.json ├── train.json └── validation.json """ rng = torch.Generator().manual_seed(seed) path = Path(source).parent pcd_names = [name.removesuffix('.npy') for name in sorted(os.listdir(source))] # Set default split ratio. if split_ratio is None: split_ratio = (0.8, 0.1, 0.1) # Split the names of the point clouds. train, val, test = random_split(pcd_names, split_ratio, generator=rng) for split, mode in zip((train, val, test), ('train', 'validation', 'test')): names = list(split) filename = os.path.join(path, f'{mode}.json') with open(filename, 'w') as fhand: json.dump(names, fhand, indent=4) print(f'Created file: \033[0;34m{filename}\033[0m') print('\033[32;1mData preparation completed!\033[0m')
def get_names(filename: str) -> tuple:
    r"""
    Load the point cloud names listed in a ``.json`` file.

    Parameters
    ----------
    filename : str
        Absolute or relative path to the file.

    Returns
    -------
    names : tuple
        The decoded content of the file, converted to a tuple.
    """
    with open(filename, 'r') as stream:
        stored = json.load(stream)

    return tuple(stored)
[docs] def pad_pcds( pcds: Sequence[Tensor], *, channels_first: bool, mode: str = 'upsample', return_mask: bool = False, ) -> Tensor | tuple: r""" Pad a sequence of variable size point clouds. Each point cloud must have shape ``(N_i, C)``. .. rubric:: Shapes * ``batch`` tensor of shape ``(B, T, C)`` if ``channels_first=False``, else ``(B, C, T)``. * ``mask`` boolean tensor of shape ``(B, T)`` where :obj:`True` indicates padding. ``B`` is the batch size and ``T`` is the size of the largest point cloud in the sequence. Parameters ---------- pcds : sequence of tensors channels_first : bool mode : {'zeropad', 'upsample'}, default='upsample' return_mask : bool, default=False Returns ------- tensor or tuple of tensors ``batch`` if ``return_mask=False``, else ``(batch, mask)``. See Also -------- :func:`~.upsample_pcd` : For a description of ``'upsample'`` mode. :func:`torch.nn.utils.rnn.pad_sequence` : For a description of ``'zeropad'`` mode. Examples -------- >>> x1 = torch.tensor([[1, 2, 3, 4]]) >>> x2 = torch.tensor([[2, 5, 3, 8], [0, 2, 8, 9]]) >>> batch = pad_pcds((x1, x2), channels_first=False) >>> batch tensor([[[1, 2, 3, 4], [1, 2, 3, 4]], <BLANKLINE> [[2, 5, 3, 8], [0, 2, 8, 9]]]) >>> batch = pad_pcds((x1, x2), channels_first=True) >>> batch tensor([[[1, 1], [2, 2], [3, 3], [4, 4]], <BLANKLINE> [[2, 0], [5, 2], [3, 8], [8, 9]]]) >>> batch = pad_pcds((x1, x2), channels_first=False, mode='zeropad') >>> batch tensor([[[1, 2, 3, 4], [0, 0, 0, 0]], <BLANKLINE> [[2, 5, 3, 8], [0, 2, 8, 9]]]) >>> batch = pad_pcds((x1, x2), channels_first=True, mode='zeropad') >>> batch tensor([[[1, 0], [2, 0], [3, 0], [4, 0]], <BLANKLINE> [[2, 0], [5, 2], [3, 8], [8, 9]]]) >>> # Pad and return padding mask (useful for attention-based architectures). 
>>> batch, mask = pad_pcds((x1, x2), channels_first=False, return_mask=True) >>> batch tensor([[[1, 2, 3, 4], [1, 2, 3, 4]], <BLANKLINE> [[2, 5, 3, 8], [0, 2, 8, 9]]]) >>> mask tensor([[False, True], [False, False]]) >>> # Pad a single point cloud. >>> pad_pcds([x1], channels_first=False, mode='zeropad') tensor([[[1, 2, 3, 4]]]) >>> pad_pcds([x1], channels_first=True, mode='upsample') tensor([[[1], [2], [3], [4]]]) """ pcd_len = torch.tensor([len(p) for p in pcds]) max_len = pcd_len.max().item() if mode == 'zeropad': batch = pad_sequence( pcds, batch_first=True, padding_value=0.0, padding_side='right' ) elif mode == 'upsample': padded_pcds = [upsample_pcd(p, max_len) if len(p) < max_len else p for p in pcds] batch = torch.stack(padded_pcds) # Shape (B, max_len, C). if channels_first: batch = batch.transpose(1, 2) # Shape (B, C, max_len). # Note: right padding is assumed. if return_mask: mask = torch.arange(max_len)[None] >= pcd_len[:, None] return batch, mask return batch
[docs] class Collator: r""" Collate a sequence of samples into a batch. Point clouds are padded before collation, so they can form a batch. .. rubric:: Shapes * Input: sequence of samples Each sample is a tuple of ``(pcd, label)``. * ``pcd`` tensor of shape ``(N_i, C)``. * ``label`` tensor of shape ``(n_outputs,)``, ``()`` or :obj:`None`. * Output: tuple If ``return_mask=False``, then output is ``(x, y)``, else ``((x, mask), y)``. * ``x`` tensor of shape ``(B, C, T)`` if ``channels_first=True``, else ``(B, T, C)``. * ``y`` tensor of shape ``(B, n_outputs)``, ``(B,)`` or :obj:`None`. * ``mask`` boolean tensor of shape ``(B, T)`` where :obj:`True` indicates padding. ``B`` is the batch size and ``T`` is the size of the largest point cloud in the sequence. Parameters ---------- channels_first : bool mode : {'zeropad', 'upsample'}, default='upsample' return_mask : bool, default=False See Also -------- :func:`pad_pcds` : For a description of the parameters. Examples -------- >>> sample1 = (torch.tensor([[1, 4, 5, 2]]), torch.tensor([1., 2.])) >>> sample2 = (torch.tensor([[0, 4, 0, 2], [2, 4, 1, 8]]), torch.tensor([7., 3.])) >>> collate_fn = Collator(channels_first=True) >>> x, y = collate_fn((sample1, sample2)) >>> x tensor([[[1, 1], [4, 4], [5, 5], [2, 2]], <BLANKLINE> [[0, 2], [4, 4], [0, 1], [2, 8]]]) >>> y tensor([[1., 2.], [7., 3.]]) >>> collate_fn = Collator(channels_first=False, mode='zeropad') >>> x, y = collate_fn((sample1, sample2)) >>> x tensor([[[1, 4, 5, 2], [0, 0, 0, 0]], <BLANKLINE> [[0, 4, 0, 2], [2, 4, 1, 8]]]) >>> y tensor([[1., 2.], [7., 3.]]) >>> # Label has shape (), i.e. is scalar. 
>>> sample1 = (torch.tensor([[3, 4, 3, 2]]), torch.tensor(0)) >>> sample2 = (torch.tensor([[2, 4, 8, 2], [9, 4, 1, 8]]), torch.tensor(1)) >>> collate_fn = Collator(channels_first=False, mode='zeropad') >>> x, y = collate_fn((sample1, sample2)) >>> x tensor([[[3, 4, 3, 2], [0, 0, 0, 0]], <BLANKLINE> [[2, 4, 8, 2], [9, 4, 1, 8]]]) >>> y tensor([0, 1]) >>> # Label is None, i.e. unlabeled data. >>> sample1 = (torch.tensor([[1., 0., 1., 0.]]), None) >>> sample2 = (torch.tensor([[5., 2., 2., 0.], [9., 0., 0., 1.]]), None) >>> collate_fn = Collator(channels_first=True, mode='zeropad') >>> x, y = collate_fn((sample1, sample2)) >>> x tensor([[[1., 0.], [0., 0.], [1., 0.], [0., 0.]], <BLANKLINE> [[5., 9.], [2., 0.], [2., 0.], [0., 1.]]]) >>> y >>> # Collate and return padding mask. >>> sample1 = (torch.tensor([[4, 2, 1, 4], [2, 0, 0, 1]]), torch.tensor(1)) >>> sample2 = (torch.tensor([[1, 2, 3, 1]]), torch.tensor(4)) >>> collate_fn = Collator(channels_first=False, mode='zeropad', return_mask=True) >>> (x, mask), y = collate_fn((sample1, sample2)) >>> x tensor([[[4, 2, 1, 4], [2, 0, 0, 1]], <BLANKLINE> [[1, 2, 3, 1], [0, 0, 0, 0]]]) >>> y tensor([1, 4]) >>> mask tensor([[False, False], [False, True]]) >>> # Batch a single unlabeled sample. >>> sample = (torch.tensor([[2, 3, 4]]), None) >>> collate_fn = Collator(channels_first=False) >>> x, y = collate_fn([sample]) >>> x tensor([[[2, 3, 4]]]) >>> y >>> # Batch a single labeled sample. 
>>> sample = (torch.tensor([[1, 1, 2]]), torch.tensor(10)) >>> collate_fn = Collator(channels_first=True, mode='zeropad') >>> x, y = collate_fn([sample]) >>> x tensor([[[1], [1], [2]]]) >>> y tensor([10]) """ def __init__( self, *, channels_first: bool, mode: str = 'upsample', return_mask: bool = False, ) -> None: self.channels_first = channels_first self.mode = mode self.return_mask = return_mask def __call__( self, samples: Sequence[tuple[Tensor, Tensor | None]], ) -> tuple[Tensor, Tensor | None]: r""" Parameters ---------- samples : sequence of tuples Each sample is a tuple of tensors ``(pcd, label)`` or ``(pcd, None)``. Returns ------- tuple ``(x, y)`` or ``(x, None)``. If ``return_mask=True``, then ``x`` is a tuple ``(batch, mask)``, else ``batch``. """ pcds, labels = list(zip(*samples)) x = pad_pcds( pcds, channels_first=self.channels_first, mode=self.mode, return_mask=self.return_mask ) y = torch.stack(labels) if None not in labels else None return x, y
[docs] class PCDDataset(Dataset): r""" :class:`~torch.utils.data.Dataset` for point clouds. Indexing the dataset returns ``(x, None)`` if data are unlabeled, i.e. ``path_to_Y=None``, else ``(x, y)``, where ``x`` and ``y`` are the results of ``transform_x`` and ``transform_y``, respectively. .. note:: * All data (i.e. point cloud and its label) are converted to :class:`~.torch.Tensor` before passed to transforms. As such, ``transform_x`` and ``transform_y`` expect :class:`~.torch.Tensor` as input. * ``y`` has shape ``(len(labels),)`` if ``transform_y=None``. * Comma ``,`` is assumed as the field separator in ``.csv`` file. Parameters ---------- pcd_names : sequence Point cloud names. path_to_X : str Absolute or relative path to the directory holding the point clouds. path_to_Y : str, optional Absolute or relative path to the ``.csv`` file holding the labels of the point clouds. index_col : str, optional Column name of the ``.csv`` file to be used for indexing. This column must include ``pcd_names``. No effect if ``path_to_Y=None``. labels : list, optional List of column names from the ``.csv`` file containing the properties to be predicted. No effect if ``path_to_Y=None``. transform_x : callable, optional Transformation to apply to point cloud. transform_y : callable, optional Transformation to apply to label. No effect if ``path_to_Y=None``. See Also -------- :mod:`aidsorb.transforms` : For available point cloud transformations. """ def __init__( self, pcd_names: Sequence[str], path_to_X: str, *, path_to_Y: str | None = None, index_col: str | None = None, labels: list[str] | None = None, transform_x: Callable | None = None, transform_y: Callable | None = None, ) -> None: self._pcd_names = tuple(pcd_names) # Immutable for safety. self.path_to_X = path_to_X self.path_to_Y = path_to_Y self.index_col = index_col self.labels = labels self.transform_x = transform_x self.transform_y = transform_y #: Dataframe for the labels. The columns follow the order in ``labels``. 
self.Y = None if self.path_to_Y is not None: # Only for labeled datasets. self.Y = pd.read_csv( self.path_to_Y, index_col=self.index_col, usecols=[*self.labels, self.index_col], )[self.labels] @property def pcd_names(self) -> tuple: r"""Point cloud names.""" return self._pcd_names def __len__(self) -> int: return len(self.pcd_names) def __getitem__(self, idx: int) -> tuple[Tensor, Tensor | None]: pcd_name = self.pcd_names[idx] pcd_path = os.path.join(self.path_to_X, f'{pcd_name}.npy') pcd = torch.tensor(np.load(pcd_path), dtype=torch.float) label = None if self.transform_x is not None: pcd = self.transform_x(pcd) if self.Y is not None: y_arr = self.Y.loc[pcd_name].to_numpy() dtype = torch.float if np.issubdtype(y_arr.dtype, np.floating) else None label = torch.tensor(y_arr, dtype=dtype) if self.transform_y is not None: label = self.transform_y(label) return pcd, label