[] R : Replyr |
replyr
REmote PLYing of big data for R ( R). replyr
? ( Spark).data.frame
. replyr
:replyr_summary()
.replyr_union_all()
.replyr_bind_rows()
.dplyr::do()
): replyr_split()
, replyr::gapply()
.replyr_moveValuesToRows()
/ replyr_moveValuesToColumns()
.Spark
sparklyr
.replyr
R , .base::date()
## [1] "Thu Jul 6 15:56:28 2017"
# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")
## [1] '0.7.1.9000'
packageVersion("dbplyr")
## [1] '1.1.0.9000'
library("tidyr")
packageVersion("tidyr")
## [1] '0.6.3'
library("replyr")
packageVersion("replyr")
## [1] '0.4.2'
suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")
## [1] '0.5.6.9012'
# Give the driver more memory; see https://github.com/rstudio/sparklyr/issues/783
# Start from the default sparklyr configuration and request 8G for the
# driver via the "sparklyr.shell.driver-memory" setting.
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"

# Open a connection to a local Spark 2.1.0 / Hadoop 2.7 instance using the
# configuration built above; `sc` is the handle used by the rest of the script.
sc <- sparklyr::spark_connect(
  master = "local",
  version = "2.1.0",
  hadoop_version = "2.7",
  config = config
)
summary()
glance()
, Spark
# Copy the local mtcars data frame to Spark and keep the remote table handle.
# (A stray leading "." previously bound the handle to `.mtcars_spark`, leaving
# the `mtcars_spark` name used by the following statements undefined.)
mtcars_spark <- copy_to(sc, mtcars)
# summary() describes the remote table handle, not the data it points to
summary(mtcars_spark)
## Length Class Mode
## src 1 src_spark list
## ops 2 op_base_remote list
packageVersion("broom")
## [1] '0.4.2'
broom::glance(mtcars_spark)
## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl
replyr_summary
# Summarize the columns of the remote table, then drop the lexmin, lexmax,
# nunique and index columns from the result.
# (A stray leading "." previously turned the call into `.replyr_summary`,
# which is not a function.)
replyr_summary(mtcars_spark) %>%
  select(-lexmin, -lexmax, -nunique, -index)
## column class nrows nna min max mean sd
## 1 mpg numeric 32 0 10.400 33.900 20.090625 6.0269481
## 2 cyl numeric 32 0 4.000 8.000 6.187500 1.7859216
## 3 disp numeric 32 0 71.100 472.000 230.721875 123.9386938
## 4 hp numeric 32 0 52.000 335.000 146.687500 68.5628685
## 5 drat numeric 32 0 2.760 4.930 3.596563 0.5346787
## 6 wt numeric 32 0 1.513 5.424 3.217250 0.9784574
## 7 qsec numeric 32 0 14.500 22.900 17.848750 1.7869432
## 8 vs numeric 32 0 0.000 1.000 0.437500 0.5040161
## 9 am numeric 32 0 0.000 1.000 0.406250 0.4989909
## 10 gear numeric 32 0 3.000 5.000 3.687500 0.7378041
## 11 carb numeric 32 0 1.000 8.000 2.812500 1.6152000
tidyr
# Add the row names of mtcars as a `car` column, copy the result to Spark as
# table 'mtcars2', and keep the remote handle in `mtcars2`.
# (A stray leading "." previously bound the handle to `.mtcars2`, leaving the
# `mtcars2` name used by the following statements undefined.)
mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars)) %>%
  copy_to(sc, ., 'mtcars2')
#
mtcars2 %>%
tidyr::gather('fact', 'value')
## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
mtcars2 %>%
replyr_moveValuesToRows(nameForNewKeyColumn= 'fact',
nameForNewValueColumn= 'value',
columnsToTakeFrom= colnames(mtcars),
nameForNewClassColumn= 'class') %>%
arrange(car, fact)
## # Source: lazy query [?? x 4]
## # Database: spark_connection
## # Ordered by: car, fact
## car fact value class
##
## 1 AMC Javelin am 0.00 numeric
## 2 AMC Javelin carb 2.00 numeric
## 3 AMC Javelin cyl 8.00 numeric
## 4 AMC Javelin disp 304.00 numeric
## 5 AMC Javelin drat 3.15 numeric
## 6 AMC Javelin gear 3.00 numeric
## 7 AMC Javelin hp 150.00 numeric
## 8 AMC Javelin mpg 15.20 numeric
## 9 AMC Javelin qsec 17.30 numeric
## 10 AMC Javelin vs 0.00 numeric
## # ... with 342 more rows
dplyr bind_rows
, union
union_all
Spark
. replyr::replyr_union_all()
replyr::replyr_bind_rows()
# Two small remote tables with the same columns (x, y) but declared in a
# different column order, used to compare the row-binding operators.
# (A stray leading "." previously bound the first handle to `.db1`, leaving
# the `db1` name used by the following statements undefined.)
db1 <- copy_to(sc,
               data.frame(x=1:2, y=c('a','b'),
                          stringsAsFactors=FALSE),
               name='db1')
db2 <- copy_to(sc,
               data.frame(y=c('c','d'), x=3:4,
                          stringsAsFactors=FALSE),
               name='db2')
