package uk.ac.susx.mlcl.byblo.tasks;

import com.google.common.base.Objects;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import uk.ac.susx.mlcl.byblo.io.TokenPair;
import uk.ac.susx.mlcl.byblo.io.Weighted;
import uk.ac.susx.mlcl.byblo.weighings.impl.Step;
import uk.ac.susx.mlcl.lib.MiscUtil;
import uk.ac.susx.mlcl.lib.collect.Indexed;
import uk.ac.susx.mlcl.lib.collect.SparseDoubleVector;
import uk.ac.susx.mlcl.lib.events.ProgressReporting;
import uk.ac.susx.mlcl.lib.io.Chunk;
import uk.ac.susx.mlcl.lib.io.Chunker;
import uk.ac.susx.mlcl.lib.io.ObjectSink;
import uk.ac.susx.mlcl.lib.io.SeekableObjectSource;
import uk.ac.susx.mlcl.lib.tasks.Task;

/* loaded from: input_file:uk/ac/susx/mlcl/byblo/tasks/ThreadedApssTask.class */
public final class ThreadedApssTask<S> extends NaiveApssTask<S> {
    private static final Log LOG;
    private Class<? extends NaiveApssTask> innerAlgorithm;
    private static final int DEFAULT_NUM_THREADS;
    private int nThreads;
    private ExecutorService executor;
    private Queue<Future<? extends Task>> futureQueue;
    private Semaphore throttle;
    private long nChunks;
    private long queuedCount;
    private long completedCount;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ThreadedApssTask(SeekableObjectSource<Indexed<SparseDoubleVector>, S> seekableObjectSource, SeekableObjectSource<Indexed<SparseDoubleVector>, S> seekableObjectSource2, ObjectSink<Weighted<TokenPair>> objectSink) {
        super(seekableObjectSource, seekableObjectSource2, objectSink);
        this.innerAlgorithm = InvertedApssTask.class;
        this.nThreads = DEFAULT_NUM_THREADS;
        this.executor = null;
        this.futureQueue = new ArrayDeque();
        this.nChunks = 0L;
        this.queuedCount = 0L;
        this.completedCount = 0L;
        setNumThreads(DEFAULT_NUM_THREADS);
    }

    public ThreadedApssTask() {
        this.innerAlgorithm = InvertedApssTask.class;
        this.nThreads = DEFAULT_NUM_THREADS;
        this.executor = null;
        this.futureQueue = new ArrayDeque();
        this.nChunks = 0L;
        this.queuedCount = 0L;
        this.completedCount = 0L;
    }

    @Override // uk.ac.susx.mlcl.byblo.tasks.NaiveApssTask
    protected void buildPreCalcs() throws IOException {
    }

    public Class<? extends NaiveApssTask> getInnerAlgorithm() {
        return this.innerAlgorithm;
    }

    public void setInnerAlgorithm(Class<? extends NaiveApssTask> cls) {
        this.innerAlgorithm = cls;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // uk.ac.susx.mlcl.byblo.tasks.NaiveApssTask, uk.ac.susx.mlcl.lib.tasks.AbstractTask
    public void initialiseTask() throws Exception {
        super.initialiseTask();
        this.executor = new ThreadPoolExecutor(this.nThreads, this.nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        this.futureQueue = new ArrayDeque();
        this.throttle = new Semaphore(getThrottleSize());
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // uk.ac.susx.mlcl.byblo.tasks.NaiveApssTask, uk.ac.susx.mlcl.lib.tasks.AbstractTask
    protected void runTask() throws Exception {
        this.progress.startAdjusting();
        this.progress.setState(ProgressReporting.State.RUNNING);
        this.progress.setMessage("Reading threaded all-pairs.");
        this.progress.endAdjusting();
        int estimateChunkSize = estimateChunkSize();
        if (LOG.isInfoEnabled()) {
            LOG.info("Chunk-size estimated as: " + estimateChunkSize + " vectors per work unit.");
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Initialising chunker A.");
        }
        SeekableObjectSource newSeekableInstance = Chunker.newSeekableInstance(getSourceA(), estimateChunkSize);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Initialising chunker B.");
        }
        SeekableObjectSource newSeekableInstance2 = Chunker.newSeekableInstance(getSourceB(), estimateChunkSize);
        long j = 0;
        while (newSeekableInstance.hasNext()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Reading chunk A" + j);
            }
            Chunk chunk = (Chunk) newSeekableInstance.read();
            j++;
            chunk.setName(Long.toString(j));
            long j2 = 0;
            P position = newSeekableInstance2.position();
            while (newSeekableInstance2.hasNext()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Reading chunk B" + j2);
                }
                Chunk chunk2 = (Chunk) newSeekableInstance2.read();
                j2++;
                chunk2.setName(Long.toString(j2));
                this.progress.startAdjusting();
                this.progress.setMessage(MessageFormat.format("Queueing chunk pair {0,number} and {1,number}", Long.valueOf(j), Long.valueOf(j2)));
                updateProgress();
                this.progress.endAdjusting();
                NaiveApssTask newInstance = this.innerAlgorithm.newInstance();
                newInstance.setSourceA(new Chunk(chunk));
                newInstance.setSourceB(chunk2);
                newInstance.setMeasure(getMeasure());
                newInstance.setProducePair(getProducePair());
                newInstance.setProcessRecord(getProcessRecord());
                newInstance.setSink(getSink());
                newInstance.setStats(getStats());
                newInstance.setProperty("chunkPair", MessageFormat.format("{0,number} and {1,number}", Long.valueOf(j), Long.valueOf(j2)));
                queueTask(newInstance);
                this.queuedCount++;
                clearCompleted(false);
            }
            this.nChunks = j2;
            newSeekableInstance2.position(position);
        }
        getExecutor().shutdown();
        clearCompleted(true);
        getExecutor().awaitTermination(2147483647L, TimeUnit.DAYS);
        this.progress.startAdjusting();
        this.progress.setState(ProgressReporting.State.COMPLETED);
        this.progress.setProgressPercent(90);
        this.progress.setMessage("Finished");
        this.progress.endAdjusting();
    }

