/*
 * Decompiled with CFR 0.152.
 */
package net.luminis.quic.send;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.luminis.quic.EncryptionLevel;
import net.luminis.quic.GlobalAckGenerator;
import net.luminis.quic.IdleTimer;
import net.luminis.quic.PnSpace;
import net.luminis.quic.QuicConnectionImpl;
import net.luminis.quic.VersionHolder;
import net.luminis.quic.cc.CongestionControlEventListener;
import net.luminis.quic.cc.CongestionController;
import net.luminis.quic.cc.NewRenoCongestionController;
import net.luminis.quic.crypto.Aead;
import net.luminis.quic.crypto.ConnectionSecrets;
import net.luminis.quic.frame.QuicFrame;
import net.luminis.quic.frame.StreamFrame;
import net.luminis.quic.log.Logger;
import net.luminis.quic.packet.QuicPacket;
import net.luminis.quic.packet.RetryPacket;
import net.luminis.quic.packet.ShortHeaderPacket;
import net.luminis.quic.qlog.QLog;
import net.luminis.quic.recovery.RecoveryManager;
import net.luminis.quic.recovery.RttEstimator;
import net.luminis.quic.send.GlobalPacketAssembler;
import net.luminis.quic.send.SendItem;
import net.luminis.quic.send.SendRequestQueue;
import net.luminis.quic.send.SendStatistics;
import net.luminis.quic.send.Sender;

