BerichtDoorsturenProces.java

package nl.b3p.brmo.service.scanner;

import java.util.Date;
import java.util.List;
import javax.persistence.Transient;
import nl.b3p.brmo.loader.util.BrmoException;
import nl.b3p.brmo.persistence.staging.AutomatischProces;
import nl.b3p.brmo.persistence.staging.Bericht;
import nl.b3p.brmo.persistence.staging.BerichtDoorstuurProces;
import nl.b3p.brmo.persistence.staging.ClobElement;
import nl.b3p.brmo.persistence.staging.GDS2OphaalProces;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.stripesstuff.stripersist.Stripersist;

/**
 * @author Matthijs Laan
 */
public class BerichtDoorsturenProces extends AbstractExecutableProces {

  private static final Log log = LogFactory.getLog(BerichtDoorsturenProces.class);

  private final BerichtDoorstuurProces config;

  private final int defaultCommitPageSize = 1000;

  @Transient private ProgressUpdateListener l;

  BerichtDoorsturenProces(BerichtDoorstuurProces config) {
    this.config = config;
  }

  @Override
  public void execute() throws BrmoException {
    this.execute(
        new ProgressUpdateListener() {
          /* doet bijna niks listener */
          @Override
          public void total(long total) {
            log.info("Totaal aantal opgehaalde berichten: " + total);
          }

          @Override
          public void progress(long progress) {}

          @Override
          public void exception(Throwable t) {
            log.error(t);
          }

          @Override
          public void updateStatus(String status) {}

          @Override
          public void addLog(String msg) {
            log.info(msg);
          }
        });
  }

  @Override
  public void execute(ProgressUpdateListener listener) {
    this.l = listener;
    try {
      l.updateStatus("Laden berichten...");
      this.config.setStatus(AutomatischProces.ProcessingStatus.PROCESSING);

      int commitPageSize = this.getCommitPageSize();

      String id = ClobElement.nullSafeGet(config.getConfig().get("gds2_ophaalproces_id"));
      GDS2OphaalProces proces =
          Stripersist.getEntityManager().find(GDS2OphaalProces.class, Long.parseLong(id));

      List<Long> berichtIDs =
          Stripersist.getEntityManager()
              .createQuery(
                  "select b.id from Bericht b join b.laadprocesid l where b.status in ('STAGING_OK', 'STAGING_NOK') and l.automatischProces = :proces",
                  Long.class)
              .setParameter("proces", proces)
              .getResultList();

      if (berichtIDs.isEmpty()) {
        this.config.setSamenvatting("Geen berichten om door te sturen");
      } else {
        this.l.total(berichtIDs.size());
        log.info(String.format("Er zijn %s berichten om door te sturen.", berichtIDs.size()));

        String url = ClobElement.nullSafeGet(proces.getConfig().get("delivery_endpoint"));

        if (url == null) {
          this.config.setSamenvatting("GDS2 ophaal proces heeft geen afleveringsendpoint");
          log.warn("GDS2 ophaal proces heeft geen afleveringsendpoint.");
        } else {
          int doorgestuurd = 0, fouten = 0, verwerkt = 0;
          for (Long pkid : berichtIDs) {
            Bericht b = Stripersist.getEntityManager().find(Bericht.class, pkid);
            if (GDS2OphalenProces.doorsturenBericht(proces, l, b, url)) {
              doorgestuurd++;
              this.l.progress(doorgestuurd);
            } else {
              fouten++;
            }
            Stripersist.getEntityManager().merge(b);
            verwerkt++;
            if (verwerkt % commitPageSize == 0) {
              log.info(
                  String.format(
                      "Tussentijds opslaan van berichten, 'commitPageSize' (%s) is bereikt, totaal aantal verwerkt : %s",
                      commitPageSize, verwerkt));
              Stripersist.getEntityManager().flush();
              Stripersist.getEntityManager().getTransaction().commit();
              Stripersist.getEntityManager().clear();
            }
          }
          this.config.setSamenvatting(
              "Berichten doorgestuurd: " + doorgestuurd + ", fouten: " + fouten);
        }
      }

      l.updateStatus(this.config.getSamenvatting());

      this.config.setStatus(AutomatischProces.ProcessingStatus.WAITING);
      this.config.setLastrun(new Date());
    } catch (Exception e) {
      this.config.setLastrun(new Date());
      this.config.setSamenvatting("Er is een fout opgetreden, details staan in de logs");
      this.config.setStatus(AutomatischProces.ProcessingStatus.ERROR);
      l.exception(e);
    } finally {
      if (Stripersist.getEntityManager().getTransaction().getRollbackOnly()) {
        // XXX bij rollback only wordt status niet naar ERROR gezet vanwege
        // rollback, zou in aparte transactie moeten
        Stripersist.getEntityManager().getTransaction().rollback();
      } else {
        Stripersist.getEntityManager().merge(this.config);
        Stripersist.getEntityManager().getTransaction().commit();
      }
    }
  }

  private int getCommitPageSize() {
    int commitPageSize;
    try {
      String s = ClobElement.nullSafeGet(config.getConfig().get("commitPageSize"));
      commitPageSize = Integer.parseInt(s);
      if (commitPageSize < 1 || commitPageSize > defaultCommitPageSize) {
        commitPageSize = defaultCommitPageSize;
      }
    } catch (NumberFormatException nfe) {
      commitPageSize = defaultCommitPageSize;
    }

    log.debug("Instellen van commit page size op: " + commitPageSize);
    return commitPageSize;
  }
}