package org.apache.flink.streaming.api.functions.sink;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.Assertions;
import org.junit.AssumptionViolatedException;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.class */
class SocketClientSinkTest {
    private static final String TEST_MESSAGE = "testSocketSinkInvoke";
    private static final String EXCEPTION_MESSGAE = "Failed to send message 'testSocketSinkInvoke\n'";
    private static final String host = "127.0.0.1";
    private SerializationSchema<String> simpleSchema = new SerializationSchema<String>() { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.1
        public byte[] serialize(String str) {
            return str.getBytes(ConfigConstants.DEFAULT_CHARSET);
        }
    };

    SocketClientSinkTest() {
    }

    @Test
    void testSocketSink() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        final int localPort = serverSocket.getLocalPort();
        CheckedThread checkedThread = new CheckedThread("Test sink runner") { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.2
            public void go() throws Exception {
                SocketClientSink socketClientSink = new SocketClientSink(SocketClientSinkTest.host, localPort, SocketClientSinkTest.this.simpleSchema, 0);
                socketClientSink.open(DefaultOpenContext.INSTANCE);
                socketClientSink.invoke("testSocketSinkInvoke\n", SinkContextUtil.forTimestamp(0L));
                socketClientSink.close();
            }
        };
        checkedThread.start();
        String readLine = new BufferedReader(new InputStreamReader(NetUtils.acceptWithoutTimeout(serverSocket).getInputStream())).readLine();
        checkedThread.sync();
        serverSocket.close();
        Assertions.assertThat(readLine).isEqualTo(TEST_MESSAGE);
    }

    @Test
    void testSinkAutoFlush() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        final SocketClientSink socketClientSink = new SocketClientSink(host, serverSocket.getLocalPort(), this.simpleSchema, 0, true);
        socketClientSink.open(DefaultOpenContext.INSTANCE);
        CheckedThread checkedThread = new CheckedThread("Test sink runner") { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.3
            public void go() throws Exception {
                socketClientSink.invoke("testSocketSinkInvoke\n", SinkContextUtil.forTimestamp(0L));
            }
        };
        checkedThread.start();
        String readLine = new BufferedReader(new InputStreamReader(NetUtils.acceptWithoutTimeout(serverSocket).getInputStream())).readLine();
        checkedThread.sync();
        socketClientSink.close();
        serverSocket.close();
        Assertions.assertThat(readLine).isEqualTo(TEST_MESSAGE);
    }

    @Test
    void testSocketSinkNoRetry() throws Exception {
        final ServerSocket serverSocket = new ServerSocket(0);
        int localPort = serverSocket.getLocalPort();
        try {
            CheckedThread checkedThread = new CheckedThread("Test server runner") { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.4
                public void go() throws Exception {
                    NetUtils.acceptWithoutTimeout(serverSocket).close();
                }
            };
            checkedThread.start();
            SocketClientSink socketClientSink = new SocketClientSink(host, localPort, this.simpleSchema, 0, true);
            socketClientSink.open(DefaultOpenContext.INSTANCE);
            checkedThread.sync();
            Assertions.assertThatThrownBy(() -> {
                while (true) {
                    socketClientSink.invoke("testSocketSinkInvoke\n", SinkContextUtil.forTimestamp(0L));
                }
            }).isInstanceOf(IOException.class).hasMessageContaining(EXCEPTION_MESSGAE);
            Assertions.assertThat(socketClientSink.getCurrentNumberOfRetries()).isZero();
            IOUtils.closeQuietly(serverSocket);
        } catch (Throwable th) {
            IOUtils.closeQuietly(serverSocket);
            throw th;
        }
    }

    @Test
    void testRetry() throws Exception {
        final ServerSocket[] serverSocketArr = new ServerSocket[1];
        ExecutorService[] executorServiceArr = new ExecutorService[1];
        try {
            serverSocketArr[0] = new ServerSocket(0);
            executorServiceArr[0] = Executors.newCachedThreadPool();
            int localPort = serverSocketArr[0].getLocalPort();
            Future submit = executorServiceArr[0].submit(new Callable<Void>() { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.5
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    Socket acceptWithoutTimeout = NetUtils.acceptWithoutTimeout(serverSocketArr[0]);
                    Assertions.assertThat(new BufferedReader(new InputStreamReader(acceptWithoutTimeout.getInputStream())).readLine()).isEqualTo("0");
                    acceptWithoutTimeout.close();
                    return null;
                }
            });
            final SocketClientSink socketClientSink = new SocketClientSink(host, serverSocketArr[0].getLocalPort(), this.simpleSchema, -1, true);
            socketClientSink.open(DefaultOpenContext.INSTANCE);
            socketClientSink.invoke("0\n", SinkContextUtil.forTimestamp(0L));
            submit.get();
            serverSocketArr[0].close();
            Assertions.assertThat(serverSocketArr[0].isClosed()).isTrue();
            Assertions.assertThat(socketClientSink.getCurrentNumberOfRetries()).isZero();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            new CountDownLatch(1);
            executorServiceArr[0].submit(new Callable<Void>() { // from class: org.apache.flink.streaming.api.functions.sink.SocketClientSinkTest.6
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    while (countDownLatch.getCount() != 0) {
                        socketClientSink.invoke("1\n");
                    }
                    return null;
                }
            });
            while (socketClientSink.getCurrentNumberOfRetries() == 0) {
                Thread.sleep(100L);
            }
            countDownLatch.countDown();
            try {
                serverSocketArr[0] = new ServerSocket(localPort);
                Assertions.assertThat(new BufferedReader(new InputStreamReader(NetUtils.acceptWithoutTimeout(serverSocketArr[0]).getInputStream())).readLine()).isEqualTo("1");
                if (serverSocketArr[0] != null) {
                    serverSocketArr[0].close();
                }
                if (executorServiceArr[0] != null) {
                    executorServiceArr[0].shutdown();
                }
            } catch (BindException e) {
                throw new AssumptionViolatedException("Could not bind server to previous port.", e);
            }
        } catch (Throwable th) {
            if (serverSocketArr[0] != null) {
                serverSocketArr[0].close();
            }
            if (executorServiceArr[0] != null) {
                executorServiceArr[0].shutdown();
            }
            throw th;
        }
    }
}
