TopNLDirectoryScanner.java

/*
 * Copyright (C) 2017 B3Partners B.V.
 */
package nl.b3p.brmo.service.scanner;

import static nl.b3p.brmo.persistence.staging.AutomatischProces.ProcessingStatus.ERROR;
import static nl.b3p.brmo.persistence.staging.AutomatischProces.ProcessingStatus.PROCESSING;
import static nl.b3p.brmo.persistence.staging.AutomatischProces.ProcessingStatus.WAITING;
import static nl.b3p.topnl.TopNLType.*;

import java.io.File;
import java.io.FilenameFilter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.persistence.Transient;
import nl.b3p.brmo.loader.util.BrmoException;
import nl.b3p.brmo.persistence.staging.AutomatischProces;
import nl.b3p.brmo.persistence.staging.ClobElement;
import nl.b3p.brmo.persistence.staging.LaadProces;
import nl.b3p.brmo.persistence.staging.TopNLScannerProces;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.stripesstuff.stripersist.Stripersist;

/**
 * Directory scanner for topNL 10/50/100/250 files.
 *
 * @author Meine Toonen
 */
public class TopNLDirectoryScanner extends AbstractExecutableProces {

  private static final Log LOG = LogFactory.getLog(TopNLDirectoryScanner.class);

  private final TopNLScannerProces config;
  private final int defaultCommitPageSize = 1000;
  @Transient private ProgressUpdateListener listener;

  @Transient private Integer filterAlVerwerkt = 0;
  @Transient private Integer aantalGeladen = 0;
  @Transient private Integer progress = 0;

  public static final String[] subdirectoryNames = {
    TOP10NL.getType(), TOP50NL.getType(), TOP100NL.getType(), TOP250NL.getType()
  };

  public TopNLDirectoryScanner(TopNLScannerProces config) {
    this.config = config;
  }

  @Override
  public void execute() throws BrmoException {
    this.execute(
        new ProgressUpdateListener() {
          @Override
          public void total(long total) {}

          @Override
          public void progress(long progress) {}

          @Override
          public void exception(Throwable t) {
            LOG.error(t);
          }

          @Override
          public void updateStatus(String status) {}

          @Override
          public void addLog(String log) {}
        });
  }

  @Override
  public void execute(ProgressUpdateListener listener) {
    this.listener = listener;
    config.setStatus(PROCESSING);
    StringBuilder sb = new StringBuilder(AutomatischProces.LOG_NEWLINE);
    String oldLog = config.getLogfile();
    if (oldLog != null) {
      if (oldLog.length() > OLD_LOG_LENGTH) {
        sb.append(oldLog.substring(oldLog.length() - OLD_LOG_LENGTH / 10));
      } else {
        sb.append(oldLog);
      }
    }

    String msg =
        String.format(
            "De TopNL scanner met ID %d is gestart op %tc.",
            config.getId(), Calendar.getInstance());
    LOG.info(msg);
    listener.addLog(msg);
    sb.append(msg);

    // validatie van de directories, kunnen we lezen/bladeren en evt. schrijven?
    final File scanDirectory = new File(this.config.getScanDirectory());
    if (!scanDirectory.isDirectory() || !scanDirectory.canExecute()) {
      config.setStatus(ERROR);
      msg = String.format("De scan directory '%s' is geen executable directory", scanDirectory);
      config.setLogfile(msg);
      config.setSamenvatting("Er is een fout opgetreden, details staan in de logs");
      this.listener.exception(new BrmoException(msg));
      return;
    }

    config.setLogfile(sb.toString());
    int total = 0;
    Map<String, List<File>> filesPerDir = new HashMap<>();
    FilenameFilter ff = (dir, name) -> name.toLowerCase().endsWith(".gml");
    for (String topNLDir : subdirectoryNames) {
      File subdir = new File(scanDirectory, topNLDir);
      // File files[] = subdir.listFiles();
      List<File> fs = getFilesFromDirectory(subdir, ff);
      total += fs.size();
      filesPerDir.put(topNLDir, fs);
    }

    listener.total(total);
    for (String topNLDir : subdirectoryNames) {
      List<File> files = filesPerDir.get(topNLDir);
      Collections.sort(files);

      processTopNLDirectory(files, scanDirectory, topNLDir);
    }

    Stripersist.getEntityManager().flush();
    Stripersist.getEntityManager().getTransaction().commit();
    Stripersist.getEntityManager().clear();
  }

