RsgbProxy.java

package nl.b3p.brmo.loader;

import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Types;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.sql.DataSource;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamSource;
import nl.b3p.brmo.loader.entity.Bericht;
import nl.b3p.brmo.loader.updates.UpdateProcess;
import nl.b3p.brmo.loader.util.BrmoException;
import nl.b3p.brmo.loader.util.DataComfortXMLReader;
import nl.b3p.brmo.loader.util.RsgbBRPTransformer;
import nl.b3p.brmo.loader.util.RsgbTransformer;
import nl.b3p.brmo.loader.util.RsgbWOZTransformer;
import nl.b3p.brmo.loader.util.TableData;
import nl.b3p.brmo.loader.util.TableRow;
import nl.b3p.jdbc.util.converter.ColumnMetadata;
import nl.b3p.jdbc.util.converter.GeometryJdbcConverter;
import nl.b3p.jdbc.util.converter.GeometryJdbcConverterFactory;
import nl.b3p.jdbc.util.converter.OracleJdbcConverter;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.javasimon.SimonManager;
import org.javasimon.Split;
import org.locationtech.jts.io.ParseException;
import org.w3c.dom.Node;

/**
 * @author Boy de Wit
 */
public class RsgbProxy implements Runnable, BerichtenHandler {

  private static final Log log = LogFactory.getLog(RsgbProxy.class);

  private final boolean alsoExecuteSql = true;

  private final ProgressUpdateListener listener;
  private final BerichtSelectMode mode;
  private final UpdateProcess updateProcess;
  private final Map<String, PreparedStatement> checkRowExistsStatements = new HashMap<>();
  private final Map<String, PreparedStatement> insertSqlPreparedStatements = new HashMap<>();
  private final Map<String, PreparedStatement> updateSqlPreparedStatements = new HashMap<>();
  private final Map<String, PreparedStatement> createDeleteSqlStatements = new HashMap<>();
  private final Map<String, List<String>> primaryKeyCache = new HashMap<>();
  private final DataComfortXMLReader dbXmlReader = new DataComfortXMLReader();
  // Gebruik andere instantie voor lezen oude berichten, gebeurt gelijktijdig
  // in andere thread als lezen van dbxml van berichten om te verwerken
  private final DataComfortXMLReader oldDbXmlReader = new DataComfortXMLReader();
  private int processed;

  /** De status voor BY_STATUS. */
  private Bericht.STATUS status;

  /** De bericht ID's voor BY_IDS. */
  private long[] berichtIds;

  /** Het ID van het laadproces vor BY_LAADPROCES. */
  private long[] laadprocesIds;

  private PreparedStatement insertMetadataStatement;
  private DatabaseMetaData dbMetadata = null;
  private DatabaseMetaData dbMetadataBrk = null;
  /* Map van lowercase tabelnaam naar originele case tabelnaam */
  private final Map<String, String> tables = new HashMap<>();
  private final Map<String, SortedSet<ColumnMetadata>> tableColumns = new HashMap<>();
  private Connection connRsgb = null;
  private Connection connRsgbBrk = null;
  private GeometryJdbcConverter geomToJdbc = null;
  private GeometryJdbcConverter geomToJdbcBrk = null;
  private Savepoint recentSavepoint = null;
  private String herkomstMetadata = null;
  private final DataSource dataSourceRsgb;
  private final DataSource dataSourceRsgbBrk;
  private final StagingProxy stagingProxy;
  private boolean enablePipeline = false;
  private int pipelineCapacity = 25;
  private boolean renewConnectionAfterCommit = false;
  private boolean orderBerichten = true;
  private String errorState = null;
  private final Map<String, RsgbTransformer> rsgbTransformers = new HashMap<>();
  private String simonNamePrefix = "b3p.rsgb.";

  public RsgbProxy(
      DataSource dataSourceRsgb,
      DataSource dataSourceRsgbBrk,
      StagingProxy stagingProxy,
      Bericht.STATUS status,
      ProgressUpdateListener listener) {
    this.stagingProxy = stagingProxy;
    this.dataSourceRsgb = dataSourceRsgb;
    this.dataSourceRsgbBrk = dataSourceRsgbBrk;

    mode = BerichtSelectMode.BY_STATUS;
    this.status = status;
    this.listener = listener;
    this.updateProcess = null;
  }

  /**
   * @param dataSourceRsgb de te gebruiken RSGB database
   * @param stagingProxy de te gebruiken StagingProxy
   * @param mode geeft aan wat ids zijn (laadprocessen of berichten)
   * @param ids record id's (afhankelijk van de {@code BerichtSelectMode})
   * @param listener voortgangs listener
   */
  public RsgbProxy(
      DataSource dataSourceRsgb,
      DataSource dataSourceRsgbBrk,
      StagingProxy stagingProxy,
      BerichtSelectMode mode,
      long[] ids,
      ProgressUpdateListener listener) {
    this.stagingProxy = stagingProxy;
    this.dataSourceRsgb = dataSourceRsgb;
    this.dataSourceRsgbBrk = dataSourceRsgbBrk;

    this.mode = mode;
    if (mode == BerichtSelectMode.BY_LAADPROCES) {
      this.laadprocesIds = ids;
    } else if (mode == BerichtSelectMode.BY_IDS) {
      this.berichtIds = ids;
    }
    this.listener = listener;
    this.updateProcess = null;
  }

  public RsgbProxy(
      DataSource dataSourceRsgb,
      DataSource dataSourceRsgbBrk,
      StagingProxy stagingProxy,
      UpdateProcess updateProcess,
      ProgressUpdateListener listener) {
    this.stagingProxy = stagingProxy;
    this.dataSourceRsgb = dataSourceRsgb;
    this.dataSourceRsgbBrk = dataSourceRsgbBrk;

    this.mode = BerichtSelectMode.FOR_UPDATE;
    this.updateProcess = updateProcess;
    this.listener = listener;
  }

  public void setSimonNamePrefix(String prefix) {
    this.simonNamePrefix = prefix;
  }

  public void setEnablePipeline(boolean enablePipeline) {
    this.enablePipeline = enablePipeline;
  }

  public void setPipelineCapacity(int pipelineCapacity) {
    this.pipelineCapacity = pipelineCapacity;
  }

  public void setRenewConnectionAfterCommit(boolean renewConnectionAfterCommit) {
    this.renewConnectionAfterCommit = renewConnectionAfterCommit;
  }

  public void setOrderBerichten(boolean orderBerichten) {
    this.orderBerichten = orderBerichten;
  }

  public void setErrorState(String errorState) {
    this.errorState = errorState;
  }

  public void init() throws SQLException {
    ResultSet tablesRs = null;
    if (null != this.dataSourceRsgb) {
      connRsgb = dataSourceRsgb.getConnection();
      geomToJdbc = GeometryJdbcConverterFactory.getGeometryJdbcConverter(connRsgb);

      try {
        dbMetadata = connRsgb.getMetaData();
        tablesRs = dbMetadata.getTables(null, geomToJdbc.getSchema(), "%", new String[] {"TABLE"});
        while (tablesRs.next()) {
          tables.put(
              tablesRs.getString("TABLE_NAME").toLowerCase(), tablesRs.getString("TABLE_NAME"));
        }
      } catch (SQLException sqlEx) {
        throw new SQLException("Fout bij ophalen rsgb tabellen: ", sqlEx);
      } finally {
        DbUtils.closeQuietly(tablesRs);
      }
    }

    if (null != dataSourceRsgbBrk) {
      connRsgbBrk = dataSourceRsgbBrk.getConnection();
      geomToJdbcBrk = GeometryJdbcConverterFactory.getGeometryJdbcConverter(connRsgbBrk);

      try {
        dbMetadataBrk = connRsgbBrk.getMetaData();
        tablesRs =
            dbMetadataBrk.getTables(null, geomToJdbcBrk.getSchema(), "%", new String[] {"TABLE"});
        while (tablesRs.next()) {
          tables.put(
              tablesRs.getString("TABLE_NAME").toLowerCase(), tablesRs.getString("TABLE_NAME"));
        }
      } catch (SQLException sqlEx) {
        throw new SQLException("Fout bij ophalen rsgb brk tabellen: ", sqlEx);
      } finally {
        DbUtils.closeQuietly(tablesRs);
      }
    }
  }

