AdvancedFunctionsActionBean.java

package nl.b3p.brmo.service.stripes;

import static org.apache.commons.dbutils.DbUtils.closeQuietly;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import net.sourceforge.stripes.action.ActionBean;
import net.sourceforge.stripes.action.ActionBeanContext;
import net.sourceforge.stripes.action.Before;
import net.sourceforge.stripes.action.DefaultHandler;
import net.sourceforge.stripes.action.ForwardResolution;
import net.sourceforge.stripes.action.Resolution;
import net.sourceforge.stripes.action.SimpleMessage;
import net.sourceforge.stripes.action.StrictBinding;
import net.sourceforge.stripes.controller.LifecycleStage;
import net.sourceforge.stripes.validation.Validate;
import nl.b3p.brmo.loader.BrmoFramework;
import nl.b3p.brmo.loader.ProgressUpdateListener;
import nl.b3p.brmo.loader.RsgbProxy;
import nl.b3p.brmo.loader.StagingProxy;
import nl.b3p.brmo.loader.advancedfunctions.AdvancedFunctionProcess;
import nl.b3p.brmo.loader.entity.Bericht;
import nl.b3p.brmo.loader.util.BrmoException;
import nl.b3p.brmo.loader.util.StagingRowHandler;
import nl.b3p.brmo.loader.xml.WozXMLReader;
import nl.b3p.brmo.service.util.ConfigUtil;
import nl.b3p.jdbc.util.converter.GeometryJdbcConverter;
import nl.b3p.jdbc.util.converter.GeometryJdbcConverterFactory;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.RowProcessor;
import org.apache.commons.dbutils.handlers.ScalarHandler;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.stripesstuff.plugin.waitpage.WaitPage;

/**
 * @author Chris van Lith
 * @author mprins
 */
@StrictBinding
public class AdvancedFunctionsActionBean implements ActionBean, ProgressUpdateListener {

  private static final Log LOG = LogFactory.getLog(AdvancedFunctionsActionBean.class);

  /** JSP that {@link #form()} forwards to while no function has completed. */
  private static final String JSP = "/WEB-INF/jsp/transform/advancedfunctions.jsp";

  /** JSP used as wait page and as result page of {@link #perform()}. */
  private static final String JSP_PROGRESS = "/WEB-INF/jsp/transform/advancedfunctionsprogress.jsp";

  private ActionBeanContext context;

  /** The selectable functions, rebuilt for every request before binding and validation. */
  private List<AdvancedFunctionProcess> advancedFunctionProcesses;

  /** Name of the function to run; must match the name of one of the configured processes. */
  @Validate(required = true, on = "perform")
  private String advancedFunctionProcessName;

  /** Percentage done, derived from {@link #processed} and {@link #total}. */
  private double progress;

  /** Number of items the running function expects to handle. */
  private long total;

  /** Number of items handled so far. */
  private long processed;

  /** Set to {@code true} once {@link #perform()} has finished, successfully or not. */
  private boolean complete;

  /** Moment {@link #perform()} started the selected function. */
  private Date start;

  /** Moment of the most recent progress report. */
  private Date update;

  /** Stack trace of the last throwable reported through {@link #exception(Throwable)}. */
  private String exceptionStacktrace;

  // Display names of the available functions; they double as the keys perform() dispatches on.
  // These are compile-time constants, so they are declared static instead of per instance.
  private static final String NHR_FIX_TYPERING = "Fix 'typering' en 'clazz' van nHR persoon";
  private static final String NHR_ARCHIVING =
      "Opschonen en archiveren van nHR berichten met status RSGB_OK, ouder dan 3 maanden";
  private static final String NHR_REMOVAL = "Verwijderen van nHR berichten met status ARCHIVE";
  private static final String NHR_OPNIEUW_VERWERKEN = "Opnieuw verwerken van nHR berichten";
  private static final String WOZ_OPNIEUW_VERWERKING = "Originele WOZ berichten opnieuw verwerken";

