494 lines
16 KiB
Python
494 lines
16 KiB
Python
from __future__ import annotations
|
|
from typing import Tuple, Optional, List
|
|
import ctypes, functools
|
|
import gpuctypes.opencl as cl
|
|
from tinygrad.helpers import (
|
|
init_c_var,
|
|
to_char_p_p,
|
|
from_mv,
|
|
diskcache,
|
|
OSX,
|
|
ImageDType,
|
|
DEBUG,
|
|
)
|
|
from tinygrad.codegen.kernel import LinearizerOptions
|
|
from tinygrad.renderer.cstyle import OpenCLRenderer
|
|
from tinygrad.device import Compiled, LRUAllocator
|
|
|
|
OSX_TIMING_RATIO = (
|
|
(125 / 3) if OSX else 1.0
|
|
) # see test/external/external_osx_profiling.py to determine this ratio. it's in like GPU clocks or something
|
|
|
|
|
|
def check(status):
|
|
"""Check the status of an OpenCL operation.
|
|
|
|
This function checks the status of an OpenCL operation and raises a RuntimeError if the status is not equal to zero,
|
|
indicating an error has occurred.
|
|
|
|
Args:
|
|
status (int): The status code returned by an OpenCL operation.
|
|
|
|
Raises:
|
|
RuntimeError: If the status is not equal to zero, indicating an error has occurred in the OpenCL operation.
|
|
"""
|
|
if status != 0:
|
|
raise RuntimeError(f"OpenCL Error {status}")
|
|
|
|
|
|
def checked(ret, status):
|
|
"""Check the status of an OpenCL operation and return the result.
|
|
|
|
This function checks the status of an OpenCL operation, raises a RuntimeError if the status is not equal to zero,
|
|
indicating an error has occurred, and returns the result of the operation.
|
|
|
|
Args:
|
|
ret (Any): The result of an OpenCL operation.
|
|
status (int): The status code returned by an OpenCL operation.
|
|
|
|
Returns:
|
|
Any: The result of the OpenCL operation if the status is zero, otherwise raises a RuntimeError.
|
|
|
|
Raises:
|
|
RuntimeError: If the status is not equal to zero, indicating an error has occurred in the OpenCL operation.
|
|
"""
|
|
return (check(status.value), ret)[1]
|
|
|
|
|
|
@diskcache
|
|
def compile_cl(prg: str) -> bytes:
|
|
"""
|
|
Compile an OpenCL program.
|
|
|
|
This function compiles an OpenCL program from a string source code. It asserts that the
|
|
CLDevice compiler context is not None, meaning that a device has been initialized before calling this function.
|
|
The function then creates a program object and builds it for the initialized device. If there's an error during
|
|
building, it retrieves and raises a RuntimeError with the build log. Finally, it retrieves the built binary,
|
|
releases the program object, and returns the binary as bytes.
|
|
|
|
:param prg: The OpenCL source code string to compile.
|
|
:type prg: str
|
|
:return: The compiled binary in bytes.
|
|
:rtype: bytes
|
|
"""
|
|
assert (
|
|
CLDevice.compiler_context is not None
|
|
), 'OpenCL requires a "compiler_context" to compile, init a device before you call this'
|
|
program = checked(
|
|
cl.clCreateProgramWithSource(
|
|
CLDevice.compiler_context.context,
|
|
1,
|
|
to_char_p_p([prg_bytes := prg.encode()]),
|
|
ctypes.byref(ctypes.c_size_t(len(prg_bytes))),
|
|
ctypes.byref(status := ctypes.c_int32()),
|
|
),
|
|
status,
|
|
)
|
|
status = cl.clBuildProgram(
|
|
program,
|
|
1,
|
|
ctypes.byref(CLDevice.compiler_context.device_id),
|
|
None,
|
|
cl.clBuildProgram.argtypes[4](),
|
|
None,
|
|
)
|
|
if status != 0:
|
|
cl.clGetProgramBuildInfo(
|
|
program,
|
|
CLDevice.compiler_context.device_id,
|
|
cl.CL_PROGRAM_BUILD_LOG,
|
|
0,
|
|
None,
|
|
ctypes.byref(log_size := ctypes.c_size_t()),
|
|
)
|
|
cl.clGetProgramBuildInfo(
|
|
program,
|
|
CLDevice.compiler_context.device_id,
|
|
cl.CL_PROGRAM_BUILD_LOG,
|
|
log_size.value,
|
|
mstr := ctypes.create_string_buffer(log_size.value),
|
|
None,
|
|
)
|
|
raise RuntimeError(
|
|
f"OpenCL Compile Error\n\n{ctypes.string_at(mstr, size=log_size.value).decode()}"
|
|
)
|
|
binary_sizes = init_c_var(
|
|
(ctypes.c_size_t * 1)(),
|
|
lambda x: check(
|
|
cl.clGetProgramInfo(
|
|
program,
|
|
cl.CL_PROGRAM_BINARY_SIZES,
|
|
ctypes.sizeof(x),
|
|
ctypes.byref(x),
|
|
None,
|
|
)
|
|
),
|
|
)
|
|
binary = init_c_var(
|
|
ctypes.create_string_buffer(binary_sizes[0]),
|
|
lambda x: check(
|
|
cl.clGetProgramInfo(
|
|
program,
|
|
cl.CL_PROGRAM_BINARIES,
|
|
ctypes.sizeof(ctypes.c_void_p),
|
|
ctypes.byref((ctypes.c_void_p * 1)(ctypes.addressof(x))),
|
|
None,
|
|
)
|
|
),
|
|
)
|
|
check(cl.clReleaseProgram(program))
|
|
return bytes(binary)
|
|
|
|
|
|
class CLProgram:
|
|
"""
|
|
This class represents a Compute Program (CLProgram) in OpenCL. It is initialized with a device, name and binary code.
|
|
|
|
Attributes:
|
|
device (CLDevice): The device where the program will be executed.
|
|
name (str): The name of the kernel function to be executed.
|
|
lib (bytes): The compiled binary code for the OpenCL program.
|
|
|
|
Methods:
|
|
__init__(self, device: CLDevice, name: str, lib: bytes) -> None: Initializes the program with given device, name and binary code.
|
|
__del__(self) -> None: Releases the kernel and program when the object is destroyed.
|
|
__call__(self, *bufs: cl.cl_mem, global_size: Tuple[int, ...], local_size: Optional[Tuple[int, ...]] = None, vals: Tuple[int, ...] = (), wait=False) -> Optional[float]: Executes the kernel with given arguments and returns execution time if wait is True.
|
|
"""
|
|
|
|
def __init__(self, device: CLDevice, name: str, lib: bytes):
|
|
self.device, self.name, self.lib = device, name, lib
|
|
self.program = checked(
|
|
cl.clCreateProgramWithBinary(
|
|
device.context,
|
|
1,
|
|
ctypes.byref(device.device_id),
|
|
(ctypes.c_size_t * 1)(len(lib)),
|
|
to_char_p_p([lib], ctypes.c_ubyte),
|
|
ctypes.byref(binary_status := ctypes.c_int32()),
|
|
ctypes.byref(errcode_ret := ctypes.c_int32()),
|
|
),
|
|
errcode_ret,
|
|
)
|
|
check(binary_status.value)
|
|
check(
|
|
cl.clBuildProgram(
|
|
self.program,
|
|
1,
|
|
ctypes.byref(device.device_id),
|
|
None,
|
|
cl.clBuildProgram.argtypes[4](),
|
|
None,
|
|
)
|
|
) # NOTE: OSX requires this
|
|
self.kernel = checked(
|
|
cl.clCreateKernel(
|
|
self.program, name.encode(), ctypes.byref(status := ctypes.c_int32())
|
|
),
|
|
status,
|
|
)
|
|
|
|
def __del__(self):
|
|
check(cl.clReleaseKernel(self.kernel))
|
|
check(cl.clReleaseProgram(self.program))
|
|
|
|
def __call__(
|
|
self,
|
|
*bufs: cl.cl_mem,
|
|
global_size: Tuple[int, ...],
|
|
local_size: Optional[Tuple[int, ...]] = None,
|
|
vals: Tuple[int, ...] = (),
|
|
wait=False,
|
|
) -> Optional[float]:
|
|
for i, b in enumerate(bufs):
|
|
cl.clSetKernelArg(self.kernel, i, ctypes.sizeof(b), ctypes.byref(b))
|
|
for i, b in enumerate(vals, start=len(bufs)):
|
|
cl.clSetKernelArg(self.kernel, i, 4, ctypes.byref(ctypes.c_int32(b)))
|
|
if local_size is not None:
|
|
global_size = tuple(int(g * l) for g, l in zip(global_size, local_size))
|
|
event = cl.cl_event() if wait else None
|
|
check(
|
|
cl.clEnqueueNDRangeKernel(
|
|
self.device.queue,
|
|
self.kernel,
|
|
len(global_size),
|
|
None,
|
|
(ctypes.c_size_t * len(global_size))(*global_size),
|
|
(ctypes.c_size_t * len(local_size))(*local_size)
|
|
if local_size
|
|
else None,
|
|
0,
|
|
None,
|
|
event,
|
|
)
|
|
)
|
|
if wait:
|
|
check(cl.clWaitForEvents(1, ctypes.byref(event)))
|
|
start = init_c_var(
|
|
ctypes.c_ulong(),
|
|
lambda x: check(
|
|
cl.clGetEventProfilingInfo(
|
|
event,
|
|
cl.CL_PROFILING_COMMAND_START,
|
|
ctypes.sizeof(x),
|
|
ctypes.byref(x),
|
|
None,
|
|
)
|
|
),
|
|
)
|
|
end = init_c_var(
|
|
ctypes.c_ulong(),
|
|
lambda x: check(
|
|
cl.clGetEventProfilingInfo(
|
|
event,
|
|
cl.CL_PROFILING_COMMAND_END,
|
|
ctypes.sizeof(x),
|
|
ctypes.byref(x),
|
|
None,
|
|
)
|
|
),
|
|
)
|
|
return float(end.value - start.value) * OSX_TIMING_RATIO * 1e-9
|
|
return None
|
|
|
|
|
|
class CLAllocator(LRUAllocator):
|
|
"""
|
|
OpenCL memory allocator.
|
|
|
|
Attributes:
|
|
device (CLDevice): The device to use for memory allocation.
|
|
"""
|
|
|
|
def __init__(self, device: CLDevice):
|
|
"""
|
|
Initializes the allocator with a specific device.
|
|
|
|
Args:
|
|
device (CLDevice): The device to use for memory allocation.
|
|
"""
|
|
self.device = device
|
|
super().__init__()
|
|
|
|
def _alloc(self, size: int) -> cl.cl_mem:
|
|
"""
|
|
Allocates a buffer on the device.
|
|
|
|
Args:
|
|
size (int): The size of the buffer to allocate.
|
|
|
|
Returns:
|
|
cl.cl_mem: The allocated buffer.
|
|
"""
|
|
return checked(
|
|
cl.clCreateBuffer(
|
|
self.device.context,
|
|
cl.CL_MEM_READ_WRITE,
|
|
size,
|
|
None,
|
|
ctypes.byref(status := ctypes.c_int32()),
|
|
),
|
|
status,
|
|
)
|
|
|
|
def _free(self, buf: cl.cl_mem):
|
|
"""
|
|
Frees a buffer on the device.
|
|
|
|
Args:
|
|
buf (cl.cl_mem): The buffer to free.
|
|
"""
|
|
check(cl.clReleaseMemObject(buf))
|
|
|
|
def _cast_image(
|
|
self, buf: cl.cl_mem, dtype: ImageDType, row_pitch: int
|
|
) -> cl.cl_mem:
|
|
"""
|
|
Casts an image buffer to a different format.
|
|
|
|
Args:
|
|
buf (cl.cl_mem): The buffer to cast.
|
|
dtype (ImageDType): The desired data type of the output image.
|
|
row_pitch (int): The pitch in bytes of the rows in the image.
|
|
|
|
Returns:
|
|
cl.cl_mem: The casted image buffer.
|
|
"""
|
|
desc = cl.cl_image_desc(
|
|
image_type=cl.CL_MEM_OBJECT_IMAGE2D,
|
|
image_width=dtype.shape[1],
|
|
image_height=dtype.shape[0],
|
|
image_row_pitch=row_pitch,
|
|
)
|
|
desc._0.mem_object = buf
|
|
return checked(
|
|
cl.clCreateImage(
|
|
self.device.context,
|
|
cl.CL_MEM_READ_WRITE,
|
|
cl.cl_image_format(
|
|
cl.CL_RGBA, {2: cl.CL_HALF_FLOAT, 4: cl.CL_FLOAT}[dtype.itemsize]
|
|
),
|
|
desc,
|
|
None,
|
|
ctypes.byref(status := ctypes.c_int32()),
|
|
),
|
|
status,
|
|
)
|
|
|
|
def copyin(self, dest: cl.cl_mem, src: memoryview):
|
|
"""
|
|
Copies data from host memory to device memory.
|
|
|
|
Args:
|
|
dest (cl.cl_mem): The destination buffer on the device.
|
|
src (memoryview): The source data in host memory.
|
|
"""
|
|
check(
|
|
cl.clEnqueueWriteBuffer(
|
|
self.device.queue,
|
|
dest,
|
|
False,
|
|
0,
|
|
len(src) * src.itemsize,
|
|
from_mv(src),
|
|
0,
|
|
None,
|
|
None,
|
|
)
|
|
)
|
|
self.device.pending_copyin.append(
|
|
src
|
|
) # NOTE: these can't be freed until the GPU actually executes this command
|
|
|
|
def copyout(self, dest: memoryview, src: cl.cl_mem):
|
|
"""
|
|
Copies data from device memory to host memory.
|
|
|
|
Args:
|
|
dest (memoryview): The destination buffer in host memory.
|
|
src (cl.cl_mem): The source buffer on the device.
|
|
"""
|
|
check(
|
|
cl.clEnqueueReadBuffer(
|
|
self.device.queue,
|
|
src,
|
|
False,
|
|
0,
|
|
len(dest) * dest.itemsize,
|
|
from_mv(dest),
|
|
0,
|
|
None,
|
|
None,
|
|
)
|
|
)
|
|
self.device.synchronize()
|
|
|
|
|
|
class CLDevice(Compiled):
|
|
"""
|
|
This class represents an OpenCL device. It is responsible for managing the context, queue and synchronization of the device.
|
|
|
|
Attributes:
|
|
device_ids (List[cl.cl_device_id]): Global list of available device IDs, initialized once.
|
|
compiler_context (Optional[CLDevice]): The first created context used for compilation.
|
|
"""
|
|
|
|
device_ids = None # this is global and only initted once
|
|
compiler_context = None # this is the first created context. we make an assumption they are all the same for the compiler
|
|
|
|
def __init__(self, device: str = ""):
|
|
"""
|
|
Initializes a new instance of the CLDevice class.
|
|
|
|
Parameters:
|
|
device (str): The device string, default is "".
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
if CLDevice.device_ids is None:
|
|
num_platforms = init_c_var(
|
|
ctypes.c_uint32(),
|
|
lambda x: check(cl.clGetPlatformIDs(0, None, ctypes.byref(x))),
|
|
)
|
|
platform_ids = init_c_var(
|
|
(cl.cl_platform_id * num_platforms.value)(),
|
|
lambda x: check(cl.clGetPlatformIDs(num_platforms.value, x, None)),
|
|
)
|
|
for device_type in [cl.CL_DEVICE_TYPE_GPU, cl.CL_DEVICE_TYPE_DEFAULT]:
|
|
num_devices = ctypes.c_uint32()
|
|
err = cl.clGetDeviceIDs(
|
|
platform_ids[0], device_type, 0, None, ctypes.byref(num_devices)
|
|
)
|
|
if err == 0 and num_devices.value != 0:
|
|
break
|
|
if DEBUG >= 1:
|
|
print(
|
|
f"CLDevice: got {num_platforms.value} platforms and {num_devices.value} devices"
|
|
)
|
|
CLDevice.device_ids = init_c_var(
|
|
(cl.cl_device_id * num_devices.value)(),
|
|
lambda x: check(
|
|
cl.clGetDeviceIDs(
|
|
platform_ids[0], device_type, num_devices, x, None
|
|
)
|
|
),
|
|
)
|
|
|
|
self.device_id = CLDevice.device_ids[
|
|
0 if ":" not in device else int(device.split(":")[1])
|
|
]
|
|
self.context = checked(
|
|
cl.clCreateContext(
|
|
None,
|
|
1,
|
|
ctypes.byref(self.device_id),
|
|
cl.clCreateContext.argtypes[3](),
|
|
None,
|
|
ctypes.byref(status := ctypes.c_int32()),
|
|
),
|
|
status,
|
|
)
|
|
if CLDevice.compiler_context is None:
|
|
CLDevice.compiler_context = self
|
|
self.queue = checked(
|
|
cl.clCreateCommandQueue(
|
|
self.context,
|
|
self.device_id,
|
|
cl.CL_QUEUE_PROFILING_ENABLE,
|
|
ctypes.byref(status),
|
|
),
|
|
status,
|
|
)
|
|
self.pending_copyin: List[memoryview] = []
|
|
super().__init__(
|
|
CLAllocator(self),
|
|
LinearizerOptions(),
|
|
OpenCLRenderer,
|
|
compile_cl,
|
|
functools.partial(CLProgram, self),
|
|
)
|
|
|
|
def synchronize(self):
|
|
"""
|
|
Synchronize the queue and clear any pending copy operations.
|
|
|
|
This function ensures that all commands in the queue are executed
|
|
before proceeding with further operations. It is particularly useful
|
|
when you need to make sure all OpenCL operations have completed
|
|
before continuing with the rest of your program.
|
|
|
|
Attributes:
|
|
self (obj): The instance of the class calling the synchronize method.
|
|
|
|
Returns:
|
|
None
|
|
|
|
Raises:
|
|
Exception: If there is an issue with finishing the queue (`clFinish()` returns non-zero).
|
|
"""
|
|
check(cl.clFinish(self.queue))
|
|
self.pending_copyin.clear()
|
|
|
|
|
|
GPUDevice = CLDevice # for legacy reasons
|