|
| 1 | +{- | |
| 2 | +Module : EulerHS.ART.DBReplay |
| 3 | +Copyright : (C) Juspay Technologies Pvt Ltd 2019-2022 |
| 4 | +License : Apache 2.0 (see the file LICENSE) |
| 5 | +Maintainer : opensource@juspay.in |
| 6 | +Stability : experimental |
| 7 | +Portability : non-portable |
| 8 | +
|
| 9 | +This module contains interpreters and methods for running `Flow` scenarios. |
| 10 | +-} |
| 11 | + |
| 12 | +{-# LANGUAGE DerivingStrategies #-} |
| 13 | +{-# LANGUAGE RankNTypes #-} |
| 14 | +{-# LANGUAGE ScopedTypeVariables #-} |
| 15 | +{-# LANGUAGE TypeApplications #-} |
| 16 | + |
| 17 | +module EulerHS.ART.DBReplay where |
| 18 | + |
| 19 | +import qualified Data.Aeson as A |
| 20 | +import Data.Either.Extra (mapLeft) |
| 21 | +import Data.Time.Clock.POSIX (getPOSIXTime) |
| 22 | +import qualified Database.Beam as B |
| 23 | +import qualified EulerHS.Language as L |
| 24 | +import EulerHS.Prelude |
| 25 | +import qualified EulerHS.SqlDB.Language as DB |
| 26 | +import EulerHS.Types (DBConfig) |
| 27 | +import qualified EulerHS.Types as T |
| 28 | +import EulerHS.KVConnector.InMemConfig.Flow (searchInMemoryCache) |
| 29 | +import Sequelize (Model, Set (..), Where) |
| 30 | +import qualified Servant as S |
| 31 | +import qualified Data.Serialize as Serialize |
| 32 | +import EulerHS.ART.FlowUtils (addRecToState) |
| 33 | +import qualified EulerHS.ART.EnvVars as Env |
| 34 | +import EulerHS.KVConnector.Types (KVConnector(..), MeshResult, MeshMeta(..), tableName, Source(..)) |
| 35 | +import EulerHS.ART.Types (RunDBEntry(..), RecordingEntry(..),RunInMemEntry(..)) |
| 36 | +import EulerHS.KVConnector.Utils |
| 37 | +import EulerHS.KVConnector.DBSync (whereClauseToJson) |
| 38 | +import EulerHS.SqlDB.Types (BeamRunner, BeamRuntime) |
| 39 | +import qualified EulerHS.ART.ReplayFunctions as ER |
| 40 | +import EulerHS.KVDB.Types (MeshError(..)) |
| 41 | +import EulerHS.PIIEncryption (PII(..)) |
| 42 | +import qualified Data.ByteString.Lazy as BS |
| 43 | + |
| 44 | +getCurrentDateInMillis :: (L.MonadFlow m) => m Int |
| 45 | +getCurrentDateInMillis = L.runIO $ do |
| 46 | + t <- (* 1000) <$> getPOSIXTime |
| 47 | + pure . floor $ t |
| 48 | + |
| 49 | +getLatencyInMicroSeconds :: Integer -> Integer |
| 50 | +getLatencyInMicroSeconds execTime = execTime `div` 1000000 |
| 51 | + |
| 52 | +parseDataReplayList ::(FromJSON b,L.MonadFlow m) => BS.ByteString -> m (Either T.DBError [b]) |
| 53 | +parseDataReplayList res = do |
| 54 | + let eReply = A.eitherDecode res :: (FromJSON b) => Either String (Either T.DBError [b]) |
| 55 | + case eReply of |
| 56 | + Left err -> do |
| 57 | + let errorMessage = "Failed to decode response: " <> encodeUtf8 err |
| 58 | + L.throwException $ S.err400 {S.errBody = errorMessage} |
| 59 | + Right reply -> pure reply |
| 60 | + |
| 61 | +parseDataReplay ::(FromJSON b, L.MonadFlow m) => BS.ByteString -> m (Either MeshError b) |
| 62 | +parseDataReplay res = do |
| 63 | + let eReply = A.eitherDecode res :: (FromJSON b) => Either String (Either MeshError b) |
| 64 | + case eReply of |
| 65 | + Left err -> do |
| 66 | + let errorMessage = "Failed to decode response: " <> encodeUtf8 err |
| 67 | + L.throwException $ S.err400 {S.errBody = errorMessage} |
| 68 | + Right reply -> pure reply |
| 69 | + |
| 70 | +runWithArtFindALL :: |
| 71 | + forall be beM table m. |
| 72 | + (Model be table |
| 73 | + , FromJSON (table Identity) |
| 74 | + , ToJSON (table Identity) |
| 75 | + , KVConnector (table Identity) |
| 76 | + , MeshMeta be table |
| 77 | + , L.MonadFlow m |
| 78 | + ) => |
| 79 | + DBConfig beM -> |
| 80 | + Where be table -> |
| 81 | + Text -> |
| 82 | + m (Either T.DBError [table Identity]) -> |
| 83 | + m (Either T.DBError [table Identity]) |
| 84 | +runWithArtFindALL _dbConf whereClause method hsDbFunc = do |
| 85 | + do |
| 86 | + if Env.isArtReplayEnabled |
| 87 | + then do |
| 88 | + recTimestamp <- L.getCurrentTimeUTC |
| 89 | + msessionId <- L.getLoggerContext "x-request-id" |
| 90 | + resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId |
| 91 | + parseDataReplayList resp |
| 92 | + else do |
| 93 | + tmp_res <- hsDbFunc |
| 94 | + when Env.isArtRecEnabled $ do |
| 95 | + recTimestamp <- L.getCurrentTimeUTC |
| 96 | + addRecToState $ RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON tmp_res) recTimestamp) |
| 97 | + pure tmp_res |
| 98 | + |
| 99 | +runWithArtFindAllExtended :: |
| 100 | + forall be beM table m. |
| 101 | + (Model be table |
| 102 | + , FromJSON (table Identity) |
| 103 | + , ToJSON (table Identity) |
| 104 | + , KVConnector (table Identity) |
| 105 | + , MeshMeta be table |
| 106 | + , L.MonadFlow m |
| 107 | + ) => |
| 108 | + DBConfig beM -> |
| 109 | + DB.SqlDB beM [table Identity] -> |
| 110 | + Where be table -> |
| 111 | + Text -> |
| 112 | + m (Either T.DBError [table Identity]) -> |
| 113 | + m (Either T.DBError [table Identity]) |
| 114 | +runWithArtFindAllExtended _dbConf _query whereClause method hsDbFunc = do |
| 115 | + do |
| 116 | + if Env.isArtReplayEnabled |
| 117 | + then do |
| 118 | + recTimestamp <- L.getCurrentTimeUTC |
| 119 | + msessionId <- L.getLoggerContext "x-request-id" |
| 120 | + resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId |
| 121 | + parseDataReplayList resp |
| 122 | + else do |
| 123 | + tmp_res <- hsDbFunc |
| 124 | + when Env.isArtRecEnabled $ do |
| 125 | + recTimestamp <- L.getCurrentTimeUTC |
| 126 | + addRecToState $ RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON tmp_res) recTimestamp) |
| 127 | + pure tmp_res |
| 128 | + |
| 129 | +runWithArtFind :: |
| 130 | + forall be beM table m. |
| 131 | + (Model be table |
| 132 | + , KVConnector (table Identity) |
| 133 | + , FromJSON (table Identity) |
| 134 | + , ToJSON (table Identity) |
| 135 | + , MeshMeta be table |
| 136 | + , L.MonadFlow m |
| 137 | + ) => |
| 138 | + DBConfig beM -> |
| 139 | + Where be table -> |
| 140 | + Text -> |
| 141 | + m (Either T.DBError (Maybe (table Identity))) -> |
| 142 | + m (MeshResult (Maybe (table Identity))) |
| 143 | +runWithArtFind _dbConf whereClause method hsDbFunc = do |
| 144 | + do |
| 145 | + if Env.isArtReplayEnabled |
| 146 | + then do |
| 147 | + recTimestamp <- L.getCurrentTimeUTC |
| 148 | + msessionId <- L.getLoggerContext "x-request-id" |
| 149 | + resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId |
| 150 | + pure $ |
| 151 | + case A.decode resp of |
| 152 | + Just val -> val |
| 153 | + Nothing -> Right Nothing |
| 154 | + else do |
| 155 | + res <- hsDbFunc |
| 156 | + when Env.isArtRecEnabled $ do |
| 157 | + recTimestamp <- L.getCurrentTimeUTC |
| 158 | + addRecToState $ RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON res) recTimestamp) |
| 159 | + pure $ mapLeft MDBError $ res |
| 160 | + |
| 161 | +runWithArtUpdate :: |
| 162 | + forall be beM a table m. |
| 163 | + (Model be table |
| 164 | + , FromJSON a |
| 165 | + , ToJSON a |
| 166 | + , KVConnector (table Identity) |
| 167 | + , MeshMeta be table |
| 168 | + , L.MonadFlow m |
| 169 | + ) => |
| 170 | + DBConfig beM -> |
| 171 | + [Set be table] -> |
| 172 | + Where be table -> |
| 173 | + Text -> |
| 174 | + m (T.DBResult a) -> |
| 175 | + m (MeshResult a) |
| 176 | +runWithArtUpdate _ setClause whereClause method hsDbFunc = do |
| 177 | + do |
| 178 | + if Env.isArtReplayEnabled |
| 179 | + then do |
| 180 | + recTimestamp <- L.getCurrentTimeUTC |
| 181 | + msessionId <- L.getLoggerContext "x-request-id" |
| 182 | + resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method (toJSON (jsonKeyValueUpdates setClause)) (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId |
| 183 | + parseDataReplay resp |
| 184 | + else do |
| 185 | + tmp_res <- hsDbFunc |
| 186 | + when Env.isArtRecEnabled $ do |
| 187 | + recTimestamp <- L.getCurrentTimeUTC |
| 188 | + addRecToState $ RunDBEntryT (RunDBEntry method (toJSON (jsonKeyValueUpdates setClause)) (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON tmp_res) recTimestamp) |
| 189 | + pure $ mapLeft MDBError $ tmp_res |
| 190 | + |
| 191 | +runWithArtCreatemSQl :: |
| 192 | + forall beM a table m. |
| 193 | + ( ToJSON (table Identity) |
| 194 | + , FromJSON a |
| 195 | + , ToJSON a |
| 196 | + , KVConnector (table Identity) |
| 197 | + , L.MonadFlow m |
| 198 | + ) => |
| 199 | + DBConfig beM -> |
| 200 | + table Identity -> |
| 201 | + Text -> |
| 202 | + m (T.DBResult a) -> |
| 203 | + m (MeshResult a) |
| 204 | +runWithArtCreatemSQl _ value method hsDbFunc = do |
| 205 | + do |
| 206 | + if Env.isArtReplayEnabled |
| 207 | + then do |
| 208 | + recTimestamp <- L.getCurrentTimeUTC |
| 209 | + msessionId <- L.getLoggerContext "x-request-id" |
| 210 | + resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method (toJSON value) A.Null (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId |
| 211 | + parseDataReplay resp |
| 212 | + else do |
| 213 | + tmp_res <- hsDbFunc |
| 214 | + when Env.isArtRecEnabled $ do |
| 215 | + recTimestamp <- L.getCurrentTimeUTC |
| 216 | + addRecToState $ RunDBEntryT (RunDBEntry method (toJSON value) A.Null (tableName @(table Identity)) (toJSON tmp_res) recTimestamp) |
| 217 | + pure $ mapLeft MDBError $ tmp_res |
| 218 | + |
| 219 | +runWithArtDelete :: |
| 220 | + forall be beM a table m. |
| 221 | + (Model be table |
| 222 | + , FromJSON a |
| 223 | + , ToJSON a |
| 224 | + , KVConnector (table Identity) |
| 225 | + , MeshMeta be table |
| 226 | + , L.MonadFlow m |
| 227 | + ) => |
| 228 | + DBConfig beM -> |
| 229 | + Where be table -> |
| 230 | + Text -> |
| 231 | + m (T.DBResult a) -> |
| 232 | + m (MeshResult a) |
| 233 | +runWithArtDelete _ whereClause method hsDbFunc = do |
| 234 | + do |
| 235 | + if Env.isArtReplayEnabled |
| 236 | + then do |
| 237 | + recTimestamp <- L.getCurrentTimeUTC |
| 238 | + msessionId <- L.getLoggerContext "x-request-id" |
| 239 | + resp <- L.runIO $ ER.callBrahmaReplayDB (RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (A.Null) recTimestamp)) msessionId |
| 240 | + parseDataReplay resp |
| 241 | + else do |
| 242 | + tmp_res <- hsDbFunc |
| 243 | + when Env.isArtRecEnabled $ do |
| 244 | + recTimestamp <- L.getCurrentTimeUTC |
| 245 | + addRecToState $ RunDBEntryT (RunDBEntry method A.Null (whereClauseToJson whereClause) (tableName @(table Identity)) (toJSON tmp_res) recTimestamp) |
| 246 | + pure $ mapLeft MDBError $ tmp_res |
| 247 | + |
| 248 | +searchInMemoryCacheRecRepWrapper :: forall be beM table m. |
| 249 | + ( |
| 250 | + BeamRuntime be beM, |
| 251 | + BeamRunner beM, |
| 252 | + B.HasQBuilder be, |
| 253 | + HasCallStack, |
| 254 | + KVConnector (table Identity), |
| 255 | + ToJSON (table Identity), |
| 256 | + Show (table Identity), |
| 257 | + Serialize.Serialize (table Identity), |
| 258 | + FromJSON (table Identity), |
| 259 | + Model be table, |
| 260 | + MeshMeta be table, |
| 261 | + PII table, |
| 262 | + L.MonadFlow m |
| 263 | + ) => Text -> |
| 264 | + DBConfig beM -> |
| 265 | + Where be table -> |
| 266 | + m (Source, MeshResult (Maybe (table Identity))) |
| 267 | +searchInMemoryCacheRecRepWrapper method dbConf whereClause = do |
| 268 | + if Env.isArtReplayEnabled |
| 269 | + then do |
| 270 | + recTimestamp <- L.getCurrentTimeUTC |
| 271 | + let recInmem = RunInMemEntryT (RunInMemEntry method A.Null (whereClauseToJson whereClause) (toJSON $ tableName @(table Identity)) (Left A.Null) recTimestamp) |
| 272 | + msessionId <- L.getLoggerContext "x-request-id" |
| 273 | + resp <- L.runIO $ ER.callBrahmaReplayDB recInmem msessionId |
| 274 | + meshRes <- parseDataReplay resp |
| 275 | + pure (IN_MEM,meshRes) |
| 276 | + else do |
| 277 | + (src,meshResult) <- searchInMemoryCache dbConf whereClause |
| 278 | + when Env.isArtRecEnabled $ do |
| 279 | + recTimestamp <- L.getCurrentTimeUTC |
| 280 | + addRecToState $ RunInMemEntryT (RunInMemEntry method A.Null (whereClauseToJson whereClause) (toJSON $ tableName @(table Identity)) (either (Left . toJSON) (Right . toJSON) meshResult) recTimestamp) |
| 281 | + pure (src,meshResult) |
0 commit comments