OKA
This section describes how to update or upgrade OKA.
Update
Obtaining the latest OKA package
The OKA package can be downloaded from the UCit website:
$> wget --user <USERNAME> --password <PASSWORD> -O oka-X.Y.Z.run https://ucit.fr/product-publish/oka/package/latest.php?file=run
Please contact UCit Support to obtain your <USERNAME> and <PASSWORD> to access the package.
Replace X.Y.Z with the version of OKA you want to download.
Prior to any update, you should check the SHA256 of the installer to ensure that it hasn’t been tampered with.
To do so, first download the oka-X.Y.Z.sha256 file and run the shasum utility:
$> wget --user <USERNAME> --password <PASSWORD> -O oka-X.Y.Z.sha256 https://ucit.fr/product-publish/oka/package/latest.php?file=sha256
$> shasum -c oka-X.Y.Z.sha256
oka-X.Y.Z.sha256: OK
Requirements
Before going further, make sure that all requirements are up to date for the version you are about to install by checking the Requirements page.
Upgrading from v2 to v3
When upgrading to the v3 major version of OKA, pay attention to PostgreSQL and Elasticsearch, as the supported versions have changed.
Upgrading PostgreSQL to version 14 or later is mandatory. See Migrating to latest version.
Upgrading Elasticsearch to version 9.x is recommended. See Migrating from Elasticsearch 7 to Elasticsearch 9.
Warning
Official support for Elasticsearch 7.x ends in January 2026; therefore, no further updates will be made to ensure OKA's compatibility with it in future releases. It is highly recommended that you upgrade to Elasticsearch 9.x.
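Before upgrading, you may want to confirm which versions are currently running. The commands below are a minimal sketch, assuming the PostgreSQL client tools are installed on the OKA host and Elasticsearch listens on localhost:9200; adjust the host, port, credentials, and TLS options to your environment:
$> psql --version
$> psql -U postgres -c "SELECT version();"
$> curl -s -u <USERNAME>:<PASSWORD> https://localhost:9200 | grep '"number"'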
Backup
It is recommended that you back up both the PostgreSQL and Elasticsearch databases before updating OKA to a new version.
Procedures for both databases are available through the following links:
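As an illustration only (the linked procedures remain the reference), a pre-update backup could combine a PostgreSQL dump with an Elasticsearch snapshot. In the sketch below, the database name oka, the repository name oka_backup, and the location /backup/elasticsearch are assumptions to adapt; a filesystem snapshot repository also requires path.repo to be configured on the Elasticsearch nodes:
$> pg_dump -U postgres -Fc oka > oka_$(date +%F).dump
$> curl -X PUT -u <USERNAME>:<PASSWORD> "https://localhost:9200/_snapshot/oka_backup" -H 'Content-Type: application/json' -d '{"type": "fs", "settings": {"location": "/backup/elasticsearch"}}'
$> curl -X PUT -u <USERNAME>:<PASSWORD> "https://localhost:9200/_snapshot/oka_backup/pre_update_$(date +%F)?wait_for_completion=true"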
Update installation
Warning
The installer must have internet access to download the required Python packages.
The installer needs to be executed by either the root user or a user with sudo privileges.
The OKA installer comes with a set of predefined command-line options. For example, to display the help, simply run:
./oka-X.Y.Z.run -h
When updating or upgrading OKA, the existing configuration will be reused for the newly installed version. In interactive mode, you will only be prompted for the missing mandatory parameters. In batch mode, however, the installation will stop if anything is missing. To avoid this, you can either update the existing configuration before running the installer or provide a new configuration file directly (see Batch mode).
Important
When upgrading from v2 to v3, you will have to provide a valid v3 license file.
In Interactive mode, you will be asked for the new path.
In Batch mode, you will have to use the -l option to provide it.
If you do not have a license file yet, please contact UCit Support or your OKA reseller.
For more information regarding the way OKA handles its license, see License
Interactive mode
The installer oka-X.Y.Z.run will guide you through the installation of OKA on your device.
Start the process by calling:
./oka-X.Y.Z.run
Batch mode
OKA can also be installed in a non-interactive way.
To do so, you need to specify the required parameters through a configuration file that contains all the configuration variables of OKA.
This file should match exactly the content of an oka.yaml file generated by the installer in ${OKA_INSTALL_DIR}/config/oka/oka.yaml (see OKA Parameter Reference).
To run the installer in batch mode, run the following command:
./oka-X.Y.Z.run -b -c <PATH_TO_YAML_CONF_FILE> -i ${OKA_INSTALL_DIR}
with:
-b: activates the batch installation mode
-c <PATH_TO_YAML_CONF_FILE>: path to the configuration file (see OKA Parameter Reference)
-i ${OKA_INSTALL_DIR}: path to the installation directory
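For example, a v2-to-v3 upgrade in batch mode could be launched as follows; the installation and license paths are illustrative, and the -l option is only needed when a new license file must be provided:
./oka-X.Y.Z.run -b -c /opt/oka/config/oka/oka.yaml -i /opt/oka -l /opt/oka/licenses/oka-v3.lic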
Post update
The following sections present the different actions that might be required after an update or upgrade.
Upgrading from v2 to v3
After an upgrade from v2 to v3, depending on your context, you might have to execute one or more additional scripts in order to clean up your existing data.
Warning
The following actions might end up taking a long time depending on the size of your Elasticsearch database.
We recommend executing them inside a tool such as tmux to avoid untimely disconnections during the process.
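For example, a minimal tmux workflow (the session name is arbitrary) could look like this:
$> tmux new-session -s oka-post-upgrade
# run the cleanup/reindex commands inside the session, detach with Ctrl-b d
$> tmux attach -t oka-post-upgrade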
Cleanup
Note
For clients using the Slurm job scheduler, this step is necessary to remove jobs that may have been duplicated because the main entry of job arrays was previously misinterpreted.
To accommodate changes in how OKA handles some columns internally, it is recommended that you run this cleanup on all your existing accounting indices.
The following script should be saved in ${OKA_INSTALL_DIR}/current/bin/tools/ and can be used to handle this automatically for you once configured properly.
handle_switch_from_jobid_to_jobidraw.py
# ============================================================================ # Elasticsearch cleanup module for duplicates and UUID regeneration. # # This module can be used as a standalone script or imported as a package. # Reports are automatically saved to timestamped folders (report_es_<timestamp>/). # # Installation: # Place this script in: <oka_dir>/current/bin/tools # # Commands: # identify_missing_end_field - Identify documents without 'End' field # identify_duplicates - Identify duplicate JobID_Raw documents # fix - Execute all operations (identify missing, identify duplicates, delete, regenerate UUID) # # Options: # --conf-path PATH Path to OKA YAML configuration file (required) # --indices NAMES List of Elasticsearch index names to process (required) # --command {identify_missing_end_field, # identify_duplicates, # fix} Command to execute (required) # --dry-run Preview changes without executing them # # Usage Examples: # # # Identify documents without 'End' field # uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command identify_missing_end_field # # # Identify duplicates # uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command identify_duplicates # # # Identify jobs duplicated or without 'End' field, delete duplicates, generate new uuid to fix jobs. # uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command fix # # # Dry run mode (preview without executing) # uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command fix --dry-run # # # Multiple indices # uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 index2 index3 --command fix # # Reports Generated: # - identified_missing_end_field.json: Documents without End field # - identified_duplicates.json: Duplicate JobID_Raw groups # - result_report.json: Final workflow summary (only with 'all' command) # ============================================================================ from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk, scan from datetime import datetime from typing import Dict, Set, Optional, Tuple import sys import warnings from pathlib import Path from tqdm import tqdm import json # Suppress Elasticsearch warnings warnings.filterwarnings("ignore", category=Warning, module="elasticsearch") # Import OKA configuration classes sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "src")) from lib.tools.configuration import ElasticsearchConfig, OKAConfig # ============================================================================ # HELPERS # ============================================================================ class ReportManager: """Report manager for timestamped reports""" def __init__(self, base_dir: Path = Path.cwd()): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.report_dir = base_dir / f"report_es_{timestamp}" self.report_dir.mkdir(parents=True, exist_ok=True) def save(self, filename: str, data: Dict) -> None: """Save report to timestamped directory""" path = self.report_dir / filename with open(path, "w") as f: json.dump(data, f, indent=2, default=str) print(f"✓ Saved: {path}") def create_es_client_from_oka(es_config: ElasticsearchConfig) -> Elasticsearch: """Create Elasticsearch client from OKA configuration""" from pydantic import SecretStr host = 
str(es_config.host) conn_params = { "hosts": [host], "verify_certs": es_config.verify_certs, } # Add authentication if provided if es_config.user and es_config.password: password = ( es_config.password.get_secret_value() if isinstance(es_config.password, SecretStr) else es_config.password ) conn_params["basic_auth"] = (es_config.user, password) # Only add TLS options if verify_certs is enabled if es_config.verify_certs: if es_config.ca_cert_path: conn_params["ca_certs"] = str(es_config.ca_cert_path) if es_config.client_cert_path: conn_params["client_cert"] = str(es_config.client_cert_path) if es_config.client_key_path: conn_params["client_key"] = str(es_config.client_key_path) # Ignore es warnings warnings.filterwarnings("ignore") import urllib3 urllib3.disable_warnings() return Elasticsearch(**conn_params) def regenerate_uuid( es: Elasticsearch, index: str, document_ids: Optional[Set[str]] = None, dry_run: bool = False, batch_size: int = 5000, ) -> int: """ Regenerate UUID field for specified documents or all documents. Args: es: Elasticsearch client index: Index name document_ids: Set of specific document IDs to update (None = all documents) dry_run: If True, only show what would be updated batch_size: Number of documents to update per batch Returns: Number of documents updated """ print(f"\n{'=' * 60}") print(f"Regenerating UUID - {index}") print(f"{'=' * 60}") if dry_run: print("⚠ DRY RUN MODE - No updates will be performed") if document_ids: print(f"Updating {len(document_ids)} specific documents") query = {"query": {"ids": {"values": list(document_ids)}}} total_docs = len(document_ids) else: print("Updating ALL documents in index") query = {"query": {"match_all": {}}} count_result = es.count(index=index) total_docs = count_result["count"] if dry_run: sample_docs = es.search( index=index, body={ "size": 5, "query": query["query"], "_source": ["JobID_Raw", "Submit", "uuid", "timestamp"], }, ) print("\nSample UUID transformations:") for doc in sample_docs["hits"]["hits"]: source = doc["_source"] old_uuid = source.get("uuid", "N/A") try: new_uuid = generate_uuid_from_source(source) print(f"\n Document ID: {doc['_id']}") print(f" JobID_Raw: {source.get('JobID_Raw')}") print(f" Submit: {source.get('Submit')}") print(f" Old UUID: {old_uuid}") print(f" New UUID: {new_uuid}") print(f" Changed: {old_uuid != new_uuid}") except ValueError as e: print(f"\n Document ID: {doc['_id']}") print(f" Error: {e}") print(f"\n⚠ DRY RUN - Would update {total_docs} documents") return 0 updated_count = 0 failed_count = 0 update_ops = [] print("Scanning and updating documents...") for doc in tqdm( scan(es, index=index, query=query, _source=["JobID_Raw", "Submit"]), total=total_docs, desc="Updating", ): try: new_uuid = generate_uuid_from_source(doc["_source"]) update_ops.append( { "_op_type": "update", "_index": index, "_id": doc["_id"], "doc": {"uuid": new_uuid}, } ) if len(update_ops) >= batch_size: success, failed = bulk( es, update_ops, raise_on_error=False, stats_only=True ) updated_count += success if failed: failed_count += failed if isinstance(failed, int) else len(failed) update_ops = [] except ValueError as e: print(f"\n⚠ Warning: Skipping document {doc['_id']}: {e}") failed_count += 1 if update_ops: success, failed = bulk(es, update_ops, raise_on_error=False, stats_only=True) updated_count += success if failed: failed_count += failed if isinstance(failed, int) else len(failed) print("\nRefreshing index...") es.indices.refresh(index=index) print("✓ Complete:") print(f" Total updated: {updated_count}") 
print(f" Failed: {failed_count}") return updated_count def generate_uuid_from_source(source: Dict) -> str: """ Generate UUID by concatenating JobID_Raw and Submit fields. Args: source: Document source dictionary Returns: Generated UUID string """ column_list = ["JobID_Raw", "Submit"] # Check if all specified columns exist in the source missing_columns = [col for col in column_list if col not in source] if missing_columns: raise ValueError( f"Missing required columns for UUID generation: {missing_columns}" ) # Create UUID by concatenating all specified columns uuid = "_".join(str(source[col]) for col in column_list) return uuid # ============================================================================ # COMMAND FUNCTIONS # ============================================================================ def identify_missing_end_field( es: Elasticsearch, index: str, report_manager: ReportManager ) -> Dict: """ Identify all documents that don't have an 'End' field value. Args: es: Elasticsearch client index: Index name report_manager: ReportManager instance for saving reports Returns: Dictionary with results and document IDs """ print(f"\n{'=' * 60}") print(f"Identifying documents without 'End' field - {index}") print(f"{'=' * 60}") # Query for documents where End field doesn't exist query = {"query": {"bool": {"must_not": {"exists": {"field": "End"}}}}} count_result = es.count(index=index, body=query) total_docs = count_result["count"] print(f"Found {total_docs} documents without 'End' field") if total_docs == 0: result = {"total": 0, "document_ids": set(), "samples": []} return result doc_ids = set() samples = [] print("Scanning documents...") for doc in tqdm( scan( es, index=index, query=query, _source=["JobID_Raw", "Submit", "timestamp", "uuid"], ), total=total_docs, desc="Scanning", ): doc_ids.add(doc["_id"]) samples.append({"id": doc["_id"], "source": doc["_source"]}) result = {"total": total_docs, "document_ids": doc_ids, "samples": samples} report_data = { "index": index, "timestamp": datetime.now().isoformat(), "total_documents": total_docs, "document_ids": list(doc_ids), "samples": samples, } report_manager.save("identified_missing_end_field.json", report_data) print(f"✓ Identified: {total_docs} documents without 'End' field") return result def identify_duplicates( es: Elasticsearch, index: str, report_manager: ReportManager ) -> Dict: """ Identify all documents with duplicate JobID_Raw values. 
Args: es: Elasticsearch client index: Index name report_manager: ReportManager instance for saving reports Returns: Dictionary with duplicate groups and document IDs """ print(f"\n{'=' * 60}") print(f"Identifying duplicates by JobID_Raw - {index}") print(f"{'=' * 60}") duplicate_groups = {} all_duplicate_doc_ids = set() after = None total_duplicate_docs = 0 while True: # Use cardinality instead of value_count on _id to avoid the deprecation warning query = { "size": 0, "aggs": { "jobid_groups": { "composite": { "size": 10000, "sources": [ {"jobid_raw": {"terms": {"field": "JobID_Raw.keyword"}}} ], }, "aggs": { "all_docs": { "top_hits": { "size": 100, "sort": [{"timestamp": {"order": "desc"}}], "_source": [ "timestamp", "Submit", "uuid", "JobID", "JobID_Raw", "End", ], } } }, } }, } if after: query["aggs"]["jobid_groups"]["composite"]["after"] = after result = es.search(index=index, body=query) buckets = result["aggregations"]["jobid_groups"]["buckets"] if not buckets: break for bucket in buckets: # Use the actual number of documents from top_hits instead of value_count docs = bucket["all_docs"]["hits"]["hits"] doc_count = len(docs) if doc_count > 1: jobid_raw = bucket["key"]["jobid_raw"] for doc in docs: all_duplicate_doc_ids.add(doc["_id"]) duplicate_groups[jobid_raw] = { "total_docs": doc_count, "keeper": { "id": docs[0]["_id"], "timestamp": docs[0]["_source"].get("timestamp"), "source": docs[0]["_source"], }, "to_delete": [ { "id": doc["_id"], "timestamp": doc["_source"].get("timestamp"), "source": doc["_source"], } for doc in docs[1:] ], } total_duplicate_docs += doc_count - 1 after = result["aggregations"]["jobid_groups"].get("after_key") print( f" Processed batch... Found {len(duplicate_groups)} duplicate groups so far" ) result = { "total_groups": len(duplicate_groups), "total_duplicate_docs": total_duplicate_docs, "duplicate_groups": duplicate_groups, "all_duplicate_doc_ids": all_duplicate_doc_ids, } report_data = { "index": index, "timestamp": datetime.now().isoformat(), "total_duplicate_groups": len(duplicate_groups), "total_duplicate_documents": total_duplicate_docs, "duplicate_groups": { k: { "total_docs": v["total_docs"], "keeper_id": v["keeper"]["id"], "keeper_timestamp": v["keeper"]["timestamp"], "delete_ids": [d["id"] for d in v["to_delete"]], } for k, v in list(duplicate_groups.items())[:100] }, } report_manager.save("identified_duplicates.json", report_data) print("✓ Identified:") print(f" Duplicate JobID_Raw groups: {len(duplicate_groups)}") print(f" Total duplicate documents: {total_duplicate_docs}") print(f" All documents involved in duplicates: {len(all_duplicate_doc_ids)}") return result def delete_duplicates( es: Elasticsearch, index: str, duplicate_groups: Dict, dry_run: bool = False, batch_size: int = 5000, ) -> Tuple[int, Set[str]]: """ Delete duplicate documents, keeping the most recent by timestamp. 
Args: es: Elasticsearch client index: Index name duplicate_groups: Result from identify_duplicates() dry_run: If True, only show what would be deleted batch_size: Number of documents to delete per batch Returns: Tuple of (number of documents deleted, set of deleted document IDs) """ print(f"\n{'=' * 60}") print(f"Deleting duplicates - {index}") print(f"{'=' * 60}") if dry_run: print("⚠ DRY RUN MODE - No deletions will be performed") delete_ids = [] for jobid_raw, group_info in duplicate_groups["duplicate_groups"].items(): for doc in group_info["to_delete"]: delete_ids.append(doc["id"]) deleted_ids_set = set(delete_ids) total_to_delete = len(delete_ids) print(f"Total documents to delete: {total_to_delete}") if total_to_delete == 0: print("✓ No duplicates to delete") return 0, set() if dry_run: print("\nSample document IDs that would be deleted (first 10):") for doc_id in delete_ids[:10]: print(f" - {doc_id}") print(f"⚠ DRY RUN - Would delete {total_to_delete} documents") return 0, deleted_ids_set response = input( f"\nAbout to delete {total_to_delete} documents from {index}. Continue? (yes/no): " ) if response.lower() != "yes": print("⚠ Deletion cancelled") raise RuntimeError("Operation cancelled by user") deleted_count = 0 failed_count = 0 print("Executing bulk delete...") for i in tqdm(range(0, len(delete_ids), batch_size), desc="Deleting batches"): batch = delete_ids[i : i + batch_size] actions = [ {"_op_type": "delete", "_index": index, "_id": doc_id} for doc_id in batch ] success, failed = bulk(es, actions, raise_on_error=False, stats_only=True) deleted_count += success if failed: failed_count += failed if isinstance(failed, int) else len(failed) print("\nRefreshing index...") es.indices.refresh(index=index) print("✓ Complete:") print(f" Total deleted: {deleted_count}") print(f" Failed: {failed_count}") return deleted_count, deleted_ids_set # ============================================================================ # MAIN CLI # ============================================================================ def main(): """Main CLI entry point""" import argparse parser = argparse.ArgumentParser( description="Elasticsearch cleanup tool for duplicates and UUID regeneration", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command identify_missing_end_field uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command identify_duplicates uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command fix --dry-run uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 index2 --command fix """, ) parser.add_argument( "--conf-path", required=True, help="Path to OKA YAML configuration file" ) parser.add_argument( "--indices", nargs="+", required=True, help="List of index names to process" ) parser.add_argument( "--command", choices=[ "identify_missing_end_field", "identify_duplicates", "fix", ], required=True, help="Command to execute", ) parser.add_argument( "--dry-run", action="store_true", help="Preview changes without executing" ) args = parser.parse_args() print(f"Loading OKA configuration from: {args.conf_path}") oka_config = OKAConfig.from_yaml(args.conf_path) es = create_es_client_from_oka(oka_config.elasticsearch) try: info = es.info() print(f"✓ Connected to Elasticsearch 
{info['version']['number']}") except Exception as e: print(f"✗ Failed to connect to Elasticsearch: {e}") sys.exit(1) # Initialize report manager once for all indices report_manager = ReportManager() for index in args.indices: try: print(f"\n{'#' * 60}") print(f"# Processing index: {index}") print(f"{'#' * 60}") if args.command == "identify_missing_end_field": identify_missing_end_field(es, index, report_manager) elif args.command == "identify_duplicates": identify_duplicates(es, index, report_manager) elif args.command == "fix": step1_result = identify_missing_end_field(es, index, report_manager) step2_result = identify_duplicates(es, index, report_manager) deleted, deleted_ids = delete_duplicates( es, index, step2_result, args.dry_run ) # Compute remaining id list after handling deletion of duplicates. remaining_ids = ( step1_result["document_ids"] | step2_result["all_duplicate_doc_ids"] ) - deleted_ids print(f"\nUUID Regeneration Target: {len(remaining_ids)} documents") if remaining_ids: regenerate_uuid(es, index, remaining_ids, args.dry_run) # Save result report report_manager.save( "result_report.json", { "index": index, "timestamp": datetime.now().isoformat(), "workflow": "fix", "results": { "missing_end_field": step1_result["total"], "duplicate_groups": step2_result["total_groups"], "duplicate_documents": step2_result["total_duplicate_docs"], "deleted": deleted, }, }, ) except KeyboardInterrupt: print("\n⚠ Interrupted by user") sys.exit(1) except RuntimeError as e: if "cancelled" in str(e).lower(): # User cancellation - no traceback print(f"\n✗ Error processing {index}: {e}") else: # Other RuntimeError - show traceback print(f"\n✗ Error processing {index}: {e}") import traceback traceback.print_exc() except Exception as e: # All other exceptions - show traceback print(f"\n✗ Error processing {index}: {e}") import traceback traceback.print_exc() print(f"\n{'=' * 60}") print("✓ All operations complete") print(f"✓ Reports saved to: {report_manager.report_dir}") print(f"{'=' * 60}") if __name__ == "__main__": main()
This program identifies and corrects Elasticsearch documents representing duplicated jobs, or documents that could lead to duplicates in the future. It is only required for the accounting index; therefore, you first need to access OKA and, from the Cluster page, retrieve the accounting index UID. Then, run the script:
cd ${OKA_INSTALL_DIR}/current/bin/tools/
# Check available options
uv run handle_switch_from_jobid_to_jobidraw.py -h
# Execute the 'fix' command for the 2 given indices: 'c663f880-0444-41b3-a5a5-ca0dc841dfee' and '8dd506ff-b858-41ea-9735-6169751d6902'
uv run handle_switch_from_jobid_to_jobidraw.py --conf-path ${OKA_INSTALL_DIR}/config/oka/oka.yaml --command fix --indices c663f880-0444-41b3-a5a5-ca0dc841dfee 8dd506ff-b858-41ea-9735-6169751d6902
Note
Use the --dry-run option if you wish to see what will be done prior to fully executing the fix command.
Reindex
Warning
Reindexing an Elasticsearch index temporarily requires at least twice the size of the existing index. Make sure enough disk space is available before starting the process; the commands below show one way to check.
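As a rough check, assuming Elasticsearch listens on localhost:9200 and its data directory is /var/lib/elasticsearch (both are assumptions to adapt), you could compare the index size with the free disk space:
$> curl -s -u <USERNAME>:<PASSWORD> "https://localhost:9200/_cat/indices/<ACCOUNTING_INDEX_UID>?v&h=index,docs.count,store.size"
$> df -h /var/lib/elasticsearch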
Important
When reindexing, the script reuses the index's original configuration for the number of shards so that it keeps working the same way as before.
However, the number of replicas will be set to 0 to make the process faster. If the index had replicas before, be advised that you will have to restore them yourself, as shown in the example below.
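A minimal sketch for restoring replicas once the reindex is complete, assuming one replica and the same host and credential placeholders as above:
$> curl -X PUT -u <USERNAME>:<PASSWORD> "https://localhost:9200/<NEW_INDEX_OR_ALIAS>/_settings" -H 'Content-Type: application/json' -d '{"index": {"number_of_replicas": 1}}'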
Note
For clients using PBS-based job schedulers, this step is necessary to handle resource_list and resources_used data in a more advanced way.
For clients with data originally uploaded prior to version 2.9.1, this step might be necessary to fix a potential computation error for a metric in the Resource module.
To accommodate changes in how OKA handles some columns internally, it is recommended that you reindex all your existing accounting indices.
The following script should be saved in ${OKA_INSTALL_DIR}/current/bin/tools/ and can be used to handle this automatically for you once configured properly.
reindex_indices.py
""" Elasticsearch Reindexing Tool with Type Conversion and Field Restructuring This module handles migrating Elasticsearch indices while applying: • Type conversions based on OKA constants (int, float, date, string, etc.) • Field restructuring using object patterns (flattening fields into objects) • Automatic alias management (always query by original index name) WORKFLOW: 1. Source index exists: "xxxxx-yyyyy-" with old data 2. Create new index: "oka-<cluster_uid>-<custom_key>-<uuid>" 3. Reindex all documents, applying transformations 4. Delete source index 5. Create aliases "xxxxx-yyyyy-" and "oka-<cluster_uid>-<custom_key>" → points to "oka-<cluster_uid>-<custom_key>-<uuid>" 6. Query by original name; data served from new index transparently INSTALLATION: Place this file in: <oka_dir>/current/bin/tools USAGE: uv run reindex_indices.py --conf-path /path/to/oka.yaml CONFIGURATION: Edit the CONFIGURATION section below for: • INDICES_CONFIG: List of indices to process • BATCH_SIZE: Bulk operation batch size • OBJECT_PATTERNS: Field grouping rules • FIELD_TYPE_OVERRIDES: Type coercion overrides • REQUEST_TIMEOUT: Elasticsearch operation timeouts """ import sys import argparse import warnings from pathlib import Path from typing import Dict, List, Optional, Set, Tuple, Any import uuid from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk, scan # Import OKA configuration and constants sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "src")) from lib.tools.configuration import ElasticsearchConfig, OKAConfig from lib.common import oka_constants # ============================================================================ # CONFIGURATION SECTION - EDIT HERE # ============================================================================ # INDICES_CONFIG: List of tuples defining which indices to reindex # # Format: (cluster_uid, source_index_or_alias, custom_key) # # cluster_uid: # Identifier for the cluster. Find this on the OKA clusters page. # # source_index_or_alias: # The existing concrete index or alias you want to migrate. Find this on the OKA clusters page. # # custom_key: # Optional semantic label for the new index name. Makes naming clearer. # # EXAMPLES: # Single concrete index: # [("my_cluster", "old_index_uuid", "accounting")] # With alias: # [("my_cluster", "old_alias_name", "accounting")] # Multiple mixed: # [ # ("hpc_slurm", "index_uuid_1", "accounting"), # ("hpc_pbs", "my_alias_name", "accounting"), # ("hpc_lsf", "index_uuid_3", None), # ] INDICES_CONFIG: List[Tuple[str, str, Optional[str]]] = [ ("xyz_uid", "xxxxxxxx-yyyy-zzzz-wwww-aaaaaaaaaa", "accounting"), ] # OBJECT_PATTERNS: Field grouping rules # # Old schema (flat): resource_list_mem, resource_list_cpu, resource_list_disk # New schema (object): resource_list: {mem, cpu, disk} # # HOW IT WORKS: # For each field_name that starts with a pattern key: # • Extract the suffix (field_name - pattern) # • Move to parent object: parent[suffix] = value # • Remove original flat field # # Set to {} (empty dict) to disable grouping entirely. OBJECT_PATTERNS: Dict[str, str] = { "resource_list_": "resource_list", "resources_used_": "resources_used", } # FIELD_TYPE_OVERRIDES: Force specific fields to a type # # When OKA constants don't define a field type, or you want to override them, # add the field here with its desired type. 
# # Supported types: "int", "float", "date", "string", "object", "nested" # # EXAMPLE: # If you want to force "your_metric" to be treated as float: # {"your_metric": "float"} # # Leave empty {} if no overrides needed. # FIELD_TYPE_OVERRIDES: Dict[str, str] = {} # BATCH_SIZE: Number of documents to bulk insert per Elasticsearch request # # Higher values = fewer requests, but higher memory usage # Lower values = more requests, but lower memory per batch # BATCH_SIZE: int = 1000 # REQUEST_TIMEOUT: Maximum time (seconds) for individual Elasticsearch operations # # This applies to: # • Reindexing operations (scan, bulk inserts) # • Index creation/deletion # • Alias operations # # For large indices with millions of documents, increase this. # Formula: ~30-60 seconds per million documents is a safe estimate. # For 10M documents, use 5-10 minutes. # # Examples: # 300 seconds = 5 minutes (small indices, <1M docs) # 1800 seconds = 30 minutes (medium indices, 1-10M docs) # 10800 seconds = 3 hours (large indices, 10M+ docs) # REQUEST_TIMEOUT: int = 3 * 60 * 60 # 3 hours # ============================================================================ # END CONFIGURATION # ============================================================================ # Suppress warnings warnings.filterwarnings("ignore") def progress_message(current: int, total: int, desc: str = "") -> None: """ Print a simple progress message with percentage. Args: current: Current processed count total: Total items to process desc: Description prefix (e.g., "Reindexing") Example: progress_message(500, 10000, "Reindexing") # Output: "Reindexing 500/10000 (5.0%)" """ if total > 0: percent = (current / total) * 100 print(f"\r{desc} {current}/{total} ({percent:.1f}%)", end="", flush=True) class Reindexer: """ Handle Elasticsearch index reindexing with type conversion and field restructuring. This class orchestrates the entire reindexing workflow: 1. Validate/resolve source (concrete index or alias) 2. Create new index with naming pattern 3. Reindex documents with transformations 4. Delete source concrete index 5. Create aliases for backward compatibility Transformations applied during reindexing: • Field type conversion based on OKA constants • Field grouping/object restructuring • Handling of missing/null values Attributes: es (Elasticsearch): Elasticsearch client indices_config (List[Tuple[str, str, Optional[str]]]): Configuration for indices to process """ def __init__(self, es_client: Elasticsearch) -> None: """ Initialize the Reindexer. Args: es_client: Authenticated Elasticsearch client Raises: ValueError: If OBJECT_PATTERNS is invalid """ self.es: Elasticsearch = es_client self.indices_config: List[Tuple[str, str, Optional[str]]] = INDICES_CONFIG # Validate and display OBJECT_PATTERNS configuration if OBJECT_PATTERNS: print( f"\n✓ OBJECT_PATTERNS enabled with {len(OBJECT_PATTERNS)} grouping rules:" ) for pattern, parent in OBJECT_PATTERNS.items(): print(f" {pattern:20s} → {parent}") else: print("\n⚠ OBJECT_PATTERNS is empty - no field grouping will occur") @staticmethod def create_es_client_from_oka(es_config: ElasticsearchConfig) -> Elasticsearch: """ Create and configure an Elasticsearch client from OKA configuration. 
Handles: • Basic authentication (user/password) • SSL certificate verification • Custom CA certificates • Client certificate authentication Args: es_config: ElasticsearchConfig object from OKA configuration Returns: Configured Elasticsearch client Raises: Exception: If connection parameters are invalid """ from pydantic import SecretStr host: str = str(es_config.host) # Build connection parameters conn_params: Dict[str, Any] = { "hosts": [host], "verify_certs": es_config.verify_certs, "request_timeout": REQUEST_TIMEOUT, } # Add authentication if configured if es_config.user and es_config.password: password: str = ( es_config.password.get_secret_value() if isinstance(es_config.password, SecretStr) else es_config.password ) conn_params["basic_auth"] = (es_config.user, password) # Add SSL certificates if verification enabled if es_config.verify_certs: if es_config.ca_cert_path: conn_params["ca_certs"] = str(es_config.ca_cert_path) if es_config.client_cert_path: conn_params["client_cert"] = str(es_config.client_cert_path) if es_config.client_key_path: conn_params["client_key"] = str(es_config.client_key_path) # Suppress urllib3 SSL warnings (user chose to verify) import urllib3 urllib3.disable_warnings() return Elasticsearch(**conn_params) def generate_new_index_name( self, cluster_name: str, custom_key: Optional[str] = None ) -> str: """ Generate a new index name following the naming pattern. Pattern: "oka-{cluster_name}[-{custom_key}]-{uuid}" The UUID ensures uniqueness even if multiple reindexing runs occur. This allows for blue-green deployments if needed. Args: cluster_name: Cluster identifier (e.g., "my_cluster", "prod") custom_key: Optional semantic label (e.g., "accounting", "billing") Returns: Generated index name (e.g., "oka-my_cluster-accounting-ace9a3b5") Example: generate_new_index_name("prod", "accounting") # Returns: "oka-prod-accounting-f2e8c1a9" """ unique_id: str = str(uuid.uuid4())[:8] if custom_key: return f"oka-{cluster_name}-{custom_key}-{unique_id}" else: return f"oka-{cluster_name}-{unique_id}" def get_source_index_settings(self, index_name: str) -> Dict[str, Any]: """ Get settings from source index to replicate in target index. Args: index_name: Source index name Returns: Dict with number_of_shards from source index """ try: settings = self.es.indices.get_settings(index=index_name) source_settings = settings[index_name]["settings"]["index"] return { "number_of_shards": int(source_settings.get("number_of_shards", 4)), } except Exception as e: print(f"⚠ Warning: Could not get source settings: {e}") print(" Using configured INDEX_SETTINGS instead") return {"number_of_shards": 4} def create_target_index(self, index_name: str, source_index: str) -> None: """ Create target index with settings from source index. Args: index_name: New index name to create source_index: Source index to copy shard count from """ # Get shard count from source, keep replicas at 0 source_settings = self.get_source_index_settings(source_index) index_body = { "settings": { "number_of_shards": source_settings["number_of_shards"], "number_of_replicas": 0, # Always 0 during reindexing } } print(" Creating index with:") print(f" • number_of_shards: {source_settings['number_of_shards']}") print(" • number_of_replicas: 0") self.es.indices.create(index=index_name, body=index_body) print(f" ✓ Index created: {index_name}") def get_field_type(self, field_name: str) -> Optional[str]: """ Determine the data type of a field using OKA constants. 
OKA constants define field types across the system: • INTCOLS: Integer fields • FLOATCOLS: Float fields • STRCOLS: String fields • DATECOLS: Date fields • NESTEDCOLS: Nested object fields • OBJECTCOLS: Object fields Args: field_name: Name of the field to check Returns: Type string ("int", "float", "string", "date", "nested", "object") or None if field is not defined in OKA constants Example: get_field_type("cpu_count") # Returns: "int" get_field_type("memory_gb") # Returns: "float" get_field_type("job_name") # Returns: "string" """ if field_name in oka_constants.INTCOLS: return "int" elif field_name in oka_constants.FLOATCOLS: return "float" elif field_name in oka_constants.STRCOLS: return "string" elif field_name in oka_constants.DATECOLS: return "date" elif field_name in oka_constants.NESTEDCOLS: return "nested" elif field_name in oka_constants.OBJECTCOLS: return "object" return None def get_object_parent(self, field_name: str) -> Optional[str]: """ Check if a field should be grouped under a parent object. Uses OBJECT_PATTERNS to match field prefixes. Example matching: Pattern: "resource_list_" → Parent: "resource_list" Input field: "resource_list_mem" Output: "resource_list" (field will be grouped) Args: field_name: Field name to check Returns: Parent object name if pattern matches, None otherwise Example: OBJECT_PATTERNS = {"resource_list_": "resource_list"} get_object_parent("resource_list_mem") # Returns: "resource_list" get_object_parent("resource_list_cpu") # Returns: "resource_list" get_object_parent("job_name") # Returns: None """ if not OBJECT_PATTERNS: return None for pattern, parent in OBJECT_PATTERNS.items(): if field_name.startswith(pattern): return parent return None def convert_field_value( self, field_name: str, value: Any ) -> Tuple[Any, bool, Optional[str]]: """ Convert a field value to its correct type and determine object grouping. This method: 1. Checks if the field should be grouped under a parent object 2. Gets the field type from OKA constants (or overrides) 3. Converts the value to the correct type 4. Returns the converted value + grouping metadata Conversion logic: • None → None (passed through) • "int" → int(value) • "float" → float(value) • "date" → str(value) (dates stored as strings) • "object" → parsed dict (if stringified) or original value • Unknown → str(value) (default to string) On conversion error, the original value is returned unchanged. Args: field_name: Name of the field value: Value to convert Returns: Tuple of (converted_value, should_group_in_object, parent_object_name) • converted_value: Type-converted value • should_group_in_object: True if value will be nested under a parent • parent_object_name: Name of parent object ("resource_list", etc.) 
Example: convert_field_value("resource_list_mem", "2048") # Returns: (2048, True, "resource_list") # # This means: convert "2048" to int, and nest under resource_list # Result in transformed doc: {resource_list: {mem: 2048}} """ if value is None: return None, False, None # Step 1: Check if this field should be grouped into a parent object object_parent: Optional[str] = self.get_object_parent(field_name) should_group_as_object: bool = object_parent is not None # Step 2: Get the field type (from overrides or OKA constants) field_type: Optional[str] = FIELD_TYPE_OVERRIDES.get( field_name ) or self.get_field_type(field_name) # Step 3: Convert based on type try: if field_type == "int": return int(value), should_group_as_object, object_parent elif field_type == "float": return float(value), should_group_as_object, object_parent elif field_type == "date": return str(value), should_group_as_object, object_parent elif field_type == "object": # Objects can be stored in three forms in Elasticsearch: # # 1. STRINGIFIED: # '{"priority": 5, "owner": "alice"}' (string representation) # Need to parse back to dict for proper handling. # # 2. ALREADY A DICT: # {"priority": 5, "owner": "alice"} (native dict) # Use unchanged. # # 3. NESTED STRUCTURES : # {"metadata": {"priority": 5, "nested": {"level": 2}}} # These can be nested dicts inside dicts. # ast.literal_eval handles this correctly. # if isinstance(value, str): try: import ast # ast.literal_eval safely parses stringified representations. # Handles: {"a": 1}, {"a": {"b": 2}}, lists of objects, etc. return ( ast.literal_eval(value), should_group_as_object, object_parent, ) except Exception: # Parsing failed (malformed string, invalid syntax, etc.) # Better to keep original than crash. Elasticsearch will # handle it or log a type mismatch error—at least data isn't lost. return value, should_group_as_object, object_parent # Value is already a dict/object (including nested structures), # use it unchanged. Elasticsearch handles nested objects natively. return value, should_group_as_object, object_parent else: # Default: convert to string when type is unknown or not defined. # String is the safest fallback—ensures data is preserved and return str(value), should_group_as_object, object_parent except Exception: # On any conversion error, return original value unchanged return value, should_group_as_object, object_parent def transform_document(self, doc: Dict[str, Any]) -> Dict[str, Any]: """ Transform a document for the new index mapping. Transformation steps: 1. Extract document source 2. For each field: a. Convert type using convert_field_value() b. If grouping, nest under parent object c. Otherwise, add field as-is 3. Remove original flat fields that were grouped Example transformation: INPUT: { "resource_list_mem": "2048", "resource_list_cpu": "4", "job_id": "12345" } OUTPUT (with OBJECT_PATTERNS): { "resource_list": {"mem": 2048, "cpu": 4}, "job_id": "12345" } Args: doc: Elasticsearch document (dict with _id, _source, etc.) Returns: Transformed source dict ready for the new index Note: The transformation is applied in-place during the reindex operation. Original flat fields are not preserved. 
""" source: Dict[str, Any] = doc.get("_source", {}) transformed: Dict[str, Any] = {} grouped_fields: Set[str] = set() # Track which fields were grouped # Process each field in the source document for field_name, value in source.items(): # Convert value and get grouping metadata converted_value, should_group, object_parent = self.convert_field_value( field_name, value ) # Handle object grouping - nest under parent if applicable if should_group and object_parent: # Initialize parent object if not exists if object_parent not in transformed: transformed[object_parent] = {} # Extract subfield name by removing the pattern prefix for pattern in OBJECT_PATTERNS.keys(): if field_name.startswith(pattern): subfield_name: str = field_name[len(pattern) :] transformed[object_parent][subfield_name] = converted_value grouped_fields.add(field_name) break else: # Regular field - add to transformed document as-is transformed[field_name] = converted_value return transformed def get_alias_target(self, alias_name: str) -> Optional[str]: """ Get the concrete index that an alias currently points to. An alias is a virtual name that references one or more concrete indices. This function resolves the alias to its target. Args: alias_name: Name of the alias Returns: Concrete index name, or None if alias doesn't exist Example: get_alias_target("c0814ac7...") # Returns: "oka-my_cluster-accounting-ace9a3b5" # # Means the alias "c0814ac7..." points to that concrete index """ try: aliases: Dict[str, Any] = self.es.indices.get_alias(name=alias_name) # Get the first (usually only) index this alias points to if aliases: return list(aliases.keys())[0] except Exception: return None return None def resolve_to_concrete_index(self, index_or_alias: str) -> str: """ Resolve an index name or alias to a concrete index. If input is an alias, returns the concrete index it points to. If input is a concrete index, returns it unchanged. Args: index_or_alias: Index name or alias name Returns: Concrete index name Raises: ValueError: If index/alias doesn't exist or is invalid """ # Try to get as alias first alias_target: str = self.get_alias_target(index_or_alias) if alias_target is not None: print( f"Resolved alias '{index_or_alias}' → concrete index '{alias_target}'" ) return alias_target # Try to get as concrete index try: info: Dict[str, Any] = self.es.indices.get(index=index_or_alias) if index_or_alias in info: return index_or_alias except Exception as e: if "index_not_found" not in str(e): raise raise ValueError(f"Index or alias '{index_or_alias}' not found") def reindex( self, source_index: str, target_index: str, batch_size: int = 500, ) -> None: """ Reindex documents from source to target index with transformations. This is the core reindexing operation: 1. Get total document count 2. Scan documents from source (in batches to avoid memory overflow) 3. Transform each document (type conversion, field grouping) 4. Bulk insert into target index 5. Report progress and errors The scan operation uses Elasticsearch scrolling to efficiently handle large indices without loading everything into memory. Args: source_index: Source index name target_index: Target index name batch_size: Number of documents per bulk request (default: 500) Raises: Exception: If reindexing fails (connectivity, server error, etc.) Note: Target index must already exist with correct mappings. Errors during bulk insert are reported but don't stop the process. 
""" try: # Display configuration if OBJECT_PATTERNS: print(f"Using object patterns: {OBJECT_PATTERNS}") # Get total document count for progress tracking total_docs: int = self.es.count(index=source_index)["count"] print(f"Total documents to reindex: {total_docs}") if total_docs == 0: print("No documents to reindex") return # Scan and transform documents actions: List[Dict[str, Any]] = [] processed: int = 0 # Use scan to iterate through all documents efficiently # scroll="2m" means Elasticsearch keeps the search context open for 2 minutes for doc in scan( self.es, index=source_index, scroll="2m", request_timeout=REQUEST_TIMEOUT, ): processed += 1 progress_message(processed, total_docs, "Reindexing") # Transform document (type conversion, field grouping) transformed: Dict[str, Any] = self.transform_document(doc) # Create bulk action action: Dict[str, Any] = { "_index": target_index, "_id": doc["_id"], "_source": transformed, } actions.append(action) # Bulk insert when batch is full if len(actions) >= batch_size: success, failed = bulk( self.es, actions, raise_on_error=False, request_timeout=REQUEST_TIMEOUT, ) # Report any errors if failed: for f in failed: print(f"\nError: {f['error']['reason']}") actions = [] # Insert remaining documents if actions: success, failed = bulk( self.es, actions, raise_on_error=False, request_timeout=REQUEST_TIMEOUT, ) if failed: for f in failed: print(f"\nError: {f['error']['reason']}") print("\n✓ Reindex complete") except Exception as e: print(f"\n✗ Reindex failed: {e}") raise def process_all(self) -> None: """ Process all indices defined in INDICES_CONFIG. This is the main orchestration method. For each index in the config: WORKFLOW: [0/5] Resolve source (concrete index or alias) to concrete index name [1/5] Generate new index name (oka-{cluster}-{key}-{uuid}) [2/5] Reindex documents from source → new index (with transformations) [3/5] Delete source concrete index [4/5] Create aliases: - If source was alias: keep the alias name pointing to new index - If source was concrete: create alias from source name to new index - Always create: new index name (without UUID) → new index [5/5] Clients transparently query via alias (no code changes needed) On error, raises immediately. Partially processed indices are not rolled back. Raises: ValueError: If validation fails (source doesn't exist, etc.) Exception: If any operation fails (connectivity, Elasticsearch errors, etc.) 
""" for cluster_name, index_or_alias, custom_key in self.indices_config: print(f"\n{'=' * 70}") print(f"Processing cluster: {cluster_name}") if custom_key: print(f" custom key: {custom_key}") print(f" Source (index or alias): {index_or_alias}") print(f"{'=' * 70}") try: # Step 0: Resolve source to concrete index (handles both alias and concrete) print("\n[0/5] Resolving source...") concrete_index: str = self.resolve_to_concrete_index(index_or_alias) print(f" ✓ Concrete index: {concrete_index}") # Step 1: Generate new index name with UUID new_index: str = self.generate_new_index_name(cluster_name, custom_key) print(f"\n[1/5-1] New index name: {new_index}") # Create target index with same shard count, 0 replicas print("\n[1/5-2] Creating target index...") self.create_target_index(new_index, concrete_index) # Step 2: Reindex from source to new index print( f"\n[2/5] Reindexing documents from '{concrete_index}' → '{new_index}'" ) self.reindex( source_index=concrete_index, target_index=new_index, batch_size=BATCH_SIZE, ) # Step 3: Delete source concrete index (now safe) print(f"\n[3/5] Deleting source concrete index: {concrete_index}") self.es.indices.delete(index=concrete_index) print(" ✓ Concrete index deleted") # Step 4: Create alias(es) for backward compatibility print("\n[4/5] Creating alias(es):") # Alias 1: Use the original name (whether it was alias or concrete index) # This ensures clients can continue using the same name self.es.indices.put_alias(index=new_index, name=index_or_alias) print(f" ✓ Alias created: {index_or_alias} → {new_index}") # Alias 2: New index name (without UUID) for direct access if needed new_index_no_uuid: str = new_index.rsplit("-", 1)[0] self.es.indices.put_alias(index=new_index, name=new_index_no_uuid) print(f" ✓ Alias created: {new_index_no_uuid} → {new_index}") print("\n[5/5] ✓ Process complete") print(f" Clients query using: '{index_or_alias}' (alias)") print(f" Data stored in: '{new_index}' (concrete index)") except ValueError as e: # User/validation errors print("\n ✗ VALIDATION ERROR") print(f" {str(e)}") raise except Exception as e: # Operational errors print(f"\n ✗ Process failed: {str(e)}") raise def main() -> None: """ Main entry point for the reindexing tool. Workflow: 1. Parse command-line arguments 2. Load OKA configuration from YAML file 3. Create Elasticsearch client 4. Initialize reindexer with configuration 5. Process all configured indices 6. 
Exit with appropriate status code Command-line Arguments: --conf-path (required): Path to oka.yaml configuration file Exit Codes: 0: Success 1: Configuration or validation error 130: Interrupted by user (Ctrl+C) 1: Unexpected error """ parser = argparse.ArgumentParser( description="Reindex Elasticsearch indices with type conversion and field restructuring" ) parser.add_argument( "--conf-path", required=True, help="Path to OKA configuration file (oka.yaml)", ) args = parser.parse_args() try: # Load OKA configuration oka_config: OKAConfig = OKAConfig.from_yaml(args.conf_path) es_client: Elasticsearch = Reindexer.create_es_client_from_oka( oka_config.elasticsearch ) # Initialize reindexer with configuration reindexer: Reindexer = Reindexer(es_client) # Process all configured indices reindexer.process_all() except ValueError as e: # User errors (validation, config, etc) - show message only print(f"\n✗ ERROR: {str(e)}", file=sys.stderr) sys.exit(1) except KeyboardInterrupt: # User interrupted (Ctrl+C) print("\n\n✗ Process interrupted by user", file=sys.stderr) sys.exit(130) except Exception as e: # Unexpected errors - show traceback print(f"\n✗ Unexpected error: {str(e)}", file=sys.stderr) import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": main()
This program reindexes the accounting index only. Therefore, you first need to access OKA and, from the Cluster page, retrieve two things:
The value of the Cluster UID field.
The Elasticsearch accounting index UID.
Then, open the script in a text editor and update the INDICES_CONFIG list to specify the required information for each impacted Cluster, as follows:
INDICES_CONFIG: List[Tuple[str, str, Optional[str]]] = [
    ("my_cluster_name_uid", "8dd506ff-b858-41ea-9735-6169751d6902", "accounting"),
]
Finally, proceed to run the script:
cd ${OKA_INSTALL_DIR}/current/bin/tools/
# Check available options
uv run reindex_indices.py -h
# Execute reindex
uv run reindex_indices.py --conf-path ${OKA_INSTALL_DIR}/config/oka/oka.yaml