/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle.ChronicleMQTailer;

/**
 * An {@link MQTailer} that aggregates several {@link ChronicleMQTailer} instances and reads
 * from them in round-robin order.
 *
 * <p>The set of assigned partitions is the concatenation of the assignments of every wrapped
 * tailer, captured once at construction time. Position and commit operations are delegated
 * to the wrapped tailers.
 *
 * <p>No synchronization is performed by this class.
 *
 * @param <M> the message type carried by the records
 */
public class ChronicleCompoundMQTailer<M extends Externalizable> implements MQTailer<M> {

    /** Upper bound, in milliseconds, on a single sleep between two polling rounds. */
    private static final long POLL_INTERVAL_MS = 100L;

    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final List<ChronicleMQTailer<M>> tailers = new ArrayList<ChronicleMQTailer<M>>();
    private final String group;
    private final int size;
    private final List<MQPartition> mqPartitions = new ArrayList<MQPartition>();
    private boolean closed = false;
    /** Monotonic read counter; its value modulo {@link #size} selects the next tailer to poll. */
    private long counter = 0L;

    /**
     * Creates a compound tailer over the given tailers.
     *
     * @param tailers the tailers to aggregate; they are copied into an internal list
     * @param group   the consumer group name reported by {@link #group()}
     */
    public ChronicleCompoundMQTailer(Collection<ChronicleMQTailer<M>> tailers, String group) {
        this.tailers.addAll(tailers);
        this.group = group;
        this.size = tailers.size();
        tailers.forEach(tailer -> this.mqPartitions.addAll(tailer.assignments()));
    }

    /**
     * Reads the next available record, polling the wrapped tailers until one is found or the
     * timeout elapses.
     *
     * <p>One polling round is always attempted before any waiting, so a zero or negative
     * timeout results in a single non-blocking attempt.
     *
     * @param timeout the maximum time to wait for a record
     * @return the record read, or {@code null} if none became available within the timeout
     * @throws InterruptedException if the thread is interrupted while sleeping between rounds
     */
    @Override
    public MQRecord<M> read(Duration timeout) throws InterruptedException {
        MQRecord<M> ret = this.read();
        if (ret != null) {
            return ret;
        }
        final long timeoutMs = timeout.toMillis();
        // Track elapsed time instead of an absolute deadline: "now + timeout" overflows for
        // very large timeouts, and nanoTime is unaffected by wall-clock adjustments.
        final long startNanos = System.nanoTime();
        while (ret == null) {
            long elapsedMs = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
            long remainingMs = timeoutMs - elapsedMs;
            if (remainingMs <= 0L) {
                break;
            }
            // Never sleep longer than what is left, so the call does not overshoot the timeout.
            Thread.sleep(Math.min(POLL_INTERVAL_MS, remainingMs));
            ret = this.read();
        }
        return ret;
    }

    /**
     * Performs one round-robin pass over the wrapped tailers, starting after the tailer that
     * was polled last.
     *
     * @return the first record found, or {@code null} if every tailer returned {@code null}
     *         or there are no tailers
     */
    private MQRecord<M> read() {
        if (this.size <= 0) {
            return null;
        }
        for (int attempt = 0; attempt < this.size; attempt++) {
            ++this.counter;
            // Take the modulo on the long value BEFORE narrowing to int: casting first would
            // yield a negative index once the counter exceeds Integer.MAX_VALUE.
            int index = (int) Math.floorMod(this.counter, (long) this.size);
            MQRecord<M> ret = this.tailers.get(index).read();
            if (ret != null) {
                return ret;
            }
        }
        return null;
    }

    /**
     * Commits the given partition on the first wrapped tailer whose assignments contain it.
     *
     * @param partition the partition to commit
     * @return the offset returned by the matching tailer
     * @throws IllegalArgumentException if no wrapped tailer is assigned to the partition
     */
    @Override
    public MQOffset commit(MQPartition partition) {
        for (ChronicleMQTailer<M> tailer : this.tailers) {
            if (tailer.assignments().contains(partition)) {
                return tailer.commit(partition);
            }
        }
        throw new IllegalArgumentException("No tailer matching: " + partition);
    }

    /** Calls {@code commit()} on every wrapped tailer. */
    @Override
    public void commit() {
        this.tailers.forEach(MQTailer::commit);
    }

    /** Calls {@code toEnd()} on every wrapped tailer. */
    @Override
    public void toEnd() {
        this.tailers.forEach(ChronicleMQTailer::toEnd);
    }

    /** Calls {@code toStart()} on every wrapped tailer. */
    @Override
    public void toStart() {
        this.tailers.forEach(ChronicleMQTailer::toStart);
    }

    /** Calls {@code toLastCommitted()} on every wrapped tailer. */
    @Override
    public void toLastCommitted() {
        this.tailers.forEach(ChronicleMQTailer::toLastCommitted);
    }

    /**
     * Returns the partitions collected from all wrapped tailers at construction time.
     *
     * @return the internal partition list (not a copy)
     */
    @Override
    public Collection<MQPartition> assignments() {
        return this.mqPartitions;
    }

    /**
     * @return the consumer group name given at construction time
     */
    @Override
    public String group() {
        return this.group;
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    @Override
    public boolean closed() {
        return this.closed;
    }

    /**
     * Delegates the seek to the first wrapped tailer whose assignments contain the partition.
     *
     * <p>If no wrapped tailer is assigned to the partition this method returns without doing
     * anything, unlike {@link #commit(MQPartition)} which throws.
     * NOTE(review): confirm whether the silent no-op is intended.
     *
     * @param partition the partition to reposition
     * @param offset    the offset to seek to
     */
    public void seek(MQPartition partition, MQOffset offset) {
        for (ChronicleMQTailer<M> tailer : this.tailers) {
            if (tailer.assignments().contains(partition)) {
                tailer.seek(partition, offset);
                return;
            }
        }
    }

    /**
     * Closes every wrapped tailer and marks this tailer as closed.
     *
     * <p>All tailers are closed even if some of them fail; the first failure is rethrown
     * after the loop with any subsequent failures attached as suppressed exceptions.
     *
     * @throws Exception the first exception raised while closing a wrapped tailer
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        for (ChronicleMQTailer<M> tailer : this.tailers) {
            try {
                tailer.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else if (e != failure) {
                    // Self-suppression is rejected by Throwable.addSuppressed.
                    failure.addSuppressed(e);
                }
            }
        }
        this.closed = true;
        if (failure != null) {
            throw failure;
        }
    }
}

