View Javadoc
1   package nl.b3p.brmo.loader;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.math.BigDecimal;
6   import java.math.BigInteger;
7   import java.sql.*;
8   import java.text.SimpleDateFormat;
9   import java.util.ArrayList;
10  import java.util.Arrays;
11  import java.util.Date;
12  import java.util.List;
13  import javax.sql.DataSource;
14  import javax.xml.transform.TransformerException;
15  import nl.b3p.brmo.loader.entity.Bericht;
16  import nl.b3p.brmo.loader.entity.BerichtenSorter;
17  import nl.b3p.brmo.loader.entity.Brk2Bericht;
18  import nl.b3p.brmo.loader.entity.BrkBericht;
19  import nl.b3p.brmo.loader.entity.LaadProces;
20  import nl.b3p.brmo.loader.pipeline.BerichtTypeOfWork;
21  import nl.b3p.brmo.loader.pipeline.BerichtWorkUnit;
22  import nl.b3p.brmo.loader.pipeline.ProcessDbXmlPipeline;
23  import nl.b3p.brmo.loader.util.BrmoDuplicaatLaadprocesException;
24  import nl.b3p.brmo.loader.util.BrmoException;
25  import nl.b3p.brmo.loader.util.BrmoLeegBestandException;
26  import nl.b3p.brmo.loader.util.RsgbTransformer;
27  import nl.b3p.brmo.loader.util.StagingRowHandler;
28  import nl.b3p.brmo.loader.util.TableData;
29  import nl.b3p.brmo.loader.xml.BRPXMLReader;
30  import nl.b3p.brmo.loader.xml.Brk2SnapshotXMLReader;
31  import nl.b3p.brmo.loader.xml.BrkSnapshotXMLReader;
32  import nl.b3p.brmo.loader.xml.BrmoXMLReader;
33  import nl.b3p.brmo.loader.xml.GbavXMLReader;
34  import nl.b3p.brmo.loader.xml.NhrXMLReader;
35  import nl.b3p.brmo.loader.xml.TopNLFileReader;
36  import nl.b3p.brmo.loader.xml.WozXMLReader;
37  import nl.b3p.jdbc.util.converter.GeometryJdbcConverter;
38  import nl.b3p.jdbc.util.converter.GeometryJdbcConverterFactory;
39  import nl.b3p.jdbc.util.converter.OracleJdbcConverter;
40  import nl.b3p.jdbc.util.converter.PostgisJdbcConverter;
41  import nl.b3p.jdbc.util.dbutils.LongColumnListHandler;
42  import nl.b3p.topnl.TopNLType;
43  import org.apache.commons.dbutils.DbUtils;
44  import org.apache.commons.dbutils.QueryRunner;
45  import org.apache.commons.dbutils.ResultSetHandler;
46  import org.apache.commons.dbutils.RowProcessor;
47  import org.apache.commons.dbutils.handlers.BeanListHandler;
48  import org.apache.commons.dbutils.handlers.ScalarHandler;
49  import org.apache.commons.io.input.CountingInputStream;
50  import org.apache.commons.lang3.StringUtils;
51  import org.apache.commons.lang3.mutable.MutableInt;
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.javasimon.SimonManager;
55  import org.javasimon.Split;
56  import org.xml.sax.SAXException;
57  
58  /**
59   * @author Boy de Wit
60   */
61  public class StagingProxy {
62  
63    private static final Log log = LogFactory.getLog(StagingProxy.class);
64  
65    private Connection connStaging;
66    private DataSource dataSourceStaging;
67    private GeometryJdbcConverter geomToJdbc;
68    private Integer batchCapacity = 150;
69    private Integer limitStandBerichtenToTransform = -1;
70  
71    public StagingProxy(DataSource dataSourceStaging) throws SQLException, BrmoException {
72      this.dataSourceStaging = dataSourceStaging;
73      this.connStaging = getConnection();
74      geomToJdbc = GeometryJdbcConverterFactory.getGeometryJdbcConverter(connStaging);
75    }
76  
77    public void closeStagingProxy() {
78      if (getOldBerichtStatement != null) {
79        DbUtils.closeQuietly(getOldBerichtStatement);
80      }
81      DbUtils.closeQuietly(connStaging);
82    }
83  
84    private Connection getConnection() throws SQLException {
85      if (connStaging == null || connStaging.isClosed()) {
86        connStaging = dataSourceStaging.getConnection();
87      }
88      return connStaging;
89    }
90  
91    public LaadProces getLaadProcesById(Long id) throws SQLException {
92      List<LaadProces> processen;
93  
94      ResultSetHandler<List<LaadProces>> h =
95          new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
96  
97      processen =
98          new QueryRunner(geomToJdbc.isPmdKnownBroken())
99              .query(
100                 getConnection(),
101                 "SELECT * FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE id = ?",
102                 h,
103                 id);
104 
105     if (processen != null && processen.size() == 1) {
106       return processen.get(0);
107     }
108 
109     return null;
110   }
111 
112   public Bericht getBerichtById(Long id) throws SQLException {
113     List<Bericht> berichten;
114     ResultSetHandler<List<Bericht>> h =
115         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
116 
117     berichten =
118         new QueryRunner(geomToJdbc.isPmdKnownBroken())
119             .query(
120                 getConnection(),
121                 "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?",
122                 h,
123                 id);
124 
125     if (berichten != null && berichten.size() == 1) {
126       return berichten.get(0);
127     }
128 
129     return null;
130   }
131 
132   public List<Bericht> getBerichtByLaadProces(LaadProces lp) throws SQLException {
133     List<Bericht> berichten;
134     ResultSetHandler<List<Bericht>> h =
135         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
136 
137     berichten =
138         new QueryRunner(geomToJdbc.isPmdKnownBroken())
139             .query(
140                 getConnection(),
141                 "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE laadprocesid = ?",
142                 h,
143                 lp.getId());
144     return berichten;
145   }
146 
147   public LaadProces getLaadProcesByFileName(String name) throws SQLException {
148     List<LaadProces> processen;
149     ResultSetHandler<List<LaadProces>> h =
150         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
151 
152     processen =
153         new QueryRunner(geomToJdbc.isPmdKnownBroken())
154             .query(
155                 getConnection(),
156                 "SELECT * FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE bestand_naam = ?",
157                 h,
158                 name);
159 
160     if (processen != null && processen.size() == 1) {
161       return processen.get(0);
162     }
163 
164     return null;
165   }
166 
167   public LaadProces getLaadProcesByRestoredFilename(String name) throws SQLException {
168     List<LaadProces> processen;
169     ResultSetHandler<List<LaadProces>> h =
170         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
171 
172     processen =
173         new QueryRunner(geomToJdbc.isPmdKnownBroken())
174             .query(
175                 getConnection(),
176                 "SELECT * FROM "
177                     + BrmoFramework.LAADPROCES_TABEL
178                     + " WHERE bestand_naam_hersteld = ?",
179                 h,
180                 name);
181 
182     if (processen != null && processen.size() == 1) {
183       return processen.get(0);
184     }
185 
186     return null;
187   }
188 
189   /**
190    * bepaal of bericht bestaat aan de hand van laadprocesid, object_ref, datum en volgordenummer.
191    *
192    * @param b het bestaande bericht
193    * @return het bestaande bericht uit de database
194    * @throws SQLException if any
195    */
196   public Bericht getExistingBericht(Bericht b) throws SQLException {
197     return getBerichtByNaturalKey(b.getObjectRef(), b.getDatum().getTime(), b.getVolgordeNummer());
198   }
199 
200   private Bericht getBerichtByNaturalKey(String object_ref, long date, Integer volgnr)
201       throws SQLException {
202     List<Bericht> processen;
203     ResultSetHandler<List<Bericht>> h =
204         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
205 
206     processen =
207         new QueryRunner(geomToJdbc.isPmdKnownBroken())
208             .query(
209                 getConnection(),
210                 "SELECT * FROM "
211                     + BrmoFramework.BERICHT_TABLE
212                     + " WHERE object_ref = ?"
213                     + " AND datum = ? and volgordenummer = ?",
214                 h,
215                 object_ref,
216                 new Timestamp(date),
217                 volgnr);
218 
219     if (processen != null && processen.size() > 0) {
220       return processen.get(0);
221     }
222 
223     return null;
224   }
225 
226   public long getCountJob() throws SQLException {
227     Object o =
228         new QueryRunner(geomToJdbc.isPmdKnownBroken())
229             .query(
230                 getConnection(),
231                 "select count(*) from " + BrmoFramework.JOB_TABLE,
232                 new ScalarHandler<>());
233     if (o instanceof BigDecimal) {
234       return ((BigDecimal) o).longValue();
235     } else if (o instanceof Integer) {
236       return ((Integer) o).longValue();
237     }
238     return (Long) o;
239   }
240 
241   public boolean removeJob() throws SQLException {
242     int count =
243         new QueryRunner(geomToJdbc.isPmdKnownBroken())
244             .update(getConnection(), "truncate table " + BrmoFramework.JOB_TABLE);
245     return count > 0;
246   }
247 
248   public boolean cleanJob() throws SQLException {
249     int count;
250     if (geomToJdbc instanceof OracleJdbcConverter) {
251       count =
252           new QueryRunner(geomToJdbc.isPmdKnownBroken())
253               .update(
254                   getConnection(),
255                   "delete from "
256                       + BrmoFramework.JOB_TABLE
257                       + " j where j.id in (select b.id from "
258                       + BrmoFramework.BERICHT_TABLE
259                       + " b "
260                       // vanwege rare fout in oracle bij ophalen tabel
261                       // metadata werkt dit niet
262                       // https://issues.apache.org/jira/browse/DBUTILS-125
263                       + " where b.id = j.id and status != 'STAGING_OK')");
264 
265     } else {
266       count =
267           new QueryRunner(geomToJdbc.isPmdKnownBroken())
268               .update(
269                   getConnection(),
270                   "delete from "
271                       + BrmoFramework.JOB_TABLE
272                       + " j where j.id in (select b.id from "
273                       + BrmoFramework.BERICHT_TABLE
274                       + " b "
275                       + " where b.id = j.id and status != ?)",
276                   Bericht.STATUS.STAGING_OK.toString());
277     }
278     return count > 0;
279   }
280 
281   public long setBerichtenJobByStatus(Bericht.STATUS status, boolean orderBerichten)
282       throws SQLException {
283     StringBuilder q =
284         new StringBuilder(
285             "insert into "
286                 + BrmoFramework.JOB_TABLE
287                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
288                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
289                 + BrmoFramework.BERICHT_TABLE
290                 + " where status = ? and datum <= ? ");
291     if (orderBerichten) {
292       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
293     } else {
294       q.append(" and volgordenummer < 0 ");
295       if (this.limitStandBerichtenToTransform > 0) {
296         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
297       }
298     }
299     log.debug("pre-transformatie SQL: " + q);
300     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
301         .update(
302             getConnection(),
303             q.toString(),
304             status.toString(),
305             new java.sql.Timestamp((new Date()).getTime()));
306   }
307 
308   public long setBerichtenJobForUpdate(String soort, boolean orderBerichten) throws SQLException {
309     StringBuilder q =
310         new StringBuilder(
311             "insert into "
312                 + BrmoFramework.JOB_TABLE
313                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
314                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
315                 + BrmoFramework.BERICHT_TABLE
316                 + " where status = ? and soort = ? and datum <= ? ");
317     if (orderBerichten) {
318       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
319     } else {
320       q.append(" and volgordenummer < 0 ");
321       if (this.limitStandBerichtenToTransform > 0) {
322         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
323       }
324     }
325     log.debug("pre-transformatie SQL: " + q);
326     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
327         .update(
328             getConnection(),
329             q.toString(),
330             Bericht.STATUS.RSGB_OK.toString(),
331             soort,
332             new java.sql.Timestamp((new Date()).getTime()));
333   }
334 
335   public long setBerichtenJobByIds(long[] ids, boolean orderBerichten) throws SQLException {
336     StringBuilder q =
337         new StringBuilder(
338             "insert into "
339                 + BrmoFramework.JOB_TABLE
340                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
341                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
342                 + BrmoFramework.BERICHT_TABLE
343                 + " where id in (");
344     for (int i = 0; i < ids.length; i++) {
345       if (i != 0) {
346         q.append(",");
347       }
348       q.append(ids[i]);
349     }
350     q.append(") and datum <= ? ");
351     if (orderBerichten) {
352       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
353     } else {
354       q.append(" and volgordenummer < 0 ");
355       if (this.limitStandBerichtenToTransform > 0) {
356         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
357       }
358     }
359     log.debug("pre-transformatie SQL: " + q);
360     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
361         .update(getConnection(), q.toString(), new java.sql.Timestamp((new Date()).getTime()));
362   }
363 
364   public long setBerichtenJobByLaadprocessen(long[] laadprocesIds, boolean orderBerichten)
365       throws SQLException {
366 
367     StringBuilder q =
368         new StringBuilder(
369             "insert into "
370                 + BrmoFramework.JOB_TABLE
371                 + " (id, datum, volgordenummer, object_ref, br_xml, soort) "
372                 + " select id, datum, volgordenummer, object_ref, br_xml, soort from "
373                 + BrmoFramework.BERICHT_TABLE
374                 + " where laadprocesid in (");
375     for (int i = 0; i < laadprocesIds.length; i++) {
376       if (i != 0) {
377         q.append(",");
378       }
379       q.append(laadprocesIds[i]);
380     }
381     q.append(") and status = ? and datum <= ? ");
382     if (orderBerichten) {
383       q.append(" order by ").append(BerichtenSorter.SQL_ORDER_BY);
384     } else {
385       q.append(" and volgordenummer < 0 ");
386       if (this.limitStandBerichtenToTransform > 0) {
387         q = geomToJdbc.buildLimitSql(q, limitStandBerichtenToTransform);
388       }
389     }
390     log.debug("pre-transformatie SQL: " + q);
391     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
392         .update(
393             getConnection(),
394             q.toString(),
395             Bericht.STATUS.STAGING_OK.toString(),
396             new java.sql.Timestamp((new Date()).getTime()));
397   }
398 
399   public void handleBerichtenByJob(
400       long total,
401       final BerichtenHandler handler,
402       final boolean enablePipeline,
403       int transformPipelineCapacity,
404       boolean orderBerichten)
405       throws Exception {
406     Split split = SimonManager.getStopwatch("b3p.rsgb.job").start();
407     final String dateTime = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
408     Split jobSplit = SimonManager.getStopwatch("b3p.rsgb.job." + dateTime).start();
409     ((RsgbProxy) handler).setSimonNamePrefix("b3p.rsgb.job." + dateTime + ".");
410     final RowProcessor processor = new StagingRowHandler();
411 
412     final ProcessDbXmlPipeline processDbXmlPipeline =
413         new ProcessDbXmlPipeline(
414             handler, transformPipelineCapacity, "b3p.rsgb.job." + dateTime + ".pipeline");
415     if (enablePipeline) {
416       processDbXmlPipeline.start();
417     }
418     long offset = 0L;
419     int batch = batchCapacity;
420     final MutableInt processed = new MutableInt(0);
421     final MutableInt lastJid = new MutableInt(0);
422     final boolean doOrderBerichten = orderBerichten;
423     boolean abort = false;
424     try {
425       do {
426         log.debug(
427             String.format(
428                 "Ophalen berichten batch Job tabel, offset %d, limit %d",
429                 lastJid.intValue(), batch));
430         String sql =
431             "select jid, id, datum, volgordenummer, object_ref, br_xml, soort from "
432                 + BrmoFramework.JOB_TABLE
433                 + " where jid > "
434                 + lastJid.intValue()
435                 + " order by jid ";
436         sql = geomToJdbc.buildPaginationSql(sql, 0, batch);
437         log.debug("SQL voor ophalen berichten batch: " + sql);
438 
439         processed.setValue(0);
440         final Split getBerichten =
441             SimonManager.getStopwatch("b3p.rsgb.job." + dateTime + ".staging.berichten.getbatch")
442                 .start();
443         Exception e =
444             new QueryRunner(geomToJdbc.isPmdKnownBroken())
445                 .query(
446                     getConnection(),
447                     sql,
448                     rs -> {
449                       getBerichten.stop();
450                       final Split processResultSet =
451                           SimonManager.getStopwatch(
452                                   "b3p.rsgb.job."
453                                       + dateTime
454                                       + ".staging.berichten.processresultset")
455                               .start();
456 
457                       while (rs.next()) {
458                         try {
459                           Bericht bericht = processor.toBean(rs, Bericht.class);
460                           if (bericht.getVolgordeNummer() >= 0 && !doOrderBerichten) {
461                             // mutaties hebben volgnr >=0 dan sortering
462                             // verplicht
463                             throw new Exception(
464                                 String.format(
465                                     "Het sorteren van berichten staat uit, terwijl bericht (id: %d) geen standbericht is (volgnummer >=0)",
466                                     bericht.getId()));
467                           }
468                           final BerichtWorkUnit workUnit = new BerichtWorkUnit(bericht);
469 
470                           if (enablePipeline) {
471                             List<TableData> tableData = handler.transformToTableData(bericht);
472                             if (tableData == null) {
473                               // Exception during transform
474                               // updateProcessingResult() is
475                               // aangeroepen, geen
476                               // updateResultPipeline
477                               continue;
478                             }
479                             workUnit.setTableData(tableData);
480                             workUnit.setTypeOfWork(BerichtTypeOfWork.PROCESS_DBXML);
481                             Split pipelinePut =
482                                 SimonManager.getStopwatch(
483                                         "b3p.rsgb.job." + dateTime + ".pipeline.processdbxml.put")
484                                     .start();
485                             processDbXmlPipeline.getQueue().put(workUnit);
486                             pipelinePut.stop();
487                           } else {
488                             Split berichtSplit =
489                                 SimonManager.getStopwatch("b3p.rsgb.job." + dateTime + ".bericht")
490                                     .start();
491                             handler.handle(bericht, null, true);
492                             berichtSplit.stop();
493                           }
494                           lastJid.setValue(rs.getInt("jid"));
495                         } catch (Exception e1) {
496                           return e1;
497                         }
498                         processed.increment();
499                       }
500                       processResultSet.stop();
501                       return null;
502                     });
503 
504         offset += processed.intValue();
505 
506         // If handler threw exception processing row, rethrow it
507         if (e != null) {
508           throw e;
509         }
510 
511       } while (processed.intValue() > 0 && (offset < total));
512       if (offset < total) {
513         log.warn(String.format("Minder berichten verwerkt (%d) dan verwacht (%d)!", offset, total));
514       }
515       // als succesvol beeindigd, dan verwijderen, anders herstart mogelijk maken
516       removeJob();
517     } catch (Exception t) {
518       abort = true;
519       throw t;
520     } finally {
521       if (enablePipeline) {
522         if (abort) {
523           // Let threads stop by themselves, don't join
524           processDbXmlPipeline.setAbortFlag();
525         } else {
526           processDbXmlPipeline.stopWhenQueueEmpty();
527           try {
528             processDbXmlPipeline.join();
529           } catch (InterruptedException e) {
530           }
531         }
532       }
533     }
534     jobSplit.stop();
535     split.stop();
536   }
537 
538   public void updateBerichtenDbXml(List<Bericht> berichten, RsgbTransformer transformer)
539       throws SAXException, IOException, TransformerException {
540     for (Bericht ber : berichten) {
541       Split split = SimonManager.getStopwatch("b3p.staging.bericht.dbxml.transform").start();
542       String dbxml = transformer.transformToDbXml(ber);
543       // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
544       // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
545       if (geomToJdbc instanceof OracleJdbcConverter && dbxml != null && dbxml.length() == 8000) {
546         log.debug("DB XML is 8000 bytes, er wordt een spatie aangeplakt.");
547         dbxml += " ";
548         log.debug("DB XML is nu " + dbxml.length() + " bytes.");
549       }
550 
551       ber.setDbXml(dbxml);
552       split.stop();
553     }
554   }
555 
556   private PreparedStatement getOldBerichtStatement;
557   private PreparedStatement getOldBerichtStatementById;
558 
559   public Bericht getOldBericht(Bericht nieuwBericht, StringBuilder loadLog) throws SQLException {
560     return getOldBericht(nieuwBericht.getObjectRef(), loadLog);
561   }
562 
563   public Bericht getOldBericht(String objectRef, StringBuilder loadLog) throws SQLException {
564     Split split = SimonManager.getStopwatch("b3p.staging.bericht.getold").start();
565 
566     Bericht bericht = null;
567     ResultSetHandler<List<Bericht>> h =
568         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
569 
570     if (getOldBerichtStatement == null) {
571       String sql =
572           "SELECT id, object_ref, datum, volgordenummer, soort, status, job_id, status_datum FROM "
573               + BrmoFramework.BERICHT_TABLE
574               + " WHERE"
575               + " object_ref = ?"
576               + " AND status in ('RSGB_OK', 'ARCHIVE')"
577               + " ORDER BY datum desc, volgordenummer desc";
578       sql = geomToJdbc.buildPaginationSql(sql, 0, 1);
579 
580       getOldBerichtStatement = getConnection().prepareStatement(sql);
581     } else {
582       getOldBerichtStatement.clearParameters();
583     }
584     getOldBerichtStatement.setString(1, objectRef);
585 
586     ResultSet rs = getOldBerichtStatement.executeQuery();
587     List<Bericht> list = h.handle(rs);
588     rs.close();
589 
590     if (!list.isEmpty()) {
591       loadLog.append("Vorig bericht gevonden:\n");
592       for (Bericht b : list) {
593         if ((Bericht.STATUS.RSGB_OK.equals(b.getStatus())
594                 || Bericht.STATUS.ARCHIVE.equals(b.getStatus()))
595             && bericht == null) {
596           loadLog.append("Meest recent bericht gevonden: ").append(b).append("\n");
597           bericht = b;
598         } else {
599           loadLog.append("Niet geschikt bericht: ").append(b).append("\n");
600         }
601       }
602     }
603 
604     if (bericht != null) {
605       // bericht nu wel vullen met alle kolommen
606       if (getOldBerichtStatementById == null) {
607         String sql = "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?";
608 
609         getOldBerichtStatementById = getConnection().prepareStatement(sql);
610       } else {
611         getOldBerichtStatementById.clearParameters();
612       }
613       getOldBerichtStatementById.setLong(1, bericht.getId());
614 
615       ResultSet rs2 = getOldBerichtStatementById.executeQuery();
616       List<Bericht> list2 = h.handle(rs2);
617       rs2.close();
618 
619       if (!list2.isEmpty()) {
620         bericht = list2.get(0);
621       }
622     }
623 
624     split.stop();
625     return bericht;
626   }
627 
628   private PreparedStatement getPreviousBerichtStatement;
629 
630   public Bericht getPreviousBericht(Bericht ber, StringBuilder loadLog) throws SQLException {
631     return getPreviousBericht(ber.getObjectRef(), ber.getDatum(), ber.getId(), loadLog);
632   }
633 
634   /** Gets the previous bericht (not the first). */
635   public Bericht getPreviousBericht(
636       String objectRef, Date datum, Long currentBerichtId, StringBuilder loadLog)
637       throws SQLException {
638     Split split = SimonManager.getStopwatch("b3p.staging.bericht.getprevious").start();
639 
640     Bericht bericht = null;
641     ResultSetHandler<List<Bericht>> h =
642         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
643 
644     if (getPreviousBerichtStatement == null) {
645       String sql =
646           "SELECT id, object_ref, datum, volgordenummer, soort, status, job_id, status_datum FROM "
647               + BrmoFramework.BERICHT_TABLE
648               + " WHERE"
649               + " object_ref = ? and datum <= ? and id <> ?"
650               + " ORDER BY datum asc, volgordenummer desc";
651       sql = geomToJdbc.buildPaginationSql(sql, 0, 1);
652 
653       getPreviousBerichtStatement = getConnection().prepareStatement(sql);
654     } else {
655       getPreviousBerichtStatement.clearParameters();
656     }
657     getPreviousBerichtStatement.setString(1, objectRef);
658     getPreviousBerichtStatement.setTimestamp(2, new java.sql.Timestamp(datum.getTime()));
659     getPreviousBerichtStatement.setLong(3, currentBerichtId);
660 
661     ResultSet rs = getPreviousBerichtStatement.executeQuery();
662     List<Bericht> list = h.handle(rs);
663     rs.close();
664 
665     if (!list.isEmpty()) {
666       loadLog.append("Vorig bericht gevonden:\n");
667       for (Bericht b : list) {
668         if (bericht == null) {
669           loadLog.append("Meest recent bericht gevonden: ").append(b).append("\n");
670           bericht = b;
671         } else {
672           loadLog.append("Niet geschikt bericht: ").append(b).append("\n");
673         }
674       }
675     }
676 
677     if (bericht != null) {
678       // bericht nu wel vullen met alle kolommen
679       if (getOldBerichtStatementById == null) {
680         String sql = "SELECT * FROM " + BrmoFramework.BERICHT_TABLE + " WHERE id = ?";
681 
682         getOldBerichtStatementById = getConnection().prepareStatement(sql);
683       } else {
684         getOldBerichtStatementById.clearParameters();
685       }
686       getOldBerichtStatementById.setLong(1, bericht.getId());
687 
688       ResultSet rs2 = getOldBerichtStatementById.executeQuery();
689       List<Bericht> list2 = h.handle(rs2);
690       rs2.close();
691 
692       if (!list2.isEmpty()) {
693         bericht = list2.get(0);
694       }
695     }
696 
697     split.stop();
698     return bericht;
699   }
700 
701   /**
702    * Update (overschrijft) een bericht in job tabel. (object_ref, datum, volgordenummer, soort,
703    * opmerking, status, status_datum, br_xml, br_orgineel_xml, db_xml, xsl_version)
704    *
705    * @param b bij te werken bericht
706    * @throws SQLException if any
707    */
708   public void updateBericht(Bericht b) throws SQLException {
709     Split split = SimonManager.getStopwatch("b3p.staging.bericht.update").start();
710     String brXML = b.getBrXml();
711     // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
712     // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
713     if (geomToJdbc instanceof OracleJdbcConverter && brXML != null && brXML.length() == 8000) {
714       log.debug("BR XML is 8000 bytes, er wordt een spatie aangeplakt.");
715       brXML += " ";
716       log.debug("BR XML is nu " + brXML.length() + " bytes.");
717     }
718 
719     new QueryRunner(geomToJdbc.isPmdKnownBroken())
720         .update(
721             getConnection(),
722             "UPDATE "
723                 + BrmoFramework.BERICHT_TABLE
724                 + " set "
725                 + "object_ref = ?, "
726                 + "datum = ?, "
727                 + "volgordenummer = ?, "
728                 + "soort = ?, "
729                 + "opmerking = ?, "
730                 + "status = ?, "
731                 + "status_datum = ?, "
732                 + "br_xml = ?, "
733                 + "br_orgineel_xml = ?, "
734                 + "db_xml = ?, "
735                 + "xsl_version = ? "
736                 + "WHERE id = ?",
737             b.getObjectRef(),
738             new Timestamp(b.getDatum().getTime()),
739             b.getVolgordeNummer(),
740             b.getSoort(),
741             b.getOpmerking(),
742             b.getStatus().toString(),
743             new Timestamp(b.getStatusDatum().getTime()),
744             brXML,
745             b.getBrOrgineelXml(),
746             b.getDbXml(),
747             b.getXslVersion(),
748             b.getId());
749     split.stop();
750   }
751 
752   /**
753    * Update een bericht in job tabel met processing status. (opmerking, status, status_datum,
754    * db_xml)
755    *
756    * @param b updatebericht
757    * @throws SQLException als de update mislukt
758    */
759   public void updateBerichtProcessing(Bericht b) throws SQLException {
760     Split split = SimonManager.getStopwatch("b3p.staging.bericht.updateprocessing").start();
761 
762     new QueryRunner(geomToJdbc.isPmdKnownBroken())
763         .update(
764             getConnection(),
765             "UPDATE "
766                 + BrmoFramework.BERICHT_TABLE
767                 + " set "
768                 + "opmerking = ?, "
769                 + "status = ?, "
770                 + "status_datum = ?, "
771                 + "db_xml = ? "
772                 + " WHERE id = ?",
773             b.getOpmerking(),
774             b.getStatus().toString(),
775             new Timestamp(b.getStatusDatum().getTime()),
776             b.getDbXml(),
777             b.getId());
778     split.stop();
779   }
780 
781   public void deleteByLaadProcesId(Long id) throws SQLException {
782     new QueryRunner(geomToJdbc.isPmdKnownBroken())
783         .update(
784             getConnection(),
785             "DELETE FROM " + BrmoFramework.BERICHT_TABLE + " WHERE laadprocesid = ?",
786             id);
787     new QueryRunner(geomToJdbc.isPmdKnownBroken())
788         .update(
789             getConnection(), "DELETE FROM " + BrmoFramework.LAADPROCES_TABEL + " WHERE id = ?", id);
790   }
791 
792   public List<LaadProces> getLaadProcessen() throws SQLException {
793     List<LaadProces> list;
794     ResultSetHandler<List<LaadProces>> h =
795         new BeanListHandler<>(LaadProces.class, new StagingRowHandler());
796     list =
797         new QueryRunner(geomToJdbc.isPmdKnownBroken())
798             .query(getConnection(), "select * from " + BrmoFramework.LAADPROCES_TABEL, h);
799     return list;
800   }
801 
802   public List<Bericht> getBerichten() throws SQLException {
803     List<Bericht> berichten;
804     ResultSetHandler<List<Bericht>> h =
805         new BeanListHandler<>(Bericht.class, new StagingRowHandler());
806     berichten =
807         new QueryRunner(geomToJdbc.isPmdKnownBroken())
808             .query(getConnection(), "select * from " + BrmoFramework.BERICHT_TABLE, h);
809     return berichten;
810   }
811 
812   /**
813    * Laadt het bestand uit de stream in de database.
814    *
815    * @param stream input
816    * @param type type registratie, bijv. {@value BrmoFramework#BR_BRK}
817    * @param fileName naam van het bestand (ter identificatie)
818    * @param d bestandsdatum
819    * @param listener progress listener
820    * @throws Exception if any
821    * @deprecated gebruik de variant die een automatischProcesId als argument heeft
822    * @see #loadBr(InputStream, String, String, Date, ProgressUpdateListener, Long)
823    */
824   @Deprecated
825   public void loadBr(
826       InputStream stream, String type, String fileName, Date d, ProgressUpdateListener listener)
827       throws Exception {
828     this.loadBr(stream, type, fileName, d, listener, null);
829   }
830 
831   /**
832    * Laadt het bestand uit de stream in de database.
833    *
834    * @param stream input
835    * @param type type registratie, bijv. {@value BrmoFramework#BR_BRK}
836    * @param fileName naam van het bestand (ter identificatie)
837    * @param d bestandsdatum
838    * @param automatischProces id van het automatisch proces
839    * @param listener progress listener
840    * @throws Exception if any
841    */
842   public void loadBr(
843       InputStream stream,
844       String type,
845       String fileName,
846       Date d,
847       ProgressUpdateListener listener,
848       Long automatischProces)
849       throws Exception {
850 
851     CountingInputStream cis = new CountingInputStream(stream);
852 
853     BrmoXMLReader brmoXMLReader;
854     if (type.equals(BrmoFramework.BR_BRK)) {
855       brmoXMLReader = new BrkSnapshotXMLReader(cis);
856     } else if (type.equals(BrmoFramework.BR_BRK2)) {
857       brmoXMLReader = new Brk2SnapshotXMLReader(cis);
858     } else if (type.equals(BrmoFramework.BR_NHR)) {
859       brmoXMLReader = new NhrXMLReader(cis);
860     } else if (TopNLType.isTopNLType(type)) {
861       brmoXMLReader = new TopNLFileReader(fileName, type);
862     } else if (type.equals(BrmoFramework.BR_BRP)) {
863       brmoXMLReader = new BRPXMLReader(cis, d, this);
864     } else if (type.equals(BrmoFramework.BR_GBAV)) {
865       brmoXMLReader = new GbavXMLReader(cis);
866     } else if (type.equals(BrmoFramework.BR_WOZ)) {
867       brmoXMLReader = new WozXMLReader(cis, d, this);
868     } else {
869       throw new UnsupportedOperationException("Ongeldige basisregistratie: " + type);
870     }
871 
872     if (brmoXMLReader.getBestandsDatum() == null) {
873       throw new BrmoException("Header van bestand bevat geen datum, verkeerd formaat?");
874     }
875 
876     LaadProces lp = new LaadProces();
877     lp.setBestandNaam(fileName);
878     lp.setBestandDatum(brmoXMLReader.getBestandsDatum());
879     lp.setSoort(type);
880     lp.setGebied(brmoXMLReader.getGebied());
881     lp.setStatus(LaadProces.STATUS.STAGING_OK);
882     lp.setStatusDatum(new Date());
883     lp.setAutomatischProcesId(automatischProces);
884 
885     if (laadProcesExists(lp)) {
886       throw new BrmoDuplicaatLaadprocesException("Laadproces al gerund voor bestand " + fileName);
887     }
888     lp = writeLaadProces(lp);
889 
890     if (TopNLType.isTopNLType(type)) {
891       // van een TopNL GML bestand maken we alleen een LP, geen bericht,
892       // de datum halen we van het zip bestand
893       if (listener != null) {
894         listener.total(((TopNLFileReader) brmoXMLReader).getFileSize());
895         listener.progress(((TopNLFileReader) brmoXMLReader).getFileSize());
896       }
897     } else {
898       if (!brmoXMLReader.hasNext()) {
899         updateLaadProcesStatus(
900             lp,
901             LaadProces.STATUS.STAGING_OK,
902             "Leeg bestand, geen berichten gevonden in " + fileName);
903         throw new BrmoLeegBestandException("Leeg bestand, geen berichten gevonden in " + fileName);
904       }
905 
906       boolean isBerichtGeschreven = false;
907       int berichten = 0;
908       int foutBerichten = 0;
909       String lastErrorMessage = null;
910 
911       while (brmoXMLReader.hasNext()) {
912         Bericht b;
913         try {
914           b = brmoXMLReader.next();
915           b.setLaadProcesId(lp.getId());
916           b.setStatus(Bericht.STATUS.STAGING_OK);
917           b.setStatusDatum(new Date());
918           b.setSoort(type);
919 
920           if (StringUtils.isEmpty(b.getObjectRef())) {
921             // geen object_ref kunnen vaststellen; dan ook niet transformeren,
922             // bijvoorbeeld bij WOZ
923             b.setStatus(Bericht.STATUS.STAGING_NOK);
924             b.setOpmerking(
925                 "Er kon geen object_ref bepaald worden uit de natuurlijke sleutel van het bericht.");
926           }
927 
928           if (b.getDatum() == null) {
929             throw new BrmoException("Datum bericht is null");
930           }
931           log.debug(b);
932 
933           Bericht existingBericht = getExistingBericht(b);
934           if (type.equals(BrmoFramework.BR_BRK) && !isBerichtGeschreven) {
935             // haal alleen voor eerste
936             BrkBericht brkBericht = (BrkBericht) b;
937             lp.setBestandNaamHersteld(
938                 brkBericht.getRestoredFileName(lp.getBestandDatum(), b.getVolgordeNummer()));
939             updateLaadProcesBestandNaamHersteld(lp);
940           }
941 
942           // TODO BRK2 bepalen of dit nog nodig is voor BRK2
943           if (type.equals(BrmoFramework.BR_BRK2) && !isBerichtGeschreven) {
944             // haal alleen voor eerste
945             Brk2Bericht brk2Bericht = (Brk2Bericht) b;
946             lp.setBestandNaamHersteld(
947                 brk2Bericht.getRestoredFileName(lp.getBestandDatum(), b.getVolgordeNummer()));
948             updateLaadProcesBestandNaamHersteld(lp);
949           }
950 
951           if (existingBericht == null) {
952             writeBericht(b);
953             isBerichtGeschreven = true;
954           } else if (existingBericht.getStatus().equals(Bericht.STATUS.STAGING_OK)) {
955             // als bericht nog niet getransformeerd is, dan overschrijven.
956             b.setId(existingBericht.getId());
957             this.updateBericht(b);
958           }
959           if (listener != null) {
960             listener.progress(cis.getByteCount());
961           }
962           berichten++;
963         } catch (Exception e) {
964           lastErrorMessage =
965               String.format(
966                   "Laden bericht uit %s mislukt vanwege: %s", fileName, e.getLocalizedMessage());
967           log.error(lastErrorMessage);
968           log.trace(lastErrorMessage, e);
969           if (listener != null) {
970             listener.exception(e);
971           }
972           foutBerichten++;
973         }
974       }
975       if (listener != null) {
976         listener.total(berichten);
977       }
978       if (foutBerichten > 0) {
979         String opmerking =
980             "Laden van "
981                 + foutBerichten
982                 + " bericht(en) mislukt, laatste melding: "
983                 + lastErrorMessage
984                 + ", zie logs voor meer info.";
985         this.updateLaadProcesStatus(lp, LaadProces.STATUS.STAGING_NOK, opmerking);
986       } else if (!isBerichtGeschreven) {
987         String opmerking = "Dit bestand is waarschijnlijk al eerder geladen.";
988         this.updateLaadProcesStatus(lp, LaadProces.STATUS.STAGING_DUPLICAAT, opmerking);
989       }
990     }
991   }
992 
993   public void writeBericht(Bericht b) throws SQLException {
994     String brXML = b.getBrXml();
995     // HACK vanwege Oracle 8000 karakters bug, zie brmo wiki:
996     // https://github.com/B3Partners/brmo/wiki/Oracle-8000-bytes-bug
997     if (geomToJdbc instanceof OracleJdbcConverter && brXML != null && brXML.length() == 8000) {
998       log.debug("BR XML is 8000 bytes, er wordt een spatie aangeplakt.");
999       brXML += " ";
1000       log.debug("BR XML is nu " + brXML.length() + " bytes.");
1001     }
1002     Object berId =
1003         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1004             .insert(
1005                 getConnection(),
1006                 "INSERT INTO "
1007                     + BrmoFramework.BERICHT_TABLE
1008                     + " (laadprocesid, "
1009                     + "object_ref, "
1010                     + "datum, "
1011                     + "volgordenummer, "
1012                     + "soort, "
1013                     + "opmerking, "
1014                     + "status, "
1015                     + "status_datum, "
1016                     + "br_xml, "
1017                     + "br_orgineel_xml, "
1018                     + "db_xml, "
1019                     + "xsl_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
1020                 new ScalarHandler<>(),
1021                 b.getLaadProcesId(),
1022                 b.getObjectRef(),
1023                 new Timestamp(b.getDatum().getTime()),
1024                 b.getVolgordeNummer(),
1025                 b.getSoort(),
1026                 b.getOpmerking(),
1027                 b.getStatus().toString(),
1028                 new Timestamp(b.getStatusDatum().getTime()),
1029                 brXML,
1030                 b.getBrOrgineelXml(),
1031                 b.getDbXml(),
1032                 b.getXslVersion());
1033     log.trace("opgeslagen bericht heeft (row) id: " + berId);
1034   }
1035 
1036   /** Bepaal aan de hand van bestandsnaam en bestandsdatum van het laadproces of dit al bestaat. */
1037   private boolean laadProcesExists(LaadProces lp) throws SQLException {
1038     Object o =
1039         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1040             .query(
1041                 getConnection(),
1042                 "select 1 from laadproces where bestand_naam = ? and" + " bestand_datum = ?",
1043                 new ScalarHandler<>(),
1044                 lp.getBestandNaam(),
1045                 new Timestamp(lp.getBestandDatum().getTime()));
1046 
1047     return o != null;
1048   }
1049 
1050   private LaadProces writeLaadProces(LaadProces lp) throws SQLException {
1051     log.trace("opslaan van laadproces: " + lp);
1052     if (lp == null) {
1053       return null;
1054     }
1055 
1056     final String sql =
1057         "INSERT INTO "
1058             + BrmoFramework.LAADPROCES_TABEL
1059             + "(bestand_naam, "
1060             + "bestand_datum, soort, gebied, opmerking, status, "
1061             + "status_datum, contact_email, automatisch_proces "
1062             // nieuwe kolommen in 2.0.0
1063             + ", afgifteid, afgiftereferentie, artikelnummer, beschikbaar_tot, bestandsreferentie, "
1064             + "contractafgiftenummer, contractnummer, klantafgiftenummer, bestand_naam_hersteld"
1065             + ") VALUES (?,?,?,?,?,?,?,?,?"
1066             + ",?,?,?,?,?,?,?,?,?"
1067             + ")";
1068 
1069     // Oracle geeft een ROWID terug als "generated key" en niet de PK-kolom "ID" die we willen
1070     // hebben, daarom zelf doen
1071     // zie ook: https://issues.apache.org/jira/browse/DBUTILS-54
1072     QueryRunner queryRunner = new QueryRunner(geomToJdbc.isPmdKnownBroken());
1073     try (
1074     // PG: Caused by: org.postgresql.util.PSQLException: ERROR: column "ID" does not exist
1075     // maar lowercase werkt ook met oracle
1076     PreparedStatement stmt = getConnection().prepareStatement(sql, new String[] {"id"})
1077     // door een bug in de PG driver geen kolom index gebruiken, zie
1078     // https://github.com/pgjdbc/pgjdbc/issues/1661
1079     // PreparedStatement stmt = getConnection().prepareStatement(sql, new int[]{1});
1080     ) {
1081       queryRunner.fillStatement(
1082           stmt,
1083           lp.getBestandNaam(),
1084           new Timestamp(lp.getBestandDatum().getTime()),
1085           lp.getSoort(),
1086           lp.getGebied(),
1087           lp.getOpmerking(),
1088           lp.getStatus().toString(),
1089           new Timestamp(lp.getStatusDatum().getTime()),
1090           lp.getContactEmail(),
1091           lp.getAutomatischProcesId(),
1092           // nieuwe kolommen in 2.0.0
1093           lp.getAfgifteid(),
1094           lp.getAfgiftereferentie(),
1095           lp.getArtikelnummer(),
1096           (lp.getBeschikbaar_tot() == null)
1097               ? null
1098               : new Timestamp(lp.getBeschikbaar_tot().getTime()),
1099           lp.getBestandsreferentie(),
1100           lp.getContractafgiftenummer(),
1101           lp.getContractnummer(),
1102           lp.getKlantafgiftenummer(),
1103           lp.getBestandNaamHersteld());
1104       stmt.executeUpdate();
1105       ResultSetHandler<Number> rsh = new ScalarHandler<>();
1106       Number lpId = rsh.handle(stmt.getGeneratedKeys());
1107       log.trace("opgeslagen laadproces heeft id: " + lpId);
1108       return this.getLaadProcesById(lpId.longValue());
1109     }
1110   }
1111 
1112   public void updateLaadProcesStatus(LaadProces lp, LaadProces.STATUS status, String opmerking)
1113       throws SQLException {
1114     log.trace("update van laadproces status: " + lp);
1115     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1116         .update(
1117             getConnection(),
1118             "update "
1119                 + BrmoFramework.LAADPROCES_TABEL
1120                 + " set status = ?, opmerking = ?, status_datum = ? where id = ?",
1121             status.toString(),
1122             opmerking,
1123             new Timestamp(new Date().getTime()),
1124             lp.getId());
1125   }
1126 
1127   /**
1128    * update laadproces (GDS2 afgifte) metadata.
1129    *
1130    * @param lpId laadproces id
1131    * @param klantafgiftenummer klantafgiftenummer
1132    * @param contractafgiftenummer contractafgiftenummer
1133    * @param artikelnummer artikelnummer
1134    * @param contractnummer contractnummer
1135    * @param afgifteid afgifteid
1136    * @param afgiftereferentie afgiftereferentie
1137    * @param bestandsreferentie bestandsreferentie
1138    * @param beschikbaar_tot beschikbaar_tot
1139    * @throws SQLException if any
1140    */
1141   public void updateLaadProcesMeta(
1142       Long lpId,
1143       BigInteger klantafgiftenummer,
1144       BigInteger contractafgiftenummer,
1145       String artikelnummer,
1146       String contractnummer,
1147       String afgifteid,
1148       String afgiftereferentie,
1149       String bestandsreferentie,
1150       Date beschikbaar_tot)
1151       throws SQLException {
1152     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1153         .update(
1154             getConnection(),
1155             "update "
1156                 + BrmoFramework.LAADPROCES_TABEL
1157                 + " set "
1158                 + "afgifteid = ?,"
1159                 + "afgiftereferentie = ?,"
1160                 + "artikelnummer = ?,"
1161                 + "beschikbaar_tot = ?,"
1162                 + "bestandsreferentie = ?,"
1163                 + "contractafgiftenummer = ?,"
1164                 + "contractnummer = ?,"
1165                 + "klantafgiftenummer = ?"
1166                 + "where id = ?",
1167             afgifteid,
1168             afgiftereferentie,
1169             artikelnummer,
1170             new Timestamp(beschikbaar_tot.getTime()),
1171             bestandsreferentie,
1172             contractafgiftenummer,
1173             contractnummer,
1174             klantafgiftenummer,
1175             lpId);
1176   }
1177 
1178   public void updateLaadProcesBestandNaamHersteld(LaadProces lp) throws SQLException {
1179     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1180         .update(
1181             getConnection(),
1182             "update "
1183                 + BrmoFramework.LAADPROCES_TABEL
1184                 + " set bestand_naam_hersteld = ? where id = ?",
1185             lp.getBestandNaamHersteld(),
1186             lp.getId());
1187   }
1188 
1189   public void emptyStagingDb() throws SQLException {
1190     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1191         .update(getConnection(), "DELETE FROM " + BrmoFramework.BERICHT_TABLE);
1192     new QueryRunner(geomToJdbc.isPmdKnownBroken())
1193         .update(getConnection(), "DELETE FROM " + BrmoFramework.LAADPROCES_TABEL);
1194   }
1195 
1196   public List<Bericht> getBerichten(
1197       int page,
1198       int start,
1199       int limit,
1200       String sort,
1201       String dir,
1202       String filterSoort,
1203       String filterStatus)
1204       throws SQLException {
1205 
1206     List<String> params = new ArrayList<>();
1207     if (sort == null || sort.trim().isEmpty()) {
1208       sort = "id";
1209     }
1210     if (dir == null || dir.trim().isEmpty()) {
1211       sort = "asc";
1212     }
1213     String sql =
1214         "SELECT * FROM "
1215             + BrmoFramework.BERICHT_TABLE
1216             + buildFilterSql(page, sort, dir, filterSoort, filterStatus, params);
1217 
1218     sql = geomToJdbc.buildPaginationSql(sql, start, limit);
1219 
1220     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
1221         .query(
1222             getConnection(),
1223             sql,
1224             new BeanListHandler<>(Bericht.class, new StagingRowHandler()),
1225             params.toArray());
1226   }
1227 
1228   public long getCountBerichten(String filterSoort, String filterStatus) throws SQLException {
1229 
1230     List<String> params = new ArrayList<>();
1231     String filter = buildFilterSql(0, null, null, filterSoort, filterStatus, params);
1232 
1233     String sql;
1234     // gebruik estimate voor postgresql indien geen filter
1235     if (StringUtils.isBlank(filter) && geomToJdbc instanceof PostgisJdbcConverter) {
1236       sql =
1237           "SELECT reltuples::BIGINT AS estimate FROM pg_class WHERE relname= '"
1238               + BrmoFramework.BERICHT_TABLE
1239               + "'";
1240     } else {
1241       sql = "SELECT count(*) FROM " + BrmoFramework.BERICHT_TABLE + filter;
1242     }
1243 
1244     ResultSet rs = null;
1245     PreparedStatement pstmt = null;
1246     try {
1247       pstmt = getConnection().prepareStatement(sql);
1248       try {
1249         pstmt.setQueryTimeout(300); // seconds to wait
1250       } catch (Exception e) {
1251         log.warn("Driver does not support setQueryTimeout, please update driver.");
1252       }
1253       if (!params.isEmpty()) {
1254         for (int i = 0; i < params.size(); i++) {
1255           if (params.get(i) != null) {
1256             pstmt.setObject(i + 1, params.get(i));
1257           } else {
1258             pstmt.setNull(i + 1, Types.VARCHAR);
1259           }
1260         }
1261       }
1262       rs = pstmt.executeQuery();
1263       if (rs.next()) {
1264         Object o = rs.getObject(1);
1265         if (o instanceof BigDecimal) {
1266           return ((BigDecimal) o).longValue();
1267         } else if (o instanceof Integer) {
1268           return ((Integer) o).longValue();
1269         }
1270         return (Long) o;
1271       } else {
1272         return -1;
1273       }
1274     } catch (Exception e) {
1275       return -1;
1276     } finally {
1277       try {
1278         if (rs != null && !rs.isClosed()) {
1279           rs.close();
1280         }
1281       } catch (SQLException e) {
1282         // ignore
1283       }
1284       try {
1285         if (pstmt != null && !pstmt.isClosed()) {
1286           pstmt.close();
1287         }
1288       } catch (SQLException e) {
1289         // ignore
1290       }
1291     }
1292   }
1293 
1294   private String buildFilterSql(
1295       int page,
1296       String sort,
1297       String dir,
1298       String filterSoort,
1299       String filterStatus,
1300       List<String> params) {
1301 
1302     StringBuilder builder = new StringBuilder();
1303 
1304     List<String> conjunctions = new ArrayList<>();
1305 
1306     if (!StringUtils.isBlank(filterSoort)) {
1307       String[] soorten = filterSoort.split(",");
1308       params.addAll(Arrays.asList(soorten));
1309       conjunctions.add("soort in (" + StringUtils.repeat("?", ", ", soorten.length) + ")");
1310     }
1311 
1312     if (!StringUtils.isBlank(filterStatus)) {
1313       String[] statussen = filterStatus.split(",");
1314       params.addAll(Arrays.asList(statussen));
1315       conjunctions.add("status in (" + StringUtils.repeat("?", ", ", statussen.length) + ")");
1316     }
1317 
1318     // build where part
1319     if (!conjunctions.isEmpty()) {
1320       builder.append(" where ");
1321       builder.append(StringUtils.join(conjunctions.toArray(), " and "));
1322     }
1323 
1324     // build order by part
1325     if (sort != null && dir != null) {
1326       builder.append(" ORDER BY ");
1327       builder.append(sort);
1328       builder.append(" ");
1329       builder.append(dir);
1330     }
1331 
1332     return builder.toString();
1333   }
1334 
1335   public long getCountLaadProces(String filterSoort, String filterStatus) throws SQLException {
1336 
1337     List<String> params = new ArrayList<>();
1338     String sql =
1339         "SELECT count(*) FROM "
1340             + BrmoFramework.LAADPROCES_TABEL
1341             + buildFilterSql(0, null, null, filterSoort, filterStatus, params);
1342 
1343     Object o =
1344         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1345             .query(getConnection(), sql, new ScalarHandler<>(), params.toArray());
1346     if (o instanceof BigDecimal) {
1347       return ((BigDecimal) o).longValue();
1348     } else if (o instanceof Integer) {
1349       return ((Integer) o).longValue();
1350     }
1351     return (Long) o;
1352   }
1353 
1354   public List<LaadProces> getLaadprocessen(
1355       int page,
1356       int start,
1357       int limit,
1358       String sort,
1359       String dir,
1360       String filterSoort,
1361       String filterStatus)
1362       throws SQLException {
1363 
1364     List<String> params = new ArrayList<>();
1365     if (sort == null || sort.trim().isEmpty()) {
1366       sort = "id";
1367     }
1368     if (dir == null || dir.trim().isEmpty()) {
1369       sort = "asc";
1370     }
1371     String sql =
1372         "SELECT * FROM "
1373             + BrmoFramework.LAADPROCES_TABEL
1374             + buildFilterSql(page, sort, dir, filterSoort, filterStatus, params);
1375 
1376     sql = geomToJdbc.buildPaginationSql(sql, start, limit);
1377 
1378     return new QueryRunner(geomToJdbc.isPmdKnownBroken())
1379         .query(
1380             getConnection(),
1381             sql,
1382             new BeanListHandler<>(LaadProces.class, new StagingRowHandler()),
1383             params.toArray());
1384   }
1385 
1386   public Long[] getLaadProcessenIds(
1387       String sort, String dir, String filterSoort, String filterStatus) throws SQLException {
1388     List<String> params = new ArrayList<>();
1389     if (sort == null || sort.trim().isEmpty()) {
1390       sort = "id";
1391     }
1392     if (dir == null || dir.trim().isEmpty()) {
1393       dir = "asc";
1394     }
1395     String sql =
1396         "SELECT ID FROM "
1397             + BrmoFramework.LAADPROCES_TABEL
1398             + buildFilterSql(-1, sort, dir, filterSoort, filterStatus, params);
1399     List<Long> ids =
1400         new QueryRunner(geomToJdbc.isPmdKnownBroken())
1401             .query(getConnection(), sql, new LongColumnListHandler("id"), params.toArray());
1402     return ids.toArray(new Long[ids.size()]);
1403   }
1404 
1405   /**
1406    * @param batchCapacity the batchCapacity to set
1407    */
1408   public void setBatchCapacity(Integer batchCapacity) {
1409     this.batchCapacity = batchCapacity;
1410   }
1411 
1412   public void setLimitStandBerichtenToTransform(Integer limitStandBerichtenToTransform) {
1413     this.limitStandBerichtenToTransform = limitStandBerichtenToTransform;
1414   }
1415 }