Producent i konsument – przykład użycia słowa kluczowego lock

Niejednokrotnie podczas pisania aplikacji napotyka się na sytuację gdy jedna metoda produkuje pewne dane, inna natomiast w pewien sposób je konsumuje. Czasem dobrym pomysłem jest, w przypadku gdy produkowane dane są w pewien sposób podzielne na części, wykonywać produkcję i konsumpcję w równoległych wątkach. Tutaj pojawia się istotny problem z zagadnienia wielowątkowości – synchronizacja. Oba (wszystkie) wątki współdzielące dany zasób muszą z niego korzystać w pewien ustalony sposób, tak aby w danej chwili korzystał z niego tylko jeden z wątków. W uproszczeniu, zapewnienie tego stanu nazywamy synchronizacją. Problem ten jest na tyle niebanalny i powszechny, że platforma .NET wspomaga synchronizację za pomocą wbudowanych mechanizmów. Jednym z nich jest słowo kluczowe lock. Użycie słowa kluczowego lock wygląda tak:

 lock(somethingSharedToLock)
{
     // operations on the locked object
}

Jak widać składnia jest bardzo prosta, ale co tak właściwie daje nam lock? Użycie słowa kluczowego lock gwarantuje nam, iż żaden inny wątek nie będzie ingerował w zablokowany obiekt podczas wykonywania instrukcji z nawiasu klamrowego. Natomiast jeśli obiekt jest zablokowany przez inny wątek, to kolejny lock będzie cierpliwie czekał na zwolnienie obiektu i dopiero wtedy go zablokuje. Pięknie, prawda? :) Oczywiście lock, jak wszystko – ma swoje wady. Częste i nierozważne lockowanie może doprowadzić do tzw. zakleszczenia (ang. deadlock). Starczy tej teorii, przejdźmy do przykładu (przykład z całą pewnością nie pokazuje best practices tworzenia kodu, ale chodziło mi o maksymalne uproszczenie przykładu).

Stwórzmy sobie prostą klasę producenta:

class Producer
{
public bool IsFinished
{
    get;
    set;
}
public List List
{
    get;
    set;
}
public Producer()
{
    List = new List();
    IsFinished = false;
}
public void Produce()
{
    Random random = new Random();

    for (int i = 0; i < 20; i++)
    {
        List.Add(random.Next(0, 20));
        Console.WriteLine("Producer[{0}]={1}", i, List[i]);
        Thread.Sleep(500);
    }
    IsFinished = true;
    Console.WriteLine("Producer has finished his work");
}
}

Klasa ta, jak widać, w konstruktorze przyjmuję listę, a następnie po wywołaniu Produce() wpisuje do tej listy losowe liczby całkowite z przedziału <0,20) wypisując je przy okazji. Klasa posiada pole które poinformuje nas, że praca została zakończona. Usypianie w metodzie produkującej umożliwi nam zaobserwowanie pewnej ciekawej rzeczy, ale o tym dalej.

Klasa konsumenta:

class Consument
{
    private Producer producer;
    public Consument(Producer producer)
    {
        this.producer = producer;
    }
    public void Consume()
    {
        int currentElement = 0;
        int elementCount = 0;

        while (true)
        {
              elementCount = producer.List.Count;

              if (producer.List.Count > currentElement)
              {
                   producer.List[currentElement] += 2;
                   Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
                   currentElement++;
              }
              if (producer.List.Count == currentElement && producer.IsFinished)
                   break;
         }
         Console.WriteLine("Consument has finished his work");
    }
}

Konsument, w konstruktorze przyjmuje producenta, a po wywołaniu Consume(), o ile pojawiło się coś nowego zwiększa wartość każdego elementu o 2, a następnie go wypisuje. Metoda kończy swoje działanie w momencie, gdy Producent da o tym znać i wszystkie elementy zostaną zmienione.

Pozostaje nam odpalić produkcje i konsumpcje w oddzielnych wątkach:

private static void Main(string[] args)
{
     Producer producer = new Producer();
     Consument consument = new Consument(producer);

     Thread producerThread = new Thread(new ThreadStart(producer.Produce));
     Thread consumentThread = new Thread(new ThreadStart(consument.Consume));

     producerThread.Start();
     consumentThread.Start();
     producerThread.Join();
     consumentThread.Join();
}

Wynik działania programu (przykładowy!!):

