I want to implement a Kafka topic that sends and receives serialized Java objects, based on this example.
I tried this:
Producer Config:
/**
 * Spring configuration for the requesting side of the request/reply exchange.
 *
 * <p>Declares the producer factory and templates used to publish requests, plus the
 * consumer factory and listener container on which the {@link ReplyingKafkaTemplate}
 * receives replies. Values are written with {@link ObjectFactorySerializer} and read
 * with {@link ObjectFactoryDeserializer}.
 */
@Configuration
public class KafkaProducerConfig {

    /** Topic on which requests are published. */
    private static final String REQUEST_TOPIC = "tp-sale.request";

    /** Topic on which the reply container listens. */
    private static final String REPLY_TOPIC = "tp-sale.reply";

    /**
     * Consumer group used by the reply container.
     *
     * <p>NOTE(review): a fixed group id means several instances of this application
     * would share the reply topic's partitions, so a reply could presumably be delivered
     * to an instance that did not send the request. Confirm whether more than one
     * instance will run and, if so, use a per-instance group or reply topic.
     */
    private static final String REPLY_GROUP_ID = "tp-sale.reply";

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds the producer factory used for outgoing requests.
     *
     * @return a factory producing String-keyed producers with object values
     */
    @Bean
    public ProducerFactory<String, Object> requestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Plain (fire-and-forget) template built on {@link #requestFactoryProducerFactory()}.
     *
     * @return the template
     */
    @Bean
    public KafkaTemplate<String, Object> requestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(requestFactoryProducerFactory());
    }

    /**
     * Builds the consumer factory used to read replies.
     *
     * @return a factory producing String-keyed consumers with object values
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, REPLY_GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory backed by {@link #consumerFactory()}.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Builds the request/reply template: requests go out through {@code producerFactory},
     * replies come back through a container listening on the reply topic.
     *
     * @param producerFactory factory for the producer that sends requests
     * @param factory         container factory used to create the reply container
     * @return the request/reply template
     */
    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(
            ProducerFactory<String, Object> producerFactory,
            ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        ConcurrentMessageListenerContainer<String, Object> replyContainer =
                factory.createContainer(REPLY_TOPIC);
        ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate =
                new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        // The default topic applies to sends that do not name a topic. This template sends
        // requests, so its default must be the request topic; pointing it at the reply topic
        // would put requests on the topic this template's own reply container consumes.
        requestReplyKafkaTemplate.setDefaultTopic(REQUEST_TOPIC);
        return requestReplyKafkaTemplate;
    }
}
Producer:
/**
 * REST endpoints that send a request object to Kafka and block until the matching
 * reply arrives on the reply topic.
 */
@RestController
@RequestMapping("/checkout")
public class CheckoutController {

    private static final Logger LOG = LoggerFactory.getLogger(CheckoutController.class);

    /** Topic to which every request from this controller is published. */
    private static final String REQUEST_TOPIC = "tp-sale.request";

    /** Upper bound, in seconds, for both the send acknowledgement and the reply. */
    private static final long TIMEOUT_SECONDS = 10;

    private final KafkaTemplate<String, Object> requestFactoryKafkaTemplate;
    private final ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate;

