์ฌ๋ด์์ ๊ธฐ์กด ๋ฉ์ธ์งํ ๊ธฐ์ ์คํ์ผ๋ก Kafka ๋ฅผ ์ฌ์ฉํ๊ณ ์์ต๋๋ค.
Confluent์ฌ์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ฌ Kafka Consumer ์ Publisher ๋ฅผ ๊ตฌ์ฑํ๊ณ , ๊ฐ ๋๋ฉ์ธ์ ๋ง๋ ํ ํฝ์ ์์ฑํด ๋ฐ์ดํฐ ํ๋ฆ์ ๋ถ๋ฆฌํด์์ต๋๋ค. ๋ฉ์์ง๋ ํ ํฝ ๋จ์ ์คํธ๋ฆผ์ผ๋ก ๋ฐํ๋๊ณ , ์ปจ์๋จธ ๊ทธ๋ฃน์ด ์ด๋ฅผ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ๋ ๊ตฌ์กฐ์ ๋๋ค.
์ด ๋ฐฉ์์ ๋๋ ์ด๋ฒคํธ ์ฒ๋ฆฌ๋ ๋ก๊ทธ ํ์ดํ๋ผ์ธ์ฒ๋ผ “๊ณ์ ํ๋ฌ๊ฐ๋ ๋ฐ์ดํฐ”๋ฅผ ์๋นํ๋ ์๋๋ฆฌ์ค์์๋ ๋งค์ฐ ๊ฐ๋ ฅํฉ๋๋ค. ์ค์ ๋ก ์ฌ๋ด ์ด๊ธฐ ์ํคํ ์ฒ ๋จ๊ณ์์ Kafka๋ ์ ์์๊ฑฐ๋ ์ฃผ๋ฌธ ์์ง ๋ก๊ทธ, ERP ์ด๋ฒคํธ ๋ธ๋ก๋์บ์คํ ๊ฐ์ ๋ฐ์ดํฐ ์ค์ฌ ์คํธ๋ฆผ ์ฒ๋ฆฌ ์์ญ์์ ์ถฉ๋ถํ ์ญํ ์ ์ํํด์์ต๋๋ค.
ํ์ง๋ง, ์๊ฐ์ด ํ๋ฅด๋ฉฐ ๋ฉ์์ง ํ์ ์ฉ๋๊ฐ ์ด๋ฒคํธ ์คํธ๋ฆผ์ ๋์ด “์
๋ฌด ์คํ(Task Execution)” ์์ญ์ผ๋ก ํ์ฅ๋๊ธฐ ์์ํ์ต๋๋ค. ์ธ๋ถ ์ผํ๋ชฐ ์ฃผ๋ฌธ์ API๋ก ์์งํ๊ณ , ์ด๋ฅผ ๋ด๋ถ ERP ๋๋ฉ์ธ์ ๋ฐ์ํ๋ฉฐ, ์คํจ ์ ์ฌ์๋ํ๊ฑฐ๋ ๋ถ๊ธฐ ์ฒ๋ฆฌํ๋ ํ๋ก์ธ์ค๋ ๋จ์ ์ด๋ฒคํธ ์๋น๊ฐ ์๋๋ผ ์์
(work) ์์ฒด์
๋๋ค. ํ๋์ ๋ฉ์์ง๊ฐ “์ด๋ฒคํธ”๊ฐ ์๋๋ผ ์คํํด์ผ ํ ํ์(Action) ๋ฅผ ์๋ฏธํ๊ธฐ ์์ํ ๊ฒ์
๋๋ค.
์ด ์ง์ ์์ Kafka๋ ๊ตฌ์กฐ์ ์ผ๋ก ํ๊ณ๋ฅผ ๋๋ฌ๋
๋๋ค. Kafka๋ ์ด๋ฒคํธ ๋ก๊ทธ์ด๋ฉฐ, ๋ฉ์์ง๋ append-only ์คํธ๋ฆผ์ผ๋ก ์ ์ฅ๋ฉ๋๋ค. Consumer๋ ๋ฉ์์ง๋ฅผ ์ฝ์๋๊ฐ(Offset)๋ง ํ๋จํ ๋ฟ, ์ฒ๋ฆฌ๊ฐ ์ฑ๊ณตํ๋์ง, ์คํจํ๋์ง, ์ฌ์๋ ๊ฐ๋ฅํ์ง, ์์
์ ์ชผ๊ฐ์ผ ํ๋์ง๋ฅผ ์์คํ
์์ฒด๋ ์์ง ๋ชปํฉ๋๋ค. ๊ฒฐ๊ตญ ์ด ๋ชจ๋ ์ฑ
์์ ์ดํ๋ฆฌ์ผ์ด์
์ฝ๋์ ๊ท์๋๊ณ , ๋น์ฆ๋์ค ๋ก์ง์ด ๊ฐํ๊ฒ ์ฎ์ด๊ฒ ๋ฉ๋๋ค. Worker๋ Kafka API์ ์์กดํ ์ํ ๋จธ์ ์ฒ๋ผ ๋ณํด๋ฒ๋ฆฌ๊ณ , ์๋ฌ ๋ฐ์ ์ ๋จํธ์ ๋์ ์ธ์๋ ๋๋ฉ์ธ ๊ธฐ๋ฐ์ ๋ณต๊ตฌ๊ฐ ์ด๋ ค์์ง๋๋ค.
ํนํ Action Timeout๊ณผ ์ฅ๊ธฐ ์คํ ์์
์ด ๊ฒฐํฉ๋๋ ์๊ฐ ๋ฌธ์ ๊ฐ ์ฌ๊ฐํ๊ฒ ๋๋ฌ๋ฉ๋๋ค. ์๋ฅผ ๋ค์ด ์คํฌ๋ํ API ์๋ต์ด 45์ด ์ด์ ์ง์ฐ๋๋ฉด ์๋ฒ๋ ํ์์์์ ๋ฐํํ๊ณ , ์ฌ์ฉ์๋ ์คํจ๋ฅผ ๊ฒฝํํฉ๋๋ค. ์ด ์คํจ๋ ๊ธฐ์ ์ ๋ฌธ์ ์ผ ๋ฟ ๋น์ฆ๋์ค ์คํจ๋ ์๋์ง๋ง, Kafka ๊ธฐ๋ฐ์์๋ ์ด ์คํจ๋ฅผ ์ ์ ํ “ํ๋ณต ๊ฐ๋ฅํ ์ํ”๋ก ์ฎ๊ธฐ๊ธฐ ์ด๋ ต์ต๋๋ค. ๋ฉ์์ง๋ ์ด๋ฏธ ์๋น๋ ๊ฒ์ผ๋ก ๊ฐ์ฃผ๋๊ณ , ์ฌ์๋๋ ๋ณ๋ ํ ํฝ ๋๋ DLQ๋ฅผ ๊ฑฐ์ณ์ผ ํ๋ฉฐ, ์ฌ์๋ ๊ณผ์ ์ ์ฒด๊ฐ ์ ํ๋ฆฌ์ผ์ด์
๋ ๋ฒจ์์ ์ปค์คํ
๊ตฌํ๋ฉ๋๋ค. ์ค๊ณ ๋์ด๋๋ ๊ธฐํ๊ธ์์ ์ผ๋ก ์ฆ๊ฐํ๊ณ , ์ฝ๋์ ์ธํ๋ผ๊ฐ ๋จ๋จํ ๊ฒฐํฉ๋ฉ๋๋ค.
์ด๋ฐ ๋น์ฆ๋์ค ์๊ตฌ์ฌํญ ๋ฌธ์ ์ ๋ฟ ์๋๋ผ ์ธํ๋ผ ์ด์ ๊ด์ ์์๋ ์กฐ๊ธ ํ๊ณ๊ฐ ์์์ต๋๋ค. ์ ํฌ ์กฐ์ง์ PostgreSQL์ ๋น๋กฏํ RDB ์ค์ฌ์ผ๋ก ๊ตฌ์ถ๋ ERP SaaS ํ์ฌ์
๋๋ค.
์ฆ, DB ์กฐ์ง์ด ๋งค์ฐ ๊ฐ๋ ฅํฉ๋๋ค. DB ์ฑ๋ฅ ์ต์ ํ, ํํฐ์
๋, ํธ๋์ญ์
ํ๋, ์ธ๋ฑ์ฑ ์ ๋ต ๋ฑ์์ ์๋ง์ ๊ฒฝํ๊ณผ ๋ด์ฌ๋ ์ง์์ด ์ถ์ ๋์ด ์๊ณ , Postgres๋ฅผ ๋ค๋ฃจ๋ ์ ๋ฌธ๊ฐ๋ ๋ง์ต๋๋ค.
ํ์ง๋ง Kafka ์ธํ๋ผ๋ฅผ ์์ ์ ์ผ๋ก ๊ด๋ฆฌํ๊ณ , ํ ํฝ ์ฌ๊ตฌ์ฑ, consumer group migration ๋ฑ์ ์ํฐํ๋ผ์ด์ฆ ์ค์ผ์ผ์์ ๋ค๋ฃฐ ์ ์๋ ์์ง๋์ด๋ ์์์์ต๋๋ค.
ํ์ฌ๋ ์ด๋ฌํ ๊ธฐ์ ์ ·๋๋ฉ์ธ์ ·์กฐ์ง์ ๋ฌธ์ ๋ฅผ ์ข ํฉ์ ์ผ๋ก ์ธ์ํ๊ณ , ๋ฉ์์ง ์์คํ ์ Kafka ์ค์ฌ์ ์คํธ๋ฆผ ์ฒ๋ฆฌ ๋ชจ๋ธ์์ ์ ๋ฌด(Task) ์ค์ฌ์ ํธ๋์ญ์ ์ฒ๋ฆฌ ๋ชจ๋ธ๋ก ์ ํํ๊ธฐ๋ก ๊ฒฐ์ ํ์ต๋๋ค. ๊ทธ ๊ฒฐ๊ณผ ์ ํ๋ ๊ฒ์ด ๋ฐ๋ก PGMQ(PostgreSQL Message Queue) ์ ๋๋ค.
PGMQ๋ ์ฌ๋ด DB ์ ๋ฌธ๊ฐ๋ค์ด ๋ณด์ ํ ์ญ๋์ ๊ทธ๋๋ก ํ์ฉํ๋ฉด์, ๋ฉ์์ง ์์ฒด๋ฅผ PostgreSQL์ Row์ ํธ๋์ญ์
์ผ๋ก ๋ค๋ฃจ๋ ๋ฐฉ์์ ์ฑํํ์ต๋๋ค.
PostgreSQL Message Queue๋ฅผ ์ ์์ค์์ ์ง์ ๊ตฌํํจ์ผ๋ก์จ, ํ์ฌ์ ๋น์ฆ๋์ค ๋๋ฉ์ธ์ ๋ง๊ฒ ๋ฉ์์ง ์ฒ๋ฆฌ ๊ตฌ์กฐ๋ฅผ ์ปค์คํ ํ๊ณ ํ์ํ ์ต์ ์ ์ ์ฐํ๊ฒ ํ์ฅํ ์ ์๋๋ก ์ค๊ณํ์ต๋๋ค.
์ด๋ฒ ๋ฉ์์ง ํ ๊ธฐ์ ์คํ ์ ํ ํ๋ก์ ํธ์์ ์ ๊ฐ ๋ด๋นํ๋ ํต์ฌ ์์
์ ํฌ๊ฒ ์ธ ๊ฐ์ง์์ต๋๋ค.
์ฒซ์งธ, FOR UPDATE SKIP LOCKED์ pg_notify() ๋ฑ SQL ์ ๊ธ ์ (Locking Clause)๊ณผ ๋ด์ฅ ํจ์๋ฅผ ์ฌ๋ด ORM ํ๋ ์์ํฌ์์ ์ง์ ์ฌ์ฉํ ์ ์๋๋ก ์ง์ํ์ต๋๋ค.
๋์งธ, ๊ธฐ์กด Kafka ์ค์ฌ์ ๋ฉ์์ง ์ธํ๋ผ๋ฅผ PostgreSQL Message Queue(PGMQ)๋ก ์ ํ๋จ์ ๋ฐ๋ผ, ๋ฉ์์ง๋ฅผ “๋ก๊ทธ ์คํธ๋ฆผ”์ด ์๋ ํธ๋์ญ์
๋ฐ์ดํฐ๋ก ๋ค๋ฃจ๋๋ก ๋ณ๊ฒฝํ์์ต๋๋ค.
์
์งธ, ๋ฉ์์ง ์ฒ๋ฆฌ ๊ณผ์ ์์ ์ฒด์ด๋(Chaining) ์ต์
์ ํ์ฉํด Task chaining ์๋ํํ๊ณ , ์ด๋ฅผ ํตํด ๋ฉ์์ง ํ์ด๋ก๋ ํฌ๊ธฐ๋ฅผ ์ค์ด๋ ๋์์ ๋๊ท๋ชจ ์์
์ ์์ Task ๋จ์๋ก ๋ถ๋ฆฌํด ์ ์ฒด ์ฒ๋ฆฌ ์ฑ๊ณต๋ฅ ์ ํฌ๊ฒ ๊ฐ์ (67%)ํ์ต๋๋ค.
์ด๋ฒ ๊ธ์์๋ ์ด๋ฌํ ์ ํ ๊ณผ์ ์์ ํ์ํ๋ ํต์ฌ SQL ํจํด, PGMQ ๊ตฌ์กฐ, ๊ทธ๋ฆฌ๊ณ ์ค์ ์๋น์ค ํ๊ฒฝ์์์ ์ฑ๋ฅ ํฅ์ ์ฌ๋ก๋ฅผ ์ค์ฌ์ผ๋ก ๊ณต์ ํ๊ณ ์ ํฉ๋๋ค.
๐ช์นดํ์นด์ PGMQ ๋?
Apache Kafka
Apache Kafka๋ ๋ณธ์ง์ ์ผ๋ก ๋ถ์ฐ ๋ก๊ทธ(distributed commit log) ์์คํ ์ ๋๋ค. ๋ฉ์์ง ๋ฐ์ดํฐ๋ append-only ๊ตฌ์กฐ๋ก ํํฐ์ ์ ์์ฐจ์ ์ผ๋ก ๊ธฐ๋ก๋๊ณ , ์ปจ์๋จธ๋ ์ค์ค๋ก์ ์คํ์ ์ ๊ธฐ๋ฐ์ผ๋ก ์ฝ๊ธฐ๋ฅผ ์งํํฉ๋๋ค. ์ด ๊ตฌ์กฐ๋ ๋์ ์ฐ๊ธฐ ์ฒ๋ฆฌ๋์ ๋ณด์ฅํ๋ฉฐ, ํํฐ์ ์ ํตํ ์ํ์ ํ์ฅ์ฑ์ด ์ฐ์ํฉ๋๋ค.
์ด๋ฒคํธ ์คํธ๋ฆผ ๋ถ์·๋๊ท๋ชจ ๋ก๊ทธ ์ฒ๋ฆฌ·CDC ์ฒ๋ผ ๋จ์ผ ๋ฉ์์ง ์ฒ๋ฆฌ๋๋ณด๋ค “์ง์์ ์ธ ๋ฐ์ดํฐ ํ๋ฆ”์ ์ฒ๋ฆฌ์ ์ด์ ์ ๋ ๋๋ฉ์ธ์์ Kafka๋ ์๋์ ์ผ๋ก ๊ฐ๋ ฅํฉ๋๋ค. ๊ทธ๋์ ๋ง์ ๊ธฐ์ ๋ค์ ๋ฐ์ดํฐ ํ๋ซํผ์์ Kafka๊ฐ ๊ธฐ๋ณธ ์ธํ๋ผ์ฒ๋ผ ์ฌ์ฉ๋๋ ์ด์ ๋ ์ฌ๊ธฐ์ ์์ต๋๋ค.
์นดํ์นด์ ๋ฐ์ดํฐ ํ๋ฆ์ ๋ ผ๋ฆฌ์ ์ธ ํ ํํ๋ก ํํํ๋ฉด ์๋ ์ด๋ฏธ์ง์ ๊ฐ์ต๋๋ค.

