BGTObjectTableWriter.java
/*
* Copyright (C) 2021 B3Partners B.V.
*
* SPDX-License-Identifier: MIT
*
*/
package nl.b3p.brmo.bgt.schema;
import static nl.b3p.brmo.bgt.schema.BGTObject.MutatieStatus.WAS_WORDT;
import static nl.b3p.brmo.bgt.schema.BGTObject.MutatieStatus.WORDT;
import static nl.b3p.brmo.bgt.schema.BGTSchema.EIND_REGISTRATIE;
import java.io.InputStream;
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import nl.b3p.brmo.bgt.loader.BGTObjectStreamer;
import nl.b3p.brmo.schema.ObjectTableWriter;
import nl.b3p.brmo.schema.ObjectType;
import nl.b3p.brmo.schema.SchemaSQLMapper;
import nl.b3p.brmo.sql.PreparedStatementQueryBatch;
import nl.b3p.brmo.sql.QueryBatch;
import nl.b3p.brmo.sql.dialect.SQLDialect;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class BGTObjectTableWriter extends ObjectTableWriter {
private static final Log log = LogFactory.getLog(BGTObjectTableWriter.class);
private boolean currentObjectsOnly = true;
public boolean isCurrentObjectsOnly() {
return currentObjectsOnly;
}
public void setCurrentObjectsOnly(boolean currentObjectsOnly) {
this.currentObjectsOnly = currentObjectsOnly;
}
public class BGTProgress extends ObjectTableWriter.Progress {
private CountingInputStream counter;
private Map<ObjectType, QueryBatch> deleteBatches = new HashMap<>();
private long objectUpdatedCount = 0;
private long objectRemovedCount = 0;
private long historicObjectsCount = 0;
private BGTObjectStreamer.MutatieInhoud mutatieInhoud;
public long getBytesRead() {
return counter.getByteCount();
}
public long getObjectUpdatedCount() {
return objectUpdatedCount;
}
public long getObjectRemovedCount() {
return objectRemovedCount;
}
public long getHistoricObjectsCount() {
return historicObjectsCount;
}
public BGTObjectStreamer.MutatieInhoud getMutatieInhoud() {
return mutatieInhoud;
}
@Override
public BGTObjectTableWriter getWriter() {
return BGTObjectTableWriter.this;
}
}
public BGTObjectTableWriter(
Connection connection, SQLDialect dialect, SchemaSQLMapper schemaSQLMapper) {
super(connection, dialect, schemaSQLMapper);
}
@Override
public BGTProgress getProgress() {
return (BGTProgress) super.getProgress();
}
private void deletePreviousVersion(BGTObject object) throws Exception {
BGTObjectType objectType = object.getObjectType();
String idAttributeName = objectType.getPrimaryKeys().get(0).getName();
String tableName = getSchemaSQLMapper().getTableNameForObjectType(objectType, getTablePrefix());
Map<ObjectType, QueryBatch> deleteBatches = getProgress().deleteBatches;
if (!deleteBatches.containsKey(objectType)) {
String sql =
"delete from "
+ tableName
+ " where "
+ getSchemaSQLMapper().getColumnNameForObjectType(objectType, idAttributeName)
+ " = ?";
deleteBatches.put(
objectType, new PreparedStatementQueryBatch(getConnection(), sql, getBatchSize()));
}
QueryBatch batch = deleteBatches.get(objectType);
boolean executed = batch.addBatch(new Object[] {object.getMutatiePreviousVersionGmlId()});
for (ObjectType oneToManyObjectType : objectType.getOneToManyAttributeObjectTypes()) {
if (!deleteBatches.containsKey(oneToManyObjectType)) {
String tableNameNoPrefix = getSchemaSQLMapper().getTableNameForObjectType(objectType, "");
String sql =
"delete from "
+ getSchemaSQLMapper()
.getTableNameForObjectType(oneToManyObjectType, getTablePrefix())
+ " where "
+ getSchemaSQLMapper()
.getColumnNameForObjectType(
oneToManyObjectType, tableNameNoPrefix + idAttributeName)
+ " = ?";
deleteBatches.put(
oneToManyObjectType,
new PreparedStatementQueryBatch(getConnection(), sql, getBatchSize()));
}
QueryBatch deleteBatch = deleteBatches.get(oneToManyObjectType);
executed =
executed | deleteBatch.addBatch(new Object[] {object.getMutatiePreviousVersionGmlId()});
}
if (executed) {
updateProgress();
}
}
public void write(InputStream bgtXml) throws Exception {
BGTProgress progress = this.new BGTProgress();
super.start(progress);
updateProgress(Stage.PARSE_INPUT);
try (CountingInputStream counter = new CountingInputStream(bgtXml)) {
progress.counter = counter;
BGTObjectStreamer streamer = new BGTObjectStreamer(counter);
progress.mutatieInhoud = streamer.getMutatieInhoud();
updateProgress(Stage.LOAD_OBJECTS);
progress.setInitialLoad(
progress.mutatieInhoud == null
|| "initial".equals(progress.getMutatieInhoud().getMutatieType()));
for (BGTObject object : streamer) {
// We must prepare even if all objects are historic (such as can happen with
// "ongeclassificeerdobject"),
// otherwise there won't be a table for updates
prepareDatabaseForObject(object);
boolean skipHistoricObject =
object.getAttributes().get(EIND_REGISTRATIE) != null && currentObjectsOnly;
if (object.getMutatieStatus() == WAS_WORDT) {
// Deletes and inserts do not need to be DELETE'd and INSERT-ed in order,
// because different versions
// still have a unique primary key value (gml ID)
// Also delete oneToMany
deletePreviousVersion(object);
if (skipHistoricObject) {
progress.objectRemovedCount++;
} else {
progress.objectUpdatedCount++;
}
} else if (object.getMutatieStatus() == WORDT) {
if (skipHistoricObject) {
progress.historicObjectsCount++;
} else {
progress.incrementObjectCount();
}
}
if (skipHistoricObject) {
continue;
}
addObjectToBatch(object);
if (getObjectLimit() != null && progress.getObjectCount() == getObjectLimit()) {
break;
}
}
super.endOfObjects();
for (QueryBatch batch : progress.deleteBatches.values()) {
batch.executeBatch();
}
super.complete();
} finally {
super.closeBatches();
progress.deleteBatches.values().stream().forEach(QueryBatch::closeQuietly);
}
}
}