首页 > 解决方案 > Dart 如何实现连续输出的 web-socket

问题描述

如何实现数据从服务器到客户端的连续输出。例如,start服务器接收到客户端的命令后,不断向客户端输出当前时间,直到服务器接收到该quit命令。

问题:

  1. 为什么服务器收不到quit命令
  2. 为什么只有第一个客户端才能收到服务器连续输出的消息

我在 Dart 中的代码是

服务器:

import 'dart:io';

main() async {
  var running = true;
  var q = 'quit';
  var p;
  final port = 8080;
  
  var webSocketTransformer = WebSocketTransformer();
  
  HttpServer server = await HttpServer.bind(InternetAddress.anyIPv6, port);
  server.transform(webSocketTransformer).listen((WebSocket webSocket) {
    print('WebSocket opened.');

    webSocket.listen((_) {
      q = _;
      print(_);
    }, onError: (err) {
      running = false;
      print(err);
    }, onDone: () {
      running = false;
      print('WebSocket closed.');
    });

    while (running && p != q) {
      sleep(Duration(seconds: 1));
      webSocket.add(DateTime.now().toString());
    }
  });
  print('Listening..');
}

客户:

import 'dart:io';
import 'package:web_socket_channel/io.dart';

void main(List<String> arguments) {
  final url = 'ws://localhost:8080';
  final channel = IOWebSocketChannel.connect(url);

  channel.sink.add('start');
  channel.stream.listen((msg) {
    print(msg);
  });

  sleep(Duration(seconds: 30));
  channel.sink.add('quit');
}

标签: flutterdartwebsocket

解决方案


我认为如果它在无限循环中忙碌,您当前的方法将阻止更多客户端的套接字。最好的办法是使用周期性流,它会在给定的时间间隔发出消息,这样它仍然可以监听未来的客户端。

服务器:

import 'dart:io';
import 'dart:async';

main() async {
  final port = 8080;
  
  var webSocketTransformer = WebSocketTransformer();

  // Use a periodic stream that emits events at a given interval
  Stream<int> streamPeriodic =
          Stream.periodic(const Duration(milliseconds: 1000), (count) {
        return count;
   });
  late StreamSubscription<int> subscription;
  bool openSocket = false;

  HttpServer server = await HttpServer.bind(InternetAddress.anyIPv6, port);
  server.transform(webSocketTransformer).listen((WebSocket webSocket) {
       print('WebSocket opened.');
       
       webSocket.listen( (event) {
         if (!openSocket) {
              subscription = streamPeriodic.listen((event) {
                   webSocket.add(DateTime.now().toString());
              });
            openSocket = true;
         }
         else {
             event as String;
             if (event == 'start') {
                subscription.resume();
             }
             else if (event == 'quit') {
                subscription.pause();  
             }
         }
    });         
  });
  print('Listening..');
}

推荐阅读