Java 17+ implementation of the Durable Streams protocol.
Passes the durable-streams conformance suite.
Note: While this library can run on Java 17, JDK 21+ is highly recommended to leverage Virtual Threads for high-concurrency performance and scalability.
- `durable-streams-core` - protocol types and helpers
- `durable-streams-client` - JDK HttpClient client
- `durable-streams-json-spi` - JSON serialization SPI (library-agnostic)
- `durable-streams-json-jackson` - Jackson implementation for JSON mode (optional)
- `durable-streams-server-spi` - server storage/policy abstractions
- `durable-streams-server-core` - protocol engine
- `durable-streams-servlet` - Servlet integration helpers (e.g. Spring MVC)
- `durable-streams-spring-webflux` - Spring WebFlux integration helpers
- `durable-streams-micronaut` - Micronaut integration helpers
- `durable-streams-quarkus` - Quarkus integration helpers
- `durable-streams-ktor` - Ktor integration helpers
- `durable-streams-conformance-runner` - conformance server/client runner
These modules expose the full protocol via framework-specific adapters and are used in conformance tests:
- `example-micronaut` (port 4431)
- `example-quarkus` (port 4432)
- `example-spring-webflux` (port 4433)
- `example-spring-webmvc` (port 4434)
- `example-ktor` (port 4435)
- High-Performance File Storage: Uses synchronous `FileChannel` I/O on Virtual Threads (Java 21+) with a bounded dedicated I/O executor to prevent platform thread starvation.
- Strict Concurrency: Per-stream `ReentrantLock` ensures atomic appends and metadata updates.
- Efficient Waiting: Lock-free await queues (`ConcurrentLinkedQueue`) for thousands of concurrent long-poll/SSE connections.
- Protocol Conformance: Fully compliant with the Durable Streams protocol (131/131 tests passed), including:
  - Strict byte-offset tracking
  - Streaming JSON parsing (Jackson by default) for a low memory footprint
  - Correct ETag generation and cache control
  - Proper handling of `Stream-Seq` for writer coordination
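The per-stream locking and await-queue design above can be sketched in a few lines. This is an illustrative pattern only — class and method names are hypothetical, not the library's actual internals:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch of the coordination pattern: a per-stream lock for
// atomic appends, plus a lock-free queue of parked long-poll waiters.
final class StreamCoordinator {
    private final ReentrantLock appendLock = new ReentrantLock();
    // Lock-free registration of waiters; no lock held while waking them.
    private final ConcurrentLinkedQueue<CountDownLatch> waiters = new ConcurrentLinkedQueue<>();
    private volatile long tailOffset;

    long append(int length) {
        appendLock.lock(); // one writer at a time per stream
        try {
            tailOffset += length; // offset + metadata update is atomic
        } finally {
            appendLock.unlock();
        }
        // Wake every parked reader outside the critical section.
        CountDownLatch w;
        while ((w = waiters.poll()) != null) {
            w.countDown();
        }
        return tailOffset;
    }

    boolean awaitNewData(long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        waiters.add(latch); // registration is lock-free
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A waiter that times out simply leaves a spent latch in the queue; the next append drains and releases it harmlessly.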
Early in development, we benchmarked three storage approaches:
- Blocking I/O - synchronous `FileChannel` (baseline)
- NIO Async - `AsynchronousFileChannel` with callbacks
- Virtual Threads - blocking I/O wrapped in a virtual-thread executor (recommended)
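The winning approach boils down to the following pattern: plain blocking `FileChannel` reads submitted to a virtual-thread-per-task executor. This is a minimal sketch assuming JDK 21+; the helper names and file handling are illustrative, not the library's code:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: blocking positional reads, dispatched on virtual threads (JDK 21+).
// Blocking I/O parks the virtual thread, not the carrier platform thread.
final class VirtualThreadRead {
    static byte[] readAt(Path file, long position, int length) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(length);
            while (buf.hasRemaining()) {
                if (ch.read(buf, position + buf.position()) < 0) break; // EOF
            }
            buf.flip();
            byte[] out = new byte[buf.remaining()];
            buf.get(out);
            return out;
        }
    }

    static byte[] readOnVirtualThread(Path file, long position, int length) {
        try (ExecutorService vt = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<byte[]> f = vt.submit(() -> readAt(file, position, length));
            return f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Test helper: write bytes to a temp file and return its path.
    static Path writeTemp(byte[] data) {
        try {
            Path p = Files.createTempFile("vt", ".bin");
            Files.write(p, data);
            return p;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

In the real store, the executor is created once and bounded, rather than per call as in this sketch.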
| Workload | Blocking | NIO Async | Virtual Threads (Winner) |
|---|---|---|---|
| Sequential writes | Baseline | Slower (callback overhead) | Similar to baseline |
| Sequential reads | Baseline | Slower (callback overhead) | Similar to baseline |
| Concurrent reads | Baseline | 1.08x faster | 1.33x faster ⭐ |
| Mixed (70% read, 30% write) | Baseline | Equivalent | Equivalent |
| Await latency | ~2.4ms | ~2.4ms | ~2.4ms |
| Metric | InMemory p50 | InMemory p99 | Filestore p50 | Filestore p99 | Unit |
|---|---|---|---|---|---|
| Baseline Ping | 0.828 | 1.930 | 0.812 | 2.327 | ms |
| Latency - Total RTT | 2.450 | 5.867 | 3.399 | 8.347 | ms |
| Latency - Ping | 1.033 | 2.372 | 1.063 | 2.682 | ms |
| Latency - Overhead | 1.391 | 4.393 | 2.291 | 6.795 | ms |
| Throughput - Small Messages | 34108.27 | 45143.67 | 24712.04 | 36643.46 | msg/sec |
| Throughput - Large Messages | 135.28 | 148.48 | 123.58 | 137.98 | msg/sec |
- CPU: 13th Gen Intel(R) Core(TM) i5-13600K (14C/20T)
- RAM: 64 GB
- OS: Windows 11 Pro (build 26200)
- JDK: Temurin OpenJDK 25.0.1 LTS
- Node.js: v24.12.0
- Storage: WD_BLACK SN850X 2 TB
- Virtual Threads won for concurrent read-heavy workloads (1.33x faster)
- NIO async showed callback overhead in sequential operations
- All implementations had similar await latency (~2.4ms)
- Virtual threads provide the best balance of performance and code simplicity
Java's AsynchronousFileChannel is not truly asynchronous. It uses an internal thread pool to emulate async behavior because most operating systems (pre-io_uring on Linux) don't provide native async file I/O APIs. This means:
- `AsynchronousFileChannel` = hidden thread pool + blocking I/O + callback wrapper
- Virtual Threads = explicit thread pool (carrier threads) + blocking I/O + simpler code
Since both approaches use threads internally, Virtual Threads eliminate the callback complexity while delivering better performance (1.33x faster for concurrent reads). We get:
- ✅ Simpler, more maintainable code
- ✅ Better performance
- ✅ Full control over thread pool sizing
- ✅ No hidden thread pool surprises
Conclusion: We chose Virtual Threads + Blocking I/O as it delivers superior concurrent performance without callback complexity.
Note: The async NIO implementation was removed in favor of the simpler and faster virtual thread approach. See commit `40fba4b` for the original benchmark code.
JSON mode is required by the protocol and implemented via the JSON SPI. You can use the Jackson module or provide your own codec implementation.
- Default codec discovery uses `ServiceLoader` via `StreamCodecProvider`.
- To avoid Jackson, ship your own module that implements `JsonCodec` and `StreamCodecProvider` for `application/json`.
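`ServiceLoader` discovery follows the standard JDK mechanism: implementations are listed in a `META-INF/services/<fully.qualified.ProviderName>` file on the classpath, and the runtime iterates over whatever is registered. A minimal sketch of that mechanism — the `StreamCodecProvider` interface here is a simplified stand-in, not the library's real SPI:

```java
import java.util.ServiceLoader;

// Stand-in for the SPI; real providers are registered via a
// META-INF/services file in the codec module's jar.
interface StreamCodecProvider {
    String contentType();
}

final class CodecDiscovery {
    // Returns the class name of the first provider matching the content type,
    // or "none" if nothing is registered on the classpath.
    static String discover(String contentType) {
        for (StreamCodecProvider p : ServiceLoader.load(StreamCodecProvider.class)) {
            if (p.contentType().equals(contentType)) {
                return p.getClass().getName();
            }
        }
        return "none";
    }
}
```

Because no provider for this stand-in interface is registered here, discovery falls through to the default branch.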
RocksDB JNI publishes OS-specific classifier jars (native binaries).
durable-streams-server-core uses RocksDB for persistent metadata when you use file-based stores (for example, BlockingFileStreamStore defaults to RocksDB metadata).
To keep Maven Central artifacts portable, this project does not publish an OS-specific rocksdbjni classifier dependency transitively.
You must add the correct classifier at runtime.
Gradle:
dependencies {
implementation("io.github.clickin:durable-streams-server-core:<version>")
runtimeOnly("org.rocksdb:rocksdbjni:<rocksdb-version>:linux64") // or win64/osx
}
Maven:
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version><!-- rocksdb-version --></version>
<classifier>linux64</classifier>
<scope>runtime</scope>
</dependency>
When building this repository, the classifier used for tests/examples is selected as follows:
- Default: detect current OS
- Override with Gradle property: `-ProcksdbClassifier=win64`
- Override with env var: `ROCKSDB_CLASSIFIER=win64`
Common classifiers: win64, linux64, osx.
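The default OS detection amounts to a mapping like the following. This is a hedged sketch — the actual selection logic lives in this repository's Gradle build scripts, and the method here is purely illustrative:

```java
// Illustrative classifier selection: an explicit override (Gradle property
// or environment variable) wins, otherwise map the OS name to a classifier.
final class RocksClassifier {
    static String detect(String osName, String override) {
        if (override != null && !override.isBlank()) return override;
        String os = osName.toLowerCase();
        if (os.contains("win")) return "win64";
        if (os.contains("mac") || os.contains("darwin")) return "osx";
        return "linux64"; // default for everything else
    }
}
```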
Basic usage:
import io.github.clickin.client.AppendRequest;
import io.github.clickin.client.CreateRequest;
import io.github.clickin.client.DurableStreamsClient;
import io.github.clickin.client.ReadRequest;
import io.github.clickin.core.Offset;
DurableStreamsClient client = DurableStreamsClient.create(); // uses JDK internal HTTP client by default
client.create(new CreateRequest(streamUrl, "application/json", null, null));
client.append(new AppendRequest(streamUrl, "application/json", null, dataStream));
var read = client.readCatchUp(new ReadRequest(streamUrl, Offset.beginning(), null));
Custom transport (no ServiceLoader, GraalVM-friendly):
import io.github.clickin.client.DurableStreamsClient;
import io.github.clickin.client.DurableStreamsTransport;
import io.github.clickin.client.TransportRequest;
import io.github.clickin.client.TransportResponse;
DurableStreamsTransport transport = new MyHttpTransport();
DurableStreamsClient client = DurableStreamsClient.builder()
.transport(transport)
.build();
These examples wire the protocol handler into common frameworks. They follow the same handler flow used in durable-streams-conformance-runner.
import io.github.clickin.server.core.CachePolicy;
import io.github.clickin.server.core.DurableStreamsHandler;
import io.github.clickin.server.core.HttpMethod;
import io.github.clickin.server.core.InMemoryStreamStore;
import io.github.clickin.server.core.ResponseBody;
import io.github.clickin.server.core.ServerRequest;
import io.github.clickin.server.core.ServerResponse;
import io.github.clickin.server.core.SseFrame;
import io.github.clickin.server.spi.CursorPolicy;
import io.javalin.Javalin;
import io.javalin.http.Context;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
DurableStreamsHandler handler = DurableStreamsHandler.builder(new InMemoryStreamStore())
.cursorPolicy(new CursorPolicy(Clock.systemUTC()))
.cachePolicy(CachePolicy.defaultPrivate())
.longPollTimeout(Duration.ofSeconds(25))
.sseMaxDuration(Duration.ofSeconds(60))
.build();
Javalin app = Javalin.create();
app.get("/*", ctx -> handle(ctx, handler));
app.post("/*", ctx -> handle(ctx, handler));
app.put("/*", ctx -> handle(ctx, handler));
app.delete("/*", ctx -> handle(ctx, handler));
app.head("/*", ctx -> handle(ctx, handler));
app.start(4437);
static void handle(Context ctx, DurableStreamsHandler handler) throws Exception {
ServerRequest request = new ServerRequest(
HttpMethod.valueOf(ctx.method().name()),
URI.create(ctx.fullUrl()),
toHeaders(ctx),
bodyOrNull(ctx)
);
ServerResponse response = handler.handle(request);
ctx.status(response.status());
for (Map.Entry<String, List<String>> e : response.headers().entrySet()) {
for (String v : e.getValue()) {
ctx.header(e.getKey(), v);
}
}
if (response.body() instanceof ResponseBody.Empty) {
return;
}
if (response.body() instanceof ResponseBody.Bytes bytes) {
ctx.result(bytes.bytes());
return;
}
if (response.body() instanceof ResponseBody.Sse sse) {
ctx.contentType("text/event-stream");
writeSse(ctx, sse.publisher());
}
}
static Map<String, List<String>> toHeaders(Context ctx) {
Map<String, List<String>> headers = new LinkedHashMap<>();
for (Map.Entry<String, String> e : ctx.headerMap().entrySet()) {
headers.put(e.getKey(), List.of(e.getValue()));
}
return headers;
}
static java.io.InputStream bodyOrNull(Context ctx) {
long len = ctx.req().getContentLengthLong();
return len <= 0 ? null : ctx.bodyInputStream();
}
static void writeSse(Context ctx, Flow.Publisher<SseFrame> publisher) throws Exception {
OutputStream out = ctx.res().getOutputStream();
CountDownLatch done = new CountDownLatch(1);
publisher.subscribe(new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(SseFrame item) {
try {
out.write(item.render().getBytes(StandardCharsets.UTF_8));
out.flush();
} catch (Exception e) {
subscription.cancel();
}
}
@Override
public void onError(Throwable throwable) {
done.countDown();
}
@Override
public void onComplete() {
done.countDown();
}
});
done.await();
}
package com.example.durable.streams.webmvc;
import io.github.clickin.server.core.CachePolicy;
import io.github.clickin.server.core.DurableStreamsHandler;
import io.github.clickin.server.core.InMemoryStreamStore;
import io.github.clickin.server.spi.CursorPolicy;
import io.github.clickin.servlet.DurableStreamsServlet;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
@RestController
public class DurableStreamController {
private final DurableStreamsHandler handler = DurableStreamsHandler.builder(new InMemoryStreamStore())
.cursorPolicy(new CursorPolicy(Clock.systemUTC()))
.cachePolicy(CachePolicy.defaultPrivate())
.longPollTimeout(Duration.ofSeconds(25))
.sseMaxDuration(Duration.ofSeconds(60))
.build();
private final DurableStreamsServlet adapter = new DurableStreamsServlet(handler);
@RequestMapping("/**")
public void handleDurableStream(ServletRequest request, ServletResponse response) throws ServletException, IOException {
adapter.service(request, response);
}
}
package com.example.durable.streams.webmvc.webflux;
import io.github.clickin.server.core.CachePolicy;
import io.github.clickin.server.core.DurableStreamsHandler;
import io.github.clickin.server.core.InMemoryStreamStore;
import io.github.clickin.server.spi.CursorPolicy;
import io.github.clickin.spring.webflux.DurableStreamsWebFluxAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import java.time.Clock;
import java.time.Duration;
@Configuration
public class RouterConfig {
@Bean
public DurableStreamsHandler durableStreamsHandler() {
return DurableStreamsHandler.builder(new InMemoryStreamStore())
.cursorPolicy(new CursorPolicy(Clock.systemUTC()))
.cachePolicy(CachePolicy.defaultPrivate())
.longPollTimeout(Duration.ofSeconds(25))
.sseMaxDuration(Duration.ofSeconds(60))
.build();
}
@Bean
public DurableStreamsWebFluxAdapter durableStreamsWebFluxAdapter(DurableStreamsHandler handler) {
return new DurableStreamsWebFluxAdapter(handler);
}
@Bean
public RouterFunction<ServerResponse> durableStreamsRoutes(DurableStreamsWebFluxAdapter adapter) {
return RouterFunctions.route()
.add(RouterFunctions.route(req -> true, adapter::handle))
.build();
}
}
import io.github.clickin.micronaut.DurableStreamsMicronautAdapter;
import io.github.clickin.server.core.CachePolicy;
import io.github.clickin.server.core.DurableStreamsHandler;
import io.github.clickin.server.core.InMemoryStreamStore;
import io.github.clickin.server.spi.CursorPolicy;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Consumes;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Delete;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.PathVariable;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Put;
import io.micronaut.http.annotation.Produces;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import java.time.Clock;
import java.time.Duration;
@ExecuteOn(TaskExecutors.BLOCKING)
@Produces(MediaType.ALL)
@Consumes(MediaType.ALL)
@Controller("/")
final class DurableStreamsController {
private final DurableStreamsHandler handler = DurableStreamsHandler.builder(new InMemoryStreamStore())
.cursorPolicy(new CursorPolicy(Clock.systemUTC()))
.cachePolicy(CachePolicy.defaultPrivate())
.longPollTimeout(Duration.ofSeconds(25))
.sseMaxDuration(Duration.ofSeconds(60))
.build();
@Get("/{+path}")
HttpResponse<?> get(@PathVariable("path") String path, HttpRequest<byte[]> request) {
return handle(request);
}
@Post("/{+path}")
HttpResponse<?> post(@PathVariable("path") String path, HttpRequest<byte[]> request) {
return handle(request);
}
@Put("/{+path}")
HttpResponse<?> put(@PathVariable("path") String path, HttpRequest<byte[]> request) {
return handle(request);
}
@Delete("/{+path}")
HttpResponse<?> delete(@PathVariable("path") String path, HttpRequest<byte[]> request) {
return handle(request);
}
private HttpResponse<?> handle(HttpRequest<byte[]> request) {
return DurableStreamsMicronautAdapter.handle(request, handler);
}
}
import io.github.clickin.quarkus.DurableStreamsQuarkusAdapter;
import io.github.clickin.server.core.CachePolicy;
import io.github.clickin.server.core.DurableStreamsHandler;
import io.github.clickin.server.core.HttpMethod;
import io.github.clickin.server.core.InMemoryStreamStore;
import io.github.clickin.server.spi.CursorPolicy;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.HEAD;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriInfo;
import java.time.Clock;
import java.time.Duration;
@Path("/")
public class DurableStreamsResource {
private final DurableStreamsHandler handler = DurableStreamsHandler.builder(new InMemoryStreamStore())
.cursorPolicy(new CursorPolicy(Clock.systemUTC()))
.cachePolicy(CachePolicy.defaultPrivate())
.longPollTimeout(Duration.ofSeconds(25))
.sseMaxDuration(Duration.ofSeconds(60))
.build();
@GET
@Path("{path:.*}")
public Response get(@Context UriInfo uriInfo, @Context HttpHeaders headers) {
return DurableStreamsQuarkusAdapter.handle(HttpMethod.GET, uriInfo, headers, null, handler);
}
@POST
@Path("{path:.*}")
public Response post(@Context UriInfo uriInfo, @Context HttpHeaders headers, byte[] body) {
return DurableStreamsQuarkusAdapter.handle(HttpMethod.POST, uriInfo, headers, body, handler);
}
@PUT
@Path("{path:.*}")
public Response put(@Context UriInfo uriInfo, @Context HttpHeaders headers, byte[] body) {
return DurableStreamsQuarkusAdapter.handle(HttpMethod.PUT, uriInfo, headers, body, handler);
}
@DELETE
@Path("{path:.*}")
public Response delete(@Context UriInfo uriInfo, @Context HttpHeaders headers) {
return DurableStreamsQuarkusAdapter.handle(HttpMethod.DELETE, uriInfo, headers, null, handler);
}
@HEAD
@Path("{path:.*}")
public Response head(@Context UriInfo uriInfo, @Context HttpHeaders headers) {
return DurableStreamsQuarkusAdapter.handle(HttpMethod.HEAD, uriInfo, headers, null, handler);
}
}
import io.github.clickin.ktor.DurableStreamsKtorAdapter
import io.github.clickin.server.core.CachePolicy
import io.github.clickin.server.core.DurableStreamsHandler
import io.github.clickin.server.core.InMemoryStreamStore
import io.github.clickin.server.spi.CursorPolicy
import io.ktor.server.application.Application
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
import io.ktor.server.routing.handle
import io.ktor.server.routing.route
import io.ktor.server.routing.routing
import java.time.Clock
import java.time.Duration
fun main() {
embeddedServer(Netty, port = 4435, module = Application::module).start(wait = true)
}
fun Application.module() {
val handler = DurableStreamsHandler.builder(InMemoryStreamStore())
.cursorPolicy(CursorPolicy(Clock.systemUTC()))
.cachePolicy(CachePolicy.defaultPrivate())
.longPollTimeout(Duration.ofSeconds(25))
.sseMaxDuration(Duration.ofSeconds(60))
.build()
routing {
route("{path...}") {
handle {
DurableStreamsKtorAdapter.handle(call, handler)
}
}
}
}
Gradle dependencies:
dependencies {
implementation("org.reactivestreams:reactive-streams-flow-adapters:1.0.2")
implementation("io.projectreactor:reactor-core:3.7.1")
}
Usage:
import io.github.clickin.client.DurableStreamsClient;
import io.github.clickin.client.LiveLongPollRequest;
import io.github.clickin.core.StreamEvent;
import org.reactivestreams.FlowAdapters;
import reactor.core.publisher.Flux;
DurableStreamsClient client = DurableStreamsClient.create();
Flow.Publisher<StreamEvent> pub = client.subscribeLongPoll(request);
Flux<StreamEvent> flux = Flux.from(FlowAdapters.toPublisher(pub));
Gradle dependencies:
dependencies {
implementation("org.reactivestreams:reactive-streams-flow-adapters:1.0.2")
implementation("io.reactivex.rxjava3:rxjava:3.1.9")
}
Usage:
import io.github.clickin.client.DurableStreamsClient;
import io.github.clickin.core.StreamEvent;
import io.reactivex.rxjava3.core.Flowable;
import org.reactivestreams.FlowAdapters;
DurableStreamsClient client = DurableStreamsClient.create();
Flow.Publisher<StreamEvent> pub = client.subscribeLongPoll(request);
Flowable<StreamEvent> flowable = Flowable.fromPublisher(FlowAdapters.toPublisher(pub));
Gradle dependencies:
dependencies {
implementation("org.reactivestreams:reactive-streams-flow-adapters:1.0.2")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.9.0")
}
Usage:
import io.github.clickin.client.DurableStreamsClient
import io.github.clickin.core.StreamEvent
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.reactivestreams.FlowAdapters
val client = DurableStreamsClient.create()
val pub = client.subscribeLongPoll(request)
val flow: Flow<StreamEvent> = FlowAdapters.toPublisher(pub).asFlow()
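These adapters are only needed to bridge into Reactor, RxJava, or coroutines; a `Flow.Publisher` like the one `subscribeLongPoll` returns can also be consumed with plain JDK APIs. A self-contained sketch, using `SubmissionPublisher` as a stand-in for the client's publisher:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Consuming a Flow.Publisher with a plain JDK Flow.Subscriber — no adapter
// library required. SubmissionPublisher here stands in for the client's
// subscribeLongPoll publisher.
final class FlowDemo {
    static List<String> collect(String... items) {
        List<String> out = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> pub = new SubmissionPublisher<>()) {
            pub.subscribe(new Flow.Subscriber<>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(String item) { out.add(item); }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            for (String item : items) pub.submit(item);
        } // close() signals onComplete to the subscriber
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```

The adapter libraries above become worthwhile once you need operators (map, buffer, retry) rather than a plain subscription.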