// BerichtPipelineThread.java

package nl.b3p.brmo.loader.pipeline;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import nl.b3p.brmo.loader.BerichtenHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.javasimon.SimonManager;
import org.javasimon.Split;

/**
 * Base class for a worker thread that takes {@link BerichtWorkUnit}s from a bounded queue and
 * hands them one at a time to {@link #work(BerichtWorkUnit)}.
 *
 * <p>Other threads obtain the queue through {@link #getQueue()} to submit work and steer the
 * worker with {@link #stopWhenQueueEmpty()} and {@link #setAbortFlag()}. The control flags and the
 * stored failure are {@code volatile} because they are written by one thread and read by another.
 *
 * @author Matthijs Laan
 */
public abstract class BerichtPipelineThread extends Thread {
  final Log log;

  private final BlockingQueue<BerichtWorkUnit> queue;
  final BerichtenHandler handler;
  private final String stopwatchPrefix;

  // Set by controlling threads, read by the worker loop in run().
  private volatile boolean stopWhenQueueEmpty = false;
  private volatile boolean abort = false;
  // Set by the worker loop when work() fails, read by callers of getQueue().
  private volatile Exception qe = null;

  /**
   * Creates the worker with an empty bounded work queue.
   *
   * @param handler handler made available to subclasses through the {@code handler} field
   * @param capacity maximum number of work units the queue can hold
   * @param stopwatchPrefix prefix for the names of the Simon stopwatch and counter used by {@link
   *     #run()}
   */
  public BerichtPipelineThread(BerichtenHandler handler, int capacity, String stopwatchPrefix) {
    log = LogFactory.getLog(this.getClass());
    this.queue = new LinkedBlockingQueue<>(capacity);
    this.handler = handler;
    this.stopwatchPrefix = stopwatchPrefix;
  }

  /** Requests that the worker stops as soon as a poll of the queue yields no work unit. */
  public void stopWhenQueueEmpty() {
    this.stopWhenQueueEmpty = true;
  }

  /**
   * Requests that the worker stops after its current poll, even if the queue still holds work.
   */
  public void setAbortFlag() {
    this.abort = true;
  }

  /**
   * Returns the work queue of this thread.
   *
   * @return the queue to which work units can be added
   * @throws Exception the exception previously thrown by {@link #work(BerichtWorkUnit)}, if any
   */
  public BlockingQueue<BerichtWorkUnit> getQueue() throws Exception {
    // Read the volatile field once so the null check and the throw see the same value.
    final Exception failure = qe;
    if (failure != null) {
      throw failure;
    }
    return queue;
  }

  /**
   * Processes a single work unit taken from the queue.
   *
   * @param workUnit the work unit to process, never {@code null}
   * @throws Exception on failure; the exception is stored for {@link #getQueue()} and the abort
   *     flag is set
   */
  abstract void work(BerichtWorkUnit workUnit) throws Exception;

  /**
   * Polls the queue with a one second timeout and processes each work unit until the abort flag is
   * set, or until stop-when-empty was requested and a poll yields nothing.
   */
  @Override
  public void run() {
    log.info("started, waiting for work");

    while (true) {
      BerichtWorkUnit workUnit = null;
      Split poll = SimonManager.getStopwatch(stopwatchPrefix + ".poll").start();
      try {
        workUnit = queue.poll(1, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        // The interrupt status is deliberately not restored: the loop keeps polling, and a set
        // interrupt flag would make every following poll throw immediately. The flag checks below
        // decide whether the thread actually stops.
        log.info("interrupted while polling work queue");
      } finally {
        // Always stop the split, also when the poll was interrupted.
        poll.stop();
      }

      if (abort) {
        log.info("aborting!");
        return;
      }

      if (stopWhenQueueEmpty && workUnit == null) {
        log.info("work queue empty, stopping");
        return;
      }
      if (workUnit == null) {
        // Poll timed out (or was interrupted) without work; try again.
        continue;
      }
      int queueSize = queue.size();
      if (log.isDebugEnabled()) {
        log.debug(
            String.format(
                "processing %s for work unit bericht id %d, %s (queue size %d)",
                workUnit.getTypeOfWork().toString(),
                workUnit.getBericht().getId(),
                workUnit.getBericht().getObjectRef(),
                queueSize));
      }
      SimonManager.getCounter(stopwatchPrefix + ".queuesize").set(queueSize);
      try {
        work(workUnit);
      } catch (Exception e) {
        // Publish the failure before raising the abort flag so getQueue() callers can see it.
        qe = e;
        abort = true;
        // Do not log stacktrace, in database bericht.opmerking
        // NOTE(review): despite the "(continuing)" wording the abort flag is set here, so the
        // loop ends after the next poll. The extra poll is kept on purpose: it may free a slot
        // for a producer blocked on a full queue.
        log.error(
            "work method threw exception (continuing): " + e.getClass() + ": " + e.getMessage());
      }
    }
  }
}