Skip to content

Monitor

This page documents the monitoring stack under jumper_extension.monitor, including the core monitor and metric backends. High‑level usage is described in the Public API and Jupyter API sections; the content below is generated directly from the Python code.

Core

MonitorUnavailableError

Bases: RuntimeError

This monitor is a stub and cannot be used.

Source code in jumper_extension/monitor/common.py
245
246
class MonitorUnavailableError(RuntimeError):
    """Raised when a stub monitor is accessed: this monitor cannot be used."""

OfflinePerformanceMonitor

Offline monitor that satisfies MonitorProtocol.

It holds static data frames plus metadata from a manifest; does not collect live data.

Source code in jumper_extension/monitor/common.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
class OfflinePerformanceMonitor:
    """Static, non-collecting monitor satisfying MonitorProtocol.

    Wraps pre-recorded per-level data frames plus metadata taken from a
    session manifest; it never gathers live measurements.
    """

    def __init__(
        self,
        *,
        manifest: Dict,
        perf_dfs: Dict[str, pd.DataFrame],
        source: Optional[str] = None,
    ):
        info = manifest.get("monitor", {})

        # Attributes required by the monitor protocol.
        self.interval = float(info.get("interval", 1.0) or 1.0)
        self.running = False
        self.start_time = info.get("start_time")
        self.stop_time = info.get("stop_time")

        # Hardware/context metadata recorded in the manifest.
        self.num_cpus = int(info.get("num_cpus", 0) or 0)
        self.num_system_cpus = int(
            info.get("num_system_cpus", self.num_cpus) or self.num_cpus
        )
        self.num_gpus = int(info.get("num_gpus", 0) or 0)
        self.gpu_memory = float(info.get("gpu_memory", 0.0) or 0.0)
        self.gpu_name = info.get("gpu_name", "") or ""
        self.cpu_handles = info.get("cpu_handles", []) or []
        self.memory_limits = info.get("memory_limits", {}) or {}

        # Container for the static performance data frames.
        self.data = PerformanceData(
            self.num_cpus, self.num_system_cpus, self.num_gpus
        )
        for lvl, frame in (perf_dfs or {}).items():
            # Best-effort validation: unknown levels are stored regardless.
            try:
                self.data._validate_level(lvl)
            except Exception:
                pass
            self.data.data[lvl] = frame

        # Mark this monitor as an imported (offline) session.
        self.is_imported = True
        self.session_source = source

    # Lifecycle is a no-op for offline sessions.
    def start(self, interval: float = 1.0) -> None:
        # Only remember the requested interval; an imported session never runs.
        self.interval = interval
        self.running = False

    def stop(self) -> None:
        self.running = False

PerformanceMonitor

