반응형
BlockingCollection은 “스레드를 대신하는 것”이 아니라,
“스레드/Task 사이에서 안전하게 데이터를 주고받기 위한 큐(컨테이너)”입니다.
그래서:
- Thread ≠ BlockingCollection
- 보통은 Thread/Task + BlockingCollection를 같이 씁니다.
- 대신, 예전처럼 while(true) { lock(queue) ... Monitor.Wait(...) } 이런 걸 직접 짜던 걸 BlockingCollection으로 대체하는 것이고, 그게 훨씬 유리합니다.
아래에서 차근차근 정리해 보겠습니다.
1. Thread와 BlockingCollection의 역할 차이
1) Thread의 역할
- 실행 흐름(코드를 실제로 돌리는 주체)
- new Thread(...), Task.Run(...) 등으로 만들고, 루프 돌리면서 일을 처리
- I/O 대기, CPU 연산 등 “일 자체”를 수행
장점:
- 아주 낮은 레벨까지 컨트롤 가능 (우선순위, ApartmentState 등)
- Legacy 코드에서 익숙함
단점:
- 직접 만들고, 직접 종료 제어, 예외 처리, 동기화(락, Monitor, AutoResetEvent 등)를 전부 스스로 처리해야 함
- 코드가 쉽게 복잡해지고, 버그(데드락, 레이스 컨디션)가 생기기 쉽다.
2) BlockingCollection의 역할
- “안전하게 데이터를 넣고/빼는 큐” + “필요할 때까지 기다렸다가 꺼내는 기능(Blocking)”
- 내부적으로는 ConcurrentQueue<T> 같은 컬렉션을 감싸고 있고,
- Add, Take, GetConsumingEnumerable 같은 API를 제공
장점:
- 락(lock), Monitor.Wait/Pulse를 직접 안 써도 됨
- 생산자-소비자(Producer/Consumer) 패턴 구현이 매우 쉬움
- BoundedCapacity 설정으로 큐가 너무 커지는 것(메모리 폭주)을 막아줌
- 여러 스레드/Task가 동시에 Add / Take 해도 thread-safe
단점:
- “스레드를 만드는 역할”은 아니다. 소비할 스레드/Task는 여전히 필요
- .NET Core 이후에는 System.Threading.Channels가 더 현대적인 대안이긴 하지만, C# 7.3 + .NET Framework 환경에서는 여전히 BlockingCollection이 좋은 선택
2. “Thread를 BlockingCollection으로 대체할 수 있나?”에 대한 답
1) 엄밀히 말하면 “대체” 개념이 다릅니다
- Thread: “코드를 실행하는 주체”
- BlockingCollection: “데이터를 여러 Thread/Task가 안전하게 공유하는 도구”
그래서:
- Thread 자체를 BlockingCollection으로 바꿀 수는 없습니다.
- 하지만 예전에 많이 쓰던 이런 패턴:
Queue<Job> queue = new Queue<Job>();
object _lock = new object();
bool _running = true;
void Worker()
{
while (_running)
{
Job job = null;
lock (_lock)
{
while (queue.Count == 0)
{
Monitor.Wait(_lock); // 대기
}
job = queue.Dequeue();
}
Process(job);
}
}
이런 구조를
BlockingCollection<Job> queue = new BlockingCollection<Job>();
void Worker()
{
foreach (var job in queue.GetConsumingEnumerable())
{
Process(job);
}
}
이렇게 BlockingCollection 기반으로 대체할 수 있습니다.
즉,
“직접 락 + Queue + Monitor로 만든 생산자-소비자 코드를 BlockingCollection으로 대체하는 것이 좋다”
(Thread/Task는 그대로 사용)
3. 왜 BlockingCollection을 써야 하는가?
정리하면 다음과 같습니다.
- 코드 양 감소 + 가독성 향상
- Lock, Monitor, AutoResetEvent, ManualResetEvent 등 직접 다룰 필요가 줄어듭니다.
- foreach (var item in collection.GetConsumingEnumerable()) 한 줄로 "대기 + dequeue + 종료 처리"까지 처리.
- 자연스러운 종료 패턴 제공
- CompleteAdding() 한 번만 호출하면,
GetConsumingEnumerable() 루프가 자연스럽게 끝납니다. - 직접 “종료 플래그 + PulseAll” 같은 걸 만들 필요가 없음.
- CompleteAdding() 한 번만 호출하면,
- Bounded Capacity로 Back Pressure 구현
- new BlockingCollection<T>(capacity) 로 만들면,
큐가 꽉 찼을 때 Add가 자동으로 블록(block) 되므로
생산자가 너무 빨리 생산해서 메모리가 터지는 상황을 막을 수 있습니다.
- new BlockingCollection<T>(capacity) 로 만들면,
- 멀티 소비자 / 멀티 생산자 지원
- 여러 Task가 동시에 Add/Take 해도 thread-safe
- 멀티코어 환경에서 성능 좋고, 설계가 간단해짐
머신비전처럼:
- 카메라가 미친 듯이 이미지를 쏟아내고
- 검사/DB/로그가 뒤에서 처리하는 구조에서는
BlockingCollection + 여러 소비자 Task 구조가 가장 자연스럽고, 튼튼합니다.
4. BlockingCollection 입문: 단계별 예제
모두 C# 7.3 기준입니다.
STEP 1. 가장 기본적인 생산자-소비자 (단일 소비자)
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class BasicExample
{
public void Run()
{
var collection = new BlockingCollection<int>();
// 소비자 Task
var consumer = Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consume: " + item);
Thread.Sleep(100); // 처리 시간 흉내
}
Console.WriteLine("Consumer finished.");
});
// 생산자
for (int i = 0; i < 10; i++)
{
Console.WriteLine("Produce: " + i);
collection.Add(i);
}
// 더 이상 안 넣겠다고 알림
collection.CompleteAdding();
consumer.Wait();
}
}
동작 설명
- Add는 큐에 값 넣기
- GetConsumingEnumerable()는
- 큐에 값이 없으면 자동으로 기다렸다가, 값이 생기면 꺼냄
- CompleteAdding() + 큐가 비면 루프 종료
STEP 2. 멀티 소비자 (스레드 여러 개로 병렬 처리)
class MultiConsumerExample
{
public void Run()
{
var collection = new BlockingCollection<int>();
int workerCount = 3;
var consumers = new Task[workerCount];
for (int i = 0; i < workerCount; i++)
{
int workerId = i;
consumers[i] = Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"[Worker {workerId}] consume {item}");
Thread.Sleep(100);
}
Console.WriteLine($"[Worker {workerId}] finished.");
});
}
// 생산자
for (int i = 0; i < 20; i++)
{
Console.WriteLine("Produce: " + i);
collection.Add(i);
}
collection.CompleteAdding();
Task.WaitAll(consumers);
}
}
특징
- 동일한 BlockingCollection을 여러 소비자가 동시에 읽어도 자동으로 나눠 가집니다.
- 비전 검사에서 “Unit 검사 워커 여러 개” 만들 때 이런 방식으로 사용할 수 있습니다.
STEP 3. CancellationToken과 함께 쓰기
실제 장비에서는 정상 종료 / 긴급 정지가 중요합니다.
class CancellationExample
{
public void Run(CancellationToken token)
{
var collection = new BlockingCollection<int>();
// 소비자
var consumer = Task.Run(() =>
{
try
{
foreach (var item in collection.GetConsumingEnumerable(token))
{
Console.WriteLine("Consume: " + item);
Thread.Sleep(100);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Consumer cancelled.");
}
}, token);
// 생산자
var producer = Task.Run(() =>
{
int i = 0;
try
{
while (!token.IsCancellationRequested)
{
collection.Add(i, token);
Console.WriteLine("Produce: " + i);
i++;
Thread.Sleep(50);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Producer cancelled.");
}
finally
{
collection.CompleteAdding();
}
}, token);
Task.WaitAll(producer, consumer);
}
}
STEP 4. Bounded Capacity (큐가 너무 커지지 않게 제어)
머신비전 환경에서 카메라 > 검사속도인 경우,
가만히 두면 큐에 Mat이 수천 개 쌓이면서 메모리 터질 수 있습니다.
class BoundedExample
{
public void Run()
{
// 최대 5개까지만 큐에 쌓이도록 제한
var collection = new BlockingCollection<int>(boundedCapacity: 5);
// 소비자
var consumer = Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consume: " + item);
Thread.Sleep(500); // 느린 소비자
}
});
// 생산자
var producer = Task.Run(() =>
{
for (int i = 0; i < 20; i++)
{
Console.WriteLine("Try Produce: " + i);
collection.Add(i); // 큐가 꽉 차면 여기서 BLOCK됨
Console.WriteLine("Produced: " + i);
}
collection.CompleteAdding();
});
Task.WaitAll(producer, consumer);
}
}
중요 포인트
- 소비자가 느리면, 큐가 5개까지 차고
- 6번째 Add 호출에서, 빈 공간이 생길 때까지 대기(블록)
- 즉, 카메라(생산자)의 속도를 검사/DB(소비자) 속도에 맞춰 자연스럽게 제한(Back Pressure)
STEP 5. TryTake / TryAdd (타임아웃 기반 폴링)
- 때로는 “무한 대기”가 아니라 “1초 기다려보고 없으면 다른 일 하기”가 필요할 수 있습니다.
class TryTakeExample
{
public void Run()
{
var collection = new BlockingCollection<int>();
// 생산자
var producer = Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
collection.Add(i);
Console.WriteLine("Produced: " + i);
Thread.Sleep(500);
}
collection.CompleteAdding();
});
// 소비자
Task.Run(() =>
{
while (!collection.IsCompleted)
{
int item;
if (collection.TryTake(out item, TimeSpan.FromMilliseconds(200)))
{
Console.WriteLine("Consume: " + item);
}
else
{
Console.WriteLine("No item. Do other work...");
}
}
Console.WriteLine("Consumer finished.");
}).Wait();
}
}
5. 머신비전 환경에 어떻게 적용?
당신 장비 구조(라인스캔 + 검사 + DB + 로그)에 그대로 매핑하면:
- 카메라 → 병합 → MergedFrameQueue
- 카메라 콜백에서 병합 빌더에 라인 추가
- 트레이 완료 시 BlockingCollection<MergedFrame>에 Add
- BoundedCapacity로 설정해서 프레임 폭주 방지
- MergedFrameQueue → InspectionService 워커들
- 여러 Task가 GetConsumingEnumerable()로 병합 이미지를 소비
- Unit 크롭 + 룰 + AI 검사 수행
- 결과를 BlockingCollection<InspectionResult>에 Add
- InspectionResultQueue → DbUploadService
- 1개 또는 2개 Task가 InspectionResult를 읽어서 DB Insert/Update
- DB 쿼리는 ExecuteNonQueryAsync 같은 async로 처리
- LogQueue → AsyncLogManager
- 모든 서비스에서 Log.Info/Debug/Error 호출만 해주면,
- 내부적으로 BlockingCollection<LogItem>에 Add
- LogService 한 개 Task가 파일에 순차적으로 기록 (스레드 안전 + 고속)
이 구조를 쓰면:
- Thread를 직접 관리하는 부분은 거의 없고,
- Task.Run + BlockingCollection 조합으로 모든 스레드 작업을 통일할 수 있습니다.
- 코드가 훨씬 읽기 좋고, 장애/종료 제어도 쉬워집니다.
6. 결론
- BlockingCollection은 Thread/Task를 “대체”하는 게 아니라,
Thread/Task 간 데이터 전달을 단순하고 안전하게 만들어 주는 도구입니다. - 기존의
- Queue<T> + lock + Monitor.Wait/Pulse + 종료 플래그
같은 코드를 BlockingCollection 하나로 깔끔하게 대체할 수 있습니다.
- Queue<T> + lock + Monitor.Wait/Pulse + 종료 플래그
- 머신비전 프로젝트처럼
- 고속 스트림(라인스캔)
- 병렬 검사
- DB 업로드
- 비동기 로그
가 동시에 돌아가는 환경에서는
조합이 구조/안정성/확장성 면에서 가장 유리합니다.
반응형