import datetime
import pandas as pd
import matplotlib.pyplot as plt
from urllib.parse import unquote
import sys
import os
import pathlib
import re
import dask.dataframe as dd
import dask
These notebooks use dataframes built from parquet files to sanity check file listings from multiple sources.
run report on pickled list policy data
The script reads pickled files that match glob_pattern from the pickledir derived from dirname, runs the report, and saves it as a CSV to the "{dirname}/reports" directory by default. Some progress info is available via the verbose flag.
The current report aggregates storage stats by top-level dir and age (year) of the data's last access. The goal of this report is to understand the distribution of lesser-used data.
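At a high level the report reduces to a handful of Dask calls. The sketch below is illustrative only; the input path and the split index are placeholders, not the values configured later in this notebook.
# illustrative sketch of the report pipeline (hypothetical input path)
sketch = dd.read_parquet("data/example/parquet/*.parquet", columns=["size", "access", "path"])
# first directory component below "/data/project/" as the aggregation key
sketch["tld"] = sketch["path"].str.split("/", n=4, expand=True)[3]
sketch["year"] = sketch["access"].dt.year  # year of last access
summary = sketch.groupby(["tld", "year"]).agg({"size": ["sum", "count"]}).compute()
summary.to_csv("data/example/reports/groupby-tld-year.csv")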
!conda info --envs
!conda list
!pip list --format=freeze
from dask.diagnostics import ProgressBar
from dask.distributed import Client
# connect to an existing scheduler via its scheduler file, or fall back to a local client
client = Client(scheduler_file='scheduler.json')
client = Client()
Input vars
dirname="data/list-policy_projects_2023-08-31" # directory to find files to pickle
glob_pattern = "*.parquet" # file name glob pattern to match, can be file name for individual file
line_regex_filter = ".*" # regex to match lines of interest in file
pickledir=f"{dirname}/parquet"
reportdir=f"{dirname}/reports"
tldpath="/data/project/ccts/galaxy"
verbose = True
limit = 0
Utilities
# get top level dir on which to aggregate
def get_tld(df, dirname):
    '''
    df: dataframe with path column (e.g. from policy run)
    dirname: top level dir (TLD) that contains dirs for report

    The function uses the length of dirname to locate the TLD column in the split path.
    '''
    dirpaths = dirname.split("/")
    new = df["path"].str.split("/", n=len(dirpaths)+1, expand=True)
    return new[len(dirpaths)]
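A quick check with a toy pandas frame (hypothetical paths) shows what get_tld returns: the first path component below the given dirname.
example = pd.DataFrame({"path": ["/data/project/ccts/galaxy/tool1/run.log",
                                 "/data/project/ccts/galaxy/tool2/data.bin"]})
get_tld(example, "/data/project/ccts/galaxy")
# 0    tool1
# 1    tool2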
# get the year from a datetime column on which to aggregate
def get_year(df, column):
    '''
    df: dataframe with a datetime column (e.g. from policy run)
    column: name of the datetime column from which to extract the year
    '''
    return df[column].dt.year
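For example, on a toy frame with a datetime column:
example = pd.DataFrame({"access": pd.to_datetime(["2019-06-01", "2023-02-14"])})
get_year(example, "access")
# 0    2019
# 1    2023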
def report_tld_year(df):
    '''
    Aggregate the sum and count of files by year within each top level dir (TLD)

    Uses a dict parameter to pandas agg to apply the sum and count functions to the size column
    '''
    report = df.groupby(['tld', df.access.dt.year]).agg({"size": ["sum", "count"]})
    return report
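On a frame that already carries tld, size, and a datetime access column, the result is indexed by (tld, year) with the sum and count of size; a toy pandas illustration:
example = pd.DataFrame({"tld": ["tool1", "tool1", "tool2"],
                        "size": [100, 200, 50],
                        "access": pd.to_datetime(["2019-06-01", "2019-07-01", "2023-02-14"])})
report_tld_year(example)
#              size
#               sum count
# tld   access
# tool1 2019    300     2
# tool2 2023     50     1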
Read and parse the files according to glob_pattern
# pick a local scheduler; note that a distributed Client, once created, becomes the default instead
dask.config.set(scheduler='threads')
dask.config.set(scheduler='processes')
def read_policy_parquet(file, columns=['size', 'access', 'modify', 'uid', 'path'], engine="pyarrow"):
    '''Read a policy parquet dataset, persist it on the cluster, and repartition into 64MB chunks.'''
    df = dd.read_parquet(file, columns=columns, engine=engine)
    df = client.persist(df)
    df = df.repartition(partition_size="64MB")
    return df
df = dd.read_parquet(f'{pickledir}/list-*.parquet', columns=['size', 'access', 'modify', 'uid', 'path'], engine="pyarrow")
df
df = client.persist(df)
df.dask
%%time
df=df.repartition(partition_size="64MB")
df
%%time
df.map_partitions(len).compute()
df.dask
df = df[df.path.str.startswith("/data/project/ccts/galaxy/")]
%%time
maydf = read_policy_parquet("data/list-policy_projects_2024-05-03/parquet")
maydf = maydf[maydf.path.str.startswith("/data/project/ccts/galaxy/")]
%%time
maydf = maydf.set_index(maydf.path, npartitions="auto")
%%time
augdf = read_policy_parquet("data/list-policy_projects_2023-08-31/parquet")
augdf = augdf[augdf.path.str.startswith("/data/project/ccts/galaxy/")]
%%time
augdf=augdf.repartition(partition_size="64MB")
%%time
maydf=maydf.repartition(partition_size="64MB")
%%time
len(augdf)
%%time
len(maydf)
%%time
augdf = augdf.set_index(augdf.path)
%%time
maydf = maydf.set_index(maydf.path)
%%time
joindf = maydf.join(augdf, how="outer", lsuffix="_may", rsuffix="_aug")
joindf
joindf.dask
%%time
len(joindf)
%%time
len(joindf[joindf.modify_aug.isna()])
%%time
len(joindf[joindf.modify_may.isna()])
modify_comp = joindf.modify_may != joindf.modify_aug
%%time
len(joindf[modify_comp])
%%time
len(joindf[joindf.size_may != joindf.size_aug])
%%time
len(joindf[joindf.size_may == joindf.size_aug])
%%time
len(joindf[joindf.uid_may != joindf.uid_aug])
%%time
len(joindf[joindf.access_may != joindf.access_aug])
%%time
len(joindf[joindf.access_may == joindf.access_aug])
stop  # undefined name: raises NameError to halt a full "Run All" at this point
Aggregate stats into running totals
df1=get_tld(df, tldpath)
df1.dask
%%time
with ProgressBar(): display(df1.head())
df = df.assign(tld=df1)
df.dask
df = df.drop(columns="path")
df1 = get_year(df, "access")
df = df.assign(year=df1)
df = df.drop(columns=["uid","access"])
df
df.dask
df = client.persist(df)
df.dask
df.head()
def ls_path(df, path):
    tmp = df[df.path.str.match(path)]
    tmp = tmp.assign(tld=get_tld(tmp, path))
    return tmp
# first draft: superseded by the version below, which drops the selected time column
def du_by_year(df, path, time="access"):
    tmp = df[df.path.str.match(path)]
    tmp = tmp.assign(tld=get_tld(tmp, path))
    tmp = tmp.assign(year=get_year(tmp, time))
    tmp = tmp.drop(columns=["uid", "access", "path"])
    tmp = client.persist(tmp)
    tmp = tmp.groupby(['tld', 'year']).sum()
    tmp = tmp.assign(terabytes=tmp["size"]/(10**12))
    return tmp
def du_by_year(df, path, time="access"):
    tmp = df[df.path.str.match(path)]
    tmp = tmp.assign(tld=get_tld(tmp, path))
    tmp = tmp.assign(year=get_year(tmp, time))
    tmp = tmp.drop(columns=["uid", time, "path"])
    tmp = client.persist(tmp)
    tmp = tmp.groupby(['tld', 'year']).sum()
    tmp = tmp.assign(terabytes=tmp["size"]/(10**12))
    return tmp
def du_by_tld(df, path, time="access"):
    tmp = df[df.path.str.match(path)]
    tmp = tmp.assign(tld=get_tld(tmp, path))
    tmp = tmp.drop(columns=["uid", "access", "path", "year"])
    tmp = client.persist(tmp)
    tmp = tmp.groupby(['tld']).sum()
    tmp = tmp.assign(terabytes=tmp["size"]/(10**12))
    return tmp
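Taken together, a typical call looks like the sketch below; it assumes the frame still carries its path, uid, and timestamp columns at this point (earlier cells drop some of them from df).
usage = du_by_year(df, '/data/project/ccts', time="modify")
usage = usage.compute().reset_index()      # groupby keys come back as columns
usage.sort_values(["tld", "year"]).head()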
df.dask
%%time
dudf = du_by_year(df, '/data/project/ccts', "modify")
dudf.dask
%%time
dudf = du_by_tld(df, '/data/project/ccts')
%%time
dudf = client.persist(dudf)
dudf.dask
%%time
dudf=dudf.repartition(partition_size="64MB")
dudf
%%time
dudf = client.compute(dudf)
%%time
dudf = dudf.result()
%%time
dudf.sort_values(["tld", "year"])
%%time
tmp=dudf.reset_index()
#tmp[(tmp['tld']=="galaxy")]
tmp[tmp.tld=='galaxy'].sort_values('year')
%%time
dudf.head()
%%time
dudf.groupby("tld").sum()
%%time
lsdf = ls_path(df, '/data/project/ccts/galaxy')
%%time
lsdf = client.persist(lsdf.tld.unique())
lsdf.dask
%%time
lsdf = client.compute(lsdf)
lsdf.result()
df.dask
%%time
dfccts = df[df.path.str.match('/data/project/ccts')]
dfccts.dask
dfccts = dfccts.assign(tld=get_tld(dfccts, '/data/project/ccts/galaxy'))
dfccts
dfccts.dask
%%time
dfccts.head()
df1 = df
lru_projects = ['ICOS', 'boldlab', 'hartmanlab', 'sdtrlab', 'kinglab', 'kobielab', 'MRIPhantom', 'NCRlab', 'bridgeslab', 'hsight', 'kutschlab', 'lcdl', 'metalsgroup', 'rowelab', 'szaflarski_mirman']
condition=df1["tld"].isin(lru_projects)
condition=df1["tld"].isin(["ccts"])
lru=df1[condition]
%%time
with ProgressBar(): display(lru.head())
df.groupby(['tld', 'year'])['size'].sum().visualize(node_attr={'penwidth': '6'})
df2 = df.groupby(['tld', 'year']).agg({"size": ["sum", "count"]})
df.groupby('name').x.mean().visualize(node_attr={'penwidth': '6'})  # example task-graph visualization; 'name' and 'x' are not columns of this dataframe
%%time
df2 = report_tld_year(lru)
df
%%time
df2 = df.groupby(['tld', 'year']).sum()
df2
df2.dask
tbsize = df2["size"]/(10**12)
df2 = df2.assign(terabytes=tbsize)
df2
df2.dask
report=df2
%%time
report = client.compute(report)
report
Create final report
Create summary format for gigabyte and terabyte columns https://stackoverflow.com/a/20937592/8928529
report["average_size"] = report["sum"]/report["count"]
report["terabytes"] = report["sum"]/(10**12) report["terabytes"] = report["terabytes"].map('{:,.2f}'.format)
report["gigabytes"] = report["sum"]/(10**9) report["gigabytes"] = report["gigabytes"].map('{:,.2f}'.format)
Save report as CSV
%%time
report = report.result()
report
# only create the report dir if there is data to report
if (len(report) and not os.path.isdir(reportdir)):
    os.mkdir(reportdir)
reportdir
%%time
if (verbose): print("report: groupby-tld")
report.to_csv(f"{reportdir}/groupby-tld-dask3.csv.gz")
%%time
report.to_parquet(f"{reportdir}/groupby-tld-year-dask4.parquet")
Summarize high-level stats
report
report.reset_index()
report[report["sum"] == report["sum"].max()]
report[(report["size"] > 5*10**13)]
report=report.reset_index()
summer = report.groupby("tld").agg("sum")  # [report["sum"] > 10**13]
summer["terabytes"] = summer["sum"]/(10**12) summer["terabytes"] = summer["terabytes"].map('{:,.2f}'.format)
print(summer[summer["sum"] > 10**13].sort_values("sum", ascending=False)[['count', 'terabytes']])
report[(report["sum"] > 10**13) & (report["access"] <= 2021)]
report[(report["sum"] > 10**13) & (report["access"] <= 2021)]["sum"].sum()
report[(report["sum"] <= 10**13) & (report["access"] <= 2021)]["sum"].sum()
report[(report["sum"] > 1013) & (report["access"] < 2023)]["sum"].sum()/1012