from tinygrad.nn import Conv2d, BatchNorm2d
from tinygrad.tensor import Tensor
import numpy as np
from itertools import chain
from pathlib import Path
import cv2
from collections import defaultdict
import time, sys
from tinygrad.helpers import fetch
from tinygrad.nn.state import safe_load, load_state_dict
# Model architecture from https://github.com/ultralytics/ultralytics/issues/189
# The Upsample class is taken from this pull request by dc-dc-dc: https://github.com/tinygrad/tinygrad/pull/784. Two(?) models now use upsampling (RetinaNet and this one).
# Image pre-processing functions.
def compute_transform(
    image, new_shape=(640, 640), auto=False, scaleFill=False, scaleup=True, stride=32
):
    """Resize ``image`` towards ``new_shape`` and pad it with a gray border.

    Args:
        image: Array whose first two dimensions are (height, width); it is
            handed to ``cv2.resize`` and ``cv2.copyMakeBorder``.
        new_shape: Target (height, width), or a single int used for both.
        auto: When true, pad only by the leftover size modulo ``stride``.
            When false, no padding is added at all.
        scaleFill: When true, resize straight to ``new_shape`` instead of
            keeping the aspect ratio.
        scaleup: When false, the scale ratio is capped at 1.0 (never enlarge).
        stride: Modulus applied to the padding when ``auto`` is true.

    Returns:
        The resized image with a constant (114, 114, 114) border.
    """
    shape = image.shape[:2]  # current shape [height, width]
    new_shape = (new_shape, new_shape) if isinstance(new_shape, int) else new_shape
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    r = min(r, 1.0) if not scaleup else r
    new_unpad = (int(round(shape[1] * r)), int(round(shape[0] * r)))  # (width, height)
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]
    dw, dh = (np.mod(dw, stride), np.mod(dh, stride)) if auto else (0.0, 0.0)
    new_unpad = (new_shape[1], new_shape[0]) if scaleFill else new_unpad
    # Split the padding evenly between the two sides of each axis.
    dw /= 2
    dh /= 2
    image = (
        cv2.resize(image, new_unpad, interpolation=cv2.INTER_LINEAR)
        if shape[::-1] != new_unpad
        else image
    )
    # The +/- 0.1 makes an odd total padding round to unequal halves.
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    image = cv2.copyMakeBorder(
        image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114)
    )
    return image
def preprocess(im, imgsz=640, model_stride=32, model_pt=True):
    """Turn a list of images into a normalized tinygrad batch tensor.

    Args:
        im: Sequence of image arrays; each is passed to ``compute_transform``.
        imgsz: Target size forwarded as ``new_shape``.
        model_stride: Stride forwarded to ``compute_transform``.
        model_pt: Combined with "all images share one shape" to decide
            whether ``auto`` padding is used.

    Returns:
        A ``Tensor`` with the channel axis reversed, permuted from
        (n, h, w, c) to (n, c, h, w), and divided by 255.
    """
    same_shapes = all(x.shape == im[0].shape for x in im)
    auto = same_shapes and model_pt
    im = Tensor(
        [
            compute_transform(x, new_shape=imgsz, auto=auto, stride=model_stride)
            for x in im
        ]
    )
    im = Tensor.stack(im) if im.shape[0] > 1 else im
    im = im[..., ::-1].permute(0, 3, 1, 2)  # BGR to RGB, BHWC to BCHW, (n, 3, h, w)
    im /= 255  # 0 - 255 to 0.0 - 1.0
    return im
# Post Processing functions
def box_area(box):
    """Return the area of every row of ``box``, read as ``x1, y1, x2, y2``."""
    widths = box[:, 2] - box[:, 0]
    heights = box[:, 3] - box[:, 1]
    return widths * heights
def box_iou(box1, box2):
    """Pairwise intersection-over-union of two sets of ``x1, y1, x2, y2`` boxes.

    Returns an array of shape ``(len(box1), len(box2))``.
    """
    # Add a middle axis so every box in box1 broadcasts against all of box2.
    expanded = box1[:, None, :]
    top_left = np.maximum(expanded[..., :2], box2[:, :2])
    bottom_right = np.minimum(expanded[..., 2:], box2[:, 2:])
    # Non-overlapping pairs get a zero-sized intersection instead of a negative one.
    overlap = np.clip(bottom_right - top_left, 0, None)
    inter = overlap[..., 0] * overlap[..., 1]
    union = box_area(box1)[:, None] + box_area(box2)[None, :] - inter
    return inter / union
