summaryrefslogtreecommitdiff
path: root/src/s3ql/metadata.py
blob: 5a8bffb1b5444167103db36bfcd0e25494311bec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
'''
metadata.py - this file is part of S3QL.

Copyright © 2008 Nikolaus Rath <Nikolaus@rath.org>

This work can be distributed under the terms of the GNU GPLv3.
'''

from .logging import logging # Ensure use of custom logger class
from .database import Connection
from . import BUFSIZE
from .common import pretty_print_size
from .deltadump import INTEGER, BLOB, dump_table, load_table
from .backends.common import NoSuchObject, CorruptedObjectError
import os
import tempfile
import bz2
import stat

log = logging.getLogger(__name__)

# Has to be kept in sync with create_tables()!
DUMP_SPEC = [
             ('objects', 'id', (('id', INTEGER, 1),
                                ('size', INTEGER),
                                ('refcount', INTEGER))),

             ('blocks', 'id', (('id', INTEGER, 1),
                             ('hash', BLOB, 32),
                             ('size', INTEGER),
                             ('obj_id', INTEGER, 1),
                             ('refcount', INTEGER))),

             ('inodes', 'id', (('id', INTEGER, 1),
                               ('uid', INTEGER),
                               ('gid', INTEGER),
                               ('mode', INTEGER),
                               ('mtime_ns', INTEGER),
                               ('atime_ns', INTEGER),
                               ('ctime_ns', INTEGER),
                               ('size', INTEGER),
                               ('rdev', INTEGER),
                               ('locked', INTEGER),
                               ('refcount', INTEGER))),

             ('inode_blocks', 'inode, blockno',
              (('inode', INTEGER),
               ('blockno', INTEGER, 1),
               ('block_id', INTEGER, 1))),

             ('symlink_targets', 'inode', (('inode', INTEGER, 1),
                                           ('target', BLOB))),

             ('names', 'id', (('id', INTEGER, 1),
                              ('name', BLOB),
                              ('refcount', INTEGER))),

             ('contents', 'parent_inode, name_id',
              (('name_id', INTEGER, 1),
               ('inode', INTEGER, 1),
               ('parent_inode', INTEGER))),

             ('ext_attributes', 'inode', (('inode', INTEGER),
                                          ('name_id', INTEGER),
                                          ('value', BLOB))),
]



def restore_metadata(fh, dbfile):
    '''Read metadata from *fh* and write into *dbfile*

    Return database connection to *dbfile*.

    *fh* must be able to return an actual file descriptor from
    its `fileno` method.

    *dbfile* will be created with 0600 permissions. Data is
    first written into a temporary file *dbfile* + '.tmp', and
    the file is renamed once all data has been loaded.
    '''

    tmpfile = dbfile + '.tmp'
    fd = os.open(tmpfile, os.O_RDWR | os.O_CREAT | os.O_TRUNC,
                 stat.S_IRUSR | stat.S_IWUSR)
    try:
        os.close(fd)

        db = Connection(tmpfile)
        db.execute('PRAGMA locking_mode = NORMAL')
        db.execute('PRAGMA synchronous = OFF')
        db.execute('PRAGMA journal_mode = OFF')
        create_tables(db)

        for (table, _, columns) in DUMP_SPEC:
            log.info('..%s..', table)
            load_table(table, columns, db=db, fh=fh)
        db.execute('ANALYZE')

        # We must close the database to rename it
        db.close()
    except:
        os.unlink(tmpfile)
        raise

    os.rename(tmpfile, dbfile)

    return Connection(dbfile)

def cycle_metadata(backend, keep=10):
    '''Rotate metadata backups'''

    # Since we always overwrite the source afterwards, we can
    # use either copy or rename - so we pick whatever is faster.
    if backend.has_native_rename:
        cycle_fn = backend.rename
    else:
        cycle_fn = backend.copy

    log.info('Backing up old metadata...')
    for i in range(keep)[::-1]:
        try:
            cycle_fn("s3ql_metadata_bak_%d" % i, "s3ql_metadata_bak_%d" % (i + 1))
        except NoSuchObject:
            pass

    # If we use backend.rename() and crash right after this instruction,
    # we will end up without an s3ql_metadata object. However, fsck.s3ql
    # is smart enough to use s3ql_metadata_new in this case.
    try:
        cycle_fn("s3ql_metadata", "s3ql_metadata_bak_0")
    except NoSuchObject:
        # In case of mkfs, there may be no metadata object yet
        pass
    cycle_fn("s3ql_metadata_new", "s3ql_metadata")

    # Note that we can't compare with "is" (maybe because the bound-method
    # is re-created on the fly on access?)
    if cycle_fn == backend.copy:
        backend.delete('s3ql_metadata_new')

def dump_metadata(db, fh):
    '''Dump metadata into fh

    *fh* must be able to return an actual file descriptor from
    its `fileno` method.
    '''

    locking_mode = db.get_val('PRAGMA locking_mode')
    try:
        # Ensure that we don't hold a lock on the db
        # (need to access DB to actually release locks)
        db.execute('PRAGMA locking_mode = NORMAL')
        db.has_val('SELECT rowid FROM %s LIMIT 1' % DUMP_SPEC[0][0])

        for (table, order, columns) in DUMP_SPEC:
            log.info('..%s..', table)
            dump_table(table, order, columns, db=db, fh=fh)

    finally:
        db.execute('PRAGMA locking_mode = %s' % locking_mode)


