Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscription: add ModifySubscription functionality #714

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ Here is the current set of supported services. For low-level access use the clie
| | SetMonitoringMode | Yes | Yes | |
| | SetTriggering | | | |
| Subscription Service Set | CreateSubscription | Yes | Yes | |
| | ModifySubscription | | | |
| | ModifySubscription | Yes | | |
| | SetPublishingMode | | | |
| | Publish | Yes | Yes | |
| | Republish | | | |
Expand Down
17 changes: 10 additions & 7 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *Client) Subscribe(ctx context.Context, params *SubscriptionParameters,
}

c.subs[sub.SubscriptionID] = sub
c.updatePublishTimeout_NeedsSubMuxRLock()
c.updatePublishTimeout_NeedsSubMuxLock()
return sub, nil
}

Expand All @@ -89,7 +89,7 @@ func (c *Client) SubscriptionIDs() []uint32 {
}

// recreateSubscriptions creates new subscriptions
// with the same parameters to replace the previous ones
// with the same parameters to replace the previous one
func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {
c.subMux.Lock()
defer c.subMux.Unlock()
Expand All @@ -98,7 +98,10 @@ func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {
if !ok {
return ua.StatusBadSubscriptionIDInvalid
}
return sub.recreate_NeedsSubMuxLock(ctx)

sub.recreate_delete(ctx)
c.forgetSubscription_NeedsSubMuxLock(ctx, id)
return sub.recreate_create(ctx)
}

// transferSubscriptions ask the server to transfer the given subscriptions
Expand Down Expand Up @@ -230,7 +233,7 @@ func (c *Client) forgetSubscription(ctx context.Context, id uint32) {

func (c *Client) forgetSubscription_NeedsSubMuxLock(ctx context.Context, id uint32) {
delete(c.subs, id)
c.updatePublishTimeout_NeedsSubMuxRLock()
c.updatePublishTimeout_NeedsSubMuxLock()
stats.Subscription().Add("Count", -1)

if len(c.subs) == 0 {
Expand All @@ -240,7 +243,7 @@ func (c *Client) forgetSubscription_NeedsSubMuxLock(ctx context.Context, id uint
}
}

func (c *Client) updatePublishTimeout_NeedsSubMuxRLock() {
func (c *Client) updatePublishTimeout_NeedsSubMuxLock() {
maxTimeout := uasc.MaxTimeout
for _, s := range c.subs {
if d := s.publishTimeout(); d < maxTimeout {
Expand Down Expand Up @@ -470,7 +473,7 @@ func (c *Client) publish(ctx context.Context) error {
}

// handle the publish response for a specific subscription
c.handleNotification_NeedsSubMuxLock(ctx, sub, res)
c.handleNotification_NeedsSubMuxLock(sub, res)
c.subMux.Unlock()

c.notifySubscription(ctx, sub, res.NotificationMessage)
Expand Down Expand Up @@ -513,7 +516,7 @@ func (c *Client) handleAcks_NeedsSubMuxLock(res []ua.StatusCode) {
dlog.Printf("notAcked=%v", notAcked)
}

func (c *Client) handleNotification_NeedsSubMuxLock(ctx context.Context, sub *Subscription, res *ua.PublishResponse) {
func (c *Client) handleNotification_NeedsSubMuxLock(sub *Subscription, res *ua.PublishResponse) {
dlog := debug.NewPrefixLogger("publish: sub %d: ", res.SubscriptionID)

// keep-alive message
Expand Down
8 changes: 8 additions & 0 deletions examples/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func main() {
log.Fatal(err)
}

// Uncomment the following to try modifying the subscription
//
// var params opcua.SubscriptionParameters
// params.Interval = time.Millisecond * 2000
// if _, err := sub.ModifySubscription(ctx, params); err != nil {
// log.Fatal(err)
// }

// read from subscription's notification channel until ctx is cancelled
for {
select {
Expand Down
79 changes: 57 additions & 22 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ const (
DefaultSubscriptionPriority = 0
)

const terminatedSubscriptionID uint32 = 0xC0CAC01B

type Subscription struct {
SubscriptionID uint32
RevisedPublishingInterval time.Duration
RevisedLifetimeCount uint32
RevisedMaxKeepAliveCount uint32
Notifs chan<- *PublishNotificationData
params *SubscriptionParameters
paramsMu sync.Mutex
items map[uint32]*monitoredItem
itemsMu sync.Mutex
lastSeq uint32
Expand Down Expand Up @@ -111,6 +110,40 @@ func (s *Subscription) delete(ctx context.Context) error {
}
}

func (s *Subscription) ModifySubscription(ctx context.Context, params SubscriptionParameters) (*ua.ModifySubscriptionResponse, error) {
stats.Subscription().Add("ModifySubscription", 1)

params.setDefaults()
req := &ua.ModifySubscriptionRequest{
SubscriptionID: s.SubscriptionID,
RequestedPublishingInterval: float64(params.Interval.Milliseconds()),
RequestedLifetimeCount: params.LifetimeCount,
RequestedMaxKeepAliveCount: params.MaxKeepAliveCount,
MaxNotificationsPerPublish: params.MaxNotificationsPerPublish,
Priority: params.Priority,
}

var res *ua.ModifySubscriptionResponse
err := s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})

if err != nil {
return nil, err
}

// update subscription parameters
s.paramsMu.Lock()
s.params = &params
s.paramsMu.Unlock()
// update revised subscription parameters
s.RevisedPublishingInterval = time.Duration(res.RevisedPublishingInterval) * time.Millisecond
s.RevisedLifetimeCount = res.RevisedLifetimeCount
s.RevisedMaxKeepAliveCount = res.RevisedMaxKeepAliveCount

return res, nil
}

func (s *Subscription) Monitor(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
stats.Subscription().Add("Monitor", 1)
stats.Subscription().Add("MonitoredItems", int64(len(items)))
Expand Down Expand Up @@ -358,29 +391,31 @@ func (p *SubscriptionParameters) setDefaults() {
}
}

// recreate_NeedsSubMuxLock creates a new subscription based on the previous subscription
// parameters and monitored items.
func (s *Subscription) recreate_NeedsSubMuxLock(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate: ", s.SubscriptionID)

if s.SubscriptionID == terminatedSubscriptionID {
dlog.Printf("subscription is not in a valid state")
return nil
// recreate_delete is called by the client when it is trying to
// recreate an existing subscription. This function deletes the
// existing subscription from the server.
func (s *Subscription) recreate_delete(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate_delete: ", s.SubscriptionID)
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
return nil
}

// recreate_create is called by the client when it is trying to
// recreate an existing subscription. This function creates a
// new subscription with the same parameters as the previous one.
func (s *Subscription) recreate_create(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate_create: ", s.SubscriptionID)

s.paramsMu.Lock()
params := s.params
{
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
}
s.c.forgetSubscription_NeedsSubMuxLock(ctx, s.SubscriptionID)
dlog.Printf("subscription forgotton")
s.paramsMu.Unlock()

req := &ua.CreateSubscriptionRequest{
RequestedPublishingInterval: float64(params.Interval / time.Millisecond),
Expand Down
Loading