Skip to content

Commit 682e6a4

Browse files
authored
Initial work to get attribute subscription for chip-repl yamltest runner (#24242)
* Initial work to get attribute subscrition for chip-repl yamltest runner * Address PR comments * Restyle * Restyle * Restyle
1 parent f15228d commit 682e6a4

File tree

1 file changed

+166
-3
lines changed

1 file changed

+166
-3
lines changed

src/controller/python/chip/yaml/runner.py

+166-3
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717

1818
import asyncio as asyncio
1919
import logging
20+
import queue
2021
from abc import ABC, abstractmethod
21-
from dataclasses import dataclass
22+
from dataclasses import dataclass, field
2223
from enum import Enum
2324

2425
import chip.interaction_model
2526
import chip.yaml.format_converter as Converter
2627
import stringcase
2728
from chip import ChipDeviceCtrl
28-
from chip.clusters.Attribute import AttributeStatus, ValueDecodeFailure
29+
from chip.clusters.Attribute import AttributeStatus, SubscriptionTransaction, TypedAttributePath, ValueDecodeFailure
2930
from chip.yaml.errors import ParsingError, UnexpectedParsingError
3031

3132
from .data_model_lookup import *
@@ -44,11 +45,24 @@ class _ActionResult:
4445
response: object
4546

4647

48+
@dataclass
49+
class _AttributeSubscriptionCallbackResult:
50+
name: str
51+
attribute_path: TypedAttributePath
52+
result: _ActionResult
53+
54+
4755
@dataclass
4856
class _ExecutionContext:
4957
''' Objects that is commonly passed around this file that are vital to test execution.'''
5058
# Data model lookup to get python attribute, cluster, command object.
5159
data_model_lookup: DataModelLookup = None
60+
# List of subscriptions.
61+
subscriptions: list = field(default_factory=list)
62+
# The key is the attribute/event name, and the value is a queue of subscription callback results
63+
# that been sent by device under test. For attribute subscription the queue is of type
64+
# _AttributeSubscriptionCallbackResult.
65+
subscription_callback_result_queue: dict = field(default_factory=dict)
5266

5367

5468
class BaseAction(ABC):
@@ -175,6 +189,9 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
175189
except chip.interaction_model.InteractionModelError as error:
176190
return _ActionResult(status=_ActionStatus.ERROR, response=error)
177191

192+
return self.parse_raw_response(raw_resp)
193+
194+
def parse_raw_response(self, raw_resp) -> _ActionResult:
178195
if self._possibly_unsupported and not raw_resp:
179196
# We have found an unsupported attribute. TestStep provided did specify that it might be
180197
# unsupported, so nothing left to validate. We just return a failure here.
@@ -194,6 +211,83 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
194211
return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val)
195212

196213

214+
class AttributeChangeAccumulator:
215+
def __init__(self, name: str, expected_attribute: Clusters.ClusterAttributeDescriptor,
216+
output_queue: queue.SimpleQueue):
217+
self._name = name
218+
self._expected_attribute = expected_attribute
219+
self._output_queue = output_queue
220+
221+
def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
222+
if path.AttributeType == self._expected_attribute:
223+
data = transaction.GetAttribute(path)
224+
result = _ActionResult(status=_ActionStatus.SUCCESS, response=path.AttributeType(data))
225+
226+
item = _AttributeSubscriptionCallbackResult(self._name, path, result)
227+
logging.debug(
228+
f'Got subscription report on client {self.name} for {path.AttributeType}: {data}')
229+
self._output_queue.put(item)
230+
231+
@property
232+
def name(self) -> str:
233+
return self._name
234+
235+
236+
class SubscribeAttributeAction(ReadAttributeAction):
237+
'''Single subscribe attribute action to be executed.'''
238+
239+
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
240+
'''Converts 'test_step' to subscribe attribute action that can execute with ChipDeviceCtrl.
241+
242+
Args:
243+
'test_step': Step containing information required to run write attribute action.
244+
'cluster': Name of cluster write attribute action is targeting.
245+
'context': Contains test-wide common objects such as DataModelLookup instance.
246+
Raises:
247+
ParsingError: Raised if there is a benign error, and there is currently no
248+
action to perform for this write attribute.
249+
UnexpectedParsingError: Raised if there is an unexpected parsing error.
250+
'''
251+
super().__init__(test_step, cluster, context)
252+
self._context = context
253+
if test_step.min_interval is None:
254+
raise UnexpectedParsingError(
255+
f'SubscribeAttribute action does not have min_interval {self.label}')
256+
self._min_interval = test_step.min_interval
257+
258+
if test_step.max_interval is None:
259+
raise UnexpectedParsingError(
260+
f'SubscribeAttribute action does not have max_interval {self.label}')
261+
self._max_interval = test_step.max_interval
262+
263+
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
264+
try:
265+
subscription = asyncio.run(
266+
dev_ctrl.ReadAttribute(self._node_id, [(self._endpoint, self._request_object)],
267+
reportInterval=(self._min_interval, self._max_interval),
268+
keepSubscriptions=False))
269+
except chip.interaction_model.InteractionModelError as error:
270+
return _ActionResult(status=_ActionStatus.ERROR, response=error)
271+
272+
self._context.subscriptions.append(subscription)
273+
output_queue = self._context.subscription_callback_result_queue.get(self._attribute_name,
274+
None)
275+
if output_queue is None:
276+
output_queue = queue.SimpleQueue()
277+
self._context.subscription_callback_result_queue[self._attribute_name] = output_queue
278+
279+
while not output_queue.empty():
280+
output_queue.get(block=False)
281+
282+
subscription_handler = AttributeChangeAccumulator(self.label, self._request_object,
283+
output_queue)
284+
285+
subscription.SetAttributeUpdateCallback(subscription_handler)
286+
287+
raw_resp = subscription.GetAttributes()
288+
return self.parse_raw_response(raw_resp)
289+
290+
197291
class WriteAttributeAction(BaseAction):
198292
'''Single write attribute action to be executed.'''
199293

