From a551d49346e4d6dab4d4235e775be3164082582c Mon Sep 17 00:00:00 2001 From: Ajay verma Date: Sat, 16 Apr 2022 19:40:56 +0530 Subject: [PATCH 1/4] initial improvement commit --- src/main/java/com/uditagarwal/Main.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/uditagarwal/Main.java b/src/main/java/com/uditagarwal/Main.java index b528d04..4cec004 100644 --- a/src/main/java/com/uditagarwal/Main.java +++ b/src/main/java/com/uditagarwal/Main.java @@ -13,7 +13,7 @@ public static void main(String[] args) throws InterruptedException { final SleepingSubscriber sub2 = new SleepingSubscriber("sub2", 10000); queue.subscribe(sub1, topic1); queue.subscribe(sub2, topic1); - +// final SleepingSubscriber sub3 = new SleepingSubscriber("sub3", 5000); queue.subscribe(sub3, topic2); From 6284eff70aaf2595e70ede5dae44474962983a0f Mon Sep 17 00:00:00 2001 From: Ajay verma Date: Sun, 17 Apr 2022 03:49:53 +0530 Subject: [PATCH 2/4] refactored --- pom.xml | 15 +++++ ...ngSubscriber.java => DummySubscriber.java} | 10 ++-- src/main/java/com/uditagarwal/Main.java | 42 ++++++------- .../pub_sub_queue/InMemoryQueue.java | 60 +++++++++++++++++++ .../factories/SubscriberFactory.java | 11 ++++ .../pub_sub_queue/factories/TopicFactory.java | 15 +++++ ...{TopicHandler.java => TopicProcessor.java} | 10 ++-- .../pub_sub_queue/model/Topic.java | 8 +-- .../public_interface/IQueue.java | 15 +++++ .../pub_sub_queue/public_interface/Queue.java | 49 --------------- 10 files changed, 153 insertions(+), 82 deletions(-) rename src/main/java/com/uditagarwal/{SleepingSubscriber.java => DummySubscriber.java} (60%) create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java rename src/main/java/com/uditagarwal/pub_sub_queue/handler/{TopicHandler.java => TopicProcessor.java} (80%) create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/public_interface/IQueue.java delete mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java diff --git a/pom.xml b/pom.xml index 6d25477..70c3ca1 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,21 @@ 1.18.16 provided + + org.apache.logging.log4j + log4j-api + 2.7 + + + org.apache.logging.log4j + log4j-core + 2.7 + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.7 + diff --git a/src/main/java/com/uditagarwal/SleepingSubscriber.java b/src/main/java/com/uditagarwal/DummySubscriber.java similarity index 60% rename from src/main/java/com/uditagarwal/SleepingSubscriber.java rename to src/main/java/com/uditagarwal/DummySubscriber.java index 6ec6fc0..a1ce9ed 100644 --- a/src/main/java/com/uditagarwal/SleepingSubscriber.java +++ b/src/main/java/com/uditagarwal/DummySubscriber.java @@ -2,12 +2,14 @@ import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; import com.uditagarwal.pub_sub_queue.model.Message; +import lombok.extern.slf4j.Slf4j; -public class SleepingSubscriber implements ISubscriber { +@Slf4j +public class DummySubscriber implements ISubscriber { private final String id; private final int sleepTimeInMillis; - public SleepingSubscriber(String id, int sleepTimeInMillis) { + public DummySubscriber(String id, int sleepTimeInMillis) { this.id = id; this.sleepTimeInMillis = sleepTimeInMillis; } @@ -19,8 +21,8 @@ public String getId() { @Override public void consume(Message message) throws InterruptedException { - System.out.println("Subscriber: " + id + " started consuming: " + message.getMsg()); + log.info("Subscriber: " + id + " started consuming: " + message.getMsg()); Thread.sleep(sleepTimeInMillis); - System.out.println("Subscriber: " + id + " done consuming: " + message.getMsg()); + log.info("Subscriber: " + id + " done consuming: " + message.getMsg()); } } diff --git a/src/main/java/com/uditagarwal/Main.java b/src/main/java/com/uditagarwal/Main.java index 4cec004..8c3c26f 100644 --- a/src/main/java/com/uditagarwal/Main.java +++ b/src/main/java/com/uditagarwal/Main.java @@ -1,31 +1,33 @@ package com.uditagarwal; -import com.uditagarwal.pub_sub_queue.public_interface.Queue; +import com.uditagarwal.pub_sub_queue.InMemoryQueue; import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.Topic; +import com.uditagarwal.pub_sub_queue.public_interface.IQueue; public class Main { + public static void main(String[] args) throws InterruptedException { - final Queue queue = new Queue(); - final Topic topic1 = queue.createTopic("t1"); - final Topic topic2 = queue.createTopic("t2"); - final SleepingSubscriber sub1 = new SleepingSubscriber("sub1", 10000); - final SleepingSubscriber sub2 = new SleepingSubscriber("sub2", 10000); - queue.subscribe(sub1, topic1); - queue.subscribe(sub2, topic1); -// - final SleepingSubscriber sub3 = new SleepingSubscriber("sub3", 5000); - queue.subscribe(sub3, topic2); - - queue.publish(topic1, new Message("m1")); - queue.publish(topic1, new Message("m2")); - - queue.publish(topic2, new Message("m3")); + final IQueue queue = new InMemoryQueue(); + final String topicName1 = "t1", topicName2 = "t2"; + + queue.createTopic(topicName1); + queue.createTopic(topicName2); + final DummySubscriber sub1 = new DummySubscriber("sub1", 10000); + final DummySubscriber sub2 = new DummySubscriber("sub2", 10000); + queue.addSubscriber(sub1, topicName1); + queue.addSubscriber(sub2, topicName1); + final DummySubscriber sub3 = new DummySubscriber("sub3", 5000); + queue.addSubscriber(sub3, topicName2); + + queue.publishMessage(topicName1, new Message("m1")); + queue.publishMessage(topicName1, new Message("m2")); + + queue.publishMessage(topicName2, new Message("m3")); Thread.sleep(15000); - queue.publish(topic2, new Message("m4")); - queue.publish(topic1, new Message("m5")); + queue.publishMessage(topicName2, new Message("m4")); + queue.publishMessage(topicName1, new Message("m5")); - queue.resetOffset(topic1, sub1, 0); + queue.resetOffset(topicName1, sub1, 0); } } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java b/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java new file mode 100644 index 0000000..5eee4ea --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java @@ -0,0 +1,60 @@ +package com.uditagarwal.pub_sub_queue; + +import com.uditagarwal.pub_sub_queue.factories.SubscriberFactory; +import com.uditagarwal.pub_sub_queue.factories.TopicFactory; +import com.uditagarwal.pub_sub_queue.handler.TopicProcessor; +import com.uditagarwal.pub_sub_queue.model.Message; +import com.uditagarwal.pub_sub_queue.model.Topic; +import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; +import com.uditagarwal.pub_sub_queue.public_interface.IQueue; +import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Slf4j +public class InMemoryQueue implements IQueue { + + private final Map topicNameToProcessorMap; + + public InMemoryQueue() { + this.topicNameToProcessorMap = new HashMap<>(); + } + + public void createTopic(@NonNull final String topicName) { + final Topic topic = TopicFactory.getNewTopic(topicName); + final TopicProcessor topicProcessor = TopicFactory.getNewTopicProcessor(topic); + topicNameToProcessorMap.put(topic.getTopicName(), topicProcessor); + log.info("created topic with name: {}", topicName); + } + + public void addSubscriber(@NonNull final ISubscriber subscriber, @NonNull final String topicName) { + getTopic(topicName).addSubscriber(SubscriberFactory.getTopicSubscriber(subscriber)); + log.info(subscriber.getId() + " subscribed to topic: " + topicName); + } + + private Topic getTopic(String topicName) { + return topicNameToProcessorMap.get(topicName).getTopic(); + } + + public void publishMessage(@NonNull final String topicName, @NonNull final Message message) { + getTopic(topicName).addMessage(message); + log.info(message.getMsg() + " published to topic: " + topicName); + new Thread(() -> topicNameToProcessorMap.get(topicName).publish()).start(); + } + + public void resetOffset(@NonNull final String topicName, @NonNull final ISubscriber subscriber, @NonNull final Integer newOffset) { + List topicSubscriberList = getTopic(topicName).getSubscribers(); + for (TopicSubscriber topicSubscriber : topicSubscriberList) { + if (topicSubscriber.getSubscriber().equals(subscriber)) { + topicSubscriber.getOffset().set(newOffset); + log.info(topicSubscriber.getSubscriber().getId() + " offset reset to: " + newOffset); + new Thread(() -> topicNameToProcessorMap.get(topicName).startSubscriberWorker(topicSubscriber)).start(); + break; + } + } + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java new file mode 100644 index 0000000..2ed758f --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java @@ -0,0 +1,11 @@ +package com.uditagarwal.pub_sub_queue.factories; + +import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; +import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; + +public class SubscriberFactory { + + public static TopicSubscriber getTopicSubscriber(ISubscriber subscriber) { + return new TopicSubscriber(subscriber); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java new file mode 100644 index 0000000..f964a99 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java @@ -0,0 +1,15 @@ +package com.uditagarwal.pub_sub_queue.factories; + +import com.uditagarwal.pub_sub_queue.handler.TopicProcessor; +import com.uditagarwal.pub_sub_queue.model.Topic; + +import java.util.UUID; + +public class TopicFactory { + public static Topic getNewTopic(String topicName) { + return new Topic(topicName, UUID.randomUUID().toString()); + } + public static TopicProcessor getNewTopicProcessor(final Topic topic) { + return new TopicProcessor(topic); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicHandler.java b/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java similarity index 80% rename from src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicHandler.java rename to src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java index 66bd337..cd44184 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicHandler.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java @@ -2,27 +2,29 @@ import com.uditagarwal.pub_sub_queue.model.Topic; import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; +import lombok.Getter; import lombok.NonNull; import java.util.HashMap; import java.util.Map; -public class TopicHandler { +@Getter +public class TopicProcessor { private final Topic topic; private final Map subscriberWorkers; - public TopicHandler(@NonNull final Topic topic) { + public TopicProcessor(@NonNull final Topic topic) { this.topic = topic; subscriberWorkers = new HashMap<>(); } public void publish() { for (TopicSubscriber topicSubscriber:topic.getSubscribers()) { - startSubsriberWorker(topicSubscriber); + startSubscriberWorker(topicSubscriber); } } - public void startSubsriberWorker(@NonNull final TopicSubscriber topicSubscriber) { + public void startSubscriberWorker(@NonNull final TopicSubscriber topicSubscriber) { final String subscriberId = topicSubscriber.getSubscriber().getId(); if (!subscriberWorkers.containsKey(subscriberId)) { final SubscriberWorker subscriberWorker = new SubscriberWorker(topic, topicSubscriber); diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java b/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java index 94a8cb5..a43ceef 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java @@ -8,14 +8,12 @@ @Getter public class Topic { - private final String topicName; - private final String topicId; - private final List messages; // TODO: Change getter this to send only immutable list outside. - private final List subscribers; // TODO: Change getter this to send only immutable list outside. + private final String topicName; // treating topic name as identifier + private final List messages; + private final List subscribers; public Topic(@NonNull final String topicName, @NonNull final String topicId) { this.topicName = topicName; - this.topicId = topicId; this.messages = new ArrayList<>(); this.subscribers = new ArrayList<>(); } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/IQueue.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/IQueue.java new file mode 100644 index 0000000..7875cdf --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/IQueue.java @@ -0,0 +1,15 @@ +package com.uditagarwal.pub_sub_queue.public_interface; + +import com.uditagarwal.pub_sub_queue.model.Message; +import com.uditagarwal.pub_sub_queue.model.Topic; + +public interface IQueue { + + void createTopic(String topicName); + + void addSubscriber(ISubscriber subscriber, String topicName); + + void publishMessage(String topicName, Message message); + + void resetOffset(String topicName, ISubscriber subscriber, Integer newOffset); +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java deleted file mode 100644 index 535138a..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.uditagarwal.pub_sub_queue.public_interface; - -import com.uditagarwal.pub_sub_queue.handler.TopicHandler; -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.Topic; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; -import lombok.NonNull; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -public class Queue { - private final Map topicProcessors; - - public Queue() { - this.topicProcessors = new HashMap<>(); - } - - public Topic createTopic(@NonNull final String topicName) { - final Topic topic = new Topic(topicName, UUID.randomUUID().toString()); - TopicHandler topicHandler = new TopicHandler(topic); - topicProcessors.put(topic.getTopicId(), topicHandler); - System.out.println("Created topic: " + topic.getTopicName()); - return topic; - } - - public void subscribe(@NonNull final ISubscriber subscriber, @NonNull final Topic topic) { - topic.addSubscriber(new TopicSubscriber(subscriber)); - System.out.println(subscriber.getId() + " subscribed to topic: " + topic.getTopicName()); - } - - public void publish(@NonNull final Topic topic, @NonNull final Message message) { - topic.addMessage(message); - System.out.println(message.getMsg() + " published to topic: " + topic.getTopicName()); - new Thread(() -> topicProcessors.get(topic.getTopicId()).publish()).start(); - } - - public void resetOffset(@NonNull final Topic topic, @NonNull final ISubscriber subscriber, @NonNull final Integer newOffset) { - for (TopicSubscriber topicSubscriber : topic.getSubscribers()) { - if (topicSubscriber.getSubscriber().equals(subscriber)) { - topicSubscriber.getOffset().set(newOffset); - System.out.println(topicSubscriber.getSubscriber().getId() + " offset reset to: " + newOffset); - new Thread(() -> topicProcessors.get(topic.getTopicId()).startSubsriberWorker(topicSubscriber)).start(); - break; - } - } - } -} From f57dfc175dc3c882be936542b0f1c06da42f3963 Mon Sep 17 00:00:00 2001 From: Ajay verma Date: Sun, 17 Apr 2022 04:15:54 +0530 Subject: [PATCH 3/4] refactored --- application-20220417.log | 29 +++++++++++++++++++ .../{Main.java => Application.java} | 9 ++++-- .../java/com/uditagarwal/DummySubscriber.java | 4 +-- .../pub_sub_queue/InMemoryQueue.java | 15 +++++----- .../factories/SubscriberFactory.java | 4 +-- .../pub_sub_queue/factories/TopicFactory.java | 5 ++-- .../handler/SubscriberWorker.java | 3 +- .../pub_sub_queue/handler/TopicProcessor.java | 3 +- .../model/{Topic.java => InMemoryTopic.java} | 5 ++-- .../pub_sub_queue/model/Message.java | 1 - .../pub_sub_queue/model/TopicSubscriber.java | 6 ++-- .../public_interface/IQueue.java | 15 ---------- .../pub_sub_queue/public_interface/Queue.java | 10 +++++++ .../{ISubscriber.java => Subscriber.java} | 3 +- .../pub_sub_queue/public_interface/Topic.java | 13 +++++++++ src/main/resources/log4j2.xml | 17 +++++++++++ 16 files changed, 101 insertions(+), 41 deletions(-) create mode 100644 application-20220417.log rename src/main/java/com/uditagarwal/{Main.java => Application.java} (83%) rename src/main/java/com/uditagarwal/pub_sub_queue/model/{Topic.java => InMemoryTopic.java} (80%) delete mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/public_interface/IQueue.java create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java rename src/main/java/com/uditagarwal/pub_sub_queue/public_interface/{ISubscriber.java => Subscriber.java} (85%) create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Topic.java create mode 100644 src/main/resources/log4j2.xml diff --git a/application-20220417.log b/application-20220417.log new file mode 100644 index 0000000..6f3845d --- /dev/null +++ b/application-20220417.log @@ -0,0 +1,29 @@ +2022-04-17 04:11:38.301 [main] INFO com.uditagarwal.Application - starting application +2022-04-17 04:11:38.309 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - created topic with name: t1 +2022-04-17 04:11:38.309 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - created topic with name: t2 +2022-04-17 04:11:38.310 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - sub1 subscribed to topic: t1 +2022-04-17 04:11:38.310 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - sub2 subscribed to topic: t1 +2022-04-17 04:11:38.310 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - sub3 subscribed to topic: t2 +2022-04-17 04:11:38.311 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m1 published to topic: t1 +2022-04-17 04:11:38.394 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m2 published to topic: t1 +2022-04-17 04:11:38.395 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m3 published to topic: t2 +2022-04-17 04:11:38.419 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 started consuming: m1 +2022-04-17 04:11:38.419 [Thread-4] INFO com.uditagarwal.DummySubscriber - Subscriber: sub3 started consuming: m3 +2022-04-17 04:11:38.419 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 started consuming: m1 +2022-04-17 04:11:43.426 [Thread-4] INFO com.uditagarwal.DummySubscriber - Subscriber: sub3 done consuming: m3 +2022-04-17 04:11:48.425 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 done consuming: m1 +2022-04-17 04:11:48.425 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 done consuming: m1 +2022-04-17 04:11:48.428 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 started consuming: m2 +2022-04-17 04:11:48.428 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 started consuming: m2 +2022-04-17 04:11:53.417 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m4 published to topic: t2 +2022-04-17 04:11:53.421 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m5 published to topic: t1 +2022-04-17 04:11:53.422 [Thread-4] INFO com.uditagarwal.DummySubscriber - Subscriber: sub3 started consuming: m4 +2022-04-17 04:11:53.423 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - sub1 offset reset to: 0 +2022-04-17 04:11:58.428 [Thread-4] INFO com.uditagarwal.DummySubscriber - Subscriber: sub3 done consuming: m4 +2022-04-17 04:11:58.428 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 done consuming: m2 +2022-04-17 04:11:58.428 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 done consuming: m2 +2022-04-17 04:11:58.428 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 started consuming: m1 +2022-04-17 04:11:58.428 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 started consuming: m5 +2022-04-17 04:12:08.434 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 done consuming: m5 +2022-04-17 04:12:08.434 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 done consuming: m1 +2022-04-17 04:12:08.435 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 started consuming: m2 diff --git a/src/main/java/com/uditagarwal/Main.java b/src/main/java/com/uditagarwal/Application.java similarity index 83% rename from src/main/java/com/uditagarwal/Main.java rename to src/main/java/com/uditagarwal/Application.java index 8c3c26f..8672f80 100644 --- a/src/main/java/com/uditagarwal/Main.java +++ b/src/main/java/com/uditagarwal/Application.java @@ -2,12 +2,15 @@ import com.uditagarwal.pub_sub_queue.InMemoryQueue; import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.public_interface.IQueue; +import com.uditagarwal.pub_sub_queue.public_interface.Queue; +import lombok.extern.slf4j.Slf4j; -public class Main { +@Slf4j +public class Application { public static void main(String[] args) throws InterruptedException { - final IQueue queue = new InMemoryQueue(); + log.info("starting application"); + final Queue queue = new InMemoryQueue(); final String topicName1 = "t1", topicName2 = "t2"; queue.createTopic(topicName1); diff --git a/src/main/java/com/uditagarwal/DummySubscriber.java b/src/main/java/com/uditagarwal/DummySubscriber.java index a1ce9ed..26480fb 100644 --- a/src/main/java/com/uditagarwal/DummySubscriber.java +++ b/src/main/java/com/uditagarwal/DummySubscriber.java @@ -1,11 +1,11 @@ package com.uditagarwal; -import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; +import com.uditagarwal.pub_sub_queue.public_interface.Subscriber; import com.uditagarwal.pub_sub_queue.model.Message; import lombok.extern.slf4j.Slf4j; @Slf4j -public class DummySubscriber implements ISubscriber { +public class DummySubscriber implements Subscriber { private final String id; private final int sleepTimeInMillis; diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java b/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java index 5eee4ea..95a25f2 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java @@ -4,10 +4,11 @@ import com.uditagarwal.pub_sub_queue.factories.TopicFactory; import com.uditagarwal.pub_sub_queue.handler.TopicProcessor; import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.Topic; +import com.uditagarwal.pub_sub_queue.model.InMemoryTopic; import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; -import com.uditagarwal.pub_sub_queue.public_interface.IQueue; -import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; +import com.uditagarwal.pub_sub_queue.public_interface.Queue; +import com.uditagarwal.pub_sub_queue.public_interface.Subscriber; +import com.uditagarwal.pub_sub_queue.public_interface.Topic; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -16,7 +17,7 @@ import java.util.Map; @Slf4j -public class InMemoryQueue implements IQueue { +public class InMemoryQueue implements Queue { private final Map topicNameToProcessorMap; @@ -27,11 +28,11 @@ public InMemoryQueue() { public void createTopic(@NonNull final String topicName) { final Topic topic = TopicFactory.getNewTopic(topicName); final TopicProcessor topicProcessor = TopicFactory.getNewTopicProcessor(topic); - topicNameToProcessorMap.put(topic.getTopicName(), topicProcessor); + topicNameToProcessorMap.put(topicName, topicProcessor); log.info("created topic with name: {}", topicName); } - public void addSubscriber(@NonNull final ISubscriber subscriber, @NonNull final String topicName) { + public void addSubscriber(@NonNull final Subscriber subscriber, @NonNull final String topicName) { getTopic(topicName).addSubscriber(SubscriberFactory.getTopicSubscriber(subscriber)); log.info(subscriber.getId() + " subscribed to topic: " + topicName); } @@ -46,7 +47,7 @@ public void publishMessage(@NonNull final String topicName, @NonNull final Messa new Thread(() -> topicNameToProcessorMap.get(topicName).publish()).start(); } - public void resetOffset(@NonNull final String topicName, @NonNull final ISubscriber subscriber, @NonNull final Integer newOffset) { + public void resetOffset(@NonNull final String topicName, @NonNull final Subscriber subscriber, @NonNull final Integer newOffset) { List topicSubscriberList = getTopic(topicName).getSubscribers(); for (TopicSubscriber topicSubscriber : topicSubscriberList) { if (topicSubscriber.getSubscriber().equals(subscriber)) { diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java index 2ed758f..da81ff3 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java @@ -1,11 +1,11 @@ package com.uditagarwal.pub_sub_queue.factories; import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; -import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; +import com.uditagarwal.pub_sub_queue.public_interface.Subscriber; public class SubscriberFactory { - public static TopicSubscriber getTopicSubscriber(ISubscriber subscriber) { + public static TopicSubscriber getTopicSubscriber(Subscriber subscriber) { return new TopicSubscriber(subscriber); } } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java index f964a99..a535497 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java @@ -1,13 +1,14 @@ package com.uditagarwal.pub_sub_queue.factories; import com.uditagarwal.pub_sub_queue.handler.TopicProcessor; -import com.uditagarwal.pub_sub_queue.model.Topic; +import com.uditagarwal.pub_sub_queue.model.InMemoryTopic; +import com.uditagarwal.pub_sub_queue.public_interface.Topic; import java.util.UUID; public class TopicFactory { public static Topic getNewTopic(String topicName) { - return new Topic(topicName, UUID.randomUUID().toString()); + return new InMemoryTopic(topicName); } public static TopicProcessor getNewTopicProcessor(final Topic topic) { return new TopicProcessor(topic); diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java b/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java index d43ceee..0025b7b 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java @@ -1,8 +1,9 @@ package com.uditagarwal.pub_sub_queue.handler; import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.Topic; +import com.uditagarwal.pub_sub_queue.model.InMemoryTopic; import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; +import com.uditagarwal.pub_sub_queue.public_interface.Topic; import lombok.Getter; import lombok.NonNull; import lombok.SneakyThrows; diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java b/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java index cd44184..d8645bd 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java @@ -1,7 +1,8 @@ package com.uditagarwal.pub_sub_queue.handler; -import com.uditagarwal.pub_sub_queue.model.Topic; +import com.uditagarwal.pub_sub_queue.model.InMemoryTopic; import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; +import com.uditagarwal.pub_sub_queue.public_interface.Topic; import lombok.Getter; import lombok.NonNull; diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java b/src/main/java/com/uditagarwal/pub_sub_queue/model/InMemoryTopic.java similarity index 80% rename from src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java rename to src/main/java/com/uditagarwal/pub_sub_queue/model/InMemoryTopic.java index a43ceef..d9e17f9 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/Topic.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/model/InMemoryTopic.java @@ -1,5 +1,6 @@ package com.uditagarwal.pub_sub_queue.model; +import com.uditagarwal.pub_sub_queue.public_interface.Topic; import lombok.Getter; import lombok.NonNull; @@ -7,12 +8,12 @@ import java.util.List; @Getter -public class Topic { +public class InMemoryTopic implements Topic { private final String topicName; // treating topic name as identifier private final List messages; private final List subscribers; - public Topic(@NonNull final String topicName, @NonNull final String topicId) { + public InMemoryTopic(@NonNull final String topicName) { this.topicName = topicName; this.messages = new ArrayList<>(); this.subscribers = new ArrayList<>(); diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java b/src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java index 946e339..46e8cab 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java @@ -7,5 +7,4 @@ @Getter public class Message { private final String msg; - } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java index 64d5e71..84c579b 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java @@ -1,6 +1,6 @@ package com.uditagarwal.pub_sub_queue.model; -import com.uditagarwal.pub_sub_queue.public_interface.ISubscriber; +import com.uditagarwal.pub_sub_queue.public_interface.Subscriber; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; @@ -11,9 +11,9 @@ @AllArgsConstructor public class TopicSubscriber { private final AtomicInteger offset; - private final ISubscriber subscriber; + private final Subscriber subscriber; - public TopicSubscriber(@NonNull final ISubscriber subscriber) { + public TopicSubscriber(@NonNull final Subscriber subscriber) { this.subscriber = subscriber; this.offset = new AtomicInteger(0); } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/IQueue.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/IQueue.java deleted file mode 100644 index 7875cdf..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/IQueue.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.uditagarwal.pub_sub_queue.public_interface; - -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.Topic; - -public interface IQueue { - - void createTopic(String topicName); - - void addSubscriber(ISubscriber subscriber, String topicName); - - void publishMessage(String topicName, Message message); - - void resetOffset(String topicName, ISubscriber subscriber, Integer newOffset); -} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java new file mode 100644 index 0000000..79e3395 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java @@ -0,0 +1,10 @@ +package com.uditagarwal.pub_sub_queue.public_interface; + +import com.uditagarwal.pub_sub_queue.model.Message; + +public interface Queue { + void createTopic(String topicName); + void addSubscriber(Subscriber subscriber, String topicName); + void publishMessage(String topicName, Message message); + void resetOffset(String topicName, Subscriber subscriber, Integer newOffset); +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/ISubscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Subscriber.java similarity index 85% rename from src/main/java/com/uditagarwal/pub_sub_queue/public_interface/ISubscriber.java rename to src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Subscriber.java index 8e02fa7..639ad78 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/ISubscriber.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Subscriber.java @@ -2,8 +2,7 @@ import com.uditagarwal.pub_sub_queue.model.Message; -public interface ISubscriber { - +public interface Subscriber { String getId(); void consume(Message message) throws InterruptedException; } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Topic.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Topic.java new file mode 100644 index 0000000..8c5660a --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Topic.java @@ -0,0 +1,13 @@ +package com.uditagarwal.pub_sub_queue.public_interface; + +import com.uditagarwal.pub_sub_queue.model.Message; +import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; + +import java.util.List; + +public interface Topic { + void addMessage(Message message); + void addSubscriber(TopicSubscriber subscriber); + List getSubscribers(); + List getMessages(); +} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..765ccff --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file From 260a4b474c65537cb38230e165a5ae10479c5d1c Mon Sep 17 00:00:00 2001 From: Ajay verma Date: Sun, 17 Apr 2022 19:34:41 +0530 Subject: [PATCH 4/4] removed FileAppender, Added more abstractions --- application-20220417.log | 29 -------------- problem-statement.md | 2 +- .../java/com/uditagarwal/Application.java | 34 ++++++++-------- .../factories/MessageFactory.java | 9 +++++ .../pub_sub_queue/factories/QueueFactory.java | 10 +++++ .../factories/SubscriberFactory.java | 8 ++-- .../pub_sub_queue/factories/TopicFactory.java | 6 +-- .../handler/SubscriberWorker.java | 27 +++++++------ .../pub_sub_queue/handler/TopicProcessor.java | 15 ++++--- .../impl}/DummySubscriber.java | 17 ++++---- .../{ => impl}/InMemoryQueue.java | 39 ++++++++++--------- .../{model => impl}/InMemoryTopic.java | 10 +++-- .../pub_sub_queue/interfaces/Queue.java | 8 ++++ .../pub_sub_queue/interfaces/Subscriber.java | 12 ++++++ .../pub_sub_queue/interfaces/Topic.java | 12 ++++++ .../pub_sub_queue/model/TopicSubscriber.java | 20 ---------- .../{model => models}/Message.java | 2 +- .../pub_sub_queue/public_interface/Queue.java | 10 ----- .../public_interface/Subscriber.java | 8 ---- .../pub_sub_queue/public_interface/Topic.java | 13 ------- src/main/resources/log4j2.xml | 8 ++-- 21 files changed, 135 insertions(+), 164 deletions(-) delete mode 100644 application-20220417.log create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/factories/MessageFactory.java create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/factories/QueueFactory.java rename src/main/java/com/uditagarwal/{ => pub_sub_queue/impl}/DummySubscriber.java (65%) rename src/main/java/com/uditagarwal/pub_sub_queue/{ => impl}/InMemoryQueue.java (52%) rename src/main/java/com/uditagarwal/pub_sub_queue/{model => impl}/InMemoryTopic.java (65%) create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Queue.java create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Subscriber.java create mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Topic.java delete mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java rename src/main/java/com/uditagarwal/pub_sub_queue/{model => models}/Message.java (75%) delete mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java delete mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Subscriber.java delete mode 100644 src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Topic.java diff --git a/application-20220417.log b/application-20220417.log deleted file mode 100644 index 6f3845d..0000000 --- a/application-20220417.log +++ /dev/null @@ -1,29 +0,0 @@ -2022-04-17 04:11:38.301 [main] INFO com.uditagarwal.Application - starting application -2022-04-17 04:11:38.309 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - created topic with name: t1 -2022-04-17 04:11:38.309 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - created topic with name: t2 -2022-04-17 04:11:38.310 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - sub1 subscribed to topic: t1 -2022-04-17 04:11:38.310 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - sub2 subscribed to topic: t1 -2022-04-17 04:11:38.310 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - sub3 subscribed to topic: t2 -2022-04-17 04:11:38.311 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m1 published to topic: t1 -2022-04-17 04:11:38.394 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m2 published to topic: t1 -2022-04-17 04:11:38.395 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m3 published to topic: t2 -2022-04-17 04:11:38.419 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 started consuming: m1 -2022-04-17 04:11:38.419 [Thread-4] INFO com.uditagarwal.DummySubscriber - Subscriber: sub3 started consuming: m3 -2022-04-17 04:11:38.419 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 started consuming: m1 -2022-04-17 04:11:43.426 [Thread-4] INFO com.uditagarwal.DummySubscriber - Subscriber: sub3 done consuming: m3 -2022-04-17 04:11:48.425 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 done consuming: m1 -2022-04-17 04:11:48.425 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 done consuming: m1 -2022-04-17 04:11:48.428 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 started consuming: m2 -2022-04-17 04:11:48.428 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 started consuming: m2 -2022-04-17 04:11:53.417 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m4 published to topic: t2 -2022-04-17 04:11:53.421 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - m5 published to topic: t1 -2022-04-17 04:11:53.422 [Thread-4] INFO com.uditagarwal.DummySubscriber - Subscriber: sub3 started consuming: m4 -2022-04-17 04:11:53.423 [main] INFO com.uditagarwal.pub_sub_queue.InMemoryQueue - sub1 offset reset to: 0 -2022-04-17 04:11:58.428 [Thread-4] INFO com.uditagarwal.DummySubscriber - Subscriber: sub3 done consuming: m4 -2022-04-17 04:11:58.428 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 done consuming: m2 -2022-04-17 04:11:58.428 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 done consuming: m2 -2022-04-17 04:11:58.428 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 started consuming: m1 -2022-04-17 04:11:58.428 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 started consuming: m5 -2022-04-17 04:12:08.434 [Thread-7] INFO com.uditagarwal.DummySubscriber - Subscriber: sub2 done consuming: m5 -2022-04-17 04:12:08.434 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 done consuming: m1 -2022-04-17 04:12:08.435 [Thread-6] INFO com.uditagarwal.DummySubscriber - Subscriber: sub1 started consuming: m2 diff --git a/problem-statement.md b/problem-statement.md index ef0e9ce..1d48670 100644 --- a/problem-statement.md +++ b/problem-statement.md @@ -3,7 +3,7 @@ We have to design a message queue supporting publisher-subscriber model. It shou * It should support multiple topics where messages can be published. * Publisher should be able to publish a message to a particular topic. -* Subscribers should be able to subscribe to a topic. +* Subscribers should be able to subscribe to a topic (Note: subscriber can subscribe to at most 1 topic). * Whenever a message is published to a topic, all the subscribers, who are subscribed to that topic, should receive the message. * Subscribers should be able to run in parallel diff --git a/src/main/java/com/uditagarwal/Application.java b/src/main/java/com/uditagarwal/Application.java index 8672f80..bfbe559 100644 --- a/src/main/java/com/uditagarwal/Application.java +++ b/src/main/java/com/uditagarwal/Application.java @@ -1,36 +1,36 @@ package com.uditagarwal; -import com.uditagarwal.pub_sub_queue.InMemoryQueue; -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.public_interface.Queue; +import com.uditagarwal.pub_sub_queue.factories.QueueFactory; +import com.uditagarwal.pub_sub_queue.interfaces.Queue; import lombok.extern.slf4j.Slf4j; @Slf4j public class Application { public static void main(String[] args) throws InterruptedException { - log.info("starting application"); - final Queue queue = new InMemoryQueue(); + log.info("Started application"); + final Queue queue = QueueFactory.getInMemoryQueue(); final String topicName1 = "t1", topicName2 = "t2"; queue.createTopic(topicName1); queue.createTopic(topicName2); - final DummySubscriber sub1 = new DummySubscriber("sub1", 10000); - final DummySubscriber sub2 = new DummySubscriber("sub2", 10000); - queue.addSubscriber(sub1, topicName1); - queue.addSubscriber(sub2, topicName1); - final DummySubscriber sub3 = new DummySubscriber("sub3", 5000); - queue.addSubscriber(sub3, topicName2); - queue.publishMessage(topicName1, new Message("m1")); - queue.publishMessage(topicName1, new Message("m2")); + queue.addSubscriber("sub1", topicName1, 10000); + queue.addSubscriber("sub2", topicName1, 10000); + queue.addSubscriber("sub3", topicName2, 5000); - queue.publishMessage(topicName2, new Message("m3")); + queue.publishMessage(topicName1, "m1"); + queue.publishMessage(topicName1, "m2"); + queue.publishMessage(topicName2, "m3"); + log.info("Sleeping for 15 seconds ..."); Thread.sleep(15000); - queue.publishMessage(topicName2, new Message("m4")); - queue.publishMessage(topicName1, new Message("m5")); + log.info("Woke up after sleeping for 15 seconds ..."); - queue.resetOffset(topicName1, sub1, 0); + queue.publishMessage(topicName2, "m4"); + queue.publishMessage(topicName1, "m5"); + + queue.resetOffset(topicName1, "sub1", 0); + log.info("Stopped application"); } } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/MessageFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/MessageFactory.java new file mode 100644 index 0000000..62ab979 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/MessageFactory.java @@ -0,0 +1,9 @@ +package com.uditagarwal.pub_sub_queue.factories; + +import com.uditagarwal.pub_sub_queue.models.Message; + +public class MessageFactory { + public static Message getNewMessage(String message) { + return new Message(message); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/QueueFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/QueueFactory.java new file mode 100644 index 0000000..75c72fc --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/QueueFactory.java @@ -0,0 +1,10 @@ +package com.uditagarwal.pub_sub_queue.factories; + +import com.uditagarwal.pub_sub_queue.impl.InMemoryQueue; +import com.uditagarwal.pub_sub_queue.interfaces.Queue; + +public class QueueFactory { + public static Queue getInMemoryQueue() { + return new InMemoryQueue(); + } +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java index da81ff3..f093c53 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/SubscriberFactory.java @@ -1,11 +1,11 @@ package com.uditagarwal.pub_sub_queue.factories; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; -import com.uditagarwal.pub_sub_queue.public_interface.Subscriber; +import com.uditagarwal.pub_sub_queue.impl.DummySubscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; public class SubscriberFactory { - public static TopicSubscriber getTopicSubscriber(Subscriber subscriber) { - return new TopicSubscriber(subscriber); + public static Subscriber getDummySubscriber(String subscriberId, int sleepTimeInMs) { + return new DummySubscriber(subscriberId, sleepTimeInMs); } } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java b/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java index a535497..7286048 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/factories/TopicFactory.java @@ -1,10 +1,8 @@ package com.uditagarwal.pub_sub_queue.factories; import com.uditagarwal.pub_sub_queue.handler.TopicProcessor; -import com.uditagarwal.pub_sub_queue.model.InMemoryTopic; -import com.uditagarwal.pub_sub_queue.public_interface.Topic; - -import java.util.UUID; +import com.uditagarwal.pub_sub_queue.impl.InMemoryTopic; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; public class TopicFactory { public static Topic getNewTopic(String topicName) { diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java b/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java index 0025b7b..d016b5a 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/handler/SubscriberWorker.java @@ -1,9 +1,8 @@ package com.uditagarwal.pub_sub_queue.handler; -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.InMemoryTopic; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; -import com.uditagarwal.pub_sub_queue.public_interface.Topic; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.models.Message; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; import lombok.Getter; import lombok.NonNull; import lombok.SneakyThrows; @@ -12,35 +11,35 @@ public class SubscriberWorker implements Runnable { private final Topic topic; - private final TopicSubscriber topicSubscriber; + private final Subscriber subscriber; - public SubscriberWorker(@NonNull final Topic topic, @NonNull final TopicSubscriber topicSubscriber) { + public SubscriberWorker(@NonNull final Topic topic, @NonNull final Subscriber subscriber) { this.topic = topic; - this.topicSubscriber = topicSubscriber; + this.subscriber = subscriber; } @SneakyThrows @Override public void run() { - synchronized (topicSubscriber) { + synchronized (subscriber) { do { - int curOffset = topicSubscriber.getOffset().get(); + int curOffset = subscriber.getOffset().get(); while (curOffset >= topic.getMessages().size()) { - topicSubscriber.wait(); + subscriber.wait(); } Message message = topic.getMessages().get(curOffset); - topicSubscriber.getSubscriber().consume(message); + subscriber.consume(message); // We cannot just increment here since subscriber offset can be reset while it is consuming. So, after // consuming we need to increase only if it was previous one. - topicSubscriber.getOffset().compareAndSet(curOffset, curOffset + 1); + subscriber.getOffset().compareAndSet(curOffset, curOffset + 1); } while (true); } } synchronized public void wakeUpIfNeeded() { - synchronized (topicSubscriber) { - topicSubscriber.notify(); + synchronized (subscriber) { + subscriber.notify(); } } } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java b/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java index d8645bd..3eee8f2 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/handler/TopicProcessor.java @@ -1,8 +1,7 @@ package com.uditagarwal.pub_sub_queue.handler; -import com.uditagarwal.pub_sub_queue.model.InMemoryTopic; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; -import com.uditagarwal.pub_sub_queue.public_interface.Topic; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; import lombok.Getter; import lombok.NonNull; @@ -20,15 +19,15 @@ public TopicProcessor(@NonNull final Topic topic) { } public void publish() { - for (TopicSubscriber topicSubscriber:topic.getSubscribers()) { - startSubscriberWorker(topicSubscriber); + for (Subscriber subscriber : topic.getSubscribers()) { + startSubscriberWorker(subscriber); } } - public void startSubscriberWorker(@NonNull final TopicSubscriber topicSubscriber) { - final String subscriberId = topicSubscriber.getSubscriber().getId(); + public void startSubscriberWorker(@NonNull final Subscriber subscriber) { + final String subscriberId = subscriber.getId(); if (!subscriberWorkers.containsKey(subscriberId)) { - final SubscriberWorker subscriberWorker = new SubscriberWorker(topic, topicSubscriber); + final SubscriberWorker subscriberWorker = new SubscriberWorker(topic, subscriber); subscriberWorkers.put(subscriberId, subscriberWorker); new Thread(subscriberWorker).start(); } diff --git a/src/main/java/com/uditagarwal/DummySubscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/impl/DummySubscriber.java similarity index 65% rename from src/main/java/com/uditagarwal/DummySubscriber.java rename to src/main/java/com/uditagarwal/pub_sub_queue/impl/DummySubscriber.java index 26480fb..6eff80b 100644 --- a/src/main/java/com/uditagarwal/DummySubscriber.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/impl/DummySubscriber.java @@ -1,22 +1,23 @@ -package com.uditagarwal; +package com.uditagarwal.pub_sub_queue.impl; -import com.uditagarwal.pub_sub_queue.public_interface.Subscriber; -import com.uditagarwal.pub_sub_queue.model.Message; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.models.Message; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.atomic.AtomicInteger; + @Slf4j +@Getter public class DummySubscriber implements Subscriber { private final String id; private final int sleepTimeInMillis; + private final AtomicInteger offset; public DummySubscriber(String id, int sleepTimeInMillis) { this.id = id; this.sleepTimeInMillis = sleepTimeInMillis; - } - - @Override - public String getId() { - return id; + this.offset = new AtomicInteger(0); } @Override diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java b/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryQueue.java similarity index 52% rename from src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java rename to src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryQueue.java index 95a25f2..3757ce6 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/InMemoryQueue.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryQueue.java @@ -1,14 +1,12 @@ -package com.uditagarwal.pub_sub_queue; +package com.uditagarwal.pub_sub_queue.impl; +import com.uditagarwal.pub_sub_queue.factories.MessageFactory; import com.uditagarwal.pub_sub_queue.factories.SubscriberFactory; import com.uditagarwal.pub_sub_queue.factories.TopicFactory; import com.uditagarwal.pub_sub_queue.handler.TopicProcessor; -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.InMemoryTopic; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; -import com.uditagarwal.pub_sub_queue.public_interface.Queue; -import com.uditagarwal.pub_sub_queue.public_interface.Subscriber; -import com.uditagarwal.pub_sub_queue.public_interface.Topic; +import com.uditagarwal.pub_sub_queue.interfaces.Queue; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -32,8 +30,11 @@ public void createTopic(@NonNull final String topicName) { log.info("created topic with name: {}", topicName); } - public void addSubscriber(@NonNull final Subscriber subscriber, @NonNull final String topicName) { - getTopic(topicName).addSubscriber(SubscriberFactory.getTopicSubscriber(subscriber)); + public void addSubscriber(@NonNull final String subscriberId, + @NonNull final String topicName, + final int sleepTimeInMs) { + Subscriber subscriber = SubscriberFactory.getDummySubscriber(subscriberId, sleepTimeInMs); + getTopic(topicName).addSubscriber(subscriber); log.info(subscriber.getId() + " subscribed to topic: " + topicName); } @@ -41,19 +42,19 @@ private Topic getTopic(String topicName) { return topicNameToProcessorMap.get(topicName).getTopic(); } - public void publishMessage(@NonNull final String topicName, @NonNull final Message message) { - getTopic(topicName).addMessage(message); - log.info(message.getMsg() + " published to topic: " + topicName); + public void publishMessage(@NonNull final String topicName, @NonNull final String message) { + getTopic(topicName).addMessage(MessageFactory.getNewMessage(message)); + log.info(message + " published to topic: " + topicName); new Thread(() -> topicNameToProcessorMap.get(topicName).publish()).start(); } - public void resetOffset(@NonNull final String topicName, @NonNull final Subscriber subscriber, @NonNull final Integer newOffset) { - List topicSubscriberList = getTopic(topicName).getSubscribers(); - for (TopicSubscriber topicSubscriber : topicSubscriberList) { - if (topicSubscriber.getSubscriber().equals(subscriber)) { - topicSubscriber.getOffset().set(newOffset); - log.info(topicSubscriber.getSubscriber().getId() + " offset reset to: " + newOffset); - new Thread(() -> topicNameToProcessorMap.get(topicName).startSubscriberWorker(topicSubscriber)).start(); + public void resetOffset(@NonNull final String topicName, @NonNull final String subscriberId, @NonNull final Integer newOffset) { + List subscriberList = getTopic(topicName).getSubscribers(); + for (Subscriber subscriber : subscriberList) { + if (subscriber.getId().equals(subscriberId)) { + subscriber.getOffset().set(newOffset); + log.info(subscriber.getId() + " offset reset to: " + newOffset); + new Thread(() -> topicNameToProcessorMap.get(topicName).startSubscriberWorker(subscriber)).start(); break; } } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/InMemoryTopic.java b/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryTopic.java similarity index 65% rename from src/main/java/com/uditagarwal/pub_sub_queue/model/InMemoryTopic.java rename to src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryTopic.java index d9e17f9..d390fa6 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/InMemoryTopic.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/impl/InMemoryTopic.java @@ -1,6 +1,8 @@ -package com.uditagarwal.pub_sub_queue.model; +package com.uditagarwal.pub_sub_queue.impl; -import com.uditagarwal.pub_sub_queue.public_interface.Topic; +import com.uditagarwal.pub_sub_queue.interfaces.Subscriber; +import com.uditagarwal.pub_sub_queue.interfaces.Topic; +import com.uditagarwal.pub_sub_queue.models.Message; import lombok.Getter; import lombok.NonNull; @@ -11,7 +13,7 @@ public class InMemoryTopic implements Topic { private final String topicName; // treating topic name as identifier private final List messages; - private final List subscribers; + private final List subscribers; public InMemoryTopic(@NonNull final String topicName) { this.topicName = topicName; @@ -23,7 +25,7 @@ public synchronized void addMessage(@NonNull final Message message) { messages.add(message); } - public void addSubscriber(@NonNull final TopicSubscriber subscriber) { + public void addSubscriber(@NonNull final Subscriber subscriber) { subscribers.add(subscriber); } } diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Queue.java b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Queue.java new file mode 100644 index 0000000..8f30cf5 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Queue.java @@ -0,0 +1,8 @@ +package com.uditagarwal.pub_sub_queue.interfaces; + +public interface Queue { + void createTopic(String topicName); + void addSubscriber(String subscriberId, String topicName, int sleepTimeInMs); + void publishMessage(String topicName, String message); + void resetOffset(String topicName, String subscriberId, Integer newOffset); +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Subscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Subscriber.java new file mode 100644 index 0000000..9f15cf8 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Subscriber.java @@ -0,0 +1,12 @@ +package com.uditagarwal.pub_sub_queue.interfaces; + +import com.uditagarwal.pub_sub_queue.models.Message; + +import java.util.concurrent.atomic.AtomicInteger; + +public interface Subscriber { + String getId(); + int getSleepTimeInMillis(); + AtomicInteger getOffset(); + void consume(Message message) throws InterruptedException; +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Topic.java b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Topic.java new file mode 100644 index 0000000..6b4dff5 --- /dev/null +++ b/src/main/java/com/uditagarwal/pub_sub_queue/interfaces/Topic.java @@ -0,0 +1,12 @@ +package com.uditagarwal.pub_sub_queue.interfaces; + +import com.uditagarwal.pub_sub_queue.models.Message; + +import java.util.List; + +public interface Topic { + void addMessage(Message message); + void addSubscriber(Subscriber subscriber); + List getSubscribers(); + List getMessages(); +} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java deleted file mode 100644 index 84c579b..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/TopicSubscriber.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.uditagarwal.pub_sub_queue.model; - -import com.uditagarwal.pub_sub_queue.public_interface.Subscriber; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NonNull; - -import java.util.concurrent.atomic.AtomicInteger; - -@Getter -@AllArgsConstructor -public class TopicSubscriber { - private final AtomicInteger offset; - private final Subscriber subscriber; - - public TopicSubscriber(@NonNull final Subscriber subscriber) { - this.subscriber = subscriber; - this.offset = new AtomicInteger(0); - } -} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java b/src/main/java/com/uditagarwal/pub_sub_queue/models/Message.java similarity index 75% rename from src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java rename to src/main/java/com/uditagarwal/pub_sub_queue/models/Message.java index 46e8cab..2f0cb88 100644 --- a/src/main/java/com/uditagarwal/pub_sub_queue/model/Message.java +++ b/src/main/java/com/uditagarwal/pub_sub_queue/models/Message.java @@ -1,4 +1,4 @@ -package com.uditagarwal.pub_sub_queue.model; +package com.uditagarwal.pub_sub_queue.models; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java deleted file mode 100644 index 79e3395..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Queue.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.uditagarwal.pub_sub_queue.public_interface; - -import com.uditagarwal.pub_sub_queue.model.Message; - -public interface Queue { - void createTopic(String topicName); - void addSubscriber(Subscriber subscriber, String topicName); - void publishMessage(String topicName, Message message); - void resetOffset(String topicName, Subscriber subscriber, Integer newOffset); -} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Subscriber.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Subscriber.java deleted file mode 100644 index 639ad78..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Subscriber.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.uditagarwal.pub_sub_queue.public_interface; - -import com.uditagarwal.pub_sub_queue.model.Message; - -public interface Subscriber { - String getId(); - void consume(Message message) throws InterruptedException; -} diff --git a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Topic.java b/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Topic.java deleted file mode 100644 index 8c5660a..0000000 --- a/src/main/java/com/uditagarwal/pub_sub_queue/public_interface/Topic.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.uditagarwal.pub_sub_queue.public_interface; - -import com.uditagarwal.pub_sub_queue.model.Message; -import com.uditagarwal.pub_sub_queue.model.TopicSubscriber; - -import java.util.List; - -public interface Topic { - void addMessage(Message message); - void addSubscriber(TopicSubscriber subscriber); - List getSubscribers(); - List getMessages(); -} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 765ccff..56126c3 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -4,14 +4,14 @@ - - - + + + - + \ No newline at end of file