Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/release-notes/11473-harvesting-client-improvements.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
A setting has been added for configuring sleep intervals between OAI calls for specific harvesting clients, making it possible to harvest uninterrupted from servers enforcing rate limit policies. See the configuration guide for details. Additionally, this release fixes a problem with harvesting from DataCite OAI-PMH where initial, long-running harvests were failing on sets with large numbers of records.

## New Database Settings

- :HarvestingClientCallRateLimit
15 changes: 15 additions & 0 deletions doc/sphinx-guides/source/installation/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4672,6 +4672,21 @@ Examples:

``curl -X PUT -d '{"default":"0", "CSV":"268435456"}' http://localhost:8080/api/admin/settings/:TabularIngestSizeLimit``

.. _:HarvestingClientCallRateLimit:

:HarvestingClientCallRateLimit
++++++++++++++++++++++++++++++

This setting allows configuring sleep intervals between OAI calls for specific harvesting clients, which makes it possible to harvest from servers that enforce rate limits.

The setting value is a serialized JSON object mapping client names to the specified intervals in fractional seconds. It is also possible to set a universal default interval for all harvesting clients on the instance (in a somewhat unlikely use case where this may be practically necessary).

In the following example, the harvester is instructed to sleep for 900 milliseconds between calls when running the client named ``harvarddv``, and to default to zero otherwise:

``curl -X PUT -d "{\"harvarddv\": 0.9, \"default\": 0}" "http://localhost:8080/api/admin/settings/:HarvestingClientCallRateLimit"``

Please note that the default in the example above is there for illustrative purposes and is otherwise redundant, since no sleep interval is the default behavior anyway.

.. _:ZipUploadFilesLimit:

