Задачи (параллельное программирование). Часть 2

Эта часть дополняет первую и является, скорее, некоторыми краткими заметками, чем полноценным материалом. Тема весьма обширна и, я думаю, требует отдельного изучения. Здесь же я даю только краткое введение. Изложенный материал и примеры почти полностью взяты из книги А. Гавва "Адское" программирование. Ada-95. Компилятор GNAT." Более подробно информацию можно посмотреть в этой книге.

1. Задачи можно создавать динамически (только нужно не забывать освобождать после их завершения память). Тему динамического создания переменных и записей мы уже рассматривали. С задачами всё аналогично.

2. Любую задачу при необходимости можно завершить аварийно с помощью команды abort:

abort Какая-то_Задача;

Прибегать к этому способу рекомендуется, если только задача зависла. Иначе можно натворить всяких бед. 🙂

3. Не смотря на то, что задача, как и пакет, имеет спецификацию и тело, она не может быть самостоятельно компилируемой единицей и обязательно должна быть помещена в пакет или процедуру.

4. Каждая задача в Аде имеет уникальный идентификатор (Task_ID). Работа с идентификаторами обеспечивается:

a) С помощью средств пакета Ada.Task_Identification (Помимо всего прочего, в этом пакете определяется тип Task_ID (по сути, просто ссылка на Integer)).

b) С помощью атрибутов задачи:

T'Identity   --"T" - любая задача. Возвращает значение типа Task_ID которое
             --идентифицирует "T".
T'Callable   --"T" - любая задача. Возвращает значение "True", если задача "T" может быть
             --вызвана.
T'Terminated --"T" - любая задача. Возвращает "True" если выполнение задачи "T" прекращено.
E'Caller     --"E" - имя любого входа задачи. Возвращает значение типа " Task_ID ", которое
             --идентифицирует обрабатываемую в текущий момент задачу, обратившуюся к входу
             --задачи "E". Использование этого атрибута допустимо только внутри инструкции
             --принятия (для задачи-сервера).
E'Count      --"E" - имя любого входа задачи. Возвращает значение типа "Universal_Integer",
             --показывающее число обращений к входу "E", которые находятся в очереди.

5. В Аде каждой задаче с помощью прагмы компилятора Priority можно указать приоритет:

task My_Task is
    pragma Priority(3);
    ....
end My_Task;

Задача, приоритет которой выше, будет обрабатываться раньше, чем задача с более низким приоритетом. Более того, приоритет задачи может быть изменён в любое время. Такую возможность предоставляет пакет Ada.Dynamic_Priorities. Процедура Set_Priority() позволяет задать приоритет, а функция Get_Priority() - получить приоритет задачи:

Priority : System.Any_Priority; --тип Any_Priority - грубо говоря, это положительное целое
                                --число. Реализацию можно посмотреть в пакете Ada.System
T : Ada.Task_Identification.Task_ID; --Идентификатор задачи.
...
Set_Priority(Priority; T); --Если T не задан, то по умолчанию будет задействован
                           --идентификатор текущей задачи
...
Priority := Get_Priority(T); --Если T не задан, то по умолчанию будет задействован
                             --идентификатор текущей задачи

Реализацию этих подпрограмм можно посмотреть в пакете Ada.Dynamic_Priorities.

6. Ада позволяет задачам использовать разделяемые (общие) переменные, которые могут использоваться для организации взаимодействия задач и/или для хранения каких-либо общих данных.

При выполнении программы возможна ситуация, когда разные задачи одновременно пытаются получить доступ к одной разделяемой переменной. Для избежания таких ситуаций необходима синхронизация доступа к разделяемым переменным. Поэтому в какой-либо момент времени право на доступ к разделяемой переменной необходимо предоставить только одной задаче.

В Аде для указания разделяемых переменных используются следующие директивы компилятора:

pragma Atomic(Имя_Переменной_Или_Типа) ;
pragma Atomic_Components(Имя_Массива_Или_Типа_Массива) ;
pragma Volatile(Имя_Переменной_Или_Типа) ;
pragma Volatile_Components(Имя_Массива_Или_Типа_Массива);

