path: root/cmd/restic/lock.go
blob: 11c1ed8f564583a01c882367b27cf346ac432b94 (plain)
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/restic"
)

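// lockContext binds a lock to the cancel function of its refresh context and
// to a WaitGroup used to wait for the two background goroutines (lock refresh
// and expiry monitoring) to finish.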
type lockContext struct {
	lock      *restic.Lock
	cancel    context.CancelFunc
	refreshWG sync.WaitGroup
}

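// globalLocks tracks all locks currently held by this process. The embedded
// Mutex protects the map, the embedded Once ensures that the cleanup handler
// is registered only once.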
var globalLocks struct {
	locks map[*restic.Lock]*lockContext
	sync.Mutex
	sync.Once
}

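// lockRepo acquires a non-exclusive lock on the repository, waiting for up to
// retryLock if the repository is already locked. A call site typically looks
// roughly like the following sketch (the variable names are illustrative only):
//
//	lock, ctx, err := lockRepo(ctx, repo, retryLock, jsonOutput)
//	if err != nil {
//		return err
//	}
//	defer unlockRepo(lock)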
func lockRepo(ctx context.Context, repo restic.Repository, retryLock time.Duration, json bool) (*restic.Lock, context.Context, error) {
	return lockRepository(ctx, repo, false, retryLock, json)
}

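// lockRepoExclusive acquires an exclusive lock on the repository, waiting for
// up to retryLock if the repository is already locked by another process.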
func lockRepoExclusive(ctx context.Context, repo restic.Repository, retryLock time.Duration, json bool) (*restic.Lock, context.Context, error) {
	return lockRepository(ctx, repo, true, retryLock, json)
}

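// Retry parameters used while waiting for an already locked repository: the
// sleep time between attempts starts at retrySleepStart and doubles until it
// reaches retrySleepMax.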
var (
	retrySleepStart = 5 * time.Second
	retrySleepMax   = 60 * time.Second
)

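// minDuration returns the smaller of the two durations.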
func minDuration(a, b time.Duration) time.Duration {
	if a <= b {
		return a
	}
	return b
}

// lockRepository wraps the ctx such that it is cancelled when the repository is unlocked.
// Cancelling the original context also stops the lock refresh.
func lockRepository(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, json bool) (*restic.Lock, context.Context, error) {
	// make sure that a repository is unlocked properly and that cancel() is
	// called by the cleanup handler in global.go
	globalLocks.Do(func() {
		AddCleanupHandler(unlockAll)
	})

	lockFn := restic.NewLock
	if exclusive {
		lockFn = restic.NewExclusiveLock
	}

	var lock *restic.Lock
	var err error

	retrySleep := minDuration(retrySleepStart, retryLock)
	retryMessagePrinted := false
	retryTimeout := time.After(retryLock)

retryLoop:
	for {
		lock, err = lockFn(ctx, repo)
		if err != nil && restic.IsAlreadyLocked(err) {

			if !retryMessagePrinted {
				if !json {
					Verbosef("repo already locked, waiting up to %s for the lock\n", retryLock)
				}
				retryMessagePrinted = true
			}

			debug.Log("repo already locked, retrying in %v", retrySleep)
			retrySleepCh := time.After(retrySleep)

			select {
			case <-ctx.Done():
				return nil, ctx, ctx.Err()
			case <-retryTimeout:
				debug.Log("repo already locked, timeout expired")
				// Last lock attempt
				lock, err = lockFn(ctx, repo)
				break retryLoop
			case <-retrySleepCh:
				retrySleep = minDuration(retrySleep*2, retrySleepMax)
			}
		} else {
			// anything else, either a successful lock or another error
			break retryLoop
		}
	}
	if restic.IsInvalidLock(err) {
		return nil, ctx, errors.Fatalf("%v\n\nthe `unlock --remove-all` command can be used to remove invalid locks. Make sure that no other restic process is accessing the repository when running the command", err)
	}
	if err != nil {
		return nil, ctx, fmt.Errorf("unable to create lock in backend: %w", err)
	}
	debug.Log("create lock %p (exclusive %v)", lock, exclusive)

	ctx, cancel := context.WithCancel(ctx)
	lockInfo := &lockContext{
		lock:   lock,
		cancel: cancel,
	}
	lockInfo.refreshWG.Add(2)
	refreshChan := make(chan struct{})
	forceRefreshChan := make(chan refreshLockRequest)

	globalLocks.Lock()
	globalLocks.locks[lock] = lockInfo
	go refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan)
	go monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan)
	globalLocks.Unlock()

	return lock, ctx, err
}

var refreshInterval = 5 * time.Minute

// consider a lock refresh failed a bit before the lock actually becomes stale;
// the difference compensates for a small clock drift between clients.
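// For example, with refreshInterval = 5m and a StaleLockTimeout of 30m (the
// default in internal/restic at the time of writing), this evaluates to
// 30m - 7.5m = 22.5m.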
var refreshabilityTimeout = restic.StaleLockTimeout - refreshInterval*3/2

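// refreshLockRequest asks the refresh goroutine to immediately try to refresh
// a possibly stale lock; the outcome is reported back on the result channel.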
type refreshLockRequest struct {
	result chan bool
}

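// refreshLocks refreshes the lock at regular intervals and handles forced
// refresh requests for stale locks. Once the context is cancelled it removes
// the lock from the repository.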
func refreshLocks(ctx context.Context, backend restic.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest) {
	debug.Log("start")
	lock := lockInfo.lock
	ticker := time.NewTicker(refreshInterval)
	lastRefresh := lock.Time

	defer func() {
		ticker.Stop()
		// ensure that the context was cancelled before removing the lock
		lockInfo.cancel()

		// remove the lock from the repo
		debug.Log("unlocking repository with lock %v", lock)
		if err := lock.Unlock(); err != nil {
			debug.Log("error while unlocking: %v", err)
			Warnf("error while unlocking: %v", err)
		}

		lockInfo.refreshWG.Done()
	}()

	for {
		select {
		case <-ctx.Done():
			debug.Log("terminate")
			return

		case req := <-forceRefresh:
			debug.Log("trying to refresh stale lock")
			// keep on going if our current lock still exists
			success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel)
			// report the result of the forced refresh back to the monitor goroutine
			select {
			case <-ctx.Done():
			case req.result <- success:
			}

			if success {
				// update lock refresh time
				lastRefresh = lock.Time
			}

		case <-ticker.C:
			if time.Since(lastRefresh) > refreshabilityTimeout {
				// the lock is too old, wait until the expiry monitor cancels the context
				continue
			}

			debug.Log("refreshing locks")
			err := lock.Refresh(context.TODO())
			if err != nil {
				Warnf("unable to refresh lock: %v\n", err)
			} else {
				lastRefresh = lock.Time
				// inform monitor goroutine about successful refresh
				select {
				case <-ctx.Done():
				case refreshed <- struct{}{}:
				}
			}
		}
	}
}

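// monitorLockRefresh watches for missing lock refreshes. If no successful
// refresh was seen for refreshabilityTimeout, it asks the refresh goroutine to
// refresh the stale lock and cancels the context if that also fails.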
func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest) {
	// time.Now() might use a monotonic clock, which is paused during standby;
	// convert to unix time to ensure we compare wall-clock values
	lastRefresh := time.Now().UnixNano()
	pollDuration := 1 * time.Second
	if refreshInterval < pollDuration {
		// required for TestLockFailedRefresh
		pollDuration = refreshInterval / 5
	}
	// timers are paused during standby, which is a problem as the refresh timeout
	// _must_ expire if the host was in standby for too long. Thus fall back to periodic checks
	// https://github.com/golang/go/issues/35012
	ticker := time.NewTicker(pollDuration)
	defer func() {
		ticker.Stop()
		lockInfo.cancel()
		lockInfo.refreshWG.Done()
	}()

	var refreshStaleLockResult chan bool

	for {
		select {
		case <-ctx.Done():
			debug.Log("terminate expiry monitoring")
			return
		case <-refreshed:
			if refreshStaleLockResult != nil {
				// ignore delayed refresh notifications while the stale lock is refreshed
				continue
			}
			lastRefresh = time.Now().UnixNano()
		case <-ticker.C:
			if time.Now().UnixNano()-lastRefresh < refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
				continue
			}

			debug.Log("trying to refreshStaleLock")
			// keep on going if our current lock still exists
			refreshReq := refreshLockRequest{
				result: make(chan bool),
			}
			refreshStaleLockResult = refreshReq.result

			// inform refresh goroutine about forced refresh
			select {
			case <-ctx.Done():
			case forceRefresh <- refreshReq:
			}
		case success := <-refreshStaleLockResult:
			if success {
				lastRefresh = time.Now().UnixNano()
				refreshStaleLockResult = nil
				continue
			}

			Warnf("Fatal: failed to refresh lock in time\n")
			return
		}
	}
}

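// tryRefreshStaleLock attempts to refresh a lock that has likely become stale.
// If the backend supports freezing, it is frozen during the attempt so that,
// on failure, the context can be cancelled before further backend operations
// take place.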
func tryRefreshStaleLock(ctx context.Context, backend restic.Backend, lock *restic.Lock, cancel context.CancelFunc) bool {
	freeze := restic.AsBackend[restic.FreezeBackend](backend)
	if freeze != nil {
		debug.Log("freezing backend")
		freeze.Freeze()
		defer freeze.Unfreeze()
	}

	err := lock.RefreshStaleLock(ctx)
	if err != nil {
		Warnf("failed to refresh stale lock: %v\n", err)
		// cancel context while the backend is still frozen to prevent accidental modifications
		cancel()
		return false
	}

	return true
}

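// unlockRepo removes the given lock from the repository by cancelling its
// context and waiting until the background goroutines have removed the lock
// file.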
func unlockRepo(lock *restic.Lock) {
	if lock == nil {
		return
	}

	globalLocks.Lock()
	lockInfo, exists := globalLocks.locks[lock]
	delete(globalLocks.locks, lock)
	globalLocks.Unlock()

	if !exists {
		debug.Log("unable to find lock %v in the global list of locks, ignoring", lock)
		return
	}
	lockInfo.cancel()
	lockInfo.refreshWG.Wait()
}

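// unlockAll removes all locks held by this process. It is registered as a
// cleanup handler and passes the exit code through unchanged.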
func unlockAll(code int) (int, error) {
	globalLocks.Lock()
	locks := globalLocks.locks
	debug.Log("unlocking %d locks", len(globalLocks.locks))
	for _, lockInfo := range globalLocks.locks {
		lockInfo.cancel()
	}
	globalLocks.locks = make(map[*restic.Lock]*lockContext)
	globalLocks.Unlock()

	for _, lockInfo := range locks {
		lockInfo.refreshWG.Wait()
	}

	return code, nil
}

func init() {
	globalLocks.locks = make(map[*restic.Lock]*lockContext)
}