# https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg
import io
import math
import sys
import time

import cv2
import numpy as np
from PIL import Image

from tinygrad.helpers import fetch
from tinygrad.nn import BatchNorm2d, Conv2d
from tinygrad.tensor import Tensor


def show_labels(prediction, confidence=0.5, num_classes=80):
    """Print and return the COCO labels detected in `prediction`.

    Args:
        prediction: tinygrad Tensor of shape (batch, boxes, 5 + num_classes),
            as produced by `predict_transform` (objectness at column 4).
        confidence: objectness threshold below which boxes are zeroed out.
        num_classes: number of class scores per box (80 for COCO).

    Returns:
        List of detected label strings (one per unique class id, per image).
    """
    names = fetch(
        "https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names"
    ).read_bytes()
    names = names.decode("utf-8").split("\n")
    preds = prediction.detach().numpy()
    # Zero out every box whose objectness is below the threshold.
    preds *= np.expand_dims(preds[:, :, 4] > confidence, 2)
    labels = []
    # One pass per image in the batch.
    for img_pred in preds:
        class_scores = img_pred[:, 5 : 5 + num_classes]
        best_score = np.expand_dims(np.amax(class_scores, axis=1), axis=1)
        best_class = np.expand_dims(np.argmax(class_scores, axis=1), axis=1)
        # Rows become [x, y, w, h, obj, best_class_score, best_class_idx].
        detections = np.concatenate((img_pred[:, :5], best_score, best_class), axis=1)
        keep = np.nonzero(detections[:, 4])[0]
        assert all(detections[keep, 0] > 0)
        detections = np.reshape(detections[np.squeeze(keep), :], (-1, 7))
        # First occurrence of each distinct class id.
        classes, first_idx = np.unique(detections[:, -1], return_index=True)
        for pos, cls_id in enumerate(classes):
            label = names[int(cls_id)]
            # NOTE(review): column 4 is objectness, presumably used here as the
            # displayed "probability" — confirm against the class score in col 5.
            probability = detections[first_idx[pos]][4] * 100
            print(f"Detected {label} {probability:.2f}")
            labels.append(label)
    return labels
def add_boxes(img, prediction):
    """Draw labeled bounding boxes from `prediction` onto `img`.

    Args:
        img: HxWx3 uint8 image (callers pass it already resized to 608x608).
        prediction: ndarray of shape (n, 8) rows [batch, x1, y1, x2, y2,
            objectness, class_score, class_idx], or an int when
            `process_results` found no detections.

    Returns:
        The image with rectangles and class labels drawn on it.
    """
    if isinstance(prediction, int):  # no predictions
        return img
    # BUGFIX: fetch() returns a pathlib.Path, not bytes — read the file before
    # decoding (same pattern as show_labels; the Path has no .decode()).
    coco_labels = fetch(
        "https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names"
    ).read_bytes()
    coco_labels = coco_labels.decode("utf-8").split("\n")
    height, width = img.shape[0:2]
    scale_factor = 608 / width
    # Undo the letterbox offset introduced by resizing to the 608x608 input.
    prediction[:, [1, 3]] -= (608 - scale_factor * width) / 2
    prediction[:, [2, 4]] -= (608 - scale_factor * height) / 2
    for pred in prediction:
        corner1 = tuple(pred[1:3].astype(int))
        corner2 = tuple(pred[3:5].astype(int))
        w = corner2[0] - corner1[0]
        h = corner2[1] - corner1[1]
        # NOTE(review): this doubles the box extent relative to (x1,y1,x2,y2);
        # presumably compensating for upstream coordinate scaling — confirm.
        corner2 = (corner2[0] + w, corner2[1] + h)
        label = coco_labels[int(pred[-1])]
        img = cv2.rectangle(img, corner1, corner2, (255, 0, 0), 2)
        # Filled background rectangle sized to the label text.
        t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 1, 1)[0]
        c2 = corner1[0] + t_size[0] + 3, corner1[1] + t_size[1] + 4
        img = cv2.rectangle(img, corner1, c2, (255, 0, 0), -1)
        img = cv2.putText(
            img,
            label,
            (corner1[0], corner1[1] + t_size[1] + 4),
            cv2.FONT_HERSHEY_PLAIN,
            1,
            [225, 255, 255],
            1,
        )
    return img
