package org.apache.flink.streaming.api.operators.co;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.class */
public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>> implements TwoInputStreamOperator<IN1, IN2, OUT> {
    private static final long serialVersionUID = -1869740381935471752L;
    private long currentWatermark;
    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
    private transient TimestampedCollector<OUT> collector;
    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
    private transient CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>.ReadWriteContextImpl rwContext;
    private transient CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>.ReadOnlyContextImpl rContext;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator$ReadOnlyContextImpl.class */
    private class ReadOnlyContextImpl extends BroadcastProcessFunction<IN1, IN2, OUT>.ReadOnlyContext {
        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final ProcessingTimeService timerService;
        private StreamRecord<IN1> element;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        ReadOnlyContextImpl(ExecutionConfig executionConfig, BroadcastProcessFunction<IN1, IN2, OUT> broadcastProcessFunction, Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> map, ProcessingTimeService processingTimeService) {
            super();
            broadcastProcessFunction.getClass();
            this.config = (ExecutionConfig) Preconditions.checkNotNull(executionConfig);
            this.states = (Map) Preconditions.checkNotNull(map);
            this.timerService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        }

        void setElement(StreamRecord<IN1> streamRecord) {
            this.element = streamRecord;
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.BaseContext
        public Long timestamp() {
            Preconditions.checkState(this.element != null);
            if (this.element.hasTimestamp()) {
                return Long.valueOf(this.element.getTimestamp());
            }
            return null;
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.BaseContext
        public <X> void output(OutputTag<X> outputTag, X x) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null.");
            CoBroadcastWithNonKeyedOperator.this.output.collect(outputTag, new StreamRecord<>(x, this.element.getTimestamp()));
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.BaseContext
        public long currentProcessingTime() {
            return this.timerService.getCurrentProcessingTime();
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.BaseContext
        public long currentWatermark() {
            return CoBroadcastWithNonKeyedOperator.this.currentWatermark;
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.ReadOnlyContext
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> mapStateDescriptor) {
            Preconditions.checkNotNull(mapStateDescriptor);
            mapStateDescriptor.initializeSerializerUnlessSet(this.config);
            ReadOnlyBroadcastState<K, V> readOnlyBroadcastState = this.states.get(mapStateDescriptor);
            if (readOnlyBroadcastState == null) {
                throw new IllegalArgumentException("The requested state does not exist. Check for typos in your state descriptor, or specify the state descriptor in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return readOnlyBroadcastState;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator$ReadWriteContextImpl.class */
    private class ReadWriteContextImpl extends BroadcastProcessFunction.Context {
        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final ProcessingTimeService timerService;
        private StreamRecord<IN2> element;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        ReadWriteContextImpl(ExecutionConfig executionConfig, BroadcastProcessFunction<IN1, IN2, OUT> broadcastProcessFunction, Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> map, ProcessingTimeService processingTimeService) {
            super();
            broadcastProcessFunction.getClass();
            this.config = (ExecutionConfig) Preconditions.checkNotNull(executionConfig);
            this.states = (Map) Preconditions.checkNotNull(map);
            this.timerService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        }

        void setElement(StreamRecord<IN2> streamRecord) {
            this.element = streamRecord;
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.BaseContext
        public Long timestamp() {
            Preconditions.checkState(this.element != null);
            return Long.valueOf(this.element.getTimestamp());
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.Context
        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> mapStateDescriptor) {
            Preconditions.checkNotNull(mapStateDescriptor);
            mapStateDescriptor.initializeSerializerUnlessSet(this.config);
            BroadcastState<?, ?> broadcastState = this.states.get(mapStateDescriptor);
            if (broadcastState == null) {
                throw new IllegalArgumentException("The requested state does not exist. Check for typos in your state descriptor, or specify the state descriptor in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return broadcastState;
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.BaseContext
        public <X> void output(OutputTag<X> outputTag, X x) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null.");
            CoBroadcastWithNonKeyedOperator.this.output.collect(outputTag, new StreamRecord<>(x, this.element.getTimestamp()));
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.BaseContext
        public long currentProcessingTime() {
            return this.timerService.getCurrentProcessingTime();
        }

        @Override // org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction.BaseContext
        public long currentWatermark() {
            return CoBroadcastWithNonKeyedOperator.this.currentWatermark;
        }
    }

    public CoBroadcastWithNonKeyedOperator(BroadcastProcessFunction<IN1, IN2, OUT> broadcastProcessFunction, List<MapStateDescriptor<?, ?>> list) {
        super(broadcastProcessFunction);
        this.currentWatermark = Long.MIN_VALUE;
        this.broadcastStateDescriptors = (List) Preconditions.checkNotNull(list);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector<>(this.output);
        this.broadcastStates = new HashMap(this.broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> mapStateDescriptor : this.broadcastStateDescriptors) {
            this.broadcastStates.put(mapStateDescriptor, getOperatorStateBackend().getBroadcastState(mapStateDescriptor));
        }
        this.rwContext = new ReadWriteContextImpl(getExecutionConfig(), this.userFunction, this.broadcastStates, getProcessingTimeService());
        this.rContext = new ReadOnlyContextImpl(getExecutionConfig(), this.userFunction, this.broadcastStates, getProcessingTimeService());
    }

    @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
    public void processElement1(StreamRecord<IN1> streamRecord) throws Exception {
        this.collector.setTimestamp(streamRecord);
        this.rContext.setElement(streamRecord);
        this.userFunction.processElement(streamRecord.getValue(), this.rContext, this.collector);
        this.rContext.setElement(null);
    }

    @Override // org.apache.flink.streaming.api.operators.TwoInputStreamOperator
    public void processElement2(StreamRecord<IN2> streamRecord) throws Exception {
        this.collector.setTimestamp(streamRecord);
        this.rwContext.setElement(streamRecord);
        this.userFunction.processBroadcastElement(streamRecord.getValue(), this.rwContext, this.collector);
        this.rwContext.setElement(null);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
    }
}
