package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapReducingState.class */
/**
 * A {@link ReducingState} implementation that keeps its single value per namespace in a
 * {@link StateTable}. Every added element is folded into the stored value with the
 * {@link ReduceFunction} obtained from the {@link ReducingStateDescriptor}.
 *
 * @param <K> type parameter of the key serializer and of the backing state table
 * @param <N> type of the namespace used to address the state table
 * @param <V> type of both the added elements and the stored (reduced) value
 */
public class HeapReducingState<K, N, V>
        extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
        implements InternalReducingState<N, V> {

    /** Transformation passed to the state table by {@code add}; also reused by {@code mergeState}. */
    private final ReduceTransformation<V> reduceTransformation;

    /**
     * Adapts a {@link ReduceFunction} to the {@link StateTransformationFunction} interface so it
     * can be handed to {@code StateTable.transform}.
     *
     * @param <V> type of the values being reduced
     */
    static final class ReduceTransformation<V> implements StateTransformationFunction<V, V> {
        private final ReduceFunction<V> reduceFunction;

        /**
         * @param reduceFunction the function used to combine two values; must not be {@code null}
         */
        ReduceTransformation(ReduceFunction<V> reduceFunction) {
            // checkNotNull is generic and already yields ReduceFunction<V>; a raw cast here would
            // only introduce an unchecked conversion.
            this.reduceFunction = Preconditions.checkNotNull(reduceFunction);
        }

        /**
         * Combines the two arguments with the wrapped reduce function.
         *
         * @param previous the first operand; when {@code null} the reduce function is not invoked
         * @param value the second operand
         * @return {@code value} unchanged if {@code previous} is {@code null}, otherwise the result
         *     of {@code reduceFunction.reduce(previous, value)}
         * @throws Exception whatever the wrapped reduce function throws
         */
        @Override // org.apache.flink.runtime.state.StateTransformationFunction
        public V apply(V previous, V value) throws Exception {
            return previous != null ? reduceFunction.reduce(previous, value) : value;
        }
    }

    /**
     * Creates the state and wraps the descriptor's reduce function in a reusable transformation.
     *
     * @param reducingStateDescriptor descriptor supplying the {@link ReduceFunction}; also passed
     *     to the superclass
     * @param stateTable table that stores the values; passed to the superclass
     * @param typeSerializer serializer for {@code K} (presumably the key serializer); passed to
     *     the superclass
     * @param typeSerializer2 serializer for {@code N} (presumably the namespace serializer);
     *     passed to the superclass
     */
    public HeapReducingState(ReducingStateDescriptor<V> reducingStateDescriptor, StateTable<K, N, V> stateTable, TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2) {
        super(reducingStateDescriptor, stateTable, typeSerializer, typeSerializer2);
        this.reduceTransformation = new ReduceTransformation<>(reducingStateDescriptor.getReduceFunction());
    }

    /**
     * Looks up the value stored in the state table under the current namespace.
     *
     * @return whatever {@code stateTable.get(currentNamespace)} returns
     */
    @Override
    public V get() {
        return this.stateTable.get(this.currentNamespace);
    }

    /**
     * Folds {@code v} into the value stored under the current namespace.
     *
     * <p>Passing {@code null} does not reduce anything; it calls {@code clear()} instead.
     *
     * @param v the element to add, or {@code null} to clear
     * @throws IOException if the state table transformation (and thus the reduce function) throws;
     *     the original exception is attached as the cause
     */
    @Override
    public void add(V v) throws IOException {
        if (v == null) {
            clear();
            return;
        }
        try {
            this.stateTable.transform(this.currentNamespace, v, this.reduceTransformation);
        } catch (Exception e) {
            // ReduceFunction.reduce may throw any Exception; surface it as the declared IOException.
            throw new IOException("Exception while applying ReduceFunction in reducing state", e);
        }
    }

    /**
     * Merges two stored values by applying the same reduce transformation used by {@code add}.
     *
     * @param v the first value; if {@code null}, {@code v2} is returned as is
     * @param v2 the second value
     * @return the reduced value
     * @throws Exception whatever the reduce function throws
     */
    @Override // org.apache.flink.runtime.state.heap.AbstractHeapMergingState
    protected V mergeState(V v, V v2) throws Exception {
        return this.reduceTransformation.apply(v, v2);
    }
}
