1
1
{-# LANGUAGE OverloadedStrings #-}
2
+ {-# OPTIONS_GHC -fno-warn-orphans #-}
2
3
3
4
module Stub where
4
5
5
6
6
7
import Data.Bifunctor
7
8
import Data.ByteString as BS
8
9
import Data.Text
10
+ import Data.Text.Lazy as TL
9
11
import Data.Text.Encoding
12
+ import Data.IORef ( readIORef
13
+ , newIORef
14
+ , modifyIORef
15
+ , writeIORef
16
+ )
10
17
import Data.Vector as Vector
11
18
( Vector
12
19
, length
@@ -20,12 +27,13 @@ import Data.IORef (readIORef, newIORef, modifyIORef
20
27
import Control.Monad.Except (ExceptT (.. ), runExceptT )
21
28
22
29
import qualified Peer.ChaincodeShim as Pb
30
+ import qualified Ledger.Queryresult.KvQueryResult as Pb
23
31
24
32
import Network.GRPC.HighLevel
25
33
import Google.Protobuf.Timestamp as Pb
26
34
import Peer.Proposal as Pb
27
35
import Proto3.Suite
28
- import Proto3.Wire.Decode
36
+ import Proto3.Wire.Decode
29
37
30
38
import Interfaces
31
39
import Messages
@@ -116,7 +124,7 @@ instance ChaincodeStubInterface DefaultChaincodeStub where
116
124
117
125
-- TODO: Implement better error handling/checks etc
118
126
-- getStateByRange :: ccs -> Text -> Text -> IO (Either Error StateQueryIterator)
119
- getStateByRange ccs startKey endKey =
127
+ getStateByRange ccs startKey endKey =
120
128
let payload = getStateByRangePayload startKey endKey
121
129
message = buildChaincodeMessage GET_STATE_BY_RANGE payload (txId ccs) (channelId ccs)
122
130
-- We have listenForResponse a :: IO (Either Error ByteString)
@@ -126,33 +134,43 @@ instance ChaincodeStubInterface DefaultChaincodeStub where
126
134
bsToSqi :: ByteString -> ExceptT Error IO StateQueryIterator
127
135
bsToSqi bs = let eeaQueryResponse = parse (decodeMessage (FieldNumber 1 )) bs :: Either ParseError Pb. QueryResponse in
128
136
case eeaQueryResponse of
129
- Left _ -> ExceptT $ pure $ Left ParseError
130
- Right queryResponse -> ExceptT $ do
131
- -- queryResponse and currentLoc are IORefs as they need to be mutated
132
- -- as a part of the next() function
133
- queryResponseIORef <- newIORef queryResponse
134
- currentLocIORef <- newIORef 0
135
- pure $ Right StateQueryIterator {
136
- sqiChannelId = getChannelId ccs
137
- , sqiTxId = getTxId ccs
138
- , sqiResponse = queryResponseIORef
139
- , sqiCurrentLoc = currentLocIORef
140
- }
141
- in do
142
- e <- (sendStream ccs) message
143
- case e of
144
- Left err -> error (" Error while streaming: " ++ show err)
145
- Right _ -> pure ()
146
- runExceptT $ ExceptT (listenForResponse (recvStream ccs)) >>= bsToSqi
137
+ -- TODO: refactor out pattern matching, e.g. using >>= or <*>
138
+ Left err -> ExceptT $ pure $ Left $ DecodeError err
139
+ Right queryResponse -> ExceptT $ do
140
+ -- queryResponse and currentLoc are IORefs as they need to be mutated
141
+ -- as a part of the next() function
142
+ queryResponseIORef <- newIORef queryResponse
143
+ currentLocIORef <- newIORef 0
144
+ pure $ Right StateQueryIterator
145
+ { sqiChaincodeStub = ccs
146
+ , sqiChannelId = getChannelId ccs
147
+ , sqiTxId = getTxId ccs
148
+ , sqiResponse = queryResponseIORef
149
+ , sqiCurrentLoc = currentLocIORef
150
+ }
151
+ in do
152
+ e <- (sendStream ccs) message
153
+ case e of
154
+ Left err -> error (" Error while streaming: " ++ show err)
155
+ Right _ -> pure ()
156
+ runExceptT $ ExceptT (listenForResponse (recvStream ccs)) >>= bsToSqi
147
157
148
- -- TODO : implement all these interface functions
158
+ -- TODO : implement all these interface functions
149
159
instance StateQueryIteratorInterface StateQueryIterator where
150
- -- hasNext :: sqi -> Bool
151
- hasNext sqi = True
160
+ -- hasNext :: sqi -> IO Bool
161
+ hasNext sqi = do
162
+ queryResponse <- readIORef $ sqiResponse sqi
163
+ currentLoc <- readIORef $ sqiCurrentLoc sqi
164
+ pure $ currentLoc < Prelude. length (Pb. queryResponseResults queryResponse) || (Pb. queryResponseHasMore queryResponse)
152
165
-- close :: sqi -> IO (Maybe Error)
153
166
close _ = pure Nothing
154
167
-- next :: sqi -> IO (Either Error Pb.KV)
155
- next _ = pure $ Left $ Error " not implemented"
168
+ next sqi = do
169
+ eeQueryResultBytes <- nextResult sqi
170
+ case eeQueryResultBytes of
171
+ Left _ -> pure $ Left $ Error " Error getting next queryResultBytes"
172
+ Right queryResultBytes -> pure $ first DecodeError (parse (decodeMessage (FieldNumber 1 )) (Pb. queryResultBytesResultBytes queryResultBytes) :: Either ParseError Pb. KV )
173
+
156
174
157
175
nextResult :: StateQueryIterator -> IO (Either Error Pb. QueryResultBytes )
158
176
nextResult sqi = do
@@ -171,12 +189,37 @@ nextResult sqi = do
171
189
queryResult
172
190
else pure $ Left $ Error " Invalid iterator state"
173
191
174
- -- TODO : this function is only called when the local result list has been
192
+
193
+ -- This function is only called when the local result list has been
175
194
-- iterated through and there are more results to get from the peer
176
- -- It makes a call to get the next QueryResponse back from the peer
177
- -- and mutates the response with the new QueryResponse and set currentLoc back to 0
195
+ -- It makes a call to get the next QueryResponse back from the peer
196
+ -- and mutates the sqi with the new QueryResponse and sets currentLoc back to 0
178
197
fetchNextQueryResult :: StateQueryIterator -> IO (Either Error StateQueryIterator )
179
- fetchNextQueryResult sqi = pure $ Left $ Error " not yet implemented"
198
+ fetchNextQueryResult sqi = do
199
+ queryResponse <- readIORef $ sqiResponse sqi
200
+ let
201
+ payload = queryNextStatePayload $ TL. toStrict $ Pb. queryResponseId queryResponse
202
+ message = buildChaincodeMessage QUERY_STATE_NEXT payload (sqiTxId sqi) (sqiChannelId sqi)
203
+ bsToQueryResponse :: ByteString -> ExceptT Error IO StateQueryIterator
204
+ bsToQueryResponse bs =
205
+ let eeaQueryResponse =
206
+ parse (decodeMessage (FieldNumber 1 )) bs :: Either
207
+ ParseError
208
+ Pb. QueryResponse
209
+ in case eeaQueryResponse of
210
+ -- TODO: refactor out pattern matching, e.g. using >>= or <*>
211
+ Left err -> ExceptT $ pure $ Left $ DecodeError err
212
+ Right queryResponse -> ExceptT $ do
213
+ -- Need to put the new queryResponse in the sqi queryResponse
214
+ writeIORef (sqiCurrentLoc sqi) 0
215
+ writeIORef (sqiResponse sqi) queryResponse
216
+ pure $ Right sqi
217
+ in do
218
+ e <- (sendStream $ sqiChaincodeStub sqi) message
219
+ case e of
220
+ Left err -> error (" Error while streaming: " ++ show err)
221
+ Right _ -> pure ()
222
+ runExceptT $ ExceptT (listenForResponse (recvStream $ sqiChaincodeStub sqi)) >>= bsToQueryResponse
180
223
181
224
--
182
225
-- -- getStateByRangeWithPagination :: ccs -> String -> String -> Int32 -> String -> Either Error (StateQueryIterator, Pb.QueryResponseMetadata)
@@ -249,4 +292,4 @@ fetchNextQueryResult sqi = pure $ Left $ Error "not yet implemented"
249
292
-- getTxTimestamp ccs = Right txTimestamp
250
293
--
251
294
-- -- setEvent :: ccs -> String -> ByteArray -> Maybe Error
252
- -- setEvent ccs = Right notImplemented
295
+ -- setEvent ccs = Right notImplemented
0 commit comments