BGTLoaderMain.java

/*
 * Copyright (C) 2021 B3Partners B.V.
 *
 * SPDX-License-Identifier: MIT
 */

package nl.b3p.brmo.bgt.loader.cli;

import static nl.b3p.brmo.bgt.loader.Utils.formatTimeSince;
import static nl.b3p.brmo.bgt.loader.Utils.getBundleString;
import static nl.b3p.brmo.bgt.loader.Utils.getLoaderVersion;
import static nl.b3p.brmo.bgt.loader.Utils.getMessageFormattedString;
import static nl.b3p.brmo.bgt.loader.Utils.getUserAgent;
import static nl.b3p.brmo.bgt.schema.BGTSchemaMapper.Metadata;
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
import nl.b3p.brmo.bgt.download.model.DeltaCustomDownloadRequest;
import nl.b3p.brmo.bgt.loader.BGTDatabase;
import nl.b3p.brmo.bgt.loader.ProgressReporter;
import nl.b3p.brmo.bgt.loader.ResumingBGTDownloadInputStream;
import nl.b3p.brmo.bgt.loader.Utils;
import nl.b3p.brmo.bgt.schema.BGTObjectTableWriter;
import nl.b3p.brmo.bgt.schema.BGTSchemaMapper;
import nl.b3p.brmo.sql.dialect.SQLDialect;
import nl.b3p.brmo.util.CountingSeekableByteChannel;
import nl.b3p.brmo.util.http.HttpSeekableByteChannel;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.io.input.CloseShieldInputStream;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.PropertyConfigurator;
import org.geotools.util.logging.Logging;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.ExitCode;
import picocli.CommandLine.IVersionProvider;
import picocli.CommandLine.Mixin;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

@Command(
    name = "bgt-loader",
    mixinStandardHelpOptions = true,
    versionProvider = BGTLoaderMain.class,
    resourceBundle = Utils.BUNDLE_NAME,
    subcommands = {DownloadCommand.class})
public class BGTLoaderMain implements IVersionProvider {
  /**
   * Matches ZIP entry names such as {@code bgt_wegdeel.gml} or {@code bgt_wegdeel.xml}; group 1
   * is the feature type name. The dot before the extension is escaped so that only a literal
   * {@code '.'} is accepted. Compiled once instead of on every entry.
   */
  private static final Pattern BGT_ZIP_ENTRY_PATTERN = Pattern.compile("bgt_(.+)\\.[xg]ml");

  private static Log log;

  /**
   * init logging.
   *
   * @param standAlone set to {@code false} when using in a preconfigured environment, eg. calling
   *     methods from a servlet, use {@code true} for commandline usage.
   */
  public static void configureLogging(boolean standAlone) {
    if (standAlone) {
      PropertyConfigurator.configure(
          BGTLoaderMain.class.getResourceAsStream("/bgt-loader-cli-log4.properties"));
      log = LogFactory.getLog(BGTLoaderMain.class);
      try {
        Logging.ALL.setLoggerFactory("org.geotools.util.logging.Log4JLoggerFactory");
      } catch (ClassNotFoundException ignored) {
        // Best effort: when the GeoTools Log4J logger factory is not on the classpath, GeoTools
        // logging is simply left unchanged.
      }
    } else {
      log = LogFactory.getLog(BGTLoaderMain.class);
    }
  }

  /**
   * Command line entry point: configures stand-alone logging, runs the picocli command and exits
   * the JVM with the command's exit code.
   *
   * @param args command line arguments
   */
  public static void main(String[] args) {
    configureLogging(true);
    CommandLine cmd = new CommandLine(new BGTLoaderMain()).setUsageHelpAutoWidth(true);
    System.exit(cmd.execute(args));
  }