Producer[0]=0
Consument[0]=2
Producer[1]=4
Consument[1]=4
Producer[2]=17
Consument[2]=17
Producer[3]=8
Consument[3]=8
Producer[4]=0
Consument[4]=2
Producer[5]=16
Consument[5]=16
Producer[6]=6
Consument[6]=6
Producer[7]=13
Consument[7]=13
Producer[8]=17
Consument[8]=19
Producer[9]=9
Consument[9]=11
Producer[10]=15
Consument[10]=17
Producer[11]=14
Consument[11]=14
Producer[12]=11
Consument[12]=13
Producer[13]=3
Consument[13]=5
Producer[14]=15
Consument[14]=15
Producer[15]=13
Consument[15]=13
Producer[16]=16
Consument[16]=18
Producer[17]=5
Consument[17]=7
Producer[18]=2
Consument[18]=2
Producer[19]=14
Consument[19]=16
Producer has finished his work
Consument has finished his work

Ups! Widzimy, że w wielu przypadkach (1, 2, 3, 5, 6, 7, 11, 14, 15, 18 - czyli w połowie wszystkich) wartość producenta nie została zmieniona, tak jakbyśmy chcieli. Czemu się tak stało? Otóż produkcja kolejnych elementów trwa dłużej niż konsumpcja (poprzez wspomniane wcześniej Thread.Sleep(500)) co powoduje iż w pewnych momentach dana jest "jeszcze" tworzona, a próbuje już być skonsumowana, co powoduje iż proces konsumpcji (dodanie do wartości 2) zostaje pominięte. Uniknięcie tej katastrofy jest możliwe dzięki wspomnianemu lockowi. Wystarczy napisać:

lock (((ICollection)List).SyncRoot)
{
    List.Add(random.Next(0, 20));
    Console.WriteLine("Producer[{0}]={1}", i, List[i]);
    Thread.Sleep(500);
}

oraz:

lock (((ICollection)producer.List).SyncRoot)
{
    elementCount = producer.List.Count;

    if (producer.List.Count > currentElement)
    {
        producer.List[currentElement] += 2;
        Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
        currentElement++;
    }
    if (producer.List.Count == currentElement && producer.IsFinished)
        break;
}

To zapewni nam prawidłowe działanie kodu (jak nie wierzycie - sprawdźcie sami :)). A co tak właściwie się stało? Kolekcja w momencie produkowania jest na chwilę blokowana, a więc konsumpcja jest wtedy wstrzymana, gdy produkcja się skończy, konsumpcja zabiera kolekcję dla siebie. Dzięki temu działanie jest zgrabnie zsynchronizowane. Dla kolekcji, w celu synchronizacji należy używać właściwości SyncRoot która jest wymagana przez interfejs ICollection. Wszystkie inne obiekty możemy lockować w sposób bezpośredni (lock(someObject)) chyba, że coś (dokumentacja) podpowiada nam inaczej ;)

To tyle w tej kwestii, dla zainteresowanych polecam poczytać Thread Synchronization oraz How to: Synchronize a Producer and a Consumer Thread.

Pełny listing:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace ProducentConsumerExample
{
    class Example
    {
        private static void Main(string[] args)
        {
            Producer producer = new Producer();
            Consument consument = new Consument(producer);

            Thread producerThread = new Thread(new ThreadStart(producer.Produce));
            Thread consumentThread = new Thread(new ThreadStart(consument.Consume));

            producerThread.Start();
            consumentThread.Start();

            producerThread.Join();
            consumentThread.Join();
        }
    }

    class Producer
    {
        public bool IsFinished
        {
            get;
            set;
        }

        public List List
        {
            get;
            set;
        }

        public Producer()
        {
            List = new List();
            IsFinished = false;
        }

        public void Produce()
        {
            Random random = new Random();

            for (int i = 0; i < 20; i++)
            {
                lock (((ICollection)List).SyncRoot)
                {
                    List.Add(random.Next(0, 20));
                    Console.WriteLine("Producer[{0}]={1}", i, List[i]);
                    Thread.Sleep(500);
                }
            }

            IsFinished = true;

            Console.WriteLine("Producer has finished his work");
        }
    }

    class Consument
    {
        private Producer producer;

        public Consument(Producer producer)
        {
            this.producer = producer;
        }

        public void Consume()
        {
            int currentElement = 0;
            int elementCount = 0;

            while (true)
            {
                lock (((ICollection)producer.List).SyncRoot)
                {
                    elementCount = producer.List.Count;

                    if (producer.List.Count > currentElement)
                    {
                        producer.List[currentElement] += 2;
                        Console.WriteLine("Consument[{0}]={1}", currentElement, producer.List[currentElement]);
                        currentElement++;
                    }
                    if (producer.List.Count == currentElement && producer.IsFinished)
                        break;
                }
            }

            Console.WriteLine("Consument has finished his work");
        }
    }
}