View Javadoc
1   package nl.b3p.brmo.loader;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.math.BigDecimal;
6   import java.math.BigInteger;
7   import java.sql.*;
8   import java.text.SimpleDateFormat;
9   import java.util.ArrayList;
10  import java.util.Arrays;
11  import java.util.Date;
12  import java.util.List;
13  import javax.sql.DataSource;
14  import javax.xml.transform.TransformerException;
15  import nl.b3p.brmo.loader.entity.Bericht;
16  import nl.b3p.brmo.loader.entity.BerichtenSorter;
17  import nl.b3p.brmo.loader.entity.Brk2Bericht;
18  import nl.b3p.brmo.loader.entity.LaadProces;
19  import nl.b3p.brmo.loader.pipeline.BerichtTypeOfWork;
20  import nl.b3p.brmo.loader.pipeline.BerichtWorkUnit;
21  import nl.b3p.brmo.loader.pipeline.ProcessDbXmlPipeline;
22  import nl.b3p.brmo.loader.util.BrmoDuplicaatLaadprocesException;
23  import nl.b3p.brmo.loader.util.BrmoException;
24  import nl.b3p.brmo.loader.util.BrmoLeegBestandException;
25  import nl.b3p.brmo.loader.util.RsgbTransformer;
26  import nl.b3p.brmo.loader.util.StagingRowHandler;
27  import nl.b3p.brmo.loader.util.TableData;
28  import nl.b3p.brmo.loader.xml.BRPXMLReader;
29  import nl.b3p.brmo.loader.xml.Brk2SnapshotXMLReader;
30  import nl.b3p.brmo.loader.xml.BrmoXMLReader;
31  import nl.b3p.brmo.loader.xml.GbavXMLReader;
32  import nl.b3p.brmo.loader.xml.NhrXMLReader;
33  import nl.b3p.brmo.loader.xml.WozXMLReader;
34  import nl.b3p.jdbc.util.converter.GeometryJdbcConverter;
35  import nl.b3p.jdbc.util.converter.GeometryJdbcConverterFactory;
36  import nl.b3p.jdbc.util.converter.OracleJdbcConverter;
37  import nl.b3p.jdbc.util.converter.PostgisJdbcConverter;
38  import nl.b3p.jdbc.util.dbutils.LongColumnListHandler;
39  import org.apache.commons.dbutils.DbUtils;
40  import org.apache.commons.dbutils.QueryRunner;
41  import org.apache.commons.dbutils.ResultSetHandler;
42  import org.apache.commons.dbutils.RowProcessor;
43  import org.apache.commons.dbutils.handlers.BeanListHandler;
44  import org.apache.commons.dbutils.handlers.ScalarHandler;
45  import org.apache.commons.io.input.CountingInputStream;
46  import org.apache.commons.lang3.StringUtils;
47  import org.apache.commons.lang3.mutable.MutableInt;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.javasimon.SimonManager;
51  import org.javasimon.Split;
52  import org.xml.sax.SAXException;
53  
54  /**
55   * @author Boy de Wit
56   */
57  public class StagingProxy {
58  
59    private static final Log log = LogFactory.getLog(StagingProxy.class);
60  
61    private Connection connStaging;
62    private DataSource dataSourceStaging;
63    private GeometryJdbcConverter geomToJdbc;
64    private Integer batchCapacity = 150;
65    private Integer limitStandBerichtenToTransform = -1;
66  
67    public StagingProxy(DataSource dataSourceStaging) throws SQLException, BrmoException {
68      this.dataSourceStaging = dataSourceStaging;
69      this.connStaging = getConnection();
70      geomToJdbc = GeometryJdbcConverterFactory.getGeometryJdbcConverter(connStaging);
71    }
72  
73    public void closeStagingProxy() {
74      if (getOldBerichtStatement != null) {
75        DbUtils.closeQuietly(getOldBerichtStatement);
76      }
77      DbUtils.closeQuietly(connStaging);
78    }
79  
80    private Connection getConnection() throws SQLException {
81      if (connStaging == null || connStaging.isClosed()) {
82        connStaging = dataSourceStaging.getConnection();
83      }
84      return connStaging;
85    }
86  
87    public LaadProces getLaadProcesById(Long id) throws SQLException {
88      List<LaadProces> processen;
89  
90      ResultSetHandler<List<LaadProces>> h =
91          new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
92  
93      processen =
94          new QueryRunner(geomToJdbc.isPmdKnownBroken())
95              .query(
96                  getConnection(),
97                  "SELECT * FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE id = ?",
98                  h,
99                  id);
100 
101     if (processen != null && processen.size() == 1) {
102       return processen.get(0);
103     }
104 
105     return null;
106   }
107 
108   public Bericht getBerichtById(Long id) throws SQLException {
109     List<Bericht> berichten;
110     ResultSetHandler<List<Bericht>> h =
111         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
112 
113     berichten =
114         new QueryRunner(geomToJdbc.isPmdKnownBroken())
115             .query(
116                 getConnection(),
117                 "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?",
118                 h,
119                 id);
120 
121     if (berichten != null && berichten.size() == 1) {
122       return berichten.get(0);
123     }
124 
125     return null;
126   }
127 
128   public List<Bericht> getBerichtByLaadProces(LaadProces lp) throws SQLException {
129     List<Bericht> berichten;
130     ResultSetHandler<List<Bericht>> h =
131         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
132 
133     berichten =
134         new QueryRunner(geomToJdbc.isPmdKnownBroken())
135             .query(
136                 getConnection(),
137                 "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE laadprocesid = ?",
138                 h,
139                 lp.getId());
140     return berichten;
141   }
142 
143   public LaadProces getLaadProcesByFileName(String name) throws SQLException {
144     List<LaadProces> processen;
145     ResultSetHandler<List<LaadProces>> h =
146         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
147 
148     processen =
149         new QueryRunner(geomToJdbc.isPmdKnownBroken())
150             .query(
151                 getConnection(),
152                 "SELECT * FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE bestand_naam = ?",
153                 h,
154                 name);
155 
156     if (processen != null && processen.size() == 1) {
157       return processen.get(0);
158     }
159 
160     return null;
161   }
162 
163   public LaadProces getLaadProcesByRestoredFilename(String name) throws SQLException {
164     List<LaadProces> processen;
165     ResultSetHandler<List<LaadProces>> h =
166         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
167 
168     processen =
169         new QueryRunner(geomToJdbc.isPmdKnownBroken())
170             .query(
171                 getConnection(),
172                 "SELECT * FROM "
173                     + BrmoFramework.LAADPROCES_TABEL
174                     + " WHERE bestand_naam_hersteld = ?",
175                 h,
176                 name);
177 
178     if (processen != null && processen.size() == 1) {
179       return processen.get(0);
180     }
181 
182     return null;
183   }
184 
185   /**
186    * bepaal of bericht bestaat aan de hand van laadprocesid, object_ref, datum en volgordenummer.
187    *
188    * @param b het bestaande bericht
189    * @return het bestaande bericht uit de database
190    * @throws SQLException if any
191    */
192   public Bericht getExistingBericht(Bericht b) throws SQLException {
193     return getBerichtByNaturalKey(b.getObjectRef(), b.getDatum().getTime(), b.getVolgordeNummer());
194   }
195 
196   private Bericht getBerichtByNaturalKey(String object_ref, long date, Integer volgnr)
197       throws SQLException {
198     List<Bericht> processen;
199     ResultSetHandler<List<Bericht>> h =
200         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
201 
202     processen =
203         new QueryRunner(geomToJdbc.isPmdKnownBroken())
204             .query(
205                 getConnection(),
206                 "SELECT * FROM "
207                     + BrmoFramework.BERICHT_TABLE
208                     + " WHERE object_ref = ?"
209                     + " AND datum = ? and volgordenummer = ?",
210                 h,
211                 object_ref,
212                 new Timestamp(date),
213                 volgnr);
214 
215     if (processen != null && processen.size() > 0) {
216       return processen.get(0);
217     }
218 
219     return null;
220   }
221 
222   public long getCountJob() throws SQLException {
223     Object o =
224         new QueryRunner(geomToJdbc.isPmdKnownBroken())
225             .query(
226                 getConnection(),
227                 "select count(*) from " + BrmoFramework.JOB_TABLE,
228                 new ScalarHandler<>());
229     if (o instanceof BigDecimal) {
230       return ((BigDecimal) o).longValue();
231     } else if (o instanceof Integer) {
232       return ((Integer) o).longValue();
233     }
234     return (Long) o;
235   }
236 
237   public boolean removeJob() throws SQLException {
238     int count =
239         new QueryRunner(geomToJdbc.isPmdKnownBroken())
240             .update(getConnection(), "truncate table " + BrmoFramework.JOB_TABLE);
241     return count > 0;
242   }
243 
244   public boolean cleanJob() throws SQLException {
245     int count;
246     if (geomToJdbc instanceof OracleJdbcConverter) {
247       count =
248           new QueryRunner(geomToJdbc.isPmdKnownBroken())
249               .update(
250                   getConnection(),
251                   "delete from "
252                       + BrmoFramework.JOB_TABLE
253                       + " j where j.id in (select b.id from "
254                       + BrmoFramework.BERICHT_TABLE
255                       + " b "
256                       // vanwege rare fout in oracle bij ophalen tabel
257                       // metadata werkt dit niet
258                       // https://issues.apache.org/jira/browse/DBUTILS-125
259                       + " where b.id = j.id and status != 'STAGING_OK')");
260 
261     } else {
262       count =
263           new QueryRunner(geomToJdbc.isPmdKnownBroken())
264               .update(
265                   getConnection(),
266                   "delete from "
267                       + BrmoFramework.JOB_TABLE
268                       + " j where j.id in (select b.id from "
269                       + BrmoFramework.BERICHT_TABLE
270                       + " b "
271                       + " where b.id = j.id and status != ?)",
272                   Bericht.STATUS.STAGING_OK.toString());
273     }
274     return count > 0;
275   }
276 
277   public long setBerichtenJobByStatus(Bericht.STATUS status, boolean orderBerichten)
278       throws SQLException {
279     StringBuilder q =
280         new StringBuilder(
281             "insert into "
282                 + BrmoFramework.JOB_TABLE
283                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
284                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
285                 + BrmoFramework.BERICHT_TABLE
286                 + " where status = ? and datum <= ? ");
287     if (orderBerichten) {
288       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
289     } else {
290       q.append(" and volgordenummer < 0 ");
291       if (this.limitStandBerichtenToTransform > 0) {
292         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
293       }
294     }
295     log.debug("pre-transformatie SQL: " + q);
296     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
297         .update(
298             getConnection(),
299             q.toString(),
300             status.toString(),
301             new java.sql.Timestamp((new Date()).getTime()));
302   }
303 
304   public long setBerichtenJobForUpdate(String soort, boolean orderBerichten) throws SQLException {
305     StringBuilder q =
306         new StringBuilder(
307             "insert into "
308                 + BrmoFramework.JOB_TABLE
309                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
310                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
311                 + BrmoFramework.BERICHT_TABLE
312                 + " where status = ? and soort = ? and datum <= ? ");
313     if (orderBerichten) {
314       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
315     } else {
316       q.append(" and volgordenummer < 0 ");
317       if (this.limitStandBerichtenToTransform > 0) {
318         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
319       }
320     }
321     log.debug("pre-transformatie SQL: " + q);
322     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
323         .update(
324             getConnection(),
325             q.toString(),
326             Bericht.STATUS.RSGB_OK.toString(),
327             soort,
328             new java.sql.Timestamp((new Date()).getTime()));
329   }
330 
331   public long setBerichtenJobByIds(long[] ids, boolean orderBerichten) throws SQLException {
332     StringBuilder q =
333         new StringBuilder(
334             "insert into "
335                 + BrmoFramework.JOB_TABLE
336                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
337                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
338                 + BrmoFramework.BERICHT_TABLE
339                 + " where id in (");
340     for (int i = 0; i < ids.length; i++) {
341       if (i != 0) {
342         q.append(",");
343       }
344       q.append(ids[i]);
345     }
346     q.append(") and datum <= ? ");
347     if (orderBerichten) {
348       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
349     } else {
350       q.append(" and volgordenummer < 0 ");
351       if (this.limitStandBerichtenToTransform > 0) {
352         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
353       }
354     }
355     log.debug("pre-transformatie SQL: " + q);
356     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
357         .update(getConnection(), q.toString(), new java.sql.Timestamp((new Date()).getTime()));
358   }
359 
360   public long setBerichtenJobByLaadprocessen(long[] laadprocesIds, boolean orderBerichten)
361       throws SQLException {
362 
363     StringBuilder q =
364         new StringBuilder(
365             "insert into "
366                 + BrmoFramework.JOB_TABLE
367                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
368                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
369                 + BrmoFramework.BERICHT_TABLE
370                 + " where laadprocesid in (");
371     for (int i = 0; i < laadprocesIds.length; i++) {
372       if (i != 0) {
373         q.append(",");
374       }
375       q.append(laadprocesIds[i]);
376     }
377     q.append(") and status = ? and datum <= ? ");
378     if (orderBerichten) {
379       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
380     } else {
381       q.append(" and volgordenummer < 0 ");
382       if (this.limitStandBerichtenToTransform > 0) {
383         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
384       }
385     }
386     log.debug("pre-transformatie SQL: " + q);
387     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
388         .update(
389             getConnection(),
390             q.toString(),
391             Bericht.STATUS.STAGING_OK.toString(),
392             new java.sql.Timestamp((new Date()).getTime()));
393   }
394 
395   public void handleBerichtenByJob(
396       long total,
397       final BerichtenHandler handler,
398       final boolean enablePipeline,
399       int transformPipelineCapacity,
400       boolean orderBerichten)
401       throws Exception {
402     Split split = SimonManager.getStopwatch("b3p.rsgb.job").start();
403     final String dateTime = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
404     Split jobSplit = SimonManager.getStopwatch("b3p.rsgb.job." + dateTime).start();
405     ((RsgbProxy) handler).setSimonNamePrefix("b3p.rsgb.job." + dateTime + ".");
406     final RowProcessor processor = new StagingRowHandler();
407 
408     final ProcessDbXmlPipeline processDbXmlPipeline =
409         new ProcessDbXmlPipeline(
410             handler, transformPipelineCapacity, "b3p.rsgb.job." + dateTime + ".pipeline");
411     if (enablePipeline) {
412       processDbXmlPipeline.start();
413     }
414     long offset = 0L;
415     int batch = batchCapacity;
416     final MutableInt processed = new MutableInt(0);
417     final MutableInt lastJid = new MutableInt(0);
418     final boolean doOrderBerichten = orderBerichten;
419     boolean abort = false;
420     try {
421       do {
422         log.debug(
423             String.format(
424                 "Ophalen berichten batch Job tabel, offset %d, limit %d",
425                 lastJid.intValue(), batch));
426         String sql =
427             "select jid, id, datum, volgordenummer, object_ref, br_xml, soort from "
428                 + BrmoFramework.JOB_TABLE
429                 + " where jid > "
430                 + lastJid.intValue()
431                 + " order by jid ";
432         sql = geomToJdbc.buildPaginationSql(sql, 0, batch);
433         log.debug("SQL voor ophalen berichten batch: " + sql);
434 
435         processed.setValue(0);
436         final Split getBerichten =
437             SimonManager.getStopwatch("b3p.rsgb.job." + dateTime + ".staging.berichten.getbatch")
438                 .start();
439         Exception e =
440             new QueryRunner(geomToJdbc.isPmdKnownBroken())
441                 .query(
442                     getConnection(),
443                     sql,
444                     rs -> {
445                       getBerichten.stop();
446                       final Split processResultSet =
447                           SimonManager.getStopwatch(
448                                   "b3p.rsgb.job."
449                                       + dateTime
450                                       + ".staging.berichten.processresultset")
451                               .start();
452 
453                       while (rs.next()) {
454                         try {
455                           Bericht bericht = processor.toBean(rs, Bericht.class);
456                           if (bericht.getVolgordeNummer() >= 0 && !doOrderBerichten) {
457                             // mutaties hebben volgnr >=0 dan sortering
458                             // verplicht
459                             throw new Exception(
460                                 String.format(
461                                     "Het sorteren van berichten staat uit, terwijl bericht (id: %d) geen standbericht is (volgnummer >=0)",
462                                     bericht.getId()));
463                           }
464                           final BerichtWorkUnit workUnit = new BerichtWorkUnit(bericht);
465 
466                           if (enablePipeline) {
467                             List<TableData> tableData = handler.transformToTableData(bericht);
468                             if (tableData == null) {
469                               // Exception during transform
470                               // updateProcessingResult() is
471                               // aangeroepen, geen
472                               // updateResultPipeline
473                               continue;
474                             }
475                             workUnit.setTableData(tableData);
476                             workUnit.setTypeOfWork(BerichtTypeOfWork.PROCESS_DBXML);
477                             Split pipelinePut =
478                                 SimonManager.getStopwatch(
479                                         "b3p.rsgb.job." + dateTime + ".pipeline.processdbxml.put")
480                                     .start();
481                             processDbXmlPipeline.getQueue().put(workUnit);
482                             pipelinePut.stop();
483                           } else {
484                             Split berichtSplit =
485                                 SimonManager.getStopwatch("b3p.rsgb.job." + dateTime + ".bericht")
486                                     .start();
487                             handler.handle(bericht, null, true);
488                             berichtSplit.stop();
489                           }
490                           lastJid.setValue(rs.getInt("jid"));
491                         } catch (Exception e1) {
492                           return e1;
493                         }
494                         processed.increment();
495                       }
496                       processResultSet.stop();
497                       return null;
498                     });
499 
500         offset += processed.intValue();
501 
502         // If handler threw exception processing row, rethrow it
503         if (e != null) {
504           throw e;
505         }
506 
507       } while (processed.intValue() > 0 && (offset < total));
508       if (offset < total) {
509         log.warn(String.format("Minder berichten verwerkt (%d) dan verwacht (%d)!", offset, total));
510       }
511       // als succesvol beeindigd, dan verwijderen, anders herstart mogelijk maken
512       removeJob();
513     } catch (Exception t) {
514       abort = true;
515       throw t;
516     } finally {
517       if (enablePipeline) {
518         if (abort) {
519           // Let threads stop by themselves, don't join
520           processDbXmlPipeline.setAbortFlag();
521         } else {
522           processDbXmlPipeline.stopWhenQueueEmpty();
523           try {
524             processDbXmlPipeline.join();
525           } catch (InterruptedException e) {
526           }
527         }
528       }
529     }
530     jobSplit.stop();
531     split.stop();
532   }
533 
534   public void updateBerichtenDbXml(List<Bericht> berichten, RsgbTransformer transformer)
535       throws SAXException, IOException, TransformerException {
536     for (Bericht ber : berichten) {
537       Split split = SimonManager.getStopwatch("b3p.staging.bericht.dbxml.transform").start();
538       String dbxml = transformer.transformToDbXml(ber);
539       // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
540       // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
541       if (geomToJdbc instanceof OracleJdbcConverter && dbxml != null && dbxml.length() == 8000) {
542         log.debug("DB XML is 8000 bytes, er wordt een spatie aangeplakt.");
543         dbxml += " ";
544         log.debug("DB XML is nu " + dbxml.length() + " bytes.");
545       }
546 
547       ber.setDbXml(dbxml);
548       split.stop();
549     }
550   }
551 
552   private PreparedStatement getOldBerichtStatement;
553   private PreparedStatement getOldBerichtStatementById;
554 
555   public Bericht getOldBericht(Bericht nieuwBericht, StringBuilder loadLog) throws SQLException {
556     return getOldBericht(nieuwBericht.getObjectRef(), loadLog);
557   }
558 
559   public Bericht getOldBericht(String objectRef, StringBuilder loadLog) throws SQLException {
560     Split split = SimonManager.getStopwatch("b3p.staging.bericht.getold").start();
561 
562     Bericht bericht = null;
563     ResultSetHandler<List<Bericht>> h =
564         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
565 
566     if (getOldBerichtStatement == null) {
567       String sql =
568           "SELECT id, object_ref, datum, volgordenummer, soort, status, job_id, status_datum FROM "
569               + BrmoFramework.BERICHT_TABLE
570               + " WHERE"
571               + " object_ref = ?"
572               + " AND status in ('RSGB_OK', 'ARCHIVE')"
573               + " ORDER BY datum desc, volgordenummer desc";
574       sql = geomToJdbc.buildPaginationSql(sql, 0, 1);
575 
576       getOldBerichtStatement = getConnection().prepareStatement(sql);
577     } else {
578       getOldBerichtStatement.clearParameters();
579     }
580     getOldBerichtStatement.setString(1, objectRef);
581 
582     ResultSet rs = getOldBerichtStatement.executeQuery();
583     List<Bericht> list = h.handle(rs);
584     rs.close();
585 
586     if (!list.isEmpty()) {
587       loadLog.append("Vorig bericht gevonden:\n");
588       for (Bericht b : list) {
589         if ((Bericht.STATUS.RSGB_OK.equals(b.getStatus())
590                 || Bericht.STATUS.ARCHIVE.equals(b.getStatus()))
591             && bericht == null) {
592           loadLog.append("Meest recent bericht gevonden: ").append(b).append("\n");
593           bericht = b;
594         } else {
595           loadLog.append("Niet geschikt bericht: ").append(b).append("\n");
596         }
597       }
598     }
599 
600     if (bericht != null) {
601       // bericht nu wel vullen met alle kolommen
602       if (getOldBerichtStatementById == null) {
603         String sql = "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?";
604 
605         getOldBerichtStatementById = getConnection().prepareStatement(sql);
606       } else {
607         getOldBerichtStatementById.clearParameters();
608       }
609       getOldBerichtStatementById.setLong(1, bericht.getId());
610 
611       ResultSet rs2 = getOldBerichtStatementById.executeQuery();
612       List<Bericht> list2 = h.handle(rs2);
613       rs2.close();
614 
615       if (!list2.isEmpty()) {
616         bericht = list2.get(0);
617       }
618     }
619 
620     split.stop();
621     return bericht;
622   }
623 
624   private PreparedStatement getPreviousBerichtStatement;
625 
626   public Bericht getPreviousBericht(Bericht ber, StringBuilder loadLog) throws SQLException {
627     return getPreviousBericht(ber.getObjectRef(), ber.getDatum(), ber.getId(), loadLog);
628   }
629 
630   /** Gets the previous bericht (not the first). */
631   public Bericht getPreviousBericht(
632       String objectRef, Date datum, Long currentBerichtId, StringBuilder loadLog)
633       throws SQLException {
634     Split split = SimonManager.getStopwatch("b3p.staging.bericht.getprevious").start();
635 
636     Bericht bericht = null;
637     ResultSetHandler<List<Bericht>> h =
638         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
639 
640     if (getPreviousBerichtStatement == null) {
641       String sql =
642           "SELECT id, object_ref, datum, volgordenummer, soort, status, job_id, status_datum FROM "
643               + BrmoFramework.BERICHT_TABLE
644               + " WHERE"
645               + " object_ref = ? and datum <= ? and id <> ?"
646               + " ORDER BY datum asc, volgordenummer desc";
647       sql = geomToJdbc.buildPaginationSql(sql, 0, 1);
648 
649       getPreviousBerichtStatement = getConnection().prepareStatement(sql);
650     } else {
651       getPreviousBerichtStatement.clearParameters();
652     }
653     getPreviousBerichtStatement.setString(1, objectRef);
654     getPreviousBerichtStatement.setTimestamp(2, new java.sql.Timestamp(datum.getTime()));
655     getPreviousBerichtStatement.setLong(3, currentBerichtId);
656 
657     ResultSet rs = getPreviousBerichtStatement.executeQuery();
658     List<Bericht> list = h.handle(rs);
659     rs.close();
660 
661     if (!list.isEmpty()) {
662       loadLog.append("Vorig bericht gevonden:\n");
663       for (Bericht b : list) {
664         if (bericht == null) {
665           loadLog.append("Meest recent bericht gevonden: ").append(b).append("\n");
666           bericht = b;
667         } else {
668           loadLog.append("Niet geschikt bericht: ").append(b).append("\n");
669         }
670       }
671     }
672 
673     if (bericht != null) {
674       // bericht nu wel vullen met alle kolommen
675       if (getOldBerichtStatementById == null) {
676         String sql = "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?";
677 
678         getOldBerichtStatementById = getConnection().prepareStatement(sql);
679       } else {
680         getOldBerichtStatementById.clearParameters();
681       }
682       getOldBerichtStatementById.setLong(1, bericht.getId());
683 
684       ResultSet rs2 = getOldBerichtStatementById.executeQuery();
685       List<Bericht> list2 = h.handle(rs2);
686       rs2.close();
687 
688       if (!list2.isEmpty()) {
689         bericht = list2.get(0);
690       }
691     }
692 
693     split.stop();
694     return bericht;
695   }
696 
697   /**
698    * Update (overschrijft) een bericht in job tabel. (object_ref, datum, volgordenummer, soort,
699    * opmerking, status, status_datum, br_xml, br_orgineel_xml, db_xml, xsl_version)
700    *
701    * @param b bij te werken bericht
702    * @throws SQLException if any
703    */
704   public void updateBericht(Bericht b) throws SQLException {
705     Split split = SimonManager.getStopwatch("b3p.staging.bericht.update").start();
706     String brXML = b.getBrXml();
707     // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
708     // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
709     if (geomToJdbc instanceof OracleJdbcConverter && brXML != null && brXML.length() == 8000) {
710       log.debug("BR XML is 8000 bytes, er wordt een spatie aangeplakt.");
711       brXML += " ";
712       log.debug("BR XML is nu " + brXML.length() + " bytes.");
713     }
714 
715     new QueryRunner(geomToJdbc.isPmdKnownBroken())
716         .update(
717             getConnection(),
718             "UPDATE "
719                 + BrmoFramework.BERICHT_TABLE
720                 + " set "
721                 + "object_ref = ?, "
722                 + "datum = ?, "
723                 + "volgordenummer = ?, "
724                 + "soort = ?, "
725                 + "opmerking = ?, "
726                 + "status = ?, "
727                 + "status_datum = ?, "
728                 + "br_xml = ?, "
729                 + "br_orgineel_xml = ?, "
730                 + "db_xml = ?, "
731                 + "xsl_version = ? "
732                 + "WHERE id = ?",
733             b.getObjectRef(),
734             new Timestamp(b.getDatum().getTime()),
735             b.getVolgordeNummer(),
736             b.getSoort(),
737             b.getOpmerking(),
738             b.getStatus().toString(),
739             new Timestamp(b.getStatusDatum().getTime()),
740             brXML,
741             b.getBrOrgineelXml(),
742             b.getDbXml(),
743             b.getXslVersion(),
744             b.getId());
745     split.stop();
746   }
747 
748   /**
749    * Update een bericht in job tabel met processing status. (opmerking, status, status_datum,
750    * db_xml)
751    *
752    * @param b updatebericht
753    * @throws SQLException als de update mislukt
754    */
755   public void updateBerichtProcessing(Bericht b) throws SQLException {
756     Split split = SimonManager.getStopwatch("b3p.staging.bericht.updateprocessing").start();
757 
758     new QueryRunner(geomToJdbc.isPmdKnownBroken())
759         .update(
760             getConnection(),
761             "UPDATE "
762                 + BrmoFramework.BERICHT_TABLE
763                 + " set "
764                 + "opmerking = ?, "
765                 + "status = ?, "
766                 + "status_datum = ?, "
767                 + "db_xml = ? "
768                 + " WHERE id = ?",
769             b.getOpmerking(),
770             b.getStatus().toString(),
771             new Timestamp(b.getStatusDatum().getTime()),
772             b.getDbXml(),
773             b.getId());
774     split.stop();
775   }
776 
777   public void deleteByLaadProcesId(Long id) throws SQLException {
778     new QueryRunner(geomToJdbc.isPmdKnownBroken())
779         .update(
780             getConnection(),
781             "DELETE FROM " + BrmoFramework.BERICHT_TABLE + " WHERE laadprocesid = ?",
782             id);
783     new QueryRunner(geomToJdbc.isPmdKnownBroken())
784         .update(
785             getConnection(), "DELETE FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE id = ?", id);
786   }
787 
788   public List<LaadProces> getLaadProcessen() throws SQLException {
789     List<LaadProces> list;
790     ResultSetHandler<List<LaadProces>> h =
791         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
792     list =
793         new QueryRunner(geomToJdbc.isPmdKnownBroken())
794             .query(getConnection(), "select * from " + BrmoFramework.LAADPROCES_TABEL, h);
795     return list;
796   }
797 
798   public List<Bericht> getBerichten() throws SQLException {
799     List<Bericht> berichten;
800     ResultSetHandler<List<Bericht>> h =
801         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
802     berichten =
803         new QueryRunner(geomToJdbc.isPmdKnownBroken())
804             .query(getConnection(), "select * from " + BrmoFramework.BERICHT_TABLE, h);
805     return berichten;
806   }
807 
808   /**
809    * Laadt het bestand uit de stream in de database.
810    *
811    * @param stream input
812    * @param type type registratie, bijv. {@value BrmoFramework#BR_BRK2}
813    * @param fileName naam van het bestand (ter identificatie)
814    * @param d bestandsdatum
815    * @param listener progress listener
816    * @throws Exception if any
817    * @deprecated gebruik de variant die een automatischProcesId als argument heeft
818    * @see #loadBr(InputStream, String, String, Date, ProgressUpdateListener, Long)
819    */
820   @Deprecated
821   public void loadBr(
822       InputStream stream, String type, String fileName, Date d, ProgressUpdateListener listener)
823       throws Exception {
824     this.loadBr(stream, type, fileName, d, listener, null);
825   }
826 
827   /**
828    * Laadt het bestand uit de stream in de database.
829    *
830    * @param stream input
831    * @param type type registratie, bijv. {@value BrmoFramework#BR_BRK2}
832    * @param fileName naam van het bestand (ter identificatie)
833    * @param d bestandsdatum
834    * @param automatischProces id van het automatisch proces
835    * @param listener progress listener
836    * @throws Exception if any
837    */
838   public void loadBr(
839       InputStream stream,
840       String type,
841       String fileName,
842       Date d,
843       ProgressUpdateListener listener,
844       Long automatischProces)
845       throws Exception {
846 
847     CountingInputStream cis = new CountingInputStream(stream);
848 
849     BrmoXMLReader brmoXMLReader;
850     if (type.equals(BrmoFramework.BR_BRK2)) {
851       brmoXMLReader = new Brk2SnapshotXMLReader(cis);
852     } else if (type.equals(BrmoFramework.BR_NHR)) {
853       brmoXMLReader = new NhrXMLReader(cis);
854     } else if (type.equals(BrmoFramework.BR_BRP)) {
855       brmoXMLReader = new BRPXMLReader(cis, d, this);
856     } else if (type.equals(BrmoFramework.BR_GBAV)) {
857       brmoXMLReader = new GbavXMLReader(cis);
858     } else if (type.equals(BrmoFramework.BR_WOZ)) {
859       brmoXMLReader = new WozXMLReader(cis, d, this);
860     } else {
861       throw new UnsupportedOperationException("Ongeldige basisregistratie: " + type);
862     }
863 
864     if (brmoXMLReader.getBestandsDatum() == null) {
865       throw new BrmoException("Header van bestand bevat geen datum, verkeerd formaat?");
866     }
867 
868     LaadProces lp = new LaadProces();
869     lp.setBestandNaam(fileName);
870     lp.setBestandDatum(brmoXMLReader.getBestandsDatum());
871     lp.setSoort(type);
872     lp.setGebied(brmoXMLReader.getGebied());
873     lp.setStatus(LaadProces.STATUS.STAGING_OK);
874     lp.setStatusDatum(new Date());
875     lp.setAutomatischProcesId(automatischProces);
876 
877     if (laadProcesExists(lp)) {
878       throw new BrmoDuplicaatLaadprocesException("Laadproces al gerund voor bestand " + fileName);
879     }
880     lp = writeLaadProces(lp);
881 
882     if (!brmoXMLReader.hasNext()) {
883       updateLaadProcesStatus(
884           lp, LaadProces.STATUS.STAGING_OK, "Leeg bestand, geen berichten gevonden in " + fileName);
885       throw new BrmoLeegBestandException("Leeg bestand, geen berichten gevonden in " + fileName);
886     }
887 
888     boolean isBerichtGeschreven = false;
889     int berichten = 0;
890     int foutBerichten = 0;
891     String lastErrorMessage = null;
892 
893     while (brmoXMLReader.hasNext()) {
894       Bericht b;
895       try {
896         b = brmoXMLReader.next();
897         b.setLaadProcesId(lp.getId());
898         b.setStatus(Bericht.STATUS.STAGING_OK);
899         b.setStatusDatum(new Date());
900         b.setSoort(type);
901 
902         if (StringUtils.isEmpty(b.getObjectRef())) {
903           // geen object_ref kunnen vaststellen; dan ook niet transformeren,
904           // bijvoorbeeld bij WOZ
905           b.setStatus(Bericht.STATUS.STAGING_NOK);
906           b.setOpmerking(
907               "Er kon geen object_ref bepaald worden uit de natuurlijke sleutel van het bericht.");
908         }
909 
910         if (b.getDatum() == null) {
911           throw new BrmoException("Datum bericht is null");
912         }
913         log.debug(b);
914 
915         Bericht existingBericht = getExistingBericht(b);
916         // TODO BRK2 bepalen of dit nog nodig is voor BRK2
917         if (type.equals(BrmoFramework.BR_BRK2) && !isBerichtGeschreven) {
918           // haal alleen voor eerste
919           Brk2Bericht brk2Bericht = (Brk2Bericht) b;
920           lp.setBestandNaamHersteld(
921               brk2Bericht.getRestoredFileName(lp.getBestandDatum(), b.getVolgordeNummer()));
922           updateLaadProcesBestandNaamHersteld(lp);
923         }
924 
925         if (existingBericht == null) {
926           writeBericht(b);
927           isBerichtGeschreven = true;
928         } else if (existingBericht.getStatus().equals(Bericht.STATUS.STAGING_OK)) {
929           // als bericht nog niet getransformeerd is, dan overschrijven.
930           b.setId(existingBericht.getId());
931           this.updateBericht(b);
932         }
933         if (listener != null) {
934           listener.progress(cis.getByteCount());
935         }
936         berichten++;
937       } catch (Exception e) {
938         lastErrorMessage =
939             String.format(
940                 "Laden bericht uit %s mislukt vanwege: %s", fileName, e.getLocalizedMessage());
941         log.error(lastErrorMessage);
942         log.trace(lastErrorMessage, e);
943         if (listener != null) {
944           listener.exception(e);
945         }
946         foutBerichten++;
947       }
948     }
949     if (listener != null) {
950       listener.total(berichten);
951     }
952     if (foutBerichten > 0) {
953       String opmerking =
954           "Laden van "
955               + foutBerichten
956               + " bericht(en) mislukt, laatste melding: "
957               + lastErrorMessage
958               + ", zie logs voor meer info.";
959       this.updateLaadProcesStatus(lp, LaadProces.STATUS.STAGING_NOK, opmerking);
960     } else if (!isBerichtGeschreven) {
961       String opmerking = "Dit bestand is waarschijnlijk al eerder geladen.";
962       this.updateLaadProcesStatus(lp, LaadProces.STATUS.STAGING_DUPLICAAT, opmerking);
963     }
964   }
965 
966   public void writeBericht(Bericht b) throws SQLException {
967     String brXML = b.getBrXml();
968     // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
969     // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
970     if (geomToJdbc instanceof OracleJdbcConverter && brXML != null && brXML.length() == 8000) {
971       log.debug("BR XML is 8000 bytes, er wordt een spatie aangeplakt.");
972       brXML += " ";
973       log.debug("BR XML is nu " + brXML.length() + " bytes.");
974     }
975     Object berId =
976         new QueryRunner(geomToJdbc.isPmdKnownBroken())
977             .insert(
978                 getConnection(),
979                 "INSERT INTO "
980                     + BrmoFramework.BERICHT_TABLE
981                     + " (laadprocesid, "
982                     + "object_ref, "
983                     + "datum, "
984                     + "volgordenummer, "
985                     + "soort, "
986                     + "opmerking, "
987                     + "status, "
988                     + "status_datum, "
989                     + "br_xml, "
990                     + "br_orgineel_xml, "
991                     + "db_xml, "
992                     + "xsl_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
993                 new ScalarHandler<>(),
994                 b.getLaadProcesId(),
995                 b.getObjectRef(),
996                 new Timestamp(b.getDatum().getTime()),
997                 b.getVolgordeNummer(),
998                 b.getSoort(),
999                 b.getOpmerking(),
1000                 b.getStatus().toString(),
1001                 new Timestamp(b.getStatusDatum().getTime()),
1002                 brXML,
1003                 b.getBrOrgineelXml(),
1004                 b.getDbXml(),
1005                 b.getXslVersion());
1006     log.trace("opgeslagen bericht heeft (row) id: " + berId);
1007   }
1008 
1009   /** Bepaal aan de hand van bestandsnaam en bestandsdatum van het laadproces of dit al bestaat. */
1010   private boolean laadProcesExists(LaadProces lp) throws SQLException {
1011     Object o =
1012         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1013             .query(
1014                 getConnection(),
1015                 "select 1 from laadproces where bestand_naam = ? and" + " bestand_datum = ?",
1016                 new ScalarHandler<>(),
1017                 lp.getBestandNaam(),
1018                 new Timestamp(lp.getBestandDatum().getTime()));
1019 
1020     return o != null;
1021   }
1022 
1023   private LaadProces writeLaadProces(LaadProces lp) throws SQLException {
1024     log.trace("opslaan van laadproces: " + lp);
1025     if (lp == null) {
1026       return null;
1027     }
1028 
1029     final String sql =
1030         "INSERT INTO "
1031             + BrmoFramework.LAADPROCES_TABEL
1032             + "(bestand_naam, "
1033             + "bestand_datum, soort, gebied, opmerking, status, "
1034             + "status_datum, contact_email, automatisch_proces "
1035             // nieuwe kolommen in 2.0.0
1036             + ", afgifteid, afgiftereferentie, artikelnummer, beschikbaar_tot, bestandsreferentie, "
1037             + "contractafgiftenummer, contractnummer, klantafgiftenummer, bestand_naam_hersteld"
1038             + ") VALUES (?,?,?,?,?,?,?,?,?"
1039             + ",?,?,?,?,?,?,?,?,?"
1040             + ")";
1041 
1042     // Oracle geeft een ROWID terug als "generated key" en niet de PK-kolom "ID" die we willen
1043     // hebben, daarom zelf doen
1044     // zie ook: https://issues.apache.org/jira/browse/DBUTILS-54
1045     QueryRunner queryRunner = new QueryRunner(geomToJdbc.isPmdKnownBroken());
1046     try (
1047     // PG: Caused by: org.postgresql.util.PSQLException: ERROR: column "ID" does not exist
1048     // maar lowercase werkt ook met oracle
1049     PreparedStatement stmt = getConnection().prepareStatement(sql, new String[] {"id"})
1050     // door een bug in de PG driver geen kolom index gebruiken, zie
1051     // https://github.com/pgjdbc/pgjdbc/issues/1661
1052     // PreparedStatement stmt = getConnection().prepareStatement(sql, new int[]{1});
1053     ) {
1054       queryRunner.fillStatement(
1055           stmt,
1056           lp.getBestandNaam(),
1057           new Timestamp(lp.getBestandDatum().getTime()),
1058           lp.getSoort(),
1059           lp.getGebied(),
1060           lp.getOpmerking(),
1061           lp.getStatus().toString(),
1062           new Timestamp(lp.getStatusDatum().getTime()),
1063           lp.getContactEmail(),
1064           lp.getAutomatischProcesId(),
1065           // nieuwe kolommen in 2.0.0
1066           lp.getAfgifteid(),
1067           lp.getAfgiftereferentie(),
1068           lp.getArtikelnummer(),
1069           (lp.getBeschikbaar_tot() == null)
1070               ? null
1071               : new Timestamp(lp.getBeschikbaar_tot().getTime()),
1072           lp.getBestandsreferentie(),
1073           lp.getContractafgiftenummer(),
1074           lp.getContractnummer(),
1075           lp.getKlantafgiftenummer(),
1076           lp.getBestandNaamHersteld());
1077       stmt.executeUpdate();
1078       ResultSetHandler<Number> rsh = new ScalarHandler<>();
1079       Number lpId = rsh.handle(stmt.getGeneratedKeys());
1080       log.trace("opgeslagen laadproces heeft id: " + lpId);
1081       return this.getLaadProcesById(lpId.longValue());
1082     }
1083   }
1084 
1085   public void updateLaadProcesStatus(LaadProces lp, LaadProces.STATUS status, String opmerking)
1086       throws SQLException {
1087     log.trace("update van laadproces status: " + lp);
1088     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1089         .update(
1090             getConnection(),
1091             "update "
1092                 + BrmoFramework.LAADPROCES_TABEL
1093                 + " set status = ?, opmerking = ?, status_datum = ? where id = ?",
1094             status.toString(),
1095             opmerking,
1096             new Timestamp(new Date().getTime()),
1097             lp.getId());
1098   }
1099 
1100   /**
1101    * update laadproces (GDS2 afgifte) metadata.
1102    *
1103    * @param lpId laadproces id
1104    * @param klantafgiftenummer klantafgiftenummer
1105    * @param contractafgiftenummer contractafgiftenummer
1106    * @param artikelnummer artikelnummer
1107    * @param contractnummer contractnummer
1108    * @param afgifteid afgifteid
1109    * @param afgiftereferentie afgiftereferentie
1110    * @param bestandsreferentie bestandsreferentie
1111    * @param beschikbaar_tot beschikbaar_tot
1112    * @throws SQLException if any
1113    */
1114   public void updateLaadProcesMeta(
1115       Long lpId,
1116       BigInteger klantafgiftenummer,
1117       BigInteger contractafgiftenummer,
1118       String artikelnummer,
1119       String contractnummer,
1120       String afgifteid,
1121       String afgiftereferentie,
1122       String bestandsreferentie,
1123       Date beschikbaar_tot)
1124       throws SQLException {
1125     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1126         .update(
1127             getConnection(),
1128             "update "
1129                 + BrmoFramework.LAADPROCES_TABEL
1130                 + " set "
1131                 + "afgifteid = ?,"
1132                 + "afgiftereferentie = ?,"
1133                 + "artikelnummer = ?,"
1134                 + "beschikbaar_tot = ?,"
1135                 + "bestandsreferentie = ?,"
1136                 + "contractafgiftenummer = ?,"
1137                 + "contractnummer = ?,"
1138                 + "klantafgiftenummer = ?"
1139                 + "where id = ?",
1140             afgifteid,
1141             afgiftereferentie,
1142             artikelnummer,
1143             new Timestamp(beschikbaar_tot.getTime()),
1144             bestandsreferentie,
1145             contractafgiftenummer,
1146             contractnummer,
1147             klantafgiftenummer,
1148             lpId);
1149   }
1150 
1151   public void updateLaadProcesBestandNaamHersteld(LaadProces lp) throws SQLException {
1152     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1153         .update(
1154             getConnection(),
1155             "update "
1156                 + BrmoFramework.LAADPROCES_TABEL
1157                 + " set bestand_naam_hersteld = ? where id = ?",
1158             lp.getBestandNaamHersteld(),
1159             lp.getId());
1160   }
1161 
1162   public void emptyStagingDb() throws SQLException {
1163     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1164         .update(getConnection(), "DELETE FROM " + BrmoFramework.BERICHT_TABLE);
1165     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1166         .update(getConnection(), "DELETE FROM " + BrmoFramework.LAADPROCES_TABEL);
1167   }
1168 
1169   public List<Bericht> getBerichten(
1170       int page,
1171       int start,
1172       int limit,
1173       String sort,
1174       String dir,
1175       String filterSoort,
1176       String filterStatus)
1177       throws SQLException {
1178 
1179     List<String> params = new ArrayList<>();
1180     if (sort == null || sort.trim().isEmpty()) {
1181       sort = "id";
1182     }
1183     if (dir == null || dir.trim().isEmpty()) {
1184       sort = "asc";
1185     }
1186     String sql =
1187         "SELECT * FROM "
1188             + BrmoFramework.BERICHT_TABLE
1189             + buildFilterSql(page, sort, dir, filterSoort, filterStatus, params);
1190 
1191     sql = geomToJdbc.buildPaginationSql(sql, start, limit);
1192 
1193     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
1194         .query(
1195             getConnection(),
1196             sql,
1197             new BeanListHandler<>(Bericht.class, new StagingRowHandler()),
1198             params.toArray());
1199   }
1200 
1201   public long getCountBerichten(String filterSoort, String filterStatus) throws SQLException {
1202 
1203     List<String> params = new ArrayList<>();
1204     String filter = buildFilterSql(0, null, null, filterSoort, filterStatus, params);
1205 
1206     String sql;
1207     // gebruik estimate voor postgresql indien geen filter
1208     if (StringUtils.isBlank(filter) && geomToJdbc instanceof PostgisJdbcConverter) {
1209       sql =
1210           "SELECT reltuples::BIGINT AS estimate FROM pg_class WHERE relname= '"
1211               + BrmoFramework.BERICHT_TABLE
1212               + "'";
1213     } else {
1214       sql = "SELECT count(*) FROM " + BrmoFramework.BERICHT_TABLE + filter;
1215     }
1216 
1217     ResultSet rs = null;
1218     PreparedStatement pstmt = null;
1219     try {
1220       pstmt = getConnection().prepareStatement(sql);
1221       try {
1222         pstmt.setQueryTimeout(300); // seconds to wait
1223       } catch (Exception e) {
1224         log.warn("Driver does not support setQueryTimeout, please update driver.");
1225       }
1226       if (!params.isEmpty()) {
1227         for (int i = 0; i < params.size(); i++) {
1228           if (params.get(i) != null) {
1229             pstmt.setObject(i + 1, params.get(i));
1230           } else {
1231             pstmt.setNull(i + 1, Types.VARCHAR);
1232           }
1233         }
1234       }
1235       rs = pstmt.executeQuery();
1236       if (rs.next()) {
1237         Object o = rs.getObject(1);
1238         if (o instanceof BigDecimal) {
1239           return ((BigDecimal) o).longValue();
1240         } else if (o instanceof Integer) {
1241           return ((Integer) o).longValue();
1242         }
1243         return (Long) o;
1244       } else {
1245         return -1;
1246       }
1247     } catch (Exception e) {
1248       return -1;
1249     } finally {
1250       try {
1251         if (rs != null && !rs.isClosed()) {
1252           rs.close();
1253         }
1254       } catch (SQLException e) {
1255         // ignore
1256       }
1257       try {
1258         if (pstmt != null && !pstmt.isClosed()) {
1259           pstmt.close();
1260         }
1261       } catch (SQLException e) {
1262         // ignore
1263       }
1264     }
1265   }
1266 
1267   private String buildFilterSql(
1268       int page,
1269       String sort,
1270       String dir,
1271       String filterSoort,
1272       String filterStatus,
1273       List<String> params) {
1274 
1275     StringBuilder builder = new StringBuilder();
1276 
1277     List<String> conjunctions = new ArrayList<>();
1278 
1279     if (!StringUtils.isBlank(filterSoort)) {
1280       String[] soorten = filterSoort.split(",");
1281       params.addAll(Arrays.asList(soorten));
1282       conjunctions.add("soort in (" + StringUtils.repeat("?", ", ", soorten.length) + ")");
1283     }
1284 
1285     if (!StringUtils.isBlank(filterStatus)) {
1286       String[] statussen = filterStatus.split(",");
1287       params.addAll(Arrays.asList(statussen));
1288       conjunctions.add("status in (" + StringUtils.repeat("?", ", ", statussen.length) + ")");
1289     }
1290 
1291     // build where part
1292     if (!conjunctions.isEmpty()) {
1293       builder.append(" where ");
1294       builder.append(StringUtils.join(conjunctions.toArray(), " and "));
1295     }
1296 
1297     // build order by part
1298     if (sort != null && dir != null) {
1299       builder.append(" ORDER BY ");
1300       builder.append(sort);
1301       builder.append(" ");
1302       builder.append(dir);
1303     }
1304 
1305     return builder.toString();
1306   }
1307 
1308   public long getCountLaadProces(String filterSoort, String filterStatus) throws SQLException {
1309 
1310     List<String> params = new ArrayList<>();
1311     String sql =
1312         "SELECT count(*) FROM "
1313             + BrmoFramework.LAADPROCES_TABEL
1314             + buildFilterSql(0, null, null, filterSoort, filterStatus, params);
1315 
1316     Object o =
1317         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1318             .query(getConnection(), sql, new ScalarHandler<>(), params.toArray());
1319     if (o instanceof BigDecimal) {
1320       return ((BigDecimal) o).longValue();
1321     } else if (o instanceof Integer) {
1322       return ((Integer) o).longValue();
1323     }
1324     return (Long) o;
1325   }
1326 
1327   public List<LaadProces> getLaadprocessen(
1328       int page,
1329       int start,
1330       int limit,
1331       String sort,
1332       String dir,
1333       String filterSoort,
1334       String filterStatus)
1335       throws SQLException {
1336 
1337     List<String> params = new ArrayList<>();
1338     if (sort == null || sort.trim().isEmpty()) {
1339       sort = "id";
1340     }
1341     if (dir == null || dir.trim().isEmpty()) {
1342       sort = "asc";
1343     }
1344     String sql =
1345         "SELECT * FROM "
1346             + BrmoFramework.LAADPROCES_TABEL
1347             + buildFilterSql(page, sort, dir, filterSoort, filterStatus, params);
1348 
1349     sql = geomToJdbc.buildPaginationSql(sql, start, limit);
1350 
1351     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
1352         .query(
1353             getConnection(),
1354             sql,
1355             new BeanListHandler<>(LaadProces.class, new StagingRowHandler()),
1356             params.toArray());
1357   }
1358 
1359   public Long[] getLaadProcessenIds(
1360       String sort, String dir, String filterSoort, String filterStatus) throws SQLException {
1361     List<String> params = new ArrayList<>();
1362     if (sort == null || sort.trim().isEmpty()) {
1363       sort = "id";
1364     }
1365     if (dir == null || dir.trim().isEmpty()) {
1366       dir = "asc";
1367     }
1368     String sql =
1369         "SELECT ID FROM "
1370             + BrmoFramework.LAADPROCES_TABEL
1371             + buildFilterSql(-1, sort, dir, filterSoort, filterStatus, params);
1372     List<Long> ids =
1373         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1374             .query(getConnection(), sql, new LongColumnListHandler("id"), params.toArray());
1375     return ids.toArray(new Long[ids.size()]);
1376   }
1377 
1378   /**
1379    * @param batchCapacity the batchCapacity to set
1380    */
1381   public void setBatchCapacity(Integer batchCapacity) {
1382     this.batchCapacity = batchCapacity;
1383   }
1384 
1385   public void setLimitStandBerichtenToTransform(Integer limitStandBerichtenToTransform) {
1386     this.limitStandBerichtenToTransform = limitStandBerichtenToTransform;
1387   }
1388 }