Skip to content

Commit 6f2a684

Browse files
PLAT-10343: Datafeed V2 service implementation (#124)
* Datafeed Loop v2 implementation In the BdkDatafeedConfig class, check if the `datafeed.version` configuration is specified. If not, the datafeed version equals `v1` In the ServiceFactory class, check if the datafeed version is configured as `v2`, return the DatafeedLoopV2 instance. Otherwise, the DatafeedLoopV1 will be used by default. Implementation the datafeed v2 loop service in DatafeedLoopV2 class * Unittest added * Update markdown documentation
1 parent 336638d commit 6f2a684

File tree

9 files changed

+379
-9
lines changed

9 files changed

+379
-9
lines changed

docs/configuration.md

+6
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ app:
8686
appId: app-id
8787
privateKey:
8888
path: path/to/private-key.pem
89+
90+
datafeed:
91+
version: v2
8992
```
9093
9194
### Configuration structure
@@ -110,3 +113,6 @@ manager which manages the key token of the bot.
110113
on pod.
111114
- `app` contains information about the extension app that the bot will use like
112115
the appId, the private key for authenticating the extension app.
116+
- `datafeed` contains information about the datafeed service that the bot will use for the `DatafeedLoop` service.
117+
If the version field is configured to `v2`, the datafeed service v2 will be used. Otherwise, the datafeed service v1
118+
will be used by default.

symphony/bdk/core/config/model/bdk_datafeed_config.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ def __init__(self, config):
1717
"""
1818
self.version = DF_V1
1919
self.id_file_path = ""
20-
if config is not None and DF_ID_FILE_PATH in config:
21-
self.id_file_path = Path(config.get(DF_ID_FILE_PATH))
20+
if config is not None:
21+
self.id_file_path = Path(config.get(DF_ID_FILE_PATH)) if DF_ID_FILE_PATH in config else ""
22+
self.version = config.get(VERSION)
2223

2324
def get_id_file_path(self) -> Path:
2425
"""

symphony/bdk/core/service/datafeed/abstract_datafeed_loop.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,21 @@
66
from enum import Enum
77
from typing import List
88

9-
from symphony.bdk.core.config.model.bdk_config import BdkConfig
109
from symphony.bdk.core.auth.auth_session import AuthSession
11-
12-
from symphony.bdk.gen.agent_api.datafeed_api import DatafeedApi
10+
from symphony.bdk.core.config.model.bdk_config import BdkConfig
1311
from symphony.bdk.core.service.datafeed.real_time_event_listener import RealTimeEventListener
12+
from symphony.bdk.gen.agent_api.datafeed_api import DatafeedApi
1413
from symphony.bdk.gen.agent_model.v4_event import V4Event
1514

1615
logger = logging.getLogger(__name__)
1716

17+
18+
class DatafeedVersion(Enum):
19+
20+
V1 = "v1"
21+
V2 = "v2"
22+
23+
1824
class RealTimeEvent(Enum):
1925
"""This enum lists all possible
2026
`Real Time Events <https://docs.developers.symphony.com/building-bots-on-symphony/datafeed/real-time-events>`_
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
from typing import Optional
2+
3+
from symphony.bdk.core.auth.auth_session import AuthSession
4+
from symphony.bdk.core.config.model.bdk_config import BdkConfig
5+
from symphony.bdk.core.service.datafeed.abstract_datafeed_loop import AbstractDatafeedLoop
6+
from symphony.bdk.gen import ApiException
7+
from symphony.bdk.gen.agent_api.datafeed_api import DatafeedApi
8+
from symphony.bdk.gen.agent_model.ack_id import AckId
9+
from symphony.bdk.gen.agent_model.v5_datafeed import V5Datafeed
10+
11+
12+
class DatafeedLoopV2(AbstractDatafeedLoop):
13+
"""A class for implementing the datafeed v2 loop service.
14+
15+
This service will be started by calling :func:`~DatafeedLoopV2.start`.
16+
17+
On the very first run, the BDK bot will try to retrieve the list of datafeed to which it is listening.
18+
Since each bot should only listening to just one datafeed, the first datafeed in the list will be used by
19+
the bot to be listened to. If the retrieved list is empty, the BDK bot will create a new datafeed to listen.
20+
21+
The BDK bot will listen to this datafeed to get all the received real-time events.
22+
23+
If this datafeed becomes stale or faulty, the BDK bot will create the new one for listening.
24+
25+
This service will be stopped by calling :func:`~DatafeedLoopV1.stop`
26+
27+
If the datafeed service is stopped during a read datafeed call, it has to wait until the last read finish to be
28+
really stopped
29+
"""
30+
31+
def __init__(self, datafeed_api: DatafeedApi, auth_session: AuthSession, config: BdkConfig):
32+
super().__init__(datafeed_api, auth_session, config)
33+
self._ack_id = ""
34+
self._started = False
35+
self._datafeed_id = None
36+
37+
async def start(self):
38+
datafeed = await self._retrieve_datafeed()
39+
if not datafeed:
40+
datafeed = await self._create_datafeed()
41+
self._datafeed_id = datafeed.id
42+
self._started = True
43+
while self._started:
44+
try:
45+
await self._read_datafeed()
46+
except ApiException as e:
47+
if e.status == 400:
48+
datafeed = await self._recreate_datafeed()
49+
self._datafeed_id = datafeed.id
50+
else:
51+
raise e
52+
53+
async def stop(self):
54+
self._started = False
55+
56+
async def _retrieve_datafeed(self) -> Optional[V5Datafeed]:
57+
session_token = await self.auth_session.session_token
58+
key_manager_token = await self.auth_session.key_manager_token
59+
datafeeds = await self.datafeed_api.list_datafeed(session_token=session_token,
60+
key_manager_token=key_manager_token)
61+
if datafeeds:
62+
return datafeeds[0]
63+
return None
64+
65+
async def _create_datafeed(self) -> V5Datafeed:
66+
session_token = await self.auth_session.session_token
67+
key_manager_token = await self.auth_session.key_manager_token
68+
return await self.datafeed_api.create_datafeed(session_token=session_token, key_manager_token=key_manager_token)
69+
70+
async def _read_datafeed(self):
71+
params = {
72+
'session_token': await self.auth_session.session_token,
73+
'key_manager_token': await self.auth_session.key_manager_token,
74+
'datafeed_id': self._datafeed_id,
75+
'ack_id': AckId(ack_id=self._ack_id)
76+
}
77+
event_list = await self.datafeed_api.read_datafeed(**params)
78+
self._ack_id = event_list.ack_id
79+
if event_list.events:
80+
await self.handle_v4_event_list(events=event_list.events)
81+
82+
async def _delete_datafeed(self) -> None:
83+
session_token = await self.auth_session.session_token
84+
key_manager_token = await self.auth_session.key_manager_token
85+
await self.datafeed_api.delete_datafeed(datafeed_id=self._datafeed_id,
86+
session_token=session_token,
87+
key_manager_token=key_manager_token)
88+
89+
async def _recreate_datafeed(self):
90+
await self._delete_datafeed()
91+
self._ack_id = ""
92+
return await self._create_datafeed()

symphony/bdk/core/service_factory.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
from symphony.bdk.core.client.api_client_factory import ApiClientFactory
33
from symphony.bdk.core.config.model.bdk_config import BdkConfig
44
from symphony.bdk.core.service.connection.connection_service import ConnectionService, OboConnectionService
5+
from symphony.bdk.core.service.datafeed.abstract_datafeed_loop import DatafeedVersion, AbstractDatafeedLoop
56
from symphony.bdk.core.service.datafeed.datafeed_loop_v1 import DatafeedLoopV1
7+
from symphony.bdk.core.service.datafeed.datafeed_loop_v2 import DatafeedLoopV2
68
from symphony.bdk.core.service.message.message_service import MessageService, OboMessageService
79
from symphony.bdk.core.service.message.multi_attachments_messages_api import MultiAttachmentsMessagesApi
810
from symphony.bdk.core.service.stream.stream_service import StreamService, OboStreamService
@@ -94,11 +96,18 @@ def get_stream_service(self) -> StreamService:
9496
ShareApi(self._agent_client),
9597
self._auth_session)
9698

