-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathindex.js
325 lines (268 loc) · 9.07 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
'use strict'
const { Key, Adapter } = require('interface-datastore')
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')
const uint8ArrayEquals = require('uint8arrays/equals')
const errcode = require('err-code')
const debug = require('debug')
const log = Object.assign(debug('datastore-pubsub:publisher'), {
error: debug('datastore-pubsub:publisher:error')
})
/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('./types').Validator} Validator
* @typedef {import('./types').SubscriptionKeyFn} SubscriptionKeyFn
* @typedef {import('libp2p-interfaces/src/pubsub/message').Message} PubSubMessage
*/
// DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with
// [TieredDatastore]{@link https://github.com/ipfs/js-datastore-core/blob/master/src/tiered.js}
class DatastorePubsub extends Adapter {
/**
* Creates an instance of DatastorePubsub.
*
* @param {import('libp2p-interfaces/src/pubsub')} pubsub - pubsub implementation
* @param {import('interface-datastore').Datastore} datastore - datastore instance
* @param {PeerId} peerId - peer-id instance
* @param {Validator} validator - validator functions
* @param {SubscriptionKeyFn} [subscriptionKeyFn] - function to manipulate the key topic received before processing it
* @memberof DatastorePubsub
*/
constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) {
super()
if (!validator) {
throw errcode(new TypeError('missing validator'), 'ERR_INVALID_PARAMETERS')
}
if (typeof validator.validate !== 'function') {
throw errcode(new TypeError('missing validate function'), 'ERR_INVALID_PARAMETERS')
}
if (typeof validator.select !== 'function') {
throw errcode(new TypeError('missing select function'), 'ERR_INVALID_PARAMETERS')
}
if (subscriptionKeyFn && typeof subscriptionKeyFn !== 'function') {
throw errcode(new TypeError('invalid subscriptionKeyFn received'), 'ERR_INVALID_PARAMETERS')
}
this._pubsub = pubsub
this._datastore = datastore
this._peerId = peerId
this._validator = validator
this._handleSubscriptionKeyFn = subscriptionKeyFn
// Bind _onMessage function, which is called by pubsub.
this._onMessage = this._onMessage.bind(this)
}
/**
* Publishes a value through pubsub.
*
* @param {Uint8Array} key - identifier of the value to be published.
* @param {Uint8Array} val - value to be propagated.
*/
// @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays
async put (key, val) {
if (!(key instanceof Uint8Array)) {
const errMsg = 'datastore key does not have a valid format'
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY')
}
if (!(val instanceof Uint8Array)) {
const errMsg = 'received value is not a Uint8Array'
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED')
}
const stringifiedTopic = keyToTopic(key)
log(`publish value for topic ${stringifiedTopic}`)
// Publish record to pubsub
await this._pubsub.publish(stringifiedTopic, val)
}
/**
* Try to subscribe a topic with Pubsub and returns the local value if available.
*
* @param {Uint8Array} key - identifier of the value to be subscribed.
*/
// @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays
async get (key) {
if (!(key instanceof Uint8Array)) {
const errMsg = 'datastore key does not have a valid format'
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY')
}
const stringifiedTopic = keyToTopic(key)
const subscriptions = await this._pubsub.getTopics()
// If already subscribed, just try to get it
if (subscriptions && Array.isArray(subscriptions) && subscriptions.indexOf(stringifiedTopic) > -1) {
return this._getLocal(key)
}
// subscribe
try {
this._pubsub.on(stringifiedTopic, this._onMessage)
await this._pubsub.subscribe(stringifiedTopic)
} catch (err) {
const errMsg = `cannot subscribe topic ${stringifiedTopic}`
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_SUBSCRIBING_TOPIC')
}
log(`subscribed values for key ${stringifiedTopic}`)
return this._getLocal(key)
}
/**
* Unsubscribe topic.
*
* @param {Uint8Array} key - identifier of the value to unsubscribe.
* @returns {void}
*/
unsubscribe (key) {
const stringifiedTopic = keyToTopic(key)
this._pubsub.removeListener(stringifiedTopic, this._onMessage)
return this._pubsub.unsubscribe(stringifiedTopic)
}
/**
* Get record from local datastore
*
* @private
* @param {Uint8Array} key
*/
async _getLocal (key) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)
let dsVal
try {
dsVal = await this._datastore.get(routingKey)
} catch (err) {
if (err.code !== 'ERR_NOT_FOUND') {
const errMsg = `unexpected error getting the ipns record for ${routingKey.toString()}`
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_UNEXPECTED_ERROR_GETTING_RECORD')
}
const errMsg = `local record requested was not found for ${routingKey.toString()}`
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_FOUND')
}
if (!(dsVal instanceof Uint8Array)) {
const errMsg = 'found record that we couldn\'t convert to a value'
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_INVALID_RECORD_RECEIVED')
}
return dsVal
}
/**
* handles pubsub subscription messages
*
* @param {PubSubMessage} msg
*/
async _onMessage (msg) {
const { data, from, topicIDs } = msg
let key
try {
key = topicToKey(topicIDs[0])
} catch (err) {
log.error(err)
return
}
log(`message received for topic ${topicIDs[0]}`)
// Stop if the message is from the peer (it already stored it while publishing to pubsub)
if (from === this._peerId.toB58String()) {
log('message discarded as it is from the same peer')
return
}
if (this._handleSubscriptionKeyFn) {
let res
try {
res = await this._handleSubscriptionKeyFn(key)
} catch (err) {
log.error('message discarded by the subscriptionKeyFn')
return
}
key = res
}
try {
await this._storeIfSubscriptionIsBetter(key, data)
} catch (err) {
log.error(err)
}
}
/**
* Store the received record if it is better than the current stored
*
* @param {Uint8Array} key
* @param {Uint8Array} data
*/
async _storeIfSubscriptionIsBetter (key, data) {
let isBetter = false
try {
isBetter = await this._isBetter(key, data)
} catch (err) {
if (err.code !== 'ERR_NOT_VALID_RECORD') {
throw err
}
}
if (isBetter) {
await this._storeRecord(key, data)
}
}
/**
* Validate record according to the received validation function
*
* @param {Uint8Array} value
* @param {Uint8Array} peerId
*/
async _validateRecord (value, peerId) { // eslint-disable-line require-await
return this._validator.validate(value, peerId)
}
/**
* Select the best record according to the received select function
*
* @param {Uint8Array} receivedRecord
* @param {Uint8Array} currentRecord
*/
async _selectRecord (receivedRecord, currentRecord) {
const res = await this._validator.select(receivedRecord, currentRecord)
// If the selected was the first (0), it should be stored (true)
return res === 0
}
/**
* Verify if the record received through pubsub is valid and better than the one currently stored
*
* @param {Uint8Array} key
* @param {Uint8Array} val
*/
async _isBetter (key, val) {
// validate received record
let error, valid
try {
valid = await this._validateRecord(val, key)
} catch (err) {
error = err
}
// If not valid, it is not better than the one currently available
if (error || !valid) {
const errMsg = 'record received through pubsub is not valid'
log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_VALID_RECORD')
}
// Get Local record
const dsKey = new Key(key)
let currentRecord
try {
currentRecord = await this._getLocal(dsKey.uint8Array())
} catch (err) {
// if the old one is invalid, the new one is *always* better
return true
}
// if the same record, do not need to store
if (uint8ArrayEquals(currentRecord, val)) {
return false
}
// verify if the received record should replace the current one
return this._selectRecord(val, currentRecord)
}
/**
* add record to datastore
*
* @param {Uint8Array} key
* @param {Uint8Array} data
*/
async _storeRecord (key, data) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)
await this._datastore.put(routingKey, data)
log(`record for ${keyToTopic(key)} was stored in the datastore`)
}
}
exports = module.exports = DatastorePubsub