Прагмы Atomic и Atomic_Components обеспечивают непрерывность операций чтения/записи над указанными в них атомарными объектами, причём, операции над ними выполняются только последовательно.

Прагмы Volatile и Volatile_Components обеспечивают выполнение операций чтения/записи над указанными в них объектами непосредственно в памяти.

Примеры применения этих директив компилятора могут иметь следующий вид:

Array_Size : Positive; --Positive - положительное целое
pragma Atomic(Array_Size);
pragma Volatile(Array_Size);
Store_Array is array(1..Array_Size) of Integer;
pragma Atomic_Components(Store_Array);
pragma Volatile_Components(Store_Array);

7. В Аде можно организовать взаимно исключающий доступ к данным из разных, одновременно выполняющихся задач. Для этого используются т.н. защищенные модули.

Пусть у нас выполняются задачи и в определённый момент времени им нужно обменяться информацией. Логично для обмена информацией использовать какие-то переменные. Однако задач может выполняться много и для предотвращения одновременного изменения переменной разными задачами в Аде существуют защищённые модули. Их использование гарантирует, что изменение содержимого переменных будет происходить в режиме взаимного исключения.

Защищённый модуль может быть создан как одиночный защищённый объект или как защищённый (лимитированный!) тип. Пример (примеры взяты из книги А. Гавва "Адское" программирование. Ada-95. Компилятор GNAT.):

--Одиночный защищённый объект предоставляет переменную Data типа Item и две подпрограммы
--для чтения и записи переменной:
 
--спецификация защищенного объекта
protected Variable is
    function Read return Item ;
    procedure Write (New_Value : Item );
private
    Data : Item ;
end Variable ;
 
-- тело защищенного объекта
protected body Variable is
 
    function Read return Item is
    begin
        return Item ;
    end Read;
    procedure Write (New_Value : Item ) is
    begin
        Data := New_Value ;
    end Write;
 
end Variable ;

Таким образом, запись переменной (процедура Read) в конкретный момент времени возможна только одной задачей, а чтение (функция Write) может одновременно осуществляться несколькими задачами одновременно.

Для обращения к защищенным подпрограммам используется оператор точка:

X := Variable.Read;
. . .
Variable.Write(New_Value => Y);

Защищенный модуль может иметь защищенные входы. Защищённые входы предохраняются т.н. барьером (логическим выражением) и, следовательно, результат этого выражения имеет тип Boolean (может быть только True или False).

Если при вызове защищенного входа значение барьера False, то выполнение вызывающей задачи приостанавливается до тех пор, пока значение барьера не станет равным True и внутри защищенного модуля будут отсутствовать активные задачи (задачи, которые выполняют тело какого-либо защищенного входа или какой-либо защищенной подпрограммы). Следовательно, вызов защищенного входа может быть использован для реализации условной синхронизации.

--спецификация защищенного типа
protected type Bounded_Buffer is
    entry Put(X: in Item );
    entry Get(X: out Item );
private
    A: Item_Array( 1 . . Max);
    I , J : Integer range 1..Max := 1;
    Count : Integer range 0..Max := 0;
end Bounded_Buffer ;
--тело защищенного типа
protected body Bounded_Buffer is
    entry Put(X: in Item ) when Count < Max is
    begin
        A(I) := X;
        I := I mod Max + 1; Count := Count + 1;
    end Put;
    entry Get(X: out Item ) when Count > 0 is
    begin
        X := A(J);
        J := J mod Max + 1; Count := Count - 1;
    end Get;
end Bounded_Buffer;

Поведение защищенного типа контролируется барьерами. При вызове входа защищенного объекта выполняется проверка соответствующего барьера. Если значение барьера False, то вызов помещается в очередь, подобно тому, как это происходит при вызове входа задачи. При описании переменной My_Buffer буфер - пуст, и, таким образом, барьер для входа Put имеет значение True, а для входа Get - False. Следовательно, будет выполняться только вызов Put, а вызов Get будет отправлен в очередь.

