package com.appleframework.jms.kafka.consumer.multithread.thread;

import com.appleframework.jms.core.consumer.AbstractMessageConusmer;
import com.appleframework.jms.core.consumer.ErrorMessageProcessor;
import com.appleframework.jms.core.thread.NamedThreadFactory;
import com.appleframework.jms.core.utils.ExecutorUtils;
import com.appleframework.jms.core.utils.UuidUtils;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.kafka.annotation.KafkaListener;

/* loaded from: input_file:com/appleframework/jms/kafka/consumer/multithread/thread/BaseMessageConsumer.class */
public abstract class BaseMessageConsumer extends AbstractMessageConusmer<byte[]> {
    private static Logger logger = LoggerFactory.getLogger(BaseMessageConsumer.class);
    private ErrorMessageProcessor<byte[]> errorProcessor;
    private ExecutorService messageExecutor;
    protected Integer threadsNum;
    protected boolean errorProcessorLock = true;
    protected boolean flowControl = false;
    protected int flowCapacity = Integer.MAX_VALUE;
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue();

    @PostConstruct
    protected void init() {
        if (null == this.threadsNum) {
            this.threadsNum = 1;
        }
        if (null == this.messageExecutor) {
            this.messageExecutor = ExecutorUtils.newFixedThreadPool(this.threadsNum.intValue(), this.workQueue, new NamedThreadFactory("apple-jms-kafka-comsumer-pool"));
        }
    }

    @KafkaListener(topics = {"#{'${spring.kafka.consumer.topics}'.split(',')}"})
    public void run(final ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            if (null != consumerRecord.key()) {
                MDC.put("traceId", (String) consumerRecord.key());
            } else {
                MDC.put("traceId", UuidUtils.genUUID());
            }
            if (this.flowControl) {
                while (this.workQueue.size() >= this.flowCapacity) {
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException e) {
                        logger.error("", e);
                    }
                }
            }
            this.messageExecutor.submit(new Runnable() { // from class: com.appleframework.jms.kafka.consumer.multithread.thread.BaseMessageConsumer.1
                @Override // java.lang.Runnable
                public void run() {
                    if (BaseMessageConsumer.logger.isDebugEnabled()) {
                        BaseMessageConsumer.logger.debug("offset = %d, key = %s, value = %s%n", new Object[]{Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
                    }
                    byte[] bArr = (byte[]) consumerRecord.value();
                    if (BaseMessageConsumer.this.errorProcessorLock) {
                        BaseMessageConsumer.this.processMessage(bArr);
                        return;
                    }
                    try {
                        BaseMessageConsumer.this.processMessage(bArr);
                    } catch (Exception e2) {
                        BaseMessageConsumer.this.processErrorMessage(bArr);
                    }
                }
            });
        } catch (WakeupException e2) {
            throw e2;
        }
    }

    protected void processErrorMessage(byte[] bArr) {
        if (this.errorProcessorLock || null == this.errorProcessor) {
            return;
        }
        this.errorProcessor.processErrorMessage(bArr, this);
    }

    public void setErrorProcessorLock(Boolean bool) {
        this.errorProcessorLock = bool.booleanValue();
    }

    public void destroy() {
        if (null != this.errorProcessor) {
            this.errorProcessor.close();
        }
        this.messageExecutor.shutdown();
        try {
            this.messageExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.error("", e);
        }
    }

    public void commitSync() {
    }

    public void commitAsync() {
    }

    public void setErrorProcessor(ErrorMessageProcessor<byte[]> errorMessageProcessor) {
        this.errorProcessor = errorMessageProcessor;
    }

    public void setThreadsNum(Integer num) {
        this.threadsNum = num;
    }

    public void setFlowControl(boolean z) {
        this.flowControl = z;
    }

    public void setFlowCapacity(Integer num) {
        this.flowCapacity = num.intValue();
    }
}
