forked from project-chip/connectedhomeip
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
248 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
# | ||
# Copyright (c) 2024 Project CHIP Authors | ||
# All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments | ||
# for details about the block below. | ||
# | ||
# TODO: Skip CI for now, we don't have any way to run this. Needs setup. See test_TC_CCTRL.py | ||
|
||
import ipaddress | ||
import logging | ||
import os | ||
import signal | ||
import subprocess | ||
import uuid | ||
import random | ||
import pathlib | ||
import time | ||
|
||
import chip.clusters as Clusters | ||
import chip.exceptions | ||
from matter_testing_support import MatterBaseTest, default_matter_test_main, per_endpoint_test, has_cluster, async_test_body | ||
from mobly import asserts | ||
from chip.interaction_model import InteractionModelError, Status | ||
|
||
# isort: off | ||
|
||
from chip import ChipDeviceCtrl # Needed before chip.FabricAdmin | ||
import chip.CertificateAuthority | ||
from chip.ChipDeviceCtrl import CommissioningParameters | ||
|
||
# isort: on | ||
|
||
|
||
class TC_CCTRL(MatterBaseTest): | ||
|
||
@async_test_body | ||
async def setup_class(self): | ||
super().setup_class() | ||
# TODO: This needs to come from an arg and needs to be something available on the TH | ||
# TODO: confirm whether we can open processes like this on the TH | ||
app = os.path.join(pathlib.Path(__file__).resolve().parent, '..','..','out', 'linux-x64-all-clusters-no-ble', 'chip-all-clusters-app') | ||
|
||
self.kvs = f'kvs_{str(uuid.uuid4())}' | ||
self.port = 5543 | ||
discriminator = random.randint(0, 4095) | ||
discriminator = 3840 | ||
passcode = 20202021 | ||
app_args = f'--secured-device-port {self.port} --discriminator {discriminator} --passcode {passcode} --KVS {self.kvs}' | ||
cmd = f'{app} {app_args}' | ||
# TODO: Determine if we want these logs cooked or pushed to somewhere else | ||
logging.info("Starting TH_SERVER") | ||
self.app_process = subprocess.Popen(cmd, bufsize=0, shell=True) | ||
logging.info("TH_SERVER started") | ||
time.sleep(3) | ||
|
||
logging.info("Commissioning from separate fabric") | ||
|
||
|
||
# Create a second controller on a new fabric to communicate to the server | ||
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() | ||
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2) | ||
paa_path = str(self.matter_test_config.paa_trust_store_path) | ||
print(f"paa_path = {paa_path} ------------------------------------------------") | ||
self.TH_server_controller = new_fabric_admin.NewController(nodeId=112233, paaTrustStorePath=paa_path) | ||
self.server_nodeid = 1111 | ||
await self.TH_server_controller.CommissionOnNetwork(nodeId=self.server_nodeid, setupPinCode=passcode, filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=discriminator) | ||
logging.info("Commissioning TH_SERVER complete") | ||
|
||
def teardown_class(self): | ||
logging.warning("Stopping app with SIGTERM") | ||
self.app_process.send_signal(signal.SIGTERM.value) | ||
test_app_exit_code = self.app_process.wait() | ||
# TODO: Use timeout, if term doesn't work, try SIGINT | ||
|
||
os.remove(self.kvs) | ||
super().teardown_class() | ||
|
||
|
||
#@per_endpoint_test(has_cluster(Clusters.CommissionerControl)) | ||
@async_test_body | ||
async def test_TC_CCTRL_3_1(self): | ||
th_server_fabrics = await self.read_single_attribute_check_success(cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.Fabrics, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0) | ||
th_server_vid = await self.read_single_attribute_check_success(cluster=Clusters.BasicInformation, attribute=Clusters.BasicInformation.Attributes.VendorID, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0) | ||
th_server_pid = await self.read_single_attribute_check_success(cluster=Clusters.BasicInformation, attribute=Clusters.BasicInformation.Attributes.ProductID, dev_ctrl=self.TH_server_controller, node_id=self.server_nodeid, endpoint=0) | ||
|
||
# TODO: Read event, not yet implemented in mock | ||
|
||
ipaddr = ipaddress.IPv6Address('::1') | ||
cmd = Clusters.CommissionerControl.Commands.CommissionNode(requestId=1, responseTimeoutSeconds=30, ipAddress=ipaddr.packed, port=self.port) | ||
try: | ||
await self.send_single_cmd(cmd) | ||
asserts.fail("Unexpected success on CommissionNode") | ||
except InteractionModelError as e: | ||
asserts.assert_equal(e.status, Status.Failure, "Incorrect error returned") | ||
|
||
params = await self.openCommissioningWindow(dev_ctrl=self.default_controller, node_id=self.dut_node_id) | ||
|
||
pase_nodeid = self.dut_node_id + 1 | ||
await self.default_controller.FindOrEstablishPASESession(setupCode=params.commissioningParameters.setupQRCode, nodeid=pase_nodeid) | ||
try: | ||
await self.send_single_cmd(cmd=cmd, node_id=pase_nodeid) | ||
asserts.fail("Unexpected success on CommissionNode") | ||
except InteractionModelError as e: | ||
asserts.assert_equal(e.status, Status.UnsupportedAccess, "Incorrect error returned") | ||
|
||
good_request_id = 0x1234567887654321 | ||
cmd = Clusters.CommissionerControl.Commands.RequestCommissioningApproval(requestId=good_request_id, vendorId=th_server_vid, productId=th_server_pid) | ||
try: | ||
await self.send_single_cmd(cmd=cmd, node_id=pase_nodeid) | ||
asserts.fail("Unexpected success on CommissionNode") | ||
except InteractionModelError as e: | ||
asserts.assert_equal(e.status, Status.UnsupportedAccess, "Incorrect error returned") | ||
|
||
|
||
# TODO: read event - need to implement in the mock | ||
|
||
# If no exception is raised, this is success | ||
await self.send_single_cmd(cmd) | ||
|
||
if __name__ == "__main__": | ||
default_matter_test_main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
#!/usr/bin/env -S python3 -B | ||
# | ||
# Copyright (c) 2024 Project CHIP Authors | ||
# All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
import os | ||
import sys | ||
import pathlib | ||
import typing | ||
|
||
import chip.clusters as Clusters | ||
from chip import ChipDeviceCtrl | ||
from chip.clusters import Attribute | ||
from MockTestRunner import MockTestRunner, AsyncMock | ||
from chip.interaction_model import InteractionModelError, Status | ||
|
||
|
||
try: | ||
from matter_testing_support import get_default_paa_trust_store, run_tests_no_exit | ||
except ImportError: | ||
sys.path.append(os.path.abspath( | ||
os.path.join(os.path.dirname(__file__), '..'))) | ||
from matter_testing_support import get_default_paa_trust_store, run_tests_no_exit | ||
|
||
call_count = 0 | ||
|
||
def dynamic_return(*args, **argv): | ||
print("using Mock invoke") | ||
global call_count | ||
call_count += 1 | ||
|
||
if call_count == 1: # Commission node with no prior request, return failure | ||
raise InteractionModelError(status=Status.Failure) | ||
elif call_count == 2: # Commission node over pase - return unsupported access | ||
raise InteractionModelError(status=Status.UnsupportedAccess) | ||
elif call_count == 3: # request commissioning approval over pase - return unsupported access | ||
raise InteractionModelError(status=Status.UnsupportedAccess) | ||
elif call_count == 4: # good RequestCommissioningApproval over CASE | ||
return None | ||
else: | ||
raise InteractionModelError(Status.Failure) | ||
|
||
def wildcard() -> Attribute.AsyncReadTransaction.ReadResponse: | ||
cc = Clusters.CommissionerControl | ||
ei = Clusters.EcosystemInformation | ||
desc = Clusters.Descriptor | ||
bdbi = Clusters.BridgedDeviceBasicInformation | ||
|
||
# EP1 is aggregator device type with a commissioner control cluster | ||
# children - EP2 type bridged node endpoint, ecosystem information, bridged device basic information. Should also have and admin commissioning, but I don't need it for this test. | ||
desc_ep1 = {desc.Attributes.PartsList: [2], desc.Attributes.ServerList: [cc.id], desc.Attributes.DeviceTypeList: [desc.Structs.DeviceTypeStruct(deviceType=0x000E, revision=2)]} | ||
desc_ep2 = {desc.Attributes.ServerList: [bdbi.id, ei.id], desc.Attributes.DeviceTypeList: [desc.Structs.DeviceTypeStruct(deviceType=0x0013, revision=3)]} | ||
|
||
# I'm not filling anything in here, because I don't care. I just care that the cluster exists. | ||
ei_attrs = {ei.Attributes.AttributeList:[ei.Attributes.DeviceDirectory.attribute_id, ei.Attributes.LocationDirectory.attribute_id], ei.Attributes.DeviceDirectory:[], ei.Attributes.LocationDirectory:[]} | ||
|
||
# This cluster just needs to exist, so I'm just going to throw on the mandatory items for now. | ||
bdbi_attrs = {bdbi.Attributes.AttributeList:[bdbi.Attributes.Reachable.attribute_id, bdbi.Attributes.UniqueID.attribute_id], bdbi.Attributes.Reachable:True, bdbi.Attributes.UniqueID:'something'} | ||
|
||
cc_attrs = {cc.Attributes.AttributeList:[cc.Attributes.SupportedDeviceCategories], cc.Attributes.AcceptedCommandList:[cc.Commands.RequestCommissioningApproval, cc.Commands.CommissionNode], | ||
cc.Attributes.GeneratedCommandList:[cc.Commands.RequestCommissioningApproval], cc.Attributes.SupportedDeviceCategories:1} | ||
|
||
resp = Attribute.AsyncReadTransaction.ReadResponse({}, [], {}) | ||
resp.attributes = {1: {desc: desc_ep1, cc:cc_attrs}, 2:{desc:desc_ep2, ei:ei_attrs, bdbi:bdbi_attrs}} | ||
return resp | ||
|
||
class MyMock(MockTestRunner): | ||
# TODO consolidate with above | ||
def run_test_with_mock(self, dynamic_invoke_return: typing.Callable, read_cache: Attribute.AsyncReadTransaction.ReadResponse, hooks=None): | ||
''' Effects is a list of callable functions with *args, **kwargs parameters. It can either throw an InteractionModelException or return the command response.''' | ||
self.default_controller.Read = AsyncMock(return_value=read_cache) | ||
self.default_controller.SendCommand = AsyncMock(return_value=None, side_effect=dynamic_invoke_return) | ||
# It doesn't actually matter what we return here because I'm going to catch the next pase session connection anyway | ||
params = ChipDeviceCtrl.CommissioningParameters(setupPinCode=0, setupManualCode='', setupQRCode='') | ||
self.default_controller.OpenCommissioningWindow = AsyncMock(return_value=params) | ||
self.default_controller.FindOrEstablishPASESession = AsyncMock(return_value=None) | ||
|
||
return run_tests_no_exit(self.test_class, self.config, hooks, self.default_controller, self.stack) | ||
|
||
def main(): | ||
root = os.path.abspath(os.path.join(pathlib.Path(__file__).resolve().parent, '..','..','..')) | ||
print(f'root = {root}') | ||
paa_path = get_default_paa_trust_store(root) | ||
print(f'paa = {paa_path}') | ||
|
||
test_runner = MyMock('TC_CCTRL', 'TC_CCTRL', 'test_TC_CCTRL_3_1', 1, paa_trust_store_path=paa_path) | ||
|
||
test_runner.run_test_with_mock(dynamic_return, wildcard()) | ||
test_runner.Shutdown() | ||
|
||
if __name__ == "__main__": | ||
sys.exit(main()) |