  // <editor-fold defaultstate="collapsed" desc="getters en setters">
  /**
   * Returns the Stripes context that was handed to this bean.
   *
   * @return the context set through {@link #setContext(ActionBeanContext)}
   */
  @Override
  public ActionBeanContext getContext() {
    return context;
  }

  /**
   * Stores the Stripes context for this bean.
   *
   * @param context the context to use
   */
  @Override
  public void setContext(ActionBeanContext context) {
    this.context = context;
  }

  /**
   * Returns the progress value; {@link #progress(long)} computes it as a percentage of the total.
   *
   * @return the current progress value
   */
  public double getProgress() {
    return progress;
  }

  /**
   * Overwrites the progress value.
   *
   * @param progress the new progress value
   */
  public void setProgress(double progress) {
    this.progress = progress;
  }

  /**
   * Returns the number of items the running function expects to handle.
   *
   * @return the total
   */
  public long getTotal() {
    return total;
  }

  /**
   * Overwrites the expected number of items.
   *
   * @param total the new total
   */
  public void setTotal(long total) {
    this.total = total;
  }

  /**
   * Returns the number of items handled so far.
   *
   * @return the processed count
   */
  public long getProcessed() {
    return processed;
  }

  /**
   * Overwrites the number of items handled so far.
   *
   * @param processed the new processed count
   */
  public void setProcessed(long processed) {
    this.processed = processed;
  }

  /**
   * Tells whether {@link #perform()} has finished; {@link #form()} uses this to pick its view.
   *
   * @return {@code true} when a function has finished
   */
  public boolean isComplete() {
    return complete;
  }

  /**
   * Overwrites the completion flag.
   *
   * @param complete the new flag value
   */
  public void setComplete(boolean complete) {
    this.complete = complete;
  }

  /**
   * Returns the moment the selected function was started.
   *
   * @return the start time, or {@code null} when nothing has been started
   */
  public Date getStart() {
    return start;
  }

  /**
   * Overwrites the start time.
   *
   * @param start the new start time
   */
  public void setStart(Date start) {
    this.start = start;
  }

  /**
   * Returns the moment of the most recent progress report.
   *
   * @return the last update time, or {@code null} when no progress was reported yet
   */
  public Date getUpdate() {
    return update;
  }

  /**
   * Overwrites the time of the most recent progress report.
   *
   * @param update the new update time
   */
  public void setUpdate(Date update) {
    this.update = update;
  }

  /**
   * Returns the stack trace recorded by {@link #exception(Throwable)}.
   *
   * @return the stack trace text, or {@code null} when no throwable was reported
   */
  public String getExceptionStacktrace() {
    return exceptionStacktrace;
  }

  /**
   * Overwrites the recorded stack trace.
   *
   * @param exceptionStacktrace the new stack trace text
   */
  public void setExceptionStacktrace(String exceptionStacktrace) {
    this.exceptionStacktrace = exceptionStacktrace;
  }

  /**
   * Listener callback that records how many items are going to be handled.
   *
   * @param total the expected number of items
   */
  @Override
  public void total(long total) {
    this.total = total;
  }

  /**
   * Listener callback that records the number of handled items, recomputes the percentage and
   * stamps the update time.
   *
   * @param progress the number of items handled so far
   */
  @Override
  public void progress(long progress) {
    this.processed = progress;
    // without a known total the percentage is left at its previous value (avoids dividing by 0)
    if (this.total != 0) {
      this.progress = (100.0 / this.total) * this.processed;
    }
    this.update = new Date();
  }

  /**
   * Listener callback that keeps the complete stack trace of a reported throwable as text.
   *
   * @param t the throwable to record
   */
  @Override
  public void exception(Throwable t) {
    final StringWriter buffer = new StringWriter();
    final PrintWriter sink = new PrintWriter(buffer);
    t.printStackTrace(sink);
    final String trace = buffer.toString();
    this.exceptionStacktrace = trace;
  }

