### 스트림 딜레이
인터넷이 없는 구간 즉 버퍼가 생길때의 차이가
시간차가 나며 데이터의 이동이 멈춘 상태
@@@Stream data
스트림의 데이타는 즉 보낸 시간
@@@Real-time processing
받은 시간 ??? 받는 시간이 보넨시간과 같은가??
### 처리 방법
1. element 마다 변환 방법
2. Aggregation
1)그룹바이로 만든다
2)윈도윙 (windowing) 어떤 시간에 따른 구릅으로 만듬
1] Fixed window
타임으로 나눔
2] Sliding window
조금식 겹치게 나눔
3] Sessions window
로그인 해놓은거 시작 그리고 끝을 따로만들어
중간에 계속 할 수 있도록 만든거 순서처리방식이 아님
### watermark
딜레이로 모든 데이터가 다왓나 확인하는것
간단히 요즘 AI 마이크 챗봇 같은느낌 말하고 기다리면 그걸로 끝인지 인지하는 듯한 느낌
### Stream Query Execution model
1.Continuous Query
전에 온 데이터를 가지고 있음 그리고 다음 데이터도 계속 받으며 처리함
2.Micro-batch
제귀식으로 전에 데이터를 처리완료하고 다음 데이터를 받아서 찾아감
###Spark Structured Steaming
데이터를 스텍처럼 쌓는다
그리고 하나씩 처리한다
즉 데이터를 버리지 않고 계속 쌓아서 처리한다
적립식임 ㅇㅇ
###WordCount Program
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("WordCountWithKafka")\
.getOrCreate()
# Create DataFrame representing the stream of input lines
# from connection to a kafka server
lines = spark \
.readStream \
.format("kafka")\
.option("kafka.bootstrap.servers", ip_addr:port) \
.option("subscribe", "wc") \
.load()
## Word Count Program ##
#Split the lines into words
words = lines.select(
explode(split(lines.value." ")).alias("word")
)
#Generate running word count
wordCounts = words.groupBy("word").count()
#Start running the query that prints the running counts
# to the console
query = wordCounts \
.writeStream \
.queryName("wordcount_kafka") \
.outputMode("complete")\
.format("memory")\
.start()
query.awaitTermination()
## Widow Operation ##
words = ...#streaming DataFrame of schema
#{ts: Timestamp, word: String}
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.ts, "3 minutes","1 minute"),
words.word
).count()
## Spark structured Streaming ##
words = ...#streaming DataFrame of schema
#{ts: Timestamp, word: String}
#Add watermark
windowedCounts = words
.withWatermark("ts", "5 minutes")
.groupBy(window(word.ts,"3 minutes","1 minute"), words.word)
.count()
## Stream-Static Join##
# a static DataFrame about device info
deviceInfoDF = spark.read, ...
# a stream DataFrame about device signal
deviceSignalStreamDF = spark.readStream, ...
#inner equijoin with a static DataFrame
deviceSignalStreamDF.join(deviceInfoDF, "deviceType")
## Stream-Stream Join ##
from pyspark.sql.functions import expr
# Joining two streams of data from dfferent sorces: posts and likes.
posts = spark.readStream, ...
likes = spark.readStream, ...
# Apply watermarks on event-time columns
postsWithWatermark = posts.withWatermark("postTime", "2 hours")
likesWithWatermark = likes.withWatermark("likeTime", "3 hours")
# Join with event-time constraints
postsWithWatermark.join(
likesWithWatermark,
expr("""likePostId = postId AND
likeTime >= postTime AND
likeTime <= postTime + 1 hour
""")
)
헥헥 실험해 보고 싶다
'K-MOOC > 빅데이터와 머신러닝 소프트웨어' 카테고리의 다른 글
6주차 텐서플로우 (0) | 2020.04.11 |
---|---|
5주차 머신러닝 시스템 개요 (0) | 2020.04.02 |
3주차 배치 분석 (0) | 2020.03.24 |
2주차 데이터 처리 개요 (0) | 2020.03.22 |
1주차 빅데이터 소프트웨어 (0) | 2020.03.20 |