  private PreparedStatement checkAndGetPreparedStatement(
      Map<String, PreparedStatement> m, String name) throws SQLException {
    PreparedStatement stm = m.get(name);
    if (stm == null || stm.isClosed()) {
      return null;
    }
    stm.clearParameters();
    return stm;
  }

  public void checkAndAddStatement(
      Map<String, PreparedStatement> m, String tableName, PreparedStatement stmt) {
    m.put(tableName, stmt);
  }

  public void close() {
    for (PreparedStatement stmt : checkRowExistsStatements.values()) {
      DbUtils.closeQuietly(stmt);
    }
    checkRowExistsStatements.clear();
    for (PreparedStatement stmt : insertSqlPreparedStatements.values()) {
      DbUtils.closeQuietly(stmt);
    }
    insertSqlPreparedStatements.clear();
    for (PreparedStatement stmt : updateSqlPreparedStatements.values()) {
      DbUtils.closeQuietly(stmt);
    }
    updateSqlPreparedStatements.clear();
    for (PreparedStatement stmt : createDeleteSqlStatements.values()) {
      DbUtils.closeQuietly(stmt);
    }
    createDeleteSqlStatements.clear();
    if (insertMetadataStatement != null) {
      DbUtils.closeQuietly(insertMetadataStatement);
    }
    insertMetadataStatement = null;
    DbUtils.closeQuietly(this.connRsgb);
    this.connRsgb = null;
    DbUtils.closeQuietly(this.connRsgbBrk);
    this.connRsgbBrk = null;
  }

  @Override
  public void renewConnection() throws SQLException {
    close();
    init();
  }

  @Override
  public void run() {
    try {
      init();
      // Set all berichten to waiting
      long total = setWaitingStatus();
      if (listener != null) {
        listener.total(total);
      }
      // Do the work by querying all berichten, berichten are passed to
      // handle() method
      if (total > 0) {
        stagingProxy.handleBerichtenByJob(
            total, this, enablePipeline, pipelineCapacity, orderBerichten);
      }
    } catch (Exception e) {
      // user is informed via status in database
      log.error("Fout tijdens verwerken berichten", e);

      if (listener != null) {
        listener.exception(e);
      }
    }

    close();
  }

  private long setWaitingStatus() throws SQLException, BrmoException {
    // verwijder al verwerkte berichten uit afgebroken job
    stagingProxy.cleanJob();
    // zijn er nog berichten over uit de vorige job
    long count = stagingProxy.getCountJob();
    if (count > 0 && !mode.equals(BerichtSelectMode.RETRY_WAITING)) {
      throw new BrmoException(
          "Vorige transformatie is afgebroken," + " verwerk eerst die berichten (zie job-tabel)!");
    }

    switch (mode) {
      case BY_STATUS:
        return stagingProxy.setBerichtenJobByStatus(status, orderBerichten);
      case BY_IDS:
        return stagingProxy.setBerichtenJobByIds(berichtIds, orderBerichten);
      case BY_LAADPROCES:
        return stagingProxy.setBerichtenJobByLaadprocessen(laadprocesIds, orderBerichten);
      case FOR_UPDATE:
        return stagingProxy.setBerichtenJobForUpdate(updateProcess.getSoort(), orderBerichten);
      case RETRY_WAITING:
        return count;
      default:
        return 0L;
    }
  }

  @Override
  public void updateProcessingResult(Bericht ber) {
    if (updateProcess != null) {
      return;
    }
    ber.setStatusDatum(new Date());
    try {
      stagingProxy.updateBerichtProcessing(ber);
    } catch (SQLException ex) {
      log.error("Kan status niet updaten", ex);
    }
  }

  @Override
  public List<TableData> transformToTableData(Bericht ber) throws BrmoException {
    if (updateProcess != null) {
      return transformUpdateTableData(ber);
    }

    StringBuilder loadLog = new StringBuilder();
    SimpleDateFormat dateTimeFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
    SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy");

    try {
      loadLog.append(
          String.format(
              "%s: transformeren %s bericht object ref %s, met id %s, berichtdatum %s\n",
              dateTimeFormat.format(new Date()),
              ber.getSoort(),
              ber.getObjectRef(),
              ber.getId(),
              dateFormat.format(ber.getDatum())));

      RsgbTransformer transformer = getTransformer(ber.getSoort());

      long startTime = System.currentTimeMillis();
      stagingProxy.updateBerichtenDbXml(Collections.singletonList(ber), transformer);

      Split split = SimonManager.getStopwatch("b3p.staging.bericht.dbxml.read").start();
      StringReader nReader = new StringReader(ber.getDbXml());
      List<TableData> data = dbXmlReader.readDataXML(new StreamSource(nReader));
      split.stop();

      loadLog.append(
          String.format(
              "Transformeren naar RSGB database-xml en lezen resultaat: %4.1fs\n\n",
              (System.currentTimeMillis() - startTime) / 1000.0));
      ber.setOpmerking(loadLog.toString());
      return data;
    } catch (Exception e) {
      try {
        updateBerichtException(ber, e);
      } finally {
        updateProcessingResult(ber);
      }
      return null;
    }
  }

  public List<TableData> transformUpdateTableData(Bericht ber) throws BrmoException {

    RsgbTransformer transformer = rsgbTransformers.get(updateProcess.getName());

    if (transformer == null) {
      try {
        transformer = new RsgbTransformer(updateProcess.getXsl());
      } catch (Exception e) {
        throw new BrmoException("Fout bij laden XSL stylesheet: " + updateProcess.getXsl(), e);
      }
      rsgbTransformers.put(updateProcess.getName(), transformer);
    }
    try {
      Node dbxml = transformer.transformToDbXmlNode(ber);
      return dbXmlReader.readDataXML(new DOMSource(dbxml));
    } catch (Exception e) {
      log.error("Fout bij transformeren bericht #" + ber.getId() + " voor update", e);
      throw new BrmoException(
          "Fout bij transformeren bericht #"
              + ber.getId()
              + " voor update: "
              + e.getClass()
              + ": "
              + e.getMessage());
    }
  }

  public void updateBerichtException(Bericht ber, Throwable e) throws BrmoException {
    ber.setStatus(Bericht.STATUS.RSGB_NOK);
    StringWriter sw = new StringWriter();
    e.printStackTrace(new PrintWriter(sw, true));
    ber.setOpmerking(ber.getOpmerking() + "\nFout: " + sw);
    if (orderBerichten) {
      if (errorState != null && errorState.equalsIgnoreCase("ignore")) {
        // TODO omdat we te vaak moeten stoppen, later soort exception erbij betrekken.
      } else {
        // volgorde belangrijk dus stoppen
        log.fatal("Mutatieverwerking en foutief bericht, dus stoppen!", e);
        throw new BrmoException("Mutatieverwerking en foutief bericht, dus stoppen!");
      }
    }
  }