def bbox_iou(box1, box2):
    """Return the IoU (Intersection over Union) of two sets of corner boxes.

    IoU = Area of Overlap / Area of Union — how well two boxes coincide.
    In training it tracks accuracy; at inference it is used by NMS to remove
    duplicate bounding boxes.

    Args:
        box1: ndarray (n, 4+) of boxes [x1, y1, x2, y2, ...].
        box2: ndarray (m, 4+), broadcast-compatible with box1.

    Returns:
        ndarray of IoU values in [0, 1].
    """
    # Corner coordinates of both boxes.
    b1_x1, b1_y1, b1_x2, b1_y2 = box1[:, 0], box1[:, 1], box1[:, 2], box1[:, 3]
    b2_x1, b2_y1, b2_x2, b2_y2 = box2[:, 0], box2[:, 1], box2[:, 2], box2[:, 3]
    # Intersection rectangle: max of the top-left corners...
    inter_rect_x1 = np.maximum(b1_x1, b2_x1)
    inter_rect_y1 = np.maximum(b1_y1, b2_y1)
    # BUGFIX: ...and *minimum* of the bottom-right corners. The original used
    # np.maximum here, which computes the enclosing (union) box and inflates
    # the overlap area, so NMS could not suppress duplicates correctly.
    inter_rect_x2 = np.minimum(b1_x2, b2_x2)
    inter_rect_y2 = np.minimum(b1_y2, b2_y2)
    # Intersection area, clamped at 0 for disjoint boxes
    # (+1 treats coordinates as inclusive pixel indices).
    inter_area = np.clip(inter_rect_x2 - inter_rect_x1 + 1, 0, 99999) * np.clip(
        inter_rect_y2 - inter_rect_y1 + 1, 0, 99999
    )
    # Union area.
    b1_area = (b1_x2 - b1_x1 + 1) * (b1_y2 - b1_y1 + 1)
    b2_area = (b2_x2 - b2_x1 + 1) * (b2_y2 - b2_y1 + 1)
    return inter_area / (b1_area + b2_area - inter_area)