Source code in jumper_extension/monitor/common.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class PerformanceMonitor:
    """Live performance monitor.

    Samples CPU, memory, I/O and (when present) GPU metrics for every
    available level on a background daemon thread and appends each sample
    to ``self.data``.
    """

    def __init__(self):
        self.interval = 1.0
        self.running = False
        self.start_time = None
        self.stop_time = None
        self.monitor_thread = None
        self.process = psutil.Process()
        self.n_measurements = 0
        self.n_missed_measurements = 0
        # On macOS psutil does not implement cpu_affinity (it raises
        # AttributeError): fall back to the system-wide CPU count and
        # leave cpu_handles empty.
        try:
            self.cpu_handles = self.process.cpu_affinity()
            self.num_cpus = len(self.cpu_handles)
        except AttributeError:
            self.cpu_handles = []
            self.num_cpus = len(psutil.cpu_percent(percpu=True))
        self.num_system_cpus = len(psutil.cpu_percent(percpu=True))
        self.pid = os.getpid()
        self.uid = os.getuid()
        self.slurm_job = os.environ.get("SLURM_JOB_ID", 0)
        self.levels = get_available_levels()
        self.process_pids = []

        self.memory_limits = {
            level: detect_memory_limit(level, self.uid, self.slurm_job)
            for level in self.levels
        }

        # Metric backends share this monitor instance for context.
        self._process_backend = PsutilProcessBackend(self)
        self._cpu_backend = PsutilCpuBackend(self)
        self._memory_backend = PsutilMemoryBackend(self)
        self._io_backend = PsutilIoBackend(self)
        for backend in (
            self._process_backend,
            self._cpu_backend,
            self._memory_backend,
            self._io_backend,
        ):
            backend.setup()

        # GPU discovery: backends attach handles/name/memory to this monitor
        # during setup().
        self.nvidia_gpu_handles = []
        self.amd_gpu_handles = []
        self.gpu_memory = 0
        self.gpu_name = ""
        self._gpu_backends = GpuBackendDiscovery(self).discover()
        for backend in self._gpu_backends:
            backend.setup()
        self.num_gpus = len(self.nvidia_gpu_handles) + len(
            self.amd_gpu_handles
        )
        self.metrics = [
            "cpu",
            "memory",
            "io_read",
            "io_write",
            "io_read_count",
            "io_write_count",
        ]

        if self.num_gpus:
            self.metrics.extend(["gpu_util", "gpu_band", "gpu_mem"])

        self.data = PerformanceData(
            self.num_cpus, self.num_system_cpus, self.num_gpus
        )
        # session state
        self.is_imported = False
        self.session_source = None

    def _get_process_pids(self):
        """Return PIDs of this process and its children (via backend)."""
        return self._process_backend.get_process_pids()

    def _validate_level(self, level):
        """Raise ValueError if *level* is not one of the available levels."""
        if level not in self.levels:
            raise ValueError(
                EXTENSION_ERROR_MESSAGES[
                    ExtensionErrorCode.INVALID_LEVEL
                ].format(level=level, levels=self.levels)
            )

    def _filter_process(self, proc, mode):
        return self._process_backend.filter_process(proc, mode)

    def _get_filtered_processes(self, level="user", mode="cpu", handle=None):
        return self._process_backend.get_filtered_processes(
            level, mode, handle
        )

    def _safe_proc_call(self, proc, proc_func, default=0):
        return self._process_backend.safe_proc_call(proc, proc_func, default)

    def _collect_cpu(self, level="process"):
        return self._cpu_backend.collect(level)

    def _collect_memory(self, level="process"):
        return self._memory_backend.collect(level)

    def _collect_io(self, level="process"):
        return self._io_backend.collect(level)

    def _collect_gpu(self, level="process"):
        """Aggregate (util, bandwidth, memory) lists across all GPU backends."""
        if self.num_gpus == 0:
            return [], [], []

        self._validate_level(level)
        gpu_util, gpu_band, gpu_mem = [], [], []

        for backend in self._gpu_backends:
            b_util, b_band, b_mem = backend.collect(level)
            gpu_util.extend(b_util)
            gpu_band.extend(b_band)
            gpu_mem.extend(b_mem)

        return gpu_util, gpu_band, gpu_mem

    def _collect_metrics(self):
        """Collect one sample tuple per level, sharing a single timestamp."""
        time_mark = time.perf_counter()
        return tuple(
            (
                time_mark,
                self._collect_cpu(level),
                self._collect_memory(level),
                *self._collect_gpu(level),
                self._collect_io(level),
            )
            for level in self.levels
        )

    def _collect_data(self):
        """Sampling loop executed on the monitor thread until stopped."""
        while self.running:
            time_start_measurement = time.perf_counter()
            self.process_pids = self._get_process_pids()
            metrics = self._collect_metrics()
            for level, data_tuple in zip(self.levels, metrics):
                self.data.add_sample(level, *data_tuple)
            time_measurement = time.perf_counter() - time_start_measurement
            self.n_measurements += 1
            if time_measurement > self.interval:
                # Sampling took longer than the interval: count it as a
                # missed measurement instead of sleeping a negative amount.
                self.n_missed_measurements += 1
            else:
                time.sleep(self.interval - time_measurement)

    def start(self, interval: float = 1.0):
        """Start the background sampling thread (no-op if already running)."""
        if self.running:
            logger.warning(
                EXTENSION_ERROR_MESSAGES[
                    ExtensionErrorCode.MONITOR_ALREADY_RUNNING
                ]
            )
            return
        self.interval = interval
        self.start_time = time.perf_counter()
        self.running = True
        self.monitor_thread = threading.Thread(
            target=self._collect_data, daemon=True
        )
        self.monitor_thread.start()
        logger.info(
            EXTENSION_INFO_MESSAGES[ExtensionInfoCode.MONITOR_STARTED].format(
                pid=self.pid,
                interval=self.interval,
            )
        )

    def stop(self):
        """Stop sampling and log duration / missed-measurement statistics."""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=2.0)
        self.stop_time = time.perf_counter()
        # Guard against stop() being called without a prior start():
        # start_time would be None and the subtraction would raise.
        if self.start_time is not None:
            logger.info(
                EXTENSION_INFO_MESSAGES[
                    ExtensionInfoCode.MONITOR_STOPPED
                ].format(seconds=self.stop_time - self.start_time)
            )
        # Guard against ZeroDivisionError when no measurement completed
        # before stop() was called.
        if self.n_measurements:
            logger.info(
                EXTENSION_INFO_MESSAGES[
                    ExtensionInfoCode.MISSED_MEASUREMENTS
                ].format(
                    perc_missed_measurements=self.n_missed_measurements
                    / self.n_measurements
                )
            )

