Skip to content
Snippets Groups Projects
convert-to-parquet.py 3.58 KiB
Newer Older
import argparse
import gzip
import os
import re
import sys
from pathlib import PurePath
from urllib.parse import unquote

import pandas as pd

# Program description passed to argparse in parse_args(). That parser uses
# RawTextHelpFormatter, so the line breaks below are shown as written; the
# explicit "\n" escape followed by the literal newline produces a blank line
# after the first sentence.
desc = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld').
"""

def parse_args():
    """Define the command-line options and parse the process arguments.

    Returns:
        argparse.Namespace with three attributes:
        ``policy`` (bool, set by -p/--policy),
        ``output_dir`` (str or None, set by -o/--output-dir) and
        ``file`` (str or None, set by -f/--file).
    """
    cli = argparse.ArgumentParser(
        description=desc,
        # Keep the line breaks of the description exactly as written.
        formatter_class=argparse.RawTextHelpFormatter,
    )
    cli.add_argument(
        '-p', '--policy',
        action='store_true',
        help="Print the policy the script uses as a template and exit",
    )
    cli.add_argument(
        '-o', '--output-dir',
        help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet",
    )
    cli.add_argument(
        '-f', '--file',
        help="Log file from mmlspolicy run to be converted to parquet. Can be either a full log or just a part",
    )
    return cli.parse_args()

def parse_line(line):
    """Parse one policy-log record into a dict of its fields.

    A record is expected to look like::

        <prefix>|key1=value1|key2=value2|...| -- <path>

    The line is percent-decoded first, then stripped, and any remaining
    newline or tab characters are removed before it is split.

    Args:
        line: One raw log line. ``main`` reads the log in binary mode, so
            this is normally ``bytes``; ``str`` works as well.
            NOTE(review): ``urllib.parse.unquote`` only accepts bytes on
            Python 3.9+ -- confirm the minimum supported version.

    Returns:
        dict mapping every ``key`` to its (string) ``value``, plus:
          * ``'path'`` -- the decoded file path, and
          * ``'tld'``  -- the first directory component beneath
            /data/user/, /data/user/home/, /data/project/ or /scratch/,
            or ``None`` when the path is not under one of those roots.
        If the line does not have the expected shape, the original
        ``line`` is returned unchanged so the caller can tell it apart
        from a parsed record.
    """
    try:
        ul = unquote(line).strip()
        ul = re.sub(r'[\n\t]', '', ul)
        # Everything between the first '|' and the last '| -- ' is the
        # key=value section; what follows ' -- ' is the path.
        details, path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
        d = dict(
            re.match(r'([\w]+)=(.*)', field).groups()
            for field in details.split('|')
        )
    except (AttributeError, TypeError, ValueError):
        # AttributeError: one of the regexes did not match (None.groups()).
        # TypeError/ValueError: input that unquote/dict cannot handle.
        return line

    grp = re.match(
        r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',
        path,
    )
    # Paths outside the known roots have no top-level directory.
    tld = grp.group(1) if grp else None
    d.update({'path': path,
              'tld': tld})
    return d

def print_policy():
    """Write the policy template to standard output.

    The SHOW clause emits '|key=value' pairs for size, kballoc, access,
    create, modify, uid, gid, heat, pool, mode and misc -- the same keys
    that appear in ``schema``.

    NOTE(review): 'FILEPATH' and 'JOBID' look like placeholders to be
    substituted before the policy is run -- confirm with the caller.
    """
    print("""
/* list of files to include */
define( include_list,
  (PATH_NAME LIKE 'FILEPATH%')
)

/* define access_age */
define(access_age,
  (DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME))
)

RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%'
RULE 'list-path' LIST 'gather-info'
  SHOW ('|size='   || varchar(FILE_SIZE) ||
        '|kballoc='|| varchar(KB_ALLOCATED) ||
        '|access=' || varchar(ACCESS_TIME) ||
        '|create=' || varchar(CREATION_TIME) ||
        '|modify=' || varchar(MODIFICATION_TIME) ||
	    '|uid='    || varchar(USER_ID) ||
        '|gid='    || varchar(GROUP_ID) ||
        '|heat='   || varchar(FILE_HEAT) ||
        '|pool='   || varchar(POOL_NAME) ||
        '|mode='   || varchar(MODE) ||
        '|misc='   || varchar(MISC_ATTRIBUTES) ||
        '|'
       )
  WHERE include_list
""")

# Column name -> dtype mapping applied with DataFrame.astype() in main().
# The first eleven keys match the key=value fields emitted by the policy
# template; 'path' and 'tld' are added by parse_line().
schema = dict(
    # sizes
    size='int64',
    kballoc='int64',
    # timestamps
    access='datetime64[ns]',
    create='datetime64[ns]',
    modify='datetime64[ns]',
    # ownership
    uid='int64',
    gid='int64',
    # remaining attributes are kept as text
    heat='str',
    pool='str',
    mode='str',
    misc='str',
    # derived from the record's path
    path='str',
    tld='str',
)

def main():
    """Convert one gzip-compressed policy log into a parquet file.

    Steps:
      1. Parse the command line; with --policy, print the policy template
         and exit.
      2. Read the gzipped log given by --file and parse each line with
         ``parse_line``.
      3. Build a DataFrame from the parsed records, sort it by 'tld' and
         cast the columns according to ``schema``.
      4. Write ``<output dir>/<input name with .parquet suffix>`` using the
         pyarrow engine. The output directory defaults to a ``parquet``
         directory next to the input file and is created if missing.

    Exits with an error message when no input file is given or when the
    log contains no parsable records.
    """
    args = parse_args()
    if args.policy:
        print_policy()
        sys.exit()

    if not args.file:
        sys.exit('Error: must specify a file to convert')
    file = PurePath(args.file)

    if args.output_dir:
        outdir = PurePath(args.output_dir)
    else:
        outdir = file.parent.joinpath('parquet')
    # The default <input dir>/parquet typically does not exist yet; without
    # it the final write fails after all the parsing work has been done.
    os.makedirs(outdir, exist_ok=True)

    with gzip.open(file, 'r') as f:
        parsed = [parse_line(l) for l in f]

    # parse_line returns the raw line (not a dict) for anything it cannot
    # parse, e.g. non-record lines of a full log. Mixing those with dicts
    # breaks DataFrame construction, so keep only real records.
    records = [rec for rec in parsed if isinstance(rec, dict)]
    skipped = len(parsed) - len(records)
    if skipped:
        print('Warning: skipped {} unparsable line(s)'.format(skipped),
              file=sys.stderr)
    if not records:
        sys.exit('Error: no parsable records found in {}'.format(file))

    df = pd.DataFrame.from_dict(records).sort_values('tld')
    df = df.astype(schema)

    outname = file.with_suffix('.parquet').name
    df.to_parquet(outdir.joinpath(outname), engine='pyarrow')