Skip to content

Commit 7da18c5

Browse files
authored
Merge pull request #196 from redpanda-data/add-batch-sync-res
Add batch aware sync response method
2 parents 9c2a174 + c343110 commit 7da18c5

File tree

3 files changed

+71
-0
lines changed

3 files changed

+71
-0
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ Changelog
33

44
All notable changes to this project will be documented in this file.
55

6+
## 4.46.0 - 2025-03-20
7+
8+
### Added
9+
10+
- Go API: New batch aware API for capturing synchronous responses from downstream components. (@Jeffail)
11+
612
## 4.45.1 - 2025-03-10
713

814
### Fixed

public/service/message.go

+16
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,22 @@ func (m *Message) AddSyncResponse() error {
690690
return transaction.SetAsResponse(message.Batch{m.part})
691691
}
692692

693+
// WithSyncResponseStore returns a modified message batch and a response store
694+
// associated with all messages within it. If any message of the batch is sent
695+
// through a processing pipeline or output there is the potential for sync
696+
// response components to add messages to the store, which can be consumed once
697+
// an acknowledgement is received.
698+
func (b MessageBatch) WithSyncResponseStore() (MessageBatch, *SyncResponseStore) {
699+
resStore := transaction.NewResultStore()
700+
701+
newB := b.Copy()
702+
for i, m := range newB {
703+
m.part = transaction.AddResultStoreMsg(b[i].part, resStore)
704+
}
705+
706+
return newB, &SyncResponseStore{s: resStore}
707+
}
708+
693709
// AddSyncResponse attempts to add this batch of messages, in its exact current
694710
// condition, to the synchronous response destined for the original source input
695711
// of this data. Synchronous responses aren't supported by all inputs, and so

public/service/message_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -519,3 +519,52 @@ func TestSyncResponse(t *testing.T) {
519519
require.NoError(t, err)
520520
assert.Equal(t, "hello world c", string(data))
521521
}
522+
523+
func TestSyncResponseBatched(t *testing.T) {
524+
batchA := MessageBatch{
525+
NewMessage([]byte("hello world a 1")),
526+
NewMessage([]byte("hello world a 2")),
527+
NewMessage([]byte("hello world a 3")),
528+
}
529+
530+
batchB, storeB := batchA.WithSyncResponseStore()
531+
batchB[0].SetBytes([]byte("hello world b 1"))
532+
batchB[1].SetBytes([]byte("hello world b 2"))
533+
batchB[2].SetBytes([]byte("hello world b 3"))
534+
535+
require.Error(t, batchA.AddSyncResponse())
536+
require.NoError(t, batchB.AddSyncResponse())
537+
538+
batchC := batchB.Copy()
539+
batchC[1].SetBytes([]byte("hello world c 2"))
540+
require.NoError(t, batchC.AddSyncResponse())
541+
542+
batchD := batchA.Copy()
543+
batchD[1].SetBytes([]byte("hello world d 2"))
544+
require.Error(t, batchD.AddSyncResponse())
545+
546+
resBatches := storeB.Read()
547+
require.Len(t, resBatches, 2)
548+
require.Len(t, resBatches[0], 3)
549+
require.Len(t, resBatches[1], 3)
550+
551+
for i, c := range []string{
552+
"hello world b 1",
553+
"hello world b 2",
554+
"hello world b 3",
555+
} {
556+
data, err := resBatches[0][i].AsBytes()
557+
require.NoError(t, err)
558+
assert.Equal(t, c, string(data))
559+
}
560+
561+
for i, c := range []string{
562+
"hello world b 1",
563+
"hello world c 2",
564+
"hello world b 3",
565+
} {
566+
data, err := resBatches[1][i].AsBytes()
567+
require.NoError(t, err)
568+
assert.Equal(t, c, string(data))
569+
}
570+
}

0 commit comments

Comments
 (0)