Skip to content

Commit c60f54b

Browse files
author
Daniel Jimenez
committed
Push finding overwrite events to asyncapi channel
1 parent 8cdf09c commit c60f54b

File tree

8 files changed

+383
-14
lines changed

8 files changed

+383
-14
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ Those are the variables you have to setup:
9999
|KAFKA_USER||user|
100100
|KAFKA_PASS||supersecret|
101101
|KAFKA_BROKER|if set to empty the Async API will be disabled|kafka.example.com:9094|
102-
|KAFKA_TOPICS|Contains a map, using toml format, mapping entities in the Vulcan async API to the kafka topics they wil be pushed to, by now the only available entity is ``assets`` |[assets = "assets-topic"]|
102+
|KAFKA_TOPICS|Contains a map, using toml format, mapping entities in the Vulcan async API to the kafka topics they wil be pushed to, by now the only available entity is ``assets`` |{assets = "assets-topic", findings = "findings-topic"}|
103103
First we have to build the `vulcan-api` because the build only copies the file.
104104

105105
We need to provide `linux` compiled binary to the docker build command. This won't be necessary when this component has been open sourced.

_resources/config/local.toml.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ key = "a key"
5858
retries = 4
5959
retry_interval = 2 # seconds
6060

61+
[kafka]
62+
user = "user"
63+
pass = "supersecret"
64+
broker = "kafka.example.com:9094"
65+
topics = '{assets = "assets-topic", findings = "findings-topic"}'
66+
6167
[globalpolicy]
6268
# This config policy emulates code policy definition.
6369
[globalpolicy.web-scanning-global]

docs/asyncapi.yaml

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ channels:
1515
subscribe:
1616
message:
1717
$ref: '#/components/messages/asset'
18+
findings:
19+
subscribe:
20+
message:
21+
$ref: '#/components/messages/FindingPayload'
1822

1923
components:
2024
messages:
@@ -29,6 +33,21 @@ components:
2933
contentType: application/json
3034
payload:
3135
$ref: "#/components/schemas/assetPayload"
36+
FindingPayload:
37+
contentType: application/json
38+
headers:
39+
properties:
40+
version:
41+
description: schema version header
42+
type: string
43+
required:
44+
- version
45+
type: object
46+
payload:
47+
$ref: '#/components/schemas/FindingPayload'
48+
summary: Events generated from Vulnerability DB findings state changes
49+
name: Finding
50+
title: Findings state
3251

3352
schemas:
3453
assetMetadata:
@@ -123,3 +142,113 @@ components:
123142
- name
124143
- description
125144
- tag
145+
146+
FindingPayload:
147+
properties:
148+
affected_resource:
149+
type: string
150+
current_exposure:
151+
type: integer
152+
details:
153+
type: string
154+
id:
155+
type: string
156+
impact_details:
157+
type: string
158+
issue:
159+
$ref: '#/components/schemas/StoreIssueLabels'
160+
resources:
161+
$ref: '#/components/schemas/StoreResources'
162+
score:
163+
type: number
164+
source:
165+
$ref: '#/components/schemas/StoreSource'
166+
status:
167+
type: string
168+
target:
169+
$ref: '#/components/schemas/StoreTargetTeams'
170+
total_exposure:
171+
type: integer
172+
type: object
173+
PqStringArray:
174+
items:
175+
type: string
176+
type:
177+
- array
178+
- "null"
179+
StoreIssueLabels:
180+
properties:
181+
cwe_id:
182+
minimum: 0
183+
type: integer
184+
description:
185+
type: string
186+
id:
187+
type: string
188+
labels:
189+
items:
190+
type: string
191+
type:
192+
- array
193+
- "null"
194+
recommendations:
195+
$ref: '#/components/schemas/PqStringArray'
196+
reference_links:
197+
$ref: '#/components/schemas/PqStringArray'
198+
summary:
199+
type: string
200+
type: object
201+
StoreResourceGroup:
202+
properties:
203+
attributes:
204+
items:
205+
type: string
206+
type:
207+
- array
208+
- "null"
209+
name:
210+
type: string
211+
resources:
212+
items:
213+
additionalProperties:
214+
type: string
215+
type: object
216+
type:
217+
- array
218+
- "null"
219+
type: object
220+
StoreResources:
221+
items:
222+
$ref: '#/components/schemas/StoreResourceGroup'
223+
type:
224+
- array
225+
- "null"
226+
StoreSource:
227+
properties:
228+
component:
229+
type: string
230+
id:
231+
type: string
232+
instance:
233+
type: string
234+
name:
235+
type: string
236+
options:
237+
type: string
238+
time:
239+
format: date-time
240+
type: string
241+
type: object
242+
StoreTargetTeams:
243+
properties:
244+
id:
245+
type: string
246+
identifier:
247+
type: string
248+
teams:
249+
items:
250+
type: string
251+
type:
252+
- array
253+
- "null"
254+
type: object

