SLP utility functions
Reusable utility functions for PyTorch and system operations.
configure_logging(logfile_prefix=None)
configure_logging Configure loguru to intercept standard logging logs and tqdm writes, and optionally write to a logfile
We use loguru for stdout/stderr logging in this project. This function configures loguru to intercept logs from other modules that use the default Python logging module. It also configures loguru so that it plays well with writes from tqdm progress bars. If a logfile_prefix is provided, loguru will also write all logs into a logfile with a unique name constructed from logfile_prefix and datetime.now().
Parameters:
Name | Type | Description | Default |
---|---|---|---|
logfile_prefix | Optional[str] | Optional prefix to file where logs will be written. | None |
Returns:
Type | Description |
---|---|
Optional[str] | The logfile where logs are written, or None |
Examples:
>>> configure_logging("logs/my-cool-experiment")
logs/my-cool-experiment.20210228-211832.log
Source code in slp/util/log.py
```python
def configure_logging(logfile_prefix: Optional[str] = None) -> Optional[str]:
    """configure_logging Configure loguru to intercept logging module logs, tqdm writes and write to a logfile

    We use loguru for stdout/stderr logging in this project.
    This function configures loguru to intercept logs from other modules that use the default python logging module.
    It also configures loguru so that it plays well with writes from tqdm progress bars.
    If a logfile_prefix is provided, loguru will also write all logs into a logfile with a unique name constructed using
    logfile_prefix and datetime.now()

    Args:
        logfile_prefix (Optional[str]): Optional prefix to file where logs will be written.

    Returns:
        str: The logfile where logs are written

    Examples:
        >>> configure_logging("logs/my-cool-experiment")
        logs/my-cool-experiment.20210228-211832.log
    """

    class InterceptHandler(logging.Handler):
        def emit(self, record):
            """Intercept standard logging logs in loguru. Should test this for distributed pytorch lightning"""
            # Get corresponding Loguru level if it exists
            try:
                level = logger.level(record.levelname).name
            except ValueError:
                level = record.levelno

            # Find the caller from which the logged message originated
            frame, depth = logging.currentframe(), 2
            while frame.f_code.co_filename == logging.__file__:
                frame = frame.f_back
                depth += 1

            logger.opt(depth=depth, exception=record.exc_info).log(
                level, record.getMessage()
            )

    logger.info("Intercepting standard logging logs in loguru")

    # Make loguru play well with tqdm
    logger.remove()

    def tqdm_write(msg: str) -> Any:
        """Loguru wrapper for tqdm.write"""
        return tqdm.write(msg, end="")

    logger.add(tqdm_write, colorize=True)
    logging.basicConfig(handlers=[InterceptHandler()], level=logging.INFO)

    logfile = None
    if logfile_prefix is not None:
        logfile = log_to_file(logfile_prefix)
        logger.info(f"Log file will be saved in {logfile}")

    return logfile
```
log_to_file(fname_prefix)
log_to_file Configure loguru to log to a logfile
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fname_prefix | Optional[str] | Optional prefix to file where logs will be written. | required |
Returns:
Type | Description |
---|---|
str | The logfile where logs are written |
Source code in slp/util/log.py
```python
def log_to_file(fname_prefix: Optional[str]) -> str:
    """log_to_file Configure loguru to log to a logfile

    Args:
        fname_prefix (Optional[str]): Optional prefix to file where logs will be written.

    Returns:
        str: The logfile where logs are written
    """
    logfile = f"{fname_prefix}.{date_fname()}.log"
    logger.add(
        logfile,
        colorize=False,
        level="DEBUG",
        enqueue=True,
    )

    return logfile
```
NoOp
forward(self, x)
Defines the computation performed at every call.
Should be overridden by all subclasses.
!!! note
    Although the recipe for the forward pass needs to be defined within
    this function, one should call the Module instance afterwards
    instead of this, since the former takes care of running the
    registered hooks while the latter silently ignores them.
Source code in slp/util/pytorch.py
```python
def forward(self, x):
    return x
```
PackSequence
__init__(self, batch_first=True)
special
Wrap sequence packing in nn.Module
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch_first | bool | Use batch first representation. Defaults to True. | True |
Source code in slp/util/pytorch.py
```python
def __init__(self, batch_first: bool = True):
    """Wrap sequence packing in nn.Module

    Args:
        batch_first (bool, optional): Use batch first representation. Defaults to True.
    """
    super(PackSequence, self).__init__()
    self.batch_first = batch_first
```
forward(self, x, lengths)
Pack a padded sequence and sort lengths
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x | Tensor | Padded tensor | required |
lengths | Tensor | Original lengths before padding | required |
Returns:
Type | Description |
---|---|
Tuple[torch.nn.utils.rnn.PackedSequence, torch.Tensor] | (packed sequence, sorted lengths) |
Source code in slp/util/pytorch.py
```python
def forward(
    self, x: torch.Tensor, lengths: torch.Tensor
) -> Tuple[torch.nn.utils.rnn.PackedSequence, torch.Tensor]:
    """Pack a padded sequence and sort lengths

    Args:
        x (torch.Tensor): Padded tensor
        lengths (torch.Tensor): Original lengths before padding

    Returns:
        Tuple[torch.nn.utils.rnn.PackedSequence, torch.Tensor]: (packed sequence, sorted lengths)
    """
    out: torch.nn.utils.rnn.PackedSequence = pack_padded_sequence(
        x, lengths, batch_first=self.batch_first, enforce_sorted=False
    )
    lengths = lengths[out.sorted_indices]

    return out, lengths
```
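A minimal usage sketch (the batch shape, lengths, and the toy LSTM below are illustrative):

```python
import torch
import torch.nn as nn
from slp.util.pytorch import PackSequence

pack = PackSequence(batch_first=True)

x = torch.randn(3, 5, 8)           # (batch, max_len, features), zero-padded
lengths = torch.tensor([5, 3, 4])  # true lengths before padding

packed, sorted_lengths = pack(x, lengths)

# The packed sequence can be fed directly to a recurrent layer,
# which then skips the padded timesteps
lstm = nn.LSTM(input_size=8, hidden_size=16, batch_first=True)
out_packed, _ = lstm(packed)
```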
PadPackedSequence
__init__(self, batch_first=True, max_length=-1)
special
Wrap sequence padding in nn.Module
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch_first | bool | Use batch first representation. Defaults to True. | True |
max_length | int | Pad to a fixed maximum length. If -1, pad to the length of the longest sequence. Defaults to -1. | -1 |
Source code in slp/util/pytorch.py
```python
def __init__(self, batch_first: bool = True, max_length: int = -1):
    """Wrap sequence padding in nn.Module

    Args:
        batch_first (bool, optional): Use batch first representation. Defaults to True.
        max_length (int, optional): Pad to a fixed maximum length. If -1, pad to the
            length of the longest sequence. Defaults to -1.
    """
    super(PadPackedSequence, self).__init__()
    self.batch_first = batch_first
    self.max_length = max_length if max_length > 0 else None
```
forward(self, x, lengths)
Convert packed sequence to padded sequence
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x | PackedSequence | Packed sequence | required |
lengths | Tensor | Sorted original sequence lengths | required |
Returns:
Type | Description |
---|---|
Tensor | Padded sequence |
Source code in slp/util/pytorch.py
```python
def forward(
    self, x: torch.nn.utils.rnn.PackedSequence, lengths: torch.Tensor
) -> torch.Tensor:
    """Convert packed sequence to padded sequence

    Args:
        x (torch.nn.utils.rnn.PackedSequence): Packed sequence
        lengths (torch.Tensor): Sorted original sequence lengths

    Returns:
        torch.Tensor: Padded sequence
    """
    out, _ = pad_packed_sequence(
        x, batch_first=self.batch_first, total_length=self.max_length  # type: ignore
    )

    return out  # type: ignore
```
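The two wrappers are designed to be used together around a recurrent layer. A sketch of the full pack, RNN, unpack round trip (shapes are illustrative):

```python
import torch
import torch.nn as nn
from slp.util.pytorch import PackSequence, PadPackedSequence

pack = PackSequence(batch_first=True)
unpack = PadPackedSequence(batch_first=True, max_length=5)
lstm = nn.LSTM(input_size=8, hidden_size=16, batch_first=True)

x = torch.randn(3, 5, 8)
lengths = torch.tensor([5, 3, 4])

packed, sorted_lengths = pack(x, lengths)
out_packed, _ = lstm(packed)
out = unpack(out_packed, sorted_lengths)  # (3, 5, 16), padded back to max_length
```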
from_checkpoint(checkpoint_file, obj, map_location='cpu', dataparallel=False)
Load model or optimizer from saved state_dict
Parameters:
Name | Type | Description | Default |
---|---|---|---|
checkpoint_file | Optional[str] | File containing the state dict | required |
obj | Union[torch.nn.modules.module.Module, torch.optim.optimizer.Optimizer] | Module or optimizer instance to load the checkpoint | required |
map_location | Union[torch.device, str] | Where to load. Defaults to "cpu". | 'cpu' |
dataparallel | bool | If data parallel, remove leading "module." from state dict keys. Defaults to False. | False |
Returns:
Type | Description |
---|---|
Union[torch.nn.modules.module.Module, torch.optim.optimizer.Optimizer] | Loaded module or optimizer |
Source code in slp/util/pytorch.py
```python
def from_checkpoint(
    checkpoint_file: Optional[str],
    obj: types.ModuleOrOptimizer,
    map_location: Optional[types.Device] = "cpu",
    dataparallel: bool = False,
) -> types.ModuleOrOptimizer:
    """Load model or optimizer from saved state_dict

    Args:
        checkpoint_file (Optional[str]): File containing the state dict
        obj (types.ModuleOrOptimizer): Module or optimizer instance to load the checkpoint
        map_location (Optional[types.Device], optional): Where to load. Defaults to "cpu".
        dataparallel (bool, optional): If data parallel, remove leading "module." from state dict keys. Defaults to False.

    Returns:
        types.ModuleOrOptimizer: Loaded module or optimizer
    """
    if checkpoint_file is None:
        return obj

    if not system.is_file(checkpoint_file):
        logger.warning(
            f"The checkpoint {checkpoint_file} you are trying to load "
            "does not exist. Continuing without loading..."
        )

        return obj

    state_dict = torch.load(checkpoint_file, map_location=map_location)

    if dataparallel:
        state_dict = {k.replace("module.", ""): v for k, v in state_dict.items()}
    obj.load_state_dict(state_dict)

    return obj
```
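For example (the checkpoint paths below are hypothetical; if a file does not exist, the object is returned unchanged with a warning, so this is safe for fresh runs):

```python
import torch.nn as nn
from torch.optim import Adam
from slp.util.pytorch import from_checkpoint

model = nn.Linear(10, 2)
optimizer = Adam(model.parameters())

# Hypothetical checkpoint files
model = from_checkpoint("checkpoints/model.pt", model, map_location="cpu")
optimizer = from_checkpoint("checkpoints/optimizer.pt", optimizer)
```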
mktensor(data, dtype=torch.float32, device='cpu', requires_grad=False, copy_tensor=True)
Convert a list or numpy array to torch tensor. If a torch tensor is passed it is cast to dtype, device and the requires_grad flag is set. This can copy data or make the operation in place.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Union[numpy.ndarray, torch.Tensor, List[~T]] | Data to be converted to torch tensor. | required |
dtype | dtype | The type of the tensor elements (Default value = torch.float) | torch.float32 |
device | Union[torch.device, str] | Device where the tensor should be (Default value = 'cpu') | 'cpu' |
requires_grad | bool | Trainable tensor or not? (Default value = False) | False |
copy_tensor | bool | If False creates the tensor in place, else makes a copy (Default value = True) | True |
Returns:
Type | Description |
---|---|
Tensor | A tensor of appropriate dtype, device and requires_grad containing data |
Source code in slp/util/pytorch.py
```python
def mktensor(
    data: types.NdTensor,
    dtype: torch.dtype = torch.float,
    device: types.Device = "cpu",
    requires_grad: bool = False,
    copy_tensor: bool = True,
) -> torch.Tensor:
    """Convert a list or numpy array to torch tensor. If a torch tensor
    is passed it is cast to dtype, device and the requires_grad flag is
    set. This can copy data or make the operation in place.

    Args:
        data: (list, np.ndarray, torch.Tensor): Data to be converted to
            torch tensor.
        dtype: (torch.dtype): The type of the tensor elements
            (Default value = torch.float)
        device: (torch.device, str): Device where the tensor should be
            (Default value = 'cpu')
        requires_grad: (bool): Trainable tensor or not? (Default value = False)
        copy_tensor: (bool): If False creates the tensor in place, else makes a copy
            (Default value = True)

    Returns:
        (torch.Tensor): A tensor of appropriate dtype, device and
            requires_grad containing data
    """
    tensor_factory = t if copy_tensor else t_

    return tensor_factory(data, dtype=dtype, device=device, requires_grad=requires_grad)
```
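A short sketch of the copy vs in-place behavior:

```python
import numpy as np
import torch
from slp.util.pytorch import mktensor

data = np.array([[1, 2], [3, 4]])
x = mktensor(data, dtype=torch.long)                     # always copies (uses t)
y = mktensor(data, dtype=torch.long, copy_tensor=False)  # uses t_, may share memory with data
```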
moore_penrose_pinv(x, num_iter=6)
Calculate approximate Moore-Penrose pseudoinverse, via iterative method
- Method is described in (Razavi et al 2014) https://www.hindawi.com/journals/aaa/2014/563787/
- Implementation modified from lucidrains https://github.com/lucidrains/nystrom-attention/blob/main/nystrom_attention/nystrom_attention.py#L13
Parameters:
Name | Type | Description | Default |
---|---|---|---|
x | torch.Tensor | (*, M, M) The square tensors to invert. * can be any number of leading dimensions, e.g. (batch_size, num_heads, M, M) | required |
num_iter | int | Number of iterations to run for the approximation (6 is usually good enough) | 6 |
Returns:
Type | Description |
---|---|
torch.Tensor | (*, M, M) The approximate Moore-Penrose pseudoinverse of x |
Source code in slp/util/pytorch.py
```python
def moore_penrose_pinv(x, num_iter=6):
    """Calculate approximate Moore-Penrose pseudoinverse, via iterative method

    * Method is described in (Razavi et al. 2014) https://www.hindawi.com/journals/aaa/2014/563787/
    * Implementation modified from lucidrains https://github.com/lucidrains/nystrom-attention/blob/main/nystrom_attention/nystrom_attention.py#L13

    Args:
        x (torch.Tensor): (*, M, M) The square tensors to invert.
            * can be any number of leading dimensions, e.g. (batch_size, num_heads, M, M)
        num_iter (int): Number of iterations to run for the approximation (6 is usually good enough)

    Returns:
        (torch.Tensor): (*, M, M) The approximate Moore-Penrose pseudoinverse of x
    """
    abs_x = torch.abs(x)
    col = abs_x.sum(dim=-1)
    row = abs_x.sum(dim=-2)
    # Initialize z = x^T / (max row sum * max column sum), which ensures convergence
    z = x.transpose(-1, -2).contiguous()
    z = z / (torch.max(col) * torch.max(row))

    I = torch.eye(x.shape[-1], device=x.device).unsqueeze(0)

    for _ in range(num_iter):
        xz = x @ z
        z = 0.25 * z @ (13 * I - (xz @ (15 * I - (xz @ (7 * I - xz)))))

    return z
```
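A quick sanity check (the matrices here are constructed to be well-conditioned so the iteration converges fast; for invertible inputs the pseudoinverse coincides with the inverse):

```python
import torch
from slp.util.pytorch import moore_penrose_pinv

torch.manual_seed(0)
a = torch.randn(2, 4, 6, 6)
x = a @ a.transpose(-1, -2) + 6 * torch.eye(6)  # well-conditioned SPD batch

pinv = moore_penrose_pinv(x, num_iter=6)
residual = (x @ pinv - torch.eye(6)).abs().max()
print(residual)  # expected to be very small
```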
pad_mask(lengths, max_length=None)
Generate mask for padded tokens
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lengths | Tensor | Original sequence lengths before padding | required |
max_length | Union[torch.Tensor, int] | Maximum sequence length. Defaults to None. | None |
Returns:
Type | Description |
---|---|
Tensor | Padding mask |
Source code in slp/util/pytorch.py
```python
def pad_mask(
    lengths: torch.Tensor, max_length: Optional[Union[torch.Tensor, int]] = None
) -> torch.Tensor:
    """Generate mask for padded tokens

    Args:
        lengths (torch.Tensor): Original sequence lengths before padding
        max_length (Optional[Union[torch.Tensor, int]], optional): Maximum sequence length. Defaults to None.

    Returns:
        torch.Tensor: padding mask
    """
    if max_length is None or max_length < 0:
        max_length = cast(int, torch.max(lengths).item())
    max_length = cast(int, max_length)
    idx = torch.arange(0, max_length, device=lengths.device).unsqueeze(0)
    mask: torch.Tensor = (idx < lengths.unsqueeze(1)).float()

    return mask
```
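For example:

```python
import torch
from slp.util.pytorch import pad_mask

lengths = torch.tensor([3, 1, 2])
print(pad_mask(lengths))
# tensor([[1., 1., 1.],
#         [1., 0., 0.],
#         [1., 1., 0.]])
```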
pad_sequence(sequences, batch_first=False, padding_value=0.0, max_length=-1)
Pad a list of variable length Tensors with padding_value.
pad_sequence stacks a list of Tensors along a new dimension and pads them to equal length. If the input is a list of sequences with size L x *, the output is T x B x * if batch_first is False, and B x T x * otherwise.
B is the batch size, equal to the number of elements in sequences.
T is the length of the longest sequence.
L is the length of each sequence.
* is any number of trailing dimensions, including none.
Examples:
>>> from torch.nn.utils.rnn import pad_sequence
>>> a = torch.ones(25, 300)
>>> b = torch.ones(22, 300)
>>> c = torch.ones(15, 300)
>>> pad_sequence([a, b, c]).size()
torch.Size([25, 3, 300])
!!! note
    This function returns a Tensor of size T x B x * or B x T x *,
    where T is the length of the longest sequence. It assumes the
    trailing dimensions and type of all the Tensors in sequences are the same.
Note:
This implementation is modified from torch.nn.utils.rnn.pad_sequence, to accept a
max_length argument for fixed length padding
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sequences | List[torch.Tensor] | list of variable length sequences. | required |
batch_first | bool | output will be in B x T x * if True, or in T x B x * otherwise | False |
padding_value | Union[float, int] | value for padded elements. Default: 0. | 0.0 |
max_length | int | If max_length is > 0 then this function will pad to a fixed maximum length. If any sequence is longer than max_length, it will be trimmed. | -1 |
Returns:
Type | Description |
---|---|
Tensor of size ``T x B x *`` if |
attr: |
Source code in slp/util/pytorch.py
```python
def pad_sequence(
    sequences: List[torch.Tensor],
    batch_first: bool = False,
    padding_value: Union[float, int] = 0.0,
    max_length: int = -1,
):
    r"""Pad a list of variable length Tensors with ``padding_value``

    ``pad_sequence`` stacks a list of Tensors along a new dimension,
    and pads them to equal length. If the input is a list of sequences
    with size ``L x *``, the output is ``T x B x *`` if batch_first is
    False, and ``B x T x *`` otherwise.

    `B` is batch size. It is equal to the number of elements in ``sequences``.
    `T` is length of the longest sequence.
    `L` is length of the sequence.
    `*` is any number of trailing dimensions, including none.

    Example:
        >>> from torch.nn.utils.rnn import pad_sequence
        >>> a = torch.ones(25, 300)
        >>> b = torch.ones(22, 300)
        >>> c = torch.ones(15, 300)
        >>> pad_sequence([a, b, c]).size()
        torch.Size([25, 3, 300])

    Note:
        This function returns a Tensor of size ``T x B x *`` or ``B x T x *``
        where `T` is the length of the longest sequence. This function assumes
        trailing dimensions and type of all the Tensors in sequences are same.

    Note:
        This implementation is modified from torch.nn.utils.rnn.pad_sequence, to accept a
        max_length argument for fixed length padding

    Args:
        sequences (list[Tensor]): list of variable length sequences.
        batch_first (bool, optional): output will be in ``B x T x *`` if True, or in
            ``T x B x *`` otherwise
        padding_value (float, optional): value for padded elements. Default: 0.
        max_length (int): If max length is > 0 then this function will pad to a fixed maximum
            length. If any sequence is longer than max_length, it will be trimmed.

    Returns:
        Tensor of size ``T x B x *`` if :attr:`batch_first` is ``False``.
        Tensor of size ``B x T x *`` otherwise
    """
    # assuming trailing dimensions and type of all the Tensors
    # in sequences are same and fetching those from sequences[0]
    max_size = sequences[0].size()
    trailing_dims = max_size[1:]

    if max_length < 0:
        max_len = max([s.size(0) for s in sequences])
    else:
        max_len = max_length

    if batch_first:
        out_dims = (len(sequences), max_len) + trailing_dims
    else:
        out_dims = (max_len, len(sequences)) + trailing_dims
    out_tensor = sequences[0].new_full(out_dims, padding_value)

    for i, tensor in enumerate(sequences):
        length = tensor.size(0)
        # use index notation to prevent duplicate references to the tensor
        if batch_first:
            out_tensor[i, : min(length, max_len), ...] = tensor[
                : min(length, max_len), ...
            ]
        else:
            out_tensor[: min(length, max_len), i, ...] = tensor[
                : min(length, max_len), ...
            ]

    return out_tensor
```
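The max_length extension is the main difference from the stock PyTorch function; a short sketch of fixed-length padding with trimming:

```python
import torch
from slp.util.pytorch import pad_sequence

a = torch.ones(5, 4)
b = torch.ones(3, 4)

# Pad/trim everything to exactly 4 timesteps: a is trimmed, b is padded
out = pad_sequence([a, b], batch_first=True, max_length=4)
print(out.shape)  # torch.Size([2, 4, 4])
```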
repeat_layer(l, times)
Clone a layer multiple times
Parameters:
Name | Type | Description | Default |
---|---|---|---|
l | Module | nn.Module to stack | required |
times | int | Times to clone | required |
Returns:
Type | Description |
---|---|
List[torch.nn.modules.module.Module] | List of identical clones of input layer |
Source code in slp/util/pytorch.py
```python
def repeat_layer(l: nn.Module, times: int) -> List[nn.Module]:
    """Clone a layer multiple times

    Args:
        l (nn.Module): nn.Module to stack
        times (int): Times to clone

    Returns:
        List[nn.Module]: List of identical clones of input layer
    """

    return [l] + [copy.deepcopy(l) for _ in range(times - 1)]
```
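Note that the returned list contains the original module followed by times - 1 deep copies, so the clones have independent parameters. Typical usage wraps the result in an nn.ModuleList:

```python
import torch.nn as nn
from slp.util.pytorch import repeat_layer

layer = nn.Linear(16, 16)
layers = nn.ModuleList(repeat_layer(layer, times=4))  # the original plus 3 deep copies
```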
rotate_tensor(l, n=1)
Rotate tensor by n positions: the first n elements wrap around to the end
Parameters:
Name | Type | Description | Default |
---|---|---|---|
l | Tensor | input tensor | required |
n | int | positions to rotate. Defaults to 1. | 1 |
Returns:
Type | Description |
---|---|
Tensor | rotated tensor |
Source code in slp/util/pytorch.py
```python
def rotate_tensor(l: torch.Tensor, n: int = 1) -> torch.Tensor:
    """Rotate tensor by n positions: the first n elements wrap around to the end

    Args:
        l (torch.Tensor): input tensor
        n (int, optional): positions to rotate. Defaults to 1.

    Returns:
        torch.Tensor: rotated tensor
    """

    return torch.cat((l[n:], l[:n]))
```
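For example:

```python
import torch
from slp.util.pytorch import rotate_tensor

x = torch.tensor([1, 2, 3, 4])
print(rotate_tensor(x, n=1))  # tensor([2, 3, 4, 1])
```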
shift_tensor(l, n=1)
Shift tensor by n positions, zero-filling the vacated positions
Parameters:
Name | Type | Description | Default |
---|---|---|---|
l | Tensor | input tensor | required |
n | int | positions to shift. Defaults to 1. | 1 |
Returns:
Type | Description |
---|---|
Tensor | shifted tensor |
Source code in slp/util/pytorch.py
```python
def shift_tensor(l: torch.Tensor, n: int = 1) -> torch.Tensor:
    """Shift tensor by n positions, zero-filling the vacated positions

    Args:
        l (torch.Tensor): input tensor
        n (int, optional): positions to shift. Defaults to 1.

    Returns:
        torch.Tensor: shifted tensor
    """
    out = rotate_tensor(l, n=n)
    # Assumes n > 0: the last n positions (the wrapped-around elements) are zeroed
    out[-n:] = 0

    return out
```
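For example:

```python
import torch
from slp.util.pytorch import shift_tensor

x = torch.tensor([1, 2, 3, 4])
print(shift_tensor(x, n=1))  # tensor([2, 3, 4, 0])
```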
sort_sequences(inputs, lengths)
Sort sequences according to lengths (descending)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs | Tensor | input sequences, size [B, T, D] | required |
lengths | Tensor | length of each sequence, size [B] | required |
Returns:
Type | Description |
---|---|
Tuple[torch.Tensor, torch.Tensor, Callable[[torch.Tensor], torch.Tensor]] | (sorted inputs, sorted lengths, function to revert inputs and lengths to unsorted state) |
Source code in slp/util/pytorch.py
```python
def sort_sequences(
    inputs: torch.Tensor, lengths: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor, Callable[[torch.Tensor], torch.Tensor]]:
    """Sort sequences according to lengths (descending)

    Args:
        inputs (torch.Tensor): input sequences, size [B, T, D]
        lengths (torch.Tensor): length of each sequence, size [B]

    Returns:
        Tuple[torch.Tensor, torch.Tensor, Callable[[torch.Tensor], torch.Tensor]]:
            (sorted inputs, sorted lengths, function to revert inputs and lengths to unsorted state)
    """
    lengths_sorted, sorted_idx = lengths.sort(descending=True)
    _, unsorted_idx = sorted_idx.sort()

    def unsort(tt: torch.Tensor) -> torch.Tensor:
        """Restore original unsorted sequence"""

        return tt[unsorted_idx]

    return inputs[sorted_idx], lengths_sorted, unsort
```
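A sketch of the sort/unsort round trip (useful e.g. for RNNs that require length-sorted input):

```python
import torch
from slp.util.pytorch import sort_sequences

inputs = torch.randn(3, 4, 2)
lengths = torch.tensor([2, 4, 3])

sorted_inputs, sorted_lengths, unsort = sort_sequences(inputs, lengths)
print(sorted_lengths)  # tensor([4, 3, 2])

# ... process sorted_inputs, then restore the original batch order
restored = unsort(sorted_inputs)
print(torch.equal(restored, inputs))  # True
```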
subsequent_mask(max_length)
Generate subsequent (lower triangular) mask for transformer autoregressive tasks
Parameters:
Name | Type | Description | Default |
---|---|---|---|
max_length | int | Maximum sequence length | required |
Returns:
Type | Description |
---|---|
Tensor | The subsequent mask |
Source code in slp/util/pytorch.py
```python
def subsequent_mask(max_length: int) -> torch.Tensor:
    """Generate subsequent (lower triangular) mask for transformer autoregressive tasks

    Args:
        max_length (int): Maximum sequence length

    Returns:
        torch.Tensor: The subsequent mask
    """
    mask = torch.ones(max_length, max_length)

    # Ignore typecheck because pytorch types are incomplete
    return mask.triu().t().unsqueeze(0).contiguous()  # type: ignore
```
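For example:

```python
from slp.util.pytorch import subsequent_mask

print(subsequent_mask(3))
# tensor([[[1., 0., 0.],
#          [1., 1., 0.],
#          [1., 1., 1.]]])
```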
t(data, dtype=torch.float32, device='cpu', requires_grad=False)
Convert a list or numpy array to torch tensor. If a torch tensor is passed it is cast to dtype, device and the requires_grad flag is set. This always copies data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Union[numpy.ndarray, torch.Tensor, List[~T]] | Data to be converted to torch tensor. | required |
dtype | dtype | The type of the tensor elements (Default value = torch.float) | torch.float32 |
device | Union[torch.device, str] | Device where the tensor should be (Default value = 'cpu') | 'cpu' |
requires_grad | bool | Trainable tensor or not? (Default value = False) | False |
Returns:
Type | Description |
---|---|
Tensor | A tensor of appropriate dtype, device and requires_grad containing data |
Source code in slp/util/pytorch.py
```python
def t(
    data: types.NdTensor,
    dtype: torch.dtype = torch.float,
    device: types.Device = "cpu",
    requires_grad: bool = False,
) -> torch.Tensor:
    """Convert a list or numpy array to torch tensor. If a torch tensor
    is passed it is cast to dtype, device and the requires_grad flag is
    set. This always copies data.

    Args:
        data: (list, np.ndarray, torch.Tensor): Data to be converted to
            torch tensor.
        dtype: (torch.dtype): The type of the tensor elements
            (Default value = torch.float)
        device: (torch.device, str): Device where the tensor should be
            (Default value = 'cpu')
        requires_grad: (bool): Trainable tensor or not? (Default value = False)

    Returns:
        (torch.Tensor): A tensor of appropriate dtype, device and
            requires_grad containing data
    """
    tt = torch.tensor(data, dtype=dtype, device=device, requires_grad=requires_grad)

    return tt
```
t_(data, dtype=torch.float32, device='cpu', requires_grad=False)
Convert a list or numpy array to torch tensor. If a torch tensor is passed it is cast to dtype, device and the requires_grad flag is set IN PLACE.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Union[numpy.ndarray, torch.Tensor, List[~T]] | Data to be converted to torch tensor. | required |
dtype | dtype | The type of the tensor elements (Default value = torch.float) | torch.float32 |
device | Union[torch.device, str] | Device where the tensor should be (Default value = 'cpu') | 'cpu' |
requires_grad | bool | Trainable tensor or not? (Default value = False) | False |
Returns:
Type | Description |
---|---|
Tensor | A tensor of appropriate dtype, device and requires_grad containing data |
Source code in slp/util/pytorch.py
```python
def t_(
    data: types.NdTensor,
    dtype: torch.dtype = torch.float,
    device: Optional[types.Device] = "cpu",
    requires_grad: bool = False,
) -> torch.Tensor:
    """Convert a list or numpy array to torch tensor. If a torch tensor
    is passed it is cast to dtype, device and the requires_grad flag is
    set IN PLACE.

    Args:
        data: (list, np.ndarray, torch.Tensor): Data to be converted to
            torch tensor.
        dtype: (torch.dtype): The type of the tensor elements
            (Default value = torch.float)
        device: (torch.device, str): Device where the tensor should be
            (Default value = 'cpu')
        requires_grad: (bool): Trainable tensor or not? (Default value = False)

    Returns:
        (torch.Tensor): A tensor of appropriate dtype, device and
            requires_grad containing data
    """
    if isinstance(device, str):
        device = torch.device(device)
    tt = torch.as_tensor(data, dtype=dtype, device=device).requires_grad_(requires_grad)

    return tt
```
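A short sketch contrasting t and t_; when dtype and device already match, the in-place variant aliases its input:

```python
import torch
from slp.util.pytorch import t, t_

x = torch.zeros(3, dtype=torch.float)
a = t(x)   # always copies
b = t_(x)  # dtype/device already match, so b shares memory with x

b[0] = 1.0
print(x[0].item())  # 1.0 -- modified through the alias
print(a[0].item())  # 0.0 -- the copy is unaffected
```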
to_device(tt, device='cpu', non_blocking=False)
Send a tensor to a device
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tt | Tensor | input tensor | required |
device | Union[torch.device, str] | Output device. Defaults to "cpu". | 'cpu' |
non_blocking | bool | Use blocking or non-blocking memory transfer. Defaults to False. | False |
Returns:
Type | Description |
---|---|
Tensor | Tensor in the desired device |
Source code in slp/util/pytorch.py
```python
def to_device(
    tt: torch.Tensor, device: Optional[types.Device] = "cpu", non_blocking: bool = False
) -> torch.Tensor:
    """Send a tensor to a device

    Args:
        tt (torch.Tensor): input tensor
        device (Optional[types.Device], optional): Output device. Defaults to "cpu".
        non_blocking (bool, optional): Use blocking or non-blocking memory transfer. Defaults to False.

    Returns:
        torch.Tensor: Tensor in the desired device
    """

    return tt.to(device, non_blocking=non_blocking)
```
date_fname()
date_fname Generate a filename based on datetime.now().
If multiple calls are made within the same second, the filename will not be unique. We could add milliseconds etc. to the fname, but that would hinder readability. For practical purposes, e.g. unique logs between different experiments, this should be enough. If we ever need a truly unique descriptor, there is the uuid module.
Returns:
Type | Description |
---|---|
str | A filename, e.g. 20210228-211832 |
Source code in slp/util/system.py
```python
def date_fname() -> str:
    """date_fname Generate a filename based on datetime.now().

    If multiple calls are made within the same second, the filename will not be unique.
    We could add milliseconds etc. to the fname but that would hinder readability.
    For practical purposes, e.g. unique logs between different experiments, this should be enough.
    Either way, if we need a truly unique descriptor, there is the uuid module.

    Returns:
        str: A filename, e.g. 20210228-211832
    """

    return datetime.now().strftime("%Y%m%d-%H%M%S")
```
download_url(url, dest_path)
download_url Download a file to a destination path given a URL
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url | str | A url pointing to the file we want to download | required |
dest_path | str | The destination path to write the file | required |
Returns:
Type | Description |
---|---|
str | The filename where the downloaded file is written |
Source code in slp/util/system.py
```python
def download_url(url: str, dest_path: str) -> str:
    """download_url Download a file to a destination path given a URL

    Args:
        url (str): A url pointing to the file we want to download
        dest_path (str): The destination path to write the file

    Returns:
        (str): The filename where the downloaded file is written
    """
    name = url.rsplit("/")[-1]
    dest = os.path.join(dest_path, name)
    safe_mkdirs(dest_path)
    response = urllib.request.urlopen(url)

    with open(dest, "wb") as fd:
        shutil.copyfileobj(response, fd)

    return dest
```
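For example (the URL and destination directory below are hypothetical; the downloaded file keeps its basename under dest_path):

```python
from slp.util.system import download_url

dest = download_url("https://example.com/data/corpus.txt", "data/downloads")
print(dest)  # data/downloads/corpus.txt
```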
has_internet_connection(timeout=3)
has_internet_connection Check if you are connected to the internet
Check if an internet connection exists by attempting a TCP connection to the Google DNS server.
Host: 8.8.8.8 (google-public-dns-a.google.com), open port: 53/tcp, service: domain (DNS/TCP)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | int | Seconds to wait before giving up | 3 |
Returns:
Type | Description |
---|---|
bool | True if connection is established, False if we are not connected to the internet |
Source code in slp/util/system.py
```python
def has_internet_connection(timeout: int = 3) -> bool:
    """has_internet_connection Check if you are connected to the internet

    Check if an internet connection exists by attempting a TCP connection to the Google DNS server

    Host: 8.8.8.8 (google-public-dns-a.google.com)
    OpenPort: 53/tcp
    Service: domain (DNS/TCP)

    Args:
        timeout (int): Seconds to wait before giving up

    Returns:
        bool: True if connection is established, False if we are not connected to the internet
    """
    host, port = "8.8.8.8", 53
    try:
        socket.setdefaulttimeout(timeout)
        socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))

        return True
    except socket.error as ex:
        print(ex)

        return False
```
is_file(inp)
is_file Check if the provided string is a valid file in the system path
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inp | Optional[str] | A potential file or None | required |
Returns:
Type | Description |
---|---|
Union[validators.utils.ValidationFailure, bool] | True if a valid file is provided, False otherwise |
Examples:
>>> is_file("/bin/bash")
True
>>> is_file("/supercalifragilisticexpialidocious") # This does not exist. I hope...
False
Source code in slp/util/system.py
```python
def is_file(inp: Optional[str]) -> types.ValidationResult:
    """is_file Check if the provided string is a valid file in the system path

    Args:
        inp (Optional[str]): A potential file or None

    Returns:
        types.ValidationResult: True if a valid file is provided, False otherwise

    Examples:
        >>> is_file("/bin/bash")
        True
        >>> is_file("/supercalifragilisticexpialidocious")  # This does not exist. I hope...
        False
    """
    if not inp:
        return False

    return os.path.isfile(inp)
```
is_subpath(child, parent)
is_subpath Check if child path is a subpath of parent
Parameters:
Name | Type | Description | Default |
---|---|---|---|
child | str | Child path | required |
parent | str | Parent path | required |
Returns:
Type | Description |
---|---|
bool | True if child is a subpath of parent, False if not |
Examples:
>>> is_subpath("/usr/bin/Xorg", "/usr")
True
Source code in slp/util/system.py
```python
def is_subpath(child: str, parent: str) -> bool:
    """is_subpath Check if child path is a subpath of parent

    Args:
        child (str): Child path
        parent (str): parent path

    Returns:
        bool: True if child is a subpath of parent, false if not

    Examples:
        >>> is_subpath("/usr/bin/Xorg", "/usr")
        True
    """
    parent = os.path.abspath(parent)
    child = os.path.abspath(child)

    return cast(
        bool, os.path.commonpath([parent]) == os.path.commonpath([parent, child])
    )
```
is_url(inp)
is_url Check if the provided string is a URL
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inp | Optional[str] | A potential link or None | required |
Returns:
Type | Description |
---|---|
Union[validators.utils.ValidationFailure, bool] | True if a valid url is provided, False if the string is not a url |
Examples:
>>> is_url("Hello World")
ValidationFailure(func=url, args={'value': 'Hello World', 'public': False})
>>> is_url("http://google.com")
True
Source code in slp/util/system.py
```python
def is_url(inp: Optional[str]) -> types.ValidationResult:
    """is_url Check if the provided string is a URL

    Args:
        inp (Optional[str]): A potential link or None

    Returns:
        types.ValidationResult: True if a valid url is provided, False if the string is not a url

    Examples:
        >>> is_url("Hello World")
        ValidationFailure(func=url, args={'value': 'Hello World', 'public': False})
        >>> is_url("http://google.com")
        True
    """
    if not inp:
        return False

    return validators.url(inp)
```
json_dump(data, fname)
json_dump Save dict to a json file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Dict[~K, ~V] | Dict to save | required |
fname | str | Output json file | required |
Source code in slp/util/system.py
```python
def json_dump(data: types.GenericDict, fname: str) -> None:
    """json_dump Save dict to a json file

    Args:
        data (types.GenericDict): Dict to save
        fname (str): Output json file
    """
    with open(fname, "w") as fd:
        json.dump(data, fd)
```
json_load(fname)
json_load Load dict from a json file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fname | str | Json file to load | required |
Returns:
Type | Description |
---|---|
Dict[~K, ~V] | Dict of loaded data |
Source code in slp/util/system.py
```python
def json_load(fname: str) -> types.GenericDict:
    """json_load Load dict from a json file

    Args:
        fname (str): Json file to load

    Returns:
        types.GenericDict: Dict of loaded data
    """
    with open(fname, "r") as fd:
        data = json.load(fd)

    return cast(types.GenericDict, data)
```
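A small round trip with json_dump (the output path is illustrative):

```python
from slp.util.system import json_dump, json_load

config = {"lr": 0.001, "batch_size": 32}
json_dump(config, "config.json")   # hypothetical output file
restored = json_load("config.json")
assert restored == config
```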
pickle_dump(data, fname)
pickle_dump Save data to pickle file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | Data to save | required |
fname | str | Output pickle file | required |
Source code in slp/util/system.py
```python
def pickle_dump(data: Any, fname: str) -> None:
    """pickle_dump Save data to pickle file

    Args:
        data (Any): Data to save
        fname (str): Output pickle file
    """
    with open(fname, "wb") as fd:
        pickle.dump(data, fd)
```
pickle_load(fname)
pickle_load Load data from pickle file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fname | str | file name of pickle file | required |
Returns:
Type | Description |
---|---|
Any | Loaded data |
Source code in slp/util/system.py
```python
def pickle_load(fname: str) -> Any:
    """pickle_load Load data from pickle file

    Args:
        fname (str): file name of pickle file

    Returns:
        Any: Loaded data
    """
    with open(fname, "rb") as fd:
        data = pickle.load(fd)

    return data
```
print_separator(symbol='*', n=10, print_fn=print)
print_separator Print a repeated symbol as a separator
Parameters:
Name | Type | Description | Default |
---|---|---|---|
symbol | str | Symbol to print | '*' |
n | int | Number of times to print the symbol | 10 |
print_fn | Callable[[str], NoneType] | Print function to use, e.g. print or logger.info | print |
Examples:
>>> print_separator(symbol="-", n=2)
--
Source code in slp/util/system.py
```python
def print_separator(
    symbol: str = "*", n: int = 10, print_fn: Callable[[str], None] = print
):
    """print_separator Print a repeated symbol as a separator

    *********************************************************

    Args:
        symbol (str): Symbol to print
        n (int): Number of times to print the symbol
        print_fn (Callable[[str], None]): Print function to use, e.g. print or logger.info

    Examples:
        >>> print_separator(symbol="-", n=2)
        --
    """
    print_fn(symbol * n)
```
read_wav(wav_sample)
read_wav Reads a wav clip into a string and returns the hex string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
wav_sample | str | Path to wav file | required |
Returns:
Type | Description |
---|---|
str | A hex string with the audio information. |
Source code in slp/util/system.py
```python
def read_wav(wav_sample: str) -> str:
    """read_wav Reads a wav clip into a string and returns the hex string.

    Args:
        wav_sample (str): Path to wav file

    Returns:
        A hex string with the audio information.
    """
    with open(wav_sample, "r") as wav_fd:
        clip = wav_fd.read()

    return clip
```
run_cmd(command)
run_cmd Run given shell command
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command | str | Shell command to run | required |
Returns:
Type | Description |
---|---|
Tuple[int, str] | Status code, stdout of shell command |
Examples:
>>> run_cmd("ls /")
(0, 'bin\nboot\ndev\netc\nhome\ninit\nlib\nlib32\nlib64\nlibx32\nlost+found\nmedia\nmnt\nopt\nproc\nroot\nrun\nsbin\nsnap\nsrv\nsys\ntmp\nusr\nvar\n')
Source code in slp/util/system.py
```python
def run_cmd(command: str) -> Tuple[int, str]:
    """run_cmd Run given shell command

    Args:
        command (str): Shell command to run

    Returns:
        (int, str): Status code, stdout of shell command

    Examples:
        >>> run_cmd("ls /")
        (0, 'bin\nboot\ndev\netc\nhome\ninit\nlib\nlib32\nlib64\nlibx32\nlost+found\nmedia\nmnt\nopt\nproc\nroot\nrun\nsbin\nsnap\nsrv\nsys\ntmp\nusr\nvar\n')
    """
    command = f'{os.getenv("SHELL")} -c "{command}"'
    pipe = subprocess.Popen(
        command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    )
    stdout = ""

    if pipe.stdout is not None:
        stdout = "".join(
            [line.decode("utf-8") for line in iter(pipe.stdout.readline, b"")]
        )
        pipe.stdout.close()
    returncode = pipe.wait()

    return returncode, stdout
```
run_cmd_silent(command)
run_cmd_silent Run command without printing to console
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command | str | Shell command to run | required |
Returns:
Type | Description |
---|---|
Tuple[int, str] | Status code, stdout of shell command |
Examples:
>>> run_cmd_silent("ls /")
(0, 'bin\nboot\ndev\netc\nhome\ninit\nlib\nlib32\nlib64\nlibx32\nlost+found\nmedia\nmnt\nopt\nproc\nroot\nrun\nsbin\nsnap\nsrv\nsys\ntmp\nusr\nvar\n')
Source code in slp/util/system.py
```python
def run_cmd_silent(command: str) -> Tuple[int, str]:
    """run_cmd_silent Run command without printing to console

    Args:
        command (str): Shell command to run

    Returns:
        (int, str): Status code, stdout of shell command

    Examples:
        >>> run_cmd_silent("ls /")
        (0, 'bin\nboot\ndev\netc\nhome\ninit\nlib\nlib32\nlib64\nlibx32\nlost+found\nmedia\nmnt\nopt\nproc\nroot\nrun\nsbin\nsnap\nsrv\nsys\ntmp\nusr\nvar\n')
    """

    return cast(Tuple[int, str], suppress_print(run_cmd)(command))
```
safe_mkdirs(path)
Recursively create all the directories in the input path
Utility function similar to mkdir -p. Makes directories recursively if the given path does not exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | Path to mkdir -p | required |
Examples:
>>> safe_mkdirs("super/cali/fragi/listic/expi/ali/docious")
Source code in slp/util/system.py
```python
def safe_mkdirs(path: str) -> None:
    """Recursively create all the directories in the input path

    Utility function similar to mkdir -p. Makes directories recursively, if given path does not exist

    Args:
        path (str): Path to mkdir -p

    Examples:
        >>> safe_mkdirs("super/cali/fragi/listic/expi/ali/docious")
    """
    if not os.path.exists(path):
        try:
            os.makedirs(path)
        except Exception as e:
            logger.warning(e)
            raise IOError(f"Failed to create recursive directories: {path}")
```
suppress_print(func)
suppress_print Decorator to suppress stdout of decorated function
Examples:
>>> @slp.util.system.suppress_print
>>> def very_verbose_function(...): ...
Source code in slp/util/system.py
```python
def suppress_print(func: Callable) -> Callable:
    """suppress_print Decorator to suppress stdout of decorated function

    Examples:
        >>> @slp.util.system.suppress_print
        >>> def very_verbose_function(...): ...
    """

    def func_wrapper(*args: types.T, **kwargs: types.T):
        """Inner function for decorator closure"""
        # Redirect stdout to /dev/null for the duration of the call (POSIX only),
        # then restore the original stdout
        with open("/dev/null", "w") as sys.stdout:
            ret = func(*args, **kwargs)
        sys.stdout = sys.__stdout__

        return ret

    return cast(Callable, func_wrapper)
```
timethis(method=False)
Decorator to measure the time it takes for a function to complete
Examples:
>>> @slp.util.system.timethis()
>>> def time_consuming_function(...): ...
Source code in slp/util/system.py
```python
def timethis(method=False) -> Callable:
    """Decorator to measure the time it takes for a function to complete

    Examples:
        >>> @slp.util.system.timethis()
        >>> def time_consuming_function(...): ...
    """

    def timethis_inner(func: Callable) -> Callable:
        """Inner function for decorator closure"""

        @functools.wraps(func)
        def timed(*args: types.T, **kwargs: types.T):
            """Inner function for decorator closure"""
            ts = time.time()
            result = func(*args, **kwargs)
            te = time.time()
            elapsed = f"{te - ts}"

            if method:
                logger.info(
                    "BENCHMARK: {cls}.{f}(*{a}, **{kw}) took: {t} sec".format(
                        f=func.__name__, cls=args[0], a=args[1:], kw=kwargs, t=elapsed
                    )
                )
            else:
                logger.info(
                    "BENCHMARK: {f}(*{a}, **{kw}) took: {t} sec".format(
                        f=func.__name__, a=args, kw=kwargs, t=elapsed
                    )
                )

            return result

        return cast(Callable, timed)

    return timethis_inner
```
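Since timethis is a decorator factory, it should be called when decorating. A sketch (the function bodies are placeholders):

```python
from slp.util.system import timethis

@timethis()
def train_epoch():
    ...  # placeholder body

class Trainer:
    # method=True logs the owning instance separately from the arguments
    @timethis(method=True)
    def fit(self):
        ...  # placeholder body
```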
write_wav(byte_str, wav_file)
write_wav Write a hex string into a wav file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
byte_str | str | The hex string containing the audio data | required |
wav_file | str | The output wav file | required |
Source code in slp/util/system.py
```python
def write_wav(byte_str: str, wav_file: str) -> None:
    """write_wav Write a hex string into a wav file

    Args:
        byte_str (str): The hex string containing the audio data
        wav_file (str): The output wav file
    """
    with open(wav_file, "w") as fd:
        fd.write(byte_str)
```
yaml_dump(data, fname)
yaml_dump Save dict to a yaml file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Dict[~K, ~V] | Dict to save | required |
fname | str | Output yaml file | required |
Source code in slp/util/system.py
```python
def yaml_dump(data: types.GenericDict, fname: str) -> None:
    """yaml_dump Save dict to a yaml file

    Args:
        data (types.GenericDict): Dict to save
        fname (str): Output yaml file
    """
    with open(fname, "w") as fd:
        yaml.dump(data, fd)
```
yaml_load(fname)
yaml_load Load dict from a yaml file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fname | str | Yaml file to load | required |
Returns:
Type | Description |
---|---|
Dict[~K, ~V] | Dict of loaded data |
Source code in slp/util/system.py
```python
def yaml_load(fname: str) -> types.GenericDict:
    """yaml_load Load dict from a yaml file

    Args:
        fname (str): Yaml file to load

    Returns:
        types.GenericDict: Dict of loaded data
    """
    with open(fname, "r") as fd:
        # Pass an explicit Loader; yaml.load without one is deprecated and unsafe
        data = yaml.load(fd, Loader=yaml.SafeLoader)

    return cast(types.GenericDict, data)
```
dir_path(path)
dir_path Type to use when parsing a path in argparse arguments
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | User provided path | required |
Exceptions:
Type | Description |
---|---|
argparse.ArgumentTypeError | Path does not exist, so argparse fails |
Returns:
Type | Description |
---|---|
str | User provided path |
Examples:
>>> from slp.util.types import dir_path
>>> import argparse
>>> parser = argparse.ArgumentParser("My cool model")
>>> parser.add_argument("--config", type=dir_path)
>>> parser.parse_args(args=["--config", "my_random_config_that_does_not_exist.yaml"])
Traceback (most recent call last):
argparse.ArgumentTypeError: User provided path 'my_random_config_that_does_not_exist.yaml' does not exist
Source code in slp/util/types.py
```python
def dir_path(path):
    """dir_path Type to use when parsing a path in argparse arguments

    Args:
        path (str): User provided path

    Raises:
        argparse.ArgumentTypeError: Path does not exist, so argparse fails

    Returns:
        str: User provided path

    Examples:
        >>> from slp.util.types import dir_path
        >>> import argparse
        >>> parser = argparse.ArgumentParser("My cool model")
        >>> parser.add_argument("--config", type=dir_path)
        >>> parser.parse_args(args=["--config", "my_random_config_that_does_not_exist.yaml"])
        Traceback (most recent call last):
        argparse.ArgumentTypeError: User provided path 'my_random_config_that_does_not_exist.yaml' does not exist
    """
    if os.path.isdir(path):
        return path
    raise argparse.ArgumentTypeError(f"User provided path '{path}' does not exist")
```