Skip to content

Commit

Permalink
Merge branch 'develop' into fix/node-user
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisjsewell authored Jun 16, 2021
2 parents 027f6e8 + 28ef3a5 commit bc6eeb0
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 56 deletions.
14 changes: 9 additions & 5 deletions aiida/backends/general/migrations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,16 @@ def get_node_repository_dirpaths(basepath, shard=None):
path = None

if 'path' in subdirs and 'raw_input' in subdirs:
# If the `path` is empty, we simply ignore and set `raw_input` to be migrated, otherwise we add
# the entry to `contains_both` which will cause the migration to fail.
if os.listdir(dirpath / 'path'):
contains_both.append(str(dirpath))
else:
# If the `path` folder is empty OR it contains *only* a `.gitignore`, we simply ignore and set
# `raw_input` to be migrated, otherwise we add the entry to `contains_both` which will cause the
# migration to fail.
# See issue #4910 (https://github.com/aiidateam/aiida-core/issues/4910) for more information on the
# `.gitignore` case.
path_contents = os.listdir(dirpath / 'path')
if not path_contents or path_contents == ['.gitignore']:
path = dirpath / 'raw_input'
else:
contains_both.append(str(dirpath))
elif 'path' in subdirs:
path = dirpath / 'path'
elif 'raw_input' in subdirs:
Expand Down
36 changes: 27 additions & 9 deletions aiida/transports/plugins/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,27 +759,45 @@ def _exec_command_internal(self, command, **kwargs): # pylint: disable=unused-a

return proc.stdin, proc.stdout, proc.stderr, proc

def exec_command_wait(self, command, **kwargs):
def exec_command_wait_bytes(self, command, stdin=None, **kwargs):
"""
Executes the specified command and waits for it to finish.
:param command: the command to execute
:return: a tuple with (return_value, stdout, stderr) where stdout and
stderr are strings.
:return: a tuple with (return_value, stdout, stderr) where stdout and stderr
are both bytes and the return_value is an int.
"""
stdin = kwargs.get('stdin')
local_stdin, _, _, local_proc = self._exec_command_internal(command)

if stdin is not None:
# Implicitly assume that the desired encoding is 'utf-8' if I receive a string.
# Also, if I get a StringIO, I just read it all in memory and put it into a BytesIO.
# Clearly not memory effective - in this case do not use a StringIO, but pass directly a BytesIO
# that will be read line by line, if you have a huge stdin and care about memory usage.
if isinstance(stdin, str):
filelike_stdin = io.StringIO(stdin)
else:
filelike_stdin = io.BytesIO(stdin.encode('utf-8'))
elif isinstance(stdin, bytes):
filelike_stdin = io.BytesIO(stdin)
elif isinstance(stdin, io.TextIOBase):

def line_encoder(iterator, encoding='utf-8'):
"""Encode the iterator item by item (i.e., line by line).
This only wraps iterating over it and not all other methods, but it's enough for its
use below."""
for line in iterator:
yield line.encode(encoding)

filelike_stdin = line_encoder(stdin)
elif isinstance(stdin, io.BufferedIOBase):
filelike_stdin = stdin
else:
raise ValueError('You can only pass strings, bytes, BytesIO or StringIO objects')

try:
for line in filelike_stdin.readlines():
local_stdin.write(line.encode('utf-8')) # the Popen.stdin/out/err are byte streams
for line in filelike_stdin:
local_stdin.write(line) # the Popen.stdin/out/err are byte streams
except AttributeError:
raise ValueError('stdin can only be either a string or a file-like object!')
else:
Expand All @@ -790,7 +808,7 @@ def exec_command_wait(self, command, **kwargs):

retval = local_proc.returncode

return retval, output_text.decode('utf-8'), stderr_text.decode('utf-8')
return retval, output_text, stderr_text

