Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context to client methods #542

Merged
merged 7 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 151 additions & 27 deletions client.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Client) transferSubscriptions(ids []uint32) (*ua.TransferSubscriptionsR
}

// republishSubscriptions sends republish requests for the given subscription id.
func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
func (c *Client) republishSubscription(ctx context.Context, id uint32, availableSeq []uint32) error {
c.subMux.RLock()
defer c.subMux.RUnlock()

Expand All @@ -123,7 +123,7 @@ func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
}

debug.Printf("republishing subscription %d", sub.SubscriptionID)
if err := c.sendRepublishRequests(sub, availableSeq); err != nil {
if err := c.sendRepublishRequests(ctx, sub, availableSeq); err != nil {
status, ok := err.(ua.StatusCode)
if !ok {
return err
Expand All @@ -144,7 +144,7 @@ func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
// sendRepublishRequests sends republish requests for the given subscription
// until it gets a BadMessageNotAvailable which implies that there are no
// more messages to restore.
func (c *Client) sendRepublishRequests(sub *Subscription, availableSeq []uint32) error {
func (c *Client) sendRepublishRequests(ctx context.Context, sub *Subscription, availableSeq []uint32) error {
// todo(fs): check if sub.nextSeq is in the available sequence numbers
// todo(fs): if not then we need to decide whether we fail b/c of data loss
// todo(fs): or whether we log it and continue.
Expand All @@ -170,7 +170,7 @@ func (c *Client) sendRepublishRequests(sub *Subscription, availableSeq []uint32)

debug.Printf("RepublishRequest: req=%s", debug.ToJSON(req))
var res *ua.RepublishResponse
err := c.SecureChannel().SendRequest(req, c.Session().resp.AuthenticationToken, func(v interface{}) error {
err := c.SecureChannel().SendRequestWithContext(ctx, req, c.Session().resp.AuthenticationToken, func(v interface{}) error {
return safeAssign(v, &res)
})
debug.Printf("RepublishResponse: res=%s err=%v", debug.ToJSON(res), err)
Expand Down Expand Up @@ -379,7 +379,7 @@ func (c *Client) publish(ctx context.Context) error {

// send the next publish request
// note that res contains data even if an error was returned
res, err := c.sendPublishRequest()
res, err := c.sendPublishRequest(ctx)
stats.RecordError(err)
switch {
case err == io.EOF:
Expand Down Expand Up @@ -508,7 +508,7 @@ func (c *Client) handleNotification(ctx context.Context, sub *Subscription, res
})
}

func (c *Client) sendPublishRequest() (*ua.PublishResponse, error) {
func (c *Client) sendPublishRequest(ctx context.Context) (*ua.PublishResponse, error) {
dlog := debug.NewPrefixLogger("publish: ")

c.subMux.RLock()
Expand All @@ -522,7 +522,7 @@ func (c *Client) sendPublishRequest() (*ua.PublishResponse, error) {

dlog.Printf("PublishRequest: %s", debug.ToJSON(req))
var res *ua.PublishResponse
err := c.sendWithTimeout(req, c.publishTimeout(), func(v interface{}) error {
err := c.sendWithTimeout(ctx, req, c.publishTimeout(), func(v interface{}) error {
return safeAssign(v, &res)
})
stats.RecordError(err)
Expand Down
3 changes: 2 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package opcua

import (
"context"
"testing"

"github.com/gopcua/opcua/id"
Expand All @@ -10,7 +11,7 @@ import (

func TestClient_Send_DoesNotPanicWhenDisconnected(t *testing.T) {
c := NewClient("opc.tcp://example.com:4840")
err := c.Send(&ua.ReadRequest{}, func(i interface{}) error {
err := c.SendWithContext(context.Background(), &ua.ReadRequest{}, func(i interface{}) error {
return nil
})
verify.Values(t, "", err, ua.StatusBadServerNotConnected)
Expand Down
8 changes: 4 additions & 4 deletions examples/accesslevel/accesslevel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,27 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
log.Fatal(err)
}

n := c.Node(id)
accessLevel, err := n.AccessLevel()
accessLevel, err := n.AccessLevelWithContext(ctx)
if err != nil {
log.Fatal(err)
}
log.Print("AccessLevel: ", accessLevel)

userAccessLevel, err := n.UserAccessLevel()
userAccessLevel, err := n.UserAccessLevelWithContext(ctx)
if err != nil {
log.Fatal(err)
}
log.Print("UserAccessLevel: ", userAccessLevel)

v, err := n.Value()
v, err := n.ValueWithContext(ctx)
switch {
case err != nil:
log.Fatal(err)
Expand Down
12 changes: 6 additions & 6 deletions examples/browse/browse.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ func join(a, b string) string {
return a + "." + b
}

func browse(n *opcua.Node, path string, level int) ([]NodeDef, error) {
func browse(ctx context.Context, n *opcua.Node, path string, level int) ([]NodeDef, error) {
// fmt.Printf("node:%s path:%q level:%d\n", n, path, level)
if level > 10 {
return nil, nil
}

attrs, err := n.Attributes(ua.AttributeIDNodeClass, ua.AttributeIDBrowseName, ua.AttributeIDDescription, ua.AttributeIDAccessLevel, ua.AttributeIDDataType)
attrs, err := n.AttributesWithContext(ctx, ua.AttributeIDNodeClass, ua.AttributeIDBrowseName, ua.AttributeIDDescription, ua.AttributeIDAccessLevel, ua.AttributeIDDataType)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,13 +138,13 @@ func browse(n *opcua.Node, path string, level int) ([]NodeDef, error) {
}

browseChildren := func(refType uint32) error {
refs, err := n.ReferencedNodes(refType, ua.BrowseDirectionForward, ua.NodeClassAll, true)
refs, err := n.ReferencedNodesWithContext(ctx, refType, ua.BrowseDirectionForward, ua.NodeClassAll, true)
if err != nil {
return errors.Errorf("References: %d: %s", refType, err)
}
// fmt.Printf("found %d child refs\n", len(refs))
for _, rn := range refs {
children, err := browse(rn, def.Path, level+1)
children, err := browse(ctx, rn, def.Path, level+1)
if err != nil {
return errors.Errorf("browse children: %s", err)
}
Expand Down Expand Up @@ -178,14 +178,14 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
log.Fatalf("invalid node id: %s", err)
}

nodeList, err := browse(c.Node(id), "", 0)
nodeList, err := browse(ctx, c.Node(id), "", 0)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/crypto/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

// Use our connection (read the server's time)
v, err := c.Node(ua.NewNumericNodeID(0, 2258)).Value()
Expand Down
4 changes: 2 additions & 2 deletions examples/datetime/datetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

v, err := c.Node(ua.NewNumericNodeID(0, 2258)).Value()
v, err := c.Node(ua.NewNumericNodeID(0, 2258)).ValueWithContext(ctx)
switch {
case err != nil:
log.Fatal(err)
Expand Down
6 changes: 3 additions & 3 deletions examples/history-read/history-read.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
Expand All @@ -37,7 +37,7 @@ func main() {

// HistoryRead with ContinuationPoint use
nodesToRequest := []*ua.HistoryReadValueID{
&ua.HistoryReadValueID{
{
NodeID: id,
DataEncoding: &ua.QualifiedName{},
},
Expand All @@ -54,7 +54,7 @@ func main() {
// Reset old nodes
nodesToRequest = make([]*ua.HistoryReadValueID, 0)

data, err := c.HistoryReadRawModified(nodes, &ua.ReadRawModifiedDetails{
data, err := c.HistoryReadRawModifiedWithContext(ctx, nodes, &ua.ReadRawModifiedDetails{
IsReadModified: false,
StartTime: time.Now().UTC().AddDate(0, -1, 0),
EndTime: time.Now().UTC().AddDate(0, 1, 0),
Expand Down
4 changes: 2 additions & 2 deletions examples/method/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

in := int64(12)
req := &ua.CallMethodRequest{
Expand All @@ -37,7 +37,7 @@ func main() {
InputArguments: []*ua.Variant{ua.MustVariant(in)},
}

resp, err := c.Call(req)
resp, err := c.CallWithContext(ctx, req)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
log.Fatal(err)
}

defer c.Close()
defer c.CloseSessionWithContext(ctx)

m, err := monitor.NewNodeMonitor(c)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions examples/read/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
Expand All @@ -44,7 +44,7 @@ func main() {
TimestampsToReturn: ua.TimestampsToReturnBoth,
}

resp, err := c.Read(req)
resp, err := c.ReadWithContext(ctx, req)
if err != nil {
log.Fatalf("Read failed: %s", err)
}
Expand Down
10 changes: 5 additions & 5 deletions examples/regread/regread.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
log.Fatalf("invalid node id: %v", err)
}

regResp, err := c.RegisterNodes(&ua.RegisterNodesRequest{
regResp, err := c.RegisterNodesWithContext(ctx, &ua.RegisterNodesRequest{
NodesToRegister: []*ua.NodeID{id},
})
if err != nil {
Expand All @@ -46,12 +46,12 @@ func main() {
req := &ua.ReadRequest{
MaxAge: 2000,
NodesToRead: []*ua.ReadValueID{
&ua.ReadValueID{NodeID: regResp.RegisteredNodeIDs[0]},
{NodeID: regResp.RegisteredNodeIDs[0]},
},
TimestampsToReturn: ua.TimestampsToReturnBoth,
}

resp, err := c.Read(req)
resp, err := c.ReadWithContext(ctx, req)
if err != nil {
log.Fatalf("Read failed: %s", err)
}
Expand All @@ -60,7 +60,7 @@ func main() {
}
log.Print(resp.Results[0].Value.Value())

_, err = c.UnregisterNodes(&ua.UnregisterNodesRequest{
_, err = c.UnregisterNodesWithContext(ctx, &ua.UnregisterNodesRequest{
NodesToUnregister: []*ua.NodeID{id},
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

notifyCh := make(chan *opcua.PublishNotificationData)

Expand Down
4 changes: 2 additions & 2 deletions examples/translate/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

root := c.Node(ua.NewTwoByteNodeID(id.ObjectsFolder))
nodeID, err := root.TranslateBrowsePathInNamespaceToNodeID(uint16(*ns), *nodePath)
nodeID, err := root.TranslateBrowsePathInNamespaceToNodeIDWithContext(ctx, uint16(*ns), *nodePath)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

notifyCh := make(chan *opcua.PublishNotificationData)

Expand Down
4 changes: 2 additions & 2 deletions examples/udt/udt.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

v, err := c.Node(id).Value()
v, err := c.Node(id).ValueWithContext(ctx)
switch {
case err != nil:
log.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions examples/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
Expand All @@ -55,7 +55,7 @@ func main() {
},
}

resp, err := c.Write(req)
resp, err := c.WriteWithContext(ctx, req)
if err != nil {
log.Fatalf("Read failed: %s", err)
}
Expand Down
Loading