dask-mpi-validate-galaxy-tar-data-2024-05-03.ipynb

Run report on pickled list policy data

The script reads pickled files that match glob_pattern from the pickledir derived from dirname, runs the report, and saves it as a CSV to the "dirname/reports" subdirectory by default.

Some progress info is available via the verbose flag.

The current report aggregates storage stats by top-level dir and the year of the data's last access. The goal of this report is to understand the distribution of less frequently used data.
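
At a high level, the pipeline boils down to the following minimal single-machine sketch (illustrative only: the glob and tldpath values are taken from the input vars below, and the actual run persists and repartitions the data on a dask cluster first):

# minimal sketch of the report: read the policy parquet data, derive the
# top-level dir and last-access year, then sum sizes and count files
import dask.dataframe as dd

tldpath = "/data/project/ccts/galaxy"
n = len(tldpath.split("/"))  # index of the path component just below tldpath

df = dd.read_parquet("data/list-policy_projects_2023-08-31/parquet/list-*.parquet",
                     columns=["size", "access", "path"], engine="pyarrow")
df = df[df.path.str.startswith(tldpath + "/")]
df = df.assign(tld=df.path.str.split("/", n=n + 1, expand=True)[n],
               year=df.access.dt.year)
report = df.groupby(["tld", "year"])["size"].agg(["sum", "count"]).compute()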

!conda info --envs

!conda list

!pip list --format=freeze

import datetime
import pandas as pd
import matplotlib.pyplot as plt
from urllib.parse import unquote
import sys
import os
import pathlib
import re
import dask.dataframe as dd
import dask
from dask.diagnostics import ProgressBar
from dask.distributed import Client

# connect to the running dask scheduler (e.g. started by dask-mpi) via its scheduler file
client = Client(scheduler_file='scheduler.json')

# fallback: start a local cluster instead
client = Client()

Input vars

dirname="data/list-policy_projects_2023-08-31"  # directory to fine files to pickle
glob_pattern = "*.parquet"  # file name glob pattern to match, can be file name for individual file
line_regex_filter = ".*"   # regex to match lines of interest in file
pickledir=f"{dirname}/parquet"
reportdir=f"{dirname}/reports"
tldpath="/data/project/ccts/galaxy"

verbose = True
limit = 0
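
A quick sanity check of the derived paths (a sketch, not part of the original notebook; it only lists what the glob would match):

# confirm the input files exist and show where the report will be written
matched = sorted(pathlib.Path(pickledir).glob(glob_pattern))
if verbose:
    print(f"{len(matched)} files match {glob_pattern!r} under {pickledir}")
    print(f"reports will be written to {reportdir}")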

Utilities

# get top level dir on which to aggregate

def get_tld(df, dirname):
    '''
    df: dataframe with path column (e.g. from policy run)
    dirname: top level dir (TLD) that contains dirs for report
    
    The function uses the length of dirname to locate the TLD column in the split path.
    '''
    dirpaths = dirname.split("/")
    new=df["path"].str.split("/", n=len(dirpaths)+1, expand=True)
    #df=df.assign(tld=new[len(dirpaths)])
    #df["tld"] = new[len(dirpaths)]
   
    return new[len(dirpaths)]
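
For example, with made-up paths (a pandas-only sketch), the component directly below dirname is returned:

sample = pd.DataFrame({"path": ["/data/project/ccts/galaxy/tools/a.xml",
                                "/data/project/ccts/galaxy/datasets/b.dat"]})
get_tld(sample, "/data/project/ccts/galaxy")
# 0       tools
# 1    datasets
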
# get year of a datetime column on which to aggregate

def get_year(df, column):
    '''
    df: dataframe with a datetime column (e.g. from policy run)
    column: name of the datetime column to use (e.g. "access" or "modify")
    
    Returns the year component of the given column.
    '''
    return df[column].dt.year
def report_tld_year(df):
    '''
    Aggregate the sum and count of files by year in the top level dir (TLD)
    
    Uses dict parameter to pandas agg to apply sum and count function to size column
    '''
    report = df.groupby(['tld', df.access.dt.year]).agg({"size": ["sum", "count"]})
    return report
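
The dict form of agg yields the MultiIndex columns ("size", "sum") and ("size", "count"). A tiny pandas sketch with made-up values:

toy = pd.DataFrame({"tld": ["tools", "tools", "datasets"],
                    "access": pd.to_datetime(["2020-01-01", "2021-06-01", "2020-03-15"]),
                    "size": [100, 200, 50]})
report_tld_year(toy)
#                  size
#                   sum count
# tld      access
# datasets 2020      50     1
# tools    2020     100     1
#          2021     200     1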

Read and parse the files according to glob_pattern

# alternative: use the local threaded scheduler
dask.config.set(scheduler='threads')

# alternative: use the local multiprocessing scheduler
dask.config.set(scheduler='processes')

def read_policy_parquet(file, columns=['size', 'access', 'modify', 'uid', 'path'], engine="pyarrow"):
    '''Read a policy-run parquet dataset, persist it on the cluster, and repartition to 64MB partitions.'''
    df = dd.read_parquet(file, columns=columns, engine=engine)
    df = client.persist(df)
    df = df.repartition(partition_size="64MB")
    return df

df = dd.read_parquet(f'{pickledir}/list-*.parquet', columns=['size', 'access', 'modify', 'uid', 'path'], engine="pyarrow")

df

df = client.persist(df)

df.dask

%%time

df=df.repartition(partition_size="64MB")

df

%%time

df.map_partitions(len).compute()

df.dask

df = df[df.path.str.startswith("/data/project/ccts/galaxy/")]

%%time

maydf = read_policy_parquet("data/list-policy_projects_2024-05-03/parquet")
maydf = maydf[maydf.path.str.startswith("/data/project/ccts/galaxy/")]

%%time

maydf = maydf.set_index(maydf.path, npartitions="auto")

%%time

augdf = read_policy_parquet("data/list-policy_projects_2023-08-31/parquet")
augdf = augdf[augdf.path.str.startswith("/data/project/ccts/galaxy/")]
%%time

augdf=augdf.repartition(partition_size="64MB")
%%time

maydf=maydf.repartition(partition_size="64MB")
%%time

len(augdf)
%%time

len(maydf)
%%time

augdf = augdf.set_index(augdf.path)
%%time

maydf = maydf.set_index(maydf.path)
%%time
joindf = maydf.join(augdf, how="outer", lsuffix="_may", rsuffix="_aug")
joindf
joindf.dask
%%time

len(joindf)
%%time

len(joindf[joindf.modify_aug.isna()])
%%time

len(joindf[joindf.modify_may.isna()])
modify_comp = joindf.modify_may != joindf.modify_aug
%%time

len(joindf[modify_comp])
%%time

len(joindf[joindf.size_may != joindf.size_aug])
%%time

len(joindf[joindf.size_may == joindf.size_aug])
%%time

len(joindf[joindf.uid_may != joindf.uid_aug])
%%time

len(joindf[joindf.access_may != joindf.access_aug])
%%time

len(joindf[joindf.access_may == joindf.access_aug])
stop  # intentional: this undefined name raises NameError to halt "Run All" here

Aggregate stats into running totals

df1=get_tld(df, tldpath)
df1.dask

%%time

with ProgressBar(): display(df1.head())

df = df.assign(tld=df1)
df.dask

df = df.drop(columns="path")

df1 = get_year(df, "access")
df = df.assign(year=df1)

df = df.drop(columns=["uid","access"])

df
df.dask
df = client.persist(df)
df.dask
df.head()
def ls_path(df, path):
    tmp = df[df.path.str.match(path)]
    tmp = tmp.assign(tld=get_tld(tmp, path))
    
    return tmp
def du_by_year(df, path, time="access"):
    tmp = df[df.path.str.match(path)]

    tmp = tmp.assign(tld=get_tld(tmp, path))
    
    tmp = tmp.assign(year=get_year(tmp, time))
    
    tmp = tmp.drop(columns=["uid", "access", "path"])
    tmp = client.persist(tmp)
    
    tmp = tmp.groupby(['tld', 'year']).sum()
    
    tmp = tmp.assign(terabytes=tmp["size"]/(10**12))
    
    return tmp
                    
# redefinition: drop the selected `time` column rather than the hard-coded "access"
def du_by_year(df, path, time="access"):
    tmp = df[df.path.str.match(path)]

    tmp = tmp.assign(tld=get_tld(tmp, path))
    
    tmp = tmp.assign(year=get_year(tmp, time))
    
    tmp = tmp.drop(columns=["uid", time, "path"])
    tmp = client.persist(tmp)
    
    tmp = tmp.groupby(['tld', 'year']).sum()
    
    tmp = tmp.assign(terabytes=tmp["size"]/(10**12))
    
    return tmp
def du_by_tld(df, path, time="access"):
    tmp = df[df.path.str.match(path)]

    
    tmp = tmp.assign(tld=get_tld(tmp, path))
    
    #tmp = tmp.assign(year=get_year(tmp, time))
    
    tmp = tmp.drop(columns=["uid", "access", "path", "year"])
    tmp = client.persist(tmp)
    tmp = tmp.groupby(['tld']).sum()
    
    tmp = tmp.assign(terabytes=tmp["size"]/(10**12))
    
    return tmp
df.dask
df.dask
%%time

dudf = du_by_year(df, '/data/project/ccts', "modify")
dudf.dask

%%time

dudf = du_by_tld(df, '/data/project/ccts')

%%time

dudf = client.persist(dudf)
dudf.dask

%%time

dudf=dudf.repartition(partition_size="64MB")

dudf
%%time

dudf = client.compute(dudf)
%%time

dudf = dudf.result()
%%time

dudf.sort_values(["tld", "year"])
%%time

tmp=dudf.reset_index()
#tmp[(tmp['tld']=="galaxy")]
tmp[tmp.tld=='galaxy'].sort_values('year')
%%time

dudf.head()
%%time

dudf.groupby("tld").sum()

%%time

lsdf = ls_path(df, '/data/project/ccts/galaxy')

%%time

lsdf = client.persist(lsdf.tld.unique())

lsdf.dask

%%time

lsdf = client.compute(lsdf)

lsdf.result()

df.dask
%%time

dfccts = df[df.path.str.match('/data/project/ccts')]
dfccts.dask
dfccts = dfccts.assign(tld=get_tld(dfccts, '/data/project/ccts/galaxy'))
dfccts
dfccts.dask
%%time

dfccts.head()
df1 = df

lru_projects = ['ICOS', 'boldlab', 'hartmanlab', 'sdtrlab', 'kinglab', 'kobielab', 'MRIPhantom', 'NCRlab', 'bridgeslab', 'hsight', 'kutschlab', 'lcdl', 'metalsgroup', 'rowelab', 'szaflarski_mirman']

condition=df1["tld"].isin(lru_projects)

condition=df1["tld"].isin(["ccts"])

lru=df1[condition]

%%time

with ProgressBar(): display(lru.head())

df.groupby(['tld', 'year'])['size'].sum().visualize(node_attr={'penwidth': '6'})

df2 = df.groupby(['tld', 'year']).agg({"size": ["sum", "count"]})

# dask docs-style example; this dataframe has no 'name' or 'x' columns
# df.groupby('name').x.mean().visualize(node_attr={'penwidth': '6'})

%%time

df2 = report_tld_year(lru)

df
%%time 

df2 = df.groupby(['tld', 'year']).sum()
df2
df2.dask
tbsize = df2["size"]/(10**12)
df2 = df2.assign(terabytes=tbsize)
df2
df2.dask
report=df2
%%time

report = client.compute(report)
report

Create final report

Create summary format for gigabyte and terabyte columns https://stackoverflow.com/a/20937592/8928529

report["average_size"] = report["sum"]/report["count"]

report["terabytes"] = report["sum"]/(10**12) report["terabytes"] = report["terabytes"].map('{:,.2f}'.format)

report["gigabytes"] = report["sum"]/(10**9) report["gigabytes"] = report["gigabytes"].map('{:,.2f}'.format)

Save report as CSV

%%time

report = report.result()
report
# only create the report dir if there is data to write
if (len(report) and not os.path.isdir(reportdir)):
    os.mkdir(reportdir)
reportdir
%%time

if (verbose): print(f"report: groupby-tld")
report.to_csv(f"{reportdir}/groupby-tld-dask3.csv.gz")
%%time

report.to_parquet(f"{reportdir}/groupby-tld-year-dask4.parquet")

Summarize high-level stats

report
report.reset_index()

report[report["sum"] == report["sum"].max()]

report[(report["size"] > 5*10**13)]

report=report.reset_index()

summer = report.groupby("tld").agg("sum")  # optionally filter with report["sum"] > 10**13

summer["terabytes"] = summer["sum"]/(10**12) summer["terabytes"] = summer["terabytes"].map('{:,.2f}'.format)

print(summer[summer["sum"] > 10**13].sort_values("sum", ascending=False)[['count', 'terabytes']])

report[(report["sum"] > 10**13) & (report["access"] <= 2021)]

report[(report["sum"] > 10**13) & (report["access"] <= 2021)]["sum"].sum()

report[(report["sum"] <= 10**13) & (report["access"] <= 2021)]["sum"].sum()

report[(report["sum"] > 1013) & (report["access"] < 2023)]["sum"].sum()/1012