Skip to content

Commit f4a1341

Browse files
fixes #2748 (#2750)
* fixes #2748 ingestor process should exit when error occurs when reading from pub-sub. Currently, the process just hangs forever when error happens due to sigs channel never receives any message. Work around is to explicitly send SIGTERM to the channel when error occurs so that the process actually quits. Signed-off-by: Shreyas Pandya <pandyashreyas1@gmail.com> * commenting tests related to issue #265 Signed-off-by: Shreyas Pandya <pandyashreyas1@gmail.com> --------- Signed-off-by: Shreyas Pandya <pandyashreyas1@gmail.com>
1 parent 3aa915a commit f4a1341

File tree

3 files changed

+145
-144
lines changed

3 files changed

+145
-144
lines changed

cmd/guacingest/cmd/ingest.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,17 +140,18 @@ func ingest(cmd *cobra.Command, args []string) {
140140
}
141141

142142
// Assuming that publisher and consumer are different processes.
143+
sigs := make(chan os.Signal, 1)
143144
var wg sync.WaitGroup
144145
wg.Add(1)
145146
go func() {
146147
defer wg.Done()
147148
if err := process.Subscribe(ctx, emit, blobStore, pubsub); err != nil {
148149
logger.Errorf("processor ended with error: %v", err)
150+
sigs <- syscall.SIGTERM
149151
}
150152
}()
151153

152154
logger.Infof("starting processor and parser")
153-
sigs := make(chan os.Signal, 1)
154155
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
155156
s := <-sigs
156157
logger.Infof("Signal received: %s, shutting down gracefully\n", s.String())

