Skip to content

Commit 5d1f1d6

Browse files
cletnicktehampsonrestyled-commits
authored
Initial Test Script for TC_MCORE_FS_1_1 (#34634)
* Initial Test Script for FS Top-Level * Resolve linter issues. * Update TC_MCORE_FS_1_1 to be testable * Restyled by autopep8 * Address PR comments * Restyled by isort --------- Co-authored-by: Terence Hampson <thampson@google.com> Co-authored-by: Restyled.io <commits@restyled.io>
1 parent f45fdb8 commit 5d1f1d6

File tree

2 files changed

+306
-0
lines changed

2 files changed

+306
-0
lines changed

src/python_testing/TC_MCORE_FS_1_1.py

+144
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
#
2+
# Copyright (c) 2024 Project CHIP Authors
3+
# All rights reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
# This test requires a TH_SERVER application. Please specify with --string-arg th_server_app_path:<path_to_app>
19+
20+
import logging
21+
import os
22+
import random
23+
import signal
24+
import subprocess
25+
import time
26+
import uuid
27+
28+
import chip.clusters as Clusters
29+
from chip import ChipDeviceCtrl
30+
from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
31+
from mobly import asserts
32+
33+
34+
class TC_MCORE_FS_1_1(MatterBaseTest):
35+
36+
@async_test_body
37+
async def setup_class(self):
38+
super().setup_class()
39+
# TODO: confirm whether we can open processes like this on the TH
40+
app = self.matter_test_config.user_params.get("th_server_app_path", None)
41+
if not app:
42+
asserts.fail('This test requires a TH_SERVER app. Specify app path with --string-arg th_server_app_path:<path_to_app>')
43+
44+
self.kvs = f'kvs_{str(uuid.uuid4())}'
45+
self.port = 5543
46+
discriminator = random.randint(0, 4095)
47+
passcode = 20202021
48+
app_args = f'--secured-device-port {self.port} --discriminator {discriminator} --passcode {passcode} --KVS {self.kvs}'
49+
cmd = f'{app} {app_args}'
50+
# TODO: Determine if we want these logs cooked or pushed to somewhere else
51+
logging.info("Starting application to acts mock a server portion of TH_FSA")
52+
self.app_process = subprocess.Popen(cmd, bufsize=0, shell=True)
53+
logging.info("Started application to acts mock a server portion of TH_FSA")
54+
time.sleep(3)
55+
56+
logging.info("Commissioning from separate fabric")
57+
# Create a second controller on a new fabric to communicate to the server
58+
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
59+
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
60+
paa_path = str(self.matter_test_config.paa_trust_store_path)
61+
self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path)
62+
self.server_nodeid = 1111
63+
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.server_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator)
64+
logging.info("Commissioning TH_SERVER complete")
65+
66+
def teardown_class(self):
67+
logging.warning("Stopping app with SIGTERM")
68+
self.app_process.send_signal(signal.SIGTERM.value)
69+
self.app_process.wait()
70+
71+
os.remove(self.kvs)
72+
super().teardown_class()
73+
74+
def steps_TC_MCORE_FS_1_1(self) -> list[TestStep]:
75+
steps = [TestStep(1, "Enable Fabric Synchronization on DUT_FSA using the manufacturer specified mechanism.", is_commissioning=True),
76+
TestStep(2, "Commission DUT_FSA onto TH_FSA fabric."),
77+
TestStep(3, "Reverse Commision Commission TH_FSAs onto DUT_FSA fabric."),
78+
TestStep("3a", "TH_FSA sends RequestCommissioningApproval"),
79+
TestStep("3b", "TH_FSA sends CommissionNode"),
80+
TestStep("3c", "DUT_FSA commissions TH_FSA")]
81+
return steps
82+
83+
@async_test_body
84+
async def test_TC_MCORE_FS_1_1(self):
85+
self.is_ci = self.check_pics('PICS_SDK_CI_ONLY')
86+
# TODO this value should either be determined or passed in from command line
87+
dut_commissioning_control_endpoint = 0
88+
self.step(1)
89+
self.step(2)
90+
self.step(3)
91+
th_fsa_server_fabrics = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.Fabrics, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)
92+
th_fsa_server_vid = await self.read_single_attribute_check_success(cluster=Clusters.BasicInformation, attribute=Clusters.BasicInformation.Attributes.VendorID, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)
93+
th_fsa_server_pid = await self.read_single_attribute_check_success(cluster=Clusters.BasicInformation, attribute=Clusters.BasicInformation.Attributes.ProductID, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)
94+
95+
event_path = [(dut_commissioning_control_endpoint, Clusters.CommissionerControl.Events.CommissioningRequestResult, 1)]
96+
events = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=event_path)
97+
98+
self.step("3a")
99+
good_request_id = 0x1234567812345678
100+
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(
101+
requestId=good_request_id, vendorId=th_fsa_server_vid, productId=th_fsa_server_pid, label="Test Ecosystem")
102+
await self.send_single_cmd(cmd, endpoint=dut_commissioning_control_endpoint)
103+
104+
if not self.is_ci:
105+
self.wait_for_use_input("Approve Commissioning approval request on DUT using manufacturer specified mechanism")
106+
107+
if not events:
108+
new_event = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=event_path)
109+
else:
110+
event_nums = [e.Header.EventNumber for e in events]
111+
new_event = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=event_path, eventNumberFilter=max(event_nums)+1)
112+
113+
asserts.assert_equal(len(new_event), 1, "Unexpected event list len")
114+
asserts.assert_equal(new_event[0].Data.statusCode, 0, "Unexpected status code")
115+
asserts.assert_equal(new_event[0].Data.clientNodeId,
116+
self.matter_test_config.controller_node_id, "Unexpected client node id")
117+
asserts.assert_equal(new_event[0].Data.requestId, good_request_id, "Unexpected request ID")
118+
119+
self.step("3b")
120+
cmd = Clusters.CommissionerControl.Commands.CommissionNode(requestId=good_request_id, responseTimeoutSeconds=30)
121+
resp = await self.send_single_cmd(cmd, endpoint=dut_commissioning_control_endpoint)
122+
asserts.assert_equal(type(resp), Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow,
123+
"Incorrect response type")
124+
125+
# min commissioning timeout is 3*60 seconds, so use that even though the command said 30.
126+
cmd = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(commissioningTimeout=3*60,
127+
PAKEPasscodeVerifier=resp.PAKEPasscodeVerifier,
128+
discriminator=resp.discriminator,
129+
iterations=resp.iterations, salt=resp.salt)
130+
await self.send_single_cmd(cmd, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0, timedRequestTimeoutMs=5000)
131+
132+
self.step("3c")
133+
if not self.is_ci:
134+
time.sleep(30)
135+
136+
th_fsa_server_fabrics_new = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.Fabrics, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0)
137+
# TODO: this should be mocked too.
138+
if not self.is_ci:
139+
asserts.assert_equal(len(th_fsa_server_fabrics) + 1, len(th_fsa_server_fabrics_new),
140+
"Unexpected number of fabrics on TH_SERVER")
141+
142+
143+
if __name__ == "__main__":
144+
default_matter_test_main()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
#!/usr/bin/env -S python3 -B
2+
#
3+
# Copyright (c) 2024 Project CHIP Authors
4+
# All rights reserved.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
import base64
20+
import os
21+
import pathlib
22+
import sys
23+
import typing
24+
25+
import chip.clusters as Clusters
26+
import click
27+
from chip import ChipDeviceCtrl
28+
from chip.clusters import Attribute
29+
from chip.interaction_model import InteractionModelError, Status
30+
from MockTestRunner import AsyncMock, MockTestRunner
31+
32+
try:
33+
from matter_testing_support import MatterTestConfig, get_default_paa_trust_store, run_tests_no_exit
34+
except ImportError:
35+
sys.path.append(os.path.abspath(
36+
os.path.join(os.path.dirname(__file__), '..')))
37+
from matter_testing_support import MatterTestConfig, get_default_paa_trust_store, run_tests_no_exit
38+
39+
invoke_call_count = 0
40+
event_call_count = 0
41+
42+
43+
def dynamic_invoke_return(*args, **argv):
44+
''' Returns the response to a mocked SendCommand call.
45+
'''
46+
global invoke_call_count
47+
invoke_call_count += 1
48+
49+
# passcode 20202024
50+
reverse_open = Clusters.CommissionerControl.Commands.ReverseOpenCommissioningWindow(commissioningTimeout=30,
51+
PAKEPasscodeVerifier=b"+w1qZQR05Zn0bc2LDyNaDAhsrhDS5iRHPTN10+EmNx8E2OpIPC4SjWRDQVOgqcbnXdYMlpiZ168xLBqn1fx9659gGK/7f9Yc6GxpoJH8kwAUYAYyLGsYeEBt1kL6kpXjgA==",
52+
discriminator=2222, iterations=10000, salt=base64.b64encode(bytes('SaltyMcSalterson', 'utf-8')))
53+
54+
print(f'invoke call {invoke_call_count}')
55+
if invoke_call_count == 1: # Commission node with no prior request, return failure - step 5
56+
return None
57+
elif invoke_call_count == 2: # Commission node over pase - return unsupported access - step 7
58+
return reverse_open
59+
else:
60+
raise InteractionModelError(Status.Failure)
61+
62+
63+
def dynamic_event_return(*args, **argv):
64+
''' Returns the response to a mocked ReadEvent call.
65+
'''
66+
global event_call_count
67+
event_call_count += 1
68+
69+
if event_call_count == 1: # reading events, start empty - no events
70+
return []
71+
elif event_call_count == 2: # read event with filter - expect empty
72+
header = Attribute.EventHeader(EndpointId=0, ClusterId=Clusters.CommissionerControl.id,
73+
EventId=Clusters.CommissionerControl.Events.CommissioningRequestResult.event_id, EventNumber=1)
74+
data = Clusters.CommissionerControl.Events.CommissioningRequestResult(
75+
requestId=0x1234567812345678, clientNodeId=112233, statusCode=0)
76+
result = Attribute.EventReadResult(Header=header, Status=Status.Success, Data=data)
77+
return [result]
78+
else:
79+
raise InteractionModelError(Status.Failure)
80+
81+
82+
def wildcard() -> Attribute.AsyncReadTransaction.ReadResponse:
83+
''' Returns the response to a wildcard read.
84+
For this test, we just need descriptors and a few attributes
85+
Tree
86+
EP1 (Aggregator): Descriptor
87+
- EP2 (Bridged Node): Descriptor, Bridged Device Basic Information, Ecosystem Information
88+
'''
89+
cc = Clusters.CommissionerControl
90+
ei = Clusters.EcosystemInformation
91+
desc = Clusters.Descriptor
92+
bdbi = Clusters.BridgedDeviceBasicInformation
93+
94+
# EP1 is aggregator device type with a commissioner control cluster
95+
# children - EP2 type bridged node endpoint, ecosystem information, bridged device basic information. Should also have and admin commissioning, but I don't need it for this test.
96+
desc_ep1 = {desc.Attributes.PartsList: [2], desc.Attributes.ServerList: [
97+
cc.id], desc.Attributes.DeviceTypeList: [desc.Structs.DeviceTypeStruct(deviceType=0x000E, revision=2)]}
98+
desc_ep2 = {desc.Attributes.ServerList: [bdbi.id, ei.id], desc.Attributes.DeviceTypeList: [
99+
desc.Structs.DeviceTypeStruct(deviceType=0x0013, revision=3)]}
100+
101+
# I'm not filling anything in here, because I don't care. I just care that the cluster exists.
102+
ei_attrs = {ei.Attributes.AttributeList: [ei.Attributes.DeviceDirectory.attribute_id,
103+
ei.Attributes.LocationDirectory.attribute_id], ei.Attributes.DeviceDirectory: [], ei.Attributes.LocationDirectory: []}
104+
105+
# This cluster just needs to exist, so I'm just going to throw on the mandatory items for now.
106+
bdbi_attrs = {bdbi.Attributes.AttributeList: [bdbi.Attributes.Reachable.attribute_id,
107+
bdbi.Attributes.UniqueID.attribute_id], bdbi.Attributes.Reachable: True, bdbi.Attributes.UniqueID: 'something'}
108+
109+
cc_attrs = {cc.Attributes.AttributeList: [cc.Attributes.SupportedDeviceCategories], cc.Attributes.AcceptedCommandList: [cc.Commands.RequestCommissioningApproval, cc.Commands.CommissionNode],
110+
cc.Attributes.GeneratedCommandList: [cc.Commands.RequestCommissioningApproval], cc.Attributes.SupportedDeviceCategories: 1}
111+
112+
resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {})
113+
resp.attributes = {1: {desc: desc_ep1, cc: cc_attrs}, 2: {desc: desc_ep2, ei: ei_attrs, bdbi: bdbi_attrs}}
114+
return resp
115+
116+
117+
class MyMock(MockTestRunner):
118+
def run_test_with_mock(self, dynamic_invoke_return: typing.Callable, dynamic_event_return: typing.Callable, read_cache: Attribute.AsyncReadTransaction.ReadResponse, hooks=None):
119+
''' Run the test using the Mocked versions of Read, SendCommand, OpenCommissioningWindow, FindOrEstablishPASESession and ReadEvent
120+
dynamic_invoke_return: Callable function that returns the result of a SendCommand call
121+
Function should return one of
122+
- command response for commands with responses
123+
- None for commands with success results
124+
- raise InteractionModelError for error results
125+
dynamic_event_return: Callable function that returns the result of a ReadEvent call
126+
Function should return one of
127+
- list of EventReadResult for successful reads
128+
- raise InteractionModelError for error results
129+
read_cache : Response to a Read call. For this test, this will be the wildcard read of all teh attributes
130+
hooks : Test harness hook object if desired.
131+
'''
132+
self.default_controller.Read = AsyncMock(return_value=read_cache)
133+
self.default_controller.SendCommand = AsyncMock(return_value=None, side_effect=dynamic_invoke_return)
134+
# It doesn't actually matter what we return here because I'm going to catch the next pase session connection anyway
135+
params = ChipDeviceCtrl.CommissioningParameters(setupPinCode=0, setupManualCode='', setupQRCode='')
136+
self.default_controller.OpenCommissioningWindow = AsyncMock(return_value=params)
137+
self.default_controller.FindOrEstablishPASESession = AsyncMock(return_value=None)
138+
self.default_controller.ReadEvent = AsyncMock(return_value=[], side_effect=dynamic_event_return)
139+
140+
return run_tests_no_exit(self.test_class, self.config, hooks, self.default_controller, self.stack)
141+
142+
143+
@click.command()
144+
@click.argument('th_server_app', type=click.Path(exists=True))
145+
def main(th_server_app: str):
146+
root = os.path.abspath(os.path.join(pathlib.Path(__file__).resolve().parent, '..', '..', '..'))
147+
print(f'root = {root}')
148+
paa_path = get_default_paa_trust_store(root)
149+
print(f'paa = {paa_path}')
150+
151+
pics = {"PICS_SDK_CI_ONLY": True}
152+
test_runner = MyMock('TC_MCORE_FS_1_1', 'TC_MCORE_FS_1_1', 'test_TC_MCORE_FS_1_1', 1, paa_trust_store_path=paa_path, pics=pics)
153+
config = MatterTestConfig()
154+
config.user_params = {'th_server_app_path': th_server_app}
155+
test_runner.set_test_config(config)
156+
157+
test_runner.run_test_with_mock(dynamic_invoke_return, dynamic_event_return, wildcard())
158+
test_runner.Shutdown()
159+
160+
161+
if __name__ == "__main__":
162+
sys.exit(main())

0 commit comments

Comments
 (0)