Skip to content
Snippets Groups Projects

Update no-clobber for convert-to-parquet

2 files
+ 90
19
Compare changes
  • Side-by-side
  • Inline
Files
2
import sys
import argparse
import re
import subprocess
from pathlib import Path
import multiprocessing
from ..policy.convert import convert, set_output_filename
from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
from ..utils import parse_scontrol
@@ -37,7 +39,7 @@ def parse_args():
parser.add_argument('--pool-size',type=int,default=None,
help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process")
parser.add_argument('--no-clobber', action='store_true',default=False,
help='When set and existing parquet files are found, immediately exits without any processing')
help='When set, skips any log chunks that already have corresponding parquet files. Chunks without a parquet file are processed as normal.')
args = parser.parse_args()
return vars(args)
@@ -51,17 +53,15 @@ BATCH_SCRIPT = """\
#SBATCH --mem={mem}
#SBATCH --output={output_log}
#SBATCH --error={error_log}
#SBATCH --array=1-{nlogs}
#SBATCH --array={array_idxs}
{env_cmd}
log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}")
log=$(ls {input}/*.gz | sort | awk "NR==${{SLURM_ARRAY_TASK_ID+1}} {{ print $1 }}")
convert-to-parquet -o {output_dir} ${{log}}
convert-to-parquet {no_clobber_opt} -o {output_dir} ${{log}}
"""
def submit_batch(**kwargs):
env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
kwargs.update({"env_cmd":env_cmd})
@@ -74,26 +74,82 @@ def submit_batch(**kwargs):
subprocess.run(['sbatch'],input=script,shell=True,text=True)
pass
def _find_sequential_indexes(idxs):
if not idxs:
return []
idxs = sorted(idxs)
result = []
start = idxs[0]
prev = idxs[0]
for num in idxs[1:]:
if num == prev + 1:
prev = num
else:
if start == prev:
result.append(str(start))
else:
result.append(f"{start}-{prev}")
start = num
prev = num
# Add the last sequence
if start == prev:
result.append(str(start))
else:
result.append(f"{start}-{prev}")
return result
def _get_missing_indexes(chunks, parquets):
missing_indexes = [
index for index, element in enumerate(chunks) if element not in parquets
]
return missing_indexes
def convert_to_parquet() -> None:
args = parse_args()
from ..policy.convert import convert
if args['output_dir'] is None:
args['output_dir'] = args['input'].parent.joinpath('parquet')
args['output_dir'].mkdir(exist_ok = True, mode = 0o2770)
output_files_exist = len(list(args['output_dir'].glob('*.parquet'))) > 0
if args['no_clobber'] and output_files_exist:
sys.exit('The output directory already contains parquet files. Exiting')
if args['input'].is_file():
nlogs = 1
else:
logs = list(args.get('input').glob('*.gz'))
nlogs = len(logs)
if args["no_clobber"]:
args.update({"no_clobber_opt": "--no-clobber"})
if args['input'].is_dir():
chunks = logs
chunks.sort()
else:
chunks = [args["input"].name]
pqs = [f.name for f in args["output_dir"].glob("*.parquet")]
target_pqs = [set_output_filename(f) for f in chunks]
idxs_to_run = _get_missing_indexes(target_pqs,pqs)
if len(idxs_to_run) == 0:
print("INFO: All log chunks have been converted to parquet. Exiting without processing")
return
array_idxs = _find_sequential_indexes(idxs_to_run)
args['array_idxs'] = ','.join(array_idxs)
else:
args.update(
{
"no_clobber_opt": "",
"array_idxs" : f"0-{nlogs-1}"
}
)
args.update({'nlogs':nlogs})
if args['batch']:
Loading