2 changes: 1 addition & 1 deletion src/write_service/app.py
@@ -23,7 +23,7 @@ def test_json():
     # Grab and parse data from URL, also send to Kafka
     testURL = "https://www.stlouis-mo.gov/customcf/endpoints/arpa/expenditures.cfm?format=json"
     result = get_json(testURL)
-    kafka_status = send_data(result)
+    kafka_status = send_data(result, "arpa")
 
     # Display results to user
     formattedResult = json.dumps(result, indent=2)
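The route now tags outgoing records with a per-dataset topic. A minimal sketch of the resulting call pattern (not part of the PR; fetch_and_publish is illustrative, and get_json is passed in rather than imported since its module isn't shown in this diff):

# Illustrative wrapper, assuming get_json returns parsed JSON records.
from src.write_service.processing.json_processor import send_data

def fetch_and_publish(get_json, url, topic):
    result = get_json(url)            # parsed JSON records (a list)
    return send_data(result, topic)   # one Kafka message per record

# e.g. fetch_and_publish(get_json,
#     "https://www.stlouis-mo.gov/customcf/endpoints/arpa/expenditures.cfm?format=json",
#     "arpa")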
148 changes: 148 additions & 0 deletions src/write_service/consumers/json_consumer.py
@@ -0,0 +1,148 @@
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
import json
import os
import time
from sqlalchemy import Column, Integer, DateTime, JSON, String, create_engine
from sqlalchemy.orm import Session, DeclarativeBase
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime, timezone
import urllib.parse

# Configuration
KAFKA_BROKER = os.getenv('KAFKA_BROKER', 'localhost:9092')
PG_HOST = os.getenv('PG_HOST', 'localhost')
PG_PORT = os.getenv('PG_PORT', '5432')
PG_DB = os.getenv('PG_DB', 'stl_data')
PG_USER = os.getenv('PG_USER', 'postgres')
PG_PASSWORD = os.getenv('PG_PASSWORD', "Welcome@123456") # update with pg password if needed

# SQLAlchemy models must inherit from a declarative base class
class Base(DeclarativeBase):
    pass

def get_table_class(table_name):
    """
    Creates a mapped table class named after the topic (e.g. crime, traffic,
    etc.), so each topic gets its own table.
    """
    class DataTable(Base):
        __tablename__ = table_name
        # Allow repeated calls for the same topic without a "table already
        # defined" error from SQLAlchemy's metadata registry.
        __table_args__ = {"extend_existing": True}
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String)
        content = Column(JSON, nullable=False)
        # Use a callable so the timestamp is evaluated per row, not once at
        # class-definition time.
        date_added = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    return DataTable

def retrieve_from_kafka(topic_name):
    """
    Retrieves all available JSON messages from the given Kafka topic.
    """
    if topic_name is None:
        topic_name = "JSON-data"

    received_data = []

    try:
        # Kafka can be slow to come up, so give it 3 tries
        for attempt in range(3):
            try:
                # Connect to Kafka (prefer the configured broker, keeping the
                # in-cluster address as a fallback)
                consumer = KafkaConsumer(
                    topic_name,
                    bootstrap_servers=[KAFKA_BROKER, 'kafka:29092'],
                    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
                    auto_offset_reset='earliest',
                    consumer_timeout_ms=5000
                )

                # Get all messages
                try:
                    for message in consumer:
                        data = message.value
                        received_data.append(data)
                        print(f"Received message: {data}")
                except Exception as e:
                    print(f"Error while consuming messages: {e}")

                consumer.close()
                return received_data
            except NoBrokersAvailable:
                # Connection failed, try again
                print(f"Kafka consumer attempt {attempt+1} failed (NoBrokersAvailable), retrying in 5s...")
                time.sleep(5)
    except Exception as e:
        print(f"Something went wrong with the Kafka consumer: \n {e}")
        raise

    # All retries failed; return whatever was collected (an empty list)
    return received_data

def save_into_database(data, topic_name, topic_extended_name=None):
    """
    Saves the data into the PostgreSQL database, in a table named after the
    topic, using SQLAlchemy. Each entity of the data becomes one row.
    """

    if topic_extended_name is None:
        topic_extended_name = topic_name

    session = None

    try:
        # Ensure data is in the right format (a list), else abort
        if not isinstance(data, list):
            print("The data to be saved into the database is not valid! It must be a list.")
            return

        # Connect to the database
        encoded_password = urllib.parse.quote_plus(PG_PASSWORD)
        engine_url = f"postgresql+psycopg2://{PG_USER}:{encoded_password}@{PG_HOST}:{PG_PORT}/{PG_DB}"
        engine = create_engine(engine_url, echo=True)

        # Get and create the table (if it did not already exist)
        table = get_table_class(topic_name)
        Base.metadata.create_all(engine)

        # Now add rows to the table; keep a single session so it can be
        # handed back to the caller in test mode
        session = Session(engine)

        # Each entity will be a new row
        entity_counter = 1

        for entity in data:

            # Ensure the entity is JSON-serializable, otherwise skip it
            # (json.dumps raises TypeError/ValueError, not JSONDecodeError)
            try:
                json.dumps(entity)
            except (TypeError, ValueError):
                print(f"Entity {entity} is not valid JSON. Going to the next entity.")
                continue

            new_row = table(
                name=f"{topic_extended_name} Entity #{entity_counter}",
                content=entity)
            session.add(new_row)
            entity_counter += 1

        # Push all changes
        session.commit()

        # We are done!
        # Close everything (or return the live session if running against the
        # in-memory test database)
        if str(engine.url) != "sqlite:///:memory:":
            session.close()
            engine.dispose()
        else:
            return session

        print("PostgreSQL: OK")

    # Exceptions
    except SQLAlchemyError as e:
        print("An error occurred when connecting to the database. \n " + str(e))
        if session is not None:
            session.rollback()

    except Exception as e:
        print("An error occurred when saving to the database. \n " + str(e))

# Manual test: drain the "arpa" topic and store it under a separate table name
if __name__ == "__main__":
    test_data = retrieve_from_kafka("arpa")
    save_into_database(test_data, "arpa2", "ARPA funds usage")
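Taken together, the consumer can be driven per topic: drain whatever messages are on a topic, then persist them into a table of the same name. A minimal usage sketch, assuming the broker and Postgres are reachable via the configured environment variables; the topic names and labels are illustrative:

# Illustrative driver (not part of the PR).
from src.write_service.consumers.json_consumer import (
    retrieve_from_kafka, save_into_database)

TOPICS = {
    "arpa": "ARPA funds usage",    # topic -> human-readable row-name prefix
    "traffic": "Traffic counts",   # hypothetical second topic
}

for topic, label in TOPICS.items():
    records = retrieve_from_kafka(topic)   # empty list if no broker/messages
    if records:
        save_into_database(records, topic, label)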
14 changes: 9 additions & 5 deletions src/write_service/processing/json_processor.py
@@ -38,7 +38,7 @@ def clean_data(raw_data):
 
     return result
 
-def send_data(raw_data):
+def send_data(raw_data, topic_name):
     """This function checks if the data passes the schema and then sends the data to Kafka."""
 
     # Clean the data first
@@ -68,10 +68,14 @@ def send_data(raw_data):
                 request_timeout_ms=10000,
                 reconnect_backoff_ms=1000
             )
-            producer.send("JSON-data", data)
-            producer.flush()
-            logging.info("Sent JSON data to Kafka: " + str(data))
-            return "Sent data to Kafka successfully!<br>" + "Topic: JSON data<br>" + "Data:<br>" + str(data)
+
+            # Send each entry in the list separately to Kafka so the message isn't too big
+            for entry in data:
+                producer.send(topic_name, entry)
+                producer.flush()
+                logging.info("Sent JSON data to Kafka: " + str(entry))
+
+            return "Sent data to Kafka successfully!<br>" + "Topic: " + topic_name + "<br>" + "Data:<br>" + str(data)
         except NoBrokersAvailable:
             # Kafka may not be available yet, let's try again
             logging.error(f"Kafka producer attempt {attempt+1} failed (NoBrokersAvailable), retrying in 5s...")
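Sending each entry as its own message keeps every record well below Kafka's default message-size cap (roughly 1 MB). One trade-off worth noting: the loop above flushes after every send, which is the safest choice if the process can die mid-batch, but a single flush after the loop amortizes the round-trips. A sketch of that variant, assuming kafka-python; the broker address and topic are placeholders:

import json
from kafka import KafkaProducer

# Same per-record send pattern, but one flush for the whole batch.
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],   # placeholder broker
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

records = [{"color": "blue"}, {"color": "red"}]
for record in records:
    producer.send("test", record)   # one message per record
producer.flush()                    # flush once, after the batch
producer.close()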
35 changes: 35 additions & 0 deletions tests/write_service/test_json_consumer.py
@@ -0,0 +1,35 @@
"""
Tests for the JSON consumer and ensure data is stored in a database successfully and in the right format.
"""

from unittest.mock import patch
from sqlalchemy import create_engine, text
from src.write_service.consumers.json_consumer import save_into_database

def test_save_to_database():
"""
This is a function that ensures that sample data is stored correctly in a test database.
"""

test_data = [{"traffic count": 24}, {"traffic count": 34}]

# We don't want to use the real database, so we must use a temporary one in memory (deletes when connection closed)
engine = create_engine("sqlite:///:memory:")

# We will use patch to replace the create_engine() function so it uses the temporary engine not the real one
with patch("src.write_service.consumers.json_consumer.create_engine") as original_engine:
original_engine.return_value = engine
session = save_into_database(test_data, "test", "Traffic Count")

# Let's see if the data is saved correctly or not
result = session.execute(text("SELECT * FROM test")).fetchall()

# There should only be 2 rows
assert len(result) == 2

# Check to see if the cell data is accurate
assert result[0].name == "Traffic Count Entity #1"
assert result[0].content == '{"traffic count": 24}'
assert result[1].name == "Traffic Count Entity #2"
assert result[1].content == '{"traffic count": 34}'
session.close()
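One detail worth calling out: the assertions compare content against strings, not dicts, because SQLAlchemy's generic JSON column type stores the document as serialized text in SQLite, so a raw SELECT returns the json.dumps form. A quick check of that assumption:

import json

# The stored text is exactly the json.dumps serialization of the dict.
assert json.dumps({"traffic count": 24}) == '{"traffic count": 24}'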
4 changes: 2 additions & 2 deletions tests/write_service/test_json_processor.py
@@ -16,7 +16,7 @@ def test_send_to_kafka_valid():
"""This test checks if a valid list of dictionary passes the schema, if Kafka was able to be connected to, and if the data was able to be sent to Kafka."""
valid_records = [{"color": "blue"}, {"color": "red"}]

kafka_status = send_data(valid_records)
kafka_status = send_data(valid_records, "test")

# See if matches expected response from JSON processor
assert kafka_status.startswith("Sent data to Kafka successfully!") == True
@@ -25,7 +25,7 @@ def test_send_to_kafka_invalid():
"""This test checks if improperly formatted data is able to be cleaned, pass the schema, and be sent to Kafka."""
invalid_records = [1, 2, 3]

kafka_status = send_data(invalid_records)
kafka_status = send_data(invalid_records, "test")

# See if matches expected response from JSON processor
assert kafka_status.startswith("Sent data to Kafka successfully!") == True