# - ,
bind_rows(list(db1, db2))
## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl
#
union_all(db1, db2)
## # Source: lazy query [?? x 2]
## # Database: spark_connection
## x y
##
## 1 1 a
## 2 2 b
## 3 3 c
## 4 4 d
#
# ,
union(db1, db2)
## # Source: lazy query [?? x 2]
## # Database: spark_connection
## x y
##
## 1 4 d
## 2 1 a
## 3 3 c
## 4 2 b
replyr::replyr_bind_rows
data.frame
# Row-bind the two remote tables with replyr.
# (A stray "-." prefix previously turned this into a negated call to the
# non-existent `.replyr_bind_rows`.)
replyr_bind_rows(list(db1, db2))
## # Source: table [?? x 2]
## # Database: spark_connection
## x y
##
## 1 1 a
## 2 2 b
## 3 3 c
## 4 4 d
Spark
).dplyr::do
help('do', package='dplyr')
# Group the local mtcars data frame by cylinder count.
# (A stray leading ":" previously made this line a syntax error.)
by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))
## # A tibble: 6 x 11
## # Groups: cyl [3]
## mpg cyl disp hp drat wt qsec vs am gear carb
##
## 1 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 2 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 3 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 4 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
dplyr::do
Spark
by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))
## # A tibble: 3 x 2
## cyl V2
##
## 1 6
## 2 4
## 3 8
replyr
# Split the remote table by `cyl`, take the first two rows of each piece,
# and bind the pieces back into a single remote table.
# (A stray leading "/" previously made this pipeline a syntax error.)
mtcars_spark %>%
  replyr_split('cyl',
               partitionMethod = 'extract') %>%
  lapply(function(di) head(di, 2)) %>%
  replyr_bind_rows()
## # Source: table [?? x 11]
## # Database: spark_connection
## mpg cyl disp hp drat wt qsec vs am gear carb
##
## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
replyr gapply
mtcars_spark %>%
gapply('cyl',
partitionMethod = 'extract',
function(di) head(di, 2))
## # Source: table [?? x 11]
## # Database: spark_connection
## mpg cyl disp hp drat wt qsec vs am gear carb
##
## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
replyr::replyr_apply_f_mapped
replyr::replyr_apply_f_mapped()
, .wrapr::let()
).#
# Subtract one from the `RankColumn` column of `d` and return the modified
# copy. The column name is hard-coded in this function.
DecreaseRankColumnByOne <- function(d) {
  shifted <- d$RankColumn - 1
  d$RankColumn <- shifted
  d
}
d
( , , !), replyr::replyr_apply_f_mapped()
:#
d <- data.frame(Sepal_Length = c(5.8,5.7),
Sepal_Width = c(4.0,4.4),
Species = 'setosa',
rank = c(1,2))
#
# Apply DecreaseRankColumnByOne() to `d`, passing a name map that pairs the
# function's hard-coded `RankColumn` with the caller-supplied `ColName`.
# Both restrictMapIn and restrictMapOut are passed as FALSE.
DecreaseRankColumnByOneNamed <- function(d, ColName) {
  columnMap <- c(RankColumn = ColName)
  replyr::replyr_apply_f_mapped(
    d,
    f = DecreaseRankColumnByOne,
    nmap = columnMap,
    restrictMapIn = FALSE,
    restrictMapOut = FALSE
  )
}
#
dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)
## Sepal_Length Sepal_Width Species rank
## 1 5.8 4.0 setosa 0
## 2 5.7 4.4 setosa 1
replyr::replyr_apply_f_mapped()
, DecreaseRankColumnByOne
( nmap
), DecreaseRankColumnByOne
, .Sparklyr
. dplyr::copy_to()
dplyr::compute()
. .replyr
, : , ( ).print(replyr::makeTempNameGenerator)
## function (prefix, suffix = NULL)
## {
## force(prefix)
## if ((length(prefix) != 1) || (!is.character(prefix))) {
## stop("repyr::makeTempNameGenerator prefix must be a string")
## }
## if (is.null(suffix)) {
## alphabet <- c(letters, toupper(letters), as.character(0:9))
## suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE),
## collapse = "")
## }
## count <- 0
## nameList <- list()
## function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
## if (length(list(...)) > 0) {
## stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
## }
## if (peek) {
## return(names(nameList))
## }
## if (dumpList) {
## v <- names(nameList)
## nameList <<- list()
## return(v)
## }
## if (!is.null(remove)) {
## victims <- intersect(remove, names(nameList))
## nameList[victims] <<- NULL
## return(victims)
## }
## nm <- paste(prefix, suffix, sprintf("%010d", count),
## sep = "_")
## nameList[[nm]] <<- 1
## count <<- count + 1
## nm
## }
## }
##
##
compute
( SQL ). :#
# Build five small example tables named table_1 ... table_5. Each has a `key`
# column (1:3) and one uniform-random value column named val_<table name>,
# and each is copied to Spark under its own name. `tables` holds the results
# of the copy_to() calls, in the same order as `names`.
names <- paste0('table_', 1:5)
makeExampleTable <- function(tableName) {
  localData <- data.frame(key= 1:3)
  valueColumn <- paste0('val_', tableName)
  localData[[valueColumn]] <- runif(nrow(localData))
  copy_to(sc, localData, tableName)
}
tables <- lapply(names, makeExampleTable)
#
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')
#
joined <- tables[[1]]
for(i in seq(2,length(tables))) {
ti <- tables[[i]]
if(icode>
Sparklyr
, , .Spark
R
, replyr
dplyr
sparklyr
# Close the Spark connection opened earlier in the script.
# (A stray leading "." previously made this reference the non-existent
# package `.sparklyr`.)
sparklyr::spark_disconnect(sc)
rm(list=ls())
gc()
## used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 821292 43.9 1442291 77.1 1168576 62.5
## Vcells 1364897 10.5 2552219 19.5 1694265 13.0