Source code for elliot.recommender.autoencoders.vae.multi_vae_model

"""
Module description:

"""

__version__ = '0.3.1'
__author__ = 'Vito Walter Anelli, Claudio Pomo'
__email__ = 'vitowalter.anelli@poliba.it, claudio.pomo@poliba.it'

import logging
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'


class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the latent vector encoding a user profile."""

    @tf.function
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon
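
# The reparameterization trick above keeps sampling differentiable:
# z = z_mean + sigma * epsilon with sigma = exp(0.5 * z_log_var) and
# epsilon ~ N(0, I), so gradients flow through z_mean and z_log_var while the
# randomness stays confined to epsilon.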

class Encoder(layers.Layer):
    """Maps a user's interaction vector to a triplet (z_mean, z_log_var, z)."""

    def __init__(self, latent_dim=200, intermediate_dim=600, dropout_rate=0,
                 regularization_lambda=0.01, name="encoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.l2_normalizer = layers.Lambda(lambda x: keras.backend.l2_normalize(x, axis=1))
        self.input_dropout = layers.Dropout(dropout_rate)
        self.dense_proj = layers.Dense(intermediate_dim,
                                       activation="tanh",
                                       kernel_initializer=keras.initializers.GlorotNormal(),
                                       kernel_regularizer=keras.regularizers.l2(regularization_lambda))
        self.dense_mean = layers.Dense(latent_dim,
                                       kernel_initializer=keras.initializers.GlorotNormal(),
                                       kernel_regularizer=keras.regularizers.l2(regularization_lambda))
        self.dense_log_var = layers.Dense(latent_dim,
                                          kernel_initializer=keras.initializers.GlorotNormal(),
                                          kernel_regularizer=keras.regularizers.l2(regularization_lambda))
        self.sampling = Sampling()

    @tf.function
    def call(self, inputs, training=None):
        i_normalized = self.l2_normalizer(inputs, 1)
        i_drop = self.input_dropout(i_normalized, training=training)
        x = self.dense_proj(i_drop)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z
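
# Note: the encoder L2-normalizes each user's interaction vector and applies
# dropout to the normalized input, the denoising scheme adopted by Mult-VAE
# (Liang et al., 2018, "Variational Autoencoders for Collaborative Filtering").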

class Decoder(layers.Layer):
    """Converts z, the latent user representation, back into scores over the item catalogue."""

    def __init__(self, original_dim, intermediate_dim=600, name="decoder",
                 regularization_lambda=0.01, **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim,
                                       activation="tanh",
                                       kernel_initializer=keras.initializers.GlorotNormal(),
                                       kernel_regularizer=keras.regularizers.l2(regularization_lambda))
        self.dense_output = layers.Dense(original_dim,
                                         kernel_initializer=keras.initializers.GlorotNormal(),
                                         kernel_regularizer=keras.regularizers.l2(regularization_lambda))

    @tf.function
    def call(self, inputs, **kwargs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)
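
# Note: the decoder returns unnormalized logits over the `original_dim` items;
# the multinomial log-likelihood is recovered later by applying
# tf.nn.log_softmax to these logits in train_step and predict.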

class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(self, original_dim, intermediate_dim=600, latent_dim=200,
                 learning_rate=0.001, dropout_rate=0, regularization_lambda=0.01,
                 random_seed=42, name="VariationalAutoEncoder", **kwargs):
        super().__init__(name=name, **kwargs)
        tf.random.set_seed(random_seed)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim,
                               intermediate_dim=intermediate_dim,
                               dropout_rate=dropout_rate,
                               regularization_lambda=regularization_lambda)
        self.decoder = Decoder(original_dim,
                               intermediate_dim=intermediate_dim,
                               regularization_lambda=regularization_lambda)
        self.optimizer = tf.optimizers.Adam(learning_rate)

    def get_config(self):
        raise NotImplementedError

    @tf.function
    def call(self, inputs, training=None, **kwargs):
        z_mean, z_log_var, z = self.encoder(inputs, training=training)
        reconstructed = self.decoder(z)
        # KL divergence regularization term, returned alongside the logits.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        # self.add_loss(kl_loss)
        return reconstructed, kl_loss

    @tf.function
    def train_step(self, batch, anneal_ph=0.0, **kwargs):
        with tf.GradientTape() as tape:
            # Clean inference
            logits, KL = self.call(inputs=batch, training=True)
            log_softmax_var = tf.nn.log_softmax(logits)

            # per-user average negative log-likelihood
            neg_ll = -tf.reduce_mean(tf.reduce_sum(
                log_softmax_var * batch, axis=-1))

            loss = neg_ll + anneal_ph * KL

        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        return loss
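
    # A possible driver loop for train_step with KL annealing (illustrative
    # sketch; `train_batches`, `n_epochs`, `anneal_cap` and `total_anneal_steps`
    # are assumed to come from the surrounding training pipeline):
    #
    #     update = 0
    #     for epoch in range(n_epochs):
    #         for batch in train_batches:
    #             beta = min(anneal_cap, update / total_anneal_steps)
    #             loss = model.train_step(batch, anneal_ph=beta)
    #             update += 1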

    @tf.function
    def predict(self, inputs, training=False, **kwargs):
        """
        Get full predictions on the whole users/items matrix.

        Returns:
            The matrix of predicted values.
        """
        logits, _ = self.call(inputs=inputs, training=training)
        log_softmax_var = tf.nn.log_softmax(logits)
        return log_softmax_var

    @tf.function
    def get_top_k(self, preds, train_mask, k=100):
        return tf.nn.top_k(tf.where(train_mask, preds, -np.inf), k=k, sorted=True)
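

# Minimal usage sketch (illustrative only): builds the model on a toy binary
# user-item matrix and runs one annealed training step followed by top-k
# recommendation. The toy data and the chosen anneal_ph value are assumptions,
# not part of the Elliot pipeline that normally drives this model.
if __name__ == "__main__":
    num_users, num_items = 8, 50
    interactions = tf.constant(
        np.random.default_rng(42).integers(0, 2, size=(num_users, num_items)).astype(np.float32))

    model = VariationalAutoEncoder(original_dim=num_items,
                                   intermediate_dim=64,
                                   latent_dim=16)

    # One gradient step with a partially annealed KL term.
    loss = model.train_step(interactions, anneal_ph=0.2)

    # Score all items and keep the top 10 unseen ones per user.
    preds = model.predict(interactions)
    unseen_mask = tf.equal(interactions, 0.0)
    top_k = model.get_top_k(preds, unseen_mask, k=10)
    print(float(loss), top_k.indices.shape)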