File size: 2,982 Bytes
188f311 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
import math
import torch
from torch.utils.data.distributed import DistributedSampler
__all__ = ["MyDistributedSampler", "WeightedDistributedSampler"]
class MyDistributedSampler(DistributedSampler):
"""Allow Subset Sampler in Distributed Training"""
def __init__(
self, dataset, num_replicas=None, rank=None, shuffle=True, sub_index_list=None
):
super(MyDistributedSampler, self).__init__(dataset, num_replicas, rank, shuffle)
self.sub_index_list = sub_index_list # numpy
self.num_samples = int(
math.ceil(len(self.sub_index_list) * 1.0 / self.num_replicas)
)
self.total_size = self.num_samples * self.num_replicas
print("Use MyDistributedSampler: %d, %d" % (self.num_samples, self.total_size))
def __iter__(self):
# deterministically shuffle based on epoch
g = torch.Generator()
g.manual_seed(self.epoch)
indices = torch.randperm(len(self.sub_index_list), generator=g).tolist()
# add extra samples to make it evenly divisible
indices += indices[: (self.total_size - len(indices))]
indices = self.sub_index_list[indices].tolist()
assert len(indices) == self.total_size
# subsample
indices = indices[self.rank : self.total_size : self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
class WeightedDistributedSampler(DistributedSampler):
"""Allow Weighted Random Sampling in Distributed Training"""
def __init__(
self,
dataset,
num_replicas=None,
rank=None,
shuffle=True,
weights=None,
replacement=True,
):
super(WeightedDistributedSampler, self).__init__(
dataset, num_replicas, rank, shuffle
)
self.weights = (
torch.as_tensor(weights, dtype=torch.double)
if weights is not None
else None
)
self.replacement = replacement
print("Use WeightedDistributedSampler")
def __iter__(self):
if self.weights is None:
return super(WeightedDistributedSampler, self).__iter__()
else:
g = torch.Generator()
g.manual_seed(self.epoch)
if self.shuffle:
# original: indices = torch.randperm(len(self.dataset), generator=g).tolist()
indices = torch.multinomial(
self.weights, len(self.dataset), self.replacement, generator=g
).tolist()
else:
indices = list(range(len(self.dataset)))
# add extra samples to make it evenly divisible
indices += indices[: (self.total_size - len(indices))]
assert len(indices) == self.total_size
# subsample
indices = indices[self.rank : self.total_size : self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
|