Skip to content

Commit

Permalink
feat(CSI-334): allow using multiple mounters in parallel on same server
Browse files Browse the repository at this point in the history
For example to be able to connect to different clusters by different protocols
  • Loading branch information
sergeyberezansky committed Feb 18, 2025
1 parent fdf133b commit 36797e7
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 45 deletions.
14 changes: 9 additions & 5 deletions pkg/wekafs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type ControllerServer struct {
csi.UnimplementedControllerServer
caps []*csi.ControllerServiceCapability
nodeID string
mounter AnyMounter
mounters *MounterGroup
api *ApiStore
config *DriverConfig
semaphores map[string]*semaphore.Weighted
Expand All @@ -66,8 +66,12 @@ func (cs *ControllerServer) getConfig() *DriverConfig {
return cs.config
}

func (cs *ControllerServer) getMounter() AnyMounter {
return cs.mounter
func (cs *ControllerServer) getMounter(ctx context.Context) AnyMounter {
return cs.mounters.GetPreferredMounter(ctx)
}

func (cs *ControllerServer) getMounterByTransport(ctx context.Context, transport DataTransport) AnyMounter {
return cs.mounters.GetMounterByTransport(ctx, transport)
}

func (cs *ControllerServer) getApiStore() *ApiStore {
Expand Down Expand Up @@ -104,7 +108,7 @@ func (cs *ControllerServer) ControllerModifyVolume(context.Context, *csi.Control
panic("implement me")
}

func NewControllerServer(nodeID string, api *ApiStore, mounter AnyMounter, config *DriverConfig) *ControllerServer {
func NewControllerServer(nodeID string, api *ApiStore, mounters *MounterGroup, config *DriverConfig) *ControllerServer {
exposedCapabilities := []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
Expand All @@ -122,7 +126,7 @@ func NewControllerServer(nodeID string, api *ApiStore, mounter AnyMounter, confi
return &ControllerServer{
caps: capabilities,
nodeID: nodeID,
mounter: mounter,
mounters: mounters,
api: api,
config: config,
semaphores: make(map[string]*semaphore.Weighted),
Expand Down
6 changes: 5 additions & 1 deletion pkg/wekafs/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ const (
)

type AnyServer interface {
getMounter() AnyMounter
getMounter(ctx context.Context) AnyMounter
getMounterByTransport(ctx context.Context, transport DataTransport) AnyMounter
getApiStore() *ApiStore
getConfig() *DriverConfig
isInDevMode() bool
Expand All @@ -30,6 +31,9 @@ type AnyMounter interface {
schedulePeriodicMountGc()
getGarbageCollector() *innerPathVolGc
getTransport() DataTransport
isEnabled() bool
Enable()
Disable()
}

type nfsMountsMap map[string]int // we only follow the mountPath and number of references
Expand Down
63 changes: 63 additions & 0 deletions pkg/wekafs/mountergroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package wekafs

import (
"context"
"github.com/rs/zerolog/log"
)

type MounterGroup struct {
nfs AnyMounter
wekafs AnyMounter
}

var TransportPreference []DataTransport = []DataTransport{dataTransportWekafs, dataTransportNfs}

func NewMounterGroup(driver *WekaFsDriver) *MounterGroup {
ret := &MounterGroup{}
log.Info().Msg("Configuring Mounter Group")
ret.nfs = newNfsMounter(driver)
ret.wekafs = newWekafsMounter(driver)

if driver.config.useNfs {
log.Warn().Msg("Enforcing NFS transport due to configuration")
ret.nfs.Enable()
ret.wekafs.Disable()

} else if driver.config.allowNfsFailback {
ret.nfs.Enable()
if driver.config.isInDevMode() {
log.Info().Msg("Not Enforcing NFS transport due to dev mode")
} else {
if !isWekaInstalled() {
ret.nfs.Enable()
ret.wekafs.Disable()
log.Warn().Msg("Weka Driver not found. Failing back to NFS transport")
}
}
}
log.Info().Msg("Enforcing WekaFS transport")
return ret
}

func (mg *MounterGroup) GetMounterByTransport(ctx context.Context, transport DataTransport) AnyMounter {
logger := log.Ctx(ctx)
if transport == dataTransportNfs {
return mg.nfs
} else if transport == dataTransportWekafs {
return mg.wekafs
} else {
logger.Error().Msgf("Unknown transport type: %s", transport)
return nil
}
}

func (mg *MounterGroup) GetPreferredMounter(ctx context.Context) AnyMounter {
for _, t := range TransportPreference {
m := mg.GetMounterByTransport(ctx, t)
if m.isEnabled() {
return m
}
}
log.Ctx(ctx).Error().Msg("No enabled mounter found")
return nil
}
19 changes: 16 additions & 3 deletions pkg/wekafs/nfsmounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient"
"k8s.io/mount-utils"
"path"
"strings"
"sync"
"time"
Expand All @@ -21,25 +22,37 @@ type nfsMounter struct {
clientGroupName string
nfsProtocolVersion string
exclusiveMountOptions []mutuallyExclusiveMountOptionSet
enabled bool
}

func (m *nfsMounter) getGarbageCollector() *innerPathVolGc {
return m.gc
}

func (n *nfsMounter) isEnabled() bool {
return n.enabled
}

func (n *nfsMounter) Enable() {
n.enabled = true
}

func (n *nfsMounter) Disable() {
n.enabled = false
}

func newNfsMounter(driver *WekaFsDriver) *nfsMounter {
var selinuxSupport *bool
if driver.selinuxSupport {
log.Debug().Msg("SELinux support is forced")
selinuxSupport = &[]bool{true}[0]
}
mounter := &nfsMounter{mountMap: make(nfsMountsMap), debugPath: driver.debugPath, selinuxSupport: selinuxSupport, exclusiveMountOptions: driver.config.mutuallyExclusiveOptions}
mounter := &nfsMounter{mountMap: make(nfsMountsMap), debugPath: driver.debugPath, selinuxSupport: selinuxSupport, exclusiveMountOptions: driver.config.mutuallyExclusiveOptions, enabled: false}
mounter.gc = initInnerPathVolumeGc(mounter)
mounter.gc.config = driver.config
mounter.schedulePeriodicMountGc()
mounter.clientGroupName = driver.config.clientGroupName
mounter.nfsProtocolVersion = driver.config.nfsProtocolVersion

return mounter
}

Expand All @@ -53,7 +66,7 @@ func (m *nfsMounter) NewMount(fsName string, options MountOptions) AnyMount {
kMounter: m.kMounter,
fsName: fsName,
debugPath: m.debugPath,
mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId,
mountPoint: path.Join(MountBasePath, string(m.getTransport()), getAsciiPart(fsName, 64)+"-"+uniqueId),
mountOptions: options,
clientGroupName: m.clientGroupName,
protocolVersion: apiclient.NfsVersionString(fmt.Sprintf("V%s", m.nfsProtocolVersion)),
Expand Down
16 changes: 10 additions & 6 deletions pkg/wekafs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type NodeServer struct {
caps []*csi.NodeServiceCapability
nodeID string
maxVolumesPerNode int64
mounter AnyMounter
mounters *MounterGroup
api *ApiStore
config *DriverConfig
semaphores map[string]*semaphore.Weighted
Expand All @@ -82,8 +82,12 @@ func (ns *NodeServer) getApiStore() *ApiStore {
return ns.api
}

func (ns *NodeServer) getMounter() AnyMounter {
return ns.mounter
func (ns *NodeServer) getMounter(ctx context.Context) AnyMounter {
return ns.mounters.GetPreferredMounter(ctx)
}

func (ns *NodeServer) getMounterByTransport(ctx context.Context, transport DataTransport) AnyMounter {
return ns.mounters.GetMounterByTransport(ctx, transport)
}

//goland:noinspection GoUnusedParameter
Expand Down Expand Up @@ -189,7 +193,7 @@ func getVolumeStats(volumePath string) (volumeStats *VolumeStats, err error) {
return &VolumeStats{capacityBytes, usedBytes, availableBytes, inodes, inodesUsed, inodesFree}, nil
}

func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounter AnyMounter, config *DriverConfig) *NodeServer {
func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounters *MounterGroup, config *DriverConfig) *NodeServer {
//goland:noinspection GoBoolExpressions
return &NodeServer{
caps: getNodeServiceCapabilities(
Expand All @@ -201,7 +205,7 @@ func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounte
),
nodeID: nodeId,
maxVolumesPerNode: maxVolumesPerNode,
mounter: mounter,
mounters: mounters,
api: api,
config: config,
semaphores: make(map[string]*semaphore.Weighted),
Expand Down Expand Up @@ -530,7 +534,7 @@ func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoReque
}
// this will either overwrite or add the keys based on the driver name
segments[fmt.Sprintf(TopologyLabelNodePattern, driverName)] = ns.nodeID
segments[fmt.Sprintf(TopologyLabelTransportPattern, driverName)] = string(ns.getMounter().getTransport())
segments[fmt.Sprintf(TopologyLabelTransportPattern, driverName)] = string(ns.getMounter(ctx).getTransport()) // for backward compatibility, return the preferred transport
segments[fmt.Sprintf(TopologyLabelWekaLocalPattern, driverName)] = "true"

topology := &csi.Topology{
Expand Down
14 changes: 13 additions & 1 deletion pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/status"
timestamp "google.golang.org/protobuf/types/known/timestamppb"
"os"
"path"
"path/filepath"
"regexp"
"strings"
Expand Down Expand Up @@ -318,7 +319,7 @@ func GetMountIpFromActualMountPoint(mountPointBase string) (string, error) {
fields := strings.Fields(scanner.Text())
if len(fields) >= 3 && strings.HasPrefix(fields[1], fmt.Sprintf("%s-", mountPointBase)) {
actualMountPoint = fields[1]
return strings.TrimLeft(actualMountPoint, mountPointBase+"-"), nil
return strings.TrimLeft(actualMountPoint, string(mountPointBase+"-")), nil
}
}
return "", errors.New("mount point not found")
Expand Down Expand Up @@ -578,3 +579,14 @@ func getSelinuxStatus(ctx context.Context) bool {
}
return false
}

func getDataTransportFromMountPath(mountPoint string) DataTransport {
if strings.HasPrefix(mountPoint, path.Join(MountBasePath, string(dataTransportNfs))) {
return dataTransportWekafs
}
if strings.HasPrefix(mountPoint, path.Join(MountBasePath, string(dataTransportWekafs))) {
return dataTransportWekafs
}
// just default
return dataTransportWekafs
}
16 changes: 10 additions & 6 deletions pkg/wekafs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ func (v *Volume) updateCapacityXattr(ctx context.Context, enforceCapacity *bool,

func (v *Volume) Trash(ctx context.Context) error {
if v.requiresGc() {
return v.server.getMounter().getGarbageCollector().triggerGcVolume(ctx, v)
return v.server.getMounter(ctx).getGarbageCollector().triggerGcVolume(ctx, v)
}
return v.Delete(ctx)
}
Expand Down Expand Up @@ -841,12 +841,12 @@ func (v *Volume) MountUnderlyingFS(ctx context.Context) (error, UnmountFunc) {
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
if v.server.getMounter() == nil {
if v.server.getMounter(ctx) == nil {
return errors.New("could not mount volume, mounter not in context"), func() {}
}

mountOpts := v.getMountOptions(ctx)
mount, err, unmountFunc := v.server.getMounter().mountWithOptions(ctx, v.FilesystemName, mountOpts, v.apiClient)
mount, err, unmountFunc := v.server.getMounter(ctx).mountWithOptions(ctx, v.FilesystemName, mountOpts, v.apiClient)
retUmountFunc := func() {}
if err == nil {
v.mountPath = mount
Expand All @@ -868,12 +868,16 @@ func (v *Volume) UnmountUnderlyingFS(ctx context.Context) error {
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)

if v.server.getMounter() == nil {
if v.server.getMounter(ctx) == nil {
Die("Volume unmount could not be done since mounter not defined on it")
}

mountOpts := v.getMountOptions(ctx)
err := v.server.getMounter().unmountWithOptions(ctx, v.FilesystemName, mountOpts)
// use the correct mounter based on data transport derived from mountPath
dt := getDataTransportFromMountPath(v.mountPath)
mounter := v.server.getMounterByTransport(ctx, dt)

err := mounter.unmountWithOptions(ctx, v.FilesystemName, mountOpts)
if err == nil {
v.mountPath = ""
} else {
Expand Down Expand Up @@ -1457,7 +1461,7 @@ func (v *Volume) deleteFilesystem(ctx context.Context) error {
return nil
}
if !fsObj.IsRemoving { // if filesystem is already removing, just wait
if v.server.getMounter().getTransport() == dataTransportNfs {
if v.server.getMounter(ctx).getTransport() == dataTransportNfs {
logger.Trace().Str("filesystem", v.FilesystemName).Msg("Ensuring no NFS permissions exist that could block filesystem deletion")
err := v.apiClient.EnsureNoNfsPermissionsForFilesystem(ctx, fsObj.Name)
if err != nil {
Expand Down
26 changes: 5 additions & 21 deletions pkg/wekafs/wekafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"syscall"
)

const MountBasePath = "/run/weka-fs-mounts/"

var DefaultVolumePermissions fs.FileMode = 0750

type WekaFsDriver struct {
Expand Down Expand Up @@ -297,7 +299,7 @@ func NewWekaFsDriver(
}

func (driver *WekaFsDriver) Run(ctx context.Context) {
mounter := driver.NewMounter()
mounters := NewMounterGroup(driver)

// Create GRPC servers

Expand All @@ -308,7 +310,7 @@ func (driver *WekaFsDriver) Run(ctx context.Context) {
if driver.csiMode == CsiModeController || driver.csiMode == CsiModeAll {
log.Info().Msg("Loading ControllerServer")
// bring up controller part
driver.cs = NewControllerServer(driver.nodeID, driver.api, mounter, driver.config)
driver.cs = NewControllerServer(driver.nodeID, driver.api, mounters, driver.config)
} else {
driver.cs = &ControllerServer{}
}
Expand All @@ -319,7 +321,7 @@ func (driver *WekaFsDriver) Run(ctx context.Context) {
log.Info().Msg("Cleaning up node stale labels")
driver.CleanupNodeLabels(ctx)
log.Info().Msg("Loading NodeServer")
driver.ns = NewNodeServer(driver.nodeID, driver.maxVolumesPerNode, driver.api, mounter, driver.config)
driver.ns = NewNodeServer(driver.nodeID, driver.maxVolumesPerNode, driver.api, mounters, driver.config)
} else {
driver.ns = &NodeServer{}
}
Expand Down Expand Up @@ -421,21 +423,3 @@ func GetCsiPluginMode(mode *string) CsiPluginMode {
return ""
}
}

func (driver *WekaFsDriver) NewMounter() AnyMounter {
log.Info().Msg("Configuring Mounter")
if driver.config.useNfs {
log.Warn().Msg("Enforcing NFS transport due to configuration")
return newNfsMounter(driver)
}
if driver.config.allowNfsFailback && !isWekaInstalled() {
if driver.config.isInDevMode() {
log.Info().Msg("Not Enforcing NFS transport due to dev mode")
} else {
log.Warn().Msg("Weka Driver not found. Failing back to NFS transport")
return newNfsMounter(driver)
}
}
log.Info().Msg("Enforcing WekaFS transport")
return newWekafsMounter(driver)
}
Loading

0 comments on commit 36797e7

Please sign in to comment.