first commit
This commit is contained in:
20
rtdetrv2_pytorch/src/data/transforms/__init__.py
Normal file
20
rtdetrv2_pytorch/src/data/transforms/__init__.py
Normal file
@@ -0,0 +1,20 @@
|
||||
""""Copyright(c) 2023 lyuwenyu. All Rights Reserved.
|
||||
"""
|
||||
|
||||
|
||||
from ._transforms import (
|
||||
EmptyTransform,
|
||||
RandomPhotometricDistort,
|
||||
RandomZoomOut,
|
||||
RandomIoUCrop,
|
||||
RandomHorizontalFlip,
|
||||
Resize,
|
||||
PadToSize,
|
||||
SanitizeBoundingBoxes,
|
||||
RandomCrop,
|
||||
Normalize,
|
||||
ConvertBoxes,
|
||||
ConvertPILImage,
|
||||
)
|
||||
from .container import Compose
|
||||
from .mosaic import Mosaic
|
||||
148
rtdetrv2_pytorch/src/data/transforms/_transforms.py
Normal file
148
rtdetrv2_pytorch/src/data/transforms/_transforms.py
Normal file
@@ -0,0 +1,148 @@
|
||||
""""Copyright(c) 2023 lyuwenyu. All Rights Reserved.
|
||||
"""
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
|
||||
import torchvision
|
||||
torchvision.disable_beta_transforms_warning()
|
||||
|
||||
import torchvision.transforms.v2 as T
|
||||
import torchvision.transforms.v2.functional as F
|
||||
|
||||
import PIL
|
||||
import PIL.Image
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from .._misc import convert_to_tv_tensor, _boxes_keys
|
||||
from .._misc import Image, Video, Mask, BoundingBoxes
|
||||
from .._misc import SanitizeBoundingBoxes
|
||||
|
||||
from ...core import register
|
||||
|
||||
|
||||
# Wrap stock torchvision v2 transforms with the project's `register` decorator
# and re-export them under their original names.
# NOTE(review): `register` is defined in `...core`; presumably it records each
# class in GLOBAL_CONFIG so that `Compose` can build it from a config dict whose
# `type` matches the class name — confirm against core.
RandomPhotometricDistort = register()(T.RandomPhotometricDistort)
RandomZoomOut = register()(T.RandomZoomOut)
RandomHorizontalFlip = register()(T.RandomHorizontalFlip)
Resize = register()(T.Resize)
# The three registrations below are disabled.
# ToImageTensor = register()(T.ToImageTensor)
# ConvertDtype = register()(T.ConvertDtype)
# PILToTensor = register()(T.PILToTensor)
# SanitizeBoundingBoxes comes from the project's `_misc` shim rather than from
# `T` directly, and is registered under an explicit name.
SanitizeBoundingBoxes = register(name='SanitizeBoundingBoxes')(SanitizeBoundingBoxes)
RandomCrop = register()(T.RandomCrop)
Normalize = register()(T.Normalize)
|
||||
|
||||
|
||||
@register()
class EmptyTransform(T.Transform):
    """Identity transform: hands its input back untouched."""

    def __init__(self) -> None:
        super().__init__()

    def forward(self, *inputs):
        """Return the single argument as-is, or all arguments as a tuple."""
        if len(inputs) > 1:
            return inputs
        return inputs[0]
