前言:商業大數據分析方法

在真實的數據中,數據往往動輒千萬或億,當為了進一步分析而將數據轉為內容相關矩陣或是將資料合併時,更使資料筆數大幅提升,此時並不能依照以往的方式處理,因為資料的龐大將會需要非常久的時間處理,每一步轉換、處理分析都要再等一次,還有可能等了好幾個小時,最後發現資料無法讀入。
那這個時候要如何分析呢?我們將介紹一套系統,不僅能夠將龐大的資料在極短時間內讀取並觀看,還能夠將數億筆資料直接性的用眾所皆知的dyplr四大要素:select、arrange、filter、group_by來進行資料過篩,猶如在面對只有幾萬的小數據。

本章主要內容

1.大數據存儲與運算系統介紹
2.基本操作與函數介紹

Hadoop與Spark系統運作流程圖

Hadoop與Spark系統運作流程圖


現在就讓我們來介紹一下如此威猛的大數據分析系統與其元素吧!這套系統主要分成兩個部分,分別為:

簡單而言,hadoop分布式檔案系統就是好比一個由很多電腦組成的伺服器,而我們將手上可觀數目的數據切碎分別投入到這些電腦當中,讓每台電腦都有分布部分的數據(有可能重疊),這樣能夠加高效能、使用數據的速度,方便於存儲與之後的運算。

[維基百科] 而Spark由許多組成的要素,我們在此也只說明「Spark核心與彈性分散式資料集」Spark最主要的功能(核心)就是提供分散式任務調度而其基礎的程式抽象則稱為彈性分散式資料集(RDDs),是一個可以並列操作、有容錯機制的資料集合。RDDs可以透過參照外部儲存系統的資料集建立(例如:共用檔案系統、HDFS、HBase或其他Hadoop資料格式的資料來源)。或者是透過在現有RDDs的轉換而建立(比如:map、filter、reduce、join等等)。 RDD抽象化是經由一個以Scala, Java,Python的語言整合API所呈現,簡化了編程複雜性,應用程式操縱RDDs的方法類似於操縱本地端的資料集合。,用途很廣泛,而我們這裡則使用於「分散式計算」,,因為資料非常的有規模,我們就必須

精簡來說,Hadoop是個很有效率的存儲方式,而Spark則是能夠迅速的將這些資料在未下載的情況下(仍在hadoop中),進行一系列資料轉換、篩選、排序、計算,如dyplr中select、group_by、arrange、filter,讓3億多筆資料的大數據如同僅有一萬筆資料的小數據一般,在一分鐘內完成運算。

如同我們剛剛講的,我們透過Spark在hadoop上進行資料的計算,你所命名計算之資料雖然也會出現在environment中,但是出現的名稱,僅僅只是hadoop上資料的connection,又稱「Handler connector」,在enviroment以list格式出現,負責連結在hadoop的大型資料,因此他們仍在hadoop上,必須要透過collect()此函數將數據抓取,你所命名之資料才會抓取到你的R studio的伺服器上,就可以直接的使用。(搭配上圖會更清楚)

若仍不清楚的話,請直接依序操作下列CODE,手把手教學,讓你以實際動作直接學習、體會!!!

✿ : 導入基本設定
大數據資料導入起手式,連接上spark context(Spark的環境)

library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
library(sparklyr)
Sys.setenv(SPARK_HOME="/usr/local/spark/spark-2.1.0-bin-hadoop2.7/")
config <- spark_config()
config$spark.executor.memory = "8G"
config$spark.driver.memory = "8G"
config$spark.yarn.executor.memoryOverhead = "4096"
sc <- spark_connect(master = "spark://hnamenode:7077", config = config)

✿ : 載入基本資料
我們可以明顯看到放上hadoop的數據檔案,以「網址」的形式出現。我們將它命名為acquire,接著我們透過spark_read_parquet()這個函數來將資料跑到spark環境中(parquet有點像是rdata),讀取之後我們可以透過Rstudio中的「connections」看到每個欄位的一些資訊,由於此時我們不能利用nrow()來觀察資料,因為資料被切碎分配到不同的電腦上,為此,我們就得透過“count()”這個函數來看此資料的總筆數,而count(資料,欄位),則能告訴你那個欄位每個不同的觀察值有多少個數,另外我們仍能透過colnames()函數來瞭解資料的每個欄位名稱。

