# NOTE(review): the bare words "Newer" / "Older" below the original header look
# like artifacts from a web view of the file's history, not code; left here as
# a comment so the module imports cleanly.
# Newer
# Older
import gzip
import re
from pathlib import Path
from urllib.parse import unquote

import polars as pl

from .policy_defs import SCHEMA
from ..utils import as_path
def parse_line(line):
    """
    Parse one URL-quoted GPFS policy-output line into a dict.

    The expected line shape is ``<prefix>|k1=v1|k2=v2|...| -- <path>``: a
    header field, pipe-separated ``key=value`` attribute pairs, then the file
    path after a ``| -- `` separator. The returned dict contains every
    attribute pair plus:

    - ``path``: the unquoted file path
    - ``tld``: the first path component under ``/data/user[/home]/``,
      ``/data/project/``, or ``/scratch/`` (the owning user/project), or
      ``None`` when the path lives elsewhere.

    Parameters
    ----------
    line : str | bytes
        Raw log line; percent-encoding is removed with ``unquote``.

    Returns
    -------
    dict | str | bytes
        The parsed record, or the original ``line`` unchanged when it does
        not match the expected format (so callers can inspect rejects).
    """
    try:
        ul = unquote(line).strip()
        ul = re.sub(r'[\n\t]', '', ul)
        details, path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
        d = {
            key: value
            for key, value in (
                re.match(r'([\w]+)=(.*)', part).groups()
                for part in details.split('|')
            )
        }
        grp = re.match(
            r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)', path
        )
        d['path'] = path
        d['tld'] = grp.group(1) if grp else None
        return d
    except (AttributeError, TypeError):
        # AttributeError: a regex did not match and .groups() was called on
        # None; TypeError: `line` was not str/bytes. Previously a bare
        # `except:` — narrowed so KeyboardInterrupt/SystemExit propagate.
        return line
def set_output_filename(input_file, output_name=None):
    """
    Build the output parquet filename for a conversion run.

    Parameters
    ----------
    input_file : Path
        Source log file; its name is reused when ``output_name`` is None.
    output_name : str | Path | None, optional
        Explicit output name; coerced to a Path via ``as_path``.

    Returns
    -------
    str
        The chosen base name with its final suffix replaced by ``.parquet``.
    """
    source = input_file if output_name is None else as_path(output_name)
    return str(source.with_suffix(".parquet").name)
def convert(
    input_file: str | Path,
    output_dir: str | Path | None = None,
    output_name: str | Path | None = None,
    no_clobber: bool = False,
) -> None:
    """
    Converts a GPFS log file to parquet format. The data schema assumes the same policy definition from list-path-external and list-path-dirplus.

    Parameters
    ----------
    input_file : str | Path
        Path to the file to convert. Can be either compressed or uncompressed.
    output_dir : str | Path | None, optional
        Directory path in which to store the file. If the directory does not exist, it will be created. If None, the output directory will be set to ./input_file/../../parquet in accordance with our standard organization. By default None
    output_name : str | Path | None, optional
        Name of the output file. The name will be automatically appended with .parquet. If None, the name of the input file will be used. By default None
    no_clobber : bool, optional
        When set to True, if output_dir/output_name.parquet already exists, exit without overwriting the existing file. If False (default), any existing parquet file will be overwritten
    """
    input_file = as_path(input_file)

    if output_dir is not None:
        output_dir = as_path(output_dir)
    else:
        # Standard layout: raw logs live next to a sibling 'parquet' directory.
        output_dir = input_file.parent.parent.joinpath('parquet')

    output_name = set_output_filename(input_file, output_name)
    output_path = output_dir.joinpath(output_name)

    if output_path.exists() and no_clobber:
        print(
            "INFO: Output file already exists. Pass no_clobber=False to overwrite any existing output parquet file.",
            flush=True,
        )
        print("INFO: Cleaning and exiting.", flush=True)
        return

    # parents=True: the docstring promises the directory is created even when
    # intermediate directories are missing. The mode is still masked by the
    # process umask (0o2770 requests setgid + group rwx).
    output_dir.mkdir(mode=0o2770, parents=True, exist_ok=True)

    # The docstring allows compressed OR uncompressed input, but gzip.open on
    # plain text raises BadGzipFile. Sniff the gzip magic bytes instead of
    # trusting the extension, and pick the matching opener.
    with open(input_file, 'rb') as fcheck:
        is_gzipped = fcheck.read(2) == b'\x1f\x8b'
    opener = gzip.open if is_gzipped else open

    # Both openers yield bytes lines in 'rb' mode; parse_line unquotes either.
    with opener(input_file, 'rb') as f:
        dicts = [parse_line(l) for l in f]

    df = (
        pl.from_dicts(dicts, schema=SCHEMA)
        .with_columns(
            pl.col(name).str.to_datetime(time_unit='ns')
            for name in ['access', 'create', 'modify']
        )
        .sort('path')
    )
    df.write_parquet(output_path, use_pyarrow=True)