Skip to content

Merge upstream changes up to commit 4ad747972 #414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ type ClusterConfig struct {
// If DisableInitialHostLookup then the driver will not attempt to get host info
// from the system.peers table, this will mean that the driver will connect to
// hosts supplied and will not attempt to lookup the hosts information, this will
// mean that data_centre, rack and token information will not be available and as
// mean that data_center, rack and token information will not be available and as
// such host filtering and token aware query routing will not be available.
DisableInitialHostLookup bool

Expand Down
14 changes: 10 additions & 4 deletions filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,20 @@ func DenyAllFilter() HostFilter {
})
}

// DataCentreHostFilter filters all hosts such that they are in the same data centre
// as the supplied data centre.
func DataCentreHostFilter(dataCentre string) HostFilter {
// DataCenterHostFilter filters all hosts such that they are in the same data center
// as the supplied data center.
func DataCenterHostFilter(dataCenter string) HostFilter {
return HostFilterFunc(func(host *HostInfo) bool {
return host.DataCenter() == dataCentre
return host.DataCenter() == dataCenter
})
}

// Deprecated: Use DataCenterHostFilter instead.
// DataCentreHostFilter is an alias that doesn't use the preferred spelling.
func DataCentreHostFilter(dataCenter string) HostFilter {
return DataCenterHostFilter(dataCenter)
}

// WhiteListHostFilter filters incoming hosts by checking that their address is
// in the initial hosts whitelist.
func WhiteListHostFilter(hosts ...string) HostFilter {
Expand Down
10 changes: 8 additions & 2 deletions filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ func TestFilter_DenyAll(t *testing.T) {
}
}

func TestFilter_DataCentre(t *testing.T) {
f := DataCentreHostFilter("dc1")
func TestFilter_DataCenter(t *testing.T) {
f := DataCenterHostFilter("dc1")
fDeprecated := DataCentreHostFilter("dc1")

tests := [...]struct {
dc string
accept bool
Expand All @@ -116,5 +118,9 @@ func TestFilter_DataCentre(t *testing.T) {
} else if test.accept {
t.Errorf("%d: should have been accepted but wasn't", i)
}

if f.Accept(&HostInfo{dataCenter: test.dc}) != fDeprecated.Accept(&HostInfo{dataCenter: test.dc}) {
t.Errorf("%d: DataCenterHostFilter and DataCentreHostFilter should be the same", i)
}
}
}
42 changes: 29 additions & 13 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ import (
"time"
)

var ErrCannotFindHost = errors.New("cannot find host")
var ErrHostAlreadyExists = errors.New("host already exists")
var (
ErrCannotFindHost = errors.New("cannot find host")
ErrHostAlreadyExists = errors.New("host already exists")
)

type nodeState int32

Expand All @@ -55,6 +57,7 @@ const (

type cassVersion struct {
Major, Minor, Patch int
Qualifier string
}

func (c *cassVersion) Set(v string) error {
Expand All @@ -70,9 +73,7 @@ func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
}

func (c *cassVersion) unmarshal(data []byte) error {
version := strings.TrimSuffix(string(data), "-SNAPSHOT")
version = strings.TrimPrefix(version, "v")
v := strings.Split(version, ".")
v := strings.SplitN(strings.TrimPrefix(strings.TrimSuffix(string(data), "-SNAPSHOT"), "v"), ".", 3)

if len(v) < 2 {
return fmt.Errorf("invalid version string: %s", data)
Expand All @@ -84,18 +85,31 @@ func (c *cassVersion) unmarshal(data []byte) error {
return fmt.Errorf("invalid major version %v: %v", v[0], err)
}

if len(v) == 2 {
vMinor := strings.SplitN(v[1], "-", 2)
c.Minor, err = strconv.Atoi(vMinor[0])
if err != nil {
return fmt.Errorf("invalid minor version %v: %v", vMinor[0], err)
}
if len(vMinor) == 2 {
c.Qualifier = vMinor[1]
}
return nil
}

c.Minor, err = strconv.Atoi(v[1])
if err != nil {
return fmt.Errorf("invalid minor version %v: %v", v[1], err)
}

if len(v) > 2 {
c.Patch, err = strconv.Atoi(v[2])
if err != nil {
return fmt.Errorf("invalid patch version %v: %v", v[2], err)
}
vPatch := strings.SplitN(v[2], "-", 2)
c.Patch, err = strconv.Atoi(vPatch[0])
if err != nil {
return fmt.Errorf("invalid patch version %v: %v", vPatch[0], err)
}
if len(vPatch) == 2 {
c.Qualifier = vPatch[1]
}

return nil
}

Expand All @@ -110,7 +124,6 @@ func (c cassVersion) Before(major, minor, patch int) bool {
} else if c.Minor == minor && c.Patch < patch {
return true
}

}
return false
}
Expand All @@ -120,6 +133,9 @@ func (c cassVersion) AtLeast(major, minor, patch int) bool {
}

func (c cassVersion) String() string {
if c.Qualifier != "" {
return fmt.Sprintf("%d.%d.%d-%v", c.Major, c.Minor, c.Patch, c.Qualifier)
}
return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}

Expand Down Expand Up @@ -498,7 +514,7 @@ func (h *HostInfo) String() string {
connectAddr, source := h.connectAddressLocked()
return fmt.Sprintf("[HostInfo hostname=%q connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
"preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
"port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
"port=%d data_center=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
h.hostname, h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
connectAddr, source,
h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
Expand Down
25 changes: 16 additions & 9 deletions host_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ func TestUnmarshalCassVersion(t *testing.T) {
data string
version cassVersion
}{
{"3.2", cassVersion{3, 2, 0}},
{"2.10.1-SNAPSHOT", cassVersion{2, 10, 1}},
{"1.2.3", cassVersion{1, 2, 3}},
{"3.2", cassVersion{3, 2, 0, ""}},
{"2.10.1-SNAPSHOT", cassVersion{2, 10, 1, ""}},
{"1.2.3", cassVersion{1, 2, 3, ""}},
{"4.0-rc2", cassVersion{4, 0, 0, "rc2"}},
{"4.3.2-rc1", cassVersion{4, 3, 2, "rc1"}},
{"4.3.2-rc1-qualifier1", cassVersion{4, 3, 2, "rc1-qualifier1"}},
{"4.3-rc1-qualifier1", cassVersion{4, 3, 0, "rc1-qualifier1"}},
}

for i, test := range tests {
Expand All @@ -56,14 +60,17 @@ func TestCassVersionBefore(t *testing.T) {
tests := [...]struct {
version cassVersion
major, minor, patch int
Qualifier string
}{
{cassVersion{1, 0, 0}, 0, 0, 0},
{cassVersion{0, 1, 0}, 0, 0, 0},
{cassVersion{0, 0, 1}, 0, 0, 0},
{cassVersion{1, 0, 0, ""}, 0, 0, 0, ""},
{cassVersion{0, 1, 0, ""}, 0, 0, 0, ""},
{cassVersion{0, 0, 1, ""}, 0, 0, 0, ""},

{cassVersion{1, 0, 0}, 0, 1, 0},
{cassVersion{0, 1, 0}, 0, 0, 1},
{cassVersion{4, 1, 0}, 3, 1, 2},
{cassVersion{1, 0, 0, ""}, 0, 1, 0, ""},
{cassVersion{0, 1, 0, ""}, 0, 0, 1, ""},
{cassVersion{4, 1, 0, ""}, 3, 1, 2, ""},

{cassVersion{4, 1, 0, ""}, 3, 1, 2, ""},
}

for i, test := range tests {
Expand Down
157 changes: 157 additions & 0 deletions hostpolicy/hostpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package hostpolicy

import (
"sync"

"github.com/hailocab/go-hostpool"

"github.com/gocql/gocql"
)

// HostPool is a host policy which uses the bitly/go-hostpool library
// to distribute queries between hosts and prevent sending queries to
// unresponsive hosts. When creating the host pool that is passed to the policy
// use an empty slice of hosts as the hostpool will be populated later by gocql.
// See below for examples of usage:
//
// // Create host selection policy using a simple host pool
// cluster.PoolConfig.HostSelectionPolicy = HostPool(hostpool.New(nil))
//
// // Create host selection policy using an epsilon greedy pool
// cluster.PoolConfig.HostSelectionPolicy = HostPool(
// hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
// )

func HostPool(hp hostpool.HostPool) gocql.HostSelectionPolicy {
return &hostPoolHostPolicy{hostMap: map[string]*gocql.HostInfo{}, hp: hp}
}

type hostPoolHostPolicy struct {
hp hostpool.HostPool
mu sync.RWMutex
hostMap map[string]*gocql.HostInfo
}

func (r *hostPoolHostPolicy) Init(*gocql.Session) {}
func (r *hostPoolHostPolicy) Reset() {}
func (r *hostPoolHostPolicy) IsOperational(*gocql.Session) error { return nil }
func (r *hostPoolHostPolicy) KeyspaceChanged(gocql.KeyspaceUpdateEvent) {}
func (r *hostPoolHostPolicy) SetPartitioner(string) {}
func (r *hostPoolHostPolicy) IsLocal(*gocql.HostInfo) bool { return true }

func (r *hostPoolHostPolicy) SetHosts(hosts []*gocql.HostInfo) {
peers := make([]string, len(hosts))
hostMap := make(map[string]*gocql.HostInfo, len(hosts))

for i, host := range hosts {
ip := host.ConnectAddress().String()
peers[i] = ip
hostMap[ip] = host
}

r.mu.Lock()
r.hp.SetHosts(peers)
r.hostMap = hostMap
r.mu.Unlock()
}

func (r *hostPoolHostPolicy) AddHost(host *gocql.HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

// If the host addr is present and isn't nil return
if h, ok := r.hostMap[ip]; ok && h != nil {
return
}
// otherwise, add the host to the map
r.hostMap[ip] = host
// and construct a new peer list to give to the HostPool
hosts := make([]string, 0, len(r.hostMap))
for addr := range r.hostMap {
hosts = append(hosts, addr)
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) RemoveHost(host *gocql.HostInfo) {
ip := host.ConnectAddress().String()

r.mu.Lock()
defer r.mu.Unlock()

if _, ok := r.hostMap[ip]; !ok {
return
}

delete(r.hostMap, ip)
hosts := make([]string, 0, len(r.hostMap))
for _, host := range r.hostMap {
hosts = append(hosts, host.ConnectAddress().String())
}

r.hp.SetHosts(hosts)
}

func (r *hostPoolHostPolicy) HostUp(host *gocql.HostInfo) {
r.AddHost(host)
}

func (r *hostPoolHostPolicy) HostDown(host *gocql.HostInfo) {
r.RemoveHost(host)
}

func (r *hostPoolHostPolicy) Pick(qry gocql.ExecutableQuery) gocql.NextHost {
return func() gocql.SelectedHost {
r.mu.RLock()
defer r.mu.RUnlock()

if len(r.hostMap) == 0 {
return nil
}

hostR := r.hp.Get()
host, ok := r.hostMap[hostR.Host()]
if !ok {
return nil
}

return selectedHostPoolHost{
policy: r,
info: host,
hostR: hostR,
}
}
}

// selectedHostPoolHost is a host returned by the hostPoolHostPolicy and
// implements the SelectedHost interface
type selectedHostPoolHost struct {
policy *hostPoolHostPolicy
info *gocql.HostInfo
hostR hostpool.HostPoolResponse
}

func (host selectedHostPoolHost) Info() *gocql.HostInfo {
return host.info
}

func (host selectedHostPoolHost) Token() gocql.Token {
return nil
}

func (host selectedHostPoolHost) Mark(err error) {
ip := host.info.ConnectAddress().String()

host.policy.mu.RLock()
defer host.policy.mu.RUnlock()

if _, ok := host.policy.hostMap[ip]; !ok {
// host was removed between pick and mark
return
}

host.hostR.Mark(err)
}
Loading
Loading