local.env.example

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,4 @@ GPC_1_NAME=web-scanning-global
4141
KAFKA_USER=user
4242
KAFKA_PASS=supersecret
4343
KAFKA_BROKER=kafka.example.com:9094
44-
KAFKA_TOPICS={assets = "assets-topic"}
44+
KAFKA_TOPICS={assets = "assets-topic", findings = "findings-topic"}

pkg/api/store/cdc/parser.go

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type AsyncTxParser struct {
5858
type AsyncAPI interface {
5959
PushAsset(asset asyncapi.AssetPayload) error
6060
DeleteAsset(asset asyncapi.AssetPayload) error
61+
PushFinding(finding asyncapi.FindingPayload) error
6162
}
6263

6364
// NewAsyncTxParser builds a new CDC log parser to handle distributed
@@ -241,12 +242,7 @@ func (p *AsyncTxParser) processUpdateAsset(data []byte) error {
241242
return errInvalidData
242243
}
243244
asyncAsset := assetToAsyncAsset(dto.NewAsset)
244-
err = p.asyncAPI.PushAsset(asyncAsset)
245-
if err != nil {
246-
return err
247-
}
248-
249-
return nil
245+
return p.asyncAPI.PushAsset(asyncAsset)
250246
}
251247

252248
func (p *AsyncTxParser) processDeleteAllAssets(data []byte) error {
@@ -276,6 +272,7 @@ func (p *AsyncTxParser) processFindingOverwrite(data []byte) error {
276272
return errInvalidData
277273
}
278274

275+
// Update finding in vulndb
279276
_, err = p.VulnDBClient.UpdateFinding(
280277
context.Background(),
281278
dto.FindingOverwrite.FindingID,
@@ -289,7 +286,20 @@ func (p *AsyncTxParser) processFindingOverwrite(data []byte) error {
289286
}
290287
return err
291288
}
292-
return nil
289+
290+
// Retrieve current finding status and push event
291+
f, err := p.VulnDBClient.Finding(context.Background(), dto.FindingOverwrite.FindingID)
292+
if err != nil {
293+
if errors.IsKind(err, errors.ErrNotFound) {
294+
// This should not happen as distributed txs from API to VulnDB are single
295+
// threaded and we already verified the finding existence in the previous step
296+
return nil
297+
}
298+
}
299+
// TODO: Can we have a conflict in relation with possible concurrent finding state
300+
// changes from API finding overwrite and check processing from vulndb consumer?
301+
asyncFinding := findingToAsyncFinding(f)
302+
return p.asyncAPI.PushFinding(asyncFinding)
293303
}
294304

295305
// processMergeDiscoveredAssets performs the following actions:
@@ -407,3 +417,39 @@ func assetToAsyncAsset(a api.Asset) asyncapi.AssetPayload {
407417
}
408418
return asyncAsset
409419
}
420+
421+
func findingToAsyncFinding(f *api.Finding) asyncapi.FindingPayload {
422+
return asyncapi.FindingPayload{
423+
AffectedResource: f.Finding.AffectedResource,
424+
CurrentExposure: int(f.Finding.CurrentExposure),
425+
Details: f.Finding.Details,
426+
Id: f.Finding.ID,
427+
ImpactDetails: f.Finding.ImpactDetails,
428+
Issue: &asyncapi.StoreIssueLabels{
429+
CweId: f.Finding.Issue.CWEID,
430+
Description: f.Finding.Issue.Description,
431+
Id: f.Finding.Issue.ID,
432+
Labels: []interface{}{f.Finding.Issue.Labels},
433+
Recommendations: []interface{}{f.Finding.Issue.Recommendations},
434+
ReferenceLinks: []interface{}{f.Finding.Issue.ReferenceLinks},
435+
Summary: f.Finding.Issue.Summary,
436+
},
437+
Resources: []interface{}{f.Finding.Resources},
438+
Score: float64(f.Finding.Score),
439+
Source: &asyncapi.StoreSource{
440+
Component: f.Finding.Source.Component,
441+
Id: f.Finding.Source.ID,
442+
Instance: f.Finding.Source.Instance,
443+
Name: f.Finding.Source.Name,
444+
Options: f.Finding.Source.Options,
445+
Time: f.Finding.Source.Time,
446+
},
447+
Status: f.Finding.Status,
448+
Target: &asyncapi.StoreTargetTeams{
449+
Id: f.Finding.Target.ID,
450+
Identifier: f.Finding.Target.Identifier,
451+
Teams: []interface{}{f.Finding.Target.Teams},
452+
},
453+
TotalExposure: int(f.Finding.TotalExposure),
454+
}
455+
}

