# This file is part of AIdsorb.
# Copyright (C) 2024 Antonios P. Sarikas
# AIdsorb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
r"""
This module provides helper functions and classes for creating datasets and
handling point clouds of variable sizes.
"""
import os
import json
from pathlib import Path
from typing import Sequence
import numpy as np
import torch
from torch.utils.data import random_split, Dataset
from torch.nn.utils.rnn import pad_sequence
from . _internal import _SEED, pd
[docs]
def prepare_data(source: str, split_ratio: Sequence=(0.8, 0.1, 0.1), seed: int=_SEED):
    r"""
    Split a source of point clouds in train, validation and test sets.

    Each ``.json`` file that is created, stores the names of the point clouds
    that will be used for *training*, *validation* and *testing*.

    .. warning::
        * No directory is created by :func:`prepare_data`. All ``.json`` files
          are stored under the directory containing ``source``.
        * Splitting doesn't support stratification. If your dataset is small and
          you want to perform classification, consider using
          `train_test_split`_.

    .. _train_test_split: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html

    Parameters
    ----------
    source : str
        Absolute or relative path to the file holding the point clouds.
    split_ratio : sequence, default=(0.8, 0.1, 0.1)
        The sizes or fractions of splits to be produced.

        * ``split_ratio[0] == train``.
        * ``split_ratio[1] == validation``.
        * ``split_ratio[2] == test``.
    seed : int, default=1
        Controls the randomness of the ``rng`` used for splitting.

    Examples
    --------
    Before the split::

        pcd_data
        └──source.npz

    >>> prepare_data('path/to/pcd_data/source.npz')  # doctest: +SKIP

    After the split::

        pcd_data
        ├──source.npz
        ├──train.json
        ├──validation.json
        └──test.json
    """
    rng = torch.Generator().manual_seed(seed)
    out_dir = Path(source).parent

    # Only the names of the stored arrays are needed, so the archive is
    # closed as soon as they have been read instead of being left open.
    with np.load(source) as pcds:
        names = pcds.files

    train, val, test = random_split(names, split_ratio, generator=rng)

    for split, mode in zip((train, val, test), ('train', 'validation', 'test')):
        # Each split is a ``Subset``; materialize it into a plain list of
        # names so that it can be serialized.
        with open(out_dir / f'{mode}.json', 'w') as fhand:
            json.dump(list(split), fhand, indent=4)

    print('\033[32mData preparation completed!\033[0m')
[docs]
def get_names(filename):
    r"""
    Load the names saved in a ``.json`` file.

    Parameters
    ----------
    filename : str
        Path to the ``.json`` file to read.

    Returns
    -------
    names : list
        The deserialized content of ``filename``.
    """
    with open(filename, 'r') as stream:
        return json.load(stream)
[docs]
def upsample_pcd(pcd, size):
    r"""
    Grow ``pcd`` to ``size`` points by re-drawing points from ``pcd`` itself.

    The original points are kept, in order, at the beginning of the result.
    The missing ``size - N`` points are drawn with replacement from ``pcd``
    using NumPy's global random state and appended at the end.

    Parameters
    ----------
    pcd : tensor of shape (N, C)
        The original point cloud of size ``N``.
    size : int
        The size of the new point cloud. Expected to be at least ``N``.

    Returns
    -------
    new_pcd : tensor of shape (size, C).

    Examples
    --------
    >>> pcd = torch.tensor([[2, 4, 5, 6]])
    >>> upsample_pcd(pcd, 3)
    tensor([[2, 4, 5, 6],
            [2, 4, 5, 6],
            [2, 4, 5, 6]])

    >>> # New points must be from pcd.
    >>> pcd = torch.randn(10, 4)
    >>> new_pcd = upsample_pcd(pcd, 20)
    >>> (new_pcd[-1] == pcd).all(1).any()  # Check for last point.
    tensor(True)

    >>> # No upsampling.
    >>> pcd = torch.randn(100, 4)
    >>> new_pcd = upsample_pcd(pcd, len(pcd))
    >>> torch.equal(pcd, new_pcd)
    True
    """
    n_points = len(pcd)

    # Row indices of the points that get duplicated.
    drawn = np.random.choice(n_points, size - n_points, replace=True)
    extra_points = pcd[torch.from_numpy(drawn)]

    return torch.cat((pcd, extra_points))
