Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(crypto): v2 volume encryption #3438

Merged
merged 3 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions controller/share_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,7 @@ func (c *ShareManagerController) createShareManagerPod(sm *longhorn.ShareManager
string(secret.Data[types.CryptoPBKDF]))
}

manifest := c.createPodManifest(sm, annotations, tolerations, affinity, imagePullPolicy, nil, registrySecret,
manifest := c.createPodManifest(sm, volume.Spec.DataEngine, annotations, tolerations, affinity, imagePullPolicy, nil, registrySecret,
priorityClass, nodeSelector, fsType, mountOptions, cryptoKey, cryptoParams, nfsConfig)

storageNetwork, err := c.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
Expand Down Expand Up @@ -1411,13 +1411,13 @@ func (c *ShareManagerController) createLeaseManifest(sm *longhorn.ShareManager)
return lease
}

func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, annotations map[string]string, tolerations []corev1.Toleration,
func (c *ShareManagerController) createPodManifest(sm *longhorn.ShareManager, dataEngine longhorn.DataEngineType, annotations map[string]string, tolerations []corev1.Toleration,
affinity *corev1.Affinity, pullPolicy corev1.PullPolicy, resourceReq *corev1.ResourceRequirements, registrySecret, priorityClass string,
nodeSelector map[string]string, fsType string, mountOptions []string, cryptoKey string, cryptoParams *crypto.EncryptParams,
nfsConfig *nfsServerConfig) *corev1.Pod {

// command args for the share-manager
args := []string{"--debug", "daemon", "--volume", sm.Name}
args := []string{"--debug", "daemon", "--volume", sm.Name, "--data-engine", string(dataEngine)}

if len(fsType) > 0 {
args = append(args, "--fs", fsType)
Expand Down
47 changes: 34 additions & 13 deletions csi/crypto/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/longhorn/longhorn-manager/types"

lhns "github.com/longhorn/go-common-libs/ns"
lhtypes "github.com/longhorn/go-common-libs/types"
longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

const (
mapperFilePathPrefix = "/dev/mapper"
mapperV2VolumeSuffix = "-encrypted"

CryptoKeyDefaultCipher = "aes-xts-plain64"
CryptoKeyDefaultHash = "sha256"
Expand Down Expand Up @@ -69,7 +73,13 @@ func (cp *EncryptParams) GetPBKDF() string {
}

// VolumeMapper returns the path for mapped encrypted device.
func VolumeMapper(volume string) string {
func VolumeMapper(volume, dataEngine string) string {
if types.IsDataEngineV2(longhorn.DataEngineType(dataEngine)) {
// v2 volume will use a dm device as default to control IO path when attaching.
// This dm device will be created with the same name as the volume name.
// The encrypted volume will be created with the volume name with "-encrypted" suffix to resolve the naming conflict.
return path.Join(mapperFilePathPrefix, getEncryptVolumeName(volume, dataEngine))
}
return path.Join(mapperFilePathPrefix, volume)
}

Expand All @@ -95,9 +105,10 @@ func EncryptVolume(devicePath, passphrase string, cryptoParams *EncryptParams) e
}

// OpenVolume opens volume so that it can be used by the client.
func OpenVolume(volume, devicePath, passphrase string) error {
if isOpen, _ := IsDeviceOpen(VolumeMapper(volume)); isOpen {
logrus.Infof("Device %s is already opened at %s", devicePath, VolumeMapper(volume))
// devicePath is the path of the volume on the host that will be opened, for instance '/dev/longhorn/volume1'.
func OpenVolume(volume, dataEngine, devicePath, passphrase string) error {
if isOpen, _ := IsDeviceOpen(VolumeMapper(volume, dataEngine)); isOpen {
logrus.Infof("Device %s is already opened at %s", devicePath, VolumeMapper(volume, dataEngine))
return nil
}

Expand All @@ -107,29 +118,38 @@ func OpenVolume(volume, devicePath, passphrase string) error {
return err
}

logrus.Infof("Opening device %s with LUKS on %s", devicePath, volume)
_, err = nsexec.LuksOpen(volume, devicePath, passphrase, lhtypes.LuksTimeout)
encryptVolumeName := getEncryptVolumeName(volume, dataEngine)
logrus.Infof("Opening device %s with LUKS on %s", devicePath, encryptVolumeName)
_, err = nsexec.LuksOpen(encryptVolumeName, devicePath, passphrase, lhtypes.LuksTimeout)
if err != nil {
logrus.WithError(err).Warnf("Failed to open LUKS device %s", devicePath)
logrus.WithError(err).Warnf("Failed to open LUKS device %s to %s", devicePath, encryptVolumeName)
}
return err
}

// getEncryptVolumeName returns the device-mapper name to use for the LUKS
// mapping of the given volume.
//
// For a v2 data engine volume the "-encrypted" suffix is appended, because
// the v2 attach flow already creates a dm device named exactly after the
// volume; for any other engine the volume name is used as-is.
func getEncryptVolumeName(volume, dataEngine string) string {
	if types.IsDataEngineV2(longhorn.DataEngineType(dataEngine)) {
		return volume + mapperV2VolumeSuffix
	}
	return volume
}

// CloseVolume closes encrypted volume so it can be detached.
func CloseVolume(volume string) error {
// CloseVolume closes the encrypted (LUKS) device of the given volume so the
// underlying volume can be detached. dataEngine selects the mapping name:
// v2 volumes use the "-encrypted" suffix (see getEncryptVolumeName).
func CloseVolume(volume, dataEngine string) error {
	// Run luksClose inside the host's mnt and ipc namespaces (via the host
	// proc directory) so it acts on the host's device-mapper state rather
	// than this container's.
	namespaces := []lhtypes.Namespace{lhtypes.NamespaceMnt, lhtypes.NamespaceIpc}
	nsexec, err := lhns.NewNamespaceExecutor(lhtypes.ProcessNone, lhtypes.HostProcDirectory, namespaces)
	if err != nil {
		return err
	}

	encryptVolumeName := getEncryptVolumeName(volume, dataEngine)
	logrus.Infof("Closing LUKS device %s", encryptVolumeName)
	_, err = nsexec.LuksClose(encryptVolumeName, lhtypes.LuksTimeout)
	return err
}

func ResizeEncryptoDevice(volume, passphrase string) error {
if isOpen, err := IsDeviceOpen(VolumeMapper(volume)); err != nil {
func ResizeEncryptoDevice(volume, dataEngine, passphrase string) error {
if isOpen, err := IsDeviceOpen(VolumeMapper(volume, dataEngine)); err != nil {
return err
} else if !isOpen {
return fmt.Errorf("volume %v encrypto device is closed for resizing", volume)
Expand All @@ -141,7 +161,8 @@ func ResizeEncryptoDevice(volume, passphrase string) error {
return err
}

_, err = nsexec.LuksResize(volume, passphrase, lhtypes.LuksTimeout)
encryptVolumeName := getEncryptVolumeName(volume, dataEngine)
_, err = nsexec.LuksResize(encryptVolumeName, passphrase, lhtypes.LuksTimeout)
return err
}

Expand Down
39 changes: 21 additions & 18 deletions csi/node_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Errorf(codes.Internal, "failed to evaluate device filesystem %v format: %v", devicePath, err)
}

log.Infof("Volume %v device %v contains filesystem of format %v", volumeID, devicePath, diskFormat)
dataEngine := volume.DataEngine
log.Infof("Volume %v (%v) device %v contains filesystem of format %v", volumeID, dataEngine, devicePath, diskFormat)

if volume.Encrypted {
secrets := req.GetSecrets()
Expand All @@ -515,7 +516,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
}

cryptoDevice := crypto.VolumeMapper(volumeID)
cryptoDevice := crypto.VolumeMapper(volumeID, dataEngine)
log.Infof("Volume %s requires crypto device %s", volumeID, cryptoDevice)

// check if the crypto device is open at the null path.
Expand All @@ -525,12 +526,12 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Errorf(codes.Internal, "failed to check if the crypto device %s for volume %s is mapped to the null path: %v", cryptoDevice, volumeID, err.Error())
} else if mappedToNullPath {
log.Warnf("Closing active crypto device %s for volume %s since the volume is not closed properly before", cryptoDevice, volumeID)
if err := crypto.CloseVolume(volumeID); err != nil {
if err := crypto.CloseVolume(volumeID, dataEngine); err != nil {
return nil, status.Errorf(codes.Internal, "failed to close active crypto device %s for volume %s: %v ", cryptoDevice, volumeID, err.Error())
}
}

if err := crypto.OpenVolume(volumeID, devicePath, passphrase); err != nil {
if err := crypto.OpenVolume(volumeID, dataEngine, devicePath, passphrase); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down Expand Up @@ -628,19 +629,20 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
// optionally try to retrieve the volume and check if it's an RWX volume
// if it is we let the share-manager clean up the crypto device
volume, _ := ns.apiClient.Volume.ById(volumeID)
if volume == nil || types.IsDataEngineV1(longhorn.DataEngineType(volume.DataEngine)) {
// Currently, only "RWO v1 volumes" and "block device with v1 volume.Migratable is true" supports encryption.
sharedAccess := requiresSharedAccess(volume, nil)
cleanupCryptoDevice := !sharedAccess || (sharedAccess && volume.Migratable)
if cleanupCryptoDevice {
cryptoDevice := crypto.VolumeMapper(volumeID)
if isOpen, err := crypto.IsDeviceOpen(cryptoDevice); err != nil {
dataEngine := string(longhorn.DataEngineTypeV1)
if volume != nil {
dataEngine = volume.DataEngine
}
sharedAccess := requiresSharedAccess(volume, nil)
cleanupCryptoDevice := !sharedAccess || (sharedAccess && volume.Migratable)
if cleanupCryptoDevice {
cryptoDevice := crypto.VolumeMapper(volumeID, dataEngine)
if isOpen, err := crypto.IsDeviceOpen(cryptoDevice); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if isOpen {
log.Infof("Volume %s closing active crypto device %s", volumeID, cryptoDevice)
if err := crypto.CloseVolume(volumeID, dataEngine); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if isOpen {
log.Infof("Volume %s closing active crypto device %s", volumeID, cryptoDevice)
if err := crypto.CloseVolume(volumeID); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
}
}
Expand Down Expand Up @@ -820,14 +822,15 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
return nil, fmt.Errorf("unknown filesystem type for volume %v node expansion", volumeID)
}

dataEngine := volume.DataEngine
devicePath, err = func() (string, error) {
if !volume.Encrypted {
return devicePath, nil
}
if diskFormat != "crypto_LUKS" {
return "", status.Errorf(codes.InvalidArgument, "unsupported disk encryption format %v", diskFormat)
}
devicePath = crypto.VolumeMapper(volumeID)
devicePath = crypto.VolumeMapper(volumeID, dataEngine)

// Need to enable feature gate in v1.25:
// https://github.com/kubernetes/enhancements/issues/3107
Expand All @@ -847,7 +850,7 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
}

// blindly resize the encrypto device
if err := crypto.ResizeEncryptoDevice(volumeID, passphrase); err != nil {
if err := crypto.ResizeEncryptoDevice(volumeID, dataEngine, passphrase); err != nil {
return "", status.Errorf(codes.InvalidArgument, "failed to resize crypto device %v for volume %v node expansion: %v", devicePath, volumeID, err)
}

Expand Down
3 changes: 0 additions & 3 deletions webhook/resources/volume/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,368 +48,365 @@
}
}

// Create validates an admission request for a new Longhorn volume. It checks
// the volume name, locality/access-mode/replica settings, backing image
// compatibility, frontend, engine image, snapshot limits, backup target, and
// data-engine-specific restrictions, returning an invalid-error on the first
// violation.
func (v *volumeValidator) Create(request *admission.Request, newObj runtime.Object) error {
	volume, ok := newObj.(*longhorn.Volume)
	if !ok {
		return werror.NewInvalidError(fmt.Sprintf("%v is not a *longhorn.Volume", newObj), "")
	}

	if !util.ValidateName(volume.Name) {
		return werror.NewInvalidError(fmt.Sprintf("invalid name %v", volume.Name), "")
	}

	if err := types.ValidateDataLocality(volume.Spec.DataLocality); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateAccessMode(volume.Spec.AccessMode); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := validateReplicaCount(volume.Spec.DataLocality, volume.Spec.NumberOfReplicas); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateDataLocalityAndReplicaCount(volume.Spec.DataLocality, volume.Spec.NumberOfReplicas); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateDataLocalityAndAccessMode(volume.Spec.DataLocality, volume.Spec.Migratable, volume.Spec.AccessMode); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateReplicaAutoBalance(volume.Spec.ReplicaAutoBalance); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateUnmapMarkSnapChainRemoved(volume.Spec.DataEngine, volume.Spec.UnmapMarkSnapChainRemoved); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateReplicaSoftAntiAffinity(volume.Spec.ReplicaSoftAntiAffinity); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateReplicaZoneSoftAntiAffinity(volume.Spec.ReplicaZoneSoftAntiAffinity); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateReplicaDiskSoftAntiAffinity(volume.Spec.ReplicaDiskSoftAntiAffinity); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if volume.Spec.BackingImage != "" {
		backingImage, err := v.ds.GetBackingImage(volume.Spec.BackingImage)
		if err != nil {
			return werror.NewInvalidError(err.Error(), "")
		}
		if backingImage != nil {
			if backingImage.Spec.DataEngine != volume.Spec.DataEngine {
				return werror.NewInvalidError("volume should have the same data engine as the backing image", "")
			}
			// For qcow2 files, VirtualSize may be larger than the physical image size on disk.
			// For raw files, `qemu-img info` will report VirtualSize as being the same as the physical file size.
			// Volume size should not be smaller than the backing image size.
			// NOTE: this comparison must stay inside the nil check; the
			// previous code dereferenced backingImage unconditionally and
			// could panic when the lookup returned a nil image without error.
			if volume.Spec.Size < backingImage.Status.VirtualSize {
				return werror.NewInvalidError("volume size should be larger than the backing image size", "")
			}
		}
	}

	if volume.Spec.Image == "" {
		return werror.NewInvalidError("BUG: Invalid empty Setting.EngineImage", "")
	}

	if !volume.Spec.Standby {
		if volume.Spec.Frontend != longhorn.VolumeFrontendBlockDev &&
			volume.Spec.Frontend != longhorn.VolumeFrontendISCSI &&
			volume.Spec.Frontend != longhorn.VolumeFrontendNvmf {
			return werror.NewInvalidError(fmt.Sprintf("invalid volume frontend specified: %v", volume.Spec.Frontend), "")
		}
	}

	if volume.Spec.Migratable && volume.Spec.AccessMode != longhorn.AccessModeReadWriteMany {
		return werror.NewInvalidError("migratable volumes are only supported in ReadWriteMany (rwx) access mode", "")
	}

	// Check engine version before disable revision counter
	if volume.Spec.RevisionCounterDisabled {
		if ok, err := v.canDisableRevisionCounter(volume.Spec.Image, volume.Spec.DataEngine); !ok {
			err := errors.Wrapf(err, "can not create volume with current engine image that doesn't support disable revision counter")
			return werror.NewInvalidError(err.Error(), "")
		}
	}

	if err := datastore.CheckVolume(volume); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	err := wcommon.ValidateRequiredDataEngineEnabled(v.ds, volume.Spec.DataEngine)
	if err != nil {
		return err
	}

	if err := validateSnapshotMaxCount(volume.Spec.SnapshotMaxCount); err != nil {
		return werror.NewInvalidError(err.Error(), "spec.snapshotMaxCount")
	}

	if err := validateSnapshotMaxSize(volume.Spec.Size, volume.Spec.SnapshotMaxSize); err != nil {
		return werror.NewInvalidError(err.Error(), "spec.snapshotMaxSize")
	}

	if err := v.ds.CheckDataEngineImageCompatiblityByImage(volume.Spec.Image, volume.Spec.DataEngine); err != nil {
		return werror.NewInvalidError(err.Error(), "volume.spec.image")
	}

	if err := v.validateBackupTarget("", volume.Spec.BackupTargetName); err != nil {
		return werror.NewInvalidError(err.Error(), "spec.backupTargetName")
	}

	// TODO: remove this check when we support the following features for SPDK volumes
	// Encryption is now supported for data engine v2 (this PR removed the
	// former Spec.Encrypted rejection here); cloning still is not.
	if types.IsDataEngineV2(volume.Spec.DataEngine) {
		if types.IsDataFromVolume(volume.Spec.DataSource) {
			return werror.NewInvalidError("clone is not supported for data engine v2", "")
		}
	}

	return nil
}

Check notice on line 177 in webhook/resources/volume/validator.go

View check run for this annotation

codefactor.io / CodeFactor

webhook/resources/volume/validator.go#L51-L177

Complex Method
// Update validates an admission request that modifies an existing Longhorn
// volume. It re-checks the mutable settings (size, locality, access mode,
// replica/snapshot limits, backup target), rejects changes to immutable
// fields (data engine, backup compression method, and — for v2 volumes —
// size, backing image, encryption, and several replica/snapshot options),
// and guards live migration against conflicting migration-node updates.
func (v *volumeValidator) Update(request *admission.Request, oldObj runtime.Object, newObj runtime.Object) error {
	oldVolume, ok := oldObj.(*longhorn.Volume)
	if !ok {
		return werror.NewInvalidError(fmt.Sprintf("%v is not a *longhorn.Volume", oldObj), "")
	}
	newVolume, ok := newObj.(*longhorn.Volume)
	if !ok {
		return werror.NewInvalidError(fmt.Sprintf("%v is not a *longhorn.Volume", newObj), "")
	}

	// Size changes are validated first (expansion rules, PVC/StorageClass
	// constraints); see validateExpansionSize.
	if err := v.validateExpansionSize(oldVolume, newVolume); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := validateDataLocalityUpdate(oldVolume, newVolume); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := validateReplicaCount(newVolume.Spec.DataLocality, newVolume.Spec.NumberOfReplicas); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateAccessMode(newVolume.Spec.AccessMode); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateReplicaAutoBalance(newVolume.Spec.ReplicaAutoBalance); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateUnmapMarkSnapChainRemoved(newVolume.Spec.DataEngine, newVolume.Spec.UnmapMarkSnapChainRemoved); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateReplicaSoftAntiAffinity(newVolume.Spec.ReplicaSoftAntiAffinity); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateReplicaZoneSoftAntiAffinity(newVolume.Spec.ReplicaZoneSoftAntiAffinity); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := types.ValidateReplicaDiskSoftAntiAffinity(newVolume.Spec.ReplicaDiskSoftAntiAffinity); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	// An engine image change must stay compatible with the volume's data engine.
	if oldVolume.Spec.Image != newVolume.Spec.Image {
		if err := v.ds.CheckDataEngineImageCompatiblityByImage(newVolume.Spec.Image, newVolume.Spec.DataEngine); err != nil {
			return werror.NewInvalidError(err.Error(), "volume.spec.image")
		}
	}

	if newVolume.Spec.DataLocality == longhorn.DataLocalityStrictLocal {
		// Check if the strict-local volume can attach to newVolume.Spec.NodeID
		if oldVolume.Spec.NodeID != newVolume.Spec.NodeID && newVolume.Spec.NodeID != "" {
			ok, err := v.hasLocalReplicaOnSameNodeAsStrictLocalVolume(newVolume)
			if !ok {
				err = errors.Wrapf(err, "failed to check if %v volume %v and its replica are on the same node",
					longhorn.DataLocalityStrictLocal, newVolume.Name)
				return werror.NewInvalidError(err.Error(), "")
			}
		}
	}

	if err := datastore.CheckVolume(newVolume); err != nil {
		return werror.NewInvalidError(err.Error(), "")
	}

	// Backup compression method is immutable once set.
	if oldVolume.Spec.BackupCompressionMethod != "" {
		if oldVolume.Spec.BackupCompressionMethod != newVolume.Spec.BackupCompressionMethod {
			err := fmt.Errorf("changing backup compression method for volume %v is not supported", oldVolume.Name)
			return werror.NewInvalidError(err.Error(), "")
		}
	}

	// Data engine is immutable once set.
	if oldVolume.Spec.DataEngine != "" {
		if oldVolume.Spec.DataEngine != newVolume.Spec.DataEngine {
			err := fmt.Errorf("changing data engine for volume %v is not supported", oldVolume.Name)
			return werror.NewInvalidError(err.Error(), "")
		}
	}

	// v2 (SPDK) volumes do not yet support changing these fields in place.
	if types.IsDataEngineV2(newVolume.Spec.DataEngine) {
		// TODO: remove this check when we support the following features for SPDK volumes
		if oldVolume.Spec.Size != newVolume.Spec.Size {
			err := fmt.Errorf("changing volume size for volume %v is not supported for data engine %v",
				newVolume.Name, newVolume.Spec.DataEngine)
			return werror.NewInvalidError(err.Error(), "")
		}

		if oldVolume.Spec.BackingImage != newVolume.Spec.BackingImage {
			err := fmt.Errorf("changing backing image for volume %v is not supported for data engine %v",
				newVolume.Name, newVolume.Spec.DataEngine)
			return werror.NewInvalidError(err.Error(), "")
		}

		if oldVolume.Spec.Encrypted != newVolume.Spec.Encrypted {
			err := fmt.Errorf("changing encryption for volume %v is not supported for data engine %v",
				newVolume.Name, newVolume.Spec.DataEngine)
			return werror.NewInvalidError(err.Error(), "")
		}

		if oldVolume.Spec.SnapshotDataIntegrity != newVolume.Spec.SnapshotDataIntegrity {
			err := fmt.Errorf("changing snapshot data integrity for volume %v is not supported for data engine %v",
				newVolume.Name, newVolume.Spec.DataEngine)
			return werror.NewInvalidError(err.Error(), "")
		}

		if oldVolume.Spec.ReplicaAutoBalance != newVolume.Spec.ReplicaAutoBalance {
			err := fmt.Errorf("changing replica auto balance for volume %v is not supported for data engine %v",
				newVolume.Name, newVolume.Spec.DataEngine)
			return werror.NewInvalidError(err.Error(), "")
		}

		if oldVolume.Spec.RestoreVolumeRecurringJob != newVolume.Spec.RestoreVolumeRecurringJob {
			err := fmt.Errorf("changing restore volume recurring job for volume %v is not supported for data engine %v",
				newVolume.Name, newVolume.Spec.DataEngine)
			return werror.NewInvalidError(err.Error(), "")
		}

		if oldVolume.Spec.ReplicaSoftAntiAffinity != newVolume.Spec.ReplicaSoftAntiAffinity {
			err := fmt.Errorf("changing replica soft anti-affinity for volume %v is not supported for data engine %v",
				newVolume.Name, newVolume.Spec.DataEngine)
			return werror.NewInvalidError(err.Error(), "")
		}

		if oldVolume.Spec.ReplicaZoneSoftAntiAffinity != newVolume.Spec.ReplicaZoneSoftAntiAffinity {
			err := fmt.Errorf("changing replica zone soft anti-affinity for volume %v is not supported for data engine %v",
				newVolume.Name, newVolume.Spec.DataEngine)
			return werror.NewInvalidError(err.Error(), "")
		}

		// Resetting disk anti-affinity from unset to the default value is
		// tolerated; any other change is rejected.
		if oldVolume.Spec.ReplicaDiskSoftAntiAffinity != newVolume.Spec.ReplicaDiskSoftAntiAffinity {
			if oldVolume.Spec.ReplicaDiskSoftAntiAffinity != "" && newVolume.Spec.ReplicaDiskSoftAntiAffinity != longhorn.ReplicaDiskSoftAntiAffinityDefault {
				err := fmt.Errorf("changing replica disk soft anti-affinity for volume %v is not supported for data engine %v",
					newVolume.Name, newVolume.Spec.DataEngine)
				return werror.NewInvalidError(err.Error(), "")
			}
		}
	}

	// prevent the changing v.Spec.MigrationNodeID to different node when the volume is doing live migration (when v.Status.CurrentMigrationNodeID != "")
	if newVolume.Status.CurrentMigrationNodeID != "" &&
		newVolume.Spec.MigrationNodeID != oldVolume.Spec.MigrationNodeID &&
		newVolume.Spec.MigrationNodeID != newVolume.Status.CurrentMigrationNodeID &&
		newVolume.Spec.MigrationNodeID != "" {
		err := fmt.Errorf("cannot change v.Spec.MigrationNodeID to node %v when the volume is doing live migration to node %v ", newVolume.Spec.MigrationNodeID, newVolume.Status.CurrentMigrationNodeID)
		return werror.NewInvalidError(err.Error(), "")
	}

	if err := validateSnapshotMaxCount(newVolume.Spec.SnapshotMaxCount); err != nil {
		return werror.NewInvalidError(err.Error(), "spec.snapshotMaxCount")
	}

	if err := validateSnapshotMaxSize(newVolume.Spec.Size, newVolume.Spec.SnapshotMaxSize); err != nil {
		return werror.NewInvalidError(err.Error(), "spec.snapshotMaxSize")
	}

	if err := v.validateBackupTarget(oldVolume.Spec.BackupTargetName, newVolume.Spec.BackupTargetName); err != nil {
		return werror.NewInvalidError(err.Error(), "spec.backupTargetName")
	}

	// Tightening snapshot limits is only allowed if current usage still fits.
	if (oldVolume.Spec.SnapshotMaxCount != newVolume.Spec.SnapshotMaxCount) ||
		(oldVolume.Spec.SnapshotMaxSize != newVolume.Spec.SnapshotMaxSize) {
		if err := v.validateUpdatingSnapshotMaxCountAndSize(oldVolume, newVolume); err != nil {
			return err
		}
	}
	return nil
}

Check notice on line 348 in webhook/resources/volume/validator.go

View check run for this annotation

codefactor.io / CodeFactor

webhook/resources/volume/validator.go#L178-L348

Complex Method
// validateExpansionSize checks whether a requested spec.Size change on an
// existing volume is acceptable: shrinking is only allowed while an expansion
// is still pending (rollback), the replica's disk filesystem must support the
// new size, and — when the volume is bound to a live PVC — the StorageClass
// must allow expansion and the PVC request must already cover the new size.
func (v *volumeValidator) validateExpansionSize(oldVolume *longhorn.Volume, newVolume *longhorn.Volume) error {
	oldSize := oldVolume.Spec.Size
	newSize := newVolume.Spec.Size
	if newSize == oldSize {
		return nil
	}
	// Shrinking is only permitted as a rollback of an in-flight expansion.
	if newSize < oldSize && !newVolume.Status.ExpansionRequired {
		return fmt.Errorf("shrinking volume %v size from %v to %v is not supported", newVolume.Name, oldSize, newSize)
	}

	replicas, err := v.ds.ListVolumeReplicasRO(newVolume.Name)
	if err != nil {
		return err
	}
	// Only one replica is inspected (note the break); presumably all replicas
	// of a volume share the same filesystem constraint — TODO confirm.
	for _, replica := range replicas {
		diskUUID := replica.Spec.DiskID
		node, diskName, err := v.ds.GetReadyDiskNode(diskUUID)
		if err != nil {
			return err
		}
		diskStatus := node.Status.DiskStatus[diskName]
		if !datastore.IsSupportedVolumeSize(replica.Spec.DataEngine, diskStatus.FSType, newSize) {
			// Report the filesystem type that was actually checked; the
			// previous message printed diskStatus.Type instead of FSType.
			return fmt.Errorf("file system %s does not support volume size %v", diskStatus.FSType, newSize)
		}
		break
	}

	newKubernetesStatus := &newVolume.Status.KubernetesStatus
	namespace := newKubernetesStatus.Namespace
	pvcName := newKubernetesStatus.PVCName
	// No live PVC reference means there is nothing more to validate.
	if pvcName == "" || newKubernetesStatus.LastPVCRefAt != "" {
		return nil
	}

	pvc, err := v.ds.GetPersistentVolumeClaim(namespace, pvcName)
	if err != nil {
		return err
	}

	// spec.storageClassName is optional on a PVC; guard the pointer before
	// dereferencing (the previous code would panic on a nil value).
	if pvc.Spec.StorageClassName == nil {
		return fmt.Errorf("cannot get storage class name of PVC %v", pvcName)
	}
	pvcSCName := *pvc.Spec.StorageClassName
	pvcStorageClass, err := v.ds.GetStorageClassRO(pvcSCName)
	if err != nil {
		return err
	}
	if pvcStorageClass.AllowVolumeExpansion != nil && !*pvcStorageClass.AllowVolumeExpansion {
		return fmt.Errorf("storage class %v of PVC %v does not allow the volume expansion", pvcSCName, pvcName)
	}

	pvcSpecValue, ok := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
	if !ok {
		return fmt.Errorf("cannot get request storage of PVC %v", pvcName)
	}

	// The PVC must be expanded first so the Kubernetes side stays consistent
	// with the Longhorn volume size.
	requestedSize := resource.MustParse(strconv.FormatInt(newSize, 10))
	if pvcSpecValue.Cmp(requestedSize) < 0 {
		return fmt.Errorf("PVC %v size should be expanded from %v to %v first", pvcName, pvcSpecValue.Value(), requestedSize.Value())
	}

	return nil
}

Check notice on line 409 in webhook/resources/volume/validator.go

View check run for this annotation

codefactor.io / CodeFactor

webhook/resources/volume/validator.go#L349-L409

Complex Method
func (v *volumeValidator) hasLocalReplicaOnSameNodeAsStrictLocalVolume(volume *longhorn.Volume) (bool, error) {
replicas, err := v.ds.ListVolumeReplicas(volume.Name)
if err != nil {
Expand Down Expand Up @@ -511,51 +508,51 @@
return nil
}

func (v *volumeValidator) validateUpdatingSnapshotMaxCountAndSize(oldVolume, newVolume *longhorn.Volume) error {
var (
currentSnapshotCount int
currentTotalSnapshotSize int64
engine *longhorn.Engine
)
engines, err := v.ds.ListVolumeEngines(newVolume.Name)
if err != nil && !datastore.ErrorIsNotFound(err) {
return werror.NewInternalError(fmt.Sprintf("can't list engines for volume %s, err %v", newVolume.Name, err))
} else if len(engines) == 0 {
return nil
}

// It is dangerous to update snapshotMaxCount and snapshotMaxSize while migrating. However, when upgrading to a
// Longhorn version that includes these fields from one that does not, we automatically set snapshotMaxCount = 250,
// and we do not want a validation failure here to stop the upgrade. We accept the change, but do not propagate it
// to engines or replicas until the migration is complete.
if len(engines) >= 2 {
if oldVolume.Spec.SnapshotMaxCount == 0 &&
newVolume.Spec.SnapshotMaxCount == types.MaxSnapshotNum &&
oldVolume.Spec.SnapshotMaxSize == newVolume.Spec.SnapshotMaxSize {
logrus.WithField("volumeName", newVolume.Name).Debugf("Allowing snapshotMaxCount of a migrating volume to change from 0 to %v during upgrade", types.MaxSnapshotNum)
} else {
return werror.NewInvalidError("can't update snapshotMaxCount or snapshotMaxSize during migration", "")
}
}

for _, e := range engines {
engine = e
}

for _, snapshotInfo := range engine.Status.Snapshots {
if snapshotInfo == nil || snapshotInfo.Removed || snapshotInfo.Name == "volume-head" {
continue
}
currentSnapshotCount++
snapshotSize, err := strconv.ParseInt(snapshotInfo.Size, 10, 64)
if err != nil {
return werror.NewInternalError(fmt.Sprintf("can't parse size %s from snapshot %s in volume %s, err %v", snapshotInfo.Size, snapshotInfo.Name, newVolume.Name, err))
}
currentTotalSnapshotSize += snapshotSize
}

if currentSnapshotCount > newVolume.Spec.SnapshotMaxCount || (newVolume.Spec.SnapshotMaxSize != 0 && currentTotalSnapshotSize > newVolume.Spec.SnapshotMaxSize) {
return werror.NewInvalidError("can't make snapshotMaxCount or snapshotMaxSize be smaller than current usage, please remove snapshots first", "")
}
return nil

Check notice on line 557 in webhook/resources/volume/validator.go

View check run for this annotation

codefactor.io / CodeFactor

webhook/resources/volume/validator.go#L511-L557

Complex Method
}
Loading