summaryrefslogtreecommitdiff
path: root/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java')
-rw-r--r--src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java74
1 files changed, 50 insertions, 24 deletions
diff --git a/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java b/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java
index 4b7c4a3d..dbad6794 100644
--- a/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java
+++ b/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java
@@ -4,7 +4,7 @@ package de.lmu.ifi.dbs.elki.datasource.bundle;
This file is part of ELKI:
Environment for Developing KDD-Applications Supported by Index-Structures
- Copyright (C) 2013
+ Copyright (C) 2014
Ludwig-Maximilians-Universität München
Lehr- und Forschungseinheit für Datenbanksysteme
ELKI Development Team
@@ -29,9 +29,13 @@ import java.nio.channels.WritableByteChannel;
import de.lmu.ifi.dbs.elki.data.type.SimpleTypeInformation;
import de.lmu.ifi.dbs.elki.data.type.TypeInformationSerializer;
+import de.lmu.ifi.dbs.elki.data.type.TypeUtil;
+import de.lmu.ifi.dbs.elki.database.ids.DBID;
+import de.lmu.ifi.dbs.elki.database.ids.DBIDUtil;
+import de.lmu.ifi.dbs.elki.database.ids.DBIDVar;
import de.lmu.ifi.dbs.elki.logging.Logging;
-import de.lmu.ifi.dbs.elki.persistent.ByteBufferSerializer;
import de.lmu.ifi.dbs.elki.utilities.exceptions.AbortException;
+import de.lmu.ifi.dbs.elki.utilities.io.ByteBufferSerializer;
/**
* Write an object bundle stream to a file channel.
@@ -40,8 +44,8 @@ import de.lmu.ifi.dbs.elki.utilities.exceptions.AbortException;
*
* @author Erich Schubert
*
- * @apiviz.uses BundleStreamSource
- * @apiviz.uses WritableByteChannel
+ * @apiviz.uses BundleStreamSource - - «reads»
+ * @apiviz.uses WritableByteChannel - - «writes»
*/
public class BundleWriter {
/**
@@ -69,22 +73,36 @@ public class BundleWriter {
public void writeBundleStream(BundleStreamSource source, WritableByteChannel output) throws IOException {
ByteBuffer buffer = ByteBuffer.allocateDirect(INITIAL_BUFFER);
- ByteBufferSerializer<Object>[] serializers = null;
- loop: while (true) {
+ DBIDVar var = DBIDUtil.newVar();
+ ByteBufferSerializer<?>[] serializers = null;
+ loop: while(true) {
BundleStreamSource.Event ev = source.nextEvent();
- switch(ev) {
+ switch(ev){
case NEXT_OBJECT:
- if (serializers == null) {
+ if(serializers == null) {
serializers = writeHeader(source, buffer, output);
}
- for (int i = 0; i < serializers.length; i++) {
- int size = serializers[i].getByteSize(source.data(i));
+ if(serializers[0] != null) {
+ if(!source.assignDBID(var)) {
+ throw new AbortException("An object did not have an DBID assigned.");
+ }
+ DBID id = DBIDUtil.deref(var);
+ @SuppressWarnings("unchecked")
+ ByteBufferSerializer<DBID> ser = (ByteBufferSerializer<DBID>) serializers[0];
+ int size = ser.getByteSize(id);
buffer = ensureBuffer(size, buffer, output);
- serializers[i].toByteBuffer(buffer, source.data(i));
+ ser.toByteBuffer(buffer, id);
+ }
+ for(int i = 1, j = 0; i < serializers.length; ++i, ++j) {
+ @SuppressWarnings("unchecked")
+ ByteBufferSerializer<Object> ser = (ByteBufferSerializer<Object>) serializers[i];
+ int size = ser.getByteSize(source.data(j));
+ buffer = ensureBuffer(size, buffer, output);
+ ser.toByteBuffer(buffer, source.data(j));
}
break; // switch
case META_CHANGED:
- if (serializers != null) {
+ if(serializers != null) {
throw new AbortException("Meta changes are not supported, once the block header has been written.");
}
break; // switch
@@ -95,7 +113,7 @@ public class BundleWriter {
break; // switch
}
}
- if (buffer.position() > 0) {
+ if(buffer.position() > 0) {
flushBuffer(buffer, output);
}
}
@@ -124,11 +142,11 @@ public class BundleWriter {
* @throws IOException on IO errors
*/
private ByteBuffer ensureBuffer(int size, ByteBuffer buffer, WritableByteChannel output) throws IOException {
- if (buffer.remaining() >= size) {
+ if(buffer.remaining() >= size) {
return buffer;
}
flushBuffer(buffer, output);
- if (buffer.remaining() >= size) {
+ if(buffer.remaining() >= size) {
return buffer;
}
// Aggressively grow the buffer
@@ -144,25 +162,33 @@ public class BundleWriter {
* @return Array of serializers
* @throws IOException on IO errors
*/
- @SuppressWarnings("unchecked")
- private ByteBufferSerializer<Object>[] writeHeader(BundleStreamSource source, ByteBuffer buffer, WritableByteChannel output) throws IOException {
+ private ByteBufferSerializer<?>[] writeHeader(BundleStreamSource source, ByteBuffer buffer, WritableByteChannel output) throws IOException {
final BundleMeta meta = source.getMeta();
final int nummeta = meta.size();
@SuppressWarnings("rawtypes")
- final ByteBufferSerializer[] serializers = new ByteBufferSerializer[nummeta];
+ final ByteBufferSerializer[] serializers = new ByteBufferSerializer[1 + nummeta];
// Write our magic ID first.
assert (buffer.position() == 0) : "Buffer is supposed to be at 0.";
buffer.putInt(MAGIC);
- // Write the number of metas next
- buffer.putInt(nummeta);
- for (int i = 0; i < nummeta; i++) {
+ // Write the number of metas next.
+ // For compatibility with earlier versions, treat DBIDs as extra type
+ if(source.hasDBIDs()) {
+ buffer.putInt(1 + nummeta);
+ ByteBufferSerializer<?> ser = TypeUtil.DBID.getSerializer();
+ TypeInformationSerializer.STATIC.toByteBuffer(buffer, TypeUtil.DBID);
+ serializers[0] = ser;
+ }
+ else {
+ buffer.putInt(nummeta);
+ }
+ for(int i = 0; i < nummeta; i++) {
SimpleTypeInformation<?> type = meta.get(i);
- ByteBufferSerializer<Object> ser = (ByteBufferSerializer<Object>) type.getSerializer();
- if (ser == null) {
+ ByteBufferSerializer<?> ser = type.getSerializer();
+ if(ser == null) {
throw new AbortException("Cannot serialize - no serializer found for type: " + type.toString());
}
TypeInformationSerializer.STATIC.toByteBuffer(buffer, type);
- serializers[i] = ser;
+ serializers[i + 1] = ser;
}
return serializers;
}