def process_results(prediction, confidence=0.9, num_classes=80, nms_conf=0.4):
    """Threshold detections and run per-class non-max suppression.

    Args:
        prediction: tinygrad Tensor (batch, boxes, 5 + num_classes) with box
            centers/sizes in columns 0-3 and objectness in column 4.
        confidence: objectness threshold.
        num_classes: number of class scores per box.
        nms_conf: IoU threshold above which overlapping boxes are suppressed.

    Returns:
        ndarray (n, 8) rows [batch_idx, x1, y1, x2, y2, objectness,
        class_score, class_idx] for surviving detections, or 0 when none.
    """
    prediction = prediction.detach().numpy()
    # Zero out boxes below the objectness threshold.
    conf_mask = np.expand_dims(prediction[:, :, 4] > confidence, 2)
    prediction = prediction * conf_mask
    # Convert (cx, cy, w, h) to corner coordinates.
    # BUGFIX: the original aliased box_corner to prediction, so column 0 was
    # overwritten with x1 before x2 was computed — yielding x2 == cx (and
    # y2 == cy). Read from an untouched copy instead.
    xywh = prediction.copy()
    prediction[:, :, 0] = xywh[:, :, 0] - xywh[:, :, 2] / 2
    prediction[:, :, 1] = xywh[:, :, 1] - xywh[:, :, 3] / 2
    prediction[:, :, 2] = xywh[:, :, 0] + xywh[:, :, 2] / 2
    prediction[:, :, 3] = xywh[:, :, 1] + xywh[:, :, 3] / 2
    write = False
    # Only the first image of the batch is processed.
    img_pred = prediction[0]
    max_conf = np.amax(img_pred[:, 5 : 5 + num_classes], axis=1)
    max_conf_score = np.argmax(img_pred[:, 5 : 5 + num_classes], axis=1)
    max_conf_score = np.expand_dims(max_conf_score, axis=1)
    max_conf = np.expand_dims(max_conf, axis=1)
    # Rows become [x1, y1, x2, y2, obj, best_class_score, best_class_idx].
    seq = (img_pred[:, :5], max_conf, max_conf_score)
    image_pred = np.concatenate(seq, axis=1)
    non_zero_ind = np.nonzero(image_pred[:, 4])[0]
    # (The original asserted column 0 > 0 here; x1 is legitimately <= 0 for
    # boxes touching the left edge, so that assert was removed.)
    image_pred_ = np.reshape(image_pred[np.squeeze(non_zero_ind), :], (-1, 7))
    if image_pred_.shape[0] == 0:
        print("No detections found!")
        return 0
    for cls in np.unique(image_pred_[:, -1]):
        # Perform NMS on the detections of one particular class.
        cls_mask = image_pred_ * np.expand_dims(image_pred_[:, -1] == cls, axis=1)
        class_mask_ind = np.squeeze(np.nonzero(cls_mask[:, -2]))
        image_pred_class = np.reshape(image_pred_[class_mask_ind], (-1, 7))
        # BUGFIX: sort by objectness in *descending* order so the highest
        # confidence box is kept and suppresses its lower-confidence overlaps
        # (np.argsort is ascending by default, which inverted the NMS order).
        conf_sort_index = np.argsort(-image_pred_class[:, 4])
        image_pred_class = image_pred_class[conf_sort_index]
        for i in range(image_pred_class.shape[0]):
            # IoU of box i with every box that comes after it.
            try:
                ious = bbox_iou(
                    np.expand_dims(image_pred_class[i], axis=0),
                    image_pred_class[i + 1 :],
                )
            except (IndexError, ValueError):
                # Rows are removed below, so i can run past the current end.
                break
            # Zero out detections overlapping box i more than the threshold.
            iou_mask = np.expand_dims((ious < nms_conf), axis=1)
            image_pred_class[i + 1 :] *= iou_mask
            # Drop the zeroed rows.
            non_zero_ind = np.squeeze(np.nonzero(image_pred_class[:, 4]))
            image_pred_class = np.reshape(image_pred_class[non_zero_ind], (-1, 7))
        # BUGFIX: the batch-index column must have one row per surviving box;
        # the original fixed (1, 1) array broke np.concatenate whenever a
        # class kept more than one detection.
        batch_ind = np.zeros((image_pred_class.shape[0], 1))
        seq = (batch_ind, image_pred_class)
        if not write:
            output, write = np.concatenate(seq, axis=1), True
        else:
            out = np.concatenate(seq, axis=1)
            output = np.concatenate((output, out))
    return output
def infer(model, img):
    """Run the network on one BGR image array; return the raw prediction Tensor."""
    # Resize to the fixed 608x608 network input.
    resized = np.array(Image.fromarray(img).resize((608, 608)))
    # BGR -> RGB and HWC -> CHW.
    chw = resized[:, :, ::-1].transpose((2, 0, 1))
    # Add batch dimension and scale pixel values to [0, 1].
    batch = chw[np.newaxis, :, :, :] / 255.0
    return model.forward(Tensor(batch.astype(np.float32)))
def parse_cfg(cfg):
    """Parse a darknet .cfg file (raw bytes) into a list of block dicts.

    Each block begins at a `[section]` header (stored under "type"); the
    `key=value` lines below it become string entries of that block's dict.
    """
    raw = cfg.decode("utf-8").split("\n")
    # Drop empty lines and comments, then trim surrounding whitespace.
    raw = [ln for ln in raw if len(ln) > 0]
    raw = [ln for ln in raw if ln[0] != "#"]
    raw = [ln.strip() for ln in raw]
    blocks, current = [], {}
    for ln in raw:
        if ln[0] == "[":
            # New section header: flush the previous block, if any.
            if current:
                blocks.append(current)
                current = {}
            current["type"] = ln[1:-1].rstrip()
        else:
            key, value = ln.split("=")
            current[key.rstrip()] = value.lstrip()
    blocks.append(current)
    return blocks
