diff --git a/wut-worker-mas.py b/wut-worker-mas.py
index 9735a7e..45d0dda 100644
--- a/wut-worker-mas.py
+++ b/wut-worker-mas.py
@@ -2,11 +2,30 @@
 #
 # wut-worker-mas.py
 #
+# https://spacecruft.org/spacecruft/satnogs-wut
+#
 # Distributed Learning
 
-import tensorflow as tf
-import json
+from __future__ import absolute_import, division, print_function, unicode_literals
+from __future__ import print_function
 import os
+import json
+import numpy as np
+import datetime
+import tensorflow as tf
+import tensorflow.python.keras
+from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D
+from tensorflow.python.keras import optimizers
+from tensorflow.python.keras import Sequential
+from tensorflow.python.keras.layers import Activation, Dropout, Flatten, Dense
+from tensorflow.python.keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D
+from tensorflow.python.keras.layers import Input, concatenate
+from tensorflow.python.keras.models import load_model
+from tensorflow.python.keras.models import Model
+from tensorflow.python.keras.preprocessing import image
+from tensorflow.python.keras.preprocessing.image import img_to_array
+from tensorflow.python.keras.preprocessing.image import ImageDataGenerator
+from tensorflow.python.keras.preprocessing.image import load_img
 
 os.environ["TF_CONFIG"] = json.dumps({
     "cluster": {
@@ -15,12 +34,49 @@ os.environ["TF_CONFIG"] = json.dumps({
     "task": {"type": "worker", "index": 0},
 })
 
+IMG_HEIGHT = 416
+IMG_WIDTH = 804
+batch_size = 32
+epochs = 4
+
+BUFFER_SIZE = 10000
+NUM_WORKERS = 6
+GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS
+
+POSITIVE_DIRECTORY = '/home/jebba/devel/spacecruft/satnogs-wut/data/pos'
+pos_dir = '/home/jebba/devel/spacecruft/satnogs-wut/data/posdir'
 
 def get_bytes_and_label(filepath):
     raw_bytes = tf.io.read_file(filepath)
     label = tf.strings.regex_full_match(
         POSITIVE_DIRECTORY, pos_dir + ".+")
     return raw_bytes, label
 
+def uncompiled_model():
+    model = Sequential([
+        Conv2D(16, 3, padding='same', activation='relu', input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
+        MaxPooling2D(),
+        Conv2D(32, 3, padding='same', activation='relu'),
+        MaxPooling2D(),
+        Conv2D(64, 3, padding='same', activation='relu'),
+        MaxPooling2D(),
+        Flatten(),
+        Dense(512, activation='relu'),
+        Dense(1, activation='sigmoid')
+    ])
+    return model
+
+input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)
+def process_image(image_bytes, label):
+    image = tf.io.decode_png(image_bytes)
+    #image = tf.image.resize(image, resolution)
+    image.set_shape(input_shape)
+    #image = image / 255. - 0.5
+    #image = tf.image.random_flip_left_right(image)
+    #image = tf.image.random_flip_up_down(image)
+    #image += tf.random.normal(
+    #    image.shape, mean=0, stddev=0.1)
+    return image, tf.cast(label, tf.float32)
+
 AUTOTUNE = tf.data.experimental.AUTOTUNE
 NUM_TOTAL_IMAGES=100
 data_root = "/home/jebba/devel/spacecruft/satnogs-wut/data"
@@ -34,37 +90,53 @@ dataset = dataset.prefetch(buffer_size=AUTOTUNE)
 
 print(tf.__version__)
 print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
-with tf.device("GPU:0"):
-    tf.ones(())  # Make sure we can run on GPU
+print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU')))
+#with tf.device("GPU:0"):
+#    tf.ones(())  # Make sure we can run on GPU
 
 # This ensures that XLA and ptxas work well together, and helps with scaling.
print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS"))) os.makedirs(profile_dir, exist_ok=True) -barf() - # tf.data.Dataset.from_generator tf.config.optimizer.set_jit(True) tf.summary.trace_on(profiler=True) -strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() +strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + tf.distribute.experimental.CollectiveCommunication.RING) + + +def compiled_model(): + model = uncompiled_model() + model.compile(optimizer='adam', + loss='binary_crossentropy', + metrics=['accuracy']) + return model with strategy.scope(): -# model = tf.keras.applications.mobilenet_v2.MobileNetV2(...) - optimizer = tf.keras.optimzers.SGD(learning_rate=0.01) - loss_fn = tf.nn.sigmoid_cross_entropy_with_logits - model.compile(..., optimezer=optimizer) - model.fit(train_dataset, epochs=10) + #model = tf.keras.applications.mobilenet_v2.MobileNetV2(...) + #optimizer = tf.keras.optimzers.SGD(learning_rate=0.01) + #loss_fn = tf.nn.sigmoid_cross_entropy_with_logits + #model.compile(..., optimizer=optimizer) + model = uncompiled_model() + model = compiled_model() + #model.fit(train_dataset, epochs=10) + model.fit( + train_data_gen, + steps_per_epoch=total_train // batch_size, + epochs=epochs, + validation_data=val_data_gen, + validation_steps=total_val // batch_size, + verbose=2 + ) tf.summary.trace_export(name=trace-export,profiler_outdir=logs) -strategy = tf.distribute.MirroredStrategy() with strategy.scope(): - model, loss_fn, optimzer = ... - + #model, loss_fn, optimzer = ... @tf.function def replicated_step(features, labels): return strategy.experimental_run_v2(step, (features, labels)) @@ -77,7 +149,7 @@ with strategy.scope(): optimizer.apply_gradients(zip(grads, model.trainable_variables)) return loss - data = strategey.experimental_distribute_dataset(data) + data = strategy.experimental_distribute_dataset(data) for features, labels in data: loss = replicated_step(features, labels) @@ -89,7 +161,7 @@ def data_generator(): # Load from disk image = imread(image_path) # Resize - image = resize(image, resolution) + # image = resize(image, resolution) # Horizontal and vertical flip #image = random_flip(image) # Normalize and add Gaussian noise @@ -103,16 +175,54 @@ def handle_batching(): yield concat(batch) batch.reset() -def process_image(image_bytes, label): - image = tf.io.decode_png(image_bytes) - image = tf.image.resize(image, resolution) - image.set_shape(input_shape) - image = image / 255. 
+train_dir = os.path.join('data/', 'train')
+val_dir = os.path.join('data/', 'val')
+train_good_dir = os.path.join(train_dir, 'good')
+train_bad_dir = os.path.join(train_dir, 'bad')
+val_good_dir = os.path.join(val_dir, 'good')
+val_bad_dir = os.path.join(val_dir, 'bad')
+num_train_good = len(os.listdir(train_good_dir))
+num_train_bad = len(os.listdir(train_bad_dir))
+num_val_good = len(os.listdir(val_good_dir))
+num_val_bad = len(os.listdir(val_bad_dir))
+total_train = num_train_good + num_train_bad
+total_val = num_val_good + num_val_bad
 
-    #image = tf.image.random_flip_left_right(image)
-    #image = tf.image.random_flip_up_down(image)
-    #image += tf.random.normal(
-    #    image.shape, mean=0, steddev=0.1)
+print('total training good images:', num_train_good)
+print('total training bad images:', num_train_bad)
+print("--")
+print("Total training images:", total_train)
+print('total validation good images:', num_val_good)
+print('total validation bad images:', num_val_bad)
+print("--")
+print("Total validation images:", total_val)
+print("--")
+print("Reduce training and validation set when testing")
+#total_train = 16
+#total_val = 16
+print("Reduced training images:", total_train)
+print("Reduced validation images:", total_val)
 
-    return image, tf.cast(label, tf.float32)
+
+tf.keras.backend.clear_session()
+
+options = tf.data.Options()
+
+
+train_image_generator = ImageDataGenerator(
+    rescale=1./255
+)
+val_image_generator = ImageDataGenerator(
+    rescale=1./255
+)
+
+#train_data_gen = train_image_generator.flow_from_directory(batch_size=batch_size,
+#                                                           directory=train_dir,
+#                                                           shuffle=True,
+#                                                           target_size=(IMG_HEIGHT, IMG_WIDTH),
+#                                                           class_mode='binary')
+#val_data_gen = val_image_generator.flow_from_directory(batch_size=batch_size,
+#                                                       directory=val_dir,
+#                                                       target_size=(IMG_HEIGHT, IMG_WIDTH),
+#                                                       class_mode='binary')
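
Review notes follow. The sketches below are commentary against the post-patch file, not part of the commit.

1. TF_CONFIG hardcodes "task": {"type": "worker", "index": 0}, which is only correct on one machine; with NUM_WORKERS = 6, every worker needs its own index. A minimal sketch, assuming a per-host environment variable (WUT_WORKER_INDEX is an invented name) and placeholder hostnames, since the real worker list falls between these hunks:

    import json
    import os

    # Hypothetical worker list; the actual hosts are elided by the diff.
    WORKERS = ["ml0:2222", "ml1:2222", "ml2:2222",
               "ml3:2222", "ml4:2222", "ml5:2222"]

    os.environ["TF_CONFIG"] = json.dumps({
        "cluster": {"worker": WORKERS},
        # Set WUT_WORKER_INDEX=0..5 on each machine (assumed convention).
        "task": {"type": "worker",
                 "index": int(os.getenv("WUT_WORKER_INDEX", "0"))},
    })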
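
2. get_bytes_and_label() passes the constant POSITIVE_DIRECTORY to tf.strings.regex_full_match() instead of its filepath argument, so every sample gets the same label. The dataset construction is also mostly outside these hunks. A sketch of how the two map stages presumably chain, using only names the patch defines (the glob pattern is a guess):

    dataset = tf.data.Dataset.list_files(data_root + "/*/*.png")
    dataset = dataset.shuffle(BUFFER_SIZE)
    # Inside get_bytes_and_label, the label should come from the path itself:
    #   label = tf.strings.regex_full_match(filepath, pos_dir + ".+")
    dataset = dataset.map(get_bytes_and_label, num_parallel_calls=AUTOTUNE)
    dataset = dataset.map(process_image, num_parallel_calls=AUTOTUNE)
    dataset = dataset.batch(GLOBAL_BATCH_SIZE)
    dataset = dataset.prefetch(buffer_size=AUTOTUNE)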
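
3. model.fit() inside the first strategy.scope() references train_data_gen, val_data_gen, total_train, and total_val, but the flow_from_directory() calls stay commented out and the directory-counting block sits below the training code, so the script fails with NameError. The generators need roughly this, moved above the strategy block (arguments taken from the commented-out lines):

    train_data_gen = train_image_generator.flow_from_directory(
        batch_size=batch_size,
        directory=train_dir,
        shuffle=True,
        target_size=(IMG_HEIGHT, IMG_WIDTH),
        class_mode='binary')
    val_data_gen = val_image_generator.flow_from_directory(
        batch_size=batch_size,
        directory=val_dir,
        target_size=(IMG_HEIGHT, IMG_WIDTH),
        class_mode='binary')

Relatedly, the model = uncompiled_model() assignment immediately overwritten by model = compiled_model() is dead and could be dropped.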
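
4. The second strategy.scope() block references step, model, optimizer, and data that are never bound, so it cannot run as committed. A minimal sketch of the missing pieces, assuming the TF 2.1-era experimental_run_v2 API the patch already uses and the (image, label) dataset from note 2:

    with strategy.scope():
        model = compiled_model()
        optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
        loss_obj = tf.keras.losses.BinaryCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE)

        def step(features, labels):
            with tf.GradientTape() as tape:
                predictions = model(features, training=True)
                # Scale per-example losses by the global batch size.
                loss = tf.reduce_sum(loss_obj(labels, predictions)) / GLOBAL_BATCH_SIZE
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            return loss

        @tf.function
        def replicated_step(features, labels):
            per_replica_loss = strategy.experimental_run_v2(step, (features, labels))
            return strategy.reduce(tf.distribute.ReduceOp.SUM,
                                   per_replica_loss, axis=None)

        data = strategy.experimental_distribute_dataset(dataset)
        for features, labels in data:
            loss = replicated_step(features, labels)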
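
5. Two context lines the patch leaves untouched are still broken: os.makedirs(profile_dir, exist_ok=True) uses an undefined profile_dir, and tf.summary.trace_export(name=trace-export,profiler_outdir=logs) passes bare identifiers, so name parses as the subtraction trace - export. Presumably something like the following was intended (both paths are assumptions):

    profile_dir = os.path.join(data_root, "profile")  # assumed location
    logs = os.path.join(data_root, "logs")            # assumed location
    os.makedirs(profile_dir, exist_ok=True)
    tf.summary.trace_export(name="trace-export", profiler_outdir=logs)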