Skip to content

Commit

Permalink
Merge pull request #542 from northvolt/add-context-to-public
Browse files Browse the repository at this point in the history
Add context to client methods
  • Loading branch information
magiconair authored Jan 20, 2022
2 parents f9d631d + c7e5dd2 commit c753e3d
Show file tree
Hide file tree
Showing 19 changed files with 361 additions and 91 deletions.
178 changes: 151 additions & 27 deletions client.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Client) transferSubscriptions(ids []uint32) (*ua.TransferSubscriptionsR
}

// republishSubscriptions sends republish requests for the given subscription id.
func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
func (c *Client) republishSubscription(ctx context.Context, id uint32, availableSeq []uint32) error {
c.subMux.RLock()
defer c.subMux.RUnlock()

Expand All @@ -123,7 +123,7 @@ func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
}

debug.Printf("republishing subscription %d", sub.SubscriptionID)
if err := c.sendRepublishRequests(sub, availableSeq); err != nil {
if err := c.sendRepublishRequests(ctx, sub, availableSeq); err != nil {
status, ok := err.(ua.StatusCode)
if !ok {
return err
Expand All @@ -144,7 +144,7 @@ func (c *Client) republishSubscription(id uint32, availableSeq []uint32) error {
// sendRepublishRequests sends republish requests for the given subscription
// until it gets a BadMessageNotAvailable which implies that there are no
// more messages to restore.
func (c *Client) sendRepublishRequests(sub *Subscription, availableSeq []uint32) error {
func (c *Client) sendRepublishRequests(ctx context.Context, sub *Subscription, availableSeq []uint32) error {
// todo(fs): check if sub.nextSeq is in the available sequence numbers
// todo(fs): if not then we need to decide whether we fail b/c of data loss
// todo(fs): or whether we log it and continue.
Expand All @@ -170,7 +170,7 @@ func (c *Client) sendRepublishRequests(sub *Subscription, availableSeq []uint32)

debug.Printf("RepublishRequest: req=%s", debug.ToJSON(req))
var res *ua.RepublishResponse
err := c.SecureChannel().SendRequest(req, c.Session().resp.AuthenticationToken, func(v interface{}) error {
err := c.SecureChannel().SendRequestWithContext(ctx, req, c.Session().resp.AuthenticationToken, func(v interface{}) error {
return safeAssign(v, &res)
})
debug.Printf("RepublishResponse: res=%s err=%v", debug.ToJSON(res), err)
Expand Down Expand Up @@ -379,7 +379,7 @@ func (c *Client) publish(ctx context.Context) error {

// send the next publish request
// note that res contains data even if an error was returned
res, err := c.sendPublishRequest()
res, err := c.sendPublishRequest(ctx)
stats.RecordError(err)
switch {
case err == io.EOF:
Expand Down Expand Up @@ -508,7 +508,7 @@ func (c *Client) handleNotification(ctx context.Context, sub *Subscription, res
})
}

func (c *Client) sendPublishRequest() (*ua.PublishResponse, error) {
func (c *Client) sendPublishRequest(ctx context.Context) (*ua.PublishResponse, error) {
dlog := debug.NewPrefixLogger("publish: ")

c.subMux.RLock()
Expand All @@ -522,7 +522,7 @@ func (c *Client) sendPublishRequest() (*ua.PublishResponse, error) {

dlog.Printf("PublishRequest: %s", debug.ToJSON(req))
var res *ua.PublishResponse
err := c.sendWithTimeout(req, c.publishTimeout(), func(v interface{}) error {
err := c.sendWithTimeout(ctx, req, c.publishTimeout(), func(v interface{}) error {
return safeAssign(v, &res)
})
stats.RecordError(err)
Expand Down
3 changes: 2 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package opcua

import (
"context"
"testing"

"github.com/gopcua/opcua/id"
Expand All @@ -10,7 +11,7 @@ import (

func TestClient_Send_DoesNotPanicWhenDisconnected(t *testing.T) {
c := NewClient("opc.tcp://example.com:4840")
err := c.Send(&ua.ReadRequest{}, func(i interface{}) error {
err := c.SendWithContext(context.Background(), &ua.ReadRequest{}, func(i interface{}) error {
return nil
})
verify.Values(t, "", err, ua.StatusBadServerNotConnected)
Expand Down
8 changes: 4 additions & 4 deletions examples/accesslevel/accesslevel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,27 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
log.Fatal(err)
}

n := c.Node(id)
accessLevel, err := n.AccessLevel()
accessLevel, err := n.AccessLevelWithContext(ctx)
if err != nil {
log.Fatal(err)
}
log.Print("AccessLevel: ", accessLevel)

userAccessLevel, err := n.UserAccessLevel()
userAccessLevel, err := n.UserAccessLevelWithContext(ctx)
if err != nil {
log.Fatal(err)
}
log.Print("UserAccessLevel: ", userAccessLevel)

v, err := n.Value()
v, err := n.ValueWithContext(ctx)
switch {
case err != nil:
log.Fatal(err)
Expand Down
12 changes: 6 additions & 6 deletions examples/browse/browse.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ func join(a, b string) string {
return a + "." + b
}

func browse(n *opcua.Node, path string, level int) ([]NodeDef, error) {
func browse(ctx context.Context, n *opcua.Node, path string, level int) ([]NodeDef, error) {
// fmt.Printf("node:%s path:%q level:%d\n", n, path, level)
if level > 10 {
return nil, nil
}

attrs, err := n.Attributes(ua.AttributeIDNodeClass, ua.AttributeIDBrowseName, ua.AttributeIDDescription, ua.AttributeIDAccessLevel, ua.AttributeIDDataType)
attrs, err := n.AttributesWithContext(ctx, ua.AttributeIDNodeClass, ua.AttributeIDBrowseName, ua.AttributeIDDescription, ua.AttributeIDAccessLevel, ua.AttributeIDDataType)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,13 +138,13 @@ func browse(n *opcua.Node, path string, level int) ([]NodeDef, error) {
}

browseChildren := func(refType uint32) error {
refs, err := n.ReferencedNodes(refType, ua.BrowseDirectionForward, ua.NodeClassAll, true)
refs, err := n.ReferencedNodesWithContext(ctx, refType, ua.BrowseDirectionForward, ua.NodeClassAll, true)
if err != nil {
return errors.Errorf("References: %d: %s", refType, err)
}
// fmt.Printf("found %d child refs\n", len(refs))
for _, rn := range refs {
children, err := browse(rn, def.Path, level+1)
children, err := browse(ctx, rn, def.Path, level+1)
if err != nil {
return errors.Errorf("browse children: %s", err)
}
Expand Down Expand Up @@ -178,14 +178,14 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
log.Fatalf("invalid node id: %s", err)
}

nodeList, err := browse(c.Node(id), "", 0)
nodeList, err := browse(ctx, c.Node(id), "", 0)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/crypto/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

// Use our connection (read the server's time)
v, err := c.Node(ua.NewNumericNodeID(0, 2258)).Value()
Expand Down
4 changes: 2 additions & 2 deletions examples/datetime/datetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

v, err := c.Node(ua.NewNumericNodeID(0, 2258)).Value()
v, err := c.Node(ua.NewNumericNodeID(0, 2258)).ValueWithContext(ctx)
switch {
case err != nil:
log.Fatal(err)
Expand Down
6 changes: 3 additions & 3 deletions examples/history-read/history-read.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
Expand All @@ -37,7 +37,7 @@ func main() {

// HistoryRead with ContinuationPoint use
nodesToRequest := []*ua.HistoryReadValueID{
&ua.HistoryReadValueID{
{
NodeID: id,
DataEncoding: &ua.QualifiedName{},
},
Expand All @@ -54,7 +54,7 @@ func main() {
// Reset old nodes
nodesToRequest = make([]*ua.HistoryReadValueID, 0)

data, err := c.HistoryReadRawModified(nodes, &ua.ReadRawModifiedDetails{
data, err := c.HistoryReadRawModifiedWithContext(ctx, nodes, &ua.ReadRawModifiedDetails{
IsReadModified: false,
StartTime: time.Now().UTC().AddDate(0, -1, 0),
EndTime: time.Now().UTC().AddDate(0, 1, 0),
Expand Down
4 changes: 2 additions & 2 deletions examples/method/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

in := int64(12)
req := &ua.CallMethodRequest{
Expand All @@ -37,7 +37,7 @@ func main() {
InputArguments: []*ua.Variant{ua.MustVariant(in)},
}

resp, err := c.Call(req)
resp, err := c.CallWithContext(ctx, req)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
log.Fatal(err)
}

defer c.Close()
defer c.CloseSessionWithContext(ctx)

m, err := monitor.NewNodeMonitor(c)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions examples/read/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
Expand All @@ -44,7 +44,7 @@ func main() {
TimestampsToReturn: ua.TimestampsToReturnBoth,
}

resp, err := c.Read(req)
resp, err := c.ReadWithContext(ctx, req)
if err != nil {
log.Fatalf("Read failed: %s", err)
}
Expand Down
10 changes: 5 additions & 5 deletions examples/regread/regread.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
log.Fatalf("invalid node id: %v", err)
}

regResp, err := c.RegisterNodes(&ua.RegisterNodesRequest{
regResp, err := c.RegisterNodesWithContext(ctx, &ua.RegisterNodesRequest{
NodesToRegister: []*ua.NodeID{id},
})
if err != nil {
Expand All @@ -46,12 +46,12 @@ func main() {
req := &ua.ReadRequest{
MaxAge: 2000,
NodesToRead: []*ua.ReadValueID{
&ua.ReadValueID{NodeID: regResp.RegisteredNodeIDs[0]},
{NodeID: regResp.RegisteredNodeIDs[0]},
},
TimestampsToReturn: ua.TimestampsToReturnBoth,
}

resp, err := c.Read(req)
resp, err := c.ReadWithContext(ctx, req)
if err != nil {
log.Fatalf("Read failed: %s", err)
}
Expand All @@ -60,7 +60,7 @@ func main() {
}
log.Print(resp.Results[0].Value.Value())

_, err = c.UnregisterNodes(&ua.UnregisterNodesRequest{
_, err = c.UnregisterNodesWithContext(ctx, &ua.UnregisterNodesRequest{
NodesToUnregister: []*ua.NodeID{id},
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

notifyCh := make(chan *opcua.PublishNotificationData)

Expand Down
4 changes: 2 additions & 2 deletions examples/translate/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

root := c.Node(ua.NewTwoByteNodeID(id.ObjectsFolder))
nodeID, err := root.TranslateBrowsePathInNamespaceToNodeID(uint16(*ns), *nodePath)
nodeID, err := root.TranslateBrowsePathInNamespaceToNodeIDWithContext(ctx, uint16(*ns), *nodePath)
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

notifyCh := make(chan *opcua.PublishNotificationData)

Expand Down
4 changes: 2 additions & 2 deletions examples/udt/udt.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

v, err := c.Node(id).Value()
v, err := c.Node(id).ValueWithContext(ctx)
switch {
case err != nil:
log.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions examples/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func main() {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
defer c.CloseSessionWithContext(ctx)

id, err := ua.ParseNodeID(*nodeID)
if err != nil {
Expand All @@ -55,7 +55,7 @@ func main() {
},
}

resp, err := c.Write(req)
resp, err := c.WriteWithContext(ctx, req)
if err != nil {
log.Fatalf("Read failed: %s", err)
}
Expand Down
Loading

0 comments on commit c753e3d

Please sign in to comment.