我们如何在Java 9中实现SubmissionPublisher类?

从Java 9开始,我们可以通过引入四个核心接口(Publisher,Subscriber,Subscription,Processor和一个具体的类)来创建R eactive Streams:实现Publisher 接口的SubmissionPublisher 。每个接口都扮演着不同的角色,这与响应 流的原理相对应。我们可以使用SubmissionPublisher 类的Submit()方法将提供的项目发布给每个订阅者。

语法

public class SubmissionPublisher<T> extends Object implements Flow.Publisher<T>, AutoCloseable

在下面的示例中,我们可以实现SubmissionPublisher类

示例

import java.util.concurrent.Flow.Subscriber;

import java.util.concurrent.Flow.Subscription;

import java.util.concurrent.SubmissionPublisher;

class MySubscriber<T> implements Subscriber<T> {

   private Subscription subscription;

   private String name;

   public MySubscriber(String name) {

      this.name = name;

   }

   @Override   public void onComplete() {

      System.out.println(name + ": onComplete");

   }

   @Override   public void onError(Throwable t) {

      System.out.println(name + ": onError");

      t.printStackTrace();

   }

   @Override   public void onNext(T msg) {

      System.out.println(name + ": " + msg.toString() + " received in onNext");

      subscription.request(1);

   }

   @Override   public void onSubscribe(Subscription subscription) {

      System.out.println(name + ": onSubscribe");

      this.subscription = subscription;

      subscription.request(1);

   }

}// Main classpublic class FlowTest {

   public static void main(String args[]) {

      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

      MySubscriber<String> subscriber = new MySubscriber<>("Mine");

      MySubscriber<String> subscriberYours = new MySubscriber<>("Yours");

      MySubscriber<String> subscriberHis = new MySubscriber<>("His");

      MySubscriber<String> subscriberHers = new MySubscriber<>("Her");

      publisher.subscribe(subscriber);

      publisher.subscribe(subscriberYours);

      publisher.subscribe(subscriberHis);

      publisher.subscribe(subscriberHers);

      publisher.submit("One");

      publisher.submit("Two");

      publisher.submit("Three");

      publisher.submit("Four");

      publisher.submit("Five");

      try {

         Thread.sleep(1000);

      } catch(InterruptedException e) {

         e.printStackTrace();

      }

      publisher.close();

   }

}

输出结果

Yours: onSubscribe

His: onSubscribe

Mine: onSubscribe

His: One received in onNext

Yours: One received in onNext

Mine: One received in onNext

Yours: Two received in onNext

His: Two received in onNext

Yours: Three received in onNext

Mine: Two received in onNext

Yours: Four received in onNext

His: Three received in onNext

Yours: Five received in onNext

Mine: Three received in onNext

Her: onSubscribe

His: Four received in onNext

Her: One received in onNext

Mine: Four received in onNext

Her: Two received in onNext

His: Five received in onNext

Her: Three received in onNext

Mine: Five received in onNext

Her: Four received in onNext

Her: Five received in onNext

Yours: onComplete

His: onComplete

Mine: onComplete

Her: onComplete

以上是 我们如何在Java 9中实现SubmissionPublisher类? 的全部内容, 来源链接: utcz.com/z/356730.html

回到顶部