1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
|
'''
metadata.py - this file is part of S3QL.
Copyright © 2008 Nikolaus Rath <Nikolaus@rath.org>
This work can be distributed under the terms of the GNU GPLv3.
'''
from .logging import logging # Ensure use of custom logger class
from .database import Connection
from . import BUFSIZE
from .common import pretty_print_size
from .deltadump import INTEGER, BLOB, dump_table, load_table
from .backends.common import NoSuchObject, CorruptedObjectError
import os
import tempfile
import bz2
import stat
log = logging.getLogger(__name__)

# Layout of the metadata dump. Has to be kept in sync with create_tables()!
#
# Each entry is (table_name, order_by_columns, column_definitions): rows
# are dumped sorted by *order_by_columns*.  A column definition is
# (name, type_code) with an optional third element that is passed through
# to deltadump -- presumably a delta-encoding parameter for columns whose
# values change slowly between consecutive rows (TODO: confirm against
# deltadump.dump_table / load_table).
DUMP_SPEC = [
    ('objects', 'id', (('id', INTEGER, 1),
                       ('size', INTEGER),
                       ('refcount', INTEGER))),
    ('blocks', 'id', (('id', INTEGER, 1),
                      ('hash', BLOB, 32),
                      ('size', INTEGER),
                      ('obj_id', INTEGER, 1),
                      ('refcount', INTEGER))),
    ('inodes', 'id', (('id', INTEGER, 1),
                      ('uid', INTEGER),
                      ('gid', INTEGER),
                      ('mode', INTEGER),
                      ('mtime_ns', INTEGER),
                      ('atime_ns', INTEGER),
                      ('ctime_ns', INTEGER),
                      ('size', INTEGER),
                      ('rdev', INTEGER),
                      ('locked', INTEGER),
                      ('refcount', INTEGER))),
    ('inode_blocks', 'inode, blockno',
     (('inode', INTEGER),
      ('blockno', INTEGER, 1),
      ('block_id', INTEGER, 1))),
    ('symlink_targets', 'inode', (('inode', INTEGER, 1),
                                  ('target', BLOB))),
    ('names', 'id', (('id', INTEGER, 1),
                     ('name', BLOB),
                     ('refcount', INTEGER))),
    ('contents', 'parent_inode, name_id',
     (('name_id', INTEGER, 1),
      ('inode', INTEGER, 1),
      ('parent_inode', INTEGER))),
    ('ext_attributes', 'inode', (('inode', INTEGER),
                                 ('name_id', INTEGER),
                                 ('value', BLOB))),
]
def restore_metadata(fh, dbfile):
    '''Read a metadata dump from *fh* and recreate it in *dbfile*.

    Returns a database connection to *dbfile*.

    *fh* must expose a real file descriptor through its `fileno`
    method.

    *dbfile* is created with 0600 permissions.  All data is first
    loaded into a temporary file named *dbfile* + '.tmp', which is
    renamed to *dbfile* only after the load has fully completed.
    '''

    tmpname = dbfile + '.tmp'
    # Create the temporary file with owner-only permissions.
    fd = os.open(tmpname, os.O_RDWR | os.O_CREAT | os.O_TRUNC,
                 stat.S_IRUSR | stat.S_IWUSR)
    try:
        os.close(fd)

        db = Connection(tmpname)
        # Durability does not matter here: on any failure the
        # temporary file is discarded and the load starts over.
        for pragma in ('locking_mode = NORMAL',
                       'synchronous = OFF',
                       'journal_mode = OFF'):
            db.execute('PRAGMA ' + pragma)
        create_tables(db)

        for (table, _, columns) in DUMP_SPEC:
            log.info('..%s..', table)
            load_table(table, columns, db=db, fh=fh)
        db.execute('ANALYZE')

        # We must close the database to rename it
        db.close()
    except BaseException:
        # Clean up the partial database, then propagate the error.
        os.unlink(tmpname)
        raise

    os.rename(tmpname, dbfile)

    return Connection(dbfile)