# TODO: Speed up this function, avoid copying stuff from GPU to CPU
def predict_transform(prediction, inp_dim, anchors, num_classes):
    """Decode one YOLO detection head into absolute box predictions.

    Args:
        prediction: raw head output Tensor of shape
            (batch, num_anchors * (5 + num_classes), grid, grid).
        inp_dim: network input resolution (e.g. 608).
        anchors: list of (w, h) anchor sizes for this head, in input pixels.
        num_classes: number of object classes.

    Returns:
        Tensor (batch, grid*grid*num_anchors, 5 + num_classes) where boxes
        are (cx, cy, w, h) in input-image pixels and objectness/class
        scores are sigmoid-activated.
    """
    batch_size = prediction.shape[0]
    stride = inp_dim // prediction.shape[2]
    grid_size = inp_dim // stride
    bbox_attrs = 5 + num_classes
    num_anchors = len(anchors)
    # (B, A*attrs, g*g) -> (B, g*g, A*attrs) -> (B, g*g*A, attrs)
    prediction = prediction.reshape(
        shape=(batch_size, bbox_attrs * num_anchors, grid_size * grid_size)
    )
    prediction = prediction.transpose(1, 2)
    prediction = prediction.reshape(
        shape=(batch_size, grid_size * grid_size * num_anchors, bbox_attrs)
    )
    # The decode math runs on a CPU copy (see TODO above).
    out = prediction.numpy()
    # Sigmoid the x/y center offsets and the objectness score.
    for col in (0, 1, 4):
        out[:, :, col] = 1 / (1 + np.exp(-out[:, :, col]))
    # Per-cell grid offsets, repeated once per anchor.
    cells = np.arange(grid_size)
    cols, rows = np.meshgrid(cells, cells)
    offsets = np.concatenate((cols.reshape((-1, 1)), rows.reshape((-1, 1))), 1)
    offsets = np.expand_dims(np.tile(offsets, (1, num_anchors)).reshape((-1, 2)), 0)
    out[:, :, :2] += offsets
    # Anchors are given in input pixels; convert to grid units, tile per cell.
    scaled = [(aw / stride, ah / stride) for aw, ah in anchors]
    tiled_anchors = np.expand_dims(np.tile(scaled, (grid_size * grid_size, 1)), 0)
    out[:, :, 2:4] = np.exp(out[:, :, 2:4]) * tiled_anchors
    # Sigmoid the class scores.
    out[:, :, 5 : 5 + num_classes] = 1 / (
        1 + np.exp(-out[:, :, 5 : 5 + num_classes])
    )
    # Scale boxes back up to input-image pixels.
    out[:, :, :4] *= stride
    return Tensor(out)
