Streaming data, consisting of indefinitely and possibly time-evolving sequences, are becoming ubiquitous in many branches of science (Chu et al. 2007; Michalak et al. 2012). The omnipresence of streaming data poses new challenges for statistics and machine learning. To enable user-friendly development and evaluation of algorithms dealing with data streams, this paper introduces RStorm. Streaming learning algorithms can informally be described as algorithms which never “look back” to earlier data: each datapoint is processed only once, when it arrives, and all historic information is summarized in a limited set of parameters.
Computer scientists have recently developed a series of software packages for the processing of streaming data in production environments. Frameworks such as S4 by Yahoo! (Gopalakrishna et al. 2013) and Twitter’s Storm (Storm User Group 2013) provide a scalable and reliable infrastructure for real-time streaming computation of event-driven data (e.g., Babcock et al. 2002; Anagnostopoulos et al. 2012).
Recently, efforts have been made to facilitate easy testing and development of streaming processes within R, for example with the package stream. stream allows users of R to set up (or simulate) a data stream and specify data stream tasks to analyze the stream (Hahsler et al. 2014). While stream allows for the development and testing of streaming analysis in R, it does not have a strong link to current production environments in which streams can be utilized. Implementations of data streams in R analogous to production environments such as Twitter’s Storm are currently lacking. RStorm models the topology structure introduced by Storm, so that algorithms developed and tested in R can afterwards be implemented in such production environments.
In this section RStorm is introduced using the canonical streaming example often used to introduce Storm: a streaming word count. RStorm adopts the basic terminology and concepts from Storm: a stream is described by a topology, which consists of a spout (the data source) and one or more bolts (functions that process the data).
The topology is a description of the whole streaming process, and a solution to the word-count problem is given by the simple topology that is graphically presented in Figure 1. This topology describes that sentences (tuples) are emitted by the spout. These tuples – containing a full sentence – are analyzed by the first processing bolt, SplitSentence(tuple), which splits a sentence up into individual words and emits these single words as tuples. Next, these individual words are counted by the CountWords(tuple) bolt. The topology depicted in Figure 1 contains the core elements needed to understand the functioning of RStorm for a general streaming process.
A topology consists of a description of the ordering of spouts and bolts
in a stream. Tuples are the main data format to pass information between
bolts. A call to Emit(tuple, ...)
within a bolt will make the emitted
tuple available for other bolts. Table 1 summarizes the most
important functions of the RStorm package to facilitate a stream and
briefly explains their functionality.
| Function | Description |
|---|---|
| `Bolt(FUNC, listen = 0, ...)` | Used to create a new bolt. A bolt consists of an R function and a specification of the bolt / spout from which it receives tuples. The `listen` argument is used to indicate the order of bolts. |
| `Emit(x, ...)` | Used to emit tuples from inside a bolt. |
| `RStorm(topology, ...)` | Used to run a stream once a full topology has been specified. |
| `GetHash(name, ...)` | Used to retrieve, inside a bolt, values from a hashmap. |
| `SetHash(name, data)` | Used to store, inside a bolt, values in a hashmap. |
| `Topology(spout, ...)` | Used to create a topology by specifying the data source (a `data.frame`) as the first spout. |
| `AddBolt(topology, bolt, ...)` | Used to add a bolt to a stream. Once a bolt is added it receives an ID, which can be used in subsequent specifications of bolts (`listen = ID`) to determine the order of the stream. |
| `Tuple(x, ...)` | A single-row `data.frame`, used as the primary data format to be passed along the stream. |

Table 1: The most important functions of the RStorm package.
In RStorm the emulation of a streaming word count can be set up as follows. First, one loads RStorm and opens a data file containing multiple sentences:

```r
library(RStorm) # Include package RStorm
data(sentences) # Load the example sentences data
```
The data, which is a data.frame, will function as the spout: the stream is emulated by emitting the data row-by-row. After defining the spout, the functional bolts need to be specified. Table 2 presents both the RStorm and the Storm implementation of the first processing bolt. The Storm implementation is done partly in Java and partly in Python. For the RStorm implementation the full functional code is provided, while for the Storm implementation a number of details are omitted. However, it is easy to see how an actual Storm implementation maps to an implementation in RStorm.
Table 2: The SplitSentence() bolt implemented in RStorm (left) and in Storm (right).
RStorm:

```r
# R function that receives a tuple
# (a sentence in this case)
# and splits it into words:
SplitSentence <- function(tuple, ...) {
  # Split the sentence into words:
  words <- unlist(strsplit(as.character(tuple$sentence), " "))
  # For each word, emit a tuple:
  for (word in words)
    Emit(Tuple(data.frame(word = word)), ...)
}
```
Java:

```java
/* A Java function which makes a call from the
 * topology to an external Python script: */
public SplitSentence() {
  super("Python", "splitsentence.py");
}
```

```python
# The Python script (splitsentence.py):
import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tuple):
        words = tuple.values[0].split(" ")
        for word in words:
            storm.emit([word])
```
In both cases the SplitSentence() function receives tuples, each of which contains a sentence. Each sentence is split into words, which are emitted further down the stream using the Emit() (or storm.emit()) function. The emitted words are subsequently received by the CountWords() bolt, for which the RStorm code and the analogous Java code are presented in Table 3.
Table 3: The word-counting bolt implemented in RStorm (left) and in Storm (right).
RStorm:

```r
# R word counting function:
CountWord <- function(tuple, ...) {
  # Get the hashmap "wordcount":
  words <- GetHash("wordcount")
  if (tuple$word %in% words$word) {
    # Increment the word count:
    words[words$word == tuple$word, ]$count <-
      words[words$word == tuple$word, ]$count + 1
  } else {
    # If the word does not exist yet, add it with count 1:
    words <- rbind(words,
                   data.frame(word = tuple$word, count = 1))
  }
  # Store the hashmap:
  SetHash("wordcount", words)
}
```
Java:

```java
/* A Java function which stores the word count. */
public void execute(Tuple tuple, ...) {
  /* collect word from the tuple */
  String word = tuple.getString(0);
  /* get counts from hashmap */
  Integer count = counts.get(word);
  if (count == null) count = 0;
  /* increment counts */
  count++;
  /* store counts */
  counts.put(word, count);
}
```
The CountWords() bolt receives tuples containing individual words. The RStorm implementation first uses the GetHash() function to get the entries of a hashmap / local store called "wordcount". In production systems this often is a hashmap or, if need be, some kind of database system. In RStorm this functionality is implemented using GetHash and SetHash as methods to easily store and retrieve objects. If the hashmap exists, the function subsequently checks whether the word is already in the hashmap; if so, its count is incremented. If the word is not found, the new word is added to the hashmap with a count of 1.
Table 4: Specification of the word-count topology in RStorm (left) and in Storm (right).
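The RStorm side of this specification can be sketched as follows, using the Topology(), Bolt(), and AddBolt() functions from Table 1 (a minimal sketch; the exact code in the original table may differ slightly):

```r
# Sketch: word-count topology in RStorm.
# The spout (ID 0) is the sentences data; each AddBolt() call
# assigns the next ID, which later bolts reference via listen.
topology <- Topology(sentences)
topology <- AddBolt(topology, Bolt(SplitSentence, listen = 0)) # bolt ID 1
topology <- AddBolt(topology, Bolt(CountWord, listen = 1))     # bolt ID 2
```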
After specifying the two bolts, the topology itself needs to be specified. The topology determines the processing order of the streaming process; Table 4 presents how this is implemented in RStorm and Java. The listen argument is used to specify which emitted tuples a bolt should receive. Once the topology is fully specified, the stream can be run using the following call:
```r
# Run the stream:
result <- RStorm(topology)
# Obtain the results stored in "wordcount":
counts <- GetHash("wordcount", result)
```
The function GetHash()
is overloaded for when the stream has finished
and the function is used outside of a Bolt
. It can be used to retrieve
a hashmap once the result of a streaming process is passed to it as a
second argument. The returned counts
object is a data.frame
containing columns of words and their associated counts and can be used
to create a table of word counts.
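For example, the most frequent words can be listed as follows (a sketch; the column names follow those set in the CountWord bolt above):

```r
# Order the word counts and show the most frequent words:
head(counts[order(counts$count, decreasing = TRUE), ])
```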
The word count example shows the direct analogy between the implementation of a data stream in RStorm and in Storm. However, by focusing on an implementation that is analogous to the Storm implementation, a number of desirable R-specific properties are lost. For example, the use of for (word in words) {...} in the word count example defies R’s efficient vectorisation, and thus is relatively slow. In R one would approach the (non-streaming) word count problem differently, e.g., table(unlist(strsplit(as.character(sentences$sentence), " "))). The latter is much faster since it exploits R’s vectorised operations, but an implementation of a data stream based on this code is not at all evident. Further note that while RStorm is modeled specifically after Storm, many other emergent streaming production packages – such as Yahoo!’s S4 – have a comparable structure. In all cases, the machinery to set up the stream can be separated from a number of functional pieces of code that update a set of parameters. These functional blocks of code are implemented in the RStorm bolts and can, after development in R, easily be implemented in production environments.
The following section shows a number of streaming examples and demonstrates some of RStorm’s additional features.
This first example compares two bolts for the streaming computation of a
sample variance. It introduces the TrackRow(data)
functionality
implemented in RStorm which can be used to monitor the progress of
parameters at each time point in the stream. Table 5
shows two bolts with competing implementations of streaming variance
algorithms. The first bolt uses the standard Sum of Squares algorithm,
while the second uses Welford’s method (Welford 1962).
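For reference, the two estimators can be written as follows (standard textbook forms, not taken verbatim from the article): the first bolt tracks the count $n$, the sum $S_n = \sum_{i=1}^{n} x_i$, and the sum of squares $SS_n = \sum_{i=1}^{n} x_i^2$, while Welford’s method recursively tracks the running mean $M_n$ and the sum of squared deviations $W_n$:

$$
\hat{\sigma}^2_{SS} = \frac{n \, SS_n - S_n^2}{n(n-1)}, \qquad
M_n = M_{n-1} + \frac{x_n - M_{n-1}}{n}, \quad
W_n = W_{n-1} + (x_n - M_{n-1})(x_n - M_n), \quad
\hat{\sigma}^2_{W} = \frac{W_n}{n-1}.
$$

Both compute the same sample variance, but the sum-of-squares form subtracts two nearly equal large numbers and can lose numerical precision when the mean is large relative to the variance – exactly the situation simulated below, where the mean is $10^8$ and the variance is $1$. Welford’s method avoids this cancellation.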
Table 5: Two competing implementations of a streaming variance algorithm: the Sum of Squares method (left) and Welford’s method (right).
Sum of Squares method:

```r
var.SS <- function(x, ...) {
  # Get values stored in the hashmap:
  params <- GetHash("params1")
  if (!is.data.frame(params)) {
    # If no hashmap exists yet, initialise:
    params <- list()
    params$n <- params$sum <- params$sum2 <- 0
  }
  # Perform the updates:
  n <- params$n + 1
  S <- params$sum + as.numeric(x[1])
  SS <- params$sum2 + as.numeric(x[1]^2)
  # Store the hashmap:
  SetHash("params1", data.frame(n = n, sum = S, sum2 = SS))
  # Track the variance at time t:
  var <- 1 / (n * (n - 1)) * (n * SS - S^2)
  TrackRow("var.SS", data.frame(var = var))
}
```
Welford’s method:

```r
var.Welford <- function(x, ...) {
  x <- as.numeric(x[1])
  params <- GetHash("params2")
  if (!is.data.frame(params)) {
    params <- list()
    params$M <- params$S <- params$n <- 0
  }
  n <- params$n + 1
  # Welford's recursive update of the running mean;
  # the divisor is n, the updated count:
  M <- params$M + (x - params$M) / n
  S <- params$S + (x - params$M) * (x - M)
  SetHash("params2", data.frame(n = n, M = M, S = S))
  var <- ifelse(n > 1, S / (n - 1), 0)
  TrackRow("var.Welford", data.frame(var = var))
}
```
After specifying the functional bolts, the topology can be specified. Creating a topology object starts with the specification of a data.frame. This dataframe will be iterated through row-by-row to emulate a stream.

```r
t <- 1000
x <- rnorm(t, 10^8, 1) # Gaussian stream with a large mean
topology <- Topology(data.frame(x = x))
```
The spout defined in the object topology now contains a dataframe with a single column x, which contains the 1000 simulated datapoints. Next, both functional bolts are added, each listening directly to the spout, and the stream is run:

```r
topology <- AddBolt(topology, Bolt(var.SS, listen = 0))
topology <- AddBolt(topology, Bolt(var.Welford, listen = 0))
result <- RStorm(topology)
```
The TrackRow() function called within both functional bolts stores the value of the variance at each time point in the stream, allowing the two estimates to be inspected over time. Using (e.g.) GetTrack("var.SS", result) on the result object after running the topology allows for the creation of Figure 2.
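For instance, the tracked values can be retrieved and plotted as follows (a sketch of how such a figure could be created; the plotting details are assumptions, and the var column follows the TrackRow() calls above):

```r
# Retrieve the tracked variance estimates and plot them over time:
ss <- GetTrack("var.SS", result)
welford <- GetTrack("var.Welford", result)
plot(welford$var, type = "l", xlab = "t", ylab = "variance estimate")
lines(ss$var, lty = 2)
legend("topright", c("Welford", "Sum of Squares"), lty = 1:2)
```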
This example provides an implementation in RStorm of a logistic regression using stochastic gradient descent [SGD; e.g., Zinkevich et al. (2010)], together with a Double or Nothing [DoNB; Owen and Eckles (2012)] bootstrap to estimate the uncertainty of the parameters. The functional bolt first performs the sampling needed for the DoNB bootstrap and subsequently computes the SGD update of the weight vector:
```r
StochasticGradientDescent <- function(tuple, learn = .5, boltID, ...) {
  if (rbinom(1, 1, .5) == 1) { # Only add the observation half of the time
    # Get the stored weights for this bolt:
    weights <- GetHash(paste("Weights_", boltID, sep = ""))
    if (!is.data.frame(weights)) {
      weights <- data.frame(beta = c(-1, 2)) # starting values
    }
    w <- weights$beta          # get weight vector w
    y <- as.double(tuple[1])   # get scalar y
    X <- as.double(tuple[2:3]) # get feature vector X
    # Gradient of the logistic log-loss for this observation;
    # as.numeric() collapses the 1 x 1 matrix to a scalar:
    grad <- as.numeric(1 / (1 + exp(-t(w) %*% X)) - y) * X
    SetHash(paste("Weights_", boltID, sep = ""),
            data.frame(beta = w - learn * grad)) # save updated weights
  } # otherwise ignore the observation
}
```
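In vector notation, the update implemented by this bolt is the standard SGD step for logistic regression with learning rate $\lambda$ (the learn argument):

$$
w_{t+1} = w_t - \lambda \left( \frac{1}{1 + e^{-w_t^\top x_t}} - y_t \right) x_t,
$$

where $1 / (1 + e^{-w^\top x})$ is the predicted probability that $y_t = 1$.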
The dataset for this example contains n = 1000 observations simulated from a logistic model with true weights beta = c(1, 2):

```r
n <- 1000
X <- matrix(c(rep(1, n), rnorm(n, 0, 1)), ncol = 2)
beta <- c(1, 2)
y <- rbinom(n, 1, plogis(X %*% beta))
# Combine into the object used as the spout below
# (the name "data" is assumed from the topology code):
data <- data.frame(y, X)
```
The DoNB is implemented by specifying within the functional bolt whether or not a datapoint in the stream should contribute to the update of the weights. Using the boltID parameter, the same functional bolt can be used multiple times in the stream, each with its own local store. The topology is specified as follows:

```r
topology <- Topology(data.frame(data), .verbose = FALSE)
for (i in 1:100) {
  topology <- AddBolt(topology, Bolt(StochasticGradientDescent,
                                     listen = 0, boltID = i),
                      .verbose = FALSE)
}
```
This topology is represented graphically in Figure 3. After running the topology, the GetHashList() function can be used to retrieve all of the objects saved using SetHash() at once. This object is a list containing all the dataframes that were stored during the stream. It can be used to derive the bootstrap estimates of the weights and compare them to the estimates obtained by glm().
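A minimal sketch of such a comparison, assuming GetHashList() accepts the result object and that each stored dataframe has the beta column set in the bolt above:

```r
# Run the stream and collect the 100 stored weight vectors:
result <- RStorm(topology)
weights <- GetHashList(result)               # list of stored data.frames
betas <- sapply(weights, function(w) w$beta) # 2 x 100 matrix of estimates
rowMeans(betas)                              # DoNB-SGD point estimates
apply(betas, 1, sd)                          # bootstrap standard errors
# Reference fit on the full dataset:
coef(glm(y ~ X[, 2], family = binomial))
```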
The last example presents a situation in which streaming data naturally arise: bandit problems (e.g., Whittle 1980). In the canonical bandit problem, the two-armed Bernoulli bandit problem, the data stream consists of the rewards observed after playing, at each time point, one of the two arms. RStorm can be used to compare competing solutions to the bandit problem. The following function generates a dataset of counterfactual rewards for each of k arms:
```r
createCounterFactuals <- function(k = 2, t = 100, p.max = .5, epsilon = .1) {
  # One arm with success probability p.max, k - 1 arms with p.max - epsilon:
  p <- c(p.max, rep(p.max - epsilon, k - 1))
  # t rows of counterfactual Bernoulli rewards, one column per arm:
  data.frame(matrix(rbinom(t * k, 1, p), ncol = k, byrow = TRUE))
}
```
This function creates a dataframe of counterfactual rewards in which one arm has success probability p.max and the other arms have success probability p.max - epsilon. Here we compare playing the best action (optimal play – typically unknown) to a policy called Thompson sampling (Thompson 1933; Scott 2010). Each datapoint in the stream contains the counterfactual rewards of all k arms at that time point.
For optimal play, the first bolt (selectMax) emits the reward observed by playing the arm with the highest payoff probability, and the second bolt (countMax) computes the cumulative reward of this optimal play. For Thompson sampling, the first bolt (selectRPM) determines which arm to play given a set of estimates of the success of each arm and emits the observed reward. The second bolt (updateRPM) updates the estimated success of the arm that was played (using a simple beta-Bernoulli model), and the last bolt (countRPM) computes the cumulative reward of the Thompson sampling policy.
Table: The bolts implementing optimal play (left) and Thompson sampling (right).
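The code for these bolts is not reproduced here; the following is a minimal sketch of what the three Thompson sampling bolts could look like. The Beta(1, 1) priors, the hashmap name "rpm", and all other implementation details are assumptions, while the name "rpmSum" matches the hashmap retrieved after the simulations below:

```r
# Sketch of the Thompson sampling bolts (implementation details assumed).
selectRPM <- function(tuple, k = 5, ...) {
  params <- GetHash("rpm")
  if (!is.data.frame(params)) {
    # Beta(1, 1) prior for each arm:
    params <- data.frame(a = rep(1, k), b = rep(1, k))
    SetHash("rpm", params)
  }
  # Thompson sampling: draw from each arm's Beta posterior, play the max:
  arm <- which.max(rbeta(k, params$a, params$b))
  Emit(Tuple(data.frame(arm = arm, reward = as.numeric(tuple[arm]))), ...)
}

updateRPM <- function(tuple, ...) {
  params <- GetHash("rpm")
  # Beta-Bernoulli update of the arm that was played:
  params$a[tuple$arm] <- params$a[tuple$arm] + tuple$reward
  params$b[tuple$arm] <- params$b[tuple$arm] + (1 - tuple$reward)
  SetHash("rpm", params)
}

countRPM <- function(tuple, ...) {
  # Accumulate the observed rewards in the "rpmSum" hashmap:
  total <- GetHash("rpmSum")
  if (!is.data.frame(total)) total <- data.frame(sum = 0)
  SetHash("rpmSum", data.frame(sum = total$sum + tuple$reward))
}
```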
The topology is graphically presented in Figure 4. It is initially specified using an empty dataset to enable the setup of multiple simulations:
```r
topology <- Topology(data.frame())
topology <- AddBolt(topology, Bolt(selectMax, listen = 0))
topology <- AddBolt(topology, Bolt(countMax, listen = 1))
topology <- AddBolt(topology, Bolt(selectRPM, listen = 0))
topology <- AddBolt(topology, Bolt(updateRPM, listen = 3))
topology <- AddBolt(topology, Bolt(countRPM, listen = 3))
```
After specifying the bolts, the ChangeSpout() function is used to run the same topology with a different datasource. At each simulation run the spout is changed, and the regret – the difference in cumulative reward between optimal play and Thompson sampling – is computed:
```r
sims <- 100
regret <- rep(NA, sims)
for (i in 1:sims) {
  obs <- createCounterFactuals(k = 5, t = 10000, p.max = .5)
  topology <- ChangeSpout(topology, obs)
  result <- RStorm(topology)
  regret[i] <- GetHash("maxSum", result)$sum - GetHash("rpmSum", result)$sum
}
```
After running the 100 simulations with p.max = .5, five arms, and 10000 datapoints each, the regret vector contains, for each run, the difference in cumulative reward between optimal play and Thompson sampling.
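These simulation results can then be summarized with standard R tools, for example:

```r
# Summarize the regret of Thompson sampling over the simulation runs:
summary(regret)
hist(regret, xlab = "Regret of Thompson sampling", main = "")
```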
Datasets in all areas of science are growing increasingly large, and they are often collected continuously. There is a need for novel analysis methods which synchronize current methodological advances with the emerging opportunities of streaming data. Streaming algorithms provide opportunities to deal with extremely large and ever growing data sets in (near) real time. However, the development of streaming algorithms for complex models is often cumbersome: the software packages that facilitate streaming processing in production environments do not provide statisticians with the simulation, estimation, and plotting tools they are used to. RStorm implements a streaming architecture modeled on Storm for easy development and testing of streaming algorithms in R.
In the future we intend to further develop the RStorm package to include (a) default implementations of frequently occurring bolts (such as streaming means and variances of variables), and (b) the ability to use, one-to-one, the bolts developed in RStorm in Storm. Storm provides the ability to write bolts in languages other than Java (for example Python, as demonstrated in the word count example). We hope to further develop RStorm such that true data streams in Storm can use functional bolts developed in R. RStorm is not designed as a scalable tool for the production processing of data streams, and we do not believe that this is R’s core strength. However, by providing the ability to test and develop functional bolts in R, and to use these bolts directly in production streaming processing applications, RStorm aims to help users of R quickly implement scalable and fault-tolerant streaming applications.