  /**
   * Prints the SQL schema for the selected feature types in the given SQL dialect.
   *
   * @param dialectEnum SQL dialect to generate the schema for
   * @param featureTypeSelectionOptions selected feature types
   * @param tablePrefix prefix for the generated table names
   * @param showHelp handled by picocli (usage help)
   * @return {@link ExitCode#OK}
   * @throws SQLException when creating the dialect or printing the schema fails
   */
  @Command(name = "schema", sortOptions = false)
  public int schema(
      @Option(names = "--dialect", paramLabel = "<dialect>", defaultValue = "postgis")
          BGTDatabase.SQLDialectEnum dialectEnum,
      @Mixin FeatureTypeSelectionOptions featureTypeSelectionOptions,
      @Option(names = "--table-prefix", defaultValue = "", hidden = true) String tablePrefix,
      @Option(
              names = {"-h", "--help"},
              usageHelp = true)
          boolean showHelp)
      throws SQLException {
    SQLDialect dialect = BGTDatabase.createDialect(dialectEnum);
    // When 'all' or 'bgt' is selected, also add plaatsbepalingspunt so its table is included in
    // the generated schema.
    if (featureTypeSelectionOptions.featureTypes.contains("all")
        || featureTypeSelectionOptions.featureTypes.contains("bgt")) {
      featureTypeSelectionOptions
          .getFeatureTypes()
          .add(DeltaCustomDownloadRequest.FeaturetypesEnum.PLAATSBEPALINGSPUNT.getValue());
    }
    Set<String> tableNames =
        featureTypeSelectionOptions.getFeatureTypesList().stream()
            .map(DeltaCustomDownloadRequest.FeaturetypesEnum::getValue)
            .collect(Collectors.toSet());
    BGTSchemaMapper bgtSchemaMapper = BGTSchemaMapper.getInstance();
    bgtSchemaMapper.printSchema(
        dialect,
        tablePrefix,
        objectType ->
            tableNames.contains(bgtSchemaMapper.getTableNameForObjectType(objectType, "")));
    return ExitCode.OK;
  }

  /**
   * Loads a BGT file into the database. Supported inputs are an http(s) URL ending in {@code
   * .zip}, a local {@code .zip} file, or a local {@code .xml}/{@code .gml} file. After loading,
   * loader metadata is stored and the transaction is committed.
   *
   * @return {@link ExitCode#OK} on success, {@link ExitCode#USAGE} for an unsupported file
   *     extension, {@link ExitCode#SOFTWARE} when the writer reports no progress (nothing loaded)
   * @throws Exception on database, I/O or parsing errors
   */
  @Command(name = "load", sortOptions = false)
  public int load(
      @Mixin DatabaseOptions dbOptions,
      @Mixin LoadOptions loadOptions,
      @Mixin FeatureTypeSelectionOptions featureTypeSelectionOptions,
      @Parameters(paramLabel = "<file>") String file,
      @Mixin CLIOptions cliOptions,
      @Option(
              names = {"-h", "--help"},
              usageHelp = true)
          boolean showHelp)
      throws Exception {

    log.info(getUserAgent());

    try (BGTDatabase db = new BGTDatabase(dbOptions)) {
      BGTObjectTableWriter writer = db.createObjectTableWriter(loadOptions, dbOptions);

      ProgressReporter progressReporter =
          cliOptions.isConsoleProgressEnabled()
              ? new ConsoleProgressReporter()
              : new ProgressReporter();
      writer.setProgressUpdater(progressReporter);

      if (loadOptions.createSchema) {
        db.createMetadataTable(loadOptions);
      }

      if (file.endsWith(".zip") && (file.startsWith("http://") || file.startsWith("https://"))) {
        loadZipFromURI(new URI(file), writer, featureTypeSelectionOptions, loadOptions, true);
      } else if (file.endsWith(".zip")) {
        loadZip(new File(file), writer, featureTypeSelectionOptions);
      } else if (file.matches(".*\\.[xg]ml")) {
        loadXml(new File(file), writer);
      } else {
        log.error(getMessageFormattedString("load.invalid_extension", file));
        return ExitCode.USAGE;
      }

      BGTObjectTableWriter.BGTProgress progress = writer.getProgress();
      if (progress == null) {
        log.error(getBundleString("error.no_feature_types"));
        return ExitCode.SOFTWARE;
      }
      db.setMetadataValue(Metadata.LOADER_VERSION, getLoaderVersion());
      // Set feature types list from options, not MutatieInhoud (if input has it)...
      // FIXME if the downloaded initial extract has fewer object types, an update will fail --
      // should set this to only the feature types actually encountered.
      db.setFeatureTypesEnumMetadata(featureTypeSelectionOptions.getFeatureTypesList());
      db.setMetadataValue(Metadata.INCLUDE_HISTORY, String.valueOf(loadOptions.includeHistory));
      db.setMetadataValue(Metadata.LINEARIZE_CURVES, String.valueOf(loadOptions.linearizeCurves));
      db.setMetadataValue(Metadata.TABLE_PREFIX, loadOptions.tablePrefix);
      if (progress.getMutatieInhoud() != null) {
        db.setMetadataForMutaties(progress.getMutatieInhoud());
        db.setMetadataValue(Metadata.GEOM_FILTER, progress.getMutatieInhoud().getGebied());

        log.info(
            getMessageFormattedString(
                "load.mutatie",
                progress.getMutatieInhoud().getMutatieType(),
                progress.getMutatieInhoud().getLeveringsId()));
      }
      progressReporter.reportTotalSummary();
      db.getConnection().commit();
    }

    return ExitCode.OK;
  }

