From cc31af10627b13726f268f75b535c72699fc2c70 Mon Sep 17 00:00:00 2001 From: ml server Date: Sat, 18 Jan 2020 19:25:23 -0700 Subject: [PATCH 1/8] Distribution is working, then breaks... --- jupyter/wut-train-cluster-fn.ipynb | 148 +++++++---------------------- 1 file changed, 34 insertions(+), 114 deletions(-) diff --git a/jupyter/wut-train-cluster-fn.ipynb b/jupyter/wut-train-cluster-fn.ipynb index cc8046e..2ad5653 100644 --- a/jupyter/wut-train-cluster-fn.ipynb +++ b/jupyter/wut-train-cluster-fn.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -17,7 +17,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -31,7 +31,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -53,7 +53,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -68,24 +68,16 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "tf 2.1.0\n" - ] - } - ], + "outputs": [], "source": [ "print('tf {}'.format(tf.__version__))" ] }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -100,7 +92,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -118,7 +110,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -127,7 +119,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -137,19 +129,9 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "INFO:tensorflow:Enabled multi-worker collective ops with available devices: ['/job:worker/replica:0/task:0/device:CPU:0', '/job:worker/replica:0/task:0/device:XLA_CPU:0']\n", - "INFO:tensorflow:Using MirroredStrategy with devices ('/job:worker/task:0',)\n", - "INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'worker': ['10.100.100.130:2222', 'ml1:2222', 'ml2:2222', 'ml3:2222', 'ml4:2222', 'ml5:2222']}, task_type = 'worker', task_id = 0, num_workers = 6, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.RING\n" - ] - } - ], + "outputs": [], "source": [ "strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(\n", " tf.distribute.experimental.CollectiveCommunication.RING)\n", @@ -168,7 +150,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -188,24 +170,9 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "total training good images: 3291\n", - "total training bad images: 609\n", - "--\n", - "Total training images: 3900\n", - "total validation good images: 3361\n", - "total validation bad images: 601\n", - "--\n", - "Total validation images: 3962\n" - ] - } - ], + "outputs": [], "source": [ "print('total training good images:', num_train_good)\n", "print('total training bad images:', 
num_train_bad)\n", @@ -219,20 +186,9 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "--\n", - "Reduce training and validation set when testing\n", - "Reduced training images: 3900\n", - "Reduced validation images: 3962\n" - ] - } - ], + "outputs": [], "source": [ "print(\"--\")\n", "print(\"Reduce training and validation set when testing\")\n", @@ -244,18 +200,9 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Found 3900 images belonging to 2 classes.\n", - "Found 3962 images belonging to 2 classes.\n" - ] - } - ], + "outputs": [], "source": [ "train_image_generator = ImageDataGenerator(\n", " rescale=1./255\n", @@ -276,7 +223,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -298,7 +245,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -313,7 +260,7 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -331,7 +278,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -340,7 +287,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -361,7 +308,7 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -370,7 +317,7 @@ }, { "cell_type": "code", - "execution_count": 21, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -384,7 +331,7 @@ }, { "cell_type": "code", - "execution_count": 22, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -393,7 +340,7 @@ }, { "cell_type": "code", - "execution_count": 23, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -403,7 +350,7 @@ }, { "cell_type": "code", - "execution_count": 24, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -422,7 +369,7 @@ }, { "cell_type": "code", - "execution_count": 25, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -432,7 +379,7 @@ }, { "cell_type": "code", - "execution_count": 26, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -442,7 +389,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -452,7 +399,7 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -468,34 +415,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_workers = 6, communication_hint = RING\n", - "INFO:tensorflow:Collective batch_all_reduce: 1 all-reduces, num_workers = 6, communication_hint = RING\n", - "INFO:tensorflow:Running Distribute Coordinator with mode = 'independent_worker', cluster_spec = {'worker': ['10.100.100.130:2222', 'ml1:2222', 'ml2:2222', 'ml3:2222', 'ml4:2222', 'ml5:2222']}, task_type = 'worker', task_id = 0, environment = None, rpc_layer = 'grpc'\n", - "WARNING:tensorflow:`eval_fn` is not passed 
in. The `worker_fn` will be used if an \"evaluator\" task exists in the cluster.\n", - "WARNING:tensorflow:`eval_strategy` is not passed in. No distribution strategy will be used for evaluation.\n", - "INFO:tensorflow:Using MirroredStrategy with devices ('/job:worker/task:0',)\n", - "INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'worker': ['10.100.100.130:2222', 'ml1:2222', 'ml2:2222', 'ml3:2222', 'ml4:2222', 'ml5:2222']}, task_type = 'worker', task_id = 0, num_workers = 6, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.RING\n", - "INFO:tensorflow:Using MirroredStrategy with devices ('/job:worker/task:0',)\n", - "INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'worker': ['10.100.100.130:2222', 'ml1:2222', 'ml2:2222', 'ml3:2222', 'ml4:2222', 'ml5:2222']}, task_type = 'worker', task_id = 0, num_workers = 6, local_devices = ('/job:worker/task:0',), communication = CollectiveCommunication.RING\n", - "WARNING:tensorflow:ModelCheckpoint callback is not provided. Workers will need to restart training if any fails.\n", - "WARNING:tensorflow:sample_weight modes were coerced from\n", - " ...\n", - " to \n", - " ['...']\n", - "WARNING:tensorflow:sample_weight modes were coerced from\n", - " ...\n", - " to \n", - " ['...']\n", - "Train for 121 steps, validate for 123 steps\n", - "Epoch 1/4\n" - ] - } - ], + "outputs": [], "source": [ "with strategy.scope():\n", " model = get_compiled_model()\n", From 68b2fc2730c4e6ed200355820fbd337c5c9c9c4f Mon Sep 17 00:00:00 2001 From: ml server Date: Sat, 18 Jan 2020 21:19:56 -0700 Subject: [PATCH 2/8] still meh fit() dist --- jupyter/wut-train-cluster-fn.ipynb | 62 ++++++++++++++++++++++++------ wut-train-cluster-fn.py | 31 +++------------ 2 files changed, 57 insertions(+), 36 deletions(-) diff --git a/jupyter/wut-train-cluster-fn.ipynb b/jupyter/wut-train-cluster-fn.ipynb index 2ad5653..7f3296a 100644 --- a/jupyter/wut-train-cluster-fn.ipynb +++ b/jupyter/wut-train-cluster-fn.ipynb @@ -117,6 +117,15 @@ "tf.keras.backend.clear_session()" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "options = tf.data.Options()" + ] + }, { "cell_type": "code", "execution_count": null, @@ -210,17 +219,29 @@ "val_image_generator = ImageDataGenerator(\n", " rescale=1./255\n", ")\n", - "train_data_gen = train_image_generator.flow_from_directory(batch_size=batch_size,\n", + "#train_data_gen = train_image_generator.flow_from_directory(batch_size=batch_size,\n", + "train_data_gen = train_image_generator.flow_from_directory(batch_size=GLOBAL_BATCH_SIZE,\n", " directory=train_dir,\n", " shuffle=True,\n", " target_size=(IMG_HEIGHT, IMG_WIDTH),\n", " class_mode='binary')\n", - "val_data_gen = val_image_generator.flow_from_directory(batch_size=batch_size,\n", + "#val_data_gen = val_image_generator.flow_from_directory(batch_size=batch_size,\n", + "val_data_gen = val_image_generator.flow_from_directory(batch_size=GLOBAL_BATCH_SIZE,\n", " directory=val_dir,\n", " target_size=(IMG_HEIGHT, IMG_WIDTH),\n", " class_mode='binary')" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#train_dist_dataset = strategy.experimental_distribute_dataset()\n", + "#val_dist_dataset = strategy.experimental_distribute_dataset()" + ] + }, { "cell_type": "code", "execution_count": null, @@ -264,16 +285,24 @@ "metadata": {}, "outputs": [], "source": [ - "#strategy.num_replicas_in_sync\n", + "strategy.num_replicas_in_sync" + ] 
+ }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ "## Compute global batch size using number of replicas.\n", - "#BATCH_SIZE_PER_REPLICA = 5\n", - "#print(BATCH_SIZE_PER_REPLICA)\n", - "#global_batch_size = (BATCH_SIZE_PER_REPLICA *\n", - "# strategy.num_replicas_in_sync)\n", - "#print(global_batch_size)\n", - "#dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)\n", - "#dataset = dataset.batch(global_batch_size)\n", - "#LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}" + "BATCH_SIZE_PER_REPLICA = 5\n", + "print(BATCH_SIZE_PER_REPLICA)\n", + "global_batch_size = (BATCH_SIZE_PER_REPLICA *\n", + " strategy.num_replicas_in_sync)\n", + "print(global_batch_size)\n", + "dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)\n", + "dataset = dataset.batch(global_batch_size)\n", + "LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}" ] }, { @@ -338,6 +367,17 @@ "#model = get_compiled_model()" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create a checkpoint directory to store the checkpoints.\n", + "checkpoint_dir = './training_checkpoints'\n", + "checkpoint_prefix = os.path.join(checkpoint_dir, \"ckpt\")" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/wut-train-cluster-fn.py b/wut-train-cluster-fn.py index bea3a56..db37826 100644 --- a/wut-train-cluster-fn.py +++ b/wut-train-cluster-fn.py @@ -41,9 +41,15 @@ IMG_WIDTH= 804 batch_size = 32 epochs = 4 +BUFFER_SIZE = 10000 +NUM_WORKERS = 6 +GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS + # XXX #tf.keras.backend.clear_session() +options = tf.data.Options() + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( tf.distribute.experimental.CollectiveCommunication.RING) @@ -112,31 +118,6 @@ def get_compiled_model(): metrics=['accuracy']) return model -#def get_fit_model(): -# model = get_compiled_model() -# model.fit( -# train_data_gen, -# steps_per_epoch=total_train // batch_size, -# epochs=epochs, -# validation_data=val_data_gen, -# validation_steps=total_val // batch_size, -# verbose=2 -# ) -# return model - -#with strategy.scope(): -# get_uncompiled_model() -#with strategy.scope(): -# get_compiled_model() -#with strategy.scope(): -# get_fit_model() - -#multi_worker_model = get_compiled_model() -#multi_worker_model.fit( -# x=train_data_gen, -# epochs=epochs, -# steps_per_epoch=total_train // batch_size -# ) with strategy.scope(): model = get_compiled_model() From b3273782b42fe53c0a8e201c54c44e46e282ae33 Mon Sep 17 00:00:00 2001 From: ml server Date: Sun, 19 Jan 2020 22:46:48 -0700 Subject: [PATCH 3/8] new python worker --- wut-worker-mas.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 wut-worker-mas.py diff --git a/wut-worker-mas.py b/wut-worker-mas.py new file mode 100644 index 0000000..066c83e --- /dev/null +++ b/wut-worker-mas.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python3 + + From 051346789faea3d9f750cd7e4530a70bf5262f86 Mon Sep 17 00:00:00 2001 From: ml server Date: Mon, 20 Jan 2020 09:33:46 -0700 Subject: [PATCH 4/8] mas --- wut-worker-mas.py | 115 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/wut-worker-mas.py b/wut-worker-mas.py index 066c83e..9735a7e 100644 --- a/wut-worker-mas.py +++ b/wut-worker-mas.py @@ -1,3 +1,118 @@ #!/usr/bin/env python3 +# +# wut-worker-mas.py +# +# Distributed Learning +import tensorflow as tf +import json +import os + +os.environ["TF_CONFIG"] = json.dumps({ + "cluster": { + "worker": [ 
"ml0-int:2222", "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222" ] + }, + "task": {"type": "worker", "index": 0 }, +}) + +def get_bytes_and_label(filepath): + raw_bytes = tf.io.read_file(filepath) + label = tf.strings.regex_full_match( + POSITIVE_DIRECTORY, pos_dir + ".+") + return raw_bytes, label + +AUTOTUNE = tf.data.experimental.AUTOTUNE +NUM_TOTAL_IMAGES=100 +data_root = "/home/jebba/devel/spacecruft/satnogs-wut/data" +profile_dir = os.path.join(data_root, "profiles") +dataset = tf.data.Dataset.list_files(data_root) +dataset = dataset.shuffle(NUM_TOTAL_IMAGES) +dataset = dataset.map(get_bytes_and_label, num_parallel_calls=AUTOTUNE) +dataset = dataset.map(process_image, num_parallel_calls=AUTOTUNE) +dataset = dataset.batch(batch_size=32) +dataset = dataset.prefetch(buffer_size=AUTOTUNE) + +print(tf.__version__) +print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU'))) +with tf.device("GPU:0"): + tf.ones(()) # Make sure we can run on GPU + +# This ensures that XLA and ptxas work well together, and helps with scaling. +print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS"))) + +os.makedirs(profile_dir, exist_ok=True) + +barf() + +# tf.data.Dataset.from_generator + +tf.config.optimizer.set_jit(True) + +tf.summary.trace_on(profiler=True) + +strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() + +with strategy.scope(): +# model = tf.keras.applications.mobilenet_v2.MobileNetV2(...) + optimizer = tf.keras.optimzers.SGD(learning_rate=0.01) + loss_fn = tf.nn.sigmoid_cross_entropy_with_logits + model.compile(..., optimezer=optimizer) + model.fit(train_dataset, epochs=10) + +tf.summary.trace_export(name=trace-export,profiler_outdir=logs) + +strategy = tf.distribute.MirroredStrategy() +with strategy.scope(): + model, loss_fn, optimzer = ... + + @tf.function + def replicated_step(features, labels): + return strategy.experimental_run_v2(step, (features, labels)) + with tf.GradientTape() as tape: + logits = model(features, training=True) + loss = tf.nn.compute_average_loss( + loss, global_batch_size=global_batch_size) + + grads = tape.gradient(loss, model.trainable_variables) + optimizer.apply_gradients(zip(grads, model.trainable_variables)) + return loss + + data = strategey.experimental_distribute_dataset(data) + + for features, labels in data: + loss = replicated_step(features, labels) + +def data_generator(): + batch = [] + shuffle(data) + for image_path, label in data: + # Load from disk + image = imread(image_path) + # Resize + image = resize(image, resolution) + # Horizontal and vertical flip + #image = random_flip(image) + # Normalize and add Gaussian noise + #image = normalize_and_add_noise(image) + batch.append((image, label)) + handle_batching + +# XXX ? +def handle_batching(): + if len(batch) == batch_size: + yield concat(batch) + batch.reset() + +def process_image(image_bytes, label): + image = tf.io.decode_png(image_bytes) + image = tf.image.resize(image, resolution) + image.set_shape(input_shape) + image = image / 255. 
- 0.5 + + #image = tf.image.random_flip_left_right(image) + #image = tf.image.random_flip_up_down(image) + #image += tf.random.normal( + # image.shape, mean=0, steddev=0.1) + + return image, tf.cast(label, tf.float32) From c6aa78fa0eb254152b786d42033dc680cd614956 Mon Sep 17 00:00:00 2001 From: ml server Date: Mon, 20 Jan 2020 10:08:37 -0700 Subject: [PATCH 5/8] setup wut-worker-mas --- wut-worker-mas.py | 164 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 137 insertions(+), 27 deletions(-) diff --git a/wut-worker-mas.py b/wut-worker-mas.py index 9735a7e..45d0dda 100644 --- a/wut-worker-mas.py +++ b/wut-worker-mas.py @@ -2,11 +2,30 @@ # # wut-worker-mas.py # +# https://spacecruft.org/spacecruft/satnogs-wut +# # Distributed Learning -import tensorflow as tf -import json +from __future__ import absolute_import, division, print_function, unicode_literals +from __future__ import print_function import os +import json +import numpy as np +import datetime +import tensorflow as tf +import tensorflow.python.keras +from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D +from tensorflow.python.keras import optimizers +from tensorflow.python.keras import Sequential +from tensorflow.python.keras.layers import Activation, Dropout, Flatten, Dense +from tensorflow.python.keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D +from tensorflow.python.keras.layers import Input, concatenate +from tensorflow.python.keras.models import load_model +from tensorflow.python.keras.models import Model +from tensorflow.python.keras.preprocessing import image +from tensorflow.python.keras.preprocessing.image import img_to_array +from tensorflow.python.keras.preprocessing.image import ImageDataGenerator +from tensorflow.python.keras.preprocessing.image import load_img os.environ["TF_CONFIG"] = json.dumps({ "cluster": { @@ -15,12 +34,49 @@ os.environ["TF_CONFIG"] = json.dumps({ "task": {"type": "worker", "index": 0 }, }) +IMG_HEIGHT = 416 +IMG_WIDTH= 804 +batch_size = 32 +epochs = 4 + +BUFFER_SIZE = 10000 +NUM_WORKERS = 6 +GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS + +POSITIVE_DIRECTORY = '/home/jebba/devel/spacecruft/satnogs-wut/data/pos' +pos_dir = '/home/jebba/devel/spacecruft/satnogs-wut/data/posdir' def get_bytes_and_label(filepath): raw_bytes = tf.io.read_file(filepath) label = tf.strings.regex_full_match( POSITIVE_DIRECTORY, pos_dir + ".+") return raw_bytes, label +def uncompiled_model(): + model = Sequential([ + Conv2D(16, 3, padding='same', activation='relu', input_shape=(IMG_HEIGHT, IMG_WIDTH ,3)), + MaxPooling2D(), + Conv2D(32, 3, padding='same', activation='relu'), + MaxPooling2D(), + Conv2D(64, 3, padding='same', activation='relu'), + MaxPooling2D(), + Flatten(), + Dense(512, activation='relu'), + Dense(1, activation='sigmoid') + ]) + return model + +input_shape=(IMG_HEIGHT, IMG_WIDTH ,3) +def process_image(image_bytes, label): + image = tf.io.decode_png(image_bytes) + #image = tf.image.resize(image, resolution) + image.set_shape(input_shape) + #image = image / 255. 
- 0.5 + #image = tf.image.random_flip_left_right(image) + #image = tf.image.random_flip_up_down(image) + #image += tf.random.normal( + # image.shape, mean=0, steddev=0.1) + return image, tf.cast(label, tf.float32) + AUTOTUNE = tf.data.experimental.AUTOTUNE NUM_TOTAL_IMAGES=100 data_root = "/home/jebba/devel/spacecruft/satnogs-wut/data" @@ -34,37 +90,53 @@ dataset = dataset.prefetch(buffer_size=AUTOTUNE) print(tf.__version__) print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU'))) -with tf.device("GPU:0"): - tf.ones(()) # Make sure we can run on GPU +print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU'))) +#with tf.device("GPU:0"): +# tf.ones(()) # Make sure we can run on GPU # This ensures that XLA and ptxas work well together, and helps with scaling. print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS"))) os.makedirs(profile_dir, exist_ok=True) -barf() - # tf.data.Dataset.from_generator tf.config.optimizer.set_jit(True) tf.summary.trace_on(profiler=True) -strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() +strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + tf.distribute.experimental.CollectiveCommunication.RING) + + +def compiled_model(): + model = uncompiled_model() + model.compile(optimizer='adam', + loss='binary_crossentropy', + metrics=['accuracy']) + return model with strategy.scope(): -# model = tf.keras.applications.mobilenet_v2.MobileNetV2(...) - optimizer = tf.keras.optimzers.SGD(learning_rate=0.01) - loss_fn = tf.nn.sigmoid_cross_entropy_with_logits - model.compile(..., optimezer=optimizer) - model.fit(train_dataset, epochs=10) + #model = tf.keras.applications.mobilenet_v2.MobileNetV2(...) + #optimizer = tf.keras.optimzers.SGD(learning_rate=0.01) + #loss_fn = tf.nn.sigmoid_cross_entropy_with_logits + #model.compile(..., optimizer=optimizer) + model = uncompiled_model() + model = compiled_model() + #model.fit(train_dataset, epochs=10) + model.fit( + train_data_gen, + steps_per_epoch=total_train // batch_size, + epochs=epochs, + validation_data=val_data_gen, + validation_steps=total_val // batch_size, + verbose=2 + ) tf.summary.trace_export(name=trace-export,profiler_outdir=logs) -strategy = tf.distribute.MirroredStrategy() with strategy.scope(): - model, loss_fn, optimzer = ... - + #model, loss_fn, optimzer = ... @tf.function def replicated_step(features, labels): return strategy.experimental_run_v2(step, (features, labels)) @@ -77,7 +149,7 @@ with strategy.scope(): optimizer.apply_gradients(zip(grads, model.trainable_variables)) return loss - data = strategey.experimental_distribute_dataset(data) + data = strategy.experimental_distribute_dataset(data) for features, labels in data: loss = replicated_step(features, labels) @@ -89,7 +161,7 @@ def data_generator(): # Load from disk image = imread(image_path) # Resize - image = resize(image, resolution) + # image = resize(image, resolution) # Horizontal and vertical flip #image = random_flip(image) # Normalize and add Gaussian noise @@ -103,16 +175,54 @@ def handle_batching(): yield concat(batch) batch.reset() -def process_image(image_bytes, label): - image = tf.io.decode_png(image_bytes) - image = tf.image.resize(image, resolution) - image.set_shape(input_shape) - image = image / 255. 
- 0.5 +train_dir = os.path.join('data/', 'train') +val_dir = os.path.join('data/', 'val') +train_good_dir = os.path.join(train_dir, 'good') +train_bad_dir = os.path.join(train_dir, 'bad') +val_good_dir = os.path.join(val_dir, 'good') +val_bad_dir = os.path.join(val_dir, 'bad') +num_train_good = len(os.listdir(train_good_dir)) +num_train_bad = len(os.listdir(train_bad_dir)) +num_val_good = len(os.listdir(val_good_dir)) +num_val_bad = len(os.listdir(val_bad_dir)) +total_train = num_train_good + num_train_bad +total_val = num_val_good + num_val_bad - #image = tf.image.random_flip_left_right(image) - #image = tf.image.random_flip_up_down(image) - #image += tf.random.normal( - # image.shape, mean=0, steddev=0.1) +print('total training good images:', num_train_good) +print('total training bad images:', num_train_bad) +print("--") +print("Total training images:", total_train) +print('total validation good images:', num_val_good) +print('total validation bad images:', num_val_bad) +print("--") +print("Total validation images:", total_val) +print("--") +print("Reduce training and validation set when testing") +#total_train = 16 +#total_val = 16 +print("Reduced training images:", total_train) +print("Reduced validation images:", total_val) - return image, tf.cast(label, tf.float32) + +tf.keras.backend.clear_session() + +options = tf.data.Options() + + +train_image_generator = ImageDataGenerator( + rescale=1./255 +) +val_image_generator = ImageDataGenerator( + rescale=1./255 +) + +#train_data_gen = train_image_generator.flow_from_directory(batch_size=batch_size, +# directory=train_dir, +# shuffle=True, +# target_size=(IMG_HEIGHT, IMG_WIDTH), +# class_mode='binary') +#val_data_gen = val_image_generator.flow_from_directory(batch_size=batch_size, +# directory=val_dir, +# target_size=(IMG_HEIGHT, IMG_WIDTH), +# class_mode='binary') From ac33fbe4fff36583343da43c0923289d8da78c3a Mon Sep 17 00:00:00 2001 From: ml server Date: Mon, 20 Jan 2020 10:12:12 -0700 Subject: [PATCH 6/8] wut-worker-mas, sorta --- wut-worker-mas | 24 ++++++++++++++++++++++++ wut-worker-mas.py | 9 ++++++--- 2 files changed, 30 insertions(+), 3 deletions(-) create mode 100755 wut-worker-mas diff --git a/wut-worker-mas b/wut-worker-mas new file mode 100755 index 0000000..c904033 --- /dev/null +++ b/wut-worker-mas @@ -0,0 +1,24 @@ +#!/bin/bash +# wut-worker-mas +# +# Starts worker client. +# +# Usage: +# wut-worker-mas +# Example: +# wut-worker-mas +# +# Note: +# Each node needs a unique index number. +# +# NOTE! +# This generates the node number based off the hostname. +# The hosts are ml0 through ml5. 
+ +HOSTNUM=`hostname | sed -e 's/ml//g'` + +export TF_CONFIG='{"cluster": {"worker": [ "ml0-int:2222", "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222"]}, "task": {"index": '$HOSTNUM', "type": "worker"}}' + +echo $TF_CONFIG +python3 wut-worker-mas.py + diff --git a/wut-worker-mas.py b/wut-worker-mas.py index 45d0dda..334d9c9 100644 --- a/wut-worker-mas.py +++ b/wut-worker-mas.py @@ -43,8 +43,13 @@ BUFFER_SIZE = 10000 NUM_WORKERS = 6 GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS +# XXX POSITIVE_DIRECTORY = '/home/jebba/devel/spacecruft/satnogs-wut/data/pos' pos_dir = '/home/jebba/devel/spacecruft/satnogs-wut/data/posdir' + +strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + tf.distribute.experimental.CollectiveCommunication.RING) + def get_bytes_and_label(filepath): raw_bytes = tf.io.read_file(filepath) label = tf.strings.regex_full_match( @@ -88,7 +93,7 @@ dataset = dataset.map(process_image, num_parallel_calls=AUTOTUNE) dataset = dataset.batch(batch_size=32) dataset = dataset.prefetch(buffer_size=AUTOTUNE) -print(tf.__version__) +print("Tensorflow Version: ", tf.__version__) print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU'))) print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU'))) #with tf.device("GPU:0"): @@ -105,8 +110,6 @@ tf.config.optimizer.set_jit(True) tf.summary.trace_on(profiler=True) -strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( - tf.distribute.experimental.CollectiveCommunication.RING) def compiled_model(): From aaa0091db8b230f65e3d32157c1bf5b430d7af57 Mon Sep 17 00:00:00 2001 From: ml server Date: Mon, 20 Jan 2020 12:26:00 -0700 Subject: [PATCH 7/8] wtf scripts to check tensorflow setup --- wut-tf | 25 +++++++++++++++++++++++ wut-tf.py | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100755 wut-tf create mode 100644 wut-tf.py diff --git a/wut-tf b/wut-tf new file mode 100755 index 0000000..bf8c5f6 --- /dev/null +++ b/wut-tf @@ -0,0 +1,25 @@ +#!/bin/bash +# wut-tf +# +# Starts worker client. +# +# Usage: +# wut-tf +# Example: +# wut-tf +# +# Note: +# Each node needs a unique index number. +# +# NOTE! +# This generates the node number based off the hostname. +# The hosts are ml0 through ml5. 
+ +HOSTNUM=`hostname | sed -e 's/ml//g'` + +#export TF_CONFIG='{"cluster": {"worker": [ "ml0-int:2222", "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222"]}, "task": {"index": '$HOSTNUM', "type": "worker"}}' +export TF_CONFIG='{"cluster": {"worker": [ "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222"]}}' + +echo $TF_CONFIG +python3 wut-tf.py + diff --git a/wut-tf.py b/wut-tf.py new file mode 100644 index 0000000..0e3b0de --- /dev/null +++ b/wut-tf.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +# +# wut-tf.py +# +# https://spacecruft.org/spacecruft/satnogs-wut +# +# Distributed Learning + +from __future__ import absolute_import, division, print_function, unicode_literals +from __future__ import print_function +import os +import json +import numpy as np +import datetime +import tensorflow as tf +import tensorflow.python.keras +from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D +from tensorflow.python.keras import optimizers +from tensorflow.python.keras import Sequential +from tensorflow.python.keras.layers import Activation, Dropout, Flatten, Dense +from tensorflow.python.keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D +from tensorflow.python.keras.layers import Input, concatenate +from tensorflow.python.keras.models import load_model +from tensorflow.python.keras.models import Model +from tensorflow.python.keras.preprocessing import image +from tensorflow.python.keras.preprocessing.image import img_to_array +from tensorflow.python.keras.preprocessing.image import ImageDataGenerator +from tensorflow.python.keras.preprocessing.image import load_img +os.environ["TF_CONFIG"] = json.dumps({ + "cluster": { + "worker": [ "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222" ] + }#, + #"task": {"type": "worker", "index": 0 }, +}) +print("Tensorflow Version: ", tf.__version__) +print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU'))) +print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU'))) +print(tf.config.experimental.list_physical_devices()) +#with tf.device("GPU:0"): +# tf.ones(()) # Make sure we can run on GPU +print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS"))) +print(os.getenv("XLA_FLAGS")) +tf.keras.backend.clear_session() +IMG_HEIGHT = 416 +IMG_WIDTH= 804 +batch_size = 32 +epochs = 4 +BUFFER_SIZE = 10000 +NUM_WORKERS = 6 +GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS +#strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() +#strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( +# tf.distribute.experimental.CollectiveCommunication.RING) +AUTOTUNE = tf.data.experimental.AUTOTUNE +NUM_TOTAL_IMAGES=100 +tf.config.optimizer.set_jit(True) +#tf.summary.trace_on(profiler=True) +#tf.summary.trace_export(name=trace-export,profiler_outdir=logs) +options = tf.data.Options() + From eaf2785986c35c63748f4a4e366053e295bbdece Mon Sep 17 00:00:00 2001 From: ml server Date: Mon, 20 Jan 2020 13:01:58 -0700 Subject: [PATCH 8/8] mas cruft --- wut-worker-mas | 4 +++- wut-worker-mas.py | 57 ++++++++++++++++++++++++++++------------------- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/wut-worker-mas b/wut-worker-mas index c904033..b0852e1 100755 --- a/wut-worker-mas +++ b/wut-worker-mas @@ -17,7 +17,9 @@ HOSTNUM=`hostname | sed -e 's/ml//g'` -export TF_CONFIG='{"cluster": {"worker": [ "ml0-int:2222", "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222"]}, "task": 
{"index": '$HOSTNUM', "type": "worker"}}' +#export TF_CONFIG='{"cluster": {"worker": [ "ml0-int:2222", "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222"]}, "task": {"index": '$HOSTNUM', "type": "worker"}}' +#export TF_CONFIG='{"cluster": {"worker": [ "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222"]}}' +export TF_CONFIG='{"cluster": {"chief": [ "ml0-int:2222" ], "worker": [ "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222"]}, "task": {"index": '$HOSTNUM', "type": "worker"}}' echo $TF_CONFIG python3 wut-worker-mas.py diff --git a/wut-worker-mas.py b/wut-worker-mas.py index 334d9c9..2026d12 100644 --- a/wut-worker-mas.py +++ b/wut-worker-mas.py @@ -26,13 +26,34 @@ from tensorflow.python.keras.preprocessing import image from tensorflow.python.keras.preprocessing.image import img_to_array from tensorflow.python.keras.preprocessing.image import ImageDataGenerator from tensorflow.python.keras.preprocessing.image import load_img +#import tensorflow.python.distribute.cluster_resolver +#from tensorflow.python.distribute.cluster_resolver import TFConfigClusterResolver +#from tensorflow.python.distribute.cluster_resolver.TFConfigClusterResolver +tf.keras.backend.clear_session() +options = tf.data.Options() os.environ["TF_CONFIG"] = json.dumps({ "cluster": { - "worker": [ "ml0-int:2222", "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222" ] + "chief": [ "ml0-int:2222" ], + "worker": [ "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222" ] }, - "task": {"type": "worker", "index": 0 }, + "task": {"type": "chief", "index": 0 }, }) +#os.environ["TF_CONFIG"] = json.dumps({ +# "cluster": { +# "worker": [ "ml1-int:2222", "ml2-int:2222", "ml3-int:2222", "ml4-int:2222", "ml5-int:2222" ] +# }#, +# #"task": {"type": "worker", "index": 0 }, +#}) + +print("Tensorflow Version: ", tf.__version__) +print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU'))) +print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU'))) +#with tf.device("GPU:0"): +# tf.ones(()) # Make sure we can run on GPU + +# This ensures that XLA and ptxas work well together, and helps with scaling. +print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS"))) IMG_HEIGHT = 416 IMG_WIDTH= 804 @@ -47,9 +68,12 @@ GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS POSITIVE_DIRECTORY = '/home/jebba/devel/spacecruft/satnogs-wut/data/pos' pos_dir = '/home/jebba/devel/spacecruft/satnogs-wut/data/posdir' +from tensorflow.python.distribute.cluster_resolver import SimpleClusterResolver + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( tf.distribute.experimental.CollectiveCommunication.RING) + def get_bytes_and_label(filepath): raw_bytes = tf.io.read_file(filepath) label = tf.strings.regex_full_match( @@ -93,14 +117,6 @@ dataset = dataset.map(process_image, num_parallel_calls=AUTOTUNE) dataset = dataset.batch(batch_size=32) dataset = dataset.prefetch(buffer_size=AUTOTUNE) -print("Tensorflow Version: ", tf.__version__) -print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU'))) -print("Num CPUs Available: ", len(tf.config.experimental.list_physical_devices('CPU'))) -#with tf.device("GPU:0"): -# tf.ones(()) # Make sure we can run on GPU - -# This ensures that XLA and ptxas work well together, and helps with scaling. 
-print("XLA_FLAGS='{}'".format(os.getenv("XLA_FLAGS"))) os.makedirs(profile_dir, exist_ok=True) @@ -108,7 +124,7 @@ os.makedirs(profile_dir, exist_ok=True) tf.config.optimizer.set_jit(True) -tf.summary.trace_on(profiler=True) +#tf.summary.trace_on(profiler=True) @@ -136,7 +152,7 @@ with strategy.scope(): verbose=2 ) -tf.summary.trace_export(name=trace-export,profiler_outdir=logs) +#tf.summary.trace_export(name=trace-export,profiler_outdir=logs) with strategy.scope(): #model, loss_fn, optimzer = ... @@ -207,17 +223,12 @@ print("Reduced training images:", total_train) print("Reduced validation images:", total_val) -tf.keras.backend.clear_session() - -options = tf.data.Options() - - -train_image_generator = ImageDataGenerator( - rescale=1./255 -) -val_image_generator = ImageDataGenerator( - rescale=1./255 -) +#train_image_generator = ImageDataGenerator( +# rescale=1./255 +#) +#val_image_generator = ImageDataGenerator( +# rescale=1./255 +#) #train_data_gen = train_image_generator.flow_from_directory(batch_size=batch_size, # directory=train_dir,