Skip to content

Commit

Permalink
add context to subscription methods
Browse files Browse the repository at this point in the history
  • Loading branch information
CrowdHailer committed Jan 20, 2022
1 parent 36fac3f commit 560b53a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (c *Client) monitor(ctx context.Context) {

// try to transfer all subscriptions to the new session and
// recreate them all if that fails.
res, err := c.transferSubscriptions(subIDs)
res, err := c.transferSubscriptions(ctx, subIDs)
switch {
case err != nil:
dlog.Printf("transfer subscriptions failed. Recreating all subscriptions: %v", err)
Expand Down
9 changes: 6 additions & 3 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
// Parameters that have not been set are set to their default values.
// See opcua.DefaultSubscription* constants
func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan<- *PublishNotificationData) (*Subscription, error) {
return c.SubscribeWithContext(context.Background(), params, notifyCh)
}
func (c *Client) SubscribeWithContext(ctx context.Context, params *SubscriptionParameters, notifyCh chan<- *PublishNotificationData) (*Subscription, error) {
stats.Client().Add("Subscribe", 1)

if params == nil {
Expand All @@ -34,7 +37,7 @@ func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan<- *Publ
}

var res *ua.CreateSubscriptionResponse
err := c.Send(req, func(v interface{}) error {
err := c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -99,14 +102,14 @@ func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {

// transferSubscriptions ask the server to transfer the given subscriptions
// of the previous session to the current one.
func (c *Client) transferSubscriptions(ids []uint32) (*ua.TransferSubscriptionsResponse, error) {
func (c *Client) transferSubscriptions(ctx context.Context, ids []uint32) (*ua.TransferSubscriptionsResponse, error) {
req := &ua.TransferSubscriptionsRequest{
SubscriptionIDs: ids,
SendInitialValues: false,
}

var res *ua.TransferSubscriptionsResponse
err := c.Send(req, func(v interface{}) error {
err := c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
return res, err
Expand Down
32 changes: 22 additions & 10 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ type PublishNotificationData struct {
func (s *Subscription) Cancel(ctx context.Context) error {
stats.Subscription().Add("Cancel", 1)
s.c.forgetSubscription(ctx, s.SubscriptionID)
return s.delete()
return s.delete(ctx)
}

// delete removes the subscription from the server.
func (s *Subscription) delete() error {
func (s *Subscription) delete(ctx context.Context) error {
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}

var res *ua.DeleteSubscriptionsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -112,6 +112,9 @@ func (s *Subscription) delete() error {
}

func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
return s.MonitorWithContext(context.Background(), ts, items...)
}
func (s *Subscription) MonitorWithContext(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) {
stats.Subscription().Add("Monitor", 1)
stats.Subscription().Add("MonitoredItems", int64(len(items)))

Expand All @@ -123,7 +126,7 @@ func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredI
}

var res *ua.CreateMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -147,6 +150,9 @@ func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredI
}

func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) {
return s.UnmonitorWithContext(context.Background(), monitoredItemIDs...)
}
func (s *Subscription) UnmonitorWithContext(ctx context.Context, monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) {
stats.Subscription().Add("Unmonitor", 1)
stats.Subscription().Add("UnmonitoredItems", int64(len(monitoredItemIDs)))

Expand All @@ -156,7 +162,7 @@ func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitore
}

var res *ua.DeleteMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})

Expand All @@ -173,6 +179,9 @@ func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitore
}

func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemModifyRequest) (*ua.ModifyMonitoredItemsResponse, error) {
return s.ModifyMonitoredItemsWithContext(context.Background(), ts, items...)
}
func (s *Subscription) ModifyMonitoredItemsWithContext(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemModifyRequest) (*ua.ModifyMonitoredItemsResponse, error) {
stats.Subscription().Add("ModifyMonitoredItems", 1)
stats.Subscription().Add("ModifiedMonitoredItems", int64(len(items)))

Expand All @@ -191,7 +200,7 @@ func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*
ItemsToModify: items,
}
var res *ua.ModifyMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -223,6 +232,9 @@ func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*
// To add links from a triggering item to an item to report provide the server assigned ID(s) in the `add` argument.
// To remove links from a triggering item to an item to report provide the server assigned ID(s) in the `remove` argument.
func (s *Subscription) SetTriggering(triggeringItemID uint32, add, remove []uint32) (*ua.SetTriggeringResponse, error) {
return s.SetTriggeringWithContext(context.Background(), triggeringItemID, add, remove)
}
func (s *Subscription) SetTriggeringWithContext(ctx context.Context, triggeringItemID uint32, add, remove []uint32) (*ua.SetTriggeringResponse, error) {
stats.Subscription().Add("SetTriggering", 1)

// Part 4, 5.12.5.2 SetTriggering Service Parameters
Expand All @@ -234,7 +246,7 @@ func (s *Subscription) SetTriggering(triggeringItemID uint32, add, remove []uint
}

var res *ua.SetTriggeringResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
return res, err
Expand Down Expand Up @@ -319,7 +331,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(req, func(v interface{}) error {
_ = s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
Expand All @@ -336,7 +348,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
Priority: params.Priority,
}
var res *ua.CreateSubscriptionResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down Expand Up @@ -379,7 +391,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
}

var res *ua.CreateMonitoredItemsResponse
err := s.c.Send(req, func(v interface{}) error {
err := s.c.SendWithContext(ctx, req, func(v interface{}) error {
return safeAssign(v, &res)
})
if err != nil {
Expand Down

0 comments on commit 560b53a

Please sign in to comment.