Skip to content

Commit

Permalink
Merge pull request #549 from gopcua/add-context-to-subscription
Browse files Browse the repository at this point in the history
add context to subscription methods
  • Loading branch information
CrowdHailer authored Jan 20, 2022
2 parents 36fac3f + 9f41dc1 commit 6c5af25
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (c *Client) monitor(ctx context.Context) {

// try to transfer all subscriptions to the new session and
// recreate them all if that fails.
res, err := c.transferSubscriptions(subIDs)
res, err := c.transferSubscriptions(ctx, subIDs)
switch {
case err != nil:
dlog.Printf("transfer subscriptions failed. Recreating all subscriptions: %v", err)
Expand Down
14 changes: 11 additions & 3 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ import (
// Subscribe creates a Subscription with given parameters.
// Parameters that have not been set are set to their default values.
// See opcua.DefaultSubscription* constants
//
// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan<- *PublishNotificationData) (*Subscription, error) {
return c.SubscribeWithContext(context.Background(), params, notifyCh)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (c *Client) SubscribeWithContext(ctx context.Context, params *SubscriptionParameters, notifyCh chan<- *PublishNotificationData) (*Subscription, error) {
stats.Client().Add("Subscribe", 1)

if params == nil {
Expand All @@ -34,7 +42,7 @@ func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan<- *Publ
}

var res *ua.CreateSubscriptionResponse
err := c.Send(req, func(v interface{}) error {
err := c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -99,14 +107,14 @@ func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {

// transferSubscriptions ask the server to transfer the given subscriptions
// of the previous session to the current one.
func (c *Client) transferSubscriptions(ids []uint32) (*ua.TransferSubscriptionsResponse, error) {
func (c *Client) transferSubscriptions(ctx context.Context, ids []uint32) (*ua.TransferSubscriptionsResponse, error) {
req := &ua.TransferSubscriptionsRequest{
SubscriptionIDs: ids,
SendInitialValues: false,
}

var res *ua.TransferSubscriptionsResponse
err := c.Send(req, func(v interface{}) error {
err := c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
return res, err
Expand Down
49 changes: 39 additions & 10 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ type PublishNotificationData struct {
func (s *Subscription) Cancel(ctx context.Context) error {
stats.Subscription().Add("Cancel", 1)
s.c.forgetSubscription(ctx, s.SubscriptionID)
return s.delete()
return s.delete(ctx)
}

// delete removes the subscription from the server.
func (s *Subscription) delete() error {
func (s *Subscription) delete(ctx context.Context) error {
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}

var res *ua.DeleteSubscriptionsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -111,7 +111,14 @@ func (s *Subscription) delete() error {
}
}

// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
return s.MonitorWithContext(context.Background(), ts, items...)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (s *Subscription) MonitorWithContext(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
stats.Subscription().Add("Monitor", 1)
stats.Subscription().Add("MonitoredItems", int64(len(items)))

Expand All @@ -123,7 +130,7 @@ func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredI
}

var res *ua.CreateMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -146,7 +153,14 @@ func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredI
return res, err
}

// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) {
return s.UnmonitorWithContext(context.Background(), monitoredItemIDs...)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (s *Subscription) UnmonitorWithContext(ctx context.Context, monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) {
stats.Subscription().Add("Unmonitor", 1)
stats.Subscription().Add("UnmonitoredItems", int64(len(monitoredItemIDs)))

Expand All @@ -156,7 +170,7 @@ func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitore
}

var res *ua.DeleteMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -172,7 +186,14 @@ func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitore
return res, err
}

// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemModifyRequest) (*ua.ModifyMonitoredItemsResponse, error) {
return s.ModifyMonitoredItemsWithContext(context.Background(), ts, items...)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (s *Subscription) ModifyMonitoredItemsWithContext(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemModifyRequest) (*ua.ModifyMonitoredItemsResponse, error) {
stats.Subscription().Add("ModifyMonitoredItems", 1)
stats.Subscription().Add("ModifiedMonitoredItems", int64(len(items)))

Expand All @@ -191,7 +212,7 @@ func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*
ItemsToModify: items,
}
var res *ua.ModifyMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -222,7 +243,15 @@ func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*
// SetTriggering sends a request to the server to add and/or remove triggering links from a triggering item.
// To add links from a triggering item to an item to report provide the server assigned ID(s) in the `add` argument.
// To remove links from a triggering item to an item to report provide the server assigned ID(s) in the `remove` argument.
//
// Note: Starting with v0.5 this method will require a context
// and the corresponding XXXWithContext(ctx) method will be removed.
func (s *Subscription) SetTriggering(triggeringItemID uint32, add, remove []uint32) (*ua.SetTriggeringResponse, error) {
return s.SetTriggeringWithContext(context.Background(), triggeringItemID, add, remove)
}

// Note: Starting with v0.5 this method is superseded by the non 'WithContext' method.
func (s *Subscription) SetTriggeringWithContext(ctx context.Context, triggeringItemID uint32, add, remove []uint32) (*ua.SetTriggeringResponse, error) {
stats.Subscription().Add("SetTriggering", 1)

// Part 4, 5.12.5.2 SetTriggering Service Parameters
Expand All @@ -234,7 +263,7 @@ func (s *Subscription) SetTriggering(triggeringItemID uint32, add, remove []uint
}

var res *ua.SetTriggeringResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
return res, err
Expand Down Expand Up @@ -319,7 +348,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(req, func(v interface{}) error {
_ = s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
Expand All @@ -336,7 +365,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
Priority: params.Priority,
}
var res *ua.CreateSubscriptionResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -379,7 +408,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
}

var res *ua.CreateMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down

0 comments on commit 6c5af25

Please sign in to comment.