Мобильное программирование приложений реального времени в стандарте POSIX

         

Опрос и изменение атрибутов потоков управления


Следуя классическому принципу "познай самого себя", описание функций, обслуживающих потоки управления, мы начнем с функции pthread_self(), возвращающей в качестве результата идентификатор вызвавшего ее потока (см. листинг 1.1).

#include <pthread.h> pthread_t pthread_self (void);

Листинг 1.1. Описание функции pthread_self(). (html, txt)

Выше мы отмечали, что тип pthread_t трактуется стандартом POSIX-2001 как абстрактный. На уровне языка C он может быть представлен, например, структурой. Для работы со значениями типа pthread_t предусмотрены два метода: присваивание и сравнение на равенство, реализуемое функцией pthread_equal() (см. листинг 1.2).

#include <pthread.h> int pthread_equal (pthread_t t1, pthread_t t2);

Листинг 1.2. Описание функции pthread_equal(). (html, txt)

Если значения аргументов t1 и t2 равны, результат функции pthread_equal() отличен от нуля.

Атрибуты потоков управления, используемые при создании последних, сгруппированы в упоминавшиеся выше атрибутные объекты. Для инициализации и разрушения атрибутных объектов служат функции pthread_attr_init() и pthread_attr_destroy() (см. листинг 1.3).

#include <pthread.h>

int pthread_attr_init ( pthread_attr_t *attr);

int pthread_attr_destroy ( pthread_attr_t *attr);

Листинг 1.3. Описание функций pthread_attr_init() и pthread_attr_destroy(). (html, txt)

Функция pthread_attr_init() инициализирует атрибутный объект, заданный указателем attr, подразумеваемыми значениями для всех индивидуальных атрибутов потоков управления, предусмотренных реализацией.

Функция pthread_attr_destroy() разрушает заданный атрибутный объект. Впрочем, "разрушает", возможно, слишком сильный термин. Быть может, реализация просто присваивает значениям атрибутов недопустимые значения. Во всяком случае, разрушенный атрибутный объект в дальнейшем может быть вновь инициализирован.

Структура атрибутных объектов скрыта от приложений, но сам набор стандартизованных атрибутов выглядит вполне естественно.

Следуя классическому принципу "познай самого себя", описание функций, обслуживающих потоки управления, мы начнем с функции pthread_self(), возвращающей в качестве результата идентификатор вызвавшего ее потока (см. листинг 1.1).

#include <pthread.h> pthread_t pthread_self (void);

Листинг 1.1. Описание функции pthread_self().

Выше мы отмечали, что тип pthread_t трактуется стандартом POSIX-2001 как абстрактный. На уровне языка C он может быть представлен, например, структурой. Для работы со значениями типа pthread_t предусмотрены два метода: присваивание и сравнение на равенство, реализуемое функцией pthread_equal() (см. листинг 1.2).

#include <pthread.h> int pthread_equal (pthread_t t1, pthread_t t2);

Листинг 1.2. Описание функции pthread_equal().

Если значения аргументов t1 и t2 равны, результат функции pthread_equal() отличен от нуля.

Атрибуты потоков управления, используемые при создании последних, сгруппированы в упоминавшиеся выше атрибутные объекты. Для инициализации и разрушения атрибутных объектов служат функции pthread_attr_init() и pthread_attr_destroy() (см. листинг 1.3).



#include <pthread.h>

int pthread_attr_init ( pthread_attr_t *attr);

int pthread_attr_destroy ( pthread_attr_t *attr);

Листинг 1.3. Описание функций pthread_attr_init() и pthread_attr_destroy().

Функция pthread_attr_init() инициализирует атрибутный объект, заданный указателем attr, подразумеваемыми значениями для всех индивидуальных атрибутов потоков управления, предусмотренных реализацией.

Функция pthread_attr_destroy() разрушает заданный атрибутный объект. Впрочем, "разрушает", возможно, слишком сильный термин. Быть может, реализация просто присваивает значениям атрибутов недопустимые значения. Во всяком случае, разрушенный атрибутный объект в дальнейшем может быть вновь инициализирован.

Структура атрибутных объектов скрыта от приложений, но сам набор стандартизованных атрибутов выглядит вполне естественно. Их описание мы начнем с атрибутов стека – начального адреса и размера – и методов для их опроса и установки (см.


Их описание мы начнем с атрибутов стека – начального адреса и размера – и методов для их опроса и установки (см. листинг 1.4).

#include <pthread.h>

int pthread_attr_getstack ( const pthread_attr_t *restrict attr, void **restrict stackaddr, size_t *restrict stacksize);

int pthread_attr_setstack ( pthread_attr_t *attr, void *stackaddr, size_t stacksize);

Листинг 1.4. Описание функций pthread_attr_getstack() и pthread_attr_setstack(). (html, txt)

Размер стека должен составлять не менее PTHREAD_STACK_MIN, начальный адрес – должным образом выровнен. Память, отведенная под стек, должна быть доступна на чтение и запись.

Функция pthread_attr_getstack() помещает атрибуты стека по указателям stackaddr и stacksize. Это – проявление единообразной для семейства функций pthread*(), обслуживающих потоки управления, дисциплины возврата результатов. Содержательные данные помещаются в выходные аргументы. При нормальном завершении результат функции равен нулю; в противном случае выдается код ошибки.

Подобная дисциплинированность является похвальной, но вынужденной. Ее причина – в разделении данных между потоками. Нельзя просто вернуть указатель на статический буфер – другой поток может в это время так или иначе работать с ним. Поэтому поток должен зарезервировать индивидуальные области памяти для размещения выходных значений (обратившись, например, к malloc()) и передать функции указатели на них.

Для опроса и изменения размера защитной области, служащей цели обнаружения переполнения стека, предназначены функции pthread_attr_getguardsize() и pthread_attr_setguardsize() (см. листинг 1.5).

#include <pthread.h>

int pthread_attr_getguardsize ( const pthread_attr_t *restrict attr, size_t *restrict guardsize);

int pthread_attr_setguardsize ( pthread_attr_t *attr, size_t guardsize);

Листинг 1.5. Описание функций pthread_attr_getguardsize() и pthread_attr_setguardsize(). (html, txt)

Если значение аргумента guardsize функции pthread_attr_setguardsize() равно нулю, при создании потоков управления с атрибутным объектом *attr защитная область отводиться не будет.


Положительные величины guardsize также становятся новыми значениями одноименного атрибута, однако являются лишь указанием операционной системе; реальный размер защитной области может быть больше заданного.

Приложение, соответствующее стандарту POSIX, должно использовать значения guardsize, кратные конфигурационной константе PAGESIZE, которая одновременно является подразумеваемым значением данного атрибута.

Если приложение посредством функции pthread_attr_setstack() взяло на себя управление стеками потоков, атрибут guardsize игнорируется, операционная система не отводит защитную область, а контроль за переполнением стека возлагается на приложение.

Отметим, что, в зависимости от ситуации, приложениям есть смысл как отказываться от защитных областей (например, если потоков управления много, памяти на защитные области уходит также много, и авторы приложения уверены, что переполнения стека быть не может), так и делать их размер больше подразумеваемого (например, если используются большие массивы и указатель стека рискует оказаться за верхней границей защитной области).

Стандартом POSIX-2001 предусмотрена группа атрибутов, обслуживающих планирование потоков управления. Соответствующие описания размещены в заголовочном файле <sched.h>. Центральную роль среди них играет структура типа sched_param, которая должна содержать по крайней мере поле int sched_priority; /* Приоритет планирования при выполнении потока */

Реализация может поддерживать политику планирования SCHED_SPORADIC (спорадическое планирование), предусматривающую резервирование определенного количества вычислительной мощности для обработки с заданным приоритетом неких единичных, непериодических (спорадических) событий. В этом случае должны быть определены конфигурационные константы _POSIX_SPORADIC_SERVER и/или _POSIX_THREAD_SPORADIC_SERVER, а в структуре sched_param должны присутствовать следующие дополнительные поля.

int sched_ss_low_priority; /* Нижняя граница приоритета */ /* планирования сервера */ /* спорадических событий */ struct timespec sched_ss_repl_period; /* Период пополнения бюджета */ /* спорадического сервера */ struct timespec sched_ss_init_budget; /* Начальный бюджет */ /* спорадического сервера */ int sched_ss_max_repl; /* Максимальное число */ /* ждущих операций */ /* пополнений бюджета */ /* спорадического сервера */



Для опроса и установки атрибутов планирования в атрибутных объектах служат функции pthread_attr_getschedparam() и pthread_attr_setschedparam() (см. листинг 1.6).

#include <pthread.h>

int pthread_attr_getschedparam ( const pthread_attr_t *restrict attr, struct sched_param *restrict param);

int pthread_attr_setschedparam ( pthread_attr_t *restrict attr, const struct sched_param *restrict param);

Листинг 1.6. Описание функций pthread_attr_getschedparam() и pthread_attr_setschedparam(). (html, txt)

Атрибут "политика планирования", способный принимать значения SCHED_FIFO (планирование по очереди), SCHED_RR (циклическое планирование), SCHED_OTHER ("прочее" планирование) и, возможно, SCHED_SPORADIC (спорадическое планирование), можно опросить и установить посредством функций pthread_attr_getschedpolicy() и pthread_attr_setschedpolicy() (см. листинг 1.7).

#include <pthread.h>

int pthread_attr_getschedpolicy ( const pthread_attr_t *restrict attr, int *restrict policy);

int pthread_attr_setschedpolicy ( pthread_attr_t *attr, int policy);

Листинг 1.7. Описание функций pthread_attr_getschedpolicy() и pthread_attr_setschedpolicy(). (html, txt)



листинг 1.4).

#include <pthread.h>

int pthread_attr_getstack ( const pthread_attr_t *restrict attr, void **restrict stackaddr, size_t *restrict stacksize);

int pthread_attr_setstack ( pthread_attr_t *attr, void *stackaddr, size_t stacksize);

Листинг 1.4. Описание функций pthread_attr_getstack() и pthread_attr_setstack().

Размер стека должен составлять не менее PTHREAD_STACK_MIN, начальный адрес – должным образом выровнен. Память, отведенная под стек, должна быть доступна на чтение и запись.

Функция pthread_attr_getstack() помещает атрибуты стека по указателям stackaddr и stacksize. Это – проявление единообразной для семейства функций pthread*(), обслуживающих потоки управления, дисциплины возврата результатов. Содержательные данные помещаются в выходные аргументы. При нормальном завершении результат функции равен нулю; в противном случае выдается код ошибки.

Подобная дисциплинированность является похвальной, но вынужденной. Ее причина – в разделении данных между потоками. Нельзя просто вернуть указатель на статический буфер – другой поток может в это время так или иначе работать с ним. Поэтому поток должен зарезервировать индивидуальные области памяти для размещения выходных значений (обратившись, например, к malloc()) и передать функции указатели на них.

Для опроса и изменения размера защитной области, служащей цели обнаружения переполнения стека, предназначены функции pthread_attr_getguardsize() и pthread_attr_setguardsize() (см. листинг 1.5).

#include <pthread.h>

int pthread_attr_getguardsize ( const pthread_attr_t *restrict attr, size_t *restrict guardsize);

int pthread_attr_setguardsize ( pthread_attr_t *attr, size_t guardsize);

Листинг 1.5. Описание функций pthread_attr_getguardsize() и pthread_attr_setguardsize().

Если значение аргумента guardsize функции pthread_attr_setguardsize() равно нулю, при создании потоков управления с атрибутным объектом *attr защитная область отводиться не будет. Положительные величины guardsize также становятся новыми значениями одноименного атрибута, однако являются лишь указанием операционной системе; реальный размер защитной области может быть больше заданного.



Приложение, соответствующее стандарту POSIX, должно использовать значения guardsize, кратные конфигурационной константе PAGESIZE, которая одновременно является подразумеваемым значением данного атрибута.

Если приложение посредством функции pthread_attr_setstack() взяло на себя управление стеками потоков, атрибут guardsize игнорируется, операционная система не отводит защитную область, а контроль за переполнением стека возлагается на приложение.

Отметим, что, в зависимости от ситуации, приложениям есть смысл как отказываться от защитных областей (например, если потоков управления много, памяти на защитные области уходит также много, и авторы приложения уверены, что переполнения стека быть не может), так и делать их размер больше подразумеваемого (например, если используются большие массивы и указатель стека рискует оказаться за верхней границей защитной области).

Стандартом POSIX-2001 предусмотрена группа атрибутов, обслуживающих планирование потоков управления. Соответствующие описания размещены в заголовочном файле <sched.h>. Центральную роль среди них играет структура типа sched_param, которая должна содержать по крайней мере поле int sched_priority; /* Приоритет планирования при выполнении потока */

Реализация может поддерживать политику планирования SCHED_SPORADIC (спорадическое планирование), предусматривающую резервирование определенного количества вычислительной мощности для обработки с заданным приоритетом неких единичных, непериодических (спорадических) событий. В этом случае должны быть определены конфигурационные константы _POSIX_SPORADIC_SERVER и/или _POSIX_THREAD_SPORADIC_SERVER, а в структуре sched_param должны присутствовать следующие дополнительные поля.

int sched_ss_low_priority; /* Нижняя граница приоритета */ /* планирования сервера */ /* спорадических событий */ struct timespec sched_ss_repl_period; /* Период пополнения бюджета */ /* спорадического сервера */ struct timespec sched_ss_init_budget; /* Начальный бюджет */ /* спорадического сервера */ int sched_ss_max_repl; /* Максимальное число */ /* ждущих операций */ /* пополнений бюджета */ /* спорадического сервера */



Для опроса и установки атрибутов планирования в атрибутных объектах служат функции pthread_attr_getschedparam() и pthread_attr_setschedparam() (см. листинг 1.6).

#include <pthread.h>

int pthread_attr_getschedparam ( const pthread_attr_t *restrict attr, struct sched_param *restrict param);

int pthread_attr_setschedparam ( pthread_attr_t *restrict attr, const struct sched_param *restrict param);

Листинг 1.6. Описание функций pthread_attr_getschedparam() и pthread_attr_setschedparam().

Атрибут "политика планирования", способный принимать значения SCHED_FIFO (планирование по очереди), SCHED_RR (циклическое планирование), SCHED_OTHER ("прочее" планирование) и, возможно, SCHED_SPORADIC (спорадическое планирование), можно опросить и установить посредством функций pthread_attr_getschedpolicy() и pthread_attr_setschedpolicy() (см. листинг 1.7).

#include <pthread.h>

int pthread_attr_getschedpolicy ( const pthread_attr_t *restrict attr, int *restrict policy);

int pthread_attr_setschedpolicy ( pthread_attr_t *attr, int policy);

Листинг 1.7. Описание функций pthread_attr_getschedpolicy() и pthread_attr_setschedpolicy().

Описанный выше атрибут "область планирования конкуренции", способный принимать значения PTHREAD_SCOPE_SYSTEM и PTHREAD_SCOPE_PROCESS, обслуживают функции pthread_attr_getscope() и pthread_attr_setscope() (см. листинг 1.8).

#include <pthread.h>

int pthread_attr_getscope ( const pthread_attr_t *restrict attr, int *restrict contentionscope);

int pthread_attr_setscope ( pthread_attr_t *attr, int contentionscope);

Листинг 1.8. Описание функций pthread_attr_getscope() и pthread_attr_setscope().

При создании потока управления все рассмотренные выше атрибуты планирования, в зависимости от значения PTHREAD_INHERIT_SCHED или PTHREAD_EXPLICIT_SCHED атрибута inheritsched, могут наследоваться у создающего потока или извлекаться из атрибутного объекта. Для опроса и изменения этого атрибута предназначены функции pthread_attr_getinheritsched() и pthread_attr_setinheritsched() (см.


листинг 1.9).

#include <pthread.h>

int pthread_attr_getinheritsched ( const pthread_attr_t *restrict attr, int *restrict inheritsched);

int pthread_attr_setinheritsched ( pthread_attr_t *attr, int inheritsched);

Листинг 1.9. Описание функций pthread_attr_getinheritsched() и pthread_attr_setinheritsched().

Атрибут обособленности потока управления, присутствующий в атрибутном объекте, можно опросить и установить посредством функций pthread_attr_getdetachstate() и pthread_attr_setdetachstate() (см. листинг 1.10).

#include <pthread.h>

int pthread_attr_getdetachstate ( const pthread_attr_t *attr, int *detachstate);

int pthread_attr_setdetachstate ( pthread_attr_t *attr, int detachstate);

Листинг 1.10. Описание функций pthread_attr_getdetachstate() и pthread_attr_setdetachstate().

Напомним, что значение этого атрибута (PTHREAD_CREATE_DETACHED или PTHREAD_CREATE_JOINABLE) определяет, будет ли поток управления создан как обособленный или присоединяемый, то есть доступный другим потокам для ожидания завершения. Подразумеваемым является значение PTHREAD_CREATE_JOINABLE.

Значения атрибутов планирования могут задаваться не только при создании потока управления. Стандарт POSIX-2001 предоставляет средства для их динамического изменения и опроса (см. листинг 1.11).

#include <pthread.h>

int pthread_getschedparam ( pthread_t thread, int *restrict policy, struct sched_param *restrict param);

int pthread_setschedparam ( pthread_t thread, int policy, const struct sched_param *param);

Листинг 1.11. Описание функций pthread_getschedparam() и pthread_setschedparam().

Отметим две тонкости, связанные с функцией pthread_setschedparam(). Во-первых, возможно, что для ее успешного вызова процесс должен обладать соответствующими привилегиями. Во-вторых, реализация не обязана поддерживать динамический переход к политике спорадического планирования (SCHED_SPORADIC) (как, впрочем, и саму эту политику).

Если требуется изменить лишь приоритет планирования, не меняя политику, проще воспользоваться функцией pthread_setschedprio() (см.


листинг 1.12), которая, правда, является новой и в исторически сложившихся реализациях может отсутствовать.

#include <pthread.h> int pthread_setschedprio ( pthread_t thread, int prio);

Листинг 1.12. Описание функции pthread_setschedprio().

Сходную направленность, но более глобальный характер имеют функции pthread_getconcurrency() и pthread_setconcurrency() (см. листинг 1.13), позволяющие опросить и изменить уровень параллелизма выполнения потоков управления.

#include <pthread.h>

int pthread_getconcurrency (void);

int pthread_setconcurrency (int new_level);

Листинг 1.13. Описание функций pthread_getconcurrency() и pthread_setconcurrency().

По умолчанию операционная система предоставляет возможность параллельно проявлять активность некоему "достаточному числу" потоков управления в процессе, так, чтобы это не вело к перерасходу системных ресурсов. Некоторым приложениям, однако, может требоваться более высокий уровень параллелизма; это требование они могут передать ОС в виде значения аргумента new_level функции pthread_setconcurrency(). Впрочем, с точки зрения операционной системы это всего лишь просьба или рекомендация; стандарт не специфицирует реально устанавливаемый уровень.

Нулевое значение аргумента new_level означает переход к подразумеваемому уровню параллелизма, как если бы функция pthread_setconcurrency() ранее не вызывалась.

Функция pthread_getconcurrency() в качестве результата возвращает значение уровня параллелизма, установленное предыдущим вызовом pthread_setconcurrency(). Если такового не было, выдается нуль.

Отметим, что изменение уровня параллелизма не рекомендуется использовать при реализации библиотечных функций, так как это может конфликтовать с запросами приложений.

К числу атрибутов потока управления можно отнести обслуживающие его часы процессорного времени. Для выяснения их идентификатора достаточно обратиться к функции pthread_getcpuclockid() (см. листинг 1.14).

#include <pthread.h> #include <time.h>

int pthread_getcpuclockid ( pthread_t thread_id, clockid_t *clock_id);



Листинг 1.14. Описание функции pthread_getcpuclockid().

Еще один атрибут потока управления – маска блокированных сигналов. Поток может опросить и/или изменить ее посредством вызова функции pthread_sigmask() (см. листинг 1.15) – аналога рассмотренной в курсе [1] функции sigprocmask().

#include <signal.h> int pthread_sigmask ( int how, const sigset_t *restrict set, sigset_t *restrict oset);

Листинг 1.15. Описание функции pthread_sigmask().

На листинге 1.16 приведен пример программы, использующей большинство описанных выше функций для опроса и изменения атрибутов потоков управления. Возможные результаты работы этой программы показаны на листинге 1.17.

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа опрашивает атрибуты потоков управления */ /* и изменяет некоторые из них */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#define _XOPEN_SOURCE 600

#include <stdio.h> #include <pthread.h> #include <errno.h> #include <assert.h>

int main (void) { pthread_t ct_id; /* Идентификатор текущего потока управления */ pthread_attr_t patob; /* Атрибутный объект для создания потоков управления */ int res; /* Переменная для запоминания результатов "потоковых" функций */ void *stackaddr; /* Начало стека как атрибут потока управления */ size_t atrsize; /* Размеры как атрибуты потока управления */ /* Структура с параметрами планирования */ struct sched_param shdprm; char *spname; /* Названия политики планирования, области */ /* планирования конкуренции и т.п. */

printf ("Идентификатор текущего потока управления: %lx\n", (ct_id = pthread_self ()));

if ((errno = pthread_attr_init (&patob)) != 0) { perror ("PTHREAD_ATTR_INIT"); return (errno); }

printf ("Значения, установленные системой " "в атрибутном объекте\n");

if ((errno = pthread_attr_getstack (&patob, &stackaddr, &atrsize)) != 0) { perror ("PTHREAD_ATTR_GETSTACK"); return (errno); } printf ("Адрес начала стека: %p\n", stackaddr); printf ("Размер стека: %d\n", atrsize);



assert (pthread_attr_getguardsize (&patob, &atrsize) == 0); printf ("Размер защитной области: %d\n", atrsize);

assert (pthread_attr_getschedparam (&patob, &shdprm) == 0); assert (pthread_attr_getschedpolicy (&patob, &res) == 0); switch (res) { case SCHED_FIFO: spname = "Планирование по очереди"; break; case SCHED_RR: spname = "Циклическое планирование"; break; case SCHED_OTHER: spname = "Прочее планирование"; break; default: spname = "Неизвестная политика планирования"; } printf ("Политика планирования: %s\n", spname); printf ("Приоритет планирования: %d\n", shdprm.sched_priority);

assert (pthread_attr_getscope (&patob, &res) == 0); switch (res) { case PTHREAD_SCOPE_SYSTEM: spname = "Система"; break; case PTHREAD_SCOPE_PROCESS: spname = "Процесс"; break; default: spname = "Неизвестная область планирования " "конкуренции"; } printf ("Область планирования конкуренции: %s\n", spname);

assert (pthread_attr_getinheritsched (&patob, &res) == 0); switch (res) { case PTHREAD_INHERIT_SCHED: spname = "Наследуются у родительского потока"; break; case PTHREAD_EXPLICIT_SCHED: spname = "Извлекаются из атрибутного объекта"; break; default: spname = "Устанавливаются неизвестным образом"; } printf ("Атрибуты планирования: %s\n", spname);

assert (pthread_attr_getdetachstate (&patob, &res) == 0); switch (res) { case PTHREAD_CREATE_JOINABLE: spname = "Присоединяемые"; break; case PTHREAD_CREATE_DETACHED: spname = "Обособленные"; break; default: spname = "Неизвестные"; } printf (" Потоки управления создаются как: %s\n", spname);

/* Изменим значения атрибутов планирования и уровня */ /* параллелизма */ shdprm.sched_priority = 1; if ((errno = pthread_setschedparam (ct_id, SCHED_RR, &shdprm)) != 0) { perror ("PTHREAD_SETSCHEDPARAM"); } if ((errno = pthread_setconcurrency (8192)) != 0) { perror ("PTHREAD_SETCONCURRENCY"); } printf ("\nТекущие значения атрибутов потоков управления\n");



assert (pthread_getschedparam (ct_id, &res, &shdprm) == 0); switch (res) { case SCHED_FIFO: spname = "Планирование по очереди"; break; case SCHED_RR: spname = "Циклическое планирование"; break; case SCHED_OTHER: spname = "Прочее планирование"; break; default: spname = "Неизвестная политика планирования"; } printf ("Политика планирования: %s\n", spname); printf ("Приоритет планирования: %d\n", shdprm.sched_priority); printf ("Уровень параллелизма: %d\n", pthread_getconcurrency());

return 0; }

Листинг 1.16. Пример программы, опрашивающей и изменяющей значения атрибутов потоков управления.

Идентификатор текущего потока управления: 400 Значения, установленные системой в атрибутном объекте Адрес начала стека: 0xffe01000 Размер стека: 2093056 Размер защитной области: 4096 Политика планирования: Прочее планирование Приоритет планирования: 0 Область планирования конкуренции: Система Атрибуты планирования: Извлекаются из атрибутного объекта Потоки управления создаются как: Присоединяемые

Текущие значения атрибутов потоков управления Политика планирования: Циклическое планирование Приоритет планирования: 1 Уровень параллелизма: 8192

Листинг 1.17. Возможные результаты работы программы, опрашивающей и изменяющей значения атрибутов потоков управления.

Листинг 1.18 содержит результаты выполнения упрощенного варианта этой же программы (без вызовов функций pthread_attr_getstack(), pthread_attr_getguardsize(), pthread_getconcurrency(), pthread_setconcurrency() и без соответствующих выдач) для операционной системы реального времени oc2000, соответствующей подмножеству требований стандарта POSIX-2001.

Идентификатор текущего потока управления: f31ae0 Значения, установленные системой в атрибутном объекте Политика планирования: Планирование по очереди Приоритет планирования: 100 Область планирования конкуренции: Процесс Атрибуты планирования: Извлекаются из атрибутного объекта Потоки управления создаются как: Присоединяемые

Текущие значения атрибутов потоков управления Политика планирования: Циклическое планирование Приоритет планирования: 1

Листинг 1.18. Возможные результаты работы программы, опрашивающей и изменяющей значения атрибутов потоков управления, для операционной системы реального времени oc2000.


Основные идеи, понятия и объекты


Напомним, уточним и дополним определения, которые были даны в курсе [1] применительно к потокам управления.

Процесс – это адресное пространство вместе с выполняемыми в нем потоками управления, а также системными ресурсами, которые этим потокам требуются.

После того, как процесс создан с помощью функции fork(), он считается активным. Сразу после создания в его рамках существует ровно один поток управления – копия того, что вызвал fork().

До завершения процесса в его рамках существуют по крайней мере один поток управления и адресное пространство.

Большинство атрибутов процесса разделяются существующими в его рамках потоками управления. К числу индивидуальных атрибутов относятся идентификатор, приоритет и политика планирования, значение переменной errno, ассоциированные с потоком управления пары ключ/значение (служащие для организации индивидуальных данных потока и доступа к ним), а также системные ресурсы, требующиеся для поддержки потока управления.

Идентификатор потока управления уникален в пределах процесса, но не системы в целом.

Идентификаторы потоков управления представлены значениями типа pthread_t, который трактуется в стандарте POSIX-2001 как абстрактный. В частности, для него определен метод сравнения значений на равенство.

Всем потокам управления одного процесса доступны все объекты, адреса которых могут быть определены потоком. В число таких объектов входят статические переменные, области динамической памяти, полученные от функции malloc(), прямоадресуемая память, полученная от системно-зависимых функций, автоматические переменные и т.д.

По отношению к потокам управления вводится понятие безопасных функций, которые можно вызывать параллельно в нескольких потоках без нарушения корректности их функционирования. К числу безопасных принадлежат "чистые" функции, а также функции, обеспечивающие взаимное исключение перед доступом к разделяемым объектам. Если в стандарте явно не оговорено противное, функция считается потоково-безопасной.

Выполняющимся (активным) называется поток управления, обрабатываемый в данный момент процессором.
В многопроцессорных конфигурациях может одновременно выполняться несколько потоков.

Поток управления считается готовым к выполнению, если он способен стать активным, но не может этого сделать из-за отсутствия свободного процессора.

Поток управления называется вытесненным, если его выполнение приостановлено из-за того, что другой поток с более высоким приоритетом стал готов к выполнению.

Поток управления считается блокированным, если для продолжения его выполнения должно стать истинным некоторое условие, отличное от доступности процессора.

Списком потоков управления называется упорядоченный набор равноприоритетных потоков, готовых к выполнению. Порядок потоков в списке определяется политикой планирования. Множество наборов включает все потоки в системе, готовые к выполнению.

