I need a way to write data to RabbitMQ using Spring Batch's AmqpItemWriter and read it back from RabbitMQ using AmqpItemReader. We are not looking for Apache Kafka; we simply want to send, say, program details and consume them.
Writer Code
@Configuration
public class JobConfig {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }
    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("/data/customer.csv"));
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });
        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        reader.setLineMapper(customerLineMapper);
        return reader;
    }
    @Bean
    public AmqpItemWriter<Customer> amqpWriter() {
        return new AmqpItemWriter<>(this.rabbitTemplate());
    }
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpWriter())
                .build();
    }
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}
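public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        return Customer.builder()
                .id(fieldSet.readLong("id")).firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName")).birthdate(fieldSet.readString("birthdate"))
                .build();
    }
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}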
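@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchAmqpApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }
}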
Reader code
@Configuration
public class JobConfiguration {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        return factory;
    }
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setDefaultReceiveQueue("myqueue");
        return rabbitTemplate;
    }
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }
    @Bean
    public ItemReader<Customer> customerReader() {
        return new AmqpItemReader<>(this.rabbitTemplate());
    }
    @Bean
    public ItemWriter<Customer> customerItemWriter() {
        return items -> {
            for (Customer c : items) {
                System.out.println(c);
            }
        };
    }
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerReader()).writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job").start(step1()).build();
    }
    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}
public class CustomerStepListener implements StepExecutionListener {
    @Override
    public void beforeStep(StepExecution stepExecution) {
    }
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("READ COUNT = " + stepExecution);
        return ExitStatus.COMPLETED;
    }
}
2021-01-18 18:41:05.023 INFO 25532 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
==
2021-01-18 18:41:05.031 INFO 25532 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5672
2021-01-18 18:41:05.072 INFO 25532 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#20a14b55:0/SimpleConnection@4650a407 [delegate=amqp://guest@, localPort= 55797]
READ COUNT = StepExecution: id=1, version=2, name=step1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
2021-01-18 18:41:05.097 INFO 25532 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 73ms
2021-01-18 18:41:05.099 INFO 25532 --- [ main] : Job: [SimpleJob: [name=job]] completed with the following parameters: [{-spring.output.ansi.enabled=always}] and the following status: [COMPLETED] in 87ms
On the "Writer Code" side, you are using an AmqpItemWriter configured with a RabbitTemplate. By default, messages are sent to the nameless exchange. Here is an excerpt from the Javadoc:
Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.
In your writer configuration, there is no "connection" between the rabbit template and your queue. So you need to configure the rabbit template to send messages to your queue:
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setRoutingKey("myqueue");
    return rabbitTemplate;
}
This is similar to what you did on the reader side with rabbitTemplate.setDefaultReceiveQueue("myqueue");
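To illustrate why the routing key matters, here is a toy in-memory model of the nameless (default) exchange. It is only a sketch, not RabbitMQ or Spring AMQP code: the class `DefaultExchangeModel` and its methods are hypothetical names invented for this example. It assumes that the default exchange delivers a message to the queue whose name equals the routing key, that a message matching no queue is discarded, and that the template's routing key is empty unless you set one. A receive on an empty queue returns null, which is what an item reader reports as "no more input" and would explain readCount=0 in your log.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model only: hypothetical class, not part of RabbitMQ or Spring AMQP.
final class DefaultExchangeModel {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Declares a queue; the default exchange implicitly binds it under its own name.
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Publishes to the default exchange. Returns false when no queue matches
    // the routing key, in which case the message is discarded.
    boolean publish(String routingKey, String message) {
        Queue<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(message);
        return true;
    }

    // Returns the next message, or null when the queue is empty or unknown.
    String receive(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeModel broker = new DefaultExchangeModel();
        broker.declareQueue("myqueue");

        // Empty routing key (assumed template default): nothing reaches "myqueue".
        System.out.println(broker.publish("", "customer-1"));
        System.out.println(broker.receive("myqueue"));

        // Routing key equal to the queue name: the message is delivered.
        System.out.println(broker.publish("myqueue", "customer-1"));
        System.out.println(broker.receive("myqueue"));
    }
}
```

In this model the first publish is dropped and the first receive yields null, while the second pair succeeds. That mirrors the difference between a template with no routing key and one configured with the queue name.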