"""Convert gzipped GPFS policy run logs into parquet files."""
import argparse
import gzip
import os
import re
import sys
from pathlib import PurePath
from urllib.parse import unquote

import pandas as pd

desc = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld').
"""

# Compiled once at import time: parse_line runs once per log line, and policy
# logs can be very large.
_CONTROL_RE = re.compile(r'[\n\t]')
_LINE_RE = re.compile(r'^[^|]+\|(.*)\| -- (.*)$')
_FIELD_RE = re.compile(r'([\w]+)=(.*)')
_TLD_RE = re.compile(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)')


def parse_args():
    """Parse and return the command-line arguments as an argparse namespace."""
    parser = argparse.ArgumentParser(
        description=desc,
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('-p', '--policy',
                        help="Print the policy the script uses as a template and exit",
                        action='store_true')
    parser.add_argument('-o', '--output-dir',
                        help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet")
    parser.add_argument('-f', '--file',
                        help="Log file from mmlspolicy run to be converted to parquet. Can be either a full log or just a part")
    args = parser.parse_args()
    return args


def parse_line(line):
    """Parse one policy log line into a dict of its attributes.

    The line is expected to look like
    ``<prefix>|key=value|key=value|...| -- <path>`` with percent-escaped
    characters, which are decoded before parsing. Newlines and tabs that
    appear after decoding are removed.

    Args:
        line: A single log line, as ``str`` or UTF-8 encoded ``bytes``.

    Returns:
        On success, a dict mapping every ``key`` to its (string) value plus
        ``'path'`` (the decoded file path) and ``'tld'`` (the directory name
        directly beneath /data/user, /data/user/home, /data/project or
        /scratch, or ``None`` when the path is outside those trees).
        When the line does not match the expected layout, the original
        ``line`` argument is returned unchanged so callers can tell failures
        apart with ``isinstance(result, dict)``.
    """
    # Decode bytes explicitly: urllib.parse.unquote only accepts bytes on
    # Python >= 3.9, and on older versions every line would fail to parse.
    text = line.decode('utf-8', errors='replace') if isinstance(line, bytes) else line
    try:
        ul = _CONTROL_RE.sub('', unquote(text).strip())
        details, path = _LINE_RE.match(ul).groups()
        d = dict(_FIELD_RE.match(field).groups() for field in details.split('|'))
    except (AttributeError, TypeError, ValueError):
        # AttributeError: a regex did not match (match() returned None).
        # TypeError/ValueError: the input was not text-like at all.
        return line

    grp = _TLD_RE.match(path)
    d.update({'path': path, 'tld': grp.group(1) if grp else None})
    return d


def print_policy():
    """Print the policy template whose LIST output this script parses."""
    policy = """
/* list of files to include */
define( include_list,
  (PATH_NAME LIKE 'FILEPATH%')
)

/* define access_age */
define(access_age,
  (DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME))
)

RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%'
RULE 'list-path' LIST 'gather-info'
  SHOW ('|size=' || varchar(FILE_SIZE) ||
        '|kballoc='|| varchar(KB_ALLOCATED) ||
        '|access=' || varchar(ACCESS_TIME) ||
        '|create=' || varchar(CREATION_TIME) ||
        '|modify=' || varchar(MODIFICATION_TIME) ||
        '|uid=' || varchar(USER_ID) ||
        '|gid=' || varchar(GROUP_ID) ||
        '|heat=' || varchar(FILE_HEAT) ||
        '|pool=' || varchar(POOL_NAME) ||
        '|mode=' || varchar(MODE) ||
        '|misc=' || varchar(MISC_ATTRIBUTES) ||
        '|'
  )
  WHERE include_list
"""
    print(policy)


# Column dtypes applied to the parsed records before writing the parquet file.
schema = {
    'size': 'int64',
    'kballoc': 'int64',
    'access': 'datetime64[ns]',
    'create': 'datetime64[ns]',
    'modify': 'datetime64[ns]',
    'uid': 'int64',
    'gid': 'int64',
    'heat': 'str',
    'pool': 'str',
    'mode': 'str',
    'misc': 'str',
    'path': 'str',
    'tld': 'str'
}


def main():
    """Convert the gzipped policy log given on the command line to parquet.

    Exits with an error message when no input file is given or when the
    input contains no parseable records. Lines that cannot be parsed are
    skipped and their count is reported on stderr.
    """
    args = parse_args()
    if args.policy:
        print_policy()
        sys.exit()

    if not args.file:
        sys.exit('Error: must specify a file to convert')
    file = PurePath(args.file)

    if args.output_dir:
        outdir = PurePath(args.output_dir)
    else:
        outdir = file.parent.joinpath('parquet')
    os.makedirs(outdir, exist_ok=True)

    records = []
    skipped = 0
    # Text mode so parse_line always receives str; newline='\n' keeps line
    # splitting identical to iterating the binary stream (no '\r' handling).
    with gzip.open(file, 'rt', encoding='utf-8', errors='replace', newline='\n') as f:
        for raw in f:
            parsed = parse_line(raw)
            if isinstance(parsed, dict):
                records.append(parsed)
            else:
                # parse_line hands back the raw line on failure; mixing those
                # with dicts would break DataFrame construction below.
                skipped += 1

    if skipped:
        print(f'Warning: skipped {skipped} unparseable line(s) in {file}', file=sys.stderr)
    if not records:
        sys.exit(f'Error: no parseable records found in {file}')

    df = pd.DataFrame.from_dict(records).sort_values('tld')
    df = df.astype(schema)

    outname = file.with_suffix('.parquet').name
    df.to_parquet(outdir.joinpath(outname), engine='pyarrow')


if __name__ == '__main__':
    main()