I need to bind several exchanges, each with several routing keys, to a single queue. I want to send messages by exchange and routing key, and receive them by listening on the queue by its name.
My code:
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(ExchangeConfig.class)
public class RabbitConfig {

    private final ExchangeConfig exchangeConfig;

    @Bean
    public List<Binding> bindings() {
        List<Binding> bindings = new ArrayList<>();
        exchangeConfig.getExchangesWithKeys()
            .forEach(exchangeWithKeys -> exchangeWithKeys.getRoutingKeys()
                .forEach(key -> {
                    Exchange exchange = ExchangeBuilder.directExchange(exchangeWithKeys.getExchange()).build();
                    Queue queue = QueueBuilder.durable(exchangeConfig.getLogsQueue()).build();
                    Binding binding = BindingBuilder.bind(queue).to(exchange)
                        .with(key).noargs();
                    bindings.add(binding);
                }));
        return bindings;
    }
}
Config:
spring:
  rabbitmq:
    host: localhost
    port: 5672
rabbitmq:
  exchanges-with-keys:
    - exchange: exchange1
      routing-keys: exchange1.live, exchange1.after
    - exchange: exchange2
      routing-keys: exchange2.live, exchange2.after
    - exchange: exchange3
      routing-keys: exchange3.live, exchange3.after
  logs-queue: log-messages_q
Props:
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq")
public class ExchangeConfig {

    private String logsQueue;
    private List<ExchangeWithKeys> exchangesWithKeys;

    @Data
    public static class ExchangeWithKeys {
        private String exchange;
        private List<String> routingKeys;
    }
}
Listener:
@Component
@Slf4j
@RequiredArgsConstructor
public class LogsListener {

    private final LogMessageEventProcessor logMessageEventProcessor;

    @RabbitListener(queues = "${rabbitmq.logs-queue}")
    public void onLiveEvent(LogMessageEvent event) {
        log.info("Received log event message [{}]", event.getBody());
        logMessageEventProcessor.processLogMessageEvent(event);
    }
}
Test:
@SpringBootTest
@ContextConfiguration(initializers = LogsListenerTest.Initializer.class)
class LogsListenerTest {

    @Autowired
    private RabbitTemplate template;

    @ClassRule
    private static final RabbitMQContainer container = new RabbitMQContainer("rabbitmq:3.7.25-management-alpine")
        .withExposedPorts(5672, 15672).withQueue("log-messages_q");

    @BeforeAll
    private static void startRabbit() {
        container.start();
    }

    @AfterAll
    private static void stopRabbit() {
        container.stop();
    }

    @Test
    public void test() {
        template.convertAndSend("exchange1", "exchange1.live", new LogMessageEvent());
        template.receiveAndConvert("log-messages_q");
    }

    public static class Initializer implements
            ApplicationContextInitializer<ConfigurableApplicationContext> {

        @Override
        public void initialize(@NotNull ConfigurableApplicationContext configurableApplicationContext) {
            val values = TestPropertyValues.of(
                "spring.rabbitmq.host=" + container.getContainerIpAddress(),
                "spring.rabbitmq.port=" + container.getMappedPort(5672)
            );
            values.applyTo(configurableApplicationContext);
        }
    }
}
None of the above works.
So where should I put these bindings to make it work? Thanks.
What version are you using? The use of List<Binding> has been replaced by Declarables.
See https://docs.spring.io/spring-amqp/docs/current/reference/html/#collection-declaration
The documentation is a bit out of date; the admin declareCollections property was removed in 2.2.
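To make the shape of the fix concrete, here is a minimal, dependency-free sketch of the set of bindings the question is trying to declare: one entry per (exchange, routing key) pair, all pointing at the same queue. BindingPlan, BindingSpec and flatten are hypothetical names used only for illustration; they are not Spring AMQP types. In a real configuration, each entry would presumably be built with BindingBuilder as in the question, and the queue, the exchanges and the bindings would all be returned together from a single @Bean of type Declarables rather than as a List<Binding>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BindingPlan {

    /** One (queue, exchange, routing key) triple. Hypothetical helper, not a Spring AMQP class. */
    static final class BindingSpec {
        final String queue;
        final String exchange;
        final String routingKey;

        BindingSpec(String queue, String exchange, String routingKey) {
            this.queue = queue;
            this.exchange = exchange;
            this.routingKey = routingKey;
        }

        @Override
        public String toString() {
            return exchange + " --" + routingKey + "--> " + queue;
        }
    }

    /**
     * Flattens "exchange -> routing keys" into one binding per (exchange, key) pair,
     * every binding targeting the same queue.
     */
    static List<BindingSpec> flatten(String queue, Map<String, List<String>> exchangesWithKeys) {
        List<BindingSpec> specs = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : exchangesWithKeys.entrySet()) {
            for (String key : entry.getValue()) {
                specs.add(new BindingSpec(queue, entry.getKey(), key));
            }
        }
        return specs;
    }

    public static void main(String[] args) {
        // Mirrors the exchanges-with-keys section of the YAML in the question.
        Map<String, List<String>> config = new LinkedHashMap<>();
        config.put("exchange1", Arrays.asList("exchange1.live", "exchange1.after"));
        config.put("exchange2", Arrays.asList("exchange2.live", "exchange2.after"));
        config.put("exchange3", Arrays.asList("exchange3.live", "exchange3.after"));

        // Each printed line corresponds to one binding that has to be declared on the broker.
        for (BindingSpec spec : flatten("log-messages_q", config)) {
            System.out.println(spec);
        }
    }
}
```

The point of the sketch is only that the queue, all three exchanges and all six bindings form one collection of declarations; with Declarables, that whole collection is handed to the admin as a single bean.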