package org.apache.flink.runtime.query;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import akka.pattern.Patterns;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.class */
public class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener {
    // NOTE(review): the logger is created for the KvStateLocationLookupService interface,
    // not for this implementing class — confirm that this logger name is intentional.
    private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class);
    // Shared, already-failed future carrying an UnknownJobManager failure. It is the initial
    // value of jobManagerFuture and is re-installed when a null leader address is notified.
    private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager());
    // Started with this instance as listener in start() and stopped in shutDown().
    private final LeaderRetrievalService leaderRetrievalService;
    // Supplies the dispatcher for future callbacks and the scheduler for delayed retries.
    private final ActorSystem actorSystem;
    // Timeout used both for resolving the leader's ActorRef and for the lookup ask.
    private final FiniteDuration askTimeout;
    // Asked for a fresh LookupRetryStrategy on every public getKvStateLookupInfo call.
    private final LookupRetryStrategyFactory retryStrategyFactory;
    // Replaced by notifyLeaderAddress/handleError and read by every lookup attempt;
    // declared volatile, so a reader sees the most recently assigned future.
    private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;

    /* loaded from: input_file:org/apache/flink/runtime/query/AkkaKvStateLocationLookupService$DisabledLookupRetryStrategyFactory.class */
    /**
     * Factory whose strategy never grants a retry: a failed lookup is reported
     * to the caller right away.
     */
    static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
        /** The strategy keeps no state, so one shared instance serves every lookup. */
        private static final DisabledLookupRetryStrategy NO_RETRY = new DisabledLookupRetryStrategy();

        DisabledLookupRetryStrategyFactory() {
        }

        /** Always hands out the same shared no-retry strategy. */
        @Override
        public LookupRetryStrategy createRetryStrategy() {
            return NO_RETRY;
        }

        /** Strategy that refuses every retry and reports a zero delay. */
        private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {
            @Override
            public boolean tryRetry() {
                return false;
            }

            @Override
            public FiniteDuration getRetryDelay() {
                return FiniteDuration.Zero();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/query/AkkaKvStateLocationLookupService$FixedDelayLookupRetryStrategyFactory.class */
    /**
     * Factory for retry strategies that allow a bounded number of retries, each
     * preceded by the same fixed delay. Every call to {@link #createRetryStrategy()}
     * returns a new strategy with its own retry counter.
     */
    static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
        private final int maxRetries;
        private final FiniteDuration retryDelay;

        /**
         * Strategy that grants at most {@code maxRetries} retries and always
         * reports the same delay.
         */
        private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {
            /** Guards {@link #numRetries}; tryRetry may be invoked from future callbacks. */
            private final Object retryLock = new Object();
            private final int maxRetries;
            private final FiniteDuration retryDelay;
            /** Number of retries granted so far. */
            private int numRetries;

            FixedDelayLookupRetryStrategy(int i, FiniteDuration finiteDuration) {
                Preconditions.checkArgument(i >= 0, "Negative number maximum retries");
                this.maxRetries = i;
                this.retryDelay = Preconditions.checkNotNull(finiteDuration, "Retry delay");
            }

            /**
             * Returns the fixed delay. The field is final and assigned in the
             * constructor, so it is read without taking the retry lock.
             */
            @Override
            public FiniteDuration getRetryDelay() {
                return retryDelay;
            }

            /**
             * Consumes one retry if the budget is not exhausted.
             *
             * @return {@code true} if a retry was granted (and counted),
             *         {@code false} once {@code maxRetries} retries have been granted
             */
            @Override
            public boolean tryRetry() {
                synchronized (retryLock) {
                    boolean retryAllowed = numRetries < maxRetries;
                    if (retryAllowed) {
                        numRetries++;
                    }
                    return retryAllowed;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates the factory and validates the configuration immediately, so an
         * invalid setup fails here instead of on the first lookup that asks for a
         * strategy.
         *
         * @param i maximum number of retries per lookup; must not be negative
         * @param finiteDuration delay before each retry; must not be null
         * @throws IllegalArgumentException if {@code i} is negative
         *         (raised by {@code Preconditions.checkArgument})
         * @throws NullPointerException if {@code finiteDuration} is null
         *         (raised by {@code Preconditions.checkNotNull})
         */
        public FixedDelayLookupRetryStrategyFactory(int i, FiniteDuration finiteDuration) {
            Preconditions.checkArgument(i >= 0, "Negative number maximum retries");
            this.maxRetries = i;
            this.retryDelay = Preconditions.checkNotNull(finiteDuration, "Retry delay");
        }

        /** Returns a new strategy whose retry counter starts at zero. */
        @Override
        public LookupRetryStrategy createRetryStrategy() {
            return new FixedDelayLookupRetryStrategy(this.maxRetries, this.retryDelay);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/query/AkkaKvStateLocationLookupService$LookupRetryStrategy.class */
    /**
     * Retry policy consulted by the lookup when the job manager is unknown:
     * {@link #tryRetry()} decides whether another attempt is made and
     * {@link #getRetryDelay()} supplies the delay before that attempt.
     */
    public interface LookupRetryStrategy {
        /** Returns the delay to wait before the next lookup attempt is scheduled. */
        FiniteDuration getRetryDelay();

        /**
         * Asks whether another lookup attempt may be made.
         *
         * @return {@code true} if the lookup should be retried, {@code false} if the
         *         failure should be returned to the caller
         */
        boolean tryRetry();
    }

    /* loaded from: input_file:org/apache/flink/runtime/query/AkkaKvStateLocationLookupService$LookupRetryStrategyFactory.class */
    /**
     * Source of {@link LookupRetryStrategy} instances. The lookup service calls
     * {@link #createRetryStrategy()} once for each public lookup request.
     */
    interface LookupRetryStrategyFactory {
        /** Returns the retry strategy to use for one lookup request. */
        LookupRetryStrategy createRetryStrategy();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the lookup service. No leader retrieval is started here; that
     * happens in {@link #start()}.
     *
     * @param leaderRetrievalService service that notifies this instance about the leader
     * @param actorSystem actor system providing the dispatcher and scheduler
     * @param finiteDuration timeout for resolving the leader and for lookup asks
     * @param lookupRetryStrategyFactory factory queried for a retry strategy per lookup
     * @throws NullPointerException if any argument is null
     *         (raised by {@code Preconditions.checkNotNull})
     */
    public AkkaKvStateLocationLookupService(
            LeaderRetrievalService leaderRetrievalService,
            ActorSystem actorSystem,
            FiniteDuration finiteDuration,
            LookupRetryStrategyFactory lookupRetryStrategyFactory) {
        // checkNotNull is generic and returns its argument, so no casts are needed.
        this.leaderRetrievalService =
                Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system");
        this.askTimeout = Preconditions.checkNotNull(finiteDuration, "Ask Timeout");
        this.retryStrategyFactory =
                Preconditions.checkNotNull(lookupRetryStrategyFactory, "Retry strategy factory");
    }

    /**
     * Starts the leader retrieval service with this instance as its listener.
     *
     * @throws RuntimeException wrapping any exception raised while starting;
     *         the failure is logged before it is rethrown
     */
    @Override
    public void start() {
        try {
            leaderRetrievalService.start(this);
        } catch (Exception startFailure) {
            LOG.error("Failed to start leader retrieval service", startFailure);
            throw new RuntimeException(startFailure);
        }
    }

    /**
     * Stops the leader retrieval service.
     *
     * @throws RuntimeException wrapping any exception raised while stopping;
     *         the failure is logged before it is rethrown
     */
    @Override
    public void shutDown() {
        try {
            leaderRetrievalService.stop();
        } catch (Exception stopFailure) {
            LOG.error("Failed to stop leader retrieval service", stopFailure);
            throw new RuntimeException(stopFailure);
        }
    }

    /**
     * Looks up the KvState location for the given job and name, using a retry
     * strategy freshly obtained from the configured factory.
     *
     * @param jobID job to look up
     * @param str name passed on in the lookup message
     * @return future holding the location, or the failure of the lookup
     */
    @Override
    public Future<KvStateLocation> getKvStateLookupInfo(JobID jobID, String str) {
        // One strategy per request, so retries are tracked per lookup.
        LookupRetryStrategy strategyForRequest = retryStrategyFactory.createRetryStrategy();
        return getKvStateLookupInfo(jobID, str, strategyForRequest);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Performs one lookup attempt against the current job manager future and
     * schedules further attempts according to the given retry strategy.
     *
     * <p>The attempt asks the job manager gateway with a
     * {@code KvStateMessage.LookupKvStateLocation} message and maps the reply to
     * {@link KvStateLocation}. If the attempt fails with {@link UnknownJobManager}
     * and the strategy grants a retry, this method is invoked again after the
     * strategy's delay; each attempt re-reads {@code jobManagerFuture}, so a leader
     * notified in the meantime is used. Any other failure, or an exhausted
     * strategy, is returned as a failed future.
     *
     * @param jobID job to look up
     * @param str name passed on in the lookup message
     * @param lookupRetryStrategy strategy deciding whether and when to retry
     * @return future holding the location, or the failure of the last attempt
     */
    public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobID, final String str, final LookupRetryStrategy lookupRetryStrategy) {
        return this.jobManagerFuture
                .flatMap(new Mapper<ActorGateway, Future<Object>>() {
                    @Override
                    public Future<Object> apply(ActorGateway actorGateway) {
                        return actorGateway.ask(
                                new KvStateMessage.LookupKvStateLocation(jobID, str),
                                AkkaKvStateLocationLookupService.this.askTimeout);
                    }
                }, this.actorSystem.dispatcher())
                // Explicit type witness: the tag type cannot be inferred from a Class<?> argument.
                .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
                .recoverWith(new Recover<Future<KvStateLocation>>() {
                    // Must be named "recover": that is the abstract method of akka.dispatch.Recover.
                    @Override
                    public Future<KvStateLocation> recover(Throwable th) throws Throwable {
                        // Only an unknown job manager is retried; tryRetry() is consulted
                        // (and consumes a retry) only in that case.
                        if (!(th instanceof UnknownJobManager) || !lookupRetryStrategy.tryRetry()) {
                            return Futures.failed(th);
                        }
                        return Patterns.after(
                                lookupRetryStrategy.getRetryDelay(),
                                AkkaKvStateLocationLookupService.this.actorSystem.scheduler(),
                                AkkaKvStateLocationLookupService.this.actorSystem.dispatcher(),
                                new Callable<Future<KvStateLocation>>() {
                                    @Override
                                    public Future<KvStateLocation> call() throws Exception {
                                        return AkkaKvStateLocationLookupService.this
                                                .getKvStateLookupInfo(jobID, str, lookupRetryStrategy);
                                    }
                                });
                    }
                }, this.actorSystem.dispatcher());
    }

    /**
     * Replaces the job manager future after a leader notification.
     *
     * <p>A null address installs the shared "unknown job manager" failure.
     * Otherwise the address is resolved to an actor reference, which is wrapped
     * together with {@code uuid} in an {@link AkkaActorGateway}.
     *
     * @param str leader address, or null if there is no leader
     * @param uuid identifier handed to the gateway together with the resolved actor
     */
    @Override
    public void notifyLeaderAddress(String str, final UUID uuid) {
        // Parameterized logging already skips formatting when debug is disabled.
        LOG.debug("Received leader address notification {}:{}", str, uuid);

        if (str == null) {
            this.jobManagerFuture = UNKNOWN_JOB_MANAGER;
            return;
        }

        Future<ActorRef> leaderRef = AkkaUtils.getActorRefFuture(str, this.actorSystem, this.askTimeout);
        this.jobManagerFuture = leaderRef.map(new Mapper<ActorRef, ActorGateway>() {
            @Override
            public ActorGateway apply(ActorRef resolvedLeader) {
                return new AkkaActorGateway(resolvedLeader, uuid);
            }
        }, this.actorSystem.dispatcher());
    }

    /**
     * Records a leader retrieval error: the job manager future is replaced by a
     * future failed with the given exception, so later lookups fail with it.
     *
     * @param exc error reported by the leader retrieval service
     */
    @Override
    public void handleError(Exception exc) {
        Future<ActorGateway> failedJobManager = Futures.failed(exc);
        this.jobManagerFuture = failedJobManager;
    }
}
