/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.sort.AbstractBinaryExternalMerger;
import org.apache.flink.table.runtime.operators.sort.ChannelReaderKVInputViewIterator;
import org.apache.flink.table.runtime.operators.sort.SpillChannelManager;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.util.MutableObjectIterator;

public class BinaryKVExternalMerger
extends AbstractBinaryExternalMerger<Tuple2<BinaryRowData, BinaryRowData>> {
    private final BinaryRowDataSerializer keySerializer;
    private final BinaryRowDataSerializer valueSerializer;
    private final RecordComparator comparator;

    public BinaryKVExternalMerger(IOManager ioManager, int pageSize, int maxFanIn, SpillChannelManager channelManager, BinaryRowDataSerializer keySerializer, BinaryRowDataSerializer valueSerializer, RecordComparator comparator, boolean compressionEnable, BlockCompressionFactory compressionCodecFactory, int compressionBlockSize) {
        super(ioManager, pageSize, maxFanIn, channelManager, compressionEnable, compressionCodecFactory, compressionBlockSize);
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.comparator = comparator;
    }

    @Override
    protected List<Tuple2<BinaryRowData, BinaryRowData>> mergeReusedEntries(int size) {
        ArrayList<Tuple2<BinaryRowData, BinaryRowData>> reused = new ArrayList<Tuple2<BinaryRowData, BinaryRowData>>(size);
        for (int i = 0; i < size; ++i) {
            reused.add(new Tuple2<BinaryRowData, BinaryRowData>(this.keySerializer.createInstance(), this.valueSerializer.createInstance()));
        }
        return reused;
    }

    @Override
    protected MutableObjectIterator<Tuple2<BinaryRowData, BinaryRowData>> channelReaderInputViewIterator(AbstractChannelReaderInputView inView) {
        return new ChannelReaderKVInputViewIterator<BinaryRowData, BinaryRowData>(inView, null, this.keySerializer.duplicate(), this.valueSerializer.duplicate());
    }

    @Override
    protected Comparator<Tuple2<BinaryRowData, BinaryRowData>> mergeComparator() {
        return (o1, o2) -> this.comparator.compare((RowData)o1.f0, (RowData)o2.f0);
    }

    @Override
    protected void writeMergingOutput(MutableObjectIterator<Tuple2<BinaryRowData, BinaryRowData>> mergeIterator, AbstractPagedOutputView output) throws IOException {
        Tuple2<BinaryRowData, BinaryRowData> kv = new Tuple2<BinaryRowData, BinaryRowData>(this.keySerializer.createInstance(), this.valueSerializer.createInstance());
        while ((kv = mergeIterator.next(kv)) != null) {
            this.keySerializer.serialize((BinaryRowData)kv.f0, (DataOutputView)output);
            this.valueSerializer.serialize((BinaryRowData)kv.f1, (DataOutputView)output);
        }
    }
}

