package org.apache.flink.queryablestate.network;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/queryablestate/network/ServerConnection.class */
public final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> {
    private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);

    @GuardedBy("connectionLock")
    private InternalConnection<REQ, RESP> internalConnection;
    private final Object connectionLock = new Object();

    @GuardedBy("connectionLock")
    private boolean running = true;
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/queryablestate/network/ServerConnection$EstablishedConnection.class */
    public static class EstablishedConnection<REQ extends MessageBody, RESP extends MessageBody> implements ClientHandlerCallback<RESP>, InternalConnection<REQ, RESP> {
        private final Channel channel;
        private final KvStateRequestStats stats;
        private final Object lock = new Object();
        private final ConcurrentHashMap<Long, TimestampedCompletableFuture<RESP>> pendingRequests = new ConcurrentHashMap<>();
        private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        @GuardedBy("lock")
        private long requestCount = 0;

        @GuardedBy("lock")
        private boolean running = true;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/queryablestate/network/ServerConnection$EstablishedConnection$TimestampedCompletableFuture.class */
        public static final class TimestampedCompletableFuture<RESP extends MessageBody> extends CompletableFuture<RESP> {
            /** Timestamp handed to the constructor; the field name indicates nanoseconds. */
            private final long timestampInNanos;

            /**
             * Creates a not-yet-completed future tagged with a timestamp.
             *
             * @param j the timestamp to remember for this future
             */
            TimestampedCompletableFuture(long j) {
                super();
                timestampInNanos = j;
            }

            /**
             * @return the timestamp that was passed to the constructor, unchanged
             */
            public long getTimestamp() {
                return timestampInNanos;
            }
        }

        EstablishedConnection(String str, MessageSerializer<REQ, RESP> messageSerializer, Channel channel, KvStateRequestStats kvStateRequestStats) {
            this.channel = (Channel) Preconditions.checkNotNull(channel);
            channel.pipeline().addLast(str + " Handler", new ClientHandler(str, messageSerializer, this));
            this.stats = kvStateRequestStats;
            kvStateRequestStats.reportActiveConnection();
        }

        /**
         * Closes this connection, using a fresh {@link ClosedChannelException} as the cause that
         * pending requests and the close future are failed with.
         *
         * @return the close future of this connection
         */
        @Override
        public CompletableFuture<Void> close() {
            final ClosedChannelException closedOnRequest = new ClosedChannelException();
            return close(closedOnRequest);
        }

        /**
         * Closes the channel once and fails everything that is still pending.
         *
         * <p>Only the first call (while {@code running} is {@code true}) triggers the channel close;
         * later calls just return the close future. When the channel close finishes, the inactive
         * connection is reported, every pending request is removed and failed with {@code th}, and
         * the close future is completed exceptionally: with {@code th} if the channel closed
         * successfully, otherwise with the cause reported by the channel close itself.
         *
         * @param th the reason for closing
         * @return the close future of this connection
         */
        private CompletableFuture<Void> close(Throwable th) {
            synchronized (lock) {
                if (!running) {
                    return closeFuture;
                }
                running = false;

                channel.close().addListener(closeResult -> {
                    stats.reportInactiveConnection();

                    for (Long requestId : pendingRequests.keySet()) {
                        final TimestampedCompletableFuture<RESP> pending = pendingRequests.remove(requestId);
                        // Count a failure only if this call is the one that completed the promise.
                        if (pending != null && pending.completeExceptionally(th)) {
                            stats.reportFailedRequest();
                        }
                    }

                    if (!closeResult.isSuccess()) {
                        // A failure of the close operation itself takes precedence over th.
                        ServerConnection.LOG.warn("Something went wrong when trying to close connection due to : ", th);
                        closeFuture.completeExceptionally(closeResult.cause());
                    } else {
                        closeFuture.completeExceptionally(th);
                    }
                });
            }
            return closeFuture;
        }

        /**
         * Serializes the request and writes it to the channel, returning a future for the response.
         *
         * <p>This method was previously an undecompiled stub that always threw
         * {@link UnsupportedOperationException}. The body below is rebuilt from the decompiler's
         * instruction dump of the original method: under {@code lock}, if the connection is
         * running, a timestamped promise is created, {@code requestCount} is post-incremented to
         * obtain the request id (so ids start at 0), the promise is registered in
         * {@code pendingRequests}, the request is reported to the stats tracker, serialized and
         * written to the channel. Any exception thrown along the way completes the promise
         * exceptionally. If the connection is no longer running, an already failed future carrying
         * a {@link ClosedChannelException} is returned.
         *
         * @param r9 the request to send
         * @return a future completed with the response, or completed exceptionally on failure
         */
        @Override
        public CompletableFuture<RESP> sendRequest(REQ r9) {
            synchronized (lock) {
                if (!running) {
                    return FutureUtils.getFailedFuture(new ClosedChannelException());
                }

                final TimestampedCompletableFuture<RESP> promise = new TimestampedCompletableFuture<>(System.nanoTime());
                try {
                    final long requestId = requestCount++;
                    pendingRequests.put(requestId, promise);
                    stats.reportRequest();

                    final Object serialized = MessageSerializer.serializeRequest(channel.alloc(), requestId, r9);

                    // NOTE(review): the dump only shows that the listener captures this connection
                    // and the request id; its body (lambda$sendRequest$1) was not recoverable.
                    // It is reconstructed here to fail the pending promise when the write did not
                    // succeed, mirroring how close(Throwable) fails promises - confirm against
                    // the original bytecode.
                    channel.writeAndFlush(serialized).addListener(writeResult -> {
                        if (!writeResult.isSuccess()) {
                            final TimestampedCompletableFuture<RESP> pending = pendingRequests.remove(requestId);
                            if (pending != null && pending.completeExceptionally(writeResult.cause())) {
                                stats.reportFailedRequest();
                            }
                        }
                    });
                } catch (Throwable t) {
                    // NOTE(review): the dump does not show the caught type; Throwable is assumed.
                    promise.completeExceptionally(t);
                }
                return promise;
            }
        }

        /**
         * Not supported on this implementation: it always throws.
         *
         * @param channelFuture ignored
         * @return never returns normally
         * @throws IllegalStateException always
         */
        @Override
        public InternalConnection<REQ, RESP> establishConnection(ChannelFuture channelFuture) {
            final String reason = "The connection is already established.";
            throw new IllegalStateException(reason);
        }

        /**
         * @return always {@code true} for this implementation
         */
        @Override
        public boolean isEstablished() {
            return true;
        }

        /**
         * @return the future that {@code close(Throwable)} completes once the channel close has
         *     finished
         */
        @Override
        public CompletableFuture<Void> getCloseFuture() {
            return closeFuture;
        }

        /**
         * Completes the pending request with the given id, if there is one that is not yet done,
         * and reports its duration (elapsed nanoseconds divided by 1,000,000) as a successful
         * request.
         *
         * @param j id of the request the response belongs to
         * @param resp the response to complete the pending future with
         */
        @Override
        public void onRequestResult(long j, RESP resp) {
            final TimestampedCompletableFuture<RESP> pending = pendingRequests.remove(Long.valueOf(j));
            if (pending != null && !pending.isDone()) {
                final long elapsedNanos = System.nanoTime() - pending.getTimestamp();
                stats.reportSuccessfulRequest(elapsedNanos / 1000000);
                pending.complete(resp);
            }
        }

        /**
         * Fails the pending request with the given id, if there is one that is not yet done, and
         * reports a failed request.
         *
         * @param j id of the request that failed
         * @param th the failure to complete the pending future with
         */
        @Override
        public void onRequestFailure(long j, Throwable th) {
            final TimestampedCompletableFuture<RESP> pending = pendingRequests.remove(Long.valueOf(j));
            if (pending != null && !pending.isDone()) {
                stats.reportFailedRequest();
                pending.completeExceptionally(th);
            }
        }

        /**
         * Handles a failure that is not tied to a single request by closing the whole connection
         * with that failure as the cause. The returned close future is not used here.
         *
         * @param th the failure that occurred
         */
        @Override
        public void onFailure(Throwable th) {
            this.close(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/queryablestate/network/ServerConnection$InternalConnection.class */
    public interface InternalConnection<REQ, RESP> {

        /**
         * Hands a request to this connection.
         *
         * @param req the request
         * @return a future for the response
         */
        CompletableFuture<RESP> sendRequest(REQ req);

        /**
         * Reacts to the outcome of a channel connect attempt.
         *
         * @param channelFuture the finished connect attempt
         * @return the connection object that should be used from now on
         */
        InternalConnection<REQ, RESP> establishConnection(ChannelFuture channelFuture);

        /**
         * @return whether this object represents an established connection
         */
        boolean isEstablished();

        /**
         * Closes this connection.
         *
         * @return a future tied to the close of this connection
         */
        CompletableFuture<Void> close();

        /**
         * @return the future tied to the close of this connection
         */
        CompletableFuture<Void> getCloseFuture();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/queryablestate/network/ServerConnection$PendingConnection.class */
    public static final class PendingConnection<REQ extends MessageBody, RESP extends MessageBody> implements InternalConnection<REQ, RESP> {
        private final String clientName;
        private final MessageSerializer<REQ, RESP> serializer;
        private final KvStateRequestStats stats;
        private final CompletableFuture<Void> closeFuture;
        private final ArrayDeque<PendingRequest<REQ, RESP>> queuedRequests;

        @Nullable
        private Throwable failureCause;
        private boolean running;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/queryablestate/network/ServerConnection$PendingConnection$PendingRequest.class */
        public static final class PendingRequest<REQ extends MessageBody, RESP extends MessageBody> extends CompletableFuture<RESP> {
            /** The request this future stands for. */
            private final REQ request;

            /**
             * @param req the request to keep alongside this future
             */
            private PendingRequest(REQ req) {
                super();
                request = req;
            }

            /**
             * @return the request passed to the constructor
             */
            public REQ getRequest() {
                return request;
            }
        }

        /**
         * Creates a running, not yet failed pending connection with an empty request queue.
         *
         * @param str client name, later passed on when the established connection is created
         * @param messageSerializer serializer, later passed on to the established connection
         * @param kvStateRequestStats statistics tracker, later passed on to the established connection
         */
        private PendingConnection(String str, MessageSerializer<REQ, RESP> messageSerializer, KvStateRequestStats kvStateRequestStats) {
            // Collaborators received from the caller.
            clientName = str;
            serializer = messageSerializer;
            stats = kvStateRequestStats;

            // Initial state.
            queuedRequests = new ArrayDeque<>();
            closeFuture = new CompletableFuture<>();
            failureCause = null;
            running = true;
        }

        @Override // org.apache.flink.queryablestate.network.ServerConnection.InternalConnection
        public CompletableFuture<RESP> sendRequest(REQ req) {
            if (this.failureCause != null) {
                return FutureUtils.getFailedFuture(this.failureCause);
            }
            if (!this.running) {
                return FutureUtils.getFailedFuture(new ClosedChannelException());
            }
            PendingRequest<REQ, RESP> pendingRequest = new PendingRequest<>(req);
            this.queuedRequests.add(pendingRequest);
            return pendingRequest;
        }

        @Override // org.apache.flink.queryablestate.network.ServerConnection.InternalConnection
        public InternalConnection<REQ, RESP> establishConnection(ChannelFuture channelFuture) {
            if (channelFuture.isSuccess()) {
                return createEstablishedConnection(channelFuture.channel());
            }
            close(channelFuture.cause());
            return this;
        }

        @Override // org.apache.flink.queryablestate.network.ServerConnection.InternalConnection
        public boolean isEstablished() {
            return false;
        }

        @Override // org.apache.flink.queryablestate.network.ServerConnection.InternalConnection
        public CompletableFuture<Void> getCloseFuture() {
            return this.closeFuture;
        }

        /**
         * Creates the established connection for a successfully connected channel and replays all
         * queued requests on it.
         *
         * <p>If this pending connection has already failed or been closed, the channel is closed
         * and this object is returned instead. Otherwise each queued request is sent through the
         * new connection, and the outcome of that send (response or failure) is forwarded to the
         * future that was handed out when the request was queued.
         *
         * @param channel the connected channel
         * @return the new established connection, or this object if it was already closed/failed
         */
        private InternalConnection<REQ, RESP> createEstablishedConnection(Channel channel) {
            if (failureCause != null || !running) {
                channel.close();
                return this;
            }

            // Fully parameterized: the previous raw type forced a bogus cast of the request and
            // left the completion callback without usable types.
            final EstablishedConnection<REQ, RESP> establishedConnection =
                    new EstablishedConnection<>(clientName, serializer, channel, stats);

            while (!queuedRequests.isEmpty()) {
                final PendingRequest<REQ, RESP> pending = queuedRequests.poll();
                establishedConnection.sendRequest(pending.getRequest()).whenComplete((response, failure) -> {
                    if (failure != null) {
                        pending.completeExceptionally(failure);
                    } else {
                        pending.complete(response);
                    }
                });
            }
            return establishedConnection;
        }

        @Override // org.apache.flink.queryablestate.network.ServerConnection.InternalConnection
        public CompletableFuture<Void> close() {
            return close(new ClosedChannelException());
        }

        /**
         * Closes this pending connection once: records the cause, fails every queued request with
         * it, empties the queue and completes the close future exceptionally with the same cause.
         * Calls made after the first one only return the close future.
         *
         * @param th the reason for closing
         * @return the close future of this pending connection
         */
        private CompletableFuture<Void> close(Throwable th) {
            if (!running) {
                return closeFuture;
            }
            running = false;
            failureCause = th;

            for (PendingRequest<REQ, RESP> queued : queuedRequests) {
                queued.completeExceptionally(th);
            }
            queuedRequests.clear();

            closeFuture.completeExceptionally(th);
            return closeFuture;
        }
    }

    /**
     * Creates a server connection backed by the given internal connection and hooks this object's
     * close future up to the internal connection's close future.
     *
     * @param internalConnection the initial internal connection
     */
    private ServerConnection(InternalConnection<REQ, RESP> internalConnection) {
        super();
        this.internalConnection = internalConnection;
        this.forwardCloseFuture();
    }

    /**
     * Registers a callback on the current internal connection's close future that completes this
     * object's close future with the same outcome.
     *
     * <p>The callback only acts if, at the time it runs, the internal connection is still the one
     * that was current when this method was called; outcomes of a replaced connection are ignored.
     */
    @GuardedBy("connectionLock")
    private void forwardCloseFuture() {
        final InternalConnection<REQ, RESP> observed = internalConnection;
        observed.getCloseFuture().whenComplete((ignored, failure) -> {
            synchronized (connectionLock) {
                if (internalConnection != observed) {
                    return;
                }
                if (failure == null) {
                    closeFuture.complete(null);
                } else {
                    closeFuture.completeExceptionally(failure);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Passes the request to the current internal connection while holding the connection lock.
     *
     * @param req the request to send
     * @return the future returned by the internal connection
     * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when this
     *     connection is no longer running - confirm against that helper
     */
    public CompletableFuture<RESP> sendRequest(REQ req) {
        synchronized (connectionLock) {
            Preconditions.checkState(running, "Connection has already been closed.");
            return internalConnection.sendRequest(req);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Lets the current internal connection react to the finished connect attempt, adopts whatever
     * connection it returns, and re-registers close-future forwarding for the adopted connection.
     * All of this happens while holding the connection lock.
     *
     * @param channelFuture the finished connect attempt
     * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when this
     *     connection is no longer running - confirm against that helper
     */
    public void establishConnection(ChannelFuture channelFuture) {
        synchronized (connectionLock) {
            Preconditions.checkState(running, "Connection has already been closed.");
            final InternalConnection<REQ, RESP> next = internalConnection.establishConnection(channelFuture);
            internalConnection = next;
            forwardCloseFuture();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes this connection. The first call marks the connection as no longer running and closes
     * the current internal connection; later calls have no further effect.
     *
     * @return this object's close future
     */
    public CompletableFuture<Void> close() {
        synchronized (connectionLock) {
            if (!running) {
                return closeFuture;
            }
            running = false;
            internalConnection.close();
            return closeFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return this object's close future, which is completed by the callback that
     *     {@code forwardCloseFuture} registers on the internal connection
     */
    public CompletableFuture<Void> getCloseFuture() {
        return closeFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a server connection that starts out backed by a {@code PendingConnection}.
     *
     * @param str client name handed to the pending connection
     * @param messageSerializer serializer handed to the pending connection
     * @param kvStateRequestStats statistics tracker handed to the pending connection
     * @param <REQ> request type
     * @param <RESP> response type
     * @return a new server connection wrapping a new pending connection
     */
    public static <REQ extends MessageBody, RESP extends MessageBody> ServerConnection<REQ, RESP> createPendingConnection(String str, MessageSerializer<REQ, RESP> messageSerializer, KvStateRequestStats kvStateRequestStats) {
        // Parameterized instead of raw, so no unchecked conversion is needed.
        final PendingConnection<REQ, RESP> pendingConnection =
                new PendingConnection<>(str, messageSerializer, kvStateRequestStats);
        return new ServerConnection<>(pendingConnection);
    }
}
