BAG2MutatieProcesRunner.java

/*
 * Copyright (C) 2021 B3Partners B.V.
 *
 * SPDX-License-Identifier: MIT
 *
 */

package nl.b3p.brmo.service.scanner;

import static nl.b3p.brmo.persistence.staging.AutomatischProces.ProcessingStatus.ERROR;
import static nl.b3p.brmo.persistence.staging.AutomatischProces.ProcessingStatus.PROCESSING;
import static nl.b3p.brmo.persistence.staging.AutomatischProces.ProcessingStatus.WAITING;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import javax.persistence.Transient;
import nl.b3p.brmo.bag2.loader.BAG2Database;
import nl.b3p.brmo.bag2.loader.cli.BAG2DatabaseOptions;
import nl.b3p.brmo.bag2.loader.cli.BAG2LoadOptions;
import nl.b3p.brmo.bag2.loader.cli.BAG2LoaderMain;
import nl.b3p.brmo.bag2.loader.cli.BAG2MutatiesCommand;
import nl.b3p.brmo.bag2.loader.cli.BAG2ProgressOptions;
import nl.b3p.brmo.bgt.loader.ProgressReporter;
import nl.b3p.brmo.loader.util.BrmoException;
import nl.b3p.brmo.persistence.staging.BAG2MutatieProces;
import nl.b3p.brmo.persistence.staging.ClobElement;
import nl.b3p.brmo.service.util.ConfigUtil;
import nl.b3p.jdbc.util.converter.PGConnectionUnwrapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.stripesstuff.stripersist.Stripersist;

public class BAG2MutatieProcesRunner extends AbstractExecutableProces {
  private static final Log LOG = LogFactory.getLog(BAG2MutatieProcesRunner.class);
  private final BAG2MutatieProces config;

  @Transient private ProgressUpdateListener listener;

  public BAG2MutatieProcesRunner(BAG2MutatieProces config) {
    this.config = config;
  }

  /**
   * Voert deze taak eenmalig uit.
   *
   * @throws BrmoException als er een fout optreed in het uitvoeren van het proces.
   */
  @Override
  public void execute() throws BrmoException {
    this.execute(
        new ProgressUpdateListener() {
          @Override
          public void total(long total) {}

          @Override
          public void progress(long progress) {}

          @Override
          public void exception(Throwable t) {
            LOG.error(t);
          }

          @Override
          public void updateStatus(String status) {}

          @Override
          public void addLog(String log) {
            LOG.info(log);
            config.updateSamenvattingEnLogfile(log);
            Stripersist.getEntityManager().merge(config);
          }
        });
  }

