package cascading.flow.hadoop.stream;

import cascading.flow.FlowProcess;
import cascading.flow.stream.MemoryHashJoinGate;
import cascading.pipe.HashJoin;
import cascading.provider.FactoryLoader;
import cascading.tuple.Tuple;
import cascading.tuple.collect.Spillable;
import cascading.tuple.collect.SpillableTupleList;
import cascading.tuple.collect.TupleMapFactory;
import cascading.tuple.hadoop.collect.HadoopTupleMapFactory;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/flow/hadoop/stream/HadoopMemoryJoinGate.class */
/**
 * A {@link MemoryHashJoinGate} whose tuple map is produced by a {@link TupleMapFactory}
 * resolved through {@link FactoryLoader} (falling back to {@link HadoopTupleMapFactory}),
 * and which reports spill activity of that map through {@link FlowProcess} counters.
 */
public class HadoopMemoryJoinGate extends MemoryHashJoinGate {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopMemoryJoinGate.class);

    /** Grouping and JVM memory details are logged only when the spill count is a multiple of this value. */
    private static final int DETAILED_LOG_INTERVAL = 10;

    private final SpillListener spillListener;
    private final TupleMapFactory<JobConf> tupleMapFactory;

    /** Counters incremented on the {@link FlowProcess} as spills are written and read. */
    public enum Spill {
        /** Incremented by one each time a write spill begins. */
        Num_Spills_Written,
        /** Incremented by one each time a read spill begins. */
        Num_Spills_Read,
        /** Incremented by the number of tuples reported for each write spill. */
        Num_Tuples_Spilled,
        /** Incremented by the duration reported when a write spill ends (milliseconds, going by the name). */
        Duration_Millis_Written
    }

    /**
     * Forwards spill notifications to the flow process counters and the log.
     *
     * <p>Declared static because it only needs the {@link FlowProcess} handed to its
     * constructor and never touches the enclosing gate instance.
     */
    private static class SpillListener implements Spillable.SpillListener {
        private final FlowProcess<JobConf> flowProcess;

        /**
         * @param flowProcess the process whose counters are incremented on each notification
         */
        public SpillListener(FlowProcess<JobConf> flowProcess) {
            this.flowProcess = flowProcess;
        }

        /**
         * Logs the spill and bumps the written-spill and spilled-tuple counters.
         *
         * @param spillable   the collection that is about to spill
         * @param spillSize   number of tuples being spilled
         * @param spillReason reason text supplied by the spillable, logged verbatim
         */
        @Override
        public void notifyWriteSpillBegin(Spillable spillable, int spillSize, String spillReason) {
            int spillCount = spillable.spillCount();
            int fileNumber = spillCount + 1;

            // The verbose grouping/memory report is throttled; the short line below is logged every time.
            if (spillCount % DETAILED_LOG_INTERVAL == 0) {
                LOG.info("spilling grouping: {}, num times: {}, with reason: {}",
                        spillable.getGrouping().print(), fileNumber, spillReason);

                Runtime runtime = Runtime.getRuntime();
                LOG.info("mem on spill (mb), free: {}, total: {}, max: {}",
                        toMegabytes(runtime.freeMemory()),
                        toMegabytes(runtime.totalMemory()),
                        toMegabytes(runtime.maxMemory()));
            }

            LOG.info("spilling {} tuples in list to file number {}", spillSize, fileNumber);

            this.flowProcess.increment(Spill.Num_Spills_Written, 1L);
            this.flowProcess.increment(Spill.Num_Tuples_Spilled, spillSize);
        }

        /**
         * Adds the reported write duration to {@link Spill#Duration_Millis_Written}.
         *
         * @param spillableTupleList the list that finished spilling (unused here)
         * @param duration           elapsed time reported by the caller
         */
        @Override
        public void notifyWriteSpillEnd(SpillableTupleList spillableTupleList, long duration) {
            this.flowProcess.increment(Spill.Duration_Millis_Written, duration);
        }

        /**
         * Bumps {@link Spill#Num_Spills_Read} by one.
         *
         * @param spillable the collection whose spill is about to be read (unused here)
         */
        @Override
        public void notifyReadSpillBegin(Spillable spillable) {
            this.flowProcess.increment(Spill.Num_Spills_Read, 1L);
        }

        /** Converts a byte count to whole megabytes using two successive divisions by 1024. */
        private static long toMegabytes(long bytes) {
            return bytes / FileUtils.ONE_KB / FileUtils.ONE_KB;
        }
    }

    /**
     * Creates the gate, its spill listener, and resolves the tuple map factory.
     *
     * @param flowProcess the current flow process; also used to look up the
     *                    {@link TupleMapFactory#TUPLE_MAP_FACTORY} setting
     * @param hashJoin    the join this gate is created for, passed through to the superclass
     */
    public HadoopMemoryJoinGate(FlowProcess<JobConf> flowProcess, HashJoin hashJoin) {
        super(flowProcess, hashJoin);
        this.spillListener = new SpillListener(flowProcess);

        // NOTE(review): the loader's exact return type is not visible from this file; the cast is
        // to the parameterized field type and any unchecked warning is confined to this local.
        @SuppressWarnings("unchecked")
        TupleMapFactory<JobConf> loadedFactory = (TupleMapFactory<JobConf>) FactoryLoader.getInstance()
                .loadFactoryFrom(flowProcess, TupleMapFactory.TUPLE_MAP_FACTORY, HadoopTupleMapFactory.class);
        this.tupleMapFactory = loadedFactory;
    }

    /**
     * Returns a new, empty {@link HashSet} to hold join keys.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed
     * from protected; it is kept public so the visible interface stays as it is.
     */
    @Override
    public Set<Tuple> createKeySet() {
        return new HashSet<Tuple>();
    }

    /**
     * Creates the tuple map through the configured factory and, when the map is
     * {@link Spillable}, attaches this gate's spill listener to it.
     *
     * @return the newly created map
     */
    @Override
    protected Map<Tuple, Collection<Tuple>> createTupleMap() {
        Map<Tuple, Collection<Tuple>> tupleMap = this.tupleMapFactory.create(this.flowProcess);

        if (tupleMap instanceof Spillable) {
            ((Spillable) tupleMap).setSpillListener(this.spillListener);
        }

        return tupleMap;
    }

    /**
     * Intentionally a no-op override.
     * NOTE(review): presumably latch coordination is not needed for this gate — confirm against
     * {@code MemoryHashJoinGate}.
     */
    @Override
    protected void waitOnLatch() {
        // deliberately empty
    }

    /**
     * Intentionally a no-op override; see {@link #waitOnLatch()}.
     */
    @Override
    protected void countDownLatch() {
        // deliberately empty
    }
}
