Data Providers
Note
All get_* methods apply workload filters and date range automatically
unless stated otherwise.
ClusterLoadDataProvider
- class applications.load.providers.cluster_load.ClusterLoadDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for cluster load metrics (cores, cost, CO₂, power).
Thin adapter over the cluster_load service. Resolves the accounting data source, workload filters, and hardware configuration from the workload, then delegates computation to the service layer.
Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.
Example
>>> provider = ClusterLoadDataProvider(workload)
>>> load = provider.get_core_load()
>>> print(f"Utilization: {load.utilization_rate:.2%}")
>>>
>>> daily = provider.get_core_load(resolution=Resolution.DAY)
>>> grouped = provider.get_core_load_by_group(grouping_key="Account")
- get_co2_load(resolution: Resolution = Resolution.HOUR) Co2LoadDTO
Get cluster CO₂ footprint timeseries (kgCO₂e).
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
- Returns:
Co2LoadDTO with CO₂ timeseries and statistics.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_co2_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedCo2LoadDTO
Get CO₂ footprint grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: Resolution.HOUR.
- Returns:
GroupedCo2LoadDTO with one GroupEntryDTO per group (no waiting_values).
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
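Example (a sketch following the class example's conventions; fields of GroupEntryDTO other than the group label are not enumerated in this reference):
>>> grouped = provider.get_co2_load_by_group(grouping_key="Partition", grouping_size=5)
>>> for g in grouped.groups:
...     print(g.group)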
- get_core_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) CoreLoadDTO
Get allocated cores timeseries with hardware capacity.
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
CoreLoadDTO with allocated/requested cores, capacity, and utilization.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
Example
>>> load = provider.get_core_load()
>>> print(f"Utilization: {load.utilization_rate:.1%}")
>>> load = provider.get_core_load(resolution=Resolution.DAY)
- get_core_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GroupedCoreLoadDTO
Get core load grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
GroupedCoreLoadDTO with one GroupEntryDTO per group.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
Example
>>> grouped = provider.get_core_load_by_group(grouping_key="Account")
>>> for g in grouped.groups:
...     print(g.group, g.avg_running)
- get_corehour_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) CorehourLoadDTO
Get core-hour load timeseries (allocated cores × elapsed time).
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
CorehourLoadDTO with core-hour accounting fields.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
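Example (a sketch using only the documented signature; CorehourLoadDTO field names are not listed in this reference):
>>> ch = provider.get_corehour_load(resolution=Resolution.DAY)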
- get_cost_load(resolution: Resolution = Resolution.HOUR) CostLoadDTO
Get cluster cost load timeseries.
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
- Returns:
CostLoadDTO with cost timeseries, unit, and statistics.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_cost_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedCostLoadDTO
Get cost load grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: Resolution.HOUR.
- Returns:
GroupedCostLoadDTO with one GroupEntryDTO per group (no waiting_values).
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
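Example (a sketch using only the documented parameters):
>>> grouped = provider.get_cost_load_by_group(grouping_key="Account", resolution=Resolution.DAY)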
- get_power_load(resolution: Resolution = Resolution.HOUR) PowerLoadDTO
Get cluster power consumption timeseries (watts).
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
- Returns:
PowerLoadDTO with power timeseries and statistics.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_power_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedPowerLoadDTO
Get power consumption grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: Resolution.HOUR.
- Returns:
GroupedPowerLoadDTO with one GroupEntryDTO per group (no waiting_values).
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
CongestionDataProvider
- class applications.congestion.providers.congestion.CongestionDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for congestion/contention analysis.
Thin adapter over the congestion service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = CongestionDataProvider(workload)
>>> metrics = provider.get_congestion_metrics()
>>> print(f"Avg running: {metrics.avg_running_pct:.1f}%")
- get_congestion_metrics(data_type: DataType = DataType.CORE, resolution: Resolution = Resolution.DAY) CongestionMetricsDTO
Compute congestion metrics for the workload.
- Parameters:
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
resolution – Time bucket size (e.g. “1day”, “1hour”).
- Returns:
CongestionMetricsDTO with timeseries and stats.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
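Example (a sketch switching to the documented GPU dimension with a coarser bucket):
>>> gpu_metrics = provider.get_congestion_metrics(data_type=DataType.GPU, resolution=Resolution.DAY)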
CoresDataProvider
- class applications.resources.providers.cores.CoresDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for core allocation distribution analysis.
Thin facade over the cores service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = CoresDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
- get_flat_distribution() CoresFlatDistributionDTO
Get ungrouped core allocation distribution.
- Returns:
CoresFlatDistributionDTO with per-bin statistics and extended descriptive stats. Status is NO_DATA when no jobs have allocated cores.
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_distribution(grouping_type: str, grouping_size: int = 10) CoresGroupedDistributionDTO
Get grouped core allocation distribution.
- Parameters:
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.
- Returns:
CoresGroupedDistributionDTO with per-group statistics. Status is NO_DATA when no jobs have allocated cores, or NO_RESULTS when the grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
CoresMemoryDataProvider
- class applications.resources.providers.cores_memory.CoresMemoryDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for cores-vs-memory matrix analysis.
Example
>>> provider = CoresMemoryDataProvider(workload)
>>> result = provider.get_flat_matrix(constants.MAXRSS)
>>> result = provider.get_grouped_matrix(constants.MAXRSS, grouping_type="uid")
- get_flat_matrix(req_or_consumed: str = 'MaxRSS') CoresMemoryFlatMatrixDTO
Get ungrouped cores-vs-memory 2D matrix.
- Parameters:
req_or_consumed – Memory field — constants.REQMEM or constants.MAXRSS.
- Returns:
CoresMemoryFlatMatrixDTO with per-bin job counts and core-hours. Status is NO_DATA when the memory field is absent or no jobs have allocated cores/memory.
- Raises:
DataProviderError – If the data source cannot be resolved.
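Example (a sketch contrasting requested vs. consumed memory with the documented constants):
>>> requested = provider.get_flat_matrix(constants.REQMEM)
>>> consumed = provider.get_flat_matrix(constants.MAXRSS)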
- get_grouped_matrix(req_or_consumed: str = 'MaxRSS', grouping_type: str = 'uid', grouping_size: int = 10) CoresMemoryGroupedMatrixDTO
Get grouped cores-vs-memory 2D matrix.
- Parameters:
req_or_consumed – Memory field — constants.REQMEM or constants.MAXRSS.
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return.
- Returns:
CoresMemoryGroupedMatrixDTO with per-group matrix data. Status is NO_DATA when the memory field is absent, or NO_RESULTS when grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
ExecTimeDataProvider
- class applications.throughput.providers.exec_time.ExecTimeDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for execution time metrics.
Thin facade over the exec_time service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = ExecTimeDataProvider(workload)
>>> result = provider.get_exec_time()
- get_exec_time(data_type: DataType = DataType.CORE) ExecTimeDistributionDTO
Get execution time histogram.
- Parameters:
data_type – Resource dimension — CPU or GPU.
- Returns:
ExecTimeDistributionDTO with bins and resource-hour sums.
- get_exec_time_grouped(data_type: DataType = DataType.CORE, grouping_type: str = 'Account', grouping_size: int = 10) list[GroupedExecTimeEntryDTO] | None
Get grouped execution time histogram.
- Parameters:
data_type – Resource dimension — CPU or GPU.
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
List of GroupedExecTimeEntryDTO entries with grouped exec time data, or None.
- get_exec_time_vs_timelimit(config: ExecTimeVsTimelimitConfig | None = None) ExecTimeVsTimelimitDTO
Get execution time vs timelimit ratio distribution.
- Parameters:
config – Override configuration. When None, loads from DB.
- Returns:
ExecTimeVsTimelimitDTO with categorized ratio data.
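Example (a sketch; per the parameter description, leaving config unset loads it from the DB):
>>> result = provider.get_exec_time_vs_timelimit()  # config=None, loaded from DB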
- get_exec_time_vs_timelimit_grouped(config: ExecTimeVsTimelimitConfig | None = None, grouping_type: str = 'Account', grouping_size: int = 10) list[GroupedExecTimeVsTimelimitEntryDTO] | None
Get grouped execution time vs timelimit ratio distribution.
- Parameters:
config – Override configuration. When None, loads from DB.
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
List of GroupedExecTimeVsTimelimitEntryDTO entries with grouped ratio data, or None.
GpuDataProvider
- class applications.resources.providers.gpu.GpuDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for GPU distribution analysis.
Thin facade over the GPU service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = GpuDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
- get_flat_distribution(gpu_field: GpuAccountingField = GpuAccountingField.ALLOC_GPUS) GpuFlatDistributionDTO
Get ungrouped GPU distribution.
- Parameters:
gpu_field – ES field name — GpuAccountingField.ALLOC_GPUS or GpuAccountingField.REQ_GPUS.
- Returns:
GpuFlatDistributionDTO with per-bin statistics and extended descriptive stats. Status is NO_DATA when the field is absent or no jobs have a positive GPU value.
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_distribution(grouping_type: str, grouping_size: int = 10, gpu_field: GpuAccountingField = GpuAccountingField.ALLOC_GPUS) GpuGroupedDistributionDTO
Get grouped GPU distribution.
- Parameters:
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.
gpu_field – ES field name — GpuAccountingField.ALLOC_GPUS or GpuAccountingField.REQ_GPUS.
- Returns:
GpuGroupedDistributionDTO with per-group statistics. Status is NO_DATA when the field is absent or no jobs have a positive GPU value, or NO_RESULTS when the grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
GpuLoadDataProvider
- class applications.load.providers.gpu_load.GpuLoadDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for GPU load metrics.
Thin adapter over the gpu_load service. Resolves the accounting data source, workload filters, and hardware GPU configuration from the workload, then delegates computation to the service layer.
Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.
Example
>>> provider = GpuLoadDataProvider(workload)
>>> load = provider.get_gpu_load()
>>> print(f"GPU Utilization: {load.utilization_rate:.2%}")
>>>
>>> grouped = provider.get_gpu_load_by_group(grouping_key="Account")
- get_gpu_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GpuLoadDTO
Get allocated GPUs timeseries with hardware capacity.
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
GpuLoadDTO with allocated/requested GPUs, capacity, and utilization.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
Example
>>> load = provider.get_gpu_load()
>>> print(f"GPU Utilization: {load.utilization_rate:.1%}")
- get_gpu_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GroupedGpuLoadDTO
Get GPU load grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
GroupedGpuLoadDTO with one GroupEntryDTO per group.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
- get_gpuhour_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GpuhourLoadDTO
Get GPU-hour load timeseries (allocated GPUs × elapsed time).
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
GpuhourLoadDTO with GPU-hour accounting fields.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
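Example (a sketch using only the documented signature; GpuhourLoadDTO field names are not listed in this reference):
>>> gh = provider.get_gpuhour_load(resolution=Resolution.DAY)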
JobFrequencyDataProvider
- class applications.throughput.providers.job_freq.JobFrequencyDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for job frequency and interarrival metrics.
Thin facade over the job_freq service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = JobFrequencyDataProvider(workload)
>>> result = provider.get_job_frequency()
- get_interarrival(datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT) InterArrivalDTO
Get job interarrival time statistics.
- Parameters:
datetime_col – Timestamp column.
- Returns:
InterArrivalDTO with descriptive stats.
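Example (a sketch using only the documented signature; the DTO's individual stat fields are not enumerated in this reference):
>>> ia = provider.get_interarrival(datetime_col=ThroughputDatetimeCol.SUBMIT)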
- get_interarrival_grouped(datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT, grouping_type: str = 'Account', grouping_size: int = 10) dict
Get grouped interarrival time statistics.
- Parameters:
datetime_col – Timestamp column.
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
Dict with grouped interarrival data.
- get_job_frequency(resolution: Resolution = Resolution.DAY, datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT) JobFrequencyDTO
Get job submission frequency timeseries.
- Parameters:
resolution – Time bucket size.
datetime_col – Timestamp column for bucketing.
- Returns:
JobFrequencyDTO with timeseries and stats.
- get_job_frequency_grouped(resolution: Resolution = Resolution.DAY, datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT, grouping_type: str = 'Account', grouping_size: int = 10) list
Get grouped job frequency timeseries.
- Parameters:
resolution – Time bucket size.
datetime_col – Timestamp column.
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
List of grouped frequency data dicts.
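Example (a sketch using only the documented parameters):
>>> series = provider.get_job_frequency_grouped(grouping_type="Account", grouping_size=5)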
JobLoadDataProvider
- class applications.load.providers.job_load.JobLoadDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for job count load metrics.
Thin adapter over the job_load service. Resolves the accounting data source and workload filters, then delegates computation to the service layer.
Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.
Example
>>> provider = JobLoadDataProvider(workload)
>>> load = provider.get_job_load()
>>> print(f"Avg running jobs: {load.avg_running:.1f}")
>>>
>>> grouped = provider.get_job_load_by_group(grouping_key="Account")
- get_job_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) JobLoadDTO
Get job count timeseries for RUNNING and WAITING states.
- Parameters:
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
JobLoadDTO with running/waiting job counts and statistics.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
Example
>>> load = provider.get_job_load()
>>> print(f"Avg running: {load.avg_running:.1f}")
>>> load = provider.get_job_load(resolution=Resolution.DAY)
- get_job_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GroupedJobLoadDTO
Get job count load grouped by a field (e.g., Account, Partition).
- Parameters:
grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size. Default: Resolution.HOUR.
datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.
- Returns:
GroupedJobLoadDTO with one GroupEntryDTO per group.
- Raises:
DataProviderError – If accounting filters are not configured or query fails.
JobsDataProvider
- class applications.ait.providers.jobs.JobsDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for querying job records from accounting data.
Thin adapter over the jobs service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = JobsDataProvider(workload)
>>> page = provider.get_jobs_page(sort_field="Submit", limit=50)
>>> print(f"Showing {len(page.jobs)} of {page.total_count} jobs")
- get_available_fields(max_fields: int = 100) JobFieldsDTO
Get priority-ordered field names from the ES mapping.
- Parameters:
max_fields – Maximum number of fields to return.
- Returns:
JobFieldsDTO with ordered field list and total count.
- Raises:
DataProviderError – If data source is unavailable.
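Example (a sketch, e.g. for building a column picker before calling get_jobs_page()):
>>> fields = provider.get_available_fields(max_fields=25)  # JobFieldsDTO: ordered field list and total count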
- get_job_count() JobCountDTO
Count jobs matching the workload filters.
- Returns:
JobCountDTO with total count.
- Raises:
DataProviderError – If accounting filters are missing or query fails.
- get_jobs_page(sort_field: str | None = None, sort_order: str = 'asc', offset: int = 0, limit: int = 25) JobsPageDTO
Fetch a paginated page of job records.
- Parameters:
sort_field – Field to sort by.
sort_order – Sort direction (“asc” or “desc”).
offset – Number of records to skip.
limit – Number of records to return.
- Returns:
JobsPageDTO with job dicts and pagination metadata.
- Raises:
DataProviderError – If accounting filters are missing or query fails.
KpiDataProvider
- class applications.kpi.providers.kpi.KpiDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for KPI dashboard metrics.
Thin facade over the KPI service layer. Resolves the accounting data source and workload filters, then delegates to individual services.
Example
>>> provider = KpiDataProvider(workload)
>>> logs = provider.get_logs_metrics()
>>> cost = provider.get_cost_metrics()
- get_carbon_footprint() CarbonFootprintDTO
Get CO₂ emissions.
- Returns:
CarbonFootprintDTO with total carbon footprint.
- get_core_stats() HardwareStatsDTO
Get core count statistics (min/max/avg).
Calls get_logs_metrics() internally; the result is memoized, so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.
- Returns:
HardwareStatsDTO for cores over the period.
- get_cost_metrics() CostMetricsDTO
Get total cost.
- Returns:
CostMetricsDTO with total cost.
- get_energy_metrics() EnergyMetricsDTO
Get power and energy consumption.
- Returns:
EnergyMetricsDTO with power and/or energy results.
- get_gpu_stats() HardwareStatsDTO
Get GPU count statistics (min/max/avg).
Calls get_logs_metrics() internally; the result is memoized, so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.
- Returns:
HardwareStatsDTO for GPUs over the period.
- get_job_count() int
Get total number of jobs.
Calls get_logs_metrics() internally; the result is memoized, so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.
- Returns:
Job count from the logs metrics.
- get_logs_metrics() LogsMetricsDTO
Get job count, resource hours, usage, and hardware stats.
- Returns:
LogsMetricsDTO with all job-level KPI metrics.
- get_user_count() int
Get number of unique users.
Calls get_logs_metrics() internally; the result is memoized, so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.
- Returns:
Unique user count from the logs metrics.
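Example (a sketch; per the memoization notes above, these three calls share a single logs-metrics ES query):
>>> jobs = provider.get_job_count()
>>> users = provider.get_user_count()
>>> cores = provider.get_core_stats()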
MemoryDataProvider
- class applications.resources.providers.memory.MemoryDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for memory distribution analysis.
Example
>>> provider = MemoryDataProvider(workload)
>>> result = provider.get_memory_flat_distribution(MemoryAccountingField.MAX_RSS)
>>> grouped = provider.get_memory_grouped_distribution(MemoryAccountingField.MAX_RSS, "Account")
- get_memory_flat_distribution(req_or_consumed: MemoryAccountingField = MemoryAccountingField.MAX_RSS) MemoryFlatDistributionDTO
Get ungrouped memory distribution.
- Parameters:
req_or_consumed – Memory field — MemoryAccountingField.REQMEM or MemoryAccountingField.MAX_RSS.
- Returns:
MemoryFlatDistributionDTO with memory bin series and extended descriptive stats.
- get_memory_grouped_distribution(req_or_consumed: MemoryAccountingField = MemoryAccountingField.MAX_RSS, grouping_type: str = 'Account', grouping_size: int = 10) MemoryGroupedDistributionDTO
Get grouped memory distribution.
- Parameters:
req_or_consumed – Memory field — MemoryAccountingField.REQMEM or MemoryAccountingField.MAX_RSS.
grouping_type – ES field used for grouping (e.g. "Account", "UID").
grouping_size – Number of top groups to return.
- Returns:
MemoryGroupedDistributionDTO with per-group per-bin job counts and grouping summary stats.
MemoryRatioDataProvider
- class applications.resources.providers.consumed_vs_requested_memory.MemoryRatioDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for consumed-vs-requested memory ratio analysis.
Example
>>> provider = MemoryRatioDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
- get_flat_distribution() MemoryRatioFlatDistributionDTO
Get ungrouped consumed-vs-requested memory ratio distribution.
- Returns:
MemoryRatioFlatDistributionDTO with per-bin job counts and extended descriptive stats. Status is NO_DATA when the MEMORYRATIO field is absent or all values are null/zero.
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_distribution(grouping_type: str, grouping_size: int = 10) MemoryRatioGroupedDistributionDTO
Get grouped consumed-vs-requested memory ratio distribution.
- Parameters:
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.
- Returns:
MemoryRatioGroupedDistributionDTO with per-group per-bin job counts. Status is NO_DATA when the MEMORYRATIO field is absent, or NO_RESULTS when grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
NodesDataProvider
- class applications.resources.providers.nodes.NodesDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for node allocation distribution analysis.
Thin facade over the nodes service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = NodesDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
- get_flat_distribution() NodesFlatDistributionDTO
Get ungrouped node allocation distribution.
- Returns:
NodesFlatDistributionDTO with per-bin statistics and extended descriptive stats. Status is NO_DATA when no jobs have a positive node allocation value.
- Raises:
DataProviderError – If the data source cannot be resolved.
- get_grouped_distribution(grouping_type: str, grouping_size: int = 10) NodesGroupedDistributionDTO
Get grouped node allocation distribution.
- Parameters:
grouping_type – ES field to group by (e.g. "uid", "account").
grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.
- Returns:
NodesGroupedDistributionDTO with per-group statistics. Status is NO_DATA when no jobs have a positive node value, or NO_RESULTS when the grouping yields no groups.
- Raises:
DataProviderError – If the data source cannot be resolved.
NodesLoadDataProvider
- class applications.load.providers.nodes_load.NodesLoadDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for per-node job count metrics.
Thin wrapper around the nodes_load service. Resolves the accounting data source and filters from the workload, then delegates to the service.
Example
>>> provider = NodesLoadDataProvider(workload)
>>> counts = provider.get_nb_jobs_per_node()
>>> print(counts.node_counts)
{'node001': 42, 'node002': 15, ...}
- get_nb_jobs_per_node() NodesJobCountDTO
Get job count per node over the workload’s date range.
- Returns:
NodesJobCountDTO with node_counts mapping and nodes_per_line_display hint.
- Raises:
DataProviderError – If accounting filters are not configured or the Elasticsearch query fails.
OccupancyDataProvider
- class applications.load.providers.occupancy.OccupancyDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for cluster cores and nodes occupancy metrics.
Thin wrapper around the occupancy service. Resolves the monitoring data source and filters from the workload, then delegates to the service.
Unlike other load providers, occupancy data comes from the monitoring data source (nodes_stats_fetch pipeline → cores_stats index), not the accounting data source.
Example
>>> provider = OccupancyDataProvider(workload)
>>> cores = provider.get_cores_occupancy(resolution="1hour")
>>> print(f"Avg allocated: {sum(cores.allocated) / len(cores.allocated):.0f}")
>>>
>>> nodes = provider.get_nodes_occupancy(resolution="1day")
>>> grouped = provider.get_cores_occupancy_by_group("Partition", grouping_size=5)
- get_cores_occupancy(resolution: Resolution) CoresOccupancyDTO
Get cores occupancy timeseries from the monitoring index.
- Parameters:
resolution – Time bucket size string (e.g. "1hour", "1day"). This is a required parameter — no default is provided because occupancy always needs an explicit resolution.
- Returns:
CoresOccupancyDTO with per-state core counts and total capacity.
- Raises:
DataProviderError – If monitoring filters are not configured, no data is available, or the Elasticsearch query fails.
Example
>>> cores = provider.get_cores_occupancy(resolution="1hour")
>>> for ts, alloc in zip(cores.timestamps, cores.allocated):
...     print(f"{ts}: {alloc:.0f} cores allocated")
- get_cores_occupancy_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedCoresOccupancyDTO
Get cores occupancy grouped by a field (e.g., Partition, Queue).
- Parameters:
grouping_key – Elasticsearch field to group by.
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size string. Default: "1hour".
- Returns:
GroupedCoresOccupancyDTO with one CoresOccupancyGroupEntryDTO per group.
- Raises:
DataProviderError – If monitoring filters are not configured or query fails.
Example
>>> grouped = provider.get_cores_occupancy_by_group("Partition")
>>> for g in grouped.groups:
...     print(g.group, sum(g.allocated) / len(g.allocated))
- get_nodes_occupancy(resolution: Resolution) NodesOccupancyDTO
Get nodes occupancy timeseries from the monitoring index.
- Parameters:
resolution – Time bucket size string (e.g. "1hour", "1day"). Required — no default provided.
- Returns:
NodesOccupancyDTO with per-state node counts and totals.
- Raises:
DataProviderError – If monitoring filters are not configured, no data is available, or the Elasticsearch query fails.
Example
>>> nodes = provider.get_nodes_occupancy(resolution="1day")
>>> for ts, avail in zip(nodes.timestamps, nodes.available):
...     print(f"{ts}: {avail:.0f} nodes available")
- get_nodes_occupancy_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedNodesOccupancyDTO
Get nodes occupancy grouped by a field (e.g., Partition, Queue).
- Parameters:
grouping_key – Elasticsearch field to group by.
grouping_size – Number of top groups to show individually. Default: 10.
resolution – Time bucket size string. Default: "1hour".
- Returns:
GroupedNodesOccupancyDTO with one NodesOccupancyGroupEntryDTO per group.
- Raises:
DataProviderError – If monitoring filters are not configured or query fails.
Example
>>> grouped = provider.get_nodes_occupancy_by_group("Partition")
>>> for g in grouped.groups:
...     print(g.group, sum(g.available) / len(g.available))
SlowdownDataProvider
- class applications.throughput.providers.slowdown.SlowdownDataProvider(workload: Workload)
Bases: BaseDataProvider
Provider for slowdown metrics.
Thin facade over the slowdown service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = SlowdownDataProvider(workload)
>>> dist = provider.get_slowdown_distribution()
>>> stats = provider.get_slowdown_stats()
- get_slowdown_distribution(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) SlowdownDistributionDTO
Get slowdown PDF/CDF distribution.
- Parameters:
datetime_col – Timestamp column (SUBMIT or ELIGIBLE).
- Returns:
SlowdownDistributionDTO with a DataFrame of [value, frequency, cdf].
- get_slowdown_stats(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) SlowdownStatsDTO
Get slowdown descriptive statistics.
- Parameters:
datetime_col – Timestamp column (SUBMIT or ELIGIBLE).
- Returns:
SlowdownStatsDTO with descriptive statistics (data, columns, median).
StateDataProvider
- class applications.state.providers.state.StateDataProvider(workload: Workload)
Bases:
BaseDataProvider
Provider for job state distribution analysis.
Thin adapter over the state service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = StateDataProvider(workload)
>>> status = provider.get_jobs_status()
>>> for entry in status.entries:
...     print(f"{entry.state}: {entry.count}")
- get_jobs_status(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE) JobsStatusDTO
Get flat job status distribution (one entry per state).
- Parameters:
category – What to measure per state. Use MetricCategory for autocompletion — e.g. MetricCategory.CORE_HOURS.
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
- Returns:
JobsStatusDTO with per-state counts and totals.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
- get_jobs_status_grouped(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE, grouping_type: GroupingField | str = GroupingField.ACCOUNT, grouping_size: int = 10) GroupedJobsStatusDTO
Get grouped job status distribution (per-state, per-group breakdown).
- Parameters:
category – What to measure per state. Use MetricCategory for autocompletion — e.g. MetricCategory.CORE_HOURS.
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
grouping_type – Field to group by (e.g. “Account”, “UID”, “GID”).
grouping_size – Maximum number of groups (excess goes to “Others”).
- Returns:
GroupedJobsStatusDTO with per-state, per-group breakdowns.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
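The grouping_size behaviour — keep the largest groups individually, fold the rest into “Others” — can be sketched independently of the provider. The helper name top_n_with_others and the per-account counts below are invented for illustration:

```python
from collections import Counter

def top_n_with_others(counts: dict[str, int], grouping_size: int) -> dict[str, int]:
    """Keep the `grouping_size` largest groups; fold the remainder into 'Others'."""
    ranked = Counter(counts).most_common()          # sorted by count, descending
    kept = dict(ranked[:grouping_size])
    others = sum(c for _, c in ranked[grouping_size:])
    if others:
        kept["Others"] = others
    return kept

# Hypothetical per-account job counts.
jobs_per_account = {"acct_a": 120, "acct_b": 80, "acct_c": 5, "acct_d": 3}
print(top_n_with_others(jobs_per_account, grouping_size=2))
# {'acct_a': 120, 'acct_b': 80, 'Others': 8}
```

Note that “Others” only appears when at least one group was actually folded, so totals across the returned groups always match the unfiltered totals.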
- get_jobs_status_yms(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE) JobsStatusYmsDTO
Get flat year-month state distribution.
- Parameters:
category – What to measure per state. Use MetricCategory for autocompletion — e.g. MetricCategory.CORE_HOURS.
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
- Returns:
JobsStatusYmsDTO with per-month, per-state breakdowns.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
- get_jobs_status_yms_grouped(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE, grouping_type: GroupingField | str = GroupingField.ACCOUNT, grouping_size: int = 10) GroupedJobsStatusYmsDTO
Get grouped year-month state distribution.
- Parameters:
category – What to measure per state. Use MetricCategory for autocompletion — e.g. MetricCategory.CORE_HOURS.
data_type – Resource dimension. Use DataType.GPU for GPU metrics.
grouping_type – Field to group by.
grouping_size – Maximum number of groups.
- Returns:
GroupedJobsStatusYmsDTO with per-group, per-month breakdowns.
- Raises:
DataProviderError – If accounting filters are missing or computation fails.
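The per-month, per-state shape the *_yms methods describe corresponds to a year-month cross-tabulation. A minimal pandas sketch, with job records and column names ("End", "State") invented for illustration:

```python
import pandas as pd

# Hypothetical job records: an end timestamp and a final scheduler state.
jobs = pd.DataFrame({
    "End": pd.to_datetime(["2024-01-05", "2024-01-20", "2024-02-02"]),
    "State": ["COMPLETED", "FAILED", "COMPLETED"],
})

# Year-month bucket per job, then job counts per (month, state) pair.
jobs["ym"] = jobs["End"].dt.to_period("M").astype(str)
yms = pd.crosstab(jobs["ym"], jobs["State"])
print(yms)
```

Missing (month, state) combinations come out as 0, so every month row covers the full set of observed states.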
SubmitDateDataProvider
- class applications.throughput.providers.submit_date.SubmitDateDataProvider(workload: Workload)
Bases:
BaseDataProvider
Provider for submission date metrics.
Thin facade over the submit_date service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = SubmitDateDataProvider(workload)
>>> result = provider.get_submit_hour()
- get_submit_hour() SubmitHourDTO
Get hour-of-day submission histogram.
- Returns:
SubmitHourDTO with hour values (0–23) and job counts per hour.
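What such an hour-of-day histogram contains can be shown with a self-contained sketch; the submission timestamps below are invented, and this is only a plausible shape for the computation, not the service's code:

```python
import pandas as pd

# Hypothetical submission timestamps.
submits = pd.to_datetime(["2024-03-01 09:15", "2024-03-01 09:40", "2024-03-02 17:05"])

# Count jobs per hour of day, keeping all 24 buckets (0-23) even when empty.
counts = (
    pd.Series(submits).dt.hour
    .value_counts()
    .reindex(range(24), fill_value=0)
    .sort_index()
)
print(counts[counts > 0])
```

Keeping empty buckets matters for plotting: the DTO's hour axis stays a fixed 0–23 regardless of when jobs were actually submitted.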
- get_submit_hour_grouped(grouping_type: str = 'Account', grouping_size: int = 10) list | None
Get hour-of-day submission histogram broken down by a grouping dimension.
- Parameters:
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
List of per-group dicts with submit_hour and submit_hour_count, or None when no jobs match the filters.
- get_submit_weekday() SubmitWeekdayDTO
Get day-of-week submission histogram.
- Returns:
SubmitWeekdayDTO with weekday names (Monday–Sunday) and job counts.
- get_submit_weekday_grouped(grouping_type: str = 'Account', grouping_size: int = 10) list | None
Get day-of-week submission histogram broken down by a grouping dimension.
- Parameters:
grouping_type – ES field to group by.
grouping_size – Max number of groups.
- Returns:
List of per-group dicts with submit_weekday and submit_weekday_count, or None when no jobs match the filters.
WaitTimeDataProvider
- class applications.throughput.providers.wait_time.WaitTimeDataProvider(workload: Workload)
Bases:
BaseDataProvider
Provider for wait time metrics.
Thin facade over the wait_time service. Resolves the accounting data source and workload filters, then delegates to the service layer.
Example
>>> provider = WaitTimeDataProvider(workload)
>>> dist = provider.get_wait_time_distribution()
>>> stats = provider.get_wait_time_stats()
- get_wait_time_distribution(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) WaitTimeDistributionDTO
Get wait time PDF/CDF distribution.
- Parameters:
datetime_col – Timestamp column (SUBMIT or ELIGIBLE).
- Returns:
WaitTimeDistributionDTO with a DataFrame of [value, frequency, cdf].
- get_wait_time_stats(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) WaitTimeStatsDTO
Get wait time descriptive statistics.
- Parameters:
datetime_col – Timestamp column (SUBMIT or ELIGIBLE).
- Returns:
WaitTimeStatsDTO with descriptive statistics (data, columns, median).
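A sketch of what “descriptive statistics (data, columns, median)” amounts to, using pandas describe on invented wait times in seconds; whether the service actually uses describe is an assumption:

```python
import pandas as pd

# Hypothetical wait times in seconds.
wait = pd.Series([30, 60, 90, 600], name="wait_time")

stats = wait.describe()   # count, mean, std, min, quartiles, max
median = wait.median()    # reported separately, as in the DTO description
print(stats)
print(f"median wait: {median:.0f}s")
```

The median is worth reporting alongside the mean because wait-time distributions are typically heavy-tailed: here a single 600 s outlier pulls the mean to 195 s while the median stays at 75 s.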