본문 바로가기
카테고리 없음

BlockingCollection은 “스레드를 대신하는 것”이 아니라,“스레드/Task 사이에서 안전하게 데이터를 주고받기 위한 큐(컨테이너)”입니다.

by 공부봇 2025. 11. 7.
반응형

BlockingCollection은 “스레드를 대신하는 것”이 아니라,
“스레드/Task 사이에서 안전하게 데이터를 주고받기 위한 큐(컨테이너)”입니다.

그래서:

  • Thread ≠ BlockingCollection
  • 보통은 Thread/Task + BlockingCollection를 같이 씁니다.
  • 대신, 예전처럼 while(true) { lock(queue) ... Monitor.Wait(...) } 이런 걸 직접 짜던 걸 BlockingCollection으로 대체하는 것이고, 그게 훨씬 유리합니다.

아래에서 차근차근 정리해 보겠습니다.


1. Thread와 BlockingCollection의 역할 차이

1) Thread의 역할

  • 실행 흐름(코드를 실제로 돌리는 주체)
  • new Thread(...), Task.Run(...) 등으로 만들고, 루프 돌리면서 일을 처리
  • I/O 대기, CPU 연산 등 “일 자체”를 수행

장점:

  • 아주 낮은 레벨까지 컨트롤 가능 (우선순위, ApartmentState 등)
  • Legacy 코드에서 익숙함

단점:

  • 직접 만들고, 직접 종료 제어, 예외 처리, 동기화(락, Monitor, AutoResetEvent 등)를 전부 스스로 처리해야 함
  • 코드가 쉽게 복잡해지고, 버그(데드락, 레이스 컨디션)가 생기기 쉽다.

2) BlockingCollection의 역할

  • “안전하게 데이터를 넣고/빼는 큐” + “필요할 때까지 기다렸다가 꺼내는 기능(Blocking)”
  • 내부적으로는 ConcurrentQueue<T> 같은 컬렉션을 감싸고 있고,
  • Add, Take, GetConsumingEnumerable 같은 API를 제공

장점:

  • 락(lock), Monitor.Wait/Pulse를 직접 안 써도 됨
  • 생산자-소비자(Producer/Consumer) 패턴 구현이 매우 쉬움
  • BoundedCapacity 설정으로 큐가 너무 커지는 것(메모리 폭주)을 막아줌
  • 여러 스레드/Task가 동시에 Add / Take 해도 thread-safe

단점:

  • “스레드를 만드는 역할”은 아니다. 소비할 스레드/Task는 여전히 필요
  • .NET Core 이후에는 System.Threading.Channels가 더 현대적인 대안이긴 하지만, C# 7.3 + .NET Framework 환경에서는 여전히 BlockingCollection이 좋은 선택

2. “Thread를 BlockingCollection으로 대체할 수 있나?”에 대한 답

1) 엄밀히 말하면 “대체” 개념이 다릅니다

  • Thread: “코드를 실행하는 주체”
  • BlockingCollection: “데이터를 여러 Thread/Task가 안전하게 공유하는 도구”

그래서:

  • Thread 자체를 BlockingCollection으로 바꿀 수는 없습니다.
  • 하지만 예전에 많이 쓰던 이런 패턴:
Queue<Job> queue = new Queue<Job>();
object _lock = new object();
bool _running = true;

void Worker()
{
    while (_running)
    {
        Job job = null;
        lock (_lock)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(_lock); // 대기
            }
            job = queue.Dequeue();
        }

        Process(job);
    }
}

이런 구조를

BlockingCollection<Job> queue = new BlockingCollection<Job>();

void Worker()
{
    foreach (var job in queue.GetConsumingEnumerable())
    {
        Process(job);
    }
}

이렇게 BlockingCollection 기반으로 대체할 수 있습니다.

즉,

“직접 락 + Queue + Monitor로 만든 생산자-소비자 코드를 BlockingCollection으로 대체하는 것이 좋다”
(Thread/Task는 그대로 사용)


3. 왜 BlockingCollection을 써야 하는가?