|
||||
|
||||
|
||||
@register()
class PadToSize(T.Pad):
    """Pad inputs on the right and bottom so they reach a fixed target size.

    Args:
        size: Target size. An int is expanded to ``(size, size)``. A pair is
            read as ``(width, height)`` by ``_get_params`` (``size[1]`` is
            compared with the first element of the measured size, which is
            assumed to be the height).
        fill: Forwarded unchanged to ``T.Pad``.
        padding_mode: Forwarded unchanged to ``T.Pad``.
    """

    _transformed_types = (
        PIL.Image.Image,
        Image,
        Video,
        Mask,
        BoundingBoxes,
    )

    def __init__(self, size, fill=0, padding_mode='constant') -> None:
        if isinstance(size, int):
            size = (size, size)
        self.size = size
        # The real padding is computed per sample; 0 is only a placeholder.
        super().__init__(0, fill, padding_mode)

    def _get_params(self, flat_inputs: List[Any]) -> Dict[str, Any]:
        """Compute the right/bottom padding needed by the first flat input."""
        # `get_spatial_size` was renamed to `get_size` in newer torchvision
        # releases; use whichever one this installation provides.
        get_size = getattr(F, 'get_size', None) or F.get_spatial_size
        sp = get_size(flat_inputs[0])
        # NOTE(review): if the input is larger than `self.size` these values go
        # negative and are passed to `F.pad` as-is — confirm that is intended.
        h, w = self.size[1] - sp[0], self.size[0] - sp[1]
        # Stored on the instance so `__call__` can report it in the target.
        self.padding = [0, 0, w, h]
        return dict(padding=self.padding)

    def make_params(self, flat_inputs: List[Any]) -> Dict[str, Any]:
        """Alias of ``_get_params`` (presumably the newer torchvision hook name)."""
        return self._get_params(flat_inputs)

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        """Pad one input using the per-type fill and the computed padding."""
        fill = self._fill[type(inpt)]
        padding = params['padding']
        return F.pad(inpt, padding=padding, fill=fill, padding_mode=self.padding_mode)  # type: ignore[arg-type]

    def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        """Alias of ``_transform`` (presumably the newer torchvision hook name)."""
        return self._transform(inpt, params)

    def __call__(self, *inputs: Any) -> Any:
        """Run the padding and record it under ``'padding'`` in a dict target.

        The padding is only recorded when the result is a tuple/list whose
        second element is a dict; any other result is returned unchanged.
        """
        outputs = super().forward(*inputs)
        # Check the container type first: a lone PIL image has no `len()`.
        if isinstance(outputs, (tuple, list)) and len(outputs) > 1 and isinstance(outputs[1], dict):
            outputs[1]['padding'] = torch.tensor(self.padding)
        return outputs
|
||||
|
||||
|
||||
@register()
class RandomIoUCrop(T.RandomIoUCrop):
    """``T.RandomIoUCrop`` that is only applied with probability ``p``."""

    def __init__(
        self,
        min_scale: float = 0.3,
        max_scale: float = 1,
        min_aspect_ratio: float = 0.5,
        max_aspect_ratio: float = 2,
        sampler_options: Optional[List[float]] = None,
        trials: int = 40,
        p: float = 1.0,
    ):
        super().__init__(
            min_scale,
            max_scale,
            min_aspect_ratio,
            max_aspect_ratio,
            sampler_options,
            trials,
        )
        self.p = p

    def __call__(self, *inputs: Any) -> Any:
        """Crop via the parent transform, or pass the inputs through untouched."""
        skip_crop = torch.rand(1) >= self.p
        if skip_crop:
            # Same unpacking convention as the other transforms in this file:
            # one argument comes back bare, several come back as a tuple.
            if len(inputs) > 1:
                return inputs
            return inputs[0]

        return super().forward(*inputs)
|
||||
|
||||
|
||||
@register()
class ConvertBoxes(T.Transform):
    """Optionally change the box format and/or divide boxes by the image size.

    Args:
        fmt: Target box format name; an empty string leaves the format alone.
        normalize: When true, divide the boxes by the reversed spatial size
            repeated twice.
    """

    _transformed_types = (BoundingBoxes,)

    def __init__(self, fmt='', normalize=False) -> None:
        super().__init__()
        self.fmt = fmt
        self.normalize = normalize

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        """Convert a single bounding-box input."""
        # `_boxes_keys[1]` names the attribute that holds the box canvas size.
        spatial_size = getattr(inpt, _boxes_keys[1])
        boxes = inpt

        if self.fmt:
            source_fmt = inpt.format.value.lower()
            converted = torchvision.ops.box_convert(boxes, in_fmt=source_fmt, out_fmt=self.fmt.lower())
            # Re-wrap so the result carries the new format and the same size.
            boxes = convert_to_tv_tensor(
                converted, key='boxes', box_format=self.fmt.upper(), spatial_size=spatial_size
            )

        if self.normalize:
            # Reversed size tiled twice lines up with four box coordinates.
            divisor = torch.tensor(spatial_size[::-1]).tile(2)[None]
            boxes = boxes / divisor

        return boxes

    def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        """Alias of ``_transform``."""
        return self._transform(inpt, params)
