Trying to upgrade this awesome implementation of gumble-softmax-vae found here. However, I keep getting
TypeError: Cannot convert a symbolic Keras input/output to a numpy array.
I am stumped - tried many many things. Interestingly some searches return with other implementation of VAEs. I believe the error is somewhere in the "KL" term calculation of the loss.
Here is the almost working code:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
batch_size = 10
data_dim = 784
M = 10 # classes
N = 30 # how many distributions
nb_epoch = 100
epsilon_std = 0.01
anneal_rate = 0.0003
min_temperature = 0.5
tau = tf.Variable(5.0, dtype=tf.float32)
class Sampling(keras.layers.Layer):
def call(self, logits_y):
u = tf.random.uniform(tf.shape(logits_y), 0, 1)
y = logits_y - tf.math.log(
-tf.math.log(u + 1e-20) + 1e-20
) # logits + gumbel noise
y = tf.nn.softmax(tf.reshape(y, (-1, N, M)) / tau)
y = tf.reshape(y, (-1, N * M))
return y
encoder_inputs = keras.Input(shape=(data_dim))
x = keras.layers.Dense(512, activation="relu")(encoder_inputs)
x = keras.layers.Dense(256, activation="relu")(x)
logits_y = keras.layers.Dense(M * N, name="logits_y")(x)
z = Sampling()(logits_y)
encoder = keras.Model(encoder_inputs, z, name="encoder")
encoder.build(encoder_inputs)
print(encoder.summary())
decoder_inputs = keras.Input(shape=(N * M))
x = keras.layers.Dense(256, activation="relu")(decoder_inputs)
x = keras.layers.Dense(512, activation="relu")(x)
decoder_outputs = keras.layers.Dense(data_dim, activation="sigmoid")(x)
decoder = keras.Model(decoder_inputs, decoder_outputs, name="decoder")
decoder.build(decoder_inputs)
print(decoder.summary())
class VAE(keras.Model):
def __init__(self, encoder, decoder, **kwargs):
super(VAE, self).__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.bce = tf.keras.losses.BinaryCrossentropy()
self.loss_tracker = keras.metrics.Mean(name="loss")
@property
def metrics(self):
return [self.loss_tracker]
def call(self, x):
z = self.encoder(x)
x_hat = self.decoder(z)
return x_hat
@tf.function
def gumbel_loss(self, y_true, y_pred, logits_y):
q_y = tf.reshape(logits_y, (-1, N, M))
q_y = tf.nn.softmax(q_y)
log_q_y = tf.math.log(q_y + 1e-20)
kl_tmp = q_y * (log_q_y - tf.math.log(1.0 / M))
kl = tf.math.reduce_sum(kl_tmp, axis=(1, 2))
kl = tf.squeeze(kl, axis=0)
elbo = data_dim * self.bce(y_true, y_pred) - kl
return elbo
def train_step(self, data):
x = data
with tf.GradientTape(persistent=True) as tape:
z = self.encoder(x, training=True)
x_hat = self.decoder(z, training=True)
x = tf.cast(x, dtype=tf.float32)
x_hat = tf.cast(x_hat, dtype=tf.float32)
logits_y = self.encoder.get_layer('logits_y').output
loss = self.gumbel_loss(x, x_hat, logits_y)
grads = tape.gradient(loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
def main():
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
path="mnist.npz"
)
x_train = x_train.astype("float32") / 255.0
x_test = x_test.astype("float32") / 255.0
x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:])))
x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))
vae = VAE(encoder, decoder, name="vae-model")
vae_inputs = (None, data_dim)
vae.build(vae_inputs)
vae.compile(optimizer="adam", loss=None)
vae.fit(
x_train,
shuffle=True,
epochs=1,
batch_size=batch_size
)
if __name__ == "__main__":
main()