Merge pull request #25 from lukechampine/parallel
Parallel Write and XOF
lukechampine authored Feb 24, 2025
2 parents 4f5562c + 84bf553 commit 02493b4
Showing 6 changed files with 204 additions and 48 deletions.
104 changes: 86 additions & 18 deletions blake3.go
@@ -2,12 +2,15 @@
package blake3 // import "lukechampine.com/blake3"

import (
"bytes"
"encoding/binary"
"errors"
"hash"
"io"
"math"
"math/bits"
"runtime"
"sync"

"lukechampine.com/blake3/bao"
"lukechampine.com/blake3/guts"
@@ -20,32 +23,32 @@ type Hasher struct {
size int // output size, for Sum

// log(n) set of Merkle subtree roots, at most one per height.
stack [64 - (guts.MaxSIMD + 10)][8]uint32 // 10 = log2(guts.ChunkSize)
counter uint64 // number of buffers hashed; also serves as a bit vector indicating which stack elems are occupied
stack [64][8]uint32
counter uint64 // number of buffers hashed; also serves as a bit vector indicating which stack elems are occupied

buf [guts.MaxSIMD * guts.ChunkSize]byte
buf [guts.ChunkSize]byte
buflen int
}
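Since Write now compresses full chunks directly from the caller's slice, the Hasher seemingly only needs to buffer a single partial chunk; hence buf shrinks from MaxSIMD*ChunkSize to ChunkSize, and the subtree stack is sized for all 64 heights because subtrees can now be pushed at any height, not just height 0.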

func (h *Hasher) hasSubtreeAtHeight(i int) bool {
return h.counter&(1<<i) != 0
}

func (h *Hasher) pushSubtree(cv [8]uint32) {
func (h *Hasher) pushSubtree(cv [8]uint32, height int) {
// seek to first open stack slot, merging subtrees as we go
i := 0
i := height
for h.hasSubtreeAtHeight(i) {
cv = guts.ChainingValue(guts.ParentNode(h.stack[i], cv, &h.key, h.flags))
i++
}
h.stack[i] = cv
h.counter++
h.counter += 1 << height
}
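A sketch of how the counter doubles as an occupancy bit vector (assuming a hasher that has already absorbed 6 chunks, so h.counter == 0b110 with 2- and 4-chunk subtrees in slots 1 and 2):

// pushSubtree(cv, 1) finds slot 1 occupied, merges, carries into slot 2,
// merges again, stores the combined subtree in slot 3, and adds 1<<1 to the
// counter, leaving h.counter == 0b1000: a single 8-chunk subtree.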

// rootNode computes the root of the Merkle tree. It does not modify the
// stack.
func (h *Hasher) rootNode() guts.Node {
n := guts.CompressBuffer(&h.buf, h.buflen, &h.key, h.counter*guts.MaxSIMD, h.flags)
n := guts.CompressChunk(h.buf[:h.buflen], &h.key, h.counter, h.flags)
for i := bits.TrailingZeros64(h.counter); i < bits.Len64(h.counter); i++ {
if h.hasSubtreeAtHeight(i) {
n = guts.ParentNode(h.stack[i], guts.ChainingValue(n), &h.key, h.flags)
@@ -58,16 +61,49 @@ func (h *Hasher) rootNode() guts.Node {
// Write implements hash.Hash.
func (h *Hasher) Write(p []byte) (int, error) {
lenp := len(p)
for len(p) > 0 {
if h.buflen == len(h.buf) {
n := guts.CompressBuffer(&h.buf, h.buflen, &h.key, h.counter*guts.MaxSIMD, h.flags)
h.pushSubtree(guts.ChainingValue(n))
h.buflen = 0
}

// align to chunk boundary
if h.buflen > 0 {
n := copy(h.buf[h.buflen:], p)
h.buflen += n
p = p[n:]
}
if h.buflen == len(h.buf) {
n := guts.CompressChunk(h.buf[:], &h.key, h.counter, h.flags)
h.pushSubtree(guts.ChainingValue(n), 0)
h.buflen = 0
}

// process full chunks
if len(p) > len(h.buf) {
rem := len(p) % len(h.buf)
if rem == 0 {
rem = len(h.buf) // don't prematurely compress
}
eigenbuf := bytes.NewBuffer(p[:len(p)-rem])
trees := guts.Eigentrees(h.counter, uint64(eigenbuf.Len()/guts.ChunkSize))
cvs := make([][8]uint32, len(trees))
counter := h.counter
var wg sync.WaitGroup
for i, height := range trees {
wg.Add(1)
go func(i int, buf []byte, counter uint64) {
defer wg.Done()
cvs[i] = guts.ChainingValue(guts.CompressEigentree(buf, &h.key, counter, h.flags))
}(i, eigenbuf.Next((1<<height)*guts.ChunkSize), counter)
counter += 1 << height
}
wg.Wait()
for i, height := range trees {
h.pushSubtree(cvs[i], height)
}
p = p[len(p)-rem:]
}

// buffer remaining partial chunk
n := copy(h.buf[h.buflen:], p)
h.buflen += n

return lenp, nil
}
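To see how the parallel path decomposes input, consider a hypothetical call with the hasher at chunk counter 3 and 11 full chunks in p:

// guts.Eigentrees(3, 11) == []int{0, 2, 2, 1}:
//   1 chunk  at counter 3   (height 0, re-aligns the counter)
//   4 chunks at counter 4   (height 2)
//   4 chunks at counter 8   (height 2)
//   2 chunks at counter 12  (height 1)
// Each span is compressed by guts.CompressEigentree in its own goroutine, and
// the resulting chaining values are pushed onto the stack at those heights.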

@@ -211,14 +247,46 @@ func (or *OutputReader) Read(p []byte) (int, error) {
p = p[:rem]
}
lenp := len(p)

// drain existing buffer
const bufsize = guts.MaxSIMD * guts.BlockSize
if or.off%bufsize != 0 {
n := copy(p, or.buf[or.off%bufsize:])
p = p[n:]
or.off += uint64(n)
}

for len(p) > 0 {
if or.off%(guts.MaxSIMD*guts.BlockSize) == 0 {
or.n.Counter = or.off / guts.BlockSize
or.n.Counter = or.off / guts.BlockSize
if numBufs := len(p) / len(or.buf); numBufs < 1 {
guts.CompressBlocks(&or.buf, or.n)
n := copy(p, or.buf[or.off%bufsize:])
p = p[n:]
or.off += uint64(n)
} else if numBufs == 1 {
guts.CompressBlocks((*[bufsize]byte)(p), or.n)
p = p[bufsize:]
or.off += bufsize
} else {
// parallelize
par := min(numBufs, runtime.NumCPU())
per := uint64(numBufs / par)
var wg sync.WaitGroup
for range par {
wg.Add(1)
go func(p []byte, n guts.Node) {
defer wg.Done()
for i := range per {
guts.CompressBlocks((*[bufsize]byte)(p[i*bufsize:]), n)
n.Counter += bufsize / guts.BlockSize
}
}(p, or.n)
p = p[per*bufsize:]
or.off += per * bufsize
or.n.Counter = or.off / guts.BlockSize
}
wg.Wait()
}
n := copy(p, or.buf[or.off%(guts.MaxSIMD*guts.BlockSize):])
p = p[n:]
or.off += uint64(n)
}
return lenp, nil
}
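Because each 64-byte XOF block depends only on the root node and its counter, large reads can be filled by independent workers. A minimal usage sketch (hypothetical input, mirroring the tests below):

h := blake3.New(0, nil)
h.Write([]byte("some input"))
out := make([]byte, 1<<20)
h.XOF().Read(out) // a read this large takes the parallel path, using up to runtime.NumCPU() goroutines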
84 changes: 57 additions & 27 deletions blake3_test.go
@@ -4,11 +4,13 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"
"testing"

"lukechampine.com/blake3"
"lukechampine.com/blake3/guts"
)

func toHex(data []byte) string { return hex.EncodeToString(data) }
@@ -106,6 +108,24 @@ func TestXOF(t *testing.T) {
}
}
}

{
// test multiple-buffer output
golden := make([]byte, 1<<20)
n := guts.CompressChunk(nil, &guts.IV, 0, 0)
n.Flags |= guts.FlagRoot
for i := 0; i < len(golden); i += 64 {
block := guts.WordsToBytes(guts.CompressNode(n))
copy(golden[i:], block[:])
n.Counter++
}
got := make([]byte, 1<<20)
blake3.New(0, nil).XOF().Read(got)
if !bytes.Equal(golden, got) {
t.Error("XOF output did not match golden output")
}
}

// test behavior at end of stream
xof := blake3.New(0, nil).XOF()
buf := make([]byte, 1024)
@@ -184,6 +204,21 @@ func TestReset(t *testing.T) {
}
}

func TestEigentrees(t *testing.T) {
for i := uint64(0); i < 64; i++ {
for j := uint64(0); j < 64; j++ {
trees := guts.Eigentrees(i, j)
x := i
for _, tree := range trees {
x += 1 << tree
}
if x != i+j {
t.Errorf("Wrong eigentrees for %v, %v: %v", i, j, trees)
}
}
}
}

type nopReader struct{}

func (nopReader) Read(p []byte) (int, error) { return len(p), nil }
@@ -195,34 +230,29 @@ func BenchmarkWrite(b *testing.B) {
}

func BenchmarkXOF(b *testing.B) {
b.ReportAllocs()
b.SetBytes(1024)
io.CopyN(io.Discard, blake3.New(0, nil).XOF(), int64(b.N*1024))
for _, size := range []int64{64, 1024, 65536, 1048576} {
b.Run(fmt.Sprint(size), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(size)
buf := make([]byte, size)
xof := blake3.New(0, nil).XOF()
for i := 0; i < b.N; i++ {
xof.Seek(0, 0)
xof.Read(buf)
}
})
}
}

func BenchmarkSum256(b *testing.B) {
b.Run("64", func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(64)
buf := make([]byte, 64)
for i := 0; i < b.N; i++ {
blake3.Sum256(buf)
}
})
b.Run("1024", func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(1024)
buf := make([]byte, 1024)
for i := 0; i < b.N; i++ {
blake3.Sum256(buf)
}
})
b.Run("65536", func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(65536)
buf := make([]byte, 65536)
for i := 0; i < b.N; i++ {
blake3.Sum256(buf)
}
})
for _, size := range []int64{64, 1024, 65536, 1048576} {
b.Run(fmt.Sprint(size), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(size)
buf := make([]byte, size)
for i := 0; i < b.N; i++ {
blake3.Sum256(buf)
}
})
}
}
2 changes: 1 addition & 1 deletion go.mod
@@ -1,5 +1,5 @@
module lukechampine.com/blake3

go 1.17
go 1.22

require github.com/klauspost/cpuid/v2 v2.0.9
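The bump from go 1.17 to go 1.22 appears to be required rather than cosmetic: the new code in blake3.go uses the min builtin (Go 1.21) and ranges over an integer (for range par, Go 1.22).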
4 changes: 3 additions & 1 deletion guts/compress_amd64.go
@@ -1,6 +1,8 @@
package guts

import "unsafe"
import (
"unsafe"
)

//go:generate go run avo/gen.go -out blake3_amd64.s

4 changes: 3 additions & 1 deletion guts/compress_noasm.go
@@ -3,7 +3,9 @@

package guts

import "encoding/binary"
import (
"encoding/binary"
)

// CompressBuffer compresses up to MaxSIMD chunks in parallel and returns their
// root node.
54 changes: 54 additions & 0 deletions guts/node.go
@@ -2,6 +2,11 @@
// function.
package guts

import (
"math/bits"
"sync"
)

// Various constants.
const (
FlagChunkStart = 1 << iota
@@ -46,3 +51,52 @@ func ParentNode(left, right [8]uint32, key *[8]uint32, flags uint32) Node {
copy(n.Block[8:], right[:])
return n
}

// Eigentrees returns the sequence of eigentree heights that increment counter
// to counter+chunks.
func Eigentrees(counter uint64, chunks uint64) (trees []int) {
for i := counter; i < counter+chunks; {
bite := min(bits.TrailingZeros64(i), bits.Len64(counter+chunks-i)-1)
trees = append(trees, bite)
i += 1 << bite
}
return
}
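Each height is capped twice, by the alignment of the running counter (bits.TrailingZeros64) and by the chunks remaining (bits.Len64(...)-1), so every eigentree is a complete, properly aligned power-of-two subtree. A couple of values, worked out by hand from the function above:

// Eigentrees(0, 7) == []int{2, 1, 0}   // 4 + 2 + 1 chunks
// Eigentrees(6, 3) == []int{1, 0}      // 2 + 1 chunks (counter 6 is only 2-aligned)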

// CompressEigentree compresses a buffer of 2^n chunks in parallel, returning
// their root node.
func CompressEigentree(buf []byte, key *[8]uint32, counter uint64, flags uint32) Node {
if numChunks := uint64(len(buf) / ChunkSize); bits.OnesCount64(numChunks) != 1 {
panic("non-power-of-two eigentree size")
} else if numChunks == 1 {
return CompressChunk(buf, key, counter, flags)
} else if numChunks <= MaxSIMD {
buflen := len(buf)
if cap(buf) < MaxSIMD*ChunkSize {
buf = append(buf, make([]byte, MaxSIMD*ChunkSize-len(buf))...)
}
return CompressBuffer((*[MaxSIMD * ChunkSize]byte)(buf[:MaxSIMD*ChunkSize]), buflen, key, counter, flags)
} else {
cvs := make([][8]uint32, numChunks/MaxSIMD)
var wg sync.WaitGroup
for i := range cvs {
wg.Add(1)
go func(i uint64) {
defer wg.Done()
cvs[i] = ChainingValue(CompressBuffer((*[MaxSIMD * ChunkSize]byte)(buf[i*MaxSIMD*ChunkSize:]), MaxSIMD*ChunkSize, key, counter+(MaxSIMD*i), flags))
}(uint64(i))
}
wg.Wait()

var rec func(cvs [][8]uint32) Node
rec = func(cvs [][8]uint32) Node {
if len(cvs) == 2 {
return ParentNode(cvs[0], cvs[1], key, flags)
} else if len(cvs) == MaxSIMD {
return mergeSubtrees((*[MaxSIMD][8]uint32)(cvs), MaxSIMD, key, flags)
}
return ParentNode(ChainingValue(rec(cvs[:len(cvs)/2])), ChainingValue(rec(cvs[len(cvs)/2:])), key, flags)
}
return rec(cvs)
}
}
