summaryrefslogtreecommitdiff
path: root/src/de/lmu/ifi/dbs/elki/persistent/PersistentPageFile.java
blob: b40e56f5a1e50a113e2effff2cb3280a50dc4a38 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
package de.lmu.ifi.dbs.elki.persistent;

/*
 This file is part of ELKI:
 Environment for Developing KDD-Applications Supported by Index-Structures

 Copyright (C) 2011
 Ludwig-Maximilians-Universität München
 Lehr- und Forschungseinheit für Datenbanksysteme
 ELKI Development Team

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU Affero General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU Affero General Public License for more details.

 You should have received a copy of the GNU Affero General Public License
 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;

import de.lmu.ifi.dbs.elki.index.tree.TreeIndexHeader;
import de.lmu.ifi.dbs.elki.logging.Logging;
import de.lmu.ifi.dbs.elki.utilities.exceptions.AbortException;

/**
 * A PersistentPageFile stores objects persistently that implement the
 * <code>Page</code> interface. For convenience each page is represented by a
 * single file. All pages are stored in a specified directory.
 * 
 * @author Elke Achtert
 * 
 * @apiviz.composedOf PageHeader
 * @apiviz.composedOf RandomAccessFile
 * 
 * @param <P> Page type
 */
public class PersistentPageFile<P extends Page> extends AbstractStoringPageFile<P> {
  /**
   * Our logger
   */
  private static final Logging logger = Logging.getLogger(PersistentPageFile.class);

  /**
   * Indicates an empty page.
   */
  private static final int EMPTY_PAGE = 0;

  /**
   * Indicates a filled page.
   */
  private static final int FILLED_PAGE = 1;

  /**
   * The file storing the pages.
   */
  private final RandomAccessFile file;

  /**
   * The header of this page file.
   */
  protected PageHeader header;

  /**
   * The type of pages we use.
   */
  protected final Class<P> pageclass;

  /**
   * Whether we are initializing from an existing file.
   */
  private boolean existed;

  /**
   * Creates a new PersistentPageFile from an existing file.
   * 
   * @param pageSize the page size
   * @param pageclass the class of pages to be used
   */
  public PersistentPageFile(int pageSize, String fileName, Class<P> pageclass) {
    super(pageSize);
    this.pageclass = pageclass;
    // init the file
    File f = new File(fileName);

    // create from existing file
    existed = f.exists();
    try {
      file = new RandomAccessFile(f, "rw");
    }
    catch(IOException e) {
      throw new AbortException("IO error in loading persistent page file.", e);
    }
  }

  /**
   * Reads the page with the given id from this file.
   * 
   * @param pageID the id of the page to be returned
   * @return the page with the given pageId
   */
  @Override
  public P readPage(int pageID) {
    try {
      readAccess++;
      long offset = ((long) (header.getReservedPages() + pageID)) * (long) pageSize;
      byte[] buffer = new byte[pageSize];
      file.seek(offset);
      file.read(buffer);
      return byteArrayToPage(buffer);
    }
    catch(IOException e) {
      throw new RuntimeException("IOException occurred during reading of page " + pageID + "\n", e);
    }
  }

