/*
* Copyright © 2020, 2021, 2022, 2023 Peter Doornbosch
*
* This file is part of Kwik, an implementation of the QUIC protocol in Java.
*
* Kwik is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* Kwik is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
* more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see .
*/
package net.luminis.quic.stream;
import net.luminis.quic.*;
import net.luminis.quic.frame.MaxStreamsFrame;
import net.luminis.quic.frame.QuicFrame;
import net.luminis.quic.frame.StreamFrame;
import net.luminis.quic.log.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
class StreamManagerTest {
private StreamManager streamManager;
private QuicConnectionImpl quicConnection;
@BeforeEach
void init() {
quicConnection = mock(QuicConnectionImpl.class);
streamManager = new StreamManager(quicConnection, Role.Client, mock(Logger.class), 10, 10);
streamManager.setFlowController(mock(FlowControl.class));
}
@Test
void canCreateBidirectionalStreamWhenMaxStreamsNotReached() {
// Given
streamManager.setInitialMaxStreamsBidi(3);
// When
QuicStream stream1 = streamManager.createStream(true);
QuicStream stream2 = streamManager.createStream(true);
QuicStream stream3 = streamManager.createStream(true);
// Then
assertThat(stream1).isNotNull();
assertThat(stream2).isNotNull();
assertThat(stream3).isNotNull();
}
@Test
void canCreateUnirectionalStreamWhenMaxStreamsNotReached() {
// Given
streamManager.setInitialMaxStreamsUni(3);
// When
QuicStream stream1 = streamManager.createStream(false);
QuicStream stream2 = streamManager.createStream(false);
QuicStream stream3 = streamManager.createStream(false);
// Then
assertThat(stream1).isNotNull();
assertThat(stream2).isNotNull();
assertThat(stream3).isNotNull();
}
@Test
void cannotCreateBidirectionalStreamWhenMaxStreamsReached() {
// Given
streamManager.setInitialMaxStreamsBidi(1);
QuicStream stream1 = streamManager.createStream(true);
assertThatThrownBy(
// When
() -> streamManager.createStream(true, 1, TimeUnit.MILLISECONDS)
// Then
).isInstanceOf(TimeoutException.class);
assertThat(stream1).isNotNull();
}
@Test
void cannotCreateUnirectionalStreamWhenMaxStreamsReached() {
// Given
streamManager.setInitialMaxStreamsUni(1);
QuicStream stream1 = streamManager.createStream(false);
assertThatThrownBy(
// When
() -> streamManager.createStream(false, 1, TimeUnit.MILLISECONDS)
// Then
).isInstanceOf(TimeoutException.class);
assertThat(stream1).isNotNull();
}
@Test
void canCreateBidirectionalStreamWhenMaxStreamsIsIncreased() {
// Given
streamManager.setInitialMaxStreamsBidi(1);
QuicStream stream1 = streamManager.createStream(true);
// When
streamManager.process(new MaxStreamsFrame(8, true));
QuicStream stream2 = streamManager.createStream(true);
// Then
assertThat(stream1).isNotNull();
assertThat(stream2).isNotNull();
}
@Test
void canCreateUndirectionalStreamWhenMaxStreamsIsIncreased() {
// Given
streamManager.setInitialMaxStreamsUni(1);
QuicStream stream1 = streamManager.createStream(false);
// When
streamManager.process(new MaxStreamsFrame(8, false));
QuicStream stream2 = streamManager.createStream(false);
// Then
assertThat(stream1).isNotNull();
assertThat(stream2).isNotNull();
}
@Test
void maxBidiStreamsCanNeverDecrease() {
// Given
streamManager.setInitialMaxStreamsBidi(1);
streamManager.process(new MaxStreamsFrame(8, true));
// When
streamManager.process(new MaxStreamsFrame(5, true));
// Then
assertThat(streamManager.getMaxBidirectionalStreams()).isGreaterThanOrEqualTo(8);
}
@Test
void maxUniStreamsCanNeverDecrease() {
// Given
streamManager.setInitialMaxStreamsUni(1);
streamManager.process(new MaxStreamsFrame(8, false));
// When
streamManager.process(new MaxStreamsFrame(5, false));
// Then
assertThat(streamManager.getMaxUnirectionalStreams()).isGreaterThanOrEqualTo(8);
}
@Test
void settingInitialMaxBidiStreamsCanOnlyIncreaseValue() {
// Given
streamManager.setInitialMaxStreamsBidi(4);
// When
streamManager.setInitialMaxStreamsBidi(3);
assertThat(streamManager.getMaxBidirectionalStreams()).isEqualTo(4);
}
@Test
void settingInitialMaxUniStreamsCanOnlyIncreaseValue() {
// Given
streamManager.setInitialMaxStreamsUni(10);
// When
streamManager.setInitialMaxStreamsUni(3);
assertThat(streamManager.getMaxUnirectionalStreams()).isEqualTo(10);
}
@Test
void blockingCreateBidirectionalStreamContinuesWhenMaxStreamsIsIncreased() throws Exception {
// Given
streamManager.setInitialMaxStreamsBidi(1);
QuicStream firstStream = streamManager.createStream(true);
assertThat(firstStream).isNotNull();
AtomicReference streamReference = new AtomicReference<>();
new Thread(() -> {
// Creating the stream should block, because there are not credits at the moment
QuicStream stream = streamManager.createStream(true);
streamReference.set(stream);
}).start();
Thread.sleep(50); // Give parallel thread a little time to start, so it blocks before this thread continues
assertThat(streamReference.get()).isNull(); // This should more or less prove the thread is blocking
// When
streamManager.process(new MaxStreamsFrame(2, true));
Thread.sleep(50); // Give parallel thread a little time to finish
// Then
assertThat(streamReference.get()).isNotNull();
}
@Test
void blockingCreateUnirectionalStreamContinuesWhenMaxStreamsIsIncreased() throws Exception {
// Given
streamManager.setInitialMaxStreamsUni(1);
QuicStream firstStream = streamManager.createStream(false);
assertThat(firstStream).isNotNull();
AtomicReference streamReference = new AtomicReference<>();
new Thread(() -> {
// Creating the stream should block, because there are not credits at the moment
QuicStream stream = streamManager.createStream(false);
streamReference.set(stream);
}).start();
Thread.sleep(50); // Give parallel thread a little time to start, so it blocks before this thread continues
assertThat(streamReference.get()).isNull(); // This should more or less prove the thread is blocking
// When
streamManager.process(new MaxStreamsFrame(2, false));
Thread.sleep(50); // Give parallel thread a little time to finish
// Then
assertThat(streamReference.get()).isNotNull();
}
@Test
void creatingEarlyDataStreamShouldNotBlockWhenMaxStreamsReached() throws Exception {
// Given
streamManager.setInitialMaxStreamsUni(1);
QuicStream firstStream = streamManager.createStream(false);
assertThat(firstStream).isNotNull();
// When
QuicStreamImpl earlyDataStream = streamManager.createEarlyDataStream(true);
// Then
assertThat(earlyDataStream).isNull();
}
@Test
void serverInitiatedStreamShouldHaveOddId() {
// Given
streamManager = new StreamManager(mock(QuicConnectionImpl.class), Role.Server, mock(Logger.class), 10, 10);
streamManager.setFlowController(mock(FlowControl.class));
streamManager.setInitialMaxStreamsUni(1);
// When
QuicStream stream = streamManager.createStream(false);
// Then
assertThat(stream.getStreamId() % 4).isEqualTo(3); // 0x3 | Server-Initiated, Unidirectional
assertThat(stream.getStreamId() % 2).isEqualTo(1);
}
@Test
void inServerRoleClientInitiatedStreamCausesCallback() throws Exception {
// Given
streamManager = new StreamManager(mock(QuicConnectionImpl.class), Role.Server, mock(Logger.class), 10, 10);
streamManager.setFlowController(mock(FlowControl.class));
streamManager.setInitialMaxStreamsBidi(1);
List openedStreams = new ArrayList<>();
streamManager.setPeerInitiatedStreamCallback(stream -> openedStreams.add(stream));
// When
streamManager.process(new StreamFrame(0, new byte[100], true));
// Then
assertThat(openedStreams).hasSize(1);
assertThat(openedStreams.get(0).getStreamId()).isEqualTo(0);
}
@Test
void whenStreamLimitIsReachedCreateStreamLeadsToTransportErrorException() throws Exception {
// Given
int i;
for (i = 0; i < 10; i++) {
streamManager.process(new StreamFrame(i * 4 + 1, new byte[0], false));
}
int next = i;
assertThatThrownBy(() ->
// When
streamManager.process(new StreamFrame(next * 4 + 1, new byte[0], false)))
// Then
.isInstanceOf(TransportError.class);
}
@Test
void whenStreamLimitIsReachedImplicitlyCreateStreamLeadsToTransportErrorException() throws Exception {
// Given
streamManager.process(new StreamFrame(9 * 4 + 1, new byte[0], false));
int next = 10;
assertThatThrownBy(() ->
// When
streamManager.process(new StreamFrame(next * 4 + 1, new byte[0], false)))
// Then
.isInstanceOf(TransportError.class);
}
@Test
void whenStreamIsClosedOneMoreCanBeOpened() throws Exception {
// Given
int streamId = 9 * 4 + 1;
streamManager.process(new StreamFrame(streamId, new byte[0], false));
// When
StreamFrame closeFrame = new StreamFrame(streamId, new byte[0], true);
streamManager.process(closeFrame);
// Then
int nextStreamId = 10 * 4 + 1;
// Assert that the next line does not throw
streamManager.process(new StreamFrame(nextStreamId, new byte[0], false));
// And
verifyMaxStreamsFrameIsToBeSent(11);
}
@Test
void whenStreamIsClosedInSameFrameOneMoreCanBeOpened() throws Exception {
// Given
int streamId = 9 * 4 + 1;
// When
streamManager.process(new StreamFrame(streamId, new byte[0], true));
// Then
int nextStreamId = 10 * 4 + 1;
// Assert that the next line does not throw
streamManager.process(new StreamFrame(nextStreamId, new byte[0], false));
// And
verifyMaxStreamsFrameIsToBeSent(11);
}
@Test
void whenMultipleStreamsAreClosedOnlyOneMaxStreamsFrameIsSent() throws Exception {
// When
for (int i = 0; i < 10; i++) {
streamManager.process(new StreamFrame(i * 4 + 1, new byte[0], true));
}
verifyMaxStreamsFrameIsToBeSent(20);
}
void verifyMaxStreamsFrameIsToBeSent(int expectedMaxStreams) {
ArgumentCaptor> captor = ArgumentCaptor.forClass(Function.class);
verify(quicConnection).send(captor.capture(), anyInt(), any(EncryptionLevel.class), any(Consumer.class));
QuicFrame frame = captor.getValue().apply(9);
assertThat(frame).isInstanceOf(MaxStreamsFrame.class);
assertThat(((MaxStreamsFrame) frame).getMaxStreams()).isEqualTo(expectedMaxStreams);
}
}