Techiescamp blog covers in-depth guides on Kubernetes, DevOps, DevSecOps, Cloud computing, Infrastructure Automation, CI/CD, SaaS Tools, ...
When running a Kubeadm cluster on AWS EC2 instances, if the nodes are spread across multiple subnets, you may encounter a complete DNS resolution failure.
All nslookup or curl requests inside pods time out with "connection timed out; no servers could be reached", even though the CoreDNS pods are running and healthy.
Running nslookup from any pod returns a connection timeout:
kubectl exec -it dnsutils -- nslookup kubernetes.default
;; connection timed out; no servers could be reached
command terminated with exit code 1

This happens even when CoreDNS pods are fully running and the CoreDNS service exists:
kubectl get pods -n kube-system -l k8s-app=kube-dns
NAME READY STATUS RESTARTS AGE
coredns-66bc5c9577-qxqsb 1/1 Running 0 19m
coredns-66bc5c9577-x5c65 1/1 Running 0 19m

Calico is the networking plugin that moves data between pods. It was set to a mode called VXLANCrossSubnet, which wraps packets in a VXLAN tunnel when pods talk across different subnets, but skips the tunnel and sends packets directly when nodes are on the same subnet.
Calico in VXLANCrossSubnet mode decides whether to use a tunnel or send traffic directly by comparing the subnet information stored on each Node resource, which comes from manual configuration or autodetection when Calico's node starts.
If that subnet information is wrong, Calico can incorrectly conclude that two nodes are on the same subnet, skip the tunnel, and send packets with raw pod IP addresses (10.244.x.x) directly over the AWS network.
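To see why a wrong prefix length flips that decision, here is a small standalone sketch (illustrative only, not Calico's actual code; the IPs and prefix lengths are made-up examples):

```python
# Illustration of a VXLANCrossSubnet-style decision going wrong when the
# advertised prefix length is incorrect.
import ipaddress

def same_subnet(ip_a: str, ip_b: str, prefix_len: int) -> bool:
    """Return True if both node IPs fall inside the same subnet of the given size."""
    net_a = ipaddress.ip_network(f"{ip_a}/{prefix_len}", strict=False)
    return ipaddress.ip_address(ip_b) in net_a

# Two nodes sitting in different /20 AWS subnets
node1, node2 = "172.31.16.10", "172.31.32.10"

# With the correct /20 prefix the nodes are on different subnets -> use VXLAN tunnel
print(same_subnet(node1, node2, 20))   # False -> encapsulate

# With a too-wide /16 prefix (bad autodetection) they look like the same
# subnet -> the tunnel is skipped and raw pod IPs go out over AWS
print(same_subnet(node1, node2, 16))   # True -> no encapsulation
```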
AWS has a built-in security feature called Source/Destination Check that inspects every packet leaving an EC2 instance.
AWS only knows about the real EC2 node IPs (172.31.x.x). When it saw packets using pod IPs (10.244.x.x) as the source, it treated them as spoofed traffic and silently dropped every packet, with no error or log message.
Two methods are available to solve this issue. Both address the same problem but at different layers.
Method 1 resolves it at the Calico level, while Method 2 fixes it at the AWS infrastructure level. Either method works.
In our case, we used Method 2, which resolves the issue at the AWS level.
Disabling Source/Destination Check tells AWS to stop inspecting packet IPs on EC2 instances. Raw pod packets pass freely without any changes to Calico.
Apply the following steps to each EC2 instance (control plane, node01, and node02).
Using AWS Console:
Open the EC2 Console and select the required instance
Click the Actions menu for the instance.
Then go to Networking and choose Change source/destination check.
Check the Stop box to disable the check, then click Save to apply the change.
Disabling Source/Destination Check takes effect immediately. No Kubernetes or Calico restart is required.
Using AWS CLI:
First, get all instance IDs.
aws ec2 describe-instances \
--query 'Reservations[].Instances[].[InstanceId,PrivateIpAddress]' \
--output table

Then, run the following command to disable it on each node:
aws ec2 modify-instance-attribute \
--instance-id <instance-id> \
--no-source-dest-check

26.2.2026 09:15 Unable to Resolve DNS in Kubeadm on AWS EC2 Instance

For the frontend setup, we used the Flask framework to render the UI and a Flask server as the backend.
Install flask if not installed already, use command pip install flask
Your project folder should look like this:
frontend/
│
├── static/
│   ├── script.js      # to process form data
│   └── style.css
├── templates/
│   └── index.html     # Frontend UI template
│
└── app.py             # Backend Flask server
First, make sure you have Flask installed. You can install it using pip:
pip install flask flask-cors pandas mlflow
or clone the project and run command pip install -r requirements.txt to install dependencies.
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
import pandas as pd
import pickle
import mlflow
import mlflow.pyfunc
app = Flask(__name__)
CORS(app)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/predict', methods=['POST'])
def predict():
... will continue
if __name__ == "__main__":
app.run(host='127.0.0.1', port=8000, debug=True)
# mlflow
mlflow.set_tracking_uri("http://127.0.0.1:5000")
model_name = "Employee Attrition Model"
model_version = "3"
run_id = "bdda2dfd55454b9694bef6653ebbbe64"
model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_version}")
# download artifacts
scaler_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/preprocessor/scaler.pkl")
with open(scaler_path, "rb") as f:
scaler = pickle.load(f)
feature_names_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/preprocessor/feature_names.pkl")
with open(feature_names_path, "rb") as f:
feature_names = pickle.load(f)
ordinal_encoder_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/preprocessor/ordinal_encoder.pkl")
with open(ordinal_encoder_path, "rb") as f:
ordinal_encoder = pickle.load(f)
... continue with predict() as mentioned in step-2
@app.route('/predict', methods=['POST'])
def predict():
data = request.get_json()
if isinstance(data, dict):
input_data = pd.DataFrame([data])
else:
input_data = pd.DataFrame(data)
print('data', input_data)
df = preprocessing_input(input_data)
print('df: ', df)
try:
prediction = model.predict(df)
print('predict: ', prediction)
result = "Left" if prediction[0] == 1 else "Stayed"
print('result: ', result)
return jsonify({"prediction": result})
except Exception as e:
return jsonify({
"error": str(e)
}), 400
Here, we get the JSON data from the frontend and check whether it is already compatible with a DataFrame; if not, we create one.
Then we preprocess the input data by calling the preprocessing_input() function, which we'll see next.
Lastly, we predict on the input with the model, where model is the one loaded via MLflow: model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_version}")
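The dict-vs-list branching at the top of predict() can be illustrated without pandas (a minimal sketch; to_rows is a hypothetical stand-in for the two DataFrame constructor calls):

```python
# A single JSON object becomes a one-row table; a JSON array stays multi-row.
def to_rows(data):
    """Mimic pd.DataFrame([data]) vs pd.DataFrame(data) for dict/list input."""
    return [data] if isinstance(data, dict) else list(data)

print(to_rows({"Age": 30}))                 # [{'Age': 30}]  - one row
print(to_rows([{"Age": 30}, {"Age": 41}]))  # [{'Age': 30}, {'Age': 41}]  - two rows
```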
def preprocessing_input(input):
# ordinal encoding
cols_to_encode = ['Work-Life Balance', 'Job Satisfaction', 'Performance Rating', 'Education Level', 'Job Level', 'Company Size', 'Company Reputation', 'Employee Recognition']
input[cols_to_encode] = ordinal_encoder.transform(input[cols_to_encode]).astype('int')
# binary encoding
binary_cols = ['Overtime', 'Remote Work', 'Opportunities']
for col in binary_cols:
input[col] = input[col].map({'No': 0, 'Yes': 1})
# feature engg
def map_monthly_income(income):
if 1 <= income <= 10000:
return 0
elif 10001 <= income <= 20000:
return 1
elif 20001 <= income <= 50000:
return 2
elif 50001 <= income <=100000:
return 3
elif income >= 100001:
return 4
else:
return -1
input['Monthly Income'] = input['Monthly Income'].apply(map_monthly_income)
# ensure correct column order
input = input[feature_names]
print(input)
# scale the data
input_scaled = scaler.transform(input)
return input_scaled
We used encoding and feature engineering when training the model, so the model expects the same for new request data. We have to preprocess each new request exactly as we did during training, following the same method.
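As a minimal sketch of that train/serve parity (using a hand-rolled SimpleScaler instead of scikit-learn's StandardScaler to keep it dependency-free):

```python
import pickle
import statistics

class SimpleScaler:
    """Toy stand-in for StandardScaler: fit once, reuse everywhere."""
    def fit(self, values):
        self.mean = statistics.mean(values)
        self.std = statistics.pstdev(values) or 1.0
        return self
    def transform(self, values):
        return [(v - self.mean) / self.std for v in values]

# Training time: fit on training data, then persist the fitted scaler
scaler = SimpleScaler().fit([10, 20, 30, 40])
blob = pickle.dumps(scaler)          # in the blog this is scaler.pkl logged to MLflow

# Serving time: load the SAME fitted scaler and only transform - never re-fit,
# or new requests would be scaled against different statistics
serving_scaler = pickle.loads(blob)
print(serving_scaler.transform([25]))  # [0.0] - scaled with the training mean/std
```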
python app.py
or you can run flask
flask run

Below is the UI that will be rendered.
The UI sends the form data to /predict via a POST request, the model loaded through MLflow's pyfunc interface makes a prediction, and the result is returned as Stayed or Left. MLflow makes tracking models, managing artifacts, and deploying reproducible pipelines effortless. Using Flask and MLflow together provides a flexible, fast development environment for machine learning web apps.
Try using MLflow in your machine learning projects to improve your workflow and reproducibility!
16.4.2025 09:06 UI setup of MLflow project

Let's learn how to deploy a machine learning model using Flask as both the frontend and backend, leveraging MLflow to manage the model and preprocessing artifacts. This setup allows for a lightweight, reproducible ML deployment environment.
If you look at the project structure, it is divided into backend and frontend folders for clarity.
Prerequisites: Flask, Python, MLflow.
Create a project folder of your choice, then go to that directory:
mkdir mlflow_example
cd mlflow_example

You will need Python, MLflow, and other libraries. Open your terminal and run:
pip install mlflow scikit-learn pandas

Or you can clone the project and run pip install -r requirements.txt
The data scientist sets up MLflow tracking by writing code in the backend/train.py file:
# backend/train.py
import mlflow
mlflow.set_tracking_uri('http://127.0.0.1:5000')
mlflow.set_experiment('employee_attrition_classification')
with mlflow.start_run(run_name="employee_attrition_run") as run:
....
....
....
Before running train.py, start the MLflow server:
mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns --host 127.0.0.1 --port 5000

This sets up a local MLflow server. Think of it as a dashboard for your ML work. Here you are calling mlflow server with a SQLite backend store (--backend-store-uri sqlite:///mlflow.db), storing model artifacts in the ./mlruns folder, and serving the UI at localhost:5000.
Open http://127.0.0.1:5000/ to see the MLflow interface, where you can view experiments and models.

Employee attrition classification predicts whether an employee stays with or leaves the company. Here we use a supervised logistic regression model to classify the two classes, 'Stayed' and 'Left'.
mlflow-model/train.py

So let's go to the project directory and create a train.py file to train our model.
import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature
import pandas as pd
import pickle
from utils import load_data
from model import train_model
from sklearn.preprocessing import StandardScaler

This is to track the model in the MLflow interface; we can also save model artifacts and log metrics.
mlflow.set_tracking_uri('http://127.0.0.1:5000')
mlflow.set_experiment('employee_attrition_classification')
with mlflow.start_run(run_name='employee_attrition_run') as run:
X_train, X_test, y_train, y_test, ordinal_encoder = load_data()
# save column names for later use
column_names = X_train.columns.tolist()
# normalize the dataset
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# call model => model.py
model, accuracy = train_model(X_train_scaled, X_test_scaled, y_train, y_test)
# now log accuracy score in mlflow
mlflow.log_metric('accuracy', accuracy * 100)
signature = infer_signature(X_train_scaled, model.predict(X_train_scaled))
Here, infer_signature determines the model artifact's input and output schema: the input is X_train_scaled and the output is the model's prediction, i.e., model.predict(X_train_scaled).
.... continue above code
#save model artifact
with open('scaler.pkl', 'wb') as f:
pickle.dump(scaler, f)
mlflow.log_artifact('scaler.pkl', artifact_path='preprocessor')
# save feature/column names in artifact
with open('feature_names.pkl', 'wb') as f:
pickle.dump(column_names, f)
mlflow.log_artifact('feature_names.pkl', artifact_path='preprocessor')
# save ordinal encoder in artifact
with open('ordinal_encoder.pkl', 'wb') as f:
pickle.dump(ordinal_encoder, f)
mlflow.log_artifact('ordinal_encoder.pkl', artifact_path='preprocessor')
... continue above code
model_info = mlflow.sklearn.log_model(
sk_model = model,
artifact_path = 'employee_attrition_model',
signature = signature,
input_example = X_train_scaled,
registered_model_name = 'Employee Attrition Model'
)
print(f'Model URI: {model_info.model_uri}')
print(f'Run ID: {run.info.run_id}')
print(f'Model Accuracy: {accuracy}')
print(f'Model registered as: {model_info.model_name}')
# end of mlflow run()

mlflow-model/model.py

Create the logistic regression model in the model.py file:
# model.py file
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

In metrics we use accuracy_score to determine the accuracy of our model.
# model.py
...
def train_model(X_train, X_test, y_train, y_test):
lr = LogisticRegression(random_state = 42)
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
return lr, accuracy

The following Python script integrates MLflow to track the training process of an employee attrition classification model.
mlflow-model/utils.py

To load the train and test datasets:
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder
from sklearn.model_selection import train_test_split
from pathlib import Path
BASE_PATH = Path(__file__).parent
TRAIN_DATA_PATH = BASE_PATH / 'data' / 'train.csv'
TEST_DATA_PATH = BASE_PATH / 'data' / 'test.csv'
def load_data():
train_dataset = pd.read_csv(TRAIN_DATA_PATH)
test_dataset = pd.read_csv(TEST_DATA_PATH)
dataset = pd.concat([train_dataset, test_dataset])
X = dataset.drop(['Employee ID', 'Attrition', 'Job Role', 'Distance from Home', 'Marital Status', 'Gender'], axis=1)
y = dataset['Attrition']
..... continue from above code
# pre-processing the dataset
columns_to_encode = ['Work-Life Balance', 'Job Satisfaction', 'Performance Rating', 'Education Level', 'Job Level', 'Company Size', 'Company Reputation', 'Employee Recognition']
categories = [
['Poor', 'Fair', 'Good', 'Excellent'], # Work-Life Balance
['Low', 'Medium', 'High', 'Very High'], # Job Satisfaction
['Low', 'Below Average', 'Average', 'High'], # Performance Rating
["High School", "Bachelor’s Degree", "Master’s Degree", "Associate Degree", "PhD"], # Education Level
['Entry', 'Mid', 'Senior'], # Job Level
['Small', 'Medium', 'Large'], # Company Size
['Poor', 'Fair', 'Good', 'Excellent'], # Company Reputation
['Low', 'Medium', 'High', 'Very High'], # Employee Recognition
]
oe = OrdinalEncoder(categories=categories)
X[columns_to_encode] = oe.fit_transform(X[columns_to_encode]).astype('int')
... continue above code
# binary encoding
binary_cols = ['Overtime', 'Remote Work', 'Leadership Opportunities', 'Innovation Opportunities']
for col in binary_cols:
X[col] = X[col].map({'No': 0, 'Yes': 1})
... continue above code
# label encoding (for target or class values)
y = y.map({'Stayed': 0, 'Left': 1})
... continue above code
# Feature Engg (optional)
X['Opportunities'] = X['Leadership Opportunities'] + X['Innovation Opportunities']
X = X.drop(columns=['Leadership Opportunities', 'Innovation Opportunities'])
## Feature Engg (Income Mapping)
def map_monthly_income(income):
if 1 <= income <= 10000:
return 0
elif 10001 <= income <= 20000:
return 1
elif 20001 <= income <= 50000:
return 2
elif 50001 <= income <=100000:
return 3
elif income >= 100001:
return 4
else:
return -1
X['Monthly Income'] = X['Monthly Income'].apply(map_monthly_income)
...continue above code
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
return x_train, x_test, y_train, y_test, oe
# end of load_data() function

Keep the MLflow server running with this command:
mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns --host 127.0.0.1 --port 5000

Then open a new terminal and execute the training script:
python train.py

Once executed, open the MLflow UI at http://127.0.0.1:5000. You will see:
We have to call the model service from the frontend UI so that the backend generates a response using the model.
Now, how do we use this model?
For this, we have to load the registered model for inference at frontend:
import mlflow.sklearn
model = mlflow.sklearn.load_model("models:/Employee Attrition Model/latest")

To understand this, follow the next blog – Frontend setup of MLflow project.
16.4.2025 07:18 Deploying a Machine Learning Model with Flask and MLflow locally

Here's the workflow, explained with detailed real-world tasks and how each component fits in.
The data scientist writes training code, usually in Python using scikit-learn, PyTorch, TensorFlow, etc., and adds MLflow tracking to log the experiment.
import mlflow
with mlflow.start_run():
mlflow.log_param("lr", 0.01)
mlflow.log_metric("accuracy", 0.04)
mlflow.sklearn.log_model(model, "model")

📌 DevOps Task: package the training code as an MLProject or Dockerfile.

The MLflow server receives:
- parameters (lr, batch_size)
- metrics (accuracy, loss)
- serialized models (model.pkl, model.onnx, etc.)
- plots (confusion_matrix.png)
- feature files (feature_names.pkl, etc.)
- preprocessors (standardscaler.pkl, ordinalencoder.pkl, etc.)

Registered models move through the stages None → Staging → Production → Archived.

Production-ready models can be served with mlflow models serve and queried over HTTP:

POST /predict
{
"data": [5.1, 3.5, 1.4, 0.2]
}

You can set up the entire MLflow workflow with these pieces.
The frontend UI is implemented using React; for parsing markdown documents it uses the react-markdown and react-syntax-highlighter libraries.
Open new terminal from your code editor and go to frontend directory,
C:/Users/mlops/rag_chatbot_k8> cd frontend
C:/Users/mlops/rag_chatbot_k8/frontend>

Install dependencies:
npm install

Run the frontend code:
npm start

The frontend will be rendered like this:
You can start asking questions and see how it responds back by using Kubernetes documents as a knowledge base.
You can also check which source documents DocuMnacer's answers are drawn from.
Retrieval-Augmented Generation (RAG) revolutionizes how large language models (LLMs) interact with private, domain-specific knowledge.
Grounding answers in your own knowledge base, such as markdown docs, PDFs, or internal databases, keeps hallucinations to a minimum.
Unlike model training, which is resource-heavy and time-consuming, RAG systems are fast and efficient: no retraining is needed. Simply retrieve relevant documents in real time and augment the prompt.
The system is also easy to keep current: just update the source documents (e.g., .md files), with no need to retrain the model whenever your data changes.
Whether you're building a chatbot for support, dev docs, HR policies, or any internal knowledge base — RAG gives your GPT the brain of your business.
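The retrieve-then-augment loop can be boiled down to a toy sketch (simple word overlap stands in for the FAISS embedding search this project actually uses; the documents and queries are made up):

```python
# Toy RAG: rank documents by word overlap with the query,
# then splice the best match into the LLM prompt as context.
docs = {
    "pods.md": "A Pod is the smallest deployable unit in Kubernetes.",
    "hr.md": "Employees accrue leave monthly.",
}

def retrieve(query: str) -> str:
    """Return the document sharing the most words with the query."""
    q = set(query.lower().split())
    return max(docs.values(), key=lambda d: len(q & set(d.lower().split())))

def augment(query: str) -> str:
    """Ground the prompt in the retrieved context before it reaches the LLM."""
    return f"Context: {retrieve(query)}\n\nQuestion: {query}\nAnswer:"

print(augment("What is a Pod in Kubernetes?"))
```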
15.4.2025 11:41 Frontend Setup

As a DevOps engineer, your job is to streamline development-to-production workflows, ensure reliability, and enable automation. Here's how MLflow aligns with DevOps principles and how you can get hands-on with it.
MLflow lets you track all your ML experiments – similar to how you track application versions with Git. You can track:
- params: hyperparameters, batch size, etc.
- metrics: accuracy, loss, etc.
- artifacts: model, logs, preprocessor files, etc.
- code versions: Git SHA or script snapshots

Use case: create a GitOps-like setup where every ML experiment is logged, versioned, and reproducible, giving full audit trails.
mlflow.log_param("optimizer", "adam")
mlflow.log_metric("accuracy", 0.86)

Now you know which model version is best to promote to staging or production.
MLflow's Model Registry is like a CI/CD environment for models.
Models are registered under a name (e.g., employee-attrition-v1) and move through the stages staging, production, and archived.

Use case: integrate with your existing CI/CD tools (e.g., GitHub Actions, GitLab CI, Jenkins) to automate promotion of models to production once metrics hit the benchmark.
mlflow models register -m runs:/<run-id>/model -n EmployeeAttritionModel

MLflow can serve any registered model as a REST API, with no extra dev work needed.
mlflow models serve -m runs:/<run-id>/model --port 1234

Use case for DevOps: deploy models to staging or production environments using Docker, Kubernetes, or even serverless tools like AWS Lambda. Then build monitoring and alerting (e.g., using Prometheus and Grafana) around the served APIs.
DevOps is all about observability. MLflow provides:
Use case for DevOps: You can set up dashboards to visualize drift, inference time, model accuracy, and other SLAs — just like application health metrics.
Bonus: Artifacts like feature_importance.json or model_latency.csv can be logged and visualized too.
You can define an MLproject file to standardize ML pipelines across environments. It includes:
name: churn-prediction
conda_env: conda.yaml
entry_points:
main:
parameters:
lr: {type: float, default: 0.01}
command: "python train.py --lr {lr}"
Use case for DevOps: You can trigger MLflow Projects as jobs in CI/CD pipelines — with guaranteed reproducibility.
You can deploy a self-hosted MLflow tracking server on Kubernetes:
Then integrate with:
MLflow isn’t just for data scientists — it’s a DevOps goldmine!
Set it up as a central platform for all ML efforts, bring it under infrastructure-as-code, monitor the APIs like any other microservice, and bring ML and DevOps together like never before. 🚀
To set up the query backend code, make sure to install the following dependencies.
Install the required libraries; here is the requirements.txt file:
langchain-openai
langchain_community
langchain_core
faiss-cpu
markdown
tiktoken
fastapi
pydantic
uvicorn
python-dotenv
scikit-learn
requests

Inside the project directory, activate the virtual environment (Windows users):
C:/project-directory/> venv/Scripts/activate
(venv)C:/project-directory/>cd main_backend
(venv)C:/project-directory/main_backend>

Then install the libraries:
pip install -r requirements.txt

Import the libraries in the app.py file:
import os
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import AzureChatOpenAI
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
import tiktoken
import glob
The query backend service handles the user query and orchestrates the RAG pipeline.
The /query endpoint accepts the query and sends it to the vector-store service (e.g., http://localhost:8001/search), which performs a similarity search in a vector database (FAISS). This is the "retrieval" part of the pipeline:
@app.post("/query")
async def query_rag(request: QueryRequest):
try:
query = request.query
# search the docs based on query in vector-db
search_response = requests.post(f"{VECTOR_DB_URL}/search", json={"query": query})
if search_response.status_code != 200:
raise HTTPException(status_code=502, detail=f"Vector DB Error: {search_response.text}")
search_results = search_response.json()
context = "\n\n".join(doc['content'] for doc in search_results)
.....

The prompt template combines the query, context (relevant documents), chat history, and instructions for how the AI should behave. This is the "augmentation" part of the pipeline.
.....
prompt_template = ChatPromptTemplate.from_template(
"""
You are a helpful AI assistant that explains concepts to beginners with examples and code.
Use the provided context and chat history to answer the question. Avoid spelling mistakes.
If the context does NOT help answer the question, clearly mention that it's "out of context" and prefix your answer with a 🌟 emoji.
Chat History: {chat_history}
Context: {context}
Question: {question}
Answer:
"""
)
....

Using LangChain's runnable chain, it prepares inputs, renders the prompt, sends it to the Azure LLM, and extracts the raw text answer. This is the "generation" part of the pipeline.
....
chain = (
{
"context": lambda x: context,
"question": RunnablePassthrough(),
"chat_history": lambda x: memory.load_memory_variables({"question": x})['chat_history']
}
| prompt_template
| llm
| StrOutputParser()
)
Invoke the chain and store the current question and answer in memory:
result = chain.invoke(query)
memory.save_context({"question": query}, {"answer": result})

Return both the generated answer and the list of document sources:
return {
"answer": result,
"sources": [
doc['metadata']['source']
for doc in search_results if 'metadata' in doc and 'source' in doc['metadata']
],
}

The main backend starts from here; let's say the host is localhost and the port is 8000 for local development.
if __name__ == "__main__":
uvicorn.run(app, host=HOST, port=PORT)

To run the code:
C:/rag_chatbot_k8>source venv/Scripts/activate
(venv)C:/rag_chatbot_k8>cd main_backend
(venv)C:/rag_chatbot_k8/main_backend>python main.py
# or
(venv)C:/rag_chatbot_k8/main_backend>uv run main.py

Now it is running at http://localhost:8000
FAISS (Facebook AI Similarity Search) is used as the vector store in our project. It stores and searches embeddings efficiently and supports high-performance similarity search, enabling fast and accurate retrieval of the most relevant document chunks based on the user's query.
Install the required libraries; here is the requirements.txt file:
langchain-openai
langchain_community
langchain_core
faiss-cpu
markdown
tiktoken
fastapi
pydantic
uvicorn
python-dotenv
scikit-learn
requests

Inside the project directory, activate the virtual environment (Windows users):
C:/project-directory/> venv/Scripts/activate
(venv)C:/project-directory/>cd vector_store
(venv)C:/project-directory/vector_store>

Then install the libraries:
pip install -r requirements.txt

Import the libraries in the index.py file:
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import os
from typing import List
from langchain_community.vectorstores import FAISS
from langchain_openai import AzureOpenAIEmbeddings
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain.schema import Document
from faiss import IndexFlatL2
from dotenv import load_dotenv

Load environment variables (like Azure credentials and host/port):
load_dotenv()
# Constants
VECTOR_STORE_PATH = "vector_store"
INDEX_NAME = "index"
EMBEDDING_DIM = 1536 # for Azure text-embedding-ada-002, adjust if needed
PORT = int(os.environ["PORT"])
HOST = os.environ["HOST"]

Set up FastAPI, CORS, and the Azure embedding model for converting queries and searching vectors in the database.
# Fast API Setup
app = FastAPI()
# enable cors
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Azure embedding model
embedding_model = AzureOpenAIEmbeddings(
azure_endpoint=os.environ["AZURE_ENDPOINT"],
deployment=os.environ["AZURE_EMBEDDING_DEPLOYMENT"],
api_key=os.environ["AZURE_API_KEY"],
openai_api_version=os.environ["AZURE_EMBEDDING_VERSION"]
)
Initialize the vector store: if a FAISS index file exists, load it; otherwise, create a new IndexFlatL2 index, which is a standard L2-norm vector index.
def load_vector_store():
if os.path.exists(os.path.join(VECTOR_STORE_PATH, f"{INDEX_NAME}.faiss")):
return FAISS.load_local(VECTOR_STORE_PATH, embedding_model, index_name=INDEX_NAME, allow_dangerous_deserialization=True)
else:
index = IndexFlatL2(EMBEDDING_DIM)
return FAISS(embedding_model, index, InMemoryDocstore({}), {})
vector_store = load_vector_store()

The /store endpoint wraps each incoming item in a Document object, calls vector_store.add_texts(...) to embed and store it, and persists the index with save_local.

@app.post("/store")
async def store_embeddings(data: List[EmbeddingItem]):
try:
docs = [
Document(
page_content=item.content,
metadata=item.metadata
)
for item in data
]
# add to vector db
vector_store.add_texts(
texts=[doc.page_content for doc in docs],
metadatas=[doc.metadata for doc in docs]
)
# Persist store
vector_store.save_local(VECTOR_STORE_PATH, index_name=INDEX_NAME)
return {"status": "success", "stored": len(data)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

The /search endpoint runs a similarity search and returns the content and metadata of the top matches.

@app.post("/search")
async def search_query(request: QueryRequest):
try:
docs_and_scores = vector_store.similarity_search(
query=request.query,
k=request.top_k
)
result = [{
"content": doc.page_content,
"metadata": doc.metadata
}
for doc in docs_and_scores
]
return result
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))

The vector store backend starts from the main block; let's say the host is localhost and the port is 8001 for local development.
if __name__ == "__main__":
uvicorn.run(app, host=HOST, port=PORT)To run the code,
C:/rag_chatbot_k8>source venv/Scripts/activate
(venv)C:/rag_chatbot_k8>cd vector_store
(venv)C:/rag_chatbot_k8/vector_store>python index.py

Now it is running at http://localhost:8001
This is the batch processing part, which automates syncing the Kubernetes documentation by cloning or copying it and preparing it for text embeddings.
Make sure to install the required libraries; the following is the requirements.txt file:
langchain-openai
langchain_community
langchain_core
fastapi
uvicorn
python-dotenv
scikit-learn
requests

Go to the project directory and install the libraries:
(venv)C:/<project-directory>/rag_chatbot>pip install -r requirements.txt

Import the following libraries:
import subprocess
import shutil
from pathlib import Path
import os
import requests
import glob
import time
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import AzureOpenAIEmbeddings
from dotenv import load_dotenv
import json
import hashlib

Declare constants:
HASH_DB_PATH = Path("hash_files.json")
REPO_URL = os.environ["K8_URL"]
VECTOR_DB_URL = os.environ["VECTOR_DB_URL"]
# Constants
TEMP_DIR = Path(os.path.abspath("./temp-docs"))
TARGET_DIR = Path(os.path.abspath("./k8_docs/en"))
# Batch configuration
EMBEDDING_BATCH_SIZE = 100 # Reduced batch size for embeddings
STORE_BATCH_SIZE = 100 # Batch size for vector store uploads
BATCH_DELAY = 2 # Delay between batches in seconds
RATE_LIMIT_DELAY = 60 # Delay when hitting rate limits in seconds
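The batch-size constants above drive a simple slicing loop. Here is a dependency-free sketch of the idea (batched is a hypothetical helper, not the project's exact code; the sleep between batches is omitted):

```python
# Split a long list of document chunks into fixed-size upload batches,
# the same shape of loop the EMBEDDING/STORE batch sizes feed into.
def batched(items, size):
    """Yield successive fixed-size slices of items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

chunks = list(range(250))               # pretend: 250 document chunks
batches = list(batched(chunks, 100))    # STORE_BATCH_SIZE = 100
print([len(b) for b in batches])        # [100, 100, 50]
```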
Start with the Git operations.
clone_or_pull_repo(): clones the Kubernetes docs repo if it is not present; otherwise, it pulls the latest changes.

def clone_or_pull_repo():
if not TEMP_DIR.exists():
print("✅ Cloning Kubernetes docs repo...")
subprocess.run(["git", "clone", REPO_URL, str(TEMP_DIR)], check=True)
else:
print("✅ Pulling latest changes...")
subprocess.run(["git", "-C", str(TEMP_DIR), "pull"], check=True)

copy_docs(): copies the selected subdirectories from the repo's temp_dir/content/en/docs tree, then copies the .md files into the target folder k8_docs/.

def copy_docs():
base_dir = TEMP_DIR / "content" / "en" / "docs"
selected_subdirs = ["concepts"]
for subdir in selected_subdirs:
source_subdir = base_dir / subdir
if not source_subdir.exists():
print(f"⚠️ Source directory does not exist: {source_subdir}")
continue
# Create the target subdirectory
target_subdir = TARGET_DIR / subdir
target_subdir.mkdir(parents=True, exist_ok=True)
# Copy all markdown files with their directory structure
for file in source_subdir.glob("**/*.md"):
relative_path = file.relative_to(source_subdir)
dest_file = target_subdir / relative_path
dest_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(file, dest_file)
print(f"📄 Copied: {file} -> {dest_file}")
Load the markdown files: load_md_files() reads the copied .md files and stores them in a list.
def load_md_files():
md_files = []
try:
search_path = os.path.join(TARGET_DIR, "concepts", "**", "*.md")
for filepath in glob.glob(search_path, recursive=True):
with open(filepath, 'r', encoding='utf-8') as f:
text = f.read()
md_files.append({
'filename': os.path.basename(filepath),
'content': text,
})
except Exception as e:
print(f"Error reading files: {str(e)}")
return md_filesText splitting (chunking) the documents,
call_text_splitter(): Use's LangChain's RecursiveCharacterTextSplitter to chunk large text files into smaller pieces and wraps each chunk in a Document object with metadata.def call_text_splitter(md_docs):
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=50)
documents = []
try:
for doc in md_docs:
split_texts = text_splitter.split_text(doc['content'])
for i, chunk in enumerate(split_texts):
document = Document(
page_content=chunk,
metadata={
'source': f"{doc['filename']}-{i}"
}
)
documents.append(document)
except Exception as e:
print(f"Error in text splitting: {e}")
raise
return documents

Hashing is used to prevent re-processing unchanged files: we don't have to recompute text embeddings for old files when only new files are found in the Kubernetes repo.
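Here is a standalone sketch of that skip-unchanged idea (the file names are made up; the project's real helper functions follow below):

```python
import hashlib

def get_file_hash(content: str) -> str:
    """SHA-256 of a file's text content."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

def changed_files(md_files, known_hashes):
    """Return files that are new or modified since the last sync,
    updating known_hashes as we go."""
    changed = []
    for f in md_files:
        h = get_file_hash(f["content"])
        if known_hashes.get(f["filename"]) != h:
            changed.append(f)
            known_hashes[f["filename"]] = h
    return changed

known = {"pods.md": get_file_hash("old text")}
files = [
    {"filename": "pods.md", "content": "old text"},      # unchanged -> skipped
    {"filename": "services.md", "content": "brand new"}, # new -> embedded
]
print([f["filename"] for f in changed_files(files, known)])  # ['services.md']
```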
def get_file_hash(content):
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def load_existing_hashes():
if HASH_DB_PATH.exists():
with open(HASH_DB_PATH, "r") as f:
return json.load(f)
return {}
def save_hashes(hashes):
with open(HASH_DB_PATH, "w") as f:
json.dump(hashes, f, indent=2)

Embedding in the batch process:
process_and_store_batch(): converts each document chunk into embeddings using the Azure OpenAI embedding model.

def process_and_store_batch(batch_documents):
contents = []
for doc in batch_documents:
try:
contents.append(doc.page_content)
except Exception as e:
print(f"Error accessing document content: {e}")
continue
if not contents:
return []
try:
embeddings = embedding_model.embed_documents(contents)
payload = []
for doc, embedding in zip(batch_documents, embeddings):
try:
payload.append({
"embedding": embedding,
"metadata": doc.metadata,
"content": doc.page_content
})
except Exception as e:
print(f"Error creating payload item: {e}")
continue
return payload
except Exception as e:
print(f"Error generating embeddings: {e}")
if "429" in str(e):
print(f"Rate limit hit, waiting {RATE_LIMIT_DELAY} seconds...")
time.sleep(RATE_LIMIT_DELAY)
return []

store_embeddings_batch(payload_batch): sends the payload to the vector store using a POST request.

def store_embeddings_batch(payload_batch):
try:
response = session.post(f"{VECTOR_DB_URL}/store", json=payload_batch)
response.raise_for_status()
return True
except Exception as e:
if "429" in str(e):
print(f"Rate limit hit, waiting {RATE_LIMIT_DELAY} seconds...")
time.sleep(RATE_LIMIT_DELAY)
return False

rerun_embeddings(): this is the main embedding logic, which runs the above functions.

if __name__ == "__main__":
try:
clone_or_pull_repo()
copy_docs()
rerun_embeddings()
print("✅ Successfully stored embeddings... ")
except Exception as e:
print(f"❌ Error: {e}")

To run the code:
C:/rag_chatbot_k8>source venv/Scripts/activate
(venv)C:/rag_chatbot_k8>cd sync_backend
(venv)C:/rag_chatbot_k8/sync_backend>python index.py

15.4.2025 08:49 Sync Backend Setup for Batch Processing

To run MLflow locally, you don't need much; it is lightweight by default. Here's a breakdown of the resource requirements:
Ideally Python 3.8 or higher. Create a virtual environment to keep things clean.
pip install mlflow

Optionally, you can install these libraries for data science:
pip install scikit-learn pandas numpy

A typical local dev machine is enough to run MLflow smoothly.
Once installed:
mlflow ui

This starts the MLflow tracking server at http://localhost:5000 and stores runs under the local ./mlruns directory.
If you want a more production-like setup:
mlflow models serve

Up next, you can see the MLflow workflow and how we can deploy an ML model using it.
15.4.2025 08:47 Prerequisites to run MLflow locally on your machine