Skip to content

Commit

Permalink
subscription: add ModifySubscription functionality
Browse files Browse the repository at this point in the history
closes #713

Signed-off-by: Jack Chen <jack@iotechsys.com>
  • Loading branch information
jackchenjc committed Jan 7, 2025
1 parent a958e31 commit 0b636c4
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ Here is the current set of supported services. For low-level access use the clie
| | SetMonitoringMode | Yes | Yes | |
| | SetTriggering | | | |
| Subscription Service Set | CreateSubscription | Yes | Yes | |
| | ModifySubscription | | | |
| | ModifySubscription | Yes | | |
| | SetPublishingMode | | | |
| | Publish | Yes | Yes | |
| | Republish | | | |
Expand Down
8 changes: 8 additions & 0 deletions examples/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func main() {
log.Fatal(err)
}

// Uncomment the following to try modifying the subscription
//var params opcua.SubscriptionParameters
//params.Interval = time.Millisecond * 2000
//_, err = sub.ModifySubscription(ctx, params)
//if err != nil {
// log.Fatal(err)
//}

// read from subscription's notification channel until ctx is cancelled
for {
select {
Expand Down
35 changes: 35 additions & 0 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Subscription struct {
RevisedMaxKeepAliveCount uint32
Notifs chan<- *PublishNotificationData
params *SubscriptionParameters
paramsMu sync.Mutex
items map[uint32]*monitoredItem
itemsMu sync.Mutex
lastSeq uint32
Expand Down Expand Up @@ -111,6 +112,40 @@ func (s *Subscription) delete(ctx context.Context) error {
}
}

func (s *Subscription) ModifySubscription(ctx context.Context, params SubscriptionParameters) (*ua.ModifySubscriptionResponse, error) {
stats.Subscription().Add("ModifySubscription", 1)

params.setDefaults()
req := &ua.ModifySubscriptionRequest{
SubscriptionID: s.SubscriptionID,
RequestedPublishingInterval: float64(params.Interval.Milliseconds()),
RequestedLifetimeCount: params.LifetimeCount,
RequestedMaxKeepAliveCount: params.MaxKeepAliveCount,
MaxNotificationsPerPublish: params.MaxNotificationsPerPublish,
Priority: params.Priority,
}

var res *ua.ModifySubscriptionResponse
err := s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})

if err != nil {
return nil, err
}

// update subscription parameters
s.paramsMu.Lock()
s.params = &params
s.paramsMu.Unlock()
// update revised subscription parameters
s.RevisedPublishingInterval = time.Duration(res.RevisedPublishingInterval) * time.Millisecond
s.RevisedLifetimeCount = res.RevisedLifetimeCount
s.RevisedMaxKeepAliveCount = res.RevisedMaxKeepAliveCount

return res, nil
}

func (s *Subscription) Monitor(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
stats.Subscription().Add("Monitor", 1)
stats.Subscription().Add("MonitoredItems", int64(len(items)))
Expand Down

0 comments on commit 0b636c4

Please sign in to comment.