outrank.core_utils
from __future__ import annotations

import csv
import glob
import json
import logging
import os
from collections import Counter
from collections import defaultdict
from dataclasses import dataclass
from typing import Any

import numpy as np
import pandas as pd
import xxhash

logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)

pro_tips = [
    'OutRank can construct subfeatures; features based on subspaces. Example command argument is: --subfeature_mapping "feature_a->feature_b;feature_c<->feature_d;feature_c<->feature_e"',
    'Heuristic MI-numba-randomized seems like the best of both worlds! (speed + performance).',
    'Heuristic surrogate-lr performs cross-validation (internally), keep that in mind!',
    'Consider running OutRank on a smaller data sample first, it might be enough (--subsampling = a lot).',
    'There are two types of combinations supported: unsupervised pairwise ranking (redundancies; --target_ranking_only=False) and supervised combinations (--interaction_order > 1).',
    'The visualization part also includes clustering - this might be very insightful!',
    'By default, OutRank includes feature cardinality and coverage in feature names (card; cov).',
    'Intermediary checkpoints (tmp_checkpoint.tsv) might already give you insights during longer runs.',
    'In theory, you can rank redundancies of combined features (--interaction_order AND --target_ranking_only=False).',
    'Give it as many threads as physically possible (--num_threads).',
    'You can speed up ranking by diminishing the feature buffer size (--combination_number_upper_bound determines how many ranking computations per batch will be considered). This, and --subsampling, are very powerful together.',
    'Want to rank feature transformations, but not sure which ones to choose? --transformers=default should serve as a solid baseline (common DS transformations included).',
    'Your target can be any feature! (explaining one feature with others)',
    'OutRank uses HyperLogLog for cardinality estimation - this is also a potential use case (understanding cardinalities across different data sets).',
    'Each feature is named as featureName(cardinality, coverage in percents) in the final files.',
    'You can generate candidate feature transformation ranges (fw) by using --task=feature_summary_transformers.',
]


def write_json_dump_to_file(args: Any, config_name: str) -> None:
    """Serialize the parsed arguments into a JSON config file"""

    out_content = json.dumps(args.__dict__)
    with open(config_name, 'w') as out_config:
        out_config.write(out_content)


def internal_hash(input_obj: str) -> str:
    """A generic internal hash used throughout the ranking procedure; the seed is fixed for reproducibility"""
    return xxhash.xxh32(input_obj, seed=20141025).hexdigest()


@dataclass
class DatasetInformationStorage:
    """A generic class for holding properties of a given type of dataset"""

    data_path: str
    column_names: list[str]
    column_types: set[str]
    col_delimiter: str | None
    encoding: str
    fw_map: dict[str, str] | None


@dataclass
class NumericFeatureSummary:
    """A generic class storing numeric feature statistics"""

    feature_name: str
    minimum: float
    maximum: float
    median: float
    num_unique: int


@dataclass
class NominalFeatureSummary:
    """A generic class storing nominal feature statistics"""

    feature_name: str
    num_unique: int


@dataclass
class BatchRankingSummary:
    """A generic class representing batched ranking results"""

    triplet_scores: list[tuple[str, str, float]]
    step_times: dict[str, Any]


def display_random_tip() -> None:
    TIP_CONTENT = np.random.choice(pro_tips)
    tip_core = f"""
=====>
Random tip: {TIP_CONTENT}
=====>
    """

    print(tip_core)


def get_dataset_info(args: Any) -> DatasetInformationStorage:
    if args.data_source == 'ob-raw-dump':
        dataset_info = parse_ob_raw_feature_information(args.data_path)

    elif args.data_source == 'ob-vw':
        dataset_info = parse_ob_vw_feature_information(args.data_path)

    elif args.data_source == 'ob-csv':
        dataset_info = parse_csv_with_description_information(args.data_path)

    elif args.data_source == 'csv-raw':
        dataset_info = parse_csv_raw(args.data_path)
    else:
        raise NotImplementedError(
            'Please select a supported data source. Possible sources: {ob-raw-dump, ob-vw, ob-csv, csv-raw}',
        )

    return dataset_info


def display_tool_name() -> None:
    tool_name = """


               *///////////////.
            //////////////////////*
          */////////////////////////.
         //////////////    */////////////
         /////////*            /////////
         //////   /////   ////,    /////
          ////////   ///    /////////
           /////   /////  ./////   ////*
            ,////                ////
              *////            ////.
                 ///////*///////


    ░█████╗░██╗░░░██╗████████╗██████╗░░█████╗░███╗░░██╗██╗░░██╗
    ██╔══██╗██║░░░██║╚══██╔══╝██╔══██╗██╔══██╗████╗░██║██║░██╔╝
    ██║░░██║██║░░░██║░░░██║░░░██████╔╝███████║██╔██╗██║█████═╝░
    ██║░░██║██║░░░██║░░░██║░░░██╔══██╗██╔══██║██║╚████║██╔═██╗░
    ╚█████╔╝╚██████╔╝░░░██║░░░██║░░██║██║░░██║██║░╚███║██║░╚██╗
    ░╚════╝░░╚═════╝░░░░╚═╝░░░╚═╝░░╚═╝╚═╝░░╚═╝╚═╝░░╚══╝╚═╝░░╚═╝


    """

    print(tool_name)


def parse_ob_line(
    line_string: str, delimiter: str = '\t', args: Any = None,
) -> list[str]:
    """Outbrain line parsing - generic TSVs"""

    line_string = line_string.strip()
    parts = line_string.split(delimiter)
    return parts


def parse_ob_line_vw(
    line_string: str,
    delimiter: str,
    args: Any = None,
    fw_col_mapping=None,
    table_header=None,
    include_namespace_info=False,
) -> list[str | None]:
    """Parse a sparse VW line into a list of values aligned with a pre-defined namespace (header)"""

    all_line_parts = line_string.strip().split('|')
    label_part = all_line_parts[0].split(' ')[0]
    remainder = all_line_parts[1:]
    label = label_part
    remainder_hash = dict()

    # Hash multi-value tuples and store name-val mappings
    for remaining_part in remainder:
        core_parts = remaining_part.strip().split(' ')
        namespace_part = core_parts[0]
        other_parts = '-'.join(x for x in core_parts[1:] if x != '')

        if namespace_part in fw_col_mapping:
            remainder_hash[fw_col_mapping[namespace_part]] = other_parts
        else:
            logging.error(f"Didn't find namespace {namespace_part}")

    # Construct the consistently-mapped instance based on the remainder mapping
    the_real_instance = [
        remainder_hash.get(
            el, None,
        ) for el in table_header[1:]
    ]
    if not include_namespace_info:
        the_real_instance = [
            x[2:] if x is not None else None for x in the_real_instance
        ]

    parts = [label] + the_real_instance

    return parts


def parse_ob_csv_line(
    line_string: str, delimiter: str = ',', args: Any = None,
) -> list[str]:
    """Parse a CSV line; data can have commas within quoted JSON field dumps"""

    clx = list(csv.reader([line_string], delimiter=delimiter)).pop()
    return clx


def generic_line_parser(
    line_string: str,
    delimiter: str,
    args: Any = None,
    fw_col_mapping: Any = None,
    table_header: Any = None,
) -> list[Any]:
    """A generic method aimed to parse data from different sources."""

    if args.data_source == 'ob-raw-dump':
        return parse_ob_line(line_string, delimiter, args)

    elif args.data_source == 'ob-vw':
        return parse_ob_line_vw(
            line_string, delimiter, args, fw_col_mapping, table_header,
        )

    elif args.data_source == 'ob-csv' or args.data_source == 'csv-raw':
        return parse_ob_csv_line(line_string, delimiter, args)

    else:
        raise NotImplementedError(
            'Please specify a valid --data_source argument!',
        )


def read_reference_json(json_path: str) -> dict[str, dict]:
    """A helper method for reading a JSON file"""
    with open(json_path) as jp:
        return json.load(jp)


def parse_namespace(namespace_path: str) -> tuple[set[str], dict[str, str]]:
    """Parse the feature namespace for type awareness"""

    float_set = set()
    id_feature_map = {}

    with open(namespace_path) as nm:
        for line in nm:
            try:
                namespace_parts = line.strip().split(',')
                if len(namespace_parts) == 2 and '_' not in namespace_parts[0]:
                    fw_id, feature = namespace_parts
                    type_name = 'generic'

                else:
                    fw_id, feature, type_name = namespace_parts

                id_feature_map[fw_id] = feature
                if type_name == 'f32':
                    float_set.add(feature)
            except Exception:
                # Skip malformed namespace lines
                continue

    return float_set, id_feature_map


def read_column_names(mapping_file: str) -> list[str]:
    """Read the column header"""

    with open(mapping_file, encoding='utf-8') as mf:
        columns = mf.read().strip().split('\t')
    return columns


def parse_ob_vw_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_descriptions = os.path.join(data_path, 'vw_namespace_map.csv')
    column_types, fw_map = parse_namespace(column_descriptions)

    # We establish column order here
    column_names = ['label'] + list(fw_map.values())

    data_path = os.path.join(data_path, 'data.vw.gz')
    col_delimiter = None
    encoding = 'utf-8'

    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, fw_map,
    )


def parse_ob_raw_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_types: list[str] = []

    # Get set of numeric columns
    table_header_path = os.path.join(data_path, 'raw_data/0_header/header.csv')
    table_header = read_column_names(table_header_path)

    data_path_train = os.path.join(data_path, 'raw_data/1_train/*')
    col_delimiter = '\t'
    encoding = 'utf-8'

    final_df = []
    core_data_folders = glob.glob(data_path_train)
    for actual_data in core_data_folders:
        for dump in glob.glob(actual_data + '/*'):
            tmp_df = pd.read_csv(
                dump, sep='\t', low_memory=True, dtype='object',
            )
            assert tmp_df.shape[1] == len(table_header)
            tmp_df.columns = table_header
            final_df.append(tmp_df)

    final_df_concat = pd.concat(final_df, axis=0)
    final_path = os.path.join(data_path, 'raw_dump.tsv')
    logging.info(
        f'Stored data dump of dimension {final_df_concat.shape} to {final_path}',
    )
    final_df_concat.to_csv(final_path, sep='\t', index=False)
    data_path = final_path

    return DatasetInformationStorage(
        data_path, table_header, set(column_types), col_delimiter, encoding, None,
    )


def parse_ob_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_descriptions = os.path.join(data_path, 'vw_namespace_map.csv')
    column_types, _ = parse_namespace(column_descriptions)

    # Get set of numeric columns
    table_header_path = os.path.join(data_path, 'raw_data/0_header/header.csv')
    table_header = read_column_names(table_header_path)

    data_path = os.path.join(data_path, 'raw_data/1_train/*')
    col_delimiter = '\t'
    encoding = 'utf-8'

    return DatasetInformationStorage(
        data_path, table_header, column_types, col_delimiter, encoding, None,
    )


def parse_csv_with_description_information(data_path) -> DatasetInformationStorage:
    dataset_description = read_reference_json(
        os.path.join(data_path, 'dataset_desc.json'),
    )
    column_names = []
    column_types = set()
    for feature in dataset_description.get('data_features', []):
        feature_name = feature.get('name')
        column_names.append(feature_name)
        feature_type = feature.get('type', '')
        if 'float' in feature_type or 'Float' in feature_type:
            column_types.add(feature_name)
    col_delimiter = ','
    data_path = os.path.join(data_path, 'data.csv')
    encoding = 'latin1'
    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, None,
    )


def parse_csv_raw(data_path) -> DatasetInformationStorage:
    column_types: set[str] = set()

    data_path = os.path.join(data_path, 'data.csv')
    with open(data_path) as inp_data:
        header = inp_data.readline()
    col_delimiter = ','
    column_names = header.strip().split(col_delimiter)
    encoding = 'latin1'
    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, None,
    )


def extract_features_from_reference_JSON(json_path: str, combined_features_only=False, all_features=False) -> set[Any]:
    """Given a model's JSON, extract unique features"""

    with open(json_path) as jp:
        content = json.load(jp)

    unique_features = set()
    feature_space = content['desc'].get('features', [])
    if all_features:
        return set(feature_space)

    fields_space = content['desc'].get('fields', [])
    joint_space = feature_space + fields_space

    if combined_features_only:
        return {feature for feature in feature_space if len(feature.split(',')) > 1}

    for feature_tuple in joint_space:
        for individual_feature in feature_tuple.split(','):
            unique_features.add(individual_feature)

    return unique_features


def summarize_feature_bounds_for_transformers(
    bounds_object_storage: Any,
    feature_types: list[str],
    task_name: str,
    label_name: str,
    granularity: int = 15,
    output_summary_table_only: bool = False,
):
    """Summarization auxiliary method for generating JSON-based transformer specs"""

    if bounds_object_storage is None:
        logging.info('Bounds storage object is empty.')
        exit()

    final_storage = defaultdict(list)
    for el in bounds_object_storage:
        if isinstance(el, dict):
            for k, v in el.items():
                final_storage[k].append(v)

    summary_table_rows = []
    for k, v in final_storage.items():
        # Conduct local aggregation + bound changes
        if k in feature_types and k != label_name:
            minima, maxima, medians, uniques = [], [], [], []
            for feature_summary in v:
                minima.append(feature_summary.minimum)
                maxima.append(feature_summary.maximum)
                medians.append(feature_summary.median)
                uniques.append(feature_summary.num_unique)
            summary_table_rows.append(
                [
                    k,
                    round(np.min(minima), 2),
                    round(np.max(maxima), 2),
                    round(np.median(medians), 2),
                    int(np.mean(uniques)),
                ],
            )

    if len(summary_table_rows) == 0:
        logging.info('No numeric features to summarize.')
        return None

    summary_table: pd.DataFrame = pd.DataFrame(summary_table_rows)
    summary_table.columns = [
        'Feature',
        'Minimum',
        'Maximum',
        'Median',
        'Num avg. unique (batch)',
    ]

    if output_summary_table_only:
        return summary_table

    if len(summary_table) == 0:
        logging.info('Summary table empty, skipping transformer generation ..')
        return

    if task_name == 'feature_summary_transformers':
        transformers_per_feature = defaultdict(list)

        # Take care of weights first -> range is pre-defined
        for k, v in final_storage.items():
            if label_name in k or 'dummy' in k:
                continue

            weight_template = {
                'feature': k,
                'src_features': [k],
                'transformations': ['Weight'],
                'weights': [0, 0.5, 1.5, 2, 3, 10],
            }
            transformers_per_feature[k].append(weight_template)

        # Consider numeric transformations - pairs and single ones
        for enx, row in summary_table.iterrows():
            if row.Feature == 'dummy':
                continue
            try:
                actual_range = (
                    np.arange(
                        row['Minimum'],
                        row['Maximum'],
                        (row['Maximum'] - row['Minimum']) / granularity,
                    )
                    .round(2)
                    .tolist()
                )
                binner_template = {
                    'feature': f'{row.Feature}',
                    'src_features': [row.Feature],
                    'transformations': [
                        'BinnerSqrt',
                        'BinnerLog',
                        'BinnerSqrtPlain',
                        'BinnerLogPlain',
                    ],
                    'n': actual_range,
                    'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                }

            except Exception as es:
                logging.info(
                    f'\U0001F631 Encountered {es}. The problematic feature is: {row}, skipping transformer for this feature ..',
                )
                # Without a valid range there is no binner spec to append
                continue

            transformers_per_feature[row.Feature].append(binner_template)

            # We want the full loop here, due to asymmetry of transformation(s)
            for enx_second, row_second in summary_table.iterrows():
                if enx_second < enx:
                    continue

                # The n values are defined based on maxima of the second feature
                if row_second.Feature != row.Feature:
                    n_bound = round(row_second['Median'] + row['Median'], 2)
                    max_bound = round(
                        min(row_second['Maximum'], row['Maximum']), 2,
                    )
                    min_bound = round(
                        row_second['Minimum'] + row['Minimum'], 2,
                    )
                    range_spectrum = sorted(
                        list(
                            {
                                0.0,
                                min_bound,
                                n_bound / 10,
                                n_bound / 5,
                                n_bound,
                                max_bound,
                            },
                        ),
                    )

                    range_spectrum = [x for x in range_spectrum if x >= 0]
                    binner_pair_template = {
                        'feature': f'{row.Feature}Ratio{row_second.Feature}',
                        'src_features': [row.Feature, row_second.Feature],
                        'transformations': ['BinnerLogRatioPlain', 'BinnerLogRatio'],
                        'n': range_spectrum,
                        'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                    }

                    binner_pair_template_second = {
                        'feature': f'{row_second.Feature}Ratio{row.Feature}',
                        'src_features': [row_second.Feature, row.Feature],
                        'transformations': ['BinnerLogRatioPlain', 'BinnerLogRatio'],
                        'n': range_spectrum,
                        'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                    }

                    transformers_per_feature[row.Feature].append(
                        binner_pair_template,
                    )
                    transformers_per_feature[row.Feature].append(
                        binner_pair_template_second,
                    )

        binner_templates = []
        for k, v in transformers_per_feature.items():
            for transformer_struct in v:
                binner_templates.append(transformer_struct)

        logging.info(
            f'Generated {len(binner_templates)} transformation search specifications.\n',
        )
        namespace_full = f'"random_grid_feature_transform": {json.dumps(binner_templates)}, "random_grid_epochs": 512'
        logging.info('Generated transformations below:\n')
        print(namespace_full)


def summarize_rare_counts(
    term_counter: Any,
    args: Any,
    cardinality_object: Any,
    object_info: DatasetInformationStorage,
) -> None:
    """Write rare values"""

    out_df_rows = []
    logging.info(
        f'Rare value summary (freq <= {args.rare_value_count_upper_bound}) follows ..',
    )

    for namespace_tuple, count in term_counter.items():
        namespace, value = namespace_tuple
        out_df_rows.append([namespace, value, count])
    out_df: pd.DataFrame = pd.DataFrame(out_df_rows)
    out_df.columns = ['Namespace', 'Value', 'Count']
    out_df.to_csv(
        os.path.join(args.output_folder, 'rare_values.tsv'), sep='\t', index=False,
    )
    logging.info(f'Wrote rare values to {args.output_folder}/rare_values.tsv')

    overall_rare_counts = Counter(out_df.Namespace.values)
    sorted_counts = sorted(
        overall_rare_counts.items(), key=lambda pair: pair[1], reverse=True,
    )
    for k, v in sorted_counts:
        logging.info(f'Namespace: {k} ---- Rare values observed: {v}')

    final_df_rows = []
    for k, v in sorted_counts:
        cardinality = len(cardinality_object[k])
        rare_proportion = np.round(100 * (v / cardinality), 2)
        col_type = 'nominal'
        if k in object_info.column_types:
            col_type = 'numeric'
        final_df_rows.append(
            {
                'rare_proportion': rare_proportion,
                'feature_type': col_type,
                'feature_name': k,
            },
        )

    final_df: pd.DataFrame = pd.DataFrame(final_df_rows)
    final_df = final_df.sort_values(by=['rare_proportion'])
    logging.info(
        f'Wrote feature sparsity summary to {args.output_folder}/feature_sparsity_summary.tsv',
    )
    final_df.to_csv(
        f'{args.output_folder}/feature_sparsity_summary.tsv', index=False, sep='\t',
    )


def is_prior_heuristic(args: Any) -> bool:
    if '-prior' in args.heuristic and args.reference_model_JSON:
        return True
    return False


def get_num_of_instances(fname: str) -> int:
    """Count the number of lines in a file, fast - useful for progress logging"""

    def _make_gen(reader):
        while True:
            b = reader(2**16)
            if not b:
                break
            yield b

    with open(fname, 'rb') as f:
        count = sum(buf.count(b'\n') for buf in _make_gen(f.raw.read))
    return count
def internal_hash(input_obj: str) -> str:
    """A generic internal hash used throughout the ranking procedure; the seed is fixed for reproducibility"""
    return xxhash.xxh32(input_obj, seed=20141025).hexdigest()
A generic internal hash used throughout the ranking procedure; the seed is fixed for reproducibility
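Example - a minimal sketch of the determinism this buys (assuming the outrank package is importable; the input string is made up):

from outrank.core_utils import internal_hash

# Identical inputs always produce identical digests (the seed is fixed),
# so hashes are stable across runs and processes.
assert internal_hash('feature_a=12') == internal_hash('feature_a=12')
print(internal_hash('feature_a=12'))  # an 8-character hex xxh32 digest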
@dataclass
class DatasetInformationStorage:
    """A generic class for holding properties of a given type of dataset"""

    data_path: str
    column_names: list[str]
    column_types: set[str]
    col_delimiter: str | None
    encoding: str
    fw_map: dict[str, str] | None
A generic class for holding properties of a given type of dataset
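Example - a hypothetical instantiation, to make the field semantics concrete (all values are made up):

from outrank.core_utils import DatasetInformationStorage

info = DatasetInformationStorage(
    data_path='/tmp/dataset/data.csv',    # where the raw data lives
    column_names=['label', 'f1', 'f2'],   # column order used downstream
    column_types={'f2'},                  # names of numeric (float) columns
    col_delimiter=',',                    # None for whitespace-delimited data
    encoding='latin1',
    fw_map=None,                          # namespace-id -> feature-name map (VW only)
)
print(info.column_names)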
@dataclass
class NumericFeatureSummary:
    """A generic class storing numeric feature statistics"""

    feature_name: str
    minimum: float
    maximum: float
    median: float
    num_unique: int
A generic class storing numeric feature statistics
@dataclass
class NominalFeatureSummary:
    """A generic class storing nominal feature statistics"""

    feature_name: str
    num_unique: int
A generic class storing nominal feature statistics
@dataclass
class BatchRankingSummary:
    """A generic class representing batched ranking results"""

    triplet_scores: list[tuple[str, str, float]]
    step_times: dict[str, Any]
A generic class representing batched ranking results
def get_dataset_info(args: Any) -> DatasetInformationStorage:
    if args.data_source == 'ob-raw-dump':
        dataset_info = parse_ob_raw_feature_information(args.data_path)

    elif args.data_source == 'ob-vw':
        dataset_info = parse_ob_vw_feature_information(args.data_path)

    elif args.data_source == 'ob-csv':
        dataset_info = parse_csv_with_description_information(args.data_path)

    elif args.data_source == 'csv-raw':
        dataset_info = parse_csv_raw(args.data_path)
    else:
        raise NotImplementedError(
            'Please select a supported data source. Possible sources: {ob-raw-dump, ob-vw, ob-csv, csv-raw}',
        )

    return dataset_info
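Example - a runnable sketch of the dispatch, using types.SimpleNamespace to stand in for parsed CLI arguments and a temporary directory for the layout that 'csv-raw' expects:

import os
import tempfile
from types import SimpleNamespace

from outrank.core_utils import get_dataset_info

with tempfile.TemporaryDirectory() as data_dir:
    # 'csv-raw' expects <data_path>/data.csv with a header row
    with open(os.path.join(data_dir, 'data.csv'), 'w') as f:
        f.write('label,f1,f2\n1,a,0.3\n')

    args = SimpleNamespace(data_source='csv-raw', data_path=data_dir)
    info = get_dataset_info(args)
    print(info.column_names)  # ['label', 'f1', 'f2']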
def display_tool_name() -> None:
    tool_name = """


               *///////////////.
            //////////////////////*
          */////////////////////////.
         //////////////    */////////////
         /////////*            /////////
         //////   /////   ////,    /////
          ////////   ///    /////////
           /////   /////  ./////   ////*
            ,////                ////
              *////            ////.
                 ///////*///////


    ░█████╗░██╗░░░██╗████████╗██████╗░░█████╗░███╗░░██╗██╗░░██╗
    ██╔══██╗██║░░░██║╚══██╔══╝██╔══██╗██╔══██╗████╗░██║██║░██╔╝
    ██║░░██║██║░░░██║░░░██║░░░██████╔╝███████║██╔██╗██║█████═╝░
    ██║░░██║██║░░░██║░░░██║░░░██╔══██╗██╔══██║██║╚████║██╔═██╗░
    ╚█████╔╝╚██████╔╝░░░██║░░░██║░░██║██║░░██║██║░╚███║██║░╚██╗
    ░╚════╝░░╚═════╝░░░░╚═╝░░░╚═╝░░╚═╝╚═╝░░╚═╝╚═╝░░╚══╝╚═╝░░╚═╝


    """

    print(tool_name)
def parse_ob_line(
    line_string: str, delimiter: str = '\t', args: Any = None,
) -> list[str]:
    """Outbrain line parsing - generic TSVs"""

    line_string = line_string.strip()
    parts = line_string.split(delimiter)
    return parts
Outbrain line parsing - generic TSVs
def parse_ob_line_vw(
    line_string: str,
    delimiter: str,
    args: Any = None,
    fw_col_mapping=None,
    table_header=None,
    include_namespace_info=False,
) -> list[str | None]:
    """Parse a sparse VW line into a list of values aligned with a pre-defined namespace (header)"""

    all_line_parts = line_string.strip().split('|')
    label_part = all_line_parts[0].split(' ')[0]
    remainder = all_line_parts[1:]
    label = label_part
    remainder_hash = dict()

    # Hash multi-value tuples and store name-val mappings
    for remaining_part in remainder:
        core_parts = remaining_part.strip().split(' ')
        namespace_part = core_parts[0]
        other_parts = '-'.join(x for x in core_parts[1:] if x != '')

        if namespace_part in fw_col_mapping:
            remainder_hash[fw_col_mapping[namespace_part]] = other_parts
        else:
            logging.error(f"Didn't find namespace {namespace_part}")

    # Construct the consistently-mapped instance based on the remainder mapping
    the_real_instance = [
        remainder_hash.get(
            el, None,
        ) for el in table_header[1:]
    ]
    if not include_namespace_info:
        the_real_instance = [
            x[2:] if x is not None else None for x in the_real_instance
        ]

    parts = [label] + the_real_instance

    return parts
Parse a sparse VW line into a list of values aligned with a pre-defined namespace (header)
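Example - a minimal sketch of the alignment logic (the namespace ids, mapping, and header are made up). Note that without include_namespace_info the first two characters of each value, i.e. the namespace prefix, are stripped:

from outrank.core_utils import parse_ob_line_vw

fw_col_mapping = {'a': 'color', 'b': 'size'}   # namespace id -> column name
table_header = ['label', 'color', 'size']      # target column order

parts = parse_ob_line_vw(
    '1 |a a_red |b b_xl', delimiter=None,
    fw_col_mapping=fw_col_mapping, table_header=table_header,
)
print(parts)  # ['1', 'red', 'xl'] - prefixes 'a_' and 'b_' stripped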
def parse_ob_csv_line(
    line_string: str, delimiter: str = ',', args: Any = None,
) -> list[str]:
    """Parse a CSV line; data can have commas within quoted JSON field dumps"""

    clx = list(csv.reader([line_string], delimiter=delimiter)).pop()
    return clx
Parse a CSV line; data can have commas within quoted JSON field dumps
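Example - why csv.reader is used instead of a plain split(','): quoted fields keep their embedded commas (the line below is made up):

from outrank.core_utils import parse_ob_csv_line

line = '1,"{""city"": ""Lisbon"", ""tier"": 2}",0.77'
print(parse_ob_csv_line(line))
# ['1', '{"city": "Lisbon", "tier": 2}', '0.77'] - the JSON dump stays one field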
def generic_line_parser(
    line_string: str,
    delimiter: str,
    args: Any = None,
    fw_col_mapping: Any = None,
    table_header: Any = None,
) -> list[Any]:
    """A generic method aimed to parse data from different sources."""

    if args.data_source == 'ob-raw-dump':
        return parse_ob_line(line_string, delimiter, args)

    elif args.data_source == 'ob-vw':
        return parse_ob_line_vw(
            line_string, delimiter, args, fw_col_mapping, table_header,
        )

    elif args.data_source == 'ob-csv' or args.data_source == 'csv-raw':
        return parse_ob_csv_line(line_string, delimiter, args)

    else:
        raise NotImplementedError(
            'Please specify a valid --data_source argument!',
        )
A generic method aimed to parse data from different sources.
def read_reference_json(json_path: str) -> dict[str, dict]:
    """A helper method for reading a JSON file"""
    with open(json_path) as jp:
        return json.load(jp)
A helper method for reading a JSON file
def parse_namespace(namespace_path: str) -> tuple[set[str], dict[str, str]]:
    """Parse the feature namespace for type awareness"""

    float_set = set()
    id_feature_map = {}

    with open(namespace_path) as nm:
        for line in nm:
            try:
                namespace_parts = line.strip().split(',')
                if len(namespace_parts) == 2 and '_' not in namespace_parts[0]:
                    fw_id, feature = namespace_parts
                    type_name = 'generic'

                else:
                    fw_id, feature, type_name = namespace_parts

                id_feature_map[fw_id] = feature
                if type_name == 'f32':
                    float_set.add(feature)
            except Exception:
                # Skip malformed namespace lines
                continue

    return float_set, id_feature_map
Parse the feature namespace for type awareness
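Example - a runnable sketch with a made-up namespace file; two-column rows get the type 'generic', and only 'f32' rows land in the float set:

import tempfile

from outrank.core_utils import parse_namespace

# Hypothetical namespace file: fw_id,feature[,type]
content = 'a,color\nb,price,f32\nc,clicks,i32\n'
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as tmp:
    tmp.write(content)

float_set, id_feature_map = parse_namespace(tmp.name)
print(float_set)       # {'price'}
print(id_feature_map)  # {'a': 'color', 'b': 'price', 'c': 'clicks'}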
def read_column_names(mapping_file: str) -> list[str]:
    """Read the column header"""

    with open(mapping_file, encoding='utf-8') as mf:
        columns = mf.read().strip().split('\t')
    return columns
Read the column header
def parse_ob_vw_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_descriptions = os.path.join(data_path, 'vw_namespace_map.csv')
    column_types, fw_map = parse_namespace(column_descriptions)

    # We establish column order here
    column_names = ['label'] + list(fw_map.values())

    data_path = os.path.join(data_path, 'data.vw.gz')
    col_delimiter = None
    encoding = 'utf-8'

    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, fw_map,
    )
A generic parser of ob-based data
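Example - a sketch of the on-disk layout this parser assumes (the file names are fixed by the function; data.vw.gz is only referenced here, not opened):

import os
import tempfile

from outrank.core_utils import parse_ob_vw_feature_information

with tempfile.TemporaryDirectory() as data_dir:
    # <data_path>/vw_namespace_map.csv drives column names and types;
    # <data_path>/data.vw.gz is where the data itself is expected later.
    with open(os.path.join(data_dir, 'vw_namespace_map.csv'), 'w') as f:
        f.write('a,color\nb,price,f32\n')

    info = parse_ob_vw_feature_information(data_dir)
    print(info.column_names)  # ['label', 'color', 'price']
    print(info.column_types)  # {'price'}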
def parse_ob_raw_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_types: list[str] = []

    # Get set of numeric columns
    table_header_path = os.path.join(data_path, 'raw_data/0_header/header.csv')
    table_header = read_column_names(table_header_path)

    data_path_train = os.path.join(data_path, 'raw_data/1_train/*')
    col_delimiter = '\t'
    encoding = 'utf-8'

    final_df = []
    core_data_folders = glob.glob(data_path_train)
    for actual_data in core_data_folders:
        for dump in glob.glob(actual_data + '/*'):
            tmp_df = pd.read_csv(
                dump, sep='\t', low_memory=True, dtype='object',
            )
            assert tmp_df.shape[1] == len(table_header)
            tmp_df.columns = table_header
            final_df.append(tmp_df)

    final_df_concat = pd.concat(final_df, axis=0)
    final_path = os.path.join(data_path, 'raw_dump.tsv')
    logging.info(
        f'Stored data dump of dimension {final_df_concat.shape} to {final_path}',
    )
    final_df_concat.to_csv(final_path, sep='\t', index=False)
    data_path = final_path

    return DatasetInformationStorage(
        data_path, table_header, set(column_types), col_delimiter, encoding, None,
    )
A generic parser of ob-based data
def parse_ob_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_descriptions = os.path.join(data_path, 'vw_namespace_map.csv')
    column_types, _ = parse_namespace(column_descriptions)

    # Get set of numeric columns
    table_header_path = os.path.join(data_path, 'raw_data/0_header/header.csv')
    table_header = read_column_names(table_header_path)

    data_path = os.path.join(data_path, 'raw_data/1_train/*')
    col_delimiter = '\t'
    encoding = 'utf-8'

    return DatasetInformationStorage(
        data_path, table_header, column_types, col_delimiter, encoding, None,
    )
A generic parser of ob-based data
def parse_csv_with_description_information(data_path) -> DatasetInformationStorage:
    dataset_description = read_reference_json(
        os.path.join(data_path, 'dataset_desc.json'),
    )
    column_names = []
    column_types = set()
    for feature in dataset_description.get('data_features', []):
        feature_name = feature.get('name')
        column_names.append(feature_name)
        feature_type = feature.get('type', '')
        if 'float' in feature_type or 'Float' in feature_type:
            column_types.add(feature_name)
    col_delimiter = ','
    data_path = os.path.join(data_path, 'data.csv')
    encoding = 'latin1'
    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, None,
    )
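Example - a sketch of the dataset_desc.json contract this parser assumes ('data_features' entries with 'name' and 'type'; a 'float'/'Float' substring in the type marks the column as numeric):

import json
import os
import tempfile

from outrank.core_utils import parse_csv_with_description_information

desc = {
    'data_features': [
        {'name': 'label', 'type': 'int'},
        {'name': 'color', 'type': 'str'},
        {'name': 'price', 'type': 'float32'},
    ],
}
with tempfile.TemporaryDirectory() as data_dir:
    with open(os.path.join(data_dir, 'dataset_desc.json'), 'w') as f:
        json.dump(desc, f)

    info = parse_csv_with_description_information(data_dir)
    print(info.column_names)  # ['label', 'color', 'price']
    print(info.column_types)  # {'price'} - only float-typed columns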
def parse_csv_raw(data_path) -> DatasetInformationStorage:
    column_types: set[str] = set()

    data_path = os.path.join(data_path, 'data.csv')
    with open(data_path) as inp_data:
        header = inp_data.readline()
    col_delimiter = ','
    column_names = header.strip().split(col_delimiter)
    encoding = 'latin1'
    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, None,
    )
def extract_features_from_reference_JSON(json_path: str, combined_features_only=False, all_features=False) -> set[Any]:
    """Given a model's JSON, extract unique features"""

    with open(json_path) as jp:
        content = json.load(jp)

    unique_features = set()
    feature_space = content['desc'].get('features', [])
    if all_features:
        return set(feature_space)

    fields_space = content['desc'].get('fields', [])
    joint_space = feature_space + fields_space

    if combined_features_only:
        return {feature for feature in feature_space if len(feature.split(',')) > 1}

    for feature_tuple in joint_space:
        for individual_feature in feature_tuple.split(','):
            unique_features.add(individual_feature)

    return unique_features
Given a model's JSON, extract unique features
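Example - a sketch of the reference-model JSON shape this extractor assumes (a 'desc' object whose 'features' and 'fields' entries are comma-joined feature tuples; the content below is made up):

import json
import tempfile

from outrank.core_utils import extract_features_from_reference_JSON

model_spec = {
    'desc': {
        'features': ['f1', 'f2,f3'],  # 'f2,f3' is a combined feature
        'fields': ['f4'],
    },
}
with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as tmp:
    json.dump(model_spec, tmp)

print(extract_features_from_reference_JSON(tmp.name))
# {'f1', 'f2', 'f3', 'f4'}
print(extract_features_from_reference_JSON(tmp.name, combined_features_only=True))
# {'f2,f3'}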
def summarize_feature_bounds_for_transformers(
    bounds_object_storage: Any,
    feature_types: list[str],
    task_name: str,
    label_name: str,
    granularity: int = 15,
    output_summary_table_only: bool = False,
):
    """Summarization auxiliary method for generating JSON-based transformer specs"""

    if bounds_object_storage is None:
        logging.info('Bounds storage object is empty.')
        exit()

    final_storage = defaultdict(list)
    for el in bounds_object_storage:
        if isinstance(el, dict):
            for k, v in el.items():
                final_storage[k].append(v)

    summary_table_rows = []
    for k, v in final_storage.items():
        # Conduct local aggregation + bound changes
        if k in feature_types and k != label_name:
            minima, maxima, medians, uniques = [], [], [], []
            for feature_summary in v:
                minima.append(feature_summary.minimum)
                maxima.append(feature_summary.maximum)
                medians.append(feature_summary.median)
                uniques.append(feature_summary.num_unique)
            summary_table_rows.append(
                [
                    k,
                    round(np.min(minima), 2),
                    round(np.max(maxima), 2),
                    round(np.median(medians), 2),
                    int(np.mean(uniques)),
                ],
            )

    if len(summary_table_rows) == 0:
        logging.info('No numeric features to summarize.')
        return None

    summary_table: pd.DataFrame = pd.DataFrame(summary_table_rows)
    summary_table.columns = [
        'Feature',
        'Minimum',
        'Maximum',
        'Median',
        'Num avg. unique (batch)',
    ]

    if output_summary_table_only:
        return summary_table

    if len(summary_table) == 0:
        logging.info('Summary table empty, skipping transformer generation ..')
        return

    if task_name == 'feature_summary_transformers':
        transformers_per_feature = defaultdict(list)

        # Take care of weights first -> range is pre-defined
        for k, v in final_storage.items():
            if label_name in k or 'dummy' in k:
                continue

            weight_template = {
                'feature': k,
                'src_features': [k],
                'transformations': ['Weight'],
                'weights': [0, 0.5, 1.5, 2, 3, 10],
            }
            transformers_per_feature[k].append(weight_template)

        # Consider numeric transformations - pairs and single ones
        for enx, row in summary_table.iterrows():
            if row.Feature == 'dummy':
                continue
            try:
                actual_range = (
                    np.arange(
                        row['Minimum'],
                        row['Maximum'],
                        (row['Maximum'] - row['Minimum']) / granularity,
                    )
                    .round(2)
                    .tolist()
                )
                binner_template = {
                    'feature': f'{row.Feature}',
                    'src_features': [row.Feature],
                    'transformations': [
                        'BinnerSqrt',
                        'BinnerLog',
                        'BinnerSqrtPlain',
                        'BinnerLogPlain',
                    ],
                    'n': actual_range,
                    'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                }

            except Exception as es:
                logging.info(
                    f'\U0001F631 Encountered {es}. The problematic feature is: {row}, skipping transformer for this feature ..',
                )
                # Without a valid range there is no binner spec to append
                continue

            transformers_per_feature[row.Feature].append(binner_template)

            # We want the full loop here, due to asymmetry of transformation(s)
            for enx_second, row_second in summary_table.iterrows():
                if enx_second < enx:
                    continue

                # The n values are defined based on maxima of the second feature
                if row_second.Feature != row.Feature:
                    n_bound = round(row_second['Median'] + row['Median'], 2)
                    max_bound = round(
                        min(row_second['Maximum'], row['Maximum']), 2,
                    )
                    min_bound = round(
                        row_second['Minimum'] + row['Minimum'], 2,
                    )
                    range_spectrum = sorted(
                        list(
                            {
                                0.0,
                                min_bound,
                                n_bound / 10,
                                n_bound / 5,
                                n_bound,
                                max_bound,
                            },
                        ),
                    )

                    range_spectrum = [x for x in range_spectrum if x >= 0]
                    binner_pair_template = {
                        'feature': f'{row.Feature}Ratio{row_second.Feature}',
                        'src_features': [row.Feature, row_second.Feature],
                        'transformations': ['BinnerLogRatioPlain', 'BinnerLogRatio'],
                        'n': range_spectrum,
                        'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                    }

                    binner_pair_template_second = {
                        'feature': f'{row_second.Feature}Ratio{row.Feature}',
                        'src_features': [row_second.Feature, row.Feature],
                        'transformations': ['BinnerLogRatioPlain', 'BinnerLogRatio'],
                        'n': range_spectrum,
                        'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                    }

                    transformers_per_feature[row.Feature].append(
                        binner_pair_template,
                    )
                    transformers_per_feature[row.Feature].append(
                        binner_pair_template_second,
                    )

        binner_templates = []
        for k, v in transformers_per_feature.items():
            for transformer_struct in v:
                binner_templates.append(transformer_struct)

        logging.info(
            f'Generated {len(binner_templates)} transformation search specifications.\n',
        )
        namespace_full = f'"random_grid_feature_transform": {json.dumps(binner_templates)}, "random_grid_epochs": 512'
        logging.info('Generated transformations below:\n')
        print(namespace_full)
Summarization auxiliary method for generating JSON-based transformer specs
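Example - bounds_object_storage is a sequence of dicts mapping feature names to per-batch NumericFeatureSummary objects; a minimal sketch that uses output_summary_table_only to obtain just the aggregated table (all numbers are made up):

from outrank.core_utils import (
    NumericFeatureSummary,
    summarize_feature_bounds_for_transformers,
)

bounds = [
    {'price': NumericFeatureSummary('price', 0.5, 90.0, 12.0, 240)},
    {'price': NumericFeatureSummary('price', 1.0, 110.0, 14.0, 260)},
]
table = summarize_feature_bounds_for_transformers(
    bounds_object_storage=bounds,
    feature_types=['price'],
    task_name='feature_summary_transformers',
    label_name='label',
    output_summary_table_only=True,
)
print(table)  # one row: min/max/median/avg-unique aggregated over batches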
def summarize_rare_counts(
    term_counter: Any,
    args: Any,
    cardinality_object: Any,
    object_info: DatasetInformationStorage,
) -> None:
    """Write rare values"""

    out_df_rows = []
    logging.info(
        f'Rare value summary (freq <= {args.rare_value_count_upper_bound}) follows ..',
    )

    for namespace_tuple, count in term_counter.items():
        namespace, value = namespace_tuple
        out_df_rows.append([namespace, value, count])
    out_df: pd.DataFrame = pd.DataFrame(out_df_rows)
    out_df.columns = ['Namespace', 'Value', 'Count']
    out_df.to_csv(
        os.path.join(args.output_folder, 'rare_values.tsv'), sep='\t', index=False,
    )
    logging.info(f'Wrote rare values to {args.output_folder}/rare_values.tsv')

    overall_rare_counts = Counter(out_df.Namespace.values)
    sorted_counts = sorted(
        overall_rare_counts.items(), key=lambda pair: pair[1], reverse=True,
    )
    for k, v in sorted_counts:
        logging.info(f'Namespace: {k} ---- Rare values observed: {v}')

    final_df_rows = []
    for k, v in sorted_counts:
        cardinality = len(cardinality_object[k])
        rare_proportion = np.round(100 * (v / cardinality), 2)
        col_type = 'nominal'
        if k in object_info.column_types:
            col_type = 'numeric'
        final_df_rows.append(
            {
                'rare_proportion': rare_proportion,
                'feature_type': col_type,
                'feature_name': k,
            },
        )

    final_df: pd.DataFrame = pd.DataFrame(final_df_rows)
    final_df = final_df.sort_values(by=['rare_proportion'])
    logging.info(
        f'Wrote feature sparsity summary to {args.output_folder}/feature_sparsity_summary.tsv',
    )
    final_df.to_csv(
        f'{args.output_folder}/feature_sparsity_summary.tsv', index=False, sep='\t',
    )
Write rare values
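Example - a runnable sketch with made-up counts; term_counter maps (namespace, value) pairs to frequencies, and cardinality_object maps each namespace to its set of observed values:

import tempfile
from collections import Counter
from types import SimpleNamespace

from outrank.core_utils import DatasetInformationStorage, summarize_rare_counts

term_counter = Counter({('color', 'teal'): 2, ('color', 'mauve'): 1, ('size', 'xxl'): 3})
cardinality_object = {'color': {'red', 'blue', 'teal', 'mauve'}, 'size': {'s', 'm', 'xxl'}}
info = DatasetInformationStorage('data.csv', ['color', 'size'], set(), ',', 'utf-8', None)

with tempfile.TemporaryDirectory() as out_dir:
    args = SimpleNamespace(output_folder=out_dir, rare_value_count_upper_bound=3)
    summarize_rare_counts(term_counter, args, cardinality_object, info)
    # writes rare_values.tsv and feature_sparsity_summary.tsv into out_dir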
def get_num_of_instances(fname: str) -> int:
    """Count the number of lines in a file, fast - useful for progress logging"""

    def _make_gen(reader):
        while True:
            b = reader(2**16)
            if not b:
                break
            yield b

    with open(fname, 'rb') as f:
        count = sum(buf.count(b'\n') for buf in _make_gen(f.raw.read))
    return count
Count the number of lines in a file, fast - useful for progress logging
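Example - a small sketch; since newline bytes are counted, a file without a trailing newline reports one line fewer than its row count:

import tempfile

from outrank.core_utils import get_num_of_instances

with tempfile.NamedTemporaryFile('w', delete=False) as tmp:
    tmp.write('label,f1\n1,a\n0,b\n')

print(get_num_of_instances(tmp.name))  # 3 newlines -> 3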