package org.apache.flink.runtime.state;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.TernaryBoolean;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.class */
/**
 * Tests for {@link IncrementalRemoteKeyedStateHandle}: discarding of private, shared and meta
 * state, (re-)registration of shared state with a {@link SharedStateRegistry}, checkpointed size
 * reporting, key-group intersection and size statistics collection.
 */
class IncrementalRemoteKeyedStateHandleTest {

    /**
     * Backend identifier used by both {@code create} overloads. Derived with an explicit charset so
     * that the UUID does not depend on the platform default encoding.
     */
    private static final UUID BACKEND_ID =
            UUID.nameUUIDFromBytes("test".getBytes(StandardCharsets.UTF_8));

    /** Discarding a handle that was never registered must discard private, shared and meta state. */
    @Test
    void testUnregisteredDiscarding() throws Exception {
        IncrementalRemoteKeyedStateHandle stateHandle = create(new Random(42L));

        stateHandle.discardState();

        assertDiscardRecorded(stateHandle.getPrivateState(), TernaryBoolean.TRUE);
        assertDiscardRecorded(stateHandle.getSharedState(), TernaryBoolean.TRUE);
        Mockito.verify(stateHandle.getMetaDataStateHandle()).discardState();
    }

    /**
     * Registers two handles built from the same seed with one registry and checks which parts of
     * their state are discarded after each handle is discarded and after the registry is asked to
     * unregister unused state.
     */
    @Test
    void testSharedStateDeRegistration() throws Exception {
        SharedStateRegistry registry = Mockito.spy(new SharedStateRegistryImpl());

        // Same seed for both handles, so both are built from identical random draws.
        IncrementalRemoteKeyedStateHandle first = create(new Random(42L));
        IncrementalRemoteKeyedStateHandle second = create(new Random(42L));

        // Nothing has been discarded before any interaction.
        assertDiscardRecorded(first.getSharedState(), TernaryBoolean.FALSE);
        assertDiscardRecorded(second.getSharedState(), TernaryBoolean.FALSE);

        first.registerSharedStates(registry, 0L);
        registry.checkpointCompleted(0L);
        second.registerSharedStates(registry, 0L);

        // Each shared handle is expected to have been registered twice (once per state handle).
        for (IncrementalKeyedStateHandle.HandleAndLocalPath shared : first.getSharedState()) {
            StreamStateHandle handle = shared.getHandle();
            Mockito.verify(registry, Mockito.times(2))
                    .registerReference(
                            SharedStateRegistryKey.forStreamStateHandle(handle), handle, 0L);
        }
        for (IncrementalKeyedStateHandle.HandleAndLocalPath shared : second.getSharedState()) {
            StreamStateHandle handle = shared.getHandle();
            Mockito.verify(registry, Mockito.times(2))
                    .registerReference(
                            SharedStateRegistryKey.forStreamStateHandle(handle), handle, 0L);
        }

        // Discarding the first handle must leave all shared state untouched ...
        first.discardState();
        assertDiscardRecorded(first.getSharedState(), TernaryBoolean.FALSE);
        assertDiscardRecorded(second.getSharedState(), TernaryBoolean.FALSE);

        // ... but must discard only the first handle's private and meta state.
        verifyDiscardCalls(first.getPrivateState(), 1);
        verifyDiscardCalls(second.getPrivateState(), 0);
        Mockito.verify(first.getMetaDataStateHandle(), Mockito.times(1)).discardState();
        Mockito.verify(second.getMetaDataStateHandle(), Mockito.times(0)).discardState();

        // Once the second handle is discarded and unused state is unregistered, shared state goes.
        second.discardState();
        registry.unregisterUnusedState(Long.MAX_VALUE);

        assertDiscardRecorded(first.getSharedState(), TernaryBoolean.TRUE);
        assertDiscardRecorded(second.getSharedState(), TernaryBoolean.TRUE);
        Mockito.verify(first.getMetaDataStateHandle(), Mockito.times(1)).discardState();
        Mockito.verify(second.getMetaDataStateHandle(), Mockito.times(1)).discardState();
    }