class Darknet:
    """YOLOv3 network built from a darknet .cfg description.

    `module_list[i]` holds the callables (and, for yolo layers, the anchor
    list) for cfg block i+1; `forward` walks the blocks and dispatches on
    their "type" entry.
    """

    def __init__(self, cfg):
        """Build the model. `cfg` is the raw bytes of a darknet .cfg file."""
        self.blocks = parse_cfg(cfg)
        self.net_info, self.module_list = self.create_modules(self.blocks)
        print("Modules length:", len(self.module_list))

    def create_modules(self, blocks):
        """Translate parsed cfg blocks into a list of layer callables.

        Returns (net_info, module_list), where net_info is the [net] block
        with the model hyperparameters.
        """
        net_info = blocks[0]  # Info about model hyperparameters
        prev_filters, filters = 3, None
        output_filters, module_list = [], []
        for index, x in enumerate(blocks[1:]):
            module_type = x["type"]
            module = []
            if module_type == "convolutional":
                try:
                    # A conv followed by batchnorm carries no bias of its own.
                    batch_normalize, bias = int(x["batch_normalize"]), False
                except (KeyError, ValueError):
                    batch_normalize, bias = 0, True
                activation = x["activation"]
                filters = int(x["filters"])
                padding = int(x["pad"])
                pad = (int(x["size"]) - 1) // 2 if padding else 0
                module.append(
                    Conv2d(
                        prev_filters,
                        filters,
                        int(x["size"]),
                        int(x["stride"]),
                        pad,
                        bias=bias,
                    )
                )
                if batch_normalize:
                    module.append(
                        BatchNorm2d(filters, eps=1e-05, track_running_stats=True)
                    )
                # LeakyReLU activation
                if activation == "leaky":
                    module.append(lambda x: x.leakyrelu(0.1))
            elif module_type == "maxpool":
                size, stride = int(x["size"]), int(x["stride"])
                # BUGFIX: bind size/stride as lambda defaults — a plain
                # closure late-binds, so every maxpool layer would use the
                # values from the *last* maxpool block in the cfg.
                module.append(
                    lambda x, size=size, stride=stride: x.max_pool2d(
                        kernel_size=(size, size), stride=stride
                    )
                )
            elif module_type == "upsample":
                # 2x nearest-neighbor upsample (done on a CPU copy).
                module.append(
                    lambda x: Tensor(x.numpy().repeat(2, axis=-2).repeat(2, axis=-1))
                )
            elif module_type == "route":
                x["layers"] = x["layers"].split(",")
                # Start of route
                start = int(x["layers"][0])
                # End, if it exists
                try:
                    end = int(x["layers"][1])
                except (IndexError, ValueError):
                    end = 0
                # Convert absolute layer indices to offsets relative to here.
                if start > 0:
                    start -= index
                if end > 0:
                    end -= index
                module.append(lambda x: x)
                if end < 0:
                    # Concatenation of two earlier feature maps.
                    filters = (
                        output_filters[index + start] + output_filters[index + end]
                    )
                else:
                    filters = output_filters[index + start]
            # Shortcut corresponds to a skip connection (add done in forward).
            elif module_type == "shortcut":
                module.append(lambda x: x)
            elif module_type == "yolo":
                mask = list(map(int, x["mask"].split(",")))
                anchors = [int(a) for a in x["anchors"].split(",")]
                anchors = [
                    (anchors[i], anchors[i + 1]) for i in range(0, len(anchors), 2)
                ]
                # Only the anchors selected by this head's mask.
                module.append([anchors[i] for i in mask])
            module_list.append(module)
            if filters is not None:
                prev_filters = filters
            output_filters.append(filters)
        return (net_info, module_list)

    def dump_weights(self):
        """Debug helper: print a sample of each conv layer's weights/biases."""
        for i in range(len(self.module_list)):
            module_type = self.blocks[i + 1]["type"]
            if module_type == "convolutional":
                print(self.blocks[i + 1]["type"], "weights", i)
                model = self.module_list[i]
                conv = model[0]
                print(conv.weight.numpy()[0][0][0])
                if conv.bias is not None:
                    print("biases")
                    print(conv.bias.shape)
                    print(conv.bias.numpy()[0][0:5])
                else:
                    print("None biases for layer", i)

    def load_weights(self, url):
        """Populate conv/batchnorm parameters from a darknet .weights file.

        Per conv block the file holds: [bn bias, bn weight, bn running mean,
        bn running var] when batch-normalized, otherwise [conv bias]; then
        the conv kernel weights — all as flat float32 values.
        """
        # BUGFIX: fetch() returns a pathlib.Path — read the bytes before
        # parsing (np.frombuffer cannot take a Path). The first 5 int32
        # values are the darknet file header; skip them.
        weights = np.frombuffer(fetch(url).read_bytes(), dtype=np.float32)[5:]
        ptr = 0
        for i in range(len(self.module_list)):
            module_type = self.blocks[i + 1]["type"]
            if module_type == "convolutional":
                model = self.module_list[i]
                try:
                    # Batchnorm present: conv has no bias in the file.
                    batch_normalize = int(self.blocks[i + 1]["batch_normalize"])
                except (KeyError, ValueError):
                    # No batchnorm: conv weights + biases are stored.
                    batch_normalize = 0
                conv = model[0]
                if batch_normalize:
                    bn = model[1]
                    # Number of values per batchnorm parameter.
                    num_bn_biases = math.prod(bn.bias.shape)
                    bn_biases = Tensor(weights[ptr : ptr + num_bn_biases])
                    ptr += num_bn_biases
                    bn_weights = Tensor(weights[ptr : ptr + num_bn_biases])
                    ptr += num_bn_biases
                    bn_running_mean = Tensor(weights[ptr : ptr + num_bn_biases])
                    ptr += num_bn_biases
                    bn_running_var = Tensor(weights[ptr : ptr + num_bn_biases])
                    ptr += num_bn_biases
                    # Reshape the flat slices to the model's parameter shapes
                    # and copy them in.
                    bn.bias = bn_biases.reshape(shape=tuple(bn.bias.shape))
                    bn.weight = bn_weights.reshape(shape=tuple(bn.weight.shape))
                    bn.running_mean = bn_running_mean.reshape(
                        shape=tuple(bn.running_mean.shape)
                    )
                    bn.running_var = bn_running_var.reshape(
                        shape=tuple(bn.running_var.shape)
                    )
                else:
                    # Load the biases of the conv layer itself.
                    num_biases = math.prod(conv.bias.shape)
                    conv_biases = Tensor(weights[ptr : ptr + num_biases])
                    ptr += num_biases
                    conv.bias = conv_biases.reshape(shape=tuple(conv.bias.shape))
                # Conv kernel weights follow in both cases.
                num_weights = math.prod(conv.weight.shape)
                conv_weights = Tensor(weights[ptr : ptr + num_weights])
                ptr += num_weights
                conv.weight = conv_weights.reshape(shape=tuple(conv.weight.shape))

    def forward(self, x):
        """Run the network; return the concatenated yolo-head detections."""
        modules = self.blocks[1:]
        outputs = {}  # Cached outputs for route/shortcut layers
        detections, write = None, False
        for i, module in enumerate(modules):
            module_type = module["type"]
            if module_type == "convolutional" or module_type == "upsample":
                for layer in self.module_list[i]:
                    x = layer(x)
            elif module_type == "route":
                layers = [int(a) for a in module["layers"]]
                if layers[0] > 0:
                    layers[0] = layers[0] - i
                if len(layers) == 1:
                    x = outputs[i + layers[0]]
                else:
                    if layers[1] > 0:
                        layers[1] = layers[1] - i
                    map1 = outputs[i + layers[0]]
                    map2 = outputs[i + layers[1]]
                    # Channel-wise concatenation of the two feature maps.
                    x = Tensor(np.concatenate((map1.numpy(), map2.numpy()), axis=1))
            elif module_type == "shortcut":
                from_ = int(module["from"])
                x = outputs[i - 1] + outputs[i + from_]
            elif module_type == "yolo":
                anchors = self.module_list[i][0]
                inp_dim = int(self.net_info["height"])  # network input size, e.g. 608
                num_classes = int(module["classes"])
                x = predict_transform(x, inp_dim, anchors, num_classes)
                if not write:
                    detections, write = x, True
                else:
                    detections = Tensor(
                        np.concatenate((detections.numpy(), x.numpy()), axis=1)
                    )
            outputs[i] = x
        return detections
