Skip to content

Commit 67c2a9e

Browse files
authored
Merge pull request #905 from aws-powertools/feature/kafka-consumer
chore: Feature/kafka consumer
2 parents f4ea35d + 397a476 commit 67c2a9e

File tree

83 files changed

+7671
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+7671
-1
lines changed

.github/workflows/codeql-analysis.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ jobs:
3939
with:
4040
languages: ${{ matrix.language }}
4141

42+
- name: Install global tools
43+
run: dotnet tool install --global Apache.Avro.Tools
44+
4245
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
4346
# If this step fails, then you should remove it and run the build manually (see below)
4447
- name: Autobuild

.github/workflows/examples-tests.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ jobs:
3333
- name: Install dependencies
3434
run: dotnet restore
3535

36+
- name: Install global tools
37+
run: dotnet tool install --global Apache.Avro.Tools
38+
3639
- name: Build
3740
run: dotnet build --configuration Release --no-restore /tl
3841

examples/Kafka/Avro/src/Avro.csproj

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<OutputType>Exe</OutputType>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
<GenerateRuntimeConfigurationFiles>true</GenerateRuntimeConfigurationFiles>
8+
<AWSProjectType>Lambda</AWSProjectType>
9+
<!-- This property makes the build directory similar to a publish directory and helps the AWS .NET Lambda Mock Test Tool find project dependencies. -->
10+
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
11+
<!-- Generate ready to run images during publishing to improve cold start time. -->
12+
<PublishReadyToRun>true</PublishReadyToRun>
13+
14+
<AssemblyName>Avro.Example</AssemblyName>
15+
16+
</PropertyGroup>
17+
<ItemGroup>
18+
<PackageReference Include="Amazon.Lambda.RuntimeSupport" Version="1.12.0"/>
19+
<PackageReference Include="Amazon.Lambda.Core" Version="2.5.0"/>
20+
<PackageReference Include="Amazon.Lambda.Serialization.SystemTextJson" Version="2.4.4"/>
21+
<PackageReference Include="AWS.Lambda.Powertools.Logging" Version="2.0.0" />
22+
</ItemGroup>
23+
<ItemGroup>
24+
<ProjectReference Include="..\..\..\..\libraries\src\AWS.Lambda.Powertools.Kafka.Avro\AWS.Lambda.Powertools.Kafka.Avro.csproj" />
25+
</ItemGroup>
26+
<Target Name="GenerateAvroClasses" BeforeTargets="CoreCompile">
27+
<Exec Command="avrogen -s $(ProjectDir)CustomerProfile.avsc $(ProjectDir)Generated"/>
28+
</Target>
29+
<ItemGroup>
30+
<None Remove="kafka-avro-event.json" />
31+
<Content Include="kafka-avro-event.json">
32+
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
33+
</Content>
34+
</ItemGroup>
35+
</Project>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
{
2+
"type": "record",
3+
"name": "CustomerProfile",
4+
"namespace": "com.example",
5+
"fields": [
6+
{"name": "user_id", "type": "string"},
7+
{"name": "full_name", "type": "string"},
8+
{"name": "email", "type": {
9+
"type": "record",
10+
"name": "EmailAddress",
11+
"fields": [
12+
{"name": "address", "type": "string"},
13+
{"name": "verified", "type": "boolean"},
14+
{"name": "primary", "type": "boolean"}
15+
]
16+
}},
17+
{"name": "age", "type": "int"},
18+
{"name": "address", "type": {
19+
"type": "record",
20+
"name": "Address",
21+
"fields": [
22+
{"name": "street", "type": "string"},
23+
{"name": "city", "type": "string"},
24+
{"name": "state", "type": "string"},
25+
{"name": "country", "type": "string"},
26+
{"name": "zip_code", "type": "string"}
27+
]
28+
}},
29+
{"name": "phone_numbers", "type": {
30+
"type": "array",
31+
"items": {
32+
"type": "record",
33+
"name": "PhoneNumber",
34+
"fields": [
35+
{"name": "number", "type": "string"},
36+
{"name": "type", "type": {"type": "enum", "name": "PhoneType", "symbols": ["HOME", "WORK", "MOBILE"]}}
37+
]
38+
}
39+
}},
40+
{"name": "preferences", "type": {
41+
"type": "map",
42+
"values": "string"
43+
}},
44+
{"name": "account_status", "type": {"type": "enum", "name": "AccountStatus", "symbols": ["ACTIVE", "INACTIVE", "SUSPENDED"]}}
45+
]
46+
}

