Skip to content

Commit

Permalink
Get binance OHLC history and quote format correct
Browse files Browse the repository at this point in the history
This gets the binance provider meeting the data feed schema requirements
of both the OHLC sampling/charting machinery as well as proper
formatting of historical OHLC history.

Notably,
- spec a minimal ohlc dtype based on the kline endpoint
- use a dataclass to parse out OHLC bar datums and pack into np.ndarray/shm
- add the ``aggTrade`` endpoint to get last clearing (traded) prices,
  validate with ``pydantic`` and then normalize these into our tick-quote
  format for delivery over the feed stream api.
- a notable requirement is that the "first" quote from the feed must
  contain a 'last` field so the clearing system can start up correctly.
  • Loading branch information
goodboy committed May 21, 2021
1 parent 604e195 commit 7b26bd4
Showing 1 changed file with 140 additions and 48 deletions.
188 changes: 140 additions & 48 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"""
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import asdict, field
from types import ModuleType
from typing import List, Dict, Any, Tuple, Union, Optional
import json
Expand All @@ -45,7 +44,7 @@


from .api import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray

Expand All @@ -64,12 +63,14 @@
('low', float),
('close', float),
('volume', float),
('close_time', int),
('quote_vol', float),
('num_trades', int),
('buy_base_vol', float),
('buy_quote_vol', float),
('ignore', float)
# XXX: don't need these in shm history right?
# ('close_time', int),
# ('quote_vol', float),
# ('num_trades', int),
# ('buy_base_vol', float),
# ('buy_quote_vol', float),
# ('ignore', float),
('bar_wap', float), # will be zeroed by sampler if not filled
]

# UI components allow this to be declared such that additional
Expand All @@ -78,9 +79,10 @@

_show_wap_in_history = False


# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel):
symbol: str
symbol: str
status: str

baseAsset: str
Expand All @@ -99,42 +101,67 @@ class Pair(BaseModel):
quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool
isMarginTradingAllowed: bool

filters: List[Dict[str, Union[str, int, float]]]
permissions: List[str]


# TODO: this isn't being used yet right?
@dataclass
class OHLC:
"""Description of the flattened OHLC quote format.
For schema details see:
https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams
https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams
documented format:
```
[
[
1499040000000, // Open time
"0.01634790", // Open
"0.80000000", // High
"0.01575800", // Low
"0.01577100", // Close
"148976.11427815", // Volume
1499644799999, // Close time
"2434.19055334", // Quote asset volume
308, // Number of trades
"1756.87402397", // Taker buy base asset volume
"28.46694368", // Taker buy quote asset volume
"17928899.62484339" // Ignore.
]
]
```
"""
start_time: int
end_time: int
symbol: str
interval: str
first_id: int
last_id: int
time: int

open: float
close: float
high: float
low: float
base_vol: float
num_trades: int
closed: bool
close: float
volume: float

close_time: int

quote_vol: float
num_trades: int
buy_base_vol: float
buy_quote_vol: float
ignore: int

# null the place holder for `bar_wap` until we
# figure out what to extract for this.
bar_wap: float = 0.0

# (sampled) generated tick data
ticks: List[Any] = field(default_factory=list)
# ticks: List[Any] = field(default_factory=list)


# convert arrow timestamp to unixtime in miliseconds
def binance_timestamp(when):
return int((when.timestamp * 1000) + (when.microsecond / 1000))
return int((when.timestamp() * 1000) + (when.microsecond / 1000))


class Client:
Expand All @@ -158,14 +185,16 @@ async def _api(
async def symbol_info(
self,
sym: Optional[str] = None
):

) -> dict:

resp = await self._api('exchangeInfo', {})
if sym is not None:
for sym_info in resp['symbols']:
if sym_info['symbol'] == sym:
return sym_info
else:
raise BrokerError(f'{sym} not found')
raise SymbolNotFound(f'{sym} not found')
else:
return resp['symbols']

Expand All @@ -176,17 +205,18 @@ async def bars(
end_time: int = None,
limit: int = 1000, # <- max allowed per query
as_np: bool = True,

) -> dict:

if start_time is None:
start_time = binance_timestamp(
arrow.utcnow()
.floor('minute')
.shift(minutes=-limit)
arrow.utcnow().floor('minute').shift(minutes=-limit)
)

if end_time is None:
end_time = binance_timestamp(arrow.utcnow())

# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api(
'klines',
{
Expand All @@ -198,12 +228,29 @@ async def bars(
}
)

new_bars = [
(i,) + tuple(
ftype(bar[j])
for j, (name, ftype) in enumerate(_ohlc_dtype[1:])
) for i, bar in enumerate(bars)
]
# TODO: pack this bars scheme into a ``pydantic`` validator type:
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data

# TODO: we should port this to ``pydantic`` to avoid doing
# manual validation ourselves..
new_bars = []
for i, bar in enumerate(bars):

bar = OHLC(*bar)

row = []
for j, (name, ftype) in enumerate(_ohlc_dtype[1:]):

# TODO: maybe we should go nanoseconds on all
# history time stamps?
if name == 'time':
# convert to epoch seconds: float
row.append(bar.time / 1000.0)

else:
row.append(getattr(bar, name))

new_bars.append((i,) + tuple(row))

array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
Expand All @@ -214,15 +261,32 @@ async def get_client() -> Client:
yield Client()


# validation type
class AggTrade(BaseModel):
e: str # "aggTrade", # Event type
E: int # 123456789, # Event time
s: str # "BNBBTC", # Symbol
a: int # 12345, # Aggregate trade ID
p: float # "0.001", # Price
q: float # "100", # Quantity
f: int # 100, # First trade ID
l: int # 105, # Last trade ID
T: int # 123456785, # Trade time
m: bool # true, # Is the buyer the market maker?
M: bool # true # Ignore


async def stream_messages(ws):
while True:

with trio.move_on_after(5) as cs:
with trio.move_on_after(5):
msg = await ws.recv_msg()

# for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys
if list(msg.keys()) == ['u', 's', 'b', 'B', 'a', 'A']:
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams

if msg.get('u'):
sym = msg['s']
bid = float(msg['b'])
bsize = float(msg['B'])
Expand All @@ -239,6 +303,25 @@ async def stream_messages(ws):
]
}

elif msg.get('e') == 'aggTrade':

# validate
msg = AggTrade(**msg)

# TODO: type out and require this quote format
# from all backends!
yield 'trade', {
'symbol': msg.s,
'last': msg.p,
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': msg.p,
'size': msg.q,
'broker_ts': msg.T,
}],
}


def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
"""Create a request subscription packet dict.
Expand Down Expand Up @@ -395,31 +478,40 @@ async def stream_quotes(

async with open_autorecon_ws('wss://stream.binance.com/ws') as ws:

# XXX: setup subs
# setup subs

# trade data (aka L1)
# https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
l1_sub = make_sub(symbols, 'bookTicker', uid)
uid += 1

await ws.send_msg(l1_sub)

# aggregate (each order clear by taker **not** by maker)
# trades data:
# https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
agg_trades_sub = make_sub(symbols, 'aggTrade', uid)
await ws.send_msg(agg_trades_sub)

# ack from ws server
res = await ws.recv_msg()
assert res['id'] == uid

# pull a first quote and deliver
msg_gen = stream_messages(ws)

# TODO: use ``anext()`` when it lands in 3.10!
typ, tick = await msg_gen.__anext__()
typ, quote = await msg_gen.__anext__()

first_quote = {tick['symbol']: tick}
while typ != 'trade':
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__()

first_quote = {quote['symbol']: quote}
task_status.started((init_msgs, first_quote))

# signal to caller feed is ready for consumption
feed_is_live.set()

# start streaming
async for typ, msg in msg_gen:

if typ == 'l1':
topic = msg['symbol']
quote = msg

await send_chan.send({topic: quote})
topic = msg['symbol']
await send_chan.send({topic: msg})

0 comments on commit 7b26bd4

Please sign in to comment.