diff --git a/src/bus.rs b/src/bus.rs index c1572d5..13f9751 100644 --- a/src/bus.rs +++ b/src/bus.rs @@ -16,9 +16,9 @@ use std::sync::{Arc, Mutex, RwLock}; mod tests; pub struct EventBus { - next_event_id: Arc>, + next_event_id: Arc>, // TODO: possible overflow here // RwLock is we do not expect many writes, but many reads - subscribers: RwLock>>>>, + subscribers: RwLock>>>>, // TODO: conider generic type? } impl EventBus { diff --git a/src/lib.rs b/src/lib.rs index 9737cd2..547ed03 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,11 @@ // e-mail: mail@agramakov.me // // ************************************************************************* +// TODO: eliminate unwraps + +#[cfg(test)] +mod tests; + mod bus; mod event; mod publisher; diff --git a/src/publisher.rs b/src/publisher.rs index f509418..b0f57cd 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -48,6 +48,6 @@ impl EventEmitter { } } -pub trait Publisher { +pub trait Publisher: Send { fn get_mut_emitter(&mut self) -> &mut EventEmitter; } diff --git a/src/subscriber.rs b/src/subscriber.rs index d372fc7..8ae80a1 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -14,6 +14,6 @@ use super::Event; #[cfg(test)] mod tests; -pub trait Subscriber { +pub trait Subscriber: Send { fn on_event(&mut self, event: &Event); } diff --git a/src/tests.rs b/src/tests.rs new file mode 100644 index 0000000..387388b --- /dev/null +++ b/src/tests.rs @@ -0,0 +1,154 @@ +// ************************************************************************* +// +// Copyright (c) 2025 Andrei Gramakov. All rights reserved. +// +// This file is licensed under the terms of the MIT license. +// For a copy, see: https://opensource.org/licenses/MIT +// +// site: https://agramakov.me +// e-mail: mail@agramakov.me +// +// ************************************************************************* +use crate::{Event, EventBus, EventEmitter, Publisher, Subscriber}; +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + thread, +}; + +// global counter +const ITERATIONS: usize = 1000000; +static COUNTER: AtomicUsize = AtomicUsize::new(0); + +struct TestEvent { + source: u64, + destination: u64, + value: i32, +} + +struct TestSubscriber { + id: u64, +} + +impl Subscriber for TestSubscriber { + fn on_event(&mut self, event: &Event) { + let content = event.get_content(); + if content.destination != self.id { + return; + } + COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + // println!("Received event with content: {}", content.value); + } +} + +struct TestPublisher { + publisher_value: i32, + pub emitter: EventEmitter, + self_id: u64, +} + +impl TestPublisher { + pub fn new(value: i32) -> Self { + let publisher = EventEmitter::new(); + Self { + publisher_value: value, + emitter: publisher, + self_id: 0, + } + } + + pub fn publish_to(&mut self, destination: u64) { + let event = TestEvent { + destination, + value: self.publisher_value, + source: self.self_id, + }; + + self.emitter.publish(event); + } +} + +impl Publisher for TestPublisher { + fn get_mut_emitter(&mut self) -> &mut EventEmitter { + &mut self.emitter + } +} + +#[test] +fn test_multithreading() { + let bus = Arc::new(EventBus::new()); + + let subscriber1 = Arc::new(Mutex::new(TestSubscriber { id: 1 })); + let subscriber2 = Arc::new(Mutex::new(TestSubscriber { id: 2 })); + + bus.add_subscriber(subscriber1.clone()); + bus.add_subscriber(subscriber2.clone()); + + let mut publisher1 = TestPublisher::new(42); + let mut publisher2 = TestPublisher::new(24); + let mut publisher3 = TestPublisher::new(100); + let mut publisher4 = TestPublisher::new(200); + let mut publisher5 = TestPublisher::new(300); + + bus.add_publisher(&mut publisher1); + bus.add_publisher(&mut publisher2); + bus.add_publisher(&mut publisher3); + bus.add_publisher(&mut publisher4); + bus.add_publisher(&mut publisher5); + + let handle1 = thread::Builder::new() + .name("publishing 1".to_string()) + .spawn(move || { + for _ in 0..ITERATIONS { + publisher1.publish_to(1); + } + }) + .unwrap(); + + let handle2 = thread::Builder::new() + .name("poublishing 2".to_string()) + .spawn(move || { + for _ in 0..ITERATIONS { + publisher2.publish_to(2); + } + }) + .unwrap(); + + let handle3 = thread::Builder::new() + .name("publishing 3".to_string()) + .spawn(move || { + for _ in 0..ITERATIONS { + publisher3.publish_to(2); + } + }) + .unwrap(); + + let handle4 = thread::Builder::new() + .name("publishing 4".to_string()) + .spawn(move || { + for _ in 0..ITERATIONS { + publisher4.publish_to(1); + } + }) + .unwrap(); + + let handle5 = thread::Builder::new() + .name("publishing 5".to_string()) + .spawn(move || { + for _ in 0..ITERATIONS { + publisher5.publish_to(1); + } + }) + .unwrap(); + + handle1.join().unwrap(); + handle2.join().unwrap(); + handle3.join().unwrap(); + handle4.join().unwrap(); + handle5.join().unwrap(); + + println!("COUNTER: {}", COUNTER.load(Ordering::SeqCst)); + assert_eq!(COUNTER.load(Ordering::SeqCst), ITERATIONS * 5); +}