[docs]
def pad_pcds(pcds, channels_first=True, mode='upsample'):
    r"""
    Pad a sequence of variable size point clouds.

    Each point cloud must have shape ``(N_i, C)``.

    Parameters
    ----------
    pcds : sequence of tensors
    mode : {'zeropad', 'upsample'}, default='upsample'
    channels_first : bool, default=True

    Returns
    -------
    batch : tensor of shape (B, T, C) or (B, C, T)
        If ``channels_first=False``, then ``batch`` has shape ``(B, T, C)``,
        where ``B == len(pcds)`` is the batch size and ``T`` is the size of
        the largest point cloud in ``pcds``. Otherwise, ``(B, C, T)``.

    Raises
    ------
    ValueError
        If ``mode`` is neither ``'zeropad'`` nor ``'upsample'``.

    See Also
    --------
    :func:`upsample_pcd` : For a description of ``'upsample'`` mode.
    :func:`torch.nn.utils.rnn.pad_sequence` : For a description of ``'zeropad'`` mode.

    Examples
    --------
    >>> x1 = torch.tensor([[1, 2, 3, 4]])
    >>> x2 = torch.tensor([[2, 5, 3, 8], [0, 2, 8, 9]])

    >>> batch = pad_pcds((x1, x2), channels_first=False)
    >>> batch
    tensor([[[1, 2, 3, 4],
             [1, 2, 3, 4]],
    <BLANKLINE>
            [[2, 5, 3, 8],
             [0, 2, 8, 9]]])

    >>> batch = pad_pcds((x1, x2), channels_first=True)
    >>> batch
    tensor([[[1, 1],
             [2, 2],
             [3, 3],
             [4, 4]],
    <BLANKLINE>
            [[2, 0],
             [5, 2],
             [3, 8],
             [8, 9]]])

    >>> batch = pad_pcds((x1, x2), channels_first=False, mode='zeropad')
    >>> batch
    tensor([[[1, 2, 3, 4],
             [0, 0, 0, 0]],
    <BLANKLINE>
            [[2, 5, 3, 8],
             [0, 2, 8, 9]]])

    >>> batch = pad_pcds((x1, x2), channels_first=True, mode='zeropad')
    >>> batch
    tensor([[[1, 0],
             [2, 0],
             [3, 0],
             [4, 0]],
    <BLANKLINE>
            [[2, 0],
             [5, 2],
             [3, 8],
             [8, 9]]])
    """
    if mode == 'zeropad':
        batch = pad_sequence(pcds, batch_first=True, padding_value=0)
    elif mode == 'upsample':
        max_len = max(len(p) for p in pcds)
        # Point clouds that already have the maximum size are used as-is.
        batch = torch.stack([
            upsample_pcd(p, max_len) if len(p) < max_len else p
            for p in pcds
        ])
    else:
        # Fail with a clear message instead of an UnboundLocalError on
        # ``batch`` further down.
        raise ValueError(
            f"mode must be 'zeropad' or 'upsample', got {mode!r}"
        )

    # At this point batch has shape (B, n_points, C).
    if channels_first:
        batch = batch.transpose(1, 2)  # Shape (B, C, n_points).

    return batch