  /**
   * verwerk een bericht.
   *
   * @param ber te verwerken bericht
   * @param pretransformedTableData tabel data uit staging
   * @param updateResult {@code true} update verwerkings resultaat in staging
   * @throws BrmoException if any
   */
  @Override
  public void handle(Bericht ber, List<TableData> pretransformedTableData, boolean updateResult)
      throws BrmoException {
    if (updateProcess != null) {
      update(ber, pretransformedTableData);
      return;
    }
    Connection conn = this.connRsgb;
    GeometryJdbcConverter jdbcConverter = this.geomToJdbc;
    DatabaseMetaData metaData = this.dbMetadata;
    if (ber.getSoort().equalsIgnoreCase(BrmoFramework.BR_BRK2)) {
      conn = this.connRsgbBrk;
      jdbcConverter = this.geomToJdbcBrk;
      metaData = this.dbMetadataBrk;
    }

    Bericht.STATUS newStatus = Bericht.STATUS.RSGB_OK;

    SimpleDateFormat dateTimeFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss:SSS");
    SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy");

    log.debug(
        String.format(
            "RSGB verwerking van %s bericht met id %s, object_ref %s, datum %tc",
            ber.getSoort(), ber.getId(), ber.getObjectRef(), ber.getDatum()));
    StringBuilder loadLog;
    if (pretransformedTableData == null) {
      loadLog = new StringBuilder();
    } else {
      loadLog = new StringBuilder(ber.getOpmerking());
    }
    loadLog.append(
        String.format(
            "%s: start verwerking %s bericht object ref %s, met id %s, berichtdatum %s\n",
            dateTimeFormat.format(new Date()),
            ber.getSoort(),
            ber.getObjectRef(),
            ber.getId(),
            dateFormat.format(ber.getDatum())));

    setHerkomstMetadata(ber.getSoort());

    long startTime = 0;
    try {

      if (conn.getAutoCommit()) {
        conn.setAutoCommit(false);
      }

      List<TableData> newList;
      if (pretransformedTableData != null) {
        loadLog.append("Bericht was al getransformeerd in pipeline\n");
        newList = pretransformedTableData;
      } else {
        loadLog.append("Transformeren naar RSGB database-xml en lezen resultaat... ");
        startTime = System.currentTimeMillis();
        newList = transformToTableData(ber);
        loadLog.append(
            String.format("%4.1fs\n", (System.currentTimeMillis() - startTime) / 1000.0));
      }
      // nieuwe starttijd voor laden in RSGB
      startTime = System.currentTimeMillis();

      // per bericht kijken of er een oud bericht is
      Bericht oud = null;
      if (orderBerichten) {
        oud = stagingProxy.getOldBericht(ber, loadLog);
      } else {
        // als geen orderberichten dan wordt stand ingelezen
        // en indien volgordenummer >=0 (geen stand) dan afbreken
        if (ber.getVolgordeNummer() >= 0) {
          throw new BrmoException("Standverwerking, maar bericht is mutatie: niet verwerken!");
        }
        loadLog.append(
            "Standverwerking, gegevens worden alleen toegevoegd als ze nog niet bestaan!\n");
      }

      // oud bericht
      if (oud != null) {
        loadLog.append(
            String.format(
                "Eerder bericht (id: %d) voor zelfde object gevonden van datum %s, volgnummer %d,status %s op %s\n",
                oud.getId(),
                dateTimeFormat.format(oud.getDatum()),
                oud.getVolgordeNummer(),
                oud.getStatus().toString(),
                dateTimeFormat.format(oud.getStatusDatum())));

        if (ber.getDatum().equals(oud.getDatum())
            && ber.getVolgordeNummer().equals(oud.getVolgordeNummer())) {
          if (Objects.equals(ber.getId(), oud.getId())) {
            // dit kan voorkomen bij herstart van job terwijl bericht nog in pipeline
            // van andere job zat
            // niets doen, log overnemen.
            loadLog.append(oud.getOpmerking());
          } else {
            loadLog.append(
                "Datum en volgordenummer van nieuw bericht hetzelfde als de oude, negeer dit bericht!\n");
            boolean dbXmlEquals = ber.getDbXml().equals(oud.getDbXml());
            if (!dbXmlEquals) {
              String s =
                  String.format(
                      "Bericht %d met zelfde datum en volgnummer als eerder verwerkt bericht %d heeft andere db xml! Object ref %s",
                      ber.getId(), oud.getId(), ber.getObjectRef());
              log.warn(s);
              loadLog.append("Waarschuwing: ").append(s).append("\n");
            }
          }
        } else {
          // Check of eerder bericht toevallig niet nieuwere datum
          if (oud.getDatum().after(ber.getDatum())) {
            newStatus = Bericht.STATUS.RSGB_OUTDATED;
            loadLog.append(
                "Bericht bevat oudere data dan eerder verwerkt bericht, status RSGB_OUTDATED\n");
            // Check of eerder bericht toevallig niet zelfde datum met hoger volgorde
            // nummer, mantis-6098
          } else if (oud.getDatum().equals(ber.getDatum())
              && oud.getVolgordeNummer() > ber.getVolgordeNummer()) {
            newStatus = Bericht.STATUS.RSGB_OUTDATED;
            loadLog.append(
                "Bericht bevat oudere data dan eerder verwerkt bericht met hoger volgnummer, status RSGB_OUTDATED\n");
          } else {
            StringReader oReader = new StringReader(oud.getDbXml());
            List<TableData> oudList = oldDbXmlReader.readDataXML(new StreamSource(oReader));

            parseNewData(oudList, newList, loadLog, jdbcConverter, conn, metaData);
            parseOldData(oudList, newList, ber.getDatum(), loadLog, jdbcConverter, conn, metaData);
          }
        }
      } else {
        parseNewData(null, newList, loadLog, jdbcConverter, conn, metaData);
      }

      Split commit = SimonManager.getStopwatch(simonNamePrefix + "commit").start();
      conn.commit();
      commit.stop();
      ber.setStatus(newStatus);

      this.processed++;
      if (listener != null) {
        listener.progress(this.processed);
      }

    } catch (Throwable ex) {
      log.error(
          "Fout bij verwerking bericht met id "
              + ber.getId()
              + ", melding: "
              + ex.getLocalizedMessage());
      log.debug("Fout bij verwerking bericht met id " + ber.getId(), ex);
      try {
        conn.rollback();
      } catch (SQLException e) {
        log.debug("Rollback exception", e);
      }
      ber.setOpmerking(loadLog.toString());
      loadLog = null;
      updateBerichtException(ber, ex);
    } finally {
      if (loadLog == null) {
        loadLog = new StringBuilder(ber.getOpmerking());
      }
      loadLog.append("\n\n");
      loadLog.append(
          String.format(
              "%s: einde verwerking, duur: %4.1fs",
              dateTimeFormat.format(new Date()),
              (System.currentTimeMillis() - startTime) / 1000.0));
      ber.setOpmerking(loadLog.toString());

      if (updateResult) {
        updateProcessingResult(ber);
      }

      if (recentSavepoint != null) {
        // Set savepoint to null, as it is automatically released after commit the
        // transaction.
        recentSavepoint = null;
      }

      if (renewConnectionAfterCommit) {
        log.debug("Vernieuwen van verbinding om cursors in Oracle vrij te geven!");
        if (!(jdbcConverter instanceof OracleJdbcConverter)) {
          log.info(
              "Vernieuwen van verbinding is alleen nodig bij sommige Oracle implementeties: lijkt hier overbodig!");
        }
        try {
          renewConnection();
        } catch (SQLException ex) {
          throw new BrmoException("Error renewing connection", ex);
        }
      }
    }
  }

  public void update(Bericht jobBericht, List<TableData> pretransformedTableData)
      throws BrmoException {
    Connection conn = this.connRsgb;
    GeometryJdbcConverter jdbcConverter = this.geomToJdbc;
    DatabaseMetaData metaData = dbMetadata;
    if (jobBericht.getSoort().equalsIgnoreCase(BrmoFramework.BR_BRK2)) {
      conn = this.connRsgbBrk;
      jdbcConverter = this.geomToJdbcBrk;
      metaData = this.dbMetadataBrk;
    }

    try {
      if (conn.getAutoCommit()) {
        conn.setAutoCommit(false);
      }

      List<TableData> newList;
      if (pretransformedTableData != null) {
        newList = pretransformedTableData;
      } else {
        newList = transformUpdateTableData(jobBericht);
      }

      log.trace("data voor update: " + newList);
      for (TableData newData : newList) {
        for (TableRow row : newData.getRows()) {

          boolean comfortFound =
              rowExistsInDb(row, new StringBuilder(), jdbcConverter, conn, metaData);

          // zoek obv natural key op in rsgb
          if (comfortFound) {
            createUpdateSql(row, new StringBuilder(), jdbcConverter, conn, metaData);
          } else {
            // insert all comfort records into hoofdtabel
            createInsertSql(row, false, new StringBuilder(), jdbcConverter, conn, metaData);
          }
        }
      }
      conn.commit();

      this.processed++;
      if (listener != null) {
        listener.progress(this.processed);
      }
    } catch (Throwable ex) {
      log.error("Fout bij updaten bericht met id " + jobBericht.getId(), ex);
      try {
        conn.rollback();
      } catch (SQLException e) {
        log.debug("Rollback exception", e);
      }
    }
  }

