Skip to content

Commit

Permalink
Merge pull request #39 from sonelu/in-device
Browse files Browse the repository at this point in the history
fix #36, #33, tests for i2c and dyn on device
  • Loading branch information
sonelu authored May 17, 2020
2 parents 1eb297c + 6e55dae commit 79e4dab
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 181 deletions.
14 changes: 14 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[pytest]
log_cli = True
log_cli_date_format = %Y-%m-%d %H:%M:%S
log_cli_format = %(asctime)s %(levelname)-7s %(threadName)-18s %(name)-32s %(message)s
log_cli_level = 60

; log_file = logs/pytest-logs.txt
; log_file_date_format = %Y-%m-%d %H:%M:%S
; log_file_format = %(asctime)s %(levelname)s %(message)s
; log_file_level = INFO

; log_format = %(asctime)s %(levelname)s %(message)s
; log_level = INFO
; log_print = False
29 changes: 20 additions & 9 deletions roboglia/base/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ class BaseDevice():
if mandatory parameters are not found or unexpected values
are used (ex. for boolean)
"""

cache = {}
"""A chache of device models that is updated when a new model is
encountered and reused when the same model is requested during
device creation.
"""

def __init__(self, name='DEVICE', bus=None, dev_id=None, model=None,
path=None, auto=True, init={}, **kwargs):
# these are already checked by robot
Expand All @@ -111,8 +118,12 @@ def __init__(self, name='DEVICE', bus=None, dev_id=None, model=None,
if not path:
path = self.get_model_path()
model_file = os.path.join(path, model + '.yml')
with open(model_file, 'r') as f:
model_ini = yaml.load(f, Loader=yaml.FullLoader)
if model_file not in BaseDevice.cache:
with open(model_file, 'r') as f:
model_ini = yaml.load(f, Loader=yaml.FullLoader)
BaseDevice.cache[model_file] = model_ini
else:
model_ini = BaseDevice.cache[model_file]
self.__registers = {}
self.__reg_by_addr = {}
for reg_name, reg_info in model_ini['registers'].items():
Expand Down Expand Up @@ -248,21 +259,21 @@ def open(self):
that are not flagged for ``sync`` replication and, if ``init``
parameter provided initializes the indicated
registers with the values from the ``init`` paramters."""
logger.info('reading registers')
logger.info('\t\treading registers')
for register in self.registers.values():
if register.sync:
logger.debug(f'register {register.name} flagged for sync '
'update -- skipping')
logger.debug(f'\t\t\tregister {register.name} flagged for '
'sync update -- skipping')
else:
logger.debug(f'reading register {register.name}')
logger.debug(f'\t\t\treading register {register.name}')
self.read_register(register)
logger.info('initializing registers')
logger.info('\t\tinitializing registers')
for reg_name, value in self.__init.items():
if reg_name in self.__registers:
logger.debug(f'initializing register {reg_name}')
logger.debug(f'\t\t\tinitializing register {reg_name}')
self.__registers[reg_name].value = value
else:
logger.warning(f'register {reg_name} does not exist in '
logger.warning(f'\t\t\tregister {reg_name} does not exist in '
f'device {self.name}; skipping initialization')

def close(self):
Expand Down
46 changes: 44 additions & 2 deletions roboglia/base/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,32 @@ class BaseRegister():
through the value property will invoke reading / writing to the real
register; default ``False``
word: bool
Indicates that the register is a ``word`` register (16 bits) instead
of a usual 8 bits. Some I2C and SPI devices use 16bit registers
and need to use separate access functions to read them as opposed to
the 8 bit registers. Default is ``False`` which effectively makes it
an 8 bit register
order: ``LH`` or ``HL``
Applicable only for registers with size > 1 that represent a value
over succesive internal registers, but for convenience are groupped
as one single register with size 2 (or higher).
``LH`` means low-high and indicates the bytes in the registry are
organized starting with the low byte first. ``HL`` indicates that
the registers are with the high byte first. Technically the ``read``
and ``write`` functions always read the bytes in the order they
are stored in the device and if the register is marked as ``HL`` the
list is reversed before being returned to the requester or processed
as a number in case the ``bulk`` is ``False``. Default is ``LH``.
default: int
The default value for the register; implicit 0
"""
def __init__(self, name='REGISTER', device=None, address=0, size=1,
minim=0, maxim=None, access='R', sync=False, default=0,
**kwargs):
minim=0, maxim=None, access='R', sync=False, word=False,
bulk=True, order='LH', default=0, **kwargs):
# these are already checked by the device
self.__name = name
# device
Expand Down Expand Up @@ -92,6 +111,15 @@ def __init__(self, name='REGISTER', device=None, address=0, size=1,
# sync
check_options(sync, [True, False], 'register', self.name, logger)
self.__sync = sync
# word
check_options(word, [True, False], 'register', self.name, logger)
self.__word = word
# bulk
check_options(bulk, [True, False], 'register', self.name, logger)
self.__bulk = bulk
# order
check_options(order, ['LH', 'HL'], 'register', self.name, logger)
self.__order = order
# default
check_type(default, int, 'register', self.name, logger)
self.__default = default
Expand Down Expand Up @@ -168,6 +196,20 @@ def sync(self, value):
logger.error('only BaseSync subclasses can chance the sync '
'flag of a register')

@property
def word(self):
"""Indicates if the register is an 16 bit register (``True``) or
an 8 bit register.
"""
return self.__word

@property
def order(self):
"""Indicates the order of the data representartion; low-high (LH)
or high-low (HL)
"""
return self.__order

@property
def default(self):
"""The register's default value in internal format."""
Expand Down
2 changes: 1 addition & 1 deletion roboglia/base/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def process_registers(self):
reg_obj = getattr(device, register)
if not reg_obj.sync:
reg_obj.sync = True
logger.debug(f'setting register {register} of device '
logger.debug(f'\t\tsetting register {register} of device '
f'{device.name} sync=True')

def get_register_range(self):
Expand Down
2 changes: 1 addition & 1 deletion roboglia/dynamixel/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def open(self):
self.__port_handler.setBaudRate(self.baudrate)
if self.rs485:
self.__port_handler.ser.rs485_mode = rs485.RS485Settings()
logger.info(f'bus {self.name} set in rs485 mode')
logger.info(f'\tbus {self.name} set in rs485 mode')
self.__packet_handler = PacketHandler(self.__protocol)
logger.info(f'bus {self.name} opened')

Expand Down
85 changes: 69 additions & 16 deletions roboglia/i2c/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,40 @@ class I2CBus(BaseBus):
In addition there is an extra parameter `mock`.
At this mement the ``I2CBus`` supports devices with byte and word
registers and permits defining composed regsiters with ``size`` > 1
that are treated as a single register.
.. example:: A gyroscope sensor might have registers for the z, y and z
axes reading that are stored as pairs of registers like this::
gyro_x_l #0x28
gyro_x_h #0x29
gyro_y_l #0x2A
gyro_y_h #0x2B
gyro_z_l #0x2C
gyro_z_h #0x2D
For simplicity it is possible to define these registers like this
in the device template::
registers:
gyro_x:
address: 0x28
size: 2
gyro_y:
address: 0x2A
size: 2
gyro_z:
address: 0x2C
size: 2
By default the registers are ``Byte`` and the order of data is
low-high as described in the :py:class:roboglia.base.`BaseRegister`.
The bus will handle this by reading the two registers sequentially
and computing the register's value using the size of the register
and the order.
Parameters
----------
mock: bool
Expand Down Expand Up @@ -85,44 +119,63 @@ def read(self, reg):
if not self.is_open:
logger.error(f'attempted to read from a closed bus: {self.name}')
return None

dev = reg.device
if reg.word:
function = self.__i2cbus.read_word_data
base = 65536
else:
dev = reg.device
if reg.size == 1:
function = self.__i2cbus.read_byte_data
elif reg.size == 2:
function = self.__i2cbus.read_word_data
else:
raise NotImplementedError
function = self.__i2cbus.read_byte_data
base = 256

values = [0] * reg.size
for pos in range(reg.size):
try:
return function(dev.dev_id, reg.address)
values[pos] = function(dev.dev_id, reg.address + pos)
except Exception as e:
logger.error(f'failed to execute read command on I2C bus '
f'{self.name} for device {dev.name} and '
f'register {reg.name}')
logger.error(str(e))
return None
if reg.order == 'HL':
values = values.reverse()
value = 0
for pos in range(reg.size):
value = value * base + values[-pos-1]
return value

def write(self, reg, value):
"""Depending on the size of the register it calls the corresponding
write function from ``SMBus``.
"""
if not self.is_open:
logger.error(f'attempted to write to a closed bus: {self.name}')

dev = reg.device
if reg.word:
function = self.__i2cbus.write_word_data
base = 65536
else:
dev = reg.device
if reg.size == 1:
function = self.__i2cbus.write_byte_data
elif reg.size == 2:
function = self.__i2cbus.write_word_data
else:
raise NotImplementedError
function = self.__i2cbus.write_byte_data
base = 256

values = [0] * reg.size
data = reg.int_value
for pos in range(reg.size):
values[pos] = data % base
data = data // base
if reg.order == 'HL':
values = values.reverse()
for pos, value in enumerate(values):
try:
function(dev.dev_id, reg.address, value)
function(dev.dev_id, reg.address + pos, value)
except Exception as e:
logger.error(f'failed to execute write command on I2C bus '
f'{self.name} for device {dev.name} and '
f'register {reg.name}')
logger.error(str(e))
return None


class SharedI2CBus(SharedBus):
Expand Down
95 changes: 95 additions & 0 deletions roboglia/i2c/devices/LSM330G.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# iNEMO inertial module: always-on 3D accelerometer and 3D gyroscope
# this is only the gyro part of the device as they are accessed
# as separate devices on the I2C bus with
# https://www.st.com/resource/en/datasheet/lsm6ds3.pdf
#
registers:
who_am_i:
# Who I am ID
address: 0x0f
access: R
default: 0b11010100

# Angular rate sensor control registers
ctrl_reg1:
address: 0x20
access: RW
default: 0b00000111

ctrl_reg2:
address: 0x21
access: RW

ctrl_reg3:
address: 0x22
access: RW

ctrl_reg4:
address: 0x23
access: RW

ctrl_reg5:
address: 0x24
access: RW

# Reference value for interrupt generation
reference:
address: 0x25
access: RW

# Temperature data output
out_temp:
address: 0x26

# Status register
status_reg:
address: 0x27

# Angular rate sensor output registers
out_x:
address: 0x28
size: 2

out_y:
address: 0x2a
size: 2

out_z:
address: 0x2c
size: 2

# Angular rate sensor FIFO registers
fifo_ctrl_reg:
address: 0x2e
access: RW

fifo_src_reg:
address: 0x2f

# Angular rate sensor interrupt registers
int1_cfg:
address: 0x30
access: RW

int1_src:
address: 0x31
access: RW

int1_ths_x:
address: 0x32
access: RW
size: 2

int1_ths_y:
address: 0x34
access: RW
size: 2

int1_ths_z:
address: 0x36
access: RW
size: 2

int1_duration:
address: 0x38
access: RW
Loading

0 comments on commit 79e4dab

Please sign in to comment.