Skip to content

Commit 75be880

Browse files
author
Daniel Jimenez
committed
Rename findings asyncAPI spec and generate JSON tags for models
1 parent c60f54b commit 75be880

File tree

8 files changed

+267
-110
lines changed

8 files changed

+267
-110
lines changed

docs/asyncapi.yaml

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ channels:
1414
description: CDC Events of the assets stored in Vulcan.
1515
subscribe:
1616
message:
17-
$ref: '#/components/messages/asset'
17+
$ref: "#/components/messages/asset"
1818
findings:
1919
subscribe:
2020
message:
21-
$ref: '#/components/messages/FindingPayload'
21+
$ref: "#/components/messages/finding"
2222

2323
components:
2424
messages:
@@ -33,18 +33,12 @@ components:
3333
contentType: application/json
3434
payload:
3535
$ref: "#/components/schemas/assetPayload"
36-
FindingPayload:
36+
finding:
37+
headers:
38+
$ref: "#/components/schemas/findingMetadata"
3739
contentType: application/json
38-
headers:
39-
properties:
40-
version:
41-
description: schema version header
42-
type: string
43-
required:
44-
- version
45-
type: object
4640
payload:
47-
$ref: '#/components/schemas/FindingPayload'
41+
$ref: "#/components/schemas/findingPayload"
4842
summary: Events generated from Vulnerability DB findings state changes
4943
name: Finding
5044
title: Findings state
@@ -143,7 +137,7 @@ components:
143137
- description
144138
- tag
145139

146-
FindingPayload:
140+
findingPayload:
147141
properties:
148142
affected_resource:
149143
type: string
@@ -156,27 +150,40 @@ components:
156150
impact_details:
157151
type: string
158152
issue:
159-
$ref: '#/components/schemas/StoreIssueLabels'
153+
$ref: '#/components/schemas/issue'
160154
resources:
161-
$ref: '#/components/schemas/StoreResources'
155+
$ref: '#/components/schemas/resources'
162156
score:
163157
type: number
164158
source:
165-
$ref: '#/components/schemas/StoreSource'
159+
$ref: '#/components/schemas/source'
166160
status:
167161
type: string
168162
target:
169-
$ref: '#/components/schemas/StoreTargetTeams'
163+
$ref: '#/components/schemas/target'
170164
total_exposure:
171165
type: integer
172166
type: object
173-
PqStringArray:
167+
additionalProperties: false
168+
169+
findingMetadata:
170+
type: object
171+
additionalProperties: false
172+
properties:
173+
version:
174+
type: string
175+
description: The value of this field is equal to the value of the field info.version of this document.
176+
required:
177+
- version
178+
179+
stringArray:
174180
items:
175181
type: string
176182
type:
177183
- array
178184
- "null"
179-
StoreIssueLabels:
185+
186+
issue:
180187
properties:
181188
cwe_id:
182189
minimum: 0
@@ -192,13 +199,15 @@ components:
192199
- array
193200
- "null"
194201
recommendations:
195-
$ref: '#/components/schemas/PqStringArray'
202+
$ref: '#/components/schemas/stringArray'
196203
reference_links:
197-
$ref: '#/components/schemas/PqStringArray'
204+
$ref: '#/components/schemas/stringArray'
198205
summary:
199206
type: string
200207
type: object
201-
StoreResourceGroup:
208+
additionalProperties: false
209+
210+
resourceGroup:
202211
properties:
203212
attributes:
204213
items:
@@ -217,13 +226,16 @@ components:
217226
- array
218227
- "null"
219228
type: object
220-
StoreResources:
229+
additionalProperties: false
230+
231+
resources:
221232
items:
222-
$ref: '#/components/schemas/StoreResourceGroup'
233+
$ref: '#/components/schemas/resourceGroup'
223234
type:
224235
- array
225236
- "null"
226-
StoreSource:
237+
238+
source:
227239
properties:
228240
component:
229241
type: string
@@ -239,7 +251,9 @@ components:
239251
format: date-time
240252
type: string
241253
type: object
242-
StoreTargetTeams:
254+
additionalProperties: false
255+
256+
target:
243257
properties:
244258
id:
245259
type: string
@@ -252,3 +266,4 @@ components:
252266
- array
253267
- "null"
254268
type: object
269+
additionalProperties: false

