@@ -207,9 +207,10 @@ void Kafka::KafkaController::add_producer(const KafkaProducerMetadata &metadata,
207
207
208
208
RdKafka::Producer *producer = create_producer (metadata.brokers , errstr, *deliveryReportCb, *loggerCb, m_log_level);
209
209
210
- if (!producer) {
210
+ if (!producer)
211
+ {
211
212
// std::cerr << "Failed to create producer: " << errstr << std::endl;
212
- if (m_log_callback)
213
+ if (m_log_callback && m_log_level <= LogLevel::EMERGENCY )
213
214
m_log_callback (LogLevel::EMERGENCY, " Failed to create producer: " + errstr);
214
215
return ;
215
216
}
@@ -220,23 +221,29 @@ void Kafka::KafkaController::add_producer(const KafkaProducerMetadata &metadata,
220
221
221
222
if (message.err ())
222
223
{
223
- if (m_log_callback)
224
+ if (m_log_callback && m_log_level <= LogLevel::ERROR )
224
225
m_log_callback (LogLevel::ERROR, " Message delivery failed: " + RdKafka::err2str (message.err ()));
225
226
} else {
226
- if (m_log_callback && m_log_level > = LogLevel::DEBUG)
227
+ if (m_log_callback && m_log_level < = LogLevel::DEBUG)
227
228
m_log_callback (LogLevel::DEBUG, " Message delivered to partition " + std::to_string (message.partition ()) + " at offset " + std::to_string (message.offset ()));
228
229
}
229
230
});
230
231
loggerCb->set_callback ([this ](const LogLevel logLevel, const std::string &message) {
231
- if (m_log_callback && m_log_level > = logLevel)
232
+ if (m_log_callback && m_log_level < = logLevel)
232
233
m_log_callback (logLevel, message);
233
234
});
234
235
235
236
// Create topic handle
236
237
RdKafka::Topic *rd_topic = RdKafka::Topic::create (producer, metadata.topic , nullptr , errstr);
237
- if (m_log_callback && m_log_level >= LogLevel::DEBUG)
238
+ if (!rd_topic) {
239
+ if (m_log_callback && m_log_level <= LogLevel::EMERGENCY)
240
+ m_log_callback (LogLevel::EMERGENCY, " Failed to create topic: " + errstr);
241
+ return ;
242
+ }
243
+ if (m_log_callback && m_log_level <= LogLevel::DEBUG)
238
244
m_log_callback (LogLevel::DEBUG, " Created topic: " + metadata.topic );
239
245
246
+
240
247
// Create shared objects.
241
248
std::shared_ptr<RdKafka::Producer> producer_shared (producer);
242
249
std::shared_ptr<RdKafka::Topic> topic_shared (rd_topic);
@@ -248,6 +255,12 @@ void Kafka::KafkaController::add_producer(const KafkaProducerMetadata &metadata,
248
255
loggerCb,
249
256
deliveryReportCb
250
257
);
258
+ if (!producerContext)
259
+ {
260
+ if (m_log_callback && m_log_level <= LogLevel::EMERGENCY)
261
+ m_log_callback (LogLevel::EMERGENCY, " Failed to create producer context." );
262
+ return ;
263
+ }
251
264
252
265
// Create a vector, if it doesn't exist.
253
266
if (m_producers.find (channel) == m_producers.end ()) {
@@ -277,9 +290,9 @@ void Kafka::KafkaController::add_consumer(const KafkaConsumerMetadata &metadata,
277
290
278
291
// Create the consumer.
279
292
RdKafka::KafkaConsumer *consumer = create_consumer (metadata, errstr, *rebalance_cb, *loggerCb, m_log_level);
280
-
281
- if (!consumer) {
282
- if (m_log_callback)
293
+ if (!consumer)
294
+ {
295
+ if (m_log_callback && m_log_level <= LogLevel::EMERGENCY )
283
296
m_log_callback (LogLevel::EMERGENCY, " Failed to create consumer: " + errstr);
284
297
return ;
285
298
}
@@ -294,23 +307,42 @@ void Kafka::KafkaController::add_consumer(const KafkaConsumerMetadata &metadata,
294
307
295
308
if (err != RdKafka::ERR_NO_ERROR)
296
309
{
297
- if (m_log_callback)
310
+ if (m_log_callback && m_log_level <= LogLevel::ERROR )
298
311
m_log_callback (LogLevel::ERROR, " Rebalance callback: " + RdKafka::err2str (err));
299
312
}
300
313
});
301
314
loggerCb->set_callback ([this ](const LogLevel logLevel, const std::string &message) {
302
- if (m_log_callback)
315
+ if (m_log_callback && m_log_level <= logLevel )
303
316
m_log_callback (logLevel, message);
304
317
});
305
318
306
319
// Subscribe.
307
- consumer->subscribe (metadata.topics );
320
+ if (consumer->subscribe (metadata.topics ) != RdKafka::ERR_NO_ERROR) {
321
+ if (m_log_callback && m_log_level <= LogLevel::EMERGENCY)
322
+ m_log_callback (LogLevel::EMERGENCY, " Failed to subscribe to topics." );
323
+ return ;
324
+ }
308
325
309
326
// Create shared objects.
310
327
std::shared_ptr<RdKafka::KafkaConsumer> consumer_shared (consumer);
328
+ if (!consumer_shared) {
329
+ if (m_log_callback && m_log_level <= LogLevel::EMERGENCY)
330
+ m_log_callback (LogLevel::EMERGENCY, " Failed to create consumer shared." );
331
+ return ;
332
+ }
311
333
std::shared_ptr<InternalLogger> loggerCb_shared (loggerCb);
334
+ if (!loggerCb_shared) {
335
+ if (m_log_callback && m_log_level <= LogLevel::EMERGENCY)
336
+ m_log_callback (LogLevel::EMERGENCY, " Failed to create loggerCb shared." );
337
+ return ;
338
+ }
312
339
std::shared_ptr<InternalRebalanceCb> rebalance_cb_shared (rebalance_cb);
313
-
340
+ if (!rebalance_cb_shared) {
341
+ if (m_log_callback && m_log_level <= LogLevel::EMERGENCY)
342
+ m_log_callback (LogLevel::EMERGENCY, " Failed to create rebalance_cb shared." );
343
+ return ;
344
+ }
345
+
314
346
// Create Context, if it doesn't exist.
315
347
if (m_consumers.find (channel) == m_consumers.end ())
316
348
{
@@ -390,7 +422,7 @@ bool Kafka::KafkaController::send(const uint32_t channel, const void *data, cons
390
422
producer->get_producer ()->flush (MAX_TIMEOUT_MS);
391
423
392
424
if (err) {
393
- if (m_log_callback)
425
+ if (m_log_callback && m_log_level <= LogLevel::ERROR )
394
426
m_log_callback (LogLevel::ERROR, " Failed to produce message: " + RdKafka::err2str (err));
395
427
396
428
return false ;
0 commit comments