Time Series on HPC#

Kernel installation#

  • Run the following cell once if the required kernel (ai4seismology-bigdata) is not yet installed.

  • The kernel only needs to be installed once; skip this step in later sessions.

%%bash
mkdir -p ~/.local/share/jupyter/kernels/ai4seismology-bigdata
cp /data/horse/ws/s4122485-ai4seismology_dev/thursday_bigdata/kernel.json ~/.local/share/jupyter/kernels/ai4seismology-bigdata

Important!!!#

Once the kernel is installed:

  1. Reload the notebook (reload/refresh the web page).

  2. Select the kernel: Menu -> Kernel -> Change Kernel -> select “ai4seismology-bigdata”. Always use this kernel for the upcoming exercises.

Select ai4seismology-bigdata kernel#

import sys
!{sys.executable} -m pip install --user --upgrade ipympl jupyter_leaflet leafmap ipyleaflet

Data Processing#

Start Spark Standalone Cluster#

# Import utilities required to run big data frameworks on ZIH HPC systems
from big_data_utils.environment_utils import ClusterConfig
from big_data_utils.cluster_utils import ClusterService
# Configure the cluster environment
myconfig = ClusterConfig(fw_name="spark")
myconfig.configure_env(conf_dest="./my-conf",randomize_ports=True)
[INFO ] [08/05/2025 10:52:20] - Removing existing configuration directory: './my-conf'
[INFO ] [08/05/2025 10:52:21] - Environment configuration initialized:
[INFO ] [08/05/2025 10:52:21] -   • Framework:        SPARK
[INFO ] [08/05/2025 10:52:21] -   • Config template:  /software/rapids/r24.04/Spark/3.5.0-GCC-13.2.0-hadoop3/conf
[INFO ] [08/05/2025 10:52:21] -   • Config target:    ./my-conf
[INFO ] [08/05/2025 10:52:21] -   • Log directory:    ./my-conf/log
[INFO ] [08/05/2025 10:52:21] - Initializing configuration from template.
[INFO ] [08/05/2025 10:52:23] - Cluster topology:
[INFO ] [08/05/2025 10:52:23] -   • Master node:  n1042:7904
[INFO ] [08/05/2025 10:52:23] -   • Worker nodes: n1042
[INFO ] [08/05/2025 10:52:23] -   • Spark master URL: spark://n1042:7904
[INFO ] [08/05/2025 10:52:23] - Once the cluster is started, one can access the spark GUI in browser using port forwarding.
[INFO ] [08/05/2025 10:52:23] - To access, spark GUI, type following in your terminal on local machine:
[INFO ] [08/05/2025 10:52:23] -   ssh s4122485@login1.barnard.hpc.tu-dresden.de -L 4040:n1042:4040 -L 8080:n1042:8080 -L 8081:n1042:8081
[INFO ] [08/05/2025 10:52:23] - Once the port is forwarded, one can access the GUI, by accessing
[INFO ] [08/05/2025 10:52:23] -   • http://localhost:4040
[INFO ] [08/05/2025 10:52:23] -   • http://localhost:8080
[INFO ] [08/05/2025 10:52:23] -   • http://localhost:8081
# Initialize the cluster service class
mycluster = ClusterService("spark")

# Check which java processes are running
mycluster.check_status()
[INFO ] [08/05/2025 10:52:26] - Currently, following java processes are running:
[INFO ] [08/05/2025 10:52:26] - 	Process ID, Name 
# Start Spark standalone cluster
mycluster.start_cluster()
[INFO ] [08/05/2025 10:52:31] - Starting SPARK cluster.
[INFO ] [08/05/2025 10:52:37] - Logging cluster startup info at: ./my-conf/spark/log/cluster.log

----Performance Data----
Duration: 5.41

CPU Util (Across CPUs)       	AVG: 40.08	 MIN: 0.00	 MAX: 100.00
Mem Util in GB (Across nodes)	AVG: 215.77	 MIN: 215.61	 MAX: 215.95
IO Ops (excl.) Read          	Total: 25004
               Write         	Total: 3718
IO Bytes (excl.) Read        	Total: 101.08
                 Write       	Total: 0.64
# Check if the master and worker processes are started or not
mycluster.check_status()
[INFO ] [08/05/2025 10:52:39] - Currently, following java processes are running:
[INFO ] [08/05/2025 10:52:39] - 	Process ID, Name 
[INFO ] [08/05/2025 10:52:39] - 	2875051, Master
[INFO ] [08/05/2025 10:52:39] - 	2875137, Worker

Spark context initialization#

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from glob import glob
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.window import Window
import pandas as pd
import matplotlib.pyplot as plt
# Create spark session
spark = SparkSession.builder \
                    .master(f"spark://{myconfig.get_master_host()}:{myconfig.get_master_port()}") \
                    .appName("TimeSeriesAnalysis") \
                    .getOrCreate()
sc = spark.sparkContext
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/08 10:52:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
----Performance Data----
Duration: 4.82

CPU Util (Across CPUs)       	AVG: 28.81	 MIN: 0.00	 MAX: 100.00
Mem Util in GB (Across nodes)	AVG: 216.19	 MIN: 216.05	 MAX: 216.33
IO Ops (excl.) Read          	Total: 27276
               Write         	Total: 3833
IO Bytes (excl.) Read        	Total: 129.13
                 Write       	Total: 0.73

Data loading#

The following data contain measurements of power consumption (unit: watts) and temperature (unit: °C) of CPUs on the HPC cluster of TU Dresden. Every single CPU is identified by its socket_id.

df = spark.read.parquet("/data/horse/ws/s4122485-ai4seismology/big_data/2025-04-21-2025-04-25timestamp-socket_id-power_sum-temperature3113440779630811.parquet")
                                                                                
----Performance Data----
Duration: 5.08

CPU Util (Across CPUs)       	AVG: 43.62	 MIN: 18.10	 MAX: 100.00
Mem Util in GB (Across nodes)	AVG: 217.03	 MIN: 216.95	 MAX: 217.11
IO Ops (excl.) Read          	Total: 27549
               Write         	Total: 3870
IO Bytes (excl.) Read        	Total: 131.33
                 Write       	Total: 0.77
df.printSchema()
df.show()
print(f"Number of rows {df.count()}")
root
 |-- timestamp: string (nullable = true)
 |-- socket_id: long (nullable = true)
 |-- power_sum: double (nullable = true)
 |-- temperature: double (nullable = true)
                                                                                
+-------------------+---------+---------+-----------+
|          timestamp|socket_id|power_sum|temperature|
+-------------------+---------+---------+-----------+
|2025-04-22 00:12:58|     6977|    140.0|      52.86|
|2025-04-22 00:02:32|     6977|    140.0|     52.954|
|2025-04-22 00:02:17|     6977|    140.0|     52.704|
|2025-04-22 00:04:24|     6977|    140.0|     52.922|
|2025-04-22 00:24:57|     6977|    138.0|     53.094|
|2025-04-22 00:20:44|     6977|    140.0|     53.079|
|2025-04-22 00:19:58|     6977|    140.0|     53.219|
|2025-04-22 00:25:10|     6977|    140.0|     53.094|
|2025-04-22 00:04:50|     6977|    139.0|     52.579|
|2025-04-22 00:21:35|     6977|    139.0|      52.75|
|2025-04-22 00:01:57|     6977|    140.0|     52.813|
|2025-04-22 00:11:32|     6977|    139.0|     52.938|
|2025-04-22 00:07:20|     6977|    140.0|     52.672|
|2025-04-22 00:25:12|     6977|    139.0|     53.094|
|2025-04-22 00:09:49|     6977|    140.0|     52.719|
|2025-04-22 00:02:38|     6977|    140.0|     52.704|
|2025-04-22 00:15:35|     6977|    132.0|     52.766|
|2025-04-22 00:13:41|     6977|    139.0|     52.563|
|2025-04-22 00:10:41|     6977|    139.0|     52.985|
|2025-04-22 00:18:50|     6977|    139.0|     52.844|
+-------------------+---------+---------+-----------+
only showing top 20 rows

Number of rows 861970

----Performance Data----
Duration: 5.28

CPU Util (Across CPUs)       	AVG: 43.34	 MIN: 16.30	 MAX: 91.70
Mem Util in GB (Across nodes)	AVG: 217.30	 MIN: 217.17	 MAX: 217.38
IO Ops (excl.) Read          	Total: 27789
               Write         	Total: 3916
IO Bytes (excl.) Read        	Total: 139.43
                 Write       	Total: 0.80

Basic cleaning and data summary#

# set meaningful data types
df = df \
    .withColumn("timestamp", df.timestamp.cast(TimestampType())) \
    .withColumn("temperature", df.temperature.cast(FloatType())) \
    .withColumn("power_sum", df.power_sum.cast(FloatType())) \
    .withColumn("socket_id", df.socket_id.cast(StringType())) \
    .orderBy("timestamp")
df.printSchema()
root
 |-- timestamp: timestamp (nullable = true)
 |-- socket_id: string (nullable = true)
 |-- power_sum: float (nullable = true)
 |-- temperature: float (nullable = true)
# getting summary statistics
# hint: summary() tries to interpret strings as numbers, so mean and stddev
# are also reported for socket_id, although it is a categorical column.
df.summary().show()
25/05/08 10:56:17 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                
+-------+------------------+------------------+------------------+
|summary|         socket_id|         power_sum|       temperature|
+-------+------------------+------------------+------------------+
|  count|            861970|            861970|            861970|
|   mean|            6977.5|123.43936679930856|52.821822555181576|
| stddev|0.5000002900335522|26.089912860755167| 4.227714869610159|
|    min|              6977|              29.0|            40.954|
|    25%|            6977.0|             118.0|            51.344|
|    50%|            6978.0|             135.0|            53.032|
|    75%|            6978.0|             140.0|            54.829|
|    max|              6978|             200.0|             66.61|
+-------+------------------+------------------+------------------+


----Performance Data----
Duration: 6.39

CPU Util (Across CPUs)       	AVG: 44.52	 MIN: 7.00	 MAX: 85.70
Mem Util in GB (Across nodes)	AVG: 217.70	 MIN: 217.61	 MAX: 217.83
IO Ops (excl.) Read          	Total: 28221
               Write         	Total: 4001
IO Bytes (excl.) Read        	Total: 148.93
                 Write       	Total: 0.88
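
The socket_id column of the summary table can be checked by hand. The following is a small sketch in plain Python (no Spark needed), assuming that the reported stddev is the sample standard deviation (divisor n - 1) and using the per-socket count of 430985 that appears later in this notebook. The variable names are illustrative only.

```python
import math

# Two sockets (6977 and 6978) with the same number of observations each.
n_per_socket = 430985
n_total = 2 * n_per_socket

# Mean of the socket ids when they are interpreted as numbers.
mean_id = (6977 * n_per_socket + 6978 * n_per_socket) / n_total

# Every value deviates from the mean by exactly 0.5, so the sum of squared
# deviations is n_total * 0.25. The sample variance divides by (n_total - 1).
sample_std = math.sqrt(n_total * 0.25 / (n_total - 1))

print(f"mean of socket_id:   {mean_id}")
print(f"stddev of socket_id: {sample_std}")
```

Under these assumptions the result should be very close to the 6977.5 and 0.5000003 shown in the table, which illustrates that these two statistics carry no physical meaning for an identifier column.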
# socket_id is a categorical column, check unique values
df.select("socket_id").distinct().show()
+---------+
|socket_id|
+---------+
|     6978|
|     6977|
+---------+
# count number of observations per group
df.groupBy("socket_id").count()  # lazy: only builds a query plan, nothing is computed without an action
print("Partitions (original df):", df.rdd.getNumPartitions())
df.groupBy("socket_id").agg(
    F.count("temperature").alias("temp_count_per_socket"),
    F.mean("temperature").alias("mean_temp_socket")
).show()
[Stage 16:=============================>                            (1 + 1) / 2]
Partitions (original df): 4
                                                                                
+---------+---------------------+-----------------+
|socket_id|temp_count_per_socket| mean_temp_socket|
+---------+---------------------+-----------------+
|     6978|               430985|51.69739268016241|
|     6977|               430985|53.94625243020074|
+---------+---------------------+-----------------+

Data reduction using resampling#

Working with large data volumes is easier after data reduction. One common approach is subsampling: a small random subset is cheap to collect and plot, which makes it well suited for prototyping the operations of interest before running them on the full data set.
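
To make the idea of per-group subsampling concrete, here is a minimal plain-Python sketch of the principle behind the `sampleBy` call used in this section: every row is kept independently with the probability assigned to its group. The helper `sample_by_key` and the synthetic rows are hypothetical and only serve as an illustration; this is not Spark's implementation.

```python
import random
from collections import Counter

def sample_by_key(rows, key, fractions, seed=None):
    """Keep each row with the probability given for its group (0 if the group is not listed)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fractions.get(row[key], 0.0)]

# Synthetic stand-in for the real data: 20000 rows, alternating between two sockets.
rows = [{"socket_id": str(6977 + i % 2), "temperature": 50.0 + (i % 7)} for i in range(20000)]

# Same fraction (0.5 %) for every socket.
fractions = {"6977": 0.005, "6978": 0.005}
sampled_rows = sample_by_key(rows, "socket_id", fractions, seed=42)

print(f"kept {len(sampled_rows)} of {len(rows)} rows")
print(Counter(row["socket_id"] for row in sampled_rows))
```

Because each row is drawn independently, the sample size is only approximately `fraction * number_of_rows` and varies with the seed. The relative share of each socket is preserved on average.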

# Step 1: Get all distinct socket_ids and convert to python list
distinct_ids = df.select("socket_id").distinct().rdd.flatMap(lambda x: x).collect()
print(f'type of distinct_ids: {type(distinct_ids)}')

# Step 2: Assign the same fraction (here 0.5%) to each ID
sampling_fraction = 0.005
fractions = {id_: sampling_fraction for id_ in distinct_ids}

# Step 3: Perform sampling per socket_id
sampled = df.sampleBy("socket_id", fractions, seed=42)
sampled.show()
type of distinct_ids: <class 'list'>
+-------------------+---------+---------+-----------+
|          timestamp|socket_id|power_sum|temperature|
+-------------------+---------+---------+-----------+
|2025-04-21 00:02:07|     6978|    154.0|     52.438|
|2025-04-21 00:02:36|     6978|    146.0|     52.266|
|2025-04-21 00:09:59|     6978|    145.0|     52.454|
|2025-04-21 00:10:04|     6977|    141.0|     52.657|
|2025-04-21 00:11:42|     6978|    147.0|     52.469|
|2025-04-21 00:13:41|     6978|     88.0|      50.25|
|2025-04-21 00:14:05|     6978|    155.0|     52.219|
|2025-04-21 00:15:12|     6977|    139.0|     52.375|
|2025-04-21 00:17:29|     6978|    161.0|     52.485|
|2025-04-21 00:19:49|     6978|    107.0|     52.516|
|2025-04-21 00:21:20|     6977|    149.0|       52.5|
|2025-04-21 00:23:54|     6977|    146.0|     52.157|
|2025-04-21 00:27:16|     6978|    155.0|     52.313|
|2025-04-21 00:28:07|     6977|    145.0|       52.5|
|2025-04-21 00:30:18|     6978|    146.0|     52.375|
|2025-04-21 00:30:56|     6977|    146.0|     52.282|
|2025-04-21 00:31:04|     6978|    146.0|     52.204|
|2025-04-21 00:32:12|     6977|    138.0|     52.625|
|2025-04-21 00:36:45|     6978|    124.0|     51.844|
|2025-04-21 00:37:09|     6977|    128.0|     52.204|
+-------------------+---------+---------+-----------+
only showing top 20 rows
import matplotlib.pyplot as plt
import pandas as pd

# Collect and prepare
pdf = sampled.select("timestamp", "temperature", "power_sum", "socket_id").toPandas()
pdf["timestamp"] = pd.to_datetime(pdf["timestamp"])
pdf = pdf.sort_values(["socket_id", "timestamp"])

# --------- Plot 1: Temperature ---------
plt.figure(figsize=(12, 6))
for socket_id, group in pdf.groupby("socket_id"):
    plt.plot(group["timestamp"], group["temperature"], label=f"Socket {socket_id}", alpha=0.6)
plt.title("Temperature Over Time by Socket ID")
plt.xlabel("Timestamp")
plt.ylabel("Temperature")
plt.xticks(rotation=45)
plt.tight_layout()
plt.legend(title="Socket ID", bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True)
plt.show()

# --------- Plot 2: Power Sum ---------
plt.figure(figsize=(12, 6))
for socket_id, group in pdf.groupby("socket_id"):
    plt.plot(group["timestamp"], group["power_sum"], label=f"Socket {socket_id}", alpha=0.6)
plt.title("Power Sum Over Time by Socket ID")
plt.xlabel("Timestamp")
plt.ylabel("Power Sum")
plt.xticks(rotation=45)
plt.tight_layout()
plt.legend(title="Socket ID", bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True)
plt.show()
----Performance Data----
Duration: 5.32

CPU Util (Across CPUs)       	AVG: 11.32	 MIN: 0.00	 MAX: 38.20
Mem Util in GB (Across nodes)	AVG: 218.19	 MIN: 218.15	 MAX: 218.22
IO Ops (excl.) Read          	Total: 29600
               Write         	Total: 4200
IO Bytes (excl.) Read        	Total: 188.62
                 Write       	Total: 1.06

Moving average calculation#

# get the number of observations per socket and hour to determine the order of the moving average
# (counting per socket matters because the window below is also partitioned by socket_id)
df = df.withColumn("hour", F.date_trunc("hour", "timestamp"))

obs_per_hour = df.groupBy("socket_id", "hour").agg(F.count("*").alias("count_per_hour"))
obs_per_hour.orderBy("socket_id", "hour").show()
mean_obs = obs_per_hour.agg(F.mean("count_per_hour").alias("avg_obs_per_hour"))
avg_obs_per_hour = int(mean_obs.collect()[0]["avg_obs_per_hour"])
print(f'average number of observations per socket and hour: {avg_obs_per_hour}')
ma_order = avg_obs_per_hour  # Moving average order (about one hour of data per socket)

# Define window: partition by ID, order by time, current row plus the ma_order - 1 preceding rows
window_spec = (
    Window
    .partitionBy("socket_id")
    .orderBy("timestamp")
    .rowsBetween(-ma_order + 1, 0)
)

# Add moving average column
df = df.withColumn("temperature_ma", F.avg("temperature").over(window_spec))
df.show()
# Step 1: Get all distinct socket_ids and convert to python list
distinct_ids = df.select("socket_id").distinct().rdd.flatMap(lambda x: x).collect()
print(f'type of distinct_ids: {type(distinct_ids)}')

# Step 2: Assign the same fraction (here 0.5%) to each ID
sampling_fraction = 0.005
fractions = {id_: sampling_fraction for id_ in distinct_ids}

# Step 3: Perform sampling per socket_id
sampled = df.sampleBy("socket_id", fractions, seed=42)
sampled.show()
# Collect and prepare
pdf = sampled.select("timestamp", "temperature", "temperature_ma", "socket_id").toPandas()
pdf["timestamp"] = pd.to_datetime(pdf["timestamp"])
pdf = pdf.sort_values(["socket_id", "timestamp"])
plt.figure(figsize=(12, 6))

# Loop through each socket_id group
for socket_id, group in pdf.groupby("socket_id"):
    # Plot raw temperature
    plt.plot(group["timestamp"], group["temperature"],
             label=f"Socket {socket_id} (Raw)", alpha=0.4)
    
    # Plot moving average
    plt.plot(group["timestamp"], group["temperature_ma"],
             label=f"Socket {socket_id} (MA)", linewidth=2)

plt.title("Temperature Over Time with Moving Average by Socket ID")
plt.xlabel("Timestamp")
plt.ylabel("Temperature")
plt.xticks(rotation=45)
plt.tight_layout()
plt.legend(title="Socket ID", bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True)
plt.show()
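
The window definition used for the moving average (`rowsBetween(-ma_order + 1, 0)`) describes a trailing window: the current row plus the `ma_order - 1` rows before it. The following plain-Python sketch shows the same arithmetic on a short list for a single socket. The function `trailing_moving_average` and the sample values are made up for illustration and are not part of the notebook's pipeline.

```python
from collections import deque

def trailing_moving_average(values, order):
    """Average of the current value and up to (order - 1) preceding values."""
    window = deque(maxlen=order)  # holds at most `order` most recent values
    running_sum = 0.0
    result = []
    for value in values:
        if len(window) == order:
            # The oldest value is about to be pushed out of the window.
            running_sum -= window[0]
        window.append(value)
        running_sum += value
        result.append(running_sum / len(window))
    return result

temperatures = [52.0, 53.0, 54.0, 55.0, 56.0]
print(trailing_moving_average(temperatures, order=3))
# [52.0, 52.5, 53.0, 54.0, 55.0]
```

Note that the first `order - 1` results are averages over fewer values, because the window cannot look back beyond the start of the series. The same happens at the beginning of each socket's series when the window is partitioned by socket_id.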

Lazy Evaluation: Transformation vs. Action#

%%time
# Groupwise summary: mean temperature per socket
# this is a lazy operation
grouped_df = df.groupBy("socket_id").agg(F.mean("temperature").alias("mean_temperature"))
CPU times: user 927 µs, sys: 1.96 ms, total: 2.89 ms
Wall time: 18 ms
%%time
# actual computation is done here, as show() is an action
grouped_df.show()
                                                                                
+---------+-----------------+
|socket_id| mean_temperature|
+---------+-----------------+
|     6978|51.69739268016241|
|     6977|53.94625243020074|
+---------+-----------------+

CPU times: user 2.92 ms, sys: 11 ms, total: 13.9 ms
Wall time: 2.52 s
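
The timings above show the pattern: defining the aggregation returns almost immediately, while `show()` takes seconds because it triggers the computation. As a loose analogy only (this is not how Spark works internally), a Python generator behaves in a similar way: building it does no work, consuming it does. All names in this sketch are illustrative.

```python
calls = []  # records every time real work is done

def expensive_square(x):
    calls.append(x)
    return x * x

# "Transformation": the generator only describes the computation.
pipeline = (expensive_square(x) for x in range(5))
calls_after_definition = len(calls)

# "Action": consuming the generator finally runs the work.
total = sum(pipeline)
calls_after_action = len(calls)

print(f"work done after definition: {calls_after_definition} calls")
print(f"work done after action:     {calls_after_action} calls, result = {total}")
```

In Spark, deferring the work has an additional benefit: the whole chain of transformations is known before anything runs, so it can be optimized as one plan.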

Stop the Spark context and Spark standalone cluster#

# Stopping spark context and checking the cluster status
sc.stop()
mycluster.stop_cluster()
mycluster.check_status()
[INFO ] [08/05/2025 10:51:42] - Stopping SPARK cluster.
[INFO ] [08/05/2025 10:51:46] - Logging cluster stopping info at: ./my-conf/spark/log/cluster.log
[INFO ] [08/05/2025 10:51:46] - Currently, following java processes are running:
[INFO ] [08/05/2025 10:51:46] - 	Process ID, Name 
# Sometimes the SparkSubmit process is not terminated and must be killed manually
mycluster.kill_cluster_processes()
mycluster.check_status()
[INFO ] [08/05/2025 10:51:49] - Killing SPARK cluster processes.
[INFO ] [08/05/2025 10:51:49] - Killing process SparkSubmit
[INFO ] [08/05/2025 10:51:49] - Killing process CoarseGrainedExecutorBackend
[INFO ] [08/05/2025 10:51:49] - Killing process Master
[INFO ] [08/05/2025 10:51:49] - Killing process Worker
[INFO ] [08/05/2025 10:51:49] - Cluster processes killed.
[INFO ] [08/05/2025 10:51:49] - Currently, following java processes are running:
[INFO ] [08/05/2025 10:51:49] - 	Process ID, Name