2020-01-19 22:46:48 -07:00
|
|
|
#!/usr/bin/env python3
|
2020-01-20 09:33:46 -07:00
|
|
|
#
|
|
|
|
# wut-worker-mas.py
|
|
|
|
#
|
|
|
|
# Distributed Learning
|
2020-01-19 22:46:48 -07:00
|
|
|
|
2020-01-20 09:33:46 -07:00
|
|
|
import tensorflow as tf
|
|
|
|
import json
|
|
|
|
import os
|
|
|
|
|
|
|
|
# Describe the training cluster to TensorFlow through the TF_CONFIG
# environment variable: six worker endpoints, with this process registered
# as worker index 0.
worker_hosts = [
    "ml0-int:2222",
    "ml1-int:2222",
    "ml2-int:2222",
    "ml3-int:2222",
    "ml4-int:2222",
    "ml5-int:2222",
]
tf_config = {
    "cluster": {"worker": worker_hosts},
    "task": {"type": "worker", "index": 0},
}
os.environ["TF_CONFIG"] = json.dumps(tf_config)
|
|
|
|
|
|
|
|
def get_bytes_and_label(filepath):
    """Read a file and derive its boolean label from its own path.

    Args:
        filepath: String tensor holding the path of the file to load
            (one element of the file-listing dataset this is mapped over).

    Returns:
        A ``(raw_bytes, label)`` tuple: the undecoded file contents, and a
        boolean tensor that is True when ``filepath`` fully matches the
        pattern ``pos_dir + ".+"``.
    """
    raw_bytes = tf.io.read_file(filepath)
    # regex_full_match(input, pattern): the path being classified is the
    # input. Previously an unrelated constant was passed as the input, so the
    # label could never depend on which file was read.
    # NOTE(review): pos_dir is not defined in the visible source; it is
    # presumably the path prefix of the positive-sample directory — confirm.
    label = tf.strings.regex_full_match(filepath, pos_dir + ".+")
    return raw_bytes, label
|
|
|
|
|
|
|
|
# Input-pipeline settings.
AUTOTUNE = tf.data.experimental.AUTOTUNE
NUM_TOTAL_IMAGES = 100
data_root = "/home/jebba/devel/spacecruft/satnogs-wut/data"
profile_dir = os.path.join(data_root, "profiles")

# Build the input pipeline: list files, shuffle, load bytes + label, decode
# and preprocess, then batch by 32 and prefetch.
# NOTE(review): Dataset.list_files expects a glob pattern; data_root is a
# bare directory path with no wildcard — confirm this matches the files.
# NOTE(review): process_image is defined further down in this file, so it is
# not yet bound when this statement runs top-to-bottom.
dataset = (
    tf.data.Dataset.list_files(data_root)
    .shuffle(NUM_TOTAL_IMAGES)
    .map(get_bytes_and_label, num_parallel_calls=AUTOTUNE)
    .map(process_image, num_parallel_calls=AUTOTUNE)
    .batch(batch_size=32)
    .prefetch(buffer_size=AUTOTUNE)
)
|
|
|
|
|
|
|
|
# Report the TensorFlow version and how many GPUs it can see.
print(tf.__version__)
gpus = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))

# Make sure we can run on GPU by creating a scalar tensor on the first one.
with tf.device("GPU:0"):
    tf.ones(())

# The original note says XLA_FLAGS ensures XLA and ptxas work well together
# and helps with scaling. This line only prints the current value ("None"
# when the variable is unset); it does not set it.
print("XLA_FLAGS='%s'" % os.getenv("XLA_FLAGS"))

# Create the profile output directory if it does not exist yet.
os.makedirs(profile_dir, exist_ok=True)

# NOTE(review): barf is not defined anywhere in the visible source, so this
# call raises NameError and stops the script here — presumably a deliberate
# debugging halt; confirm before removing.
barf()
|
|
|
|
|
|
|
|
# tf.data.Dataset.from_generator
|
|
|
|
|
|
|
|
# Turn on XLA JIT compilation and start recording a trace with profiler data.
tf.config.optimizer.set_jit(True)
tf.summary.trace_on(profiler=True)

# Synchronous data-parallel training across the workers listed in TF_CONFIG.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

# Variables (model and optimizer state) must be created inside the scope so
# the strategy can mirror them.
with strategy.scope():
    # NOTE(review): model construction is still commented out, so `model`
    # is undefined below until this line is filled in and enabled.
    # model = tf.keras.applications.mobilenet_v2.MobileNetV2(...)
    # Fixed module typo: tf.keras.optimizers (was "optimzers").
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    loss_fn = tf.nn.sigmoid_cross_entropy_with_logits
    # Fixed keyword typo: optimizer= (was "optimezer="). The leading `...`
    # is a placeholder for the remaining compile arguments (e.g. the loss).
    model.compile(..., optimizer=optimizer)

# NOTE(review): train_dataset is not defined in the visible source; the
# pipeline built earlier in this file is bound to `dataset` — confirm.
model.fit(train_dataset, epochs=10)

