package org.apache.flink.kubernetes.operator;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RuntimeInfo;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.health.HealthProbe;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.NetUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/HealthProbeTest.class */
public class HealthProbeTest {
    KubernetesClient client;

    @Test
    public void testHealthProbeEndpoint() throws Exception {
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        try {
            Configuration configuration = new Configuration();
            configuration.set(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT, Integer.valueOf(availablePort.getPort()));
            FlinkOperator flinkOperator = new FlinkOperator(configuration) { // from class: org.apache.flink.kubernetes.operator.HealthProbeTest.1
                Operator createOperator() {
                    return new Operator(HealthProbeTest.this.client);
                }
            };
            try {
                flinkOperator.run();
                Assertions.assertTrue(callHealthEndpoint(configuration));
                Assertions.assertNotNull(HealthProbe.INSTANCE.getRuntimeInfo());
                flinkOperator.stop();
                Assertions.assertFalse(callHealthEndpoint(configuration));
                if (availablePort != null) {
                    availablePort.close();
                }
            } catch (Throwable th) {
                flinkOperator.stop();
                Assertions.assertFalse(callHealthEndpoint(configuration));
                throw th;
            }
        } catch (Throwable th2) {
            if (availablePort != null) {
                try {
                    availablePort.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    @Test
    public void testHealthProbe() {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        HealthProbe.INSTANCE.setRuntimeInfo(new RuntimeInfo(new Operator(this.client)) { // from class: org.apache.flink.kubernetes.operator.HealthProbeTest.2
            public boolean isStarted() {
                return atomicBoolean.get();
            }

            public boolean allEventSourcesAreHealthy() {
                return atomicBoolean2.get();
            }
        });
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
        atomicBoolean.set(true);
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
        atomicBoolean2.set(true);
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        atomicBoolean.set(false);
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
    }

    private boolean callHealthEndpoint(Configuration configuration) throws Exception {
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL("http://localhost:" + configuration.get(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT) + "/").openConnection();
        httpURLConnection.setConnectTimeout(100000);
        httpURLConnection.connect();
        return httpURLConnection.getResponseCode() == HttpResponseStatus.OK.code();
    }
}
