diff --git a/README.md b/README.md index 64a53dc2..a6505a1f 100644 --- a/README.md +++ b/README.md @@ -241,6 +241,12 @@ python3 setup.py install * [ ] `user create ` (alias of `user modify ...`) * [x] [Server Version](https://matrix-org.github.io/synapse/develop/admin_api/version_api.html) * [x] `version` +* [x] [Registration Tokens](https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/registration_tokens.html) + * [x] `regtok list` + * [x] `regtok details ` + * [x] `regtok new` + * [x] `regtok update ` + * [x] `regtok delete ` @@ -342,4 +348,4 @@ https://github.com/JOJ0/synadm/blob/107d34b38de71d6d21d78141e78a1b19d3dd5379/syn ### Developer's documentation -Have a look at [synadm's module documentation pages on readthedocs](https://synadm.readthedocs.io/en/latest/index_modules.html) \ No newline at end of file +Have a look at [synadm's module documentation pages on readthedocs](https://synadm.readthedocs.io/en/latest/index_modules.html) diff --git a/doc/source/index_cli_reference.rst b/doc/source/index_cli_reference.rst index 58691155..1bb010c7 100644 --- a/doc/source/index_cli_reference.rst +++ b/doc/source/index_cli_reference.rst @@ -13,3 +13,4 @@ Command Line Reference synadm.cli.history synadm.cli.group synadm.cli.matrix + synadm.cli.regtok diff --git a/doc/source/synadm.cli.regtok.rst b/doc/source/synadm.cli.regtok.rst new file mode 100644 index 00000000..d5d0f64b --- /dev/null +++ b/doc/source/synadm.cli.regtok.rst @@ -0,0 +1,6 @@ +Regtok +====== + +.. click:: synadm.cli.regtok:regtok + :prog: synadm regtok + :nested: full diff --git a/synadm/api.py b/synadm/api.py index 4368c53b..86277f6d 100644 --- a/synadm/api.py +++ b/synadm/api.py @@ -651,3 +651,157 @@ def purge_history_status(self, purge_id): The status will be one of active, complete, or failed. """ return self.query("get", f"v1/purge_history_status/{purge_id}") + + def regtok_list(self, valid, readable_expiry): + """ List registration tokens + + Args: + valid (bool): List only valid (if True) or invalid (if False) + tokens. Default is to list all tokens regardless of validity. + readable_expiry (bool): If True, replace the expiry_time field with + a human readable datetime. If False, expiry_time will be a unix + timestamp. + + Returns: + string: JSON string containing the admin API's response or None if + an exception occured. See Synapse admin API docs for details. + + """ + result = self.query("get", "v1/registration_tokens", params={ + "valid": (str(valid).lower() if isinstance(valid, bool) else None) + }) + + # Change expiry_time to a human readable format if requested + if readable_expiry and result is not None and "registration_tokens" in result: + for i, regtok in enumerate(result["registration_tokens"]): + expiry_time = regtok["expiry_time"] + if expiry_time is not None: + result["registration_tokens"][i][ + "expiry_time" + ] = self._datetime_from_timestamp(expiry_time).strftime( + "%Y-%m-%d %H:%M:%S" + ) + + return result + + def regtok_details(self, token, readable_expiry): + """ Get details about the given registration token + + Args: + token (string): The registration token in question + readable_expiry (bool): If True, replace the expiry_time field with + a human readable datetime. If False, expiry_time will be a unix + timestamp. + + Returns: + string: JSON string containing the admin API's response or None if + an exception occured. See Synapse admin API docs for details. + + """ + result = self.query("get", f"v1/registration_tokens/{token}") + + # Change expiry_time to a human readable format if requested + if readable_expiry and result is not None: + if result["expiry_time"] is not None: + result["expiry_time"] = self._datetime_from_timestamp( + result["expiry_time"] + ).strftime("%Y-%m-%d %H:%M:%S") + + return result + + def regtok_new(self, token, length, uses_allowed, expiry_ts, expire_at): + """ Create a new registration token + + Args: + token (string): Registration token to create. Default is randomly + generated by the server. + length (int): The length of the token to generate if the token is + not provided. + uses_allowed (int): The number of times the token can be used to + complete a registration before it becomes invalid. + expiry_ts (int): The latest time the registration token is valid. + Given as the number of milliseconds since + 1970-01-01 00:00:00 UTC. + expire_at (click.DateTime): The latest time the registration token + is valid. + + Returns: + string: JSON string containing the admin API's response or None if + an exception occured. See Synapse admin API docs for details. + + """ + data = { + "length": length, + "uses_allowed": uses_allowed, + } + + if expiry_ts: + self.log.debug(f"Received --expiry-ts: {expiry_ts}") + data["expiry_time"] = expiry_ts + elif expire_at: + self.log.debug(f"Received --expire-at: {expire_at}") + data["expiry_time"] = self._timestamp_from_datetime(expire_at) + else: + data["expiry_time"] = None + + # The token cannot be null, it must be a string + if isinstance(token, str): + data["token"] = token + + return self.query("post", "v1/registration_tokens/new", data=data) + + def regtok_update(self, token, uses_allowed, expiry_ts, expire_at): + """ Update a registration token + + Args: + token (string): Registration token to update. + uses_allowed (int): The number of times the token can be used to + complete a registration before it becomes invalid. + expiry_ts (int): The latest time the registration token is valid. + Given as the number of milliseconds since + 1970-01-01 00:00:00 UTC. -1 indicates no expiry. + expire_at (click.DateTime): The latest time the registration token + is valid. + + Returns: + string: JSON string containing the admin API's response or None if + an exception occured. See Synapse admin API docs for details. + + """ + # If uses_allowed or expiry time were not provided by the user, + # do not add the corresponding parameter to the request so that + # the server will not modify its value. + data = {} + + if uses_allowed == -1: + # A null value indicates unlimited uses + data["uses_allowed"] = None + elif uses_allowed is not None: + data["uses_allowed"] = uses_allowed + + if expiry_ts: + self.log.debug(f"Received --expiry-ts: {expiry_ts}") + if expiry_ts == -1: + # A null value indicates no expiry + data["expiry_time"] = None + else: + data["expiry_time"] = expiry_ts + elif expire_at: + self.log.debug(f"Received --expire-at: {expire_at}") + data["expiry_time"] = self._timestamp_from_datetime(expire_at) + + return self.query("put", f"v1/registration_tokens/{token}", data=data) + + + def regtok_delete(self, token): + """ Delete a registration token + + Args: + token (string): The registration token to delete + + Returns: + string: JSON string containing the admin API's response or None if + an exception occured. See Synapse admin API docs for details. + + """ + return self.query("delete", f"v1/registration_tokens/{token}") diff --git a/synadm/cli/__init__.py b/synadm/cli/__init__.py index bc2d3981..17b9f791 100644 --- a/synadm/cli/__init__.py +++ b/synadm/cli/__init__.py @@ -320,4 +320,4 @@ def version(helper): # Import additional commands -from synadm.cli import room, user, media, group, history, matrix +from synadm.cli import room, user, media, group, history, matrix, regtok diff --git a/synadm/cli/regtok.py b/synadm/cli/regtok.py new file mode 100644 index 00000000..c74c02a6 --- /dev/null +++ b/synadm/cli/regtok.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# synadm +# Copyright (C) 2021 Callum Brown +# +# synadm is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# synadm is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" Registration token-related CLI commands +""" + +import click +from click_option_group import optgroup, MutuallyExclusiveOptionGroup +from click_option_group import RequiredAnyOptionGroup + +from synadm import cli + + +@cli.root.group() +def regtok(): + """ Manage registration tokens + """ + + +@regtok.command(name="list") +@click.option( + "--valid/--invalid", "-v/-V", default=None, show_default=True, + help="List only valid/invalid tokens.") +@click.option( + "--datetime/--timestamp", "-d/-t", default=True, + help="""Display expiry time in a human readable format, or as a unix + timestamp in milliseconds. [default: datetime].""") +@click.pass_obj +def regtok_list_cmd(helper, valid, datetime): + """ List registration tokens + """ + regtoks = helper.api.regtok_list(valid, datetime) + if regtoks is None: + click.echo("Registration tokens could not be fetched.") + raise SystemExit(1) + if "registration_tokens" not in regtoks: + # Display error + helper.output(regtoks) + elif helper.output_format == "human": + if regtoks["registration_tokens"] == []: + click.echo("No registration tokens.") + else: + helper.output(regtoks["registration_tokens"]) + else: + helper.output(regtoks) + + +@regtok.command(name="details") +@click.argument("token", type=str) +@click.option( + "--datetime/--timestamp", "-d/-t", default=True, + help="""Display expiry time in a human readable format, or as a unix + timestamp in milliseconds. [default: datetime].""") +@click.pass_obj +def regtok_details_cmd(helper, token, datetime): + """ View details of the given token + """ + regtok = helper.api.regtok_details(token, datetime) + if regtok is None: + click.echo("Registration token could not be fetched.") + raise SystemExit(1) + helper.output(regtok) + + +@regtok.command(name="new") +@click.option( + "--token", "-t", type=str, default=None, + help="""Set the registration token. The default is a random value + generated by the server.""") +@click.option( + "--length", "-l", type=int, default=16, show_default=True, + help="""The length of the randomly generated token if the token is not + specified.""") +@click.option( + "--uses-allowed", "-u", type=int, default=None, + help="""The number of times the token can be used to complete a + registration before it becomes invalid. [default: unlimited uses]""") +@click.option( + "--expiry-ts", "-t", type=int, default=None, + help="""The latest time the registration token is valid. + Given as the number of milliseconds since 1970-01-01 00:00:00 UTC. + [default: no expiry]""") +@click.option( + "--expire-at", "-e", type=click.DateTime(), default=None, + help="""The latest time the registration token is valid. + See above for available date/time formats. [default: no expiry]""") +@click.pass_obj +def regtok_new(helper, token, length, uses_allowed, expiry_ts, expire_at): + """ Create a new registration token + """ + regtok = helper.api.regtok_new( + token, length, uses_allowed, expiry_ts, expire_at + ) + if regtok is None: + click.echo("Registration token could not be created.") + raise SystemExit(1) + helper.output(regtok) + + +@regtok.command(name="update") +@click.argument("token", type=str) +@click.option( + "--uses-allowed", "-u", type=int, default=None, + help="""The number of times the token can be used to complete a + registration before it becomes invalid. Use -1 for an unlimited + number of uses. [default: unchanged]""") +@click.option( + "--expiry-ts", "-t", type=int, default=None, + help="""The latest time the registration token is valid. + Given as the number of milliseconds since 1970-01-01 00:00:00 UTC. + Use -1 for no expiration. [default: unchanged]""") +@click.option( + "--expire-at", "-e", type=click.DateTime(), default=None, + help="""The latest time the registration token is valid. + See above for available date/time formats. [default: unchanged]""") +@click.pass_obj +def regtok_update(helper, token, uses_allowed, expiry_ts, expire_at): + """ Update a registration token + """ + regtok = helper.api.regtok_update(token, uses_allowed, expiry_ts, expire_at) + if regtok is None: + click.echo("Registration token could not be created.") + raise SystemExit(1) + helper.output(regtok) + + +@regtok.command(name="delete") +@click.argument("token", type=str) +@click.pass_obj +def regtok_delete(helper, token): + """ Delete a registration token + """ + response = helper.api.regtok_delete(token) + if response is None: + click.echo("Registration token could not be deleted.") + raise SystemExit(1) + if response == {}: + click.echo("Registration token successfully deleted.") + else: + helper.output(response)