    void updateProgress() {
        if (this.nChunks != 0) {
            this.progress.setProgressPercent((int) (100.0d * ((this.completedCount + this.queuedCount) / ((this.nChunks * this.nChunks) * 2))));
        }
    }

    void clearCompleted(boolean z) throws Exception {
        if (z) {
            while (!getFutureQueue().isEmpty()) {
                Task task = getFutureQueue().poll().get();
                while (task.isExceptionTrapped()) {
                    task.throwTrappedException();
                }
                this.completedCount++;
                this.progress.startAdjusting();
                this.progress.setMessage("Completed chunk pair " + task.getProperty("chunkPair"));
                updateProgress();
                this.progress.endAdjusting();
            }
            return;
        }
        ArrayList arrayList = null;
        for (Future<? extends Task> future : getFutureQueue()) {
            if (future.isDone()) {
                Task task2 = future.get();
                while (task2.isExceptionTrapped()) {
                    task2.throwTrappedException();
                }
                this.completedCount++;
                if (arrayList == null) {
                    arrayList = new ArrayList();
                }
                arrayList.add(future);
                this.progress.startAdjusting();
                this.progress.setMessage("Completed chunk pair " + task2.getProperty("chunkPair"));
                updateProgress();
                this.progress.endAdjusting();
            }
        }
        if (arrayList == null || arrayList.isEmpty()) {
            return;
        }
        getFutureQueue().removeAll(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // uk.ac.susx.mlcl.byblo.tasks.NaiveApssTask, uk.ac.susx.mlcl.lib.tasks.AbstractTask
    public void finaliseTask() throws Exception {
        if (getExecutor() != null) {
            getExecutor().shutdownNow();
        }
        super.finaliseTask();
    }

    <T extends Task> void queueTask(final T t) throws InterruptedException {
        if (t == null) {
            throw new NullPointerException("task is null");
        }
        this.throttle.acquire();
        try {
            Future<? extends Task> submit = getExecutor().submit(new Runnable() { // from class: uk.ac.susx.mlcl.byblo.tasks.ThreadedApssTask.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        ThreadedApssTask.this.progress.startAdjusting();
                        ThreadedApssTask.this.progress.setMessage("Starting chunk pair " + t.getProperty("chunkPair"));
                        ThreadedApssTask.this.updateProgress();
                        ThreadedApssTask.this.progress.endAdjusting();
                        t.run();
                    } finally {
                        ThreadedApssTask.this.throttle.release();
                    }
                }
            }, t);
            if (getFutureQueue().add(submit)) {
            } else {
                throw new AssertionError(MessageFormat.format("Failed to add future {0} to futureQueue, presumably because it already existed.", submit));
            }
        } catch (RejectedExecutionException e) {
            this.throttle.release();
            throw e;
        } catch (RuntimeException e2) {
            this.throttle.release();
            throw e2;
        }
    }

    final int getNumThreads() {
        return this.nThreads;
    }

    final synchronized ExecutorService getExecutor() {
        return this.executor;
    }

    final synchronized Queue<Future<? extends Task>> getFutureQueue() {
        return this.futureQueue;
    }

    public final void setNumThreads(int i) {
        if (i < 1) {
            throw new IllegalArgumentException("nThreads < 1");
        }
        this.nThreads = i;
    }

    private int getThrottleSize() {
        return getNumThreads() + 1;
    }

    private int estimateChunkSize() {
        double throttleSize = getThrottleSize();
        System.gc();
        double freeMaxMemory = MiscUtil.freeMaxMemory() / (((10000.0d * throttleSize) * ((throttleSize + 1.0d) / throttleSize)) * 12.0d);
        if (!$assertionsDisabled && freeMaxMemory <= Step.DEFAULT_BOUNDARY) {
            throw new AssertionError();
        }
        if (freeMaxMemory < 1.0d) {
            freeMaxMemory = 1.0d;
        }
        return (int) Math.floor(Math.min(freeMaxMemory, 4000.0d));
    }

    @Override // uk.ac.susx.mlcl.byblo.tasks.NaiveApssTask, uk.ac.susx.mlcl.lib.events.ProgressReporting
    public String getName() {
        return "threaded-allpairs";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // uk.ac.susx.mlcl.byblo.tasks.NaiveApssTask, uk.ac.susx.mlcl.lib.tasks.AbstractTask
    public Objects.ToStringHelper toStringHelper() {
        return super.toStringHelper().add("innerAlgorithm", this.innerAlgorithm).add("nThreads", this.nThreads).add("executor", this.executor).add("futureQueue", this.futureQueue).add("throttle", this.throttle);
    }

    static {
        $assertionsDisabled = !ThreadedApssTask.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(ThreadedApssTask.class);
        DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() + 1;
    }
}
