Skip to content

Commit cf0cf32

Browse files
authored
Merge pull request #1191 from onflow/smnzhu/staked-sync-provider
add sync provider to staked AN
2 parents 17a2363 + 4a745ac commit cf0cf32

28 files changed

+311
-122
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -601,12 +601,11 @@ func (builder *FlowAccessNodeBuilder) initMiddleware(nodeID flow.Identifier,
601601
func (builder *FlowAccessNodeBuilder) initNetwork(nodeID module.Local,
602602
networkMetrics module.NetworkMetrics,
603603
middleware network.Middleware,
604-
topology network.Topology) (*p2p.Network, error) {
604+
topology network.Topology,
605+
) (*p2p.Network, error) {
605606

606607
codec := cborcodec.NewCodec()
607608

608-
subscriptionManager := p2p.NewChannelSubscriptionManager(middleware)
609-
610609
// creates network instance
611610
net, err := p2p.NewNetwork(
612611
builder.Logger,
@@ -615,7 +614,7 @@ func (builder *FlowAccessNodeBuilder) initNetwork(nodeID module.Local,
615614
builder.Middleware,
616615
p2p.DefaultCacheSize,
617616
topology,
618-
subscriptionManager,
617+
p2p.NewChannelSubscriptionManager(middleware),
619618
networkMetrics,
620619
builder.IdentityProvider,
621620
)

cmd/access/node_builder/staked_access_node_builder.go

Lines changed: 61 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@ import (
88

99
"github.com/onflow/flow-go/cmd"
1010
"github.com/onflow/flow-go/crypto"
11+
"github.com/onflow/flow-go/engine"
1112
pingeng "github.com/onflow/flow-go/engine/access/ping"
13+
"github.com/onflow/flow-go/engine/common/splitter"
14+
synceng "github.com/onflow/flow-go/engine/common/synchronization"
1215
"github.com/onflow/flow-go/model/flow"
1316
"github.com/onflow/flow-go/model/flow/filter"
1417
"github.com/onflow/flow-go/module"
1518
"github.com/onflow/flow-go/module/id"
19+
"github.com/onflow/flow-go/module/metrics/unstaked"
20+
"github.com/onflow/flow-go/network"
1621
"github.com/onflow/flow-go/network/p2p"
1722
"github.com/onflow/flow-go/network/topology"
1823
"github.com/onflow/flow-go/state/protocol/events/gadgets"
@@ -86,23 +91,61 @@ func (builder *StakedAccessNodeBuilder) Initialize() cmd.NodeBuilder {
8691
}
8792

8893
func (anb *StakedAccessNodeBuilder) Build() AccessNodeBuilder {
89-
anb.FlowAccessNodeBuilder.
90-
Build().
91-
Component("ping engine", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
92-
ping, err := pingeng.New(
93-
node.Logger,
94-
node.State,
95-
node.Me,
96-
anb.PingMetrics,
97-
anb.pingEnabled,
98-
node.Middleware,
99-
anb.nodeInfoFile,
100-
)
101-
if err != nil {
102-
return nil, fmt.Errorf("could not create ping engine: %w", err)
103-
}
104-
return ping, nil
105-
})
94+
anb.FlowAccessNodeBuilder.Build()
95+
96+
if anb.supportsUnstakedFollower {
97+
var unstakedNetworkConduit network.Conduit
98+
var proxyEngine *splitter.Engine
99+
100+
anb.
101+
Component("unstaked sync request proxy", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
102+
proxyEngine = splitter.New(node.Logger, engine.UnstakedSyncCommittee)
103+
104+
// register the proxy engine with the unstaked network
105+
var err error
106+
unstakedNetworkConduit, err = node.Network.Register(engine.UnstakedSyncCommittee, proxyEngine)
107+
if err != nil {
108+
return nil, fmt.Errorf("could not register unstaked sync request proxy: %w", err)
109+
}
110+
111+
return proxyEngine, nil
112+
}).
113+
Component("unstaked sync request handler", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
114+
syncRequestHandler := synceng.NewRequestHandlerEngine(
115+
node.Logger.With().Bool("unstaked", true).Logger(),
116+
unstaked.NewUnstakedEngineCollector(node.Metrics.Engine),
117+
unstakedNetworkConduit,
118+
node.Me,
119+
node.Storage.Blocks,
120+
anb.SyncCore,
121+
anb.FinalizedHeader,
122+
// don't queue missing heights from unstaked nodes
123+
// since we are always more up-to-date than them
124+
false,
125+
)
126+
127+
// register the sync request handler with the proxy engine
128+
proxyEngine.RegisterEngine(syncRequestHandler)
129+
130+
return syncRequestHandler, nil
131+
})
132+
}
133+
134+
anb.Component("ping engine", func(builder cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
135+
ping, err := pingeng.New(
136+
node.Logger,
137+
node.State,
138+
node.Me,
139+
anb.PingMetrics,
140+
anb.pingEnabled,
141+
node.Middleware,
142+
anb.nodeInfoFile,
143+
)
144+
if err != nil {
145+
return nil, fmt.Errorf("could not create ping engine: %w", err)
146+
}
147+
return ping, nil
148+
})
106149

107150
return anb
108151
}
@@ -156,7 +199,7 @@ func (builder *StakedAccessNodeBuilder) initLibP2PFactory(ctx context.Context,
156199
myAddr = builder.BaseConfig.BindAddr
157200
}
158201

159-
connManager := p2p.NewConnManager(builder.Logger, builder.Metrics.Network)
202+
connManager := p2p.NewConnManager(builder.Logger, builder.Metrics.Network, p2p.TrackUnstakedConnections(builder.IdentityProvider))
160203

161204
return func() (*p2p.Node, error) {
162205
libp2pNode, err := p2p.NewDefaultLibP2PNodeBuilder(nodeID, myAddr, networkKey).

cmd/access/node_builder/unstaked_access_node_builder.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import (
88

99
"github.com/onflow/flow-go/cmd"
1010
"github.com/onflow/flow-go/crypto"
11+
"github.com/onflow/flow-go/engine"
1112
"github.com/onflow/flow-go/model/flow"
1213
"github.com/onflow/flow-go/module"
1314
"github.com/onflow/flow-go/module/id"
1415
"github.com/onflow/flow-go/module/local"
1516
"github.com/onflow/flow-go/module/metrics"
17+
"github.com/onflow/flow-go/network/converter"
1618
"github.com/onflow/flow-go/network/p2p"
1719
"github.com/onflow/flow-go/state/protocol/events/gadgets"
1820
)
@@ -225,7 +227,7 @@ func (anb *UnstakedAccessNodeBuilder) enqueueUnstakedNetworkInit(ctx context.Con
225227
network, err := anb.initNetwork(anb.Me, unstakedNetworkMetrics, anb.Middleware, nil)
226228
anb.MustNot(err)
227229

228-
anb.Network = network
230+
anb.Network = converter.NewNetwork(network, engine.SyncCommittee, engine.UnstakedSyncCommittee)
229231

230232
anb.Logger.Info().Msgf("network will run on address: %s", anb.BindAddr)
231233

cmd/node_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ type NodeConfig struct {
132132
ProtocolEvents *events.Distributor
133133
State protocol.State
134134
Middleware network.Middleware
135-
Network p2p.ReadyDoneAwareNetwork
135+
Network module.ReadyDoneAwareNetwork
136136
MsgValidators []network.MessageValidator
137137
FvmOptions []fvm.Option
138138
StakingKey crypto.PrivateKey

cmd/scaffold.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
164164
fnb.RootBlock.ID().String(),
165165
p2p.DefaultMaxPubSubMsgSize,
166166
fnb.Metrics.Network,
167-
pingProvider)
167+
pingProvider,
168+
)
168169
if err != nil {
169170
return nil, fmt.Errorf("could not generate libp2p node factory: %w", err)
170171
}

engine/channels.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ const (
133133
ProvideChunks = RequestChunks
134134
ProvideReceiptsByBlockID = RequestReceiptsByBlockID
135135
ProvideApprovalsByChunk = RequestApprovalsByChunk
136+
137+
// Unstaked network channels
138+
UnstakedSyncCommittee = network.Channel("unstaked-sync-committee")
136139
)
137140

138141
// initializeChannelRoleMap initializes an instance of channelRoleMap and populates it with the channels and their

engine/common/splitter/engine.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package splitter
22

33
import (
4-
"errors"
54
"fmt"
65
"sync"
76

@@ -40,17 +39,11 @@ func New(
4039
// RegisterEngine registers a new engine with the splitter. Events
4140
// that are received by the splitter after the engine has registered
4241
// will be passed down to it.
43-
func (e *Engine) RegisterEngine(engine module.Engine) error {
42+
func (e *Engine) RegisterEngine(engine module.Engine) {
4443
e.enginesMu.Lock()
4544
defer e.enginesMu.Unlock()
4645

47-
if _, ok := e.engines[engine]; ok {
48-
return errors.New("engine already registered with splitter")
49-
}
50-
5146
e.engines[engine] = struct{}{}
52-
53-
return nil
5447
}
5548

5649
// UnregisterEngine unregisters an engine with the splitter. After

engine/common/splitter/engine_test.go

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,8 @@ func (suite *Suite) TestDownstreamEngineFailure() {
4848
engine1 := new(mockmodule.Engine)
4949
engine2 := new(mockmodule.Engine)
5050

51-
err := suite.engine.RegisterEngine(engine1)
52-
suite.Assert().Nil(err)
53-
err = suite.engine.RegisterEngine(engine2)
54-
suite.Assert().Nil(err)
51+
suite.engine.RegisterEngine(engine1)
52+
suite.engine.RegisterEngine(engine2)
5553

5654
processError := errors.New("Process Error!")
5755

@@ -60,7 +58,7 @@ func (suite *Suite) TestDownstreamEngineFailure() {
6058
engine1.On("Process", suite.channel, id, event).Return(processError).Once()
6159
engine2.On("Process", suite.channel, id, event).Return(nil).Once()
6260

63-
err = suite.engine.Process(suite.channel, id, event)
61+
err := suite.engine.Process(suite.channel, id, event)
6462
merr, ok := err.(*multierror.Error)
6563
suite.Assert().True(ok)
6664
suite.Assert().Len(merr.Errors, 1)
@@ -100,26 +98,14 @@ func (suite *Suite) TestProcessUnknownChannel() {
10098

10199
engine := new(mockmodule.Engine)
102100

103-
err := suite.engine.RegisterEngine(engine)
104-
suite.Assert().Nil(err)
101+
suite.engine.RegisterEngine(engine)
105102

106-
err = suite.engine.Process(unknownChannel, id, event)
103+
err := suite.engine.Process(unknownChannel, id, event)
107104
suite.Assert().Error(err)
108105

109106
engine.AssertNumberOfCalls(suite.T(), "Process", 0)
110107
}
111108

112-
// TestDuplicateRegistrations tests that an engine cannot register for the same channel twice.
113-
func (suite *Suite) TestDuplicateRegistrations() {
114-
engine := new(mockmodule.Engine)
115-
116-
err := suite.engine.RegisterEngine(engine)
117-
suite.Assert().Nil(err)
118-
119-
err = suite.engine.RegisterEngine(engine)
120-
suite.Assert().Error(err)
121-
}
122-
123109
// TestConcurrentEvents tests that sending multiple messages concurrently, results in each engine
124110
// receiving every message.
125111
func (suite *Suite) TestConcurrentEvents() {
@@ -131,8 +117,7 @@ func (suite *Suite) TestConcurrentEvents() {
131117

132118
for i := 0; i < numEngines; i++ {
133119
engine := new(mockmodule.Engine)
134-
err := suite.engine.RegisterEngine(engine)
135-
suite.Assert().Nil(err)
120+
suite.engine.RegisterEngine(engine)
136121
engines[i] = engine
137122
}
138123

engine/common/splitter/network/network.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/onflow/flow-go/module"
1212
"github.com/onflow/flow-go/module/lifecycle"
1313
"github.com/onflow/flow-go/network"
14-
"github.com/onflow/flow-go/network/p2p"
1514
)
1615

1716
// Network is the splitter network. It is a wrapper around the default network implementation
@@ -21,7 +20,7 @@ import (
2120
// splitter engine. As a result, multiple engines can register with the splitter network on
2221
// the same channel and will each receive all events on that channel.
2322
type Network struct {
24-
p2p.ReadyDoneAwareNetwork
23+
module.ReadyDoneAwareNetwork
2524
mu sync.RWMutex
2625
log zerolog.Logger
2726
splitters map[network.Channel]*splitterEngine.Engine // stores splitters for each channel
@@ -31,7 +30,7 @@ type Network struct {
3130

3231
// NewNetwork returns a new splitter network.
3332
func NewNetwork(
34-
net p2p.ReadyDoneAwareNetwork,
33+
net module.ReadyDoneAwareNetwork,
3534
log zerolog.Logger,
3635
) *Network {
3736
return &Network{
@@ -76,19 +75,10 @@ func (n *Network) Register(channel network.Channel, e network.Engine) (network.C
7675
}
7776

7877
// register engine with splitter
79-
err := splitter.RegisterEngine(engine)
80-
81-
if err != nil {
82-
// remove the splitter engine if this was the first time the given channel was registered
83-
if !channelRegistered {
84-
delete(n.splitters, channel)
85-
}
86-
87-
return nil, fmt.Errorf("failed to register engine with splitter: %w", err)
88-
}
78+
splitter.RegisterEngine(engine)
8979

9080
if !channelRegistered {
91-
conduit, err = n.ReadyDoneAwareNetwork.Register(channel, splitter)
81+
conduit, err := n.ReadyDoneAwareNetwork.Register(channel, splitter)
9282

9383
if err != nil {
9484
// undo previous steps

engine/common/splitter/network/network_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Suite struct {
3232
}
3333

3434
func (suite *Suite) SetupTest() {
35-
net := new(mocknetwork.ReadyDoneAwareNetwork)
35+
net := new(mockmodule.ReadyDoneAwareNetwork)
3636
suite.con = new(mocknetwork.Conduit)
3737
suite.engines = make(map[network.Channel]module.Engine)
3838

0 commit comments

Comments
 (0)