diff --git a/src/write_service/app.py b/src/write_service/app.py
index 4ce6b28..94eb70e 100644
--- a/src/write_service/app.py
+++ b/src/write_service/app.py
@@ -23,7 +23,7 @@ def test_json():
# Grab and parse data from URL, also send to Kafka
testURL = "https://www.stlouis-mo.gov/customcf/endpoints/arpa/expenditures.cfm?format=json"
result = get_json(testURL)
- kafka_status = send_data(result)
+ kafka_status = send_data(result, "arpa")
# Display results to user
formattedResult = json.dumps(result, indent=2)
diff --git a/src/write_service/consumers/json_consumer.py b/src/write_service/consumers/json_consumer.py
new file mode 100644
index 0000000..48da824
--- /dev/null
+++ b/src/write_service/consumers/json_consumer.py
@@ -0,0 +1,148 @@
+from kafka import KafkaConsumer
+from kafka.errors import NoBrokersAvailable
+import json
+import os
+import time
+from sqlalchemy import Column, Integer, DateTime, JSON, String, create_engine
+from sqlalchemy.orm import Session, DeclarativeBase
+from sqlalchemy.exc import SQLAlchemyError
+from datetime import datetime, timezone
+import urllib.parse
+
+# Configuration
+KAFKA_BROKER = os.getenv('KAFKA_BROKER', 'localhost:9092')
+PG_HOST = os.getenv('PG_HOST', 'localhost')
+PG_PORT = os.getenv('PG_PORT', '5432')
+PG_DB = os.getenv('PG_DB', 'stl_data')
+PG_USER = os.getenv('PG_USER', 'postgres')
+PG_PASSWORD = os.getenv('PG_PASSWORD', "Welcome@123456")  # local-dev fallback; set PG_PASSWORD in the environment for real deployments
+
+# SQLAlchemy ORM models must inherit from a shared declarative base class
+class Base(DeclarativeBase):
+    pass
+
+def get_table_class(table_name):
+    """
+    Build the ORM table class for the given name, so each topic
+    (e.g. crime, traffic) gets its own table.
+    """
+    class DataTable(Base):
+        __tablename__ = table_name
+        # Allow this function to be called repeatedly for the same topic
+        # without SQLAlchemy raising "Table ... is already defined"
+        __table_args__ = {"extend_existing": True}
+        id = Column(Integer, primary_key=True, autoincrement=True)
+        name = Column(String)
+        content = Column(JSON, nullable=False)
+        # A callable, so the timestamp is evaluated per insert rather than once at import
+        date_added = Column(DateTime, default=lambda: datetime.now(timezone.utc))
+    return DataTable
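+
+# Example (hypothetical usage): one table per topic
+#   CrimeTable = get_table_class("crime")
+#   Base.metadata.create_all(engine)   # issues CREATE TABLE only if "crime" is missing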
+
+def retrieve_from_kafka(topic_name):
+    """
+    Retrieve the JSON messages currently available on the given Kafka topic.
+    """
+    if topic_name is None:
+        topic_name = "JSON-data"
+
+    received_data = []
+
+    try:
+        # Kafka may not be ready immediately, so allow up to 3 connection attempts
+        for attempt in range(3):
+            try:
+                # Connect to Kafka
+                consumer = KafkaConsumer(
+                    topic_name,
+                    bootstrap_servers=[KAFKA_BROKER, 'kafka:29092'],  # honor the KAFKA_BROKER env var (defaults to localhost:9092)
+                    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
+                    auto_offset_reset='earliest',
+                    consumer_timeout_ms=5000
+                )
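+                # consumer_timeout_ms=5000 makes the iteration below stop after
+                # 5s of silence instead of blocking forever on an idle topic.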
+
+                # Get all messages
+                try:
+                    for message in consumer:
+                        data = message.value
+                        received_data.append(data)
+                        print(f"Received message: {data}")
+                except Exception as e:
+                    print(f"Error while reading messages from Kafka: {e}")
+
+                consumer.close()
+                return received_data
+            except NoBrokersAvailable:
+                # Connection failed, try again
+                print(f"Kafka consumer attempt {attempt+1} failed (NoBrokersAvailable), retrying in 5s...")
+                time.sleep(5)
+        # All attempts failed; return whatever was collected (an empty list)
+        return received_data
+    except Exception as e:
+        print(f"Something went wrong with the Kafka consumer!\n{e}")
+        raise
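+
+# Example (hypothetical): retrieve_from_kafka("crime") returns a list of dicts,
+# one per consumed message, or an empty list if no broker became reachable.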
+
+def save_into_database(data, topic_name, topic_extended_name=None):
+    """
+    Save the data into a PostgreSQL table named after the topic, using SQLAlchemy.
+    Each entity in the data becomes one row.
+    """
+
+    if topic_extended_name is None:
+        topic_extended_name = topic_name
+
+    try:
+        # The data must be a list; abort otherwise
+        if not isinstance(data, list):
+            print("The data to be saved into the database is not valid! It must be a list.")
+            return
+
+        # Connect to the database
+        encoded_password = urllib.parse.quote_plus(PG_PASSWORD)
+        engine_url = f"postgresql+psycopg2://{PG_USER}:{encoded_password}@{PG_HOST}:{PG_PORT}/{PG_DB}"
+        engine = create_engine(engine_url, echo=True)
+
+        # Get and create the table (if it does not already exist)
+        table = get_table_class(topic_name)
+        Base.metadata.create_all(engine)
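+        # (create_all checks the database first and only creates tables that are
+        # missing; echo=True above logs each emitted statement for debugging.)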
+
+        # Now add the data to the table
+        with Session(engine) as session:
+
+            # Each entity will be a new row
+            entity_counter = 1
+
+            for entity in data:
+
+                # Ensure the entity is JSON-serializable, otherwise skip it
+                try:
+                    json.dumps(entity)
+                except (TypeError, ValueError):
+                    print("Entity " + str(entity) + " is not valid JSON. Going to the next entity.")
+                    continue
+
+                new_row = table(
+                    name=topic_extended_name + " Entity #" + str(entity_counter),
+                    content=entity)
+                session.add(new_row)
+                entity_counter += 1
+
+            # Push all changes
+            session.commit()
+
+            # We are done!
+            # Dispose of the engine (or return the live session when the test
+            # suite has swapped in an in-memory SQLite engine)
+            if str(engine.url) != "sqlite:///:memory:":
+                engine.dispose()
+            else:
+                return session
+
+        print("PostgreSQL: OK")
+
+    # Exceptions
+    except SQLAlchemyError as e:
+        # No rollback needed here: the "with Session(...)" block already rolled back and closed the session
+        print("An error occurred when connecting to the database.\n" + str(e))
+
+    except Exception as e:
+        print("An error occurred when saving to the database.\n" + str(e))
+
+# Manual test: pull messages from Kafka and store them in PostgreSQL
+if __name__ == "__main__":
+    test_data = retrieve_from_kafka("arpa")
+    save_into_database(test_data, "arpa2", "ARPA funds usage")
\ No newline at end of file
diff --git a/src/write_service/processing/json_processor.py b/src/write_service/processing/json_processor.py
index e0bc59b..4e3d9d5 100644
--- a/src/write_service/processing/json_processor.py
+++ b/src/write_service/processing/json_processor.py
@@ -38,7 +38,7 @@ def clean_data(raw_data):
return result
-def send_data(raw_data):
+def send_data(raw_data, topic_name):
"""This function checks if the data passes the schema and then sends the data to Kafka."""
# Clean the data first
@@ -68,10 +68,14 @@ def send_data(raw_data):
request_timeout_ms=10000,
reconnect_backoff_ms=1000
)
-            producer.send("JSON-data", data)
-            producer.flush()
-            logging.info("Sent JSON data to Kafka: " + str(data))
-            return "Sent data to Kafka successfully!\n" + "Topic: JSON data\n" + "Data:\n" + str(data)
+
+            # Send each entry in the list separately to Kafka so the message isn't too big
+            for entry in data:
+                producer.send(topic_name, entry)
+                producer.flush()
+                logging.info("Sent JSON data to Kafka: " + str(entry))
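+                # (A flush() per message keeps ordering obvious but is slow; a
+                # single flush after the loop would let the producer batch sends.)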
+
+            return "Sent data to Kafka successfully!\n" + "Topic: " + topic_name + "\n" + "Data:\n" + str(data)
except NoBrokersAvailable:
# Kafka may not be available yet, let's try again
logging.error(f"Kafka producer attempt {attempt+1} failed (NoBrokersAvailable), retrying in 5s...")
diff --git a/tests/write_service/test_json_consumer.py b/tests/write_service/test_json_consumer.py
new file mode 100644
index 0000000..29fcd7a
--- /dev/null
+++ b/tests/write_service/test_json_consumer.py
@@ -0,0 +1,35 @@
+"""
+Tests for the JSON consumer, verifying that data is stored in the database successfully and in the right format.
+"""
+
+from unittest.mock import patch
+from sqlalchemy import create_engine, text
+from src.write_service.consumers.json_consumer import save_into_database
+
+def test_save_to_database():
+    """
+    Ensure that sample data is stored correctly in a test database.
+    """
+
+    test_data = [{"traffic count": 24}, {"traffic count": 34}]
+
+    # Use a temporary in-memory database instead of the real one (it disappears once the engine is gone)
+    engine = create_engine("sqlite:///:memory:")
+
+    # Patch create_engine() inside the consumer module so it returns the temporary engine instead of connecting to PostgreSQL
+    with patch("src.write_service.consumers.json_consumer.create_engine") as mock_create_engine:
+        mock_create_engine.return_value = engine
+        session = save_into_database(test_data, "test", "Traffic Count")
+
+    # Check whether the data was saved correctly
+    result = session.execute(text("SELECT * FROM test")).fetchall()
+
+    # There should only be 2 rows
+    assert len(result) == 2
+
+    # Check that the cell data is accurate (the JSON column is stored as serialized text)
+    assert result[0].name == "Traffic Count Entity #1"
+    assert result[0].content == '{"traffic count": 24}'
+    assert result[1].name == "Traffic Count Entity #2"
+    assert result[1].content == '{"traffic count": 34}'
+    session.close()
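+
+def test_save_to_database_rejects_non_list():
+    """
+    Extra check (a minimal sketch): save_into_database is expected to bail out
+    and return None when the payload is not a list, without touching any database.
+    """
+    assert save_into_database({"not": "a list"}, "test") is None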
\ No newline at end of file
diff --git a/tests/write_service/test_json_processor.py b/tests/write_service/test_json_processor.py
index 7188d57..f582926 100644
--- a/tests/write_service/test_json_processor.py
+++ b/tests/write_service/test_json_processor.py
@@ -16,7 +16,7 @@ def test_send_to_kafka_valid():
"""This test checks if a valid list of dictionary passes the schema, if Kafka was able to be connected to, and if the data was able to be sent to Kafka."""
valid_records = [{"color": "blue"}, {"color": "red"}]
- kafka_status = send_data(valid_records)
+ kafka_status = send_data(valid_records, "test")
# See if matches expected response from JSON processor
assert kafka_status.startswith("Sent data to Kafka successfully!") == True
@@ -25,7 +25,7 @@ def test_send_to_kafka_invalid():
"""This test checks if improperly formatted data is able to be cleaned, pass the schema, and be sent to Kafka."""
invalid_records = [1, 2, 3]
- kafka_status = send_data(invalid_records)
+ kafka_status = send_data(invalid_records, "test")
# See if matches expected response from JSON processor
assert kafka_status.startswith("Sent data to Kafka successfully!") == True
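+
+def test_send_to_kafka_topic_echoed():
+    """A small added sketch: the success message should name the topic that was passed in."""
+    records = [{"color": "green"}]
+    kafka_status = send_data(records, "test")
+    assert "Topic: test" in kafka_status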
\ No newline at end of file