Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Binance backend #170

Merged
merged 6 commits into from
May 24, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 140 additions & 48 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

"""
from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import asdict, field
from types import ModuleType
from typing import List, Dict, Any, Tuple, Union, Optional
import json
Expand All @@ -45,7 +44,7 @@


from .api import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray

Expand All @@ -64,12 +63,14 @@
('low', float),
('close', float),
('volume', float),
('close_time', int),
('quote_vol', float),
('num_trades', int),
('buy_base_vol', float),
('buy_quote_vol', float),
('ignore', float)
# XXX: don't need these in shm history right?
# ('close_time', int),
# ('quote_vol', float),
# ('num_trades', int),
# ('buy_base_vol', float),
# ('buy_quote_vol', float),
# ('ignore', float),
('bar_wap', float), # will be zeroed by sampler if not filled
]

# UI components allow this to be declared such that additional
Expand All @@ -78,9 +79,10 @@

_show_wap_in_history = False


# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel):
symbol: str
symbol: str
status: str

baseAsset: str
Expand All @@ -99,42 +101,67 @@ class Pair(BaseModel):
quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool
isMarginTradingAllowed: bool

filters: List[Dict[str, Union[str, int, float]]]
permissions: List[str]


# TODO: this isn't being used yet right?
@dataclass
class OHLC:
"""Description of the flattened OHLC quote format.

For schema details see:
https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams
https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams

documented format:
```
[
[
1499040000000, // Open time
"0.01634790", // Open
"0.80000000", // High
"0.01575800", // Low
"0.01577100", // Close
"148976.11427815", // Volume
1499644799999, // Close time
"2434.19055334", // Quote asset volume
308, // Number of trades
"1756.87402397", // Taker buy base asset volume
"28.46694368", // Taker buy quote asset volume
"17928899.62484339" // Ignore.
]
]
```