def cycle_metadata(backend, keep=10):
    '''Rotate metadata backups.

    Shifts ``s3ql_metadata_bak_i`` to ``s3ql_metadata_bak_(i+1)`` for
    *i* from *keep*-1 down to 0, moves the current ``s3ql_metadata``
    object to ``s3ql_metadata_bak_0``, and finally promotes
    ``s3ql_metadata_new`` to ``s3ql_metadata``.
    '''

    # Since we always overwrite the source afterwards, we can
    # use either copy or rename - so we pick whatever is faster.
    if backend.has_native_rename:
        cycle_fn = backend.rename
    else:
        cycle_fn = backend.copy

    log.info('Backing up old metadata...')
    # Iterate from oldest to newest backup; reversed(range(keep)) is
    # the idiomatic (and lazy) form of range(keep)[::-1], which built
    # a throwaway list just to reverse it.
    for i in reversed(range(keep)):
        try:
            cycle_fn("s3ql_metadata_bak_%d" % i, "s3ql_metadata_bak_%d" % (i + 1))
        except NoSuchObject:
            # Backup slot does not exist yet - nothing to shift.
            pass

    # If we use backend.rename() and crash right after this instruction,
    # we will end up without an s3ql_metadata object. However, fsck.s3ql
    # is smart enough to use s3ql_metadata_new in this case.
    try:
        cycle_fn("s3ql_metadata", "s3ql_metadata_bak_0")
    except NoSuchObject:
        # In case of mkfs, there may be no metadata object yet
        pass
    cycle_fn("s3ql_metadata_new", "s3ql_metadata")

    # Note that we can't compare with "is" (maybe because the bound-method
    # is re-created on the fly on access?)
    if cycle_fn == backend.copy:
        backend.delete('s3ql_metadata_new')
def dump_metadata(db, fh):
    '''Serialize all metadata tables from *db* into *fh*.

    *fh* must expose a real file descriptor through its `fileno`
    method.
    '''

    saved_mode = db.get_val('PRAGMA locking_mode')
    try:
        # Ensure that we don't hold a lock on the db
        # (need to access DB to actually release locks)
        db.execute('PRAGMA locking_mode = NORMAL')
        db.has_val('SELECT rowid FROM %s LIMIT 1' % DUMP_SPEC[0][0])

        for (table, order, columns) in DUMP_SPEC:
            log.info('..%s..', table)
            dump_table(table, order, columns, db=db, fh=fh)
    finally:
        # Restore whatever locking mode was active before the dump.
        db.execute('PRAGMA locking_mode = %s' % saved_mode)
def create_tables(conn):
    '''Create the S3QL metadata schema on database connection *conn*.

    The tables created here must be kept in sync with DUMP_SPEC, which
    describes how they are serialized.
    '''
    # Table of storage objects
    # Refcount is included for performance reasons
    # size == -1 indicates block has not been uploaded yet
    conn.execute("""
    CREATE TABLE objects (
        id        INTEGER PRIMARY KEY AUTOINCREMENT,
        refcount  INT NOT NULL,
        size      INT NOT NULL
    )""")

    # Table of known data blocks
    # Refcount is included for performance reasons
    conn.execute("""
    CREATE TABLE blocks (
        id        INTEGER PRIMARY KEY,
        hash      BLOB(32) UNIQUE,
        refcount  INT,
        size      INT NOT NULL,
        obj_id    INTEGER NOT NULL REFERENCES objects(id)
    )""")

    # Table with filesystem metadata
    # The number of links `refcount` to an inode can in theory
    # be determined from the `contents` table. However, managing
    # this separately should be significantly faster (the information
    # is required for every getattr!)
    conn.execute("""
    CREATE TABLE inodes (
        -- id has to specified *exactly* as follows to become
        -- an alias for the rowid.
        id        INTEGER PRIMARY KEY AUTOINCREMENT,
        uid       INT NOT NULL,
        gid       INT NOT NULL,
        mode      INT NOT NULL,
        mtime_ns  INT NOT NULL,
        atime_ns  INT NOT NULL,
        ctime_ns  INT NOT NULL,
        refcount  INT NOT NULL,
        size      INT NOT NULL DEFAULT 0,
        rdev      INT NOT NULL DEFAULT 0,
        locked    BOOLEAN NOT NULL DEFAULT 0
    )""")

    # Further Blocks used by inode (blockno >= 1)
    conn.execute("""
    CREATE TABLE inode_blocks (
        inode     INTEGER NOT NULL REFERENCES inodes(id),
        blockno   INT NOT NULL,
        block_id  INTEGER NOT NULL REFERENCES blocks(id),
        PRIMARY KEY (inode, blockno)
    )""")

    # Symlinks
    conn.execute("""
    CREATE TABLE symlink_targets (
        inode     INTEGER PRIMARY KEY REFERENCES inodes(id),
        target    BLOB NOT NULL
    )""")

    # Names of file system objects
    conn.execute("""
    CREATE TABLE names (
        id        INTEGER PRIMARY KEY,
        name      BLOB NOT NULL,
        refcount  INT NOT NULL,
        UNIQUE (name)
    )""")

    # Table of filesystem objects
    # rowid is used by readdir() to restart at the correct position
    conn.execute("""
    CREATE TABLE contents (
        rowid     INTEGER PRIMARY KEY AUTOINCREMENT,
        name_id   INT NOT NULL REFERENCES names(id),
        inode     INT NOT NULL REFERENCES inodes(id),
        parent_inode INT NOT NULL REFERENCES inodes(id),
        UNIQUE (parent_inode, name_id)
    )""")

    # Extended attributes
    conn.execute("""
    CREATE TABLE ext_attributes (
        inode     INTEGER NOT NULL REFERENCES inodes(id),
        name_id   INTEGER NOT NULL REFERENCES names(id),
        value     BLOB NOT NULL,
        PRIMARY KEY (inode, name_id)
    )""")

    # Shortcuts
    conn.execute("""
    CREATE VIEW contents_v AS
    SELECT * FROM contents JOIN names ON names.id = name_id
    """)
    conn.execute("""
    CREATE VIEW ext_attributes_v AS
    SELECT * FROM ext_attributes JOIN names ON names.id = name_id
    """)