  private RsgbTransformer getTransformer(String brType)
      throws TransformerConfigurationException, ParserConfigurationException {
    RsgbTransformer t = rsgbTransformers.get(brType);
    if (t == null) {
      t =
          switch (brType) {
            case BrmoFramework.BR_BRK2 -> new RsgbTransformer(BrmoFramework.XSL_BRK2);
            case BrmoFramework.BR_BRP ->
                new RsgbBRPTransformer(BrmoFramework.XSL_BRP, this.stagingProxy);
            case BrmoFramework.BR_NHR -> new RsgbTransformer(BrmoFramework.XSL_NHR);
            case BrmoFramework.BR_GBAV -> new RsgbTransformer(BrmoFramework.XSL_GBAV);
            case BrmoFramework.BR_WOZ ->
                new RsgbWOZTransformer(BrmoFramework.XSL_WOZ, this.stagingProxy);
            default -> throw new IllegalArgumentException("Onbekende basisregistratie: " + brType);
          };
      rsgbTransformers.put(brType, t);
    }
    return t;
  }

  private StringBuilder parseNewData(
      List<TableData> oldList,
      List<TableData> newList,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection conn,
      DatabaseMetaData metaData)
      throws SQLException, ParseException, BrmoException {
    Split split = SimonManager.getStopwatch(simonNamePrefix + "parsenewdata").start();

    // parse new db xml
    loadLog.append("\nVerwerk nieuw bericht");
    if (newList == null || newList.isEmpty()) {
      if (oldList == null || oldList.isEmpty()) {
        throw new BrmoException(
            "Bericht verwijdert object waarvoor geen eerder bericht gevonden is!");
      }
      split.stop();
      return loadLog;
    }

    String lastExistingBrondocumentId = null;
    for (TableData newData : newList) {
      if (!newData.isComfortData() && !newData.isDeleteData()) {
        Split splitAuthentic =
            SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.authentic").start();
        // auth data
        loadLog.append("\n");
        for (TableRow row : newData.getRows()) {

          // Controleer op al bestaand stukdeel
          if (lastExistingBrondocumentId != null
              && "brondocument".equalsIgnoreCase(row.getTable())
              && lastExistingBrondocumentId.equals(row.getColumnValue("tabel_identificatie"))) {
            loadLog.append("\nOverslaan stukdeel voor stuk ").append(lastExistingBrondocumentId);
            SimonManager.getCounter(simonNamePrefix + "parsenewdata.authentic.skipstukdeel")
                .increase();
            continue;
          } else {
            lastExistingBrondocumentId = null;
          }

          Split split2 =
              SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.authentic.getpk").start();
          List<String> pkColumns = getPrimaryKeys(row.getTable(), jdbcConverter, metaData);
          split2.stop();

          boolean existsInOldList = doesRowExist(row, pkColumns, oldList);
          split2 =
              SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.authentic.rowexistsindb")
                  .start();
          // er kan een record bestaan zonder dat er
          // een oud bericht aan ten grondslag ligt.
          // dan maar update maar zonder historie vastlegging.
          // oud record kan niet worden terugherleid.
          // TODO: dure check, kan het anders?
          boolean doInsert = !rowExistsInDb(row, loadLog, jdbcConverter, conn, metaData);
          split2.stop();

          // insert hoofdtabel
          if (doInsert) {
            if (existsInOldList) {
              // insert in hoofdtabel, terwijl er wel oud bericht beschikbaar is
              // dit lijkt op een fout situatie, wat te doen?
              loadLog.append(
                  "\nOud bericht bestaat, maar staat niet (meer) in database, daarom toevoeging van object aan tabel: ");
            } else {
              // normale insert in hoofdtabel
              loadLog.append("\nNormale toevoeging van object aan tabel: ");
            }
            loadLog.append(row.getTable());
            split2 =
                SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.authentic.insert")
                    .start();
            createInsertSql(row, false, loadLog, jdbcConverter, conn, metaData);
            split2.stop();
          } else {
            split2 =
                SimonManager.getStopwatch(
                        simonNamePrefix + "parsenewdata.authentic.isalreadyinmetadata")
                    .start();
            boolean inMetaDataTable =
                isAlreadyInMetadata(row, loadLog, jdbcConverter, conn, metaData);
            split2.stop();
            // wis metadata en update hoofdtabel
            if (inMetaDataTable) {
              loadLog.append("\nwis uit metadata tabel (upgrade van comfort naar authentiek). ");
              split2 =
                  SimonManager.getStopwatch(
                          simonNamePrefix + "parsenewdata.authentic.deletefrommetadata")
                      .start();
              deleteFromMetadata(row, loadLog, jdbcConverter, conn, metaData);
              split2.stop();
            }
            if ("brondocument".equalsIgnoreCase(row.getTable())) {
              // Nooit stukdeel wat al bestaat updaten, en ook alle
              // volgende stukdelen niet
              lastExistingBrondocumentId = row.getColumnValue("tabel_identificatie");
              loadLog
                  .append("\nRij voor stukdeel ")
                  .append(lastExistingBrondocumentId)
                  .append(" bestaat al, geen update");
              continue;
            }

            if (existsInOldList) {
              // update end date old record
              loadLog.append("\nUpdate einddatum in vorige versie object");
              TableRow oldRow = getMatchingRowFromTableData(row, pkColumns, oldList);
              String newBeginDate = getValueFromTableRow(row, row.getColumnDatumBeginGeldigheid());
              updateValueInTableRow(oldRow, row.getColumnDatumEindeGeldigheid(), newBeginDate);

              // write old to archive table
              loadLog.append("\nSchrijf vorige versie naar archief tabel");
              oldRow.setIgnoreDuplicates(true);

              split2 =
                  SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.authentic.archive")
                      .start();
              createInsertSql(oldRow, true, loadLog, jdbcConverter, conn, metaData);
              split2.stop();

              loadLog.append("\nUpdate object in tabel: ");
            } else {
              loadLog.append("\nUpdate object (zonder vastlegging historie) in tabel: ");
            }
            loadLog.append(row.getTable());
            split2 =
                SimonManager.getStopwatch(
                        simonNamePrefix + "parsenewdata.authentic.update." + row.getTable())
                    .start();
            createUpdateSql(row, loadLog, jdbcConverter, conn, metaData);
            split2.stop();
          }
        }
        splitAuthentic.stop();

      } else if (!newData.isDeleteData()) {
        Split splitComfort =
            SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.comfort").start();

        // TODO: select herkomst_br/datum uit herkomst_metadata voor tabel,kolom,waarde
        // indien niet bestaat: nieuwe comfort data, alles unconditional insert
        // indien wel bestaat en herkomst_br/datum identiek: gegevens uit stand al geinsert,
        // negeer
        // indien wel bestaat en herkomst_br/datum anders: update en insert nieuwe herkomst
        // comfort data
        String tabel = newData.getComfortSearchTable();
        String kolom = newData.getComfortSearchColumn();
        String waarde = newData.getComfortSearchValue();
        String datum = newData.getComfortSnapshotDate();

        Split split2;

        for (TableRow row : newData.getRows()) {
          String subclass = row.getTable();
          loadLog.append(
              String.format(
                  "\n\nComfort data voor %s (class: %s, %s=%s), controleer aanwezigheid in tabel",
                  subclass, tabel, kolom, waarde));
          split2 =
              SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.comfort.exists").start();
          boolean comfortFound = rowExistsInDb(row, loadLog, jdbcConverter, conn, metaData);
          split2.stop();

          // zoek obv natural key op in rsgb
          if (comfortFound) {
            split2 =
                SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.comfort.update").start();
            createUpdateSql(row, loadLog, jdbcConverter, conn, metaData);
            split2.stop();
          } else {
            // insert all comfort records into hoofdtabel
            split2 =
                SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.comfort.insert").start();
            createInsertSql(row, false, loadLog, jdbcConverter, conn, metaData);
            split2.stop();
          }
        }

        // Voeg herkomst van metadata toe, alleen voor superclass tabel!
        split2 =
            SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.comfort.metadata").start();
        if (metaData != dbMetadataBrk) {
          // overslaan voor brk 2
          createInsertMetadataSql(tabel, kolom, waarde, datum, loadLog, jdbcConverter, conn);
        }
        split2.stop();

        splitComfort.stop();
        loadLog.append("\n");

      } else if (newData.isDeleteData() && (oldList == null || oldList.isEmpty())) {
        // only use delete in newData if there is no old record (this is more complete)
        Split splitDelete =
            SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.delete").start();
        loadLog.append("\n\nWissen van object zonder oud record.");
        for (TableRow row : newData.getRows()) {
          // zoek obv natural key op in rsgb
          if (rowExistsInDb(row, loadLog, jdbcConverter, conn, metaData)) {
            Split split2 =
                SimonManager.getStopwatch(simonNamePrefix + "parsenewdata.delete.update").start();
            createDeleteSql(row, loadLog, jdbcConverter, conn, metaData);
            split2.stop();
          } else {
            loadLog.append("\nTe wissen record niet in database.");
          }
        }
        splitDelete.stop();
        loadLog.append("\n");
      }
    }
    split.stop();
    return loadLog;
  }

