Source code for elliot.splitter.base_splitter

import typing as t
import pandas as pd
import numpy as np
import math
import shutil
import os

from types import SimpleNamespace

from elliot.utils.folder import create_folder_by_index

"""        
data_config:
    strategy: dataset|hierarchy|fixed
    dataset: example
    dataloader: KnowledgeChains
    dataset_path: "path"
    root_folder: "path"
    train_path: ""
    validation_path: ""
    test_path: ""
    side_information:
        feature_data: ../data/{0}/original/features.npy
        map: ../data/{0}/map.tsv
        features: ../data/{0}/features.tsv
        properties: ../data/{0}/properties.conf
    output_rec_result: ../results/{0}/recs/
    output_rec_weight: ../results/{0}/weights/
    output_rec_performance: ../results/{0}/performance/
splitting:
    save_on_disk: True
    save_folder: "path"
    test_splitting:
        strategy: fixed_timestamp|temporal_hold_out|random_subsampling|random_cross_validation
        timestamp: best|1609786061
        test_ratio: 0.2
        leave_n_out: 1
        folds: 5
    validation_splitting:
        strategy: fixed_timestamp|temporal_hold_out|random_subsampling|random_cross_validation
        timestamp: best|1609786061
        test_ratio: 0.2
        leave_n_out: 1
        folds: 5
"""
"""
Nested Cross-Validation
[(train_0,test_0), (train_1,test_1), (train_2,test_2), (train_3,test_3), (train_4,test_4)]

[([(train_0,val_0), (train_1,val_1), (train_2,val_2), (train_3,val_3), (train_4,val_4)],test_0),
([(train_0,val_0), (train_1,val_1), (train_2,val_2), (train_3,val_3), (train_4,val_4)],test_1),
([(train_0,val_0), (train_1,val_1), (train_2,val_2), (train_3,val_3), (train_4,val_4)],test_2),
([(train_0,val_0), (train_1,val_1), (train_2,val_2), (train_3,val_3), (train_4,val_4)],test_3),
([(train_0,val_0), (train_1,val_1), (train_2,val_2), (train_3,val_3), (train_4,val_4)],test_4)]

Nested Hold-Out
[(train_0,test_0)]

[([(train_0,test_0)],test_0)]
"""


class Splitter:
    """Data splitter implementing the test/validation splitting strategies documented above."""

    def __init__(self, data: pd.DataFrame, splitting_ns: SimpleNamespace, random_seed=42):
        self.random_seed = random_seed
        self.data = data
        self.splitting_ns = splitting_ns
        self.save_on_disk = False
        self.save_folder = None

    def process_splitting(self):
        np.random.seed(self.random_seed)
        data = self.data
        splitting_ns = self.splitting_ns

        if hasattr(splitting_ns, "save_on_disk"):
            if hasattr(splitting_ns, "save_folder"):
                self.save_on_disk = True
                self.save_folder = splitting_ns.save_folder
                if os.path.exists(self.save_folder):
                    shutil.rmtree(self.save_folder, ignore_errors=True)
                os.makedirs(self.save_folder)
            else:
                raise Exception("Option save_folder is missing")

        if hasattr(splitting_ns, "test_splitting"):
            # [(train_0,test_0), (train_1,test_1), (train_2,test_2), (train_3,test_3), (train_4,test_4)]
            tuple_list = self.handle_hierarchy(data, splitting_ns.test_splitting)
            if hasattr(splitting_ns, "validation_splitting"):
                exploded_train_list = []
                for single_train, single_test in tuple_list:
                    # [(train_0,val_0), (train_1,val_1), (train_2,val_2), (train_3,val_3), (train_4,val_4)]
                    train_val_test_tuples_list = self.handle_hierarchy(single_train,
                                                                       splitting_ns.validation_splitting)
                    exploded_train_list.append(train_val_test_tuples_list)
                tuple_list = self.rearrange_data(tuple_list, exploded_train_list)
                print("\nRealized a Train/Validation/Test splitting strategy\n")
            else:
                print("\nRealized a Train/Test splitting strategy\n")
        else:
            raise Exception("Test splitting strategy is not defined")

        if self.save_on_disk:
            self.store_splitting(tuple_list)

        return tuple_list

    def store_splitting(self, tuple_list):
        for i, (train_val, test) in enumerate(tuple_list):
            actual_test_folder = create_folder_by_index(self.save_folder, str(i))
            test.to_csv(os.path.abspath(os.sep.join([actual_test_folder, "test.tsv"])),
                        sep='\t', index=False, header=False)
            if isinstance(train_val, list):
                for j, (train, val) in enumerate(train_val):
                    actual_val_folder = create_folder_by_index(actual_test_folder, str(j))
                    val.to_csv(os.path.abspath(os.sep.join([actual_val_folder, "val.tsv"])),
                               sep='\t', index=False, header=False)
                    train.to_csv(os.path.abspath(os.sep.join([actual_val_folder, "train.tsv"])),
                                 sep='\t', index=False, header=False)
            else:
                train_val.to_csv(os.path.abspath(os.sep.join([actual_test_folder, "train.tsv"])),
                                 sep='\t', index=False, header=False)

    # def read_folder(self, folder_path):
    #     for root, dirs, files in os.walk(folder_path):
    #         if not dirs:
    #             # read the two files
    #             pass
    #         else:
    #             pass
    #     pass

    def handle_hierarchy(self, data: pd.DataFrame,
                         valtest_splitting_ns: SimpleNamespace) -> t.List[t.Tuple[pd.DataFrame, pd.DataFrame]]:
        if hasattr(valtest_splitting_ns, "strategy"):
            if valtest_splitting_ns.strategy == "fixed_timestamp":
                if hasattr(valtest_splitting_ns, "timestamp"):
                    if valtest_splitting_ns.timestamp.isdigit():
                        tuple_list = self.splitting_passed_timestamp(data, int(valtest_splitting_ns.timestamp))
                    elif valtest_splitting_ns.timestamp == "best":
                        kwargs = {}
                        if hasattr(valtest_splitting_ns, "min_below"):
                            kwargs["min_below"] = int(valtest_splitting_ns.min_below)
                        if hasattr(valtest_splitting_ns, "min_over"):
                            kwargs["min_over"] = int(valtest_splitting_ns.min_over)
                        tuple_list = self.splitting_best_timestamp(data, **kwargs)
                    else:
                        raise Exception("Timestamp option value is not valid")
                else:
                    raise Exception(f"Option timestamp missing for {valtest_splitting_ns.strategy} strategy")
            elif valtest_splitting_ns.strategy == "temporal_hold_out":
                if hasattr(valtest_splitting_ns, "test_ratio"):
                    tuple_list = self.splitting_temporal_holdout(data, float(valtest_splitting_ns.test_ratio))
                elif hasattr(valtest_splitting_ns, "leave_n_out"):
                    tuple_list = self.splitting_temporal_leavenout(data, int(valtest_splitting_ns.leave_n_out))
                else:
                    raise Exception(f"Option missing for {valtest_splitting_ns.strategy} strategy")
            elif valtest_splitting_ns.strategy == "random_subsampling":
                if hasattr(valtest_splitting_ns, "folds"):
                    if not str(valtest_splitting_ns.folds).isdigit():
                        raise Exception("Folds option value is not valid")
                else:
                    valtest_splitting_ns.folds = 1
                    print("Folds option value is missing. It has been set to 1")
                if hasattr(valtest_splitting_ns, "test_ratio"):
                    tuple_list = self.splitting_randomsubsampling_kfolds(data, int(valtest_splitting_ns.folds),
                                                                         float(valtest_splitting_ns.test_ratio))
                elif hasattr(valtest_splitting_ns, "leave_n_out"):
                    tuple_list = self.splitting_randomsubsampling_kfolds_leavenout(data, int(valtest_splitting_ns.folds),
                                                                                   int(valtest_splitting_ns.leave_n_out))
                else:
                    raise Exception(f"Option missing for {valtest_splitting_ns.strategy} strategy")
            elif valtest_splitting_ns.strategy == "random_cross_validation":
                if hasattr(valtest_splitting_ns, "folds"):
                    if str(valtest_splitting_ns.folds).isdigit():
                        tuple_list = self.splitting_kfolds(data, int(valtest_splitting_ns.folds))
                    else:
                        raise Exception("Folds option value is not valid")
                else:
                    raise Exception(f"Option missing for {valtest_splitting_ns.strategy} strategy")
            else:
                raise Exception(f"Unrecognized Test Strategy:\t{valtest_splitting_ns.strategy}")
        else:
            raise Exception("Strategy option not found")
        # returns a list of (train, test) dataframe pairs
        return tuple_list

    def rearrange_data(self, train_test: t.List[t.Tuple[pd.DataFrame, pd.DataFrame]],
                       train_val: t.List[t.List[t.Tuple[pd.DataFrame, pd.DataFrame]]]):
        return [(train_val[p], v[1]) for p, v in enumerate(train_test)]

    def generic_split_function(self, data: pd.DataFrame, **kwargs) -> t.List[t.Tuple[pd.DataFrame, pd.DataFrame]]:
        pass

    def fold_list_generator(self, length, folds=5):
        def infinite_looper(folds=5):
            while True:
                for f in range(folds):
                    yield f

        looper = infinite_looper(folds)
        return [next(looper) for _ in range(length)]

    def splitting_kfolds(self, data: pd.DataFrame, folds=5):
        tuple_list = []
        user_groups = data.groupby(['userId'])
        for name, group in user_groups:
            data.loc[group.index, 'fold'] = self.fold_list_generator(len(group), folds)
        data["fold"] = pd.to_numeric(data["fold"], downcast='integer')
        for i in range(folds):
            test = data[data["fold"] == i].drop(columns=["fold"]).reset_index(drop=True)
            train = data[data["fold"] != i].drop(columns=["fold"]).reset_index(drop=True)
            tuple_list.append((train, test))
        return tuple_list

    def splitting_temporal_holdout(self, d: pd.DataFrame, ratio=0.2):
        tuple_list = []
        data = d.copy()
        user_size = data.groupby(['userId'], as_index=True).size()
        user_threshold = user_size.apply(lambda x: math.floor(x * (1 - ratio)))
        data['rank_first'] = data.groupby(['userId'])['timestamp'].rank(method='first', ascending=True)
        data["test_flag"] = data.apply(lambda x: x["rank_first"] > user_threshold.loc[x["userId"]], axis=1)
        test = data[data["test_flag"] == True].drop(columns=["rank_first", "test_flag"]).reset_index(drop=True)
        train = data[data["test_flag"] == False].drop(columns=["rank_first", "test_flag"]).reset_index(drop=True)
        tuple_list.append((train, test))
        return tuple_list

    def splitting_temporal_leavenout(self, d: pd.DataFrame, n=1):
        tuple_list = []
        data = d.copy()
        data['rank_first'] = data.groupby(['userId'])['timestamp'].rank(method='first', ascending=False)
        data["test_flag"] = data.apply(lambda x: x["rank_first"] <= n, axis=1)
        test = data[data["test_flag"] == True].drop(columns=["rank_first", "test_flag"]).reset_index(drop=True)
        train = data[data["test_flag"] == False].drop(columns=["rank_first", "test_flag"]).reset_index(drop=True)
        tuple_list.append((train, test))
        return tuple_list

    def splitting_passed_timestamp(self, d: pd.DataFrame, timestamp=1):
        tuple_list = []
        data = d.copy()
        data["test_flag"] = data.apply(lambda x: x["timestamp"] >= timestamp, axis=1)
        test = data[data["test_flag"] == True].drop(columns=["test_flag"]).reset_index(drop=True)
        train = data[data["test_flag"] == False].drop(columns=["test_flag"]).reset_index(drop=True)
        tuple_list.append((train, test))
        return tuple_list

    def subsampling_list_generator(self, length, ratio=0.2):
        train = int(math.floor(length * (1 - ratio)))
        test = length - train
        list_ = [0] * train + [1] * test
        np.random.shuffle(list_)
        return list_

    def splitting_randomsubsampling_kfolds(self, d: pd.DataFrame, folds=5, ratio=0.2):
        tuple_list = []
        data = d.copy()
        user_groups = data.groupby(['userId'])
        for i in range(folds):
            for name, group in user_groups:
                data.loc[group.index, 'test_flag'] = self.subsampling_list_generator(len(group), ratio)
            data["test_flag"] = pd.to_numeric(data["test_flag"], downcast='integer')
            test = data[data["test_flag"] == 1].drop(columns=["test_flag"]).reset_index(drop=True)
            train = data[data["test_flag"] == 0].drop(columns=["test_flag"]).reset_index(drop=True)
            tuple_list.append((train, test))
        return tuple_list

    def subsampling_leavenout_list_generator(self, length, n=1):
        test = n
        train = length - test
        list_ = [0] * train + [1] * test
        np.random.shuffle(list_)
        return list_

    def splitting_randomsubsampling_kfolds_leavenout(self, d: pd.DataFrame, folds=5, n=1):
        tuple_list = []
        data = d.copy()
        user_groups = data.groupby(['userId'])
        for i in range(folds):
            for name, group in user_groups:
                data.loc[group.index, 'test_flag'] = self.subsampling_leavenout_list_generator(len(group), n)
            data["test_flag"] = pd.to_numeric(data["test_flag"], downcast='integer')
            test = data[data["test_flag"] == 1].drop(columns=["test_flag"]).reset_index(drop=True)
            train = data[data["test_flag"] == 0].drop(columns=["test_flag"]).reset_index(drop=True)
            tuple_list.append((train, test))
        return tuple_list

    def splitting_best_timestamp(self, d: pd.DataFrame, min_below=1, min_over=1):
        data = d.copy()
        unique_timestamps = data["timestamp"].unique()
        user_groups = data.groupby(['userId'])
        ts_dict = {}
        nuniques = len(unique_timestamps)
        i = 0
        for ts in unique_timestamps:
            print(nuniques - i)
            i += 1
            ts_dict[ts] = 0
            for name, group in user_groups:
                below = group[group["timestamp"] < ts]["timestamp"].count()
                over = len(group) - below
                if (below >= min_below) and (over >= min_over):
                    ts_dict[ts] += 1
        max_val = max(ts_dict.values())
        best_tie = [ts for ts, v in ts_dict.items() if v == max_val]
        max_ts = max(best_tie)
        print(f"Best Timestamp: {max_ts}")
        return self.splitting_passed_timestamp(d, max_ts)

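# Illustrative usage sketch (assumption: a toy interaction log; only the userId and
# timestamp columns are required by the splitting methods above, any other column is
# carried through unchanged).
if __name__ == "__main__":
    _toy_data = pd.DataFrame({
        "userId":    [1, 1, 1, 2, 2, 2],
        "itemId":    [10, 11, 12, 10, 13, 14],
        "rating":    [5, 4, 3, 2, 5, 4],
        "timestamp": [100, 200, 300, 150, 250, 350],
    })
    # the timestamp is passed as a string because handle_hierarchy() calls .isdigit() on it,
    # mirroring how the value arrives from the YAML configuration
    _ns = SimpleNamespace(
        test_splitting=SimpleNamespace(strategy="fixed_timestamp", timestamp="250"))
    for _train, _test in Splitter(_toy_data, _ns).process_splitting():
        print(len(_train), len(_test))  # interactions before / from the split timestamp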