From 943e956e52e00504f46d687389df588de9f33f83 Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 25 Apr 2025 14:42:03 -0500
Subject: [PATCH 1/3] implement no-clobber in the convert function instead of
 the CLI function. Allows users to submit full array jobs that only target log
 chunks which are missing parquet conversions

---
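Notes: with this change the skip check sits next to the parquet write itself,
so each array task can decide independently whether its chunk needs work. A
minimal sketch of the intended call pattern (the chunk path below is
hypothetical):

    from pathlib import Path
    from rc_gpfs.policy.convert import convert

    # Hypothetical chunk produced by the GPFS list policy
    chunk = Path("/data/gpfs-logs/list-policy.d/list-000.gz")

    # With no_clobber=True, convert() returns early when the matching
    # parquet file already exists, so re-running a full array is safe
    convert(chunk, output_dir=chunk.parent.parent / "parquet", no_clobber=True)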
 src/rc_gpfs/cli/convert_to_parquet.py | 12 ++++--------
 src/rc_gpfs/policy/convert.py         | 13 ++++++++++++-
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py
index 1e00596..b0478dd 100644
--- a/src/rc_gpfs/cli/convert_to_parquet.py
+++ b/src/rc_gpfs/cli/convert_to_parquet.py
@@ -3,6 +3,8 @@ import argparse
 import subprocess
 from pathlib import Path
 import multiprocessing
+
+from ..policy.convert import convert
 from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
 from ..utils import parse_scontrol
 
@@ -37,7 +39,7 @@ def parse_args():
     parser.add_argument('--pool-size',type=int,default=None,
                         help="Number of cores to include in the pool for local parallel processing. If None, will default to all cores available to the invoking Python process")
     parser.add_argument('--no-clobber', action='store_true',default=False,
-                        help='When set and existing parquet files are found, immediately exits without any processing')
+                        help='When set, skips any log chunks that already have corresponding parquet files. Chunks without a parquet file are processed as normal.')
     args = parser.parse_args()
     return vars(args)
 
@@ -51,7 +53,7 @@ BATCH_SCRIPT = """\
 #SBATCH --mem={mem}
 #SBATCH --output={output_log}
 #SBATCH --error={error_log}
-#SBATCH --array=1-{nlogs}
+#SBATCH --array={array_idxs}
 
 {env_cmd}
 
@@ -77,17 +79,11 @@ def submit_batch(**kwargs):
 def convert_to_parquet() -> None:
     args = parse_args()
 
-    from ..policy.convert import convert
-
     if args['output_dir'] is None:
         args['output_dir'] = args['input'].parent.joinpath('parquet')
 
     args['output_dir'].mkdir(exist_ok = True, mode = 0o2770)
 
-    output_files_exist = len(list(args['output_dir'].glob('*.parquet'))) > 0
-    if args['no_clobber'] and output_files_exist:
-        sys.exit('The output directory already contains parquet files. Exiting')
-    
     if args['input'].is_file():
         nlogs = 1
     else:
diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index ab42702..05f5f62 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -31,7 +31,8 @@ def parse_line(line):
 def convert(
         input_file: str | Path, 
         output_dir: str | Path | None = None, 
-        output_name: str | Path | None = None
+        output_name: str | Path | None = None,
+        no_clobber: bool = False,
     ) -> None:
     """
     Converts a GPFS log file to parquet format. The data schema assumes the same policy definition from list-path-external and list-path-dirplus.
@@ -44,6 +45,8 @@ def convert(
         Directory path in which to store the file. If the directory does not exist, it will be created. If None, the output directory will be set to ./input_file/../../parquet in accordance with our standard organization. By default None
     output_name : str | Path | None, optional
         Name of the output file. The name will be automatically appended with .parquet. If None, the name of the input file will be used. By default None
+    no_clobber : bool, optional
+        When True, exit without overwriting if output_dir/output_name.parquet already exists. If False (default), any existing parquet file will be overwritten
     """
     
     input_file = as_path(input_file)
@@ -60,6 +63,14 @@ def convert(
 
     output_path = output_dir.joinpath(output_name)
 
+    if output_path.exists() and no_clobber:
+        print(
+            "INFO: Output file already exists. Pass no_clobber=False to overwrite any existing output parquet file.",
+            flush=True,
+        )
+        print("INFO: Cleaning and exiting.",flush=True)
+        return
+
     output_dir.mkdir(mode=0o2770, exist_ok=True)
     
     with gzip.open(input_file,'r') as f:
-- 
GitLab


From 5695579205e6371409726ee0aa0f066059bbb62c Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 25 Apr 2025 15:46:19 -0500
Subject: [PATCH 2/3] split set_output_filename into its own function so it can
 be used in the CLI function

---
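Notes: a quick sketch of the helper's expected behavior once extracted (file
names are illustrative):

    from pathlib import Path
    from rc_gpfs.policy.convert import set_output_filename

    # Default: reuse the input file's name with a .parquet suffix
    set_output_filename(Path("list-000.gz"))            # "list-000.parquet"

    # Explicit name: any given suffix is replaced with .parquet
    set_output_filename(Path("list-000.gz"), "custom")  # "custom.parquet"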
 src/rc_gpfs/policy/convert.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py
index 05f5f62..0873e9a 100755
--- a/src/rc_gpfs/policy/convert.py
+++ b/src/rc_gpfs/policy/convert.py
@@ -28,6 +28,14 @@ def parse_line(line):
     except:
         return line
 
+def set_output_filename(input_file: Path, output_name: str | Path | None = None) -> str:
+    if output_name is None:
+        output_name = input_file.with_suffix(".parquet").name
+    else:
+        output_name = as_path(output_name).with_suffix(".parquet").name
+    
+    return str(output_name)
+
 def convert(
         input_file: str | Path, 
         output_dir: str | Path | None = None, 
@@ -55,12 +63,8 @@ def convert(
         output_dir = as_path(output_dir)
     else:
         output_dir = input_file.parent.parent.joinpath('parquet')
-    
-    if output_name is None:
-        output_name = input_file.with_suffix('.parquet').name
-    else:
-        output_name = as_path(output_name).with_suffix('.parquet').name
 
+    output_name = set_output_filename(input_file,output_name)
     output_path = output_dir.joinpath(output_name)
 
     if output_path.exists() and no_clobber:
-- 
GitLab


From 5d1a1f2e328c1ae84e28ba2bc7a563d6d5373c8b Mon Sep 17 00:00:00 2001
From: Matthew K Defenderfer <mdefende@uab.edu>
Date: Fri, 25 Apr 2025 15:48:46 -0500
Subject: [PATCH 3/3] update no-clobber option to apply to each individual log
 chunk instead of quitting execution if any parquet files are found. Also
 update the array job to be 0-indexed to match the chunk indices in the log
 file names themselves

---
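Notes: the indexes of chunks that still need conversion are collapsed into a
Slurm --array range spec. A sketch of the round trip (index values are
illustrative):

    from rc_gpfs.cli.convert_to_parquet import _find_sequential_indexes

    # Zero-based positions of log chunks with no parquet output yet
    idxs_to_run = [0, 1, 2, 5, 7, 8]

    # Collapses runs into Slurm array syntax: "0-2,5,7-8". Each array
    # task then adds 1 to SLURM_ARRAY_TASK_ID to select line NR from
    # the sorted `ls` listing of the input directory
    array_idxs = ",".join(_find_sequential_indexes(idxs_to_run))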
 src/rc_gpfs/cli/convert_to_parquet.py | 72 ++++++++++++++++++++++++---
 1 file changed, 66 insertions(+), 6 deletions(-)

diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py
index b0478dd..0a0f78a 100644
--- a/src/rc_gpfs/cli/convert_to_parquet.py
+++ b/src/rc_gpfs/cli/convert_to_parquet.py
@@ -1,10 +1,10 @@
-import sys
 import argparse
+import re
 import subprocess
 from pathlib import Path
 import multiprocessing
 
-from ..policy.convert import convert
+from ..policy.convert import convert, set_output_filename
 from .utils import define_python_interpreter,batch_parser,setup_slurm_logs
 from ..utils import parse_scontrol
 
@@ -57,13 +57,11 @@ BATCH_SCRIPT = """\
 
 {env_cmd}
 
-log=$(ls {input}/*.gz | awk "NR==${{SLURM_ARRAY_TASK_ID}} {{ print $1 }}")
+log=$(ls {input}/*.gz | sort | awk "NR==$((SLURM_ARRAY_TASK_ID + 1)) {{ print }}")
 
-convert-to-parquet -o {output_dir} ${{log}}
+convert-to-parquet {no_clobber_opt} -o {output_dir} ${{log}}
 """
 
-
-
 def submit_batch(**kwargs):
     env_cmd = define_python_interpreter(kwargs.get('python_path'),kwargs.get('conda_env'))
     kwargs.update({"env_cmd":env_cmd})
@@ -76,6 +74,39 @@ def submit_batch(**kwargs):
     subprocess.run(['sbatch'],input=script,shell=True,text=True)
     pass
 
+def _find_sequential_indexes(idxs):
+    if not idxs:
+        return []
+
+    idxs = sorted(idxs)
+    result = []
+    start = idxs[0]
+    prev = idxs[0]
+
+    for num in idxs[1:]:
+        if num == prev + 1:
+            prev = num
+        else:
+            if start == prev:
+                result.append(str(start))
+            else:
+                result.append(f"{start}-{prev}")
+            start = num
+            prev = num
+    # Add the last sequence
+    if start == prev:
+        result.append(str(start))
+    else:
+        result.append(f"{start}-{prev}")
+
+    return result
+
+def _get_missing_indexes(chunks, parquets):
+    missing_indexes = [
+        index for index, element in enumerate(chunks) if element not in parquets
+    ]
+    return missing_indexes
+
 def convert_to_parquet() -> None:
     args = parse_args()
 
@@ -90,6 +121,35 @@ def convert_to_parquet() -> None:
         logs = list(args.get('input').glob('*.gz'))
         nlogs = len(logs)
     
+    if args["no_clobber"]:
+        args.update({"no_clobber_opt": "--no-clobber"})
+
+        if args['input'].is_dir():
+            chunks = logs
+            chunks.sort()
+        else:
+            chunks = [args["input"].name]
+        
+        pqs = [f.name for f in args["output_dir"].glob("*.parquet")]
+
+        target_pqs = [set_output_filename(f) for f in chunks]
+
+        idxs_to_run = _get_missing_indexes(target_pqs,pqs)
+        
+        if len(idxs_to_run) == 0:
+            print("INFO: All log chunks have been converted to parquet. Exiting without processing")
+            return
+        
+        array_idxs = _find_sequential_indexes(idxs_to_run)
+        args['array_idxs'] = ','.join(array_idxs)
+    else:
+        args.update(
+            {
+                "no_clobber_opt": "",
+                "array_idxs" : f"0-{nlogs-1}"
+            }
+        )
+    
     args.update({'nlogs':nlogs})
 
     if args['batch']:
-- 
GitLab