def compute_nms(boxes, scores, iou_threshold):
    """Greedy non-maximum suppression.

    Repeatedly keeps the highest-scoring remaining box and discards every
    other remaining box whose IoU with it exceeds ``iou_threshold``.

    Args:
        boxes: ``(n, 4)`` array of ``x1, y1, x2, y2`` boxes (the layout
            ``box_iou`` reads).
        scores: ``(n,)`` array of scores, one per box.
        iou_threshold: Boxes with IoU ``<=`` this value survive a round.

    Returns:
        Integer array of indices into ``boxes``, ordered by descending score.
        Empty (but still integer-typed) when there are no boxes.
    """
    order, keep = scores.argsort()[::-1], []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        if order.size == 1:
            break
        iou = box_iou(boxes[i][None, :], boxes[order[1:]])
        # Index row 0 rather than squeeze(): with a single candidate left,
        # squeeze() yields a 0-d array, which np.where cannot handle.
        inds = np.where(iou[0] <= iou_threshold)[0]
        # +1 because `inds` is relative to order[1:].
        order = order[inds + 1]
    return np.array(keep, dtype=np.intp)
def non_max_suppression(
    prediction,
    conf_thres=0.25,
    iou_thres=0.45,
    agnostic=False,
    max_det=300,
    nc=0,
    max_wh=7680,
):
    """Filter raw predictions by confidence and apply per-image NMS.

    NOTE(review): the parameter list was missing from the damaged source and
    has been restored from the names the body uses; confirm the defaults.

    Args:
        prediction: Array of shape (batch, 4 + nc + nm, anchors), or a
            list/tuple whose first element is that array.
        conf_thres: Minimum class confidence for a candidate to be kept.
        iou_thres: IoU threshold handed to ``compute_nms``.
        agnostic: When true, boxes of different classes suppress each other.
        max_det: Maximum number of detections kept per image.
        nc: Number of classes; 0 means "all rows after the 4 box rows".
        max_wh: Per-class box offset that keeps classes apart during NMS.

    Returns:
        One array per image with columns
        ``x1, y1, x2, y2, confidence, class, *mask`` (``6 + nm`` columns).
    """
    prediction = prediction[0] if isinstance(prediction, (list, tuple)) else prediction
    bs, nc = prediction.shape[0], nc or (prediction.shape[1] - 4)
    xc = np.amax(prediction[:, 4 : 4 + nc], axis=1) > conf_thres
    nm = prediction.shape[1] - nc - 4
    # One independent empty array per image (no aliasing between entries).
    output = [np.zeros((0, 6 + nm)) for _ in range(bs)]

    for xi, x in enumerate(prediction):
        x = x.swapaxes(0, -1)[xc[xi]]
        if not x.shape[0]:
            continue
        box, cls, mask = np.split(x, [4, 4 + nc], axis=1)
        conf, j = np.max(cls, axis=1, keepdims=True), np.argmax(
            cls, axis=1, keepdims=True
        )
        x = np.concatenate((xywh2xyxy(box), conf, j.astype(np.float32), mask), axis=1)
        x = x[conf.ravel() > conf_thres]
        if not x.shape[0]:
            continue
        x = x[np.argsort(-x[:, 4])]
        # Shift each class by a large offset so NMS only compares within a class.
        c = x[:, 5:6] * (0 if agnostic else max_wh)
        boxes, scores = x[:, :4] + c, x[:, 4]
        i = compute_nms(boxes, scores, iou_thres)[:max_det]
        output[xi] = x[i]
    return output
def postprocess(preds, img, orig_imgs):
    """Run NMS on model output and rescale boxes to the original images.

    Args:
        preds: Model output, as a ``Tensor`` or a numpy array.
        img: The pre-processed batch; only ``img.shape[2:]`` is read.
        orig_imgs: A list of original images (one per prediction) or a
            single image used for every prediction.

    Returns:
        List of per-image detection arrays with boxes scaled by
        ``scale_boxes``. Nothing is appended when ``orig_imgs`` is a
        ``Tensor``.
    """
    print("copying to CPU now for post processing")
    # if you are on CPU, this causes an overflow runtime error. doesn't "seem" to make any difference in the predictions though.
    # TODO: make non_max_suppression in tinygrad - to make this faster
    preds = preds.numpy() if isinstance(preds, Tensor) else preds
    preds = non_max_suppression(
        prediction=preds, conf_thres=0.25, iou_thres=0.7, agnostic=False, max_det=300
    )
    all_preds = []
    for i, pred in enumerate(preds):
        orig_img = orig_imgs[i] if isinstance(orig_imgs, list) else orig_imgs
        if not isinstance(orig_imgs, Tensor):
            pred[:, :4] = scale_boxes(img.shape[2:], pred[:, :4], orig_img.shape)
            all_preds.append(pred)
    return all_preds
