parent
9b58d4cb37
commit
40246d35bc
|
@ -6,7 +6,7 @@ from tinygrad.lazy import LazyBuffer
|
|||
from tinygrad.runtime.lib import RawBuffer, RawBufferCopyIn, RawBufferCopyInOut
|
||||
try: from tinygrad.runtime.ops_hip import RawHIPBuffer
|
||||
except: RawHIPBuffer = None
|
||||
from tinygrad.runtime.ops_shm import RawShmBuffer
|
||||
from tinygrad.runtime.ops_disk import RawDiskBuffer
|
||||
from tinygrad.jit import CacheCollector
|
||||
from tinygrad.tensor import Tensor, Function
|
||||
import extra.hip_wrapper as hip
|
||||
|
@ -51,7 +51,7 @@ def _send_rb(x:RawBuffer, target_rank:int):
|
|||
s.close()
|
||||
|
||||
# copy the buffer into shared memory
|
||||
y = RawShmBuffer(x.size, x.dtype, device=shm_name)
|
||||
y = RawDiskBuffer(x.size, x.dtype, device="disk:shm:"+shm_name)
|
||||
# fast path when we can directly copyout
|
||||
if isinstance(x, RawBufferCopyInOut): x._copyout(np.frombuffer(y._buffer(), dtype=x.dtype.np))
|
||||
else: y.fromCPU(x.toCPU())
|
||||
|
@ -78,7 +78,7 @@ def _recv_rb(x:RawBuffer, target_rank:int):
|
|||
CacheCollector.add(__recv_rb, [x, target_rank, y], {})
|
||||
else:
|
||||
shm_name = dist.OOB.recv(target_rank)
|
||||
y = RawShmBuffer(x.size, x.dtype, device=shm_name)
|
||||
y = RawDiskBuffer(x.size, x.dtype, device="disk:shm:"+shm_name)
|
||||
|
||||
# fast path when we can directly copyin
|
||||
if isinstance(x, RawBufferCopyIn): x._copyin(y.toCPU())
|
||||
|
|
|
@ -8,7 +8,7 @@ def multidevice_test(fxn):
|
|||
exclude_devices = getenv("EXCLUDE_DEVICES", "").split(",")
|
||||
def ret(self):
|
||||
for device in Device._buffers:
|
||||
if device in ["DISK", "SHM", "FAKE"]: continue
|
||||
if device in ["DISK", "FAKE"]: continue
|
||||
if not CI: print(device)
|
||||
if device in exclude_devices:
|
||||
if not CI: print(f"WARNING: {device} test is excluded")
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
import unittest
|
||||
import multiprocessing.shared_memory as shared_memory
|
||||
from tinygrad.helpers import CI
|
||||
from tinygrad.runtime.ops_shm import RawShmBuffer
|
||||
from tinygrad.tensor import Tensor, Device
|
||||
import numpy as np
|
||||
|
||||
|
@ -12,7 +11,7 @@ class TestRawShmBuffer(unittest.TestCase):
|
|||
# copy to shm
|
||||
shm_name = (s := shared_memory.SharedMemory(create=True, size=t.nbytes())).name
|
||||
s.close()
|
||||
t_shm = t.to(f"shm:{shm_name}").realize()
|
||||
t_shm = t.to(f"disk:shm:{shm_name}").realize()
|
||||
|
||||
# copy from shm
|
||||
t2 = t_shm.to(Device.DEFAULT).realize()
|
||||
|
@ -27,7 +26,7 @@ class TestRawShmBuffer(unittest.TestCase):
|
|||
# copy to shm
|
||||
shm_name = (s := shared_memory.SharedMemory(create=True, size=t.nbytes())).name
|
||||
s.close()
|
||||
t_shm = t.to(f"shm:{shm_name}").realize()
|
||||
t_shm = t.to(f"disk:shm:{shm_name}").realize()
|
||||
|
||||
# copy from shm
|
||||
t2 = t_shm.to(Device.DEFAULT).realize()
|
||||
|
|
|
@ -43,8 +43,9 @@ class RawBufferMapped(RawBufferCopyIn):
|
|||
def _copyin(self, x:np.ndarray) -> None: np.copyto(self.buffer_view(), x.reshape(-1))
|
||||
|
||||
# this one is simple enough that i moved it out of the runtimes
|
||||
ctypes_map = {dtypes.float64:ctypes.c_double, dtypes.float32: ctypes.c_float, dtypes.float16: ctypes.c_int16, dtypes.bfloat16: ctypes.c_int16, dtypes.int8: ctypes.c_int8, dtypes.uint8: ctypes.c_uint8, dtypes.bool: ctypes.c_uint8, dtypes.int32: ctypes.c_int32, dtypes.uint32: ctypes.c_uint32, dtypes.int64: ctypes.c_int64, dtypes.uint64: ctypes.c_uint64, dtypes.int16: ctypes.c_int16, dtypes.uint16: ctypes.c_uint16}
|
||||
class RawMallocBuffer(RawBufferMapped):
|
||||
def __init__(self, size, dtype: DType): super().__init__(size, dtype, ({dtypes.float64:ctypes.c_double, dtypes.float32: ctypes.c_float, dtypes.float16: ctypes.c_int16, dtypes.bfloat16: ctypes.c_int16, dtypes.int8: ctypes.c_int8, dtypes.uint8: ctypes.c_uint8, dtypes.bool: ctypes.c_uint8, dtypes.int32: ctypes.c_int32, dtypes.uint32: ctypes.c_uint32, dtypes.int64: ctypes.c_int64, dtypes.uint64: ctypes.c_uint64, dtypes.int16: ctypes.c_int16, dtypes.uint16: ctypes.c_uint16}[dtype] * size)())
|
||||
def __init__(self, size, dtype: DType): super().__init__(size, dtype, (ctypes_map[dtype] * size)())
|
||||
def _buffer(self): return memoryview(self._buf)
|
||||
|
||||
class RawBufferCopyInOut(RawBufferCopyIn):
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
import os, mmap
|
||||
try: import _posixshmem
|
||||
except Exception: pass
|
||||
from typing import Optional
|
||||
from typing import Callable, Dict, Tuple
|
||||
from tinygrad.helpers import prod, DType
|
||||
from tinygrad.helpers import prod, DType, OSX
|
||||
from tinygrad.runtime.lib import RawBufferMapped
|
||||
from tinygrad.ops import Interpreted, Op, MovementOps, UnaryOps, BufferOps
|
||||
from tinygrad.shape.view import strides_for_shape
|
||||
|
@ -12,25 +14,41 @@ class RawDiskBuffer(RawBufferMapped):
|
|||
self.offset = offset # this is an offset in bytes
|
||||
assert device is not None or buf is not None, "disk tensor needs a path or a buf"
|
||||
if device is not None:
|
||||
f = open(device, "a+b")
|
||||
if os.path.getsize(device) < size * dtype.itemsize: os.ftruncate(f.fileno(), size * dtype.itemsize)
|
||||
buf = [f, mmap.mmap(f.fileno(), size * dtype.itemsize), 1]
|
||||
if device.startswith("shm:"):
|
||||
if OSX:
|
||||
with open(f"/tmp/shm_{device[4:]}", "w+b") as f:
|
||||
f.truncate(size * dtype.itemsize)
|
||||
shm = mmap.mmap(f.fileno(), size * dtype.itemsize, flags=mmap.MAP_SHARED)
|
||||
else:
|
||||
fd = _posixshmem.shm_open(device[4:], os.O_RDWR, 0o600)
|
||||
# TODO: these flags are somewhat platform specific, but python doesn't expose the ones we need
|
||||
shm = mmap.mmap(fd, size * dtype.itemsize, flags=mmap.MAP_SHARED | 0x2000 | 0x008000)
|
||||
shm.madvise(mmap.MADV_HUGEPAGE) # type: ignore # not on OSX
|
||||
os.close(fd)
|
||||
buf = [None, shm, 1]
|
||||
else:
|
||||
f = open(device, "a+b")
|
||||
if os.path.getsize(device) < size * dtype.itemsize: os.ftruncate(f.fileno(), size * dtype.itemsize)
|
||||
buf = [f, mmap.mmap(f.fileno(), size * dtype.itemsize), 1]
|
||||
else:
|
||||
buf[2] += 1
|
||||
# NOTE: we don't call super since disk tensors don't use RAM
|
||||
self.size, self.dtype, self._buf = size, dtype, buf
|
||||
def __del__(self):
|
||||
self._buf[2] -= 1
|
||||
if self._buf[2] == 0: self._buf[0].close()
|
||||
if self._buf[2] == 0 and self._buf[0] is not None: self._buf[0].close()
|
||||
def cast(self, arg:Tuple[DType, bool]): return RawDiskBuffer(self.size, arg[0], buf=self._buf, shape=self.shape, offset=self.offset)
|
||||
def as_strided(self, arg):
|
||||
assert strides_for_shape(arg[0]) == arg[1], "disk tensors don't support strides"
|
||||
return RawDiskBuffer(prod(arg[0]), self.dtype, buf=self._buf, offset=self.offset+arg[2]*self.dtype.itemsize, shape=arg[0])
|
||||
|
||||
def _buffer(self): return memoryview(self._buf[1])[self.offset:self.offset+self.size*self.dtype.itemsize]
|
||||
def readinto(self, buf):
|
||||
self._buf[0].seek(self.offset)
|
||||
self._buf[0].readinto(buf)
|
||||
def readinto(self, buf:memoryview):
|
||||
if self._buf[0] is not None:
|
||||
self._buf[0].seek(self.offset)
|
||||
self._buf[0].readinto(buf)
|
||||
else:
|
||||
buf.cast('B')[:] = self._buffer()
|
||||
|
||||
disk_fxn_for_op: Dict[Op, Callable] = { BufferOps.MEM: lambda x: x, UnaryOps.NOOP: lambda x: x, UnaryOps.CAST: RawDiskBuffer.cast, MovementOps.AS_STRIDED: RawDiskBuffer.as_strided }
|
||||
DiskBuffer = Interpreted(RawDiskBuffer, disk_fxn_for_op)
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
import os, mmap
|
||||
try: import _posixshmem
|
||||
except Exception: pass
|
||||
from typing import Callable, Dict
|
||||
from tinygrad.helpers import DType, OSX
|
||||
from tinygrad.runtime.lib import RawBufferMapped
|
||||
from tinygrad.ops import Interpreted, Op, UnaryOps, MovementOps, BufferOps
|
||||
|
||||
class RawShmBuffer(RawBufferMapped):
|
||||
def __init__(self, size, dtype:DType, device:str):
|
||||
if OSX:
|
||||
with open(f"/tmp/shm_{device}", "w+b") as f:
|
||||
f.truncate(size * dtype.itemsize)
|
||||
shm = mmap.mmap(f.fileno(), size * dtype.itemsize, flags=mmap.MAP_SHARED)
|
||||
else:
|
||||
fd = _posixshmem.shm_open(device, os.O_RDWR, 0o600)
|
||||
# TODO: these flags are somewhat platform specific, but python doesn't expose the ones we need
|
||||
shm = mmap.mmap(fd, size * dtype.itemsize, flags=mmap.MAP_SHARED | 0x2000 | 0x008000)
|
||||
shm.madvise(mmap.MADV_HUGEPAGE) # type: ignore # not on OSX
|
||||
os.close(fd)
|
||||
|
||||
super().__init__(size, dtype, shm)
|
||||
def __del__(self): self._buf.close()
|
||||
def _buffer(self): return memoryview(self._buf)
|
||||
|
||||
# TODO: is this wrong?
|
||||
shm_fxn_for_op: Dict[Op, Callable] = { BufferOps.MEM: lambda x: x, UnaryOps.NOOP: lambda x:x, MovementOps.RESHAPE: lambda x,_:x, MovementOps.AS_STRIDED: lambda x,_:x }
|
||||
ShmBuffer = Interpreted(RawShmBuffer, shm_fxn_for_op)
|
Loading…
Reference in New Issue