/* * Copyright (C) 2015 higherfrequencytrading.com * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 3 of the License. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this program. If not, see . */ package net.openhft.chronicle.bytes; import net.openhft.chronicle.core.*; import net.openhft.chronicle.core.annotation.ForceInline; import net.openhft.chronicle.core.io.IORuntimeException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.misc.Cleaner; import sun.nio.ch.DirectBuffer; import java.lang.reflect.Field; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @SuppressWarnings("sunapi") public class NativeBytesStore implements BytesStore, Underlying> { private static final long MEMORY_MAPPED_SIZE = 128 << 10; private static final Logger LOGGER = LoggerFactory.getLogger(NativeBytesStore.class); private static final Field BB_ADDRESS, BB_CAPACITY; static { Class directBB = ByteBuffer.allocateDirect(0).getClass(); BB_ADDRESS = Jvm.getField(directBB, "address"); BB_CAPACITY = Jvm.getField(directBB, "capacity"); } @Nullable private final Throwable createdHere = Jvm.isDebug() ? new Throwable("Created here") : null; // on release, set this to null. @Nullable protected Memory memory = OS.memory(); protected long address; long maximumLimit; @Nullable private Cleaner cleaner; private final ReferenceCounter refCount = ReferenceCounter.onReleased(this::performRelease); private boolean elastic; @Nullable private Underlying underlyingObject; private Error releasedHere; private NativeBytesStore() { } private NativeBytesStore(@NotNull ByteBuffer bb, boolean elastic) { init(bb, elastic); } public NativeBytesStore( long address, long maximumLimit) { this(address, maximumLimit, null, false); } public NativeBytesStore( long address, long maximumLimit, @Nullable Runnable deallocator, boolean elastic) { setAddress(address); this.maximumLimit = maximumLimit; cleaner = deallocator == null ? null : Cleaner.create(this, deallocator); underlyingObject = null; this.elastic = elastic; } @NotNull public static NativeBytesStore wrap(@NotNull ByteBuffer bb) { return new NativeBytesStore<>(bb, false); } @NotNull public static NativeBytesStore uninitialized() { return new NativeBytesStore<>(); } /** * this is an elastic native store * * @param capacity of the buffer. */ @NotNull public static NativeBytesStore nativeStore(long capacity) throws IllegalArgumentException { return of(capacity, true, true); } @NotNull private static NativeBytesStore of(long capacity, boolean zeroOut, boolean elastic) throws IllegalArgumentException { Memory memory = OS.memory(); long address = memory.allocate(capacity); if (zeroOut || capacity < MEMORY_MAPPED_SIZE) { memory.setMemory(address, capacity, (byte) 0); memory.storeFence(); } Deallocator deallocator = new Deallocator(address, capacity); return new NativeBytesStore<>(address, capacity, deallocator, elastic); } @NotNull public static NativeBytesStore nativeStoreWithFixedCapacity(long capacity) throws IllegalArgumentException { return of(capacity, true, false); } @NotNull public static NativeBytesStore lazyNativeBytesStoreWithFixedCapacity(long capacity) throws IllegalArgumentException { return of(capacity, false, false); } @NotNull public static NativeBytesStore elasticByteBuffer() { return elasticByteBuffer(OS.pageSize()); } @NotNull public static NativeBytesStore elasticByteBuffer(int size) { return new NativeBytesStore<>(ByteBuffer.allocateDirect(size), true); } public void init(@NotNull ByteBuffer bb, boolean elastic) { this.elastic = elastic; underlyingObject = (Underlying) bb; setAddress(((DirectBuffer) bb).address()); this.maximumLimit = bb.capacity(); cleaner = ((DirectBuffer) bb).cleaner(); } public void uninit() { underlyingObject = null; address = 0; maximumLimit = 0; cleaner = null; } @NotNull @Override public BytesStore, Underlying> copy() throws IllegalStateException { if (underlyingObject == null) { NativeBytesStore copy = of(realCapacity(), false, true); OS.memory().copyMemory(address, copy.address, capacity()); return (BytesStore) copy; } else if (underlyingObject instanceof ByteBuffer) { ByteBuffer bb = ByteBuffer.allocateDirect(Maths.toInt32(capacity())); bb.put((ByteBuffer) underlyingObject); bb.clear(); return (BytesStore) wrap(bb); } else { throw new UnsupportedOperationException(); } } @Override public VanillaBytes bytesForWrite() throws IllegalStateException { return elastic ? new NativeBytes<>(this) : new VanillaBytes<>(this); } @Override @ForceInline public long realCapacity() { return maximumLimit; } @Override @ForceInline public long capacity() { return maximumLimit; } @Nullable @Override @ForceInline public Underlying underlyingObject() { return underlyingObject; } @NotNull @Override @ForceInline public NativeBytesStore zeroOut(long start, long end) throws IllegalArgumentException { if (start < writePosition() || end > writeLimit()) throw new IllegalArgumentException("position: " + writePosition() + ", start: " + start + ", end: " + end + ", limit: " + writeLimit()); if (start >= end) return this; memory.setMemory(address + translate(start), end - start, (byte) 0); return this; } @Override @ForceInline public boolean compareAndSwapInt(long offset, int expected, int value) { return memory.compareAndSwapInt(address + translate(offset), expected, value); } @Override @ForceInline public boolean compareAndSwapLong(long offset, long expected, long value) { return memory.compareAndSwapLong(address + translate(offset), expected, value); } long translate(long offset) { long offset2 = offset - start(); // assert checkTranslatedBounds(offset2); return offset2; } public long start() { return 0L; } @Override public void reserve() throws IllegalStateException { refCount.reserve(); } @Override public void release() throws IllegalStateException { refCount.release(); if (Jvm.isDebug() && refCount.get() == 0) releasedHere = new Error("Released here"); } @Override public long refCount() { return refCount.get(); } @Override @ForceInline public byte readByte(long offset) { if (Jvm.isDebug()) checkReleased(); return memory.readByte(address + translate(offset)); } public void checkReleased() { if (releasedHere != null) throw new InternalError("Accessing a released resource", releasedHere); } @Override @ForceInline public short readShort(long offset) { return memory.readShort(address + translate(offset)); } @Override @ForceInline public int readInt(long offset) { return memory.readInt(address + translate(offset)); } @Override @ForceInline public long readLong(long offset) { return memory.readLong(address + translate(offset)); } @Override @ForceInline public float readFloat(long offset) { return memory.readFloat(address + translate(offset)); } @Override @ForceInline public double readDouble(long offset) { return memory.readDouble(address + translate(offset)); } @Override @ForceInline public byte readVolatileByte(long offset) { return memory.readVolatileByte(address + translate(offset)); } @Override @ForceInline public short readVolatileShort(long offset) { return memory.readVolatileShort(address + translate(offset)); } @Override @ForceInline public int readVolatileInt(long offset) { return memory.readVolatileInt(address + translate(offset)); } @Override @ForceInline public long readVolatileLong(long offset) { return memory.readVolatileLong(address + translate(offset)); } @NotNull @Override @ForceInline public NativeBytesStore writeByte(long offset, byte i8) { memory.writeByte(address + translate(offset), i8); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeShort(long offset, short i16) { memory.writeShort(address + translate(offset), i16); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeInt(long offset, int i32) { memory.writeInt(address + translate(offset), i32); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeOrderedInt(long offset, int i) { memory.writeOrderedInt(address + translate(offset), i); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeLong(long offset, long i64) { memory.writeLong(address + translate(offset), i64); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeOrderedLong(long offset, long i) { memory.writeOrderedLong(address + translate(offset), i); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeFloat(long offset, float f) { memory.writeFloat(address + translate(offset), f); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeDouble(long offset, double d) { memory.writeDouble(address + translate(offset), d); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeVolatileByte(long offset, byte i8) { memory.writeVolatileByte(address + translate(offset), i8); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeVolatileShort(long offset, short i16) { memory.writeVolatileShort(address + translate(offset), i16); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeVolatileInt(long offset, int i32) { memory.writeVolatileInt(address + translate(offset), i32); return this; } @NotNull @Override @ForceInline public NativeBytesStore writeVolatileLong(long offset, long i64) { memory.writeVolatileLong(address + translate(offset), i64); return this; } @NotNull @Override @ForceInline public NativeBytesStore write( long offsetInRDO, byte[] bytes, int offset, int length) { memory.copyMemory(bytes, offset, address + translate(offsetInRDO), length); return this; } @Override @ForceInline public void write( long offsetInRDO, @NotNull ByteBuffer bytes, int offset, int length) { if (bytes.isDirect()) { memory.copyMemory(((DirectBuffer) bytes).address(), address + translate(offsetInRDO), length); } else { memory.copyMemory(bytes.array(), offset, address + translate(offsetInRDO), length); } } @NotNull @Override @ForceInline public NativeBytesStore write( long offsetInRDO, @NotNull RandomDataInput bytes, long offset, long length) throws BufferOverflowException, BufferUnderflowException, IORuntimeException { // TODO optimize, call unsafe.copyMemory when possible, copy 4, 2 bytes at once long i = 0; for (; i < length - 7; i += 8) { writeLong(offsetInRDO + i, bytes.readLong(offset + i)); } for (; i < length; i++) { writeByte(offsetInRDO + i, bytes.readByte(offset + i)); } return this; } @Override public long address(long offset) throws UnsupportedOperationException { if (offset < start() || offset >= capacity()) throw new IllegalArgumentException(); return address + translate(offset); } private void performRelease() { if (refCount.get() > 0) { LOGGER.info("NativeBytesStore discarded without releasing ", createdHere); } memory = null; if (cleaner != null) cleaner.clean(); } @NotNull @Override public String toString() { try { return BytesInternal.toString(this); } catch (IllegalStateException | IORuntimeException e) { return e.toString(); } } @Override @ForceInline public void nativeRead(long position, long address, long size) { // TODO add bounds checking. OS.memory().copyMemory(address(position), address, size); } @Override @ForceInline public void nativeWrite(long address, long position, long size) { // TODO add bounds checking. this.memory.copyMemory(address, address(position), size); } void write8bit(long position, char[] chars, int offset, int length) { long addr = address + translate(position); Memory memory = this.memory; for (int i = 0; i < length; i++) memory.writeByte(addr + i, (byte) chars[offset + i]); } void read8bit(long position, char[] chars, int length) { long addr = address + translate(position); Memory memory = OS.memory(); for (int i = 0; i < length; i++) chars[i] = (char) (memory.readByte(addr + i) & 0xFF); } public long readIncompleteLong(long offset) { int remaining = (int) Math.min(8, readRemaining() - offset); long l = 0; for (int i = 0; i < remaining; i++) { byte b = memory.readByte(address + offset + i); l |= (long) (b & 0xFF) << (i * 8); } return l; } @Override public boolean equals(Object obj) { try { return obj instanceof BytesStore && BytesInternal.contentEqual(this, (BytesStore) obj); } catch (IORuntimeException e) { throw new AssertionError(e); } } public void setAddress(long address) { if ((address & ~0x3FFF) == 0) throw new AssertionError("Invalid address " + Long.toHexString(address)); this.address = address; } @Deprecated public long appendUTF(long pos, char[] chars, int offset, int length) { return appendUtf8(pos, chars, offset, length); } public long appendUtf8(long pos, char[] chars, int offset, int length) { if (pos + length > realCapacity()) throw new BufferOverflowException(); long address = this.address + translate(0); Memory memory = this.memory; int i; ascii: { for (i = 0; i < length; i++) { char c = chars[offset + i]; if (c > 0x007F) break ascii; memory.writeByte(address + pos++, (byte) c); } return pos; } return appendUTF0(pos, chars, offset, length, i); } private long appendUTF0(long pos, char[] chars, int offset, int length, int i) { for (; i < length; i++) { char c = chars[offset + i]; if (c <= 0x007F) { writeByte(pos++, (byte) c); } else if (c <= 0x07FF) { writeByte(pos++, (byte) (0xC0 | ((c >> 6) & 0x1F))); writeByte(pos++, (byte) (0x80 | c & 0x3F)); } else { writeByte(pos++, (byte) (0xE0 | ((c >> 12) & 0x0F))); writeByte(pos++, (byte) (0x80 | ((c >> 6) & 0x3F))); writeByte(pos++, (byte) (0x80 | (c & 0x3F))); } } return pos; } @Override public long copyTo(@NotNull BytesStore store) throws IllegalStateException, IORuntimeException { if (store instanceof NativeBytesStore) return copyTo((NativeBytesStore) store); else return BytesStore.super.copyTo(store); } public long copyTo(NativeBytesStore store) { long addr = address; long addr2 = store.address; long read = readRemaining(); long toWrite = writeRemaining(); if (toWrite < read) throw new BufferUnderflowException(); Memory memory = OS.memory(); memory.copyMemory(addr, addr2, read); return read; } @Override public ByteBuffer toTemporaryDirectByteBuffer() { ByteBuffer bb = ByteBuffer.allocateDirect(0); try { BB_ADDRESS.setLong(bb, address); BB_CAPACITY.setInt(bb, Maths.toUInt31(readRemaining())); } catch (Exception e) { throw new AssertionError(e); } bb.clear(); return bb; } static class Deallocator implements Runnable { private volatile long address, size; Deallocator(long address, long size) { assert address != 0; this.address = address; this.size = size; } @Override public void run() { if (address == 0) return; long addressToFree = address; address = 0; OS.memory().freeMemory(addressToFree, size); } } }