Skip to content

Commit 4a67fd5

Browse files
authored
[all] rename & refactor (#419)
1 parent e735489 commit 4a67fd5

File tree

9 files changed

+27
-26
lines changed

9 files changed

+27
-26
lines changed

cmd/coordinator/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ func main() {
3030
log.Fatal(errors.Wrap(err, "failed to new postgres persistence"))
3131
}
3232

33-
projectConfigManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress,
33+
projectManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress,
3434
conf.ProjectCacheDirectory, conf.IPFSEndpoint, conf.ProjectFileDirectory)
3535
if err != nil {
3636
log.Fatal(errors.Wrap(err, "failed to new project config manager"))
3737
}
3838

39-
if err := task.RunDispatcher(persistence, datasource.NewPostgres, projectConfigManager.GetAllCacheProjectIDs, projectConfigManager.Get, conf.BootNodeMultiAddr, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectFileDirectory, conf.IoTeXChainID); err != nil {
39+
if err := task.RunDispatcher(persistence, datasource.NewPostgres, projectManager.GetCachedProjectIDs, projectManager.Get, conf.BootNodeMultiAddr, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectFileDirectory, conf.IoTeXChainID); err != nil {
4040
log.Fatal(errors.Wrap(err, "failed to run dispatcher"))
4141
}
4242

cmd/prover/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func main() {
4040
},
4141
)
4242