ํ์ง๋ง ๋ ผ๋ฆฌ์ ์ธ ํํ์ผ ๋ฟ ์ค์ ๋ก๋ ์ปจ์๋จธ๊ฐ ๋ฉ์ธ์ง๋ฅผ ๊ฐ์ ธ๊ฐ ์ฒ๋ฆฌํ๋๊ฒ ์๋, ์คํ์ ์ด ์์ง์ด๋ ๊ตฌ์กฐ์ ๋๋ค.
์๋๋ ์ข๋ ๋ฌผ๋ฆฌ์ ์ผ๋ก ๊ฐ๊น์ด ํํ์ ๋๋ค.
Topic Partition (.../CreateScrapingOriginAction)
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ MSG-0 โ MSG-1 โ MSG-2 โ MSG-3 โ MSG-4 โ MSG-5 โ ... โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
↑
Append Only (๋์๋ง ์ถ๊ฐ)
- ํ์ฒ๋ผ ๊ฐ์ฅ ์์์ ๊บผ๋ด๊ฐ๋ ๊ตฌ์กฐ๊ฐ ์๋๋๋ค.
- ํ์ผ ๋์ ๊ณ์ ๊ธฐ๋ก๋๋ ๋ก๊ทธ
- ๋ฉ์์ง๋ “์ญ์ ”๊ฐ ์๋๋ผ retention ์ ์ฑ ์ ์ํด ๋ง๋ฃ
Topic Partition (.../CreateScrapingOriginAction)
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ MSG-0 โ MSG-1 โ MSG-2 โ MSG-3 โ MSG-4 โ MSG-5 โ ... โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โฒ
โ
Consumer Offset
- Consumer๋ ๋ฐ์ดํฐ๋ฅผ “๊ฐ์ ธ๊ฐ๋ ๊ฒ”์ด ์๋๋ผ “์ฝ๊ณ offset์ ์ด๋”
- Consumer๋ MSG-1๊น์ง ์ฝ์์ → offset=1 ์ด๋ผ๋ ์๋ฏธ (Consumer๋ ํ์ผ์ ๋ฐ๋ผ ์ฝ๋ ์ปค์(offset))
- ๋ฉ์์ง๋ ์ฝ์๋ค๊ณ ์ญ์ ํ๋ ๊ฒ์ด ์๋๋ผ ๋จ์ ์์ต๋๋ค. (์ญ์ X)
- ๋์ผํ Consumer Group ๋ด๋ถ์์๋ offset์ ๊ณต์
Topic Partition (.../CreateScrapingOriginAction)
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ MSG-0 โ MSG-1 โ MSG-2 โ MSG-3 โ MSG-4 โ MSG-5 โ MSG-6 โ MSG-7 โ ... โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โฒ โฒ โฒ
โ โ โ
Group A Group B Group C
Offset=1 Offset=3 Offset=5
- Consumer Group ๊ด์ ์์ “๊ฐ ๊ทธ๋ฃน์ด ๋ ๋ฆฝ๋ ์คํ์ (Offset)์ ๊ฐ์ง๋๋ค”
- ๋ฉ์์ง๋ ๋จ ํ ๋ฒ๋ง ์กด์ฌ
- ๊ฐ Consumer Group์ ๋ ๋ฆฝ์ ์ธ Offset ํฌ์ธํฐ๋ฅผ ๊ฐ์ง๋๋ค.
- ๋ค๋ฅธ ์์คํ ์ด ๋์ผ ๋ฐ์ดํฐ๋ฅผ ๋ ๋ฆฝ์ ์ผ๋ก ์ฌ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
PGMQ(PostgreSQL Message Queue)
PGMQ๋ ๋ง ๊ทธ๋๋ก PostgreSQL ์์ ์ค๊ณ๋ ๋ฉ์์ง ํ์
๋๋ค.
Kafka๊ฐ ๋ฉ์์ง๋ฅผ log stream์ผ๋ก ์ทจ๊ธํ๋ค๋ฉด, PGMQ๋ ๋ฉ์์ง๋ฅผ row ๋ฐ์ดํฐ๋ก ์ทจ๊ธํฉ๋๋ค.
์ด ์ฐจ์ด๋ ๋จ์ํ ๊ตฌํ ๋ฐฉ์์ ์ฐจ์์ด ์๋๋ผ, ๋๋ฉ์ธ ๊ตฌ์กฐ์์์ ๋ฉ์์ง ์๋ฏธ ์์ฒด๋ฅผ ๋ฐ๊ฟ๋๋ค.
PGMQ์ ํต์ฌ์ ๋ฉ์์ง๋ ๋ฐ์ดํฐ์ด๊ณ , ๋ฐ์ดํฐ๋ ํธ๋์ญ์
์ผ๋ก ๊ด๋ฆฌ๋ผ์ผ ํ๋ค ์
๋๋ค.
Kafka์์๋ ๋ฉ์์ง ์ฒ๋ฆฌ ์ํ๋ฅผ consumer๊ฐ ์ถ์ ํ์ง๋ง, PGMQ์์๋ ๋ฉ์์ง๊ฐ ํ
์ด๋ธ row๋ก ์กด์ฌํ๋ฉฐ ์ํ ์ ์ด๊ฐ ์์ ํ DB ๋ด๋ถ์์ ๊ด๋ฆฌ๋ฉ๋๋ค.
- insert → ๋ฉ์์ง ์์ฑ
- fetch → ๋๊ธฐ์ค ๋ฉ์์ง ์ ๊ธ(lease)
- ack → ์ฒ๋ฆฌ ์๋ฃ
- retry / dead → ์คํจ or lease timeout
DB๊ฐ ACID๋ฅผ ๋ณด์ฅํ๋ฏ๋ก, ์ ์ฅ๊ณผ ์ฒ๋ฆฌ ์ํ๊ฐ ๋์ผ ์คํ ๋ฆฌ์ง ๋ด์ ์ผ๊ด์ ์ผ๋ก ๋จ๊ฒ ๋ฉ๋๋ค.
์ปจ์๋จธ๊ฐ ์ฃฝ์ด๋, ์๋ฒ๊ฐ ๋กค๋ฐฑ๋์ด๋, worker๊ฐ restart๋์ด๋ ๋ฉ์์ง ์ํ๋ ๊ทธ๋๋ก ์
๋๋ค.
PGMQ๋ ๋
ผ๋ฆฌ์ ์ด๋ฏธ์ง๋ก๋ ํ ํ์์ผ๋ก ํํํ ์ ์์ต๋๋ค.