|
||||
|
||||
|
||||
@register()
class ConvertPILImage(T.Transform):
    """Turn a PIL image into the project's ``Image`` tensor wrapper.

    Args:
        dtype: Only ``'float32'`` triggers a cast (via ``.float()``).
        scale: When true, divide the values by 255.
    """

    _transformed_types = (PIL.Image.Image,)

    def __init__(self, dtype='float32', scale=True) -> None:
        super().__init__()
        self.dtype = dtype
        self.scale = scale

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        """Convert one PIL image."""
        tensor = F.pil_to_tensor(inpt)

        if self.dtype == 'float32':
            tensor = tensor.float()

        if self.scale:
            tensor = tensor / 255.

        return Image(tensor)

    def transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        """Alias of ``_transform``."""
        return self._transform(inpt, params)
|
||||
95
rtdetrv2_pytorch/src/data/transforms/container.py
Normal file
95
rtdetrv2_pytorch/src/data/transforms/container.py
Normal file
@@ -0,0 +1,95 @@
|
||||
""""Copyright(c) 2023 lyuwenyu. All Rights Reserved.
|
||||
"""
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
|
||||
import torchvision
|
||||
torchvision.disable_beta_transforms_warning()
|
||||
import torchvision.transforms.v2 as T
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from ._transforms import EmptyTransform
|
||||
from ...core import register, GLOBAL_CONFIG
|
||||
|
||||
|
||||
@register()
class Compose(T.Compose):
    """Transform pipeline built from config dicts and/or module instances.

    Args:
        ops: Iterable of transforms. Each item is either an ``nn.Module`` or a
            dict with a ``'type'`` key naming a ``GLOBAL_CONFIG`` entry; the
            remaining keys are passed as constructor keyword arguments.
            ``None`` yields a pipeline holding a single ``EmptyTransform``.
        policy: Dict selecting how the pipeline runs. ``policy['name']`` is one
            of ``'default'``, ``'stop_epoch'`` or ``'stop_sample'``. The two
            ``stop_*`` policies also read ``policy['ops']`` (transform class
            names to disable) and ``policy['epoch']`` / ``policy['sample']``
            (the threshold). Defaults to ``{'name': 'default'}``.

    Raises:
        ValueError: If an item of ``ops`` is neither a dict nor an ``nn.Module``.
    """

    def __init__(self, ops, policy=None) -> None:
        transforms = []
        if ops is not None:
            for op in ops:
                if isinstance(op, dict):
                    transforms.append(self._build_from_config(op))
                elif isinstance(op, nn.Module):
                    transforms.append(op)
                else:
                    raise ValueError(
                        "Compose ops must be dicts with a 'type' key or nn.Module "
                        f"instances, got {type(op).__name__}"
                    )
        else:
            transforms = [EmptyTransform(), ]

        super().__init__(transforms=transforms)

        if policy is None:
            policy = {'name': 'default'}

        self.policy = policy
        # Number of samples seen so far; only used by the 'stop_sample' policy.
        self.global_samples = 0

    @staticmethod
    def _build_from_config(op):
        """Instantiate the registered transform described by a config dict."""
        name = op['type']
        # Build the kwargs from a copy so the caller's dict is never modified,
        # even when the constructor raises.
        kwargs = {key: value for key, value in op.items() if key != 'type'}
        entry = GLOBAL_CONFIG[name]
        return getattr(entry['_pymodule'], entry['_name'])(**kwargs)

    def forward(self, *inputs: Any) -> Any:
        """Run the pipeline using the forward selected by ``policy['name']``."""
        return self.get_forward(self.policy['name'])(*inputs)

    def get_forward(self, name):
        """Return the bound forward for ``name``; unknown names raise KeyError."""
        forwards = {
            'default': self.default_forward,
            'stop_epoch': self.stop_epoch_forward,
            'stop_sample': self.stop_sample_forward,
        }
        return forwards[name]

    def default_forward(self, *inputs: Any) -> Any:
        """Apply every transform in order."""
        sample = inputs if len(inputs) > 1 else inputs[0]
        for transform in self.transforms:
            sample = transform(sample)
        return sample

    def stop_epoch_forward(self, *inputs: Any):
        """Skip the transforms in ``policy['ops']`` once the epoch threshold is hit.

        The last element of the sample must expose an ``epoch`` attribute.
        """
        sample = inputs if len(inputs) > 1 else inputs[0]
        dataset = sample[-1]

        cur_epoch = dataset.epoch
        policy_ops = self.policy['ops']
        policy_epoch = self.policy['epoch']

        for transform in self.transforms:
            if type(transform).__name__ in policy_ops and cur_epoch >= policy_epoch:
                continue
            sample = transform(sample)

        return sample

    def stop_sample_forward(self, *inputs: Any):
        """Skip the transforms in ``policy['ops']`` after ``policy['sample']`` calls.

        Each call increments ``self.global_samples`` by one.
        """
        sample = inputs if len(inputs) > 1 else inputs[0]

        policy_ops = self.policy['ops']
        policy_sample = self.policy['sample']

        for transform in self.transforms:
            if type(transform).__name__ in policy_ops and self.global_samples >= policy_sample:
                continue
            sample = transform(sample)

        self.global_samples += 1

        return sample