В конце выполнения тела входа (или тела процедуры) защищенного объекта производится вычисление значений всех барьеров, у которых есть задачи, ожидающие в очереди, разрешая, таким образом, обработку обращений к входам, которые ранее были помещены в очередь в результате того, что значение барьера было вычислено как False. Таким образом, после завершения обработки первого же вызова Put, если вызов Get уже находится в очереди, то значение барьера для Get будет вычислено заново, и это позволит обслужить ожидающий в очереди вызов Get.

Защитный механизм барьеров налагается на обычное взаимное исключение защищенной конструкции, предоставляя, таким образом, два различных уровня защиты.

Входы защищенных объектов имеют атрибуты, назначение которых подобно назначению атрибутов для входов задач ("E" - имя любого входа защищенного объекта):

  • E'Caller - Возвращает значение типа Task_ID, которое идентифицирует обрабатываемую в текущий момент задачу, обратившуюся на вход защищенного объекта "E". Использование этого атрибута допустимо только внутри тела входа (для защищенного объекта).
  • E'Count - Возвращает значение типа Universal_Integer, показывающее число обращений на входе "E", которые находятся в очереди.

8. Иногда необходимо, чтобы  задача-клиент, запрашивающая сервис, была приостановлена после выполнения первого этапа до тех пор, пока какие-либо условия не позволят выполнить последующий этап обработки. В таком случае необходимы последовательные вызовы двух (или более) входов. С целью поддержки построения необходимых управляющих алгоритмов, позволяющих разделить обработку какой-либо задачи на несколько (два и более) этапов и для организации предпочтительного управления, в Аде введена инструкция перенаправления очереди requeue. Она используется при  необходимости перенаправления вызова клиента от одного входа задачи или защищенного модуля к другому входу (или даже к тому же самому входу).

requeue имя_входа [with abort] ;

С помощью инструкции перенаправления requeue, можно просто переслать вызов клиента в очередь другого входа.

Инструкция перенаправления requeue предназначена для обработки двух основных ситуаций:

  • После начала выполнения инструкции принятия accept или тела защищенного входа, может быть определено, что запрос не может быть удовлетворен немедленно. Взамен, существует необходимость перенаправлять вызывающего клиента до тех пор, пока его запрос/вызов не сможет быть обработан.
  • В качестве альтернативы, часть запроса клиента может быть обработана немедленно, но могут существовать дополнительные шаги, которые должны быть выполнены несколько позже.

В обоих случаях, инструкция принятия accept или тело защищенного входа, нуждается в передаче управления. Таким образом, могут быть обработаны запросы от других клиентов или выполнена какая-либо другая обработка. Инструкция перенаправления requeue позволяет разделить обработку оригинального запроса/вызова на два (и более) этапа.

with Ada.Text_IO; use Ada.Text_IO;
 
