22use std:: cell:: { Cell , UnsafeCell } ;
33use std:: iter;
44#[ allow( unused) ]
5- use std:: sync:: { Arc , RwLock , RwLockReadGuard , RwLockWriteGuard , TryLockError } ;
5+ use std:: sync:: { Arc , Mutex , RwLock , RwLockReadGuard , RwLockWriteGuard , TryLockError } ;
66#[ allow( unused) ]
77use std:: thread;
88use std:: time:: Duration ;
99
10+ #[ cfg( feature = "with_client_implementation" ) ]
11+ use fragile:: Sticky ;
12+
1013#[ allow( unused) ]
1114use std:: sync:: atomic:: { AtomicBool , Ordering } ;
1215
@@ -75,25 +78,16 @@ impl<F: FnOnce() -> I, I: IntoBreadcrumbs> IntoBreadcrumbs for F {
7578 }
7679}
7780
78- #[ doc( hidden) ]
79- #[ cfg( feature = "with_client_implementation" ) ]
80- pub struct PendingProcessors (
81- Vec < (
82- Option < thread:: ThreadId > ,
83- Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > > ,
84- ) > ,
85- ) ;
86-
87- // this is not actually sync but our uses of it are
88- #[ cfg( feature = "with_client_implementation" ) ]
89- unsafe impl Sync for PendingProcessors { }
9081#[ cfg( feature = "with_client_implementation" ) ]
91- unsafe impl Send for PendingProcessors { }
82+ enum PendingProcessor {
83+ Send ( Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > + Send + Sync > ) ,
84+ NonSend ( Sticky < Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > > > ) ,
85+ }
9286
9387#[ cfg( feature = "with_client_implementation" ) ]
9488struct HubImpl {
9589 stack : RwLock < Stack > ,
96- pending_processors : RwLock < PendingProcessors > ,
90+ pending_processors : Mutex < Vec < PendingProcessor > > ,
9791 has_pending_processors : AtomicBool ,
9892}
9993
@@ -109,9 +103,9 @@ impl HubImpl {
109103 f ( & mut * guard)
110104 }
111105
112- fn with_processors_mut < F : FnOnce ( & mut PendingProcessors ) -> R , R > ( & self , f : F ) -> R {
106+ fn with_processors_mut < F : FnOnce ( & mut Vec < PendingProcessor > ) -> R , R > ( & self , f : F ) -> R {
113107 f ( & mut * self . pending_processors
114- . write ( )
108+ . lock ( )
115109 . unwrap_or_else ( |x| x. into_inner ( ) ) )
116110 }
117111
@@ -160,7 +154,7 @@ impl Hub {
160154 Hub {
161155 inner : HubImpl {
162156 stack : RwLock :: new ( Stack :: from_client_and_scope ( client, scope) ) ,
163- pending_processors : RwLock :: new ( PendingProcessors ( vec ! [ ] ) ) ,
157+ pending_processors : Mutex :: new ( vec ! [ ] ) ,
164158 has_pending_processors : AtomicBool :: new ( false ) ,
165159 } ,
166160 }
@@ -423,7 +417,7 @@ impl Hub {
423417 pub fn add_event_processor ( & self , f : Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > > ) {
424418 with_client_impl ! { {
425419 self . inner. with_processors_mut( |pending| {
426- pending. 0 . push( ( Some ( thread :: current ( ) . id ( ) ) , f ) ) ;
420+ pending. push( PendingProcessor :: NonSend ( Sticky :: new ( f ) ) ) ;
427421 } ) ;
428422 self . inner. has_pending_processors. store( true , Ordering :: Release ) ;
429423 } }
@@ -438,12 +432,12 @@ impl Hub {
438432 #[ allow( unused) ]
439433 pub fn add_send_event_processor (
440434 & self ,
441- f : Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > + Send > ,
435+ f : Box < FnOnce ( ) -> Box < Fn ( & mut Event ) + Send + Sync > + Send + Sync > ,
442436 ) {
443437 with_client_impl ! { {
444438 use std:: mem;
445439 self . inner. with_processors_mut( |pending| {
446- pending. 0 . push( ( None , unsafe { mem :: transmute ( f ) } ) ) ;
440+ pending. push( PendingProcessor :: Send ( f ) ) ;
447441 } ) ;
448442 self . inner. has_pending_processors. store( true , Ordering :: Release ) ;
449443 } }
@@ -455,25 +449,27 @@ impl Hub {
455449 return ;
456450 }
457451 let mut new_processors = vec![ ] ;
458- let this_thread = thread:: current( ) . id( ) ;
459- let any_left = self . inner. with_processors_mut( |pending| {
460- let vec = & mut pending. 0 ;
452+ let any_left = self . inner. with_processors_mut( |vec| {
461453 let mut i = 0 ;
462454 while i < vec. len( ) {
463- let is_safe_call = match vec[ i] . 0 {
464- None => true ,
465- Some ( thread_id ) => thread_id == this_thread ,
455+ let is_safe_call = match vec[ i] {
456+ PendingProcessor :: Send ( .. ) => true ,
457+ PendingProcessor :: NonSend ( ref sticky ) => sticky . is_valid ( ) ,
466458 } ;
467- if is_safe_call {
468- let mut item = vec. remove( i) ;
469- let processor: & mut Box <FnMut ( ) -> Box <Fn ( & mut Event ) + Send + Sync >> = unsafe {
470- use std:: mem;
471- mem:: transmute( & mut item. 1 )
472- } ;
473- new_processors. push( processor( ) ) ;
474- } else {
459+ if !is_safe_call {
475460 i += 1 ;
461+ continue ;
476462 }
463+
464+ let mut item = vec. remove( i) ;
465+ let processor: & mut Box <FnMut ( ) -> Box <Fn ( & mut Event ) + Send + Sync >> = unsafe {
466+ use std:: mem;
467+ match item {
468+ PendingProcessor :: Send ( ref mut func) => mem:: transmute( & mut * func) ,
469+ PendingProcessor :: NonSend ( ref mut func) => mem:: transmute( func. get_mut( ) ) ,
470+ }
471+ } ;
472+ new_processors. push( processor( ) ) ;
477473 }
478474 !vec. is_empty( )
479475 } ) ;
0 commit comments