Social Media Feeds with Akka Streams

by Andreas Hartmann
Tags: Akka, Scala

The Akka Social Stream library can be used to aggregate a stream of status updates from multiple social media sources. This article introduces the idea behind the library and explains how to use it in your own project.

Social feeds with Akka Social Stream

Motivation

The BeCompany team implemented the Akka Social Stream library while migrating the BeCompany website to the Play framework. Originally, we had been using the jQuery lifestream plugin. Unfortunately, our social media feed suffered from frequent and persistent outages of the third-party feed generation services that the plugin depends on, so we decided to eliminate the dependency on external applications.

Why Akka streams?

As explained in a previous blog post, Akka streams are a general-purpose tool for processing tasks that involve complex flows with multiple data sources and/or sinks. They are particularly easy to integrate into Play applications, for instance as the source for chunked HTTP responses, server-sent events (check out this blog post for an example) or WebSocket messages. Akka HTTP is an equally suitable server technology for exposing Akka streams.
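To illustrate the chunked-response case with Akka HTTP (the example project described below uses WebSockets instead), here is a minimal sketch, assuming Akka HTTP 10.x: any Source of strings can be completed as a chunked text/plain response, with each element sent as its own chunk as soon as it is emitted. The object ChunkedSketch, the helper chunkedEntity and the /lines path are hypothetical and not part of the article's project.

```scala
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical example, not part of the article's project.
object ChunkedSketch {

  // Wraps a stream of text lines in a chunked HTTP entity: every element
  // becomes one chunk that is sent to the client as soon as it is emitted.
  def chunkedEntity(lines: Source[String, Any]): HttpEntity.Chunked =
    HttpEntity(ContentTypes.`text/plain(UTF-8)`, lines.map(line => ByteString(line + "\n")))

  // Hypothetical route: GET /lines streams three lines, then ends the response.
  val route: Route =
    path("lines") {
      get {
        complete(chunkedEntity(Source(List("first", "second", "third"))))
      }
    }
}
```

Because the response body is itself a stream, the same pattern would keep the response open for an unbounded source such as a live status feed.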

How it works

The Akka Social Stream library fetches status updates from a set of social network accounts. At the time of writing, only GitHub organizations and Twitter accounts are supported. Status updates are obtained either by periodic polling (GitHub events) or via streaming requests (Twitter tweets), and all of them are merged into a single Akka stream. Clients can subscribe to this stream using the Feed.subscribe method.
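The combination of a polled and a pushed source can be pictured with plain Akka Streams operators. The following is a minimal sketch, assuming Akka 2.5-era APIs (ActorMaterializer); it is not the library's implementation. pollGithub and the tweet list are hypothetical stand-ins, and the polling interval is shortened from 5 minutes to 100 ms so the example terminates quickly; a real poller would use something like Source.tick(0.seconds, 5.minutes, ()).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeSketch {

  // Hypothetical stand-in for one GitHub API request returning a batch of events.
  def pollGithub(round: Int): List[String] = List(s"github event from poll $round")

  // Polling side: Source.tick emits () at a fixed interval (5 minutes in the
  // article, 100 ms here). Zipping with 1 to 3 numbers the ticks and stops
  // after three polls; mapConcat flattens each batch into single elements.
  val github: Source[String, _] =
    Source.tick(0.millis, 100.millis, ())
      .zip(Source(1 to 3))
      .mapConcat { case (_, round) => pollGithub(round) }

  // Push side: hypothetical stand-in for a streaming connection delivering tweets.
  val twitter: Source[String, _] = Source(List("tweet 1", "tweet 2"))

  // merge emits elements from both sides as they arrive and completes
  // once both sides have completed.
  val feed: Source[String, _] = github.merge(twitter)

  // Runs the merged stream to completion and returns everything it emitted.
  def collect(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("merge-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(feed.runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

MergeSketch.collect() returns all five strings, but their relative order depends on timing, which is why a merged feed is not guaranteed to be in chronological order (see Next steps).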

Getting started

The source code for the example project is available on GitHub.

Unfortunately, before you start, you have to register your own application on the Twitter website and generate an access token and secret. The consumer key, consumer secret, access token and access token secret have to be copied into src/main/resources/application.conf.

Now you can build the project and start the web server in the sbt console:

$ sbt
> re-start

Point your browser to http://localhost:8080. Initially, there should be a list of 5 status messages; if you wait a while, new status messages should appear at the top of the list. GitHub accounts are polled every 5 minutes, whereas tweets should appear almost immediately. The output looks similar to this:

2017-01-18 09:03:55 - Google on GitHub - rmad17 starred repository google/leveldb

2017-01-18 09:03:41 - ASF on GitHub - yucang52555 forked repository apache/james-project

2017-01-18 09:04:34 - Google on Twitter - @Google @gmail whenever I try to send an attachment with a text from gmail it only sends the attachment not the text plz help#google #gmail

2017-01-18 09:03:38 - ASF on GitHub - ariestse9 starred repository apache/commons-beanutils

2017-01-18 09:03:31 - ASF on GitHub - asfbot commented on issue 2333 in repository apache/kafka

2017-01-18 09:03:29 - Google on Twitter - RT @Google: Want to know who got the W? Google Search can tell you the score. https://t.co/3gv9NGNv7h https://t.co/PbsTbnq3sN

2017-01-18 09:02:37 - ASF on GitHub - polarisagit starred repository apache/kafka

2017-01-18 09:02:29 - ASF on GitHub - asfbot commented on issue 2333 in repository apache/kafka

Architecture

The example project uses Akka HTTP as the server technology. Akka HTTP lets you use Akka streams directly when processing HTTP requests. We use its WebSocket support to deliver a continuous stream of status messages, emitting each social network status message as a WebSocket frame. On the client side, a simple JavaScript program connects to the WebSocket endpoint and renders the incoming messages on the HTML page.

The social network feed

We use a dedicated Scala object called ExampleFeed to declare our social network feed:

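import java.time.ZoneId
import java.time.format.DateTimeFormatter
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
// Feed, GithubFeed, TwitterFeed and StatusUpdate come from the Akka Social Stream library

object ExampleFeed {

  import WebServer.system // reuse the actor system defined in WebServer
  private implicit val executionContext: ExecutionContext = system.dispatcher

  // Formats timestamps as in the sample output, e.g. "2017-01-18 09:03:55"
  private val dateFormatter = DateTimeFormatter.
    ofPattern("yyyy-MM-dd HH:mm:ss").
    withZone(ZoneId.systemDefault())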

The statusToMessage method converts a social network status update to a simple WebSocket text message. The status itself is wrapped in a Try, so it is either a Success or a Failure; in the latter case an error message is emitted instead.

  private def statusToMessage: (StatusUpdate[String]) => Message = {
    case (network, date, tryStatus) =>
      val msg = tryStatus match {
        case Success(status) => status.html.toString
        case Failure(e) => s"Error: ${e.getMessage}"
      }
      TextMessage(s"${dateFormatter.format(date)} - $network - $msg")
  }

We declare a feed which aggregates the Twitter and GitHub feeds of Google and the Apache Software Foundation. The argument 5 in the second parameter list sets the number of past status messages that are emitted when a stream is started.

  private val feed = Feed(
    "Google on GitHub" -> new GithubFeed("google"),
    "Google on Twitter" -> new TwitterFeed("Google"),
    "ASF on GitHub" -> new GithubFeed("apache"),
    "ASF on Twitter" -> new TwitterFeed("TheASF")
  )(5)

From the feed we generate an Akka stream of status messages, consisting of the past 5 messages and all future messages. Each WebSocket connection subscribes to the feed using the Feed.subscribe method. Each status message is converted to a WebSocket text message.

  // Evaluated once per WebSocket connection; each stream starts with the past 5 statuses
  def feedSource =
    feed.subscribe.map(statusToMessage)

}
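The replay behavior (the last 5 messages first, then every new one) can be modeled without Akka at all. The class below is a hypothetical, plain-Scala illustration of that behavior within a single process; it is not how Akka Social Stream implements Feed.subscribe, and the names ReplayFeed, publish and subscribe are assumptions chosen for the example.

```scala
import scala.collection.immutable.Queue

// Hypothetical model of "replay the last n updates, then continue live".
// Not the library's implementation.
final class ReplayFeed[A](capacity: Int) {
  require(capacity >= 0, "capacity must be non-negative")

  private var history = Queue.empty[A]
  private var subscribers = List.empty[A => Unit]

  // Records an incoming update, dropping the oldest one once the buffer
  // exceeds `capacity`, and forwards the update to all current subscribers.
  def publish(update: A): Unit = synchronized {
    history = history.enqueue(update)
    if (history.size > capacity) history = history.dequeue._2
    subscribers.foreach(_(update))
  }

  // Hands the buffered history to the new subscriber first, then registers
  // it so that it also receives all future updates.
  def subscribe(onUpdate: A => Unit): Unit = synchronized {
    history.foreach(onUpdate)
    subscribers = onUpdate :: subscribers
  }
}
```

With a capacity of 2, a subscriber that joins after updates 1 to 4 first sees 3 and 4, followed by every later update.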

WebSocket endpoint

In the route configuration of our web server, we declare an endpoint for the URL path /feed. The extractUpgradeToWebSocket directive rejects requests that are not valid WebSocket upgrade requests and otherwise extracts the UpgradeToWebSocket object, which we can use to attach data streams to the inbound and outbound sides of the WebSocket.

The WebSocket handler is represented by a Flow which consumes the messages sent by the client and emits the messages sent to the client. The handleMessagesWithSinkSource method lets us specify the two sides separately: a Sink for the inbound messages and a Source for the outbound messages. Since we don't want to process any inbound messages, we can use Sink.ignore as the inbound processing stage.

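import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

object WebServer {

  // Shared with ExampleFeed, which imports WebServer.system
  implicit val system: ActorSystem = ActorSystem("social-feed")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  def main(args: Array[String]): Unit = {

    val route =
      // ... (other routes omitted)
      path("feed") {
        get {
          // Rejects requests that are not WebSocket upgrade requests
          extractUpgradeToWebSocket { upgrade =>
            // Discard inbound frames; send one frame per status update
            complete(upgrade.handleMessagesWithSinkSource(
              Sink.ignore, ExampleFeed.feedSource))
          }
        }
      }

    // Listen on port 8080, as used in "Getting started"
    Http().bindAndHandle(route, "localhost", 8080)
  }

}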
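The two-sided handler can also be written out as an explicit Flow using Flow.fromSinkAndSource. As far as I know, handleMessagesWithSinkSource builds essentially this flow, so passing it to upgrade.handleMessages should behave the same way. The sketch below assumes Akka 2.5-era APIs; WebSocketFlowSketch, outbound and simulate are hypothetical names, and outbound stands in for ExampleFeed.feedSource.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object WebSocketFlowSketch {

  // Hypothetical outbound messages standing in for ExampleFeed.feedSource.
  val outbound: Source[Message, Any] =
    Source(List("status 1", "status 2")).map(s => TextMessage(s))

  // Inbound messages go to Sink.ignore; outbound messages come from `outbound`.
  // The two sides are independent of each other.
  val handler: Flow[Message, Message, Any] =
    Flow.fromSinkAndSource[Message, Message](Sink.ignore, outbound)

  // Pushes some client messages through the handler and collects what the
  // "server" sends back, showing that the inbound side does not affect it.
  def simulate(clientMessages: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("ws-flow-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val sent = Source(clientMessages)
        .map(s => TextMessage(s): Message)
        .via(handler)
        .collect { case TextMessage.Strict(text) => text }
        .runWith(Sink.seq)
      Await.result(sent, 5.seconds)
    } finally system.terminate()
  }
}
```

In the route, upgrade.handleMessages(WebSocketFlowSketch.handler) would take the place of the handleMessagesWithSinkSource call; the explicit Flow is mainly useful when the inbound messages should eventually influence the outbound stream.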

Client

We implement a simple JavaScript program to render the status updates in the HTML page. The script opens a WebSocket connection to the path /feed.

  • The onopen event handler removes the "Loading …" indicator.
  • The onmessage event handler prepends a <p> element to the #feed container element for each incoming message. The status message contains HTML markup, which becomes the content of the <p> element.
  • The onclose event handler is invoked when the WebSocket connection is closed; it attempts to reconnect after 1 second.

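$(function() {

  // Use wss:// when the page itself is served over HTTPS
  var protocol = location.protocol === "https:" ? "wss://" : "ws://";
  var url = protocol + location.host + "/feed";
  var $loading = $("#loading");
  var $container = $("#feed");

  function setupFeed() {
    var socket = new WebSocket(url);

    // Connection established: remove the "Loading …" indicator
    socket.onopen = function(event) {
      $loading.remove();
    };

    // Each frame carries one formatted status message (HTML markup)
    socket.onmessage = function(event) {
      var p = $("<p>").html(event.data);
      $container.prepend(p);
    };

    // Connection lost: try to reconnect after 1 second
    socket.onclose = function() {
      setTimeout(setupFeed, 1000);
    };

  }

  setupFeed();

});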

Next steps

Now that we have established the basic application infrastructure, various improvements come to mind. Apart from the obvious optimization of the look and feel, you might want to consider the following:

  1. One limitation of the streaming approach, particularly when periodic polling is used, is that the messages are not necessarily emitted in chronological order. When a message is processed, it is not possible to know whether another message with an earlier creation date (typically from a different social network) will arrive later on. Therefore it is recommended to sort the status messages by date on the client side.

  2. The Akka Social Stream library uses Twitter's streaming API to generate the stream of tweets. Currently it does not handle connection failures, so the application using the library has to take care of this. The PHP-based Phirehose project, a library specifically designed to consume the Twitter streaming API, could serve as an inspiration. Pull requests for this issue (or for any other issue, for that matter) are very welcome.

  3. For the BeCompany website, we decided to use Scala.js to implement the client-side application. One major advantage is that the status message model code can be shared between server and client, which keeps both sides compatible through refactorings and redesigns. I highly recommend considering Scala.js for your client-side development.
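Regarding the first item above: the sample output in "Getting started" shows the effect, since the tweet from 09:04:34 is listed below GitHub events from 09:03:55 and 09:03:41. A simple remedy is to insert each incoming message at its chronological position instead of always at the top. The sketch below is written in Scala, so it could also run in a Scala.js client; the Status case class and the NewestFirst object are hypothetical, simplified stand-ins for the real status model.

```scala
import java.time.Instant

// Hypothetical, simplified status model (illustration only).
final case class Status(date: Instant, network: String, text: String)

object NewestFirst {

  // Inserts `update` into a feed that is kept sorted newest-first. Entries at
  // least as new as the update stay in front of it; older ones follow it, so a
  // late-arriving older message lands at its chronological position.
  def insert(feed: List[Status], update: Status): List[Status] = {
    val (newer, older) = feed.span(s => !s.date.isBefore(update.date))
    newer ++ (update :: older)
  }
}
```

In the JavaScript client, the equivalent would be to insert the new <p> element before the first element with an older timestamp rather than always calling prepend.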
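Regarding the second item: a common way to handle dropped streaming connections is to reconnect with a capped exponential backoff, so that repeated failures do not hammer the API. The following plain-Scala sketch shows only the retry policy; Backoff, delays and retry are hypothetical names, connect stands in for whatever opens the Twitter stream, and the delays are illustrative rather than values documented by Twitter. Within Akka Streams, newer Akka versions also provide RestartSource.withBackoff for restarting a failed source.

```scala
import scala.concurrent.duration._

object Backoff {

  // Capped exponential backoff: initial, 2 * initial, 4 * initial, ...,
  // never exceeding `max`.
  def delays(initial: FiniteDuration, max: FiniteDuration): Iterator[FiniteDuration] =
    Iterator.iterate(initial)(d => if (d * 2 > max) max else d * 2)

  // Calls `connect` until it returns Right or `maxAttempts` is reached,
  // waiting between attempts. `sleep` is a parameter so tests can skip the
  // real wait; a production client would schedule retries asynchronously.
  def retry[A](maxAttempts: Int, initial: FiniteDuration, max: FiniteDuration,
               sleep: FiniteDuration => Unit = d => Thread.sleep(d.toMillis))
              (connect: () => Either[String, A]): Either[String, A] = {
    require(maxAttempts >= 1, "at least one attempt is required")
    val waits = delays(initial, max)
    var result = connect()
    var attempt = 1
    while (result.isLeft && attempt < maxAttempts) {
      sleep(waits.next())
      result = connect()
      attempt += 1
    }
    result
  }
}
```

Capping the delay keeps the worst-case reconnection time bounded while still spacing out attempts during a longer outage.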

Conclusion

Thank you for reading, and good luck with setting up your own social media feed. Please feel free to leave your remarks in the comments section of this page. If you find any problems with the software, I would appreciate it if you could file an issue in the Akka Social Stream issue tracker. Thanks a lot in advance!

If you are designing, implementing or extending your own application and are interested in professional support, please don't hesitate to contact me.

  1. Akka Social Stream library on GitHub
  2. Akka streams documentation
  3. Akka HTTP documentation