spring – Implement Kafka Communication with Serialized Java Objects

I want to implement a Kafka topic that sends and receives serialized Java objects, based on this example.

I tried this:

Producer Config:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Object> requestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> requestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(requestFactoryProducerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.reply");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(
            ProducerFactory<String, Object> producerFactory,
            ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        ConcurrentMessageListenerContainer<String, Object> kafkaMessageListenerContainer =
                factory.createContainer("tp-sale.reply");
        ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate =
                new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
        requestReplyKafkaTemplate.setDefaultTopic("tp-sale.reply");
        return requestReplyKafkaTemplate;
    }
}
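
The ${kafka.bootstrapAddress} placeholder used above is assumed to resolve from application.properties (for example, kafka.bootstrapAddress=localhost:9092 when running against a local broker).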

Producer:

@RestController
@RequestMapping("/checkout")
public class CheckoutController {

    private static final Logger LOG = LoggerFactory.getLogger(CheckoutController.class);

    private KafkaTemplate<String, Object> requestFactoryKafkaTemplate;
    private ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate;

    @Autowired
    public CheckoutController(KafkaTemplate<String, Object> requestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate){
        this.requestFactoryKafkaTemplate = requestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    @PostMapping("sale_test")
    public void performSaleTest() throws ExecutionException, InterruptedException, TimeoutException {

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sale.request", obj);
        RequestReplyFuture<String, Object, Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);


        SaleResponseFactory value = (SaleResponseFactory) consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());


    }

    @PostMapping("authorize_test")
    public void performAuthTest() throws ExecutionException, InterruptedException, TimeoutException {

        AuthRequestFactory obj = new AuthRequestFactory();
        obj.setId(140);

        ProducerRecord<String, Object> record = new ProducerRecord<>("tp-sale.request", obj);
        RequestReplyFuture<String, Object, Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String, Object> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, Object> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);


        AuthResponseFactory value = (AuthResponseFactory) consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());


    }
}

ObjectFactoryDeserializer

public class ObjectFactoryDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] data) {
        // Not invoked by the consumer here; delegate to the Headers-aware overload
        return deserialize(topic, null, data);
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        try (ObjectInputStream ois = new ObjectInputStream(bais)) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

}

ObjectFactorySerializer

public class ObjectFactorySerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        // Not invoked by the producer here; delegate to the Headers-aware overload
        return serialize(topic, null, data);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            return baos.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

}

Consumer configuration:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tp-sale.request");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ProducerFactory<String, Object> saleResponseFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(saleResponseFactoryKafkaTemplate());
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Object> saleResponseFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleResponseFactoryProducerFactory());
    }

}
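
The tp-sale.request and tp-sale.reply topics are assumed to exist already. If they don't, one option is to declare them as beans so they get created on startup; a minimal sketch, assuming Spring Boot's auto-configured KafkaAdmin and spring-kafka 2.3+ for TopicBuilder (partition and replica counts are just placeholders):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Sketch only: topic names match the configuration above,
    // partitions/replicas are assumptions for a single-broker setup.
    @Bean
    public NewTopic saleRequestTopic() {
        return TopicBuilder.name("tp-sale.request").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic saleReplyTopic() {
        return TopicBuilder.name("tp-sale.reply").partitions(1).replicas(1).build();
    }
}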

Consumer

@Component
@KafkaListener(id = "tp-sale.request", topics = "tp-sale.request")
public class ConsumerListener {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaHandler
    @SendTo("tp-sale.reply")
    public AuthResponseFactory fooListener(AuthRequestFactory authRequestFactory) {
        System.out.println("In AuthRequestFactoryListener: " + authRequestFactory);

        AuthResponseFactory resObj = new AuthResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }

    @KafkaHandler
    @SendTo("tp-sale.reply")
    public SaleResponseFactory barListener(SaleRequestFactory saleRequestFactory) {
        System.out.println("In SaleRequestFactoryListener: " + saleRequestFactory);

        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }
}
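
The payload classes (SaleRequestFactory, AuthRequestFactory, SaleResponseFactory, AuthResponseFactory) are not shown above. Because ObjectFactorySerializer uses plain Java serialization, each of them has to implement Serializable; a minimal sketch of what they presumably look like, with fields inferred from the accessors used in the controller and listener:

import java.io.Serializable;

// Sketch only: fields and accessors inferred from the code above.
// Java serialization requires the class to implement Serializable.
public class SaleRequestFactory implements Serializable {

    private static final long serialVersionUID = 1L;

    private int id;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}

AuthRequestFactory would follow the same pattern, and SaleResponseFactory / AuthResponseFactory would carry a String unique_id with getUnique_id()/setUnique_id() accessors.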

Full minimal working example

When I hit the authorize_test endpoint, the code works fine.

When I hit the sale_test endpoint, the code also works fine.

Do you know how I can improve this proof of concept? Do you see anything in the code or workflow that I should improve?