{-# LANGUAGE CPP, NamedFieldPuns, RecordWildCards, ScopedTypeVariables, RankNTypes, DeriveDataTypeable #-}

#if MIN_VERSION_monad_control(0,3,0)
{-# LANGUAGE FlexibleContexts #-}
#endif

#if !MIN_VERSION_base(4,3,0)
{-# LANGUAGE RankNTypes #-}
#endif

-- |
-- Module:      Data.Pool
-- Copyright:   (c) 2011 MailRank, Inc.
-- License:     BSD3
-- Maintainer:  Bryan O'Sullivan <bos@serpentine.com>,
--              Bas van Dijk <v.dijk.bas@gmail.com>
-- Stability:   experimental
-- Portability: portable
--
-- A high-performance striped pooling abstraction for managing
-- flexibly-sized collections of resources such as database
-- connections.
--
-- \"Striped\" means that a single 'Pool' consists of several
-- sub-pools, each managed independently.  A single stripe is fine for
-- many applications, and probably what you should choose by default.
-- More stripes will lead to reduced contention in high-performance
-- multicore applications, at a trade-off of causing the maximum
-- number of simultaneous resources in use to grow.
module Data.Pool
    (
      Pool(idleTime, maxResources, numStripes)
    , LocalPool
    , createPool
    , withResource
    , takeResource
    , tryWithResource
    , tryTakeResource
    , destroyResource
    , putResource
    , destroyAllResources
    ) where

import Control.Applicative ((<$>))
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM3, unless, when)
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
import Data.Typeable (Typeable)
import GHC.Conc.Sync (labelThread)
import qualified Control.Exception as E
import qualified Data.Vector as V

#if MIN_VERSION_monad_control(0,3,0)
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Control.Monad.Base (liftBase)
#else
import Control.Monad.IO.Control (MonadControlIO, controlIO)
import Control.Monad.IO.Class (liftIO)
#define control controlIO
#define liftBase liftIO
#endif

#if MIN_VERSION_base(4,3,0)
import Control.Exception (mask)
#else
-- Fallback for base < 4.3, which has no 'Control.Exception.mask':
-- the callback is simply run with 'id' as its \"restore\" function,
-- so no asynchronous-exception protection is provided.
mask :: ((forall a. IO a -> IO a) -> IO b) -> IO b
mask action = action id
#endif

-- | A single resource pool entry: an idle resource paired with the
-- time at which it was last handed back to the pool.
data Entry a = Entry {
      entry :: a
    -- ^ The pooled resource itself.
    , lastUse :: UTCTime
    -- ^ Time of last return.
    }

-- | A single stripe (sub-pool) of a striped 'Pool'.
data LocalPool a = LocalPool {
      inUse :: TVar Int
    -- ^ Count of open entries (both idle and in use).
    , entries :: TVar [Entry a]
    -- ^ Idle entries.
    , lfin :: IORef ()
    -- ^ empty value used to attach a finalizer to (internal)
    } deriving (Typeable)

-- | A striped pool of resources of type @a@.
--
-- Only the 'idleTime', 'maxResources' and 'numStripes' fields are
-- exported; the remaining fields are internal.
data Pool a = Pool {
      create :: IO a
    -- ^ Action for creating a new entry to add to the pool.
    , destroy :: a -> IO ()
    -- ^ Action for destroying an entry that is now done with.
    , numStripes :: Int
    -- ^ The number of stripes (distinct sub-pools) to maintain.
    -- The smallest acceptable value is 1.
    , idleTime :: NominalDiffTime
    -- ^ Amount of time for which an unused resource is kept alive.
    -- The smallest acceptable value is 0.5 seconds.
    --
    -- The elapsed time before closing may be a little longer than
    -- requested, as the reaper thread wakes at 1-second intervals.
    , maxResources :: Int
    -- ^ Maximum number of resources to maintain per stripe.  The
    -- smallest acceptable value is 1.
    --
    -- Requests for resources will block if this limit is reached on a
    -- single stripe, even if other stripes have idle resources
    -- available.
    , localPools :: V.Vector (LocalPool a)
    -- ^ The stripes (sub-pools).  A thread is mapped to one of them
    -- by hashing its 'ThreadId'.
    , fin :: IORef ()
    -- ^ empty value used to attach a finalizer to (internal)
    } deriving (Typeable)

-- | Shows only the pool's configuration ('numStripes', 'idleTime' and
-- 'maxResources'); the create/destroy actions and the internal state
-- are not rendered.
instance Show (Pool a) where
    show Pool{numStripes, idleTime, maxResources} =
        "Pool {numStripes = " ++ show numStripes ++ ", " ++
        "idleTime = " ++ show idleTime ++ ", " ++
        "maxResources = " ++ show maxResources ++ "}"

-- | Create a striped resource pool.
--
-- Although the garbage collector will destroy all idle resources when
-- the pool is garbage collected it's recommended to manually
-- 'destroyAllResources' when you're done with the pool so that the
-- resources are freed up as soon as possible.
--
-- Calls 'error' (with a message prefixed by @Data.Pool.createPool@)
-- if the stripe count, idle time or maximum resource count is below
-- its smallest acceptable value.
createPool
    :: IO a
    -- ^ Action that creates a new resource.
    -> (a -> IO ())
    -- ^ Action that destroys an existing resource.
    -> Int
    -- ^ The number of stripes (distinct sub-pools) to maintain.
    -- The smallest acceptable value is 1.
    -> NominalDiffTime
    -- ^ Amount of time for which an unused resource is kept open.
    -- The smallest acceptable value is 0.5 seconds.
    --
    -- The elapsed time before destroying a resource may be a little
    -- longer than requested, as the reaper thread wakes at 1-second
    -- intervals.
    -> Int
    -- ^ Maximum number of resources to keep open per stripe.  The
    -- smallest acceptable value is 1.
    --
    -- Requests for resources will block if this limit is reached on a
    -- single stripe, even if other stripes have idle resources
    -- available.
     -> IO (Pool a)
createPool create destroy numStripes idleTime maxResources = do
  -- Validate the configuration before allocating anything.
  when (numStripes < 1) $
    modError "createPool" $ "invalid stripe count " ++ show numStripes
  when (idleTime < 0.5) $
    modError "createPool" $ "invalid idle time " ++ show idleTime
  when (maxResources < 1) $
    modError "createPool" $ "invalid maximum resource count " ++ show maxResources
  -- One empty sub-pool per stripe.
  localPools <- V.replicateM numStripes $
                liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ())
  -- The reaper runs unmasked so that 'killThread' can stop it.
  reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
                unmask $ reaper destroy idleTime localPools
  fin <- newIORef ()
  let p = Pool {
            create
          , destroy
          , numStripes
          , idleTime
          , maxResources
          , localPools
          , fin
          }
  -- When the pool's 'fin' reference is garbage collected, stop the reaper.
  _ <- mkWeakIORef fin (killThread reaperId)
  -- When a stripe's 'lfin' reference is garbage collected, destroy
  -- that stripe's idle resources.
  V.mapM_ (\lp -> mkWeakIORef (lfin lp) (purgeLocalPool destroy lp)) localPools
  return p