Планированием, согласно стандарту POSIX-2001, называется применение политики выбора процесса или потока управления, готового к выполнению, для его перевода в число активных, а также политики изменения списков потоков управления.

Под политикой планирования понимается набор правил, используемых для определения порядка выполнения процессов или потоков управления для достижения некоторой цели.

Политика планирования воздействует на порядок процессов (потоков управления) по крайней мере в следующих ситуациях:

когда активный процесс (поток управления) блокируется или вытесняется; когда блокированный процесс (поток управления) становится готовым к выполнению.

Область планирования размещения – это набор процессоров, по отношению к которым в некоторый момент времени может планироваться поток управления.

Областью планирования конкуренции называется свойство потока управления, определяющее набор потоков, с которыми он конкурирует за ресурсы, например, за процессор. В стандарте POSIX-2001 предусмотрены две подобные области – PTHREAD_SCOPE_SYSTEM (конкуренция в масштабе системы) и PTHREAD_SCOPE_PROCESS (конкуренция в масшабе процесса).

Пожалуй, общей проблемой всех приложений является контроль переполнения стека.


Для ее решения стандартом POSIX- 2001 предусмотрено существование так называемой защитной области, расположенной за верхней границей стека. При переполнении и попадания указателя стека в защитную область операционная система должна фиксировать ошибку – например, доставлять потоку управления сигнал SIGSEGV.

С каждым потоком управления ассоциирован атрибутный объект – собрание атрибутов потока с конфигурируемыми значениями, таких как адрес и размер стека, параметры планирования и т.п. В стандарте POSIX-2001 атрибутные объекты представлены как значения абстрактного типа pthread_attr_t, внутренняя структура значений которого скрыта от приложений. Смысл введения атрибутных объектов – сгруппировать немобильные параметры потоков, чтобы облегчить адаптацию приложений к новым целевым платформам. Использование идеологии абстрактных объектов позволяет безболезненно добавлять новые атрибуты, не теряя обратной совместимости.

Обратим внимание на следующее обстоятельство, важное для реализации многопотоковых приложений. Иногда потоки управления называют легковесными процессами, поскольку они требуют существенно меньшей, чем обычные процессы, аппаратно-программной поддержки и, кроме того, их функционирование сопряжено с меньшими накладными расходами. С этой точки зрения потоками можно пользоваться более свободно, чем процессами. С другой стороны, потоки одного процесса никак не защищены друг от друга, они разделяют одно адресное пространство, поэтому, в отличие от полноценных процессов, ошибки в программе одного из них могут сказаться на других, породить ситуации, которые трудно воспроизвести, что делает поиск и исправление ошибок крайне сложными. В этом смысле легковесной можно назвать аналогию с процессами, основанную только на возможности параллельной работы. Она обманчива, поскольку из вида упускается очень важный аспект разделения доменов выполнения. Вообще говоря, многопотоковые приложения существенно менее надежны, чем многопроцессные, поэтому потоками управления следует пользоваться осторожно, систематически.



Операции с потоками управления можно подразделить на две группы:

создание, терминирование, выполнение других управляющих операций; синхронизация.

В таком порядке они и будут рассматриваться далее. Отметим, что стандарт POSIX-2001 относит их к необязательной части, именуемой, как нетрудно догадаться, "Потоки управления" ("Threads", THR).

Модель, принятая в стандарте применительно к созданию потоков управления, отличается от соответствующей модели для процессов. При создании нового потока задается функция, с вызова которой начнется его выполнение, то есть вместо пары вида fork()/exec() создающий поток должен обратиться лишь к одной функции – pthread_create(). Впрочем, как мы увидим далее, и для процессов в стандарте POSIX-2001 произошел отход от классических канонов – введены средства (функции семейства posix_spawn()) для порождения процессов "в один ход".

Потоки управления бывают обособленными (отсоединенными) и присоединяемыми; только последние доступны другим потокам для ожидания завершения и, быть может, утилизации освободившихся ресурсов. Ресурсы, освободившиеся после завершения обособленных потоков управления, утилизирует операционная система.

Поток управления можно терминировать изнутри и извне (из других потоков того же процесса). Поток может управлять состоянием восприимчивости к терминированию (разрешить/запретить собственное терминирование извне), а также специфицировать тип терминирования (отложенное или немедленное, асинхронное).

Отложенное терминирование происходит только по достижении потоком управления точек терминирования – мест в оговоренных в стандарте POSIX функциях, где поток должен отреагировать на ждущие запросы на терминирование (если оно разрешено), перед тем как его выполнение будет приостановлено на неопределенное время с сохранением состояния восприимчивости к терминированию.

Согласно стандарту POSIX-2001, точки терминирования имеются в таких функциях, как accept(), connect(), msgrcv(), msgsnd(), pause(), read(), sleep(), wait(), write() и сходных с ними по поведению.Допускается существование подобных точек и в других, также оговоренных в стандарте POSIX функциях – printf(), scanf(), semop() и т.п.


pthread_t pthread_self


#include <pthread.h> pthread_t pthread_self (void);
Листинг 1.1. Описание функции pthread_self().
Закрыть окно




#include <pthread.h> int pthread_equal ( pthread_t t1, pthread_t t2);
Листинг 1.2. Описание функции pthread_equal().
Закрыть окно




#include <pthread.h>
int pthread_attr_init ( pthread_attr_t *attr);
int pthread_attr_destroy ( pthread_attr_t *attr);
Листинг 1.3. Описание функций pthread_attr_init() и pthread_attr_destroy().
Закрыть окно




#include <pthread.h>
int pthread_attr_getstack ( const pthread_attr_t * restrict attr, void **restrict stackaddr, size_t *restrict stacksize);
int pthread_attr_setstack ( pthread_attr_t *attr, void *stackaddr, size_t stacksize);
Листинг 1.4. Описание функций pthread_attr_getstack() и pthread_attr_setstack().
Закрыть окно




#include <pthread.h>
int pthread_attr_getguardsize ( const pthread_attr_t * restrict attr, size_t *restrict guardsize);
int pthread_attr_setguardsize ( pthread_attr_t *attr, size_t guardsize);
Листинг 1.5. Описание функций pthread_attr_getguardsize() и pthread_attr_setguardsize().
Закрыть окно




#include <pthread.h>
int pthread_attr_getschedparam ( const pthread_attr_t *restrict attr, struct sched_param *restrict param);
int pthread_attr_setschedparam ( pthread_attr_t * restrict attr, const struct sched_param *restrict param);
Листинг 1.6. Описание функций pthread_attr_getschedparam() и pthread_attr_setschedparam().
Закрыть окно




#include <pthread.h>
int pthread_attr_getschedpolicy ( const pthread_attr_t * restrict attr, int *restrict policy);
int pthread_attr_setschedpolicy ( pthread_attr_t *attr, int policy);
Листинг 1.7. Описание функций pthread_attr_getschedpolicy() и pthread_attr_setschedpolicy().
Закрыть окно




#include <pthread.h>
int pthread_attr_getscope ( const pthread_attr_t * restrict attr, int *restrict contentionscope);
int pthread_attr_setscope ( pthread_attr_t *attr, int contentionscope);
Листинг 1.8. Описание функций pthread_attr_getscope() и pthread_attr_setscope().
Закрыть окно




#include <pthread.h>
int pthread_attr_getinheritsched ( const pthread_attr_t * restrict attr, int *restrict inheritsched);
int pthread_attr_setinheritsched ( pthread_attr_t *attr, int inheritsched);
Листинг 1.9. Описание функций pthread_attr_getinheritsched() и pthread_attr_setinheritsched().
Закрыть окно




#include <pthread.h>
int pthread_attr_getdetachstate ( const pthread_attr_t *attr, int *detachstate);
int pthread_attr_setdetachstate ( pthread_attr_t *attr, int detachstate);
Листинг 1.10. Описание функций pthread_attr_getdetachstate() и pthread_attr_setdetachstate().
Закрыть окно




#include <pthread.h>
int pthread_getschedparam ( pthread_t thread, int *restrict policy, struct sched_param *restrict param);
int pthread_setschedparam ( pthread_t thread, int policy, const struct sched_param *param);
Листинг 1.11. Описание функций pthread_getschedparam() и pthread_setschedparam().
Закрыть окно




#include <pthread.h> int pthread_setschedprio ( pthread_t thread, int prio);
Листинг 1.12. Описание функции pthread_setschedprio().
Закрыть окно




#include <pthread.h>
int pthread_getconcurrency (void);
int pthread_setconcurrency (int new_level);
Листинг 1.13. Описание функций pthread_getconcurrency() и pthread_setconcurrency().
Закрыть окно




#include <pthread.h> #include <time.h>
int pthread_getcpuclockid ( pthread_t thread_id, clockid_t *clock_id);
Листинг 1.14. Описание функции pthread_getcpuclockid().
Закрыть окно




#include <signal.h> int pthread_sigmask ( int how, const sigset_t *restrict set, sigset_t *restrict oset);
Листинг 1.15. Описание функции pthread_sigmask().
Закрыть окно




/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа опрашивает атрибуты потоков управления */ /* и изменяет некоторые из них */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */
#define _XOPEN_SOURCE 600
#include <stdio.h> #include <pthread.h> #include <errno.h> #include <assert.h>
int main (void) { pthread_t ct_id; /* Идентификатор текущего потока управления */ pthread_attr_t patob; /* Атрибутный объект для создания потоков управления */ int res; /* Переменная для запоминания результатов "потоковых" функций */ void *stackaddr; /* Начало стека как атрибут потока управления */ size_t atrsize; /* Размеры как атрибуты потока управления */ /* Структура с параметрами планирования */ struct sched_param shdprm; char *spname; /* Названия политики планирования, области */ /* планирования конкуренции и т.п. */
printf ("Идентификатор текущего потока управления: %lx\n", (ct_id = pthread_self ()));
if ((errno = pthread_attr_init (&patob)) != 0) { perror ("PTHREAD_ATTR_INIT"); return (errno); }
printf ("Значения, установленные системой " "в атрибутном объекте\n");
if ((errno = pthread_attr_getstack (&patob, &stackaddr, &atrsize)) != 0) { perror ("PTHREAD_ATTR_GETSTACK"); return (errno); } printf ("Адрес начала стека: %p\n", stackaddr); printf ("Размер стека: %d\n", atrsize);
assert (pthread_attr_getguardsize (&patob, &atrsize) == 0); printf ("Размер защитной области: %d\n", atrsize);
assert (pthread_attr_getschedparam (&patob, &shdprm) == 0); assert (pthread_attr_getschedpolicy (&patob, &res) == 0); switch (res) { case SCHED_FIFO: spname = "Планирование по очереди"; break; case SCHED_RR: spname = "Циклическое планирование"; break; case SCHED_OTHER: spname = "Прочее планирование"; break; default: spname = "Неизвестная политика планирования"; } printf ("Политика планирования: %s\n", spname); printf ("Приоритет планирования: %d\n", shdprm.sched_priority);
assert (pthread_attr_getscope (&patob, &res) == 0); switch (res) { case PTHREAD_SCOPE_SYSTEM: spname = "Система"; break; case PTHREAD_SCOPE_PROCESS: spname = "Процесс"; break; default: spname = "Неизвестная область планирования " "конкуренции"; } printf ("Область планирования конкуренции: %s\n", spname);
assert (pthread_attr_getinheritsched (&patob, &res) == 0); switch (res) { case PTHREAD_INHERIT_SCHED: spname = "Наследуются у родительского потока"; break; case PTHREAD_EXPLICIT_SCHED: spname = "Извлекаются из атрибутного объекта"; break; default: spname = "Устанавливаются неизвестным образом"; } printf ("Атрибуты планирования: %s\n", spname);
assert (pthread_attr_getdetachstate (&patob, &res) == 0); switch (res) { case PTHREAD_CREATE_JOINABLE: spname = "Присоединяемые"; break; case PTHREAD_CREATE_DETACHED: spname = "Обособленные"; break; default: spname = "Неизвестные"; } printf ("Потоки управления создаются как: %s\n", spname);
/* Изменим значения атрибутов планирования и уровня */ /* параллелизма */ shdprm.sched_priority = 1; if ((errno = pthread_setschedparam (ct_id, SCHED_RR, &shdprm)) != 0) { perror ("PTHREAD_SETSCHEDPARAM"); } if ((errno = pthread_setconcurrency (8192)) != 0) { perror ("PTHREAD_SETCONCURRENCY"); } printf ("\nТекущие значения атрибутов потоков управления\n");
assert (pthread_getschedparam (ct_id, &res, &shdprm) == 0); switch (res) { case SCHED_FIFO: spname = "Планирование по очереди"; break; case SCHED_RR: spname = "Циклическое планирование"; break; case SCHED_OTHER: spname = "Прочее планирование"; break; default: spname = "Неизвестная политика планирования"; } printf ("Политика планирования: %s\n", spname); printf ("Приоритет планирования: %d\n", shdprm.sched_priority); printf ("Уровень параллелизма: %d\n", pthread_getconcurrency());
return 0; }
Листинг 1.16. Пример программы, опрашивающей и изменяющей значения атрибутов потоков управления.
Закрыть окно




Идентификатор текущего потока управления: 400 Значения, установленные системой в атрибутном объекте Адрес начала стека: 0xffe01000 Размер стека: 2093056 Размер защитной области: 4096 Политика планирования: Прочее планирование Приоритет планирования: 0 Область планирования конкуренции: Система Атрибуты планирования: Извлекаются из атрибутного объекта Потоки управления создаются как: Присоединяемые
Текущие значения атрибутов потоков управления Политика планирования: Циклическое планирование Приоритет планирования: 1 Уровень параллелизма: 8192
Листинг 1.17. Возможные результаты работы программы, опрашивающей и изменяющей значения атрибутов потоков управления.
Закрыть окно




Идентификатор текущего потока управления: f31ae0 Значения, установленные системой в атрибутном объекте Политика планирования: Планирование по очереди Приоритет планирования: 100 Область планирования конкуренции: Процесс Атрибуты планирования: Извлекаются из атрибутного объекта Потоки управления создаются как: Присоединяемые
Текущие значения атрибутов потоков управления Политика планирования: Циклическое планирование Приоритет планирования: 1
Листинг 1.18. Возможные результаты работы программы, опрашивающей и изменяющей значения атрибутов потоков управления, для операционной системы реального времени oc2000.
Закрыть окно




#include <pthread.h>
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once ( pthread_once_t *once_control_ptr, void (*init_routine) (void));
Листинг 1.19. Описание функции pthread_once().
Закрыть окно




#include <pthread.h>
int pthread_key_create ( pthread_key_t *key_ptr, void (*destructor) (void *));
int pthread_key_delete ( pthread_key_t key);
Листинг 1.20. Описание функций pthread_key_create() и pthread_key_delete().
Закрыть окно




#include <pthread.h>
void *pthread_getspecific ( pthread_key_t key);
int pthread_setspecific ( pthread_key_t key, const void *value);
Листинг 1.21. Описание функций pthread_getspecific() и pthread_setspecific().
Закрыть окно




