/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.enumerator.initializer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.util.Preconditions;

class SpecifiedOffsetsInitializer
implements OffsetsInitializer,
OffsetsInitializerValidator {
    private static final long serialVersionUID = 1649702397250402877L;
    private final Map<TopicPartition, Long> initialOffsets;
    private final OffsetResetStrategy offsetResetStrategy;

    SpecifiedOffsetsInitializer(Map<TopicPartition, Long> initialOffsets, OffsetResetStrategy offsetResetStrategy) {
        this.initialOffsets = Collections.unmodifiableMap(initialOffsets);
        this.offsetResetStrategy = offsetResetStrategy;
    }

    @Override
    public Map<TopicPartition, Long> getPartitionOffsets(Collection<TopicPartition> partitions, OffsetsInitializer.PartitionOffsetsRetriever partitionOffsetsRetriever) {
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        ArrayList<TopicPartition> toLookup = new ArrayList<TopicPartition>();
        for (TopicPartition tp : partitions) {
            Long offset = this.initialOffsets.get(tp);
            if (offset == null) {
                toLookup.add(tp);
                continue;
            }
            offsets.put(tp, offset);
        }
        if (!toLookup.isEmpty()) {
            Map<TopicPartition, Long> committedOffsets = partitionOffsetsRetriever.committedOffsets(toLookup);
            offsets.putAll(committedOffsets);
            toLookup.removeAll(committedOffsets.keySet());
            switch (this.offsetResetStrategy) {
                case EARLIEST: {
                    offsets.putAll(partitionOffsetsRetriever.beginningOffsets(toLookup));
                    break;
                }
                case LATEST: {
                    offsets.putAll(partitionOffsetsRetriever.endOffsets(toLookup));
                    break;
                }
                default: {
                    throw new IllegalStateException("Cannot find initial offsets for partitions: " + toLookup);
                }
            }
        }
        return offsets;
    }

    @Override
    public OffsetResetStrategy getAutoOffsetResetStrategy() {
        return this.offsetResetStrategy;
    }

    @Override
    public void validate(Properties kafkaSourceProperties) {
        this.initialOffsets.forEach((tp, offset) -> {
            if (offset == -3L) {
                Preconditions.checkState((boolean)kafkaSourceProperties.containsKey("group.id"), (Object)String.format("Property %s is required because partition %s is initialized with committed offset", "group.id", tp));
            }
        });
    }
}

