package io.jboot.components.mq.aliyunmq;

import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.JbootmqConfig;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.utils.ConfigUtil;
import io.jboot.utils.StrUtil;
import java.util.Map;
import java.util.Properties;

/* loaded from: input_file:io/jboot/components/mq/aliyunmq/JbootAliyunmqImpl.class */
/**
 * {@link Jbootmq} implementation backed by the Aliyun ONS client.
 *
 * <p>Messages sent with {@link #enqueue(Object, String)} go to the topic named after the
 * channel; messages sent with {@link #publish(Object, String)} go to the topic
 * {@code broadcastChannelPrefix + channel}. Two consumers are started when listening begins:
 * one subscribed to the plain channel topics and one, created with
 * {@code MessageModel=BROADCASTING}, subscribed to the prefixed topics.
 */
public class JbootAliyunmqImpl extends JbootmqBase implements Jbootmq {
    private static final Log LOG = Log.getLog(JbootAliyunmqImpl.class);

    /**
     * Lazily created in {@link #getProducer()}. Declared volatile because it is read outside
     * the lock there; without it a thread could observe a stale or partially published value.
     */
    private volatile Producer producer;

    /** Consumer subscribed to the plain channel topics (target of {@link #enqueue}). */
    private Consumer consumer;

    /** Consumer subscribed to the prefixed topics (target of {@link #publish}). */
    private Consumer broadcastConsumer;

    private final JbootAliyunmqConfig aliyunmqConfig;

    /**
     * Resolves the Aliyun MQ configuration to use.
     *
     * <p>When the mq config carries no type name, the default {@link JbootAliyunmqConfig} is
     * used; otherwise the config model registered under that type name is looked up.
     *
     * @param jbootmqConfig the generic mq configuration, passed on to the base class
     * @throws JbootIllegalConfigException if a type name is given but no config model exists for it
     */
    public JbootAliyunmqImpl(JbootmqConfig jbootmqConfig) {
        super(jbootmqConfig);
        String typeName = jbootmqConfig.getTypeName();
        if (StrUtil.isNotBlank(typeName)) {
            Map<String, JbootAliyunmqConfig> configModels = ConfigUtil.getConfigModels(JbootAliyunmqConfig.class);
            if (!configModels.containsKey(typeName)) {
                throw new JbootIllegalConfigException("Please config \"jboot.mq.aliyun." + typeName + ".addr\" in your jboot.properties.");
            }
            this.aliyunmqConfig = configModels.get(typeName);
        } else {
            this.aliyunmqConfig = Jboot.config(JbootAliyunmqConfig.class);
        }
    }

    /** Starts the queue consumer and then the broadcast consumer. */
    @Override // io.jboot.components.mq.JbootmqBase
    protected void onStartListening() {
        startQueueConsumer();
        startBroadCastConsumer();
    }

    /**
     * Shuts down both consumers, if they were started, and clears the references.
     * The broadcast consumer is shut down even if shutting down the queue consumer throws.
     */
    @Override // io.jboot.components.mq.JbootmqBase
    protected void onStopListening() {
        try {
            if (this.consumer != null) {
                this.consumer.shutdown();
                this.consumer = null;
            }
        } finally {
            if (this.broadcastConsumer != null) {
                this.broadcastConsumer.shutdown();
                this.broadcastConsumer = null;
            }
        }
    }

    /**
     * Creates and starts the consumer for queue messages. It subscribes to each channel's
     * plain topic name, which is the topic {@link #enqueue(Object, String)} sends to.
     */
    public void startQueueConsumer() {
        // Store the reference before subscribing so onStopListening() can still shut the
        // consumer down if subscribing or starting fails.
        this.consumer = ONSFactory.createConsumer(createProperties());
        subscribeChannels(this.consumer, false);
        this.consumer.start();
    }

    /**
     * Creates and starts the consumer for broadcast messages. It is created with
     * {@code MessageModel=BROADCASTING} and subscribes to {@code broadcastChannelPrefix + channel},
     * which is the topic {@link #publish(Object, String)} sends to.
     */
    public void startBroadCastConsumer() {
        Properties broadcastProperties = createProperties();
        broadcastProperties.put("MessageModel", "BROADCASTING");
        // Kept in its own field: sharing the queue consumer's field would overwrite that
        // reference and leave the queue consumer running after onStopListening().
        this.broadcastConsumer = ONSFactory.createConsumer(broadcastProperties);
        subscribeChannels(this.broadcastConsumer, true);
        this.broadcastConsumer.start();
    }