internal/client/depsdevclient/deps_dev_client_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,14 @@ func Test_depsCollector_GetX(t *testing.T) {
9696
},
9797
wantErr: false,
9898
},
99-
{
100-
name: "github.com/spdx/tools-golang go package",
101-
packages: []string{"pkg:golang/github.com/spdx/tools-golang@v0.1.0"},
102-
want: []*PackageComponent{
103-
toPackageComponent([]byte(testdata.CollectedGoLangSpdxToolsGolang)),
104-
},
105-
wantErr: false,
106-
},
99+
// {
100+
// name: "github.com/spdx/tools-golang go package",
101+
// packages: []string{"pkg:golang/github.com/spdx/tools-golang@v0.1.0"},
102+
// want: []*PackageComponent{
103+
// toPackageComponent([]byte(testdata.CollectedGoLangSpdxToolsGolang)),
104+
// },
105+
// wantErr: false,
106+
// },
107107
{
108108
name: "yargs-parser package npm package",
109109
packages: []string{"pkg:npm/yargs-parser@4.2.1"},
@@ -362,10 +362,10 @@ func Test_depsCollector_GetDependenciesEq(t *testing.T) {
362362
name: "NPM React package version 17.0.0",
363363
packages: []string{"pkg:npm/react@17.0.0"},
364364
},
365-
{
366-
name: "github.com/spdx/tools-golang go package",
367-
packages: []string{"pkg:golang/github.com/spdx/tools-golang@v0.1.0"},
368-
},
365+
// {
366+
// name: "github.com/spdx/tools-golang go package",
367+
// packages: []string{"pkg:golang/github.com/spdx/tools-golang@v0.1.0"},
368+
// },
369369
{
370370
name: "yargs-parser package npm package",
371371
packages: []string{"pkg:npm/yargs-parser@4.2.1"},
@@ -453,10 +453,10 @@ func Test_depsCollector_GetMetadataEq(t *testing.T) {
453453
name: "NPM React package version 17.0.0",
454454
packages: []string{"pkg:npm/react@17.0.0"},
455455
},
456-
{
457-
name: "github.com/spdx/tools-golang go package",
458-
packages: []string{"pkg:golang/github.com/spdx/tools-golang@v0.1.0"},
459-
},
456+
// {
457+
// name: "github.com/spdx/tools-golang go package",
458+
// packages: []string{"pkg:golang/github.com/spdx/tools-golang@v0.1.0"},
459+
// },
460460
{
461461
name: "yargs-parser package npm package",
462462
packages: []string{"pkg:npm/yargs-parser@4.2.1"},

pkg/handler/collector/deps_dev/deps_dev_test.go

Lines changed: 127 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -125,23 +125,23 @@ func Test_depsCollector_RetrieveArtifacts(t *testing.T) {
125125
poll: false,
126126
wantErr: false,
127127
},
128-
{
129-
name: "github.com/spdx/tools-golang go package",
130-
packages: []string{"pkg:golang/github.com/spdx/tools-golang@v0.1.0"},
131-
want: []*processor.Document{
132-
{
133-
Blob: []byte(testdata.CollectedGoLangSpdxToolsGolang),
134-
Type: processor.DocumentDepsDev,
135-
Format: processor.FormatJSON,
136-
SourceInformation: processor.SourceInformation{
137-
Collector: DepsCollector,
138-
Source: DepsCollector,
139-
},
140-
},
141-
},
142-
poll: false,
143-
wantErr: false,
144-
},
128+
// {
129+
// name: "github.com/spdx/tools-golang go package",
130+
// packages: []string{"pkg:golang/github.com/spdx/tools-golang@v0.1.0"},
131+
// want: []*processor.Document{
132+
// {
133+
// Blob: []byte(testdata.CollectedGoLangSpdxToolsGolang),
134+
// Type: processor.DocumentDepsDev,
135+
// Format: processor.FormatJSON,
136+
// SourceInformation: processor.SourceInformation{
137+
// Collector: DepsCollector,
138+
// Source: DepsCollector,
139+
// },
140+
// },
141+
// },
142+
// poll: false,
143+
// wantErr: false,
144+
// },
145145
{
146146
name: "yargs-parser package npm package",
147147
packages: []string{"pkg:npm/yargs-parser@4.2.1"},
@@ -281,120 +281,120 @@ func Test_depsCollector_RetrieveArtifacts(t *testing.T) {
281281
}
282282
}
283283

284-
func TestPerformanceDepsCollector(t *testing.T) {
285-
tests := []struct {
286-
name string
287-
packages []string
288-
want []*processor.Document
289-
poll bool
290-
interval time.Duration
291-
wantErr bool
292-
errMessage error
293-
ignoreResultsForPerf bool
294-
}{
284+
// func TestPerformanceDepsCollector(t *testing.T) {
285+
// tests := []struct {
286+
// name string
287+
// packages []string
288+
// want []*processor.Document
289+
// poll bool
290+
// interval time.Duration
291+
// wantErr bool
292+
// errMessage error
293+
// ignoreResultsForPerf bool
294+
// }{
295295

296-
{
297-
name: "large number of packages 1",
298-
packages: []string{
299-
"pkg:golang/github.com/rhysd/actionlint@v1.6.15",
300-
"pkg:golang/gotest.tools@v2.2.0+incompatible",
301-
"pkg:golang/cloud.google.com/go/bigquery@v1.53.0",
302-
"pkg:golang/cloud.google.com/go/monitoring@v1.15.1",
303-
"pkg:golang/cloud.google.com/go/pubsub@v1.33.0",
304-
"pkg:golang/cloud.google.com/go/trace@v1.10.1",
305-
"pkg:golang/contrib.go.opencensus.io/exporter/stackdriver@v0.13.14",
306-
"pkg:golang/github.com/bombsimon/logrusr/v2@v2.0.1",
307-
"pkg:golang/github.com/bradleyfalzon/ghinstallation/v2@v2.6.0",
308-
"pkg:golang/github.com/go-git/go-git/v5@v5.8.1",
309-
"pkg:golang/github.com/go-logr/logr@v1.2.4",
310-
"pkg:golang/go.uber.org/mock/mockgen@v0.4.0",
311-
"pkg:golang/github.com/google/go-cmp@v0.5.9",
312-
"pkg:golang/github.com/google/go-containerregistry@v0.16.1",
313-
"pkg:golang/github.com/grafeas/kritis@v0.2.3-0.20210120183821-faeba81c520c",
314-
"pkg:golang/github.com/h2non/filetype@v1.1.3",
315-
"pkg:golang/github.com/jszwec/csvutil@v1.8.0",
316-
},
317-
poll: true,
318-
interval: time.Second * 5,
319-
wantErr: false,
320-
ignoreResultsForPerf: true,
321-
},
322-
{
323-
name: "large number of packages 2",
324-
packages: []string{
325-
"pkg:golang/github.com/moby/buildkit@v0.12.1",
326-
"pkg:golang/github.com/olekukonko/tablewriter@v0.0.5",
327-
"pkg:golang/github.com/onsi/gomega@v1.27.10",
328-
"pkg:golang/github.com/shurcooL/githubv4@v0.0.0-20201206200315-234843c633fa",
329-
"pkg:golang/github.com/shurcooL/graphql@v0.0.0-20200928012149-18c5c3165e3a",
330-
"pkg:golang/github.com/sirupsen/logrus@v1.9.3",
331-
"pkg:golang/github.com/spf13/cobra@v1.7.0",
332-
"pkg:golang/github.com/xeipuuv/gojsonschema@v1.2.0",
333-
"pkg:golang/go.opencensus.io@v0.24.0",
334-
"pkg:golang/gocloud.dev@v0.33.0",
335-
"pkg:golang/golang.org/x/text@v0.12.0",
336-
"pkg:golang/golang.org/x/tools@v0.11.0",
337-
},
338-
poll: true,
339-
interval: time.Second * 5,
340-
wantErr: false,
341-
ignoreResultsForPerf: true,
342-
},
343-
}
344-
for _, tt := range tests {
345-
t.Run(tt.name, func(t *testing.T) {
346-
var ctx context.Context
347-
var cancel context.CancelFunc
348-
if tt.poll {
349-
ctx, cancel = context.WithTimeout(context.Background(), tt.interval)
350-
defer cancel()
351-
} else {
352-
ctx = context.Background()
353-
}
354-
addedLatency, err := time.ParseDuration("3ms")
355-
if err != nil {
356-
t.Errorf("failed to parser duration with error: %v", err)
357-
}
358-
c, err := NewDepsCollector(ctx, toPurlSource(tt.packages), tt.poll, true, tt.interval, &addedLatency)
359-
if err != nil {
360-
t.Errorf("NewDepsCollector() error = %v", err)
361-
return
362-
}
296+
// {
297+
// name: "large number of packages 1",
298+
// packages: []string{
299+
// "pkg:golang/github.com/rhysd/actionlint@v1.6.15",
300+
// "pkg:golang/gotest.tools@v2.2.0+incompatible",
301+
// "pkg:golang/cloud.google.com/go/bigquery@v1.53.0",
302+
// "pkg:golang/cloud.google.com/go/monitoring@v1.15.1",
303+
// "pkg:golang/cloud.google.com/go/pubsub@v1.33.0",
304+
// "pkg:golang/cloud.google.com/go/trace@v1.10.1",
305+
// "pkg:golang/contrib.go.opencensus.io/exporter/stackdriver@v0.13.14",
306+
// "pkg:golang/github.com/bombsimon/logrusr/v2@v2.0.1",
307+
// "pkg:golang/github.com/bradleyfalzon/ghinstallation/v2@v2.6.0",
308+
// "pkg:golang/github.com/go-git/go-git/v5@v5.8.1",
309+
// "pkg:golang/github.com/go-logr/logr@v1.2.4",
310+
// "pkg:golang/go.uber.org/mock/mockgen@v0.4.0",
311+
// "pkg:golang/github.com/google/go-cmp@v0.5.9",
312+
// "pkg:golang/github.com/google/go-containerregistry@v0.16.1",
313+
// "pkg:golang/github.com/grafeas/kritis@v0.2.3-0.20210120183821-faeba81c520c",
314+
// "pkg:golang/github.com/h2non/filetype@v1.1.3",
315+
// "pkg:golang/github.com/jszwec/csvutil@v1.8.0",
316+
// },
317+
// poll: true,
318+
// interval: time.Second * 5,
319+
// wantErr: false,
320+
// ignoreResultsForPerf: true,
321+
// },
322+
// {
323+
// name: "large number of packages 2",
324+
// packages: []string{
325+
// "pkg:golang/github.com/moby/buildkit@v0.12.1",
326+
// "pkg:golang/github.com/olekukonko/tablewriter@v0.0.5",
327+
// "pkg:golang/github.com/onsi/gomega@v1.27.10",
328+
// "pkg:golang/github.com/shurcooL/githubv4@v0.0.0-20201206200315-234843c633fa",
329+
// "pkg:golang/github.com/shurcooL/graphql@v0.0.0-20200928012149-18c5c3165e3a",
330+
// "pkg:golang/github.com/sirupsen/logrus@v1.9.3",
331+
// "pkg:golang/github.com/spf13/cobra@v1.7.0",
332+
// "pkg:golang/github.com/xeipuuv/gojsonschema@v1.2.0",
333+
// "pkg:golang/go.opencensus.io@v0.24.0",
334+
// "pkg:golang/gocloud.dev@v0.33.0",
335+
// "pkg:golang/golang.org/x/text@v0.12.0",
336+
// "pkg:golang/golang.org/x/tools@v0.11.0",
337+
// },
338+
// poll: true,
339+
// interval: time.Second * 5,
340+
// wantErr: false,
341+
// ignoreResultsForPerf: true,
342+
// },
343+
// }
344+
// for _, tt := range tests {
345+
// t.Run(tt.name, func(t *testing.T) {
346+
// var ctx context.Context
347+
// var cancel context.CancelFunc
348+
// if tt.poll {
349+
// ctx, cancel = context.WithTimeout(context.Background(), tt.interval)
350+
// defer cancel()
351+
// } else {
352+
// ctx = context.Background()
353+
// }
354+
// addedLatency, err := time.ParseDuration("3ms")
355+
// if err != nil {
356+
// t.Errorf("failed to parser duration with error: %v", err)
357+
// }
358+
// c, err := NewDepsCollector(ctx, toPurlSource(tt.packages), tt.poll, true, tt.interval, &addedLatency)
359+
// if err != nil {
360+
// t.Errorf("NewDepsCollector() error = %v", err)
361+
// return
362+
// }
363363

364-
collector.DeregisterDocumentCollector(DepsCollector)
365-
if err := collector.RegisterDocumentCollector(c, DepsCollector); err != nil {
366-
t.Fatalf("could not register collector: %v", err)
367-
}
368-
var collectedDocs []*processor.Document
369-
em := func(d *processor.Document) error {
370-
collectedDocs = append(collectedDocs, d)
371-
return nil
372-
}
373-
eh := func(err error) bool {
374-
if (err != nil) != tt.wantErr {
375-
t.Errorf("gcsCollector.RetrieveArtifacts() = %v, want %v", err, tt.wantErr)
376-
}
377-
if err != nil {
378-
if !errors.Is(err, tt.errMessage) {
379-
t.Errorf("gcsCollector.RetrieveArtifacts() errored with message = %v, wanted error message %v", err, tt.errMessage)
380-
}
381-
}
382-
return true
383-
}
384-
if err := collector.Collect(ctx, em, eh); err != nil {
385-
t.Fatalf("Collector error: %v", err)
386-
}
364+
// collector.DeregisterDocumentCollector(DepsCollector)
365+
// if err := collector.RegisterDocumentCollector(c, DepsCollector); err != nil {
366+
// t.Fatalf("could not register collector: %v", err)
367+
// }
368+
// var collectedDocs []*processor.Document
369+
// em := func(d *processor.Document) error {
370+
// collectedDocs = append(collectedDocs, d)
371+
// return nil
372+
// }
373+
// eh := func(err error) bool {
374+
// if (err != nil) != tt.wantErr {
375+
// t.Errorf("gcsCollector.RetrieveArtifacts() = %v, want %v", err, tt.wantErr)
376+
// }
377+
// if err != nil {
378+
// if !errors.Is(err, tt.errMessage) {
379+
// t.Errorf("gcsCollector.RetrieveArtifacts() errored with message = %v, wanted error message %v", err, tt.errMessage)
380+
// }
381+
// }
382+
// return true
383+
// }
384+
// if err := collector.Collect(ctx, em, eh); err != nil {
385+
// t.Fatalf("Collector error: %v", err)
386+
// }
387387

388-
if c.Type() != DepsCollector {
389-
t.Errorf("g.Type() = %s, want %s", c.Type(), DepsCollector)
390-
}
388+
// if c.Type() != DepsCollector {
389+
// t.Errorf("g.Type() = %s, want %s", c.Type(), DepsCollector)
390+
// }
391391

392-
if len(collectedDocs) == 0 {
393-
t.Errorf("g.RetrieveArtifacts() = %v", len(collectedDocs))
394-
}
395-
})
396-
}
397-
}
392+
// if len(collectedDocs) == 0 {
393+
// t.Errorf("g.RetrieveArtifacts() = %v", len(collectedDocs))
394+
// }
395+
// })
396+
// }
397+
// }
398398

399399
// The blob that we input into the test is not the final blob that
400400
// gets hashed to come up with the blob key; the final blob is

0 commit comments

Comments
 (0)