StagingProxy.java
package nl.b3p.brmo.loader;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import javax.sql.DataSource;
import javax.xml.transform.TransformerException;
import nl.b3p.brmo.loader.entity.Bericht;
import nl.b3p.brmo.loader.entity.BerichtenSorter;
import nl.b3p.brmo.loader.entity.Brk2Bericht;
import nl.b3p.brmo.loader.entity.BrkBericht;
import nl.b3p.brmo.loader.entity.LaadProces;
import nl.b3p.brmo.loader.pipeline.BerichtTypeOfWork;
import nl.b3p.brmo.loader.pipeline.BerichtWorkUnit;
import nl.b3p.brmo.loader.pipeline.ProcessDbXmlPipeline;
import nl.b3p.brmo.loader.util.BrmoDuplicaatLaadprocesException;
import nl.b3p.brmo.loader.util.BrmoException;
import nl.b3p.brmo.loader.util.BrmoLeegBestandException;
import nl.b3p.brmo.loader.util.RsgbTransformer;
import nl.b3p.brmo.loader.util.StagingRowHandler;
import nl.b3p.brmo.loader.util.TableData;
import nl.b3p.brmo.loader.xml.BRPXMLReader;
import nl.b3p.brmo.loader.xml.Brk2SnapshotXMLReader;
import nl.b3p.brmo.loader.xml.BrkSnapshotXMLReader;
import nl.b3p.brmo.loader.xml.BrmoXMLReader;
import nl.b3p.brmo.loader.xml.GbavXMLReader;
import nl.b3p.brmo.loader.xml.NhrXMLReader;
import nl.b3p.brmo.loader.xml.TopNLFileReader;
import nl.b3p.brmo.loader.xml.WozXMLReader;
import nl.b3p.jdbc.util.converter.GeometryJdbcConverter;
import nl.b3p.jdbc.util.converter.GeometryJdbcConverterFactory;
import nl.b3p.jdbc.util.converter.OracleJdbcConverter;
import nl.b3p.jdbc.util.converter.PostgisJdbcConverter;
import nl.b3p.jdbc.util.dbutils.LongColumnListHandler;
import nl.b3p.topnl.TopNLType;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.dbutils.RowProcessor;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.apache.commons.dbutils.handlers.ScalarHandler;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.javasimon.SimonManager;
import org.javasimon.Split;
import org.xml.sax.SAXException;
/**
* @author Boy de Wit
*/
public class StagingProxy {
private static final Log log = LogFactory.getLog(StagingProxy.class);
private Connection connStaging;
private DataSource dataSourceStaging;
private GeometryJdbcConverter geomToJdbc;
private Integer batchCapacity = 150;
private Integer limitStandBerichtenToTransform = -1;
public StagingProxy(DataSource dataSourceStaging) throws SQLException, BrmoException {
this.dataSourceStaging = dataSourceStaging;
this.connStaging = getConnection();
geomToJdbc = GeometryJdbcConverterFactory.getGeometryJdbcConverter(connStaging);
}
public void closeStagingProxy() {
if (getOldBerichtStatement != null) {
DbUtils.closeQuietly(getOldBerichtStatement);
}
DbUtils.closeQuietly(connStaging);
}
private Connection getConnection() throws SQLException {
if (connStaging == null || connStaging.isClosed()) {
connStaging = dataSourceStaging.getConnection();
}
return connStaging;
}
public LaadProces getLaadProcesById(Long id) throws SQLException {
List<LaadProces> processen;
ResultSetHandler<List<LaadProces>> h =
new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
processen =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
"SELECT * FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE id = ?",
h,
id);
if (processen != null && processen.size() == 1) {
return processen.get(0);
}
return null;
}
public Bericht getBerichtById(Long id) throws SQLException {
List<Bericht> berichten;
ResultSetHandler<List<Bericht>> h =
new BeanListHandler<>(Bericht.class, new StagingRowHandler());
berichten =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
"SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?",
h,
id);
if (berichten != null && berichten.size() == 1) {
return berichten.get(0);
}
return null;
}
public List<Bericht> getBerichtByLaadProces(LaadProces lp) throws SQLException {
List<Bericht> berichten;
ResultSetHandler<List<Bericht>> h =
new BeanListHandler<>(Bericht.class, new StagingRowHandler());
berichten =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
"SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE laadprocesid = ?",
h,
lp.getId());
return berichten;
}
public LaadProces getLaadProcesByFileName(String name) throws SQLException {
List<LaadProces> processen;
ResultSetHandler<List<LaadProces>> h =
new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
processen =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
"SELECT * FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE bestand_naam = ?",
h,
name);
if (processen != null && processen.size() == 1) {
return processen.get(0);
}
return null;
}
public LaadProces getLaadProcesByRestoredFilename(String name) throws SQLException {
List<LaadProces> processen;
ResultSetHandler<List<LaadProces>> h =
new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
processen =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
"SELECT * FROM "
+ BrmoFramework.LAADPROCES_TABEL
+ " WHERE bestand_naam_hersteld = ?",
h,
name);
if (processen != null && processen.size() == 1) {
return processen.get(0);
}
return null;
}
/**
* bepaal of bericht bestaat aan de hand van laadprocesid, object_ref, datum en volgordenummer.
*
* @param b het bestaande bericht
* @return het bestaande bericht uit de database
* @throws SQLException if any
*/
public Bericht getExistingBericht(Bericht b) throws SQLException {
return getBerichtByNaturalKey(b.getObjectRef(), b.getDatum().getTime(), b.getVolgordeNummer());
}
private Bericht getBerichtByNaturalKey(String object_ref, long date, Integer volgnr)
throws SQLException {
List<Bericht> processen;
ResultSetHandler<List<Bericht>> h =
new BeanListHandler<>(Bericht.class, new StagingRowHandler());
processen =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
"SELECT * FROM "
+ BrmoFramework.BERICHT_TABLE
+ " WHERE object_ref = ?"
+ " AND datum = ? and volgordenummer = ?",
h,
object_ref,
new Timestamp(date),
volgnr);
if (processen != null && processen.size() > 0) {
return processen.get(0);
}
return null;
}
public long getCountJob() throws SQLException {
Object o =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
"select count(*) from " + BrmoFramework.JOB_TABLE,
new ScalarHandler<>());
if (o instanceof BigDecimal) {
return ((BigDecimal) o).longValue();
} else if (o instanceof Integer) {
return ((Integer) o).longValue();
}
return (Long) o;
}
public boolean removeJob() throws SQLException {
int count =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(getConnection(), "truncate table " + BrmoFramework.JOB_TABLE);
return count > 0;
}
public boolean cleanJob() throws SQLException {
int count;
if (geomToJdbc instanceof OracleJdbcConverter) {
count =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
"delete from "
+ BrmoFramework.JOB_TABLE
+ " j where j.id in (select b.id from "
+ BrmoFramework.BERICHT_TABLE
+ " b "
// vanwege rare fout in oracle bij ophalen tabel
// metadata werkt dit niet
// https://issues.apache.org/jira/browse/DBUTILS-125
+ " where b.id = j.id and status != 'STAGING_OK')");
} else {
count =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
"delete from "
+ BrmoFramework.JOB_TABLE
+ " j where j.id in (select b.id from "
+ BrmoFramework.BERICHT_TABLE
+ " b "
+ " where b.id = j.id and status != ?)",
Bericht.STATUS.STAGING_OK.toString());
}
return count > 0;
}
public long setBerichtenJobByStatus(Bericht.STATUS status, boolean orderBerichten)
throws SQLException {
StringBuilder q =
new StringBuilder(
"insert into "
+ BrmoFramework.JOB_TABLE
+ " (id, datum, volgordenummer, object_ref, br_xml, soort) "
+ " select id, datum, volgordenummer, object_ref, br_xml, soort from "
+ BrmoFramework.BERICHT_TABLE
+ " where status = ? and datum <= ? ");
if (orderBerichten) {
q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
} else {
q.append(" and volgordenummer < 0 ");
if (this.limitStandBerichtenToTransform > 0) {
q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
}
}
log.debug("pre-transformatie SQL: " + q);
return new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
q.toString(),
status.toString(),
new java.sql.Timestamp((new Date()).getTime()));
}
public long setBerichtenJobForUpdate(String soort, boolean orderBerichten) throws SQLException {
StringBuilder q =
new StringBuilder(
"insert into "
+ BrmoFramework.JOB_TABLE
+ " (id, datum, volgordenummer, object_ref, br_xml, soort) "
+ " select id, datum, volgordenummer, object_ref, br_xml, soort from "
+ BrmoFramework.BERICHT_TABLE
+ " where status = ? and soort = ? and datum <= ? ");
if (orderBerichten) {
q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
} else {
q.append(" and volgordenummer < 0 ");
if (this.limitStandBerichtenToTransform > 0) {
q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
}
}
log.debug("pre-transformatie SQL: " + q);
return new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
q.toString(),
Bericht.STATUS.RSGB_OK.toString(),
soort,
new java.sql.Timestamp((new Date()).getTime()));
}
public long setBerichtenJobByIds(long[] ids, boolean orderBerichten) throws SQLException {
StringBuilder q =
new StringBuilder(
"insert into "
+ BrmoFramework.JOB_TABLE
+ " (id, datum, volgordenummer, object_ref, br_xml, soort) "
+ " select id, datum, volgordenummer, object_ref, br_xml, soort from "
+ BrmoFramework.BERICHT_TABLE
+ " where id in (");
for (int i = 0; i < ids.length; i++) {
if (i != 0) {
q.append(",");
}
q.append(ids[i]);
}
q.append(") and datum <= ? ");
if (orderBerichten) {
q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
} else {
q.append(" and volgordenummer < 0 ");
if (this.limitStandBerichtenToTransform > 0) {
q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
}
}
log.debug("pre-transformatie SQL: " + q);
return new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(getConnection(), q.toString(), new java.sql.Timestamp((new Date()).getTime()));
}
public long setBerichtenJobByLaadprocessen(long[] laadprocesIds, boolean orderBerichten)
throws SQLException {
StringBuilder q =
new StringBuilder(
"insert into "
+ BrmoFramework.JOB_TABLE
+ " (id, datum, volgordenummer, object_ref, br_xml, soort) "
+ " select id, datum, volgordenummer, object_ref, br_xml, soort from "
+ BrmoFramework.BERICHT_TABLE
+ " where laadprocesid in (");
for (int i = 0; i < laadprocesIds.length; i++) {
if (i != 0) {
q.append(",");
}
q.append(laadprocesIds[i]);
}
q.append(") and status = ? and datum <= ? ");
if (orderBerichten) {
q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
} else {
q.append(" and volgordenummer < 0 ");
if (this.limitStandBerichtenToTransform > 0) {
q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
}
}
log.debug("pre-transformatie SQL: " + q);
return new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
q.toString(),
Bericht.STATUS.STAGING_OK.toString(),
new java.sql.Timestamp((new Date()).getTime()));
}
public void handleBerichtenByJob(
long total,
final BerichtenHandler handler,
final boolean enablePipeline,
int transformPipelineCapacity,
boolean orderBerichten)
throws Exception {
Split split = SimonManager.getStopwatch("b3p.rsgb.job").start();
final String dateTime = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
Split jobSplit = SimonManager.getStopwatch("b3p.rsgb.job." + dateTime).start();
((RsgbProxy) handler).setSimonNamePrefix("b3p.rsgb.job." + dateTime + ".");
final RowProcessor processor = new StagingRowHandler();
final ProcessDbXmlPipeline processDbXmlPipeline =
new ProcessDbXmlPipeline(
handler, transformPipelineCapacity, "b3p.rsgb.job." + dateTime + ".pipeline");
if (enablePipeline) {
processDbXmlPipeline.start();
}
long offset = 0L;
int batch = batchCapacity;
final MutableInt processed = new MutableInt(0);
final MutableInt lastJid = new MutableInt(0);
final boolean doOrderBerichten = orderBerichten;
boolean abort = false;
try {
do {
log.debug(
String.format(
"Ophalen berichten batch Job tabel, offset %d, limit %d",
lastJid.intValue(), batch));
String sql =
"select jid, id, datum, volgordenummer, object_ref, br_xml, soort from "
+ BrmoFramework.JOB_TABLE
+ " where jid > "
+ lastJid.intValue()
+ " order by jid ";
sql = geomToJdbc.buildPaginationSql(sql, 0, batch);
log.debug("SQL voor ophalen berichten batch: " + sql);
processed.setValue(0);
final Split getBerichten =
SimonManager.getStopwatch("b3p.rsgb.job." + dateTime + ".staging.berichten.getbatch")
.start();
Exception e =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
sql,
rs -> {
getBerichten.stop();
final Split processResultSet =
SimonManager.getStopwatch(
"b3p.rsgb.job."
+ dateTime
+ ".staging.berichten.processresultset")
.start();
while (rs.next()) {
try {
Bericht bericht = processor.toBean(rs, Bericht.class);
if (bericht.getVolgordeNummer() >= 0 && !doOrderBerichten) {
// mutaties hebben volgnr >=0 dan sortering
// verplicht
throw new Exception(
String.format(
"Het sorteren van berichten staat uit, terwijl bericht (id: %d) geen standbericht is (volgnummer >=0)",
bericht.getId()));
}
final BerichtWorkUnit workUnit = new BerichtWorkUnit(bericht);
if (enablePipeline) {
List<TableData> tableData = handler.transformToTableData(bericht);
if (tableData == null) {
// Exception during transform
// updateProcessingResult() is
// aangeroepen, geen
// updateResultPipeline
continue;
}
workUnit.setTableData(tableData);
workUnit.setTypeOfWork(BerichtTypeOfWork.PROCESS_DBXML);
Split pipelinePut =
SimonManager.getStopwatch(
"b3p.rsgb.job." + dateTime + ".pipeline.processdbxml.put")
.start();
processDbXmlPipeline.getQueue().put(workUnit);
pipelinePut.stop();
} else {
Split berichtSplit =
SimonManager.getStopwatch("b3p.rsgb.job." + dateTime + ".bericht")
.start();
handler.handle(bericht, null, true);
berichtSplit.stop();
}
lastJid.setValue(rs.getInt("jid"));
} catch (Exception e1) {
return e1;
}
processed.increment();
}
processResultSet.stop();
return null;
});
offset += processed.intValue();
// If handler threw exception processing row, rethrow it
if (e != null) {
throw e;
}
} while (processed.intValue() > 0 && (offset < total));
if (offset < total) {
log.warn(String.format("Minder berichten verwerkt (%d) dan verwacht (%d)!", offset, total));
}
// als succesvol beeindigd, dan verwijderen, anders herstart mogelijk maken
removeJob();
} catch (Exception t) {
abort = true;
throw t;
} finally {
if (enablePipeline) {
if (abort) {
// Let threads stop by themselves, don't join
processDbXmlPipeline.setAbortFlag();
} else {
processDbXmlPipeline.stopWhenQueueEmpty();
try {
processDbXmlPipeline.join();
} catch (InterruptedException e) {
}
}
}
}
jobSplit.stop();
split.stop();
}
public void updateBerichtenDbXml(List<Bericht> berichten, RsgbTransformer transformer)
throws SAXException, IOException, TransformerException {
for (Bericht ber : berichten) {
Split split = SimonManager.getStopwatch("b3p.staging.bericht.dbxml.transform").start();
String dbxml = transformer.transformToDbXml(ber);
// HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
// https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
if (geomToJdbc instanceof OracleJdbcConverter && dbxml != null && dbxml.length() == 8000) {
log.debug("DB XML is 8000 bytes, er wordt een spatie aangeplakt.");
dbxml += " ";
log.debug("DB XML is nu " + dbxml.length() + " bytes.");
}
ber.setDbXml(dbxml);
split.stop();
}
}
private PreparedStatement getOldBerichtStatement;
private PreparedStatement getOldBerichtStatementById;
public Bericht getOldBericht(Bericht nieuwBericht, StringBuilder loadLog) throws SQLException {
return getOldBericht(nieuwBericht.getObjectRef(), loadLog);
}
public Bericht getOldBericht(String objectRef, StringBuilder loadLog) throws SQLException {
Split split = SimonManager.getStopwatch("b3p.staging.bericht.getold").start();
Bericht bericht = null;
ResultSetHandler<List<Bericht>> h =
new BeanListHandler<>(Bericht.class, new StagingRowHandler());
if (getOldBerichtStatement == null) {
String sql =
"SELECT id, object_ref, datum, volgordenummer, soort, status, job_id, status_datum FROM "
+ BrmoFramework.BERICHT_TABLE
+ " WHERE"
+ " object_ref = ?"
+ " AND status in ('RSGB_OK', 'ARCHIVE')"
+ " ORDER BY datum desc, volgordenummer desc";
sql = geomToJdbc.buildPaginationSql(sql, 0, 1);
getOldBerichtStatement = getConnection().prepareStatement(sql);
} else {
getOldBerichtStatement.clearParameters();
}
getOldBerichtStatement.setString(1, objectRef);
ResultSet rs = getOldBerichtStatement.executeQuery();
List<Bericht> list = h.handle(rs);
rs.close();
if (!list.isEmpty()) {
loadLog.append("Vorig bericht gevonden:\n");
for (Bericht b : list) {
if ((Bericht.STATUS.RSGB_OK.equals(b.getStatus())
|| Bericht.STATUS.ARCHIVE.equals(b.getStatus()))
&& bericht == null) {
loadLog.append("Meest recent bericht gevonden: ").append(b).append("\n");
bericht = b;
} else {
loadLog.append("Niet geschikt bericht: ").append(b).append("\n");
}
}
}
if (bericht != null) {
// bericht nu wel vullen met alle kolommen
if (getOldBerichtStatementById == null) {
String sql = "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?";
getOldBerichtStatementById = getConnection().prepareStatement(sql);
} else {
getOldBerichtStatementById.clearParameters();
}
getOldBerichtStatementById.setLong(1, bericht.getId());
ResultSet rs2 = getOldBerichtStatementById.executeQuery();
List<Bericht> list2 = h.handle(rs2);
rs2.close();
if (!list2.isEmpty()) {
bericht = list2.get(0);
}
}
split.stop();
return bericht;
}
private PreparedStatement getPreviousBerichtStatement;
public Bericht getPreviousBericht(Bericht ber, StringBuilder loadLog) throws SQLException {
return getPreviousBericht(ber.getObjectRef(), ber.getDatum(), ber.getId(), loadLog);
}
/** Gets the previous bericht (not the first). */
public Bericht getPreviousBericht(
String objectRef, Date datum, Long currentBerichtId, StringBuilder loadLog)
throws SQLException {
Split split = SimonManager.getStopwatch("b3p.staging.bericht.getprevious").start();
Bericht bericht = null;
ResultSetHandler<List<Bericht>> h =
new BeanListHandler<>(Bericht.class, new StagingRowHandler());
if (getPreviousBerichtStatement == null) {
String sql =
"SELECT id, object_ref, datum, volgordenummer, soort, status, job_id, status_datum FROM "
+ BrmoFramework.BERICHT_TABLE
+ " WHERE"
+ " object_ref = ? and datum <= ? and id <> ?"
+ " ORDER BY datum asc, volgordenummer desc";
sql = geomToJdbc.buildPaginationSql(sql, 0, 1);
getPreviousBerichtStatement = getConnection().prepareStatement(sql);
} else {
getPreviousBerichtStatement.clearParameters();
}
getPreviousBerichtStatement.setString(1, objectRef);
getPreviousBerichtStatement.setTimestamp(2, new java.sql.Timestamp(datum.getTime()));
getPreviousBerichtStatement.setLong(3, currentBerichtId);
ResultSet rs = getPreviousBerichtStatement.executeQuery();
List<Bericht> list = h.handle(rs);
rs.close();
if (!list.isEmpty()) {
loadLog.append("Vorig bericht gevonden:\n");
for (Bericht b : list) {
if (bericht == null) {
loadLog.append("Meest recent bericht gevonden: ").append(b).append("\n");
bericht = b;
} else {
loadLog.append("Niet geschikt bericht: ").append(b).append("\n");
}
}
}
if (bericht != null) {
// bericht nu wel vullen met alle kolommen
if (getOldBerichtStatementById == null) {
String sql = "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?";
getOldBerichtStatementById = getConnection().prepareStatement(sql);
} else {
getOldBerichtStatementById.clearParameters();
}
getOldBerichtStatementById.setLong(1, bericht.getId());
ResultSet rs2 = getOldBerichtStatementById.executeQuery();
List<Bericht> list2 = h.handle(rs2);
rs2.close();
if (!list2.isEmpty()) {
bericht = list2.get(0);
}
}
split.stop();
return bericht;
}
/**
* Update (overschrijft) een bericht in job tabel. (object_ref, datum, volgordenummer, soort,
* opmerking, status, status_datum, br_xml, br_orgineel_xml, db_xml, xsl_version)
*
* @param b bij te werken bericht
* @throws SQLException if any
*/
public void updateBericht(Bericht b) throws SQLException {
Split split = SimonManager.getStopwatch("b3p.staging.bericht.update").start();
String brXML = b.getBrXml();
// HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
// https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
if (geomToJdbc instanceof OracleJdbcConverter && brXML != null && brXML.length() == 8000) {
log.debug("BR XML is 8000 bytes, er wordt een spatie aangeplakt.");
brXML += " ";
log.debug("BR XML is nu " + brXML.length() + " bytes.");
}
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
"UPDATE "
+ BrmoFramework.BERICHT_TABLE
+ " set "
+ "object_ref = ?, "
+ "datum = ?, "
+ "volgordenummer = ?, "
+ "soort = ?, "
+ "opmerking = ?, "
+ "status = ?, "
+ "status_datum = ?, "
+ "br_xml = ?, "
+ "br_orgineel_xml = ?, "
+ "db_xml = ?, "
+ "xsl_version = ? "
+ "WHERE id = ?",
b.getObjectRef(),
new Timestamp(b.getDatum().getTime()),
b.getVolgordeNummer(),
b.getSoort(),
b.getOpmerking(),
b.getStatus().toString(),
new Timestamp(b.getStatusDatum().getTime()),
brXML,
b.getBrOrgineelXml(),
b.getDbXml(),
b.getXslVersion(),
b.getId());
split.stop();
}
/**
* Update een bericht in job tabel met processing status. (opmerking, status, status_datum,
* db_xml)
*
* @param b updatebericht
* @throws SQLException als de update mislukt
*/
public void updateBerichtProcessing(Bericht b) throws SQLException {
Split split = SimonManager.getStopwatch("b3p.staging.bericht.updateprocessing").start();
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
"UPDATE "
+ BrmoFramework.BERICHT_TABLE
+ " set "
+ "opmerking = ?, "
+ "status = ?, "
+ "status_datum = ?, "
+ "db_xml = ? "
+ " WHERE id = ?",
b.getOpmerking(),
b.getStatus().toString(),
new Timestamp(b.getStatusDatum().getTime()),
b.getDbXml(),
b.getId());
split.stop();
}
public void deleteByLaadProcesId(Long id) throws SQLException {
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
"DELETE FROM " + BrmoFramework.BERICHT_TABLE + " WHERE laadprocesid = ?",
id);
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(), "DELETE FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE id = ?", id);
}
public List<LaadProces> getLaadProcessen() throws SQLException {
List<LaadProces> list;
ResultSetHandler<List<LaadProces>> h =
new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
list =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(getConnection(), "select * from " + BrmoFramework.LAADPROCES_TABEL, h);
return list;
}
public List<Bericht> getBerichten() throws SQLException {
List<Bericht> berichten;
ResultSetHandler<List<Bericht>> h =
new BeanListHandler<>(Bericht.class, new StagingRowHandler());
berichten =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(getConnection(), "select * from " + BrmoFramework.BERICHT_TABLE, h);
return berichten;
}
/**
* Laadt het bestand uit de stream in de database.
*
* @param stream input
* @param type type registratie, bijv. {@value BrmoFramework#BR_BRK}
* @param fileName naam van het bestand (ter identificatie)
* @param d bestandsdatum
* @param listener progress listener
* @throws Exception if any
* @deprecated gebruik de variant die een automatischProcesId als argument heeft
* @see #loadBr(InputStream, String, String, Date, ProgressUpdateListener, Long)
*/
@Deprecated
public void loadBr(
InputStream stream, String type, String fileName, Date d, ProgressUpdateListener listener)
throws Exception {
this.loadBr(stream, type, fileName, d, listener, null);
}
/**
* Laadt het bestand uit de stream in de database.
*
* @param stream input
* @param type type registratie, bijv. {@value BrmoFramework#BR_BRK}
* @param fileName naam van het bestand (ter identificatie)
* @param d bestandsdatum
* @param automatischProces id van het automatisch proces
* @param listener progress listener
* @throws Exception if any
*/
public void loadBr(
InputStream stream,
String type,
String fileName,
Date d,
ProgressUpdateListener listener,
Long automatischProces)
throws Exception {
CountingInputStream cis = new CountingInputStream(stream);
BrmoXMLReader brmoXMLReader;
if (type.equals(BrmoFramework.BR_BRK)) {
brmoXMLReader = new BrkSnapshotXMLReader(cis);
} else if (type.equals(BrmoFramework.BR_BRK2)) {
brmoXMLReader = new Brk2SnapshotXMLReader(cis);
} else if (type.equals(BrmoFramework.BR_NHR)) {
brmoXMLReader = new NhrXMLReader(cis);
} else if (TopNLType.isTopNLType(type)) {
brmoXMLReader = new TopNLFileReader(fileName, type);
} else if (type.equals(BrmoFramework.BR_BRP)) {
brmoXMLReader = new BRPXMLReader(cis, d, this);
} else if (type.equals(BrmoFramework.BR_GBAV)) {
brmoXMLReader = new GbavXMLReader(cis);
} else if (type.equals(BrmoFramework.BR_WOZ)) {
brmoXMLReader = new WozXMLReader(cis, d, this);
} else {
throw new UnsupportedOperationException("Ongeldige basisregistratie: " + type);
}
if (brmoXMLReader.getBestandsDatum() == null) {
throw new BrmoException("Header van bestand bevat geen datum, verkeerd formaat?");
}
LaadProces lp = new LaadProces();
lp.setBestandNaam(fileName);
lp.setBestandDatum(brmoXMLReader.getBestandsDatum());
lp.setSoort(type);
lp.setGebied(brmoXMLReader.getGebied());
lp.setStatus(LaadProces.STATUS.STAGING_OK);
lp.setStatusDatum(new Date());
lp.setAutomatischProcesId(automatischProces);
if (laadProcesExists(lp)) {
throw new BrmoDuplicaatLaadprocesException("Laadproces al gerund voor bestand " + fileName);
}
lp = writeLaadProces(lp);
if (TopNLType.isTopNLType(type)) {
// van een TopNL GML bestand maken we alleen een LP, geen bericht,
// de datum halen we van het zip bestand
if (listener != null) {
listener.total(((TopNLFileReader) brmoXMLReader).getFileSize());
listener.progress(((TopNLFileReader) brmoXMLReader).getFileSize());
}
} else {
if (!brmoXMLReader.hasNext()) {
updateLaadProcesStatus(
lp,
LaadProces.STATUS.STAGING_OK,
"Leeg bestand, geen berichten gevonden in " + fileName);
throw new BrmoLeegBestandException("Leeg bestand, geen berichten gevonden in " + fileName);
}
boolean isBerichtGeschreven = false;
int berichten = 0;
int foutBerichten = 0;
String lastErrorMessage = null;
while (brmoXMLReader.hasNext()) {
Bericht b;
try {
b = brmoXMLReader.next();
b.setLaadProcesId(lp.getId());
b.setStatus(Bericht.STATUS.STAGING_OK);
b.setStatusDatum(new Date());
b.setSoort(type);
if (StringUtils.isEmpty(b.getObjectRef())) {
// geen object_ref kunnen vaststellen; dan ook niet transformeren,
// bijvoorbeeld bij WOZ
b.setStatus(Bericht.STATUS.STAGING_NOK);
b.setOpmerking(
"Er kon geen object_ref bepaald worden uit de natuurlijke sleutel van het bericht.");
}
if (b.getDatum() == null) {
throw new BrmoException("Datum bericht is null");
}
log.debug(b);
Bericht existingBericht = getExistingBericht(b);
if (type.equals(BrmoFramework.BR_BRK) && !isBerichtGeschreven) {
// haal alleen voor eerste
BrkBericht brkBericht = (BrkBericht) b;
lp.setBestandNaamHersteld(
brkBericht.getRestoredFileName(lp.getBestandDatum(), b.getVolgordeNummer()));
updateLaadProcesBestandNaamHersteld(lp);
}
// TODO BRK2 bepalen of dit nog nodig is voor BRK2
if (type.equals(BrmoFramework.BR_BRK2) && !isBerichtGeschreven) {
// haal alleen voor eerste
Brk2Bericht brk2Bericht = (Brk2Bericht) b;
lp.setBestandNaamHersteld(
brk2Bericht.getRestoredFileName(lp.getBestandDatum(), b.getVolgordeNummer()));
updateLaadProcesBestandNaamHersteld(lp);
}
if (existingBericht == null) {
writeBericht(b);
isBerichtGeschreven = true;
} else if (existingBericht.getStatus().equals(Bericht.STATUS.STAGING_OK)) {
// als bericht nog niet getransformeerd is, dan overschrijven.
b.setId(existingBericht.getId());
this.updateBericht(b);
}
if (listener != null) {
listener.progress(cis.getByteCount());
}
berichten++;
} catch (Exception e) {
lastErrorMessage =
String.format(
"Laden bericht uit %s mislukt vanwege: %s", fileName, e.getLocalizedMessage());
log.error(lastErrorMessage);
log.trace(lastErrorMessage, e);
if (listener != null) {
listener.exception(e);
}
foutBerichten++;
}
}
if (listener != null) {
listener.total(berichten);
}
if (foutBerichten > 0) {
String opmerking =
"Laden van "
+ foutBerichten
+ " bericht(en) mislukt, laatste melding: "
+ lastErrorMessage
+ ", zie logs voor meer info.";
this.updateLaadProcesStatus(lp, LaadProces.STATUS.STAGING_NOK, opmerking);
} else if (!isBerichtGeschreven) {
String opmerking = "Dit bestand is waarschijnlijk al eerder geladen.";
this.updateLaadProcesStatus(lp, LaadProces.STATUS.STAGING_DUPLICAAT, opmerking);
}
}
}
public void writeBericht(Bericht b) throws SQLException {
String brXML = b.getBrXml();
// HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
// https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
if (geomToJdbc instanceof OracleJdbcConverter && brXML != null && brXML.length() == 8000) {
log.debug("BR XML is 8000 bytes, er wordt een spatie aangeplakt.");
brXML += " ";
log.debug("BR XML is nu " + brXML.length() + " bytes.");
}
Object berId =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.insert(
getConnection(),
"INSERT INTO "
+ BrmoFramework.BERICHT_TABLE
+ " (laadprocesid, "
+ "object_ref, "
+ "datum, "
+ "volgordenummer, "
+ "soort, "
+ "opmerking, "
+ "status, "
+ "status_datum, "
+ "br_xml, "
+ "br_orgineel_xml, "
+ "db_xml, "
+ "xsl_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
new ScalarHandler<>(),
b.getLaadProcesId(),
b.getObjectRef(),
new Timestamp(b.getDatum().getTime()),
b.getVolgordeNummer(),
b.getSoort(),
b.getOpmerking(),
b.getStatus().toString(),
new Timestamp(b.getStatusDatum().getTime()),
brXML,
b.getBrOrgineelXml(),
b.getDbXml(),
b.getXslVersion());
log.trace("opgeslagen bericht heeft (row) id: " + berId);
}
/** Bepaal aan de hand van bestandsnaam en bestandsdatum van het laadproces of dit al bestaat. */
private boolean laadProcesExists(LaadProces lp) throws SQLException {
Object o =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
"select 1 from laadproces where bestand_naam = ? and" + " bestand_datum = ?",
new ScalarHandler<>(),
lp.getBestandNaam(),
new Timestamp(lp.getBestandDatum().getTime()));
return o != null;
}
private LaadProces writeLaadProces(LaadProces lp) throws SQLException {
log.trace("opslaan van laadproces: " + lp);
if (lp == null) {
return null;
}
final String sql =
"INSERT INTO "
+ BrmoFramework.LAADPROCES_TABEL
+ "(bestand_naam, "
+ "bestand_datum, soort, gebied, opmerking, status, "
+ "status_datum, contact_email, automatisch_proces "
// nieuwe kolommen in 2.0.0
+ ", afgifteid, afgiftereferentie, artikelnummer, beschikbaar_tot, bestandsreferentie, "
+ "contractafgiftenummer, contractnummer, klantafgiftenummer, bestand_naam_hersteld"
+ ") VALUES (?,?,?,?,?,?,?,?,?"
+ ",?,?,?,?,?,?,?,?,?"
+ ")";
// Oracle geeft een ROWID terug als "generated key" en niet de PK-kolom "ID" die we willen
// hebben, daarom zelf doen
// zie ook: https://issues.apache.org/jira/browse/DBUTILS-54
QueryRunner queryRunner = new QueryRunner(geomToJdbc.isPmdKnownBroken());
try (
// PG: Caused by: org.postgresql.util.PSQLException: ERROR: column "ID" does not exist
// maar lowercase werkt ook met oracle
PreparedStatement stmt = getConnection().prepareStatement(sql, new String[] {"id"})
// door een bug in de PG driver geen kolom index gebruiken, zie
// https://github.com/pgjdbc/pgjdbc/issues/1661
// PreparedStatement stmt = getConnection().prepareStatement(sql, new int[]{1});
) {
queryRunner.fillStatement(
stmt,
lp.getBestandNaam(),
new Timestamp(lp.getBestandDatum().getTime()),
lp.getSoort(),
lp.getGebied(),
lp.getOpmerking(),
lp.getStatus().toString(),
new Timestamp(lp.getStatusDatum().getTime()),
lp.getContactEmail(),
lp.getAutomatischProcesId(),
// nieuwe kolommen in 2.0.0
lp.getAfgifteid(),
lp.getAfgiftereferentie(),
lp.getArtikelnummer(),
(lp.getBeschikbaar_tot() == null)
? null
: new Timestamp(lp.getBeschikbaar_tot().getTime()),
lp.getBestandsreferentie(),
lp.getContractafgiftenummer(),
lp.getContractnummer(),
lp.getKlantafgiftenummer(),
lp.getBestandNaamHersteld());
stmt.executeUpdate();
ResultSetHandler<Number> rsh = new ScalarHandler<>();
Number lpId = rsh.handle(stmt.getGeneratedKeys());
log.trace("opgeslagen laadproces heeft id: " + lpId);
return this.getLaadProcesById(lpId.longValue());
}
}
public void updateLaadProcesStatus(LaadProces lp, LaadProces.STATUS status, String opmerking)
throws SQLException {
log.trace("update van laadproces status: " + lp);
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
"update "
+ BrmoFramework.LAADPROCES_TABEL
+ " set status = ?, opmerking = ?, status_datum = ? where id = ?",
status.toString(),
opmerking,
new Timestamp(new Date().getTime()),
lp.getId());
}
/**
* update laadproces (GDS2 afgifte) metadata.
*
* @param lpId laadproces id
* @param klantafgiftenummer klantafgiftenummer
* @param contractafgiftenummer contractafgiftenummer
* @param artikelnummer artikelnummer
* @param contractnummer contractnummer
* @param afgifteid afgifteid
* @param afgiftereferentie afgiftereferentie
* @param bestandsreferentie bestandsreferentie
* @param beschikbaar_tot beschikbaar_tot
* @throws SQLException if any
*/
public void updateLaadProcesMeta(
Long lpId,
BigInteger klantafgiftenummer,
BigInteger contractafgiftenummer,
String artikelnummer,
String contractnummer,
String afgifteid,
String afgiftereferentie,
String bestandsreferentie,
Date beschikbaar_tot)
throws SQLException {
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
"update "
+ BrmoFramework.LAADPROCES_TABEL
+ " set "
+ "afgifteid = ?,"
+ "afgiftereferentie = ?,"
+ "artikelnummer = ?,"
+ "beschikbaar_tot = ?,"
+ "bestandsreferentie = ?,"
+ "contractafgiftenummer = ?,"
+ "contractnummer = ?,"
+ "klantafgiftenummer = ?"
+ "where id = ?",
afgifteid,
afgiftereferentie,
artikelnummer,
new Timestamp(beschikbaar_tot.getTime()),
bestandsreferentie,
contractafgiftenummer,
contractnummer,
klantafgiftenummer,
lpId);
}
public void updateLaadProcesBestandNaamHersteld(LaadProces lp) throws SQLException {
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(
getConnection(),
"update "
+ BrmoFramework.LAADPROCES_TABEL
+ " set bestand_naam_hersteld = ? where id = ?",
lp.getBestandNaamHersteld(),
lp.getId());
}
public void emptyStagingDb() throws SQLException {
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(getConnection(), "DELETE FROM " + BrmoFramework.BERICHT_TABLE);
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.update(getConnection(), "DELETE FROM " + BrmoFramework.LAADPROCES_TABEL);
}
public List<Bericht> getBerichten(
int page,
int start,
int limit,
String sort,
String dir,
String filterSoort,
String filterStatus)
throws SQLException {
List<String> params = new ArrayList<>();
if (sort == null || sort.trim().isEmpty()) {
sort = "id";
}
if (dir == null || dir.trim().isEmpty()) {
sort = "asc";
}
String sql =
"SELECT * FROM "
+ BrmoFramework.BERICHT_TABLE
+ buildFilterSql(page, sort, dir, filterSoort, filterStatus, params);
sql = geomToJdbc.buildPaginationSql(sql, start, limit);
return new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
sql,
new BeanListHandler<>(Bericht.class, new StagingRowHandler()),
params.toArray());
}
public long getCountBerichten(String filterSoort, String filterStatus) throws SQLException {
List<String> params = new ArrayList<>();
String filter = buildFilterSql(0, null, null, filterSoort, filterStatus, params);
String sql;
// gebruik estimate voor postgresql indien geen filter
if (StringUtils.isBlank(filter) && geomToJdbc instanceof PostgisJdbcConverter) {
sql =
"SELECT reltuples::BIGINT AS estimate FROM pg_class WHERE relname= '"
+ BrmoFramework.BERICHT_TABLE
+ "'";
} else {
sql = "SELECT count(*) FROM " + BrmoFramework.BERICHT_TABLE + filter;
}
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = getConnection().prepareStatement(sql);
try {
pstmt.setQueryTimeout(300); // seconds to wait
} catch (Exception e) {
log.warn("Driver does not support setQueryTimeout, please update driver.");
}
if (!params.isEmpty()) {
for (int i = 0; i < params.size(); i++) {
if (params.get(i) != null) {
pstmt.setObject(i + 1, params.get(i));
} else {
pstmt.setNull(i + 1, Types.VARCHAR);
}
}
}
rs = pstmt.executeQuery();
if (rs.next()) {
Object o = rs.getObject(1);
if (o instanceof BigDecimal) {
return ((BigDecimal) o).longValue();
} else if (o instanceof Integer) {
return ((Integer) o).longValue();
}
return (Long) o;
} else {
return -1;
}
} catch (Exception e) {
return -1;
} finally {
try {
if (rs != null && !rs.isClosed()) {
rs.close();
}
} catch (SQLException e) {
// ignore
}
try {
if (pstmt != null && !pstmt.isClosed()) {
pstmt.close();
}
} catch (SQLException e) {
// ignore
}
}
}
private String buildFilterSql(
int page,
String sort,
String dir,
String filterSoort,
String filterStatus,
List<String> params) {
StringBuilder builder = new StringBuilder();
List<String> conjunctions = new ArrayList<>();
if (!StringUtils.isBlank(filterSoort)) {
String[] soorten = filterSoort.split(",");
params.addAll(Arrays.asList(soorten));
conjunctions.add("soort in (" + StringUtils.repeat("?", ", ", soorten.length) + ")");
}
if (!StringUtils.isBlank(filterStatus)) {
String[] statussen = filterStatus.split(",");
params.addAll(Arrays.asList(statussen));
conjunctions.add("status in (" + StringUtils.repeat("?", ", ", statussen.length) + ")");
}
// build where part
if (!conjunctions.isEmpty()) {
builder.append(" where ");
builder.append(StringUtils.join(conjunctions.toArray(), " and "));
}
// build order by part
if (sort != null && dir != null) {
builder.append(" ORDER BY ");
builder.append(sort);
builder.append(" ");
builder.append(dir);
}
return builder.toString();
}
public long getCountLaadProces(String filterSoort, String filterStatus) throws SQLException {
List<String> params = new ArrayList<>();
String sql =
"SELECT count(*) FROM "
+ BrmoFramework.LAADPROCES_TABEL
+ buildFilterSql(0, null, null, filterSoort, filterStatus, params);
Object o =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(getConnection(), sql, new ScalarHandler<>(), params.toArray());
if (o instanceof BigDecimal) {
return ((BigDecimal) o).longValue();
} else if (o instanceof Integer) {
return ((Integer) o).longValue();
}
return (Long) o;
}
public List<LaadProces> getLaadprocessen(
int page,
int start,
int limit,
String sort,
String dir,
String filterSoort,
String filterStatus)
throws SQLException {
List<String> params = new ArrayList<>();
if (sort == null || sort.trim().isEmpty()) {
sort = "id";
}
if (dir == null || dir.trim().isEmpty()) {
sort = "asc";
}
String sql =
"SELECT * FROM "
+ BrmoFramework.LAADPROCES_TABEL
+ buildFilterSql(page, sort, dir, filterSoort, filterStatus, params);
sql = geomToJdbc.buildPaginationSql(sql, start, limit);
return new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(
getConnection(),
sql,
new BeanListHandler<>(LaadProces.class, new StagingRowHandler()),
params.toArray());
}
public Long[] getLaadProcessenIds(
String sort, String dir, String filterSoort, String filterStatus) throws SQLException {
List<String> params = new ArrayList<>();
if (sort == null || sort.trim().isEmpty()) {
sort = "id";
}
if (dir == null || dir.trim().isEmpty()) {
dir = "asc";
}
String sql =
"SELECT ID FROM "
+ BrmoFramework.LAADPROCES_TABEL
+ buildFilterSql(-1, sort, dir, filterSoort, filterStatus, params);
List<Long> ids =
new QueryRunner(geomToJdbc.isPmdKnownBroken())
.query(getConnection(), sql, new LongColumnListHandler("id"), params.toArray());
return ids.toArray(new Long[ids.size()]);
}
/**
* @param batchCapacity the batchCapacity to set
*/
public void setBatchCapacity(Integer batchCapacity) {
this.batchCapacity = batchCapacity;
}
public void setLimitStandBerichtenToTransform(Integer limitStandBerichtenToTransform) {
this.limitStandBerichtenToTransform = limitStandBerichtenToTransform;
}
}