-이번장에서 배울 것
0.일반 RDD로 부터 키-값 형태의 Pair RDD를 어떻게 생성하는지.
-Pair RDD는 RDD의 특별한 형태이다. 각 원소들이 키-값 형태로 되어 있고, 키와 벨류는 어
떤 타입도 될 수 있다.
-왜 Pair RDD를 사용하냐면, 바로 맵리듀스 알고리즘을 사용하기 위해서이다. 흔한 데이터
처리에 필요한 많은 추가적인 함수들이 이용가능하다.
ex)sorting, joining, grouping, counting, etc.
-Pair RDD를 만들기.
우선 map, flatMap/flatMapValues, keyBy와 같은 함수가 필요하다.
1.Pair RDD에 이용가능한 특별한 연산은 어떤게 있는지.
1.간단한 Pair RDD만들기.
users= sc.textFile(file).map(lambda line: line.split(‘\t’).map(lambda fields: (fields[0], fields[1]))
->형태를 만들어준다.
-userID에 따라 웹로그기록을 keying하기.
sc.textFile(logfile).keyBy(lambda line: line.split(‘ ’)[2])
-> KeyBy를 활용하여 밑에서 해본다.
2.하나의 레코드를 여러 Pair로 매핑하기.
sc.textFile(file).map(lambda line: line.split(‘\t’)) \
.map(lambda fields: (fields[0], fields[1])) \ #Pair RDD를 만들기 위한 튜플 형식으로 바꿔줌.
.flatMapValues(lambda skus: skus.split(“:”)) # ":"기준으로 하나의 ID에 가진 values 만큼 레코드가 생성됨.
2.맵리듀스 알고리즘이 스파크에서 어떻게 작동하는지.
-맵리듀스는 거대한 데이터 셋을 쉽게 분산처리 해버릴 수 있습니다.
-하둡 맵리듀스의 주요 작동법은 다소 제한적인데, 우선 각 작업에는 하나의 맵함수와 하나의
리듀스 함수가 있어야한다. 작업의 아웃풋은 파일로 저장이 된다. 스파크에서 하는 맵리듀스
는 훨씬 적응성이 좋다. 맵리듀스 함수들은 다른 보통이 아닌 방법들과 혼합할 수 있다. 작업
결과는 메모리에 저장되고, 연산은 쉽게 연결될 수 있다.
-PairRDD의 스파크에서의 맵리듀스 함수
-Map Phase
1.한번에 하나의 레코드를 처리한다.
2.맵은 하나의 레코더 또는 다수 개의 레코더를 의미한디.
3.map, flatMap, filter, keyBy 가 있다.
-Reduce Phase
1.맵에서 작업한 결과이다.
2.다양한 레코드들 강화한다.
3.예를 들면 reduceByKey, sortByKey, mean
-단어의 개수를 세는 예시
wordcount = sc.textFile(mydata).flatMap(lambda line: line.split())\
.map(lambda word: (word, ,1)) # RDD형태로 만들어 줌.
.reduceByKey(lambda v1,v2: v1+v2)) #연산을 순서가 다를 수 있기에 교환/결합 법칙이 성립해야함.
-PairRDD의 연산자들
-countByKey: 각 키에서 출현 횟수에 따라 맵을 리턴한다.
-groupByKey: RDD에서 각 키값에 대한 모든 값들을 그룹화 시켜준다.
-sortByKey: 오름/내림 차순 정렬.
-join: 두 RDD로 부터 매칭 되는 키를 기준으로 모든 pair를 포함하는 새로운 RDD로 리턴.
movies = moviegross.join(movieyear) #키-값 pair RDD로 분리->키에 따라 조인->포맷에 따라 데이터 조인 ->저장, 리턴, 작업 . . . . .
-join하는 과정을 시원하게 예시로 보여줍니다.
1.두 파일을 각각 pair RDD형태로 바꿔줍니다.
import re
def getRequestDoc(s):
return re.search(r’KBDOC-[0-9]*’,s).group()
kbreqs = sc.textFile(logfile)
.filter(lambda line: ‘KBDOC-’ in line) \
.map(lambda line: (getRequestDoc(line) , line.split(‘ ’)[2])).distinct()
kblist = sc.textFile(kblistfile).map(lambda line: line.split(‘:’)) \
.map(lambda fields: (fields[0], fields[1]))
2.키를 기준으로 join하기
titlereqs = kbreqs.join(kblist)
3.원하는 포맷(아이디, 제목) 으로 결과 맵함수 적용하기.
titlereqs = kbreqs.join(kblist) \
.map(lambda (docid,(userid,title)): (userid, title))
4.계속되는 작업 - 아이디에 따라 제목 그룹핑하기.
titlereqs = kbreqs.join(kblist) \
.map(lambda (docid,(userid,title)): (userid, title))
.groupByKey()
-다른 Pair 연산자들
-keys: 값없이 키값만 리턴
-values: 키값없이 값만 리턴.
-lookup(key): key에 대한 값을 리턴.
-leftOuterJoin, rightOuterJoin, fullOuterJoin: 왼쪽, 오른쪽, 둘다에 해당하는 기준에 맞추어
조인.
-mapValues, flatMapValues: 키는 그대로 유지하면서 값에 대해서만 함수를 실행시킴.
-주요 포인트
1.pair RDD는 키-값(튜플형태)으로 구성된 RDD의 특별한 형태.
2.스파크는 pair RDD로 작업하는 몇가지 연산을 제공함.
3.맵리듀스는 분산처리를 위한 일반적인 프로그래밍 모델임.
-스파크는 pair RDD를 맵리듀스로 수행함.
-하둡 맵리듀스와 다른 작업방법은 작업마다 하나의 맵과 하나의 리듀스를 하는데에 제한적이다. spark good.
-스파크는 맵과 리듀스 연산의 유연한 체이닝(chaining)을 제공한다.
-스파크는 joining, sorting, grouping과 같은 흔한 맵리듀스 알고리즘을 쉽게 수행하게끔 연산자를 제공한다.
'빅데이터 시스템' 카테고리의 다른 글
스파크 병렬 처리 (0) | 2019.12.15 |
---|---|
스파크 애플리케이션 작성 및 전개 (0) | 2019.12.15 |
RDD와 함께 스파크 활용하기. (0) | 2019.12.15 |
스파크 기초 (0) | 2019.12.15 |
데이터 파일 파티셔닝 (0) | 2019.12.15 |
댓글