Skip to content

Commit 3fad1a6

Browse files
committed
Datafeed Loop v2 implementation
In the BdkDatafeedConfig class, check if the `datafeed.version` configuration is specified. If not, the datafeed version equals `v1` In the ServiceFactory class, check if the datafeed version is configured as `v2`, return the DatafeedLoopV2 instance. Otherwise, the DatafeedLoopV1 will be used by default. Implementation the datafeed v2 loop service in DatafeedLoopV2 class
1 parent abd52a7 commit 3fad1a6

File tree

5 files changed

+112
-7
lines changed

5 files changed

+112
-7
lines changed

symphony/bdk/core/config/model/bdk_datafeed_config.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def __init__(self, config):
1515
1616
:param config: the dict containing the datafeed specific confguration.
1717
"""
18-
self.version = DF_V1
18+
self.version = config.get(VERSION) if VERSION in config else DF_V1
1919
self.id_file_path = ""
2020
if config is not None and DF_ID_FILE_PATH in config:
2121
self.id_file_path = Path(config.get(DF_ID_FILE_PATH))

symphony/bdk/core/service/datafeed/abstract_datafeed_loop.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,21 @@
66
from enum import Enum
77
from typing import List
88

9-
from symphony.bdk.core.config.model.bdk_config import BdkConfig
109
from symphony.bdk.core.auth.auth_session import AuthSession
11-
12-
from symphony.bdk.gen.agent_api.datafeed_api import DatafeedApi
10+
from symphony.bdk.core.config.model.bdk_config import BdkConfig
1311
from symphony.bdk.core.service.datafeed.real_time_event_listener import RealTimeEventListener
12+
from symphony.bdk.gen.agent_api.datafeed_api import DatafeedApi
1413
from symphony.bdk.gen.agent_model.v4_event import V4Event
1514

1615
logger = logging.getLogger(__name__)
1716

17+
18+
class DatafeedVersion(Enum):
19+
20+
V1 = "v1"
21+
V2 = "v2"
22+
23+
1824
class RealTimeEvent(Enum):
1925
"""This enum lists all possible
2026
`Real Time Events <https://docs.developers.symphony.com/building-bots-on-symphony/datafeed/real-time-events>`_
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
from symphony.bdk.core.auth.auth_session import AuthSession
2+
from symphony.bdk.core.config.model.bdk_config import BdkConfig
3+
from symphony.bdk.core.service.datafeed.abstract_datafeed_loop import AbstractDatafeedLoop
4+
from symphony.bdk.gen import ApiException
5+
from symphony.bdk.gen.agent_api.datafeed_api import DatafeedApi
6+
from symphony.bdk.gen.agent_model.ack_id import AckId
7+
from symphony.bdk.gen.agent_model.v5_datafeed import V5Datafeed
8+
from typing import Optional
9+
10+
11+
class DatafeedLoopV2(AbstractDatafeedLoop):
12+
"""A class for implementing the datafeed v2 loop service.
13+
14+
This service will be started by calling :func:`~DatafeedLoopV2.start`.
15+
16+
On the very first run, the BDK bot will try to retrieve the list of datafeed to which it is listening.
17+
Since each bot should only listening to just one datafeed, the first datafeed in the list will be used by
18+
the bot to be listened to. If the retrieved list is empty, the BDK bot will create a new datafeed to listen.
19+
20+
The BDK bot will listen to this datafeed to get all the received real-time events.
21+
22+
If this datafeed becomes stale or faulty, the BDK bot will create the new one for listening.
23+
24+
This service will be stopped by calling :func:`~DatafeedLoopV1.stop`
25+
26+
If the datafeed service is stopped during a read datafeed call, it has to wait until the last read finish to be
27+
really stopped
28+
"""
29+
30+
def __init__(self, datafeed_api: DatafeedApi, auth_session: AuthSession, config: BdkConfig):
31+
super().__init__(datafeed_api, auth_session, config)
32+
self._ack_id = ""
33+
self.started = False
34+
self.datafeed_id = None
35+
36+
async def start(self):
37+
datafeed = await self._retrieve_datafeed()
38+
if not datafeed:
39+
datafeed = await self._create_datafeed()
40+
self.datafeed_id = datafeed.id
41+
self.started = True
42+
while self.started:
43+
try:
44+
await self._read_datafeed()
45+
except ApiException as e:
46+
if e.status == 400:
47+
datafeed = await self._recreate_datafeed()
48+
self.datafeed_id = datafeed.id
49+
else:
50+
raise e
51+
52+
async def stop(self):
53+
self.started = False
54+
55+
async def _retrieve_datafeed(self) -> Optional[V5Datafeed]:
56+
session_token = await self.auth_session.session_token
57+
key_manager_token = await self.auth_session.key_manager_token
58+
datafeeds = await self.datafeed_api.list_datafeed(session_token=session_token,
59+
key_manager_token=key_manager_token)
60+
if datafeeds:
61+
return datafeeds[0]
62+
return None
63+
64+
async def _create_datafeed(self) -> V5Datafeed:
65+
session_token = await self.auth_session.session_token
66+
key_manager_token = await self.auth_session.key_manager_token
67+
return await self.datafeed_api.create_datafeed(session_token=session_token, key_manager_token=key_manager_token)
68+
69+
async def _read_datafeed(self):
70+
params = {
71+
'session_token': await self.auth_session.session_token,
72+
'key_manager_token': await self.auth_session.key_manager_token,
73+
'datafeed_id': self.datafeed_id,
74+
'ack_id': AckId(ack_id=self._ack_id)
75+
}
76+
event_list = await self.datafeed_api.read_datafeed(**params)
77+
self._ack_id = event_list.ack_id
78+
if event_list.events:
79+
await self.handle_v4_event_list(events=event_list.events)
80+
81+
async def _delete_datafeed(self) -> None:
82+
session_token = await self.auth_session.session_token
83+
key_manager_token = await self.auth_session.key_manager_token
84+
await self.datafeed_api.delete_datafeed(datafeed_id=self.datafeed_id,
85+
session_token=session_token,
86+
key_manager_token=key_manager_token)
87+
88+
async def _recreate_datafeed(self):
89+
await self._delete_datafeed()
90+
return await self._create_datafeed()