@@ -258,6 +352,37 @@ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
258352
return _ActionResult(status=_ActionStatus.ERROR, response=None)
259353

260354

355+
class WaitForReportAction(BaseAction):
356+
'''Single WaitForReport action to be executed.'''
357+
358+
def __init__(self, test_step, context: _ExecutionContext):
359+
'''Converts 'test_step' to wait for report action.
360+
361+
Args:
362+
'test_step': Step containing information required to run wait for report action.
363+
'context': Contains test-wide common objects such as DataModelLookup instance.
364+
Raises:
365+
UnexpectedParsingError: Raised if the expected queue does not exist.
366+
'''
367+
super().__init__(test_step.label)
368+
self._attribute_name = stringcase.pascalcase(test_step.attribute)
369+
self._output_queue = context.subscription_callback_result_queue.get(self._attribute_name,
370+
None)
371+
if self._output_queue is None:
372+
raise UnexpectedParsingError(f'Could not find output queue')
373+
374+
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
375+
try:
376+
# While there should be a timeout here provided by the test, the current codegen version
377+
# of YAML tests doesn't have a per test step timeout, only a global timeout for the
378+
# entire test. For that reason we default to a 30 second timeout.
379+
item = self._output_queue.get(block=True, timeout=30)
380+
except queue.Empty:
381+
return _ActionResult(status=_ActionStatus.ERROR, response=None)
382+
383+
return item.result
384+
385+
261386
class ReplTestRunner:
262387
'''Test runner to encode/decode values from YAML test Parser for executing the TestStep.
263388
@@ -301,6 +426,25 @@ def _attribute_read_action_factory(self, test_step, cluster: str):
301426
except ParsingError:
302427
return None
303428

429+
def _attribute_subscribe_action_factory(self, test_step, cluster: str):
430+
'''Creates subscribe attribute command from TestStep provided.
431+
432+
Args:
433+
'test_step': Step containing information required to run subscribe attribute action.
434+
'cluster': Name of cluster write attribute action is targeting.
435+
Returns:
436+
SubscribeAttributeAction if 'test_step' is a valid subscribe attribute to be executed.
437+
None if we were unable to use the provided 'test_step' for a known reason that is not
438+
fatal to test execution.
439+
'''
440+
try:
441+
return SubscribeAttributeAction(test_step, cluster, self._context)
442+
except ParsingError:
443+
# TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
444+
# runner has matched parity of the codegen YAML test, this exception should be
445+
# propogated.
446+
return None
447+
304448
def _attribute_write_action_factory(self, test_step, cluster: str):
305449
'''Creates write attribute command TestStep.
306450
@@ -317,6 +461,15 @@ def _attribute_write_action_factory(self, test_step, cluster: str):
317461
except ParsingError:
318462
return None
319463

464+
def _wait_for_report_action_factory(self, test_step):
465+
try:
466+
return WaitForReportAction(test_step, self._context)
467+
except ParsingError:
468+
# TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
469+
# runner has matched parity of the codegen YAML test, this exception should be
470+
# propogated.
471+
return None
472+
320473
def encode(self, request) -> BaseAction:
321474
action = None
322475
cluster = request.cluster.replace(' ', '').replace('/', '')
@@ -328,7 +481,13 @@ def encode(self, request) -> BaseAction:
328481
elif command == 'readAttribute':
329482
action = self._attribute_read_action_factory(request, cluster)
330483
elif command == 'readEvent':
331-
action = self._event_read_action_factory(request, cluster)
484+
# TODO need to implement _event_read_action_factory
485+
# action = self._event_read_action_factory(request, cluster)
486+
pass
487+
elif command == 'subscribeAttribute':
488+
action = self._attribute_subscribe_action_factory(request, cluster)
489+
elif command == 'waitForReport':
490+
action = self._wait_for_report_action_factory(request)
332491
else:
333492
action = self._invoke_action_factory(request, cluster)
334493

@@ -386,3 +545,7 @@ def decode(self, result: _ActionResult):
386545

387546
def execute(self, action: BaseAction):
388547
return action.run_action(self._dev_ctrl)
548+
549+
def shutdown(self):
550+
for subscription in self._context.subscriptions:
551+
subscription.Shutdown()

0 commit comments

Comments
 (0)