首页 > 解决方案 > 使用 rxjava 进行正确的视频流

问题描述

为了处理来自网络摄像头的视频流(由 opencv 提供),我正在考虑使用 RxJava。

我希望实现以下目标:

我一直在尝试使用 RxJava,但我对 debounce、throttleFirst 和async 运算符感到困惑

https://stackoverflow.com/a/48723331/1497139之类的示例显示了一些代码,但我缺少更详细的解释。

我在哪里可以找到一个像样的视频处理示例或与上述需求类似的东西?

下面的代码此时执行了一些非异步逻辑 - 如果我可以构建它,请告诉我:

图像提取器

import org.opencv.core.Mat;
import org.opencv.videoio.VideoCapture;

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

/**
 * fetcher for Images
 *
 */
public class ImageFetcher {
  // OpenCV video capture
  private VideoCapture capture = new VideoCapture();
  private String source;
  protected int frameIndex;

  public int getFrameIndex() {
    return frameIndex;
  }

  /**
   * fetch from the given source
   * 
   * @param source
   *          - the source to fetch from
   */
  public ImageFetcher(String source) {
    this.source = source;
  }

  /**
   * try opening my source
   * 
   * @return true if successful
   */
  public boolean open() {
    boolean ret = this.capture.open(source);
    frameIndex=0;
    return ret;
  }

  /**
   * fetch an image Matrix
   * 
   * @return - the image fetched
   */
  public Mat fetch() {
    if (!this.capture.isOpened()) {
      boolean ret = this.open();
      if (!ret) {
        String msg = String.format(
            "Trying to fetch image from unopened VideoCapture and open %s failed",
            source);
        throw new IllegalStateException(msg);
      }
    }
    final Mat frame = new Mat();
    this.capture.read(frame);
    frameIndex++;
    return !frame.empty() ? frame : null;
  }

  @Override
  protected void finalize() throws Throwable {
    super.finalize();
  }

  /**
   * convert me to an observable
   * @return a Mat emitting Observable 
   */
  public Observable<Mat> toObservable() {
    // Resource creation.
    Func0<VideoCapture> resourceFactory = () -> {
      VideoCapture capture = new VideoCapture();
      capture.open(source);
      return capture;
    };

    // Convert to observable.
    Func1<VideoCapture, Observable<Mat>> observableFactory = capture -> Observable
        .<Mat> create(subscriber -> {
          boolean hasNext = true;
          while (hasNext) {
            final Mat frame = this.fetch();
            hasNext = frame!=null && frame.rows()>0 && frame.cols()>0;
            if (hasNext) {
               String msg = String.format("->%6d:%4dx%d", frameIndex, frame.cols(), frame.rows());
               System.out.println(msg);
               subscriber.onNext(frame);
            }
          }
          subscriber.onCompleted();
        });

    // Disposal function.
    Action1<VideoCapture> dispose = VideoCapture::release;

    return Observable.using(resourceFactory, observableFactory, dispose);
  }
}

图像订阅者

import org.opencv.core.Mat;

import rx.Subscriber;

public class ImageSubscriber extends Subscriber<Mat> {

  public Throwable error;
  public int cols = 0;
  public int rows=0;
  public int frameIndex=0;
  public boolean completed = false;
  public boolean debug = false;

  @Override
  public void onCompleted() {
    completed = true;
  }

  @Override
  public void onError(Throwable e) {
    error = e;
  }

  @Override
  public void onNext(Mat mat) {
    cols = mat.cols();
    rows = mat.rows();
    frameIndex++;
    if (cols==0 || rows==0)
      System.err.println("invalid frame "+frameIndex);
    if (debug) {
      String msg = String.format("%6d:%4dx%d", frameIndex, cols, rows);
      System.out.println(msg);
    }
  }
};

标签: opencvrx-java

解决方案


推荐阅读