library(dplyr)
##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
library(sparklyr)

# Point sparklyr at the local Spark installation and size the executor /
# driver memory before connecting.  (Idiom fix: use `<-` for assignment.)
Sys.setenv(SPARK_HOME = "/usr/local/spark/spark-2.1.0-bin-hadoop2.7/")
config <- spark_config()
config$spark.executor.memory <- "8G"
config$spark.driver.memory <- "16G"
config$spark.yarn.executor.memoryOverhead <- "4096"
# Open the connection; `sc` is the handle used by every spark_* call below.
sc <- spark_connect(master = "spark://hnamenode:7077", config = config)
尚未連上Spark的介面
已連上Spark的介面, 變數為空
# HDFS folder holding the Google Analytics export in parquet form.
gabq <- "hdfs:/home/tonychuo/GABQ/"
# Register the parquet data with Spark under the table name "ga".
# `GA` is only a remote reference (tbl_spark), not local data.
GA <- spark_read_parquet(sc, "ga", paste0(gabq, "pq/GAID"))
此Connections即為Spark的環境, 因此讀取的資料’ga’會出現在Connections裡面
而原本RStudio本機的環境, 則會紀錄連結Spark的位址, 以List形態存放
使用collect指令,可把資料從Spark裡面抓回本機:
# Sessions that contain at least one e-commerce action.
# The filter / group_by / summarise run inside Spark; collect() then pulls
# the summarised result back into local R memory.
# (Fixes: prose text was fused onto the first code line, and the pipeline
# used right-assignment `-> SX`; both replaced with a normal `SX <-`.)
SX <- GA %>% filter(eCommerceAction_type > 0) %>%
  group_by(sid) %>%
  mutate(act = as.character(eCommerceAction_type)) %>%
  arrange(time) %>%
  summarise(
    # first_value() is a Spark SQL aggregate: the first value of the column
    # within each sid group (rows were arranged by time above).
    uid = first_value(uid),
    channel = first_value(channelGrouping),
    country = first_value(country),
    city = first_value(city),
    revenue = first_value(totalTransactionRevenue),
    n_trans = first_value(transactions),
    on_site = first_value(timeOnSite),
    n_page = first_value(pageviews),
    date = first_value(date),
    landing = first_value(pageTitle),
    # Concatenate the time-ordered action codes into one string per session,
    # e.g. "2356" -- collect_list() is a Spark SQL array aggregate.
    acts = paste(collect_list(act), sep = "")
  ) %>% collect()  # sessions with action
count(SX) # 175293
## # A tibble: 1 x 1
## n
## <int>
## 1 175293
由於sx0未經filter篩選,資料筆數比SX龐大,而且group by之後的彙總會耗掉很多運算能力;我們擔心一次collect抓不回來、造成Spark堵塞,所以改用分段的方式進行:
# All sessions (no action filter).  This result deliberately stays in Spark
# (no collect()): it is much larger than SX, so it is exported via HDFS
# below instead of being collected in one go.
sx0 <- GA %>% group_by(sid) %>%
  arrange(time) %>%
  summarise(
    # first_value(): Spark SQL aggregate -- first value within each sid
    # group after arranging by time.
    uid = first_value(uid),
    channel = first_value(channelGrouping),
    country = first_value(country),
    city = first_value(city),
    revenue = first_value(totalTransactionRevenue),
    n_trans = first_value(transactions),
    on_site = first_value(timeOnSite),
    n_page = first_value(pageviews),
    date = first_value(date),
    landing = first_value(pageTitle)
  ) # sessions without the action filter
count(sx0) # 524386 -- computed inside Spark (note the spark<?> source below)
## # Source: spark<?> [?? x 1]
## n
## <dbl>
## 1 524386
由於SX0未做collect
,因此在本機Global Environment內看不到完整的資料框,只能看到List存放的連結。
若我們想將Spark裡的資料寫出到Hadoop(HDFS),則需執行spark_write_csv。
因為Spark會把資料切成多個分割檔分別寫出,若每個檔案都帶有Header,合併之後Header就會重複。
為了避免此問題發生,我們先把資料框的欄位名稱記錄下來,並將Header的設定為False,再之後自己把欄位名稱加回資料框。
# Remember the column names locally so they can be restored after re-import
# (the CSV is written header-less to avoid duplicated header lines when the
# per-partition files are merged).
nx <- colnames(sx0)
spark_write_csv(sx0, 'hdfs:/home/b041010004/GA/SX0.csv',
                header = FALSE,  # FALSE, not F: T/F are reassignable
                delimiter = "\t", quote = "", mode = 'overwrite')
