-
Notifications
You must be signed in to change notification settings - Fork 209
Expand file tree
/
Copy pathdownloader.go
More file actions
382 lines (316 loc) · 11.9 KB
/
downloader.go
File metadata and controls
382 lines (316 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
package execution_data
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"github.com/ipfs/go-cid"
"golang.org/x/sync/errgroup"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/executiondatasync/tracker"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/storage"
)
// Downloader is used to download execution data blobs from the network via a blob service.
//
// It is the composition of three interfaces:
//   - module.ReadyDoneAware: lifecycle signalling; the implementation in this file exposes
//     Ready and Done by delegating to the underlying blob service.
//   - ExecutionDataGetter: declared outside this file; presumably the Get method that the
//     implementation in this file provides (TODO confirm against its declaration).
//   - ProcessedHeightRecorder: declared outside this file; the implementation calls its
//     OnBlockProcessed method after the blobs of a block have been tracked.
type Downloader interface {
module.ReadyDoneAware
ExecutionDataGetter
ProcessedHeightRecorder
}
// Compile-time assertions that *downloader satisfies Downloader and ProcessedHeightRecorder.
// They declare no usable variable and have no runtime effect.
var _ Downloader = (*downloader)(nil)
var _ ProcessedHeightRecorder = (*downloader)(nil)
// downloader is the Downloader implementation in this file. It fetches blobs through a
// network.BlobService and decodes them with a Serializer.
type downloader struct {
// ProcessedHeightRecorder is embedded, so its methods (e.g. OnBlockProcessed, which
// trackBlobs calls) are promoted onto downloader.
ProcessedHeightRecorder
// blobService supplies blob sessions and the Ready/Done lifecycle channels.
blobService network.BlobService
// maxBlobSize is the upper bound compared against len(blob.RawData()); blobs above it
// are rejected with a BlobSizeLimitExceededError.
maxBlobSize int
// serializer decodes raw blob data into Go values.
serializer Serializer
// storage and headers are optional (set via WithExecutionDataTracker). When either is
// nil, trackBlobs is a no-op.
storage tracker.Storage
headers storage.Headers
}
// DownloaderOption is a functional option that mutates a downloader. NewDownloader applies
// the supplied options, in order, after the defaults have been set.
type DownloaderOption func(*downloader)
// WithSerializer returns an option that makes the downloader decode blobs with the given
// serializer instead of the one it was constructed with.
func WithSerializer(serializer Serializer) DownloaderOption {
	apply := func(dl *downloader) {
		dl.serializer = serializer
	}
	return apply
}
// WithExecutionDataTracker returns an option that sets both the execution data tracker
// storage and the headers storage on the downloader. Both must be non-nil for downloaded
// blobs to be tracked.
func WithExecutionDataTracker(storage tracker.Storage, headers storage.Headers) DownloaderOption {
	return DownloaderOption(func(dl *downloader) {
		dl.storage, dl.headers = storage, headers
	})
}
// NewDownloader builds a downloader on top of the given blob service.
//
// Defaults: DefaultMaxBlobSize, DefaultSerializer, and a fresh processed-height recorder
// created with NewProcessedHeightRecorderManager(0). The options are then applied in the
// order given, so later options override earlier ones and the defaults.
func NewDownloader(blobService network.BlobService, opts ...DownloaderOption) *downloader {
	dl := new(downloader)
	dl.blobService = blobService
	dl.maxBlobSize = DefaultMaxBlobSize
	dl.serializer = DefaultSerializer
	dl.ProcessedHeightRecorder = NewProcessedHeightRecorderManager(0)

	for idx := range opts {
		opts[idx](dl)
	}

	return dl
}
// Ready returns the Ready channel of the underlying blob service; the downloader is ready
// to be used exactly when the blob service is.
func (d *downloader) Ready() <-chan struct{} {
	ready := d.blobService.Ready()
	return ready
}
// Done returns the Done channel of the underlying blob service; the downloader is
// considered shut down exactly when the blob service is.
func (d *downloader) Done() <-chan struct{} {
	done := d.blobService.Done()
	return done
}
// Get downloads the blob tree identified by executionDataID and reassembles it into a
// BlockExecutionData.
//
// The root record is fetched first; it lists one CID per chunk. Each chunk's blob tree is
// then downloaded concurrently. Once every chunk has been retrieved, all CIDs encountered
// (the root plus every level of every chunk tree) are handed to trackBlobs, and the chunks
// are returned in the order given by the root record.
//
// Expected errors during normal operations:
//   - BlobNotFoundError if some CID in the blob tree could not be found from the blob service
//   - MalformedDataError if some level of the blob tree cannot be properly deserialized
//   - BlobSizeLimitExceededError if some blob in the blob tree exceeds the maximum allowed size
func (d *downloader) Get(ctx context.Context, executionDataID flow.Identifier) (*BlockExecutionData, error) {
	session := d.blobService.GetSession(ctx)

	// Step 1: the root record, which enumerates the chunk execution data blobs.
	root, err := d.getExecutionDataRoot(ctx, executionDataID, session)
	if err != nil {
		return nil, fmt.Errorf("failed to get execution data root: %w", err)
	}

	var (
		// lock guards chunks and trackedCids, which are written by the goroutines below.
		lock        sync.Mutex
		chunks      = make([]*ChunkExecutionData, len(root.ChunkExecutionDataIDs))
		trackedCids = []cid.Cid{flow.IdToCid(executionDataID)}
	)

	// Step 2: fetch every chunk concurrently; the first failure cancels groupCtx.
	group, groupCtx := errgroup.WithContext(ctx)
	for idx, chunkCid := range root.ChunkExecutionDataIDs {
		// NOTE(review): the closure captures idx and chunkCid directly, which is only
		// correct with per-iteration loop variables (Go 1.22+) — confirm the module's
		// go version.
		group.Go(func() error {
			chunk, chunkCids, fetchErr := d.getChunkExecutionData(groupCtx, chunkCid, session)
			if fetchErr != nil {
				return fmt.Errorf("failed to get chunk execution data at index %d: %w", idx, fetchErr)
			}

			lock.Lock()
			defer lock.Unlock()
			chunks[idx] = chunk
			// CIDs are appended in goroutine completion order, not chunk order.
			trackedCids = append(trackedCids, chunkCids...)
			return nil
		})
	}
	if waitErr := group.Wait(); waitErr != nil {
		return nil, waitErr
	}

	// Step 3: record the CIDs against the block.
	if trackErr := d.trackBlobs(root.BlockID, trackedCids); trackErr != nil {
		return nil, fmt.Errorf("failed to track blob: %w", trackErr)
	}

	// Step 4: recombine the pieces into the original record.
	return &BlockExecutionData{
		BlockID:             root.BlockID,
		ChunkExecutionDatas: chunks,
	}, nil
}
// getExecutionDataRoot fetches the single blob addressed by rootID and decodes it into a
// flow.BlockExecutionDataRoot.
//
// Expected errors during normal operations:
//   - BlobNotFoundError if the root blob could not be found from the blob service
//   - MalformedDataError if the root blob cannot be properly deserialized
//   - BlobSizeLimitExceededError if the root blob exceeds the maximum allowed size
func (d *downloader) getExecutionDataRoot(
	ctx context.Context,
	rootID flow.Identifier,
	blobGetter network.BlobGetter,
) (*flow.BlockExecutionDataRoot, error) {
	rootCid := flow.IdToCid(rootID)

	blob, err := blobGetter.GetBlob(ctx, rootCid)
	switch {
	case err == nil:
		// blob retrieved; validate it below
	case errors.Is(err, network.ErrBlobNotFound):
		// translate the network sentinel into this package's typed error
		return nil, NewBlobNotFoundError(rootCid)
	default:
		return nil, fmt.Errorf("failed to get root blob: %w", err)
	}

	// reject oversized blobs before attempting to decode them
	if len(blob.RawData()) > d.maxBlobSize {
		return nil, &BlobSizeLimitExceededError{blob.Cid()}
	}

	decoded, err := d.serializer.Deserialize(bytes.NewBuffer(blob.RawData()))
	if err != nil {
		return nil, NewMalformedDataError(err)
	}

	// a successfully decoded value of any other type is also treated as malformed data
	if root, isRoot := decoded.(*flow.BlockExecutionDataRoot); isRoot {
		return root, nil
	}
	return nil, NewMalformedDataError(fmt.Errorf("execution data root blob does not deserialize to a BlockExecutionDataRoot, got %T instead", decoded))
}
// getChunkExecutionData walks the blob tree rooted at chunkExecutionDataID, one level at a
// time, until a level decodes to a ChunkExecutionData. It returns that value together with
// every CID visited on the way (the root CID first, then each level's CIDs in order).
//
// A level that decodes to a CID list becomes the next level to fetch; any other decoded
// type is reported as malformed data.
//
// Expected errors during normal operations:
//   - context.Canceled or context.DeadlineExceeded if the context is canceled or times out
//   - BlobNotFoundError if a blob in the tree could not be found from the blob service
//   - MalformedDataError if a level of the tree cannot be properly deserialized
//   - BlobSizeLimitExceededError if a blob in the tree exceeds the maximum allowed size
func (d *downloader) getChunkExecutionData(
	ctx context.Context,
	chunkExecutionDataID cid.Cid,
	blobGetter network.BlobGetter,
) (*ChunkExecutionData, []cid.Cid, error) {
	level := []cid.Cid{chunkExecutionDataID}
	visited := []cid.Cid{chunkExecutionDataID}

	depth := 0
	for {
		decoded, err := d.getBlobs(ctx, blobGetter, level)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to get level %d of blob tree: %w", depth, err)
		}

		// leaf reached: the level reassembled into the chunk itself
		if chunk, isChunk := decoded.(*ChunkExecutionData); isChunk {
			return chunk, visited, nil
		}

		next, isCidList := decoded.(*[]cid.Cid)
		if !isCidList {
			return nil, nil, NewMalformedDataError(fmt.Errorf("blob tree contains unexpected type %T at level %d", decoded, depth))
		}

		// intermediate level: remember its CIDs and descend
		visited = append(visited, *next...)
		level = *next
		depth++
	}
}
// trackBlobs records the given CIDs in the tracker storage under the height of the block
// identified by blockID, so that the blobs can be pruned later, and then reports that
// height via OnBlockProcessed.
//
// It is a no-op returning nil when the downloader was built without tracker storage or
// without headers storage.
//
// Parameters:
//   - blockID: The identifier of the block to which the blobs belong.
//   - cids: CIDs to be tracked.
//
// No errors are expected during normal operations.
func (d *downloader) trackBlobs(blockID flow.Identifier, cids []cid.Cid) error {
	trackingEnabled := d.storage != nil && d.headers != nil
	if !trackingEnabled {
		return nil
	}

	update := func(track tracker.TrackBlobsFn) error {
		// the tracker indexes blobs by block height, so resolve the height first
		header, err := d.headers.ByBlockID(blockID)
		if err != nil {
			return err
		}

		if err = track(header.Height, cids...); err != nil {
			return err
		}

		// only reached when tracking succeeded; runs inside the storage update callback
		d.OnBlockProcessed(header.Height)
		return nil
	}

	return d.storage.Update(update)
}
// getBlobs fetches the blobs for the given CIDs, concatenates them in the order of cids,
// and deserializes the result into a single object.
//
// Deserialization is fed directly from the channel of incoming blobs, so decoding proceeds
// while blobs are still arriving instead of buffering the whole payload first. A retrieval
// error takes precedence over a deserialization error when both occur.
//
// Expected errors during normal operations:
//   - context.Canceled or context.DeadlineExceeded if the context is canceled or times out
//   - BlobNotFoundError if a requested blob could not be found from the blob service
//   - MalformedDataError if the reassembled data cannot be properly deserialized
//   - BlobSizeLimitExceededError if a retrieved blob exceeds the maximum allowed size
func (d *downloader) getBlobs(ctx context.Context, blobGetter network.BlobGetter, cids []cid.Cid) (any, error) {
	blobCh, errCh := d.retrieveBlobs(ctx, blobGetter, cids)

	decoded, decodeErr := d.serializer.Deserialize(blobs.NewBlobChannelReader(blobCh))

	// wait for the retrieval goroutine to report its outcome before looking at decodeErr
	if retrieveErr := <-errCh; retrieveErr != nil {
		return nil, retrieveErr
	}
	if decodeErr != nil {
		return nil, NewMalformedDataError(decodeErr)
	}

	return decoded, nil
}
// retrieveBlobs starts a goroutine that requests the given CIDs from blobGetter and emits
// the corresponding blobs on the first returned channel in the same order as cids
// (duplicates included). The blob channel is buffered to len(cids), so the goroutine never
// blocks on a slow reader.
//
// The second returned channel receives exactly one value when the goroutine finishes: nil
// on success, otherwise the error that stopped retrieval. Both channels are closed when
// the goroutine exits.
//
// Expected errors (delivered on the error channel) during normal operations:
//   - context.Canceled or context.DeadlineExceeded if the context is canceled or times out
//   - BlobNotFoundError if a requested blob could not be found from the blob service
//   - BlobSizeLimitExceededError if a retrieved blob exceeds the maximum allowed size
func (d *downloader) retrieveBlobs(parent context.Context, blobGetter network.BlobGetter, cids []cid.Cid) (<-chan blobs.Blob, <-chan error) {
	ordered := make(chan blobs.Blob, len(cids))
	outcome := make(chan error, 1)

	worker := func() {
		var err error

		ctx, cancel := context.WithCancel(parent)
		// deferred calls run last-in-first-out: the outcome is published first, then the
		// blob channel is closed, then the batch request context is canceled.
		defer cancel()
		defer close(ordered)
		defer func() {
			outcome <- err
			close(outcome)
		}()

		// one batch request for all CIDs; responses arrive in no particular order
		incoming := blobGetter.GetBlobs(ctx, cids)

		// remaining[c] is how many more times c must still be emitted; it tells us when a
		// cached blob can be dropped even if the same CID is requested more than once.
		remaining := make(map[cid.Cid]int)
		for _, c := range cids {
			remaining[c]++
		}

		// blobs that arrived before their turn
		pending := make(map[cid.Cid]blobs.Blob)

		for _, want := range cids {
			blob, cached := pending[want]
			if !cached {
				blob, err = d.findBlob(incoming, want, pending)
				if err != nil {
					// the incoming channel may have been closed because the context was
					// canceled; in that case report the context error instead.
					if ctxErr := ctx.Err(); ctxErr != nil {
						err = ctxErr
					}
					return
				}
			}

			// release the cached blob once its last occurrence has been served
			left := remaining[want] - 1
			if left == 0 {
				delete(pending, want)
				delete(remaining, want)
			} else {
				remaining[want] = left
			}

			ordered <- blob
		}
	}
	go worker()

	return ordered, outcome
}
// findBlob drains blobChan until the blob whose CID equals target shows up or the channel
// is closed. Every blob read along the way, the target included, is stored in cache keyed
// by its CID so the caller can serve it later.
//
// This is what lets the caller emit blobs in request order even though the blob channel
// delivers them in arbitrary order.
//
// Expected errors during normal operations:
//   - BlobNotFoundError if the channel is closed before the target blob is seen
//   - BlobSizeLimitExceededError if any blob read from the channel exceeds the maximum
//     allowed size (the error carries that blob's CID, which may differ from target)
func (d *downloader) findBlob(
	blobChan <-chan blobs.Blob,
	target cid.Cid,
	cache map[cid.Cid]blobs.Blob,
) (blobs.Blob, error) {
	for {
		candidate, open := <-blobChan
		if !open {
			// channel exhausted without encountering the target
			return nil, NewBlobNotFoundError(target)
		}

		// size is validated before the blob is cached
		if len(candidate.RawData()) > d.maxBlobSize {
			return nil, &BlobSizeLimitExceededError{candidate.Cid()}
		}

		id := candidate.Cid()
		cache[id] = candidate
		if id == target {
			return candidate, nil
		}
	}
}