### 스트림 딜레이

인터넷이 없는 구간 즉 버퍼가 생길때의 차이가

시간차가 나며 데이터의 이동이 멈춘 상태

 

@@@Stream data

스트림의 데이타는 즉 보낸 시간

 

@@@Real-time processing

받은 시간 ??? 받는 시간이 보넨시간과 같은가??

 

### 처리 방법

1. element 마다 변환 방법

 

2. Aggregation

  1)그룹바이로 만든다

  2)윈도윙 (windowing) 어떤 시간에 따른 구릅으로 만듬

    1] Fixed window

        타임으로 나눔

     2] Sliding window

        조금식 겹치게 나눔

     3] Sessions window

        로그인 해놓은거 시작 그리고 끝을 따로만들어 

        중간에 계속 할 수 있도록 만든거 순서처리방식이 아님

 

### watermark

딜레이로 모든 데이터가 다왓나 확인하는것

 

간단히 요즘 AI 마이크 챗봇 같은느낌 말하고 기다리면 그걸로 끝인지 인지하는 듯한 느낌

 

### Stream Query Execution model 

1.Continuous Query

   전에 온 데이터를 가지고 있음 그리고 다음 데이터도 계속 받으며 처리함

2.Micro-batch

   제귀식으로 전에 데이터를 처리완료하고 다음 데이터를 받아서 찾아감

 

 

###Spark Structured Steaming

데이터를 스텍처럼 쌓는다

그리고 하나씩 처리한다

즉 데이터를 버리지 않고 계속 쌓아서 처리한다

적립식임 ㅇㅇ

 

###WordCount Program

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
     .builder \
     .appName("WordCountWithKafka")\
     .getOrCreate()
# Create DataFrame representing the stream of input lines
# from connection to a kafka server
lines = spark \
    .readStream \
    .format("kafka")\
    .option("kafka.bootstrap.servers", ip_addr:port) \ 
    .option("subscribe", "wc") \
    .load()
## Word Count Program ##
#Split the lines into words
words = lines.select(
    explode(split(lines.value." ")).alias("word")
)
#Generate running word count
wordCounts = words.groupBy("word").count()
#Start running the query that prints the running counts
# to the console
query = wordCounts \
    .writeStream \
    .queryName("wordcount_kafka") \
    .outputMode("complete")\
    .format("memory")\
    .start()
query.awaitTermination()

## Widow Operation ## 
words = ...#streaming DataFrame of schema
        #{ts: Timestamp, word: String}
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.ts, "3 minutes","1 minute"),
    words.word
).count()

## Spark structured Streaming ##
words = ...#streaming DataFrame of schema
        #{ts: Timestamp, word: String}
#Add watermark
windowedCounts = words
    .withWatermark("ts", "5 minutes")
    .groupBy(window(word.ts,"3 minutes","1 minute"), words.word)
    .count()

## Stream-Static Join##
# a static DataFrame about device info
deviceInfoDF = spark.read, ...
# a stream DataFrame about device signal
deviceSignalStreamDF = spark.readStream, ...

#inner equijoin with a static DataFrame
deviceSignalStreamDF.join(deviceInfoDF, "deviceType")

## Stream-Stream Join ## 
from pyspark.sql.functions import expr
# Joining two streams of data from dfferent sorces: posts and likes.
posts = spark.readStream, ...
likes = spark.readStream, ...

# Apply watermarks on event-time columns
postsWithWatermark = posts.withWatermark("postTime", "2 hours")
likesWithWatermark = likes.withWatermark("likeTime", "3 hours")

# Join with event-time constraints
postsWithWatermark.join(
    likesWithWatermark, 
    expr("""likePostId = postId AND 
         likeTime >= postTime AND
         likeTime <= postTime + 1 hour
         """)
)

    

 

헥헥 실험해 보고 싶다 

블로그 이미지

Or71nH

,