Data enhancers

You can create your own data enhancers to augment the data used by OKA. Data enhancers can be applied:

  • During log ingestion, to add new features to each job that will be saved in the Elasticsearch database.

  • By the enhancers_pipeline, to add or update features of jobs already saved in the Elasticsearch database.

  • By the OKA Predict (Predictor) plugin, to dynamically enhance your dataset.

Whenever you create or update a data enhancer, you must restart okaserver for the changes to take effect.

Format

The data enhancers must follow a specific format in order to be understood by OKA:

  • You must create a Python class whose name contains the word enhancer (case and position do not matter).

  • Your class must have a run(self, data, **kwargs) method in which you create the new features.

    • data is a Pandas dataframe containing jobs as rows and features as columns. The list of features includes, but is not limited to: Account, Allocated_Cores, Allocated_Nodes, Allocated_Memory, Cluster, Comment, Eligible, End, GID, JobID, JobName, MaxRSS, Partition, QOS, Requested_Cores, Requested_Nodes, Requested_Memory, Start, State, Submit, Timelimit, UID (contact UCit Support for more details).

    • kwargs is a dictionary containing complementary information about OKA pipelines for the OKA Predict (Predictor) plugin (contact UCit Support for more details).

  • You must modify data in-place in order to add the new features as new columns (do not return data).

    Warning

    All columns created or modified by a data enhancer will be saved in the database (if you create temporary helper columns, see the sketch after this list).

  • The filename has no impact, and you can define multiple data enhancers in the same file.

  • You can optionally add a class attribute named VERSION to your class. It will be visible in OKA logs whenever the data enhancer is executed.
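If you create temporary helper columns while building a feature, drop them before run() returns so they are not persisted. A minimal sketch, assuming that only the columns present in data at the end of run() are saved (the class and column names below are illustrative):

class EnhancerWithTemporaryColumn():
    VERSION = "1.0"
    def run(self, data, **kwargs):
        # Intermediate column used only to build the final feature
        data.loc[:, "tmp_hours"] = data["Timelimit"] / 60
        data.loc[:, "my_feature"] = data["tmp_hours"].round(1)
        # Drop the helper column (assumption: only columns present at the end are saved)
        data.drop(columns=["tmp_hours"], inplace=True)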

Example of data enhancer:

class EnhancerMyNewFeatures():
    VERSION = "1.0"
    def run(self, data, **kwargs):
        """
            Data enhancer that computes my new features named feature1 and feature2.
        """
        # Add the new features in-place as new columns
        data.loc[:, "feature1"] = data["Timelimit"] / 60
        data.loc[:, "feature2"] = data["Partition"].str.lower()
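You can sanity-check an enhancer outside OKA by calling its run() method on a small hand-built dataframe (the job values below are made up, and only the two columns read by the enhancer are included):

import pandas as pd

# Two dummy jobs with only the columns the enhancer reads
data = pd.DataFrame({"Timelimit": [120, 240], "Partition": ["GPU", "Standard"]})
EnhancerMyNewFeatures().run(data)
print(data[["feature1", "feature2"]])  # 2.0 / "gpu" and 4.0 / "standard"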

OKA will automatically add the Cust_ prefix to the names of the features you have created. In the above example, the features will be saved as Cust_feature1 and Cust_feature2. The newly created features will be visible in several plugins (Consumers, OKA Predict (Predictor)…).

Logging

If you want to add logging to your data enhancer, you can use Python's logging module with the oka_main logger:

import logging

logger = logging.getLogger("oka_main")

class EnhancerMyNewFeatures():
    VERSION = "1.0"
    def run(self, data, **kwargs):
        logger.info("Running the EnhancerMyNewFeatures Data Enhancer")
        [...]

Location

You must place your data enhancer files into the ${OKA_INSTALL_DIR}/data/processor directory.

Configuration

For OKA to know when to apply your data enhancers, list them in the pipeline configuration (see Conf pipelines) as comma-separated class names (for example, EnhancerMyNewFeatures,EnhancerCostFeature). The data enhancers are applied in the order listed.

Specific enhancers

You can define two enhancers to compute the cost and the power consumption of each job during log ingestion. In these specific cases, however, you must use specific column names:

  • Cost for the cost of each job, in the currency set in the cluster configuration

  • Power for the power consumption of each job, in watts

  • Energy for the energy consumption of each job, in joules

These columns will not be renamed using the above-mentioned prefix.

Example of data enhancer to compute the cost of each job:

from sklearn import preprocessing

class EnhancerCostFeature():

    def run(self, data, **kwargs):
        """
            Example of data enhancer that computes a dummy cost for each job based on a
            cost for a core/hour and the partitions.
        """
        cost_column_name = "Cost"  # DO NOT CHANGE!
        # Compute a dummy cost
        # Encode each partition name as an integer (0, 1, 2, ...)
        weights = preprocessing.LabelEncoder().fit_transform(data["Partition"])
        weights = 1 + (weights / 10)  # 1, 1.1, 1.2, ...
        cost_core_hour = 0.02  # Cost of a core/hour in currency as set in the cluster configuration
        data.loc[:, cost_column_name] = data["Core_hours"] * (cost_core_hour * weights)
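A power/energy enhancer follows the same pattern with the Power and Energy columns. A minimal sketch, assuming the Start and End columns have been parsed as datetimes and using a made-up per-core power draw:

class EnhancerPowerFeature():

    def run(self, data, **kwargs):
        """
            Example of data enhancer that computes a dummy power and energy
            consumption for each job based on its allocated cores.
        """
        watts_per_core = 15.0  # Made-up average power draw of one allocated core
        # Power consumption of each job in watts (column name must be "Power")
        data.loc[:, "Power"] = data["Allocated_Cores"] * watts_per_core
        # Elapsed time of each job in seconds, assuming Start/End are datetimes
        elapsed = (data["End"] - data["Start"]).dt.total_seconds()
        # Energy consumption of each job in joules (column name must be "Energy")
        data.loc[:, "Energy"] = data["Power"] * elapsed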