Skip to content

Fix multithreading support for Subscribers #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::sync::{Arc, Mutex, RwLock};
mod tests;

pub struct EventBus<ContentType> {
next_event_id: Arc<Mutex<u64>>,
next_event_id: Arc<Mutex<u64>>, // TODO: possible overflow here
// RwLock is we do not expect many writes, but many reads
subscribers: RwLock<Vec<Arc<Mutex<dyn Subscriber<ContentType>>>>>,
subscribers: RwLock<Vec<Arc<Mutex<dyn Subscriber<ContentType>>>>>, // TODO: conider generic type?
}

impl<ContentType> EventBus<ContentType> {
Expand Down
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
// e-mail: mail@agramakov.me
//
// *************************************************************************
// TODO: eliminate unwraps

#[cfg(test)]
mod tests;

mod bus;
mod event;
mod publisher;
Expand Down
2 changes: 1 addition & 1 deletion src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ impl<ContentType> EventEmitter<ContentType> {
}
}

pub trait Publisher<ContentType> {
pub trait Publisher<ContentType>: Send {
fn get_mut_emitter(&mut self) -> &mut EventEmitter<ContentType>;
}
2 changes: 1 addition & 1 deletion src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ use super::Event;
#[cfg(test)]
mod tests;

pub trait Subscriber<ContentType> {
pub trait Subscriber<ContentType>: Send {
fn on_event(&mut self, event: &Event<ContentType>);
}
154 changes: 154 additions & 0 deletions src/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// *************************************************************************
//
// Copyright (c) 2025 Andrei Gramakov. All rights reserved.
//
// This file is licensed under the terms of the MIT license.
// For a copy, see: https://opensource.org/licenses/MIT
//
// site: https://agramakov.me
// e-mail: mail@agramakov.me
//
// *************************************************************************
use crate::{Event, EventBus, EventEmitter, Publisher, Subscriber};
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
thread,
};

// global counter
const ITERATIONS: usize = 1000000;
static COUNTER: AtomicUsize = AtomicUsize::new(0);

struct TestEvent {
source: u64,
destination: u64,
value: i32,
}

struct TestSubscriber {
id: u64,
}

impl Subscriber<TestEvent> for TestSubscriber {
fn on_event(&mut self, event: &Event<TestEvent>) {
let content = event.get_content();
if content.destination != self.id {
return;
}
COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
// println!("Received event with content: {}", content.value);
}
}

struct TestPublisher {
publisher_value: i32,
pub emitter: EventEmitter<TestEvent>,
self_id: u64,
}

impl TestPublisher {
pub fn new(value: i32) -> Self {
let publisher = EventEmitter::new();
Self {
publisher_value: value,
emitter: publisher,
self_id: 0,
}
}

pub fn publish_to(&mut self, destination: u64) {
let event = TestEvent {
destination,
value: self.publisher_value,
source: self.self_id,
};

self.emitter.publish(event);
}
}

impl Publisher<TestEvent> for TestPublisher {
fn get_mut_emitter(&mut self) -> &mut EventEmitter<TestEvent> {
&mut self.emitter
}
}

#[test]
fn test_multithreading() {
let bus = Arc::new(EventBus::new());

let subscriber1 = Arc::new(Mutex::new(TestSubscriber { id: 1 }));
let subscriber2 = Arc::new(Mutex::new(TestSubscriber { id: 2 }));

bus.add_subscriber(subscriber1.clone());
bus.add_subscriber(subscriber2.clone());

let mut publisher1 = TestPublisher::new(42);
let mut publisher2 = TestPublisher::new(24);
let mut publisher3 = TestPublisher::new(100);
let mut publisher4 = TestPublisher::new(200);
let mut publisher5 = TestPublisher::new(300);

bus.add_publisher(&mut publisher1);
bus.add_publisher(&mut publisher2);
bus.add_publisher(&mut publisher3);
bus.add_publisher(&mut publisher4);
bus.add_publisher(&mut publisher5);

let handle1 = thread::Builder::new()
.name("publishing 1".to_string())
.spawn(move || {
for _ in 0..ITERATIONS {
publisher1.publish_to(1);
}
})
.unwrap();

let handle2 = thread::Builder::new()
.name("poublishing 2".to_string())
.spawn(move || {
for _ in 0..ITERATIONS {
publisher2.publish_to(2);
}
})
.unwrap();

let handle3 = thread::Builder::new()
.name("publishing 3".to_string())
.spawn(move || {
for _ in 0..ITERATIONS {
publisher3.publish_to(2);
}
})
.unwrap();

let handle4 = thread::Builder::new()
.name("publishing 4".to_string())
.spawn(move || {
for _ in 0..ITERATIONS {
publisher4.publish_to(1);
}
})
.unwrap();

let handle5 = thread::Builder::new()
.name("publishing 5".to_string())
.spawn(move || {
for _ in 0..ITERATIONS {
publisher5.publish_to(1);
}
})
.unwrap();

handle1.join().unwrap();
handle2.join().unwrap();
handle3.join().unwrap();
handle4.join().unwrap();
handle5.join().unwrap();

println!("COUNTER: {}", COUNTER.load(Ordering::SeqCst));
assert_eq!(COUNTER.load(Ordering::SeqCst), ITERATIONS * 5);
}