Skip to content
Snippets Groups Projects
convert.py 3.11 KiB
Newer Older
import re
import gzip
from pathlib import Path
from urllib.parse import unquote

import polars as pl
from .policy_defs import SCHEMA
from ..utils import as_path

def parse_line(line):
    try:
        ul = unquote(line).strip()
        ul = re.sub(r'[\n\t]','',ul)
        details,path = re.match(r'^[^|]+\|(.*)\| -- (.*)$', ul).groups()
        
        d = dict([re.match(r'([\w]+)=(.*)',l).groups() for l in details.split('|')])

        grp = re.match(r'(?:/data/user(?:/home)?/|/data/project/|/scratch/)([^/]+)',path)
        if grp:
            tld = grp.group(1)
        else:
            tld = None
            
        d.update({'path': path,
                  'tld': tld})
        return d
    except:
        return line

def set_output_filename(input_file,output_name = None):
    if output_name is None:
        output_name = input_file.with_suffix(".parquet").name
    else:
        output_name = as_path(output_name).with_suffix(".parquet").name
    
    return str(output_name)

def convert(
        input_file: str | Path, 
        output_dir: str | Path | None = None, 
        output_name: str | Path | None = None,
        no_clobber: bool = False,
    ) -> None:
    """
    Converts a GPFS log file to parquet format. The data schema assumes the same policy definition from list-path-external and list-path-dirplus.

    Parameters
    ----------
    input_file : str | Path
        Path to the file to convert. Can be either compressed or uncompressed.
    output_dir : str | Path | None, optional
        Directory path in which to store the file. If the directory does not exist, it will be created. If None, the output directory will be set to ./input_file/../../parquet in accordance with our standard organization. By default None
    output_name : str | Path | None, optional
        Name of the output file. The name will be automatically appended with .parquet. If None, the name of the input file will be used. By default None
    no_clobber : bool, optional
        When set to True, if output_dir/output_name.parquet already exists, exit without overwriting the existing file. If False (default), any existing parquet file will be overwritten
    """
    
    input_file = as_path(input_file)
    
    if output_dir is not None:
        output_dir = as_path(output_dir)
    else:
        output_dir = input_file.parent.parent.joinpath('parquet')

    output_name = set_output_filename(input_file,output_name)
    output_path = output_dir.joinpath(output_name)

    if output_path.exists() and no_clobber:
        print(
            "INFO: Output file already exists. Pass no_clobber=False to overwrite any existing output parquet file.",
            flush=True,
        )
        print("INFO: Cleaning and exiting.",flush=True)
        return

    output_dir.mkdir(mode=0o2770, exist_ok=True)
    
    with gzip.open(input_file,'r') as f:
        dicts = [parse_line(l) for l in f]
    
    df = (
        pl.from_dicts(dicts,schema=SCHEMA)
        .with_columns(
            pl.col(name).str.to_datetime(time_unit='ns') for name in ['access','create','modify']
        )
        .sort('path')
    )
    df.write_parquet(output_path,use_pyarrow=True)