pytorch/test/test_cuda_multigpu.py

# Owner(s): ["module: cuda"]
import collections
import contextlib
import ctypes
import io
import gc
import queue
import sys
import tempfile
import threading
import torch
import torch.cuda.comm as comm
import unittest
from itertools import repeat, chain
from typing import NamedTuple
from torch.nn.parallel import scatter_gather
from torch.testing._internal.common_utils import (
IS_JETSON,
IS_REMOTE_GPU,
IS_SANDCASTLE,
NoTest,
TEST_CUDA,
TestCase,
get_cycles_per_ms,
instantiate_parametrized_tests,
run_tests,
skipCUDANonDefaultStreamIf,
skipIfRocm,
)
from torch.testing._internal.common_cuda import TEST_MULTIGPU, _create_scaling_case, _create_scaling_models_optimizers
TEST_CUDAMALLOCASYNC = TEST_CUDA and (torch.cuda.get_allocator_backend() == "cudaMallocAsync")
if not TEST_CUDA:
print('CUDA not available, skipping tests', file=sys.stderr)
TestCase = NoTest # noqa: F811
class TestCudaMultiGPU(TestCase):
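# Number of GPU cycles passed to torch.cuda._sleep: a spin long enough that the
# cross-stream and cross-device ordering assertions below are meaningful.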
FIFTY_MIL_CYCLES = 50000000
def _check_memory_stat_consistency(self):
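# Rebuilds the expected per-device counters from torch.cuda.memory_snapshot()
# and compares them against torch.cuda.memory_stats().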
snapshot = torch.cuda.memory_snapshot()
expected_each_device = collections.defaultdict(lambda: collections.defaultdict(int))
for segment in snapshot:
expandable = segment["is_expandable"]
expected = expected_each_device[segment["device"]]
pool_str = segment["segment_type"] + "_pool"
if not expandable:
expected["segment.all.current"] += 1
expected["segment." + pool_str + ".current"] += 1
expected["allocated_bytes.all.current"] += segment["allocated_size"]
expected["allocated_bytes." + pool_str + ".current"] += segment["allocated_size"]
expected["reserved_bytes.all.current"] += segment["total_size"]
expected["reserved_bytes." + pool_str + ".current"] += segment["total_size"]
expected["active_bytes.all.current"] += segment["active_size"]
expected["active_bytes." + pool_str + ".current"] += segment["active_size"]
expected["requested_bytes.all.current"] += segment["requested_size"]
expected["requested_bytes." + pool_str + ".current"] += segment["requested_size"]
sum_requested = 0
is_split = len(segment["blocks"]) > 1
for block in segment["blocks"]:
if block["state"] == "active_allocated":
expected["allocation.all.current"] += 1
expected["allocation." + pool_str + ".current"] += 1
if block["state"].startswith("active_"):
sum_requested += block["requested_size"]
expected["active.all.current"] += 1
expected["active." + pool_str + ".current"] += 1
if block["state"] == "inactive" and is_split and not expandable:
expected["inactive_split.all.current"] += 1
expected["inactive_split." + pool_str + ".current"] += 1
expected["inactive_split_bytes.all.current"] += block["size"]
expected["inactive_split_bytes." + pool_str + ".current"] += block["size"]
self.assertEqual(sum_requested, segment["requested_size"])
for device, expected in expected_each_device.items():
stats = torch.cuda.memory_stats(device)
for k, v in expected.items():
self.assertEqual(v, stats[k])
def test_cuda_synchronize(self):
torch.cuda.synchronize()
torch.cuda.synchronize('cuda')
torch.cuda.synchronize('cuda:0')
torch.cuda.synchronize(0)
torch.cuda.synchronize(torch.device('cuda:0'))
if TEST_MULTIGPU:
torch.cuda.synchronize('cuda:1')
torch.cuda.synchronize(1)
torch.cuda.synchronize(torch.device('cuda:1'))
with self.assertRaisesRegex(ValueError, "Expected a cuda device, but"):
torch.cuda.synchronize(torch.device("cpu"))
with self.assertRaisesRegex(ValueError, "Expected a cuda device, but"):
torch.cuda.synchronize("cpu")
@staticmethod
def _test_memory_stats_generator(self, device=None, N=35):
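# Generator that allocates and frees tensors in phases, yielding after each phase
# so callers can interleave checks (see test_memory_stats and test_memory_stats_multigpu).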
if device is None:
device = torch.cuda.current_device()
m0 = torch.cuda.memory_allocated(device)
last_m_arr = [torch.cuda.memory_allocated(device)]
max_m_arr = [torch.cuda.max_memory_allocated(device)]
last_r_arr = [torch.cuda.memory_reserved(device)]
max_r_arr = [torch.cuda.max_memory_reserved(device)]
def alloc(*size):
with torch.cuda.device(device):
# NOTE: do **not** use methods that can have additional
# memory overhead, e.g., inplace random sampling methods.
# they can leave some memory occupied even after being
# deallocated, e.g., initialized RNG state, causing some
# memory checks below to fail.
return torch.cuda.FloatTensor(*size)
def assert_change(comp=1, empty_cache=False, reset_peak=False):
# comp > 0: increased
# comp = 0: equal
# comp < 0: decreased
new_m = torch.cuda.memory_allocated(device)
new_max_m = torch.cuda.max_memory_allocated(device)
if comp > 0:
self.assertGreater(new_m, last_m_arr[0])
elif comp < 0:
self.assertLess(new_m, last_m_arr[0])
else:
self.assertEqual(new_m, last_m_arr[0])
self.assertLessEqual(new_m, new_max_m)
self.assertGreaterEqual(new_max_m, max_m_arr[0])
last_m_arr[0] = new_m
max_m_arr[0] = new_max_m
new_r = torch.cuda.memory_reserved(device)
new_max_r = torch.cuda.max_memory_reserved(device)
# emptying cache may happen (due to allocation or empty_cache), so
# we can't assert new_r >= last_r_arr[0]
self.assertLessEqual(new_r, new_max_r)
self.assertGreaterEqual(new_max_r, max_r_arr[0])
last_r_arr[0] = new_r
max_r_arr[0] = new_max_r
if empty_cache:
torch.cuda.empty_cache()
new_r = torch.cuda.memory_reserved(device)
new_max_r = torch.cuda.max_memory_reserved(device)
self.assertLessEqual(new_r, last_r_arr[0])
self.assertLessEqual(new_r, new_max_r)
self.assertEqual(new_max_r, max_r_arr[0])
last_r_arr[0] = new_r
if reset_peak:
torch.cuda.reset_peak_memory_stats(device)
self.assertEqual(torch.cuda.memory_allocated(device), last_m_arr[0])
self.assertEqual(torch.cuda.max_memory_allocated(device), last_m_arr[0])
max_m_arr[0] = last_m_arr[0]
self.assertEqual(torch.cuda.memory_reserved(device), last_r_arr[0])
self.assertEqual(torch.cuda.max_memory_reserved(device), last_r_arr[0])
max_r_arr[0] = last_r_arr[0]
assert_change(0)
assert_change(0, reset_peak=True)
assert_change(0, empty_cache=True)
assert_change(0, reset_peak=True)
assert_change(0)
yield
tensors1 = [alloc(1), alloc(10, 20), alloc(200, 300, 2000)]
m1 = torch.cuda.memory_allocated(device)
assert_change(1)
yield
tensors2 = []
for i in range(1, int(N / 2) + 1):
# small ones
tensors2.append(alloc(i, i * 4))
assert_change(1)
yield
for i in range(5, int(N / 2) + 5):
# large ones
tensors2.append(alloc(i, i * 7, i * 9, i * 11))
assert_change(1, reset_peak=(i % 2 == 0))
yield
tensors2.append(alloc(0, 0, 0))
assert_change(0)
yield
permute = []
for i in torch.randperm(len(tensors2)):
permute.append(tensors2[i])
assert_change(0)
yield
del tensors2
assert_change(0)
yield
tensors2 = permute
assert_change(0)
yield
del permute
assert_change(0, reset_peak=True)
yield
for i in range(int(N / 2)):
x = tensors2[i].numel()
del tensors2[i]
assert_change(-x) # -x is 0 if tensors2[i] was empty, so no change is expected then
yield
for i in range(2, int(2 * N / 3) + 2):
tensors2.append(alloc(i, i * 3, i * 8))
assert_change(1)
yield
del tensors2
assert_change(-1, reset_peak=True)
assert_change(0)
self.assertEqual(torch.cuda.memory_allocated(device), m1)
yield True
del tensors1
assert_change(-1, reset_peak=True)
self.assertEqual(torch.cuda.memory_allocated(device), m0)
# test empty_cache and reset_peak
assert_change(0, empty_cache=True)
assert_change(0, reset_peak=True)
@unittest.skipIf(TEST_CUDAMALLOCASYNC, "temporarily disabled")
def test_memory_stats(self):
gc.collect()
torch.cuda.empty_cache()
for _ in self._test_memory_stats_generator(self):
self._check_memory_stat_consistency()
@unittest.skipIf(TEST_CUDAMALLOCASYNC, "temporarily disabled")
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_memory_stats_multigpu(self):
# advance a generator with an end flag
def advance(gen, end):
if not end:
try:
next(gen)
except StopIteration:
end = True
return end
# interlace
torch.cuda.empty_cache()
gen0 = self._test_memory_stats_generator(self, device='cuda:0', N=35)
gen1 = self._test_memory_stats_generator(self, device=torch.device('cuda:1'), N=35)
end0 = end1 = False
while not (end0 and end1):
end0 = advance(gen0, end0)
end1 = advance(gen1, end1)
# semi-random order
torch.cuda.empty_cache()
gen0 = self._test_memory_stats_generator(self, device=0, N=35)
gen1 = self._test_memory_stats_generator(self, device=torch.device('cuda:1'), N=35)
end0 = end1 = False
while not (end0 and end1):
end0 = advance(gen0, end0)
if not end0:
gen1_max_times = torch.LongTensor(1).random_(0, 3)[0]
else:
gen1_max_times = torch.inf
t = 0
while t < gen1_max_times and not end1:
end1 = advance(gen1, end1)
t += 1
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_autogpu(self):
x = torch.randn(5, 5).cuda()
y = torch.randn(5, 5).cuda()
self.assertEqual(x.get_device(), 0)
self.assertEqual(y.get_device(), 0)
with torch.cuda.device(1):
z = torch.randn(5, 5).cuda()
self.assertEqual(z.get_device(), 1)
q = x.add(y)
self.assertEqual(q.get_device(), 0)
w = torch.randn(5, 5).cuda()
self.assertEqual(w.get_device(), 1)
self.assertEqual(y.cuda().get_device(), 1)
z = z.cuda()
self.assertEqual(z.get_device(), 0)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_new(self):
x = torch.randn(3, 3).cuda()
self.assertEqual(x.new([0, 1, 2]).get_device(), 0)
self.assertEqual(x.new([0, 1, 2], device=1).get_device(), 1)
with torch.cuda.device(1):
self.assertEqual(x.new([0, 1, 2]).get_device(), 0)
self.assertEqual(x.new([0, 1, 2], device=1).get_device(), 1)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_copy_device(self):
x = torch.randn(5, 5).cuda()
with torch.cuda.device(1):
y = x.cuda()
self.assertEqual(y.get_device(), 1)
self.assertIs(y.cuda(), y)
z = y.cuda(0)
self.assertEqual(z.get_device(), 0)
self.assertIs(z.cuda(0), z)
x = torch.randn(5, 5)
with torch.cuda.device(1):
y = x.cuda()
self.assertEqual(y.get_device(), 1)
self.assertIs(y.cuda(), y)
z = y.cuda(0)
self.assertEqual(z.get_device(), 0)
self.assertIs(z.cuda(0), z)
def _test_copy_sync_current_stream(self, x, y):
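# Checks that Tensor.copy_() synchronizes with the current streams of both the
# source and destination devices (see the note below).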
x_plus_one = x + 1
s0 = torch.cuda.Stream(device=x.device)
s1 = torch.cuda.Stream(device=y.device)
s2 = torch.cuda.Stream(device=x.device)
s3 = torch.cuda.Stream(device=y.device)
# same dst stream different src streams
with torch.cuda.stream(s0):
torch.cuda._sleep(TestCudaMultiGPU.FIFTY_MIL_CYCLES)
with torch.cuda.stream(s1):
y.copy_(x_plus_one)
with torch.cuda.stream(s2), torch.cuda.stream(s1):
y.copy_(x)
s1.synchronize()
# The copy() is synchronized on the current streams of both src and dst.
# In the above test, the _sleep() op on s0 will not block the copy() on
# s2, but both copies are synchronized on s1 in the dst device. Hence,
# x is copied to y after x_plus_one is copied to y. If x and y are on
# the same device, both copy() ops are synchronized on s1.
self.assertEqual(y, x)
# same src stream different dst streams
with torch.cuda.stream(s1):
torch.cuda._sleep(TestCudaMultiGPU.FIFTY_MIL_CYCLES)
with torch.cuda.stream(s0):
y.copy_(x_plus_one)
with torch.cuda.stream(s3), torch.cuda.stream(s0):
y.copy_(x)
s0.synchronize()
# Similarly, both copy() ops are synchronized on s0.
self.assertEqual(y, x)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_copy_streams(self):
d0 = torch.device('cuda:0')
x0 = torch.zeros(5, 5, device=d0)
d1 = torch.device('cuda:1')
x1 = torch.zeros(5, 5, device=d1)
self._test_copy_sync_current_stream(x0, x1)
x2 = torch.zeros(5, 5, device=d0)
self._test_copy_sync_current_stream(x0, x2)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_cat_autogpu(self):
x = torch.randn(4, 4).cuda(1)
y = torch.randn(4, 4).cuda(1)
z = torch.cat([x, y], 0)
self.assertEqual(z.get_device(), x.get_device())
@unittest.skipIf(torch.cuda.device_count() >= 10, "Loading a cuda:9 tensor")
def test_load_nonexistent_device(self):
# Setup: create a serialized file object with a 'cuda:9' restore location
tensor = torch.randn(2, device='cuda')
buf = io.BytesIO()
torch.save(tensor, buf)
# NB: this might not work in the future if serialization changes
buf = io.BytesIO(buf.getvalue().replace(b'cuda:0', b'cuda:9'))
msg = r'Attempting to deserialize object on CUDA device 9'
with self.assertRaisesRegex(RuntimeError, msg):
_ = torch.load(buf)
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_multigpu_serialization_remap(self):
x = [torch.randn(4, 4).cuda(0), torch.randn(4, 4).cuda(1)]
def gpu_remap(storage, location):
if location == 'cuda:1':
return storage.cuda(0)
with tempfile.NamedTemporaryFile() as f:
torch.save(x, f)
f.seek(0)
x_copy = torch.load(f, map_location=gpu_remap)
for original, copy in zip(x, x_copy):
self.assertEqual(copy, original)
self.assertIs(type(copy), type(original))
self.assertEqual(copy.get_device(), 0)
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_multigpu_serialization_remap_dict(self):
x = [torch.randn(4, 4).cuda(0), torch.randn(4, 4).cuda(1)]
with tempfile.NamedTemporaryFile() as f:
torch.save(x, f)
f.seek(0)
x_copy = torch.load(f, map_location={'cuda:1': 'cuda:0'})
for original, copy in zip(x, x_copy):
self.assertEqual(copy, original)
self.assertIs(type(copy), type(original))
self.assertEqual(copy.get_device(), 0)
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_multigpu_storage_clone(self):
x = torch.randn(4, 4, device='cuda:1').storage()
y = x.clone()
self.assertEqual(x.get_device(), y.get_device())
for t in ['byte', 'char', 'short', 'int', 'long', 'half', 'double']:
self.assertEqual(getattr(x, t)().get_device(), x.get_device())
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_cuda_set_device(self):
x = torch.randn(5, 5)
with torch.cuda.device(1):
self.assertEqual(x.cuda().get_device(), 1)
torch.cuda.set_device(0)
self.assertEqual(x.cuda().get_device(), 0)
with torch.cuda.device(1):
self.assertEqual(x.cuda().get_device(), 1)
self.assertEqual(x.cuda().get_device(), 0)
torch.cuda.set_device(1)
self.assertEqual(x.cuda().get_device(), 0)
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_current_stream(self):
d0 = torch.device('cuda:0')
d1 = torch.device('cuda:1')
s0 = torch.cuda.current_stream()
s1 = torch.cuda.current_stream(device=1)
s2 = torch.cuda.current_stream(device=0)
self.assertEqual(d0, s0.device)
self.assertEqual(d1, s1.device)
self.assertEqual(d0, s2.device)
self.assertEqual(s0, s2)
with torch.cuda.device(d1):
s0 = torch.cuda.current_stream()
s1 = torch.cuda.current_stream(1)
s2 = torch.cuda.current_stream(d0)
self.assertEqual(d1, s0.device)
self.assertEqual(d1, s1.device)
self.assertEqual(d0, s2.device)
self.assertEqual(s0, s1)
with self.assertRaisesRegex(ValueError,
"Expected a cuda device, but got: cpu"):
torch.cuda.current_stream(torch.device('cpu'))
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
@skipCUDANonDefaultStreamIf(True)
def test_default_stream(self):
d0 = torch.device('cuda:0')
d1 = torch.device('cuda:1')
with torch.cuda.device(d0):
s0 = torch.cuda.default_stream()
with torch.cuda.device(d1):
s1 = torch.cuda.default_stream()
s2 = torch.cuda.default_stream(device=0)
s3 = torch.cuda.default_stream(d1)
self.assertEqual(d0, s0.device)
self.assertEqual(d1, s1.device)
self.assertEqual(d0, s2.device)
self.assertEqual(d1, s3.device)
self.assertEqual(s0, s2)
self.assertEqual(s1, s3)
with torch.cuda.device(d0):
self.assertEqual(torch.cuda.current_stream(), s0)
with torch.cuda.device(d1):
self.assertEqual(torch.cuda.current_stream(), s1)
with self.assertRaisesRegex(ValueError,
"Expected a cuda device, but got: cpu"):
torch.cuda.default_stream(torch.device('cpu'))
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_stream_event_device(self):
d0 = torch.device('cuda:0')
d1 = torch.device('cuda:1')
e0 = torch.cuda.Event()
self.assertEqual(None, e0.device)
with torch.cuda.device(d0):
s0 = torch.cuda.current_stream()
s0.record_event(e0)
with torch.cuda.device(d1):
s1 = torch.cuda.Stream()
e1 = s1.record_event()
self.assertEqual(s0.device, torch.device('cuda:0'))
self.assertEqual(e0.device, torch.device('cuda:0'))
self.assertEqual(s1.device, torch.device('cuda:1'))
self.assertEqual(e1.device, torch.device('cuda:1'))
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_stream_context(self):
s0 = torch.cuda.current_stream()
s1 = torch.cuda.Stream(device=1)
s2 = torch.cuda.Stream(device=0)
with torch.cuda.device(s1.device):
prev_stream_on_cuda1 = torch.cuda.current_stream()
self.assertEqual(torch.cuda.current_stream(), s0)
self.assertEqual(0, torch.cuda.current_device())
with torch.cuda.stream(s1):
self.assertEqual(torch.cuda.current_stream(), s1)
self.assertEqual(1, torch.cuda.current_device())
with torch.cuda.stream(s2):
self.assertEqual(torch.cuda.current_stream(), s2)
self.assertEqual(0, torch.cuda.current_device())
with torch.cuda.stream(s0):
self.assertEqual(torch.cuda.current_stream(), s0)
self.assertEqual(0, torch.cuda.current_device())
self.assertEqual(torch.cuda.current_stream(), s2)
self.assertEqual(0, torch.cuda.current_device())
self.assertEqual(torch.cuda.current_stream(), s1)
self.assertEqual(1, torch.cuda.current_device())
with torch.cuda.device(s1.device):
self.assertEqual(prev_stream_on_cuda1, torch.cuda.current_stream())
self.assertEqual(torch.cuda.current_stream(), s0)
self.assertEqual(0, torch.cuda.current_device())
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_streams_multi_gpu(self):
default_stream = torch.cuda.current_stream()
self.assertEqual(default_stream.device, torch.device('cuda:0'))
stream = torch.cuda.Stream(device=1)
self.assertEqual(stream.device, torch.device('cuda:1'))
with torch.cuda.device(1):
self.assertEqual(
torch.cuda.current_stream().device, torch.device('cuda:1'))
self.assertNotEqual(torch.cuda.current_stream(), default_stream)
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_streams_multi_gpu_query(self):
d0 = torch.device('cuda:0')
d1 = torch.device('cuda:1')
torch.cuda.synchronize(d0)
torch.cuda.synchronize(d1)
with torch.cuda.device(d0):
s0 = torch.cuda.current_stream()
with torch.cuda.device(d1):
s1 = torch.cuda.current_stream()
torch.cuda._sleep(TestCudaMultiGPU.FIFTY_MIL_CYCLES)
self.assertTrue(s0.query())
self.assertFalse(s1.query())
with torch.cuda.device(d0):
self.assertTrue(s0.query())
self.assertFalse(s1.query())
with torch.cuda.device(d1):
self.assertTrue(s0.query())
self.assertFalse(s1.query())
# deliberately using a different device
with torch.cuda.device(d0):
s1.synchronize()
self.assertTrue(s0.query())
self.assertTrue(s1.query())
with torch.cuda.device(d0):
self.assertTrue(s0.query())
self.assertTrue(s1.query())
with torch.cuda.device(d1):
self.assertTrue(s0.query())
self.assertTrue(s1.query())
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_streams_multi_gpu_eq(self):
d0 = torch.device('cuda:0')
d1 = torch.device('cuda:1')
with torch.cuda.device(d0):
s0 = torch.cuda.current_stream()
s1 = torch.cuda.current_stream()
with torch.cuda.device(d1):
s2 = torch.cuda.current_stream()
s3 = torch.cuda.current_stream()
self.assertTrue(s0 == s0)
self.assertTrue(s0 == s1)
self.assertTrue(s2 == s2)
self.assertTrue(s2 == s3)
self.assertFalse(s0 == s2)
self.assertFalse(s1 == s3)
self.assertEqual(s0.device, s1.device)
self.assertEqual(s0.cuda_stream, s1.cuda_stream)
self.assertEqual(s2.device, s3.device)
self.assertEqual(s2.cuda_stream, s3.cuda_stream)
self.assertNotEqual(s0.device, s3.device)
self.assertEqual(hash(s0), hash(s1))
self.assertEqual(hash(s2), hash(s3))
self.assertNotEqual(hash(s0), hash(s3))
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_streams_priority(self):
low, high = torch.cuda.Stream.priority_range()
s0 = torch.cuda.Stream(device=0, priority=low)
self.assertEqual(low, s0.priority)
self.assertEqual(torch.device('cuda:0'), s0.device)
s1 = torch.cuda.Stream(device=1, priority=high)
self.assertEqual(high, s1.priority)
self.assertEqual(torch.device('cuda:1'), s1.device)
@unittest.skipIf(not TEST_MULTIGPU, "multi-GPU not supported")
def test_tensor_device(self):
self.assertEqual(torch.cuda.FloatTensor(1).get_device(), 0)
self.assertEqual(torch.cuda.FloatTensor(1, device=1).get_device(), 1)
with torch.cuda.device(1):
self.assertEqual(torch.cuda.FloatTensor(1).get_device(), 1)
self.assertEqual(torch.cuda.FloatTensor(1, device=0).get_device(), 0)
self.assertEqual(torch.cuda.FloatTensor(1, device=None).get_device(), 1)
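# The next three helpers time a spin of spin_time_cycles using different synchronization
# primitives (stream sync, event sync, event wait); test_stream_event_nogil runs them
# concurrently in parent and child threads to check that they release the GIL.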
@staticmethod
def _stream_synchronize(self, spin_time_cycles):
s = torch.cuda.current_stream()
e_tik = torch.cuda.Event(enable_timing=True)
e_tok = torch.cuda.Event(enable_timing=True)
e_tik.record(s)
torch.cuda._sleep(spin_time_cycles)
e_tok.record(s)
s.synchronize()
self.assertTrue(s.query())
# not necessary to check e_tik and e_tok, as elapsed_time would throw an
# exception otherwise.
return e_tik.elapsed_time(e_tok)
@staticmethod
def _event_synchronize(self, spin_time_cycles):
s = torch.cuda.current_stream()
e_tik = torch.cuda.Event(enable_timing=True)
e_tok = torch.cuda.Event(enable_timing=True)
e_tik.record(s)
torch.cuda._sleep(spin_time_cycles)
s.record_event(e_tok)
e_tok.synchronize()
self.assertTrue(s.query())
# not necessary to check e_tik and e_tok, as elapsed_time would throw an
# exception otherwise.
return e_tik.elapsed_time(e_tok)
@staticmethod
def _event_wait(self, spin_time_cycles):
s0 = torch.cuda.current_stream()
s1 = torch.cuda.Stream()
e_tik = torch.cuda.Event(blocking=True, enable_timing=True)
e_tok = torch.cuda.Event(blocking=True, enable_timing=True)
e_tik.record(s0)
torch.cuda._sleep(spin_time_cycles - 10)
e_sync = torch.cuda.Event(blocking=True)
e_sync.record()
e_sync.wait(s1)
with torch.cuda.stream(s1):
torch.cuda._sleep(10)
s1.synchronize()
e_tok.record()
e_tok.synchronize()
self.assertTrue(s0.query())
self.assertTrue(s1.query())
self.assertTrue(e_sync.query())
# not necessary to check e_tik and e_tok, as elapsed_time would throw an
# exception otherwise.
return e_tik.elapsed_time(e_tok)
@staticmethod
def _test_stream_event_nogil(self, sync_func, p2c, c2p):
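# Child-thread body: signal readiness via c2p, wait for the parent via p2c,
# then run sync_func on cuda:1 and report its elapsed time back through c2p.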
with torch.cuda.device('cuda:1'):
c2p.put(0)
p2c.get()
c2p.put(sync_func(self, TestCudaMultiGPU.FIFTY_MIL_CYCLES))
# Skip the test for ROCm as per https://github.com/pytorch/pytorch/issues/53190
@skipIfRocm
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_stream_event_nogil(self):
for sync_func in [TestCudaMultiGPU._stream_synchronize,
TestCudaMultiGPU._event_synchronize,
TestCudaMultiGPU._event_wait]:
p2c = queue.Queue()
c2p = queue.Queue()
e_tik = torch.cuda.Event(enable_timing=True)
e_tok = torch.cuda.Event(enable_timing=True)
t = threading.Thread(
target=TestCudaMultiGPU._test_stream_event_nogil,
args=(self, sync_func, p2c, c2p))
t.daemon = True
t.start()
c2p.get()
with torch.cuda.device('cuda:0'):
e_tik.record()
p2c.put(0)
parent_time = sync_func(self, TestCudaMultiGPU.FIFTY_MIL_CYCLES)
child_time = c2p.get()
e_tok.record()
e_tok.synchronize()
total_time = e_tik.elapsed_time(e_tok)
# Because these synchronization calls release the GIL, the parent and child
# threads can overlap. The total execution time should be a little longer
# than spinning fifty million cycles and much shorter than twice that.
# However, testing absolute execution time is not reliable, as it varies
# across hardware and environments. Therefore, this test uses a relative
# comparison, checking that the sum of the parent and child execution times
# exceeds the real execution time by at least 40%.
self.assertGreater(parent_time + child_time, total_time * 1.4)
# This test is flaky for ROCm, see issue #62602
@skipIfRocm
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_events_wait(self):
d0 = torch.device('cuda:0')
d1 = torch.device('cuda:1')
torch.cuda.synchronize(d0)
torch.cuda.synchronize(d1)
with torch.cuda.device(d0):
s0 = torch.cuda.current_stream()
torch.cuda._sleep(TestCudaMultiGPU.FIFTY_MIL_CYCLES)
e0 = torch.cuda.Event()
s0.record_event(e0)
with torch.cuda.device(d1):
s1 = torch.cuda.current_stream()
self.assertFalse(s0.query())
self.assertTrue(s1.query())
s1.wait_event(e0)
s1.synchronize()
self.assertTrue(e0.query())
self.assertTrue(s0.query())
self.assertTrue(s1.query())
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_events_multi_gpu_query(self):
d0 = torch.device('cuda:0')
d1 = torch.device('cuda:1')
with torch.cuda.device(d0):
s0 = torch.cuda.current_stream()
e0 = s0.record_event()
s0.synchronize()
with torch.cuda.device(d1):
s1 = torch.cuda.current_stream()
torch.cuda._sleep(TestCudaMultiGPU.FIFTY_MIL_CYCLES)
e1 = s1.record_event()
self.assertTrue(e0.query())
self.assertFalse(e1.query())
with torch.cuda.device(d0):
self.assertTrue(e0.query())
self.assertFalse(e1.query())
with torch.cuda.device(d1):
self.assertTrue(e0.query())
self.assertFalse(e1.query())
# deliberately using a different device
with torch.cuda.device(d0):
e1.synchronize()
self.assertTrue(e0.query())
self.assertTrue(e1.query())
with torch.cuda.device(d0):
self.assertTrue(e0.query())
self.assertTrue(e1.query())
with torch.cuda.device(d1):
self.assertTrue(e0.query())
self.assertTrue(e1.query())
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
@skipIfRocm
def test_events_multi_gpu_elapsed_time(self):
d0 = torch.device('cuda:0')
d1 = torch.device('cuda:1')
with torch.cuda.device(d0):
s0 = torch.cuda.current_stream()
e0 = torch.cuda.Event(enable_timing=True)
torch.cuda._sleep(10)
s0.record_event(e0)
with torch.cuda.device(d1):
s1 = torch.cuda.current_stream()
e1 = torch.cuda.Event(enable_timing=True)
torch.cuda._sleep(TestCudaMultiGPU.FIFTY_MIL_CYCLES)
s1.record_event(e1)
e0.synchronize()
e1.synchronize()
with torch.cuda.device(d0):
with self.assertRaises(RuntimeError):
self.assertGreater(e0.elapsed_time(e1), 0)
with torch.cuda.device(d1):
with self.assertRaises(RuntimeError):
self.assertGreater(e0.elapsed_time(e1), 0)
with torch.cuda.device(d0):
s0 = torch.cuda.current_stream()
e2 = torch.cuda.Event(enable_timing=True)
torch.cuda._sleep(TestCudaMultiGPU.FIFTY_MIL_CYCLES)
s0.record_event(e2)
s0.synchronize()
self.assertGreater(e0.elapsed_time(e2), 0)
# deliberately calling from a different device
with torch.cuda.device(d1):
self.assertGreater(e0.elapsed_time(e2), 0)
@contextlib.contextmanager
def _get_external_stream(self, device):
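# Creates a raw CUDA stream via cudaStreamCreate (through torch.cuda.cudart()) and
# destroys it on exit; used to wrap a stream PyTorch did not create in ExternalStream.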
cudart = torch.cuda.cudart()
stream = ctypes.c_ulonglong(0)
stream_p = ctypes.POINTER(ctypes.c_void_p)(stream)
stream_p_int = ctypes.cast(stream_p, ctypes.c_void_p).value
with device:
try:
out = cudart.cudaStreamCreate(stream_p_int)
self.assertEqual(out, 0)
self.assertNotEqual(stream.value, 0)
yield stream.value
finally:
out = cudart.cudaStreamDestroy(stream.value)
self.assertEqual(out, 0)
def test_external_streams(self):
device = torch.cuda.device(0)
with self._get_external_stream(device) as stream_v:
ext_stream = torch.cuda.ExternalStream(stream_v)
self.assertEqual(stream_v, ext_stream.cuda_stream)
self.assertEqual(ext_stream.device.index, device.idx)
@unittest.skipIf(not TEST_MULTIGPU, "detected only one GPU")
def test_external_streams_multi_device(self):
device = torch.cuda.device(1)
with self._get_external_stream(device) as stream_v:
ext_stream = torch.cuda.ExternalStream(
stream_v, device=device)
self.assertEqual(stream_v, ext_stream.cuda_stream)
self.assertEqual(ext_stream.device.index, device.idx)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_caching_pinned_memory_multi_gpu(self):
# checks that the events preventing pinned memory from being re-used
# too early are recorded on the correct GPU
cycles_per_ms = get_cycles_per_ms()
t = torch.FloatTensor([1]).pin_memory()
ptr = t.data_ptr()
gpu_tensor0 = torch.cuda.FloatTensor([0], device=0)
gpu_tensor1 = torch.cuda.FloatTensor([0], device=1)
with torch.cuda.device(1):
torch.cuda._sleep(int(1000 * cycles_per_ms)) # delay the copy by 1s
gpu_tensor1.copy_(t, non_blocking=True)
del t
t = torch.FloatTensor([2]).pin_memory()
self.assertNotEqual(t.data_ptr(), ptr, msg='allocation re-used too soon')
with torch.cuda.device(0):
gpu_tensor0.copy_(t, non_blocking=True)
self.assertEqual(gpu_tensor1[0], 1)
self.assertEqual(gpu_tensor0[0], 2)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_get_set_rng_state_all(self):
states = torch.cuda.get_rng_state_all()
before0 = torch.cuda.FloatTensor(100, device=0).normal_()
before1 = torch.cuda.FloatTensor(100, device=1).normal_()
torch.cuda.set_rng_state_all(states)
after0 = torch.cuda.FloatTensor(100, device=0).normal_()
after1 = torch.cuda.FloatTensor(100, device=1).normal_()
self.assertEqual(before0, after0, atol=0, rtol=0)
self.assertEqual(before1, after1, atol=0, rtol=0)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_rng_state_offset(self):
before = torch.cuda.get_rng_state()
torch.cuda._set_rng_state_offset(100)
offset = torch.cuda._get_rng_state_offset()
torch.cuda.set_rng_state(before)
self.assertEqual(offset, 100)
# Verifies that mem_get_info works, including when called for a different device
def test_mem_get_info(self):
def _test(idx):
before_free_bytes, before_available_bytes = torch.cuda.mem_get_info(idx)
# increasing to 8MB to force acquiring a new block and overcome blocksize differences across platforms
t = torch.randn(1024 * 1024 * 8, device='cuda:' + str(idx))
if IS_JETSON:
# Without syncing, mem_get_info will run before the allocated memory has actually
# increased; this race condition causes consistent failures on Jetson.
torch.cuda.synchronize()
after_free_bytes, after_available_bytes = torch.cuda.mem_get_info(idx)
self.assertLess(after_free_bytes, before_free_bytes)
self.assertEqual(before_available_bytes, after_available_bytes)
_test(0)
if TEST_MULTIGPU:
_test(1)
# Test that wrap_with_cuda_memory_check successfully detects leak
def test_cuda_memory_leak_detection(self):
l = []
@self.wrap_with_cuda_memory_check
def no_leak():
pass
@self.wrap_with_cuda_memory_check
def leak_gpu0():
# increasing to 8MB to force acquiring a new block and overcome blocksize differences across platforms
l.append(torch.randn(1024 * 1024 * 8, device=torch.device("cuda:0")))
no_leak()
regex = r"CUDA driver API confirmed .+ on device 0.+"
if IS_JETSON:
try:
leak_gpu0()
except RuntimeError as e:
import re
assert re.match(regex, str(e)), str(e) + "\n does not match: \n" + regex
else:
# On Jetson, assertRaisesRegex does not pass even though the RuntimeError
# matches the regex via re.match, hence the manual try/except above.
with self.assertRaisesRegex(RuntimeError, regex):
leak_gpu0()
if TEST_MULTIGPU:
@self.wrap_with_cuda_memory_check
def leak_gpu1():
# increasing to 8MB to force acquiring a new block and overcome blocksize differences across platforms
l.append(torch.randn(1024 * 1024 * 8, device=torch.device("cuda:1")))
with self.assertRaisesRegex(RuntimeError, r"CUDA driver API confirmed .+ on device 1.+"):
leak_gpu1()
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_streaming_backwards_device_transfer(self):
# This function must run with non-default current streams on all devices, otherwise it's meaningless.
# The intention is to test that to()'s backward (CopyBackward) interacts properly with the
# synchronization logic in torch/csrc/autograd/input_buffer.cpp.
dev0 = torch.device("cuda:0")
dev1 = torch.device("cuda:1")
# Unfortunately I need to make the tensors largeish.
# Bigger tensors = longer D2D transfers = more likely to expose races.
size = 2**26
a = torch.full((size,), 1, device=dev1, dtype=torch.float64, requires_grad=True)
b = torch.full((size,), 1, device=dev1, dtype=torch.float64, requires_grad=True)
# Here to_backward_recipient = a*b is used only once, so MulBackward's InputBuffer slot only expects 1 input.
# This tests the situation where we don't call InputBuffer::accumulate for MulBackward's InputBuffer.
to_backward_recipient = a * b
s = to_backward_recipient.to(device="cuda:0").sum()
torch.cuda.synchronize(device=dev0)
torch.cuda.synchronize(device=dev1)
s.backward()
self.assertTrue(a.grad.sum().item() == size)
self.assertTrue(b.grad.sum().item() == size)
# Here to_backward_recipient = a*b is used twice, so MulBackward's InputBuffer slot expects 2 inputs.
# This tests the situation where we do call InputBuffer::accumulate for MulBackward's InputBuffer.
a.grad = None
b.grad = None
to_backward_recipient = a * b
# Multiply by 2 here so to's backward creates gradient values that are different from the case above,
# to mitigate weirdness if the caching allocator happens to reuse memory regions that were populated
# with 1s by the case above
s0 = to_backward_recipient.to(device="cuda:0").sum() * 2.
s1 = to_backward_recipient.to(device="cuda:0").sum() * 2.
torch.cuda.synchronize(device=dev0)
torch.cuda.synchronize(device=dev1)
s0.backward(retain_graph=True)
s1.backward()
self.assertTrue(a.grad.sum().item() == 4 * size)
self.assertTrue(b.grad.sum().item() == 4 * size)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
@unittest.skipIf(IS_SANDCASTLE or IS_REMOTE_GPU, "Does not work on Sandcastle")
def test_cuda_init_race(self):
# See https://github.com/pytorch/pytorch/issues/16559
import subprocess
subprocess.check_call([sys.executable, '-c', """\
import torch
import threading
def worker(rank):
torch.tensor([1.]).cuda(rank)
t1 = threading.Thread(target=worker, args=(0,))
t2 = threading.Thread(target=worker, args=(1,))
t1.start()
t2.start()
"""])
def test_grad_scaling_unscale(self, dtype=torch.float):
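# _amp_foreach_non_finite_check_and_unscale_ multiplies each grad by inv_scale in
# place and sets found_inf to 1.0 if any grad contains an inf or nan.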
inv_scale = torch.full((1,), 0.25, dtype=torch.float, device="cuda:0")
found_inf = torch.full((1,), 0.0, dtype=torch.float, device="cuda:0")
size = 10
g = torch.full((size, size), 4.0, dtype=dtype, device="cuda:0")
ginf = g.clone()
ginf[2, 2] = float('inf')
gnan = g.clone()
gnan[2, 2] = float('nan')
# Tries selected combinations of
# - contiguous grads
# - g.clone().t(), which is not contiguous but still non-overlapping and dense
# - variants of g.clone()[:, :5], which fail the non-overlapping-and-dense check
# Non-overlapping-and-dense grads route into a multi-tensor-apply kernel, while
# the others use a fallback per-tensor kernel, so both paths should be exercised.
cases = (
([g.clone(), g.clone()], False),
([g.clone(), g.clone().t()], False),
([g.clone(), g.clone()[:, :5]], False),
([g.clone()[:, :5], g.clone()[:, :5]], False),
([g.clone(), ginf.clone()], True),
([g.clone(), gnan.clone()], True),
([g.clone(), ginf.clone()[:, :5]], True),
([g.clone(), gnan.clone()[:, :5]], True),
([ginf.clone(), g.clone()[:, :5]], True),
([ginf.clone()[:, :5], g.clone()[:, :5]], True),
)
for grads, has_inf in cases:
found_inf.zero_()
torch._amp_foreach_non_finite_check_and_unscale_(grads, found_inf, inv_scale)
if has_inf:
self.assertEqual(found_inf, 1.0)
else:
self.assertEqual(found_inf, 0.0)
for grad in grads:
self.assertEqual(grad, torch.ones_like(grad), rtol=1e-5, atol=1e-7)
# When passing lists with mismatched dtypes to a raw
# _amp_foreach_non_finite_check_and_unscale_ call,
# it's expected to fall back to the single-tensor TensorIterator kernel.
grads = [g.clone(), g.to(dtype=torch.float16)]
torch._amp_foreach_non_finite_check_and_unscale_(grads, found_inf, inv_scale)
for grad in grads:
self.assertEqual(grad, torch.ones_like(grad), rtol=1e-5, atol=1e-7)
# Passing lists with mismatched devices to a raw
# _amp_foreach_non_finite_check_and_unscale_ call should raise errors.
if TEST_MULTIGPU:
with self.assertRaisesRegex(RuntimeError, r"Expected all tensors to be on the same device"):
torch._amp_foreach_non_finite_check_and_unscale_([g.clone(), g.to(device="cuda:1")],
found_inf,
inv_scale)
# Creates a list of grads with mismatched dtypes and devices, to ensure
# scaler._unscale_grads_ organizes grads by dtype and device before calling
# _amp_foreach_non_finite_check_and_unscale_ on each set.
# If inject_inf >= 0, writes an inf into one grad for _unscale_grads_ to find.
def perfect_storm_grads(inject_inf):
grads = [g.clone(), g.clone()[:, :5], g.to(dtype=torch.float16), g.to(dtype=torch.float16)]
if TEST_MULTIGPU:
grads += [g.to(device="cuda:1"),
g.to(device="cuda:1")[:, :5],
g.to(device="cuda:1", dtype=torch.float16),
g.to(device="cuda:1", dtype=torch.float16)]
if inject_inf >= 0:
grads[inject_inf][2, 2] = float('inf')
return grads
scaler = torch.cuda.amp.GradScaler()
dummy_params = [torch.empty_like(g) for g in perfect_storm_grads(-1)]
dummy_opt = torch.optim.SGD(dummy_params, lr=1.)
# Ensures the inf/nan checking can find an inf injected onto any grad in the perfect storm.
for inject_inf in range(-1, len(dummy_params)):
found_inf = torch.full((1,), 0.0, dtype=torch.float, device="cuda:0")
grads = perfect_storm_grads(inject_inf)
for i, p in enumerate(dummy_params):
p.grad = grads[i]
found_inf_per_device = scaler._unscale_grads_(dummy_opt, inv_scale, found_inf, True)
if inject_inf < 0:
# No inf was injected, ensures unscaling worked normally.
self.assertTrue(sum(v.item() for v in found_inf_per_device.values()) == 0)
for grad in grads:
self.assertEqual(grad, torch.ones_like(grad), rtol=1e-5, atol=1e-7)
else:
# inf was injected, ensures inf was found.
self.assertTrue(sum(v.item() for v in found_inf_per_device.values()) == 1)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_grad_scaling_device_as_key(self):
# Ensure that different instances of "device" objects that point to the same device
# are treated as identical keys by dicts. GradScaler relies on this behavior, and may
# error otherwise in a way that's difficult to detect (a silent performance hit).
d = {}
t = torch.empty((1,), device="cuda:0")
dev0a = torch.device("cuda:0")
dev0b = torch.device("cuda:0")
dev1a = torch.device("cuda:1")
dev1b = torch.device("cuda:1")
self.assertTrue(hash(dev0a) == hash(dev0b))
self.assertTrue(hash(dev1a) == hash(dev1b))
d[dev0a] = "0a"
d[dev0b] = "0b"
self.assertTrue(len(d) == 1)
self.assertTrue(d[dev0a] == "0b")
d[t.device] = "t"
self.assertTrue(len(d) == 1)
self.assertTrue(d[dev0a] == "t")
d[dev1a] = "1a"
d[dev1b] = "1b"
self.assertTrue(len(d) == 2)
self.assertTrue(d[dev1a] == "1b")
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_grad_scaling_scale(self):
scaler = torch.cuda.amp.GradScaler(init_scale=2.)
t0 = torch.full((1,), 4.0, dtype=torch.float32, device="cuda:0")
t1 = torch.full((1,), 4.0, dtype=torch.float32, device="cuda:1")
# Create some nested iterables of tensors on different devices.
outputs = (t1.clone(), (t0.clone(), t1.clone()), [t0.clone(), (t1.clone(), t0.clone())])
outputs = scaler.scale(outputs)
self.assertTrue(outputs[0] == 8.0 and outputs[1][0] == 8.0 and outputs[1][1] == 8.0 and
outputs[2][0] == 8.0 and outputs[2][1][0] == 8.0 and outputs[2][1][1] == 8.0)
self.assertTrue(scaler._scale.device == t1.device)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_grad_scaling_multigpu(self):
# Same as above, but runs some of the models on device 1.
# GradScaler should transparently handle losses and gradients on multiple devices.
# This test could be combined with the test above, but I think it makes sense to treat
# multi-GPU operations separately.
dev0 = torch.device("cuda:0")
dev1 = torch.device("cuda:1")
for enabled in True, False:
mod_control0, mod_scaling0, opt_control0, opt_scaling0, data, loss_fn, skip_iter = \
_create_scaling_case()
mod_control1, mod_scaling1, opt_control1, opt_scaling1 = \
_create_scaling_models_optimizers(device=dev1)
scaler = torch.cuda.amp.GradScaler(init_scale=128., growth_factor=2.0, enabled=enabled, growth_interval=1)
def run(model0, model1, optimizer0, optimizer1, try_scaling_api):
for i, (input, target) in enumerate(data):
optimizer0.zero_grad()
optimizer1.zero_grad()
output0 = model0(input)
output1 = model1(input.to(dev1))
loss0 = loss_fn(0.3 * output0 + 0.7 * output1.to(dev0), target)
loss1 = loss_fn(0.6 * output0.to(dev1) - 0.4 * output1, target.to(dev1))
if try_scaling_api:
scaler.scale(loss0).backward(retain_graph=True)
scaler.scale(loss1).backward()
if i == skip_iter and scaler.is_enabled():
model1[1].weight.grad.data.fill_(float('inf'))
# As an additional stress test, separately unscale for one of the optimizers.
scaler.unscale_(optimizer0)
scaler.step(optimizer0)
scaler.step(optimizer1)
# Make sure the found_infs were collected properly across optimizers and devices.
if scaler.is_enabled():
self.assertTrue(len(scaler._found_inf_per_device(optimizer0)) == 1)
self.assertTrue(len(scaler._found_inf_per_device(optimizer1)) == 1)
self.assertTrue(scaler._found_inf_per_device(optimizer0)[dev0].item() == 0.)
self.assertTrue(scaler._found_inf_per_device(optimizer1)[dev1].item() ==
float(i == skip_iter))
scaler.update()
else:
loss0.backward(retain_graph=True)
loss1.backward()
optimizer0.step()
if (not scaler.is_enabled()) or (i != skip_iter):
optimizer1.step()
run(mod_control0, mod_control1, opt_control0, opt_control1, False)
run(mod_scaling0, mod_scaling1, opt_scaling0, opt_scaling1, True)
# The loss scale should have been multiplied by the growth factor 3 times and the backoff factor once.
self.assertTrue(scaler.get_scale() == (128. * scaler.get_growth_factor()**3 *
scaler.get_backoff_factor()**1) if enabled else scaler.get_scale() == 1.0)
# Copy mod_control1 and mod_scaling1 back to device 0 for comparison
mod_control1.to(dev0)
mod_scaling1.to(dev0)
for c, s in zip(chain(mod_control0.parameters(), mod_control1.parameters()),
chain(mod_scaling0.parameters(), mod_scaling1.parameters())):
self.assertEqual(c, s, rtol=1e-5, atol=1e-7)
@unittest.skipIf(not TEST_MULTIGPU, "Test needs multiple GPUs")
def test_cuda_device_memory_allocated(self):
from torch.cuda import memory_allocated
device_count = torch.cuda.device_count()
current_alloc = [memory_allocated(idx) for idx in range(device_count)]
x = torch.ones(10, device="cuda:0")
self.assertGreater(memory_allocated(0), current_alloc[0])
self.assertTrue(all(memory_allocated(torch.cuda.device(idx)) == current_alloc[idx] for idx in range(1, device_count)))
class TestCudaComm(TestCase):
def _test_broadcast(self, input):
if not TEST_MULTIGPU:
raise unittest.SkipTest("only one GPU detected")
# test regular
results = comm.broadcast(input, (0, 1))
for i, t in enumerate(results):
self.assertEqual(t.get_device(), i)
self.assertEqual(t, input)
if input.is_cuda and input.get_device() == i: # test not copying on same device
self.assertEqual(t.data_ptr(), input.data_ptr())
# test out=
for inplace in [True, False]:
if inplace:
outputs = [torch.empty_like(input, device=0), torch.empty_like(input, device=1)]
else:
outputs = [input.cuda(0), torch.empty_like(input, device=1)]
results = comm.broadcast(input, out=outputs)
for r, o in zip(results, outputs):
self.assertIs(r, o)
for i, t in enumerate(results):
self.assertEqual(t.get_device(), i)
self.assertEqual(t, input)
# test error msg
with self.assertRaisesRegex(RuntimeError, r"Exactly one of 'devices' and 'out'"):
comm.broadcast(input, (0, 1), out=outputs)
with self.assertRaisesRegex(RuntimeError,
r"Expected all output tensors to be CUDA tensors, but output tensor at index 1"):
comm.broadcast(input, out=[input.cuda(0), input.cpu()])
with self.assertRaisesRegex(RuntimeError,
r"Expected all output tensors to have same shape as the source .+ at index 1"):
comm.broadcast(input, out=[input.cuda(0), input.cuda(1).unsqueeze(0)])
def test_broadcast_cpu(self):
self._test_broadcast(torch.randn(5, 5))
def test_broadcast_gpu(self):
self._test_broadcast(torch.randn(5, 5).cuda())
def _test_broadcast_coalesced(self, tensors, buffer_size):
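# Broadcasts each tensor individually and then coalesced, and checks that both
# paths produce identical results on device 1.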
b_tensors = [comm.broadcast(t, (0, 1)) for t in tensors]
for (_, bt), t in zip(b_tensors, tensors):
self.assertEqual(bt.get_device(), 1)
self.assertEqual(bt, t)
self.assertIsInstance(bt, type(t))
bc_tensors = comm.broadcast_coalesced(tensors, (0, 1), buffer_size=buffer_size)
bc_tensors_t = list(zip(*bc_tensors))
self.assertEqual(b_tensors, bc_tensors_t)
for (_, bt), (_, bct) in zip(b_tensors, bc_tensors_t):
self.assertEqual(bt.get_device(), bct.get_device())
self.assertIsInstance(bct, type(bt))
# check that tensors on device[0] are returned as-is
for out_tensors in (b_tensors, bc_tensors_t):
for inp_t, (out_t, _) in zip(tensors, out_tensors):
self.assertIs(inp_t, out_t)
# check that the tensors not on device[0] have different version counters
# NOTE [ Version Counter in comm.*_coalesced ]
versions = [t._version for _, t in bc_tensors_t]
for old_version, (_, t) in zip(versions, bc_tensors_t):
self.assertEqual(t._version, old_version)
t.zero_()
self.assertEqual(t._version, old_version + 1)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
# Note: fails sometimes on the CI, passes on dual gfx906
def test_broadcast_coalesced(self):
numel = 5
num_bytes = numel * 8
tensors = [
self.genSparseTensor((2, 3), 2, 1, False, 'cuda', torch.float64)[0],
torch.randn(numel).long().cuda(),
torch.randn(numel).cuda(),
self.genSparseTensor((2, 3), 2, 10, False, 'cuda', torch.float64)[0],
self.genSparseTensor((2, 3), 2, 5, False, 'cuda', torch.float64)[0],
self.genSparseTensor((3, 3), 2, 7, False, 'cuda', torch.int64)[0],
self.genSparseTensor((2, 3), 2, 2, False, 'cuda', torch.float32)[0],
torch.randn(numel).long().cuda(),
torch.randn(numel).long().cuda(),
self.genSparseTensor((2, 7), 2, 3, False, 'cuda', torch.int64)[0],
torch.randn(numel * 2).int().cuda(), # int is 2x shorter
torch.randn(numel).cuda(),
]
self._test_broadcast_coalesced(tensors, num_bytes * 5 // 2)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_broadcast_coalesced_dense_only(self):
numel = 5
num_bytes = numel * 8
tensors = [
torch.randn(numel).long().cuda(),
torch.randn(numel).cuda(),
torch.randn(numel).long().cuda(),
torch.randn(numel).long().cuda(),
torch.randn(numel * 2).int().cuda(), # int is 2x shorter
torch.randn(numel).cuda(),
]
self._test_broadcast_coalesced(tensors, num_bytes * 5 // 2)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_broadcast_coalesced_empty_tensors(self):
tensors = [
torch.tensor([]).byte().cuda(),
torch.randn(5).cuda(),
torch.randn(5).double().cuda()
]
self._test_broadcast_coalesced(tensors, 256)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_reduce_add(self):
x = torch.randn(5, 5)
y = torch.randn(5, 5)
x_cuda = x.cuda(0)
y_cuda = y.cuda(1)
result = comm.reduce_add((x_cuda, y_cuda))
self.assertEqual(result.get_device(), 0)
self.assertEqual(result.cpu(), x + y)
def _test_reduce_add_coalesced(self, tensors, buffer_size):
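# Duplicates every tensor onto cuda:1, reduce-adds both per-tensor and coalesced,
# and checks that the two paths agree (each result should equal t * 2).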
dup_tensors = [tensors, [t.cuda(1) for t in tensors]]
r_tensors = [comm.reduce_add(t) for t in zip(*dup_tensors)]
for r, t in zip(r_tensors, tensors):
self.assertEqualTypeString(r, t)
self.assertEqual(r.coalesce() if r.is_sparse else r, t * 2)
rc_tensors = comm.reduce_add_coalesced(dup_tensors, buffer_size=buffer_size)
self.assertEqual(r_tensors, rc_tensors)
for r, rc in zip(r_tensors, rc_tensors):
self.assertEqualTypeString(rc, r)
# Since we have both cuda:0 and cuda:1 inputs, the outputs must be new.
# We can check that they have different version counters.
# NOTE [ Version Counter in comm.*_coalesced ]
versions = [t._version for t in rc_tensors]
for old_version, t in zip(versions, rc_tensors):
self.assertEqual(t._version, old_version)
t.zero_()
self.assertEqual(t._version, old_version + 1)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_reduce_add_coalesced(self):
numel = 5
num_bytes = numel * 8
tensors = [
self.genSparseTensor((2, 3), 2, 1, False, 'cuda', torch.float64)[0],
torch.randn(numel).long().cuda(),
torch.randn(numel).cuda(),
self.genSparseTensor((2, 3), 2, 10, False, 'cuda', torch.float64)[0],
self.genSparseTensor((2, 3), 2, 5, False, 'cuda', torch.float64)[0],
self.genSparseTensor((3, 3), 2, 7, False, 'cuda', torch.int64)[0],
self.genSparseTensor((2, 3), 2, 2, False, 'cuda', torch.float32)[0],
torch.randn(numel).long().cuda(),
torch.randn(numel).long().cuda(),
self.genSparseTensor((2, 7), 2, 3, False, 'cuda', torch.int64)[0],
torch.randn(numel * 2).int().cuda(), # int is 2x shorter
torch.randn(numel).cuda(),
]
self._test_reduce_add_coalesced(tensors, num_bytes * 5 // 2)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_reduce_add_coalesced_dense_only(self):
numel = 5
num_bytes = numel * 8
tensors = [
torch.randn(numel).long().cuda(),
torch.randn(numel).cuda(),
torch.randn(numel).long().cuda(),
torch.randn(numel).long().cuda(),
torch.randn(numel * 2).int().cuda(), # int is 2x shorter
torch.randn(numel).cuda(),
]
self._test_reduce_add_coalesced(tensors, num_bytes * 5 // 2)
def _test_scatter(self, input, chunk_sizes=None, dim=0):
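# Scatters input across devices 0 and 1 along dim, with and without explicit
# chunk_sizes and out=, and checks each chunk against the corresponding input slice.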
if not TEST_MULTIGPU:
raise unittest.SkipTest("only one GPU detected")
if chunk_sizes is None:
ref_chunk_sizes = tuple(repeat(input.size(dim) // 2, 2))
else:
ref_chunk_sizes = chunk_sizes
# test regular
result = comm.scatter(input, (0, 1), chunk_sizes, dim)
self.assertEqual(len(result), 2)
chunk_start = 0
for i, r in enumerate(result):
chunk_end = chunk_start + ref_chunk_sizes[i]
index = [slice(None, None) for _ in range(input.dim())]
index[dim] = slice(chunk_start, chunk_end)
self.assertEqual(r, input[tuple(index)], atol=0, rtol=0)
chunk_start = chunk_end
if r.device == input.device:
self.assertEqual(r.data_ptr(), input.data_ptr()) # for target @ same device, a view should be returned
# test out
out = [torch.empty_like(t) for t in result]
result = comm.scatter(input, dim=dim, out=out)
self.assertEqual(len(result), 2)
chunk_start = 0
for i, r in enumerate(result):
self.assertIs(r, out[i])
chunk_end = chunk_start + ref_chunk_sizes[i]
index = [slice(None, None) for _ in range(input.dim())]
index[dim] = slice(chunk_start, chunk_end)
self.assertEqual(r, input[tuple(index)], atol=0, rtol=0)
chunk_start = chunk_end
# test error msg
if chunk_sizes is not None:
with self.assertRaisesRegex(RuntimeError, r"Expected devices and chunk_sizes to be of same length"):
comm.scatter(input, [0 for _ in range(len(chunk_sizes) + 1)], dim=dim, chunk_sizes=chunk_sizes)
with self.assertRaisesRegex(RuntimeError, r"'devices' must not be specified"):
comm.scatter(input, (0, 1), dim=dim, out=out)
with self.assertRaisesRegex(RuntimeError, r"Expected at least one device to scatter to"):
comm.scatter(input, (), dim=dim)
with self.assertRaisesRegex(RuntimeError, r"Expected at least one output tensor to scatter to"):
comm.scatter(input, dim=dim, out=[])
with self.assertRaisesRegex(RuntimeError,
r"Expected all output tensors to be CUDA tensors, but output tensor at index 0"):
comm.scatter(input, dim=dim, out=([out[0].cpu()] + out[1:]))
with self.assertRaisesRegex(RuntimeError, r"Output tensor at index 0 has incorrect shape"):
comm.scatter(input, dim=dim, out=([out[0].unsqueeze(0)] + out[1:]))
with self.assertRaisesRegex(RuntimeError, r"Total size for output tensors along scatter dim \d+ does not match"):
index = [slice(None, None) for _ in range(input.dim())]
index[dim] = slice(1, None)
comm.scatter(input, dim=dim, out=([out[0][tuple(index)]] + out[1:]))
def test_scatter_cpu(self):
self._test_scatter(torch.randn(4, 4), dim=0)
def test_scatter_cpu_dim(self):
self._test_scatter(torch.randn(4, 4), dim=1)
def test_scatter_cpu_neg_dim(self):
self._test_scatter(torch.randn(4, 4), dim=-2)
def test_scatter_cpu_sizes(self):
self._test_scatter(torch.randn(6, 4), chunk_sizes=(2, 4))
def test_scatter_gpu(self):
self._test_scatter(torch.randn(4, 4).cuda(), dim=0)
def test_scatter_gpu_dim(self):
self._test_scatter(torch.randn(4, 4).cuda(), dim=1)
def test_scatter_gpu_neg_dim(self):
self._test_scatter(torch.randn(4, 4).cuda(), dim=-2)
def test_scatter_gpu_sizes(self):
self._test_scatter(torch.randn(6, 4).cuda(), chunk_sizes=(2, 4))
def _test_gather(self, dim):
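# Gathers a cuda:0 and a cuda:1 tensor onto various destinations (current device,
# cuda:0, cpu, and cuda:2 when available), via both destination= and out=, and
# checks the result's shape and content.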
if not TEST_MULTIGPU:
raise unittest.SkipTest("only one GPU detected")
x = torch.randn(2, 5, device=0)
y = torch.randn(2, 5, device=1)
expected_size = list(x.size())
expected_size[dim] += y.size(dim)
expected_size = torch.Size(expected_size)
destinations = [None, torch.device('cuda:0'), torch.device('cpu')]
if torch.cuda.device_count() > 2:
destinations.append(torch.device('cuda:2'))
with torch.cuda.device(1):
for destination in destinations:
if destination is None:
expected_device = torch.device('cuda', torch.cuda.current_device())
else:
expected_device = destination
for use_out in [True, False]:
if use_out:
out = torch.empty(expected_size, device=expected_device)
result = comm.gather((x, y), dim, out=out)
self.assertIs(out, result)
else:
result = comm.gather((x, y), dim, destination=destination)
self.assertEqual(result.device, expected_device)
self.assertEqual(result.size(), expected_size)
index = [slice(None, None), slice(None, None)]
index[dim] = slice(0, x.size(dim))
self.assertEqual(result[tuple(index)], x)
index[dim] = slice(x.size(dim), x.size(dim) + y.size(dim))
self.assertEqual(result[tuple(index)], y)
# test error msg
with self.assertRaisesRegex(RuntimeError, r"'destination' must not be specified"):
comm.gather((x, y), dim, destination='cpu', out=torch.empty(expected_size, device='cpu'))
with self.assertRaisesRegex(RuntimeError, r"Expected at least one tensor to gather from"):
comm.gather(())
with self.assertRaisesRegex(RuntimeError, r"Expected all input tensors to be CUDA tensors, "):
comm.gather((x.cpu(), y))
with self.assertRaisesRegex(RuntimeError, r"Expected all input tensors to have the same number of dimensions"):
comm.gather((x, y.unsqueeze(0)))
with self.assertRaisesRegex(RuntimeError, r"Input tensor at index 1 has invalid shape"):
if dim in [0, -2]:
comm.gather((x, y[:, 1:]), dim=dim)
elif dim in [1, -1]:
comm.gather((x, y[1:, :]), dim=dim)
def test_gather(self):
self._test_gather(0)
def test_gather_dim(self):
self._test_gather(1)
def test_gather_neg_dim(self):
self._test_gather(-1)
@unittest.skipIf(not TEST_MULTIGPU, "only one GPU detected")
def test_memory_format_scatter_gather(self):
nhwc = torch.randn((10, 3, 32, 32), device='cpu').contiguous(memory_format=torch.channels_last)
results = torch.cuda.comm.scatter(nhwc, (0, 1), None, 0)
for result in results:
self.assertFalse(result.is_contiguous())
self.assertTrue(result.is_contiguous(memory_format=torch.channels_last))
gathered = torch.cuda.comm.gather(results)
self.assertTrue(gathered.is_contiguous(memory_format=torch.channels_last))
@unittest.skipIf(not TEST_MULTIGPU, "Test needs multiple GPUs")
def test_scatter_namedtuple(self):
# tests ability to scatter namedtuples and retrieve a list where each
# element is of the expected namedtuple type.
fields = ("a", "b")
TestNamedTupleInput_0 = collections.namedtuple("NamedTuple", fields)
num_gpus = torch.cuda.device_count()
a = torch.rand(num_gpus * 2, device=0)
b = torch.rand(num_gpus * 2, device=0)
a_tensors_for_gpu = [a[2 * i : 2 * i + 2].to(i) for i in range(num_gpus)]
b_tensors_for_gpu = [b[2 * i : 2 * i + 2].to(i) for i in range(num_gpus)]
inp = TestNamedTupleInput_0(a, b)
target_gpus = [torch.device(i) for i in range(num_gpus)]
scatter_out = scatter_gather.scatter(inp, target_gpus)
for i, x in enumerate(scatter_out):
self.assertTrue(isinstance(x, type(inp)))
self.assertEqual(x._fields, fields)
expected_a = a_tensors_for_gpu[i]
expected_b = b_tensors_for_gpu[i]
self.assertEqual(expected_a, x.a)
self.assertEqual(expected_b, x.b)
class TestNamedTupleInput_1(NamedTuple):
a: torch.tensor
b: torch.tensor
a = torch.rand(num_gpus * 2, device=0)
b = torch.rand(num_gpus * 2, device=0)
a_tensors_for_gpu = [a[2 * i : 2 * i + 2].to(i) for i in range(num_gpus)]
b_tensors_for_gpu = [b[2 * i : 2 * i + 2].to(i) for i in range(num_gpus)]
inp = TestNamedTupleInput_1(a, b)
scatter_out = scatter_gather.scatter(inp, target_gpus)
for i, x in enumerate(scatter_out):
self.assertTrue(isinstance(x, type(inp)))
self.assertEqual(x._fields, fields)
expected_a = a_tensors_for_gpu[i]
expected_b = b_tensors_for_gpu[i]
self.assertEqual(expected_a, x.a)
self.assertEqual(expected_b, x.b)
@unittest.skipIf(not TEST_MULTIGPU, "Test needs multiple GPUs")
def test_gather_namedtuple(self):
# tests ability to gather a list of namedtuples and return a namedtuple where each
# element is of the expected tensor type.
fields = ['a', 'b']
TestNamedTupleInput_0 = collections.namedtuple('NamedTuple', fields)
num_gpus = torch.cuda.device_count()
a = torch.rand(num_gpus * 2, device=0)
b = torch.rand(num_gpus * 2, device=1)
out1 = TestNamedTupleInput_0(a, b)
a = torch.rand(num_gpus * 2, device=1)
b = torch.rand(num_gpus * 2, device=0)
out2 = TestNamedTupleInput_0(a, b)
outputs = [out1, out2]
out = scatter_gather.gather(outputs, 'cpu') # test on CPU
for i, x in enumerate(out):
self.assertTrue(isinstance(x, type(out2[-1]))) # x must be a tensor
cat = torch.cat((outputs[0][i].to('cpu'), outputs[1][i].to('cpu')))
self.assertTrue(torch.equal(x, cat))
out = scatter_gather.gather(outputs, 0) # test on GPU
for i, x in enumerate(out):
self.assertTrue(isinstance(x, type(out2[-1])))
cat = torch.cat((outputs[0][i].to(0), outputs[1][i].to(0)))
self.assertTrue(torch.equal(x, cat))
class TestNamedTupleInput_1(NamedTuple):
a: torch.tensor
b: torch.tensor
a = torch.rand(num_gpus * 2, device=0)
b = torch.rand(num_gpus * 2, device=1)
out1 = TestNamedTupleInput_1(a, b)
a = torch.rand(num_gpus * 2, device=1)
b = torch.rand(num_gpus * 2, device=0)
out2 = TestNamedTupleInput_1(a, b)
outputs = [out1, out2]
out = scatter_gather.gather(outputs, 0) # test on GPU
for i, x in enumerate(out):
self.assertTrue(isinstance(x, type(out2[-1])))
cat = torch.cat((outputs[0][i].to(0), outputs[1][i].to(0)))
self.assertTrue(torch.equal(x, cat))
out = scatter_gather.gather(outputs, 'cpu') # test on CPU
for i, x in enumerate(out):
self.assertTrue(isinstance(x, type(out2[-1])))
cat = torch.cat((outputs[0][i].to('cpu'), outputs[1][i].to('cpu')))
self.assertTrue(torch.equal(x, cat))
instantiate_parametrized_tests(TestCudaMultiGPU)
if __name__ == '__main__':
run_tests()