package com.appleframework.jms.kafka.consumer.multithread.thread;

import com.appleframework.jms.core.config.TraceConfig;
import com.appleframework.jms.core.consumer.AbstractMessageConusmer;
import com.appleframework.jms.core.consumer.ErrorMessageProcessor;
import com.appleframework.jms.core.thread.NamedThreadFactory;
import com.appleframework.jms.core.utils.ExecutorUtils;
import com.appleframework.jms.core.utils.UuidUtils;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/appleframework/jms/kafka/consumer/multithread/thread/RecordMessageConsumer.class */
/**
 * Kafka consumer that polls records on one dedicated thread and hands each record to a fixed-size
 * worker pool, where {@code processMessage} is invoked.
 *
 * <p>Lifecycle: {@link #init()} starts the polling thread, {@link #run()} is the polling loop and
 * {@link #destroy()} stops polling and shuts both executors down.
 *
 * <p>The consumer, topic and tuning values are injected through the setters before
 * {@link #init()} is called.
 */
public abstract class RecordMessageConsumer extends AbstractMessageConusmer<ConsumerRecord<String, byte[]>> implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(RecordMessageConsumer.class);

    /** Pause between two checks of the worker queue while flow control holds back polling. */
    private static final long FLOW_CONTROL_SLEEP_MILLIS = 10L;

    /** Upper bound on how long {@link #destroy()} waits for each executor to terminate. */
    private static final long SHUTDOWN_WAIT_MILLIS = 5000L;

    /** Comma-separated topic names, without the {@link #prefix}. */
    protected String topic;

    // NOTE(review): nothing in this class assigns this field; processErrorMessage() therefore
    // guards against null. Confirm how it is expected to be populated.
    private ErrorMessageProcessor<ConsumerRecord<String, byte[]>> errorProcessor;

    protected KafkaConsumer<String, byte[]> consumer;

    // Both executors are written on one thread and read on another (run()/init() vs destroy()).
    private volatile ExecutorService messageExecutor;
    private volatile ExecutorService mainExecutor;

    /** Worker pool size; when null, {@link #run()} defaults it to the number of topics. */
    protected Integer threadsNum;

    /** Prepended to every topic name before subscribing. */
    protected String prefix = "";

    /** When true, failed records are NOT forwarded to the error processor. */
    protected Boolean errorProcessorLock = true;

    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Timeout passed to {@code KafkaConsumer.poll}. */
    private long timeout = Long.MAX_VALUE;

    /** When true, polling pauses while the worker queue holds {@link #flowCapacity} or more tasks. */
    protected boolean flowControl = false;
    protected Integer flowCapacity = Integer.MAX_VALUE;

    /**
     * Creates the single-threaded polling executor and starts {@link #run()} on it.
     */
    protected void init() {
        this.mainExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("apple-jms-kafka-comsumer-main"));
        this.mainExecutor.execute(this);
    }

    /**
     * Polling loop: subscribes to every prefixed topic, creates the worker pool and submits each
     * polled record to it until {@link #destroy()} is called.
     *
     * @throws WakeupException if the consumer is woken up while this instance is not closed
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            String[] topicNames = this.topic.split(",");
            HashSet<String> subscriptions = new HashSet<String>();
            for (String name : topicNames) {
                String fullName = this.prefix + name;
                subscriptions.add(fullName);
                logger.warn("subscribe the topic -> {}", fullName);
            }
            if (null == this.threadsNum) {
                this.threadsNum = Integer.valueOf(topicNames.length);
            }
            LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
            this.messageExecutor = ExecutorUtils.newFixedThreadPool(this.threadsNum.intValue(), workQueue, new NamedThreadFactory("apple-jms-kafka-comsumer-pool"));
            this.consumer.subscribe(subscriptions);
            while (!this.closed.get()) {
                if (this.flowControl && !awaitQueueCapacity(workQueue)) {
                    break;
                }
                for (ConsumerRecord<String, byte[]> record : this.consumer.poll(this.timeout)) {
                    dispatch(record);
                }
            }
        } catch (WakeupException e) {
            // A wakeup is the expected way out once destroy() has set the closed flag.
            if (!this.closed.get()) {
                throw e;
            }
        }
    }

    /**
     * Blocks while the worker queue is at or above {@link #flowCapacity}.
     *
     * @param workQueue the queue backing the worker pool
     * @return {@code true} when polling may continue, {@code false} when the loop must stop
     *         (instance closed or thread interrupted)
     */
    private boolean awaitQueueCapacity(LinkedBlockingQueue<Runnable> workQueue) {
        while (true) {
            Integer capacity = this.flowCapacity;
            if (null == capacity || workQueue.size() < capacity.intValue()) {
                return true;
            }
            if (this.closed.get()) {
                return false;
            }
            try {
                Thread.sleep(FLOW_CONTROL_SLEEP_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of spinning on an interrupted thread.
                Thread.currentThread().interrupt();
                logger.warn("interrupted while waiting for worker queue capacity, stopping consumer loop", e);
                return false;
            }
        }
    }

    /**
     * Resolves the trace id for a record (its key, or a generated UUID when the key is null) and
     * submits the record to the worker pool.
     */
    private void dispatch(final ConsumerRecord<String, byte[]> record) {
        final boolean traced = TraceConfig.isSwitchTrace();
        final String traceKey = traced ? TraceConfig.getTraceIdKey() : null;
        final String traceId;
        if (!traced) {
            traceId = null;
        } else if (null != record.key()) {
            traceId = record.key();
        } else {
            traceId = UuidUtils.genUUID();
        }
        if (traced) {
            // Kept on the polling thread as before; the worker binds it again because MDC is per-thread.
            MDC.put(traceKey, traceId);
        }
        this.messageExecutor.submit(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                RecordMessageConsumer.this.handleRecord(record, traced, traceKey, traceId);
            }
        });
    }

    /**
     * Runs on a worker thread: binds the trace id, processes the record and, unless
     * {@link #errorProcessorLock} is set, routes a failed record to {@link #processErrorMessage}.
     */
    private void handleRecord(ConsumerRecord<String, byte[]> record, boolean traced, String traceKey, String traceId) {
        if (traced) {
            MDC.put(traceKey, traceId);
        }
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("offset = {}, key = {}, value = {}", Long.valueOf(record.offset()), record.key(), record.value());
            }
            if (this.errorProcessorLock.booleanValue()) {
                try {
                    processMessage(record);
                } catch (RuntimeException e) {
                    // The Future returned by submit() is discarded, so log before rethrowing.
                    logger.error("failed to process record: topic = " + record.topic() + ", offset = " + record.offset(), e);
                    throw e;
                }
                return;
            }
            try {
                processMessage(record);
            } catch (Exception e) {
                logger.error("failed to process record, handing over to error processing: topic = " + record.topic() + ", offset = " + record.offset(), e);
                processErrorMessage(record);
            }
        } finally {
            if (traced) {
                // Worker threads are pooled; do not leak this record's trace id into the next task.
                MDC.remove(traceKey);
            }
        }
    }

    /**
     * Forwards a failed record to the error processor.
     *
     * <p>Does nothing when {@link #errorProcessorLock} is true. When no error processor is set the
     * record is logged and dropped.
     *
     * @param consumerRecord the record whose processing failed
     */
    protected void processErrorMessage(ConsumerRecord<String, byte[]> consumerRecord) {
        if (this.errorProcessorLock.booleanValue()) {
            return;
        }
        if (null == this.errorProcessor) {
            logger.warn("no error processor available, dropping failed record: topic = {}, offset = {}", consumerRecord.topic(), Long.valueOf(consumerRecord.offset()));
            return;
        }
        this.errorProcessor.processErrorMessage(consumerRecord, this);
    }

    /**
     * Sets the comma-separated topic list; surrounding whitespace and all spaces are removed.
     *
     * @param str comma-separated topic names, must not be null
     * @throws NullPointerException if {@code str} is null
     */
    public void setTopic(String str) {
        if (null == str) {
            throw new NullPointerException("topic must not be null");
        }
        this.topic = str.trim().replace(" ", "");
    }

    /**
     * @param bool true to disable forwarding of failed records to the error processor
     */
    public void setErrorProcessorLock(Boolean bool) {
        this.errorProcessorLock = bool;
    }

    /**
     * @param kafkaConsumer the consumer used for subscribing, polling and committing
     */
    public void setConsumer(KafkaConsumer<String, byte[]> kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    /**
     * @param j timeout passed to each {@code poll} call
     */
    public void setTimeout(long j) {
        this.timeout = j;
    }

    /**
     * Stops polling and shuts down this instance.
     *
     * <p>Order matters: the polling thread is stopped first so nothing is submitted to a pool that
     * is already shut down, then the worker pool is drained, and only then is the error processor
     * closed so in-flight workers can still use it. Each executor is given up to
     * {@value #SHUTDOWN_WAIT_MILLIS} ms to terminate.
     *
     * <p>NOTE(review): the injected consumer is not closed here, matching previous behaviour;
     * confirm who owns its lifecycle.
     */
    public void destroy() {
        this.closed.set(true);
        if (null != this.consumer) {
            this.consumer.wakeup();
        }
        shutdownAndAwait(this.mainExecutor, "main");
        shutdownAndAwait(this.messageExecutor, "pool");
        if (null != this.errorProcessor) {
            this.errorProcessor.close();
        }
    }

    /**
     * Shuts an executor down and waits a bounded time for it to terminate; a null executor
     * (never started) is ignored.
     */
    private void shutdownAndAwait(ExecutorService executor, String label) {
        if (null == executor) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_WAIT_MILLIS, TimeUnit.MILLISECONDS)) {
                logger.warn("{} executor did not terminate within {} ms", label, Long.valueOf(SHUTDOWN_WAIT_MILLIS));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("interrupted while waiting for " + label + " executor to terminate", e);
        }
    }

    /**
     * Synchronously commits offsets via {@code KafkaConsumer.commitSync()}.
     *
     * <p>NOTE(review): this calls the consumer from the caller's thread; KafkaConsumer documents
     * itself as not safe for multi-threaded access — confirm callers only invoke this safely.
     */
    public void commit() {
        this.consumer.commitSync();
    }

    /**
     * @param num worker pool size; null lets {@link #run()} default to the number of topics
     */
    public void setThreadsNum(Integer num) {
        this.threadsNum = num;
    }

    /**
     * @param z true to pause polling while the worker queue is at or above the flow capacity
     */
    public void setFlowControl(boolean z) {
        this.flowControl = z;
    }

    /**
     * @param num queue size at which polling pauses when flow control is enabled; null is
     *            treated as no limit
     */
    public void setFlowCapacity(Integer num) {
        this.flowCapacity = num;
    }
}
