summaryrefslogtreecommitdiff
path: root/Types/WorkerPool.hs
diff options
context:
space:
mode:
authorJoey Hess <joeyh@joeyh.name>2019-06-05 17:54:35 -0400
committerJoey Hess <joeyh@joeyh.name>2019-06-05 17:54:35 -0400
commit659640e22493cc39a2351a4d0d7ac575b0ff3b7a (patch)
tree9ff9d7d0f09b4dd972c68cd63301fcefbf2aa730 /Types/WorkerPool.hs
parentc04b2af3e1a8316e7cf640046ad0aa68826650ed (diff)
separate queue for cleanup actions
When running multiple concurrent actions, the cleanup phase is run in a separate queue than the main action queue. This can make some commands faster, because less time is spent on bookkeeping in between each file transfer. But as far as I can see, nothing will be sped up much by this yet, because all the existing cleanup actions are very light-weight. This is just groundwork for deferring checksum verification to cleanup time. This change does mean that if the user expects -J2 will mean that they see no more than 2 jobs running at a time, they may be surprised to see 4 in some cases (if the cleanup actions are slow enough to notice). It might also make sense to enable background cleanup without the -J, for at least one cleanup action. Indeed, that's the behavior that -J1 has now. At some point in the future, it make make sense to make the behavior with no -J the same as -J1. The only reason it's not currently is that git-annex can build w/o concurrent-output, and also any bugs in concurrent-output (such as perhaps misbehaving on non-VT100 compatible terminals) are avoided by default by only using it when -J is used.
Diffstat (limited to 'Types/WorkerPool.hs')
-rw-r--r--Types/WorkerPool.hs53
1 files changed, 46 insertions, 7 deletions
diff --git a/Types/WorkerPool.hs b/Types/WorkerPool.hs
index acc11c8843..03a95b47ba 100644
--- a/Types/WorkerPool.hs
+++ b/Types/WorkerPool.hs
@@ -7,8 +7,8 @@
module Types.WorkerPool where
+import Control.Concurrent
import Control.Concurrent.Async
-import Data.Either
-- | Pool of worker threads.
data WorkerPool t
@@ -16,15 +16,54 @@ data WorkerPool t
| WorkerPool [Worker t]
-- | A worker can either be idle or running an Async action.
-type Worker t = Either t (Async t)
+-- And it is used for some stage.
+data Worker t
+ = IdleWorker t WorkerStage
+ | ActiveWorker (Async t) WorkerStage
+-- | These correspond to CommandPerform and CommandCleanup.
+data WorkerStage = PerformStage | CleanupStage
+ deriving (Eq)
+
+workerStage :: Worker t -> WorkerStage
+workerStage (IdleWorker _ s) = s
+workerStage (ActiveWorker _ s) = s
+
+workerAsync :: Worker t -> Maybe (Async t)
+workerAsync (IdleWorker _ _) = Nothing
+workerAsync (ActiveWorker aid _) = Just aid
+
+-- | Allocates a WorkerPool that has the specified number of workers
+-- in it, of each stage.
+--
+-- The stages are distributed evenly throughout.
allocateWorkerPool :: t -> Int -> WorkerPool t
-allocateWorkerPool t n = WorkerPool $ replicate n (Left t)
+allocateWorkerPool t n = WorkerPool $ take (n+n) $
+ map (uncurry IdleWorker) $ zip (repeat t) stages
+ where
+ stages = concat $ repeat [PerformStage, CleanupStage]
-addWorkerPool :: WorkerPool t -> Worker t -> WorkerPool t
-addWorkerPool (WorkerPool l) w = WorkerPool (w:l)
-addWorkerPool UnallocatedWorkerPool w = WorkerPool [w]
+addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t
+addWorkerPool w (WorkerPool l) = WorkerPool (w:l)
+addWorkerPool w UnallocatedWorkerPool = WorkerPool [w]
idleWorkers :: WorkerPool t -> [t]
idleWorkers UnallocatedWorkerPool = []
-idleWorkers (WorkerPool l) = lefts l
+idleWorkers (WorkerPool l) = go l
+ where
+ go [] = []
+ go (IdleWorker t _ : rest) = t : go rest
+ go (ActiveWorker _ _ : rest) = go rest
+
+-- | Removes a worker from the pool whose Async uses the ThreadId.
+--
+-- Each Async has its own ThreadId, so this stops once it finds
+-- a match.
+removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t)
+removeThreadIdWorkerPool _ UnallocatedWorkerPool = Nothing
+removeThreadIdWorkerPool tid (WorkerPool l) = go [] l
+ where
+ go _ [] = Nothing
+ go c (ActiveWorker a stage : rest)
+ | asyncThreadId a == tid = Just ((a, stage), WorkerPool (c++rest))
+ go c (v : rest) = go (v:c) rest