-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathshard.go
185 lines (157 loc) · 4.4 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package giashard
import (
"fmt"
"hash/fnv"
"log"
"net/url"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"github.com/weppos/publicsuffix-go/publicsuffix"
)
// Shard routes rows into 2^n numbered sub-directories of dir, choosing the
// sub-directory from a hash of the row's value in column key (see WriteRow
// and ShardId). Each sub-directory is written through its own Batch.
type Shard struct {
	dir     string   // root directory
	n       uint     // number of shards (2^n)
	size    int64    // batch size, passed through to NewBatch
	key     string   // key to use for sharding (name of the column to hash)
	cols    []string // columns, passed through to NewBatch
	batches []*Batch // one slot per shard id; nil until that shard is first written
}
// ShardErr marks a failure to work out which shard a row belongs to,
// usually because its input is bad. It is a distinct type so that callers
// can tell these errors apart from errors writing the output. A ShardErr
// can reasonably be skipped (or the row sent to an explicit "corrupted"
// shard). An output error should generally be treated as fatal.
type ShardErr struct {
	s string // description of what went wrong
	e error  // underlying cause; nil if there is none
}

// ShardError is a sentinel for use with errors.Is. (*ShardErr).Is accepts
// any *ShardErr target, so errors.Is(err, ShardError) reports whether err
// is, or wraps, a *ShardErr. It is assigned during package initialisation.
var ShardError *ShardErr

// NewShardErr returns a *ShardErr with description s and optional cause e.
func NewShardErr(s string, e error) *ShardErr {
	return &ShardErr{s: s, e: e}
}

// Error returns the description. When there is a cause, it appends ": "
// followed by the cause's message.
func (se *ShardErr) Error() string {
	if se.e != nil {
		return fmt.Sprintf("%s: %v", se.s, se.e)
	}
	return se.s
}

// Is reports whether target is a *ShardErr. Every ShardErr matches every
// other, so errors.Is tests for the kind of error rather than its identity.
func (se *ShardErr) Is(target error) bool {
	switch target.(type) {
	case *ShardErr:
		return true
	default:
		return false
	}
}

// Unwrap returns the underlying cause (possibly nil), which lets errors.Is
// and errors.As look through a ShardErr.
func (se *ShardErr) Unwrap() error {
	return se.e
}
var (
	// host_re pulls a host-like token out of a string that url.Parse could
	// not split: an optional "scheme://" prefix, then a run of letters,
	// digits, '-' and '.' that starts and ends with a letter or digit.
	host_re = regexp.MustCompile(`^(?:[a-zA-Z]+://)?([a-zA-Z0-9][a-zA-Z0-9\-.]*[a-zA-Z0-9]).*`)
	// path_re captures everything before the first '/'.
	path_re = regexp.MustCompile(`^([^/]+).*`)
)

