| Version: | 0.2-2 | 
| Date: | 2020-01-12 | 
| Title: | Hadoop InteractiVE | 
| Description: | Hadoop InteractiVE facilitates distributed computing via the MapReduce paradigm through R and Hadoop. An easy to use interface to Hadoop, the Hadoop Distributed File System (HDFS), and Hadoop Streaming is provided. | 
| License: | GPL-3 | 
| Imports: | methods, rJava (≥ 0.9-3), tools, XML | 
| Depends: | R (≥ 2.9.0) | 
| Enhances: | HadoopStreaming | 
| SystemRequirements: | Apache Hadoop >= 2.7.2 (https://hadoop.apache.org/releases.html#Download); Obsolete: Hadoop core >= 0.19.1 and <= 1.0.3 or CDH3 (https://www.cloudera.com); standard unix tools (e.g., chmod) | 
| NeedsCompilation: | no | 
| Packaged: | 2020-01-12 16:31:57 UTC; theussl | 
| Author: | Ingo Feinerer [aut], Stefan Theussl [aut, cre] | 
| Maintainer: | Stefan Theussl <Stefan.Theussl@R-project.org> | 
| Repository: | CRAN | 
| Date/Publication: | 2020-01-12 17:10:02 UTC | 
Hadoop Distributed File System
Description
Functions providing high-level access to the Hadoop Distributed File System (HDFS).
Usage
DFS_cat( file, con = stdout(), henv = hive() )
DFS_delete( file, recursive = FALSE, henv = hive() )
DFS_dir_create( path, henv = hive() )
DFS_dir_exists( path, henv = hive() )
DFS_dir_remove( path, recursive = TRUE, henv = hive() )
DFS_file_exists( file, henv = hive() )
DFS_get_object( file, henv = hive() )
DFS_read_lines( file, n = -1L, henv = hive() )
DFS_rename( from, to, henv = hive() )
DFS_list( path = ".", henv = hive() )
DFS_tail( file, n = 6L, size = 1024L, henv = hive() )
DFS_put( files, path = ".", henv = hive() )
DFS_put_object( obj, file, henv = hive() )
DFS_write_lines( text, file, henv = hive() )
Arguments
| henv | An object containing the local Hadoop configuration. | 
| file | a character string representing a file on the DFS. | 
| files | a character string representing files located on the local file system to be copied to the DFS. | 
| n | an integer specifying the number of lines to read. | 
| obj | an R object to be serialized to/from the DFS. | 
| path | a character string representing a full path name in the DFS (without the leading hdfs:// prefix). | 
| recursive | logical. Should elements of the path other than the last be deleted recursively? | 
| size | an integer specifying the number of bytes to be read. Must be sufficiently large, otherwise fewer than n lines may be returned. | 
| text | a (vector of) character string(s) to be written to the DFS. | 
| con | A connection to be used for printing the output provided by DFS_cat; defaults to the standard output connection. | 
| from | a character string representing a file or directory on the DFS to be renamed. | 
| to | a character string representing the new filename on the DFS. | 
Details
The Hadoop Distributed File System (HDFS) is typically part of a Hadoop cluster or can be used as a stand-alone general purpose distributed file system (DFS). Several high-level functions provide easy access to distributed storage.
DFS_cat is useful for producing output in user-defined
functions. It reads from files on the DFS and typically prints the
output to the standard output. Its behaviour is similar to the base
function cat.
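For example, printing a text file stored on the HDFS to the standard output (a sketch assuming a running cluster; the path is illustrative):
## Not run: DFS_cat( "/tmp/test/hdfs.txt" )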
DFS_dir_create creates directories with the given path names if
they do not already exist. Its behaviour is similar to the base
function dir.create.
DFS_dir_exists and DFS_file_exists return a logical
vector indicating whether the directory or file, respectively, named by
its argument exists. See also function file.exists.
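A typical guard before writing results could look as follows (a sketch; the paths are illustrative and assume a running cluster):
## Not run: 
if( !DFS_dir_exists("/tmp/results") )
    DFS_dir_create( "/tmp/results" )
DFS_file_exists( "/tmp/results/part-00000" )
## End(Not run)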
DFS_dir_remove attempts to remove the directory named in its
argument and if recursive is set to TRUE also attempts
to remove subdirectories in a recursive manner.
DFS_list produces a character vector of the names of files
in the directory named by its argument.
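For instance, listing the contents of a directory and then removing it recursively (illustrative path):
## Not run: 
DFS_list( "/tmp/results" )
DFS_dir_remove( "/tmp/results", recursive = TRUE )
## End(Not run)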
DFS_read_lines is a reader for (plain text) files stored on the
DFS. It returns a vector of character strings representing lines in
the (text) file. If n is given as an argument it reads that
many lines from the given file. Its behaviour is similar to the base
function readLines.
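E.g., reading only the first two lines of a (possibly large) text file (illustrative path):
## Not run: DFS_read_lines( "/tmp/test/hdfs.txt", n = 2L )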
DFS_put copies files named by its argument to a given path in
the DFS.
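E.g., copying a locally created file into an HDFS directory (a sketch; the target directory is illustrative):
## Not run: 
local_file <- file.path( tempdir(), "example.txt" )
writeLines( "some local data", local_file )
DFS_put( local_file, "/tmp/input" )
## End(Not run)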
DFS_put_object serializes an R object to the DFS.
DFS_write_lines writes a given vector of character strings to a
file stored on the DFS. Its behaviour is similar to the base
function writeLines.
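A simple round trip, writing lines to a file on the DFS and reading them back (illustrative path):
## Not run: 
DFS_write_lines( c("line 1", "line 2"), "/tmp/test/lines.txt" )
DFS_read_lines( "/tmp/test/lines.txt" )
## End(Not run)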
Value
DFS_delete(), DFS_dir_create(), and DFS_dir_remove() return a logical value indicating whether the
operation succeeded for the given argument.
DFS_dir_exists() and DFS_file_exists() return TRUE if
the named directories or files exist in the HDFS.
DFS_get_object() returns the deserialized object stored in a
file on the HDFS.
DFS_list() returns a character vector representing the directory listing of the corresponding
path on the HDFS.
DFS_read_lines() returns a character vector of length the
number of lines read.
DFS_tail() returns a character vector of length the number of
lines to read until the end of a file on the HDFS.
Author(s)
Stefan Theussl
References
The Hadoop Distributed File System (https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html).
Examples
## Do we have access to the root directory of the DFS?
## Not run: DFS_dir_exists("/")
## Some self-explanatory DFS interaction
## Not run: 
DFS_list( "/" )
DFS_dir_create( "/tmp/test" )
DFS_write_lines( c("Hello HDFS", "Bye Bye HDFS"), "/tmp/test/hdfs.txt" )
DFS_list( "/tmp/test" )
DFS_read_lines( "/tmp/test/hdfs.txt" )
## End(Not run)
## Serialize an R object to the HDFS
## Not run: 
foo <- function()
"You got me serialized."
sro <- "/tmp/test/foo.sro"
DFS_put_object(foo, sro)
DFS_get_object( sro )()
## End(Not run)
## finally (recursively) remove the created directory
## Not run: DFS_dir_remove( "/tmp/test" )
Managing the Hadoop configuration
Description
Functions for showing/changing Hadoop configuration.
Usage
hive_get_parameter( x, henv = hive() )
hive_get_masters( henv = hive() )
hive_get_workers( henv = hive() )
hive_get_nreducer( henv = hive() )
hive_set_nreducer( n, henv = hive() )
Arguments
| henv | An object containing the local Hadoop configuration. | 
| x | A character string naming the parameter in the Hadoop configuration. | 
| n | An integer specifying the number of reducers to be used in hive_stream(). | 
Details
The function hive_get_parameter() is used to get parameters from
the Hadoop cluster configuration.
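E.g., querying the configured default file system (the parameter name is illustrative; depending on the Hadoop version it is fs.defaultFS or fs.default.name):
## Not run: hive_get_parameter( "fs.defaultFS" )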
The functions hive_get_workers() and hive_get_masters()
return the hostnames of the configured nodes in the cluster. 
The functions hive_get_nreducer() and hive_set_nreducer()
are used to get/set the number of reducers which are used in Hadoop
Streaming using hive_stream().
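For instance, doubling the number of reducers before the next streaming job (a sketch assuming an initialized cluster configuration):
## Not run: 
hive_set_nreducer( 2L * hive_get_nreducer() )
hive_get_nreducer()
## End(Not run)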
Value
hive_get_parameter() returns the specified parameter as a character string.
hive_get_workers() returns a character vector naming the hostnames
of the configured worker nodes in the cluster.
hive_get_masters() returns a character vector of the hostnames of
the configured master nodes in the cluster.
hive_get_nreducer() returns an integer representing the number
of configured reducers.
Author(s)
Stefan Theussl
References
Apache Hadoop cluster configuration (https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html#Configuring_Hadoop_in_Non-Secure_Mode).
Examples
## Which tmp directory is set in the Hadoop configuration?
## Not run: hive_get_parameter("hadoop.tmp.dir")
## The master nodes of the cluster
## Not run: hive_get_masters()
## The worker nodes of the cluster
## Not run: hive_get_workers()
## The number of configured reducers
## Not run: hive_get_nreducer()
Hadoop Interactive Framework Control
Description
High-level functions to control Hadoop framework.
Usage
hive( new )
.hinit( hadoop_home )
hive_start( henv = hive() )
hive_stop( henv = hive() )
hive_is_available( henv = hive() )
Arguments
| hadoop_home | A character string pointing to the local Hadoop installation. If not given, the directory specified by the HADOOP_HOME environment variable is used. | 
| henv | An object containing the local Hadoop configuration. | 
| new | An object specifying the Hadoop environment. | 
Details
High-level functions to control Hadoop framework.
The function hive() is used to get/set the Hadoop cluster
object. This object consists of an environment holding information
about the Hadoop cluster.
The function .hinit() is used to initialize a Hadoop cluster. It
retrieves most configuration options via searching the
HADOOP_HOME directory given as an environment variable, or,
alternatively, by searching the /etc/hadoop directory in case
the https://www.cloudera.com distribution (i.e., CDH3) is used.
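A minimal initialization sketch, assuming the HADOOP_HOME environment variable points to a local Hadoop installation:
## Not run: 
h <- .hinit( Sys.getenv("HADOOP_HOME") )
hive( h )
## End(Not run)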
The functions hive_start() and hive_stop() are used to
start/stop the Hadoop framework. The latter is not applicable for
system-wide installations like CDH3.
The function hive_is_available() is used to check the status of
a Hadoop cluster.
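A sketch that starts the framework only if it is not already running (assuming a properly initialized configuration):
## Not run: 
if( !hive_is_available() )
    hive_start()
hive_is_available()
## End(Not run)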
Value
hive() returns an object of class "hive" representing
the currently used cluster configuration.
hive_is_available() returns TRUE if the given Hadoop
framework is running.
Author(s)
Stefan Theussl
References
Apache Hadoop: https://hadoop.apache.org/.
Cloudera's distribution including Apache Hadoop (CDH): https://www.cloudera.com/downloads/cdh.html.
Examples
## read configuration and initialize a Hadoop cluster:
## Not run: h <- .hinit( "/etc/hadoop" )
## Not run: hive( h )
## Start Hadoop cluster:
## Not run: hive_start()
## Check the status of a Hadoop cluster:
## Not run: hive_is_available()
## return cluster configuration 'h':
hive()
## Stop Hadoop cluster:
## Not run: hive_stop()
Hadoop Streaming with package hive
Description
High-level R function for using Hadoop Streaming.
Usage
hive_stream( mapper, reducer, input, output, henv = hive(),
             mapper_args = NULL, reducer_args = NULL, cmdenv_arg = NULL,
             streaming_args = NULL )
Arguments
| mapper | a function which is executed on each worker node. The so-called mapper typically maps input key/value pairs to a set of intermediate key/value pairs. | 
| reducer | a function which is executed on each worker node. The so-called reducer reduces a set of intermediate values which share a key to a smaller set of values. If no reducer is used, this argument can be left empty. | 
| input | specifies the directory holding the data in the DFS. | 
| output | specifies the output directory in the DFS containing the results after the streaming job finished. | 
| henv | Hadoop local environment. | 
| mapper_args | additional arguments to the mapper. | 
| reducer_args | additional arguments to the reducer. | 
| cmdenv_arg | additional arguments passed as environment variables to distributed tasks. | 
| streaming_args | additional arguments passed to the Hadoop Streaming utility. By default, only the number of reducers will be set (see hive_get_nreducer()). | 
Details
The function hive_stream() starts a MapReduce job on the given
data located on the HDFS.
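A minimal invocation sketch; mapper and reducer are user-defined R functions reading from stdin (see the word count example below), and the paths as well as the streaming_args value are illustrative:
## Not run: 
hive_stream( mapper = mapper, reducer = reducer,
             input = "/tmp/input", output = "/tmp/output",
             streaming_args = "-D mapred.reduce.tasks=1" )
## End(Not run)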
Author(s)
Stefan Theussl
References
Apache Hadoop Streaming (https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html).
Examples
## A simple word count example
## Put some xml files on the HDFS:
## Not run: DFS_put( system.file("defaults/core/", package = "hive"),
                  "/tmp/input" )
## End(Not run)
## Not run: DFS_put( system.file("defaults/hdfs/hdfs-default.xml", package = "hive"),
                  "/tmp/input" )
## End(Not run)
## Not run: DFS_put( system.file("defaults/mapred/mapred-default.xml", package = "hive"),
                  "/tmp/input" )
## End(Not run)
## Define the mapper and reducer function to be applied:
## Note that a Hadoop map or reduce job retrieves data line by line from stdin.
## Not run: 
## Mapper: emit a (term, 1) pair for each whitespace-separated term read from stdin.
mapper <- function(x){
    con <- file( "stdin", open = "r" )
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        terms <- unlist(strsplit(line, " "))
        terms <- terms[nchar(terms) > 1 ]
        if( length(terms) )
            cat( paste(terms, 1, sep = "\t"), sep = "\n")
    }
}
## Reducer: sum up the counts per term, using an environment as a hash table.
reducer <- function(x){
    env <- new.env( hash = TRUE )
    con <- file( "stdin", open = "r" )
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        keyvalue <- unlist( strsplit(line, "\t") )
        if( exists(keyvalue[1], envir = env, inherits = FALSE) ){
            assign( keyvalue[1], get(keyvalue[1], envir = env) + as.integer(keyvalue[2]),
                    envir = env )
        } else {
            assign( keyvalue[1], as.integer(keyvalue[2]), envir = env )
        }
    }
    env <- as.list(env)
    for( term in names(env) )
        writeLines( paste(c(term, env[[term]]), collapse ="\t") )
}
hive_set_nreducer(1)
hive_stream( mapper = mapper, reducer = reducer, input = "/tmp/input", output = "/tmp/output" )
DFS_list("/tmp/output")
head( DFS_read_lines("/tmp/output/part-00000") )
## End(Not run)
## Don't forget to clean up the file system
## Not run: DFS_dir_remove("/tmp/input")
## Not run: DFS_dir_remove("/tmp/output")