"""
start_time: int
end_time: int
symbol: str
interval: str
first_id: int
last_id: int
time: int

open: float
close: float
high: float
low: float
base_vol: float
num_trades: int
closed: bool
close: float
volume: float

close_time: int

quote_vol: float
num_trades: int
buy_base_vol: float
buy_quote_vol: float
ignore: int

# null the place holder for `bar_wap` until we
# figure out what to extract for this.
bar_wap: float = 0.0

# (sampled) generated tick data
ticks: List[Any] = field(default_factory=list)
# ticks: List[Any] = field(default_factory=list)


# convert arrow timestamp to unixtime in miliseconds
def binance_timestamp(when):
return int((when.timestamp * 1000) + (when.microsecond / 1000))
return int((when.timestamp() * 1000) + (when.microsecond / 1000))


class Client:
Expand All @@ -158,14 +185,16 @@ async def _api(
async def symbol_info(
self,
sym: Optional[str] = None
):

) -> dict:

resp = await self._api('exchangeInfo', {})
if sym is not None:
for sym_info in resp['symbols']:
if sym_info['symbol'] == sym:
return sym_info
else:
raise BrokerError(f'{sym} not found')
raise SymbolNotFound(f'{sym} not found')
else:
return resp['symbols']

Expand All @@ -176,17 +205,18 @@ async def bars(
end_time: int = None,
limit: int = 1000, # <- max allowed per query
as_np: bool = True,

) -> dict:

if start_time is None:
start_time = binance_timestamp(
arrow.utcnow()
.floor('minute')
.shift(minutes=-limit)
arrow.utcnow().floor('minute').shift(minutes=-limit)
)

if end_time is None:
end_time = binance_timestamp(arrow.utcnow())

# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api(
'klines',
{
Expand All @@ -198,12 +228,29 @@ async def bars(
}
)

new_bars = [
(i,) + tuple(
ftype(bar[j])
for j, (name, ftype) in enumerate(_ohlc_dtype[1:])
) for i, bar in enumerate(bars)
]
# TODO: pack this bars scheme into a ``pydantic`` validator type:
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data

# TODO: we should port this to ``pydantic`` to avoid doing
# manual validation ourselves..
new_bars = []
for i, bar in enumerate(bars):

bar = OHLC(*bar)

row = []
for j, (name, ftype) in enumerate(_ohlc_dtype[1:]):

# TODO: maybe we should go nanoseconds on all
# history time stamps?
if name == 'time':
# convert to epoch seconds: float
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make an issue about possibly moving to nanoseconds as int for all time stamps.
I see no reason to not be thinking about UHFT down the road.

row.append(bar.time / 1000.0)

else:
row.append(getattr(bar, name))
Copy link
Contributor

@goodboy goodboy May 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This felt really really clunky.

What's a better way to one shot validate these?
I don't know if you can unpack *bar in a pydantic model..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I mucked with this for a while and got nowhere too fancy 😂

Since our dtype is technically matching the dataclass we could actually probably just use asdict if we wanted to do it this way:

[nav] In [116]: @dataclass
           ...: class C:
           ...:     doggy: int
           ...:     num: int

[nav] In [117]: l = [C(doggy=9, num=10), C(doggy=11, num=12)]

[nav] In [118]: np.array(list(list(asdict(c).values()) for c in l), dtype={'names': ('doggy', 'num',), 'formats': (int, int)})
Out[118]:
array([[( 9,  9), (10, 10)],
       [(11, 11), (12, 12)]], dtype=[('doggy', '<i8'), ('num', '<i8')])

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a pydantic.BaseModel.dict() would require array unpacking each bar into dict form since I still don't think pydantic.BaseModel supports the OHLC(*bar) style creation.

Not sure it'd be worth it anyway for readability.


new_bars.append((i,) + tuple(row))

array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
Expand All @@ -214,15 +261,32 @@ async def get_client() -> Client:
yield Client()


# validation type
class AggTrade(BaseModel):
e: str # "aggTrade", # Event type
E: int # 123456789, # Event time
s: str # "BNBBTC", # Symbol
a: int # 12345, # Aggregate trade ID
p: float # "0.001", # Price
q: float # "100", # Quantity
f: int # 100, # First trade ID
l: int # 105, # Last trade ID
T: int # 123456785, # Trade time
m: bool # true, # Is the buyer the market maker?
M: bool # true # Ignore


async def stream_messages(ws):
while True:

with trio.move_on_after(5) as cs:
with trio.move_on_after(5):
msg = await ws.recv_msg()

# for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys
if list(msg.keys()) == ['u', 's', 'b', 'B', 'a', 'A']:
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams

if msg.get('u'):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that was my other question: can we do similar BaseModel unpacking/validation here for l1 messages?

sym = msg['s']
bid = float(msg['b'])
bsize = float(msg['B'])
Expand All @@ -239,6 +303,25 @@ async def stream_messages(ws):
]
}

elif msg.get('e') == 'aggTrade':

# validate
msg = AggTrade(**msg)

# TODO: type out and require this quote format
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely part of writing up the backend spec.

# from all backends!
yield 'trade', {
'symbol': msg.s,
'last': msg.p,
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': msg.p,
'size': msg.q,
'broker_ts': msg.T,
}],
}


def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
"""Create a request subscription packet dict.
Expand Down Expand Up @@ -395,31 +478,40 @@ async def stream_quotes(

async with open_autorecon_ws('wss://stream.binance.com/ws') as ws:

# XXX: setup subs
# setup subs

# trade data (aka L1)
# https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
l1_sub = make_sub(symbols, 'bookTicker', uid)
uid += 1

await ws.send_msg(l1_sub)

# aggregate (each order clear by taker **not** by maker)
# trades data:
# https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
agg_trades_sub = make_sub(symbols, 'aggTrade', uid)
await ws.send_msg(agg_trades_sub)

# ack from ws server
res = await ws.recv_msg()
assert res['id'] == uid

# pull a first quote and deliver
msg_gen = stream_messages(ws)

# TODO: use ``anext()`` when it lands in 3.10!
typ, tick = await msg_gen.__anext__()
typ, quote = await msg_gen.__anext__()

first_quote = {tick['symbol']: tick}
while typ != 'trade':
# TODO: use ``anext()`` when it lands in 3.10!
typ, quote = await msg_gen.__anext__()

first_quote = {quote['symbol']: quote}
task_status.started((init_msgs, first_quote))

# signal to caller feed is ready for consumption
feed_is_live.set()

# start streaming
async for typ, msg in msg_gen:

if typ == 'l1':
topic = msg['symbol']
quote = msg

await send_chan.send({topic: quote})
topic = msg['symbol']
await send_chan.send({topic: msg})