본문 바로가기
빅데이터 시스템

Pair RDD와 함께 데이터 병합.

by 볼록티 2019. 12. 15.
728x90
반응형

-이번장에서 배울 것

0.일반 RDD로 부터 키-값 형태의 Pair RDD를 어떻게 생성하는지.

 -Pair RDD는 RDD의 특별한 형태이다. 각 원소들이 키-값 형태로 되어 있고, 키와 벨류는 어

  떤 타입도 될 수 있다.

 -왜 Pair RDD를 사용하냐면, 바로 맵리듀스 알고리즘을 사용하기 위해이다. 흔한 데이터

  처리에 필요한 많은 추가적인 함수들이 이용가능하다.

  ex)sorting, joining, grouping, counting, etc.

pair RDD

 

 -Pair RDD를 만들기.

 우선 map, flatMap/flatMapValues, keyBy와 같은 함수가 필요하다.

 

 

1.Pair RDD에 이용가능한 특별한 연산은 어떤게 있는지.

  1.간단한 Pair RDD만들기.

 users= sc.textFile(file).map(lambda line: line.split(‘\t’).map(lambda fields: (fields[0], fields[1]))

  ->형태를 만들어준다.

 

 

 -userID에 따라 웹로그기록을 keying하기.

 sc.textFile(logfile).keyBy(lambda line: line.split(‘ ’)[2])

 -> KeyBy를 활용하여 밑에서 해본다.

 

  2.하나의 레코드를 여러 Pair로 매핑하기.

  sc.textFile(file).map(lambda line: line.split(‘\t’)) \

  .map(lambda fields: (fields[0], fields[1])) \ #Pair RDD를 만들기 위한 튜플 형식으로 바꿔줌.

  .flatMapValues(lambda skus: skus.split(“:”)) # ":"기준으로 하나의 ID에 가진 values 만큼 레코드가 생성됨.

 

결과

 

2.맵리듀스 알고리즘이 스파크에서 어떻게 작동하는지.

 -맵리듀스는 거대한 데이터 셋을 쉽게 분산처리 해버릴 수 있습니다.

 -하둡 맵리듀스의 주요 작동법은 다소 제한적인데, 우선 각 작업에는 하나의 맵함수와 하나의 

 리듀스 함수가 있어야한다. 작업의 아웃풋은 파일로 저장이 된다. 스파크에서 하는 맵리듀스

 는 훨씬 적응성이 좋다. 맵리듀스 함수들은 다른 보통이 아닌 방법들과 혼합할 수 있다. 작업 

 결과는 메모리에 저장되고, 연산은 쉽게 연결될 수 있다.

 

 -PairRDD의 스파크에서의 맵리듀스 함수

 -Map Phase

  1.한번에 하나의 레코드를 처리한다.

  2.맵은 하나의 레코더 또는 다수 개의 레코더를 의미한디.

  3.map, flatMap, filter, keyBy 가 있다.

 

 -Reduce Phase

  1.맵에서 작업한 결과이다.

  2.다양한 레코드들 강화한다.

  3.예를 들면 reduceByKey, sortByKey, mean

 

 -단어의 개수를 세는 예시

 wordcount = sc.textFile(mydata).flatMap(lambda line: line.split())\

.map(lambda word: (word, ,1)) # RDD형태로 만들어 줌.

.reduceByKey(lambda v1,v2: v1+v2)) #연산을 순서가 다를 수 있기에 교환/결합 법칙이 성립해야함.

reduceByKey

 

 -PairRDD의 연산자들

  -countByKey: 각 키에서 출현 횟수에 따라 맵을 리턴한다.

  -groupByKey: RDD에서 각 키값에 대한 모든 값들을 그룹화 시켜준다.

  -sortByKey: 오름/내림 차순 정렬.

  -join: 두 RDD로 부터 매칭 되는 키를 기준으로 모든 pair를 포함하는 새로운 RDD로 리턴.

  movies = moviegross.join(movieyear) #키-값 pair RDD로 분리->키에 따라 조인->포맷에 따라 데이터 조인 ->저장, 리턴, 작업 . . .  . .

 -join하는 과정을 시원하게 예시로 보여줍니다.

 1.두 파일을 각각 pair RDD형태로 바꿔줍니다.

  import re

  def getRequestDoc(s):

     return re.search(r’KBDOC-[0-9]*’,s).group()

 

  kbreqs = sc.textFile(logfile)

  .filter(lambda line: ‘KBDOC-’ in line) \

  .map(lambda line: (getRequestDoc(line) , line.split(‘ ’)[2])).distinct()

  kblist = sc.textFile(kblistfile).map(lambda line: line.split(‘:’)) \

  .map(lambda fields: (fields[0], fields[1]))

 

 2.키를 기준으로 join하기

  titlereqs = kbreqs.join(kblist)

 

 3.원하는 포맷(아이디, 제목) 으로 결과 맵함수 적용하기.

  titlereqs = kbreqs.join(kblist) \

  .map(lambda (docid,(userid,title)): (userid, title))

 4.계속되는 작업 - 아이디에 따라 제목 그룹핑하기.

    titlereqs = kbreqs.join(kblist) \

  .map(lambda (docid,(userid,title)): (userid, title))

  .groupByKey()

 




 -다른 Pair 연산자들

 -keys: 값없이 키값만 리턴

 -values: 키값없이 값만 리턴.

 -lookup(key): key에 대한 값을 리턴.

 -leftOuterJoin, rightOuterJoin, fullOuterJoin: 왼쪽, 오른쪽, 둘다에 해당하는 기준에 맞추어

  조인.

 -mapValues, flatMapValues: 키는 그대로 유지하면서 값에 대해서만 함수를 실행시킴.

 

 

-주요 포인트

 1.pair RDD는 키-값(튜플형태)으로 구성된 RDD의 특별한 형태.

 2.스파크는 pair RDD로 작업하는 몇가지 연산을 제공함.

 3.맵리듀스는 분산처리를 위한 일반적인 프로그래밍 모델임.

    -스파크는 pair RDD를 맵리듀스로 수행함.

    -하둡 맵리듀스와 다른 작업방법은 작업마다 하나의 맵과 하나의 리듀스를 하는데에 제한적이다. spark good.

    -스파크는 맵과 리듀스 연산의 유연한 체이닝(chaining)을 제공한다.

    -스파크는 joining, sorting, grouping과 같은 흔한 맵리듀스 알고리즘을 쉽게 수행하게끔 연산자를 제공한다.

728x90
반응형

'빅데이터 시스템' 카테고리의 다른 글

스파크 병렬 처리  (0) 2019.12.15
스파크 애플리케이션 작성 및 전개  (0) 2019.12.15
RDD와 함께 스파크 활용하기.  (0) 2019.12.15
스파크 기초  (0) 2019.12.15
데이터 파일 파티셔닝  (0) 2019.12.15

댓글