  /**
   * Decides whether a ZIP entry should be loaded. The entry name must look like {@code
   * bgt_<featuretype>.gml} or {@code .xml}, the feature type must be known, and it must be among
   * the selected feature types.
   *
   * @param entryName name of the ZIP entry
   * @param featureTypeSelectionOptions selected feature types
   * @param logSkipAsInfo log skipped-but-known feature types at info level instead of debug
   * @return {@code true} if the entry should be loaded
   */
  private static boolean isBGTZipEntrySelected(
      String entryName,
      FeatureTypeSelectionOptions featureTypeSelectionOptions,
      boolean logSkipAsInfo) {
    Set<DeltaCustomDownloadRequest.FeaturetypesEnum> featureTypes =
        featureTypeSelectionOptions.getFeatureTypesList();

    Matcher m = BGT_ZIP_ENTRY_PATTERN.matcher(entryName);
    if (!m.matches()) {
      log.warn(getMessageFormattedString("load.skip_entry", entryName));
      return false;
    }
    String tableName = m.group(1);

    DeltaCustomDownloadRequest.FeaturetypesEnum featureType;
    try {
      featureType = DeltaCustomDownloadRequest.FeaturetypesEnum.fromValue(tableName);
    } catch (IllegalArgumentException e) {
      log.warn(getMessageFormattedString("load.skip_unknown_feature_type", entryName));
      return false;
    }

    if (featureTypes.contains(featureType)) {
      return true;
    }
    String msg = getMessageFormattedString("load.skip_unselected", tableName);
    if (logSkipAsInfo) {
      log.info(msg);
    } else {
      log.debug(msg);
    }
    return false;
  }

  /**
   * Loads the selected entries of a remote ZIP file. Uses HTTP random access or plain streaming,
   * depending on {@link LoadOptions#isHttpZipRandomAccess()}.
   *
   * @param uri location of the ZIP file
   * @param writer writer to load features with; its progress updater must be a {@link
   *     ProgressReporter}
   * @param featureTypeSelectionOptions selected feature types
   * @param loadOptions load options
   * @param showSelected log the number and size of the selected entries (random access only)
   * @throws Exception on I/O or loading errors
   */
  public void loadZipFromURI(
      URI uri,
      BGTObjectTableWriter writer,
      FeatureTypeSelectionOptions featureTypeSelectionOptions,
      LoadOptions loadOptions,
      boolean showSelected)
      throws Exception {
    log.info(getMessageFormattedString("download.downloading_from", uri));
    if (loadOptions.isHttpZipRandomAccess()) {
      loadZipFromURIUsingRandomAccess(
          uri, writer, featureTypeSelectionOptions, showSelected, loadOptions.isDebugHttpSeeks());
    } else {
      loadZipFromURIUsingStreaming(uri, writer, featureTypeSelectionOptions);
    }
  }