此時,到RStudio工作選單裡:Tool >> Terminal >> New Terminal,並在Terminal裡打入以下指令(記得把目錄位址改成自己的),以確認是否寫入成功:
hdfs dfs -ls /home/b041010004/GA
若回傳以下代表寫入成功:
💡Unix指令教學
▪ 操作Hadoop時,都要先打hdfs dfs
▪ -ls
代表要查詢目錄位址
▪ home/後面接的是自己的RStudio帳號名稱及資料夾位址
注意上圖Terminal中寫著:[b041010004@hnamenode ~],代表因為目前的工作路徑在Home裡。
因此,我們須先透過cd
將Terminal中的工作路徑設定至你要匯出的資料夾位址,才不會出錯。以本案為例,我們欲匯出到在Home底下名為’GA’的資料夾:
cd GA
為了將Hadoop裡切碎的多個分割檔合併成單一檔案(下載到本機檔案系統),我們使用getmerge指令:
hdfs dfs -getmerge /home/b041010004/GA/SX0.csv SX0.csv
(前半段hdfs dfs -getmerge /home/b041010004/GA/SX0.csv
是在Hadoop的檔案名稱;後面的SX0.csv
是你要命名的檔案名稱)
接著,你就會發現在Files內可以看到’SX0.csv’檔了:
此時,就可以回到RStudio的Console把SX0讀到本機的Global Environment進行操作了
library(readr)
SX0 = read_delim("SX0.csv", col_names=F, delim="\t", quote="")
## Parsed with column specification:
## cols(
## X1 = col_integer(),
## X2 = col_integer(),
## X3 = col_character(),
## X4 = col_character(),
## X5 = col_character(),
## X6 = col_double(),
## X7 = col_integer(),
## X8 = col_integer(),
## X9 = col_integer(),
## X10 = col_datetime(format = ""),
## X11 = col_character()
## )
names(SX0) = nx # restore the column names recorded before spark_write_csv()
attr(SX0, 'spec') = NULL # drop readr's column-spec attribute (tidier object)
# save(SX, SX0, file="SX.rdata", compress=T) # save the cleaned data frames;
# next time a single load() call is enough to resume the analysis
最後請記得一定要斷開Spark的連結,釋放資源給其他人使用。
# Always disconnect when done, releasing cluster resources for other users.
spark_disconnect(sc)
## NULL
我們創造的兩個資料框皆是以session為最小單位:
💡網站瀏覽動作類型(eCommerceAction_type)介紹
▪ 產品清單點閱率 = 1
▪ 產品詳細資料檢視 = 2
▪ 在購物車中放進產品 = 3
▪ 從購物車中移除產品 = 4
▪ 結帳 = 5
▪ 完成購買 = 6
▪ 退貨 = 7
▪ 結帳選項 = 8
▪ 不明 = 0
另外,為了看出各個session的瀏覽途徑,我們作出了acts
的字串變數,裡頭的數字即對應上述瀏覽動作類型,可拿來分析網站瀏覽行為的軌跡。
# Resume from the snapshot saved earlier with save().
load('SX.rdata')
# Convert the date columns from character to Date.
# Fix: the original wrote `format('%Y-%m-%d')` -- a *call* to format() on the
# string, which happens to return the string itself and land in the format
# argument positionally.  Pass the named argument as intended.
SX$date <- as.Date(SX$date, format = '%Y-%m-%d')
SX0$date <- as.Date(SX0$date, format = '%Y-%m-%d')
summary(SX) # 175293 rows (sessions with actions)
## sid uid channel country
## Min. : 1 Min. : 1 Length:175293 Length:175293
## 1st Qu.:131088 1st Qu.: 97775 Class :character Class :character
## Median :262149 Median :194640 Mode :character Mode :character
## Mean :262094 Mean :194882
## 3rd Qu.:393179 3rd Qu.:292227
## Max. :524380 Max. :389929
## city revenue n_trans
## Length:175293 Length:175293 Length:175293
## Class :character Class :character Class :character
## Mode :character Mode :character Mode :character
##
##
##
## on_site n_page date
## Length:175293 Length:175293 Min. :2016-07-31
## Class :character Class :character 1st Qu.:2016-11-08
## Mode :character Mode :character Median :2017-01-23
## Mean :2017-01-29
## 3rd Qu.:2017-04-30
## Max. :2017-07-31
## landing acts
## Length:175293 Length:175293
## Class :character Class :character
## Mode :character Mode :character
##
##
##
summary(SX0) # 524386筆資料
## sid uid channel country
## Min. : 1 Min. : 1 Length:524386 Length:524386
## 1st Qu.:131097 1st Qu.: 97782 Class :character Class :character
## Median :262194 Median :194674 Mode :character Mode :character
## Mean :262194 Mean :194955
## 3rd Qu.:393290 3rd Qu.:292259
## Max. :524386 Max. :389934
##
## city revenue n_trans on_site
## Length:524386 Min. : 1.2 Min. : 1 Min. : 1.0
## Class :character 1st Qu.: 53.2 1st Qu.: 1 1st Qu.: 45.0
## Mode :character Median : 126.0 Median : 1 Median : 130.0
## Mean : 193.2 Mean : 1 Mean : 340.3
## 3rd Qu.: 240.0 3rd Qu.: 1 3rd Qu.: 380.0
## Max. :47082.1 Max. :25 Max. :19017.0
## NA's :500495 NA's :500459 NA's :132303
## n_page date landing
## Min. : 1.000 Min. :2016-07-31 Length:524386
## 1st Qu.: 1.000 1st Qu.:2016-11-18 Class :character
## Median : 4.000 Median :2017-02-10 Mode :character
## Mean : 6.812 Mean :2017-02-09
## 3rd Qu.: 8.000 3rd Qu.:2017-05-13
## Max. :469.000 Max. :2017-07-31
## NA's :62
# Per-user recency/frequency table built from all sessions (SX0).
# days_ago counts back from 2017-08-01, the day after the last record
# (2017-07-31), so the most recent possible visit has days_ago == 1.
rf <- SX0 %>%
  mutate(days_ago = as.integer(as.Date("2017-08-01") - date)) %>%
  group_by(uid) %>%
  summarise(
    recent = min(days_ago),  # days since the most recent visit
    freq   = n(),            # number of visits (sessions)
    senior = max(days_ago),  # days since the first visit
    since  = min(date)       # date of the first visit
  ) %>%
  data.frame
