Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel Write and XOF #25

Merged
merged 3 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 86 additions & 18 deletions blake3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
package blake3 // import "lukechampine.com/blake3"

import (
"bytes"
"encoding/binary"
"errors"
"hash"
"io"
"math"
"math/bits"
"runtime"
"sync"

"lukechampine.com/blake3/bao"
"lukechampine.com/blake3/guts"
Expand All @@ -20,32 +23,32 @@ type Hasher struct {
size int // output size, for Sum

// log(n) set of Merkle subtree roots, at most one per height.
stack [64 - (guts.MaxSIMD + 10)][8]uint32 // 10 = log2(guts.ChunkSize)
counter uint64 // number of buffers hashed; also serves as a bit vector indicating which stack elems are occupied
stack [64][8]uint32
counter uint64 // number of buffers hashed; also serves as a bit vector indicating which stack elems are occupied

buf [guts.MaxSIMD * guts.ChunkSize]byte
buf [guts.ChunkSize]byte
buflen int
}

// hasSubtreeAtHeight reports whether a subtree root is stored at stack
// height i, as indicated by bit i of the counter bit vector.
func (h *Hasher) hasSubtreeAtHeight(i int) bool {
	return (h.counter>>i)&1 == 1
}

func (h *Hasher) pushSubtree(cv [8]uint32) {
func (h *Hasher) pushSubtree(cv [8]uint32, height int) {
// seek to first open stack slot, merging subtrees as we go
i := 0
i := height
for h.hasSubtreeAtHeight(i) {
cv = guts.ChainingValue(guts.ParentNode(h.stack[i], cv, &h.key, h.flags))
i++
}
h.stack[i] = cv
h.counter++
h.counter += 1 << height
}

// rootNode computes the root of the Merkle tree. It does not modify the
// stack.
func (h *Hasher) rootNode() guts.Node {
n := guts.CompressBuffer(&h.buf, h.buflen, &h.key, h.counter*guts.MaxSIMD, h.flags)
n := guts.CompressChunk(h.buf[:h.buflen], &h.key, h.counter, h.flags)
for i := bits.TrailingZeros64(h.counter); i < bits.Len64(h.counter); i++ {
if h.hasSubtreeAtHeight(i) {
n = guts.ParentNode(h.stack[i], guts.ChainingValue(n), &h.key, h.flags)
Expand All @@ -58,16 +61,49 @@ func (h *Hasher) rootNode() guts.Node {
// Write implements hash.Hash.
func (h *Hasher) Write(p []byte) (int, error) {
lenp := len(p)
for len(p) > 0 {
if h.buflen == len(h.buf) {
n := guts.CompressBuffer(&h.buf, h.buflen, &h.key, h.counter*guts.MaxSIMD, h.flags)
h.pushSubtree(guts.ChainingValue(n))
h.buflen = 0
}

// align to chunk boundary
if h.buflen > 0 {
n := copy(h.buf[h.buflen:], p)
h.buflen += n
p = p[n:]
}
if h.buflen == len(h.buf) {
n := guts.CompressChunk(h.buf[:], &h.key, h.counter, h.flags)
h.pushSubtree(guts.ChainingValue(n), 0)
h.buflen = 0
}

// process full chunks
if len(p) > len(h.buf) {
rem := len(p) % len(h.buf)
if rem == 0 {
rem = len(h.buf) // don't prematurely compress
}
eigenbuf := bytes.NewBuffer(p[:len(p)-rem])
trees := guts.Eigentrees(h.counter, uint64(eigenbuf.Len()/guts.ChunkSize))
cvs := make([][8]uint32, len(trees))
counter := h.counter
var wg sync.WaitGroup
for i, height := range trees {
wg.Add(1)
go func(i int, buf []byte, counter uint64) {
defer wg.Done()
cvs[i] = guts.ChainingValue(guts.CompressEigentree(buf, &h.key, counter, h.flags))
}(i, eigenbuf.Next((1<<height)*guts.ChunkSize), counter)
counter += 1 << height
}
wg.Wait()
for i, height := range trees {
h.pushSubtree(cvs[i], height)
}
p = p[len(p)-rem:]
}

// buffer remaining partial chunk
n := copy(h.buf[h.buflen:], p)
h.buflen += n

return lenp, nil
}

Expand Down Expand Up @@ -211,14 +247,46 @@ func (or *OutputReader) Read(p []byte) (int, error) {
p = p[:rem]
}
lenp := len(p)

// drain existing buffer
const bufsize = guts.MaxSIMD * guts.BlockSize
if or.off%bufsize != 0 {
n := copy(p, or.buf[or.off%bufsize:])
p = p[n:]
or.off += uint64(n)
}

for len(p) > 0 {
if or.off%(guts.MaxSIMD*guts.BlockSize) == 0 {
or.n.Counter = or.off / guts.BlockSize
or.n.Counter = or.off / guts.BlockSize
if numBufs := len(p) / len(or.buf); numBufs < 1 {
guts.CompressBlocks(&or.buf, or.n)
n := copy(p, or.buf[or.off%bufsize:])
p = p[n:]
or.off += uint64(n)
} else if numBufs == 1 {
guts.CompressBlocks((*[bufsize]byte)(p), or.n)
p = p[bufsize:]
or.off += bufsize
} else {
// parallelize
par := min(numBufs, runtime.NumCPU())
per := uint64(numBufs / par)
var wg sync.WaitGroup
for range par {
wg.Add(1)
go func(p []byte, n guts.Node) {
defer wg.Done()
for i := range per {
guts.CompressBlocks((*[bufsize]byte)(p[i*bufsize:]), n)
n.Counter += bufsize / guts.BlockSize
}
}(p, or.n)
p = p[per*bufsize:]
or.off += per * bufsize
or.n.Counter = or.off / guts.BlockSize
}
wg.Wait()
}
n := copy(p, or.buf[or.off%(guts.MaxSIMD*guts.BlockSize):])
p = p[n:]
or.off += uint64(n)
}
return lenp, nil
}
Expand Down
84 changes: 57 additions & 27 deletions blake3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"
"testing"

"lukechampine.com/blake3"
"lukechampine.com/blake3/guts"
)

func toHex(data []byte) string { return hex.EncodeToString(data) }
Expand Down Expand Up @@ -106,6 +108,24 @@ func TestXOF(t *testing.T) {
}
}
}

{
// test multiple-buffer output
golden := make([]byte, 1<<20)
n := guts.CompressChunk(nil, &guts.IV, 0, 0)
n.Flags |= guts.FlagRoot
for i := 0; i < len(golden); i += 64 {
block := guts.WordsToBytes(guts.CompressNode(n))
copy(golden[i:], block[:])
n.Counter++
}
got := make([]byte, 1<<20)
blake3.New(0, nil).XOF().Read(got)
if !bytes.Equal(golden, got) {
t.Error("XOF output did not match golden output")
}
}

// test behavior at end of stream
xof := blake3.New(0, nil).XOF()
buf := make([]byte, 1024)
Expand Down Expand Up @@ -184,6 +204,21 @@ func TestReset(t *testing.T) {
}
}

// TestEigentrees checks that, for every (counter, chunks) pair, the returned
// eigentree heights sum to exactly chunks.
func TestEigentrees(t *testing.T) {
	for counter := uint64(0); counter < 64; counter++ {
		for chunks := uint64(0); chunks < 64; chunks++ {
			trees := guts.Eigentrees(counter, chunks)
			total := counter
			for _, height := range trees {
				total += 1 << height
			}
			if total != counter+chunks {
				t.Errorf("Wrong eigentrees for %v, %v: %v", counter, chunks, trees)
			}
		}
	}
}

// nopReader is an io.Reader that claims to fill p without writing anything.
type nopReader struct{}

// Read reports the full buffer length as read and never fails.
func (nopReader) Read(p []byte) (int, error) {
	return len(p), nil
}
Expand All @@ -195,34 +230,29 @@ func BenchmarkWrite(b *testing.B) {
}

func BenchmarkXOF(b *testing.B) {
b.ReportAllocs()
b.SetBytes(1024)
io.CopyN(io.Discard, blake3.New(0, nil).XOF(), int64(b.N*1024))
for _, size := range []int64{64, 1024, 65536, 1048576} {
b.Run(fmt.Sprint(size), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(size)
buf := make([]byte, size)
xof := blake3.New(0, nil).XOF()
for i := 0; i < b.N; i++ {
xof.Seek(0, 0)
xof.Read(buf)
}
})
}
}

// BenchmarkSum256 measures one-shot hashing throughput at several input
// sizes.
//
// NOTE: the scraped diff interleaved the deleted three hand-written b.Run
// blocks with the new table-driven loop; this is the reconstructed
// post-merge version.
func BenchmarkSum256(b *testing.B) {
	for _, size := range []int64{64, 1024, 65536, 1048576} {
		b.Run(fmt.Sprint(size), func(b *testing.B) {
			b.ReportAllocs()
			b.SetBytes(size)
			buf := make([]byte, size)
			for i := 0; i < b.N; i++ {
				blake3.Sum256(buf)
			}
		})
	}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module lukechampine.com/blake3

go 1.22

require github.com/klauspost/cpuid/v2 v2.0.9
4 changes: 3 additions & 1 deletion guts/compress_amd64.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package guts

import "unsafe"
import (
"unsafe"
)

//go:generate go run avo/gen.go -out blake3_amd64.s

Expand Down
4 changes: 3 additions & 1 deletion guts/compress_noasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package guts

import "encoding/binary"
import (
"encoding/binary"
)

// CompressBuffer compresses up to MaxSIMD chunks in parallel and returns their
// root node.
Expand Down
54 changes: 54 additions & 0 deletions guts/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
// function.
package guts

import (
"math/bits"
"sync"
)

// Various constants.
const (
FlagChunkStart = 1 << iota
Expand Down Expand Up @@ -46,3 +51,52 @@ func ParentNode(left, right [8]uint32, key *[8]uint32, flags uint32) Node {
copy(n.Block[8:], right[:])
return n
}

// Eigentrees returns the sequence of eigentree heights that increment counter
// to counter+chunks. Each height h covers a power-of-two subtree of 1<<h
// chunks aligned to its own size.
func Eigentrees(counter uint64, chunks uint64) (trees []int) {
	end := counter + chunks
	for pos := counter; pos < end; {
		// The subtree rooted at pos is limited both by the alignment of
		// pos (its trailing zeros) and by the chunks remaining before end.
		height := bits.TrailingZeros64(pos)
		if maxFit := bits.Len64(end-pos) - 1; maxFit < height {
			height = maxFit
		}
		trees = append(trees, height)
		pos += 1 << height
	}
	return
}

// CompressEigentree compresses a buffer of 2^n chunks in parallel, returning
// their root node.
//
// buf must contain a power-of-two number of whole chunks; counter is the
// chunk index of buf's first chunk.
func CompressEigentree(buf []byte, key *[8]uint32, counter uint64, flags uint32) Node {
	if numChunks := uint64(len(buf) / ChunkSize); bits.OnesCount64(numChunks) != 1 {
		// Caller bug: eigentrees are power-of-two sized by construction.
		panic("non-power-of-two eigentree size")
	} else if numChunks == 1 {
		// A single chunk compresses directly; no parent nodes are needed.
		return CompressChunk(buf, key, counter, flags)
	} else if numChunks <= MaxSIMD {
		// Small enough for one CompressBuffer call, which requires a full
		// MaxSIMD*ChunkSize array; pad buf out if its capacity is too small,
		// while buflen records the true data length.
		// NOTE(review): when cap(buf) >= MaxSIMD*ChunkSize but
		// len(buf) < MaxSIMD*ChunkSize, the reslice exposes bytes beyond
		// len(buf) from the caller's backing array — presumably
		// CompressBuffer reads only buflen bytes; confirm.
		buflen := len(buf)
		if cap(buf) < MaxSIMD*ChunkSize {
			buf = append(buf, make([]byte, MaxSIMD*ChunkSize-len(buf))...)
		}
		return CompressBuffer((*[MaxSIMD * ChunkSize]byte)(buf[:MaxSIMD*ChunkSize]), buflen, key, counter, flags)
	} else {
		// More than MaxSIMD chunks: compress each MaxSIMD-chunk group in its
		// own goroutine. cvs[i] is written only by goroutine i, so no
		// further synchronization beyond the WaitGroup is needed.
		cvs := make([][8]uint32, numChunks/MaxSIMD)
		var wg sync.WaitGroup
		for i := range cvs {
			wg.Add(1)
			go func(i uint64) {
				defer wg.Done()
				cvs[i] = ChainingValue(CompressBuffer((*[MaxSIMD * ChunkSize]byte)(buf[i*MaxSIMD*ChunkSize:]), MaxSIMD*ChunkSize, key, counter+(MaxSIMD*i), flags))
			}(uint64(i))
		}
		wg.Wait()

		// Recursively merge the group CVs pairwise into a single root node.
		// len(cvs) is a power of two, so each split is even; runs of exactly
		// MaxSIMD CVs take the mergeSubtrees fast path.
		var rec func(cvs [][8]uint32) Node
		rec = func(cvs [][8]uint32) Node {
			if len(cvs) == 2 {
				return ParentNode(cvs[0], cvs[1], key, flags)
			} else if len(cvs) == MaxSIMD {
				return mergeSubtrees((*[MaxSIMD][8]uint32)(cvs), MaxSIMD, key, flags)
			}
			return ParentNode(ChainingValue(rec(cvs[:len(cvs)/2])), ChainingValue(rec(cvs[len(cvs)/2:])), key, flags)
		}
		return rec(cvs)
	}
}