-
Notifications
You must be signed in to change notification settings - Fork 209
Expand file tree
/
Copy pathblock_time_controller.go
More file actions
462 lines (403 loc) · 19.3 KB
/
block_time_controller.go
File metadata and controls
462 lines (403 loc) · 19.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
// Package cruisectl implements a "cruise control" system for Flow by adjusting
// nodes' latest ProposalTiming in response to changes in the measured view rate and
// target epoch switchover time.
//
// It uses a PID controller with the projected epoch switchover time as the process
// variable and the set-point computed using epoch length config. The error is
// the difference between the projected epoch switchover time, assuming an
// ideal view time τ, and the target epoch switchover time (based on a schedule).
package cruisectl
import (
"fmt"
"time"
"github.com/rs/zerolog"
"go.uber.org/atomic"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/events"
)
// TimedBlock represents a block, with a timestamp recording when the BlockTimeController received the block.
// OnBlockIncorporated creates these observations, stamping TimeObserved with the current UTC time.
type TimedBlock struct {
	Block        *model.Block // the observed block; its View drives the controller's measurements
	TimeObserved time.Time    // timestamp when BlockTimeController received the block, per convention in UTC
}
// epochInfo stores data about the current and next epoch. It is updated when we enter
// the first view of a new epoch, or the EpochSetup phase of the current epoch.
// View ranges are inclusive: the current epoch spans [curEpochFirstView, curEpochFinalView].
type epochInfo struct {
	curEpochFirstView     uint64    // first view of the current epoch
	curEpochFinalView     uint64    // F[v] - the final view of the epoch
	curEpochTargetEndTime time.Time // T[v] - the target end time of the current epoch
	// nextEpochFinalView is the final view of the next epoch. It is nil until that value is
	// learned (at startup when already past the staking phase, or on EpochSetupPhaseStarted),
	// and it is reset to nil once the controller transitions into that epoch.
	nextEpochFinalView *uint64
}
// targetViewTime returns τ[v], the ideal steady-state view time for the current epoch, in seconds.
// It spreads the package-level epochLength evenly over every view of the epoch. The view count
// is inclusive on both ends, hence the +1.
// The value is a float64 in seconds rather than a time.Duration: within the controller we work
// with float64 seconds to avoid repeated conversions that would hurt numerical stability.
func (epoch *epochInfo) targetViewTime() float64 {
	viewsInEpoch := epoch.curEpochFinalView - epoch.curEpochFirstView + 1
	return epochLength.Seconds() / float64(viewsInEpoch)
}
// fractionComplete returns the fraction of the current epoch completed at curView.
// The result is 0 at curEpochFirstView, 1 at curEpochFinalView, and linear in between.
// curView must lie within [curEpochFirstView, curEpochFinalView]; the result is in [0, 1] only then.
// Edge cases:
//   - a curView past curEpochFinalView yields a value above 1;
//   - a curView below curEpochFirstView wraps around in the unsigned subtraction;
//   - for a single-view epoch (curEpochFirstView == curEpochFinalView) the divisor is zero.
func (epoch *epochInfo) fractionComplete(curView uint64) float64 {
	viewsElapsed := float64(curView - epoch.curEpochFirstView)
	epochSpan := float64(epoch.curEpochFinalView - epoch.curEpochFirstView)
	return viewsElapsed / epochSpan
}
// BlockTimeController dynamically adjusts the ProposalTiming of this node,
// based on the measured view rate of the consensus committee as a whole, in
// order to achieve a desired switchover time for each epoch.
// In a nutshell, the controller outputs the block time on the happy path, i.e.
//   - Suppose the node is observing the parent block B0 at some time `x0`.
//   - The controller determines the duration `d` of how much later the child block B1
//     should be observed by the committee.
//   - The controller internally memorizes the latest B0 it has seen and outputs
//     the tuple `(B0, x0, d)`
//
// This low-level controller output `(B0, x0, d)` is wrapped into a `ProposalTiming`
// interface, specifically `happyPathBlockTime` on the happy path. The purpose of the
// `ProposalTiming` wrapper is to translate the raw controller output into a form
// that is useful for the event handler. Edge cases, such as initialization or
// EECC, are handled by other implementations of `ProposalTiming`.
//
// Concurrency: the public event handlers (OnBlockIncorporated, EpochSetupPhaseStarted,
// EpochEmergencyFallbackTriggered) only enqueue events on channels. After construction,
// epochInfo, epochFallbackTriggered and the error terms are mutated only by the single worker
// routine (processEventsWorkerLogic). latestProposalTiming is an atomic pointer so that
// GetProposalTiming can be read from other goroutines.
type BlockTimeController struct {
	component.Component
	protocol.Consumer // consumes protocol state events

	config  *Config                 // controller configuration: PID gains (KP, KI, KD), Enabled flag, FallbackProposalDelay, TimingConfig, TargetTransition
	state   protocol.State          // queried for epoch views, the epoch phase, epoch fallback status and finalized headers
	log     zerolog.Logger          // component logger (tagged hotstuff=cruise_ctl by the constructor)
	metrics module.CruiseCtlMetrics // reports PID error terms, controller output and target proposal duration

	epochInfo // scheduled transition view for current/next epoch
	// epochFallbackTriggered, once true, makes the worker ignore incorporated blocks and
	// EpochSetup events; the fallback ProposalTiming is used from then on.
	epochFallbackTriggered bool

	incorporatedBlocks chan TimedBlock   // OnBlockIncorporated events, we desire these blocks to be processed in a timely manner and therefore use a small channel capacity
	epochSetups        chan *flow.Header // EpochSetupPhaseStarted events (block header within setup phase)
	epochFallbacks     chan struct{}     // EpochFallbackTriggered events

	proportionalErr Ewma           // exponentially weighted moving average of the instantaneous error e[v] (proportional term)
	integralErr     LeakyIntegrator // leaky integral of the instantaneous error e[v] (integral term)

	// latestProposalTiming holds the ProposalTiming that the controller generated in response to processing the latest observation
	latestProposalTiming *atomic.Pointer[ProposalTiming]
}
// Compile-time checks that BlockTimeController satisfies every interface it is consumed through.
var (
	_ hotstuff.ProposalDurationProvider = (*BlockTimeController)(nil)
	_ protocol.Consumer                 = (*BlockTimeController)(nil)
	_ component.Component               = (*BlockTimeController)(nil)
)
// NewBlockTimeController returns a new BlockTimeController.
//
// All PID error terms start at zero. Before returning, the constructor:
//   - loads epoch information from the finalized protocol state;
//   - publishes an initial ProposalTiming for curView;
//   - reports zero-valued metrics.
//
// An error is returned if:
//   - the error-term filters cannot be built from config.alpha() / config.beta(), or
//   - the epoch information cannot be read from the protocol state.
func NewBlockTimeController(log zerolog.Logger, metrics module.CruiseCtlMetrics, config *Config, state protocol.State, curView uint64) (*BlockTimeController, error) {
	// Zero-valued initial error terms. Non-zero values would encode assumptions about the
	// prior history of the proportional error `e[v]`.
	var initProptlErr, initItgErr, initDrivErr float64

	proportionalErr, err := NewEwma(config.alpha(), initProptlErr)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize EWMA for computing the proportional error: %w", err)
	}
	integralErr, err := NewLeakyIntegrator(config.beta(), initItgErr)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize LeakyIntegrator for computing the integral error: %w", err)
	}

	ctl := &BlockTimeController{
		Consumer:             events.NewNoop(),
		config:               config,
		log:                  log.With().Str("hotstuff", "cruise_ctl").Logger(),
		metrics:              metrics,
		state:                state,
		incorporatedBlocks:   make(chan TimedBlock, 3),
		epochSetups:          make(chan *flow.Header, 5),
		epochFallbacks:       make(chan struct{}, 5),
		proportionalErr:      proportionalErr,
		integralErr:          integralErr,
		latestProposalTiming: atomic.NewPointer[ProposalTiming](nil), // populated by initProposalTiming below
	}
	ctl.Component = component.NewComponentManagerBuilder().
		AddWorker(ctl.processEventsWorkerLogic).
		Build()

	// Initialization order matters: initProposalTiming reads state written by initEpochInfo.
	if err := ctl.initEpochInfo(curView); err != nil {
		return nil, fmt.Errorf("could not initialize epoch info: %w", err)
	}
	ctl.initProposalTiming(curView)
	ctl.log.Debug().Uint64("view", curView).Msg("initialized BlockTimeController")

	// Report starting values so the controller's metrics are populated from the outset.
	ctl.metrics.PIDError(initProptlErr, initItgErr, initDrivErr)
	ctl.metrics.ControllerOutput(0)
	ctl.metrics.TargetProposalDuration(0)
	return ctl, nil
}
// initEpochInfo populates epochInfo and epochFallbackTriggered from the latest finalized
// protocol state when the component is constructed.
// It sets:
//   - the current epoch's view range;
//   - the next epoch's final view, when past the staking phase;
//   - the current epoch's target end time, inferred from the wall clock and curView's
//     position within the epoch;
//   - whether epoch fallback mode has been triggered.
//
// No errors are expected during normal operation.
func (ctl *BlockTimeController) initEpochInfo(curView uint64) error {
	final := ctl.state.Final()
	currentEpoch := final.Epochs().Current()

	firstView, err := currentEpoch.FirstView()
	if err != nil {
		return fmt.Errorf("could not initialize current epoch first view: %w", err)
	}
	ctl.curEpochFirstView = firstView

	finalView, err := currentEpoch.FinalView()
	if err != nil {
		return fmt.Errorf("could not initialize current epoch final view: %w", err)
	}
	ctl.curEpochFinalView = finalView

	phase, err := final.Phase()
	if err != nil {
		return fmt.Errorf("could not check snapshot phase: %w", err)
	}
	// Beyond the staking phase the next epoch is expected to be set up,
	// so its final view is queried as well.
	if phase > flow.EpochPhaseStaking {
		nextFinalView, err := final.Epochs().Next().FinalView()
		if err != nil {
			return fmt.Errorf("could not initialize next epoch final view: %w", err)
		}
		ctl.epochInfo.nextEpochFinalView = &nextFinalView
	}

	// fractionComplete relies on the view range assigned above.
	now := time.Now().UTC()
	ctl.curEpochTargetEndTime = ctl.config.TargetTransition.inferTargetEndTime(now, ctl.epochInfo.fractionComplete(curView))

	fallbackTriggered, err := ctl.state.Params().EpochFallbackTriggered()
	if err != nil {
		return fmt.Errorf("could not check epoch fallback: %w", err)
	}
	ctl.epochFallbackTriggered = fallbackTriggered
	return nil
}
// initProposalTiming publishes the initial ProposalTiming for curView upon startup.
// If the controller is disabled or epoch fallback mode is active, it uses a fallback
// timing with a constant FallbackProposalDelay. Otherwise, until the first view change
// is observed, blocks are published immediately.
// CAUTION: Must be called after initEpochInfo, which sets epochFallbackTriggered.
func (ctl *BlockTimeController) initProposalTiming(curView uint64) {
	useFallback := ctl.epochFallbackTriggered || !ctl.config.Enabled.Load()
	now := time.Now().UTC()
	if useFallback {
		ctl.storeProposalTiming(newFallbackTiming(curView, now, ctl.config.FallbackProposalDelay.Load()))
	} else {
		ctl.storeProposalTiming(newPublishImmediately(curView, now))
	}
}
// storeProposalTiming atomically publishes proposalTiming as the controller's latest ProposalTiming.
// Concurrency safe.
func (ctl *BlockTimeController) storeProposalTiming(proposalTiming ProposalTiming) {
	latest := proposalTiming // the atomic pointer holds the address of this copy
	ctl.latestProposalTiming.Store(&latest)
}
// GetProposalTiming returns the controller's latest ProposalTiming. Concurrency safe.
// It returns nil only if nothing has been stored yet. The constructor always stores a
// value, so this is a defensive fallback that keeps GetProposalTiming total.
func (ctl *BlockTimeController) GetProposalTiming() ProposalTiming {
	if latest := ctl.latestProposalTiming.Load(); latest != nil {
		return *latest
	}
	return nil
}
// TargetPublicationTime returns the time at which this node should publish its proposal for
// proposalView. timeViewEntered is when the node entered the view, and parentBlockId
// identifies the proposal's parent. The computation is delegated entirely to the
// controller's latest ProposalTiming (see GetProposalTiming).
func (ctl *BlockTimeController) TargetPublicationTime(proposalView uint64, timeViewEntered time.Time, parentBlockId flow.Identifier) time.Time {
	timing := ctl.GetProposalTiming()
	return timing.TargetPublicationTime(proposalView, timeViewEntered, parentBlockId)
}
// processEventsWorkerLogic is the logic for processing events received from other components.
// This method should be executed by a dedicated worker routine (not concurrency safe).
//
// Events are prioritized. On every iteration the worker:
//  1. handles one pending EpochSetupPhaseStarted event, if any;
//  2. handles one pending EpochFallbackTriggered event, if any;
//  3. blocks until the next event of any kind arrives, or the context is done.
//
// Any error returned by an event handler is fatal. It is logged, propagated via ctx.Throw,
// and the worker returns. Every handler is treated the same way.
func (ctl *BlockTimeController) processEventsWorkerLogic(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	ready()

	// handleEpochSetup processes an EpochSetupPhaseStarted event for the given header.
	// Returns false if a fatal error was thrown and the worker must stop.
	handleEpochSetup := func(block *flow.Header) bool {
		snapshot := ctl.state.AtHeight(block.Height)
		if err := ctl.processEpochSetupPhaseStarted(snapshot); err != nil {
			ctl.log.Err(err).Msg("fatal error handling EpochSetupPhaseStarted event")
			ctx.Throw(err)
			return false
		}
		return true
	}
	// handleEpochFallback processes an EpochFallbackTriggered event.
	// Returns false if a fatal error was thrown and the worker must stop.
	handleEpochFallback := func() bool {
		if err := ctl.processEpochFallbackTriggered(); err != nil {
			ctl.log.Err(err).Msg("fatal error processing epoch EECC event")
			ctx.Throw(err)
			return false
		}
		return true
	}
	// handleIncorporatedBlock processes an OnBlockIncorporated event.
	// Returns false if a fatal error was thrown and the worker must stop.
	handleIncorporatedBlock := func(block TimedBlock) bool {
		if err := ctl.processIncorporatedBlock(block); err != nil {
			ctl.log.Err(err).Msg("fatal error handling OnBlockIncorporated event")
			ctx.Throw(err)
			return false
		}
		return true
	}

	done := ctx.Done()
	for {
		// Priority 1: EpochSetup (non-blocking check)
		select {
		case block := <-ctl.epochSetups:
			if !handleEpochSetup(block) {
				return
			}
		default:
		}

		// Priority 2: EpochFallbackTriggered (non-blocking check).
		// The original code omitted the return after a fatal error here; it is now
		// consistent with every other handler.
		select {
		case <-ctl.epochFallbacks:
			if !handleEpochFallback() {
				return
			}
		default:
		}

		// Priority 3: OnBlockIncorporated. Block until any event arrives or we are shut down.
		var ok bool
		select {
		case <-done:
			return
		case block := <-ctl.incorporatedBlocks:
			ok = handleIncorporatedBlock(block)
		case block := <-ctl.epochSetups:
			ok = handleEpochSetup(block)
		case <-ctl.epochFallbacks:
			ok = handleEpochFallback()
		}
		if !ok {
			return
		}
	}
}
// processIncorporatedBlock processes `OnBlockIncorporated` events from HotStuff.
// The block is ignored if:
//   - epoch fallback mode is active (the fallback ProposalTiming stays in place), or
//   - its view does not exceed the view of the latest observation.
//
// Otherwise it:
//   - advances epochInfo if the block's view belongs to the next epoch, and
//   - measures the view duration, which updates the error terms and publishes a new ProposalTiming.
//
// No errors are expected during normal operation.
func (ctl *BlockTimeController) processIncorporatedBlock(tb TimedBlock) error {
	if ctl.epochFallbackTriggered {
		return nil
	}
	// Older blocks that are incorporated into the protocol state carry no new information.
	if tb.Block.View <= ctl.GetProposalTiming().ObservationView() {
		return nil
	}
	if err := ctl.checkForEpochTransition(tb); err != nil {
		return fmt.Errorf("could not check for epoch transition: %w", err)
	}
	if err := ctl.measureViewDuration(tb); err != nil {
		return fmt.Errorf("could not measure view rate: %w", err)
	}
	return nil
}
// checkForEpochTransition updates epochInfo when the view of the observed block lies beyond
// the current epoch's final view. In that case the next epoch becomes the current one and
// its target end time is inferred from the block's timestamp. Otherwise this is a no-op.
// An error is returned if such a transition is required but:
//   - the next epoch's final view is unknown, or
//   - the view lies beyond the next epoch too.
//
// No errors are expected during normal operation.
func (ctl *BlockTimeController) checkForEpochTransition(tb TimedBlock) error {
	view := tb.Block.View
	// Prevalent case: the view still belongs to the current epoch.
	if view <= ctl.curEpochFinalView {
		return nil
	}

	// We are beyond the final view of the most recently processed epoch, so the view must
	// fall into the upcoming epoch, whose final view must already be known.
	nextFinalView := ctl.nextEpochFinalView
	if nextFinalView == nil {
		return fmt.Errorf("cannot transition without nextEpochFinalView set")
	}
	if view > *nextFinalView {
		return fmt.Errorf("sanity check failed: curView %d is beyond both current epoch (final view %d) and next epoch (final view %d)",
			view, ctl.curEpochFinalView, *nextFinalView)
	}

	// Promote the next epoch to current; the epoch after it is not known yet.
	ctl.curEpochFirstView = ctl.curEpochFinalView + 1
	ctl.curEpochFinalView = *nextFinalView
	ctl.nextEpochFinalView = nil
	ctl.curEpochTargetEndTime = ctl.config.TargetTransition.inferTargetEndTime(tb.Block.Timestamp, ctl.epochInfo.fractionComplete(view))
	return nil
}
// measureViewDuration computes a new measurement of the projected epoch switchover time and
// the resulting error for the newly entered view, then publishes a new ProposalTiming.
//
// When the controller is disabled, it instead publishes a fallback timing with a constant
// FallbackProposalDelay. In that case the error terms are left untouched.
//
// When enabled, it:
//   - updates the proportional and integral error terms as a side effect;
//   - reports PID metrics;
//   - stores a happy-path ProposalTiming.
//
// No errors are expected during normal operation.
func (ctl *BlockTimeController) measureViewDuration(tb TimedBlock) error {
	view := tb.Block.View

	if !ctl.config.Enabled.Load() {
		fallbackDelay := ctl.config.FallbackProposalDelay.Load()
		ctl.storeProposalTiming(newFallbackTiming(view, tb.TimeObserved, fallbackDelay))
		ctl.log.Debug().
			Uint64("cur_view", view).
			Dur("fallback_proposal_delay", fallbackDelay).
			Msg("controller is disabled - using fallback timing")
		return nil
	}

	previousProposalTiming := ctl.GetProposalTiming()
	previousPropErr := ctl.proportionalErr.Value()

	// Projected time still needed for the remaining views, assuming we progress through them at
	// the idealized target view time.
	//
	// Why the '+1' in `viewDurationsRemaining`: by convention, an epoch begins (happy path) when
	// the first block of that epoch is observed. Until then, replicas remain in the last view of
	// the previous epoch. There they have processed and voted for that epoch's last block, and
	// they wait for its confirmation (QC or TC).
	// Correspondingly, observing the proposal for an epoch's last view marks the START of that
	// view. During it, nodes:
	//   - verify the block and vote for it;
	//   - the primary aggregates the votes and constructs the child for the new epoch's first view.
	// The last view ends when that child proposal is published.
	tau := ctl.targetViewTime()                                  // τ - idealized target view time in units of seconds
	viewDurationsRemaining := ctl.curEpochFinalView + 1 - view   // k[v] - views remaining in current epoch
	durationRemaining := ctl.curEpochTargetEndTime.Sub(tb.TimeObserved)

	// Instantaneous error e[v] = k[v]·τ - T[v], i.e. the projected difference from the target
	// switchover. It feeds the PID error terms. All UNITS in SECOND.
	instErr := float64(viewDurationsRemaining)*tau - durationRemaining.Seconds()
	propErr := ctl.proportionalErr.AddObservation(instErr)
	itgErr := ctl.integralErr.AddObservation(instErr)
	drivErr := propErr - previousPropErr

	// Controller output u[v], in seconds and as a Duration for logging/metrics.
	u := propErr*ctl.config.KP + itgErr*ctl.config.KI + drivErr*ctl.config.KD
	controllerOutput := time.Duration(u * float64(time.Second))

	// Desired time between parent and child block, before and after applying limits of authority.
	unconstrainedBlockTime := time.Duration((tau - u) * float64(time.Second))
	proposalTiming := newHappyPathBlockTime(tb, unconstrainedBlockTime, ctl.config.TimingConfig)
	constrainedBlockTime := proposalTiming.ConstrainedBlockTime()

	ctl.log.Debug().
		Uint64("last_observation", previousProposalTiming.ObservationView()).
		Dur("duration_since_last_observation", tb.TimeObserved.Sub(previousProposalTiming.ObservationTime())).
		Dur("projected_time_remaining", durationRemaining).
		Uint64("view_durations_remaining", viewDurationsRemaining).
		Float64("inst_err", instErr).
		Float64("proportional_err", propErr).
		Float64("integral_err", itgErr).
		Float64("derivative_err", drivErr).
		Dur("controller_output", controllerOutput).
		Dur("unconstrained_block_time", unconstrainedBlockTime).
		Dur("constrained_block_time", constrainedBlockTime).
		Msg("measured error upon view change")

	ctl.metrics.PIDError(propErr, itgErr, drivErr)
	ctl.metrics.ControllerOutput(controllerOutput)
	ctl.metrics.TargetProposalDuration(constrainedBlockTime)
	ctl.storeProposalTiming(proposalTiming)
	return nil
}
// processEpochSetupPhaseStarted processes EpochSetupPhaseStarted events from the protocol state.
// It records the next epoch's final view, read from the given snapshot, so that a later
// epoch transition can be followed. The event is ignored once epoch fallback mode is active,
// since the controller is disabled then.
//
// No errors are expected during normal operation.
func (ctl *BlockTimeController) processEpochSetupPhaseStarted(snapshot protocol.Snapshot) error {
	if ctl.epochFallbackTriggered {
		return nil
	}
	finalView, err := snapshot.Epochs().Next().FinalView()
	if err != nil {
		return fmt.Errorf("could not get next epochInfo final view: %w", err)
	}
	ctl.epochInfo.nextEpochFinalView = &finalView
	return nil
}
// processEpochFallbackTriggered processes EpochFallbackTriggered events from the protocol state.
// When epoch fallback mode is triggered, we:
//   - set epoch fallback triggered, to disable the controller. Subsequent incorporated blocks
//     and EpochSetup events are then ignored.
//   - set ProposalTiming to a fallback timing with the constant FallbackProposalDelay,
//     anchored at the view of the latest finalized block and the current UTC time.
//
// No errors are expected during normal operation.
func (ctl *BlockTimeController) processEpochFallbackTriggered() error {
	ctl.epochFallbackTriggered = true
	latestFinalized, err := ctl.state.Final().Head()
	if err != nil {
		// ": " separates this layer's context from the wrapped cause.
		return fmt.Errorf("failed to retrieve latest finalized block from protocol state: %w", err)
	}
	ctl.storeProposalTiming(newFallbackTiming(latestFinalized.View, time.Now().UTC(), ctl.config.FallbackProposalDelay.Load()))
	return nil
}
// OnBlockIncorporated listens to notifications from HotStuff about incorporating new blocks.
// The block is stamped with the current UTC time and queued for asynchronous processing by
// the worker. This call never blocks: if the queue is full, the observation is discarded.
// That is acceptable because the controller averages over many samples, so occasionally
// missing one does not matter.
func (ctl *BlockTimeController) OnBlockIncorporated(block *model.Block) {
	observation := TimedBlock{Block: block, TimeObserved: time.Now().UTC()}
	select {
	case ctl.incorporatedBlocks <- observation:
	default:
		// queue full: drop this sample
	}
}
// EpochSetupPhaseStarted responds to the EpochSetup phase starting for the current epoch.
// The event is queued for async processing by the worker. The worker reads the protocol
// state at the height of `first` to learn the next epoch's final view.
// Unlike OnBlockIncorporated, this event is never dropped: the send blocks while the
// epochSetups buffer is full, i.e. until the worker catches up.
// NOTE(review): if the worker has already exited and the buffer is full, this call would
// block indefinitely; confirm that is acceptable for the protocol event publisher.
func (ctl *BlockTimeController) EpochSetupPhaseStarted(_ uint64, first *flow.Header) {
	ctl.epochSetups <- first
}
// EpochEmergencyFallbackTriggered responds to epoch fallback mode being triggered.
// The event is queued for async processing by the worker. The worker then disables the
// controller and switches to the fallback ProposalTiming.
// The event is never dropped: the send blocks while the epochFallbacks buffer is full.
// NOTE(review): if the worker has already exited and the buffer is full, this call would
// block indefinitely; confirm that is acceptable for the protocol event publisher.
func (ctl *BlockTimeController) EpochEmergencyFallbackTriggered() {
	ctl.epochFallbacks <- struct{}{}
}