|
||||
169
rtdetrv2_pytorch/src/data/transforms/functional.py
Normal file
169
rtdetrv2_pytorch/src/data/transforms/functional.py
Normal file
@@ -0,0 +1,169 @@
|
||||
import torch
|
||||
import torchvision.transforms.functional as F
|
||||
|
||||
from packaging import version
|
||||
from typing import Optional, List
|
||||
from torch import Tensor
|
||||
|
||||
# needed due to empty tensor bug in pytorch and torchvision 0.5
|
||||
import torchvision
|
||||
if version.parse(torchvision.__version__) < version.parse('0.7'):
|
||||
from torchvision.ops import _new_empty_tensor
|
||||
from torchvision.ops.misc import _output_size
|
||||
|
||||
|
||||
def interpolate(input, size=None, scale_factor=None, mode="nearest", align_corners=None):
    # type: (Tensor, Optional[List[int]], Optional[float], str, Optional[bool]) -> Tensor
    """
    Drop-in for nn.functional.interpolate that also accepts empty inputs.

    On torchvision older than 0.7 an empty input is answered with an empty
    tensor of the computed output shape; on newer versions the call is
    forwarded to torchvision.ops.misc.interpolate.
    """
    is_legacy = version.parse(torchvision.__version__) < version.parse('0.7')
    if not is_legacy:
        return torchvision.ops.misc.interpolate(input, size, scale_factor, mode, align_corners)

    if input.numel() > 0:
        return torch.nn.functional.interpolate(input, size, scale_factor, mode, align_corners)

    # Empty input: keep the leading dims and swap in the new spatial dims.
    spatial_shape = _output_size(2, input, size, scale_factor)
    full_shape = [*input.shape[:-2], *spatial_shape]
    return _new_empty_tensor(input, full_shape)
|
||||
|
||||
|
||||
|
||||
def crop(image, target, region):
    """Crop an image and update its annotation dict to match.

    Args:
        image: Image accepted by ``F.crop``.
        target: Annotation dict; shallow-copied, the caller's dict is not
            modified. Recognised keys: "boxes" (treated as (x0, y0, x1, y1)
            in pixels), "masks", "labels", "area", "iscrowd".
        region: ``(i, j, h, w)`` — top row, left column, height, width.

    Returns:
        ``(cropped_image, target)``. ``target["size"]`` is set to ``[h, w]``.
        When boxes or masks are present, instances left with an empty box (or
        an empty mask if there are no boxes) are removed from every
        per-instance field that exists in the target.
    """
    cropped_image = F.crop(image, *region)

    target = target.copy()
    i, j, h, w = region

    # should we do something wrt the original size?
    target["size"] = torch.tensor([h, w])

    fields = ["labels", "area", "iscrowd"]

    if "boxes" in target:
        boxes = target["boxes"]
        max_size = torch.as_tensor([w, h], dtype=torch.float32)
        # Shift into crop coordinates, then clip both corners to the crop.
        cropped_boxes = boxes - torch.as_tensor([j, i, j, i])
        cropped_boxes = torch.min(cropped_boxes.reshape(-1, 2, 2), max_size)
        cropped_boxes = cropped_boxes.clamp(min=0)
        area = (cropped_boxes[:, 1, :] - cropped_boxes[:, 0, :]).prod(dim=1)
        target["boxes"] = cropped_boxes.reshape(-1, 4)
        target["area"] = area
        fields.append("boxes")

    if "masks" in target:
        # FIXME should we update the area here if there are no boxes?
        target['masks'] = target['masks'][:, i:i + h, j:j + w]
        fields.append("masks")

    # remove elements for which the boxes or masks that have zero area
    if "boxes" in target or "masks" in target:
        # favor boxes selection when defining which elements to keep
        # this is compatible with previous implementation
        if "boxes" in target:
            cropped_boxes = target['boxes'].reshape(-1, 2, 2)
            keep = torch.all(cropped_boxes[:, 1, :] > cropped_boxes[:, 0, :], dim=1)
        else:
            keep = target['masks'].flatten(1).any(1)

        for field in fields:
            # "labels"/"area"/"iscrowd" are optional; skip the ones that are
            # missing instead of failing with KeyError.
            if field in target:
                target[field] = target[field][keep]

    return cropped_image, target