    /**
     * Checks that a handle cannot be registered twice with the same registry, that nothing can be
     * registered with a closed registry, and that a handle can still be registered with a new
     * registry afterwards.
     */
    @Test
    void testSharedStateReRegistration() throws Exception {
        SharedStateRegistry closedLaterRegistry = Mockito.spy(new SharedStateRegistryImpl());

        IncrementalRemoteKeyedStateHandle handle1 = create(new Random(1L));
        IncrementalRemoteKeyedStateHandle handle2 = create(new Random(2L));
        IncrementalRemoteKeyedStateHandle handle3 = create(new Random(3L));

        handle1.registerSharedStates(closedLaterRegistry, 0L);
        handle2.registerSharedStates(closedLaterRegistry, 0L);
        handle3.registerSharedStates(closedLaterRegistry, 0L);

        Assertions.assertThatThrownBy(() -> handle1.registerSharedStates(closedLaterRegistry, 0L))
                .withFailMessage("Should not be able to register twice with the same registry.")
                .isInstanceOf(IllegalStateException.class);

        // Discard one handle while the registry is still open.
        handle3.discardState();
        Mockito.verify(handle3.getMetaDataStateHandle(), Mockito.times(1)).discardState();

        closedLaterRegistry.close();

        Assertions.assertThatThrownBy(
                        () -> create(new Random(4L)).registerSharedStates(closedLaterRegistry, 0L))
                .withFailMessage("Should not be able to register new state to closed registry.")
                .isInstanceOf(IllegalStateException.class);

        // Discard another handle after the registry has been closed.
        handle2.discardState();
        Mockito.verify(handle2.getMetaDataStateHandle(), Mockito.times(1)).discardState();

        // handle1 has not been discarded so far.
        Mockito.verify(handle1.getMetaDataStateHandle(), Mockito.never()).discardState();

        // Registering handle1 with a different registry is expected to succeed.
        SharedStateRegistry freshRegistry = Mockito.spy(new SharedStateRegistryImpl());
        handle1.registerSharedStates(freshRegistry, 0L);
        handle1.discardState();
        Mockito.verify(handle1.getMetaDataStateHandle(), Mockito.times(1)).discardState();

        freshRegistry.unregisterUnusedState(1L);

        // Only handle1's shared state is expected to be discarded at this point.
        assertDiscardRecorded(handle1.getSharedState(), TernaryBoolean.TRUE);
        assertDiscardRecorded(handle2.getSharedState(), TernaryBoolean.FALSE);
        assertDiscardRecorded(handle3.getSharedState(), TernaryBoolean.FALSE);

        freshRegistry.close();
    }

    /**
     * The checkpointed size equals the state size when no explicit size is passed to the
     * constructor, and equals the explicit size otherwise.
     */
    @Test
    void testCheckpointedSize() {
        IncrementalRemoteKeyedStateHandle withoutExplicitSize = create(ThreadLocalRandom.current());
        Assertions.assertThat(withoutExplicitSize.getCheckpointedSize())
                .isEqualTo(withoutExplicitSize.getStateSize());

        IncrementalRemoteKeyedStateHandle withExplicitSize =
                create(ThreadLocalRandom.current(), 123L);
        Assertions.assertThat(withExplicitSize.getCheckpointedSize()).isEqualTo(123L);
    }

    /** An intersecting key-group range yields a handle of the same type with the same handle id. */
    @Test
    void testNonEmptyIntersection() {
        IncrementalRemoteKeyedStateHandle stateHandle = create(ThreadLocalRandom.current());

        KeyedStateHandle intersection = stateHandle.getIntersection(new KeyGroupRange(0, 3));

        Assertions.assertThat(intersection).isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
        Assertions.assertThat(intersection.getStateHandleId())
                .isEqualTo(stateHandle.getStateHandleId());
    }

    /** The collected size stats attribute the complete state size to {@code LOCAL_MEMORY}. */
    @Test
    void testCollectSizeStats() {
        IncrementalRemoteKeyedStateHandle stateHandle = create(ThreadLocalRandom.current());
        StateObject.StateObjectSizeStatsCollector collector =
                StateObject.StateObjectSizeStatsCollector.create();

        stateHandle.collectSizeStats(collector);

        org.junit.jupiter.api.Assertions.assertEquals(
                Collections.singletonMap(
                        StateObject.StateObjectLocation.LOCAL_MEMORY, stateHandle.getStateSize()),
                collector.getStats());
    }

