Techiescamp - Learn Kubernetes | DevOps | MLOps | System Design

Techiescamp blog covers in-depth guides on Kubernetes, DevOps, DevSecOps, Cloud computing, Infrastructure Automation, CI/CD, SaaS Tools, ...

Tags: automation computing devsecops infrastructure kubernetes techiescamp

Blog.techiescamp.com hosts 1 (1) users in total (powered by Ghost)

Server location (146.75.119.7): 60313 Frankfurt am Main, Germany (EU); latitude: 50.1169, longitude: 8.6837


Blog.techiescamp.com News

Unable to Resolve DNS in Kubeadm on AWS EC2 Instance

https://blog.techiescamp.com/doc...

When running a Kubeadm cluster on AWS EC2 instances, if the nodes are spread across multiple subnets, you may encounter a complete DNS resolution failure.

All nslookup or curl requests inside pods fail with "connection timed out; no servers could be reached", even though the CoreDNS pods are running and healthy.

The Error

Running nslookup from any pod returns a connection timeout:

kubectl exec -it dnsutils -- nslookup kubernetes.default
;; connection timed out; no servers could be reached
command terminated with exit code 1

This happens even when CoreDNS pods are fully running and the CoreDNS service exists:

kubectl get pods -n kube-system -l k8s-app=kube-dns

NAME                       READY   STATUS    RESTARTS   AGE
coredns-66bc5c9577-qxqsb   1/1     Running   0          19m
coredns-66bc5c9577-x5c65   1/1     Running   0          19m

Root Cause

Calico is the networking plugin that moves traffic between pods. It was configured in a mode called VXLANCrossSubnet, which wraps packets in a VXLAN tunnel when pods talk across different subnets, but skips the tunnel and sends packets directly when the nodes are on the same subnet.

In VXLANCrossSubnet mode, Calico decides whether to use a tunnel or send traffic directly by comparing the subnet information stored on each Node resource, which comes from manual configuration or from autodetection when the calico-node pod starts.

If that subnet information is wrong, Calico can incorrectly conclude that two nodes are on the same subnet, skip the tunnel, and send packets using raw pod IP addresses (like 10.244.x.x) directly over the AWS network.
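
To make the subnet comparison concrete, here is a toy Python sketch of the cross-subnet decision (illustrative only, not Calico's actual implementation; the node names and CIDRs are made up):

```python
import ipaddress

# Hypothetical node-to-subnet mapping, as Calico would derive it from
# each Node resource's recorded address and CIDR.
subnet_of = {
    "node01": ipaddress.ip_network("172.31.16.0/20"),
    "node02": ipaddress.ip_network("172.31.32.0/20"),
}

def needs_tunnel(a, b):
    # CrossSubnet semantics: tunnel only when subnets differ
    return subnet_of[a] != subnet_of[b]

print(needs_tunnel("node01", "node02"))  # True: VXLAN tunnel is used
```

If stale Node data made both nodes appear to share one subnet, `needs_tunnel` would return False, the tunnel would be skipped, and raw pod IPs would hit the AWS fabric.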

AWS has a built-in security feature called Source/Destination Check that inspects every packet leaving an EC2 instance.

AWS only knows about the real EC2 node IPs (172.31.x.x). When it saw packets using pod IPs (10.244.x.x) as the source, it treated them as spoofed traffic and silently dropped every one, with no error or log message.

How to Fix It

Two methods are available to solve this issue. Both address the same problem but at different layers.

Method 1 resolves it at the Calico level, while Method 2 fixes it at the AWS infrastructure level. Either method works.

In our case, we used Method 2, which resolves the issue at the AWS level.

Disable AWS Source/Destination Check

Disabling Source/Destination Check tells AWS to stop inspecting packet IPs on EC2 instances. Raw pod packets pass freely without any changes to Calico.

Apply the following steps to each EC2 instance (control plane, node01, and node02).

Using AWS Console:

  1. Open the EC2 Console and select the required instance.
  2. Click the Actions dropdown.
  3. Go to Networking and choose Change source/destination check.
  4. Check Stop, then click Save to apply the change.

Disabling Source/Destination Check takes effect immediately. No Kubernetes or Calico restart is required.

Using AWS CLI:

First, get all instance IDs.

aws ec2 describe-instances \
  --query 'Reservations[].Instances[].[InstanceId,PrivateIpAddress]' \
  --output table

Then, run the following command to disable it on each node:

aws ec2 modify-instance-attribute \
  --instance-id <instance-id> \
  --no-source-dest-check

26.2.2026 09:15Unable to Resolve DNS in Kubeadm on AWS EC2 Instance
https://blog.techiescamp.com/doc...

UI setup of MLflow project

https://blog.techiescamp.com/doc...

For the frontend setup, we use the Flask framework to render the UI, with a Flask server as the backend.

If Flask is not installed already, install it with pip install flask.

Folder Structure

Your project folder should look like this:

frontend/
├── static/
│   ├── script.js    # to process form data
│   └── style.css
├── templates/
│   └── index.html   # Frontend UI template
└── app.py           # Backend Flask server

Setup Flask as Backend

First, make sure you have Flask installed. You can install it using pip:

pip install flask flask-cors pandas mlflow

Or clone the project and run pip install -r requirements.txt to install the dependencies.

i. Import libraries

from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
import pandas as pd
import pickle

import mlflow
import mlflow.pyfunc

ii. Create Flask app

app = Flask(__name__)
CORS(app)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/predict', methods=['POST'])
def predict():
  ... will continue

if __name__ == "__main__":
    app.run(host='127.0.0.1', port=8000, debug=True)

iii. Set up MLflow tracking and download artifacts

# mlflow
mlflow.set_tracking_uri("http://127.0.0.1:5000")

model_name = "Employee Attrition Model"
model_version = "3"
run_id = "bdda2dfd55454b9694bef6653ebbbe64"
model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_version}")

# download artifacts
scaler_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/preprocessor/scaler.pkl")
with open(scaler_path, "rb") as f:
    scaler = pickle.load(f)

feature_names_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/preprocessor/feature_names.pkl")
with open(feature_names_path, "rb") as f:
    feature_names = pickle.load(f)

ordinal_encoder_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/preprocessor/ordinal_encoder.pkl")
with open(ordinal_encoder_path, "rb") as f:
    ordinal_encoder = pickle.load(f)

iv. Continue with predict()

 ... continue with predict() as mentioned in step ii
 
@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    if isinstance(data, dict):
        input_data = pd.DataFrame([data])
    else:
        input_data = pd.DataFrame(data)
    print('data', input_data)

    df = preprocessing_input(input_data)
    print('df: ', df)

    try:
        prediction = model.predict(df)
        print('predict: ', prediction)

        result = "Left" if prediction[0] == 1 else "Stayed"
        print('result: ', result)
        return jsonify({"prediction": result})
    except Exception as e:
        return jsonify({
            "error": str(e)
        }), 400

Here, we get the JSON data from the frontend and check whether it is compatible with a DataFrame; if not, we create one.

Then we preprocess the input data by calling the preprocessing_input() function, which we'll see next.

Lastly, we predict on the input with the model, the one we loaded with MLflow: model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_version}")

v. Preprocessing function

def preprocessing_input(input):
    # ordinal encoding
    cols_to_encode = ['Work-Life Balance', 'Job Satisfaction', 'Performance Rating', 'Education Level', 'Job Level', 'Company Size', 'Company Reputation', 'Employee Recognition']
    input[cols_to_encode] = ordinal_encoder.transform(input[cols_to_encode]).astype('int')

    # binary encoding
    binary_cols = ['Overtime', 'Remote Work', 'Opportunities']
    for col in binary_cols:
        input[col] = input[col].map({'No': 0, 'Yes': 1})
    
    # feature engg
    def map_monthly_income(income):
        if 1 <= income <= 10000:
            return 0
        elif 10001 <= income <= 20000:
            return 1
        elif 20001 <= income <= 50000:
            return 2
        elif 50001 <= income <=100000:
            return 3
        elif income >= 100001:
            return 4
        else:
            return -1
    input['Monthly Income'] = input['Monthly Income'].apply(map_monthly_income)

    # ensure correct column order
    input = input[feature_names]
    print(input)

    # scale the data
    input_scaled = scaler.transform(input)
    return input_scaled

We used encoding and feature engineering when training the model, so the model expects the same transformations for new request data. We have to preprocess each new request exactly as we did during training, following the same method.
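
Because the serving-time bucketing must match training exactly, it is worth sanity-checking the income mapping in isolation. The same function as above, runnable without the model or MLflow:

```python
def map_monthly_income(income):
    # Bucket monthly income into ordinal bands (same logic as training)
    if 1 <= income <= 10000:
        return 0
    elif 10001 <= income <= 20000:
        return 1
    elif 20001 <= income <= 50000:
        return 2
    elif 50001 <= income <= 100000:
        return 3
    elif income >= 100001:
        return 4
    else:
        return -1

print(map_monthly_income(9500))    # 0
print(map_monthly_income(10001))   # 1
print(map_monthly_income(150000))  # 4
print(map_monthly_income(0))       # -1 (out of range)
```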

Run the frontend code

python app.py

or you can run it with the Flask CLI:

flask run

Below is the UI that will be rendered:

UI of employee-attrition

What’s Happening Behind the Scenes?

  1. Frontend sends data to /predict via a POST request.
  2. Backend receives JSON, converts it to a Pandas DataFrame.
  3. Preprocessing is applied using loaded MLflow artifacts (ordinal encoder, scaler, etc.).
  4. The model makes a prediction using MLflow’s pyfunc interface.
  5. The response is sent back as JSON with the prediction: Stayed or Left.

Conclusion

MLflow makes tracking models, managing artifacts, and deploying reproducible pipelines effortless. Using Flask and MLflow together provides a flexible, fast development environment for machine learning web apps.

Try using MLflow in your machine learning projects to improve your workflow and reproducibility!

16.4.2025 09:06UI setup of MLflow project
https://blog.techiescamp.com/doc...

Deploying a Machine Learning Model with Flask and MLflow locally

https://blog.techiescamp.com/doc...

Let's learn how to deploy a machine learning model using Flask as both the frontend and backend, leveraging MLflow to manage the model and preprocessing artifacts. This setup allows for a lightweight, reproducible ML deployment environment.

If you look at the project structure, it is divided into backend and frontend folders for clarity.

Setup MLflow Locally

Create a project folder of your choice, then go to that directory:

mkdir mlflow_example
cd mlflow_example

  1. Install MLflow

You will need Python, MLflow and other libraries. Open your terminal and run the below code.

pip install mlflow scikit-learn pandas

Or you can clone the project and run pip install -r requirements.txt

  2. Start the MLflow tracking server

The data scientist creates the MLflow tracking setup by writing code in the backend/train.py file:

# backend/train.py

import mlflow

mlflow.set_tracking_uri('http://127.0.0.1:5000')
mlflow.set_experiment('employee_attrition_classification')

with mlflow.start_run(run_name = "employee_attrition_run") as run:
  .... 
  ....
  ....

Before running train.py, start the MLflow server with this command:

mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns --host 127.0.0.1 --port 5000

This sets up a local MLflow server. Think of it as a dashboard for your ML work. Here you are calling mlflow server with a SQLite backing store (--backend-store-uri sqlite:///mlflow.db), storing model artifacts in the ./mlruns folder, and serving at localhost:5000.

  3. Open your browser and go to http://127.0.0.1:5000 and you will see the MLflow interface, where you can view experiments and models.
MLflow UI

Model Setup

Employee attrition classification predicts whether an employee stays at or leaves the company. Here we are using a supervised logistic regression model to classify the two classes, 'Stayed' and 'Left'.

project folder structure

i. mlflow-model/train.py

So let's go to the project directory and create a train.py file to train our model.

  1. Import dependencies
import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature
import pandas as pd
import pickle
from utils import load_emp_attr_data
from model import train_model
from sklearn.preprocessing import StandardScaler
  2. Continue with the MLflow tracking server from above

This is to track the model in the MLflow interface; we can also save model artifacts and log metrics.

mlflow.set_tracking_uri('http://127.0.0.1:5000')
mlflow.set_experiment('employee_attrition_classification')

with mlflow.start_run(run_name='employee_attrition_run') as run:
  X_train, X_test, y_train, y_test, ordinal_encoder = load_data()

  # save column names for later use
  column_names = X_train.columns.tolist()

  # normalize the dataset
  scaler = StandardScaler()
  X_train_scaled = scaler.fit_transform(X_train)
  X_test_scaled = scaler.transform(X_test)

  # call model => model.py
  model, accuracy = train_model(X_train_scaled, X_test_scaled, y_train, y_test)

  # now log accuracy score in mlflow
  mlflow.log_metric('accuracy', accuracy * 100)
  signature = infer_signature(X_train_scaled, model.predict(X_train_scaled))

Here, infer_signature determines the model artifact's input and output schema. The input is X_train_scaled, and the output is the model's prediction, i.e., model.predict(X_train_scaled).

  3. Save model artifacts
  .... continue above code
  # save model artifact
  with open('scaler.pkl', 'wb') as f:
    pickle.dump(scaler, f)
  mlflow.log_artifact('scaler.pkl', artifact_path='preprocessor')

  # save feature/column names in artifact
  with open('feature_names.pkl', 'wb') as f:
    pickle.dump(column_names, f)
  mlflow.log_artifact('feature_names.pkl', artifact_path='preprocessor')

  # save ordinal encoder in artifact
  with open('ordinal_encoder.pkl', 'wb') as f:
    pickle.dump(ordinal_encoder, f)
  mlflow.log_artifact('ordinal_encoder.pkl', artifact_path='preprocessor')
  4. Create a custom model by configuring model information
  ... continue above code
  model_info = mlflow.sklearn.log_model(
    sk_model = model,
    artifact_path = 'employee_attrition_model',
    signature = signature,
    input_example = X_train_scaled,
    registered_model_name = 'Employee Attrition Model'
  )

  print(f'Registered Model Name: {model_info.model_uri}')
  print(f'Run ID: {run.info.run_id}')
  print(f'Model Accuracy: {accuracy}')
  print('Model registered as: Employee Attrition Model')

  # end of mlflow run()

ii. mlflow-model/model.py

Create logistic regression model in model.py file

# model.py file
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

In metrics we are using accuracy_score to determine the accuracy of our model.

# model.py 
...

def train_model(X_train, X_test, y_train, y_test):
  lr = LogisticRegression(random_state = 42)
  lr.fit(X_train, y_train)

  y_pred = lr.predict(X_test)

  accuracy = accuracy_score(y_test, y_pred)
  return lr, accuracy
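
For reference, accuracy_score computes the fraction of predictions that match the labels; a dependency-free sketch of the same computation (toy labels, not the real dataset):

```python
# accuracy = (number of matching labels) / (total labels)
y_test = [0, 1, 1, 0, 1]
y_pred = [0, 1, 0, 0, 1]

accuracy = sum(t == p for t, p in zip(y_test, y_pred)) / len(y_test)
print(accuracy)  # 0.8
```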

Together, these scripts use MLflow to track the training process of the employee attrition classification model.

iii. mlflow-model/utils.py

This loads the train and test datasets:

import pandas as pd
from sklearn.preprocessing import OrdinalEncoder
from sklearn.model_selection import train_test_split
from pathlib import Path

BASE_PATH = Path(__file__).parent
TRAIN_DATA_PATH = BASE_PATH / 'data' / 'train.csv'
TEST_DATA_PATH = BASE_PATH / 'data' / 'test.csv'

def load_data():
  train_dataset = pd.read_csv(TRAIN_DATA_PATH)
  test_dataset = pd.read_csv(TEST_DATA_PATH)
  dataset = pd.concat([train_dataset, test_dataset])

  X = dataset.drop(['Employee ID', 'Attrition', 'Job Role', 'Distance from Home', 'Marital Status', 'Gender'], axis=1)

  y = dataset['Attrition']
  1. Preprocess the data: for this we use an ordinal encoder, which transforms categorical data into numerical values when the categorical variable has an inherent order or ranking.
  ..... continue from above code
  
  # pre-processing the dataset
  columns_to_encode = ['Work-Life Balance', 'Job Satisfaction', 'Performance Rating', 'Education Level', 'Job Level', 'Company Size', 'Company Reputation', 'Employee Recognition']

  categories = [
        ['Poor', 'Fair', 'Good', 'Excellent'], # Work-Life Balance
        ['Low', 'Medium', 'High', 'Very High'], # Job Satisfaction
        ['Low', 'Below Average', 'Average', 'High'], # Performance Rating
        ["High School", "Bachelor’s Degree", "Master’s Degree", "Associate Degree", "PhD"], # Education Level
        ['Entry', 'Mid', 'Senior'], # Job Level
        ['Small', 'Medium', 'Large'], # Company Size
        ['Poor', 'Fair', 'Good', 'Excellent'], # Company Reputation
        ['Low', 'Medium', 'High', 'Very High'], # Employee Recognition
    ]
    oe = OrdinalEncoder(categories=categories)
    X[columns_to_encode] = oe.fit_transform(X[columns_to_encode]).astype('int')
 ... continue above code
 # binary encoding
    binary_cols = ['Overtime', 'Remote Work', 'Leadership Opportunities', 'Innovation Opportunities']
    for col in binary_cols:
        X[col] = X[col].map({'No': 0, 'Yes': 1})
  ... continue above code
  # label encoding (for target or class values)
  y = y.map({'Stayed': 0, 'Left': 1})
  2. Feature Engineering
  .. continue above code
  # Feature Engg (optional)
    X['Opportunities'] = X['Leadership Opportunities'] + X['Innovation Opportunities']
    X = X.drop(columns=['Leadership Opportunities', 'Innovation Opportunities'])

    ## Feature Engg (Income Mapping)
    def map_monthly_income(income):
        if 1 <= income <= 10000:
            return 0
        elif 10001 <= income <= 20000:
            return 1
        elif 20001 <= income <= 50000:
            return 2
        elif 50001 <= income <=100000:
            return 3
        elif income >= 100001:
            return 4
        else:
            return -1
    X['Monthly Income'] = X['Monthly Income'].apply(map_monthly_income)

  3. Split the dataset into train and test sets
  ...continue above code
  x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

  return x_train, x_test, y_train, y_test, oe

  # end load_data() function
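
Conceptually, OrdinalEncoder with explicit categories maps each category to its index in the declared order. A minimal illustration (toy values, not the fitted encoder):

```python
# Declared order for one column, e.g. Work-Life Balance
order = ['Poor', 'Fair', 'Good', 'Excellent']

# Equivalent mapping: each category -> its position in the order
encode = {cat: i for i, cat in enumerate(order)}

column = ['Good', 'Poor', 'Excellent']
encoded = [encode[v] for v in column]
print(encoded)  # [2, 0, 3]
```

This is why the order of each list in `categories` matters: it fixes the numeric ranking the model sees.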

Running the MLflow Experiment

Keep the MLflow server running with this command:

mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns --host 127.0.0.1 --port 5000

Then open a new terminal and run the training script:

python train.py

Once executed, open the MLflow UI by visiting http://127.0.0.1:5000. You will see:

mlflow model artifact image

💡 Bonus: Loading the Trained Model

We have to call the model service from the frontend UI so that the backend can generate a response using the model.

Now, how do we use this model?

For this, we load the registered model for inference in the frontend:

import mlflow.sklearn
model = mlflow.sklearn.load_model("models:/Employee Attrition Model/latest")

To understand this, follow next blog – Frontend setup of MLflow project.

16.4.2025 07:18Deploying a Machine Learning Model with Flask and MLflow locally
https://blog.techiescamp.com/doc...

MLflow workflow

https://blog.techiescamp.com/doc...

Here's the workflow, explained with detailed real-world tasks and how each component fits in.

Step-1: Data Scientist Trains the Model

The data scientist writes training code, usually in Python using Scikit-learn, PyTorch, TensorFlow, etc., and adds MLflow tracking to log the experiment.

import mlflow

with mlflow.start_run():
  mlflow.log_param("lr", 0.01)
  mlflow.log_metric("accuracy", 0.04)
  mlflow.sklearn.log_model(model, "model")

📌 DevOps Task:


Step-2: MLflow Tracking Server Logs Everything

The MLflow server receives:

📌 DevOps Task:


Step-3: Artifact Store Saves Model Outputs

📌 DevOps Task:


Step-4: MLflow Model Registry Registers Models

📌 DevOps Task:


Step-5: CI/CD Pipeline Promotes & Deploys Models

📌 DevOps Task:


Step-6: Model Served as REST API

📌 DevOps Task:


Step-7: Expose to App for Inference

POST /predict
{
  "data": [5.1, 3.5, 1.4, 0.2]
}
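
For illustration, the request body above can be built and round-tripped with Python's stdlib json module:

```python
import json

# Build the inference request body from step 7
payload = {"data": [5.1, 3.5, 1.4, 0.2]}
body = json.dumps(payload)

# What the serving endpoint would decode on arrival
roundtrip = json.loads(body)
print(roundtrip["data"])  # [5.1, 3.5, 1.4, 0.2]
```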

📌 DevOps Task:


💡 Bonus: Automate End-to-End

You can set up the entire MLflow workflow using:

15.4.2025 12:45MLflow workflow
https://blog.techiescamp.com/doc...

Frontend Setup

https://blog.techiescamp.com/doc...

The frontend UI is implemented using React; for parsing markdown documents it uses the react-markdown and react-syntax-highlighter libraries.

Open a new terminal from your code editor and go to the frontend directory:

C:/Users/mlops/rag_chatbot_k8> cd frontend
C:/Users/mlops/rag_chatbot_k8/frontend>

Install dependencies,

npm install

Run the frontend code:

npm start

The frontend will be rendered like this:

Frontend UI - DocuMancer AI

You can start asking questions and see how it responds back by using Kubernetes documents as a knowledge base.

You can also check the sources to see where DocuMancer's answer is coming from.

Conclusion

Retrieval-Augmented Generation (RAG) revolutionizes how large language models (LLMs) interact with private, domain-specific knowledge.

Grounding answers in your own domain-specific knowledge base, such as markdown docs, PDFs, or internal databases, minimizes hallucinations.

Unlike model training, which is resource-heavy and time-consuming, RAG systems are fast and efficient: no re-training is needed. Simply retrieve relevant documents in real time and augment the prompt.

They are also easily updatable: keep your system current by just updating the source documents (e.g., .md files), with no need to retrain the model whenever your data changes.

Whether you're building a chatbot for support, dev docs, HR policies, or any internal knowledge base — RAG gives your GPT the brain of your business.

15.4.2025 11:41Frontend Setup
https://blog.techiescamp.com/doc...

How DevOps Folks Can Use MLflow

https://blog.techiescamp.com/doc...

As a DevOps engineer, your job is to streamline development-to-production workflows, ensure reliability, and enable automation. Here's how MLflow aligns with DevOps principles and how you can get hands-on with it.

i. Tracking and Versioning Like GitOps

MLflow lets you track all your ML experiments – similar to how you track application versions with Git. You can track:

Use case: Create a GitOps-like setup where every ML experiment is logged, versioned, and reproducible, giving full audit trails.

mlflow.log_param("optimizer", "adam")
mlflow.log_metric("accuracy", 0.86)

Now, you know which model version is best to promote to staging or production.

ii. Model Lifecycle Management (CI/CD for models)

MLflow's Model Registry is like a CI/CD environment for models.

Use-case: Integrate with your existing CI/CD tools (e.g: GitHub Actions, GitLab CI, Jenkins) to automate promotion of models to production once metrics hit the benchmark.

mlflow.register_model("runs:/<run-id>/model", "EmployeeAttritionModel")

iii. Serving Models as APIs for Inference

MLflow can serve any registered model as a REST API — no extra dev work needed.

mlflow models serve -m runs:/<run-id>/model --port 1234

Use case for DevOps: You can deploy models to staging or production environments using Docker, Kubernetes, or even serverless tools like AWS Lambda. Then, build monitoring and alerting (e.g., using Prometheus and Grafana) around the served APIs.

iv. Observability: Metrics, Logs, Dashboards

DevOps is all about observability. MLflow provides:

Use case for DevOps: You can set up dashboards to visualize drift, inference time, model accuracy, and other SLAs — just like application health metrics.

Bonus: Artifacts like feature_importance.json or model_latency.csv can be logged and visualized too.

v. Reusable Pipelines via MLflow Projects

You can define an MLproject file to standardize ML pipelines across environments. It includes:

name: churn-prediction
conda_env: conda.yaml
entry_points:
  main:
    parameters:
      lr: {type: float, default: 0.01}
    command: "python train.py --lr {lr}"

Use case for DevOps: You can trigger MLflow Projects as jobs in CI/CD pipelines — with guaranteed reproducibility.

vi. MLflow + Kubernetes + CI/CD

You can deploy a self-hosted MLflow tracking server on Kubernetes:

Then integrate with:


MLflow isn’t just for data scientists — it’s a DevOps goldmine!
Set it up as a central platform for all ML efforts, bring it under infrastructure-as-code, monitor the APIs like any other microservice, and bring ML and DevOps together like never before. 🚀

15.4.2025 10:54How DevOps Folks Can Use MLflow
https://blog.techiescamp.com/doc...

Main Backend Setup

https://blog.techiescamp.com/doc...

To set up the query backend code, make sure to install the following dependencies.

Install the required libraries; here is the requirements.txt file:

langchain-openai
langchain_community
langchain_core
faiss-cpu
markdown
tiktoken
fastapi
pydantic
uvicorn
python-dotenv
scikit-learn
requests

Inside the project directory, activate the virtual environment (venv/Scripts/activate for Windows users):

C:/project-directory/> venv/Scripts/activate

(venv)C:/project-directory/>cd main_backend

(venv)C:/project-directory/main_backend>

Then install libraries

pip install -r requirements.txt

Import libraries in app.py file,

import os
import glob
import requests
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import AzureChatOpenAI
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
import tiktoken

The query backend service handles the user query and orchestrates the RAG pipeline.

The /query endpoint accepts the query and sends it to the vector-store service (e.g., http://localhost:8001/search), which performs a similarity search in a vector database (FAISS). This is the "retrieval" part of the pipeline:

@app.post("/query")
async def query_rag(request: QueryRequest):
    try:
        query = request.query
        # search the docs based on query in vector-db 
        search_response = requests.post(f"{VECTOR_DB_URL}/search", json={"query": query})

        if search_response.status_code != 200:
            raise HTTPException(status_code=502, detail=f"Vector DB Error: {search_response.text}")
        search_results = search_response.json()
        context = "\n\n".join(doc['content'] for doc in search_results)
        .....

The prompt template combines the query, the context (relevant documents), the chat history, and instructions on how the AI should behave. This is the "augmentation" part of the pipeline:

.....
prompt_template = ChatPromptTemplate.from_template(
            """
            You are a helpful AI assistant that explains concepts to beginners with examples and code. 
            Use the provided context and chat history to answer the question. Avoid spelling mistakes.
            If the context does NOT help answer the question, clearly mention that it's "out of context" and prefix your answer with a 🌟 emoji.

            Chat History: {chat_history}
            Context: {context}
            Question: {question}
            Answer: 
            """
        )
....
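
At its core, the augmentation step is template interpolation; a dependency-free sketch using str.format as a stand-in for ChatPromptTemplate (the sample context and question are made up):

```python
# Simplified version of the prompt template above
template = (
    "Use the provided context and chat history to answer the question.\n"
    "Chat History: {chat_history}\n"
    "Context: {context}\n"
    "Question: {question}\n"
    "Answer: "
)

# Fill in the retrieved context and the user's question
prompt = template.format(
    chat_history="(empty)",
    context="Pods resolve DNS via CoreDNS.",
    question="How do pods resolve DNS?",
)
print("CoreDNS" in prompt)  # True
```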

Using LangChain's runnable chain, it prepares the inputs, renders the prompt, sends it to the Azure LLM, and extracts the raw text answer. This is the "generation" part of the pipeline:

....
chain = (
            {
                "context": lambda x: context,
                "question": RunnablePassthrough(),
                "chat_history": lambda x: memory.load_memory_variables({"question": x})['chat_history'] 
            }
            | prompt_template
            | llm
            | StrOutputParser()
        ) 
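
LangChain's | operator composes runnables left to right, much like function piping. A plain-Python sketch of the same idea, where fake_llm is a hypothetical stand-in for the Azure LLM:

```python
from functools import reduce

def pipe(*fns):
    # Compose functions left to right, like LangChain's "|"
    return lambda x: reduce(lambda acc, f: f(acc), fns, x)

prepare = lambda q: {"question": q, "context": "retrieved docs"}
render = lambda d: f"Context: {d['context']}\nQuestion: {d['question']}"
fake_llm = lambda prompt: "ANSWER:" + prompt.splitlines()[-1]

chain = pipe(prepare, render, fake_llm)
print(chain("What is FAISS?"))  # ANSWER:Question: What is FAISS?
```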

Invoke the chain and store the current question and answer in memory:

result = chain.invoke(query)
memory.save_context({"question": query}, {"answer": result})

Return both the generated answer and the list of document sources:

return {
            "answer": result,
            "sources": [
                doc['metadata']['source'] 
                for doc in search_results if 'metadata' in doc and 'source' in doc['metadata']
            ],
        }

The main backend starts from here; let's say the host is localhost and the port is 8000 for local development:

if __name__ == "__main__":
    uvicorn.run(app, host=HOST, port=PORT)

To run the code,

C:/rag_chatbot_k8>source venv/Scripts/activate
(venv)C:/rag_chatbot_k8>cd main_backend
(venv)C:/rag_chatbot_k8/main_backend>python main.py

# or

(venv)C:/rag_chatbot_k8/main_backend>uv run main.py

Now the service is running at http://localhost:8000

15.4.2025 09:41Main Backend Setup
https://blog.techiescamp.com/doc...

Vector Database Backend

https://blog.techiescamp.com/doc...

FAISS (Facebook AI Similarity Search) is used as the vector store in our project. It stores and searches embeddings efficiently and supports high-performance similarity search, enabling fast and accurate retrieval of the most relevant document chunks based on the user's query.
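
Under the hood, IndexFlatL2 ranks stored vectors by squared L2 distance to the query embedding. A brute-force pure-Python illustration of the idea (toy 3-dimensional vectors, not real embeddings):

```python
def l2_sq(a, b):
    # Squared Euclidean distance, as IndexFlatL2 computes it
    return sum((x - y) ** 2 for x, y in zip(a, b))

# Toy "vector store": document id -> embedding
store = {
    "doc_a": [0.9, 0.1, 0.0],
    "doc_b": [0.1, 0.8, 0.1],
    "doc_c": [0.0, 0.1, 0.9],
}
query = [0.85, 0.2, 0.0]

# Retrieval = pick the stored vector closest to the query
nearest = min(store, key=lambda k: l2_sq(store[k], query))
print(nearest)  # doc_a
```

FAISS does the same ranking, but with optimized index structures so it stays fast at millions of vectors.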

Install the required libraries; here is the requirements.txt file:

langchain-openai
langchain_community
langchain_core
faiss-cpu
markdown
tiktoken
fastapi
pydantic
uvicorn
python-dotenv
scikit-learn
requests

Inside the project directory, activate the virtual environment (venv/Scripts/activate for Windows users):

C:/project-directory/> venv/Scripts/activate

(venv)C:/project-directory/>cd vector_store

(venv)C:/project-directory/vector_store>

Then install libraries

pip install -r requirements.txt

Import libraries in index.py file,

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import os
from typing import List
from langchain_community.vectorstores import FAISS
from langchain_openai import AzureOpenAIEmbeddings
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain.schema import Document
from faiss import IndexFlatL2
from dotenv import load_dotenv

Load environment variables (like Azure credentials and host/port):

load_dotenv()

# Constants
VECTOR_STORE_PATH = "vector_store"
INDEX_NAME = "index"
EMBEDDING_DIM = 1536  # for Azure text-embedding-ada-002, adjust if needed
PORT = int(os.environ["PORT"])
HOST = os.environ["HOST"]

Set up FastAPI, CORS, and the Azure embedding model for converting the query and searching vectors in the database.

# Fast API Setup
app = FastAPI()

# enable cors
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Azure embedding model
embedding_model = AzureOpenAIEmbeddings(
    azure_endpoint=os.environ["AZURE_ENDPOINT"],
    deployment=os.environ["AZURE_EMBEDDING_DEPLOYMENT"],
    api_key=os.environ["AZURE_API_KEY"],
    openai_api_version=os.environ["AZURE_EMBEDDING_VERSION"]
)

Initialize the vector store: if a FAISS index file exists, load it; if not, create a new IndexFlatL2 index, which is a standard L2-norm vector index.

def load_vector_store():
    if os.path.exists(os.path.join(VECTOR_STORE_PATH, f"{INDEX_NAME}.faiss")):
        return FAISS.load_local(VECTOR_STORE_PATH, embedding_model, index_name=INDEX_NAME, allow_dangerous_deserialization=True)
    else:
        index = IndexFlatL2(EMBEDDING_DIM)
        return FAISS(embedding_model, index, InMemoryDocstore({}), {})

vector_store = load_vector_store()

/store endpoint,

@app.post("/store")
async def store_embeddings(data: List[EmbeddingItem]):
    try:
        docs = [
            Document(
                page_content=item.content,
                metadata=item.metadata
            )
            for item in data
        ]
        # add to vector db
        vector_store.add_texts(
            texts=[doc.page_content for doc in docs],
            metadatas=[doc.metadata for doc in docs]
        )
        # Persist store
        vector_store.save_local(VECTOR_STORE_PATH, index_name=INDEX_NAME)
        return {"status": "success", "stored": len(data)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
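For example, a client can build a small batch for /store like this (hypothetical payload; the host and port 8001 are the local-development values assumed later in this post):

```python
import json

# Hypothetical payload matching the EmbeddingItem fields used by /store
payload = [
    {
        "content": "Pods are the smallest deployable units in Kubernetes.",
        "metadata": {"source": "pods.md-0"},
    }
]

body = json.dumps(payload)
# With the service running:
# requests.post("http://localhost:8001/store", json=payload)
print(json.loads(body)[0]["metadata"]["source"])  # pods.md-0
```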

The /search endpoint embeds the query and returns the top_k most similar documents,

@app.post("/search")
async def search_query(request: QueryRequest):
    try:
        docs = vector_store.similarity_search(
            query=request.query,
            k=request.top_k
        )
        result = [
            {
                "content": doc.page_content,
                "metadata": doc.metadata
            }
            for doc in docs
        ]
        return result
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
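A search request is just a JSON body with query and top_k (again assuming the local port 8001); the endpoint returns a list of content/metadata dicts:

```python
# Hypothetical request body for /search
request_body = {"query": "How does a Service route traffic to Pods?", "top_k": 3}
# With the service running:
# requests.post("http://localhost:8001/search", json=request_body)
print(request_body["top_k"])  # 3
```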

The vector store backend starts from the main block; for local development, let's say HOST is localhost and PORT is 8001.

if __name__ == "__main__":
    uvicorn.run(app, host=HOST, port=PORT)

To run the code,

C:/rag_chatbot_k8>source venv/Scripts/activate
(venv)C:/rag_chatbot_k8>cd vector_store
(venv)C:/rag_chatbot_k8/vector_store>python index.py

The service is now running at http://localhost:8001

15.4.2025 09:26 Vector Database Backend
https://blog.techiescamp.com/doc...

Sync Backend Setup for Batch Processing

https://blog.techiescamp.com/doc...

This is the batch-processing part: it automates syncing the Kubernetes documentation by cloning (or pulling) the repo, copying the docs, and preparing them for text embedding.

Make sure to install the required libraries; the following is the requirements.txt file.

langchain-openai
langchain_community
langchain_core
fastapi
uvicorn
python-dotenv
scikit-learn
requests

Go to the project directory and install libraries,

(venv)C:/<project-directory>/rag_chatbot>pip install -r requirements.txt

Import the following libraries,

import subprocess
import shutil
from pathlib import Path
import os
import requests
import glob
import time
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import AzureOpenAIEmbeddings
from dotenv import load_dotenv
import json
import hashlib

Load the environment variables and declare the constants,

load_dotenv()

HASH_DB_PATH = Path("hash_files.json")
REPO_URL = os.environ["K8_URL"]
VECTOR_DB_URL = os.environ["VECTOR_DB_URL"]

# Constants
TEMP_DIR = Path(os.path.abspath("./temp-docs"))
TARGET_DIR = Path(os.path.abspath("./k8_docs/en"))

# Batch configuration
EMBEDDING_BATCH_SIZE = 100  # Reduced batch size for embeddings
STORE_BATCH_SIZE = 100     # Batch size for vector store uploads
BATCH_DELAY = 2          # Delay between batches in seconds
RATE_LIMIT_DELAY = 60    # Delay when hitting rate limits in seconds
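The batch constants above drive a loop roughly like the following. This is a hypothetical sketch; the actual driver, rerun_embeddings(), lives in the full repo, and process stands in for embedding and storing one batch:

```python
import time

def run_in_batches(items, batch_size, delay, process):
    """Process items in fixed-size batches, pausing between batches to stay
    under API rate limits (mirrors EMBEDDING_BATCH_SIZE / BATCH_DELAY above)."""
    results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        results.append(process(batch))
        if start + batch_size < len(items):
            time.sleep(delay)  # BATCH_DELAY between batches
    return results

# e.g. with 5 items and batch size 2, the batches have sizes 2, 2 and 1:
print(run_in_batches(list(range(5)), 2, 0, len))  # [2, 2, 1]
```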

Start with Git operations,

def clone_or_pull_repo():
    if not TEMP_DIR.exists():
        print("✅ Cloning Kubernetes docs repo...")
        subprocess.run(["git", "clone", REPO_URL, str(TEMP_DIR)], check=True)
    else:
        print("✅ Pulling latest changes...")
        subprocess.run(["git", "-C", str(TEMP_DIR), "pull"], check=True)

def copy_docs():
    base_dir = TEMP_DIR / "content" / "en" / "docs"
    selected_subdirs = ["concepts"]
    for subdir in selected_subdirs:
        source_subdir = base_dir / subdir
        if not source_subdir.exists():
            print(f"⚠️ Source directory does not exist: {source_subdir}")
            continue
        # Create the target subdirectory
        target_subdir = TARGET_DIR / subdir
        target_subdir.mkdir(parents=True, exist_ok=True)
        
        # Copy all markdown files with their directory structure
        for file in source_subdir.glob("**/*.md"):
            relative_path = file.relative_to(source_subdir)
            dest_file = target_subdir / relative_path
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(file, dest_file)
            print(f"📄 Copied: {file} -> {dest_file}")

Load the markdown files: load_md_files() reads the copied .md files and stores them in a list.

def load_md_files():
    md_files = []
    try:
        search_path = os.path.join(TARGET_DIR, "concepts", "**", "*.md")
        for filepath in glob.glob(search_path, recursive=True):
            with open(filepath, 'r', encoding='utf-8') as f:
                text = f.read()
            md_files.append({
                'filename': os.path.basename(filepath),
                'content': text,
            })
    except Exception as e:
        print(f"Error reading files: {str(e)}")
    return md_files

Text splitting (chunking) the documents,

def call_text_splitter(md_docs):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=50)
    documents = []
    try:
        for doc in md_docs:
            split_texts = text_splitter.split_text(doc['content'])
            for i, chunk in enumerate(split_texts):
                document = Document(
                    page_content=chunk,
                    metadata={
                        'source': f"{doc['filename']}-{i}" 
                    }
                )
                documents.append(document)
    except Exception as e:
        print(f"Error in text splitting: {e}")
        raise
    return documents

Hashing is used to avoid re-processing unchanged files: when new files appear in the Kubernetes repo, we only embed those, not the old files whose content hash is unchanged.

def get_file_hash(content):
    return hashlib.sha256(content.encode('utf-8')).hexdigest()

def load_existing_hashes():
    if HASH_DB_PATH.exists():
        with open(HASH_DB_PATH, "r") as f:
            return json.load(f)
    return {}

def save_hashes(hashes):
    with open(HASH_DB_PATH, "w") as f:
        json.dump(hashes, f, indent=2)
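For example, a one-character change in a file produces a completely different SHA-256 digest, which is how changed files are detected against the saved hash database:

```python
import hashlib

def get_file_hash(content):
    # Same helper as above: SHA-256 over the UTF-8 bytes of the file content
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

old = get_file_hash("apiVersion: v1")
new = get_file_hash("apiVersion: v2")
print(len(old), old != new)  # 64 True -> different digest, file is re-embedded
```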

Embedding in batch process,

def process_and_store_batch(batch_documents):
    contents = []
    for doc in batch_documents:
        try:
            contents.append(doc.page_content)
        except Exception as e:
            print(f"Error accessing document content: {e}")
            continue  
    if not contents:
        return []  
    try:
        embeddings = embedding_model.embed_documents(contents)      
        payload = []
        for doc, embedding in zip(batch_documents, embeddings):
            try:
                payload.append({
                    "embedding": embedding,
                    "metadata": doc.metadata,
                    "content": doc.page_content
                })
            except Exception as e:
                print(f"Error creating payload item: {e}")
                continue    
        return payload
    except Exception as e:
        print(f"Error generating embeddings: {e}")
        if "429" in str(e):
            print(f"Rate limit hit, waiting {RATE_LIMIT_DELAY} seconds...")
            time.sleep(RATE_LIMIT_DELAY)
        return []

def store_embeddings_batch(payload_batch):
    try:
        response = session.post(f"{VECTOR_DB_URL}/store", json=payload_batch)
        response.raise_for_status()
        return True
    except Exception as e:
        if "429" in str(e):
            print(f"Rate limit hit, waiting {RATE_LIMIT_DELAY} seconds...")
            time.sleep(RATE_LIMIT_DELAY)
        return False
You can also add logic for batch failures, using a retry strategy and delay handling for rate limits. I have included this logic; you can visit the GitHub link for more code details (the embedding_model instance and the requests session used above are also set up there).

if __name__ == "__main__":
    try:
        clone_or_pull_repo()
        copy_docs()
        rerun_embeddings()
        print("✅ Successfully stored embeddings... ")
    except Exception as e:
        print(f"❌ Error: {e}")

To run the code,

C:/rag_chatbot_k8>source venv/Scripts/activate
(venv)C:/rag_chatbot_k8>cd sync_backend
(venv)C:/rag_chatbot_k8/sync_backend>python index.py

15.4.2025 08:49 Sync Backend Setup for Batch Processing
https://blog.techiescamp.com/doc...

Prerequisites to run Mlflow locally on your machine

https://blog.techiescamp.com/doc...

To run MLflow locally, you don't need much; it is lightweight by default. Here's a breakdown of the resource requirements:

1. Python (3.7+)

Ideally Python 3.8 or higher. Create a virtual environment to keep things clean.

2. Install MLflow

pip install mlflow

Optionally, you can install these common data-science libraries:

pip install scikit-learn pandas numpy

3. Resource Requirements

Example Local Dev Setup

A typical local dev machine to run MLflow smoothly:

Running MLflow locally

Once installed:

mlflow ui

This starts the MLflow tracking UI at http://localhost:5000 and stores runs under the local ./mlruns directory.

Optional Add-ons

If you want a more production-like setup:

Up next, you can see the MLflow workflow and how to deploy an ML model with it.

15.4.2025 08:47 Prerequisites to run Mlflow locally on your machine
https://blog.techiescamp.com/doc...