Skip to content

Commit 0a01e13

Browse files
authored
[datasource] add default datasource uri (#527)
1 parent 8922965 commit 0a01e13

File tree

20 files changed

+39
-43
lines changed

20 files changed

+39
-43
lines changed

cmd/coordinator/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
type Config struct {
1111
ServiceEndpoint string `env:"HTTP_SERVICE_ENDPOINT"`
1212
DatabaseDSN string `env:"DATABASE_DSN"`
13+
DefaultDatasourceURI string `env:"DEFAULT_DATASOURCE_URI"`
1314
BootNodeMultiAddr string `env:"BOOTNODE_MULTIADDR"`
1415
IoTeXChainID int `env:"IOTEX_CHAINID"`
1516
ChainEndpoint string `env:"CHAIN_ENDPOINT,optional"`
@@ -34,6 +35,7 @@ var (
3435
defaultConfig = &Config{
3536
ServiceEndpoint: ":9001",
3637
DatabaseDSN: "postgres://test_user:test_passwd@postgres:5432/test?sslmode=disable",
38+
DefaultDatasourceURI: "postgres://test_user:test_passwd@postgres:5432/test?sslmode=disable",
3739
BootNodeMultiAddr: "/dns4/bootnode-0.testnet.iotex.one/tcp/4689/ipfs/12D3KooWFnaTYuLo8Mkbm3wzaWHtUuaxBRe24Uiopu15Wr5EhD3o",
3840
IoTeXChainID: 2,
3941
ChainEndpoint: "https://babel-api.testnet.iotex.io",
@@ -51,6 +53,7 @@ var (
5153
defaultDebugConfig = &Config{
5254
ServiceEndpoint: ":9001",
5355
DatabaseDSN: "postgres://test_user:test_passwd@localhost:5432/test?sslmode=disable",
56+
DefaultDatasourceURI: "postgres://test_user:test_passwd@localhost:5432/test?sslmode=disable",
5457
BootNodeMultiAddr: "/dns4/bootnode-0.testnet.iotex.one/tcp/4689/ipfs/12D3KooWFnaTYuLo8Mkbm3wzaWHtUuaxBRe24Uiopu15Wr5EhD3o",
5558
IoTeXChainID: 2,
5659
IPFSEndpoint: "ipfs.mainnet.iotex.io",
@@ -63,6 +66,7 @@ var (
6366
ServiceEndpoint: ":19001",
6467
ChainEndpoint: "https://babel-api.testnet.iotex.io",
6568
DatabaseDSN: "postgres://test_user:test_passwd@localhost:15432/test?sslmode=disable",
69+
DefaultDatasourceURI: "postgres://test_user:test_passwd@localhost:15432/test?sslmode=disable",
6670
BootNodeMultiAddr: "/ip4/bootnode/tcp/18000/p2p/12D3KooWJkfxZL1dx74yM1afWof6ka4uW5jMsoGasCSBwGyCUJML",
6771
IoTeXChainID: 2,
6872
ProjectContractAddress: "", //"0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",

cmd/coordinator/config/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func TestConfig_Init(t *testing.T) {
2222
ServiceEndpoint: ":1999",
2323
ChainEndpoint: "http://iotex.chainendpoint.io",
2424
DatabaseDSN: "postgres://username:password@host:port/database?ext=1",
25+
DefaultDatasourceURI: "postgres://username:password@host:port/database?ext=1",
2526
BootNodeMultiAddr: "/dns4/a.b.com/tcp/1000/ipfs/123123123",
2627
IoTeXChainID: 100,
2728
ProjectContractAddress: "0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
@@ -39,6 +40,7 @@ func TestConfig_Init(t *testing.T) {
3940
_ = os.Setenv("HTTP_SERVICE_ENDPOINT", expected.ServiceEndpoint)
4041
_ = os.Setenv("CHAIN_ENDPOINT", expected.ChainEndpoint)
4142
_ = os.Setenv("DATABASE_DSN", expected.DatabaseDSN)
43+
_ = os.Setenv("DEFAULT_DATASOURCE_URI", expected.DefaultDatasourceURI)
4244
_ = os.Setenv("BOOTNODE_MULTIADDR", expected.BootNodeMultiAddr)
4345
_ = os.Setenv("IOTEX_CHAINID", strconv.Itoa(expected.IoTeXChainID))
4446
_ = os.Setenv("PROJECT_CONTRACT_ADDRESS", expected.ProjectContractAddress)

cmd/coordinator/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ func main() {
7272
datasourcePG := datasource.NewPostgres()
7373
var taskDispatcher *dispatcher.Dispatcher
7474
if local {
75-
taskDispatcher, err = dispatcher.NewLocal(persistence, datasourcePG.New, projectManager,
75+
taskDispatcher, err = dispatcher.NewLocal(persistence, datasourcePG.New, projectManager, conf.DefaultDatasourceURI,
7676
conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.BootNodeMultiAddr, sequencerPubKey, conf.IoTeXChainID)
7777
} else {
7878
projectOffsets := scheduler.NewProjectEpochOffsets(conf.SchedulerEpoch, contractPersistence.LatestProjects, schedulerNotification)
7979

80-
taskDispatcher, err = dispatcher.New(persistence, datasourcePG.New, projectManager, conf.BootNodeMultiAddr,
80+
taskDispatcher, err = dispatcher.New(persistence, datasourcePG.New, projectManager, conf.DefaultDatasourceURI, conf.BootNodeMultiAddr,
8181
conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, sequencerPubKey, conf.IoTeXChainID,
8282
dispatcherNotification, chainHeadNotification, contractPersistence, projectOffsets)
8383
}

cmd/e2etest/init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func runCoordinator(conf *coordinatorconfig.Config) {
208208

209209
datasourcePG := datasource.NewPostgres()
210210

211-
taskDispatcher, err := dispatcher.NewLocal(pg, datasourcePG.New, projectManager, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.BootNodeMultiAddr, sequencerPubKey, conf.IoTeXChainID)
211+
taskDispatcher, err := dispatcher.NewLocal(pg, datasourcePG.New, projectManager, conf.DefaultDatasourceURI, conf.OperatorPrivateKey, conf.OperatorPrivateKeyED25519, conf.BootNodeMultiAddr, sequencerPubKey, conf.IoTeXChainID)
212212
if err != nil {
213213
log.Fatal(errors.Wrap(err, "failed to new local dispatcher"))
214214
}

cmd/internal/env_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func TestParseEnv(t *testing.T) {
1919
ServiceEndpoint: ":1999",
2020
ChainEndpoint: "http://iotex.chainendpoint.io",
2121
DatabaseDSN: "postgres://username:password@host:port/database?ext=1",
22+
DefaultDatasourceURI: "postgres://username:password@host:port/database?ext=1",
2223
BootNodeMultiAddr: "/dns4/a.b.com/tcp/1000/ipfs/123123123",
2324
IoTeXChainID: 100,
2425
ProjectContractAddress: "0x02feBE78F3A740b3e9a1CaFAA1b23a2ac0793D26",
@@ -33,6 +34,7 @@ func TestParseEnv(t *testing.T) {
3334
_ = os.Setenv("HTTP_SERVICE_ENDPOINT", expected.ServiceEndpoint)
3435
_ = os.Setenv("CHAIN_ENDPOINT", expected.ChainEndpoint)
3536
_ = os.Setenv("DATABASE_DSN", expected.DatabaseDSN)
37+
_ = os.Setenv("DEFAULT_DATASOURCE_URI", expected.DefaultDatasourceURI)
3638
_ = os.Setenv("BOOTNODE_MULTIADDR", expected.BootNodeMultiAddr)
3739
_ = os.Setenv("IOTEX_CHAINID", strconv.Itoa(expected.IoTeXChainID))
3840
_ = os.Setenv("PROJECT_CONTRACT_ADDRESS", expected.ProjectContractAddress)

docker-compose.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ services:
3838
OPERATOR_PRIVATE_KEY: ${PRIVATE_KEY:-}
3939
OPERATOR_PRIVATE_KEY_ED25519: ${PRIVATE_KEY_ED25519:-}
4040
volumes:
41-
- ./test/container_model:/data
41+
- ./test/project:/data
4242

4343
prover:
4444
image: ghcr.io/machinefi/prover:v0.10.1
@@ -57,7 +57,7 @@ services:
5757
PROJECT_FILE_DIRECTORY: "/data"
5858
BOOTNODE_MULTIADDR: "/ip4/bootnode/tcp/8000/p2p/12D3KooWJkfxZL1dx74yM1afWof6ka4uW5jMsoGasCSBwGyCUJML"
5959
volumes:
60-
- ./test/container_model:/data
60+
- ./test/project:/data
6161

6262
halo2:
6363
image: wangweixiaohao2944/halo2server:v0.0.6

project/project.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var (
2323
)
2424

2525
type Project struct {
26-
DatasourceURI string `json:"datasourceURI"`
26+
DatasourceURI string `json:"datasourceURI,omitempty"`
2727
DefaultVersion string `json:"defaultVersion"`
2828
Versions []*Config `json:"versions"`
2929
}

scheduler/scheduler_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,13 @@ func TestScheduler_schedule(t *testing.T) {
126126

127127
paused := false
128128
pm := &contract.Contract{}
129-
pes := &ProjectEpochOffsets{}
129+
pes := &ProjectEpochOffsets{epoch: 1}
130130
p.ApplyMethodReturn(pes, "Projects", []*ScheduledProject{{1, 0}})
131131
p.ApplyMethodReturn(pm, "Provers", []*contract.Prover{{ID: 1, Paused: &paused}})
132132
p.ApplyMethodReturn(pm, "Project", &contract.Project{
133133
Attributes: map[common.Hash][]byte{contract.RequiredProverAmountHash: []byte("10")},
134134
})
135+
p.ApplyMethodReturn(&p2p.PubSubs{}, "Delete")
135136

136137
chainHead := make(chan uint64, 10)
137138
chainHead <- 1
@@ -156,7 +157,7 @@ func TestScheduler_schedule(t *testing.T) {
156157

157158
paused := false
158159
pm := &contract.Contract{}
159-
pes := &ProjectEpochOffsets{}
160+
pes := &ProjectEpochOffsets{epoch: 1}
160161
p.ApplyMethodReturn(pes, "Projects", []*ScheduledProject{{1, 0}})
161162
p.ApplyMethodReturn(pm, "Provers", []*contract.Prover{{ID: 1, Paused: &paused}})
162163
p.ApplyMethodReturn(pm, "Project", &contract.Project{ID: 1})

task/dispatcher/dispatcher.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Dispatcher struct {
4545
persistence Persistence
4646
newDatasource NewDatasource
4747
projectManager ProjectManager
48+
defaultDatasourceURI string
4849
bootNodeMultiaddr string
4950
operatorPrivateKey string
5051
operatorPrivateKeyED25519 string
@@ -98,7 +99,7 @@ func (d *Dispatcher) setWindowSize(head uint64) {
9899
}
99100

100101
func New(persistence Persistence, newDatasource NewDatasource,
101-
projectManager ProjectManager, bootNodeMultiaddr, operatorPrivateKey, operatorPrivateKeyED25519 string,
102+
projectManager ProjectManager, defaultDatasourceURI, bootNodeMultiaddr, operatorPrivateKey, operatorPrivateKeyED25519 string,
102103
sequencerPubKey []byte, iotexChainID int, projectNotification <-chan *contract.Project, chainHeadNotification <-chan uint64,
103104
contract Contract, projectOffsets *scheduler.ProjectEpochOffsets) (*Dispatcher, error) {
104105

@@ -109,6 +110,7 @@ func New(persistence Persistence, newDatasource NewDatasource,
109110
persistence: persistence,
110111
newDatasource: newDatasource,
111112
projectManager: projectManager,
113+
defaultDatasourceURI: defaultDatasourceURI,
112114
bootNodeMultiaddr: bootNodeMultiaddr,
113115
operatorPrivateKey: operatorPrivateKey,
114116
operatorPrivateKeyED25519: operatorPrivateKeyED25519,
@@ -130,7 +132,7 @@ func New(persistence Persistence, newDatasource NewDatasource,
130132
}
131133

132134
func NewLocal(persistence Persistence, newDatasource NewDatasource,
133-
projectManager ProjectManager, operatorPrivateKey, operatorPrivateKeyED25519, bootNodeMultiaddr string,
135+
projectManager ProjectManager, defaultDatasourceURI, operatorPrivateKey, operatorPrivateKeyED25519, bootNodeMultiaddr string,
134136
sequencerPubKey []byte, iotexChainID int) (*Dispatcher, error) {
135137

136138
projectDispatchers := &sync.Map{}
@@ -161,7 +163,11 @@ func NewLocal(persistence Persistence, newDatasource NewDatasource,
161163
ID: id,
162164
Attributes: map[common.Hash][]byte{},
163165
}
164-
pd, err := newProjectDispatcher(persistence, p.DatasourceURI, newDatasource, cp, ps, taskStateHandler, sequencerPubKey)
166+
uri := p.DatasourceURI
167+
if uri == "" {
168+
uri = defaultDatasourceURI
169+
}
170+
pd, err := newProjectDispatcher(persistence, uri, newDatasource, cp, ps, taskStateHandler, sequencerPubKey)
165171
if err != nil {
166172
return nil, errors.Wrapf(err, "failed to new project dispatcher, project_id %v", id)
167173
}
@@ -195,7 +201,11 @@ func (d *Dispatcher) setProjectDispatcher(p *contract.Project) {
195201
slog.Error("failed to add pubsubs", "project_id", cp.ID, "error", err)
196202
return
197203
}
198-
pd, err := newProjectDispatcher(d.persistence, pf.DatasourceURI, d.newDatasource, cp, d.pubSubs, d.taskStateHandler, d.sequencerPubKey)
204+
uri := pf.DatasourceURI
205+
if uri == "" {
206+
uri = d.defaultDatasourceURI
207+
}
208+
pd, err := newProjectDispatcher(d.persistence, uri, d.newDatasource, cp, d.pubSubs, d.taskStateHandler, d.sequencerPubKey)
199209
if err != nil {
200210
slog.Error("failed to new project dispatcher", "project_id", cp.ID, "error", err)
201211
return

task/dispatcher/dispatcher_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestNew(t *testing.T) {
135135

136136
p.ApplyFuncReturn(p2p.NewPubSubs, nil, errors.New(t.Name()))
137137

138-
_, err := New(&mockPersistence{}, nil, nil, "", "", "", []byte(""), 0, nil, nil, nil, nil)
138+
_, err := New(&mockPersistence{}, nil, nil, "", "", "", "", []byte(""), 0, nil, nil, nil, nil)
139139
r.ErrorContains(err, t.Name())
140140
})
141141
t.Run("Success", func(t *testing.T) {
@@ -145,7 +145,7 @@ func TestNew(t *testing.T) {
145145
p.ApplyFuncReturn(p2p.NewPubSubs, nil, nil)
146146
p.ApplyFuncReturn(newTaskStateHandler, nil)
147147

148-
_, err := New(&mockPersistence{}, nil, nil, "", "", "", []byte(""), 0, nil, nil, nil, nil)
148+
_, err := New(&mockPersistence{}, nil, nil, "", "", "", "", []byte(""), 0, nil, nil, nil, nil)
149149
r.NoError(err)
150150
})
151151
}
@@ -158,7 +158,7 @@ func TestNewLocal(t *testing.T) {
158158

159159
p.ApplyFuncReturn(p2p.NewPubSubs, nil, errors.New(t.Name()))
160160

161-
_, err := NewLocal(&mockPersistence{}, nil, nil, "", "", "", []byte(""), 0)
161+
_, err := NewLocal(&mockPersistence{}, nil, nil, "", "", "", "", []byte(""), 0)
162162
r.ErrorContains(err, t.Name())
163163
})
164164
t.Run("FailedToGetProject", func(t *testing.T) {
@@ -169,7 +169,7 @@ func TestNewLocal(t *testing.T) {
169169
p.ApplyMethodReturn(pm, "Project", nil, errors.New(t.Name()))
170170
p.ApplyFuncReturn(p2p.NewPubSubs, &p2p.PubSubs{}, nil)
171171

172-
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", []byte(""), 0)
172+
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", "", []byte(""), 0)
173173
r.ErrorContains(err, t.Name())
174174
})
175175
t.Run("FailedToAddPubSubs", func(t *testing.T) {
@@ -181,7 +181,7 @@ func TestNewLocal(t *testing.T) {
181181
p.ApplyMethodReturn(&p2p.PubSubs{}, "Add", errors.New(t.Name()))
182182
p.ApplyMethodReturn(pm, "Project", nil, nil)
183183

184-
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", []byte(""), 0)
184+
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", "", []byte(""), 0)
185185
r.ErrorContains(err, t.Name())
186186
})
187187
t.Run("FailedToNewProjectDispatch", func(t *testing.T) {
@@ -194,7 +194,7 @@ func TestNewLocal(t *testing.T) {
194194
p.ApplyFuncReturn(newProjectDispatcher, nil, errors.New(t.Name()))
195195
p.ApplyMethodReturn(pm, "Project", &project.Project{}, nil)
196196

197-
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", []byte(""), 0)
197+
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", "", []byte(""), 0)
198198
r.ErrorContains(err, t.Name())
199199
})
200200
t.Run("Success", func(t *testing.T) {
@@ -208,7 +208,7 @@ func TestNewLocal(t *testing.T) {
208208
p.ApplyMethodReturn(pm, "ProjectIDs", []uint64{0, 0})
209209
p.ApplyMethodReturn(pm, "Project", &project.Project{}, nil)
210210

211-
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", []byte(""), 0)
211+
_, err := NewLocal(&mockPersistence{}, nil, pm, "", "", "", "", []byte(""), 0)
212212
r.NoError(err)
213213
})
214214
}

0 commit comments

Comments
 (0)