Skip to content

Commit

Permalink
Got rid of websocket OHLC API, and added l1 tick streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillermo Rodriguez committed May 7, 2021
1 parent 7e49362 commit 604e195
Showing 1 changed file with 28 additions and 96 deletions.
124 changes: 28 additions & 96 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,6 @@
_url = 'https://api.binance.com'


# Broker specific ohlc schema (websocket)
_websocket_ohlc_dtype = [
('index', int),
('time', int),
('close_time', int),
('symbol', str),
('interval', str),
('first_trade_id', int),
('last_trade_id', int),
('open', float),
('close', float),
('high', float),
('low', float),
('volume', float),
('num_trades', int),
('closed', bool),
('quote_asset_volume', float),
('taker_buy_base_asset_volume', float),
('taker_buy_quote_asset_volume', float),
('ignore', int)
]

# Broker specific ohlc schema (rest)
_ohlc_dtype = [
('index', int),
Expand Down Expand Up @@ -150,6 +128,8 @@ class OHLC:
buy_base_vol: float
buy_quote_vol: float
ignore: int
# (sampled) generated tick data
ticks: List[Any] = field(default_factory=list)


# convert arrow timestamp to unixtime in miliseconds
Expand Down Expand Up @@ -235,30 +215,29 @@ async def get_client() -> Client:


async def stream_messages(ws):

too_slow_count = last_hb = 0

while True:

with trio.move_on_after(5) as cs:
msg = await ws.recv_msg()

if msg.get('e') == 'kline':

yield 'ohlc', OHLC(*msg['k'].values())


def normalize(
ohlc: OHLC,
) -> dict:
quote = asdict(ohlc)
quote['broker_ts'] = quote['start_time']
quote['brokerd_ts'] = time.time()
quote['last'] = quote['close']
quote['time'] = quote['start_time']

# print(quote)
return ohlc.symbol, quote
# for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys
if list(msg.keys()) == ['u', 's', 'b', 'B', 'a', 'A']:
sym = msg['s']
bid = float(msg['b'])
bsize = float(msg['B'])
ask = float(msg['a'])
asize = float(msg['A'])

yield 'l1', {
'symbol': sym,
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize}
]
}


def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]:
Expand Down Expand Up @@ -389,10 +368,6 @@ async def stream_quotes(
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,

) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol><fiat_symbol>.
"""
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)

Expand Down Expand Up @@ -421,14 +396,9 @@ async def stream_quotes(
async with open_autorecon_ws('wss://stream.binance.com/ws') as ws:

# XXX: setup subs
ohlc_sub = make_sub(symbols, 'kline_1m', uid)
uid += 1

await ws.send_msg(ohlc_sub)
res = await ws.recv_msg()

# trade data (aka L1)
l1_sub = make_sub(symbols, 'trade', uid)
l1_sub = make_sub(symbols, 'bookTicker', uid)
uid += 1

await ws.send_msg(l1_sub)
Expand All @@ -438,56 +408,18 @@ async def stream_quotes(
msg_gen = stream_messages(ws)

# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__()

topic, quote = normalize(ohlc_last)
typ, tick = await msg_gen.__anext__()

first_quote = {topic: quote}
first_quote = {tick['symbol']: tick}
task_status.started((init_msgs, first_quote))

# lol, only "closes" when they're margin squeezing clients ;P
feed_is_live.set()

# keep start of last interval for volume tracking
last_interval_start = ohlc_last.end_time

# start streaming
async for typ, ohlc in msg_gen:
...
# if typ == 'ohlc':

# # TODO: can get rid of all this by using
# # ``trades`` subscription...

# # generate tick values to match time & sales pane:
# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
# volume = ohlc.volume

# # new OHLC sample interval
# if ohlc.etime > last_interval_start:
# last_interval_start = ohlc.etime
# tick_volume = volume

# else:
# # this is the tick volume *within the interval*
# tick_volume = volume - ohlc_last.volume

# ohlc_last = ohlc
# last = ohlc.close

# if tick_volume:
# ohlc.ticks.append({
# 'type': 'trade',
# 'price': last,
# 'size': tick_volume,
# })

# topic, quote = normalize(ohlc)
async for typ, msg in msg_gen:

# elif typ == 'l1':
# quote = ohlc
# topic = quote['symbol']
if typ == 'l1':
topic = msg['symbol']
quote = msg

# # XXX: format required by ``tractor.msg.pub``
# # requires a ``Dict[topic: str, quote: dict]``
# await send_chan.send({topic: quote})
await send_chan.send({topic: quote})

0 comments on commit 604e195

Please sign in to comment.