Reactive Streams example - Java 9
JEP 266 answers the need for an interoperable publish-subscribe framework by standardizing Reactive Streams for the JVM through the java.util.concurrent.Flow API. Library implementations now have a standard to comply with for interoperability, and a common base to build on: the JDK itself.
This post walks through a complete example that shows how the abstractions in the standard cooperate to build reactive stream solutions. The complete source code is available on GitHub.
Time for action
The basic idea is to use Reactive Streams to stream tweets from a Twitter search. For that, we will use the well-known Twitter4J library.
First off, we need to implement a Publisher and a Subscriber.
TweetSubscriber.java
The Subscriber should implement the Flow.Subscriber interface and override three main methods:
- onComplete(), which gets called when no more items are to be expected.
- onError(Throwable), which is a separate channel dedicated to errors.
- onNext(T), which is used to receive the next item.
Before any of those things happen, though, the publisher calls onSubscribe(Subscription).
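To make the order of these callbacks concrete, here is a minimal, self-contained sketch that is independent of Twitter. The names CallbackOrderDemo and RecordingSubscriber are made up for this illustration, and the JDK's SubmissionPublisher plays the publisher role.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class CallbackOrderDemo {

    /** Records the name of every callback in the order it is invoked. */
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            events.add("onSubscribe");
            subscription.request(1); // nothing is delivered until demand is signalled
        }

        @Override
        public void onNext(String item) {
            events.add("onNext:" + item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
        } // close() signals onComplete after the pending items
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the stream to finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running it prints [onSubscribe, onNext:a, onNext:b, onComplete]: the subscription always comes first, and either onComplete or onError closes the stream.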
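import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;
import twitter4j.Status;
// The import for @Inject depends on the DI framework in use; Tweet is the project's own class.

public class TweetSubscriber implements Flow.Subscriber<Status> {
    private static final int SUB_REQ = 5;
    private static final int SLEEP = 1000;

    private final String id = UUID.randomUUID().toString();
    private Flow.Subscription subscription;

    @Inject
    private Logger logger;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        logger.info("subscriber: " + id + " ==> Subscribed");
        this.subscription = subscription;
        // Signal the initial demand; nothing is delivered before this call.
        subscription.request(SUB_REQ);
    }

    @Override
    public void onNext(Status status) {
        try {
            // Simulate an expensive computation.
            Thread.sleep(SLEEP);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can react to it.
            Thread.currentThread().interrupt();
            logger.log(Level.SEVERE, "An error has occurred", e);
        }
        Tweet t = new Tweet(status.getUser().getScreenName(), status.getText(), status.getCreatedAt());
        logger.info(t.toString());
        // Adds SUB_REQ more items to the outstanding demand.
        subscription.request(SUB_REQ);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.log(Level.SEVERE, "An error occurred", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("Done!");
    }
}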
This is pretty much self-explanatory. The interesting part is the onNext method, where I instantiate a Tweet object and print it. The 1s sleep simulates some very expensive computation (and keeps the output readable).
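One detail worth a second look: Subscription.request(n) adds n to the outstanding demand, so asking for SUB_REQ more items after every single item lets demand grow faster than the subscriber consumes. If you would rather keep demand bounded, one option is to ask for a new batch only once the previous one has been processed. The following is a minimal sketch of that idea, not the code from the repository; BatchedDemandDemo and BatchingSubscriber are hypothetical names, and plain integers stand in for tweets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BatchedDemandDemo {

    /** Requests BATCH items at a time and only asks again once a batch is consumed. */
    static class BatchingSubscriber implements Flow.Subscriber<Integer> {
        static final int BATCH = 5;
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        int requestCalls = 0;
        private int remainingInBatch;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            requestBatch();
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) {
                requestBatch(); // the previous batch is fully processed
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }

        private void requestBatch() {
            remainingInBatch = BATCH;
            requestCalls++;
            subscription.request(BATCH);
        }
    }

    static BatchingSubscriber run(int itemCount) {
        BatchingSubscriber subscriber = new BatchingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= itemCount; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete after the pending items
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the stream to finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber;
    }

    public static void main(String[] args) {
        BatchingSubscriber s = run(10);
        System.out.println(s.received + " delivered with " + s.requestCalls + " request calls");
    }
}
```

With 10 items and a batch size of 5, the subscriber calls request three times: once in onSubscribe, then after the 5th and the 10th item. At no point can the publisher be more than one batch ahead of it.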
TweetPublisher.java
Let's take a look at our publisher before exploring it piece by piece:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import twitter4j.Query;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;
// Imports for @Inject, @Property and @PostConstruct depend on the DI setup in use.

public class TweetPublisher implements Flow.Publisher<Status> {
    private static final int CORE_POOL_SIZE = 1;
    private static final int NB_THREADS = 1;
    private static final int INITIAL_DELAY = 1;
    private static final int DELAY = 5;

    @Inject
    @Property("consumerKey")
    private String _consumerKey;

    @Inject
    @Property("consumerSecret")
    private String _consumerSecret;

    @Inject
    @Property("accessToken")
    private String _accessToken;

    @Inject
    @Property("accessTokenSecret")
    private String _accesessTokenSecret;

    @Inject
    @Property("query")
    private String query;

    @Inject
    private Logger logger;

    private Query twitterQuery;
    // Delivers items to the subscribers.
    private final ExecutorService EXECUTOR = Executors.newFixedThreadPool(NB_THREADS);
    private Twitter twitter;
    private SubmissionPublisher<Status> sb = new SubmissionPublisher<>(EXECUTOR, Flow.defaultBufferSize());
    // IDs of tweets already published; only touched by the single scheduler thread.
    private Map<Long, Object> tweetCache = new HashMap<>();

    @PostConstruct
    public void setup() {
        twitterQuery = new Query(query);
        twitterQuery.setResultType(Query.ResultType.mixed);
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true).setOAuthConsumerKey(_consumerKey)
                .setOAuthConsumerSecret(_consumerSecret)
                .setOAuthAccessToken(_accessToken)
                .setOAuthAccessTokenSecret(_accesessTokenSecret);
        TwitterFactory tf = new TwitterFactory(cb.build());
        twitter = tf.getInstance();
    }

    public void getTweets() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
        Runnable tweets = () -> {
            try {
                twitter.search(twitterQuery).getTweets()
                        .stream()
                        .filter(status -> !tweetCache.containsKey(status.getId()))
                        .forEach(status -> {
                            tweetCache.put(status.getId(), null);
                            sb.submit(status);
                        });
            } catch (TwitterException e) {
                logger.log(Level.SEVERE, "An error occurred while fetching tweets", e);
                // Close the stream and notify subscribers through onError.
                sb.closeExceptionally(e);
            }
        };
        executor.scheduleWithFixedDelay(tweets, INITIAL_DELAY, DELAY, TimeUnit.SECONDS);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Status> subscriber) {
        sb.subscribe(subscriber);
    }
}
Our publisher makes use of SubmissionPublisher. The SubmissionPublisher class uses the supplied Executor to deliver items to its subscribers. We are free to use any Executor that suits the scenario we are working in. In our case, we've created the SubmissionPublisher with a fixed thread pool of 1.
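To see that hand-off to the Executor in isolation, here is a small sketch that gives the pool's only thread a recognizable name and records which thread runs onNext. ExecutorDeliveryDemo and the thread name "delivery-thread" are assumptions made for this illustration; they do not appear in the project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorDeliveryDemo {

    /** Returns the name of the thread on which the subscriber received its item. */
    static String deliveryThreadName() {
        // A fixed pool of one thread, as in the post, but with a recognizable name.
        ExecutorService executor =
                Executors.newFixedThreadPool(1, task -> new Thread(task, "delivery-thread"));
        AtomicReference<String> seenOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher =
                     new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // effectively unbounded demand
                }

                @Override
                public void onNext(String item) {
                    seenOn.set(Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            publisher.submit("hello"); // called from the current thread
        }

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the stream to finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown(); // the pool thread is not a daemon thread
        }
        return seenOn.get();
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreadName());
    }
}
```

The item is submitted from the calling thread but consumed on delivery-thread, which is why a slow subscriber does not block the code that produces the items until the publisher's buffer fills up.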
The setup method configures and instantiates our Twitter and Query instances.
Next, I define a scheduled task that searches Twitter for the query, with a 5-second delay between the end of one run and the start of the next. The tweetCache Map plays the role of a cache that prevents the same tweet from being printed twice.
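The poll-and-deduplicate pattern can be tried without Twitter credentials. In the sketch below, a fixed list of overlapping batches of IDs stands in for successive search results, and a HashSet replaces the Map, since Set.add already reports whether the ID was new. PollingDedupDemo, the batches, and the 10 ms delay are all assumptions chosen for the illustration.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class PollingDedupDemo {

    static List<Long> run() {
        // Stand-in for successive search results: consecutive batches overlap.
        Iterator<List<Long>> source =
                List.of(List.of(1L, 2L, 3L), List.of(2L, 3L, 4L), List.of(4L, 5L)).iterator();
        Set<Long> seen = new HashSet<>(); // only touched by the single scheduler thread
        List<Long> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<Long> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Long>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Long id) {
                received.add(id);
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Runnable poll = () -> {
            if (!source.hasNext()) {
                publisher.close(); // no more batches: complete the stream
                return;
            }
            for (Long id : source.next()) {
                if (seen.add(id)) { // add() returns false for IDs already published
                    publisher.submit(id);
                }
            }
        };
        scheduler.scheduleWithFixedDelay(poll, 0, 10, TimeUnit.MILLISECONDS);

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the stream to finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Although IDs 2, 3 and 4 show up in two batches each, the subscriber receives [1, 2, 3, 4, 5]. Keep in mind that a cache like this grows for as long as the publisher runs, so a long-lived service would probably want to bound it.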
The subscribe method takes a Flow.Subscriber and subscribes it to the SubmissionPublisher.
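Here is a stripped-down sketch of that delegation with the generic types spelled out. Flow.Publisher<T> declares subscribe(Flow.Subscriber<? super T>), so a publisher of strings also accepts a subscriber of Object. GreetingPublisher and its greet method are hypothetical and exist only for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class DelegatingPublisherDemo {

    /** A publisher that hands every subscriber over to an internal SubmissionPublisher. */
    static class GreetingPublisher implements Flow.Publisher<String>, AutoCloseable {
        private final SubmissionPublisher<String> delegate = new SubmissionPublisher<>();

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            delegate.subscribe(subscriber);
        }

        void greet(String name) {
            delegate.submit("Hello, " + name);
        }

        @Override
        public void close() {
            delegate.close();
        }
    }

    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (GreetingPublisher publisher = new GreetingPublisher()) {
            // A Subscriber<Object> is accepted thanks to the "? super String" bound.
            publisher.subscribe(new Flow.Subscriber<Object>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Object item) {
                    received.add(String.valueOf(item));
                }

                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            publisher.greet("Ada");
            publisher.greet("Linus");
        }

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the stream to finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Delegating like this keeps the buffering and demand bookkeeping inside SubmissionPublisher, so the wrapper only decides what to publish and when.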
Final result
Here's our very basic reactive stream example in action.
To run it locally, you need to create a Twitter app and add the secrets to the application.properties file.
Huge thanks to community rockstar Faissal Boutaounte for his help.