package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;
import org.threeten.bp.temporal.ChronoUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/pubsub/v1/MessageDispatcher.class */
public class MessageDispatcher {
    private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9d;
    private final Executor executor;
    private final ScheduledExecutorService systemExecutor;
    private final ApiClock clock;
    private final Duration ackExpirationPadding;
    private final Duration maxAckExtensionPeriod;
    private final MessageReceiver receiver;
    private final AckProcessor ackProcessor;
    private final FlowController flowController;
    private ScheduledFuture<?> backgroundJob;
    private final Distribution ackLatencyDistribution;
    private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());

    @InternalApi
    static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
    private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap();
    private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<String> pendingReceipts = new LinkedBlockingQueue<>();
    private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(60);
    private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
    private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches = new LinkedBlockingDeque<>();
    private final Lock jobLock = new ReentrantLock();
    private final MessageWaiter messagesWaiter = new MessageWaiter();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsub/v1/MessageDispatcher$AckHandler.class */
    public class AckHandler implements ApiFutureCallback<AckReply> {
        private final String ackId;
        private final int outstandingBytes;
        private final long receivedTimeMillis;
        private final Instant totalExpiration;

        AckHandler(String str, int i, Instant instant) {
            this.ackId = str;
            this.outstandingBytes = i;
            this.receivedTimeMillis = MessageDispatcher.this.clock.millisTime();
            this.totalExpiration = instant;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void forget() {
            if (MessageDispatcher.this.pendingMessages.remove(this.ackId) == null) {
                return;
            }
            MessageDispatcher.this.flowController.release(1L, this.outstandingBytes);
            MessageDispatcher.this.messagesWaiter.incrementPendingMessages(-1);
            MessageDispatcher.this.processOutstandingBatches();
        }

        public void onFailure(Throwable th) {
            MessageDispatcher.logger.log(Level.WARNING, "MessageReceiver failed to processes ack ID: " + this.ackId + ", the message will be nacked.", th);
            MessageDispatcher.this.pendingNacks.add(this.ackId);
            forget();
        }

        public void onSuccess(AckReply ackReply) {
            LinkedBlockingQueue linkedBlockingQueue;
            switch (ackReply) {
                case ACK:
                    linkedBlockingQueue = MessageDispatcher.this.pendingAcks;
                    MessageDispatcher.this.ackLatencyDistribution.record(Ints.saturatedCast((long) Math.ceil((MessageDispatcher.this.clock.millisTime() - this.receivedTimeMillis) / 1000.0d)));
                    break;
                case NACK:
                    linkedBlockingQueue = MessageDispatcher.this.pendingNacks;
                    break;
                default:
                    throw new IllegalArgumentException(String.format("AckReply: %s not supported", ackReply));
            }
            linkedBlockingQueue.add(this.ackId);
            forget();
        }
    }

    /* loaded from: input_file:com/google/cloud/pubsub/v1/MessageDispatcher$AckProcessor.class */
    public interface AckProcessor {
        void sendAckOperations(List<String> list, List<PendingModifyAckDeadline> list2);
    }

    /* loaded from: input_file:com/google/cloud/pubsub/v1/MessageDispatcher$AckReply.class */
    public enum AckReply {
        ACK,
        NACK
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/pubsub/v1/MessageDispatcher$OutstandingMessageBatch.class */
    public static class OutstandingMessageBatch {
        private final Deque<OutstandingMessage> messages = new LinkedList();
        private final Runnable doneCallback;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:com/google/cloud/pubsub/v1/MessageDispatcher$OutstandingMessageBatch$OutstandingMessage.class */
        public static class OutstandingMessage {
            private final ReceivedMessage receivedMessage;
            private final AckHandler ackHandler;

            public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
                this.receivedMessage = receivedMessage;
                this.ackHandler = ackHandler;
            }

            public ReceivedMessage receivedMessage() {
                return this.receivedMessage;
            }

            public AckHandler ackHandler() {
                return this.ackHandler;
            }
        }

        public OutstandingMessageBatch(Runnable runnable) {
            this.doneCallback = runnable;
        }

        public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
            this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
        }

        public Deque<OutstandingMessage> messages() {
            return this.messages;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/pubsub/v1/MessageDispatcher$PendingModifyAckDeadline.class */
    public static class PendingModifyAckDeadline {
        final List<String> ackIds;
        final int deadlineExtensionSeconds;

        PendingModifyAckDeadline(int i, String... strArr) {
            this(i, Arrays.asList(strArr));
        }

        private PendingModifyAckDeadline(int i, Collection<String> collection) {
            this.ackIds = new ArrayList(collection);
            this.deadlineExtensionSeconds = i;
        }

        public void addAckId(String str) {
            this.ackIds.add(str);
        }

        public String toString() {
            return String.format("PendingModifyAckDeadline{extension: %d sec, ackIds: %s}", Integer.valueOf(this.deadlineExtensionSeconds), this.ackIds);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageDispatcher(MessageReceiver messageReceiver, AckProcessor ackProcessor, Duration duration, Duration duration2, Distribution distribution, FlowController flowController, Executor executor, ScheduledExecutorService scheduledExecutorService, ApiClock apiClock) {
        this.executor = executor;
        this.systemExecutor = scheduledExecutorService;
        this.ackExpirationPadding = duration;
        this.maxAckExtensionPeriod = duration2;
        this.receiver = messageReceiver;
        this.ackProcessor = ackProcessor;
        this.flowController = flowController;
        this.ackLatencyDistribution = distribution;
        this.clock = apiClock;
    }

    public void start() {
        final Runnable runnable = new Runnable() { // from class: com.google.cloud.pubsub.v1.MessageDispatcher.1
            @Override // java.lang.Runnable
            public void run() {
                MessageDispatcher.this.extendDeadline.set(true);
            }
        };
        this.jobLock.lock();
        try {
            this.backgroundJob = this.systemExecutor.scheduleWithFixedDelay(new Runnable() { // from class: com.google.cloud.pubsub.v1.MessageDispatcher.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (MessageDispatcher.this.extendDeadline.getAndSet(false)) {
                            int computeDeadlineSeconds = MessageDispatcher.this.computeDeadlineSeconds();
                            MessageDispatcher.this.messageDeadlineSeconds.set(computeDeadlineSeconds);
                            MessageDispatcher.this.extendDeadlines();
                            MessageDispatcher.this.systemExecutor.schedule(runnable, computeDeadlineSeconds - MessageDispatcher.this.ackExpirationPadding.getSeconds(), TimeUnit.SECONDS);
                        }
                        MessageDispatcher.this.processOutstandingAckOperations();
                    } catch (Throwable th) {
                        MessageDispatcher.logger.log(Level.WARNING, "failed to run periodic job", th);
                    }
                }
            }, PENDING_ACKS_SEND_DELAY.toMillis(), PENDING_ACKS_SEND_DELAY.toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            this.jobLock.unlock();
        }
    }

    public void stop() {
        this.messagesWaiter.waitNoMessages();
        this.jobLock.lock();
        try {
            if (this.backgroundJob != null) {
                this.backgroundJob.cancel(false);
                this.backgroundJob = null;
            }
            processOutstandingAckOperations();
        } finally {
            this.jobLock.unlock();
        }
    }

    @InternalApi
    void setMessageDeadlineSeconds(int i) {
        this.messageDeadlineSeconds.set(i);
    }

    @InternalApi
    int getMessageDeadlineSeconds() {
        return this.messageDeadlineSeconds.get();
    }

    public void processReceivedMessages(List<ReceivedMessage> list, Runnable runnable) {
        if (list.isEmpty()) {
            runnable.run();
            return;
        }
        Instant plus = now().plus(this.maxAckExtensionPeriod);
        OutstandingMessageBatch outstandingMessageBatch = new OutstandingMessageBatch(runnable);
        for (ReceivedMessage receivedMessage : list) {
            AckHandler ackHandler = new AckHandler(receivedMessage.getAckId(), receivedMessage.getMessage().getSerializedSize(), plus);
            if (this.pendingMessages.putIfAbsent(receivedMessage.getAckId(), ackHandler) == null) {
                outstandingMessageBatch.addMessage(receivedMessage, ackHandler);
                this.pendingReceipts.add(receivedMessage.getAckId());
            }
        }
        if (outstandingMessageBatch.messages.isEmpty()) {
            runnable.run();
            return;
        }
        this.messagesWaiter.incrementPendingMessages(outstandingMessageBatch.messages.size());
        this.outstandingMessageBatches.add(outstandingMessageBatch);
        processOutstandingBatches();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processOutstandingBatches() {
        OutstandingMessageBatch poll = this.outstandingMessageBatches.poll();
        while (true) {
            OutstandingMessageBatch outstandingMessageBatch = poll;
            if (outstandingMessageBatch == null) {
                return;
            }
            Object poll2 = outstandingMessageBatch.messages.poll();
            while (true) {
                OutstandingMessageBatch.OutstandingMessage outstandingMessage = (OutstandingMessageBatch.OutstandingMessage) poll2;
                if (outstandingMessage != null) {
                    try {
                        this.flowController.reserve(1L, outstandingMessage.receivedMessage.getMessage().getSerializedSize());
                        processOutstandingMessage(outstandingMessage.receivedMessage.getMessage(), outstandingMessage.ackHandler);
                        poll2 = outstandingMessageBatch.messages.poll();
                    } catch (FlowController.MaxOutstandingElementCountReachedException | FlowController.MaxOutstandingRequestBytesReachedException e) {
                        outstandingMessageBatch.messages.addFirst(outstandingMessage);
                        this.outstandingMessageBatches.addFirst(outstandingMessageBatch);
                        return;
                    } catch (FlowController.FlowControlException e2) {
                        throw new IllegalStateException("Flow control unexpected exception", e2);
                    }
                }
            }
            outstandingMessageBatch.doneCallback.run();
            poll = this.outstandingMessageBatches.poll();
        }
    }

    private void processOutstandingMessage(final PubsubMessage pubsubMessage, final AckHandler ackHandler) {
        final SettableApiFuture create = SettableApiFuture.create();
        final AckReplyConsumer ackReplyConsumer = new AckReplyConsumer() { // from class: com.google.cloud.pubsub.v1.MessageDispatcher.3
            @Override // com.google.cloud.pubsub.v1.AckReplyConsumer
            public void ack() {
                create.set(AckReply.ACK);
            }

            @Override // com.google.cloud.pubsub.v1.AckReplyConsumer
            public void nack() {
                create.set(AckReply.NACK);
            }
        };
        ApiFutures.addCallback(create, ackHandler, MoreExecutors.directExecutor());
        this.executor.execute(new Runnable() { // from class: com.google.cloud.pubsub.v1.MessageDispatcher.4
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (ackHandler.totalExpiration.plusSeconds(MessageDispatcher.this.messageDeadlineSeconds.get()).isBefore(MessageDispatcher.this.now())) {
                        ackHandler.forget();
                    } else {
                        MessageDispatcher.this.receiver.receiveMessage(pubsubMessage, ackReplyConsumer);
                    }
                } catch (Exception e) {
                    create.setException(e);
                }
            }
        });
    }

    @InternalApi
    int computeDeadlineSeconds() {
        int saturatedCast = Ints.saturatedCast(this.ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES));
        if (saturatedCast < 10) {
            saturatedCast = 10;
        } else if (saturatedCast > 600) {
            saturatedCast = 600;
        }
        return saturatedCast;
    }

    @InternalApi
    void extendDeadlines() {
        int messageDeadlineSeconds = getMessageDeadlineSeconds();
        ArrayList arrayList = new ArrayList();
        PendingModifyAckDeadline pendingModifyAckDeadline = new PendingModifyAckDeadline(messageDeadlineSeconds, new String[0]);
        Instant now = now();
        Instant plusSeconds = now.plusSeconds(messageDeadlineSeconds);
        for (Map.Entry<String, AckHandler> entry : this.pendingMessages.entrySet()) {
            String key = entry.getKey();
            Instant instant = entry.getValue().totalExpiration;
            if (instant.isAfter(plusSeconds)) {
                pendingModifyAckDeadline.ackIds.add(key);
            } else {
                entry.getValue().forget();
                if (instant.isAfter(now)) {
                    arrayList.add(new PendingModifyAckDeadline(Math.max(1, (int) now.until(instant, ChronoUnit.SECONDS)), key));
                }
            }
        }
        logger.log(Level.FINER, "Sending {0} modacks", Integer.valueOf(pendingModifyAckDeadline.ackIds.size() + arrayList.size()));
        arrayList.add(pendingModifyAckDeadline);
        this.ackProcessor.sendAckOperations(Collections.emptyList(), arrayList);
    }

    @InternalApi
    void processOutstandingAckOperations() {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        this.pendingAcks.drainTo(arrayList2);
        logger.log(Level.FINER, "Sending {0} acks", Integer.valueOf(arrayList2.size()));
        PendingModifyAckDeadline pendingModifyAckDeadline = new PendingModifyAckDeadline(0, new String[0]);
        this.pendingNacks.drainTo(pendingModifyAckDeadline.ackIds);
        logger.log(Level.FINER, "Sending {0} nacks", Integer.valueOf(pendingModifyAckDeadline.ackIds.size()));
        if (!pendingModifyAckDeadline.ackIds.isEmpty()) {
            arrayList.add(pendingModifyAckDeadline);
        }
        PendingModifyAckDeadline pendingModifyAckDeadline2 = new PendingModifyAckDeadline(getMessageDeadlineSeconds(), new String[0]);
        this.pendingReceipts.drainTo(pendingModifyAckDeadline2.ackIds);
        logger.log(Level.FINER, "Sending {0} receipts", Integer.valueOf(pendingModifyAckDeadline2.ackIds.size()));
        if (!pendingModifyAckDeadline2.ackIds.isEmpty()) {
            arrayList.add(pendingModifyAckDeadline2);
        }
        this.ackProcessor.sendAckOperations(arrayList2, arrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Instant now() {
        return Instant.ofEpochMilli(this.clock.millisTime());
    }
}