acquire = "hdfs:/home/tonychuo/acquire/"
TX = spark_read_parquet(sc, "tx", paste0(acquire, "pq/TX"))
count(TX)  # 349,655,789 order items
## # Source:   lazy query [?? x 1]
## # Database: spark_connection
##           n
##       <dbl>
## 1 349655789
colnames(TX)
##  [1] "id"               "chain"            "dept"            
##  [4] "category"         "company"          "brand"           
##  [7] "date"             "productsize"      "productmeasure"  
## [10] "purchasequantity" "purchaseamount"

✿ : 進行篩選
由此我們可以看到經過Spark的切割存儲之後,再大的數據都能夠使用dyplr套件的方式進行資料探勘,就猶如小資料的數據分析一般簡易操作。我們一開始先使用Select將我們需要的欄位進行擷取,並且找出所求之各欄位不同之觀察值數量。 此時注意!!!我們將TX前兩百項觀察值從hadoop上抓取下來並且命名df,可以藉此查看有否執行collect()函數的差異。 在進行任何運算命名之後,只要沒有collect 他都是屬於一種connection 不會直接在R環境裡面。

select(TX, id:brand) %>% summarise_all(n_distinct)
## # Source:   lazy query [?? x 6]
## # Database: spark_connection
##       id chain  dept category company brand
##    <dbl> <dbl> <dbl>    <dbl>   <dbl> <dbl>
## 1 311541   134    83      836   32773 35689
TX %>% head(200) %>% collect -> df

✿ : 資料格式轉換
我們將日期格式轉換,並且將資料依照id,date,chain進行分組,建立出id,date,chain、購買總數、特定產品購買數量與產品購買總金額之新資料框。 其中sdf_with_unique_id(id=‘order’)是將這些分組好的資料額外在設定一個「訂單編號」。

ORD = TX %>% mutate(date = as.Date(date)) %>% 
  group_by(id, date, chain) %>% summarise(
    items = n(),
    qty = sum(purchasequantity),
    amount = sum(purchaseamount)
  ) %>% sdf_with_unique_id(id = 'order')
## Warning: Missing values are always removed in SQL.
## Use `SUM(x, na.rm = TRUE)` to silence this warning

