Skip to content

Commit

Permalink
Added support for k8s port forward (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
BSick7 authored Feb 1, 2025
1 parent 322f23a commit 26f7cb8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 0.0.136 (Feb 01, 2025)
* Added support for port forwarding when using `nullstone ssh` command. (`--forward <local-port>:<remote-port>`)

# 0.0.135 (Jan 06, 2025)
* Added `nullstone run` command that allows you to a start a new job/task.
* Added support for `nullstone run` to ECS/Fargate tasks and GKE jobs.
Expand Down
7 changes: 5 additions & 2 deletions gcp/gke/remoter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (r Remoter) Exec(ctx context.Context, options admin.RemoteOptions, cmd []st
ErrOut: r.OsWriters.Stderr(),
TTY: false,
}
for _, pf := range options.PortForwards {
opts.PortMappings = append(opts.PortMappings, fmt.Sprintf("%s:%s", pf.LocalPort, pf.RemotePort))
}

return ExecCommand(ctx, r.Infra, options.Pod, options.Container, cmd, opts)
}
Expand All @@ -59,8 +62,8 @@ func (r Remoter) Ssh(ctx context.Context, options admin.RemoteOptions) error {
ErrOut: r.OsWriters.Stderr(),
TTY: true,
}
if len(options.PortForwards) > 0 {
return fmt.Errorf("gke provider does not support port forwarding yet")
for _, pf := range options.PortForwards {
opts.PortMappings = append(opts.PortMappings, fmt.Sprintf("%s:%s", pf.LocalPort, pf.RemotePort))
}

return ExecCommand(ctx, r.Infra, options.Pod, options.Container, []string{"/bin/sh"}, opts)
Expand Down
11 changes: 11 additions & 0 deletions k8s/exec_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ func ExecCommand(ctx context.Context, cfg *rest.Config, podNamespace, podName, c
return fmt.Errorf("unable to execute kubernetes command: %w", err)
}

portForwarder, err := NewPortForwarder(cfg, podNamespace, podName, opts.PortMappings)
if err != nil {
return fmt.Errorf("unable to create port forwarder: %w", err)
}

return tty.Safe(func() error {
restClient, err := rest.RESTClientFor(cfg)
if err != nil {
Expand All @@ -41,6 +46,12 @@ func ExecCommand(ctx context.Context, cfg *rest.Config, podNamespace, podName, c
return fmt.Errorf("unable to create kubernetes remote executor: %w", err)
}

if portForwarder != nil {
stop := make(chan struct{}, 1)
defer close(stop)
go portForwarder.ForwardPorts(stop, opts)
}

return executor.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: opts.In,
Stdout: opts.Out,
Expand Down
1 change: 1 addition & 0 deletions k8s/exec_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type ExecOptions struct {
ErrOut io.Writer
TTY bool
InterruptParent *interrupt.Handler
PortMappings []string
}

func (o *ExecOptions) CreateTTY() (term.TTY, remotecommand.TerminalSizeQueue, error) {
Expand Down
69 changes: 69 additions & 0 deletions k8s/port_forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package k8s

import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"net/http"
"os"
)

type PortForwarder struct {
Transport http.RoundTripper
Upgrader spdy.Upgrader
Request *rest.Request
}

func NewPortForwarder(cfg *rest.Config, podNamespace, podName string, portMappings []string) (*PortForwarder, error) {
if len(portMappings) < 1 {
return nil, nil
}

restClient, err := rest.RESTClientFor(cfg)
if err != nil {
return nil, fmt.Errorf("cannot create rest client: %w", err)
}
req := restClient.Post().
Resource("pods").
Namespace(podNamespace).
Name(podName).
SubResource("portforward").
VersionedParams(&corev1.PodPortForwardOptions{}, scheme.ParameterCodec)

transport, upgrader, err := spdy.RoundTripperFor(cfg)
if err != nil {
return nil, fmt.Errorf("failed to create SPDY transport: %w", err)
}

return &PortForwarder{
Transport: transport,
Upgrader: upgrader,
Request: req,
}, nil
}

func (f *PortForwarder) ForwardPorts(stop <-chan struct{}, opts *ExecOptions) {
if len(opts.PortMappings) < 1 {
return
}

stderr := opts.ErrOut
if stderr == nil {
stderr = os.Stderr
}

ready := make(chan struct{})
dialer := spdy.NewDialer(f.Upgrader, &http.Client{Transport: f.Transport}, http.MethodPost, f.Request.URL())
fw, err := portforward.New(dialer, opts.PortMappings, stop, ready, opts.Out, opts.ErrOut)
if err != nil {
fmt.Fprintf(stderr, "error forwarding ports: %s\n", err)
return
}

if err := fw.ForwardPorts(); err != nil {
fmt.Fprintf(stderr, "port forwarding stopped: %s\n", err)
}
}

0 comments on commit 26f7cb8

Please sign in to comment.