RxJS 的 first() 操作符会完成整个流链

我有一个 Angular 5 应用程序,使用 RxJS 的 WebSocketSubject 发送 JSON-RPC 消息。问题在于 first() 操作符会完成整个流链。

这是我的 sendRequest 函数:

/**
 * Sends a JSON-RPC request over the socket and returns a promise for the
 * response whose `id` equals the request's `id`.
 *
 * The promise rejects with `RpcError` when the matching response carries an
 * `error` member.
 *
 * NOTE(review): this is the version the question reports as problematic. The
 * surrounding text says onResponse() derives directly from the
 * WebSocketSubject and that the subject ends once first() finishes this chain.
 */
sendRequest(request: Request): Promise<Response> { 

console.log(request);

// The request is serialized and pushed to the socket before the response chain below is subscribed.
this.socket.next(JSON.stringify(request));

return this.onResponse().filter((response: Response) => {

// Keep only the response that belongs to this request.
return response.id === request.id;

// first() completes the chain after one matching response; toPromise() performs the subscription.
}).first().toPromise().then((response) => {

console.log(response);

if (response.error) {

console.log('error');

// Turn a JSON-RPC error response into a promise rejection.
throw new RpcError(response.error);

}

return response;

});

}

我使用 first() 操作符来结束这个经过过滤的订阅。但是,onResponse() 直接来自我的 WebSocketSubject,结果 WebSocketSubject 也随之完成(关闭)了。

有没有办法把它与原始 Subject 解耦?

或者我应该用 Observable.create(...) 创建一个新的 Observable?

已经写好的 .filter 函数会怎样?它会一直存在下去吗?还是我必须在某处把它移除,以防止过滤函数被持续调用?


编辑1

改用下面的写法也没有帮助。

/**
 * Second attempt from the question: wraps the response subscription in a
 * hand-written Promise instead of using first().toPromise().
 *
 * NOTE(review): `reject` is never called, and with the unsubscribe commented
 * out the subscription is never released, so the filter callback stays
 * attached after the promise resolves.
 */
sendRequest(request: Request): Promise<Response> { 

console.log(request);

this.socket.next(JSON.stringify(request));

return new Promise<Response>((resolve, reject) => {

const responseSubscription = this.onResponse().filter((response: Response) => {

// Logged once per incoming response for every subscription that is still attached.
console.log('filter');

return response.id === request.id;

}).subscribe((response: Response) => {

// Left disabled here; the surrounding text reports that enabling it closes the whole WebSocketSubject.
// responseSubscription.unsubscribe();

resolve(response);

});

});

}

如果我执行取消订阅,整个 WebSocketSubject 就会被关闭。如果不取消订阅,之后的每个请求都会打印一次 'filter'!


编辑2

这是我编写的完整 WebsocketService:

import {Injectable} from "@angular/core"; 

import {WebSocketSubject, WebSocketSubjectConfig} from "rxjs/observable/dom/WebSocketSubject";

import {MessageFactory, Notification, Request, Response, RpcError} from "../misc/jsonrpc";

import {ReplaySubject} from "rxjs/ReplaySubject";

import {Observable} from "rxjs/Observable";

import 'rxjs/add/operator/toPromise';

import 'rxjs/add/operator/filter';

import 'rxjs/add/operator/first';

import 'rxjs/add/observable/from';

/**
 * Lifecycle states of the websocket connection.
 *
 * String-valued so each member can be logged directly as readable text.
 */
export enum ConnectionState {
  /** The socket is open. */
  CONNECTED = 'Connected',
  /** The socket has been created but is not open yet. */
  CONNECTING = 'Connecting',
  /** A close has been initiated but has not finished. */
  CLOSING = 'Closing',
  /** The socket is closed. */
  DISCONNECTED = 'Disconnected',
}

/**
 * Angular service from the question: owns a single WebSocketSubject and
 * offers JSON-RPC request/notification helpers on top of it.
 *
 * NOTE(review): every observable handed out by this class (onBinaryData,
 * onMessageData, onResponse) is derived directly from `this.socket`, and
 * nothing in the class holds a long-lived subscription on the socket. The log
 * that follows this listing shows the state going to "Disconnected" right
 * after the first 'filter' line.
 */
@Injectable()

