Skip to content

Commit 1c746a8

Browse files
authored
Merge pull request #73 from namjug-kim/add_poloniex
feat: add poloniex websocket client
2 parents 6ef4d64 + 516570f commit 1c746a8

File tree

19 files changed

+972
-1
lines changed

19 files changed

+972
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Support public market feature (tickData, orderBook)
3434
| ![bitmax](https://user-images.githubusercontent.com/16334718/57548356-b082d480-739b-11e9-9539-b27c60877fb6.jpg) | Bitmax | BITMAX | v1.2 | [ws](https://github.com/bitmax-exchange/api-doc/blob/master/bitmax-api-doc-v1.2.md) |
3535
| ![idax](https://user-images.githubusercontent.com/16334718/58029691-128bc880-7b58-11e9-9aaa-a331f394c8bd.jpg) | Idax | IDAX | * | [ws](https://github.com/idax-exchange/idax-official-api-docs/blob/master/open-ws_en.md) |
3636
| ![coineal](https://user-images.githubusercontent.com/16334718/58037062-7d90cb80-7b67-11e9-9278-e8b03c5ddd86.jpg) | Coineal | COINEAL | ⚠️ | ⚠️ |
37+
| ![poloniex](https://user-images.githubusercontent.com/16334718/59551277-335a0900-8fb2-11e9-9d1e-4ab2a7574148.jpg) | Poloniex | POLONIEX | * | [ws](https://docs.poloniex.com/#websocket-api) |
3738

3839
⚠️ : Uses endpoints that are used by the official web. This is not an official api and should be used with care.
3940

reactive-crypto-core/src/main/kotlin/com/njkim/reactivecrypto/core/common/model/ExchangeVendor.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ enum class ExchangeVendor {
3030
BITMAX,
3131
IDAX,
3232
COINEAL,
33+
POLONIEX,
3334
UNKNOWN;
3435

3536
/**
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2019 namjug-kim
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
apply plugin: 'kotlin'
18+
apply plugin: 'org.jetbrains.kotlin.jvm'
19+
20+
version '1.0-SNAPSHOT'
21+
22+
dependencies {
23+
compile project(':reactive-crypto-core')
24+
25+
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
26+
}
27+
28+
compileKotlin {
29+
kotlinOptions.jvmTarget = "1.8"
30+
}
31+
compileTestKotlin {
32+
kotlinOptions.jvmTarget = "1.8"
33+
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Copyright 2019 namjug-kim
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.njkim.reactivecrypto.poloniex
18+
19+
import com.fasterxml.jackson.core.JsonParser
20+
import com.fasterxml.jackson.databind.DeserializationContext
21+
import com.fasterxml.jackson.databind.JsonDeserializer
22+
import com.fasterxml.jackson.databind.JsonNode
23+
import com.fasterxml.jackson.databind.ObjectMapper
24+
import com.fasterxml.jackson.databind.module.SimpleModule
25+
import com.njkim.reactivecrypto.core.ExchangeJsonObjectMapper
26+
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
27+
import com.njkim.reactivecrypto.core.common.model.order.OrderBookUnit
28+
import com.njkim.reactivecrypto.core.common.model.order.OrderSideType
29+
import com.njkim.reactivecrypto.core.common.model.order.TradeSideType
30+
import com.njkim.reactivecrypto.poloniex.model.*
31+
import java.math.BigDecimal
32+
import java.time.Instant
33+
import java.time.ZoneId
34+
import java.time.ZonedDateTime
35+
36+
class PoloniexJsonObjectMapper : ExchangeJsonObjectMapper {
37+
38+
companion object {
39+
val instance: ObjectMapper = com.njkim.reactivecrypto.poloniex.PoloniexJsonObjectMapper().objectMapper()
40+
}
41+
42+
override fun zonedDateTimeDeserializer(): JsonDeserializer<ZonedDateTime> {
43+
return object : JsonDeserializer<ZonedDateTime>() {
44+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): ZonedDateTime {
45+
return Instant.ofEpochSecond(p.valueAsLong).atZone(ZoneId.systemDefault())
46+
}
47+
}
48+
}
49+
50+
/**
51+
* {baseCurrency}_{targetCurrency}
52+
*/
53+
override fun currencyPairDeserializer(): JsonDeserializer<CurrencyPair> {
54+
return object : JsonDeserializer<CurrencyPair>() {
55+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): CurrencyPair {
56+
val currencyPairRawValue = p.valueAsString
57+
val split = currencyPairRawValue.split("_")
58+
59+
return CurrencyPair.parse(split[1], split[0])
60+
}
61+
}
62+
}
63+
64+
override fun orderSideTypeDeserializer(): JsonDeserializer<OrderSideType>? {
65+
// <1 for bid 0 for ask>
66+
return object : JsonDeserializer<OrderSideType>() {
67+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): OrderSideType {
68+
return when (p.valueAsInt) {
69+
0 -> OrderSideType.ASK
70+
1 -> OrderSideType.BID
71+
else -> throw IllegalArgumentException()
72+
}
73+
}
74+
}
75+
}
76+
77+
override fun tradeSideTypeDeserializer(): JsonDeserializer<TradeSideType>? {
78+
// <1 for buy 0 for sell>
79+
return object : JsonDeserializer<TradeSideType>() {
80+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): TradeSideType {
81+
return when (p.valueAsInt) {
82+
0 -> TradeSideType.SELL
83+
1 -> TradeSideType.BUY
84+
else -> throw IllegalArgumentException()
85+
}
86+
}
87+
}
88+
}
89+
90+
override fun customConfiguration(simpleModule: SimpleModule) {
91+
val poloniexMessageFrameDeserializer = object : JsonDeserializer<PoloniexMessageFrame>() {
92+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): PoloniexMessageFrame {
93+
val jsonNode: JsonNode = p.codec.readTree(p)
94+
val channelId = jsonNode[0].asLong()
95+
val sequenceNumber = jsonNode[1].asLong()
96+
val events = jsonNode[2].map { event ->
97+
val eventType = PoloniexEventType.parse(event[0].asText())
98+
PoloniexJsonObjectMapper.instance.convertValue(event, eventType.classType)
99+
}
100+
101+
return PoloniexMessageFrame(
102+
channelId, sequenceNumber, events
103+
)
104+
}
105+
}
106+
107+
// ["t", "<trade id>", <1 for buy 0 for sell>, "<price>", "<size>", <timestamp>] ]
108+
val tradeEventDeserializer = object : JsonDeserializer<PoloniexTradeEvent>() {
109+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): PoloniexTradeEvent {
110+
val jsonNode: JsonNode = p.codec.readTree(p)
111+
112+
val tradeId: String = jsonNode[1].asText()
113+
val side: TradeSideType = instance.convertValue(jsonNode[2], TradeSideType::class.java)
114+
val price: BigDecimal = instance.convertValue(jsonNode[3], BigDecimal::class.java)
115+
val size: BigDecimal = instance.convertValue(jsonNode[4], BigDecimal::class.java)
116+
val timestamp: ZonedDateTime = instance.convertValue(jsonNode[5], ZonedDateTime::class.java)
117+
118+
return PoloniexTradeEvent(
119+
tradeId,
120+
side,
121+
price,
122+
size,
123+
timestamp
124+
)
125+
}
126+
}
127+
128+
val orderBookUpdateEventDeserializer = object : JsonDeserializer<PoloniexOrderBookUpdateEvent>() {
129+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): PoloniexOrderBookUpdateEvent {
130+
val jsonNode: JsonNode = p.codec.readTree(p)
131+
132+
val side: OrderSideType = instance.convertValue(jsonNode[1], OrderSideType::class.java)
133+
val price: BigDecimal = instance.convertValue(jsonNode[2], BigDecimal::class.java)
134+
val size: BigDecimal = instance.convertValue(jsonNode[3], BigDecimal::class.java)
135+
136+
return PoloniexOrderBookUpdateEvent(
137+
side,
138+
price,
139+
size
140+
)
141+
}
142+
}
143+
144+
/**
145+
* [
146+
* "i",
147+
* {
148+
* "currencyPair": "<currency pair name>",
149+
* "orderBook": [
150+
* { "<lowest ask price>": "<lowest ask size>", "<next ask price>": "<next ask size>", ... },
151+
* { "<highest bid price>": "<highest bid size>", "<next bid price>": "<next bid size>", ... }
152+
* ]
153+
* }
154+
* ]
155+
*/
156+
val orderBookSnapshotEventDeserializer = object : JsonDeserializer<PoloniexOrderBookSnapshotEvent>() {
157+
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): PoloniexOrderBookSnapshotEvent {
158+
val jsonNode: JsonNode = p.codec.readTree(p)
159+
160+
val orderBookSnapshotNode = jsonNode.get(1)
161+
val currencyPair =
162+
instance.convertValue(orderBookSnapshotNode.get("currencyPair"), CurrencyPair::class.java)
163+
val orderBookNode = orderBookSnapshotNode.get("orderBook")
164+
val asksNode = orderBookNode.get(0)
165+
val asks = asksNode.fields().asSequence().toList().map { mutableEntry ->
166+
val price = mutableEntry.key
167+
val size = mutableEntry.value
168+
169+
OrderBookUnit(
170+
instance.convertValue(price, BigDecimal::class.java),
171+
instance.convertValue(size, BigDecimal::class.java),
172+
OrderSideType.ASK
173+
)
174+
}
175+
176+
val bidsNode = orderBookNode.get(1)
177+
val bids = bidsNode.fields().asSequence().toList().map { mutableEntry ->
178+
val price = mutableEntry.key
179+
val size = mutableEntry.value
180+
181+
OrderBookUnit(
182+
instance.convertValue(price, BigDecimal::class.java),
183+
instance.convertValue(size, BigDecimal::class.java),
184+
OrderSideType.BID
185+
)
186+
}
187+
188+
return PoloniexOrderBookSnapshotEvent(
189+
currencyPair,
190+
bids,
191+
asks
192+
)
193+
}
194+
}
195+
196+
simpleModule.addDeserializer(PoloniexMessageFrame::class.java, poloniexMessageFrameDeserializer)
197+
simpleModule.addDeserializer(PoloniexTradeEvent::class.java, tradeEventDeserializer)
198+
simpleModule.addDeserializer(PoloniexOrderBookUpdateEvent::class.java, orderBookUpdateEventDeserializer)
199+
simpleModule.addDeserializer(PoloniexOrderBookSnapshotEvent::class.java, orderBookSnapshotEventDeserializer)
200+
}
201+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.njkim.reactivecrypto.poloniex
2+
3+
import com.fasterxml.jackson.module.kotlin.readValue
4+
import com.njkim.reactivecrypto.core.common.model.currency.CurrencyPair
5+
import com.njkim.reactivecrypto.poloniex.model.PoloniexEventType
6+
import com.njkim.reactivecrypto.poloniex.model.PoloniexMessageFrame
7+
import com.njkim.reactivecrypto.poloniex.model.PoloniexOrderBookSnapshotEvent
8+
import reactor.core.publisher.Flux
9+
import reactor.core.publisher.toFlux
10+
import reactor.netty.http.client.HttpClient
11+
12+
class PoloniexRawWebsocketClient {
13+
private val baseUrl: String = "wss://api2.poloniex.com"
14+
15+
/**
16+
* Subscribe to price aggregated depth of book by currency pair.
17+
* Response includes an initial book snapshot, book modifications, and trades.
18+
* Book modification updates with 0 quantity should be treated as removal of the price level.
19+
* Note that the updates are price aggregated and do not contain individual orders.
20+
*
21+
*/
22+
fun priceAggregatedBook(currencyPairs: List<CurrencyPair>): Flux<PoloniexMessageFrame> {
23+
val channelIdCurrencyPairMap: MutableMap<Long, CurrencyPair> = HashMap()
24+
25+
// { "command": "subscribe", "channel": "<channel id>" }
26+
val subscribeChannels = currencyPairs
27+
.map { "${it.baseCurrency}_${it.targetCurrency}" }
28+
.map { "{ \"command\": \"subscribe\", \"channel\": \"$it\" }" }
29+
.toFlux()
30+
31+
// TODO heartbeat check
32+
return HttpClient.create()
33+
.websocket(655360)
34+
.uri(baseUrl)
35+
.handle { inbound, outbound ->
36+
outbound.sendString(subscribeChannels)
37+
.then()
38+
.thenMany(inbound.receive().asString())
39+
}
40+
.filter { it != "[1010]" } // ping message
41+
.map { PoloniexJsonObjectMapper.instance.readValue<PoloniexMessageFrame>(it) }
42+
// set currencyPair info for each channel
43+
.doOnNext { messageFrame ->
44+
val orderBookSnapshotEvent = messageFrame.events
45+
.filter { it.eventType == PoloniexEventType.ORDER_BOOK_SNAPSHOT }
46+
.map { it as PoloniexOrderBookSnapshotEvent }
47+
.firstOrNull()
48+
49+
if (orderBookSnapshotEvent != null) {
50+
val channelId = messageFrame.channelId
51+
channelIdCurrencyPairMap[channelId] = orderBookSnapshotEvent.currencyPair
52+
}
53+
}
54+
.doOnNext { messageFrame ->
55+
val channelId = messageFrame.channelId
56+
messageFrame.events
57+
.forEach { event ->
58+
event.currencyPair = channelIdCurrencyPairMap[channelId]!!
59+
}
60+
}
61+
.doFinally {
62+
channelIdCurrencyPairMap.clear()
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)