package com.appleframework.jms.kafka.consumer;

import com.appleframework.jms.core.consumer.MessageConusmer2;
import com.appleframework.jms.core.utils.ByteUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Resource;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appleframework/jms/kafka/consumer/TextMessageConsumer2.class */
/**
 * Kafka consumer that reads raw byte payloads from one or more topics, decodes each
 * payload with {@link ByteUtils#fromByte}, casts the result to {@code String} and hands
 * it to the configured {@link MessageConusmer2}.
 *
 * <p>Lifecycle: configure via the setters (or {@code @Resource} injection), call
 * {@link #init()} once to start consuming, and call {@link #destroy()} to stop.
 */
public class TextMessageConsumer2 {
    private static final Logger logger = LoggerFactory.getLogger(TextMessageConsumer2.class);

    @Resource
    private MessageConusmer2<String> messageConusmer2;

    @Resource
    private ConsumerConfig consumerConfig;

    /** Comma-separated topic names; spaces are stripped by {@link #setTopic(String)}. */
    private String topic;

    /** Number of streams (and therefore worker threads) requested per topic. */
    private Integer partitionsNum;

    private ConsumerConnector connector;

    /** Worker pool running one task per Kafka stream; kept so {@link #destroy()} can stop it. */
    private ExecutorService executor;

    /**
     * Creates the Kafka connector, opens {@code partitionsNum} streams for every configured
     * topic and starts one worker task per stream.
     *
     * @throws IllegalStateException if {@code topic} is missing/empty or {@code partitionsNum}
     *         is missing or not positive
     */
    public void init() {
        // Validate before creating the connector so a misconfiguration does not leak it.
        if (this.topic == null || this.topic.isEmpty()) {
            throw new IllegalStateException("topic must be set before init()");
        }
        if (this.partitionsNum == null || this.partitionsNum.intValue() <= 0) {
            throw new IllegalStateException("partitionsNum must be a positive number, got: " + this.partitionsNum);
        }

        // A map naturally de-duplicates topic names, so "a,a" does not consume the same streams twice.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        for (String name : this.topic.split(",")) {
            if (!name.isEmpty()) {
                topicCountMap.put(name, this.partitionsNum);
            }
        }
        if (topicCountMap.isEmpty()) {
            throw new IllegalStateException("topic contains no topic names: '" + this.topic + "'");
        }

        this.connector = Consumer.createJavaConsumerConnector(this.consumerConfig);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic =
                this.connector.createMessageStreams(topicCountMap);

        ArrayList<KafkaStream<byte[], byte[]>> streams = new ArrayList<KafkaStream<byte[], byte[]>>();
        for (String name : topicCountMap.keySet()) {
            List<KafkaStream<byte[], byte[]>> topicStreams = streamsByTopic.get(name);
            if (topicStreams != null) {
                streams.addAll(topicStreams);
            }
        }

        // NOTE(review): fixed one-second pause kept from the original; presumably it gives the
        // connector time to settle before consumption starts — confirm whether it is still needed.
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error(e.getMessage(), e);
        }

        final ExecutorService pool =
                Executors.newFixedThreadPool(this.partitionsNum.intValue() * topicCountMap.size());
        this.executor = pool;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    TextMessageConsumer2.this.consume(stream);
                }
            });
        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                pool.shutdown();
            }
        }));
    }

    /**
     * Drains a single stream until its iterator reports no more messages. Any failure of the
     * iterator itself is logged, because the task is submitted to an executor whose
     * {@code Future} nobody inspects and the error would otherwise vanish.
     */
    private void consume(KafkaStream<byte[], byte[]> stream) {
        try {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                dispatch(it.next().message());
            }
        } catch (RuntimeException e) {
            logger.error("Kafka stream consumer terminated unexpectedly", e);
        }
    }

    /**
     * Decodes one payload and passes it to the message handler. A failure on one message is
     * logged and does not stop the stream from delivering subsequent messages.
     */
    private void dispatch(byte[] payload) {
        try {
            this.messageConusmer2.processMessage((String) ByteUtils.fromByte(payload));
        } catch (Exception e) {
            logger.error("Failed to process Kafka message", e);
        }
    }

    public void setMessageConusmer2(MessageConusmer2<String> messageConusmer2) {
        this.messageConusmer2 = messageConusmer2;
    }

    public void setConsumerConfig(ConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    /**
     * Sets the comma-separated topic list; surrounding whitespace and all space characters
     * are removed.
     */
    public void setTopic(String str) {
        this.topic = str.trim().replaceAll(" ", "");
    }

    public void setPartitionsNum(Integer num) {
        this.partitionsNum = num;
    }

    /**
     * Shuts down the Kafka connector and then the worker pool. Safe to call when
     * {@link #init()} was never invoked.
     */
    public void destroy() {
        if (null != this.connector) {
            this.connector.shutdown();
        }
        if (null != this.executor) {
            this.executor.shutdown();
        }
    }
}
