author | Joey Hess <joeyh@joeyh.name> | 2019-06-05 17:54:35 -0400 |
---|---|---|
committer | Joey Hess <joeyh@joeyh.name> | 2019-06-05 17:54:35 -0400 |
commit | 659640e22493cc39a2351a4d0d7ac575b0ff3b7a (patch) | |
tree | 9ff9d7d0f09b4dd972c68cd63301fcefbf2aa730 /Types/WorkerPool.hs | |
parent | c04b2af3e1a8316e7cf640046ad0aa68826650ed (diff) |
separate queue for cleanup actions
When running multiple concurrent actions, the cleanup phase is run in a
separate queue from the main action queue. This can make some commands
faster, because less time is spent on bookkeeping in between each file
transfer.
But as far as I can see, nothing will be sped up much by this yet, because
all the existing cleanup actions are very light-weight. This is just groundwork
for deferring checksum verification to cleanup time.
This change does mean that if the user expects -J2 will mean that they see no
more than 2 jobs running at a time, they may be surprised to see 4 in some
cases (if the cleanup actions are slow enough to notice).
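The -J2 case can be illustrated with a minimal, self-contained sketch of the allocation scheme in this commit. The types are simplified copies of those in Types/WorkerPool.hs (active workers are left out so that only base is needed), and `stagesOf` and `main` are hypothetical helpers added for the demonstration; this is not the git-annex code itself.

```haskell
module Main where

-- Simplified from Types/WorkerPool.hs: only idle workers are modeled,
-- so the sketch needs nothing beyond base.
data WorkerStage = PerformStage | CleanupStage
    deriving (Eq, Show)

data Worker t = IdleWorker t WorkerStage

newtype WorkerPool t = WorkerPool [Worker t]

-- Same shape as allocateWorkerPool in the diff: n workers per stage,
-- with the two stages alternating through the list.
allocateWorkerPool :: t -> Int -> WorkerPool t
allocateWorkerPool t n = WorkerPool $ take (n+n) $
    map (uncurry IdleWorker) $ zip (repeat t) stages
  where
    stages = concat $ repeat [PerformStage, CleanupStage]

-- Hypothetical helper: list the stage of each worker in the pool.
stagesOf :: WorkerPool t -> [WorkerStage]
stagesOf (WorkerPool l) = [ s | IdleWorker _ s <- l ]

main :: IO ()
main = do
    let allocated = stagesOf (allocateWorkerPool () 2)
    -- With n = 2 the pool holds 4 workers, 2 for each stage.
    print (length allocated)
    print allocated
```

With n = 2 the pool holds four workers, two per stage, which is why up to 4 jobs can be visible under -J2.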
It might also make sense to enable background cleanup without the -J,
for at least one cleanup action. Indeed, that's the behavior that -J1
has now. At some point in the future, it may make sense to make the
behavior with no -J the same as -J1. The only reason it is not that way now
is that git-annex can be built without concurrent-output, and also any bugs
in concurrent-output (such as perhaps misbehaving on non-VT100 compatible
terminals) are avoided by default by only using it when -J is used.
Diffstat (limited to 'Types/WorkerPool.hs')
-rw-r--r-- | Types/WorkerPool.hs | 53 |
1 files changed, 46 insertions, 7 deletions
diff --git a/Types/WorkerPool.hs b/Types/WorkerPool.hs
index acc11c8843..03a95b47ba 100644
--- a/Types/WorkerPool.hs
+++ b/Types/WorkerPool.hs
@@ -7,8 +7,8 @@
 
 module Types.WorkerPool where
 
+import Control.Concurrent
 import Control.Concurrent.Async
-import Data.Either
 
 -- | Pool of worker threads.
 data WorkerPool t
@@ -16,15 +16,54 @@ data WorkerPool t
 	| WorkerPool [Worker t]
 
 -- | A worker can either be idle or running an Async action.
-type Worker t = Either t (Async t)
+-- And it is used for some stage.
+data Worker t
+	= IdleWorker t WorkerStage
+	| ActiveWorker (Async t) WorkerStage
 
+-- | These correspond to CommandPerform and CommandCleanup.
+data WorkerStage = PerformStage | CleanupStage
+	deriving (Eq)
+
+workerStage :: Worker t -> WorkerStage
+workerStage (IdleWorker _ s) = s
+workerStage (ActiveWorker _ s) = s
+
+workerAsync :: Worker t -> Maybe (Async t)
+workerAsync (IdleWorker _ _) = Nothing
+workerAsync (ActiveWorker aid _) = Just aid
+
+-- | Allocates a WorkerPool that has the specified number of workers
+-- in it, of each stage.
+--
+-- The stages are distributed evenly throughout.
 allocateWorkerPool :: t -> Int -> WorkerPool t
-allocateWorkerPool t n = WorkerPool $ replicate n (Left t)
+allocateWorkerPool t n = WorkerPool $ take (n+n) $
+	map (uncurry IdleWorker) $ zip (repeat t) stages
+  where
+	stages = concat $ repeat [PerformStage, CleanupStage]
 
-addWorkerPool :: WorkerPool t -> Worker t -> WorkerPool t
-addWorkerPool (WorkerPool l) w = WorkerPool (w:l)
-addWorkerPool UnallocatedWorkerPool w = WorkerPool [w]
+addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t
+addWorkerPool w (WorkerPool l) = WorkerPool (w:l)
+addWorkerPool w UnallocatedWorkerPool = WorkerPool [w]
 
 idleWorkers :: WorkerPool t -> [t]
 idleWorkers UnallocatedWorkerPool = []
-idleWorkers (WorkerPool l) = lefts l
+idleWorkers (WorkerPool l) = go l
+  where
+	go [] = []
+	go (IdleWorker t _ : rest) = t : go rest
+	go (ActiveWorker _ _ : rest) = go rest
+
+-- | Removes a worker from the pool whose Async uses the ThreadId.
+--
+-- Each Async has its own ThreadId, so this stops once it finds
+-- a match.
+removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t)
+removeThreadIdWorkerPool _ UnallocatedWorkerPool = Nothing
+removeThreadIdWorkerPool tid (WorkerPool l) = go [] l
+  where
+	go _ [] = Nothing
+	go c (ActiveWorker a stage : rest)
+		| asyncThreadId a == tid = Just ((a, stage), WorkerPool (c++rest))
+	go c (v : rest) = go (v:c) rest
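The accumulator pattern used by removeThreadIdWorkerPool can be tried in isolation. What follows is a sketch under stated assumptions, not git-annex code: to stay within base, the `Async t` of an active worker is replaced by a plain `Int` key standing in for the ThreadId, and `removeKeyWorkerPool` is a hypothetical name. The traversal logic mirrors the diff.

```haskell
module Main where

data WorkerStage = PerformStage | CleanupStage
    deriving (Eq, Show)

-- Stand-in for the Worker type in the diff: an Int key takes the place
-- of the Async (and its ThreadId) so that only base is required.
data Worker t
    = IdleWorker t WorkerStage
    | ActiveWorker Int WorkerStage
    deriving (Eq, Show)

data WorkerPool t
    = UnallocatedWorkerPool
    | WorkerPool [Worker t]
    deriving (Eq, Show)

-- Hypothetical analogue of removeThreadIdWorkerPool: remove the first
-- active worker whose key matches, returning it and the remaining pool.
removeKeyWorkerPool :: Int -> WorkerPool t -> Maybe ((Int, WorkerStage), WorkerPool t)
removeKeyWorkerPool _ UnallocatedWorkerPool = Nothing
removeKeyWorkerPool k (WorkerPool l) = go [] l
  where
    go _ [] = Nothing
    go c (ActiveWorker a stage : rest)
        | a == k = Just ((a, stage), WorkerPool (c ++ rest))
    -- Reached for idle workers and for active workers with another key;
    -- the skipped worker is pushed onto the front of the accumulator.
    go c (v : rest) = go (v:c) rest

main :: IO ()
main = do
    let pool = WorkerPool
            [ IdleWorker "a" PerformStage
            , ActiveWorker 1 CleanupStage
            , ActiveWorker 2 PerformStage
            , IdleWorker "b" CleanupStage
            ]
    print (removeKeyWorkerPool 2 pool)
    print (removeKeyWorkerPool 3 pool)
```

Because skipped workers are pushed onto the front of the accumulator, the workers that preceded the match come back in reversed order in the returned pool. The sketch only makes that visible; whether the order matters to callers is not something this diff shows.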