## Warning: Missing values are always removed in SQL.
## Use `SUM(x, na.rm = TRUE)` to silence this warning
#觀察數據筆數
count(ORD)                     # 26496798 order
## # Source:   lazy query [?? x 1]
## # Database: spark_connection
##          n
##      <dbl>
## 1 26496798
#我們想知道有多少不同的訂單,並且想找出離資料最早的訂單時間與最晚的訂單時間
ORD %>% summarise(n_distinct(order), min(date), max(date))
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MAX(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `MAX(x, na.rm = TRUE)` to silence this warning
## # Source:   lazy query [?? x 3]
## # Database: spark_connection
##   `n_distinct(order)` `min(date)` `max(date)`
##                 <dbl> <date>      <date>     
## 1            26496798 2012-03-01  2013-07-27

✿ : 建立RFM模型分析
RFM模型是衡量客戶價值和客戶創利能力的重要工具和手段。該模型通過一個客戶的近期購買行為、購買的總體頻率以及花了多少錢三項指標來描述該客戶的價值狀況。我們再電子商務平台上,能夠分析的資料包羅萬象,其中有三種重要的指標是能夠抓住主要獲利來源、顧客分布以及進行更深入行銷策略,此三個資料所建立的分析,又稱「RFM分析」。R(Recency最近一次購買時間)、F(Frequence購買頻率)、M(Monetary 交易金額)

rfm = ORD %>% mutate(k = datediff(as.Date("2013-07-28"), date)) %>% 
  group_by(id) %>% summarise(
    recent = min(k),
    freq = n(),
    money = mean(amount)
  ) %>% collect  # 資料筆數僅剩 311541
## Warning: Missing values are always removed in SQL.
## Use `MIN(x, na.rm = TRUE)` to silence this warning
## Warning: Missing values are always removed in SQL.
## Use `AVG(x, na.rm = TRUE)` to silence this warning
# 計算每位客人的訂單量 26496798/311541 = 85.051 orders per customer

#另外建立一個除去掉離群值之後的rfm分析
rfm2 = subset(
  rfm, money >= 5 & money <= 250 & 
    recent >=1 & recent <= 200 & freq <= 400)

由上方rfm分析我們可以得知,在進行篩選、整理之後,原資料(raw data)所剩的資料筆數只剩下約30萬筆,相較原先3億筆資料已經大幅度的下降,而我們使用Spark所建立的資料皆放在我們「hadoop」伺服器上,因此先前使用都是以「hadoop」的方式以伺服器進行資料的篩選、運算,而不是直接地放在Rstudio的伺服器上或我們使用的電腦本機上,格式通常都會呈現List的狀態,並且無法觀看其細節資訊,再加上在rstudio上面可以進行更多視覺化、美化以及更有彈性的函數應用,因此當我們整理到一定大小之後,就要使用collect()函數,將資料從我們存放的伺服器上抓下來。

#將抓下來的資料進行視覺化的分析,我們可以看到資料的分布,可以著手進行資料的美化、觀察,篩減離群值
hist(rfm2$money)

hist(rfm2$recent)

hist(rfm2$freq)

✿ : 建立熱圖分析

#進行熱圖的外觀設定
library(d3heatmap); library(RColorBrewer)
spectral = brewer.pal(11, "Spectral")
#pivot下方會講解概念
mxCD = sdf_pivot(TX, chain ~ dept) %>% collect
mxCD = as.matrix(mxCD); mxCD[is.na(mxCD)] = 0; dim(mxCD)
## [1] 134  84
rownames(mxCD) = paste0('chain', mxCD[,1]); mxCD = mxCD[,-1]
colnames(mxCD) = paste0('dept',colnames(mxCD))
dim(mxCD)
## [1] 134  83
#建立完美熱圖,要先看分布,若差異太大,取LOG可以改善。
range(mxCD); hist(mxCD) 
## [1]       0 3971119

range(log(1+mxCD)); hist(log(1+mxCD))
## [1]  0.00000 15.19456

d3heatmap(log(1+mxCD), col=rev(spectral))


第二堂

本章主要內容

1.Pivot函數介紹 2.抓取仍具規模的數據資料

✿ : 樞紐分析
有想過在一個超大資料分析時沒有table、tapply這兩個超爆好用的函數嗎?你猜對了!在Spark平台運算時,沒有辦法使用這兩個霹靂無敵好用加三級的函數,這時候就要使用sdf_pivot(資料,列呈現之變數觀察值~欄呈現之變數觀察值,function),這個功能與excel樞紐分析相似,會直接幫我們創建列連表,也就與table、tapply異曲同工之妙囉!但要注意的地方是,他會將"列變數觀察值"放入第一欄。(P.S. sdf意思為Spark DataFrame)

# tapply(TX$purchaseamount, list(TX$id,TX$dept), sum)
mxUD = sdf_pivot(TX, id ~ dept, list(purchaseamount='sum'))
count(mxUD) #311541
## # Source:   lazy query [?? x 1]
## # Database: spark_connection
##        n
##    <dbl>
## 1 311541
#為了之後重整欄位名稱,我們先將欄位獨立儲存起來。
nx = colnames(mxUD); nx
##  [1] "id" "0"  "1"  "2"  "3"  "4"  "5"  "6"  "7"  "8"  "9"  "10" "11" "12"
## [15] "13" "14" "15" "16" "17" "18" "19" "20" "21" "22" "23" "24" "25" "26"
## [29] "27" "28" "29" "30" "31" "32" "33" "34" "35" "36" "37" "38" "39" "40"
## [43] "41" "42" "43" "44" "45" "46" "47" "49" "50" "51" "53" "54" "55" "56"
## [57] "57" "58" "59" "60" "62" "63" "64" "65" "67" "69" "70" "71" "72" "73"
## [71] "74" "75" "78" "79" "81" "82" "91" "92" "94" "95" "96" "97" "98" "99"

✿ : 篩選過後檔案仍太大,該如何解決呢?

這個時候就必須先將自己的資料上傳到hadoop之後,再進行抓取下來 (我們以剛剛資料作為例子)

step1:你先將所要存的資料寫上你的hadoop位置 mxUD為要存儲數據,“hdfs:/home/m074111025/temp.csv”則為你的存儲位置,F則是將headr(表頭)設定不要儲存,因為我們資料室切碎的,如果表頭設定T的情況下,將會時不時出現表頭,導致數據變很混亂。

spark_write_csv(mxUD, "hdfs:/home/m074111032/temp.csv",F,mode='overwrite')

✿ : 存檔
將hadoop上面的資料抓取後,設定所要存放的路徑。(此為範例,要改為自己的路徑名稱)

#建立terminal,並且找到自己工作區域的路徑,並在terminal區域打入下列Coding
# hdfs dfs -getmerge /home/m074111025/temp.csv temp.csv

# hdfs dfs 這個是說明你要做的動作是在hodoop上來操作
#而getmerge則是因為hadoop資料都是經過切割成碎片放上伺服器,因此你要抓取資料轉為csv檔案存儲時,必須要將這些碎片合併。
# /home/m074111025/temp.csv則是你存儲路徑
#temp.csv 則為你所命名的檔案名稱

✿ : 斷開連結
當資料確定抓取下來後,為了避免伺服器資源持續地佔用,我們就會斷開連接。

spark_disconnect(sc)

✿ : 導入先前抓取並存到hadoop上的資料

pacman::p_load(readr)
mx = read_csv('temp.csv',col_names=F)
## Parsed with column specification:
## cols(
##   .default = col_double(),
##   X79 = col_character()
## )
## See spec(...) for full column specifications.
mx[,1:ncol(mx)] = lapply(mx[,1:ncol(mx)], as.numeric)
mx[is.na(mx)] = 0
colnames(mx) = c('id', paste0('dept',nx[2:84]))
colSums(is.na(mx))
##     id  dept0  dept1  dept2  dept3  dept4  dept5  dept6  dept7  dept8 
##      0      0      0      0      0      0      0      0      0      0 
##  dept9 dept10 dept11 dept12 dept13 dept14 dept15 dept16 dept17 dept18 
##      0      0      0      0      0      0      0      0      0      0 
## dept19 dept20 dept21 dept22 dept23 dept24 dept25 dept26 dept27 dept28 
##      0      0      0      0      0      0      0      0      0      0 
## dept29 dept30 dept31 dept32 dept33 dept34 dept35 dept36 dept37 dept38 
##      0      0      0      0      0      0      0      0      0      0 
## dept39 dept40 dept41 dept42 dept43 dept44 dept45 dept46 dept47 dept49 
##      0      0      0      0      0      0      0      0      0      0 
## dept50 dept51 dept53 dept54 dept55 dept56 dept57 dept58 dept59 dept60 
##      0      0      0      0      0      0      0      0      0      0 
## dept62 dept63 dept64 dept65 dept67 dept69 dept70 dept71 dept72 dept73 
##      0      0      0      0      0      0      0      0      0      0 
## dept74 dept75 dept78 dept79 dept81 dept82 dept91 dept92 dept94 dept95 
##      0      0      0      0      0      0      0      0      0      0 
## dept96 dept97 dept98 dept99 
##      0      0      0      0
colSums(mx[,2:84]) %>% sort %>% barplot

q20 = colSums(mx[,2:84]) %>% quantile(0.2)
table(colSums(mx[,2:84]) > q20)
## 
## FALSE  TRUE 
##    17    66
x = left_join(rfm2[,"id",F], data.frame(mx))
## Joining, by = "id"
identical(x$id, rfm2$id) # TRUE
## [1] TRUE
x = x[, colSums(mx) > q20]
x = x[,-1] %>% scale 
dim(x)
## [1] 310381     66
save(rfm, rfm2, x, file='rfm.rdata', compress=T)

💡 : 首先將所需要的packages套件下載下來並載入

pacman::p_load(d3heatmap, googleVis, readr, dplyr, RColorBrewer)


選擇所需要顏色11種

spectral = brewer.pal(11, "Spectral")


下載先前已儲存的rfm資料,裡頭有ID(顧客ID)、recent(最近一次購買離當前天數)、freq(購買頻率)、money(平均消費金額)四個欄位

load("rfm.rdata")
dim(x)
## [1] 310381     66


✿ : Kmeans分群分析

使用setseed(10),將隨機數固定,接著使用kmeans分群方法,將顧客分為80群,並選擇“MacQueen”演算法

t0 = Sys.time()
gc(); set.seed(10) 
##            used  (Mb) gc trigger   (Mb)  max used   (Mb)
## Ncells   930985  49.8    1442291   77.1   1442291   77.1
## Vcells 50373784 384.4  137124910 1046.2 137049613 1045.7
km = kmeans(x, 80, algorithm="MacQueen")
## Warning: did not converge in 10 iterations
Sys.time() - t0
## Time difference of 2.834129 mins


使用kmeans分群結果,近一步了解這80群的每一群內部的recent、freq、money的平均值,並用熱圖來呈現

km80 = km$cluster
table(km80) %>% sort
## km80
##    54     1     2    61    14    55    65    40     4    35    64     9 
##    69    75   180   208   271   295   392   491   547   614   614   659 
##    42     6    49    60    26     3    27    66    21    70    80    15 
##   702   831   854   891   924   986  1012  1024  1029  1088  1167  1293 
##    59    16    39    48    63    56    18    37    13    23    30    32 
##  1314  1343  1403  1505  1529  1533  1559  1559  1575  1637  1662  1690 
##    53     5    67    45    36     7    79    41    33    28    46    12 
##  1708  1811  1841  1878  1919  1977  2073  2257  2346  2365  2371  2387 
##    76    77    29    44    31    57    68    51    69     8    72    11 
##  2396  2496  2670  2739  2810  2827  2962  3078  3109  3280  3672  3787 
##    24    25    50    78    20    73    17    71    38    10    47    43 
##  4015  4081  4102  4892  4979  6058  6258  6945  7000  7167  7189  8620 
##    19    52    58    75    62    34    74    22 
##  9431 10373 14499 15697 16649 17399 27478 32265
df = sapply(split(as.data.frame.matrix(x), km80), colMeans)
range(df)
## [1] -0.9668977 31.2451342
log(1+df) %>% d3heatmap(colors=rev(spectral))


進一步用df這個資料集統整每一群的平均購買頻率、平均消費金額、每群總收益、族群大小、平均收益、平均最近一次購買離當前天數

rfm2 = cbind(rfm2, km80)
df = rfm2 %>% group_by(km80) %>% summarise(
  # avg_freq = mean(freq),
  '平均購買頻率' = mean(freq),
  '平均消費金額' = mean(money),
  '每群總收益' = sum(freq*money),
  '族群大小' = n(),
  '平均收益' = mean(freq*money),
  '平均最近一次購買離當前天數' = mean(recent),
  dummy = 2013)


Google Motion Chart I
🗿 : 請將flash開啟為“允許”,使用泡泡做資料視覺化並做調整與觀察族群間差異 我們先將時間因素排除在外,而要建成泡泡圖時必須要設定X軸與Y軸變數,因此我們再給予一個dummy(虛擬)變數進行X軸度的缺口。

op = options(gvis.plot.tag='chart')
plot(gvisMotionChart(
  df, "km80", "dummy",
  options=list(width=720, height=480)))


Google Motion Chart II
將時間因素考慮進去的動態泡泡圖

op = options(gvis.plot.tag='chart')
df = rfm2 %>% 
  mutate(km80 =  sprintf("%02d", km80)) %>% 
  group_by(km80) %>% summarise(
  '平均購買頻率' = mean(freq),
  '平均購買金額' = mean(money),
  '集群總營收貢獻' = sum(freq*money),
  '集群大小'  = n(),
  '平均營收貢獻'  = mean(freq*money),
  '平均距今購買天數' = mean(recent),
  year = 2013)

plot(gvisMotionChart(
  df, "km80", "year", options=list(width=800, height=600)))
# plot( gvisMotionChart(
#   subset(df, group_size >= 20 & group_size <= 1200),     
#   "kg", "dummy", options=list(width=800, height=600) ) )

🗿 : 問題:我們原先是依照dept(部門)作為我們的分群依據,那假若我們想改成以Brand(品牌)作為分類依據時,該如何做呢?