pkg/api/store/cdc/parser.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -291,13 +291,21 @@ func (p *AsyncTxParser) processFindingOverwrite(data []byte) error {
291291
f, err := p.VulnDBClient.Finding(context.Background(), dto.FindingOverwrite.FindingID)
292292
if err != nil {
293293
if errors.IsKind(err, errors.ErrNotFound) {
294-
// This should not happen as distributed txs from API to VulnDB are single
295-
// threaded and we already verified the finding existence in the previous step
296294
return nil
297295
}
298296
}
299-
// TODO: Can we have a conflict in relation with possible concurrent finding state
300-
// changes from API finding overwrite and check processing from vulndb consumer?
297+
// TODO: There can be a race condition here between two concurrent state changes for a
298+
// finding between this finding overwrite and a related check processing from vulndb side.
299+
// Currently this can only generate a conflict if the two concurrent events are a "mark as
300+
// false positive" action initiated from Vulcan API and a finding detection event from the
301+
// vulndb side which contains a fingerprint variation, as in that situation the FALSE POSITIVE
302+
// state "preference" does not apply.
303+
// Example:
304+
// API -> mark as false positive -> VulnDB API
305+
// API -> retrieve finding state -> VulnDB API
306+
// VulnDB -> check processing -> Reopen false positive finding
307+
// VulnDB -> push reopened finding -> Kafka
308+
// API -> push false positive finding -> Kafka
301309
asyncFinding := findingToAsyncFinding(f)
302310
return p.asyncAPI.PushFinding(asyncFinding)
303311
}
@@ -419,13 +427,12 @@ func assetToAsyncAsset(a api.Asset) asyncapi.AssetPayload {
419427
}
420428

