首页 > 解决方案 > RXJS 6 - 在 .pipe 中包装 .map

问题描述

我正在将 Observable 代码从 RXJS 5 更新到 6。

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs'
import { AppConfig } from '../config/app-config';
import { Xapi } from 'x-lib';
import { ClicksActive, ClicksBalance } from '../models/user.model';
import { getLambdaErrorMessage } from "../helpers/lambda-error.helper";
import { map } from 'rxjs/operators';
import { catchError } from 'rxjs/operators';


@Injectable(
    { providedIn: 'root' }
)
export class ClicksService {

    constructor() {
    }

    requestActive(
        idNumber: String
    ): Observable<ClicksActive> {

        let parameters =
        {
            apiUsername: this.getApiUsername(),
            apiPassword: this.getApiPassword(),
            apiBaseUrl: this.getApiBaseUrl(),
            idNumber: idNumber
        }

        return Xapi.addTransaction('clicksActive', parameters, { timeout: AppConfig.OTP_REQUEST_TIMEOUT, queue: false })
        .pipe(map(res => {
            let response = res.data;
            let clicksActive: ClicksActive = this.parseActiveResponse(response);
            return clicksActive;
        }), catchError((error) => {
            let errorMessage = getLambdaErrorMessage(error);
            let clicksActive: ClicksActive = {
                success: false,
                active: false,
                errorMessage: errorMessage
            }

            return Observable.of(clicksActive);
        })
        )
    }

    parseActiveResponse(response): ClicksActive {
        let clicksActive: ClicksActive = {
            success: response.success,
            active: response.active,
            errorMessage: response.errorMessage
        }
        return clicksActive;
    }

...

作为参考,Xapi 看起来像这样:

import { ApiTransactions } from "./api.model";
export declare class Xapi {
    private static transactions;
    private static timeoutCheckInterval;
    constructor();
    /**
     * Add transaction to be processed
     *
     * @param {string} lambda
     * @param {object} request
     * @param opts
     * @returns {Observable<any>}
     */
    addTransaction(lambda: string, request: object, opts?: any): Observable<any>;
    /**
     * Get all transactions
     *
     * @returns {ApiTransactions}
     */
    static getTransactions(): ApiTransactions;
    /**
     * Process transaction response
     *
     * When the transaction comes back from the lambdas, process it.
     *
     * @param rx
     */
    static transactionResponse(rx: any): void;
    /**
     * Process transaction that is not queued
     *
     * @param {string} pid
     */
    static processTransaction(pid: string): void;
    /**
     * Checks timeouts on transactions
     */
    private static checkTimeouts();
    /**
     *  Starts interval check for timeouts
     */
    private static startTimeoutCheck();
    /**
     * Stops interval for timeout check
     */
    private static stopTimeoutCheck();
}
export declare let Xapi: Xapi;

这反过来调用 clicksActive.js - 这只是一个 ajax 请求:

class ClicksActive {

    constructor(api, config, log, dbi) {

        this.dbi = dbi;
        this.log = log;

        this.apiUsername = '';
        this.apiPassword = '';
        this.apiBaseUrl = '';
        this.apiTokenExpiration = 3500;
    }

    request(tx) {
        this.apiUsername = tx.apiUsername;
        this.apiPassword = tx.apiPassword;
        this.apiBaseUrl = tx.apiBaseUrl;
        var _this = this;
        this.dbi.apiUsers.find({ username: this.apiUsername })
            .then(
                function(result) {
                    if (result.length === 0) {
                        _this.requestFromApiWithNewToken(tx, false);
                    } else {
                        var user = result[0];
                        var now = new Date();
                        now = Math.round(now.getTime() / 1000);
                        if (now >= user.expiration) {
                            _this.requestFromApiWithNewToken(tx, true);
                        } else {
                            _this.requestFromApi(tx, user.token);
                        }
                    }
                }
            ).catch();

    }

    requestFromApiWithNewToken(tx, update) {
        var https = require('https');
        var _this = this;

        var data = JSON.stringify({
            username: this.apiUsername,
            password: this.apiPassword
        });

        var options = {
            host: this.apiBaseUrl,
            port: 443,
            path: '/api/Authentication/Token',
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'Content-Length': data.length
            }
        };

        var req = https.request(options, function(res) {
            res.setEncoding('utf8');
            var response = '';
            res.on('data', function(data) {
                response += data;
            });

            res.on('end', function() {
                response = JSON.parse(response);
                if (update === false) {
                    _this.saveToken(_this.apiUsername, _this.apiPassword, response.access_token);
                } else {
                    _this.updateToken(_this.apiUsername, _this.apiPassword, response.access_token);
                }
                _this.requestFromApi(tx, response.access_token);
            });
        }).write(data);
    }

    requestFromApi(tx, token) {
        var https = require('https');
        var data = JSON.stringify({
            idNumber: tx.idNumber
        });

        var options = {
            host: this.apiBaseUrl,
            port: 443,
            path: '/api/Clicks/Active',
            method: 'POST',
            headers: {
                authorization: 'Bearer ' + token,
                'Content-Type': 'application/json',
                'Content-Length': data.length
            }
        };

        https.request(options, function(res) {
            res.setEncoding('utf8');
            var response = '';
            res.on('data', function(data) {
                response += data;
            });
            res.on('end', function() {
                response = JSON.parse(response);
                if (response instanceof Array) {
                    tx.resolve(response);
                } else {
                    var responseArray = [];
                    responseArray.push(response);
                    tx.resolve(responseArray);
                }
            });
        }).on('error', function(e) {
            tx.reject([]);
        }).write(data);
    }

   

    updateToken(username, password, token) {
        var now = new Date();
        var expirationTime = Math.round(now.getTime() / 1000) + this.apiTokenExpiration;

        this.dbi.apiUsers.update({ username: username }, { $set: { token: token } });
        this.dbi.apiUsers.update({ username: username }, { $set: { expiration: expirationTime } });
    }
}

//The following binds the Lambda to the Xapi Lambda Agent to commence receiving transactions
require('./at/lambda').bind(ClicksActive);

但不幸的是,即使我已经将地图包装到管道中,我也无法让它工作。

return Xapi.addTransaction('clicksActive', parameters, { timeout: AppConfig.OTP_REQUEST_TIMEOUT, queue: false })
        .pipe(map(res => {
            let response = res.data;
            let clicksActive: ClicksActive = this.parseActiveResponse(response);
            return clicksActive;
        })

错误是:

“OperatorFunction<any, ClicksActive>”类型的参数不可分配给“UnaryFunction<Obs​​ervable, Observable>”类型的参数。参数 'source' 和 'source' 的类型不兼容。

我正在使用 RXJS 6 和 Angular 11、Ionic 5。

任何指出我正确方向的建议都会很有帮助。

标签: angulartypescriptrxjs

解决方案


TL; 博士

尝试以下更改

将行更改return Observable.of(clicksActive);return of(clicksActive);

记得添加必要的导入

import { of } from 'rxjs'

在 rxjs v 6+ 中,我们使用静态运算符of查看of文档


推荐阅读