跳转至

RabbitMQ

Vagrantfile

  config.vm.network "forwarded_port", guest: 5672, host: 5672 # AMQP 端口
  config.vm.network "forwarded_port", guest: 15672, host: 15672 # MQ管理界面端口

docker-compose.yml

   rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"  # AMQP 端口
      - "15672:15672"  # 管理界面端口
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: rabbitmq_password
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - rabbitmq_logs:/var/log/rabbitmq

volumes:
  rabbitmq_data:
  rabbitmq_logs:

Nacos 配置中心

  • 保存 RabbitMQ 的隊列配置信息。

  • 提供服務配置的靈活性。

    spring:
      rabbitmq:
        host: 192.168.33.11
        port: 5672
        username: admin
        password: rabbitmq_password  # 需與 docker-compose 的 RABBITMQ_DEFAULT_PASS 保持一致
    rabbitmq:
      download-queue: downloadQueue
      convert-queue: convertQueue
      download-log-queue: downloadLogQueue
      notification-queue: notificationQueue
      embedUrl-queue: embedUrlQueue
    

backend

Spring Boot 通過 ConvertListener 消費 RabbitMQ 消息(convert-queue)。

消息由 RabbitListener 消費後執行轉換任務,任務結束後再將結果發送到 RabbitMQ(notification-queue)。

package com.feddoubt.YT1.service.mq;

import com.feddoubt.YT1.service.utils.YouTubeUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


import java.util.Map;

/**
 * Consumes conversion tasks from the queue named by the {@code rabbitmq.convert-queue}
 * property, runs the conversion through {@link YouTubeUtils} and publishes the result
 * so that it can be relayed to the front end.
 *
 * <p>NOTE(review): {@code @Async} on a {@code @RabbitListener} method hands the work to
 * another thread (when async support is enabled), so the listener container may consider
 * the message handled before the conversion finishes — confirm this is intended.
 */
@Slf4j
@Service
public class ConvertListener {

    /**
     * Name passed to {@link RabbitTemplate#convertAndSend(String, Object)} when publishing
     * the conversion result.
     *
     * <p>NOTE(review): the consumer of this queue subscribes through the
     * {@code rabbitmq.notification-queue} property while this name is hard-coded —
     * confirm the two stay aligned.
     */
    private static final String NOTIFICATION_QUEUE = "notificationQueue";

    @Autowired
    private YouTubeUtils youTubeUtils;

    private final RabbitTemplate rabbitTemplate;

    /**
     * @param rabbitTemplate template used to publish the conversion result
     */
    public ConvertListener(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Handles one conversion task: converts the media described by {@code map} and
     * publishes the resulting string to the notification queue.
     *
     * <p>Any failure is logged and swallowed here; nothing is rethrown to the caller.
     *
     * @param map task description; the {@code "output"} entry is read for logging only
     *            and the whole map is handed to the converter
     */
    @RabbitListener(queues = "${rabbitmq.convert-queue}")
    @Async
    public void handleConvert(Map<String, Object> map) {
        try {
            log.info("收到轉換任務: {}", map);
            // Logged as a plain Object: this value is diagnostic only, so a non-String
            // entry must not abort the task with a ClassCastException.
            Object videoPath = map.get("output");
            log.info("開始轉換: {}", videoPath);

            // Conversion logic
            String result = youTubeUtils.convertToMp3ORMp4(map);
            rabbitTemplate.convertAndSend(NOTIFICATION_QUEUE, result);
            // Report success only after the publish call has returned without throwing.
            log.info("notificationQueue success:{}", result);

        } catch (Exception e) {
            log.error("處理轉換任務失敗", e);
        }
    }

}

Spring Boot 通過 FrontListener 消費 RabbitMQ 消息(notification-queue、embedUrl-queue)。

消息由 RabbitListener 消費後,推送到 WebSocket 主題。

package com.feddoubt.YT1.service.mq;

import com.feddoubt.YT1.service.NotificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * Relays messages consumed from RabbitMQ to the front end through
 * {@link NotificationService}.
 */
@Slf4j
@Service
public class FrontListener {

    /** Destination used for messages taken from the embed-URL queue. */
    private static final String EMBED_URL_TOPIC = "/topic/embedUrl";

    /** Destination used for messages taken from the notification queue. */
    private static final String CONVERT_TOPIC = "/topic/convert";

    @Autowired
    private NotificationService notificationService;

    /**
     * Forwards a message from the queue named by {@code rabbitmq.embedUrl-queue}.
     *
     * @param message payload received from the queue
     */
    @RabbitListener(queues = "${rabbitmq.embedUrl-queue}")
    @Async
    public void handleEmbedUrl(String message) {
        relay(EMBED_URL_TOPIC, message);
    }

    /**
     * Forwards a message from the queue named by {@code rabbitmq.notification-queue}.
     *
     * @param message payload received from the queue
     */
    @RabbitListener(queues = "${rabbitmq.notification-queue}")
    @Async
    public void handleNotification(String message) {
        relay(CONVERT_TOPIC, message);
    }

    /** Logs the payload, then pushes it to the front end on the given destination. */
    private void relay(String destination, String payload) {
        log.info("message:{}", payload);
        notificationService.sendNotification(destination, payload);
    }
}

創建通知隊列(notificationQueue)與 embedUrl 隊列(embedUrlQueue),參數:隊列名稱,是否持久化

package com.feddoubt.common.YT1.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the {@code notificationQueue} and {@code embedUrlQueue} queue beans.
 */
@Configuration
public class RabbitMQConfig {

    /** Second constructor argument of {@link Queue}: whether the queue is durable. */
    private static final boolean DURABLE = true;

    /**
     * @return the durable queue named {@code notificationQueue}
     */
    @Bean
    public Queue notificationQueue() {
        return durableQueue("notificationQueue");
    }

    /**
     * @return the durable queue named {@code embedUrlQueue}
     */
    @Bean
    public Queue embedUrlQueue() {
        return durableQueue("embedUrlQueue");
    }

    /** Builds a durable queue definition with the given name. */
    private static Queue durableQueue(String name) {
        return new Queue(name, DURABLE);
    }
}