summary(rf)
## uid recent freq senior
## Min. : 1 Min. : 1.0 Min. : 1.000 Min. : 1.0
## 1st Qu.: 97484 1st Qu.: 74.0 1st Qu.: 1.000 1st Qu.: 77.0
## Median :194968 Median :164.0 Median : 1.000 Median :168.0
## Mean :194968 Mean :167.3 Mean : 1.345 Mean :170.9
## 3rd Qu.:292451 3rd Qu.:252.0 3rd Qu.: 1.000 3rd Qu.:256.0
## Max. :389934 Max. :366.0 Max. :244.000 Max. :366.0
## since
## Min. :2016-07-31
## 1st Qu.:2016-11-18
## Median :2017-02-14
## Mean :2017-02-11
## 3rd Qu.:2017-05-16
## Max. :2017-07-31
# table(SX0$uid) %>% table # 查看freq有無做錯
p0 = par(cex=0.8, mfrow=c(3,1), mar=c(3,3,4,2))
hist(rf$recent,20,main="recency",ylab="",xlab="")
hist(pmin(rf$freq, 10),0:10,main="frequency",ylab="",xlab="")
hist(rf$senior,20,main="seniority",ylab="",xlab="")
# Per-user RFM table from sessions that produced revenue (SX).
# Note: revenue came back from Spark as character, so missing values may be
# the literal string "NA".  The original `filter(revenue != 'NA')` also
# silently drops real NA values (the comparison yields NA, which filter
# removes); the explicit !is.na() guard below is behaviorally identical but
# makes that intent visible.
rfm <- SX %>%
  filter(!is.na(revenue), revenue != "NA") %>%
  mutate(days = as.integer(as.Date("2017-08-01") - date)) %>%  # last day: 2017-07-31
  group_by(uid) %>%
  summarise(
    recent = min(days),                 # days since the most recent visit
    freq = n(),                         # number of visits
    money = mean(as.numeric(revenue)),  # average purchase amount
    senior = max(days),                 # days since the first visit
    since = min(date)                   # date of the first visit
  ) %>%
  data.frame