export class WebsocketService {

// Buffer size 1: a subscriber that arrives late still receives the most recent state.
private connectionState = new ReplaySubject<ConnectionState>(1);

private socket: WebSocketSubject<ArrayBuffer | Object>;

private config: WebSocketSubjectConfig;

constructor() {

console.log('ctor');

// NOTE(review): location.protocol carries a trailing colon ("https:"), so this
// comparison looks like it can never be true and 'ws' is always chosen — confirm.
const protocol = location.protocol === 'https' ? 'wss' : 'ws';

const host = location.hostname;

// Port is hard-coded; the page's own port is left as a commented-out alternative.
const port = 3000; // location.port;

this.config = {

binaryType: "arraybuffer",

url: `${protocol}://${host}:${port}`,

// The three observers below mirror socket lifecycle events into connectionState.
openObserver: {

next:() => this.connectionState.next(ConnectionState.CONNECTED)

},

closingObserver: {

next:() => this.connectionState.next(ConnectionState.CLOSING)

},

closeObserver: {

next:() => this.connectionState.next(ConnectionState.DISCONNECTED)

},

// Binary frames pass through untouched; anything else is parsed as JSON.
resultSelector: (e: MessageEvent) => {

try {

if (e.data instanceof ArrayBuffer) {

return e.data;

} else {

return JSON.parse(e.data);

}

// NOTE(review): this `e` shadows the MessageEvent parameter of the same name.
} catch (e) {

console.error(e);

// A payload that fails to parse is emitted downstream as null.
return null;

}

}

};

this.connectionState.next(ConnectionState.CONNECTING);

this.socket = new WebSocketSubject(this.config);

// Logs every state change.
this.connectionState.subscribe((state) => {

console.log(`WS state ${state}`);

});

}

/** Socket messages that are ArrayBuffer instances. */
onBinaryData(): Observable<ArrayBuffer> {

return this.socket.filter((message: any) => {

return message instanceof ArrayBuffer;

});

}

/** Socket messages that are not ArrayBuffer instances (the JSON-parsed values, or null after a parse failure). */
onMessageData(): Observable<Object> {

return this.socket.filter((message: any) => {

return !(message instanceof ArrayBuffer);

});

}

/**
 * Non-binary messages that MessageFactory classifies as responses, converted to Response.
 * MessageFactory.from is invoked twice per message: once to test it and once to convert it.
 */
onResponse(): Observable<Response> {

return this.onMessageData().filter((message) => {

return MessageFactory.from(message).isResponse();

}).map((message): Response => {

return MessageFactory.from(message).toResponse();

});

}

/**
 * Sends the request and resolves with the first response whose id matches.
 *
 * NOTE(review): `reject` is never called, and an error response is resolved
 * as-is rather than turned into an RpcError.
 */
sendRequest(request: Request): Promise<Response> {

console.log(request);

this.socket.next(JSON.stringify(request));

return new Promise<Response>((resolve, reject) => {

const responseSubscription = this.onResponse().filter((response: Response) => {

console.log('filter');

return response.id === request.id;

}).subscribe((response: Response) => {

// Releases this request's subscription, which is derived from this.socket (see the class-level note).
responseSubscription.unsubscribe();

resolve(response);

});

});

}

/** Sends a JSON-RPC notification; no reply is awaited. */
sendNotification(notification: Notification): void {

this.socket.next(JSON.stringify(notification));

}

}

以下是我日志中的结果:

Using Angular 5.0.2 

websocket.service.ts:27 ctor

websocket.service.ts:69 WS state Connecting

core.js:3565 Angular is running in the development mode. Call enableProdMode() to enable the production mode.

websocket.service.ts:96 Request {jsonrpc: "2.0", id: "b042005c-5fbf-5ffc-fbd1-df68fae5882e", method: "appointment_list_get", params: undefined}

websocket.service.ts:69 WS state Connected

websocket.service.ts:103 filter

websocket.service.ts:69 WS state Disconnected

我需要找到某种方式,把我的过滤器与原始流解耦(decoupling)。

回答:

下面的做法是可行的。关键是将消息处理与底层的 WebSocketSubject 解耦。

import {Injectable} from "@angular/core"; 

import {WebSocketSubject, WebSocketSubjectConfig} from "rxjs/observable/dom/WebSocketSubject";

import {MessageFactory, Notification, Request, Response, RpcError} from "../misc/jsonrpc";

import {ReplaySubject} from "rxjs/ReplaySubject";

import {Observable} from "rxjs/Observable";

import 'rxjs/add/operator/toPromise';

import 'rxjs/add/operator/filter';

import 'rxjs/add/operator/first';

import 'rxjs/add/observable/from';

import {Subject} from "rxjs/Subject";

/**
 * Lifecycle states of the websocket connection.
 *
 * String-valued so each member can be logged directly as readable text.
 */
export enum ConnectionState {
  /** The socket is open. */
  CONNECTED = 'Connected',
  /** The socket has been created but is not open yet. */
  CONNECTING = 'Connecting',
  /** A close has been initiated but has not finished. */
  CLOSING = 'Closing',
  /** The socket is closed. */
  DISCONNECTED = 'Disconnected',
}

