Skip to content

Commit

Permalink
Merge pull request #1796 from fcrisciani/name-resolution-race
Browse files Browse the repository at this point in the history
Service discovery hardening
  • Loading branch information
mavenugo authored Jun 12, 2017
2 parents eb57059 + d64e71e commit 86ae3cf
Show file tree
Hide file tree
Showing 12 changed files with 660 additions and 158 deletions.
125 changes: 84 additions & 41 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
return nil
}

func (ep *endpoint) addServiceInfoToCluster() error {
func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
return nil
}
Expand All @@ -593,24 +593,49 @@ func (ep *endpoint) addServiceInfoToCluster() error {
return nil
}

sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())

// Check that the endpoint is still present on the sandbox before adding it to the service discovery.
// This is to handle a race between the EnableService and the sbLeave
// It is possible that the EnableService starts, fetches the list of the endpoints and
// by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
// The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
// This check under the Service lock of the sandbox ensure the correct behavior.
// If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
// but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
// In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
// removed from the list, in this situation the delete will bail out not finding any data to cleanup
// and the add will bail out not finding the endpoint on the sandbox.
if e := sb.getEndpoint(ep.ID()); e == nil {
logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}

c := n.getController()
agent := c.getAgent()

name := ep.Name()
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}

var ingressPorts []*PortConfig
if ep.svcID != "" {
// This is a task part of a service
// Gossip ingress ports only in ingress network.
if n.ingress {
ingressPorts = ep.ingressPorts
}

if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
}

name := ep.Name()
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}

buf, err := proto.Marshal(&EndpointRecord{
Expand All @@ -634,10 +659,12 @@ func (ep *endpoint) addServiceInfoToCluster() error {
}
}

logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())

return nil
}

func (ep *endpoint) deleteServiceInfoFromCluster() error {
func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 {
return nil
}
Expand All @@ -647,17 +674,33 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
return nil
}

sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())

c := n.getController()
agent := c.getAgent()

if ep.svcID != "" && ep.Iface().Address() != nil {
var ingressPorts []*PortConfig
if n.ingress {
ingressPorts = ep.ingressPorts
}
name := ep.Name()
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}

if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
return err
if ep.Iface().Address() != nil {
if ep.svcID != "" {
// This is a task part of a service
var ingressPorts []*PortConfig
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
}
}

Expand All @@ -667,6 +710,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
}
}

logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())

return nil
}

Expand Down Expand Up @@ -814,58 +859,56 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
value = event.Value
case networkdb.UpdateEvent:
logrus.Errorf("Unexpected update service table event = %#v", event)
}

nw, err := c.NetworkByID(nid)
if err != nil {
logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
return
}
n := nw.(*network)

err = proto.Unmarshal(value, &epRec)
err := proto.Unmarshal(value, &epRec)
if err != nil {
logrus.Errorf("Failed to unmarshal service table value: %v", err)
return
}

name := epRec.Name
containerName := epRec.Name
svcName := epRec.ServiceName
svcID := epRec.ServiceID
vip := net.ParseIP(epRec.VirtualIP)
ip := net.ParseIP(epRec.EndpointIP)
ingressPorts := epRec.IngressPorts
aliases := epRec.Aliases
taskaliases := epRec.TaskAliases
serviceAliases := epRec.Aliases
taskAliases := epRec.TaskAliases

if name == "" || ip == nil {
if containerName == "" || ip == nil {
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}

if isAdd {
logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec)
if svcID != "" {
if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
// This is a remote task part of a service
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}
}

n.addSvcRecords(name, ip, nil, true)
for _, alias := range taskaliases {
n.addSvcRecords(alias, ip, nil, true)
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
}
}
} else {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec)
if svcID != "" {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}
}

n.deleteSvcRecords(name, ip, nil, true)
for _, alias := range taskaliases {
n.deleteSvcRecords(alias, ip, nil, true)
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
}
}
}
}
2 changes: 1 addition & 1 deletion agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ option (gogoproto.goproto_stringer_all) = false;
// EndpointRecord specifies all the endpoint specific information that
// needs to gossiped to nodes participating in the network.
message EndpointRecord {
// Name of the endpoint
// Name of the container
string name = 1;

// Service name of the service to which this endpoint belongs.
Expand Down
123 changes: 123 additions & 0 deletions common/setmatrix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package common

import (
"sync"

mapset "github.com/deckarep/golang-set"
)

// SetMatrix is a map of Sets
type SetMatrix interface {
// Get returns the members of the set for a specific key as a slice.
Get(key string) ([]interface{}, bool)
// Contains is used to verify is an element is in a set for a specific key
// returns true if the element is in the set
// returns true if there is a set for the key
Contains(key string, value interface{}) (bool, bool)
// Insert inserts the mapping between the IP and the endpoint identifier
// returns true if the mapping was not present, false otherwise
// returns also the number of endpoints associated to the IP
Insert(key string, value interface{}) (bool, int)
// Remove removes the mapping between the IP and the endpoint identifier
// returns true if the mapping was deleted, false otherwise
// returns also the number of endpoints associated to the IP
Remove(key string, value interface{}) (bool, int)
// Cardinality returns the number of elements in the set of a specfic key
// returns false if the key is not in the map
Cardinality(key string) (int, bool)
// String returns the string version of the set, empty otherwise
// returns false if the key is not in the map
String(key string) (string, bool)
}

type setMatrix struct {
matrix map[string]mapset.Set

sync.Mutex
}

// NewSetMatrix creates a new set matrix object
func NewSetMatrix() SetMatrix {
s := &setMatrix{}
s.init()
return s
}

func (s *setMatrix) init() {
s.matrix = make(map[string]mapset.Set)
}

func (s *setMatrix) Get(key string) ([]interface{}, bool) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return nil, ok
}
return set.ToSlice(), ok
}

func (s *setMatrix) Contains(key string, value interface{}) (bool, bool) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return false, ok
}
return set.Contains(value), ok
}

func (s *setMatrix) Insert(key string, value interface{}) (bool, int) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
s.matrix[key] = mapset.NewSet()
s.matrix[key].Add(value)
return true, 1
}

return set.Add(value), set.Cardinality()
}

func (s *setMatrix) Remove(key string, value interface{}) (bool, int) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return false, 0
}

var removed bool
if set.Contains(value) {
set.Remove(value)
removed = true
// If the set is empty remove it from the matrix
if set.Cardinality() == 0 {
delete(s.matrix, key)
}
}

return removed, set.Cardinality()
}

func (s *setMatrix) Cardinality(key string) (int, bool) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return 0, ok
}

return set.Cardinality(), ok
}

func (s *setMatrix) String(key string) (string, bool) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return "", ok
}
return set.String(), ok
}
Loading

0 comments on commit 86ae3cf

Please sign in to comment.