package org.apache.flink.streaming.api.datastream;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.experimental.CollectSink;
import org.apache.flink.streaming.experimental.SocketStreamIterator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

/**
 * Static helper methods for working with {@link DataStream}s: collecting the elements of a
 * stream back into the calling program, and re-interpreting an existing stream as a
 * {@link KeyedStream}.
 *
 * <p>This class only contains static methods and cannot be instantiated.
 */
@Experimental
public final class DataStreamUtils {

    /**
     * Background thread that triggers execution of a {@link StreamExecutionEnvironment} and
     * forwards any failure of that execution to the iterator handed out by
     * {@link DataStreamUtils#collect(DataStream)}.
     */
    private static class CallExecute extends Thread {
        private final StreamExecutionEnvironment toTrigger;
        private final SocketStreamIterator<?> toNotify;

        private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?> toNotify) {
            this.toTrigger = toTrigger;
            this.toNotify = toNotify;
        }

        @Override
        public void run() {
            try {
                this.toTrigger.execute();
            } catch (Throwable t) {
                // Deliberately catches everything: this thread has no caller to propagate to,
                // so the iterator is the only channel through which a failure can be reported.
                this.toNotify.notifyOfError(t);
            }
        }
    }

    /**
     * Returns an iterator over the elements of the given stream.
     *
     * <p>The method attaches a {@link CollectSink} with parallelism 1 to the stream, pointing it
     * at the port reported by a freshly created {@link SocketStreamIterator}, and then starts
     * the execution of the stream's environment on a separate thread. Errors thrown by that
     * execution are passed to the iterator via {@code notifyOfError}.
     *
     * @param dataStream the stream whose elements should be collected
     * @param <OUT> the element type of the stream
     * @return an iterator over the stream's elements
     * @throws IOException if no address for receiving the data could be determined, or if
     *     creating the socket iterator fails
     */
    public static <OUT> Iterator<OUT> collect(DataStream<OUT> dataStream) throws IOException {
        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
        TypeSerializer<OUT> serializer = dataStream.getType().createSerializer(env.getConfig());

        // Resolve the address first: the lookup can fail, and the iterator should only be
        // created (and allocate whatever backs the port it reports) once we know it is needed.
        InetAddress clientAddress = findClientAddress(env);

        SocketStreamIterator<OUT> iterator = new SocketStreamIterator<OUT>(serializer);

        // The sink is pinned to a single instance; all elements go to the one reported port.
        dataStream
                .addSink(new CollectSink<OUT>(clientAddress, iterator.getPort(), serializer))
                .setParallelism(1);

        new CallExecute(env, iterator).start();
        return iterator;
    }

    /**
     * Determines the address of this process that the collecting sink should send data to,
     * depending on the kind of execution environment.
     *
     * @param env the environment the stream will be executed in
     * @return the address to hand to the sink
     * @throws IOException if no address could be determined
     */
    private static InetAddress findClientAddress(StreamExecutionEnvironment env) throws IOException {
        if (env instanceof RemoteStreamEnvironment) {
            RemoteStreamEnvironment remoteEnv = (RemoteStreamEnvironment) env;
            InetSocketAddress remoteEndpoint = new InetSocketAddress(remoteEnv.getHost(), remoteEnv.getPort());
            try {
                // NOTE(review): 2000 / 400 are presumably the maximum wait and the delay before
                // logging starts, in milliseconds - confirm against ConnectionUtils.
                return ConnectionUtils.findConnectingAddress(remoteEndpoint, 2000L, 400L);
            } catch (Exception e) {
                throw new IOException("Could not determine an suitable network address to receive back data from the streaming program.", e);
            }
        }

        if (env instanceof LocalStreamEnvironment) {
            return InetAddress.getLoopbackAddress();
        }

        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new IOException("Could not determine this machines own local address to receive back data from the streaming program.", e);
        }
    }

    /**
     * Re-interprets the given stream as a {@link KeyedStream}, deriving the key type from the
     * key selector and the stream's element type.
     *
     * @param dataStream the stream to re-interpret
     * @param keySelector function that extracts the key from an element
     * @param <T> the element type of the stream
     * @param <K> the key type
     * @return the stream re-interpreted as a keyed stream
     */
    public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) {
        TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType());
        return reinterpretAsKeyedStream(dataStream, keySelector, keyType);
    }

    /**
     * Re-interprets the given stream as a {@link KeyedStream} with an explicitly supplied key
     * type.
     *
     * <p>The stream's transformation is wrapped in a {@link PartitionTransformation} that uses a
     * {@link ForwardPartitioner}, and the result is exposed as a keyed stream with the given
     * selector and key type.
     *
     * <p>NOTE(review): since only a forward partitioner is applied, callers presumably have to
     * guarantee that the input is already partitioned by this key - confirm.
     *
     * @param dataStream the stream to re-interpret
     * @param keySelector function that extracts the key from an element
     * @param typeInformation type information describing the key type
     * @param <T> the element type of the stream
     * @param <K> the key type
     * @return the stream re-interpreted as a keyed stream
     */
    public static <T, K> KeyedStream<T, K> reinterpretAsKeyedStream(DataStream<T> dataStream, KeySelector<T, K> keySelector, TypeInformation<K> typeInformation) {
        PartitionTransformation<T> forwarded =
                new PartitionTransformation<T>(dataStream.getTransformation(), new ForwardPartitioner<T>());
        return new KeyedStream<>(dataStream, forwarded, keySelector, typeInformation);
    }

    /** Not instantiable: static utility methods only. */
    private DataStreamUtils() {
    }
}
