Commit 328db9a8 authored by Matthew K Defenderfer

Update no-clobber for convert-to-parquet

parent 6f38ecb6
1 merge request: !60 Update no-clobber for convert-to-parquet
 import sys
 import argparse
 import re
 import subprocess
 from pathlib import Path
 import multiprocessing
+from ..policy.convert import convert, set_output_filename
 from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
 from ..utils import parse_scontrol
@@ -37,7 +39,7 @@ def parse_args():
     parser.add_argument('--pool-size',type=int,default=None,
                         help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process")
     parser.add_argument('--no-clobber', action='store_true',default=False,
-                        help='When set and existing parquet files are found, immediately exits without any processing')
+                        help='When set, skips any log chunks that already have corresponding parquet files. Chunks without a parquet file are processed as normal.')
     args = parser.parse_args()
     return vars(args)
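
For example, a hypothetical invocation against illustrative paths:

    convert-to-parquet --no-clobber -o /data/parquet /data/gpfs-logs

would convert every *.gz chunk in /data/gpfs-logs that does not yet have a matching .parquet file in /data/parquet and skip the rest.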
@@ -51,17 +53,15 @@ BATCH_SCRIPT = """\
 #SBATCH --mem={mem}
 #SBATCH --output={output_log}
 #SBATCH --error={error_log}
-#SBATCH --array=1-{nlogs}
+#SBATCH --array={array_idxs}
 {env_cmd}
-log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}")
+log=$(ls {input}/*.gz | sort | awk "NR==$((SLURM_ARRAY_TASK_ID + 1)) {{ print $1 }}")
-convert-to-parquet -o {output_dir} ${{log}}
+convert-to-parquet {no_clobber_opt} -o {output_dir} ${{log}}
 """
 def submit_batch(**kwargs):
     env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
     kwargs.update({"env_cmd":env_cmd})
@@ -74,26 +74,82 @@ def submit_batch(**kwargs):
     subprocess.run(['sbatch'],input=script,shell=True,text=True)
     pass
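
For reference, a sketch of the changed lines after submit_batch fills in the template. Paths and the array spec are illustrative, assuming ten log chunks of which chunks 3 and 5 already have parquet files:

    #SBATCH --array=0-2,4,6-9
    log=$(ls /data/gpfs-logs/*.gz | sort | awk "NR==$((SLURM_ARRAY_TASK_ID + 1)) { print $1 }")
    convert-to-parquet --no-clobber -o /data/parquet ${log}

The array indexes are now zero-based, so each task selects the (SLURM_ARRAY_TASK_ID + 1)-th file from the sorted listing.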
+def _find_sequential_indexes(idxs):
+    if not idxs:
+        return []
+    idxs = sorted(idxs)
+    result = []
+    start = idxs[0]
+    prev = idxs[0]
+    for num in idxs[1:]:
+        if num == prev + 1:
+            prev = num
+        else:
+            if start == prev:
+                result.append(str(start))
+            else:
+                result.append(f"{start}-{prev}")
+            start = num
+            prev = num
+    # Add the last sequence
+    if start == prev:
+        result.append(str(start))
+    else:
+        result.append(f"{start}-{prev}")
+    return result
+
+def _get_missing_indexes(chunks, parquets):
+    missing_indexes = [
+        index for index, element in enumerate(chunks) if element not in parquets
+    ]
+    return missing_indexes
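
A quick sketch of how the two new helpers compose, with illustrative file names:

    # indexes of chunks whose parquet output is missing
    idxs = _get_missing_indexes(
        ["a.parquet", "b.parquet", "c.parquet", "d.parquet", "e.parquet", "f.parquet", "g.parquet"],
        ["b.parquet", "e.parquet", "f.parquet"],
    )
    print(idxs)                                      # [0, 2, 3, 6]
    print(",".join(_find_sequential_indexes(idxs)))  # 0,2-3,6

The comma-joined result is exactly what convert_to_parquet below passes to the #SBATCH --array directive.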
 def convert_to_parquet() -> None:
     args = parse_args()
-    from ..policy.convert import convert
     if args['output_dir'] is None:
         args['output_dir'] = args['input'].parent.joinpath('parquet')
     args['output_dir'].mkdir(exist_ok = True, mode = 0o2770)
-    output_files_exist = len(list(args['output_dir'].glob('*.parquet'))) > 0
-    if args['no_clobber'] and output_files_exist:
-        sys.exit('The output directory already contains parquet files. Exiting')
     if args['input'].is_file():
         nlogs = 1
     else:
         logs = list(args.get('input').glob('*.gz'))
         nlogs = len(logs)
+    if args["no_clobber"]:
+        args.update({"no_clobber_opt": "--no-clobber"})
+        if args['input'].is_dir():
+            chunks = logs
+            chunks.sort()
+        else:
+            chunks = [args["input"].name]
+        pqs = [f.name for f in args["output_dir"].glob("*.parquet")]
+        target_pqs = [set_output_filename(f) for f in chunks]
+        idxs_to_run = _get_missing_indexes(target_pqs,pqs)
+        if len(idxs_to_run) == 0:
+            print("INFO: All log chunks have been converted to parquet. Exiting without processing")
+            return
+        array_idxs = _find_sequential_indexes(idxs_to_run)
+        args['array_idxs'] = ','.join(array_idxs)
+    else:
+        args.update(
+            {
+                "no_clobber_opt": "",
+                "array_idxs" : f"0-{nlogs-1}"
+            }
+        )
     args.update({'nlogs':nlogs})
     if args['batch']:
...
@@ -28,10 +28,19 @@ def parse_line(line):
     except:
         return line
+def set_output_filename(input_file,output_name = None):
+    if output_name is None:
+        output_name = input_file.with_suffix(".parquet").name
+    else:
+        output_name = as_path(output_name).with_suffix(".parquet").name
+    return str(output_name)
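
A doctest-style sketch of the new helper (file names are illustrative):

    from pathlib import Path
    set_output_filename(Path("list-000.gz"))                       # 'list-000.parquet'
    set_output_filename(Path("list-000.gz"), output_name="custom") # 'custom.parquet'

Note that with_suffix replaces only the final suffix, so list.chunk0.gz becomes list.chunk0.parquet.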
 def convert(
     input_file: str | Path,
     output_dir: str | Path | None = None,
-    output_name: str | Path | None = None
+    output_name: str | Path | None = None,
+    no_clobber: bool = False,
 ) -> None:
     """
     Converts a GPFS log file to parquet format. The data schema assumes the same policy definition from list-path-external and list-path-dirplus.
@@ -44,6 +53,8 @@ def convert(
         Directory path in which to store the file. If the directory does not exist, it will be created. If None, the output directory will be set to ./input_file/../../parquet in accordance with our standard organization. By default None
     output_name : str | Path | None, optional
         Name of the output file. The name will be automatically appended with .parquet. If None, the name of the input file will be used. By default None
+    no_clobber : bool, optional
+        When set to True, if output_dir/output_name.parquet already exists, exit without overwriting the existing file. If False (default), any existing parquet file will be overwritten
     """
     input_file = as_path(input_file)
@@ -52,14 +63,18 @@ def convert(
         output_dir = as_path(output_dir)
     else:
         output_dir = input_file.parent.parent.joinpath('parquet')
-    if output_name is None:
-        output_name = input_file.with_suffix('.parquet').name
-    else:
-        output_name = as_path(output_name).with_suffix('.parquet').name
+    output_name = set_output_filename(input_file,output_name)
     output_path = output_dir.joinpath(output_name)
+    if output_path.exists() and no_clobber:
+        print(
+            "INFO: Output file already exists. Pass no_clobber=False to overwrite any existing output parquet file.",
+            flush=True,
+        )
+        print("INFO: Cleaning and exiting.",flush=True)
+        return
     output_dir.mkdir(mode=0o2770, exist_ok=True)
     with gzip.open(input_file,'r') as f:
...
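
A hypothetical direct call exercising the new keyword (paths are illustrative, and the import assumes the package's policy.convert module as referenced in the CLI diff):

    from pathlib import Path
    convert(Path("/data/gpfs-logs/list-000.gz"), output_dir="/data/parquet", no_clobber=True)
    # If /data/parquet/list-000.parquet already exists, this prints the INFO messages and
    # returns without touching the file; with no_clobber=False (default) it is overwritten.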