summary(rfm)
## uid recent freq money
## Min. : 12 Min. : 1.0 Min. : 1.000 Min. : 1.20
## 1st Qu.: 98709 1st Qu.: 82.0 1st Qu.: 1.000 1st Qu.: 49.82
## Median :193203 Median :173.0 Median : 1.000 Median : 125.00
## Mean :194246 Mean :166.1 Mean : 1.197 Mean : 177.29
## 3rd Qu.:290594 3rd Qu.:240.0 3rd Qu.: 1.000 3rd Qu.: 231.20
## Max. :389904 Max. :366.0 Max. :58.000 Max. :16032.75
## senior since
## Min. : 1.0 Min. :2016-07-31
## 1st Qu.: 87.0 1st Qu.:2016-11-29
## Median :183.0 Median :2017-01-30
## Mean :172.1 Mean :2017-02-09
## 3rd Qu.:245.0 3rd Qu.:2017-05-06
## Max. :366.0 Max. :2017-07-31
# Sanity check for freq via table() of uid counts (note: rfm comes from SX).
# 2x2 grid: recency, frequency (capped at 10), seniority, log10(money).
p0 <- par(cex = 0.8, mfrow = c(2, 2), mar = c(3, 3, 4, 2))
hist(rfm$recent, breaks = 20, main = "recency", ylab = "", xlab = "")
hist(pmin(rfm$freq, 10), breaks = 0:10, main = "frequency", ylab = "", xlab = "")
hist(rfm$senior, breaks = 20, main = "seniority", ylab = "", xlab = "")
hist(log(rfm$money, 10), main = "log(money)", ylab = "", xlab = "")
set.seed(2019) # fix the RNG so the clustering is reproducible
# NOTE(review): both kmeans() calls draw from the RNG stream seeded above,
# so their order relative to set.seed() matters for reproducibility.
rf$group = kmeans(scale(rf[,2:3]),10)$cluster # cluster on standardized recent+freq
rfm$group = kmeans(scale(rfm[,2:4]),10)$cluster # cluster on standardized recent+freq+money
table(rf$group)
##
## 1 2 3 4 5 6 7 8 9 10
## 39281 836 82377 64509 16 56735 9128 22174 44790 70088
table(rfm$group)
##
## 1 2 3 4 5 6 7 8 9 10
## 2186 1980 40 3176 3511 4898 3010 933 7 237
library(googleVis)
## Creating a generic function for 'toJSON' from package 'jsonlite' in package 'googleVis'
##
## Welcome to googleVis version 0.6.4
##
## Please read Google's Terms of Use
## before you start using the package:
## https://developers.google.com/terms/
##
## Note, the plot method of googleVis will by default use
## the standard browser to display its output.
##
## See the googleVis package vignettes for more details,
## or visit https://github.com/mages/googleVis.
##
## To suppress this message use:
## suppressPackageStartupMessages(library(googleVis))
# Render googleVis output as embeddable chart tags.
op <- options(gvis.plot.tag = 'chart')
# Cluster-level profile: average recency/frequency/seniority per k-means group.
rf1 <- rf %>%
  group_by(group) %>%  # group by the cluster id instead of uid
  summarise(
    size = n(),
    recent = mean(recent),
    freq = mean(freq),
    senior = mean(senior)
  )
rf1$dummy <- 2016  # gvisMotionChart requires a time variable; use a constant
grpname <- sprintf('%02d', 1:10)  # zero-padded group labels "01".."10"
rf1$group <- grpname[as.integer(rf1$group)]
# Cluster-level profile for the RFM segmentation, adding revenue totals.
rfm1 <- rfm %>%
  group_by(group) %>%
  summarise(
    size = n(),
    recent = mean(recent),
    freq = mean(freq),
    tot_revenue = sum(money),
    avg_revenue = mean(money),
    senior = mean(senior)
  )
rfm1$dummy <- 2016  # constant "time" variable for gvisMotionChart
grpname <- sprintf('%02d', 1:10)  # zero-padded group labels "01".."10"
rfm1$group <- grpname[as.integer(rfm1$group)]
# save(rf, rfm, file='RFM.rdata') # persist the per-user tables for reuse
op = options(gvis.plot.tag='chart')
# NOTE(review): this loads "RFM1.rdata" while the save() above writes
# "RFM.rdata" -- presumably RFM1.rdata holds the rf1/rfm1 summaries; confirm
# the file name is intentional.
load("RFM1.rdata")
plot( gvisMotionChart(rf1, "group", "dummy") )  # interactive chart per cluster
plot( gvisMotionChart(rfm1, "group", "dummy") )