  private StringBuilder parseOldData(
      List<TableData> oldList,
      List<TableData> newList,
      Date newDate,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection connection,
      DatabaseMetaData metaData)
      throws SQLException, ParseException {

    List<TableRow> rowsToDelete = new ArrayList<>();
    // als heel object verwijderd moet worden omdat newList een verwijderbericht is
    boolean deleteAll = !newList.isEmpty() && newList.get(0).isDeleteData();

    // parse old db xml
    loadLog.append("\nControleren te verwijderen gegevens vorig bericht...\n");
    for (TableData oldData : oldList) {
      if (oldData.isComfortData()) {
        // comfort block ? do nothing
      } else {
        for (TableRow oldRow : oldData.getRows()) {
          List<String> pkColumns = getPrimaryKeys(oldRow.getTable(), jdbcConverter, metaData);
          boolean exists = doesRowExist(oldRow, pkColumns, newList);
          if (deleteAll || !exists) {
            loadLog
                .append("Vervallen record te verwijderen: ")
                .append(oldRow.toString(pkColumns))
                .append("\n");
            rowsToDelete.add(oldRow);
          }
        }
      }
    }

    if (!rowsToDelete.isEmpty()) {
      loadLog.append("Verwijder rijen in omgekeerde volgorde...\n");

      Collections.reverse(rowsToDelete);
      for (TableRow rowToDelete : rowsToDelete) {
        Split split = SimonManager.getStopwatch(simonNamePrefix + "parseolddata.delete").start();
        createDeleteSql(rowToDelete, loadLog, jdbcConverter, connection, metaData);
        split.stop();

        String beginDate =
            getValueFromTableRow(rowToDelete, rowToDelete.getColumnDatumBeginGeldigheid());
        String endDate = formatDateLikeOtherDate(newDate, beginDate);
        updateValueInTableRow(rowToDelete, rowToDelete.getColumnDatumEindeGeldigheid(), endDate);

        split = SimonManager.getStopwatch(simonNamePrefix + "parseolddata.archive").start();
        rowToDelete.setIgnoreDuplicates(true);
        createInsertSql(rowToDelete, true, loadLog, jdbcConverter, connection, metaData);
        split.stop();
      }
    }

    return loadLog;
  }

  /**
   * Probeer de datum te formatteren aan de hand van de formatting van de {@code otherDate}
   * formatting. De fallback optie voor formatting is {@code yyyy-MM-dd}, ondersteunde opties zijn:
   *
   * <ul>
   *   <li>{@code yyyy-MM-dd} (BRK2)
   *   <li>{@code yyyyMMddHHmmssSSS0} (BAG)
   * </ul>
   *
   * @param newDate te formatten datum
   * @param otherDate de formatted datum
   * @return formatted datum, indien mogelijk in de vorm van {@code otherDate}
   */
  private String formatDateLikeOtherDate(Date newDate, String otherDate) {
    // 2010-06-29 (BRK2)
    SimpleDateFormat dfltFmt = new SimpleDateFormat("yyyy-MM-dd");
    // 201006291200000000 (BAG)
    SimpleDateFormat f2 = new SimpleDateFormat("yyyyMMddHHmmssSSS0");

    if (otherDate == null || otherDate.isEmpty()) {
      // fallback op f1, er zijn gevallen waar otherDate null is, bijv. zak_recht
      return dfltFmt.format(newDate);
    }

    try {
      f2.parse(otherDate);
      return f2.format(newDate);
    } catch (java.text.ParseException ex) {
      // ignore
    }
    // add other formats
    return dfltFmt.format(newDate);
  }

  private String getValueFromTableRow(TableRow row, String column) {
    int i = 0;
    for (String c : row.getColumns()) {
      if (c.equalsIgnoreCase(column)) {
        return row.getValues().get(i);
      }
      i++;
    }
    return null;
  }

  private void updateValueInTableRow(TableRow row, String column, String newValue) {
    // TODO, mag later weer weg
    repairOldRowIfRequired(row, newValue);

    if (column == null) {
      return;
    }
    int i = 0;
    for (String c : row.getColumns()) {
      if (c.equalsIgnoreCase(column)) {
        row.getValues().set(i, newValue);
        return;
      }
      i++;
    }
  }

  private void repairOldRowIfRequired(TableRow row, String guessDate) {

    if (row.getTable().equals("kad_perceel")
        || row.getTable().equals("app_re")
        || row.getTable().equals("nummeraand")
        || row.getTable().equals("ligplaats")
        || row.getTable().equals("standplaats")
        || row.getTable().equals("verblijfsobj")) {
      if (!row.getColumns().contains("sc_dat_beg_geldh")) {
        row.getColumns().add("sc_dat_beg_geldh");
        row.getValues().add(guessDate);
      }
    }
  }

  private TableRow getMatchingRowFromTableData(
      TableRow row, List<String> pkColumns, List<TableData> testList) {
    TableRow newRow = null;

    for (TableData data : testList) {
      for (TableRow testRow : data.getRows()) {
        if (row.getTable().equals(testRow.getTable())) {
          int matches = 0;
          for (String pkColumn : pkColumns) {
            int rowIdx = row.getColumns().indexOf(pkColumn);
            int testIdx = testRow.getColumns().indexOf(pkColumn);

            String val = row.getValues().get(rowIdx);
            String testVal = testRow.getValues().get(testIdx);

            if (val.equals(testVal)) {
              matches++;
            }
          }

          if (matches == pkColumns.size()) {
            newRow = testRow;
          }
        }
      }
    }

    return newRow;
  }

