首页 > 解决方案 > How To Test Web Sockets In Haskell

问题描述

Suggested approach to test WebSockets with haskell?

I'm working on a websockets server and I have been using the websockets library and haven't had any problem's with implementing my server. The problem's I've had are related to the testing of my server.

I would like to open up a bidirectional communication channel that exists inside of each of my ClientApp's that I spawn for testing. This allows me to easily control things that the Client sends to the server and retrieve the responses for use in my hspec tests. My first implementation used MVar's and worked fine but a Chan or TChan is really a more suitable structure for this purpose.

I am having difficulty with the TChan implementation and I believe that it has something to do with how I am implementing the STM transactions which causes the program to halt. I unfortunately havent been able to figure this issue out by doing simple print debugging.

I've created a pseudo unit test that implements somewhat similar logic and it works fine in that test (which should obviously mean it's a bad test).

Below is a simple echo server to reproduce the problem as well as the tests both MVar and TChan based.

I was hoping that i could get some help on what would be a good approach to this I am sure the way i am going about it is naive. Also any suggestions would be appreciated. All of the runIO and the blocking code seems like I should be using the async library for a cleaner interface.

The program can be run with ghcid and it will kill and restart the server between reloads.

ghcid command:

ghcpid=$(pgrep ghc)
if [ -z "$ghcpid" ]
killall -9 ghc
then 
    echo "No GHC process Found"
    ghcid  --command "stack ghci HaskSockets:lib HaskSockets:HaskSockets-test --ghci-options=-fobject-code" --test "Spec.main"
else
    echo $ghcpid
    kill $ghcpid
    ghcid  --command "stack ghci HaskSockets:lib HaskSockets:HaskSockets-test --ghci-options=-fobject-code" --test "Spec.main"
fi

module DevelMain
    ( main
    , Greeting(..)
    , Message(..)
    ) where

import Data.Text (Text)
import Control.Exception (finally)
import Control.Monad (forM_, forever)
import Control.Concurrent (MVar, newMVar, modifyMVar_, modifyMVar, readMVar, takeMVar, ThreadId(..), putMVar)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import qualified Network.WebSockets as WS
import qualified Data.HashMap.Strict as HM 
import Control.Concurrent.Async
import Data.Aeson

type Client = (Text, WS.Connection)
type ServerState = [Client]

data Greeting = Greeting
  { greeting :: Text
  } deriving (Eq, Show)

instance FromJSON Greeting where
  parseJSON (Object o) = do
   g <- o .: "greeting"
   return $ Greeting g

instance ToJSON Greeting where 
  toJSON (Greeting g) = object 
    [ "greeting" .= g
    ]

data Message = Message
  { message :: Text
  } deriving (Eq, Show)

instance FromJSON Message where
  parseJSON (Object o) = do
   m <- o .: "message"
   return $ Message m

instance ToJSON Message where 
  toJSON (Message m) = object 
    [ "message" .= m
    ]

newServerState :: ServerState
newServerState = []

numClients :: ServerState -> Int
numClients = length

clientExists :: Client -> ServerState -> Bool
clientExists client = any ((== fst client) . fst)

addClient :: Client -> ServerState -> ServerState
addClient client clients = client : clients

removeClient :: Client -> ServerState -> ServerState
removeClient client = filter ((/= fst client) . fst)

broadcast :: Text -> ServerState -> IO ()
broadcast message clients = do
  forM_ clients $ \(n, conn) -> do
   T.putStrLn message
   WS.sendTextData conn message

main :: MVar () -> IO ()
main kill = do
    state <- newMVar newServerState
    race_ (takeMVar kill) (WS.runServer "127.0.0.1" 9160 $ app state)

app :: MVar ServerState -> WS.ServerApp
app state pending = do
    conn <- WS.acceptRequest pending
    WS.withPingThread conn 30 (return ()) $ do
        msg <- WS.receiveData conn
        clients <- readMVar state
        let client = ("NEW CLIENT", conn)
        modifyMVar_ state $ \s -> do
         let clientList = addClient client s
         return clientList
        cli <- readMVar state
        print (map fst cli)
        readMVar state >>= broadcast msg
        flip finally (disconnect client) (talk client state)
  where 
   disconnect client = do
    modifyMVar_ state $ \s -> do
     let s' = removeClient client s
     return (s)

