Skip to content

Commit 6122754

Browse files
authored
Merge pull request #4 from namjug-kim/feature/add_exchange_okex
Add exchange okex
2 parents 8f59898 + c53500e commit 6122754

File tree

19 files changed

+723
-17
lines changed

19 files changed

+723
-17
lines changed

README.md

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,28 @@ A Kotlin library for cryptocurrency trading.
66
## Supported Exchanges
77

88
### Websocket
9-
| Exchange | orderBook | tickData |
10-
|----------------|------|------|
11-
| Binance | Done | Done |
12-
| Upbit | Done | Done |
13-
| HuobiKorea | Done | Done |
9+
Support public market feature (tickData, orderBook)
10+
11+
| Exchange | ver | doc |
12+
|----------------|---|---|
13+
| Binance | * | [ws](https://github.com/binance-exchange/binance-official-api-docs/blob/master/web-socket-streams.md)| Done |
14+
| Upbit | v1.0.3 | [ws](https://docs.upbit.com/docs/upbit-quotation-websocket) | Done |
15+
| HuobiKorea | * | [ws](https://github.com/alphaex-api/BAPI_Docs_ko/wiki) | Done |
16+
| Okex | v3 | [ws](https://www.okex.com/docs/en/#spot_ws-all) | Done |
17+
18+
### Api
19+
| Exchange | ver | doc |
20+
|----------------|---|---|
21+
| | |
1422

1523
## Install
1624

17-
## Usage
25+
### Maven
26+
27+
### Gradle
28+
29+
## Usage
30+
31+
### Kotlin
32+
33+
### Java

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ subprojects {
5454
compile("org.apache.logging.log4j:log4j-core:${log4j2Version}")
5555
compile("org.apache.logging.log4j:log4j-slf4j-impl:${log4j2Version}")
5656

57-
// unit test
57+
// test
5858
testCompile group: 'junit', name: 'junit', version: '4.11'
5959
testCompile group: 'org.assertj', name: 'assertj-core', version: '3.11.1'
60-
60+
testCompile group: 'io.projectreactor', name: 'reactor-test', version: reactorVersion
6161
}
6262
}

reactive-crypto-binance/src/main/kotlin/com/njkim/reactivecrypto/binance/model/BinanceOrderBook.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ data class BinanceOrderBook(
1616
OrderBookUnit(
1717
BigDecimal(objects[0]),
1818
BigDecimal(objects[1]),
19-
OrderSideType.BID
19+
OrderSideType.BID,
20+
null
2021
)
2122
}
2223
.toList()
@@ -28,7 +29,8 @@ data class BinanceOrderBook(
2829
OrderBookUnit(
2930
BigDecimal(objects[0]),
3031
BigDecimal(objects[1]),
31-
OrderSideType.ASK
32+
OrderSideType.ASK,
33+
null
3234
)
3335
}
3436
.toList()

reactive-crypto-core/src/main/kotlin/com/njkim/reactivecrypto/core/common/model/ExchangeVendor.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ package com.njkim.reactivecrypto.core.common.model
33
enum class ExchangeVendor(val implementedClassName: String) {
44
UPBIT("com.njkim.reactivecrypto.upbit.UpbitWebsocketClient"),
55
BINANCE("com.njkim.reactivecrypto.binance.BinanceWebsocketClient"),
6-
HUOBI_KOREA("com.njkim.reactivecrypto.huobikorea.HuobiKoreaWebsocketClient");
6+
HUOBI_KOREA("com.njkim.reactivecrypto.huobikorea.HuobiKoreaWebsocketClient"),
7+
OKEX("com.njkim.reactivecrypto.okex.OkexWebsocketClient");
78
}

reactive-crypto-core/src/main/kotlin/com/njkim/reactivecrypto/core/common/model/order/OrderBookUnit.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ package com.njkim.reactivecrypto.core.common.model.order
22

33
import java.math.BigDecimal
44

5+
/**
6+
* @property orderNumbers the number of orders placed at limit order.
7+
*/
58
data class OrderBookUnit(
69
val price: BigDecimal,
710
val quantity: BigDecimal,
8-
val orderSideType: OrderSideType
11+
val orderSideType: OrderSideType,
12+
val orderNumbers: Int?
913
)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.njkim.reactivecrypto.core.common.model.order
2+
3+
enum class TradeSideType {
4+
SELL,
5+
BUY
6+
}

reactive-crypto-huobikorea/src/main/kotlin/com/njkim/reactivecrypto/huobikorea/model/HuobiOrderBook.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ data class HuobiOrderBook(
1515
OrderBookUnit(
1616
BigDecimal(objects[0]),
1717
BigDecimal(objects[1]),
18-
OrderSideType.BID
18+
OrderSideType.BID,
19+
null
1920
)
2021
}
2122
.toList()
@@ -27,7 +28,8 @@ data class HuobiOrderBook(
2728
OrderBookUnit(
2829
BigDecimal(objects[0]),
2930
BigDecimal(objects[1]),
30-
OrderSideType.ASK
31+
OrderSideType.ASK,
32+
null
3133
)
3234
}
3335
.toList()

reactive-crypto-okex/build.gradle

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
apply plugin: 'kotlin'
2+
apply plugin: 'org.jetbrains.kotlin.jvm'
3+
4+
version '1.0-SNAPSHOT'
5+
6+
dependencies {
7+
compile project(':reactive-crypto-core')
8+
9+
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
10+
11+
compile group: 'org.apache.commons', name: 'commons-compress', version: '1.18'
12+
}
13+
14+
compileKotlin {
15+
kotlinOptions.jvmTarget = "1.8"
16+
}
17+
compileTestKotlin {
18+
kotlinOptions.jvmTarget = "1.8"
19+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.njkim.reactivecrypto.okex
2+
3+
import com.fasterxml.jackson.core.JsonParser
4+
import com.fasterxml.jackson.core.JsonProcessingException
5+
import com.fasterxml.jackson.databind.DeserializationContext
6+
import com.fasterxml.jackson.databind.DeserializationFeature
7+
import com.fasterxml.jackson.databind.JsonDeserializer
8+
import com.fasterxml.jackson.databind.ObjectMapper
9+
import com.fasterxml.jackson.databind.module.SimpleModule
10+
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
11+
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
12+
import com.njkim.reactivecrypto.core.common.model.order.TradeSideType
13+
import java.io.IOException
14+
import java.math.BigDecimal
15+
import java.time.ZonedDateTime
16+
17+
class OkexJsonObjectMapper {
18+
companion object {
19+
val instance = OkexJsonObjectMapper().objectMapper()
20+
}
21+
22+
private fun objectMapper(): ObjectMapper {
23+
val simpleModule = SimpleModule()
24+
25+
simpleModule.addDeserializer(ZonedDateTime::class.java, object : JsonDeserializer<ZonedDateTime>() {
26+
@Throws(IOException::class, JsonProcessingException::class)
27+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): ZonedDateTime {
28+
return ZonedDateTime.parse(p.valueAsString)
29+
}
30+
})
31+
32+
simpleModule.addDeserializer(BigDecimal::class.java, object : JsonDeserializer<BigDecimal>() {
33+
@Throws(IOException::class, JsonProcessingException::class)
34+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): BigDecimal {
35+
return BigDecimal(p.valueAsString)
36+
}
37+
})
38+
39+
simpleModule.addDeserializer(CurrencyPair::class.java, object : JsonDeserializer<CurrencyPair>() {
40+
@Throws(IOException::class, JsonProcessingException::class)
41+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): CurrencyPair {
42+
val split = p.valueAsString.split("-")
43+
val targetCurrency = split[0]
44+
val baseCurrency = split[1]
45+
46+
return CurrencyPair.parse(targetCurrency, baseCurrency)
47+
}
48+
})
49+
50+
simpleModule.addDeserializer(TradeSideType::class.java, object : JsonDeserializer<TradeSideType>() {
51+
@Throws(IOException::class, JsonProcessingException::class)
52+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): TradeSideType {
53+
val valueAsString = p.valueAsString
54+
return TradeSideType.valueOf(valueAsString.toUpperCase())
55+
}
56+
})
57+
58+
val objectMapper = ObjectMapper().registerKotlinModule()
59+
objectMapper.registerModule(simpleModule)
60+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
61+
return objectMapper
62+
}
63+
64+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package com.njkim.reactivecrypto.okex
2+
3+
import com.fasterxml.jackson.module.kotlin.readValue
4+
import com.njkim.reactivecrypto.core.ExchangeWebsocketClient
5+
import com.njkim.reactivecrypto.core.common.model.ExchangeVendor
6+
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
7+
import com.njkim.reactivecrypto.core.common.model.order.OrderBook
8+
import com.njkim.reactivecrypto.core.common.model.order.OrderBookUnit
9+
import com.njkim.reactivecrypto.core.common.model.order.TickData
10+
import com.njkim.reactivecrypto.okex.model.OkexOrderBookWrapper
11+
import com.njkim.reactivecrypto.okex.model.OkexTickDataWrapper
12+
import io.netty.buffer.ByteBuf
13+
import io.netty.buffer.ByteBufInputStream
14+
import io.netty.channel.ChannelHandlerContext
15+
import io.netty.handler.codec.ByteToMessageDecoder
16+
import mu.KotlinLogging
17+
import org.apache.commons.compress.compressors.deflate64.Deflate64CompressorInputStream
18+
import org.springframework.util.StreamUtils
19+
import reactor.core.publisher.Flux
20+
import reactor.netty.http.client.HttpClient
21+
import java.math.BigDecimal
22+
import java.nio.charset.Charset
23+
import java.util.concurrent.ConcurrentHashMap
24+
import kotlin.streams.toList
25+
26+
27+
class OkexWebsocketClient : ExchangeWebsocketClient {
28+
29+
private val log = KotlinLogging.logger {}
30+
31+
private val baseUri = "wss://real.okex.com:10442/ws/v3"
32+
33+
override fun createDepthSnapshot(subscribeTargets: List<CurrencyPair>): Flux<OrderBook> {
34+
val subscribeMessages = subscribeTargets.stream()
35+
.map { "${it.targetCurrency.name}-${it.baseCurrency.name}" }
36+
.map { "{\"op\": \"subscribe\", \"args\": [\"spot/depth:$it\"]}" }
37+
.toList()
38+
39+
val currentOrderBookMap: MutableMap<CurrencyPair, OrderBook> = ConcurrentHashMap()
40+
41+
return HttpClient.create()
42+
.wiretap(log.isDebugEnabled)
43+
.tcpConfiguration { tcp -> tcp.doOnConnected { connection -> connection.addHandler(Deflat64Decoder()) } }
44+
.websocket()
45+
.uri(baseUri)
46+
.handle { inbound, outbound ->
47+
outbound.sendString(Flux.fromIterable<String>(subscribeMessages))
48+
.then()
49+
.thenMany(inbound.receive().asString())
50+
}
51+
.doOnNext { log.debug { it } }
52+
.filter { it.contains("\"spot/depth\"") }
53+
.map { OkexJsonObjectMapper.instance.readValue<OkexOrderBookWrapper>(it) }
54+
.map { it.data }
55+
.flatMapIterable {
56+
it.map { okexTickData ->
57+
OrderBook(
58+
"${okexTickData.instrumentId}${okexTickData.timestamp.toInstant().toEpochMilli()}",
59+
okexTickData.instrumentId,
60+
okexTickData.timestamp,
61+
ExchangeVendor.OKEX,
62+
okexTickData.getBids().toMutableList(),
63+
okexTickData.getAsks().toMutableList()
64+
)
65+
}
66+
}
67+
.map { orderBook ->
68+
if (!currentOrderBookMap.containsKey(orderBook.currencyPair)) {
69+
currentOrderBookMap[orderBook.currencyPair] = orderBook
70+
return@map orderBook
71+
}
72+
73+
val prevOrderBook = currentOrderBookMap[orderBook.currencyPair]!!
74+
75+
val askMap: MutableMap<BigDecimal, OrderBookUnit> = prevOrderBook.asks
76+
.map { Pair(it.price.stripTrailingZeros(), it) }
77+
.toMap()
78+
.toMutableMap()
79+
80+
orderBook.asks.forEach { updatedAsk ->
81+
askMap.compute(updatedAsk.price.stripTrailingZeros()) { _, oldValue ->
82+
when {
83+
oldValue == null -> updatedAsk
84+
updatedAsk.quantity <= BigDecimal.ZERO -> null
85+
else -> oldValue.copy(
86+
quantity = updatedAsk.quantity,
87+
orderNumbers = updatedAsk.orderNumbers
88+
)
89+
}
90+
}
91+
}
92+
93+
val bidMap: MutableMap<BigDecimal, OrderBookUnit> = prevOrderBook.bids
94+
.map { Pair(it.price.stripTrailingZeros(), it) }
95+
.toMap()
96+
.toMutableMap()
97+
98+
orderBook.bids.forEach { updatedBid ->
99+
bidMap.compute(updatedBid.price.stripTrailingZeros()) { _, oldValue ->
100+
when {
101+
oldValue == null -> updatedBid
102+
updatedBid.quantity <= BigDecimal.ZERO -> null
103+
else -> oldValue.copy(
104+
quantity = updatedBid.quantity,
105+
orderNumbers = updatedBid.orderNumbers
106+
)
107+
}
108+
}
109+
}
110+
111+
val currentOrderBook = prevOrderBook.copy(
112+
asks = askMap.values.sortedBy { orderBookUnit -> orderBookUnit.price },
113+
bids = bidMap.values.sortedByDescending { orderBookUnit -> orderBookUnit.price }
114+
)
115+
currentOrderBookMap[currentOrderBook.currencyPair] = currentOrderBook
116+
currentOrderBook
117+
}
118+
}
119+
120+
override fun createTradeWebsocket(subscribeTargets: List<CurrencyPair>): Flux<TickData> {
121+
val subscribeMessages = subscribeTargets.stream()
122+
.map { "${it.targetCurrency.name}-${it.baseCurrency.name}" }
123+
.map { "{\"op\": \"subscribe\", \"args\": [\"spot/trade:$it\"]}" }
124+
.toList()
125+
126+
127+
return HttpClient.create()
128+
.wiretap(log.isDebugEnabled)
129+
.tcpConfiguration { tcp ->
130+
tcp.doOnConnected { connection ->
131+
connection.addHandler(Deflat64Decoder())
132+
}
133+
}
134+
.websocket()
135+
.uri(baseUri)
136+
.handle { inbound, outbound ->
137+
outbound.sendString(Flux.fromIterable<String>(subscribeMessages))
138+
.then()
139+
.thenMany(inbound.receive().asString())
140+
}
141+
.filter { t -> t.contains("\"spot/trade\"") }
142+
.map { OkexJsonObjectMapper.instance.readValue<OkexTickDataWrapper>(it) }
143+
.map { it.data }
144+
.flatMapIterable {
145+
it.map { okexTickData ->
146+
TickData(
147+
okexTickData.tradeId,
148+
okexTickData.timestamp,
149+
okexTickData.price,
150+
okexTickData.size,
151+
okexTickData.instrumentId,
152+
ExchangeVendor.OKEX
153+
)
154+
}
155+
}
156+
.doOnError { log.error(it.message, it) }
157+
}
158+
159+
private inner class Deflat64Decoder : ByteToMessageDecoder() {
160+
override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: MutableList<Any>) {
161+
Deflate64CompressorInputStream(ByteBufInputStream(msg)).use {
162+
val responseBody = StreamUtils.copyToString(it, Charset.forName("UTF-8"))
163+
val uncompressed = msg.alloc().buffer().writeBytes(responseBody.toByteArray())
164+
out.add(uncompressed)
165+
}
166+
}
167+
}
168+
}

0 commit comments

Comments
 (0)