n_missed_measurements = 0 instance-attribute

On macOS, cpu_affinity is not implemented in psutil (it raises AttributeError); in that case num_cpus falls back to the system-wide CPU count and cpu_handles is left empty.

UnavailablePerformanceMonitor

A stub that type-checks against PerformanceMonitor Protocol but fails at runtime.

  • Declares all required attributes for structural typing.
  • Any attribute access or method call raises MonitorUnavailableError, except 'running', which is always readable and returns False.
Source code in jumper_extension/monitor/common.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class UnavailablePerformanceMonitor:
    """
    A stub that type-checks against PerformanceMonitor Protocol but fails at runtime.

    - Declares all required attributes for structural typing.
    - Any attribute access or method call raises MonitorUnavailableError,
      except 'running', which is always readable and returns False.
    """

    # --- Protocol surface ---
    # Annotations only (never assigned): they exist purely so structural
    # type checks against the monitor protocol succeed.
    interval: float
    data: "PerformanceData"
    start_time: Optional[float]
    num_cpus: int
    num_system_cpus: int
    num_gpus: int
    gpu_memory: float
    memory_limits: dict
    cpu_handles: list[int]
    gpu_name: str
    running: bool

    # Method stubs for the protocol; real calls are intercepted by
    # __getattribute__ below and raise MonitorUnavailableError.
    def start(self, interval: float = 1.0) -> None: ...
    def stop(self) -> None: ...

    # --- Runtime behavior ---
    def __init__(self, reason: str = "Performance monitor is not available"):
        # Bypass our own __setattr__ (which raises) to store the reason.
        object.__setattr__(self, "_reason", reason)

    def __getattribute__(self, name: str):
        # allow a few safe attributes + running
        if name in {
            "_reason", "__class__", "__repr__", "__str__",
            "__init__", "__getattribute__", "__setattr__",
            "__dict__", "__annotations__"
        }:
            return object.__getattribute__(self, name)

        # 'running' is always readable and reports the monitor as stopped.
        if name == "running":
            return False

        # Everything else fails loudly with the stored reason.
        reason = object.__getattribute__(self, "_reason")
        raise MonitorUnavailableError(f"Access to '{name}' is not allowed: {reason}")

    def __setattr__(self, name: str, value):
        # Only internal bookkeeping attributes may ever be set.
        if name in {"_reason", "__dict__", "__annotations__"}:
            return object.__setattr__(self, name, value)
        reason = object.__getattribute__(self, "_reason")
        raise MonitorUnavailableError(f"Setting '{name}' is not allowed: {reason}")

    def __repr__(self) -> str:
        return f"<UnavailablePerformanceMonitor: {self._reason}>"

CPU

PsutilCpuBackend

Bases: CpuBackend

CPU backend implemented via psutil.

Source code in jumper_extension/monitor/metrics/cpu/psutil.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class PsutilCpuBackend(CpuBackend):
    """CPU utilization backend backed by psutil."""

    name = "cpu-psutil"

    def collect(self, level: str = "process") -> list[float]:
        """Return per-core CPU utilization percentages for *level*."""
        mon = self._m
        mon._validate_level(level)

        if level == "system":
            # psutil already reports one value per core for the whole system.
            return psutil.cpu_percent(percpu=True)

        if level == "process":
            # Sum utilization over the monitored process tree.
            total = sum(
                mon._process_backend.safe_proc_call(
                    pid, lambda p: p.cpu_percent(interval=0.1)
                )
                for pid in mon.process_pids
            )
        else:  # user or slurm
            total = sum(
                mon._process_backend.safe_proc_call(
                    proc, lambda p: p.cpu_percent()
                )
                for proc in mon._process_backend.get_filtered_processes(
                    level, "cpu"
                )
            )
        # Spread the aggregate evenly across the visible cores.
        return [total / mon.num_cpus] * mon.num_cpus