talk :: Client -> MVar ServerState -> IO ()
talk (user, conn) state = forever $ do
    msg <- WS.receiveData conn
    readMVar state >>= broadcast msg 


module Spec where

import           Control.Concurrent  (forkIO, threadDelay, killThread, ThreadId)
import           Control.Concurrent.MVar
import           Control.Concurrent.Chan
import           Control.Monad       (forever, unless)
import           Control.Monad.Trans (liftIO)
import           Network.Socket      (withSocketsDo)
import           Data.Text           (Text)
import qualified Data.Text           as T
import qualified Data.Text.IO        as T
import qualified Network.WebSockets  as WS
import qualified DevelMain
import           DevelMain (Message(..), Greeting(..))
import           Test.Hspec
import           Data.Word
import           Foreign.Store
import           Data.ByteString.Lazy (ByteString(..))
import           Data.Aeson
import           Control.Concurrent.STM.TChan
import           Control.Monad.STM

import qualified MVarController as MVC
import qualified TChanController as TCC

threadStoreIndex :: Word32
threadStoreIndex = 1

main :: IO ()
main = do
  mThreadId <- lookupStore threadStoreIndex :: IO (Maybe (Store ThreadId))
  case (mThreadId) of
   Nothing -> do
    print "No Stored Thread"
    kill <- newEmptyMVar
    tId <- forkIO (DevelMain.main kill)
    _ <- writeStore (Store threadStoreIndex) tId
    threadDelay 1000000
    TCC.clientConnectionTest
    putMVar kill ()
   Just x -> do
    print "Found Thread ID in Store"
    otId <- readStore x
    print (otId)
    killThread otId 
    kill <- newEmptyMVar
    threadDelay 100000
    tId <- forkIO (DevelMain.main kill)
    _ <- writeStore (Store threadStoreIndex) tId
    threadDelay 100000
    TCC.clientConnectionTest
    putMVar kill ()


module MVarController where

import           Control.Concurrent  (forkIO, threadDelay, killThread, ThreadId)
import           Control.Concurrent.MVar
import           Control.Monad       (forever, unless)
import           Control.Monad.Trans (liftIO)
import           Network.Socket      (withSocketsDo)
import qualified Data.Text.IO        as T
import qualified Network.WebSockets  as WS
import           Test.Hspec
import           Data.ByteString.Lazy (ByteString(..))
import           Data.Aeson
import           DevelMain (Message(..), Greeting(..))

type ReadVar a = MVar a
type WriteVar a = MVar a
type Controller a = (WriteVar a, ReadVar a)

wsSlave :: Controller ByteString -> WS.ClientApp ()
wsSlave controller@(wvar, rvar) conn = do
  _ <- forkIO $ writeOut
  loop

  where loop = do 
         val <- tryTakeMVar wvar
         case (val) of
           Just x -> do
             WS.sendTextData conn x
             loop
           _ -> loop

        writeOut = forever $ do
         msg <- WS.receiveData conn
         ise <- isEmptyMVar rvar
         print ise
         case (ise) of
          True -> putMVar rvar msg
          False -> do
           modifyMVar_ rvar $ \m -> do
            return msg

clientConnectionTest :: IO ()
clientConnectionTest = hspec $ do
  let msg = Message "Hello Nice to Meet You all"
  let grt = Greeting "Hello I'm Ethan"
  describe "Test Initial Connection" $ do
    controller <- runIO openController
    runIO $ forkIO $ withSocketsDo $ WS.runClient "127.0.0.1" 9160 "/" $ wsSlave controller
    runIO $ threadDelay 100000
    runIO $ putMVar (fst controller) (encode grt)
    grt1 <- runIO $ takeMVar (snd controller) 
    it "It should allow for a client to connect to the application with a greeting" $ do
      (decode grt1 :: Maybe Greeting) `shouldBe` (Just grt)
    it "It should allow a connected client to send a message" $ do
      putMVar (fst controller) (encode msg)
      msg1 <- readMVar (snd controller) 
      (decode msg1 :: Maybe Message) `shouldBe` (Just msg)