|
||||
|
||||
|
||||
def hflip(image, target):
    """Mirror an image left-to-right and update boxes and masks to match.

    ``target`` is shallow-copied. Boxes have their first and third columns
    swapped and reflected about the image width; masks are flipped along
    their last axis.
    """
    flipped_image = F.hflip(image)

    width, _height = image.size

    flipped_target = target.copy()

    if "boxes" in flipped_target:
        # Swap the two x columns, negate them and shift by the width.
        swapped = flipped_target["boxes"][:, [2, 1, 0, 3]]
        sign = torch.as_tensor([-1, 1, -1, 1])
        shift = torch.as_tensor([width, 0, width, 0])
        flipped_target["boxes"] = swapped * sign + shift

    if "masks" in flipped_target:
        flipped_target['masks'] = flipped_target['masks'].flip(-1)

    return flipped_image, flipped_target
|
||||
|
||||
|
||||
def resize(image, target, size, max_size=None):
    """Resize an image and rescale its annotations by the same ratios.

    ``size`` is either a scalar (desired length of the shorter side, optionally
    limited so the longer side stays within ``max_size``) or a ``(w, h)``
    list/tuple used directly. ``target`` may be ``None``; otherwise it is
    shallow-copied and its "boxes", "area", "size" and "masks" are updated.
    """

    def _size_from_short_side(image_size, short_side, long_side_cap=None):
        # Returns (height, width) keeping the aspect ratio of `image_size`.
        width, height = image_size
        if long_side_cap is not None:
            shortest = float(min((width, height)))
            longest = float(max((width, height)))
            if longest / shortest * short_side > long_side_cap:
                short_side = int(round(long_side_cap * shortest / longest))

        already_sized = (width <= height and width == short_side) or (
            height <= width and height == short_side
        )
        if already_sized:
            return (height, width)

        if width < height:
            return (int(short_side * height / width), short_side)
        return (short_side, int(short_side * width / height))

    if isinstance(size, (list, tuple)):
        # Explicit (w, h) becomes (h, w) for F.resize.
        size = size[::-1]
    else:
        size = _size_from_short_side(image.size, size, max_size)

    rescaled_image = F.resize(image, size)

    if target is None:
        return rescaled_image, None

    ratio_width, ratio_height = (
        float(new) / float(old) for new, old in zip(rescaled_image.size, image.size)
    )

    target = target.copy()

    if "boxes" in target:
        box_scale = torch.as_tensor([ratio_width, ratio_height, ratio_width, ratio_height])
        target["boxes"] = target["boxes"] * box_scale

    if "area" in target:
        target["area"] = target["area"] * (ratio_width * ratio_height)

    h, w = size
    target["size"] = torch.tensor([h, w])

    if "masks" in target:
        # Resample as float, then threshold to get boolean masks back.
        resampled = interpolate(target['masks'][:, None].float(), size, mode="nearest")
        target['masks'] = resampled[:, 0] > 0.5

    return rescaled_image, target
|
||||
|
||||
|
||||
def pad(image, target, padding):
    """Pad an image on the right and bottom and update the target to match.

    ``padding[0]`` is the amount added on the right and ``padding[1]`` on the
    bottom. ``target`` may be ``None``; otherwise it is shallow-copied, its
    "size" is set from the padded image and its "masks" are padded the same way.
    """
    pad_right, pad_bottom = padding[0], padding[1]

    # assumes that we only pad on the bottom right corners
    padded_image = F.pad(image, (0, 0, pad_right, pad_bottom))
    if target is None:
        return padded_image, None

    updated = target.copy()
    # should we do something wrt the original size?
    updated["size"] = torch.tensor(padded_image.size[::-1])
    if "masks" in updated:
        updated['masks'] = torch.nn.functional.pad(updated['masks'], (0, pad_right, 0, pad_bottom))
    return padded_image, updated
