Skip to content

Commit 038bf27

Browse files
feat: mailman_queue for mailman
1 parent 615f058 commit 038bf27

File tree

13 files changed

+280
-37
lines changed

13 files changed

+280
-37
lines changed

cmd/smtpbridge/main.go

+9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/ItsNotGoodName/smtpbridge/internal/database"
1616
"github.com/ItsNotGoodName/smtpbridge/internal/file"
1717
"github.com/ItsNotGoodName/smtpbridge/internal/mailman"
18+
"github.com/ItsNotGoodName/smtpbridge/internal/models"
1819
"github.com/ItsNotGoodName/smtpbridge/internal/repo"
1920
"github.com/ItsNotGoodName/smtpbridge/migrations"
2021
"github.com/ItsNotGoodName/smtpbridge/pkg/secret"
@@ -96,6 +97,14 @@ func run(flags *flag.FlagSet) lieut.Executor {
9697
webFileStore := app.NewWebFileStore("apple-touch-icon.png", fmt.Sprintf("http://127.0.0.1:%d", cfg.HTTPPort))
9798
app := app.New(db, fileStore, bus, cfg.Config, cfg.EndpointFactory, webFileStore)
9899

100+
// TODO: move this somewhere else
101+
{
102+
release := bus.OnEnvelopeCreated(func(ctx context.Context, evt models.EventEnvelopeCreated) error {
103+
return app.MailmanEnqueue(ctx, evt.ID)
104+
})
105+
defer release()
106+
}
107+
99108
// Supervisor
100109
super := suture.New("root", suture.Spec{
101110
EventHook: sutureext.EventHook(),

internal/app/app.go

+32
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package app
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"time"
@@ -359,4 +360,35 @@ func (a App) RetentionPolicyRun(ctx context.Context, tracer trace.Tracer) error
359360
return nil
360361
}
361362

363+
func (a App) MailmanDequeue(ctx context.Context) (*models.Envelope, error) {
364+
envelopeID, err := repo.MailmanDequeue(ctx, a.db)
365+
if err != nil {
366+
if errors.Is(err, repo.ErrNoRows) {
367+
return nil, nil
368+
}
369+
return nil, err
370+
}
371+
372+
env, err := repo.EnvelopeGet(ctx, a.db, envelopeID)
373+
if err != nil {
374+
if errors.Is(err, repo.ErrNoRows) {
375+
return nil, nil
376+
}
377+
return nil, err
378+
}
379+
380+
return &env, nil
381+
}
382+
383+
func (a App) MailmanEnqueue(ctx context.Context, envelopeID int64) error {
384+
err := repo.MailmanEnqueue(ctx, a.db, envelopeID)
385+
if err != nil {
386+
return err
387+
}
388+
389+
a.bus.MailmanEnqueued(ctx)
390+
391+
return nil
392+
}
393+
362394
var _ core.App = App{}

internal/bus/bus.go

+30-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,15 @@ import (
77
"github.com/ItsNotGoodName/smtpbridge/internal/models"
88
"github.com/google/uuid"
99
"github.com/mustafaturan/bus/v3"
10+
"github.com/rs/zerolog/log"
1011
)
1112

13+
func logEmitErr(err error) {
14+
if err != nil {
15+
log.Err(err).Msg("Failed to emit bus event")
16+
}
17+
}
18+
1219
type generator struct{}
1320

1421
func (generator) Generate() string {
@@ -28,6 +35,7 @@ func New() (Bus, error) {
2835
bus.RegisterTopics(
2936
TopicEnvelopeCreated,
3037
TopicEnvelopeDeleted,
38+
TopicMailmanEnqueued,
3139
)
3240

3341
return Bus{
@@ -38,11 +46,12 @@ func New() (Bus, error) {
3846
const (
3947
TopicEnvelopeCreated = "envelope.created"
4048
TopicEnvelopeDeleted = "envelope.deleted"
49+
TopicMailmanEnqueued = "mailman.enqueued"
4150
)
4251

4352
// EnvelopeCreated implements core.Bus.
4453
func (b Bus) EnvelopeCreated(ctx context.Context, id int64) {
45-
b.bus.Emit(ctx, TopicEnvelopeCreated, id)
54+
logEmitErr(b.bus.Emit(ctx, TopicEnvelopeCreated, id))
4655
}
4756

4857
// OnEnvelopeCreated implements core.Bus.
@@ -64,7 +73,7 @@ func (b Bus) OnEnvelopeCreated(h func(ctx context.Context, evt models.EventEnvel
6473

6574
// EnvelopeDeleted implements core.Bus.
6675
func (b Bus) EnvelopeDeleted(ctx context.Context) {
67-
b.bus.Emit(ctx, TopicEnvelopeDeleted, nil)
76+
logEmitErr(b.bus.Emit(ctx, TopicEnvelopeDeleted, nil))
6877
}
6978

7079
// OnEnvelopeDeleted implements core.Bus.
@@ -81,4 +90,23 @@ func (b Bus) OnEnvelopeDeleted(h func(ctx context.Context, evt models.EventEnvel
8190
return func() { b.bus.DeregisterHandler(key) }
8291
}
8392

93+
// MailmanEnqueued implements core.Bus.
94+
func (b Bus) MailmanEnqueued(ctx context.Context) {
95+
logEmitErr(b.bus.Emit(ctx, TopicMailmanEnqueued, nil))
96+
}
97+
98+
// OnMailmanEnqueued implements core.Bus.
99+
func (b Bus) OnMailmanEnqueued(h func(ctx context.Context, evt models.EventMailmanEnqueued) error) func() {
100+
key := uuid.NewString()
101+
102+
b.bus.RegisterHandler(key, bus.Handler{
103+
Handle: func(ctx context.Context, e bus.Event) {
104+
h(ctx, models.EventMailmanEnqueued{})
105+
},
106+
Matcher: TopicMailmanEnqueued,
107+
})
108+
109+
return func() { b.bus.DeregisterHandler(key) }
110+
}
111+
84112
var _ core.Bus = Bus{}

internal/core/app.go

+4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ type Bus interface {
1313
OnEnvelopeCreated(func(ctx context.Context, evt models.EventEnvelopeCreated) error) func()
1414
EnvelopeDeleted(ctx context.Context)
1515
OnEnvelopeDeleted(func(ctx context.Context, evt models.EventEnvelopeDeleted) error) func()
16+
MailmanEnqueued(ctx context.Context)
17+
OnMailmanEnqueued(func(ctx context.Context, evt models.EventMailmanEnqueued) error) func()
1618
}
1719

1820
type App interface {
@@ -46,4 +48,6 @@ type App interface {
4648
TraceDrop(ctx context.Context) error
4749
TraceList(ctx context.Context, page pagination.Page, req models.DTOTraceListRequest) (models.DTOTraceListResult, error)
4850
Tracer(source string) trace.Tracer
51+
MailmanEnqueue(ctx context.Context, envelopeID int64) error
52+
MailmanDequeue(ctx context.Context) (*models.Envelope, error)
4953
}

internal/jet/table/mailman_queue.go

+78
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/jet/table/table_use_schema.go

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/mailman/mailman.go

+31-32
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package mailman
22

33
import (
44
"context"
5-
"fmt"
65
"time"
76

87
"github.com/ItsNotGoodName/smtpbridge/internal/core"
@@ -18,7 +17,6 @@ type Mailman struct {
1817
app core.App
1918
fileStore endpoint.FileStore
2019
endpointFactory endpoint.Factory
21-
queueLimit int
2220
}
2321

2422
func New(app core.App, bus core.Bus, fileStore endpoint.FileStore, endpointFactory endpoint.Factory) Mailman {
@@ -27,20 +25,15 @@ func New(app core.App, bus core.Bus, fileStore endpoint.FileStore, endpointFacto
2725
bus: bus,
2826
fileStore: fileStore,
2927
endpointFactory: endpointFactory,
30-
queueLimit: 100,
3128
}
3229
}
3330

3431
func (m Mailman) Serve(ctx context.Context) error {
35-
idC := make(chan int64, m.queueLimit)
36-
release := m.bus.OnEnvelopeCreated(func(ctx context.Context, evt models.EventEnvelopeCreated) error {
32+
checkC := make(chan struct{}, 1)
33+
release := m.bus.OnMailmanEnqueued(func(ctx context.Context, evt models.EventMailmanEnqueued) error {
3734
select {
38-
case idC <- evt.ID:
35+
case checkC <- struct{}{}:
3936
default:
40-
m.app.Tracer(trace.SourceMailman).Trace(ctx,
41-
"mailman.overflow",
42-
trace.WithEnvelope(evt.ID),
43-
trace.WithError(fmt.Errorf("mailman is full")))
4437
}
4538

4639
return nil
@@ -51,36 +44,42 @@ func (m Mailman) Serve(ctx context.Context) error {
5144
select {
5245
case <-ctx.Done():
5346
return nil
54-
case id := <-idC:
55-
tracer := m.app.Tracer(trace.SourceMailman).
56-
Sticky(trace.WithEnvelope(id))
57-
58-
tracer.Trace(ctx, "mailman.start")
59-
err := m.send(ctx, tracer, id)
60-
if err != nil {
61-
tracer.Trace(ctx, "mailman.error", trace.WithError(err))
62-
log.Err(err).Int64("envelope-id", id).Msg("Failed to send envelope")
47+
case <-checkC:
48+
for {
49+
tracer := m.app.Tracer(trace.SourceMailman)
50+
51+
maybeEnv, err := m.app.MailmanDequeue(ctx)
52+
if err != nil {
53+
tracer.Trace(ctx, "mailman.dequeue", trace.WithError(err))
54+
break
55+
}
56+
if maybeEnv == nil {
57+
break
58+
}
59+
env := *maybeEnv
60+
61+
tracer = tracer.Sticky(trace.WithEnvelope(env.Message.ID))
62+
63+
tracer.Trace(ctx, "mailman.start")
64+
if err := m.send(ctx, tracer, env); err != nil {
65+
tracer.Trace(ctx, "mailman.error", trace.WithError(err))
66+
log.Err(err).Int64("envelope-id", env.Message.ID).Msg("Failed to send envelope")
67+
}
68+
tracer.Trace(ctx, "mailman.end")
6369
}
64-
tracer.Trace(ctx, "mailman.end")
6570
}
6671
}
6772
}
6873

69-
func (m Mailman) send(ctx context.Context, tracer trace.Tracer, envelopeID int64) error {
70-
// Get envelope
71-
env, err := m.app.EnvelopeGet(ctx, envelopeID)
72-
if err != nil {
73-
return err
74-
}
75-
74+
func (m Mailman) send(ctx context.Context, tracer trace.Tracer, env models.Envelope) error {
7675
// List all rules
7776
rules, err := m.app.RuleEndpointsList(ctx)
7877
if err != nil {
7978
return err
8079
}
8180

8281
if len(rules) == 0 {
83-
tracer.Trace(ctx, "mailman.rules.skip.empty")
82+
tracer.Trace(ctx, "mailman.rules.skip(empty)")
8483
return nil
8584
}
8685

@@ -89,7 +88,7 @@ func (m Mailman) send(ctx context.Context, tracer trace.Tracer, envelopeID int64
8988
tracer := tracer.Sticky(trace.WithRule(r.Rule.ID))
9089

9190
if len(r.Endpoints) == 0 {
92-
tracer.Trace(ctx, "mailman.rule.endpoints.skip.empty")
91+
tracer.Trace(ctx, "mailman.rule.endpoints.skip(empty)")
9392
continue
9493
}
9594

@@ -111,14 +110,14 @@ func (m Mailman) send(ctx context.Context, tracer trace.Tracer, envelopeID int64
111110
continue
112111
}
113112

114-
tracer.Trace(ctx, "mailman.rule.match.pass")
113+
tracer.Trace(ctx, "mailman.rule.match.success")
115114

116115
for _, e := range r.Endpoints {
117116
tracer := tracer.Sticky(trace.WithEndpoint(e.ID))
118117

119118
// Prevent duplicate envelopes
120119
if _, ok := sent[e.ID]; ok {
121-
tracer.Trace(ctx, "mailman.rule.endpoint.skip.duplicate")
120+
tracer.Trace(ctx, "mailman.rule.endpoint.skip(duplicate)")
122121
continue
123122
}
124123
sent[e.ID] = struct{}{}
@@ -137,7 +136,7 @@ func (m Mailman) send(ctx context.Context, tracer trace.Tracer, envelopeID int64
137136
if err != nil {
138137
tracer.Trace(ctx, "mailman.rule.endpoint.send.error", trace.WithError(err), trace.WithDuration(time.Now().Sub(start)))
139138
} else {
140-
tracer.Trace(ctx, "mailman.rule.endpoint.send", trace.WithDuration(time.Now().Sub(start)))
139+
tracer.Trace(ctx, "mailman.rule.endpoint.send.success", trace.WithDuration(time.Now().Sub(start)))
141140
}
142141
}
143142
}

internal/models/event.go

+3
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ type EventEnvelopeCreated struct {
66

77
type EventEnvelopeDeleted struct {
88
}
9+
10+
type EventMailmanEnqueued struct {
11+
}

0 commit comments

Comments
 (0)