Skip to content

Commit

Permalink
add context to sendAsyncWithTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
CrowdHailer committed Jan 7, 2022
1 parent 2486dac commit ca2228a
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions uasc/secure_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ func (s *SecureChannel) Open(ctx context.Context) error {
}

func (s *SecureChannel) open(ctx context.Context, instance *channelInstance, requestType ua.SecurityTokenRequestType) error {
// TODO: do something with the context
defer s.rcvLocker.unlock()

s.openingMu.Lock()
Expand Down Expand Up @@ -539,7 +538,7 @@ func (s *SecureChannel) open(ctx context.Context, instance *channelInstance, req
RequestedLifetime: s.cfg.Lifetime,
}

return s.sendRequestWithTimeout(req, reqID, s.openingInstance, nil, s.cfg.RequestTimeout, func(v interface{}) error {
return s.sendRequestWithTimeout(ctx, req, reqID, s.openingInstance, nil, s.cfg.RequestTimeout, func(v interface{}) error {
resp, ok := v.(*ua.OpenSecureChannelResponse)
if !ok {
return errors.Errorf("got %T, want OpenSecureChannelResponse", v)
Expand Down Expand Up @@ -656,6 +655,7 @@ func (s *SecureChannel) scheduleExpiration(instance *channelInstance) {
}

func (s *SecureChannel) sendRequestWithTimeout(
ctx context.Context,
req ua.Request,
reqID uint32,
instance *channelInstance,
Expand All @@ -666,7 +666,7 @@ func (s *SecureChannel) sendRequestWithTimeout(
s.pendingReq.Add(1)
respRequired := h != nil

ch, err := s.sendAsyncWithTimeout(req, reqID, instance, authToken, respRequired, timeout)
ch, err := s.sendAsyncWithTimeout(ctx, req, reqID, instance, authToken, respRequired, timeout)
s.pendingReq.Done()
if err != nil {
return err
Expand All @@ -681,6 +681,9 @@ func (s *SecureChannel) sendRequestWithTimeout(
defer timer.Stop()

select {
case <-ctx.Done():
s.popHandler(reqID)
return ctx.Err()
case <-s.disconnected:
s.popHandler(reqID)
return io.EOF
Expand Down Expand Up @@ -724,16 +727,21 @@ func (s *SecureChannel) SendRequest(req ua.Request, authToken *ua.NodeID, h func
}

func (s *SecureChannel) SendRequestWithTimeout(req ua.Request, authToken *ua.NodeID, timeout time.Duration, h func(interface{}) error) error {
return s.SendRequestWithTimeoutWithContext(context.Background(), req, authToken, timeout, h)
}

func (s *SecureChannel) SendRequestWithTimeoutWithContext(ctx context.Context, req ua.Request, authToken *ua.NodeID, timeout time.Duration, h func(interface{}) error) error {
s.reqLocker.waitIfLock()
active, err := s.getActiveChannelInstance()
if err != nil {
return err
}

return s.sendRequestWithTimeout(req, s.nextRequestID(), active, authToken, timeout, h)
return s.sendRequestWithTimeout(ctx, req, s.nextRequestID(), active, authToken, timeout, h)
}

func (s *SecureChannel) sendAsyncWithTimeout(
ctx context.Context,
req ua.Request,
reqID uint32,
instance *channelInstance,
Expand Down Expand Up @@ -773,6 +781,11 @@ func (s *SecureChannel) sendAsyncWithTimeout(
}

for i, chunk := range chunks {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
if i > 0 { // fix sequence number on subsequent chunks
number := instance.nextSequenceNumber()
binary.LittleEndian.PutUint32(chunk[16:], uint32(number))
Expand Down

0 comments on commit ca2228a

Please sign in to comment.