Memory

PsutilMemoryBackend

Bases: MemoryBackend

Memory backend implemented via psutil.

Source code in jumper_extension/monitor/metrics/memory/psutil.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class PsutilMemoryBackend(MemoryBackend):
    """Memory backend implemented via psutil.

    All values returned by ``collect`` are in GiB.
    """

    name = "memory-psutil"

    def collect(self, level: str = "process") -> float:
        """Return used memory in GiB for the given level.

        Levels:
            system  -- total minus available system memory.
            process -- summed USS over the monitored process tree.
            other   -- summed USS over user/slurm-filtered processes.
        """
        self._m._validate_level(level)
        gib = 1024 ** 3
        if level == "system":
            # Take one snapshot so total and available are mutually
            # consistent (previously virtual_memory() was called twice).
            vm = psutil.virtual_memory()
            return (vm.total - vm.available) / gib
        elif level == "process":
            memory_total = sum(
                self._m._process_backend.safe_proc_call(
                    pid, lambda p: p.memory_full_info().uss
                )
                for pid in self._m.process_pids
            )
            return memory_total / gib
        else:  # user or slurm
            memory_total = sum(
                self._m._process_backend.safe_proc_call(
                    proc, lambda p: p.memory_full_info().uss, 0
                )
                for proc in self._m._process_backend.get_filtered_processes(
                    level, "cpu"
                )
            )
            return memory_total / gib

IO

PsutilIoBackend

Bases: IoBackend

I/O backend implemented via psutil.

Source code in jumper_extension/monitor/metrics/io/psutil.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class PsutilIoBackend(IoBackend):
    """I/O backend implemented via psutil.

    ``collect`` returns ``[read_count, write_count, read_bytes, write_bytes]``
    summed over the processes selected by the requested level.
    """

    name = "io-psutil"

    def _accumulate(self, totals: list[int], proc) -> None:
        """Add *proc*'s io_counters into *totals* in place (no-op on failure)."""
        io_data = self._m._process_backend.safe_proc_call(
            proc, lambda p: p.io_counters()
        )
        if io_data:
            totals[0] += io_data.read_count
            totals[1] += io_data.write_count
            totals[2] += io_data.read_bytes
            totals[3] += io_data.write_bytes

    def collect(self, level: str = "process") -> list[int]:
        """Return summed I/O counters for the given level."""
        self._m._validate_level(level)
        totals = [0, 0, 0, 0]
        # Select the process iterable for the level, then accumulate once
        # (the original duplicated the accumulation loop three times).
        if level == "process":
            procs = self._m.process_pids
        elif level == "system":
            procs = psutil.process_iter(["pid"])
        else:  # user or slurm
            procs = self._m._process_backend.get_filtered_processes(
                level, "cpu"
            )
        for proc in procs:
            self._accumulate(totals, proc)
        return totals

Process

PsutilProcessBackend

Bases: ProcessBackend

Process backend implemented via psutil.

