212 lines
6.7 KiB
Python
212 lines
6.7 KiB
Python
import os, mmap
|
|
|
|
try:
|
|
import _posixshmem
|
|
except Exception:
|
|
pass
|
|
from typing import Callable, Dict, Tuple
|
|
from tinygrad.helpers import prod, DType, OSX, dtypes
|
|
from tinygrad.device import Interpreted, Allocator
|
|
from tinygrad.ops import Op, MovementOps, UnaryOps
|
|
from tinygrad.shape.view import strides_for_shape
|
|
|
|
|
|
class UnderlyingDiskBuffer:
    """Owner of the storage that DiskBuffer views slice into.

    Attributes:
        fd: Open file object backing `mem`, or None when there is no file to close
            (the shared-memory allocation path constructs this with None).
        mem: The mapped memory object; DiskBuffer wraps it in a memoryview.
    """

    def __init__(self, fd, mem):
        """Record the file handle and the memory it backs.

        Parameters:
            fd: File object to close on destruction, or None.
            mem: Memory object exposing the buffer protocol (an mmap in this module).
        """
        self.fd = fd
        self.mem = mem

    def __del__(self):
        """Close the file handle when this object is collected; a falsy `fd` is skipped."""
        handle = self.fd
        if not handle:
            return
        handle.close()
|
|
|
|
|
class DiskBuffer:
    """A typed window onto an UnderlyingDiskBuffer.

    The window starts `offset` bytes into `ud.mem` and spans `size` elements of
    `dtype`, i.e. `size * dtype.itemsize` bytes. Views derived via `cast` and
    `as_strided` share the same `ud`.

    Attributes:
        ud (UnderlyingDiskBuffer): Shared backing storage.
        size (int): Number of `dtype` elements in the window.
        dtype (DType): Element type; dtypes.uint8 unless given.
        offset (int): Byte offset of the window inside `ud.mem`; 0 unless given.
    """

    def __init__(self, ud: UnderlyingDiskBuffer, size: int, dtype: DType = dtypes.uint8, offset=0):
        self.ud = ud
        self.size = size
        self.dtype = dtype
        self.offset = offset

    def __repr__(self):
        """Describe the view by its size, dtype and byte offset."""
        return f"<DiskBuffer size={self.size} dtype={self.dtype} offset={self.offset}>"

    def cast(self, arg: Tuple[DType, bool]):
        """Reinterpret this view with another element type.

        Args:
            arg: Pair whose first item is the target dtype; the second item is ignored here.

        Returns:
            DiskBuffer: Same storage, size and offset, tagged with the new dtype.
        """
        target_dtype = arg[0]
        return DiskBuffer(self.ud, self.size, target_dtype, offset=self.offset)

    def as_strided(self, arg):
        """Build a contiguous sub-view described by (shape, strides, element offset).

        Args:
            arg: Sequence of (shape, strides, offset-in-elements). The strides must
                equal the contiguous strides for `shape`; anything else is rejected.

        Returns:
            DiskBuffer: View of prod(shape) elements starting `arg[2]` elements past
            this view's start.
        """
        shape = arg[0]
        strides = arg[1]
        assert strides_for_shape(shape) == strides, "disk tensors don't support strides"
        shift_bytes = arg[2] * self.dtype.itemsize
        return DiskBuffer(self.ud, prod(shape), self.dtype, offset=self.offset + shift_bytes)

    def _buf(self) -> memoryview:
        """Return a zero-copy memoryview over exactly this view's bytes."""
        start = self.offset
        stop = start + self.size * self.dtype.itemsize
        return memoryview(self.ud.mem)[start:stop]