def draw_bounding_boxes_and_save(
    orig_img_paths, output_img_paths, all_predictions, class_labels, iou_threshold=0.5
):
    """Draw labelled boxes on each image, print a summary and save the result.

    Args:
        orig_img_paths: Per image, either a path read with ``cv2.imread`` or
            an encoded numpy buffer decoded with ``cv2.imdecode``.
        output_img_paths: Destination path for each annotated image.
        all_predictions: Per image, rows of
            ``x1, y1, x2, y2, confidence, class_id``.
        class_labels: Sequence mapping class id to label text.
        iou_threshold: Boxes overlapping the current best box of the same
            class by at least this IoU are dropped before drawing.
    """
    color_dict = {
        label: tuple(
            (((i + 1) * 50) % 256, ((i + 1) * 100) % 256, ((i + 1) * 150) % 256)
        )
        for i, label in enumerate(class_labels)
    }
    font = cv2.FONT_HERSHEY_SIMPLEX

    def is_bright_color(color):
        # Weighted brightness decides whether label text is black or white.
        r, g, b = color
        brightness = (r * 299 + g * 587 + b * 114) / 1000
        return brightness > 127

    for img_idx, (orig_img_path, output_img_path, predictions) in enumerate(
        zip(orig_img_paths, output_img_paths, all_predictions)
    ):
        predictions = np.array(predictions)
        orig_img = (
            cv2.imread(orig_img_path)
            if not isinstance(orig_img_path, np.ndarray)
            else cv2.imdecode(orig_img_path, 1)
        )
        height, width, _ = orig_img.shape
        box_thickness = int((height + width) / 400)
        font_scale = (height + width) / 2500

        grouped_preds = defaultdict(list)
        object_count = defaultdict(int)

        # Group detections by class id (the last column).
        for pred_np in predictions:
            grouped_preds[int(pred_np[-1])].append(pred_np)

        def draw_box_and_label(pred, color):
            # `class_id` is read from the enclosing per-class loop below.
            x1, y1, x2, y2, conf, _ = pred
            x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
            cv2.rectangle(orig_img, (x1, y1), (x2, y2), color, box_thickness)
            label = f"{class_labels[class_id]} {conf:.2f}"
            text_size, _ = cv2.getTextSize(label, font, font_scale, 1)
            # Put the label above the box unless it would leave the image.
            label_y, bg_y = (
                (y1 - 4, y1 - text_size[1] - 4)
                if y1 - text_size[1] - 4 > 0
                else (y1 + text_size[1], y1)
            )
            cv2.rectangle(
                orig_img,
                (x1, bg_y),
                (x1 + text_size[0], bg_y + text_size[1]),
                color,
                -1,
            )
            font_color = (0, 0, 0) if is_bright_color(color) else (255, 255, 255)
            cv2.putText(
                orig_img,
                label,
                (x1, label_y),
                font,
                font_scale,
                font_color,
                1,
                cv2.LINE_AA,
            )

        for class_id, pred_list in grouped_preds.items():
            pred_list = np.array(pred_list)
            while len(pred_list) > 0:
                max_conf_idx = np.argmax(pred_list[:, 4])
                max_conf_pred = pred_list[max_conf_idx]
                pred_list = np.delete(pred_list, max_conf_idx, axis=0)
                color = color_dict[class_labels[class_id]]
                draw_box_and_label(max_conf_pred, color)
                object_count[class_labels[class_id]] += 1
                iou_scores = box_iou(np.array([max_conf_pred[:4]]), pred_list[:, :4])
                low_iou_indices = np.where(iou_scores[0] < iou_threshold)[0]
                pred_list = pred_list[low_iou_indices]
                for low_conf_pred in pred_list:
                    draw_box_and_label(low_conf_pred, color)

        print(f"Image {img_idx + 1}:")
        print("Objects detected:")
        for obj, count in object_count.items():
            print(f"- {obj}: {count}")

        cv2.imwrite(output_img_path, orig_img)
        print(f"saved detections at {output_img_path}")
