Skip to content
Snippets Groups Projects

Add example notebook for full analysis

1 file
+ 188
0
Compare changes
  • Side-by-side
  • Inline
+ 188
0
%% Cell type:markdown id: tags:
## Example GPFS Workflow
This notebook aims to give an example GPFS workflow using an early version of the `rc_gpfs` package. This will start with a raw GPFS log file moving through stages for splitting and conversion to parquet using the Python API, although both steps could be done using the built-in CLI as well. Once the parquet dataset is created, it will create the basic `tld` summarized report on file count and storage use. From there, some example plots showing breakdowns of storage used by file age will be created.
This notebook assumes being run in a job with at least one GPU, although some options can be changed to use a CPU-only backend. A GPU is recommended for analyzing larger logs, such as for `/data/project`.
%% Cell type:markdown id: tags:
### Suggested Compute Resources
For analyzing a `/data/project` report, I suggest the following resources:
- ntasks = 2
- cpus-per-task = 32
- ntasks-per-socket = 1
- gres = gpu:2
- partition = amperenodes
- mem = 256G
When using more than 1 GPU, specifying 1 task per socket is highly recommended to guarantee matching the GPU socket socket affinity on the current A100 DGX.
%% Cell type:markdown id: tags:
### Package and Input Setup
%% Cell type:code id: tags:
``` python
from pathlib import Path
# Import the three main Python API submodules for analyzing a report
from rc_gpfs import process, policy, report
```
%% Cell type:code id: tags:
``` python
# File patch setup
gpfs_log_root = Path('/data/rc/gpfs-policy/data/list-policy_data-project_list-path-external_slurm-31035593_2025-01-05T00:00:24/')
raw_log = gpfs_log_root.joinpath('raw','list-policy_data-project_list-path-external_slurm-31035593_2025-01-05T00:00:24.gz')
```
%% Cell type:markdown id: tags:
### Log Preprocessing
This section shows how to use the `policy` module to split the large GPFS log into smaller parts and then convert each part to a separate parquet file for analysis elsewhere. You can specify different filepaths for the outputs of each stage if you want, but it's generally easier to let the functions use the standard log directory structure seen below:
``` text
.
└── log_root/
├── raw/
│ └── raw_log.gz
├── chunks/
│ ├── list-000.gz
│ ├── list-001.gz
│ └── ...
├── parquet/
│ ├── list-000.parquet
│ ├── list-001.parquet
│ └── ...
├── reports/
└── misc/
```
The directory structure will be automatically created as needed by each function. It's generally easier to not specify output paths for consistency in organization.
%% Cell type:markdown id: tags:
#### Split
%% Cell type:code id: tags:
``` python
# Split raw log
policy.split(log=raw_log)
```
%% Output
INFO: Checking for pigz
INFO: pigz found at /home/mdefende/bin/pigz
INFO: 120 logs found. Beginning compression
%% Cell type:markdown id: tags:
#### Convert
Opposed to split, it's much faster to submit the parquet conversion as an array job (built into the `cli` submodule), but it's possible to do here via the multiprocessing library as well.
%% Cell type:code id: tags:
``` python
from multiprocessing import Pool
from rc_gpfs.utils import parse_scontrol
cores,_ = parse_scontrol()
split_files = list(gpfs_log_root.joinpath('chunks').glob('*.gz'))
```
%% Cell type:code id: tags:
``` python
with Pool(cores) as pool:
pool.map(policy.convert,split_files)
```
%% Cell type:markdown id: tags:
### Aggregate
%% Cell type:code id: tags:
``` python
pq_path = gpfs_log_root.joinpath('parquet')
delta_vals = range(0,210,30)
delta_unit = 'D'
report_name = 'agg_by_tld_atime.parquet'
```
%% Cell type:markdown id: tags:
When choosing what sort of compute we want to use, we can let the package infer the backend based on the in-memory dataset size as well as the available compute resources, but it's easier in this case to specify that we want to use `dask_cuda`. Backend initialization happens later in the notebook
%% Cell type:code id: tags:
``` python
# Compute backend setup
with_cuda = True
with_dask = True
```
%% Cell type:code id: tags:
``` python
process.aggregate_gpfs_dataset(dataset_path=pq_path, delta_vals=delta_vals, delta_unit=delta_unit, report_name=report_name, with_cuda=with_cuda, with_dask=with_dask)
```
%% Output
---------------------------------------------------------------------------
TypeCheckError Traceback (most recent call last)
Cell In[16], line 1
----> 1 process.aggregate_gpfs_dataset(dataset_path=pq_path, delta_vals=delta_vals, delta_unit=delta_unit, report_name=report_name, with_cuda=with_cuda, with_dask=with_dask)
File ~/.conda/envs/gpfs/lib/python3.11/site-packages/rc_gpfs/process/process.py:56, in aggregate_gpfs_dataset(dataset_path, run_date, delta_vals, delta_unit, time_val, report_dir, report_name, n_workers, with_cuda, with_dask, **kwargs)
49 pass
51 # ENH: Refactor this, it should not automatically create the compute backend, that should be passed to it. Creating the
52 # backend is beyond the scope of the process module and this function specifically. Can wrap this with the backend
53 # creation in a CLI command later for convenience, but that shouldn't be baked in
55 @typechecked
---> 56 def aggregate_gpfs_dataset(
57 dataset_path: str | Path,
58 run_date: pd.Timestamp | None = None,
59 delta_vals: int | list[int] | None = None,
60 delta_unit: Literal['D','W','M','Y'] | None = None,
61 time_val: Literal['access','modify','create'] = 'access',
62 report_dir: str | Path | None = None,
63 report_name: str | Path | None = None,
64 n_workers: int | None = None,
65 with_cuda: Literal['infer'] | bool = 'infer',
66 with_dask: Literal['infer'] | bool = 'infer',
67 **kwargs
68 ) -> None:
69 # Input checking
70 dataset_path = _check_dataset_path(dataset_path)
71 if dataset_path.is_file():
File ~/.conda/envs/gpfs/lib/python3.11/site-packages/typeguard/_functions.py:137, in check_argument_types(func_name, arguments, memo)
134 raise exc
136 try:
--> 137 check_type_internal(value, annotation, memo)
138 except TypeCheckError as exc:
139 qualname = qualified_name(value, add_class_prefix=True)
File ~/.conda/envs/gpfs/lib/python3.11/site-packages/typeguard/_checkers.py:946, in check_type_internal(value, annotation, memo)
944 checker = lookup_func(origin_type, args, extras)
945 if checker:
--> 946 checker(value, origin_type, args, memo)
947 return
949 if isclass(origin_type):
File ~/.conda/envs/gpfs/lib/python3.11/site-packages/typeguard/_checkers.py:454, in check_uniontype(value, origin_type, args, memo)
451 finally:
452 del errors # avoid creating ref cycle
--> 454 raise TypeCheckError(f"did not match any element in the union:\n{formatted_errors}")
TypeCheckError: argument "delta_vals" (range) did not match any element in the union:
int: is not an instance of int
list[int]: is not a list
NoneType: is not an instance of NoneType
Loading