BAG2ObjectTableWriter.java
/*
* Copyright (C) 2021 B3Partners B.V.
*
* SPDX-License-Identifier: MIT
*
*/
package nl.b3p.brmo.bag2.schema;
import static nl.b3p.brmo.bag2.schema.BAG2Schema.TIJDSTIP_NIETBAGLV;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import nl.b3p.brmo.bag2.loader.BAG2GMLMutatieGroepStream;
import nl.b3p.brmo.bag2.loader.BAG2Mutatie;
import nl.b3p.brmo.bag2.loader.BAG2MutatieGroep;
import nl.b3p.brmo.bag2.loader.BAG2ToevoegingMutatie;
import nl.b3p.brmo.bag2.loader.BAG2WijzigingMutatie;
import nl.b3p.brmo.schema.ObjectTableWriter;
import nl.b3p.brmo.schema.ObjectType;
import nl.b3p.brmo.schema.SchemaObjectInstance;
import nl.b3p.brmo.schema.SchemaSQLMapper;
import nl.b3p.brmo.schema.mapping.AttributeColumnMapping;
import nl.b3p.brmo.sql.PreparedStatementQueryBatch;
import nl.b3p.brmo.sql.QueryBatch;
import nl.b3p.brmo.sql.dialect.SQLDialect;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class BAG2ObjectTableWriter extends ObjectTableWriter {
private static final Log log = LogFactory.getLog(BAG2ObjectTableWriter.class);
private boolean ignoreDuplicates;
/** Set of seen keys per object type to enable skipping of duplicates */
private Map<BAG2ObjectType, Set<Pair<Object, Object>>> keysPerObjectType = null;
public class BAG2Progress extends Progress {
private Map<ObjectType, QueryBatch> deleteBatches = new HashMap<>();
private long updatedCount = 0;
private BAG2GMLMutatieGroepStream.BagInfo bagInfo;
private BAG2ObjectType currentObjectType = null;
public long getUpdatedCount() {
return updatedCount;
}
@Override
public BAG2ObjectTableWriter getWriter() {
return BAG2ObjectTableWriter.this;
}
public BAG2GMLMutatieGroepStream.BagInfo getMutatieInfo() {
return bagInfo;
}
public BAG2ObjectType getCurrentObjectType() {
return currentObjectType;
}
}
public BAG2ObjectTableWriter(
Connection connection, SQLDialect dialect, SchemaSQLMapper schemaSQLMapper) {
super(connection, dialect, schemaSQLMapper);
this.setProgress(this.new BAG2Progress());
}
public void setIgnoreDuplicates(boolean ignoreDuplicates) {
this.ignoreDuplicates = ignoreDuplicates;
}
public boolean getIgnoreDuplicates() {
return ignoreDuplicates;
}
@Override
public BAG2Progress getProgress() {
return (BAG2Progress) super.getProgress();
}
public Map<BAG2ObjectType, Set<Pair<Object, Object>>> getKeysPerObjectType() {
return keysPerObjectType;
}
public void setKeysPerObjectType(
Map<BAG2ObjectType, Set<Pair<Object, Object>>> keysPerObjectType) {
this.keysPerObjectType = keysPerObjectType;
}
public void start() throws SQLException {
BAG2Progress progress = this.new BAG2Progress();
progress.setInitialLoad(true);
super.start(progress);
updateProgress(Stage.PARSE_INPUT);
}
private void deletePreviousVersion(BAG2Object object) throws Exception {
BAG2ObjectType objectType = object.getObjectType();
Map<ObjectType, QueryBatch> deleteBatches = getProgress().deleteBatches;
if (!deleteBatches.containsKey(objectType)) {
String args =
objectType.getPrimaryKeys().stream()
.map(
k ->
getSchemaSQLMapper().getColumnNameForObjectType(objectType, k.getName())
+ " = ?")
.collect(Collectors.joining(" and "));
// Array attributes are deleted because of 'on delete cascade' on the foreign key
String sql =
String.format(
"delete from %s where %s",
getSchemaSQLMapper().getTableNameForObjectType(objectType, getTablePrefix()), args);
// Set batch size to 1 so deletes are executed immediately, for performance deletes
// could be batched but
// they would need to be executed before inserts of updated versions (with the same
// key), that is more
// complicated
deleteBatches.put(objectType, new PreparedStatementQueryBatch(getConnection(), sql, 1));
}
QueryBatch batch = deleteBatches.get(objectType);
Object[] params =
objectType.getPrimaryKeys().stream()
.map(
pk -> {
try {
AttributeColumnMapping mapping = objectType.getAttributeByName(pk.getName());
Object attribute = object.getAttributes().get(pk.getName());
return mapping.toQueryParameter(attribute, getDialect());
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.toArray();
boolean executed = batch.addBatch(params);
getProgress().updatedCount++;
if (executed) {
updateProgress();
}
}
@Override
protected void addObjectToBatch(SchemaObjectInstance object) throws Exception {
// Never write NIET BAG objects to database
if (!object.getAttributes().containsKey(TIJDSTIP_NIETBAGLV)) {
super.addObjectToBatch(object);
}
}
public void write(InputStream bagXml) throws Exception {
CountingInputStream counter = new CountingInputStream(bagXml);
BAG2GMLMutatieGroepStream bag2Objects = new BAG2GMLMutatieGroepStream(counter);
getProgress().bagInfo = bag2Objects.getBagInfo();
updateProgress(Stage.LOAD_OBJECTS);
try {
for (BAG2MutatieGroep mutatieGroep : bag2Objects) {
for (BAG2Mutatie mutatie : mutatieGroep.getMutaties()) {
if (mutatie instanceof BAG2WijzigingMutatie) {
// Don't do an update but a simpler delete and insert of the updated version
// Executed on main thread, but worker thread will not be executing new
// versions of the record we're
// deleting
// No check for duplicates for wijzigingen: no harm in doing the same
// wijziging twice and we
// can't tell by only the keys if it is exactly the same wijziging but in
// the maandmutaties for
// a different gemeente or the same version changed twice
BAG2WijzigingMutatie wijzigingMutatie = (BAG2WijzigingMutatie) mutatie;
deletePreviousVersion(wijzigingMutatie.getWas());
addObjectToBatch(wijzigingMutatie.getWordt());
} else if (mutatie instanceof BAG2ToevoegingMutatie) {
BAG2ToevoegingMutatie toevoegingMutatie = (BAG2ToevoegingMutatie) mutatie;
if (ignoreDuplicates && isDuplicate(toevoegingMutatie.getToevoeging())) {
continue;
}
prepareDatabaseForObject(toevoegingMutatie.getToevoeging());
getProgress().incrementObjectCount();
addObjectToBatch(toevoegingMutatie.getToevoeging());
}
if (getObjectLimit() != null && getProgress().getObjectCount() == getObjectLimit()) {
break;
}
}
}
} catch (Exception e) {
if (isMultithreading()) {
// Make sure worker thread exits
abortWorkerThread();
}
throw e;
}
}
private boolean isDuplicate(BAG2Object object) {
if (keysPerObjectType == null) {
throw new IllegalStateException(
"keysPerObject type must be set to enable ignoring of duplicates");
}
// Primary keys for all BAG2 objects are always same
Pair<Object, Object> keys =
Pair.of(
object.getAttributes().get("identificatie"),
object.getAttributes().get("voorkomenidentificatie"));
Set<Pair<Object, Object>> seenKeys =
keysPerObjectType.computeIfAbsent(object.getObjectType(), k -> new HashSet<>());
if (seenKeys.contains(keys)) {
if (log.isDebugEnabled()) {
log.debug(
String.format("\rIgnoring duplicate %s %s", object.getObjectType().getName(), keys));
}
return true;
}
seenKeys.add(keys);
return false;
}
@Override
public void complete() throws Exception {
super.endOfObjects();
for (QueryBatch batch : getProgress().deleteBatches.values()) {
batch.executeBatch();
}
super.complete();
super.closeBatches();
}
@Override
public void createKeys(ObjectType objectType) throws Exception {
this.getProgress().currentObjectType = (BAG2ObjectType) objectType;
super.createKeys(objectType);
}
@Override
public void createIndexes(ObjectType objectType) throws Exception {
this.getProgress().currentObjectType = (BAG2ObjectType) objectType;
super.createIndexes(objectType);
}
}