-- TODO: Propose 'forkIOLabeledWithUnmask' for the base library.

-- | Sparks off a new thread using 'forkIOWithUnmask' to run the given
-- IO computation, but first labels the thread with the given label
-- (using 'labelThread').
--
-- The implementation makes sure that asynchronous exceptions are
-- masked until the given computation is executed. This ensures the
-- thread will always be labeled which guarantees you can always
-- easily find it in the GHC event log.
--
-- Like 'forkIOWithUnmask', the given computation is given a function
-- to unmask asynchronous exceptions. See the documentation of that
-- function for the motivation of this.
--
-- Returns the 'ThreadId' of the newly created thread.
forkIOLabeledWithUnmask :: String
                        -> ((forall a. IO a -> IO a) -> IO ())
                        -> IO ThreadId
forkIOLabeledWithUnmask label m = mask_ $ forkIOWithUnmask $ \unmask -> do
                                    -- Label first, while still masked,
                                    -- then hand control to the caller.
                                    tid <- myThreadId
                                    labelThread tid label
                                    m unmask

-- | Periodically go through all pools, closing any resources that
-- have been left idle for too long.
--
-- Runs forever, waking once per second.  On each pass, every entry
-- whose 'lastUse' is more than @idleTime@ in the past is removed from
-- its stripe (and the stripe's 'inUse' count reduced accordingly)
-- inside a single STM transaction; the removed resources are then
-- destroyed outside the transaction.
--
-- Synchronous exceptions thrown by the destroy action are ignored.
-- An 'E.AsyncException' (such as the 'E.ThreadKilled' delivered by
-- 'killThread') is /not/ swallowed: the rest of the current batch is
-- still destroyed, and the exception is then rethrown so the reaper
-- thread actually terminates.
reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO ()
reaper destroy idleTime pools = forever $ do
  threadDelay (1 * 1000000)
  now <- getCurrentTime
  let isStale Entry{lastUse} = now `diffUTCTime` lastUse > idleTime
  V.forM_ pools $ \LocalPool{inUse, entries} -> do
    resources <- atomically $ do
      (stale,fresh) <- partition isStale <$> readTVar entries
      unless (null stale) $ do
        writeTVar entries fresh
        modifyTVar_ inUse (subtract (length stale))
      return (map entry stale)
    destroyBatch Nothing resources
  where
    -- Destroy each resource in turn.  The first asynchronous exception
    -- seen is remembered and rethrown only once the whole batch has
    -- been processed, so that resources already removed from the pool
    -- are not left undestroyed.
    destroyBatch pending [] = maybe (return ()) E.throwIO pending
    destroyBatch pending (resource:rest) = do
      outcome <- E.try (destroy resource)
      case outcome of
        Left (e :: SomeException)
          | Just (async :: E.AsyncException) <- E.fromException e ->
              destroyBatch (Just (maybe async id pending)) rest
        _ -> destroyBatch pending rest

-- | Destroy all idle resources of the given 'LocalPool' and remove them from
-- the pool.
--
-- The idle list is emptied and 'inUse' is reduced by the number of
-- removed entries in one STM transaction; the destroy action is then
-- run on each removed resource outside the transaction.  Any
-- exception thrown by the destroy action is ignored.
purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO ()
purgeLocalPool destroy LocalPool{inUse, entries} = do
  resources <- atomically $ do
    idle <- swapTVar entries []
    modifyTVar_ inUse (subtract (length idle))
    return (map entry idle)
  forM_ resources $ \resource ->
    destroy resource `E.catch` \(_::SomeException) -> return ()

-- | Temporarily take a resource from a 'Pool', perform an action with
-- it, and return it to the pool afterwards.
--
-- * If the pool has an idle resource available, it is used
--   immediately.
--
-- * Otherwise, if the maximum number of resources has not yet been
--   reached, a new resource is created and used.
--
-- * If the maximum number of resources has been reached, this
--   function blocks until a resource becomes available.
--
-- If the action throws an exception of any type, the resource is
-- destroyed, and not returned to the pool.
--
-- It probably goes without saying that you should never manually
-- destroy a pooled resource, as doing so will almost certainly cause
-- a subsequent user (who expects the resource to be valid) to throw
-- an exception.
withResource ::
#if MIN_VERSION_monad_control(0,3,0)
    (MonadBaseControl IO m)
#else
    (MonadControlIO m)
#endif
  => Pool a -> (a -> m b) -> m b
{-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-}
withResource pool act = control $ \runInIO -> mask $ \restore -> do
  -- Acquisition and hand-back run masked; only the user action is
  -- run with the caller's original masking state restored.
  (resource, local) <- takeResource pool
  ret <- restore (runInIO (act resource)) `onException`
            destroyResource pool local resource
  putResource local resource
  return ret
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE withResource #-}
#endif

-- | Take a resource from the pool, following the same rules as
-- 'withResource'. Note that this function should be used with caution, as
-- improper exception handling can lead to leaked resources.
--
-- This function returns both a resource and the @LocalPool@ it came from so
-- that it may either be destroyed (via 'destroyResource') or returned to the
-- pool (via 'putResource').
takeResource :: Pool a -> IO (a, LocalPool a)
takeResource pool@Pool{create, maxResources} = do
  local@LocalPool{inUse, entries} <- getLocalPool pool
  -- The transaction returns an IO action, which 'join' then runs
  -- outside STM: either a no-op yielding an idle resource, or the
  -- (possibly slow) 'create' action.
  resource <- liftBase . join . atomically $ do
    ents <- readTVar entries
    case ents of
      (Entry{entry}:es) -> writeTVar entries es >> return (return entry)
      [] -> do
        used <- readTVar inUse
        -- Block until a slot frees up.  '>=' rather than '==' so that
        -- the limit can never be overshot.
        when (used >= maxResources) retry
        -- Reserve the slot before creating; give it back if creation fails.
        writeTVar inUse $! used + 1
        return $
          create `onException` atomically (modifyTVar_ inUse (subtract 1))
  return (resource, local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE takeResource #-}
#endif

-- | Similar to 'withResource', but only performs the action if a resource could
-- be taken from the pool /without blocking/. Otherwise, 'tryWithResource'
-- returns immediately with 'Nothing' (i.e. the action function is /not/ called).
-- Conversely, if a resource can be borrowed from the pool without blocking, the
-- action is performed and its result is returned, wrapped in a 'Just'.
--
-- As with 'withResource', the resource is destroyed rather than
-- returned to the pool if the action throws an exception.
tryWithResource :: forall m a b.
#if MIN_VERSION_monad_control(0,3,0)
    (MonadBaseControl IO m)
#else
    (MonadControlIO m)
#endif
  => Pool a -> (a -> m b) -> m (Maybe b)
tryWithResource pool act = control $ \runInIO -> mask $ \restore -> do
  res <- tryTakeResource pool
  case res of
    Just (resource, local) -> do
      ret <- restore (runInIO (Just <$> act resource)) `onException`
                destroyResource pool local resource
      putResource local resource
      return ret
    -- Pool exhausted: produce 'Nothing' in the caller's monad.
    Nothing -> restore . runInIO $ return (Nothing :: Maybe b)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryWithResource #-}
#endif

-- | A non-blocking version of 'takeResource'. The 'tryTakeResource' function
-- returns immediately, with 'Nothing' if the pool is exhausted, or @'Just' (a,
-- 'LocalPool' a)@ if a resource could be borrowed from the pool successfully.
tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a))
tryTakeResource pool@Pool{create, maxResources} = do
  local@LocalPool{inUse, entries} <- getLocalPool pool
  -- The transaction returns an IO action, which 'join' then runs
  -- outside STM.
  resource <- liftBase . join . atomically $ do
    ents <- readTVar entries
    case ents of
      (Entry{entry}:es) -> writeTVar entries es >> return (return . Just $ entry)
      [] -> do
        used <- readTVar inUse
        -- '>=' rather than '==' so that the limit can never be overshot.
        if used >= maxResources
          then return (return Nothing)
          else do
            -- Reserve the slot before creating; give it back if
            -- creation fails.
            writeTVar inUse $! used + 1
            return $ Just <$>
              (create `onException` atomically (modifyTVar_ inUse (subtract 1)))
  return $ (flip (,) local) <$> resource
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE tryTakeResource #-}
#endif