정리하면 다음과 같습니다.

  1. 코드 양 감소 + 가독성 향상
    • Lock, Monitor, AutoResetEvent, ManualResetEvent 등 직접 다룰 필요가 줄어듭니다.
    • foreach (var item in collection.GetConsumingEnumerable()) 한 줄로 "대기 + dequeue + 종료 처리"까지 처리.
  2. 자연스러운 종료 패턴 제공
    • CompleteAdding() 한 번만 호출하면,
      GetConsumingEnumerable() 루프가 자연스럽게 끝납니다.
    • 직접 “종료 플래그 + PulseAll” 같은 걸 만들 필요가 없음.
  3. Bounded Capacity로 Back Pressure 구현
    • new BlockingCollection<T>(capacity) 로 만들면,
      큐가 꽉 찼을 때 Add가 자동으로 블록(block) 되므로
      생산자가 너무 빨리 생산해서 메모리가 터지는 상황을 막을 수 있습니다.
  4. 멀티 소비자 / 멀티 생산자 지원
    • 여러 Task가 동시에 Add/Take 해도 thread-safe
    • 멀티코어 환경에서 성능 좋고, 설계가 간단해짐

머신비전처럼:

  • 카메라가 미친 듯이 이미지를 쏟아내고
  • 검사/DB/로그가 뒤에서 처리하는 구조에서는

BlockingCollection + 여러 소비자 Task 구조가 가장 자연스럽고, 튼튼합니다.


4. BlockingCollection 입문: 단계별 예제

모두 C# 7.3 기준입니다.

STEP 1. 가장 기본적인 생산자-소비자 (단일 소비자)

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BasicExample
{
    public void Run()
    {
        var collection = new BlockingCollection<int>();

        // 소비자 Task
        var consumer = Task.Run(() =>
        {
            foreach (var item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine("Consume: " + item);
                Thread.Sleep(100); // 처리 시간 흉내
            }
            Console.WriteLine("Consumer finished.");
        });

        // 생산자
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("Produce: " + i);
            collection.Add(i);
        }

        // 더 이상 안 넣겠다고 알림
        collection.CompleteAdding();

        consumer.Wait();
    }
}

동작 설명

  • Add는 큐에 값 넣기
  • GetConsumingEnumerable()는
    • 큐에 값이 없으면 자동으로 기다렸다가, 값이 생기면 꺼냄
    • CompleteAdding() + 큐가 비면 루프 종료

STEP 2. 멀티 소비자 (스레드 여러 개로 병렬 처리)

class MultiConsumerExample
{
    public void Run()
    {
        var collection = new BlockingCollection<int>();

        int workerCount = 3;
        var consumers = new Task[workerCount];

        for (int i = 0; i < workerCount; i++)
        {
            int workerId = i;
            consumers[i] = Task.Run(() =>
            {
                foreach (var item in collection.GetConsumingEnumerable())
                {
                    Console.WriteLine($"[Worker {workerId}] consume {item}");
                    Thread.Sleep(100);
                }
                Console.WriteLine($"[Worker {workerId}] finished.");
            });
        }

        // 생산자
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Produce: " + i);
            collection.Add(i);
        }

        collection.CompleteAdding();
        Task.WaitAll(consumers);
    }
}

특징

  • 동일한 BlockingCollection을 여러 소비자가 동시에 읽어도 자동으로 나눠 가집니다.
  • 비전 검사에서 “Unit 검사 워커 여러 개” 만들 때 이런 방식으로 사용할 수 있습니다.

STEP 3. CancellationToken과 함께 쓰기

실제 장비에서는 정상 종료 / 긴급 정지가 중요합니다.

class CancellationExample
{
    public void Run(CancellationToken token)
    {
        var collection = new BlockingCollection<int>();

        // 소비자
        var consumer = Task.Run(() =>
        {
            try
            {
                foreach (var item in collection.GetConsumingEnumerable(token))
                {
                    Console.WriteLine("Consume: " + item);
                    Thread.Sleep(100);
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Consumer cancelled.");
            }
        }, token);

        // 생산자
        var producer = Task.Run(() =>
        {
            int i = 0;
            try
            {
                while (!token.IsCancellationRequested)
                {
                    collection.Add(i, token);
                    Console.WriteLine("Produce: " + i);
                    i++;
                    Thread.Sleep(50);
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Producer cancelled.");
            }
            finally
            {
                collection.CompleteAdding();
            }
        }, token);

        Task.WaitAll(producer, consumer);
    }
}

STEP 4. Bounded Capacity (큐가 너무 커지지 않게 제어)

머신비전 환경에서 카메라 > 검사속도인 경우,
가만히 두면 큐에 Mat이 수천 개 쌓이면서 메모리 터질 수 있습니다.

class BoundedExample
{
    public void Run()
    {
        // 최대 5개까지만 큐에 쌓이도록 제한
        var collection = new BlockingCollection<int>(boundedCapacity: 5);

        // 소비자
        var consumer = Task.Run(() =>
        {
            foreach (var item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine("Consume: " + item);
                Thread.Sleep(500); // 느린 소비자
            }
        });

        // 생산자
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 20; i++)
            {
                Console.WriteLine("Try Produce: " + i);
                collection.Add(i); // 큐가 꽉 차면 여기서 BLOCK됨
                Console.WriteLine("Produced: " + i);
            }
            collection.CompleteAdding();
        });