/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа запоминает в качестве индивидуальных данных */ /* потока управления время начала активных операций */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */
#include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <sys/time.h> static pthread_key_t data_key; static pthread_once_t key_once = PTHREAD_ONCE_INIT;
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Деструктор индивидуальных данных, в роли которых */ /* выступает указатель на структуру типа timeval. */ /* Поскольку она не содержит указателей, достаточно */ /* освободить занимаемую ею память */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void data_destructor (void *p) { free (p); }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция создания ключа индивидуальных данных, */ /* ассоциирующая с ним деструктор, освобождающий память */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void create_data_key (void) { (void) pthread_key_create (&data_key, data_destructor); }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция инициализации индивидуальных данных. */ /* Запрашивает астрономическое время */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *start_func (void) { struct timeval *tmvl_ptr;
/* Запомним астрономическое время начала операций */ /* потока управления */ if ((tmvl_ptr = (struct timeval *) malloc (sizeof (struct timeval))) == NULL) { return (NULL); } (void) gettimeofday (tmvl_ptr, NULL);
/* Создадим ключ индивидуальных данных, перепоручив */ /* вызов pthread_key_create() функции pthread_once() */ (void) pthread_once (&key_once, create_data_key);
(void) pthread_setspecific (data_key, tmvl_ptr); return (tmvl_ptr); }
/* * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() вызывает функцию инициализации */ /* и запрашивает индивидуальные данные потока */ /* управления */ /* * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { struct timeval *tmvl_ptr;
if (start_func () == NULL) { return (1); }
if ((tmvl_ptr = (struct timeval *) pthread_getspecific (data_key)) != NULL) { printf ("Время начала операций потока управления: " "%ld сек, %ld мсек\n", tmvl_ptr->tv_sec, tmvl_ptr->tv_usec); } else { printf ("Отсутствуют индивидуальные данные потока " "управления.\n"); printf ("Время начала операций неизвестно\n"); return (2); }
return 0; }
Листинг 1.22. Пример программы, формирующей и опрашивающей индивидуальные данные потоков управления.
Закрыть окно




Время начала операций потока управления: 1075707670 сек, 584737 мсек
Листинг 1.23. Возможные результаты работы программы, формирующей и опрашивающей индивидуальные данные потоков управления.
Закрыть окно




#include <pthread.h> int pthread_create ( pthread_t * restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine) (void *), void *restrict arg);
Листинг 1.24. Описание функции pthread_create().
Закрыть окно




#include <pthread.h> int pthread_atfork ( void (*prepare) (void), void (*parent) (void), void (*child) (void));
Листинг 1.25. Описание функции pthread_atfork().
Закрыть окно




#include <pthread.h> void pthread_exit (void *value_ptr);
Листинг 1.26. Описание функции pthread_exit().
Закрыть окно




#include <pthread.h> int pthread_join ( pthread_t thread, void **value_ptr_ptr);
Листинг 1.27. Описание функции pthread_join().
Закрыть окно




#include <pthread.h>
void pthread_cleanup_push ( void (*routine) (void *), void *arg);
void pthread_cleanup_pop (int execute);
Листинг 1.28. Описание функций pthread_cleanup_push() и pthread_cleanup_pop().
Закрыть окно




#define pthread_cleanup_push (rtn, arg) { \ struct _pthread_handler_rec \ __cleanup_handler, \ **__head; \ __cleanup_handler.rtn = rtn; \ __cleanup_handler.arg = arg; \ (void) pthread_getspecific \ (_pthread_handler_key, &__head); \ __cleanup_handler.next = *__head; \ *__head = &__cleanup_handler;
#define pthread_cleanup_pop (ex) \ *__head = __cleanup_handler.next; \ if (ex) (*__cleanup_handler.rtn) \ (__cleanup_handler.arg); \ }
Листинг 1.29. Возможная реализация функций pthread_cleanup_push() и pthread_cleanup_pop() как макросов.
Закрыть окно




#include <pthread.h> int pthread_cancel (pthread_t thread);
Листинг 1.30. Описание функции pthread_cancel().
Закрыть окно




#include <pthread.h>
int pthread_setcancelstate ( int state, int *oldstate);
int pthread_setcanceltype ( int type, int *oldtype);
void pthread_testcancel (void);
Листинг 1.31. Описание функций pthread_setcancelstate(), pthread_setcanceltype(), pthread_testcancel().
Закрыть окно




#include <signal.h> int pthread_kill ( pthread_t thread, int sig);
Листинг 1.32. Описание функции pthread_kill().
Закрыть окно




/* * * * * * * * * * * * * * * * * */ /* Программа демонстрирует генерацию */ /* и доставку сигналов */ /* потокам управления */ /* * * * * * * * * * * * * * * * * */
#include <unistd.h> #include <stdio.h> #include <pthread.h> #include <signal.h> #include <errno.h>
/* * * * * * * * * * * * * * / /* Функция обработки сигнала */ /* * * * * * * * * * * * * * / static void signal_handler (int dummy) { printf ("Идентификатор потока, обрабатывающего сигнал: %lx\n", pthread_self ()); }
/* * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока управления, */ /* которому будет направлен сигнал */ /* * * * * * * * * * * * * * * * * * * */ static void *thread_start (void *dummy) { printf ("Идентификатор нового потока управления: %lx\n", pthread_self ()); while (1) { sleep (1); }
return (NULL); }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGINT, */ /* создает поток управления и посылает ему сигнал */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { pthread_t thread_id; struct sigaction act;
/* Установим реакцию на сигнал SIGINT */ act.sa_handler = signal_handler; (void) sigemptyset (&act.sa_mask); act.sa_flags = 0; (void) sigaction (SIGINT, &act, (struct sigaction *) NULL);
if ((errno = pthread_create (&thread_id, NULL, thread_start, NULL)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } printf ("Идентификатор созданного потока управления: %lx\n", thread_id);
(void) pthread_kill (thread_id, SIGINT); printf ("После вызова pthread_kill()\n");
sleep (1); printf ("Выспались...\n");
return (0); }
Листинг 1.33. Пример использования механизма сигналов в многопотоковой программе.
Закрыть окно




Идентификатор созданного потока управления: 402 После вызова pthread_kill() Идентификатор потока, обрабатывающего сигнал: 402 Идентификатор нового потока управления: 402 Выспались...
Листинг 1.34. Возможные результаты работы многопотоковой программы, использующей механизм сигналов.
Закрыть окно




/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа демонстрирует взаимодействие сигналов */ /* и ожидания завершения потока управления */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */
#include <stdio.h> #include <pthread.h> #include <signal.h> #include <errno.h>
/* * * * * * * * * * * * * * */ /* Функция обработки сигнала */ /* * * * * * * * * * * * * * */ static void signal_handler (int dummy) { printf ("Идентификатор потока, обрабатывающего сигнал: %lx\n", pthread_self ()); }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция создаваемого потока управления */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void *thread_start (void *thread_id) { printf ("Идентификатор нового потока управления: %lx\n", pthread_self ()); (void) pthread_kill ((pthread_t) thread_id, SIGINT);
return ((void *) pthread_self ()); } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGINT, */ /* создает поток управления и ожидает его завершения */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { pthread_t thread_id; struct sigaction act; void *pv;
/* Установим реакцию на сигнал SIGINT */ act.sa_handler = signal_handler; (void) sigemptyset (&act.sa_mask); act.sa_flags = 0; (void) sigaction (SIGINT, &act, (struct sigaction *) NULL);
if ((errno = pthread_create (&thread_id, NULL, thread_start, (void *) pthread_self ())) != 0) { perror ("PTHREAD_CREATE"); return (errno); } printf ("Идентификаторы начального и созданного потоков " "управления: " "%lx %lx\n", pthread_self (), thread_id);
/* Дождемся завершения созданного потока управления */ if ((errno = pthread_join (thread_id, &pv)) != 0) { perror ("PTHREAD_JOIN"); return (errno); } printf ("Статус завершения созданного потока " "управления: %p\n", pv);
return (0); }
Листинг 1.35. Пример программы, обрабатывающей сигнал во время ожидания завершения потока управления.
Закрыть окно




Идентификаторы начального и созданного потоков управления: 400 402 Идентификатор нового потока управления: 402 Идентификатор потока, обрабатывающего сигнал: 400 Статус завершения созданного потока управления: 0x402
Листинг 1.36. Возможные результаты работы программы, обрабатывающей сигнал во время ожидания завершения потока управления.
Закрыть окно




#include <pthread.h> int pthread_detach (pthread_t thread);
Листинг 1.37. Описание функции pthread_detach().
Закрыть окно




/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа процесса (будем называть его серверным), */ /* принимающего запросы на установления соединения и */ /* запускающего потоки управления для их обслуживания */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */
#include <stdio.h> #include <netdb.h> #include <pthread.h> #include <sys/socket.h> #include <arpa/inet.h> #include <errno.h>
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая (и единственная) функция потоков управления, */ /* обслуживающих запросы на копирование строк, */ /* поступающих из сокета */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *srv_thread_start (void *ad) { FILE *fpad; /* Поток данных, соответствующий */ /* дескриптору ad */ char line [LINE_MAX]; /* Буфер для принимаемых строк */ /* Структура для записи адреса */ struct sockaddr_in sai; /* Длина адреса */ socklen_t sai_len = sizeof (struct sockaddr_in);
/* Опросим адрес партнера по общению (передающего сокета) */ if (getpeername ((int) ad, (struct sockaddr *) &sai, &sai_len) < 0) { perror ("GETPEERNAME"); return (NULL); }
/* По файловому дескриптору ad сформируем */ /* буферизованный поток данных */ if ((fpad = fdopen ((int) ad, "r")) == NULL) { perror ("FDOPEN"); return (NULL); }
/* Цикл чтения строк из сокета */ /* и выдачи их на стандартный вывод */ while (fgets (line, sizeof (line), fpad) != NULL) { printf ("Вы ввели и отправили с адреса %s, " "порт %d :", inet_ntoa (sai.sin_addr), ntohs (sai.sin_port)); fputs (line, stdout); }
/* Закрытие соединения */ shutdown ((int) ad, SHUT_RD); (void) fclose (fpad);
return (NULL); }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* В функции main() принимаются запросы на установление */ /* соединения и запускаются потоки управления для их обслуживания */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { int sd; /* Дескриптор слушающего сокета */ int ad; /* Дескриптор приемного сокета */ /* Буфер для принимаемых строк */ struct addrinfo hints = {AI_PASSIVE, AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL}; /* Указатель – выходной аргумент getaddrinfo */ struct addrinfo *addr_res; int res; /* Результат getaddrinfo */ pthread_attr_t patob; /* Атрибутный объект для создания */ /* потоков управления */ pthread_t adt_id; /* Идентификатор обслуживающего потока управления */
/* Создадим слушающий сокет */ if ((sd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { perror ("SOCKET"); return (1); }
/* Привяжем этот сокет к адресу сервиса spooler */ /* на локальном хосте */ if ((res = getaddrinfo (NULL, "spooler", &hints, &addr_res)) != 0) { fprintf (stderr, "GETADDRINFO: %s\n", gai_strerror (res)); return (2); } if (bind (sd, addr_res->ai_addr, addr_res->ai_addrlen) < 0) { perror ("BIND"); return (3); }
/* Можно освободить память, которую запрашивала */ /* функция getaddrinfo() */ freeaddrinfo (addr_res);
/* Пометим сокет как слушающий */ if (listen (sd, SOMAXCONN) < 0) { perror ("LISTEN"); return (4); }
/* Инициализируем атрибутный объект потоков управления */ if ((errno = pthread_attr_init (&patob)) != 0) { perror ("PTHREAD_ATTR_INIT"); return (errno); } /* Потоки управления будем создавать обособленными */ (void) pthread_attr_setdetachstate (&patob, PTHREAD_CREATE_DETACHED);
/* Цикл приема соединений и запуска */ /* обслуживающих потоков управления */ while (1) { /* Примем соединение. */ /* Адрес партнера по общению нас */ /* в данном случае не интересует */ if ((ad = accept (sd, NULL, NULL)) < 0) { perror ("ACCEPT"); return (6); }
/* Запустим обслуживающий поток управления */ if ((errno = pthread_create (&adt_id, &patob, srv_thread_start,(void *) ad)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } }
return (0); }
Листинг 1.38. Пример многопотоковой программы, обслуживающей запросы на копирование строк, поступающих через сокеты.
Закрыть окно




/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа вызывает функции обработки в рамках */ /* порождаемых потоков управления и контролирует время */ /* их выполнения с помощью интервального таймера */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */
#include <stdio.h> #include <pthread.h> #include <sys/time.h> #include <signal.h> #include <errno.h>
/* Период интервального таймера (в секундах) */ #define IT_PERIOD 1
static pthread_t cthread_id; /* Идентификатор текущего */ /* потока управления, */ /* обрабатывающего данные */
static int in_proc_data = 0; /* Признак активности */ /* потока обработки данных */
static double s; /* Результат функций */ /* обработки данных */
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция обработки срабатывания таймера реального */ /* времени (сигнал SIGALRM) */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_sigalrm (int dummy) { if (in_proc_data) { /* Не имеет значения, какой поток обрабатывает сигнал */ /* и заказывает терминирование (быть может, себя) */ (void) pthread_cancel (cthread_id); } }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Обработчик завершения потока управления. */ /* Сбрасывает признак активности потока обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_cleanup_handler (void *arg) { in_proc_data = (int) arg; }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока управления, обрабатывающего */ /* данные. Аргумент – указатель на функцию обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *start_func (void *proc_data_func) { /* Поместим в стек обработчик завершения */ pthread_cleanup_push (proc_data_cleanup_handler, 0); in_proc_data = 1; /* Время пошло ... */
/* На время выполнения функции обработки данных установим */ /* асинхронный тип терминирования, иначе оно не сработает */ (void) pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
/* Выполним функцию обработки данных */ ((void (*) (void)) (proc_data_func)) ();
/* Установим отложенный тип терминирования, */ /* иначе изъятие обработчика из стека */ /* будет небезопасным действием */ (void) pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);
/* Выполним обработчик завершения и удалим его из стека */ pthread_cleanup_pop (1); return (NULL); }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Первая функция обработки данных (вычисляет ln (2)) */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_1 (void) { double d = 1; int i;
s = 0; for (i = 1; i <= 100000000; i++) { s += d / i; d = -d; } }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Вторая функция обработки данных (вычисляет sqrt (2))*/ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_2 (void) { s = 1; do { s = (s + 2 / s) * 0.5; } while ((s * s – 2) > 0.000000001); } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGALRM, */ /* взводит периодический таймер реального времени */ /* и запускает в цикле потоки обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { /* Массив указателей на функции обработки данных */ void (*fptrs []) (void) = {proc_data_1, proc_data_2, NULL}; /* Указатель на указатель на */ /* текущую функцию обработки данных */ void (**tfptr) (void); void *pstat; /* Статус завершения потока */ /* обработки данных */ struct itimerval itvl; struct sigaction sact; int i;
/* Установим реакцию на сигнал SIGALRM */ sact.sa_handler = proc_sigalrm; sact.sa_flags = 0; (void) sigemptyset (&sact.sa_mask); if (sigaction (SIGALRM, &sact, NULL) < 0) { perror ("SIGACTION"); return (1); }
/* Сделаем таймер реального времени периодическим */ itvl.it_interval.tv_sec = IT_PERIOD; itvl.it_interval.tv_usec = 0;
/* Цикл запуска потоков обработки данных. */ /* Выполним его дважды */ for (i = 0; i < 2; i++) { for (tfptr = fptrs; *tfptr != NULL; tfptr++) { /* Взведем интервальный таймер реального времени */ itvl.it_value.tv_sec = IT_PERIOD; itvl.it_value.tv_usec = 0; if (setitimer (ITIMER_REAL, &itvl, NULL) < 0) { perror ("SETITIMER"); return (2); }
/* Создадим поток обработки данных, */ /* затем дождемся его завершения */ if ((errno = pthread_create (&cthread_id, NULL, start_func, (void *) *tfptr)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } if ((errno = pthread_join (cthread_id, &pstat)) != 0) { perror ("PTHREAD_JOIN"); return (errno); }
if (pstat == PTHREAD_CANCELED) { printf ("Частичный результат функции " "обработки данных: %g\n", s); } else { printf ("Полный результат функции " "обработки данных: %g\n", s); } } }
return 0; }
Листинг 1.39. Пример многопотоковой программы, осуществляющей обработку данных с контролем времени.
Закрыть окно




Частичный результат функции обработки данных: 0.693147 Полный результат функции обработки данных: 1.41421 Частичный результат функции обработки данных: 0.693147 Полный результат функции обработки данных: 1.41421
Листинг 1.40. Возможные результаты работы многопотоковой программы, осуществляющей обработку данных с контролем времени.
Закрыть окно




#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <sys/wait.h>
#define N 10000
int main (void) { int i;
for (i = 0; i < N; i++) { switch (fork ()) { case -1: perror ("FORK"); return (1); case 0: /* Порожденный процесс */ (void) execl ("./dummy", "dummy", (char *) 0); exit (0); default: /* Родительский процесс */ (void) wait (NULL); } }
return 0; }
Листинг 1.41. Пример программы, порождающей в цикле практически пустые процессы.
Закрыть окно




int main (void) { return 0; }
Листинг 1.42. Содержимое файла dummy.c
Закрыть окно




real 34.97 user 12.36 sys 22.61
Листинг 1.43. Возможные результаты измерения времени работы программы, порождающей в цикле практически пустые процессы (вариант с вызовом execl()).
Закрыть окно




real 11.49 user 2.38 sys 9.11
Листинг 1.44. Возможные результаты измерения времени работы программы, порождающей в цикле практически пустые процессы (вариант без вызова execl()).
Закрыть окно




#include <unistd.h> #include <stdio.h> #include <pthread.h> #include <errno.h>
#define N 10000
static void *thread_start (void *arg) { pthread_exit (arg); }
int main (void) { pthread_t thread_id; int i;
for (i = 0; i < N; i++) { if ((errno = pthread_create ( &thread_id, NULL, thread_start, NULL)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } if ((errno = pthread_join ( thread_id, NULL)) != 0) { perror ("PTHREAD_JOIN"); return (errno); } }
return (0); }
Листинг 1.45. Пример программы, порождающей в цикле потоки управления.
Закрыть окно




real 2.08 user 0.52 sys 1.56
Листинг 1.46. Возможные результаты измерения времени работы программы, порождающей в цикле потоки управления.
Закрыть окно



Работа с индивидуальными данными потоков управления


Все потоки управления одного процесса разделяют общее адресное пространство и, следовательно, имеют общие данные. Чтобы сделать некоторые данные индивидуальными для потока, нужно принять специальные меры: с помощью функции pthread_key_create() создать ключ и ассоциировать с ним индивидуальные данные, воспользовавшись функцией pthread_setspecific(). В дальнейшем эти данные можно извлекать посредством функции pthread_getspecific(). Подчеркнем, что при обращении по одному (разделяемому) ключу разные потоки будут получать доступ к разным данным.

Создать один ключ, очевидно, нужно один раз. В ситуации, когда несколько потоков управления выполняют одну программу, это сопряжено с определенными проблемами. Стандартный прием, заключающийся во введении статической переменной, хранящей признак инициализированности (в данном случае – признак того, что ключ уже создан) в многопотоковой среде не работает, поскольку проверка и последующее изменение значения подобной переменной не являются атомарным действием. В принципе, манипуляции со статической переменной можно защитить каким-либо средством синхронизации, но его тоже нужно инициализировать!

Для решения проблемы однократного выполнения инициализирующих действий в многопотоковой среде стандарт POSIX-2001 предлагает функцию pthread_once() (см. листинг 1.19).

#include <pthread.h>

pthread_once_t once_control = PTHREAD_ONCE_INIT;

int pthread_once ( pthread_once_t *once_control_ptr, void (*init_routine) (void));

Листинг 1.19. Описание функции pthread_once(). (html, txt)

При первом и только при первом обращении к функции pthread_once() с фиксированным значением аргумента once_control_ptr, вне зависимости от того, какой из потоков процесса его выполняет, будет вызвана функция (*init_routine) (), которая по идее должна осуществлять инициализирующие действия, такие как создание ключа индивидуальных данных потоков управления.

Переменная, на которую указывает аргумент once_control_ptr, должна иметь начальное значение PTHREAD_ONCE_INIT и не должна быть автоматической.


Все потоки управления одного процесса разделяют общее адресное пространство и, следовательно, имеют общие данные. Чтобы сделать некоторые данные индивидуальными для потока, нужно принять специальные меры: с помощью функции pthread_key_create() создать ключ и ассоциировать с ним индивидуальные данные, воспользовавшись функцией pthread_setspecific(). В дальнейшем эти данные можно извлекать посредством функции pthread_getspecific(). Подчеркнем, что при обращении по одному (разделяемому) ключу разные потоки будут получать доступ к разным данным.

Создать один ключ, очевидно, нужно один раз. В ситуации, когда несколько потоков управления выполняют одну программу, это сопряжено с определенными проблемами. Стандартный прием, заключающийся во введении статической переменной, хранящей признак инициализированности (в данном случае – признак того, что ключ уже создан) в многопотоковой среде не работает, поскольку проверка и последующее изменение значения подобной переменной не являются атомарным действием. В принципе, манипуляции со статической переменной можно защитить каким-либо средством синхронизации, но его тоже нужно инициализировать!

Для решения проблемы однократного выполнения инициализирующих действий в многопотоковой среде стандарт POSIX-2001 предлагает функцию pthread_once() (см. листинг 1.19).

#include <pthread.h>

pthread_once_t once_control = PTHREAD_ONCE_INIT;

int pthread_once ( pthread_once_t *once_control_ptr, void (*init_routine) (void));

Листинг 1.19. Описание функции pthread_once().

При первом и только при первом обращении к функции pthread_once() с фиксированным значением аргумента once_control_ptr, вне зависимости от того, какой из потоков процесса его выполняет, будет вызвана функция (*init_routine) (), которая по идее должна осуществлять инициализирующие действия, такие как создание ключа индивидуальных данных потоков управления.

Переменная, на которую указывает аргумент once_control_ptr, должна иметь начальное значение PTHREAD_ONCE_INIT и не должна быть автоматической.




За создание и удаление ключа индивидуальных данных потоков управления, согласно стандарту POSIX-2001, отвечают функции pthread_key_create() и pthread_key_delete() (см. листинг 1.20).

#include <pthread.h>

int pthread_key_create ( pthread_key_t *key_ptr, void (*destructor) (void *));

int pthread_key_delete ( pthread_key_t key);

Листинг 1.20. Описание функций pthread_key_create() и pthread_key_delete(). (html, txt)

Функция pthread_key_create() создает новый ключ, которым могут воспользоваться все входящие в процесс потоки управления, и, в соответствии со "штабной дисциплиной", помещает его по указателю key_ptr. Сразу после создания ключа для всех потоков в качестве индивидуальных данных с ним ассоциируется значение NULL. (Аналогично, после создания потока управления он не имеет индивидуальных данных, поэтому со всеми доступными ему ключами также ассоциированы пустые указатели.)

С ключом может быть ассоциирован деструктор, который вызывается при завершении потока управления с индивидуальными данными в качестве аргумента. Обычно в роли индивидуальных данных выступает указатель на динамически зарезервированную потоком область памяти, которую деструктор должен освободить.

Функция pthread_key_delete() удаляет заданный ключ, не заботясь о том, пусты ли ассоциированные значения, и не вызывая каких-либо деструкторов (напротив, ее обычно вызывают из деструктора). Вся ответственность по освобождению памяти и выполнению других необходимых зачисток возлагается на приложение.

Для выборки и изменения ассоциированных с ключом key индивидуальных данных вызывающего потока управления предназначены функции pthread_getspecific() и pthread_setspecific() (см. листинг 1.21).

#include <pthread.h>

void *pthread_getspecific ( pthread_key_t key);

int pthread_setspecific ( pthread_key_t key, const void *value);

Листинг 1.21. Описание функций pthread_getspecific() и pthread_setspecific(). (html, txt)

Функция pthread_getspecific() возвращает индивидуальные данные в качестве результата; функция pthread_setspecific() ассоциирует с ключом key значение аргумента value.

На листинге 1.22 показана программа, использующая стандартную схему создания ключа и манипулирования индивидуальными данными потоков управления.

Листинг 1.22. Пример программы, формирующей и опрашивающей индивидуальные данные потоков управления. (html, txt)

Результат работы этой программы может выглядеть так, как показано на листинге 1.23.

Время начала операций потока управления: 1075707670 сек, 584737 мсек

Листинг 1.23. Возможные результаты работы программы, формирующей и опрашивающей индивидуальные данные потоков управления. (html, txt)



За создание и удаление ключа индивидуальных данных потоков управления, согласно стандарту POSIX-2001, отвечают функции pthread_key_create() и pthread_key_delete() (см. листинг 1.20).

#include <pthread.h>

int pthread_key_create ( pthread_key_t *key_ptr, void (*destructor) (void *));

int pthread_key_delete ( pthread_key_t key);

Листинг 1.20. Описание функций pthread_key_create() и pthread_key_delete().

Функция pthread_key_create() создает новый ключ, которым могут воспользоваться все входящие в процесс потоки управления, и, в соответствии со "штабной дисциплиной", помещает его по указателю key_ptr. Сразу после создания ключа для всех потоков в качестве индивидуальных данных с ним ассоциируется значение NULL. (Аналогично, после создания потока управления он не имеет индивидуальных данных, поэтому со всеми доступными ему ключами также ассоциированы пустые указатели.)

С ключом может быть ассоциирован деструктор, который вызывается при завершении потока управления с индивидуальными данными в качестве аргумента. Обычно в роли индивидуальных данных выступает указатель на динамически зарезервированную потоком область памяти, которую деструктор должен освободить.

Функция pthread_key_delete() удаляет заданный ключ, не заботясь о том, пусты ли ассоциированные значения, и не вызывая каких-либо деструкторов (напротив, ее обычно вызывают из деструктора). Вся ответственность по освобождению памяти и выполнению других необходимых зачисток возлагается на приложение.

Для выборки и изменения ассоциированных с ключом key индивидуальных данных вызывающего потока управления предназначены функции pthread_getspecific() и pthread_setspecific() (см. листинг 1.21).

#include <pthread.h>

void *pthread_getspecific ( pthread_key_t key);

int pthread_setspecific ( pthread_key_t key, const void *value);

Листинг 1.21. Описание функций pthread_getspecific() и pthread_setspecific().

Функция pthread_getspecific() возвращает индивидуальные данные в качестве результата; функция pthread_setspecific() ассоциирует с ключом key значение аргумента value.



На листинге 1. 22 показана программа, использующая стандартную схему создания ключа и манипулирования индивидуальными данными потоков управления.

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа запоминает в качестве индивидуальных данных */ /* потока управления время начала активных операций */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <sys/time.h> static pthread_key_t data_key; static pthread_once_t key_once = PTHREAD_ONCE_INIT;

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Деструктор индивидуальных данных, в роли которых */ /* выступает указатель на структуру типа timeval. */ /* Поскольку она не содержит указателей, достаточно */ /* освободить занимаемую ею память */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void data_destructor (void *p) { free (p); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция создания ключа индивидуальных данных, */ /* ассоциирующая с ним деструктор, освобождающий память */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void create_data_key (void) { (void) pthread_key_create (&data_key, data_destructor); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция инициализации индивидуальных данных. */ /* Запрашивает астрономическое время */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *start_func (void) { struct timeval *tmvl_ptr;

/* Запомним астрономическое время начала операций */ /* потока управления */ if ((tmvl_ptr = (struct timeval *) malloc (sizeof (struct timeval))) == NULL) { return (NULL); } (void) gettimeofday (tmvl_ptr, NULL);

/* Создадим ключ индивидуальных данных, перепоручив */ /* вызов pthread_key_create() функции pthread_once() */ (void) pthread_once (&key_once, create_data_key);

(void) pthread_setspecific (data_key, tmvl_ptr); return (tmvl_ptr); }

/* * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() вызывает функцию инициализации */ /* и запрашивает индивидуальные данные потока */ /* управления */ /* * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { struct timeval *tmvl_ptr;



if (start_func () == NULL) { return (1); }

if ((tmvl_ptr = (struct timeval *) pthread_getspecific (data_key)) != NULL) { printf (" Время начала операций потока управления: " "%ld сек, %ld мсек\n", tmvl_ptr->tv_sec, tmvl_ptr->tv_usec); } else { printf ("Отсутствуют индивидуальные данные потока " "управления.\n"); printf ("Время начала операций неизвестно\n"); return (2); }

return 0; }

Листинг 1.22. Пример программы, формирующей и опрашивающей индивидуальные данные потоков управления.

Результат работы этой программы может выглядеть так, как показано на листинге 1.23.

Время начала операций потока управления: 1075707670 сек, 584737 мсек

Листинг 1.23. Возможные результаты работы программы, формирующей и опрашивающей индивидуальные данные потоков управления.


Создание и терминирование потоков управления


Для создания нового потока управления служит функция pthread_create() (см. листинг 1.24).

#include <pthread.h> int pthread_create ( pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine) (void *), void *restrict arg);

Листинг 1.24. Описание функции pthread_create(). (html, txt)

Выполнение созданного потока управления начнется с вызова (*start_routine) (arg); возврат из этой функции приведет к терминированию потока с возвращаемым значением в качестве статуса завершения. Из этого правила стандартом предусмотрено одно, вполне естественное исключение: для потока, выполнение которого началось с функции main(), возврат из нее означает завершение процесса, содержащего поток, со всеми вытекающими отсюда последствиями.

Аргумент attr задает атрибуты нового потока; если значение attr равно NULL, используются зависящие от реализации подразумеваемые атрибуты.

От "родительского" вновь созданный поток управления наследует немногое: маску сигналов и характеристики вещественной арифметики.

К числу средств создания потоков можно отнести и функцию fork(). Правда, здесь нас будет интересовать не она сама, а ассоциированные с ней обработчики, зарегистрированные с помощью функции pthread_atfork() (см. листинг 1.25).

#include <pthread.h> int pthread_atfork ( void (*prepare) (void), void (*parent) (void), void (*child) (void));

Листинг 1.25. Описание функции pthread_atfork(). (html, txt)

В каждом обращении к pthread_atfork() фигурируют три обработчика (если, конечно, в качестве значения аргумента не задан пустой указатель). Первый ((*prepare)()) выполняется в контексте потока, вызвавшего fork(), до разветвления процесса; второй ((*parent)()) – в том же контексте, но после разветвления; третий ((*child)()) – в контексте единственного потока порожденного процесса.

С помощью pthread_atfork() можно зарегистрировать несколько троек обработчиков. Первые элементы троек вызываются в порядке, обратном по отношению к регистрации; вторые и третьи выполняются в прямом порядке.


Для создания нового потока управления служит функция pthread_create() (см. листинг 1.24).

#include <pthread.h> int pthread_create ( pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine) (void *), void *restrict arg);

Листинг 1.24. Описание функции pthread_create().

Выполнение созданного потока управления начнется с вызова (*start_routine) (arg); возврат из этой функции приведет к терминированию потока с возвращаемым значением в качестве статуса завершения. Из этого правила стандартом предусмотрено одно, вполне естественное исключение: для потока, выполнение которого началось с функции main(), возврат из нее означает завершение процесса, содержащего поток, со всеми вытекающими отсюда последствиями.

Аргумент attr задает атрибуты нового потока; если значение attr равно NULL, используются зависящие от реализации подразумеваемые атрибуты.

От "родительского" вновь созданный поток управления наследует немногое: маску сигналов и характеристики вещественной арифметики.

К числу средств создания потоков можно отнести и функцию fork(). Правда, здесь нас будет интересовать не она сама, а ассоциированные с ней обработчики, зарегистрированные с помощью функции pthread_atfork() (см. листинг 1.25).

#include <pthread.h> int pthread_atfork ( void (*prepare) (void), void (*parent) (void), void (*child) (void));

Листинг 1.25. Описание функции pthread_atfork().

В каждом обращении к pthread_atfork() фигурируют три обработчика (если, конечно, в качестве значения аргумента не задан пустой указатель). Первый ((*prepare)()) выполняется в контексте потока, вызвавшего fork(), до разветвления процесса; второй ((*parent)()) – в том же контексте, но после разветвления; третий ((*child)()) – в контексте единственного потока порожденного процесса.

С помощью pthread_atfork() можно зарегистрировать несколько троек обработчиков. Первые элементы троек вызываются в порядке, обратном по отношению к регистрации; вторые и третьи выполняются в прямом порядке.

Как и процесс, поток управления можно терминировать изнутри и извне. С одним, неявным, но наиболее естественным способом "самоликвидации" – выходом из стартовой функции потока – мы уже познакомились. Тот же эффект достигается вызовом функции pthread_exit() (см. листинг 1.26).

#include <pthread.h> void pthread_exit (void *value_ptr);

Листинг 1.26. Описание функции pthread_exit().

Терминирование потока управления в идейном плане существенно сложнее создания. Чтобы осветить все тонкости, необходимо ввести несколько новых понятий и описать целый ряд функций. Пока мы укажем лишь, что терминирование последнего потока в процессе вызывает его (процесса) завершение с нулевым кодом.

Из общих соображений (например, если исходить из аналогии между процессами и потоками управления) очевидно, что должна существовать возможность дождаться завершения заданного потока управления. Эта возможность реализуется функцией pthread_join() (см. листинг 1.27), напоминающей waitpid().

#include <pthread.h> int pthread_join ( pthread_t thread, void **value_ptr_ptr);

Листинг 1.27. Описание функции pthread_join().

Поток управления, вызвавший функцию pthread_join(), приостанавливает выполнение до завершения потока, идентификатор которого задан аргументом thread. При успешном возврате из pthread_join() результат, как и положено, равен нулю, а по указателю value_ptr_ptr (если он не пуст) помещается значение (указатель value_ptr), переданное в качестве аргумента функции pthread_exit(). Тем самым ждуший поток получает данные о статусе завершения ожидаемого.

Отметим, что трактовка значения value_ptr возлагается на приложение. Например, оно может считать его целым числом, а не указателем; по этой причине операционная система при выполнении функции pthread_exit() не вправе выдавать ошибку типа "неверный адрес" каким бы ни был аргумент value_ptr. Если он все же является указателем, то ему нельзя присваивать адрес автоматической переменной, поскольку к моменту его использования ожидаемый поток уже завершится и состояние его автоматических переменных станет неопределенным.

Второе общее соображение касается того обстоятельства, что вызов такой функции, как ожидание завершения (pthread_join()) способен приостановить выполнение вызывающего потока управления на неопределенное время, в течение которого ему может быть доставлен обрабатываемый сигнал. Как правило, в подобных ситуациях выполнение функций (таких, например, как read()) завершается с частично достигнутым результатом (например, с числом прочитанных байт, меньшим запрошенного) и кодом ошибки EINTR, нуждающимся в нестандартной обработке, далеко не всегда реализуемой разработчиками приложений. Из-за этого в программах появляются дефекты, которые трудно воспроизвести и, соответственно, исправить.

Согласно стандарту POSIX-2001, функции, обслуживающие потоки управления, свободны от этого недостатка. Они никогда не завершаются с частичным результатом и не выдают код ошибки EINTR. Восстановление нормального состояния после того, как ожидание было прервано доставкой и обработкой сигнала, возлагается на операционную систему, а не на приложение.

Третье общее соображение состоит в том, что такое критически важное событие, как завершение потока управления, не может оставаться без функций-обработчиков. Стандартом POSIX-2001 предусмотрено существование не одного, а целого стека подобных обработчиков, ассоциированного с потоком управления. Операции над этим стеком возложены на функции pthread_cleanup_push() и pthread_cleanup_pop() (см. листинг 1.28).

#include <pthread.h>

void pthread_cleanup_push ( void (*routine) (void *), void *arg);

void pthread_cleanup_pop (int execute);

Листинг 1.28. Описание функций pthread_cleanup_push() и pthread_cleanup_pop().

Функция pthread_cleanup_push() помещает заданный аргументами routine и arg обработчик в стек обработчиков вызывающего потока. Функция pthread_cleanup_pop() извлекает верхний обработчик из этого стека и, если значение аргумента execute отлично от нуля, вызывает его (как (*routine) (arg)).

Разумеется, все обработчики, начиная с верхнего, извлекаются из стека и вызываются при терминировании потока управления (вне зависимости от того, объясняется ли терминирование внутренними или внешними причинами). В частности, это происходит после того, как поток обратится к функции pthread_exit().

Напомним, что обработчики завершения существуют и для процессов (они регистрируются с помощью функции atexit()), однако применительно к потокам управления идея стека обработчиков оформлена в более явном и систематическом виде.

Пару функций pthread_cleanup_push() и pthread_cleanup_pop() можно представлять себе как открывающую и закрывающую скобки, оформленные в виде отдельных инструкций языка C и обрамляющие обслуживаемый обработчиком участок программы. Согласно стандарту POSIX-2001, этот участок должен представлять собой фрагмент одной лексической области видимости (блока), а pthread_cleanup_push() и pthread_cleanup_pop() могут быть реализованы как макросы (см. листинг 1.29).

#define pthread_cleanup_push (rtn, arg) { \ struct _pthread_handler_rec \ __cleanup_handler, \ **__head; \ __cleanup_handler.rtn = rtn; \ __cleanup_handler.arg = arg; \ (void) pthread_getspecific \ (_pthread_handler_key, &__head); \ __cleanup_handler.next = *__head; \ *__head = &__cleanup_handler;

#define pthread_cleanup_pop (ex) \ *__head = __cleanup_handler.next; \ if (ex) (*__cleanup_handler.rtn) \ (__cleanup_handler.arg); \ }

Листинг 1.29. Возможная реализация функций pthread_cleanup_push() и pthread_cleanup_pop() как макросов.

Обратим внимание на то, что в определении макроса pthread_cleanup_push() открывается внутренний блок, в котором декларируются два необходимых объекта – структура __cleanup_handler, описывающая обработчик, и указатель __head на вершину стека, представленного в виде односвязанного (линейного) списка. В определении pthread_cleanup_pop() этот блок закрывается. Так что даже из соображений синтаксической корректности вызовы pthread_cleanup_push() и pthread_cleanup_pop() должны быть парными и располагаться в одном блоке, но более существенной нам представляется корректность семантическая.

Если поток управления по внутренним или внешним причинам терминируется при выполнении обслуживаемого обработчиком участка, тот должен обеспечить аккуратное завершение с восстановлением (если это необходимо) целостного, корректного состояния объектов, видимых в блоке, и освобождением ресурсов, лексически доступных из текущей области видимости. Иными словами, характер обработки завершения определяется программным контекстом, в котором прервано выполнение потока управления. Отсюда и привязка стека обработчиков к блочной структуре программы.

После того, как выполнятся все обработчики завершения, в неспецифицированном порядке вызываются деструкторы индивидуальных данных (если у потока управления таковые имеются). То, что деструкторы вызываются после обработчиков завершения, делает доступными для последних индивидуальные данные потоков управления.

Отметим, что если поток – не последний в процессе, то при его завершении, разумеется, не происходит автоматического освобождения видимых приложению ресурсов процесса (таких, например, как файловые дескрипторы) и не выполняются другие общепроцессные зачистки (такие, как выполнение функций, зарегистрированных посредством atexit()).

Заказать терминирование извне потока управления с заданным идентификатором можно, воспользовавшись функцией pthread_cancel() (см. листинг 1.30).

#include <pthread.h> int pthread_cancel (pthread_t thread);

Листинг 1.30. Описание функции pthread_cancel().

Напомним, что на выполнение "заказа" влияют состояние восприимчивости и тип терминирования, установленные для потока, а также достижение точки терминирования. Эти атрибуты опрашиваются и изменяются с помощью функций pthread_setcancelstate(), pthread_setcanceltype() и pthread_testcancel() (см. листинг 1.31).

#include <pthread.h>

int pthread_setcancelstate ( int state, int *oldstate);

int pthread_setcanceltype ( int type, int *oldtype);

void pthread_testcancel (void);

Листинг 1.31. Описание функций pthread_setcancelstate(), pthread_setcanceltype(), pthread_testcancel().

Функции pthread_setcancelstate() и pthread_setcanceltype() атомарным образом, в рамках неделимой транзакции устанавливают новые значения (state и type) для состояния восприимчивости и типа и помещают по заданным указателям (соответственно, oldstate и oldtype) старые значения. Допустимыми значениями для состояния восприимчивости к терминированию являются PTHREAD_CANCEL_ENABLE (терминирование разрешено – подразумеваемое значение для вновь созданных потоков управления) и PTHREAD_CANCEL_DISABLE, для типа – PTHREAD_CANCEL_DEFERRED (отложенное терминирование – подразумеваемое значение) и PTHREAD_CANCEL_ASYNCHRONOUS (немедленное, асинхронное терминирование).

Функция pthread_testcancel() создает в вызывающем потоке точку терминирования, то есть проверяет наличие ждущего заказа на терминирование и, при наличии такового, инициирует его обработку (если она разрешена).

Из общих соображений следует, что манипуляции с атрибутами терминирования необходимы для обеспечения атомарности транзакций и сохранения программных инвариантов. При входе в транзакцию терминирование запрещают, при выходе восстанавливают старое значение. Аналогично, если в пределах критического интервала нарушается некий программный инвариант, на этом интервале поток должен защититься от терминирования. Ресурсы, ассоциированные с потоком, обязаны оставаться в корректном состоянии, а осмысленные действия – или выполняться до конца, или не выполняться вообще.

Возвращаясь к функции pthread_cancel(), отметим, что иногда обработку заказа на терминирование сравнивают с реакцией на доставку сигнала. На наш взгляд, к подобной аналогии нужно относиться осторожно, поскольку она довольно поверхностна и основана в первую очередь на том, что и сигналы, и заказы на терминирование являются средствами асинхронного программного воздействия на потоки управления, и других механизмов подобной направленности стандарт POSIX-2001 не предусматривает. В частности, доставка как сигналов, так и заказов на терминирование способна вывести поток управления из состояния ожидания, быть может, потенциально бесконечного. Еще одна параллель – запрещение терминирования при входе в обработчик завершения и блокирование сигнала при входе в функцию его обработки.

Противоречат отмеченной аналогии следующие обстоятельства. Во-первых, обработка сигналов зачастую направлена на продолжение, а не завершение выполнения и, следовательно, носит принципиально иной, чем у обработчика завершения, характер. Во-вторых, для обработки терминирования предоставлено больше средств (не одна функция, как в случае сигналов, а целый стек обработчиков завершения плюс деструкторы индивидуальных данных) и они строже регламентированы (добавление и изъятие обработчиков стандартизованы как парные операции, выполняемые в пределах одной лексической области видимости, отложенное терминирование производится только в определенных точках программы, нелокальные переходы (siglongjmp()), являющиеся стандартным элементом обработки сигналов, применительно к обработчикам завершения допускаются с многочисленными оговорками и т.п.). Конечно, терминирование извне может быть реализовано на основе механизма сигналов, но есть и другие возможности.

Разумеется, функция pthread_cancel() не ожидает выполнения заказа на терминирование, так что вызывающий и целевой потоки управления продолжают выполняться параллельно, пока последний не осуществит всех специфицированных действий по зачистке и не завершится, выдав ждущим этого события потокам значение PTHREAD_CANCELED.

Помимо заказа на терминирование, потоку управления можно направить и "честный" сигнал, воспользовавшись средством "внутрипроцессного межпотокового" взаимодействия – функцией pthread_kill() (см. листинг 1.32).

#include <signal.h> int pthread_kill ( pthread_t thread, int sig);

Листинг 1.32. Описание функции pthread_kill().

Как и в случае функции kill(), при нулевом значении аргумента sig проверяется корректность заданного идентификатора потока, но никакой сигнал не генерируется.

Напомним (см. курс [1]), что сигналы генерируются для конкретного потока управления (так, в частности, поступает функция pthread_kill()) или для процесса в целом (как это делает функция kill()), но доставляются они всегда одному потоку, только во втором случае его выбор определяется реализацией из соображений простоты доставки: обычно берется активный поток, если он не блокирует данный сигнал. Естественно, если для сигнала определена функция обработки, она выполняется в контексте целевого потока управления. Другие возможные действия (терминирование, остановка) всегда применяются к процессу в целом.

Для иллюстрации изложенного приведем небольшую программу (см. листинг 1.33), в которой создается поток управления с последующей доставкой ему сигнала SIGINT. Возможные результаты работы этой программы показаны на листинге 1.34.

/* * * * * * * * * * * * * * * * * */ /* Программа демонстрирует генерацию */ /* и доставку сигналов */ /* потокам управления */ /* * * * * * * * * * * * * * * * * */

#include <unistd.h> #include <stdio.h> #include <pthread.h> #include <signal.h> #include <errno.h>

/* * * * * * * * * * * * * * / /* Функция обработки сигнала */ /* * * * * * * * * * * * * * / static void signal_handler (int dummy) { printf ("Идентификатор потока, обрабатывающего сигнал: %lx\n", pthread_self ()); }

/* * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока управления, */ /* которому будет направлен сигнал */ /* * * * * * * * * * * * * * * * * * * */ static void *thread_start (void *dummy) { printf ("Идентификатор нового потока управления: %lx\n", pthread_self ()); while (1) { sleep (1); }

return (NULL); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGINT, */ /* создает поток управления и посылает ему сигнал */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { pthread_t thread_id; struct sigaction act;

/* Установим реакцию на сигнал SIGINT */ act.sa_handler = signal_handler; (void) sigemptyset (&act.sa_mask); act.sa_flags = 0; (void) sigaction (SIGINT, &act, (struct sigaction *) NULL);

if ((errno = pthread_create (&thread_id, NULL, thread_start, NULL)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } printf ("Идентификатор созданного потока управления: %lx\n", thread_id);

(void) pthread_kill (thread_id, SIGINT); printf ("После вызова pthread_kill()\n");

sleep (1); printf ("Выспались...\n");

return (0); }

Листинг 1.33. Пример использования механизма сигналов в многопотоковой программе.

Идентификатор созданного потока управления: 402 После вызова pthread_kill() Идентификатор потока, обрабатывающего сигнал: 402 Идентификатор нового потока управления: 402 Выспались...

Листинг 1.34. Возможные результаты работы многопотоковой программы, использующей механизм сигналов.

Обратим внимание на два любопытных (хотя и довольно очевидных) момента. Во-первых, начальный поток управления процесса продолжает выполнение после вызова pthread_kill() и успевает сообщить об этом. Затем он на секунду засыпает, активным становится вновь созданный поток и первым делом приступает к обработке доставленного ему сигнала. Уже после возврата из функции обработки начинается выполнение инструкций стартовой функции потока управления.

Читателю предлагается самостоятельно проанализировать, как будет вести себя приведенная программа при подразумеваемом способе обработки сигнала SIGINT.

Отмеченную выше устойчивость ожидания в функциях, обслуживающих потоки управления, к доставке и обработке сигналов проиллюстрируем программой, показанной на листинге 1.35.

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа демонстрирует взаимодействие сигналов */ /* и ожидания завершения потока управления */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h> #include <pthread.h> #include <signal.h> #include <errno.h>

/* * * * * * * * * * * * * * */ /* Функция обработки сигнала */ /* * * * * * * * * * * * * * */ static void signal_handler (int dummy) { printf ("Идентификатор потока, обрабатывающего сигнал: %lx\n", pthread_self ()); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция создаваемого потока управления */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void *thread_start (void *thread_id) { printf ("Идентификатор нового потока управления: %lx\n", pthread_self ()); (void) pthread_kill ((pthread_t) thread_id, SIGINT);

return ((void *) pthread_self ()); } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGINT, */ /* создает поток управления и ожидает его завершения */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { pthread_t thread_id; struct sigaction act; void *pv;

/* Установим реакцию на сигнал SIGINT */ act.sa_handler = signal_handler; (void) sigemptyset (&act.sa_mask); act.sa_flags = 0; (void) sigaction (SIGINT, &act, (struct sigaction *) NULL);

if ((errno = pthread_create (&thread_id, NULL, thread_start, (void *) pthread_self ())) != 0) { perror ("PTHREAD_CREATE"); return (errno); } printf ("Идентификаторы начального и созданного потоков " "управления: " "%lx %lx\n", pthread_self (), thread_id);

/* Дождемся завершения созданного потока управления */ if ((errno = pthread_join (thread_id, &pv)) != 0) { perror ("PTHREAD_JOIN"); return (errno); } printf ("Статус завершения созданного потока " "управления: %p\n", pv);

return (0); }

Листинг 1.35. Пример программы, обрабатывающей сигнал во время ожидания завершения потока управления.

Если посмотреть на возможные результаты работы этой программы (см. листинг 1.36), можно сделать вывод, что, несмотря на получение и обработку сигнала, функция pthread_join отрабатывает с нормальным (нулевым) результатом, получая статус завершения "ожидаемого" потока.

Идентификаторы начального и созданного потоков управления: 400 402 Идентификатор нового потока управления: 402 Идентификатор потока, обрабатывающего сигнал: 400 Статус завершения созданного потока управления: 0x402

Листинг 1.36. Возможные результаты работы программы, обрабатывающей сигнал во время ожидания завершения потока управления.

И здесь читателю рекомендуется самостоятельно выяснить, как поведет себя приведенная программа при подразумеваемой реакции на сигнал SIGINT.

Еще одна полезная операция, связанная с обработкой завершения потока управления, – его динамическое обособление, выполняемое функцией pthread_detach() (см. листинг 1.37).

#include <pthread.h> int pthread_detach (pthread_t thread);

Листинг 1.37. Описание функции pthread_detach().

При завершении обособленного потока операционная система может освободить использовавшуюся им память.

Может показаться, что возможность динамического обособления является излишней (мол, достаточно соответствующего атрибута, принимаемого во внимание при создании потока функцией pthread_create()), однако это не так. Во-первых, начальный поток процесса создается нестандартным образом и обособить его статически невозможно. Во-вторых, если терминируется поток, ждущий в функции pthread_join(), обработчик его завершения должен обособить того, чье завершение является предметом ожидания, иначе с утилизацией памяти могут возникнуть проблемы.

Если приложение заботится об аккуратном освобождении памяти, то для всех потоков управления, созданных с атрибутом PTHREAD_CREATE_JOINABLE, следует предусмотреть вызов либо pthread_join(), либо pthread_detach().

В качестве примера многопотоковой программы приведем серверную часть рассматривавшегося в курсе [1] приложения, копирующего строки со стандартного ввода на стандартный вывод с "прокачиванием" их через потоковые сокеты (см. листинг 1.38).

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа процесса (будем называть его серверным), */ /* принимающего запросы на установления соединения и */ /* запускающего потоки управления для их обслуживания */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h> #include <netdb.h> #include <pthread.h> #include <sys/socket.h> #include <arpa/inet.h> #include <errno.h>

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая (и единственная) функция потоков управления, */ /* обслуживающих запросы на копирование строк, */ /* поступающих из сокета */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *srv_thread_start (void *ad) { FILE *fpad; /* Поток данных, соответствующий */ /* дескриптору ad */ char line [LINE_MAX]; /* Буфер для принимаемых строк */ /* Структура для записи адреса */ struct sockaddr_in sai; /* Длина адреса */ socklen_t sai_len = sizeof (struct sockaddr_in);

/* Опросим адрес партнера по общению (передающего сокета) */ if (getpeername ((int) ad, (struct sockaddr *) &sai, &sai_len) < 0) { perror ("GETPEERNAME"); return (NULL); }

/* По файловому дескриптору ad сформируем */ /* буферизованный поток данных */ if ((fpad = fdopen ((int) ad, "r")) == NULL) { perror ("FDOPEN"); return (NULL); }

/* Цикл чтения строк из сокета */ /* и выдачи их на стандартный вывод */ while (fgets (line, sizeof (line), fpad) != NULL) { printf ("Вы ввели и отправили с адреса %s, " "порт %d :", inet_ntoa (sai.sin_addr), ntohs (sai.sin_port)); fputs (line, stdout); }

/* Закрытие соединения */ shutdown ((int) ad, SHUT_RD); (void) fclose (fpad);

return (NULL); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* В функции main() принимаются запросы на установление */ /* соединения и запускаются потоки управления для их обслуживания */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { int sd; /* Дескриптор слушающего сокета */ int ad; /* Дескриптор приемного сокета */ /* Буфер для принимаемых строк */ struct addrinfo hints = {AI_PASSIVE, AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL}; /* Указатель – выходной аргумент getaddrinfo */ struct addrinfo *addr_res; int res; /* Результат getaddrinfo */ pthread_attr_t patob; /* Атрибутный объект для создания */ /* потоков управления */ pthread_t adt_id; /* Идентификатор обслуживающего потока управления */

/* Создадим слушающий сокет */ if ((sd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { perror ("SOCKET"); return (1); }

/* Привяжем этот сокет к адресу сервиса spooler */ /* на локальном хосте */ if ((res = getaddrinfo (NULL, "spooler", &hints, &addr_res)) != 0) { fprintf (stderr, "GETADDRINFO: %s\n", gai_strerror (res)); return (2); } if (bind (sd, addr_res->ai_addr, addr_res->ai_addrlen) < 0) { perror ("BIND"); return (3); }

/* Можно освободить память, которую запрашивала */ /* функция getaddrinfo() */ freeaddrinfo (addr_res);

/* Пометим сокет как слушающий */ if (listen (sd, SOMAXCONN) < 0) { perror ("LISTEN"); return (4); }

/* Инициализируем атрибутный объект потоков управления */ if ((errno = pthread_attr_init (&patob)) != 0) { perror ("PTHREAD_ATTR_INIT"); return (errno); } /* Потоки управления будем создавать обособленными */ (void) pthread_attr_setdetachstate (&patob, PTHREAD_CREATE_DETACHED);

/* Цикл приема соединений и запуска */ /* обслуживающих потоков управления */ while (1) { /* Примем соединение. */ /* Адрес партнера по общению нас */ /* в данном случае не интересует */ if ((ad = accept (sd, NULL, NULL)) < 0) { perror ("ACCEPT"); return (6); }

/* Запустим обслуживающий поток управления */ if ((errno = pthread_create (&adt_id, &patob, srv_thread_start,(void *) ad)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } }

return (0); }

Листинг 1.38. Пример многопотоковой программы, обслуживающей запросы на копирование строк, поступающих через сокеты.

Многопотоковая реализация в данном случае уместнее многопроцессной: она и выглядит проще (поскольку потоки проще создавать и не обязательно в явном виде ждать их завершения), и ресурсов потребляет меньше.

Можно надеяться, что и следующая программа (см. листинг 1.39), реализующая идею обработки данных с контролем времени, подтверждает, что применение потоков управления позволяет сделать исходный текст более простым и наглядным по сравнению с "беспотоковым" вариантом (см. курс [1]). Удалось избавиться от такого сугубо "неструктурного" средства, как нелокальные переходы, и от ассоциированных с ними тонкостей, чреватых ошибками.

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа вызывает функции обработки в рамках */ /* порождаемых потоков управления и контролирует время */ /* их выполнения с помощью интервального таймера */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h> #include <pthread.h> #include <sys/time.h> #include <signal.h> #include <errno.h>

/* Период интервального таймера (в секундах) */ #define IT_PERIOD 1

static pthread_t cthread_id; /* Идентификатор текущего */ /* потока управления, */ /* обрабатывающего данные */

static int in_proc_data = 0; /* Признак активности */ /* потока обработки данных */

static double s; /* Результат функций */ /* обработки данных */

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция обработки срабатывания таймера реального */ /* времени (сигнал SIGALRM) */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_sigalrm (int dummy) { if (in_proc_data) { /* Не имеет значения, какой поток обрабатывает сигнал */ /* и заказывает терминирование (быть может, себя) */ (void) pthread_cancel (cthread_id); } }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Обработчик завершения потока управления. */ /* Сбрасывает признак активности потока обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_cleanup_handler (void *arg) { in_proc_data = (int) arg; }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока управления, обрабатывающего */ /* данные. Аргумент – указатель на функцию обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *start_func (void *proc_data_func) { /* Поместим в стек обработчик завершения */ pthread_cleanup_push (proc_data_cleanup_handler, 0); in_proc_data = 1; /* Время пошло ... */

/* На время выполнения функции обработки данных установим */ /* асинхронный тип терминирования, иначе оно не сработает */ (void) pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

/* Выполним функцию обработки данных */ ((void (*) (void)) (proc_data_func)) ();

/* Установим отложенный тип терминирования, */ /* иначе изъятие обработчика из стека */ /* будет небезопасным действием */ (void) pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);

/* Выполним обработчик завершения и удалим его из стека */ pthread_cleanup_pop (1); return (NULL); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Первая функция обработки данных (вычисляет ln (2)) */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_1 (void) { double d = 1; int i;

s = 0; for (i = 1; i <= 100000000; i++) { s += d / i; d = -d; } }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Вторая функция обработки данных (вычисляет sqrt (2))*/ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_2 (void) { s = 1; do { s = (s + 2 / s) * 0.5; } while ((s * s – 2) > 0.000000001); } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGALRM, */ /* взводит периодический таймер реального времени */ /* и запускает в цикле потоки обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { /* Массив указателей на функции обработки данных */ void (*fptrs []) (void) = {proc_data_1, proc_data_2, NULL}; /* Указатель на указатель на */ /* текущую функцию обработки данных */ void (**tfptr) (void); void *pstat; /* Статус завершения потока */ /* обработки данных */ struct itimerval itvl; struct sigaction sact; int i;

/* Установим реакцию на сигнал SIGALRM */ sact.sa_handler = proc_sigalrm; sact.sa_flags = 0; (void) sigemptyset (&sact.sa_mask); if (sigaction (SIGALRM, &sact, NULL) < 0) { perror ("SIGACTION"); return (1); }

/* Сделаем таймер реального времени периодическим */ itvl.it_interval.tv_sec = IT_PERIOD; itvl.it_interval.tv_usec = 0;

/* Цикл запуска потоков обработки данных. */ /* Выполним его дважды */ for (i = 0; i < 2; i++) { for (tfptr = fptrs; *tfptr != NULL; tfptr++) { /* Взведем интервальный таймер реального времени */ itvl.it_value.tv_sec = IT_PERIOD; itvl.it_value.tv_usec = 0; if (setitimer (ITIMER_REAL, &itvl, NULL) < 0) { perror ("SETITIMER"); return (2); }

/* Создадим поток обработки данных, */ /* затем дождемся его завершения */ if ((errno = pthread_create (&cthread_id, NULL, start_func, (void *) *tfptr)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } if ((errno = pthread_join (cthread_id, &pstat)) != 0) { perror ("PTHREAD_JOIN"); return (errno); }

if (pstat == PTHREAD_CANCELED) { printf ("Частичный результат функции " "обработки данных: %g\n", s); } else { printf ("Полный результат функции " "обработки данных: %g\n", s); } } }

return 0; }

Листинг 1.39. Пример многопотоковой программы, осуществляющей обработку данных с контролем времени.

Возможные результаты работы приведенной программы показаны на листинге 1.40.

Частичный результат функции обработки данных: 0.693147 Полный результат функции обработки данных: 1.41421 Частичный результат функции обработки данных: 0.693147 Полный результат функции обработки данных: 1.41421

Листинг 1.40. Возможные результаты работы многопотоковой программы, осуществляющей обработку данных с контролем времени.

К сожалению, там, где есть недетерминированность и асинхронность, без тонкостей все равно не обойтись. Мы обратим внимание на три из них. Во-первых, возможно срабатывание таймера и доставка сигнала SIGALRM до того, как завершится (или даже начнется) первый вызов pthread_create() и будет инициализирована переменная cthread_id. Чтобы не допустить терминирования "неопределенного" потока управления в функции обработки сигнала, введен признак активности потока обработки данных. Если он установлен, переменная cthread_id заведомо инициализирована.

Во-вторых, не имеет значения, в контексте какого из потоков выполняется функция обработки сигнала – вполне допустимо, чтобы поток заказал собственное терминирование.

В-третьих, поскольку функции обработки данных не содержат точек терминирования, на время их выполнения необходимо установить асинхронный тип терминирования, иначе оно попросту не сработает. Однако использование этого типа крайне опасно, поэтому при первой возможности следует вернуться к более надежному отложенному типу. Перед манипуляциями со стеком обработчиков завершения установка данного типа или запрет терминирования являются обязательными.

Как мы уже упоминали, потоки управления иногда называют легковесными процессами. Любопытно оценить, какова степень их легковесности, насколько накладные расходы на их обслуживание меньше, чем для "настоящих" процессов.

На листингах 1.41 и 1.42 показана программа, которая в цикле порождает практически пустые процессы и дожидается их завершения; на листинге 1.43 приведены данные о времени ее работы, полученные с помощью команды time -p. Даже если сделать процессам послабление и убрать вызов execl(), времена получатся довольно большими (см. листинг 1.44) в сравнении с аналогичными данными для варианта с потоками управления (см. листинги 1.45 и 1.46).

#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <sys/wait.h>

#define N 10000

int main (void) { int i;

for (i = 0; i < N; i++) { switch (fork ()) { case -1: perror ("FORK"); return (1); case 0: /* Порожденный процесс */ (void) execl ("./dummy", "dummy", (char *) 0); exit (0); default: /* Родительский процесс */ (void) wait (NULL); } }

return 0; }

Листинг 1.41. Пример программы, порождающей в цикле практически пустые процессы.

int main (void) { return 0; }

Листинг 1.42. Содержимое файла dummy.c

real 34.97 user 12.36 sys 22.61

Листинг 1.43. Возможные результаты измерения времени работы программы, порождающей в цикле практически пустые процессы (вариант с вызовом execl()).

real 11.49 user 2.38 sys 9.11

Листинг 1.44. Возможные результаты измерения времени работы программы, порождающей в цикле практически пустые процессы (вариант без вызова execl()).

#include <unistd.h> #include <stdio.h> #include <pthread.h> #include <errno.h>

#define N 10000

static void *thread_start (void *arg) { pthread_exit (arg); }

int main (void) { pthread_t thread_id; int i;

for (i = 0; i < N; i++) { if ((errno = pthread_create ( &thread_id, NULL, thread_start, NULL)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } if ((errno = pthread_join ( thread_id, NULL)) != 0) { perror ("PTHREAD_JOIN"); return (errno); } }

return (0); }

Листинг 1.45. Пример программы, порождающей в цикле потоки управления.

real 2.08 user 0.52 sys 1.56

Листинг 1.46. Возможные результаты измерения времени работы программы, порождающей в цикле потоки управления.

В первом приближении можно считать, что потоки управления на порядок дешевле процессов. На самом деле, в реальных ситуациях, когда процессы существенно превосходят по размеру приведенные выше "пустышки", различие будет еще больше.

Можно сделать вывод, что потоки управления допускают довольно свободное использование, накладные расходы на их обслуживание невелики, особенно в сравнении с обслуживанием процессов. Поэтому, проектируя приложение с параллельно выполняемыми компонентами, следует в первую очередь проанализировать возможность многопотоковой реализации. Главное препятствие в осуществлении подобной возможности – разделение потоками одного адресного пространства. Только если это препятствие окажется непреодолимым, целесообразно воспользоваться механизмом процессов.

<


Как и процесс, поток управления можно терминировать изнутри и извне. С одним, неявным, но наиболее естественным способом "самоликвидации" – выходом из стартовой функции потока – мы уже познакомились. Тот же эффект достигается вызовом функции pthread_exit() (см. листинг 1.26).

#include <pthread.h> void pthread_exit (void *value_ptr);

Листинг 1.26. Описание функции pthread_exit(). (html, txt)

Терминирование потока управления в идейном плане существенно сложнее создания. Чтобы осветить все тонкости, необходимо ввести несколько новых понятий и описать целый ряд функций. Пока мы укажем лишь, что терминирование последнего потока в процессе вызывает его (процесса) завершение с нулевым кодом.

Из общих соображений (например, если исходить из аналогии между процессами и потоками управления) очевидно, что должна существовать возможность дождаться завершения заданного потока управления. Эта возможность реализуется функцией pthread_join() (см. листинг 1.27), напоминающей waitpid().

#include <pthread.h> int pthread_join ( pthread_t thread, void **value_ptr_ptr);

Листинг 1.27. Описание функции pthread_join(). (html, txt)

Поток управления, вызвавший функцию pthread_join(), приостанавливает выполнение до завершения потока, идентификатор которого задан аргументом thread. При успешном возврате из pthread_join() результат, как и положено, равен нулю, а по указателю value_ptr_ptr (если он не пуст) помещается значение (указатель value_ptr), переданное в качестве аргумента функции pthread_exit(). Тем самым ждуший поток получает данные о статусе завершения ожидаемого.

Отметим, что трактовка значения value_ptr возлагается на приложение. Например, оно может считать его целым числом, а не указателем; по этой причине операционная система при выполнении функции pthread_exit() не вправе выдавать ошибку типа "неверный адрес" каким бы ни был аргумент value_ptr. Если он все же является указателем, то ему нельзя присваивать адрес автоматической переменной, поскольку к моменту его использования ожидаемый поток уже завершится и состояние его автоматических переменных станет неопределенным.



Второе общее соображение касается того обстоятельства, что вызов такой функции, как ожидание завершения (pthread_join()) способен приостановить выполнение вызывающего потока управления на неопределенное время, в течение которого ему может быть доставлен обрабатываемый сигнал. Как правило, в подобных ситуациях выполнение функций (таких, например, как read()) завершается с частично достигнутым результатом (например, с числом прочитанных байт, меньшим запрошенного) и кодом ошибки EINTR, нуждающимся в нестандартной обработке, далеко не всегда реализуемой разработчиками приложений. Из-за этого в программах появляются дефекты, которые трудно воспроизвести и, соответственно, исправить.

Согласно стандарту POSIX-2001, функции, обслуживающие потоки управления, свободны от этого недостатка. Они никогда не завершаются с частичным результатом и не выдают код ошибки EINTR. Восстановление нормального состояния после того, как ожидание было прервано доставкой и обработкой сигнала, возлагается на операционную систему, а не на приложение.

Третье общее соображение состоит в том, что такое критически важное событие, как завершение потока управления, не может оставаться без функций-обработчиков. Стандартом POSIX-2001 предусмотрено существование не одного, а целого стека подобных обработчиков, ассоциированного с потоком управления. Операции над этим стеком возложены на функции pthread_cleanup_push() и pthread_cleanup_pop() (см. листинг 1.28).

#include <pthread.h>

void pthread_cleanup_push ( void (*routine) (void *), void *arg);

void pthread_cleanup_pop (int execute);

Листинг 1.28. Описание функций pthread_cleanup_push() и pthread_cleanup_pop(). (html, txt)

Функция pthread_cleanup_push() помещает заданный аргументами routine и arg обработчик в стек обработчиков вызывающего потока. Функция pthread_cleanup_pop() извлекает верхний обработчик из этого стека и, если значение аргумента execute отлично от нуля, вызывает его (как (*routine) (arg)).

Разумеется, все обработчики, начиная с верхнего, извлекаются из стека и вызываются при терминировании потока управления (вне зависимости от того, объясняется ли терминирование внутренними или внешними причинами).


В частности, это происходит после того, как поток обратится к функции pthread_exit().

Напомним, что обработчики завершения существуют и для процессов (они регистрируются с помощью функции atexit()), однако применительно к потокам управления идея стека обработчиков оформлена в более явном и систематическом виде.

Пару функций pthread_cleanup_push() и pthread_cleanup_pop() можно представлять себе как открывающую и закрывающую скобки, оформленные в виде отдельных инструкций языка C и обрамляющие обслуживаемый обработчиком участок программы. Согласно стандарту POSIX-2001, этот участок должен представлять собой фрагмент одной лексической области видимости (блока), а pthread_cleanup_push() и pthread_cleanup_pop() могут быть реализованы как макросы (см. листинг 1.29).

#define pthread_cleanup_push (rtn, arg) { \ struct _pthread_handler_rec \ __cleanup_handler, \ **__head; \ __cleanup_handler.rtn = rtn; \ __cleanup_handler.arg = arg; \ (void) pthread_getspecific \ (_pthread_handler_key, &__head); \ __cleanup_handler.next = *__head; \ *__head = &__cleanup_handler;

#define pthread_cleanup_pop (ex) \ *__head = __cleanup_handler.next; \ if (ex) (*__cleanup_handler.rtn) \ (__cleanup_handler.arg); \ }

Листинг 1.29. Возможная реализация функций pthread_cleanup_push() и pthread_cleanup_pop() как макросов. (html, txt)

Обратим внимание на то, что в определении макроса pthread_cleanup_push() открывается внутренний блок, в котором декларируются два необходимых объекта – структура __cleanup_handler, описывающая обработчик, и указатель __head на вершину стека, представленного в виде односвязанного (линейного) списка. В определении pthread_cleanup_pop() этот блок закрывается. Так что даже из соображений синтаксической корректности вызовы pthread_cleanup_push() и pthread_cleanup_pop() должны быть парными и располагаться в одном блоке, но более существенной нам представляется корректность семантическая.



© 2003-2007 INTUIT.ru. Все права защищены.

Барьеры


Барьеры – весьма своеобразное средство синхронизации. Идея его в том, чтобы в определенной точке ожидания собралось заданное число потоков управления. Только после этого они смогут продолжить выполнение. (Поговорка "семеро одного не ждут" к барьерам не применима.)

Барьеры полезны для организации коллективных распределенных вычислений в многопроцессорной конфигурации, когда каждый участник (поток управления) выполняет часть работы, а в точке сбора частичные результаты объединяются в общий итог.

Функции, ассоциированные с барьерами, подразделяются на следующие группы.

инициализация и разрушение барьеров: pthread_barrier_init(), pthread_barrier_destroy() (см. листинг 2.36);

#include <pthread.h>

int pthread_barrier_init ( pthread_barrier_t *restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned count);

int pthread_barrier_destroy ( pthread_barrier_t *barrier);

Листинг 2.36. Описание функций инициализации и разрушения барьеров. (html, txt)

синхронизация на барьере: pthread_barrier_wait() (см. листинг 2.37);

#include <pthread.h> int pthread_barrier_wait ( pthread_barrier_t *barrier);

Листинг 2.37. Описание функции синхронизации на барьере. (html, txt)

инициализация и разрушение атрибутных объектов барьеров: pthread_barrierattr_init(), pthread_barrierattr_destroy() (см. листинг 2.38);

#include <pthread.h>

int pthread_barrierattr_init ( pthread_barrierattr_t *attr);

int pthread_barrierattr_destroy ( pthread_barrierattr_t *attr);

Листинг 2.38. Описание функций инициализации и разрушения атрибутных объектов барьеров. (html, txt)

опрос и установка атрибутов барьеров в атрибутных объектах: pthread_barrierattr_getpshared(), pthread_barrierattr_setpshared() (см. листинг 2.39).

#include <pthread.h>

int pthread_barrierattr_getpshared (const pthread_barrierattr_t *restrict attr, int *restrict pshared);

int pthread_barrierattr_setpshared (pthread_barrierattr_t *attr, int pshared);

Листинг 2.39. Описание функций опроса и установки атрибутов барьеров в атрибутных объектах. (html, txt)


Обратим внимание на аргумент count в функции инициализации барьера pthread_barrier_init(). Он задает количество синхронизируемых потоков управления. Столько потоков должны вызвать функцию pthread_barrier_wait(), прежде чем каждый из них сможет успешно завершить вызов и продолжить выполнение. (Разумеется, значение count должно быть положительным.)

Когда к функции pthread_barrier_wait() обратилось требуемое число потоков управления, одному из них (стандарт POSIX-2001 не специфицирует, какому именно) в качестве результата возвращается именованная константа PTHREAD_BARRIER_SERIAL_THREAD, а всем другим выдаются нули. После этого барьер возвращается в начальное (инициализированное) состояние, а выделенный поток может выполнить соответствующие объединительные действия.

Описанная схема работы проиллюстрирована листингом 2.40.

if ((status = pthread_barrier_wait( &barrier)) == PTHREAD_BARRIER_SERIAL_THREAD) { /* Выделенные (обычно – объединительные) */ /* действия. */ /* Выполняются каким-то одним потоком */ /* управления */

} else { /* Эта часть выполняется всеми */ /* прочими потоками */ /* управления */ if (status != 0) { /* Обработка ошибочной ситуации */ } else { /* Нормальное "невыделенное" */ /* завершение ожидания */ /* на барьере */ } }

/* Повторная синхронизация – */ /* ожидание завершения выделенных действий */ status = pthread_barrier_wait (&barrier); /* Продолжение параллельной работы */ . . .

Листинг 2.40. Типичная схема применения функции pthread_barrier_wait(). (html, txt)

Отметим, что для барьеров отсутствует вариант синхронизации с контролем времени ожидания. Это вполне понятно, поскольку в случае срабатывания контроля барьер окажется в неработоспособном состоянии (требуемое число потоков, скорее всего, уже не соберется). По той же причине функция pthread_barrier_wait() не является точкой терминирования – "оставшиеся в живых" не переживут потери товарища...

Аналогично, не являются точками терминирования функции pthread_mutex_lock() и pthread_spin_lock().Если бы они были таковыми, то точками терминирования стали бы все функции, в том числе библиотечные, которые их вызывают – malloc(), free() и т.п. Обеспечить в обработчиках завершения корректное состояние объектов, обслуживаемых подобными функциями, довольно сложно; это деятельность, чреватая ошибками, которые трудно не только найти и исправить, но даже воспроизвести. С другой стороны, мьютексы и спин-блокировки предназначены для захвата на короткое время, без длительного ожидания, так что нечувствительность к терминированию в данном случае не составляет большой проблемы.

Работу с барьерами проиллюстрируем коллективными вычислениями, производимыми двумя потоками (см. листинг 2.41).

Листинг 2.41. Пример программы, использующей барьеры. (html, txt)

В данном случае второго ожидания на барьере не понадобилось – вместо этого потоки управления просто завершаются.


Блокировки чтение-запись


Блокировки чтение-запись можно назвать интеллектуальным средством синхронизации, поскольку они делают различие между читателями и писателями. В большинстве случаев разделяемые данные чаще читают, чем изменяют (действительно, зачем писать то, что никто не будет читать?), и это делает блокировки чтение-запись весьма употребительными.

Типичный пример использования блокировок чтение-запись – синхронизация доступа к буферизованным в памяти фрагментам файловой системы, особенно к каталогам (чем ближе к корню, тем чаще их читают и реже изменяют). Важно отметить в этой связи, что, в отличие от блокировок, реализуемых функцией fcntl() (см. курс [1]), блокировки чтение-запись могут применяться и там, где файловая система отсутствует (например, в минимальных конфигурациях, функционирующих под управлением соответствующей подпрофилю стандарта POSIX операционной системы реального времени).

Применительно к блокировкам чтение-запись предоставляются следующие группы функций.

инициализация и разрушение блокировок: pthread_rwlock_init(), pthread_rwlock_destroy() (см. листинг 2.20);

#include <pthread.h>

int pthread_rwlock_init ( pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

int pthread_rwlock_destroy ( pthread_rwlock_t *rwlock);

Листинг 2.20. Описание функций инициализации и разрушения блокировок чтение-запись. (html, txt)

установка блокировки на чтение: pthread_rwlock_rdlock(), pthread_rwlock_tryrdlock(), pthread_rwlock_timedrdlock() (см. листинги 2.21 и 2.22);

#include <pthread.h>

int pthread_rwlock_rdlock ( pthread_rwlock_t *rwlock);

int pthread_rwlock_tryrdlock ( pthread_rwlock_t *rwlock);

Листинг 2.21. Описание функций установки блокировки на чтение. (html, txt)

#include <pthread.h> #include <time.h> int pthread_rwlock_timedrdlock ( pthread_rwlock_t *restrict rwlock, const struct timespec *restrict abstime);

Листинг 2.22. Описание функции установки блокировки на чтение с ограниченным ожиданием. (html, txt)

установка блокировки на запись: pthread_rwlock_wrlock(), pthread_rwlock_trywrlock(), pthread_rwlock_timedwrlock() (см.
листинги 2.23 и 2.24);

#include <pthread.h>

int pthread_rwlock_wrlock ( pthread_rwlock_t *rwlock);

int pthread_rwlock_trywrlock ( pthread_rwlock_t *rwlock);

Листинг 2.23. Описание функций установки блокировки на запись. (html, txt)

#include <pthread.h> #include <time.h> int pthread_rwlock_timedwrlock ( pthread_rwlock_t *restrict rwlock, const struct timespec *restrict abstime);

Листинг 2.24. Описание функции установки блокировки на запись с ограниченным ожиданием. (html, txt)

снятие блокировки чтение-запись: pthread_rwlock_unlock() (см. листинг 2.25);

#include <pthread.h> int pthread_rwlock_unlock ( pthread_rwlock_t *rwlock);

Листинг 2.25. Описание функции снятия блокировки чтение-запись. (html, txt)

инициализация и разрушение атрибутных объектов блокировок: pthread_rwlockattr_init(), pthread_rwlockattr_destroy() (см. листинг 2.26);

#include <pthread.h>

int pthread_rwlockattr_init ( pthread_rwlockattr_t *attr);

int pthread_rwlockattr_destroy ( pthread_rwlockattr_t *attr);

Листинг 2.26. Описание функций инициализации и разрушения атрибутных объектов блокировок. (html, txt)

опрос и установка атрибутов блокировок в атрибутных объектах: pthread_rwlockattr_getpshared(), pthread_rwlockattr_setpshared() (см. листинг 2.27).

#include <pthread.h>

int pthread_rwlockattr_getpshared ( const pthread_rwlockattr_t *restrict attr, int *restrict pshared);

int pthread_rwlockattr_setpshared ( pthread_rwlockattr_t *attr, int pshared);

Листинг 2.27. Описание функций опроса и установки атрибутов блокировок в атрибутных объектах. (html, txt)



Тонким вопросом, связанным с блокировками чтение-запись, является взаимодействие читателей и писателей. Согласно стандарту POSIX-2001, если речь не идет о приложениях реального времени, то от реализации зависит, будет ли приостановлен поток управления при попытке установить блокировку на чтение при наличии ожидающих освобождения той же блокировки писателей. (Как правило, чтобы воспрепятствовать зависанию писателей, читателей в подобной ситуации "тормозят", иначе они так и будут подхватывать блокировку друг у друга.


Для политик планирования реального времени преимущество получает более приоритетный поток.)

Естественно, если блокировку установил писатель, читателю придется подождать. Если этим писателем оказался тот же поток, ждать придется долго...

Параллельно одним или несколькими потоками управления может быть установлено несколько блокировок на чтение. Сколько раз устанавливали такую блокировку, столько же раз ее необходимо снять. (Установка блокировки писателем возможна только после последнего снятия.) Максимально допустимое число одновременно установленных блокировок на чтение зависит от реализации.

Когда снимают блокировку, на которую претендуют и читатели, и писатели, как правило, преимущество получают писатели, но, строго говоря, решение зависит от реализации.

Разумеется, снимать блокировку может только тот поток управления, который ее устанавливал; в противном случае поведение неопределено.

В качестве иллюстрации применения блокировок чтение-запись приведем набор функций для работы со списками в многопотоковой среде. На листинге 2.28 показан заголовочный файл, содержащий необходимые описания, на листинге 2.29 – C-файл с реализацией функций. Листинг 2.30 содержит текст тестовой программы, листинг 2.31 – результаты ее работы. Обратим внимание на использование блокировок без приостановки выполнения и на проверку статуса завершения попыток их установить.

Листинг 2.28. Заголовочный файл g_list.h для функций работы со списками в многопотоковой среде. (html, txt)

Листинг 2.29. Исходный текст функций для работы со списками в многопотоковой среде. (html, txt)

Листинг 2.30. Пример программы, использующей функции для работы со списками в многопотоковой среде. (html, txt)

Попыток выполнить операцию со списком: 1503 Число попыток, неудачных из-за занятости списка: 0

Листинг 2.31. Возможные результаты работы программы, использующей функции для работы со списками в многопотоковой среде. (html, txt)

Можно видеть, что в данном случае оптимистичное применение блокировок чтение-запись без приостановки выполнения в случае невозможности немедленной установки себя полностью оправдало.



/* Инициализировать список */ extern void g_list_init (g_listP);

/* Завершить работу со списком */ extern void g_list_destroy (g_listP);

/* Вставить новый элемент */ /* в конец списка */ extern void g_list_ins_last ( g_listP, void *); /* Вставить новый элемент в */ /* список перед первым, */ /* удовлетворяющим заданному */ /* свойству (или в конец) */ extern void g_list_ins_fun ( g_listP, void *, afun);

/* Удалить элемент из списка */ extern void g_list_del ( g_listP, void *);

/* Применить процедуру ко всем */ /* элементам списка */ extern void g_list_forall_x ( g_listP, aproc, void *);

/* Выбрать подходящий элемент списка */ extern void *g_list_suchas_x ( g_listP, afun, void *);

/* Выдать элемент по номеру */ extern void *g_list_get_bynum ( g_listP, int);

#endif

Листинг 2.28. Заголовочный файл g_list. h для функций работы со списками в многопотоковой среде.

/* * * * * * * * * * * * * * * * * * * * */ /* Реализация функций для многопотоковой */ /* работы со списками. */ /* Применяются блокировки чтение-запись */ /* без приостановки выполнения */ /* * * * * * * * * * * * * * * * * * * * */

#define _XOPEN_SOURCE 600

#include <pthread.h> #include <stdlib.h> #include <errno.h> #include "g_list.h"

/* * * * * * * * * * * * * */ /* Инициализировать список */ /* * * * * * * * * * * * * */ void g_list_init (g_listP plist) { plist->head = NULL; errno = pthread_rwlock_init ( &plist->lk, NULL); } /* * * * * * * * * * * * * * * */ /* Завершить работу со списком */ /* * * * * * * * * * * * * * * */ void g_list_destroy (g_listP plist) { errno = pthread_rwlock_destroy ( &plist->lk); }

/* * * * * * * * * * * * * * * * * * * * */ /* Вставить новый элемент в конец списка */ /* * * * * * * * * * * * * * * * * * * * */ void g_list_ins_last (g_listP plist, void *pval) { register g_linkP pt; register g_linkP *p;

if ((errno = pthread_rwlock_trywrlock ( &plist->lk)) != 0) { return; }

for (p = &plist->head; *p != NULL; p = &(*p)->pnext) ; if ((pt = (g_linkP) malloc (sizeof (*pt))) != NULL) { pt->pnext = NULL; pt->pvalue = pval; *p = pt; }



(void) pthread_rwlock_unlock ( &plist->lk); }

/* * * * * * * * * * * * * * * * * */ /* Вставить новый элемент в список */ /* перед первым, */ /* удовлетворяющим заданному свойству */ /* (или в конец) */ /* * * * * * * * * * * * * * * * */ void g_list_ins_fun (g_listP plist, void *pval, afun fun) { register g_linkP pt; register g_linkP *p;

if ((errno = pthread_rwlock_trywrlock ( &plist->lk)) != 0) { return; }

for (p = &plist->head; (*p != NULL) && (fun (pval, (*p)->pnext->pvalue) == 0); p = &(*p)->pnext) ;

if ((pt = (g_linkP) malloc ( sizeof (*pt))) != NULL) { pt->pnext = NULL; pt->pvalue = pval; *p = pt; }

(void) pthread_rwlock_unlock ( &plist->lk); }

/* * * * * * * * * * * * * * */ /* Удалить элемент из списка */ /* * * * * * * * * * * * * * */ void g_list_del (g_listP plist, void *pval) { register g_linkP pt; register g_linkP *p;

if ((errno = pthread_rwlock_trywrlock (&plist->lk)) != 0) { return; }

for (p = &plist->head; *p != NULL; p = &(*p)->pnext) { if ((*p)->pvalue == pval) { pt = *p; *p = pt->pnext; free (pt); errno = pthread_rwlock_unlock( &plist->lk); return; } }

/* Пытаемся удалить */ /* несуществующий элемент */ (void) pthread_rwlock_unlock ( &plist->lk); errno = EINVAL; }

/* * * * * * * * * * * * * */ /* Применить процедуру ко */ /* всем элементам списка */ /* * * * * * * * * * * * */ void g_list_forall_x (g_listP plist, aproc proc, void *extobj) { register g_linkP pt;

/* Устанавливаем блокировку на запись, */ /* поскольку, возможно, процедура */ /* модифицирует */ /* объекты, ссылки на которые */ /* хранятся в списке */ if ((errno = pthread_rwlock_trywrlock( &plist->lk)) != 0) { return; }

for (pt = plist->head; pt != NULL; pt = pt->pnext) { proc (extobj, pt->pvalue); }

(void) pthread_rwlock_unlock( &plist->lk); }

/* * * * * * * * * * * * * * * * * * */ /* Выбрать подходящий элемент списка */ /* * * * * * * * * * * * * * * * * * */ void *g_list_suchas_x (g_listP plist, afun fun, void *extobj) { register g_linkP pt;



/* Устанавливаем блокировку на */ /* чтение, так как */ /* считаем, что функция не */ /* модифицирует объекты, */ /* ссылки на которые хранятся */ /* в списке */ if ((errno = pthread_rwlock_tryrdlock( &plist->lk)) != 0) { return NULL; }

for (pt = plist->head; pt != NULL; pt = pt->pnext) { if (fun (extobj, pt->pvalue)) { (void) pthread_rwlock_unlock( &plist->lk); return (pt->pvalue); } }

(void) pthread_rwlock_unlock( &plist->lk); return NULL; }

/* * * * * * * * * * * * * * */ /* Выдать элемент по номеру */ /* * * * * * * * * * * * * * */ void *g_list_get_bynum (g_listP plist, int n) { register g_linkP pt;

if ((errno = pthread_rwlock_tryrdlock( &plist->lk)) != 0) { return NULL; }

for (pt = plist->head; n--, pt != NULL; pt = pt->pnext) { if (n == 0) { (void) pthread_rwlock_unlock( &plist->lk); return (pt->pvalue); } }

/* Пытаемся извлечь несуществующий */ /* компонент */ (void) pthread_rwlock_unlock( &plist->lk); errno = EINVAL;

return NULL; }

Листинг 2.29. Исходный текст функций для работы со списками в многопотоковой среде.

/* * * * * * * * * * * * * * * * * * * */ /* Тест функций многопотоковой работы */ /* со списками */ /* * * * * * * * * * * * * * * * * * * */

#define _XOPEN_SOURCE 600

#include <pthread.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #include "g_list.h"

/* Структура для засыпания на */ /* минимальное время */ static struct timespec nslp = {0, 1};

/* Структуры для возврата результатов */ /* потоками: */ /* Сколько операций пытались сделать, */ /* сколько из них оказались неудачными */ /* из-за занятости списка */ static struct pt_res { int nops; int nbusy; } ptres [3] = {{0, 0}, {0, 0}, {0, 0}};

/* * * * * * * * * * * * * * * * * * * * */ /* Этот поток добавляет элементы к списку */ /* * * * * * * * * * * * * * * * * * * * */ static void *start_func_1 (void *plist) { int *p1;

while (1) { if ((p1 = (int *) malloc ( sizeof (int))) != NULL) { *p1 = rand (); errno = 0; g_list_ins_last (plist, p1); ptres [0].nops++; if (errno == EBUSY) { ptres [0].nbusy++; } } (void) nanosleep (&nslp, NULL); }



return (NULL); }

/* * * * * * * * * * * * * * * * * * * * * */ /* Процедура для подсчета суммы элементов */ /* списка */ /* * * * * * * * * * * * * * * * * * * * * */ static void proc_sum (void *sum, void *pval) { *((int *) sum) += *((int *) pval); } /* * * * * * * * * * * * * * * * * * * * * */ /* Этот поток подсчитывает сумму элементов */ /* списка */ /* * * * * * * * * * * * * * * * * * * * * */ static void *start_func_2 (void *plist) { int sum;

while (1) { sum = 0; errno = 0; g_list_forall_x (plist, &proc_sum, &sum); ptres [1].nops++; if (errno == EBUSY) { ptres [1].nbusy++; } (void) nanosleep (&nslp, NULL); }

return (NULL); }

/* * * * * * * * * * * * * * * * * */ /* Этот поток удаляет из списка */ /* элементы со случайными номерами */ /* * * * * * * * * * * * * * * * * */ static void *start_func_3 (void *plist) { int *p1;

while (1) { errno = 0; p1 = (int *) g_list_get_bynum( plist, rand ()); ptres [2].nops++; if (errno == EBUSY) { ptres[2].nbusy++; }

if (p1 != NULL) { errno = 0; g_list_del (plist, p1); ptres [2].nops++; if (errno == EBUSY) { ptres [2].nbusy++; } free (p1); } (void) nanosleep (&nslp, NULL); }

return (NULL); }

/* * * * * * * * * * * * * * * * * * * * * * */ /* Начальный поток всех запустит, поспит, */ /* потом всех терминирует и выдаст статистику */ /* * * * * * * * * * * * * * * * * * * * * * */ int main (void) { g_listP plist; pthread_t pt1, pt2, pt3;

g_list_init (plist);

pthread_create (&pt1, NULL, start_func_1, plist); pthread_create (&pt2, NULL, start_func_2, plist); pthread_create (&pt3, NULL, start_func_3, plist);

sleep (10);

pthread_cancel (pt1); pthread_cancel (pt2); pthread_cancel (pt3);

pthread_join (pt1, NULL); pthread_join (pt2, NULL); pthread_join (pt3, NULL);

g_list_destroy (plist);

printf ("Попыток выполнить " "операцию со списком: %d\n", ptres [0].nops + ptres [1].nops + ptres [2].nops); printf ("Число попыток, неудачных" " из-за занятости " "списка: %d\n", ptres [0].nbusy + ptres [1].nbusy + ptres [2].nbusy);

return (0); }

Листинг 2.30. Пример программы, использующей функции для работы со списками в многопотоковой среде.

Попыток выполнить операцию со списком: 1503 Число попыток, неудачных из-за занятости списка: 0

Листинг 2.31. Возможные результаты работы программы, использующей функции для работы со списками в многопотоковой среде.

Можно видеть, что в данном случае оптимистичное применение блокировок чтение-запись без приостановки выполнения в случае невозможности немедленной установки себя полностью оправдало.


Мьютексы


Функции, обслуживающие мьютексы, можно разбить на следующие группы:

инициализация и разрушение мьютексов: pthread_mutex_init(), pthread_mutex_destroy() (см. листинг 2.1);

#include <pthread.h>

int pthread_mutex_init ( pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

int pthread_mutex_destroy ( pthread_mutex_t *mutex);

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

Листинг 2.1. Описание функций инициализации и разрушения мьютексов. (html, txt)

захват и освобождение мьютексов: pthread_mutex_lock(), pthread_mutex_trylock(), pthread_mutex_timedlock(), pthread_mutex_unlock() (см. листинги 2.2 и 2.3);

#include <pthread.h>

int pthread_mutex_lock ( pthread_mutex_t *mutex);

int pthread_mutex_trylock ( pthread_mutex_t *mutex);

int pthread_mutex_unlock ( pthread_mutex_t *mutex);

Листинг 2.2. Описание функций захвата и освобождения мьютексов. (html, txt)

#include <pthread.h> #include <time.h> int pthread_mutex_timedlock ( pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

Листинг 2.3. Описание функции захвата мьютексов с ограниченным ожиданием. (html, txt)

опрос и установка атрибутов мьютекса: pthread_mutex_getprioceiling(), pthread_mutex_setprioceiling() (см. листинг 2.4);

#include <pthread.h>

int pthread_mutex_getprioceiling ( const pthread_mutex_t *restrict mutex, int *restrict prioceiling);

int pthread_mutex_setprioceiling ( pthread_mutex_t *restrict mutex, int prioceiling, int *restrict old_ceiling);

Листинг 2.4. Описание функций опроса и установки атрибутов мьютекса. (html, txt)

инициализация и разрушение атрибутных объектов мьютексов: pthread_mutexattr_init(), pthread_mutexattr_destroy() (см. листинг 2.5);

#include <pthread.h>

int pthread_mutexattr_init ( pthread_mutexattr_t *attr);

int pthread_mutexattr_destroy ( pthread_mutexattr_t *attr);

Листинг 2.5. Описание функций инициализации и разрушения атрибутных объектов мьютексов. (html, txt)

опрос и установка атрибутов мьютекса в атрибутных объектах: pthread_mutexattr_gettype(), pthread_mutexattr_settype(), pthread_mutexattr_getpshared(), pthread_mutexattr_setpshared(), pthread_mutexattr_getprotocol(), pthread_mutexattr_setprotocol(), pthread_mutexattr_getprioceiling(), pthread_mutexattr_setprioceiling() (см.
листинг 2.6).

#include <pthread.h>

int pthread_mutexattr_gettype ( const pthread_mutexattr_t *restrict attr, int *restrict type);

int pthread_mutexattr_settype ( pthread_mutexattr_t *attr, int type);

int pthread_mutexattr_getpshared ( const pthread_mutexattr_t *restrict attr, int *restrict pshared);

int pthread_mutexattr_setpshared ( pthread_mutexattr_t *attr, int pshared);

int pthread_mutexattr_getprotocol ( const pthread_mutexattr_t *restrict attr, int *restrict protocol);

int pthread_mutexattr_setprotocol ( *attr, int protocol);

int pthread_mutexattr_getprioceiling ( const pthread_mutexattr_t *restrict attr, int *restrict prioceiling); int pthread_mutexattr_setprioceiling ( pthread_mutexattr_t *attr, int prioceiling);

Листинг 2.6. Описание функций опроса и установки атрибутов мьютекса в атрибутных объектах. (html, txt)



Мы не будем детально описывать каждую из перечисленных выше функций (хотя бы потому, что дисциплина работы с атрибутами и атрибутными объектами та же, что и для потоков управления), но коснемся лишь отдельных, специфических аспектов.

Сразу после инициализации функцией pthread_mutex_init() мьютекс, разумеется, оказывается свободным. Разрушить функцией pthread_mutex_destroy() можно только инициализированный, свободный мьютекс.

Для инициализации статически описанных мьютексов с подразумеваемыми значениями атрибутов целесообразно воспользоваться макросом PTHREAD_MUTEX_INITIALIZER. Эффект будет тем же, что и после вызова pthread_mutex_init() с пустым указателем в качестве значения аргумента attr, только без накладных расходов на проверку корректности атрибутного объекта.

В стандарте POSIX-2001 тип pthread_mutex_t трактуется как абстрактный, со скрытой структурой и даже без методов для присваивания и сравнения на равенство, а попытки обойти их отсутствие за счет применения операций с областями памяти, естественно, обречены на неудачу поскольку, согласно стандарту, для синхронизации должны использоваться сами объекты-мьютексы, а не их копии.Это "развязывает руки" операционной системе в использовании доступных аппаратных возможностей при реализации мьютексов, делая ее максимально эффективной.

У инициализированного мьютекса имеется четыре атрибута:

тип (обслуживается функциями pthread_mutexattr_gettype() и pthread_mutexattr_settype()); верхняя грань приоритетов выполнения (функции pthread_mutex_getprioceiling(), pthread_mutex_setprioceiling(), pthread_mutexattr_getprioceiling(), pthread_mutexattr_setprioceiling()); протокол (pthread_mutexattr_getprotocol(), pthread_mutexattr_setprotocol()); признак использования несколькими процессами (pthread_mutexattr_getpshared(), pthread_mutexattr_setpshared()).

Мы уже отмечали важность эффективной реализации для средств синхронизации потоков управления. Вероятно, по этой причине в число атрибутов не включен идентификатор потока-владельца, поскольку его нужно устанавливать и хранить.



У инициализированного мьютекса имеется четыре атрибута:

тип (обслуживается функциями pthread_mutexattr_gettype() и pthread_mutexattr_settype()); верхняя грань приоритетов выполнения (функции pthread_mutex_getprioceiling(), pthread_mutex_setprioceiling(), pthread_mutexattr_getprioceiling(), pthread_mutexattr_setprioceiling()); протокол (pthread_mutexattr_getprotocol(), pthread_mutexattr_setprotocol()); признак использования несколькими процессами (pthread_mutexattr_getpshared(), pthread_mutexattr_setpshared()).

Мы уже отмечали важность эффективной реализации для средств синхронизации потоков управления. Вероятно, по этой причине в число атрибутов не включен идентификатор потока-владельца, поскольку его нужно устанавливать и хранить.

В стандарте POSIX-2001 определены четыре типа мьютексов.

PTHREAD_MUTEX_NORMAL

Эффективный, но небезопасный тип. Не проверяется возможность возникновения тупиковой ситуации при захвате подобного мьютекса (например, когда поток управления попытается захватить уже принадлежащий ему мьютекс). Не фиксируются ошибки при освобождении чужого или свободного мьютекса.

PTHREAD_MUTEX_ERRORCHECK

Данный тип обеспечивает выявление ошибочных ситуаций. Упомянутые выше некорректные действия с мьютексами приведут к выдаче кода ошибки. Поскольку, как считается, мьютекс обычно доступен для захвата, а значения атрибутов проверяются лишь тогда, когда поток приходится блокировать, использование атрибутов по сути не влияет на эффективность. В частности, контроль ошибок несколько замедляет нормальное функционирование только при освобождении мьютекса.

PTHREAD_MUTEX_RECURSIVE

Мьютекс со счетчиком и выявлением ошибочных ситуаций. Поток управления может многократно захватить мьютекс, но затем должен столько же раз освободить его.

PTHREAD_MUTEX_DEFAULT

Подразумеваемое значение атрибута "тип". Некорректные действия приводят к неопределенному эффекту. Реализация может отождествить этот тип с одним из вышеописанных.

Атрибут "верхняя грань приоритетов выполнения" определяет минимальный приоритет, с которым будет выполняться критический интервал, охраняемый мьютексом.


Чтобы избежать инверсии приоритетов, значение этого атрибута следует сделать не меньшим, чем максимальный из приоритетов потоков, могущих захватить данный мьютекс (отсюда и название атрибута). Диапазон допустимых значений атрибута тот же, что и для приоритетов политики планирования SCHED_FIFO.

Атрибут "протокол" влияет на планирование потока управления во время владения мьютексом. Согласно стандарту, возможных протоколов три.

PTHREAD_PRIO_NONE

Владение мьютексом не влияет на приоритет и планирование потока управления.

PTHREAD_PRIO_INHERIT

Если поток управления, захватив мьютексы с данным протоколом, блокирует более приоритетные потоки, ему присваивается максимальный из приоритетов ждущих потоков. Если во время владения появляются новые ждущие более приоритетные потоки, приоритет владельца должен быть соответственно повышен. Если затем владелец будет блокирован на другом мьютексе с данным протоколом, повышение приоритетов должно быть рекурсивно распространено.

PTHREAD_PRIO_PROTECT

Если поток управления владеет мьютексами данного типа, он выполняется с приоритетом, являющимся максимумом из верхних граней приоритетов выполнения этих мьютексов, независимо от того, ждут ли какие-либо другие потоки их освобождения.

В стандарте оговаривается, что при изменении приоритета, вызванном операциями с мьютексами, подчиняющимися протоколам PTHREAD_PRIO_INHERIT или PTHREAD_PRIO_PROTECT, поток управления не должен перемещаться в хвост очереди планирования.

Если поток управления владеет несколькими мьютексами с разными протоколами, ему назначается приоритет, максимальный из предписанных каждым из протоколов.

Признак использования несколькими процессами мьютекса, в соответствии со стандартом POSIX-2001, может принимать два значения.

PTHREAD_PROCESS_PRIVATE

Мьютекс доступен только для потоков управления, выполняющихся в рамках того же процесса, что и поток, инициализировавший мьютекс. Это значение атрибута является подразумеваемым.

PTHREAD_PROCESS_SHARED

С мьютексом могут работать все потоки управления, имеющие доступ к памяти, в которой мьютекс расположен, даже если эта память разделяется несколькими процессами.



На попытки захвата мьютекса, осуществляемые с применением функций pthread_mutex_lock(), pthread_mutex_trylock() или pthread_mutex_timedlock() значения атрибутов влияют так, как описано выше.

Как правило, поток, вызвавший pthread_mutex_lock(), блокируется, если мьютекс уже захвачен, и ждет его освобождения. В аналогичной ситуации функция pthread_mutex_trylock() немедленно завершается, возвращая код ошибки, а функция pthread_mutex_timedlock() блокируется, пока либо мьютекс не освободится, либо не наступит заданный аргументом abstime момент времени, отсчитываемого по часам CLOCK_REALTIME.

Обычно функцию pthread_mutex_timedlock() вызывают лишь после того, как с помощью pthread_mutex_trylock() выяснили, что мьютекс захвачен. Ограничение времени ожидания делает программу устойчивой к ошибкам, препятствующим освобождению мьютексов, хотя, если считать, что охраняемый критический интервал невелик, данное свойство не имеет особого значения.

В соответствии с общим подходом, если во время ожидания освобождения мьютекса потоку доставляется сигнал, после выхода из функции обработки сигнала оно возобновляется, как если бы и не прерывалось.

Функция pthread_mutex_unlock() обычно освобождает мьютекс. Для мьютексов типа PTHREAD_MUTEX_RECURSIVE вызов pthread_mutex_unlock(), строго говоря, приводит лишь к уменьшению счетчика на единицу, а освобождение происходит только тогда, когда счетчик становится нулевым. Кому достанется мьютекс после освобождения, зависит от политики планирования.

В качестве примера использования мьютексов приведем упрощенную реализацию динамического выделения памяти в многопотоковой среде. На листинге 2.7 показан заголовочный файл, на листинге 2.8 – исходный текст функций выделения и освобождения памяти.

#ifndef g_MALLOC #define g_MALLOC

/* Количество размеров (в словах типа size_t), */ /* для которых поддерживаются */ /* разные списки памяти */ #define DIF_SIZES 8

/* Размер пула памяти */ #define POOL_SIZE 65536

/* Указатель на кусок памяти нулевого размера */ #define g_NULL ((void *) (-1))



/* Первое поле следующей структуры нужно /* для всех кусков памяти, а второе – */ /* только для провязки свободных.*/ /* При отведении памяти адрес второго */ */ поля выдается как результат */ typedef struct listi { size_t length; struct listi *pnext; } *list_of_mem;

extern void *g_malloc (size_t); extern void g_free (void *);

#endif

Листинг 2.7. Заголовочный файл g_malloc.h для функций выделения и освобождения памяти в многопотоковой среде.

/* * * * * * * * * * * * * * * * * * * * * */ /* Функции выделения и освобождения памяти */ /* * * * * * * * * * * * * * * * * * * * * */

#include <pthread.h> #include <errno.h> #include <assert.h> #include <string.h> #include "g_malloc.h"

static char mem_pool [POOL_SIZE] = {0, }; /* Размер занятой части пула памяти */ /* Списки свободного пространства */ /* (по одному на каждый размер */ /* от 1 до DIF_SIZES) */ static size_t cur_pool_size = 0;

static list_of_mem short_lists [DIF_SIZES] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}; /* Список больших */ /* свободных кусков (превосходящих DIF_SIZES) */ /* Разные мьютексы для разных */ /* групп списков свободного пространства */ static list_of_mem big_list = NULL;

static pthread_mutex_t short_lists_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t big_list_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t pool_mutex = PTHREAD_MUTEX_INITIALIZER;

/* * * * * * * * * * */ /* Выделение памяти */ /* * * * * * * * * * */ void *g_malloc (size_t size) { /* Указатель для хождения по списку */ /* больших свободных кусков */ list_of_mem *p; /* Указатель для хождения по спискам */ /* свободных кусков */ list_of_mem pt; /* Индекс в массиве short_lists */ size_t ls; size_t ts; /* Временная переменная */

if (size == 0) { return (g_NULL); /* Важно, чтобы результат был */ /* отличен от NULL, поскольку NULL */ /* – признак ненормального завершения */ }

/* Округлим запрошенный размер вверх */ /* до кратного размеру size_t и */ /* прибавим слово служебной информации */ size = (size – 1 + 2 * sizeof (size_t)) & ~(sizeof (size_t) – 1);



/* Вычислим индекс в массиве */ /* short_lists [], соответствующий */ /* запрошенному размеру */ ls = size / sizeof (size_t) – 2;

if (ls < DIF_SIZES) { /* Попробуем выдать кусок */ /* из списка коротких */ assert ( pthread_mutex_lock( &short_lists_mutex) == 0); if ((pt = short_lists [ls]) != NULL) { /* Есть нужный кусок */ short_lists [ls] = (short_lists [ls])->pnext; assert (pthread_mutex_unlock( &short_lists_mutex) == 0); return (&pt->pnext); } assert (pthread_mutex_unlock( &short_lists_mutex) == 0); }

/* Попробуем выдать кусок из */ /* списка больших */ assert (pthread_mutex_lock( &big_list_mutex) == 0); for (p = &big_list, pt = *p; pt != NULL; p = &pt->pnext, pt = *p) { if ((signed long) (ts = pt->length – size) >= 0) { /* Нашли подходящий кусок */ if (ts < sizeof (*pt)) { /* Придется выдать кусок целиком – */ /* в остатке не помещается */ /* служебная информация */ *p = pt->pnext; } else { /* Отрежем сколько надо и, */ /* при необходимости, */ /* перецепим остаток в */ /* список коротких */ if ((ls = (pt->length = ts) / sizeof (size_t) – 2) < DIF_SIZES) { *p = pt->pnext; assert (pthread_mutex_lock( &short_lists_mutex) == 0); pt->pnext = short_lists [ls]; short_lists [ls] = pt; assert (pthread_mutex_unlock( &short_lists_mutex) == 0); } pt = (list_of_mem) ((char *) pt + ts); pt->length = size; } assert ( pthread_mutex_unlock( &big_list_mutex) == 0); return (&pt->pnext); } } /* for */ assert (pthread_mutex_unlock ( &big_list_mutex) == 0);

/* Кусок из большого списка */ /* выдать не удалось. */ /* Попробуем взять прямо из */ /* пула памяти */ assert (pthread_mutex_lock( &pool_mutex) == 0); if (cur_pool_size + size <= POOL_SIZE) { pt = (list_of_mem) (mem_pool + cur_pool_size); pt->length = size; cur_pool_size += size; assert (pthread_mutex_unlock ( &pool_mutex) == 0); return (&pt->pnext); } assert (pthread_mutex_unlock( &pool_mutex) == 0);

/* Неудача при выделении памяти */ errno = ENOMEM; return (NULL); }



/* * * * * * * * * * * * * * * * * * */ /* Возврат ранее запрошенной памяти */ /* * * * * * * * * * * * * * * * * * */ void g_free (void *p) { list_of_mem pt; size_t size, ls;

if ((p == g_NULL) || (p == NULL)) { return; }

/* Установим указатель на */ /* служебную информацию */ pt = (list_of_mem) ((char *) p – sizeof (size_t)); size = pt->length; ls = size / sizeof (size_t) – 2; memset (p, 0, size – sizeof (size_t));

/* Не из конца ли пула этот кусок? */ assert (pthread_mutex_lock( &pool_mutex) == 0); if (((char *) pt + size) == (mem_pool + cur_pool_size)) { pt->length = 0; cur_pool_size -= size; assert (pthread_mutex_unlock( &pool_mutex) == 0); return; } assert (pthread_mutex_unlock( &pool_mutex) == 0);

/* Добавим освободившийся кусок */ /* к одному из списков */ if (ls < DIF_SIZES) { assert (pthread_mutex_lock( &short_lists_mutex) == 0); pt->pnext = short_lists [ls]; short_lists [ls] = pt; assert (pthread_mutex_unlock( &short_lists_mutex) == 0); } else { /* Добавим к большому списку */ assert (pthread_mutex_lock( &big_list_mutex) == 0); pt->pnext = big_list; big_list = pt; assert (pthread_mutex_unlock( &big_list_mutex) == 0); } }

Листинг 2.8. Исходный текст функций выделения и освобождения памяти в многопотоковой среде.

Основная идея реализации состоит в том, что, помимо начального пула, поддерживаются списки свободных кусков памяти – отдельные списки для небольших размеров (чтобы ускорить их отведение и возврат и уменьшить фрагментацию памяти) и общий список для прочих (больших) кусков. Из общего списка выбирается первый подходящий элемент.

С каждым из трех источников выделения памяти (пул, списки коротких, список больших кусков) ассоциирован свой мьютекс. Цель подобного разделения – минимизировать время владения мьютексом. Только при операциях с общим списком оно может быть большим.

Для библиотечных функций удобны предоставляемые стандартом POSIX-2001 средства инициализации статически описанных мьютексов.

На листинге 2.9 показана тестовая многопотоковая программа, а на листинге 2.10 – возможные результаты ее работы.


Видно, что выполнение потоков управления чередуется, и потоки влияют друг на друга.

#include <unistd.h> #include <stdio.h> #include <pthread.h> #include "g_malloc.h"

static void *start_func (void *dummy) { void *p1, *p2, *p3;

printf ("g_malloc (65000): %p\n", p1 = g_malloc (65000)); sleep (1); printf ("g_malloc (1): %p\n", p2 = g_malloc (1)); sleep (1); g_free (p1); sleep (1); g_free (p2); sleep (1); printf ("g_malloc (64990): %p\n", p1 = g_malloc (64990)); sleep (1); printf ("g_malloc (1): %p\n", p2 = g_malloc (1)); sleep (1); printf ("g_malloc (5): %p\n", p3 = g_malloc (5)); sleep (1); g_free (p1); sleep (1); g_free (p2); sleep (1); g_free (p3); sleep (1); printf ("g_malloc (100000): %p\n", p3 = g_malloc (100000));

return (NULL); }

int main (void) { pthread_t pt1, pt2;

pthread_create (&pt1, NULL, start_func, NULL); pthread_create (&pt2, NULL, start_func, NULL);

pthread_join (pt1, NULL); pthread_join (pt2, NULL);

return (0); }

Листинг 2.9. Пример программы, использующей функции выделения и освобождения памяти в многопотоковой среде.

g_malloc (65000): 0x8049024 g_malloc (65000): (nil) g_malloc (1): 0x8058e10 g_malloc (1): 0x8058e18 g_malloc (64990): 0x804902c g_malloc (64990): (nil) g_malloc (1): 0x8049024 g_malloc (1): 0x8058e10 g_malloc (5): 0x8058e18 g_malloc (5): 0x8058e24 g_malloc (100000): (nil) g_malloc (100000): (nil)

Листинг 2.10. Возможные результаты работы программы, использующей функции выделения и освобождения памяти в многопотоковой среде.

Мьютексы – весьма подходящее средство для реализации обеда философов. Ниже приведена программа, написанная С.В. Самборским (см. листинг 2.11).

/* Обедающие философы. Многопотоковая реализация с помощью мьютексов. Запуск: mudrecMutex [-a | -p | -I] [-t число_секунд] имя_философа ...

Опции: -t число_секунд – сколько секунд моделируется

Стратегии захвата вилок: -a – сначала захватывается вилка с меньшим номером; -p – сначала захватывается нечетная вилка; -I – некорректная (но эффективная) интеллигентная стратегия: во время ожидания уже захваченная вилка кладется.



Пример запуска: mudrecMutex -p -t 300 A B C D E F G\ H I J K L M N\ O P Q R S T U\ V W X Y Z */

static char rcsid[] __attribute__((unused)) = \ "$Id: mudrecMutex.c,v 1.14 2003/11/20 16:09:20" "sambor Exp $";

#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <signal.h> #include <string.h> #include <fcntl.h> #include <limits.h> #include <errno.h>

#define max(a,b) ((a)>(b)?(a):(b)) #define min(a,b) ((a)>(b)?(b):(a))

struct mudrec { char *name; int left_fork, right_fork; int eat_time, wait_time, think_time, max_wait_time; int count; pthread_t thread; int private_pFdIn; } *kafedra;

/* Глобальные счетчики и логические переменные */ int Stop = 0; /* Признак конца обеда */

/* Различные дескрипторы */ int protokol [2] = {-1, -1}; #define pFdIn (protokol [1]) #define pFdOut (protokol [0])

/* Массив мьютексов для синхронизации доступа */ /* к вилкам */ pthread_mutex_t *mtxFork;

/* Разные алгоритмы захвата вилок */ static void get_forks_simple ( struct mudrec *this); static void get_forks_odd ( struct mudrec *this); static void get_forks_maybe_infinit_time( struct mudrec *this);

/* Используемый метод захвата вилок */ void (*get_forks) (struct mudrec *this) = get_forks_simple;

/* Возвращение вилок */ static void put_forks (struct mudrec *this);

/*Потоки-философы */ void *filosof (void *arg) { struct mudrec *this = arg; char buffer [LINE_MAX]; int bytes; int private_pFdIn = this->private_pFdIn;

while (!Stop) { /* Пора подкрепиться */ { int wait_time, tm = time (NULL);

bytes = write ( private_pFdIn, buffer, strlen (buffer));

(*get_forks) (this);

wait_time = time (NULL) – tm; this->wait_time += wait_time; this->max_wait_time = max (wait_time, this->max_wait_time);

sprintf( buffer, "%s: ждал вилок %d сек\n", this->name, wait_time); bytes = write (private_pFdIn, buffer, strlen (buffer)); }

/* Может, обед уже закончился? */ if (Stop) { put_forks (this); break; }



/* Ест */ { int eat_time = rand () % 20 + 1;

sleep (eat_time);

this->eat_time += eat_time; this->count++; sprintf (buffer, "%s: ел %d сек\n", this->name, eat_time); bytes = write ( private_pFdIn, buffer, strlen (buffer)); }

/* Отдает вилки */ put_forks (this);

if (Stop) break;

/* Размышляет */ { int think_time = rand () % 10 + 1;

sleep (think_time); this->think_time += think_time; } } /* while (!Stop) */

sprintf (buffer,"%s: уходит\n", this->name); bytes = write (private_pFdIn, buffer, strlen (buffer));

close (private_pFdIn);

return (NULL); } /* Поток-философ */

/* Кладет вилки одну за другой */ static void put_forks (struct mudrec *this) { pthread_mutex_unlock ( &mtxFork [this->left_fork – 1]); pthread_mutex_unlock ( &mtxFork [this->right_fork – 1]); }

/* Берет вилки по очереди в порядке номеров */ static void get_forks_simple( struct mudrec *this) { int first = min ( this->left_fork, this->right_fork); int last = max ( this->left_fork, this->right_fork); pthread_mutex_lock ( &mtxFork [first – 1]); pthread_mutex_lock ( &mtxFork [last – 1]); }

/* Берем сначала нечетную вилку */ /* (если обе нечетные – */ /* то с большим номером) */ static void get_forks_odd (struct mudrec *this) { int left = this->left_fork, right = this->right_fork;

int first; int last;

if ((left & 1) > (right & 1)) { first = left; last = right; } else if ((left & 1) < (right & 1)) { first = right; last = left; } else { first = max (left, right); last = min (left, right); }

pthread_mutex_lock ( &mtxFork [first – 1]); pthread_mutex_lock ( &mtxFork [last – 1]); }

/* Берем вилки по очереди, в */ /* произвольном порядке. */ /* Но если вторая вилка не */ /* берется сразу, то кладем */ /* первую. */ /* То есть философ не расходует */ /* вилочное время впустую. */ static void get_forks_maybe_infinit_time (struct mudrec *this) { int left = this->left_fork, right = this->right_fork;



for (;;) { pthread_mutex_lock ( &mtxFork [left – 1]); if (0 == pthread_mutex_trylock (&mtxFork [right – 1])) return; pthread_mutex_unlock ( &mtxFork [left – 1]); pthread_mutex_lock ( &mtxFork [right – 1]); if (0 == pthread_mutex_trylock (&mtxFork [left – 1])) return; pthread_mutex_unlock ( &mtxFork [right – 1]); } }

/* Мелкие служебные функции */ static void stop (int dummy) { Stop = 1; }

static void usage (char name []) { fprintf (stderr, "Использование: %s" " [-a | -p | -I] " "[-t число_секунд] " "имя_философа ...\n", name); exit (1); }

/* Точка входа демонстрационной программы */ int main (int argc, char *argv []) { char buffer [LINE_MAX], *p; int i, n, c; int open_room_time = 300; int nMudr; struct sigaction sact;

while ((c = getopt (argc, argv, "apIt:")) != -1) { (c) { case 'a': get_forks = get_forks_simple; break; case 'p': get_forks = get_forks_odd; break; case 'I': get_forks = get_forks_maybe_infinit_time; break; case 't': open_room_time = strtol( optarg, &p, 0); if (optarg [0] == 0 || *p != 0) usage (argv [0]); break; default : usage (argv [0]); } }

nMudr = argc – optind; /* Меньше двух */ /* философов неинтересно ... */ if (nMudr < 2) usage (argv [0]);

/* Создание канала для протокола */ /* обработки событий */ pipe (protokol); kafedra = calloc (sizeof (struct mudrec), nMudr);

/* Зачисление на кафедру */ for (i = 0; i < nMudr; i++, optind++) { kafedra [i].name = argv [optind]; /* Выдадим телефон */ kafedra [i].private_pFdIn = fcntl (pFdIn, F_DUPFD, 0);

/* Укажем новичку, */ /* какими вилками */ /* пользоваться */ kafedra [i].left_fork = i + 1; kafedra [i].right_fork = i + 2; } /* Последний */ /* пользуется вилкой первого */ kafedra [nMudr – 1].right_fork = 1;

/* Зададим реакцию на сигналы */ /* и установим будильник */ /* на конец обеда */ sact.sa_handler = stop; (void) sigemptyset (&sact.sa_mask); sact.sa_flags = 0; (void) sigaction (SIGINT, &sact, (struct sigaction *) NULL); (void) sigaction (SIGALRM, &sact, (struct sigaction *) NULL); alarm (open_room_time);



/* Создадим мьютексы для охраны вилок */ mtxFork = calloc ( sizeof (pthread_mutex_t), nMudr); for (i = 0; i < nMudr; i++) { pthread_mutex_init ( &mtxFork [i], NULL); }

/* Философы входят в столовую */ for (i = 0; i < nMudr; i++) pthread_create ( &kafedra [i].thread, NULL, &filosof, (void *) &kafedra [i]);

/* Выдача сообщений на стандартный */ /* вывод и выход */ /* после окончания всех задач */ close (pFdIn); while (1) { n = read (pFdOut, buffer, LINE_MAX); if (n == 0 || (n < 0 && errno != EINTR)) break; for (i = 0; i < n; i++) putchar (buffer [i]); } close (pFdOut);

/* Уничтожение мьютексов */ for (i = 0; i < nMudr; i++) { pthread_mutex_destroy ( &mtxFork [i]); }

/* Выдача сводной информации */ { int full_eating_time = 0; int full_waiting_time = 0; int full_thinking_time = 0; for (i = 1; i <= nMudr; i++) { struct mudrec *this = &kafedra [i – 1];

full_eating_time += this->eat_time; full_waiting_time += this->wait_time; full_thinking_time += this->think_time;

if (this->count > 0) { float count = this->count; float think_time = this->think_time / count; float eat_time = this->eat_time / count; float wait_time = this->wait_time / count;

printf ( "%s: ел %d раз в " "среднем: думал=%.1f " "ел=%.1f ждал=%.1f " "(максимум %d)\n", this->name, this->count, think_time,eat_time, wait_time, this->max_wait_time); } else printf ("%s: не поел\n", this->name); } /* for */

{ float total_time = ( full_eating_time + full_waiting_time + full_thinking_time) / (float) nMudr; printf("Среднее число одновременно" " едящих = %.3f\n", full_eating_time / total_time); printf("Среднее число одновременно" " ждущих = %.3f\n", full_waiting_time / total_time); } } /* Выдача сводной информации */

free (mtxFork); free (kafedra);

/* Сообщим об окончании работы. */ printf ("Конец обеда\n");

return 0; }

Листинг 2.11.Многопотоковый вариант решения задачи об обедающих философах с использованием мьютексов.


Особенности синхронизации потоков управления


По сравнению с процессами, потоки управления характеризуются двумя особенностями:

на порядок меньшими накладными расходами на обслуживание;существенно более тесным взаимодействием в общем адресном пространстве.

Очевидно, чтобы быть практически полезными, средства синхронизации, специально ориентированные на потоки управления, должны быть оптимизированы с учетом обеих отмеченных особенностей.

К числу таких средств, присутствующих в стандарте POSIX-2001, принадлежат мьютексы, переменные условия, блокировки чтение-запись, спин-блокировки и барьеры.

Мьютекс – это синхронизирующий объект, использование которого позволяет множеству потоков управления упорядочить доступ к разделяемым данным. Название этого средства синхронизации отражает его функциональность – взаимное исключение (mutual-exclusion). Поток захватывает мьютекс в монопольное владение и остается владельцем, пока сам же его не освободит.

Переменная условия в качестве синхронизирующего объекта дает потокам управления возможность многократно приостанавливать выполнение, пока некий ассоциированный предикат (условие) не станет истинным. Говорят, что поток, выполнение которого приостановлено на переменной условия, блокирован на этой переменной.

Блокировки чтение-запись (много читателей или один писатель) в каждый момент времени позволяют нескольким потокам управления одновременно иметь к данным доступ на чтение или только одному потоку – доступ на запись. Естественно, подобные блокировки обычно применяют для защиты данных, которые читаются чаще, чем изменяются.

Спин-блокировки представляют собой низкоуровневое средство синхронизации. Как и мьютексы, они предназначены для упорядочения доступа множества потоков управления к разделяемым данным.

Барьеры предназначены для синхронизации множества потоков управления в определенной точке их выполнения.

Средства синхронизации могут использоваться для достижения двух существенно разных целей:

захват (как правило, на короткое время) разделяемого объекта для защиты критического интервала; ожидание (долгое или даже потенциально неограниченное) наступления некоторого события, выполнения некоторого условия.

Мьютексы и блокировки можно отнести к первой из выделенных категорий, переменные условия и барьеры – ко второй.

Вообще говоря, средства синхронизации потоков управления можно использовать не только в рамках одного процесса, но и среди множества процессов, расположенных в разделяемой, доступной на запись памяти, соответствующим образом инициализированной.

Со средствами синхронизации потоков управления ассоциируются атрибуты и атрибутные объекты, которые можно опрашивать и/или изменять аналогично тому, как это делается для самих потоков.



Переменные условия


Пусть имеется некоторый предикат (условие), зависящий от значений переменных, разделяемых несколькими потоками управления. Совместное использование мьютексов, переменных условия и обслуживающих их функций позволяет организовать экономное ожидание состояния истинности этого предиката.

Общая схема применения переменных условия выглядит следующим образом. С разделяемыми переменными, фигурирующими в предикате, ассоциируется мьютекс, который необходимо захватить перед началом проверок. Затем поток управления входит в цикл вида

while (! предикат) { Ожидание на переменной условия с освобождением мьютекса. После успешного завершения ожидания поток вновь оказывается владельцем мьютекса. }

После нормального выхода из цикла проверяемое условие истинно; можно выполнить требуемые действия и освободить мьютекс.

Разблокирование потоков управления, ожидающих на переменной условия, должен обеспечить другой поток, изменивший значения разделяемых переменных и отправивший ждущим соответствующее уведомление. Вообще говоря, не гарантируется, что в момент разблокирования проверяемое условие истинно, поэтому ожидание и следует обертывать в цикл, делая его потенциально многократным.

По логике применения переменные условия напоминают семафоры: одни потоки ждут их перехода в некоторое состояние, другие своими действиями должны этот переход обеспечить и, тем самым, разблокировать ждущих. Есть, однако, и принципиальное отличие. Содержательный предикат, истинность которого является целью ожидания, ассоциируется с семафором неявно и дополнительно не проверяется; считается, что прекращение ожидания само по себе является свидетельством истинности. Это значит, что ответственность за корректность синхронизирующих действий разделяется между оперирующими с семафором потоками управления со всеми вытекающими отсюда последствиями. Для переменных условия прекращение ожидания гарантий истинности предиката не дает, его приходится в явном виде записывать в заголовке цикла, что делает программу устойчивее и упрощает анализ ее корректности.


Отметим также, что все действия с разделяемыми переменными, которые производятся при поддержке переменных условия, осуществляются при захваченном мьютексе и, тем самым, оказываются потоково-безопасными. Если же попытаться дополнить содержательными проверками операции с семафорами, о синхронизации доступа к разделяемым переменным придется позаботиться отдельно.

Переходя к описанию функций, предлагаемых стандартом POSIX-2001 для обслуживания переменных условия, укажем, что их можно разделить на следующие группы:

инициализация и разрушение переменных условия: pthread_cond_init(), pthread_cond_destroy() (см. листинг 2.12);

#include <pthread.h>

int pthread_cond_init ( pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

int pthread_cond_destroy ( pthread_cond_t *cond);

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

Листинг 2.12. Описание функций инициализации и разрушения переменных условия. (html, txt)

блокирование (ожидание) на переменной условия: pthread_cond_wait(), pthread_cond_timedwait() (см. листинг 2.13);

#include <pthread.h>

int pthread_cond_wait ( pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

int pthread_cond_timedwait ( pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, struct timespec *restrict abstime);

Листинг 2.13. Описание функций блокирования на переменной условия. (html, txt)

разблокирование (прекращение ожидания) потоков управления, блокированных на переменной условия: pthread_cond_broadcast(), pthread_cond_signal() (см. листинг 2.14);

#include <pthread.h>

int pthread_cond_broadcast ( pthread_cond_t *cond);

int pthread_cond_signal ( pthread_cond_t *cond);

Листинг 2.14. Описание функций разблокирования потоков управления, блокированных на переменной условия. (html, txt)

инициализация и разрушение атрибутных объектов переменных условия: pthread_condattr_init(), pthread_condattr_destroy() (см. листинг 2.15);

#include <pthread.h>

int pthread_condattr_init ( pthread_condattr_t *attr);



int pthread_condattr_destroy ( pthread_condattr_t *attr);

Листинг 2.15. Описание функций инициализации и разрушения атрибутных объектов переменных условия. (html, txt)

опрос и установка атрибутов переменных условия в атрибутных объектах: признака использования несколькими процессами (обслуживается функциями pthread_condattr_getpshared(), pthread_condattr_setpshared()) и идентификатора часов реального времени, используемых для ограничения ожидания на переменной условия (функции pthread_condattr_getclock(), pthread_condattr_setclock()) (см. листинг 2.16).

#include <pthread.h>

int pthread_condattr_getpshared ( const pthread_condattr_t *restrict attr, int *restrict pshared);

int pthread_condattr_setpshared ( pthread_condattr_t *attr, int pshared);

int pthread_condattr_getclock ( const pthread_condattr_t *restrict attr, clockid_t *restrict clock_id);

int pthread_condattr_setclock ( pthread_condattr_t *attr, clockid_t clock_id);

Листинг 2.16. Описание функций опроса и установки атрибутов переменных условия в атрибутных объектах. (html, txt)





Для функции pthread_cond_timedwait() к приведенному перечню добавляется еще и срабатывание контроля по времени. Отметим, впрочем, что и в этом случае мьютекс передается во владение вызывающему потоку управления, а предикат может оказаться истинным (пока срабатывал контроль по времени, другой поток изменил значения разделяемых переменных).

По целому ряду причин контроль по времени целесообразно оформлять с помощью абсолютных, а не относительных значений. Вероятно, главная из них состоит в том, что целью является не успешный возврат из функции pthread_cond_timedwait(), а истинность предиката, то есть завершение цикла, показанного на листинге 2.17.

rc = 0; while (! predicate && rc == 0) { rc = pthread_cond_timedwait ( &cond, &mutex, &ts); }

Листинг 2.17. Типичный цикл ожидания на переменной условия с контролем по времени.

Если бы аргумент ts задавался как интервал, его пришлось бы перевычислять в цикле; кроме того, контроль по времени оказался бы смазанным непредсказуемыми задержками в выполнении потока управления.

Ожидание на переменной условия является точкой терминирования потока управления. Перед вызовом первого обработчика завершения поток вновь становится владельцем мьютекса, как если бы ожидание завершилось обычным образом, но вместо возврата из функции началось выполнение действий по терминированию, причем в том же состоянии, в каком выполняется код в окрестности обращения к функции. Это делает безопасным доступ обработчиков завершения к разделяемым переменным и вообще упрощает их написание, что способствует уменьшению числа возможных ошибок.

Функция pthread_cond_broadcast() разблокирует все потоки управления, ждущие на переменной условия (это полезно, например, когда писатель уступает место читателям), а функция pthread_cond_signal() (не генерирующая, вопреки названию, никаких сигналов) – по крайней мере один из них (если таковые вообще имеются). Порядок разблокирования определяется политикой планирования; борьба за владение мьютексом, заданным в начале ожидания, протекает так же, как и при одновременном вызове pthread_mutex_lock (mutex).



Вообще говоря, поток управления, вызвавший pthread_cond_broadcast() или pthread_cond_signal(), не обязан быть владельцем упомянутого мьютекса; тем не менее, если требуется предсказуемость решений планировщика, это условие должно быть выполнено.

Реализация должна гарантировать, что поток управления, ожидавший на переменной условия и разблокированный в результате получения заказа на терминирование, не "потребит" разрешение на разблокирование, сгенерированное функцией pthread_cond_signal(), если имеются другие потоки, ожидающие на той же переменной условия. Без подобной гарантии конкуренция между терминированием и разблокированием может завершиться тупиковой ситуацией.

Не рекомендуется вызывать pthread_cond_signal() (а также освобождать мьютексы) в (асинхронно выполняемых) функциях обработки сигналов.

В качестве примера использования переменных условия и мьютексов приведем прямолинейное, но вполне практичное многопотоковое решение задачи об обедающих философах (см. листинг 2.18).

/* * * * * * * * * * * * * * * * * */ /* Многопотоковый вариант обеда */ /* философов с использованием */ /* мьютексов и переменных условия */ /* * * * * * * * * * * * * * * * * */

#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <errno.h>

/* Число обедающих философов */ #define QPH 5

/* Время (в секундах) на обед */ #define FO 15

/* Длительность еды */ #define ernd (rand () % 3 + 1)

/* Длительность разговора */ #define trnd (rand () % 5 + 1)

/* Состояние вилок */ static int fork_busy [QPH] = {0, };

/* Синхронизирующая переменная условия */ static pthread_cond_t forks_cond = PTHREAD_COND_INITIALIZER;

/* Синхронизирующий мьютекс */ static pthread_mutex_t forks_mutex = PTHREAD_MUTEX_INITIALIZER;

/* * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока-философа. */ /* Аргумент – номер философа */ /* * * * * * * * * * * * * * * * * * */ void *start_phil (void *no) { /* Время до конца обеда */ int fo; /* Время очередного отрезка */ /* еды или беседы */ int t;



fo = FO; while (fo > 0) { /* Обед */

/* Философ говорит */ printf ("Философ %d беседует\n", (int) no); t = trnd; sleep (t); fo -= t;

/* Пытается взять вилки */ (void) pthread_mutex_lock ( &forks_mutex); while (fork_busy [(int) no – 1] || fork_busy [(int) no % QPH] ) { (void) pthread_cond_wait ( &forks_cond, &forks_mutex); } fork_busy [(int) no – 1] = fork_busy [(int) no % QPH] = 1; (void) pthread_mutex_unlock ( &forks_mutex);

/* Ест */ printf ("Философ %d ест\n", (int) no); t = ernd; sleep (t); fo -= t;

/* Отдает вилки */ (void) pthread_mutex_lock ( &forks_mutex); fork_busy [(int) no – 1] = fork_busy [(int) no % QPH] = 0; (void) pthread_cond_broadcast (&forks_cond); (void) pthread_mutex_unlock (&forks_mutex); } /* while */

printf ("Философ %d закончил обед\n", (int) no); return (NULL); }

/* * * * * * * * * * * * * * * */ /* Создание потоков-философов */ /* и ожидание их завершения */ /* * * * * * * * * * * * * * * */ int main (void) { /* Массив идентификаторов */ /* потоков-философов */ pthread_t pt_id [QPH]; /* Номер философа */ int no; /* Атрибутный объект для */ /* создания потоков */ pthread_attr_t attr_obj;

if ((errno = pthread_attr_init( &attr_obj)) != 0) { perror ("PTHREAD_ATTR_INIT"); return (errno); }

/* В очередь, господа */ /* философы, в очередь! */ if ((errno = pthread_attr_setschedpolicy( &attr_obj, SCHED_FIFO)) != 0) { perror ( "PTHREAD_ATTR_SETSCHEDPOLICY"); return (errno); }

/* Все – к столу */ for (no = 1; no <= QPH; no++) { if ((errno = pthread_create ( &pt_id [no – 1], &attr_obj, start_phil, (void *) no)) != 0) { perror ( "PTHREAD_CREATE"); return (no); } }

/* Ожидание завершения обеда */ for (no = 1; no <= QPH; no++) { (void) pthread_join ( pt_id [no – 1], NULL); }

return 0; }

Листинг 2.18. Пример многопотоковой реализации обеда философов с использованием мьютексов и переменных условия.

Результаты работы этой программы могут выглядеть так, как показано на листинге 2.19.



Философ 1 беседует Философ 2 беседует Философ 3 беседует Философ 4 беседует Философ 5 беседует Философ 4 ест Философ 2 ест Философ 4 беседует Философ 5 ест Философ 2 беседует Философ 3 ест Философ 5 беседует Философ 1 ест Философ 3 беседует Философ 4 ест Философ 1 беседует Философ 2 ест Философ 2 беседует Философ 1 ест Философ 4 беседует Философ 3 ест Философ 1 беседует Философ 5 ест Философ 5 беседует Философ 3 беседует Философ 4 ест Философ 2 ест Философ 4 беседует Философ 2 беседует Философ 1 ест Философ 3 ест Философ 1 закончил обед Философ 5 ест Философ 3 закончил обед Философ 2 ест Философ 5 закончил обед Философ 4 ест Философ 2 закончил обед Философ 4 закончил обед

Листинг 2.19. Возможные результаты работы многопотоковой программы, моделирующей обед философов с использованием мьютексов и переменных условия.

Несмотря на то, что в приведенной программе используется глобальный (общий для всех философов и вилок) мьютекс, это не приведет к лишним задержкам, так как мьютекс служит не для захвата вилок, а лишь для проверки и изменения их состояния. Практически всегда он будет свободен.

В употреблении глобальной (в том же смысле, что и мьютекс) переменной условия и всеобщем разблокировании ждущих (посредством вызова pthread_cond_broadcast()) при освобождении любой пары вилок также нет ничего плохого. Ресурсов это практически не отнимает, а пока до разбуженного философа дойдет очередь, нужные ему вилки на самом деле могут оказаться свободными, даже если ожидание на переменной условия было прервано не из-за них. В то же время, кажущееся предпочтительным создание соответствующего числа переменных условия и избирательное разблокирование соседей насытившегося философа двумя вызовами pthread_cond_signal() на самом деле не позволяет гарантировать отсутствие ложных пробуждений, так как из-за нюансов планирования нужную вилку могут выхватить буквально из-под носа (или из-под руки).

Обратим внимание на то, что цикл, в который заключено ожидание на переменной условия, в данном случае использован для захвата нескольких ресурсов.Это напоминает описанные в курсе [1] групповые операции с семафорами, если иметь в виду цикл в целом и отвлечься от ложных разблокирований.

Разумеется, приведенное решение задачи об обедающих философах является нечестным, поскольку во время ожидания на переменной условия, которое длится неопределенно долго, философ "отключается", он не беседует и не ест, что, по условию задачи, запрещено (этим недостатком страдает и программа из курса [1], основанная на групповых операциях с семафорами). Однако как иллюстрация типичных способов работы с мьютексами и переменными условия данная программа имеет право на существование.


restrict mutex, const pthread_mutexattr_t


#include <pthread.h>
int pthread_mutex_init ( pthread_mutex_t * restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy ( pthread_mutex_t *mutex);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
Листинг 2.1. Описание функций инициализации и разрушения мьютексов.
Закрыть окно




#include <pthread.h>
int pthread_mutex_lock ( pthread_mutex_t *mutex);
int pthread_mutex_trylock ( pthread_mutex_t *mutex);
int pthread_mutex_unlock ( pthread_mutex_t *mutex);
Листинг 2.2. Описание функций захвата и освобождения мьютексов.
Закрыть окно




#include <pthread.h> #include <time.h> int pthread_mutex_timedlock ( pthread_mutex_t * restrict mutex, const struct timespec *restrict abstime);
Листинг 2.3. Описание функции захвата мьютексов с ограниченным ожиданием.
Закрыть окно




#include <pthread.h>
int pthread_mutex_getprioceiling ( const pthread_mutex_t *restrict mutex, int *restrict prioceiling);
int pthread_mutex_setprioceiling ( pthread_mutex_t * restrict mutex, int prioceiling, int *restrict old_ceiling);
Листинг 2.4. Описание функций опроса и установки атрибутов мьютекса.
Закрыть окно




#include <pthread.h>
int pthread_mutexattr_init ( pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy ( pthread_mutexattr_t *attr);
Листинг 2.5. Описание функций инициализации и разрушения атрибутных объектов мьютексов.
Закрыть окно




#include <pthread.h>
int pthread_mutexattr_gettype ( const pthread_mutexattr_t *restrict attr, int *restrict type);
int pthread_mutexattr_settype ( pthread_mutexattr_t *attr, int type);
int pthread_mutexattr_getpshared ( const pthread_mutexattr_t *restrict attr, int *restrict pshared);
int pthread_mutexattr_setpshared ( pthread_mutexattr_t *attr, int pshared);
int pthread_mutexattr_getprotocol ( const pthread_mutexattr_t *restrict attr, int *restrict protocol);
int pthread_mutexattr_setprotocol ( *attr, int protocol);
int pthread_mutexattr_getprioceiling ( const pthread_mutexattr_t *restrict attr, int *restrict prioceiling); int pthread_mutexattr_setprioceiling ( pthread_mutexattr_t *attr, int prioceiling);
Листинг 2.6. Описание функций опроса и установки атрибутов мьютекса в атрибутных объектах.
Закрыть окно




#ifndef g_MALLOC #define g_MALLOC
/* Количество размеров (в словах типа size_t), */ /* для которых поддерживаются */ /* разные списки памяти */ #define DIF_SIZES 8
/* Размер пула памяти */ #define POOL_SIZE 65536
/* Указатель на кусок памяти нулевого размера */ #define g_NULL ((void *) (-1))
/* Первое поле следующей структуры нужно /* для всех кусков памяти, а второе – */ /* только для провязки свободных.*/ /* При отведении памяти адрес второго */ */ поля выдается как результат */ typedef struct listi { size_t length; struct listi *pnext; } *list_of_mem;
extern void *g_malloc (size_t); extern void g_free (void *);
#endif
Листинг 2.7. Заголовочный файл g_malloc.h для функций выделения и освобождения памяти в многопотоковой среде.
Закрыть окно




/* * * * * * * * * * * * * * * * * * * * * */ /* Функции выделения и освобождения памяти */ /* * * * * * * * * * * * * * * * * * * * * */
#include <pthread.h> #include <errno.h> #include <assert.h> #include <string.h> #include "g_malloc.h"
static char mem_pool [POOL_SIZE] = {0, }; /* Размер занятой части пула памяти */ /* Списки свободного пространства */ /* (по одному на каждый размер */ /* от 1 до DIF_SIZES) */ static size_t cur_pool_size = 0;
static list_of_mem short_lists [DIF_SIZES] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL}; /* Список больших */ /* свободных кусков (превосходящих DIF_SIZES) */ /* Разные мьютексы для разных */ /* групп списков свободного пространства */ static list_of_mem big_list = NULL;
static pthread_mutex_t short_lists_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t big_list_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t pool_mutex = PTHREAD_MUTEX_INITIALIZER;
/* * * * * * * * * * */ /* Выделение памяти */ /* * * * * * * * * * */ void *g_malloc (size_t size) { /* Указатель для хождения по списку */ /* больших свободных кусков */ list_of_mem *p; /* Указатель для хождения по спискам */ /* свободных кусков */ list_of_mem pt; /* Индекс в массиве short_lists */ size_t ls; size_t ts; /* Временная переменная */
if (size == 0) { return (g_NULL); /* Важно, чтобы результат был */ /* отличен от NULL, поскольку NULL */ /* – признак ненормального завершения */ }
/* Округлим запрошенный размер вверх */ /* до кратного размеру size_t и */ /* прибавим слово служебной информации */ size = (size – 1 + 2 * sizeof (size_t)) & ~(sizeof (size_t) – 1);
/* Вычислим индекс в массиве */ /* short_lists [], соответствующий */ /* запрошенному размеру */ ls = size / sizeof (size_t) – 2;
if (ls < DIF_SIZES) { /* Попробуем выдать кусок */ /* из списка коротких */ assert ( pthread_mutex_lock( &short_lists_mutex) == 0); if ((pt = short_lists [ls]) != NULL) { /* Есть нужный кусок */ short_lists [ls] = (short_lists [ls])->pnext; assert (pthread_mutex_unlock( &short_lists_mutex) == 0); return (&pt->pnext); } assert (pthread_mutex_unlock( &short_lists_mutex) == 0); }
/* Попробуем выдать кусок из */ /* списка больших */ assert (pthread_mutex_lock( &big_list_mutex) == 0); for (p = &big_list, pt = *p; pt != NULL; p = &pt->pnext, pt = *p) { if ((signed long) (ts = pt->length – size) >= 0) { /* Нашли подходящий кусок */ if (ts < sizeof (*pt)) { /* Придется выдать кусок целиком – */ /* в остатке не помещается */ /* служебная информация */ *p = pt->pnext; } else { /* Отрежем сколько надо и, */ /* при необходимости, */ /* перецепим остаток в */ /* список коротких */ if ((ls = (pt->length = ts) / sizeof (size_t) – 2) < DIF_SIZES) { *p = pt->pnext; assert (pthread_mutex_lock( &short_lists_mutex) == 0); pt->pnext = short_lists [ls]; short_lists [ls] = pt; assert (pthread_mutex_unlock( &short_lists_mutex) == 0); } pt = (list_of_mem) ((char *) pt + ts); pt->length = size; } assert ( pthread_mutex_unlock( &big_list_mutex) == 0); return (&pt->pnext); } } /* for */ assert (pthread_mutex_unlock ( &big_list_mutex) == 0);
/* Кусок из большого списка */ /* выдать не удалось. */ /* Попробуем взять прямо из */ /* пула памяти */ assert (pthread_mutex_lock( &pool_mutex) == 0); if (cur_pool_size + size <= POOL_SIZE) { pt = (list_of_mem) (mem_pool + cur_pool_size); pt->length = size; cur_pool_size += size; assert (pthread_mutex_unlock ( &pool_mutex) == 0); return (&pt->pnext); } assert (pthread_mutex_unlock( &pool_mutex) == 0);
/* Неудача при выделении памяти */ errno = ENOMEM; return (NULL); }
/* * * * * * * * * * * * * * * * * * */ /* Возврат ранее запрошенной памяти */ /* * * * * * * * * * * * * * * * * * */ void g_free (void *p) { list_of_mem pt; size_t size, ls;
if ((p == g_NULL) || (p == NULL)) { return; }
/* Установим указатель на */ /* служебную информацию */ pt = (list_of_mem) ((char *) p – sizeof (size_t)); size = pt->length; ls = size / sizeof (size_t) – 2; memset (p, 0, size – sizeof (size_t));
/* Не из конца ли пула этот кусок? */ assert (pthread_mutex_lock( &pool_mutex) == 0); if (((char *) pt + size) == (mem_pool + cur_pool_size)) { pt->length = 0; cur_pool_size -= size; assert (pthread_mutex_unlock( &pool_mutex) == 0); return; } assert (pthread_mutex_unlock( &pool_mutex) == 0);
/* Добавим освободившийся кусок */ /* к одному из списков */ if (ls < DIF_SIZES) { assert (pthread_mutex_lock( &short_lists_mutex) == 0); pt->pnext = short_lists [ls]; short_lists [ls] = pt; assert (pthread_mutex_unlock( &short_lists_mutex) == 0); } else { /* Добавим к большому списку */ assert (pthread_mutex_lock( &big_list_mutex) == 0); pt->pnext = big_list; big_list = pt; assert (pthread_mutex_unlock( &big_list_mutex) == 0); } }
Листинг 2.8. Исходный текст функций выделения и освобождения памяти в многопотоковой среде.
Закрыть окно




#include <unistd.h> #include <stdio.h> #include <pthread.h> #include "g_malloc.h"
static void *start_func (void *dummy) { void *p1, *p2, *p3;
printf ("g_malloc (65000): %p\n", p1 = g_malloc (65000)); sleep (1); printf ("g_malloc (1): %p\n", p2 = g_malloc (1)); sleep (1); g_free (p1); sleep (1); g_free (p2); sleep (1); printf ("g_malloc (64990): %p\n", p1 = g_malloc (64990)); sleep (1); printf ("g_malloc (1): %p\n", p2 = g_malloc (1)); sleep (1); printf ("g_malloc (5): %p\n", p3 = g_malloc (5)); sleep (1); g_free (p1); sleep (1); g_free (p2); sleep (1); g_free (p3); sleep (1); printf ("g_malloc (100000): %p\n", p3 = g_malloc (100000));
return (NULL); }
int main (void) { pthread_t pt1, pt2;
pthread_create (&pt1, NULL, start_func, NULL); pthread_create (&pt2, NULL, start_func, NULL);
pthread_join (pt1, NULL); pthread_join (pt2, NULL);
return (0); }
Листинг 2.9. Пример программы, использующей функции выделения и освобождения памяти в многопотоковой среде.
Закрыть окно




g_malloc (65000): 0x8049024 g_malloc (65000): (nil) g_malloc (1): 0x8058e10 g_malloc (1): 0x8058e18 g_malloc (64990): 0x804902c g_malloc (64990): (nil) g_malloc (1): 0x8049024 g_malloc (1): 0x8058e10 g_malloc (5): 0x8058e18 g_malloc (5): 0x8058e24 g_malloc (100000): (nil) g_malloc (100000): (nil)
Листинг 2.10. Возможные результаты работы программы, использующей функции выделения и освобождения памяти в многопотоковой среде.
Закрыть окно




/* Обедающие философы. Многопотоковая реализация с помощью мьютексов. Запуск: mudrecMutex [-a | -p | -I] [-t число_секунд] имя_философа ...
Опции: -t число_секунд – сколько секунд моделируется
Стратегии захвата вилок: -a – сначала захватывается вилка с меньшим номером; -p – сначала захватывается нечетная вилка; -I – некорректная (но эффективная) интеллигентная стратегия: во время ожидания уже захваченная вилка кладется.
Пример запуска: mudrecMutex -p -t 300 A B C D E F G\ H I J K L M N\ O P Q R S T U\ V W X Y Z */
static char rcsid[] __attribute__((unused)) = \ "$Id: mudrecMutex.c,v 1.14 2003/11/20 16:09:20" "sambor Exp $";
#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <signal.h> #include <string.h> #include <fcntl.h> #include <limits.h> #include <errno.h>
#define max(a,b) ((a)>(b)?(a):(b)) #define min(a,b) ((a)>(b)?(b):(a))
struct mudrec { char *name; int left_fork, right_fork; int eat_time, wait_time, think_time, max_wait_time; int count; pthread_t thread; int private_pFdIn; } *kafedra;
/* Глобальные счетчики и логические переменные */ int Stop = 0; /* Признак конца обеда */
/* Различные дескрипторы */ int protokol [2] = {-1, -1}; #define pFdIn (protokol [1]) #define pFdOut (protokol [0])
/* Массив мьютексов для синхронизации доступа */ /* к вилкам */ pthread_mutex_t *mtxFork;
/* Разные алгоритмы захвата вилок */ static void get_forks_simple ( struct mudrec *this); static void get_forks_odd ( struct mudrec *this); static void get_forks_maybe_infinit_time( struct mudrec *this);
/* Используемый метод захвата вилок */ void (*get_forks) (struct mudrec *this) = get_forks_simple;
/* Возвращение вилок */ static void put_forks (struct mudrec *this);
/*Потоки-философы */ void *filosof (void *arg) { struct mudrec *this = arg; char buffer [LINE_MAX]; int bytes; int private_pFdIn = this->private_pFdIn;
while (!Stop) { /* Пора подкрепиться */ { int wait_time, tm = time (NULL);
bytes = write ( private_pFdIn, buffer, strlen (buffer));
(*get_forks) (this);
wait_time = time (NULL) – tm; this->wait_time += wait_time; this->max_wait_time = max (wait_time, this->max_wait_time);
sprintf( buffer, "%s: ждал вилок %d сек\n", this->name, wait_time); bytes = write (private_pFdIn, buffer, strlen (buffer)); }
/* Может, обед уже закончился? */ if (Stop) { put_forks (this); break; }
/* Ест */ { int eat_time = rand () % 20 + 1;
sleep (eat_time);
this->eat_time += eat_time; this->count++; sprintf (buffer, "%s: ел %d сек\n", this->name, eat_time); bytes = write ( private_pFdIn, buffer, strlen (buffer)); }
/* Отдает вилки */ put_forks (this);
if (Stop) break;
/* Размышляет */ { int think_time = rand () % 10 + 1;
sleep (think_time); this->think_time += think_time; } } /* while (!Stop) */
sprintf (buffer,"%s: уходит\n", this->name); bytes = write (private_pFdIn, buffer, strlen (buffer));
close (private_pFdIn);
return (NULL); } /* Поток-философ */
/* Кладет вилки одну за другой */ static void put_forks (struct mudrec *this) { pthread_mutex_unlock ( &mtxFork [this->left_fork – 1]); pthread_mutex_unlock ( &mtxFork [this->right_fork – 1]); }
/* Берет вилки по очереди в порядке номеров */ static void get_forks_simple( struct mudrec *this) { int first = min ( this->left_fork, this->right_fork); int last = max ( this->left_fork, this->right_fork); pthread_mutex_lock ( &mtxFork [first – 1]); pthread_mutex_lock ( &mtxFork [last – 1]); }
/* Берем сначала нечетную вилку */ /* (если обе нечетные – */ /* то с большим номером) */ static void get_forks_odd (struct mudrec *this) { int left = this->left_fork, right = this->right_fork;
int first; int last;
if ((left & 1) > (right & 1)) { first = left; last = right; } else if ((left & 1) < (right & 1)) { first = right; last = left; } else { first = max (left, right); last = min (left, right); }
pthread_mutex_lock ( &mtxFork [first – 1]); pthread_mutex_lock ( &mtxFork [last – 1]); }
/* Берем вилки по очереди, в */ /* произвольном порядке. */ /* Но если вторая вилка не */ /* берется сразу, то кладем */ /* первую. */ /* То есть философ не расходует */ /* вилочное время впустую. */ static void get_forks_maybe_infinit_time (struct mudrec *this) { int left = this->left_fork, right = this->right_fork;
for (;;) { pthread_mutex_lock ( &mtxFork [left – 1]); if (0 == pthread_mutex_trylock (&mtxFork [right – 1])) return; pthread_mutex_unlock ( &mtxFork [left – 1]); pthread_mutex_lock ( &mtxFork [right – 1]); if (0 == pthread_mutex_trylock (&mtxFork [left – 1])) return; pthread_mutex_unlock ( &mtxFork [right – 1]); } }
/* Мелкие служебные функции */ static void stop (int dummy) { Stop = 1; }
static void usage (char name []) { fprintf (stderr, "Использование: %s" " [-a | -p | -I] " "[-t число_секунд] " "имя_философа ...\n", name); exit (1); }
/* Точка входа демонстрационной программы */ int main (int argc, char *argv []) { char buffer [LINE_MAX], *p; int i, n, c; int open_room_time = 300; int nMudr; struct sigaction sact;
while ((c = getopt (argc, argv, "apIt:")) != -1) { (c) { case 'a': get_forks = get_forks_simple; break; case 'p': get_forks = get_forks_odd; break; case 'I': get_forks = get_forks_maybe_infinit_time; break; case 't': open_room_time = strtol( optarg, &p, 0); if (optarg [0] == 0 || *p != 0) usage (argv [0]); break; default : usage (argv [0]); } }
nMudr = argc – optind; /* Меньше двух */ /* философов неинтересно ... */ if (nMudr < 2) usage (argv [0]);
/* Создание канала для протокола */ /* обработки событий */ pipe (protokol); kafedra = calloc (sizeof (struct mudrec), nMudr);
/* Зачисление на кафедру */ for (i = 0; i < nMudr; i++, optind++) { kafedra [i].name = argv [optind]; /* Выдадим телефон */ kafedra [i].private_pFdIn = fcntl (pFdIn, F_DUPFD, 0);
/* Укажем новичку, */ /* какими вилками */ /* пользоваться */ kafedra [i].left_fork = i + 1; kafedra [i].right_fork = i + 2; } /* Последний */ /* пользуется вилкой первого */ kafedra [nMudr – 1].right_fork = 1;
/* Зададим реакцию на сигналы */ /* и установим будильник */ /* на конец обеда */ sact.sa_handler = stop; (void) sigemptyset (&sact.sa_mask); sact.sa_flags = 0; (void) sigaction (SIGINT, &sact, (struct sigaction *) NULL); (void) sigaction (SIGALRM, &sact, (struct sigaction *) NULL); alarm (open_room_time);
/* Создадим мьютексы для охраны вилок */ mtxFork = calloc ( sizeof (pthread_mutex_t), nMudr); for (i = 0; i < nMudr; i++) { pthread_mutex_init ( &mtxFork [i], NULL); }
/* Философы входят в столовую */ for (i = 0; i < nMudr; i++) pthread_create ( &kafedra [i].thread, NULL, &filosof, (void *) &kafedra [i]);
/* Выдача сообщений на стандартный */ /* вывод и выход */ /* после окончания всех задач */ close (pFdIn); while (1) { n = read (pFdOut, buffer, LINE_MAX); if (n == 0 || (n < 0 && errno != EINTR)) break; for (i = 0; i < n; i++) putchar (buffer [i]); } close (pFdOut);
/* Уничтожение мьютексов */ for (i = 0; i < nMudr; i++) { pthread_mutex_destroy ( &mtxFork [i]); }
/* Выдача сводной информации */ { int full_eating_time = 0; int full_waiting_time = 0; int full_thinking_time = 0; for (i = 1; i <= nMudr; i++) { struct mudrec *this = &kafedra [i – 1];
full_eating_time += this->eat_time; full_waiting_time += this->wait_time; full_thinking_time += this->think_time;
if (this->count > 0) { float count = this->count; float think_time = this->think_time / count; float eat_time = this->eat_time / count; float wait_time = this->wait_time / count;
printf ( "%s: ел %d раз в " "среднем: думал=%.1f " "ел=%.1f ждал=%.1f " "(максимум %d)\n", this->name, this->count, think_time,eat_time, wait_time, this->max_wait_time); } else printf ("%s: не поел\n", this->name); } /* for */
{ float total_time = ( full_eating_time + full_waiting_time + full_thinking_time) / (float) nMudr; printf("Среднее число одновременно" " едящих = %.3f\n", full_eating_time / total_time); printf("Среднее число одновременно" " ждущих = %.3f\n", full_waiting_time / total_time); } } /* Выдача сводной информации */
free (mtxFork); free (kafedra);
/* Сообщим об окончании работы. */ printf ("Конец обеда\n");
return 0; }
Листинг 2.11. Многопотоковый вариант решения задачи об обедающих философах с использованием мьютексов.
Закрыть окно




#include <pthread.h>
int pthread_cond_init ( pthread_cond_t * restrict cond, const pthread_condattr_t *restrict attr);
int pthread_cond_destroy ( pthread_cond_t *cond);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
Листинг 2.12. Описание функций инициализации и разрушения переменных условия.
Закрыть окно




#include <pthread.h>
int pthread_cond_wait ( pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait ( pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, struct timespec *restrict abstime);
Листинг 2.13. Описание функций блокирования на переменной условия.
Закрыть окно




#include <pthread.h>
int pthread_cond_broadcast ( pthread_cond_t *cond);
int pthread_cond_signal ( pthread_cond_t *cond);
Листинг 2.14. Описание функций разблокирования потоков управления, блокированных на переменной условия.
Закрыть окно




#include <pthread.h>
int pthread_condattr_init ( pthread_condattr_t *attr);
int pthread_condattr_destroy ( pthread_condattr_t *attr);
Листинг 2.15. Описание функций инициализации и разрушения атрибутных объектов переменных условия.
Закрыть окно




#include <pthread.h>
int pthread_condattr_getpshared ( const pthread_condattr_t *restrict attr, int *restrict pshared);
int pthread_condattr_setpshared ( pthread_condattr_t *attr, int pshared);
int pthread_condattr_getclock ( const pthread_condattr_t *restrict attr, clockid_t *restrict clock_id);
int pthread_condattr_setclock ( pthread_condattr_t *attr, clockid_t clock_id);
Листинг 2.16. Описание функций опроса и установки атрибутов переменных условия в атрибутных объектах.
Закрыть окно




rc = 0; while (! predicate && rc == 0) { rc = pthread_cond_timedwait ( &cond, &mutex, &ts); }
Листинг 2.17. Типичный цикл ожидания на переменной условия с контролем по времени.
Закрыть окно




/* * * * * * * * * * * * * * * * * */ /* Многопотоковый вариант обеда */ /* философов с использованием */ /* мьютексов и переменных условия */ /* * * * * * * * * * * * * * * * * */
#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <errno.h>
/* Число обедающих философов */ #define QPH 5
/* Время ( в секундах) на обед */ #define FO 15
/* Длительность еды */ #define ernd (rand () % 3 + 1)
/* Длительность разговора */ #define trnd (rand () % 5 + 1)
/* Состояние вилок */ static int fork_busy [QPH] = {0, };
/* Синхронизирующая переменная условия */ static pthread_cond_t forks_cond = PTHREAD_COND_INITIALIZER;
/* Синхронизирующий мьютекс */ static pthread_mutex_t forks_mutex = PTHREAD_MUTEX_INITIALIZER;
/* * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока-философа. */ /* Аргумент – номер философа */ /* * * * * * * * * * * * * * * * * * */ void *start_phil (void *no) { /* Время до конца обеда */ int fo; /* Время очередного отрезка */ /* еды или беседы */ int t;
fo = FO; while (fo > 0) { /* Обед */
/* Философ говорит */ printf ("Философ %d беседует\n", (int) no); t = trnd; sleep (t); fo -= t;
/* Пытается взять вилки */ (void) pthread_mutex_lock ( &forks_mutex); while (fork_busy [(int) no – 1] || fork_busy [(int) no % QPH] ) { (void) pthread_cond_wait ( &forks_cond, &forks_mutex); } fork_busy [(int) no – 1] = fork_busy [(int) no % QPH] = 1; (void) pthread_mutex_unlock ( &forks_mutex);
/* Ест */ printf ("Философ %d ест\n", (int) no); t = ernd; sleep (t); fo -= t;
/* Отдает вилки */ (void) pthread_mutex_lock ( &forks_mutex); fork_busy [(int) no – 1] = fork_busy [(int) no % QPH] = 0; (void) pthread_cond_broadcast (&forks_cond); (void) pthread_mutex_unlock (&forks_mutex); } /* while */
printf ("Философ %d закончил обед\n", (int) no); return (NULL); }
/* * * * * * * * * * * * * * * */ /* Создание потоков-философов */ /* и ожидание их завершения */ /* * * * * * * * * * * * * * * */ int main (void) { /* Массив идентификаторов */ /* потоков-философов */ pthread_t pt_id [QPH]; /* Номер философа */ int no; /* Атрибутный объект для */ /* создания потоков */ pthread_attr_t attr_obj;
if ((errno = pthread_attr_init( &attr_obj)) != 0) { perror ("PTHREAD_ATTR_INIT"); return (errno); }
/* В очередь, господа */ /* философы, в очередь! */ if ((errno = pthread_attr_setschedpolicy( &attr_obj, SCHED_FIFO)) != 0) { perror ( "PTHREAD_ATTR_SETSCHEDPOLICY"); return (errno); }
/* Все – к столу */ for (no = 1; no <= QPH; no++) { if ((errno = pthread_create ( &pt_id [no – 1], &attr_obj, start_phil, (void *) no)) != 0) { perror ( "PTHREAD_CREATE"); return (no); } }
/* Ожидание завершения обеда */ for (no = 1; no <= QPH; no++) { (void) pthread_join ( pt_id [no – 1], NULL); }
return 0; }
Листинг 2.18. Пример многопотоковой реализации обеда философов с использованием мьютексов и переменных условия.
Закрыть окно




Философ 1 беседует Философ 2 беседует Философ 3 беседует Философ 4 беседует Философ 5 беседует Философ 4 ест Философ 2 ест Философ 4 беседует Философ 5 ест Философ 2 беседует Философ 3 ест Философ 5 беседует Философ 1 ест Философ 3 беседует Философ 4 ест Философ 1 беседует Философ 2 ест Философ 2 беседует Философ 1 ест Философ 4 беседует Философ 3 ест Философ 1 беседует Философ 5 ест Философ 5 беседует Философ 3 беседует Философ 4 ест Философ 2 ест Философ 4 беседует Философ 2 беседует Философ 1 ест Философ 3 ест Философ 1 закончил обед Философ 5 ест Философ 3 закончил обед Философ 2 ест Философ 5 закончил обед Философ 4 ест Философ 2 закончил обед Философ 4 закончил обед
Листинг 2.19. Возможные результаты работы многопотоковой программы, моделирующей обед философов с использованием мьютексов и переменных условия.
Закрыть окно




#include <pthread.h>
int pthread_rwlock_init ( pthread_rwlock_t * restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy ( pthread_rwlock_t *rwlock);
Листинг 2.20. Описание функций инициализации и разрушения блокировок чтение-запись.
Закрыть окно




#include <pthread.h>
int pthread_rwlock_rdlock ( pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock ( pthread_rwlock_t *rwlock);
Листинг 2.21. Описание функций установки блокировки на чтение.
Закрыть окно




#include <pthread.h> #include <time.h> int pthread_rwlock_timedrdlock ( pthread_rwlock_t * restrict rwlock, const struct timespec *restrict abstime);
Листинг 2.22. Описание функции установки блокировки на чтение с ограниченным ожиданием.
Закрыть окно




#include <pthread.h>
int pthread_rwlock_wrlock ( pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock ( pthread_rwlock_t *rwlock);
Листинг 2.23. Описание функций установки блокировки на запись.
Закрыть окно




#include <pthread.h> #include <time.h> int pthread_rwlock_timedwrlock ( pthread_rwlock_t * restrict rwlock, const struct timespec *restrict abstime);
Листинг 2.24. Описание функции установки блокировки на запись с ограниченным ожиданием.
Закрыть окно




#include <pthread.h> int pthread_rwlock_unlock ( pthread_rwlock_t *rwlock);
Листинг 2.25. Описание функции снятия блокировки чтение-запись.
Закрыть окно




#include <pthread.h>
int pthread_rwlockattr_init ( pthread_rwlockattr_t *attr);
int pthread_rwlockattr_destroy ( pthread_rwlockattr_t *attr);
Листинг 2.26. Описание функций инициализации и разрушения атрибутных объектов блокировок.
Закрыть окно




#include <pthread.h>
int pthread_rwlockattr_getpshared ( const pthread_rwlockattr_t *restrict attr, int *restrict pshared);
int pthread_rwlockattr_setpshared ( pthread_rwlockattr_t *attr, int pshared);
Листинг 2.27. Описание функций опроса и установки атрибутов блокировок в атрибутных объектах.
Закрыть окно




/* * * * * * * * * * * * * * */ /* Типы, структуры и функции */ /* для многопотоковой работы */ /* со списками */ /* * * * * * * * * * * * * * */
#ifndef g_LIST #define g_LIST
/* Указатель на процедуру */ typedef void (*aproc) (void *, void *);
/* Указатель на целочисленную функцию */ typedef int (*afun) (void *, void *);
/* Элемент списка указателей */ /* на объекты произвольного типа */ typedef struct g_link { /* Ссылка на следующий элемент списка */ struct g_link *pnext; /* Ссылка на объект */ void *pvalue; } *g_linkP;
/* Список указателей на объекты */ /* произвольного типа */ typedef struct g_list { /* Голова списка */ g_linkP head; /* Блокировка, ассоциированная */ pthread_rwlock_t lk; /* со списком */ } *g_listP;
/* Инициализировать список */ extern void g_list_init (g_listP);
/* Завершить работу со списком */ extern void g_list_destroy (g_listP);
/* Вставить новый элемент */ /* в конец списка */ extern void g_list_ins_last ( g_listP, void *); /* Вставить новый элемент в */ /* список перед первым, */ /* удовлетворяющим заданному */ /* свойству (или в конец) */ extern void g_list_ins_fun ( g_listP, void *, afun);
/* Удалить элемент из списка */ extern void g_list_del ( g_listP, void *);
/* Применить процедуру ко всем */ /* элементам списка */ extern void g_list_forall_x ( g_listP, aproc, void *);
/* Выбрать подходящий элемент списка */ extern void *g_list_suchas_x ( g_listP, afun, void *);
/* Выдать элемент по номеру */ extern void *g_list_get_bynum ( g_listP, int);
#endif
Листинг 2.28. Заголовочный файл g_list.h для функций работы со списками в многопотоковой среде.
Закрыть окно




/* * * * * * * * * * * * * * * * * * * * */ /* Реализация функций для многопотоковой */ /* работы со списками. */ /* Применяются блокировки чтение-запись */ /* без приостановки выполнения */ /* * * * * * * * * * * * * * * * * * * * */
#define _XOPEN_SOURCE 600
#include <pthread.h> #include <stdlib.h> #include <errno.h> #include "g_list.h"
/* * * * * * * * * * * * * */ /* Инициализировать список */ /* * * * * * * * * * * * * */ void g_list_init (g_listP plist) { plist->head = NULL; errno = pthread_rwlock_init ( &plist->lk, NULL); } /* * * * * * * * * * * * * * * */ /* Завершить работу со списком */ /* * * * * * * * * * * * * * * */ void g_list_destroy (g_listP plist) { errno = pthread_rwlock_destroy ( &plist->lk); }
/* * * * * * * * * * * * * * * * * * * * */ /* Вставить новый элемент в конец списка */ /* * * * * * * * * * * * * * * * * * * * */ void g_list_ins_last (g_listP plist, void *pval) { register g_linkP pt; register g_linkP *p;
if ((errno = pthread_rwlock_trywrlock ( &plist->lk)) != 0) { return; }
for (p = &plist->head; *p != NULL; p = &(*p)->pnext) ; if ((pt = (g_linkP) malloc (sizeof (*pt))) != NULL) { pt->pnext = NULL; pt->pvalue = pval; *p = pt; }
(void) pthread_rwlock_unlock ( &plist->lk); }
/* * * * * * * * * * * * * * * * * */ /* Вставить новый элемент в список */ /* перед первым, */ /* удовлетворяющим заданному свойству */ /* (или в конец) */ /* * * * * * * * * * * * * * * * */ void g_list_ins_fun (g_listP plist, void *pval, afun fun) { register g_linkP pt; register g_linkP *p;
if ((errno = pthread_rwlock_trywrlock ( &plist->lk)) != 0) { return; }
for (p = &plist->head; (*p != NULL) && (fun (pval, (*p)->pnext->pvalue) == 0); p = &(*p)->pnext) ;
if ((pt = (g_linkP) malloc ( sizeof (*pt))) != NULL) { pt->pnext = NULL; pt->pvalue = pval; *p = pt; }
(void) pthread_rwlock_unlock ( &plist->lk); }
/* * * * * * * * * * * * * * */ /* Удалить элемент из списка */ /* * * * * * * * * * * * * * */ void g_list_del (g_listP plist, void *pval) { register g_linkP pt; register g_linkP *p;
if ((errno = pthread_rwlock_trywrlock (&plist->lk)) != 0) { return; }
for (p = &plist->head; *p != NULL; p = &(*p)->pnext) { if ((*p)->pvalue == pval) { pt = *p; *p = pt->pnext; free (pt); errno = pthread_rwlock_unlock( &plist->lk); return; } }
/* Пытаемся удалить */ /* несуществующий элемент */ (void) pthread_rwlock_unlock ( &plist->lk); errno = EINVAL; }
/* * * * * * * * * * * * * */ /* Применить процедуру ко */ /* всем элементам списка */ /* * * * * * * * * * * * */ void g_list_forall_x (g_listP plist, aproc proc, void *extobj) { register g_linkP pt;
/* Устанавливаем блокировку на запись, */ /* поскольку, возможно, процедура */ /* модифицирует */ /* объекты, ссылки на которые */ /* хранятся в списке */ if ((errno = pthread_rwlock_trywrlock( &plist->lk)) != 0) { return; }
for (pt = plist->head; pt != NULL; pt = pt->pnext) { proc (extobj, pt->pvalue); }
(void) pthread_rwlock_unlock( &plist->lk); }
/* * * * * * * * * * * * * * * * * * */ /* Выбрать подходящий элемент списка */ /* * * * * * * * * * * * * * * * * * */ void *g_list_suchas_x (g_listP plist, afun fun, void *extobj) { register g_linkP pt;
/* Устанавливаем блокировку на */ /* чтение, так как */ /* считаем, что функция не */ /* модифицирует объекты, */ /* ссылки на которые хранятся */ /* в списке */ if ((errno = pthread_rwlock_tryrdlock( &plist->lk)) != 0) { return NULL; }
for (pt = plist->head; pt != NULL; pt = pt->pnext) { if (fun (extobj, pt->pvalue)) { (void) pthread_rwlock_unlock( &plist->lk); return (pt->pvalue); } }
(void) pthread_rwlock_unlock( &plist->lk); return NULL; }
/* * * * * * * * * * * * * * */ /* Выдать элемент по номеру */ /* * * * * * * * * * * * * * */ void *g_list_get_bynum (g_listP plist, int n) { register g_linkP pt;
if ((errno = pthread_rwlock_tryrdlock( &plist->lk)) != 0) { return NULL; }
for (pt = plist->head; n--, pt != NULL; pt = pt->pnext) { if (n == 0) { (void) pthread_rwlock_unlock( &plist->lk); return (pt->pvalue); } }
/* Пытаемся извлечь несуществующий */ /* компонент */ (void) pthread_rwlock_unlock( &plist->lk); errno = EINVAL;
return NULL; }
Листинг 2.29. Исходный текст функций для работы со списками в многопотоковой среде.
Закрыть окно




/* * * * * * * * * * * * * * * * * * * */ /* Тест функций многопотоковой работы */ /* со списками */ /* * * * * * * * * * * * * * * * * * * */
#define _XOPEN_SOURCE 600
#include <pthread.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #include "g_list.h"
/* Структура для засыпания на */ /* минимальное время */ static struct timespec nslp = {0, 1};
/* Структуры для возврата результатов */ /* потоками: */ /* Сколько операций пытались сделать, */ /* сколько из них оказались неудачными */ /* из-за занятости списка */ static struct pt_res { int nops; int nbusy; } ptres [3] = {{0, 0}, {0, 0}, {0, 0}};
/* * * * * * * * * * * * * * * * * * * * */ /* Этот поток добавляет элементы к списку */ /* * * * * * * * * * * * * * * * * * * * */ static void *start_func_1 (void *plist) { int *p1;
while (1) { if ((p1 = (int *) malloc ( sizeof (int))) != NULL) { *p1 = rand (); errno = 0; g_list_ins_last (plist, p1); ptres [0].nops++; if (errno == EBUSY) { ptres [0].nbusy++; } } (void) nanosleep (&nslp, NULL); }
return (NULL); }
/* * * * * * * * * * * * * * * * * * * * * */ /* Процедура для подсчета суммы элементов */ /* списка */ /* * * * * * * * * * * * * * * * * * * * * */ static void proc_sum (void *sum, void *pval) { *((int *) sum) += *((int *) pval); } /* * * * * * * * * * * * * * * * * * * * * */ /* Этот поток подсчитывает сумму элементов */ /* списка */ /* * * * * * * * * * * * * * * * * * * * * */ static void *start_func_2 (void *plist) { int sum;
while (1) { sum = 0; errno = 0; g_list_forall_x (plist, &proc_sum, &sum); ptres [1].nops++; if (errno == EBUSY) { ptres [1].nbusy++; } (void) nanosleep (&nslp, NULL); }
return (NULL); }
/* * * * * * * * * * * * * * * * * */ /* Этот поток удаляет из списка */ /* элементы со случайными номерами */ /* * * * * * * * * * * * * * * * * */ static void *start_func_3 (void *plist) { int *p1;
while (1) { errno = 0; p1 = (int *) g_list_get_bynum( plist, rand ()); ptres [2].nops++; if (errno == EBUSY) { ptres[2].nbusy++; }
if (p1 != NULL) { errno = 0; g_list_del (plist, p1); ptres [2].nops++; if (errno == EBUSY) { ptres [2].nbusy++; } free (p1); } (void) nanosleep (&nslp, NULL); }
return (NULL); }
/* * * * * * * * * * * * * * * * * * * * * * */ /* Начальный поток всех запустит, поспит, */ /* потом всех терминирует и выдаст статистику */ /* * * * * * * * * * * * * * * * * * * * * * */ int main (void) { g_listP plist; pthread_t pt1, pt2, pt3;
g_list_init (plist);
pthread_create (&pt1, NULL, start_func_1, plist); pthread_create (&pt2, NULL, start_func_2, plist); pthread_create (&pt3, NULL, start_func_3, plist);
sleep (10);
pthread_cancel (pt1); pthread_cancel (pt2); pthread_cancel (pt3);
pthread_join (pt1, NULL); pthread_join (pt2, NULL); pthread_join (pt3, NULL);
g_list_destroy (plist);
printf ("Попыток выполнить " "операцию со списком: %d\n", ptres [0].nops + ptres [1].nops + ptres [2].nops); printf ("Число попыток, неудачных" " из-за занятости " "списка: %d\n", ptres [0].nbusy + ptres [1].nbusy + ptres [2].nbusy);
return (0); }
Листинг 2.30. Пример программы, использующей функции для работы со списками в многопотоковой среде.
Закрыть окно




Попыток выполнить операцию со списком: 1503 Число попыток, неудачных из-за занятости списка: 0
Листинг 2.31. Возможные результаты работы программы, использующей функции для работы со списками в многопотоковой среде.
Закрыть окно




#include <pthread.h>
int pthread_spin_init ( pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy ( pthread_spinlock_t *lock);
Листинг 2.32. Описание функций инициализации и разрушения спин-блокировок.
Закрыть окно




#include <pthread.h>
int pthread_spin_lock ( pthread_spinlock_t *lock);
int pthread_spin_trylock ( pthread_spinlock_t *lock);
Листинг 2.33. Описание функций установки спин-блокировки.
Закрыть окно




#include <pthread.h> int pthread_spin_unlock ( pthread_spinlock_t *lock);
Листинг 2.34. Описание функции снятия спин-блокировки.
Закрыть окно




pthread_spin_lock (&ss->lock); /* Восстановим старую маску */ ss->blocked = oldmask; /* Проверим ждущие сигналы */ pending = ss->pending & ~ss->blocked; pthread_spin_unlock (&ss->lock);
Листинг 2.35. Фрагмент возможной реализации функции sigsuspend().
Закрыть окно




#include <pthread.h>
int pthread_barrier_init ( pthread_barrier_t * restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_destroy ( pthread_barrier_t *barrier);
Листинг 2.36. Описание функций инициализации и разрушения барьеров.
Закрыть окно




#include <pthread.h> int pthread_barrier_wait ( pthread_barrier_t *barrier);
Листинг 2.37. Описание функции синхронизации на барьере.
Закрыть окно




#include <pthread.h>
int pthread_barrierattr_init ( pthread_barrierattr_t *attr);
int pthread_barrierattr_destroy ( pthread_barrierattr_t *attr);
Листинг 2.38. Описание функций инициализации и разрушения атрибутных объектов барьеров.
Закрыть окно




#include <pthread.h>
int pthread_barrierattr_getpshared (const pthread_barrierattr_t *restrict attr, int *restrict pshared);
int pthread_barrierattr_setpshared (pthread_barrierattr_t *attr, int pshared);
Листинг 2.39. Описание функций опроса и установки атрибутов барьеров в атрибутных объектах.
Закрыть окно




if ((status = pthread_barrier_wait( &barrier)) == PTHREAD_BARRIER_SERIAL_THREAD) { /* Выделенные (обычно – объединительные) */ /* действия. */ /* Выполняются каким-то одним потоком */ /* управления */
} else { /* Эта часть выполняется всеми */ /* прочими потоками */ /* управления */ if (status != 0) { /* Обработка ошибочной ситуации */ } else { /* Нормальное "невыделенное" */ /* завершение ожидания */ /* на барьере */ } }
/* Повторная синхронизация – */ /* ожидание завершения выделенных действий */ status = pthread_barrier_wait (&barrier); /* Продолжение параллельной работы */ . . .
Листинг 2.40. Типичная схема применения функции pthread_barrier_wait().
Закрыть окно




/* * * * * * * * * * * * * * * * * * * * * * */ /* Программа использует барьеры для слияния */ /* результатов */ /* коллективных вычислений ln(2) */ /* * * * * * * * * * * * * * * * * * * * * * */
#define _XOPEN_SOURCE 600
#include <pthread.h> #include <stdio.h> #include <errno.h>
static pthread_barrier_t mbrr;
/* Ряд для ln(2) будут суммировать */ /* два потока. */ /* Один возьмет на себя положительные */ /* слагаемые, */ /* другой – отрицательные */ static double sums [2] = {0, 0};
/* * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потоков управления, */ /* участвующих в вычислениях ln(2). */ /* Аргумент ns на самом деле */ /* целочисленный и равен 1 или 2 */ /* * * * * * * * * * * * * * * * * * * * */ static void *start_func (void *ns) { double d = 1; double s = 0; int i;
/* Вычислим свою часть ряда */ for (i = (int) ns; i <= 100000000; i += 2) { s += d / i; }
/* Запомним результат в нужном месте */ sums [(int) ns – 1] = s;
/* Синхронизируемся для получения */ /* общего итога */ if (pthread_barrier_wait (&mbrr) == PTHREAD_BARRIER_SERIAL_THREAD) { sums [0] -= sums [1]; } /* Указатель на итог возвращают оба потока */ return (sums); }
/* * * * * * * * * * * * * * * * * */ /* Инициализация барьера, */ /* создание и ожидание завершения */ /* потоков управления */ /* * * * * * * * * * * * * * * * * */ int main (void) { pthread_t pt1, pt2; double *pd;
if ((errno = pthread_barrier_init ( &mbrr, NULL, 2)) != 0) { perror ("PTHREAD_BARRIER_INIT"); return (errno); }
pthread_create (&pt1, NULL, start_func, (void *) 1); pthread_create (&pt2, NULL, start_func, (void *) 2);
pthread_join (pt1, (void **) &pd); pthread_join (pt2, (void **) &pd);
printf ("Коллективно вычисленное" " значение ln(2): %g\n", *pd);
return (pthread_barrier_destroy( &mbrr)); }
Листинг 2.41. Пример программы, использующей барьеры.
Закрыть окно



Спин-блокировки


Спин-блокировки представляют собой чрезвычайно низкоуровневое средство синхронизации, предназначенное в первую очередь для применения в многопроцессорной конфигурации с разделяемой памятью. Они обычно реализуются как атомарно устанавливаемое булево значение (истина – блокировка установлена). Аппаратура поддерживает подобные блокировки командами вида "проверить и установить".

При попытке установить спин-блокировку, если она захвачена кем-то другим, как правило, применяется активное ожидание освобождения, с постоянным опросом в цикле состояния блокировки. Естественно, при этом занимается процессор, так что спин-блокировки следует устанавливать только на очень короткое время и их владелец не должен приостанавливать свое выполнение.

Для описываемых блокировок стандарт POSIX-2001 не предусматривает установки с ограниченным ожиданием. Это понятно, поскольку накладные расходы по времени на ограничение в типичном случае превысят само время ожидания.

По сравнению с мьютексами спин-блокировки могут иметь то преимущество, что (активное) ожидание и установка не связаны с переключением контекстов, активизацией планировщика и т.п. Если ожидание оказывается кратким, минимальными оказываются и накладные расходы. Приложение, чувствительное к подобным тонкостям, в каждой конкретной ситуации может выбрать наиболее эффективное средство синхронизации.

Согласно стандарту POSIX-2001, спин-блокировки обслуживаются следующими группами функций:

инициализация и разрушение спин-блокировок: pthread_spin_init(), pthread_spin_destroy() (см. листинг 2.32);

#include <pthread.h>

int pthread_spin_init ( pthread_spinlock_t *lock, int pshared);

int pthread_spin_destroy ( pthread_spinlock_t *lock);

Листинг 2.32. Описание функций инициализации и разрушения спин-блокировок. (html, txt)

установка спин-блокировки: pthread_spin_lock(), pthread_spin_trylock() (см. листинг 2.33);

#include <pthread.h>

int pthread_spin_lock ( pthread_spinlock_t *lock);

int pthread_spin_trylock ( pthread_spinlock_t *lock);


Листинг 2.33. Описание функций установки спин-блокировки. (html, txt)

снятие спин-блокировки: pthread_spin_unlock() (см. листинг 2.34).

#include <pthread.h> int pthread_spin_unlock ( pthread_spinlock_t *lock);

Листинг 2.34. Описание функции снятия спин-блокировки. (html, txt)



Обратим внимание на то, что применительно к спин-блокировкам было решено не возиться с атрибутными объектами, а единственный поддерживаемый атрибут – признак использования несколькими процессами – задавать при вызове функции pthread_spin_init().

Поскольку между применением мьютексов и спин-блокировок много общего, мы не будем приводить примеры программ, использующих спин-блокировки. Ограничимся маленьким, слегка модифицированным характерным фрагментом реализации функции sigsuspend() в библиотеке glibc (см. листинг 2.35).

pthread_spin_lock (&ss->lock); /* Восстановим старую маску */ ss->blocked = oldmask; /* Проверим ждущие сигналы */ pending = ss->pending & ~ss->blocked; pthread_spin_unlock (&ss->lock);

Листинг 2.35. Фрагмент возможной реализации функции sigsuspend(). (html, txt)

Спин-блокировка устанавливается на очень короткий участок кода; естественно, она должна быть реализована весьма эффективно, чтобы накладные расходы не оказались чрезмерными.