Source code for elliot.recommender.latent_factor_models.CML.CML_model

"""
Module description:

"""

__version__ = '0.3.1'
__author__ = 'Felice Antonio Merra, Vito Walter Anelli, Claudio Pomo'
__email__ = 'felice.merra@poliba.it, vitowalter.anelli@poliba.it, claudio.pomo@poliba.it'

import os

# Must be set before TensorFlow is imported, otherwise the C++ start-up
# logs have already been emitted.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import tensorflow as tf
from tensorflow import keras


class CML_model(keras.Model):
    def __init__(self,
                 user_factors=200,
                 item_factors=200,
                 learning_rate=0.001,
                 l_w=0,
                 l_b=0,
                 margin=0.5,
                 num_users=100,
                 num_items=100,
                 random_seed=42,
                 name="CML",
                 **kwargs):
        super().__init__(name=name, **kwargs)
        tf.random.set_seed(random_seed)

        self._user_factors = user_factors
        self._item_factors = item_factors
        self._learning_rate = learning_rate
        self.l_w = l_w
        self.l_b = l_b
        self._num_items = num_items
        self._num_users = num_users

        self.initializer = tf.initializers.GlorotUniform()

        self.Gu = LatentFactor(num_instances=self._num_users,
                               dim=self._user_factors,
                               name='user_latent_factor')
        self.Gi = LatentFactor(num_instances=self._num_items,
                               dim=self._item_factors,
                               name='item_latent_factor')
        self.Bi = LatentFactor(num_instances=self._num_items,
                               dim=1,
                               name='item_bias')

        self.margin = margin

        self.optimizer = tf.optimizers.Adam(self._learning_rate)
    @tf.function
    def call(self, inputs, training=None):
        user, item = inputs
        beta_i = tf.squeeze(self.Bi(item))
        gamma_u = tf.squeeze(self.Gu(user))
        gamma_i = tf.squeeze(self.Gi(item))

        # Squared Euclidean distance between user and item embeddings,
        # reduced to one value per (user, item) pair so that adding the
        # item bias yields a per-pair score rather than a broadcast matrix.
        l2_user_pos = tf.math.reduce_sum(tf.math.square(gamma_u - gamma_i), axis=-1)

        # Closer items (smaller distance) and larger biases score higher.
        score = -l2_user_pos + beta_i

        return score, beta_i, gamma_u, gamma_i
    @tf.function
    def train_step(self, batch):
        user, pos, neg = batch
        with tf.GradientTape() as tape:
            # Clean Inference
            xu_pos, beta_pos, gamma_u, gamma_pos = self(inputs=(user, pos), training=True)
            xu_neg, beta_neg, gamma_u, gamma_neg = self(inputs=(user, neg), training=True)

            # Clip the score difference for numerical stability, then apply
            # the margin-based hinge loss over the (pos, neg) pairs.
            difference = tf.clip_by_value(xu_pos - xu_neg, -80.0, 1e8)
            loss = tf.reduce_sum(tf.maximum(self.margin - difference, 0))

            # Regularization Component
            reg_loss = self.l_w * tf.reduce_sum([tf.nn.l2_loss(gamma_u),
                                                 tf.nn.l2_loss(gamma_pos),
                                                 tf.nn.l2_loss(gamma_neg)]) \
                + self.l_b * tf.nn.l2_loss(beta_pos) \
                + self.l_b * tf.nn.l2_loss(beta_neg) / 10

            # Loss to be optimized
            loss += reg_loss

        grads = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

        return loss
    @tf.function
    def predict(self, start, stop, **kwargs):
        # Score every item for the users in [start, stop): negative squared
        # distance to each item embedding, plus the item bias.
        user_vec = self.Gu.embeddings[start:stop]
        distances = tf.math.reduce_sum(
            tf.math.square(tf.expand_dims(user_vec, axis=1) - self.Gi.embeddings),
            axis=-1)
        return -distances + tf.reshape(self.Bi.embeddings, [-1])
    @tf.function
    def get_top_k(self, predictions, train_mask, k=100):
        # Mask out training interactions before taking the k highest scores.
        return tf.nn.top_k(tf.where(train_mask, predictions, -np.inf), k=k, sorted=True)
    @tf.function
    def get_positions(self, predictions, train_mask, items, inner_test_user_true_mask):
        predictions = tf.gather(predictions, inner_test_user_true_mask)
        train_mask = tf.gather(train_mask, inner_test_user_true_mask)
        equal = tf.reshape(items, [-1, 1])
        i = tf.argsort(tf.where(train_mask, predictions, -np.inf),
                       axis=-1,
                       direction='DESCENDING',
                       stable=False)
        positions = tf.where(tf.equal(equal, i))[:, 1]
        # Normalized rank in (0, 1]: 1 means the test item is ranked first
        # among the candidate (non-training) items.
        return 1 - (positions / tf.reduce_sum(tf.cast(train_mask, tf.int64), axis=1))
    def get_config(self):
        raise NotImplementedError
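# ---------------------------------------------------------------------------
# Illustrative note (a sketch, not part of the Elliot API): train_step above
# optimizes the pairwise hinge objective of Collaborative Metric Learning,
#
#     loss = sum over triples (u, i, j) of  max(0, margin - (s(u,i) - s(u,j)))
#     with  s(u, i) = -||gamma_u - gamma_i||^2 + beta_i
#
# e.g. with margin = 0.5, a triple with s(u,i) = -0.2 and s(u,j) = -0.9
# contributes max(0, 0.5 - 0.7) = 0, while one with s(u,j) = -0.4
# contributes max(0, 0.5 - 0.2) = 0.3.
# ---------------------------------------------------------------------------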
class LatentFactor(tf.keras.layers.Embedding):
    def __init__(self, num_instances, dim, zero_init=False, name=None):
        if zero_init:
            initializer = 'zeros'
        else:
            initializer = 'uniform'
        super(LatentFactor, self).__init__(input_dim=num_instances,
                                           output_dim=dim,
                                           embeddings_initializer=initializer,
                                           name=name)
    def censor(self, censor_id):
        # Rescale the selected embeddings so that none leaves the unit ball:
        # each gathered vector is divided by max(norm, 0.1), with the floor
        # guarding against division by a near-zero norm.
        unique_censor_id, _ = tf.unique(censor_id)
        embedding_gather = tf.gather(self.variables[0], indices=unique_censor_id)
        norm = tf.norm(embedding_gather, axis=1, keepdims=True)
        return self.variables[0].scatter_nd_update(
            indices=tf.expand_dims(unique_censor_id, 1),
            updates=embedding_gather / tf.math.maximum(norm, 0.1))
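

# ---------------------------------------------------------------------------
# Minimal usage sketch (an illustration, not part of the Elliot API): the
# sizes and ids below are arbitrary toy values. It runs one training step,
# scores all items for a block of users, and applies the norm projection.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    model = CML_model(user_factors=8, item_factors=8,
                      num_users=10, num_items=20, margin=0.5)

    users = tf.constant([0, 1, 2])
    pos_items = tf.constant([3, 4, 5])   # observed (positive) items
    neg_items = tf.constant([6, 7, 8])   # sampled negative items

    loss = model.train_step((users, pos_items, neg_items))
    print('hinge loss on one toy batch:', float(loss))

    scores = model.predict(0, 3)         # shape: (3 users, 20 items)
    print('prediction matrix shape:', scores.shape)

    # Keep the updated item embeddings inside the unit ball, as CML requires.
    model.Gi.censor(tf.concat([pos_items, neg_items], axis=0))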