  /**
   * Voert de taak uit en rapporteert de voortgang.
   *
   * @param listener voortgangs listener
   */
  @Override
  public void execute(ProgressUpdateListener listener) {
    this.listener = listener;
    BAG2LoaderMain.configureLogging(false);
    listener.addLog(String.format("Het BAG2 mutatie proces is gestart op %tc.", new Date()));
    if (config.getStatus().equals(WAITING) || config.getStatus().equals(ERROR)) {
      if (config.getStatus().equals(ERROR)) {
        listener.addLog("Vorige run is met ERROR status afgerond, opnieuw proberen");
      }

      int exitCode = -1;
      BAG2Database bag2Database;
      try (Connection rsgbbagConnection = ConfigUtil.getDataSourceRsgbBag().getConnection()) {

        BAG2DatabaseOptions databaseOptions = new BAG2DatabaseOptions();
        // Niet nodig (rsgbbagConnection wordt gebruikt), maar voor de duidelijkheid
        databaseOptions.setConnectionString(rsgbbagConnection.getMetaData().getURL());
        Connection connection = rsgbbagConnection;
        if (databaseOptions.getConnectionString().startsWith("jdbc:postgresql:")) {
          // Voor gebruik van pgCopy is unwrappen van de connectie nodig.
          // Ook al doet PostGISCopyInsertBatch zelf ook een unwrap, de
          // PGConnectionUnwrapper kan ook Tomcat JNDI
          // connection pool unwrapping aan welke niet met een normale Connection.unwrap()
          // werkt.
          connection = (Connection) PGConnectionUnwrapper.unwrap(rsgbbagConnection);
          databaseOptions.setUsePgCopy(true);
        }
        bag2Database =
            new BAG2Database(databaseOptions, connection) {
              /** connectie niet sluiten; dat doen we later als we helemaal klaar zijn */
              @Override
              public void close() {
                LOG.debug("Had de BAG2 databaseconnectie kunnen sluiten... maar niet gedaan.");
              }
            };

        BAG2LoaderMain main = new BAG2LoaderMain();
        main.setBag2Database(bag2Database);
        BAG2MutatiesCommand mutatiesCommand = new BAG2MutatiesCommand();
        mutatiesCommand.setParent(main);

        // Use defaults
        BAG2LoadOptions loadOptions = new BAG2LoadOptions();
        loadOptions.setGeoFilter(ClobElement.nullSafeGet(config.getConfig().get("geo-filter")));
        BAG2DatabaseOptions dbOptions = new BAG2DatabaseOptions();

        listener.updateStatus(PROCESSING.toString());
        config.setStatus(PROCESSING);
        Stripersist.getEntityManager().merge(config);
        Stripersist.getEntityManager().flush();

        String mode = ClobElement.nullSafeGet(config.getConfig().get("mode"));
        String directory = ClobElement.nullSafeGet(config.getConfig().get("directory"));
        String url = ClobElement.nullSafeGet(config.getConfig().get("url"));
        String kadasterUser = ClobElement.nullSafeGet(config.getConfig().get("kadaster-username"));
        String kadasterPassword =
            ClobElement.nullSafeGet(config.getConfig().get("kadaster-password"));
        String queryParams = ClobElement.nullSafeGet(config.getConfig().get("query"));

        if (!mode.equals("applyFromMirror")) {
          // Let op, zet URL naar die van BAG Bestanden, niet de default publieke mirror
          // voor applyFromMirror modus
          // (alhoewel downloaden/verwerken van de mirror en op andere computer 'load'
          // modus ook zou kunnen, maar dan
          // moet de default value voor de 'url' config niet ingevuld worden als nu)
          url = BAG2MutatiesCommand.LVBAG_BESTANDEN_API_URL;
        }
        if (queryParams == null || queryParams.trim().isEmpty()) {
          // Defaultwaarde voor dagmutaties, anders krijg je ook standen er bij...
          queryParams = "artikelnummers=2529";
        }
        final BAG2ProgressOptions progress = new BAG2ProgressOptions();
        progress.setCustomProgressReporter(
            new ProgressReporter() {
              @Override
              protected void log(String msg) {
                listener.addLog(msg);
              }

              @Override
              protected void status(String msg) {
                listener.updateStatus(msg);
              }
            });

        switch (mode) {
          case "applyFromMirror":
            listener.updateStatus("Verwerken van BAG2 mutatiebestanden...");
            listener.addLog(
                String.format(
                    "Verwerken van BAG2 mutatiebestanden van publieke mirror \"%s\"", url));
            exitCode =
                mutatiesCommand.apply(dbOptions, progress, null, null, url, queryParams, false);
            listener.addLog("Einde verwerken BAG2 bestanden");
            break;
          case "apply":
            listener.updateStatus("Verwerken van BAG2 mutatiebestanden...");
            listener.addLog(
                String.format(
                    "Verwerken van BAG2 mutatiebestanden van BAG Bestanden, gebruikersnaam %s",
                    kadasterUser));
            exitCode =
                mutatiesCommand.apply(
                    dbOptions, progress, kadasterUser, kadasterPassword, url, queryParams, false);
            listener.addLog("Einde verwerken BAG2 bestanden");
            break;
          case "download":
            listener.updateStatus("Downloaden van BAG2 mutatiebestanden...");
            listener.addLog(
                String.format(
                    "Downloaden van BAG2 mutatiebestanden naar directory %s, gebruikersnaam %s",
                    directory, kadasterUser));
            exitCode =
                mutatiesCommand.download(
                    false, kadasterUser, kadasterPassword, url, queryParams, directory, "", false);
            listener.addLog("Einde downloaden BAG2 bestanden");
            break;
          case "load":
            listener.updateStatus("Laden van BAG2 bestanden...");
            listener.addLog("Laden van BAG2 bestanden uit directory " + directory);
            exitCode = main.load(dbOptions, loadOptions, progress, new String[] {directory}, false);
            listener.addLog("Einde laden BAG2 bestanden");
            break;
        }
      } catch (ClassNotFoundException e) {
        LOG.error(e.getLocalizedMessage());
        listener.exception(e);
      } catch (BrmoException | SQLException e) {
        LOG.error("Fout tijdens benaderen BAG2 database", e);
        listener.exception(e);
      } catch (Exception e) {
        LOG.error("Fout tijdens BAG2 proces", e);
        listener.exception(e);
      } finally {
        if (exitCode == 0) {
          config.setStatus(WAITING);
          config.setLastrun(new Date());

          listener.updateStatus(WAITING.toString());
        } else {
          config.setStatus(ERROR);
          config.setLastrun(new Date());
          listener.updateStatus(ERROR.toString());
        }
        listener.addLog("BAG2 mutatieproces afgerond op " + new Date());
        config.setLastrun(new Date());
        Stripersist.getEntityManager().merge(config);
        Stripersist.getEntityManager().flush();
      }
    }
  }
}