outrank.core_utils

from __future__ import annotations

import csv
import glob
import json
import logging
import os
from collections import Counter
from collections import defaultdict
from dataclasses import dataclass
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

import numpy as np
import pandas as pd
import xxhash

logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)

pro_tips = [
    'OutRank can construct subfeatures; features based on subspaces. Example command argument is: --subfeature_mapping "feature_a->feature_b;feature_c<->feature_d;feature_c<->feature_e"',
    'Heuristic MI-numba-randomized seems like the best of both worlds! (speed + performance).',
    'Heuristic surrogate-lr performs cross-validation (internally), keep that in mind!',
    'Consider running OutRank on a smaller data sample first, might be enough (--subsampling = a lot).',
    'There are two types of combinations supported; unsupervised pairwise ranking (redundancies- --target_ranking_only=False), and supervised combinations - (--interaction_order > 1)',
    'Visualization part also includes clustering - this might be very insightful!',
    'By default OutRank includes feature cardinality and coverage in feature names (card; cov)',
    'Intermediary checkpoints (tmp_checkpoint.tsv) might already give you insights during longer runs.',
    'In theory, you can rank redundancies of combined features (--interaction_order AND --target_ranking_only=False).',
    'Give it as many threads as physically possible (--num_threads).',
    'You can speed up ranking by diminishing feature buffer size (--combination_number_upper_bound determines how many ranking computations per batch will be considered). This, and --subsampling are very powerful together.',
    'Want to rank feature transformations, but not sure which ones to choose? --transformers=default should serve as a solid baseline (common DS transformations included).',
    'Your target can be any feature! (explaining one feature with others)',
    'OutRank uses HyperLogLog for cardinality estimation - this is also a potential usecase (understanding cardinalities across different data sets).',
    'Each feature is named as featureName(cardinality, coverage in percents) in the final files.',
    'You can generate candidate feature transformation ranges (fw) by using --task=feature_summary_transformers.',
]


def write_json_dump_to_file(args: Any, config_name: str) -> None:

    out_content = json.dumps(args.__dict__)
    with open(config_name, 'w') as out_config:
        out_config.write(out_content)


def internal_hash(input_obj: str) -> str:
    """A generic internal hash used throughout the ranking procedure; the seed is hardcoded so hashes are reproducible."""
    return xxhash.xxh32(input_obj, seed=20141025).hexdigest()
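
# Illustrative property (hypothetical input string): because the seed is fixed, equal
# inputs always produce the same digest across batches and processes, which keeps
# value encodings consistent throughout a ranking run.
# >>> internal_hash('feature_a=some_value') == internal_hash('feature_a=some_value')
# True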


@dataclass
class DatasetInformationStorage:
    """A generic class for holding properties of a given type of dataset"""

    data_path: str
    column_names: list[str]
    column_types: set[str]
    col_delimiter: str | None
    encoding: str
    fw_map: dict[str, str] | None


@dataclass
class NumericFeatureSummary:
    """A generic class storing numeric feature statistics"""

    feature_name: str
    minimum: float
    maximum: float
    median: float
    num_unique: int


@dataclass
class NominalFeatureSummary:
    """A generic class storing nominal feature statistics"""

    feature_name: str
    num_unique: int


@dataclass
class BatchRankingSummary:
    """A generic class representing batched ranking results"""

    triplet_scores: list[tuple[str, str, float]]
    step_times: dict[str, Any]


def display_random_tip() -> None:
    TIP_CONTENT = np.random.choice(pro_tips)
    tip_core = f"""
=====>
Random tip: {TIP_CONTENT}
=====>
    """

    print(tip_core)


def get_dataset_info(args: Any):
    if args.data_source == 'ob-raw-dump':
        dataset_info = parse_ob_raw_feature_information(args.data_path)

    elif args.data_source == 'ob-vw':
        dataset_info = parse_ob_vw_feature_information(args.data_path)

    elif args.data_source == 'ob-csv':
        dataset_info = parse_csv_with_description_information(args.data_path)

    elif args.data_source == 'csv-raw':
        dataset_info = parse_csv_raw(args.data_path)
    else:
        raise NotImplementedError(
            'Please, select a supported data source. Possible sources: {csv-raw, ob-csv, ob-vw, ob-raw-dump}',
        )

    return dataset_info
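
# Illustrative call (hypothetical path; argparse.Namespace stands in for the parsed
# CLI arguments this function normally receives):
# >>> from argparse import Namespace
# >>> info = get_dataset_info(Namespace(data_source='csv-raw', data_path='/path/to/dataset'))
# >>> info.col_delimiter
# ','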


def display_tool_name() -> None:
    tool_name = """


                        *///////////////.
                     //////////////////////*
                   */////////////////////////.
                  ////////////// */////////////
                  /////////*          /////////
                 //////   /////   ////,   /////
                  ////////     ///    /////////
                  /////   /////  ./////   ////*
                   ,////                 ////
                     *////             ////.
                         ///////*///////


    ░█████╗░██╗░░░██╗████████╗██████╗░░█████╗░███╗░░██╗██╗░░██╗
    ██╔══██╗██║░░░██║╚══██╔══╝██╔══██╗██╔══██╗████╗░██║██║░██╔╝
    ██║░░██║██║░░░██║░░░██║░░░██████╔╝███████║██╔██╗██║█████═╝░
    ██║░░██║██║░░░██║░░░██║░░░██╔══██╗██╔══██║██║╚████║██╔═██╗░
    ╚█████╔╝╚██████╔╝░░░██║░░░██║░░██║██║░░██║██║░╚███║██║░╚██╗
    ░╚════╝░░╚═════╝░░░░╚═╝░░░╚═╝░░╚═╝╚═╝░░╚═╝╚═╝░░╚══╝╚═╝░░╚═╝


    """

    print(tool_name)


def parse_ob_line(
    line_string: str, delimiter: str = '\t', args: Any = None,
) -> list[str]:
    """Outbrain line parsing - generic TSVs"""

    line_string = line_string.strip()
    parts = line_string.split(delimiter)
    return parts


def parse_ob_line_vw(
    line_string: str,
    delimiter: str,
    args: Any = None,
    fw_col_mapping=None,
    table_header=None,
    include_namespace_info=False,
) -> list[str | None]:
    """Parse a sparse VW line into a list of values aligned with a pre-defined namespace header"""

    all_line_parts = line_string.strip().split('|')
    label_part = all_line_parts[0].split(' ')[0]
    remainder = all_line_parts[1:]
    label = label_part
    remainder_hash = dict()

    # Hash multi-value tuples and store name-val mappings
    for remaining_part in remainder:
        core_parts = remaining_part.strip().split(' ')
        namespace_part = core_parts[0]
        other_parts = '-'.join(x for x in core_parts[1:] if x != '')

        if namespace_part in fw_col_mapping:
            remainder_hash[fw_col_mapping[namespace_part]] = other_parts
        else:
            logging.error(f"Didn't find namespace {namespace_part}")

    # Construct the consistently-mapped instance based on the remainder mapping
    the_real_instance = [
        remainder_hash.get(
            el, None,
        ) for el in table_header[1:]
    ]
    if not include_namespace_info:
        the_real_instance = [
            x[2:] if x is not None else None for x in the_real_instance
        ]

    parts = [label] + the_real_instance

    return parts
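
# Illustrative parse (hypothetical namespaces and header): with include_namespace_info=False
# the two-character namespace prefix (here 'a_' / 'b_') is stripped from each joined value.
# >>> parse_ob_line_vw(
# ...     '1 |a a_v1 |b b_v2 b_v3', '\t',
# ...     fw_col_mapping={'a': 'feature_a', 'b': 'feature_b'},
# ...     table_header=['label', 'feature_a', 'feature_b'],
# ... )
# ['1', 'v1', 'v2-b_v3']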


def parse_ob_csv_line(
    line_string: str, delimiter: str = ',', args: Any = None,
) -> list[str]:
    """Data can have commas within JSON field dumps"""

    clx = list(csv.reader([line_string], delimiter=delimiter)).pop()
    return clx
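
# Illustrative parse (hypothetical row): commas inside a quoted JSON dump are preserved,
# which a naive str.split(',') would break apart.
# >>> parse_ob_csv_line('a,"{""key"": 1, ""other"": 2}",c')
# ['a', '{"key": 1, "other": 2}', 'c']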


def generic_line_parser(
    line_string: str,
    delimiter: str,
    args: Any = None,
    fw_col_mapping: Any = None,
    table_header: Any = None,
) -> list[Any]:
    """A generic method for parsing data from different sources."""

    if args.data_source == 'ob-raw-dump':
        return parse_ob_line(line_string, delimiter, args)

    elif args.data_source == 'ob-vw':
        return parse_ob_line_vw(
            line_string, delimiter, args, fw_col_mapping, table_header,
        )

    elif args.data_source == 'ob-csv' or args.data_source == 'csv-raw':
        return parse_ob_csv_line(line_string, delimiter, args)

    else:
        raise NotImplementedError(
            'Please, specify a valid --data_source argument!',
        )


def read_reference_json(json_path) -> dict[str, dict]:
    """A helper method for reading a JSON"""
    with open(json_path) as jp:
        return json.load(jp)


def parse_namespace(namespace_path: str) -> tuple[set[str], dict[str, str]]:
    """Parse the feature namespace for type awareness"""

    float_set = set()
    id_feature_map = {}

    with open(namespace_path) as nm:
        for line in nm:
            try:
                namespace_parts = line.strip().split(',')
                if len(namespace_parts) == 2 and '_' not in namespace_parts[0]:
                    fw_id, feature = namespace_parts
                    type_name = 'generic'

                else:
                    fw_id, feature, type_name = namespace_parts

                id_feature_map[fw_id] = feature
                if type_name == 'f32':
                    float_set.add(feature)
            except Exception:
                # Skip malformed namespace rows
                continue

    return float_set, id_feature_map
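
# Illustrative vw_namespace_map.csv rows (hypothetical ids and names); the format is
# "<fw_id>,<feature_name>[,<type>]" and only features typed 'f32' enter the float set:
#     A,feature_one,f32
#     B,feature_two
# would yield ({'feature_one'}, {'A': 'feature_one', 'B': 'feature_two'}).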


def read_column_names(mapping_file: str) -> list[str]:
    """Read the col. header"""

    with open(mapping_file, encoding='utf-8') as mf:
        columns = mf.read().strip().split('\t')
    return columns


def parse_ob_vw_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_descriptions = os.path.join(data_path, 'vw_namespace_map.csv')
    column_types, fw_map = parse_namespace(column_descriptions)

    # We establish column order here
    column_names = ['label'] + list(fw_map.values())

    data_path = os.path.join(data_path, 'data.vw.gz')
    col_delimiter = None
    encoding = 'utf-8'

    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, fw_map,
    )
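
# Note (directory layout assumed by the parser above): <data_path>/vw_namespace_map.csv holds
# the namespace-to-feature mapping and <data_path>/data.vw.gz holds the instances; col_delimiter
# stays None because VW lines are split by parse_ob_line_vw rather than on a single character.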


def parse_ob_raw_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_types: list[str] = []

    # Get set of numeric columns
    table_header_path = os.path.join(data_path, 'raw_data/0_header/header.csv')
    table_header = read_column_names(table_header_path)

    data_path_train = os.path.join(data_path, 'raw_data/1_train/*')
    col_delimiter = '\t'
    encoding = 'utf-8'

    final_df = []
    core_data_folders = glob.glob(data_path_train)
    for actual_data in core_data_folders:
        for dump in glob.glob(actual_data + '/*'):
            tmp_df = pd.read_csv(
                dump, sep='\t', low_memory=True, dtype='object',
            )
            assert tmp_df.shape[1] == len(table_header)
            tmp_df.columns = table_header
            final_df.append(tmp_df)

    final_df_concat = pd.concat(final_df, axis=0)
    final_path = os.path.join(data_path, 'raw_dump.tsv')
    logging.info(
        f'Stored data dump of dimension {final_df_concat.shape} to {final_path}',
    )
    final_df_concat.to_csv(final_path, sep='\t', index=False)
    data_path = os.path.join(data_path, 'raw_dump.tsv')

    return DatasetInformationStorage(
        data_path, table_header, set(column_types), col_delimiter, encoding, None,
    )


def parse_ob_feature_information(data_path) -> DatasetInformationStorage:
    """A generic parser of ob-based data"""

    # Get column names
    column_descriptions = os.path.join(data_path, 'vw_namespace_map.csv')
    column_types, _ = parse_namespace(column_descriptions)

    # Get set of numeric columns
    table_header_path = os.path.join(data_path, 'raw_data/0_header/header.csv')
    table_header = read_column_names(table_header_path)

    data_path = os.path.join(data_path, 'raw_data/1_train/*')
    col_delimiter = '\t'
    encoding = 'utf-8'

    return DatasetInformationStorage(
        data_path, table_header, column_types, col_delimiter, encoding, None,
    )


def parse_csv_with_description_information(data_path) -> DatasetInformationStorage:
    dataset_description = read_reference_json(
        os.path.join(data_path, 'dataset_desc.json'),
    )
    column_names = []
    column_types = set()
    for feature in dataset_description.get('data_features', []):
        feature_name = feature.get('name')
        column_names.append(feature_name)
        feature_type = feature.get('type', '')
        if 'float' in feature_type or 'Float' in feature_type:
            column_types.add(feature_name)
    col_delimiter = ','
    data_path = os.path.join(data_path, 'data.csv')
    encoding = 'latin1'
    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, None,
    )
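
# Illustrative dataset_desc.json (hypothetical feature entries); any feature whose declared
# type mentions 'float'/'Float' is treated as numeric, and data is read from <data_path>/data.csv:
# {"data_features": [{"name": "f1", "type": "float64"}, {"name": "f2", "type": "string"}]}
# -> column_names == ['f1', 'f2'], column_types == {'f1'}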


def parse_csv_raw(data_path) -> DatasetInformationStorage:
    column_types: set[str] = set()

    data_path = os.path.join(data_path, 'data.csv')
    with open(data_path) as inp_data:
        header = inp_data.readline()
    col_delimiter = ','
    column_names = header.strip().split(col_delimiter)
    encoding = 'latin1'
    return DatasetInformationStorage(
        data_path, column_names, column_types, col_delimiter, encoding, None,
    )


def extract_features_from_reference_JSON(json_path: str, combined_features_only=False, all_features=False) -> set[Any]:
    """Given a model's JSON, extract unique features"""

    with open(json_path) as jp:
        content = json.load(jp)

    unique_features = set()
    feature_space = content['desc'].get('features', [])
    if all_features:
        return set(feature_space)

    fields_space = content['desc'].get('fields', [])
    joint_space = feature_space + fields_space

    if combined_features_only:
        return {feature for feature in feature_space if len(feature.split(',')) > 1}

    for feature_tuple in joint_space:
        for individual_feature in feature_tuple.split(','):
            unique_features.add(individual_feature)

    return unique_features
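
# Illustrative reference JSON (hypothetical feature names); combined features are stored as
# comma-delimited entries, e.g. {"desc": {"features": ["f1", "f2,f3"], "fields": ["f4"]}}:
# extracting everything yields {'f1', 'f2', 'f3', 'f4'}, while combined_features_only=True
# keeps only {'f2,f3'}.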


def summarize_feature_bounds_for_transformers(
    bounds_object_storage: Any,
    feature_types: list[str],
    task_name: str,
    label_name: str,
    granularity: int = 15,
    output_summary_table_only: bool = False,
):
    """Summarization auxiliary method for generating JSON-based specs"""

    if bounds_object_storage is None:
        logging.info('Bounds storage object is empty.')
        exit()

    final_storage = defaultdict(list)
    for el in bounds_object_storage:
        if isinstance(el, dict):
            for k, v in el.items():
                final_storage[k].append(v)

    summary_table_rows = []
    for k, v in final_storage.items():
        # Conduct local aggregation + bound changes
        if k in feature_types and k != label_name:
            minima, maxima, medians, uniques = [], [], [], []
            for feature_summary in v:
                minima.append(feature_summary.minimum)
                maxima.append(feature_summary.maximum)
                medians.append(feature_summary.median)
                uniques.append(feature_summary.num_unique)
            summary_table_rows.append(
                [
                    k,
                    round(np.min(minima), 2),
                    round(np.max(maxima), 2),
                    round(np.median(medians), 2),
                    int(np.mean(uniques)),
                ],
            )

    if len(summary_table_rows) == 0:
        logging.info('No numeric features to summarize.')
        return None

    summary_table: pd.DataFrame = pd.DataFrame(summary_table_rows)
    summary_table.columns = [
        'Feature',
        'Minimum',
        'Maximum',
        'Median',
        'Num avg. unique (batch)',
    ]

    if output_summary_table_only:
        return summary_table

    if len(summary_table) == 0:
        logging.info('Summary table empty, skipping transformer generation ..')
        return

    if task_name == 'feature_summary_transformers':
        transformers_per_feature = defaultdict(list)

        # Take care of weights first -> range is pre-defined
        for k, v in final_storage.items():
            if label_name in k or 'dummy' in k:
                continue

            weight_template = {
                'feature': k,
                'src_features': [k],
                'transformations': ['Weight'],
                'weights': [0, 0.5, 1.5, 2, 3, 10],
            }
            transformers_per_feature[k].append(weight_template)

        # Consider numeric transformations - pairs and single ones
        for enx, row in summary_table.iterrows():
            if row.Feature == 'dummy':
                continue
            try:
                actual_range = (
                    np.arange(
                        row['Minimum'],
                        row['Maximum'],
                        (row['Maximum'] - row['Minimum']) / granularity,
                    )
                    .round(2)
                    .tolist()
                )
                binner_template = {
                    'feature': f'{row.Feature}',
                    'src_features': [row.Feature],
                    'transformations': [
                        'BinnerSqrt',
                        'BinnerLog',
                        'BinnerSqrtPlain',
                        'BinnerLogPlain',
                    ],
                    'n': actual_range,
                    'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                }

            except Exception as es:
                logging.info(
                    f'\U0001F631 Encountered {es}. The problematic feature is: {row}, skipping transformer for this feature ..',
                )
                # Without a valid template there is nothing to append for this feature
                continue

            transformers_per_feature[row.Feature].append(binner_template)

            # We want the full loop here, due to asymmetry of transformation(s)
            for enx_second, row_second in summary_table.iterrows():
                if enx_second < enx:
                    continue

                # The n values are defined based on maxima of the second feature
                if row_second.Feature != row.Feature:
                    n_bound = round(row_second['Median'] + row['Median'], 2)
                    max_bound = round(
                        min(row_second['Maximum'], row['Maximum']), 2,
                    )
                    min_bound = round(
                        row_second['Minimum'] + row['Minimum'], 2,
                    )
                    range_spectrum = sorted(
                        list(
                            {
                                0.0,
                                min_bound,
                                n_bound / 10,
                                n_bound / 5,
                                n_bound,
                                max_bound,
                            },
                        ),
                    )

                    range_spectrum = [x for x in range_spectrum if x >= 0]
                    binner_pair_template = {
                        'feature': f'{row.Feature}Ratio{row_second.Feature}',
                        'src_features': [row.Feature, row_second.Feature],
                        'transformations': ['BinnerLogRatioPlain', 'BinnerLogRatio'],
                        'n': range_spectrum,
                        'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                    }

                    binner_pair_template_second = {
                        'feature': f'{row_second.Feature}Ratio{row.Feature}',
                        'src_features': [row_second.Feature, row.Feature],
                        'transformations': ['BinnerLogRatioPlain', 'BinnerLogRatio'],
                        'n': range_spectrum,
                        'resolutions': [0.1, 2, 4, 8, 16, 32, 64, 128],
                    }

                    transformers_per_feature[row.Feature].append(
                        binner_pair_template,
                    )
                    transformers_per_feature[row.Feature].append(
                        binner_pair_template_second,
                    )

        binner_templates = []
        for k, v in transformers_per_feature.items():
            for transformer_struct in v:
                binner_templates.append(transformer_struct)

        logging.info(
            f'Generated {len(binner_templates)} transformation search specifications.\n',
        )
        namespace_full = f'"random_grid_feature_transform": {json.dumps(binner_templates)}, "random_grid_epochs": 512'
        logging.info('Generated transformations below:\n')
        print(namespace_full)


def summarize_rare_counts(
    term_counter: Any,
    args: Any,
    cardinality_object: Any,
    object_info: DatasetInformationStorage,
) -> None:
    """Write rare values"""

    out_df_rows = []
    logging.info(
        f'Rare value summary (freq <= {args.rare_value_count_upper_bound}) follows ..',
    )

    for namespace_tuple, count in term_counter.items():
        namespace, value = namespace_tuple
        out_df_rows.append([namespace, value, count])
    out_df: pd.DataFrame = pd.DataFrame(out_df_rows)
    out_df.columns = ['Namespace', 'value', 'Count']
    out_df.to_csv(
        os.path.join(args.output_folder, 'rare_values.tsv'), sep='\t', index=False,
    )
    logging.info(f'Wrote rare values to {args.output_folder}/rare_values.tsv')

    overall_rare_counts = Counter(out_df.Namespace.values)
    sorted_counts = sorted(
        overall_rare_counts.items(), key=lambda pair: pair[1], reverse=True,
    )
    for k, v in sorted_counts:
        logging.info(f'Namespace: {k} ---- Rare values observed: {v}')

    final_df_rows = []
    for k, v in sorted_counts:
        cardinality = len(cardinality_object[k])
        rare_proportion = np.round(100 * (v / cardinality), 2)
        col_type = 'nominal'
        if k in object_info.column_types:
            col_type = 'numeric'
        final_df_rows.append(
            {
                'rare_proportion': rare_proportion,
                'feature_type': col_type,
                'feature_name': k,
            },
        )

    final_df: pd.DataFrame = pd.DataFrame(final_df_rows)
    final_df = final_df.sort_values(by=['rare_proportion'])
    logging.info(
        f'Wrote feature sparsity summary to {args.output_folder}/feature_sparsity_summary.tsv',
    )
    final_df.to_csv(
        f'{args.output_folder}/feature_sparsity_summary.tsv', index=False, sep='\t',
    )


def is_prior_heuristic(args: Any) -> bool:
    if '-prior' in args.heuristic and args.reference_model_JSON:
        return True
    return False
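
# Illustrative check (hypothetical heuristic name and reference path): any heuristic whose
# name contains '-prior', combined with a non-empty reference model JSON, enables the prior.
# >>> from argparse import Namespace
# >>> is_prior_heuristic(Namespace(heuristic='MI-numba-randomized-prior', reference_model_JSON='ref.json'))
# True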


def get_num_of_instances(fname: str) -> int:
    """Count the number of lines in a file, fast - useful for progress logging"""

    def _make_gen(reader):
        while True:
            b = reader(2**16)
            if not b:
                break
            yield b

    with open(fname, 'rb') as f:
        count = sum(buf.count(b'\n') for buf in _make_gen(f.raw.read))
    return count
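
# Illustrative count (temporary file): the file is scanned in 64 KiB chunks and the result
# equals the number of newline bytes it contains.
# >>> import tempfile
# >>> with tempfile.NamedTemporaryFile('w', suffix='.tsv', delete=False) as tmp:
# ...     _ = tmp.write('label\tfeature_a\n1\tx\n')
# >>> get_num_of_instances(tmp.name)
# 2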