Skip to content

Commit

Permalink
Refactor locking for join/leave to avoid race
Browse files Browse the repository at this point in the history
Instead of using "sync.Once" to determine whether to initialize a
network sandbox or subnet sandbox, we use a traditional mutex +
initialization boolean.  This is because the initialization state isn't
truly a once-and-done condition.  Rather, libnetwork destroys network
and subnet sandboxes when the last endpoint leaves them.  The use of
sync.Once in this kind of scenario requires, therefore, re-initializing
the Once which is impoissible.  So the approach that libnetwork
currently takes is to use a pointer to a Once and redirect that pointer
to a new Once on reset.  This leads to nasty race conditions.

In addition to refactoring the locking, this patch merges the functions
joinSandbox(), and joinSubnetSandbox(). This makes the code both cleaner
and it also holds the network and subnet locks through the series of
read-modify-writes avoiding further potential races.  This does reduce
the potential parallelism which could be applied should there be many
joins coming in on many different subnets in the same overlay network.
However, this should be an extremely minor performance hit for a very
obscure case.

One special case that requires attention is the fact that the join
operation notifies the PeerDB goroutine via a channel of the join.  This
can deadlock if the PeerDB is blocked accessing network state because
the join has it locked while the join is blocked sending to the PeerDB
channel.  This is handled by spawning a separate goroutine to post the
notification from the join.

Signed-off-by: Chris Telfer <ctelfer@docker.com>
  • Loading branch information
ctelfer committed May 16, 2018
1 parent 7d04e03 commit 26212fe
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 99 deletions.
10 changes: 1 addition & 9 deletions drivers/overlay/joinleave.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,10 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
return fmt.Errorf("couldn't get vxlan id for %q: %v", s.subnetIP.String(), err)
}

