Running A Kedro Project With An HTTP Request Via FastAPI

Python
FastAPI
Kedro
HTTP
HTTP Requests
Author

Galen Seilis

Published

August 21, 2024

Introduction

This post gives a minimal (but not production-ready) example of using FastAPI to run Kedro projects with parameters specified by HTTP requests. Although in this example the web application’s address is set to localhost, in principle this lets you send remote instructions to run a Kedro project with modifiable behaviour.

Explanation

We can define a data model which contains both the project name and the parameters we want to override in the Kedro project.

class KedroParams(BaseModel):
    project_name: str
    params: Dict[str, str]

The FastAPI application needs to know a given Kedro project’s path in order to run it. So in a file projects.yaml I put the following mapping from project name to project path:

projects:
    spaceflights-pandas: spaceflights-pandas

We also need an endpoint to process incoming requests. That’s where run_kedro comes in: it handles HTTP POST requests sent to /run-kedro/. It loads the projects configuration and looks up the Kedro project path using the project name in the KedroParams object parsed from the request. If the project name in the request doesn’t match any project name in the projects configuration file, an HTTP 404 status is sent back. Otherwise, we run the Kedro project and send back the logged output.

@app.post("/run-kedro/")
def run_kedro(params: KedroParams):
    # Load the project configuration
    projects = load_project_config()

    # Get the project path from the config
    project_path = projects.get(params.project_name)

    if not project_path:
        raise HTTPException(status_code=404, detail="Project not found.")

    # Run the Kedro command with the provided parameters
    output = run_kedro_command(project_path, params.params)
    return {"output": output}

We’ll also need to be able to load the project configuration. That is the job of load_project_config, which is defined as follows:

def load_project_config(config_file: str = "projects.yaml") -> Dict[str, str]:
    if not os.path.exists(config_file):
        raise FileNotFoundError("Config file not found.")
    with open(config_file, 'r') as file:
        config = yaml.safe_load(file) or {}  # an empty file parses to None
    return config.get("projects", {})
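Because the configured value is later used as the working directory for the Kedro command, it can help to check it before running anything. The following is a minimal sketch, not part of the application above: the helper resolve_project_dir and its base_dir argument are hypothetical, and treating a directory that contains pyproject.toml as a Kedro project root is an assumption.

```python
from pathlib import Path
from typing import Dict, Optional


def resolve_project_dir(
    projects: Dict[str, str], name: str, base_dir: Path
) -> Optional[Path]:
    """Return the absolute directory of a configured project, or None."""
    relative = projects.get(name)
    if not relative:
        return None  # the project name is not in the configuration
    candidate = (base_dir / relative).resolve()
    # Assumption: a directory holding pyproject.toml is a Kedro project root.
    if not (candidate / "pyproject.toml").is_file():
        return None
    return candidate


# The mapping stored under the "projects" key of the configuration file.
projects = {"spaceflights-pandas": "spaceflights-pandas"}
print(resolve_project_dir(projects, "missing-project", Path.cwd()))  # prints None
```

Returning None for both failure cases would let the endpoint keep its single 404 branch.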

The final piece is to define how run_kedro_command works. It takes the project path and the parameter values to override when the project runs. First it converts the parameters from a dictionary into the comma-separated key=value string that Kedro’s run command accepts via --params. Then it builds the argument list for subprocess.run, which is called inside a try-except so that a failed run doesn’t crash the application. If the project runs successfully, we return its standard output. Otherwise, an HTTP 500 server error response is sent with the standard error of the failed command.

def run_kedro_command(project_path: str, params: Dict[str, str]) -> str:
    # Convert params dictionary to a Kedro CLI formatted string
    params_str = ",".join([f"{key}={value}" for key, value in params.items()])
    command = ["kedro", "run", f"--params={params_str}"]

    try:
        # Running the command
        result = subprocess.run(command, cwd=project_path, capture_output=True, text=True, check=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        raise HTTPException(status_code=500, detail=f"Error executing Kedro command: {e.stderr}")
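To make the string conversion concrete, here is a small sketch that separates command construction from execution so it can be inspected without running Kedro. The helper build_kedro_command is hypothetical. Rejecting keys or values that contain the separator characters is an assumption about what is safe to pass through, not something Kedro requires. Unlike the function above, this sketch omits --params entirely when no overrides are given.

```python
from typing import Dict, List


def build_kedro_command(params: Dict[str, str]) -> List[str]:
    """Build the argument list for `kedro run` with parameter overrides."""
    for key, value in params.items():
        # Assumption: "," separates pairs and "=" separates key from value,
        # so a key containing either would change how the string is split.
        if not key or "," in key or "=" in key:
            raise ValueError(f"Unsupported parameter key: {key!r}")
        if "," in value:
            raise ValueError(f"Unsupported value for {key!r}: {value!r}")
    params_str = ",".join(f"{key}={value}" for key, value in params.items())
    command = ["kedro", "run"]
    if params_str:
        command.append(f"--params={params_str}")
    return command


print(build_kedro_command({"test_size": "0.3", "random_state": "2018"}))
# prints ['kedro', 'run', '--params=test_size=0.3,random_state=2018']
```

Since the command is passed to subprocess.run as a list rather than a shell string, the parameter values are never interpreted by a shell.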

Setup

In order to actually run this example you’ll need FastAPI, Pydantic, PyYAML, Uvicorn and Kedro installed, along with the full application code:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict
import subprocess
import yaml
import os

app = FastAPI()

class KedroParams(BaseModel):
    project_name: str
    params: Dict[str, str]

def load_project_config(config_file: str = "projects.yaml") -> Dict[str, str]:
    if not os.path.exists(config_file):
        raise FileNotFoundError("Config file not found.")
    with open(config_file, 'r') as file:
        config = yaml.safe_load(file) or {}  # an empty file parses to None
    return config.get("projects", {})

def run_kedro_command(project_path: str, params: Dict[str, str]) -> str:
    # Convert params dictionary to a Kedro CLI formatted string
    params_str = ",".join([f"{key}={value}" for key, value in params.items()])
    command = ["kedro", "run", f"--params={params_str}"]

    try:
        # Running the command
        result = subprocess.run(command, cwd=project_path, capture_output=True, text=True, check=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        raise HTTPException(status_code=500, detail=f"Error executing Kedro command: {e.stderr}")

@app.post("/run-kedro/")
def run_kedro(params: KedroParams):
    # Load the project configuration
    projects = load_project_config()

    # Get the project path from the config
    project_path = projects.get(params.project_name)

    if not project_path:
        raise HTTPException(status_code=404, detail="Project not found.")

    # Run the Kedro command with the provided parameters
    output = run_kedro_command(project_path, params.params)
    return {"output": output}

You can create an example project matching the projects.yaml above by using the spaceflights starter. After all, you can’t run a Kedro project without a Kedro project. Make sure the resulting project directory matches the path given in projects.yaml.

kedro new --starter=spaceflights-pandas

Next, with the application code saved as main.py, you can run the application with auto-reload enabled:

uvicorn main:app --reload

Lastly, we can send an HTTP request to the application at the localhost address 127.0.0.1 on port 8000 (both are Uvicorn’s defaults). We’ll change the behaviour of the project by changing the test set size to 30% and the random seed to 2018.

curl -X POST "http://127.0.0.1:8000/run-kedro/" -H "Content-Type: application/json" -d '{
    "project_name": "spaceflights-pandas",
    "params": {
        "test_size": "0.3",
        "random_state": "2018"
    }
}'
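The same request can be sent from Python. Below is a sketch using only the standard library. The helper names build_run_request and send_run_request are hypothetical, and the 600-second timeout is an arbitrary assumption chosen because a pipeline run may take a while.

```python
import json
import urllib.request
from typing import Dict


def build_run_request(
    base_url: str, project_name: str, params: Dict[str, str]
) -> urllib.request.Request:
    """Build a POST request for the /run-kedro/ endpoint."""
    body = json.dumps({"project_name": project_name, "params": params})
    return urllib.request.Request(
        url=f"{base_url}/run-kedro/",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_run_request(request: urllib.request.Request, timeout: float = 600.0) -> str:
    """Send the request and return the "output" field of the JSON response."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))["output"]


request = build_run_request(
    "http://127.0.0.1:8000",
    "spaceflights-pandas",
    {"test_size": "0.3", "random_state": "2018"},
)
print(request.get_method(), request.full_url)
# prints POST http://127.0.0.1:8000/run-kedro/
```

With the application running, send_run_request(request) performs the call. Note that urlopen raises urllib.error.HTTPError for the 404 and 500 responses described earlier.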

The application should send back something like this:

{"output":"[08/21/24 21:59:43] INFO     Using `conf/logging.yml` as logging configuration. You can change this  __init__.py:249\n                             by setting the KEDRO_LOGGING_CONFIG environment variable accordingly.                  \n[08/21/24 21:59:44] INFO     Kedro project spaceflights-pandas                                        session.py:324\n[08/21/24 21:59:45] INFO     Using synchronous mode for loading and saving data. Use the     sequential_runner.py:64\n                             --async flag for potential performance gains.                                          \n                             https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipe                        \n                             line.html#load-and-save-asynchronously                                                 \n                    INFO     Loading data from companies (CSVDataset)...                         data_catalog.py:508\n                    INFO     Running node: preprocess_companies_node: preprocess_companies([companies])  node.py:361\n                             -> [preprocessed_companies]                                                            \n                    INFO     Saving data to preprocessed_companies (ParquetDataset)...           data_catalog.py:550\n                    INFO     Completed 1 out of 6 tasks                                      sequential_runner.py:90\n                    INFO     Loading data from shuttles (ExcelDataset)...                        
data_catalog.py:508\n[08/21/24 21:59:47] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) ->  node.py:361\n                             [preprocessed_shuttles]                                                                \n                    WARNING  /home/galen/projects/blog/posts/fastapi-kedro-params/spaceflights-panda warnings.py:109\n                             s/src/spaceflights_pandas/pipelines/data_processing/nodes.py:15:                       \n                             FutureWarning: The default value of regex will change from True to                     \n                             False in a future version. In addition, single character regular                       \n                             expressions will *not* be treated as literal strings when regex=True.                  \n                               x = x.str.replace(\"$\", \"\").str.replace(\",\", \"\")                                      \n                                                                                                                    \n                    INFO     Saving data to preprocessed_shuttles (ParquetDataset)...            data_catalog.py:550\n                    INFO     Completed 2 out of 6 tasks                                      sequential_runner.py:90\n                    INFO     Loading data from preprocessed_shuttles (ParquetDataset)...         data_catalog.py:508\n                    INFO     Loading data from preprocessed_companies (ParquetDataset)...        data_catalog.py:508\n                    INFO     Loading data from reviews (CSVDataset)...                           
data_catalog.py:508\n                    INFO     Running node: create_model_input_table_node:                                node.py:361\n                             create_model_input_table([preprocessed_shuttles;preprocessed_companies;revi            \n                             ews]) -> [model_input_table]                                                           \n                    INFO     Saving data to model_input_table (ParquetDataset)...                data_catalog.py:550\n                    INFO     Completed 3 out of 6 tasks                                      sequential_runner.py:90\n                    INFO     Loading data from model_input_table (ParquetDataset)...             data_catalog.py:508\n                    INFO     Loading data from params:model_options (MemoryDataset)...           data_catalog.py:508\n                    INFO     Running node: split_data_node:                                              node.py:361\n                             split_data([model_input_table;params:model_options]) ->                                \n                             [X_train;X_test;y_train;y_test]                                                        \n                    INFO     Saving data to X_train (MemoryDataset)...                           data_catalog.py:550\n                    INFO     Saving data to X_test (MemoryDataset)...                            data_catalog.py:550\n                    INFO     Saving data to y_train (MemoryDataset)...                           data_catalog.py:550\n                    INFO     Saving data to y_test (MemoryDataset)...                            data_catalog.py:550\n                    INFO     Completed 4 out of 6 tasks                                      sequential_runner.py:90\n                    INFO     Loading data from X_train (MemoryDataset)...                        data_catalog.py:508\n                    INFO     Loading data from y_train (MemoryDataset)...                  
      data_catalog.py:508\n                    INFO     Running node: train_model_node: train_model([X_train;y_train]) ->           node.py:361\n                             [regressor]                                                                            \n                    INFO     Saving data to regressor (PickleDataset)...                         data_catalog.py:550\n                    INFO     Completed 5 out of 6 tasks                                      sequential_runner.py:90\n                    INFO     Loading data from regressor (PickleDataset)...                      data_catalog.py:508\n                    INFO     Loading data from X_test (MemoryDataset)...                         data_catalog.py:508\n                    INFO     Loading data from y_test (MemoryDataset)...                         data_catalog.py:508\n                    INFO     Running node: evaluate_model_node:                                          node.py:361\n                             evaluate_model([regressor;X_test;y_test]) -> None                                      \n                    INFO     Model has a coefficient R^2 of 0.402 on test data.                          nodes.py:55\n                    INFO     Completed 6 out of 6 tasks                                      sequential_runner.py:90\n                    INFO     Pipeline execution completed successfully.                                runner.py:119\n"}
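Since the response is one long log string, a client may want to reduce it to a short summary. The sketch below is one way to do that. The function summarize_run is hypothetical, and sample_body is an abbreviated excerpt of the output above rather than a full response. It assumes the matched messages are not broken across lines by the log formatter’s line wrapping.

```python
import json
import re
from typing import Optional, Tuple


def summarize_run(response_body: str) -> Tuple[bool, Optional[Tuple[int, int]]]:
    """Return (succeeded, (completed, total)) parsed from the logged output."""
    output = json.loads(response_body)["output"]
    succeeded = "Pipeline execution completed successfully." in output
    # Each finished node logs "Completed N out of M tasks"; keep the last one.
    progress = re.findall(r"Completed (\d+) out of (\d+) tasks", output)
    last = (int(progress[-1][0]), int(progress[-1][1])) if progress else None
    return succeeded, last


# Abbreviated excerpt of the logged output shown above.
sample_body = json.dumps(
    {
        "output": (
            "INFO     Completed 5 out of 6 tasks      sequential_runner.py:90\n"
            "INFO     Completed 6 out of 6 tasks      sequential_runner.py:90\n"
            "INFO     Pipeline execution completed successfully.  runner.py:119\n"
        )
    }
)
print(summarize_run(sample_body))  # prints (True, (6, 6))
```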