def create_tables(conn):
    # Table of storage objects
    # Refcount is included for performance reasons
    # size == -1 indicates block has not been uploaded yet
    conn.execute("""
    CREATE TABLE objects (
        id        INTEGER PRIMARY KEY AUTOINCREMENT,
        refcount  INT NOT NULL,
        size      INT NOT NULL
    )""")

    # Table of known data blocks
    # Refcount is included for performance reasons
    conn.execute("""
    CREATE TABLE blocks (
        id        INTEGER PRIMARY KEY,
        hash      BLOB(32) UNIQUE,
        refcount  INT,
        size      INT NOT NULL,
        obj_id    INTEGER NOT NULL REFERENCES objects(id)
    )""")

    # Table with filesystem metadata
    # The number of links `refcount` to an inode can in theory
    # be determined from the `contents` table. However, managing
    # this separately should be significantly faster (the information
    # is required for every getattr!)
    conn.execute("""
    CREATE TABLE inodes (
        -- id has to specified *exactly* as follows to become
        -- an alias for the rowid.
        id        INTEGER PRIMARY KEY AUTOINCREMENT,
        uid       INT NOT NULL,
        gid       INT NOT NULL,
        mode      INT NOT NULL,
        mtime_ns  INT NOT NULL,
        atime_ns  INT NOT NULL,
        ctime_ns  INT NOT NULL,
        refcount  INT NOT NULL,
        size      INT NOT NULL DEFAULT 0,
        rdev      INT NOT NULL DEFAULT 0,
        locked    BOOLEAN NOT NULL DEFAULT 0
    )""")

    # Further Blocks used by inode (blockno >= 1)
    conn.execute("""
    CREATE TABLE inode_blocks (
        inode     INTEGER NOT NULL REFERENCES inodes(id),
        blockno   INT NOT NULL,
        block_id    INTEGER NOT NULL REFERENCES blocks(id),
        PRIMARY KEY (inode, blockno)
    )""")

    # Symlinks
    conn.execute("""
    CREATE TABLE symlink_targets (
        inode     INTEGER PRIMARY KEY REFERENCES inodes(id),
        target    BLOB NOT NULL
    )""")

    # Names of file system objects
    conn.execute("""
    CREATE TABLE names (
        id     INTEGER PRIMARY KEY,
        name   BLOB NOT NULL,
        refcount  INT NOT NULL,
        UNIQUE (name)
    )""")

    # Table of filesystem objects
    # rowid is used by readdir() to restart at the correct position
    conn.execute("""
    CREATE TABLE contents (
        rowid     INTEGER PRIMARY KEY AUTOINCREMENT,
        name_id   INT NOT NULL REFERENCES names(id),
        inode     INT NOT NULL REFERENCES inodes(id),
        parent_inode INT NOT NULL REFERENCES inodes(id),

        UNIQUE (parent_inode, name_id)
    )""")

    # Extended attributes
    conn.execute("""
    CREATE TABLE ext_attributes (
        inode     INTEGER NOT NULL REFERENCES inodes(id),
        name_id   INTEGER NOT NULL REFERENCES names(id),
        value     BLOB NOT NULL,

        PRIMARY KEY (inode, name_id)
    )""")

    # Shortcuts
    conn.execute("""
    CREATE VIEW contents_v AS
    SELECT * FROM contents JOIN names ON names.id = name_id
    """)
    conn.execute("""
    CREATE VIEW ext_attributes_v AS
    SELECT * FROM ext_attributes JOIN names ON names.id = name_id
    """)

def stream_write_bz2(ifh, ofh):
    '''Compress *ifh* into *ofh* using bz2 compression'''

    compr = bz2.BZ2Compressor(9)
    while True:
        buf = ifh.read(BUFSIZE)
        if not buf:
            break
        buf = compr.compress(buf)
        if buf:
            ofh.write(buf)
    buf = compr.flush()
    if buf:
        ofh.write(buf)

def stream_read_bz2(ifh, ofh):
    '''Uncompress bz2 compressed *ifh* into *ofh*'''

    decompressor = bz2.BZ2Decompressor()
    while True:
        buf = ifh.read(BUFSIZE)
        if not buf:
            break
        buf = decompressor.decompress(buf)
        if buf:
            ofh.write(buf)

    if decompressor.unused_data or ifh.read(1) != b'':
        raise CorruptedObjectError('Data after end of bz2 stream')

def download_metadata(backend, db_file, name='s3ql_metadata'):
    with tempfile.TemporaryFile() as tmpfh:
        def do_read(fh):
            tmpfh.seek(0)
            tmpfh.truncate()
            stream_read_bz2(fh, tmpfh)

        log.info('Downloading and decompressing metadata...')
        backend.perform_read(do_read, name)

        log.info("Reading metadata...")
        tmpfh.seek(0)
        return restore_metadata(tmpfh, db_file)

def dump_and_upload_metadata(backend, db, param):
    with tempfile.TemporaryFile() as fh:
        log.info('Dumping metadata...')
        dump_metadata(db, fh)
        upload_metadata(backend, fh, param)

def upload_metadata(backend, fh, param):
    log.info("Compressing and uploading metadata...")
    def do_write(obj_fh):
        fh.seek(0)
        stream_write_bz2(fh, obj_fh)
        return obj_fh
    obj_fh = backend.perform_write(do_write, "s3ql_metadata_new",
                                   metadata=param, is_compressed=True)
    log.info('Wrote %s of compressed metadata.',
             pretty_print_size(obj_fh.get_obj_size()))

    log.info('Cycling metadata backups...')
    cycle_metadata(backend)