|
|
|
|
|
|
# Ops the disk backend can interpret; both only produce new DiskBuffer views over the same storage.
disk_fxn_for_op: Dict[Op, Callable] = {UnaryOps.CAST: DiskBuffer.cast, MovementOps.AS_STRIDED: DiskBuffer.as_strided}
|
|
|
|
# Raw mmap flag bits OR-ed into the shared-memory mapping flags.
# NOTE(review): these values are assumed to be the Linux (x86/arm) ones; they are
# platform specific and may differ elsewhere -- confirm for the target platform.
MAP_LOCKED = 0x2000
MAP_POPULATE = 0x8000
|
|
|
|
|
|
class DiskAllocator(Allocator):
    """Allocator whose buffers are memory-mapped from a file or a shared-memory segment.

    Attributes:
        device (str): A filesystem path, or "shm:<name>" to map the shared-memory segment <name>.
    """

    def __init__(self, device):
        """Remember the device string; nothing is opened until `_alloc` is called.

        Parameters:
            device (str): A filesystem path, or "shm:<name>" for shared memory.
        """
        self.device = device

    def _alloc(self, size):
        """Map `size` bytes of the device and wrap them in a DiskBuffer.

        For "shm:<name>" devices, OSX maps the file /tmp/shm_<name> (opened with "w+b",
        so any previous contents are discarded), while other platforms open the POSIX
        shared-memory object <name> with shm_open (O_RDWR, no O_CREAT). Any other device
        string is treated as a file path: the file is opened in "a+b" mode (created if
        missing) and grown to at least `size` bytes, never shrunk.

        Parameters:
            size (int): Number of bytes to map.

        Returns:
            DiskBuffer: A DiskBuffer of `size` elements of the default dtype at offset 0.

        Raises:
            Exceptions from open/shm_open/ftruncate/mmap propagate unchanged; any
            descriptor or file opened here is closed before they escape.
        """
        if str(self.device).startswith("shm:"):
            name = self.device[4:]
            if OSX:
                with open(f"/tmp/shm_{name}", "w+b") as f:
                    f.truncate(size)
                    shm = mmap.mmap(f.fileno(), size, flags=mmap.MAP_SHARED)
            else:
                fd = _posixshmem.shm_open(name, os.O_RDWR, 0o600)
                try:
                    # TODO: these flags are somewhat platform specific, but python doesn't expose the ones we need
                    shm = mmap.mmap(fd, size, flags=mmap.MAP_SHARED | MAP_LOCKED | MAP_POPULATE)
                    shm.madvise(mmap.MADV_HUGEPAGE)  # type: ignore # not on OSX
                finally:
                    # The mapping does not need the descriptor, so close it on success AND on
                    # failure (e.g. MAP_LOCKED can fail against the memlock limit) to avoid a leak.
                    os.close(fd)
            return DiskBuffer(UnderlyingDiskBuffer(None, shm), size)

        f = open(self.device, "a+b")
        try:
            # Grow (never shrink) the file; fstat inspects the file we actually opened
            # instead of re-resolving the path.
            if os.fstat(f.fileno()).st_size < size:
                os.ftruncate(f.fileno(), size)
            mem = mmap.mmap(f.fileno(), size)
        except BaseException:
            # Don't leak the handle when the file can't be resized or mapped
            # (e.g. mapping 0 bytes of an empty file).
            f.close()
            raise
        return DiskBuffer(UnderlyingDiskBuffer(f, mem), size)

    def as_buffer(self, src: DiskBuffer):
        """Expose the bytes of a DiskBuffer without copying.

        Parameters:
            src (DiskBuffer): The buffer to expose.

        Returns:
            memoryview: A view over exactly `src`'s bytes in the mapped memory.
        """
        return src._buf()

    def copyin(self, dest: DiskBuffer, src: memoryview):
        """Copy host bytes into a DiskBuffer through its mapped memory.

        Parameters:
            dest (DiskBuffer): Destination view; its byte length must equal len(src).
            src (memoryview): Source bytes.
        """
        dest._buf()[:] = src

    def copyout(self, dest: memoryview, src: DiskBuffer):
        """Copy a DiskBuffer's bytes into host memory.

        File-backed buffers are read through the file object (seek to `src.offset`,
        then fill `dest`); buffers without a file (shared memory) are copied straight
        from the mapped memory.

        Parameters:
            dest (memoryview): Destination to fill.
            src (DiskBuffer): Source view.
        """
        if src.ud.fd is not None:
            src.ud.fd.seek(src.offset)
            src.ud.fd.readinto(dest)
        else:
            dest[:] = src._buf()
|
|
|
|
|
|
class DiskDevice(Interpreted):
    """Interpreted device that runs `disk_fxn_for_op` on buffers from a DiskAllocator.

    Parameters:
        device (str): Full device string. Its first five characters are dropped
            (NOTE(review): presumably a "disk:" prefix -- confirm with callers) and
            the remainder (a file path or "shm:<name>") is handed to DiskAllocator.
    """

    def __init__(self, device):
        target = device[5:]
        super().__init__(DiskAllocator(target), disk_fxn_for_op)
|