  private boolean doesRowExist(TableRow row, List<String> pkColumns, List<TableData> testList) {
    boolean found = false;

    if (testList != null && testList.size() > 0) {
      for (TableData data : testList) {
        for (TableRow testRow : data.getRows()) {
          if (row.getTable().equals(testRow.getTable())) {
            int matches = 0;
            for (String pkColumn : pkColumns) {
              int rowIdx = row.getColumns().indexOf(pkColumn);
              int testIdx = testRow.getColumns().indexOf(pkColumn);

              String val = row.getValues().get(rowIdx);
              String testVal = testRow.getValues().get(testIdx);

              if (val.equals(testVal)) {
                matches++;
              }
            }

            if (matches == pkColumns.size()) {
              found = true;
            }
          }
        }
      }
    }

    return found;
  }

  private void createInsertSql(
      TableRow row,
      boolean useArchiveTable,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection connection,
      DatabaseMetaData metaData)
      throws SQLException, ParseException {

    StringBuilder sql = new StringBuilder("insert into ");

    String tableName;
    if (!useArchiveTable) {
      tableName = row.getTable();
    } else {
      tableName = row.getTable() + "_archief";
    }

    sql.append(tableName);

    SortedSet<ColumnMetadata> tableColumnMetadata;
    tableColumnMetadata = getTableColumnMetadata(tableName, jdbcConverter, metaData);

    if (tableColumnMetadata == null) {
      if (useArchiveTable) {
        // Wanneer archief tabellen niet zijn aangemaakt negeren
        return;
      } else {
        throw new IllegalStateException("Kan tabel metadata niet vinden voor tabel " + tableName);
      }
    }

    sql.append(" (");
    StringBuilder valuesSql = new StringBuilder();
    List<Object> params = new ArrayList<>();

    // TODO maak sql op basis van alle columns en gebruik null values
    // zodat insert voor elke tabel hetzelfde, reuse preparedstatement
    Iterator<String> valuesIt = row.getValues().iterator();
    for (Iterator<String> it = row.getColumns().iterator(); it.hasNext(); ) {
      String column = it.next();
      String stringValue = valuesIt.next();

      if (row.isAlleenArchiefColumn(column) && !useArchiveTable) {
        continue;
      }

      ColumnMetadata cm = findColumnMetadata(tableColumnMetadata, column);
      Object param = getValueAsObject(tableName, column, stringValue, cm, jdbcConverter, metaData);
      params.add(param);

      sql.append(cm.getName());

      String insertValuePlaceholder = "?";
      if (cm.getTypeName().equals(jdbcConverter.getGeomTypeName())
          // in geval we in een ander dan public schema werken, en postgis zit in public
          // schema
          || cm.getTypeName()
              .replace("\"", "")
              .equals("public." + jdbcConverter.getGeomTypeName())) {
        // waarde altijd als WKT aanbieden en placeholder per db type
        insertValuePlaceholder = jdbcConverter.createPSGeometryPlaceholder();
      }
      valuesSql.append(insertValuePlaceholder);

      if (it.hasNext()) {
        sql.append(", ");
        valuesSql.append(", ");
      }
    }

    boolean conditionalInsert = row.isIgnoreDuplicates();

    if (!conditionalInsert) {
      sql.append(") values (");
      sql.append(valuesSql);
      sql.append(")");
    } else {
      sql.append(") select ");
      sql.append(valuesSql);
      if (jdbcConverter instanceof OracleJdbcConverter) {
        sql.append(" from dual");
      }
      sql.append(" where not exists (select 1 from ");
      sql.append(tableName);
      sql.append(" where ");

      boolean first = true;
      for (String column : getPrimaryKeys(tableName, jdbcConverter, metaData)) {
        if (first) {
          first = false;
        } else {
          sql.append(" and ");
        }
        sql.append(column);
        sql.append(" = ? ");

        String val = getValueFromTableRow(row, column);
        Object obj = getValueAsObject(tableName, column, val, null, jdbcConverter, metaData);
        params.add(obj);
      }
      sql.append(")");
    }

    PreparedStatement stm =
        checkAndGetPreparedStatement(insertSqlPreparedStatements, sql.toString());
    if (stm == null) {
      SimonManager.getCounter("b3p.rsgb.insertsql.preparestatement").increase();
      stm = connection.prepareStatement(sql.toString());
      checkAndAddStatement(insertSqlPreparedStatements, sql.toString(), stm);
    } else {
      SimonManager.getCounter("b3p.rsgb.insertsql.reusestatement").increase();
    }
    loadLog.append("\nSQL: ");
    loadLog.append(sql);
    loadLog.append(", params (");
    loadLog.append(params);
    loadLog.append(")");
    log.trace("insert SQL: " + sql + ", params: " + params);

    for (int i = 0; i < params.size(); i++) {
      Object param = params.get(i);
      // voor "null" geom moet hier iets extra gedaan worden voor sommige databases...
      if (null == param
          && (findColumnMetadata(tableColumnMetadata, row.getColumns().get(i))
                  .getTypeName()
                  .equals(jdbcConverter.getGeomTypeName())
              || findColumnMetadata(tableColumnMetadata, row.getColumns().get(i))
                  .getTypeName()
                  .replace("\"", "")
                  .equals("public." + jdbcConverter.getGeomTypeName()))
      // geeft: java.sql.SQLFeatureNotSupportedException: Unsupported feature: checkValidIndex
      // && stm.getParameterMetaData().getParameterTypeName(i +
      // 1).equals(geomToJdbc.getGeomTypeName())
      ) {
        if (jdbcConverter instanceof OracleJdbcConverter) {
          stm.setNull(i + 1, Types.STRUCT, "MDSYS.SDO_GEOMETRY");
        } else {
          stm.setNull(i + 1, stm.getParameterMetaData().getParameterType(i + 1));
        }
      } else {
        stm.setObject(i + 1, param);
      }
    }

    int count = -1;
    try {
      count = stm.executeUpdate();
    } catch (SQLException e) {
      String message = e.getMessage();
      if (jdbcConverter.isDuplicateKeyViolationMessage(message) && row.isIgnoreDuplicates()) {
        if (recentSavepoint != null) {
          log.debug(
              "Ignoring duplicate key violation by rolling back to savepoint with id "
                  + recentSavepoint.getSavepointId());
          connection.rollback(recentSavepoint);
        }
      } else {
        throw e;
      }
    }

    loadLog.append("\nAantal toegevoegde records: ").append(count);
  }