Source code in jumper_extension/monitor/metrics/process/psutil.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class PsutilProcessBackend(ProcessBackend):
    """Process backend implemented via psutil.

    Provides process discovery, user/SLURM filtering and safe wrappers
    around psutil calls for the monitor.
    """

    name = "process-psutil"

    def get_process_pids(self) -> set[int]:
        """Get current process PID and all its children PIDs."""
        pids = {self._m.pid}
        try:
            pids.update(
                child.pid for child in self._m.process.children(recursive=True)
            )
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Children that exited or are not inspectable are skipped.
            pass
        return pids

    def filter_process(self, proc: psutil.Process, mode: str) -> bool:
        """Check if process matches the filtering mode ('user' or 'slurm')."""
        try:
            if mode == "user":
                return proc.uids().real == self._m.uid
            elif mode == "slurm":
                if not is_slurm_available():
                    return False
                return proc.environ().get("SLURM_JOB_ID") == str(
                    self._m.slurm_job
                )
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            pass
        # Unknown modes and uninspectable processes never match.
        return False

    def get_filtered_processes(
        self,
        level: str = "user",
        mode: str = "cpu",
        handle: Optional[object] = None,
    ):
        """Get filtered processes for CPU or GPU monitoring.

        Returns a list for mode 'cpu', a (filtered, all) tuple for mode
        'nvidia_gpu', and raises ValueError for any other mode.
        """
        if mode == "cpu":
            return [
                proc
                for proc in psutil.process_iter(["pid", "uids"])
                if self.safe_proc_call(
                    proc, lambda p: self.filter_process(p, level), False
                )
            ]
        elif mode == "nvidia_gpu":
            try:
                import pynvml
            except ImportError:
                # Without NVML bindings there are no GPU processes to report.
                return [], []
            all_procs = pynvml.nvmlDeviceGetComputeRunningProcesses(handle)
            filtered = [
                p
                for p in all_procs
                if self.safe_proc_call(
                    p.pid,
                    lambda proc: self.filter_process(proc, level),
                    False,
                )
            ]
            return filtered, all_procs
        else:
            raise ValueError(f"Unknown mode: {mode}")

    def safe_proc_call(
        self,
        proc,
        proc_func: Callable[[psutil.Process], Any],
        default=0,
    ):
        """Safely call a process method and return *default* on error."""
        try:
            if not isinstance(proc, psutil.Process):
                # proc might be a pid. Moved Process creation here to catch
                # exceptions at the same place
                proc = psutil.Process(proc)
            result = proc_func(proc)
            return result if result is not None else default
        except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError):
            return default
        except TypeError:
            # In tests psutil is replaced by a MagicMock, which makes the
            # isinstance check above raise TypeError; treat that as "no data".
            if isinstance(psutil.Process, unittest.mock.MagicMock):
                return default
            # A genuine TypeError is a programming error; previously this
            # branch fell through and silently returned None.
            raise

filter_process(proc, mode)

Check if process matches the filtering mode.

Source code in jumper_extension/monitor/metrics/process/psutil.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def filter_process(self, proc: psutil.Process, mode: str) -> bool:
    """Check if process matches the filtering mode."""
    try:
        if mode == "user":
            # Match processes owned by the monitored user's real UID.
            return proc.uids().real == self._m.uid
        elif mode == "slurm":
            if not is_slurm_available():
                return False
            # Match processes belonging to the same SLURM job.
            return proc.environ().get("SLURM_JOB_ID") == str(
                self._m.slurm_job
            )
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        # Processes we cannot inspect are treated as non-matching.
        pass
    return False

get_filtered_processes(level='user', mode='cpu', handle=None)

Get filtered processes for CPU or GPU monitoring.

Source code in jumper_extension/monitor/metrics/process/psutil.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def get_filtered_processes(
    self,
    level: str = "user",
    mode: str = "cpu",
    handle: Optional[object] = None,
):
    """Get filtered processes for CPU or GPU monitoring."""
    if mode == "cpu":
        # All visible processes that pass the user/slurm filter for *level*.
        return [
            proc
            for proc in psutil.process_iter(["pid", "uids"])
            if self.safe_proc_call(
                proc, lambda p: self.filter_process(p, level), False
            )
        ]
    elif mode == "nvidia_gpu":
        try:
            import pynvml
        except ImportError:
            # Without NVML bindings there are no GPU processes to report.
            return [], []
        all_procs = pynvml.nvmlDeviceGetComputeRunningProcesses(handle)
        # Keep only GPU compute processes that also pass the level filter.
        filtered = [
            p
            for p in all_procs
            if self.safe_proc_call(
                p.pid,
                lambda proc: self.filter_process(proc, level),
                False,
            )
        ]
        return filtered, all_procs
    else:
        raise ValueError(f"Unknown mode: {mode}")

get_process_pids()

Get current process PID and all its children PIDs.

Source code in jumper_extension/monitor/metrics/process/psutil.py
15
16
17
18
19
20
21
22
23
24
def get_process_pids(self) -> set[int]:
    """Get current process PID and all its children PIDs."""
    pids = {self._m.pid}
    try:
        pids.update(
            child.pid for child in self._m.process.children(recursive=True)
        )
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Children that exited or are not inspectable are skipped.
        pass
    return pids

safe_proc_call(proc, proc_func, default=0)

Safely call a process method and return default on error.

