Skip to content

Commit 9f8ce56

Browse files
committed
fix: make streams and nobori fetching work
1 parent 06705c3 commit 9f8ce56

File tree

8 files changed

+164
-79
lines changed

8 files changed

+164
-79
lines changed

db/db.go

Lines changed: 44 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -2,11 +2,11 @@ package db
22

33
import (
44
"os"
5+
"strings"
56
"time"
67

7-
"github.com/google/uuid"
88
"github.com/terrapkg/gura/util"
9-
"gorm.io/datatypes"
9+
dt "gorm.io/datatypes"
1010
"gorm.io/driver/postgres"
1111
"gorm.io/gorm"
1212
"moul.io/zapgorm2"
@@ -28,18 +28,44 @@ const (
2828

2929
// Stream is a group of packages that share the same upstream.
// Its primary key is a UUID whose database-side default is gen_random_uuid().
3030
type Stream struct {
31-
ID uuid.UUID `gorm:"type:uuid;default:gen_random_uuid();primaryKey" json:"id"`
31+
ID dt.UUID `gorm:"type:uuid;default:gen_random_uuid();primaryKey" json:"id"`
3232
LastChk time.Time `json:"last_chk"`
3333
LastUpd time.Time `json:"last_upd"`
3434
Fetch string `json:"fetch"`
3535
Forge ForgeType `json:"forge"`
3636
Ver string `json:"ver"`
37-
Mirrors string `json:"mirrors"` // comma-separated list of mirrors
37+
38+
Mirrors []StreamMirror `json:"mirrors"` // one StreamMirror per mirror (no longer a comma-separated string)
39+
}
40+
41+
// BeforeCreate follows GORM's BeforeCreate hook signature. If the Stream has
// no ID yet, it assigns a newly generated v4 UUID; an ID that is already set
// is left untouched. It always returns nil.
func (s *Stream) BeforeCreate(tx *gorm.DB) (err error) {
42+
if s.ID.IsEmpty() {
43+
s.ID = dt.NewUUIDv4()
44+
}
45+
return nil
46+
}
47+
48+
// StreamMirror is a single mirror entry belonging to a Stream; StreamID holds
// the ID of the owning Stream.
type StreamMirror struct {
49+
ID dt.UUID `gorm:"type:uuid" json:"id"`
50+
StreamID dt.UUID `gorm:"type:uuid" json:"streamid"`
51+
Stream Stream `json:"-"` // excluded from JSON output
52+
// NOTE(review): BeforeSave only strips a trailing "/"; nothing shown here
// removes the scheme, so callers presumably must pass it without one — confirm.
Mirror string `json:"mirror"` // url without `http(s)://` and trailing slash
53+
}
54+
55+
// BeforeCreate follows GORM's BeforeCreate hook signature. If the mirror has
// no ID yet, it assigns a newly generated v4 UUID. It always returns nil.
func (m *StreamMirror) BeforeCreate(tx *gorm.DB) error {
56+
if m.ID.IsEmpty() {
57+
m.ID = dt.NewUUIDv4()
58+
}
59+
return nil
60+
}
61+
// BeforeSave follows GORM's BeforeSave hook signature. It normalizes Mirror
// by removing a single trailing "/" (strings.TrimSuffix removes at most one
// occurrence). It always returns nil.
func (m *StreamMirror) BeforeSave(tx *gorm.DB) error {
62+
m.Mirror = strings.TrimSuffix(m.Mirror, "/")
63+
return nil
3864
}
3965

4066
// Pkg is the package model. Uses UUID primary key instead of gorm.Model's uint.
4167
type Pkg struct {
42-
ID datatypes.UUID `gorm:"type:uuid;default:gen_random_uuid();primaryKey" json:"id"`
68+
ID dt.UUID `gorm:"type:uuid;default:gen_random_uuid();primaryKey" json:"id"`
4369
CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"`
4470
UpdatedAt time.Time `gorm:"autoUpdateTime" json:"updated_at"`
4571
DeletedAt gorm.DeletedAt `json:"deleted_at"`
@@ -53,10 +79,17 @@ type Pkg struct {
5379
// Foreign key relationship referencing Repo.ID (string).
5480
Repo Repo `gorm:"constraint:OnDelete:CASCADE;foreignKey:RepoID;references:ID" json:"-"`
5581

56-
Meta datatypes.JSON `gorm:"type:jsonb;default:'{}'" json:"meta"`
82+
Meta dt.JSON `gorm:"type:jsonb;default:'{}'" json:"meta"`
5783

58-
StreamID *uuid.UUID `json:"stream_id"`
59-
Stream Stream `json:"-"`
84+
StreamID *dt.UUID `gorm:"type:uuid" json:"stream_id"`
85+
Stream Stream `json:"-"`
86+
}
87+
88+
// BeforeCreate follows GORM's BeforeCreate hook signature. If the Pkg has no
// ID yet, it assigns a newly generated v4 UUID. It always returns nil.
func (s *Pkg) BeforeCreate(tx *gorm.DB) (err error) {
89+
if s.ID.IsEmpty() {
90+
s.ID = dt.NewUUIDv4()
91+
}
92+
return nil
6093
}
6194

6295
// Repo represents a package repository. Its ID is a string.
@@ -73,6 +106,7 @@ var l = util.SetupLog("db")
73106

74107
// SetupDB initializes the global DB connection and runs migrations.
75108
func SetupDB() {
109+
l.Info("setting up db")
76110
var err error
77111
dsn := os.Getenv("GURA_DSN")
78112
if dsn == "" {
@@ -85,5 +119,6 @@ func SetupDB() {
85119
util.MaybeSuicide(l, "cannot open db", err)
86120

87121
// AutoMigrate in an order that respects foreign keys.
88-
util.MaybeSuicide(l, "auto migrate failed", DB.AutoMigrate(&Repo{}, &Pkg{}, &Stream{}))
122+
util.MaybeSuicide(l, "auto migrate failed", DB.AutoMigrate(&Repo{}, &Pkg{}, &Stream{}, &StreamMirror{}))
123+
l.Info("db ready")
89124
}

db/pkg.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -44,15 +44,15 @@ func SubmitPkg(pkg *Pkg) (*Pkg, error) {
4444
return pkg, nil
4545
}
4646

47-
// Update writes the package through DB.Save and returns the error reported by
// that call, or nil on success. (This hunk only renames the receiver from p to s.)
func (p *Pkg) Update() error {
48-
if err := DB.Save(p).Error; err != nil {
47+
func (s *Pkg) Update() error {
48+
if err := DB.Save(s).Error; err != nil {
4949
return err
5050
}
5151
return nil
5252
}
5353

54-
func (p *Pkg) Delete() error {
55-
if err := DB.Delete(p).Error; err != nil {
54+
func (s *Pkg) Delete() error {
55+
if err := DB.Delete(s).Error; err != nil {
5656
return err
5757
}
5858
return nil

kudari/kudari.go

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -22,24 +22,25 @@ var l = util.SetupLog("kudari")
2222
const fetchRepoTimer = 3e10 // 30s
2323

2424
// fetch runs one fetch cycle for repo: it logs the start, calls rpmFetch when
// repo.Type is db.Rpm, then sets the repo row's upd_at column to the current
// time. Whether or not that update succeeds, it then sleeps for fetchRepoTimer
// and starts the next cycle for the same repo in a new goroutine.
//
// The removed lines scheduled the next cycle from a deferred closure; the
// added lines do it at the `next` label, which both error paths jump to.
func fetch(repo db.Repo) {
25-
defer func() {
26-
time.Sleep(fetchRepoTimer)
27-
go fetch(repo)
28-
}()
2925
l.Info("fetching repository", zap.String("repoID", repo.ID))
3026
switch repo.Type {
3127
case db.Rpm:
3228
rpmFetch(repo)
3329
}
3430
n, err := gorm.G[db.Repo](db.DB).Where("id = ?", repo.ID).Update(context.Background(), "upd_at", time.Now())
3531
if util.Yeet(l, "error while updating upd_at", err, zap.String("repoID", repo.ID), zap.Error(err)) {
36-
return
32+
goto next
3733
}
// The WHERE clause filters on the repo ID, so exactly one row is expected to change.
3834
if n != 1 {
3935
l.DPanic("bug: mut upd_at", zap.Int("n", n), zap.String("repoID", repo.ID))
40-
return
36+
goto next
4137
}
4238
l.Info("done fetching repository", zap.String("repoID", repo.ID))
39+
40+
next:
41+
time.Sleep(fetchRepoTimer)
42+
go fetch(repo)
43+
4344
}
4445

4546
func FetchLoop() {

kudari/rpm.go

Lines changed: 6 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -167,8 +167,7 @@ func rpmFetch(repo db.Repo) {
167167
packages := util.MergeSortedDedup(allSlices, rpmCompare)
168168

169169
l.Info("processing packages", zap.String("repoID", repo.ID), zap.Int("count", len(packages)))
170-
var newpkgs []*db.Pkg
171-
updated, unchanged, lastIdx := 0, 0, 0
170+
updated, unchanged, lastIdx, newpkgs := 0, 0, 0, 0
172171
walked := make([]bool, len(pkgs))
173172
tx := db.DB.Begin()
174173
for _, p := range packages {
@@ -189,17 +188,18 @@ func rpmFetch(repo db.Repo) {
189188
pkgs[n].FullVer = fullver
190189
pkgs[n].Ver = p.Version.Ver
191190
util.MaybeSuicide(l, "Meta.UnmarshalJSON", pkgs[n].Meta.UnmarshalJSON(rpm2MetaJSON(p)))
192-
nobori.RegPkg(tx, &pkgs[n])
193191
updated++
194192
} else {
195-
newpkgs = append(newpkgs, &db.Pkg{
193+
p := &db.Pkg{
196194
Name: p.Name,
197195
FullVer: rpmFullVer(p),
198196
Ver: p.Version.Ver,
199197
Arch: p.Arch,
200198
RepoID: repo.ID,
201199
Meta: rpm2MetaJSON(p),
202-
})
200+
}
201+
util.Yeet(l, "cannot RegPkg", nobori.RegPkg(tx, p), zap.Any("pkg", p))
202+
newpkgs++
203203
}
204204
}
205205
var deletes []uuid.UUID
@@ -209,15 +209,12 @@ func rpmFetch(repo db.Repo) {
209209
}
210210
}
211211
tx.Delete(&db.Pkg{}, "id IN (?)", deletes)
212-
if newpkgs != nil {
213-
tx.CreateInBatches(newpkgs, 5000)
214-
}
215212
tx.Commit()
216213
l.Info("package update summary",
217214
zap.String("repoID", repo.ID),
218215
zap.Int("unchanged", unchanged),
219216
zap.Int("updated", updated),
220-
zap.Int("added", len(newpkgs)),
217+
zap.Int("added", newpkgs),
221218
zap.Int("deleted", len(deletes)),
222219
)
223220
}

nobori/TRACE.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ func UpTrace(p db.Pkg, url_ch chan string) {
2323
default:
2424
l.DPanic("unreachable in uptrace")
2525
}
26+
close(url_ch)
2627
}
2728

2829
func rpmTrace(p db.Pkg, url_ch chan string) {

nobori/github.go

Lines changed: 44 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -23,10 +23,12 @@ import (
2323
"go.uber.org/zap"
2424
)
2525

26+
const GH_WARN_DUR = 500 // ms
27+
2628
// ? https://docs.github.com/en/rest/using-the-rest-api/rate-limits-for-the-rest-api?apiVersion=2022-11-28#about-secondary-rate-limits
2729
// no more than 100 parallel requests
28-
var ghRtPool = make(chan db.Stream)
29-
var ghQlPool = make(chan db.Stream)
30+
var ghRtPool = make(chan db.Stream, 100)
31+
var ghQlPool = make(chan db.Stream, 100)
3032
var ghTokRt []GHToken // GitHub REST Tokens
3133
var ghTokQl []GHToken // GitHub GraphQL Tokens
3234
var ghRtTokIdx int = 0
@@ -73,9 +75,9 @@ func (token *GHToken) waitForFish() {
7375
// updTok refreshes the token's quota and reset time from the HTTP response
// headers h: quota comes from "x-ratelimit-remaining" and reset from
// "x-ratelimit-reset", which is interpreted as Unix seconds.
//
// NOTE(review): a missing or non-numeric header makes ParseInt fail and the
// error goes to util.MaybeSuicide — presumably fatal; confirm in util.
7476
func (token *GHToken) updTok(h http.Header) {
7577
// Parsed with bitSize 16 so the value fits the int16 conversion below.
q, err := strconv.ParseInt(h.Get("x-ratelimit-remaining"), 10, 16)
76-
util.MaybeSuicide(ghl, "strconv x-ratelimit-remaining", err)
78+
util.MaybeSuicide(ghl, "strconv", err, zap.String("x-ratelimit-remaining", h.Get("x-ratelimit-remaining")))
7779
r, err := strconv.ParseInt(h.Get("x-ratelimit-reset"), 10, 64)
78-
util.MaybeSuicide(ghl, "strconv x-ratelimit-reset", err)
80+
util.MaybeSuicide(ghl, "strconv", err, zap.String("x-ratelimit-reset", h.Get("x-ratelimit-reset")))
7981
token.quota = int16(q)
8082
token.reset = time.Unix(r, 0)
8183
}
@@ -161,18 +163,16 @@ func ghFillTokRt() *GHToken {
161163
//
162164
// Return the token with the highest quota.
163165
func ghFillTokQl() *GHToken {
164-
return ghInitTokenPool(&ghTokQl, func(token string) (GHToken, error) {
165-
var qlcli = ql.NewClient("https://api.github.com/graphql", http.DefaultClient).WithRequestModifier(
166-
func(r *http.Request) {
167-
r.Header.Add("Authorization", "Bearer "+token)
168-
})
166+
return ghInitTokenPool(&ghTokQl, func(token string) (tok GHToken, err error) {
167+
tok.key = token
168+
qlcli := tok.qlcli()
169169
var q struct {
170170
RateLimit struct {
171171
Remaining int16
172172
ResetAt string
173173
}
174174
}
175-
err := qlcli.Query(context.Background(), &q, nil)
175+
err = qlcli.Query(context.Background(), &q, nil)
176176
if err != nil {
177177
return GHToken{}, xerrors.Newf("query RateLimit -> give up token %s: %w", token, err)
178178
}
@@ -196,7 +196,7 @@ func GhSwim() chan struct{} {
196196
ready := make(chan struct{}, 1)
197197
go func() {
198198
for len(ghTokRt) == 0 || len(ghTokQl) == 0 {
199-
time.Sleep(20 * time.Millisecond)
199+
time.Sleep(1 * time.Millisecond)
200200
}
201201
ready <- struct{}{}
202202
}()
@@ -269,18 +269,36 @@ func ghRtReleaseCall(stream *db.Stream, repo string, token *GHToken) []string {
269269
return []string{}
270270
}
271271
req.Header.Add("Authorization", "Bearer "+token.key)
272+
l.Debug("ghRtReleaseCall", zap.String("repo", repo))
273+
t0 := time.Now()
272274
resp, err := http.DefaultClient.Do(req)
275+
if t := time.Since(t0); t.Milliseconds() > GH_WARN_DUR {
276+
l.Warn("took " + t.String())
277+
}
273278
if util.Yeet(ghl, "resp fail", err, zap.String("fetch", stream.Fetch)) {
274279
return []string{}
275280
}
281+
if resp.StatusCode != http.StatusOK {
282+
body, err := io.ReadAll(resp.Body)
283+
if err != nil {
284+
l.Error("can't read body", zap.String("fetch", stream.Fetch), zap.Error(err))
285+
body = []byte{}
286+
}
287+
l.Error("bad status", zap.String("status", resp.Status), zap.String("fetch", stream.Fetch), zap.ByteString("body", body))
288+
return []string{}
289+
}
276290
token.updTok(resp.Header)
277291
buf, err := io.ReadAll(resp.Body)
278292
util.MaybeSuicide(ghl, "can't read buf", err)
279-
var v []struct {
280-
tag_name string
293+
type Resp struct {
294+
Tag string `json:"tag_name"`
281295
}
282-
json.Unmarshal(buf, &v)
283-
return util.SliceMap(v, func(r struct{ tag_name string }) string { return r.tag_name })
296+
297+
var v []Resp
298+
if util.Yeet(l, "can't unmarshal", json.Unmarshal(buf, &v), zap.ByteString("resp", buf)) {
299+
return []string{}
300+
}
301+
return util.SliceMap(v, func(r Resp) string { return r.Tag })
284302
}
285303

286304
// Fetch latest GitHub release using the REST API
@@ -320,16 +338,22 @@ func ghQlTagCall(prefix, owner, name string, len int, qlcli ql.Client, token *GH
320338
} `graphql:"refs(refPrefix: \"refs/tags/\", last: $len, orderBy: {field: TAG_COMMIT_DATE, direction: ASC}, query: $prefix)"`
321339
} `graphql:"repository(owner: $owner, name: $name)"`
322340
}
323-
var headers http.Header
324-
err := qlcli.Query(context.Background(), &q, map[string]any{
341+
headers := http.Header{}
342+
l.Debug("ghQlTagCall", zap.String("repo", owner+"/"+name))
343+
t0 := time.Now()
344+
err := qlcli.Query(context.TODO(), &q, map[string]any{
325345
"prefix": prefix,
326346
"owner": owner,
327347
"name": name,
328348
"len": len,
329349
}, ql.BindResponseHeaders(&headers))
350+
if t := time.Since(t0); t.Milliseconds() > GH_WARN_DUR {
351+
l.Warn("took " + t.String())
352+
}
330353
if util.Yeet(ghl, "ql_tag_call", err) {
331354
return nil
332355
}
356+
333357
tags := make([]string, 0, len)
334358
for _, edge := range q.Repository.Refs.Edges {
335359
tags = append(tags, edge.Node.Name)
@@ -353,7 +377,7 @@ func ghQlTag(stream *db.Stream, remain string, token *GHToken, qlcli ql.Client)
353377
}
354378
}
355379

356-
const GH_STRM_HDLR_TEST_QL_MAX = 100
380+
const GH_STRM_HDLR_TEST_QL_MAX = 20
357381

358382
func GhStrmHdlr(pkg db.Pkg, url string) *db.Stream {
359383
strm := &db.Stream{
@@ -377,7 +401,8 @@ func GhStrmHdlr(pkg db.Pkg, url string) *db.Stream {
377401
l.Warn("Found releases, but cannot determine ver/prefix",
378402
zap.Strings("releases", releases),
379403
zap.String("repo", repo),
380-
zap.String("pkgid", pkg.ID.String()))
404+
zap.String("pkgid", pkg.ID.String()),
405+
zap.String("pkgv", pkg.Ver))
381406
}
382407

383408
tokenql := &ghTokQl[ghQlTokIdx]

nobori/nobori.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@ func schedule(stream db.Stream) {
5858
//
5959
// Return when nobori is ready.
6060
func StartFetchLoop() {
61+
l.Info("preparing fetch loop")
6162
ready_chs := util.SliceMap(swimmers, func(swimmer func() chan struct{}) chan struct{} { return swimmer() })
6263
var strms []db.Stream
6364
r := db.DB.Find(&strms)
@@ -66,6 +67,7 @@ func StartFetchLoop() {
6667
util.SliceEach(ready_chs, func(ch chan struct{}) { <-ch })
6768

6869
go fetchLoop(strms)
70+
l.Info("nobori ready")
6971
}
7072

7173
// Main loop for upstream metadata fetching

0 commit comments

Comments
 (0)