Skip to content
Snippets Groups Projects

Draft: Partition parquet dataset for sync with s5cmd

Closed Matthew K Defenderfer requested to merge partition-parquet-dataset into main
Files
8
+ 117
0
from urllib.parse import unquote
import os
import re
import argparse
import pandas as pd
import gzip
from pathlib import PurePath
# Help text for the CLI, consumed by argparse in parse_args() and rendered
# verbatim (RawTextHelpFormatter preserves the embedded newlines).
desc = """
Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
Works with data from /data/user, /data/user/home, /data/project, and /scratch
The data are parsed and the directory beneath any of those listed above is set as the top-level
directory ('tld').
"""
def parse_args(argv=None):
    """Parse command-line options for the converter.

    Args:
        argv: Optional list of argument strings. Defaults to None, in which
            case argparse falls back to sys.argv[1:] — so existing callers
            that invoke parse_args() with no arguments are unaffected.
            Passing an explicit list makes the function testable.

    Returns:
        argparse.Namespace with attributes `policy` (bool), `output_dir`
        (str or None), and `file` (str or None).
    """
    parser = argparse.ArgumentParser(
        description=desc,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument('-p', '--policy',
                        help="Print the policy the script uses as a template and exit",
                        action='store_true')
    parser.add_argument('-o', '--output-dir',
                        help="Directory to store the output parquet. The parquet file will have the same name as the input. Defaults to input_file_dir/parquet")
    parser.add_argument('-f', '--file',
                        help="Log file from mmlspolicy run to be converted to parquet. Can be either a full log or just a part")
    return parser.parse_args(argv)
def parse_line(line):
    """Parse one GPFS policy-run log line into a dict of file attributes.

    Expected line shape (after URL-unquoting):
        '<preamble>|key=val|key=val|...| -- <path>'
    The component after ' -- ' is the file path; the directory directly
    beneath /data/user, /data/user/home, /data/project, or /scratch is
    extracted as the top-level directory ('tld').

    Args:
        line: One text line from the policy log.

    Returns:
        dict of attribute name -> string value, plus 'path' and 'tld',
        or the original line unchanged when it does not match the expected
        format (e.g. header/footer lines in the log).
    """
    try:
        ul = unquote(line).strip()
        ul = re.sub(r'[\n\t]', '', ul)
        details, path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
        d = dict([re.match(r'([\w]+)=(.*)', kv).groups() for kv in details.split('|')])
        tld = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)', path).group(1)
        d.update({'path': path,
                  'tld': tld})
        return d
    # Narrowed from a bare `except:` (which also swallowed KeyboardInterrupt
    # and SystemExit). AttributeError: a regex failed to match, so
    # .groups()/.group() was called on None. TypeError: `line` was bytes,
    # which unquote() rejects.
    except (AttributeError, TypeError):
        return line
def print_policy():
    """Print the GPFS mmapplypolicy template this script expects as input.

    The 'FILEPATH' and EXEC/OPTS placeholders are meant to be filled in by
    the caller before running the policy. Always returns None.
    """
    template = """
/* list of files to include */
define( include_list,
(PATH_NAME LIKE 'FILEPATH%')
)
/* define access_age */
define(access_age,
(DAYS(CURRENT_TIMESTAMP) - DAYS(ACCESS_TIME))
)
RULE 'gather-exec' EXTERNAL LIST 'gather-info' EXEC '' OPTS 'JOBID' ESCAPE '%'
RULE 'list-path' LIST 'gather-info'
SHOW ('|size=' || varchar(FILE_SIZE) ||
'|kballoc='|| varchar(KB_ALLOCATED) ||
'|access=' || varchar(ACCESS_TIME) ||
'|create=' || varchar(CREATION_TIME) ||
'|modify=' || varchar(MODIFICATION_TIME) ||
'|uid=' || varchar(USER_ID) ||
'|gid=' || varchar(GROUP_ID) ||
'|heat=' || varchar(FILE_HEAT) ||
'|pool=' || varchar(POOL_NAME) ||
'|mode=' || varchar(MODE) ||
'|misc=' || varchar(MISC_ATTRIBUTES) ||
'|'
)
WHERE include_list
"""
    print(template)
    return
# Column dtypes applied to the parsed DataFrame in main(). Keys match the
# attribute names emitted by the policy SHOW clause (see print_policy) plus
# the 'path' and 'tld' columns derived in parse_line.
schema = {
'size': 'int64',
'kballoc': 'int64',
'access': 'datetime64[ns]',
'create': 'datetime64[ns]',
'modify': 'datetime64[ns]',
'uid': 'int64',
'gid': 'int64',
'heat': 'str',
'pool': 'str',
'mode': 'str',
'misc': 'str',
'path': 'str',
'tld': 'str'
}
def main():
    """Convert a gzipped GPFS policy log to a parquet file.

    Reads the log named by --file, parses each line with parse_line(),
    coerces columns to `schema`, and writes <stem>.parquet either into
    --output-dir or into a 'parquet/' directory beside the input.
    Exits early (printing the policy template) when --policy is given, and
    exits with an error when no input file is specified.
    """
    args = parse_args()
    if args.policy:
        print_policy()
        exit()
    if args.file:
        in_path = PurePath(args.file)
    else:
        exit('Error: must specify a file to convert')
    outdir = PurePath(args.output_dir) if args.output_dir else in_path.parent.joinpath('parquet')
    os.makedirs(outdir, exist_ok=True)
    # BUG FIX: gzip.open's 'r' mode is binary ('rb'), which yielded bytes
    # lines; unquote() rejects bytes, so every line fell through parse_line's
    # fallback and sort_values('tld') failed. 'rt' decodes to str.
    with gzip.open(in_path, 'rt') as f:
        dicts = [parse_line(l) for l in f]
    df = pd.DataFrame.from_dict(dicts).sort_values('tld')
    df = df.astype(schema)
    # e.g. 'run.log.gz' -> 'run.log.parquet' (only the final suffix changes)
    outname = in_path.with_suffix('.parquet').name
    df.to_parquet(outdir.joinpath(outname), engine='pyarrow')

if __name__ == '__main__':
    main()
\ No newline at end of file
Loading