package org.apache.flink.runtime.rest.handler.taskmanager;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessageParameters;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelProgressivePromise;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ImmediateEventExecutor;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/**
 * Tests for {@link AbstractTaskManagerFileHandler}: serving a file, re-serving it on a repeated
 * request, and serving a different file once the configured duration has elapsed.
 */
public class AbstractTaskManagerFileHandlerTest extends TestLogger {
    /** Randomly generated task manager ID; used as the request's path parameter and as the ID the test handler expects. */
    private static final ResourceID EXPECTED_TASK_MANAGER_ID = ResourceID.generate();
    /** HTTP/1.1 GET request for "/foobar" that is handed to the handler in every test. */
    private static final DefaultFullHttpRequest HTTP_REQUEST = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/foobar");

    /** Class-wide temporary folder; provides the blob storage directory as well as the input and output files. */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Blob server shared by all tests; created in {@link #setup()} and closed in {@link #teardown()}. */
    private static BlobServer blobServer;
    /** Handler request shared by all tests; built once in {@link #setup()}. */
    private static HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> handlerRequest;
    /** Random content of the first test file; regenerated before every test. */
    private String fileContent1;
    /** Blob key under which the first test file was stored in {@link #blobServer}. */
    private TransientBlobKey transientBlobKey1;
    /** Random content of the second test file; regenerated before every test. */
    private String fileContent2;
    /** Blob key under which the second test file was stored in {@link #blobServer}. */
    private TransientBlobKey transientBlobKey2;

    /**
     * {@link AbstractTaskManagerFileHandler} whose file uploads are scripted by the test.
     *
     * <p>Each call to {@code requestFileUpload} verifies the requested task manager ID and then
     * hands out the next prepared future from a queue.
     */
    public static final class TestTaskManagerFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {
        /** Prepared upload results, consumed in FIFO order (one per upload request). */
        private final Queue<CompletableFuture<TransientBlobKey>> requestFileUploads;
        /** Task manager ID every upload request is expected to carry. */
        private final ResourceID expectedTaskManagerId;

        /**
         * Creates the handler. The first seven arguments are forwarded unchanged to the super
         * constructor.
         *
         * @param queue prepared upload futures, handed out one per upload request; must not be null
         * @param resourceID task manager ID that upload requests are asserted against; must not be null
         */
        protected TestTaskManagerFileHandler(@Nonnull GatewayRetriever<? extends RestfulGateway> gatewayRetriever, @Nonnull Time time, @Nonnull Map<String, String> map, @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders, @Nonnull GatewayRetriever<ResourceManagerGateway> gatewayRetriever2, @Nonnull TransientBlobService transientBlobService, @Nonnull Time time2, Queue<CompletableFuture<TransientBlobKey>> queue, ResourceID resourceID) {
            super(gatewayRetriever, time, map, untypedResponseMessageHeaders, gatewayRetriever2, transientBlobService, time2);
            this.requestFileUploads = Preconditions.checkNotNull(queue);
            this.expectedTaskManagerId = Preconditions.checkNotNull(resourceID);
        }

        /**
         * Asserts that the request targets the expected task manager and returns the next
         * prepared upload future.
         *
         * @param resourceManagerGateway not used by this test implementation
         * @param tuple2 pair whose first field is the requested task manager ID
         * @return the next queued future, or an exceptionally completed future once the queue is empty
         */
        protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, Tuple2<ResourceID, String> tuple2) {
            Assert.assertThat(tuple2.f0, Matchers.is(Matchers.equalTo(this.expectedTaskManagerId)));

            final CompletableFuture<TransientBlobKey> nextUpload = this.requestFileUploads.poll();
            if (nextUpload == null) {
                // The test prepared fewer uploads than the handler asked for.
                return FutureUtils.completedExceptionally(new FlinkException("Could not upload file."));
            }
            return nextUpload;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest$TestUntypedMessageHeaders.class */
    public static final class TestUntypedMessageHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {
        private static final String URL = "/foobar";

        private TestUntypedMessageHeaders() {
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        /* renamed from: getUnresolvedMessageParameters, reason: merged with bridge method [inline-methods] */
        public TaskManagerMessageParameters m399getUnresolvedMessageParameters() {
            return new TaskManagerMessageParameters();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return URL;
        }
    }

    /**
     * {@link ChannelHandlerContext} stub that captures served files.
     *
     * <p>Any {@link DefaultFileRegion} passed to {@code write} is copied into {@link #outputFile}.
     * {@link #executor()} returns the immediate executor and {@link #pipeline()} a Mockito mock;
     * every other operation is a no-op returning {@code null} or {@code false}.
     */
    public static final class TestingChannelHandlerContext implements ChannelHandlerContext {
        /** File that receives the content of every written file region. */
        final File outputFile;

        private TestingChannelHandlerContext(File file) {
            this.outputFile = Preconditions.checkNotNull(file);
        }

        /**
         * Copies a written {@link DefaultFileRegion} into {@link #outputFile}; other message
         * types are ignored.
         *
         * @param obj message being written
         * @param channelPromise ignored
         * @return a fresh promise bound to a new {@link EmbeddedChannel}
         * @throws RuntimeException wrapping any {@link IOException} raised while copying
         */
        public ChannelFuture write(Object obj, ChannelPromise channelPromise) {
            if (obj instanceof DefaultFileRegion) {
                final DefaultFileRegion defaultFileRegion = (DefaultFileRegion) obj;

                // The stream is opened without append mode, so each region replaces the previous
                // content. try-with-resources closes it even when the transfer fails.
                try (FileOutputStream fileOutputStream = new FileOutputStream(this.outputFile)) {
                    defaultFileRegion.transferTo(fileOutputStream.getChannel(), 0L);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return new DefaultChannelPromise(new EmbeddedChannel());
        }

        /** @return {@link ImmediateEventExecutor#INSTANCE} */
        public EventExecutor executor() {
            return ImmediateEventExecutor.INSTANCE;
        }

        /** Delegates to {@link #write(Object, ChannelPromise)} with a {@code null} promise. */
        public ChannelFuture write(Object obj) {
            return write(obj, null);
        }

        /** Writes the message, then calls the (no-op) {@link #flush()}. */
        public ChannelFuture writeAndFlush(Object obj, ChannelPromise channelPromise) {
            final ChannelFuture writeFuture = write(obj, channelPromise);
            flush();
            return writeFuture;
        }

        /** Delegates to {@link #writeAndFlush(Object, ChannelPromise)} with a {@code null} promise. */
        public ChannelFuture writeAndFlush(Object obj) {
            return writeAndFlush(obj, null);
        }

        /** @return a new Mockito mock of {@link ChannelPipeline} on every call */
        public ChannelPipeline pipeline() {
            return Mockito.mock(ChannelPipeline.class);
        }

        // ------------------------------------------------------------------
        // Everything below is an unused stub returning null / false.
        // ------------------------------------------------------------------

        public Channel channel() {
            return null;
        }

        public String name() {
            return null;
        }

        public ChannelHandler handler() {
            return null;
        }

        public boolean isRemoved() {
            return false;
        }

        public ChannelHandlerContext fireChannelRegistered() {
            return null;
        }

        public ChannelHandlerContext fireChannelUnregistered() {
            return null;
        }

        public ChannelHandlerContext fireChannelActive() {
            return null;
        }

        public ChannelHandlerContext fireChannelInactive() {
            return null;
        }

        public ChannelHandlerContext fireExceptionCaught(Throwable th) {
            return null;
        }

        public ChannelHandlerContext fireUserEventTriggered(Object obj) {
            return null;
        }

        public ChannelHandlerContext fireChannelRead(Object obj) {
            return null;
        }

        public ChannelHandlerContext fireChannelReadComplete() {
            return null;
        }

        public ChannelHandlerContext fireChannelWritabilityChanged() {
            return null;
        }

        public ChannelFuture bind(SocketAddress socketAddress) {
            return null;
        }

        public ChannelFuture connect(SocketAddress socketAddress) {
            return null;
        }

        public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress2) {
            return null;
        }

        public ChannelFuture disconnect() {
            return null;
        }

        public ChannelFuture close() {
            return null;
        }

        public ChannelFuture deregister() {
            return null;
        }

        public ChannelFuture bind(SocketAddress socketAddress, ChannelPromise channelPromise) {
            return null;
        }

        public ChannelFuture connect(SocketAddress socketAddress, ChannelPromise channelPromise) {
            return null;
        }

        public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress2, ChannelPromise channelPromise) {
            return null;
        }

        public ChannelFuture disconnect(ChannelPromise channelPromise) {
            return null;
        }

        public ChannelFuture close(ChannelPromise channelPromise) {
            return null;
        }

        public ChannelFuture deregister(ChannelPromise channelPromise) {
            return null;
        }

        public ChannelHandlerContext read() {
            return null;
        }

        public ChannelHandlerContext flush() {
            return null;
        }

        public ByteBufAllocator alloc() {
            return null;
        }

        public ChannelPromise newPromise() {
            return null;
        }

        public ChannelProgressivePromise newProgressivePromise() {
            return null;
        }

        public ChannelFuture newSucceededFuture() {
            return null;
        }

        public ChannelFuture newFailedFuture(Throwable th) {
            return null;
        }

        public ChannelPromise voidPromise() {
            return null;
        }

        public <T> Attribute<T> attr(AttributeKey<T> attributeKey) {
            return null;
        }

        public <T> boolean hasAttr(AttributeKey<T> attributeKey) {
            return false;
        }
    }

    /**
     * Creates the shared blob server, backed by a fresh temporary directory, and the handler
     * request addressed to {@link #EXPECTED_TASK_MANAGER_ID}.
     *
     * @throws IOException if the storage directory or the blob server cannot be created
     * @throws HandlerRequestException if the handler request cannot be constructed
     */
    @BeforeClass
    public static void setup() throws IOException, HandlerRequestException {
        final String blobStorageDirectory = temporaryFolder.newFolder().getAbsolutePath();
        final Configuration blobServerConfig = new Configuration();
        blobServerConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, blobStorageDirectory);
        blobServer = new BlobServer(blobServerConfig, new VoidBlobStore());

        // The only path parameter is the task manager ID; there are no query parameters.
        final String taskManagerId = EXPECTED_TASK_MANAGER_ID.getResourceIdString();
        handlerRequest = new HandlerRequest<>(
                EmptyRequestBody.getInstance(),
                new TaskManagerFileMessageParameters(),
                Collections.singletonMap("taskmanagerid", taskManagerId),
                Collections.emptyMap());
    }

    /**
     * Prepares two files with distinct random content and stores both in the blob server,
     * remembering the content and blob key of each.
     *
     * @throws IOException if a file cannot be written or stored
     */
    @Before
    public void setupTest() throws IOException {
        this.fileContent1 = UUID.randomUUID().toString();
        final File firstFile = createFileWithContent(this.fileContent1);
        this.transientBlobKey1 = storeFileInBlobServer(firstFile);

        this.fileContent2 = UUID.randomUUID().toString();
        final File secondFile = createFileWithContent(this.fileContent2);
        this.transientBlobKey2 = storeFileInBlobServer(secondFile);
    }

    /**
     * Closes the shared blob server, if one was created, and clears the reference.
     *
     * @throws IOException if closing the blob server fails
     */
    @AfterClass
    public static void teardown() throws IOException {
        if (blobServer == null) {
            return;
        }
        blobServer.close();
        blobServer = null;
    }

    /**
     * Tests that a single request makes the handler write the content of the uploaded blob
     * to the channel.
     */
    @Test
    public void testFileServing() throws Exception {
        final Time cacheEntryDuration = Time.milliseconds(1000L);

        // Exactly one upload is prepared: the blob holding fileContent1.
        final Queue<CompletableFuture<TransientBlobKey>> requestFileUploads = new ArrayDeque<>(1);
        requestFileUploads.add(CompletableFuture.completedFuture(this.transientBlobKey1));

        final TestTaskManagerFileHandler fileHandler = createTestTaskManagerFileHandler(cacheEntryDuration, requestFileUploads, EXPECTED_TASK_MANAGER_ID);

        final File outputFile = temporaryFolder.newFile();
        fileHandler.respondToRequest(new TestingChannelHandlerContext(outputFile), HTTP_REQUEST, handlerRequest, null);

        Assert.assertThat(outputFile.length(), Matchers.is(Matchers.greaterThan(0L)));
        Assert.assertThat(FileUtils.readFileUtf8(outputFile), Matchers.is(Matchers.equalTo(this.fileContent1)));
    }

    /**
     * Tests that two back-to-back requests (5000 ms duration, no delay in between) both yield
     * the first file, i.e. the second prepared upload is not served.
     */
    @Test
    public void testFileCaching() throws Exception {
        final Time cacheEntryDuration = Time.milliseconds(5000L);
        final Time delayBetweenRequests = Time.milliseconds(0L);

        final File outputFile = runFileCachingTest(cacheEntryDuration, delayBetweenRequests);

        Assert.assertThat(Long.valueOf(outputFile.length()), Matchers.is(Matchers.greaterThan(0L)));
        Assert.assertThat(FileUtils.readFileUtf8(outputFile), Matchers.is(Matchers.equalTo(this.fileContent1)));
    }

    /**
     * Tests that when the delay between two requests equals the handler's duration (5 ms each),
     * the second request yields the second file.
     */
    @Test
    public void testFileCacheExpiration() throws Exception {
        // The same 5 ms value is used for the handler duration and for the pause between requests.
        final Time cacheEntryDuration = Time.milliseconds(5L);

        final File outputFile = runFileCachingTest(cacheEntryDuration, cacheEntryDuration);

        Assert.assertThat(Long.valueOf(outputFile.length()), Matchers.is(Matchers.greaterThan(0L)));
        Assert.assertThat(FileUtils.readFileUtf8(outputFile), Matchers.is(Matchers.equalTo(this.fileContent2)));
    }

    /**
     * Issues two identical requests against a handler with two prepared uploads (first file,
     * then second file) and returns the file that received the written output.
     *
     * @param time duration passed to the handler (presumably the cache entry lifetime — confirm
     *     against {@link AbstractTaskManagerFileHandler})
     * @param time2 how long to sleep between the first and the second request
     * @return the output file; it holds whatever the second request wrote
     * @throws Exception if responding to a request fails or the sleep is interrupted
     */
    private File runFileCachingTest(Time time, Time time2) throws Exception {
        final Queue<CompletableFuture<TransientBlobKey>> requestFileUploads = new ArrayDeque<>(2);
        requestFileUploads.add(CompletableFuture.completedFuture(this.transientBlobKey1));
        requestFileUploads.add(CompletableFuture.completedFuture(this.transientBlobKey2));

        final TestTaskManagerFileHandler fileHandler = createTestTaskManagerFileHandler(time, requestFileUploads, EXPECTED_TASK_MANAGER_ID);

        final File outputFile = temporaryFolder.newFile();
        final TestingChannelHandlerContext channelContext = new TestingChannelHandlerContext(outputFile);

        fileHandler.respondToRequest(channelContext, HTTP_REQUEST, handlerRequest, null);
        Thread.sleep(time2.toMilliseconds());
        // Both requests write to the same context, so the second response overwrites the file.
        fileHandler.respondToRequest(channelContext, HTTP_REQUEST, handlerRequest, null);

        return outputFile;
    }

    /**
     * Builds a {@link TestTaskManagerFileHandler} backed by the shared blob server and a
     * {@link TestingResourceManagerGateway}.
     *
     * @param time duration forwarded as the handler's last {@link Time} argument
     * @param queue prepared upload futures the handler hands out, one per upload request
     * @param resourceID task manager ID the handler expects in upload requests
     * @return the configured handler
     */
    private TestTaskManagerFileHandler createTestTaskManagerFileHandler(Time time, Queue<CompletableFuture<TransientBlobKey>> queue, ResourceID resourceID) {
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();

        return new TestTaskManagerFileHandler(
                // Leader retriever: always completes with null.
                () -> CompletableFuture.completedFuture(null),
                TestingUtils.infiniteTime(),
                Collections.emptyMap(),
                new TestUntypedMessageHeaders(),
                () -> CompletableFuture.completedFuture(resourceManagerGateway),
                blobServer,
                time,
                queue,
                resourceID);
    }

    /**
     * Creates a new file in the temporary folder and writes the given string to it, UTF-8 encoded.
     *
     * @param str content to write
     * @return the newly created file
     * @throws IOException if the file cannot be created, written or closed
     */
    private static File createFileWithContent(String str) throws IOException {
        final File newFile = temporaryFolder.newFile();

        // try-with-resources always closes the stream; a failure while closing after a failed
        // write is attached as a suppressed exception instead of replacing the original one.
        try (FileOutputStream fileOutputStream = new FileOutputStream(newFile)) {
            fileOutputStream.write(str.getBytes("UTF-8"));
        }

        return newFile;
    }

    /**
     * Stores the content of the given file as a transient blob in the shared blob server.
     *
     * @param file file whose content is uploaded
     * @return the key under which the blob was stored
     * @throws IOException if the file cannot be read or the blob cannot be stored
     */
    private static TransientBlobKey storeFileInBlobServer(File file) throws IOException {
        // try-with-resources always closes the stream; a failure while closing after a failed
        // upload is attached as a suppressed exception instead of replacing the original one.
        try (FileInputStream fileInputStream = new FileInputStream(file)) {
            return blobServer.getTransientBlobService().putTransient(fileInputStream);
        }
    }
}