# utility functions for forward pass.
def dist2bbox(distance, anchor_points, xywh=True, dim=-1):
    """Convert (left/top, right/bottom) distances from anchors into boxes.

    Args:
        distance: Tensor split into two equal halves along ``dim``; the first
            half is subtracted from, the second added to, ``anchor_points``.
        anchor_points: Tensor broadcastable against each half.
        xywh: When true return ``center, size``; otherwise ``x1y1, x2y2``.
        dim: Axis along which ``distance`` is split and the result is joined.

    Returns:
        The two resulting halves concatenated along ``dim``.
    """
    lt, rb = distance.chunk(2, dim)
    x1y1 = anchor_points - lt
    x2y2 = anchor_points + rb
    # Concatenate along the same axis that was split; the previous hard-coded
    # dim=1 was only right for callers passing dim=1.
    if xywh:
        c_xy = (x1y1 + x2y2) / 2
        wh = x2y2 - x1y1
        return c_xy.cat(wh, dim=dim)
    return x1y1.cat(x2y2, dim=dim)
def make_anchors(feats, strides, grid_cell_offset=0.5):
    """Build anchor-point coordinates and matching strides for feature maps.

    Args:
        feats: Sequence of 4-D tensors; only the last two dimensions (h, w)
            of ``feats[i]`` are read.
        strides: One stride value per feature map.
        grid_cell_offset: Offset added to every grid index.

    Returns:
        ``(anchor_points, stride_tensor)``: the (x, y) points of all levels
        concatenated to shape (sum(h*w), 2), and a (sum(h*w), 1) tensor with
        the stride of each point.
    """
    anchor_points, stride_tensor = [], []
    assert feats is not None
    for i, stride in enumerate(strides):
        _, _, h, w = feats[i].shape
        sx = Tensor.arange(w) + grid_cell_offset
        sy = Tensor.arange(h) + grid_cell_offset
        # this is np.meshgrid but in tinygrad
        sx = sx.reshape(1, -1).repeat([h, 1]).reshape(-1)
        sy = sy.reshape(-1, 1).repeat([1, w]).reshape(-1)
        anchor_points.append(Tensor.stack((sx, sy), -1).reshape(-1, 2))
        stride_tensor.append(Tensor.full((h * w), stride))
    # Join every level instead of assuming exactly three of them.
    anchor_points = anchor_points[0].cat(*anchor_points[1:])
    stride_tensor = stride_tensor[0].cat(*stride_tensor[1:]).unsqueeze(1)
    return anchor_points, stride_tensor
# this function is from the original implementation
def autopad(k, p=None, d=1):  # kernel, padding, dilation
    """Return ``p`` if given, else half the (dilated) kernel size.

    ``k`` may be an int or an iterable of ints; the result has the matching
    form (an int, or a list of ints).
    """
    scalar = isinstance(k, int)
    if d > 1:
        # actual kernel-size once dilation is taken into account
        effective = d * (k - 1) + 1 if scalar else [d * (size - 1) + 1 for size in k]
    else:
        effective = k
    if p is not None:
        return p
    if scalar:
        return effective // 2  # auto-pad
    return [size // 2 for size in effective]
def clip_boxes(boxes, shape):
    """Clamp box coordinates in place to ``shape`` = (height, width, ...).

    Columns 0 and 2 are limited to ``[0, width]`` and columns 1 and 3 to
    ``[0, height]``. The same ``boxes`` object is returned.
    """
    max_y, max_x = shape[0], shape[1]
    x_cols, y_cols = [0, 2], [1, 3]
    boxes[..., x_cols] = np.clip(boxes[..., x_cols], 0, max_x)  # x1, x2
    boxes[..., y_cols] = np.clip(boxes[..., y_cols], 0, max_y)  # y1, y2
    return boxes
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
    """Map boxes from the network input size back to the original image size.

    Args:
        img1_shape: (height, width) the boxes currently refer to.
        boxes: ``Tensor`` or numpy array whose first four columns are
            ``x1, y1, x2, y2``. A numpy array is modified in place.
        img0_shape: Shape of the original image; indices 0 and 1 are read as
            height and width.
        ratio_pad: Optional gain to use instead of the computed one.

    Returns:
        The rescaled boxes as a numpy array, clipped to ``img0_shape``.
    """
    gain = (
        ratio_pad
        if ratio_pad
        else min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])
    )
    # Padding that centres the scaled original inside img1_shape: (x, y).
    pad = (
        (img1_shape[1] - img0_shape[1] * gain) / 2,
        (img1_shape[0] - img0_shape[0] * gain) / 2,
    )
    boxes_np = boxes.numpy() if isinstance(boxes, Tensor) else boxes
    boxes_np[..., [0, 2]] -= pad[0]
    boxes_np[..., [1, 3]] -= pad[1]
    boxes_np[..., :4] /= gain
    boxes_np = clip_boxes(boxes_np, img0_shape)
    return boxes_np
