Skip to content
Snippets Groups Projects
convert-to-parquet.py 3.71 KiB
Newer Older
from urllib.parse import unquote
import os
import re
import dask.bag as db
import dask.dataframe as dd
import argparse
from pathlib import PurePath

desc = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld'). The tld is then set as the index for the parquet file to improve aggregation speed

If the full log is split into multiple parts, the full dataset will need to be repartitioned after all of
the parts are converted to parquet to make sure all entries for each tld are in the same partition. This
can be done with a separate script
"""

def parse_args():
    parser = argparse.ArgumentParser(description=desc,
                                     formatter_class=argparse.RawTextHelpFormatter
                                     )
Matthew K Defenderfer's avatar
Matthew K Defenderfer committed
    parser.add_argument('-p','--policy',help="Print the policy the script uses as a template and exit",action='store_true')
    parser.add_argument('-o','--output-dir',help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet")
Matthew K Defenderfer's avatar
Matthew K Defenderfer committed
    parser.add_argument('-f','--file',help="Log file from mmlspolicy run to be converted to parquet. Can be either a full log or just a part")
    args = parser.parse_args()
    return args

def parse_line(line):
    try:
        ul = unquote(line).strip()
        ul = re.sub(r'[\n\t]','',ul)
        details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
        
        d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')])

        tld = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path).group(1)
        d.update({'path': path,
                  'tld': tld})
        return d
    except:
        return line

def print_policy():
    policy = """
/* list of files to include */
define( include_list,
  (PATH_NAME LIKE 'FILEPATH%')
)

/* define access_age */
define(access_age,
  (DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME))
)

RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%'
RULE 'list-path' LIST 'gather-info'
  SHOW ('|size='   || varchar(FILE_SIZE) ||
        '|kballoc='|| varchar(KB_ALLOCATED) ||
        '|access=' || varchar(ACCESS_TIME) ||
        '|create=' || varchar(CREATION_TIME) ||
        '|modify=' || varchar(MODIFICATION_TIME) ||
	    '|uid='    || varchar(USER_ID) ||
        '|gid='    || varchar(GROUP_ID) ||
        '|heat='   || varchar(FILE_HEAT) ||
        '|pool='   || varchar(POOL_NAME) ||
        '|mode='   || varchar(MODE) ||
        '|misc='   || varchar(MISC_ATTRIBUTES) ||
        '|'
       )
  WHERE include_list
"""
    print(policy)
    return

schema = {
    'size': 'int64',
    'kballoc': 'int64',
    'access': 'datetime64[ns]',
    'create': 'datetime64[ns]',
    'modify': 'datetime64[ns]',
    'uid': 'int64',
    'gid': 'int64',
    'heat': 'str',
    'pool': 'str',
    'mode': 'str',
    'misc': 'str',
    'path': 'str',
    'tld': 'str'
}

def main():
    args   = parse_args()
    if args.policy:
        print_policy()
        exit()
    
Matthew K Defenderfer's avatar
Matthew K Defenderfer committed
    if args.file:
        file = PurePath(args.file)
    else:
        exit('Error: must specify a file to convert')
Matthew K Defenderfer's avatar
Matthew K Defenderfer committed
    if not args.output_dir:
        outdir = file.parent.joinpath('parquet')
Matthew K Defenderfer's avatar
Matthew K Defenderfer committed
    else:
        outdir = PurePath(args.output_dir)
        
    os.makedirs(outdir,exist_ok=True)
    
    bag = db.read_text(file).map(parse_line)
    ddf  = bag.to_dataframe(meta=schema).set_index('tld')

    outname = file.with_suffix('.parquet').name
    ddf.to_parquet(outdir.joinpath(outname))

if __name__ == '__main__':
    main()