angular - 使用 rxjs forkjoin 和 angular 10 http 拦截器,如何仅用多个 http 调用交换 1 个刷新令牌?
问题描述
我正在研究如何在 Angular 10 中进行多个 http 调用,而不需要您编写一堆调用和订阅,从而使代码不必要地冗长。偶然发现了 rxjs forkjoin。代码很简单,只有 4 个 https 调用,当它们返回时我会使用它们
const observableCall$ = this._configService.getSingle( "Param1", 'Param2' ).pipe( catchError( err => of( err ) ) );
const observableCall$ = this._configService.getSingle( "Param1", 'Param2' ).pipe( catchError( err => of( err ) ) );
const observableCall$ = this._configService.getSingle( "Param1", "Param2" ).pipe( catchError( err => of( err ) ) );
const observableCall$ = this._configService.getSingle( "Param1", "Param2" ).pipe( catchError( err => of( err ) ) );
forkJoin(
[
observableCall$,
observableCall$,
observableCall$,
observableCall$,
]
).subscribe( ( results: Config[] ) => {
//do stuff
} )
我遇到的问题是令牌过期。如果令牌过期了,获取新令牌的调用会发生 4 次,这显然是不必要的。
auth 拦截器的代码是这样的:
intercept( request: HttpRequest<unknown>, next: HttpHandler ): Observable<HttpEvent<unknown>> {
const authToken = this._authentication.token;
return ( !!authToken && ( request.url.indexOf( environment.authBaseUrl ) === -1 )
? next.handle( request.clone( {
headers: request.headers
.set( 'Authorization', `${authToken.token_type} ${authToken.access_token}` )
} ) ) : next.handle( request ) ).pipe( catchError( ( error ) => {
if ( error instanceof HttpErrorResponse && error.status === 401 ) {
return this._authentication.exchangeRefreshToken()
.pipe( mergeMap( token => next.handle( request.clone( {
headers: request.headers.append( 'Authorization', `${token.token_type} ${token.access_token}` )
} ) ) ) )
}
} ) )
}
交换刷新令牌的代码:
const headers = this._httpHeaders;
const body = new HttpParams()
.append( 'grant_type', 'refresh_token' )
.append( 'client_id', environment.clientId )
.append( 'client_secret', environment.clientSecret )
.append( 'refresh_token', this.token.refresh_token );
var token = this._httpClient.post( environment.authBaseUrl, body, { headers } )
.pipe( catchError( error => { return null; } ) ) as Observable<Token>;
if ( !( token instanceof Token ) ) return this.getToken();
else {
this.token = new Token( token );
return token;
}
现在我想我明白了这个问题,这是 forkjoin 同时调用所有 4 个,它们作为自己的调用通过拦截器,当令牌过期时,它们都得到 401,因此需要一个新的令牌 4 次。有没有一种优雅的方式让它只要求一个新的令牌一次,然后继续其余的调用?
编辑:我也尝试在 html 中使用异步管道进行这些调用。IE:
<div *ngIf="observableCall$ | async">{{ observableCall | json }}</div>
<div *ngIf="observableCall$ | async">{{ observableCall | json }}</div>
同样的事情也发生了。如果有更好的方法可以做到这一点,我愿意接受建议。
谢谢!
编辑(2):感谢马克的帮助,验证拦截器中的代码
import {
HttpErrorResponse,
HttpEvent,
HttpHandler,
HttpInterceptor,
HttpRequest
} from '@angular/common/http';
import { Injectable } from '@angular/core';
import { merge } from 'lodash';
import { Observable, of, Subject } from 'rxjs';
import { catchError, exhaustMap, filter, first, mergeMap, share, shareReplay, startWith, switchMap, tap } from 'rxjs/operators';
import { Token } from 'src/app/models/token/token.model';
import { environment } from 'src/environments/environment';
import { AuthorizationService } from '../services/authorization/authorization.service';
@Injectable()
export class AuthorizationInterceptor implements HttpInterceptor {
constructor( private _authentication: AuthorizationService ) { }
private _exchangeToken$ = new Subject<boolean>();
private _refreshToken$ = this._exchangeToken$.pipe(
filter( x => x ),
exhaustMap( () => this._authentication.exchangeRefreshToken() ),
share()
);
private _refreshTokenCache$ = this._refreshToken$.pipe(
startWith( null ),
shareReplay( 1 )
);
exchangeRefreshToken( expiredToken: Token ): Observable<Token> {
console.log( '2' );
const exchange = () => {
const startToken$ = of( true ).pipe(
tap( x => this._exchangeToken$.next( x ) ),
filter( _ => false )
);
return merge( startToken$, this._refreshToken$.pipe( first() ) );
}
return this._refreshTokenCache$.pipe(
first(),
switchMap( token =>
token == null || token === expiredToken ?
exchange() :
of( token )
)
);
}
intercept( request: HttpRequest<unknown>, next: HttpHandler ): Observable<HttpEvent<unknown>> {
const authToken = this._authentication.token;
return ( !!authToken && ( request.url.indexOf( environment.authBaseUrl ) === -1 )
? next.handle( request.clone( {
headers: request.headers
.set( 'Authorization', `${authToken.token_type} ${authToken.access_token}` )
} ) ) : next.handle( request ) ).pipe( catchError( ( error ) => {
if ( error instanceof HttpErrorResponse && error.status === 401 ) {
return this.exchangeRefreshToken( authToken )
.pipe( mergeMap( token => next.handle( request.clone( {
headers: request.headers.append( 'Authorization', `${token.token_type} ${token.access_token}` )
} ) ) ) );
}
} ) );
}
}
解决方案
您需要为其创建一个this._authentication.exchangeRefreshToken()
多播的包装器,并且不会重复尝试获取刷新令牌。
一种解决方案是创建一个主题,以创建对刷新令牌的调用。
private _exchangeToken$ = new Subject<boolean>();
private _refreshToken$ = _exchangeToken$.pipe(
filter(x => x),
exhaustMap(_ => this._authentication.exchangeRefreshToken()),
share()
);
function exchangeRefreshToken() : Observable<Token>{
const startToken$ = of(true).pipe(
tap(x => this._exchangeToken$.next(x)),
filter(_ => false)
);
return merge(startToken$, this._refreshToken$.pipe(first()));
}
在这种模式中,您所做的一切都是确保this._authentication.exchangeRefreshToken()
永远不会同时调用 , 。如果在 _exchangeToken$ 主题上创建了第二次发射,它会被忽略,调用者将等待第一次调用的结果。这里的好处是你可以换掉而不做进一步this._authentication.exchangeRefreshToken()
的改变this.exchangeRefreshToken()
更好的解决方案是缓存刷新的令牌,并且仅在缓存中的令牌过期时才获取新令牌。这有点复杂,因为您需要一种机制来知道哪些令牌已过期。我们可以shareReplay(1)
用来保存最新令牌的缓存。我们还需要传入过期的令牌,以便我们可以比较缓存的令牌。
private _exchangeToken$ = new Subject<boolean>();
private _refreshToken$ = _exchangeToken$.pipe(
filter(x => x),
exhaustMap(_ => this._authentication.exchangeRefreshToken()),
share()
);
private _refreshTokenCache$ = _refreshToken$.pipe(
startWith(null),
shareReplay(1)
);
function exchangeRefreshToken(expiredToken) : Observable<Token>{
const exchange = () => {
const startToken$ = of(true).pipe(
tap(x => this._exchangeToken$.next(x)),
filter(_ => false)
);
return merge(startToken$, this._refreshToken$.pipe(first()));
}
return this._refreshTokenCache$.pipe(
first(),
switchMap(token =>
token == null || token === expiredToken ?
exchange() :
of(token)
)
);
}
这并不完美,因为缓存的令牌可能仍然过期,即使它恰好比传入的过期令牌更新。您真正想要的是一个可以读取令牌的有效负载并检查它是否过期的函数。
这样做的问题是,如何做到这一点取决于令牌。是智威汤逊吗?风俗?如果它是加密的,询问服务器可能是唯一的方法。如果您知道令牌的持续时间,您可以使用自己的时间戳存储新令牌,并使用它来决定何时刷新。
private _refreshTokenCache$ = _refreshToken$.pipe(
map(token => ({
token,
timeStamp: Date.now()
})),
shareReplay(1)
);
真的,没有一个万能的解决方案。
旁白:
这可能看起来有些奇怪:
of(true).pipe(
tap(x => this._exchangeToken$.next(x)),
filter(_ => false)
);
这是一个不发出任何内容然后完成的流。但作为副作用,它调用this._exchangeToken$.next(true)
.
那么为什么不this._exchangeToken$.next(true)
直接这样调用呢?
function exchangeRefreshToken() : Observable<Token>{
this._exchangeToken$.next(true);
return this._refreshToken$.pipe(first());
}
我们这样做是为了在订阅时创建此效果,而不是在创建时创建。所以这应该仍然有效。
const exchange$ = this.exchangeRefreshToken();
setTimeout(() => {
exchange$.subscribe(token => console.log("I got a token: ", token))
}, 60 x 60 x 1000);
在这里,我们得到一个流,然后等待一个小时并订阅一个刷新令牌。如果我们不生成令牌作为订阅的一部分,那么我们订阅得太晚而无法获取令牌,并且“我得到了令牌”将永远不会打印到控制台。
有关更多详细信息/其他方法的相关问题
推荐阅读
- python - 将多个 CSV 文件加载到一个嵌套字典中
- python - numpy reshape() 和 transpose() 之间的交互是否有规则?
- php - CakePHP ServerRequest getQuery() 在 Helper 类中返回空白数组
- scala - Spark Window Functions:过滤掉开始和结束日期在另一行开始和结束日期范围内的行
- netsuite - Netsuite - 如何将保存的搜索结果从工作流中保存到可编写脚本的电子邮件模板中
- teradata - Teradata Case When 语句 - 替换结果中不在 1 列中的值
- javascript - 从 FTP 下载 PDF 文件并加入它们
- orientation - 是否有设置可以在纵向和横向中正确显示 Bixby 胶囊?
- node.js - 如何使异步任务同步重复
- tsql - SSRS报告运行慢,但sql代码运行快