#!/usr/bin/env python3 # # wut-worker-mas.py # # https://spacecruft.org/spacecruft/satnogs-wut # # Distributed Learning from __future__ import absolute_import, division, print_function, unicode_literals from __future__ import print_function import os import json import numpy as np import datetime import tensorflow as tf import tensorflow.python.keras from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D from tensorflow.python.keras import optimizers from tensorflow.python.keras import Sequential from tensorflow.python.keras.layers import Activation, Dropout, Flatten, Dense from tensorflow.python.keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D from tensorflow.python.keras.layers import Input, concatenate from tensorflow.python.keras.models import load_model from tensorflow.python.keras.models import Model from tensorflow.python.keras.preprocessing import image from tensorflow.python.keras.preprocessing.image import img_to_array from tensorflow.python.keras.preprocessing.image import ImageDataGenerator from tensorflow.python.keras.preprocessing.image import load_img tf.keras.backend.clear_session() tf.config.optimizer.set_jit(True) options = tf.data.Options() os.environ["TF_CONFIG"] = json.dumps({ "cluster": { "chief": [ "ml0-int:2222" ], "worker": [ "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222" ] }, "task": {"type": "chief", "index": 0 }, }) print("Tensorflow Version: ", tf.__version__) print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU'))) print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU'))) print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS"))) IMG_HEIGHT = 416 IMG_WIDTH= 804 batch_size = 32 epochs = 4 BUFFER_SIZE = 10000 NUM_WORKERS = 6 GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS # XXX POSITIVE_DIRECTORY = '/home/jebba/devel/spacecruft/satnogs-wut/data/pos' pos_dir = '/home/jebba/devel/spacecruft/satnogs-wut/data/posdir' 
# Collective (ring all-reduce) strategy across the TF_CONFIG cluster. Model
# variables created under strategy.scope() are mirrored on every worker.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.RING)

# Static shape of every decoded image: (height, width, RGB channels).
input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)


def get_bytes_and_label(filepath):
    """Read one file and label it from its own path.

    Args:
        filepath: scalar string tensor holding the path of an image file.

    Returns:
        ``(raw_bytes, label)``: the file contents as a scalar string tensor
        and a scalar bool tensor that is True when ``filepath`` matches the
        regex ``pos_dir + ".+"``, i.e. starts with ``pos_dir``.
    """
    raw_bytes = tf.io.read_file(filepath)
    # Match the sample's own path. Matching the constant POSITIVE_DIRECTORY
    # here would give every file the same label (always False, since
    # ".../data/pos" never fully matches ".../data/posdir.+").
    label = tf.strings.regex_full_match(filepath, pos_dir + ".+")
    return raw_bytes, label


def uncompiled_model():
    """Build the binary image classifier without compiling it.

    Three conv + max-pool stages feed a 512-unit dense layer and a single
    sigmoid unit, whose output is trained against the positive/negative
    label produced by get_bytes_and_label().
    """
    model = Sequential([
        Conv2D(16, 3, padding='same', activation='relu',
               input_shape=input_shape),
        MaxPooling2D(),
        Conv2D(32, 3, padding='same', activation='relu'),
        MaxPooling2D(),
        Conv2D(64, 3, padding='same', activation='relu'),
        MaxPooling2D(),
        Flatten(),
        Dense(512, activation='relu'),
        Dense(1, activation='sigmoid'),
    ])
    return model


def process_image(image_bytes, label):
    """Decode PNG bytes into a float image and cast the label to float32.

    Args:
        image_bytes: scalar string tensor with PNG-encoded data.
        label: scalar bool tensor from get_bytes_and_label().

    Returns:
        ``(image, label)`` where ``image`` is float32 with values in [0, 1]
        and static shape ``input_shape``, and ``label`` is float32 (1.0 for
        positive, 0.0 otherwise).
    """
    # channels=3 forces RGB so the 3-channel static shape set below holds
    # even for grayscale or RGBA files.
    image = tf.io.decode_png(image_bytes, channels=3)
    image.set_shape(input_shape)
    # decode_png yields uint8; the Conv2D stack expects floats. Scale to
    # [0, 1], the same normalisation as the commented-out
    # ImageDataGenerator(rescale=1./255) loader.
    image = tf.image.convert_image_dtype(image, tf.float32)
    # Optional augmentation, currently disabled:
    # image = tf.image.random_flip_left_right(image)
    # image = tf.image.random_flip_up_down(image)
    # image += tf.random.normal(image.shape, mean=0, stddev=0.1)
    return image, tf.cast(label, tf.float32)


def compiled_model():
    """Return uncompiled_model() compiled with Adam and binary cross-entropy."""
    model = uncompiled_model()
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model


AUTOTUNE = tf.data.experimental.AUTOTUNE
# Shuffle-buffer size for the list of files.
NUM_TOTAL_IMAGES = 100
data_root = "/home/jebba/devel/spacecruft/satnogs-wut/data"
profile_dir = os.path.join(data_root, "profiles")
# list_files() needs a glob: given data_root itself it matches only the
# directory, which read_file() cannot read.
# NOTE(review): assumes the PNGs sit one level below data_root
# (e.g. <data_root>/posdir/*.png) -- TODO confirm the on-disk layout.
FILE_PATTERN = os.path.join(data_root, "*", "*.png")

