Skip to content

Commit 10ada61

Browse files
committed
🚧: add authorized websocket
1 parent f275b69 commit 10ada61

File tree

3 files changed

+174
-8
lines changed

3 files changed

+174
-8
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2019 namjug-kim
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.njkim.reactivecrypto.binance
18+
19+
import com.fasterxml.jackson.databind.ObjectMapper
20+
import com.fasterxml.jackson.module.kotlin.readValue
21+
import com.njkim.reactivecrypto.binance.model.BinanceOrderBook
22+
import com.njkim.reactivecrypto.binance.model.BinanceResponseWrapper
23+
import com.njkim.reactivecrypto.binance.model.BinanceTickData
24+
import com.njkim.reactivecrypto.core.websocket.AbstractExchangeWebsocketClient
25+
import com.njkim.reactivecrypto.core.ExchangeJsonObjectMapper
26+
import com.njkim.reactivecrypto.core.common.model.ExchangeVendor
27+
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
28+
import com.njkim.reactivecrypto.core.common.model.order.OrderBook
29+
import com.njkim.reactivecrypto.core.common.model.order.TickData
30+
import com.njkim.reactivecrypto.core.common.model.order.TradeSideType.BUY
31+
import com.njkim.reactivecrypto.core.common.model.order.TradeSideType.SELL
32+
import com.njkim.reactivecrypto.core.common.util.toEpochMilli
33+
import mu.KotlinLogging
34+
import reactor.core.publisher.Flux
35+
import reactor.netty.http.client.HttpClient
36+
import java.time.ZonedDateTime
37+
import java.util.stream.Collectors
38+
39+
class BinancePrivateWebsocketClient : AbstractExchangeWebsocketClient() {
40+
private val log = KotlinLogging.logger {}
41+
42+
private val baseUri = "wss://stream.binance.com:9443"
43+
44+
private val objectMapper: ObjectMapper = createJsonObjectMapper().objectMapper()
45+
46+
override fun createJsonObjectMapper(): ExchangeJsonObjectMapper {
47+
return BinanceJsonObjectMapper()
48+
}
49+
50+
override fun createTradeWebsocket(subscribeTargets: List<CurrencyPair>): Flux<TickData> {
51+
val streams = subscribeTargets.stream()
52+
.map { "${it.baseCurrency}${it.quoteCurrency}" }
53+
.map { it.toLowerCase() + "@trade" }
54+
.collect(Collectors.joining("/"))
55+
56+
return HttpClient.create()
57+
.wiretap(log.isDebugEnabled)
58+
.websocket()
59+
.uri("$baseUri/stream?streams=$streams")
60+
.handle { inbound, _ -> inbound.receive().asString() }
61+
.map { objectMapper.readValue<BinanceResponseWrapper<BinanceTickData>>(it) }
62+
.map { it.data }
63+
.map { binanceTradeRawData ->
64+
TickData(
65+
binanceTradeRawData.tradeId.toString() + binanceTradeRawData.currencyPair + binanceTradeRawData.eventTime.toEpochMilli(),
66+
binanceTradeRawData.eventTime,
67+
binanceTradeRawData.price,
68+
binanceTradeRawData.quantity,
69+
binanceTradeRawData.currencyPair,
70+
ExchangeVendor.BINANCE,
71+
if (binanceTradeRawData.isMarketMaker) SELL else BUY
72+
)
73+
}
74+
}
75+
76+
override fun createDepthSnapshot(subscribeTargets: List<CurrencyPair>): Flux<OrderBook> {
77+
val streams = subscribeTargets.stream()
78+
.map { "${it.baseCurrency}${it.quoteCurrency}" }
79+
.map { it.toLowerCase() + "@depth20" }
80+
.collect(Collectors.joining("/"))
81+
82+
return HttpClient.create()
83+
.wiretap(log.isDebugEnabled)
84+
.websocket()
85+
.uri("$baseUri/stream?streams=$streams")
86+
.handle { inbound, _ -> inbound.receive().asString() }
87+
.map { objectMapper.readValue<BinanceResponseWrapper<BinanceOrderBook>>(it) }
88+
.map {
89+
OrderBook(
90+
"${it.data.lastUpdateId}",
91+
it.getCurrencyPair(),
92+
ZonedDateTime.now(),
93+
ExchangeVendor.BINANCE,
94+
it.data.getBids(),
95+
it.data.getAsks()
96+
)
97+
}
98+
}
99+
}

