\documentclass[11pt]{article}

\setlength{\oddsidemargin}{0.0in}
\setlength{\evensidemargin}{0.0in}
\setlength{\topmargin}{-0.25in}
\setlength{\headheight}{0in}
\setlength{\headsep}{0in}
\setlength{\textwidth}{6.5in}
\setlength{\textheight}{9.25in}
\setlength{\parindent}{0in}
\setlength{\parskip}{2mm} 

% \usepackage{times}
\usepackage{graphicx}

% library(knitr)
%\VignetteIndexEntry{Partools}

\title{{\bf partools: a Sensible Package for Large Data Sets}}

\author{Norm Matloff \\
University of California, Davis} 

\begin{document}

\maketitle

The {\bf partools} package is a general framework for parallel
processing of large data sets and/or highly-computational algorithms.
It is the ``un-MapReduce,'' avoiding that paradigm while retaining the
philosophy of highly-distributed memory objects and files of Hadoop and
Spark.

\section{Motivation}

With the advent of Big Data, the Hadoop framework became ubiquitous in
the years following its birth in 2006.  Yet it was clear from the start
that Hadoop had major shortcomings in performance, and eventually these
came under serious discussion\footnote{See for example ``The Hadoop
Honeymoon is Over,'' {\it
https://www.linkedin.com/pulse/hadoop-honeymoon-over-martyn-jones}} This
resulted in a new platform, Spark, gaining popularity.  As with Hadoop,
there is an R interface available for Spark, named SparkR. 

Spark overcomes one of Hadoop's major problems, which is the lack of
ability to cache data in a multi-pass computation.  However, Spark
unfortunately retains the drawbacks of Hadoop:

\begin{itemize}

\item Spark still relies on a MapReduce paradigm, featuring a {\it
shuffle} (systemwide sort) operation after each pass of the data.  This
can be quite slow.\footnote{See ``Low-Rank Matrix Factorizations
at Scale: Spark for Scientific Data Analytics,'' Alex Gittens, MMDS 2016
in the C context, and ``Size of Datasets for Analytics and Implications
for R,'' Szilard Pafka, useR! 2016, in the R context.}

\item Thus SparkR can be considerably slower than Plain Old R (POR).

\item Hadoop/Spark have a complex, rather opaque infrastructure, and
rely on Java/Scala.  This makes them difficult to install, configure and
use for those who are not computer systems experts. 

\item Although a major plus for Hadoop/Spark is fault tolerance, it is
needed only for users working on extremely large clusters, consisting of
hundreds or thousands of nodes. Disk failure rates are simply too low
for fault tolerance to be an issue for many users who think they need
Hadoop/Spark but who do not have such large
systems.\footnote{https://wiki.apache.org/hadoop/PoweredBy} 

\end{itemize}

The one firm advantage of Hadoop/Spark is their use of distributed file
systems.  Under the philosophy, ``Move the computation to the data,
rather than {\it vice versa},'' network traffic may be greatly reduced,
thus speeding up computation.  

In addition, this approach helps deal with the fact that Big Data sets
may not fit into the memory of a single machine.  (This aspect is too
often overlooked in discussions of parallel/distributed systems.)

\subsection{An Alternative to Hadoop/Spark}

Therefore:

\begin{quote} 

It is desirable to have a package that retains the distributed-file
nature of Hadoop/Spark  while staying fully within the simple, familiar,
yet powerful POR framework, no Java or other external language needed.  

The {\bf partools} package is designed to meet these goals.  It is
intended as {\bf a simple, sensible POR alternative to Hadoop/Spark}.
Though not necessarily appropriate for all settings, for many R
programmers, {\bf partools} may be a much better choice than {\bf
Hadoop/Spark}.  

\end{quote}

The package does not provide fault tolerance of its own.  If this is an
issue, one can provide it externally, say with the XtreemFS system.

\subsection{Software Alchemy}

Consider a large linear regression application.  We might consider
dividing the data into chunks, calling {\bf lm()} on each chunk, then
simply average the estimated coeficient vector over chunks.  It can be
shown that under fairly general conditions, this works, in the sense of
producing estimators with the same asymptotic variance as that of the
original estimator.  

This technique was developed independently by a number of researchers.
I call it Software Alchemy (SA), as it turns non-embarassingly parallel
problems into embarassingly parallel ones.\footnote{ Software Alchemy:
Turning Complex Statistical Computations into Embarrassingly-Parallel
Ones, Norman Matloff, {\it Journal of Statistical Software}, 71 (2016).}

The {\bf partools} package contains a number of functions using SA.

\section{Where Is the Magic?}

As you will see later, {\bf partools} can deliver some impressive
speedups.  But there is nothing magical about this.  Instead, the value
of the package stems from just two simple sources:

\begin{itemize}

\item [(a)] The package follows a Keep It Distributed philosophy:  Form
distributed objects {\it and keep using them in distributed form
throughout one's R session, accessing them repeatedly for one's various
desired operations --- and avoiding, to the extent possible, operations
that collect data from the worker nodes to the manager node.}  By
keeping things distributed in this way, whatever cost there was to
distributing the data originally eventually yields a net performance
gain.

\item [(b)] The package consists of a number of utility functions that
greatly facilitate creating and storing and distributed
objects, both in memory and on disk, and distributed applications such
as for regression and classification.

\end{itemize}

\section{Overview of the partools Package}

The package is based on the following very simple principles, involving
{\it distributed files} and {\it distributed data frames/matrices}.
We'll refer to nondistributed files and data frames/matrices as {\it
monolithic}.

\begin{itemize}

\item Files are stored in a distributed manner, in files with a common
basename. For example, the virtual file {\bf x} is stored as separate
files {\bf x.01}, {\bf x.02} etc. 

\item Data frames and matrices are stored in memory at the nodes in a
distributed manner, with a common name.  For example, the data frame
{\bf y} is stored in chunks at the cluster nodes, each chunk known as {\bf
y} at its node.

\end{itemize}

\subsection{Package Structure}

% To understand the function structure, and indeed the central philosophy,
% keep in mind the following definitions, for a cluster in the R {\bf
% parallel} package.
% 
% \begin{itemize}
% 
% \item A {\it distributed file} {\bf x} consists of a number of separate
% physical files, with names {\bf x.01}, {\bf x.02} and so
% on.\footnote{The number of leading 0s in the suffix will depend on the
% number of cluster nodes.}  The file {\bf x} consists of the {\bf
% rbind()} of the rows of those separate files, though {\bf x} may be
% virtual.  Cluster node {\bf i} is responsible for {\bf x.i}.
% 
% \item A {\it distributed data frame} (or distributed matrix) {\bf d}
% consists of a number of separate data frames, one at each cluster node,
% all named {\bf d}.  The data frame as a whole consists of the {\bf
% rbind()} of the rows of those separate data frames, though it may be
% virtual.
% 
% \end{itemize}

Again, in a distributed file, all the file chunks have the same prefix,
and in a distributed data frame, all chunks have the same name at the
various cluster nodes.  This plays a key role in the software.

The package consists of three main groups of functions:

\subsubsection{Distributed-file and distributed-data frame functions}

   \begin{itemize}

   \item {\bf filesplit():}  Create a distributed file from monolithic one.

   \item {\bf filesplitrand():}  Create a distributed file from monotlithic one,
   but randomize the record order.

   \item {\bf filecat():}  Create a monotlithic file from distributed one.

   \item {\bf fileread():}  Read a distributed file into distributed data
   frame.

   \item {\bf readnscramble():}  Read a distributed file into distributed
   data frame, 
   but randomize the record order.

   \item {\bf filesave():}  Write a distributed data frame to a distributed
   file.

   \item {\bf filechunkname():} Returns the full name of the file chunk,
   associated with the calling cluster node, including suffix, e.g.
   '01', '02' etc.

   \item {\bf distribsplit():}  Create a distributed data frame/matrix
   from monotlithic one.

   \item {\bf distribcat():}  Create a monotlithic data frame/matrix from
   distributed one.

   \end{itemize}

\subsubsection{Tabulative functions}

   \begin{itemize}

   \item {\bf distribagg():}  Distributed form of R's {\bf aggregate()}.

   \item {\bf distribcounts():}  Wrapper for {\bf distribagg()} to obtain
   table cell counts.

   \item {\bf dfileagg():}  Like {\bf distribagg()}, but file-based
   rather than in-memory, in order to handle files that are too big to
   fit in memory, even on a distributed basis.

   \item {\bf distribgetrows()}: Applies an R {\bf subset()} or similar
   filtering operation to the distributed object, and collects the
   resulting rows into a single object at the caller.

   \item {\bf distribrange():}  Distributed form of R's {\bf range()}.

   \end{itemize}

\subsection{Classical Computational Functions}

   \begin{itemize}

   \item {\bf ca():} General SA algorithm.

   \item {\bf cabase():} Core of {\bf ca()}.

   \item {\bf caagg():} SA analog of {\bf distribagg()}.

   \item {\bf cameans()}:  Finds means in the specified columns.

   \item {\bf caquantile():}  Wrapper for SA version of R's {\bf
   quantile()}.

   \item {\bf calm():}  Wrapper for SA version of R's {\bf lm()}.

   \item {\bf caglm():}  Wrapper for SA version of R's {\bf glm()}.

   \item {\bf cakm():}  Wrapper for SA version of R's {\bf kmeans()}.

   \item {\bf caprcomp():}  Wrapper for SA version of R's {\bf prcomp()}.

   \end{itemize}

\subsection{Modern Statistical/Machine Learning Functions}

   \begin{itemize}

   \item {\bf caclassfit():} Wrapper to do distributed fitting of a
   multiclass classification method such as random forests ({\bf rpart} 
   package) or SVM ({\bf e1071} package).

   \item {\bf caclasspred} Does prediction on new data from the output
   of {\bf caclassfit()}.

   \end{itemize}

\subsection{Support Functions}

   \begin{itemize}

   \item {\bf formrowchunks():} Form chunks of rows of a data
   frame/matrix.

   \item {\bf matrixtolist():} Forms a list of the rows or columns of a
   data frame or matrix.

   \item {\bf addlists():}  ``Add'' two lists, meaning add values of
   elements of the same name, and copy the others.

   \item {\bf dbs(), dbsmsg(), etc.:}  Debugging aids.

   \end{itemize}

\subsection{More on Software Alchemy}

All the functions with names beginning with ``ca'' use the Software
Alchemy (SA) method.  The idea is simple:  Apply the given estimator to
each chunk in the distributed object, and average over chunks.  It is
proven that the resulting distributed estimator has the same statistical
accuracy --- the same asymptotic variance --- as the original serial
one.\footnote{In the world of parallel computation, the standard word
for nonparallel is {\it serial}.}

A variant, suitable in many regression and classification algorithms,
retains the chunk output in the result, rather than performing an
avereging process.  Consider for instance the use of LASSO for
regression.\footnote{Not currently available in the package, but easily
coded under its framework.} Each chunk may settle on a different subset
of the predictor variables, so it would not make sense to average the
estimated coefficient vectors.  Instead, the predictor subsets and
corresponding coefficients are retained in the SA output.  When one is
faced with predicting the response variable for a new data point, a
prediction is calculated from each chunk, and those predictions are
averaged to obtain the final prediction for the new point.  In a
classification context, voting may be used.

Note that SA requires that the data be i.i.d.  If the data was stored
in some sorted order --- in the flight data below, it was sorted by date
--- it needs to be randomize it first, using one of the functions provided
by {\bf partools} for this purpose.

\section{Sample Session}

Our data set, from
{\it http://stat-computing.org/dataexpo/2009/the-data.html} consists of
the well-known records of airline flight delay.
For convenience, we'll just use the data for 2008, which consists of
about 7 million records.  This is large enough to illustrate speedup
due to parallelism, but small enough that we won't have to wait really
long amounts of time in our sample session here.  

The session was run on a 16-core machine, with a 16-node R virtual
cluster in the sense of the R {\bf parallel} package (which is loaded by
{\bf partools}).  Note carefully, though, that we should not expect a
16-fold speedup. In the world of parallel computation, one usually gets
of speedups of considerably less than $n$ for a platform of $n$
computational entities, in this case with $n = 16$.  Indeed, one is
often saddened to find that the parallel version is actually {\it
slower} than the serial one!

To create our cluster {\bf cls}, we ran

\begin{verbatim}
> cls <- makeCluster(16)  # from 'parallel' library
> setclsinfo(cls)  # from 'partools'
\end{verbatim}

\subsection{Distribute the Data}

The file, {\bf yr2008}, was first split into a distributed file, stored
in {\bf yr2008r.01},...,{\bf yr2008r.16}, using {\bf filesplitrand()}, and
then read into memory at the 16 cluster nodes using {\bf fileread()}:

\begin{verbatim}
> filesplitrand(cls,'yr2008','yr2008r',2,header=TRUE,sep=",")
> fileread(cls,'yr2008r','yr2008',2,header=TRUE, sep=",")
\end{verbatim}

The call to {\bf filesplitrand()} splits the file as described above;
since these files are permanent, we can skip this step in future R
sessions involving this data (if the data doesn't change).  The function
{\bf filesplitrand()} was used instead of {\bf filesplit()} to construct
the distributed file, in order to randomize the placement of the records
of {\bf yr2008} across cluster nodes.  As noted earlier, random
arrangement of the rows is required for SA.

The argument 2 here means that the suffixes are 2 digits, specifically
'01', '02' and so on.  So, we create files {\bf yr2008r.01} etc.\ using
{\bf filesplit()}.  The call to {\bf fileread()} specifies that cluster
node 1 will read the file {\bf yr2008r.01}, cluster node 2 will read
{\bf yr2008r.02} and so on.  Each node will place the data it reads into
a data frame {\bf yr2008}.

\subsection{Distributed Aggregation of Summary Statistics}

In order to run timing comparisons, the full file was also read into
memory at the cluster manager:

\begin{verbatim}
> yr2008 <- read.csv("yr2008")
\end{verbatim}

(In practice, though, this would not be done, i.e.\ we would not have
{\it both} distributed {\it and} monotlithic versions of the data at the
same time.)

The first operation run involved the package's distributed version of
R's {\bf aggregate()}.  Here we wanted to tabulate departure delay,
arrival delay and flight time, broken down into cells according to
flight origin and destination.  We'll find the maximum value in each
cell.

\begin{verbatim}
> system.time(print(distribagg(cls, c("DepDelay","ArrDelay","AirTime"),
   c("Origin","Dest"),"yr2008", FUN="max")))  
...
5193    CDV  YAK      327      325      54
5194    JNU  YAK      317      308      77
5195    SLC  YKM      110      118     115
5196    IPL  YUM      162      163      26
...
   user  system elapsed  
  2.291   0.084  15.952 
\end{verbatim}

What {\bf distribagg()} did here was have each cluster node run {\bf
aggregate()} on its own chunk of the data, then (pardon the pun)
aggregate the return values from the nodes.

The serial version was much slower.

\begin{verbatim}
> system.time(print(aggregate(cbind(DepDelay,ArrDelay,AirTime) ~ 
   Origin+Dest,data=yr2008,FUN=max)))  
...
5193    CDV  YAK      327      325      54
5194    JNU  YAK      317      308      77
5195    SLC  YKM      110      118     115
5196    IPL  YUM      162      163      26
...
   user  system elapsed  
249.038   0.444 249.634
\end{verbatim}

So, the results of {\bf distribagg()} did indeed match those of {\bf
aggregate()}, but did so more than 15 times faster!

\subsection{Un-distributing Data}

Remember, the Keep It Distributed  philosophy of {\bf partools} is to
create distributed objects and then keep using them repeatedly in
distributed form.  However, in some cases, we may wish to collect a
distributed result into a monolithic object, especially if the result is
small.  This is done in the next example:

Say we wish to do a filter operation, extracting the data on all the
Sunday evening flights, and collect it into one place.   Here is the
direct version:

\begin{verbatim}
> sundayeve <- with(yr2008,yr2008[DayOfWeek==1 & DepTime > 1800,])
\end{verbatim}

This actually is not a time-consuming operation, but again, in typical
{\bf partools} use, we would only have the distributed version of {\bf
yr2008}.  Here is how we would achieve the same effect from the
distributed object:

\begin{verbatim}
> sundayeved <- 
   distribgetrows(cls,'with(yr2008,yr2008[DayOfWeek==1 & DepTime > 1800,])')
\end{verbatim}

What {\bf distribgetrows()} does is produce a data frame at each cluster
node, per the user's instructions, then combine them together at the
caller via R's {\bf rbind()}.  The user sets the second argument, a
quoted string, to whatever she would have done on a serial basis.  A
simple concept, yet quite versatile.

\subsection{Distributed NA Processing}

As another example, say we are investigating data completeness.  We may
wish to flag all records having an inordinate number of NA values.  As a
first step, we may wish to add a column to our data frame, indicating
how many NA values there are in each row.  If we did not have the
advantage of distributed computation, here is how long it would take for
our flight delay data:

\begin{verbatim}
> sumna <- function(x) sum(is.na(x))
> system.time(yr2008$n1 <- apply(yr2008[,c(5,7,8,11:16,19:21)],1,sumna))
   user  system elapsed
268.463   0.773 269.542
\end{verbatim}

But it is of course much faster on a distributed basis, using the {\bf
parallel} package function {\bf clusterEvalQ()}:

\begin{verbatim}
> clusterExport(cls,"sumna",envir=environment())
> system.time(clusterEvalQ(cls,yr2008$n1 <- apply(yr2008[,c(5,7,8,11:16,19:21)],1,sumna)))
   user  system elapsed 
  0.094   0.012  16.758 
\end{verbatim}

The speedup here was about 16, fully utilizing all 16 cores.

Ordinarily, we would continue that NA analysis on a distributed basis,
in accord with the {\bf partools} Keep It Distributed philosophy of
setting up distributed objects and then repeatedly dealing with them on
a distributed basis.  If our subsequent operations continue to have time
complexity linear in the number of records processes, we should continue
to get speedups of about 16.

On the other hand, we may wish to gather together all the records have 8
or more NA values.  In the nonparallel context, it would take some time:

\begin{verbatim}
> system.time(na8 <- yr2008[yr2008$n1 > 7,])
   user  system elapsed 
  9.292   0.028   9.327 
\end{verbatim}

In the distributed manner, it is slightly faster:

\begin{verbatim}
> system.time(na8d <- distribgetrows(cls,'yr2008[yr2008$n1 > 7,]'))
   user  system elapsed 
  5.524   0.160   6.584 
\end{verbatim}

The speedup is less here, as the resulting data must travel from the
cluster nodes to the cluster manager. In our case here, this is just a
memory-to-memory transfer rather than across a network, as we are on a
multicore machine, but it still takes time.  If the number of records
satisfying the filtering condition had been smaller than the 136246 we
had here, the speedup factor would have been greater.

\subsection{SA: Linear Regression}

Now let's turn to statistical operations, starting of course with linear
regression.  As noted, some {\bf partools} functions make use of
Software Alchemy, which replaces the given operation by a {\it
distributed, statistically equivalent} operation.  This will often
produce a significant speedup.  Note again that though the result may
different from the non-distributed version, say in the third significant
digit, it is just as accurate statistically; neither estimate is
``better'' than the other.  

The SA function names begin with 'ca', for ``chunk averaging.''  The SA
version of {\bf lm()}, for instance, is {\bf calm()}.

In the flight data, we predicted the arrival delay from the departure
delay and distance, comparing the distributed and serial versions, 

\begin{verbatim}
> system.time(print(lm(ArrDelay ~ DepDelay+Distance,data=yr2008)))  
...     
Coefficients:
(Intercept)     DepDelay     Distance  
  -1.061369     1.019154    -0.001213  

   user  system elapsed 
 77.107  12.463  76.225 
> system.time(print(calm(cls,'ArrDelay ~ DepDelay+Distance,data=yr2008')$tht))  
 (Intercept)     DepDelay     Distance 
-1.061262941  1.019150592 -0.001213252 
   user  system elapsed 
 13.414   0.691  18.396 
\end{verbatim}

Note again the quoted-string argument.  This is the one the user will
give to {\bf lm()} in the serial case.

Linear regression is very hard to parallelize, so the speedup factor of
more than 4 here is nice.  Coefficient estimates were virtually
identical.

\subsection{SA: Principal ComponentsRegression}

Next, principal components.  Since R's {\bf prcomp()} does not handle NA
values for nonformula specifications, let's do that separately first:

\begin{verbatim}
> system.time(cc <- na.omit(yr2008[,c(12:16,19:21)]))
   user  system elapsed 
  9.540   0.351   9.907 
> system.time(clusterEvalQ(cls,cc <- na.omit(yr2008[,c(12:16,19:21)])))
   user  system elapsed 
  0.885   0.232   2.352 
\end{verbatim}

Note that this too was faster in the distributed approach, though both
times were small.  And now the PCA runs:

\begin{verbatim}
> system.time(ccout <- prcomp(cc))
   user  system elapsed 
 61.905  49.605  58.444 
> ccout$sdev
[1] 5.752546e+02 5.155227e+01 2.383117e+01 1.279210e+01 9.492825e+00
[6] 5.530152e+00 1.133015e-03 6.626621e-12
> system.time(ccoutdistr <- caprcomp(cls,'cc',8))
   user  system elapsed 
  5.023   0.604   8.949 
> ccoutdistr$sdev 
[1] 5.752554e+02 5.155127e+01 2.383122e+01 1.279184e+01 9.492570e+00
[6] 5.529869e+00 9.933142e-04 8.679427e-13
\end{verbatim}

Once again, the second argument of {\bf caprcomp()} is a quoted string,
in which the user specifies the desired arguments to {\bf prcomp()}.
Since those arguments may be complicated, the code cannot deduce the
number of variables, and thus needs to be specified in the third
argument.

We have more than a 6-fold speedup here.  Agreement of the component
standard deviations is good.\footnote{Note that the last component,
i.e.\ the eighth one, is minuscule, statistically  0.  Again, SA gives
results that are statistically equivalent to the serial ones.}

% THIS PART NEEDS TO BE REDONE

% Next, let's find the {\it interquartile range} for several columns.
% This is a robust measure of dispersion, defined at the difference
% between the 75$^{th}$ and 25$^{th}$ percentiles. 
% Here is serial code to find this:
% 
% \begin{verbatim}
% # find the interquartile range for a vector x; quantile function's
% # default is to find the seq(0, 1, 0.25) quantiles
% iqr <- function(x) {tmp <- quantile(x,na.rm=TRUE); tmp[4] - tmp[2]}
% # find the interquartile range for each column of a data frame dfr
% iqrm <- function(dfr) apply(dfr,2,iqr)
% \end{verbatim}
% 
% So, let's compare times.  First, the serial version:
% 
% \begin{verbatim}
% > system.time(print(iqrm(yr2008[,c(5:8,12:16,19:21)])))
%           DepTime        CRSDepTime           ArrTime        CRSArrTime 
%               800               790               802               792 
% ActualElapsedTime    CRSElapsedTime           AirTime          ArrDelay 
%                80                79                77                22 
%          DepDelay          Distance            TaxiIn           TaxiOut 
%                12               629                 4                 9 
%    user  system elapsed 
%  29.280   0.243  29.554 
% \end{verbatim}
% 
% For the distributed version,
% 
% \begin{verbatim}
% > system.time(print(colMeans(distribgetrows(cls,'iqrm(yr2008[,c(5:8,12:16,19:21)])'))))
%           DepTime        CRSDepTime           ArrTime        CRSArrTime 
%          800.1250          790.0625          801.8125          791.8750 
% ActualElapsedTime    CRSElapsedTime           AirTime          ArrDelay 
%           80.0000           78.9375           76.5625           22.0000 
%          DepDelay          Distance            TaxiIn           TaxiOut 
%           12.0000          627.6875            4.0000            9.0000 
%    user  system elapsed 
%   0.009   0.002   2.587 
% \end{verbatim}
% 
% Here the speedup was more than 11-fold, with agreement generally to
% three significant digits.  Once again, note that statistically speaking,
% both estimators have the same accuracy.

\subsection{SA: K-Means Clustering}

The package also includes a distributed version of k-means clustering.
Here it is run on the flight delay data.  First, retain only the NA-free
rows for the variables of interest, then run:

\begin{verbatim}
> fileread(cls,'yr2008r','yr2008',2,header=TRUE, sep=",")
> invisible(clusterEvalQ(cls,y28 <- na.omit(yr2008[,c(5:8,13:16,19:21)])))
> system.time(koutpar <- cakm(cls,'y28',3,11))
   user  system elapsed
  4.083   0.132   9.293
\end{verbatim}

Compare to serial:

\begin{verbatim}
> yr2008 <- read.csv('y2008')
> y28 <- na.omit(yr2008[,c(5:8,13:16,19:21)])
> system.time(koutser <- kmeans(y28,3))
   user  system elapsed
 54.394   0.558  55.032
\end{verbatim}

So, the distributed version is about 6 times faster.  Results are
virtually identical:

\begin{verbatim}
> koutpar$centers
          [,1]      [,2]     [,3]     [,4]     [,5]
[1,] 1741.3967 1718.0296 1876.433 1895.435 110.4398
[2,]  932.4057  936.6907 1081.743 1082.813 108.5091
[3,] 1311.2193 1308.1838 1496.267 1525.790 267.8620
          [,6]      [,7]      [,8]      [,9]    [,10]
[1,]  85.25672 13.101844 14.826940  569.2534 6.742315
[2,]  84.66763  3.091913  4.517502  561.0567 6.785464
[3,] 238.93152  8.668587 11.698193 1886.8964 7.541735
        [,11]
[1,] 16.71571
[2,] 15.63046
[3,] 18.35916
> koutser$centers
    DepTime CRSDepTime  ArrTime CRSArrTime
1 1741.3888  1718.0217 1876.418   1895.425
2  932.3681   936.6674 1081.737   1082.809
3 1311.4436  1308.3525 1496.404   1525.905
  CRSElapsedTime   AirTime  ArrDelay  DepDelay
1       110.4363  85.25292 13.100842 14.826083
2       108.5151  84.67361  3.092439  4.518112
3       267.8669 238.93668  8.672439 11.701706
   Distance   TaxiIn  TaxiOut
1  569.2226 6.742079 16.71604
2  561.1148 6.785409 15.63038
3 1886.9094 7.542760 18.35823
\end{verbatim}

\subsection{SA: Multiclass Classification}

Here we apply random forests to the UC Irvine forest cover type data,
which has 590,000 records of 55 variables.  The last variable is the
cover type, coded 1 through 7.  The machine used was quad core with
hyperthreading level of 2, giving a theoretically possible parallelism 
degree of 8.  Thus a cluster of size 8 was tried.  

We used random forests for our classification algorithm (perhaps
appropriate, given the data here!), but could have used any algorithm
whose R implemention has a {\bf predict()} method.  (Some output not shown.)

\begin{verbatim}
> library(rpart)
> cls <- makeCluster(8)
> setclsinfo(cls)
> clusterEvalQ(cls,library(rpart))  # have each node load pkg
> cvr <- read.csv('~/Research/DataSets/ForestCover/CovTypeFull.csv',
     header=FALSE)
> cvr$V55 <- as.factor(cvr$V55)  # rpart requires factor for class
> trnidxs <- sample(1:580000,290000)  # form training and test sets
> trn <- cvr[trnidxs,]
> tst <- cvr[-trnidxs,]
> distribsplit(cls,'trn')
# do fit at each node; again, one argument is the serial argument
> system.time(fit <- caclassfit(cls,'rpart(V55 ~ .,data=trn,xval=25)'))
   user  system elapsed
  0.126   0.005  25.294
> system.time(predout <- caclasspred(fit,tst,55,type='class'))
   user  system elapsed
 26.132   0.910  27.045
> predout$acc
[1] 0.6692542
> system.time(fitser <- rpart(V55 ~ .,data=trn,xval=25))
   user  system elapsed
 87.699   0.143  87.850
> # system.time(predoutser <- predict(fitser,tst[,-10],type='class'))
> system.time(predoutser <- predict(fitser,tst[,-55],type='class'))
   user  system elapsed
  0.302   0.101   0.403
> mean(predoutser == tst[,55])
[1] 0.6691202
\end{verbatim}

What {\bf caclassfit()} does is run our classification algorithm, in
this case {\bf rpart}, at each node, then collect all the fitted models
and return them to the caller.  So here, after the call, {\bf fit} will
be an R list, element {\it i} of which is the fit computed by cluster
node {\it i}.

The call to {\bf caclasspred()} then applies these fits (on the parent
node, not the cluster nodes) to our test data, {\bf tst}.  The result,
{\bf predout} contains the predictions for each of the 149 cases in {\bf
tst}, using voting among the eight fits.

Here SA more than tripled the fitting speed, with the same accuracy.
Prediction using SA is relatively slow, due to the voting process, but
arguably this is not an issue in a production setting, and in any case,
SA was substantially faster even in total fitting plus prediction time.

\section{Dealing with Memory Limitations}

The discussion so far has had two implicit assumptions:

\begin{itemize}

\item The number of file chunks and the number of (R {\bf parallel})
cluster nodes are equal, and the latter is equal to the number of
physical computing devices one has, e.g. the number of cores in a
multicore machine or the number of network nodes in a physical clsuter.

\item Each file chunk fits into the memory\footnote{Say, physical memory
plus swap space.} of the corresponding cluster node.

\end{itemize}

The first assumption is not very important.  If for some reason we have
created a distributed file with more chunks than our number of physical
computing devices, we can still set up an R {\bf parallel} cluster with
size equal to the number of file chunks.  Then more than one R process
will run on at least some of the cluster nodes, albeit possibly at the
expense of, say, an increase in virtual memory swap operations.

The second assumption is the more pressing one.  For this reason, the
{\bf partools} package includes functions such as {\bf dfileagg()}.  The
latter acts similarly to {\bf distribagg()}, but with a key difference:
Any given cluster node will read from many chunks of the distributed
file, and will process those chunks one at a time, never exceeding
memory constraints.

Consider again our flight delay data set.  As a very simple example, say
we have a two-node physical cluster, and that each node has memory
enough for only 1/4 of the data.  So, we break up the original data file
to 4 pieces, {\bf yr2008.1} through {\bf yr2008.4}, and we run, say,

\begin{verbatim}
# say we have machines pc28 and pc29 available for computation
> cls <- makeCluster(c('pc28','pc29'))
> dfileagg(cls,c('yr2008.1','yr2008.2','yr2008.3','yr2008.4'),
     c("DepDelay","ArrDelay","AirTime"),
     c("Origin","Dest"),"yr2008", FUN="max")
\end{verbatim}

Our first cluster node will read {\bf yr2008.1} and {\bf yr2008.2}, one
at a time, while the second will read {\bf yr2008.3} and {\bf yr2008.4},
again one at a time, At each node, at any given time only 1/4 of the
data is in memory, so we don't exceed memory capacity.  But they will 
get us the right answer, and will do so in parallel, roughly with a
speedup factor of 2.

More functions like this will be added to {\bf partools}.

% \section{What About Other R Functions and Packages, say reshape2/tidyr?}
% 
% It is easy to apply the {\bf partools} idiom to many R functions that
% return data frame or matrix values.  Consider for instance the {\bf
% reshape2} function {\bf melt()}.  Say we already have a distributed data
% frame {\bf ddf} at the cluster nodes.  We have two choices here:
% 
% \begin{itemize}
% 
% \item In accord with the Keep It Distributed {\bf partools} philosophy,
% we might simply call {\bf melt()} at each node, say using {\bf
% clusterEvalQ()}, and keep the result distributed.
% 
% \item If say we need to gather together the results of a {\bf melt()}
% operation, we could call {\bf melt()} at each node but then call the
% {\bf partools} function {\bf distribgetrows()} to collect the molten
% data into one centralized data frame.  A {\bf cast()} operation would
% requires a bit more care, depending on whether the distributed object
% crosses subject ID boundaries.
% 
% \end{itemize}

\end{document}