def stream_write_bz2(ifh, ofh, bufsize=None):
    '''Compress *ifh* into *ofh* using bz2 compression.

    Data is read from *ifh* and written to *ofh* in chunks of
    *bufsize* bytes (defaults to BUFSIZE), so arbitrarily large
    inputs are handled without loading them into memory at once.
    '''
    if bufsize is None:
        # Preserve the historical default chunk size.
        bufsize = BUFSIZE

    # Maximum compression level (9), as before.
    compr = bz2.BZ2Compressor(9)
    while True:
        buf = ifh.read(bufsize)
        if not buf:
            break
        buf = compr.compress(buf)
        # The compressor buffers internally and may produce no output
        # for a given input chunk.
        if buf:
            ofh.write(buf)

    # Flush whatever the compressor still holds.
    buf = compr.flush()
    if buf:
        ofh.write(buf)
def stream_read_bz2(ifh, ofh, bufsize=None):
    '''Uncompress bz2 compressed *ifh* into *ofh*.

    Data is read from *ifh* in chunks of *bufsize* bytes (defaults to
    BUFSIZE).  Raises `CorruptedObjectError` if there is trailing data
    after the end of the bz2 stream.
    '''
    if bufsize is None:
        # Preserve the historical default chunk size.
        bufsize = BUFSIZE

    decompressor = bz2.BZ2Decompressor()
    while True:
        buf = ifh.read(bufsize)
        if not buf:
            break
        buf = decompressor.decompress(buf)
        if buf:
            ofh.write(buf)

    # A valid object contains exactly one bz2 stream: anything left in
    # the decompressor or still readable from *ifh* is garbage.
    if decompressor.unused_data or ifh.read(1) != b'':
        raise CorruptedObjectError('Data after end of bz2 stream')
def download_metadata(backend, db_file, name='s3ql_metadata'):
    '''Fetch metadata object *name* from *backend* into *db_file*.

    Returns a database connection to the restored database.
    '''
    with tempfile.TemporaryFile() as spool:

        def do_read(fh):
            # perform_read may invoke this more than once (e.g. on
            # retry), so discard any partially decompressed data first.
            spool.seek(0)
            spool.truncate()
            stream_read_bz2(fh, spool)

        log.info('Downloading and decompressing metadata...')
        backend.perform_read(do_read, name)

        log.info("Reading metadata...")
        spool.seek(0)
        return restore_metadata(spool, db_file)
def dump_and_upload_metadata(backend, db, param):
    '''Serialize the metadata in *db* and upload it to *backend*.'''
    with tempfile.TemporaryFile() as spool:
        log.info('Dumping metadata...')
        dump_metadata(db, spool)
        upload_metadata(backend, spool, param)
def upload_metadata(backend, fh, param):
    '''Compress the metadata dump in *fh* and store it in *backend*.

    The data is written to the ``s3ql_metadata_new`` object with
    *param* attached as object metadata; afterwards the existing
    backups are rotated via `cycle_metadata`.
    '''
    log.info("Compressing and uploading metadata...")

    def do_write(obj_fh):
        # perform_write may call this more than once; always start
        # from the beginning of the dump.
        fh.seek(0)
        stream_write_bz2(fh, obj_fh)
        return obj_fh

    obj_fh = backend.perform_write(do_write, "s3ql_metadata_new",
                                   metadata=param, is_compressed=True)
    log.info('Wrote %s of compressed metadata.',
             pretty_print_size(obj_fh.get_obj_size()))

    log.info('Cycling metadata backups...')
    cycle_metadata(backend)
|