  /**
   * Returns the list of selectable functions.
   *
   * @return the configured processes
   */
  public List<AdvancedFunctionProcess> getAdvancedFunctionProcesses() {
    return advancedFunctionProcesses;
  }

  /**
   * Replaces the list of selectable functions.
   *
   * @param advancedFunctionProcesses the processes to offer
   */
  public void setAdvancedFunctionProcesses(
      List<AdvancedFunctionProcess> advancedFunctionProcesses) {
    this.advancedFunctionProcesses = advancedFunctionProcesses;
  }

  /**
   * Returns the name of the function selected for execution.
   *
   * @return the selected process name
   */
  public String getAdvancedFunctionProcessName() {
    return advancedFunctionProcessName;
  }

  /**
   * Sets the name of the function to execute; {@link #perform()} matches it against the names of
   * the configured processes.
   *
   * @param advancedFunctionProcessName the selected process name
   */
  public void setAdvancedFunctionProcessName(String advancedFunctionProcessName) {
    this.advancedFunctionProcessName = advancedFunctionProcessName;
  }

  // </editor-fold>

  /**
   * Builds the fixed list of functions this bean offers. Runs before binding and validation so the
   * list is available to every handler.
   */
  @Before(stages = LifecycleStage.BindingAndValidation)
  public void populateAdvancedFunctionProcesses() {
    // when adding a new process, also update the wiki:
    // https://github.com/B3Partners/brmo/wiki/Geavanceerde-functies
    final AdvancedFunctionProcess archiveNhr =
        new AdvancedFunctionProcess(
            NHR_ARCHIVING, BrmoFramework.BR_NHR, Bericht.STATUS.RSGB_OK.toString());
    final AdvancedFunctionProcess removeNhr =
        new AdvancedFunctionProcess(
            NHR_REMOVAL, BrmoFramework.BR_NHR, Bericht.STATUS.ARCHIVE.toString());
    final AdvancedFunctionProcess fixNhrTypering =
        new AdvancedFunctionProcess(NHR_FIX_TYPERING, BrmoFramework.BR_NHR, null);
    // NOTE(review): perform() hands the third argument to replayNHRVerwerking as the status
    // filter; with null no bericht is expected to match - confirm this is intended
    final AdvancedFunctionProcess replayNhr =
        new AdvancedFunctionProcess(NHR_OPNIEUW_VERWERKEN, BrmoFramework.BR_NHR, null);
    final AdvancedFunctionProcess replayWoz =
        new AdvancedFunctionProcess(WOZ_OPNIEUW_VERWERKING, BrmoFramework.BR_WOZ, null);

    advancedFunctionProcesses =
        Arrays.asList(archiveNhr, removeNhr, fixNhrTypering, replayNhr, replayWoz);
  }

  /**
   * Default handler: shows the progress page once a function has completed, the selection form
   * otherwise.
   *
   * @return a forward to the matching JSP
   */
  @DefaultHandler
  public Resolution form() {
    if (complete) {
      return new ForwardResolution(JSP_PROGRESS);
    }
    return new ForwardResolution(JSP);
  }

