/*
* Copyright © 2020, 2021, 2022, 2023 Peter Doornbosch
*
* This file is part of Kwik, an implementation of the QUIC protocol in Java.
*
* Kwik is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* Kwik is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see .
*/
package net.luminis.quic.stream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
public class BaseStream {
private SortedSet frames = new TreeSet<>();
private long processedToOffset = 0;
/**
* Add a stream frame to this stream. The frame can contain any number of bytes positioned anywhere in the stream;
* the read method will take care of returning stream bytes in the right order, without gaps.
* @param frame
* @return true if the frame is adds bytes to this stream; false if the frame does not add bytes to the stream
* (because the frame is a duplicate or its stream bytes where already received with previous frames).
*/
protected synchronized boolean add(StreamElement frame) {
if (frame.getUpToOffset() > processedToOffset) {
frames.add(frame);
return true;
}
else {
return false;
}
}
/**
* Returns the number of bytes that can be read from this stream.
* @return
*/
protected synchronized int bytesAvailable() {
if (isStreamEnd(processedToOffset)) {
return -1;
}
if (frames.isEmpty()) {
return 0;
}
else {
int available = 0;
long countedUpTo = processedToOffset;
Iterator iterator = frames.iterator();
while (iterator.hasNext()) {
StreamElement nextFrame = iterator.next();
if (nextFrame.getOffset() <= countedUpTo) {
if (nextFrame.getUpToOffset() > countedUpTo) {
available += nextFrame.getUpToOffset() - countedUpTo;
countedUpTo = nextFrame.getUpToOffset();
}
} else {
break;
}
}
return available;
}
}
/**
* Read a much as possible bytes from the stream (limited by the size of the given buffer or the number of bytes
* available on the stream). If no byte is available because the end of the stream has been reached, the value -1 is returned.
* Does not block: returns 0 when no bytes can be read.
* @param buffer
* @return
*/
protected synchronized int read(ByteBuffer buffer) {
if (isStreamEnd(processedToOffset)) {
return -1;
}
if (frames.isEmpty()) {
return 0;
}
else {
int read = 0;
long readUpTo = processedToOffset;
Iterator iterator = frames.iterator();
while (iterator.hasNext() && buffer.remaining() > 0) {
StreamElement nextFrame = iterator.next();
if (nextFrame.getOffset() <= readUpTo) {
if (nextFrame.getUpToOffset() > readUpTo) {
long available = nextFrame.getOffset() - readUpTo + nextFrame.getLength();
int bytesToRead = (int) Long.min(buffer.limit() - buffer.position(), available);
buffer.put(nextFrame.getStreamData(), (int) (readUpTo - nextFrame.getOffset()), bytesToRead);
readUpTo += bytesToRead;
read += bytesToRead;
}
} else {
break;
}
}
processedToOffset += read;
removeParsedFrames();
return read;
}
}
/**
* Determines whether all data (up to stream end offset) is received (but might have not been read)
*
* @return true if all data has been received, false otherwise
*/
protected synchronized boolean allDataReceived() {
if (isStreamEnd(processedToOffset)) {
return true;
}
else {
long completeUpTo = processedToOffset;
Iterator iterator = frames.iterator();
while (iterator.hasNext()) {
StreamElement nextFrame = iterator.next();
if (nextFrame.getOffset() <= completeUpTo) {
if (nextFrame.getUpToOffset() > completeUpTo) {
completeUpTo = nextFrame.getUpToOffset();
}
} else {
// There is a hole between
break;
}
}
return isStreamEnd(completeUpTo);
}
}
/**
* Indicates whether the given offset is end of stream.
* @param offset
* @return when offset is beyond the last byte of the stream. For example, if offset is equal to the length of the
* stream, return value should be true.
*/
protected boolean isStreamEnd(long offset) {
return false;
}
private void removeParsedFrames() {
Iterator iterator = frames.iterator();
while (iterator.hasNext()) {
if (iterator.next().getUpToOffset() <= processedToOffset) {
iterator.remove();
}
else {
break;
}
}
}
/**
* Returns the position in the stream up to where stream bytes are read.
* @return
*/
protected synchronized long readOffset() {
return processedToOffset;
}
}