  /**
   * Deletes the node with the specified id from this file.
   * 
   * @param pageID the id of the node to be deleted
   */
  @Override
  public void deletePage(int pageID) {
    try {
      // / put id to empty pages list
      super.deletePage(pageID);

      // delete from file
      writeAccess++;
      byte[] array = pageToByteArray(null);
      long offset = ((long) (header.getReservedPages() + pageID)) * (long) pageSize;
      file.seek(offset);
      file.write(array);
    }
    catch(IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * This method is called by the cache if the <code>page</code> is not longer
   * stored in the cache and has to be written to disk.
   * 
   * @param page the page which has to be written to disk
   */
  @Override
  public void writePage(Integer pageID, P page) {
    try {
      writeAccess++;
      byte[] array = pageToByteArray(page);
      long offset = ((long) (header.getReservedPages() + pageID)) * (long) pageSize;
      assert offset >= 0 : header.getReservedPages() + " " + pageID + " " + pageSize + " " + offset;
      file.seek(offset);
      file.write(array);
      page.setDirty(false);
    }
    catch(IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Closes this file.
   */
  @Override
  public void close() {
    try {
      super.close();
      if(!emptyPages.isEmpty() && header instanceof TreeIndexHeader) {
        // write the list of empty pages to the end of the file
        ((TreeIndexHeader) header).writeEmptyPages(emptyPages, file);
      }
      ((TreeIndexHeader) header).setLargestPageID(nextPageID);
      header.writeHeader(file);
      file.close();
    }
    catch(IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Clears this PageFile.
   */
  @Override
  public void clear() {
    try {
      file.setLength(header.size());
    }
    catch(IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Reconstruct a serialized object from the specified byte array.
   * 
   * @param array the byte array from which the object should be reconstructed
   * @return a serialized object from the specified byte array
   */
  private P byteArrayToPage(byte[] array) {
    try {
      ByteArrayInputStream bais = new ByteArrayInputStream(array);
      ObjectInputStream ois = new ObjectInputStream(bais);
      int type = ois.readInt();
      if(type == EMPTY_PAGE) {
        return null;
      }
      else if(type == FILLED_PAGE) {
        P page;
        try {
          page = pageclass.newInstance();
          page.readExternal(ois);
        }
        catch(InstantiationException e) {
          throw new AbortException("Error instanciating an index page", e);
        }
        catch(IllegalAccessException e) {
          throw new AbortException("Error instanciating an index page", e);
        }
        catch(ClassNotFoundException e) {
          throw new AbortException("Error instanciating an index page", e);
        }
        return page;
      }
      else {
        throw new IllegalArgumentException("Unknown type: " + type);
      }
    }
    catch(IOException e) {
      throw new AbortException("IO Error in page file", e);
    }
  }

  /**
   * Serializes an object into a byte array.
   * 
   * @param page the object to be serialized
   * @return the byte array
   */
  private byte[] pageToByteArray(P page) {
    try {
      if(page == null) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeInt(EMPTY_PAGE);
        oos.close();
        baos.close();
        byte[] array = baos.toByteArray();
        byte[] result = new byte[pageSize];
        System.arraycopy(array, 0, result, 0, array.length);
        return result;
      }
      else {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeInt(FILLED_PAGE);
        page.writeExternal(oos);
        oos.close();
        baos.close();
        byte[] array = baos.toByteArray();
        if(array.length > this.pageSize) {
          throw new IllegalArgumentException("Size of page " + page + " is greater than specified" + " pagesize: " + array.length + " > " + pageSize);
        }
        else if(array.length == this.pageSize) {
          return array;
        }

        else {
          byte[] result = new byte[pageSize];
          System.arraycopy(array, 0, result, 0, array.length);
          return result;
        }
      }
    }
    catch(IOException e) {
      throw new RuntimeException("IOException occurred! ", e);
    }
  }

  /** @return the random access file storing the pages. */
  public RandomAccessFile getFile() {
    return file;
  }

  /**
   * Get the header of this persistent page file.
   * 
   * @return the header used by this page file
   */
  public PageHeader getHeader() {
    return header;
  }

  /**
   * Increases the {@link AbstractStoringPageFile#readAccess readAccess} counter by
   * one.
   */
  public void increaseReadAccess() {
    readAccess++;
  }

  /**
   * Increases the {@link AbstractStoringPageFile#writeAccess writeAccess} counter by
   * one.
   */
  public void increaseWriteAccess() {
    writeAccess++;
  }

  /**
   * Set the next page id to the given value. If this means that any page ids
   * stored in <code>emptyPages</code> are smaller than
   * <code>next_page_id</code>, they are removed from this file's observation
   * stack.
   * 
   * @param next_page_id the id of the next page to be inserted (if there are no
   *        more empty pages to be filled)
   */
  @Override
  public void setNextPageID(int next_page_id) {
    this.nextPageID = next_page_id;
    while(!emptyPages.isEmpty() && emptyPages.peek() >= this.nextPageID) {
      emptyPages.pop();
    }
  }

  @Override
  public boolean initialize(PageHeader header) {
    try {
      if(existed) {
        logger.debug("Initializing from an existing page file.");

        // init the header
        this.header = header;
        header.readHeader(file);

        // reading empty nodes in Stack
        if(header instanceof TreeIndexHeader) {
          TreeIndexHeader tiHeader = (TreeIndexHeader) header;
          nextPageID = tiHeader.getLargestPageID();
          try {
            emptyPages = tiHeader.readEmptyPages(file);
          }
          catch(ClassNotFoundException e) {
            throw new RuntimeException("ClassNotFoundException occurred when reading empty pages.", e);
          }
        }
        else { // must scan complete file
          int i = 0;
          while(file.getFilePointer() + pageSize <= file.length()) {
            long offset = ((long) (header.getReservedPages() + i)) * (long) pageSize;
            byte[] buffer = new byte[pageSize];
            file.seek(offset);
            file.read(buffer);

            ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
            ObjectInputStream ois = new ObjectInputStream(bais);
            int type = ois.readInt();
            if(type == EMPTY_PAGE) {
              emptyPages.push(i);
            }
            else if(type == FILLED_PAGE) {
              nextPageID = i + 1;
            }
            else {
              throw new IllegalArgumentException("Unknown type: " + type);
            }
            i++;
          }
        }
      }
      // create new file
      else {
        logger.debug("Initializing with a new page file.");

        // writing header
        this.header = header;
        header.writeHeader(file);
      }
    }
    catch(IOException e) {
      throw new RuntimeException("IOException occurred.", e);
    }

    // Return "new file" status
    return existed;
  }
}