
#!/usr/bin/python3
# wut-ml-auto
#
# Vet a SatNOGS image using machine learning (guessing).
# It will vet the image located at test/unvetted/waterfall.png.
#
# Note: there is a known issue where it vets everything under the
# data/test directory instead of just that one image. Until that is
# fixed, delete everything else from data/test. :)
# (A single-image workaround is sketched at the bottom of this script.)
#
# Usage:
# wut-ml-auto
# Example:
# wut-ml-auto
import os
import numpy as np
# Import from the public tensorflow.keras namespace; the tensorflow.python.*
# paths are private and break across TensorFlow releases.
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Convolution2D, MaxPooling2D, ZeroPadding2D
from tensorflow.keras import optimizers
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
# XXX
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, concatenate
#from tensorflow.keras.optimizers import Adam
# XXX Plot
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ModelCheckpoint
## for visualizing
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
# https://keras.io/preprocessing/image/
# TODO:
# * Pre-process image
print("datagen")
datagen = ImageDataGenerator(
    featurewise_center=False,
    samplewise_center=False,
    featurewise_std_normalization=False,
    samplewise_std_normalization=False,
    zca_whitening=False,
    zca_epsilon=1e-06,
    rescale=1./255,
    shear_range=0.0,
    zoom_range=0.0,
    rotation_range=0,
    width_shift_range=0.0,
    height_shift_range=0.0,
    brightness_range=None,
    channel_shift_range=0.0,
    fill_mode='nearest',
    cval=0.0,
    horizontal_flip=False,
    vertical_flip=False,
    preprocessing_function=None,
    data_format='channels_last',
    validation_split=0.0,
    dtype='float32')
print("datagen.flow")
train_it = datagen.flow_from_directory('/srv/satnogs/data/train/', class_mode='binary')
val_it = datagen.flow_from_directory('/srv/satnogs/data/val/', class_mode='binary')
test_it = datagen.flow_from_directory('/srv/satnogs/data/test/', class_mode='binary')
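# Sanity check: flow_from_directory assigns class indices alphabetically by
# subdirectory name, so print the mapping before trusting any predictions.
print("class_indices", train_it.class_indices)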
print("train_it.next()")
#batchX, batchy = train_it.next()
#print('Batch shape=%s, min=%.3f, max=%.3f' % (batchX.shape, batchX.min(), batchX.max()))
trainX, trainY = train_it.next()
print('Batch shape=%s, min=%.3f, max=%.3f' % (trainX.shape, trainX.min(), trainX.max()))
valX, valY = val_it.next()
print('Batch shape=%s, min=%.3f, max=%.3f' % (valX.shape, valX.min(), valX.max()))
testX, testY = test_it.next()
print('Batch shape=%s, min=%.3f, max=%.3f' % (testX.shape, testX.min(), testX.max()))
print("input shape")
input_shape=trainX.shape[1:]
print(input_shape)
print("autoencoder")
# this is the size of our encoded representations
encoding_dim = 32  # 32 floats -> compression factor 6144, since the input is 196608 floats
# this is our input placeholder
#input_img = Input(shape=(784,))
input_img = Input(shape=(196608,))  # 256 * 256 * 3, flattened
# "encoded" is the encoded representation of the input
encoded = Dense(encoding_dim, activation='relu')(input_img)
# "decoded" is the lossy reconstruction of the input
decoded = Dense(196608, activation='sigmoid')(encoded)
#decoded = Dense(784, activation='sigmoid')(encoded)
# this model maps an input to its reconstruction
autoencoder = Model(input_img, decoded)
# this model maps an input to its encoded representation
encoder = Model(input_img, encoded)
# create a placeholder for an encoded (32-dimensional) input
encoded_input = Input(shape=(encoding_dim,))
# retrieve the last layer of the autoencoder model
decoder_layer = autoencoder.layers[-1]
# create the decoder model
decoder = Model(encoded_input, decoder_layer(encoded_input))
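# The encoder and decoder share layers (and therefore weights) with the
# autoencoder, so fitting the autoencoder below trains all three.
autoencoder.summary()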
autoencoder.compile(optimizer='adadelta', loss='binary_crossentropy')
# The generator already rescales to [0, 1] (rescale=1./255 above) and yields
# float32, so don't divide by 255 again; just flatten each image to a vector.
trainX = trainX.reshape((len(trainX), np.prod(trainX.shape[1:])))
valX = valX.reshape((len(valX), np.prod(valX.shape[1:])))
print("trainX.shape")
print(trainX.shape)
print("valX.shape")
print(valX.shape)
#batch_size=256,
autoencoder.fit(trainX, trainX,
                epochs=50,
                shuffle=True,
                validation_data=(valX, valX))
encoded_imgs = encoder.predict(trainX)
decoded_imgs = decoder.predict(encoded_imgs)
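# Quick numeric check of reconstruction quality before eyeballing the plots.
print('reconstruction MSE %.6f' % np.mean((trainX - decoded_imgs) ** 2))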
# use Matplotlib (don't ask)
n = 10  # how many waterfalls to display
plt.figure(figsize=(20, 4))
for i in range(n):
    # display original (trainX is what the autoencoder reconstructed)
    ax = plt.subplot(2, n, i + 1)
    plt.imshow(trainX[i].reshape(256, 256, 3))
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    # display reconstruction
    ax = plt.subplot(2, n, i + 1 + n)
    plt.imshow(decoded_imgs[i].reshape(256, 256, 3))
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
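# plt.show() needs a display; on a headless box, saving the figure is an
# alternative (the path here is only a suggestion):
#plt.savefig('data/wut-autoencoder.png')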
#img_width=823
#img_height=1606
img_width=256
img_height=256
print("Height", img_height, "Width", img_width)
# https://keras.io/models/sequential/
# https://keras.io/getting-started/sequential-model-guide/
print("Sequential")
model = Sequential()
print("add")
# Other data to consider adding (a merge sketch follows this list):
# * JSON metadata
# * TLE
# * Audio File (ogg)
# * Decoded Data (HEX, ASCII, PNG)
# Data from external sources to consider adding:
# * Weather
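# A hedged sketch of how extra inputs could be merged with image features via
# the functional API (hence the Model/Input/concatenate imports above); the
# 8-wide metadata vector is purely illustrative:
#img_in = Input(shape=(img_height, img_width, 3))
#feat = Flatten()(MaxPooling2D()(Convolution2D(32, (3, 3), activation='relu')(img_in)))
#meta_in = Input(shape=(8,))
#merged = concatenate([feat, meta_in])
#out = Dense(1, activation='sigmoid')(Dense(64, activation='relu')(merged))
#multi_model = Model(inputs=[img_in, meta_in], outputs=out)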
print("convolution 2 deeeee")
# https://keras.io/layers/convolutional/
# Keras 2 takes the kernel size as a tuple, and the input shape must match
# the generator's (256, 256) target size, not (255, 255).
model.add(Convolution2D(32, (3, 3), input_shape=(img_height, img_width, 3)))
# https://keras.io/activations/
print("Activation relu")
model.add(Activation('relu'))
# https://keras.io/layers/pooling/
print("Pooling")
model.add(MaxPooling2D(pool_size=(2, 2)))
print("Convolution2D")
model.add(Convolution2D(32, (3, 3)))
print("Activation relu")
model.add(Activation('relu'))
print("Pooling")
model.add(MaxPooling2D(pool_size=(2, 2)))
print("Convolution2D")
model.add(Convolution2D(64, (3, 3)))
print("Activation relu")
model.add(Activation('relu'))
print("Pooling")
model.add(MaxPooling2D(pool_size=(2, 2)))
# https://keras.io/layers/core/
print("Flatten")
model.add(Flatten())
# https://keras.io/layers/core/
print("Dense")
model.add(Dense(64))
print("Activation relu")
model.add(Activation('relu'))
# https://keras.io/layers/core/
print("Dropout")
model.add(Dropout(0.1))
print("Dense")
model.add(Dense(1))
print("Activation softmax")
model.add(Activation('softmax'))
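# Print the layer stack as a sanity check before compiling.
model.summary()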
# https://keras.io/models/sequential/
print("compile")
model.compile(
    # class_mode='binary' labels plus a single sigmoid unit call for
    # binary, not categorical, crossentropy
    loss='binary_crossentropy',
    optimizer='rmsprop',
    metrics=['accuracy'])
# https://keras.io/models/sequential/
print("fit")
model.fit(
    x=train_it,
    y=None,
    batch_size=None,
    epochs=1,
    verbose=2,
    callbacks=None,
    validation_split=0.0,
    validation_data=val_it,
    shuffle=True,
    class_weight=None,
    sample_weight=None,
    initial_epoch=0,
    steps_per_epoch=None,
    validation_steps=None,
    validation_freq=1,
    max_queue_size=10,
    workers=16,
    use_multiprocessing=True)
# https://keras.io/models/sequential/
# evaluate(x=None, y=None, batch_size=None, verbose=1, sample_weight=None, steps=None, callbacks=None, max_queue_size=10, workers=1, use_multiprocessing=False)
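# A hedged example of running it against the held-out set:
#loss, acc = model.evaluate(x=test_it, verbose=2)
#print('test loss %.4f, accuracy %.4f' % (loss, acc))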
# TODO:
# * Generate output to visualize training/validating/testing.
# Plot, fail: plot_model() expects a model, not a data iterator
#print("plot")
#plot_model(model, to_file='data/wut-plot.png', show_shapes=True, show_layer_names=True)
# https://keras.io/models/sequential/
print("predict")
prediction = model.predict(
    x=test_it,
    batch_size=None,
    verbose=2,
    steps=None,
    callbacks=None,
    max_queue_size=10,
    workers=16,
    use_multiprocessing=True)
print(prediction)
# A sigmoid output is a probability in (0, 1), so threshold it rather than
# comparing to exactly 1; check the class_indices printed above if the
# good/bad mapping looks inverted.
if prediction[0][0] >= 0.5:
    rating = 'bad'
else:
    rating = 'good'
print('Observation: %s' % (rating))
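# A minimal sketch for vetting just the single image named at the top, which
# would sidestep the vet-everything-under-data/test issue; the exact path is
# an assumption:
#img = load_img('/srv/satnogs/data/test/unvetted/waterfall.png',
#               target_size=(img_height, img_width))
#x = img_to_array(img) / 255.   # same rescale the generator applies
#x = np.expand_dims(x, axis=0)  # shape (1, 256, 256, 3)
#print('single-image prediction:', model.predict(x))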