diff --git a/convert-to-parquet/convert-to-parquet.py b/convert-to-parquet/convert-to-parquet.py
index 6fea930b5d231604580041ac755481c2350c6b99..2a8b70c693a9594a428fbc53724239052caf7ff7 100755
--- a/convert-to-parquet/convert-to-parquet.py
+++ b/convert-to-parquet/convert-to-parquet.py
@@ -1,20 +1,16 @@
 from urllib.parse import unquote
 import os
 import re
-import dask.bag as db
-import dask.dataframe as dd
 import argparse
+import pandas as pd
+import gzip
 from pathlib import PurePath
 
 desc = """
 Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
 Works with data from /data/user, /data/user/home, /data/project, and /scratch
 The data are parsed and the directory beneath any of those listed above is set as the top-level
-directory ('tld'). The tld is then set as the index for the parquet file to improve aggregation speed
-
-If the full log is split into multiple parts, the full dataset will need to be repartitioned after all of
-the parts are converted to parquet to make sure all entries for each tld are in the same partition. This
-can be done with a separate script
+directory ('tld').
 """
 
 def parse_args():
@@ -108,11 +104,14 @@ def main():
 
     os.makedirs(outdir,exist_ok=True)
 
-    bag = db.read_text(file).map(parse_line)
-    ddf = bag.to_dataframe(meta=schema).set_index('tld')
+    #bag = db.read_text(file).map(parse_line)
+    with gzip.open(file,'r') as f:
+        dicts = [parse_line(l) for l in f]
+    df = pd.DataFrame.from_dict(dicts).sort_values('tld')
+    df = df.astype(schema)
 
     outname = file.with_suffix('.parquet').name
 
-    ddf.to_parquet(outdir.joinpath(outname))
+    df.to_parquet(outdir.joinpath(outname),engine = 'pyarrow')
 if __name__ == '__main__':
     main()
\ No newline at end of file
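
For context outside the patch: the change swaps the dask bag/dataframe pipeline for an in-memory pandas conversion of each gzipped log part. A minimal standalone sketch of that flow, with simplified stand-ins for the script's `parse_line` and `schema` (the real ones handle the full GPFS policy record format), could look like:

```python
# Sketch only, not the committed script: read a gzipped policy log, parse each
# line to a dict, build a DataFrame sorted by 'tld', cast it to a fixed schema,
# and write one parquet file per input part.
import gzip
from pathlib import Path

import pandas as pd

# Illustrative schema; the real one covers every parsed policy field.
schema = {'path': 'string', 'tld': 'string', 'size': 'int64'}

def parse_line(line: str) -> dict:
    # Illustrative parser for "<path>,<size>" records; the real parse_line
    # extracts many more attributes from the policy output.
    path, size = line.rstrip('\n').rsplit(',', 1)
    parts = Path(path).parts
    tld = parts[3] if len(parts) > 3 else ''  # directory beneath e.g. /data/project
    return {'path': path, 'tld': tld, 'size': int(size)}

def convert(log: Path) -> Path:
    outdir = log.parent / 'parquet'
    outdir.mkdir(exist_ok=True)

    # 'rt' opens the gzip stream in text mode so parse_line receives str, not bytes
    with gzip.open(log, 'rt') as f:
        records = [parse_line(l) for l in f]

    df = pd.DataFrame(records).sort_values('tld').astype(schema)

    outname = log.with_suffix('.parquet').name
    df.to_parquet(outdir / outname, engine='pyarrow')
    return outdir / outname
```

Note one behavioral difference visible in the diff itself: the dask version set 'tld' as the dataframe index before writing, while the pandas version only sorts rows by 'tld' within each output file.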