package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.class */
public class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {
    private static final Logger LOG = LoggerFactory.getLogger(CreditBasedPartitionRequestClientHandler.class);

    /** Input channels registered with this handler, keyed by their input channel ID. */
    private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();

    /**
     * Outbound messages waiting to be written; filled by {@code userEventTriggered} and drained by
     * {@code writeAndFlushNextMessageIfPossible}.
     */
    private final ArrayDeque<ClientOutboundMessage> clientOutboundMessages = new ArrayDeque<>();

    /** First error reported for this channel, or {@code null} while no error has been recorded. */
    private final AtomicReference<Throwable> channelError = new AtomicReference<>();

    /** Listener attached to every outbound write issued by {@code writeAndFlushNextMessageIfPossible}. */
    private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();

    /**
     * IDs for which a cancel request has already been sent. The map is used as a set (key and
     * value are the same ID) so that {@code cancelRequestFor} sends at most one request per ID.
     */
    private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = new ConcurrentHashMap<>();

    /** Handler context captured in {@code channelActive}; {@code null} until then. */
    private volatile ChannelHandlerContext ctx;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler$AddCreditMessage.class */
    /** Outbound message that announces an input channel's accumulated credit to the producer. */
    private static class AddCreditMessage extends ClientOutboundMessage {

        /**
         * @param channel the input channel whose credit is announced; rejected by
         *     {@code Preconditions.checkNotNull} when {@code null}
         */
        AddCreditMessage(RemoteInputChannel channel) {
            super(Preconditions.checkNotNull(channel));
        }

