Introduce "FilteringSinkAdapter" in SQS integration #1373

Open
MatejNedic opened this issue Apr 20, 2025 · 3 comments
Labels
component: sqs SQS integration related issue status: ideal-for-contribution We agree it's nice to have but it is not team priority type: enhancement Smaller enhancement in existing integration

Comments

@MatejNedic
Member

Sometimes when using @SqsListener, I would like to filter messages before they reach the listener method.

A good example is when messages have a TTL (time-to-live), or when we are no longer interested in certain events due to business logic. Instead of polluting every @SqsListener method with this logic and replicating it each time, it would be better if we could extract it into a FilteringSinkAdapter.

This way our integration would give users a clean way to choose which messages are processed and which are not.
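To make the TTL case concrete, here is a minimal sketch of the check pulled out of the listener into its own class. `TimedMessage` and `TtlFilter` are hypothetical names used only for illustration; they are not Spring Cloud AWS types, and a real filter would presumably read the send time from the message headers.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for an SQS message; not a Spring Cloud AWS type.
record TimedMessage(String body, Instant sentAt) {}

// Hypothetical TTL filter: accepts a message only while it is younger than the TTL.
final class TtlFilter implements Predicate<TimedMessage> {
    private final Duration ttl;
    private final Clock clock;

    TtlFilter(Duration ttl, Clock clock) {
        this.ttl = ttl;
        this.clock = clock;
    }

    @Override
    public boolean test(TimedMessage message) {
        // Keep the message only if sentAt + ttl is still in the future.
        return message.sentAt().plus(ttl).isAfter(clock.instant());
    }

    // Applies the filter once, before any listener code sees the messages.
    static List<TimedMessage> keepFresh(List<TimedMessage> messages, TtlFilter filter) {
        return messages.stream().filter(filter).collect(Collectors.toList());
    }
}
```

With something like this in place, the listener method itself would only contain business logic, and the expiry rule would live in one spot.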

@MatejNedic MatejNedic added the component: sqs SQS integration related issue label Apr 20, 2025
@tomazfernandes tomazfernandes added status: ideal-for-contribution We agree it's nice to have but it is not team priority type: enhancement Smaller enhancement in existing integration labels Apr 25, 2025
@imsosleepy
Contributor

imsosleepy commented May 10, 2025

Hi @MatejNedic, @tomazfernandes, I'm interested in working on this issue.
Before starting the actual implementation, I wanted to check in and get some clarification on the intended direction.

I currently have two possible approaches in mind:

  1. Creating a filter interface and allowing users to register custom filters as beans, which can then be applied before processing the message. example

  2. Adding a new option to @SqsListener, such as @SqsListener(filterStrategy = "ttlFilter"), to specify the filter in a more declarative way.

I initially started working on the first approach, but I realized that the second one could also be effective, especially for ease of configuration.

Would you be able to share your thoughts or suggest a preferred direction?

I realized that if I go with the first approach, the filter would be configured globally, which means that multiple listeners wouldn’t be able to have their own individual filters. This makes it less suitable. I'll proceed with the second approach and submit a PR.
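As a rough illustration of the second approach, the sketch below resolves a filter by the name given per listener, so each listener can have its own filter. Everything here is an assumption for discussion: `filterStrategy` is only the attribute proposed in this thread, and `NamedFilterRegistry` is a hypothetical helper, not an existing Spring Cloud AWS class. Payloads are plain strings to keep the example self-contained.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical registry mapping a filter name (the value of the proposed
// filterStrategy attribute) to a filter. Not part of Spring Cloud AWS.
final class NamedFilterRegistry {
    private final Map<String, Predicate<String>> filters;

    NamedFilterRegistry(Map<String, Predicate<String>> filters) {
        this.filters = Map.copyOf(filters);
    }

    // Resolves the filter for one listener; an empty or missing name means "accept everything".
    Predicate<String> resolve(String filterStrategy) {
        if (filterStrategy == null || filterStrategy.isEmpty()) {
            return payload -> true;
        }
        Predicate<String> filter = filters.get(filterStrategy);
        if (filter == null) {
            // Fail fast on a misspelled name instead of silently skipping the filter.
            throw new IllegalArgumentException("No filter registered under name: " + filterStrategy);
        }
        return filter;
    }
}
```

In an actual implementation the map would presumably be populated from filter beans in the application context, and the lookup would happen once when the listener container is created rather than per message.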

@tomazfernandes
Contributor

Hi @imsosleepy, thanks for looking into this!

There are a few things for us to consider.

First is what we need from the interface - we might want to have both single-message and batch-message methods.

I think Sink is an implementation detail that doesn't need to be part of the user facing API - how about we call the interface something like MessageFilter?

The best place for configuring this is likely ContainerOptions; we can add autoconfiguration for it later.

One other aspect to consider is whether we need to support async variants for filtering as well - this might be useful if a user needs to e.g. make a call to AWS to decide whether to filter or not.

Please let me know your thoughts, thanks.
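For discussion, one possible shape for the interface described above, with a single-message method, a batch method, and an async variant. Only the name `MessageFilter` comes from this thread; the method names, the generic payload type, and the use of `CompletableFuture` are assumptions, and the sketch deliberately leaves out the `ContainerOptions` wiring.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical synchronous filter; method names are assumptions.
interface MessageFilter<T> {
    // Returns true if the message should be delivered to the listener.
    boolean accept(T message);

    // Batch variant; by default applies the single-message check to each element.
    default List<T> acceptBatch(Collection<T> messages) {
        return messages.stream().filter(this::accept).collect(Collectors.toList());
    }
}

// Hypothetical async variant, e.g. for filters that need a remote call to decide.
interface AsyncMessageFilter<T> {
    CompletableFuture<Boolean> accept(T message);

    // Starts all decisions, then keeps the accepted messages in their original order.
    default CompletableFuture<List<T>> acceptBatch(Collection<T> messages) {
        List<T> ordered = List.copyOf(messages);
        List<CompletableFuture<Boolean>> decisions =
                ordered.stream().map(this::accept).collect(Collectors.toList());
        return CompletableFuture.allOf(decisions.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<T> kept = new ArrayList<>();
                    for (int i = 0; i < ordered.size(); i++) {
                        // join() does not block here: allOf has already completed.
                        if (decisions.get(i).join()) {
                            kept.add(ordered.get(i));
                        }
                    }
                    return kept;
                });
    }

    // Adapts a synchronous filter so a container would only need to handle the async form.
    static <T> AsyncMessageFilter<T> from(MessageFilter<T> filter) {
        return message -> CompletableFuture.completedFuture(filter.accept(message));
    }
}
```

Keeping both interfaces functional means a simple filter can be written as a lambda, for example `MessageFilter<String> nonEmpty = s -> !s.isEmpty();`, while the adapter lets the sync case reuse whatever async path is chosen.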
