#!/usr/bin/env python3
#
# wut-worker-mas.py
#
# https://spacecruft.org/spacecruft/satnogs-wut
#
# Distributed Learning
from __future__ import absolute_import, division, print_function, unicode_literals

import os
import json
import datetime

import numpy as np
import tensorflow as tf
from tensorflow.keras import Sequential, optimizers
from tensorflow.keras.layers import (Activation, Conv2D, Convolution2D,
                                     Dense, Dropout, Flatten, Input,
                                     MaxPooling2D, ZeroPadding2D, concatenate)
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import (ImageDataGenerator,
                                                  img_to_array, load_img)

tf.keras.backend.clear_session()
# Enable XLA JIT compilation.
tf.config.optimizer.set_jit(True)
options = tf.data.Options()
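
# TF_CONFIG describes the cluster to tf.distribute: one "chief" coordinator
# plus five workers, all listening on port 2222. This copy of the script
# runs as the chief (task index 0); every other node needs its own TF_CONFIG.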
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "chief": ["ml0-int:2222"],
        "worker": ["ml1-int:2222", "ml2-int:2222", "ml3-int:2222",
                   "ml4-int:2222", "ml5-int:2222"]
    },
    "task": {"type": "chief", "index": 0},
})
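
# On each worker host, a sketch of the corresponding TF_CONFIG: the same
# cluster spec, task type "worker", and the index matching that host's
# position in the worker list (e.g. ml1-int gets index 0):
#os.environ["TF_CONFIG"] = json.dumps({
#    "cluster": {
#        "chief": ["ml0-int:2222"],
#        "worker": ["ml1-int:2222", "ml2-int:2222", "ml3-int:2222",
#                   "ml4-int:2222", "ml5-int:2222"]
#    },
#    "task": {"type": "worker", "index": 0},
#})
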
print("Tensorflow Version: ", tf.__version__)
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU')))
print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS")))

IMG_HEIGHT = 416
IMG_WIDTH = 804
batch_size = 32
epochs = 4
BUFFER_SIZE = 10000
NUM_WORKERS = 6
GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS
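# Each of the NUM_WORKERS nodes takes a per-worker batch of 64, so one
# distributed step consumes 64 * 6 = 384 examples in total.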
# XXX
POSITIVE_DIRECTORY = '/srv/satnogs/data/pos'
pos_dir = '/srv/satnogs/data/posdir'
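
# MultiWorkerMirroredStrategy synchronizes gradients with ring all-reduce
# collectives across the hosts listed in TF_CONFIG.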
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
tf.distribute.experimental.CollectiveCommunication.RING)

def get_bytes_and_label(filepath):
    raw_bytes = tf.io.read_file(filepath)
    # Label a sample positive when its path is under POSITIVE_DIRECTORY.
    label = tf.strings.regex_full_match(
        filepath, POSITIVE_DIRECTORY + ".+")
    return raw_bytes, label

def uncompiled_model():
    # Three conv/pool stages feeding a dense head with a single sigmoid
    # unit for good/bad binary classification.
    model = Sequential([
        Conv2D(16, 3, padding='same', activation='relu',
               input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
        MaxPooling2D(),
        Conv2D(32, 3, padding='same', activation='relu'),
        MaxPooling2D(),
        Conv2D(64, 3, padding='same', activation='relu'),
        MaxPooling2D(),
        Flatten(),
        Dense(512, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    return model

input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)

def process_image(image_bytes, label):
    image = tf.io.decode_png(image_bytes)
    #image = tf.image.resize(image, resolution)
    image.set_shape(input_shape)
    # Optional normalization and augmentation, currently disabled:
    #image = image / 255. - 0.5
    #image = tf.image.random_flip_left_right(image)
    #image = tf.image.random_flip_up_down(image)
    #image += tf.random.normal(
    #    image.shape, mean=0, stddev=0.1)
    return image, tf.cast(label, tf.float32)

AUTOTUNE = tf.data.experimental.AUTOTUNE
NUM_TOTAL_IMAGES = 100
data_root = "/srv/satnogs/data"
profile_dir = os.path.join(data_root, "profiles")
# list_files() takes a glob pattern; matching one directory level under
# data_root is an assumption about the on-disk layout.
dataset = tf.data.Dataset.list_files(data_root + "/*/*")
dataset = dataset.shuffle(NUM_TOTAL_IMAGES)
dataset = dataset.map(get_bytes_and_label, num_parallel_calls=AUTOTUNE)
dataset = dataset.map(process_image, num_parallel_calls=AUTOTUNE)
dataset = dataset.batch(batch_size=batch_size)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)
os.makedirs(profile_dir, exist_ok=True)
# tf.data.Dataset.from_generator
#tf.summary.trace_on(profiler=True)

def compiled_model():
    model = uncompiled_model()
    # Adam + binary cross-entropy pairs with the single sigmoid output.
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

with strategy.scope():
    #model = tf.keras.applications.mobilenet_v2.MobileNetV2(...)
    #optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    #loss_fn = tf.nn.sigmoid_cross_entropy_with_logits
    #model.compile(..., optimizer=optimizer)
    model = compiled_model()
    # Train on the tf.data pipeline built above. The generator-based
    # alternative below needs train_data_gen/val_data_gen and the
    # total_train/total_val counts from the bottom of this file to be
    # defined before it can run.
    model.fit(dataset, epochs=epochs, verbose=2)
    #model.fit(
    #    train_data_gen,
    #    steps_per_epoch=total_train // batch_size,
    #    epochs=epochs,
    #    validation_data=val_data_gen,
    #    validation_steps=total_val // batch_size,
    #    verbose=2
    #)

#tf.summary.trace_export(name="trace-export", profiler_outdir=profile_dir)

with strategy.scope():
    #model, loss_fn, optimizer = ...

    # NOTE: model, loss_fn, and optimizer come from the (commented) setup above.
    def step(features, labels):
        # Per-replica forward and backward pass.
        with tf.GradientTape() as tape:
            logits = model(features, training=True)
            # Average the per-example loss over the global batch so that
            # gradients sum correctly across replicas.
            per_example_loss = loss_fn(labels, logits)
            loss = tf.nn.compute_average_loss(
                per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return loss

    @tf.function
    def replicated_step(features, labels):
        return strategy.experimental_run_v2(step, (features, labels))

    # Shard the input pipeline across the workers and run the custom loop.
    data = strategy.experimental_distribute_dataset(dataset)
    for features, labels in data:
        loss = replicated_step(features, labels)

# Plain-Python batching generator (a sketch; assumes `data` is a list of
# (image_path, label) pairs and that imageio provides imread).
def data_generator():
    from random import shuffle
    from imageio import imread
    batch = []
    shuffle(data)
    for image_path, label in data:
        # Load from disk
        image = imread(image_path)
        # Resize
        # image = resize(image, resolution)
        # Horizontal and vertical flip
        #image = random_flip(image)
        # Normalize and add Gaussian noise
        #image = normalize_and_add_noise(image)
        batch.append((image, label))
        # Emit a full batch, then start a new one.
        if len(batch) == batch_size:
            yield batch
            batch = []

train_dir = os.path.join('/srv/satnogs/data/', 'train')
val_dir = os.path.join('/srv/satnogs/data/', 'val')
train_good_dir = os.path.join(train_dir, 'good')
train_bad_dir = os.path.join(train_dir, 'bad')
val_good_dir = os.path.join(val_dir, 'good')
val_bad_dir = os.path.join(val_dir, 'bad')
num_train_good = len(os.listdir(train_good_dir))
num_train_bad = len(os.listdir(train_bad_dir))
num_val_good = len(os.listdir(val_good_dir))
num_val_bad = len(os.listdir(val_bad_dir))
total_train = num_train_good + num_train_bad
total_val = num_val_good + num_val_bad

print('Total training good images:', num_train_good)
print('Total training bad images:', num_train_bad)
print("--")
print("Total training images:", total_train)
print('Total validation good images:', num_val_good)
print('Total validation bad images:', num_val_bad)
print("--")
print("Total validation images:", total_val)
print("--")
# Uncomment to shrink the training and validation sets when testing:
#total_train = 16
#total_val = 16
print("Training images in use:", total_train)
print("Validation images in use:", total_val)

#train_image_generator = ImageDataGenerator(
# rescale=1./255
#)
#val_image_generator = ImageDataGenerator(
# rescale=1./255
#)
#train_data_gen = train_image_generator.flow_from_directory(
#    batch_size=batch_size,
#    directory=train_dir,
#    shuffle=True,
#    target_size=(IMG_HEIGHT, IMG_WIDTH),
#    class_mode='binary')
#val_data_gen = val_image_generator.flow_from_directory(
#    batch_size=batch_size,
#    directory=val_dir,
#    target_size=(IMG_HEIGHT, IMG_WIDTH),
#    class_mode='binary')
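
# NOTE: the generator-based model.fit() call, commented out in the training
# block above, needs these ImageDataGenerator definitions uncommented and
# evaluated before it can run.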