procedure main is
    --Защищённый объект
    protected Event is
        --вход, на котором задачи будут ожидать возникновения условия для дальнейшего выполнения
        entry Wait(str : access String)
        entry Signal(str : access String); --вход, на котором будет изменяться условие
    private
            --вход, на который будет перенаправляться задача, изменяющая условие, ожидая,
            --когда остальные задачи пройдут вход Wait
            entry Reset;
            Condition : Boolean := False; --условие. Если будет True, то задачи пройдут вход Wait
    end Event;
 
    protected body Event is
        --Вход Wait принимает имя задачи
        entry Wait(str : access String) when Condition is
        begin
            Put_Line(str(1..9) & "прошла вход Wait. Задач в очереди входа Wait:" & Integer'Image(Wait'Count));
            delay 2.0; --Для наглядности (пауза при выводе сообщения на экран)
        end Wait;
 
        entry Signal(str : access String) when True is --барьер всегда True
        begin
            Put_Line("Барьер опущен. " & str.all & " ожидает выполнения задач на входе Wait.");
            delay 3.0; --Для наглядности (пауза при выводе сообщения на экран)
            Condition := True; --Изменение условия
            New_Line;
            --Переход на вход Reset и ожидание на нем, когда все задачи пройдут вход Wait
            --(Wait'Count станет 0)
            requeue Reset;
        end Signal;
 
        --Пока Wait'Count не 0, задача, меняющая условие, будет ожидать на этом входе
        entry Reset when Wait'Count = 0 is 
        begin
            New_Line;
            Condition := False;
            Put_Line("Барьер поднят!");
        end Reset;
    end Event;
 
    --Тип задач, которые будут ожидать на входе Wait
    task type My_Task(s : access String);
    task body My_Task is
    begin
        Put_Line(s.all);
        Event.Wait(s);
    end My_Task;
 
    --Тип задачи, которая будет менять условие
    task type Task_Change_Condition(s : access String);
    task body Task_Change_Condition is
    begin
        New_Line;
        for i in reverse 1..5 loop
            Put_Line(s.all & " ожидает" & Integer'Image(i) & " секунд до опускания барьера.");
            delay 1.0;
        end loop;
        New_Line;
        Event.Signal(s);
    end Task_Change_Condition;
 
    str_A : aliased String := "Задача 1 ожидает на входе Wait";
    str_B : aliased String := "Задача 2 ожидает на входе Wait";
    str_C : aliased String := "Задача 3 ожидает на входе Wait";
    str_Condition : aliased String := "Задача, изменяющая условие Condition";
 
    pstr_A : access String := str_A'Access;
    pstr_B : access String := str_B'Access;
    pstr_C : access String := str_C'Access;
    pstr_Condition : access String := str_Condition'Access;
 
    --Запуск задач
    task_A : My_Task(pstr_A);
    task_B : My_Task(pstr_B);
    task_C : My_Task(pstr_C);
    task_Condition : Task_Change_Condition(pstr_Condition);
begin
    null;
end main;

Задачи task_A, task_B и task_C ожидают событие (когда условие Condition станет True), выполняя вызов:

Event.Wait;

Событие наступает тогда, когда задача task_Condition вызывает вход Signal:

Event.Signal;

после чего Condition становится True и все приостановленные в ожидании события задачи (task_A, task_B и task_C) продолжат свое выполнение. При этом задача task_Condition перенаправляется на вход Reset, где будет ожидать, когда очередь входа Wait очистится. Когда в очереди входа Wait не останется ни одной задачи (Wait'Count = 0), то событие очистится (т.е задача task_Condition пройдёт вход Reset и Condition станет равным False). Таким образом, последующие вызовы входа Wait работают корректно.

Особый интерес представляет вход Signal. Значение его барьера постоянно и всегда равно True, таким образом, он всегда открыт для обработки. Если нет задач ожидающих событие (нет задач вызвавших вход Wait), то его вызов просто ничего не делает. С другой стороны, при наличии задач ожидающих событие, он должен позволить им продолжить выполнение, причем так, чтобы ни одна новая задача не попала в очередь ожидания события, после чего, он должен сбросить флаг, чтобы восстановить управление. Он выполняет это перенаправляя себя на вход Reset (с помощью инструкции перенаправления requeue) после установки флага Condition в значение True, для индикации появления события.

Семантика инструкции перенаправления requeue подобна тому, что описано при рассмотрении алгоритма работы Signal. В конце обработки тела входа или защищенной процедуры осуществляется повторное вычисление состояний тех барьерных условий в очереди которых находятся задачи, приостановленные в ожидании обслуживания. В этом случае, действительно существуют задачи находящиеся в очереди входа Wait (task_A, task_B и task_C), и существует задача в очереди входа Reset (task_Condition). Барьер для Wait теперь имеет значение True, а барьер для Reset, естественно, False, поскольку очередь задач на входе Wait не пуста. Таким образом, ожидающая задача способна выполнить тело входа Wait, после чего опять осуществляется повторное вычисление значений барьеров. Этот процесс повторяется до тех пор, пока все приостановленные в ожидании задачи не возобновят свое выполнение и значение барьера для Reset не получит значение True. Оригинальная задача, которая вызвала сигнал, теперь выполняет тело входа Reset, сбрасывая флаг Condition в значение False, возвращая всю систему в исходное состояние еще раз. Теперь, защищенный объект (как единое целое) полностью освобожден, поскольку нет ни одной ожидающей задачи, ни на одном барьере.

Следует обратить внимание на то, что если какие-либо задачи пытаются вызвать Wait или Signal, когда происходит обработка всего выше описанного процесса, то эти задачи будут заблокированы, поскольку защищенный объект, как единое целое, будет находиться в занятом состоянии. Это иллюстрирует два уровня защиты и является смысловой основой отсутствия возможности появления состязания задач за ресурс.

Вход Reset описан в приватной части защищенного объекта и не может быть вызван извне.

Инструкция requeue может быть указана с принудительным завершением with abort. Такое поведение весьма обосновано. После того как сервер начал обработку запроса, он находится в неустойчивом состоянии, и асинхронное удаление вызывающего клиента может нарушить внутренние структуры данных сервера. В дополнение, при наличии параметров переданных по ссылке, сервер, принимающий вызов, должен быть способен получить доступ к данным вызывающего клиента (таким как данные в стеке). При исчезновении вызывающего клиента, это может привести к "висячим" ссылкам и последующему краху работы программы.

Поскольку не существует единственно возможного наилучшего решения сразу для всех приложений, и поскольку отсутствует легкий обходной путь, то использование инструкции requeue с принудительным завершением (with abort) предоставляет программисту возможность выбора для его приложения наиболее подходящего механизма. В основном, когда допускается аннулирование вызова в процессе перенаправления, сервер будет сохранять состояние своих внутренних структур данных перед началом выполнения инструкции перенаправления с принудительным завершением, после чего, если вызывающий клиент будет удален из следующей очереди, то сервер сможет нормально продолжить свою работу. Когда это не возможно, или когда не требуется аннулирование вызова в процессе перенаправления, будет достаточно использования простого перенаправления, и вызывающий клиент будет удерживаться до полного завершения обработки запроса.

Requeue позволяет перенаправить вызывающего клиента в очередь того же самого или какого-либо другого входа. При этом, вызывающий клиент не обязан заботиться о таком перенаправлении и даже о фактическом количестве необходимых для удовлетворения его запроса этапов обработки, которые вообще могут быть не видимыми извне защищенного типа или типа задачи. Использование такого эффекта, предоставляет гибкие возможности построения сервера со сложной внутренней архитектурой и простым, однозначным для клиента, интерфейсом.

В процессе выполнения перенаправления, не происходит никаких переопределений параметров. Вместо этого, значения параметров прямо переносятся в новый вызов. Если был предусмотрен новый список параметров, то он может включать ссылки на данные, которые локальны для инструкции принятия accept или тела защищенного входа. Это может вызвать некоторые трудности, поскольку выполнение инструкции принятия accept или тела защищенного входа будет завершено в результате выполнения инструкции перенаправления requeue и локальные переменные будут, таким образом, деаллоцированы (deallocated). Необходимо соответствие используемых подтипов между вновь вызываемым целевым входом (если он имеет какие-либо параметры) и текущим входом. Это позволяет использовать то же самое представление для нового множества параметров, когда они передаются по значению (by-copy) или по ссылке (by-reference), а также исключить необходимость размещения (allocate) нового пространства для хранения параметров.

Необходимо отметить, что при выполнении перенаправления, кроме передачи тех же самых параметров, существует еще только одна возможность - не передавать никаких параметров вовсе.

Итого:

  • Инструкция перенаправления requeue допустима внутри тела защищенного входа или внутри инструкции принятия рандеву accept. Целевой вход (допустим тот же самый вход) может быть в той же самой задаче или том же защищенном объекте, или другой задаче или другом защищенном объекте. Возможно использование любых перечисленных комбинаций.
  • Любые фактические параметры оригинального вызова передаются к новому входу. Следовательно, новый вход должен иметь такой же самый профиль параметров или не иметь никаких параметров.