From 1a9c7b108b57999f00890139363b8f908eec0b87 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Thu, 15 Aug 2024 20:38:55 -0500
Subject: [PATCH] remove dask components for now

---
 convert-to-parquet/convert-to-parquet.py | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/convert-to-parquet/convert-to-parquet.py b/convert-to-parquet/convert-to-parquet.py
index 6fea930..2a8b70c 100755
--- a/convert-to-parquet/convert-to-parquet.py
+++ b/convert-to-parquet/convert-to-parquet.py
@@ -1,20 +1,16 @@
 from urllib.parse import unquote
 import os
 import re
-import dask.bag as db
-import dask.dataframe as dd
 import argparse
+import pandas as pd
+import gzip
 from pathlib import PurePath
 
 desc = """
 Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
 Works with data from /data/user, /data/user/home, /data/project, and /scratch
 The data are parsed and the directory beneath any of those listed above is set as the top-level
-directory ('tld'). The tld is then set as the index for the parquet file to improve aggregation speed
-
-If the full log is split into multiple parts, the full dataset will need to be repartitioned after all of
-the parts are converted to parquet to make sure all entries for each tld are in the same partition. This
-can be done with a separate script
+directory ('tld').
 """
 
 def parse_args():
@@ -108,11 +104,14 @@ def main():
 
     os.makedirs(outdir,exist_ok=True)
 
-    bag = db.read_text(file).map(parse_line)
-    ddf = bag.to_dataframe(meta=schema).set_index('tld')
+    #bag = db.read_text(file).map(parse_line)
+    with gzip.open(file,'r') as f:
+        dicts = [parse_line(l) for l in f]
+    df = pd.DataFrame.from_dict(dicts).sort_values('tld')
+    df = df.astype(schema)
 
     outname = file.with_suffix('.parquet').name
 
-    ddf.to_parquet(outdir.joinpath(outname))
+    df.to_parquet(outdir.joinpath(outname),engine = 'pyarrow')
 if __name__ == '__main__':
     main()
\ No newline at end of file
-- 
GitLab
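
A minimal sketch of the new pandas-based conversion path, for reference. The
parse_line and schema below are hypothetical stand-ins (the real ones are
defined elsewhere in convert-to-parquet.py and do not appear in this diff);
the conversion logic mirrors the new main() body above.

    import gzip
    import os
    from pathlib import PurePath
    import pandas as pd

    # Hypothetical stand-in: the real parse_line URL-decodes the path from a
    # GPFS policy-run log line and extracts the directory beneath /data/user,
    # /data/project, etc. as the top-level directory ('tld').
    def parse_line(line):
        path, size = line.decode().rstrip('\n').rsplit(maxsplit=1)
        parts = PurePath(path).parts
        tld = parts[3] if len(parts) > 3 else path  # e.g. /data/project/<tld>/...
        return {'tld': tld, 'path': path, 'size': int(size)}

    # Hypothetical stand-in for the script's schema (column -> pandas dtype).
    schema = {'tld': 'category', 'path': 'string', 'size': 'int64'}

    def convert(file, outdir):
        os.makedirs(outdir, exist_ok=True)
        # gzip.open(file, 'r') is binary mode, so parse_line receives bytes;
        # unlike the dask version, the whole log is read into memory at once.
        with gzip.open(file, 'r') as f:
            dicts = [parse_line(l) for l in f]
        df = pd.DataFrame.from_dict(dicts).sort_values('tld')
        df = df.astype(schema)
        outname = PurePath(file).with_suffix('.parquet').name
        df.to_parquet(os.path.join(outdir, outname), engine='pyarrow')

Since rows are now only sorted by tld rather than indexed and partitioned by
it, a log split into multiple parts no longer guarantees that all entries for
a given tld end up in one partition, which is presumably why the
repartitioning note was dropped from the docstring.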