        /**
         * Builds the {@code AddCredit} message. Reading the credit goes through
         * {@code getAndResetUnannouncedCredit()}, so building the message also resets the
         * channel's unannounced credit.
         */
        @Override
        public Object buildMessage() {
            return new NettyMessage.AddCredit(
                    inputChannel.getAndResetUnannouncedCredit(),
                    inputChannel.getInputChannelId());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler$ClientOutboundMessage.class */
    /**
     * Base type of the messages this handler queues and later writes to the channel. Each message
     * is tied to the input channel it is sent for.
     */
    public abstract static class ClientOutboundMessage {

        /** Input channel this message belongs to. */
        protected final RemoteInputChannel inputChannel;

        /**
         * @param channel the input channel to associate with the message; stored as given, without
         *     a null check in this constructor
         */
        ClientOutboundMessage(RemoteInputChannel channel) {
            inputChannel = channel;
        }

        /** Creates the object that is handed to the channel's {@code writeAndFlush}. */
        abstract Object buildMessage();
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler$ResumeConsumptionMessage.class */
    /** Outbound message asking the producer to resume consumption for an input channel. */
    private static class ResumeConsumptionMessage extends ClientOutboundMessage {

        /**
         * @param channel the input channel to resume; rejected by
         *     {@code Preconditions.checkNotNull} when {@code null}
         */
        ResumeConsumptionMessage(RemoteInputChannel channel) {
            super(Preconditions.checkNotNull(channel));
        }

        /** Builds a {@code ResumeConsumption} message carrying the channel's ID. */
        @Override
        Object buildMessage() {
            return new NettyMessage.ResumeConsumption(inputChannel.getInputChannelId());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler$WriteAndFlushNextMessageIfPossibleListener.class */
    /**
     * Write-completion listener: a successful write triggers the next queued message, while a
     * failed or cancelled write fails all registered input channels and closes the connection.
     */
    private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {

        /**
         * Reacts to the completion of a write.
         *
         * @param channelFuture the completed write future
         */
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            try {
                if (channelFuture.isSuccess()) {
                    writeAndFlushNextMessageIfPossible(channelFuture.channel());
                    return;
                }

                // A future that is not successful and has no cause is reported as a cancellation.
                final Throwable failure = channelFuture.cause();
                notifyAllChannelsOfErrorAndClose(
                        failure != null ? failure : new IllegalStateException("Sending cancelled by user."));
            } catch (Throwable unexpected) {
                // Anything thrown while handling the completion is itself reported as a channel error.
                notifyAllChannelsOfErrorAndClose(unexpected);
            }
        }
    }

    /**
     * Registers an input channel under its ID unless an entry for that ID already exists.
     *
     * @param remoteInputChannel the channel to register
     * @throws IOException if an error has already been recorded for this connection
     */
    @Override
    public void addInputChannel(RemoteInputChannel remoteInputChannel) throws IOException {
        // Refuse new registrations once the connection has failed.
        checkError();

        final InputChannelID channelId = remoteInputChannel.getInputChannelId();
        inputChannels.putIfAbsent(channelId, remoteInputChannel);
    }

    /**
     * Removes the registration stored under the given channel's ID, if any.
     *
     * @param remoteInputChannel the channel to unregister
     */
    @Override
    public void removeInputChannel(RemoteInputChannel remoteInputChannel) {
        final InputChannelID channelId = remoteInputChannel.getInputChannelId();
        inputChannels.remove(channelId);
    }

    /**
     * Looks up a registered input channel.
     *
     * @param inputChannelID the ID to look up
     * @return the channel registered under that ID, or {@code null} if there is none
     */
    @Override
    public RemoteInputChannel getInputChannel(InputChannelID inputChannelID) {
        final RemoteInputChannel registered = inputChannels.get(inputChannelID);
        return registered;
    }

    /**
     * Sends a {@code CancelPartitionRequest} for the given channel ID, at most once per ID.
     * Does nothing when the ID is {@code null} or no handler context has been captured yet.
     *
     * @param inputChannelID the ID of the channel whose request should be cancelled
     */
    @Override
    public void cancelRequestFor(InputChannelID inputChannelID) {
        if (inputChannelID == null || this.ctx == null) {
            return;
        }

        // putIfAbsent yields null only for the first caller with this ID, so only that caller sends.
        final boolean firstCancellation = this.cancelled.putIfAbsent(inputChannelID, inputChannelID) == null;
        if (firstCancellation) {
            this.ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelID));
        }
    }

    /**
     * Schedules an {@code AddCreditMessage} user event for the given channel on the context's
     * executor. The message object is created inside the scheduled task, not on the calling thread.
     *
     * @param remoteInputChannel the channel that has credit to announce
     */
    @Override
    public void notifyCreditAvailable(RemoteInputChannel remoteInputChannel) {
        this.ctx.executor().execute(
                () -> this.ctx.pipeline().fireUserEventTriggered(new AddCreditMessage(remoteInputChannel)));
    }

    /**
     * Schedules a {@code ResumeConsumptionMessage} user event for the given channel on the
     * context's executor. The message object is created inside the scheduled task.
     *
     * @param remoteInputChannel the channel whose consumption should be resumed
     */
    @Override
    public void resumeConsumption(RemoteInputChannel remoteInputChannel) {
        this.ctx.executor().execute(
                () -> this.ctx.pipeline().fireUserEventTriggered(new ResumeConsumptionMessage(remoteInputChannel)));
    }

    /**
     * Captures the handler context the first time the channel becomes active, then delegates to
     * the superclass.
     *
     * @param channelHandlerContext the context of the channel that became active
     */
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        final boolean contextNotYetCaptured = this.ctx == null;
        if (contextNotYetCaptured) {
            this.ctx = channelHandlerContext;
        }

        super.channelActive(channelHandlerContext);
    }

