package com.seeyon.ctp.common.mq;

import com.seeyon.ctp.common.constants.SystemProperties;
import com.seeyon.ctp.common.exceptions.BusinessException;
import com.seeyon.ctp.common.log.CtpLogFactory;
import com.seeyon.ctp.util.DBAgent;
import java.io.File;
import java.io.Serializable;
import java.util.HashMap;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.commons.logging.Log;

/* loaded from: input_file:com/seeyon/ctp/common/mq/MQHandler.class */
/**
 * Static facade over the message-queue producer and consumer.
 *
 * <p>{@link #initialize()} optionally starts an embedded ActiveMQ broker and then
 * creates the shared {@code Producer} and {@code Consummer}. The send and subscribe
 * entry points throw a {@link BusinessException} until that initialization has
 * completed successfully.
 *
 * <p>All state is held in static fields and no synchronization is performed here.
 */
public class MQHandler {
    private static final String KEY_TYPE = "mq.config.type";
    private static final String KEY_CONNECTION = "mq.config.connection";
    private static final String KEY_USER = "mq.config.user";
    private static final String KEY_PASSWORD = "mq.config.password";
    private static final String KEY_MIN_THREADS = "mq.config.minThreadNum";
    private static final String KEY_MAX_THREADS = "mq.config.maxThreadNum";
    private static final String KEY_BASE_FOLDER = "ctp.base.folder";

    private static Producer producer;
    private static Consummer consumer;
    private static final Log logger = CtpLogFactory.getLog(MQHandler.class);

    /** True only after both the producer and the consumer were created successfully. */
    private static boolean enabled = false;

    /** Set to false when the configuration does not call for a local embedded ActiveMQ broker. */
    public static boolean isEmbeddedServer = true;

    /** The embedded broker instance, or null if none was created. */
    public static BrokerService broker = null;

