HTTP Authentication

Overview

When OKA is configured to authenticate through an external credential provider (see Authentication Method), you must implement a custom authentication class that OKA uses to control login and logout actions.

To do this, define an HTTPAuth authentication class with the required methods for handling authentication and session revocation.

Location

The authentication class should be implemented in the following file:

${OKA_INSTALL_DIR}/conf/auth/http_auth.py

Required Methods

Your authentication class must contain the following two methods:

* authenticate
* revoke

authenticate

This method is responsible for extracting and validating user details. It should process a user authentication request and return the relevant information.

Expected output:

A dictionary containing the extracted user information:

  • username (str, required): The primary identifier of the user.

  • email (str, optional): The user’s email address.

  • first_name (str, optional): The user’s first name.

  • last_name (str, optional): The user’s last name.

  • is_superuser (bool, optional, defaults to False): Indicates if the user has administrative privileges.

  • session_expiry (int, optional, defaults to None): Defines session expiration time in seconds.
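
The return shape listed above can be sketched as follows. This is a minimal illustration, not the template shipped with OKA. It assumes a hypothetical trusted reverse proxy that sets an `X-Remote-User` header (exposed as `HTTP_X_REMOTE_USER` under `META`), a hypothetical cookie name `proxy_session`, and the same constructor and `error` convention used by the example implementation on this page.

```python
class HTTPAuth:
    """Sketch: trust a username header set by a hypothetical reverse proxy."""

    def __init__(self, request_dict):
        self.request_dict = request_dict
        # Assumption: the proxy forwards the user name in X-Remote-User.
        self.data_header = "HTTP_X_REMOTE_USER"

    def authenticate(self):
        username = self.request_dict.get("META", {}).get(self.data_header)
        if not username:
            return {"error": "User header not found in request"}
        return {
            "username": username,     # required
            "is_superuser": False,    # optional, defaults to False
            "session_expiry": 3600,   # optional, in seconds
        }

    def revoke(self):
        # Assumption: the proxy keeps its session in a cookie with this name.
        return {"cookie_based_logout": "proxy_session"}
```

Only `username` is mandatory here; the optional keys are included to show their types and can be dropped.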

revoke

This method is responsible for handling the user logout process by determining the appropriate logout mechanism.

Expected output:

A dictionary containing exactly one of the following keys:

  • url_based_logout (str): A logout URL for redirection.

  • cookie_based_logout (str): The name of the authentication session cookie to be cleared.
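
The two return shapes above can be sketched as a pair of small helpers. The helper names and the logout URL are hypothetical and used only for illustration; the cookie name is the one used in the AWS example on this page.

```python
def revoke_with_url(logout_url):
    """Ask for a redirect to the identity provider's logout page."""
    return {"url_based_logout": logout_url}


def revoke_with_cookie(cookie_name):
    """Ask for the named authentication session cookie to be cleared."""
    return {"cookie_based_logout": cookie_name}


# Hypothetical identity provider logout endpoint.
url_response = revoke_with_url("https://idp.example.com/logout")

# Cookie name taken from the AWS example implementation.
cookie_response = revoke_with_cookie("AWSELBAuthSessionCookie")
```

A `revoke` method would return one of these dictionaries, never both keys at once.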

OKA Integration

These methods will be called by OKA to validate user authentication and manage logout actions. Implementing them correctly ensures proper integration with external authentication providers.
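
Before deploying a custom class, it can help to check its return values against the contract described on this page. The sketch below does only that: the function names are hypothetical, and it does not reproduce whatever validation OKA performs internally. It also assumes that a failed authentication is reported through an `error` key, as in the example implementation.

```python
def check_authenticate_result(result):
    """Return a list of problems found in an authenticate() result."""
    if not isinstance(result, dict):
        return ["result must be a dict"]
    if "error" in result:
        return []  # failure responses only carry an error message
    problems = []
    if not isinstance(result.get("username"), str) or not result["username"]:
        problems.append("username must be a non-empty string")
    for key in ("email", "first_name", "last_name"):
        if key in result and not isinstance(result[key], str):
            problems.append(f"{key} must be a string")
    if not isinstance(result.get("is_superuser", False), bool):
        problems.append("is_superuser must be a bool")
    expiry = result.get("session_expiry")
    # bool is a subclass of int in Python, so reject it explicitly.
    if expiry is not None and (isinstance(expiry, bool) or not isinstance(expiry, int)):
        problems.append("session_expiry must be an int or None")
    return problems


def check_revoke_result(result):
    """Return a list of problems found in a revoke() result."""
    if not isinstance(result, dict):
        return ["result must be a dict"]
    present = {"url_based_logout", "cookie_based_logout"} & set(result)
    if len(present) != 1:
        return ["expected exactly one of url_based_logout or cookie_based_logout"]
    key = present.pop()
    if not isinstance(result[key], str) or not result[key]:
        return [f"{key} must be a non-empty string"]
    return []
```

An empty list means the value matches the documented shape; any other result lists what to fix.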

Example Implementation

Below is an example of how the authentication class can be implemented for AWS OpenID:

import base64
import json
from datetime import datetime

import jwt
import requests


class HTTPAuth:
    """
    Handle HTTP authentication using JWT tokens.

    Features:
    - Extracts JWT tokens from HTTP request headers.
    - Decodes JWT headers to retrieve the key ID (kid).
    - Fetches and verifies the JWT signature using a public key.
    - Extracts user details from the JWT payload.
    - Provides a method to revoke authentication.

    Use case:
    Suitable for applications using AWS OpenID authentication (e.g., AWS ALB authentication).

    Methods:
    - authenticate(): Validates the JWT token found in the stored request_dict and extracts user details.
    - revoke(): Returns the logout mechanism (a logout URL or the session cookie name).
    """

    def __init__(self, request_dict):
        """
        Initialize the HTTPAuth class and accept the request_dict.

        Args:
            request_dict (dict): A dictionary containing HTTP request data with the following keys:
                - `POST` (dict): Data from the POST body of the request.
                - `GET` (dict): Data from the query parameters in the URL.
                - `META` (dict): HTTP headers, typically with keys prefixed by "HTTP_"
                                 (e.g., "HTTP_AUTHORIZATION" for the `Authorization` header).
        """
        self.request_dict = request_dict
        # Define the source of authentication data (default is META)
        self.data_source = "META"
        # The header name containing the auth token.
        self.data_header = "HTTP_X_AMZN_OIDC_DATA"
        # Base URL for fetching the public key.
        self.key_url = "https://public-keys.auth.elb.eu-west-1.amazonaws.com/"
        # The JWT algorithm used by AWS OpenID.
        self.algorithm = "ES256"

    def authenticate(self):
        """
        Authenticate a user using an AWS OpenID JWT token.

        Returns:
            dict:
                - On success:
                  A dictionary with user details extracted from the JWT payload:
                    - `username` (required): The user's primary identifier, typically their email address.
                    - `email` (optional): The user's email address.
                    - `first_name` (optional): The user's first (given) name.
                    - `last_name` (optional): The user's last (family) name.
                    - `is_superuser` (optional): A boolean flag, defaulting to `False`.
                    - `session_expiry` (optional): The session expiry time (in seconds).

                - On failure:
                  A dictionary containing an `error` key with a descriptive error message.
        """
        try:
            # Step 1: Retrieve the token from the request headers.
            data = self.request_dict.get(self.data_source, {}).get(self.data_header, None)
            if not data:
                return {"error": "Authorization token not found in request headers"}

            # Step 2: Decode the JWT header to get the 'kid' (key ID).
            jwt_headers = data.split(".")[0]
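            # JWT segments are base64url-encoded; pad to a multiple of 4 in case padding is omitted.
            padded_headers = jwt_headers + "=" * (-len(jwt_headers) % 4)
            decoded_json = json.loads(base64.urlsafe_b64decode(padded_headers).decode("utf-8"))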
            key_id = decoded_json.get("kid")
            if not key_id:
                return {"error": "JWT header missing 'kid' field"}

            # Step 3: Fetch the public key using the 'kid'.
            # A timeout keeps the login request from hanging if the key endpoint is unreachable.
            req = requests.get(f"{self.key_url}{key_id}", timeout=10)
            if req.status_code != 200:
                return {"error": "Failed to fetch public key from AWS"}
            public_key = req.text

            # Step 4: Decode and verify the JWT token using the public key.
            payload = jwt.decode(data, public_key, algorithms=[self.algorithm])

            # Derive "session_expiry" (seconds remaining) from the "exp" claim, if present.
            expiry_date = payload.get("exp")
            session_expiry = None
            if expiry_date is not None:
                session_expiry = round(expiry_date - datetime.now().timestamp())

            # Step 5: Extract and return user details from the JWT payload.
            return {
                "username": payload.get("email", ""),  # Required field
                "email": payload.get("email", ""),  # Optional
                "first_name": payload.get("given_name", ""),  # Optional
                "last_name": payload.get("family_name", ""),  # Optional
                "is_superuser": False,  # Defaults to False
                "session_expiry": session_expiry,  # Optional, default is None
            }

        except jwt.ExpiredSignatureError:
            return {"error": "JWT token has expired"}
        except jwt.InvalidTokenError:
            return {"error": "Invalid JWT token"}
        except Exception as e:
            return {"error": str(e)}

    def revoke(self):
        """
        Revoke authentication.

        Determines the logout method and constructs the appropriate response:

        - If using URL-based logout, provides a logout URL to redirect the user.
        - If using cookie-based logout, specifies the authentication session cookie name to be cleared.

        Returns:
            dict: A dictionary containing either:
                - `url_based_logout` (str): The logout URL for redirection, OR
                - `cookie_based_logout` (str): The name of the authentication session cookie.
        """
        cookie_name = "AWSELBAuthSessionCookie"
        return_object = {
            "cookie_based_logout": cookie_name,
        }

        return return_object
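

Step 2 of `authenticate` decodes the first segment of the JWT to read the `kid`. JWT segments are base64url-encoded, and in general the trailing `=` padding may be omitted, so a padding-tolerant decode can be useful when adapting the example to another provider. The sketch below shows that step in isolation; the helper name `decode_jwt_header` and the sample token are hypothetical, and the helper does not verify the signature.

```python
import base64
import json


def decode_jwt_header(token):
    """Decode the header segment of a JWT without verifying the token."""
    header_segment = token.split(".")[0]
    # Restore any padding that was stripped so the length is a multiple of 4.
    padded = header_segment + "=" * (-len(header_segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded).decode("utf-8"))


# Build an illustrative token whose header segment has its padding removed.
sample_header = {"alg": "ES256", "kid": "example-key-id"}
segment = base64.urlsafe_b64encode(json.dumps(sample_header).encode("utf-8"))
sample_token = segment.decode("ascii").rstrip("=") + ".payload.signature"

key_id = decode_jwt_header(sample_token).get("kid")
```

The decoded header is only used to locate the public key; the token must still be verified with `jwt.decode` as in Step 4.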

Notes

Note

  • Any modifications to http_auth.py are applied immediately.

  • Restarting the oka.service is not required for changes to take effect.

Note

  • When OKA is updated, it writes the new template to a second file, http_auth.py.latest_template, so that your changes to the original http_auth.py are not overwritten.

Warning

  • Do not rename http_auth.py, the HTTPAuth class, or its methods, and do not move the file out of ${OKA_INSTALL_DIR}/conf/auth.

  • Changing any of these may cause authentication to fail.