package com.appleframework.jms.kafka.consumer;

import com.appleframework.jms.core.consumer.BytesMessageConusmer;
import com.appleframework.jms.core.consumer.ErrorByteMessageProcessor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Resource;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/* loaded from: input_file:com/appleframework/jms/kafka/consumer/BaseMessageConsumer.class */
public abstract class BaseMessageConsumer extends BytesMessageConusmer {

    @Resource
    private ConsumerConfig consumerConfig;
    protected String topic;
    protected Integer partitionsNum;
    private ConsumerConnector connector;
    private ExecutorService executor;
    private ErrorByteMessageProcessor errorProcessor;
    protected Integer errorProcessorPoolSize = 1;
    protected Boolean errorProcessorLock = true;

    protected void init() {
        HashMap hashMap = new HashMap();
        this.connector = Consumer.createJavaConsumerConnector(this.consumerConfig);
        String[] split = this.topic.split(",");
        for (String str : split) {
            hashMap.put(str, this.partitionsNum);
        }
        Map createMessageStreams = this.connector.createMessageStreams(hashMap);
        ArrayList<KafkaStream> arrayList = new ArrayList();
        for (String str2 : split) {
            arrayList.addAll((Collection) createMessageStreams.get(str2));
        }
        this.executor = Executors.newFixedThreadPool(this.partitionsNum.intValue() * split.length);
        for (final KafkaStream kafkaStream : arrayList) {
            this.executor.submit(new Runnable() { // from class: com.appleframework.jms.kafka.consumer.BaseMessageConsumer.1
                @Override // java.lang.Runnable
                public void run() {
                    ConsumerIterator it = kafkaStream.iterator();
                    while (it.hasNext()) {
                        byte[] bArr = (byte[]) it.next().message();
                        if (BaseMessageConsumer.this.errorProcessorLock.booleanValue()) {
                            BaseMessageConsumer.this.processByteMessage(bArr);
                        } else {
                            try {
                                BaseMessageConsumer.this.processByteMessage(bArr);
                            } catch (Exception e) {
                                BaseMessageConsumer.this.processErrorMessage(bArr);
                            }
                        }
                    }
                }
            });
        }
        if (this.errorProcessorLock.booleanValue()) {
            return;
        }
        this.errorProcessor = new ErrorByteMessageProcessor(this.errorProcessorPoolSize.intValue());
    }

    protected void processErrorMessage(byte[] bArr) {
        if (this.errorProcessorLock.booleanValue()) {
            return;
        }
        this.errorProcessor.processErrorMessage(bArr, this);
    }

    public void setConsumerConfig(ConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    public void setTopic(String str) {
        this.topic = str.trim().replaceAll(" ", "");
    }

    public void setPartitionsNum(Integer num) {
        this.partitionsNum = num;
    }

    public void setErrorProcessorPoolSize(Integer num) {
        this.errorProcessorPoolSize = num;
    }

    public void setErrorProcessorLock(Boolean bool) {
        this.errorProcessorLock = bool;
    }

    public void destroy() {
        if (null != this.connector) {
            this.connector.shutdown();
        }
        if (null != this.executor) {
            this.executor.shutdown();
        }
        if (null != this.errorProcessor) {
            this.errorProcessor.close();
        }
    }
}