  /**
   * Runs the function whose name equals {@code advancedFunctionProcessName}. An unknown name
   * results in a message and a forward back to the form. Any throwable raised by the function is
   * logged and turned into a message; the bean is marked complete in all cases.
   *
   * @return a forward to the form when the name is unknown, to the progress page otherwise
   */
  @WaitPage(path = JSP_PROGRESS, delay = 1000, refresh = 1000)
  public Resolution perform() {
    final AdvancedFunctionProcess process =
        advancedFunctionProcesses.stream()
            .filter(candidate -> candidate.getName().equals(advancedFunctionProcessName))
            .findFirst()
            .orElse(null);
    if (process == null) {
      getContext().getMessages().add(new SimpleMessage("Ongeldig proces"));
      return new ForwardResolution(JSP);
    }

    start = new Date();
    LOG.info("Start process: " + process.getName());
    processed = 0;

    try {
      // dispatch on the display name of the selected process
      final String name = process.getName();
      if (name.equals(NHR_ARCHIVING)) {
        cleanupBerichten(process.getConfig(), BrmoFramework.BR_NHR);
      } else if (name.equals(NHR_REMOVAL)) {
        deleteBerichten(process.getConfig(), BrmoFramework.BR_NHR);
      } else if (name.equals(NHR_OPNIEUW_VERWERKEN)) {
        replayNHRVerwerking(process.getSoort(), process.getConfig());
      } else if (name.equals(NHR_FIX_TYPERING)) {
        fixNHRTypering(process.getSoort(), process.getConfig());
      } else if (name.equals(WOZ_OPNIEUW_VERWERKING)) {
        replayWOZVerwerking();
      }

      // only report success when no listener callback recorded a failure
      if (this.exceptionStacktrace == null) {
        getContext().getMessages().add(new SimpleMessage("Geavanceerde functie afgerond."));
      }
    } catch (Throwable t) {
      LOG.error("Fout bij uitvoeren geavanceerde functie", t);
      final StringBuilder message =
          new StringBuilder("Fout bij uitvoeren geavanceerde functie: ")
              .append(ExceptionUtils.getMessage(t));
      if (t.getCause() != null) {
        message.append(", oorzaak: ").append(ExceptionUtils.getRootCauseMessage(t));
      }
      getContext().getMessages().add(new SimpleMessage(message.toString()));
    } finally {
      complete = true;
    }
    return new ForwardResolution(JSP_PROGRESS);
  }