ํ์ง๋ง ๋ ์ ์์ค์ผ๋ก ํํํ์๋ฉด, ์ ๋ ๊ฒ ํ ํฝ๋ณ๋ก ๋๋์ด์ ธ ์๋ ํ๊ฐ ์๋, ํ ๊ณณ์ ๋ฐ์ดํฐ๊ฐ ๋ค ๊ฐ์ด ์๋ ํ ์ด๋ธ ํ์์ ๋๋ค.

PGMQ๋ ํ ํฝ(Queue)๋ณ๋ก ์ ์ฅ๋ ๋ฉ์์ง row๋ฅผ ์ง์ SELECT ํ์ฌ ๊ฐ์ ธ์ค๋ ๋ฐฉ์์ผ๋ก ๋์ํฉ๋๋ค.
์ด๋ ๋ฉ์์ง๋ ๋จ์ ์กฐํ๊ฐ ์๋๋ผ ์ฒ๋ฆฌ ์ค์ธ row๋ฅผ ๊ฑด๋๋ฐ๋ฉฐ ์ ๊ธ(lock)์ ํ๋ํ์ฌ ์ด์ค ์ฒ๋ฆฌ ๋์ง ์๊ฒ ํด์ผํ๋ ๋ฐ ์ด๋ ํ์ํ ์ ์ด FOR UPDATE SKIP LOCKED ์
๋๋ค.
์ด ์ ์ ํตํด ๋์ผ ํ๋ฅผ ์ฌ๋ฌ Worker๊ฐ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ๋๋ผ๋ ํ๋์ ๋ฉ์์ง๊ฐ ๋ ๋ฒ ์งํ๋์ง ์๋๋ก DB ๋ ๋ฒจ์์ ๋์์ฑ ์ ์ด๋ฅผ ๋ณด์ฅํฉ๋๋ค.
FOR UPDATE SKIP LOCKED
PostgreSQL์ FOR UPDATE SKIP LOCKED๋ ๋์์ฑ ์ฒ๋ฆฌ ํ๊ฒฝ์์ ํน์ ๋ ์ฝ๋(row)๋ฅผ ์ ๊ทธ๋, ์ด๋ฏธ ๋ค๋ฅธ ํธ๋์ญ์ ์ด ์ ๊ทผ ๋ ์ฝ๋๋ฅผ ๋ฌด์ํ๊ณ “์ฒ๋ฆฌ ๊ฐ๋ฅํ ๋ ์ฝ๋๋ง” ๊ฐ์ ธ์ค๋ ๋ฐฉ์์ ์๋ฏธํฉ๋๋ค.
"์ ๊ฒจ ์์ผ๋ฉด ๊ทธ๋ฅ ๊ฑด๋๋ฐ๊ณ ๋ค๋ฅธ row ๊ฐ์ ธ์ฌ๊ฒ" ๋๋์ผ๋ก Kafka๋ Queue์ฒ๋ผ ๋ฉ์์ง๋ฅผ ๋นผ์ ์ฒ๋ฆฌํ๋ ๋๋์ DB row ์ ๊ธ์ผ๋ก ๊ตฌํํ ๊ฒ์ ๋๋ค.
- SELECT๋ก row๋ฅผ ๊ฐ์ ธ์ค๋ฉด์ ๋์์ ์ ๊ทผ๋ค (FOR UPDATE)
- ๋ค์ ์กฐํ์ ์ด๋ฏธ ์ ๊ธด row๋ ๊ฑด๋๋ด๋ค (SKIP LOCKED)
- ์ด๋ก ์ธํด ๋ค๋ฅธ worker๊ฐ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ค๋ณต ์ฒ๋ฆฌํ์ง ์๋๋ค.
- Deadlock ์์ด ๋ณ๋ ฌ ์ฒ๋ฆฌ๋์ ๊ทน๋ํํ๋ค.
SQL์ ์์๋ก ๋ณด์๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
SELECT *
FROM mq_message
WHERE status = 'wait'
ORDER BY process_available_dtm
FOR UPDATE SKIP LOCKED
LIMIT 1;
- ๊ฐ์ฅ ๋จผ์ ์ฒ๋ฆฌ ๊ฐ๋ฅํ ๋ฉ์์ง๋ฅผ ์ ๊ทธ๋ฉด์ ๊ฐ์ ธ์ด
- ์ด๋ฏธ ๋ค๋ฅธ worker๊ฐ ์ฒ๋ฆฌ ์ค์ธ row๋ ์๋์ผ๋ก ์ ์ธ
pg_notify()
pg_notify()๋ PostgreSQL์ด ์ ๊ณตํ๋ ๋น๋๊ธฐ ๋ด์ฅ ํจ์์
๋๋ค. ํน์ ์ฑ๋(channel)์ ๋ฌธ์์ด(payload)์ ๋ณด๋ด๋ฉด,
ํด๋น ์ฑ๋์ ๊ตฌ๋
ํ๊ณ ์๋ ์ธ์
(listener)์๊ฒ ์ด๋ฒคํธ๋ฅผ pushํฉ๋๋ค.
DB ๋ด๋ถ์์ ์ด๋ฒคํธ๋ฅผ ๋ฐํํ๊ณ , ์ด๋ฅผ LISTEN ์ค์ธ ์ ํ๋ฆฌ์ผ์ด์
์ด ์ค์๊ฐ์ผ๋ก ๋ฐ์ ์ ์๊ฒ ํ๋ ํต์ฌ ํจ์์
๋๋ค.
-- ์ด๋ฒคํธ ๊ตฌ๋
(LISTEN)
LISTEN mq_event;
-- ์ด๋ฒคํธ ๋ฐํ (pg_notify)
SELECT pg_notify('message_id', 'meta_data');
pg_notify๋ ๋ฉ์์ง ์ ์ฅ ์์คํ
์ด ์๋๋ผ ์๋ฆผ ์์คํ
์
๋๋ค.
๊ทธ๋์
- ๋ฉ์์ง ๋ณธ๋ฌธ = PGMQ ํ ์ด๋ธ
- ์๋ฆผ signal = pg_notify()
์ด๋ผ๋ ๊ตฌ์กฐ๋ก ์ค๊ณํ๋ ๊ฒ์ด ์ ์์ ๋๋ค.
์ ์ฒด ํ๋ฆ์ ๋ณด์๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
๋ฉ์ธ์ง Row ์ ์ฅ(INSERT) → pg_notify()๋ก ์์ปค ๊นจ์ฐ๊ธฐ → ์์ปค๊ฐ SELECT ... FOR UPDATE SKIP LOCKED
๐๏ธ Kafka์์ PGMQ ๋ก ์ ํ
PGMQ๋ ๋ด๋ถ ๊ตฌํ์ฒด(Producer/Consumer)๋ ์์ ํ ๋ค๋ฅด์ง๋ง, ๋ฉ์์ง๋ฅผ ์ฌ์ฉํ๋ ๊ฐ๋ฐ ๊ฒฝํ์ Kafka์ ๊ฑฐ์ ๋์ผํ๊ฒ ์ ์งํ์ต๋๋ค. ๋ฉ์์ง๋ฅผ ์์ฑํ๊ณ , ("์ฑ๋๋ช
", "ํ ํฝ๋ช
")์ ์ง์ ํ์ฌ ๋ฐํํ๋ฉด ๋ฉ๋๋ค.
Kafka์์๋ ๋์ผํ๊ฒ ์ ์ธํ๊ณ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ธฐ ๋๋ฌธ์, ์ฌ์ฉ์ ์
์ฅ์์๋ ํ์ต ๋น์ฉ ์์ด ์ ํ์ด ๊ฐ๋ฅํ์ต๋๋ค.
๋จ, PGMQ๋ ๋ด๋ถ ๋ฉ์์ง ์ฒ๋ฆฌ ์์ง์ ์ ์์ค๋ถํฐ ์ง์ ๊ตฌํํ๊ธฐ ๋๋ฌธ์, Kafka์์๋ ์ ๊ณต๋์ง ์๊ฑฐ๋ ๊ตฌํ ๋๋๊ฐ ๋งค์ฐ ๋์ ๋ค์ํ ์คํ ์ต์
์ ๋น์ฆ๋์ค ๋ก์ง ๋ ๋ฒจ์์ ์ปค์คํ
ํ ์ ์์์ต๋๋ค.
๋ํ์ ์ผ๋ก Retry ์ปค์คํ , Debounce Limit, Delay ์ปค์คํ , Serial Execution, Chaining, Execute Result ๋ฑ์ด ์์ต๋๋ค.
1๏ธโฃ Retry - ์คํจ ์ ์ฌ์๋ (Worker ๋ ๋ฒจ Retry)
- ์ญํ : Action ์คํ ์ค Exception ๋ฐ์ ์ ์ง์ ํ์๋งํผ ์๋ ์ฌ๋ฐํ.
2๏ธโฃ Debounce Limit - ๊ณผ๋ํ ๋ฐํ ์ ์ด
- ์ญํ : ํน์ workId ๊ธฐ์ค ์ผ์ ๊ธฐ๊ฐ ๋ด ๋ฐํ ํ์๋ฅผ ์ ํ.
3๏ธโฃ Delay - ์ง์ฐ ์คํ / ์์ฝ ์คํ
- ์ญํ : ์์
์ ํน์ ์๊ฐ ์ดํ ๋๋ ๋ช
์์ ์์ ์ ์คํ.
scheduleAfter: N์ด ์ดํ ์คํ
scheduleAt: ํน์ ๋ ์ง/์๊ฐ์ ์คํ
4๏ธโฃ Serial Execution - ๊ทธ๋ฃน ๋จ์ ์์ฐจ ์คํ
- ์ญํ : ์ฌ๋ฌ ๋ฉ์์ง๋ฅผ ํ๋์ ๊ทธ๋ฃน์ผ๋ก ๋ฌถ์ด ์์ฐจ์ ์ผ๋ก ์คํ. “์ ์ฒด ๊ฐ์๋ฅผ ์๊ณ ์๊ณ , ์์๋๋ก ์ฒ๋ฆฌ”
5๏ธโฃ Chaining - ์คํ ๊ฒฐ๊ณผ์ ๋ฐ๋ผ ๋ค์ Task ์ฐ๊ฒฐ
- ์ญํ : ํ์ด์ง๋ค์ด์ ์คํฌ๋ํ / API rate-limit ์ํฉ ๋์ / ์ฅ๊ธฐ ์ํฌํ๋ก์ฐ ๋ถํด
6๏ธโฃ Execute Result - ๋ฉ์์ง ๊ฐ ๊ฒฐ๊ณผ ๊ณต์
- ์ญํ : ์ฐ๊ฒฐ๋ ์์ ๊ทธ๋ฃน ๋ด๋ถ์์ ๊ฒฐ๊ณผ ๊ฐ์ ์ ์ฅ/์กฐํ.
๐ชTask Chaining ๊ธฐ๋ฐ Payload ์ต์ ํ - “๋ฐ์ดํฐ๋ฅผ ์ด๋ฐํ์ง ๋ง๊ณ , ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์ฐธ์กฐํ๊ธฐ”
์ฅ์๊ฐ ์คํ๋๋ ๋์ฉ๋ ์์ (์: ์นด๋ ๋ด์ญ 25~200๊ฑด Scraping, ERP ๋ฐ์ดํฐ ๋ฐฐ์น ๋ณํ ๋ฑ)์ ์ก์ ํ์์์ ์์ด ์ฒ๋ฆฌํ๊ธฐ ์ํด ์์ ์ ์ฌ๋ฌ ๋จ๊ณ(Chunk)๋ก ๋๋์ด ์ฒด์ด๋ํ๋ ๊ฒฝ์ฐ๊ฐ ๋น๋ฒํฉ๋๋ค.
์ด๋ ๋ฉ์์ง ๊ธฐ๋ฐ ์์คํ ์์ Task Chaining ์ ์ค๊ณํ ๋ ๊ฐ์ฅ ์ฝ๊ฒ ๋น ์ง๋ ํจ์ ์ ์ด์ ๋ฉ์์ง์ ๋ชจ๋ ๊ฒฐ๊ณผ๋ฅผ ๋ค์ ๋ฉ์์ง payload์ ์ฃ๋ ๋ฐฉ์์ ๋๋ค.
๊ธฐ์กด ์ฌ๋ด Kafka ์์คํ
์ ์ฌ์ฉํ ๋๋ ์ด๋ฐ ์ ๊ทผ ๋ฐฉ๋ฒ์ด ํ์ฐ์ ์ด์์ต๋๋ค.
Kafka๋ ๋ฉ์์ง๋ฅผ append-only log๋ก ์ ์ฅํ ๋ฟ, ์ค๊ฐ ์ํ๋ฅผ ์์ฒด์ ์ผ๋ก ๋ณด๊ดํ๊ฑฐ๋ ๋ค์ task๊ฐ ์ด์ ๋ฉ์์ง์ ์คํ ๊ฒฐ๊ณผ๋ฅผ ์กฐํํ ์ ์๋ ๊ตฌ์กฐ๋ฅผ ์ ๊ณตํ์ง ์๊ธฐ ๋๋ฌธ์
๋๋ค.
๊ฒฐ๊ณผ์ ์ผ๋ก ๊ฐ๋ฐ์๋ ์ฒด์ธ์ด ๊น์ด์ง์๋ก payload๋ฅผ ๊ณ์ ๋์ ์ํค๊ฒ ๋ฉ๋๋ค.
- ์ฒซ ๋ฒ์งธ ๋ฉ์์ง ๊ฒฐ๊ณผ → ๋ค์ ๋ฉ์์ง payload์ embed
- ๋ ๋ฒ์งธ ๋ฉ์์ง ๊ฒฐ๊ณผ + ์ด์ ๊ฒฐ๊ณผ → ๋ค์ ๋ฉ์์ง payload embed
- ์ธ ๋ฒ์งธ ๋ฉ์์ง ๊ฒฐ๊ณผ + ์ด์ 2๊ฐ์ ๊ฒฐ๊ณผ → ๋ค์ ๋ฉ์์ง payload embed
์ด ๋ฐฉ์์ ๊ณฑ์
์ ์ฆ๊ฐ๋ฅผ ๋ง๋ค์ด๋
๋๋ค.
๊ฐ ๋จ๊ณ์์ ๊ฐ์ ธ์จ ๊ฒฐ๊ณผ๋ฅผ ๊ณ์ payload์ ๋์ ์ํค๋ฉด, ์ฒด์ธ์ด ๊ธธ์ด์ง์๋ก ๋ฉ์์ง ํฌ๊ธฐ๊ฐ ๊ธฐํ๊ธ์์ ์ผ๋ก ์ฆ๊ฐํ๊ฒ ๋๊ณ , ์ด๋ ๋คํธ์ํฌ ์ ์ก ๋น์ฉ · JSON deserialize ๋น์ฉ · Worker ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋ · GC ๋น์ฉ ๋ฑ ์์คํ
์ ์ฒด์ ์
์ํฅ์ ๋ฏธ์นฉ๋๋ค.
โ ์์) ๊ธฐ์กด ๋ฐฉ์์ ์ฒด์ด๋ ๋ฉ์ธ์ง ๋ชจ์ต
- ์ฒซ๋ฒ์งธ ๋ฉ์ธ์ง์ payload
{ card_no: 'xxxx-xxxx-xxxx-1234' }
- ๋๋ฒ์งธ ๋ฉ์ธ์ง์ payload
{
card_no: 'xxxx-xxxx-xxxx-1234',
result: [
{ date: '20250101', amount: 20100, store: 'GS25' },
{ date: '20250102', amount: 15000, store: 'CU' },
]
}
- N๋ฒ์งธ ๋ฉ์ธ์ง์ payload
{
card_no: 'xxxx-xxxx-xxxx-1234',
result: [
{ date: '20250101', amount: 20100, store: 'GS25' },
{ date: '20250102', amount: 15000, store: 'CU' },
... (N๊ฑด)
]
}
PGMQ ์ Chaining ์ต์ ์ ํ์ฉํด ์ด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ์ต๋๋ค.
์ด ์ต์
์ ์ฌ์ฉํ๋ฉด ๋ฉ์์ง๋ ์คํ ํ๋ฆ๋ง ์ ๋ฌํ๊ณ , ๊ฒฐ๊ณผ๋ ํ ๋ด๋ถ ์ํ๋ก ์ ์ฅํ๊ฒ ๋ฉ๋๋ค.
๊ฒฐ๊ตญ ๋ฉ์์ง๋ฅผ “๋ฐ์ดํฐ ๋ฐฐ์ก ์๋จ”์ด ์๋๋ผ “์คํ ํธ๋ฆฌ๊ฑฐ(Action)์ Context๋ฅผ ์ ๋ฌํ๋ ์๋จ”์ผ๋ก ๋ณผ ์ ์๊ฒ ๋ฉ๋๋ค.
Task Chaining์ ๋ค์๊ณผ ๊ฐ์ด ๋์ํฉ๋๋ค
- ์ปจ์๋จธ๊ฐ ์์ ์ ์ํ
- ๊ฒฐ๊ณผ๋ฅผ ๋ฉ์์ง state์ ์ ์ฅ (PGMQ ๋ด๋ถ result ์ ์ฅ)
- ๋ค์ ๋ฉ์์ง๊ฐ ๋ฐํ๋ ๋ payload์๋ ๊ฒฐ๊ณผ๋ฅผ ๋ฃ์ง ์๋๋ค
- ์ปจ์๋จธ์์ ์คํ๋๋ ๋ง์ง๋ง ํ ํฝ์์ “์ ์ฅ๋ ๊ฒฐ๊ณผ”๋ฅผ ์กฐํ
๐์ฑ๊ณผ