Source code in jumper_extension/monitor/metrics/process/psutil.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def safe_proc_call(
    self,
    proc,
    proc_func: Callable[[psutil.Process], Any],
    default=0,
):
    """Safely call a process method and return default on error."""
    try:
        if not isinstance(proc, psutil.Process):
            # proc might be a pid. Moved Process creation here to catch
            # exceptions at the same place
            proc = psutil.Process(proc)
        result = proc_func(proc)
        return result if result is not None else default
    except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError):
        return default
    except TypeError:
        # in test case, where psutil is a mock
        # NOTE(review): when psutil.Process is NOT a mock this branch falls
        # through and implicitly returns None instead of `default` — confirm
        # that is intended.
        if isinstance(psutil.Process, unittest.mock.MagicMock):
            return default

GPU

GpuBackend

A pluggable backend that provides GPU discovery and metric collection.

Source code in jumper_extension/monitor/metrics/gpu/common.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class GpuBackend:
    """A pluggable backend that provides GPU discovery and metric collection."""

    name = "gpu-base"

    def __init__(self, monitor: Optional["MonitorProtocol"] = None):
        self._monitor = monitor

    def setup(self) -> None:
        """Initialize backend and attach any discovered handles to the monitor."""
        return None

    def shutdown(self) -> None:
        """Clean up resources if needed."""
        return None

    def _iter_handles(self) -> Iterable[object]:
        # Base class exposes no devices.
        return []

    def _collect_system(self, handle: object) -> tuple[float, float, float]:
        raise NotImplementedError

    def _collect_process(self, handle: object) -> tuple[float, float, float]:
        raise NotImplementedError

    def _collect_other(
            self, handle: object, level: str
    ) -> tuple[float, float, float]:
        raise NotImplementedError

    def collect(self, level: str = "process"):
        """Collect metrics for the given level.

        Returns: (gpu_util, gpu_band, gpu_mem)
        """
        utils, bands, mems = [], [], []

        # One (util, band, mem) sample per discovered device handle.
        for handle in self._iter_handles():
            if level == "system":
                sample = self._collect_system(handle)
            elif level == "process":
                sample = self._collect_process(handle)
            else:  # user or slurm
                sample = self._collect_other(handle, level)
            util, band, mem = sample
            utils.append(util)
            bands.append(band)
            mems.append(mem)

        return utils, bands, mems

collect(level='process')

Collect metrics for the given level.

Returns: (gpu_util, gpu_band, gpu_mem)

Source code in jumper_extension/monitor/metrics/gpu/common.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def collect(self, level: str = "process"):
    """Collect metrics for the given level.

    Returns: (gpu_util, gpu_band, gpu_mem)
    """
    gpu_util, gpu_band, gpu_mem = [], [], []

    # One (util, band, mem) sample per discovered device handle; empty
    # lists when the backend found no GPUs.
    for handle in self._iter_handles():
        if level == "system":
            util, band, mem = self._collect_system(handle)
        elif level == "process":
            util, band, mem = self._collect_process(handle)
        else:  # user or slurm
            util, band, mem = self._collect_other(handle, level)
        gpu_util.append(util)
        gpu_band.append(band)
        gpu_mem.append(mem)

    return gpu_util, gpu_band, gpu_mem

setup()

Initialize backend and attach any discovered handles to the monitor.

Source code in jumper_extension/monitor/metrics/gpu/common.py
16
17
18
def setup(self) -> None:
    """Initialize backend and attach any discovered handles to the monitor."""
    # Base class performs no discovery.
    return None

shutdown()

Clean up resources if needed.

Source code in jumper_extension/monitor/metrics/gpu/common.py
20
21
22
def shutdown(self) -> None:
    """Clean up resources if needed."""
    # Nothing to release in the base class.
    return None

GpuBackendDiscovery

Selects GPU backends based on what's available at runtime.

Source code in jumper_extension/monitor/metrics/gpu/common.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class GpuBackendDiscovery:
    """Selects GPU backends based on what's available at runtime."""

    def __init__(self, monitor):
        self._monitor = monitor

    def discover(self):
        # Imported lazily so that constructing the discovery object never
        # pulls in vendor-specific modules.
        from jumper_extension.monitor.metrics.gpu.nvml import NvmlGpuBackend
        from jumper_extension.monitor.metrics.gpu.adlx import AdlxGpuBackend

        backend_classes = (NvmlGpuBackend, AdlxGpuBackend)
        return [cls(self._monitor) for cls in backend_classes]

NullGpuBackend

Bases: GpuBackend

A no-op backend used when no GPU backend is available.

Source code in jumper_extension/monitor/metrics/gpu/common.py
59
60
61
62
63
64
65
class NullGpuBackend(GpuBackend):
    """A no-op backend used when no GPU backend is available."""

    name = "gpu-disabled"

    def _iter_handles(self):
        # No handles means GpuBackend.collect() returns empty metric lists.
        return []