[docs]
class Collator:
    r"""
    Callable that turns a sequence of samples into a ``batch``.

    Since point clouds may differ in size, they are padded first so that they
    can be stacked into a single tensor.

    .. rubric:: Shapes

    * Input: sequence of samples
        Each sample is a tuple of tensors ``(pcd, label)``, where
        ``pcd`` has shape ``(N_i, C)`` and ``label`` has shape
        ``(n_outputs,)`` or ``()``.
    * Output: tuple of length 2
        * ``batch[0] == x`` with shape ``(B, C, T)`` if ``channels_first=True``,
          otherwise ``(B, T, C)``. ``B`` is the batch size and ``T`` is the size
          of the largest point cloud in the sequence.
        * ``batch[1] == y`` with shape ``(B, n_outputs)`` or ``(B,)``.

    .. tip::
        Use an instance of this class as ``collate_fn`` with
        ``channels_first=True``, if your model is :class:`~aidsorb.models.PointNet`.

    .. todo::
        Add functionality for collating only point clouds (useful when the
        dataset is unlabeled).

    Parameters
    ----------
    channels_first : bool, default=True
    mode : {'zeropad', 'upsample'}, default='upsample'

    See Also
    --------
    :func:`pad_pcds` : For a description of the parameters.
    :func:`upsample_pcd` : For a description of the parameters.

    Examples
    --------
    >>> sample1 = (torch.tensor([[1, 4, 5, 2]]), torch.tensor([1., 2.]))
    >>> sample2 = (torch.tensor([[0, 4, 0, 2], [2, 4, 1, 8]]), torch.tensor([7., 3.]))

    >>> collate_fn = Collator()
    >>> x, y = collate_fn((sample1, sample2))
    >>> x.shape
    torch.Size([2, 4, 2])
    >>> y.shape
    torch.Size([2, 2])
    >>> x
    tensor([[[1, 1],
             [4, 4],
             [5, 5],
             [2, 2]],
    <BLANKLINE>
            [[0, 2],
             [4, 4],
             [0, 1],
             [2, 8]]])
    >>> y
    tensor([[1., 2.],
            [7., 3.]])

    >>> collate_fn = Collator(channels_first=False, mode='zeropad')
    >>> x, y = collate_fn((sample1, sample2))
    >>> x
    tensor([[[1, 4, 5, 2],
             [0, 0, 0, 0]],
    <BLANKLINE>
            [[0, 4, 0, 2],
             [2, 4, 1, 8]]])
    >>> y
    tensor([[1., 2.],
            [7., 3.]])

    >>> # Label has shape (), i.e. is scalar.
    >>> sample1 = (torch.tensor([[3, 4, 3, 2]]), torch.tensor(0))
    >>> sample2 = (torch.tensor([[2, 4, 8, 2], [9, 4, 1, 8]]), torch.tensor(1))
    >>> collate_fn = Collator(channels_first=False, mode='zeropad')
    >>> x, y = collate_fn((sample1, sample2))
    >>> x
    tensor([[[3, 4, 3, 2],
             [0, 0, 0, 0]],
    <BLANKLINE>
            [[2, 4, 8, 2],
             [9, 4, 1, 8]]])
    >>> y
    tensor([0, 1])
    """

    def __init__(self, channels_first=True, mode='upsample'):
        # Both options are forwarded unchanged to ``pad_pcds`` on every call.
        self.mode = mode
        self.channels_first = channels_first

    def __call__(self, samples):
        r"""
        Pad the point clouds and stack the labels of ``samples``.

        Parameters
        ----------
        samples : sequence of tuples
            Each sample is a tuple of tensors ``(pcd, label)`` where
            ``pcd.shape == (n_points, C)`` and ``label`` has shape
            ``(n_outputs,)`` or ``()``.

        Returns
        -------
        batch : tuple of length 2
            * ``batch[0] == x`` with shape ``(B, C, T)`` or ``(B, T, C)``, where
              ``T`` is the size of the largest point cloud.
            * ``batch[1] == y`` with shape ``(B, n_outputs)`` or ``(B,)``.
        """
        # Transpose [(pcd, label), ...] into (pcd, ...) and (label, ...).
        pcds, labels = zip(*samples)

        return (
            pad_pcds(pcds, channels_first=self.channels_first, mode=self.mode),
            torch.stack(labels),
        )
