/*
* Copyright © 2019, 2020, 2021, 2022, 2023 Peter Doornbosch
*
* This file is part of Kwik, an implementation of the QUIC protocol in Java.
*
* Kwik is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* Kwik is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package net.luminis.quic.stream;
import net.luminis.quic.EncryptionLevel;
import net.luminis.quic.QuicConnectionImpl;
import net.luminis.quic.QuicStream;
import net.luminis.quic.Version;
import net.luminis.quic.frame.*;
import net.luminis.quic.log.Logger;
import net.luminis.quic.log.NullLogger;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static net.luminis.quic.EncryptionLevel.App;
public class QuicStreamImpl extends BaseStream implements QuicStream {
/**
 * Maximum time, in milliseconds, a blocking read waits for stream data before it fails with a read timeout.
 * Deliberately not final, so the value can be changed.
 */
protected static long waitForNextFrameTimeout = Long.MAX_VALUE;
/** Fraction of the initial stream flow control limit that must be consumed before a new limit is sent to the peer. */
protected static final float receiverMaxDataIncrementFactor = 0.10f;
// Monitor that guards adding received frames and on which blocking readers wait; final because it is used as a lock.
private final Object addMonitor = new Object();
protected final Version quicVersion;
protected final int streamId;
protected final QuicConnectionImpl connection;
protected final FlowControl flowController;
protected final Logger log;
private final StreamInputStream inputStream;
private final StreamOutputStream outputStream;
// Set by abort(); makes blocking reads and writes fail.
private volatile boolean aborted;
// Receive-side flow control limit for this stream; moves forward with every byte read by the application.
private long receiverFlowControlLimit;
// The receive-side limit that was last sent to the peer in a MaxStreamDataFrame.
private long lastCommunicatedMaxData;
// Minimum difference between the current and the last communicated limit that triggers sending a new limit.
private final long receiverMaxDataIncrement;
// Offset at which the stream ends; -1 as long as no final frame has been added.
private volatile long lastOffset = -1;
// Size of the send buffer in bytes; can be overridden via the constructor.
private int sendBufferSize = 50 * 1024;
/**
 * Creates a stream using the default QUIC version ({@code Version.getDefault()}) and a {@code NullLogger}.
 *
 * @param streamId        the id of the stream
 * @param connection      the connection the stream belongs to
 * @param flowController  the flow controller the stream registers with
 */
public QuicStreamImpl(int streamId, QuicConnectionImpl connection, FlowControl flowController) {
this(Version.getDefault(), streamId, connection, flowController, new NullLogger());
}
/**
 * Creates a stream using the default QUIC version ({@code Version.getDefault()}) and the given logger.
 *
 * @param streamId        the id of the stream
 * @param connection      the connection the stream belongs to
 * @param flowController  the flow controller the stream registers with
 * @param log             the logger to use
 */
public QuicStreamImpl(int streamId, QuicConnectionImpl connection, FlowControl flowController, Logger log) {
this(Version.getDefault(), streamId, connection, flowController, log);
}
/**
 * Creates a stream for the given QUIC version; the default send buffer size is used.
 *
 * @param quicVersion     the QUIC version used when creating frames
 * @param streamId        the id of the stream
 * @param connection      the connection the stream belongs to
 * @param flowController  the flow controller the stream registers with
 * @param log             the logger to use
 */
public QuicStreamImpl(Version quicVersion, int streamId, QuicConnectionImpl connection, FlowControl flowController, Logger log) {
this(quicVersion, streamId, connection, flowController, log, null);
}
/**
 * Creates a stream, optionally with a non-default send buffer size.
 *
 * @param quicVersion     the QUIC version used when creating frames
 * @param streamId        the id of the stream
 * @param connection      the connection the stream belongs to; also supplies the initial receive flow control limit
 * @param flowController  the flow controller the stream registers with
 * @param log             the logger to use
 * @param sendBufferSize  send buffer size in bytes; when {@code null} or not positive, the default is kept
 */
QuicStreamImpl(Version quicVersion, int streamId, QuicConnectionImpl connection, FlowControl flowController, Logger log, Integer sendBufferSize) {
this.quicVersion = quicVersion;
this.streamId = streamId;
this.connection = connection;
this.flowController = flowController;
// Must be set before the output stream is created: the StreamOutputStream constructor reads sendBufferSize.
if (sendBufferSize != null && sendBufferSize > 0) {
this.sendBufferSize = sendBufferSize;
}
this.log = log;
inputStream = new StreamInputStream();
// Note: createStreamOutputStream() is overridable and is called while this object is still being constructed.
outputStream = createStreamOutputStream();
flowController.streamOpened(this);
receiverFlowControlLimit = connection.getInitialMaxStreamData();
lastCommunicatedMaxData = receiverFlowControlLimit;
// A new limit is only sent to the peer after the limit moved more than this amount.
receiverMaxDataIncrement = (long) (receiverFlowControlLimit * receiverMaxDataIncrementFactor);
}
/**
 * Returns the input stream for reading data received on this stream; always the same instance.
 */