def xywh2xyxy(x):
    """Convert ``center_x, center_y, width, height`` boxes to corner form.

    The first four entries of the last axis are read; the result holds
    ``x1, y1, x2, y2`` and is wrapped in a ``Tensor`` when ``x`` was one.
    """
    center = x[..., :2]  # center x, y
    half_size = x[..., 2:4] / 2  # half of width, height
    corners = np.concatenate((center - half_size, center + half_size), axis=-1)
    if isinstance(x, Tensor):
        return Tensor(corners)
    return corners
def get_variant_multiples(variant):
    """Look up the three scaling multiples of a YOLOv8 variant letter.

    Returns the tuple stored for ``variant`` ("n", "s", "m", "l" or "x"),
    or ``None`` for any other key.
    """
    multiples_by_variant = dict(
        n=(0.33, 0.25, 2.0),
        s=(0.33, 0.50, 2.0),
        m=(0.67, 0.75, 1.5),
        l=(1.0, 1.0, 1.0),
        x=(1, 1.25, 1.0),
    )
    if variant in multiples_by_variant:
        return multiples_by_variant[variant]
    return None
def label_predictions(all_predictions):
    """Count detections per class id across every image.

    The class id is the last value of each prediction row, cast to ``int``.
    Returns a plain dict ``{class_id: count}`` in first-seen order.
    """
    tally = {}
    for image_preds in all_predictions:
        for row in np.array(image_preds):
            key = int(row[-1])
            tally[key] = tally.get(key, 0) + 1
    return tally
# this is taken from https://github.com/tinygrad/tinygrad/pull/784/files by dc-dc-dc (Now 2 models use upsampling)
class Upsample:
    """Nearest-neighbour upsampling of the trailing (spatial) dimensions."""

    def __init__(self, scale_factor: int, mode: str = "nearest") -> None:
        assert mode == "nearest"  # only mode supported for now
        self.mode = mode
        self.scale_factor = scale_factor

    def __call__(self, x: Tensor) -> Tensor:
        """Repeat every element ``scale_factor`` times along each spatial axis.

        ``x`` must have 3 to 5 dimensions; the first two (b, c) are untouched.
        """
        assert len(x.shape) > 2 and len(x.shape) <= 5
        (b, c), _lens = x.shape[:2], len(x.shape[2:])
        # Broadcast against ones to create one extra copy axis per spatial axis.
        tmp = x.reshape([b, c, -1] + [1] * _lens) * Tensor.ones(
            *[1, 1, 1] + [self.scale_factor] * _lens
        )
        # Interleave each spatial axis with its copy axis, then merge the pairs.
        return (
            tmp.reshape(list(x.shape) + [self.scale_factor] * _lens)
            .permute(
                [0, 1]
                + list(
                    chain.from_iterable([[y + 2, y + 2 + _lens] for y in range(_lens)])
                )
            )
            .reshape([b, c] + [size * self.scale_factor for size in x.shape[2:]])
        )
class Conv_Block:
    """Bias-free ``Conv2d`` followed by ``BatchNorm2d`` and SiLU."""

    def __init__(
        self, c1, c2, kernel_size=1, stride=1, groups=1, dilation=1, padding=None
    ):
        # NOTE(review): the Conv2d arguments other than `padding` were missing
        # from the damaged source and are restored from the constructor
        # parameters; confirm against the upstream example.
        self.conv = Conv2d(
            c1,
            c2,
            kernel_size,
            stride,
            padding=autopad(kernel_size, padding, dilation),
            bias=False,
            groups=groups,
            dilation=dilation,
        )
        self.bn = BatchNorm2d(c2, eps=0.001)

    def __call__(self, x):
        return self.bn(self.conv(x)).silu()
