-

   rss_rss_hh_new

 - e-mail

 

 -

 LiveInternet.ru:
: 17.03.2011
:
:
: 51

:


[Translation] Big data in R: replyr

Saturday, 29 July 2017, 15:37 +
replyr stands for REmote PLYing of big data for R (remote processing of big data in R).

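Why should R users try replyr? Because it lets you apply many familiar working patterns to remote data (such as databases or Spark).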

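replyr lets you work with Spark or database data much as you work with local data.frames. Key capability gaps that replyr fills include:
  • Summarizing data: replyr_summary().
  • Combining tables: replyr_union_all().
  • Binding tables by row: replyr_bind_rows().
  • Using the split/apply/combine pattern (dplyr::do()): replyr_split(), replyr::gapply().
  • Pivot/anti-pivot (gather/spread): replyr_moveValuesToRows() / replyr_moveValuesToColumns().
  • Handle tracking.
  • A join controller.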

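You may have already learned to decompose your data processing into steps like these, so keeping these capabilities makes working with Spark and sparklyr much easier.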

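replyr is an R package that fills these gaps for remote data sources, so your code behaves much the same whether the data is local or remote.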

Below are some examples.

First, let's record the date and the package versions used, and connect to Spark.
# Record when this transcript was produced; the outputs below come from this session.
base::date()

## [1] "Thu Jul  6 15:56:28 2017"

# Packages used (uncomment to install; the GitHub lines fetch development versions):
# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")

## [1] '0.7.1.9000'

packageVersion("dbplyr")

## [1] '1.1.0.9000'

library("tidyr")
packageVersion("tidyr")

## [1] '0.6.3'

library("replyr")
packageVersion("replyr")

## [1] '0.4.2'

suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")

## [1] '0.5.6.9012'

# Give the Spark driver more memory, as suggested in https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
# Local (single-machine) Spark 2.1.0 / Hadoop 2.7 connection; `sc` is used by every example below.
sc <- sparklyr::spark_connect(version='2.1.0', 
                              hadoop_version = '2.7',
                              master = "local",
                              config = config)

Summary


The standard summary() and glance() both fail on Spark tables.
# Copy the local mtcars data frame into Spark; mtcars_spark is a handle to the remote table.
mtcars_spark <- copy_to(sc, mtcars)

# The standard summary() is not useful here: it describes the handle object, not the data.
summary(mtcars_spark)

##     Length Class          Mode
## src 1      src_spark      list
## ops 2      op_base_remote list

packageVersion("broom")

## [1] '0.4.2'

# broom::glance() has no method for Spark tables and errors out.
broom::glance(mtcars_spark)

## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl

replyr_summary() works:
# replyr_summary() computes per-column statistics (class, row count, NA count,
# min/max, mean, sd) directly on the Spark table. The lexmin/lexmax/nunique/index
# columns are dropped here only to keep the printed output narrow.
replyr_summary(mtcars_spark) %>%
  select(-lexmin, -lexmax, -nunique, -index)

##    column   class nrows nna    min     max       mean          sd
## 1     mpg numeric    32   0 10.400  33.900  20.090625   6.0269481
## 2     cyl numeric    32   0  4.000   8.000   6.187500   1.7859216
## 3    disp numeric    32   0 71.100 472.000 230.721875 123.9386938
## 4      hp numeric    32   0 52.000 335.000 146.687500  68.5628685
## 5    drat numeric    32   0  2.760   4.930   3.596563   0.5346787
## 6      wt numeric    32   0  1.513   5.424   3.217250   0.9784574
## 7    qsec numeric    32   0 14.500  22.900  17.848750   1.7869432
## 8      vs numeric    32   0  0.000   1.000   0.437500   0.5040161
## 9      am numeric    32   0  0.000   1.000   0.406250   0.4989909
## 10   gear numeric    32   0  3.000   5.000   3.687500   0.7378041
## 11   carb numeric    32   0  1.000   8.000   2.812500   1.6152000

Gather/spread


tidyr works only on in-memory data.
# Local mtcars with its row names promoted to a "car" column, uploaded to Spark as 'mtcars2'.
mtcars2 <- copy_to(sc, mutate(mtcars, car = row.names(mtcars)), 'mtcars2')

# tidyr::gather() has no method for Spark tables, so this fails:
mtcars2 %>% 
  tidyr::gather('fact', 'value')

## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

# replyr_moveValuesToRows() does the same wide-to-long move on the Spark table:
# one row per (car, original column). The column name goes to 'fact', its value
# to 'value' and the value's class to 'class' (32 cars x 11 columns = 352 rows).
mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn= 'fact', 
                          nameForNewValueColumn= 'value', 
                          columnsToTakeFrom= colnames(mtcars),
                          nameForNewClassColumn= 'class') %>%
  arrange(car, fact)

## # Source:     lazy query [?? x 4]
## # Database:   spark_connection
## # Ordered by: car, fact
##            car  fact  value   class
##                
##  1 AMC Javelin    am   0.00 numeric
##  2 AMC Javelin  carb   2.00 numeric
##  3 AMC Javelin   cyl   8.00 numeric
##  4 AMC Javelin  disp 304.00 numeric
##  5 AMC Javelin  drat   3.15 numeric
##  6 AMC Javelin  gear   3.00 numeric
##  7 AMC Javelin    hp 150.00 numeric
##  8 AMC Javelin   mpg  15.20 numeric
##  9 AMC Javelin  qsec  17.30 numeric
## 10 AMC Javelin    vs   0.00 numeric
## # ... with 342 more rows


dplyr's bind_rows, union and union_all do not all behave as expected on Spark. replyr::replyr_union_all() and replyr::replyr_bind_rows() provide working alternatives.

bind_rows()


# Two small Spark tables with the same columns stored in different orders
# (db1 is x, y; db2 is y, x). They are used by the row-binding examples below.
db1 <- data.frame(x = 1:2, y = c('a', 'b'), stringsAsFactors = FALSE) %>%
  copy_to(sc, ., name = 'db1')
db2 <- data.frame(y = c('c', 'd'), x = 3:4, stringsAsFactors = FALSE) %>%
  copy_to(sc, ., name = 'db2')

# dplyr::bind_rows() accepts only local data frames, so it rejects Spark tables:
bind_rows(list(db1, db2))

## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl

union_all


# union_all() stacks the rows of db1 and db2 (as a lazy Spark query)
union_all(db1, db2)

## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##    
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d

union


# union() also combines the tables, but as a set operation;
# note that the row order of the result is not preserved
union(db1, db2)

## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##    
## 1     4     d
## 2     1     a
## 3     3     c
## 4     2     b

replyr_bind_rows


replyr::replyr_bind_rows can bind multiple data.frames (including Spark tables) together.
# Bind a list of Spark tables by row. The result has columns x, y even though
# db2 stores them as y, x.
replyr_bind_rows(list(db1, db2))

## # Source:   table [?? x 2]
## # Database: spark_connection
##       x     y
##    
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d

dplyr::do


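Our example simply takes a few rows from each group of a grouped data set. Note: since we do not enforce an order with arrange, we cannot expect the results to always match on database or Spark data sources.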

dplyr::do on local data


The example from help('do', package='dplyr'):
# On a local data frame, do() returns the first two rows of each cyl group.
by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))

## # A tibble: 6 x 11
## # Groups:   cyl [3]
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##             
## 1  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 2  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 3  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 4  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4

dplyr::do on Spark


# The same code on the Spark table: instead of the grouped rows, the result is a
# local tibble with one row per cyl and a V2 column.
by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))

## # A tibble: 3 x 2
##     cyl     V2
##    
## 1     6 
## 2     4 
## 3     8 

Notice that we did not get usable results back.

replyr split/apply


# Split the Spark table into one piece per cyl value ('extract' partition method),
# take the first two rows of each piece, then bind the pieces back together.
mtcars_spark %>%
  replyr_split('cyl', 
               partitionMethod = 'extract') %>%
  lapply(function(di) head(di, 2)) %>%
  replyr_bind_rows()

## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##             
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4

replyr gapply


# gapply() does the same split / apply / bind-rows in a single call,
# giving the same result as the replyr_split() pipeline.
mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'extract',
         function(di) head(di, 2))

## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##             
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4

replyr::replyr_apply_f_mapped


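What we really want: to make the data appear to have the names the code expects (i.e. re-map the data, not the code).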

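With a little thought we can do this by attaching the data re-mapping to a function environment instead of to the data itself. The re-mapping is then active for as long as a given controlling function is in control. In our case that function is replyr::replyr_apply_f_mapped(), which works as follows.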

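Suppose the operation we want to use is a rank-reducing function supplied from somewhere we do not control (for example, a package). The function may be as simple as the one below, but we assume we want to use it without any alteration (not even the small alteration of introducing wrapr::let()).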
# An "external" function with a hard-coded column name: it only works on data
# that has a column called RankColumn, which it decrements by one.
# Returns the modified copy of d.
DecreaseRankColumnByOne <- function(d) {
  current_rank <- d$RankColumn
  d$RankColumn <- current_rank - 1
  d
}

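To apply this function to d (which does not have the expected column names!), we use replyr::replyr_apply_f_mapped() to build a new parameterized adapter: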
# Example data: its rank column is called "rank", not "RankColumn".
d <- data.frame(
  Sepal_Length = c(5.8, 5.7),
  Sepal_Width  = c(4.0, 4.4),
  Species      = 'setosa',
  rank         = c(1, 2)
)

# Parameterized adapter around DecreaseRankColumnByOne, which hard-codes the
# column name RankColumn. replyr_apply_f_mapped() is told to treat column
# ColName of d as RankColumn while f runs (nmap = c(RankColumn = ColName)).
# restrictMapIn/restrictMapOut are FALSE; judging by the printed result below,
# this keeps the unmapped columns (NOTE(review): confirm against replyr docs).
DecreaseRankColumnByOneNamed <- function(d, ColName) {
  column_map <- c(RankColumn = ColName)
  replyr::replyr_apply_f_mapped(
    d,
    f = DecreaseRankColumnByOne,
    nmap = column_map,
    restrictMapIn = FALSE,
    restrictMapOut = FALSE
  )
}

# Use the adapter on d, whose rank column is named 'rank'
dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)

##   Sepal_Length Sepal_Width Species rank
## 1          5.8         4.0  setosa    0
## 2          5.7         4.4  setosa    1

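replyr::replyr_apply_f_mapped() renames the columns to the names expected by DecreaseRankColumnByOne (the mapping given in nmap), applies DecreaseRankColumnByOne, and then inverts the mapping before returning the result.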


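Many Sparklyr tasks create intermediate or temporary tables, either through dplyr::copy_to() or through dplyr::compute(). These handles can leak and consume resources.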

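To help control handle lifetimes, replyr provides handle trackers: record-keeping devices that help you remove all temporary tables (and avoid deleting any non-temporary ones).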

The implementation is quite simple:
# Print the source of replyr::makeTempNameGenerator(). As the listing shows, it
# returns a closure that, when called with no arguments, produces a new name of
# the form <prefix>_<suffix>_<10-digit counter> and records it. The suffix
# defaults to 20 random alphanumeric characters. peek = TRUE returns the
# recorded names, dumpList = TRUE returns them and clears the record, and
# remove = <names> forgets the given names.
print(replyr::makeTempNameGenerator)

## function (prefix, suffix = NULL) 
## {
##     force(prefix)
##     if ((length(prefix) != 1) || (!is.character(prefix))) {
##         stop("repyr::makeTempNameGenerator prefix must be a string")
##     }
##     if (is.null(suffix)) {
##         alphabet <- c(letters, toupper(letters), as.character(0:9))
##         suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE), 
##             collapse = "")
##     }
##     count <- 0
##     nameList <- list()
##     function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
##         if (length(list(...)) > 0) {
##             stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
##         }
##         if (peek) {
##             return(names(nameList))
##         }
##         if (dumpList) {
##             v <- names(nameList)
##             nameList <<- list()
##             return(v)
##         }
##         if (!is.null(remove)) {
##             victims <- intersect(remove, names(nameList))
##             nameList[victims] <<- NULL
##             return(victims)
##         }
##         nm <- paste(prefix, suffix, sprintf("%010d", count), 
##             sep = "_")
##         nameList[[nm]] <<- 1
##         count <<- count + 1
##         nm
##     }
## }
## 
## 

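In a complex Sparklyr task (for example, joining many tables) it is often a good idea to call compute after each step (otherwise the generated SQL can grow large and unwieldy). The pattern looks like this: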
# Create five small Spark tables named table_1 ... table_5. Each has a shared
# "key" column (1:3) and one column of uniform random values named
# val_table_<i>, so the tables can be joined on "key".
# The name vector is called table_names rather than `names`: binding a global
# variable called `names` masks the base::names() function name, which is
# confusing and error-prone.
table_names <- paste0('table_', 1:5)
tables <- lapply(table_names,
                 function(ni) {
                   di <- data.frame(key = 1:3)
                   di[[paste0('val_', ni)]] <- runif(nrow(di))
                   copy_to(sc, di, ni)
                 })

# Generator of unique temporary table names for the intermediate join results.
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')

# Left-join all tables on "key". Each step is materialized with compute() so the
# generated SQL does not keep growing. Intermediate results get temporary names
# from tmpNamGen; the final result gets the permanent name 'joinres'.
# seq_along(tables)[-1] (rather than seq(2, length(tables))) correctly runs zero
# iterations when there is only one table.
joined <- tables[[1]]
for (i in seq_along(tables)[-1]) {
  ti <- tables[[i]]
  step_name <- if (i < length(tables)) tmpNamGen() else 'joinres'
  joined <- compute(left_join(joined, ti, by = 'key'), name = step_name)
}

# Drop the intermediate tables recorded by the generator. dumpList = TRUE also
# clears its record. The final 'joinres' table was not generated by tmpNamGen,
# so it is kept.
temps <- tmpNamGen(dumpList = TRUE)
for (ti in temps) {
  dplyr::db_drop_table(sc, ti)
}

print(joined)

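(Careful introduction and management of materialized intermediate tables conserves resources, both time and space.) This matters in Sparklyr, where untracked temporary tables accumulate and keep consuming memory, so it is good practice to track and remove them explicitly.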


Spark is a powerful platform for R users, and replyr makes working with it through dplyr and sparklyr easier.
# Tear down: close the Spark connection, then clear the workspace and run
# garbage collection (the memory report below is gc()'s output).
# NOTE(review): rm(list=ls()) deletes every non-hidden object in the current
# environment, not just the objects this script created; consider removing
# only the script's own objects instead.
sparklyr::spark_disconnect(sc)
rm(list=ls())
gc()

##           used (Mb) gc trigger (Mb) max used (Mb)
## Ncells  821292 43.9    1442291 77.1  1168576 62.5
## Vcells 1364897 10.5    2552219 19.5  1694265 13.0
Original source: habrahabr.ru (comments, light).

https://habrahabr.ru/post/334398/

:  

: [1] []
 

:
: 

: ( )

:

  URL