@Override
public InputStream getInputStream() {
return inputStream;
}
/**
 * Returns the output stream for sending data on this stream; always the same instance.
 */
@Override
public OutputStream getOutputStream() {
return outputStream;
}
/**
 * Hands a received stream frame to this stream and wakes up all readers waiting for data.
 * When the frame is the final frame of the stream, the end offset of the stream is recorded.
 *
 * Package-protected on purpose: only the (Stream)Packet processor should call this method.
 *
 * @param frame the received stream frame
 */
void add(StreamFrame frame) {
    synchronized (addMonitor) {
        super.add(frame);
        boolean endsStream = frame.isFinal();
        if (endsStream) {
            // The final frame determines where the stream ends.
            lastOffset = frame.getUpToOffset();
        }
        // Readers blocked in read() wait on this monitor.
        addMonitor.notifyAll();
    }
}
/**
 * Tells whether the given offset is at or beyond the end of the stream.
 * As long as no final frame has been added, the end is unknown and the answer is {@code false}.
 */
@Override
protected boolean isStreamEnd(long offset) {
    if (lastOffset < 0) {
        // End of stream not known yet.
        return false;
    }
    return offset >= lastOffset;
}
/**
 * Returns the id of this stream, as passed to the constructor.
 */
@Override
public int getStreamId() {
return streamId;
}
/**
 * Tells whether this stream is unidirectional, which is encoded in the stream id.
 */
@Override
public boolean isUnidirectional() {
    // https://tools.ietf.org/html/draft-ietf-quic-transport-23#section-2.1
    // "The second least significant bit (0x2) of the stream ID distinguishes
    //  between bidirectional streams (with the bit set to 0) and
    //  unidirectional streams (with the bit set to 1)."
    int directionBit = streamId & 0x0002;
    return directionBit != 0;
}
/**
 * Tells whether this stream is a bidirectional stream initiated by the client: both type bits of the stream id are 0.
 */
@Override
public boolean isClientInitiatedBidirectional() {
    // "Client-initiated streams have even-numbered stream IDs (with the bit set to 0)"
    int typeBits = streamId & 0x0003;
    return typeBits == 0x0000;
}
/**
 * Tells whether this stream is a bidirectional stream initiated by the server: the two type bits of the stream id equal 1.
 */
@Override
public boolean isServerInitiatedBidirectional() {
    // "server-initiated streams have odd-numbered stream IDs"
    int typeBits = streamId & 0x0003;
    return typeBits == 0x0001;
}
/**
 * Closes the receiving side of this stream by delegating to the input stream's stopInput.
 *
 * @param applicationProtocolErrorCode error code that is put in the StopSendingFrame, if one is sent
 */
@Override
public void closeInput(long applicationProtocolErrorCode) {
inputStream.stopInput(applicationProtocolErrorCode);
}
/**
 * Resets the sending side of this stream by delegating to the output stream's reset.
 *
 * @param errorCode error code passed to the output stream's reset
 */
@Override
public void resetStream(long errorCode) {
outputStream.reset(errorCode);
}
/**
 * Returns a short description of this stream: the word "Stream" followed by the stream id.
 */
@Override
public String toString() {
    StringBuilder description = new StringBuilder("Stream ");
    description.append(streamId);
    return description.toString();
}
/**
 * Factory method for the output stream of this stream; subclasses can override it to supply a specialised
 * StreamOutputStream. Note that it is called from the constructor.
 */
protected StreamOutputStream createStreamOutputStream() {
return new StreamOutputStream();
}
/**
 * Terminates the receiving input stream (abruptly). Is called when peer sends a RESET_STREAM frame.
 *
 * This method is intentionally package-protected, as it should only be called by the StreamManager class.
 *
 * @param errorCode  error code of the reset; passed on to the input stream, which currently does not use it
 * @param finalSize  final size of the stream; passed on to the input stream, which currently does not use it
 */