public class SenderImpl
implements Sender,
CongestionControlEventListener {
    private final Clock clock;
    private volatile int maxPacketSize;
    private volatile DatagramSocket socket;
    private final InetSocketAddress peerAddress;
    private final QuicConnectionImpl connection;
    private final CongestionController congestionController;
    private final RttEstimator rttEstimater;
    private final Logger log;
    private final QLog qlog;
    private final SendRequestQueue[] sendRequestQueue = new SendRequestQueue[EncryptionLevel.values().length];
    private final GlobalPacketAssembler packetAssembler;
    private final GlobalAckGenerator globalAckGenerator;
    private final RecoveryManager recoveryManager;
    private final IdleTimer idleTimer;
    private final Thread senderThread;
    private final boolean[] discardedSpaces = new boolean[PnSpace.values().length];
    private ConnectionSecrets connectionSecrets;
    private final Object condition = new Object();
    private boolean signalled;
    private volatile boolean running;
    private volatile boolean stopping;
    private volatile boolean stopped;
    private volatile int receiverMaxAckDelay;
    private volatile int datagramsSent;
    private volatile long bytesSent;
    private volatile long dataSent;
    private volatile long packetsSent;
    private AtomicInteger subsequentZeroDelays = new AtomicInteger();
    private volatile boolean lastDelayWasZero = false;
    private volatile int antiAmplificationLimit = -1;
    private volatile Runnable shutdownHook;

    public SenderImpl(VersionHolder version, int maxPacketSize, DatagramSocket socket, InetSocketAddress peerAddress, QuicConnectionImpl connection, Integer initialRtt, Logger log) {
        this(Clock.systemUTC(), version, maxPacketSize, socket, peerAddress, connection, initialRtt, log);
    }

    public SenderImpl(Clock clock, VersionHolder version, int maxPacketSize, DatagramSocket socket, InetSocketAddress peerAddress, QuicConnectionImpl connection, Integer initialRtt, Logger log) {
        this.clock = clock;
        this.maxPacketSize = maxPacketSize;
        this.socket = socket;
        this.peerAddress = peerAddress;
        this.connection = connection;
        this.log = log;
        this.qlog = log.getQLog();
        Arrays.stream(EncryptionLevel.values()).forEach(level -> {
            int levelIndex = level.ordinal();
            this.sendRequestQueue[levelIndex] = new SendRequestQueue(clock, (EncryptionLevel)((Object)level));
        });
        this.globalAckGenerator = new GlobalAckGenerator(this);
        this.packetAssembler = new GlobalPacketAssembler(version, this.sendRequestQueue, this.globalAckGenerator);
        this.congestionController = new NewRenoCongestionController(log, this);
        this.rttEstimater = initialRtt == null ? new RttEstimator(log) : new RttEstimator(log, initialRtt);
        this.recoveryManager = new RecoveryManager(connection.getRole(), this.rttEstimater, this.congestionController, this, log);
        connection.addHandshakeStateListener(this.recoveryManager);
        connection.addAckFrameReceivedListener(this.recoveryManager);
        this.idleTimer = connection.getIdleTimer();
        this.senderThread = new Thread(() -> this.sendLoop(), "sender-loop");
        this.senderThread.setDaemon(true);
    }

    public void start(ConnectionSecrets secrets) {
        this.connectionSecrets = secrets;
        this.senderThread.start();
    }

    @Override
    public void send(QuicFrame frame, EncryptionLevel level) {
        this.sendRequestQueue[level.ordinal()].addRequest(frame, f -> {});
    }

    @Override
    public void send(QuicFrame frame, EncryptionLevel level, Consumer<QuicFrame> frameLostCallback) {
        this.sendRequestQueue[level.ordinal()].addRequest(frame, frameLostCallback);
    }

    @Override
    public void send(Function<Integer, QuicFrame> frameSupplier, int minimumSize, EncryptionLevel level, Consumer<QuicFrame> lostCallback) {
        this.sendRequestQueue[level.ordinal()].addRequest(frameSupplier, minimumSize, lostCallback);
    }

    public void send(RetryPacket retryPacket) {
        try {
            this.send(List.of(new SendItem(retryPacket)));
        }
        catch (IOException e) {
            this.log.error("Sending packet failed: " + retryPacket);
        }
    }

    @Override
    public void setInitialToken(byte[] token) {
        if (token != null) {
            this.packetAssembler.setInitialToken(token);
        }
    }

    @Override
    public void sendAck(PnSpace pnSpace, int maxDelay) {
        this.sendRequestQueue[pnSpace.relatedEncryptionLevel().ordinal()].addAckRequest(maxDelay);
        if (maxDelay > 0) {
            // empty if block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void sendProbe(EncryptionLevel level) {
        boolean[] blArray = this.discardedSpaces;
        synchronized (this.discardedSpaces) {
            if (!this.discardedSpaces[level.relatedPnSpace().ordinal()]) {
                this.sendRequestQueue[level.ordinal()].addProbeRequest();
                this.wakeUpSenderLoop();
            } else {
                this.log.warn("Attempt to send probe on discarded space (" + level.relatedPnSpace() + ") => ignoring");
            }
            // ** MonitorExit[var2_2] (shouldn't be in output)
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void sendProbe(List<QuicFrame> frames, EncryptionLevel level) {
        boolean[] blArray = this.discardedSpaces;
        synchronized (this.discardedSpaces) {
            if (!this.discardedSpaces[level.relatedPnSpace().ordinal()]) {
                this.sendRequestQueue[level.ordinal()].addProbeRequest(frames);
                this.wakeUpSenderLoop();
            } else {
                this.log.warn("Attempt to send probe on discarded space (" + level.relatedPnSpace() + ") => ignoring");
            }
            // ** MonitorExit[var3_3] (shouldn't be in output)
            return;
        }
    }

    @Override
    public void packetProcessed(boolean expectingMore) {
        this.wakeUpSenderLoop();
    }

    @Override
    public void datagramProcessed(boolean expectingMore) {
    }

    @Override
    public void flush() {
        this.wakeUpSenderLoop();
    }

    public void changeAddress(DatagramSocket newSocket) {
        this.socket = newSocket;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void discard(PnSpace space, String reason) {
        boolean[] blArray = this.discardedSpaces;
        synchronized (this.discardedSpaces) {
            if (!this.discardedSpaces[space.ordinal()]) {
                this.packetAssembler.stop(space);
                this.recoveryManager.stopRecovery(space);
                this.log.recovery("Discarding pn space " + space + " because " + reason);
                this.globalAckGenerator.discard(space);
                this.discardedSpaces[space.ordinal()] = true;
            }
            // ** MonitorExit[var3_3] (shouldn't be in output)
            return;
        }
    }

    public void stop() {
        Arrays.stream(this.sendRequestQueue).forEach(sendRequestQueue -> sendRequestQueue.clear());
        this.recoveryManager.stopRecovery();
        this.stopped = true;
    }

    public void shutdown(Runnable postShutdownAction) {
        assert (this.stopped);
        this.shutdownHook = postShutdownAction;
        this.stopping = true;
        this.senderThread.interrupt();
    }

    @Override
    public void bytesInFlightIncreased(long bytesInFlight) {
    }

    @Override
    public void bytesInFlightDecreased(long bytesInFlight) {
        this.wakeUpSenderLoop();
    }

    private void sendLoop() {
        try {
            this.running = true;
            while (this.running) {
                this.doLoopIteration();
            }
        }
        catch (Throwable fatalError) {
            if (this.running) {
                this.log.error("Sender thread aborted with exception", fatalError);
                this.connection.abortConnection(fatalError);
            }
            this.log.warn("Ignoring " + fatalError + " because sender is shutting down.");
        }
        if (this.shutdownHook != null) {
            this.shutdownHook.run();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void doLoopIteration() throws IOException {
        Object object = this.condition;
        synchronized (object) {
            try {
                long timeout;
                if (!this.signalled && (timeout = this.determineMaximumWaitTime()) > 0L) {
                    this.condition.wait(timeout);
                }
                this.signalled = false;
            }
            catch (InterruptedException e) {
                this.log.debug("Sender thread is interrupted; probably shutting down? " + this.running);
            }
        }
        if (this.stopping) {
            this.running = false;
        }
        this.sendIfAny();
    }

    void sendIfAny() throws IOException {
        List<SendItem> items;
        do {
            if ((items = this.assemblePacket()).isEmpty()) continue;
            this.send(items);
        } while (!items.isEmpty());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void wakeUpSenderLoop() {
        Object object = this.condition;
        synchronized (object) {
            this.signalled = true;
            this.condition.notify();
        }
    }

    long determineMaximumWaitTime() {
        Optional<Instant> nextDelayedSendTime = this.packetAssembler.nextDelayedSendTime();
        if (nextDelayedSendTime.isPresent()) {
            long delay = Long.max(Duration.between(this.clock.instant(), nextDelayedSendTime.get()).toMillis(), 0L);
            if (delay > 0L) {
                this.subsequentZeroDelays.set(0);
                this.lastDelayWasZero = false;
                return delay;
            }
            if (this.lastDelayWasZero) {
                int count = this.subsequentZeroDelays.incrementAndGet();
                if (count % 20 == 3) {
                    this.log.error("possible bug: sender is looping in busy wait; got " + count + " iterations");
                }
                if (count > 10003) {
                    return 8000L;
                }
            }
            this.lastDelayWasZero = true;
            return 0L;
        }
        return 5000L;
    }

    void send(List<SendItem> itemsToSend) throws IOException {
        byte[] datagramData = new byte[this.maxPacketSize];
        ByteBuffer buffer = ByteBuffer.wrap(datagramData);
        try {
            itemsToSend.stream().map(item -> item.getPacket()).forEach(packet -> {
                Aead aead = this.connectionSecrets.getOwnAead(packet.getEncryptionLevel());
                if (aead == null) {
                    throw new IllegalStateException("Missing keys for encryption level " + packet.getEncryptionLevel());
                }
                byte[] packetData = packet.generatePacketBytes(aead);
                buffer.put(packetData);
                this.log.raw("packet sent, pn: " + packet.getPacketNumber(), packetData);
            });
        }
        catch (BufferOverflowException bufferOverflow) {
            this.log.error("Buffer overflow while generating datagram for " + itemsToSend);
            throw bufferOverflow;
        }
        DatagramPacket datagram = new DatagramPacket(datagramData, buffer.position(), this.peerAddress.getAddress(), this.peerAddress.getPort());
        Instant timeSent = this.clock.instant();
        this.socket.send(datagram);
        ++this.datagramsSent;
        this.packetsSent += (long)itemsToSend.size();
        this.bytesSent += (long)buffer.position();
        itemsToSend.stream().forEach(item -> {
            this.recoveryManager.packetSent(item.getPacket(), timeSent, item.getPacketLostCallback());
            this.idleTimer.packetSent(item.getPacket(), timeSent);
        });
        List<QuicPacket> packetsSent = itemsToSend.stream().map(item -> item.getPacket()).collect(Collectors.toList());
        this.log.sent(timeSent, packetsSent);
        this.dataSent += SenderImpl.countDataBytes(packetsSent);
        this.qlog.emitPacketSentEvent(packetsSent, timeSent);
    }

    private List<SendItem> assemblePacket() {
        int remainingCwnd = (int)this.congestionController.remainingCwnd();
        int currentMaxPacketSize = this.maxPacketSize;
        if (this.antiAmplificationLimit >= 0) {
            if (this.bytesSent < (long)this.antiAmplificationLimit) {
                currentMaxPacketSize = Integer.min(currentMaxPacketSize, (int)((long)this.antiAmplificationLimit - this.bytesSent));
            } else {
                this.log.warn("Cannot send; anti-amplification limit is reached");
                return Collections.emptyList();
            }
        }
        byte[] srcCid = this.connection.getSourceConnectionId();
        byte[] destCid = this.connection.getDestinationConnectionId();
        return this.packetAssembler.assemble(remainingCwnd, currentMaxPacketSize, srcCid, destCid);
    }

    private Instant earliest(Instant instant1, Instant instant2) {
        if (instant1 == null) {
            return instant2;
        }
        if (instant2 == null) {
            return instant1;
        }
        if (instant1.isBefore(instant2)) {
            return instant1;
        }
        return instant2;
    }

    private static long countDataBytes(List<QuicPacket> packets) {
        return packets.stream().filter(p -> p instanceof ShortHeaderPacket).mapToInt(p -> p.getFrames().stream().filter(f -> f instanceof StreamFrame).mapToInt(f -> ((StreamFrame)f).getLength()).sum()).sum();
    }

    public SendStatistics getStatistics() {
        return new SendStatistics(this.datagramsSent, this.packetsSent, this.bytesSent, this.dataSent, this.recoveryManager.getLost(), this.rttEstimater.getSmoothedRtt(), this.rttEstimater.getRttVar(), this.rttEstimater.getLatestRtt());
    }

    public int getPto() {
        return this.rttEstimater.getSmoothedRtt() + 4 * this.rttEstimater.getRttVar() + this.receiverMaxAckDelay;
    }

    public CongestionController getCongestionController() {
        return this.congestionController;
    }

    public void setReceiverMaxAckDelay(int maxAckDelay) {
        this.receiverMaxAckDelay = maxAckDelay;
        this.rttEstimater.setMaxAckDelay(maxAckDelay);
    }

    public GlobalAckGenerator getGlobalAckGenerator() {
        return this.globalAckGenerator;
    }

    public void setAntiAmplificationLimit(int antiAmplificationLimit) {
        this.antiAmplificationLimit = antiAmplificationLimit;
    }

    public void unsetAntiAmplificationLimit() {
        this.antiAmplificationLimit = -1;
    }

    public void enableAllLevels() {
        this.packetAssembler.enableAppLevel();
    }

    public void enableAppLevel() {
        this.packetAssembler.enableAppLevel();
    }

    public void registerMaxUdpPayloadSize(int maxUdpPayloadSize) {
        if (maxUdpPayloadSize < this.maxPacketSize) {
            this.maxPacketSize = maxUdpPayloadSize;
        }
    }
}

