Data enhancers
You can create your own data enhancers to augment the data used by OKA. Data enhancers can be applied:
- During logs ingestion, to add new features to each job that will be saved in the Elasticsearch database.
- By the `enhancers_pipeline`, to add or update features of jobs already saved in the Elasticsearch database.
- By the Predict-IT plugin, to dynamically enhance your dataset.
Whenever you create or update a data enhancer, you must restart `okaserver` for these changes to be taken into account.
Format
The data enhancers must follow a specific format in order to be understood by OKA:
- You must create a Python class containing the word `enhancer` in its name (case and position have no impact).
- Your class must have a `run(self, data, **kwargs)` method in which you create the new features:
  - `data` is a Pandas DataFrame containing jobs as rows and features as columns. The list of features includes, but is not limited to: Account, Allocated_Cores, Allocated_Nodes, Allocated_Memory, Cluster, Comment, Eligible, End, GID, JobID, JobName, MaxRSS, Partition, QOS, Requested_Cores, Requested_Nodes, Requested_Memory, Start, State, Submit, Timelimit, UID (contact UCit Support for more details).
  - `kwargs` is a dictionary containing complementary information about OKA pipelines for the Predict-IT plugin (contact UCit Support for more details).
- You must modify `data` in place to add the new features as new columns (do not return `data`).

Warning
All columns created or modified by a data enhancer will be saved in the database (a sketch further below shows how to drop intermediate columns).

- Your filename has no impact, and you can define multiple data enhancers in the same file.
- You can optionally add a `VERSION` class variable; it will be visible in OKA logs whenever the data enhancer is executed.
Example of a data enhancer:
```python
class EnhancerMyNewFeatures():
    VERSION = "1.0"

    def run(self, data, **kwargs):
        """
        Data enhancer that computes my new features named feature1 and feature2.
        """
        data.loc[:, "feature1"] = data["Timelimit"] / 60
        data.loc[:, "feature2"] = data["Partition"].str.lower()
```
OKA will automatically add the `Cust_` prefix to the name of the features you have created. In the above example, the features will be named `Cust_feature1` and `Cust_feature2`.
The newly created features will be visible in several plugins (Consumers, Predict-IT…).
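Because every column a data enhancer creates or modifies is saved in the database (see the warning above), you may want to remove intermediate columns before `run()` returns. Below is a minimal sketch, assuming that columns dropped from `data` before `run()` returns are not persisted; the `tmp_hours` and `Long_Job` names are purely illustrative, and the 24-hour threshold assumes `Timelimit` is expressed in minutes:

```python
class EnhancerLongJobExample():
    VERSION = "1.0"

    def run(self, data, **kwargs):
        # Intermediate column, used only to compute the final feature
        data.loc[:, "tmp_hours"] = data["Timelimit"] / 60
        # Final feature to be saved (it will be stored as Cust_Long_Job)
        data.loc[:, "Long_Job"] = data["tmp_hours"] > 24
        # Drop the intermediate column so that it is not saved in the database
        data.drop(columns=["tmp_hours"], inplace=True)
```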
Logging
If you want to add some logs in your Data Enhancer, you can use the `logging` Python module with the `oka_main` logger:
```python
import logging

logger = logging.getLogger("oka_main")

class EnhancerMyNewFeatures():
    VERSION = "1.0"

    def run(self, data, **kwargs):
        logger.info("Running the EnhancerMyNewFeatures Data Enhancer")
        [...]
```
Location
You must place your data enhancer files into the `${OKA_INSTALL_DIR}/data/processor` directory.
Configuration
In order for OKA to know when to use your data enhancers, you can provide the list of enhancers to apply in the pipeline configuration (see Conf pipelines). This list contains comma-separated class names; the data enhancers are applied in the order in which they are listed. For example, `EnhancerCostFeature,EnhancerMyNewFeatures` would apply `EnhancerCostFeature` first.
Specific enhancers
You can define 2 enhancers to compute the cost and the power consumption of each job during logs ingestion. However, in these specific cases, you must use specific column names:
- `Cost` for the cost of each job, in the currency set in the cluster configuration
- `Power` for the power consumption of each job, in watts
- `Energy` for the energy consumption of each job, in joules
These columns will not be renamed using the above-mentioned prefix.
Example of a data enhancer that computes the cost of each job:
```python
from sklearn import preprocessing

class EnhancerCostFeature():
    def run(self, data, **kwargs):
        """
        Example of data enhancer that computes a dummy cost for each job based on a
        cost for a core/hour and the partitions.
        """
        cost_column_name = "Cost"  # DO NOT CHANGE!
        # Compute a dummy cost
        weights = preprocessing.LabelEncoder().fit_transform(data["Partition"])
        weights = 1 + (weights / 10)  # 1, 1.1, 1.2, ...
        cost_core_hour = 0.02  # Cost of a core/hour in currency as set in the cluster configuration
        data.loc[:, cost_column_name] = data["Core_hours"] * (cost_core_hour * weights)
```
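For the power and energy columns, a minimal sketch could look like the following. The per-core wattage is purely illustrative, and deriving the elapsed time from the `Start` and `End` features is an assumption; adapt both to the data available on your cluster:

```python
import pandas as pd

class EnhancerPowerFeature():
    VERSION = "1.0"

    def run(self, data, **kwargs):
        """
        Sketch of a data enhancer that computes dummy Power and Energy values
        from a constant power draw per allocated core.
        """
        watts_per_core = 50  # Illustrative constant, not an OKA default
        # Power in watts ("Power" is one of the reserved column names above)
        data.loc[:, "Power"] = data["Allocated_Cores"] * watts_per_core
        # Energy in joules = power (W) * elapsed time (s)
        elapsed = pd.to_datetime(data["End"]) - pd.to_datetime(data["Start"])
        data.loc[:, "Energy"] = data["Power"] * elapsed.dt.total_seconds()
```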