/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.BatchWriteGeneratorTagOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.CommitterOperatorTest;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests {@link BatchWriteGeneratorTagOperator} by wrapping the committer operator produced by
 * {@link CommitterOperatorTest} and checking which tags exist after each {@code finish()} call.
 */
public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest {

    /** Prefix this test expects on the tag that appears after {@code finish()}. */
    private static final String BATCH_TAG_PREFIX = "batch-write-";

    /** Day-granularity pattern for the date suffix of the expected tag name. */
    private static final DateTimeFormatter TAG_DATE_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /**
     * Commits a row, finishes the operator and expects exactly one tag pointing at the latest
     * snapshot; then adds two manual tags, commits and finishes again, and expects only
     * {@code many-tags-test2} and the batch tag to remain (the table is configured with
     * {@code tag.num-retained-max = 2}).
     *
     * @throws Exception if writing, committing, the operator, or closing a resource fails
     */
    @Test
    public void testBatchWriteGeneratorTag() throws Exception {
        HashMap<String, String> dynamicOptions = new HashMap<>();
        dynamicOptions.put("tag.automatic-creation", "batch");
        dynamicOptions.put("tag.num-retained-max", "2");
        FileStoreTable table = createFileStoreTable().copy(dynamicOptions);

        // The writer and both commits are AutoCloseable; try-with-resources releases them even
        // when an assertion fails. Creation order is kept the same as before, and everything is
        // closed only at the very end of the test.
        try (StreamTableWrite write =
                table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite()) {
            // NOTE(review): the operator itself is still never closed. Only open() and finish()
            // are invoked on it in this test; confirm close() is safe in that state before
            // adding it.
            OneInputStreamOperator<Committable, Committable> committerOperator =
                    createCommitterOperator(
                            table,
                            initialCommitUser,
                            new RestoreAndFailCommittableStateManager<>(
                                    () ->
                                            new VersionedSerializerWrapper<>(
                                                    new ManifestCommittableSerializer())));
            committerOperator.open();

            try (TableCommitImpl firstCommit = table.newCommit(initialCommitUser)) {
                write.write(GenericRow.of(1, 10L));
                firstCommit.commit(write.prepareCommit(false, 1L));

                SnapshotManager snapshotManager = table.newSnapshotReader().snapshotManager();
                TagManager tagManager = table.tagManager();
                String tagName =
                        expectedTagName(
                                Objects.requireNonNull(
                                                snapshotManager.latestSnapshot(),
                                                "expected a snapshot after the first commit")
                                        .timeMillis());

                // No tag may exist before finish(); exactly one must exist afterwards.
                Assertions.assertThat(tagManager.tagCount()).isEqualTo(0L);
                committerOperator.finish();
                Assertions.assertThat(tagManager.tagCount()).isEqualTo(1L);
                Assertions.assertThat(tagManager.taggedSnapshot(tagName))
                        .isEqualTo(snapshotManager.latestSnapshot());

                table.createTag("many-tags-test1");
                // Presumably spaces out the creation times of the two manual tags so that
                // retention can order them -- TODO confirm against the tag expiry logic.
                Thread.sleep(1000L);
                table.createTag("many-tags-test2");
                Assertions.assertThat(tagManager.tagCount()).isEqualTo(3L);

                write.write(GenericRow.of(2, 20L));
                try (TableCommitImpl secondCommit = table.newCommit(initialCommitUser)) {
                    secondCommit.commit(write.prepareCommit(false, 2L));
                    committerOperator.finish();

                    Assertions.assertThat(tagManager.allTagNames())
                            .containsOnly("many-tags-test2", tagName);
                }
            }
        }
    }

    /**
     * Builds the tag name this test expects for a snapshot created at the given time:
     * {@code batch-write-} followed by the snapshot's date in the system default time zone.
     *
     * @param snapshotTimeMillis snapshot creation time in epoch milliseconds
     * @return the expected tag name
     */
    private static String expectedTagName(long snapshotTimeMillis) {
        LocalDateTime snapshotTime =
                LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(snapshotTimeMillis), ZoneId.systemDefault());
        return BATCH_TAG_PREFIX + snapshotTime.format(TAG_DATE_FORMAT);
    }

    /**
     * Wraps the committer operator created by the parent test class in a
     * {@link BatchWriteGeneratorTagOperator} bound to {@code table}.
     *
     * @param table table handed both to the parent factory and to the wrapping operator
     * @param commitUser commit user forwarded to the parent factory
     * @param committableStateManager state manager forwarded to the parent factory
     * @return the wrapping operator
     */
    @Override
    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(
            FileStoreTable table,
            String commitUser,
            CommittableStateManager<ManifestCommittable> committableStateManager) {
        // The type arguments of this cast cannot be verified at runtime; the cast to
        // CommitterOperator itself is the same one the code performed before.
        @SuppressWarnings("unchecked")
        CommitterOperator<Committable, ManifestCommittable> delegate =
                (CommitterOperator<Committable, ManifestCommittable>)
                        super.createCommitterOperator(table, commitUser, committableStateManager);
        return new BatchWriteGeneratorTagOperator<>(delegate, table);
    }
}