class Bottleneck:
    """Two stacked ``Conv_Block`` layers with an optional residual add."""

    def __init__(
        self, c1, c2, shortcut: bool, g=1, kernels: list = (3, 3), channel_factor=0.5
    ):
        c_ = int(c2 * channel_factor)  # hidden channels
        self.cv1 = Conv_Block(c1, c_, kernel_size=kernels[0], stride=1, padding=None)
        self.cv2 = Conv_Block(
            c_, c2, kernel_size=kernels[1], stride=1, padding=None, groups=g
        )
        # The skip connection is only used when input and output widths match.
        self.residual = c1 == c2 and shortcut

    def __call__(self, x):
        return x + self.cv2(self.cv1(x)) if self.residual else self.cv2(self.cv1(x))
class C2f:
    """Split the features, run a chain of bottlenecks, and fuse every stage."""

    def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5):
        self.c = int(c2 * e)
        # NOTE(review): parts of the cv1 and Bottleneck argument lists were
        # missing from the damaged source and are restored here; confirm.
        self.cv1 = Conv_Block(
            c1,
            2 * self.c,
            1,
        )
        self.cv2 = Conv_Block((2 + n) * self.c, c2, 1)
        self.bottleneck = [
            Bottleneck(
                self.c,
                self.c,
                shortcut,
                g,
                kernels=[(3, 3), (3, 3)],
                channel_factor=1.0,
            )
            for _ in range(n)
        ]

    def __call__(self, x):
        # Two halves from cv1; each bottleneck consumes the latest output.
        y = list(self.cv1(x).chunk(2, 1))
        y.extend(m(y[-1]) for m in self.bottleneck)
        z = y[0]
        for i in y[1:]:
            z = z.cat(i, dim=1)
        return self.cv2(z)