pkg/asyncapi/models.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,64 @@ type Annotation struct {
4343
Key string
4444
Value string
4545
}
46+
47+
// FindingPayload represents a FindingPayload model.
48+
type FindingPayload struct {
49+
AffectedResource string
50+
CurrentExposure int
51+
Details string
52+
Id string
53+
ImpactDetails string
54+
Issue *StoreIssueLabels
55+
Resources []interface{}
56+
Score float64
57+
Source *StoreSource
58+
Status string
59+
Target *StoreTargetTeams
60+
TotalExposure int
61+
AdditionalProperties map[string][]interface{}
62+
}
63+
64+
// StoreIssueLabels represents a StoreIssueLabels model.
65+
type StoreIssueLabels struct {
66+
CweId int
67+
Description string
68+
Id string
69+
Labels []interface{}
70+
Recommendations []interface{}
71+
ReferenceLinks []interface{}
72+
Summary string
73+
AdditionalProperties map[string][]interface{}
74+
}
75+
76+
// StoreResourceGroup represents a StoreResourceGroup model.
77+
type StoreResourceGroup struct {
78+
Attributes []interface{}
79+
Name string
80+
Resources []interface{}
81+
AdditionalProperties map[string][]interface{}
82+
}
83+
84+
// AnonymousSchema33 represents a AnonymousSchema33 model.
85+
type AnonymousSchema33 struct {
86+
AdditionalProperties map[string]string
87+
}
88+
89+
// StoreSource represents a StoreSource model.
90+
type StoreSource struct {
91+
Component string
92+
Id string
93+
Instance string
94+
Name string
95+
Options string
96+
Time string
97+
AdditionalProperties map[string][]interface{}
98+
}
99+
100+
// StoreTargetTeams represents a StoreTargetTeams model.
101+
type StoreTargetTeams struct {
102+
Id string
103+
Identifier string
104+
Teams []interface{}
105+
AdditionalProperties map[string][]interface{}
106+
}

pkg/asyncapi/vulcan.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,15 @@ func (l LevelLogger) Debugf(s string, params ...any) {
3939
level.Debug(l.Logger).Log("log", v)
4040
}
4141

42-
// AssetsEntityName defines the key for the assets entity used by an [EventStreamClient] to
43-
// determine the topic where the assets are send.
44-
const AssetsEntityName = "assets"
42+
const (
43+
// AssetsEntityName defines the key for the assets entity used by an [EventStreamClient] to
44+
// determine the topic where the assets are sent.
45+
AssetsEntityName = "assets"
46+
47+
// FindingsEntityName defines the key for the findings entity used by an [EventStreamClient] to
48+
// determine the topic where the findings are sent.
49+
FindingsEntityName = "findings"
50+
)
4551

4652
// Vulcan implements the asynchorus API of Vulcan.
4753
type Vulcan struct {
@@ -104,6 +110,25 @@ func (v *Vulcan) DeleteAsset(asset AssetPayload) error {
104110
return err
105111
}
106112

113+
// PushFinding publishes the state of a finding in the current point of time
114+
// to the underlying [EventStreamClient].
115+
func (v *Vulcan) PushFinding(finding FindingPayload) error {
116+
v.logger.Debugf("pushing finding %+v", finding)
117+
payload, err := json.Marshal(finding)
118+
if err != nil {
119+
return fmt.Errorf("error marshaling to json: %w", err)
120+
}
121+
metadata := map[string][]byte{
122+
"version": []byte(Version),
123+
}
124+
err = v.client.Push(FindingsEntityName, finding.Id, payload, metadata)
125+
if err != nil {
126+
return fmt.Errorf("error pushing finding %v: %w", finding, err)
127+
}
128+
v.logger.Debugf("finding pushed %+v", finding)
129+
return nil
130+
}
131+
107132
// NullVulcan implements an Async Vulcan API interface that does not send the
108133
// events to any [EventStreamClient]. It's intended to be used when the async
109134
// API is disabled but other components still need to fullfill a dependency
@@ -123,6 +148,12 @@ func (v *NullVulcan) PushAsset(asset AssetPayload) error {
123148
return nil
124149
}
125150

151+
// PushFinding acepts an event indicating that a finding has been modified or
152+
// created and just ignores it.
153+
func (v *NullVulcan) PushFinding(finding FindingPayload) error {
154+
return nil
155+
}
156+
126157
func metadata(asset AssetPayload) map[string][]byte {
127158
// The asset type can't be nil.
128159
return map[string][]byte{

0 commit comments

Comments
 (0)