{- A parallel map This sets up an input channel, an output channel, and 'p' worker threads that read from the input, applies a function 'f', and pushes the result to the output channel. A 'feeder' reads from a list and pushes it into the input, and a 'reader' reads the output to create the result list. When the input is exhausted, the feeder pushes a 'Nothing' for each of the workers, and the reader terminates when everything else has finished. (A TVar keeps track of the number of running workers and the feeder.) -} {-# OPTIONS -cpp #-} #define DEBUG -- module PMap where import System.IO.Unsafe import Control.Monad import Control.Concurrent import Control.Concurrent.STM pmap p f = unsafePerformIO . pmapM p f pmapM p f (x:xs) = do inp <- newTChanIO out <- newTChanIO ws <- newTVarIO 1 -- already count the feeder mapM (forkIO . worker ws f inp out) [1..p] forkIO $ feeder ws inp (x:xs) reader ws out reader :: TVar Int -> TChan a -> IO [a] reader c t = do mi <- atomically (do count <- readTVar c e <- isEmptyTChan t if e && count == 0 then return Nothing else do i <- readTChan t return (Just i)) DEBUG atomically (readTVar c) >>= \x -> putStrLn ("reading...("++show x++" threads)") case mi of Nothing -> DEBUG putStrLn "stopped reader" >> return [] Just i -> do rest <- unsafeInterleaveIO $ reader c t return (i : rest) worker :: Show a => TVar Int -> (a->b) -> TChan (Maybe a) -> TChan b -> Int -> IO () worker count f i o no = let work = do inp <- atomically (readTChan i) DEBUG putStrLn ("worker "++show no++" just read "++show inp) threadDelay 1000 case inp of Nothing -> atomically (readTVar count >>= \ct -> return (ct-1) >>= writeTVar count >> return ct) DEBUG >>= \ct -> putStrLn ("stopped worker "++show no++" ("++show ct++" remaining)") >> return () Just x -> atomically (writeTChan o $ f x) >> work in do c <- atomically (readTVar count >>= \ct -> return (ct+1) >>= writeTVar count >> return (ct+1)) DEBUG putStrLn ("started worker no "++show no++" of "++show c++".") work feeder :: Show a => TVar Int -> TChan (Maybe a) -> [a] -> IO () feeder c t (x:xs) = DEBUG putStrLn ("feeding: "++show x) >> atomically (writeTChan t (Just x)) >> feeder c t xs feeder c t [] = let close = atomically (do ct <- readTVar c sequence_ (map (writeTChan t) (replicate ct Nothing))) in atomically (readTVar c >>= \ct -> return (subtract 1 ct) >>= writeTVar c >> return ct ) DEBUG >>= \ct -> putStrLn ("feeder closing, remaining threads: "++show ct) >> close