[docs]
class PCDDataset(Dataset):
    r"""
    ``Dataset`` for point clouds.

    The ``.npz`` file (and the ``.csv`` file, if any) is opened lazily, the
    first time a sample is requested, and kept in ``self.X`` (``self.Y``).

    .. tip::
        For implementing your own transforms, have a look at the transforms
        `tutorial`_. For more flexibility, consider implementing them as
        callable instances of classes.

    .. _tutorial: https://pytorch.org/tutorials/beginner/data_loading_tutorial.html#transforms

    Parameters
    ----------
    pcd_names : list
        List containing the names of the point clouds.
    path_to_X : str
        Absolute or relative path to the ``.npz`` file holding the point clouds.
    path_to_Y : str, optional
        Absolute or relative path to the ``.csv`` file holding the labels of the
        point clouds.

        .. warning::
            The comma ``,`` is assumed as the field separator.

    index_col : str, optional
        Column name of the ``.csv`` file to be used as row labels. The names
        (values) under this column must follow the same naming scheme as in
        ``pcd_names``. Needed when ``path_to_Y`` is given, since it is part of
        the columns read from the ``.csv`` file.
    labels : list, optional
        List containing the names of the properties to be predicted. No effect
        if ``path_to_Y=None``. Needed when ``path_to_Y`` is given, since it
        selects the columns read from the ``.csv`` file.
    transform_x : callable, optional
        Transforms applied to ``input``, i.e to each point cloud.
    transform_y : callable, optional
        Transforms applied to ``output``. No effect if ``path_to_Y=None``.

    Raises
    ------
    ValueError
        If ``labels`` is given but is not a list.

    See Also
    --------
    :mod:`aidsorb.transforms` : For available point cloud transformations.
    """

    def __init__(
            self, pcd_names, path_to_X,
            path_to_Y=None, index_col=None, labels=None,
            transform_x=None, transform_y=None,
            ):
        # isinstance (rather than an exact type comparison) so that list
        # subclasses are accepted as well.
        if labels is not None and not isinstance(labels, list):
            raise ValueError('labels must be a list!')

        self._pcd_names = pcd_names
        self.path_to_X = path_to_X
        self.path_to_Y = path_to_Y
        self.labels = labels
        self.index_col = index_col
        self.transform_x = transform_x
        self.transform_y = transform_y

        # Populated on first access in __getitem__.
        self.X = None
        self.Y = None

    @property
    def pcd_names(self):
        r"""The names of the point clouds."""
        return self._pcd_names

    def __len__(self):
        return len(self.pcd_names)

    def __getitem__(self, idx):
        r"""
        Return the sample at position ``idx``.

        Returns
        -------
        sample : tensor or tuple of tensors
            ``(x, y)`` as float tensors if ``path_to_Y`` was given, otherwise
            only ``x``.
        """
        # Account for np.load and multiprocessing.
        if self.X is None:
            self.X = np.load(self.path_to_X)

        if self.Y is None and self.path_to_Y is not None:
            # Read only the label columns plus the column used as index.
            self.Y = pd.read_csv(
                    self.path_to_Y,
                    index_col=self.index_col,
                    usecols=[*self.labels, self.index_col],
                    )

        name = self.pcd_names[idx]
        sample_x = self.X[name]

        if self.transform_x is not None:
            sample_x = self.transform_x(sample_x)

        # Unlabeled dataset: only the point cloud is returned.
        if self.Y is None:
            return torch.tensor(sample_x, dtype=torch.float)

        sample_y = self.Y.loc[name].to_numpy()

        if self.transform_y is not None:
            sample_y = self.transform_y(sample_y)

        return (
                torch.tensor(sample_x, dtype=torch.float),
                torch.tensor(sample_y, dtype=torch.float)
                )