symphony/bdk/core/service_factory.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
from symphony.bdk.core.client.api_client_factory import ApiClientFactory
33
from symphony.bdk.core.config.model.bdk_config import BdkConfig
44
from symphony.bdk.core.service.connection.connection_service import ConnectionService
5+
from symphony.bdk.core.service.datafeed.abstract_datafeed_loop import DatafeedVersion, AbstractDatafeedLoop
56
from symphony.bdk.core.service.datafeed.datafeed_loop_v1 import DatafeedLoopV1
7+
from symphony.bdk.core.service.datafeed.datafeed_loop_v2 import DatafeedLoopV2
68
from symphony.bdk.core.service.message.message_service import MessageService
79
from symphony.bdk.core.service.message.multi_attachments_messages_api import MultiAttachmentsMessagesApi
810
from symphony.bdk.core.service.stream.stream_service import StreamService
@@ -94,11 +96,18 @@ def get_stream_service(self) -> StreamService:
9496
ShareApi(self._agent_client),
9597
self._auth_session)
9698

97-
def get_datafeed_loop(self) -> DatafeedLoopV1:
99+
def get_datafeed_loop(self) -> AbstractDatafeedLoop:
98100
"""Returns a fully initialized DatafeedLoop
99101
100102
:return: a new DatafeedLoop instance.
101103
"""
104+
df_version = self._config.datafeed.version
105+
if df_version.lower() == DatafeedVersion.V2.value.lower():
106+
return DatafeedLoopV2(
107+
DatafeedApi(self._agent_client),
108+
self._auth_session,
109+
self._config
110+
)
102111
return DatafeedLoopV1(
103112
DatafeedApi(self._agent_client),
104113
self._auth_session,

symphony/bdk/core/symphony_bdk.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from symphony.bdk.core.auth.exception import AuthInitializationException
44
from symphony.bdk.core.client.api_client_factory import ApiClientFactory
55
from symphony.bdk.core.service.connection.connection_service import ConnectionService
6-
from symphony.bdk.core.service.datafeed.datafeed_loop_v1 import DatafeedLoopV1
6+
from symphony.bdk.core.service.datafeed.abstract_datafeed_loop import AbstractDatafeedLoop
77
from symphony.bdk.core.service.message.message_service import MessageService
88
from symphony.bdk.core.service.stream.stream_service import StreamService
99
from symphony.bdk.core.service.user.user_service import UserService
@@ -73,7 +73,7 @@ def streams(self) -> StreamService:
7373
"""
7474
return self._stream_service
7575

76-
def datafeed(self) -> DatafeedLoopV1:
76+
def datafeed(self) -> AbstractDatafeedLoop:
7777
"""Get the Datafeed loop from the BDK entry point.
7878
7979
:return: The Datafeed Loop instance.

0 commit comments

Comments
 (0)