Skip to content

Commit

Permalink
Merge pull request #25 from dls-controls/ca-disconnections
Browse files Browse the repository at this point in the history
Notify on channel access disconnections.
  • Loading branch information
willrogers authored Jun 28, 2021
2 parents d593266 + dc46236 commit 40ba2e0
Show file tree
Hide file tree
Showing 6 changed files with 389 additions and 313 deletions.
586 changes: 311 additions & 275 deletions Pipfile.lock

Large diffs are not rendered by default.

69 changes: 34 additions & 35 deletions coniql/caplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@


class CAChannelMaker:
def __init__(
self, name, config: ChannelConfig,
):
def __init__(self, name, config: ChannelConfig, writeable: bool):
self.name = name
self.config = config
self.cached_status: Optional[ChannelStatus] = None
self.formatter = ChannelFormatter()
self.writeable = True
# No camonitor is capable of updating whether a channel is writeable,
# so this value is immutable.
self.writeable = writeable

@staticmethod
def _create_formatter(
Expand All @@ -62,14 +62,13 @@ def channel_from_update(
self,
time_value: Optional[AugmentedValue] = None,
meta_value: Optional[AugmentedValue] = None,
writeable: Optional[bool] = None,
) -> Channel:
value = None
time = None
status = None
display = None

if meta_value is not None:
if meta_value is not None and meta_value.ok:
self.formatter = CAChannelMaker._create_formatter(
meta_value, self.config.display_form
)
Expand Down Expand Up @@ -103,27 +102,24 @@ def channel_from_update(
)

if time_value is not None:
assert time_value.timestamp
value = ChannelValue(time_value, self.formatter)
quality = CHANNEL_QUALITY_MAP[time_value.severity]
if self.cached_status is None or self.cached_status.quality != quality:
status = ChannelStatus(
quality=quality, message="", mutable=self.writeable,
if time_value.ok:
assert time_value.timestamp
value = ChannelValue(time_value, self.formatter)
quality = CHANNEL_QUALITY_MAP[time_value.severity]
if self.cached_status is None or self.cached_status.quality != quality:
status = ChannelStatus(
quality=quality, message="", mutable=self.writeable,
)
self.cached_status = status
time = ChannelTime(
seconds=time_value.timestamp,
nanoseconds=time_value.raw_stamp[1],
userTag=0,
)
self.cached_status = status
time = ChannelTime(
seconds=time_value.timestamp,
nanoseconds=time_value.raw_stamp[1],
userTag=0,
)

if writeable is not None:
self.writeable = writeable
if status is not None:
status.mutable = writeable
elif self.cached_status is not None:
else:
# An update where .ok is false indicates a disconnection.
status = ChannelStatus(
quality=self.cached_status.quality, message="", mutable=writeable,
quality="INVALID", message="", mutable=self.writeable,
)
self.cached_status = status

Expand Down Expand Up @@ -159,11 +155,8 @@ async def get_channel(
caget(pv, format=FORMAT_CTRL, timeout=timeout),
cainfo(pv, timeout=timeout),
)
# Put in channel id so converters can see it
maker = CAChannelMaker(pv, config)
return maker.channel_from_update(
time_value=time_value, meta_value=meta_value, writeable=info.write
)
maker = CAChannelMaker(pv, config, info.write)
return maker.channel_from_update(time_value=time_value, meta_value=meta_value)

async def put_channels(
self, pvs: List[str], values: List[PutValue], timeout: float
Expand All @@ -173,13 +166,16 @@ async def put_channels(
async def subscribe_channel(
self, pv: str, config: ChannelConfig
) -> AsyncIterator[Channel]:
maker = CAChannelMaker(pv, config)
# A queue that contains a monitor update and the keyword with which
# the channel's update_value function should be called.
q: asyncio.Queue[Dict[str, AugmentedValue]] = asyncio.Queue()
# Monitor PV for value and alarm changes with associated timestamp.
# Use this monitor also for notifications of disconnections.
value_monitor = camonitor(
pv, lambda v: q.put({"time_value": v}), format=FORMAT_TIME,
pv,
lambda v: q.put({"time_value": v}),
format=FORMAT_TIME,
notify_disconnect=True,
)
# Monitor PV only for property changes. For EPICS < 3.15 this monitor
# will update once on connection but will not subsequently be triggered.
Expand All @@ -195,18 +191,21 @@ async def subscribe_channel(
# A specific request required for whether the channel is writeable.
# This will not be updated, so wait until a callback is received
# before making the request when the channel is likely be connected.
writeable = True
try:
info = await cainfo(pv)
first_channel_value["writeable"] = info.write
writeable = info.write
except CANothing:
# Unlikely, but allow subscriptions to continue.
first_channel_value["writeable"] = True
pass

maker = CAChannelMaker(pv, config, writeable)
# Do not continue until both monitors have returned.
# Then the first Channel returned will be complete.
while len(first_channel_value) < 3:
while len(first_channel_value) < 2:
update = await q.get()
first_channel_value.update(update)

yield maker.channel_from_update(**first_channel_value)

# Handle all subsequent updates from both monitors.
Expand Down
2 changes: 1 addition & 1 deletion coniql/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def ndarray_to_base64_array(
return dict(
numberType=value.dtype.name.upper(),
# https://stackoverflow.com/a/6485943
base64=base64.b64encode(value).decode(),
base64=base64.b64encode(value.tobytes()).decode(),
)

# ndarray -> [str] uses given precision
Expand Down
41 changes: 41 additions & 0 deletions tests/test_caplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from pathlib import Path
from subprocess import Popen
from typing import Any, Dict, List
from unittest.mock import ANY

import pytest
Expand Down Expand Up @@ -159,6 +160,46 @@ async def test_get_enum_pv(engine: Engine, ioc: Popen):
)


@pytest.mark.asyncio
async def test_subscribe_disconnect(engine: Engine, ioc: Popen):
query = (
"""
subscription {
subscribeChannel(id: "ca://%slongout") {
value {
float
}
status {
quality
}
}
}
"""
% PV_PREFIX
)
results: List[Dict[str, Any]] = []
wait_for_ioc(ioc)
async for result in engine.subscribe(query, context=make_context()):
if not results:
# First response; now disconnect.
results.append(result)
ioc.communicate("exit()")
else:
# Second response; done.
results.append(result)
break

assert len(results) == 2
assert results[0] == dict(
data=dict(
subscribeChannel=dict(value=dict(float=42), status=dict(quality="VALID"))
)
)
assert results[1] == dict(
data=dict(subscribeChannel=dict(value=None, status=dict(quality="INVALID")))
)


@pytest.mark.asyncio
async def test_subscribe_ticking(engine: Engine, ioc: Popen):
query = (
Expand Down
2 changes: 1 addition & 1 deletion tests/test_coniql.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ async def test_get_sim_sinewave(engine: Engine):
stringArray=["%.5f" % x for x in np.zeros(10)],
base64Array=dict(
numberType="FLOAT64",
base64=base64.b64encode(np.zeros(50)).decode(),
base64=base64.b64encode(np.zeros(50).tobytes()).decode(),
),
)
)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pvaplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async def test_subscribe_int_pv(engine: Engine, ioc: SimIoc):
% PV_PREFIX
)
context = make_context()
expected = np.ndarray(0, dtype=np.int32)
expected = np.ndarray([0], dtype=np.int32)
async for result in engine.subscribe(query, context=context):
assert result == dict(
data=dict(subscribeChannel=dict(value=dict(stringArray=ANY)))
Expand Down

0 comments on commit 40ba2e0

Please sign in to comment.