def gotocomputer_command(self, remotedir):
"""
Expand Down
96 changes: 81 additions & 15 deletions aiida/transports/plugins/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ def rmtree(self, path):

command = f'{rm_exe} {rm_flags} {escape_for_bash(path)}'

retval, stdout, stderr = self.exec_command_wait(command)
retval, stdout, stderr = self.exec_command_wait_bytes(command)

if retval == 0:
if stderr.strip():
Expand Down Expand Up @@ -1133,7 +1133,7 @@ def _exec_cp(self, cp_exe, cp_flags, src, dst):
# to simplify writing the above copy function
command = f'{cp_exe} {cp_flags} {escape_for_bash(src)} {escape_for_bash(dst)}'

retval, stdout, stderr = self.exec_command_wait(command)
retval, stdout, stderr = self.exec_command_wait_bytes(command)

if retval == 0:
if stderr.strip():
Expand Down Expand Up @@ -1283,7 +1283,7 @@ def _exec_command_internal(self, command, combine_stderr=False, bufsize=-1): #

return stdin, stdout, stderr, channel

def exec_command_wait(self, command, stdin=None, combine_stderr=False, bufsize=-1): # pylint: disable=arguments-differ
def exec_command_wait_bytes(self, command, stdin=None, combine_stderr=False, bufsize=-1): # pylint: disable=arguments-differ, too-many-branches
"""
Executes the specified command and waits for it to finish.
Expand All @@ -1295,35 +1295,101 @@ def exec_command_wait(self, command, stdin=None, combine_stderr=False, bufsize=-
:param bufsize: same meaning of paramiko.
:return: a tuple with (return_value, stdout, stderr) where stdout and stderr
are strings.
are both bytes and the return_value is an int.
"""
import socket
import time

ssh_stdin, stdout, stderr, channel = self._exec_command_internal(command, combine_stderr, bufsize=bufsize)

if stdin is not None:
if isinstance(stdin, str):
filelike_stdin = io.StringIO(stdin)
else:
elif isinstance(stdin, bytes):
filelike_stdin = io.BytesIO(stdin)
elif isinstance(stdin, (io.BufferedIOBase, io.TextIOBase)):
# It seems both StringIO and BytesIO work correctly when doing ssh_stdin.write(line)?
# (The ChannelFile is opened with mode 'b', but until now it always has been a StringIO)
filelike_stdin = stdin
else:
raise ValueError('You can only pass strings, bytes, BytesIO or StringIO objects')

try:
for line in filelike_stdin.readlines():
ssh_stdin.write(line)
except AttributeError:
raise ValueError('stdin can only be either a string of a file-like object!')
for line in filelike_stdin:
ssh_stdin.write(line)

# I flush and close them anyway; important to call shutdown_write
# to avoid hangouts
ssh_stdin.flush()
ssh_stdin.channel.shutdown_write()

# Now I get the output
stdout_bytes = []
stderr_bytes = []
# 100kB buffer (note that this should be smaller than the window size of paramiko)
# Also, apparently if the data is coming slowly, the read() command will not unlock even for
# times much larger than the timeout. Therefore we don't want to have very large buffers otherwise
# you risk that a lot of output is sent to both stdout and stderr, and stderr goes beyond the
# window size and blocks.
# Note that this is different than the bufsize of paramiko.
internal_bufsize = 100 * 1024

# Set a small timeout on the channels, so that if we get data from both
# stderr and stdout, and the connection is slow, we interleave the receive and don't hang
# NOTE: Timeouts and sleep time below, as well as the internal_bufsize above, have been benchmarked
# to try to optimize the overall throughput. I could get ~100MB/s on a localhost via ssh (and 3x slower
# if compression is enabled).
# It's important to mention that, for speed benchmarks, it's important to disable compression
# in the SSH transport settings, as it will cap the max speed.
stdout.channel.settimeout(0.01)
stderr.channel.settimeout(0.01) # Maybe redundant, as this could be the same channel.

while True:
chunk_exists = False

if stdout.channel.recv_ready(): # True means that the next .read call will at least receive 1 byte
chunk_exists = True
try:
piece = stdout.read(internal_bufsize)
stdout_bytes.append(piece)
except socket.timeout:
# There was a timeout: I continue as there should still be data
pass

if stderr.channel.recv_stderr_ready(): # True means that the next .read call will at least receive 1 byte
chunk_exists = True
try:
piece = stderr.read(internal_bufsize)
stderr_bytes.append(piece)
except socket.timeout:
# There was a timeout: I continue as there should still be data
pass

# If chunk_exists, there is data (either already read and put in the std*_bytes lists, or
# still in the buffer because of a timeout). I need to loop.
# Otherwise, there is no data in the buffers, and I enter this block.
if not chunk_exists:
# Both channels have no data in the buffer
if channel.exit_status_ready():
# The remote execution is over

# I think that in some corner cases there might still be some data,
# in case the data arrived between the previous calls and this check.
# So we do a final read. Since the execution is over, I think all data is in the buffers,
# so we can just read the whole buffer without loops
stdout_bytes.append(stdout.read())
stderr_bytes.append(stderr.read())
# And we go out of the `while True` loop
break
# The exit status is not ready:
# I just put a small sleep to avoid infinite fast loops when data
# is not available on a slow connection, and loop
time.sleep(0.01)

# I get the return code (blocking)
# However, if I am here, the exit status is ready so this should be returning very quickly
retval = channel.recv_exit_status()

# needs to be after 'recv_exit_status', otherwise it might hang
output_text = stdout.read().decode('utf-8')
stderr_text = stderr.read().decode('utf-8')

return retval, output_text, stderr_text
return (retval, b''.join(stdout_bytes), b''.join(stderr_bytes))

def gotocomputer_command(self, remotedir):
"""
Expand Down
38 changes: 33 additions & 5 deletions aiida/transports/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,19 +398,45 @@ def _exec_command_internal(self, command, **kwargs):
"""
raise NotImplementedError

def exec_command_wait(self, command, **kwargs):
def exec_command_wait_bytes(self, command, stdin=None, **kwargs):
"""
Execute the command on the shell, waits for it to finish,
and return the retcode, the stdout and the stderr.
and return the retcode, the stdout and the stderr as bytes.
Enforce the execution to be run from the pwd (as given by
self.getcwd), if this is not None.
Enforce the execution to be run from the pwd (as given by self.getcwd), if this is not None.
The command implementation can have some additional plugin-specific kwargs.
:param str command: execute the command given as a string
:return: a list: the retcode (int), stdout (str) and stderr (str).
:param stdin: (optional,default=None) can be a string or a file-like object.
:return: a tuple: the retcode (int), stdout (bytes) and stderr (bytes).
"""
raise NotImplementedError

def exec_command_wait(self, command, stdin=None, encoding='utf-8', **kwargs):
"""
Executes the specified command and waits for it to finish.
:note: this function also decodes the bytes received into a string with the specified encoding,
which is set to be ``utf-8`` by default (for backward-compatibility with earlier versions) of
AiiDA.
Use this method only if you are sure that you are getting a properly encoded string; otherwise,
use the ``exec_command_wait_bytes`` method that returns the undecoded byte stream.
:note: additional kwargs are passed to the ``exec_command_wait_bytes`` function, that might use them
depending on the plugin.
:param command: the command to execute
:param stdin: (optional,default=None) can be a string or a file-like object.
:param encoding: the encoding to use to decode the byte stream received from the remote command execution.
:return: a tuple with (return_value, stdout, stderr) where stdout and stderr are both strings, decoded
with the specified encoding.
"""
retval, stdout_bytes, stderr_bytes = self.exec_command_wait_bytes(command=command, stdin=stdin, **kwargs)
# Return the decoded strings
return (retval, stdout_bytes.decode(encoding), stderr_bytes.decode(encoding))

def get(self, remotepath, localpath, *args, **kwargs):
"""
Retrieve a file or folder from remote source to local destination
Expand Down Expand Up @@ -689,6 +715,8 @@ def whoami(self):
"""

command = 'whoami'
# Assuming here that the username is either ASCII or UTF-8 encoded
# This should be true essentially always
retval, username, stderr = self.exec_command_wait(command)
if retval == 0:
if stderr.strip():
Expand Down
61 changes: 47 additions & 14 deletions docs/source/intro/install_conda.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Installation into Conda environment

This installation route installs all necessary software -- including the prerequisite services PostgreSQL and RabbitMQ -- into a Conda environment.
This is the recommended method for users on shared systems and systems where the user has no administrative privileges.
If you want to install AiiDA onto you own personal workstation/laptop, it is recommanded to use the :ref:`system-wide installation <intro:get_started:system-wide-install>`.

.. important::

Expand Down Expand Up @@ -34,7 +35,7 @@ This is the recommended method for users on shared systems and systems where the
(aiida) $ initdb -D mylocal_db
This *database cluster* may contain a collection of databases (one per profile) that is managed by a single running server process.
This *database cluster* (located inside a folder named ``mylocal_db``) may contain a collection of databases (one per profile) that is managed by a single running server process.
We start this process with:

.. code-block:: console
Expand All @@ -47,25 +48,19 @@ This is the recommended method for users on shared systems and systems where the
- `Creating a Database Cluster <https://www.postgresql.org/docs/12/creating-cluster.html>`__.
- `Starting the Database Server <https://www.postgresql.org/docs/12/server-start.html>`__.

Then, start the RabbitMQ server:

.. code-block:: console

(aiida) $ rabbitmq-server -detached

Finally, start the AiiDA daemon(s):
Then, start the RabbitMQ server:

.. code-block:: console
(aiida) $ verdi daemon start 2
(aiida) $ rabbitmq-server -detached
.. important::

The verdi daemon(s) must be restarted after a system reboot.

.. tip::

Do not start more daemons then there are physical processors on your system.
The services started this way will use the default ports on the machine.
Conflicts may happen if there are more than one user running AiiDA this way on the same machine, or you already have the server running in a system-wide installation.
To get around this issue, you can explicitly define the ports to be used.

---

Expand All @@ -84,11 +79,30 @@ This is the recommended method for users on shared systems and systems where the
Last name: name
Institution: where-i-work
.. tip::

In case of non-default ports are used for the *database cluster* and the RabbitMQ server, you can pass them using ``--db-port`` and ``--broker-port`` options respectively.


.. admonition:: Is AiiDA unable to auto-detect the PostgreSQL setup?
:class: attention title-icon-troubleshoot

If you get an error saying that AiiDA has trouble autodetecting the PostgreSQL setup, you will need to do the manual setup explained in the :ref:`troubleshooting section<intro:troubleshooting:installation:postgresql-autodetect-issues>`.

Once the profile is up and running, you can start the AiiDA daemon(s):

.. code-block:: console
(aiida) $ verdi daemon start 2
.. important::

The verdi daemon(s) must be restarted after a system reboot.

.. tip::

Do not start more daemons then there are physical processors on your system.

---

**Check setup**
Expand Down Expand Up @@ -117,13 +131,32 @@ This is the recommended method for users on shared systems and systems where the
:text: What's next?
:classes: btn-outline-primary btn-block font-weight-bold


---

**Shut-down services**

After finishing with your aiida session, particularly if switching between profiles, you may wish to power down the services:
After finishing with your aiida session, particularly if switching between profiles, you may wish to power down the daemon and the services:

.. code-block:: console
(aiida) $ verdi daemon stop
(aiida) $ pg_ctl stop
(aiida) $ pg_ctl -D mylocal_db stop
(aiida) $ rabbitmqctl stop
---

**Restart the services**

If you want to restart the services and the daemon:

.. code-block:: console
(aiida) $ pg_ctl -D mylocal_db start
(aiida) $ rabbitmq-server -detached
(aiida) $ verdi daemon start
.. tip::

If different ports are used, you have to pass them here as well.
Loading

0 comments on commit bc6eeb0

Please sign in to comment.