Skip to content

Commit

Permalink
Merge pull request #2 from pikers/binance_aggtrades_and_ohlc_parsing
Browse files Browse the repository at this point in the history
Get binance OHLC history and quote format correct
  • Loading branch information
Guillermo Rodriguez authored May 21, 2021
2 parents 604e195 + 7b26bd4 commit 169420b
Showing 1 changed file with 140 additions and 48 deletions.
188 changes: 140 additions & 48 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"""
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import asdict, field
from types import ModuleType
from typing import List, Dict, Any, Tuple, Union, Optional
import json
Expand All @@ -45,7 +44,7 @@


from .api import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray

Expand All @@ -64,12 +63,14 @@
('low', float),
('close', float),
('volume', float),
('close_time', int),
('quote_vol', float),
('num_trades', int),
('buy_base_vol', float),
('buy_quote_vol', float),
('ignore', float)
# XXX: don't need these in shm history right?
# ('close_time', int),
# ('quote_vol', float),
# ('num_trades', int),
# ('buy_base_vol', float),
# ('buy_quote_vol', float),
# ('ignore', float),
('bar_wap', float), # will be zeroed by sampler if not filled
]

# UI components allow this to be declared such that additional
Expand All @@ -78,9 +79,10 @@

_show_wap_in_history = False


# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel):
symbol: str
symbol: str
status: str

baseAsset: str
Expand All @@ -99,42 +101,67 @@ class Pair(BaseModel):
quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool
isMarginTradingAllowed: bool

filters: List[Dict[str, Union[str, int, float]]]
permissions: List[str]


# TODO: this isn't being used yet right?
@dataclass
class OHLC:
"""Description of the flattened OHLC quote format.
For schema details see:
https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams
https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams
documented format:
```
[
[
1499040000000, // Open time
"0.01634790", // Open
"0.80000000", // High
"0.01575800", // Low
"0.01577100", // Close
"148976.11427815", // Volume
1499644799999, // Close time
"2434.19055334", // Quote asset volume
308, // Number of trades
"1756.87402397", // Taker buy base asset volume
"28.46694368", // Taker buy quote asset volume
"17928899.62484339" // Ignore.
]
]
```
"""
start_time: int
end_time: int
symbol: str
interval: str
first_id: int
last_id: int
time: int

open: float
close: float
high: float
low: float
base_vol: float
num_trades: int
closed: bool
close: float
volume: float

close_time: int

quote_vol: float
num_trades: int
buy_base_vol: float
buy_quote_vol: float
ignore: int

# null the place holder for `bar_wap` until we
# figure out what to extract for this.
bar_wap: float = 0.0

# (sampled) generated tick data
ticks: List[Any] = field(default_factory=list)
# ticks: List[Any] = field(default_factory=list)


# convert arrow timestamp to unixtime in miliseconds
def binance_timestamp(when):
return int((when.timestamp * 1000) + (when.microsecond / 1000))
return int((when.timestamp() * 1000) + (when.microsecond / 1000))


class Client:
Expand All @@ -158,14 +185,16 @@ async def _api(
async def symbol_info(
self,
sym: Optional[str] = None
):

) -> dict:

resp = await self._api('exchangeInfo', {})
if sym is not None:
for sym_info in resp['symbols']:
if sym_info['symbol'] == sym:
return sym_info
else:
raise BrokerError(f'{sym} not found')
raise SymbolNotFound(f'{sym} not found')
else:
return resp['symbols']

Expand All @@ -176,17 +205,18 @@ async def bars(
end_time: int = None,
limit: int = 1000, # <- max allowed per query
as_np: bool = True,

) -> dict:

if start_time is None:
start_time = binance_timestamp(
arrow.utcnow()
.floor('minute')
.shift(minutes=-limit)
arrow.utcnow().floor('minute').shift(minutes=-limit)
)

if end_time is None:
end_time = binance_timestamp(arrow.utcnow())

# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api(
'klines',
{
Expand All @@ -198,12 +228,29 @@ async def bars(
}
)

new_bars = [
(i,) + tuple(
ftype(bar[j])
for j, (name, ftype) in enumerate(_ohlc_dtype[1:])
) for i, bar in enumerate(bars)
]
# TODO: pack this bars scheme into a ``pydantic`` validator type:
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data

# TODO: we should port this to ``pydantic`` to avoid doing
# manual validation ourselves..
new_bars = []
for i, bar in enumerate(bars):

bar = OHLC(*bar)

row = []
for j, (name, ftype) in enumerate(_ohlc_dtype[1:]):

# TODO: maybe we should go nanoseconds on all
# history time stamps?
if name == 'time':
# convert to epoch seconds: float
row.append(bar.time / 1000.0)

else:
row.append(getattr(bar, name))

new_bars.append((i,) + tuple(row))

array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
Expand All @@ -214,15 +261,32 @@ async def get_client() -> Client:
yield Client()


# validation type
class AggTrade(BaseModel):
e: str # "aggTrade", # Event type
E: int # 123456789, # Event time
s: str # "BNBBTC", # Symbol
a: int # 12345, # Aggregate trade ID
p: float # "0.001", # Price
q: float # "100", # Quantity
f: int # 100, # First trade ID
l: int # 105, # Last trade ID
T: int # 123456785, # Trade time
m: bool # true, # Is the buyer the market maker?
M: bool # true # Ignore


async def stream_messages(ws):
while True:

with trio.move_on_after(5) as cs:
with trio.move_on_after(5):
msg = await ws.recv_msg()

# for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys
if list(msg.keys()) == ['u', 's', 'b', 'B', 'a', 'A']:
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams

if msg.get('u'):
sym = msg['s']
bid = float(msg['b'])
bsize = float(msg['B'])
Expand All @@ -239,6 +303,25 @@ async def stream_messages(ws):
]
}

elif msg.get('e') == 'aggTrade':

# validate
msg = AggTrade(**msg)

# TODO: type out and require this quote format
# from all backends!
yield 'trade', {
'symbol': msg.s,
'last': msg.p,
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': msg.p,
'size': msg.q,
'broker_ts': msg.T,
}],
}


def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
"""Create a request subscription packet dict.
Expand Down Expand Up @@ -395,31 +478,40 @@ async def stream_quotes(

async with open_autorecon_ws('wss://stream.binance.com/ws') as ws:

# XXX: setup subs
# setup subs

# trade data (aka L1)
# https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
l1_sub = make_sub(symbols, 'bookTicker', uid)
uid += 1

await ws.send_msg(l1_sub)

# aggregate (each order clear by taker **not** by maker)
# trades data:
# https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
agg_trades_sub = make_sub(symbols, 'aggTrade', uid)
await ws.send_msg(agg_trades_sub)

# ack from ws server
res = await ws.recv_msg()
assert res['id'] == uid

# pull a first quote and deliver
msg_gen = stream_messages(ws)

# TODO: use ``anext()`` when it lands in 3.10!
typ, tick = await msg_gen.__anext__()
typ, quote = await msg_gen.__anext__()

first_quote = {tick['symbol']: tick}
while typ != 'trade':
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__()

first_quote = {quote['symbol']: quote}
task_status.started((init_msgs, first_quote))

# signal to caller feed is ready for consumption
feed_is_live.set()

# start streaming
async for typ, msg in msg_gen:

if typ == 'l1':
topic = msg['symbol']
quote = msg

await send_chan.send({topic: quote})
topic = msg['symbol']
await send_chan.send({topic: msg})

0 comments on commit 169420b

Please sign in to comment.