package com.appleframework.jms.kafka.consumer;

import com.appleframework.jms.core.consumer.AbstractMessageConusmer;
import com.appleframework.jms.kafka.thread.StandardThreadExecutor;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appleframework/jms/kafka/consumer/ErrorMetadataMessageProcessor.class */
/**
 * Re-processes Kafka messages by handing them back to their consumer after a delay.
 *
 * <p>Submitted messages are wrapped in a {@link PriorityTask} and kept in a priority queue ordered
 * by the time at which they should next be attempted. A single polling loop, started by the
 * constructor, takes the earliest task, runs it once it is due, and re-queues it with a growing
 * delay when processing throws. A task is abandoned once it has failed {@value #MAX_RETRY_COUNT}
 * times.
 */
public class ErrorMetadataMessageProcessor implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(ErrorMetadataMessageProcessor.class);

    /** Base delay in milliseconds: the initial wait, and the unit each failed attempt adds a multiple of. */
    private static final long RETRY_PERIOD_UNIT = 15000;

    /** Number of failed attempts after which a task is dropped instead of re-queued. */
    private static final int MAX_RETRY_COUNT = 3;

    /** Queue size above which {@link #submit} logs a warning; the queue itself is unbounded. */
    private static final int QUEUE_WARN_THRESHOLD = 1000;

    /** How long the polling loop waits, in milliseconds, when the earliest task is not yet due. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /** How long {@link #close()} waits, in milliseconds, before shutting the executor down. */
    private static final long CLOSE_GRACE_MILLIS = 1000L;

    private final PriorityBlockingQueue<PriorityTask> taskQueue;
    private final ExecutorService executor;
    private final AtomicBoolean closed;

    /**
     * One message awaiting re-processing, ordered by {@link #nextFireTime} (earliest first).
     *
     * <p>A task whose message is {@code null} is the shutdown sentinel queued by
     * {@link ErrorMetadataMessageProcessor#close()}.
     */
    class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final MessageAndMetadata<byte[], byte[]> message;
        final AbstractMessageConusmer<MessageAndMetadata<byte[], byte[]>> metadataMessageConusmer;
        /** Number of attempts that have failed so far. */
        int retryCount;
        /** Epoch milliseconds before which the task must not be run. */
        long nextFireTime;

        /**
         * Creates a task that becomes due {@code RETRY_PERIOD_UNIT} milliseconds from now.
         *
         * @param errorMetadataMessageProcessor unused; kept so existing call sites keep compiling
         * @param messageAndMetadata the message to re-process, or {@code null} for the shutdown sentinel
         * @param abstractMessageConusmer the consumer the message is handed to
         */
        public PriorityTask(ErrorMetadataMessageProcessor errorMetadataMessageProcessor, MessageAndMetadata<byte[], byte[]> messageAndMetadata, AbstractMessageConusmer<MessageAndMetadata<byte[], byte[]>> abstractMessageConusmer) {
            this(messageAndMetadata, abstractMessageConusmer, System.currentTimeMillis() + ErrorMetadataMessageProcessor.RETRY_PERIOD_UNIT);
        }

        /**
         * Creates a task that becomes due at an explicit time.
         *
         * @param messageAndMetadata the message to re-process, or {@code null} for the shutdown sentinel
         * @param abstractMessageConusmer the consumer the message is handed to
         * @param j epoch milliseconds at which the task becomes due
         */
        public PriorityTask(MessageAndMetadata<byte[], byte[]> messageAndMetadata, AbstractMessageConusmer<MessageAndMetadata<byte[], byte[]>> abstractMessageConusmer, long j) {
            this.retryCount = 0;
            this.message = messageAndMetadata;
            this.metadataMessageConusmer = abstractMessageConusmer;
            this.nextFireTime = j;
        }

        /**
         * @return the wrapped message; {@code null} marks the shutdown sentinel
         */
        public MessageAndMetadata<byte[], byte[]> getMessage() {
            return this.message;
        }

        /**
         * Hands the message to the consumer; on any exception, counts the failure and schedules a
         * retry (or drops the task once the retry limit is reached).
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                this.metadataMessageConusmer.processMessage(this.message);
            } catch (Exception e) {
                this.retryCount++;
                // Record why the attempt failed; previously the exception was discarded silently.
                ErrorMetadataMessageProcessor.logger.warn("retry attempt {} failed", Integer.valueOf(this.retryCount), e);
                retry();
            }
        }

        /** Re-queues this task with a longer delay, or gives up after {@code MAX_RETRY_COUNT} failures. */
        private void retry() {
            if (this.retryCount >= ErrorMetadataMessageProcessor.MAX_RETRY_COUNT) {
                ErrorMetadataMessageProcessor.logger.warn("retry_skip skip!!!");
                return;
            }
            // The delay is added to the previously scheduled time, not to the current time.
            this.nextFireTime += this.retryCount * ErrorMetadataMessageProcessor.RETRY_PERIOD_UNIT;
            ErrorMetadataMessageProcessor.this.taskQueue.add(this);
        }

        /**
         * Orders tasks by ascending {@link #nextFireTime}.
         *
         * <p>Uses {@link Long#compare(long, long)}: narrowing the difference of two longs to
         * {@code int} can truncate or overflow and report the wrong sign.
         */
        @Override // java.lang.Comparable
        public int compareTo(PriorityTask priorityTask) {
            return Long.compare(this.nextFireTime, priorityTask.nextFireTime);
        }
    }

    /** Creates a processor backed by a single-thread pool. */
    public ErrorMetadataMessageProcessor() {
        this(1);
    }

    /**
     * Creates a processor and starts its polling loop.
     *
     * <p>Only one polling loop is submitted to the pool, so threads beyond the first are not used
     * by this class.
     *
     * @param i size of the fixed thread pool
     */
    public ErrorMetadataMessageProcessor(int i) {
        this.taskQueue = new PriorityBlockingQueue<>(QUEUE_WARN_THRESHOLD);
        this.closed = new AtomicBoolean(false);
        this.executor = Executors.newFixedThreadPool(i, new StandardThreadExecutor.StandardThreadFactory("ErrorByteMessageProcessor"));
        this.executor.submit(new Runnable() { // from class: com.appleframework.jms.kafka.consumer.ErrorMetadataMessageProcessor.1
            @Override // java.lang.Runnable
            public void run() {
                ErrorMetadataMessageProcessor.this.pollLoop();
            }
        });
    }

    /**
     * Polling loop: runs due tasks, waits for tasks that are not yet due, and exits when the
     * processor is closed, the shutdown sentinel is taken, or the thread is interrupted.
     */
    private void pollLoop() {
        while (!this.closed.get()) {
            try {
                PriorityTask priorityTask = this.taskQueue.take();
                if (priorityTask.getMessage() == null) {
                    // Shutdown sentinel queued by close().
                    return;
                }
                if (priorityTask.nextFireTime - System.currentTimeMillis() > 0) {
                    // Not due yet: put it back before waiting so an interrupted sleep cannot lose it.
                    this.taskQueue.put(priorityTask);
                    TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
                } else {
                    priorityTask.run();
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for the executor and stop polling.
                Thread.currentThread().interrupt();
                logger.warn("ErrorByteMessageProcessor worker interrupted, stopping");
                return;
            } catch (Exception e) {
                // Keep the loop alive; log the exception itself so the stack trace is retained.
                logger.error("ErrorByteMessageProcessor task failed", e);
            }
        }
    }

    /**
     * Queues a message to be handed to the consumer again after the initial delay.
     *
     * <p>A {@code null} message or consumer is rejected with a warning: a {@code null} message is
     * indistinguishable from the shutdown sentinel and would stop the polling loop. Submissions
     * made after {@link #close()} are dropped with a warning because nothing would run them.
     *
     * @param messageAndMetadata the message to re-process
     * @param abstractMessageConusmer the consumer whose {@code processMessage} is invoked
     */
    public void submit(MessageAndMetadata<byte[], byte[]> messageAndMetadata, AbstractMessageConusmer<MessageAndMetadata<byte[], byte[]>> abstractMessageConusmer) {
        if (messageAndMetadata == null || abstractMessageConusmer == null) {
            logger.warn("ErrorByteMessageProcessor ignoring submit with null message or consumer");
            return;
        }
        if (this.closed.get()) {
            logger.warn("ErrorByteMessageProcessor is closed, message dropped");
            return;
        }
        int size = this.taskQueue.size();
        if (size > QUEUE_WARN_THRESHOLD) {
            logger.warn("ErrorByteMessageProcessor queue task count over:{}", Integer.valueOf(size));
        }
        this.taskQueue.add(new PriorityTask(this, messageAndMetadata, abstractMessageConusmer));
    }

    /**
     * Stops the polling loop and shuts the executor down. Tasks still queued are not processed.
     * Calling this more than once has no further effect.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        // Wakes the polling loop if it is blocked in take() on an empty queue.
        this.taskQueue.add(new PriorityTask(this, null, null));
        try {
            Thread.sleep(CLOSE_GRACE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.executor.shutdown();
        logger.info("ErrorByteMessageProcessor closed");
    }
}