97-
def get_datafeed_loop(self) -> DatafeedLoopV1:
99+
def get_datafeed_loop(self) -> AbstractDatafeedLoop:
98100
"""Returns a fully initialized DatafeedLoop
99101
100102
:return: a new DatafeedLoop instance.
101103
"""
104+
df_version = self._config.datafeed.version
105+
if df_version.lower() == DatafeedVersion.V2.value.lower():
106+
return DatafeedLoopV2(
107+
DatafeedApi(self._agent_client),
108+
self._auth_session,
109+
self._config
110+
)
102111
return DatafeedLoopV1(
103112
DatafeedApi(self._agent_client),
104113
self._auth_session,

symphony/bdk/core/symphony_bdk.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from symphony.bdk.core.client.api_client_factory import ApiClientFactory
88
from symphony.bdk.core.config.exception import BotNotConfiguredError
99
from symphony.bdk.core.service.connection.connection_service import ConnectionService
10-
from symphony.bdk.core.service.datafeed.datafeed_loop_v1 import DatafeedLoopV1
10+
from symphony.bdk.core.service.datafeed.abstract_datafeed_loop import AbstractDatafeedLoop
1111
from symphony.bdk.core.service.message.message_service import MessageService
1212
from symphony.bdk.core.service.obo_services import OboServices
1313
from symphony.bdk.core.service.stream.stream_service import StreamService
@@ -123,7 +123,7 @@ def streams(self) -> StreamService:
123123
return self._stream_service
124124

125125
@bot_service
126-
def datafeed(self) -> DatafeedLoopV1:
126+
def datafeed(self) -> AbstractDatafeedLoop:
127127
"""Get the Datafeed loop from the BDK entry point.
128128
129129
:return: The Datafeed Loop instance.

tests/core/config/models/test_bdk_datafeed_config.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pytest
44

55

6-
@pytest.fixture(params=["v1", 25, True])
6+
@pytest.fixture(params=["v1"])
77
def datafeed_version(request):
88
return request.param
99

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
from unittest.mock import MagicMock, AsyncMock, call
2+
3+
import pytest
4+
5+
from symphony.bdk.core.auth.auth_session import AuthSession
6+
from symphony.bdk.core.config.loader import BdkConfigLoader
7+
from symphony.bdk.core.service.datafeed.abstract_datafeed_loop import RealTimeEvent
8+
from symphony.bdk.core.service.datafeed.datafeed_loop_v2 import DatafeedLoopV2
9+
from symphony.bdk.core.service.datafeed.real_time_event_listener import RealTimeEventListener
10+
from symphony.bdk.gen import ApiClient, ApiException
11+
from symphony.bdk.gen.agent_api.datafeed_api import DatafeedApi
12+
from symphony.bdk.gen.agent_model.ack_id import AckId
13+
from symphony.bdk.gen.agent_model.datafeed import Datafeed
14+
from symphony.bdk.gen.agent_model.v4_event import V4Event
15+
from symphony.bdk.gen.agent_model.v4_initiator import V4Initiator
16+
from symphony.bdk.gen.agent_model.v4_message_sent import V4MessageSent
17+
from symphony.bdk.gen.agent_model.v4_payload import V4Payload
18+
from symphony.bdk.gen.agent_model.v4_user import V4User
19+
from tests.utils.resource_utils import get_config_resource_filepath
20+
21+
22+
@pytest.fixture()
23+
def auth_session():
24+
auth_session = AuthSession(None)
25+
auth_session.session_token = "session_token"
26+
auth_session.key_manager_token = "km_token"
27+
return auth_session
28+
29+
30+
@pytest.fixture()
31+
def config():
32+
config = BdkConfigLoader.load_from_file(get_config_resource_filepath("config.yaml"))
33+
config.datafeed.version = "v2"
34+
return config
35+
36+
37+
@pytest.fixture()
38+
def datafeed_api():
39+
datafeed_api = MagicMock(DatafeedApi)
40+
datafeed_api.api_client = MagicMock(ApiClient)
41+
datafeed_api.list_datafeed = AsyncMock()
42+
datafeed_api.create_datafeed = AsyncMock()
43+
datafeed_api.read_datafeed = AsyncMock()
44+
datafeed_api.delete_datafeed = AsyncMock()
45+
return datafeed_api
46+
47+
48+
@pytest.fixture()
49+
def mock_listener():
50+
return AsyncMock(wraps=RealTimeEventListener())
51+
52+
53+
@pytest.fixture()
54+
def initiator():
55+
return V4Initiator(user=V4User(username="username"))
56+
57+
58+
@pytest.fixture()
59+
def message_sent_event(initiator):
60+
class EventsMock:
61+
def __init__(self, individual_event):
62+
self.events = [individual_event]
63+
self.ack_id = "ack_id"
64+
65+
v4_event = V4Event(type=RealTimeEvent.MESSAGESENT.name,
66+
payload=V4Payload(message_sent=V4MessageSent()),
67+
initiator=initiator)
68+
event = EventsMock(v4_event)
69+
return event
70+
71+
72+
@pytest.fixture()
73+
def datafeed_loop(datafeed_api, auth_session, config):
74+
datafeed_loop = DatafeedLoopV2(datafeed_api, auth_session, config)
75+
76+
class RealTimeEventListenerImpl(RealTimeEventListener):
77+
78+
async def on_message_sent(self, initiator: V4Initiator, event: V4MessageSent):
79+
await datafeed_loop.stop()
80+
81+
datafeed_loop.subscribe(RealTimeEventListenerImpl())
82+
return datafeed_loop
83+
84+
85+
@pytest.mark.asyncio
86+
async def test_start(datafeed_loop, datafeed_api, message_sent_event):
87+
datafeed_api.list_datafeed.return_value = []
88+
datafeed_api.create_datafeed.return_value = Datafeed(id="test_id")
89+
datafeed_api.read_datafeed.return_value = message_sent_event
90+
91+
await datafeed_loop.start()
92+
93+
datafeed_api.list_datafeed.assert_called_with(
94+
session_token="session_token",
95+
key_manager_token="km_token"
96+
)
97+
datafeed_api.create_datafeed.assert_called_with(
98+
session_token="session_token",
99+
key_manager_token="km_token"
100+
)
101+
datafeed_api.read_datafeed.assert_called_with(
102+
session_token="session_token",
103+
key_manager_token="km_token",
104+
datafeed_id="test_id",
105+
ack_id=AckId(ack_id="")
106+
)
107+
108+
assert datafeed_loop._datafeed_id == "test_id"
109+
assert datafeed_loop._ack_id == "ack_id"
110+
111+
112+
@pytest.mark.asyncio
113+
async def test_start_datafeed_exist(datafeed_loop, datafeed_api, message_sent_event):
114+
datafeed_api.list_datafeed.return_value = [Datafeed(id="test_id_exist")]
115+
datafeed_api.read_datafeed.return_value = message_sent_event
116+
117+
await datafeed_loop.start()
118+
119+
datafeed_api.list_datafeed.assert_called_with(
120+
session_token="session_token",
121+
key_manager_token="km_token"
122+
)
123+
datafeed_api.read_datafeed.assert_called_with(
124+
session_token="session_token",
125+
key_manager_token="km_token",
126+
datafeed_id="test_id_exist",
127+
ack_id=AckId(ack_id="")
128+
)
129+
130+
assert datafeed_loop._datafeed_id == "test_id_exist"
131+
assert datafeed_loop._ack_id == "ack_id"
132+
133+
134+
@pytest.mark.asyncio
135+
async def test_start_datafeed_stale_datafeed(datafeed_loop, datafeed_api, message_sent_event):
136+
datafeed_api.list_datafeed.return_value = [Datafeed(id="fault_datafeed_id")]
137+
datafeed_api.create_datafeed.return_value = Datafeed(id="test_id")
138+
datafeed_api.read_datafeed.side_effect = [ApiException(400), message_sent_event]
139+
140+
await datafeed_loop.start()
141+
142+
datafeed_api.list_datafeed.assert_called_with(
143+
session_token="session_token",
144+
key_manager_token="km_token"
145+
)
146+
147+
datafeed_api.delete_datafeed.assert_called_with(
148+
session_token="session_token",
149+
key_manager_token="km_token",
150+
datafeed_id="fault_datafeed_id"
151+
)
152+
153+
datafeed_api.create_datafeed.assert_called_with(
154+
session_token="session_token",
155+
key_manager_token="km_token"
156+
)
157+
158+
datafeed_api.read_datafeed.assert_has_calls([
159+
call(
160+
session_token="session_token",
161+
key_manager_token="km_token",
162+
datafeed_id="fault_datafeed_id",
163+
ack_id=AckId(ack_id="")
164+
),
165+
call(
166+
session_token="session_token",
167+
key_manager_token="km_token",
168+
datafeed_id="test_id",
169+
ack_id=AckId(ack_id="")
170+
)
171+
])
172+
173+
assert datafeed_loop._datafeed_id == "test_id"
174+
assert datafeed_loop._ack_id == "ack_id"

0 commit comments

Comments
 (0)