Skip to content

Commit 06705c3

Browse files
committed
fix: prevent race by forcing nobori to check ready
1 parent 26c92e0 commit 06705c3

File tree

5 files changed

+42
-11
lines changed

5 files changed

+42
-11
lines changed

gura.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ func main() {
2323
}
2424

2525
db.SetupDB()
26+
nobori.StartFetchLoop()
2627
go kudari.FetchLoop()
27-
go nobori.FetchLoop()
2828

2929
router := api.SetupRouter(router_opts)
3030
var listen_address = ":8080"

nobori/github.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,9 @@ func ghFillTokQl() *GHToken {
168168
})
169169
var q struct {
170170
RateLimit struct {
171-
Remaining int16 `graphql:"remaining"`
172-
ResetAt string `graphql:"resetAt"`
173-
} `graphql:"rateLimit"`
171+
Remaining int16
172+
ResetAt string
173+
}
174174
}
175175
err := qlcli.Query(context.Background(), &q, nil)
176176
if err != nil {
@@ -190,14 +190,22 @@ func ghFillTokQl() *GHToken {
190190
// Swimming
191191

192192
// the shark shall initialise the funny
193-
func GhSwim() {
193+
func GhSwim() chan struct{} {
194194
go ghSwimRt()
195195
go ghSwimQl()
196+
ready := make(chan struct{}, 1)
197+
go func() {
198+
for len(ghTokRt) == 0 || len(ghTokQl) == 0 {
199+
time.Sleep(20 * time.Millisecond)
200+
}
201+
ready <- struct{}{}
202+
}()
203+
return ready
196204
}
197205

198206
// Process GitHub REST streams using available tokens
199207
func ghSwimRt() {
200-
for token := ghFillTokRt(); token.noMoreFish(); token.thanksForAllTheFish(&ghRtTokIdx, &ghTokRt) {
208+
for token := ghFillTokRt(); ; token.thanksForAllTheFish(&ghRtTokIdx, &ghTokRt) {
201209
token.waitForFish()
202210
for !token.noMoreFish() {
203211
stream := <-ghRtPool
@@ -211,7 +219,7 @@ func ghSwimRt() {
211219

212220
// Process GitHub GraphQL streams using available tokens
213221
func ghSwimQl() {
214-
for token := ghFillTokQl(); token.noMoreFish(); token.thanksForAllTheFish(&ghQlTokIdx, &ghTokQl) {
222+
for token := ghFillTokQl(); ; token.thanksForAllTheFish(&ghQlTokIdx, &ghTokQl) {
215223
token.waitForFish()
216224
qlcli := token.qlcli()
217225
for {

nobori/nobori.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ var l = util.SetupLog("nobori")
2323
// Central channel for scheduling upstream stream fetches
2424
var queue chan db.Stream = make(chan db.Stream)
2525

26+
// Upstream handlers
27+
// Each handler function should return a channel.
28+
// Sending a message to the channel indicates the swimmer is ready.
29+
var swimmers = []func() chan struct{}{GhSwim}
30+
2631
// Calculate the timeout duration
2732
//
2833
// A power-law function is used to calculate the timeout duration.
@@ -49,14 +54,24 @@ func schedule(stream db.Stream) {
4954
queue <- stream
5055
}
5156

52-
// Main loop for upstream metadata fetching
57+
// Start the fetch loop for upstream metadata fetching
5358
//
54-
// Streams are processed from the queue as their scheduled time arrives.
55-
func FetchLoop() {
56-
go GhSwim()
59+
// Return when nobori is ready.
60+
func StartFetchLoop() {
61+
ready_chs := util.SliceMap(swimmers, func(swimmer func() chan struct{}) chan struct{} { return swimmer() })
5762
var strms []db.Stream
5863
r := db.DB.Find(&strms)
5964
util.Yeet(l, "can't find streams", r.Error)
65+
66+
util.SliceEach(ready_chs, func(ch chan struct{}) { <-ch })
67+
68+
go fetchLoop(strms)
69+
}
70+
71+
// Main loop for upstream metadata fetching
72+
//
73+
// Streams are processed from the queue as their scheduled time arrives.
74+
func fetchLoop(strms []db.Stream) {
6075
for _, stream := range strms {
6176
go schedule(stream)
6277
}

nobori/strm.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ var newStrmHdlrs = []func(db *gorm.DB, p *db.Pkg, url string, urls []string) boo
2121
if !strings.HasPrefix(url, "github.com/") {
2222
return false
2323
}
24+
l.Debug("strm hdl github", zap.String("p", p.ID.String()))
2425
strm := GhStrmHdlr(*p, url)
2526
strm.Mirrors = strings.Join(urls, ",")
2627
dbx.Save(strm)
@@ -83,6 +84,7 @@ recv:
8384
}
8485

8586
func handleNewStrm(dbx *gorm.DB, p *db.Pkg, urls []string) error {
87+
l.Debug("new strm", zap.String("ID", p.ID.String()))
8688
for _, hdlr := range newStrmHdlrs {
8789
for _, url := range urls {
8890
if hdlr(dbx, p, url, urls) {

util/sugar.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,9 @@ func SliceMap[T, U any](slice []T, fn func(T) U) []U {
1010
}
1111
return result
1212
}
13+
14+
func SliceEach[T any](slice []T, fn func(T)) {
15+
for _, v := range slice {
16+
fn(v)
17+
}
18+
}

0 commit comments

Comments
 (0)