package com.appleframework.jms.kafka.consumer.multithread.group;

import com.appleframework.jms.core.consumer.AbstractMessageConusmer;
import com.appleframework.jms.core.consumer.ErrorMessageProcessor;
import com.appleframework.jms.core.thread.NamedThreadFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appleframework/jms/kafka/consumer/multithread/group/BaseMessageConsumer.class */
/**
 * Base class for a Kafka message consumer that fans work out to several
 * {@code MessageConsumerThread} instances running on a shared executor.
 *
 * <p>Typical lifecycle: configure through the setters, call {@link #init()} to
 * start the consumer threads, and call {@link #destroy()} to stop them.
 *
 * <p>This class performs no synchronization of its own; it is presumably meant
 * to be configured and started from a single thread (confirm against callers).
 */
public abstract class BaseMessageConsumer extends AbstractMessageConusmer<byte[]> {
    private static final Logger logger = LoggerFactory.getLogger(BaseMessageConsumer.class);

    /** Topic specification; may hold several comma-separated names (see {@link #setTopic(String)}). */
    protected String topic;
    /** Optional handler for failed messages; may be {@code null}. */
    private ErrorMessageProcessor<byte[]> errorProcessor;
    /** Configuration handed unchanged to every consumer thread. */
    private Properties properties;
    /** Pool running the consumer threads; {@code null} until {@link #init()} is called. */
    private ExecutorService executor;
    /** Prefix handed unchanged to every consumer thread. */
    protected String prefix = "";
    /** While {@code true}, {@link #processErrorMessage(byte[])} is a no-op. */
    protected Boolean errorProcessorLock = true;
    /** Timeout handed unchanged to every consumer thread (unit is not visible in this class). */
    private long timeout = Long.MAX_VALUE;
    /** Number of consumer threads started per subscription. */
    private Integer threadsNum = 1;
    /** {@code true}: every thread gets the whole topic string; {@code false}: one group of threads per topic name. */
    private Boolean mixConsumer = true;
    /** Every consumer thread started so far, kept so {@link #destroy()} can stop them. */
    private List<MessageConsumerThread> threadList = new ArrayList<MessageConsumerThread>();

    /**
     * Creates the thread pool and starts the consumer threads.
     *
     * <p>{@code threadsNum} threads are started for each subscription returned by
     * {@link #resolveSubscriptions()}. The pool is sized to hold all of them at once:
     * a fixed pool only runs as many tasks concurrently as it has threads, so a pool
     * of just {@code threadsNum} threads would leave the tasks for every topic after
     * the first sitting in the queue if consumer tasks are long-running
     * (presumably they are poll loops — confirm against {@code MessageConsumerThread}).
     *
     * @throws IllegalArgumentException if {@code threadsNum} is not positive
     * @throws NullPointerException if {@code threadsNum} or {@code mixConsumer} is {@code null},
     *         or if {@code topic} is {@code null} while {@code mixConsumer} is {@code false}
     */
    public void init() {
        List<String> subscriptions = resolveSubscriptions();
        int threadsPerSubscription = this.threadsNum;
        // Never size below threadsNum, so an empty subscription list still yields a valid pool.
        int poolSize = Math.max(threadsPerSubscription, threadsPerSubscription * subscriptions.size());
        this.executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory("apple-jms-kafka-comsumer-pool"));
        for (String subscription : subscriptions) {
            for (int i = 0; i < threadsPerSubscription; i++) {
                startThread(subscription);
            }
        }
    }

    /**
     * Determines what each group of consumer threads subscribes to.
     *
     * @return a single-element list holding the whole topic string when {@code mixConsumer}
     *         is {@code true}; otherwise the non-empty comma-separated topic names
     */
    private List<String> resolveSubscriptions() {
        List<String> subscriptions = new ArrayList<String>();
        if (this.mixConsumer) {
            subscriptions.add(this.topic);
            return subscriptions;
        }
        for (String name : this.topic.split(",")) {
            // Input such as "a,,b" yields an empty token; an empty name is not a usable topic.
            if (!name.isEmpty()) {
                subscriptions.add(name);
            }
        }
        return subscriptions;
    }

    /**
     * Builds one consumer thread for the given subscription, records it and submits it to the pool.
     *
     * @param str topic (or comma-separated topics) the new thread is configured with
     */
    private void startThread(String str) {
        MessageConsumerThread messageConsumerThread = new MessageConsumerThread();
        messageConsumerThread.setProperties(this.properties);
        messageConsumerThread.setErrorProcessor(this.errorProcessor);
        messageConsumerThread.setErrorProcessorLock(this.errorProcessorLock);
        messageConsumerThread.setMessageConusmer(this);
        messageConsumerThread.setPrefix(this.prefix);
        messageConsumerThread.setTimeout(this.timeout);
        messageConsumerThread.setTopic(str);
        this.threadList.add(messageConsumerThread);
        this.executor.submit(messageConsumerThread);
    }

    /**
     * Forwards a failed message to the error processor, unless the error processor
     * lock is set or no error processor is configured.
     *
     * @param bArr raw payload of the message that failed
     */
    protected void processErrorMessage(byte[] bArr) {
        if (this.errorProcessorLock.booleanValue() || null == this.errorProcessor) {
            return;
        }
        this.errorProcessor.processErrorMessage(bArr, this);
    }

    /**
     * Sets the topic specification, trimming it and removing all space characters.
     *
     * @param str topic name, or several names separated by commas; must not be {@code null}
     */
    public void setTopic(String str) {
        // Literal replace: same result as the regex form, without compiling a pattern.
        this.topic = str.trim().replace(" ", "");
    }

    /**
     * @param bool {@code true} to suppress error-message processing, {@code false} to allow it
     */
    public void setErrorProcessorLock(Boolean bool) {
        this.errorProcessorLock = bool;
    }

    /**
     * @param j timeout passed to each consumer thread started by {@link #init()}
     */
    public void setTimeout(long j) {
        this.timeout = j;
    }

    /**
     * Closes the error processor (if any), asks every consumer thread to stop, then
     * shuts the pool down and waits up to five seconds for it to terminate.
     *
     * <p>Safe to call when {@link #init()} was never invoked. If the calling thread is
     * interrupted while waiting, its interrupt status is restored before returning.
     */
    public void destroy() {
        if (null != this.errorProcessor) {
            this.errorProcessor.close();
        }
        for (MessageConsumerThread consumerThread : this.threadList) {
            consumerThread.destroy();
        }
        if (null == this.executor) {
            // init() was never called, so there is no pool to shut down.
            return;
        }
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                logger.warn("Consumer thread pool did not terminate within 5000 ms");
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for consumer thread pool to terminate", e);
        }
    }

    /**
     * @param errorMessageProcessor handler for failed messages, or {@code null} for none
     */
    public void setErrorProcessor(ErrorMessageProcessor<byte[]> errorMessageProcessor) {
        this.errorProcessor = errorMessageProcessor;
    }

    /**
     * @param str prefix passed to each consumer thread started by {@link #init()}
     */
    public void setPrefix(String str) {
        this.prefix = str;
    }

    /**
     * @param num number of consumer threads to start per subscription; must be positive
     *            by the time {@link #init()} runs
     */
    public void setThreadsNum(Integer num) {
        this.threadsNum = num;
    }

    /**
     * @param properties configuration passed to each consumer thread started by {@link #init()}
     */
    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    /**
     * @param bool {@code true} to give every thread the whole topic string, {@code false}
     *             to start a separate group of threads for each comma-separated topic name
     */
    public void setMixConsumer(Boolean bool) {
        this.mixConsumer = bool;
    }
}
