Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various fixes for managed transport #637

Merged
merged 6 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/onsi/gomega v1.18.1
github.com/otiai10/copy v1.7.0
github.com/spf13/pflag v1.0.5
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/api v0.73.0
gotest.tools v2.2.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,8 @@ golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064 h1:S25/rfnfsMVgORT4/J61MJ7rdyseOZOyvLIrZEZ7s6s=
golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down
2 changes: 1 addition & 1 deletion internal/helm/repository/chart_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/fluxcd/pkg/version"

"github.com/fluxcd/source-controller/internal/helm"
transport "github.com/fluxcd/source-controller/internal/helm/getter"
"github.com/fluxcd/source-controller/internal/transport"
)

var ErrNoChartIndex = errors.New("no chart index")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package getter
package transport

import (
"crypto/tls"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package getter
package transport

import (
"crypto/tls"
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func main() {
}()

if managed.Enabled() {
managed.InitManagedTransport()
managed.InitManagedTransport(ctrl.Log.WithName("managed-transport"))
}

setupLog.Info("starting manager")
Expand Down
69 changes: 44 additions & 25 deletions pkg/git/libgit2/managed/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"sync"
"time"

pool "github.com/fluxcd/source-controller/internal/transport"
git2go "github.com/libgit2/git2go/v33"
)

Expand All @@ -73,15 +72,18 @@ func registerManagedHTTP() error {
}

func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
traceLog.Info("[http]: httpSmartSubtransportFactory")
sst := &httpSmartSubtransport{
transport: transport,
transport: transport,
httpTransport: pool.NewOrIdle(nil),
}

return sst, nil
}

type httpSmartSubtransport struct {
transport *git2go.Transport
transport *git2go.Transport
httpTransport *http.Transport
}

func (t *httpSmartSubtransport) Action(targetUrl string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
Expand All @@ -104,25 +106,10 @@ func (t *httpSmartSubtransport) Action(targetUrl string, action git2go.SmartServ
proxyFn = http.ProxyURL(parsedUrl)
}

httpTransport := &http.Transport{
// Add the proxy to the http transport.
Proxy: proxyFn,

// Set reasonable timeouts to ensure connections are not
// left open in an idle state, nor they hang indefinitely.
//
// These are based on the official go http.DefaultTransport:
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
t.httpTransport.Proxy = proxyFn
t.httpTransport.DisableCompression = false

client, req, err := createClientRequest(targetUrl, action, httpTransport)
client, req, err := createClientRequest(targetUrl, action, t.httpTransport)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -223,10 +210,18 @@ func createClientRequest(targetUrl string, action git2go.SmartServiceAction, t *
}

func (t *httpSmartSubtransport) Close() error {
traceLog.Info("[http]: httpSmartSubtransport.Close()")
return nil
}

func (t *httpSmartSubtransport) Free() {
traceLog.Info("[http]: httpSmartSubtransport.Free()")

if t.httpTransport != nil {
traceLog.Info("[http]: release http transport back to pool")
pool.Release(t.httpTransport)
t.httpTransport = nil
}
}

type httpSmartSubtransportStream struct {
Expand Down Expand Up @@ -291,7 +286,15 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {

func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil {
self.resp.Body.Close()
traceLog.Info("[http]: httpSmartSubtransportStream.Free()")

if self.resp.Body != nil {
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once.
_, _ = io.Copy(io.Discard, self.resp.Body) // errors can be safely ignored
_ = self.resp.Body.Close() // errors can be safely ignored
}
}
}

Expand Down Expand Up @@ -354,6 +357,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
}

req.SetBasicAuth(userName, password)
traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL)
resp, err = self.client.Do(req)
if err != nil {
return err
Expand All @@ -362,21 +366,36 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
// GET requests will be automatically redirected.
// POST require the new destination, and also the body content.
if req.Method == "POST" && resp.StatusCode >= 301 && resp.StatusCode <= 308 {
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
_, _ = io.Copy(io.Discard, resp.Body) // errors can be safely ignored

if err := resp.Body.Close(); err != nil {
return err
}

// The next try will go against the new destination
self.req.URL, err = resp.Location()
if err != nil {
return err
}

traceLog.Info("[http]: POST redirect", "URL", self.req.URL)
continue
}

// for HTTP 200, the response will be cleared up by Free()
if resp.StatusCode == http.StatusOK {
break
}

io.Copy(io.Discard, resp.Body)
defer resp.Body.Close()
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
_, _ = io.Copy(io.Discard, resp.Body) // errors can be safely ignored
if err := resp.Body.Close(); err != nil {
return err
}

return fmt.Errorf("Unhandled HTTP error %s", resp.Status)
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/git/libgit2/managed/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package managed
import (
"sync"
"time"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
)

var (
Expand All @@ -34,6 +37,9 @@ var (
// regardless of the current operation (i.e. connection,
// handshake, put/get).
fullHttpClientTimeOut time.Duration = 10 * time.Minute

debugLog logr.Logger
traceLog logr.Logger
)

// InitManagedTransport initialises HTTP(S) and SSH managed transport
Expand All @@ -47,9 +53,14 @@ var (
//
// This function will only register managed transports once, subsequent calls
// leads to no-op.
func InitManagedTransport() error {
func InitManagedTransport(log logr.Logger) error {
var err error

once.Do(func() {
log.Info("Enabling experimental managed transport")
debugLog = log.V(logger.DebugLevel)
traceLog = log.V(logger.TraceLevel)

if err = registerManagedHTTP(); err != nil {
return
}
Expand Down
Loading