diff --git a/src/write_service/app.py b/src/write_service/app.py
index 4ce6b28..94eb70e 100644
--- a/src/write_service/app.py
+++ b/src/write_service/app.py
@@ -23,7 +23,7 @@ def test_json():
     # Grab and parse data from URL, also send to Kafka
     testURL = "https://www.stlouis-mo.gov/customcf/endpoints/arpa/expenditures.cfm?format=json"
     result = get_json(testURL)
-    kafka_status = send_data(result)
+    kafka_status = send_data(result, "arpa")
 
     # Display results to user
     formattedResult = json.dumps(result, indent=2)
diff --git a/src/write_service/consumers/json_consumer.py b/src/write_service/consumers/json_consumer.py
new file mode 100644
index 0000000..48da824
--- /dev/null
+++ b/src/write_service/consumers/json_consumer.py
@@ -0,0 +1,151 @@
+from kafka import KafkaConsumer
+from kafka.errors import NoBrokersAvailable
+import json
+import os
+import time
+from sqlalchemy import Column, Integer, DateTime, JSON, String, create_engine
+from sqlalchemy.orm import Session, DeclarativeBase
+from sqlalchemy.exc import SQLAlchemyError
+from datetime import datetime, timezone
+import urllib.parse
+
+# Configuration
+KAFKA_BROKER = os.getenv('KAFKA_BROKER', 'localhost:9092')
+PG_HOST = os.getenv('PG_HOST', 'localhost')
+PG_PORT = os.getenv('PG_PORT', '5432')
+PG_DB = os.getenv('PG_DB', 'stl_data')
+PG_USER = os.getenv('PG_USER', 'postgres')
+PG_PASSWORD = os.getenv('PG_PASSWORD', "Welcome@123456")  # update with pg password if needed
+
+# SQLAlchemy models need a shared declarative base class
+class Base(DeclarativeBase):
+    pass
+
+def get_table_class(table_name):
+    """
+    Create (or reuse) a mapped table class so the table can be named
+    after the topic (e.g. crime, traffic, etc.).
+    """
+    class DataTable(Base):
+        __tablename__ = table_name
+        # extend_existing lets repeated calls with the same topic reuse the table definition
+        __table_args__ = {"extend_existing": True}
+        id = Column(Integer, primary_key=True, autoincrement=True)
+        name = Column(String)
+        content = Column(JSON, nullable=False)
+        # default must be a callable so the timestamp is taken per row, not once at import
+        date_added = Column(DateTime, default=lambda: datetime.now(timezone.utc))
+    return DataTable
+
+def retrieve_from_kafka(topic_name):
+    """
+    Retrieve the JSON messages from the given Kafka topic.
+    """
+    if topic_name is None:
+        topic_name = "JSON-data"
+
+    received_data = []
+
+    try:
+        # Kafka can be slow to come up, so give it 3 tries
+        for attempt in range(3):
+            try:
+                # Connect to Kafka
+                consumer = KafkaConsumer(
+                    topic_name,
+                    bootstrap_servers=[KAFKA_BROKER, 'kafka:29092'],
+                    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
+                    auto_offset_reset='earliest',
+                    consumer_timeout_ms=5000
+                )
+
+                # Get all messages
+                try:
+                    for message in consumer:
+                        data = message.value
+                        received_data.append(data)
+                        print(f"Received message: {data}")
+                except Exception as e:
+                    print(f"Kafka read error: {e}")
+
+                consumer.close()
+                return received_data
+            except NoBrokersAvailable:
+                # Connection failed, try again
+                print(f"Kafka consumer attempt {attempt+1} failed (NoBrokersAvailable), retrying in 5s...")
+                time.sleep(5)
+        # All attempts failed; return whatever was collected (likely nothing)
+        return received_data
+    except Exception as e:
+        print(f"Something went wrong with Kafka Consumer!: \n {e}")
+        raise
+
+def save_into_database(data, topic_name, topic_extended_name=None):
+    """
+    Save the data into a PostgreSQL table named after the topic, using SQLAlchemy.
+    Each entity of the data becomes a row.
+    """
+
+    if topic_extended_name is None:
+        topic_extended_name = topic_name
+
+    try:
+        # Ensure data is in the right format (list), else abort
+        if not isinstance(data, list):
+            print("The data to be saved into the database is not valid!\nIt must be a list.")
+            return
+
+        # Connect to database
+        encoded_password = urllib.parse.quote_plus(PG_PASSWORD)
+        engine_url = f"postgresql+psycopg2://{PG_USER}:{encoded_password}@{PG_HOST}:{PG_PORT}/{PG_DB}"
+        engine = create_engine(engine_url, echo=True)
+
+        # Get and create the table (if it did not already exist)
+        table = get_table_class(topic_name)
+        Base.metadata.create_all(engine)
+
+        # Now add the rows
+        with Session(engine) as session:
+
+            # Each entity will be a new row
+            entity_counter = 1
+
+            for entity in data:
+
+                # Ensure the entity is JSON-serializable, otherwise skip it
+                try:
+                    json.dumps(entity)
+                except (TypeError, ValueError):
+                    print("Entity " + str(entity) + " is not valid JSON. Going to the next entity.")
+                    continue
+
+                new_row = table(
+                    name=topic_extended_name + " Entity #" + str(entity_counter),
+                    content=entity)
+                session.add(new_row)
+                entity_counter += 1
+
+            # Push all changes
+            session.commit()
+
+        # We are done!
+        # Close the engine (or return the session if in test mode)
+        if str(engine.url) != "sqlite:///:memory:":
+            engine.dispose()
+        else:
+            return session
+
+        print("PostgreSQL: OK")
+
+    # Exceptions
+    except SQLAlchemyError as e:
+        # the Session context manager already rolls back uncommitted work on exit
+        print("An error occurred when connecting to the database. \n " + str(e))
+
+    except Exception as e:
+        print("An error occurred when saving to the database. \n " + str(e))
+
+# Test function
+if __name__ == "__main__":
+    test_data = retrieve_from_kafka("arpa")
+    save_into_database(test_data, "arpa2", "ARPA funds usage")
\ No newline at end of file
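
Reviewer note (not part of the patch): a minimal sketch of how the dynamic table
class in json_consumer.py is meant to be used, against a throwaway SQLite engine;
"crime" is a hypothetical topic name.

    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from src.write_service.consumers.json_consumer import Base, get_table_class

    # Any topic string becomes the table name
    CrimeTable = get_table_class("crime")

    engine = create_engine("sqlite:///:memory:")  # throwaway DB for the sketch
    Base.metadata.create_all(engine)              # creates the "crime" table

    with Session(engine) as session:
        session.add(CrimeTable(name="crime Entity #1", content={"offense": "theft"}))
        session.commit()
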
" + "Topic: JSON data
" + "Data:
" + str(data) + + # Send each entry in the list separately to Kafka so the message isn't too big + for entry in data: + producer.send(topic_name, entry) + producer.flush() + logging.info("Sent JSON data to Kafka: " + str(entry)) + + return "Sent data to Kafka successfully!
" + "Topic: " + topic_name + "
" + "Data:
" + str(data) except NoBrokersAvailable: # Kafka may not be available yet, let's try again logging.error(f"Kafka producer attempt {attempt+1} failed (NoBrokersAvailable), retrying in 5s...") diff --git a/tests/write_service/test_json_consumer.py b/tests/write_service/test_json_consumer.py new file mode 100644 index 0000000..29fcd7a --- /dev/null +++ b/tests/write_service/test_json_consumer.py @@ -0,0 +1,35 @@ +""" +Tests for the JSON consumer and ensure data is stored in a database successfully and in the right format. +""" + +from unittest.mock import patch +from sqlalchemy import create_engine, text +from src.write_service.consumers.json_consumer import save_into_database + +def test_save_to_database(): + """ + This is a function that ensures that sample data is stored correctly in a test database. + """ + + test_data = [{"traffic count": 24}, {"traffic count": 34}] + + # We don't want to use the real database, so we must use a temporary one in memory (deletes when connection closed) + engine = create_engine("sqlite:///:memory:") + + # We will use patch to replace the create_engine() function so it uses the temporary engine not the real one + with patch("src.write_service.consumers.json_consumer.create_engine") as original_engine: + original_engine.return_value = engine + session = save_into_database(test_data, "test", "Traffic Count") + + # Let's see if the data is saved correctly or not + result = session.execute(text("SELECT * FROM test")).fetchall() + + # There should only be 2 rows + assert len(result) == 2 + + # Check to see if the cell data is accurate + assert result[0].name == "Traffic Count Entity #1" + assert result[0].content == '{"traffic count": 24}' + assert result[1].name == "Traffic Count Entity #2" + assert result[1].content == '{"traffic count": 34}' + session.close() \ No newline at end of file diff --git a/tests/write_service/test_json_processor.py b/tests/write_service/test_json_processor.py index 7188d57..f582926 100644 --- a/tests/write_service/test_json_processor.py +++ b/tests/write_service/test_json_processor.py @@ -16,7 +16,7 @@ def test_send_to_kafka_valid(): """This test checks if a valid list of dictionary passes the schema, if Kafka was able to be connected to, and if the data was able to be sent to Kafka.""" valid_records = [{"color": "blue"}, {"color": "red"}] - kafka_status = send_data(valid_records) + kafka_status = send_data(valid_records, "test") # See if matches expected response from JSON processor assert kafka_status.startswith("Sent data to Kafka successfully!") == True @@ -25,7 +25,7 @@ def test_send_to_kafka_invalid(): """This test checks if improperly formatted data is able to be cleaned, pass the schema, and be sent to Kafka.""" invalid_records = [1, 2, 3] - kafka_status = send_data(invalid_records) + kafka_status = send_data(invalid_records, "test") # See if matches expected response from JSON processor assert kafka_status.startswith("Sent data to Kafka successfully!") == True \ No newline at end of file