diff --git a/churn-analysis.ipynb b/churn-analysis.ipynb
index 42c35ae092a2ee3e6d75032ee9410f63d3d8ba6c..1b1f6a2745b7bffca3b713a1c608e5155818359a 100644
--- a/churn-analysis.ipynb
+++ b/churn-analysis.ipynb
@@ -30,7 +30,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df = pl.read_database(\"SELECT * FROM churn WHERE prior_log_dt >= '2024-11-14'\",connection=engine)"
+    "df = pl.read_database(\"SELECT * FROM churn\",connection=engine)"
    ]
   },
   {
@@ -43,12 +43,22 @@
     "    df.with_columns([\n",
     "        pl.sum_horizontal('modified','deleted','created').alias('total_churn'),\n",
     "        pl.sum_horizontal('created_bytes','deleted_bytes','modified_bytes_net').alias('total_churn_bytes'),\n",
+    "        (pl.sum_horizontal('created_bytes','modified_bytes_net')-pl.col('deleted_bytes')).alias('net_bytes_added'),\n",
     "        pl.col('log_dt','prior_log_dt').str.to_datetime(),\n",
     "        pl.col('tld').cast(pl.Categorical)\n",
     "    ])\n",
     ")"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df.sort('tld','log_dt')"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -59,12 +69,21 @@
     "    df\n",
     "    .group_by('tld')\n",
     "    .agg(\n",
-    "        pl.sum('total_churn','total_churn_bytes','accessed','accessed_bytes')\n",
+    "        pl.sum('total_churn','total_churn_bytes','net_bytes_added','accessed','accessed_bytes')\n",
     "    )\n",
-    "    .sort('total_churn',descending=True)\n",
+    "    .sort('net_bytes_added',descending=True)\n",
     ")"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "tld_agg.with_columns(pl.col('net_bytes_added')/(1024**3))"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -74,6 +93,15 @@
     "no_churn = tld_agg.filter(pl.col('total_churn').eq(0)).select('tld')"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "no_churn"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -85,14 +113,12 @@
     "        hive_dir.joinpath('**/*.parquet'),\n",
     "        parallel='prefiltered',\n",
     "        hive_partitioning=True,\n",
-    "        try_parse_hive_dates=False\n",
+    "        hive_schema=pl.Schema({'tld':pl.String,'acq':pl.String})\n",
     "    )\n",
     "    .filter(\n",
-    "        pl.col('tld').is_in(no_churn),\n",
-    "        pl.col('acq').eq('2025-01-15')\n",
+    "        pl.col('tld').is_in(no_churn['tld'].to_list()),\n",
+    "        pl.col('acq').eq('2025-05-08')\n",
     "    )\n",
-    "    .select(['tld','size','kballoc'])\n",
-    "    .with_columns(pl.col('tld').cast(pl.Categorical))\n",
     "    .collect(engine='streaming')\n",
     ")"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -112,7 +138,29 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "inactive_storage['kballoc'].sum()/(1024**3)"
+    "inactive_storage['size'].sum()/(1024**3)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "prev_inactive = [\n",
+    "    'NCRlab',\n",
+    "    'SPI',\n",
+    "    'fobian_lab',\n",
+    "    'gersteneckerlab',\n",
+    "    'kobielab',\n",
+    "    'kutschlab',\n",
+    "    'lcdl',\n",
+    "    'muellerlab',\n",
+    "    'sdtrlab',\n",
+    "    'xenotransplant',\n",
+    "    'yanda-lab',\n",
+    "    'youngerlab',\n",
+   "]"
+   ]
+  },
   {
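Note on the notebook's scan change above: swapping `try_parse_hive_dates=False` for an explicit `hive_schema` pins both hive partition keys to `pl.String`, so `acq` can be filtered against a plain date string. A minimal, self-contained sketch of that behavior, using toy data in a temp directory (only the hive layout mirrors the real dataset; all names here are hypothetical):

    import tempfile
    from pathlib import Path

    import polars as pl

    # Toy hive layout: <root>/tld=<name>/acq=<date>/part.parquet
    root = Path(tempfile.mkdtemp())
    for tld, acq in [("alab", "2025-05-08"), ("alab", "2025-05-09")]:
        part = root / f"tld={tld}" / f"acq={acq}"
        part.mkdir(parents=True)
        pl.DataFrame({"path": ["f1"], "size": [1]}).write_parquet(part / "part.parquet")

    df = (
        pl.scan_parquet(
            root.joinpath("**/*.parquet"),
            hive_partitioning=True,
            # Without this, polars tries to parse 'acq=2025-05-08' as a Date
            # (try_parse_hive_dates defaults to True), and the string
            # comparison below would fail on a type mismatch.
            hive_schema=pl.Schema({"tld": pl.String, "acq": pl.String}),
        )
        .filter(pl.col("acq").eq("2025-05-08"))
        .collect()
    )
    print(df)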
diff --git a/example-job-scripts/30-calculate-churn.sh b/example-job-scripts/30-calculate-churn.sh
index 2656f1840ce1e7d72e0c2be7c12061641df4a5c6..d2225fc508c9785a3b3faf9023638872db3a924a 100644
--- a/example-job-scripts/30-calculate-churn.sh
+++ b/example-job-scripts/30-calculate-churn.sh
@@ -3,14 +3,13 @@
 #SBATCH --job-name=calculate-churn
 #SBATCH --ntasks=1
 #SBATCH --cpus-per-task=16
-#SBATCH --mem=90G
+#SBATCH --mem=32G
 #SBATCH --partition=amperenodes-reserve
 #SBATCH --time=12:00:00
 #SBATCH --reservation=rc-gpfs
-#SBATCH --gres=gpu:1
 #SBATCH --output=out/churn-%A-%a.out
 #SBATCH --error=out/churn-%A-%a.err
-#SBATCH --array=0-166
+#SBATCH --array=0-177

 module load Anaconda3
 conda activate gpfs
@@ -23,23 +22,30 @@
 echo "TLD: ${tld}"

 python << END
 from pathlib import Path
-from datetime import datetime
-from dateutil.rrule import rrule, DAILY
-from rc_gpfs.process.process import calculate_churn
+import numpy as np
+import time
+from rc_gpfs.db.utils import df_to_sql
+from rc_gpfs.process import calculate_churn

 hive_path = Path("${hive}")
 tld = "${tld}"

-#acqs = hive_path.joinpath(f"tld={tld}").glob("acq=*")
-#acq_dates = [p.name.removeprefix("acq=") for p in acqs]
+db = hive_path.parent.joinpath('db','data-project.db')

-start_date = datetime(2025, 1, 20)
-end_date = datetime(2025, 2, 4)
+start_date = '2024-05-08'
+end_date = '2025-05-10'

-acq_dates = [datetime.strftime(d,'%Y-%m-%d') for d in list(rrule(DAILY, dtstart=start_date, until=end_date))]
+acq_dates = [str(d) for d in np.arange(np.datetime64(start_date), np.datetime64(end_date), step=1)]

-with_cuda=True
+churn = calculate_churn(hive_path,tld,acq_dates)

-churn = calculate_churn(hive_path,tld,acq_dates,with_cuda)
+finished = False
+
+while not finished:
+    try:
+        df_to_sql(churn, db, 'churn')
+        finished = True
+    except Exception:  # most likely the SQLite db is locked by another array task
+        time.sleep(1)
 END
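Note on the ingest loop above: all 178 array tasks write their churn tables to the same SQLite file, so `df_to_sql` can fail transiently while another task holds the write lock, and the heredoc retries once per second until it succeeds. A bounded retry may be safer, since retrying forever will also spin on non-transient bugs. A sketch under stated assumptions — `retry_write` is a hypothetical helper, and because `df_to_sql`'s internals are not shown in this patch, the failure is assumed to surface as a generic `Exception`:

    import time

    def retry_write(write_fn, max_attempts: int = 120, delay_s: float = 1.0):
        """Call write_fn until it succeeds; re-raise after max_attempts failures."""
        for attempt in range(1, max_attempts + 1):
            try:
                return write_fn()
            except Exception:
                if attempt == max_attempts:
                    raise  # don't loop forever on a non-transient failure
                time.sleep(delay_s)

    # Usage inside the heredoc, with churn/db defined as above:
    # retry_write(lambda: df_to_sql(churn, db, 'churn'))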
"pandas-2.2.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:63cc132e40a2e084cf01adf0775b15ac515ba905d7dcca47e9a251819c575ef3"}, + {file = "pandas-2.2.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:29401dbfa9ad77319367d36940cd8a0b3a11aba16063e39632d98b0e931ddf32"}, + {file = "pandas-2.2.3-cp311-cp311-win_amd64.whl", hash = "sha256:3fc6873a41186404dad67245896a6e440baacc92f5b716ccd1bc9ed2995ab2c5"}, + {file = "pandas-2.2.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:b1d432e8d08679a40e2a6d8b2f9770a5c21793a6f9f47fdd52c5ce1948a5a8a9"}, + {file = "pandas-2.2.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a5a1595fe639f5988ba6a8e5bc9649af3baf26df3998a0abe56c02609392e0a4"}, + {file = "pandas-2.2.3-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:5de54125a92bb4d1c051c0659e6fcb75256bf799a732a87184e5ea503965bce3"}, + {file = "pandas-2.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fffb8ae78d8af97f849404f21411c95062db1496aeb3e56f146f0355c9989319"}, + {file = "pandas-2.2.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:6dfcb5ee8d4d50c06a51c2fffa6cff6272098ad6540aed1a76d15fb9318194d8"}, + {file = "pandas-2.2.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:062309c1b9ea12a50e8ce661145c6aab431b1e99530d3cd60640e255778bd43a"}, + {file = "pandas-2.2.3-cp312-cp312-win_amd64.whl", hash = "sha256:59ef3764d0fe818125a5097d2ae867ca3fa64df032331b7e0917cf5d7bf66b13"}, + {file = "pandas-2.2.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f00d1345d84d8c86a63e476bb4955e46458b304b9575dcf71102b5c705320015"}, + {file = "pandas-2.2.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:3508d914817e153ad359d7e069d752cdd736a247c322d932eb89e6bc84217f28"}, + {file = "pandas-2.2.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:22a9d949bfc9a502d320aa04e5d02feab689d61da4e7764b62c30b991c42c5f0"}, + {file = "pandas-2.2.3-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f3a255b2c19987fbbe62a9dfd6cff7ff2aa9ccab3fc75218fd4b7530f01efa24"}, + {file = "pandas-2.2.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:800250ecdadb6d9c78eae4990da62743b857b470883fa27f652db8bdde7f6659"}, + {file = "pandas-2.2.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6374c452ff3ec675a8f46fd9ab25c4ad0ba590b71cf0656f8b6daa5202bca3fb"}, + {file = "pandas-2.2.3-cp313-cp313-win_amd64.whl", hash = "sha256:61c5ad4043f791b61dd4752191d9f07f0ae412515d59ba8f005832a532f8736d"}, + {file = "pandas-2.2.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:3b71f27954685ee685317063bf13c7709a7ba74fc996b84fc6821c59b0f06468"}, + {file = "pandas-2.2.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:38cf8125c40dae9d5acc10fa66af8ea6fdf760b2714ee482ca691fc66e6fcb18"}, + {file = "pandas-2.2.3-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ba96630bc17c875161df3818780af30e43be9b166ce51c9a18c1feae342906c2"}, + {file = "pandas-2.2.3-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1db71525a1538b30142094edb9adc10be3f3e176748cd7acc2240c2f2e5aa3a4"}, + {file = "pandas-2.2.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:15c0e1e02e93116177d29ff83e8b1619c93ddc9c49083f237d4312337a61165d"}, + {file = "pandas-2.2.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:ad5b65698ab28ed8d7f18790a0dc58005c7629f227be9ecc1072aa74c0c1d43a"}, + {file = "pandas-2.2.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = 
"sha256:bc6b93f9b966093cb0fd62ff1a7e4c09e6d546ad7c1de191767baffc57628f39"}, + {file = "pandas-2.2.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:5dbca4c1acd72e8eeef4753eeca07de9b1db4f398669d5994086f788a5d7cc30"}, + {file = "pandas-2.2.3-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:8cd6d7cc958a3910f934ea8dbdf17b2364827bb4dafc38ce6eef6bb3d65ff09c"}, + {file = "pandas-2.2.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:99df71520d25fade9db7c1076ac94eb994f4d2673ef2aa2e86ee039b6746d20c"}, + {file = "pandas-2.2.3-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:31d0ced62d4ea3e231a9f228366919a5ea0b07440d9d4dac345376fd8e1477ea"}, + {file = "pandas-2.2.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:7eee9e7cea6adf3e3d24e304ac6b8300646e2a5d1cd3a3c2abed9101b0846761"}, + {file = "pandas-2.2.3-cp39-cp39-win_amd64.whl", hash = "sha256:4850ba03528b6dd51d6c5d273c46f183f39a9baf3f0143e566b89450965b105e"}, + {file = "pandas-2.2.3.tar.gz", hash = "sha256:4f18ba62b61d7e192368b84517265a99b4d7ee8912f8708660fb4a366cc82667"}, +] + +[package.dependencies] +numpy = {version = ">=1.26.0", markers = "python_version >= \"3.12\""} +python-dateutil = ">=2.8.2" +pytz = ">=2020.1" +tzdata = ">=2022.7" + +[package.extras] +all = ["PyQt5 (>=5.15.9)", "SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "adbc-driver-sqlite (>=0.8.0)", "beautifulsoup4 (>=4.11.2)", "bottleneck (>=1.3.6)", "dataframe-api-compat (>=0.1.7)", "fastparquet (>=2022.12.0)", "fsspec (>=2022.11.0)", "gcsfs (>=2022.11.0)", "html5lib (>=1.1)", "hypothesis (>=6.46.1)", "jinja2 (>=3.1.2)", "lxml (>=4.9.2)", "matplotlib (>=3.6.3)", "numba (>=0.56.4)", "numexpr (>=2.8.4)", "odfpy (>=1.4.1)", "openpyxl (>=3.1.0)", "pandas-gbq (>=0.19.0)", "psycopg2 (>=2.9.6)", "pyarrow (>=10.0.1)", "pymysql (>=1.0.2)", "pyreadstat (>=1.2.0)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)", "python-calamine (>=0.1.7)", "pyxlsb (>=1.0.10)", "qtpy (>=2.3.0)", "s3fs (>=2022.11.0)", "scipy (>=1.10.0)", "tables (>=3.8.0)", "tabulate (>=0.9.0)", "xarray (>=2022.12.0)", "xlrd (>=2.0.1)", "xlsxwriter (>=3.0.5)", "zstandard (>=0.19.0)"] +aws = ["s3fs (>=2022.11.0)"] +clipboard = ["PyQt5 (>=5.15.9)", "qtpy (>=2.3.0)"] +compression = ["zstandard (>=0.19.0)"] +computation = ["scipy (>=1.10.0)", "xarray (>=2022.12.0)"] +consortium-standard = ["dataframe-api-compat (>=0.1.7)"] +excel = ["odfpy (>=1.4.1)", "openpyxl (>=3.1.0)", "python-calamine (>=0.1.7)", "pyxlsb (>=1.0.10)", "xlrd (>=2.0.1)", "xlsxwriter (>=3.0.5)"] +feather = ["pyarrow (>=10.0.1)"] +fss = ["fsspec (>=2022.11.0)"] +gcp = ["gcsfs (>=2022.11.0)", "pandas-gbq (>=0.19.0)"] +hdf5 = ["tables (>=3.8.0)"] +html = ["beautifulsoup4 (>=4.11.2)", "html5lib (>=1.1)", "lxml (>=4.9.2)"] +mysql = ["SQLAlchemy (>=2.0.0)", "pymysql (>=1.0.2)"] +output-formatting = ["jinja2 (>=3.1.2)", "tabulate (>=0.9.0)"] +parquet = ["pyarrow (>=10.0.1)"] +performance = ["bottleneck (>=1.3.6)", "numba (>=0.56.4)", "numexpr (>=2.8.4)"] +plot = ["matplotlib (>=3.6.3)"] +postgresql = ["SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "psycopg2 (>=2.9.6)"] +pyarrow = ["pyarrow (>=10.0.1)"] +spss = ["pyreadstat (>=1.2.0)"] +sql-other = ["SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "adbc-driver-sqlite (>=0.8.0)"] +test = ["hypothesis (>=6.46.1)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)"] +xml = ["lxml (>=4.9.2)"] + [[package]] name = "plotly" version = "5.24.1" @@ -351,6 +434,45 @@ files = [ [package.dependencies] pytest = ">=3.6" +[[package]] +name = 
"python-dateutil" +version = "2.9.0.post0" +description = "Extensions to the standard Python datetime module" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +groups = ["main"] +files = [ + {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, + {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, +] + +[package.dependencies] +six = ">=1.5" + +[[package]] +name = "pytz" +version = "2025.2" +description = "World timezone definitions, modern and historical" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00"}, + {file = "pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3"}, +] + +[[package]] +name = "six" +version = "1.17.0" +description = "Python 2 and 3 compatibility utilities" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +groups = ["main"] +files = [ + {file = "six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274"}, + {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, +] + [[package]] name = "sqlalchemy" version = "2.0.40" @@ -494,7 +616,19 @@ files = [ {file = "typing_extensions-4.13.2.tar.gz", hash = "sha256:e6c81219bd689f51865d9e372991c540bda33a0379d5573cddb9a3a23f7caaef"}, ] +[[package]] +name = "tzdata" +version = "2025.2" +description = "Provider of IANA time zone data" +optional = false +python-versions = ">=2" +groups = ["main"] +files = [ + {file = "tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8"}, + {file = "tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"}, +] + [metadata] lock-version = "2.1" python-versions = ">=3.12,<4.0" -content-hash = "b774e71210a804105de6c03976133f42953ebfc5be59ef0d2c6cc62c842a6372" +content-hash = "17b055646277a1fb5b9eda3c31365f62a0e029aa9e79bf3ef5c02c7d09eaf9e1" diff --git a/pyproject.toml b/pyproject.toml index 0120b6f4a8fd720fa86ccb718ef2b30839b78eed..f56d1eebd42d142a3b68d10f0fcbcd883dde50be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ pytest = "^8.3.5" pytest-datafiles = "^3.0.0" sqlalchemy = "*" typeguard = "*" +pandas = "^2.2.3" [[tool.poetry.source]] name="rapids" diff --git a/src/rc_gpfs/process/process.py b/src/rc_gpfs/process/process.py index 6d0a3c39025468812157114cdcf73a4b7db2a354..a2542ed7a625a6c85e04064e7cf7372f488f9d88 100644 --- a/src/rc_gpfs/process/process.py +++ b/src/rc_gpfs/process/process.py @@ -133,75 +133,77 @@ def calculate_churn( f"Fewer than two given policy acquisition dates contained data for {tld} in {hive_path}." 
         )

-    churn_l = []
-
-    df_init = (
+    df = (
         pl.scan_parquet(
             dataset_path.joinpath("**/*.parquet"),
             hive_partitioning=True,
             hive_schema=pl.Schema({"tld": pl.String, "acq": pl.String}),
-            parallel="prefiltered",
+            parallel="auto",
         )
-        .filter(pl.col("tld").eq(tld), pl.col("acq").eq(acq_dates[0]))
-        .select(["path", "modify", "size", "access"])
-    ).collect(engine="streaming")
+        .select(["path", "acq", "access", "modify", "size"])
+        .with_columns(pl.col("path").hash())  # join on 64-bit path hashes rather than long path strings
+    )
+
+    churn_l = []

     for i in range(1, len(acq_dates)):
-        df_target = (
-            pl.scan_parquet(
-                dataset_path.joinpath("**/*.parquet"),
-                hive_partitioning=True,
-                hive_schema=pl.Schema({"tld": pl.String, "acq": pl.String}),
-                parallel="prefiltered",
-            )
-            .filter(pl.col("tld").eq(tld), pl.col("acq").eq(acq_dates[i]))
-            .select(["path", "modify", "size", "access"])
-        ).collect(engine="streaming")
+        log_date = acq_dates[i]
+        prior_date = acq_dates[i - 1]
+
+        df1 = df.filter(pl.col("acq").eq(prior_date))
+        df2 = df.filter(pl.col("acq").eq(log_date))

-        churn = _calculate_churn(df_init, df_target).with_columns(
-            pl.lit(acq_dates[i]).alias("log_dt"),
-            pl.lit(acq_dates[i - 1]).alias("prior_log_dt"),
+        churn = _calculate_churn(df1, df2)
+
+        churn = churn.with_columns(
+            pl.lit(log_date).alias("log_dt"),
+            pl.lit(prior_date).alias("prior_log_dt"),
             pl.lit(tld).alias("tld"),
         )

         churn_l.append(churn)

-        # This delete pattern paired with the loop creates a type of rotating list where each dataframe, aside from
-        # the initial and final, is processed as the target and the source for which files exist at a given time.
-        # The target is then referred to as the source as we move through the time series. Each source is removed
-        # from memory. This both limits the amount of memory used to only two datasets at a time while also
-        # only reading each dataset once.
-        del df_init
-        df_init = df_target  # noqa: F841
-        del df_target
-
-    churn_df = pl.concat(churn_l)
+    churn_l = pl.collect_all(churn_l, engine="streaming")
+    churn_df = pl.concat(churn_l, how="vertical_relaxed").pivot(
+        on="name", index=["tld", "log_dt", "prior_log_dt"], values="value"
+    )

     return churn_df


-def _calculate_churn(df1, df2) -> None:
-    empty_df = pl.DataFrame(data={"name": CHURN_TBL_COLS, "value": 0})
-    if df1.equals(df2):
-        return empty_df.transpose(column_names="name")
+def _calculate_churn(df1: pl.LazyFrame, df2: pl.LazyFrame) -> pl.LazyFrame:
+    empty_df = pl.LazyFrame(data={"name": CHURN_TBL_COLS, "value": 0})

     dfm = df1.join(df2, how="full", on="path", suffix="_updated", coalesce=True)

     conditions = [
-        dfm["access"].is_null(),
-        dfm["access_updated"].is_null(),
-        (dfm["modify"] != dfm["modify_updated"]).fill_null(False),
-        (dfm["access"] != dfm["access_updated"]).fill_null(False),
+        pl.col("access").is_null(),
+        pl.col("access_updated").is_null(),
+        pl.col("modify").ne(pl.col("modify_updated")),
+        pl.col("access").ne(pl.col("access_updated")),
+    ]
+
+    choices = [
+        pl.lit("created"),
+        pl.lit("deleted"),
+        pl.lit("modified"),
+        pl.lit("accessed"),
     ]

-    choices = ["created", "deleted", "modified", "accessed"]
+    default = pl.lit("unchanged")

     dfm = dfm.with_columns(
-        pl.Series(
-            name="type",
-            values=np.select(conditions, choices, default="unchanged"),
-            dtype=pl.Categorical,
-        )
+        pl.when(conditions[0])
+        .then(choices[0])
+        .when(conditions[1])
+        .then(choices[1])
+        .when(conditions[2])
+        .then(choices[2])
+        .when(conditions[3])
+        .then(choices[3])
+        .otherwise(default)
+        .cast(pl.Categorical)
+        .alias("type")
     )

     dfm = dfm.filter(pl.col("type").ne("unchanged")).drop(
@@ -209,7 +211,10 @@
     )

     modified = dfm.filter(pl.col("type").eq("modified"))
-    modified_bytes_net = modified["size_updated"].sum() - modified["size"].sum()
+    modified_bytes_net = modified.select(
+        pl.lit("modified_bytes_net").alias("name"),
+        (pl.sum("size_updated") - pl.sum("size")).alias("value"),
+    )

     # Instead of writing logic to aggregate across initial size for deleted files and final size for all other
     # files, we can essentially condense size across both columns into a new column. Size of deleted files will
@@ -231,12 +236,13 @@
         )
         .with_columns(pl.col("name").str.strip_suffix("_files"))
         .select(["name", "value"])
-        .extend(
-            pl.DataFrame({"name": "modified_bytes_net", "value": modified_bytes_net})
-        )
+    )
+
+    agg_df = (
+        pl.concat([agg_df, modified_bytes_net], how="vertical_relaxed")
         .join(empty_df.select("name"), on="name", how="right")
         .fill_null(0)
-        .transpose(column_names="name")
+        .sort("name")
     )

     return agg_df
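For reference, the refactor's core classification — previously an eager `np.select`, now a lazy `when/then` chain — can be exercised standalone. A minimal sketch with toy snapshots (the patched code joins on hashed paths and reads real columns from the hive dataset; the plain-string paths and tiny frames here are hypothetical, chosen only to keep the output readable):

    import polars as pl

    # Two toy snapshots of one directory tree, a single acquisition apart.
    prior = pl.LazyFrame({
        "path":   ["a.txt", "b.txt", "c.txt"],
        "modify": [1, 1, 1],
        "access": [1, 1, 1],
        "size":   [10, 20, 30],
    })
    latest = pl.LazyFrame({
        "path":   ["b.txt", "c.txt", "d.txt"],
        "modify": [1, 2, 1],
        "access": [1, 2, 1],
        "size":   [20, 35, 5],
    })

    dfm = prior.join(latest, how="full", on="path", suffix="_updated", coalesce=True)

    # A null on either side of the full join marks creation/deletion; otherwise
    # timestamp changes distinguish modified and accessed from unchanged files.
    dfm = dfm.with_columns(
        pl.when(pl.col("access").is_null()).then(pl.lit("created"))
        .when(pl.col("access_updated").is_null()).then(pl.lit("deleted"))
        .when(pl.col("modify").ne(pl.col("modify_updated"))).then(pl.lit("modified"))
        .when(pl.col("access").ne(pl.col("access_updated"))).then(pl.lit("accessed"))
        .otherwise(pl.lit("unchanged"))
        .cast(pl.Categorical)
        .alias("type")
    )

    print(dfm.collect().sort("path"))
    # a.txt -> deleted, b.txt -> unchanged, c.txt -> modified, d.txt -> created

Because every per-date-pair frame now stays lazy until the single `pl.collect_all(...)` call, polars can share the common scan across all date pairs rather than re-reading the dataset per date — which is what the removed rotating-buffer comment block was working around by hand. The final `.pivot` then reshapes the long name/value table into one row per (tld, date pair). A toy illustration of that reshape, with hypothetical values:

    import polars as pl

    long = pl.DataFrame({
        "tld": ["alab", "alab"],
        "log_dt": ["2025-05-09", "2025-05-09"],
        "prior_log_dt": ["2025-05-08", "2025-05-08"],
        "name": ["created", "deleted"],
        "value": [4, 2],
    })

    wide = long.pivot(on="name", index=["tld", "log_dt", "prior_log_dt"], values="value")
    print(wide)  # one churn-metric column per distinct 'name'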