This project demonstrates a real-time AI "Meeting Coach" showcasing the use of Confluent Cloud for Apache Flink AI Inference functions to build a real-time Retrieval-Augmented Generation (RAG) pipeline. The demo uses both a static knowledge base of sales documents and real-time simulated meeting data.
The Meeting Coach system consumes a chat stream representing a sales meeting. When specific triggers occur (like a customer raising a price objection), it uses a RAG pipeline to retrieve relevant context from a knowledge base of company documents (e.g., battlecards, sales materials, previous meeting notes, objection handling tips) stored in a vector database, and generates real-time coaching suggestions for the salesperson using an LLM.
This guide helps you deploy the complete Flink ML Demo infrastructure from scratch with minimal credentials.
You need accounts and API credentials for:
- Confluent Cloud (for Kafka + Flink)
- Microsoft Azure (for OpenAI services)
- MongoDB Atlas (for vector database)
- This deployment uses MongoDB Atlas M0 Free Tier
- Note: M0 free tier cluster cannot be created via Terraform API - must be created manually in Atlas UI
1.1. Go to Confluent Cloud
1.2. Create a new Cloud API Key (not cluster-specific)
1.3. Note down: API Key and API Secret
1.4. Go to Azure Portal > App registrations
1.5. Create a new app registration
1.6. Go to "Certificates & secrets" > New client secret
1.7. Go to "Subscriptions" > Your subscription > Access control (IAM)
1.8. Add role assignment: "Contributor" for your app
1.9. Note down: Subscription ID, Tenant ID, Client ID, Client Secret
1.10. Go to MongoDB Atlas > Access Manager > API Keys
1.11. Create new Organization API Key with "Project Creator" permissions
1.12. Note down: Public Key, Private Key, and your Organization ID
2.1. Copy the template:
cp .env.template .env
2.2. Open the .env file and fill it out with your credentials:
nano .env
3.1. Initialize Terraform:
cd terraform
terraform init
3.2. Load environment variables:
Make sure to run this command every time before running `terraform plan` or `terraform apply`:
source ./load_tf_vars.sh
3.3. Deploy everything:
# make sure to run `source ./load_tf_vars.sh` every time before `terraform apply`
terraform apply --auto-approve
Step 4: Run personalized Flink SQL commands to generate connections, models, and tables in Confluent Cloud
4.1. Run terraform/generate_personalized_commands.sh
which generates terraform/personalized_setup_commands.md
, containing personalized Flink SQL commands with your specific deployment details.
cd terraform
./generate_personalized_commands.sh
4.2. Run the personalized Flink SQL statements from `personalized_setup_commands.md` in Confluent Cloud to create Flink connections, models, and tables. Use either the Confluent Cloud Flink SQL Workspace or, if you are logged in to the Confluent CLI, the Flink shell (an illustrative example of these statements follows below):
confluent flink shell
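For orientation, the generated statements follow Confluent Cloud's `CREATE MODEL` syntax for AI inference. The snippet below is a minimal sketch only, assuming an Azure OpenAI connection registered under a placeholder name; treat `personalized_setup_commands.md` as the source of truth for the exact names, option keys, and endpoints in your deployment.

```sql
-- Illustrative only: register the embedding model used by the pipeline.
-- 'azure-openai-embed-connection' is a placeholder connection name, and the
-- option keys may differ slightly from the generated statements.
CREATE MODEL `openaiembed`
INPUT (text STRING)
OUTPUT (embeddings ARRAY<FLOAT>)
WITH (
  'provider' = 'azureopenai',
  'task' = 'embedding',
  'azureopenai.connection' = 'azure-openai-embed-connection'
);
```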
There are two main tracks that define how data flows within this project:
Knowledge base ingestion:
- Ingestion: Generate static, synthetic knowledge base documents (e.g., .txt, .md). Use a method like the Kafka file source connector or a custom producer to send these documents as messages to the `knowledge` Kafka topic.
- Processing & Embedding: A Flink SQL job consumes these documents, chunks them using `ML_CHARACTER_TEXT_SPLITTER`, uses `ML_PREDICT` with the `openaiembed` model to call Azure OpenAI for embeddings, and prepares the data for the vector store (see the sketch after this list).
- Storage: The embedded knowledge is stored in MongoDB Atlas via the `knowledge_mongodb` external table definition and a suitable sink mechanism.
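The chunk-and-embed step can be expressed roughly as follows. This is a minimal sketch, not the demo's exact job: the sink table and column names (`knowledge_embeddings`, `document_id`, `document_text`, `chunk`) and the splitter's chunk-size/overlap arguments are illustrative assumptions.

```sql
-- Sketch only: split each knowledge document into chunks, then embed each chunk
-- with the openaiembed model. Table/column names and the exact parameter list of
-- ML_CHARACTER_TEXT_SPLITTER are assumptions for illustration.
INSERT INTO knowledge_embeddings
SELECT chunks.document_id, chunks.chunk, embeddings
FROM (
  SELECT k.document_id, c.chunk
  FROM knowledge AS k
  CROSS JOIN UNNEST(ML_CHARACTER_TEXT_SPLITTER(k.document_text, 800, 80)) AS c (chunk)
) AS chunks,
LATERAL TABLE(ML_PREDICT('openaiembed', chunks.chunk));
```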
Real-time meeting coaching:
- Input: Real or simulated chat messages are sent to the `messages_conversation` Kafka topic via the frontend web UI.
- Processing: Flink SQL filters the conversation for prospect messages (`messages_prospect`), then generates embeddings for these messages (`messages_prospect_embeddings`) by calling out to the Azure OpenAI `text-embedding-ada-002` model.
- RAG: The system then uses Flink SQL AI functions to perform a vector search against the MongoDB Atlas vector database using `VECTOR_SEARCH`, and saves the results (`messages_prospect_rag_results`).
- Generation: The system calls the Azure OpenAI `gpt-4o-mini` model via `ML_PREDICT` using the `coaching_response_generator` model and a custom prompt that includes the prospect's recent message and the document chunks retrieved via RAG (see the sketch after this list).
- Pipeline result storage: The original message, the relevant document chunks retrieved via RAG search, and the `coaching_response_generator`'s final meeting coaching output are saved to Azure CosmosDB for later review and model retraining.
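A rough sketch of the retrieval and generation steps is shown below. Everything here is illustrative: the `VECTOR_SEARCH` argument order, the `search_results` and `context_chunks` columns, and the `coaching_results` sink table are assumptions, so follow the generated setup commands and Confluent's `VECTOR_SEARCH` documentation for the exact syntax.

```sql
-- Sketch only: retrieve similar knowledge chunks for each prospect message
-- embedding. The VECTOR_SEARCH argument order and output columns are assumptions.
INSERT INTO messages_prospect_rag_results
SELECT m.message_id, m.message_text, search_results
FROM messages_prospect_embeddings AS m,
LATERAL TABLE(VECTOR_SEARCH(knowledge_mongodb, 3, DESCRIPTOR(embeddings), m.embeddings));

-- Sketch only: generate a coaching suggestion from the prospect message and the
-- retrieved context, assumed here to be flattened into a STRING column named
-- context_chunks. 'coaching_results' is a hypothetical sink table.
INSERT INTO coaching_results
SELECT r.message_id, response
FROM messages_prospect_rag_results AS r,
LATERAL TABLE(ML_PREDICT('coaching_response_generator',
  CONCAT('Prospect said: ', r.message_text, ' | Relevant context: ', r.context_chunks)));
```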
- Activate Virtual Environment: Before running the application, activate the Python virtual environment. If you haven't created one, you can do so with `python -m venv .venv` or `uv venv`.
  - On macOS/Linux: `source .venv/bin/activate`
  - On Windows: `.\.venv\Scripts\activate`
- Install Dependencies: Install the required Python packages with `pip install -r requirements.txt`. Alternatively, if using `uv`, run `uv sync`.
- Set Environment Variables: Ensure you have fully filled out the `.env` file in the project root directory with the necessary credentials (see `.env.template` for required variables like Kafka/Schema Registry credentials and Azure OpenAI keys).
- Load Knowledge Base (one-time setup): `python publish_knowledge_documents.py`
- Run the Flask App: Start the Flask development server with `python app.py`. The application should now be accessible at `http://127.0.0.1:5000` (or the host/port specified in the output).
The Terraform + Flink SQL statements you entered earlier create everything from scratch:
- ✅ New environment
- ✅ Kafka cluster with all topics
- ✅ Flink compute pool
- ✅ Service accounts and API keys
- ✅ MongoDB sink connector
- ✅ Resource group
- ✅ OpenAI service
- ✅ Text embedding model deployment
- ✅ GPT-4o-mini completion model deployment
- ✅ New project
- ✅ M0 Free Tier cluster with vector search support
- ✅ Database user with proper permissions
- ✅ Vector search index (1536 dimensions for OpenAI)
- ✅ IP allowlist for access
To destroy all resources:
cd terraform
terraform destroy --auto-approve
"Resource already exists" errors:
- Change your `DEPLOYMENT_PREFIX` in `.env` to something unique
MongoDB connection issues:
- Wait 2-3 minutes after cluster creation for it to be fully ready
- Ensure your cluster is deployed in the `eastus2` (Virginia) region, the same region as the rest of the project infrastructure.
- Whitelist IP 0.0.0.0/0 to enable Confluent to connect
Azure permission errors:
- Ensure your service principal has "Contributor" role on the subscription
- Check that the Azure region is `eastus2` for this project; do not change the region.
Confluent connection errors:
- Verify your Cloud API key is a Cloud Resource Management key, and that you've also attached `OrganizationAdmin` permissions to the key (!!)
- Ensure your cluster is deployed in the `eastus2` (Virginia) region, the same region as the rest of the project infrastructure.
- Use the exact resource IDs from the Terraform outputs
├── README.md (this file)
├── .env.template (credential template)
├── requirements.txt (python dependencies)
├── publish_knowledge_documents.py (main utility)
├── app.py (main flask application)
├── terraform/ (infrastructure as code)
├── app/ (application code & assets)
│   ├── routes/ (flask routes)
│   ├── utils/ (utilities)
│   ├── templates/ (html templates)
│   ├── static/ (css, js, images)
│   └── scripts/ (utility scripts)
├── sample-data/ (demo knowledge base)
└── images/ (screenshots & diagrams)