๊ตญ๋ฏผ ๊ฐ์ธ ์นด๋ ์์ง์ ์์๋ก ๋ค์์ ๋, Scraping ๊ฒฐ๊ณผ๊ฐ 25๊ฑด๋ง ๋์ด๋ Kafka ๋ฐฉ์์์๋ ๋ค์ ๋จ๊ณ ๋ฉ์์ง payload๊ฐ 11,000 bytes๊น์ง ์ฆ๊ฐํ์ต๋๋ค.
์ด๋ ๊ฒฐ๊ณผ ๋ฐ์ดํฐ ์ ์ฒด(JSON array + ํ๋ ์ด๋ฆ + ์ค์ฒฉ ๊ตฌ์กฐ)๋ฅผ ๊ทธ๋๋ก payload์ ์ฃ๊ธฐ ๋๋ฌธ์
๋๋ค.
card scraping์ด 100๊ฑด, 300๊ฑด์ด ๋๋ฉด ํฌ๊ธฐ๋ ์ ํ์ด ์๋๋ผ ๊ฑฐ์ ๋์ ํญ๋ฐ ์์ค์ผ๋ก ์ปค์ง๋๋ค.

๋ฐ๋ฉด PGMQ Task Chaining ๋ฐฉ์์ ์ ์ฉํ๋ฉด, ๋์ผ ์กฐ๊ฑด(๊ฐ์ ์นด๋·๊ฐ์ ๊ธฐ๊ฐ)์ ์์ ์์๋ ๋ค์ ์ฒด์ด๋ ๋ฉ์์ง ํฌ๊ธฐ๋ 3,609 bytes์ ๋ถ๊ณผํ์ต๋๋ค.
payload ๋ฐ์ดํฐ ํฌ๊ธฐ๋ฅผ ์ฝ 67% ์ ๊ฐํ ๊ฒฐ๊ณผ์ ๋๋ค.
์ด๋ฐ ์ฑ๋ฅ ๊ฐ์ ์ ๋จ์ํ ์ต์ ํ๊ฐ ์๋๋ผ,
- Worker ์ฒ๋ฆฌ์๋ ๊ฐ์
- JSON parse ๋น์ฉ ๊ฐ์
- ๋คํธ์ํฌ ํธ๋ํฝ ํฌ๊ธฐ ๊ฐ์
- CPU/GC ๊ฐ์
- ์์คํ ์์ ์ฑ ํฅ์
์ผ๋ก ์ด์ด์ง๋๋ค. ๋๋ถ์ด ์ด ๋ชจ๋ ๊ฒ์ด ์ฒด์ธ ๊ธธ์ด๊ฐ ๊ธธ์ด์ง์๋ก ๋ ๋น์ ๋ฐํฉ๋๋ค.
โจ๋ง๋ฌด๋ฆฌ
Kafka์์๋ ๋ฉ์์ง๊ฐ “๋ฐ์ดํฐ๋ฅผ ์ด๋ฐํ๋ ์ปจํ
์ด๋”์๊ธฐ ๋๋ฌธ์, ์ฒด์ด๋์ด ๊น์ด์ง์๋ก payload๊ฐ ๋์ ๋๊ณ ์์คํ
๋ฆฌ์์ค ๋ถ๋ด์ด ์ปค์ง ์๋ฐ์ ์์์ต๋๋ค.
๋ฐ๋ฉด PGMQ๋ ๋ฉ์์ง๋ฅผ “์์
๋จ์(Task)”๋ก ์ฌ์ ์ํ๊ณ , ๊ฒฐ๊ณผ๋ ๋ด๋ถ ์ํ๋ก ์ ์ฅํ๋ ๊ตฌ์กฐ๋ฅผ ์ทจํจ์ผ๋ก์จ ์ฒด์ด๋ ๋จ๊ณ๊ฐ ๋์ด๋๋๋ผ๋ payload๋ ์ต์ ์์ค์ผ๋ก ์ ์ง๋ฉ๋๋ค.
์ด๋ ๋จ์ง ๋ฉ์์ง ํฌ๊ธฐ๋ฅผ ์ค์๋ค๋ ์์ค์ ๊ฐ์ ์ด ์๋๋ผ, ๋ฉ์์ง ๋ชจ๋ธ ์์ฒด๋ฅผ ‘๋ฐ์ดํฐ ์ด๋ฐ’์์ ‘์
๋ฌด ์ํ ์ ์ฅ’์ผ๋ก ์ ํํ ๊ฒฐ๊ณผ์
๋๋ค.
์ด ๊ตฌ์กฐ๊ฐ ์ฅ๊ธฐ์ ์ผ๋ก ๋ ๋ง์ ์ฒ๋ฆฌ๋, ๋ ์์ ์ ์ธ Worker, ๋ ๋ฎ์ ์ด์ ๋น์ฉ์ ๋ณด์ฅํฉ๋๋ค.
๊ฒฐ๊ตญ PGMQ Task Chaining์ “์คํ์ ํ๋ฆ๋ง ์ ๋ฌํ๊ณ ๋ฐ์ดํฐ๋ ์์ ํ๊ฒ ์ ์ฅํ๋ค”๋ ์ค๊ณ๋ฅผ ์ค์ฌ์ผ๋ก, ๋๊ท๋ชจ Scraping·Batch·ERP ์ฐ๊ณ ์
๋ฌด์์ ์ง์ ๊ฐ๋ฅํ ๋ฉ์์ง ์์คํ
์ ๊ตฌ์ถํ๋ ํต์ฌ ์ ๋ต์์ ํ์ธํ ์ ์์์ต๋๋ค.
'Message Queue' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
| Kafka ์ RabbitMQ ์ ์ฐจ์ด์ (0) | 2024.11.20 |
|---|
๋๊ธ