NvmlGpuBackend

Bases: GpuBackend

NVIDIA NVML backend (uses pynvml).

Source code in jumper_extension/monitor/metrics/gpu/nvml.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class NvmlGpuBackend(GpuBackend):
    """NVIDIA NVML backend (uses pynvml).

    ``setup()`` initializes NVML, enumerates devices and stores their
    handles on the monitor; the ``_collect_*`` hooks then read per-device
    utilization and memory figures.
    """

    name = "nvidia-nvml"

    class _ZeroUtilRates:
        """Zeroed utilization sample used when NVML is unavailable or errors."""

        gpu = 0.0
        memory = 0.0

    def __init__(self, monitor):
        super().__init__(monitor)
        # Set by setup() on successful NVML initialization; None otherwise.
        self._pynvml = None

    def _iter_handles(self):
        return self._monitor.nvidia_gpu_handles

    def _get_util_rates(self, handle):
        """Return NVML utilization rates for *handle*, or a zeroed fallback.

        The fallback covers both "NVML never initialized" and transient
        NVML errors (e.g. permission denied), so callers never see an
        exception from here.
        """
        if self._pynvml is None:
            return self._ZeroUtilRates()
        try:
            return self._pynvml.nvmlDeviceGetUtilizationRates(handle)
        except self._pynvml.NVMLError:
            # If permission denied or other error, use default values
            return self._ZeroUtilRates()

    def setup(self) -> None:
        """Initialize NVML, enumerate GPUs, and record name/memory on the monitor.

        On ImportError (pynvml missing) or any other failure (e.g. no NVIDIA
        driver) a warning is logged and the handle list is left empty.
        """
        # Logic is intentionally kept identical to the previous implementation.
        try:
            import pynvml

            pynvml.nvmlInit()
            self._pynvml = pynvml
            # Keep the module-global alias for legacy call sites.
            globals()["pynvml"] = pynvml
            ngpus = self._pynvml.nvmlDeviceGetCount()
            self._monitor.nvidia_gpu_handles = [
                self._pynvml.nvmlDeviceGetHandleByIndex(i)
                for i in range(ngpus)
            ]
            if self._monitor.nvidia_gpu_handles:
                handle = self._monitor.nvidia_gpu_handles[0]
                # Total VRAM of the first device, in GiB.
                gpu_mem = round(
                    self._pynvml.nvmlDeviceGetMemoryInfo(handle).total
                    / (1024**3),
                    2,
                )
                if self._monitor.gpu_memory == 0:
                    self._monitor.gpu_memory = gpu_mem
                # NVML may return bytes depending on the bindings version.
                name = self._pynvml.nvmlDeviceGetName(handle)
                gpu_name = name.decode() if isinstance(name, bytes) else name
                if not self._monitor.gpu_name:
                    self._monitor.gpu_name = gpu_name
                else:
                    self._monitor.gpu_name += f", {gpu_name}"
        except ImportError:
            logger.warning(
                EXTENSION_ERROR_MESSAGES[
                    ExtensionErrorCode.PYNVML_NOT_AVAILABLE
                ]
            )
            self._monitor.nvidia_gpu_handles = []
        except Exception:
            logger.warning(
                EXTENSION_ERROR_MESSAGES[
                    ExtensionErrorCode.NVIDIA_DRIVERS_NOT_AVAILABLE
                ]
            )
            self._monitor.nvidia_gpu_handles = []

    def _collect_system(self, handle):
        """Return device-wide (gpu util, 0.0, VRAM used in GiB) for *handle*."""
        util_rates = self._get_util_rates(handle)
        # NOTE(review): assumes setup() succeeded so self._pynvml is set;
        # with no handles attached this is presumably never reached — confirm.
        memory_info = self._pynvml.nvmlDeviceGetMemoryInfo(handle)
        return util_rates.gpu, 0.0, memory_info.used / (1024**3)

    def _collect_process(self, handle):
        """Return (gpu util, 0.0, GiB) attributed to the monitored PIDs."""
        util_rates = self._get_util_rates(handle)
        pids = self._monitor.process_pids
        process_mem = (
            sum(
                p.usedGpuMemory
                for p in self._pynvml.nvmlDeviceGetComputeRunningProcesses(
                    handle
                )
                if p.pid in pids and p.usedGpuMemory
            )
            / (1024**3)
        )
        # Only attribute device utilization when the process holds GPU memory.
        return util_rates.gpu if process_mem > 0 else 0.0, 0.0, process_mem

    def _collect_other(self, handle, level: str):
        """Return (gpu util, 0.0, GiB) for processes selected by *level*."""
        util_rates = self._get_util_rates(handle)
        filtered_gpu_processes, all_processes = (
            self._monitor._get_filtered_processes(level, "nvidia_gpu", handle)
        )
        filtered_mem = (
            sum(
                p.usedGpuMemory
                for p in filtered_gpu_processes
                if p.usedGpuMemory
            )
            / (1024**3)
        )
        # Pro-rate device utilization by the share of matching processes.
        filtered_util = (
            (
                util_rates.gpu
                * len(filtered_gpu_processes)
                / max(len(all_processes), 1)
            )
            if filtered_gpu_processes
            else 0.0
        )
        return filtered_util, 0.0, filtered_mem

    def shutdown(self) -> None:
        """No explicit NVML teardown is performed."""
        return None