examples/Kafka/Avro/src/Function.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using Amazon.Lambda.Core;
2+
using Amazon.Lambda.RuntimeSupport;
3+
using AWS.Lambda.Powertools.Kafka;
4+
using AWS.Lambda.Powertools.Kafka.Avro;
5+
using AWS.Lambda.Powertools.Logging;
6+
using com.example;
7+
8+
string Handler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
9+
{
10+
foreach (var record in records)
11+
{
12+
Logger.LogInformation("Record Value: {@record}", record.Value);
13+
}
14+
15+
return "Processed " + records.Count() + " records";
16+
}
17+
18+
await LambdaBootstrapBuilder.Create((Func<ConsumerRecords<string, CustomerProfile>, ILambdaContext, string>?)Handler,
19+
new PowertoolsKafkaAvroSerializer()) // Use PowertoolsKafkaAvroSerializer for Avro serialization
20+
.Build()
21+
.RunAsync();
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// ------------------------------------------------------------------------------
2+
// <auto-generated>
3+
// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
4+
// Changes to this file may cause incorrect behavior and will be lost if code
5+
// is regenerated
6+
// </auto-generated>
7+
// ------------------------------------------------------------------------------
8+
namespace com.example
9+
{
10+
using System;
11+
using System.Collections.Generic;
12+
using System.Text;
13+
using global::Avro;
14+
using global::Avro.Specific;
15+
16+
[global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
17+
public enum AccountStatus
18+
{
19+
ACTIVE,
20+
INACTIVE,
21+
SUSPENDED,
22+
}
23+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// ------------------------------------------------------------------------------
2+
// <auto-generated>
3+
// Generated by avrogen, version 1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e
4+
// Changes to this file may cause incorrect behavior and will be lost if code
5+
// is regenerated
6+
// </auto-generated>
7+
// ------------------------------------------------------------------------------
8+
namespace com.example
9+
{
10+
using System;
11+
using System.Collections.Generic;
12+
using System.Text;
13+
using global::Avro;
14+
using global::Avro.Specific;
15+
16+
[global::System.CodeDom.Compiler.GeneratedCodeAttribute("avrogen", "1.12.0+8c27801dc8d42ccc00997f25c0b8f45f8d4a233e")]
17+
public partial class Address : global::Avro.Specific.ISpecificRecord
18+
{
19+
public static global::Avro.Schema _SCHEMA = global::Avro.Schema.Parse("{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"st" +
20+
"reet\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"s" +
21+
"tring\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"zip_code\",\"type\":\"string\"}]}" +
22+
"");
23+
private string _street;
24+
private string _city;
25+
private string _state;
26+
private string _country;
27+
private string _zip_code;
28+
public virtual global::Avro.Schema Schema
29+
{
30+
get
31+
{
32+
return Address._SCHEMA;
33+
}
34+
}
35+
public string street
36+
{
37+
get
38+
{
39+
return this._street;
40+
}
41+
set
42+
{
43+
this._street = value;
44+
}
45+
}
46+
public string city
47+
{
48+
get
49+
{
50+
return this._city;
51+
}
52+
set
53+
{
54+
this._city = value;
55+
}
56+
}
57+
public string state
58+
{
59+
get
60+
{
61+
return this._state;
62+
}
63+
set
64+
{
65+
this._state = value;
66+
}
67+
}
68+
public string country
69+
{
70+
get
71+
{
72+
return this._country;
73+
}
74+
set
75+
{
76+
this._country = value;
77+
}
78+
}
79+
public string zip_code
80+
{
81+
get
82+
{
83+
return this._zip_code;
84+
}
85+
set
86+
{
87+
this._zip_code = value;
88+
}
89+
}
90+
public virtual object Get(int fieldPos)
91+
{
92+
switch (fieldPos)
93+
{
94+
case 0: return this.street;
95+
case 1: return this.city;
96+
case 2: return this.state;
97+
case 3: return this.country;
98+
case 4: return this.zip_code;
99+
default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Get()");
100+
};
101+
}
102+
public virtual void Put(int fieldPos, object fieldValue)
103+
{
104+
switch (fieldPos)
105+
{
106+
case 0: this.street = (System.String)fieldValue; break;
107+
case 1: this.city = (System.String)fieldValue; break;
108+
case 2: this.state = (System.String)fieldValue; break;
109+
case 3: this.country = (System.String)fieldValue; break;
110+
case 4: this.zip_code = (System.String)fieldValue; break;
111+
default: throw new global::Avro.AvroRuntimeException("Bad index " + fieldPos + " in Put()");
112+
};
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)