Aggregating REST and Real-Time Data Sources

by Brad Johnson, on May 31, 2019 7:57:00 AM

Aggregating data from heterogeneous REST APIs and streaming sources can be a pain. To deliver real-time insights or visualizations, developers need to combine REST and streaming data sources efficiently. But streaming data is created continuously, and storing high volumes of raw stream logs before processing requires significant storage and bandwidth. Furthermore, just getting data into a database doesn't help you build an application, and it certainly doesn't help if you're aiming for real-time performance. So what's the fastest way to integrate multiple heterogeneous data sources so that developers can aggregate and transform the combined data streams in real time?

Standardizing Data Streams Using Web Agents

The first step is to standardize heterogeneous data feeds into a unified format. One way to do this, which has the added benefit of implicit state management, is to use the open source Swim platform. Swim uses stateful Web Agents to provide a uniform way to handle data. Using Web Agents, streaming data and stateless REST API feeds are both converted into stateful WARP data streams. Web Agents can then subscribe to peers to create aggregations. These aggregator Web Agents can be treated by application developers as a single, new data source and consumed via a unique API.

Here’s an example of a Swim Web Agent:

// swim/basic/
package swim.basic;

import swim.api.agent.AbstractAgent;

public class UnitAgent extends AbstractAgent {
  private void logMessage(Object msg) {
    System.out.println(nodeUri() + ": " + msg);
  }
}
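To make the "stateful" part a little more concrete, here's a rough sketch in plain Python. It is not Swim code: the `StatefulFeed` class and its method names are hypothetical, invented only for illustration. It shows one way a stateful holder could accept both full REST snapshots and single streaming events, and notify subscribers only when a value actually changes.

```python
from typing import Any, Callable, Dict, List


class StatefulFeed:
    """Hypothetical stand-in for a stateful agent; not part of the Swim API."""

    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}
        self.listeners: List[Callable[[str, Any], None]] = []

    def subscribe(self, listener: Callable[[str, Any], None]) -> None:
        self.listeners.append(listener)

    def update(self, key: str, value: Any) -> bool:
        """Store value under key; notify listeners only if it changed."""
        if key in self.state and self.state[key] == value:
            return False
        self.state[key] = value
        for listener in self.listeners:
            listener(key, value)
        return True

    def ingest_rest_snapshot(self, snapshot: Dict[str, Any]) -> int:
        """Apply a full REST response; return how many keys changed."""
        return sum(self.update(k, v) for k, v in snapshot.items())

    def ingest_stream_event(self, key: str, value: Any) -> bool:
        """Apply a single streaming event."""
        return self.update(key, value)
```

Because the holder remembers the last value per key, polling the same REST response twice produces no second round of notifications, while a streaming event that changes a value is passed on immediately.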

Building Ingress Bridges from REST APIs or Data Streams

To get any kind of data to a Web Agent, you need to set up an ingress bridge. Ingress bridges can either push data into Swim or pull data from an external data source, such as a database. Below, we’ll cover building ingress bridges that push data into Swim, but you can learn about building bridges that pull from databases or other sources here.

Writes to a Swim server are most easily accomplished using a Swim client instance, but doing so requires one of the following:

  1. The data source itself is written in either Java or JavaScript (currently the only two languages that support Swim clients)
  2. The data source pushes messages, using any networking protocol of your choice, to a different Java/JavaScript process, which then uses a Swim client to relay data to the Swim server

Note that the second is simply the first with an intermediary process. Either way, the process that talks directly to the Swim server updates the server by either sending commands or writing to downlinks.

Here’s how to set up a push ingress bridge using Swim:

// swim/basic/
import swim.client.ClientRuntime;
import swim.structure.Value;

public class SwimWriter {
  // example usage (Text is swim.structure.Text):
  //   new SwimWriter()
  //      .generateOnce("warp://localhost:9001", "/unit/foo", "publish",
  //        Text.from("PushOption1"));
  // Note that this `SwimWriter` wrapper class is mostly just pedagogical; nothing
  // wrong with directly operating with `ClientRuntime` instances

  private final ClientRuntime swim;

  public SwimWriter() {
    this.swim = new ClientRuntime();
    this.swim.start();
  }

  // Sends a single command carrying `v` to the given lane of the given node
  public void generateOnce(String host, String node, String lane, Value v) {
    this.swim.command(host, node, lane, v);
  }
}

Building Ingress Bridges from Websockets

You can also create an ingress bridge using websockets. Because WARP is built on top of websockets, sending the right websocket messages in the right order, even without a proper Swim handle, can trigger actions on a Swim server. Downlinks are nearly impossible to instantiate in this manner, but sending commands is very simple (by design, because this is how we want non-Swim clients to write to Swim).

Commanding a lane without a Swim client requires just two steps:

  1. Open a websocket connection to the desired server's hostUri
  2. Write a string of the form @command(node:"%n",lane:"%l")%p through this connection, where %n is the desired nodeUri, %l is the laneUri, and %p is the payload

Consequently, this kind of ingress bridge can be written in any language that supports websockets. For example, here's what it looks like in Python:

# Prereq: install websocket-client

from websocket import create_connection

ws = create_connection('ws://localhost:9001')

# all parameters are strings
# (host is unused here; the connection opened above determines the server)
def generate_once(host, node, lane, v):
  # node and lane are quoted to match the @command(node:"%n",lane:"%l")%p form
  message = '@command(node:"{}",lane:"{}"){}'.format(node, lane, v)
  # equivalent old-school syntax:
  #   message = '@command(node:"%s",lane:"%s")%s' % (node, lane, v)
  ws.send(message)
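Since a REST API won't push anything on its own, a websocket bridge for a REST source typically has to poll and then push. Here's a minimal sketch of that idea, under a few assumptions: `build_command` and `poll_and_push` are hypothetical helper names, `fetch` and `send` are supplied by the caller, and the payload is simply `str(result)` unless you pass your own `to_payload` (what encoding a given lane expects is up to your application). Separating message construction from sending also makes the bridge testable without a running server.

```python
import time
from typing import Any, Callable


def build_command(node: str, lane: str, payload: str) -> str:
    """Return a string of the form @command(node:"%n",lane:"%l")%p."""
    for name, value in (("node", node), ("lane", lane)):
        if '"' in value:
            # Escaping rules are not covered here, so reject rather than guess.
            raise ValueError(f"{name} must not contain a double quote: {value!r}")
    return f'@command(node:"{node}",lane:"{lane}"){payload}'


def poll_and_push(
    fetch: Callable[[], Any],
    send: Callable[[str], None],
    node: str,
    lane: str,
    to_payload: Callable[[Any], str] = str,
    interval_s: float = 1.0,
    max_polls: int = 3,
) -> int:
    """Poll fetch() up to max_polls times; send a command when the result changes.

    Returns the number of commands sent.
    """
    sent = 0
    last = object()  # sentinel that never equals a real result
    for i in range(max_polls):
        result = fetch()
        if result != last:
            send(build_command(node, lane, to_payload(result)))
            sent += 1
            last = result
        if i < max_polls - 1:
            time.sleep(interval_s)
    return sent
```

With the websocket-client connection from the example above, `send` could simply be `ws.send`, and `fetch` could be any function that calls your REST endpoint and returns the parsed response.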

Creating Aggregations using Web Agents

Fundamentally, the way to create aggregations between Web Agents is by using downlinks. Using downlinks, you can create aggregator Web Agents which subscribe to multiple data sources. This method works regardless of whether the original source was a REST API, Kafka instance, Kubernetes pod, or data stream.

Here’s how to create a downlink between Web Agents:

[Screenshot: code creating a downlink between Web Agents]

Client-side, downlinks must be issued from a ClientRuntime, but the builder syntax is otherwise identical:

[Screenshot: code creating a downlink from a ClientRuntime]
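The aggregation pattern itself, one consumer subscribing to several sources and keeping a combined view, can be sketched independently of Swim. The following Python is only an in-process analogy: `Source`, `Aggregator`, and `link` are hypothetical names, not the Swim downlink API, and the "aggregation" is just a running total of each source's latest value.

```python
from typing import Callable, Dict, List


class Source:
    """Hypothetical data source that pushes each new value to its subscribers."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._subscribers: List[Callable[[str, float], None]] = []

    def subscribe(self, callback: Callable[[str, float], None]) -> None:
        self._subscribers.append(callback)

    def publish(self, value: float) -> None:
        for callback in self._subscribers:
            callback(self.name, value)


class Aggregator:
    """Keeps the latest value from every source it is linked to."""

    def __init__(self) -> None:
        self.latest: Dict[str, float] = {}

    def link(self, source: Source) -> None:
        # Loosely analogous to opening a downlink to a peer.
        source.subscribe(self._on_update)

    def _on_update(self, name: str, value: float) -> None:
        self.latest[name] = value

    def total(self) -> float:
        return sum(self.latest.values())
```

The aggregator doesn't care whether a given source was originally fed by a REST poller or a stream; it only sees updates, which is the property that lets an aggregator be treated as a single new data source.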

Learn More

Hopefully that was a useful introduction for creating real-time aggregations using the open source Swim platform. You can get started with Swim here or check us out on GitHub.