reactive-crypto-binance/src/main/kotlin/com/njkim/reactivecrypto/binance/http/raw/BinanceRawUserDataOperator.kt

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package com.njkim.reactivecrypto.binance.http.raw
1919
import com.fasterxml.jackson.module.kotlin.convertValue
2020
import com.njkim.reactivecrypto.binance.BinanceJsonObjectMapper
2121
import com.njkim.reactivecrypto.binance.model.BinanceAccountResponse
22+
import com.njkim.reactivecrypto.binance.model.BinanceListenKeyResponse
2223
import com.njkim.reactivecrypto.binance.model.BinanceOrderInfoResponse
2324
import com.njkim.reactivecrypto.binance.model.BinanceTradeInfoResponse
2425
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
@@ -70,12 +71,12 @@ class BinanceRawUserDataOperator internal constructor(private val webClient: Web
7071
limit: Int = 500,
7172
recvWindow: Long = 5000
7273
): Flux<BinanceOrderInfoResponse> {
73-
val request = mapOf(
74+
val request = mutableMapOf(
7475
"symbol" to symbol,
7576
"limit" to limit,
7677
"recvWindow" to recvWindow,
7778
"timestamp" to Instant.now().toEpochMilli()
78-
).toMutableMap()
79+
)
7980
orderId?.let { request["orderId"] = orderId }
8081
startTime?.let { request["startTime"] = startTime }
8182
endTime?.let { request["endTime"] = endTime }
@@ -104,10 +105,10 @@ class BinanceRawUserDataOperator internal constructor(private val webClient: Web
104105
symbol: CurrencyPair? = null,
105106
recvWindow: Long = 5000
106107
): Flux<BinanceOrderInfoResponse> {
107-
val request = mapOf<String, Any>(
108+
val request = mutableMapOf<String, Any>(
108109
"recvWindow" to recvWindow,
109110
"timestamp" to Instant.now().toEpochMilli()
110-
).toMutableMap()
111+
)
111112
symbol?.let { request["symbol"] = symbol }
112113

113114
val convertedRequest = BinanceJsonObjectMapper.instance.convertValue<Map<String, Any>>(request)
@@ -137,11 +138,11 @@ class BinanceRawUserDataOperator internal constructor(private val webClient: Web
137138
origClientOrderId: String? = null,
138139
recvWindow: Long = 5000
139140
): Mono<BinanceOrderInfoResponse> {
140-
val request = mapOf(
141+
val request = mutableMapOf(
141142
"symbol" to symbol,
142143
"recvWindow" to recvWindow,
143144
"timestamp" to Instant.now().toEpochMilli()
144-
).toMutableMap()
145+
)
145146
orderId?.let { request["orderId"] = orderId }
146147
origClientOrderId?.let { request["origClientOrderId"] = origClientOrderId }
147148

@@ -175,12 +176,12 @@ class BinanceRawUserDataOperator internal constructor(private val webClient: Web
175176
limit: Int = 500,
176177
recvWindow: Long = 5000
177178
): Flux<BinanceTradeInfoResponse> {
178-
val request = mapOf<String, Any>(
179+
val request = mutableMapOf(
179180
"symbol" to symbol,
180181
"recvWindow" to recvWindow,
181182
"timestamp" to Instant.now().toEpochMilli(),
182183
"limit" to limit
183-
).toMutableMap()
184+
)
184185
startTime?.let { request["startTime"] = startTime }
185186
endTime?.let { request["endTime"] = endTime }
186187
fromId?.let { request["fromId"] = fromId }
@@ -197,4 +198,42 @@ class BinanceRawUserDataOperator internal constructor(private val webClient: Web
197198
.retrieve()
198199
.bodyToFlux()
199200
}
201+
202+
/**
203+
* Start a new user data stream. The stream will close after 60 minutes unless a keepalive is sent.
204+
* If the account has an active listenKey, that listenKey will be returned and its validity will be extended for 60 minutes.
205+
*
206+
* Weight: 1
207+
*/
208+
fun createListenKey(): Mono<BinanceListenKeyResponse> {
209+
return webClient.post()
210+
.uri {
211+
it.path("/api/v3/userDataStream")
212+
.build()
213+
}
214+
.retrieve()
215+
.bodyToMono()
216+
}
217+
218+
/**
219+
* Keepalive a user data stream to prevent a time out.
220+
* User data streams will close after 60 minutes.
221+
* It's recommended to send a ping about every 30 minutes.
222+
*
223+
* Weight: 1
224+
*/
225+
fun keepAliveListenKey(listenKey: String): Mono<BinanceListenKeyResponse> {
226+
val request = mapOf(
227+
"listenKey" to listenKey
228+
)
229+
230+
return webClient.put()
231+
.uri {
232+
it.path("/api/v3/userDataStream")
233+
.queryParams(request.toMultiValueMap())
234+
.build()
235+
}
236+
.retrieve()
237+
.bodyToMono()
238+
}
200239
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2019 namjug-kim
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.njkim.reactivecrypto.binance.model
18+
19+
import com.fasterxml.jackson.annotation.JsonProperty
20+
import com.njkim.reactivecrypto.core.common.model.currency.Currency
21+
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
22+
import java.math.BigDecimal
23+
import java.time.ZonedDateTime
24+
25+
data class BinanceListenKeyResponse(
26+
@JsonProperty("listenKey")
27+
val listenKey: String
28+
)

0 commit comments

Comments
 (0)