Data Providers

Note

All get_* methods apply workload filters and date range automatically unless stated otherwise.

ClusterLoadDataProvider

class applications.load.providers.cluster_load.ClusterLoadDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for cluster load metrics (cores, cost, CO₂, power).

Thin adapter over the cluster_load service. Resolves the accounting data source, workload filters, and hardware configuration from the workload, then delegates computation to the service layer.

Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.
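Most get_* methods in this reference return a (dto, status) tuple rather than a bare DTO, so callers unpack the tuple and check the status before reading DTO fields. The sketch below shows that calling pattern. The ResultStatus enum here is a hypothetical stand-in that only mirrors the member names used in this reference (OK, NO_DATA, NO_RESULTS), and describe_result is an illustrative helper, not part of the provider API.

```python
from enum import Enum
from typing import Any, Optional, Tuple


class ResultStatus(Enum):
    # Hypothetical stand-in: only the member names come from this reference.
    OK = "ok"
    NO_DATA = "no_data"
    NO_RESULTS = "no_results"


def describe_result(result: Tuple[Optional[Any], ResultStatus]) -> str:
    """Unpack a (dto, status) tuple and report what happened."""
    dto, status = result
    if status is ResultStatus.OK:
        return f"ok: {dto!r}"
    if status is ResultStatus.NO_DATA:
        # The source field or index is missing, or no job has a usable value.
        return "no data available"
    # NO_RESULTS: nothing matched the workload filters or the grouping.
    return "no matching results"


# Fabricated tuples shaped like provider return values.
print(describe_result(({"utilization_rate": 0.42}, ResultStatus.OK)))
print(describe_result((None, ResultStatus.NO_DATA)))
```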

Example

>>> provider = ClusterLoadDataProvider(workload)
>>> load, status = provider.get_core_load()
>>> if status == ResultStatus.OK:
...     print(f"Utilization: {load.utilization_rate:.2%}")
>>>
>>> daily, status = provider.get_core_load(resolution=ComputeLoadResolution.DAY)
>>> grouped, status = provider.get_core_load_by_group(grouping_key="Account")

get_co2_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[Co2LoadDTO | None, ResultStatus]

Get cluster CO₂ footprint timeseries (kgCO₂e).

Parameters:

resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

Returns:

Tuple of Co2LoadDTO (or None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

get_co2_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[GroupedCo2LoadDTO | None, ResultStatus]

Get CO₂ footprint grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

Returns:

Tuple of GroupedCo2LoadDTO (or None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

get_core_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[CoreLoadDTO | None, ResultStatus]

Get allocated cores timeseries with hardware capacity.

Parameters:
  • resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

Tuple of CoreLoadDTO (allocated/requested cores, capacity, and utilization; None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Example

>>> load, status = provider.get_core_load()
>>> if status == ResultStatus.OK:
...     print(f"Utilization: {load.utilization_rate:.1%}")
>>> load, status = provider.get_core_load(resolution=ComputeLoadResolution.DAY)

get_core_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GroupedCoreLoadDTO | None, ResultStatus]

Get core load grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

Tuple of GroupedCoreLoadDTO (one GroupEntryDTO per group; None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Example

>>> grouped, status = provider.get_core_load_by_group(grouping_key="Account")
>>> if status == ResultStatus.OK:
...     for g in grouped.groups:
...         print(g.group, g.avg_running)

get_corehour_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[CorehourLoadDTO | None, ResultStatus]

Get core-hour load timeseries (allocated cores × elapsed time).

Parameters:
  • resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

Tuple of CorehourLoadDTO (or None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.
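Core-hours are allocated cores multiplied by elapsed time (get_gpuhour_load() applies the same arithmetic to GPUs). As a worked illustration of that arithmetic only, the sketch below totals core-hours for a few job records; it does not reproduce the service's time bucketing or datetime_col handling, and the JobRecord fields alloc_cores and elapsed_seconds are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class JobRecord:
    # Hypothetical fields for illustration; real accounting field names may differ.
    alloc_cores: int
    elapsed_seconds: float


def core_hours(jobs: list[JobRecord]) -> float:
    """Sum allocated cores x elapsed time, expressed in hours."""
    return sum(job.alloc_cores * job.elapsed_seconds / 3600 for job in jobs)


jobs = [
    JobRecord(alloc_cores=32, elapsed_seconds=7200),  # 32 cores for 2 h -> 64 core-hours
    JobRecord(alloc_cores=4, elapsed_seconds=1800),   # 4 cores for 0.5 h -> 2 core-hours
]
print(core_hours(jobs))  # 66.0
```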

get_cost_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[CostLoadDTO | None, ResultStatus]

Get cluster cost load timeseries.

Parameters:

resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

Returns:

Tuple of CostLoadDTO (or None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

get_cost_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[GroupedCostLoadDTO | None, ResultStatus]

Get cost load grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

Returns:

Tuple of GroupedCostLoadDTO (or None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

get_power_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[PowerLoadDTO | None, ResultStatus]

Get cluster power consumption timeseries (watts).

Parameters:

resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

Returns:

Tuple of PowerLoadDTO (or None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

get_power_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[GroupedPowerLoadDTO | None, ResultStatus]

Get power consumption grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

Returns:

Tuple of GroupedPowerLoadDTO (or None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

ConcurrentUsersDataProvider

class applications.concusers.providers.concurrent_users.ConcurrentUsersDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for concurrent users metrics.

Thin facade over the submit_users and runwait_users services. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = ConcurrentUsersDataProvider(workload)
>>> dto, status = provider.get_submit_users()

get_running_and_waiting_users(resolution: ComputeLoadResolution = ComputeLoadResolution.DAY) tuple[RunningAndWaitingUsersDTO | None, ResultStatus]

Get distinct user counts for jobs that are running or waiting.

Parameters:

resolution – Time bucket size (e.g. "1hour", "1day"). Default: ComputeLoadResolution.DAY.

Returns:

Tuple of (RunningAndWaitingUsersDTO, ResultStatus). DTO is None when status is not OK.

Raises:

DataProviderError – If accounting filters are missing or query fails.

get_submit_users(resolution: ComputeLoadResolution = ComputeLoadResolution.DAY) tuple[SubmitUserCountsDTO | None, ResultStatus]

Get distinct user counts bucketed by job submit time.

Parameters:

resolution – Time bucket size (e.g. "1hour", "1day"). Default: ComputeLoadResolution.DAY.

Returns:

Tuple of (SubmitUserCountsDTO, ResultStatus). DTO is None when status is not OK.

Raises:

DataProviderError – If accounting filters are missing or query fails.
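get_submit_users() counts distinct users per time bucket, keyed on each job's submit time. The sketch below reproduces that counting rule in plain Python at daily resolution, as an illustration of the semantics rather than of the service's Elasticsearch query; the (user, submit_time) record shape is a hypothetical assumption.

```python
from collections import defaultdict
from datetime import date, datetime


def submit_users_per_day(jobs: list[tuple[str, datetime]]) -> dict[date, int]:
    """Count distinct users per calendar day of job submission."""
    users_by_day: dict[date, set[str]] = defaultdict(set)
    for user, submit_time in jobs:
        # A user who submits several jobs on the same day is counted once.
        users_by_day[submit_time.date()].add(user)
    return {day: len(users) for day, users in sorted(users_by_day.items())}


jobs = [
    ("alice", datetime(2024, 5, 1, 9, 0)),
    ("alice", datetime(2024, 5, 1, 17, 30)),
    ("bob", datetime(2024, 5, 1, 12, 0)),
    ("alice", datetime(2024, 5, 2, 8, 15)),
]
print(submit_users_per_day(jobs))
# {datetime.date(2024, 5, 1): 2, datetime.date(2024, 5, 2): 1}
```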

CongestionDataProvider

class applications.congestion.providers.congestion.CongestionDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for congestion/contention analysis.

Thin adapter over the congestion service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = CongestionDataProvider(workload)
>>> metrics, status = provider.get_congestion_metrics()
>>> if status == ResultStatus.OK:
...     print(f"Avg running: {metrics.avg_running_pct:.1f}%")

get_congestion_metrics(data_type: DataType = DataType.CORE, resolution: ComputeLoadResolution = ComputeLoadResolution.DAY) tuple[CongestionMetricsDTO | None, ResultStatus]

Compute congestion metrics for the workload.

Parameters:
  • data_type – Resource dimension. Default: DataType.CORE; use DataType.GPU for GPU metrics.

  • resolution – Time bucket size (e.g. “1day”, “1hour”). Default: ComputeLoadResolution.DAY.

Returns:

Tuple of (CongestionMetricsDTO, ResultStatus). DTO is None when status is not OK.

Raises:

DataProviderError – If accounting filters are missing or computation fails.

ConsumersDataProvider

class applications.consumers.providers.consumers.ConsumersDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for consumer group and detail metrics.

Thin facade over the consumers services. Resolves the accounting data source and workload filters, then delegates to the service layer. Wraps unexpected exceptions from the service into DataProviderError.

Examples

>>> provider = ConsumersDataProvider(workload)
>>> dto, status = provider.get_group_aggregation("Account")

get_detail_stats(group: str, group_obj_name: str, subgroup: str | None = None, subgroup_obj_name: str | None = None) tuple[DetailStatsDTO | None, ResultStatus]

Get per-field statistics for jobs in a single group (and optionally a subgroup).

Parameters:
  • group – Group field name.

  • group_obj_name – Value of group to filter on.

  • subgroup – Optional secondary field name.

  • subgroup_obj_name – Optional value of subgroup to filter on.

Returns:

Tuple of (DetailStatsDTO, ResultStatus). DTO is None when status is not OK.

Raises:

DataProviderError – If accounting filters are missing or the query fails.

get_group_aggregation(group: str, subgroup: str | None = None) tuple[GroupAggregationDTO | None, ResultStatus]

Aggregate job metrics by group (and optionally subgroup).

Parameters:
  • group – Field name to aggregate on (e.g. "Account").

  • subgroup – Optional secondary field name (e.g. "User").

Returns:

Tuple of (GroupAggregationDTO, ResultStatus). DTO is None when status is not OK.

Raises:

DataProviderError – If accounting filters are missing or the query fails.

CoresDataProvider

class applications.resources.providers.cores.CoresDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for core allocation distribution analysis.

Thin facade over the cores service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = CoresDataProvider(workload)
>>> dto, status = provider.get_flat_distribution()
>>> dto, status = provider.get_grouped_distribution(grouping_type="uid")

get_flat_distribution() tuple[CoresFlatDistributionDTO | None, ResultStatus]

Get ungrouped core allocation distribution.

Returns:

Tuple of (dto, ResultStatus.OK) with per-bin statistics and extended descriptive stats, or (None, ResultStatus.NO_DATA) when no jobs have allocated cores.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_distribution(grouping_type: str, grouping_size: int = 10) tuple[CoresGroupedDistributionDTO | None, ResultStatus]

Get grouped core allocation distribution.

Parameters:
  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.

Returns:

Tuple of (dto, ResultStatus.OK) with per-group statistics, (None, ResultStatus.NO_DATA) when no jobs have allocated cores, or (None, ResultStatus.NO_RESULTS) when the grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.
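Several grouped methods keep the grouping_size largest groups and collapse the remainder into an “Others” bucket. A minimal sketch of that rule over a plain mapping of group name to value follows; ranking groups by value and summing the remainder are assumptions made for illustration, and collapse_top_groups is a hypothetical helper.

```python
def collapse_top_groups(values: dict[str, float], grouping_size: int = 10) -> dict[str, float]:
    """Keep the grouping_size largest groups; sum the rest into "Others"."""
    ranked = sorted(values.items(), key=lambda item: item[1], reverse=True)
    top = dict(ranked[:grouping_size])
    rest = ranked[grouping_size:]
    if rest:
        # Only emit the Others bucket when something was actually collapsed.
        top["Others"] = sum(value for _, value in rest)
    return top


cores_by_account = {"physics": 1200.0, "chem": 800.0, "bio": 150.0, "math": 50.0}
print(collapse_top_groups(cores_by_account, grouping_size=2))
# {'physics': 1200.0, 'chem': 800.0, 'Others': 200.0}
```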

CoresMemoryDataProvider

class applications.resources.providers.cores_memory.CoresMemoryDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for cores-vs-memory matrix analysis.

Example

>>> provider = CoresMemoryDataProvider(workload)
>>> result = provider.get_flat_matrix(constants.MAXRSS)
>>> result = provider.get_grouped_matrix(constants.MAXRSS, grouping_type="uid")

get_flat_matrix(req_or_consumed: str = 'MaxRSS') tuple[CoresMemoryFlatMatrixDTO | None, ResultStatus]

Get ungrouped cores-vs-memory 2D matrix.

Parameters:

req_or_consumed – Memory field — constants.REQMEM or constants.MAXRSS.

Returns:

Tuple of (dto, ResultStatus.OK) on success, or (None, ResultStatus.NO_DATA) when the memory field is absent or no jobs have allocated cores/memory.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_matrix(req_or_consumed: str = 'MaxRSS', grouping_type: str = 'uid', grouping_size: int = 10) tuple[CoresMemoryGroupedMatrixDTO | None, ResultStatus]

Get grouped cores-vs-memory 2D matrix.

Parameters:
  • req_or_consumed – Memory field — constants.REQMEM or constants.MAXRSS.

  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return.

Returns:

Tuple of (dto, ResultStatus.OK) on success, (None, ResultStatus.NO_DATA) when the memory field is absent, or (None, ResultStatus.NO_RESULTS) when grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.
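The cores-vs-memory matrix counts jobs in two-dimensional cells, one axis for allocated cores and one for memory. The bin edges used by the service are not documented here, so the sketch below assumes hypothetical power-of-two edges on both axes and places each job in a cell with the standard library's bisect; values beyond the last edge share the last bin.

```python
from bisect import bisect_right
from collections import Counter

# Hypothetical bin edges; bin i covers [edges[i], edges[i + 1]).
CORE_EDGES = [1, 2, 4, 8, 16, 32, 64]
MEM_EDGES_GB = [1, 2, 4, 8, 16, 32, 64, 128]


def bin_index(value: float, edges: list[float]) -> int:
    """Index of the bin containing value, clamped to the first and last bins."""
    return max(0, min(bisect_right(edges, value) - 1, len(edges) - 1))


def cores_memory_matrix(jobs: list[tuple[int, float]]) -> Counter:
    """Count jobs per (core_bin, memory_bin) cell; jobs are (cores, mem_gb)."""
    return Counter(
        (bin_index(cores, CORE_EDGES), bin_index(mem, MEM_EDGES_GB))
        for cores, mem in jobs
    )


jobs = [(1, 1.5), (4, 10.0), (6, 12.0), (128, 500.0)]
print(sorted(cores_memory_matrix(jobs).items()))
# [((0, 0), 1), ((2, 3), 2), ((6, 7), 1)]
```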

ExecTimeDataProvider

class applications.throughput.providers.exec_time.ExecTimeDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for execution time metrics.

Thin facade over the exec_time service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = ExecTimeDataProvider(workload)
>>> result = provider.get_exec_time()

get_exec_time(data_type: DataType = DataType.CORE) tuple[ExecTimeDistributionDTO | None, ResultStatus]

Get execution time histogram.

Parameters:

data_type – Resource dimension: DataType.CORE (CPU) or DataType.GPU. Default: DataType.CORE.

Returns:

Tuple of (dto, ResultStatus.OK) with histogram bins and resource-hour sums, or (None, ResultStatus.NO_RESULTS) when no jobs match the filters.

get_exec_time_grouped(data_type: DataType = DataType.CORE, grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[GroupedExecTimeEntryDTO] | None, ResultStatus]

Get grouped execution time histogram.

Parameters:
  • data_type – Resource dimension: DataType.CORE (CPU) or DataType.GPU. Default: DataType.CORE.

  • grouping_type – ES field to group by. Default: "Account".

  • grouping_size – Maximum number of groups. Default: 10.

Returns:

Tuple of (list of GroupedExecTimeEntryDTO, ResultStatus). List is None when status is not OK.

get_exec_time_vs_timelimit(config: ExecTimeVsTimelimitConfig | None = None) tuple[ExecTimeVsTimelimitDTO | None, ResultStatus]

Get execution time vs timelimit ratio distribution.

Parameters:

config – Override configuration. When None, the configuration is loaded from the database.

Returns:

(dto, ResultStatus.OK) with categorized ratio data, or (None, ResultStatus.NO_DATA) when the Timelimit field is absent, or (None, ResultStatus.NO_RESULTS) when no jobs match the filters.

get_exec_time_vs_timelimit_grouped(config: ExecTimeVsTimelimitConfig | None = None, grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[GroupedExecTimeVsTimelimitEntryDTO] | None, ResultStatus]

Get grouped execution time vs timelimit ratio distribution.

Parameters:
  • config – Override configuration. When None, the configuration is loaded from the database.

  • grouping_type – ES field to group by. Default: "Account".

  • grouping_size – Maximum number of groups. Default: 10.

Returns:

Tuple of (list of GroupedExecTimeVsTimelimitEntryDTO, ResultStatus). List is None when status is not OK.
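The timelimit analysis compares how long each job ran with the limit it requested. The actual categories come from ExecTimeVsTimelimitConfig, whose fields are not documented here, so the sketch below uses hypothetical thresholds (under 25%, 25 to 75%, over 75% of the limit) and hypothetical category names purely to show the ratio-and-bucket idea.

```python
def timelimit_category(elapsed_s: float, timelimit_s: float,
                       low: float = 0.25, high: float = 0.75) -> str:
    """Bucket a job by the fraction of its timelimit it actually used.

    The low/high thresholds and category names are hypothetical; the
    provider reads its own categories from ExecTimeVsTimelimitConfig.
    """
    if timelimit_s <= 0:
        raise ValueError("timelimit must be positive")
    ratio = elapsed_s / timelimit_s
    if ratio < low:
        return "overestimated"
    if ratio <= high:
        return "moderate"
    return "close to limit"


# A job that ran 30 min against a 4 h limit used 12.5% of it.
print(timelimit_category(1800, 4 * 3600))  # overestimated
```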

GpuDataProvider

class applications.resources.providers.gpu.GpuDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for GPU distribution analysis.

Thin facade over the GPU service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = GpuDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")

get_flat_distribution(gpu_field: GpuAccountingField = GpuAccountingField.ALLOC_GPUS) tuple[GpuFlatDistributionDTO | None, ResultStatus]

Get ungrouped GPU distribution.

Parameters:

gpu_field – ES field name — GpuAccountingField.ALLOC_GPUS or GpuAccountingField.REQ_GPUS.

Returns:

Tuple of (GpuFlatDistributionDTO, ResultStatus). Status is NO_DATA when the field is absent or no jobs have a positive GPU value.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_distribution(grouping_type: str, grouping_size: int = 10, gpu_field: GpuAccountingField = GpuAccountingField.ALLOC_GPUS) tuple[GpuGroupedDistributionDTO | None, ResultStatus]

Get grouped GPU distribution.

Parameters:
  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.

  • gpu_field – ES field name — GpuAccountingField.ALLOC_GPUS or GpuAccountingField.REQ_GPUS.

Returns:

Tuple of (GpuGroupedDistributionDTO, ResultStatus). Status is NO_DATA when the field is absent or no jobs have a positive GPU value, or NO_RESULTS when the grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.

GpuLoadDataProvider

class applications.load.providers.gpu_load.GpuLoadDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for GPU load metrics.

Thin adapter over the gpu_load service. Resolves the accounting data source, workload filters, and hardware GPU configuration from the workload, then delegates computation to the service layer.

Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.

Example

>>> provider = GpuLoadDataProvider(workload)
>>> load, status = provider.get_gpu_load()
>>> if status == ResultStatus.OK:
...     print(f"GPU Utilization: {load.utilization_rate:.2%}")
>>>
>>> grouped, status = provider.get_gpu_load_by_group(grouping_key="Account")

get_gpu_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GpuLoadDTO | None, ResultStatus]

Get allocated GPUs timeseries with hardware capacity.

Parameters:
  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

Tuple of GpuLoadDTO (allocated/requested GPUs, capacity, and utilization; None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Example

>>> load, status = provider.get_gpu_load()
>>> if status == ResultStatus.OK:
...     print(f"GPU Utilization: {load.utilization_rate:.1%}")

get_gpu_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GroupedGpuLoadDTO | None, ResultStatus]

Get GPU load grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

Tuple of GroupedGpuLoadDTO (one GroupEntryDTO per group; None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

get_gpuhour_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GpuhourLoadDTO | None, ResultStatus]

Get GPU-hour load timeseries (allocated GPUs × elapsed time).

Parameters:
  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

Tuple of GpuhourLoadDTO (GPU-hour accounting fields; None if no data) and ResultStatus.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

JobFrequencyDataProvider

class applications.throughput.providers.job_freq.JobFrequencyDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for job frequency and interarrival metrics.

Thin facade over the job_freq service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = JobFrequencyDataProvider(workload)
>>> result = provider.get_job_frequency()

get_interarrival(datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT) tuple[dict | None, ResultStatus]

Get job interarrival time statistics.

Parameters:

datetime_col – Job timestamp field. Default: ThroughputDatetimeCol.SUBMIT.

Returns:

(interarrival_dict, OK) on success, where the dict has data, columns, and median keys. (None, NO_DATA) when the index does not exist. (None, NO_RESULTS) when no jobs match.
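Interarrival time is the gap between consecutive job timestamps (submit times by default). The sketch below shows the underlying arithmetic with the standard library: sort the timestamps, take pairwise differences, and report the median. It illustrates the statistic only; the provider's result dict layout (data, columns, median) is not reproduced, and interarrival_seconds is a hypothetical helper.

```python
from datetime import datetime
from statistics import median


def interarrival_seconds(timestamps: list[datetime]) -> list[float]:
    """Gaps in seconds between consecutive timestamps, after sorting."""
    ordered = sorted(timestamps)
    return [(later - earlier).total_seconds() for earlier, later in zip(ordered, ordered[1:])]


submits = [
    datetime(2024, 5, 1, 10, 0, 0),
    datetime(2024, 5, 1, 10, 0, 30),
    datetime(2024, 5, 1, 10, 2, 0),
    datetime(2024, 5, 1, 10, 2, 10),
]
gaps = interarrival_seconds(submits)
print(gaps, median(gaps))  # [30.0, 90.0, 10.0] 30.0
```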

get_interarrival_grouped(datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT, grouping_type: str = 'Account', grouping_size: int = 10) tuple[dict | None, ResultStatus]

Get grouped interarrival time statistics.

Parameters:
  • datetime_col – Job timestamp field. Default: ThroughputDatetimeCol.SUBMIT.

  • grouping_type – ES field to group by. Default: "Account".

  • grouping_size – Maximum number of groups. Default: 10.

Returns:

Tuple of (dict, ResultStatus). Dict is None when status is not OK. Dict has grouping_type and grouped_data keys on success.

get_job_frequency(resolution: Resolution = Resolution.DAY, datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT) tuple[JobFrequencyDTO | None, ResultStatus]

Get job submission frequency timeseries.

Parameters:
  • resolution – Time bucket size. Default: Resolution.DAY.

  • datetime_col – Job timestamp field used for bucketing. Default: ThroughputDatetimeCol.SUBMIT.

Returns:

(JobFrequencyDTO, OK) on success. (dto_with_resolution, NO_RESULTS) on resolution mismatch. (None, NO_RESULTS) when no jobs match.

get_job_frequency_grouped(resolution: Resolution = Resolution.DAY, datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT, grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[dict] | None, ResultStatus]

Get grouped job frequency timeseries.

Parameters:
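  • resolution – Time bucket size. Default: Resolution.DAY.

  • datetime_col – Job timestamp field used for bucketing. Default: ThroughputDatetimeCol.SUBMIT.

  • grouping_type – ES field to group by. Default: "Account".

  • grouping_size – Maximum number of groups. Default: 10.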

Returns:

Tuple of (list of per-group dicts, ResultStatus). List is None when status is not OK.

JobLoadDataProvider

class applications.load.providers.job_load.JobLoadDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for job count load metrics.

Thin adapter over the job_load service. Resolves the accounting data source and workload filters, then delegates computation to the service layer.

Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.

Example

>>> provider = JobLoadDataProvider(workload)
>>> load, status = provider.get_job_load()
>>> if status == ResultStatus.OK:
...     print(f"Avg running jobs: {load.avg_running:.1f}")
>>>
>>> grouped, status = provider.get_job_load_by_group(grouping_key="Account")

get_job_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[JobLoadDTO | None, ResultStatus]

Get job count timeseries for RUNNING and WAITING states.

Parameters:
  • resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

Tuple of (JobLoadDTO or None, ResultStatus).

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Example

>>> load, status = provider.get_job_load()
>>> if status == ResultStatus.OK:
...     print(f"Avg running: {load.avg_running:.1f}")

get_job_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GroupedJobLoadDTO | None, ResultStatus]

Get job count load grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

Tuple of (GroupedJobLoadDTO or None, ResultStatus).

Raises:

DataProviderError – If accounting filters are not configured or query fails.
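get_job_load() reports how many jobs are RUNNING and WAITING in each time bucket. One common way to define those counts, assumed here for illustration only, is to sample each bucket boundary: a job is waiting between submit and start and running between start and end. The service's actual bucketing rule (and the effect of datetime_col) may differ; the Job record and states_at helper are hypothetical.

```python
from datetime import datetime, timedelta
from typing import NamedTuple


class Job(NamedTuple):
    # Hypothetical record shape for illustration.
    submit: datetime
    start: datetime
    end: datetime


def states_at(jobs: list[Job], instant: datetime) -> tuple[int, int]:
    """Return (running, waiting) job counts at a single instant."""
    running = sum(1 for j in jobs if j.start <= instant < j.end)
    waiting = sum(1 for j in jobs if j.submit <= instant < j.start)
    return running, waiting


t0 = datetime(2024, 5, 1, 0, 0)
jobs = [
    Job(t0, t0 + timedelta(hours=1), t0 + timedelta(hours=3)),
    Job(t0 + timedelta(minutes=30), t0 + timedelta(hours=2), t0 + timedelta(hours=4)),
]
for hour in range(4):
    instant = t0 + timedelta(hours=hour)
    print(instant.strftime("%H:%M"), states_at(jobs, instant))
```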

JobsDataProvider

class applications.ait.providers.jobs.JobsDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for querying job records from accounting data.

Thin adapter over the jobs service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = JobsDataProvider(workload)
>>> page, status = provider.get_jobs_page(sort_field="Submit", limit=50)
>>> if status == ResultStatus.OK:
...     print(f"Showing {len(page.jobs)} of {page.total_count} jobs")

get_available_fields(max_fields: int = 100) JobFieldsDTO

Get priority-ordered field names from the ES mapping.

Parameters:

max_fields – Maximum number of fields to return.

Returns:

JobFieldsDTO with ordered field list and total count.

Raises:

DataProviderError – If data source is unavailable.

get_job_count() int

Count jobs matching the workload filters.

Returns:

Number of jobs matching the workload filters, as an int.

Raises:

DataProviderError – If accounting filters are missing or query fails.

get_jobs_page(sort_field: str | None = None, sort_order: str = 'asc', offset: int = 0, limit: int = 25) tuple[JobsPageDTO | None, ResultStatus]

Fetch a paginated page of job records.

Parameters:
  • sort_field – Field to sort by.

  • sort_order – Sort direction (“asc” or “desc”).

  • offset – Number of records to skip.

  • limit – Number of records to return.

Returns:

Tuple of (JobsPageDTO, ResultStatus). DTO is None when status is not OK.

Raises:

DataProviderError – If accounting filters are missing or query fails.
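get_jobs_page() pages through records with offset and limit, and the page DTO exposes jobs and total_count (as used in the class example). A sketch of walking every page follows. To stay self-contained it takes a hypothetical fetch_page callable with the same (page, status) shape; the Page class is modeled on those two attributes, not taken from the real JobsPageDTO definition. Since the DTO is None whenever status is not OK, checking the DTO alone is enough to stop.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, Optional


@dataclass
class Page:
    # Modeled on the jobs/total_count attributes used in the class example.
    jobs: list[dict]
    total_count: int


FetchPage = Callable[[int, int], tuple[Optional[Page], object]]


def iter_all_jobs(fetch_page: FetchPage, limit: int = 25) -> Iterator[dict]:
    """Yield every job by requesting successive offset/limit pages."""
    offset = 0
    while True:
        page, _status = fetch_page(offset, limit)
        # Per the reference, the DTO is None whenever status is not OK.
        if page is None or not page.jobs:
            return
        yield from page.jobs
        offset += len(page.jobs)
        if offset >= page.total_count:
            return


# Hypothetical in-memory backend standing in for provider.get_jobs_page.
RECORDS = [{"JobID": i} for i in range(60)]


def fake_fetch(offset: int, limit: int) -> tuple[Optional[Page], object]:
    return Page(RECORDS[offset:offset + limit], len(RECORDS)), "OK"


print(len(list(iter_all_jobs(fake_fetch, limit=25))))  # 60
```

With a real provider, fetch_page could be `lambda off, lim: provider.get_jobs_page(sort_field="Submit", offset=off, limit=lim)`; passing an explicit sort_field helps keep successive pages consistent.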

KpiDataProvider

class applications.kpi.providers.kpi.KpiDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for KPI dashboard metrics.

Thin facade over the KPI service layer. Resolves the accounting data source and workload filters, then delegates to individual services.

Example

>>> provider = KpiDataProvider(workload)
>>> logs = provider.get_logs_metrics()
>>> cost = provider.get_cost_metrics()

get_carbon_footprint() tuple[float | None, ResultStatus]

Get total CO₂ emissions (carbon footprint).

Returns:

Tuple of (total carbon footprint in kg of CO₂ or None, ResultStatus).

Raises:

DataProviderError – If carbon footprint computation fails.

get_core_stats() tuple[HardwareStatsDTO | None, ResultStatus]

Get core count statistics (min/max/avg).

Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.

Returns:

Tuple of (HardwareStatsDTO for cores over the workload period or None, ResultStatus).

get_cost_metrics() tuple[float | None, ResultStatus]

Get total cost.

Returns:

Tuple of (total_cost or None, ResultStatus).

Raises:

DataProviderError – If cost computation fails.

get_energy_metrics() tuple[EnergyMetricsDTO | None, ResultStatus]

Get power and energy consumption.

Returns:

Tuple of (EnergyMetricsDTO or None, ResultStatus).

Raises:

DataProviderError – If energy computation fails.

get_gpu_stats() tuple[HardwareStatsDTO | None, ResultStatus]

Get GPU count statistics (min/max/avg).

Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.

Returns:

Tuple of (HardwareStatsDTO for GPUs over the workload period or None, ResultStatus).

get_job_count() tuple[int | None, ResultStatus]

Get total number of jobs.

Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.

Returns:

Tuple of (job count from the logs metrics or None, ResultStatus).

get_logs_metrics() tuple[LogsMetricsDTO | None, ResultStatus]

Get job count, resource hours, usage, and hardware stats.

Returns:

Tuple of (LogsMetricsDTO with all job-level KPI metrics or None, ResultStatus).

Raises:

DataProviderError – If no data is found or no results match the filters.
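get_logs_metrics() is the single query behind get_core_stats(), get_gpu_stats(), get_job_count(), and get_user_count(); its result is memoized per provider instance. The toy class below illustrates that behavior with a per-instance cache; it is not the KpiDataProvider implementation, returns bare values instead of (value, status) tuples for brevity, and its field values and queries_issued counter are hypothetical, present only to make the single query visible.

```python
from typing import Optional


class MemoizedKpiSketch:
    """Toy model of several convenience getters sharing one cached query."""

    def __init__(self) -> None:
        self.queries_issued = 0
        self._logs: Optional[dict] = None

    def get_logs_metrics(self) -> dict:
        if self._logs is None:
            # Stands in for the single Elasticsearch query; runs at most once per instance.
            self.queries_issued += 1
            self._logs = {"job_count": 1234, "user_count": 56}
        return self._logs

    def get_job_count(self) -> int:
        return self.get_logs_metrics()["job_count"]

    def get_user_count(self) -> int:
        return self.get_logs_metrics()["user_count"]


kpi = MemoizedKpiSketch()
print(kpi.get_job_count(), kpi.get_user_count(), kpi.queries_issued)  # 1234 56 1
```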

get_user_count() tuple[int | None, ResultStatus]

Get number of unique users.

Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.

Returns:

Tuple of (unique user count from the logs metrics or None, ResultStatus).

MemoryDataProvider

class applications.resources.providers.memory.MemoryDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for memory distribution analysis.

Example

>>> provider = MemoryDataProvider(workload)
>>> result = provider.get_memory_flat_distribution(MemoryAccountingField.MAX_RSS)
>>> grouped = provider.get_memory_grouped_distribution(MemoryAccountingField.MAX_RSS, "Account")

get_memory_flat_distribution(req_or_consumed: MemoryAccountingField = MemoryAccountingField.MAX_RSS) tuple[MemoryFlatDistributionDTO | None, ResultStatus]

Get ungrouped memory distribution.

Parameters:

req_or_consumed – Memory field — MemoryAccountingField.REQMEM or MemoryAccountingField.MAX_RSS.

Returns:

Tuple of (DTO, status). DTO is None when no data is available.

get_memory_grouped_distribution(req_or_consumed: MemoryAccountingField = MemoryAccountingField.MAX_RSS, grouping_type: str = 'Account', grouping_size: int = 10) tuple[MemoryGroupedDistributionDTO | None, ResultStatus]

Get grouped memory distribution.

Parameters:
  • req_or_consumed – Memory field — MemoryAccountingField.REQMEM or MemoryAccountingField.MAX_RSS.

  • grouping_type – ES field used for grouping (e.g. "Account", "UID").

  • grouping_size – Number of top groups to return.

Returns:

Tuple of (DTO, status). DTO is None when no data or groups are available.

MemoryRatioDataProvider

class applications.resources.providers.consumed_vs_requested_memory.MemoryRatioDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for consumed-vs-requested memory ratio analysis.

Example

>>> provider = MemoryRatioDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")

get_flat_distribution() tuple[MemoryRatioFlatDistributionDTO | None, ResultStatus]

Get ungrouped consumed-vs-requested memory ratio distribution.

Returns:

Tuple of (MemoryRatioFlatDistributionDTO, ResultStatus). Status is NO_DATA when the MEMORYRATIO field is absent or all values are null/zero.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_distribution(grouping_type: str, grouping_size: int = 10) tuple[MemoryRatioGroupedDistributionDTO | None, ResultStatus]

Get grouped consumed-vs-requested memory ratio distribution.

Parameters:
  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.

Returns:

Tuple of (MemoryRatioGroupedDistributionDTO, ResultStatus). Status is NO_DATA when the MEMORYRATIO field is absent, or NO_RESULTS when grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.
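The ratio distribution compares consumed memory with requested memory per job. A minimal sketch of the per-job ratio follows. It assumes the ratio is consumed divided by requested (the provider reads a precomputed MEMORYRATIO field whose exact definition is not given here) and drops null or zero values; an empty result corresponds to the NO_DATA case described above. The (consumed, requested) record shape is hypothetical.

```python
from typing import Optional


def memory_ratios(jobs: list[tuple[Optional[float], Optional[float]]]) -> list[float]:
    """Return consumed/requested ratios, skipping null or zero entries.

    Each tuple is (consumed_bytes, requested_bytes); this record shape is a
    hypothetical assumption for illustration.
    """
    ratios = []
    for consumed, requested in jobs:
        if not consumed or not requested:
            # Null or zero values carry no ratio information.
            continue
        ratios.append(consumed / requested)
    return ratios


GiB = 1024 ** 3
jobs = [(2 * GiB, 8 * GiB), (None, 4 * GiB), (6 * GiB, 4 * GiB), (0, 16 * GiB)]
print(memory_ratios(jobs))  # [0.25, 1.5]
```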

NodesDataProvider

class applications.resources.providers.nodes.NodesDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for node allocation distribution analysis.

Thin facade over the nodes service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = NodesDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")

get_flat_distribution() tuple[NodesFlatDistributionDTO | None, ResultStatus]

Get ungrouped node allocation distribution.

Returns:

Tuple of (DTO, status). DTO is None when no jobs have a positive node allocation value (NO_DATA).

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_distribution(grouping_type: str, grouping_size: int = 10) tuple[NodesGroupedDistributionDTO | None, ResultStatus]

Get grouped node allocation distribution.

Parameters:
  • grouping_type – Elasticsearch field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.

Returns:

Tuple of (DTO, status). DTO is None when no jobs have a positive node allocation value (NO_DATA), or when grouping yields no groups (NO_RESULTS).

Raises:

DataProviderError – If the data source cannot be resolved.
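Because the grouped call distinguishes NO_DATA from NO_RESULTS, a caller can branch on the status before touching the DTO. In this sketch `ResultStatus` is a local stand-in with only the three members mentioned on this page (the real enum's values and any extra members are not documented here), and `describe` is a hypothetical helper.

```python
from enum import Enum
from typing import Any, Optional


class ResultStatus(Enum):
    """Hypothetical stand-in; the real enum may define more members."""
    OK = "ok"
    NO_DATA = "no_data"
    NO_RESULTS = "no_results"


def describe(result: tuple[Optional[Any], ResultStatus]) -> str:
    """Turn a provider's (dto, status) pair into a message for the caller."""
    dto, status = result
    if status is ResultStatus.OK and dto is not None:
        return "ok"
    if status is ResultStatus.NO_DATA:
        return "no job has a positive node allocation"
    if status is ResultStatus.NO_RESULTS:
        return "jobs matched, but grouping produced no groups"
    return f"unexpected status: {status.name}"


print(describe((None, ResultStatus.NO_RESULTS)))
```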

NodesLoadDataProvider

class applications.load.providers.nodes_load.NodesLoadDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for per-node job count metrics.

Thin wrapper around the nodes_load service. Resolves the accounting data source and filters from the workload, then delegates to the service.

Example

>>> provider = NodesLoadDataProvider(workload)
>>> node_counts, status = provider.get_nb_jobs_per_node()
>>> print(node_counts)
{'node001': 42, 'node002': 15, ...}
get_nb_jobs_per_node() tuple[dict[str, int] | None, ResultStatus]

Get job count per node over the workload’s date range.

Returns:

Tuple of (node_counts dict or None, ResultStatus).

Raises:

DataProviderError – If accounting filters are not configured or the Elasticsearch query fails.
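The returned dict maps node name to job count, so ranking nodes needs only the standard library. A minimal sketch with illustrative counts; `busiest_nodes` is a hypothetical helper, and it treats a `None` result (no data) as an empty ranking.

```python
from typing import Optional


def busiest_nodes(
    node_counts: Optional[dict[str, int]], n: int = 3
) -> list[tuple[str, int]]:
    """Return the n nodes with the most jobs (ties broken by node name)."""
    if not node_counts:  # None (no data) or an empty dict
        return []
    return sorted(node_counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


counts = {"node001": 42, "node002": 15, "node003": 42}  # illustrative values
print(busiest_nodes(counts, n=2))  # [('node001', 42), ('node003', 42)]
```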

OccupancyDataProvider

class applications.load.providers.occupancy.OccupancyDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for cluster cores and nodes occupancy metrics.

Thin wrapper around the occupancy service. Resolves the monitoring data source and filters from the workload, then delegates to the service.

Unlike other load providers, occupancy data comes from the monitoring data source (nodes_stats_fetch pipeline → cores_stats index), not the accounting data source.

Example

>>> provider = OccupancyDataProvider(workload)
>>> cores, status = provider.get_cores_occupancy(resolution="1hour")
>>> print(f"Avg allocated: {sum(cores.allocated) / len(cores.allocated):.0f}")
>>>
>>> nodes, status = provider.get_nodes_occupancy(resolution="1day")
>>> grouped, status = provider.get_cores_occupancy_by_group("Partition", grouping_size=5)
get_cores_occupancy(resolution: Resolution) tuple[CoresOccupancyDTO | None, ResultStatus]

Get cores occupancy timeseries from the monitoring index.

Parameters:

resolution – Time bucket size string (e.g. "1hour", "1day"). Required: no default is provided because occupancy always needs an explicit resolution.

Returns:

Tuple of (CoresOccupancyDTO or None, ResultStatus). The DTO holds per-state core counts and total capacity.

Raises:

DataProviderError – If monitoring filters are not configured, no data is available, or the Elasticsearch query fails.

Example

>>> cores, status = provider.get_cores_occupancy(resolution="1hour")
>>> for ts, alloc in zip(cores.timestamps, cores.allocated):
...     print(f"{ts}: {alloc:.0f} cores allocated")
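To turn the per-state counts into a single utilization figure, one option is to divide total allocated cores by total capacity across buckets. Only `timestamps` and `allocated` come from the examples on this page; `total` is a hypothetical name for the "total capacity" the DTO is documented to carry, and `CoresOccupancyStandIn` replaces the real DTO so the sketch runs on its own.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CoresOccupancyStandIn:
    """Hypothetical stand-in for CoresOccupancyDTO."""
    timestamps: list[str]
    allocated: list[float]
    total: list[float]  # hypothetical field name for per-bucket core capacity


def capacity_weighted_utilization(dto: CoresOccupancyStandIn) -> Optional[float]:
    """Sum of allocated cores divided by sum of capacity, skipping zero-capacity buckets."""
    pairs = [(a, t) for a, t in zip(dto.allocated, dto.total) if t > 0]
    if not pairs:
        return None
    return sum(a for a, _ in pairs) / sum(t for _, t in pairs)


dto = CoresOccupancyStandIn(["00:00", "01:00"], [50.0, 100.0], [100.0, 100.0])
print(capacity_weighted_utilization(dto))  # 0.75
```

Weighting by capacity (a ratio of sums) keeps buckets with reduced capacity, such as during maintenance, from counting as much as full ones; averaging per-bucket ratios is the alternative if every bucket should weigh equally.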
get_cores_occupancy_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) tuple[GroupedCoresOccupancyDTO | None, ResultStatus]

Get cores occupancy grouped by a field (e.g., Partition, Queue).

Parameters:
  • grouping_key – Elasticsearch field to group by.

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size string. Default: "1hour".

Returns:

Tuple of (GroupedCoresOccupancyDTO or None, ResultStatus). The DTO holds one CoresOccupancyGroupEntryDTO per group.

Raises:

DataProviderError – If monitoring filters are not configured or query fails.

Example

>>> grouped, status = provider.get_cores_occupancy_by_group("Partition")
>>> for g in grouped.groups:
...     print(g.group, sum(g.allocated) / len(g.allocated))
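The per-group loop above divides by `len(g.allocated)`, which raises ZeroDivisionError if a group has no buckets. A hedged variant that skips empty series and ranks groups by their mean; `SimpleNamespace` objects stand in for the entry DTOs, and `rank_groups_by_mean` is a hypothetical helper. Passing `attr="available"` applies the same logic to NodesOccupancyGroupEntryDTO entries.

```python
from types import SimpleNamespace


def rank_groups_by_mean(groups, attr: str = "allocated") -> list[tuple[str, float]]:
    """Mean of `attr` per group, highest first; groups with no samples are skipped."""
    means = []
    for g in groups:
        values = getattr(g, attr)
        if values:  # avoids dividing by len() of an empty series
            means.append((g.group, sum(values) / len(values)))
    return sorted(means, key=lambda item: item[1], reverse=True)


groups = [  # stand-ins for CoresOccupancyGroupEntryDTO
    SimpleNamespace(group="cpu", allocated=[10.0, 30.0]),
    SimpleNamespace(group="gpu", allocated=[40.0, 60.0]),
    SimpleNamespace(group="empty", allocated=[]),
]
print(rank_groups_by_mean(groups))  # [('gpu', 50.0), ('cpu', 20.0)]
```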
get_nodes_occupancy(resolution: Resolution) tuple[NodesOccupancyDTO | None, ResultStatus]

Get nodes occupancy timeseries from the monitoring index.

Parameters:

resolution – Time bucket size string (e.g. "1hour", "1day"). Required: no default is provided.

Returns:

Tuple of (NodesOccupancyDTO or None, ResultStatus). The DTO holds per-state node counts and totals.

Raises:

DataProviderError – If monitoring filters are not configured, no data is available, or the Elasticsearch query fails.

Example

>>> nodes, status = provider.get_nodes_occupancy(resolution="1day")
>>> for ts, avail in zip(nodes.timestamps, nodes.available):
...     print(f"{ts}: {avail:.0f} nodes available")
get_nodes_occupancy_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) tuple[GroupedNodesOccupancyDTO | None, ResultStatus]

Get nodes occupancy grouped by a field (e.g., Partition, Queue).

Parameters:
  • grouping_key – Elasticsearch field to group by.

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size string. Default: "1hour".

Returns:

Tuple of (GroupedNodesOccupancyDTO or None, ResultStatus). The DTO holds one NodesOccupancyGroupEntryDTO per group.

Raises:

DataProviderError – If monitoring filters are not configured or query fails.

Example

>>> grouped, status = provider.get_nodes_occupancy_by_group("Partition")
>>> for g in grouped.groups:
...     print(g.group, sum(g.available) / len(g.available))

SlowdownDataProvider

class applications.throughput.providers.slowdown.SlowdownDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for slowdown metrics.

Thin facade over the slowdown service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = SlowdownDataProvider(workload)
>>> dist_dto, status = provider.get_slowdown_distribution()
>>> stats_dto, status = provider.get_slowdown_stats()
get_slowdown_distribution(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) tuple[SlowdownDistributionDTO | None, ResultStatus]

Get slowdown PDF/CDF distribution.

Parameters:

datetime_col – Timestamp column (SUBMIT or ELIGIBLE).

Returns:

Tuple of (SlowdownDistributionDTO, ResultStatus.OK) on success, or (None, ResultStatus.NO_DATA/NO_RESULTS) when no data is available.
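This page does not define slowdown. A common definition in scheduling analysis is bounded slowdown, max((wait + run) / max(run, τ), 1), where τ keeps very short jobs from dominating; whether the service uses this variant, and which τ, is an assumption here. The sketch also shows one way a PDF/CDF can be derived from per-job values, with `datetime_col` deciding whether the wait starts at submission or at eligibility. The helpers `bounded_slowdown` and `pdf_cdf` and the rounding-based binning are hypothetical.

```python
from collections import Counter
from datetime import datetime


def bounded_slowdown(wait_origin: datetime, start: datetime, end: datetime,
                     tau_s: float = 10.0) -> float:
    """max((wait + run) / max(run, tau), 1); wait_origin is the submit or eligible time."""
    wait = (start - wait_origin).total_seconds()
    run = (end - start).total_seconds()
    return max((wait + run) / max(run, tau_s), 1.0)


def pdf_cdf(values: list[float], ndigits: int = 1) -> list[tuple[float, float, float]]:
    """(value, frequency, cdf) rows after rounding values to `ndigits` decimals."""
    counts = Counter(round(v, ndigits) for v in values)
    rows, cumulative = [], 0.0
    for value in sorted(counts):
        frequency = counts[value] / len(values)
        cumulative += frequency
        rows.append((value, frequency, cumulative))
    return rows


submit = datetime(2024, 1, 1, 10, 0)
start = datetime(2024, 1, 1, 10, 30)
end = datetime(2024, 1, 1, 11, 0)
print(bounded_slowdown(submit, start, end))  # 2.0
print(pdf_cdf([1.0, 2.0, 2.0, 4.0]))  # [(1.0, 0.25, 0.25), (2.0, 0.5, 0.75), (4.0, 0.25, 1.0)]
```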

get_slowdown_stats(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) tuple[SlowdownStatsDTO | None, ResultStatus]

Get slowdown descriptive statistics.

Parameters:

datetime_col – Timestamp column (SUBMIT or ELIGIBLE).

Returns:

Tuple of (SlowdownStatsDTO, ResultStatus.OK) on success, or (None, ResultStatus.NO_DATA/NO_RESULTS) when no data is available.
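Descriptive statistics of the kind SlowdownStatsDTO carries can be computed with the standard `statistics` module. The exact statistics the DTO includes are not listed here, so the keys below (count, mean, median, extremes, quartiles) are an illustrative choice, and `describe_values` is a hypothetical helper.

```python
import statistics
from typing import Optional


def describe_values(values: list[float]) -> Optional[dict[str, float]]:
    """Summary statistics, or None for an empty input (the NO_DATA analogue)."""
    if not values:
        return None
    summary = {
        "count": float(len(values)),
        "mean": statistics.fmean(values),
        "median": statistics.median(values),
        "min": min(values),
        "max": max(values),
    }
    if len(values) >= 2:  # quartiles need at least two data points
        q1, _, q3 = statistics.quantiles(values, n=4)  # default "exclusive" method
        summary["q1"], summary["q3"] = q1, q3
    return summary


print(describe_values([1.0, 2.0, 3.0, 4.0]))
```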

StateDataProvider

class applications.state.providers.state.StateDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for job state distribution analysis.

Thin adapter over the state service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = StateDataProvider(workload)
>>> dto, status = provider.get_jobs_status()
>>> for entry in dto.entries:
...     print(f"{entry.state}: {entry.count}")
get_jobs_status(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE) tuple[JobsStatusDTO | None, ResultStatus]

Get flat job status distribution (one entry per state).

Parameters:
  • category – What to measure per state. Pass a MetricCategory member (e.g. MetricCategory.CORE_HOURS) to get autocompletion.

  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

Returns:

Tuple of (JobsStatusDTO, ResultStatus.OK) on success, or (None, ResultStatus) when no data is available.

Raises:

DataProviderError – If accounting filters are missing or computation fails.
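The `category` argument changes what is summed per state. A hedged sketch of two plausible categories, assuming JOBS counts jobs and CORE_HOURS sums allocated cores × elapsed hours; the record keys (`state`, `ncores`, `elapsed_s`) and the helper `per_state` are hypothetical, and string names stand in for MetricCategory members.

```python
from collections import defaultdict


def per_state(jobs: list[dict], category: str = "JOBS") -> dict[str, float]:
    """Aggregate one measure per job state."""
    totals: dict[str, float] = defaultdict(float)
    for job in jobs:
        if category == "JOBS":
            totals[job["state"]] += 1
        elif category == "CORE_HOURS":
            totals[job["state"]] += job["ncores"] * job["elapsed_s"] / 3600
        else:
            raise ValueError(f"unsupported category in this sketch: {category}")
    return dict(totals)


jobs = [
    {"state": "COMPLETED", "ncores": 4, "elapsed_s": 3600},
    {"state": "FAILED", "ncores": 2, "elapsed_s": 1800},
    {"state": "COMPLETED", "ncores": 1, "elapsed_s": 7200},
]
print(per_state(jobs))                # {'COMPLETED': 2.0, 'FAILED': 1.0}
print(per_state(jobs, "CORE_HOURS"))  # {'COMPLETED': 6.0, 'FAILED': 1.0}
```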

get_jobs_status_grouped(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE, grouping_type: GroupingField | str = GroupingField.ACCOUNT, grouping_size: int = 10) tuple[GroupedJobsStatusDTO | None, ResultStatus]

Get grouped job status distribution (per-state, per-group breakdown).

Parameters:
  • category – What to measure per state. Pass a MetricCategory member (e.g. MetricCategory.CORE_HOURS) to get autocompletion.

  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

  • grouping_type – Field to group by (e.g. “Account”, “UID”, “GID”).

  • grouping_size – Maximum number of groups (excess goes to “Others”).

Returns:

Tuple of (GroupedJobsStatusDTO, ResultStatus.OK) on success, or (None, ResultStatus) when no data is available.

Raises:

DataProviderError – If accounting filters are missing or computation fails.

get_jobs_status_yms(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE) tuple[JobsStatusYmsDTO | None, ResultStatus]

Get flat year-month state distribution.

Parameters:
  • category – What to measure per state. Pass a MetricCategory member (e.g. MetricCategory.CORE_HOURS) to get autocompletion.

  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

Returns:

Tuple of (JobsStatusYmsDTO, ResultStatus.OK) on success, or (None, ResultStatus) when no data is available.

Raises:

DataProviderError – If accounting filters are missing or computation fails.
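A year-month state distribution buckets jobs by calendar month before counting states. This sketch assumes each job is bucketed by its end time (the page does not say which timestamp is used) and only counts jobs; `per_year_month_state` and the record keys are hypothetical.

```python
from collections import Counter
from datetime import datetime


def per_year_month_state(jobs: list[dict]) -> dict[tuple[str, str], int]:
    """Count jobs per ("YYYY-MM", state) pair, sorted chronologically."""
    counts = Counter((job["end"].strftime("%Y-%m"), job["state"]) for job in jobs)
    return dict(sorted(counts.items()))


jobs = [
    {"end": datetime(2024, 1, 31, 23, 0), "state": "COMPLETED"},
    {"end": datetime(2024, 2, 1, 1, 0), "state": "COMPLETED"},
    {"end": datetime(2024, 2, 3, 12, 0), "state": "TIMEOUT"},
]
print(per_year_month_state(jobs))
# {('2024-01', 'COMPLETED'): 1, ('2024-02', 'COMPLETED'): 1, ('2024-02', 'TIMEOUT'): 1}
```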

get_jobs_status_yms_grouped(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE, grouping_type: GroupingField | str = GroupingField.ACCOUNT, grouping_size: int = 10) tuple[GroupedJobsStatusYmsDTO | None, ResultStatus]

Get grouped year-month state distribution.

Parameters:
  • category – What to measure per state. Pass a MetricCategory member (e.g. MetricCategory.CORE_HOURS) to get autocompletion.

  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

  • grouping_type – Field to group by.

  • grouping_size – Maximum number of groups.

Returns:

Tuple of (GroupedJobsStatusYmsDTO, ResultStatus.OK) on success, or (None, ResultStatus) when no data is available.

Raises:

DataProviderError – If accounting filters are missing or computation fails.

SubmitDateDataProvider

class applications.throughput.providers.submit_date.SubmitDateDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for submission date metrics.

Thin facade over the submit_date service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = SubmitDateDataProvider(workload)
>>> dto, status = provider.get_submit_hour()
get_submit_hour() tuple[SubmitHourDTO | None, ResultStatus]

Get hour-of-day submission histogram.

Returns:

(SubmitHourDTO, OK) with hour values (0–23) and job counts per hour. (None, NO_RESULTS) when no jobs match the filters.
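The hour-of-day histogram reduces each submission timestamp to its hour. A minimal sketch that keeps all 24 bins, including empty ones; whether SubmitHourDTO keeps empty hours, and which time zone the hours are expressed in, are not stated here, and `submit_hour_histogram` is hypothetical.

```python
from datetime import datetime


def submit_hour_histogram(submit_times: list[datetime]) -> tuple[list[int], list[int]]:
    """Hours 0 to 23 and the number of submissions in each (empty hours count as 0)."""
    counts = [0] * 24
    for ts in submit_times:
        counts[ts.hour] += 1
    return list(range(24)), counts


hours, counts = submit_hour_histogram(
    [datetime(2024, 3, 4, 9, 15), datetime(2024, 3, 4, 9, 50), datetime(2024, 3, 5, 23, 5)]
)
print(counts[9], counts[23], sum(counts))  # 2 1 3
```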

get_submit_hour_grouped(grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[dict] | None, ResultStatus]

Get hour-of-day submission histogram broken down by a grouping dimension.

Parameters:
  • grouping_type – Elasticsearch field to group by.

  • grouping_size – Max number of groups.

Returns:

Tuple of (list of per-group dicts, ResultStatus). List is None when status is not OK.

get_submit_weekday() tuple[SubmitWeekdayDTO | None, ResultStatus]

Get day-of-week submission histogram.

Returns:

(SubmitWeekdayDTO, OK) with weekday names (Monday–Sunday) and job counts. (None, NO_RESULTS) when no jobs match the filters.
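The weekday histogram works the same way, using `datetime.weekday()`, which returns 0 for Monday. This sketch hard-codes English weekday names so the output does not depend on the process locale; `submit_weekday_histogram` is a hypothetical helper.

```python
from datetime import datetime

# Fixed English names so the output does not depend on the process locale.
WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def submit_weekday_histogram(submit_times: list[datetime]) -> dict[str, int]:
    """Submissions per weekday, Monday first; datetime.weekday() is 0 for Monday."""
    counts = dict.fromkeys(WEEKDAYS, 0)
    for ts in submit_times:
        counts[WEEKDAYS[ts.weekday()]] += 1
    return counts


# 2024-03-04 was a Monday and 2024-03-10 a Sunday.
print(submit_weekday_histogram([datetime(2024, 3, 4), datetime(2024, 3, 10)]))
```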

get_submit_weekday_grouped(grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[dict] | None, ResultStatus]

Get day-of-week submission histogram broken down by a grouping dimension.

Parameters:
  • grouping_type – Elasticsearch field to group by.

  • grouping_size – Max number of groups.

Returns:

Tuple of (list of per-group dicts, ResultStatus). List is None when status is not OK.

WaitTimeDataProvider

class applications.throughput.providers.wait_time.WaitTimeDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for wait time metrics.

Thin facade over the wait_time service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = WaitTimeDataProvider(workload)
>>> dist, status = provider.get_wait_time_distribution()
>>> stats, status = provider.get_wait_time_stats()
get_wait_time_distribution(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) tuple[WaitTimeDistributionDTO | None, ResultStatus]

Get wait time PDF/CDF distribution.

Parameters:

datetime_col – Timestamp column (SUBMIT or ELIGIBLE).

Returns:

(dto, ResultStatus.OK) with a DataFrame of [value, frequency, cdf], or (None, ResultStatus.NO_RESULTS) when no jobs match the filters.
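The `datetime_col` choice plausibly changes where the wait starts: measuring from SUBMIT includes time a job spent ineligible (for example, held or waiting on a dependency), while ELIGIBLE counts only time during which the job could have been scheduled. A minimal sketch under that assumption; the record keys and `wait_seconds` are hypothetical, and strings stand in for SubmissionDatetimeCol members.

```python
from datetime import datetime


def wait_seconds(job: dict, datetime_col: str = "SUBMIT") -> float:
    """Seconds between the chosen origin (submit or eligible time) and job start."""
    origin = job["submit"] if datetime_col == "SUBMIT" else job["eligible"]
    return (job["start"] - origin).total_seconds()


job = {
    "submit": datetime(2024, 1, 1, 8, 0),
    "eligible": datetime(2024, 1, 1, 9, 0),  # e.g. held by a dependency until 09:00
    "start": datetime(2024, 1, 1, 9, 30),
}
print(wait_seconds(job), wait_seconds(job, "ELIGIBLE"))  # 5400.0 1800.0
```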

get_wait_time_stats(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) tuple[WaitTimeStatsDTO | None, ResultStatus]

Get wait time descriptive statistics.

Parameters:

datetime_col – Timestamp column (SUBMIT or ELIGIBLE).

Returns:

(dto, ResultStatus.OK) with descriptive statistics (data, columns, median), or (None, ResultStatus.NO_RESULTS) when no jobs match the filters.