    /**
     * @param requestFactoryKafkaTemplate plain template (not used by the endpoints below)
     * @param requestReplyKafkaTemplate   template used for the request/reply exchange
     */
    @Autowired
    public CheckoutController(KafkaTemplate<String, Object> requestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate) {
        this.requestFactoryKafkaTemplate = requestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    /**
     * Sends a {@code SaleRequestFactory} with id 100 and logs the unique id of the reply.
     *
     * @throws ExecutionException   if the send or the reply future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the send or the reply takes longer than the timeout
     */
    @PostMapping("sale_test")
    public void performSaleTest() throws ExecutionException, InterruptedException, TimeoutException {
        SaleRequestFactory request = new SaleRequestFactory();
        request.setId(100);
        SaleResponseFactory value = sendAndAwaitReply(request, SaleResponseFactory.class);
        LOG.info("!!!!!!!!!!!! {}", value.getUnique_id());
    }

    /**
     * Sends an {@code AuthRequestFactory} with id 140 and logs the unique id of the reply.
     *
     * @throws ExecutionException   if the send or the reply future completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the send or the reply takes longer than the timeout
     */
    @PostMapping("authorize_test")
    public void performAuthTest() throws ExecutionException, InterruptedException, TimeoutException {
        AuthRequestFactory request = new AuthRequestFactory();
        request.setId(140);
        AuthResponseFactory value = sendAndAwaitReply(request, AuthResponseFactory.class);
        LOG.info("!!!!!!!!!!!! {}", value.getUnique_id());
    }

    /**
     * Publishes {@code request} on the request topic and waits for the reply.
     *
     * @param request   payload to send
     * @param replyType expected class of the reply payload
     * @return the reply payload cast to {@code replyType}
     * @throws ClassCastException if the reply payload is not of type {@code replyType}
     */
    private <T> T sendAndAwaitReply(Object request, Class<T> replyType)
            throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<String, Object> record = new ProducerRecord<>(REQUEST_TOPIC, request);
        RequestReplyFuture<String, Object, Object> replyFuture =
                requestReplyKafkaTemplate.sendAndReceive(record);
        // Wait on the send future first so a failed send is reported as such
        // instead of surfacing later as a reply timeout.
        replyFuture.getSendFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        ConsumerRecord<String, Object> consumerRecord =
                replyFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        return replyType.cast(consumerRecord.value());
    }
}
ObjectFactoryDeserializer
public class ObjectFactoryDeserializer implements Deserializer<Object> {
@Override
public Object deserialize(String topic, byte() data) {
return null;
}
@Override
public Object deserialize(String topic, Headers headers, byte() data) {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
try (ObjectInputStream ois = new ObjectInputStream(bais)) {
return ois.readObject();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
}
}
}
ObjectFactorySerializer
public class ObjectFactorySerializer implements Serializer<Object> {
@Override
public byte() serialize(String topic, Object data) {
return null;
}
@Override
public byte() serialize(String topic, Headers headers, Object data) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(data);
return baos.toByteArray();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
Consumer configuration:
/**
 * Spring configuration for the replying side: a consumer factory for incoming
 * requests, a producer factory and template for outgoing replies, and the listener
 * container factory that ties the two together.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Producer factory used to publish replies.
     *
     * @return a factory producing String-keyed producers with object values
     */
    @Bean
    public ProducerFactory<String, Object> saleResponseFactoryProducerFactory() {
        final Map<String, Object> producerSettings = new HashMap<>();
        producerSettings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        producerSettings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerSettings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaProducerFactory<>(producerSettings);
    }

    /**
     * Template through which listener return values are sent back.
     *
     * @return a template built on {@link #saleResponseFactoryProducerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Object> saleResponseFactoryKafkaTemplate() {
        final ProducerFactory<String, Object> replyProducers = saleResponseFactoryProducerFactory();
        return new KafkaTemplate<>(replyProducers);
    }

    /**
     * Consumer factory used to read incoming requests.
     *
     * @return a factory producing String-keyed consumers with object values
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        final Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.request");
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }

    /**
     * Listener container factory wired with the request consumer factory and the
     * reply template.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Object> containers =
                new ConcurrentKafkaListenerContainerFactory<>();
        containers.setConsumerFactory(consumerFactory());
        containers.setReplyTemplate(saleResponseFactoryKafkaTemplate());
        return containers;
    }
}
Consumer
/**
 * Listens on the request topic and dispatches each record to the handler whose
 * parameter type matches the payload; the handler's return value is sent to the
 * reply topic.
 */
@Component
@KafkaListener(id = "tp-sale.request", topics = "tp-sale.request")
public class ConsumerListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);

    /** Topic to which handler return values are sent. */
    private static final String REPLY_TOPIC = "tp-sale.reply";

    /**
     * Handles authorization requests.
     *
     * @param authRequestFactory the incoming request payload
     * @return the reply payload
     */
    @KafkaHandler
    @SendTo(REPLY_TOPIC)
    public AuthResponseFactory fooListener(AuthRequestFactory authRequestFactory) {
        LOGGER.info("In AuthRequestFactoryListener: {}", authRequestFactory);
        AuthResponseFactory resObj = new AuthResponseFactory();
        // NOTE(review): hard-coded id, presumably a placeholder for this proof of concept.
        resObj.setUnique_id("123123");
        return resObj;
    }

    /**
     * Handles sale requests.
     *
     * @param saleRequestFactory the incoming request payload
     * @return the reply payload
     */
    @KafkaHandler
    @SendTo(REPLY_TOPIC)
    public SaleResponseFactory barListener(SaleRequestFactory saleRequestFactory) {
        LOGGER.info("In SaleRequestFactoryListener: {}", saleRequestFactory);
        SaleResponseFactory resObj = new SaleResponseFactory();
        // NOTE(review): hard-coded id, presumably a placeholder for this proof of concept.
        resObj.setUnique_id("123123");
        return resObj;
    }
}
Full minimal working example
When I hit the endpoint authorize_test, the code works fine.
When I hit the endpoint sale_test, the code also works fine.
Do you know how I can improve this proof of concept? Do you see any flaws in the code that I need to fix?