-- | Get a (Thread-)'LocalPool'
--
-- The stripe is chosen by hashing the calling thread's 'ThreadId'
-- modulo 'numStripes'.
--
-- Internal, just to not repeat code for 'takeResource' and 'tryTakeResource'
getLocalPool :: Pool a -> IO (LocalPool a)
getLocalPool Pool{numStripes, localPools} = do
  -- 'mod' with a positive divisor is never negative, so the index is
  -- in bounds provided numStripes >= 1 (which 'createPool' enforces).
  i <- liftBase $ ((`mod` numStripes) . hash) <$> myThreadId
  return $ localPools V.! i
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE getLocalPool #-}
#endif

-- | Destroy a resource. Note that this will ignore any exceptions in the
-- destroy function.
--
-- After the destroy action has run (whether or not it threw), the
-- 'inUse' count of the given 'LocalPool' is decremented by one.
destroyResource :: Pool a -> LocalPool a -> a -> IO ()
destroyResource Pool{destroy} LocalPool{inUse} resource = do
   destroy resource `E.catch` \(_::SomeException) -> return ()
   atomically (modifyTVar_ inUse (subtract 1))
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE destroyResource #-}
#endif

-- | Return a resource to the given 'LocalPool'.
--
-- The resource is pushed onto the front of the stripe's idle list,
-- stamped with the current time as its time of last use.
putResource :: LocalPool a -> a -> IO ()
putResource LocalPool{entries} resource = do
    now <- getCurrentTime
    atomically $ modifyTVar_ entries (Entry resource now:)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE putResource #-}
#endif

-- | Destroy all idle resources in all stripes in the pool. Note that
-- this will ignore any exceptions in the destroy function.
--
-- Resources that are currently taken out of the pool are not
-- affected.
--
-- This function is useful when you detect that all resources in the
-- pool are broken. For example after a database has been restarted
-- all connections opened before the restart will be broken. In that
-- case it's better to close those connections so that 'takeResource'
-- won't take a broken connection from the pool but will open a new
-- connection instead.
--
-- Another use-case for this function is that when you know you are
-- done with the pool you can destroy all idle resources immediately
-- instead of waiting on the garbage collector to destroy them, thus
-- freeing up those resources sooner.
destroyAllResources :: Pool a -> IO ()
destroyAllResources Pool{destroy, localPools} =
    V.forM_ localPools $ purgeLocalPool destroy

-- | Apply a function to the contents of a 'TVar'.  The new value is
-- forced to weak head normal form before it is written, so no chain
-- of unevaluated updates builds up inside the variable.
modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a

-- | Raise an 'error' whose message has the form
-- @Data.Pool.\<func\>: \<msg\>@, where @func@ is the name of the
-- function reporting the problem.
modError :: String -> String -> a
modError func msg =
    error $ "Data.Pool." ++ func ++ ": " ++ msg