-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathrebalancing.py
122 lines (100 loc) · 4.36 KB
/
rebalancing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
Determine if positions are further than a predetermined amount from the ideal
positional allocation. If they are, rebalance the portfolio by selling assets first
and buying afterwards.
"""
# Third-party imports
from alpaca_trade_api.rest import APIError # handle this error
# Standard library imports
import threading
import time
# Project modules
from config import Config
import utils
from utils import alpaca
import errors
from allocation import allocation # the dictionary
# Ideal allocation for each ETF, keyed by symbol. Every entry of `allocation`
# is indexed positionally: element 0 is the ideal allocation and element 1 is
# the symbol it applies to.
etf_allocations = {
    entry[1]: entry[0]
    for entry in allocation.values()
}
def _fractional_order_errorhandling(side: str, symbol: str, amount: float):
    """Place a fractional order, skipping it on insufficient buying power.

    Thin wrapper around `utils.fractional_order` designed for rebalancing
    orders: if the order is rejected because there isn't enough buying power
    (due to timing or another reason), the problem is reported through
    `errors.report_error` and the symbol is skipped.

    Args:
        side: Order side, passed through to `utils.fractional_order`
            (the rebalancer uses 'buy' and 'sell').
        symbol: Symbol to place the order for.
        amount: Notional amount of the order (reported in dollars).

    Raises:
        APIError: Any API error other than 'insufficient buying power' is
            propagated unchanged.
    """
    try:
        utils.fractional_order(side, symbol, amount)
    except APIError as e:
        if str(e) != 'insufficient buying power':
            # Re-raise the original exception as-is. Building a new
            # `APIError(e)` would drop the original traceback, and the
            # alpaca_trade_api constructor expects the API's error payload
            # rather than an exception instance.
            raise
        errors.report_error(
            f"Error rebalancing {symbol} with {side} order for ${amount:,.2f}. "
            "Insufficient buying power. Skipped.",
            console = utils.console
        )
def _current_positions() -> dict[str, float] | str:
    """Return the market value of every allocated symbol held in the account.

    Only symbols mentioned in `allocation` are looked up.

    Returns:
        A dictionary mapping each symbol to the market value (as a float) of
        its position in the account, or a human-readable message string if
        any of those positions does not exist yet.

    Raises:
        APIError: Any API error other than 'position does not exist' is
            propagated so it isn't silently turned into a `None` result.
    """
    etfs: list[str] = [value[1] for value in allocation.values()]
    try:
        return {etf: float(alpaca.get_position(etf).market_value) for etf in etfs}
    except APIError as e:
        if str(e) != "position does not exist":
            # Unknown failure: let the caller see it instead of falling off
            # the end of the function and returning None.
            raise
        return (
            "Positions not found. "
            "Allow allocation to automatically allocate account cash."
        )
def positional_deltas() -> dict[str, float] | str:
    """Return how far each position is from its ideal allocation.

    The ideal notional value of a symbol is its entry in `etf_allocations`
    multiplied by the account equity scaled by `Config.account_multiplier`.
    The delta is the current market value minus that ideal value, rounded to
    two decimals. A negative value means the account is _under_ the
    expectation, so more of it should be bought to compensate.

    Returns:
        A dictionary of symbol -> delta containing only the positions whose
        delta is more than 2 in absolute notional value AND whose deviation
        relative to the ideal value exceeds `Config.rebalance_threshold`.
        If the current positions could not be determined, the message string
        from `_current_positions` is returned instead.
    """
    account_total = utils.account_equity() * Config.account_multiplier
    positions = _current_positions()
    if isinstance(positions, str):  # if there was an ignorable error
        return positions

    return_deltas = {}
    for symbol, market_value in positions.items():
        ideal = etf_allocations[symbol] * account_total
        delta = round(market_value - ideal, 2)
        # Ignore deltas whose notional value is not more than 2.
        if abs(delta) <= 2:
            continue
        # With an ideal value of zero the relative deviation is undefined
        # (division by zero); any remaining holding is entirely off-target,
        # so it is reported rather than crashing.
        if ideal == 0 or abs(delta) / ideal > Config.rebalance_threshold:
            return_deltas[symbol] = delta
    return return_deltas
def rebalance_portfolio() -> dict[str, float] | str:
    """Place the rebalancing orders and return the deltas that drove them.

    Sell orders (`delta > 0`, the account is _over_ the ideal allocation) are
    placed first to free up buying power; buy orders (`delta < 0`, the account
    is _under_ the ideal allocation) are placed afterwards. Each order runs in
    its own thread through `_fractional_order_errorhandling`.

    Returns:
        The dictionary from `positional_deltas`, or a message string when the
        portfolio cannot be rebalanced (untracked positions are present, or
        `positional_deltas` itself returned a message).
    """
    # Ensure no other positions are in the account
    account_positions = [position.symbol for position in alpaca.list_positions()]
    acceptable_etfs = {value[1] for value in allocation.values()}
    if any(symbol not in acceptable_etfs for symbol in account_positions):
        return "Cannot rebalance portfolio as untracked positions were detected."

    deltas = positional_deltas()
    if isinstance(deltas, str):  # if there was an ignorable, deliverable error
        return deltas

    # Sell orders first to free up buying power
    sell_threads = []
    for position, delta in deltas.items():
        if delta > 0:
            thread = threading.Thread(
                target = _fractional_order_errorhandling,
                args = ('sell', position, abs(delta))
            )
            thread.start()
            sell_threads.append(thread)

    if sell_threads:
        # Wait until every sell order has actually been submitted; a fixed
        # sleep alone does not guarantee the threads have finished.
        for thread in sell_threads:
            thread.join()
        # Sleep to ensure execution
        time.sleep(5)

    # Buy orders next
    for position, delta in deltas.items():
        if delta < 0:
            thread = threading.Thread(
                target = _fractional_order_errorhandling,
                args = ('buy', position, abs(delta))
            )
            thread.start()

    return deltas