|
| 1 | +# |
| 2 | +# Copyright (c) 2024 Project CHIP Authors |
| 3 | +# All rights reserved. |
| 4 | +# |
| 5 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | +# you may not use this file except in compliance with the License. |
| 7 | +# You may obtain a copy of the License at |
| 8 | +# |
| 9 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +# |
| 11 | +# Unless required by applicable law or agreed to in writing, software |
| 12 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | +# See the License for the specific language governing permissions and |
| 15 | +# limitations under the License. |
| 16 | +# |
| 17 | + |
| 18 | +# TODO - this was copied/pasted from another test, it needs to be reviewed and updated |
| 19 | +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments |
| 20 | +# for details about the block below. |
| 21 | +# |
| 22 | +# === BEGIN CI TEST ARGUMENTS === |
| 23 | +# test-runner-runs: run1 |
| 24 | +# test-runner-run/run1/app: ${CHIP_RVC_APP} |
| 25 | +# test-runner-run/run1/factoryreset: True |
| 26 | +# test-runner-run/run1/quiet: True |
| 27 | +# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json |
| 28 | +# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS examples/rvc-app/rvc-common/pics/rvc-app-pics-values --endpoint 1 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto |
| 29 | +# === END CI TEST ARGUMENTS === |
| 30 | + |
| 31 | +import logging |
| 32 | +from time import sleep |
| 33 | + |
| 34 | +import chip.clusters as Clusters |
| 35 | +from chip.clusters.Types import NullValue |
| 36 | +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main |
| 37 | +from mobly import asserts |
| 38 | + |
| 39 | + |
| 40 | +class TC_SEAR_1_2(MatterBaseTest): |
| 41 | + def __init__(self, *args): |
| 42 | + super().__init__(*args) |
| 43 | + self.endpoint = None |
| 44 | + self.is_ci = False |
| 45 | + self.app_pipe = "/tmp/chip_rvc_fifo_" |
| 46 | + self.mapid_list = [] |
| 47 | + |
| 48 | + # this must be kept in sync with the definitions from the Common Landmark Semantic Tag Namespace |
| 49 | + self.MAX_LANDMARK_ID = 0x33 |
| 50 | + |
| 51 | + # this must be kept in sync with the definitions from the Common Relative Position Semantic Tag Namespace |
| 52 | + self.MAX_RELPOS_ID = 0x07 |
| 53 | + |
| 54 | + async def read_sear_attribute_expect_success(self, endpoint, attribute): |
| 55 | + cluster = Clusters.Objects.ServiceArea |
| 56 | + return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) |
| 57 | + |
| 58 | + async def read_and_validate_supported_maps(self, step): |
| 59 | + self.print_step(step, "Read SupportedMaps attribute") |
| 60 | + supported_maps = await self.read_sear_attribute_expect_success( |
| 61 | + endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.SupportedMaps) |
| 62 | + logging.info("SupportedMaps: %s" % (supported_maps)) |
| 63 | + asserts.assert_less_equal(len(supported_maps), 255, |
| 64 | + "SupportedMaps should have max 255 entries") |
| 65 | + |
| 66 | + mapid_list = [m.mapID for m in supported_maps] |
| 67 | + asserts.assert_true(len(set(mapid_list)) == len(mapid_list), "SupportedMaps must have unique MapID values!") |
| 68 | + |
| 69 | + name_list = [m.name for m in supported_maps] |
| 70 | + asserts.assert_true(len(set(name_list)) == len(name_list), "SupportedMaps must have unique Name values!") |
| 71 | + |
| 72 | + # save so other methods can use this if neeeded |
| 73 | + self.mapid_list = mapid_list |
| 74 | + |
| 75 | + async def read_and_validate_supported_areas(self, step): |
| 76 | + self.print_step(step, "Read SupportedAreas attribute") |
| 77 | + supported_areas = await self.read_sear_attribute_expect_success( |
| 78 | + endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.SupportedAreas) |
| 79 | + logging.info("SupportedAreas: %s" % (supported_areas)) |
| 80 | + asserts.assert_less_equal(len(supported_areas), 255, |
| 81 | + "SupportedAreas should have max 255 entries") |
| 82 | + areaid_list = [] |
| 83 | + areadesc_s = set() |
| 84 | + for a in supported_areas: |
| 85 | + asserts.assert_true(a.areaID not in areaid_list, "SupportedAreas must have unique AreaID values!") |
| 86 | + |
| 87 | + areaid_list.append(a.areaID) |
| 88 | + |
| 89 | + if len(self.mapid_list) > 0: |
| 90 | + asserts.assert_is_not(a.mapID, NullValue, |
| 91 | + f"SupportedAreas entry with AreaID({a.areaID}) should not have null MapID") |
| 92 | + asserts.assert_is(a.mapID in self.mapid_list, |
| 93 | + f"SupportedAreas entry with AreaID({a.areaID}) has unknown MapID({a.mapID})") |
| 94 | + k = f"mapID:{a.mapID} areaDesc:{a.areaDesc}" |
| 95 | + asserts.assert_true(k not in areadesc_s, |
| 96 | + f"SupportedAreas must have unique MapID({a.mapID}) + AreaDesc({a.areaDesc}) values!") |
| 97 | + areadesc_s.add(k) |
| 98 | + else: |
| 99 | + # empty SupportedMaps |
| 100 | + asserts.assert_is(a.mapID, NullValue, |
| 101 | + f"SupportedAreas entry with AreaID({a.areaID}) should have null MapID") |
| 102 | + k = f"areaDesc:{a.areaDesc}" |
| 103 | + asserts.assert_true(k not in areadesc_s, f"SupportedAreas must have unique AreaDesc({a.areaDesc}) values!") |
| 104 | + areadesc_s.add(k) |
| 105 | + |
| 106 | + if a.locationInfo is NullValue and a.landmarkTag is NullValue: |
| 107 | + asserts.assert_true( |
| 108 | + f"SupportedAreas entry with AreaID({a.areaID}) should not have null LocationInfo and null LandmarkTag") |
| 109 | + if a.landmarkTag is not NullValue: |
| 110 | + asserts.assert_true(a.landmarkTag <= self.MAX_LANDMARK_ID, |
| 111 | + f"SupportedAreas entry with AreaID({a.areaID}) has invalid LandmarkTag({a.landmarkTag})") |
| 112 | + asserts.assert_true(a.positionTag is NullValue or a.positionTag in range(0, self.MAX_RELPOS_ID), |
| 113 | + f"SupportedAreas entry with AreaID({a.areaID}) has invalid PositionTag({a.positionTag})") |
| 114 | + # save so other methods can use this if neeeded |
| 115 | + self.areaid_list = areaid_list |
| 116 | + |
| 117 | + async def read_and_validate_selected_areas(self, step): |
| 118 | + self.print_step(step, "Read SelectedAreas attribute") |
| 119 | + selected_areas = await self.read_sear_attribute_expect_success( |
| 120 | + endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.SelectedAreas) |
| 121 | + logging.info(f"SelectedAreas {selected_areas}") |
| 122 | + |
| 123 | + # TODO how to check if all entries are uint32? |
| 124 | + |
| 125 | + asserts.assert_true(len(selected_areas) <= len(self.areaid_list), |
| 126 | + f"SelectedAreas(len {len(selected_areas)}) should have at most {len(self.areaid_list)} entries") |
| 127 | + |
| 128 | + asserts.assert_true(len(set(selected_areas)) == len(selected_areas), "SelectedAreas must have unique AreaID values!") |
| 129 | + |
| 130 | + for a in selected_areas: |
| 131 | + asserts.assert_true(a in self.areaid_list, |
| 132 | + f"SelectedAreas entry {a} has invalid value") |
| 133 | + # save so other methods can use this if neeeded |
| 134 | + self.selareaid_list = selected_areas |
| 135 | + |
| 136 | + async def read_and_validate_current_area(self, step): |
| 137 | + self.print_step(step, "Read CurrentArea attribute") |
| 138 | + current_area = await self.read_sear_attribute_expect_success( |
| 139 | + endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.CurrentArea) |
| 140 | + logging.info(f"CurrentArea {current_area}") |
| 141 | + |
| 142 | + asserts.assert_true((len(self.selareaid_list) == 0 and current_area is NullValue) |
| 143 | + or |
| 144 | + current_area in self.selareaid_list, |
| 145 | + f"CurrentArea {current_area} is invalid. SelectedAreas is {self.selareaid_list}.") |
| 146 | + # save so other methods can use this if neeeded |
| 147 | + self.current_area = current_area |
| 148 | + |
| 149 | + async def read_and_validate_estimated_end_time(self, step): |
| 150 | + import time |
| 151 | + read_time = int(time.time()) |
| 152 | + self.print_step(step, "Read EstimatedEndTime attribute") |
| 153 | + estimated_end_time = await self.read_sear_attribute_expect_success( |
| 154 | + endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.EstimatedEndTime) |
| 155 | + logging.info(f"EstimatedEndTime {estimated_end_time}") |
| 156 | + |
| 157 | + if self.current_area is NullValue: |
| 158 | + asserts.assert_true(estimated_end_time is NullValue, |
| 159 | + "EstimatedEndTime should be null if CurrentArea is null.") |
| 160 | + else: |
| 161 | + # allow for some clock skew |
| 162 | + asserts.assert_true(estimated_end_time >= read_time - 3*60, |
| 163 | + f"EstimatedEndTime({estimated_end_time}) should be greater than the time when it was read({read_time})") |
| 164 | + |
| 165 | + async def read_and_validate_progress(self, step): |
| 166 | + self.print_step(step, "Read Progress attribute") |
| 167 | + progress = await self.read_sear_attribute_expect_success( |
| 168 | + endpoint=self.endpoint, attribute=Clusters.ServiceArea.Attributes.Progress) |
| 169 | + logging.info(f"Progress {progress}") |
| 170 | + |
| 171 | + asserts.assert_true(len(progress) <= len(self.areaid_list), |
| 172 | + f"Progress(len {len(progress)}) should have at most {len(self.areaid_list)} entries") |
| 173 | + |
| 174 | + progareaid_list = [] |
| 175 | + for p in progress: |
| 176 | + if p.areaID in progareaid_list: |
| 177 | + asserts.fail("Progress must have unique AreaID values!") |
| 178 | + else: |
| 179 | + progareaid_list.append(p.areaID) |
| 180 | + asserts.assert_true(p.areaID in self.areaid_list, |
| 181 | + f"Progress entry has invalid AreaID value ({p.areaID})") |
| 182 | + asserts.assert_true(p.status in (Clusters.ServiceArea.OperationalStatusEnum.kPending, |
| 183 | + Clusters.ServiceArea.OperationalStatusEnum.kOperating, |
| 184 | + Clusters.ServiceArea.OperationalStatusEnum.kSkipped, |
| 185 | + Clusters.ServiceArea.OperationalStatusEnum.kCompleted), |
| 186 | + f"Progress entry has invalid Status value ({p.status})") |
| 187 | + if p.status not in (Clusters.ServiceArea.OperationalStatusEnum.kSkipped, Clusters.ServiceArea.OperationalStatusEnum.kCompleted): |
| 188 | + asserts.assert_true(p.totalOperationalTime is NullValue, |
| 189 | + f"Progress entry should have a null TotalOperationalTime value (Status is {p.status})") |
| 190 | + # TODO how to check that InitialTimeEstimate is either null or uint32? |
| 191 | + |
| 192 | + # Sends and out-of-band command to the rvc-app |
| 193 | + def write_to_app_pipe(self, command): |
| 194 | + with open(self.app_pipe, "w") as app_pipe: |
| 195 | + app_pipe.write(command + "\n") |
| 196 | + # Allow some time for the command to take effect. |
| 197 | + # This removes the test flakyness which is very annoying for everyone in CI. |
| 198 | + sleep(0.001) |
| 199 | + |
| 200 | + def TC_SEAR_1_2(self) -> list[str]: |
| 201 | + return ["SEAR.S"] |
| 202 | + |
| 203 | + @async_test_body |
| 204 | + async def test_TC_SEAR_1_2(self): |
| 205 | + self.endpoint = self.matter_test_config.endpoint |
| 206 | + asserts.assert_false(self.endpoint is None, "--endpoint <endpoint> must be included on the command line in.") |
| 207 | + self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") |
| 208 | + if self.is_ci: |
| 209 | + app_pid = self.matter_test_config.app_pid |
| 210 | + if app_pid == 0: |
| 211 | + asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set") |
| 212 | + self.app_pipe = self.app_pipe + str(app_pid) |
| 213 | + |
| 214 | + self.print_step(1, "Commissioning, already done") |
| 215 | + |
| 216 | + # Ensure that the device is in the correct state |
| 217 | + if self.is_ci: |
| 218 | + self.write_to_app_pipe('{"Name": "Reset"}') |
| 219 | + |
| 220 | + if self.check_pics("SEAR.S.F02"): |
| 221 | + await self.read_and_validate_supported_maps(step=2) |
| 222 | + |
| 223 | + await self.read_and_validate_supported_areas(step=3) |
| 224 | + |
| 225 | + await self.read_and_validate_selected_areas(step=4) |
| 226 | + |
| 227 | + if self.check_pics("SEAR.S.A0003"): |
| 228 | + await self.read_and_validate_current_area(step=5) |
| 229 | + |
| 230 | + if self.check_pics("SEAR.S.A0004"): |
| 231 | + await self.read_and_validate_estimated_end_time(step=6) |
| 232 | + |
| 233 | + if self.check_pics("SEAR.S.A0005"): |
| 234 | + await self.read_and_validate_progress(step=7) |
| 235 | + |
| 236 | + if self.check_pics("SEAR.S.F02") and self.check_pics("SEAR.S.M.REMOVE_MAP"): |
| 237 | + test_step = "Manually ensure the SupportedMaps attribute is not empty and that the device is not operating" |
| 238 | + self.print_step("8", test_step) |
| 239 | + if not self.is_ci: |
| 240 | + self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") |
| 241 | + |
| 242 | + await self.read_and_validate_supported_maps(step=9) |
| 243 | + old_supported_maps = self.mapid_list |
| 244 | + |
| 245 | + test_step = "Manually intervene to remove one or more entries in the SupportedMaps list" |
| 246 | + self.print_step("10", test_step) |
| 247 | + if not self.is_ci: |
| 248 | + self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") |
| 249 | + |
| 250 | + await self.read_and_validate_supported_maps(step=11) |
| 251 | + new_supported_maps = self.mapid_list |
| 252 | + asserts.assert_true(len(old_supported_maps) > len(new_supported_maps), "Failed to remove map(s)") |
| 253 | + |
| 254 | + # NOTE the following operations are all part of step 11 - read all these attributes and check the data consistency |
| 255 | + # after removing map(s) |
| 256 | + await self.read_and_validate_supported_areas(step=11) |
| 257 | + |
| 258 | + await self.read_and_validate_selected_areas(step=11) |
| 259 | + |
| 260 | + if self.check_pics("SEAR.S.A0003"): |
| 261 | + await self.read_and_validate_current_area(step=11) |
| 262 | + |
| 263 | + if self.check_pics("SEAR.S.A0004"): |
| 264 | + await self.read_and_validate_estimated_end_time(step=11) |
| 265 | + |
| 266 | + if self.check_pics("SEAR.S.A0005"): |
| 267 | + await self.read_and_validate_progress(step=11) |
| 268 | + |
| 269 | + if self.check_pics("SEAR.S.F02") and self.check_pics("SEAR.S.M.ADD_MAP"): |
| 270 | + test_step = "Manually ensure the SupportedMaps attribute has less than 255 entries and that the device is not operating" |
| 271 | + self.print_step("12", test_step) |
| 272 | + if not self.is_ci: |
| 273 | + self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") |
| 274 | + |
| 275 | + await self.read_and_validate_supported_maps(step=13) |
| 276 | + old_supported_maps = self.mapid_list |
| 277 | + |
| 278 | + test_step = "Manually intervene to add one or more entries to the SupportedMaps list" |
| 279 | + self.print_step("14", test_step) |
| 280 | + if not self.is_ci: |
| 281 | + self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") |
| 282 | + |
| 283 | + await self.read_and_validate_supported_maps(step=15) |
| 284 | + new_supported_maps = self.mapid_list |
| 285 | + asserts.assert_true(len(old_supported_maps) < len(new_supported_maps), "Failed to add map(s)") |
| 286 | + |
| 287 | + # NOTE the following operations are all part of step 15 - read all these attributes and check the data consistency |
| 288 | + # after adding map(s) |
| 289 | + await self.read_and_validate_supported_areas(step=15) |
| 290 | + |
| 291 | + await self.read_and_validate_selected_areas(step=15) |
| 292 | + |
| 293 | + if self.check_pics("SEAR.S.A0003"): |
| 294 | + await self.read_and_validate_current_area(step=15) |
| 295 | + |
| 296 | + if self.check_pics("SEAR.S.A0004"): |
| 297 | + await self.read_and_validate_estimated_end_time(step=15) |
| 298 | + |
| 299 | + if self.check_pics("SEAR.S.A0005"): |
| 300 | + await self.read_and_validate_progress(step=15) |
| 301 | + |
| 302 | + if self.check_pics("SEAR.S.M.REMOVE_AREA"): |
| 303 | + test_step = "Manually ensure the SupportedAreas attribute is not empty and that the device is not operating" |
| 304 | + self.print_step("16", test_step) |
| 305 | + if not self.is_ci: |
| 306 | + self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") |
| 307 | + |
| 308 | + await self.read_and_validate_supported_areas(step=17) |
| 309 | + old_supported_areas = self.areaid_list |
| 310 | + |
| 311 | + test_step = "Manually intervene to remove one or more entries from the SupportedAreas list" |
| 312 | + self.print_step("18", test_step) |
| 313 | + if not self.is_ci: |
| 314 | + self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") |
| 315 | + |
| 316 | + await self.read_and_validate_supported_areas(step=19) |
| 317 | + new_supported_areas = self.areaid_list |
| 318 | + asserts.assert_true(len(old_supported_areas) > len(new_supported_areas), "Failed to remove area(s)") |
| 319 | + |
| 320 | + # NOTE the following operations are all part of step 19 - read all these attributes and check the data consistency |
| 321 | + # after removing areas(s) |
| 322 | + |
| 323 | + await self.read_and_validate_selected_areas(step=19) |
| 324 | + |
| 325 | + if self.check_pics("SEAR.S.A0003"): |
| 326 | + await self.read_and_validate_current_area(step=19) |
| 327 | + |
| 328 | + if self.check_pics("SEAR.S.A0004"): |
| 329 | + await self.read_and_validate_estimated_end_time(step=19) |
| 330 | + |
| 331 | + if self.check_pics("SEAR.S.A0005"): |
| 332 | + await self.read_and_validate_progress(step=19) |
| 333 | + |
| 334 | + if self.check_pics("SEAR.S.M.ADD_AREA"): |
| 335 | + test_step = "Manually ensure the SupportedAreas attribute has less than 255 entries and that the device is not operating" |
| 336 | + self.print_step("20", test_step) |
| 337 | + if not self.is_ci: |
| 338 | + self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") |
| 339 | + |
| 340 | + await self.read_and_validate_supported_areas(step=21) |
| 341 | + old_supported_areas = self.areaid_list |
| 342 | + |
| 343 | + test_step = "Manually intervene to add one or more entries to the SupportedAreas list" |
| 344 | + self.print_step("22", test_step) |
| 345 | + if not self.is_ci: |
| 346 | + self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") |
| 347 | + |
| 348 | + await self.read_and_validate_supported_areas(step=23) |
| 349 | + new_supported_areas = self.areaid_list |
| 350 | + asserts.assert_true(len(old_supported_areas) < len(new_supported_areas), "Failed to add area(s)") |
| 351 | + |
| 352 | + # NOTE the following operations are all part of step 23 - read all these attributes and check the data consistency |
| 353 | + # after removing areas(s) |
| 354 | + |
| 355 | + await self.read_and_validate_selected_areas(step=23) |
| 356 | + |
| 357 | + if self.check_pics("SEAR.S.A0003"): |
| 358 | + await self.read_and_validate_current_area(step=23) |
| 359 | + |
| 360 | + if self.check_pics("SEAR.S.A0004"): |
| 361 | + await self.read_and_validate_estimated_end_time(step=23) |
| 362 | + |
| 363 | + if self.check_pics("SEAR.S.A0005"): |
| 364 | + await self.read_and_validate_progress(step=23) |
| 365 | + |
| 366 | + |
| 367 | +if __name__ == "__main__": |
| 368 | + default_matter_test_main() |
0 commit comments