  /**
   * Loads the selected entries of a remote ZIP file. An HTTP seekable channel is used to read
   * the central directory, so only the selected entries need to be downloaded.
   *
   * @param uri location of the ZIP file
   * @param writer writer to load features with; its progress updater must be a {@link
   *     ProgressReporter}
   * @param featureTypeSelectionOptions selected feature types
   * @param showSelected log the number and size of the selected entries
   * @param debugHttpSeeks log HTTP seek/request statistics
   * @throws Exception on I/O or loading errors
   */
  public void loadZipFromURIUsingRandomAccess(
      URI uri,
      BGTObjectTableWriter writer,
      FeatureTypeSelectionOptions featureTypeSelectionOptions,
      boolean showSelected,
      boolean debugHttpSeeks)
      throws Exception {
    Instant start = Instant.now();

    // NOTE: it can happen that not all entries from a ZIP are read because of
    // https://issues.apache.org/jira/browse/COMPRESS-584
    // This happened with
    // https://api.pdok.nl/lv/bgt/download/v1_0/cache/2/ebe787b3-e113-4331-ab96-edd1e9bf5aa7/bgt-citygml-nl-nopbp.zip

    try (HttpSeekableByteChannel channel =
            new HttpSeekableByteChannel(uri).withDebug(debugHttpSeeks);
        CountingSeekableByteChannel loggingChannel = new CountingSeekableByteChannel(channel);
        org.apache.commons.compress.archivers.zip.ZipFile zipFile =
            new org.apache.commons.compress.archivers.zip.ZipFile(
                loggingChannel, uri.toString(), "UTF8", false, true)) {
      if (debugHttpSeeks) {
        System.out.println();
      }
      int count = 0;
      long uncompressed = 0;
      for (Iterator<ZipArchiveEntry> it = zipFile.getEntries().asIterator(); it.hasNext(); ) {
        ZipArchiveEntry entry = it.next();
        count++;
        uncompressed += entry.getSize();
      }
      log.info(
          getMessageFormattedString(
              "download.zip.read",
              formatTimeSince(start),
              count,
              byteCountToDisplaySize(channel.size()),
              byteCountToDisplaySize(uncompressed)));
      if (debugHttpSeeks) {
        log.info(
            getMessageFormattedString(
                "download.zip.debug-http-seeks.entries",
                loggingChannel.getNonConsecutiveIops(),
                channel.getHttpRequestCount(),
                channel.getBytesRead()));
      }
      loggingChannel.setLoggingEnabled(false);

      ProgressReporter progressReporter = (ProgressReporter) writer.getProgressUpdater();
      List<ZipArchiveEntry> selected = new ArrayList<>();
      zipFile
          .getEntries()
          .asIterator()
          .forEachRemaining(
              entry -> {
                if (isBGTZipEntrySelected(entry.getName(), featureTypeSelectionOptions, false)) {
                  selected.add(entry);
                }
              });

      if (selected.size() > 1) {
        // Only report total percentage when more than one entry
        long totalSize = selected.stream().mapToLong(ZipArchiveEntry::getSize).sum();
        long totalCompressedSize =
            selected.stream().mapToLong(ZipArchiveEntry::getCompressedSize).sum();
        progressReporter.setTotalBytes(totalSize);
        if (showSelected) {
          log.info(
              getMessageFormattedString(
                  "download.zip.selected",
                  selected.size(),
                  byteCountToDisplaySize(totalCompressedSize),
                  byteCountToDisplaySize(totalSize)));
        }
      }
      // Uncompressed bytes of the entries that were already loaded. The writer's own counter only
      // covers the current entry.
      long[] previousEntriesBytesRead = {0L};
      progressReporter.setTotalBytesReadFunction(
          () -> previousEntriesBytesRead[0] + writer.getProgress().getBytesRead());

      for (ZipArchiveEntry entry : selected) {
        try (InputStream in = zipFile.getInputStream(entry)) {
          loadInputStream(entry.getName(), in, entry.getSize(), writer);
        }
        // Without this the total progress would fall back each time a new entry starts.
        previousEntriesBytesRead[0] += entry.getSize();
      }
      if (debugHttpSeeks) {
        log.info(
            getMessageFormattedString(
                "download.zip.debug-http-seeks.totals",
                loggingChannel.getNonConsecutiveIops(),
                channel.getHttpRequestCount(),
                channel.getBytesRead(),
                byteCountToDisplaySize(channel.getBytesRead())));
      }
    }
  }

