Skip to content
Snippets Groups Projects
Commit 1a9c7b10 authored by Matthew K Defenderfer's avatar Matthew K Defenderfer
Browse files

remove dask components for now

parent d2f5b02a
No related branches found
No related tags found
1 merge request!8Automate conversion of GPFS policy outputs to parquet without Jupyter
 from urllib.parse import unquote
 import os
 import re
-import dask.bag as db
-import dask.dataframe as dd
 import argparse
+import pandas as pd
+import gzip
 from pathlib import PurePath
 desc = """
 Converts GPFS policy run logs to parquet files for easier aggregation and analysis.\n
 Works with data from /data/user, /data/user/home, /data/project, and /scratch
 The data are parsed and the directory beneath any of those listed above is set as the top-level
-directory ('tld'). The tld is then set as the index for the parquet file to improve aggregation speed
-If the full log is split into multiple parts, the full dataset will need to be repartitioned after all of
-the parts are converted to parquet to make sure all entries for each tld are in the same partition. This
-can be done with a separate script
+directory ('tld').
 """
 def parse_args():
@@ -108,11 +104,14 @@ def main():
     os.makedirs(outdir,exist_ok=True)

-    bag = db.read_text(file).map(parse_line)
-    ddf = bag.to_dataframe(meta=schema).set_index('tld')
+    #bag = db.read_text(file).map(parse_line)
+    with gzip.open(file,'r') as f:
+        dicts = [parse_line(l) for l in f]
+    df = pd.DataFrame.from_dict(dicts).sort_values('tld')
+    df = df.astype(schema)

     outname = file.with_suffix('.parquet').name
-    ddf.to_parquet(outdir.joinpath(outname))
+    df.to_parquet(outdir.joinpath(outname),engine = 'pyarrow')

 if __name__ == '__main__':
     main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment