-
Notifications
You must be signed in to change notification settings - Fork 4
Kafka Schema Registry Removal #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- .:/app | ||
- /app/node_modules | ||
extra_hosts: | ||
- "host.docker.internal:host-gateway" No newline at end of file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is recommended to ensure that the file ends with a newline character to avoid potential issues with certain tools or systems that might expect it.
|
||
export interface Config { | ||
APP: AppConfig; | ||
KAFKA: KafkaConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The removal of SchemaConfig
from the Config
interface suggests that the schema registry is no longer needed. Ensure that all dependencies and usages of SchemaConfig
throughout the codebase are also removed or updated accordingly to prevent any runtime errors or undefined behavior.
} | ||
|
||
private async initializeSchemas(): Promise<void> { | ||
private encodeMessage(message: unknown): Buffer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function encodeMessage
is introduced to replace the schema encoding logic. Ensure that the message
parameter is properly validated before encoding to prevent runtime errors due to unexpected data types.
} | ||
|
||
private async refreshSchemaId(topic: string): Promise<number> { | ||
private decodeMessage(buffer: Buffer): unknown { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function decodeMessage
is introduced to replace the schema decoding logic. Consider validating the buffer
input to ensure it contains valid JSON data before attempting to parse it.
} | ||
} | ||
|
||
async sendMessage(topic: string, message: unknown): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sendMessage
function now uses encodeMessage
directly. Ensure that the message
passed to this function is always serializable to JSON to prevent encoding errors.
} | ||
|
||
const encodedValue = await this.schemaUtils.encode(message, schemaId); | ||
const encodedValue = this.encodeMessage(message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method encodeMessage
is used here, but it's not clear from the diff if it handles schema encoding correctly. Ensure that encodeMessage
includes all necessary logic for encoding messages without relying on schema IDs, as the previous implementation did.
})), | ||
); | ||
const encodedMessages = messages.map((message) => ({ | ||
value: this.encodeMessage(message), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The encodeMessage
function should be checked to ensure it handles all necessary encoding scenarios previously managed by schemaUtils.encode
. If encodeMessage
does not provide equivalent functionality, it may lead to issues in message processing.
headers: { | ||
'correlation-id': correlationId, | ||
timestamp: Date.now().toString(), | ||
'content-type': 'application/json', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The addition of the 'content-type': 'application/json'
header should be verified to ensure it aligns with the expected message format and does not conflict with any existing message processing logic.
const decodedMessage = (await this.schemaUtils.decode( | ||
message.value, | ||
)) as Record<string, unknown>; | ||
const decodedMessage = this.decodeMessage(message.value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method decodeMessage
is being used here, but it's not clear from the diff if this method handles all the necessary decoding logic that was previously managed by this.schemaUtils.decode
. Ensure that decodeMessage
provides equivalent functionality and handles all edge cases that this.schemaUtils.decode
did.
No description provided.