/**
 * Angular service that owns one `WebSocketSubject` and speaks JSON-RPC over it.
 *
 * The constructor attaches two permanent subscriptions to the socket and
 * republishes incoming traffic on plain `Subject`s. Request handling
 * subscribes to those subjects rather than to the socket itself, so a
 * per-request `first()` never removes the socket's last subscriber.
 */
@Injectable()
export class WebsocketService {
  /** Buffer size 1: a late subscriber still receives the most recent state. */
  private readonly connectionState = new ReplaySubject<ConnectionState>(1);

  private readonly socket: WebSocketSubject<ArrayBuffer | Object>;

  private readonly config: WebSocketSubjectConfig;

  /** Re-emits every non-binary socket message, wrapped by `MessageFactory.from`. */
  private readonly messageObserver = new Subject<MessageFactory>();

  /** Re-emits every binary socket message; no accessor in this class exposes it yet. */
  private readonly binaryObserver = new Subject<ArrayBuffer>();

  constructor() {
    // `location.protocol` includes the trailing colon ("https:"); comparing
    // against 'https' never matched, so 'wss' was never selected.
    const protocol = location.protocol === 'https:' ? 'wss' : 'ws';
    const host = location.hostname;
    const port = 3000; // location.port;

    this.config = {
      binaryType: "arraybuffer",
      url: `${protocol}://${host}:${port}`,
      // The three observers mirror socket lifecycle events into connectionState.
      openObserver: {
        next: () => this.connectionState.next(ConnectionState.CONNECTED)
      },
      closingObserver: {
        next: () => this.connectionState.next(ConnectionState.CLOSING)
      },
      closeObserver: {
        next: () => this.connectionState.next(ConnectionState.DISCONNECTED)
      },
      // Binary frames pass through untouched; anything else is parsed as JSON.
      resultSelector: (e: MessageEvent) => {
        try {
          if (e.data instanceof ArrayBuffer) {
            return e.data;
          }
          return JSON.parse(e.data);
        } catch (parseError) {
          // Named differently from `e` so the MessageEvent is not shadowed.
          console.error(parseError);
          return null;
        }
      }
    };

    this.connectionState.next(ConnectionState.CONNECTING);
    this.socket = new WebSocketSubject(this.config);

    // Permanent subscription #1: forward binary frames.
    this.socket.filter((message: any) => {
      return message instanceof ArrayBuffer;
    }).subscribe((message: ArrayBuffer) => {
      this.binaryObserver.next(message);
    });

    // Permanent subscription #2: forward everything else as MessageFactory
    // instances. `null` is what resultSelector emits for an unparsable payload
    // (already logged there), so it is dropped instead of being handed to
    // MessageFactory.from.
    this.socket.filter((message: any) => {
      return message != null && !(message instanceof ArrayBuffer);
    }).subscribe((message: Object) => {
      this.messageObserver.next(MessageFactory.from(message));
    });

    // Logs every state change.
    this.connectionState.subscribe((state) => {
      console.log(`WS state ${state}`);
    });
  }

  /**
   * Stream of incoming JSON-RPC responses.
   *
   * @returns the messages for which `isResponse()` is true, converted with `toResponse()`.
   */
  onResponse(): Observable<Response> {
    return this.messageObserver.filter((message: MessageFactory) => {
      return message.isResponse();
    }).map((message: MessageFactory): Response => {
      return message.toResponse();
    });
  }

  /**
   * Sends a JSON-RPC request and waits for the response with the same id.
   *
   * @param request the request to serialize and send.
   * @returns a promise resolving with the first response whose `id` equals `request.id`.
   * @throws RpcError (as a promise rejection) when that response has an `error` member.
   */
  sendRequest(request: Request): Promise<Response> {
    console.log(request);

    this.socket.next(JSON.stringify(request));

    // first() completes only this derived chain; the constructor's
    // subscriptions on the socket stay in place.
    return this.onResponse().filter((response: Response) => {
      return request.id === response.id;
    }).first().toPromise().then((response) => {
      console.log(response);

      if (response.error) {
        console.log('error');
        throw new RpcError(response.error);
      }

      return response;
    });
  }

  /**
   * Sends a JSON-RPC notification; no reply is awaited.
   *
   * @param notification the notification to serialize and send.
   */
  sendNotification(notification: Notification): void {
    this.socket.next(JSON.stringify(notification));
  }
}

以上是 rxjs首先完成整个流式链接 的全部内容, 来源链接: utcz.com/qa/260056.html

回到顶部