setup wut-worker-mas

master
ml server 2020-01-20 10:08:37 -07:00
parent 051346789f
commit c6aa78fa0e
1 changed files with 137 additions and 27 deletions

View File

@ -2,11 +2,30 @@
# #
# wut-worker-mas.py # wut-worker-mas.py
# #
# https://spacecruft.org/spacecruft/satnogs-wut
#
# Distributed Learning # Distributed Learning
import tensorflow as tf from __future__ import absolute_import, division, print_function, unicode_literals
import json from __future__ import print_function
import os import os
import json
import numpy as np
import datetime
import tensorflow as tf
import tensorflow.python.keras
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D
from tensorflow.python.keras import optimizers
from tensorflow.python.keras import Sequential
from tensorflow.python.keras.layers import Activation, Dropout, Flatten, Dense
from tensorflow.python.keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D
from tensorflow.python.keras.layers import Input, concatenate
from tensorflow.python.keras.models import load_model
from tensorflow.python.keras.models import Model
from tensorflow.python.keras.preprocessing import image
from tensorflow.python.keras.preprocessing.image import img_to_array
from tensorflow.python.keras.preprocessing.image import ImageDataGenerator
from tensorflow.python.keras.preprocessing.image import load_img
os.environ["TF_CONFIG"] = json.dumps({ os.environ["TF_CONFIG"] = json.dumps({
"cluster": { "cluster": {
@ -15,12 +34,49 @@ os.environ["TF_CONFIG"] = json.dumps({
"task": {"type": "worker", "index": 0 }, "task": {"type": "worker", "index": 0 },
}) })
# Image dimensions; combined with 3 channels to form the model input shape.
IMG_HEIGHT = 416
IMG_WIDTH= 804
# Batch size and epoch count used by the model.fit(...) call in this script.
batch_size = 32
epochs = 4
# NOTE(review): BUFFER_SIZE, NUM_WORKERS and GLOBAL_BATCH_SIZE are not referenced
# in the visible part of the script -- confirm whether they are still needed.
BUFFER_SIZE = 10000
NUM_WORKERS = 6
GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS
# Both paths are passed to tf.strings.regex_full_match in get_bytes_and_label.
# NOTE(review): the two values differ ('.../data/pos' vs '.../data/posdir') --
# confirm which directory is intended.
POSITIVE_DIRECTORY = '/home/jebba/devel/spacecruft/satnogs-wut/data/pos'
pos_dir = '/home/jebba/devel/spacecruft/satnogs-wut/data/posdir'
def get_bytes_and_label(filepath): def get_bytes_and_label(filepath):
raw_bytes = tf.io.read_file(filepath) raw_bytes = tf.io.read_file(filepath)
label = tf.strings.regex_full_match( label = tf.strings.regex_full_match(
POSITIVE_DIRECTORY, pos_dir + ".+") POSITIVE_DIRECTORY, pos_dir + ".+")
return raw_bytes, label return raw_bytes, label
def uncompiled_model():
    """Build the convolutional network without compiling it.

    The stack is three Conv2D + MaxPooling2D stages (16, 32 and 64 filters,
    3x3 kernels, 'same' padding, ReLU), followed by Flatten, a 512-unit ReLU
    Dense layer and a single sigmoid output unit. The first layer declares
    an input shape of (IMG_HEIGHT, IMG_WIDTH, 3).

    Returns:
        The Sequential model; the caller is expected to compile it.
    """
    # First stage carries the input shape declaration.
    layers = [
        Conv2D(16, 3, padding='same', activation='relu',
               input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
        MaxPooling2D(),
    ]
    # Remaining convolutional stages differ only in filter count.
    for filters in (32, 64):
        layers.append(Conv2D(filters, 3, padding='same', activation='relu'))
        layers.append(MaxPooling2D())
    # Classifier head.
    layers.append(Flatten())
    layers.append(Dense(512, activation='relu'))
    layers.append(Dense(1, activation='sigmoid'))
    return Sequential(layers)
input_shape=(IMG_HEIGHT, IMG_WIDTH ,3)
def process_image(image_bytes, label):
    """Decode PNG bytes into an image tensor and cast the label to float32.

    Args:
        image_bytes: Encoded PNG contents, handed to tf.io.decode_png.
        label: Label value; cast to tf.float32 before being returned.

    Returns:
        A (image, label) tuple: the decoded image with its static shape set
        to the module-level input_shape, and the float32 label.
    """
    decoded = tf.io.decode_png(image_bytes)
    # Pin the static shape so downstream layers see fixed dimensions.
    decoded.set_shape(input_shape)
    # Preprocessing/augmentation steps that are currently disabled:
    #   decoded = tf.image.resize(decoded, resolution)
    #   decoded = decoded / 255. - 0.5
    #   decoded = tf.image.random_flip_left_right(decoded)
    #   decoded = tf.image.random_flip_up_down(decoded)
    #   decoded += tf.random.normal(decoded.shape, mean=0, stddev=0.1)
    float_label = tf.cast(label, tf.float32)
    return decoded, float_label
AUTOTUNE = tf.data.experimental.AUTOTUNE AUTOTUNE = tf.data.experimental.AUTOTUNE
NUM_TOTAL_IMAGES=100 NUM_TOTAL_IMAGES=100
data_root = "/home/jebba/devel/spacecruft/satnogs-wut/data" data_root = "/home/jebba/devel/spacecruft/satnogs-wut/data"
@ -34,37 +90,53 @@ dataset = dataset.prefetch(buffer_size=AUTOTUNE)
print(tf.__version__) print(tf.__version__)
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU'))) print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
with tf.device("GPU:0"): print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU')))
tf.ones(()) # Make sure we can run on GPU #with tf.device("GPU:0"):
# tf.ones(()) # Make sure we can run on GPU
# This ensures that XLA and ptxas work well together, and helps with scaling. # This ensures that XLA and ptxas work well together, and helps with scaling.
print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS"))) print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS")))
os.makedirs(profile_dir, exist_ok=True) os.makedirs(profile_dir, exist_ok=True)
barf()
# tf.data.Dataset.from_generator # tf.data.Dataset.from_generator
tf.config.optimizer.set_jit(True) tf.config.optimizer.set_jit(True)
tf.summary.trace_on(profiler=True) tf.summary.trace_on(profiler=True)
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
tf.distribute.experimental.CollectiveCommunication.RING)
def compiled_model():
    """Return the network from uncompiled_model(), compiled for training.

    Compilation uses the 'adam' optimizer, 'binary_crossentropy' loss and
    the 'accuracy' metric.
    """
    compile_kwargs = {
        'optimizer': 'adam',
        'loss': 'binary_crossentropy',
        'metrics': ['accuracy'],
    }
    net = uncompiled_model()
    net.compile(**compile_kwargs)
    return net
with strategy.scope(): with strategy.scope():
# model = tf.keras.applications.mobilenet_v2.MobileNetV2(...) #model = tf.keras.applications.mobilenet_v2.MobileNetV2(...)
optimizer = tf.keras.optimzers.SGD(learning_rate=0.01) #optimizer = tf.keras.optimzers.SGD(learning_rate=0.01)
loss_fn = tf.nn.sigmoid_cross_entropy_with_logits #loss_fn = tf.nn.sigmoid_cross_entropy_with_logits
model.compile(..., optimezer=optimizer) #model.compile(..., optimizer=optimizer)
model.fit(train_dataset, epochs=10) model = uncompiled_model()
model = compiled_model()
#model.fit(train_dataset, epochs=10)
model.fit(
train_data_gen,
steps_per_epoch=total_train // batch_size,
epochs=epochs,
validation_data=val_data_gen,
validation_steps=total_val // batch_size,
verbose=2
)
tf.summary.trace_export(name=trace-export,profiler_outdir=logs) tf.summary.trace_export(name=trace-export,profiler_outdir=logs)
strategy = tf.distribute.MirroredStrategy()
with strategy.scope(): with strategy.scope():
model, loss_fn, optimzer = ... #model, loss_fn, optimzer = ...
@tf.function @tf.function
def replicated_step(features, labels): def replicated_step(features, labels):
return strategy.experimental_run_v2(step, (features, labels)) return strategy.experimental_run_v2(step, (features, labels))
@ -77,7 +149,7 @@ with strategy.scope():
optimizer.apply_gradients(zip(grads, model.trainable_variables)) optimizer.apply_gradients(zip(grads, model.trainable_variables))
return loss return loss
data = strategey.experimental_distribute_dataset(data) data = strategy.experimental_distribute_dataset(data)
for features, labels in data: for features, labels in data:
loss = replicated_step(features, labels) loss = replicated_step(features, labels)
@ -89,7 +161,7 @@ def data_generator():
# Load from disk # Load from disk
image = imread(image_path) image = imread(image_path)
# Resize # Resize
image = resize(image, resolution) # image = resize(image, resolution)
# Horizontal and vertical flip # Horizontal and vertical flip
#image = random_flip(image) #image = random_flip(image)
# Normalize and add Gaussian noise # Normalize and add Gaussian noise
@ -103,16 +175,54 @@ def handle_batching():
yield concat(batch) yield concat(batch)
batch.reset() batch.reset()
def process_image(image_bytes, label): train_dir = os.path.join('data/', 'train')
image = tf.io.decode_png(image_bytes) val_dir = os.path.join('data/', 'val')
image = tf.image.resize(image, resolution) train_good_dir = os.path.join(train_dir, 'good')
image.set_shape(input_shape) train_bad_dir = os.path.join(train_dir, 'bad')
image = image / 255. - 0.5 val_good_dir = os.path.join(val_dir, 'good')
val_bad_dir = os.path.join(val_dir, 'bad')
# Count directory entries per class. os.listdir returns every entry in the
# directory, so anything that is not an image is counted too.
num_train_good = len(os.listdir(train_good_dir))
num_train_bad = len(os.listdir(train_bad_dir))
num_val_good = len(os.listdir(val_good_dir))
num_val_bad = len(os.listdir(val_bad_dir))
# Totals feed steps_per_epoch / validation_steps in the model.fit(...) call.
total_train = num_train_good + num_train_bad
total_val = num_val_good + num_val_bad
#image = tf.image.random_flip_left_right(image) print('total training good images:', num_train_good)
#image = tf.image.random_flip_up_down(image) print('total training bad images:', num_train_bad)
#image += tf.random.normal( print("--")
# image.shape, mean=0, steddev=0.1) print("Total training images:", total_train)
# Report validation set sizes.
print('total validation good images:', num_val_good)
print('total validation bad images:', num_val_bad)
print("--")
print("Total validation images:", total_val)
print("--")
print("Reduce training and validation set when testing")
# Uncomment to shrink the dataset for a quick test run. While these stay
# commented out, the "Reduced" prints below show the full, unreduced totals.
#total_train = 16
#total_val = 16
print("Reduced training images:", total_train)
print("Reduced validation images:", total_val)
return image, tf.cast(label, tf.float32)
# Reset Keras global state before building the generators.
tf.keras.backend.clear_session()
# NOTE(review): `options` is not used in the visible part of the script.
options = tf.data.Options()
# Generators for training and validation images; both are created with a
# rescale factor of 1/255.
train_image_generator = ImageDataGenerator(
rescale=1./255
)
val_image_generator = ImageDataGenerator(
rescale=1./255
)
# NOTE(review): model.fit(...) in this script references train_data_gen and
# val_data_gen, but their only visible definitions are the commented-out
# flow_from_directory calls below -- confirm they are defined elsewhere or
# re-enable these.
#train_data_gen = train_image_generator.flow_from_directory(batch_size=batch_size,
# directory=train_dir,
# shuffle=True,
# target_size=(IMG_HEIGHT, IMG_WIDTH),
# class_mode='binary')
#val_data_gen = val_image_generator.flow_from_directory(batch_size=batch_size,
# directory=val_dir,
# target_size=(IMG_HEIGHT, IMG_WIDTH),
# class_mode='binary')