diff --git a/pyproject.toml b/pyproject.toml index cfa6b73b4a464a0e832324f3fa7d72abec86f6d7..a5f1f62237a01ecbffb17226d63437c3d8586bee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,10 +19,10 @@ dynamic = ["version","dependencies","classifiers"] repository = "https://gitlab.rc.uab.edu/rc/gpfs-policy" [project.scripts] -convert-to-hive = "rc_gpfs.cli:convert_flat_to_hive" -convert-to-parquet = "rc_gpfs.cli:convert_to_parquet" -split-log = "rc_gpfs.cli:split_log" -gpfspart = "rc_gpfs.cli:gpfs_fpart" +convert-to-hive = "rc_gpfs.cli.convert_flat_to_hive:convert_flat_to_hive" +convert-to-parquet = "rc_gpfs.cli.convert_to_parquet:convert_to_parquet" +split-log = "rc_gpfs.cli.split_log:split_log" +fparq = "rc_gpfs.cli.fparq_cli:fparq_cli" [tool.poetry] requires-poetry = ">=2.0" @@ -48,12 +48,6 @@ name="rapids" url="https://pypi.nvidia.com" priority = "supplemental" -[tool.poetry.scripts] -convert-to-hive = "rc_gpfs.cli.convert_flat_to_hive:convert_flat_to_hive" -convert-to-parquet = "rc_gpfs.cli.convert_to_parquet:convert_to_parquet" -split-log = "rc_gpfs.cli.split_log:split_log" -fparq = "rc_gpfs.cli.fparq_cli:fparq_cli" - [tool.poetry.requires-plugins] poetry-dynamic-versioning = { version = ">=1.0.0,<2.0.0", extras = ["plugin"] } diff --git a/src/rc_gpfs/cli/__init__.py b/src/rc_gpfs/cli/__init__.py index c5e66dbcd6f50d016ed18fcb1183bb27ce008fb2..fe7491323d880507a2d8b5bbe6f8a7c30b182615 100644 --- a/src/rc_gpfs/cli/__init__.py +++ b/src/rc_gpfs/cli/__init__.py @@ -1,4 +1,4 @@ -from .convert_flat_to_hive import convert_flat_to_hive -from .convert_to_parquet import convert_to_parquet -from .split_log import split_log -from .fparq_cli import fparq_cli \ No newline at end of file +#from .convert_flat_to_hive import convert_flat_to_hive +#from .convert_to_parquet import convert_to_parquet +#from .split_log import split_log +#from .fparq_cli import fparq_cli \ No newline at end of file diff --git a/src/rc_gpfs/cli/convert_flat_to_hive.py b/src/rc_gpfs/cli/convert_flat_to_hive.py index a10eee033fc0c5667478597273912c84c5828303..b6768087ee93445db469746fcc31fbd9a0b49b74 100644 --- a/src/rc_gpfs/cli/convert_flat_to_hive.py +++ b/src/rc_gpfs/cli/convert_flat_to_hive.py @@ -2,13 +2,7 @@ import argparse import subprocess from pathlib import Path -import dask.dataframe as dd -import dask.config -dask.config.set({'dataframe.backend':'cudf'}) -from dask.diagnostics import ProgressBar - from .utils import define_python_interpreter,batch_parser,setup_slurm_logs -from ..policy import hivize DESCRIPTION = """ Converts flat parquet GPFS datasets to a hive format partitioned by tld and log acquisition date. This essentially creates a timeseries of structured datasets for each tld for much easier more efficient log comparisons within tld. Each file path is set as the index and sorted, and all final output parquets are partitioned to have similar in-memory sizes. @@ -86,68 +80,18 @@ def submit_batch(**kwargs): subprocess.run(['sbatch'],input=script,shell=True,text=True) pass -def split_into_groups(series, cutoff): - groups = [] - - while len(series.index) > 0: - current_group = [] - current_sum = 0 - for username, storage_size in series.items(): - if storage_size > cutoff: - groups.append({username}) - series = series.drop(username) - break - elif current_sum + storage_size <= cutoff: - current_group.append(username) - current_sum += storage_size - - series = series.drop(current_group) - if current_group: - groups.append(set(current_group)) - - return groups - -def calc_tld_mem(df): - mem = df.groupby('tld',observed=True).apply(lambda x: x.memory_usage(deep=True).sum()) - return mem - -def define_tld_groups(input,cutoff): - ddf = dd.read_parquet(input,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) - with ProgressBar(): - tld_mem = ddf.map_partitions(calc_tld_mem).compute() - tld_mem = tld_mem.groupby(tld_mem.index).sum().divide(1024**3).to_pandas() - grps = split_into_groups(tld_mem,cutoff) - return grps - -def nested_list_to_log(nest,file): - """ - Writes a list of lists to a text log - - Args: - nest (list): A list of lists to be converted. - """ - with open(file, 'w', newline='') as f: - for l in nest: - f.write(f"{','.join(l)}\n") - def convert_flat_to_hive(): args = parse_args() + from ..policy.hive import prep_hivize, hivize + if args.get('batch'): if not args.get('grp_file'): - grps = define_tld_groups(args.get('parquet_path'),args.get('cutoff')) - - misc_path = args.get('parquet_path').parent.joinpath('misc','tld_grps.txt') - misc_path.parent.mkdir(exist_ok = True, parents = True) - nested_list_to_log(grps,misc_path) - ngroups = len(grps) - grp_file = str(misc_path) - args.update({'ngroups':ngroups, - 'grp_file':grp_file}) + prep_hivize(args) else: ngroups = sum(1 for line in open(args.get('grp_file'))) - args.update({'ngroups':ngroups}) - + args.update({'ngroups':ngroups}) + submit_batch(**args) else: diff --git a/src/rc_gpfs/cli/convert_to_parquet.py b/src/rc_gpfs/cli/convert_to_parquet.py index f035843cb54588d6394501efc3d3b8f9f7fcdb23..1e00596b0752da10c82dfc3bba9da278dd3d841d 100644 --- a/src/rc_gpfs/cli/convert_to_parquet.py +++ b/src/rc_gpfs/cli/convert_to_parquet.py @@ -4,7 +4,6 @@ import subprocess from pathlib import Path import multiprocessing from .utils import define_python_interpreter,batch_parser,setup_slurm_logs -from ..policy import convert from ..utils import parse_scontrol __all__ = ['convert_to_parquet'] @@ -77,6 +76,9 @@ def submit_batch(**kwargs): def convert_to_parquet() -> None: args = parse_args() + + from ..policy.convert import convert + if args['output_dir'] is None: args['output_dir'] = args['input'].parent.joinpath('parquet') diff --git a/src/rc_gpfs/cli/fparq_cli.py b/src/rc_gpfs/cli/fparq_cli.py index 34fd6b0f9f381be47f3bde28871df60dd49aaaaf..da60a8a3061e4f7cfbf785a6987e669c22a2be71 100644 --- a/src/rc_gpfs/cli/fparq_cli.py +++ b/src/rc_gpfs/cli/fparq_cli.py @@ -1,8 +1,5 @@ import argparse from pathlib import Path -import pandas as pd - -from ..process import fparq DESCRIPTION = """ gpfspart is a custom implementation of the fpart algorithm specifically designed to work with processed GPFS policy outputs. fpart crawls a directory tree partitioning the files by size up to a specified max and stores them in a number of lists. These lists are useful for passing to rsync and its derivative implementations to load balance and parallelize large data transfers. However, fpart has to crawl a file tree to create these partitions which can impact performance on very large, network file systems such as GPFS. @@ -62,6 +59,9 @@ def parse_args(): def fparq_cli(): args = parse_args() + import pandas as pd + from ..process import fparq + if args.get('partition_path') is None: pq_path = args.get('parquet_path') args.update({'partition_path': pq_path.joinpath('_partitions')}) diff --git a/src/rc_gpfs/cli/split_log.py b/src/rc_gpfs/cli/split_log.py index c0cc64a672eca4cd9bdbc28ffe5af51f190d56da..44c8461365465ba6bc529eb2f4642a29f2df1fa7 100644 --- a/src/rc_gpfs/cli/split_log.py +++ b/src/rc_gpfs/cli/split_log.py @@ -3,7 +3,6 @@ import argparse import subprocess from pathlib import Path from .utils import define_python_interpreter,batch_parser,setup_slurm_logs -from ..policy import split BATCH_SCRIPT = """\ #!/bin/bash @@ -69,5 +68,6 @@ def split_log(): if args.get('batch'): submit_batch(**args) else: + from ..policy.split import split split(**args) pass \ No newline at end of file diff --git a/src/rc_gpfs/policy/__init__.py b/src/rc_gpfs/policy/__init__.py index c24d1c30ce26f7b9c49e0df278f96df4e4ba2590..1a7b96e03d112141a4ddf21fee7b4626aa46ae54 100644 --- a/src/rc_gpfs/policy/__init__.py +++ b/src/rc_gpfs/policy/__init__.py @@ -1,2 +1,2 @@ -from .split import split,compress_logs -from .convert import convert, hivize \ No newline at end of file +#from .split import split, compress_logs +#from .convert import convert, hivize \ No newline at end of file diff --git a/src/rc_gpfs/policy/convert.py b/src/rc_gpfs/policy/convert.py index 73c26d012ebf79a34baf704da79873717a0cca41..54f05f7cb5f04ae8c04450b810927bfbce24f81a 100755 --- a/src/rc_gpfs/policy/convert.py +++ b/src/rc_gpfs/policy/convert.py @@ -1,27 +1,11 @@ -import os import re import gzip -import json -import random -import string -import shutil from pathlib import Path -from typing import Literal, List from urllib.parse import unquote -import GPUtil -import cudf import pandas as pd -import numpy as np -import dask.dataframe as dd -import dask.config -import rmm - -import pyarrow as pa -import pyarrow.parquet as pq from .policy_defs import SCHEMA -from ..compute.backend import infer_cuda from ..utils import as_path def parse_line(line): @@ -83,177 +67,4 @@ def convert( df = pd.DataFrame.from_dict(dicts).sort_values('tld') df = df.astype(SCHEMA) - df.to_parquet(output_path,engine = 'pyarrow') - - -def hivize( - parquet_path: str | Path, - hive_path: str | Path, - tld: str | List[str] | None = None, - staging_path: str | Path | None = None, - partition_size: str = '100MiB', - with_cuda: bool | Literal['infer'] = 'infer', - write_metadata: bool = True, - **kwargs - ) -> None: - parquet_path = as_path(parquet_path) - hive_path = as_path(hive_path) - - if staging_path is None: - rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) - staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}') - print(f"INFO: Using {staging_path} as temporary directory",flush=True) - else: - staging_path = as_path(staging_path) - - hive_path.mkdir(exist_ok=True,parents=True) - staging_path.mkdir(exist_ok=True,parents=True) - - if with_cuda == 'infer': - with_cuda = infer_cuda() - - if with_cuda: - import dask_cudf as backend - from dask_cudf.core import from_cudf as from_local - dask.config.set({'dataframe.backend':'cudf'}) - rmm.reinitialize( - pool_allocator=True, - managed_memory=True, - initial_pool_size='60GiB' - ) - else: - import dask as backend - from dask.dataframe import from_pandas as from_local - dask.config.set({'dataframe.backend':'pandas'}) - - def indexed_name(ind): - return f"indexed-{ind}.parquet" - - if tld is not None: - if not isinstance(tld,list): - tld = [tld] - predicates = [('tld','in',tld)] - else: - predicates = None - print(f"DEBUG: Filtering predicates are: {predicates}",flush=True) - - acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) - print(f"DEBUG: Acquisition date is {acq}",flush=True) - - # The flat parquet is initially read in via dask to avoid reading the full dataset into memory which happens even - # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically - # improve indexing and sorting by removing partitions. The sorted dataframe is converted back to a dask dataframe - # to create partitions within the parquet dataset and write to multiple files defined by those partitions. - ddf = backend.read_parquet(parquet_path,filters=predicates,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) - df = ddf.compute() - df = df.set_index('path').sort_index().assign(acq=acq) - ddf = from_local(df).repartition(partition_size=partition_size,force=True) - ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) - - if write_metadata: - ( - df - .groupby(['tld','acq']) - .apply(lambda x: - write_dataset_metadata( - x, - staging_path.joinpath(f"tld={x['tld'].iloc[0]}",f"acq={x['acq'].iloc[0]}") - ) - ) - ) - - shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) - shutil.rmtree(staging_path) - pass - -def bytes_to_human_readable_size(num_bytes): - units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'] - - # Handle the case where num_bytes is 0 - if num_bytes == 0: - return "0 B" - - # Calculate the appropriate unit. Take the floor if the number is not divisible by 1024 - unit_index = min(len(units) - 1, int(np.log2(num_bytes) // 10)) - num = int(num_bytes // (1024 ** unit_index)) - - # Format the number to 2 decimal places and append the unit - return f"{num} {units[unit_index]}" - -def create_bin_labels(bins, include_outer_bins = True): - if any([b < 0 for b in bins]): - raise ValueError("File size cannot be less than 0 B") - bins = list(set(bins)) - bins.sort() # Sorts and removes any duplicates - if np.isinf(bins[-1]): - bins.remove(np.inf) - - labels = [] - - # Check for including a bin for all files less than the smallest value if - if include_outer_bins and bins[0] != 0: - labels.append(f"0 B-{bytes_to_human_readable_size(bins[0])}") - - for i in range(len(bins) - 1): - lower_bound = bytes_to_human_readable_size(bins[i]) - upper_bound = bytes_to_human_readable_size(bins[i + 1]) - labels.append(f"{lower_bound}-{upper_bound}") - - if include_outer_bins: - labels.append(f">{bytes_to_human_readable_size(bins[-1])}") - return labels,bins - -def calculate_size_distribution( - sizes: pd.Series | cudf.Series, - size_bins: List[int | float] = [0,4096] + [x*1024**3 for x in [1,10,50,100]] + [1024**4], - include_outer_bins: bool = True, - **kwargs -) -> pd.Series | cudf.Series: - - size_labels,size_bins = create_bin_labels(size_bins,include_outer_bins) - - if include_outer_bins: - if size_bins[0] < 0: - size_bins.insert(0,0) - if float("inf") not in size_bins: - size_bins.append(float("inf")) - - fn = pd.cut if isinstance(sizes,pd.Series) else cudf.cut - size_grps = fn(sizes,bins=size_bins,labels=size_labels,ordered=True,include_lowest=True,**kwargs) - - return size_grps - -def write_dataset_metadata(df: cudf.DataFrame | pd.DataFrame, parquet_path: str | Path) -> dict: - parquet_path = as_path(parquet_path) - if isinstance(df,cudf.DataFrame): - vram_usage = df.memory_usage(deep=True).to_dict() - ram_usage = df.to_pandas().memory_usage(deep=True).to_dict() - else: - if len(GPUtil.getAvailable()) > 0: - vram_usage = cudf.from_pandas(df).memory_usage(deep=True).to_dict() - else: - vram_usage = 'NA' - ram_usage = df.memory_usage(deep=True).to_dict() - - df = df[['size']] - df['grp'] = calculate_size_distribution(df['size']) - - size_dist = ( - df - .groupby('grp',observed=True)['size'] - .agg(['count','sum']) - .rename(columns={'count':'file_count','sum':'bytes'}) - .T - .to_dict() - ) - - metadata = { - 'num_rows': df.shape[0], - 'num_columns': df.shape[1], - 'vram_usage': vram_usage, - 'ram_usage': ram_usage, - 'size_distribution': size_dist - } - - with open(parquet_path.joinpath('_metadata.json'),'w') as f: - json.dump(metadata,f) \ No newline at end of file + df.to_parquet(output_path,engine = 'pyarrow') \ No newline at end of file diff --git a/src/rc_gpfs/policy/hive.py b/src/rc_gpfs/policy/hive.py new file mode 100644 index 0000000000000000000000000000000000000000..29c9f163bec67a717c20a40bad554ba9eb0551f1 --- /dev/null +++ b/src/rc_gpfs/policy/hive.py @@ -0,0 +1,249 @@ +import os +import re +import json +import random +import string +import shutil +from pathlib import Path +from typing import Literal, List +import GPUtil + +import cudf +import pandas as pd +import numpy as np + +import dask.dataframe as dd +import dask.config +dask.config.set({'dataframe.backend':'cudf'}) +from dask.diagnostics import ProgressBar +import rmm + +from ..compute.backend import infer_cuda +from ..utils import as_path + +def split_into_groups(series, cutoff): + groups = [] + + while len(series.index) > 0: + current_group = [] + current_sum = 0 + for username, storage_size in series.items(): + if storage_size > cutoff: + groups.append({username}) + series = series.drop(username) + break + elif current_sum + storage_size <= cutoff: + current_group.append(username) + current_sum += storage_size + + series = series.drop(current_group) + if current_group: + groups.append(set(current_group)) + + return groups + +def calc_tld_mem(df): + mem = df.groupby('tld',observed=True).apply(lambda x: x.memory_usage(deep=True).sum()) + return mem + +def define_tld_groups(input,cutoff): + ddf = dd.read_parquet(input,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) + with ProgressBar(): + tld_mem = ddf.map_partitions(calc_tld_mem).compute() + tld_mem = tld_mem.groupby(tld_mem.index).sum().divide(1024**3).to_pandas() + grps = split_into_groups(tld_mem,cutoff) + return grps + +def nested_list_to_log(nest,file): + """ + Writes a list of lists to a text log + + Args: + nest (list): A list of lists to be converted. + """ + with open(file, 'w', newline='') as f: + for l in nest: + f.write(f"{','.join(l)}\n") + +def prep_hivize(args): + grps = define_tld_groups(args.get('parquet_path'),args.get('cutoff')) + misc_path = args.get('parquet_path').parent.joinpath('misc','tld_grps.txt') + misc_path.parent.mkdir(exist_ok = True, parents = True) + nested_list_to_log(grps,misc_path) + ngroups = len(grps) + grp_file = str(misc_path) + args.update({'ngroups':ngroups, + 'grp_file':grp_file}) + return args + +def hivize( + parquet_path: str | Path, + hive_path: str | Path, + tld: str | List[str] | None = None, + staging_path: str | Path | None = None, + partition_size: str = '100MiB', + with_cuda: bool | Literal['infer'] = 'infer', + write_metadata: bool = True, + **kwargs + ) -> None: + parquet_path = as_path(parquet_path).resolve() + hive_path = as_path(hive_path).resolve() + + if staging_path is None: + rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + staging_path = Path(os.getenv('TMPDIR')).joinpath(os.getenv('USER'),f'hive-{rand_str}') + print(f"INFO: Using {staging_path} as temporary directory",flush=True) + else: + staging_path = as_path(staging_path) + + hive_path.mkdir(exist_ok=True,parents=True) + staging_path.mkdir(exist_ok=True,parents=True) + + if with_cuda == 'infer': + with_cuda = infer_cuda() + + if with_cuda: + import dask_cudf as backend + from dask_cudf.core import from_cudf as from_local + dask.config.set({'dataframe.backend':'cudf'}) + rmm.reinitialize( + pool_allocator=True, + managed_memory=True, + initial_pool_size='60GiB' + ) + else: + import dask as backend + from dask.dataframe import from_pandas as from_local + dask.config.set({'dataframe.backend':'pandas'}) + + def indexed_name(ind): + return f"indexed-{ind}.parquet" + + if tld is not None: + if not isinstance(tld,list): + tld = [tld] + predicates = [('tld','in',tld)] + else: + predicates = None + print(f"DEBUG: Filtering predicates are: {predicates}",flush=True) + + acq = re.search(r'[\d]{4}-[\d]{2}-[\d]{2}',str(parquet_path)).group(0) + print(f"DEBUG: Acquisition date is {acq}",flush=True) + + # The flat parquet is initially read in via dask to avoid reading the full dataset into memory which happens even + # when including predicates for filtering. The dask dataframe is converted to a regular dataframe to drastically + # improve indexing and sorting by removing partitions. The sorted dataframe is converted back to a dask dataframe + # to create partitions within the parquet dataset and write to multiple files defined by those partitions. + ddf = backend.read_parquet(parquet_path,filters=predicates,columns = ['size','kballoc','access','create','modify','uid','gid','path','tld']) + df = ddf.compute() + df = df.set_index('path').sort_index().assign(acq=acq) + ddf = from_local(df).repartition(partition_size=partition_size,force=True) + ddf.to_parquet(staging_path,partition_on=['tld','acq'],name_function = indexed_name) + + if write_metadata: + ( + df + .groupby(['tld','acq']) + .apply(lambda x: + write_dataset_metadata( + x, + staging_path.joinpath(f"tld={x['tld'].iloc[0]}",f"acq={x['acq'].iloc[0]}") + ) + ) + ) + + shutil.copytree(staging_path,hive_path,dirs_exist_ok=True) + shutil.rmtree(staging_path) + pass + +def bytes_to_human_readable_size(num_bytes): + units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'] + + # Handle the case where num_bytes is 0 + if num_bytes == 0: + return "0 B" + + # Calculate the appropriate unit. Take the floor if the number is not divisible by 1024 + unit_index = min(len(units) - 1, int(np.log2(num_bytes) // 10)) + num = int(num_bytes // (1024 ** unit_index)) + + # Format the number to 2 decimal places and append the unit + return f"{num} {units[unit_index]}" + +def create_bin_labels(bins, include_outer_bins = True): + if any([b < 0 for b in bins]): + raise ValueError("File size cannot be less than 0 B") + bins = list(set(bins)) + bins.sort() # Sorts and removes any duplicates + if np.isinf(bins[-1]): + bins.remove(np.inf) + + labels = [] + + # Check for including a bin for all files less than the smallest value if + if include_outer_bins and bins[0] != 0: + labels.append(f"0 B-{bytes_to_human_readable_size(bins[0])}") + + for i in range(len(bins) - 1): + lower_bound = bytes_to_human_readable_size(bins[i]) + upper_bound = bytes_to_human_readable_size(bins[i + 1]) + labels.append(f"{lower_bound}-{upper_bound}") + + if include_outer_bins: + labels.append(f">{bytes_to_human_readable_size(bins[-1])}") + return labels,bins + +def calculate_size_distribution( + sizes: pd.Series | cudf.Series, + size_bins: List[int | float] = [0,4096] + [x*1024**3 for x in [1,10,50,100]] + [1024**4], + include_outer_bins: bool = True, + **kwargs +) -> pd.Series | cudf.Series: + + size_labels,size_bins = create_bin_labels(size_bins,include_outer_bins) + + if include_outer_bins: + if size_bins[0] < 0: + size_bins.insert(0,0) + if float("inf") not in size_bins: + size_bins.append(float("inf")) + + fn = pd.cut if isinstance(sizes,pd.Series) else cudf.cut + size_grps = fn(sizes,bins=size_bins,labels=size_labels,ordered=True,include_lowest=True,**kwargs) + + return size_grps + +def write_dataset_metadata(df: cudf.DataFrame | pd.DataFrame, parquet_path: str | Path) -> dict: + parquet_path = as_path(parquet_path) + if isinstance(df,cudf.DataFrame): + vram_usage = df.memory_usage(deep=True).to_dict() + ram_usage = df.to_pandas().memory_usage(deep=True).to_dict() + else: + if len(GPUtil.getAvailable()) > 0: + vram_usage = cudf.from_pandas(df).memory_usage(deep=True).to_dict() + else: + vram_usage = 'NA' + ram_usage = df.memory_usage(deep=True).to_dict() + + df = df[['size']] + df['grp'] = calculate_size_distribution(df['size']) + + size_dist = ( + df + .groupby('grp',observed=True)['size'] + .agg(['count','sum']) + .rename(columns={'count':'file_count','sum':'bytes'}) + .T + .to_dict() + ) + + metadata = { + 'num_rows': df.shape[0], + 'num_columns': df.shape[1], + 'vram_usage': vram_usage, + 'ram_usage': ram_usage, + 'size_distribution': size_dist + } + + with open(parquet_path.joinpath('_metadata.json'),'w') as f: + json.dump(metadata,f) \ No newline at end of file