# The export name must be a string (it was the expression `trace-export`,
# i.e. a subtraction of two undefined names).
# NOTE(review): the original passed an undefined `logs`; profile_dir, which
# this script creates earlier, is assumed to be the intended output directory.
tf.summary.trace_export(name="trace-export", profiler_outdir=profile_dir)
|
|
|
|
|
|
|
|
# Single-host, multi-GPU synchronous training (custom training loop variant).
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    # Fixed target typo: `optimizer` (was "optimzer"), so the training step
    # uses the optimizer created under this strategy's scope.
    # NOTE(review): `...` is a placeholder; unpacking Ellipsis raises
    # TypeError until real model / loss / optimizer objects are built here.
    model, loss_fn, optimizer = ...
|
|
|
|
|
|
|
|
@tf.function
def replicated_step(features, labels):
    """Run one training step on every replica of ``strategy``.

    Args:
        features: Per-replica batch of model inputs.
        labels: Per-replica batch of targets.

    Returns:
        Whatever ``strategy.experimental_run_v2`` returns for ``step``
        (the per-replica loss values).
    """
    return strategy.experimental_run_v2(step, (features, labels))


def step(features, labels):
    """Compute the loss for one replica's batch and apply the gradients.

    The ``def`` header was missing, which left this body as unreachable code
    after the ``return`` in ``replicated_step`` and left ``step`` undefined.

    Args:
        features: Model inputs for this replica.
        labels: Targets for this replica.

    Returns:
        The loss averaged with respect to ``global_batch_size``.
    """
    with tf.GradientTape() as tape:
        logits = model(features, training=True)
        # The per-example loss has to be computed before it can be averaged;
        # the original read `loss` before anything had assigned it.
        per_example_loss = loss_fn(labels, logits)
        # NOTE(review): global_batch_size is not defined in the visible
        # source — it must be provided at module level.
        loss = tf.nn.compute_average_loss(
            per_example_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss
|
|
|
|
|
|
|
|
# Split the dataset across the strategy's replicas, then run one replicated
# training step per distributed batch.
# Fixed NameError: `strategy` (was misspelled "strategey").
# NOTE(review): `data` is not defined earlier in the visible source; the
# pipeline above is bound to `dataset` — confirm which input is intended.
data = strategy.experimental_distribute_dataset(data)

for features, labels in data:
    loss = replicated_step(features, labels)
|
|
|
|
|
|
|
|
def data_generator():
    """Yield batches of loaded, resized images with their labels.

    Shuffles the module-level ``data`` sequence of ``(image_path, label)``
    pairs, loads and resizes each image, and yields ``concat(batch)`` every
    time ``batch_size`` samples have been collected. A trailing partial batch
    is not yielded.

    Previously this function never yielded: the batching logic sat in a
    separate function that was only referenced by name, never called, and
    could not see the local ``batch`` list anyway.

    NOTE(review): shuffle, data, imread, resize, resolution, batch_size and
    concat are not defined in the visible source; they must be supplied at
    module level.
    """
    batch = []
    shuffle(data)
    for image_path, label in data:
        # Load from disk
        image = imread(image_path)
        # Resize
        image = resize(image, resolution)
        # Augmentation is currently disabled:
        # Horizontal and vertical flip
        #image = random_flip(image)
        # Normalize and add Gaussian noise
        #image = normalize_and_add_noise(image)
        batch.append((image, label))
        if len(batch) == batch_size:
            yield concat(batch)
            # Start a fresh list rather than mutating the one just handed to
            # concat().
            batch = []
|
|
|
|
|
|
|
|
# XXX ?
|
|
|
|
def handle_batching():
    """Yield the accumulated batch once it is full, then empty it.

    Reads ``batch``, ``batch_size`` and ``concat`` from module scope; yields
    nothing while ``batch`` holds a different number of items than
    ``batch_size``.

    NOTE(review): none of those names is defined at module level in the
    visible source, and the ``batch`` built in ``data_generator`` is local to
    that function — confirm how this helper is meant to be wired in.
    """
    if len(batch) == batch_size:
        yield concat(batch)
        # Lists have no reset() method; clear() empties the list in place.
        # Assumes batch is a list — confirm.
        batch.clear()
|
|
|
|
|
|
|
|
def process_image(image_bytes, label):
    """Decode and preprocess one PNG sample.

    Args:
        image_bytes: Encoded PNG file contents.
        label: Label tensor for the sample.

    Returns:
        A ``(image, label)`` tuple: the decoded image resized to
        ``resolution``, with its static shape set to ``input_shape`` and its
        values mapped through ``x / 255 - 0.5``, and the label cast to
        float32.

    NOTE(review): resolution and input_shape are not defined in the visible
    source; they must be provided at module level.
    """
    decoded = tf.io.decode_png(image_bytes)
    resized = tf.image.resize(decoded, resolution)
    resized.set_shape(input_shape)
    # Centre the values around zero (assuming 8-bit pixel data — confirm).
    centred = resized / 255. - 0.5

    # Augmentation is currently disabled:
    #centred = tf.image.random_flip_left_right(centred)
    #centred = tf.image.random_flip_up_down(centred)
    #centred += tf.random.normal(
    #    centred.shape, mean=0, stddev=0.1)

    return centred, tf.cast(label, tf.float32)
|
2020-01-19 22:46:48 -07:00
|
|
|
|