RxJava: when subscribing to an Observable, how can the subscription be cancelled once onComplete is called?

Example:

    table.subscribe(tableIns -> {
        // System.out.println("-------Information-------");
        System.out.println(tableIns);
    }, throwable -> {
        throw new SchemaExportException(throwable);
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("Complete");
            // Cancel the subscription here
        }
    });

Note: this is a non-Android runtime environment, using RxJava 2.x.


Answer:

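For now, the only approach I have found is to call CompletableFuture.complete inside onComplete to notify the caller that the stream has finished. Example:

The Flowable is returned by persistence-layer methods; in the caller these are result.getAll(dbName.get(), strategy) and result.getTableColumn(table).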

    public class ConsoleSchemaFlowableOutput implements SchemaFlowableOutput {

        private final static Logger logger = LoggerFactory.getLogger(ConsoleSchemaFlowableOutput.class);

        // Completed in onComplete so that the caller can tell when the stream has finished
        private volatile CompletableFuture<String> future = new CompletableFuture<>();

        // Number of tables printed so far
        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Disposable flush(Information information, Flowable<Table> table) throws SchemaExportException {
            logger.info("Start Flowable Flush");
            Disposable export_flush_complete = table.subscribe(tableIns -> {
                System.out.println(printAsciiTable(tableIns));
                System.out.println(printAsciiColumns(tableIns.getColumns()));
                System.out.println("\r\n");
                count.addAndGet(1);
            }, throwable -> {
                logger.debug("Export Break, reason: " + throwable.getMessage());
                // Wake up the waiting caller on failure
                future.cancel(true);
                throw new SchemaExportException(throwable);
            }, new Action() {
                @Override
                public void run() throws Exception {
                    logger.debug("Export Complete, Affect Size:" + count.get());
                    // Notify the caller that the stream has completed
                    future.complete("OK");
                }
            });
            return export_flush_complete;
        }

        @Override
        public CompletableFuture<String> getFuture() {
            return future;
        }

        ...

    }
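A side note on the error branch above, which calls future.cancel(true): a caller blocked on the future then only sees a CancellationException, without the original cause. One possible variation (not part of the answer's code) is to use completeExceptionally instead. The sketch below uses only java.util.concurrent to show what a waiting caller observes in each case; the class name FutureFailureSketch and the helper describe are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FutureFailureSketch {

    // Hypothetical helper: waits on the future and reports how it ended.
    static String describe(CompletableFuture<String> future) {
        try {
            return "completed: " + future.get();
        } catch (CancellationException e) {
            // After cancel(true) the waiter learns nothing about the cause.
            return "cancelled";
        } catch (ExecutionException e) {
            // After completeExceptionally(t) the original throwable is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for code further up the stack.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = new CompletableFuture<>();
        ok.complete("OK");

        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(true);

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("connection lost"));

        System.out.println(describe(ok));        // completed: OK
        System.out.println(describe(cancelled)); // cancelled
        System.out.println(describe(failed));    // failed: connection lost
    }
}
```

Whether this is worth changing depends on whether the caller needs the failure reason; with cancel(true) the reason is only available from the debug log.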

Caller:

    public void export(Information info, SchemaFlowableOutput out) throws SchemaExportException {
        long startStamp = System.currentTimeMillis();
        // Flowable: for each table, load its columns and emit the filled-in table
        Flowable<Table> tableFlowable = result.getAll(dbName.get(), strategy).flatMap(new Function<Table, Publisher<Table>>() {
            @Override
            public Publisher<Table> apply(@NonNull Table table) throws Exception {
                return result.getTableColumn(table).flatMap(new Function<List<Column>, SingleSource<Table>>() {
                    @Override
                    public SingleSource<Table> apply(@NonNull List<Column> columns) throws Exception {
                        return Single.just(table.fillColumn(columns));
                    }
                }).flatMapPublisher(new Function<Table, Publisher<? extends Table>>() {
                    @Override
                    public Publisher<? extends Table> apply(@NonNull Table table) throws Exception {
                        return Flowable.just(table);
                    }
                });
            }
        });
        Disposable disposable = null;
        try {
            disposable = out.flush(info, tableFlowable);
            CompletableFuture<String> future = out.getFuture();
            // Poll until the output side marks the future as done
            while (!future.isDone()) {
                logger.info("[ERE-Flowable] not finished, sleeping for 1 second");
                Thread.sleep(1000);
            }
            // Named status so it does not shadow the result field used above
            String status = future.get();
            logger.info("[ERE-Flowable] finished, result: " + status);
            if ("OK".equals(status)) {
                long finishStamp = System.currentTimeMillis();
                clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete, WithTime: " + (finishStamp - startStamp));
            }
        } catch (Exception e) {
            clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: " + e.getMessage());
        }
    }

    private void clearHander(Disposable disposable, String reason) {
        logger.info(reason);
        if (null != disposable && !disposable.isDisposed()) {
            disposable.dispose();
        } else {
            if (null != disposable) {
                logger.info("[CH]disposable status:" + disposable.isDisposed());
            } else {
                logger.info("[CH]disposable is null:");
            }
        }
        // Post-completion callback: perform some cleanup work
        completeHandler.apply();
    }
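The export method above waits by polling future.isDone() and sleeping for one second per iteration. If a blocking wait is acceptable, CompletableFuture.get with a timeout does the same job without the loop and also puts an upper bound on the wait. The following is a minimal sketch using only java.util.concurrent; the class name BlockingWaitSketch, the helper await, and the timeout values are assumptions made for illustration, and the background thread merely stands in for the onComplete callback firing on another thread.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingWaitSketch {

    // Hypothetical helper: blocks until the future finishes or the timeout elapses.
    static String await(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (CancellationException e) {
            return "CANCELLED";
        } catch (ExecutionException e) {
            return "FAILED: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for code further up the stack.
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Stand-in for the onComplete callback completing the future elsewhere.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.complete("OK");
        });
        producer.start();

        // Returns as soon as the future completes, well before the 5 s limit.
        System.out.println(await(future, 5_000));

        // A future that nobody completes runs into the timeout instead.
        System.out.println(await(new CompletableFuture<String>(), 100));
    }
}
```

In the caller, the returned status could then drive the same clearHander call as before; a timeout would be one more case to log and clean up after.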

Source link: utcz.com/p/945408.html