# path -> (bytes, bool label) -> (float image, float label) -> batches
dataset = tf.data.Dataset.list_files(FILE_PATTERN)
dataset = dataset.shuffle(NUM_TOTAL_IMAGES)
dataset = dataset.map(get_bytes_and_label, num_parallel_calls=AUTOTUNE)
dataset = dataset.map(process_image, num_parallel_calls=AUTOTUNE)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)

# Output directory for profiler traces. Tracing is currently disabled:
# tf.summary.trace_on(profiler=True) / tf.summary.trace_export(...).
os.makedirs(profile_dir, exist_ok=True)

with strategy.scope():
    # Build and compile inside the scope so the model's variables are
    # mirrored across the cluster.
    model = compiled_model()
    # NOTE(review): no validation set is wired up yet. The ImageDataGenerator
    # train/val generators are commented out, and the image counts are only
    # computed later in the file.
    model.fit(dataset, epochs=epochs, verbose=2)

# Reference sketch of an equivalent custom training loop. It is not executed,
# because model.fit above already performs distributed training. It needs a
# per-example loss_fn and an optimizer created under strategy.scope():
#
# def step(features, labels):
#     with tf.GradientTape() as tape:
#         logits = model(features, training=True)
#         loss = loss_fn(labels, logits)
#         loss = tf.nn.compute_average_loss(
#             loss, global_batch_size=GLOBAL_BATCH_SIZE)
#     grads = tape.gradient(loss, model.trainable_variables)
#     optimizer.apply_gradients(zip(grads, model.trainable_variables))
#     return loss
#
# @tf.function
# def replicated_step(features, labels):
#     return strategy.experimental_run_v2(step, (features, labels))
#
# for features, labels in strategy.experimental_distribute_dataset(dataset):
#     loss = replicated_step(features, labels)
#
# A hand-written Python generator loader (read from disk, resize, flip,
# normalise, batch) is likewise unnecessary: the tf.data pipeline above
# covers those steps.
def handle_batching(batch=None, size=None):
    """Yield full batches taken from the front of ``batch``.

    Helper for a generator-style loader that appends samples to a list.
    While ``batch`` holds at least ``size`` samples, the first ``size`` are
    yielded as a new list and removed from ``batch`` in place. Any remainder
    stays in ``batch`` for a later call.

    Args:
        batch: list of samples collected so far; mutated in place. ``None``
            (the default) yields nothing.
        size: samples per batch; defaults to the module-level ``batch_size``.

    Raises:
        ValueError: when iterated with a batch and ``size`` < 1. Such a size
            would otherwise loop forever.
    """
    if batch is None:
        return
    if size is None:
        size = batch_size
    if size < 1:
        raise ValueError("size must be >= 1, got {}".format(size))
    # Drain with a loop rather than an exact ==-check, so an overfull list
    # is still emptied into full batches.
    while len(batch) >= size:
        yield batch[:size]
        del batch[:size]


# Per-class image counts for the train/val split.
# NOTE(review): these paths are relative to the current working directory,
# unlike the absolute data_root used by the tf.data pipeline. os.listdir()
# counts every directory entry, not only image files.
train_dir = os.path.join('data/', 'train')
val_dir = os.path.join('data/', 'val')
train_good_dir = os.path.join(train_dir, 'good')
train_bad_dir = os.path.join(train_dir, 'bad')
val_good_dir = os.path.join(val_dir, 'good')
val_bad_dir = os.path.join(val_dir, 'bad')

num_train_good = len(os.listdir(train_good_dir))
num_train_bad = len(os.listdir(train_bad_dir))
num_val_good = len(os.listdir(val_good_dir))
num_val_bad = len(os.listdir(val_bad_dir))

total_train = num_train_good + num_train_bad
total_val = num_val_good + num_val_bad

print('total training good images:', num_train_good)
print('total training bad images:', num_train_bad)
print("--")
print("Total training images:", total_train)
print('total validation good images:', num_val_good)
print('total validation bad images:', num_val_bad)
print("--")
print("Total validation images:", total_val)
print("--")
print("Reduce training and validation set when testing")
# Uncomment to shrink the sets for quick test runs; otherwise the
# "Reduced" lines below just repeat the full totals.
#total_train = 16
#total_val = 16
print("Reduced training images:", total_train)
print("Reduced validation images:", total_val)

# Alternative Keras loader (disabled): ImageDataGenerator-based generators
# reading train_dir / val_dir with pixel values rescaled to [0, 1].
#train_image_generator = ImageDataGenerator(
#    rescale=1./255
#)
#val_image_generator = ImageDataGenerator(
#    rescale=1./255
#)
#train_data_gen = train_image_generator.flow_from_directory(batch_size=batch_size,
#                                                           directory=train_dir,
#                                                           shuffle=True,
#                                                           target_size=(IMG_HEIGHT, IMG_WIDTH),
#                                                           class_mode='binary')
#val_data_gen = val_image_generator.flow_from_directory(batch_size=batch_size,
#                                                       directory=val_dir,
#                                                       target_size=(IMG_HEIGHT, IMG_WIDTH),
#                                                       class_mode='binary')