diff --git a/aiida/backends/general/migrations/utils.py b/aiida/backends/general/migrations/utils.py index c59904078e..71eba1b530 100644 --- a/aiida/backends/general/migrations/utils.py +++ b/aiida/backends/general/migrations/utils.py @@ -252,12 +252,16 @@ def get_node_repository_dirpaths(basepath, shard=None): path = None if 'path' in subdirs and 'raw_input' in subdirs: - # If the `path` is empty, we simply ignore and set `raw_input` to be migrated, otherwise we add - # the entry to `contains_both` which will cause the migration to fail. - if os.listdir(dirpath / 'path'): - contains_both.append(str(dirpath)) - else: + # If the `path` folder is empty OR it contains *only* a `.gitignore`, we simply ignore and set + # `raw_input` to be migrated, otherwise we add the entry to `contains_both` which will cause the + # migration to fail. + # See issue #4910 (https://github.com/aiidateam/aiida-core/issues/4910) for more information on the + # `.gitignore` case. + path_contents = os.listdir(dirpath / 'path') + if not path_contents or path_contents == ['.gitignore']: path = dirpath / 'raw_input' + else: + contains_both.append(str(dirpath)) elif 'path' in subdirs: path = dirpath / 'path' elif 'raw_input' in subdirs: diff --git a/aiida/transports/plugins/local.py b/aiida/transports/plugins/local.py index 9f8b96a8c9..99de833254 100644 --- a/aiida/transports/plugins/local.py +++ b/aiida/transports/plugins/local.py @@ -759,27 +759,45 @@ def _exec_command_internal(self, command, **kwargs): # pylint: disable=unused-a return proc.stdin, proc.stdout, proc.stderr, proc - def exec_command_wait(self, command, **kwargs): + def exec_command_wait_bytes(self, command, stdin=None, **kwargs): """ Executes the specified command and waits for it to finish. :param command: the command to execute - :return: a tuple with (return_value, stdout, stderr) where stdout and - stderr are strings. 
+ :return: a tuple with (return_value, stdout, stderr) where stdout and stderr + are both bytes and the return_value is an int. """ - stdin = kwargs.get('stdin') local_stdin, _, _, local_proc = self._exec_command_internal(command) if stdin is not None: + # Implicitly assume that the desired encoding is 'utf-8' if I receive a string. + # Also, if I get a StringIO, I just read it all in memory and put it into a BytesIO. + # Clearly not memory effective - in this case do not use a StringIO, but pass directly a BytesIO + # that will be read line by line, if you have a huge stdin and care about memory usage. if isinstance(stdin, str): - filelike_stdin = io.StringIO(stdin) - else: + filelike_stdin = io.BytesIO(stdin.encode('utf-8')) + elif isinstance(stdin, bytes): + filelike_stdin = io.BytesIO(stdin) + elif isinstance(stdin, io.TextIOBase): + + def line_encoder(iterator, encoding='utf-8'): + """Encode the iterator item by item (i.e., line by line). + + This only wraps iterating over it and not all other methods, but it's enough for its + use below.""" + for line in iterator: + yield line.encode(encoding) + + filelike_stdin = line_encoder(stdin) + elif isinstance(stdin, io.BufferedIOBase): filelike_stdin = stdin + else: + raise ValueError('You can only pass strings, bytes, BytesIO or StringIO objects') try: - for line in filelike_stdin.readlines(): - local_stdin.write(line.encode('utf-8')) # the Popen.stdin/out/err are byte streams + for line in filelike_stdin: + local_stdin.write(line) # the Popen.stdin/out/err are byte streams except AttributeError: raise ValueError('stdin can only be either a string or a file-like object!') else: @@ -790,7 +808,7 @@ def exec_command_wait(self, command, **kwargs): retval = local_proc.returncode - return retval, output_text.decode('utf-8'), stderr_text.decode('utf-8') + return retval, output_text, stderr_text def gotocomputer_command(self, remotedir): """ diff --git a/aiida/transports/plugins/ssh.py b/aiida/transports/plugins/ssh.py 
index ada3650186..c6a37f76a3 100644 --- a/aiida/transports/plugins/ssh.py +++ b/aiida/transports/plugins/ssh.py @@ -677,7 +677,7 @@ def rmtree(self, path): command = f'{rm_exe} {rm_flags} {escape_for_bash(path)}' - retval, stdout, stderr = self.exec_command_wait(command) + retval, stdout, stderr = self.exec_command_wait_bytes(command) if retval == 0: if stderr.strip(): @@ -1133,7 +1133,7 @@ def _exec_cp(self, cp_exe, cp_flags, src, dst): # to simplify writing the above copy function command = f'{cp_exe} {cp_flags} {escape_for_bash(src)} {escape_for_bash(dst)}' - retval, stdout, stderr = self.exec_command_wait(command) + retval, stdout, stderr = self.exec_command_wait_bytes(command) if retval == 0: if stderr.strip(): @@ -1283,7 +1283,7 @@ def _exec_command_internal(self, command, combine_stderr=False, bufsize=-1): # return stdin, stdout, stderr, channel - def exec_command_wait(self, command, stdin=None, combine_stderr=False, bufsize=-1): # pylint: disable=arguments-differ + def exec_command_wait_bytes(self, command, stdin=None, combine_stderr=False, bufsize=-1): # pylint: disable=arguments-differ, too-many-branches """ Executes the specified command and waits for it to finish. @@ -1295,35 +1295,101 @@ def exec_command_wait(self, command, stdin=None, combine_stderr=False, bufsize=- :param bufsize: same meaning of paramiko. :return: a tuple with (return_value, stdout, stderr) where stdout and stderr - are strings. + are both bytes and the return_value is an int. """ + import socket + import time + ssh_stdin, stdout, stderr, channel = self._exec_command_internal(command, combine_stderr, bufsize=bufsize) if stdin is not None: if isinstance(stdin, str): filelike_stdin = io.StringIO(stdin) - else: + elif isinstance(stdin, bytes): + filelike_stdin = io.BytesIO(stdin) + elif isinstance(stdin, (io.BufferedIOBase, io.TextIOBase)): + # It seems both StringIO and BytesIO work correctly when doing ssh_stdin.write(line)? 
+ # (The ChannelFile is opened with mode 'b', but until now it always has been a StringIO) filelike_stdin = stdin + else: + raise ValueError('You can only pass strings, bytes, BytesIO or StringIO objects') - try: - for line in filelike_stdin.readlines(): - ssh_stdin.write(line) - except AttributeError: - raise ValueError('stdin can only be either a string of a file-like object!') + for line in filelike_stdin: + ssh_stdin.write(line) # I flush and close them anyway; important to call shutdown_write # to avoid hangouts ssh_stdin.flush() ssh_stdin.channel.shutdown_write() + # Now I get the output + stdout_bytes = [] + stderr_bytes = [] + # 100kB buffer (note that this should be smaller than the window size of paramiko) + # Also, apparently if the data is coming slowly, the read() command will not unlock even for + # times much larger than the timeout. Therefore we don't want to have very large buffers otherwise + # you risk that a lot of output is sent to both stdout and stderr, and stderr goes beyond the + # window size and blocks. + # Note that this is different than the bufsize of paramiko. + internal_bufsize = 100 * 1024 + + # Set a small timeout on the channels, so that if we get data from both + # stderr and stdout, and the connection is slow, we interleave the receive and don't hang + # NOTE: Timeouts and sleep time below, as well as the internal_bufsize above, have been benchmarked + # to try to optimize the overall throughput. I could get ~100MB/s on a localhost via ssh (and 3x slower + # if compression is enabled). + # It's important to mention that, for speed benchmarks, it's important to disable compression + # in the SSH transport settings, as it will cap the max speed. + stdout.channel.settimeout(0.01) + stderr.channel.settimeout(0.01) # Maybe redundant, as this could be the same channel. 
+ + while True: + chunk_exists = False + + if stdout.channel.recv_ready(): # True means that the next .read call will at least receive 1 byte + chunk_exists = True + try: + piece = stdout.read(internal_bufsize) + stdout_bytes.append(piece) + except socket.timeout: + # There was a timeout: I continue as there should still be data + pass + + if stderr.channel.recv_stderr_ready(): # True means that the next .read call will at least receive 1 byte + chunk_exists = True + try: + piece = stderr.read(internal_bufsize) + stderr_bytes.append(piece) + except socket.timeout: + # There was a timeout: I continue as there should still be data + pass + + # If chunk_exists, there is data (either already read and put in the std*_bytes lists, or + # still in the buffer because of a timeout). I need to loop. + # Otherwise, there is no data in the buffers, and I enter this block. + if not chunk_exists: + # Both channels have no data in the buffer + if channel.exit_status_ready(): + # The remote execution is over + + # I think that in some corner cases there might still be some data, + # in case the data arrived between the previous calls and this check. + # So we do a final read. 
Since the execution is over, I think all data is in the buffers, + # so we can just read the whole buffer without loops + stdout_bytes.append(stdout.read()) + stderr_bytes.append(stderr.read()) + # And we go out of the `while True` loop + break + # The exit status is not ready: + # I just put a small sleep to avoid infinite fast loops when data + # is not available on a slow connection, and loop + time.sleep(0.01) + # I get the return code (blocking) + # However, if I am here, the exit status is ready so this should be returning very quickly retval = channel.recv_exit_status() - # needs to be after 'recv_exit_status', otherwise it might hang - output_text = stdout.read().decode('utf-8') - stderr_text = stderr.read().decode('utf-8') - - return retval, output_text, stderr_text + return (retval, b''.join(stdout_bytes), b''.join(stderr_bytes)) def gotocomputer_command(self, remotedir): """ diff --git a/aiida/transports/transport.py b/aiida/transports/transport.py index 9b8ad0a51a..7e82d81dfb 100644 --- a/aiida/transports/transport.py +++ b/aiida/transports/transport.py @@ -398,19 +398,45 @@ def _exec_command_internal(self, command, **kwargs): """ raise NotImplementedError - def exec_command_wait(self, command, **kwargs): + def exec_command_wait_bytes(self, command, stdin=None, **kwargs): """ Execute the command on the shell, waits for it to finish, - and return the retcode, the stdout and the stderr. + and return the retcode, the stdout and the stderr as bytes. - Enforce the execution to be run from the pwd (as given by - self.getcwd), if this is not None. + Enforce the execution to be run from the pwd (as given by self.getcwd), if this is not None. + + The command implementation can have some additional plugin-specific kwargs. :param str command: execute the command given as a string - :return: a list: the retcode (int), stdout (str) and stderr (str). + :param stdin: (optional,default=None) can be a string or a file-like object. 
+ :return: a tuple: the retcode (int), stdout (bytes) and stderr (bytes). """ raise NotImplementedError + def exec_command_wait(self, command, stdin=None, encoding='utf-8', **kwargs): + """ + Executes the specified command and waits for it to finish. + + :note: this function also decodes the bytes received into a string with the specified encoding, + which is set to be ``utf-8`` by default (for backward-compatibility with earlier versions) of + AiiDA. + Use this method only if you are sure that you are getting a properly encoded string; otherwise, + use the ``exec_command_wait_bytes`` method that returns the undecoded byte stream. + + :note: additional kwargs are passed to the ``exec_command_wait_bytes`` function, that might use them + depending on the plugin. + + :param command: the command to execute + :param stdin: (optional,default=None) can be a string or a file-like object. + :param encoding: the encoding to use to decode the byte stream received from the remote command execution. + + :return: a tuple with (return_value, stdout, stderr) where stdout and stderr are both strings, decoded + with the specified encoding. 
+ """ + retval, stdout_bytes, stderr_bytes = self.exec_command_wait_bytes(command=command, stdin=stdin, **kwargs) + # Return the decoded strings + return (retval, stdout_bytes.decode(encoding), stderr_bytes.decode(encoding)) + def get(self, remotepath, localpath, *args, **kwargs): """ Retrieve a file or folder from remote source to local destination @@ -689,6 +715,8 @@ def whoami(self): """ command = 'whoami' + # Assuming here that the username is either ASCII or UTF-8 encoded + # This should be true essentially always retval, username, stderr = self.exec_command_wait(command) if retval == 0: if stderr.strip(): diff --git a/docs/source/intro/install_conda.rst b/docs/source/intro/install_conda.rst index 08c99cb938..f559777efb 100644 --- a/docs/source/intro/install_conda.rst +++ b/docs/source/intro/install_conda.rst @@ -6,6 +6,7 @@ Installation into Conda environment This installation route installs all necessary software -- including the prerequisite services PostgreSQL and RabbitMQ -- into a Conda environment. This is the recommended method for users on shared systems and systems where the user has no administrative privileges. +If you want to install AiiDA onto you own personal workstation/laptop, it is recommanded to use the :ref:`system-wide installation `. .. important:: @@ -34,7 +35,7 @@ This is the recommended method for users on shared systems and systems where the (aiida) $ initdb -D mylocal_db - This *database cluster* may contain a collection of databases (one per profile) that is managed by a single running server process. + This *database cluster* (located inside a folder named ``mylocal_db``) may contain a collection of databases (one per profile) that is managed by a single running server process. We start this process with: .. code-block:: console @@ -47,25 +48,19 @@ This is the recommended method for users on shared systems and systems where the - `Creating a Database Cluster `__. - `Starting the Database Server `__. 
- Then, start the RabbitMQ server: - - .. code-block:: console - (aiida) $ rabbitmq-server -detached - Finally, start the AiiDA daemon(s): + Then, start the RabbitMQ server: .. code-block:: console - (aiida) $ verdi daemon start 2 + (aiida) $ rabbitmq-server -detached .. important:: - The verdi daemon(s) must be restarted after a system reboot. - - .. tip:: - - Do not start more daemons then there are physical processors on your system. + The services started this way will use the default ports on the machine. + Conflicts may happen if there are more than one user running AiiDA this way on the same machine, or you already have the server running in a system-wide installation. + To get around this issue, you can explicitly define the ports to be used. --- @@ -84,11 +79,30 @@ This is the recommended method for users on shared systems and systems where the Last name: name Institution: where-i-work + .. tip:: + + In case non-default ports are used for the *database cluster* and the RabbitMQ server, you can pass them using ``--db-port`` and ``--broker-port`` options respectively. + + .. admonition:: Is AiiDA unable to auto-detect the PostgreSQL setup? :class: attention title-icon-troubleshoot If you get an error saying that AiiDA has trouble autodetecting the PostgreSQL setup, you will need to do the manual setup explained in the :ref:`troubleshooting section`. + Once the profile is up and running, you can start the AiiDA daemon(s): + + .. code-block:: console + + (aiida) $ verdi daemon start 2 + + .. important:: + + The verdi daemon(s) must be restarted after a system reboot. + + .. tip:: + + Do not start more daemons than there are physical processors on your system. + --- **Check setup** @@ -117,13 +131,32 @@ This is the recommended method for users on shared systems and systems where the :text: What's next? 
:classes: btn-outline-primary btn-block font-weight-bold + --- **Shut-down services** - After finishing with your aiida session, particularly if switching between profiles, you may wish to power down the services: + After finishing with your aiida session, particularly if switching between profiles, you may wish to power down the daemon and the services: .. code-block:: console (aiida) $ verdi daemon stop - (aiida) $ pg_ctl stop + (aiida) $ pg_ctl -D mylocal_db stop + (aiida) $ rabbitmqctl stop + + + --- + + **Restart the services** + + If you want to restart the services and the daemon: + + .. code-block:: console + + (aiida) $ pg_ctl -D mylocal_db start + (aiida) $ rabbitmq-server -detached + (aiida) $ verdi daemon start + + .. tip:: + + If different ports are used, you have to pass them here as well. diff --git a/docs/source/topics/transport_template.py b/docs/source/topics/transport_template.py index 1c29cc0500..378d98857a 100644 --- a/docs/source/topics/transport_template.py +++ b/docs/source/topics/transport_template.py @@ -58,13 +58,14 @@ def copytree(self, remotesource, remotedestination, *args, **kwargs): :raise IOError: if one of src or dst does not exist """ - def exec_command_wait(self, command, **kwargs): + def exec_command_wait_bytes(self, command, stdin=None, **kwargs): """Execute the command on the shell, wait for it to finish and return the retcode, the stdout and the stderr. Enforce the execution to be run from the pwd (as given by ``self.getcwd``), if this is not None. :param str command: execute the command given as a string - :return: a tuple: the retcode (int), stdout (str) and stderr (str). + :param stdin: (optional,default=None) can be a string or a file-like object. + :return: a tuple: the retcode (int), stdout (bytes) and stderr (bytes). 
""" def get_attribute(self, path): diff --git a/tests/backends/aiida_django/migrations/test_migrations_0047_migrate_repository.py b/tests/backends/aiida_django/migrations/test_migrations_0047_migrate_repository.py index b62c402fb8..4c6774b6ad 100644 --- a/tests/backends/aiida_django/migrations/test_migrations_0047_migrate_repository.py +++ b/tests/backends/aiida_django/migrations/test_migrations_0047_migrate_repository.py @@ -10,6 +10,7 @@ # pylint: disable=import-error,no-name-in-module,invalid-name """Test migration of the old file repository to the disk object store.""" import hashlib +import os from aiida.backends.general.migrations import utils from .test_migrations_common import TestMigrations @@ -33,14 +34,25 @@ def setUpBeforeMigration(self): dbnode_02.save() dbnode_03 = DbNode(user_id=self.default_user.id) dbnode_03.save() + dbnode_04 = DbNode(user_id=self.default_user.id) + dbnode_04.save() self.node_01_pk = dbnode_01.pk self.node_02_pk = dbnode_02.pk self.node_03_pk = dbnode_03.pk + self.node_04_pk = dbnode_04.pk utils.put_object_from_string(dbnode_01.uuid, 'sub/path/file_b.txt', 'b') utils.put_object_from_string(dbnode_01.uuid, 'sub/file_a.txt', 'a') utils.put_object_from_string(dbnode_02.uuid, 'output.txt', 'output') + utils.put_object_from_string(dbnode_04.uuid, '.gitignore', 'test') + + # If both `path` and `raw_input` subfolders are present and `.gitignore` is in `path`, it should be ignored. + # Cannot use `put_object_from_string` here as it statically writes under the `path` folder. 
+ raw_input_sub_folder = utils.get_node_repository_sub_folder(dbnode_04.uuid, subfolder='raw_input') + os.makedirs(raw_input_sub_folder, exist_ok=True) + with open(os.path.join(raw_input_sub_folder, 'input.txt'), 'w', encoding='utf-8') as handle: + handle.write('input') # When multiple migrations are ran, it is possible that migration 0047 is run at a point where the repository # container does not have a UUID (at that point in the migration) and so the setting gets set to `None`. This @@ -56,6 +68,7 @@ def test_migration(self): node_01 = DbNode.objects.get(pk=self.node_01_pk) node_02 = DbNode.objects.get(pk=self.node_02_pk) node_03 = DbNode.objects.get(pk=self.node_03_pk) + node_04 = DbNode.objects.get(pk=self.node_04_pk) assert node_01.repository_metadata == { 'o': { @@ -83,11 +96,19 @@ def test_migration(self): } } assert node_03.repository_metadata == {} + assert node_04.repository_metadata == { + 'o': { + 'input.txt': { + 'k': hashlib.sha256('input'.encode('utf-8')).hexdigest() + } + } + } for hashkey, content in ( (node_01.repository_metadata['o']['sub']['o']['path']['o']['file_b.txt']['k'], b'b'), (node_01.repository_metadata['o']['sub']['o']['file_a.txt']['k'], b'a'), (node_02.repository_metadata['o']['output.txt']['k'], b'output'), + (node_04.repository_metadata['o']['input.txt']['k'], b'input'), ): assert utils.get_repository_object(hashkey) == content diff --git a/tests/backends/aiida_sqlalchemy/test_migrations.py b/tests/backends/aiida_sqlalchemy/test_migrations.py index 3e453dfcb6..6175f09a86 100644 --- a/tests/backends/aiida_sqlalchemy/test_migrations.py +++ b/tests/backends/aiida_sqlalchemy/test_migrations.py @@ -1816,22 +1816,26 @@ def setUpBeforeMigration(self): node_02 = DbNode(user_id=default_user.id, uuid=get_new_uuid()) node_03 = DbNode(user_id=default_user.id, uuid=get_new_uuid()) node_04 = DbNode(user_id=default_user.id, uuid=get_new_uuid()) + node_05 = DbNode(user_id=default_user.id, uuid=get_new_uuid()) session.add(node_01) 
session.add(node_02) session.add(node_03) # Empty repository folder session.add(node_04) # Both `path` and `raw_input` subfolder + session.add(node_05) # Both `path` and `raw_input` subfolder & `.gitignore` in `path` session.commit() assert node_01.uuid is not None assert node_02.uuid is not None assert node_03.uuid is not None assert node_04.uuid is not None + assert node_05.uuid is not None self.node_01_pk = node_01.id self.node_02_pk = node_02.id self.node_03_pk = node_03.id self.node_04_pk = node_04.id + self.node_05_pk = node_05.id utils.put_object_from_string(node_01.uuid, 'sub/path/file_b.txt', 'b') utils.put_object_from_string(node_01.uuid, 'sub/file_a.txt', 'a') @@ -1839,6 +1843,17 @@ def setUpBeforeMigration(self): os.makedirs(utils.get_node_repository_sub_folder(node_04.uuid, 'path'), exist_ok=True) os.makedirs(utils.get_node_repository_sub_folder(node_04.uuid, 'raw_input'), exist_ok=True) + os.makedirs(utils.get_node_repository_sub_folder(node_05.uuid, 'path'), exist_ok=True) + os.makedirs(utils.get_node_repository_sub_folder(node_05.uuid, 'raw_input'), exist_ok=True) + + utils.put_object_from_string(node_05.uuid, '.gitignore', 'test') + with open( + os.path.join( + utils.get_node_repository_sub_folder(node_05.uuid, 'raw_input'), 'input.txt'), + 'w', + encoding='utf-8', + ) as handle: + handle.write('input') # Add a repository folder for a node that no longer exists - i.e. it may have been deleted. 
utils.put_object_from_string(get_new_uuid(), 'file_of_deleted_node', 'output') @@ -1859,6 +1874,7 @@ def test_migration(self): node_01 = session.query(DbNode).filter(DbNode.id == self.node_01_pk).one() node_02 = session.query(DbNode).filter(DbNode.id == self.node_02_pk).one() node_03 = session.query(DbNode).filter(DbNode.id == self.node_03_pk).one() + node_05 = session.query(DbNode).filter(DbNode.id == self.node_05_pk).one() assert node_01.repository_metadata == { 'o': { @@ -1886,11 +1902,19 @@ def test_migration(self): } } assert node_03.repository_metadata == {} + assert node_05.repository_metadata == { + 'o': { + 'input.txt': { + 'k': hashlib.sha256('input'.encode('utf-8')).hexdigest() + } + } + } for hashkey, content in ( (node_01.repository_metadata['o']['sub']['o']['path']['o']['file_b.txt']['k'], b'b'), (node_01.repository_metadata['o']['sub']['o']['file_a.txt']['k'], b'a'), (node_02.repository_metadata['o']['output.txt']['k'], b'output'), + (node_05.repository_metadata['o']['input.txt']['k'], b'input'), ): assert utils.get_repository_object(hashkey) == content diff --git a/tests/transports/test_all_plugins.py b/tests/transports/test_all_plugins.py index 7470dd15c9..cf1456ec43 100644 --- a/tests/transports/test_all_plugins.py +++ b/tests/transports/test_all_plugins.py @@ -1245,7 +1245,7 @@ def test_exec_pwd(self, custom_transport): @run_for_all_plugins def test_exec_with_stdin_string(self, custom_transport): """Test command execution with a stdin string.""" - test_string = str('some_test String') + test_string = 'some_test String' with custom_transport as transport: retcode, stdout, stderr = transport.exec_command_wait('cat', stdin=test_string) self.assertEqual(retcode, 0) @@ -1253,14 +1253,18 @@ def test_exec_with_stdin_string(self, custom_transport): self.assertEqual(stderr, '') @run_for_all_plugins - def test_exec_with_stdin_unicode(self, custom_transport): - """Test command execution with a unicode stdin string.""" - test_string = 'some_test String' + 
def test_exec_with_stdin_bytes(self, custom_transport): + """Test command execution with a stdin bytes. + + I test directly the exec_command_wait_bytes function; I also pass some non-unicode + bytes to check that there is no internal implicit encoding/decoding in the code. + """ + test_string = b'some_test bytes with non-unicode -> \xFA' with custom_transport as transport: - retcode, stdout, stderr = transport.exec_command_wait('cat', stdin=test_string) + retcode, stdout, stderr = transport.exec_command_wait_bytes('cat', stdin=test_string) self.assertEqual(retcode, 0) self.assertEqual(stdout, test_string) - self.assertEqual(stderr, '') + self.assertEqual(stderr, b'') @run_for_all_plugins def test_exec_with_stdin_filelike(self, custom_transport): @@ -1273,6 +1277,42 @@ def test_exec_with_stdin_filelike(self, custom_transport): self.assertEqual(stdout, test_string) self.assertEqual(stderr, '') + @run_for_all_plugins + def test_exec_with_stdin_filelike_bytes(self, custom_transport): + """Test command execution with a stdin from filelike of type bytes. + + I test directly the exec_command_wait_bytes function; I also pass some non-unicode + bytes to check that there is no place in the code where this non-unicode string is + implicitly converted to unicode (temporarily, and then converted back) - + if this happens somewhere, that code would fail (as the test_string + cannot be decoded to UTF8). (Note: we cannot test for all encodings, we test for + unicode hoping that this would already catch possible issues.) 
+ """ + test_string = b'some_test bytes with non-unicode -> \xFA' + stdin = io.BytesIO(test_string) + with custom_transport as transport: + retcode, stdout, stderr = transport.exec_command_wait_bytes('cat', stdin=stdin) + self.assertEqual(retcode, 0) + self.assertEqual(stdout, test_string) + self.assertEqual(stderr, b'') + + @run_for_all_plugins + def test_exec_with_stdin_filelike_bytes_decoding(self, custom_transport): + """Test command execution with a stdin from filelike of type bytes, trying to decode. + + I test directly the exec_command_wait_bytes function; I also pass some non-unicode + bytes to check that there is no place in the code where this non-unicode string is + implicitly converted to unicode (temporarily, and then converted back) - + if this happens somewhere, that code would fail (as the test_string + cannot be decoded to UTF8). (Note: we cannot test for all encodings, we test for + unicode hoping that this would already catch possible issues.) + """ + test_string = b'some_test bytes with non-unicode -> \xFA' + stdin = io.BytesIO(test_string) + with custom_transport as transport: + with self.assertRaises(UnicodeDecodeError): + transport.exec_command_wait('cat', stdin=stdin, encoding='utf-8') + @run_for_all_plugins def test_exec_with_wrong_stdin(self, custom_transport): """Test command execution with incorrect stdin string.""" @@ -1281,6 +1321,98 @@ def test_exec_with_wrong_stdin(self, custom_transport): with self.assertRaises(ValueError): transport.exec_command_wait('cat', stdin=1) + @run_for_all_plugins + def test_transfer_big_stdout(self, custom_transport): # pylint: disable=too-many-locals + """Test the transfer of a large amount of data on stdout.""" + # Create a "big" file of > 2MB (10MB here; in general, larger than the buffer size) + min_file_size_bytes = 5 * 1024 * 1024 + # The file content will be a sequence of these lines, until the size + # is > MIN_FILE_SIZE_BYTES + file_line = 'This is a Unicødê štring\n' + fname = 'test.dat' + 
script_fname = 'script.py' + + # I create a large content of the file (as a string) + file_line_binary = file_line.encode('utf8') + line_repetitions = (min_file_size_bytes // len(file_line_binary) + 1) + fcontent = (file_line_binary * line_repetitions).decode('utf8') + + with custom_transport as trans: + # We cannot use tempfile.mkdtemp because we're on a remote folder + location = trans.normalize(os.path.join('/', 'tmp')) + trans.chdir(location) + self.assertEqual(location, trans.getcwd()) + + directory = 'temp_dir_test_transfer_big_stdout' + while trans.isdir(directory): + # I append a random letter/number until it is unique + directory += random.choice(string.ascii_uppercase + string.digits) + trans.mkdir(directory) + trans.chdir(directory) + + with tempfile.NamedTemporaryFile(mode='wb') as tmpf: + tmpf.write(fcontent.encode('utf8')) + tmpf.flush() + + # I put a file with specific content there at the right file name + trans.putfile(tmpf.name, fname) + + python_code = r"""import sys + +# disable buffering is only allowed in binary +#stdout = open(sys.stdout.fileno(), mode="wb", buffering=0) +#stderr = open(sys.stderr.fileno(), mode="wb", buffering=0) +# Use these lines instead if you want to use buffering +# I am leaving these in as most programs typically are buffered +stdout = open(sys.stdout.fileno(), mode="wb") +stderr = open(sys.stderr.fileno(), mode="wb") + +line = '''{}'''.encode('utf-8') + +for i in range({}): + stdout.write(line) + stderr.write(line) +""".format(file_line, line_repetitions) + + with tempfile.NamedTemporaryFile(mode='w') as tmpf: + tmpf.write(python_code) + tmpf.flush() + + # I put a file with specific content there at the right file name + trans.putfile(tmpf.name, script_fname) + + # I get its content via the stdout; emulate also network slowness (note I cat twice) + retcode, stdout, stderr = trans.exec_command_wait(f'cat {fname} ; sleep 1 ; cat {fname}') + self.assertEqual(stderr, '') + self.assertEqual(stdout, fcontent + fcontent) + 
self.assertEqual(retcode, 0) + + # I get its content via the stderr; emulate also network slowness (note I cat twice) + retcode, stdout, stderr = trans.exec_command_wait(f'cat {fname} >&2 ; sleep 1 ; cat {fname} >&2') + self.assertEqual(stderr, fcontent + fcontent) + self.assertEqual(stdout, '') + self.assertEqual(retcode, 0) + + # This time, I cat one on each of the two streams intermittently, to check + # that this does not hang. + + # Initially I was using a command like + # 'i=0; while [ "$i" -lt {} ] ; do let i=i+1; echo -n "{}" ; echo -n "{}" >&2 ; done'.format( + # line_repetitions, file_line, file_line)) + # However this is pretty slow (and using 'cat' of a file containing only one line is even slower) + + retcode, stdout, stderr = trans.exec_command_wait(f'python3 {script_fname}') + + self.assertEqual(stderr, fcontent) + self.assertEqual(stdout, fcontent) + self.assertEqual(retcode, 0) + + # Clean-up + trans.remove(fname) + trans.remove(script_fname) + trans.chdir('..') + trans.rmdir(directory) + class TestDirectScheduler(unittest.TestCase): """