diff --git a/README.md b/README.md index ffc1f5f..0156b82 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,14 @@ ## Overview -Kuma ingress watcher is a kubernetes controller designed to automatically monitor `Kubernetes Ingress` and `Traefik Ingressroutes` in a Kubernetes cluster and create corresponding monitors in `Uptime Kuma`. It provides seamless integration between Kubernetes ingress resources and Uptime Kuma monitoring, allowing for easy and efficient monitoring of web services deployed on Kubernetes. +Kuma ingress watcher is a Kubernetes controller designed to automatically monitor `Kubernetes Ingress` and `Traefik Ingressroutes` in a Kubernetes cluster and create corresponding monitors in `Uptime Kuma`. It provides seamless integration between Kubernetes ingress resources and Uptime Kuma monitoring, allowing for easy and efficient monitoring of web services deployed on Kubernetes. ## Features -- Automatically creates, updates and deletes monitors in Uptime Kuma for `Kubernetes Ingress` and `Traefik Ingressroutes`. +- Automatically creates, updates, and deletes monitors in Uptime Kuma for `Kubernetes Ingress` and `Traefik Ingressroutes`. - Supports both single and multiple routes per Ingress resource. -- Customizable monitors by annotate Ingressroutes and Ingress. +- Customizable monitors by annotating Ingressroutes and Ingress. +- **File-based Monitor Configuration**: Define static monitors using a YAML file. ## Installation @@ -18,7 +19,6 @@ Kuma ingress watcher is a kubernetes controller designed to automatically monito - Kubernetes cluster - Uptime Kuma instance - ## Configuration ### Environment variables @@ -31,10 +31,37 @@ Before running the controller, make sure to configure the following environment - `WATCH_INGRESS`: Set to `True` to enable monitoring of Kubernetes Ingress resources. - `WATCH_INTERVAL`: Interval in seconds between each check for changes in Ingress or IngressRoutes (default is `10` seconds). - `USE_TRAEFIK_V3_CRD_GROUP`: Whether to use Traefik V3 API CRD group (`traefik.io`); default to `False`. +- `ENABLE_FILE_MONITOR`: Set to `True` to enable file-based monitor configuration. +- `FILE_MONITOR_PATH`: The path to the YAML file containing monitor definitions (default `/etc/kuma-controller/monitors.yaml`). + +### File-Based Monitor Configuration + +If `ENABLE_FILE_MONITOR` is set to `True`, the controller will load monitor definitions from the YAML file specified by `FILE_MONITOR_PATH`. + +#### Example YAML File + +```yaml +monitors: + - name: example-monitor + url: https://example.com + type: http + interval: 60 + - name: another-monitor + url: https://another.com + type: tcp + interval: 120 +``` + +#### Validation Rules + +- Each monitor entry must include `name` and `url` fields. +- Optional fields: `type` (default: `http`), `interval` (default: `60`), `headers` (default `{}`). + +The controller will create or update these monitors in Uptime Kuma upon startup. ### Annotations for Uptime Kuma Autodiscovery -These annotations apply to both Kubernetes Ingress and Traefik Ingressroutes resources, allowing you to customize the behavior of Uptime Kuma monitors for each.: +These annotations apply to both Kubernetes Ingress and Traefik Ingressroutes resources, allowing you to customize the behavior of Uptime Kuma monitors for each: 1. **`uptime-kuma.autodiscovery.probe.interval`** - Sets the probing interval in seconds for the monitor. @@ -67,15 +94,15 @@ These annotations apply to both Kubernetes Ingress and Traefik Ingressroutes res - **Example:** `uptime-kuma.autodiscovery.probe.headers: {"Authorization": "Bearer token"}` 6. **`uptime-kuma.autodiscovery.probe.host`** - - force the host for the probe. **WARNING: Be carefull with this paramter if you are using multiple hosts in a same Ingress object** + - Force the host for the probe. **WARNING: Be careful with this parameter if you are using multiple hosts in the same Ingress object.** - **Type:** String - - **Default:** `null` (host grep from ingress) + - **Default:** `null` (host grabbed from ingress) - **Example:** `uptime-kuma.autodiscovery.probe.host: example.com` 7. **`uptime-kuma.autodiscovery.probe.path`** - - url sub path for the probe. + - URL sub-path for the probe. - **Type:** String - - **Default:** `null` + - **Default:** `null` - **Example:** `uptime-kuma.autodiscovery.probe.path: /tintin` 8. **`uptime-kuma.autodiscovery.probe.port`** @@ -95,10 +122,10 @@ These annotations apply to both Kubernetes Ingress and Traefik Ingressroutes res Here's an example of annotations configured in a Kubernetes Ingress Resource: ```yaml -apiVersion: traefik.containo.us/v1alpha1 -kind: IngressRoute +apiVersion: networking.k8s.io/v1 +kind: Ingress metadata: - name: example-ingressroute + name: example-ingress namespace: my-namespace annotations: uptime-kuma.autodiscovery.probe.interval: 120 @@ -111,14 +138,15 @@ metadata: uptime-kuma.autodiscovery.probe.port: 8080 uptime-kuma.autodiscovery.probe.method: GET spec: - # Your Ingress route specification here +# Your Ingress route specification here ``` - ## Usage Once the controller is running, it will automatically monitor any changes to Ingress resources in your Kubernetes cluster and create/update corresponding monitors in Uptime Kuma. Simply deploy your applications using Kubernetes Ingress, and the controller will take care of the rest! +If `ENABLE_FILE_MONITOR` is enabled, the controller will also create monitors defined in the specified YAML file. + ## Important Notes ### Tag Addition Limitation @@ -127,17 +155,12 @@ Currently, the addition of tags to monitors is not supported due to limitations ### Custom Watcher for IngressRoutes -The Kubernetes event watcher (`watch`) does not provide specific details on creation, modification, or deletion events for IngressRoutes. To overcome this limitation, this controller implements a custom watcher mechanism that continuously monitors IngressRoutes and triggers appropriate actions based on changes detected. homemade watcher is used for Ingress objects too. This custom solution ensures accurate monitoring and synchronization with Uptime Kuma configurations. - -Here’s the passage with the new **Improvements** section included: - ---- +The Kubernetes event watcher (`watch`) does not provide specific details on creation, modification, or deletion events for IngressRoutes. To overcome this limitation, this controller implements a custom watcher mechanism that continuously monitors IngressRoutes and triggers appropriate actions based on changes detected. This custom solution ensures accurate monitoring and synchronization with Uptime Kuma configurations. ## Improvements +- **File-Based Monitor Configuration**: Define static monitors using a YAML file. - **IngressRoute Version Selection**: You can now choose which version of IngressRoutes to watch. This allows you to customize the controller's behavior based on the version of Traefik you're using. -- **Tag Addition for Monitors**: Currently, the addition of tags to monitors is not supported due to limitations in the Uptime Kuma API. Future updates may include support for this feature, allowing you to tag monitors directly through the controller. - ## Contributing @@ -152,10 +175,17 @@ To run unit tests for this project: - Python 3.12 or higher installed. - Poetry installed. -```` +```bash poetry install poetry run pytest -```` +``` + +### Pre-commit Hook + +```bash +pre-commit install +pre-commit run --all-files +``` ## License diff --git a/kuma_ingress_watcher/controller.py b/kuma_ingress_watcher/controller.py index d206a91..ff8a00f 100644 --- a/kuma_ingress_watcher/controller.py +++ b/kuma_ingress_watcher/controller.py @@ -3,6 +3,7 @@ import time import logging import sys +import yaml from uptime_kuma_api import UptimeKumaApi from kubernetes import client, config @@ -10,35 +11,35 @@ def str_to_bool(value): if isinstance(value, bool): return value - return str(value).lower() in ['true', '1', 't', 'y', 'yes'] + return str(value).lower() in ["true", "1", "t", "y", "yes"] # Configuration -UPTIME_KUMA_URL = os.getenv('UPTIME_KUMA_URL') -UPTIME_KUMA_USER = os.getenv('UPTIME_KUMA_USER') -UPTIME_KUMA_PASSWORD = os.getenv('UPTIME_KUMA_PASSWORD') -WATCH_INTERVAL = int(os.getenv('WATCH_INTERVAL', '10') or 10) -WATCH_INGRESSROUTES = str_to_bool(os.getenv('WATCH_INGRESSROUTES', True)) -WATCH_INGRESS = str_to_bool(os.getenv('WATCH_INGRESS', False)) -USE_TRAEFIK_V3_CRD_GROUP = str_to_bool(os.getenv('USE_TRAEFIK_V3_CRD_GROUP', False)) -LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper() +UPTIME_KUMA_URL = os.getenv("UPTIME_KUMA_URL") +UPTIME_KUMA_USER = os.getenv("UPTIME_KUMA_USER") +UPTIME_KUMA_PASSWORD = os.getenv("UPTIME_KUMA_PASSWORD") +WATCH_INTERVAL = int(os.getenv("WATCH_INTERVAL", "10") or 10) +WATCH_INGRESSROUTES = str_to_bool(os.getenv("WATCH_INGRESSROUTES", True)) +WATCH_INGRESS = str_to_bool(os.getenv("WATCH_INGRESS", False)) +USE_TRAEFIK_V3_CRD_GROUP = str_to_bool(os.getenv("USE_TRAEFIK_V3_CRD_GROUP", False)) +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +LOAD_MONITOR_FROM_FILE = str_to_bool(os.getenv("ENABLE_FILE_MONITOR", False)) +FILE_MONITOR_PATH = os.getenv("FILE_MONITOR_PATH", "/etc/kuma-controller/monitors.yaml") LOG_LEVELS = { - 'DEBUG': logging.DEBUG, - 'INFO': logging.INFO, - 'WARNING': logging.WARNING, - 'ERROR': logging.ERROR, - 'CRITICAL': logging.CRITICAL, + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + "CRITICAL": logging.CRITICAL, } -# Logging configuration logging.basicConfig( - level=LOG_LEVELS.get(LOG_LEVEL, 'INFO'), - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + level=LOG_LEVELS.get(LOG_LEVEL, "INFO"), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) -# Global variables for Kubernetes and Uptime Kuma kuma = None custom_api_instance = None networking_api_instance = None @@ -64,9 +65,16 @@ def create_or_update_monitor(name, url, interval, probe_type, headers, method): try: monitors = kuma.get_monitors() for monitor in monitors: - if monitor['name'] == name: + if monitor["name"] == name: logger.info(f"Updating monitor for {name} with URL: {url}") - kuma.edit_monitor(monitor['id'], url=url, type=probe_type, headers=headers, method=method, interval=interval) + kuma.edit_monitor( + monitor["id"], + url=url, + type=probe_type, + headers=headers, + method=method, + interval=interval, + ) return logger.info(f"Creating new monitor for {name} with URL: {url}") kuma.add_monitor( @@ -75,7 +83,7 @@ def create_or_update_monitor(name, url, interval, probe_type, headers, method): url=url, interval=interval, headers=headers, - method=method + method=method, ) logger.info(f"Successfully created monitor for {name}") except Exception as e: @@ -86,8 +94,8 @@ def delete_monitor(name): try: monitors = kuma.get_monitors() for monitor in monitors: - if monitor['name'] == name: - kuma.delete_monitor(monitor['id']) + if monitor["name"] == name: + kuma.delete_monitor(monitor["id"]) logger.info(f"Successfully deleted monitor {name}") return logger.warning(f"No monitor found with name {name}") @@ -96,63 +104,90 @@ def delete_monitor(name): def extract_hosts_from_match(match): - host_pattern = re.compile(r'Host\(`([^`]*)`\)') + host_pattern = re.compile(r"Host\(`([^`]*)`\)") return host_pattern.findall(match) def extract_hosts_from_ingress_rule(rule): hosts = [] - if 'host' in rule: - hosts.append(rule['host']) + if "host" in rule: + hosts.append(rule["host"]) return hosts def extract_hosts(route_or_rule, type_obj): - if type_obj == 'IngressRoute': - match = route_or_rule.get('match') + if type_obj == "IngressRoute": + match = route_or_rule.get("match") return extract_hosts_from_match(match) if match else [] - elif type_obj == 'Ingress': + elif type_obj == "Ingress": return extract_hosts_from_ingress_rule(route_or_rule) else: return [] def get_routes_or_rules(spec, type_obj): - if type_obj == 'IngressRoute': - return spec.get('routes', []) - elif type_obj == 'Ingress': - return spec.get('rules', []) + if type_obj == "IngressRoute": + return spec.get("routes", []) + elif type_obj == "Ingress": + return spec.get("rules", []) else: return [] def process_routing_object(item, type_obj): - metadata = item['metadata'] - annotations = metadata.get('annotations', {}) + metadata = item["metadata"] + annotations = metadata.get("annotations", {}) - name = metadata['name'] - namespace = metadata['namespace'] - spec = item['spec'] + name = metadata["name"] + namespace = metadata["namespace"] + spec = item["spec"] routes_or_rules = get_routes_or_rules(spec, type_obj) - interval = int(annotations.get('uptime-kuma.autodiscovery.probe.interval', 60)) - monitor_name = annotations.get('uptime-kuma.autodiscovery.probe.name', f"{name}-{namespace}") - enabled = annotations.get('uptime-kuma.autodiscovery.probe.enabled', 'true').lower() == 'true' - probe_type = annotations.get('uptime-kuma.autodiscovery.probe.type', 'http') - headers = annotations.get('uptime-kuma.autodiscovery.probe.headers') - port = annotations.get('uptime-kuma.autodiscovery.probe.port') - path = annotations.get('uptime-kuma.autodiscovery.probe.path') - hard_host = annotations.get('uptime-kuma.autodiscovery.probe.host') - method = annotations.get('uptime-kuma.autodiscovery.probe.method', 'GET') + interval = int(annotations.get("uptime-kuma.autodiscovery.probe.interval", 60)) + monitor_name = annotations.get( + "uptime-kuma.autodiscovery.probe.name", f"{name}-{namespace}" + ) + enabled = ( + annotations.get("uptime-kuma.autodiscovery.probe.enabled", "true").lower() + == "true" + ) + probe_type = annotations.get("uptime-kuma.autodiscovery.probe.type", "http") + headers = annotations.get("uptime-kuma.autodiscovery.probe.headers") + port = annotations.get("uptime-kuma.autodiscovery.probe.port") + path = annotations.get("uptime-kuma.autodiscovery.probe.path") + hard_host = annotations.get("uptime-kuma.autodiscovery.probe.host") + method = annotations.get("uptime-kuma.autodiscovery.probe.method", "GET") if not enabled: logger.info(f"Monitoring for {name} is disabled via annotations.") delete_monitor(monitor_name) return - process_routes(monitor_name, routes_or_rules, interval, probe_type, headers, port, path, hard_host, method, type_obj) - - -def process_routes(monitor_name, routes_or_rules, interval, probe_type, headers, port, path, hard_host, method, type_obj): + process_routes( + monitor_name, + routes_or_rules, + interval, + probe_type, + headers, + port, + path, + hard_host, + method, + type_obj, + ) + + +def process_routes( + monitor_name, + routes_or_rules, + interval, + probe_type, + headers, + port, + path, + hard_host, + method, + type_obj, +): index = 1 for route_or_rule in routes_or_rules: hosts = extract_hosts(route_or_rule, type_obj) @@ -167,9 +202,15 @@ def process_routes(monitor_name, routes_or_rules, interval, probe_type, headers, if port: url = f"{url}:{port}" - monitor_name_with_index = f"{monitor_name}-{index}" if len(routes_or_rules) > 1 else monitor_name + monitor_name_with_index = ( + f"{monitor_name}-{index}" + if len(routes_or_rules) > 1 + else monitor_name + ) - create_or_update_monitor(monitor_name_with_index, url, interval, probe_type, headers, method) + create_or_update_monitor( + monitor_name_with_index, url, interval, probe_type, headers, method + ) index += 1 @@ -196,23 +237,21 @@ def get_ingressroutes(custom_api_instance): try: return custom_api_instance.list_cluster_custom_object( - group=group, - version="v1alpha1", - plural="ingressroutes" + group=group, version="v1alpha1", plural="ingressroutes" ) except Exception as e: logger.error(f"Failed to get ingressroutes: {e}") - return {'items': []} + return {"items": []} def get_ingress(networking_api_instance): try: ingress_list = networking_api_instance.list_ingress_for_all_namespaces() ingress_dict_list = [ingress.to_dict() for ingress in ingress_list.items] - return {'items': ingress_dict_list} + return {"items": ingress_dict_list} except Exception as e: logger.error(f"Failed to get Ingress: {e}") - return {'items': []} + return {"items": []} def handle_changes(previous_items, current_items, resource_type): @@ -226,7 +265,7 @@ def handle_changes(previous_items, current_items, resource_type): deleted = previous_names - current_names for name in deleted: - namespace = previous_items[name]['metadata']['namespace'] + namespace = previous_items[name]["metadata"]["namespace"] monitor_name = f"{name}-{namespace}" logger.info(f"{resource_type} {name} deleted.") delete_monitor(monitor_name) @@ -255,22 +294,80 @@ def watch_ingress_resources(): while True: if WATCH_INGRESSROUTES: current_ingressroutes = get_ingressroutes(custom_api_instance) - current_items = {item['metadata']['name']: item for item in current_ingressroutes['items']} - previous_ingressroutes = handle_changes(previous_ingressroutes, current_items, "IngressRoute") + current_items = { + item["metadata"]["name"]: item + for item in current_ingressroutes["items"] + } + previous_ingressroutes = handle_changes( + previous_ingressroutes, current_items, "IngressRoute" + ) if WATCH_INGRESS: current_ingress = get_ingress(networking_api_instance) - current_items = {item['metadata']['name']: item for item in current_ingress['items']} - previous_ingress = handle_changes(previous_ingress, current_items, "Ingress") + current_items = { + item["metadata"]["name"]: item for item in current_ingress["items"] + } + previous_ingress = handle_changes( + previous_ingress, current_items, "Ingress" + ) time.sleep(WATCH_INTERVAL) +def process_monitor_file(file_path): + try: + with open(file_path, "r") as file: + file_content = file.read() + + if not file_content.strip(): + logger.info(f"The file {file_path} is empty or contains only whitespace.") + return + + try: + ingress_data = yaml.safe_load(file_content) + except yaml.YAMLError as e: + logger.error( + f"Failed to process file {file_path}: Invalid YAML format ({str(e)})" + ) + return + + for entry in ingress_data: + try: + if not isinstance(entry, dict): + raise ValueError(f"Invalid entry format: {entry}") + if "name" not in entry or "url" not in entry: + raise KeyError(f"Missing required fields in entry: {entry}") + + create_or_update_monitor( + entry.get("name"), + entry.get("url"), + entry.get("interval", 60), + entry.get("type", "http"), + entry.get("headers", {}), + entry.get("method", "GET"), + ) + except (ValueError, KeyError) as e: + logger.warning(f"Skipping invalid entry: {entry} ({str(e)})") + + except FileNotFoundError: + logger.error(f"File {file_path} not found.") + except Exception as e: + logger.error( + f"An unexpected error occurred while processing file {file_path}: {str(e)}" + ) + + def main(): check_config() init_kuma_api() - init_kubernetes_client() - watch_ingress_resources() + + if LOAD_MONITOR_FROM_FILE: + logger.info("File-based Monitor creation is enabled.") + process_monitor_file(FILE_MONITOR_PATH) + + if WATCH_INGRESSROUTES or WATCH_INGRESS: + init_kubernetes_client() + watch_ingress_resources() if __name__ == "__main__": diff --git a/tests/test_create_or_update_monitor.py b/tests/test_create_or_update_monitor.py index a4f69a7..3caee1b 100644 --- a/tests/test_create_or_update_monitor.py +++ b/tests/test_create_or_update_monitor.py @@ -4,38 +4,58 @@ class TestCreateOrUpdateMonitor(unittest.TestCase): - @patch('kuma_ingress_watcher.controller.kuma') - @patch('kuma_ingress_watcher.controller.logger') + @patch("kuma_ingress_watcher.controller.kuma") + @patch("kuma_ingress_watcher.controller.logger") def test_create_or_update_monitor_update(self, mock_logger, mock_kuma): - mock_kuma.get_monitors.return_value = [{'name': 'test', 'url': 'http://oldurl.com', 'id': 1}] + mock_kuma.get_monitors.return_value = [ + {"name": "test", "url": "http://oldurl.com", "id": 1} + ] - create_or_update_monitor('test', 'http://newurl.com', 60, 'http', None, 'GET') + create_or_update_monitor("test", "http://newurl.com", 60, "http", None, "GET") - mock_logger.info.assert_called_with('Updating monitor for test with URL: http://newurl.com') - mock_kuma.edit_monitor.assert_called_once_with(1, url='http://newurl.com', interval=60, type='http', headers=None, method='GET') + mock_logger.info.assert_called_with( + "Updating monitor for test with URL: http://newurl.com" + ) + mock_kuma.edit_monitor.assert_called_once_with( + 1, + url="http://newurl.com", + interval=60, + type="http", + headers=None, + method="GET", + ) - @patch('kuma_ingress_watcher.controller.kuma') - @patch('kuma_ingress_watcher.controller.logger') + @patch("kuma_ingress_watcher.controller.kuma") + @patch("kuma_ingress_watcher.controller.logger") def test_create_or_update_monitor_create(self, mock_logger, mock_kuma): mock_kuma.get_monitors.return_value = [] - create_or_update_monitor('test', 'http://newurl.com', 60, 'http', None, 'GET') + create_or_update_monitor("test", "http://newurl.com", 60, "http", None, "GET") - mock_logger.info.assert_any_call('Creating new monitor for test with URL: http://newurl.com') - mock_logger.info.assert_any_call('Successfully created monitor for test') + mock_logger.info.assert_any_call( + "Creating new monitor for test with URL: http://newurl.com" + ) + mock_logger.info.assert_any_call("Successfully created monitor for test") mock_kuma.add_monitor.assert_called_once_with( - type='http', name='test', url='http://newurl.com', interval=60, headers=None, method='GET' + type="http", + name="test", + url="http://newurl.com", + interval=60, + headers=None, + method="GET", ) - @patch('kuma_ingress_watcher.controller.kuma') - @patch('kuma_ingress_watcher.controller.logger') + @patch("kuma_ingress_watcher.controller.kuma") + @patch("kuma_ingress_watcher.controller.logger") def test_create_or_update_monitor_error(self, mock_logger, mock_kuma): - mock_kuma.get_monitors.side_effect = Exception('API error') + mock_kuma.get_monitors.side_effect = Exception("API error") - create_or_update_monitor('test', 'http://newurl.com', 60, 'http', None, 'GET') + create_or_update_monitor("test", "http://newurl.com", 60, "http", None, "GET") - mock_logger.error.assert_called_once_with('Failed to create or update monitor for test: API error') + mock_logger.error.assert_called_once_with( + "Failed to create or update monitor for test: API error" + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_delete_monitor.py b/tests/test_delete_monitor.py index e9f4e93..b8d5d8c 100644 --- a/tests/test_delete_monitor.py +++ b/tests/test_delete_monitor.py @@ -4,26 +4,26 @@ class TestDeleteMonitor(unittest.TestCase): - @patch('kuma_ingress_watcher.controller.kuma') - @patch('kuma_ingress_watcher.controller.logger') + @patch("kuma_ingress_watcher.controller.kuma") + @patch("kuma_ingress_watcher.controller.logger") def test_delete_monitor_exists(self, mock_logger, mock_kuma): - mock_kuma.get_monitors.return_value = [{'name': 'test', 'id': 1}] + mock_kuma.get_monitors.return_value = [{"name": "test", "id": 1}] - delete_monitor('test') + delete_monitor("test") - mock_logger.info.assert_called_with('Successfully deleted monitor test') + mock_logger.info.assert_called_with("Successfully deleted monitor test") mock_kuma.delete_monitor.assert_called_once_with(1) - @patch('kuma_ingress_watcher.controller.kuma') - @patch('kuma_ingress_watcher.controller.logger') + @patch("kuma_ingress_watcher.controller.kuma") + @patch("kuma_ingress_watcher.controller.logger") def test_delete_monitor_not_found(self, mock_logger, mock_kuma): mock_kuma.get_monitors.return_value = [] - delete_monitor('test') + delete_monitor("test") - mock_logger.warning.assert_called_with('No monitor found with name test') + mock_logger.warning.assert_called_with("No monitor found with name test") mock_kuma.delete_monitor.assert_not_called() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_extract_hosts.py b/tests/test_extract_hosts.py index c211bb7..bce54bf 100644 --- a/tests/test_extract_hosts.py +++ b/tests/test_extract_hosts.py @@ -4,20 +4,20 @@ class TestExtractHosts(unittest.TestCase): def test_extract_hosts_ingressroute(self): - route_or_rule = {'match': 'Host(`example.com`) && Host(`example.org`)'} - hosts = extract_hosts(route_or_rule, 'IngressRoute') - self.assertEqual(hosts, ['example.com', 'example.org']) + route_or_rule = {"match": "Host(`example.com`) && Host(`example.org`)"} + hosts = extract_hosts(route_or_rule, "IngressRoute") + self.assertEqual(hosts, ["example.com", "example.org"]) def test_extract_hosts_ingress(self): - route_or_rule = {'host': 'example.com'} - hosts = extract_hosts(route_or_rule, 'Ingress') - self.assertEqual(hosts, ['example.com']) + route_or_rule = {"host": "example.com"} + hosts = extract_hosts(route_or_rule, "Ingress") + self.assertEqual(hosts, ["example.com"]) def test_extract_hosts_none(self): - route_or_rule = {'path': '/test'} - hosts = extract_hosts(route_or_rule, 'IngressRoute') + route_or_rule = {"path": "/test"} + hosts = extract_hosts(route_or_rule, "IngressRoute") self.assertEqual(hosts, []) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_extract_hosts_from_ingress_rule.py b/tests/test_extract_hosts_from_ingress_rule.py index 42ba171..906a12c 100644 --- a/tests/test_extract_hosts_from_ingress_rule.py +++ b/tests/test_extract_hosts_from_ingress_rule.py @@ -4,15 +4,15 @@ class TestExtractHostsFromIngressRule(unittest.TestCase): def test_extract_hosts_single(self): - rule = {'host': 'example.com'} + rule = {"host": "example.com"} hosts = extract_hosts_from_ingress_rule(rule) - self.assertEqual(hosts, ['example.com']) + self.assertEqual(hosts, ["example.com"]) def test_extract_hosts_none(self): - rule = {'path': '/test'} + rule = {"path": "/test"} hosts = extract_hosts_from_ingress_rule(rule) self.assertEqual(hosts, []) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_extract_hosts_from_match.py b/tests/test_extract_hosts_from_match.py index 0c55e39..621926a 100644 --- a/tests/test_extract_hosts_from_match.py +++ b/tests/test_extract_hosts_from_match.py @@ -4,20 +4,20 @@ class TestExtractHostsFromMatch(unittest.TestCase): def test_extract_hosts_single(self): - match = 'Host(`example.com`)' + match = "Host(`example.com`)" hosts = extract_hosts_from_match(match) - self.assertEqual(hosts, ['example.com']) + self.assertEqual(hosts, ["example.com"]) def test_extract_hosts_multiple(self): - match = 'Host(`example.com`) && Host(`example.org`)' + match = "Host(`example.com`) && Host(`example.org`)" hosts = extract_hosts_from_match(match) - self.assertEqual(hosts, ['example.com', 'example.org']) + self.assertEqual(hosts, ["example.com", "example.org"]) def test_extract_hosts_none(self): - match = 'Path(`/test`)' + match = "Path(`/test`)" hosts = extract_hosts_from_match(match) self.assertEqual(hosts, []) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_get_ingress.py b/tests/test_get_ingress.py index 17a0761..765a565 100644 --- a/tests/test_get_ingress.py +++ b/tests/test_get_ingress.py @@ -4,38 +4,44 @@ class TestGetIngress(unittest.TestCase): - @patch('kuma_ingress_watcher.controller.logger', spec=True) - @patch('kuma_ingress_watcher.controller.networking_api_instance') + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.networking_api_instance") def test_get_ingress_success(self, mock_api_instance, mock_logger): # Mock an ingress object with a 'to_dict' method mock_ingress = MagicMock() - mock_ingress.to_dict.return_value = {'metadata': {'name': 'test-ingress'}} + mock_ingress.to_dict.return_value = {"metadata": {"name": "test-ingress"}} # Set the mocked 'items' list to contain the mocked ingress object - mock_api_instance.list_ingress_for_all_namespaces.return_value.items = [mock_ingress] + mock_api_instance.list_ingress_for_all_namespaces.return_value.items = [ + mock_ingress + ] # Call the function result = get_ingress(mock_api_instance) # Expected result after calling 'to_dict' on the ingress object - expected_result = {'items': [{'metadata': {'name': 'test-ingress'}}]} + expected_result = {"items": [{"metadata": {"name": "test-ingress"}}]} # Assert the function's result matches the expected result self.assertEqual(result, expected_result) mock_logger.error.assert_not_called() - @patch('kuma_ingress_watcher.controller.logger', spec=True) - @patch('kuma_ingress_watcher.controller.networking_api_instance') + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.networking_api_instance") def test_get_ingress_failure(self, mock_api_instance, mock_logger): # Simulate an exception when calling list_ingress_for_all_namespaces - mock_api_instance.list_ingress_for_all_namespaces.side_effect = Exception('Failed to get Ingress') + mock_api_instance.list_ingress_for_all_namespaces.side_effect = Exception( + "Failed to get Ingress" + ) result = get_ingress(mock_api_instance) # Verify that in case of an exception, an empty list is returned - self.assertEqual(result, {'items': []}) - mock_logger.error.assert_called_once_with('Failed to get Ingress: Failed to get Ingress') + self.assertEqual(result, {"items": []}) + mock_logger.error.assert_called_once_with( + "Failed to get Ingress: Failed to get Ingress" + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_get_ingressroutes.py b/tests/test_get_ingressroutes.py index 666a001..baba84c 100644 --- a/tests/test_get_ingressroutes.py +++ b/tests/test_get_ingressroutes.py @@ -4,75 +4,83 @@ class TestGetIngressroutes(unittest.TestCase): - @patch('kuma_ingress_watcher.controller.logger', spec=True) - @patch('kuma_ingress_watcher.controller.custom_api_instance') + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.custom_api_instance") def test_get_ingressroutes_success(self, mock_api_instance, mock_logger): # Mock an ingressroute object with a 'to_dict' method mock_ingressroute = MagicMock() - mock_ingressroute.to_dict.return_value = {'metadata': {'name': 'test-ingressroute'}} + mock_ingressroute.to_dict.return_value = { + "metadata": {"name": "test-ingressroute"} + } # Mock the API response to include the mocked ingressroute mock_api_instance.list_cluster_custom_object.return_value = { - 'items': [mock_ingressroute.to_dict()] + "items": [mock_ingressroute.to_dict()] } # Call the function result = get_ingressroutes(mock_api_instance) # Assert we call list_cluster_custom_object with expected group - mock_api_instance.list_cluster_custom_object.assert_called_with(group='traefik.containo.us', - version='v1alpha1', - plural='ingressroutes') + mock_api_instance.list_cluster_custom_object.assert_called_with( + group="traefik.containo.us", version="v1alpha1", plural="ingressroutes" + ) # Expected result after calling 'to_dict' on the ingressroute object - expected_result = {'items': [{'metadata': {'name': 'test-ingressroute'}}]} + expected_result = {"items": [{"metadata": {"name": "test-ingressroute"}}]} # Assert the function's result matches the expected result self.assertEqual(result, expected_result) mock_logger.error.assert_not_called() - @patch('kuma_ingress_watcher.controller.logger', spec=True) - @patch('kuma_ingress_watcher.controller.custom_api_instance') + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.custom_api_instance") def test_get_ingressroutes_failure(self, mock_api_instance, mock_logger): # Simulate an exception when calling list_cluster_custom_object - mock_api_instance.list_cluster_custom_object.side_effect = Exception('Failed to get ingressroutes') + mock_api_instance.list_cluster_custom_object.side_effect = Exception( + "Failed to get ingressroutes" + ) result = get_ingressroutes(mock_api_instance) # Verify that in case of an exception, an empty list is returned - self.assertEqual(result, {'items': []}) - mock_logger.error.assert_called_once_with('Failed to get ingressroutes: Failed to get ingressroutes') + self.assertEqual(result, {"items": []}) + mock_logger.error.assert_called_once_with( + "Failed to get ingressroutes: Failed to get ingressroutes" + ) - @patch('kuma_ingress_watcher.controller.logger', spec=True) - @patch('kuma_ingress_watcher.controller.custom_api_instance') + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.custom_api_instance") def test_get_ingressroutes_empty(self, mock_api_instance, mock_logger): # Simulate an empty response from the API - mock_api_instance.list_cluster_custom_object.return_value = {'items': []} + mock_api_instance.list_cluster_custom_object.return_value = {"items": []} result = get_ingressroutes(mock_api_instance) # Verify that an empty list is returned when there are no ingressroutes - self.assertEqual(result, {'items': []}) + self.assertEqual(result, {"items": []}) mock_logger.error.assert_not_called() - @patch('kuma_ingress_watcher.controller.custom_api_instance') - @patch('kuma_ingress_watcher.controller.USE_TRAEFIK_V3_CRD_GROUP', True) + @patch("kuma_ingress_watcher.controller.custom_api_instance") + @patch("kuma_ingress_watcher.controller.USE_TRAEFIK_V3_CRD_GROUP", True) def test_get_ingressroutes_traefik_v3_crd(self, mock_api_instance): mock_ingressroute = MagicMock() - mock_ingressroute.to_dict.return_value = {'metadata': {'name': 'test-ingressroute'}} + mock_ingressroute.to_dict.return_value = { + "metadata": {"name": "test-ingressroute"} + } # Mock the API response to include the mocked ingressroute mock_api_instance.list_cluster_custom_object.return_value = { - 'items': [mock_ingressroute.to_dict()] + "items": [mock_ingressroute.to_dict()] } get_ingressroutes(mock_api_instance) # Assert we call list_cluster_custom_object with expected group - mock_api_instance.list_cluster_custom_object.assert_called_with(group='traefik.io', - version='v1alpha1', - plural='ingressroutes') + mock_api_instance.list_cluster_custom_object.assert_called_with( + group="traefik.io", version="v1alpha1", plural="ingressroutes" + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_get_routes_or_rules.py b/tests/test_get_routes_or_rules.py index 30a5ead..b7ae29a 100644 --- a/tests/test_get_routes_or_rules.py +++ b/tests/test_get_routes_or_rules.py @@ -5,53 +5,38 @@ class TestGetRoutesOrRules(unittest.TestCase): def test_get_routes_ingressroute(self): spec = { - 'routes': [ - {'match': 'Host(`example.com`)'}, - {'match': 'Host(`example.org`)'} + "routes": [ + {"match": "Host(`example.com`)"}, + {"match": "Host(`example.org`)"}, ] } - type_obj = 'IngressRoute' + type_obj = "IngressRoute" result = get_routes_or_rules(spec, type_obj) - expected = [ - {'match': 'Host(`example.com`)'}, - {'match': 'Host(`example.org`)'} - ] + expected = [{"match": "Host(`example.com`)"}, {"match": "Host(`example.org`)"}] self.assertEqual(result, expected) def test_get_routes_ingress(self): - spec = { - 'rules': [ - {'host': 'example.com'}, - {'host': 'example.org'} - ] - } - type_obj = 'Ingress' + spec = {"rules": [{"host": "example.com"}, {"host": "example.org"}]} + type_obj = "Ingress" result = get_routes_or_rules(spec, type_obj) - expected = [ - {'host': 'example.com'}, - {'host': 'example.org'} - ] + expected = [{"host": "example.com"}, {"host": "example.org"}] self.assertEqual(result, expected) def test_get_routes_empty(self): spec = {} - type_obj = 'IngressRoute' + type_obj = "IngressRoute" result = get_routes_or_rules(spec, type_obj) self.assertEqual(result, []) def test_get_routes_invalid_type(self): spec = { - 'routes': [ - {'match': 'Host(`example.com`)'} - ], - 'rules': [ - {'host': 'example.org'} - ] + "routes": [{"match": "Host(`example.com`)"}], + "rules": [{"host": "example.org"}], } - type_obj = 'InvalidType' + type_obj = "InvalidType" result = get_routes_or_rules(spec, type_obj) self.assertEqual(result, []) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_handle_changes.py b/tests/test_handle_changes.py index f80efa2..9ac102d 100644 --- a/tests/test_handle_changes.py +++ b/tests/test_handle_changes.py @@ -4,20 +4,25 @@ class TestHandleChanges(unittest.TestCase): - - @patch('kuma_ingress_watcher.controller.process_routing_object') - @patch('kuma_ingress_watcher.controller.delete_monitor') - @patch('kuma_ingress_watcher.controller.ingressroute_changed') - @patch('kuma_ingress_watcher.controller.logger', spec=True) - def test_handle_changes(self, mock_logger, mock_ingressroute_changed, mock_delete_monitor, mock_process_routing_object): + @patch("kuma_ingress_watcher.controller.process_routing_object") + @patch("kuma_ingress_watcher.controller.delete_monitor") + @patch("kuma_ingress_watcher.controller.ingressroute_changed") + @patch("kuma_ingress_watcher.controller.logger", spec=True) + def test_handle_changes( + self, + mock_logger, + mock_ingressroute_changed, + mock_delete_monitor, + mock_process_routing_object, + ): # Define previous and current items previous_items = { - 'test1': {'metadata': {'name': 'test1', 'namespace': 'default'}}, - 'test2': {'metadata': {'name': 'test2', 'namespace': 'default'}} + "test1": {"metadata": {"name": "test1", "namespace": "default"}}, + "test2": {"metadata": {"name": "test2", "namespace": "default"}}, } current_items = { - 'test1': {'metadata': {'name': 'test1', 'namespace': 'default'}}, - 'test3': {'metadata': {'name': 'test3', 'namespace': 'default'}} + "test1": {"metadata": {"name": "test1", "namespace": "default"}}, + "test3": {"metadata": {"name": "test3", "namespace": "default"}}, } # Simulate that the items have changed @@ -27,14 +32,14 @@ def test_handle_changes(self, mock_logger, mock_ingressroute_changed, mock_delet handle_changes(previous_items, current_items, "Ingress") # Verify that process_routing_object was called for the added item - mock_process_routing_object.assert_any_call(current_items['test3'], "Ingress") + mock_process_routing_object.assert_any_call(current_items["test3"], "Ingress") # Verify that process_routing_object was called for the modified item - mock_process_routing_object.assert_any_call(current_items['test1'], "Ingress") + mock_process_routing_object.assert_any_call(current_items["test1"], "Ingress") # Verify that delete_monitor was called for the deleted item - mock_delete_monitor.assert_called_once_with('test2-default') + mock_delete_monitor.assert_called_once_with("test2-default") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_init_kuma_api.py b/tests/test_init_kuma_api.py index 80df3c5..b7c84a7 100644 --- a/tests/test_init_kuma_api.py +++ b/tests/test_init_kuma_api.py @@ -4,9 +4,9 @@ class TestInitKumaApi(unittest.TestCase): - @patch('kuma_ingress_watcher.controller.UptimeKumaApi') - @patch('kuma_ingress_watcher.controller.logger') - @patch('kuma_ingress_watcher.controller.sys.exit') + @patch("kuma_ingress_watcher.controller.UptimeKumaApi") + @patch("kuma_ingress_watcher.controller.logger") + @patch("kuma_ingress_watcher.controller.sys.exit") def test_init_kuma_api_success(self, mock_exit, mock_logger, MockUptimeKumaApi): mock_kuma = MagicMock() MockUptimeKumaApi.return_value = mock_kuma @@ -16,11 +16,11 @@ def test_init_kuma_api_success(self, mock_exit, mock_logger, MockUptimeKumaApi): mock_kuma.login.assert_called_once() mock_exit.assert_not_called() - @patch('kuma_ingress_watcher.controller.UptimeKumaApi') - @patch('kuma_ingress_watcher.controller.logger') - @patch('kuma_ingress_watcher.controller.sys.exit') + @patch("kuma_ingress_watcher.controller.UptimeKumaApi") + @patch("kuma_ingress_watcher.controller.logger") + @patch("kuma_ingress_watcher.controller.sys.exit") def test_init_kuma_api_failure(self, mock_exit, mock_logger, MockUptimeKumaApi): - MockUptimeKumaApi.side_effect = Exception('Login failed') + MockUptimeKumaApi.side_effect = Exception("Login failed") init_kuma_api() @@ -28,5 +28,5 @@ def test_init_kuma_api_failure(self, mock_exit, mock_logger, MockUptimeKumaApi): mock_logger.error.assert_called_once() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_process_file_monitor.py b/tests/test_process_file_monitor.py new file mode 100644 index 0000000..eadf5ec --- /dev/null +++ b/tests/test_process_file_monitor.py @@ -0,0 +1,174 @@ +import unittest +from unittest.mock import patch, MagicMock +import yaml +from kuma_ingress_watcher.controller import process_monitor_file + + +class TestProcessFileIngress(unittest.TestCase): + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.open", new_callable=MagicMock) + def test_process_monitor_file_empty_file(self, mock_open, mock_logger): + mock_file_content = "" + mock_open.return_value.__enter__.return_value.read.return_value = ( + mock_file_content + ) + + process_monitor_file("empty_file.yaml") + + mock_logger.info.assert_called_once_with( + "The file empty_file.yaml is empty or contains only whitespace." + ) + + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.open", new_callable=MagicMock) + def test_process_monitor_file_invalid_yaml(self, mock_open, mock_logger): + mock_file_content = """ + - name: test-ingress + url: http://example.com + interval: 60 + type: http + - name: invalid-ingress + url example.com # Erreur dans le YAML + """ + mock_open.return_value.__enter__.return_value.read.return_value = ( + mock_file_content + ) + + with patch("yaml.safe_load", side_effect=yaml.YAMLError("Invalid YAML")): + process_monitor_file("mock_file.yaml") + + mock_logger.error.assert_called_once_with( + "Failed to process file mock_file.yaml: Invalid YAML format (Invalid YAML)" + ) + + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.open", new_callable=MagicMock) + @patch("kuma_ingress_watcher.controller.create_or_update_monitor", spec=True) + def test_process_monitor_file_valid_entries( + self, mock_create_or_update_monitor, mock_open, mock_logger + ): + mock_file_content = """ + - name: test-ingress + url: http://example.com + interval: 30 + type: http + headers: {"Authorization": "Bearer token"} + method: POST + """ + mock_open.return_value.__enter__.return_value.read.return_value = ( + mock_file_content + ) + + process_monitor_file("mock_file.yaml") + + mock_create_or_update_monitor.assert_called_once_with( + "test-ingress", + "http://example.com", + 30, + "http", + {"Authorization": "Bearer token"}, + "POST", + ) + + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.open", new_callable=MagicMock) + def test_process_monitor_file_file_not_found(self, mock_open, mock_logger): + mock_open.side_effect = FileNotFoundError + + process_monitor_file("nonexistent_file.yaml") + + mock_logger.error.assert_called_once_with( + "File nonexistent_file.yaml not found." + ) + + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.open", new_callable=MagicMock) + def test_process_monitor_file_unexpected_exception(self, mock_open, mock_logger): + mock_open.side_effect = Exception("Unexpected error") + + process_monitor_file("error_file.yaml") + + mock_logger.error.assert_called_once_with( + "An unexpected error occurred while processing file error_file.yaml: Unexpected error" + ) + + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.open", new_callable=MagicMock) + def test_process_monitor_file_invalid_entry_format(self, mock_open, mock_logger): + mock_file_content = """ + - not_a_dict + """ + mock_open.return_value.__enter__.return_value.read.return_value = ( + mock_file_content + ) + + process_monitor_file("mock_file.yaml") + + mock_logger.warning.assert_called_once_with( + "Skipping invalid entry: not_a_dict (Invalid entry format: not_a_dict)" + ) + + @patch("kuma_ingress_watcher.controller.logger", spec=True) + @patch("kuma_ingress_watcher.controller.open", new_callable=MagicMock) + @patch("kuma_ingress_watcher.controller.create_or_update_monitor", spec=True) + def test_process_monitor_file_multiple_valid_entries_with_non_default_values( + self, mock_create_or_update_monitor, mock_open, mock_logger + ): + # Contenu simulé du fichier avec plusieurs entrées valides et des valeurs personnalisées + mock_file_content = """ + - name: ingress1 + url: http://example1.com + interval: 30 + type: https + probe_type: http + headers: + Authorization: "Bearer token1" + method: POST + - name: ingress2 + url: http://example2.com + - name: ingress3 + url: http://example3.com + type: http + probe_type: http + headers: + Authorization: "Bearer token3" + method: GET + - name: ingress1 + url: http://example1.com + interval: 30 + probe_type: http + headers: + Authorization: "Bearer token1" + method: POST + - name: ingress2 + url: http://example2.com + interval: 45 + type: http + headers: + Authorization: "Bearer token2" + method: PUT + - name: ingress3 + url: http://example3.com + interval: 60 + type: http + probe_type: http + method: GET + - name: ingress3 + url: http://example3.com + interval: 60 + type: http + probe_type: http + """ + mock_open.return_value.__enter__.return_value.read.return_value = ( + mock_file_content + ) + + process_monitor_file("mock_file.yaml") + + mock_logger.warning.assert_not_called() + + assert mock_create_or_update_monitor.call_count == 7 + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_process_routes.py b/tests/test_process_routes.py index 7afa3a3..bbaca1d 100644 --- a/tests/test_process_routes.py +++ b/tests/test_process_routes.py @@ -4,211 +4,216 @@ class TestProcessRoutes(unittest.TestCase): - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.extract_hosts') - def test_process_routes_single_route(self, mock_extract_hosts, mock_create_or_update_monitor): - mock_extract_hosts.return_value = ['example.com'] + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.extract_hosts") + def test_process_routes_single_route( + self, mock_extract_hosts, mock_create_or_update_monitor + ): + mock_extract_hosts.return_value = ["example.com"] process_routes( - monitor_name='test-monitor', - routes_or_rules=[{'match': 'Host(`example.com`)'}], + monitor_name="test-monitor", + routes_or_rules=[{"match": "Host(`example.com`)"}], interval=60, - probe_type='http', + probe_type="http", headers=None, hard_host=None, path=None, - port='8080', - method='GET', - type_obj='IngressRoute' + port="8080", + method="GET", + type_obj="IngressRoute", ) mock_create_or_update_monitor.assert_called_once_with( - 'test-monitor', - 'https://example.com:8080', - 60, - 'http', - None, - 'GET' + "test-monitor", "https://example.com:8080", 60, "http", None, "GET" ) - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.extract_hosts') - def test_process_routes_multiple_routes(self, mock_extract_hosts, mock_create_or_update_monitor): - mock_extract_hosts.side_effect = [['example.com'], ['example.org']] + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.extract_hosts") + def test_process_routes_multiple_routes( + self, mock_extract_hosts, mock_create_or_update_monitor + ): + mock_extract_hosts.side_effect = [["example.com"], ["example.org"]] process_routes( - monitor_name='test-monitor', - routes_or_rules=[{'match': 'Host(`example.com`)'}, {'match': 'Host(`example.org`)'}], + monitor_name="test-monitor", + routes_or_rules=[ + {"match": "Host(`example.com`)"}, + {"match": "Host(`example.org`)"}, + ], interval=60, - probe_type='http', + probe_type="http", headers=None, hard_host=None, path=None, - port='8080', - method='GET', - type_obj='IngressRoute' + port="8080", + method="GET", + type_obj="IngressRoute", ) self.assertEqual(mock_create_or_update_monitor.call_count, 2) calls = [ - unittest.mock.call('test-monitor-1', 'https://example.com:8080', 60, 'http', None, 'GET'), - unittest.mock.call('test-monitor-2', 'https://example.org:8080', 60, 'http', None, 'GET') + unittest.mock.call( + "test-monitor-1", "https://example.com:8080", 60, "http", None, "GET" + ), + unittest.mock.call( + "test-monitor-2", "https://example.org:8080", 60, "http", None, "GET" + ), ] mock_create_or_update_monitor.assert_has_calls(calls) - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.extract_hosts') - def test_process_routes_no_hosts(self, mock_extract_hosts, mock_create_or_update_monitor): + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.extract_hosts") + def test_process_routes_no_hosts( + self, mock_extract_hosts, mock_create_or_update_monitor + ): mock_extract_hosts.return_value = [] process_routes( - monitor_name='test-monitor', - routes_or_rules=[{'match': 'Path(`/test`)'}], + monitor_name="test-monitor", + routes_or_rules=[{"match": "Path(`/test`)"}], interval=60, - probe_type='http', + probe_type="http", headers=None, hard_host=None, path=None, - port='8080', - method='GET', - type_obj='IngressRoute' + port="8080", + method="GET", + type_obj="IngressRoute", ) mock_create_or_update_monitor.assert_not_called() - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.extract_hosts') - def test_process_routes_with_empty_port(self, mock_extract_hosts, mock_create_or_update_monitor): - mock_extract_hosts.return_value = ['example.com'] + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.extract_hosts") + def test_process_routes_with_empty_port( + self, mock_extract_hosts, mock_create_or_update_monitor + ): + mock_extract_hosts.return_value = ["example.com"] process_routes( - monitor_name='test-monitor', - routes_or_rules=[{'match': 'Host(`example.com`)'}], + monitor_name="test-monitor", + routes_or_rules=[{"match": "Host(`example.com`)"}], interval=60, - probe_type='http', + probe_type="http", headers=None, hard_host=None, path=None, port=None, - method='GET', - type_obj='IngressRoute' + method="GET", + type_obj="IngressRoute", ) mock_create_or_update_monitor.assert_called_once_with( - 'test-monitor', - 'https://example.com', - 60, - 'http', - None, - 'GET' + "test-monitor", "https://example.com", 60, "http", None, "GET" ) - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.extract_hosts') - def test_process_routes_with_path(self, mock_extract_hosts, mock_create_or_update_monitor): - mock_extract_hosts.return_value = ['example.com'] + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.extract_hosts") + def test_process_routes_with_path( + self, mock_extract_hosts, mock_create_or_update_monitor + ): + mock_extract_hosts.return_value = ["example.com"] process_routes( - monitor_name='test-monitor', - routes_or_rules=[{'match': 'Host(`example.com`)'}], + monitor_name="test-monitor", + routes_or_rules=[{"match": "Host(`example.com`)"}], interval=60, - probe_type='http', + probe_type="http", headers=None, hard_host=None, - path='/milou', + path="/milou", port=None, - method='GET', - type_obj='IngressRoute' + method="GET", + type_obj="IngressRoute", ) mock_create_or_update_monitor.assert_called_once_with( - 'test-monitor', - 'https://example.com/milou', - 60, - 'http', - None, - 'GET' + "test-monitor", "https://example.com/milou", 60, "http", None, "GET" ) - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.extract_hosts') - def test_process_routes_with_hard_host_and_path(self, mock_extract_hosts, mock_create_or_update_monitor): - mock_extract_hosts.return_value = ['example.com'] + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.extract_hosts") + def test_process_routes_with_hard_host_and_path( + self, mock_extract_hosts, mock_create_or_update_monitor + ): + mock_extract_hosts.return_value = ["example.com"] process_routes( - monitor_name='test-monitor', - routes_or_rules=[{'match': 'Host(`example.com`)'}], + monitor_name="test-monitor", + routes_or_rules=[{"match": "Host(`example.com`)"}], interval=60, - probe_type='http', + probe_type="http", headers=None, - hard_host='tintin', - path='/milou', + hard_host="tintin", + path="/milou", port=None, - method='GET', - type_obj='IngressRoute' + method="GET", + type_obj="IngressRoute", ) mock_create_or_update_monitor.assert_called_once_with( - 'test-monitor', - 'https://tintin/milou', - 60, - 'http', - None, - 'GET' + "test-monitor", "https://tintin/milou", 60, "http", None, "GET" ) - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.extract_hosts') - def test_process_routes_with_hard_host_and_path_and_port(self, mock_extract_hosts, mock_create_or_update_monitor): - mock_extract_hosts.return_value = ['example.com'] + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.extract_hosts") + def test_process_routes_with_hard_host_and_path_and_port( + self, mock_extract_hosts, mock_create_or_update_monitor + ): + mock_extract_hosts.return_value = ["example.com"] process_routes( - monitor_name='test-monitor', - routes_or_rules=[{'match': 'Host(`example.com`)'}], + monitor_name="test-monitor", + routes_or_rules=[{"match": "Host(`example.com`)"}], interval=60, - probe_type='http', + probe_type="http", headers=None, - hard_host='tintin', - path='/milou', + hard_host="tintin", + path="/milou", port=8080, - method='GET', - type_obj='IngressRoute' + method="GET", + type_obj="IngressRoute", ) mock_create_or_update_monitor.assert_called_once_with( - 'test-monitor', - 'https://tintin/milou:8080', - 60, - 'http', - None, - 'GET' + "test-monitor", "https://tintin/milou:8080", 60, "http", None, "GET" ) - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.extract_hosts') - def test_process_routes_multiple_routes_with_hard_host_and_path_and_port(self, mock_extract_hosts, mock_create_or_update_monitor): - mock_extract_hosts.side_effect = [['example.com'], ['example.org']] + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.extract_hosts") + def test_process_routes_multiple_routes_with_hard_host_and_path_and_port( + self, mock_extract_hosts, mock_create_or_update_monitor + ): + mock_extract_hosts.side_effect = [["example.com"], ["example.org"]] process_routes( - monitor_name='test-monitor', - routes_or_rules=[{'match': 'Host(`example.com`)'}, {'match': 'Host(`example.org`)'}], + monitor_name="test-monitor", + routes_or_rules=[ + {"match": "Host(`example.com`)"}, + {"match": "Host(`example.org`)"}, + ], interval=60, - probe_type='http', + probe_type="http", headers=None, - hard_host='tintin', - path='/milou', + hard_host="tintin", + path="/milou", port=8080, - method='GET', - type_obj='IngressRoute' + method="GET", + type_obj="IngressRoute", ) self.assertEqual(mock_create_or_update_monitor.call_count, 2) calls = [ - unittest.mock.call('test-monitor-1', 'https://tintin/milou:8080', 60, 'http', None, 'GET'), - unittest.mock.call('test-monitor-2', 'https://tintin/milou:8080', 60, 'http', None, 'GET') + unittest.mock.call( + "test-monitor-1", "https://tintin/milou:8080", 60, "http", None, "GET" + ), + unittest.mock.call( + "test-monitor-2", "https://tintin/milou:8080", 60, "http", None, "GET" + ), ] mock_create_or_update_monitor.assert_has_calls(calls) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_process_routing_object.py b/tests/test_process_routing_object.py index ab2b4a4..4a9d52f 100644 --- a/tests/test_process_routing_object.py +++ b/tests/test_process_routing_object.py @@ -4,16 +4,17 @@ class TestProcessRoutingObject(unittest.TestCase): - - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.delete_monitor') - def test_process_routing_object_single_route(self, mock_delete_monitor, mock_create_or_update_monitor): + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.delete_monitor") + def test_process_routing_object_single_route( + self, mock_delete_monitor, mock_create_or_update_monitor + ): # Define the test item with a single route item = { - 'metadata': {'name': 'test', 'namespace': 'default', 'annotations': {}}, - 'spec': {'routes': [{'match': 'Host(`example.com`)'}]} + "metadata": {"name": "test", "namespace": "default", "annotations": {}}, + "spec": {"routes": [{"match": "Host(`example.com`)"}]}, } - type_obj = 'IngressRoute' + type_obj = "IngressRoute" # Call the function under test process_routing_object(item, type_obj) @@ -24,15 +25,22 @@ def test_process_routing_object_single_route(self, mock_delete_monitor, mock_cre # Verify that delete_monitor was not called mock_delete_monitor.assert_not_called() - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.delete_monitor') - def test_process_routing_object_multiple_routes(self, mock_delete_monitor, mock_create_or_update_monitor): + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.delete_monitor") + def test_process_routing_object_multiple_routes( + self, mock_delete_monitor, mock_create_or_update_monitor + ): # Define the test item with multiple routes item = { - 'metadata': {'name': 'test', 'namespace': 'default', 'annotations': {}}, - 'spec': {'routes': [{'match': 'Host(`example.com`)'}, {'match': 'Host(`example.org`)'}]} + "metadata": {"name": "test", "namespace": "default", "annotations": {}}, + "spec": { + "routes": [ + {"match": "Host(`example.com`)"}, + {"match": "Host(`example.org`)"}, + ] + }, } - type_obj = 'IngressRoute' + type_obj = "IngressRoute" # Call the function under test process_routing_object(item, type_obj) @@ -42,15 +50,17 @@ def test_process_routing_object_multiple_routes(self, mock_delete_monitor, mock_ # Verify that delete_monitor was not called mock_delete_monitor.assert_not_called() - @patch('kuma_ingress_watcher.controller.create_or_update_monitor') - @patch('kuma_ingress_watcher.controller.delete_monitor') - def test_process_routing_object_empty(self, mock_delete_monitor, mock_create_or_update_monitor): + @patch("kuma_ingress_watcher.controller.create_or_update_monitor") + @patch("kuma_ingress_watcher.controller.delete_monitor") + def test_process_routing_object_empty( + self, mock_delete_monitor, mock_create_or_update_monitor + ): # Define the test item with no routes item = { - 'metadata': {'name': 'test', 'namespace': 'default', 'annotations': {}}, - 'spec': {'routes': []} + "metadata": {"name": "test", "namespace": "default", "annotations": {}}, + "spec": {"routes": []}, } - type_obj = 'IngressRoute' + type_obj = "IngressRoute" # Call the function under test process_routing_object(item, type_obj) @@ -61,5 +71,5 @@ def test_process_routing_object_empty(self, mock_delete_monitor, mock_create_or_ mock_delete_monitor.assert_not_called() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()