  private void createUpdateSql(
      TableRow row,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection connection,
      DatabaseMetaData metaData)
      throws SQLException, ParseException {
    // doSavePoint(row);

    StringBuilder sql = new StringBuilder("update ");

    String tableName = row.getTable();

    sql.append(tableName);

    SortedSet<ColumnMetadata> tableColumnMetadata;
    tableColumnMetadata = getTableColumnMetadata(tableName, jdbcConverter, metaData);

    if (tableColumnMetadata == null) {
      throw new IllegalStateException("Kan tabel metadata niet vinden voor tabel " + tableName);
    }

    sql.append(" set ");

    List<Object> params = new ArrayList<>();

    // XXX does set previously set columns to NULL!
    // need statement for all columns from column metadata, not only
    // columns in TableRow
    Iterator<String> valuesIt = row.getValues().iterator();
    for (Iterator<String> it = row.getColumns().iterator(); it.hasNext(); ) {
      String column = it.next();
      String stringValue = valuesIt.next();

      if (row.isAlleenArchiefColumn(column)) {
        continue;
      }

      ColumnMetadata cm = findColumnMetadata(tableColumnMetadata, column);
      Object param = getValueAsObject(tableName, column, stringValue, cm, jdbcConverter, metaData);
      params.add(param);

      sql.append(cm.getName());
      sql.append(" = ");
      String insertValuePlaceholder = "?";
      if (cm.getTypeName().equals(jdbcConverter.getGeomTypeName())
          // in geval we in een ander dan public schema werken, en postgis zit in public
          // schema
          || cm.getTypeName()
              .replace("\"", "")
              .equals("public." + jdbcConverter.getGeomTypeName())) {
        // waarde altijd als WKT aanbieden en placeholder per db type
        insertValuePlaceholder = jdbcConverter.createPSGeometryPlaceholder();
      }
      sql.append(insertValuePlaceholder);

      if (it.hasNext()) {
        sql.append(", ");
      }
    }

    // Where clause using pk columns
    sql.append(" where ");
    boolean first = true;
    for (String column : getPrimaryKeys(tableName, jdbcConverter, metaData)) {
      if (first) {
        first = false;
      } else {
        sql.append(" and ");
      }
      sql.append(column);
      sql.append(" = ? ");

      String val = getValueFromTableRow(row, column);
      Object obj = getValueAsObject(tableName, column, val, null, jdbcConverter, metaData);
      params.add(obj);
    }

    PreparedStatement stm =
        checkAndGetPreparedStatement(updateSqlPreparedStatements, sql.toString());
    if (stm == null) {
      SimonManager.getCounter("b3p.rsgb.updatesql.preparestatement").increase();
      stm = connection.prepareStatement(sql.toString());
      checkAndAddStatement(updateSqlPreparedStatements, sql.toString(), stm);
    } else {
      SimonManager.getCounter("b3p.rsgb.updatesql.reusestatement").increase();
    }
    loadLog.append("\nSQL: ");
    loadLog.append(sql);
    loadLog.append(", params (");
    loadLog.append(params);
    loadLog.append(")");
    log.debug("update SQL: " + sql + ", params: " + params);

    for (int i = 0; i < params.size(); i++) {
      Object param = params.get(i);
      // voor "null" geom moet hier iets extra gedaan worden voor sommige databases...
      if (null == param
          && (stm.getParameterMetaData()
                  .getParameterTypeName(i + 1)
                  .equals(jdbcConverter.getGeomTypeName())
              || stm.getParameterMetaData()
                  .getParameterTypeName(i + 1)
                  .replace("\"", "")
                  .equals("public." + jdbcConverter.getGeomTypeName()))) {
        log.trace(
            "gebruik `setNull` voor kolom: "
                + i
                + " "
                + row.getColumns().get(i)
                + ", waarde "
                + param
                + "("
                + stm.getParameterMetaData().getParameterTypeName(i + 1)
                + ")");
        if (jdbcConverter instanceof OracleJdbcConverter) {
          stm.setNull(i + 1, Types.STRUCT, "MDSYS.SDO_GEOMETRY");
        } else {
          stm.setNull(i + 1, stm.getParameterMetaData().getParameterType(i + 1));
        }
      } else {
        stm.setObject(i + 1, param, stm.getParameterMetaData().getParameterType(i + 1));
      }
    }

    int count = -1;
    try {
      count = stm.executeUpdate();
    } finally {
      loadLog.append("\nAantal record updates: ").append(count);
    }
  }

  /* Columns id, tabel, kolom, waarde, herkomst_br, datum (toestandsdatum) */
  private boolean createInsertMetadataSql(
      String tabel,
      String kolom,
      String waarde,
      String datum,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection connection)
      throws SQLException {

    if (insertMetadataStatement == null || insertMetadataStatement.isClosed()) {
      String sql =
          "insert into herkomst_metadata (tabel, kolom, waarde, herkomst_br, datum) "
              + "select ?, ?, ?, ?, ? "
              + (jdbcConverter instanceof OracleJdbcConverter ? "from dual" : "")
              + " where not exists ("
              + "  select 1 from herkomst_metadata where tabel = ? and kolom = ? and waarde = ? and herkomst_br = ? and datum = ?"
              + ")";

      insertMetadataStatement = connection.prepareStatement(sql);
    } else {
      insertMetadataStatement.clearParameters();
    }

    Date date = null;
    Calendar calendar = javax.xml.bind.DatatypeConverter.parseDateTime(datum);
    if (calendar != null) {
      date = new java.sql.Date(calendar.getTimeInMillis());
    }

    insertMetadataStatement.setString(1, tabel);
    insertMetadataStatement.setString(2, kolom);
    insertMetadataStatement.setString(3, waarde);
    insertMetadataStatement.setString(4, getHerkomstMetadata());
    insertMetadataStatement.setObject(5, date);
    insertMetadataStatement.setString(6, tabel);
    insertMetadataStatement.setString(7, kolom);
    insertMetadataStatement.setString(8, waarde);
    insertMetadataStatement.setString(9, getHerkomstMetadata());
    insertMetadataStatement.setObject(10, date);

    loadLog.append(
        String.format(
            "\nConditioneel toevoegen herkomst metadata tabel=%s, kolom=%s, waarde=%s, herkomst_br=%s, datum=%s",
            tabel, kolom, waarde, getHerkomstMetadata(), datum));

    int count = insertMetadataStatement.executeUpdate();
    loadLog.append(": record geinsert: ").append(count > 0 ? "ja" : "nee (rij bestond al)");
    return count > 0;
  }

