Skip to content

Commit a9438ae

Browse files
committed
Use a real-time queue for TQueue
This is another alternative design for `TQueue`. Instead of an amortized queue, this uses a real-time one based on Okasaki's scheduled banker's queues. We limit contention by using two independent schedules.
1 parent 92af455 commit a9438ae

File tree

1 file changed

+104
-65
lines changed

1 file changed

+104
-65
lines changed

Control/Concurrent/STM/TQueue.hs

Lines changed: 104 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
22
{-# LANGUAGE CPP, DeriveDataTypeable #-}
3+
{-# LANGUAGE BangPatterns #-}
34

45
#if __GLASGOW_HASKELL__ >= 701
56
{-# LANGUAGE Trustworthy #-}
@@ -17,16 +18,14 @@
1718
--
1819
-- A 'TQueue' is like a 'TChan', with two important differences:
1920
--
20-
-- * it has faster throughput than both 'TChan' and 'Chan' (although
21-
-- the costs are amortised, so the cost of individual operations
22-
-- can vary a lot).
21+
-- * it has faster throughput than both 'TChan' and 'Chan'
2322
--
2423
-- * it does /not/ provide equivalents of the 'dupTChan' and
2524
-- 'cloneTChan' operations.
2625
--
27-
-- The implementation is based on the traditional purely-functional
28-
-- queue representation that uses two lists to obtain amortised /O(1)/
29-
-- enqueue and dequeue operations.
26+
-- The implementation is based on Okasaki's scheduled banker's queues,
27+
-- but it uses *two* schedules so there's only contention between the
28+
-- reader and writer when the queue needs to be rotated.
3029
--
3130
-- @since 2.4
3231
-----------------------------------------------------------------------------
@@ -44,63 +43,109 @@ module Control.Concurrent.STM.TQueue (
4443
writeTQueue,
4544
unGetTQueue,
4645
isEmptyTQueue,
47-
) where
46+
) where
4847

4948
import GHC.Conc
5049
import Control.Monad (unless)
5150
import Data.Typeable (Typeable)
5251

52+
data End a =
53+
End [a] -- list
54+
[a] -- schedule
55+
5356
-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
5457
--
5558
-- @since 2.4
56-
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
57-
{-# UNPACK #-} !(TVar [a])
59+
data TQueue a = TQueue {-# UNPACK #-} !(TVar (End a))
60+
{-# UNPACK #-} !(TVar (End a))
5861
deriving Typeable
62+
{-
63+
Invariant:
64+
65+
Given front list, rear list, front schedule, and rear schedule called
66+
front, rear, fsched, and rsched, respectively,
67+
68+
2 * (|front| - |rear|) = |fsched| + |rsched|
69+
70+
Note that because lengths cannot be negative, this implies that
71+
72+
|front| >= |rear|
73+
74+
We rotate the queue when either schedule is empty. This preserves
75+
the invariant and ensures that the spine of the front list is
76+
fully realized when a rotation occurs. The spine of the rear list
77+
is *always* fully realized. We could use a strict-spined list for
78+
the rear, but it doesn't really seem to be worth the trouble.
79+
-}
5980

6081
instance Eq (TQueue a) where
6182
TQueue a _ == TQueue b _ = a == b
6283

63-
-- |Build and returns a new instance of 'TQueue'
84+
-- | Build and returns a new instance of 'TQueue'
6485
newTQueue :: STM (TQueue a)
6586
newTQueue = do
66-
read <- newTVar []
67-
write <- newTVar []
87+
read <- newTVar (End [] [])
88+
write <- newTVar (End [] [])
6889
return (TQueue read write)
6990

70-
-- |@IO@ version of 'newTQueue'. This is useful for creating top-level
91+
-- | @IO@ version of 'newTQueue'. This is useful for creating top-level
7192
-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
7293
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
7394
-- possible.
7495
newTQueueIO :: IO (TQueue a)
7596
newTQueueIO = do
76-
read <- newTVarIO []
77-
write <- newTVarIO []
97+
read <- newTVarIO (End [] [])
98+
write <- newTVarIO (End [] [])
7899
return (TQueue read write)
79100

80-
-- |Write a value to a 'TQueue'.
101+
-- rotate front end = front ++ reverse rear, but the reverse is performed
102+
-- incrementally as the append proceeds.
103+
--
104+
-- Precondition: |front| + 1 >= |rear|. This ensures that when the front
105+
-- list is empty, the rear list has at most one element, so we don't need
106+
-- to reverse it.
107+
rotate :: [a] -> [a] -> [a]
108+
rotate = go []
109+
where
110+
go acc [] rear = rear ++ acc
111+
go acc (x:xs) (r:rs)
112+
= x : go (r:acc) xs rs
113+
go acc xs [] = xs ++ acc
114+
115+
-- | Write a value to a 'TQueue'.
81116
writeTQueue :: TQueue a -> a -> STM ()
82-
writeTQueue (TQueue _read write) a = do
83-
listend <- readTVar write
84-
writeTVar write (a:listend)
85-
86-
-- |Read the next value from the 'TQueue'.
117+
writeTQueue (TQueue read write) a = do
118+
End listend rsched <- readTVar write
119+
let listend' = a : listend
120+
case rsched of
121+
-- Reduce |front|-|rear| by 1; reduce |fsched|+|rsched| by 2
122+
_:_:rsched' -> writeTVar write (End listend' rsched')
123+
124+
-- Rotate the queue; the invariant holds trivially.
125+
_ -> do
126+
End listfront _fsched <- readTVar read
127+
let !front' = rotate listfront listend'
128+
writeTVar read (End front' front')
129+
writeTVar write (End [] front')
130+
131+
-- | Read the next value from the 'TQueue'.
87132
readTQueue :: TQueue a -> STM a
88133
readTQueue (TQueue read write) = do
89-
xs <- readTVar read
90-
case xs of
91-
(x:xs') -> do
92-
writeTVar read xs'
93-
return x
94-
[] -> do
95-
ys <- readTVar write
96-
case ys of
97-
[] -> retry
98-
_ -> do
99-
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
100-
-- short, otherwise it will conflict
101-
writeTVar write []
102-
writeTVar read zs
103-
return z
134+
End listfront fsched <- readTVar read
135+
case listfront of
136+
[] -> retry
137+
x:front' ->
138+
case fsched of
139+
-- Reduce |front|-|rear| by 1; reduce |fsched|+|rsched| by 2
140+
_:_:fsched' -> writeTVar read (End front' fsched') >> return x
141+
142+
-- Rotate the queue; the invariant holds trivially.
143+
_ -> do
144+
End listend _rsched <- readTVar write
145+
let !front'' = rotate front' listend
146+
writeTVar read (End front'' front'')
147+
writeTVar write (End [] front'')
148+
return x
104149

105150
-- | A version of 'readTQueue' which does not retry. Instead it
106151
-- returns @Nothing@ if no value is available.
@@ -113,44 +158,38 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing
113158
-- @since 2.4.5
114159
flushTQueue :: TQueue a -> STM [a]
115160
flushTQueue (TQueue read write) = do
116-
xs <- readTVar read
117-
ys <- readTVar write
118-
unless (null xs) $ writeTVar read []
119-
unless (null ys) $ writeTVar write []
120-
return (xs ++ reverse ys)
161+
End front fsched <- readTVar read
162+
End rear rsched <- readTVar write
163+
unless (null front && null fsched) $ writeTVar read (End [] [])
164+
unless (null rear && null rsched) $ writeTVar write (End [] [])
165+
return (rotate front rear)
121166

122167
-- | Get the next value from the @TQueue@ without removing it,
123168
-- retrying if the channel is empty.
124169
peekTQueue :: TQueue a -> STM a
125-
peekTQueue c = do
126-
x <- readTQueue c
127-
unGetTQueue c x
128-
return x
170+
peekTQueue (TQueue read _write) = do
171+
End front _fsched <- readTVar read
172+
case front of
173+
x:_ -> return x
174+
[] -> retry
129175

130176
-- | A version of 'peekTQueue' which does not retry. Instead it
131177
-- returns @Nothing@ if no value is available.
132178
tryPeekTQueue :: TQueue a -> STM (Maybe a)
133-
tryPeekTQueue c = do
134-
m <- tryReadTQueue c
135-
case m of
136-
Nothing -> return Nothing
137-
Just x -> do
138-
unGetTQueue c x
139-
return m
140-
141-
-- |Put a data item back onto a channel, where it will be the next item read.
179+
tryPeekTQueue (TQueue read _write) = do
180+
End front _fsched <- readTVar read
181+
case front of
182+
x:_ -> return (Just x)
183+
[] -> return Nothing
184+
185+
-- | Put a data item back onto a channel, where it will be the next item read.
142186
unGetTQueue :: TQueue a -> a -> STM ()
143187
unGetTQueue (TQueue read _write) a = do
144-
xs <- readTVar read
145-
writeTVar read (a:xs)
188+
End front fsched <- readTVar read
189+
writeTVar read (End (a:front) (a:a:fsched))
146190

147-
-- |Returns 'True' if the supplied 'TQueue' is empty.
191+
-- | Returns 'True' if the supplied 'TQueue' is empty.
148192
isEmptyTQueue :: TQueue a -> STM Bool
149-
isEmptyTQueue (TQueue read write) = do
150-
xs <- readTVar read
151-
case xs of
152-
(_:_) -> return False
153-
[] -> do ys <- readTVar write
154-
case ys of
155-
[] -> return True
156-
_ -> return False
193+
isEmptyTQueue (TQueue read _write) = do
194+
End front _fsched <- readTVar read
195+
return $! null front

0 commit comments

Comments
 (0)