{-# LANGUAGE DataKinds #-}
-- | Conduit-based stream helpers for the process API of
-- "System.Process.Typed", which this module also re-exports in full.
module Data.Conduit.Process.Typed
(
createSink
, createSinkClose
, createSource
, withLoggedProcess_
, module System.Process.Typed
) where
import System.Process.Typed
import qualified System.Process.Typed as P
import Data.Conduit (ConduitM, (.|), runConduit)
import qualified Data.Conduit.Binary as CB
import Control.Monad.IO.Unlift
import qualified Data.ByteString as S
import qualified Data.Conduit.List as CL
import qualified Data.ByteString.Lazy as BL
import Data.IORef (IORef, newIORef, readIORef, modifyIORef)
import Control.Exception (throwIO, catch)
import Control.Concurrent.Async (concurrently)
import System.IO (hSetBuffering, BufferMode (NoBuffering), hClose)
-- | A 'StreamSpec' for a process input stream that exposes the pipe as a
-- conduit sink of 'S.ByteString' chunks.
--
-- When the sink is run it first switches the pipe handle to 'NoBuffering'
-- and then writes every incoming chunk to it with 'CB.sinkHandle'. The
-- handle is /not/ closed by this sink when the upstream is exhausted; use
-- 'createSinkClose' if the pipe should be closed at end of input.
createSink :: MonadIO m => StreamSpec 'STInput (ConduitM S.ByteString o m ())
createSink =
  (\h -> liftIO (hSetBuffering h NoBuffering) >> CB.sinkHandle h)
  `fmap` createPipe
-- | Like 'createSink', but additionally closes the pipe handle with
-- 'hClose' once 'CB.sinkHandle' has finished consuming its input.
--
-- When the sink is run it switches the handle to 'NoBuffering', writes all
-- incoming 'S.ByteString' chunks to it, and then closes it.
createSinkClose :: MonadIO m => StreamSpec 'STInput (ConduitM S.ByteString o m ())
createSinkClose =
  (\h ->
     liftIO (hSetBuffering h NoBuffering)
       >> CB.sinkHandle h
       >> liftIO (hClose h))
  `fmap` createPipe
-- | A 'StreamSpec' for a process output stream that exposes the pipe as a
-- conduit source of 'S.ByteString' chunks.
--
-- When the source is run it first switches the pipe handle to
-- 'NoBuffering' and then yields the handle's contents via
-- 'CB.sourceHandle'.
createSource :: MonadIO m => StreamSpec 'STOutput (ConduitM i S.ByteString m ())
createSource =
  (\h -> liftIO (hSetBuffering h NoBuffering) >> CB.sourceHandle h)
  `fmap` createPipe
-- | Internal helper: a source over the process pipe that also records
-- every chunk it yields into the given 'IORef'.
--
-- The 'IORef' holds a difference list (a function that prepends the chunks
-- seen so far to its argument); each chunk passing through is appended by
-- composing @(bs:)@ on the right, so applying the stored function to @[]@
-- gives the chunks in the order they were read.
--
-- Unlike 'createSource', this does not change the handle's buffering mode.
createSourceLogged
  :: MonadIO m
  => IORef ([S.ByteString] -> [S.ByteString])
  -> StreamSpec 'STOutput (ConduitM i S.ByteString m ())
createSourceLogged ref =
    (\h ->
       (  CB.sourceHandle h
       -- Pass each chunk through unchanged while logging it into the buffer.
       .| CL.iterM (\bs -> liftIO $ modifyIORef ref (. (bs:))))
    )
    `fmap` createPipe
-- | Run a process with its stdout and stderr exposed as conduit sources,
-- throwing an 'ExitCodeException' if 'checkExitCode' reports a failure.
--
-- Whatever stdout and stderr settings the supplied 'ProcessConfig' had are
-- replaced: both streams are set to logged sources that record every chunk
-- read from them in memory. If the exit code check fails, the exception is
-- rethrown with its 'eceStdout' and 'eceStderr' fields filled in from those
-- recorded chunks.
--
-- Note that every chunk read from either stream is retained until this
-- function returns, so memory use grows with the amount of output.
--
-- The callback receives the running 'Process'; its result is returned once
-- the remaining output has been drained and the exit code checked.
withLoggedProcess_
  :: MonadUnliftIO m
  => ProcessConfig stdin stdoutIgnored stderrIgnored
  -> (Process stdin (ConduitM () S.ByteString m ()) (ConduitM () S.ByteString m ()) -> m a)
  -> m a
withLoggedProcess_ pc inner = withUnliftIO $ \u -> do
  -- Difference-list buffers, one per stream, starting out empty.
  stdoutBuffer <- newIORef id
  stderrBuffer <- newIORef id
  let pc' = setStdout (createSourceLogged stdoutBuffer)
          $ setStderr (createSourceLogged stderrBuffer) pc
  P.withProcessWait pc' $ \p -> do
    a <- unliftIO u $ inner p
    -- Consume whatever output the callback left unread, so that it still
    -- flows through the logging sources and ends up in the buffers.
    let drain src = unliftIO u (runConduit (src .| CL.sinkNull))
    ((), ()) <- drain (getStdout p) `concurrently` drain (getStderr p)
    -- On a failing exit code, attach the captured output to the exception.
    checkExitCode p `catch` \ece -> do
      stdout <- readIORef stdoutBuffer
      stderr <- readIORef stderrBuffer
      throwIO ece
        { eceStdout = BL.fromChunks $ stdout []
        , eceStderr = BL.fromChunks $ stderr []
        }
    return a