Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #36, #33, tests for i2c and dyn on device #39

Merged
merged 2 commits into from
May 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[pytest]
log_cli = True
log_cli_date_format = %Y-%m-%d %H:%M:%S
log_cli_format = %(asctime)s %(levelname)-7s %(threadName)-18s %(name)-32s %(message)s
log_cli_level = 60

; log_file = logs/pytest-logs.txt
; log_file_date_format = %Y-%m-%d %H:%M:%S
; log_file_format = %(asctime)s %(levelname)s %(message)s
; log_file_level = INFO

; log_format = %(asctime)s %(levelname)s %(message)s
; log_level = INFO
; log_print = False
29 changes: 20 additions & 9 deletions roboglia/base/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ class BaseDevice():
if mandatory parameters are not found or unexpected values
are used (ex. for boolean)
"""

cache = {}
"""A chache of device models that is updated when a new model is
encountered and reused when the same model is requested during
device creation.
"""

def __init__(self, name='DEVICE', bus=None, dev_id=None, model=None,
path=None, auto=True, init={}, **kwargs):
# these are already checked by robot
Expand All @@ -111,8 +118,12 @@ def __init__(self, name='DEVICE', bus=None, dev_id=None, model=None,
if not path:
path = self.get_model_path()
model_file = os.path.join(path, model + '.yml')
with open(model_file, 'r') as f:
model_ini = yaml.load(f, Loader=yaml.FullLoader)
if model_file not in BaseDevice.cache:
with open(model_file, 'r') as f:
model_ini = yaml.load(f, Loader=yaml.FullLoader)
BaseDevice.cache[model_file] = model_ini
else:
model_ini = BaseDevice.cache[model_file]
self.__registers = {}
self.__reg_by_addr = {}
for reg_name, reg_info in model_ini['registers'].items():
Expand Down Expand Up @@ -248,21 +259,21 @@ def open(self):
that are not flagged for ``sync`` replication and, if ``init``
parameter provided initializes the indicated
registers with the values from the ``init`` paramters."""
logger.info('reading registers')
logger.info('\t\treading registers')
for register in self.registers.values():
if register.sync:
logger.debug(f'register {register.name} flagged for sync '
'update -- skipping')
logger.debug(f'\t\t\tregister {register.name} flagged for '
'sync update -- skipping')
else:
logger.debug(f'reading register {register.name}')
logger.debug(f'\t\t\treading register {register.name}')
self.read_register(register)
logger.info('initializing registers')
logger.info('\t\tinitializing registers')
for reg_name, value in self.__init.items():
if reg_name in self.__registers:
logger.debug(f'initializing register {reg_name}')
logger.debug(f'\t\t\tinitializing register {reg_name}')
self.__registers[reg_name].value = value
else:
logger.warning(f'register {reg_name} does not exist in '
logger.warning(f'\t\t\tregister {reg_name} does not exist in '
f'device {self.name}; skipping initialization')

def close(self):
Expand Down
46 changes: 44 additions & 2 deletions roboglia/base/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,32 @@ class BaseRegister():
through the value property will invoke reading / writing to the real
register; default ``False``

word: bool
Indicates that the register is a ``word`` register (16 bits) instead
of a usual 8 bits. Some I2C and SPI devices use 16bit registers
and need to use separate access functions to read them as opposed to
the 8 bit registers. Default is ``False`` which effectively makes it
an 8 bit register

order: ``LH`` or ``HL``
Applicable only for registers with size > 1 that represent a value
over succesive internal registers, but for convenience are groupped
as one single register with size 2 (or higher).
``LH`` means low-high and indicates the bytes in the registry are
organized starting with the low byte first. ``HL`` indicates that
the registers are with the high byte first. Technically the ``read``
and ``write`` functions always read the bytes in the order they
are stored in the device and if the register is marked as ``HL`` the
list is reversed before being returned to the requester or processed
as a number in case the ``bulk`` is ``False``. Default is ``LH``.

default: int
The default value for the register; implicit 0

"""
def __init__(self, name='REGISTER', device=None, address=0, size=1,
minim=0, maxim=None, access='R', sync=False, default=0,
**kwargs):
minim=0, maxim=None, access='R', sync=False, word=False,
bulk=True, order='LH', default=0, **kwargs):
# these are already checked by the device
self.__name = name
# device
Expand Down Expand Up @@ -92,6 +111,15 @@ def __init__(self, name='REGISTER', device=None, address=0, size=1,
# sync
check_options(sync, [True, False], 'register', self.name, logger)
self.__sync = sync
# word
check_options(word, [True, False], 'register', self.name, logger)
self.__word = word
# bulk
check_options(bulk, [True, False], 'register', self.name, logger)
self.__bulk = bulk
# order
check_options(order, ['LH', 'HL'], 'register', self.name, logger)
self.__order = order
# default
check_type(default, int, 'register', self.name, logger)
self.__default = default
Expand Down Expand Up @@ -168,6 +196,20 @@ def sync(self, value):
logger.error('only BaseSync subclasses can chance the sync '
'flag of a register')

@property
def word(self):
"""Indicates if the register is an 16 bit register (``True``) or
an 8 bit register.
"""
return self.__word

@property
def order(self):
"""Indicates the order of the data representartion; low-high (LH)
or high-low (HL)
"""
return self.__order

@property
def default(self):
"""The register's default value in internal format."""
Expand Down
2 changes: 1 addition & 1 deletion roboglia/base/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def process_registers(self):
reg_obj = getattr(device, register)
if not reg_obj.sync:
reg_obj.sync = True
logger.debug(f'setting register {register} of device '
logger.debug(f'\t\tsetting register {register} of device '
f'{device.name} sync=True')

def get_register_range(self):
Expand Down
2 changes: 1 addition & 1 deletion roboglia/dynamixel/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def open(self):
self.__port_handler.setBaudRate(self.baudrate)
if self.rs485:
self.__port_handler.ser.rs485_mode = rs485.RS485Settings()
logger.info(f'bus {self.name} set in rs485 mode')
logger.info(f'\tbus {self.name} set in rs485 mode')
self.__packet_handler = PacketHandler(self.__protocol)
logger.info(f'bus {self.name} opened')

Expand Down
85 changes: 69 additions & 16 deletions roboglia/i2c/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,40 @@ class I2CBus(BaseBus):

In addition there is an extra parameter `mock`.

At this mement the ``I2CBus`` supports devices with byte and word
registers and permits defining composed regsiters with ``size`` > 1
that are treated as a single register.

.. example:: A gyroscope sensor might have registers for the z, y and z
axes reading that are stored as pairs of registers like this::

gyro_x_l #0x28
gyro_x_h #0x29
gyro_y_l #0x2A
gyro_y_h #0x2B
gyro_z_l #0x2C
gyro_z_h #0x2D

For simplicity it is possible to define these registers like this
in the device template::

registers:
gyro_x:
address: 0x28
size: 2
gyro_y:
address: 0x2A
size: 2
gyro_z:
address: 0x2C
size: 2

By default the registers are ``Byte`` and the order of data is
low-high as described in the :py:class:roboglia.base.`BaseRegister`.
The bus will handle this by reading the two registers sequentially
and computing the register's value using the size of the register
and the order.

Parameters
----------
mock: bool
Expand Down Expand Up @@ -85,44 +119,63 @@ def read(self, reg):
if not self.is_open:
logger.error(f'attempted to read from a closed bus: {self.name}')
return None

dev = reg.device
if reg.word:
function = self.__i2cbus.read_word_data
base = 65536
else:
dev = reg.device
if reg.size == 1:
function = self.__i2cbus.read_byte_data
elif reg.size == 2:
function = self.__i2cbus.read_word_data
else:
raise NotImplementedError
function = self.__i2cbus.read_byte_data
base = 256

values = [0] * reg.size
for pos in range(reg.size):
try:
return function(dev.dev_id, reg.address)
values[pos] = function(dev.dev_id, reg.address + pos)
except Exception as e:
logger.error(f'failed to execute read command on I2C bus '
f'{self.name} for device {dev.name} and '
f'register {reg.name}')
logger.error(str(e))
return None
if reg.order == 'HL':
values = values.reverse()
value = 0
for pos in range(reg.size):
value = value * base + values[-pos-1]
return value

def write(self, reg, value):
"""Depending on the size of the register it calls the corresponding
write function from ``SMBus``.
"""
if not self.is_open:
logger.error(f'attempted to write to a closed bus: {self.name}')

dev = reg.device
if reg.word:
function = self.__i2cbus.write_word_data
base = 65536
else:
dev = reg.device
if reg.size == 1:
function = self.__i2cbus.write_byte_data
elif reg.size == 2:
function = self.__i2cbus.write_word_data
else:
raise NotImplementedError
function = self.__i2cbus.write_byte_data
base = 256

values = [0] * reg.size
data = reg.int_value
for pos in range(reg.size):
values[pos] = data % base
data = data // base
if reg.order == 'HL':
values = values.reverse()
for pos, value in enumerate(values):
try:
function(dev.dev_id, reg.address, value)
function(dev.dev_id, reg.address + pos, value)
except Exception as e:
logger.error(f'failed to execute write command on I2C bus '
f'{self.name} for device {dev.name} and '
f'register {reg.name}')
logger.error(str(e))
return None


class SharedI2CBus(SharedBus):
Expand Down
95 changes: 95 additions & 0 deletions roboglia/i2c/devices/LSM330G.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# iNEMO inertial module: always-on 3D accelerometer and 3D gyroscope
# this is only the gyro part of the device as they are accessed
# as separate devices on the I2C bus with
# https://www.st.com/resource/en/datasheet/lsm6ds3.pdf
#
registers:
who_am_i:
# Who I am ID
address: 0x0f
access: R
default: 0b11010100

# Angular rate sensor control registers
ctrl_reg1:
address: 0x20
access: RW
default: 0b00000111

ctrl_reg2:
address: 0x21
access: RW

ctrl_reg3:
address: 0x22
access: RW

ctrl_reg4:
address: 0x23
access: RW

ctrl_reg5:
address: 0x24
access: RW

# Reference value for interrupt generation
reference:
address: 0x25
access: RW

# Temperature data output
out_temp:
address: 0x26

# Status register
status_reg:
address: 0x27

# Angular rate sensor output registers
out_x:
address: 0x28
size: 2

out_y:
address: 0x2a
size: 2

out_z:
address: 0x2c
size: 2

# Angular rate sensor FIFO registers
fifo_ctrl_reg:
address: 0x2e
access: RW

fifo_src_reg:
address: 0x2f

# Angular rate sensor interrupt registers
int1_cfg:
address: 0x30
access: RW

int1_src:
address: 0x31
access: RW

int1_ths_x:
address: 0x32
access: RW
size: 2

int1_ths_y:
address: 0x34
access: RW
size: 2

int1_ths_z:
address: 0x36
access: RW
size: 2

int1_duration:
address: 0x38
access: RW
Loading