RxJava合并请求序列

回答:

我有两个Apis。Api 1为我提供了一个项目列表,Api 2为我提供了我从Api 1获得的每个项目的更详细信息。到目前为止,我解决它的方式导致性能下降。

回答:

借助Retrofit和RxJava,可以快速有效地解决此问题。

回答:

当下,我的解决方案如下所示:

步骤1:Single<ArrayList<Information>>从Api 1 执行改造。

第2步:我遍历此项目,并向Api 2请求每个项目。

步骤3:Single<ExtendedInformation>对每个项目依次执行改造退货

步骤4:在完全执行Api 2的所有调用之后,我为所有包含信息和扩展信息的项创建一个新对象。

回答:

 public void addExtendedInformations(final Information[] informations) {

final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();

final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {

@Override

public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {

informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));

if (informationDetailArrayList.size() >= informations.length){

listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);

}

}

};

for (Information information : informations) {

getExtendedInformation(ratingRequestListener, information);

}

}

public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {

Single<ExtendedInformation> repos = service.findForTitle(information.title);

disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {

@Override

public void onSuccess(ExtendedInformation extendedInformation) {

ratingRequestListener.onDownloadFinished(information, extendedInformation);

}

@Override

public void onError(Throwable e) {

ExtendedInformation extendedInformation = new ExtendedInformation();

ratingRequestListener.onDownloadFinished(extendedInformation, information);

}

}));

}

public interface RatingRequestListener {

void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

}

回答:

异步地或在调度程序上使用concatMapEagerflatMap执行子调用。


我不是android开发人员,所以我的问题将仅限于纯RxJava(版本1和版本2)。

如果我看对图片,所需的流程是:

some query param 

\--> Execute query on API_1 -> list of items

|-> Execute query for item 1 on API_2 -> extended info of item1

|-> Execute query for item 2 on API_2 -> extended info of item1

|-> Execute query for item 3 on API_2 -> extended info of item1

...

\-> Execute query for item n on API_2 -> extended info of item1

\----------------------------------------------------------------------/

|

\--> stream (or list) of extended item info for the query param

假设Retrofit为

interface Api1 {

@GET("/api1") Observable<List<Item>> items(@Query("param") String param);

}

interface Api2 {

@GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);

}

如果项目的顺序不重要,则flatMap只能使用:

api1.items(queryParam)

.flatMap(itemList -> Observable.fromIterable(itemList)))

.flatMap(item -> api2.extendedInfo(item.id()))

.subscribe(...)

但 改造生成器配置有

  • 要么使用异步适配器(调用将在okhttp内部执行程序中排队)。我个人认为这不是一个好主意,因为您无法控制此执行器。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()

  • 或使用基于调度程序的适配器(调用将在RxJava调度程序上进行调度)。这是我的首选,因为您明确选择了使用哪个调度程序,因此很可能是IO调度程序,但是您可以自由尝试其他调度程序。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))

原因是flatMap将订阅由创建的每个可观察对象api2.extendedInfo(...),并将它们合并到结果可观察对象中。因此结果将按接收顺序显示。

改装客户端 设置为异步或组对调度运行,也可以设置一个:

api1.items(queryParam)

.flatMap(itemList -> Observable.fromIterable(itemList)))

.flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))

.subscribe(...)

这种结构几乎与先前的执行相同,它 在本地 指示api2.extendedInfo应该在每个调度程序上运行。

可以调整的maxConcurrency参数flatMap以控制您想同时执行多少个请求。尽管我对此会保持谨慎,但您不想同时运行所有查询。通常,默认值maxConcurrency足够好(128)。

concatMap通常是运算符,它flatMap按顺序执行相同的操作,但是顺序执行,如果代码需要等待所有子查询执行,则结果会很慢。尽管解决方案与相比又迈出了一步concatMapEager,这一步将按顺序订阅可观察的内容,并根据需要缓冲结果。

假设改造客户端是异步的或在特定的调度程序上运行:

api1.items(queryParam)

.flatMap(itemList -> Observable.fromIterable(itemList)))

.concatMapEager(item -> api2.extendedInfo(item.id()))

.subscribe(...)

或者,如果必须在本地设置调度程序:

api1.items(queryParam)

.flatMap(itemList -> Observable.fromIterable(itemList)))

.concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))

.subscribe(...)

也可以在此运算符中调整并发性。


另外,如果Api返回Flowable,则可以.parallel在RxJava

2.1.7中使用仍处于beta状态的Api。但是结果却是不规则的,我还不知道有没有排序的方法(还可以吗?)。

api.items(queryParam) // Flowable<Item>

.parallel(10)

.runOn(Schedulers.io())

.map(item -> api2.extendedInfo(item.id()))

.sequential(); // Flowable<ItemExtended>

以上是 RxJava合并请求序列 的全部内容, 来源链接: utcz.com/qa/428705.html

回到顶部