Data enhancers
You can create your own data enhancers to augment the data used by OKA. Data enhancers can be applied:
- During logs ingestion, to add new features to each job that will be saved in the Elasticsearch database.
- By the enhancers_pipeline, to add or update features of jobs already saved in the Elasticsearch database.
- By the OKA Predict (Predictor) plugin, to dynamically enhance your dataset.
Whenever you create or update a data enhancer, you must restart okaserver for these changes to be taken into account.
Format
The data enhancers must follow a specific format in order to be understood by OKA:
- You must create a Python class whose name contains the word enhancer (case and position have no impact).
- Your class must have a run(self, data, **kwargs) method in which you create the new features.
- data is a Pandas DataFrame containing jobs in rows and features in columns. The feature list contains, but is not limited to: Account, Allocated_Cores, Allocated_Nodes, Allocated_Memory, Cluster, Comment, Eligible, End, GID, JobID, JobName, MaxRSS, Partition, QOS, Requested_Cores, Requested_Nodes, Requested_Memory, Start, State, Submit, Timelimit, UID (contact UCit Support for more details).
- kwargs is a dictionary containing complementary information about OKA pipelines for the OKA Predict (Predictor) plugin (contact UCit Support for more details).
- You must modify data in place in order to add the new features as new columns (do not return data).
Warning
All columns created or modified by a data enhancer will be saved in the database.
Your filename has no impact and you can define multiple data enhancers in the same file.
You can optionally add a variable named VERSION to your class. It will be visible in the OKA logs whenever the data enhancer is executed.
Example of data enhancer:
class EnhancerMyNewFeatures():
    VERSION = "1.0"

    def run(self, data, **kwargs):
        """
        Data enhancer that computes my new features named feature1 and feature2.
        """
        data.loc[:, "feature1"] = data["Timelimit"] / 60
        data.loc[:, "feature2"] = data["Partition"].str.lower()
OKA will automatically add the Cust_ prefix to the name of the features you have created. In the above example, the features will be named Cust_feature1 and Cust_feature2.
The newly created features will be visible in several plugins (Consumers, OKA Predict (Predictor)…).
Logging
If you want to add some logging to your data enhancer, you can use the logging Python module with the oka_main logger:
import logging

logger = logging.getLogger("oka_main")


class EnhancerMyNewFeatures():
    VERSION = "1.0"

    def run(self, data, **kwargs):
        logger.info("Running the EnhancerMyNewFeatures Data Enhancer")
        [...]
Location
You must place your data enhancer files into the ${OKA_INSTALL_DIR}/data/processor directory.
Configuration
In order for OKA to know when to use your data enhancers, you can provide the list of enhancers to apply in the pipeline configuration (see Conf pipelines). This list contains comma-separated class names; the corresponding data enhancers will be applied in the order provided.
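For example, using the enhancer classes defined on this page, such a list could look like the following (shown only as an illustration; the exact configuration key is described in Conf pipelines):

    EnhancerMyNewFeatures,EnhancerCostFeature,EnhancerGPUHours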
Specific enhancers
You can define 2 enhancers to compute the cost and the power consumption of each job during logs ingestion. However, in these specific cases, you must use specific column names:
- Cost for the cost of each job, in the currency set in the cluster configuration
- Power for the power consumption of each job, in watts
- Energy for the energy consumption of each job, in joules
These columns will not be renamed using the above-mentioned prefix.
Example of a data enhancer to compute the job cost:
from sklearn import preprocessing


class EnhancerCostFeature():
    def run(self, data, **kwargs):
        """
        Example of data enhancer that computes a dummy cost for each job based on a
        cost for a core/hour and the partitions.
        """
        cost_column_name = "Cost"  # DO NOT CHANGE!

        # Compute a dummy cost
        weights = preprocessing.LabelEncoder().fit_transform(data["Partition"])
        weights = 1 + (weights / 10)  # 1, 1.1, 1.2, ...
        cost_core_hour = 0.02  # Cost of a core/hour in currency as set in the cluster configuration
        data.loc[:, cost_column_name] = data["Core_hours"] * (cost_core_hour * weights)
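Similarly, here is a minimal sketch of an enhancer filling the Power and Energy columns. The per-core wattage is a made-up value, and the sketch assumes the Execution_Time feature is expressed in seconds (as in the GPU_hours example below); adapt the formula to your own cluster:

class EnhancerPowerFeatures():
    VERSION = "1.0"

    def run(self, data, **kwargs):
        """
        Example of data enhancer that computes a dummy power and energy consumption
        for each job from a hypothetical average wattage per allocated core.
        """
        watts_per_core = 15.0  # Hypothetical average power drawn by one core, in watts

        # "Power" and "Energy" are the specific column names expected by OKA: DO NOT CHANGE!
        data.loc[:, "Power"] = data["Allocated_Cores"] * watts_per_core
        # Energy in joules = power in watts * execution time in seconds
        data.loc[:, "Energy"] = data["Power"] * data["Execution_Time"]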
Starting from version 2.7.0, you can also apply the following enhancer on existing data to compute missing GPU_hours data:
class EnhancerGPUHours():
    def run(self, data, **kwargs):
        """Compute GPU_hours."""
        # DO NOT CHANGE THE COLUMN NAME.
        if "Allocated_GPU" in data.columns:
            data.loc[:, "GPU_hours"] = data["Allocated_GPU"].multiply(data["Execution_Time"] / 3600.0)
            # Fill empty values with 0.0
            data.loc[:, "GPU_hours"] = data["GPU_hours"].fillna(0.0)
        else:
            # Make sure to ALWAYS get a value for GPU hours, even if there is no data to compute it.
            # Otherwise, loading logs without any GPU info after having already loaded some that
            # contain it would lead to inconsistent data, with some values set to Null and others
            # set to 0.0.
            data.loc[:, "GPU_hours"] = 0.0
Apply on existing data
Enhancers can be applied to existing parts of the data stored in Elasticsearch through the use of the enhancers_pipeline.
You can specify the enhancers to be applied when running this pipeline by updating its configuration, as explained in the Configuration section above.
You can run this pipeline using the corresponding periodic task <cluster_name>_enhancers_pipeline to apply the enhancers.
OKA retrieves data in chunks, applies all the listed enhancers to one chunk at a time (to limit resource consumption), and sends the updated data back to Elasticsearch. By default, the chunk size is set to 1000.
You can change this default configuration through the admin panel by going into the Conf assets and editing the output_enhanced_features object linked with your cluster.
This object contains a dict in its Additional data param, allowing you to configure the behavior you need:
- chunk: how many jobs to process at once.
- save_to_elastic: True if you want to update your Elasticsearch database with the changes made by the enhancers, False otherwise.
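As an illustration, the Additional data dict could look like the following, using the default chunk size mentioned above (the exact syntax expected by the admin panel may differ):

    {
        "chunk": 1000,
        "save_to_elastic": true
    }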
Warning
If you need to compute something based on the full dataset, using chunks might not work for you, as the enhancers will only be applied to the subset of data retrieved by each call to Elasticsearch.
Available options are:
- Set the chunk size to the maximum number of jobs currently logged in your database.
- Find another way to compute what you need in a first pass, then apply it by re-running another enhancer with your pre-computed value(s).