:ZipUploadFilesLimit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import edu.harvard.iq.dataverse.harvest.client.oai.OaiHandler;
import edu.harvard.iq.dataverse.harvest.client.oai.OaiHandlerException;
import edu.harvard.iq.dataverse.search.IndexServiceBean;
import edu.harvard.iq.dataverse.util.SystemConfig;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.InputStream;
Expand Down Expand Up @@ -85,6 +86,8 @@ public class HarvesterServiceBean {
EjbDataverseEngine engineService;
@EJB
IndexServiceBean indexService;
@EJB
SystemConfig systemConfig;

private static final Logger logger = Logger.getLogger("edu.harvard.iq.dataverse.harvest.client.HarvesterServiceBean");
private static final SimpleDateFormat logFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm-ss");
Expand Down Expand Up @@ -270,6 +273,8 @@ private void harvestOAI(DataverseRequest dataverseRequest, HarvestingClient harv
}

private void harvestOAIviaListIdentifiers(OaiHandler oaiHandler, DataverseRequest dataverseRequest, HarvestingClient harvestingClient, HttpClient httpClient, List<String> failedIdentifiers, List<String> deletedIdentifiers, List<Long> harvestedDatasetIds, Logger harvesterLogger, PrintWriter importCleanupLog) throws OaiHandlerException, StopHarvestException {
int sleepInterval = lookupSleepInterval(harvestingClient.getName());

for (Iterator<Header> idIter = oaiHandler.runListIdentifiers(); idIter.hasNext();) {
// Before each iteration, check if this harvesting job needs to be aborted:
if (checkIfStoppingJob(harvestingClient)) {
Expand All @@ -291,6 +296,8 @@ private void harvestOAIviaListIdentifiers(OaiHandler oaiHandler, DataverseReques

MutableBoolean getRecordErrorOccurred = new MutableBoolean(false);

sleepIfNeeded(sleepInterval);

// Retrieve and process this record with a separate GetRecord call:
Long datasetId = processRecord(dataverseRequest, harvesterLogger, importCleanupLog, oaiHandler, identifier, getRecordErrorOccurred, deletedIdentifiers, dateStamp, httpClient);

Expand All @@ -307,6 +314,22 @@ private void harvestOAIviaListIdentifiers(OaiHandler oaiHandler, DataverseReques
}

private void harvestOAIviaListRecords(OaiHandler oaiHandler, DataverseRequest dataverseRequest, HarvestingClient harvestingClient, HttpClient httpClient, List<String> failedIdentifiers, List<String> deletedIdentifiers, List<Long> harvestedDatasetIds, Logger harvesterLogger, PrintWriter importCleanupLog) throws OaiHandlerException, StopHarvestException {
/*
* It is *exceptionally* unlikely that anyone will ever run into issues
 * with server rate limits when harvesting using the ListRecords method,
 * since only one call needs to be made in order to import multiple
 * datasets. The number of records served is nominally arbitrary and
* varies from server to server. However, most known OAI servers will
* serve 50 to 100 records at a time. If a server has a rate limit policy
* of 300 calls/5 min. and their ListRecords serves 50 records per call,
* Dataverse will need to import 50 datasets per second in order to run
* afoul of the limit. Even with an empty database, Dataverse generally
* doesn't work that fast.
* But, it doesn't hurt to make it possible to define the interval
* regardless, in case it is called for in some exotic scenario.
**/
int sleepInterval = lookupSleepInterval(harvestingClient.getName());

for (Iterator<Record> idIter = oaiHandler.runListRecords(); idIter.hasNext();) {
// Before each iteration, check if this harvesting job needs to be aborted:
if (checkIfStoppingJob(harvestingClient)) {
Expand Down Expand Up @@ -375,9 +398,33 @@ private void harvestOAIviaListRecords(OaiHandler oaiHandler, DataverseRequest da
//can be uncommented out for testing failure handling:
//throw new IOException("Exception occured, stopping harvest");
}

sleepIfNeeded(sleepInterval);
}
}


private int lookupSleepInterval(String clientName) {
int sleepMilliseconds = 0;
float clientIntervalValue = systemConfig.getHarvestingClientRequestInterval(clientName);

sleepMilliseconds = (int) (clientIntervalValue * 1000);
logger.info("Sleep interval in milliseconds: " + sleepMilliseconds);

return sleepMilliseconds;
}

private void sleepIfNeeded(int sleepInterval) {
if (sleepInterval > 0) {
logger.fine("Sleeping for " + sleepInterval + " milliseconds...");
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException iex) {
logger.warning("InterruptedException trying to sleep for " + sleepInterval + " milliseconds");
}
}

}

private Long processRecord(DataverseRequest dataverseRequest, Logger hdLogger, PrintWriter importCleanupLog, OaiHandler oaiHandler, String identifier, MutableBoolean recordErrorOccurred, List<String> deletedIdentifiers, Date dateStamp, HttpClient httpClient) {
String errMessage = null;
Dataset harvestedDataset = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.net.http.HttpClient;
import java.time.Duration;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -147,14 +148,21 @@ public ServiceProvider getServiceProvider() throws OaiHandlerException {

context.withBaseUrl(baseOaiUrl);
context.withGranularity(Granularity.Second);

JdkHttpOaiClient.Builder xoaiClientBuilder = JdkHttpOaiClient.newBuilder().withBaseUrl(getBaseOaiUrl());

// Note that we are defaulting to HTTP/1 (JDK HttpClient defaults to
// HTTP/2 otherwise. By nature of OAI-PMH, HTTP/2 offers no practical
// benefit. However, long-running harvests from servers supporting
// HTTP/2 can fail due to a bug in JDK 17 (HttpClient does not
// properly handle GoAway stream responses apparently). For example,
// harvests from DataCite OAI-PMH were failing at 1 hour mark.
JdkHttpOaiClient.Builder xoaiClientBuilder = (new JdkHttpOaiClient.JdkHttpBuilder(HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1))).withBaseUrl(getBaseOaiUrl());
if (getCustomHeaders() != null) {
for (String headerName : getCustomHeaders().keySet()) {
logger.fine("adding custom header; name: "+headerName+", value: "+getCustomHeaders().get(headerName));
}
xoaiClientBuilder = xoaiClientBuilder.withCustomHeaders(getCustomHeaders());
}
xoaiClientBuilder = xoaiClientBuilder.withConnectTimeout(Duration.ofSeconds(180));
context.withOAIClient(xoaiClientBuilder.build());
context.withSaveUnparsedMetadata();
serviceProvider = new ServiceProvider(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ Whether Harvesting (OAI) service is enabled
FileCategories,
CreateDataFilesMaxErrorsToDisplay,

ContactFeedbackMessageSizeLimit,
ContactFeedbackMessageSizeLimit,
//Experimental setting to allow connecting to a GET external search service expecting a GET request with query parameter mirroring the search API query parameters (without search_service)
GetExternalSearchUrl,
//Experimental setting to provide a display name for the GET external search service
Expand All @@ -779,6 +779,8 @@ Whether Harvesting (OAI) service is enabled
COARNotifyRelationshipAnnouncementTriggerFields,
// JSON specification of the targets to send announcements to
COARNotifyRelationshipAnnouncementTargets,
// Configurable delay between harvesting calls, when required to avoid triggering rate limits
HarvestingClientCallRateLimit
;

@Override
Expand Down
79 changes: 79 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/util/SystemConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class SystemConfig {
* token is valid ({@link #getMinutesUntilPasswordResetTokenExpires}).
*/
private static final String PASSWORD_RESET_TIMEOUT_IN_MINUTES = "dataverse.auth.password-reset-timeout-in-minutes";

public static final String DEFAULT_KEY = "default";

/**
* The default number of datafiles that we allow to be created through
Expand Down Expand Up @@ -601,6 +603,7 @@ public long getTabularIngestSizeLimit() {
* or the default size limit if no format-specific limit is found or its name is invalid (null, blank, ...).
* -1 = unlimited if not set, 0 if disabled or invalid, some long number of bytes otherwise
*/

public long getTabularIngestSizeLimit(String formatName) {
if (formatName != null && !formatName.isBlank()) {
// We convert to lowercase so it doesn't matter which variant someone uses in the JSON config
Expand All @@ -610,6 +613,82 @@ public long getTabularIngestSizeLimit(String formatName) {
return getTabularIngestSizeLimit();
}

public Map<String, Float> getHarvestingClientRequestIntervals() {
String settingString = settingsService.getValueForKey(SettingsServiceBean.Key.HarvestingClientCallRateLimit);
if (settingString != null) {
// Case A: the setting is using JSON to support multiple clients
if (settingString.trim().startsWith("{")) {
try (JsonReader reader = Json.createReader(new StringReader(settingString))) {
JsonObject delays = reader.readObject();

Map<String, Float> limitsMap = new HashMap<>();
// We add the default in case the JSON does not contain the default (which is optional).
limitsMap.put(DEFAULT_KEY, 0F);

for (Map.Entry<String, JsonValue> clientEntry : delays.entrySet()) {
String clientName = clientEntry.getKey();
String lowercaseClientName = clientName.toLowerCase();

try {
JsonValue value = clientEntry.getValue();
float delayInterval;

// We want to be able to use either numbers or string values, so detect which one it is.
// This is necessary as we need to tell the JSON parser what to do, it doesn't automatically handle this for us.
if (value.getValueType() == JsonValue.ValueType.STRING) {
delayInterval = Float.parseFloat(delays.getString(clientName));
} else if (value.getValueType() == JsonValue.ValueType.NUMBER) {
// Will throw if not a valid float number!
delayInterval = delays.getJsonNumber(clientName).numberValue().floatValue(); //.doubleValue();
} else {
logger.warning(() -> "Invalid value type for client " + clientName + ": expected string or number");
logger.warning("Disabling all harvesting client delay intervals completely until fixed!");
return Map.of(DEFAULT_KEY, 0F);
}

limitsMap.put(lowercaseClientName, delayInterval);
} catch (NumberFormatException nfe) {
logger.warning(() -> "Could not convert " + SettingsServiceBean.Key.HarvestingClientCallRateLimit + " entry to float for client " + clientName + " (not a valid number)");
logger.warning("Disabling all harvesting client delay intervals completely until fixed!");
return Map.of(DEFAULT_KEY, 0F);
} catch (ArithmeticException ae) {
logger.warning(() -> "Number too large, or otherwise invalid for client " + clientName);
logger.warning("Disabling all harvesting client delay intervals completely until fixed!");
return Map.of(DEFAULT_KEY, 0F);
}
}

return Collections.unmodifiableMap(limitsMap);
} catch (JsonParsingException e) {
logger.warning(() -> "Invalid " + SettingsServiceBean.Key.HarvestingClientCallRateLimit + " option found, cannot parse JSON: " + e.getMessage());
logger.warning("Disabling all harvesting client delay intervals completely until fixed!");
return Map.of(DEFAULT_KEY, 0F);
}
// Case B: It might be just a simple float, providing a default for all clients.
} else {
try {
float delayInterval = Float.valueOf(settingString);
return Map.of(DEFAULT_KEY, delayInterval);
} catch (NumberFormatException nfe) {
logger.warning(() -> "Could not convert " + SettingsServiceBean.Key.HarvestingClientCallRateLimit + " to float: " + nfe.getMessage());
logger.warning("Disabling all harvesting client delay intervals completely until fixed!");
return Map.of(DEFAULT_KEY, 0F);
}
}
}
// Default is not to limit at all
return Map.of(DEFAULT_KEY, 0F);
}

public float getHarvestingClientRequestInterval(String clientName) {
if (clientName != null && !clientName.isBlank()) {
// We convert to lowercase so it doesn't matter which variant someone uses in the JSON config
String convertedClientName = clientName.toLowerCase();
return getHarvestingClientRequestIntervals().getOrDefault(convertedClientName, getHarvestingClientRequestIntervals().get(DEFAULT_KEY));
}
return getHarvestingClientRequestIntervals().get(DEFAULT_KEY);
}

public boolean isOAIServerEnabled() {
boolean defaultResponse = false;
return settingsService.isTrueForKey(SettingsServiceBean.Key.OAIServerEnabled, defaultResponse);
Expand Down
49 changes: 49 additions & 0 deletions src/test/java/edu/harvard/iq/dataverse/util/SystemConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,55 @@ void testGetTabularIngestSizeLimitsWithSingleInvalidValue() {
assertEquals(1, result.size());
assertEquals(0L, (long) result.get(SystemConfig.TABULAR_INGEST_SIZE_LIMITS_DEFAULT_KEY));
}

@Test
public void testGetHarvestingClientRequestIntervals() {

// Test with setting not set will return default 0.0
// given
doReturn(null).when(settingsService).getValueForKey(SettingsServiceBean.Key.HarvestingClientCallRateLimit);
// when
Map<String, Float> result = systemConfig.getHarvestingClientRequestIntervals();
// then
assertEquals(1, result.size());
assertEquals(0, result.get(SystemConfig.DEFAULT_KEY));

// Test with good client
String value = "{\"harvarddv\": 0.9, \"default\": 0.0}";
// given
doReturn(value).when(settingsService).getValueForKey(SettingsServiceBean.Key.HarvestingClientCallRateLimit);
// when
result = systemConfig.getHarvestingClientRequestIntervals();
// then
assertEquals(2, result.size());
assertTrue(result.containsKey("harvarddv"));
assertTrue(result.containsKey("default"));
assertEquals(0.9F, systemConfig.getHarvestingClientRequestInterval("harvarddv"));
assertEquals(0.0F, systemConfig.getHarvestingClientRequestInterval("notFoundSoDefault"));

// Test with missing default will create default 0.0
value = "{\"harvarddv\": 0.9}";
// given
doReturn(value).when(settingsService).getValueForKey(SettingsServiceBean.Key.HarvestingClientCallRateLimit);
// when
result = systemConfig.getHarvestingClientRequestIntervals();
// then
assertEquals(2, result.size());
assertTrue(result.containsKey("default"));
assertEquals(0.0F, systemConfig.getHarvestingClientRequestInterval("default"));

// Test with invalid JSON (value as string instead of float) will default setting to default 0.0
value = "{\"harvarddv1\": 0.9, \"harvarddv2\": \"string\"}";
// given
doReturn(value).when(settingsService).getValueForKey(SettingsServiceBean.Key.HarvestingClientCallRateLimit);
// when
result = systemConfig.getHarvestingClientRequestIntervals();
// then
assertEquals(1, result.size());
assertTrue(result.containsKey("default"));
assertTrue(!result.containsKey("harvarddv1"));
assertTrue(!result.containsKey("harvarddv2"));
}

@ParameterizedTest
@ValueSource(strings = {"", "{ invalid: }"})
Expand Down