  private boolean rowExistsInDb(
      TableRow row,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection conn,
      DatabaseMetaData metaData)
      throws SQLException, ParseException {

    String tableName = row.getTable();
    List<String> pkColumns = getPrimaryKeys(tableName, jdbcConverter, metaData);

    List<Object> params = new ArrayList<>();
    for (String column : pkColumns) {
      String val = getValueFromTableRow(row, column);
      Object obj = getValueAsObject(row.getTable(), column, val, null, jdbcConverter, metaData);
      params.add(obj);
    }

    loadLog.append("\nControleer ").append(tableName).append(" op primary key: ").append(params);

    PreparedStatement stm = checkAndGetPreparedStatement(checkRowExistsStatements, tableName);

    if (stm == null) {
      StringBuilder sql = new StringBuilder("select 1 from ").append(tableName);

      boolean first = true;
      for (String column : pkColumns) {
        if (first) {
          sql.append(" where ").append(column).append(" = ?");
          first = false;
        } else {
          sql.append(" and ").append(column).append(" = ?");
        }
      }

      stm =
          conn.prepareStatement(
              sql.toString(), ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
      checkAndAddStatement(checkRowExistsStatements, tableName, stm);
    }

    int i = 1;
    for (Object p : params) {
      stm.setObject(i++, p);
    }

    ResultSet rs = null;
    boolean exists = false;
    try {
      rs = stm.executeQuery();
      exists = rs.next();
    } finally {
      if (rs != null) {
        rs.close();
      }
    }
    loadLog.append(", rij bestaat: ").append(exists ? "ja" : "nee");

    return exists;
  }

  private boolean isAlreadyInMetadata(
      TableRow row,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection connection,
      DatabaseMetaData metaData)
      throws SQLException, BrmoException {

    if (metaData == this.dbMetadataBrk) {
      // geen metadata voor brk 2
      return false;
    }

    if (connection == null || connection.isClosed()) {
      // if used not-threaded, init() required
      throw new BrmoException("No connection found");
    }

    List<String> params = new ArrayList<>();
    String herkomstTableName = "herkomst_metadata";
    String tableName = row.getTable();
    params.add(tableName);
    List<String> pkColumns = getPrimaryKeys(tableName, jdbcConverter, metaData);
    String column = null;
    if (pkColumns.size() == 1) {
      column = pkColumns.get(0);
    }
    params.add(column);
    String waarde = getValueFromTableRow(row, column);
    params.add(waarde);

    loadLog
        .append("\nControleer ")
        .append(herkomstTableName)
        .append(" op primary key: ")
        .append(params);

    PreparedStatement stm =
        checkAndGetPreparedStatement(checkRowExistsStatements, herkomstTableName);
    if (stm == null) {
      String sql =
          "select 1 from " + herkomstTableName + " WHERE tabel = ? AND kolom = ? AND kolom = ?";

      stm =
          connection.prepareStatement(
              sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
      checkAndAddStatement(checkRowExistsStatements, herkomstTableName, stm);
    }

    int i = 1;
    for (Object p : params) {
      stm.setObject(i++, p);
    }

    ResultSet rs = null;
    boolean exists = false;
    try {
      rs = stm.executeQuery();
      exists = rs.next();
    } finally {
      if (rs != null) {
        rs.close();
      }
    }
    loadLog.append(", rij bestaat: ").append(exists ? "ja" : "nee");

    return exists;
  }

  private boolean createDeleteSql(
      TableRow row,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection connection,
      DatabaseMetaData metaData)
      throws SQLException, ParseException {
    doSavePoint(row, jdbcConverter, connection);

    String tableName = row.getTable();
    // Where clause using pk columns
    List<String> pkColumns = getPrimaryKeys(tableName, jdbcConverter, metaData);

    List<Object> params = new ArrayList<>();
    for (String column : pkColumns) {
      String val = getValueFromTableRow(row, column);
      Object obj = getValueAsObject(row.getTable(), column, val, null, jdbcConverter, metaData);
      params.add(obj);
    }
    loadLog
        .append("\nVerwijderen rij uit ")
        .append(tableName)
        .append(" met primary key: ")
        .append(params);

    PreparedStatement stm = checkAndGetPreparedStatement(createDeleteSqlStatements, tableName);

    if (stm == null) {
      StringBuilder sql = new StringBuilder("delete from ").append(tableName);

      boolean first = true;
      for (String column : pkColumns) {
        if (first) {
          sql.append(" where ").append(column).append(" = ?");
          first = false;
        } else {
          sql.append(" and ").append(column).append(" = ?");
        }
      }

      stm =
          connection.prepareStatement(
              sql.toString(), ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
      checkAndAddStatement(createDeleteSqlStatements, tableName, stm);
    }

    int i = 1;
    for (Object p : params) {
      stm.setObject(i++, p);
    }

    int count = 1; // all ok
    try {
      if (alsoExecuteSql) {
        count = stm.executeUpdate();
      }
    } catch (SQLException e) {
      log.error("Error executing delete statement", e);
    }
    loadLog.append(", aantal rijen verwijderd: ").append(count);

    return count > 0;
  }

  private boolean deleteFromMetadata(
      TableRow row,
      StringBuilder loadLog,
      GeometryJdbcConverter jdbcConverter,
      Connection connection,
      DatabaseMetaData metaData)
      throws SQLException {

    List<String> params = new ArrayList<>();
    String herkomstTableName = "herkomst_metadata";
    String tableName = row.getTable();
    params.add(tableName);
    List<String> pkColumns = getPrimaryKeys(tableName, jdbcConverter, metaData);
    String column = null;
    if (pkColumns.size() == 1) {
      column = pkColumns.get(0);
    }
    params.add(column);
    String waarde = getValueFromTableRow(row, column);
    params.add(waarde);

    loadLog
        .append("\nVerwijder rij uit ")
        .append(herkomstTableName)
        .append(" met primary key: ")
        .append(params);

    PreparedStatement stm =
        checkAndGetPreparedStatement(createDeleteSqlStatements, herkomstTableName);
    if (stm == null) {
      String sql =
          "delete from " + herkomstTableName + " WHERE tabel = ? AND kolom = ? AND kolom = ?";

      stm =
          connection.prepareStatement(
              sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
      checkAndAddStatement(createDeleteSqlStatements, herkomstTableName, stm);
    }

    int i = 1;
    for (Object p : params) {
      stm.setObject(i++, p);
    }

    int count = 1; // all ok
    try {
      if (alsoExecuteSql) {
        count = stm.executeUpdate();
      }
    } catch (SQLException e) {
      log.error("Error executing delete statement", e);
    }
    loadLog.append(", aantal rijen verwijderd: ").append(count);

    return count > 0;
  }

  private List<String> getPrimaryKeys(
      String tableName, GeometryJdbcConverter jdbcConverter, DatabaseMetaData metaData)
      throws SQLException {

    List<String> pks = primaryKeyCache.get(tableName);
    if (pks != null) {
      return pks;
    }

    pks = new ArrayList<>();

    String origName = tables.get(tableName);

    if (origName == null) {
      throw new IllegalArgumentException("Tabel bestaat niet: " + tableName);
    }

    ResultSet set = metaData.getPrimaryKeys(null, jdbcConverter.getSchema(), origName);
    while (set.next()) {
      String column = set.getString("COLUMN_NAME");
      pks.add(column.toLowerCase());
    }

    set.close();
    primaryKeyCache.put(tableName, pks);

    return pks;
  }

  private SortedSet<ColumnMetadata> getTableColumnMetadata(
      String table, GeometryJdbcConverter jdbcConverter, DatabaseMetaData metaData)
      throws SQLException {

    table = table.toLowerCase();

    if (!tables.containsKey(table)) {
      return null;
    }

    SortedSet<ColumnMetadata> columnMetadata = tableColumns.get(table);
    if (columnMetadata == null) {
      columnMetadata = new TreeSet<>();
      tableColumns.put(table, columnMetadata);

      ResultSet columnsRs =
          metaData.getColumns(null, jdbcConverter.getSchema(), tables.get(table), "%");
      while (columnsRs.next()) {
        ColumnMetadata cm = new ColumnMetadata(columnsRs);
        columnMetadata.add(cm);
      }

      columnsRs.close();
    }

    return columnMetadata;
  }

  private ColumnMetadata findColumnMetadata(
      SortedSet<ColumnMetadata> tableColumnMetadata, String columnName) {

    if (tableColumnMetadata == null || tableColumnMetadata.size() < 1) {
      return null;
    }

    for (ColumnMetadata cm : tableColumnMetadata) {
      if (cm.getName().equalsIgnoreCase(columnName)) {
        return cm;
      }
    }

    return null;
  }

  public String getHerkomstMetadata() {
    return herkomstMetadata;
  }

  public void setHerkomstMetadata(String herkomstMetadata) {
    this.herkomstMetadata = herkomstMetadata;
  }

  private void doSavePoint(TableRow row, GeometryJdbcConverter jdbcConverter, Connection connection)
      throws SQLException {
    if (!alsoExecuteSql) {
      return;
    }
    if (jdbcConverter.useSavepoints()) {
      if (row.isIgnoreDuplicates()) {
        if (recentSavepoint == null) {
          recentSavepoint = connection.setSavepoint();
          log.trace("Created savepoint with id: " + recentSavepoint.getSavepointId());
        } else {
          log.trace(
              "No need for new savepoint, previous insert caused rollback to recent savepoint with id "
                  + recentSavepoint.getSavepointId());
        }
      } else if (recentSavepoint != null) {
        log.trace(
            "About to insert non-recoverable row, discarding savepoint with id "
                + recentSavepoint.getSavepointId());
        connection.releaseSavepoint(recentSavepoint);
        recentSavepoint = null;
      }
    }
  }

  private Object getValueAsObject(
      String tableName,
      String column,
      String value,
      ColumnMetadata cm,
      GeometryJdbcConverter jdbcConverter,
      DatabaseMetaData metaData)
      throws SQLException, ParseException {
    if (cm == null) {
      SortedSet<ColumnMetadata> tableColumnMetadata;
      tableColumnMetadata = getTableColumnMetadata(tableName, jdbcConverter, metaData);
      cm = findColumnMetadata(tableColumnMetadata, column);
    }

    Object param = null;
    if (cm == null) {
      throw new IllegalArgumentException("Column not found: " + column + " in table " + tableName);
    } else if (cm.getTypeName().equals(jdbcConverter.getGeomTypeName())
        // in geval we in een ander dan public schema werken, en postgis zit in public
        // schema
        || cm.getTypeName().replace("\"", "").equals("public." + jdbcConverter.getGeomTypeName())) {
      param = jdbcConverter.convertToNativeGeometryObject(value);
    } else if (value != null) {
      param = GeometryJdbcConverter.convertToSQLObject(value, cm, tableName, column);
    }

    return param;
  }

  public enum BerichtSelectMode {
    BY_STATUS,
    BY_IDS,
    BY_LAADPROCES,
    FOR_UPDATE,
    RETRY_WAITING
  }
}