/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.log.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.admin.AdminClient;
import kafka.admin.AdminUtils;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.coordinator.group.GroupOverview;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.nuxeo.lib.stream.log.LogPartition;
import scala.collection.JavaConversions;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.List;

public class KafkaUtils
implements AutoCloseable {
    public static final String DEFAULT_ZK_SERVER = "localhost:2181";
    public static final int ZK_TIMEOUT_MS = 6000;
    public static final int ZK_CONNECTION_TIMEOUT_MS = 10000;
    private static final Log log = LogFactory.getLog(KafkaUtils.class);
    protected final ZkClient zkClient;
    protected final ZkUtils zkUtils;

    public KafkaUtils() {
        this(DEFAULT_ZK_SERVER);
    }

    public KafkaUtils(String zkServers) {
        log.debug((Object)("Init zkServers: " + zkServers));
        this.zkClient = KafkaUtils.createZkClient(zkServers);
        this.zkUtils = KafkaUtils.createZkUtils(zkServers, this.zkClient);
    }

    public static boolean kafkaDetected() {
        return KafkaUtils.kafkaDetected(DEFAULT_ZK_SERVER);
    }

    public static boolean kafkaDetected(String zkServers) {
        try {
            ZkClient tmp = new ZkClient(zkServers, 1000, 1000, (ZkSerializer)ZKStringSerializer$.MODULE$);
            tmp.close();
        }
        catch (ZkTimeoutException e) {
            return false;
        }
        return true;
    }

    protected static ZkUtils createZkUtils(String zkServers, ZkClient zkClient) {
        return new ZkUtils(zkClient, new ZkConnection(zkServers), false);
    }

    protected static ZkClient createZkClient(String zkServers) {
        return new ZkClient(zkServers, 6000, 10000, (ZkSerializer)ZKStringSerializer$.MODULE$);
    }

    public static java.util.List<java.util.List<LogPartition>> rangeAssignments(int threads, java.util.Map<String, Integer> streams) {
        RangeAssignor assignor = new RangeAssignor();
        return KafkaUtils.assignments((PartitionAssignor)assignor, threads, streams);
    }

    public static java.util.List<java.util.List<LogPartition>> roundRobinAssignments(int threads, java.util.Map<String, Integer> streams) {
        RoundRobinAssignor assignor = new RoundRobinAssignor();
        return KafkaUtils.assignments((PartitionAssignor)assignor, threads, streams);
    }

    protected static java.util.List<java.util.List<LogPartition>> assignments(PartitionAssignor assignor, int threads, java.util.Map<String, Integer> streams) {
        ArrayList parts = new ArrayList();
        streams.forEach((streamName, size) -> parts.addAll(KafkaUtils.getPartsFor(streamName, size)));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        java.util.List streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        for (int i = 0; i < threads; ++i) {
            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
        }
        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(), Collections.emptySet());
        java.util.Map assignments = assignor.assign(cluster, subscriptions);
        ArrayList<java.util.List<LogPartition>> ret = new ArrayList<java.util.List<LogPartition>>(threads);
        for (int i = 0; i < threads; ++i) {
            ret.add(((PartitionAssignor.Assignment)assignments.get(String.valueOf(i))).partitions().stream().map(part -> new LogPartition(part.topic(), part.partition())).collect(Collectors.toList()));
        }
        return ret;
    }

    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        ArrayList<PartitionInfo> ret = new ArrayList<PartitionInfo>();
        for (int i = 0; i < partitions; ++i) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }

    public void createTopicWithoutReplication(Properties properties, String topic, int partitions) {
        this.createTopic(properties, topic, partitions, (short)1);
    }

    public void createTopic(Properties properties, String topic, int partitions, short replicationFactor) {
        log.info((Object)("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor));
        if (AdminUtils.topicExists((ZkUtils)this.zkUtils, (String)topic)) {
            String msg = "Cannot create Topic already exists: " + topic;
            log.error((Object)msg);
            throw new IllegalArgumentException(msg);
        }
        try (org.apache.kafka.clients.admin.AdminClient client = org.apache.kafka.clients.admin.AdminClient.create((Properties)properties);){
            client.createTopics(Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
        }
    }

    public boolean topicExists(String topic) {
        return AdminUtils.topicExists((ZkUtils)this.zkUtils, (String)topic);
    }

    public java.util.List<String> listTopics() {
        Seq topics = this.zkUtils.getAllTopics();
        return JavaConversions.seqAsJavaList((Seq)topics);
    }

    public java.util.List<String> listConsumers(Properties props, String topic) {
        return this.listAllConsumers(props).stream().filter(consumer -> this.getConsumerTopics(props, (String)consumer).contains(topic)).collect(Collectors.toList());
    }

    protected java.util.List<String> getConsumerTopics(Properties props, String group) {
        AdminClient client = AdminClient.create((Properties)props);
        return JavaConversions.mapAsJavaMap((Map)client.listGroupOffsets(group)).keySet().stream().map(TopicPartition::topic).collect(Collectors.toList());
    }

    public java.util.List<String> listAllConsumers(Properties props) {
        ArrayList<String> ret = new ArrayList<String>();
        AdminClient client = AdminClient.create((Properties)props);
        List groups = client.listAllConsumerGroupsFlattened();
        for (GroupOverview group : groups) {
            if (group == null) continue;
            ret.add(group.groupId());
        }
        return ret;
    }

    public void markTopicForDeletion(String topic) {
        log.debug((Object)("mark topic for deletion: " + topic));
        AdminUtils.deleteTopic((ZkUtils)this.zkUtils, (String)topic);
    }

    /*
     * Exception decompiling
     */
    public int getNumberOfPartitions(Properties properties, String topic) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [7[CATCHBLOCK]], but top level block is 4[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    public void resetConsumerStates(String topic) {
        log.debug((Object)"Resetting consumer states");
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK((ZkUtils)this.zkUtils, (String)topic);
    }

    public Set<String> getBrokerEndPoints() {
        HashSet<String> ret = new HashSet<String>();
        Seq brokers = this.zkUtils.getAllBrokersInCluster();
        for (Broker broker : brokers) {
            if (broker == null) continue;
            Seq endPoints = broker.endPoints();
            for (EndPoint endPoint : endPoints) {
                ret.add(endPoint.connectionString());
            }
        }
        return ret;
    }

    public String getDefaultBootstrapServers() {
        return this.getBrokerEndPoints().stream().collect(Collectors.joining(","));
    }

    @Override
    public void close() {
        if (this.zkUtils != null) {
            this.zkUtils.close();
        }
        if (this.zkClient != null) {
            this.zkClient.close();
        }
        log.debug((Object)"Closed.");
    }
}

