Source code for elliot.recommender.visual_recommenders.ACF.ACF_model

"""
Module description:

"""

__version__ = '0.3.1'
__author__ = 'Vito Walter Anelli, Claudio Pomo, Daniele Malitesta'
__email__ = 'vitowalter.anelli@poliba.it, claudio.pomo@poliba.it, daniele.malitesta@poliba.it'

import numpy as np
import tensorflow as tf
from tensorflow import keras


class ACFModel(keras.Model):
    def __init__(self,
                 factors=200,
                 layers_component=(64, 1),
                 layers_item=(64, 1),
                 learning_rate=0.001,
                 l_w=0,
                 feature_shape=(49, 2048),
                 num_users=100,
                 num_items=100,
                 random_seed=42,
                 name="ACF",
                 **kwargs):
        super().__init__(name=name, **kwargs)
        tf.random.set_seed(random_seed)

        self._factors = factors
        self.l_w = l_w
        self.feature_shape = feature_shape
        self._learning_rate = learning_rate
        self._num_items = num_items
        self._num_users = num_users
        self.layers_component = layers_component
        self.layers_item = layers_item

        self.initializer = tf.initializers.RandomNormal(stddev=0.01)
        self.initializer_attentive = tf.initializers.GlorotUniform()

        # User embeddings (Gu), item embeddings (Gi), and auxiliary item
        # embeddings (Pi) consumed by the item-level attention.
        self.Gu = tf.Variable(self.initializer(shape=[self._num_users, self._factors]),
                              name='Gu', dtype=tf.float32)
        self.Gi = tf.Variable(self.initializer(shape=[self._num_items, self._factors]),
                              name='Gi', dtype=tf.float32)
        self.Pi = tf.Variable(self.initializer(shape=[self._num_items, self._factors]),
                              name='Pi', dtype=tf.float32)

        self.component_weights, self.item_weights = self._build_attention_weights()

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=self._learning_rate)

    def _build_attention_weights(self):
        component_dict = dict()
        items_dict = dict()

        for c in range(len(self.layers_component)):
            if c == 0:
                # The first layer receives all component-level inputs:
                # the user embedding and the item feature components.
                component_dict['W_{}_u'.format(c)] = tf.Variable(
                    self.initializer_attentive(shape=[self._factors, self.layers_component[c]]),
                    name='W_{}_u'.format(c), dtype=tf.float32)
                component_dict['W_{}_i'.format(c)] = tf.Variable(
                    self.initializer_attentive(shape=[self.feature_shape[-1], self.layers_component[c]]),
                    name='W_{}_i'.format(c), dtype=tf.float32)
                component_dict['b_{}'.format(c)] = tf.Variable(
                    self.initializer_attentive(shape=[self.layers_component[c]]),
                    name='b_{}'.format(c), dtype=tf.float32)
            else:
                component_dict['W_{}'.format(c)] = tf.Variable(
                    self.initializer_attentive(shape=[self.layers_component[c - 1], self.layers_component[c]]),
                    name='W_{}'.format(c), dtype=tf.float32)
                component_dict['b_{}'.format(c)] = tf.Variable(
                    self.initializer_attentive(shape=[self.layers_component[c]]),
                    name='b_{}'.format(c), dtype=tf.float32)

        for i in range(len(self.layers_item)):
            if i == 0:
                # The first layer receives all item-level inputs: user
                # embedding, positive-item embeddings, their auxiliary
                # embeddings, and the attended (beta-weighted) features.
                items_dict['W_{}_u'.format(i)] = tf.Variable(
                    self.initializer_attentive(shape=[self._factors, self.layers_item[i]]),
                    name='W_{}_u'.format(i), dtype=tf.float32)
                items_dict['W_{}_iv'.format(i)] = tf.Variable(
                    self.initializer_attentive(shape=[self._factors, self.layers_item[i]]),
                    name='W_{}_iv'.format(i), dtype=tf.float32)
                items_dict['W_{}_ip'.format(i)] = tf.Variable(
                    self.initializer_attentive(shape=[self._factors, self.layers_item[i]]),
                    name='W_{}_ip'.format(i), dtype=tf.float32)
                items_dict['W_{}_ix'.format(i)] = tf.Variable(
                    self.initializer_attentive(shape=[self.feature_shape[-1], self.layers_item[i]]),
                    name='W_{}_ix'.format(i), dtype=tf.float32)
                items_dict['b_{}'.format(i)] = tf.Variable(
                    self.initializer_attentive(shape=[self.layers_item[i]]),
                    name='b_{}'.format(i), dtype=tf.float32)
            else:
                items_dict['W_{}'.format(i)] = tf.Variable(
                    self.initializer_attentive(shape=[self.layers_item[i - 1], self.layers_item[i]]),
                    name='W_{}'.format(i), dtype=tf.float32)
                items_dict['b_{}'.format(i)] = tf.Variable(
                    self.initializer_attentive(shape=[self.layers_item[i]]),
                    name='b_{}'.format(i), dtype=tf.float32)

        return component_dict, items_dict

    @tf.function
    def _calculate_beta_alpha(self, g_u, g_i, p_i, f_i):
        # Component-level attention (beta): weight the spatial components of
        # each positively rated item's visual features.
        b_i_l = tf.expand_dims(
            tf.expand_dims(tf.matmul(tf.expand_dims(g_u, 0),
                                     self.component_weights['W_{}_u'.format(0)]), 1), 1) + \
            tf.matmul(f_i, self.component_weights['W_{}_i'.format(0)]) + \
            self.component_weights['b_{}'.format(0)]
        b_i_l = tf.nn.relu(b_i_l)
        for c in range(1, len(self.layers_component)):
            b_i_l = tf.matmul(b_i_l, self.component_weights['W_{}'.format(c)]) + \
                self.component_weights['b_{}'.format(c)]
        b_i_l = tf.nn.softmax(b_i_l, axis=2)

        all_x_l = tf.reduce_sum(tf.multiply(b_i_l, f_i), axis=2)

        # Item-level attention (alpha): weight the user's positively rated
        # items given their attended feature representations.
        a_i_l = tf.expand_dims(tf.matmul(tf.expand_dims(g_u, 0),
                                         self.item_weights['W_{}_u'.format(0)]), 1) + \
            tf.matmul(tf.expand_dims(g_i, 0), self.item_weights['W_{}_iv'.format(0)]) + \
            tf.matmul(tf.expand_dims(p_i, 0), self.item_weights['W_{}_ip'.format(0)]) + \
            tf.matmul(all_x_l, self.item_weights['W_{}_ix'.format(0)]) + \
            self.item_weights['b_{}'.format(0)]
        a_i_l = tf.nn.relu(a_i_l)
        for c in range(1, len(self.layers_item)):
            a_i_l = tf.matmul(a_i_l, self.item_weights['W_{}'.format(c)]) + \
                self.item_weights['b_{}'.format(c)]
        a_i_l = tf.nn.softmax(a_i_l, axis=2)

        all_a_i_l = tf.reduce_sum(tf.multiply(a_i_l, tf.expand_dims(p_i, 0)), axis=2)

        # Attention-refined user profile.
        g_u_p = g_u + all_a_i_l

        return tf.squeeze(g_u_p)
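
    # For reference, the two-level attention above follows ACF (Chen et al.,
    # "Attentive Collaborative Filtering", SIGIR 2017), as read off the code:
    # beta is normalized over the spatial components of each positive item
    # and yields the attended features x_bar_l = sum_m beta(l, m) x_{l, m};
    # alpha is normalized over the user's positive items and yields the
    # refined profile u' = u + sum_l alpha(u, l) p_l, which call() below
    # dots with the candidate item embedding.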

    @tf.function
    def call(self, inputs, training=None, mask=None):
        # inputs: user indices, candidate item indices, the user's positively
        # rated items, and the visual features of those positive items.
        user, item, user_pos, f_u_i_pos = inputs
        gamma_u = tf.squeeze(tf.nn.embedding_lookup(self.Gu, user))
        gamma_i = tf.squeeze(tf.nn.embedding_lookup(self.Gi, item))
        p_i = tf.squeeze(tf.nn.embedding_lookup(self.Pi, item))
        gamma_i_u_pos = tf.nn.embedding_lookup(self.Gi, user_pos)
        p_i_u_pos = tf.nn.embedding_lookup(self.Pi, user_pos)

        gamma_u_p = self._calculate_beta_alpha(gamma_u, gamma_i_u_pos, p_i_u_pos, f_u_i_pos)

        xui = tf.reduce_sum(gamma_u_p * gamma_i)

        return xui, gamma_u, gamma_i, p_i

    @tf.function
    def train_step(self, batch):
        with tf.GradientTape() as t:
            user, pos, neg, user_pos, feat_pos = batch
            xu_pos, gamma_u, gamma_pos, p_i_pos = self((user, pos, user_pos, feat_pos), training=True)
            xu_neg, _, gamma_neg, p_i_neg = self((user, neg, user_pos, feat_pos), training=True)

            result = tf.clip_by_value(xu_pos - xu_neg, -80.0, 1e8)
            loss = tf.reduce_sum(tf.nn.softplus(-result))

            # Regularization component
            reg_loss = self.l_w * tf.reduce_sum([tf.nn.l2_loss(gamma_u),
                                                 tf.nn.l2_loss(gamma_pos),
                                                 tf.nn.l2_loss(gamma_neg),
                                                 tf.nn.l2_loss(p_i_pos),
                                                 tf.nn.l2_loss(p_i_neg),
                                                 *[tf.nn.l2_loss(value) for value in self.component_weights.values()],
                                                 *[tf.nn.l2_loss(value) for value in self.item_weights.values()]])

            # Loss to be optimized
            loss += reg_loss

        params = [self.Gu,
                  self.Gi,
                  self.Pi,
                  *self.component_weights.values(),
                  *self.item_weights.values()]

        grads = t.gradient(loss, params)
        self.optimizer.apply_gradients(zip(grads, params))

        return loss
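
    # Note: tf.nn.softplus(-x) == -log(sigmoid(x)), so train_step optimizes
    # the standard BPR pairwise objective
    #     L = sum_{(u, i+, i-)} -ln sigma(x_u,i+ - x_u,i-) + l_w * ||Theta||^2,
    # with the score difference clipped to [-80, 1e8] for numerical stability.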

    @tf.function
    def predict(self, user, user_pos, feat_pos):
        gamma_u = tf.squeeze(tf.nn.embedding_lookup(self.Gu, user))
        gamma_i_u_pos = tf.expand_dims(tf.nn.embedding_lookup(self.Gi, user_pos), 0)
        p_i_u_pos = tf.expand_dims(tf.nn.embedding_lookup(self.Pi, user_pos), 0)

        gamma_u_p = self._calculate_beta_alpha(gamma_u, gamma_i_u_pos, p_i_u_pos,
                                               tf.expand_dims(feat_pos, 0))

        # Score every item against the attention-refined user profile.
        return tf.squeeze(tf.matmul(tf.expand_dims(gamma_u_p, 0), self.Gi, transpose_b=True))

    @tf.function
    def get_top_k(self, preds, train_mask, k=100):
        # Scores at positions where train_mask is False are pushed to -inf
        # before ranking, so already-seen items cannot enter the top k.
        return tf.nn.top_k(tf.where(train_mask, preds, -np.inf), k=k, sorted=True)

    def get_config(self):
        raise NotImplementedError
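

# A minimal smoke test sketching how the model is driven. Every size and the
# toy batch below are illustrative assumptions, not values used by Elliot:
# one user with three positively rated items, each described by 49 spatial
# components of 16-dimensional visual features.
if __name__ == '__main__':
    model = ACFModel(factors=8, layers_component=(4, 1), layers_item=(4, 1),
                     feature_shape=(49, 16), num_users=5, num_items=10)
    user = np.array([0])                          # one user
    pos, neg = np.array([1]), np.array([4])       # positive/negative item
    user_pos = np.array([[1, 2, 3]])              # the user's rated items
    feat_pos = np.random.rand(1, 3, 49, 16).astype(np.float32)

    loss = model.train_step((user, pos, neg, user_pos, feat_pos))
    scores = model.predict(user, user_pos[0], feat_pos[0])
    print(float(loss), scores.shape)  # scalar BPR loss, (10,) scores over items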