Introduction
This post gives a minimal (but not production-ready) example of using FastAPI to run Kedro projects with parameters specified by HTTP requests. Although in this example the web application listens on the local loopback address, in principle this lets you send remote instructions to run a Kedro project with modifiable behaviour.
Explanation
We can define a data model which contains both the project name and the parameters we want to override in the Kedro project.
```python
class KedroParams(BaseModel):
    project_name: str
    params: Dict[str, str]
```
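As a quick sanity check, this model can be instantiated directly from a dictionary shaped like the JSON body we'll POST later; Pydantic validates the fields on construction. (The payload values here are just illustrative.)

```python
from typing import Dict

from pydantic import BaseModel


class KedroParams(BaseModel):
    project_name: str
    params: Dict[str, str]


# A dictionary shaped like the JSON request body
payload = {
    "project_name": "spaceflights-pandas",
    "params": {"test_size": "0.3", "random_state": "2018"},
}

# Pydantic validates the types as the model is constructed
kedro_params = KedroParams(**payload)
print(kedro_params.project_name)
print(kedro_params.params["test_size"])
```

A payload with a missing `project_name` or non-string values in `params` would raise a validation error instead of silently passing through to the Kedro run.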
The FastAPI application needs to know a given Kedro project's path in order to be able to run it. So in a file projects.yaml I put the following:
```yaml
projects:
  spaceflights-pandas: spaceflights-pandas
```
We also need to expose a command to process incoming requests. That's where run_kedro comes in, which exposes our run-kedro command to HTTP POST requests. It loads the projects configuration and finds the relevant Kedro project path based on the project name provided in the KedroParams object obtained from the request. If the project name in the request doesn't match a project name in the projects configuration file then an HTTP 404 status is sent back. Lastly, we run the Kedro project and send back the logged output.
```python
@app.post("/run-kedro/")
def run_kedro(params: KedroParams):
    # Load the project configuration
    projects = load_project_config()

    # Get the project path from the config
    project_path = projects.get(params.project_name)
    if not project_path:
        raise HTTPException(status_code=404, detail="Project not found.")

    # Run the Kedro command with the provided parameters
    output = run_kedro_command(project_path, params.params)
    return {"output": output}
```
We'll also need to be able to load this project configuration, which comes from load_project_config, defined as follows:
```python
def load_project_config(config_file: str = "projects.yaml") -> Dict[str, str]:
    if not os.path.exists(config_file):
        raise FileNotFoundError("Config file not found.")
    with open(config_file, 'r') as file:
        config = yaml.safe_load(file)
    return config.get("projects", {})
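A quick way to exercise this function is to write a throwaway projects.yaml into a temporary directory and load it back; the temporary-file plumbing here is just for illustration:

```python
import os
import tempfile
from typing import Dict

import yaml


def load_project_config(config_file: str = "projects.yaml") -> Dict[str, str]:
    if not os.path.exists(config_file):
        raise FileNotFoundError("Config file not found.")
    with open(config_file, 'r') as file:
        config = yaml.safe_load(file)
    return config.get("projects", {})


# Write a throwaway config file matching the one above, then load it back
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "projects.yaml")
    with open(path, "w") as f:
        f.write("projects:\n  spaceflights-pandas: spaceflights-pandas\n")
    projects = load_project_config(path)

print(projects)
```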
The final piece is to define how run_kedro_command works. It takes the project path and the parameter values to be overridden when the project runs. First it converts the parameters from a dictionary into a string which Kedro's run command accepts. Then it builds a command list which can be passed to subprocess.run, which is attempted within a try/except so that the application doesn't crash if something goes wrong while running the project. If the project runs successfully we grab the standard output to send back; otherwise a server error response is given.
```python
def run_kedro_command(project_path: str, params: Dict[str, str]) -> str:
    # Convert params dictionary to a Kedro CLI formatted string
    params_str = ",".join([f"{key}={value}" for key, value in params.items()])
    command = ["kedro", "run", f"--params={params_str}"]

    try:
        # Running the command
        result = subprocess.run(command, cwd=project_path, capture_output=True, text=True, check=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        raise HTTPException(status_code=500, detail=f"Error executing Kedro command: {e.stderr}")
```
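The dictionary-to-string conversion is easy to check on its own, using the same parameters we'll send in the request later:

```python
# Reproduce the dictionary-to-CLI-string conversion from run_kedro_command
params = {"test_size": "0.3", "random_state": "2018"}
params_str = ",".join(f"{key}={value}" for key, value in params.items())
command = ["kedro", "run", f"--params={params_str}"]
print(params_str)
print(command)
```

Since Python dictionaries preserve insertion order, the resulting string is `test_size=0.3,random_state=2018`.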
Setup
In order to actually run this example you’ll need the full application code:
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict
import subprocess
import yaml
import os

app = FastAPI()


class KedroParams(BaseModel):
    project_name: str
    params: Dict[str, str]


def load_project_config(config_file: str = "projects.yaml") -> Dict[str, str]:
    if not os.path.exists(config_file):
        raise FileNotFoundError("Config file not found.")
    with open(config_file, 'r') as file:
        config = yaml.safe_load(file)
    return config.get("projects", {})


def run_kedro_command(project_path: str, params: Dict[str, str]) -> str:
    # Convert params dictionary to a Kedro CLI formatted string
    params_str = ",".join([f"{key}={value}" for key, value in params.items()])
    command = ["kedro", "run", f"--params={params_str}"]

    try:
        # Running the command
        result = subprocess.run(command, cwd=project_path, capture_output=True, text=True, check=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        raise HTTPException(status_code=500, detail=f"Error executing Kedro command: {e.stderr}")


@app.post("/run-kedro/")
def run_kedro(params: KedroParams):
    # Load the project configuration
    projects = load_project_config()

    # Get the project path from the config
    project_path = projects.get(params.project_name)
    if not project_path:
        raise HTTPException(status_code=404, detail="Project not found.")

    # Run the Kedro command with the provided parameters
    output = run_kedro_command(project_path, params.params)
    return {"output": output}
```
You can create an example project matching the projects.yaml above by using the spaceflights starter. After all, you can't run a Kedro project without a Kedro project.
```bash
kedro new --starter=spaceflights-pandas
```
Next you can run/reload the application:
```bash
uvicorn main:app --reload
```
Lastly, we can send an HTTP POST request to the application via port 8000 at the loopback address (which is the default). We'll change the behaviour of the project by setting the test set size to 30% and the random seed to 2018.
```bash
curl -X POST "http://127.0.0.1:8000/run-kedro/" -H "Content-Type: application/json" -d '{
  "project_name": "spaceflights-pandas",
  "params": {
    "test_size": "0.3",
    "random_state": "2018"
  }
}'
```
The application should send back something like this:
{"output":"[08/21/24 21:59:43] INFO Using `conf/logging.yml` as logging configuration. You can change this __init__.py:249\n by setting the KEDRO_LOGGING_CONFIG environment variable accordingly. \n[08/21/24 21:59:44] INFO Kedro project spaceflights-pandas session.py:324\n[08/21/24 21:59:45] INFO Using synchronous mode for loading and saving data. Use the sequential_runner.py:64\n --async flag for potential performance gains. \n https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipe \n line.html#load-and-save-asynchronously \n INFO Loading data from companies (CSVDataset)... data_catalog.py:508\n INFO Running node: preprocess_companies_node: preprocess_companies([companies]) node.py:361\n -> [preprocessed_companies] \n INFO Saving data to preprocessed_companies (ParquetDataset)... data_catalog.py:550\n INFO Completed 1 out of 6 tasks sequential_runner.py:90\n INFO Loading data from shuttles (ExcelDataset)... data_catalog.py:508\n[08/21/24 21:59:47] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) -> node.py:361\n [preprocessed_shuttles] \n WARNING /home/galen/projects/blog/posts/fastapi-kedro-params/spaceflights-panda warnings.py:109\n s/src/spaceflights_pandas/pipelines/data_processing/nodes.py:15: \n FutureWarning: The default value of regex will change from True to \n False in a future version. In addition, single character regular \n expressions will *not* be treated as literal strings when regex=True. \n x = x.str.replace(\"$\", \"\").str.replace(\",\", \"\") \n \n INFO Saving data to preprocessed_shuttles (ParquetDataset)... data_catalog.py:550\n INFO Completed 2 out of 6 tasks sequential_runner.py:90\n INFO Loading data from preprocessed_shuttles (ParquetDataset)... data_catalog.py:508\n INFO Loading data from preprocessed_companies (ParquetDataset)... data_catalog.py:508\n INFO Loading data from reviews (CSVDataset)... 
data_catalog.py:508\n INFO Running node: create_model_input_table_node: node.py:361\n create_model_input_table([preprocessed_shuttles;preprocessed_companies;revi \n ews]) -> [model_input_table] \n INFO Saving data to model_input_table (ParquetDataset)... data_catalog.py:550\n INFO Completed 3 out of 6 tasks sequential_runner.py:90\n INFO Loading data from model_input_table (ParquetDataset)... data_catalog.py:508\n INFO Loading data from params:model_options (MemoryDataset)... data_catalog.py:508\n INFO Running node: split_data_node: node.py:361\n split_data([model_input_table;params:model_options]) -> \n [X_train;X_test;y_train;y_test] \n INFO Saving data to X_train (MemoryDataset)... data_catalog.py:550\n INFO Saving data to X_test (MemoryDataset)... data_catalog.py:550\n INFO Saving data to y_train (MemoryDataset)... data_catalog.py:550\n INFO Saving data to y_test (MemoryDataset)... data_catalog.py:550\n INFO Completed 4 out of 6 tasks sequential_runner.py:90\n INFO Loading data from X_train (MemoryDataset)... data_catalog.py:508\n INFO Loading data from y_train (MemoryDataset)... data_catalog.py:508\n INFO Running node: train_model_node: train_model([X_train;y_train]) -> node.py:361\n [regressor] \n INFO Saving data to regressor (PickleDataset)... data_catalog.py:550\n INFO Completed 5 out of 6 tasks sequential_runner.py:90\n INFO Loading data from regressor (PickleDataset)... data_catalog.py:508\n INFO Loading data from X_test (MemoryDataset)... data_catalog.py:508\n INFO Loading data from y_test (MemoryDataset)... data_catalog.py:508\n INFO Running node: evaluate_model_node: node.py:361\n evaluate_model([regressor;X_test;y_test]) -> None \n INFO Model has a coefficient R^2 of 0.402 on test data. nodes.py:55\n INFO Completed 6 out of 6 tasks sequential_runner.py:90\n INFO Pipeline execution completed successfully. runner.py:119\n"}