421429
func findingToAsyncFinding(f *api.Finding) asyncapi.FindingPayload {
422-
return asyncapi.FindingPayload{
430+
findingPayload := asyncapi.FindingPayload{
423431
AffectedResource: f.Finding.AffectedResource,
424-
CurrentExposure: int(f.Finding.CurrentExposure),
425432
Details: f.Finding.Details,
426433
Id: f.Finding.ID,
427434
ImpactDetails: f.Finding.ImpactDetails,
428-
Issue: &asyncapi.StoreIssueLabels{
435+
Issue: &asyncapi.Issue{
429436
CweId: f.Finding.Issue.CWEID,
430437
Description: f.Finding.Issue.Description,
431438
Id: f.Finding.Issue.ID,
@@ -436,7 +443,7 @@ func findingToAsyncFinding(f *api.Finding) asyncapi.FindingPayload {
436443
},
437444
Resources: []interface{}{f.Finding.Resources},
438445
Score: float64(f.Finding.Score),
439-
Source: &asyncapi.StoreSource{
446+
Source: &asyncapi.Source{
440447
Component: f.Finding.Source.Component,
441448
Id: f.Finding.Source.ID,
442449
Instance: f.Finding.Source.Instance,
@@ -445,11 +452,15 @@ func findingToAsyncFinding(f *api.Finding) asyncapi.FindingPayload {
445452
Time: f.Finding.Source.Time,
446453
},
447454
Status: f.Finding.Status,
448-
Target: &asyncapi.StoreTargetTeams{
455+
Target: &asyncapi.Target{
449456
Id: f.Finding.Target.ID,
450457
Identifier: f.Finding.Target.Identifier,
451458
Teams: []interface{}{f.Finding.Target.Teams},
452459
},
453460
TotalExposure: int(f.Finding.TotalExposure),
454461
}
462+
if f.Finding.OpenFinding != nil {
463+
findingPayload.CurrentExposure = int(f.Finding.OpenFinding.CurrentExposure)
464+
}
465+
return findingPayload
455466
}

pkg/api/store/cdc/parser_test.go

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ type mockVulnDBClient struct {
141141
deleteTeamTagF func(ctx context.Context, authTeam, teamID, tag string) error
142142
deleteTargetTeamF func(ctx context.Context, authTeam, targetID, teamID string) error
143143
deleteTargetTagF func(ctx context.Context, authTeam, targetID, tag string) error
144+
getFindingF func(ctx context.Context, findingID string) (*api.Finding, error)
144145
updateFindingF func(ctx context.Context, findingID string, payload *api.UpdateFinding, tag string) (*api.Finding, error)
145146
}
146147

@@ -162,6 +163,9 @@ func (m *mockVulnDBClient) DeleteTargetTeam(ctx context.Context, authTeam, targe
162163
func (m *mockVulnDBClient) DeleteTargetTag(ctx context.Context, authTeam, targetID, tag string) error {
163164
return m.deleteTargetTagF(ctx, authTeam, targetID, tag)
164165
}
166+
func (m *mockVulnDBClient) Finding(ctx context.Context, findingID string) (*api.Finding, error) {
167+
return m.getFindingF(ctx, findingID)
168+
}
165169
func (m *mockVulnDBClient) UpdateFinding(ctx context.Context, findingID string, payload *api.UpdateFinding, tag string) (*api.Finding, error) {
166170
return m.updateFindingF(ctx, findingID, payload, tag)
167171
}
@@ -196,14 +200,15 @@ func init() {
196200

197201
func TestParse(t *testing.T) {
198202
testCases := []struct {
199-
name string
200-
log []Event
201-
vulnDBClient *mockVulnDBClient
202-
asyncAPI func() (*asyncapi.Vulcan, kafka.Client, error)
203-
loggr *mockLoggr
204-
wantNParsed uint
205-
wantAsyncAssets []testutil.AssetTopicData
206-
wantErr error
203+
name string
204+
log []Event
205+
vulnDBClient *mockVulnDBClient
206+
asyncAPI func() (*asyncapi.Vulcan, kafka.Client, error)
207+
loggr *mockLoggr
208+
wantNParsed uint
209+
wantAsyncAssets []testutil.AssetTopicData
210+
wantAsyncFindings []testutil.FindingTopicData
211+
wantErr error
207212
}{
208213
{
209214
name: "Happy path",
@@ -257,6 +262,12 @@ func TestParse(t *testing.T) {
257262
deleteTargetTagF: func(ctx context.Context, authTeam, targetID, tag string) error {
258263
return nil
259264
},
265+
getFindingF: func(ctx context.Context, findingID string) (*api.Finding, error) {
266+
return &api.Finding{
267+
Finding: vulndb.FindingExpanded{
268+
Finding: vulndb.Finding{ID: "1"}},
269+
}, nil
270+
},
260271
updateFindingF: func(ctx context.Context, findingID string, payload *api.UpdateFinding, tag string) (*api.Finding, error) {
261272
var f = &api.Finding{}
262273
return f, nil
@@ -307,6 +318,26 @@ func TestParse(t *testing.T) {
307318
},
308319
},
309320
},
321+
wantAsyncFindings: []testutil.FindingTopicData{
322+
{
323+
Payload: asyncapi.FindingPayload{
324+
Id: "1",
325+
Issue: &asyncapi.Issue{
326+
Recommendations: []any{nil},
327+
ReferenceLinks: []any{nil},
328+
Labels: []any{nil},
329+
},
330+
Source: &asyncapi.Source{},
331+
Target: &asyncapi.Target{
332+
Teams: []any{nil},
333+
},
334+
Resources: []any{nil},
335+
},
336+
Headers: map[string][]byte{
337+
"version": []byte(asyncapi.Version),
338+
},
339+
},
340+
},
310341
wantNParsed: 6,
311342
},
312343
{
@@ -431,6 +462,8 @@ func TestParse(t *testing.T) {
431462
if nParsed != tc.wantNParsed {
432463
t.Fatalf("expected nParsed to be %d, but got %d", tc.wantNParsed, nParsed)
433464
}
465+
466+
// Verify async assets
434467
topic := kclient.Topics[asyncapi.AssetsEntityName]
435468
gotAssets, err := testutil.ReadAllAssetsTopic(topic)
436469
if err != nil {
@@ -445,6 +478,20 @@ func TestParse(t *testing.T) {
445478
t.Fatalf("want!=got, diff: %s", diff)
446479
}
447480

481+
// Verify async findings
482+
topic = kclient.Topics[asyncapi.FindingsEntityName]
483+
gotFindings, err := testutil.ReadAllFindingsTopic(topic)
484+
if err != nil {
485+
t.Fatalf("error reading findings from kafka %v", err)
486+
}
487+
wantFindings := tc.wantAsyncFindings
488+
sortSlices = cmpopts.SortSlices(func(a, b testutil.FindingTopicData) bool {
489+
return strings.Compare(a.Payload.Id, b.Payload.Id) < 0
490+
})
491+
diff = cmp.Diff(wantFindings, gotFindings, sortSlices)
492+
if diff != "" {
493+
t.Fatalf("want!=got, diff: %s", diff)
494+
}
448495
})
449496
}
450497
}
@@ -464,7 +511,10 @@ func (n nullLogger) Debugf(s string, params ...any) {
464511
}
465512

466513
func newTestAsyncAPI() (*asyncapi.Vulcan, kafka.Client, error) {
467-
topics := map[string]string{asyncapi.AssetsEntityName: "assets"}
514+
topics := map[string]string{
515+
asyncapi.AssetsEntityName: "assets",
516+
asyncapi.FindingsEntityName: "findings",
517+
}
468518
testTopics, err := testutil.PrepareKafka(topics)
469519
if err != nil {
470520
return nil, kafka.Client{}, fmt.Errorf("error creating test topics: %v", err)

pkg/asyncapi/_gen/gen.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,7 @@ docker run \
2727
/bin/sh -c "
2828
npm install --silent &&
2929
node gen.js "${SOURCE_FILE}" "${GO_PACKAGE_NAME}""
30+
31+
# Add JSON tags
32+
go install github.com/betacraft/easytags@v1.0.2
33+
easytags models.go

pkg/asyncapi/kafka/client.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kafka
77
import (
88
"errors"
99
"fmt"
10+
"time"
1011

1112
"github.com/confluentinc/confluent-kafka-go/kafka"
1213
)
@@ -23,6 +24,8 @@ var (
2324
const (
2425
kafkaSecurityProtocol = "sasl_ssl"
2526
kafkaSaslMechanisms = "SCRAM-SHA-256"
27+
28+
maxDeliveryWait = 30 * time.Second
2629
)
2730

2831
// Client implements an EventStreamClient using Kafka as the event stream
@@ -63,15 +66,18 @@ func (c *Client) Push(entity string, id string, payload []byte, metadata map[str
6366
if !ok {
6467
return ErrUndefinedEntity
6568
}
69+
6670
delivered := make(chan kafka.Event)
6771
defer close(delivered)
72+
6873
var headers []kafka.Header
6974
for k, v := range metadata {
7075
headers = append(headers, kafka.Header{
7176
Key: k,
7277
Value: v,
7378
})
7479
}
80+
7581
msg := kafka.Message{
7682
TopicPartition: kafka.TopicPartition{
7783
Topic: &topic,
@@ -85,7 +91,14 @@ func (c *Client) Push(entity string, id string, payload []byte, metadata map[str
8591
if err != nil {
8692
return fmt.Errorf("error producing message: %w", err)
8793
}
88-
e := <-delivered
94+
95+
var e kafka.Event
96+
select {
97+
case <-time.After(maxDeliveryWait):
98+
return fmt.Errorf("error time out waiting for mssg delivery confirmation")
99+
case e = <-delivered:
100+
}
101+
89102
m := e.(*kafka.Message)
90103
if m.TopicPartition.Error != nil {
91104
return fmt.Errorf("error delivering message: %w", m.TopicPartition.Error)

0 commit comments

Comments
 (0)