if __name__ == "__main__":
    # Build the model from the reference yolov3 config.
    # BUGFIX: fetch() returns a pathlib.Path; parse_cfg expects the raw bytes.
    model = Darknet(
        fetch(
            "https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg"
        ).read_bytes()
    )
    print("Loading weights file (237MB). This might take a while…")
    model.load_weights("https://pjreddie.com/media/files/yolov3.weights")
    # Input source: CLI arg (URL, local path, or "webcam"); default sample image.
    if len(sys.argv) > 1:
        url = sys.argv[1]
    else:
        url = "https://github.com/ayooshkathuria/pytorch-yolo-v3/raw/master/dog-cycle-car.png"
    if url == "webcam":
        cap = cv2.VideoCapture(0)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        while 1:
            _ = cap.grab()  # discard one frame to circumvent capture buffering
            ret, frame = cap.read()
            prediction = process_results(infer(model, frame))
            img = Image.fromarray(frame[:, :, [2, 1, 0]])
            boxes = add_boxes(np.array(img.resize((608, 608))), prediction)
            boxes = cv2.cvtColor(boxes, cv2.COLOR_RGB2BGR)
            cv2.imshow("yolo", boxes)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
        cap.release()
        cv2.destroyAllWindows()
        # BUGFIX: stop here — falling through would re-run inference with
        # `img` bound to a PIL Image, which Image.fromarray cannot accept.
        sys.exit(0)
    elif url.startswith("http"):
        # BUGFIX: fetch() returns a pathlib.Path, not bytes — read it first.
        img_stream = io.BytesIO(fetch(url).read_bytes())
        img = cv2.imdecode(np.frombuffer(img_stream.read(), np.uint8), 1)
    else:
        img = cv2.imread(url)
    st = time.time()
    print("running inference…")
    prediction = infer(model, img)
    # BUGFIX: format spec was "2f" (min field width 2) — ".2f" was intended.
    print(f"did inference in {(time.time() - st):.2f}s")
    show_labels(prediction)
    prediction = process_results(prediction)
    boxes = add_boxes(np.array(Image.fromarray(img).resize((608, 608))), prediction)
    cv2.imwrite("boxes.jpg", boxes)