/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.util.List;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.connector.sink.PrepareCommitOperator;
import org.apache.flink.table.store.connector.sink.StoreSinkWrite;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.utils.OffsetRowData;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.util.Preconditions;

/**
 * Operator that triggers compaction on a {@link FileStoreTable} for every incoming record and
 * hands the resulting committables to the commit stage via {@link #prepareCommit}.
 *
 * <p>Each input record is read as: a {@code long} snapshot id at field 0, the partition columns
 * (viewed through an {@link OffsetRowData} created with offset argument 1), an {@code int}
 * bucket at field {@code partitionArity + 1}, and a serialized list of {@link DataFileMeta} at
 * field {@code partitionArity + 2}.
 *
 * <p>NOTE(review): this class does not release {@code write} itself; confirm whether
 * {@link StoreSinkWrite} needs explicit cleanup and whether the superclass takes care of it.
 */
public class StoreCompactOperator extends PrepareCommitOperator {
    private final FileStoreTable table;
    private final StoreSinkWrite.Provider storeSinkWriteProvider;
    private final boolean isStreaming;
    private transient StoreSinkWrite write;
    private transient RowDataSerializer partitionSerializer;
    private transient OffsetRowData reusedPartition;
    private transient DataFileMetaSerializer dataFileMetaSerializer;

    /**
     * Creates the operator.
     *
     * @param table table whose buckets are compacted; its options must not be write-only
     * @param storeSinkWriteProvider factory used in {@link #initializeState} to build the writer
     * @param isStreaming selects the streaming or the batch branch of {@link #processElement}
     * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument}, when
     *     the table options report write-only mode
     */
    public StoreCompactOperator(
            FileStoreTable table,
            StoreSinkWrite.Provider storeSinkWriteProvider,
            boolean isStreaming) {
        boolean compactionAllowed = !table.options().writeOnly();
        Preconditions.checkArgument(
                compactionAllowed,
                CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
        this.table = table;
        this.storeSinkWriteProvider = storeSinkWriteProvider;
        this.isStreaming = isStreaming;
    }

    /**
     * Runs the superclass state initialization, then builds the writer from the provider using
     * the table, the state context and the containing task's IO manager.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        write =
                storeSinkWriteProvider.provide(
                        table, context, getContainingTask().getEnvironment().getIOManager());
    }

    /**
     * Runs the superclass {@code open()}, then creates the per-task helpers: the partition
     * serializer, the reusable partition view and the data file meta serializer.
     */
    @Override
    public void open() throws Exception {
        super.open();
        partitionSerializer = new RowDataSerializer(table.schema().logicalPartitionType());
        reusedPartition = new OffsetRowData(partitionSerializer.getArity(), 1);
        dataFileMetaSerializer = new DataFileMetaSerializer();
    }

    /**
     * Decodes one compaction request and forwards it to the writer.
     *
     * <p>In streaming mode the writer is first told about the new files of the snapshot and is
     * then asked to compact the bucket with {@code false} as the last argument. In batch mode
     * the file list must be empty and the bucket is compacted with {@code true} as the last
     * argument (presumably "full compaction" - confirm against {@link StoreSinkWrite}).
     *
     * @param element record carrying snapshot id, partition, bucket and serialized files
     */
    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData row = element.getValue();
        int partitionArity = partitionSerializer.getArity();

        long snapshotId = row.getLong(0);

        // The reused view is re-pointed at the current row; copy() detaches the binary
        // partition from any buffer the serializer may reuse between calls.
        reusedPartition.replace(row);
        BinaryRowData partition = partitionSerializer.toBinaryRow(reusedPartition).copy();

        int bucket = row.getInt(partitionArity + 1);
        byte[] fileBytes = row.getBinary(partitionArity + 2);
        List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(fileBytes);

        if (!isStreaming) {
            Preconditions.checkArgument(
                    files.isEmpty(),
                    "Batch compact job does not concern what files are compacted. They only need to know what buckets are compacted.");
            write.compact(partition, bucket, true);
            return;
        }

        write.notifyNewFiles(snapshotId, partition, bucket, files);
        write.compact(partition, bucket, false);
    }

    /**
     * Delegates to the writer's {@code prepareCommit} with the same arguments.
     *
     * @param doCompaction forwarded unchanged to the writer
     * @param checkpointId forwarded unchanged to the writer
     * @return the committables produced by the writer
     */
    @Override
    protected List<Committable> prepareCommit(boolean doCompaction, long checkpointId)
            throws IOException {
        return write.prepareCommit(doCompaction, checkpointId);
    }
}