    /**
     * Handles the channel becoming inactive. If input channels are still registered, the closure
     * is reported to all of them as a {@code RemoteTransportException}; the superclass is invoked
     * in either case.
     *
     * @param channelHandlerContext the context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        final boolean hasRegisteredChannels = !this.inputChannels.isEmpty();

        if (hasRegisteredChannels) {
            final SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
            final String description = "Connection unexpectedly closed by remote task manager '" + remoteAddress + "'. This might indicate that the remote task manager was lost.";
            notifyAllChannelsOfErrorAndClose(new RemoteTransportException(description, remoteAddress));
        }

        super.channelInactive(channelHandlerContext);
    }

    /**
     * Reports an exception from the pipeline to all registered input channels and closes the
     * connection. A {@code TransportException} is forwarded unchanged. Any other throwable is
     * wrapped: as a {@code RemoteTransportException} when its message mentions
     * "Connection reset by peer", otherwise as a {@code LocalTransportException}.
     *
     * @param channelHandlerContext the context of the channel the exception occurred on
     * @param th the exception that was caught
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        if (th instanceof TransportException) {
            notifyAllChannelsOfErrorAndClose(th);
            return;
        }

        final SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
        final String message = th.getMessage();

        final Throwable wrapped;
        if (message != null && message.contains("Connection reset by peer")) {
            // Give the connection-reset case a more descriptive message.
            wrapped = new RemoteTransportException("Lost connection to task manager '" + remoteAddress + "'. This indicates that the remote task manager was lost.", remoteAddress, th);
        } else {
            wrapped = new LocalTransportException(String.format("%s (connection to '%s')", message, remoteAddress), channelHandlerContext.channel().localAddress(), th);
        }

        notifyAllChannelsOfErrorAndClose(wrapped);
    }

    /**
     * Decodes an inbound message. Any throwable escaping the decoding is reported to all
     * registered input channels and closes the connection instead of propagating.
     *
     * @param channelHandlerContext the context of the channel the message arrived on (unused)
     * @param obj the received message
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        try {
            decodeMsg(obj);
        } catch (Throwable decodingFailure) {
            notifyAllChannelsOfErrorAndClose(decodingFailure);
        }
    }

    /**
     * Queues {@code ClientOutboundMessage} events for writing and passes every other event on to
     * the next handler.
     *
     * @param channelHandlerContext the context the event was triggered on
     * @param obj the user event
     */
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof ClientOutboundMessage) {
            final boolean queueWasEmpty = this.clientOutboundMessages.isEmpty();
            this.clientOutboundMessages.add((ClientOutboundMessage) obj);

            // Only start a write when the queue was empty before this message was added; otherwise
            // this method leaves the queue untouched.
            if (queueWasEmpty) {
                writeAndFlushNextMessageIfPossible(channelHandlerContext.channel());
            }
        } else {
            channelHandlerContext.fireUserEventTriggered(obj);
        }
    }

    /**
     * Attempts to write the next queued message whenever the channel's writability changes.
     *
     * @param channelHandlerContext the context of the channel whose writability changed
     */
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        final Channel channel = channelHandlerContext.channel();
        writeAndFlushNextMessageIfPossible(channel);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the given error as the channel error and, if it is the first one recorded, reports
     * it to every registered input channel, clears the registered channels and the queued outbound
     * messages, and closes the handler context (when one has been captured).
     *
     * <p>Later calls are no-ops because only the first error wins the compare-and-set.
     *
     * <p>A failure while notifying one channel is logged and does not prevent the remaining
     * channels from being notified. Cleanup runs exactly once, in the {@code finally} block.
     *
     * @param th the error to report
     */
    public void notifyAllChannelsOfErrorAndClose(Throwable th) {
        if (!this.channelError.compareAndSet(null, th)) {
            return;
        }

        try {
            for (RemoteInputChannel inputChannel : this.inputChannels.values()) {
                try {
                    inputChannel.onError(th);
                } catch (Throwable notificationFailure) {
                    // Nothing more can be done here than logging; keep notifying the other channels.
                    LOG.warn("An Exception was thrown during error notification of a remote input channel.", notificationFailure);
                }
            }
        } finally {
            this.inputChannels.clear();
            this.clientOutboundMessages.clear();

            final ChannelHandlerContext context = this.ctx;
            if (context != null) {
                context.close();
            }
        }
    }

    /**
     * Throws the recorded channel error, if any.
     *
     * @throws IOException the recorded error itself when it is an {@code IOException}, otherwise a
     *     new {@code IOException} wrapping it
     */
    @VisibleForTesting
    void checkError() throws IOException {
        final Throwable recorded = this.channelError.get();
        if (recorded == null) {
            return;
        }

        if (recorded instanceof IOException) {
            throw (IOException) recorded;
        }
        throw new IOException("There has been an error in the channel.", recorded);
    }

    /**
     * Dispatches a received message by its exact class.
     *
     * <ul>
     *   <li>{@code BufferResponse}: handed to the receiving input channel. If that channel is
     *       unknown or released, the buffer is released and a cancel request is sent instead.
     *   <li>{@code ErrorResponse}: a fatal error fails all channels and closes the connection; a
     *       non-fatal error is forwarded to the receiving channel only, if it is still registered.
     * </ul>
     *
     * @param obj the received message
     * @throws IllegalStateException if the message is of any other class
     */
    private void decodeMsg(Object obj) throws Throwable {
        final Class<?> messageType = obj.getClass();

        if (messageType == NettyMessage.BufferResponse.class) {
            final NettyMessage.BufferResponse response = (NettyMessage.BufferResponse) obj;
            final RemoteInputChannel receiver = this.inputChannels.get(response.receiverId);

            if (receiver != null && !receiver.isReleased()) {
                try {
                    decodeBufferOrEvent(receiver, response);
                } catch (Throwable failure) {
                    // A failure while handing over data only fails the receiving channel.
                    receiver.onError(failure);
                }
            } else {
                // Nobody is left to consume the data: drop it and ask the producer to stop sending.
                response.releaseBuffer();
                cancelRequestFor(response.receiverId);
            }
        } else if (messageType == NettyMessage.ErrorResponse.class) {
            final NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) obj;
            final SocketAddress remoteAddress = this.ctx.channel().remoteAddress();

            if (error.isFatalError()) {
                notifyAllChannelsOfErrorAndClose(new RemoteTransportException("Fatal error at remote task manager '" + remoteAddress + "'.", remoteAddress, error.cause));
            } else {
                final RemoteInputChannel receiver = this.inputChannels.get(error.receiverId);
                if (receiver != null) {
                    if (error.cause.getClass() == PartitionNotFoundException.class) {
                        receiver.onFailedPartitionRequest();
                    } else {
                        receiver.onError(new RemoteTransportException("Error at remote task manager '" + remoteAddress + "'.", remoteAddress, error.cause));
                    }
                }
            }
        } else {
            throw new IllegalStateException("Received unknown message from producer: " + obj.getClass());
        }
    }

    /**
     * Hands the content of a buffer response to the given input channel. A response that is a
     * buffer of size zero is reported through {@code onEmptyBuffer}; any other response must
     * carry a non-null buffer, which is passed to {@code onBuffer}.
     *
     * @param remoteInputChannel the channel receiving the data
     * @param bufferResponse the response to deliver
     * @throws IllegalStateException if the response is not an empty buffer and has no buffer
     */
    private void decodeBufferOrEvent(RemoteInputChannel remoteInputChannel, NettyMessage.BufferResponse bufferResponse) throws Throwable {
        final boolean emptyBuffer = bufferResponse.isBuffer() && bufferResponse.bufferSize == 0;

        if (emptyBuffer) {
            remoteInputChannel.onEmptyBuffer(bufferResponse.sequenceNumber, bufferResponse.backlog);
        } else if (bufferResponse.getBuffer() != null) {
            remoteInputChannel.onBuffer(bufferResponse.getBuffer(), bufferResponse.sequenceNumber, bufferResponse.backlog);
        } else {
            throw new IllegalStateException("The read buffer is null in credit-based input channel.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the next queued outbound message whose input channel has not been released. Queued
     * messages for released channels are discarded along the way. Nothing is written when an
     * error has been recorded, the channel is not writable, or the queue runs empty.
     *
     * <p>At most one message is written per call; the attached write listener is what requests
     * the next one.
     *
     * @param channel the channel to write to
     */
    public void writeAndFlushNextMessageIfPossible(Channel channel) {
        if (this.channelError.get() != null || !channel.isWritable()) {
            return;
        }

        ClientOutboundMessage candidate;
        while ((candidate = this.clientOutboundMessages.poll()) != null) {
            if (!candidate.inputChannel.isReleased()) {
                channel.writeAndFlush(candidate.buildMessage()).addListener(this.writeListener);
                return;
            }
            // Released channel: drop the message and try the next one.
        }
    }
}
