OKA

This section describes how to update or upgrade OKA.


Update

Obtaining latest OKA package

The OKA package can be downloaded from the UCit website:

$> wget --user <USERNAME> --password <PASSWORD> -O oka-X.Y.Z.run https://ucit.fr/product-publish/oka/package/latest.php?file=run

Please contact UCit Support to obtain the <USERNAME> and <PASSWORD> giving you access to the package. Replace X.Y.Z with the version of OKA you want to download.

Prior to any update, you should check the SHA256 of the installer to ensure that it hasn’t been tampered with. To do so, first download the oka-X.Y.Z.sha256 file and run the shasum utility:

$> wget --user <USERNAME> --password <PASSWORD> -O oka-X.Y.Z.sha256 https://ucit.fr/product-publish/oka/package/latest.php?file=sha256
$> shasum -c oka-X.Y.Z.sha256
oka-X.Y.Z.run: OK

Requirements

Before going any further, make sure that all requirements are satisfied for the version you are about to install by checking the Requirements page.

Upgrading from v2 to v3

When proceeding with an upgrade to the v3 major version of OKA, pay attention to PostgreSQL and Elasticsearch, as the supported versions have changed. A quick way to check the currently installed versions is sketched after the list below.

  • Upgrade of PostgreSQL to a version >=14 is mandatory. See Migrating to latest version.

  • Upgrade of Elasticsearch to a version 9.x is recommended. See Migrating from Elasticsearch 7 to Elasticsearch 9.

    Warning

    Elasticsearch 7.x official support ends in January 2026; therefore, no further updates will be made to ensure OKA’s compatibility in future releases. It is highly recommended that you upgrade to Elasticsearch 9.x.
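
To check the versions currently installed before upgrading, you can query both services directly. This is a minimal sketch, assuming local access to the PostgreSQL server and an Elasticsearch instance reachable on localhost:9200; adjust hosts, users and credentials to your setup:

$> sudo -u postgres psql -c "SHOW server_version;"             # PostgreSQL server version
$> curl -s "http://localhost:9200/?pretty" | grep '"number"'   # Elasticsearch version number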


Backup

It is recommended that you back up both the PostgreSQL and Elasticsearch databases before updating OKA to a new version.
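
For illustration, a PostgreSQL dump and an Elasticsearch snapshot could be taken along the following lines. This is only a sketch: the database name oka and the snapshot repository oka_backup are placeholders, the snapshot repository must have been registered in Elasticsearch beforehand, and everything should be adapted to your own deployment:

$> pg_dump -U <DB_USER> -Fc oka > oka_postgres_$(date +%Y%m%d).dump   # custom-format dump of the OKA database
$> curl -X PUT "http://localhost:9200/_snapshot/oka_backup/snapshot_$(date +%Y%m%d)?wait_for_completion=true"   # snapshot of all indices into the 'oka_backup' repository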

Procedures for both databases are available through the following links:


Update installation

Warning

  • The installer must have internet access to download the required Python packages.

  • The installer needs to be executed by either the root user or a user with sudo privileges.

The OKA installer comes with a set of predefined command-line options. For example, to display the help, simply run:

./oka-X.Y.Z.run -h

When updating or upgrading OKA, the existing configuration is reused for the newly installed version. In interactive mode, only the missing mandatory parameters will be requested. In batch mode, however, the installation will stop if anything is missing. To avoid this, you can either update the existing configuration before running the installer or provide a new configuration file directly (see Batch mode).

Important

When upgrading from v2 to v3, you will have to provide a valid v3 license file.

If you do not have a license file yet, please contact UCit Support or your OKA reseller.

For more information regarding the way OKA handles its license, see License.

Interactive mode

The installer oka-X.Y.Z.run will guide you through the installation of OKA on your device. Start the process by calling:

./oka-X.Y.Z.run

Batch mode

OKA can also be installed in a non-interactive way. To do so, you need to provide the required parameters through a configuration file that contains all the configuration variables of OKA. This file should match exactly the content of an oka.yaml file generated by the installer in ${OKA_INSTALL_DIR}/config/oka/oka.yaml (see OKA Parameter Reference).

To run the installer in batch mode, use the following command:

./oka-X.Y.Z.run -b -c <PATH_TO_YAML_CONF_FILE> -i ${OKA_INSTALL_DIR}

with:

  • -b: activation of the batch installation mode

  • -c <PATH_TO_YAML_CONF_FILE>: path to configuration file (see OKA Parameter Reference)

  • -i ${OKA_INSTALL_DIR}: path to install directory


Post update

The following sections present the different actions that might be required after an update/upgrade.

Upgrading from v2 to v3

After an upgrade from v2 to v3, depending on your context, you might have to execute one or more additional scripts in order to clean up your existing data.

Warning

The following actions might end up taking a lot of time depending on the size of your Elasticsearch database. We recommend executing them inside a tool such as tmux to avoid untimely disconnections during the process.
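
For example, the scripts can be launched from a named tmux session so that they keep running even if your SSH connection drops (the session name oka-cleanup is arbitrary):

$> tmux new -s oka-cleanup      # start a named session and run the script inside it
$> tmux attach -t oka-cleanup   # re-attach to the session later if needed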

Cleanup

Note

For clients using the Slurm job scheduler, this is necessary to remove jobs potentially duplicated because the main entry of job arrays was previously misinterpreted.

To accommodate changes in how OKA handles some columns internally, it is recommended that you run a cleanup on all your existing accounting indices.

The following script should be saved in ${OKA_INSTALL_DIR}/current/bin/tools/ and can be used to handle this automatically for you once configured properly.

handle_switch_from_jobid_to_jobidraw.py
# ============================================================================
# Elasticsearch cleanup module for duplicates and UUID regeneration.
#
# This module can be used as a standalone script or imported as a package.
# Reports are automatically saved to timestamped folders (report_es_<timestamp>/).
#
# Installation:
#     Place this script in: <oka_dir>/current/bin/tools
#
# Commands:
#     identify_missing_end_field  - Identify documents without 'End' field
#     identify_duplicates         - Identify duplicate JobID_Raw documents
#     fix                         - Execute all operations (identify missing, identify duplicates, delete, regenerate UUID)
#
# Options:
#     --conf-path PATH                          Path to OKA YAML configuration file (required)
#     --indices NAMES                           List of Elasticsearch index names to process (required)
#     --command {identify_missing_end_field,
#                identify_duplicates,
#                fix}          Command to execute (required)
#     --dry-run                                 Preview changes without executing them
#
# Usage Examples:
#
#     # Identify documents without 'End' field
#     uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command identify_missing_end_field
#
#     # Identify duplicates
#     uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command identify_duplicates
#
#     # Identify jobs duplicated or without 'End' field, delete duplicates, generate new uuid to fix jobs.
#     uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command fix
#
#     # Dry run mode (preview without executing)
#     uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command fix --dry-run
#
#     # Multiple indices
#     uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 index2 index3 --command fix
#
# Reports Generated:
#     - identified_missing_end_field.json: Documents without End field
#     - identified_duplicates.json: Duplicate JobID_Raw groups
#     - result_report.json: Final workflow summary (only with 'fix' command)
# ============================================================================

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, scan
from datetime import datetime
from typing import Dict, Set, Optional, Tuple
import sys
import warnings
from pathlib import Path
from tqdm import tqdm
import json

# Suppress Elasticsearch warnings
warnings.filterwarnings("ignore", category=Warning, module="elasticsearch")

# Import OKA configuration classes
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "src"))
from lib.tools.configuration import ElasticsearchConfig, OKAConfig


# ============================================================================
# HELPERS
# ============================================================================
class ReportManager:
    """Report manager for timestamped reports"""

    def __init__(self, base_dir: Path = Path.cwd()):
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.report_dir = base_dir / f"report_es_{timestamp}"
        self.report_dir.mkdir(parents=True, exist_ok=True)

    def save(self, filename: str, data: Dict) -> None:
        """Save report to timestamped directory"""
        path = self.report_dir / filename
        with open(path, "w") as f:
            json.dump(data, f, indent=2, default=str)
        print(f"✓ Saved: {path}")


def create_es_client_from_oka(es_config: ElasticsearchConfig) -> Elasticsearch:
    """Create Elasticsearch client from OKA configuration"""
    from pydantic import SecretStr

    host = str(es_config.host)

    conn_params = {
        "hosts": [host],
        "verify_certs": es_config.verify_certs,
    }

    # Add authentication if provided
    if es_config.user and es_config.password:
        password = (
            es_config.password.get_secret_value()
            if isinstance(es_config.password, SecretStr)
            else es_config.password
        )
        conn_params["basic_auth"] = (es_config.user, password)

    # Only add TLS options if verify_certs is enabled
    if es_config.verify_certs:
        if es_config.ca_cert_path:
            conn_params["ca_certs"] = str(es_config.ca_cert_path)

        if es_config.client_cert_path:
            conn_params["client_cert"] = str(es_config.client_cert_path)

        if es_config.client_key_path:
            conn_params["client_key"] = str(es_config.client_key_path)

    # Ignore es warnings
    warnings.filterwarnings("ignore")
    import urllib3

    urllib3.disable_warnings()

    return Elasticsearch(**conn_params)


def regenerate_uuid(
    es: Elasticsearch,
    index: str,
    document_ids: Optional[Set[str]] = None,
    dry_run: bool = False,
    batch_size: int = 5000,
) -> int:
    """
    Regenerate UUID field for specified documents or all documents.

    Args:
        es: Elasticsearch client
        index: Index name
        document_ids: Set of specific document IDs to update (None = all documents)
        dry_run: If True, only show what would be updated
        batch_size: Number of documents to update per batch

    Returns:
        Number of documents updated
    """
    print(f"\n{'=' * 60}")
    print(f"Regenerating UUID - {index}")
    print(f"{'=' * 60}")

    if dry_run:
        print("⚠ DRY RUN MODE - No updates will be performed")

    if document_ids:
        print(f"Updating {len(document_ids)} specific documents")
        query = {"query": {"ids": {"values": list(document_ids)}}}
        total_docs = len(document_ids)
    else:
        print("Updating ALL documents in index")
        query = {"query": {"match_all": {}}}
        count_result = es.count(index=index)
        total_docs = count_result["count"]

    if dry_run:
        sample_docs = es.search(
            index=index,
            body={
                "size": 5,
                "query": query["query"],
                "_source": ["JobID_Raw", "Submit", "uuid", "timestamp"],
            },
        )

        print("\nSample UUID transformations:")
        for doc in sample_docs["hits"]["hits"]:
            source = doc["_source"]
            old_uuid = source.get("uuid", "N/A")
            try:
                new_uuid = generate_uuid_from_source(source)
                print(f"\n  Document ID: {doc['_id']}")
                print(f"    JobID_Raw: {source.get('JobID_Raw')}")
                print(f"    Submit: {source.get('Submit')}")
                print(f"    Old UUID: {old_uuid}")
                print(f"    New UUID: {new_uuid}")
                print(f"    Changed: {old_uuid != new_uuid}")
            except ValueError as e:
                print(f"\n  Document ID: {doc['_id']}")
                print(f"    Error: {e}")

        print(f"\n⚠ DRY RUN - Would update {total_docs} documents")
        return 0

    updated_count = 0
    failed_count = 0
    update_ops = []

    print("Scanning and updating documents...")
    for doc in tqdm(
        scan(es, index=index, query=query, _source=["JobID_Raw", "Submit"]),
        total=total_docs,
        desc="Updating",
    ):
        try:
            new_uuid = generate_uuid_from_source(doc["_source"])

            update_ops.append(
                {
                    "_op_type": "update",
                    "_index": index,
                    "_id": doc["_id"],
                    "doc": {"uuid": new_uuid},
                }
            )

            if len(update_ops) >= batch_size:
                success, failed = bulk(
                    es, update_ops, raise_on_error=False, stats_only=True
                )
                updated_count += success
                if failed:
                    failed_count += failed if isinstance(failed, int) else len(failed)
                update_ops = []
        except ValueError as e:
            print(f"\n⚠ Warning: Skipping document {doc['_id']}: {e}")
            failed_count += 1

    if update_ops:
        success, failed = bulk(es, update_ops, raise_on_error=False, stats_only=True)
        updated_count += success
        if failed:
            failed_count += failed if isinstance(failed, int) else len(failed)

    print("\nRefreshing index...")
    es.indices.refresh(index=index)

    print("✓ Complete:")
    print(f"  Total updated: {updated_count}")
    print(f"  Failed: {failed_count}")

    return updated_count


def generate_uuid_from_source(source: Dict) -> str:
    """
    Generate UUID by concatenating JobID_Raw and Submit fields.

    Args:
        source: Document source dictionary

    Returns:
        Generated UUID string
    """
    column_list = ["JobID_Raw", "Submit"]

    # Check if all specified columns exist in the source
    missing_columns = [col for col in column_list if col not in source]
    if missing_columns:
        raise ValueError(
            f"Missing required columns for UUID generation: {missing_columns}"
        )

    # Create UUID by concatenating all specified columns
    uuid = "_".join(str(source[col]) for col in column_list)

    return uuid


# ============================================================================
# COMMAND FUNCTIONS
# ============================================================================
def identify_missing_end_field(
    es: Elasticsearch, index: str, report_manager: ReportManager
) -> Dict:
    """
    Identify all documents that don't have an 'End' field value.

    Args:
        es: Elasticsearch client
        index: Index name
        report_manager: ReportManager instance for saving reports

    Returns:
        Dictionary with results and document IDs
    """
    print(f"\n{'=' * 60}")
    print(f"Identifying documents without 'End' field - {index}")
    print(f"{'=' * 60}")

    # Query for documents where End field doesn't exist
    query = {"query": {"bool": {"must_not": {"exists": {"field": "End"}}}}}

    count_result = es.count(index=index, body=query)
    total_docs = count_result["count"]

    print(f"Found {total_docs} documents without 'End' field")

    if total_docs == 0:
        result = {"total": 0, "document_ids": set(), "samples": []}
        return result

    doc_ids = set()
    samples = []

    print("Scanning documents...")
    for doc in tqdm(
        scan(
            es,
            index=index,
            query=query,
            _source=["JobID_Raw", "Submit", "timestamp", "uuid"],
        ),
        total=total_docs,
        desc="Scanning",
    ):
        doc_ids.add(doc["_id"])

        samples.append({"id": doc["_id"], "source": doc["_source"]})

    result = {"total": total_docs, "document_ids": doc_ids, "samples": samples}

    report_data = {
        "index": index,
        "timestamp": datetime.now().isoformat(),
        "total_documents": total_docs,
        "document_ids": list(doc_ids),
        "samples": samples,
    }
    report_manager.save("identified_missing_end_field.json", report_data)

    print(f"✓ Identified: {total_docs} documents without 'End' field")

    return result


def identify_duplicates(
    es: Elasticsearch, index: str, report_manager: ReportManager
) -> Dict:
    """
    Identify all documents with duplicate JobID_Raw values.

    Args:
        es: Elasticsearch client
        index: Index name
        report_manager: ReportManager instance for saving reports

    Returns:
        Dictionary with duplicate groups and document IDs
    """
    print(f"\n{'=' * 60}")
    print(f"Identifying duplicates by JobID_Raw - {index}")
    print(f"{'=' * 60}")

    duplicate_groups = {}
    all_duplicate_doc_ids = set()
    after = None
    total_duplicate_docs = 0

    while True:
        # Page through JobID_Raw groups with a composite aggregation; duplicates are
        # detected from the number of documents returned by the top_hits sub-aggregation
        query = {
            "size": 0,
            "aggs": {
                "jobid_groups": {
                    "composite": {
                        "size": 10000,
                        "sources": [
                            {"jobid_raw": {"terms": {"field": "JobID_Raw.keyword"}}}
                        ],
                    },
                    "aggs": {
                        "all_docs": {
                            "top_hits": {
                                "size": 100,
                                "sort": [{"timestamp": {"order": "desc"}}],
                                "_source": [
                                    "timestamp",
                                    "Submit",
                                    "uuid",
                                    "JobID",
                                    "JobID_Raw",
                                    "End",
                                ],
                            }
                        }
                    },
                }
            },
        }

        if after:
            query["aggs"]["jobid_groups"]["composite"]["after"] = after

        result = es.search(index=index, body=query)
        buckets = result["aggregations"]["jobid_groups"]["buckets"]

        if not buckets:
            break

        for bucket in buckets:
            # Use the actual number of documents from top_hits instead of value_count
            docs = bucket["all_docs"]["hits"]["hits"]
            doc_count = len(docs)

            if doc_count > 1:
                jobid_raw = bucket["key"]["jobid_raw"]

                for doc in docs:
                    all_duplicate_doc_ids.add(doc["_id"])

                duplicate_groups[jobid_raw] = {
                    "total_docs": doc_count,
                    "keeper": {
                        "id": docs[0]["_id"],
                        "timestamp": docs[0]["_source"].get("timestamp"),
                        "source": docs[0]["_source"],
                    },
                    "to_delete": [
                        {
                            "id": doc["_id"],
                            "timestamp": doc["_source"].get("timestamp"),
                            "source": doc["_source"],
                        }
                        for doc in docs[1:]
                    ],
                }

                total_duplicate_docs += doc_count - 1

        after = result["aggregations"]["jobid_groups"].get("after_key")
        print(
            f"  Processed batch... Found {len(duplicate_groups)} duplicate groups so far"
        )

    result = {
        "total_groups": len(duplicate_groups),
        "total_duplicate_docs": total_duplicate_docs,
        "duplicate_groups": duplicate_groups,
        "all_duplicate_doc_ids": all_duplicate_doc_ids,
    }

    report_data = {
        "index": index,
        "timestamp": datetime.now().isoformat(),
        "total_duplicate_groups": len(duplicate_groups),
        "total_duplicate_documents": total_duplicate_docs,
        "duplicate_groups": {
            k: {
                "total_docs": v["total_docs"],
                "keeper_id": v["keeper"]["id"],
                "keeper_timestamp": v["keeper"]["timestamp"],
                "delete_ids": [d["id"] for d in v["to_delete"]],
            }
            for k, v in list(duplicate_groups.items())[:100]
        },
    }
    report_manager.save("identified_duplicates.json", report_data)

    print("✓ Identified:")
    print(f"  Duplicate JobID_Raw groups: {len(duplicate_groups)}")
    print(f"  Total duplicate documents: {total_duplicate_docs}")
    print(f"  All documents involved in duplicates: {len(all_duplicate_doc_ids)}")

    return result


def delete_duplicates(
    es: Elasticsearch,
    index: str,
    duplicate_groups: Dict,
    dry_run: bool = False,
    batch_size: int = 5000,
) -> Tuple[int, Set[str]]:
    """
    Delete duplicate documents, keeping the most recent by timestamp.

    Args:
        es: Elasticsearch client
        index: Index name
        duplicate_groups: Result from identify_duplicates()
        dry_run: If True, only show what would be deleted
        batch_size: Number of documents to delete per batch

    Returns:
        Tuple of (number of documents deleted, set of deleted document IDs)
    """
    print(f"\n{'=' * 60}")
    print(f"Deleting duplicates - {index}")
    print(f"{'=' * 60}")

    if dry_run:
        print("⚠ DRY RUN MODE - No deletions will be performed")

    delete_ids = []
    for jobid_raw, group_info in duplicate_groups["duplicate_groups"].items():
        for doc in group_info["to_delete"]:
            delete_ids.append(doc["id"])

    deleted_ids_set = set(delete_ids)
    total_to_delete = len(delete_ids)
    print(f"Total documents to delete: {total_to_delete}")

    if total_to_delete == 0:
        print("✓ No duplicates to delete")
        return 0, set()

    if dry_run:
        print("\nSample document IDs that would be deleted (first 10):")
        for doc_id in delete_ids[:10]:
            print(f"  - {doc_id}")
        print(f"⚠ DRY RUN - Would delete {total_to_delete} documents")
        return 0, deleted_ids_set

    response = input(
        f"\nAbout to delete {total_to_delete} documents from {index}. Continue? (yes/no): "
    )
    if response.lower() != "yes":
        print("⚠ Deletion cancelled")
        raise RuntimeError("Operation cancelled by user")

    deleted_count = 0
    failed_count = 0

    print("Executing bulk delete...")
    for i in tqdm(range(0, len(delete_ids), batch_size), desc="Deleting batches"):
        batch = delete_ids[i : i + batch_size]

        actions = [
            {"_op_type": "delete", "_index": index, "_id": doc_id} for doc_id in batch
        ]

        success, failed = bulk(es, actions, raise_on_error=False, stats_only=True)
        deleted_count += success
        if failed:
            failed_count += failed if isinstance(failed, int) else len(failed)

    print("\nRefreshing index...")
    es.indices.refresh(index=index)

    print("✓ Complete:")
    print(f"  Total deleted: {deleted_count}")
    print(f"  Failed: {failed_count}")

    return deleted_count, deleted_ids_set


# ============================================================================
# MAIN CLI
# ============================================================================
def main():
    """Main CLI entry point"""
    import argparse

    parser = argparse.ArgumentParser(
        description="Elasticsearch cleanup tool for duplicates and UUID regeneration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
                Examples:
                uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command identify_missing_end_field
                uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command identify_duplicates
                uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 --command fix --dry-run
                uv run handle_switch_from_jobid_to_jobidraw.py --conf-path <oka_dir>/config/oka/oka.yaml --indices index1 index2 --command fix
                """,
    )

    parser.add_argument(
        "--conf-path", required=True, help="Path to OKA YAML configuration file"
    )
    parser.add_argument(
        "--indices", nargs="+", required=True, help="List of index names to process"
    )
    parser.add_argument(
        "--command",
        choices=[
            "identify_missing_end_field",
            "identify_duplicates",
            "fix",
        ],
        required=True,
        help="Command to execute",
    )
    parser.add_argument(
        "--dry-run", action="store_true", help="Preview changes without executing"
    )

    args = parser.parse_args()

    print(f"Loading OKA configuration from: {args.conf_path}")
    oka_config = OKAConfig.from_yaml(args.conf_path)
    es = create_es_client_from_oka(oka_config.elasticsearch)

    try:
        info = es.info()
        print(f"✓ Connected to Elasticsearch {info['version']['number']}")
    except Exception as e:
        print(f"✗ Failed to connect to Elasticsearch: {e}")
        sys.exit(1)

    # Initialize report manager once for all indices
    report_manager = ReportManager()

    for index in args.indices:
        try:
            print(f"\n{'#' * 60}")
            print(f"# Processing index: {index}")
            print(f"{'#' * 60}")

            if args.command == "identify_missing_end_field":
                identify_missing_end_field(es, index, report_manager)

            elif args.command == "identify_duplicates":
                identify_duplicates(es, index, report_manager)

            elif args.command == "fix":
                step1_result = identify_missing_end_field(es, index, report_manager)
                step2_result = identify_duplicates(es, index, report_manager)
                deleted, deleted_ids = delete_duplicates(
                    es, index, step2_result, args.dry_run
                )

                # Compute remaining id list after handling deletion of duplicates.
                remaining_ids = (
                    step1_result["document_ids"] | step2_result["all_duplicate_doc_ids"]
                ) - deleted_ids
                print(f"\nUUID Regeneration Target: {len(remaining_ids)} documents")

                if remaining_ids:
                    regenerate_uuid(es, index, remaining_ids, args.dry_run)

                # Save result report
                report_manager.save(
                    "result_report.json",
                    {
                        "index": index,
                        "timestamp": datetime.now().isoformat(),
                        "workflow": "fix",
                        "results": {
                            "missing_end_field": step1_result["total"],
                            "duplicate_groups": step2_result["total_groups"],
                            "duplicate_documents": step2_result["total_duplicate_docs"],
                            "deleted": deleted,
                        },
                    },
                )

        except KeyboardInterrupt:
            print("\n⚠ Interrupted by user")
            sys.exit(1)
        except RuntimeError as e:
            if "cancelled" in str(e).lower():
                # User cancellation - no traceback
                print(f"\n✗ Error processing {index}: {e}")
            else:
                # Other RuntimeError - show traceback
                print(f"\n✗ Error processing {index}: {e}")
                import traceback

                traceback.print_exc()
        except Exception as e:
            # All other exceptions - show traceback
            print(f"\n✗ Error processing {index}: {e}")
            import traceback

            traceback.print_exc()

    print(f"\n{'=' * 60}")
    print("✓ All operations complete")
    print(f"✓ Reports saved to: {report_manager.report_dir}")
    print(f"{'=' * 60}")


if __name__ == "__main__":
    main()
  1. This program identifies and corrects Elasticsearch documents representing duplicated jobs, or documents that could lead to duplicates in the future. It is only required for the accounting index; therefore, you first need to access OKA and, from the Cluster page, retrieve the accounting index UID.

  2. Then, proceed to run the script:

    cd ${OKA_INSTALL_DIR}/current/bin/tools/
    # Check available options
    uv run handle_switch_from_jobid_to_jobidraw.py -h
    # Execute the 'fix' command for the two given indices: 'c663f880-0444-41b3-a5a5-ca0dc841dfee' and '8dd506ff-b858-41ea-9735-6169751d6902'
    uv run handle_switch_from_jobid_to_jobidraw.py --conf-path ${OKA_INSTALL_DIR}/config/oka/oka.yaml --command fix --indices c663f880-0444-41b3-a5a5-ca0dc841dfee 8dd506ff-b858-41ea-9735-6169751d6902
    

    Note

    Use the --dry-run option if you wish to preview what will be done before fully executing the fix command.

Reindex

Warning

Requesting a reindex of an Elasticsearch database will temporarily require at least twice the disk space of the existing index. Make sure enough space is available before starting the process.
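
To estimate the space needed, compare the current size of the indices with the free space on the Elasticsearch data volume. A minimal sketch, assuming Elasticsearch is reachable on localhost:9200 and stores its data under /var/lib/elasticsearch (adjust both to your setup):

$> curl -s "http://localhost:9200/_cat/indices?v&h=index,store.size"   # size of each index
$> df -h /var/lib/elasticsearch                                        # free space on the data volume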

Important

When reindexing, the script reuses the original index configuration for the number of shards so that the index keeps working the same way as before.

However, the number of replicas will be set to 0 to make the process faster. If the index had replicas before, be advised that you will have to set them back yourself.
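
For instance, replicas can be restored afterwards through the Elasticsearch index settings API. This is only a sketch: replace <INDEX_OR_ALIAS> with your index or alias name and set the replica count the index had before the reindex:

$> curl -X PUT "http://localhost:9200/<INDEX_OR_ALIAS>/_settings" \
       -H 'Content-Type: application/json' \
       -d '{"index": {"number_of_replicas": 1}}'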

Note

  • For clients using PBS-based job schedulers, this is necessary to handle resource_list and resources_used data in a more advanced way.

  • For clients with data originally uploaded prior to version 2.9.1, this might be necessary to fix a potential computational error for a metric in the Resource module.

To accommodate changes in how OKA handles some columns internally, it is recommended that you reindex all your existing accounting indices.

The following script should be saved in ${OKA_INSTALL_DIR}/current/bin/tools/ and can be used to handle this automatically for you once configured properly.

reindex_indices.py
"""
Elasticsearch Reindexing Tool with Type Conversion and Field Restructuring

This module handles migrating Elasticsearch indices while applying:
  • Type conversions based on OKA constants (int, float, date, string, etc.)
  • Field restructuring using object patterns (flattening fields into objects)
  • Automatic alias management (always query by original index name)

WORKFLOW:
  1. Source index exists: "xxxxx-yyyyy-" with old data
  2. Create new index: "oka-<cluster_uid>-<custom_key>-<uuid>"
  3. Reindex all documents, applying transformations
  4. Delete source index
  5. Create aliases "xxxxx-yyyyy-" and "oka-<cluster_uid>-<custom_key>" → points to "oka-<cluster_uid>-<custom_key>-<uuid>"
  6. Query by original name; data served from new index transparently

INSTALLATION:
  Place this file in: <oka_dir>/current/bin/tools

USAGE:
  uv run reindex_indices.py --conf-path /path/to/oka.yaml

CONFIGURATION:
  Edit the CONFIGURATION section below for:
    • INDICES_CONFIG: List of indices to process
    • BATCH_SIZE: Bulk operation batch size
    • OBJECT_PATTERNS: Field grouping rules
    • FIELD_TYPE_OVERRIDES: Type coercion overrides
    • REQUEST_TIMEOUT: Elasticsearch operation timeouts
"""

import sys
import argparse
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Any
import uuid

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, scan

# Import OKA configuration and constants
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "src"))
from lib.tools.configuration import ElasticsearchConfig, OKAConfig
from lib.common import oka_constants


# ============================================================================
# CONFIGURATION SECTION - EDIT HERE
# ============================================================================

# INDICES_CONFIG: List of tuples defining which indices to reindex
#
# Format: (cluster_uid, source_index_or_alias, custom_key)
#
# cluster_uid:
#   Identifier for the cluster. Find this on the OKA clusters page.
#
# source_index_or_alias:
#   The existing concrete index or alias you want to migrate. Find this on the OKA clusters page.
#
# custom_key:
#   Optional semantic label for the new index name. Makes naming clearer.
#
# EXAMPLES:
#   Single concrete index:
#     [("my_cluster", "old_index_uuid", "accounting")]
#   With alias:
#     [("my_cluster", "old_alias_name", "accounting")]
#   Multiple mixed:
#     [
#       ("hpc_slurm", "index_uuid_1", "accounting"),
#       ("hpc_pbs", "my_alias_name", "accounting"),
#       ("hpc_lsf", "index_uuid_3", None),
#     ]
INDICES_CONFIG: List[Tuple[str, str, Optional[str]]] = [
    ("xyz_uid", "xxxxxxxx-yyyy-zzzz-wwww-aaaaaaaaaa", "accounting"),
]


# OBJECT_PATTERNS: Field grouping rules
#
#   Old schema (flat): resource_list_mem, resource_list_cpu, resource_list_disk
#   New schema (object): resource_list: {mem, cpu, disk}
#
# HOW IT WORKS:
#   For each field_name that starts with a pattern key:
#     • Extract the suffix (field_name - pattern)
#     • Move to parent object: parent[suffix] = value
#     • Remove original flat field
#
# Set to {} (empty dict) to disable grouping entirely.
OBJECT_PATTERNS: Dict[str, str] = {
    "resource_list_": "resource_list",
    "resources_used_": "resources_used",
}

# FIELD_TYPE_OVERRIDES: Force specific fields to a type
#
# When OKA constants don't define a field type, or you want to override them,
# add the field here with its desired type.
#
# Supported types: "int", "float", "date", "string", "object", "nested"
#
# EXAMPLE:
#   If you want to force "your_metric" to be treated as float:
#     {"your_metric": "float"}
#
# Leave empty {} if no overrides needed.
#
FIELD_TYPE_OVERRIDES: Dict[str, str] = {}


# BATCH_SIZE: Number of documents to bulk insert per Elasticsearch request
#
# Higher values = fewer requests, but higher memory usage
# Lower values = more requests, but lower memory per batch
#
BATCH_SIZE: int = 1000

# REQUEST_TIMEOUT: Maximum time (seconds) for individual Elasticsearch operations
#
# This applies to:
#   • Reindexing operations (scan, bulk inserts)
#   • Index creation/deletion
#   • Alias operations
#
# For large indices with millions of documents, increase this.
# Formula: ~30-60 seconds per million documents is a safe estimate.
# For 10M documents, use 5-10 minutes.
#
# Examples:
#   300 seconds = 5 minutes (small indices, <1M docs)
#   1800 seconds = 30 minutes (medium indices, 1-10M docs)
#   10800 seconds = 3 hours (large indices, 10M+ docs)
#
REQUEST_TIMEOUT: int = 3 * 60 * 60  # 3 hours

# ============================================================================
# END CONFIGURATION
# ============================================================================

# Suppress warnings
warnings.filterwarnings("ignore")


def progress_message(current: int, total: int, desc: str = "") -> None:
    """
    Print a simple progress message with percentage.

    Args:
        current: Current processed count
        total: Total items to process
        desc: Description prefix (e.g., "Reindexing")

    Example:
        progress_message(500, 10000, "Reindexing")
        # Output: "Reindexing 500/10000 (5.0%)"
    """
    if total > 0:
        percent = (current / total) * 100
        print(f"\r{desc} {current}/{total} ({percent:.1f}%)", end="", flush=True)


class Reindexer:
    """
    Handle Elasticsearch index reindexing with type conversion and field restructuring.

    This class orchestrates the entire reindexing workflow:
      1. Validate/resolve source (concrete index or alias)
      2. Create new index with naming pattern
      3. Reindex documents with transformations
      4. Delete source concrete index
      5. Create aliases for backward compatibility

    Transformations applied during reindexing:
      • Field type conversion based on OKA constants
      • Field grouping/object restructuring
      • Handling of missing/null values

    Attributes:
        es (Elasticsearch): Elasticsearch client
        indices_config (List[Tuple[str, str, Optional[str]]]): Configuration for indices to process
    """

    def __init__(self, es_client: Elasticsearch) -> None:
        """
        Initialize the Reindexer.

        Args:
            es_client: Authenticated Elasticsearch client

        Raises:
            ValueError: If OBJECT_PATTERNS is invalid
        """
        self.es: Elasticsearch = es_client
        self.indices_config: List[Tuple[str, str, Optional[str]]] = INDICES_CONFIG

        # Validate and display OBJECT_PATTERNS configuration
        if OBJECT_PATTERNS:
            print(
                f"\n✓ OBJECT_PATTERNS enabled with {len(OBJECT_PATTERNS)} grouping rules:"
            )
            for pattern, parent in OBJECT_PATTERNS.items():
                print(f"    {pattern:20s}{parent}")
        else:
            print("\n⚠ OBJECT_PATTERNS is empty - no field grouping will occur")

    @staticmethod
    def create_es_client_from_oka(es_config: ElasticsearchConfig) -> Elasticsearch:
        """
        Create and configure an Elasticsearch client from OKA configuration.

        Handles:
          • Basic authentication (user/password)
          • SSL certificate verification
          • Custom CA certificates
          • Client certificate authentication

        Args:
            es_config: ElasticsearchConfig object from OKA configuration

        Returns:
            Configured Elasticsearch client

        Raises:
            Exception: If connection parameters are invalid
        """
        from pydantic import SecretStr

        host: str = str(es_config.host)

        # Build connection parameters
        conn_params: Dict[str, Any] = {
            "hosts": [host],
            "verify_certs": es_config.verify_certs,
            "request_timeout": REQUEST_TIMEOUT,
        }

        # Add authentication if configured
        if es_config.user and es_config.password:
            password: str = (
                es_config.password.get_secret_value()
                if isinstance(es_config.password, SecretStr)
                else es_config.password
            )
            conn_params["basic_auth"] = (es_config.user, password)

        # Add SSL certificates if verification enabled
        if es_config.verify_certs:
            if es_config.ca_cert_path:
                conn_params["ca_certs"] = str(es_config.ca_cert_path)
            if es_config.client_cert_path:
                conn_params["client_cert"] = str(es_config.client_cert_path)
            if es_config.client_key_path:
                conn_params["client_key"] = str(es_config.client_key_path)

        # Suppress urllib3 SSL warnings (user chose to verify)
        import urllib3

        urllib3.disable_warnings()

        return Elasticsearch(**conn_params)

    def generate_new_index_name(
        self, cluster_name: str, custom_key: Optional[str] = None
    ) -> str:
        """
        Generate a new index name following the naming pattern.

        Pattern: "oka-{cluster_name}[-{custom_key}]-{uuid}"

        The UUID ensures uniqueness even if multiple reindexing runs occur.
        This allows for blue-green deployments if needed.

        Args:
            cluster_name: Cluster identifier (e.g., "my_cluster", "prod")
            custom_key: Optional semantic label (e.g., "accounting", "billing")

        Returns:
            Generated index name (e.g., "oka-my_cluster-accounting-ace9a3b5")

        Example:
            generate_new_index_name("prod", "accounting")
            # Returns: "oka-prod-accounting-f2e8c1a9"
        """
        unique_id: str = str(uuid.uuid4())[:8]

        if custom_key:
            return f"oka-{cluster_name}-{custom_key}-{unique_id}"
        else:
            return f"oka-{cluster_name}-{unique_id}"

    def get_source_index_settings(self, index_name: str) -> Dict[str, Any]:
        """
        Get settings from source index to replicate in target index.

        Args:
            index_name: Source index name

        Returns:
            Dict with number_of_shards from source index
        """
        try:
            settings = self.es.indices.get_settings(index=index_name)
            source_settings = settings[index_name]["settings"]["index"]

            return {
                "number_of_shards": int(source_settings.get("number_of_shards", 4)),
            }
        except Exception as e:
            print(f"⚠ Warning: Could not get source settings: {e}")
            print("  Using configured INDEX_SETTINGS instead")
            return {"number_of_shards": 4}

    def create_target_index(self, index_name: str, source_index: str) -> None:
        """
        Create target index with settings from source index.

        Args:
            index_name: New index name to create
            source_index: Source index to copy shard count from
        """
        # Get shard count from source, keep replicas at 0
        source_settings = self.get_source_index_settings(source_index)

        index_body = {
            "settings": {
                "number_of_shards": source_settings["number_of_shards"],
                "number_of_replicas": 0,  # Always 0 during reindexing
            }
        }

        print("  Creating index with:")
        print(f"    • number_of_shards: {source_settings['number_of_shards']}")
        print("    • number_of_replicas: 0")

        self.es.indices.create(index=index_name, body=index_body)
        print(f"  ✓ Index created: {index_name}")

    def get_field_type(self, field_name: str) -> Optional[str]:
        """
        Determine the data type of a field using OKA constants.

        OKA constants define field types across the system:
          • INTCOLS: Integer fields
          • FLOATCOLS: Float fields
          • STRCOLS: String fields
          • DATECOLS: Date fields
          • NESTEDCOLS: Nested object fields
          • OBJECTCOLS: Object fields

        Args:
            field_name: Name of the field to check

        Returns:
            Type string ("int", "float", "string", "date", "nested", "object")
            or None if field is not defined in OKA constants

        Example:
            get_field_type("cpu_count")  # Returns: "int"
            get_field_type("memory_gb")  # Returns: "float"
            get_field_type("job_name")   # Returns: "string"
        """
        if field_name in oka_constants.INTCOLS:
            return "int"
        elif field_name in oka_constants.FLOATCOLS:
            return "float"
        elif field_name in oka_constants.STRCOLS:
            return "string"
        elif field_name in oka_constants.DATECOLS:
            return "date"
        elif field_name in oka_constants.NESTEDCOLS:
            return "nested"
        elif field_name in oka_constants.OBJECTCOLS:
            return "object"
        return None

    def get_object_parent(self, field_name: str) -> Optional[str]:
        """
        Check if a field should be grouped under a parent object.

        Uses OBJECT_PATTERNS to match field prefixes.

        Example matching:
          Pattern: "resource_list_" → Parent: "resource_list"
          Input field: "resource_list_mem"
          Output: "resource_list" (field will be grouped)

        Args:
            field_name: Field name to check

        Returns:
            Parent object name if pattern matches, None otherwise

        Example:
            OBJECT_PATTERNS = {"resource_list_": "resource_list"}
            get_object_parent("resource_list_mem")     # Returns: "resource_list"
            get_object_parent("resource_list_cpu")     # Returns: "resource_list"
            get_object_parent("job_name")              # Returns: None
        """
        if not OBJECT_PATTERNS:
            return None

        for pattern, parent in OBJECT_PATTERNS.items():
            if field_name.startswith(pattern):
                return parent

        return None

    def convert_field_value(
        self, field_name: str, value: Any
    ) -> Tuple[Any, bool, Optional[str]]:
        """
        Convert a field value to its correct type and determine object grouping.

        This method:
          1. Checks if the field should be grouped under a parent object
          2. Gets the field type from OKA constants (or overrides)
          3. Converts the value to the correct type
          4. Returns the converted value + grouping metadata

        Conversion logic:
          • None → None (passed through)
          • "int" → int(value)
          • "float" → float(value)
          • "date" → str(value) (dates stored as strings)
          • "object" → parsed dict (if stringified) or original value
          • Unknown → str(value) (default to string)

        On conversion error, the original value is returned unchanged.

        Args:
            field_name: Name of the field
            value: Value to convert

        Returns:
            Tuple of (converted_value, should_group_in_object, parent_object_name)
              • converted_value: Type-converted value
              • should_group_in_object: True if value will be nested under a parent
              • parent_object_name: Name of parent object ("resource_list", etc.)

        Example:
            convert_field_value("resource_list_mem", "2048")
            # Returns: (2048, True, "resource_list")
            #
            # This means: convert "2048" to int, and nest under resource_list
            # Result in transformed doc: {resource_list: {mem: 2048}}
        """
        if value is None:
            return None, False, None

        # Step 1: Check if this field should be grouped into a parent object
        object_parent: Optional[str] = self.get_object_parent(field_name)
        should_group_as_object: bool = object_parent is not None

        # Step 2: Get the field type (from overrides or OKA constants)
        field_type: Optional[str] = FIELD_TYPE_OVERRIDES.get(
            field_name
        ) or self.get_field_type(field_name)

        # Step 3: Convert based on type
        try:
            if field_type == "int":
                return int(value), should_group_as_object, object_parent
            elif field_type == "float":
                return float(value), should_group_as_object, object_parent
            elif field_type == "date":
                return str(value), should_group_as_object, object_parent
            elif field_type == "object":
                # Objects can be stored in three forms in Elasticsearch:
                #
                # 1. STRINGIFIED:
                #    '{"priority": 5, "owner": "alice"}' (string representation)
                #    Need to parse back to dict for proper handling.
                #
                # 2. ALREADY A DICT:
                #    {"priority": 5, "owner": "alice"} (native dict)
                #    Use unchanged.
                #
                # 3. NESTED STRUCTURES :
                #    {"metadata": {"priority": 5, "nested": {"level": 2}}}
                #    These can be nested dicts inside dicts.
                #    ast.literal_eval handles this correctly.
                #
                if isinstance(value, str):
                    try:
                        import ast

                        # ast.literal_eval safely parses stringified representations.
                        # Handles: {"a": 1}, {"a": {"b": 2}}, lists of objects, etc.
                        return (
                            ast.literal_eval(value),
                            should_group_as_object,
                            object_parent,
                        )
                    except Exception:
                        # Parsing failed (malformed string, invalid syntax, etc.)
                        # Better to keep original than crash. Elasticsearch will
                        # handle it or log a type mismatch error—at least data isn't lost.
                        return value, should_group_as_object, object_parent
                # Value is already a dict/object (including nested structures),
                # use it unchanged. Elasticsearch handles nested objects natively.
                return value, should_group_as_object, object_parent
            else:
                # Default: convert to string when type is unknown or not defined.
                # String is the safest fallback: it ensures the data is preserved
                # even when no explicit type is defined.
                return str(value), should_group_as_object, object_parent

        except Exception:
            # On any conversion error, return original value unchanged
            return value, should_group_as_object, object_parent

    def transform_document(self, doc: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform a document for the new index mapping.

        Transformation steps:
          1. Extract document source
          2. For each field:
             a. Convert type using convert_field_value()
             b. If grouping, nest under parent object
             c. Otherwise, add field as-is
          3. Remove original flat fields that were grouped

        Example transformation:
          INPUT:
            {
              "resource_list_mem": "2048",
              "resource_list_cpu": "4",
              "job_id": "12345"
            }
          OUTPUT (with OBJECT_PATTERNS):
            {
              "resource_list": {"mem": 2048, "cpu": 4},
              "job_id": "12345"
            }

        Args:
            doc: Elasticsearch document (dict with _id, _source, etc.)

        Returns:
            Transformed source dict ready for the new index

        Note:
            The transformation is applied in-place during the reindex operation.
            Original flat fields are not preserved.
        """
        source: Dict[str, Any] = doc.get("_source", {})
        transformed: Dict[str, Any] = {}
        grouped_fields: Set[str] = set()  # Track which fields were grouped

        # Process each field in the source document
        for field_name, value in source.items():
            # Convert value and get grouping metadata
            converted_value, should_group, object_parent = self.convert_field_value(
                field_name, value
            )

            # Handle object grouping - nest under parent if applicable
            if should_group and object_parent:
                # Initialize parent object if not exists
                if object_parent not in transformed:
                    transformed[object_parent] = {}

                # Extract subfield name by removing the pattern prefix
                for pattern in OBJECT_PATTERNS.keys():
                    if field_name.startswith(pattern):
                        subfield_name: str = field_name[len(pattern) :]
                        transformed[object_parent][subfield_name] = converted_value
                        grouped_fields.add(field_name)
                        break
            else:
                # Regular field - add to transformed document as-is
                transformed[field_name] = converted_value

        return transformed

    def get_alias_target(self, alias_name: str) -> Optional[str]:
        """
        Get the concrete index that an alias currently points to.

        An alias is a virtual name that references one or more concrete indices.
        This function resolves the alias to its target.

        Args:
            alias_name: Name of the alias

        Returns:
            Concrete index name, or None if alias doesn't exist

        Example:
            get_alias_target("c0814ac7...")
            # Returns: "oka-my_cluster-accounting-ace9a3b5"
            #
            # Means the alias "c0814ac7..." points to that concrete index
        """
        try:
            aliases: Dict[str, Any] = self.es.indices.get_alias(name=alias_name)
            # Get the first (usually only) index this alias points to
            if aliases:
                return list(aliases.keys())[0]
        except Exception:
            return None
        return None

    def resolve_to_concrete_index(self, index_or_alias: str) -> str:
        """
        Resolve an index name or alias to a concrete index.

        If input is an alias, returns the concrete index it points to.
        If input is a concrete index, returns it unchanged.

        Args:
            index_or_alias: Index name or alias name

        Returns:
            Concrete index name

        Raises:
            ValueError: If index/alias doesn't exist or is invalid
        """
        # Try to get as alias first
        alias_target: Optional[str] = self.get_alias_target(index_or_alias)
        if alias_target is not None:
            print(
                f"Resolved alias '{index_or_alias}' → concrete index '{alias_target}'"
            )
            return alias_target

        # Try to get as concrete index
        try:
            info: Dict[str, Any] = self.es.indices.get(index=index_or_alias)
            if index_or_alias in info:
                return index_or_alias
        except Exception as e:
            if "index_not_found" not in str(e):
                raise

        raise ValueError(f"Index or alias '{index_or_alias}' not found")

    def reindex(
        self,
        source_index: str,
        target_index: str,
        batch_size: int = 500,
    ) -> None:
        """
        Reindex documents from source to target index with transformations.

        This is the core reindexing operation:
          1. Get total document count
          2. Scan documents from source (in batches to avoid memory overflow)
          3. Transform each document (type conversion, field grouping)
          4. Bulk insert into target index
          5. Report progress and errors

        The scan operation uses Elasticsearch scrolling to efficiently handle
        large indices without loading everything into memory.

        Args:
            source_index: Source index name
            target_index: Target index name
            batch_size: Number of documents per bulk request (default: 500)

        Raises:
            Exception: If reindexing fails (connectivity, server error, etc.)

        Note:
            Target index must already exist with correct mappings.
            Errors during bulk insert are reported but don't stop the process.
        """
        try:
            # Display configuration
            if OBJECT_PATTERNS:
                print(f"Using object patterns: {OBJECT_PATTERNS}")

            # Get total document count for progress tracking
            total_docs: int = self.es.count(index=source_index)["count"]
            print(f"Total documents to reindex: {total_docs}")

            if total_docs == 0:
                print("No documents to reindex")
                return

            # Scan and transform documents
            actions: List[Dict[str, Any]] = []
            processed: int = 0

            # Use scan to iterate through all documents efficiently
            # scroll="2m" means Elasticsearch keeps the search context open for 2 minutes
            for doc in scan(
                self.es,
                index=source_index,
                scroll="2m",
                request_timeout=REQUEST_TIMEOUT,
            ):
                processed += 1
                progress_message(processed, total_docs, "Reindexing")

                # Transform document (type conversion, field grouping)
                transformed: Dict[str, Any] = self.transform_document(doc)

                # Create bulk action
                action: Dict[str, Any] = {
                    "_index": target_index,
                    "_id": doc["_id"],
                    "_source": transformed,
                }
                actions.append(action)

                # Bulk insert when batch is full
                if len(actions) >= batch_size:
                    success, failed = bulk(
                        self.es,
                        actions,
                        raise_on_error=False,
                        request_timeout=REQUEST_TIMEOUT,
                    )

                    # Report any errors (each bulk error item is keyed by its op type, e.g. "index")
                    if failed:
                        for f in failed:
                            print(f"\nError: {f}")

                    actions = []

            # Insert remaining documents
            if actions:
                success, failed = bulk(
                    self.es,
                    actions,
                    raise_on_error=False,
                    request_timeout=REQUEST_TIMEOUT,
                )
                if failed:
                    # Each bulk error item is keyed by its op type (e.g. "index")
                    for f in failed:
                        print(f"\nError: {f}")

            print("\n✓ Reindex complete")

        except Exception as e:
            print(f"\n✗ Reindex failed: {e}")
            raise

    def process_all(self) -> None:
        """
        Process all indices defined in INDICES_CONFIG.

        This is the main orchestration method. For each index in the config:

        WORKFLOW:
          [0/5] Resolve source (concrete index or alias) to concrete index name
          [1/5] Generate new index name (oka-{cluster}-{key}-{uuid})
          [2/5] Reindex documents from source → new index (with transformations)
          [3/5] Delete source concrete index
          [4/5] Create aliases:
                - If source was alias: keep the alias name pointing to new index
                - If source was concrete: create alias from source name to new index
                - Always create: new index name (without UUID) → new index
          [5/5] Clients transparently query via alias (no code changes needed)

        On error, raises immediately. Partially processed indices are not rolled back.

        Raises:
            ValueError: If validation fails (source doesn't exist, etc.)
            Exception: If any operation fails (connectivity, Elasticsearch errors, etc.)
        """
        for cluster_name, index_or_alias, custom_key in self.indices_config:
            print(f"\n{'=' * 70}")
            print(f"Processing cluster: {cluster_name}")
            if custom_key:
                print(f"  custom key: {custom_key}")
            print(f"  Source (index or alias): {index_or_alias}")
            print(f"{'=' * 70}")

            try:
                # Step 0: Resolve source to concrete index (handles both alias and concrete)
                print("\n[0/5] Resolving source...")
                concrete_index: str = self.resolve_to_concrete_index(index_or_alias)
                print(f"  ✓ Concrete index: {concrete_index}")

                # Step 1: Generate new index name with UUID
                new_index: str = self.generate_new_index_name(cluster_name, custom_key)
                print(f"\n[1/5-1] New index name: {new_index}")

                # Create target index with same shard count, 0 replicas
                print("\n[1/5-2] Creating target index...")
                self.create_target_index(new_index, concrete_index)

                # Step 2: Reindex from source to new index
                print(
                    f"\n[2/5] Reindexing documents from '{concrete_index}' → '{new_index}'"
                )
                self.reindex(
                    source_index=concrete_index,
                    target_index=new_index,
                    batch_size=BATCH_SIZE,
                )

                # Step 3: Delete source concrete index (now safe)
                print(f"\n[3/5] Deleting source concrete index: {concrete_index}")
                self.es.indices.delete(index=concrete_index)
                print("  ✓ Concrete index deleted")

                # Step 4: Create alias(es) for backward compatibility
                print("\n[4/5] Creating alias(es):")

                # Alias 1: Use the original name (whether it was alias or concrete index)
                # This ensures clients can continue using the same name
                self.es.indices.put_alias(index=new_index, name=index_or_alias)
                print(f"  ✓ Alias created: {index_or_alias}{new_index}")

                # Alias 2: New index name (without UUID) for direct access if needed
                new_index_no_uuid: str = new_index.rsplit("-", 1)[0]
                self.es.indices.put_alias(index=new_index, name=new_index_no_uuid)
                print(f"  ✓ Alias created: {new_index_no_uuid}{new_index}")

                print("\n[5/5] ✓ Process complete")
                print(f"  Clients query using: '{index_or_alias}' (alias)")
                print(f"  Data stored in: '{new_index}' (concrete index)")

            except ValueError as e:
                # User/validation errors
                print("\n  ✗ VALIDATION ERROR")
                print(f"     {str(e)}")
                raise
            except Exception as e:
                # Operational errors
                print(f"\n  ✗ Process failed: {str(e)}")
                raise


def main() -> None:
    """
    Main entry point for the reindexing tool.

    Workflow:
      1. Parse command-line arguments
      2. Load OKA configuration from YAML file
      3. Create Elasticsearch client
      4. Initialize reindexer with configuration
      5. Process all configured indices
      6. Exit with appropriate status code

    Command-line Arguments:
      --conf-path (required): Path to oka.yaml configuration file

    Exit Codes:
      0: Success
      1: Configuration or validation error
      130: Interrupted by user (Ctrl+C)
      1: Unexpected error
    """
    parser = argparse.ArgumentParser(
        description="Reindex Elasticsearch indices with type conversion and field restructuring"
    )
    parser.add_argument(
        "--conf-path",
        required=True,
        help="Path to OKA configuration file (oka.yaml)",
    )

    args = parser.parse_args()

    try:
        # Load OKA configuration
        oka_config: OKAConfig = OKAConfig.from_yaml(args.conf_path)
        es_client: Elasticsearch = Reindexer.create_es_client_from_oka(
            oka_config.elasticsearch
        )

        # Initialize reindexer with configuration
        reindexer: Reindexer = Reindexer(es_client)

        # Process all configured indices
        reindexer.process_all()

    except ValueError as e:
        # User errors (validation, config, etc) - show message only
        print(f"\n✗ ERROR: {str(e)}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        # User interrupted (Ctrl+C)
        print("\n\n✗ Process interrupted by user", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        # Unexpected errors - show traceback
        print(f"\n✗ Unexpected error: {str(e)}", file=sys.stderr)
        import traceback

        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()
  1. This program reindexes the accounting index only. Therefore, you first need to access OKA and, from the Cluster page, retrieve two things:

    1. The value of the field Cluster UID.

    2. The Elasticsearch accounting index UID.

  2. Then, open the script in a text editor and update the INDICES_CONFIG list with the required information for each impacted cluster, as follows:

    INDICES_CONFIG: List[Tuple[str, str, Optional[str]]] = [
        ("my_cluster_name_uid", "8dd506ff-b858-41ea-9735-6169751d6902", "accounting"),
    ]
    
  3. Finally, proceed to run the script:

    cd ${OKA_INSTALL_DIR}/current/bin/tools/
    # Check available options
    uv run reindex_indices.py -h
    # Execute reindex
    uv run reindex_indices.py --conf-path ${OKA_INSTALL_DIR}/config/oka/oka.yaml