if err := n.joinSandbox(false); err != nil {
if err := n.joinSandbox(s, false, true); err != nil {
return fmt.Errorf("network sandbox join failed: %v", err)
}

if err := n.joinSubnetSandbox(s, false); err != nil {
return fmt.Errorf("subnet sandbox join failed for %q: %v", s.subnetIP.String(), err)
}

// joinSubnetSandbox gets called when an endpoint comes up on a new subnet in the
// overlay network. Hence the Endpoint count should be updated outside joinSubnetSandbox
n.incEndpointCount()

sbox := n.sandbox()

overlayIfName, containerIfName, err := createVethPair()
Expand Down
134 changes: 62 additions & 72 deletions drivers/overlay/ov_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
type networkTable map[string]*network

type subnet struct {
once *sync.Once
sboxInit bool
vxlanName string
brName string
vni uint32
Expand All @@ -63,7 +63,7 @@ type network struct {
endpoints endpointTable
driver *driver
joinCnt int
once *sync.Once
sboxInit bool
initEpoch int
initErr error
subnets []*subnet
Expand Down Expand Up @@ -150,7 +150,6 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
id: id,
driver: d,
endpoints: endpointTable{},
once: &sync.Once{},
subnets: []*subnet{},
}

Expand Down Expand Up @@ -193,7 +192,6 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d
s := &subnet{
subnetIP: ipd.Pool,
gwIP: ipd.Gateway,
once: &sync.Once{},
}

if len(vnis) != 0 {
Expand Down Expand Up @@ -296,29 +294,41 @@ func (d *driver) RevokeExternalConnectivity(nid, eid string) error {
return nil
}

func (n *network) incEndpointCount() {
func (n *network) joinSandbox(s *subnet, restore bool, incJoinCount bool) error {
// If there is a race between two go routines here only one will win
// the other will wait.
networkOnce.Do(networkOnceInit)

n.Lock()
defer n.Unlock()
n.joinCnt++
}

func (n *network) joinSandbox(restore bool) error {
// If there is a race between two go routines here only one will win
// the other will wait.
n.once.Do(func() {
// save the error status of initSandbox in n.initErr so that
// all the racing go routines are able to know the status.
if !n.sboxInit {
n.initErr = n.initSandbox(restore)
})
n.sboxInit = true // we cannot recover from this error
}

return n.initErr
}
if n.initErr != nil {
return fmt.Errorf("network sandbox join failed: %v", n.initErr)
}

func (n *network) joinSubnetSandbox(s *subnet, restore bool) error {
s.once.Do(func() {
s.initErr = n.initSubnetSandbox(s, restore)
})
return s.initErr
subnetErr := s.initErr
if !s.sboxInit {
subnetErr = n.initSubnetSandbox(s, restore)
// We can recover from these errors, but not on restore
if restore || subnetErr == nil {
s.initErr = subnetErr
s.sboxInit = true
}
}
if subnetErr != nil {
return fmt.Errorf("subnet sandbox join failed for %q: %v", s.subnetIP.String(), subnetErr)
}

if incJoinCount {
n.joinCnt++
}

return nil
}

func (n *network) leaveSandbox() {
Expand All @@ -329,15 +339,14 @@ func (n *network) leaveSandbox() {
return
}

// We are about to destroy sandbox since the container is leaving the network
// Reinitialize the once variable so that we will be able to trigger one time
// sandbox initialization(again) when another container joins subsequently.
n.once = &sync.Once{}
n.destroySandbox()

n.sboxInit = false
n.initErr = nil
for _, s := range n.subnets {
s.once = &sync.Once{}
s.sboxInit = false
s.initErr = nil
}

n.destroySandbox()
}

// to be called while holding network lock
Expand Down Expand Up @@ -470,7 +479,7 @@ func (n *network) generateVxlanName(s *subnet) string {
id = n.id[:5]
}

return "vx-" + fmt.Sprintf("%06x", n.vxlanID(s)) + "-" + id
return fmt.Sprintf("vx-%06x-%v", s.vni, id)
}

func (n *network) generateBridgeName(s *subnet) string {
Expand All @@ -483,7 +492,7 @@ func (n *network) generateBridgeName(s *subnet) string {
}

func (n *network) getBridgeNamePrefix(s *subnet) string {
return "ov-" + fmt.Sprintf("%06x", n.vxlanID(s))
return fmt.Sprintf("ov-%06x", s.vni)
}

func checkOverlap(nw *net.IPNet) error {
Expand All @@ -505,7 +514,7 @@ func checkOverlap(nw *net.IPNet) error {
}

func (n *network) restoreSubnetSandbox(s *subnet, brName, vxlanName string) error {
sbox := n.sandbox()
sbox := n.sbox

// restore overlay osl sandbox
Ifaces := make(map[string][]osl.IfaceOption)
Expand Down Expand Up @@ -534,7 +543,7 @@ func (n *network) setupSubnetSandbox(s *subnet, brName, vxlanName string) error
deleteInterfaceBySubnet(n.getBridgeNamePrefix(s), s)
}
// Try to delete the vxlan interface by vni if already present
deleteVxlanByVNI("", n.vxlanID(s))
deleteVxlanByVNI("", s.vni)

if err := checkOverlap(s.subnetIP); err != nil {
return err
Expand All @@ -548,32 +557,32 @@ func (n *network) setupSubnetSandbox(s *subnet, brName, vxlanName string) error
// it must a stale namespace from previous
// life. Destroy it completely and reclaim resourced.
networkMu.Lock()
path, ok := vniTbl[n.vxlanID(s)]
path, ok := vniTbl[s.vni]
networkMu.Unlock()

if ok {
deleteVxlanByVNI(path, n.vxlanID(s))
deleteVxlanByVNI(path, s.vni)
if err := syscall.Unmount(path, syscall.MNT_FORCE); err != nil {
logrus.Errorf("unmount of %s failed: %v", path, err)
}
os.Remove(path)

networkMu.Lock()
delete(vniTbl, n.vxlanID(s))
delete(vniTbl, s.vni)
networkMu.Unlock()
}
}

// create a bridge and vxlan device for this subnet and move it to the sandbox
sbox := n.sandbox()
sbox := n.sbox

if err := sbox.AddInterface(brName, "br",
sbox.InterfaceOptions().Address(s.gwIP),
sbox.InterfaceOptions().Bridge(true)); err != nil {
return fmt.Errorf("bridge creation in sandbox failed for subnet %q: %v", s.subnetIP.String(), err)
}

err := createVxlan(vxlanName, n.vxlanID(s), n.maxMTU())
err := createVxlan(vxlanName, s.vni, n.maxMTU())
if err != nil {
return err
}
Expand Down Expand Up @@ -638,17 +647,12 @@ func (n *network) initSubnetSandbox(s *subnet, restore bool) error {
}
} else {
if err := n.setupSubnetSandbox(s, brName, vxlanName); err != nil {
// The error in setupSubnetSandbox could be a temporary glitch. reset the
// subnet once object to allow the setup to be retried on another endpoint join.
s.once = &sync.Once{}
return err
}
}

n.Lock()
s.vxlanName = vxlanName
s.brName = brName
n.Unlock()

return nil
}
Expand Down Expand Up @@ -689,11 +693,7 @@ func (n *network) cleanupStaleSandboxes() {
}

func (n *network) initSandbox(restore bool) error {
n.Lock()
n.initEpoch++
n.Unlock()

networkOnce.Do(networkOnceInit)

if !restore {
if hostMode {
Expand Down Expand Up @@ -723,11 +723,11 @@ func (n *network) initSandbox(restore bool) error {
}

// this is needed to let the peerAdd configure the sandbox
n.setSandbox(sbox)
n.sbox = sbox

if !restore {
// Initialize the sandbox with all the peers previously received from networkdb
n.driver.initSandboxPeerDB(n.id)
go n.driver.initSandboxPeerDB(n.id)
}

// If we are in swarm mode, we don't need anymore the watchMiss routine.
Expand All @@ -746,7 +746,7 @@ func (n *network) initSandbox(restore bool) error {
tv := syscall.NsecToTimeval(soTimeout.Nanoseconds())
err = nlSock.SetReceiveTimeout(&tv)
})
n.setNetlinkSocket(nlSock)
n.nlSocket = nlSock

if err == nil {
go n.watchMiss(nlSock, key)
Expand Down Expand Up @@ -848,7 +848,6 @@ func (d *driver) restoreNetworkFromStore(nid string) *network {
if n != nil {
n.driver = d
n.endpoints = endpointTable{}
n.once = &sync.Once{}
d.networks[nid] = n
}
return n
Expand Down Expand Up @@ -881,26 +880,12 @@ func (d *driver) getNetworkFromStore(nid string) *network {
func (n *network) sandbox() osl.Sandbox {
n.Lock()
defer n.Unlock()

return n.sbox
}

func (n *network) setSandbox(sbox osl.Sandbox) {
n.Lock()
n.sbox = sbox
n.Unlock()
}

func (n *network) setNetlinkSocket(nlSk *nl.NetlinkSocket) {
n.Lock()
n.nlSocket = nlSk
n.Unlock()
}

func (n *network) vxlanID(s *subnet) uint32 {
n.Lock()
defer n.Unlock()

return s.vni
}

Expand Down Expand Up @@ -1009,7 +994,6 @@ func (n *network) SetValue(value []byte) error {
subnetIP: subnetIP,
gwIP: gwIP,
vni: vni,
once: &sync.Once{},
}
n.subnets = append(n.subnets, s)
} else {
Expand All @@ -1035,7 +1019,10 @@ func (n *network) writeToStore() error {
}

func (n *network) releaseVxlanID() ([]uint32, error) {
if len(n.subnets) == 0 {
n.Lock()
nSubnets := len(n.subnets)
n.Unlock()
if nSubnets == 0 {
return nil, nil
}

Expand All @@ -1051,22 +1038,25 @@ func (n *network) releaseVxlanID() ([]uint32, error) {
}
}
var vnis []uint32
n.Lock()
for _, s := range n.subnets {
if n.driver.vxlanIdm != nil {
vni := n.vxlanID(s)
vnis = append(vnis, vni)
n.driver.vxlanIdm.Release(uint64(vni))
vnis = append(vnis, s.vni)
}
s.vni = 0
}
n.Unlock()

n.setVxlanID(s, 0)
for _, vni := range vnis {
n.driver.vxlanIdm.Release(uint64(vni))
}

return vnis, nil
}

func (n *network) obtainVxlanID(s *subnet) error {
//return if the subnet already has a vxlan id assigned
if s.vni != 0 {
if n.vxlanID(s) != 0 {
return nil
}

Expand All @@ -1079,7 +1069,7 @@ func (n *network) obtainVxlanID(s *subnet) error {
return fmt.Errorf("getting network %q from datastore failed %v", n.id, err)
}

if s.vni == 0 {
if n.vxlanID(s) == 0 {
vxlanID, err := n.driver.vxlanIdm.GetID(true)
if err != nil {
return fmt.Errorf("failed to allocate vxlan id: %v", err)
Expand Down
Loading

0 comments on commit 26212fe

Please sign in to comment.