class SPPF:
    """Three chained max-pools whose outputs are concatenated and fused."""

    def __init__(self, c1, c2, k=5):
        c_ = c1 // 2  # hidden channels
        self.cv1 = Conv_Block(c1, c_, 1, 1, padding=None)
        self.cv2 = Conv_Block(c_ * 4, c2, 1, 1, padding=None)
        # TODO: this pads with 0s, whereas torch function pads with -infinity. This results in a < 2% difference in prediction which does not make a difference visually.
        self.maxpool = lambda x: x.pad2d((k // 2, k // 2, k // 2, k // 2)).max_pool2d(
            kernel_size=k, stride=1
        )

    def __call__(self, x):
        x = self.cv1(x)
        x2 = self.maxpool(x)
        x3 = self.maxpool(x2)
        x4 = self.maxpool(x3)
        return self.cv2(x.cat(x2, x3, x4, dim=1))
class DFL:
    """Softmax over ``c1`` bins, reduced by a 1x1 conv with weights 0..c1-1."""

    def __init__(self, c1=16):
        self.c1 = c1
        self.conv = Conv2d(c1, 1, 1, bias=False)
        # The conv weights are the bin indices themselves.
        bin_indices = Tensor.arange(c1)
        self.conv.weight.assign(bin_indices.reshape(1, c1, 1, 1))

    def __call__(self, x):
        batch, _, anchors = x.shape  # batch, channels, anchors
        bin_probs = x.reshape(batch, 4, self.c1, anchors).transpose(2, 1).softmax(1)
        return self.conv(bin_probs).reshape(batch, 4, anchors)
# backbone
class Darknet:
    """Backbone: five sequential stages; returns three feature maps.

    ``w``, ``r`` and ``d`` scale the channel widths, the last-stage width
    and the number of bottlenecks per ``C2f`` respectively.
    """

    def __init__(self, w, r, d):
        self.b1 = [
            Conv_Block(c1=3, c2=int(64 * w), kernel_size=3, stride=2, padding=1),
            Conv_Block(int(64 * w), int(128 * w), kernel_size=3, stride=2, padding=1),
        ]
        self.b2 = [
            C2f(c1=int(128 * w), c2=int(128 * w), n=round(3 * d), shortcut=True),
            Conv_Block(int(128 * w), int(256 * w), 3, 2, 1),
            C2f(int(256 * w), int(256 * w), round(6 * d), True),
        ]
        self.b3 = [
            Conv_Block(int(256 * w), int(512 * w), kernel_size=3, stride=2, padding=1),
            C2f(int(512 * w), int(512 * w), round(6 * d), True),
        ]
        self.b4 = [
            Conv_Block(
                int(512 * w), int(512 * w * r), kernel_size=3, stride=2, padding=1
            ),
            C2f(int(512 * w * r), int(512 * w * r), round(3 * d), True),
        ]
        self.b5 = [SPPF(int(512 * w * r), int(512 * w * r), 5)]

    def return_modules(self):
        """Return all layers of the five stages as one flat list."""
        return [*self.b1, *self.b2, *self.b3, *self.b4, *self.b5]

    def __call__(self, x):
        x1 = x.sequential(self.b1)
        x2 = x1.sequential(self.b2)
        x3 = x2.sequential(self.b3)
        x4 = x3.sequential(self.b4)
        x5 = x4.sequential(self.b5)
        # Outputs of stages 2, 3 and 5 are passed on.
        return (x2, x3, x5)
# yolo fpn (neck)
class Yolov8NECK:
    """Neck: fuses three backbone feature maps top-down and then bottom-up."""

    def __init__(self, w, r, d):  # width_multiple, ratio_multiple, depth_multiple
        self.up = Upsample(2, mode="nearest")
        self.n1 = C2f(
            c1=int(512 * w * (1 + r)), c2=int(512 * w), n=round(3 * d), shortcut=False
        )
        self.n2 = C2f(c1=int(768 * w), c2=int(256 * w), n=round(3 * d), shortcut=False)
        self.n3 = Conv_Block(
            c1=int(256 * w), c2=int(256 * w), kernel_size=3, stride=2, padding=1
        )
        self.n4 = C2f(c1=int(768 * w), c2=int(512 * w), n=round(3 * d), shortcut=False)
        self.n5 = Conv_Block(
            c1=int(512 * w), c2=int(512 * w), kernel_size=3, stride=2, padding=1
        )
        self.n6 = C2f(
            c1=int(512 * w * (1 + r)),
            c2=int(512 * w * r),
            n=round(3 * d),
            shortcut=False,
        )

    def return_modules(self):
        """Return the six neck modules in construction order."""
        return [self.n1, self.n2, self.n3, self.n4, self.n5, self.n6]

    def __call__(self, p3, p4, p5):
        # Top-down: upsample and merge with the next feature map.
        x = self.n1(self.up(p5).cat(p4, dim=1))
        head_1 = self.n2(self.up(x).cat(p3, dim=1))
        # Bottom-up: strided conv and merge back.
        head_2 = self.n4(self.n3(head_1).cat(x, dim=1))
        head_3 = self.n6(self.n5(head_2).cat(p5, dim=1))
        return [head_1, head_2, head_3]
# task specific head.
class DetectionHead:
    """Detection head producing decoded boxes and class probabilities."""

    def __init__(self, nc=80, filters=()):
        self.ch = 16  # number of distribution bins per box side
        self.nc = nc  # number of classes
        self.nl = len(filters)  # number of input feature maps
        self.no = nc + self.ch * 4  # outputs per anchor
        self.stride = [8, 16, 32]
        c1 = max(filters[0], self.nc)
        c2 = max((filters[0] // 4, self.ch * 4))
        self.dfl = DFL(self.ch)
        # cv3: class branch, cv2: box-distribution branch (one stack per input).
        self.cv3 = [
            [Conv_Block(x, c1, 3), Conv_Block(c1, c1, 3), Conv2d(c1, self.nc, 1)]
            for x in filters
        ]
        self.cv2 = [
            [Conv_Block(x, c2, 3), Conv_Block(c2, c2, 3), Conv2d(c2, 4 * self.ch, 1)]
            for x in filters
        ]

    def __call__(self, x):
        """Decode a list of feature maps; note that ``x`` is modified in place.

        Returns a tensor holding 4 box values followed by ``nc`` sigmoid
        class scores along axis 1.
        """
        for i in range(self.nl):
            x[i] = x[i].sequential(self.cv2[i]).cat(x[i].sequential(self.cv3[i]), dim=1)
        self.anchors, self.strides = (
            x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5)
        )
        y = [(i.reshape(x[0].shape[0], self.no, -1)) for i in x]
        x_cat = y[0].cat(y[1], y[2], dim=2)
        box, cls = x_cat[:, : self.ch * 4], x_cat[:, self.ch * 4 :]
        dbox = (
            dist2bbox(self.dfl(box), self.anchors.unsqueeze(0), xywh=True, dim=1)
            * self.strides
        )
        z = dbox.cat(cls.sigmoid(), dim=1)
        return z
class YOLOv8:
    """Full detector: ``Darknet`` backbone, ``Yolov8NECK`` and ``DetectionHead``."""

    def __init__(
        self, w, r, d, num_classes
    ):  # width_multiple, ratio_multiple, depth_multiple
        self.net = Darknet(w, r, d)
        self.fpn = Yolov8NECK(w, r, d)
        self.head = DetectionHead(
            num_classes, filters=(int(256 * w), int(512 * w), int(512 * w * r))
        )

    def __call__(self, x):
        x = self.net(x)
        x = self.fpn(*x)
        return self.head(x)

    def return_all_trainable_modules(self):
        """Return ``(index, module)`` pairs for backbone, neck and head.

        NOTE(review): the indices presumably match the layer numbering of the
        reference checkpoint; confirm before relying on them.
        """
        backbone_modules = [*range(10)]
        yolov8neck_modules = [12, 15, 16, 18, 19, 21]
        yolov8_head_weights = [(22, self.head)]
        return [
            *zip(backbone_modules, self.net.return_modules()),
            *zip(yolov8neck_modules, self.fpn.return_modules()),
            *yolov8_head_weights,
        ]
if __name__ == "__main__":
    # usage : python3 yolov8.py "image_URL OR image_path" "v8 variant" (optional, n is default)
    if len(sys.argv) < 2:
        print("Error: Image URL or path not provided.")
        sys.exit(1)

    img_path = sys.argv[1]
    if len(sys.argv) >= 3:
        yolo_variant = sys.argv[2]
    else:
        print(
            "No variant given, so choosing 'n' as the default. Yolov8 has different variants, you can choose from ['n', 's', 'm', 'l', 'x']"
        )
        yolo_variant = "n"
    print(f"running inference for YOLO version {yolo_variant}")

    # Different YOLOv8 variants use different w , r, and d multiples. For a list , refer to this yaml file (the scales section) https://github.com/ultralytics/ultralytics/blob/main/ultralytics/models/v8/yolov8.yaml
    # Reject unknown variants up front instead of failing on tuple unpacking.
    variant_multiples = get_variant_multiples(yolo_variant)
    if variant_multiples is None:
        print(
            f"Error: unknown YOLOv8 variant '{yolo_variant}'. Choose from ['n', 's', 'm', 'l', 'x']."
        )
        sys.exit(1)
    depth, width, ratio = variant_multiples

    output_folder_path = Path("./outputs_yolov8")
    output_folder_path.mkdir(parents=True, exist_ok=True)
    # absolute image path or URL
    image_location = [np.frombuffer(fetch(img_path).read_bytes(), np.uint8)]
    image = [cv2.imdecode(image_location[0], 1)]
    # cv2.imwrite needs a plain string path.
    out_paths = [
        (
            output_folder_path / f"{Path(img_path).stem}_output{Path(img_path).suffix}"
        ).as_posix()
    ]
    if not isinstance(image[0], np.ndarray):
        print("Error in image loading. Check your image file.")
        sys.exit(1)
    pre_processed_image = preprocess(image)

    yolo_infer = YOLOv8(w=width, r=ratio, d=depth, num_classes=80)

    # NOTE(review): the weight and label URLs were missing from the damaged
    # source and are restored from the upstream tinygrad example; confirm them.
    state_dict = safe_load(
        fetch(
            f"https://gitlab.com/r3sist/yolov8_weights/-/raw/master/yolov8{yolo_variant}.safetensors"
        )
    )
    load_state_dict(yolo_infer, state_dict)

    st = time.time()
    predictions = yolo_infer(pre_processed_image)
    print(f"did inference in {int(round(((time.time() - st) * 1000)))}ms")

    post_predictions = postprocess(
        preds=predictions, img=pre_processed_image, orig_imgs=image
    )

    # v8 and v3 have same 80 class names for Object Detection
    class_labels = (
        fetch("https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names")
        .read_text()
        .split("\n")
    )

    draw_bounding_boxes_and_save(
        orig_img_paths=image_location,
        output_img_paths=out_paths,
        all_predictions=post_predictions,
        class_labels=class_labels,
    )
# TODO for later:
# 1. Fix SPPF minor difference due to maxpool
# 2. AST exp overflow warning while on cpu
# 3. Make NMS faster
# 4. Add video inference and webcam support