    /**
     * Starts the embedded broker when the configuration asks for one, then creates
     * the producer and the consumer.
     *
     * <p>A broker start-up failure is logged and initialization continues. A
     * {@link BusinessException} from producer/consumer creation is logged and leaves
     * the handler disabled, so later send/subscribe calls fail with a
     * {@link BusinessException} rather than dereferencing a null producer or consumer.
     */
    public static void initialize() {
        if (!startMQServerIfNecessary()) {
            logger.error("****************** 启动嵌入式MQ出错，请检查！******************");
        }
        try {
            initProducer();
            initConsumer();
            // Mark the handler usable only once both endpoints exist.
            enabled = true;
        } catch (BusinessException e) {
            logger.error("****************** 初始化MQ Consumer或Producer出错 ******************");
            logger.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Sends a text message to a topic.
     *
     * @param str  channel name
     * @param str2 message key
     * @param str3 text payload
     * @throws BusinessException if the handler is not initialized or the producer fails
     */
    public static void sendTopicTextMessage(String str, String str2, String str3) throws BusinessException {
        sendTextMessage(ChannelTypeEnum.TOPIC, str, str2, str3);
    }

    /**
     * Sends a text message to a queue.
     *
     * @param str  channel name
     * @param str2 message key
     * @param str3 text payload
     * @throws BusinessException if the handler is not initialized or the producer fails
     */
    public static void sendQueueTextMessage(String str, String str2, String str3) throws BusinessException {
        sendTextMessage(ChannelTypeEnum.QUEUE, str, str2, str3);
    }

    /**
     * Sends a map message to a topic.
     *
     * @param str     channel name
     * @param str2    message key
     * @param hashMap map payload
     * @throws BusinessException if the handler is not initialized or the producer fails
     */
    public static void sendTopicMapMessage(String str, String str2, HashMap hashMap) throws BusinessException {
        sendMapMessage(ChannelTypeEnum.TOPIC, str, str2, hashMap);
    }

    /**
     * Sends a map message to a queue.
     *
     * @param str     channel name
     * @param str2    message key
     * @param hashMap map payload
     * @throws BusinessException if the handler is not initialized or the producer fails
     */
    public static void sendQueueMapMessage(String str, String str2, HashMap hashMap) throws BusinessException {
        sendMapMessage(ChannelTypeEnum.QUEUE, str, str2, hashMap);
    }

    /**
     * Sends a serializable object message to a topic.
     *
     * @param str          channel name
     * @param str2         message key
     * @param serializable object payload
     * @throws BusinessException if the handler is not initialized or the producer fails
     */
    public static void sendTopicObjectMessage(String str, String str2, Serializable serializable) throws BusinessException {
        sendObjectMessage(ChannelTypeEnum.TOPIC, str, str2, serializable);
    }

    /**
     * Sends a serializable object message to a queue.
     *
     * @param str          channel name
     * @param str2         message key
     * @param serializable object payload
     * @throws BusinessException if the handler is not initialized or the producer fails
     */
    public static void sendQueueObjectMessage(String str, String str2, Serializable serializable) throws BusinessException {
        sendObjectMessage(ChannelTypeEnum.QUEUE, str, str2, serializable);
    }

    /**
     * Sends a prepared message to the channel it names, as a queue message when its
     * channel type is {@code QUEUE} and as a topic message otherwise.
     *
     * @param message the message to send; must carry a channel type and a channel
     * @throws BusinessException if the handler is not initialized or the producer fails
     */
    public static void sendMessage(Message message) throws BusinessException {
        ensureEnabled();
        if (message.getChannelTypeEnum().equals(ChannelTypeEnum.QUEUE)) {
            producer.sendQueueMessage(message.getChannel(), message);
        } else {
            producer.sendTopicMessage(message.getChannel(), message);
        }
    }

    /**
     * Subscribes a receiver to a channel using {@code MessageAckTypeEnum.AUTO_ACK}.
     *
     * @param channelTypeEnum queue or topic
     * @param str             channel name
     * @param messageReceiver receiver to register
     * @throws BusinessException if the handler is not initialized or the consumer fails
     */
    public static void subscribe(ChannelTypeEnum channelTypeEnum, String str, MessageReceiver messageReceiver) throws BusinessException {
        ensureEnabled();
        consumer.subscribe(MessageAckTypeEnum.AUTO_ACK, channelTypeEnum, str, messageReceiver);
    }

    /**
     * Subscribes a receiver to a channel with an explicit acknowledgement type.
     *
     * @param messageAckTypeEnum acknowledgement type passed to the consumer
     * @param channelTypeEnum    queue or topic
     * @param str                channel name
     * @param messageReceiver    receiver to register
     * @throws BusinessException if the handler is not initialized or the consumer fails
     */
    public static void subscribe(MessageAckTypeEnum messageAckTypeEnum, ChannelTypeEnum channelTypeEnum, String str, MessageReceiver messageReceiver) throws BusinessException {
        ensureEnabled();
        consumer.subscribe(messageAckTypeEnum, channelTypeEnum, str, messageReceiver);
    }

    /** Throws a {@link BusinessException} unless {@link #initialize()} completed successfully. */
    private static void ensureEnabled() throws BusinessException {
        if (!enabled) {
            throw new BusinessException("MQ相关组件未初始化，请检查是否开启");
        }
    }

    /** Wraps the text payload in a {@code TextMessage}, fills in its routing fields and sends it. */
    private static void sendTextMessage(ChannelTypeEnum channelTypeEnum, String str, String str2, String str3) throws BusinessException {
        TextMessage textMessage = new TextMessage(str3);
        textMessage.setChannelTypeEnum(channelTypeEnum);
        textMessage.setChannel(str);
        textMessage.setKey(str2);
        sendMessage(textMessage);
    }

    /** Wraps the map payload in a {@code MapMessage}, fills in its routing fields and sends it. */
    private static void sendMapMessage(ChannelTypeEnum channelTypeEnum, String str, String str2, HashMap hashMap) throws BusinessException {
        MapMessage mapMessage = new MapMessage(hashMap);
        mapMessage.setChannelTypeEnum(channelTypeEnum);
        mapMessage.setChannel(str);
        mapMessage.setKey(str2);
        sendMessage(mapMessage);
    }

    /** Wraps the object payload in an {@code ObjectMessage}, fills in its routing fields and sends it. */
    private static void sendObjectMessage(ChannelTypeEnum channelTypeEnum, String str, String str2, Serializable serializable) throws BusinessException {
        ObjectMessage objectMessage = new ObjectMessage(serializable);
        objectMessage.setChannelTypeEnum(channelTypeEnum);
        objectMessage.setChannel(str);
        objectMessage.setKey(str2);
        sendMessage(objectMessage);
    }

    /**
     * Starts an embedded ActiveMQ broker when {@code mq.config.type} is
     * {@code "ActiveMQ"} and {@code mq.config.connection} contains {@code "localhost"}.
     *
     * @return true if no embedded broker is required or it started; false if starting it threw
     */
    private static boolean startMQServerIfNecessary() {
        SystemProperties properties = SystemProperties.getInstance();
        if (!"ActiveMQ".equals(properties.getProperty(KEY_TYPE))) {
            isEmbeddedServer = false;
            logger.info("非ActiveMQ 不启动本地嵌入式ActiveMQ");
            return true;
        }
        // NOTE(review): assumes mq.config.connection is always configured; a null value would throw here — confirm.
        if (!properties.getProperty(KEY_CONNECTION).contains("localhost")) {
            isEmbeddedServer = false;
            logger.info("IP地址配置非localhost，不启动本地嵌入式ActiveMQ");
            return true;
        }
        logger.info("启动嵌入式ActiveMQ Server ......");
        try {
            broker = new BrokerService();
            broker.addConnector(properties.getProperty(KEY_CONNECTION));
            for (TransportConnector transportConnector : broker.getTransportConnectors()) {
                logger.info("ActiveMQ TransportConnector: " + transportConnector.toString());
                transportConnector.setAllowLinkStealing(true);
            }
            broker.setUseJmx(false);
            broker.setPersistent(false);
            broker.getPersistenceAdapter().setDirectory(new File(properties.getProperty(KEY_BASE_FOLDER) + "/activemq"));
            broker.start();
            logger.info("启动嵌入式ActiveMQ Server 完成");
            return true;
        } catch (Exception e) {
            logger.error(e.getLocalizedMessage(), e);
            return false;
        }
    }

    /**
     * Builds the failover URL shared by the producer and the consumer from the
     * trimmed {@code mq.config.connection} property.
     */
    private static String buildFailoverUrl() {
        String connection = SystemProperties.getInstance().getProperty(KEY_CONNECTION).trim();
        return "failover:(" + connection + ")?initialReconnectDelay=1000&maxReconnectDelay=30000";
    }

    /**
     * Creates the shared consumer and initializes its thread pool from the
     * {@code mq.config.minThreadNum} and {@code mq.config.maxThreadNum} properties.
     *
     * @throws BusinessException if the factory or the consumer reports a failure
     */
    private static void initConsumer() throws BusinessException {
        SystemProperties properties = SystemProperties.getInstance();
        String type = properties.getProperty(KEY_TYPE);
        String url = buildFailoverUrl();
        String user = properties.getProperty(KEY_USER);
        String password = properties.getProperty(KEY_PASSWORD);
        int minThreads = Integer.parseInt(properties.getProperty(KEY_MIN_THREADS));
        int maxThreads = Integer.parseInt(properties.getProperty(KEY_MAX_THREADS));
        consumer = MQProcessorFactory.generateConsumer(type, url, user, password);
        // NOTE(review): the meaning of the third argument (DBAgent.batch_size) is not visible here — confirm.
        consumer.initThreadPoolExecutor(minThreads, maxThreads, DBAgent.batch_size);
    }

    /**
     * Creates the shared producer from the configured type, failover URL, user and password.
     *
     * @throws BusinessException if the factory reports a failure
     */
    private static void initProducer() throws BusinessException {
        SystemProperties properties = SystemProperties.getInstance();
        producer = MQProcessorFactory.generateProducer(
                properties.getProperty(KEY_TYPE),
                buildFailoverUrl(),
                properties.getProperty(KEY_USER),
                properties.getProperty(KEY_PASSWORD));
    }
}