  private List<File> getFilesFromDirectory(File dir, FilenameFilter ff) {
    List<File> files = new ArrayList<>();
    File[] fs = dir.listFiles(ff);
    files.addAll(Arrays.asList(fs));

    File[] dirs = dir.listFiles();
    for (File d : dirs) {
      if (d.isDirectory()) {
        files.addAll(getFilesFromDirectory(d, ff));
      }
    }

    return files;
  }

  /**
   * verwerk een bestandenlijst.
   *
   * @param files array met xml bestanden
   * @param scanDirectory directory met xml bestanden
   * @param soort aanduiding
   */
  private void processTopNLDirectory(List<File> files, File scanDirectory, String soort) {
    StringBuilder sb = new StringBuilder(AutomatischProces.LOG_NEWLINE + config.getLogfile());
    String msg;
    SimpleDateFormat sdf = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
    int commitPageSize = this.getCommitPageSize();

    msg = String.format("Laden van TopNL type %s.", soort);
    LOG.info(msg);

    listener.addLog(msg);
    for (File f : files) {
      if (f.isDirectory()) {
        continue;
      }
      msg = String.format("Bestand %s is gevonden in %s.", f, scanDirectory);
      LOG.info(msg);
      listener.addLog(msg);
      sb.append(AutomatischProces.LOG_NEWLINE).append(msg).append(AutomatischProces.LOG_NEWLINE);
      if (this.isDuplicaatLaadProces(f, soort)) {
        msg = String.format("  Bestand %s is een duplicaat en wordt overgeslagen.", f);
        listener.addLog(msg);
        LOG.info(msg);
        sb.append(msg).append(AutomatischProces.LOG_NEWLINE);
        filterAlVerwerkt++;
      } else {
        LaadProces lp = new LaadProces();
        lp.setBestand_naam(getBestandsNaam(f));
        lp.setBestand_datum(getBestandsDatum(f));
        lp.setSoort(soort);
        lp.setStatus(LaadProces.STATUS.STAGING_OK);
        lp.setOpmerking(
            String.format(
                "Type %s bestand geladen van %s op %s",
                soort, f.getAbsolutePath(), sdf.format(new Date())));
        lp.setAutomatischProces(
            Stripersist.getEntityManager().find(AutomatischProces.class, config.getId()));
        Stripersist.getEntityManager().persist(lp);
        Stripersist.getEntityManager().merge(this.config);

        aantalGeladen++;
        msg =
            String.format(
                "  Bestand %s is geladen en heeft status: %s. En is van soort: %s",
                f, lp.getStatus(), soort);
        LOG.info(msg);
        this.listener.addLog(msg);
        sb.append(msg).append(AutomatischProces.LOG_NEWLINE);
        if (aantalGeladen % commitPageSize == 0) {
          LOG.debug("Tussentijds opslaan van berichten, 'commitPageSize' is bereikt");
          Stripersist.getEntityManager().flush();
          Stripersist.getEntityManager().getTransaction().commit();
          Stripersist.getEntityManager().clear();
        }
      }
      listener.progress(++progress);
    }
    msg = String.format("Klaar met run op %tc", Calendar.getInstance());
    LOG.info(msg);
    listener.updateStatus(msg);
    listener.addLog(msg);
    sb.append(msg);

    listener.addLog("\n\n**** resultaat ****");
    listener.addLog("\nAantal bestanden die al waren geladen: " + filterAlVerwerkt);
    listener.addLog("\nAantal bestanden geladen: " + aantalGeladen + "\n");

    config.setStatus(WAITING);
    config.setLogfile(sb.toString());
    config.setLastrun(new Date());
    config.updateSamenvattingEnLogfile(
        "Aantal bestanden die al waren verwerkt: "
            + filterAlVerwerkt
            + AutomatischProces.LOG_NEWLINE
            + "Aantal bestanden geladen: "
            + aantalGeladen
            + AutomatischProces.LOG_NEWLINE);
    Stripersist.getEntityManager().merge(config);
  }

  private int getCommitPageSize() {
    int commitPageSize;
    try {
      String s = ClobElement.nullSafeGet(config.getConfig().get("commitPageSize"));
      commitPageSize = Integer.parseInt(s);
      if (commitPageSize < 1 || commitPageSize > defaultCommitPageSize) {
        commitPageSize = defaultCommitPageSize;
      }
    } catch (NumberFormatException nfe) {
      commitPageSize = defaultCommitPageSize;
    }

    LOG.debug("Instellen van commit page size op: " + commitPageSize);
    return commitPageSize;
  }
}