openController :: IO (WriteVar a, ReadVar a)
openController = do
  wVar <- newEmptyMVar
  rVar <- newEmptyMVar
  return (wVar, rVar)

controlLoop :: Controller ByteString -> IO ()
controlLoop controller@(wvar, rvar) = do
  v <- tryTakeMVar wvar :: IO (Maybe ByteString)
  case (v) of
    Just x -> do
      putMVar rvar x
      controlLoop controller
    _ -> controlLoop controller

module TChanController where

import           Control.Concurrent  (forkIO, threadDelay, killThread, ThreadId)
import           Control.Monad       (forever, unless)
import           Network.Socket      (withSocketsDo)
import qualified Data.Text.IO        as T
import qualified Network.WebSockets  as WS
import           DevelMain (Message(..), Greeting(..))
import           Test.Hspec
import           Data.ByteString.Lazy (ByteString(..))
import           Data.Aeson
import           Control.Concurrent.STM.TChan
import           Control.Monad.STM

type ReadChan a = TChan a
type WriteChan a = TChan a
type ControllerChan a = (WriteChan a, ReadChan a)

wsSlave :: ControllerChan ByteString -> WS.ClientApp ()
wsSlave controller@(wchan, rchan) conn = do
  _ <- forkIO $ writeOut
  loop

  where loop = do 
         val <- atomically $ tryReadTChan wchan
         case (val) of
           Just x -> do
             T.putStrLn "HERE"
             WS.sendTextData conn x
             loop
           _ -> loop


        writeOut = forever $ do
         msg <- WS.receiveData conn :: IO ByteString
         print msg

clientConnectionTest :: IO ()
clientConnectionTest = hspec $ do
  let msg = Message "Hello Nice to Meet You all"
  let grt = Greeting "Hello I'm Ethan"
  controller <- runIO openChanController
  -- runIO $ forkIO $ withSocketsDo $ WS.runClient "127.0.0.1" 9160 "/" $ wsSlave controller
  describe "Test Initial Connection" $ do
    -- runIO $ print "ACQUISTION"
    runIO $ threadDelay 100000
    runIO $ forkIO $ controlLoop controller
    -- runIO $ threadDelay 100000
    -- runIO $ print "DELAYED THREAD"
    runIO $ atomically $ writeTChan (fst controller) (encode grt)
    -- runIO $ threadDelay 10000
    -- grt1 <- runIO $ initiateClient controller
    -- runIO $ threadDelay 1000
    runIO $ print "Got To Read"
    grt1 <- runIO $ atomically $ readTChan (snd controller) 
    --let grt1 = encode grt
    --runIO $ print "READ FROM CHANNEL"
    it "It should allow for a client to connect to the application with a greeting" $ do
      (decode grt1 :: Maybe Greeting) `shouldBe` (Just grt)
    {-
    it "It should allow a connected client to send a message" $ do
      atomically $ writeTChan (fst controller) (encode grt)
      msg1 <- atomically $ readTChan (snd controller) 
      (decode msg1 :: Maybe Message) `shouldBe` (Just msg)
    -}

openChanController :: IO (WriteChan a, ReadChan a)
openChanController = do
  wc <- newTChanIO
  rc <- newTChanIO
  return (wc, rc)

controlLoop :: ControllerChan ByteString -> IO ()
controlLoop controller@(wchan, rchan) = do
  v <- atomically $ tryReadTChan wchan :: IO (Maybe ByteString)
  case (v) of
    Just x -> do
      atomically $ writeTChan rchan x
      controlLoop controller
    _ -> controlLoop controller


Libraries Used:

- async
- aeson
- base >= 4.7 && < 5
- wai
- warp
- http-types
- scotty
- bytestring 
- text
- websockets
- transformers
- mtl
- unordered-containers
- network
- hspec
- foreign-store
- stm

标签: haskellwebsocketstm

解决方案


推荐阅读