        Task.WaitAll(producer, consumer);
    }
}

중요 포인트

  • 소비자가 느리면, 큐가 5개까지 차고
  • 6번째 Add 호출에서, 빈 공간이 생길 때까지 대기(블록)
  • 즉, 카메라(생산자)의 속도를 검사/DB(소비자) 속도에 맞춰 자연스럽게 제한(Back Pressure)

STEP 5. TryTake / TryAdd (타임아웃 기반 폴링)

  • 때로는 “무한 대기”가 아니라 “1초 기다려보고 없으면 다른 일 하기”가 필요할 수 있습니다.
class TryTakeExample
{
    public void Run()
    {
        var collection = new BlockingCollection<int>();

        // 생산자
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 5; i++)
            {
                collection.Add(i);
                Console.WriteLine("Produced: " + i);
                Thread.Sleep(500);
            }
            collection.CompleteAdding();
        });

        // 소비자
        Task.Run(() =>
        {
            while (!collection.IsCompleted)
            {
                int item;
                if (collection.TryTake(out item, TimeSpan.FromMilliseconds(200)))
                {
                    Console.WriteLine("Consume: " + item);
                }
                else
                {
                    Console.WriteLine("No item. Do other work...");
                }
            }

            Console.WriteLine("Consumer finished.");
        }).Wait();
    }
}

5. 머신비전 환경에 어떻게 적용?

당신 장비 구조(라인스캔 + 검사 + DB + 로그)에 그대로 매핑하면:

  1. 카메라 → 병합 → MergedFrameQueue
    • 카메라 콜백에서 병합 빌더에 라인 추가
    • 트레이 완료 시 BlockingCollection<MergedFrame>에 Add
    • BoundedCapacity로 설정해서 프레임 폭주 방지
  2. MergedFrameQueue → InspectionService 워커들
    • 여러 Task가 GetConsumingEnumerable()로 병합 이미지를 소비
    • Unit 크롭 + 룰 + AI 검사 수행
    • 결과를 BlockingCollection<InspectionResult>에 Add
  3. InspectionResultQueue → DbUploadService
    • 1개 또는 2개 Task가 InspectionResult를 읽어서 DB Insert/Update
    • DB 쿼리는 ExecuteNonQueryAsync 같은 async로 처리
  4. LogQueue → AsyncLogManager
    • 모든 서비스에서 Log.Info/Debug/Error 호출만 해주면,
    • 내부적으로 BlockingCollection<LogItem>에 Add
    • LogService 한 개 Task가 파일에 순차적으로 기록 (스레드 안전 + 고속)

이 구조를 쓰면:

  • Thread를 직접 관리하는 부분은 거의 없고,
  • Task.Run + BlockingCollection 조합으로 모든 스레드 작업을 통일할 수 있습니다.
  • 코드가 훨씬 읽기 좋고, 장애/종료 제어도 쉬워집니다.

6. 결론

  1. BlockingCollection은 Thread/Task를 “대체”하는 게 아니라,
    Thread/Task 간 데이터 전달을 단순하고 안전하게 만들어 주는 도구
    입니다.
  2. 기존의
    • Queue<T> + lock + Monitor.Wait/Pulse + 종료 플래그
      같은 코드를 BlockingCollection 하나로 깔끔하게 대체할 수 있습니다.
  3. 머신비전 프로젝트처럼
    • 고속 스트림(라인스캔)
    • 병렬 검사
    • DB 업로드
    • 비동기 로그
      가 동시에 돌아가는 환경에서는
    Task + BlockingCollection + CancellationToken + BoundedCapacity
    조합이 구조/안정성/확장성 면에서 가장 유리합니다.

 

반응형