RabbitMQ
Vagrantfile
config.vm.network "forwarded_port", guest: 5672, host: 5672 # AMQP 端口
config.vm.network "forwarded_port", guest: 15672, host: 15672 # MQ管理界面端口
docker-compose.yml
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672" # AMQP 端口
- "15672:15672" # 管理界面端口
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: rabbitmq_password
volumes:
- rabbitmq_data:/var/lib/rabbitmq
- rabbitmq_logs:/var/log/rabbitmq
volumes:
rabbitmq_data:
rabbitmq_logs:
Nacos 配置中心
-
保存 RabbitMQ 的隊列配置信息。
-
提供服務配置的靈活性。
backend
Spring Boot 通過 ConvertListener 消費 RabbitMQ 消息(convert-queue)。
消息由 RabbitListener 消費後,轉換任務結束再發給 RabbitMQ 消息(notification-queue)。
package com.feddoubt.YT1.service.mq;
import com.feddoubt.YT1.service.utils.YouTubeUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Map;
@Slf4j
@Service
public class ConvertListener {
@Autowired
private YouTubeUtils youTubeUtils;
private final RabbitTemplate rabbitTemplate;
public ConvertListener(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@RabbitListener(queues = "${rabbitmq.convert-queue}")
@Async
public void handleConvert(Map<String, Object> map) {
try {
log.info("收到轉換任務: {}", map);
String videoPath = (String) map.get("output");
log.info("開始轉換: {}", videoPath);
// 轉換邏輯
String result = youTubeUtils.convertToMp3ORMp4(map);
log.info("notificationQueue success:{}",result);
rabbitTemplate.convertAndSend("notificationQueue", result);
} catch (Exception e) {
log.error("處理轉換任務失敗", e);
}
}
}
Spring Boot 通過 FrontListener 消費 RabbitMQ 消息(notification-queue 和 embedUrl-queue)。
消息由 RabbitListener 消費後,推送到 WebSocket 主題。
package com.feddoubt.YT1.service.mq;
import com.feddoubt.YT1.service.NotificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class FrontListener {
@Autowired
private NotificationService notificationService;
@RabbitListener(queues = "${rabbitmq.embedUrl-queue}")
@Async
public void handleEmbedUrl(String message) {
log.info("message:{}",message);
// 推送消息到前端
notificationService.sendNotification("/topic/embedUrl", message);
}
@RabbitListener(queues = "${rabbitmq.notification-queue}")
@Async
public void handleNotification(String message) {
log.info("message:{}",message);
// 推送消息到前端
notificationService.sendNotification("/topic/convert", message);
}
}
創建下載隊列,參數:隊列名稱,是否持久化
package com.feddoubt.common.YT1.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Queue notificationQueue() {
return new Queue("notificationQueue", true);
}
@Bean
public Queue embedUrlQueue() {
return new Queue("embedUrlQueue", true);
}
}