package org.apache.flink.runtime.rest.handler.legacy;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.http.multipart.HttpPostBodyUtil;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REST handler that serves the log or stdout file of a single TaskManager.
 *
 * <p>For each request the handler asks the TaskManager (through the leading JobManager) for the
 * blob key of the requested file, resolves that key to a local file through a
 * {@link TransientBlobCache}, and streams the file to the client. While a request for a given
 * TaskManager id is in flight, further requests for the same id are answered with the text
 * {@code "loading..."}.
 *
 * <p>NOTE(review): {@code cache}, {@code lastSubmittedLog} and {@code lastSubmittedStdout} are
 * accessed without synchronization although the handler is {@code @Sharable} and callbacks run on
 * the supplied executor — confirm that this is safe for the executor in use.
 */
@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.class */
public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> implements WebHandler {
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class);
    private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log";
    private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout";

    /** Chunk size used when the file is streamed through {@link ChunkedFile} (SSL pipelines). */
    private static final int CHUNK_SIZE_BYTES = 8192;

    /** Blob key last served per TaskManager id when this handler runs in {@link FileMode#LOG}. */
    private final HashMap<String, TransientBlobKey> lastSubmittedLog;

    /** Blob key last served per TaskManager id when this handler runs in {@link FileMode#STDOUT}. */
    private final HashMap<String, TransientBlobKey> lastSubmittedStdout;

    /** TaskManager ids for which a request is currently being processed. */
    private final ConcurrentHashMap<String, Boolean> lastRequestPending;

    private final Configuration config;

    /** Created lazily on the first request; see {@link #respondAsLeader}. */
    private CompletableFuture<TransientBlobCache> cache;

    private final FileMode fileMode;
    private final Executor executor;

    /** Selects which TaskManager file this handler serves. */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler$FileMode.class */
    public enum FileMode {
        LOG,
        STDOUT
    }

    /**
     * Creates a handler for either the log or the stdout file.
     *
     * @param gatewayRetriever forwarded to {@link RedirectHandler}
     * @param executor executor on which the blob cache is created and blobs are fetched; must not be null
     * @param completableFuture forwarded to {@link RedirectHandler} as its first constructor argument
     * @param time forwarded to {@link RedirectHandler}
     * @param fileMode which file (log or stdout) this handler serves
     * @param configuration configuration passed to the {@link TransientBlobCache}
     */
    public TaskManagerLogHandler(GatewayRetriever<JobManagerGateway> gatewayRetriever, Executor executor, CompletableFuture<String> completableFuture, Time time, FileMode fileMode, Configuration configuration) {
        super(completableFuture, gatewayRetriever, time, Collections.emptyMap());
        this.lastSubmittedLog = new HashMap<>();
        this.lastSubmittedStdout = new HashMap<>();
        this.lastRequestPending = new ConcurrentHashMap<>();
        this.executor = Preconditions.checkNotNull(executor);
        this.config = configuration;
        this.fileMode = fileMode;
    }

    /**
     * Returns the single REST path served by this handler, depending on its {@link FileMode}.
     */
    @Override // org.apache.flink.runtime.rest.handler.WebHandler
    public String[] getPaths() {
        switch (this.fileMode) {
            case LOG:
                return new String[]{TASKMANAGER_LOG_REST_PATH};
            case STDOUT:
            default:
                return new String[]{TASKMANAGER_OUT_REST_PATH};
        }
    }

    /**
     * Fetches the requested TaskManager file and writes it to the channel.
     *
     * <p>If another request for the same TaskManager id is still pending, only the text
     * {@code "loading..."} is returned. Any failure results in a plain-text error message being
     * written to the channel and the pending marker being cleared.
     *
     * @param channelHandlerContext channel context the response is written to
     * @param routedRequest request carrying the {@code taskmanagerid} route parameter
     * @param jobManagerGateway gateway used to look up the TaskManager and the blob server port
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.rest.handler.RedirectHandler
    public void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, JobManagerGateway jobManagerGateway) {
        if (this.cache == null) {
            this.cache = jobManagerGateway.requestBlobServerPort(this.timeout).thenApplyAsync(blobPort -> {
                try {
                    return new TransientBlobCache(this.config, new InetSocketAddress(jobManagerGateway.getHostname(), blobPort));
                } catch (IOException e) {
                    throw new CompletionException(new FlinkException("Could not create TransientBlobCache.", e));
                }
            }, this.executor);
        }

        final String taskManagerId = routedRequest.getRouteResult().param("taskmanagerid");
        final HttpRequest request = routedRequest.getRequest();

        // Only one request per TaskManager is processed at a time.
        if (this.lastRequestPending.putIfAbsent(taskManagerId, true) != null) {
            display(channelHandlerContext, request, "loading...");
            return;
        }

        try {
            final ResourceID resourceId = decodeResourceId(taskManagerId);

            final CompletableFuture<TransientBlobKey> blobKeyFuture = jobManagerGateway
                .requestTaskManagerInstance(resourceId, this.timeout)
                .thenCompose(optionalInstance -> {
                    final Instance instance = optionalInstance.orElseThrow(
                        () -> new CompletionException(new FlinkException("Could not find instance with " + resourceId + '.')));
                    return requestFileBlobKey(instance);
                });

            final CompletableFuture<String> filePathFuture = blobKeyFuture.thenCombineAsync(
                this.cache,
                (blobKey, blobCache) -> resolveLocalFilePath(taskManagerId, blobKey, blobCache),
                this.executor);

            filePathFuture.exceptionally(failure -> {
                display(channelHandlerContext, request, "Fetching TaskManager log failed.");
                LOG.error("Fetching TaskManager log failed.", failure);
                this.lastRequestPending.remove(taskManagerId);
                return null;
            });

            filePathFuture.thenAccept(filePath -> transferFile(channelHandlerContext, request, taskManagerId, filePath));
        } catch (Exception e) {
            display(channelHandlerContext, request, "Error: " + e.getMessage());
            LOG.error("Fetching TaskManager log failed.", e);
            this.lastRequestPending.remove(taskManagerId);
        }
    }

    /** URL-decodes the route parameter and wraps it in a {@link ResourceID}. */
    private static ResourceID decodeResourceId(String taskManagerId) throws FlinkException {
        try {
            return new ResourceID(URLDecoder.decode(taskManagerId, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new FlinkException("Could not decode task manager id: " + taskManagerId + '.', e);
        }
    }

    /** Asks the TaskManager for the blob key of its log or stdout file, depending on the file mode. */
    private CompletableFuture<TransientBlobKey> requestFileBlobKey(Instance instance) {
        switch (this.fileMode) {
            case LOG:
                return instance.getTaskManagerGateway().requestTaskManagerLog(this.timeout);
            case STDOUT:
            default:
                return instance.getTaskManagerGateway().requestTaskManagerStdout(this.timeout);
        }
    }

    /**
     * Records the blob key as the latest one for the TaskManager, deletes the previously recorded
     * blob from the cache if the key changed, and returns the absolute path of the blob file.
     *
     * @throws CompletionException if the previous blob cannot be deleted or the new one cannot be retrieved
     */
    private String resolveLocalFilePath(String taskManagerId, TransientBlobKey blobKey, TransientBlobCache blobCache) {
        final HashMap<String, TransientBlobKey> lastSubmitted = this.fileMode == FileMode.LOG ? this.lastSubmittedLog : this.lastSubmittedStdout;
        if (!lastSubmitted.containsKey(taskManagerId)) {
            lastSubmitted.put(taskManagerId, blobKey);
        } else if (!Objects.equals(blobKey, lastSubmitted.get(taskManagerId))) {
            if (!blobCache.deleteFromCache(lastSubmitted.get(taskManagerId))) {
                throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerId + '.'));
            }
            lastSubmitted.put(taskManagerId, blobKey);
        }
        try {
            return blobCache.getFile(blobKey).getAbsolutePath();
        } catch (IOException e) {
            throw new CompletionException(new FlinkException("Could not retrieve blob for " + blobKey + '.', e));
        }
    }

    /**
     * Streams the file at {@code filePath} to the client.
     *
     * <p>Everything that can throw an {@link IOException} is done before the response header is
     * written, so a failure yields a single, well-formed error response. On every failure path the
     * file is closed and the pending marker is cleared, so later requests are not stuck on
     * {@code "loading..."}.
     */
    private void transferFile(ChannelHandlerContext ctx, HttpRequest request, String taskManagerId, String filePath) {
        final RandomAccessFile file;
        try {
            file = new RandomAccessFile(new File(filePath), "r");
        } catch (FileNotFoundException e) {
            display(ctx, request, "Displaying TaskManager log failed.");
            LOG.error("Displaying TaskManager log failed.", e);
            releaseAfterFailure(taskManagerId, null);
            return;
        }

        final long fileLength;
        try {
            fileLength = file.length();
        } catch (IOException e) {
            display(ctx, request, "Displaying TaskManager log failed.");
            LOG.error("Displaying TaskManager log failed.", e);
            releaseAfterFailure(taskManagerId, file);
            return;
        }

        // With an SslHandler in the pipeline the file is sent as chunked input instead of a file region.
        final boolean sslEnabled = ctx.pipeline().get(SslHandler.class) != null;
        HttpChunkedInput chunkedBody = null;
        if (sslEnabled) {
            try {
                chunkedBody = new HttpChunkedInput(new ChunkedFile(file, 0L, fileLength, CHUNK_SIZE_BYTES));
            } catch (IOException e) {
                display(ctx, request, "Displaying TaskManager log failed.");
                LOG.warn("Could not write http data.", e);
                releaseAfterFailure(taskManagerId, file);
                return;
            }
        }

        final FileChannel channel = file.getChannel();
        final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set("Content-Type", HttpPostBodyUtil.DEFAULT_TEXT_CONTENT_TYPE);
        if (HttpHeaders.isKeepAlive(request)) {
            response.headers().set("Connection", "keep-alive");
        }
        HttpHeaders.setContentLength(response, fileLength);
        ctx.write(response);

        // Runs once the body write has completed (successfully or not).
        final ChannelFutureListener cleanup = future -> {
            this.lastRequestPending.remove(taskManagerId);
            channel.close();
            file.close();
        };

        final ChannelFuture lastContentFuture;
        if (!sslEnabled) {
            ctx.write(new DefaultFileRegion(channel, 0L, fileLength), ctx.newProgressivePromise()).addListener(cleanup);
            lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        } else {
            lastContentFuture = ctx.writeAndFlush(chunkedBody, ctx.newProgressivePromise()).addListener(cleanup);
        }

        if (!HttpHeaders.isKeepAlive(request)) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Clears the pending marker for the TaskManager and closes the file if one was opened.
     *
     * @param file the opened file, or {@code null} if opening it failed
     */
    private void releaseAfterFailure(String taskManagerId, RandomAccessFile file) {
        this.lastRequestPending.remove(taskManagerId);
        if (file != null) {
            try {
                file.close();
            } catch (IOException e) {
                LOG.error("Could not close random access file.", e);
            }
        }
    }

    /**
     * Writes {@code str} as a complete plain-text HTTP response with status 200 and closes the
     * connection afterwards unless the request asked for keep-alive.
     */
    private void display(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, String str) {
        final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set("Content-Type", HttpPostBodyUtil.DEFAULT_TEXT_CONTENT_TYPE);
        if (HttpHeaders.isKeepAlive(httpRequest)) {
            response.headers().set("Connection", "keep-alive");
        }

        // Content-Length must be the encoded byte count, not the character count.
        final byte[] payload = str.getBytes(ConfigConstants.DEFAULT_CHARSET);
        final ByteBuf content = Unpooled.copiedBuffer(payload);
        HttpHeaders.setContentLength(response, payload.length);

        channelHandlerContext.write(response);
        channelHandlerContext.write(content);
        final ChannelFuture lastContentFuture = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!HttpHeaders.isKeepAlive(httpRequest)) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