  /**
   * Reads all rows of {@code eerder_geladen_woz} in pages, parses the stored original XML of each
   * row with a {@link WozXMLReader} and writes the resulting berichten to staging: an existing
   * bericht is updated, otherwise a new one is written. Progress is reported after every page.
   *
   * @throws Exception when reading, parsing or writing fails; the staging proxy is closed in any
   *     case
   */
  private void replayWOZVerwerking() throws Exception {
    final DataSource dataSourceStaging = ConfigUtil.getDataSourceStaging();
    final StagingProxy stagingProxy = new StagingProxy(dataSourceStaging);

    try (Connection conn = dataSourceStaging.getConnection()) {
      final GeometryJdbcConverter geomToJdbc =
          GeometryJdbcConverterFactory.getGeometryJdbcConverter(conn);

      Number o =
          new QueryRunner(geomToJdbc.isPmdKnownBroken())
              .query(conn, "SELECT count(*) FROM eerder_geladen_woz", new ScalarHandler<>());
      final int count = o.intValue();

      this.total(count);
      LOG.info("Aantal te verwerken WOZ berichten: " + count);

      final int batch = 10000;
      // offset based paging needs a stable sort order, otherwise pages may overlap or skip rows
      final String selectSql =
          "SELECT id, br_orgineel_xml, laadprocesid, datum FROM eerder_geladen_woz ORDER BY id";
      for (int offset = 0; offset < count; offset += batch) {
        LOG.info(
            "Ophalen WOZ berichten vanaf offset: "
                + offset
                + " tot: "
                + (offset + batch)
                + " van: "
                + count);
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
          ps = conn.prepareStatement(geomToJdbc.buildPaginationSql(selectSql, offset, batch));
          rs = ps.executeQuery();
          while (rs.next()) {
            // read each column once per row instead of once per bericht
            final long laadProcesId = rs.getLong("laadprocesid");
            final long id = rs.getLong("id");
            LOG.trace(
                "Verwerken WOZ bericht voor laadprocesid: " + laadProcesId + " met id: " + id);
            InputStream origineelXMLInputStream =
                new ByteArrayInputStream(
                    rs.getString("br_orgineel_xml").getBytes(StandardCharsets.UTF_8));
            WozXMLReader reader =
                new WozXMLReader(
                    origineelXMLInputStream, /*rs.getDate("datum")*/ null, stagingProxy);

            while (reader.hasNext()) {
              final Bericht b = reader.next();
              b.setLaadProcesId(laadProcesId);
              b.setStatus(Bericht.STATUS.STAGING_OK);
              b.setStatusDatum(new Date());
              b.setSoort(BrmoFramework.BR_WOZ);
              b.setOpmerking("Herstel van eerder geladen WOZ bericht");
              if (null == b.getObjectRef()) {
                b.setStatus(Bericht.STATUS.STAGING_NOK);
                b.setOpmerking(Bericht.GEEN_OBJECT_REF_MSG);
              }

              // the first bericht of a row is matched on the row id, any further bericht is
              // looked up through the staging proxy
              final Bericht existingBericht;
              if (b.getVolgordeNummer() == 1) {
                existingBericht = stagingProxy.getBerichtById(id);
              } else {
                existingBericht = stagingProxy.getExistingBericht(b);
              }

              if (existingBericht == null) {
                stagingProxy.writeBericht(b);
              } else {
                b.setId(existingBericht.getId());
                stagingProxy.updateBericht(b);
              }
            }
            processed++;
          }
        } finally {
          // release the cursor and statement of this page, also when a row fails
          closeQuietly(rs);
          closeQuietly(ps);
        }
        progress(processed);
      }
    } finally {
      stagingProxy.closeStagingProxy();
      LOG.info("Originele WOZ berichten opnieuw verwerken afgerond.");
    }
  }

  /**
   * Archives staging berichten of the given soort and status whose status date is more than three
   * months old: both XML columns are overwritten with {@code "opgeschoond"}, the old status is
   * noted in the remark and the status becomes {@code ARCHIVE}. Berichten are handled in batches
   * until a batch comes back empty; progress is reported after every batch.
   *
   * @param config the bericht status to select
   * @param soort the bericht soort to select
   * @throws Exception when a query fails or a row cannot be archived; the connection is released
   *     in any case
   */
  public void cleanupBerichten(String config, String soort) throws Exception {
    // archived rows no longer match the status filter, so every batch starts at the first row
    final int offset = 0;
    final int batch = 1000;
    int done = 0;
    final MutableInt processed = new MutableInt(0);
    final DataSource dataSourceStaging = ConfigUtil.getDataSourceStaging();
    final Connection conn = dataSourceStaging.getConnection();
    try {
      final GeometryJdbcConverter geomToJdbc =
          GeometryJdbcConverterFactory.getGeometryJdbcConverter(conn);
      final RowProcessor processor = new StagingRowHandler();
      final QueryRunner runner = new QueryRunner(geomToJdbc.isPmdKnownBroken());

      Calendar c = Calendar.getInstance();
      c.setTime(new Date());
      c.add(Calendar.MONTH, -3);
      final Timestamp cutoff = new Timestamp(c.getTimeInMillis());

      // soort and status are bound as parameters instead of being pasted into the SQL text
      final String filter = " where soort = ? and status = ? and status_datum < ? ";

      final String countsql = "select count(*) from " + BrmoFramework.BERICHT_TABLE + filter;
      final Number o = runner.query(conn, countsql, new ScalarHandler<>(), soort, config, cutoff);
      // the driver decides the Number subtype of count(*); longValue() handles all of them
      total(o.longValue());

      final String sql =
          geomToJdbc.buildPaginationSql(
              "select * from " + BrmoFramework.BERICHT_TABLE + filter + " order by id ",
              offset,
              batch);
      final String updatesql =
          "update "
              + BrmoFramework.BERICHT_TABLE
              + " set status_datum = ?, status = ?, opmerking = ?, br_xml = ?, br_orgineel_xml = ? where id = ?";

      do {
        LOG.debug(
            String.format(
                "Ophalen berichten batch met offset %d, limit %d, tot datum %tc, voortgang %d",
                offset, batch, c, done));
        LOG.debug("SQL voor ophalen berichten batch: " + sql);

        processed.setValue(0);
        // the handler hands back the first row failure instead of throwing it, so the progress
        // made within the batch can still be reported before it is rethrown
        final Exception e =
            runner.query(
                conn,
                sql,
                rs -> {
                  while (rs.next()) {
                    try {
                      Bericht bericht = processor.toBean(rs, Bericht.class);

                      bericht.setBrOrgineelXml("opgeschoond");
                      bericht.setBrXml("opgeschoond");
                      bericht.setOpmerking(
                          "opgeschoond, status was: " + bericht.getStatus().toString());
                      bericht.setStatus(Bericht.STATUS.ARCHIVE);
                      bericht.setStatusDatum(new Date());

                      runner.update(
                          conn,
                          updatesql,
                          new Timestamp(bericht.getStatusDatum().getTime()),
                          bericht.getStatus().toString(),
                          bericht.getOpmerking(),
                          bericht.getBrXml(),
                          bericht.getBrOrgineelXml(),
                          bericht.getId());
                    } catch (Exception e1) {
                      return e1;
                    }
                    processed.increment();
                  }
                  return null;
                },
                soort,
                config,
                cutoff);
        done += processed.intValue();

        progress(done);

        // If handler threw exception processing row, rethrow it
        if (e != null) {
          throw e;
        }
      } while (processed.intValue() > 0);
    } finally {
      // release the connection on every exit path
      closeQuietly(conn);
    }
  }

  /**
   * Deletes all staging berichten of the given soort and status. The number of matching berichten
   * is reported as total, the number of rows removed by the delete as progress.
   *
   * @param config the bericht status to select
   * @param soort the bericht soort to select
   * @throws Exception when counting or deleting fails; the connection is released in any case
   */
  public void deleteBerichten(String config, String soort) throws Exception {
    final DataSource dataSourceStaging = ConfigUtil.getDataSourceStaging();
    final Connection conn = dataSourceStaging.getConnection();
    try {
      final GeometryJdbcConverter geomToJdbc =
          GeometryJdbcConverterFactory.getGeometryJdbcConverter(conn);
      final QueryRunner runner = new QueryRunner(geomToJdbc.isPmdKnownBroken());

      // soort and status are bound as parameters instead of being pasted into the SQL text
      final String filter = " WHERE soort = ? AND status = ?";

      final Number o =
          runner.query(
              conn,
              "select count(*) from " + BrmoFramework.BERICHT_TABLE + filter,
              new ScalarHandler<>(),
              soort,
              config);
      // the driver decides the Number subtype of count(*); longValue() handles all of them
      total(o.longValue());
      LOG.debug("Totaal te verwijderen " + config + " berichten: " + o);

      final int deleted =
          runner.update(conn, "DELETE FROM " + BrmoFramework.BERICHT_TABLE + filter, soort, config);
      progress(deleted);
    } finally {
      // release the connection on every exit path
      closeQuietly(conn);
    }
  }

  /**
   * Reprocesses the staging berichten of the given soort and status: each bericht is put back on
   * {@code STAGING_OK}, transformed and handled by an {@link RsgbProxy}, after which the new
   * processing log is stored in front of the old one in the remark column. Berichten are fetched
   * in batches ordered by id until a batch comes back empty.
   *
   * <p>NOTE(review): {@code perform()} passes the config of the selected process as {@code
   * status}, and that process is created with {@code null} there; no bericht is expected to match
   * in that case - confirm whether {@code RSGB_NOK} was intended.
   *
   * @param soort soort bericht
   * @param status bericht status
   * @throws SQLException if any
   * @throws BrmoException if any
   * @throws Exception if any; the connection and both proxies are released in any case
   */
  public void replayNHRVerwerking(String soort, String status)
      throws SQLException, BrmoException, Exception {
    int offset = 0;
    final int batch = 1000;
    final MutableInt processed = new MutableInt(0);
    final DataSource dataSourceStaging = ConfigUtil.getDataSourceStaging();
    final DataSource dataSourceRsgb = ConfigUtil.getDataSourceRsgb();
    final Connection conn = dataSourceStaging.getConnection();
    try {
      final GeometryJdbcConverter geomToJdbc =
          GeometryJdbcConverterFactory.getGeometryJdbcConverter(conn);
      final RowProcessor processor = new StagingRowHandler();
      final QueryRunner runner = new QueryRunner(geomToJdbc.isPmdKnownBroken());

      LOG.debug("staging datasource: " + dataSourceStaging);
      LOG.debug("rsgb datasource: " + dataSourceRsgb);

      // soort and status are bound as parameters instead of being pasted into the SQL text
      final String filter = " where soort = ? and status = ?";

      final String countsql = "select count(id) from " + BrmoFramework.BERICHT_TABLE + filter;
      LOG.debug("SQL voor tellen van berichten batch: " + countsql);
      final Number o = runner.query(conn, countsql, new ScalarHandler<>(), soort, status);
      LOG.debug("Totaal te verwerken verwijder berichten: " + o);

      // the driver decides the Number subtype of the count; longValue() handles all of them
      total(o.longValue());

      final StagingProxy staging = new StagingProxy(dataSourceStaging);
      try {
        final RsgbProxy rsgb =
            new RsgbProxy(dataSourceRsgb, null, staging, Bericht.STATUS.RSGB_NOK, this);
        try {
          rsgb.setErrorState(getContext().getServletContext().getInitParameter("error.state"));
          rsgb.setOrderBerichten(true);
          rsgb.init();

          do {
            LOG.debug(
                String.format("Ophalen berichten batch met offset %d, limit %d", offset, batch));
            final String sql =
                geomToJdbc.buildPaginationSql(
                    "select * from " + BrmoFramework.BERICHT_TABLE + filter + " order by id",
                    offset,
                    batch);
            LOG.debug("SQL voor ophalen berichten batch: " + sql);

            processed.setValue(0);
            // the handler hands back the first row failure instead of throwing it, so the
            // progress made within the batch can still be reported before it is rethrown
            final Exception e =
                runner.query(
                    conn,
                    sql,
                    rs -> {
                      while (rs.next()) {
                        try {
                          Bericht bericht = processor.toBean(rs, Bericht.class);
                          LOG.debug("Opnieuw verwerken van bericht: " + bericht);
                          // keep the old log
                          String oudeOpmerkingen = bericht.getOpmerking();
                          // force processing by putting the bericht on STAGING_OK and then
                          // handling it again
                          bericht.setStatus(Bericht.STATUS.STAGING_OK);
                          runner.update(
                              conn,
                              "update "
                                  + BrmoFramework.BERICHT_TABLE
                                  + " set status_datum = ?, status = ? where id = ?",
                              new Timestamp(bericht.getStatusDatum().getTime()),
                              bericht.getStatus().toString(),
                              bericht.getId());

                          rsgb.handle(bericht, rsgb.transformToTableData(bericht), true);

                          bericht.setOpmerking(
                              "Opnieuw verwerkt met geavanceerde functies optie.\nNieuwe verwerkingslog (oude log daaronder)\n"
                                  + bericht.getOpmerking()
                                  + "\n\nOude verwerkingslog\n\n"
                                  + oudeOpmerkingen);
                          bericht.setStatusDatum(new Date());
                          runner.update(
                              conn,
                              "update "
                                  + BrmoFramework.BERICHT_TABLE
                                  + " set opmerking = ? where id = ?",
                              bericht.getOpmerking(),
                              bericht.getId());
                        } catch (Exception e1) {
                          return e1;
                        }
                        processed.increment();
                      }
                      return null;
                    },
                    soort,
                    status);
            offset += processed.intValue();

            progress(offset);

            // If handler threw exception processing row, rethrow it
            if (e != null) {
              throw e;
            }
          } while (processed.intValue() > 0);
        } finally {
          rsgb.close();
        }
      } finally {
        // the staging proxy was created here, so it is closed here as well
        staging.closeStagingProxy();
      }
    } finally {
      // release the connection on every exit path
      closeQuietly(conn);
    }
  }

  /**
   * Removes the single quotes around the typering and clazz value of some nHR persoon records by
   * means of an SQL update. Fix for issue #527, {@code 'INGESCHREVEN NIET-NATUURLIJK PERSOON'}
   * must become {@code INGESCHREVEN NIET-NATUURLIJK PERSOON} (cut off at 35 characters for the
   * 'ingeschr_niet_nat_prs' table/'typering' column).
   *
   * <p>For every affected table the matching rows are counted and added to the total, after which
   * one update statement corrects all of them; the number of rows changed by that statement is
   * added to the progress.
   *
   * @param soort soort bericht (not used)
   * @param status bericht status (not used)
   * @throws SQLException if any
   * @throws BrmoException if any
   * @throws Exception if any; the connection is released in any case
   */
  public void fixNHRTypering(String soort, String status)
      throws SQLException, BrmoException, Exception {

    final DataSource dataSourceRsgb = ConfigUtil.getDataSourceRsgb();
    final Connection conn = dataSourceRsgb.getConnection();
    try {
      final GeometryJdbcConverter geomToJdbc =
          GeometryJdbcConverterFactory.getGeometryJdbcConverter(conn);
      final QueryRunner runner = new QueryRunner(geomToJdbc.isPmdKnownBroken());

      final String was = "'INGESCHREVEN NIET-NATUURLIJK PERSOON'";
      final String wordt = was.replace("'", "");
      // the typering column is narrower than the clazz column
      final int typeringColWidth = 35;

      // affected table + column
      final Map<String, String> tables = new HashMap<>();
      tables.put("subject", "clazz");
      tables.put("prs", "clazz");
      tables.put("niet_nat_prs", "clazz");
      tables.put("ingeschr_niet_nat_prs", "typering");

      for (Map.Entry<String, String> table : tables.entrySet()) {
        final boolean truncated = table.getValue().equalsIgnoreCase("typering");

        // the stored value itself contains single quotes; inside an SQL string literal each of
        // them is written twice, hence the extra quote in front of (and behind) the value
        final String oldLiteral =
            truncated ? "'" + was.substring(0, typeringColWidth) : "'" + was + "'";
        final String newLiteral = truncated ? wordt.substring(0, typeringColWidth) : wordt;
        final String where = " where " + table.getValue() + " = '" + oldLiteral + "'";

        final String countsql = "select count(*) from " + table.getKey() + where;
        LOG.debug("SQL voor tellen van berichten batch: " + countsql);

        final Number o = runner.query(conn, countsql, new ScalarHandler<>());
        LOG.info("Totaal te bewerken records in tabel/kolom: " + table + " is: " + o);

        // the driver decides the Number subtype of count(*); longValue() handles all of them
        total(this.total + o.longValue());

        // one statement corrects every matching row of this table
        final String updatesql =
            "update "
                + table.getKey()
                + " set "
                + table.getValue()
                + " = '"
                + newLiteral
                + "'"
                + where;
        LOG.trace("SQL voor bijwerken records: " + updatesql);
        final int updated = runner.update(conn, updatesql);

        progress(this.processed + updated);
      }
    } finally {
      // release the connection on every exit path
      closeQuietly(conn);
    }
  }
}