  /**
   * Loads the selected entries of a remote ZIP file by streaming it sequentially. Total progress
   * is reported as the number of (compressed) bytes downloaded.
   *
   * @param downloadURI location of the ZIP file
   * @param writer writer to load features with; its progress updater must be a {@link
   *     ProgressReporter}
   * @param featureTypeSelectionOptions selected feature types
   * @throws Exception on I/O or loading errors
   */
  public void loadZipFromURIUsingStreaming(
      URI downloadURI,
      BGTObjectTableWriter writer,
      FeatureTypeSelectionOptions featureTypeSelectionOptions)
      throws Exception {
    ProgressReporter progressReporter = (ProgressReporter) writer.getProgressUpdater();

    try (InputStream input = new ResumingBGTDownloadInputStream(downloadURI, writer)) {
      CountingInputStream countingInputStream = new CountingInputStream(input);
      progressReporter.setTotalBytesReadFunction(countingInputStream::getByteCount);

      try (ZipInputStream zis = new ZipInputStream(countingInputStream)) {
        ZipEntry entry = zis.getNextEntry();
        while (entry != null) {
          if (isBGTZipEntrySelected(entry.getName(), featureTypeSelectionOptions, true)) {
            // Entry sizes are not known up front when streaming.
            progressReporter.startNewFile(entry.getName(), null);
            // Shield so that the writer cannot close the ZipInputStream between entries.
            writer.write(CloseShieldInputStream.wrap(zis));
          }
          entry = zis.getNextEntry();
        }
      }
    }
  }

  /**
   * Loads the selected entries of a local ZIP file.
   *
   * @param file local ZIP file
   * @param writer writer to load features with; its progress updater must be a {@link
   *     ProgressReporter}
   * @param featureTypeSelectionOptions selected feature types
   * @throws Exception on I/O or loading errors
   */
  public void loadZip(
      File file,
      BGTObjectTableWriter writer,
      FeatureTypeSelectionOptions featureTypeSelectionOptions)
      throws Exception {
    try (ZipFile zipFile = new ZipFile(file)) {
      List<ZipEntry> entries =
          zipFile.stream()
              .filter(
                  entry ->
                      isBGTZipEntrySelected(entry.getName(), featureTypeSelectionOptions, false))
              .collect(Collectors.toList());

      ProgressReporter progressReporter = (ProgressReporter) writer.getProgressUpdater();
      if (entries.size() > 1) {
        // Only report total percentage when more than one entry
        long totalSize = entries.stream().mapToLong(ZipEntry::getSize).sum();
        progressReporter.setTotalBytes(totalSize);
      }
      // Uncompressed bytes of the entries that were already loaded. The writer's own counter only
      // covers the current entry.
      long[] previousEntriesBytesRead = {0L};
      progressReporter.setTotalBytesReadFunction(
          () -> previousEntriesBytesRead[0] + writer.getProgress().getBytesRead());

      for (ZipEntry entry : entries) {
        try (InputStream in = zipFile.getInputStream(entry)) {
          // getSize() will not return -1 because ZipFile uses random access to read the
          // ZIP central directory
          loadInputStream(entry.getName(), in, entry.getSize(), writer);
          previousEntriesBytesRead[0] += entry.getSize();
        }
      }
      // NOTE(review): load() also calls reportTotalSummary() after this method returns, so for
      // local ZIP files the summary may be reported twice -- confirm whether this is intended.
      progressReporter.reportTotalSummary();
    }
  }

  /**
   * Loads a single local XML/GML file.
   *
   * @param file local file
   * @param writer writer to load features with; its progress updater must be a {@link
   *     ProgressReporter}
   * @throws Exception on I/O or loading errors
   */
  public void loadXml(File file, BGTObjectTableWriter writer) throws Exception {
    try (FileInputStream in = new FileInputStream(file)) {
      loadInputStream(file.getName(), in, file.length(), writer);
    }
  }

  /**
   * Starts a new progress file and writes the input with the writer. The caller is responsible
   * for closing {@code input}.
   *
   * @param name name to report progress under
   * @param input input to load
   * @param size size of the input in bytes, for progress reporting
   * @param writer writer to load features with; its progress updater must be a {@link
   *     ProgressReporter}
   * @throws Exception on I/O or loading errors
   */
  public void loadInputStream(
      String name, InputStream input, long size, BGTObjectTableWriter writer) throws Exception {
    ProgressReporter progressReporter = (ProgressReporter) writer.getProgressUpdater();
    progressReporter.startNewFile(name, size);
    writer.write(input);
  }

  /**
   * Version information shown by {@code --version}.
   *
   * @return the loader version and the user agent string
   */
  @Override
  public String[] getVersion() {
    return new String[] {Utils.getLoaderVersion(), Utils.getUserAgent()};
  }
}