Skip to content

Commit 68ba56e

Browse files
Release V1.0.0-rc3 (#21)
- Fix Unit Tests - Improve Docs - Add Workflow Test - Remove Unneeded Exports
1 parent cac64a8 commit 68ba56e

File tree

7 files changed

+71
-38
lines changed

7 files changed

+71
-38
lines changed

.github/workflows/test.yml

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
name: Build Test
2+
3+
on:
4+
push:
5+
branches:
6+
- "master"
7+
tags:
8+
- "v*"
9+
pull_request:
10+
branches:
11+
- "master"
12+
13+
jobs:
14+
build:
15+
runs-on: ubuntu-latest
16+
steps:
17+
- name: Checkout
18+
uses: actions/checkout@v4
19+
20+
- name: Setup Go 1.22
21+
uses: actions/setup-go@v5
22+
with:
23+
go-version: "1.22"
24+
25+
- name: Test
26+
run: |
27+
sudo apt update -y
28+
sudo apt install -y golang git
29+
go test -v

README.md

+8-7
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
</h4>
88

99
<p align="center">
10-
<img alt="Go version" src="https://img.shields.io/github/go-mod/go-version/JustinTimperio/gpq">
11-
<a href="https://pkg.go.dev/github.com/JustinTimperio/gpq"><img src="https://pkg.go.dev/badge/github.com/JustinTimperio/gpq.svg" alt="Go Reference"></a>
12-
<img alt="GitHub License" src="https://img.shields.io/github/license/JustinTimperio/gpq">
13-
<img alt="GitHub Release" src="https://img.shields.io/github/v/release/JustinTimperio/gpq">
14-
<img alt="GitHub Issues or Pull Requests" src="https://img.shields.io/github/issues/JustinTimperio/gpq">
10+
<a href="https://go.dev/dl/"><img alt="Go version" src="https://img.shields.io/github/go-mod/go-version/JustinTimperio/gpq"></a>
11+
<a href="https://pkg.go.dev/github.com/JustinTimperio/gpq"><img alt="Go Reference" src="https://pkg.go.dev/badge/github.com/JustinTimperio/gpq.svg"></a>
12+
<a href="https://github.com/JustinTimperio/gpq/blob/master/LICENSE"><img alt="GitHub License" src="https://img.shields.io/github/license/JustinTimperio/gpq"></a>
13+
<a href="https://github.com/JustinTimperio/gpq/releases"><img alt="GitHub Release" src="https://img.shields.io/github/v/release/JustinTimperio/gpq"></a>
14+
<a href="https://github.com/JustinTimperio/gpq/issues"><img alt="GitHub Issues" src="https://img.shields.io/github/issues/JustinTimperio/gpq"></a>
15+
<a href="https://github.com/JustinTimperio/gpq/actions"><img alt="GitHub Branch Status" src="https://img.shields.io/github/checks-status/JustinTimperio/gpq/master"></a>
1516
</p>
1617

1718
## Notice
@@ -71,7 +72,7 @@ import "github.com/JustinTimperio/gpq"
7172
```
7273

7374
### Prerequisites
74-
For this you will need Go >= `1.22` and gpq itself uses [hashmap](https://github.com/cornelk/hashmap) and [BadgerDB](https://github.com/dgraph-io/badger).
75+
For this you will need Go >= `1.22` and gpq itself uses [hashmap](https://github.com/cornelk/hashmap), [btree](https://github.com/tidwall/btree) and [BadgerDB](https://github.com/dgraph-io/badger).
7576

7677
### API Reference
7778
- `NewGPQ[d any](options schema.GPQOptions) (uint, *GPQ[d], error)` - Creates a new GPQ with the specified options and returns the number of restored items, the GPQ, and an error if one occurred.
@@ -102,7 +103,7 @@ func main() {
102103
ShouldEscalate: true,
103104
EscalationRate: time.Duration(time.Second),
104105
CanTimeout: true,
105-
Timeout: time.Duration(time.Second * 1),
106+
Timeout: time.Duration(time.Second * 5),
106107
}
107108

108109
opts := schema.GPQOptions{

gpq.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ type GPQ[d any] struct {
3535
// lazyDiskDeleteChan is a channel used to send messages to the lazy disk cache
3636
lazyDiskDeleteChan chan schema.DeleteMessage
3737
// batchHandler allows for synchronization of disk cache batches
38-
batchHandler *BatchHandler[d]
38+
batchHandler *batchHandler[d]
3939
// batchCounter is used to keep track the current batch number
40-
batchCounter *BatchCounter
40+
batchCounter *batchCounter
4141
}
4242

4343
// NewGPQ creates a new GPQ with the given number of buckets
@@ -72,8 +72,8 @@ func NewGPQ[d any](Options schema.GPQOptions) (uint, *GPQ[d], error) {
7272

7373
lazyDiskSendChan: sender,
7474
lazyDiskDeleteChan: receiver,
75-
batchHandler: NewBatchHandler(diskCache),
76-
batchCounter: NewBatchCounter(Options.LazyDiskBatchSize),
75+
batchHandler: newBatchHandler(diskCache),
76+
batchCounter: newBatchCounter(Options.LazyDiskBatchSize),
7777
}
7878

7979
var restored uint
@@ -130,7 +130,7 @@ func (g *GPQ[d]) Enqueue(item schema.Item[d]) error {
130130
item.DiskUUID = key
131131

132132
if g.options.LazyDiskCacheEnabled {
133-
item.BatchNumber = g.batchCounter.Increment()
133+
item.BatchNumber = g.batchCounter.increment()
134134
g.lazyDiskSendChan <- item
135135
} else {
136136
err = g.diskCache.WriteSingle(key, item)
@@ -169,7 +169,7 @@ func (g *GPQ[d]) EnqueueBatch(items []schema.Item[d]) []error {
169169
items[i].DiskUUID = key
170170

171171
if g.options.LazyDiskCacheEnabled {
172-
items[i].BatchNumber = g.batchCounter.Increment()
172+
items[i].BatchNumber = g.batchCounter.increment()
173173
g.lazyDiskSendChan <- items[i]
174174
} else {
175175
err = g.diskCache.WriteSingle(items[i].DiskUUID, items[i])

gpq_base_test.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gpq_test
33
import (
44
"log"
55
"sync"
6+
"sync/atomic"
67
"testing"
78
"time"
89

@@ -113,9 +114,9 @@ func TestPrioritize(t *testing.T) {
113114
}
114115

115116
var (
116-
escalated uint
117-
removed uint
118-
received uint
117+
escalated uint64
118+
removed uint64
119+
received uint64
119120
)
120121

121122
var wg sync.WaitGroup
@@ -135,9 +136,10 @@ func TestPrioritize(t *testing.T) {
135136
if err != nil {
136137
log.Fatalln(err)
137138
}
138-
removed += r
139-
escalated += e
140-
t.Log("Received:", received, "Removed:", removed, "Escalated:", escalated)
139+
140+
atomic.AddUint64(&removed, uint64(r))
141+
atomic.AddUint64(&escalated, uint64(e))
142+
t.Log("Received:", atomic.LoadUint64(&received), "Removed:", atomic.LoadUint64(&removed), "Escalated:", atomic.LoadUint64(&escalated))
141143

142144
case <-shutdown:
143145
break forloop
@@ -164,15 +166,15 @@ func TestPrioritize(t *testing.T) {
164166
go func() {
165167
defer wg.Done()
166168
for {
167-
if received+removed >= tm {
169+
if atomic.LoadUint64(&received)+atomic.LoadUint64(&removed) >= uint64(tm) {
168170
break
169171
}
170172
time.Sleep(time.Millisecond * 10)
171173
_, err := queue.Dequeue()
172174
if err != nil {
173175
continue
174176
}
175-
received++
177+
atomic.AddUint64(&received, 1)
176178
}
177179
t.Log("Dequeued all items")
178180
shutdown <- struct{}{}

gpq_e2e_test.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ func TestE2E(t *testing.T) {
5252

5353
var (
5454
received uint64
55-
removed uint
56-
escalated uint
55+
removed uint64
56+
escalated uint64
5757
)
5858

5959
var wg sync.WaitGroup
@@ -72,9 +72,10 @@ func TestE2E(t *testing.T) {
7272
if err != nil {
7373
log.Fatalln(err)
7474
}
75-
removed += r
76-
escalated += e
77-
t.Log("Received:", received, "Removed:", removed, "Escalated:", escalated)
75+
76+
atomic.AddUint64(&received, uint64(r))
77+
atomic.AddUint64(&escalated, uint64(e))
78+
t.Log("Received:", atomic.LoadUint64(&received), "Removed:", atomic.LoadUint64(&removed), "Escalated:", atomic.LoadUint64(&escalated))
7879

7980
case <-shutdown:
8081
break breaker
@@ -111,7 +112,7 @@ func TestE2E(t *testing.T) {
111112
go func() {
112113
defer wg.Done()
113114
for {
114-
if atomic.LoadUint64(&received)+uint64(removed) >= total {
115+
if atomic.LoadUint64(&received)+atomic.LoadUint64(&removed) >= total {
115116
break
116117
}
117118
items, err := queue.DequeueBatch(batchSize)

gpq_parallel_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
// Tests pushing and pulling single messages in parallel
1414
func TestSingleParallel(t *testing.T) {
1515
var (
16-
total uint = 10_000_000
16+
total uint = 1_000_000
1717
syncToDisk bool = false
1818
lazySync bool = false
1919
maxBuckets uint = 10
@@ -95,7 +95,7 @@ func TestSingleParallel(t *testing.T) {
9595
// Tests pushing and pulling batches of messages in parallel
9696
func TestBatchParallel(t *testing.T) {
9797
var (
98-
total uint = 10_000_000
98+
total uint = 1_000_000
9999
syncToDisk bool = false
100100
lazySync bool = false
101101
maxBuckets uint = 10

helpers.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,23 @@ import (
77
"github.com/JustinTimperio/gpq/schema"
88
)
99

10-
type BatchHandler[T any] struct {
10+
type batchHandler[T any] struct {
1111
mux *sync.Mutex
1212
syncedBatches map[uint]bool
1313
deletedBatches map[uint]bool
1414
diskCache *disk.Disk[T]
1515
}
1616

17-
func NewBatchHandler[T any](diskCache *disk.Disk[T]) *BatchHandler[T] {
18-
return &BatchHandler[T]{
17+
func newBatchHandler[T any](diskCache *disk.Disk[T]) *batchHandler[T] {
18+
return &batchHandler[T]{
1919
mux: &sync.Mutex{},
2020
syncedBatches: make(map[uint]bool),
2121
deletedBatches: make(map[uint]bool),
2222
diskCache: diskCache,
2323
}
2424
}
2525

26-
func (bh *BatchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uint) {
26+
func (bh *batchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uint) {
2727
bh.mux.Lock()
2828
defer bh.mux.Unlock()
2929

@@ -36,7 +36,7 @@ func (bh *BatchHandler[T]) processBatch(batch []*schema.Item[T], batchNumber uin
3636
bh.deletedBatches[batchNumber] = false
3737
}
3838

39-
func (bh *BatchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumber uint, wasRestored bool) {
39+
func (bh *batchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumber uint, wasRestored bool) {
4040
bh.mux.Lock()
4141
defer bh.mux.Unlock()
4242

@@ -55,23 +55,23 @@ func (bh *BatchHandler[T]) deleteBatch(batch []*schema.DeleteMessage, batchNumbe
5555

5656
}
5757

58-
type BatchCounter struct {
58+
type batchCounter struct {
5959
mux *sync.Mutex
6060
batchNumber uint
6161
batchCounter uint
6262
batchSize uint
6363
}
6464

65-
func NewBatchCounter(batchSize uint) *BatchCounter {
66-
return &BatchCounter{
65+
func newBatchCounter(batchSize uint) *batchCounter {
66+
return &batchCounter{
6767
mux: &sync.Mutex{},
6868
batchNumber: 0,
6969
batchCounter: 0,
7070
batchSize: batchSize,
7171
}
7272
}
7373

74-
func (bc *BatchCounter) Increment() (batchNumber uint) {
74+
func (bc *batchCounter) increment() (batchNumber uint) {
7575
bc.mux.Lock()
7676
defer bc.mux.Unlock()
7777

0 commit comments

Comments
 (0)