/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.interval;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
import org.apache.flink.table.runtime.operators.join.interval.EmitAwareCollector;
import org.apache.flink.table.runtime.operators.join.interval.IntervalJoinFunction;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keyed two-input process function that joins rows of a left and a right stream whose times lie
 * within a bounded interval of each other.
 *
 * <p>Rows of each side are buffered in keyed {@link MapState}, grouped by their time. Every
 * buffered row is stored together with a flag recording whether it has already produced a join
 * result; for outer joins, rows whose flag is still {@code false} when they are evicted are
 * emitted padded via {@link OuterJoinPaddingUtil}.
 *
 * <p>How time is obtained and how timers are registered is left to subclasses through the
 * abstract hooks at the bottom of this class.
 */
abstract class TimeIntervalJoin
        extends KeyedCoProcessFunction<RowData, RowData, RowData, RowData> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TimeIntervalJoin.class);

    private final FlinkJoinType joinType;
    /** Negated lower bound passed to the constructor ({@code -leftLowerBound}). */
    protected final long leftRelativeSize;
    /** Upper bound passed to the constructor ({@code leftUpperBound}). */
    protected final long rightRelativeSize;
    /** Half of {@code leftRelativeSize + rightRelativeSize}; added when scheduling clean-up timers. */
    private final long minCleanUpInterval;
    protected final long allowedLateness;
    private final InternalTypeInfo<RowData> leftType;
    private final InternalTypeInfo<RowData> rightType;
    private final IntervalJoinFunction joinFunction;

    private transient OuterJoinPaddingUtil paddingUtil;
    private transient EmitAwareCollector joinCollector;

    /** Buffered left rows: row time -> list of (row, "has produced a join result"). */
    private transient MapState<Long, List<Tuple2<RowData, Boolean>>> leftCache;
    /** Buffered right rows: row time -> list of (row, "has produced a join result"). */
    private transient MapState<Long, List<Tuple2<RowData, Boolean>>> rightCache;

    /**
     * Time of the pending clean-up timer for {@link #rightCache}. Despite its name it guards the
     * right cache: it is written when a right row is buffered and read in {@link #onTimer}
     * before the right cache is purged.
     */
    private transient ValueState<Long> leftTimerState;
    /** Time of the pending clean-up timer for {@link #leftCache} (see {@link #leftTimerState}). */
    private transient ValueState<Long> rightTimerState;

    /** Left rows with a time at or below this value are evicted. */
    private long leftExpirationTime = 0L;
    /** Right rows with a time at or below this value are evicted. */
    private long rightExpirationTime = 0L;
    protected long leftOperatorTime = 0L;
    protected long rightOperatorTime = 0L;

    /**
     * @param joinType join type, consulted for its left-outer / right-outer flags
     * @param leftLowerBound lower bound of the interval; stored negated as {@link #leftRelativeSize}
     * @param leftUpperBound upper bound of the interval; stored as {@link #rightRelativeSize}
     * @param allowedLateness extra time rows are retained before expiring; must be non-negative
     * @param leftType type information of left rows
     * @param rightType type information of right rows
     * @param joinFunc function that performs the actual join of one left and one right row
     * @throws IllegalArgumentException if {@code allowedLateness} is negative
     */
    TimeIntervalJoin(FlinkJoinType joinType, long leftLowerBound, long leftUpperBound, long allowedLateness, InternalTypeInfo<RowData> leftType, InternalTypeInfo<RowData> rightType, IntervalJoinFunction joinFunc) {
        this.joinType = joinType;
        this.leftRelativeSize = -leftLowerBound;
        this.rightRelativeSize = leftUpperBound;
        this.minCleanUpInterval = (this.leftRelativeSize + this.rightRelativeSize) / 2L;
        if (allowedLateness < 0L) {
            throw new IllegalArgumentException("The allowed lateness must be non-negative.");
        }
        this.allowedLateness = allowedLateness;
        this.leftType = leftType;
        this.rightType = rightType;
        this.joinFunction = joinFunc;
    }

    /** Opens the wrapped join function and creates the row caches, timer states and padding util. */
    @Override
    public void open(Configuration parameters) throws Exception {
        joinFunction.setRuntimeContext(getRuntimeContext());
        joinFunction.open(parameters);
        joinCollector = new EmitAwareCollector();

        // Row caches: time -> list of (row, emitted flag).
        ListTypeInfo<Tuple2<RowData, Boolean>> leftRowListTypeInfo =
                new ListTypeInfo<>(new TupleTypeInfo<>(leftType, BasicTypeInfo.BOOLEAN_TYPE_INFO));
        MapStateDescriptor<Long, List<Tuple2<RowData, Boolean>>> leftMapStateDescriptor =
                new MapStateDescriptor<>(
                        "IntervalJoinLeftCache", BasicTypeInfo.LONG_TYPE_INFO, leftRowListTypeInfo);
        leftCache = getRuntimeContext().getMapState(leftMapStateDescriptor);

        ListTypeInfo<Tuple2<RowData, Boolean>> rightRowListTypeInfo =
                new ListTypeInfo<>(new TupleTypeInfo<>(rightType, BasicTypeInfo.BOOLEAN_TYPE_INFO));
        MapStateDescriptor<Long, List<Tuple2<RowData, Boolean>>> rightMapStateDescriptor =
                new MapStateDescriptor<>(
                        "IntervalJoinRightCache", BasicTypeInfo.LONG_TYPE_INFO, rightRowListTypeInfo);
        rightCache = getRuntimeContext().getMapState(rightMapStateDescriptor);

        // Clean-up timer bookkeeping.
        ValueStateDescriptor<Long> leftValueStateDescriptor =
                new ValueStateDescriptor<>("IntervalJoinLeftTimerState", Long.class);
        leftTimerState = getRuntimeContext().getState(leftValueStateDescriptor);
        ValueStateDescriptor<Long> rightValueStateDescriptor =
                new ValueStateDescriptor<>("IntervalJoinRightTimerState", Long.class);
        rightTimerState = getRuntimeContext().getState(rightValueStateDescriptor);

        paddingUtil = new OuterJoinPaddingUtil(leftType.toRowSize(), rightType.toRowSize());
    }

    /** Closes the wrapped join function, if there is one. */
    @Override
    public void close() throws Exception {
        if (joinFunction != null) {
            joinFunction.close();
        }
    }

    /**
     * Handles a row from the left stream.
     *
     * <p>The row is joined against every buffered right row whose time lies in
     * {@code [t - rightRelativeSize, t + leftRelativeSize]}, where {@code t} is the time of the
     * left row. During the same pass, right rows at or below the refreshed right expiration time
     * are evicted (and, for right-outer joins, emitted padded if they never joined). Afterwards
     * the left row is either buffered for future right rows or, if it can no longer match and the
     * join is left-outer, emitted padded when it produced no result.
     */
    @Override
    public void processElement1(RowData leftRow, KeyedCoProcessFunction.Context ctx, Collector<RowData> out) throws Exception {
        joinFunction.setJoinKey((RowData) ctx.getCurrentKey());
        joinCollector.setInnerCollector(out);
        updateOperatorTime(ctx);

        long timeForLeftRow = getTimeForLeftStream(ctx, leftRow);
        long rightQualifiedLowerBound = timeForLeftRow - rightRelativeSize;
        long rightQualifiedUpperBound = timeForLeftRow + leftRelativeSize;
        boolean emitted = false;

        // The right cache is only scanned if it may still hold rows inside the qualified range.
        if (rightExpirationTime < rightQualifiedUpperBound) {
            rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize);
            Iterator<Map.Entry<Long, List<Tuple2<RowData, Boolean>>>> rightIterator = rightCache.iterator();
            while (rightIterator.hasNext()) {
                Map.Entry<Long, List<Tuple2<RowData, Boolean>>> rightEntry = rightIterator.next();
                long rightTime = rightEntry.getKey();

                if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
                    List<Tuple2<RowData, Boolean>> rightRows = rightEntry.getValue();
                    boolean entryUpdated = false;
                    for (Tuple2<RowData, Boolean> candidate : rightRows) {
                        joinCollector.reset();
                        joinFunction.join(leftRow, candidate.f0, joinCollector);
                        emitted = emitted || joinCollector.isEmitted();
                        // Remember that this right row has joined, so it is not padded later.
                        if (joinType.isRightOuter() && !candidate.f1 && joinCollector.isEmitted()) {
                            candidate.f1 = true;
                            entryUpdated = true;
                        }
                    }
                    if (entryUpdated) {
                        // Write the modified flags back to state.
                        rightEntry.setValue(rightRows);
                    }
                }

                if (rightTime <= rightExpirationTime) {
                    if (joinType.isRightOuter()) {
                        emitUnmatchedRows(rightEntry.getValue(), joinCollector, false);
                    }
                    rightIterator.remove();
                }
            }
        }

        if (rightOperatorTime < rightQualifiedUpperBound) {
            // Right rows matching this left row may still arrive: buffer it.
            List<Tuple2<RowData, Boolean>> leftRowList = leftCache.get(timeForLeftRow);
            if (leftRowList == null) {
                leftRowList = new ArrayList<>(1);
            }
            leftRowList.add(Tuple2.of(leftRow, emitted));
            leftCache.put(timeForLeftRow, leftRowList);
            if (rightTimerState.value() == null) {
                registerCleanUpTimer(ctx, timeForLeftRow, true);
            }
        } else if (!emitted && joinType.isLeftOuter()) {
            // The row is not buffered and found no partner: emit it padded.
            joinCollector.collect(paddingUtil.padLeft(leftRow));
        }
    }

    /**
     * Handles a row from the right stream; mirror image of {@link #processElement1}.
     *
     * <p>The row is joined against every buffered left row whose time lies in
     * {@code [t - leftRelativeSize, t + rightRelativeSize]}, expired left rows are evicted during
     * the same pass, and the right row is then buffered or (right-outer, no result) emitted padded.
     */
    @Override
    public void processElement2(RowData rightRow, KeyedCoProcessFunction.Context ctx, Collector<RowData> out) throws Exception {
        joinFunction.setJoinKey((RowData) ctx.getCurrentKey());
        joinCollector.setInnerCollector(out);
        updateOperatorTime(ctx);

        long timeForRightRow = getTimeForRightStream(ctx, rightRow);
        long leftQualifiedLowerBound = timeForRightRow - leftRelativeSize;
        long leftQualifiedUpperBound = timeForRightRow + rightRelativeSize;
        boolean emitted = false;

        // The left cache is only scanned if it may still hold rows inside the qualified range.
        if (leftExpirationTime < leftQualifiedUpperBound) {
            leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize);
            Iterator<Map.Entry<Long, List<Tuple2<RowData, Boolean>>>> leftIterator = leftCache.iterator();
            while (leftIterator.hasNext()) {
                Map.Entry<Long, List<Tuple2<RowData, Boolean>>> leftEntry = leftIterator.next();
                long leftTime = leftEntry.getKey();

                if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) {
                    List<Tuple2<RowData, Boolean>> leftRows = leftEntry.getValue();
                    boolean entryUpdated = false;
                    for (Tuple2<RowData, Boolean> candidate : leftRows) {
                        joinCollector.reset();
                        joinFunction.join(candidate.f0, rightRow, joinCollector);
                        emitted = emitted || joinCollector.isEmitted();
                        // Remember that this left row has joined, so it is not padded later.
                        if (joinType.isLeftOuter() && !candidate.f1 && joinCollector.isEmitted()) {
                            candidate.f1 = true;
                            entryUpdated = true;
                        }
                    }
                    if (entryUpdated) {
                        // Write the modified flags back to state.
                        leftEntry.setValue(leftRows);
                    }
                }

                if (leftTime <= leftExpirationTime) {
                    if (joinType.isLeftOuter()) {
                        emitUnmatchedRows(leftEntry.getValue(), joinCollector, true);
                    }
                    leftIterator.remove();
                }
            }
        }

        if (leftOperatorTime < leftQualifiedUpperBound) {
            // Left rows matching this right row may still arrive: buffer it.
            List<Tuple2<RowData, Boolean>> rightRowList = rightCache.get(timeForRightRow);
            if (rightRowList == null) {
                rightRowList = new ArrayList<>(1);
            }
            rightRowList.add(Tuple2.of(rightRow, emitted));
            rightCache.put(timeForRightRow, rightRowList);
            if (leftTimerState.value() == null) {
                registerCleanUpTimer(ctx, timeForRightRow, false);
            }
        } else if (!emitted && joinType.isRightOuter()) {
            // The row is not buffered and found no partner: emit it padded.
            joinCollector.collect(paddingUtil.padRight(rightRow));
        }
    }

    /**
     * Fires a clean-up pass. The timer is matched against the stored clean-up times: a match with
     * {@link #leftTimerState} purges the right cache, a match with {@link #rightTimerState}
     * purges the left cache. Both may match the same timestamp.
     */
    @Override
    public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<RowData> out) throws Exception {
        joinFunction.setJoinKey((RowData) ctx.getCurrentKey());
        joinCollector.setInnerCollector(out);
        updateOperatorTime(ctx);

        Long leftCleanUpTime = leftTimerState.value();
        if (leftCleanUpTime != null && timestamp == leftCleanUpTime) {
            rightExpirationTime = calExpirationTime(leftOperatorTime, rightRelativeSize);
            removeExpiredRows(joinCollector, rightExpirationTime, rightCache, leftTimerState, ctx, false);
        }

        Long rightCleanUpTime = rightTimerState.value();
        if (rightCleanUpTime != null && timestamp == rightCleanUpTime) {
            leftExpirationTime = calExpirationTime(rightOperatorTime, leftRelativeSize);
            removeExpiredRows(joinCollector, leftExpirationTime, leftCache, rightTimerState, ctx, true);
        }
    }

    /**
     * Computes the time at or below which buffered rows are considered expired.
     *
     * @return {@code operatorTime - relativeSize - allowedLateness - 1}, or
     *     {@link Long#MAX_VALUE} when {@code operatorTime} is already {@link Long#MAX_VALUE}
     */
    private long calExpirationTime(long operatorTime, long relativeSize) {
        if (operatorTime < Long.MAX_VALUE) {
            return operatorTime - relativeSize - allowedLateness - 1L;
        }
        return Long.MAX_VALUE;
    }

    /**
     * Registers a clean-up timer derived from {@code rowTime} and records its time in state.
     *
     * @param leftRow {@code true} to schedule clean-up of the left cache (recorded in
     *     {@link #rightTimerState}), {@code false} for the right cache (recorded in
     *     {@link #leftTimerState})
     */
    private void registerCleanUpTimer(KeyedCoProcessFunction.Context ctx, long rowTime, boolean leftRow) throws IOException {
        long relativeSize = leftRow ? leftRelativeSize : rightRelativeSize;
        long cleanUpTime = rowTime + relativeSize + minCleanUpInterval + allowedLateness + 1L;
        registerTimer(ctx, cleanUpTime);
        if (leftRow) {
            rightTimerState.update(cleanUpTime);
        } else {
            leftTimerState.update(cleanUpTime);
        }
    }

    /**
     * Evicts every entry of {@code rowCache} whose time is at or below {@code expirationTime},
     * emitting padded results for never-joined rows of outer joins. If rows remain afterwards a
     * new clean-up timer is registered for the earliest remaining time; otherwise the timer state
     * and the cache are cleared.
     *
     * @param removeLeft {@code true} if {@code rowCache} is the left cache, {@code false} if it is
     *     the right cache
     */
    private void removeExpiredRows(Collector<RowData> collector, long expirationTime, MapState<Long, List<Tuple2<RowData, Boolean>>> rowCache, ValueState<Long> timerState, KeyedCoProcessFunction.OnTimerContext ctx, boolean removeLeft) throws Exception {
        Iterator<Map.Entry<Long, List<Tuple2<RowData, Boolean>>>> iterator = rowCache.iterator();

        // Track "rows remain" separately from the minimum instead of using a sentinel value, so
        // that surviving rows with a time of zero or below are not mistaken for an empty cache
        // (which would clear them from state without ever emitting them).
        boolean hasRemainingRows = false;
        long earliestTimestamp = Long.MAX_VALUE;

        // Full pass: all expired entries are removed, there is no early exit.
        while (iterator.hasNext()) {
            Map.Entry<Long, List<Tuple2<RowData, Boolean>>> entry = iterator.next();
            long rowTime = entry.getKey();
            if (rowTime <= expirationTime) {
                if (removeLeft && joinType.isLeftOuter()) {
                    emitUnmatchedRows(entry.getValue(), collector, true);
                } else if (!removeLeft && joinType.isRightOuter()) {
                    emitUnmatchedRows(entry.getValue(), collector, false);
                }
                iterator.remove();
            } else {
                hasRemainingRows = true;
                earliestTimestamp = Math.min(earliestTimestamp, rowTime);
            }
        }

        if (hasRemainingRows) {
            // Rows are left in the cache: schedule the next clean-up for the earliest of them.
            registerCleanUpTimer(ctx, earliestTimestamp, removeLeft);
        } else {
            // Nothing left: drop the timer bookkeeping and the (now empty) cache.
            timerState.clear();
            rowCache.clear();
        }
    }

    /**
     * Emits, padded, every row in {@code rows} whose "has joined" flag is still {@code false}.
     *
     * @param leftRows {@code true} to pad with {@code padLeft}, {@code false} to pad with
     *     {@code padRight}
     */
    private void emitUnmatchedRows(List<Tuple2<RowData, Boolean>> rows, Collector<RowData> collector, boolean leftRows) {
        for (Tuple2<RowData, Boolean> row : rows) {
            if (row.f1) {
                continue;
            }
            if (leftRows) {
                collector.collect(paddingUtil.padLeft(row.f0));
            } else {
                collector.collect(paddingUtil.padRight(row.f0));
            }
        }
    }

    /**
     * Hook called at the start of every element and timer callback, before
     * {@link #leftOperatorTime} and {@link #rightOperatorTime} are read. Subclasses are presumably
     * expected to refresh those two fields here (implementations are not part of this file).
     */
    abstract void updateOperatorTime(KeyedCoProcessFunction.Context var1);

    /** Returns the time to use for the given left row. */
    abstract long getTimeForLeftStream(KeyedCoProcessFunction.Context var1, RowData var2);

    /** Returns the time to use for the given right row. */
    abstract long getTimeForRightStream(KeyedCoProcessFunction.Context var1, RowData var2);

    /** Registers a timer that should fire at the given clean-up time. */
    abstract void registerTimer(KeyedCoProcessFunction.Context var1, long var2);
}

