Data Providers

Note

All get_* methods automatically apply the workload's filters and date range unless stated otherwise.

ClusterLoadDataProvider

class applications.load.providers.cluster_load.ClusterLoadDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for cluster load metrics (cores, cost, CO₂, power).

Thin adapter over the cluster_load service. Resolves the accounting data source, workload filters, and hardware configuration from the workload, then delegates computation to the service layer.

Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.

Example

>>> provider = ClusterLoadDataProvider(workload)
>>> load = provider.get_core_load()
>>> print(f"Utilization: {load.utilization_rate:.2%}")
>>>
>>> daily = provider.get_core_load(resolution=Resolution.DAY)
>>> grouped = provider.get_core_load_by_group(grouping_key="Account")
get_co2_load(resolution: Resolution = Resolution.HOUR) Co2LoadDTO

Get cluster CO₂ footprint timeseries (kgCO₂e).

Parameters:

resolution – Time bucket size. Default: Resolution.HOUR.

Returns:

Co2LoadDTO with CO₂ timeseries and statistics.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

Co2LoadDTO
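Conceptually, the CO₂ timeseries scales energy consumption by a grid carbon-intensity factor. A minimal sketch of that idea (the 0.4 kgCO₂e/kWh default is an illustrative assumption, not the service's configured value):

```python
# Hedged sketch: derive a per-bucket CO2e (kg) timeseries from an energy
# timeseries in kWh using a fixed carbon-intensity factor. The real
# service may use a site-specific or time-varying factor.
def co2_from_energy(energy_kwh: list[float],
                    intensity_kg_per_kwh: float = 0.4) -> list[float]:
    """Return per-bucket CO2e (kg) for an energy timeseries in kWh."""
    return [e * intensity_kg_per_kwh for e in energy_kwh]
```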

get_co2_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedCo2LoadDTO

Get CO₂ footprint grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

Returns:

GroupedCo2LoadDTO with one GroupEntryDTO per group (no waiting_values).

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

GroupedCo2LoadDTO

get_core_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) CoreLoadDTO

Get allocated cores timeseries with hardware capacity.

Parameters:
  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

CoreLoadDTO with allocated/requested cores, capacity, and utilization.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Example

>>> load = provider.get_core_load()
>>> print(f"Utilization: {load.utilization_rate:.1%}")
>>> load = provider.get_core_load(resolution=Resolution.DAY)
Return type:

CoreLoadDTO

get_core_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GroupedCoreLoadDTO

Get core load grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

GroupedCoreLoadDTO with one GroupEntryDTO per group.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Example

>>> grouped = provider.get_core_load_by_group(grouping_key="Account")
>>> for g in grouped.groups:
...     print(g.group, g.avg_running)
Return type:

GroupedCoreLoadDTO

get_corehour_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) CorehourLoadDTO

Get core-hour load timeseries (allocated cores × elapsed time).

Parameters:
  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

CorehourLoadDTO with core-hour accounting fields.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

CorehourLoadDTO
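The core-hour definition above (allocated cores × elapsed time) can be sketched directly; the job field names here are illustrative, not the accounting schema's actual keys:

```python
# Hedged sketch of core-hour accounting: sum of allocated cores times
# elapsed time, expressed in hours. Field names are illustrative.
def corehours(jobs: list[dict]) -> float:
    """Sum core-hours over jobs with 'alloc_cores' and 'elapsed_seconds' keys."""
    return sum(j["alloc_cores"] * j["elapsed_seconds"] / 3600.0 for j in jobs)
```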

get_cost_load(resolution: Resolution = Resolution.HOUR) CostLoadDTO

Get cluster cost load timeseries.

Parameters:

resolution – Time bucket size. Default: Resolution.HOUR.

Returns:

CostLoadDTO with cost timeseries, unit, and statistics.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

CostLoadDTO

get_cost_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedCostLoadDTO

Get cost load grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

Returns:

GroupedCostLoadDTO with one GroupEntryDTO per group (no waiting_values).

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

GroupedCostLoadDTO

get_power_load(resolution: Resolution = Resolution.HOUR) PowerLoadDTO

Get cluster power consumption timeseries (watts).

Parameters:

resolution – Time bucket size. Default: Resolution.HOUR.

Returns:

PowerLoadDTO with power timeseries and statistics.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

PowerLoadDTO

get_power_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedPowerLoadDTO

Get power consumption grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

Returns:

GroupedPowerLoadDTO with one GroupEntryDTO per group (no waiting_values).

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

GroupedPowerLoadDTO

CongestionDataProvider

class applications.congestion.providers.congestion.CongestionDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for congestion/contention analysis.

Thin adapter over the congestion service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = CongestionDataProvider(workload)
>>> metrics = provider.get_congestion_metrics()
>>> print(f"Avg running: {metrics.avg_running_pct:.1f}%")
get_congestion_metrics(data_type: DataType = DataType.CORE, resolution: Resolution = Resolution.DAY) CongestionMetricsDTO

Compute congestion metrics for the workload.

Parameters:
  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

  • resolution – Time bucket size. Default: Resolution.DAY.

Returns:

CongestionMetricsDTO with timeseries and stats.

Raises:

DataProviderError – If accounting filters are missing or computation fails.

CoresDataProvider

class applications.resources.providers.cores.CoresDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for core allocation distribution analysis.

Thin facade over the cores service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = CoresDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
get_flat_distribution() CoresFlatDistributionDTO

Get ungrouped core allocation distribution.

Returns:

CoresFlatDistributionDTO with per-bin statistics and extended descriptive stats. Status is NO_DATA when no jobs have allocated cores.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_distribution(grouping_type: str, grouping_size: int = 10) CoresGroupedDistributionDTO

Get grouped core allocation distribution.

Parameters:
  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.

Returns:

CoresGroupedDistributionDTO with per-group statistics. Status is NO_DATA when no jobs have allocated cores, or NO_RESULTS when the grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.
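The grouping_size semantics used throughout the grouped methods can be sketched in plain Python (the real aggregation runs inside Elasticsearch):

```python
from collections import Counter

# Hedged sketch: keep the top-N groups by job count and collapse the
# remainder into an "Others" bucket, as the grouped methods describe.
def top_groups(counts: dict[str, int], grouping_size: int = 10) -> dict[str, int]:
    ranked = Counter(counts).most_common()
    top = dict(ranked[:grouping_size])
    rest = sum(n for _, n in ranked[grouping_size:])
    if rest:
        top["Others"] = rest
    return top
```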

CoresMemoryDataProvider

class applications.resources.providers.cores_memory.CoresMemoryDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for cores-vs-memory matrix analysis.

Example

>>> provider = CoresMemoryDataProvider(workload)
>>> result = provider.get_flat_matrix(constants.MAXRSS)
>>> result = provider.get_grouped_matrix(constants.MAXRSS, grouping_type="uid")
get_flat_matrix(req_or_consumed: str = 'MaxRSS') CoresMemoryFlatMatrixDTO

Get ungrouped cores-vs-memory 2D matrix.

Parameters:

req_or_consumed – Memory field — constants.REQMEM or constants.MAXRSS.

Returns:

CoresMemoryFlatMatrixDTO with per-bin job counts and core-hours. Status is NO_DATA when the memory field is absent or no jobs have allocated cores/memory.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_matrix(req_or_consumed: str = 'MaxRSS', grouping_type: str = 'uid', grouping_size: int = 10) CoresMemoryGroupedMatrixDTO

Get grouped cores-vs-memory 2D matrix.

Parameters:
  • req_or_consumed – Memory field — constants.REQMEM or constants.MAXRSS.

  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return.

Returns:

CoresMemoryGroupedMatrixDTO with per-group matrix data. Status is NO_DATA when the memory field is absent, or NO_RESULTS when grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.

ExecTimeDataProvider

class applications.throughput.providers.exec_time.ExecTimeDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for execution time metrics.

Thin facade over the exec_time service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = ExecTimeDataProvider(workload)
>>> result = provider.get_exec_time()
get_exec_time(data_type: DataType = DataType.CORE) ExecTimeDistributionDTO

Get execution time histogram.

Parameters:

data_type – Resource dimension: DataType.CORE or DataType.GPU.

Returns:

ExecTimeDistributionDTO with bins and resource-hour sums.

get_exec_time_grouped(data_type: DataType = DataType.CORE, grouping_type: str = 'Account', grouping_size: int = 10) list[GroupedExecTimeEntryDTO] | None

Get grouped execution time histogram.

Parameters:
  • data_type – Resource dimension: DataType.CORE or DataType.GPU.

  • grouping_type – ES field to group by.

  • grouping_size – Max number of groups.

Returns:

List of GroupedExecTimeEntryDTO entries (one per group), or None when no data is available.

get_exec_time_vs_timelimit(config: ExecTimeVsTimelimitConfig | None = None) ExecTimeVsTimelimitDTO

Get execution time vs timelimit ratio distribution.

Parameters:

config – Override configuration. When None, loads from DB.

Returns:

ExecTimeVsTimelimitDTO with categorized ratio data.
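The categorization idea can be sketched as an elapsed/timelimit ratio bucketed by thresholds; the cutoffs and category names below are illustrative, while the real ones come from the ExecTimeVsTimelimitConfig loaded from the DB:

```python
# Hedged sketch: bucket the elapsed/timelimit ratio into coarse usage
# categories. Thresholds and labels are illustrative assumptions.
def timelimit_ratio_category(elapsed_s: float, timelimit_s: float) -> str:
    ratio = elapsed_s / timelimit_s
    if ratio < 0.25:
        return "over-requested"   # job used far less than its timelimit
    if ratio < 0.9:
        return "reasonable"
    return "near-limit"           # job nearly exhausted its timelimit
```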

get_exec_time_vs_timelimit_grouped(config: ExecTimeVsTimelimitConfig | None = None, grouping_type: str = 'Account', grouping_size: int = 10) list[GroupedExecTimeVsTimelimitEntryDTO] | None

Get grouped execution time vs timelimit ratio distribution.

Parameters:
  • config – Override configuration. When None, loads from DB.

  • grouping_type – ES field to group by.

  • grouping_size – Max number of groups.

Returns:

List of GroupedExecTimeVsTimelimitEntryDTO entries (one per group), or None when no data is available.

GpuDataProvider

class applications.resources.providers.gpu.GpuDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for GPU distribution analysis.

Thin facade over the GPU service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = GpuDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
get_flat_distribution(gpu_field: GpuAccountingField = GpuAccountingField.ALLOC_GPUS) GpuFlatDistributionDTO

Get ungrouped GPU distribution.

Parameters:

gpu_field – ES field name — GpuAccountingField.ALLOC_GPUS or GpuAccountingField.REQ_GPUS.

Returns:

GpuFlatDistributionDTO with per-bin statistics and extended descriptive stats. Status is NO_DATA when the field is absent or no jobs have a positive GPU value.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_distribution(grouping_type: str, grouping_size: int = 10, gpu_field: GpuAccountingField = GpuAccountingField.ALLOC_GPUS) GpuGroupedDistributionDTO

Get grouped GPU distribution.

Parameters:
  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.

  • gpu_field – ES field name — GpuAccountingField.ALLOC_GPUS or GpuAccountingField.REQ_GPUS.

Returns:

GpuGroupedDistributionDTO with per-group statistics. Status is NO_DATA when the field is absent or no jobs have a positive GPU value, or NO_RESULTS when the grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.

GpuLoadDataProvider

class applications.load.providers.gpu_load.GpuLoadDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for GPU load metrics.

Thin adapter over the gpu_load service. Resolves the accounting data source, workload filters, and hardware GPU configuration from the workload, then delegates computation to the service layer.

Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.

Example

>>> provider = GpuLoadDataProvider(workload)
>>> load = provider.get_gpu_load()
>>> print(f"GPU Utilization: {load.utilization_rate:.2%}")
>>>
>>> grouped = provider.get_gpu_load_by_group(grouping_key="Account")
get_gpu_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GpuLoadDTO

Get allocated GPUs timeseries with hardware capacity.

Parameters:
  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

GpuLoadDTO with allocated/requested GPUs, capacity, and utilization.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Example

>>> load = provider.get_gpu_load()
>>> print(f"GPU Utilization: {load.utilization_rate:.1%}")
Return type:

GpuLoadDTO

get_gpu_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GroupedGpuLoadDTO

Get GPU load grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

GroupedGpuLoadDTO with one GroupEntryDTO per group.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

GroupedGpuLoadDTO

get_gpuhour_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GpuhourLoadDTO

Get GPU-hour load timeseries (allocated GPUs × elapsed time).

Parameters:
  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

GpuhourLoadDTO with GPU-hour accounting fields.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

GpuhourLoadDTO

JobFrequencyDataProvider

class applications.throughput.providers.job_freq.JobFrequencyDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for job frequency and interarrival metrics.

Thin facade over the job_freq service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = JobFrequencyDataProvider(workload)
>>> result = provider.get_job_frequency()
get_interarrival(datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT) InterArrivalDTO

Get job interarrival time statistics.

Parameters:

datetime_col – Timestamp column.

Returns:

InterArrivalDTO with descriptive stats.
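Interarrival times are the deltas between consecutive submit timestamps; a minimal sketch of the statistic (the service returns a fuller set of descriptive stats over the same deltas):

```python
from statistics import mean, median

# Hedged sketch: interarrival deltas between consecutive sorted submit
# timestamps (seconds), reduced to simple descriptive statistics.
def interarrival_stats(submit_ts: list[float]) -> dict[str, float]:
    ts = sorted(submit_ts)
    deltas = [b - a for a, b in zip(ts, ts[1:])]
    return {"mean": mean(deltas), "median": median(deltas)}
```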

get_interarrival_grouped(datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT, grouping_type: str = 'Account', grouping_size: int = 10) dict

Get grouped interarrival time statistics.

Parameters:
  • datetime_col – Timestamp column.

  • grouping_type – ES field to group by.

  • grouping_size – Max number of groups.

Returns:

Dict with grouped interarrival data.

get_job_frequency(resolution: Resolution = Resolution.DAY, datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT) JobFrequencyDTO

Get job submission frequency timeseries.

Parameters:
  • resolution – Time bucket size.

  • datetime_col – Timestamp column for bucketing.

Returns:

JobFrequencyDTO with timeseries and stats.

get_job_frequency_grouped(resolution: Resolution = Resolution.DAY, datetime_col: ThroughputDatetimeCol = ThroughputDatetimeCol.SUBMIT, grouping_type: str = 'Account', grouping_size: int = 10) list

Get grouped job frequency timeseries.

Parameters:
  • resolution – Time bucket size.

  • datetime_col – Timestamp column.

  • grouping_type – ES field to group by.

  • grouping_size – Max number of groups.

Returns:

List of grouped frequency data dicts.

JobLoadDataProvider

class applications.load.providers.job_load.JobLoadDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for job count load metrics.

Thin adapter over the job_load service. Resolves the accounting data source and workload filters, then delegates computation to the service layer.

Flat methods return aggregated timeseries DTOs. Grouped methods (*_by_group()) accept a grouping_key argument and return per-group timeseries.

Example

>>> provider = JobLoadDataProvider(workload)
>>> load = provider.get_job_load()
>>> print(f"Avg running jobs: {load.avg_running:.1f}")
>>>
>>> grouped = provider.get_job_load_by_group(grouping_key="Account")
get_job_load(resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) JobLoadDTO

Get job count timeseries for RUNNING and WAITING states.

Parameters:
  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

JobLoadDTO with running/waiting job counts and statistics.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Example

>>> load = provider.get_job_load()
>>> print(f"Avg running: {load.avg_running:.1f}")
>>> load = provider.get_job_load(resolution=Resolution.DAY)
Return type:

JobLoadDTO

get_job_load_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR, datetime_col: LoadDatetimeField = LoadDatetimeField.SUBMIT) GroupedJobLoadDTO

Get job count load grouped by a field (e.g., Account, Partition).

Parameters:
  • grouping_key – Elasticsearch field to group by (e.g. “Account”, “Partition”).

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

  • datetime_col – Job timestamp field for bucketing. Default: LoadDatetimeField.SUBMIT.

Returns:

GroupedJobLoadDTO with one GroupEntryDTO per group.

Raises:

DataProviderError – If accounting filters are not configured or query fails.

Return type:

GroupedJobLoadDTO

JobsDataProvider

class applications.ait.providers.jobs.JobsDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for querying job records from accounting data.

Thin adapter over the jobs service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = JobsDataProvider(workload)
>>> page = provider.get_jobs_page(sort_field="Submit", limit=50)
>>> print(f"Showing {len(page.jobs)} of {page.total_count} jobs")
get_available_fields(max_fields: int = 100) JobFieldsDTO

Get priority-ordered field names from the ES mapping.

Parameters:

max_fields – Maximum number of fields to return.

Returns:

JobFieldsDTO with ordered field list and total count.

Raises:

DataProviderError – If data source is unavailable.

get_job_count() JobCountDTO

Count jobs matching the workload filters.

Returns:

JobCountDTO with total count.

Raises:

DataProviderError – If accounting filters are missing or query fails.

get_jobs_page(sort_field: str | None = None, sort_order: str = 'asc', offset: int = 0, limit: int = 25) JobsPageDTO

Fetch a paginated page of job records.

Parameters:
  • sort_field – Field to sort by.

  • sort_order – Sort direction (“asc” or “desc”).

  • offset – Number of records to skip.

  • limit – Number of records to return.

Returns:

JobsPageDTO with job dicts and pagination metadata.

Raises:

DataProviderError – If accounting filters are missing or query fails.
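For consuming large result sets, a caller can walk pages with offset/limit. This sketch assumes a generic fetch(offset, limit) callable standing in for get_jobs_page; it is a usage pattern, not part of the provider API:

```python
from typing import Callable, Iterator

# Hedged sketch of offset/limit paging. `fetch` stands in for a
# get_jobs_page-like callable returning a (possibly short) list of records.
def iter_all_jobs(fetch: Callable[[int, int], list], limit: int = 25) -> Iterator:
    offset = 0
    while True:
        page = fetch(offset, limit)
        if not page:
            return
        yield from page
        if len(page) < limit:   # short page means we reached the end
            return
        offset += limit
```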

KpiDataProvider

class applications.kpi.providers.kpi.KpiDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for KPI dashboard metrics.

Thin facade over the KPI service layer. Resolves the accounting data source and workload filters, then delegates to individual services.

Example

>>> provider = KpiDataProvider(workload)
>>> logs = provider.get_logs_metrics()
>>> cost = provider.get_cost_metrics()
get_carbon_footprint() CarbonFootprintDTO

Get CO₂ emissions.

Returns:

CarbonFootprintDTO with total carbon footprint.

get_core_stats() HardwareStatsDTO

Get core count statistics (min/max/avg).

Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.

Returns:

HardwareStatsDTO for cores over the period.

get_cost_metrics() CostMetricsDTO

Get total cost.

Returns:

CostMetricsDTO with total cost.

get_energy_metrics() EnergyMetricsDTO

Get power and energy consumption.

Returns:

EnergyMetricsDTO with power and/or energy results.

get_gpu_stats() HardwareStatsDTO

Get GPU count statistics (min/max/avg).

Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.

Returns:

HardwareStatsDTO for GPUs over the period.

get_job_count() int

Get total number of jobs.

Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.

Returns:

Job count from the logs metrics.

get_logs_metrics() LogsMetricsDTO

Get job count, resource hours, usage, and hardware stats.

Returns:

LogsMetricsDTO with all job-level KPI metrics.

get_user_count() int

Get number of unique users.

Calls get_logs_metrics() internally; the result is memoized so no duplicate ES query is issued when multiple convenience methods are called on the same provider instance.

Returns:

Unique user count from the logs metrics.
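The memoization the convenience methods describe can be sketched with functools.cached_property; the Provider class and its field names here are illustrative, not the actual KpiDataProvider implementation:

```python
from functools import cached_property

# Hedged sketch: the expensive query runs once per provider instance,
# and the convenience accessors reuse the cached result.
class Provider:
    calls = 0  # counts how many times the "query" actually ran

    @cached_property
    def _logs_metrics(self) -> dict:
        type(self).calls += 1          # stands in for one ES query
        return {"job_count": 128, "user_count": 7}

    def get_job_count(self) -> int:
        return self._logs_metrics["job_count"]

    def get_user_count(self) -> int:
        return self._logs_metrics["user_count"]
```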

MemoryDataProvider

class applications.resources.providers.memory.MemoryDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for memory distribution analysis.

Example

>>> provider = MemoryDataProvider(workload)
>>> result = provider.get_memory_flat_distribution(MemoryAccountingField.MAX_RSS)
>>> grouped = provider.get_memory_grouped_distribution(MemoryAccountingField.MAX_RSS, "Account")
get_memory_flat_distribution(req_or_consumed: MemoryAccountingField = MemoryAccountingField.MAX_RSS) MemoryFlatDistributionDTO

Get ungrouped memory distribution.

Parameters:

req_or_consumed – Memory field — MemoryAccountingField.REQMEM or MemoryAccountingField.MAX_RSS.

Returns:

MemoryFlatDistributionDTO with memory bin series and extended descriptive stats.

get_memory_grouped_distribution(req_or_consumed: MemoryAccountingField = MemoryAccountingField.MAX_RSS, grouping_type: str = 'Account', grouping_size: int = 10) MemoryGroupedDistributionDTO

Get grouped memory distribution.

Parameters:
  • req_or_consumed – Memory field — MemoryAccountingField.REQMEM or MemoryAccountingField.MAX_RSS.

  • grouping_type – ES field used for grouping (e.g. "Account", "UID").

  • grouping_size – Number of top groups to return.

Returns:

MemoryGroupedDistributionDTO with per-group per-bin job counts and grouping summary stats.

MemoryRatioDataProvider

class applications.resources.providers.consumed_vs_requested_memory.MemoryRatioDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for consumed-vs-requested memory ratio analysis.

Example

>>> provider = MemoryRatioDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
get_flat_distribution() MemoryRatioFlatDistributionDTO

Get ungrouped consumed-vs-requested memory ratio distribution.

Returns:

MemoryRatioFlatDistributionDTO with per-bin job counts and extended descriptive stats. Status is NO_DATA when the MEMORYRATIO field is absent or all values are null/zero.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_distribution(grouping_type: str, grouping_size: int = 10) MemoryRatioGroupedDistributionDTO

Get grouped consumed-vs-requested memory ratio distribution.

Parameters:
  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.

Returns:

MemoryRatioGroupedDistributionDTO with per-group per-bin job counts. Status is NO_DATA when the MEMORYRATIO field is absent, or NO_RESULTS when grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.

NodesDataProvider

class applications.resources.providers.nodes.NodesDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for node allocation distribution analysis.

Thin facade over the nodes service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = NodesDataProvider(workload)
>>> result = provider.get_flat_distribution()
>>> result = provider.get_grouped_distribution(grouping_type="uid")
get_flat_distribution() NodesFlatDistributionDTO

Get ungrouped node allocation distribution.

Returns:

NodesFlatDistributionDTO with per-bin statistics and extended descriptive stats. Status is NO_DATA when no jobs have a positive node allocation value.

Raises:

DataProviderError – If the data source cannot be resolved.

get_grouped_distribution(grouping_type: str, grouping_size: int = 10) NodesGroupedDistributionDTO

Get grouped node allocation distribution.

Parameters:
  • grouping_type – ES field to group by (e.g. "uid", "account").

  • grouping_size – Maximum number of top groups to return; remaining groups are collapsed into an “Others” bucket.

Returns:

NodesGroupedDistributionDTO with per-group statistics. Status is NO_DATA when no jobs have a positive node value, or NO_RESULTS when the grouping yields no groups.

Raises:

DataProviderError – If the data source cannot be resolved.

NodesLoadDataProvider

class applications.load.providers.nodes_load.NodesLoadDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for per-node job count metrics.

Thin wrapper around the nodes_load service. Resolves the accounting data source and filters from the workload, then delegates to the service.

Example

>>> provider = NodesLoadDataProvider(workload)
>>> counts = provider.get_nb_jobs_per_node()
>>> print(counts.node_counts)
{'node001': 42, 'node002': 15, ...}
get_nb_jobs_per_node() NodesJobCountDTO

Get job count per node over the workload’s date range.

Returns:

NodesJobCountDTO with node_counts mapping and nodes_per_line_display hint.

Raises:

DataProviderError – If accounting filters are not configured or the Elasticsearch query fails.

Return type:

NodesJobCountDTO
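The node_counts mapping can be pictured as a simple tally of nodes across jobs; the per-job node-list input layout here is an illustrative assumption:

```python
from collections import Counter

# Hedged sketch: count how many jobs touched each node, mirroring the
# node_counts mapping the DTO exposes.
def jobs_per_node(job_node_lists: list[list[str]]) -> dict[str, int]:
    return dict(Counter(node for nodes in job_node_lists for node in nodes))
```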

OccupancyDataProvider

class applications.load.providers.occupancy.OccupancyDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for cluster cores and nodes occupancy metrics.

Thin wrapper around the occupancy service. Resolves the monitoring data source and filters from the workload, then delegates to the service.

Unlike other load providers, occupancy data comes from the monitoring data source (nodes_stats_fetch pipeline → cores_stats index), not the accounting data source.

Example

>>> provider = OccupancyDataProvider(workload)
>>> cores = provider.get_cores_occupancy(resolution=Resolution.HOUR)
>>> print(f"Avg allocated: {sum(cores.allocated) / len(cores.allocated):.0f}")
>>>
>>> nodes = provider.get_nodes_occupancy(resolution=Resolution.DAY)
>>> grouped = provider.get_cores_occupancy_by_group("Partition", grouping_size=5)
get_cores_occupancy(resolution: Resolution) CoresOccupancyDTO

Get cores occupancy timeseries from the monitoring index.

Parameters:

resolution – Time bucket size (e.g. Resolution.HOUR, Resolution.DAY). Required; no default is provided because occupancy always needs an explicit resolution.

Returns:

CoresOccupancyDTO with per-state core counts and total capacity.

Raises:

DataProviderError – If monitoring filters are not configured, no data is available, or the Elasticsearch query fails.

Example

>>> cores = provider.get_cores_occupancy(resolution=Resolution.HOUR)
>>> for ts, alloc in zip(cores.timestamps, cores.allocated):
...     print(f"{ts}: {alloc:.0f} cores allocated")
Return type:

CoresOccupancyDTO

get_cores_occupancy_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedCoresOccupancyDTO

Get cores occupancy grouped by a field (e.g., Partition, Queue).

Parameters:
  • grouping_key – Elasticsearch field to group by.

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

Returns:

GroupedCoresOccupancyDTO with one CoresOccupancyGroupEntryDTO per group.

Raises:

DataProviderError – If monitoring filters are not configured or query fails.

Example

>>> grouped = provider.get_cores_occupancy_by_group("Partition")
>>> for g in grouped.groups:
...     print(g.group, sum(g.allocated) / len(g.allocated))
Return type:

GroupedCoresOccupancyDTO

get_nodes_occupancy(resolution: Resolution) NodesOccupancyDTO

Get nodes occupancy timeseries from the monitoring index.

Parameters:

resolution – Time bucket size (e.g. Resolution.HOUR, Resolution.DAY). Required; no default is provided.

Returns:

NodesOccupancyDTO with per-state node counts and totals.

Raises:

DataProviderError – If monitoring filters are not configured, no data is available, or the Elasticsearch query fails.

Example

>>> nodes = provider.get_nodes_occupancy(resolution=Resolution.DAY)
>>> for ts, avail in zip(nodes.timestamps, nodes.available):
...     print(f"{ts}: {avail:.0f} nodes available")
Return type:

NodesOccupancyDTO

get_nodes_occupancy_by_group(grouping_key: str, grouping_size: int = 10, resolution: Resolution = Resolution.HOUR) GroupedNodesOccupancyDTO

Get nodes occupancy grouped by a field (e.g., Partition, Queue).

Parameters:
  • grouping_key – Elasticsearch field to group by.

  • grouping_size – Number of top groups to show individually. Default: 10.

  • resolution – Time bucket size. Default: Resolution.HOUR.

Returns:

GroupedNodesOccupancyDTO with one NodesOccupancyGroupEntryDTO per group.

Raises:

DataProviderError – If monitoring filters are not configured or query fails.

Example

>>> grouped = provider.get_nodes_occupancy_by_group("Partition")
>>> for g in grouped.groups:
...     print(g.group, sum(g.available) / len(g.available))
Return type:

GroupedNodesOccupancyDTO

SlowdownDataProvider

class applications.throughput.providers.slowdown.SlowdownDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for slowdown metrics.

Thin facade over the slowdown service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = SlowdownDataProvider(workload)
>>> dist = provider.get_slowdown_distribution()
>>> stats = provider.get_slowdown_stats()
get_slowdown_distribution(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) SlowdownDistributionDTO

Get slowdown PDF/CDF distribution.

Parameters:

datetime_col – Timestamp column (SUBMIT or ELIGIBLE).

Returns:

SlowdownDistributionDTO with a DataFrame of [value, frequency, cdf].
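
The [value, frequency, cdf] layout can be reproduced from raw slowdown values. A minimal, self-contained sketch of the idea (the helper `pdf_cdf` is hypothetical, not the service implementation; the column names are taken from the description above):

```python
from collections import Counter

def pdf_cdf(values):
    """Build a [value, frequency, cdf] table from raw values.

    frequency is the share of jobs at each value (the PDF); cdf is the
    running sum of frequencies, reaching 1.0 at the largest value.
    """
    counts = Counter(values)
    total = len(values)
    rows, running = [], 0.0
    for value in sorted(counts):
        freq = counts[value] / total
        running += freq
        rows.append({"value": value, "frequency": freq, "cdf": running})
    return rows

table = pdf_cdf([1.0, 1.0, 2.5, 4.0])
# table[0] -> {"value": 1.0, "frequency": 0.5, "cdf": 0.5}
```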

get_slowdown_stats(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) SlowdownStatsDTO

Get slowdown descriptive statistics.

Parameters:

datetime_col – Timestamp column (SUBMIT or ELIGIBLE).

Returns:

SlowdownStatsDTO with descriptive statistics (data, columns, median).
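
Slowdown is conventionally the ratio (wait + run) / run; a hedged sketch assuming that convention (the `slowdown` helper is illustrative, not the service's code):

```python
from datetime import datetime

def slowdown(submit, start, end):
    """Slowdown as (wait + run) / run, the usual convention.

    Equals 1.0 for a job that starts immediately after submission.
    """
    wait = (start - submit).total_seconds()
    run = (end - start).total_seconds()
    return (wait + run) / run

s = slowdown(
    datetime(2024, 5, 1, 9, 0),    # submitted
    datetime(2024, 5, 1, 9, 30),   # started (30 min wait)
    datetime(2024, 5, 1, 10, 30),  # finished (60 min run)
)
# s == 1.5
```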

StateDataProvider

class applications.state.providers.state.StateDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for job state distribution analysis.

Thin adapter over the state service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = StateDataProvider(workload)
>>> status = provider.get_jobs_status()
>>> for entry in status.entries:
...     print(f"{entry.state}: {entry.count}")
get_jobs_status(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE) JobsStatusDTO

Get flat job status distribution (one entry per state).

Parameters:
  • category – What to measure per state. Use MetricCategory for autocompletion — e.g. MetricCategory.CORE_HOURS.

  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

Returns:

JobsStatusDTO with per-state counts and totals.

Raises:

DataProviderError – If accounting filters are missing or computation fails.

get_jobs_status_grouped(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE, grouping_type: GroupingField | str = GroupingField.ACCOUNT, grouping_size: int = 10) GroupedJobsStatusDTO

Get grouped job status distribution (per-state, per-group breakdown).

Parameters:
  • category – What to measure per state. Use MetricCategory for autocompletion — e.g. MetricCategory.CORE_HOURS.

  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

  • grouping_type – Field to group by (e.g. “Account”, “UID”, “GID”).

  • grouping_size – Maximum number of groups (excess goes to “Others”).

Returns:

GroupedJobsStatusDTO with per-state, per-group breakdowns.

Raises:

DataProviderError – If accounting filters are missing or computation fails.
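
The grouping_size cut-off behaves like a top-N selection with an overflow bucket. A rough sketch of the idea (`top_n_with_others` is a hypothetical helper, not the provider's code):

```python
def top_n_with_others(counts, grouping_size=10):
    """Keep the grouping_size largest groups; fold the rest into "Others".

    counts maps a group label (e.g. an Account) to its metric value.
    """
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    kept = dict(ranked[:grouping_size])
    overflow = sum(v for _, v in ranked[grouping_size:])
    if overflow:
        kept["Others"] = overflow
    return kept

buckets = top_n_with_others(
    {"acct_a": 90, "acct_b": 50, "acct_c": 7, "acct_d": 3},
    grouping_size=2,
)
# buckets == {"acct_a": 90, "acct_b": 50, "Others": 10}
```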

get_jobs_status_yms(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE) JobsStatusYmsDTO

Get flat year-month state distribution.

Parameters:
  • category – What to measure per state. Use MetricCategory for autocompletion — e.g. MetricCategory.CORE_HOURS.

  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

Returns:

JobsStatusYmsDTO with per-month, per-state breakdowns.

Raises:

DataProviderError – If accounting filters are missing or computation fails.
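
The year-month breakdown is a two-level aggregation: first by the YYYY-MM of the job's timestamp, then by job state. A hypothetical sketch of that shape (illustrative only; the real provider works against the accounting index):

```python
from collections import defaultdict
from datetime import datetime

def yms_state_counts(jobs):
    """jobs: iterable of (submit_datetime, state) pairs.

    Returns nested counts keyed as {"YYYY-MM": {state: count}}.
    """
    out = defaultdict(lambda: defaultdict(int))
    for ts, state in jobs:
        out[ts.strftime("%Y-%m")][state] += 1
    return {ym: dict(states) for ym, states in out.items()}

counts = yms_state_counts([
    (datetime(2024, 4, 2), "COMPLETED"),
    (datetime(2024, 4, 9), "FAILED"),
    (datetime(2024, 5, 1), "COMPLETED"),
])
# counts["2024-04"] == {"COMPLETED": 1, "FAILED": 1}
```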

get_jobs_status_yms_grouped(category: MetricCategory = MetricCategory.JOBS, data_type: DataType = DataType.CORE, grouping_type: GroupingField | str = GroupingField.ACCOUNT, grouping_size: int = 10) GroupedJobsStatusYmsDTO

Get grouped year-month state distribution.

Parameters:
  • category – What to measure per state. Use MetricCategory for autocompletion — e.g. MetricCategory.CORE_HOURS.

  • data_type – Resource dimension. Use DataType.GPU for GPU metrics.

  • grouping_type – Field to group by.

  • grouping_size – Maximum number of groups.

Returns:

GroupedJobsStatusYmsDTO with per-group, per-month breakdowns.

Raises:

DataProviderError – If accounting filters are missing or computation fails.

SubmitDateDataProvider

class applications.throughput.providers.submit_date.SubmitDateDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for submission date metrics.

Thin facade over the submit_date service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = SubmitDateDataProvider(workload)
>>> result = provider.get_submit_hour()
get_submit_hour() SubmitHourDTO

Get hour-of-day submission histogram.

Returns:

SubmitHourDTO with hour values (0–23) and job counts per hour.
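
The histogram amounts to bucketing submit timestamps by their hour of day. A self-contained sketch (illustrative only; the real provider reads submit times from the accounting index):

```python
from datetime import datetime

def submit_hour_histogram(submit_times):
    """Count job submissions per hour of day (0-23)."""
    counts = [0] * 24
    for ts in submit_times:
        counts[ts.hour] += 1
    return counts

submits = [
    datetime(2024, 5, 1, 9, 15),
    datetime(2024, 5, 1, 9, 40),
    datetime(2024, 5, 2, 23, 5),
]
hist = submit_hour_histogram(submits)
# hist[9] == 2, hist[23] == 1
```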

get_submit_hour_grouped(grouping_type: str = 'Account', grouping_size: int = 10) list | None

Get hour-of-day submission histogram broken down by a grouping dimension.

Parameters:
  • grouping_type – ES field to group by.

  • grouping_size – Max number of groups.

Returns:

List of per-group dicts with submit_hour and submit_hour_count, or None when no jobs match the filters.

get_submit_weekday() SubmitWeekdayDTO

Get day-of-week submission histogram.

Returns:

SubmitWeekdayDTO with weekday names (Monday–Sunday) and job counts.

get_submit_weekday_grouped(grouping_type: str = 'Account', grouping_size: int = 10) list | None

Get day-of-week submission histogram broken down by a grouping dimension.

Parameters:
  • grouping_type – ES field to group by.

  • grouping_size – Max number of groups.

Returns:

List of per-group dicts with submit_weekday and submit_weekday_count, or None when no jobs match the filters.
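
The per-group dicts can be pictured as one weekday histogram per group. A hedged sketch of that return shape (only submit_weekday and submit_weekday_count come from the description above; the "group" key and the helper itself are assumptions, and the actual grouping is done in Elasticsearch):

```python
import calendar
from collections import defaultdict
from datetime import datetime

def weekday_grouped(jobs):
    """jobs: iterable of (group, submit_datetime) pairs.

    Returns one dict per group with the documented keys
    submit_weekday and submit_weekday_count.
    """
    per_group = defaultdict(lambda: [0] * 7)
    for group, ts in jobs:
        per_group[group][ts.weekday()] += 1
    return [
        {
            "group": group,
            "submit_weekday": list(calendar.day_name),  # Monday..Sunday
            "submit_weekday_count": counts,
        }
        for group, counts in per_group.items()
    ]

rows = weekday_grouped([("acct_a", datetime(2024, 5, 6))])  # 2024-05-06 is a Monday
# rows[0]["submit_weekday_count"][0] == 1
```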

WaitTimeDataProvider

class applications.throughput.providers.wait_time.WaitTimeDataProvider(workload: Workload)

Bases: BaseDataProvider

Provider for wait time metrics.

Thin facade over the wait_time service. Resolves the accounting data source and workload filters, then delegates to the service layer.

Example

>>> provider = WaitTimeDataProvider(workload)
>>> dist = provider.get_wait_time_distribution()
>>> stats = provider.get_wait_time_stats()
get_wait_time_distribution(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) WaitTimeDistributionDTO

Get wait time PDF/CDF distribution.

Parameters:

datetime_col – Timestamp column (SUBMIT or ELIGIBLE).

Returns:

WaitTimeDistributionDTO with a DataFrame of [value, frequency, cdf].

get_wait_time_stats(datetime_col: SubmissionDatetimeCol = SubmissionDatetimeCol.SUBMIT) WaitTimeStatsDTO

Get wait time descriptive statistics.

Parameters:

datetime_col – Timestamp column (SUBMIT or ELIGIBLE).

Returns:

WaitTimeStatsDTO with descriptive statistics (data, columns, median).
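
Wait time is the gap between submission and start. A minimal sketch of the kind of statistic the DTO carries (median here; the actual column set comes from the service, and `wait_seconds` is a hypothetical helper):

```python
from datetime import datetime
from statistics import median

def wait_seconds(submits, starts):
    """Per-job wait time in seconds (start - submit)."""
    return [(s - q).total_seconds() for q, s in zip(submits, starts)]

submits = [datetime(2024, 5, 1, 9, 0), datetime(2024, 5, 1, 9, 30)]
starts = [datetime(2024, 5, 1, 9, 10), datetime(2024, 5, 1, 10, 30)]
waits = wait_seconds(submits, starts)
# waits == [600.0, 3600.0]; median(waits) == 2100.0
```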