    /**
     * Subscribes {@code target} to one topic per configured channel and forwards every received
     * message to the listeners registered for that channel.
     *
     * @param target    the consumer to register the subscriptions on
     * @param broadcast {@code true} to subscribe to the prefixed broadcast topics,
     *                  {@code false} to subscribe to the plain channel topics
     */
    private void subscribeChannels(Consumer target, boolean broadcast) {
        for (String channel : this.channels) {
            String topic = broadcast ? this.aliyunmqConfig.getBroadcastChannelPrefix() + channel : channel;
            target.subscribe(topic, this.aliyunmqConfig.getSubscribeSubExpression(), (message, consumeContext) -> {
                AliyunmqMessageContext messageContext = new AliyunmqMessageContext(this, message, consumeContext);
                // Listeners are always notified under the plain channel name, never the prefixed topic.
                notifyListeners(channel, getSerializer().deserialize(message.getBody()), messageContext);
                return messageContext.getReturnAction();
            });
        }
    }

    /**
     * Sends a message to the topic named {@code str}.
     *
     * @param obj the payload; a {@link Message} is sent as-is (and {@code str} is ignored),
     *            anything else is serialized into a new message with tag {@code "*"}
     * @param str the channel, used as the topic name
     */
    @Override // io.jboot.components.mq.Jbootmq
    public void enqueue(Object obj, String str) {
        send(toMessage(obj, str));
    }

    /**
     * Sends a message to the topic {@code broadcastChannelPrefix + str}.
     *
     * @param obj the payload; a {@link Message} is sent as-is (and {@code str} is ignored),
     *            anything else is serialized into a new message with tag {@code "*"}
     * @param str the channel; the configured broadcast prefix is prepended to form the topic
     */
    @Override // io.jboot.components.mq.Jbootmq
    public void publish(Object obj, String str) {
        if (obj instanceof Message) {
            send((Message) obj);
        } else {
            send(toMessage(obj, this.aliyunmqConfig.getBroadcastChannelPrefix() + str));
        }
    }

    /** Returns {@code obj} itself when it already is a {@link Message}, otherwise wraps its serialized form. */
    private Message toMessage(Object obj, String topic) {
        if (obj instanceof Message) {
            return (Message) obj;
        }
        return new Message(topic, "*", getSerializer().serialize(obj));
    }

    /** Sends the message through the shared producer and logs a warning when no send result comes back. */
    private void send(Message message) {
        if (getProducer().send(message) == null) {
            LOG.warn("Rockect mq send message fail!!!");
        }
    }

    /**
     * Returns the shared producer, creating and starting it on first use.
     *
     * @return the started producer
     */
    public Producer getProducer() {
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    createProducer();
                }
            }
        }
        return this.producer;
    }

    /**
     * Creates a new producer from {@link #createProperties()}, starts it and stores it as the
     * shared producer. The field is assigned only after {@code start()} returns, so callers of
     * {@link #getProducer()} never receive a producer that has not been started.
     */
    public void createProducer() {
        Producer created = ONSFactory.createProducer(createProperties());
        created.start();
        this.producer = created;
    }

    /**
     * Builds the client properties shared by the producer and both consumers from the
     * resolved {@link JbootAliyunmqConfig}.
     *
     * <p>NOTE(review): {@link Properties} rejects {@code null} values, so every config getter
     * used here presumably has to return a non-null value — confirm against the config defaults.
     *
     * @return a fresh, independently modifiable {@link Properties} instance
     */
    public Properties createProperties() {
        Properties properties = new Properties();
        properties.put("AccessKey", this.aliyunmqConfig.getAccessKey());
        properties.put("SecretKey", this.aliyunmqConfig.getSecretKey());
        properties.put("ProducerId", this.aliyunmqConfig.getProducerId());
        properties.put("NAMESRV_ADDR", this.aliyunmqConfig.getAddr());
        properties.put("InstanceName", this.aliyunmqConfig.getInstanceName());
        properties.setProperty("SendMsgTimeoutMillis", this.aliyunmqConfig.getSendMsgTimeoutMillis());
        return properties;
    }
}