void terminateStream(long errorCode, long finalSize) {
inputStream.terminate(errorCode, finalSize);
}
// TODO: QuicStream should have a close method that closes both input and output stream and releases all resources and marks itself as terminated.
/**
* Input stream for reading data received by the QUIC stream.
*/
protected class StreamInputStream extends InputStream {
// Set when the input is stopped locally (close() or stopInput()); reads fail with "Stream closed" afterwards.
private volatile boolean closed;
// Set by terminate(); reads fail with "Stream reset by peer" afterwards.
private volatile boolean reset;
// The thread that is currently inside the synchronized part of read(), or null; used to interrupt a blocked reader.
private volatile Thread blockingReaderThread;
/**
 * Returns the number of bytes that can currently be read; never negative.
 */
@Override
public int available() throws IOException {
    int pending = QuicStreamImpl.this.bytesAvailable();
    return pending > 0 ? pending : 0;
}
// Contract of InputStream.read():
// - The byte read is returned as an int in the range 0 to 255.
// - When the end of the stream has been reached and therefore no byte is available, -1 is returned.
// - The call blocks until input data is available, the end of the stream is detected, or an exception is thrown.
@Override
public int read() throws IOException {
    byte[] single = new byte[1];
    int count = read(single, 0, 1);
    if (count < 0) {
        // End of stream
        return -1;
    }
    if (count == 1) {
        // Mask to map the signed byte onto 0..255.
        return single[0] & 0xff;
    }
    // Impossible: a read of one byte returns either 1 or a negative value.
    throw new RuntimeException();
}
// InputStream.read(byte[], int, int) contract:
// - An attempt is made to read the requested number of bytes, but a smaller number may be read.
// - This method blocks until input data is available, end of file is detected, or an exception is thrown.
// - If requested number of bytes is greater than zero, an attempt is done to read at least one byte.
// - If no byte is available because the stream is at end of file, the value -1 is returned;
//   otherwise, at least one byte is read and stored into the given byte array.
/**
 * Reads up to {@code len} bytes into {@code buffer}, blocking until data is available, the end of the stream is
 * reached, the read times out or the stream becomes unreadable.
 *
 * @param buffer  the array the bytes are stored in
 * @param offset  the index in {@code buffer} where the first byte is stored
 * @param len     the maximum number of bytes to read; when 0, the method returns 0 immediately
 * @return the number of bytes read, or -1 when the end of the stream is reached
 * @throws SocketTimeoutException  when no data arrived within {@code waitForNextFrameTimeout} milliseconds
 * @throws InterruptedIOException  when the reading thread is interrupted while the stream itself is still readable
 * @throws IOException             when the connection is closed, or the stream is closed or reset by the peer
 */
@Override
public int read(byte[] buffer, int offset, int len) throws IOException {
    if (len == 0) {
        return 0;
    }
    Instant readAttemptStarted = Instant.now();
    long waitPeriod = waitForNextFrameTimeout;
    while (true) {
        failIfNotReadable();
        synchronized (addMonitor) {
            try {
                blockingReaderThread = Thread.currentThread();
                int bytesRead = QuicStreamImpl.this.read(ByteBuffer.wrap(buffer, offset, len));
                if (bytesRead > 0) {
                    updateAllowedFlowControl(bytesRead);
                    return bytesRead;
                } else if (bytesRead < 0) {
                    // End of stream
                    return -1;
                }
                // Nothing read: block until bytes can be read, read timeout or abort.
                // Check the state again now that this thread is published as blocking reader: a close, reset or
                // abort that happened after the check at the top of the loop did not see this thread yet and
                // hence did not interrupt it, so without this check the wait below would not be ended by it.
                failIfNotReadable();
                try {
                    addMonitor.wait(waitPeriod);
                }
                catch (InterruptedException e) {
                    if (!(aborted || closed || reset)) {
                        // Not interrupted because of a stream state change, so the interrupt comes from
                        // elsewhere: honour it instead of silently continuing to block.
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException("Read interrupted on stream " + streamId);
                    }
                    // Otherwise nothing to do here: read will be aborted in next loop iteration with IOException
                }
            }
            finally {
                blockingReaderThread = null;
            }
        }
        if (bytesAvailable() <= 0) {
            long waited = Duration.between(readAttemptStarted, Instant.now()).toMillis();
            if (waited > waitForNextFrameTimeout) {
                throw new SocketTimeoutException("Read timeout on stream " + streamId + "; read up to " + readOffset());
            } else {
                // Wait only for the remainder of the timeout (but at least 1 ms, as 0 would mean: wait forever).
                waitPeriod = Long.max(1, waitForNextFrameTimeout - waited);
            }
        }
    }
}
// Throws the IOException that describes why reading is not possible anymore, if that is the case.
private void failIfNotReadable() throws IOException {
    if (aborted || closed || reset) {
        throw new IOException(aborted? "Connection closed": closed? "Stream closed": "Stream reset by peer");
    }
}
/**
 * Closes this input stream by stopping input with error code 0.
 */
@Override
public void close() throws IOException {
// Note that QUIC specification does not define application protocol error codes.
// By absence of an application specified error code, the arbitrary code 0 is used.
stopInput(0);
}
/**
 * Stops reading from this stream: unless all data has been received already, the peer is asked to stop sending
 * (StopSendingFrame); the stream is marked closed and a reader that is blocked is interrupted.
 *
 * @param errorCode the error code for the StopSendingFrame
 */
private void stopInput(long errorCode) {
    boolean moreDataExpected = !allDataReceived();
    if (moreDataExpected) {
        StopSendingFrame stopSending = new StopSendingFrame(quicVersion, streamId, errorCode);
        connection.send(stopSending, this::retransmitStopInput, true);
    }
    closed = true;
    // Wakes up the reader, if any; it will notice the closed flag.
    interruptBlockingThread();
}
/**
 * Lost-frame callback for the StopSendingFrame: the frame is sent again, unless all data has been received in the
 * meantime.
 *
 * @param lostFrame the StopSendingFrame that was lost
 */
private void retransmitStopInput(QuicFrame lostFrame) {
    assert(lostFrame instanceof StopSendingFrame);
    if (allDataReceived()) {
        // Nothing left for the peer to stop.
        return;
    }
    connection.send(lostFrame, this::retransmitStopInput);
}
/**
 * Moves the receive flow control window forward by the number of bytes just read, on stream level as well as on
 * connection level, and informs the peer of the new stream limit when it moved far enough.
 *
 * @param bytesRead the number of bytes the application just read
 */
private void updateAllowedFlowControl(int bytesRead) {
    // The window slides forward by exactly the amount that was read.
    receiverFlowControlLimit += bytesRead;
    connection.updateConnectionFlowControl(bytesRead);

    // Sending an update for every single read would be wasteful, so only do so when the limit moved enough.
    long notYetCommunicated = receiverFlowControlLimit - lastCommunicatedMaxData;
    if (notYetCommunicated <= receiverMaxDataIncrement) {
        return;
    }
    connection.send(new MaxStreamDataFrame(streamId, receiverFlowControlLimit), this::retransmitMaxData, true);
    lastCommunicatedMaxData = receiverFlowControlLimit;
}
/**
 * Lost-frame callback for a MaxStreamDataFrame. Instead of resending the lost frame, a new frame carrying the
 * current receive flow control limit is sent.
 *
 * @param lostFrame the frame that was lost; only used for logging
 */
private void retransmitMaxData(QuicFrame lostFrame) {
connection.send(new MaxStreamDataFrame(streamId, receiverFlowControlLimit), this::retransmitMaxData);
log.recovery("Retransmitted max stream data, because lost frame " + lostFrame);
}
/**
 * Marks this input stream as reset and interrupts a blocked reader. Does nothing when the stream is aborted,
 * closed or reset already.
 *
 * @param errorCode  not used by this implementation
 * @param finalSize  not used by this implementation
 */
void terminate(long errorCode, long finalSize) {
    if (aborted || closed || reset) {
        return;
    }
    reset = true;
    interruptBlockingThread();
}
/**
 * Interrupts the thread that is currently registered as blocking reader, if any.
 */
void interruptBlockingThread() {
// Copy the volatile field to a local, so the null check and the interrupt use the same value.
Thread readerBlocking = blockingReaderThread;
if (readerBlocking != null) {
readerBlocking.interrupt();
}
}
}
protected class StreamOutputStream extends OutputStream implements FlowControlUpdateListener {
// Minimum stream frame size: frame type (1), stream id (1..8), offset (1..8), length (1..2), data (1...)
// Note that in practice stream id and offset will seldom / never occupy 8 bytes, so the minimum leaves more room for data.
private static final int MIN_FRAME_SIZE = 1 + 8 + 8 + 2 + 1;
// Marks the end of the stream in the send queue; it is recognised by identity, not by content.
private final ByteBuffer END_OF_STREAM_MARKER = ByteBuffer.allocate(0);
private final Object lock = new Object();
// Send queue contains stream bytes to send in order. The position of the first byte buffer in the queue determines the next byte(s) to send.
// The element type must be ByteBuffer: the sender assigns the head of the queue to a ByteBuffer without a cast.
private final Queue<ByteBuffer> sendQueue = new ConcurrentLinkedDeque<>();
// Maximum number of bytes that is buffered for sending.
private final int maxBufferSize;
// Number of bytes currently buffered in the send queue.
private final AtomicInteger bufferedBytes;
// Lock and condition used by writers to wait for space in the send buffer.
private final ReentrantLock bufferLock;
private final Condition notFull;
// Current offset is the offset of the next byte in the stream that will be sent.
// Thread safety: only used by sender thread, so no synchronization needed.
private long currentOffset;
// Closed indicates whether the OutputStream is closed, meaning that no more bytes can be written by caller.
// Thread safety: only use by caller
private boolean closed;
// Send request queued indicates whether a request to send a stream frame is queued with the sender. Is used to avoid multiple requests being queued.
// Thread safety: read/set by caller and by sender thread, so must be synchronized; guarded by lock
private volatile boolean sendRequestQueued;
// Reset indicates whether the OutputStream has been reset.
private volatile boolean reset;
private volatile long resetErrorCode;
// Stream offset at which the stream was last blocked, for detecting the first time stream is blocked at a certain offset.
private long blockedOffset;
// The thread that is currently waiting for buffer space in write(), or null; used to interrupt a blocked writer.
private volatile Thread blockingWriterThread;
/**
 * Creates the output stream with a buffer limit equal to the enclosing stream's send buffer size and registers it
 * with the flow controller as listener for the enclosing stream.
 */
StreamOutputStream() {
// Reads a field of the enclosing stream, so that field must be initialised before this constructor runs.
maxBufferSize = sendBufferSize;
bufferedBytes = new AtomicInteger();
bufferLock = new ReentrantLock();
notFull = bufferLock.newCondition();
flowController.register(QuicStreamImpl.this, this);
}
/**
 * Writes the complete array; equivalent to {@code write(data, 0, data.length)}.
 */
@Override
public void write(byte[] data) throws IOException {
write(data, 0, data.length);
}
/**
 * Copies {@code len} bytes of {@code data}, starting at {@code off}, into the send buffer and makes sure a send
 * request is queued. Blocks as long as the send buffer does not have enough room. Data larger than the send
 * buffer is written in chunks of half the buffer size.
 *
 * @param data  the array holding the bytes to write
 * @param off   index of the first byte to write
 * @param len   number of bytes to write; a length of 0 is a no-op
 * @throws IndexOutOfBoundsException  when {@code off} or {@code len} is negative or the range exceeds the array
 * @throws InterruptedIOException     when the thread is interrupted while waiting for buffer space
 * @throws IOException                when the stream is closed or reset, or the connection is closed
 */
@Override
public void write(byte[] data, int off, int len) throws IOException {
    checkState();
    // The range must be validated explicitly: Arrays.copyOfRange (used below) does not fail when the range runs
    // past the end of the array, but pads with zeros, which would then be sent as if it were stream data.
    if (off < 0 || len < 0 || len > data.length - off) {
        throw new IndexOutOfBoundsException("off: " + off + ", len: " + len + ", array length: " + data.length);
    }
    if (len == 0) {
        // An empty buffer must not enter the send queue: the sender only removes buffers while there are bytes
        // to send, so an empty buffer could stay in front of the end-of-stream marker and hide it forever.
        return;
    }
    if (len > maxBufferSize) {
        // Buffering all would break the contract (because this method copies _all_ data) but splitting and
        // writing smaller chunks (and waiting for each individual chunk to be buffered successfully) does not.
        // At least 1, to avoid a division by zero when the buffer size is 1.
        int halfBuffersize = Integer.max(1, maxBufferSize / 2);
        int times = len / halfBuffersize;
        for (int i = 0; i < times; i++) {
            // Each individual write will probably block, but by splitting the writes in half buffer sizes
            // avoids that the buffer needs to be emptied completely before a new block can be added (which
            // could have severed negative impact on performance as the sender might have to wait for the caller
            // to fill the buffer again).
            write(data, off + i * halfBuffersize, halfBuffersize);
        }
        int rest = len % halfBuffersize;
        if (rest > 0) {
            write(data, off + times * halfBuffersize, rest);
        }
        return;
    }
    int availableBufferSpace = maxBufferSize - bufferedBytes.get();
    if (len > availableBufferSpace) {
        // Wait for enough buffer space to become available
        bufferLock.lock();
        blockingWriterThread = Thread.currentThread();
        try {
            while (maxBufferSize - bufferedBytes.get() < len) {
                checkState();
                try {
                    notFull.await();
                } catch (InterruptedException e) {
                    throw new InterruptedIOException(aborted? "output aborted because connection is closed": "");
                }
            }
        }
        finally {
            blockingWriterThread = null;
            bufferLock.unlock();
        }
    }
    // The data is copied, so the caller is free to reuse its array.
    sendQueue.add(ByteBuffer.wrap(Arrays.copyOfRange(data, off, off + len)));
    bufferedBytes.getAndAdd(len);
    synchronized (lock) {
        // Only one send request at a time needs to be queued with the sender.
        if (! sendRequestQueued) {
            sendRequestQueued = true;
            connection.send(this::sendFrame, MIN_FRAME_SIZE, getEncryptionLevel(), this::retransmitStreamFrame, true);
        }
    }
}
/**
 * Writes a single byte (the low-order eight bits of the argument) by delegating to the array based write.
 */
@Override
public void write(int dataByte) throws IOException {
    // Terrible for performance of course, but so is calling this method in the first place.
    byte[] single = new byte[1];
    single[0] = (byte) dataByte;
    write(single, 0, single.length);
}
/**
 * Only verifies that the stream is still writable; there is nothing to flush.
 *
 * @throws IOException when the stream is closed or reset, or the connection is closed
 */
@Override
public void flush() throws IOException {
checkState();
// No-op, this implementation sends data as soon as possible.
}
/**
 * Closes the output stream: an end-of-stream marker is appended to the send queue and, if none is pending, a send
 * request is queued. Calling this method on a stream that is closed or reset already has no effect.
 */
@Override
public void close() throws IOException {
    if (closed || reset) {
        return;
    }
    sendQueue.add(END_OF_STREAM_MARKER);
    closed = true;
    synchronized (lock) {
        if (sendRequestQueued) {
            // A queued request will pick up the marker.
            return;
        }
        sendRequestQueued = true;
        connection.send(this::sendFrame, MIN_FRAME_SIZE, getEncryptionLevel(), this::retransmitStreamFrame, true);
    }
}
/**
 * Verifies that data can still be written.
 *
 * @throws IOException when the stream is closed or reset, or when the connection is closed
 */
private void checkState() throws IOException {
    boolean noLongerWritable = closed || reset;
    if (noLongerWritable) {
        String cause = closed ? "already closed" : "is reset";
        throw new IOException("output stream " + cause);
    }
    if (aborted) {
        throw new IOException("output aborted because connection is closed");
    }
}
/**
 * Frame supplier that is handed to the connection's send method: builds the next stream frame from the data in
 * the send queue.
 *
 * Returns {@code null} when the stream is reset, when there is nothing to send, or when flow control does not
 * allow sending; in the latter case a request to send a "blocked" frame is queued (once per offset).
 * When data is left in the queue after building the frame, a new send request is queued.
 *
 * @param maxFrameSize  maximum size of the frame to return
 * @return the stream frame to send, or {@code null} when no frame is to be sent
 */
QuicFrame sendFrame(int maxFrameSize) {
if (reset) {
return null;
}
// This call consumes the queued send request.
synchronized (lock) {
sendRequestQueued = false;
}
if (!sendQueue.isEmpty()) {
long flowControlLimit = flowController.getFlowControlLimit(QuicStreamImpl.this);
assert (flowControlLimit >= currentOffset);
int maxBytesToSend = bufferedBytes.get();
// With no bytes buffered, flow control is not relevant; presumably this lets a final frame without data through.
if (flowControlLimit > currentOffset || maxBytesToSend == 0) {
int nrOfBytes = 0;
// Frame without data, only used to determine the size of the frame overhead at the current offset.
StreamFrame dummy = new StreamFrame(quicVersion, streamId, currentOffset, new byte[0], false);
maxBytesToSend = Integer.min(maxBytesToSend, maxFrameSize - dummy.getFrameLength() - 1); // Take one byte extra for length field var int
int maxAllowedByFlowControl = (int) (flowController.increaseFlowControlLimit(QuicStreamImpl.this, currentOffset + maxBytesToSend) - currentOffset);
maxBytesToSend = Integer.min(maxAllowedByFlowControl, maxBytesToSend);
byte[] dataToSend = new byte[maxBytesToSend];
boolean finalFrame = false;
// Fill the frame with data from the buffers at the head of the queue.
while (nrOfBytes < maxBytesToSend && !sendQueue.isEmpty()) {
ByteBuffer buffer = sendQueue.peek();
int position = nrOfBytes;
if (buffer.remaining() <= maxBytesToSend - nrOfBytes) {
// All bytes remaining in buffer will fit in stream frame
nrOfBytes += buffer.remaining();
buffer.get(dataToSend, position, buffer.remaining());
sendQueue.poll();
}
else {
// Just part of the buffer will fit in (and will fill up) the stream frame
buffer.get(dataToSend, position, maxBytesToSend - nrOfBytes);
nrOfBytes = maxBytesToSend; // Short form of: nrOfBytes += (maxBytesToSend - nrOfBytes)
}
}
// The marker is compared by identity; it is only detected when it is at the head of the queue.
if (!sendQueue.isEmpty() && sendQueue.peek() == END_OF_STREAM_MARKER) {
finalFrame = true;
sendQueue.poll();
}
if (nrOfBytes == 0 && !finalFrame) {
// Nothing to send really
return null;
}
// Release the buffer space and wake up a writer waiting for it.
bufferedBytes.getAndAdd(-1 * nrOfBytes);
bufferLock.lock();
try {
notFull.signal();
} finally {
bufferLock.unlock();
}
if (nrOfBytes < maxBytesToSend) {
// This can happen when not enough data is buffer to fill a stream frame, or length field is 1 byte (instead of 2 that was counted for)
dataToSend = Arrays.copyOfRange(dataToSend, 0, nrOfBytes);
}
StreamFrame streamFrame = new StreamFrame(quicVersion, streamId, currentOffset, dataToSend, finalFrame);
currentOffset += nrOfBytes;
if (!sendQueue.isEmpty()) {
synchronized (lock) {
sendRequestQueued = true;
}
// There is more to send, so queue a new send request.
connection.send(this::sendFrame, MIN_FRAME_SIZE, getEncryptionLevel(), this::retransmitStreamFrame, true);
}
if (streamFrame.isFinal()) {
finalFrameSent();
}
return streamFrame;
}
else {
// So flowControlLimit <= currentOffset
// Check if this condition hasn't been handled before
if (currentOffset != blockedOffset) {
// Not handled before, remember this offset, so this isn't executed twice for the same offset
blockedOffset = currentOffset;
// And let peer know
// https://www.rfc-editor.org/rfc/rfc9000.html#name-data-flow-control
// "A sender SHOULD send a STREAM_DATA_BLOCKED or DATA_BLOCKED frame to indicate to the receiver
//  that it has data to write but is blocked by flow control limits."
connection.send(this::sendBlockReason, StreamDataBlockedFrame.getMaxSize(streamId), App, this::retransmitSendBlockReason, true);
}
}
}
return null;
}
/**
 * Called by sendFrame when the frame it is about to return is the final frame of the stream; stops flow control
 * for the stream. Can be overridden by subclasses.
 */
protected void finalFrameSent() {
stopFlowControl();
}
/**
 * Flow control listener callback; queues a send request so sending can continue.
 *
 * @param streamId  not used by this implementation
 */
@Override
public void streamNotBlocked(int streamId) {
// Stream might have been blocked (or it might have filled the flow control window exactly), queue send request
// and let sendFrame method determine whether there is more to send or not.
connection.send(this::sendFrame, MIN_FRAME_SIZE, getEncryptionLevel(), this::retransmitStreamFrame, false); // No need to flush, as this is called while processing received message
}
/**
 * Interrupts the thread that is currently waiting for buffer space in write, if any.
 */
void interruptBlockingThread() {
// Copy the volatile field to a local, so the null check and the interrupt use the same value.
Thread blocking = blockingWriterThread;
if (blocking != null) {
blocking.interrupt();
}
}
/**
 * Frame supplier that creates the frame telling the peer why sending is blocked: a StreamDataBlockedFrame or a
 * DataBlockedFrame, depending on the block reason reported by the flow controller at the time of the call.
 *
 * @param maxFrameSize  not used by this implementation
 * @return the frame to send, or {@code null} when the block reason is neither of the two
 */
private QuicFrame sendBlockReason(int maxFrameSize) {
    // Ask for the actual reason; it can be "none" by now, when a flow control update arrived in the meantime.
    BlockReason currentReason = flowController.getFlowControlBlockReason(QuicStreamImpl.this);
    switch (currentReason) {
        case STREAM_DATA_BLOCKED:
            return new StreamDataBlockedFrame(quicVersion, streamId, currentOffset);
        case DATA_BLOCKED:
            return new DataBlockedFrame(flowController.getConnectionDataLimit());
        default:
            return null;
    }
}
/**
 * Lost-frame callback for a "blocked" frame. The lost frame is not resent; a new send request is queued, so the
 * block reason is determined again when the frame is created.
 *
 * @param quicFrame  the frame that was lost; not used
 */
private void retransmitSendBlockReason(QuicFrame quicFrame) {
connection.send(this::sendBlockReason, StreamDataBlockedFrame.getMaxSize(streamId), App, this::retransmitSendBlockReason, true);
}
/**
 * Lost-frame callback for stream frames: sends the lost frame again and logs that, unless the stream has been
 * reset.
 *
 * @param frame the stream frame that was lost
 */
private void retransmitStreamFrame(QuicFrame frame) {
    assert(frame instanceof StreamFrame);
    if (reset) {
        // Data of a reset stream is not retransmitted.
        return;
    }
    connection.send(frame, this::retransmitStreamFrame);
    log.recovery("Retransmitted lost stream frame " + frame);
}
/**
 * Returns the encryption level used for sending stream frames; {@code App} in this class. Can be overridden by
 * subclasses.
 */
protected EncryptionLevel getEncryptionLevel() {
return App;
}
/**
 * Brings the sending state back to the start of the stream: offset 0, nothing queued and nothing buffered.
 * Data that was queued but not sent is discarded.
 */
private void restart() {
    currentOffset = 0;
    sendQueue.clear();
    // The discarded data no longer occupies the send buffer. Without resetting the counter, these bytes would be
    // counted forever and writes could block permanently on a buffer that is in fact empty.
    bufferedBytes.set(0);
    sendRequestQueued = false;
    // Buffer space became available, so wake up a writer that might be waiting for it.
    bufferLock.lock();
    try {
        notFull.signal();
    }
    finally {
        bufferLock.unlock();
    }
}
/**
 * Resets the output stream (abrupt termination), which leads to a RESET_STREAM frame being sent. Has no effect
 * when the stream is closed or reset already.
 *
 * https://www.rfc-editor.org/rfc/rfc9000.html#name-operations-on-streams
 * "reset the stream (abrupt termination), resulting in a RESET_STREAM frame (Section 19.4) if the stream was
 *  not already in a terminal state."
 *
 * @param errorCode the error code to put in the RESET_STREAM frame
 */
protected void reset(long errorCode) {
    if (closed || reset) {
        return;
    }
    reset = true;
    resetErrorCode = errorCode;
    // The frame is created via a sender callback, so the current offset that goes into it is read by the sender thread.
    connection.send(this::createResetFrame, ResetStreamFrame.getMaximumFrameSize(streamId, errorCode), App, this::retransmitResetFrame, true);
    // A writer waiting for buffer space must not stay blocked; once woken up it will notice the reset.
    bufferLock.lock();
    try {
        notFull.signal();
    }
    finally {
        bufferLock.unlock();
    }
}
/**
 * Frame supplier that creates the ResetStreamFrame, using the stored error code and the current send offset.
 *
 * @param maxFrameSize  not used by this implementation
 */
private QuicFrame createResetFrame(int maxFrameSize) {
assert(reset == true);
return new ResetStreamFrame(streamId, resetErrorCode, currentOffset);
}
/**
 * Lost-frame callback for the ResetStreamFrame: sends the lost frame again.
 *
 * @param frame the ResetStreamFrame that was lost
 */
private void retransmitResetFrame(QuicFrame frame) {
assert(frame instanceof ResetStreamFrame);
connection.send(frame, this::retransmitResetFrame);
}
}
/**
 * Resets the output stream so data can again be sent from the start of the stream (offset 0). Note that in such
 * cases the caller must (again) provide the data to be sent.
 * The output stream is also marked as not closed, so it accepts writes again.
 */
protected void resetOutputStream() {
outputStream.closed = false;
// TODO: this is currently not thread safe, see comment in EarlyDataStream how to fix.
outputStream.restart();
}
/**
 * Removes this stream from the flow controller: the listener registration is undone and the flow controller is
 * told the stream is closed.
 */
protected void stopFlowControl() {
    // All data has been sent. Retransmissions may still follow, but these are not subject to flow control.
    final QuicStreamImpl self = this;
    flowController.unregister(self);
    flowController.streamClosed(self);
}
/**
 * Marks this stream as aborted and interrupts the threads that are blocked reading from or writing to it, if any.
 * The flag is set before interrupting, so a woken-up thread can see it.
 */
void abort() {
aborted = true;
inputStream.interruptBlockingThread();
outputStream.interruptBlockingThread();
}
}