-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy paththreading_example.py
49 lines (32 loc) · 1.13 KB
/
threading_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import asyncio
import pyburner
from threading import Thread
heater = pyburner.Client("111.222.333.444")
event_loop = asyncio.new_event_loop()
def background_loop(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
loop.run_forever()
def start_thread():
t = Thread(target=background_loop, args=(event_loop,), daemon=True)
t.start()
def init_websocket():
asyncio.run_coroutine_threadsafe(
heater.init_websocket(), event_loop)
def refresh_websocket():
asyncio.run_coroutine_threadsafe(
heater.refresh(), event_loop)
def close_websocket():
asyncio.run_coroutine_threadsafe(
heater.close_websocket(), event_loop)
def set_tempdesired(temperature: int):
future = asyncio.run_coroutine_threadsafe(
heater.set_tempdesired(temperature), event_loop)
print(future.result())
def set_runmode(run_mode: int):
future = asyncio.run_coroutine_threadsafe(
heater.set_runmode(run_mode), event_loop)
print(future.result())
def fetch_data(request):
future = asyncio.run_coroutine_threadsafe(
heater.fetch_data(request), event_loop)
print(future.result())