KVKMutatieserviceProcesRunner.java
package nl.b3p.brmo.service.scanner;
import static nl.b3p.brmo.persistence.staging.AutomatischProces.ProcessingStatus.*;
import static nl.b3p.brmo.persistence.staging.KVKMutatieserviceProces.*;
import static nl.b3p.gds2.GDS2Util.getDatumTijd;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashSet;
import java.util.Set;
import javax.persistence.TransactionRequiredException;
import javax.persistence.Transient;
import nl.b3p.brmo.loader.util.BrmoException;
import nl.b3p.brmo.persistence.staging.ClobElement;
import nl.b3p.brmo.persistence.staging.KVKMutatieserviceProces;
import nl.b3p.brmo.persistence.staging.NHRInschrijving;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.text.StringEscapeUtils;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.stripesstuff.stripersist.Stripersist;
public class KVKMutatieserviceProcesRunner extends AbstractExecutableProces {
private static final Log LOG = LogFactory.getLog(KVKMutatieserviceProcesRunner.class);
private final KVKMutatieserviceProces config;
private int pages;
private String ophalenVanaf = "";
private String ophalenTot = "";
private final Set<String> kvkNummers = new HashSet<>();
@Transient private ProgressUpdateListener listener;
public static Log getLog() {
return LOG;
}
public KVKMutatieserviceProcesRunner(KVKMutatieserviceProces config) {
this.config = config;
}
@Override
public void execute() {
this.execute(
new ProgressUpdateListener() {
@Override
public void total(long total) {}
@Override
public void progress(long progress) {}
@Override
public void exception(Throwable t) {
LOG.error(t);
}
@Override
public void updateStatus(String status) {}
@Override
public void addLog(String log) {
KVKMutatieserviceProcesRunner.LOG.info(log);
config.updateSamenvattingEnLogfile(log);
Stripersist.getEntityManager().merge(config);
}
});
}
@Override
public void execute(ProgressUpdateListener listener) {
this.listener = listener;
listener.updateStatus("KVK Mutatieservice proces gestart op %tc.".formatted(new Date()));
if (config.getStatus().equals(WAITING) || config.getStatus().equals(ERROR)) {
if (config.getStatus().equals(ERROR)) {
listener.addLog("Vorige run is met ERROR status afgerond, opnieuw proberen");
}
try {
listener.updateStatus(PROCESSING.toString());
config.setStatus(PROCESSING);
Stripersist.getEntityManager().merge(config);
LOG.debug(
"Gebruik %s als apikey en %s als abonnementId."
.formatted(config.getConfig().get(APIKEY), config.getConfig().get(ABONNEMENT_ID)));
// datum tijd parsen/instellen
GregorianCalendar vanaf;
GregorianCalendar tot =
getDatumTijd(ClobElement.nullSafeGet(this.config.getConfig().get(TOT)));
String sVanaf = ClobElement.nullSafeGet(this.config.getConfig().get(VANAF));
if (tot != null && ("-1".equals(sVanaf) || "-2".equals(sVanaf) || "-3".equals(sVanaf))) {
vanaf =
getDatumTijd(
ClobElement.nullSafeGet(this.config.getConfig().get(TOT)),
Integer.parseInt(sVanaf));
} else {
vanaf = getDatumTijd(sVanaf);
}
ophalenVanaf =
vanaf == null ? "" : vanaf.toZonedDateTime().format(DateTimeFormatter.ISO_INSTANT);
ophalenTot = tot == null ? "" : tot.toZonedDateTime().format(DateTimeFormatter.ISO_INSTANT);
getPageFromAPI(/*start met p.1*/ 1);
config.setSamenvatting(
"Er zijn %d pagina's van de KVK Mutatieservice opgehaald en verwerkt."
.formatted(pages));
config.setStatus(WAITING);
listener.updateStatus("KVK Mutatieservice proces voltooid op %tc.".formatted(new Date()));
} catch (Exception e) {
listener.exception(e);
listener.updateStatus(ERROR.toString());
listener.addLog(e.getMessage());
config.setStatus(ERROR);
} finally {
config.setLastrun(new Date());
Stripersist.getEntityManager().merge(config);
Stripersist.getEntityManager().flush();
}
} else {
// PROCESSING
// Als het proces al bezig is of succesvol is afgerond, doen we niets
listener.addLog("KVK Mutatieservice proces is al bezig, geen actie nodig.");
LOG.info("KVK Mutatieservice proces is al bezig, geen actie nodig.");
}
}
private void getPageFromAPI(int pagina)
throws JSONException,
IllegalArgumentException,
TransactionRequiredException,
IOException,
BrmoException {
final String API_URL_PARAMS = "%s%s?pagina=%d&vanaf=%s&tot=%s";
String url =
String.format(
API_URL_PARAMS,
config.getConfig().get(APIURL).getValue(),
config.getConfig().get(ABONNEMENT_ID).getValue(),
pagina,
this.ophalenVanaf,
this.ophalenTot);
LOG.debug("Requesting KVK Mutatieservice API: " + url);
listener.progress(pagina);
listener.updateStatus(
"Ophalen van pagina %d van %d van KVK mutaties.".formatted(pagina, pages));
LOG.info("Ophalen van pagina %d van %d van KVK mutaties.".formatted(pagina, pages));
try {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request =
HttpRequest.newBuilder()
.uri(URI.create(url))
.header(APIKEY, config.getConfig().get(APIKEY).getValue())
.header("Accept", "application/json")
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
parseApiResponse(response.body());
} else {
// de API heeft een foutmelding teruggegeven, maar dat kan HTML zijn, dus die moet
// escaped/gestript worden, zie ook KVK TopDesk melding "M2507 3520"
String errorMsg =
"Ophalen van mutatiedata is mislukt met statuscode %s en melding: %s"
.formatted(
response.statusCode(),
StringEscapeUtils.escapeHtml4(response.body().replaceAll("<[^>]*>", "")));
listener.updateStatus(ERROR.toString());
config.setStatus(ERROR);
throw new BrmoException(errorMsg);
}
} catch (InterruptedException | JSONException e) {
LOG.error("Fout tijdens benaderen KVK Mutatieservice API data", e);
listener.addLog("Fout tijdens benaderen KVK Mutatieservice API data: " + e.getMessage());
listener.exception(e);
}
}
private void parseApiResponse(String responseBody)
throws JSONException,
IllegalArgumentException,
TransactionRequiredException,
BrmoException,
IOException {
// test json response:
// {
// "pagina": 1,
// "aantal": 100,
// "totaal": 14344,
// "totaalPaginas": 144,
// "signalen": [
// {
// "id": "ed6b886b-39e2-46d9-8175-85a48f227c40",
// "timestamp": "2025-07-17T13:16:01.86692Z",
// "kvknummer": "90003403",
// "signaalType": "SignaalGewijzigdeInschrijving"
// },
// {
// "id": "a124dca0-de17-4d10-af8a-9e0ad79da1ed",
// "timestamp": "2025-07-17T13:16:01.885015Z",
// "kvknummer": "90004604",
// "signaalType": "SignaalGewijzigdeInschrijving"
// }
// ]}
LOG.debug("Parsing API response: " + responseBody);
JSONObject jsonObject = new JSONObject(responseBody);
pages = jsonObject.getInt("totaalPaginas");
int pagina = jsonObject.getInt("pagina");
listener.total(pages);
LOG.debug("Aantal signalen op pagina %d: %d".formatted(pagina, jsonObject.getInt("aantal")));
final JSONArray signalen = jsonObject.getJSONArray("signalen");
for (int i = 0; i < signalen.length(); i++) {
JSONObject signaal = signalen.getJSONObject(i);
String kvkNummer = signaal.getString("kvknummer");
if (kvkNummers.add(kvkNummer)) {
listener.addLog("KVK nummer %s toegevoegd van pagina %d.".formatted(kvkNummer, pagina));
}
}
if (pagina < pages) {
// If there are more pages, fetch the next page
getPageFromAPI(pagina + 1);
} else {
listener.addLog("Alle %d pagina's KVK mutaties zijn opgehaald.".formatted(pages));
storeKvkNummers();
}
}
private void storeKvkNummers() throws IllegalArgumentException, TransactionRequiredException {
for (String kvkNummer : kvkNummers) {
NHRInschrijving inschrijving =
Stripersist.getEntityManager().find(NHRInschrijving.class, kvkNummer);
if (inschrijving == null) {
inschrijving = new NHRInschrijving(kvkNummer);
} else {
inschrijving.setDatum(new Date());
inschrijving.setVolgendProberen(new Date());
inschrijving.setProbeerAantal(0);
}
Stripersist.getEntityManager().merge(inschrijving);
}
listener.addLog(
"Er zijn %s unieke KVK nummers met mutatie opgeslagen of bijgewerkt."
.formatted(kvkNummers.size()));
}
}