【资料图】
RabbitMQ 支持优先级队列,当工作中有一些任务需要紧急优先处理,此时可以使用优先级队列通过设置 MQ 的 x-max-priority 属性可以将对列设置为优先级队列
配置文件类
@Slf4j@Getter@Configurationpublic class RabbitMQConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${spring.rabbitmq.cache.channel.size:100}") private int channelCacheSize; // 单条链接channel 缓存大小 @Value("${spring.rabbitmq.cache.channel.checkout-timeout:5000}") private long channelCheckTimeout; //获取channel等待时间,毫秒 @Value("${app.accept-text-queue-name}") private String acceptTextQueueName; @Bean public MessageConverter jsonConverter() { return new Jackson2JsonMessageConverter(); } @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setChannelCheckoutTimeout(channelCheckTimeout); connectionFactory.setChannelCacheSize(channelCacheSize); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "jsonRabbitTemplate") public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMandatory(true); template.setMessageConverter(jsonConverter()); template.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { log.error("confirm message error! correlationData: {}, cause: {}", correlationData, cause); } }); return template; } @Bean(name = "acceptSseConvertQueue") public Queue acceptSseConvertQueue() { Map args = new HashMap<>(); args.put("x-max-priority", 100);// 设置优先级最高为 100,数字越大优先级越高 return new Queue(acceptTextQueueName, true, false, false, args); }}
设置预取数
在消费者中默认 containerFactory 为 SimpleRabbitListenerContainerFactory ,其中默认预取数为 250,该配置在 AbstractMessageListenerContainer 中的 DEFAULT_PREFETCH_COUNT如果需要每次消费一个消息,就需要在配置文件中指定预取数为 1, 配置项为
// 设置预取数为 1spring.rabbitmq.listener.simple.prefetch = 1
设置并发消费数
在 @RabbitListener 中,配置项 concurrency 来配置并发消费数