// init gives the ShardError sentinel a non-nil value.
func init() {
	ShardError = NewShardErr("Unspecified error", nil)
}
// NewShard creates a Shard that disperses records over 2^n shard
// directories under dir, using batches of the given size (passed through to
// NewBatch) with columns cols. The shard for a row comes from the value of
// its key column. ShardId uses the idea of "domain" from publicsuffix,
// which tries to get the most "significant" part of a domain name by
// stripping prefixes and suffixes. Shard directories and batches are only
// created when a shard is first written.
//
// NewShard returns an error if n is so large that 2^n cannot be
// represented as a slice length on this platform.
func NewShard(dir string, n uint, size int64, key string, cols ...string) (s *Shard, err error) {
	// 1<<n is computed as an int. For n >= IntSize-1 it overflows: either to a
	// negative length, which makes make panic, or to 0, which leaves no
	// batches and causes a divide-by-zero panic on the first WriteRow.
	if n >= strconv.IntSize-1 {
		return nil, fmt.Errorf("giashard: cannot create 2^%d shards: n must be less than %d", n, strconv.IntSize-1)
	}
	batches := make([]*Batch, 1<<n)
	s = &Shard{dir, n, size, key, cols, batches}
	return s, nil
}
// Close closes every batch that has been opened for this Shard. Every open
// batch is closed even if an earlier one fails. If any Close fails, the
// error from the last failing batch is returned.
func (s *Shard) Close() (err error) {
	for i := range s.batches {
		batch := s.batches[i]
		if batch == nil {
			continue // this shard was never written
		}
		if closeErr := batch.Close(); closeErr != nil {
			err = closeErr
		}
	}
	return err
}
// AddRulesToDefaultList loads extra rules from the file at domainList into
// publicsuffix.DefaultList. That is presumably the list publicsuffix.Parse
// consults when Slug splits hosts (verify against the publicsuffix-go
// docs). It returns the number of rules LoadFile returned, together with
// any error from loading the file.
func AddRulesToDefaultList(domainList string) (added int, err error) {
	loaded, loadErr := publicsuffix.DefaultList.LoadFile(domainList, nil)
	added, err = len(loaded), loadErr
	return added, err
}
// Slug extracts the second-level domain (SLD) of key. The SLD is used to
// calculate the shard bucket number.
//
// key is normally a URL, and Slug tries three strategies in turn:
//   - The host comes from url.Parse. If parsing fails or yields no host
//     (e.g. a bare "example.com/path"), a regexp pulls a host-like prefix
//     out of key instead.
//   - The host is split with publicsuffix.Parse, and its SLD is returned.
//   - If publicsuffix cannot parse the host, the part of key before the
//     first "/" is used as a last resort.
//
// Every failure is returned as a *ShardErr, meaning no shard can be
// determined for this key.
func Slug(key string) (slug string, err error) {
	// parse the url to get the domain name
	u, parseErr := url.Parse(key)
	var host string
	if parseErr != nil || len(u.Host) == 0 {
		// if we can't parse it, try to extract something sensible using a regexp
		ms := host_re.FindStringSubmatch(key)
		if len(ms) != 2 {
			return "", NewShardErr(fmt.Sprintf("Unable to determine host using regexp from %v", key), parseErr)
		}
		host = ms[1]
	} else {
		host = strings.TrimRight(u.Host, ".") // a trailing . will confuse publicsuffix
	}

	// parse the domain name to get the slug
	dn, psErr := publicsuffix.Parse(host)
	if psErr == nil {
		return dn.SLD, nil // second-level domain
	}

	// last ditch effort to get something reasonable out of the key
	ms := path_re.FindStringSubmatch(key)
	if len(ms) != 2 || len(ms[1]) == 0 {
		// Nothing usable (e.g. key starts with "/"). Report the failure
		// instead of indexing into a nil match.
		return "", NewShardErr(fmt.Sprintf("Unable to determine slug by parsing %v from %v", host, key), psErr)
	}
	return ms[1], nil
}
// ShardId returns the bucket for key within 2^n buckets, i.e. a value in
// [0, 2^n). The bucket is the 64-bit FNV-1 hash of Slug(key), keeping only
// its low n bits. As a result, keys with the same slug always land in the
// same shard. Any error from Slug (a *ShardErr) is returned unchanged.
func ShardId(key string, n uint) (shard uint64, err error) {
	slug, err := Slug(key)
	if err != nil {
		return 0, err
	}
	// use the slug to compute the hash
	hash := fnv.New64() // calculate new 64-bit hash
	if _, err = hash.Write([]byte(slug)); err != nil {
		return 0, err
	}
	// Keep the low n bits. For n < 64 this equals Sum64() % (1 << n). Unlike
	// the modulo, it cannot divide by zero for n >= 64: there 1<<n wraps to 0
	// in uint64, the mask becomes all ones, and the full hash is returned.
	shard = hash.Sum64() & (1<<n - 1)
	return shard, nil
}
// WriteRow writes row to the shard that ShardId picks from the row's value
// in the s.key column. The first write to a shard creates that shard's
// directory and batch.
//
// An error of the ShardErr kind (errors.Is(err, ShardError)) means no shard
// could be determined for the row. That is generally not fatal: nothing has
// been written, and it is safe to skip to the next row. Any other error
// comes from creating or writing the output and should be considered fatal.
func (s *Shard) WriteRow(row map[string][]byte) (err error) {
	shard, err := ShardId(string(row[s.key]), s.n)
	if err != nil {
		return err
	}
	batch := s.batches[shard]
	if batch == nil {
		if batch, err = s.openShard(shard); err != nil {
			return err
		}
		s.batches[shard] = batch
	}
	return batch.WriteRow(row)
}
// openShard logs that it is initialising the shard, creates the shard's
// directory (see shardDir) and any missing parents, and then opens a new
// Batch there using the Shard's batch size and columns.
func (s *Shard) openShard(shard uint64) (b *Batch, err error) {
	dir := s.shardDir(shard)
	log.Printf("Initialising shard %d at %s", shard, dir)
	err = os.MkdirAll(dir, os.ModePerm)
	if err != nil {
		return nil, err
	}
	return NewBatch(dir, s.size, s.cols...)
}
// shardDir returns the directory for shard n: a sub-directory of the
// Shard's root directory, named with n in decimal.
func (s *Shard) shardDir(n uint64) string {
	// Format the unsigned id directly. Converting to int64 first would give a
	// negative name for ids >= 2^63.
	return filepath.Join(s.dir, strconv.FormatUint(n, 10))
}