Reactive Streams example - Java 9

JEP 266 answers the need for an interoperable publish-subscribe framework by standardizing Reactive Streams solutions for the JVM. Library implementations now have a standard to comply with for interoperability, and a common base to derive from: the JVM.

This post walks through a complete example that shows how the abstractions in the standard cooperate to create reactive stream solutions. The complete source code is available on GitHub.

Time for action

The basic idea is to use Reactive Streams to stream tweets from a Twitter search. For that, we will use the well-known Twitter4J library.

First off, we need to implement a Publisher and a Subscriber.

TweetSubscriber.java

The Subscriber should implement the Flow.Subscriber interface and override three main methods:

  • onComplete(), which gets called when no more items are to be expected.
  • onError(Throwable), which is a separate channel that you should listen on for errors.
  • onNext(T), which is used to receive the next item.
  • Before any of those things happen, though, the publisher calls onSubscribe(Subscription).
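
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;
import twitter4j.Status;
// The import for @Inject depends on the injection framework and is omitted here.

public class TweetSubscriber implements Flow.Subscriber<Status> {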

    private final String id = UUID.randomUUID().toString();
    private Flow.Subscription subscription;
    private static final int SUB_REQ=5;
    private static final int SLEEP=1000;
    @Inject
    private Logger logger ;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        logger.info(  "subscriber: "+ id +" ==> Subscribed");
        this.subscription = subscription;
        subscription.request(SUB_REQ);
    }

    @Override
    public void onNext(Status status) {
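        try {
            Thread.sleep(SLEEP); // simulate an expensive computation
        } catch (InterruptedException e) {
            logger.log(Level.SEVERE, "An error has occurred", e);
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }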
        Tweet t = new Tweet(status.getUser().getScreenName(), status.getText(), status.getCreatedAt());
        logger.info(t.toString());
        subscription.request(SUB_REQ);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.log(Level.SEVERE, "An error occurred", throwable);

    }

    @Override
    public void onComplete() {
        logger.info("Done!");

    }
}

This is pretty much self-explanatory. The interesting part is the onNext method, where I instantiate a tweet object and print it. The 1s sleep is to simulate some very expensive computing (and for readability purposes).
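To make the callback order concrete, here is a minimal, self-contained sketch that does not need Twitter at all. It is not the code from this post: RecordingSubscriber and SubscriberLifecycleDemo are hypothetical names, plain strings stand in for Status objects, and the subscriber asks for one item at a time with request(1) instead of SUB_REQ, so that demand never grows faster than items are processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TweetSubscriber: it records every callback it receives.
class RecordingSubscriber implements Flow.Subscriber<String> {
    final List<String> events = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        events.add("onSubscribe");
        subscription.request(1); // ask for the first item only
    }

    @Override
    public void onNext(String item) {
        events.add("onNext:" + item);
        subscription.request(1); // ask for one more once this item is handled
    }

    @Override
    public void onError(Throwable throwable) {
        events.add("onError");
        done.countDown();
    }

    @Override
    public void onComplete() {
        events.add("onComplete");
        done.countDown();
    }
}

class SubscriberLifecycleDemo {
    // Publishes the given items to a RecordingSubscriber and returns the callbacks it observed.
    static List<String> run(String... items) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        for (String item : items) {
            publisher.submit(item);
        }
        publisher.close(); // signals onComplete once the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.events);
    }

    public static void main(String[] args) {
        run("first tweet", "second tweet").forEach(System.out::println);
    }
}
```

Running main should print onSubscribe first, then one onNext line per item in submission order, and onComplete last.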

TweetPublisher.java

Let's take a look at our publisher before exploring it piece by piece:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import twitter4j.Query;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;
// The imports for @Inject, @Property and @PostConstruct depend on the injection framework and are omitted here.

public class TweetPublisher implements Flow.Publisher<Status> {
    private static final int CORE_POOL_SIZE = 1;
    private static final int NB_THREADS = 1;
    private static final int INITIAL_DELAY = 1;
    private static final int DELAY = 5;
    @Inject
    @Property("consumerKey")
    private String _consumerKey;

    @Inject
    @Property("consumerSecret")
    private String _consumerSecret;

    @Inject
    @Property("accessToken")
    private String _accessToken;

    @Inject
    @Property("accessTokenSecret")
    private String _accesessTokenSecret;

    @Inject
    @Property("query")
    private String query;

    @Inject
    private Logger logger;

    private Query twitterQuery;
    // Executor used by the SubmissionPublisher to deliver items to subscribers
    private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(NB_THREADS);
    private Twitter twitter;
    private SubmissionPublisher<Status> sb = new SubmissionPublisher<Status>(EXECUTOR, Flow.defaultBufferSize());

    // Ids of the tweets already published; only the keys are used
    private Map<Long, Object> tweetCache = new HashMap<>();

    @PostConstruct
    public void setup() {
        twitterQuery = new Query(query);
        twitterQuery.setResultType(Query.ResultType.mixed);
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true).setOAuthConsumerKey(_consumerKey)
                .setOAuthConsumerSecret(_consumerSecret)
                .setOAuthAccessToken(_accessToken)
                .setOAuthAccessTokenSecret(_accesessTokenSecret);
        TwitterFactory tf = new TwitterFactory(cb.build());
        twitter = tf.getInstance();
    }

    public void getTweets() throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
        Runnable tweets = () -> {
            try {
                twitter.search(twitterQuery).getTweets()
                        .stream()
                        // skip tweets that were already published
                        .filter(status -> !tweetCache.containsKey(status.getId()))
                        .forEach(status -> {
                            tweetCache.put(status.getId(), null);
                            sb.submit(status);
                        });
            } catch (TwitterException e) {
                logger.log(Level.SEVERE, "An error occurred while fetching tweets", e);
                // close the stream and signal onError to the subscribers
                sb.closeExceptionally(e);
            }
        };
        executor.scheduleWithFixedDelay(tweets, INITIAL_DELAY, DELAY, TimeUnit.SECONDS);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Status> subscriber) {
        sb.subscribe(subscriber);
    }
}

Our publisher makes use of SubmissionPublisher. SubmissionPublisher uses the supplied Executor to deliver items to its subscribers. We are free to use any Executor that suits the scenario we are working in. In our case, we created the SubmissionPublisher with a fixed thread pool of one thread.
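The role of the Executor can be checked with a small experiment. The sketch below is an illustration rather than part of the project: ExecutorDeliveryDemo and the thread name "tweet-delivery" are made up for the example. It submits one item from the calling thread and reports the name of the thread on which the item was actually delivered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorDeliveryDemo {
    // Returns the name of the thread on which the submitted item was delivered.
    static String deliveryThreadName() {
        // Single-threaded pool whose only thread carries a recognizable (made-up) name.
        ExecutorService executor =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "tweet-delivery"));
        SubmissionPublisher<String> publisher =
                new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
        AtomicReference<String> seenOn = new AtomicReference<>();
        try {
            // consume() subscribes a simple consumer; the future completes on onComplete.
            CompletableFuture<Void> done =
                    publisher.consume(item -> seenOn.set(Thread.currentThread().getName()));
            publisher.submit("hello");
            publisher.close();
            done.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return seenOn.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered on: " + deliveryThreadName());
    }
}
```

The item is handed over by the thread that calls submit, but the consumer runs on the pool thread, which is why a slow subscriber does not block the code that fetches tweets until the buffer fills up.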

The setup method configures and instantiates our Twitter and Query instances.

Next, I define a scheduler that searches Twitter for the query, waiting 5 seconds between the end of one search and the start of the next. The tweetCache Map plays the role of a cache that prevents the same tweet from being printed twice.
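The polling-plus-cache pattern can be tried out without Twitter credentials. In the following sketch, which rests on several assumptions, canned lists of ids replace the real search results, a Set replaces the Map used above, and the delay is shortened to 10 ms. PollingDedupDemo and its run method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class PollingDedupDemo {
    // Each inner list plays the role of one search result; returns the ids actually published.
    static List<Long> run(List<List<Long>> cannedResults) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        SubmissionPublisher<Long> publisher = new SubmissionPublisher<>();
        List<Long> received = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> consumed = publisher.consume(received::add);

        Set<Long> seen = new HashSet<>(); // only touched by the single scheduler thread
        Iterator<List<Long>> polls = cannedResults.iterator();
        CountDownLatch allPollsDone = new CountDownLatch(1);

        Runnable poll = () -> {
            if (!polls.hasNext()) {
                allPollsDone.countDown();
                return;
            }
            for (Long id : polls.next()) {
                if (seen.add(id)) { // add() returns false for an id seen before
                    publisher.submit(id);
                }
            }
        };
        // The next run starts 10 ms after the previous one has finished.
        scheduler.scheduleWithFixedDelay(poll, 0, 10, TimeUnit.MILLISECONDS);
        try {
            if (!allPollsDone.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("polling did not finish in time");
            }
            publisher.close();
            consumed.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        // Two overlapping "searches": ids 2 and 3 come back twice but are published once.
        System.out.println(run(List.of(List.of(1L, 2L, 3L), List.of(2L, 3L, 4L))));
    }
}
```

Because scheduleWithFixedDelay never runs the task concurrently with itself on a single-threaded scheduler, the plain HashSet needs no extra synchronization here.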

The subscribe method takes a Flow.Subscriber and subscribes it to the SubmissionPublisher.

Final result

Here's our very basic reactive stream example in action

To run it locally, you need to create a Twitter app and add its secrets to the application.properties file.
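As a rough guide, the file could look like the fragment below. The key names are an assumption: they are taken from the @Property annotations in TweetPublisher, on the guess that each annotation value maps directly to a key. All values are placeholders to replace with your own.

```properties
# application.properties (placeholder values, key names assumed from the @Property annotations)
consumerKey=YOUR_CONSUMER_KEY
consumerSecret=YOUR_CONSUMER_SECRET
accessToken=YOUR_ACCESS_TOKEN
accessTokenSecret=YOUR_ACCESS_TOKEN_SECRET
query=java9
```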


Huge thanks to community rockstar Faissal Boutaounte for his help.