Data Providers
Note
All get_* methods apply workload filters and date range automatically
unless stated otherwise.
ClusterLoadDataProvider
- class applications.load.providers.cluster_load.ClusterLoadDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for cluster load metrics (cores, cost, CO₂, power).
Thin adapter over the cluster_load service. Resolves the accounting data source, workload filters, and hardware configuration from the workload, then delegates computation to the service layer.
Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.
Example
>>> provider = ClusterLoadDataProvider(workload)
>>> load, status = provider.get_core_load()
>>> if status == ResultStatus.OK:
...     print(f"Utilization: {load.utilization_rate:.2%}")
>>>
>>> daily, status = provider.get_core_load(resolution=ComputeLoadResolution.DAY)
>>> grouped, status = provider.get_core_load_by_group(grouping_key="Account")
- get_co2_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[Co2LoadDTO | None, ResultStatus]
Get cluster CO₂ footprint timeseries (kgCO₂e).
- Parameters:
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
- Returns:
Tuple of Co2LoadDTO (or None if no data) and ResultStatus.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_co2_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[GroupedCo2LoadDTO | None, ResultStatus]
Get CO₂ footprint grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
- Returns:
Tuple of GroupedCo2LoadDTO (or None if no data) and ResultStatus.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_core_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[CoreLoadDTO | None, ResultStatus]
Get allocated cores timeseries with hardware capacity.
- Parameters:
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
Tuple of CoreLoadDTO (or None if no data) and ResultStatus. The DTO holds allocated/requested cores, capacity, and utilization.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
Example
>>> load, status = provider.get_core_load()
>>> if status == ResultStatus.OK:
...     print(f"Utilization: {load.utilization_rate:.1%}")
>>> load, status = provider.get_core_load(resolution=ComputeLoadResolution.DAY)
- get_core_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GroupedCoreLoadDTO | None, ResultStatus]
Get core load grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
Tuple of GroupedCoreLoadDTO (or None if no data) and ResultStatus. The DTO holds one GroupEntryDTO per group.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
Example
>>> grouped, status = provider.get_core_load_by_group(grouping_key="Account")
>>> if status == ResultStatus.OK:
...     for g in grouped.groups:
...         print(g.group, g.avg_running)
- get_corehour_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[CorehourLoadDTO | None, ResultStatus]
Get core-hour load timeseries (allocated cores × elapsed time).
- Parameters:
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
Tuple of CorehourLoadDTO (or None if no data) and ResultStatus.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
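The core-hour quantity described above (allocated cores × elapsed time) can be illustrated with a small worked calculation. This is only a sketch of the arithmetic: the `core_hours` helper and the `(cores, elapsed_seconds)` job tuples are hypothetical, and how the service distributes a job's core-hours across time buckets is not documented here.

```python
def core_hours(alloc_cores: int, elapsed_seconds: float) -> float:
    """Core-hours for one job: allocated cores times elapsed time in hours."""
    return alloc_cores * elapsed_seconds / 3600.0


# Hypothetical jobs as (allocated cores, elapsed seconds).
jobs = [(16, 7200), (4, 1800)]

# 16 cores for 2 h plus 4 cores for 0.5 h.
total_core_hours = sum(core_hours(cores, seconds) for cores, seconds in jobs)
print(total_core_hours)
```

The same arithmetic applies to GPU-hours with allocated GPUs in place of cores.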
- get_cost_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[CostLoadDTO | None, ResultStatus]
Get cluster cost load timeseries.
- Parameters:
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
- Returns:
Tuple of CostLoadDTO (or None if no data) and ResultStatus.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_cost_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[GroupedCostLoadDTO | None, ResultStatus]
Get cost load grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
- Returns:
Tuple of GroupedCostLoadDTO (or None if no data) and ResultStatus.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_power_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[PowerLoadDTO | None, ResultStatus]
Get cluster power consumption timeseries (watts).
- Parameters:
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
- Returns:
Tuple of PowerLoadDTO (or None if no data) and ResultStatus.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_power_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR) tuple[GroupedPowerLoadDTO | None, ResultStatus]
Get power consumption grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
- Returns:
Tuple of GroupedPowerLoadDTO (or None if no data) and ResultStatus.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
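Every getter in this class returns a (DTO, ResultStatus) pair, so callers should check the status before reading the DTO. The sketch below shows one way to do that. `ResultStatus` and `FakeCoreLoad` are local stand-ins defined only for illustration (the real classes may differ), and `describe_utilization` is a hypothetical helper, not part of the provider API.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple


class ResultStatus(Enum):
    """Stand-in enum; only members named in this reference are included."""
    OK = "ok"
    NO_DATA = "no_data"
    NO_RESULTS = "no_results"


@dataclass
class FakeCoreLoad:
    """Stand-in DTO exposing only the attribute used in the examples above."""
    utilization_rate: float


def describe_utilization(result: Tuple[Optional[FakeCoreLoad], ResultStatus]) -> str:
    """Format a (dto, status) pair without ever dereferencing a None DTO."""
    dto, status = result
    if status is not ResultStatus.OK or dto is None:
        return f"no utilization available ({status.name})"
    return f"Utilization: {dto.utilization_rate:.1%}"


print(describe_utilization((FakeCoreLoad(0.4321), ResultStatus.OK)))
print(describe_utilization((None, ResultStatus.NO_DATA)))
```

Checking both the status and the DTO keeps the caller safe even if a non-OK status is ever paired with a partial DTO.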
ConcurrentUsersDataProvider
- class applications.concusers.providers.concurrent_users.ConcurrentUsersDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for concurrent users metrics.
Thin facade over the submit_users and runwait_users services. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = ConcurrentUsersDataProvider(workload)
>>> dto, status = provider.get_submit_users()
- get_running_and_waiting_users(resolution: ComputeLoadResolution = ComputeLoadResolution.DAY) tuple[RunningAndWaitingUsersDTO | None, ResultStatus]
Get distinct user counts for jobs that are running or waiting.
- Parameters:
resolution – Time resolution string (e.g. "1hour", "1day").
- Returns:
Tuple of (RunningAndWaitingUsersDTO, ResultStatus). DTO is None when status is not OK.
- Raises:
DataProviderError – If accounting filters are missing or query fails.
- get_submit_users(resolution: ComputeLoadResolution = ComputeLoadResolution.DAY) tuple[SubmitUserCountsDTO | None, ResultStatus]
Get distinct user counts bucketed by job submit time.
- Parameters:
resolution – Time resolution string (e.g. "1hour", "1day").
- Returns:
Tuple of (SubmitUserCountsDTO, ResultStatus). DTO is None when status is not OK.
- Raises:
DataProviderError – If accounting filters are missing or query fails.
CongestionDataProvider
- class applications.congestion.providers.congestion.CongestionDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for congestion/contention analysis.
Thin adapter over the congestion service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = CongestionDataProvider(workload)
>>> metrics, status = provider.get_congestion_metrics()
>>> if status == ResultStatus.OK:
...     print(f"Avg running: {metrics.avg_running_pct:.1f}%")
- get_congestion_metrics(data_type: DataType = DataType.CORE, resolution: ComputeLoadResolution = ComputeLoadResolution.DAY) tuple[CongestionMetricsDTO | None, ResultStatus]
Compute congestion metrics for the workload.
- Parameters:
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
resolution – Time bucket size (e.g. “1day”, “1hour”).
- Returns:
Tuple of (CongestionMetricsDTO, ResultStatus). DTO is None when status is not OK.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
ConsumersDataProvider
- class applications.consumers.providers.consumers.ConsumersDataProvider(workload: Workload)
Bases: BaseDataProvider
Provide consumers group/detail metrics for a given workload.
Thin facade over the consumers services. Resolves the accounting data source and workload filters, then delegates to the service layer. Wraps unexpected exceptions from the service into DataProviderError.
Examples
>>> provider = ConsumersDataProvider(workload)
>>> dto, status = provider.get_group_aggregation("Account")
- get_detail_stats(group: str, group_obj_name: str, subgroup: str | None = None, subgroup_obj_name: str | None = None) tuple[DetailStatsDTO | None, ResultStatus]
Per-field stats for jobs in a single group (and optionally subgroup).
- Parameters:
group – Group field name.
group_obj_name – Value of group to filter on.
subgroup – Optional secondary field name.
subgroup_obj_name – Optional value of subgroup to filter on.
- Returns:
Tuple of (DetailStatsDTO, ResultStatus). DTO is None when status is not OK.
- Raises:
DataProviderError – If accounting filters are missing or the query fails.
- get_group_aggregation(group: str, subgroup: str | None = None) tuple[GroupAggregationDTO | None, ResultStatus]
Aggregate job metrics by group (and optionally subgroup).
- Parameters:
group – Field name to aggregate on (e.g. "Account").
subgroup – Optional secondary field name (e.g. "User").
- Returns:
Tuple of (GroupAggregationDTO, ResultStatus). DTO is None when status is not OK.
- Raises:
DataProviderError – If accounting filters are missing or the query fails.
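The class description says unexpected service exceptions are wrapped into DataProviderError. One common way to implement that kind of wrapping is a decorator, sketched below. The `DataProviderError` class here is a local stand-in, and `wrap_service_errors` and `failing_service_call` are hypothetical names; the actual provider may wrap errors differently.

```python
import functools


class DataProviderError(Exception):
    """Stand-in for the application's DataProviderError."""


def wrap_service_errors(func):
    """Re-raise any unexpected exception from func as DataProviderError."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except DataProviderError:
            raise  # already the documented error type; pass through unchanged
        except Exception as exc:
            # Chain the original exception so the root cause stays visible.
            raise DataProviderError(f"{func.__name__} failed: {exc}") from exc
    return wrapper


@wrap_service_errors
def failing_service_call(group):
    # Hypothetical service function that fails with a low-level error.
    raise KeyError(group)


try:
    failing_service_call("Account")
except DataProviderError as err:
    print(type(err.__cause__).__name__, "->", err)
```

Callers then only need a single `except DataProviderError` clause, while `__cause__` still exposes the underlying failure for logging.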
CoresDataProvider
- class applications.resources.providers.cores.CoresDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for core allocation distribution analysis.
Thin facade over the cores service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = CoresDataProvider(workload)
>>> dto, status = provider.get_flat_distribution()
>>> dto, status = provider.get_grouped_distribution(grouping_type="uid")
- get_flat_distribution() tuple[CoresFlatDistributionDTO | None, ResultStatus]
Get ungrouped core allocation distribution.
- Returns:
Tuple of (dto, ResultStatus.OK) with per-bin statistics and extended descriptive stats, or (None, ResultStatus.NO_DATA) when no jobs have allocated cores.
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_distribution(grouping_type: str, grouping_size: int = 10) tuple[CoresGroupedDistributionDTO | None, ResultStatus]
Get grouped core allocation distribution.
- Parameters:
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.
- Returns:
Tuple of (dto, ResultStatus.OK) with per-group statistics, (None, ResultStatus.NO_DATA) when no jobs have allocated cores, or (None, ResultStatus.NO_RESULTS) when the grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
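The grouping_size behaviour described above (keep the top groups, collapse the rest into an “Others” bucket) can be sketched in a few lines. This is an illustration under assumptions: groups are ranked here by plain job count, and `top_groups_with_others` is a hypothetical helper; the ranking metric used by the real service is not stated in this reference.

```python
from collections import Counter


def top_groups_with_others(values, grouping_size=10, others_label="Others"):
    """Count values per group, keep the largest groups, sum the rest under one label."""
    ranked = Counter(values).most_common()
    result = dict(ranked[:grouping_size])
    remainder = sum(count for _, count in ranked[grouping_size:])
    if remainder:
        result[others_label] = remainder
    return result


# Hypothetical per-job account values.
jobs_by_account = ["a", "a", "a", "b", "b", "c", "d"]
print(top_groups_with_others(jobs_by_account, grouping_size=2))
```

When there are no more groups than grouping_size, no “Others” entry is produced in this sketch.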
CoresMemoryDataProvider
- class applications.resources.providers.cores_memory.CoresMemoryDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for cores-vs-memory matrix analysis.
Example
>>> provider = CoresMemoryDataProvider(workload)
>>> result = provider.get_flat_matrix(constants.MAXRSS)
>>> result = provider.get_grouped_matrix(constants.MAXRSS, grouping_type="uid")
- get_flat_matrix(req_or_consumed: str = 'MaxRSS') tuple[CoresMemoryFlatMatrixDTO | None, ResultStatus]
Get ungrouped cores-vs-memory 2D matrix.
- Parameters:
req_or_consumed – Memory field: constants.REQMEM or constants.MAXRSS.
- Returns:
Tuple of (dto, ResultStatus.OK) on success, or (None, ResultStatus.NO_DATA) when the memory field is absent or no jobs have allocated cores/memory.
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_matrix(req_or_consumed: str = 'MaxRSS', grouping_type: str = 'uid', grouping_size: int = 10) tuple[CoresMemoryGroupedMatrixDTO | None, ResultStatus]
Get grouped cores-vs-memory 2D matrix.
- Parameters:
req_or_consumed – Memory field: constants.REQMEM or constants.MAXRSS.
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return.
- Returns:
Tuple of (dto, ResultStatus.OK) on success, (None, ResultStatus.NO_DATA) when the memory field is absent, or (None, ResultStatus.NO_RESULTS) when grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
ExecTimeDataProvider
- class applications.throughput.providers.exec_time.ExecTimeDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for execution time metrics.
Thin facade over the exec_time service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = ExecTimeDataProvider(workload)
>>> result = provider.get_exec_time()
- get_exec_time(data_type: DataType = DataType.CORE) tuple[ExecTimeDistributionDTO | None, ResultStatus]
Get execution time histogram.
- Parameters:
data_type – Resource dimension — CPU or GPU.
- Returns:
(dto, ResultStatus.OK) with bins and resource-hour sums, or (None, ResultStatus.NO_RESULTS) when no jobs match the filters.
- get_exec_time_grouped(data_type: DataType = DataType.CORE, grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[GroupedExecTimeEntryDTO] | None, ResultStatus]
Get grouped execution time histogram.
- Parameters:
data_type – Resource dimension — CPU or GPU.
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
Tuple of (list of GroupedExecTimeEntryDTO, ResultStatus). List is None when status is not OK.
- get_exec_time_vs_timelimit(config: ExecTimeVsTimelimitConfig | None = None) tuple[ExecTimeVsTimelimitDTO | None, ResultStatus]
Get execution time vs timelimit ratio distribution.
- Parameters:
config – Override configuration. When None, loads from DB.
- Returns:
(dto, ResultStatus.OK) with categorized ratio data, (None, ResultStatus.NO_DATA) when the Timelimit field is absent, or (None, ResultStatus.NO_RESULTS) when no jobs match the filters.
- get_exec_time_vs_timelimit_grouped(config: ExecTimeVsTimelimitConfig | None = None, grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[GroupedExecTimeVsTimelimitEntryDTO] | None, ResultStatus]
Get grouped execution time vs timelimit ratio distribution.
- Parameters:
config – Override configuration. When None, loads from DB.
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
Tuple of (list of GroupedExecTimeVsTimelimitEntryDTO, ResultStatus). List is None when status is not OK.
GpuDataProvider
- class applications.resources.providers.gpu.GpuDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for GPU distribution analysis.
Thin facade over the GPU service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = GpuDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
- get_flat_distribution(gpu_field: GpuAccountingField = GpuAccountingField.ALLOC_GPUS) tuple[GpuFlatDistributionDTO | None, ResultStatus]
Get ungrouped GPU distribution.
- Parameters:
gpu_field – ES field name: GpuAccountingField.ALLOC_GPUS or GpuAccountingField.REQ_GPUS.
- Returns:
Tuple of (GpuFlatDistributionDTO, ResultStatus). Status is NO_DATA when the field is absent or no jobs have a positive GPU value.
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_distribution(grouping_type: str, grouping_size: int = 10, gpu_field: GpuAccountingField = GpuAccountingField.ALLOC_GPUS) tuple[GpuGroupedDistributionDTO | None, ResultStatus]
Get grouped GPU distribution.
- Parameters:
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.
gpu_field – ES field name: GpuAccountingField.ALLOC_GPUS or GpuAccountingField.REQ_GPUS.
- Returns:
Tuple of (GpuGroupedDistributionDTO, ResultStatus). Status is NO_DATA when the field is absent or no jobs have a positive GPU value, or NO_RESULTS when the grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
GpuLoadDataProvider
- class applications.load.providers.gpu_load.GpuLoadDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for GPU load metrics.
Thin adapter over the gpu_load service. Resolves the accounting data source, workload filters, and hardware GPU configuration from the workload, then delegates computation to the service layer.
Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.
Example
>>> provider = GpuLoadDataProvider(workload)
>>> load, status = provider.get_gpu_load()
>>> if status == ResultStatus.OK:
...     print(f"GPU Utilization: {load.utilization_rate:.2%}")
>>>
>>> grouped, status = provider.get_gpu_load_by_group(grouping_key="Account")
- get_gpu_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GpuLoadDTO | None, ResultStatus]
Get allocated GPUs timeseries with hardware capacity.
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
Tuple of GpuLoadDTO (or None if no data) and ResultStatus. The DTO holds allocated/requested GPUs, capacity, and utilization.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
Example
>>> load, status = provider.get_gpu_load()
>>> if status == ResultStatus.OK:
...     print(f"GPU Utilization: {load.utilization_rate:.1%}")
- get_gpu_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GroupedGpuLoadDTO | None, ResultStatus]
Get GPU load grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
Tuple of GroupedGpuLoadDTO (or None if no data) and ResultStatus. The DTO holds one GroupEntryDTO per group.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_gpuhour_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GpuhourLoadDTO | None, ResultStatus]
Get GPU-hour load timeseries (allocated GPUs × elapsed time).
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
Tuple of GpuhourLoadDTO (or None if no data) and ResultStatus. The DTO holds GPU-hour accounting fields.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
JobFrequencyDataProvider
- class applications.throughput.providers.job_freq.JobFrequencyDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for job frequency and interarrival metrics.
Thin facade over the job_freq service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = JobFrequencyDataProvider(workload)
>>> result = provider.get_job_frequency()
- get_interarrival(datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT) tuple[dict | None, ResultStatus]
Get job interarrival time statistics.
- Parameters:
datetime_col – Timestamp column.
- Returns:
(interarrival_dict, OK) on success, where the dict has data, columns, and median keys. (None, NO_DATA) when the index does not exist. (None, NO_RESULTS) when no jobs match.
- get_interarrival_grouped(datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT, grouping_type: str = 'Account', grouping_size: int = 10) tuple[dict | None, ResultStatus]
Get grouped interarrival time statistics.
- Parameters:
datetime_col – Timestamp column.
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
Tuple of (dict, ResultStatus). Dict is None when status is not OK. Dict has grouping_type and grouped_data keys on success.
- get_job_frequency(resolution: Resolution = Resolution.DAY, datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT) tuple[JobFrequencyDTO | None, ResultStatus]
Get job submission frequency timeseries.
- Parameters:
resolution – Time bucket size.
datetime_col – Timestamp column for bucketing.
- Returns:
(JobFrequencyDTO, OK) on success. (dto_with_resolution, NO_RESULTS) on resolution mismatch. (None, NO_RESULTS) when no jobs match.
- get_job_frequency_grouped(resolution: Resolution = Resolution.DAY, datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT, grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[dict] | None, ResultStatus]
Get grouped job frequency timeseries.
- Parameters:
resolution – Time bucket size.
datetime_col – Timestamp column.
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
Tuple of (list of per-group dicts, ResultStatus). List is None when status is not OK.
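For readers unfamiliar with the interarrival metric, the sketch below computes the gaps between consecutive job timestamps and their median. It only illustrates the concept: `interarrival_seconds` and the sample timestamps are hypothetical, and the way the service bins or formats its data, columns, and median values is not specified here.

```python
from datetime import datetime
from statistics import median


def interarrival_seconds(submit_times):
    """Return gaps in seconds between consecutive timestamps, after sorting them."""
    ordered = sorted(submit_times)
    return [(later - earlier).total_seconds()
            for earlier, later in zip(ordered, ordered[1:])]


# Hypothetical submit times for four jobs.
submits = [
    datetime(2024, 1, 1, 12, 0, 0),
    datetime(2024, 1, 1, 12, 0, 30),
    datetime(2024, 1, 1, 12, 2, 30),
    datetime(2024, 1, 1, 12, 3, 0),
]
gaps = interarrival_seconds(submits)
print(gaps, median(gaps))
```

With fewer than two timestamps there are no gaps, so the function returns an empty list and a median cannot be computed.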
JobLoadDataProvider
- class applications.load.providers.job_load.JobLoadDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for job count load metrics.
Thin adapter over the job_load service. Resolves the accounting data source and workload filters, then delegates computation to the service layer.
Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.
Example
>>> provider = JobLoadDataProvider(workload)
>>> load, status = provider.get_job_load()
>>> if status == ResultStatus.OK:
...     print(f"Avg running jobs: {load.avg_running:.1f}")
>>>
>>> grouped, status = provider.get_job_load_by_group(grouping_key="Account")
- get_job_load(resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[JobLoadDTO | None, ResultStatus]
Get job count timeseries for RUNNING and WAITING states.
- Parameters:
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
Tuple of (JobLoadDTO or None, ResultStatus).
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
Example
>>> load, status = provider.get_job_load()
>>> if status == ResultStatus.OK:
...     print(f"Avg running: {load.avg_running:.1f}")
- get_job_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: ComputeLoadResolution = ComputeLoadResolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) tuple[GroupedJobLoadDTO | None, ResultStatus]
Get job count load grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: ComputeLoadResolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
Tuple of (GroupedJobLoadDTO or None, ResultStatus).
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
JobsDataProvider
- class applications.ait.providers.jobs.JobsDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for querying job records from accounting data.
Thin adapter over the jobs service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = JobsDataProvider(workload)
>>> page, status = provider.get_jobs_page(sort_field="Submit", limit=50)
>>> if status == ResultStatus.OK:
...     print(f"Showing {len(page.jobs)} of {page.total_count} jobs")
- get_available_fields(max_fields: int = 100) JobFieldsDTO
Get priority-ordered field names from the ES mapping.
- Parameters:
max_fields – Maximum number of fields to return.
- Returns:
JobFieldsDTO with ordered field list and total count.
- Raises:
DataProviderError – If data source is unavailable.
- get_job_count() int
Count jobs matching the workload filters.
- Returns:
Number of matching jobs, as an integer.
- Raises:
DataProviderError – If accounting filters are missing or query fails.
- get_jobs_page(sort_field: str | None = None, sort_order: str = 'asc', offset: int = 0, limit: int = 25) tuple[JobsPageDTO | None, ResultStatus]
Fetch a paginated page of job records.
- Parameters:
sort_field – Field to sort by.
sort_order – Sort direction (“asc” or “desc”).
offset – Number of records to skip.
limit – Number of records to return.
- Returns:
Tuple of (JobsPageDTO, ResultStatus). DTO is None when status is not OK.
- Raises:
DataProviderError – If accounting filters are missing or query fails.
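The offset and limit parameters of get_jobs_page lend themselves to a simple paging loop. The sketch below walks all pages using an in-memory stand-in. `FakeJobsProvider`, `FakeJobsPage`, the local `ResultStatus`, and `iter_all_jobs` are hypothetical names for illustration; in particular, returning NO_RESULTS for an empty page is an assumption about the real provider.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class ResultStatus(Enum):
    """Stand-in enum with the two members this sketch needs."""
    OK = "ok"
    NO_RESULTS = "no_results"


@dataclass
class FakeJobsPage:
    """Stand-in page DTO with the two attributes used in the example above."""
    jobs: List[dict]
    total_count: int


class FakeJobsProvider:
    """In-memory stand-in mimicking the documented get_jobs_page signature."""

    def __init__(self, records):
        self._records = list(records)

    def get_jobs_page(self, sort_field=None, sort_order="asc", offset=0, limit=25):
        records = self._records
        if sort_field is not None:
            records = sorted(records, key=lambda r: r[sort_field],
                             reverse=(sort_order == "desc"))
        page = records[offset:offset + limit]
        if not page:
            return None, ResultStatus.NO_RESULTS  # assumed behaviour
        return FakeJobsPage(jobs=page, total_count=len(records)), ResultStatus.OK


def iter_all_jobs(provider, page_size=25, **sort_kwargs):
    """Yield every job, advancing offset until total_count is reached or a page is empty."""
    offset = 0
    while True:
        page, status = provider.get_jobs_page(offset=offset, limit=page_size, **sort_kwargs)
        if status is not ResultStatus.OK or page is None:
            return
        yield from page.jobs
        offset += len(page.jobs)
        if offset >= page.total_count:
            return


provider = FakeJobsProvider({"JobID": i} for i in range(7))
job_ids = [job["JobID"] for job in
           iter_all_jobs(provider, page_size=3, sort_field="JobID", sort_order="desc")]
print(job_ids)
```

Passing an explicit sort_field keeps page boundaries stable between calls, which matters when records can share the default ordering.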
KpiDataProvider
- class applications.kpi.providers.kpi.KpiDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for KPI dashboard metrics.
Thin facade over the KPI service layer. Resolves the accounting data source and workload filters, then delegates to individual services.
Example
>>> provider = KpiDataProvider(workload)
>>> logs, status = provider.get_logs_metrics()
>>> cost, status = provider.get_cost_metrics()
- get_carbon_footprint() tuple[float | None, ResultStatus]
Get CO2 emissions.
- Returns:
Tuple of (total_carbon_footprint_kg or None, ResultStatus). Total carbon footprint in kg of CO2.
- Raises:
DataProviderError – If carbon footprint computation fails.
- get_core_stats() tuple[HardwareStatsDTO | None, ResultStatus]
Get core count statistics (min/max/avg).
Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.
- Returns:
Tuple of (HardwareStatsDTO or None, ResultStatus). The DTO holds core statistics over the period.
- get_cost_metrics() tuple[float | None, ResultStatus]
Get total cost.
- Returns:
Tuple of (total_cost or None, ResultStatus).
- Raises:
DataProviderError – If cost computation fails.
- get_energy_metrics() tuple[EnergyMetricsDTO | None, ResultStatus]
Get power and energy consumption.
- Returns:
Tuple of (EnergyMetricsDTO or None, ResultStatus).
- Raises:
DataProviderError – If energy computation fails.
- get_gpu_stats() tuple[HardwareStatsDTO | None, ResultStatus]
Get GPU count statistics (min/max/avg).
Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.
- Returns:
Tuple of (HardwareStatsDTO or None, ResultStatus). The DTO holds GPU statistics over the period.
- get_job_count() tuple[int | None, ResultStatus]
Get total number of jobs.
Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.
- Returns:
Tuple of (job count or None, ResultStatus). The count comes from the logs metrics.
- get_logs_metrics() tuple[LogsMetricsDTO | None, ResultStatus]
Get job count, resource hours, usage, and hardware stats.
- Returns:
Tuple of (LogsMetricsDTO or None, ResultStatus). The DTO holds all job-level KPI metrics.
- Raises:
DataProviderError – If no data is found or no results match the filters.
- get_user_count() tuple[int | None, ResultStatus]
Get number of unique users.
Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.
- Returns:
Tuple of (unique user count or None, ResultStatus). The count comes from the logs metrics.
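The per-instance memoization mentioned for the convenience methods can be pictured as follows. This is a simplified sketch, not the provider's code: `FakeKpiProvider` is a hypothetical stand-in, it returns plain dicts and values rather than (DTO, ResultStatus) tuples, and `_run_query` merely simulates the Elasticsearch query.

```python
class FakeKpiProvider:
    """Stand-in showing per-instance memoization of one expensive call."""

    def __init__(self):
        self.query_count = 0       # counts simulated backend queries
        self._logs_metrics = None  # cached result, filled on first use

    def _run_query(self):
        # Placeholder for the real query; the values are made up.
        self.query_count += 1
        return {"job_count": 120, "user_count": 8}

    def get_logs_metrics(self):
        # Run the query only once per provider instance.
        if self._logs_metrics is None:
            self._logs_metrics = self._run_query()
        return self._logs_metrics

    def get_job_count(self):
        return self.get_logs_metrics()["job_count"]

    def get_user_count(self):
        return self.get_logs_metrics()["user_count"]


kpi = FakeKpiProvider()
print(kpi.get_job_count(), kpi.get_user_count(), kpi.query_count)
```

Because the cache lives on the instance, creating a new provider (for example, for a different workload) triggers a fresh query.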
MemoryDataProvider
- class applications.resources.providers.memory.MemoryDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for memory distribution analysis.
Example
>>> provider = MemoryDataProvider(workload)
>>> result = provider.get_memory_flat_distribution(MemoryAccountingField.MAX_RSS)
>>> grouped = provider.get_memory_grouped_distribution(MemoryAccountingField.MAX_RSS, "Account")
- get_memory_flat_distribution(req_or_consumed: MemoryAccountingField = MemoryAccountingField.MAX_RSS) tuple[MemoryFlatDistributionDTO | None, ResultStatus]
Get ungrouped memory distribution.
- Parameters:
req_or_consumed – Memory field: MemoryAccountingField.REQMEM or MemoryAccountingField.MAX_RSS.
- Returns:
Tuple of (DTO, status). DTO is None when no data is available.
- get_memory_grouped_distribution(req_or_consumed: MemoryAccountingField = MemoryAccountingField.MAX_RSS, grouping_type: str = 'Account', grouping_size: int = 10) tuple[MemoryGroupedDistributionDTO | None, ResultStatus]
Get grouped memory distribution.
- Parameters:
req_or_consumed – Memory field: MemoryAccountingField.REQMEM or MemoryAccountingField.MAX_RSS.
grouping_type – ES field used for grouping (e.g. "Account", "UID").
grouping_size – Number of top groups to return.
- Returns:
Tuple of (DTO, status). DTO is None when no data or groups are available.
MemoryRatioDataProvider
- class applications.resources.providers.consumed_vs_requested_memory.MemoryRatioDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for consumed-vs-requested memory ratio analysis.
Example
>>> provider = MemoryRatioDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
- get_flat_distribution() tuple[MemoryRatioFlatDistributionDTO | None, ResultStatus]
Get ungrouped consumed-vs-requested memory ratio distribution.
- Returns:
Tuple of (MemoryRatioFlatDistributionDTO, ResultStatus). Status is NO_DATA when the MEMORYRATIO field is absent or all values are null/zero.
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_distribution(grouping_type: str, grouping_size: int = 10) tuple[MemoryRatioGroupedDistributionDTO | None, ResultStatus]
Get grouped consumed-vs-requested memory ratio distribution.
- Parameters:
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.
- Returns:
Tuple of (MemoryRatioGroupedDistributionDTO, ResultStatus). Status is NO_DATA when the MEMORYRATIO field is absent, or NO_RESULTS when grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
NodesDataProvider
- class applications.resources.providers.nodes.NodesDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for node allocation distribution analysis.
Thin facade over the nodes service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = NodesDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
- get_flat_distribution() tuple[NodesFlatDistributionDTO | None, ResultStatus]
Get ungrouped node allocation distribution.
- Returns:
Tuple of (DTO, status). DTO is None when no jobs have a positive node allocation value (NO_DATA).
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_distribution(grouping_type: str, grouping_size: int = 10) tuple[NodesGroupedDistributionDTO | None, ResultStatus]
Get grouped node allocation distribution.
- Parameters:
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.
- Returns:
Tuple of (DTO, status). DTO is None when no jobs have a positive node value (NO_DATA), or when grouping yields no groups (NO_RESULTS).
- Raises:
DataProviderError – If the data source cannot be resolved.
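The grouping_size behaviour described above (keep the largest groups, fold the rest into an “Others” bucket) can be sketched in plain Python. This is only an illustration of the idea, not the service's implementation: the function name, the input shape (a dict of counts per group) and the sample values are assumptions.

```python
from collections import Counter


def collapse_to_top_groups(counts, grouping_size=10, others_label="Others"):
    """Keep the `grouping_size` largest groups and sum the rest into one bucket."""
    if grouping_size < 0:
        raise ValueError("grouping_size must be non-negative")
    ranked = Counter(counts).most_common()  # sorted by count, descending
    top = dict(ranked[:grouping_size])
    remainder = sum(value for _, value in ranked[grouping_size:])
    if remainder:
        # Only add the bucket when something was actually collapsed.
        top[others_label] = remainder
    return top


# Hypothetical job counts per uid, for illustration only.
jobs_per_uid = {"alice": 40, "bob": 25, "carol": 10, "dave": 3, "erin": 2}
print(collapse_to_top_groups(jobs_per_uid, grouping_size=2))
# {'alice': 40, 'bob': 25, 'Others': 15}
```

When there are no more than grouping_size groups, nothing is collapsed and no “Others” entry appears in this sketch.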
NodesLoadDataProvider
- class applications.load.providers.nodes_load.NodesLoadDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for per-node job count metrics.
Thin wrapper around the nodes_load service. Resolves the accounting data source and filters from the workload, then delegates to the service.
Example
>>> provider = NodesLoadDataProvider(workload)
>>> node_counts, status = provider.get_nb_jobs_per_node()
>>> print(node_counts)
{'node001': 42, 'node002': 15, ...}
- get_nb_jobs_per_node() tuple[dict[str, int] | None, ResultStatus]
Get job count per node over the workload’s date range.
- Returns:
Tuple of (node_counts dict or None, ResultStatus).
- Raises:
DataProviderError – If accounting filters are not configured or the Elasticsearch query fails.
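Because the method returns a (dict or None, status) pair, callers may want to check the status before using the dict. The sketch below shows one way to do that and rank nodes by job count. The ResultStatus class here is a local stand-in (its member values are assumptions), and busiest_nodes is a hypothetical helper, not part of the provider.

```python
from enum import Enum


class ResultStatus(Enum):
    """Stand-in for the library's ResultStatus; member values are assumed."""
    OK = "ok"
    NO_DATA = "no_data"
    NO_RESULTS = "no_results"


def busiest_nodes(result, limit=3):
    """Return the `limit` nodes with the most jobs, or [] when there is no data."""
    node_counts, status = result
    if status is not ResultStatus.OK or not node_counts:
        return []
    ranked = sorted(node_counts.items(), key=lambda item: item[1], reverse=True)
    return ranked[:limit]


# Hypothetical values shaped like the documented return type.
ok_result = ({"node001": 42, "node002": 15, "node003": 57}, ResultStatus.OK)
print(busiest_nodes(ok_result, limit=2))               # [('node003', 57), ('node001', 42)]
print(busiest_nodes((None, ResultStatus.NO_DATA)))     # []
```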
OccupancyDataProvider
- class applications.load.providers.occupancy.OccupancyDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for cluster cores and nodes occupancy metrics.
Thin wrapper around the occupancy service. Resolves the monitoring data source and filters from the workload, then delegates to the service.
Unlike the other load providers, this provider reads occupancy data from the monitoring data source (nodes_stats_fetch pipeline → cores_stats index), not from the accounting data source.
Example
>>> provider = OccupancyDataProvider(workload)
>>> cores, status = provider.get_cores_occupancy(resolution="1hour")
>>> print(f"Avg allocated: {sum(cores.allocated) / len(cores.allocated):.0f}")
>>>
>>> nodes, status = provider.get_nodes_occupancy(resolution="1day")
>>> grouped, status = provider.get_cores_occupancy_by_group("Partition", grouping_size=5)
- get_cores_occupancy(resolution: Resolution) tuple[CoresOccupancyDTO | None, ResultStatus]
Get cores occupancy timeseries from the monitoring index.
- Parameters:
resolution – Time bucket size string (e.g. "1hour", "1day"). Required; this method provides no default.
- Returns:
Tuple of (CoresOccupancyDTO, ResultStatus). The DTO holds per-state core counts and total capacity.
- Raises:
DataProviderError – If monitoring filters are not configured, no data is available, or the Elasticsearch query fails.
Example
>>> cores, status = provider.get_cores_occupancy(resolution="1hour")
>>> for ts, alloc in zip(cores.timestamps, cores.allocated):
...     print(f"{ts}: {alloc:.0f} cores allocated")
- get_cores_occupancy_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) tuple[GroupedCoresOccupancyDTO | None, ResultStatus]
Get cores occupancy grouped by a field (e.g., Partition, Queue).
- Parameters:
grouping_key – Elasticsearch field to group by.
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size string. Default: "1hour".
- Returns:
Tuple of (GroupedCoresOccupancyDTO, ResultStatus). The DTO holds one CoresOccupancyGroupEntryDTO per group.
- Raises:
DataProviderError – If monitoring filters are not configured or query fails.
Example
>>> grouped, status = provider.get_cores_occupancy_by_group("Partition")
>>> for g in grouped.groups:
...     print(g.group, sum(g.allocated) / len(g.allocated))
- get_nodes_occupancy(resolution: Resolution) tuple[NodesOccupancyDTO | None, ResultStatus]
Get nodes occupancy timeseries from the monitoring index.
- Parameters:
resolution – Time bucket size string (e.g. "1hour", "1day"). Required; this method provides no default.
- Returns:
Tuple of (NodesOccupancyDTO, ResultStatus). The DTO holds per-state node counts and totals.
- Raises:
DataProviderError – If monitoring filters are not configured, no data is available, or the Elasticsearch query fails.
Example
>>> nodes, status = provider.get_nodes_occupancy(resolution="1day")
>>> for ts, avail in zip(nodes.timestamps, nodes.available):
...     print(f"{ts}: {avail:.0f} nodes available")
- get_nodes_occupancy_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) tuple[GroupedNodesOccupancyDTO | None, ResultStatus]
Get nodes occupancy grouped by a field (e.g., Partition, Queue).
- Parameters:
grouping_key – Elasticsearch field to group by.
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size string. Default: "1hour".
- Returns:
Tuple of (GroupedNodesOccupancyDTO, ResultStatus). The DTO holds one NodesOccupancyGroupEntryDTO per group.
- Raises:
DataProviderError – If monitoring filters are not configured or query fails.
Example
>>> grouped, status = provider.get_nodes_occupancy_by_group("Partition")
>>> for g in grouped.groups:
...     print(g.group, sum(g.available) / len(g.available))
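The examples in this section average a timeseries with sum(...) / len(...), which divides by zero if a series is empty. A minimal sketch of a safer average, plus an allocated-to-capacity ratio, follows. OccupancySeries is a hypothetical stand-in for an occupancy DTO; in particular the total field name is an assumption, since only timestamps, allocated and available appear in the examples.

```python
from dataclasses import dataclass, field


@dataclass
class OccupancySeries:
    """Hypothetical stand-in for an occupancy DTO; field names are assumed."""
    timestamps: list = field(default_factory=list)
    allocated: list = field(default_factory=list)
    total: list = field(default_factory=list)


def mean_or_none(values):
    """Arithmetic mean that returns None instead of dividing by zero."""
    return sum(values) / len(values) if values else None


def mean_allocation_ratio(series):
    """Average allocated/total ratio over buckets with non-zero capacity."""
    ratios = [a / t for a, t in zip(series.allocated, series.total) if t > 0]
    return mean_or_none(ratios)


# Hypothetical hourly buckets, for illustration only.
series = OccupancySeries(
    timestamps=["2024-01-01T00:00", "2024-01-01T01:00"],
    allocated=[50.0, 100.0],
    total=[200.0, 200.0],
)
print(mean_or_none(series.allocated))   # 75.0
print(mean_allocation_ratio(series))    # 0.375
print(mean_or_none([]))                 # None
```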
SlowdownDataProvider
- class applications.throughput.providers.slowdown.SlowdownDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for slowdown metrics.
Thin facade over the slowdown service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = SlowdownDataProvider(workload)
>>> dist_dto, status = provider.get_slowdown_distribution()
>>> stats_dto, status = provider.get_slowdown_stats()
- get_slowdown_distribution(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) tuple[SlowdownDistributionDTO | None, ResultStatus]
Get slowdown PDF/CDF distribution.
- Parameters:
datetime_col – Timestamp column (SUBMIT or ELIGIBLE).
- Returns:
Tuple of (SlowdownDistributionDTO, ResultStatus.OK) on success, or (None, ResultStatus.NO_DATA/NO_RESULTS) when no data is available.
- get_slowdown_stats(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) tuple[SlowdownStatsDTO | None, ResultStatus]
Get slowdown descriptive statistics.
- Parameters:
datetime_col – Timestamp column (SUBMIT or ELIGIBLE).
- Returns:
Tuple of (SlowdownStatsDTO, ResultStatus.OK) on success, or (None, ResultStatus.NO_DATA/NO_RESULTS) when no data is available.
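As a rough illustration of what a PDF/CDF distribution contains, the sketch below computes an empirical frequency and cumulative fraction for each distinct value in a list. How the service actually bins values is not stated here, so treating every distinct value as its own bin is an assumption, as are the function name and sample data.

```python
from collections import Counter


def empirical_distribution(values):
    """Return rows of (value, frequency, cdf) for a list of observations."""
    if not values:
        return []
    counts = Counter(values)
    n = len(values)
    rows, cumulative = [], 0
    for value in sorted(counts):
        cumulative += counts[value]  # running count keeps the CDF exact at 1.0
        rows.append((value, counts[value] / n, cumulative / n))
    return rows


# Hypothetical observations, for illustration only.
for row in empirical_distribution([1, 1, 2, 4]):
    print(row)
# (1, 0.5, 0.5)
# (2, 0.25, 0.75)
# (4, 0.25, 1.0)
```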
StateDataProvider
- class applications.state.providers.state.StateDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for job state distribution analysis.
Thin adapter over the state service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = StateDataProvider(workload)
>>> dto, status = provider.get_jobs_status()
>>> for entry in dto.entries:
...     print(f"{entry.state}: {entry.count}")
- get_jobs_status(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE) tuple[JobsStatusDTO | None, ResultStatus]
Get flat job status distribution (one entry per state).
- Parameters:
category – What to measure per state. Use MetricCategory for autocompletion, e.g. MetricCategory.CORE_HOURS.
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
- Returns:
Tuple of (JobsStatusDTO, ResultStatus.OK) on success, or (None, ResultStatus) when no data is available.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
- get_jobs_status_grouped(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE, grouping_type: GroupingField | str = GroupingField.ACCOUNT, grouping_size: int = 10) tuple[GroupedJobsStatusDTO | None, ResultStatus]
Get grouped job status distribution (per-state, per-group breakdown).
- Parameters:
category – What to measure per state. Use MetricCategory for autocompletion, e.g. MetricCategory.CORE_HOURS.
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
grouping_type – Field to group by (e.g. “Account”, “UID”, “GID”).
grouping_size – Maximum number of groups (excess goes to “Others”).
- Returns:
Tuple of (GroupedJobsStatusDTO, ResultStatus.OK) on success, or (None, ResultStatus) when no data is available.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
- get_jobs_status_yms(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE) tuple[JobsStatusYmsDTO | None, ResultStatus]
Get flat year-month state distribution.
- Parameters:
category – What to measure per state. Use MetricCategory for autocompletion, e.g. MetricCategory.CORE_HOURS.
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
- Returns:
Tuple of (JobsStatusYmsDTO, ResultStatus.OK) on success, or (None, ResultStatus) when no data is available.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
- get_jobs_status_yms_grouped(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE, grouping_type: GroupingField | str = GroupingField.ACCOUNT, grouping_size: int = 10) tuple[GroupedJobsStatusYmsDTO | None, ResultStatus]
Get grouped year-month state distribution.
- Parameters:
category – What to measure per state. Use MetricCategory for autocompletion, e.g. MetricCategory.CORE_HOURS.
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
grouping_type – Field to group by.
grouping_size – Maximum number of groups.
- Returns:
Tuple of (GroupedJobsStatusYmsDTO, ResultStatus.OK) on success, or (None, ResultStatus) when no data is available.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
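A flat state distribution is often easier to read as shares of the total. The sketch below converts per-state entries into fractions. StateEntry is a hypothetical stand-in that mirrors the state and count attributes used in the class example; the state names and numbers are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class StateEntry:
    """Hypothetical stand-in for one entry of a jobs-status DTO."""
    state: str
    count: float


def state_shares(entries):
    """Map each state to its fraction of the total; empty dict if total is zero."""
    total = sum(entry.count for entry in entries)
    if total == 0:
        return {}
    return {entry.state: entry.count / total for entry in entries}


# Hypothetical entries, for illustration only.
entries = [StateEntry("COMPLETED", 80), StateEntry("FAILED", 15), StateEntry("TIMEOUT", 5)]
for state, share in state_shares(entries).items():
    print(f"{state}: {share:.1%}")
```

The same helper works regardless of the chosen category, since it only relies on each entry carrying a numeric count.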
SubmitDateDataProvider
- class applications.throughput.providers.submit_date.SubmitDateDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for submission date metrics.
Thin facade over the submit_date service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = SubmitDateDataProvider(workload)
>>> dto, status = provider.get_submit_hour()
- get_submit_hour() tuple[SubmitHourDTO | None, ResultStatus]
Get hour-of-day submission histogram.
- Returns:
(SubmitHourDTO, OK) with hour values (0–23) and job counts per hour. (None, NO_RESULTS) when no jobs match the filters.
- get_submit_hour_grouped(grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[dict] | None, ResultStatus]
Get hour-of-day submission histogram broken down by a grouping dimension.
- Parameters:
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
Tuple of (list of per-group dicts, ResultStatus). List is None when status is not OK.
- get_submit_weekday() tuple[SubmitWeekdayDTO | None, ResultStatus]
Get day-of-week submission histogram.
- Returns:
(SubmitWeekdayDTO, OK) with weekday names (Monday–Sunday) and job counts. (None, NO_RESULTS) when no jobs match the filters.
- get_submit_weekday_grouped(grouping_type: str = 'Account', grouping_size: int = 10) tuple[list[dict] | None, ResultStatus]
Get day-of-week submission histogram broken down by a grouping dimension.
- Parameters:
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
Tuple of (list of per-group dicts, ResultStatus). List is None when status is not OK.
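To make the two histogram shapes concrete, here is a minimal sketch that builds an hour-of-day (0–23) and a weekday (Monday–Sunday) histogram from a list of submit timestamps. It is not the service's implementation, which queries Elasticsearch; the function names and sample timestamps are assumptions.

```python
from collections import Counter
from datetime import datetime

WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def submit_hour_histogram(submit_times):
    """Count submissions per hour of day; hours with no jobs get 0."""
    counts = Counter(ts.hour for ts in submit_times)
    return {hour: counts.get(hour, 0) for hour in range(24)}


def submit_weekday_histogram(submit_times):
    """Count submissions per weekday name, Monday first."""
    counts = Counter(ts.weekday() for ts in submit_times)  # Monday == 0
    return {name: counts.get(i, 0) for i, name in enumerate(WEEKDAYS)}


# Hypothetical submit timestamps, for illustration only.
times = [
    datetime(2024, 1, 1, 9, 30),   # a Monday
    datetime(2024, 1, 1, 9, 45),
    datetime(2024, 1, 3, 14, 0),   # a Wednesday
]
print(submit_hour_histogram(times)[9])            # 2
print(submit_weekday_histogram(times)["Monday"])  # 2
```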
WaitTimeDataProvider
- class applications.throughput.providers.wait_time.WaitTimeDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for wait time metrics.
Thin facade over the wait_time service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = WaitTimeDataProvider(workload)
>>> dist, status = provider.get_wait_time_distribution()
>>> stats, status = provider.get_wait_time_stats()
- get_wait_time_distribution(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) tuple[WaitTimeDistributionDTO | None, ResultStatus]
Get wait time PDF/CDF distribution.
- Parameters:
datetime_col – Timestamp column (SUBMIT or ELIGIBLE).
- Returns:
(dto, ResultStatus.OK) with a DataFrame of [value, frequency, cdf], or (None, ResultStatus.NO_RESULTS) when no jobs match the filters.
- get_wait_time_stats(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) tuple[WaitTimeStatsDTO | None, ResultStatus]
Get wait time descriptive statistics.
- Parameters:
datetime_col – Timestamp column (SUBMIT or ELIGIBLE).
- Returns:
(dto, ResultStatus.OK) with descriptive statistics (data, columns, median), or (None, ResultStatus.NO_RESULTS) when no jobs match the filters.
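For readers who want a feel for the kind of values a descriptive-statistics result might carry, the sketch below computes a few of them with the standard library. The exact set of statistics in WaitTimeStatsDTO is not listed here, so the chosen keys, the describe helper and the sample wait times are all assumptions.

```python
import statistics


def describe(values):
    """Return basic descriptive statistics, or None for an empty input."""
    if not values:
        return None
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "mean": statistics.fmean(values),
        "median": statistics.median(values),
    }


# Hypothetical wait times in seconds, for illustration only.
print(describe([30, 60, 90, 600]))
# {'count': 4, 'min': 30, 'max': 600, 'mean': 195.0, 'median': 75.0}
```

The gap between mean and median in this sample shows why the median is worth reporting for wait times: a single long wait pulls the mean up sharply.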