C#でProducer-Consumerパターンのコレクションを長時間維持し、たまに生成/解除する

ただの個人的な調査結果です。

C#で、次のような要求があると仮定します。

  • 1つ以上のソース(UIなりネットワークなり)からデータが入ってくる
  • データが入るスレッドは待たせない
  • 入ったデータは1系列に整理(シリアライズ)され、すぐに処理(UIへの反映など)
  • 上記の入力は、アプリケーション全体の中で、0回以上(せいぜい数回)、ON/OFFされる

全体像

全体としては、「UIから、ある程度時間のかかる作業を起こす」のではなく「UIと並列で処理する。時々開始や終了がある(のでアプリケーションよりは短いライフサイクルとなる)」ものなので、async/awaitではなくTaskを直接扱うことにします*1

シリアライズには、C# 4.0で加わったSystem.Collection.Concurrent.BlockingCollection<T>クラスがあっています。

今回は、入力ON/OFFをボタンのクリック、入力データの処理をテキストボックスへの追加という形にしてみます。あと、手抜きなので入力は別のボタンのクリックを当てはめてみます。

BlockingCollection(キャンセルつき)

BlockingCollectionは、Takeメソッドを呼んだ時、コレクションにデータがないとブロック(無限ループ)します。.NETライブラリでの処理なので負荷も小さいでしょう。

さらに、入力処理のOFFを行うためには、Take(CancellationToken)メソッドを使います。Tokenの元(CancellationTokenSourceインスタンス)でCancel()メソッドを実行すれば、自動的にTakeメソッドは中断され、OperationCanceledException例外を投げます。

TaskとBlockingCollection、CancellationTokenを組み合わせる

上記の内容を、入力をstring型として、仮コードにしてみます。

// 非同期処理用インスタンス
BlockingCollection<string> bc;
CancellationTokenSource cts;

// 入力開始(どこかのメソッド内の処理)
cts = new CancellationTokenSource();
bc = new BlockingCollection<string>();

Task.Run(() =>
{
    while (cts.IsCancellationRequested == false)
    {
        try
        {
            var s = bc.Take(cts.Token); // 適宜ブロック
            // sを使った処理
        }
        catch (OperationCanceledException) { } // キャンセル時
    }
});

// 入力終了(どこかのメソッド内の処理)
cts.Cancel();

// 入力そのもの(どこかのメソッド内の処理)
bc.Add("Hello");

無限ループの判定とBlockingCollection.Takeで、まとめて同一のCancellationTokenSourceインスタンス(cts)を使っています。このため、cts.Cancel()の呼び出しで入力処理を一気に終わらせられます。

GUIサンプル

この処理のテスト用に、WinFormsでボタン3つと複数行テキストボックスがあるフォームを作りました。

フォームには次のコントロールがおいてあります。

  • button1 : 入力処理をONにするボタン
  • button2 : 入力処理をOFFにするボタン。初期状態でEnabled=false
  • button3 : 入力を行うボタン。初期状態でEnabled=false
  • textBox1 : 入力データの処理先。MultiLine=trueかつScrollBars = ScrollBars.Vertical。button2で入力処理をOFFした時も表示する。
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Diagnostics;
using System.Collections.Concurrent;

namespace WinFormApplication01
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        BlockingCollection<string> bc;
        CancellationTokenSource cts;

        // Utility:呼び出し元スレッドを無視してTextBoxに文字列を追加
        void AddMessage(string msg)
        {
            textBox1.Invoke(new Action(() => {
                textBox1.AppendText(msg + Environment.NewLine);
            }));
        }

        private void button1_Click(object sender, EventArgs e)
        {
            cts = new CancellationTokenSource();
            bc = new BlockingCollection<string>();

            Task.Run(() =>
            {
                while (cts.IsCancellationRequested == false)
                {
                    string s;
                    try
                    {
                        s = bc.Take(cts.Token);
                    }
                    catch (OperationCanceledException)
                    {
                        s = "[canceled]";
                    }
                    AddMessage(DateTime.Now.ToString() + " " + s);
                }
            }).ContinueWith(t =>
            {
                cts.Dispose();
                bc.Dispose();
                button1.Invoke(new Action(() =>
                {
                    button3.Enabled = false;
                    button2.Enabled = false;
                    button1.Enabled = true;
                }));
            });
            button1.Enabled = false;
            button2.Enabled = true;
            button3.Enabled = true;
        }

        private void button2_Click(object sender, EventArgs e)
        {
            cts.Cancel();
        }

        private void button3_Click(object sender, EventArgs e)
        {
            bc.Add("[Add]");
        }
    }
}

ここでは、入力処理のOFF後の処理(disposeやGUIのEnabled切替)のためにContinueWithを使っています。

*1:こういう場合でもasync/awaitの方がいいようならご教示いただければ幸いです。