|
||||
72
rtdetrv2_pytorch/src/data/transforms/mosaic.py
Normal file
72
rtdetrv2_pytorch/src/data/transforms/mosaic.py
Normal file
@@ -0,0 +1,72 @@
|
||||
""""Copyright(c) 2023 lyuwenyu. All Rights Reserved.
|
||||
"""
|
||||
|
||||
import torch
|
||||
import torchvision
|
||||
torchvision.disable_beta_transforms_warning()
|
||||
import torchvision.transforms.v2 as T
|
||||
import torchvision.transforms.v2.functional as F
|
||||
|
||||
import random
|
||||
from PIL import Image
|
||||
|
||||
from .._misc import convert_to_tv_tensor
|
||||
from ...core import register
|
||||
|
||||
|
||||
@register()
class Mosaic(T.Transform):
    """Build a 2x2 mosaic from the current sample and three random ones.

    Args:
        size: Passed to ``T.Resize`` for every tile; also the crop size when
            ``max_size`` is falsy.
        max_size: Passed to ``T.Resize``; when truthy it is the final crop size.
    """

    def __init__(self, size, max_size=None, ) -> None:
        super().__init__()
        self.resize = T.Resize(size=size, max_size=max_size)
        self.crop = T.RandomCrop(size=max_size if max_size else size)

        # TODO add arg `output_size` for affine`
        self.random_affine = T.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.5, 1.5), fill=114)

    def forward(self, *inputs):
        """Return ``(mosaic_image, merged_target, dataset)``.

        Expects ``(image, target, dataset)``; ``dataset`` must support
        ``len()`` and ``load_item(index) -> (image, target)``.
        """
        inputs = inputs if len(inputs) > 1 else inputs[0]
        image, target, dataset = inputs

        # The current sample is the first tile. Previously it was unpacked and
        # then overwritten, leaving the fourth quadrant of the canvas empty.
        image, target = self.resize(image, target)
        images = [image]
        targets = [target]
        for index in random.choices(range(len(dataset)), k=3):
            extra_image, extra_target = dataset.load_item(index)
            extra_image, extra_target = self.resize(extra_image, extra_target)
            images.append(extra_image)
            targets.append(extra_target)

        # `get_spatial_size` was renamed to `get_size` in newer torchvision
        # releases; use whichever one this installation provides.
        get_size = getattr(F, 'get_size', None) or F.get_spatial_size
        # Tile origins are derived from the first tile's size only.
        h, w = get_size(images[0])
        offset = [[0, 0], [w, 0], [0, h], [w, h]]
        image = Image.new(mode=images[0].mode, size=(w * 2, h * 2), color=0)
        for i, im in enumerate(images):
            image.paste(im, offset[i])

        # Each (x, y) origin repeated twice so it can be added to 4-value boxes.
        offset = torch.tensor([[0, 0], [w, 0], [0, h], [w, h]]).repeat(1, 2)
        target = {}
        for k in targets[0]:
            if k == 'boxes':
                v = [t[k] + offset[i] for i, t in enumerate(targets)]
            else:
                v = [t[k] for t in targets]

            if isinstance(v[0], torch.Tensor):
                v = torch.cat(v, dim=0)

            target[k] = v

        if 'boxes' in target:
            # Re-wrap so the boxes carry the size of the full mosaic canvas.
            w, h = image.size
            target['boxes'] = convert_to_tv_tensor(target['boxes'], 'boxes', box_format='xyxy', spatial_size=[h, w])

        if 'masks' in target:
            target['masks'] = convert_to_tv_tensor(target['masks'], 'masks')

        image, target = self.random_affine(image, target)
        image, target = self.crop(image, target)

        return image, target, dataset
|
||||
2
rtdetrv2_pytorch/src/data/transforms/presets.py
Normal file
2
rtdetrv2_pytorch/src/data/transforms/presets.py
Normal file
@@ -0,0 +1,2 @@
|
||||
""""Copyright(c) 2023 lyuwenyu. All Rights Reserved.
|
||||
"""
|
||||
Reference in New Issue
Block a user