Integrating Facebook with Akka Social Stream

by Javier Puerto
Tags: Open Source, Akka, Scala

In previous posts, we explained what the Akka Social Stream project is and how to use it. Now we are going to extend it to support Facebook page events.

Facebook feeds with Akka Social Stream

What do we want to achieve?

At BeCompany, we publish content on several social networks, such as Twitter, Facebook, and GitHub. In addition to Twitter tweets and GitHub events, we want Akka Social Stream to support Facebook page events.

What should we do?

We need to implement a social provider that consumes the Facebook page events and transforms them into our internal status event format, which keeps the integration with the rest of the project smooth. First, we look at how the Facebook API works. We only want the text of the BeCompany page events. According to the documentation, the service responds with a list of events, so we need to transform these JSON events into a list of Status objects.

Implementing the Facebook support

Creating the client

To read the page events with the Facebook Graph API v2.8, we need to make a request to https://graph.facebook.com/v2.8/{page-id}/events.

  • The page-id parameter can be easily obtained from the page source itself (search for "pageId").

We made a request and got an error 400:

{
  "error": {
    "message": "An access token is required to request this resource.",
    "type": "OAuthException",
    "code": 104,
    "fbtrace_id": "AciaqWt36ti"
  }
}

We need an application token to perform the request. First we create a new Facebook application in the developers center, or reuse an existing one. We already have support for OAuth, but Facebook offers an alternative that avoids the OAuth flow: an application token built by concatenating the application ID and secret, separated by a pipe character. We end up with a URL like:

{baseUrl}/v2.8/{PageId}/events?limit={EventsLimit}&access_token={AppID}|{AppSecret}

Implementing the client should be easy:

object FacebookClient extends HttpClient with FacebookJsonSupport with CachingSupport with LazyLogging {

  // Access details for Facebook, read from the application configuration file.
  private val accessToken = ConfigFactory.load.getConfig("scalaSocialFeed.facebook")

  private val baseUrl = Uri("https://graph.facebook.com")
  private val graphVersion = "v2.8"

  private implicit val handler = new UnmarshallingHttpHandler[List[(Instant, Status)]]()

  def posts(pageId: String)(implicit ec: ExecutionContext): Future[List[(Instant, Status)]] = {
    logger.debug("Requesting latest updates from Facebook page. [pageId={}]", pageId)
    val header = scala.collection.immutable.Seq(headers.`Accept`(MediaTypes.`application/json`))
    req[List[(Instant, Status)]](HttpRequest(uri = getPageFeedUri(pageId), headers = header))
      .recover {
        // On any failure, log the error and fall back to an empty list.
        case e =>
          logger.error("Error building the Facebook status.", e)
          List.empty
      }
  }
}

Here we define a new method, posts, that accepts the pageId as a parameter. The access details should be included in the application configuration file. The client defines an implicit UnmarshallingHttpHandler to produce the desired list of status messages; if an error occurs, it is logged and an empty list is returned instead. The implementation of getPageFeedUri(pageId) has been left out as an exercise for the reader.
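As a starting point for that exercise, here is a minimal sketch of how the URL shown earlier could be assembled. It uses only the standard library and returns a plain String rather than an Akka Uri. The object name FacebookUriSketch, the method pageEventsUrl and its parameters are hypothetical, not part of Akka Social Stream, and percent-encoding the pipe character is our own choice.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Hypothetical helper, not the project's getPageFeedUri implementation.
object FacebookUriSketch {
  // Base URL and Graph API version as used in the article.
  private val baseUrl = "https://graph.facebook.com"
  private val graphVersion = "v2.8"

  /** Builds the page events URL with an application token of the form appId|appSecret. */
  def pageEventsUrl(pageId: String, appId: String, appSecret: String, limit: Int): String = {
    // Percent-encode the token so the pipe separator is a valid query value.
    val token = URLEncoder.encode(s"$appId|$appSecret", StandardCharsets.UTF_8.name())
    s"$baseUrl/$graphVersion/$pageId/events?limit=$limit&access_token=$token"
  }
}
```

Calling FacebookUriSketch.pageEventsUrl("42", "123", "abc", 5) produces a URL ending in access_token=123%7Cabc. In the real client the application ID and secret would come from the configuration file rather than from parameters.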

Unmarshalling the response into objects

For this task we need to know how JSON support works in Akka and what the incoming events look like. We only need the message text and the publication date, and that information is structured as follows:

{
  data: [
    {message: "interesting text", created_time: ... }
  ],
  paging: {}
}

Our FacebookJsonSupport trait then defines an implicit object that extends RootJsonReader, which means that we can only unmarshal (read) JSON, not write it.

trait FacebookJsonSupport extends SprayJsonSupport with DefaultJsonProtocol {

  implicit object FacebookJsonFormat extends RootJsonReader[List[(Instant, Status)]] {
    override def read(json: JsValue): List[(Instant, Status)] =
      json.asJsObject.getFields("data") match {
        // "data" must be present and hold an array of posts.
        case Seq(JsArray(posts)) =>
          posts.flatMap { post =>
            post.asJsObject.getFields("message", "created_time") match {
              case Seq(JsString(message), JsString(createdTime)) =>
                // Drop the offset suffix (everything from '+') and parse the rest as UTC.
                Some(
                  (Instant.parse(createdTime.split("\\+")(0) + "Z"),
                    Status(User("Unknown", "Unknown"), span(message)))
                )
              // Skip entries that lack a message or a creation time.
              case _ => None
            }
          }.toList
        case _ => throw new DeserializationException("Invalid JSON.")
      }
  }

}
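The created_time handling above cuts the string at the '+' sign and treats the remainder as UTC, which only gives the right instant when the offset is zero. As a hedged alternative, the sketch below parses the offset with java.time instead. It assumes the timestamp arrives in a form like 2017-01-25T00:11:02+0000; the object name CreatedTimeSketch is hypothetical.

```scala
import java.time.{Instant, OffsetDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper, assuming timestamps such as 2017-01-25T00:11:02+0000.
object CreatedTimeSketch {
  // 'Z' in the pattern matches an offset without a colon, for example +0000 or -0500.
  private val format = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ssZ")

  /** Parses the timestamp, applying its offset, and converts it to an Instant. */
  def parse(createdTime: String): Instant =
    OffsetDateTime.parse(createdTime, format).toInstant
}
```

With this in place, the reader could call CreatedTimeSketch.parse(createdTime) instead of splitting the string, and timestamps with non-zero or negative offsets would still map to the correct instant.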

I found the JSON reading code a bit cumbersome; perhaps one of our readers knows a better way to transform the Facebook events into a list of Statuses. Please leave your comments!

Implementing FacebookFeed

Akka Social Stream currently supports polling feeds out of the box. For this task we mix in the PollingSocialFeed trait and implement the method that obtains a list of events, along with the minimum polling interval, which depends on the social provider.

class FacebookFeed(pageId: String, userUpdateInterval: FiniteDuration = 1.minute)(implicit ec: ExecutionContext)
  extends PollingSocialFeed
{

  // Never poll more often than the provider minimum, even if the caller asks for a shorter interval.
  private val updateIntervalMin: FiniteDuration = 1.minute
  override val updateInterval = updateIntervalMin.max(userUpdateInterval)

  /**
    * Returns the latest `num` social media status messages.
    *
    * @param num The number of previous status messages to prepend to the stream.
    * @return A list of status messages.
    */
  override def latest(num: Int): Future[List[(Instant, Status)]] = FacebookClient.posts(pageId)

}
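To make the interval handling concrete, here is a small standalone sketch of the same clamping logic. The object PollingIntervalSketch and the method effectiveInterval are hypothetical names used only for illustration; the one-minute minimum mirrors the value in FacebookFeed.

```scala
import scala.concurrent.duration._

// Hypothetical illustration of how the effective polling interval is chosen.
object PollingIntervalSketch {
  // Provider minimum, matching the one-minute value used by FacebookFeed.
  val minimumInterval: FiniteDuration = 1.minute

  /** Returns the requested interval, raised to the provider minimum when it is shorter. */
  def effectiveInterval(requested: FiniteDuration): FiniteDuration =
    minimumInterval.max(requested)
}
```

A request for 10 seconds is raised to one minute, while a request for 5 minutes is kept as it is, so users can slow polling down but never speed it up beyond the provider limit.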

Conclusions

You can start using the Facebook support by upgrading Akka Social Stream to the latest stable version. You can see a demonstration on the BeCompany homepage.