AdlxGpuBackend

Bases: GpuBackend

AMD ADLX backend (uses ADLXPybind).

Source code in jumper_extension/monitor/metrics/gpu/adlx.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class AdlxGpuBackend(GpuBackend):
    """AMD GPU metrics backend built on the ADLXPybind bindings."""

    name = "amd-adlx"

    def __init__(self, monitor: "PerformanceMonitor"):
        super().__init__(monitor)
        # Populated by setup() when ADLX initializes successfully.
        self._adlx_helper = None
        self._adlx_system = None

    def _iter_handles(self):
        return self._monitor.amd_gpu_handles

    def setup(self) -> None:
        """Initialize ADLX, enumerate AMD GPUs, and record their metadata.

        On ImportError or any other failure a warning is logged and the
        monitor's AMD handle list is left empty.
        """
        try:
            from ADLXPybind import ADLXHelper, ADLX_RESULT

            self._adlx_helper = ADLXHelper()
            if self._adlx_helper.Initialize() != ADLX_RESULT.ADLX_OK:
                self._monitor.amd_gpu_handles = []
                return
            self._adlx_system = self._adlx_helper.GetSystemServices()
            devices = self._adlx_system.GetGPUs()
            handles = [devices.At(i) for i in range(devices.Size())]
            self._monitor.amd_gpu_handles = handles
            if handles:
                first = handles[0]
                # Total VRAM of the first device, in GiB.
                total_gib = round(first.TotalVRAM() / (1024**3), 2)
                if self._monitor.gpu_memory == 0:
                    self._monitor.gpu_memory = total_gib
                # Record or append the device name.
                device_name = first.Name()
                if not self._monitor.gpu_name:
                    self._monitor.gpu_name = device_name
                else:
                    self._monitor.gpu_name += f", {device_name}"
        except ImportError:
            logger.warning(
                EXTENSION_ERROR_MESSAGES[
                    ExtensionErrorCode.ADLX_NOT_AVAILABLE
                ]
            )
            self._monitor.amd_gpu_handles = []
        except Exception:
            logger.warning(
                EXTENSION_ERROR_MESSAGES[
                    ExtensionErrorCode.AMD_DRIVERS_NOT_AVAILABLE
                ]
            )
            self._monitor.amd_gpu_handles = []

    def _collect_system(self, handle):
        """Return (gpu util, 0.0, VRAM used) for one device; zeros on error."""
        if self._adlx_system is None:
            return 0.0, 0.0, 0.0
        try:
            services = self._adlx_system.GetPerformanceMonitoringServices()
            metrics = services.GetCurrentPerformanceMetrics(handle)
            usage = metrics.GPUUsage()
            vram = metrics.GPUVRAMUsage()
            # AMD ADLX doesn't provide memory bandwidth easily
            return usage, 0.0, vram / 1024.0
        except Exception:
            # If we can't get metrics, return zeros
            return 0.0, 0.0, 0.0

    def _collect_process(self, handle):
        # AMD ADLX doesn't provide per-process metrics easily
        return 0.0, 0.0, 0.0

    def _collect_other(self, handle, level: str):
        # AMD ADLX doesn't provide per-user metrics easily
        return 0.0, 0.0, 0.0

    def shutdown(self) -> None:
        """No explicit ADLX teardown is performed."""
        return None