43-
projectConfigManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress,
43+
projectManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress,
4444
conf.ProjectCacheDirectory, conf.IPFSEndpoint, conf.ProjectFileDirectory)
4545
if err != nil {
4646
log.Fatal(err)
@@ -59,7 +59,7 @@ func main() {
5959
log.Fatal(errors.Wrap(err, "failed to decode sequencer pubkey"))
6060
}
6161

62-
taskProcessor := task.NewProcessor(vmHandler, projectConfigManager, sk, sequencerPubKey, proverID)
62+
taskProcessor := task.NewProcessor(vmHandler, projectManager.Get, sk, sequencerPubKey, proverID)
6363

6464
pubSubs, err := p2p.NewPubSubs(taskProcessor.HandleP2PData, conf.BootNodeMultiAddr, conf.IoTeXChainID)
6565
if err != nil {
@@ -68,7 +68,7 @@ func main() {
6868

6969
if err := scheduler.Run(conf.SchedulerEpoch, conf.ChainEndpoint, conf.ProverContractAddress,
7070
conf.ProjectContractAddress, conf.ProjectFileDirectory, proverID, pubSubs, taskProcessor.HandleProjectProvers,
71-
projectConfigManager.GetAllCacheProjectIDs); err != nil {
71+
projectManager.GetCachedProjectIDs); err != nil {
7272
log.Fatal(err)
7373
}
7474

cmd/tests/init.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func runProver(conf *proverconfig.Config) {
115115
},
116116
)
117117

118-
projectConfigManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint, conf.ProjectFileDirectory)
118+
projectManager, err := project.NewManager(conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectCacheDirectory, conf.IPFSEndpoint, conf.ProjectFileDirectory)
119119
if err != nil {
120120
log.Fatal(err)
121121
}
@@ -131,15 +131,15 @@ func runProver(conf *proverconfig.Config) {
131131
log.Fatal(errors.Wrap(err, "failed to decode sequencer pubkey"))
132132
}
133133

134-
taskProcessor := task.NewProcessor(vmHandler, projectConfigManager, sk, sequencerPubKey, proverID)
134+
taskProcessor := task.NewProcessor(vmHandler, projectManager.Get, sk, sequencerPubKey, proverID)
135135

136136
pubSubs, err := p2p.NewPubSubs(taskProcessor.HandleP2PData, conf.BootNodeMultiAddr, conf.IoTeXChainID)
137137
if err != nil {
138138
log.Fatal(err)
139139
}
140140

141141
if err := scheduler.Run(conf.SchedulerEpoch, conf.ChainEndpoint, conf.ProverContractAddress, conf.ProjectContractAddress,
142-
conf.ProjectFileDirectory, proverID, pubSubs, taskProcessor.HandleProjectProvers, projectConfigManager.GetAllCacheProjectIDs); err != nil {
142+
conf.ProjectFileDirectory, proverID, pubSubs, taskProcessor.HandleProjectProvers, projectManager.GetCachedProjectIDs); err != nil {
143143
log.Fatal(err)
144144
}
145145

@@ -159,7 +159,7 @@ func runCoordinator(conf *coordinatorconfig.Config) {
159159
log.Fatal(err)
160160
}
161161

162-
if err := task.RunDispatcher(pg, datasource.NewPostgres, projectConfigManager.GetAllCacheProjectIDs,
162+
if err := task.RunDispatcher(pg, datasource.NewPostgres, projectConfigManager.GetCachedProjectIDs,
163163
projectConfigManager.Get, conf.BootNodeMultiAddr, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519,
164164
conf.ChainEndpoint, conf.ProjectContractAddress, conf.ProjectFileDirectory, conf.IoTeXChainID); err != nil {
165165
log.Fatal(errors.Wrap(err, "failed to run dispatcher"))

project/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type Manager struct {
2323
cache *cache // optional
2424
}
2525

26-
func (m *Manager) GetAllCacheProjectIDs() []uint64 {
26+
func (m *Manager) GetCachedProjectIDs() []uint64 {
2727
var ids []uint64
2828
m.projects.Range(func(key, value any) bool {
2929
ids = append(ids, key.(uint64))

scheduler/scheduler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
type HandleProjectProvers func(projectID uint64, provers []string)
2020

21-
type GetCacheProjectIDs func() []uint64
21+
type GetCachedProjectIDs func() []uint64
2222

2323
type scheduler struct {
2424
provers *sync.Map // proverID(string) -> Prover(*contract.Prover)
@@ -101,7 +101,7 @@ func watchChainHead(head chan<- uint64, chainEndpoint string) error {
101101
}
102102

103103
func Run(epoch uint64, chainEndpoint, proverContractAddress, projectContractAddress, projectFileDirectory, proverID string,
104-
pubSubs *p2p.PubSubs, handleProjectProvers HandleProjectProvers, getProjectIDs GetCacheProjectIDs) error {
104+
pubSubs *p2p.PubSubs, handleProjectProvers HandleProjectProvers, getProjectIDs GetCachedProjectIDs) error {
105105

106106
if projectFileDirectory != "" {
107107
dummySchedule(proverID, pubSubs, handleProjectProvers, getProjectIDs)
@@ -165,7 +165,7 @@ func Run(epoch uint64, chainEndpoint, proverContractAddress, projectContractAddr
165165
return nil
166166
}
167167

168-
func dummySchedule(proverID string, pubSubs *p2p.PubSubs, handleProjectProvers HandleProjectProvers, getProjectIDs GetCacheProjectIDs) {
168+
func dummySchedule(proverID string, pubSubs *p2p.PubSubs, handleProjectProvers HandleProjectProvers, getProjectIDs GetCachedProjectIDs) {
169169
s := &scheduler{
170170
pubSubs: pubSubs,
171171
handleProjectProvers: handleProjectProvers,

task/dispatcher.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ type Datasource interface {
2828
Retrieve(projectID, nextTaskID uint64) (*types.Task, error)
2929
}
3030

31-
type ProjectManager interface {
32-
Get(projectID uint64) (*project.Project, error)
33-
}
31+
type GetCachedProjectIDs func() []uint64
3432

3533
type dispatcher struct {
3634
projectDispatchers *sync.Map // projectID(uint64) -> *ProjectDispatcher
@@ -51,7 +49,7 @@ func (d *dispatcher) handleP2PData(data *p2p.Data, topic *pubsub.Topic) {
5149
}
5250

5351
func RunDispatcher(persistence Persistence, newDatasource internaldispatcher.NewDatasource,
54-
getProjectIDs handler.GetCacheProjectIDs, getProject handler.GetProject,
52+
getProjectIDs GetCachedProjectIDs, getProject handler.GetProject,
5553
bootNodeMultiaddr, operatorPrivateKey, operatorPrivateKeyED25519, chainEndpoint, projectContractAddress, projectFileDirectory string,
5654
iotexChainID int) error {
5755
projectDispatchers := &sync.Map{}
@@ -79,7 +77,7 @@ func RunDispatcher(persistence Persistence, newDatasource internaldispatcher.New
7977
}
8078

8179
func dummyDispatch(persistence Persistence, newDatasource internaldispatcher.NewDatasource,
82-
getProjectIDs handler.GetCacheProjectIDs, getProject handler.GetProject,
80+
getProjectIDs GetCachedProjectIDs, getProject handler.GetProject,
8381
projectDispatchers *sync.Map, ps *p2p.PubSubs, handler *handler.TaskStateHandler) error {
8482
projectIDs := getProjectIDs()
8583
for _, id := range projectIDs {

task/internal/handler/task_state_handler.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ type SaveTaskStateLog func(s *types.TaskStateLog, t *types.Task) error
1313

1414
type GetProject func(projectID uint64) (*project.Project, error)
1515

16-
type GetCacheProjectIDs func() []uint64
17-
1816
type TaskStateHandler struct {
1917
saveTaskStateLog SaveTaskStateLog
2018
getProject GetProject

task/processor.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/pkg/errors"
1717

1818
"github.com/machinefi/sprout/p2p"
19+
"github.com/machinefi/sprout/project"
1920
"github.com/machinefi/sprout/types"
2021
"github.com/machinefi/sprout/utils/distance"
2122
"github.com/machinefi/sprout/vm"
@@ -25,9 +26,11 @@ type VMHandler interface {
2526
Handle(task *types.Task, vmtype vm.Type, code string, expParam string) ([]byte, error)
2627
}
2728

29+
type GetProject func(projectID uint64) (*project.Project, error)
30+
2831
type Processor struct {
2932
vmHandler VMHandler
30-
projectManager ProjectManager
33+
getProject GetProject
3134
proverPrivateKey *ecdsa.PrivateKey
3235
sequencerPubKey []byte
3336
proverID string
@@ -44,7 +47,7 @@ func (r *Processor) HandleP2PData(d *p2p.Data, topic *pubsub.Topic) {
4447
}
4548
t := d.Task
4649

47-
p, err := r.projectManager.Get(t.ProjectID)
50+
p, err := r.getProject(t.ProjectID)
4851
if err != nil {
4952
slog.Error("failed to get project", "error", err, "project_id", t.ProjectID)
5053
r.reportFail(t, err, topic)
@@ -161,10 +164,10 @@ func (r *Processor) reportSuccess(t *types.Task, state types.TaskState, result [
161164
}
162165
}
163166

164-
func NewProcessor(vmHandler VMHandler, projectManager ProjectManager, proverPrivateKey *ecdsa.PrivateKey, seqPubkey []byte, proverID string) *Processor {
167+
func NewProcessor(vmHandler VMHandler, getProject GetProject, proverPrivateKey *ecdsa.PrivateKey, seqPubkey []byte, proverID string) *Processor {
165168
return &Processor{
166169
vmHandler: vmHandler,
167-
projectManager: projectManager,
170+
getProject: getProject,
168171
proverPrivateKey: proverPrivateKey,
169172
sequencerPubKey: seqPubkey,
170173
proverID: proverID,

task/processor_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@ func TestProcessor_ReportSuccess(t *testing.T) {
5959

6060
func TestProcessor_HandleP2PData(t *testing.T) {
6161
r := require.New(t)
62+
63+
m := &project.Manager{}
6264
processor := &Processor{
63-
vmHandler: &vm.Handler{},
64-
projectManager: &project.Manager{},
65+
vmHandler: &vm.Handler{},
66+
getProject: m.Get,
6567
}
6668

6769
t.Run("TaskNil", func(t *testing.T) {

0 commit comments

Comments
 (0)