Source code for elliot.recommender.latent_factor_models.FISM.FISM_model

"""
Module description:
TensorFlow implementation of FISM (Factored Item Similarity Models) for top-N recommendation.
"""

__version__ = '0.3.1'
__author__ = 'Vito Walter Anelli, Claudio Pomo, Daniele Malitesta'
__email__ = 'vitowalter.anelli@poliba.it, claudio.pomo@poliba.it, daniele.malitesta@poliba.it'

import os
import numpy as np
import tensorflow as tf
from tensorflow import keras

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'


class FISM_model(keras.Model):
    def __init__(self, data, factors, lr, alpha, beta, lambda_, gamma,
                 num_users, num_items, random_seed, name="FISM", **kwargs):
        super().__init__(name=name, **kwargs)
        tf.random.set_seed(random_seed)

        self._data = data
        self._factors = factors
        self._lr = lr
        self._beta = beta
        self._lambda = lambda_
        self._gamma = gamma
        self._alpha = alpha
        self._num_users = num_users
        self._num_items = num_items

        self._history_item_matrix, self._history_lens, self._mask_history_matrix = self.create_history_item_matrix()

        self.initializer = tf.keras.initializers.RandomUniform(-0.001, 0.001)

        self.Bi = tf.Variable(tf.zeros(self._num_items), name='Bi', dtype=tf.float32)
        self.Bu = tf.Variable(tf.zeros(self._num_users), name='Bu', dtype=tf.float32)
        self.Gi = tf.Variable(self.initializer(shape=[self._num_items, self._factors]), name='Gi', dtype=tf.float32)
        self.Gj = tf.Variable(self.initializer(shape=[self._num_items, self._factors]), name='Gj', dtype=tf.float32)

        self.optimizer = tf.optimizers.Adam(self._lr)
        self.loss = keras.losses.BinaryCrossentropy()

    @tf.function
    def call(self, inputs, training=None):
        user, item = inputs
        user_inter = tf.nn.embedding_lookup(self._history_item_matrix, user)
        item_num = tf.nn.embedding_lookup(self._history_lens, user)
        batch_mask_mat = tf.nn.embedding_lookup(self._mask_history_matrix, user)
        user_history = tf.squeeze(tf.nn.embedding_lookup(self.Gi, user_inter))  # batch_size x max_len x embedding_size
        target = tf.squeeze(tf.nn.embedding_lookup(self.Gj, item))  # batch_size x embedding_size
        user_bias = tf.squeeze(tf.nn.embedding_lookup(self.Bu, user))  # batch_size x 1
        item_bias = tf.squeeze(tf.nn.embedding_lookup(self.Bi, item))
        similarity = tf.squeeze(tf.matmul(user_history, tf.expand_dims(target, axis=2)))
        similarity = tf.convert_to_tensor(batch_mask_mat, dtype=tf.float32) * similarity
        coeff = tf.pow(item_num, -tf.convert_to_tensor(self._alpha))
        scores = 1 / (1 + tf.math.exp(-(coeff * tf.reduce_sum(similarity, axis=1) + user_bias + item_bias)))
        return scores, user_bias, item_bias, user_history, target

    @tf.function
    def train_step(self, batch):
        user, pos, label = batch
        with tf.GradientTape() as tape:
            # Clean Inference
            output, user_bias, item_bias, source, target = self(inputs=(user, pos), training=True)
            reg_loss = (self._lambda * tf.nn.l2_loss(user_bias)
                        + self._gamma * tf.nn.l2_loss(item_bias)
                        + self._beta * tf.reduce_sum([tf.nn.l2_loss(source), tf.nn.l2_loss(target)]))
            reg_loss = tf.cast(reg_loss, tf.float64)
            loss = self.loss(output, label) + reg_loss

        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        return loss

    @tf.function
    def predict(self, user, **kwargs):
        # user_inters = self._history_item_matrix[user]
        user_inters = tf.nn.embedding_lookup(self._history_item_matrix, user)
        # item_num = self._history_lens[user]
        item_num = tf.nn.embedding_lookup(self._history_lens, user)
        # user_input = user_inters[:item_num]
        user_input = user_inters[:]
        repeats = self._num_items
        user_bias = tf.squeeze(tf.nn.embedding_lookup(self.Bu, user))
        item_num = tf.repeat(item_num, repeats)
        user_history = tf.squeeze(tf.nn.embedding_lookup(self.Gi, user_input))  # inter_num x embedding_size
        user_history = tf.ones([repeats, 1, 1]) * user_history  # target_items x inter_num x embedding_size
        targets = self.Gj  # target_items x embedding_size
        item_bias = self.Bi
        similarity = tf.squeeze(tf.matmul(user_history, tf.expand_dims(targets, axis=2)))
        coeff = tf.pow(tf.cast(item_num, tf.float32), -self._alpha)
        scores = 1 / (1 + tf.math.exp(-(coeff * tf.reduce_sum(similarity, axis=1) + user_bias + item_bias)))
        return scores

    @tf.function
    def batch_predict(self, user_start, user_stop, **kwargs):
        user_inters = tf.nn.embedding_lookup(self._history_item_matrix, range(user_start, user_stop))
        item_num = tf.nn.embedding_lookup(self._history_lens, range(user_start, user_stop))
        user_input = user_inters[:]
        repeats = self._num_items
        user_bias = tf.repeat(tf.nn.embedding_lookup(self.Bu, range(user_start, user_stop)), repeats)
        item_num = tf.repeat(item_num, repeats)
        user_history = tf.squeeze(tf.nn.embedding_lookup(self.Gi, user_input))  # bs x inter_num x embedding_size
        user_history = tf.ones([repeats, 1, 1, 1]) * user_history  # bs x target_items x inter_num x embedding_size
        user_history = tf.reshape(user_history, [user_history.shape[1], user_history.shape[0],
                                                 user_history.shape[2], user_history.shape[3]])
        targets = self.Gj  # target_items x embedding_size
        item_bias = self.Bi
        similarity = tf.squeeze(tf.matmul(user_history, tf.expand_dims(targets, axis=2)))
        batch_eval = similarity.shape[0]
        # coeff = tf.pow(tf.cast(item_num, tf.float32), -self._alpha)
        # scores = 1 / (1 + tf.math.exp(-(coeff * tf.reduce_sum(similarity, axis=1) + user_bias + item_bias)))
        coeff = tf.reshape(tf.pow(tf.cast(item_num, tf.float32), -self._alpha), [batch_eval, self._num_items])
        prod = coeff * tf.reduce_sum(similarity, axis=2)
        return 1 / (1 + tf.math.exp(-(prod + tf.reshape(user_bias, prod.shape) + item_bias)))

    def create_history_item_matrix(self):
        """Build, from the sparse training matrix, the per-user item-history matrix
        (padded to the longest history), the history lengths, and the padding mask."""
        user_ids, item_ids = self._data.sp_i_train.nonzero()[0], self._data.sp_i_train.nonzero()[1]
        row_num, max_col_num = self._num_users, self._num_items
        row_ids, col_ids = user_ids, item_ids
        history_len = np.zeros(row_num, dtype=np.int64)
        for row_id in row_ids:
            history_len[row_id] += 1

        col_num = np.max(history_len)

        history_matrix = np.zeros((row_num, col_num), dtype=np.int64)
        mask_history_matrix = np.zeros((row_num, col_num), dtype=np.int64)
        history_len[:] = 0
        for row_id, col_id in zip(row_ids, col_ids):
            history_matrix[row_id, history_len[row_id]] = col_id
            mask_history_matrix[row_id, history_len[row_id]] = 1
            history_len[row_id] += 1

        # return history_matrix, history_len, mask_history_matrix
        return tf.Variable(history_matrix), tf.Variable(history_len, dtype=tf.float32), tf.Variable(mask_history_matrix, dtype=tf.float32)

    @tf.function
    def get_top_k(self, predictions, train_mask, k=100):
        return tf.nn.top_k(tf.where(train_mask, predictions, -np.inf), k=k, sorted=True)

    @tf.function
    def get_positions(self, predictions, train_mask, items, inner_test_user_true_mask):
        predictions = tf.gather(predictions, inner_test_user_true_mask)
        train_mask = tf.gather(train_mask, inner_test_user_true_mask)
        equal = tf.reshape(items, [len(items), 1])
        i = tf.argsort(tf.where(train_mask, predictions, -np.inf), axis=-1,
                       direction='DESCENDING', stable=False, name=None)
        positions = tf.where(tf.equal(equal, i))[:, 1]
        return 1 - (positions / tf.reduce_sum(tf.cast(train_mask, tf.int64), axis=1))

    def get_config(self):
        raise NotImplementedError

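What follows is a minimal usage sketch rather than part of the original module: the toy interaction matrix, the SimpleNamespace stand-in for Elliot's data object (exposing only the sp_i_train attribute read by create_history_item_matrix), and every hyper-parameter value are assumptions chosen purely to show how the class is constructed and queried.

# Usage sketch (illustrative assumptions: toy data, arbitrary hyper-parameters).
from types import SimpleNamespace

import scipy.sparse as sp

toy_interactions = sp.csr_matrix(np.array([[1, 0, 1, 0],
                                           [0, 1, 1, 1],
                                           [1, 1, 0, 0]]))
toy_data = SimpleNamespace(sp_i_train=toy_interactions)  # stand-in exposing only `sp_i_train`

fism = FISM_model(data=toy_data, factors=8, lr=0.001, alpha=0.5,
                  beta=0.001, lambda_=0.001, gamma=0.001,
                  num_users=3, num_items=4, random_seed=42)

scores = fism.predict(0)                                  # scores over all 4 items for user 0
values, ids = fism.get_top_k(scores, tf.ones(4, dtype=tf.bool), k=2)
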
class LatentFactor(tf.keras.layers.Embedding):
    def __init__(self, num_instances, dim, zero_init=False, name=None):
        if zero_init:
            initializer = 'zeros'
        else:
            initializer = 'uniform'
        super(LatentFactor, self).__init__(input_dim=num_instances, output_dim=dim,
                                           embeddings_initializer=initializer, name=name)

    def censor(self, censor_id):
        """Rescale in place the embedding rows for the given ids, dividing each by
        max(norm, 0.1) so that no selected row keeps a norm larger than one."""
        unique_censor_id, _ = tf.unique(censor_id)
        embedding_gather = tf.gather(self.variables[0], indices=unique_censor_id)
        norm = tf.norm(embedding_gather, axis=1, keepdims=True)
        return self.variables[0].scatter_nd_update(indices=tf.expand_dims(unique_censor_id, 1),
                                                   updates=embedding_gather / tf.math.maximum(norm, 0.1))

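A similarly illustrative sketch for LatentFactor, again not part of the original module; the instance count, dimensionality, and ids below are arbitrary assumptions.

# LatentFactor usage sketch (illustrative only).
item_factors = LatentFactor(num_instances=100, dim=16, name='item_latent_factor')
vectors = item_factors(tf.constant([3, 7, 42]))  # (3, 16) lookup; also builds the layer

# Rescale the rows that were just used: each selected embedding is divided by
# max(norm, 0.1), bounding its norm by one.
item_factors.censor(tf.constant([3, 7, 42]))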