Skip to content

Commit 9e7e0d1

Browse files
dpwizepoberezkin
andauthored
smp-server: conserve resources (#1194)
* transport: force auth params, remove async wrapper * stricter new messages * bang more thunks * style * don't produce msgQuota unless requested * strict * refactor * remove bangs --------- Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
1 parent d47c099 commit 9e7e0d1

File tree

8 files changed

+35
-25
lines changed

8 files changed

+35
-25
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,9 @@ setNetworkConfig c@AgentClient {useNetworkConfig} cfg' = do
427427
(_, cfg) <- readTVar useNetworkConfig
428428
if cfg == cfg'
429429
then pure False
430-
else True <$ (writeTVar useNetworkConfig $! (slowNetworkConfig cfg', cfg'))
430+
else
431+
let cfgSlow = slowNetworkConfig cfg'
432+
in True <$ (cfgSlow `seq` writeTVar useNetworkConfig (cfgSlow, cfg'))
431433
when changed $ reconnectAllServers c
432434

433435
setUserNetworkInfo :: AgentClient -> UserNetworkInfo -> IO ()

src/Simplex/Messaging/Client.hs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ module Simplex.Messaging.Client
100100
where
101101

102102
import Control.Applicative ((<|>))
103+
import Control.Concurrent (ThreadId, forkFinally, killThread, mkWeakThreadId)
103104
import Control.Concurrent.Async
104105
import Control.Concurrent.STM
105106
import Control.Exception
@@ -138,13 +139,14 @@ import Simplex.Messaging.Transport.KeepAlive
138139
import Simplex.Messaging.Transport.WebSockets (WS)
139140
import Simplex.Messaging.Util (bshow, diffToMicroseconds, ifM, liftEitherWith, raceAny_, threadDelay', tshow, whenM)
140141
import Simplex.Messaging.Version
142+
import System.Mem.Weak (Weak, deRefWeak)
141143
import System.Timeout (timeout)
142144

143145
-- | 'SMPClient' is a handle used to send commands to a specific SMP server.
144146
--
145147
-- Use 'getSMPClient' to connect to an SMP server and create a client handle.
146148
data ProtocolClient v err msg = ProtocolClient
147-
{ action :: Maybe (Async ()),
149+
{ action :: Maybe (Weak ThreadId),
148150
thParams :: THandleParams v 'TClient,
149151
sessionTs :: UTCTime,
150152
client_ :: PClient v err msg
@@ -475,15 +477,14 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
475477
cVar <- newEmptyTMVarIO
476478
let tcConfig = (transportClientConfig networkConfig useHost) {alpn = clientALPN}
477479
username = proxyUsername transportSession
478-
action <-
479-
async $
480-
runTransportClient tcConfig (Just username) useHost port' (Just $ keyHash srv) (client t c cVar)
481-
`finally` atomically (tryPutTMVar cVar $ Left PCENetworkError)
480+
tId <-
481+
runTransportClient tcConfig (Just username) useHost port' (Just $ keyHash srv) (client t c cVar)
482+
`forkFinally` \_ -> void (atomically . tryPutTMVar cVar $ Left PCENetworkError)
482483
c_ <- tcpConnectTimeout `timeout` atomically (takeTMVar cVar)
483484
case c_ of
484-
Just (Right c') -> pure $ Right c' {action = Just action}
485+
Just (Right c') -> mkWeakThreadId tId >>= \tId' -> pure $ Right c' {action = Just tId'}
485486
Just (Left e) -> pure $ Left e
486-
Nothing -> cancel action $> Left PCENetworkError
487+
Nothing -> killThread tId $> Left PCENetworkError
487488

488489
useTransport :: (ServiceName, ATransport)
489490
useTransport = case port srv of
@@ -589,7 +590,7 @@ proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (
589590

590591
-- | Disconnects client from the server and terminates client threads.
591592
closeProtocolClient :: ProtocolClient v err msg -> IO ()
592-
closeProtocolClient = mapM_ uninterruptibleCancel . action
593+
closeProtocolClient = mapM_ (deRefWeak >=> mapM_ killThread) . action
593594
{-# INLINE closeProtocolClient #-}
594595

595596
-- | SMP client error type.

src/Simplex/Messaging/Client/Agent.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1+
{-# LANGUAGE BangPatterns #-}
12
{-# LANGUAGE DuplicateRecordFields #-}
23
{-# LANGUAGE FlexibleInstances #-}
3-
{-# LANGUAGE InstanceSigs #-}
44
{-# LANGUAGE LambdaCase #-}
5-
{-# LANGUAGE MultiWayIf #-}
65
{-# LANGUAGE NamedFieldPuns #-}
76
{-# LANGUAGE OverloadedStrings #-}
87
{-# LANGUAGE RankNTypes #-}
@@ -171,7 +170,8 @@ getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worke
171170
case r of
172171
Right smp -> do
173172
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
174-
let c = (isOwnServer ca srv, smp)
173+
let !owned = isOwnServer ca srv
174+
!c = (owned, smp)
175175
atomically $ do
176176
putTMVar (sessionVar v) (Right c)
177177
TM.insert (sessionId $ thParams smp) c smpSessions

src/Simplex/Messaging/Notifications/Server.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ send :: Transport c => THandleNTF c 'TServer -> NtfServerClient -> IO ()
382382
send h@THandle {params} NtfServerClient {sndQ, sndActiveAt} = forever $ do
383383
t <- atomically $ readTBQueue sndQ
384384
void . liftIO $ tPut h [Right (Nothing, encodeTransmission params t)]
385-
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
385+
atomically . (writeTVar sndActiveAt $!) =<< liftIO getSystemTime
386386

387387
-- instance Show a => Show (TVar a) where
388388
-- show x = unsafePerformIO $ show <$> readTVarIO x

src/Simplex/Messaging/Server.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv
525525
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
526526
forever $ do
527527
ts <- L.toList <$> liftIO (tGet h)
528-
atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime
528+
atomically . (writeTVar rcvActiveAt $!) =<< liftIO getSystemTime
529529
stats <- asks serverStats
530530
(errs, cmds) <- partitionEithers <$> mapM (cmdAction stats) ts
531531
write sndQ errs
@@ -581,7 +581,7 @@ tSend :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> NonEmpty (Tran
581581
tSend th Client {sndActiveAt} ts = do
582582
withMVar th $ \h@THandle {params} ->
583583
void . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts
584-
atomically . writeTVar sndActiveAt =<< liftIO getSystemTime
584+
atomically . (writeTVar sndActiveAt $!) =<< liftIO getSystemTime
585585

586586
disconnectTransport :: Transport c => THandle v c 'TServer -> TVar SystemTime -> TVar SystemTime -> ExpirationConfig -> IO Bool -> IO ()
587587
disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcvActiveAt sndActiveAt expCfg noSubscriptions = do
@@ -1037,15 +1037,16 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
10371037
mkMessage body = do
10381038
msgId <- randomId =<< asks (msgIdBytes . config)
10391039
msgTs <- liftIO getSystemTime
1040-
pure $ Message msgId msgTs msgFlags body
1040+
pure $! Message msgId msgTs msgFlags body
10411041

10421042
expireMessages :: MsgQueue -> M ()
10431043
expireMessages q = do
10441044
msgExp <- asks $ messageExpiration . config
10451045
old <- liftIO $ mapM expireBeforeEpoch msgExp
1046-
stats <- asks serverStats
10471046
deleted <- atomically $ sum <$> mapM (deleteExpiredMsgs q) old
1048-
atomically $ modifyTVar' (msgExpired stats) (+ deleted)
1047+
when (deleted > 0) $ do
1048+
stats <- asks serverStats
1049+
atomically $ modifyTVar' (msgExpired stats) (+ deleted)
10491050

10501051
trySendNotification :: NtfCreds -> Message -> TVar ChaChaDRG -> STM (Maybe Bool)
10511052
trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg ntfNonceDrg =
@@ -1164,7 +1165,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
11641165
msgTs' = messageTs msg
11651166

11661167
setDelivered :: Sub -> Message -> STM Bool
1167-
setDelivered s msg = tryPutTMVar (delivered s) (messageId msg)
1168+
setDelivered s msg = tryPutTMVar (delivered s) $! messageId msg
11681169

11691170
getStoreMsgQueue :: T.Text -> RecipientId -> M MsgQueue
11701171
getStoreMsgQueue name rId = time (name <> " getMsgQueue") $ do

src/Simplex/Messaging/Server/MsgStore/STM.hs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE BangPatterns #-}
12
{-# LANGUAGE ConstraintKinds #-}
23
{-# LANGUAGE FlexibleContexts #-}
34
{-# LANGUAGE FlexibleInstances #-}
@@ -75,7 +76,7 @@ snapshotMsgQueue st rId = TM.lookup rId st >>= maybe (pure []) (snapshotTQueue .
7576
pure msgs
7677

7778
writeMsg :: MsgQueue -> Message -> STM (Maybe Message)
78-
writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} msg = do
79+
writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} !msg = do
7980
canWrt <- readTVar canWrite
8081
empty <- isEmptyTQueue q
8182
if canWrt || empty
@@ -85,7 +86,7 @@ writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} msg = do
8586
modifyTVar' size (+ 1)
8687
if canWrt'
8788
then writeTQueue q msg $> Just msg
88-
else writeTQueue q msgQuota $> Nothing
89+
else (writeTQueue q $! msgQuota) $> Nothing
8990
else pure Nothing
9091
where
9192
msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg}

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE BangPatterns #-}
12
{-# LANGUAGE DataKinds #-}
23
{-# LANGUAGE FlexibleInstances #-}
34
{-# LANGUAGE GADTs #-}
@@ -69,7 +70,7 @@ secureQueue QueueStore {queues} rId sKey =
6970
readTVar qVar >>= \q -> case senderKey q of
7071
Just k -> pure $ if sKey == k then Just q else Nothing
7172
_ ->
72-
let q' = q {senderKey = Just sKey}
73+
let !q' = q {senderKey = Just sKey}
7374
in writeTVar qVar q' $> Just q'
7475

7576
addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> STM (Either ErrorType QueueRec)

src/Simplex/Messaging/Transport.hs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ module Simplex.Messaging.Transport
8383
where
8484

8585
import Control.Applicative (optional)
86-
import Control.Monad (forM)
86+
import Control.Monad (forM, (<$!>))
8787
import Control.Monad.Except
8888
import Control.Monad.Trans.Except (throwE)
8989
import qualified Data.Aeson.TH as J
@@ -540,12 +540,12 @@ smpClientHandshake c ks_ keyHash@(C.KeyHash kh) smpVRange = do
540540

541541
smpTHandleServer :: forall c. THandleSMP c 'TServer -> VersionSMP -> VersionRangeSMP -> C.PrivateKeyX25519 -> Maybe C.PublicKeyX25519 -> THandleSMP c 'TServer
542542
smpTHandleServer th v vr pk k_ =
543-
let thAuth = THAuthServer {serverPrivKey = pk, sessSecret' = (`C.dh'` pk) <$> k_}
543+
let thAuth = THAuthServer {serverPrivKey = pk, sessSecret' = (`C.dh'` pk) <$!> k_}
544544
in smpTHandle_ th v vr (Just thAuth)
545545

546546
smpTHandleClient :: forall c. THandleSMP c 'TClient -> VersionSMP -> VersionRangeSMP -> Maybe C.PrivateKeyX25519 -> Maybe (C.PublicKeyX25519, (X.CertificateChain, X.SignedExact X.PubKey)) -> THandleSMP c 'TClient
547547
smpTHandleClient th v vr pk_ ck_ =
548-
let thAuth = (\(k, ck) -> THAuthClient {serverPeerPubKey = k, serverCertKey = ck, sessSecret = C.dh' k <$> pk_}) <$> ck_
548+
let thAuth = (\(k, ck) -> THAuthClient {serverPeerPubKey = k, serverCertKey = forceCertChain ck, sessSecret = C.dh' k <$!> pk_}) <$!> ck_
549549
in smpTHandle_ th v vr thAuth
550550

551551
smpTHandle_ :: forall c p. THandleSMP c p -> VersionSMP -> VersionRangeSMP -> Maybe (THandleAuth p) -> THandleSMP c p
@@ -554,6 +554,10 @@ smpTHandle_ th@THandle {params} v vr thAuth =
554554
let params' = params {thVersion = v, thServerVRange = vr, thAuth, implySessId = v >= authCmdsSMPVersion}
555555
in (th :: THandleSMP c p) {params = params'}
556556

557+
{-# INLINE forceCertChain #-}
558+
forceCertChain :: (X.CertificateChain, X.SignedExact T.PubKey) -> (X.CertificateChain, X.SignedExact T.PubKey)
559+
forceCertChain cert@(X.CertificateChain cc, signedKey) = length (show cc) `seq` show signedKey `seq` cert
560+
557561
-- This function is only used with v >= 8, so currently it's a simple record update.
558562
-- It may require some parameters update in the future, to be consistent with smpTHandle_.
559563
smpTHParamsSetVersion :: VersionSMP -> THandleParams SMPVersion p -> THandleParams SMPVersion p

0 commit comments

Comments
 (0)