View Javadoc
1   package nl.b3p.brmo.loader;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.math.BigDecimal;
6   import java.math.BigInteger;
7   import java.sql.*;
8   import java.text.SimpleDateFormat;
9   import java.util.ArrayList;
10  import java.util.Arrays;
11  import java.util.Date;
12  import java.util.List;
13  import javax.sql.DataSource;
14  import javax.xml.transform.TransformerException;
15  import nl.b3p.brmo.loader.entity.Bericht;
16  import nl.b3p.brmo.loader.entity.BerichtenSorter;
17  import nl.b3p.brmo.loader.entity.Brk2Bericht;
18  import nl.b3p.brmo.loader.entity.LaadProces;
19  import nl.b3p.brmo.loader.pipeline.BerichtTypeOfWork;
20  import nl.b3p.brmo.loader.pipeline.BerichtWorkUnit;
21  import nl.b3p.brmo.loader.pipeline.ProcessDbXmlPipeline;
22  import nl.b3p.brmo.loader.util.BrmoDuplicaatLaadprocesException;
23  import nl.b3p.brmo.loader.util.BrmoException;
24  import nl.b3p.brmo.loader.util.BrmoLeegBestandException;
25  import nl.b3p.brmo.loader.util.RsgbTransformer;
26  import nl.b3p.brmo.loader.util.StagingRowHandler;
27  import nl.b3p.brmo.loader.util.TableData;
28  import nl.b3p.brmo.loader.xml.BRPXMLReader;
29  import nl.b3p.brmo.loader.xml.Brk2SnapshotXMLReader;
30  import nl.b3p.brmo.loader.xml.BrmoXMLReader;
31  import nl.b3p.brmo.loader.xml.GbavXMLReader;
32  import nl.b3p.brmo.loader.xml.NhrXMLReader;
33  import nl.b3p.brmo.loader.xml.TopNLFileReader;
34  import nl.b3p.brmo.loader.xml.WozXMLReader;
35  import nl.b3p.jdbc.util.converter.GeometryJdbcConverter;
36  import nl.b3p.jdbc.util.converter.GeometryJdbcConverterFactory;
37  import nl.b3p.jdbc.util.converter.OracleJdbcConverter;
38  import nl.b3p.jdbc.util.converter.PostgisJdbcConverter;
39  import nl.b3p.jdbc.util.dbutils.LongColumnListHandler;
40  import nl.b3p.topnl.TopNLType;
41  import org.apache.commons.dbutils.DbUtils;
42  import org.apache.commons.dbutils.QueryRunner;
43  import org.apache.commons.dbutils.ResultSetHandler;
44  import org.apache.commons.dbutils.RowProcessor;
45  import org.apache.commons.dbutils.handlers.BeanListHandler;
46  import org.apache.commons.dbutils.handlers.ScalarHandler;
47  import org.apache.commons.io.input.CountingInputStream;
48  import org.apache.commons.lang3.StringUtils;
49  import org.apache.commons.lang3.mutable.MutableInt;
50  import org.apache.commons.logging.Log;
51  import org.apache.commons.logging.LogFactory;
52  import org.javasimon.SimonManager;
53  import org.javasimon.Split;
54  import org.xml.sax.SAXException;
55  
56  /**
57   * @author Boy de Wit
58   */
59  public class StagingProxy {
60  
61    private static final Log log = LogFactory.getLog(StagingProxy.class);
62  
63    private Connection connStaging;
64    private DataSource dataSourceStaging;
65    private GeometryJdbcConverter geomToJdbc;
66    private Integer batchCapacity = 150;
67    private Integer limitStandBerichtenToTransform = -1;
68  
69    public StagingProxy(DataSource dataSourceStaging) throws SQLException, BrmoException {
70      this.dataSourceStaging = dataSourceStaging;
71      this.connStaging = getConnection();
72      geomToJdbc = GeometryJdbcConverterFactory.getGeometryJdbcConverter(connStaging);
73    }
74  
75    public void closeStagingProxy() {
76      if (getOldBerichtStatement != null) {
77        DbUtils.closeQuietly(getOldBerichtStatement);
78      }
79      DbUtils.closeQuietly(connStaging);
80    }
81  
82    private Connection getConnection() throws SQLException {
83      if (connStaging == null || connStaging.isClosed()) {
84        connStaging = dataSourceStaging.getConnection();
85      }
86      return connStaging;
87    }
88  
89    public LaadProces getLaadProcesById(Long id) throws SQLException {
90      List<LaadProces> processen;
91  
92      ResultSetHandler<List<LaadProces>> h =
93          new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
94  
95      processen =
96          new QueryRunner(geomToJdbc.isPmdKnownBroken())
97              .query(
98                  getConnection(),
99                  "SELECT * FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE id = ?",
100                 h,
101                 id);
102 
103     if (processen != null && processen.size() == 1) {
104       return processen.get(0);
105     }
106 
107     return null;
108   }
109 
110   public Bericht getBerichtById(Long id) throws SQLException {
111     List<Bericht> berichten;
112     ResultSetHandler<List<Bericht>> h =
113         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
114 
115     berichten =
116         new QueryRunner(geomToJdbc.isPmdKnownBroken())
117             .query(
118                 getConnection(),
119                 "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?",
120                 h,
121                 id);
122 
123     if (berichten != null && berichten.size() == 1) {
124       return berichten.get(0);
125     }
126 
127     return null;
128   }
129 
130   public List<Bericht> getBerichtByLaadProces(LaadProces lp) throws SQLException {
131     List<Bericht> berichten;
132     ResultSetHandler<List<Bericht>> h =
133         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
134 
135     berichten =
136         new QueryRunner(geomToJdbc.isPmdKnownBroken())
137             .query(
138                 getConnection(),
139                 "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE laadprocesid = ?",
140                 h,
141                 lp.getId());
142     return berichten;
143   }
144 
145   public LaadProces getLaadProcesByFileName(String name) throws SQLException {
146     List<LaadProces> processen;
147     ResultSetHandler<List<LaadProces>> h =
148         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
149 
150     processen =
151         new QueryRunner(geomToJdbc.isPmdKnownBroken())
152             .query(
153                 getConnection(),
154                 "SELECT * FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE bestand_naam = ?",
155                 h,
156                 name);
157 
158     if (processen != null && processen.size() == 1) {
159       return processen.get(0);
160     }
161 
162     return null;
163   }
164 
165   public LaadProces getLaadProcesByRestoredFilename(String name) throws SQLException {
166     List<LaadProces> processen;
167     ResultSetHandler<List<LaadProces>> h =
168         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
169 
170     processen =
171         new QueryRunner(geomToJdbc.isPmdKnownBroken())
172             .query(
173                 getConnection(),
174                 "SELECT * FROM "
175                     + BrmoFramework.LAADPROCES_TABEL
176                     + " WHERE bestand_naam_hersteld = ?",
177                 h,
178                 name);
179 
180     if (processen != null && processen.size() == 1) {
181       return processen.get(0);
182     }
183 
184     return null;
185   }
186 
187   /**
188    * bepaal of bericht bestaat aan de hand van laadprocesid, object_ref, datum en volgordenummer.
189    *
190    * @param b het bestaande bericht
191    * @return het bestaande bericht uit de database
192    * @throws SQLException if any
193    */
194   public Bericht getExistingBericht(Bericht b) throws SQLException {
195     return getBerichtByNaturalKey(b.getObjectRef(), b.getDatum().getTime(), b.getVolgordeNummer());
196   }
197 
198   private Bericht getBerichtByNaturalKey(String object_ref, long date, Integer volgnr)
199       throws SQLException {
200     List<Bericht> processen;
201     ResultSetHandler<List<Bericht>> h =
202         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
203 
204     processen =
205         new QueryRunner(geomToJdbc.isPmdKnownBroken())
206             .query(
207                 getConnection(),
208                 "SELECT * FROM "
209                     + BrmoFramework.BERICHT_TABLE
210                     + " WHERE object_ref = ?"
211                     + " AND datum = ? and volgordenummer = ?",
212                 h,
213                 object_ref,
214                 new Timestamp(date),
215                 volgnr);
216 
217     if (processen != null && processen.size() > 0) {
218       return processen.get(0);
219     }
220 
221     return null;
222   }
223 
224   public long getCountJob() throws SQLException {
225     Object o =
226         new QueryRunner(geomToJdbc.isPmdKnownBroken())
227             .query(
228                 getConnection(),
229                 "select count(*) from " + BrmoFramework.JOB_TABLE,
230                 new ScalarHandler<>());
231     if (o instanceof BigDecimal) {
232       return ((BigDecimal) o).longValue();
233     } else if (o instanceof Integer) {
234       return ((Integer) o).longValue();
235     }
236     return (Long) o;
237   }
238 
239   public boolean removeJob() throws SQLException {
240     int count =
241         new QueryRunner(geomToJdbc.isPmdKnownBroken())
242             .update(getConnection(), "truncate table " + BrmoFramework.JOB_TABLE);
243     return count > 0;
244   }
245 
246   public boolean cleanJob() throws SQLException {
247     int count;
248     if (geomToJdbc instanceof OracleJdbcConverter) {
249       count =
250           new QueryRunner(geomToJdbc.isPmdKnownBroken())
251               .update(
252                   getConnection(),
253                   "delete from "
254                       + BrmoFramework.JOB_TABLE
255                       + " j where j.id in (select b.id from "
256                       + BrmoFramework.BERICHT_TABLE
257                       + " b "
258                       // vanwege rare fout in oracle bij ophalen tabel
259                       // metadata werkt dit niet
260                       // https://issues.apache.org/jira/browse/DBUTILS-125
261                       + " where b.id = j.id and status != 'STAGING_OK')");
262 
263     } else {
264       count =
265           new QueryRunner(geomToJdbc.isPmdKnownBroken())
266               .update(
267                   getConnection(),
268                   "delete from "
269                       + BrmoFramework.JOB_TABLE
270                       + " j where j.id in (select b.id from "
271                       + BrmoFramework.BERICHT_TABLE
272                       + " b "
273                       + " where b.id = j.id and status != ?)",
274                   Bericht.STATUS.STAGING_OK.toString());
275     }
276     return count > 0;
277   }
278 
279   public long setBerichtenJobByStatus(Bericht.STATUS status, boolean orderBerichten)
280       throws SQLException {
281     StringBuilder q =
282         new StringBuilder(
283             "insert into "
284                 + BrmoFramework.JOB_TABLE
285                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
286                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
287                 + BrmoFramework.BERICHT_TABLE
288                 + " where status = ? and datum <= ? ");
289     if (orderBerichten) {
290       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
291     } else {
292       q.append(" and volgordenummer < 0 ");
293       if (this.limitStandBerichtenToTransform > 0) {
294         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
295       }
296     }
297     log.debug("pre-transformatie SQL: " + q);
298     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
299         .update(
300             getConnection(),
301             q.toString(),
302             status.toString(),
303             new java.sql.Timestamp((new Date()).getTime()));
304   }
305 
306   public long setBerichtenJobForUpdate(String soort, boolean orderBerichten) throws SQLException {
307     StringBuilder q =
308         new StringBuilder(
309             "insert into "
310                 + BrmoFramework.JOB_TABLE
311                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
312                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
313                 + BrmoFramework.BERICHT_TABLE
314                 + " where status = ? and soort = ? and datum <= ? ");
315     if (orderBerichten) {
316       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
317     } else {
318       q.append(" and volgordenummer < 0 ");
319       if (this.limitStandBerichtenToTransform > 0) {
320         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
321       }
322     }
323     log.debug("pre-transformatie SQL: " + q);
324     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
325         .update(
326             getConnection(),
327             q.toString(),
328             Bericht.STATUS.RSGB_OK.toString(),
329             soort,
330             new java.sql.Timestamp((new Date()).getTime()));
331   }
332 
333   public long setBerichtenJobByIds(long[] ids, boolean orderBerichten) throws SQLException {
334     StringBuilder q =
335         new StringBuilder(
336             "insert into "
337                 + BrmoFramework.JOB_TABLE
338                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
339                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
340                 + BrmoFramework.BERICHT_TABLE
341                 + " where id in (");
342     for (int i = 0; i < ids.length; i++) {
343       if (i != 0) {
344         q.append(",");
345       }
346       q.append(ids[i]);
347     }
348     q.append(") and datum <= ? ");
349     if (orderBerichten) {
350       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
351     } else {
352       q.append(" and volgordenummer < 0 ");
353       if (this.limitStandBerichtenToTransform > 0) {
354         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
355       }
356     }
357     log.debug("pre-transformatie SQL: " + q);
358     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
359         .update(getConnection(), q.toString(), new java.sql.Timestamp((new Date()).getTime()));
360   }
361 
362   public long setBerichtenJobByLaadprocessen(long[] laadprocesIds, boolean orderBerichten)
363       throws SQLException {
364 
365     StringBuilder q =
366         new StringBuilder(
367             "insert into "
368                 + BrmoFramework.JOB_TABLE
369                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
370                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
371                 + BrmoFramework.BERICHT_TABLE
372                 + " where laadprocesid in (");
373     for (int i = 0; i < laadprocesIds.length; i++) {
374       if (i != 0) {
375         q.append(",");
376       }
377       q.append(laadprocesIds[i]);
378     }
379     q.append(") and status = ? and datum <= ? ");
380     if (orderBerichten) {
381       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
382     } else {
383       q.append(" and volgordenummer < 0 ");
384       if (this.limitStandBerichtenToTransform > 0) {
385         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
386       }
387     }
388     log.debug("pre-transformatie SQL: " + q);
389     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
390         .update(
391             getConnection(),
392             q.toString(),
393             Bericht.STATUS.STAGING_OK.toString(),
394             new java.sql.Timestamp((new Date()).getTime()));
395   }
396 
397   public void handleBerichtenByJob(
398       long total,
399       final BerichtenHandler handler,
400       final boolean enablePipeline,
401       int transformPipelineCapacity,
402       boolean orderBerichten)
403       throws Exception {
404     Split split = SimonManager.getStopwatch("b3p.rsgb.job").start();
405     final String dateTime = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
406     Split jobSplit = SimonManager.getStopwatch("b3p.rsgb.job." + dateTime).start();
407     ((RsgbProxy) handler).setSimonNamePrefix("b3p.rsgb.job." + dateTime + ".");
408     final RowProcessor processor = new StagingRowHandler();
409 
410     final ProcessDbXmlPipeline processDbXmlPipeline =
411         new ProcessDbXmlPipeline(
412             handler, transformPipelineCapacity, "b3p.rsgb.job." + dateTime + ".pipeline");
413     if (enablePipeline) {
414       processDbXmlPipeline.start();
415     }
416     long offset = 0L;
417     int batch = batchCapacity;
418     final MutableInt processed = new MutableInt(0);
419     final MutableInt lastJid = new MutableInt(0);
420     final boolean doOrderBerichten = orderBerichten;
421     boolean abort = false;
422     try {
423       do {
424         log.debug(
425             String.format(
426                 "Ophalen berichten batch Job tabel, offset %d, limit %d",
427                 lastJid.intValue(), batch));
428         String sql =
429             "select jid, id, datum, volgordenummer, object_ref, br_xml, soort from "
430                 + BrmoFramework.JOB_TABLE
431                 + " where jid > "
432                 + lastJid.intValue()
433                 + " order by jid ";
434         sql = geomToJdbc.buildPaginationSql(sql, 0, batch);
435         log.debug("SQL voor ophalen berichten batch: " + sql);
436 
437         processed.setValue(0);
438         final Split getBerichten =
439             SimonManager.getStopwatch("b3p.rsgb.job." + dateTime + ".staging.berichten.getbatch")
440                 .start();
441         Exception e =
442             new QueryRunner(geomToJdbc.isPmdKnownBroken())
443                 .query(
444                     getConnection(),
445                     sql,
446                     rs -> {
447                       getBerichten.stop();
448                       final Split processResultSet =
449                           SimonManager.getStopwatch(
450                                   "b3p.rsgb.job."
451                                       + dateTime
452                                       + ".staging.berichten.processresultset")
453                               .start();
454 
455                       while (rs.next()) {
456                         try {
457                           Bericht bericht = processor.toBean(rs, Bericht.class);
458                           if (bericht.getVolgordeNummer() >= 0 && !doOrderBerichten) {
459                             // mutaties hebben volgnr >=0 dan sortering
460                             // verplicht
461                             throw new Exception(
462                                 String.format(
463                                     "Het sorteren van berichten staat uit, terwijl bericht (id: %d) geen standbericht is (volgnummer >=0)",
464                                     bericht.getId()));
465                           }
466                           final BerichtWorkUnit workUnit = new BerichtWorkUnit(bericht);
467 
468                           if (enablePipeline) {
469                             List<TableData> tableData = handler.transformToTableData(bericht);
470                             if (tableData == null) {
471                               // Exception during transform
472                               // updateProcessingResult() is
473                               // aangeroepen, geen
474                               // updateResultPipeline
475                               continue;
476                             }
477                             workUnit.setTableData(tableData);
478                             workUnit.setTypeOfWork(BerichtTypeOfWork.PROCESS_DBXML);
479                             Split pipelinePut =
480                                 SimonManager.getStopwatch(
481                                         "b3p.rsgb.job." + dateTime + ".pipeline.processdbxml.put")
482                                     .start();
483                             processDbXmlPipeline.getQueue().put(workUnit);
484                             pipelinePut.stop();
485                           } else {
486                             Split berichtSplit =
487                                 SimonManager.getStopwatch("b3p.rsgb.job." + dateTime + ".bericht")
488                                     .start();
489                             handler.handle(bericht, null, true);
490                             berichtSplit.stop();
491                           }
492                           lastJid.setValue(rs.getInt("jid"));
493                         } catch (Exception e1) {
494                           return e1;
495                         }
496                         processed.increment();
497                       }
498                       processResultSet.stop();
499                       return null;
500                     });
501 
502         offset += processed.intValue();
503 
504         // If handler threw exception processing row, rethrow it
505         if (e != null) {
506           throw e;
507         }
508 
509       } while (processed.intValue() > 0 && (offset < total));
510       if (offset < total) {
511         log.warn(String.format("Minder berichten verwerkt (%d) dan verwacht (%d)!", offset, total));
512       }
513       // als succesvol beeindigd, dan verwijderen, anders herstart mogelijk maken
514       removeJob();
515     } catch (Exception t) {
516       abort = true;
517       throw t;
518     } finally {
519       if (enablePipeline) {
520         if (abort) {
521           // Let threads stop by themselves, don't join
522           processDbXmlPipeline.setAbortFlag();
523         } else {
524           processDbXmlPipeline.stopWhenQueueEmpty();
525           try {
526             processDbXmlPipeline.join();
527           } catch (InterruptedException e) {
528           }
529         }
530       }
531     }
532     jobSplit.stop();
533     split.stop();
534   }
535 
536   public void updateBerichtenDbXml(List<Bericht> berichten, RsgbTransformer transformer)
537       throws SAXException, IOException, TransformerException {
538     for (Bericht ber : berichten) {
539       Split split = SimonManager.getStopwatch("b3p.staging.bericht.dbxml.transform").start();
540       String dbxml = transformer.transformToDbXml(ber);
541       // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
542       // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
543       if (geomToJdbc instanceof OracleJdbcConverter && dbxml != null && dbxml.length() == 8000) {
544         log.debug("DB XML is 8000 bytes, er wordt een spatie aangeplakt.");
545         dbxml += " ";
546         log.debug("DB XML is nu " + dbxml.length() + " bytes.");
547       }
548 
549       ber.setDbXml(dbxml);
550       split.stop();
551     }
552   }
553 
554   private PreparedStatement getOldBerichtStatement;
555   private PreparedStatement getOldBerichtStatementById;
556 
557   public Bericht getOldBericht(Bericht nieuwBericht, StringBuilder loadLog) throws SQLException {
558     return getOldBericht(nieuwBericht.getObjectRef(), loadLog);
559   }
560 
561   public Bericht getOldBericht(String objectRef, StringBuilder loadLog) throws SQLException {
562     Split split = SimonManager.getStopwatch("b3p.staging.bericht.getold").start();
563 
564     Bericht bericht = null;
565     ResultSetHandler<List<Bericht>> h =
566         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
567 
568     if (getOldBerichtStatement == null) {
569       String sql =
570           "SELECT id, object_ref, datum, volgordenummer, soort, status, job_id, status_datum FROM "
571               + BrmoFramework.BERICHT_TABLE
572               + " WHERE"
573               + " object_ref = ?"
574               + " AND status in ('RSGB_OK', 'ARCHIVE')"
575               + " ORDER BY datum desc, volgordenummer desc";
576       sql = geomToJdbc.buildPaginationSql(sql, 0, 1);
577 
578       getOldBerichtStatement = getConnection().prepareStatement(sql);
579     } else {
580       getOldBerichtStatement.clearParameters();
581     }
582     getOldBerichtStatement.setString(1, objectRef);
583 
584     ResultSet rs = getOldBerichtStatement.executeQuery();
585     List<Bericht> list = h.handle(rs);
586     rs.close();
587 
588     if (!list.isEmpty()) {
589       loadLog.append("Vorig bericht gevonden:\n");
590       for (Bericht b : list) {
591         if ((Bericht.STATUS.RSGB_OK.equals(b.getStatus())
592                 || Bericht.STATUS.ARCHIVE.equals(b.getStatus()))
593             && bericht == null) {
594           loadLog.append("Meest recent bericht gevonden: ").append(b).append("\n");
595           bericht = b;
596         } else {
597           loadLog.append("Niet geschikt bericht: ").append(b).append("\n");
598         }
599       }
600     }
601 
602     if (bericht != null) {
603       // bericht nu wel vullen met alle kolommen
604       if (getOldBerichtStatementById == null) {
605         String sql = "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?";
606 
607         getOldBerichtStatementById = getConnection().prepareStatement(sql);
608       } else {
609         getOldBerichtStatementById.clearParameters();
610       }
611       getOldBerichtStatementById.setLong(1, bericht.getId());
612 
613       ResultSet rs2 = getOldBerichtStatementById.executeQuery();
614       List<Bericht> list2 = h.handle(rs2);
615       rs2.close();
616 
617       if (!list2.isEmpty()) {
618         bericht = list2.get(0);
619       }
620     }
621 
622     split.stop();
623     return bericht;
624   }
625 
626   private PreparedStatement getPreviousBerichtStatement;
627 
628   public Bericht getPreviousBericht(Bericht ber, StringBuilder loadLog) throws SQLException {
629     return getPreviousBericht(ber.getObjectRef(), ber.getDatum(), ber.getId(), loadLog);
630   }
631 
632   /** Gets the previous bericht (not the first). */
633   public Bericht getPreviousBericht(
634       String objectRef, Date datum, Long currentBerichtId, StringBuilder loadLog)
635       throws SQLException {
636     Split split = SimonManager.getStopwatch("b3p.staging.bericht.getprevious").start();
637 
638     Bericht bericht = null;
639     ResultSetHandler<List<Bericht>> h =
640         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
641 
642     if (getPreviousBerichtStatement == null) {
643       String sql =
644           "SELECT id, object_ref, datum, volgordenummer, soort, status, job_id, status_datum FROM "
645               + BrmoFramework.BERICHT_TABLE
646               + " WHERE"
647               + " object_ref = ? and datum <= ? and id <> ?"
648               + " ORDER BY datum asc, volgordenummer desc";
649       sql = geomToJdbc.buildPaginationSql(sql, 0, 1);
650 
651       getPreviousBerichtStatement = getConnection().prepareStatement(sql);
652     } else {
653       getPreviousBerichtStatement.clearParameters();
654     }
655     getPreviousBerichtStatement.setString(1, objectRef);
656     getPreviousBerichtStatement.setTimestamp(2, new java.sql.Timestamp(datum.getTime()));
657     getPreviousBerichtStatement.setLong(3, currentBerichtId);
658 
659     ResultSet rs = getPreviousBerichtStatement.executeQuery();
660     List<Bericht> list = h.handle(rs);
661     rs.close();
662 
663     if (!list.isEmpty()) {
664       loadLog.append("Vorig bericht gevonden:\n");
665       for (Bericht b : list) {
666         if (bericht == null) {
667           loadLog.append("Meest recent bericht gevonden: ").append(b).append("\n");
668           bericht = b;
669         } else {
670           loadLog.append("Niet geschikt bericht: ").append(b).append("\n");
671         }
672       }
673     }
674 
675     if (bericht != null) {
676       // bericht nu wel vullen met alle kolommen
677       if (getOldBerichtStatementById == null) {
678         String sql = "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?";
679 
680         getOldBerichtStatementById = getConnection().prepareStatement(sql);
681       } else {
682         getOldBerichtStatementById.clearParameters();
683       }
684       getOldBerichtStatementById.setLong(1, bericht.getId());
685 
686       ResultSet rs2 = getOldBerichtStatementById.executeQuery();
687       List<Bericht> list2 = h.handle(rs2);
688       rs2.close();
689 
690       if (!list2.isEmpty()) {
691         bericht = list2.get(0);
692       }
693     }
694 
695     split.stop();
696     return bericht;
697   }
698 
699   /**
700    * Update (overschrijft) een bericht in job tabel. (object_ref, datum, volgordenummer, soort,
701    * opmerking, status, status_datum, br_xml, br_orgineel_xml, db_xml, xsl_version)
702    *
703    * @param b bij te werken bericht
704    * @throws SQLException if any
705    */
706   public void updateBericht(Bericht b) throws SQLException {
707     Split split = SimonManager.getStopwatch("b3p.staging.bericht.update").start();
708     String brXML = b.getBrXml();
709     // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
710     // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
711     if (geomToJdbc instanceof OracleJdbcConverter && brXML != null && brXML.length() == 8000) {
712       log.debug("BR XML is 8000 bytes, er wordt een spatie aangeplakt.");
713       brXML += " ";
714       log.debug("BR XML is nu " + brXML.length() + " bytes.");
715     }
716 
717     new QueryRunner(geomToJdbc.isPmdKnownBroken())
718         .update(
719             getConnection(),
720             "UPDATE "
721                 + BrmoFramework.BERICHT_TABLE
722                 + " set "
723                 + "object_ref = ?, "
724                 + "datum = ?, "
725                 + "volgordenummer = ?, "
726                 + "soort = ?, "
727                 + "opmerking = ?, "
728                 + "status = ?, "
729                 + "status_datum = ?, "
730                 + "br_xml = ?, "
731                 + "br_orgineel_xml = ?, "
732                 + "db_xml = ?, "
733                 + "xsl_version = ? "
734                 + "WHERE id = ?",
735             b.getObjectRef(),
736             new Timestamp(b.getDatum().getTime()),
737             b.getVolgordeNummer(),
738             b.getSoort(),
739             b.getOpmerking(),
740             b.getStatus().toString(),
741             new Timestamp(b.getStatusDatum().getTime()),
742             brXML,
743             b.getBrOrgineelXml(),
744             b.getDbXml(),
745             b.getXslVersion(),
746             b.getId());
747     split.stop();
748   }
749 
750   /**
751    * Update een bericht in job tabel met processing status. (opmerking, status, status_datum,
752    * db_xml)
753    *
754    * @param b updatebericht
755    * @throws SQLException als de update mislukt
756    */
757   public void updateBerichtProcessing(Bericht b) throws SQLException {
758     Split split = SimonManager.getStopwatch("b3p.staging.bericht.updateprocessing").start();
759 
760     new QueryRunner(geomToJdbc.isPmdKnownBroken())
761         .update(
762             getConnection(),
763             "UPDATE "
764                 + BrmoFramework.BERICHT_TABLE
765                 + " set "
766                 + "opmerking = ?, "
767                 + "status = ?, "
768                 + "status_datum = ?, "
769                 + "db_xml = ? "
770                 + " WHERE id = ?",
771             b.getOpmerking(),
772             b.getStatus().toString(),
773             new Timestamp(b.getStatusDatum().getTime()),
774             b.getDbXml(),
775             b.getId());
776     split.stop();
777   }
778 
779   public void deleteByLaadProcesId(Long id) throws SQLException {
780     new QueryRunner(geomToJdbc.isPmdKnownBroken())
781         .update(
782             getConnection(),
783             "DELETE FROM " + BrmoFramework.BERICHT_TABLE + " WHERE laadprocesid = ?",
784             id);
785     new QueryRunner(geomToJdbc.isPmdKnownBroken())
786         .update(
787             getConnection(), "DELETE FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE id = ?", id);
788   }
789 
790   public List<LaadProces> getLaadProcessen() throws SQLException {
791     List<LaadProces> list;
792     ResultSetHandler<List<LaadProces>> h =
793         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
794     list =
795         new QueryRunner(geomToJdbc.isPmdKnownBroken())
796             .query(getConnection(), "select * from " + BrmoFramework.LAADPROCES_TABEL, h);
797     return list;
798   }
799 
800   public List<Bericht> getBerichten() throws SQLException {
801     List<Bericht> berichten;
802     ResultSetHandler<List<Bericht>> h =
803         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
804     berichten =
805         new QueryRunner(geomToJdbc.isPmdKnownBroken())
806             .query(getConnection(), "select * from " + BrmoFramework.BERICHT_TABLE, h);
807     return berichten;
808   }
809 
810   /**
811    * Laadt het bestand uit de stream in de database.
812    *
813    * @param stream input
814    * @param type type registratie, bijv. {@value BrmoFramework#BR_BRK2}
815    * @param fileName naam van het bestand (ter identificatie)
816    * @param d bestandsdatum
817    * @param listener progress listener
818    * @throws Exception if any
819    * @deprecated gebruik de variant die een automatischProcesId als argument heeft
820    * @see #loadBr(InputStream, String, String, Date, ProgressUpdateListener, Long)
821    */
822   @Deprecated
823   public void loadBr(
824       InputStream stream, String type, String fileName, Date d, ProgressUpdateListener listener)
825       throws Exception {
826     this.loadBr(stream, type, fileName, d, listener, null);
827   }
828 
829   /**
830    * Laadt het bestand uit de stream in de database.
831    *
832    * @param stream input
833    * @param type type registratie, bijv. {@value BrmoFramework#BR_BRK2}
834    * @param fileName naam van het bestand (ter identificatie)
835    * @param d bestandsdatum
836    * @param automatischProces id van het automatisch proces
837    * @param listener progress listener
838    * @throws Exception if any
839    */
840   public void loadBr(
841       InputStream stream,
842       String type,
843       String fileName,
844       Date d,
845       ProgressUpdateListener listener,
846       Long automatischProces)
847       throws Exception {
848 
849     CountingInputStream cis = new CountingInputStream(stream);
850 
851     BrmoXMLReader brmoXMLReader;
852     if (type.equals(BrmoFramework.BR_BRK2)) {
853       brmoXMLReader = new Brk2SnapshotXMLReader(cis);
854     } else if (type.equals(BrmoFramework.BR_NHR)) {
855       brmoXMLReader = new NhrXMLReader(cis);
856     } else if (TopNLType.isTopNLType(type)) {
857       brmoXMLReader = new TopNLFileReader(fileName, type);
858     } else if (type.equals(BrmoFramework.BR_BRP)) {
859       brmoXMLReader = new BRPXMLReader(cis, d, this);
860     } else if (type.equals(BrmoFramework.BR_GBAV)) {
861       brmoXMLReader = new GbavXMLReader(cis);
862     } else if (type.equals(BrmoFramework.BR_WOZ)) {
863       brmoXMLReader = new WozXMLReader(cis, d, this);
864     } else {
865       throw new UnsupportedOperationException("Ongeldige basisregistratie: " + type);
866     }
867 
868     if (brmoXMLReader.getBestandsDatum() == null) {
869       throw new BrmoException("Header van bestand bevat geen datum, verkeerd formaat?");
870     }
871 
872     LaadProces lp = new LaadProces();
873     lp.setBestandNaam(fileName);
874     lp.setBestandDatum(brmoXMLReader.getBestandsDatum());
875     lp.setSoort(type);
876     lp.setGebied(brmoXMLReader.getGebied());
877     lp.setStatus(LaadProces.STATUS.STAGING_OK);
878     lp.setStatusDatum(new Date());
879     lp.setAutomatischProcesId(automatischProces);
880 
881     if (laadProcesExists(lp)) {
882       throw new BrmoDuplicaatLaadprocesException("Laadproces al gerund voor bestand " + fileName);
883     }
884     lp = writeLaadProces(lp);
885 
886     if (TopNLType.isTopNLType(type)) {
887       // van een TopNL GML bestand maken we alleen een LP, geen bericht,
888       // de datum halen we van het zip bestand
889       if (listener != null) {
890         listener.total(((TopNLFileReader) brmoXMLReader).getFileSize());
891         listener.progress(((TopNLFileReader) brmoXMLReader).getFileSize());
892       }
893     } else {
894       if (!brmoXMLReader.hasNext()) {
895         updateLaadProcesStatus(
896             lp,
897             LaadProces.STATUS.STAGING_OK,
898             "Leeg bestand, geen berichten gevonden in " + fileName);
899         throw new BrmoLeegBestandException("Leeg bestand, geen berichten gevonden in " + fileName);
900       }
901 
902       boolean isBerichtGeschreven = false;
903       int berichten = 0;
904       int foutBerichten = 0;
905       String lastErrorMessage = null;
906 
907       while (brmoXMLReader.hasNext()) {
908         Bericht b;
909         try {
910           b = brmoXMLReader.next();
911           b.setLaadProcesId(lp.getId());
912           b.setStatus(Bericht.STATUS.STAGING_OK);
913           b.setStatusDatum(new Date());
914           b.setSoort(type);
915 
916           if (StringUtils.isEmpty(b.getObjectRef())) {
917             // geen object_ref kunnen vaststellen; dan ook niet transformeren,
918             // bijvoorbeeld bij WOZ
919             b.setStatus(Bericht.STATUS.STAGING_NOK);
920             b.setOpmerking(
921                 "Er kon geen object_ref bepaald worden uit de natuurlijke sleutel van het bericht.");
922           }
923 
924           if (b.getDatum() == null) {
925             throw new BrmoException("Datum bericht is null");
926           }
927           log.debug(b);
928 
929           Bericht existingBericht = getExistingBericht(b);
930           // TODO BRK2 bepalen of dit nog nodig is voor BRK2
931           if (type.equals(BrmoFramework.BR_BRK2) && !isBerichtGeschreven) {
932             // haal alleen voor eerste
933             Brk2Bericht brk2Bericht = (Brk2Bericht) b;
934             lp.setBestandNaamHersteld(
935                 brk2Bericht.getRestoredFileName(lp.getBestandDatum(), b.getVolgordeNummer()));
936             updateLaadProcesBestandNaamHersteld(lp);
937           }
938 
939           if (existingBericht == null) {
940             writeBericht(b);
941             isBerichtGeschreven = true;
942           } else if (existingBericht.getStatus().equals(Bericht.STATUS.STAGING_OK)) {
943             // als bericht nog niet getransformeerd is, dan overschrijven.
944             b.setId(existingBericht.getId());
945             this.updateBericht(b);
946           }
947           if (listener != null) {
948             listener.progress(cis.getByteCount());
949           }
950           berichten++;
951         } catch (Exception e) {
952           lastErrorMessage =
953               String.format(
954                   "Laden bericht uit %s mislukt vanwege: %s", fileName, e.getLocalizedMessage());
955           log.error(lastErrorMessage);
956           log.trace(lastErrorMessage, e);
957           if (listener != null) {
958             listener.exception(e);
959           }
960           foutBerichten++;
961         }
962       }
963       if (listener != null) {
964         listener.total(berichten);
965       }
966       if (foutBerichten > 0) {
967         String opmerking =
968             "Laden van "
969                 + foutBerichten
970                 + " bericht(en) mislukt, laatste melding: "
971                 + lastErrorMessage
972                 + ", zie logs voor meer info.";
973         this.updateLaadProcesStatus(lp, LaadProces.STATUS.STAGING_NOK, opmerking);
974       } else if (!isBerichtGeschreven) {
975         String opmerking = "Dit bestand is waarschijnlijk al eerder geladen.";
976         this.updateLaadProcesStatus(lp, LaadProces.STATUS.STAGING_DUPLICAAT, opmerking);
977       }
978     }
979   }
980 
981   public void writeBericht(Bericht b) throws SQLException {
982     String brXML = b.getBrXml();
983     // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
984     // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
985     if (geomToJdbc instanceof OracleJdbcConverter && brXML != null && brXML.length() == 8000) {
986       log.debug("BR XML is 8000 bytes, er wordt een spatie aangeplakt.");
987       brXML += " ";
988       log.debug("BR XML is nu " + brXML.length() + " bytes.");
989     }
990     Object berId =
991         new QueryRunner(geomToJdbc.isPmdKnownBroken())
992             .insert(
993                 getConnection(),
994                 "INSERT INTO "
995                     + BrmoFramework.BERICHT_TABLE
996                     + " (laadprocesid, "
997                     + "object_ref, "
998                     + "datum, "
999                     + "volgordenummer, "
1000                     + "soort, "
1001                     + "opmerking, "
1002                     + "status, "
1003                     + "status_datum, "
1004                     + "br_xml, "
1005                     + "br_orgineel_xml, "
1006                     + "db_xml, "
1007                     + "xsl_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
1008                 new ScalarHandler<>(),
1009                 b.getLaadProcesId(),
1010                 b.getObjectRef(),
1011                 new Timestamp(b.getDatum().getTime()),
1012                 b.getVolgordeNummer(),
1013                 b.getSoort(),
1014                 b.getOpmerking(),
1015                 b.getStatus().toString(),
1016                 new Timestamp(b.getStatusDatum().getTime()),
1017                 brXML,
1018                 b.getBrOrgineelXml(),
1019                 b.getDbXml(),
1020                 b.getXslVersion());
1021     log.trace("opgeslagen bericht heeft (row) id: " + berId);
1022   }
1023 
1024   /** Bepaal aan de hand van bestandsnaam en bestandsdatum van het laadproces of dit al bestaat. */
1025   private boolean laadProcesExists(LaadProces lp) throws SQLException {
1026     Object o =
1027         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1028             .query(
1029                 getConnection(),
1030                 "select 1 from laadproces where bestand_naam = ? and" + " bestand_datum = ?",
1031                 new ScalarHandler<>(),
1032                 lp.getBestandNaam(),
1033                 new Timestamp(lp.getBestandDatum().getTime()));
1034 
1035     return o != null;
1036   }
1037 
1038   private LaadProces writeLaadProces(LaadProces lp) throws SQLException {
1039     log.trace("opslaan van laadproces: " + lp);
1040     if (lp == null) {
1041       return null;
1042     }
1043 
1044     final String sql =
1045         "INSERT INTO "
1046             + BrmoFramework.LAADPROCES_TABEL
1047             + "(bestand_naam, "
1048             + "bestand_datum, soort, gebied, opmerking, status, "
1049             + "status_datum, contact_email, automatisch_proces "
1050             // nieuwe kolommen in 2.0.0
1051             + ", afgifteid, afgiftereferentie, artikelnummer, beschikbaar_tot, bestandsreferentie, "
1052             + "contractafgiftenummer, contractnummer, klantafgiftenummer, bestand_naam_hersteld"
1053             + ") VALUES (?,?,?,?,?,?,?,?,?"
1054             + ",?,?,?,?,?,?,?,?,?"
1055             + ")";
1056 
1057     // Oracle geeft een ROWID terug als "generated key" en niet de PK-kolom "ID" die we willen
1058     // hebben, daarom zelf doen
1059     // zie ook: https://issues.apache.org/jira/browse/DBUTILS-54
1060     QueryRunner queryRunner = new QueryRunner(geomToJdbc.isPmdKnownBroken());
1061     try (
1062     // PG: Caused by: org.postgresql.util.PSQLException: ERROR: column "ID" does not exist
1063     // maar lowercase werkt ook met oracle
1064     PreparedStatement stmt = getConnection().prepareStatement(sql, new String[] {"id"})
1065     // door een bug in de PG driver geen kolom index gebruiken, zie
1066     // https://github.com/pgjdbc/pgjdbc/issues/1661
1067     // PreparedStatement stmt = getConnection().prepareStatement(sql, new int[]{1});
1068     ) {
1069       queryRunner.fillStatement(
1070           stmt,
1071           lp.getBestandNaam(),
1072           new Timestamp(lp.getBestandDatum().getTime()),
1073           lp.getSoort(),
1074           lp.getGebied(),
1075           lp.getOpmerking(),
1076           lp.getStatus().toString(),
1077           new Timestamp(lp.getStatusDatum().getTime()),
1078           lp.getContactEmail(),
1079           lp.getAutomatischProcesId(),
1080           // nieuwe kolommen in 2.0.0
1081           lp.getAfgifteid(),
1082           lp.getAfgiftereferentie(),
1083           lp.getArtikelnummer(),
1084           (lp.getBeschikbaar_tot() == null)
1085               ? null
1086               : new Timestamp(lp.getBeschikbaar_tot().getTime()),
1087           lp.getBestandsreferentie(),
1088           lp.getContractafgiftenummer(),
1089           lp.getContractnummer(),
1090           lp.getKlantafgiftenummer(),
1091           lp.getBestandNaamHersteld());
1092       stmt.executeUpdate();
1093       ResultSetHandler<Number> rsh = new ScalarHandler<>();
1094       Number lpId = rsh.handle(stmt.getGeneratedKeys());
1095       log.trace("opgeslagen laadproces heeft id: " + lpId);
1096       return this.getLaadProcesById(lpId.longValue());
1097     }
1098   }
1099 
1100   public void updateLaadProcesStatus(LaadProces lp, LaadProces.STATUS status, String opmerking)
1101       throws SQLException {
1102     log.trace("update van laadproces status: " + lp);
1103     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1104         .update(
1105             getConnection(),
1106             "update "
1107                 + BrmoFramework.LAADPROCES_TABEL
1108                 + " set status = ?, opmerking = ?, status_datum = ? where id = ?",
1109             status.toString(),
1110             opmerking,
1111             new Timestamp(new Date().getTime()),
1112             lp.getId());
1113   }
1114 
1115   /**
1116    * update laadproces (GDS2 afgifte) metadata.
1117    *
1118    * @param lpId laadproces id
1119    * @param klantafgiftenummer klantafgiftenummer
1120    * @param contractafgiftenummer contractafgiftenummer
1121    * @param artikelnummer artikelnummer
1122    * @param contractnummer contractnummer
1123    * @param afgifteid afgifteid
1124    * @param afgiftereferentie afgiftereferentie
1125    * @param bestandsreferentie bestandsreferentie
1126    * @param beschikbaar_tot beschikbaar_tot
1127    * @throws SQLException if any
1128    */
1129   public void updateLaadProcesMeta(
1130       Long lpId,
1131       BigInteger klantafgiftenummer,
1132       BigInteger contractafgiftenummer,
1133       String artikelnummer,
1134       String contractnummer,
1135       String afgifteid,
1136       String afgiftereferentie,
1137       String bestandsreferentie,
1138       Date beschikbaar_tot)
1139       throws SQLException {
1140     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1141         .update(
1142             getConnection(),
1143             "update "
1144                 + BrmoFramework.LAADPROCES_TABEL
1145                 + " set "
1146                 + "afgifteid = ?,"
1147                 + "afgiftereferentie = ?,"
1148                 + "artikelnummer = ?,"
1149                 + "beschikbaar_tot = ?,"
1150                 + "bestandsreferentie = ?,"
1151                 + "contractafgiftenummer = ?,"
1152                 + "contractnummer = ?,"
1153                 + "klantafgiftenummer = ?"
1154                 + "where id = ?",
1155             afgifteid,
1156             afgiftereferentie,
1157             artikelnummer,
1158             new Timestamp(beschikbaar_tot.getTime()),
1159             bestandsreferentie,
1160             contractafgiftenummer,
1161             contractnummer,
1162             klantafgiftenummer,
1163             lpId);
1164   }
1165 
1166   public void updateLaadProcesBestandNaamHersteld(LaadProces lp) throws SQLException {
1167     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1168         .update(
1169             getConnection(),
1170             "update "
1171                 + BrmoFramework.LAADPROCES_TABEL
1172                 + " set bestand_naam_hersteld = ? where id = ?",
1173             lp.getBestandNaamHersteld(),
1174             lp.getId());
1175   }
1176 
1177   public void emptyStagingDb() throws SQLException {
1178     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1179         .update(getConnection(), "DELETE FROM " + BrmoFramework.BERICHT_TABLE);
1180     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1181         .update(getConnection(), "DELETE FROM " + BrmoFramework.LAADPROCES_TABEL);
1182   }
1183 
1184   public List<Bericht> getBerichten(
1185       int page,
1186       int start,
1187       int limit,
1188       String sort,
1189       String dir,
1190       String filterSoort,
1191       String filterStatus)
1192       throws SQLException {
1193 
1194     List<String> params = new ArrayList<>();
1195     if (sort == null || sort.trim().isEmpty()) {
1196       sort = "id";
1197     }
1198     if (dir == null || dir.trim().isEmpty()) {
1199       sort = "asc";
1200     }
1201     String sql =
1202         "SELECT * FROM "
1203             + BrmoFramework.BERICHT_TABLE
1204             + buildFilterSql(page, sort, dir, filterSoort, filterStatus, params);
1205 
1206     sql = geomToJdbc.buildPaginationSql(sql, start, limit);
1207 
1208     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
1209         .query(
1210             getConnection(),
1211             sql,
1212             new BeanListHandler<>(Bericht.class, new StagingRowHandler()),
1213             params.toArray());
1214   }
1215 
1216   public long getCountBerichten(String filterSoort, String filterStatus) throws SQLException {
1217 
1218     List<String> params = new ArrayList<>();
1219     String filter = buildFilterSql(0, null, null, filterSoort, filterStatus, params);
1220 
1221     String sql;
1222     // gebruik estimate voor postgresql indien geen filter
1223     if (StringUtils.isBlank(filter) && geomToJdbc instanceof PostgisJdbcConverter) {
1224       sql =
1225           "SELECT reltuples::BIGINT AS estimate FROM pg_class WHERE relname= '"
1226               + BrmoFramework.BERICHT_TABLE
1227               + "'";
1228     } else {
1229       sql = "SELECT count(*) FROM " + BrmoFramework.BERICHT_TABLE + filter;
1230     }
1231 
1232     ResultSet rs = null;
1233     PreparedStatement pstmt = null;
1234     try {
1235       pstmt = getConnection().prepareStatement(sql);
1236       try {
1237         pstmt.setQueryTimeout(300); // seconds to wait
1238       } catch (Exception e) {
1239         log.warn("Driver does not support setQueryTimeout, please update driver.");
1240       }
1241       if (!params.isEmpty()) {
1242         for (int i = 0; i < params.size(); i++) {
1243           if (params.get(i) != null) {
1244             pstmt.setObject(i + 1, params.get(i));
1245           } else {
1246             pstmt.setNull(i + 1, Types.VARCHAR);
1247           }
1248         }
1249       }
1250       rs = pstmt.executeQuery();
1251       if (rs.next()) {
1252         Object o = rs.getObject(1);
1253         if (o instanceof BigDecimal) {
1254           return ((BigDecimal) o).longValue();
1255         } else if (o instanceof Integer) {
1256           return ((Integer) o).longValue();
1257         }
1258         return (Long) o;
1259       } else {
1260         return -1;
1261       }
1262     } catch (Exception e) {
1263       return -1;
1264     } finally {
1265       try {
1266         if (rs != null && !rs.isClosed()) {
1267           rs.close();
1268         }
1269       } catch (SQLException e) {
1270         // ignore
1271       }
1272       try {
1273         if (pstmt != null && !pstmt.isClosed()) {
1274           pstmt.close();
1275         }
1276       } catch (SQLException e) {
1277         // ignore
1278       }
1279     }
1280   }
1281 
1282   private String buildFilterSql(
1283       int page,
1284       String sort,
1285       String dir,
1286       String filterSoort,
1287       String filterStatus,
1288       List<String> params) {
1289 
1290     StringBuilder builder = new StringBuilder();
1291 
1292     List<String> conjunctions = new ArrayList<>();
1293 
1294     if (!StringUtils.isBlank(filterSoort)) {
1295       String[] soorten = filterSoort.split(",");
1296       params.addAll(Arrays.asList(soorten));
1297       conjunctions.add("soort in (" + StringUtils.repeat("?", ", ", soorten.length) + ")");
1298     }
1299 
1300     if (!StringUtils.isBlank(filterStatus)) {
1301       String[] statussen = filterStatus.split(",");
1302       params.addAll(Arrays.asList(statussen));
1303       conjunctions.add("status in (" + StringUtils.repeat("?", ", ", statussen.length) + ")");
1304     }
1305 
1306     // build where part
1307     if (!conjunctions.isEmpty()) {
1308       builder.append(" where ");
1309       builder.append(StringUtils.join(conjunctions.toArray(), " and "));
1310     }
1311 
1312     // build order by part
1313     if (sort != null && dir != null) {
1314       builder.append(" ORDER BY ");
1315       builder.append(sort);
1316       builder.append(" ");
1317       builder.append(dir);
1318     }
1319 
1320     return builder.toString();
1321   }
1322 
1323   public long getCountLaadProces(String filterSoort, String filterStatus) throws SQLException {
1324 
1325     List<String> params = new ArrayList<>();
1326     String sql =
1327         "SELECT count(*) FROM "
1328             + BrmoFramework.LAADPROCES_TABEL
1329             + buildFilterSql(0, null, null, filterSoort, filterStatus, params);
1330 
1331     Object o =
1332         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1333             .query(getConnection(), sql, new ScalarHandler<>(), params.toArray());
1334     if (o instanceof BigDecimal) {
1335       return ((BigDecimal) o).longValue();
1336     } else if (o instanceof Integer) {
1337       return ((Integer) o).longValue();
1338     }
1339     return (Long) o;
1340   }
1341 
1342   public List<LaadProces> getLaadprocessen(
1343       int page,
1344       int start,
1345       int limit,
1346       String sort,
1347       String dir,
1348       String filterSoort,
1349       String filterStatus)
1350       throws SQLException {
1351 
1352     List<String> params = new ArrayList<>();
1353     if (sort == null || sort.trim().isEmpty()) {
1354       sort = "id";
1355     }
1356     if (dir == null || dir.trim().isEmpty()) {
1357       sort = "asc";
1358     }
1359     String sql =
1360         "SELECT * FROM "
1361             + BrmoFramework.LAADPROCES_TABEL
1362             + buildFilterSql(page, sort, dir, filterSoort, filterStatus, params);
1363 
1364     sql = geomToJdbc.buildPaginationSql(sql, start, limit);
1365 
1366     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
1367         .query(
1368             getConnection(),
1369             sql,
1370             new BeanListHandler<>(LaadProces.class, new StagingRowHandler()),
1371             params.toArray());
1372   }
1373 
1374   public Long[] getLaadProcessenIds(
1375       String sort, String dir, String filterSoort, String filterStatus) throws SQLException {
1376     List<String> params = new ArrayList<>();
1377     if (sort == null || sort.trim().isEmpty()) {
1378       sort = "id";
1379     }
1380     if (dir == null || dir.trim().isEmpty()) {
1381       dir = "asc";
1382     }
1383     String sql =
1384         "SELECT ID FROM "
1385             + BrmoFramework.LAADPROCES_TABEL
1386             + buildFilterSql(-1, sort, dir, filterSoort, filterStatus, params);
1387     List<Long> ids =
1388         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1389             .query(getConnection(), sql, new LongColumnListHandler("id"), params.toArray());
1390     return ids.toArray(new Long[ids.size()]);
1391   }
1392 
1393   /**
1394    * @param batchCapacity the batchCapacity to set
1395    */
1396   public void setBatchCapacity(Integer batchCapacity) {
1397     this.batchCapacity = batchCapacity;
1398   }
1399 
1400   public void setLimitStandBerichtenToTransform(Integer limitStandBerichtenToTransform) {
1401     this.limitStandBerichtenToTransform = limitStandBerichtenToTransform;
1402   }
1403 }