HttpStartRangeInputStreamProvider.java
/*
* Copyright (C) 2021 B3Partners B.V.
*
* SPDX-License-Identifier: MIT
*
*/
package nl.b3p.brmo.util.http;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import nl.b3p.brmo.util.ResumingInputStream;
/**
* Provides a stream reading an HTTP entity starting at a specified position until the end of the
* entity using <a href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range">HTTP Range
* requests</a>.
*
* <p>For any requests after the first, an <a
* href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Range">If-Range</a> header is
* sent to guarantee that the stored resource has not been modified since the last fragment has been
* received.
*
* <p>If the HTTP server does not answer a range request with a "206 Partial Content" response, a
* {@link RuntimeException} is thrown.
*
* @author Matthijs Laan
*/
public class HttpStartRangeInputStreamProvider
    implements ResumingInputStream.StreamAtStartPositionProvider {
  private static final int HTTP_OK = 200;
  private static final int HTTP_PARTIAL_CONTENT = 206;

  private final URI uri;
  private final HttpClientWrapper httpClientWrapper;

  /** Optional total entity length; when set it closes the requested byte range. */
  private final Long contentLength;

  /** {@code true} until {@link #get(long, int, Exception)} has issued its first request. */
  private boolean first = true;

  /** Validator (strong ETag or Last-Modified value) sent as {@code If-Range} when resuming. */
  private String ifRange;

  /** Range unit the server is known to accept, taken from earlier responses. */
  private String acceptRanges;

  private boolean assumeAcceptsRanges = false;

  /**
   * Creates a provider for the given URI using the default HTTP client wrapper and an unknown
   * content length.
   *
   * @param uri the URI of the HTTP entity to read
   */
  public HttpStartRangeInputStreamProvider(URI uri) {
    this(uri, HttpClientWrappers.getDefault());
  }

  /**
   * Creates a provider for the given URI with an unknown content length.
   *
   * @param uri the URI of the HTTP entity to read
   * @param httpClientWrapper the wrapper used to execute the HTTP requests
   */
  public HttpStartRangeInputStreamProvider(URI uri, HttpClientWrapper httpClientWrapper) {
    this(uri, httpClientWrapper, null);
  }

  /**
   * Creates a provider for the given URI.
   *
   * @param uri the URI of the HTTP entity to read
   * @param httpClientWrapper the wrapper used to execute the HTTP requests
   * @param contentLength total length of the entity in bytes, or {@code null} when unknown; when
   *     set, range requests end at {@code contentLength - 1} instead of being open-ended
   */
  public HttpStartRangeInputStreamProvider(
      URI uri, HttpClientWrapper httpClientWrapper, Long contentLength) {
    this.uri = uri;
    this.httpClientWrapper = httpClientWrapper;
    this.contentLength = contentLength;
  }

  /**
   * Fluent variant of {@link #setAssumeAcceptsRanges(boolean)}.
   *
   * @param assumeAcceptsRanges whether to resume even if no {@code Accept-Ranges: bytes} header
   *     was seen
   * @return this provider
   */
  public HttpStartRangeInputStreamProvider assumeAcceptsRanges(boolean assumeAcceptsRanges) {
    this.assumeAcceptsRanges = assumeAcceptsRanges;
    return this;
  }

  /**
   * @return whether resuming is attempted even if the server did not advertise {@code
   *     Accept-Ranges: bytes}
   */
  public boolean isAssumeAcceptsRanges() {
    return assumeAcceptsRanges;
  }

  /**
   * @param assumeAcceptsRanges whether to resume even if no {@code Accept-Ranges: bytes} header
   *     was seen
   */
  public void setAssumeAcceptsRanges(boolean assumeAcceptsRanges) {
    this.assumeAcceptsRanges = assumeAcceptsRanges;
  }

  /**
   * @return the content length passed to the constructor, or {@code null} when none was given
   */
  public Long getContentLength() {
    return contentLength;
  }

  /**
   * Requests the entity and returns the response body, starting at the given byte position.
   *
   * <p>For {@code position > 0} a {@code Range} header is sent and a 206 response is required;
   * otherwise a plain request is sent and a 200 response is required. For any request after the
   * first one with {@code position > 0}, an {@code If-Range} header carrying the validator
   * remembered from earlier responses is added as well.
   *
   * @param position zero-based byte offset at which the returned stream must start
   * @param totalRetries number of retries so far; not used by this implementation
   * @param causeForRetry the exception that triggered this retry, attached as cause when resuming
   *     is not possible; may be {@code null}
   * @return the response body stream
   * @throws IOException if resuming is required but the server did not advertise byte range
   *     support (and it is not assumed), if no strong ETag or Last-Modified validator is known, or
   *     if the request was interrupted (the thread's interrupt flag is restored in that case)
   * @throws RuntimeException if the response status is not the expected 200 or 206
   */
  @Override
  public InputStream get(long position, int totalRetries, Exception causeForRetry)
      throws IOException {
    List<String[]> headers = new ArrayList<>();
    if (position > 0 && !first) {
      if (!assumeAcceptsRanges && !"bytes".equals(acceptRanges)) {
        throw new IOException(
            "Exception reading from HTTP server and resume not supported", causeForRetry);
      }
      if (ifRange == null) {
        throw new IOException(
            "Exception reading from HTTP server, cannot resume HTTP request reliably: no strong ETag or Last-Modified",
            causeForRetry);
      }
      headers.add(new String[] {"If-Range", ifRange});
    }
    if (position > 0) {
      // Open-ended range ("bytes=N-") unless the total length is known
      String lastBytePosition = contentLength != null ? String.valueOf(contentLength - 1) : "";
      headers.add(new String[] {"Range", "bytes=" + position + "-" + lastBytePosition});
    }
    first = false;

    HttpResponseWrapper response;
    try {
      response =
          httpClientWrapper.request(
              uri,
              headers.stream()
                  .flatMap(header -> Stream.of(header[0], header[1]))
                  .toArray(String[]::new));
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can still observe the interruption
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }

    int statusCode = response.getStatusCode();
    if (position > 0) {
      if (statusCode != HTTP_PARTIAL_CONTENT) {
        throw discardBody(
            response,
            new RuntimeException(
                "Error retrying HTTP request at position "
                    + position
                    + ": expected 206 response status but got "
                    + statusCode));
      }
    } else if (statusCode != HTTP_OK) {
      throw discardBody(response, new RuntimeException("HTTP status code: " + statusCode));
    }

    rememberValidator(response, statusCode);
    rememberAcceptRanges(response, statusCode);
    return response.getResponseBody();
  }

  /**
   * Stores the validator to use for a later {@code If-Range} header: a strong ETag if present,
   * otherwise the Last-Modified value when no ETag was sent at all.
   */
  private void rememberValidator(HttpResponseWrapper response, int statusCode) {
    String eTag = response.getHeader("ETag");
    if (eTag != null) {
      // Use strong ETag only; a weak ETag leaves the stored validator untouched
      if (!eTag.startsWith("W/")) {
        // Accept any value, also if not surrounded by DQUOTE as required by spec
        // DQUOTE should also be included in If-Range header
        ifRange = eTag;
      }
      return;
    }
    String lastModified = response.getHeader("Last-Modified");
    // A 206 response may omit validators the client already has. Keep the stored one in that
    // case instead of clearing it, otherwise a second resume would be refused.
    if (lastModified != null || statusCode != HTTP_PARTIAL_CONTENT) {
      ifRange = lastModified;
    }
  }

  /** Stores the advertised range unit, treating a 206 response as proof of byte range support. */
  private void rememberAcceptRanges(HttpResponseWrapper response, int statusCode) {
    String advertised = response.getHeader("Accept-Ranges");
    if (advertised == null && statusCode == HTTP_PARTIAL_CONTENT) {
      // The server just answered a "bytes" range request with partial content
      advertised = "bytes";
    }
    acceptRanges = advertised;
  }

  /**
   * Closes the body of a rejected response (best effort) so it is not leaked, and returns the
   * given failure with any problem encountered while closing attached as suppressed.
   */
  private static RuntimeException discardBody(
      HttpResponseWrapper response, RuntimeException failure) {
    try {
      InputStream body = response.getResponseBody();
      if (body != null) {
        body.close();
      }
    } catch (IOException | RuntimeException e) {
      failure.addSuppressed(e);
    }
    return failure;
  }
}