diff options
Diffstat (limited to 'src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java')
-rw-r--r-- | src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java | 74 |
1 files changed, 50 insertions, 24 deletions
diff --git a/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java b/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java index 4b7c4a3d..dbad6794 100644 --- a/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java +++ b/src/de/lmu/ifi/dbs/elki/datasource/bundle/BundleWriter.java @@ -4,7 +4,7 @@ package de.lmu.ifi.dbs.elki.datasource.bundle; This file is part of ELKI: Environment for Developing KDD-Applications Supported by Index-Structures - Copyright (C) 2013 + Copyright (C) 2014 Ludwig-Maximilians-Universität München Lehr- und Forschungseinheit für Datenbanksysteme ELKI Development Team @@ -29,9 +29,13 @@ import java.nio.channels.WritableByteChannel; import de.lmu.ifi.dbs.elki.data.type.SimpleTypeInformation; import de.lmu.ifi.dbs.elki.data.type.TypeInformationSerializer; +import de.lmu.ifi.dbs.elki.data.type.TypeUtil; +import de.lmu.ifi.dbs.elki.database.ids.DBID; +import de.lmu.ifi.dbs.elki.database.ids.DBIDUtil; +import de.lmu.ifi.dbs.elki.database.ids.DBIDVar; import de.lmu.ifi.dbs.elki.logging.Logging; -import de.lmu.ifi.dbs.elki.persistent.ByteBufferSerializer; import de.lmu.ifi.dbs.elki.utilities.exceptions.AbortException; +import de.lmu.ifi.dbs.elki.utilities.io.ByteBufferSerializer; /** * Write an object bundle stream to a file channel. @@ -40,8 +44,8 @@ import de.lmu.ifi.dbs.elki.utilities.exceptions.AbortException; * * @author Erich Schubert * - * @apiviz.uses BundleStreamSource - * @apiviz.uses WritableByteChannel + * @apiviz.uses BundleStreamSource - - «reads» + * @apiviz.uses WritableByteChannel - - «writes» */ public class BundleWriter { /** @@ -69,22 +73,36 @@ public class BundleWriter { public void writeBundleStream(BundleStreamSource source, WritableByteChannel output) throws IOException { ByteBuffer buffer = ByteBuffer.allocateDirect(INITIAL_BUFFER); - ByteBufferSerializer<Object>[] serializers = null; - loop: while (true) { + DBIDVar var = DBIDUtil.newVar(); + ByteBufferSerializer<?>[] serializers = null; + loop: while(true) { BundleStreamSource.Event ev = source.nextEvent(); - switch(ev) { + switch(ev){ case NEXT_OBJECT: - if (serializers == null) { + if(serializers == null) { serializers = writeHeader(source, buffer, output); } - for (int i = 0; i < serializers.length; i++) { - int size = serializers[i].getByteSize(source.data(i)); + if(serializers[0] != null) { + if(!source.assignDBID(var)) { + throw new AbortException("An object did not have an DBID assigned."); + } + DBID id = DBIDUtil.deref(var); + @SuppressWarnings("unchecked") + ByteBufferSerializer<DBID> ser = (ByteBufferSerializer<DBID>) serializers[0]; + int size = ser.getByteSize(id); buffer = ensureBuffer(size, buffer, output); - serializers[i].toByteBuffer(buffer, source.data(i)); + ser.toByteBuffer(buffer, id); + } + for(int i = 1, j = 0; i < serializers.length; ++i, ++j) { + @SuppressWarnings("unchecked") + ByteBufferSerializer<Object> ser = (ByteBufferSerializer<Object>) serializers[i]; + int size = ser.getByteSize(source.data(j)); + buffer = ensureBuffer(size, buffer, output); + ser.toByteBuffer(buffer, source.data(j)); } break; // switch case META_CHANGED: - if (serializers != null) { + if(serializers != null) { throw new AbortException("Meta changes are not supported, once the block header has been written."); } break; // switch @@ -95,7 +113,7 @@ public class BundleWriter { break; // switch } } - if (buffer.position() > 0) { + if(buffer.position() > 0) { flushBuffer(buffer, output); } } @@ -124,11 +142,11 @@ public class BundleWriter { * @throws IOException on IO errors */ private ByteBuffer ensureBuffer(int size, ByteBuffer buffer, WritableByteChannel output) throws IOException { - if (buffer.remaining() >= size) { + if(buffer.remaining() >= size) { return buffer; } flushBuffer(buffer, output); - if (buffer.remaining() >= size) { + if(buffer.remaining() >= size) { return buffer; } // Aggressively grow the buffer @@ -144,25 +162,33 @@ public class BundleWriter { * @return Array of serializers * @throws IOException on IO errors */ - @SuppressWarnings("unchecked") - private ByteBufferSerializer<Object>[] writeHeader(BundleStreamSource source, ByteBuffer buffer, WritableByteChannel output) throws IOException { + private ByteBufferSerializer<?>[] writeHeader(BundleStreamSource source, ByteBuffer buffer, WritableByteChannel output) throws IOException { final BundleMeta meta = source.getMeta(); final int nummeta = meta.size(); @SuppressWarnings("rawtypes") - final ByteBufferSerializer[] serializers = new ByteBufferSerializer[nummeta]; + final ByteBufferSerializer[] serializers = new ByteBufferSerializer[1 + nummeta]; // Write our magic ID first. assert (buffer.position() == 0) : "Buffer is supposed to be at 0."; buffer.putInt(MAGIC); - // Write the number of metas next - buffer.putInt(nummeta); - for (int i = 0; i < nummeta; i++) { + // Write the number of metas next. + // For compatibility with earlier versions, treat DBIDs as extra type + if(source.hasDBIDs()) { + buffer.putInt(1 + nummeta); + ByteBufferSerializer<?> ser = TypeUtil.DBID.getSerializer(); + TypeInformationSerializer.STATIC.toByteBuffer(buffer, TypeUtil.DBID); + serializers[0] = ser; + } + else { + buffer.putInt(nummeta); + } + for(int i = 0; i < nummeta; i++) { SimpleTypeInformation<?> type = meta.get(i); - ByteBufferSerializer<Object> ser = (ByteBufferSerializer<Object>) type.getSerializer(); - if (ser == null) { + ByteBufferSerializer<?> ser = type.getSerializer(); + if(ser == null) { throw new AbortException("Cannot serialize - no serializer found for type: " + type.toString()); } TypeInformationSerializer.STATIC.toByteBuffer(buffer, type); - serializers[i] = ser; + serializers[i + 1] = ser; } return serializers; } |