    /**
     * Two handles for checkpoints 1 and 2 of the same backend register different stream handles
     * under the same local path ("1.sst"). After checkpoint 1 completes and the handle of
     * checkpoint 2 is discarded, no shared stream handle of either checkpoint may have been
     * discarded.
     */
    @Test
    void testConcurrentCheckpointSharedStateRegistration() throws Exception {
        ByteStreamStateHandle fileOfCheckpoint1 = new ByteStreamStateHandle("file-1", new byte[] {115});
        ByteStreamStateHandle fileOfCheckpoint2 = new ByteStreamStateHandle("file-2", new byte[] {115});
        SharedStateRegistryImpl registry = new SharedStateRegistryImpl();
        UUID backendId = UUID.randomUUID();

        IncrementalRemoteKeyedStateHandle checkpoint1 =
                new IncrementalRemoteKeyedStateHandle(
                        backendId,
                        KeyGroupRange.of(0, 0),
                        1L,
                        placeSpies(
                                Collections.singletonList(
                                        IncrementalKeyedStateHandle.HandleAndLocalPath.of(
                                                fileOfCheckpoint1, "1.sst"))),
                        Collections.emptyList(),
                        new ByteStreamStateHandle("", new byte[] {115}));
        checkpoint1.registerSharedStates(registry, checkpoint1.getCheckpointId());

        IncrementalRemoteKeyedStateHandle checkpoint2 =
                new IncrementalRemoteKeyedStateHandle(
                        backendId,
                        KeyGroupRange.of(0, 0),
                        2L,
                        placeSpies(
                                Collections.singletonList(
                                        IncrementalKeyedStateHandle.HandleAndLocalPath.of(
                                                fileOfCheckpoint2, "1.sst"))),
                        Collections.emptyList(),
                        new ByteStreamStateHandle("", new byte[] {115}));
        checkpoint2.registerSharedStates(registry, checkpoint2.getCheckpointId());

        registry.checkpointCompleted(1L);
        checkpoint2.discardState();

        verifyDiscardCalls(checkpoint1.getSharedState(), 0);
        verifyDiscardCalls(checkpoint2.getSharedState(), 0);

        registry.close();
    }

    /**
     * Creates a handle for checkpoint 1 and key group range [0, 0] from random test data.
     *
     * <p>The randomly generated shared state is passed through as-is, whereas the private state
     * handles and the meta data handle are wrapped in Mockito spies.
     *
     * @param random source of randomness for the generated state handles
     * @return the newly created state handle
     */
    private static IncrementalRemoteKeyedStateHandle create(Random random) {
        // Argument evaluation order (shared, private, meta) fixes the order of random draws.
        return new IncrementalRemoteKeyedStateHandle(
                BACKEND_ID,
                KeyGroupRange.of(0, 0),
                1L,
                CheckpointTestUtils.createRandomHandleAndLocalPathList(random),
                placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(random)),
                Mockito.spy(CheckpointTestUtils.createDummyStreamStateHandle(random, null)));
    }

    /**
     * Same as {@link #create(Random)}, but additionally passes {@code j} as the last constructor
     * argument; {@code testCheckpointedSize} expects it to be reported as the checkpointed size.
     *
     * @param random source of randomness for the generated state handles
     * @param j value forwarded as the last constructor argument
     * @return the newly created state handle
     */
    private static IncrementalRemoteKeyedStateHandle create(Random random, long j) {
        // Argument evaluation order (shared, private, meta) fixes the order of random draws.
        return new IncrementalRemoteKeyedStateHandle(
                BACKEND_ID,
                KeyGroupRange.of(0, 0),
                1L,
                CheckpointTestUtils.createRandomHandleAndLocalPathList(random),
                placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(random)),
                Mockito.spy(CheckpointTestUtils.createDummyStreamStateHandle(random, null)),
                j);
    }

    /**
     * Returns a new list in which every stream handle is wrapped in a Mockito spy while the local
     * paths are kept unchanged.
     *
     * @param list the entries whose handles should be spied on
     * @return a new list of entries with spied handles, in the same order
     */
    private static List<IncrementalKeyedStateHandle.HandleAndLocalPath> placeSpies(
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> list) {
        return list.stream()
                .map(
                        entry ->
                                IncrementalKeyedStateHandle.HandleAndLocalPath.of(
                                        Mockito.spy(entry.getHandle()), entry.getLocalPath()))
                .collect(Collectors.toList());
    }

    /**
     * Calls {@code DiscardRecordedStateObject.verifyDiscard} with {@code expected} for the stream
     * handle of every given entry.
     */
    private static void assertDiscardRecorded(
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> entries, TernaryBoolean expected)
            throws Exception {
        for (IncrementalKeyedStateHandle.HandleAndLocalPath entry : entries) {
            DiscardRecordedStateObject.verifyDiscard(entry.getHandle(), expected);
        }
    }

    /**
     * Verifies via Mockito that {@code discardState()} was invoked exactly {@code expectedCalls}
     * times on the (spied) stream handle of every given entry.
     */
    private static void verifyDiscardCalls(
            List<IncrementalKeyedStateHandle.HandleAndLocalPath> entries, int expectedCalls)
            throws Exception {
        for (IncrementalKeyedStateHandle.HandleAndLocalPath entry : entries) {
            Mockito.verify(entry.getHandle(), Mockito.times(expectedCalls)).discardState();
        }
    }
}
