00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00034 #include "adapter/adapi.h"
00035 #include "daemon/engine.h"
00036 #include "daemon/worker.h"
00037 #include "shared/allocator.h"
00038 #include "scheduler/schedule.h"
00039 #include "scheduler/task.h"
00040 #include "shared/locks.h"
00041 #include "shared/log.h"
00042 #include "shared/status.h"
00043 #include "shared/util.h"
00044 #include "signer/tools.h"
00045 #include "signer/zone.h"
00046 #include "signer/zonedata.h"
00047
00048 #include <time.h>
00049
00050 ods_lookup_table worker_str[] = {
00051 { WORKER_WORKER, "worker" },
00052 { WORKER_DRUDGER, "drudger" },
00053 { 0, NULL }
00054 };
00055
00056
00061 worker_type*
00062 worker_create(allocator_type* allocator, int num, worker_id type)
00063 {
00064 worker_type* worker;
00065
00066 if (!allocator) {
00067 return NULL;
00068 }
00069 ods_log_assert(allocator);
00070
00071 worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
00072 if (!worker) {
00073 return NULL;
00074 }
00075
00076 ods_log_debug("create worker[%i]", num +1);
00077 lock_basic_init(&worker->worker_lock);
00078 lock_basic_set(&worker->worker_alarm);
00079 lock_basic_lock(&worker->worker_lock);
00080 worker->allocator = allocator;
00081 worker->thread_num = num +1;
00082 worker->engine = NULL;
00083 worker->task = NULL;
00084 worker->working_with = TASK_NONE;
00085 worker->need_to_exit = 0;
00086 worker->type = type;
00087 worker->clock_in = 0;
00088 worker->jobs_appointed = 0;
00089 worker->jobs_completed = 0;
00090 worker->jobs_failed = 0;
00091 worker->sleeping = 0;
00092 worker->waiting = 0;
00093 lock_basic_unlock(&worker->worker_lock);
00094 return worker;
00095 }
00096
00097
00102 static const char*
00103 worker2str(worker_id type)
00104 {
00105 ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
00106 if (lt) {
00107 return lt->name;
00108 }
00109 return NULL;
00110 }
00111
00112
00117 static int
00118 worker_fulfilled(worker_type* worker)
00119 {
00120 return (worker->jobs_completed + worker->jobs_failed) ==
00121 worker->jobs_appointed;
00122 }
00123
00124
00129 static void
00130 worker_perform_task(worker_type* worker)
00131 {
00132 engine_type* engine = NULL;
00133 zone_type* zone = NULL;
00134 task_type* task = NULL;
00135 task_id what = TASK_NONE;
00136 time_t when = 0;
00137 time_t never = (3600*24*365);
00138 ods_status status = ODS_STATUS_OK;
00139 int fallthrough = 0;
00140 int backup = 0;
00141 char* working_dir = NULL;
00142 char* cfg_filename = NULL;
00143 uint32_t tmpserial = 0;
00144 time_t start = 0;
00145 time_t end = 0;
00146
00147
00148 if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
00149 return;
00150 }
00151 ods_log_assert(worker);
00152 ods_log_assert(worker->task);
00153 ods_log_assert(worker->task->zone);
00154
00155 engine = (engine_type*) worker->engine;
00156 task = (task_type*) worker->task;
00157 zone = (zone_type*) worker->task->zone;
00158 ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
00159 worker2str(worker->type), worker->thread_num, task_what2str(task->what),
00160 task_who2str(task->who), (uint32_t) worker->clock_in);
00161
00162
00163 switch (task->what) {
00164 case TASK_SIGNCONF:
00165 worker->working_with = TASK_SIGNCONF;
00166
00167 ods_log_verbose("[%s[%i]] load signconf for zone %s",
00168 worker2str(worker->type), worker->thread_num,
00169 task_who2str(task->who));
00170 status = zone_load_signconf(zone, &what);
00171 if (status == ODS_STATUS_UNCHANGED) {
00172 if (!zone->signconf->last_modified) {
00173 ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
00174 worker2str(worker->type), worker->thread_num,
00175 task_who2str(task->who));
00176 }
00177 status = ODS_STATUS_ERR;
00178 }
00179
00180
00181 when = time_now();
00182 if (status == ODS_STATUS_UNCHANGED) {
00183 if (task->halted != TASK_NONE) {
00184 goto task_perform_continue;
00185 } else {
00186 status = ODS_STATUS_OK;
00187 }
00188 }
00189
00190 if (status == ODS_STATUS_OK) {
00191 status = zone_publish_dnskeys(zone, 0);
00192 }
00193 if (status == ODS_STATUS_OK) {
00194 status = zone_prepare_nsec3(zone, 0);
00195 }
00196 if (status == ODS_STATUS_OK) {
00197 status = zonedata_commit(zone->zonedata);
00198 }
00199
00200 if (status == ODS_STATUS_OK) {
00201 zone->prepared = 1;
00202 task->interrupt = TASK_NONE;
00203 task->halted = TASK_NONE;
00204 } else {
00205 if (task->halted == TASK_NONE) {
00206 goto task_perform_fail;
00207 }
00208 goto task_perform_continue;
00209 }
00210 fallthrough = 0;
00211 break;
00212 case TASK_READ:
00213 worker->working_with = TASK_READ;
00214
00215 ods_log_verbose("[%s[%i]] read zone %s",
00216 worker2str(worker->type), worker->thread_num,
00217 task_who2str(task->who));
00218 if (!zone->prepared) {
00219 ods_log_debug("[%s[%i]] no valid signconf.xml for zone %s yet",
00220 worker2str(worker->type), worker->thread_num,
00221 task_who2str(task->who));
00222 status = ODS_STATUS_ERR;
00223 } else {
00224 status = tools_input(zone);
00225 }
00226
00227
00228 what = TASK_NSECIFY;
00229 when = time_now();
00230 if (status != ODS_STATUS_OK) {
00231 if (task->halted == TASK_NONE) {
00232 goto task_perform_fail;
00233 }
00234 goto task_perform_continue;
00235 }
00236 fallthrough = 1;
00237 case TASK_NSECIFY:
00238 worker->working_with = TASK_NSECIFY;
00239 ods_log_verbose("[%s[%i]] nsecify zone %s",
00240 worker2str(worker->type), worker->thread_num,
00241 task_who2str(task->who));
00242 status = tools_nsecify(zone);
00243
00244
00245 what = TASK_SIGN;
00246 when = time_now();
00247 if (status == ODS_STATUS_OK) {
00248 if (task->interrupt > TASK_SIGNCONF) {
00249 task->interrupt = TASK_NONE;
00250 task->halted = TASK_NONE;
00251 }
00252 } else {
00253 if (task->halted == TASK_NONE) {
00254 goto task_perform_fail;
00255 }
00256 goto task_perform_continue;
00257 }
00258 fallthrough = 1;
00259 case TASK_SIGN:
00260 worker->working_with = TASK_SIGN;
00261 ods_log_verbose("[%s[%i]] sign zone %s",
00262 worker2str(worker->type), worker->thread_num,
00263 task_who2str(task->who));
00264 tmpserial = zone->zonedata->internal_serial;
00265 status = zone_update_serial(zone);
00266 if (status != ODS_STATUS_OK) {
00267 ods_log_error("[%s[%i]] unable to sign zone %s: "
00268 "failed to increment serial",
00269 worker2str(worker->type), worker->thread_num,
00270 task_who2str(task->who));
00271 } else {
00272
00273 start = time(NULL);
00274 if (zone->stats) {
00275 lock_basic_lock(&zone->stats->stats_lock);
00276 if (!zone->stats->start_time) {
00277 zone->stats->start_time = start;
00278 }
00279 zone->stats->sig_count = 0;
00280 zone->stats->sig_soa_count = 0;
00281 zone->stats->sig_reuse = 0;
00282 zone->stats->sig_time = 0;
00283 lock_basic_unlock(&zone->stats->stats_lock);
00284 }
00285
00286
00287 status = zonedata_queue(zone->zonedata, engine->signq, worker);
00288 ods_log_debug("[%s[%i]] wait until drudgers are finished "
00289 " signing zone %s, %u signatures queued",
00290 worker2str(worker->type), worker->thread_num,
00291 task_who2str(task->who), worker->jobs_appointed);
00292
00293
00294 if (!worker->need_to_exit) {
00295 worker_sleep_unless(worker, 0);
00296 }
00297 if (worker->jobs_failed) {
00298 ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
00299 "signatures failed", worker2str(worker->type),
00300 worker->thread_num, task_who2str(task->who),
00301 worker->jobs_failed, worker->jobs_appointed);
00302 status = ODS_STATUS_ERR;
00303 } else if (!worker_fulfilled(worker)) {
00304 ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
00305 "signatures completed", worker2str(worker->type),
00306 worker->thread_num, task_who2str(task->who),
00307 worker->jobs_completed, worker->jobs_appointed);
00308 status = ODS_STATUS_ERR;
00309 } else {
00310 ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u "
00311 "signatures succeeded", worker2str(worker->type),
00312 worker->thread_num, task_who2str(task->who),
00313 worker->jobs_completed, worker->jobs_appointed);
00314 ods_log_assert(worker->jobs_appointed ==
00315 worker->jobs_completed);
00316 }
00317 worker->jobs_appointed = 0;
00318 worker->jobs_completed = 0;
00319 worker->jobs_failed = 0;
00320
00321
00322 end = time(NULL);
00323 if (status == ODS_STATUS_OK && zone->stats) {
00324 lock_basic_lock(&zone->stats->stats_lock);
00325 zone->stats->sig_time = (end-start);
00326 lock_basic_unlock(&zone->stats->stats_lock);
00327 }
00328 }
00329
00330
00331 if (status != ODS_STATUS_OK) {
00332
00333 zone->zonedata->internal_serial = tmpserial;
00334 if (task->halted == TASK_NONE) {
00335 goto task_perform_fail;
00336 }
00337 goto task_perform_continue;
00338 } else {
00339 if (task->interrupt > TASK_SIGNCONF) {
00340 task->interrupt = TASK_NONE;
00341 task->halted = TASK_NONE;
00342 }
00343 }
00344 what = TASK_AUDIT;
00345 when = time_now();
00346 fallthrough = 1;
00347 case TASK_AUDIT:
00348 worker->working_with = TASK_AUDIT;
00349 if (zone->signconf->audit) {
00350 ods_log_verbose("[%s[%i]] audit zone %s",
00351 worker2str(worker->type), worker->thread_num,
00352 task_who2str(task->who));
00353 working_dir = strdup(engine->config->working_dir);
00354 cfg_filename = strdup(engine->config->cfg_filename);
00355 status = tools_audit(zone, working_dir, cfg_filename);
00356 if (working_dir) { free((void*)working_dir); }
00357 if (cfg_filename) { free((void*)cfg_filename); }
00358 working_dir = NULL;
00359 cfg_filename = NULL;
00360 } else {
00361 status = ODS_STATUS_OK;
00362 }
00363
00364
00365 if (status != ODS_STATUS_OK) {
00366 if (task->halted == TASK_NONE) {
00367 goto task_perform_fail;
00368 }
00369 goto task_perform_continue;
00370 }
00371 what = TASK_WRITE;
00372 when = time_now();
00373 fallthrough = 1;
00374 case TASK_WRITE:
00375 worker->working_with = TASK_WRITE;
00376 ods_log_verbose("[%s[%i]] write zone %s",
00377 worker2str(worker->type), worker->thread_num,
00378 task_who2str(task->who));
00379
00380 status = tools_output(zone);
00381 zone->processed = 1;
00382
00383
00384 if (status != ODS_STATUS_OK) {
00385 if (task->halted == TASK_NONE) {
00386 goto task_perform_fail;
00387 }
00388 goto task_perform_continue;
00389 } else {
00390 if (task->interrupt > TASK_SIGNCONF) {
00391 task->interrupt = TASK_NONE;
00392 task->halted = TASK_NONE;
00393 }
00394 }
00395 if (duration2time(zone->signconf->sig_resign_interval)) {
00396 what = TASK_SIGN;
00397 when = time_now() +
00398 duration2time(zone->signconf->sig_resign_interval);
00399 } else {
00400 what = TASK_NONE;
00401 when = time_now() + never;
00402 }
00403 backup = 1;
00404 fallthrough = 0;
00405 break;
00406 case TASK_NONE:
00407 worker->working_with = TASK_NONE;
00408 ods_log_warning("[%s[%i]] none task for zone %s",
00409 worker2str(worker->type), worker->thread_num,
00410 task_who2str(task->who));
00411 when = time_now() + never;
00412 fallthrough = 0;
00413 break;
00414 default:
00415 ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
00416 worker2str(worker->type), worker->thread_num,
00417 task_who2str(task->who));
00418 what = TASK_SIGNCONF;
00419 when = time_now();
00420 fallthrough = 0;
00421 break;
00422 }
00423
00424
00425 task->backoff = 0;
00426
00427
00428 if (fallthrough == 0 && task->interrupt != TASK_NONE &&
00429 task->interrupt != what) {
00430 ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
00431 worker2str(worker->type), worker->thread_num,
00432 task_what2str(what), task_who2str(task->who));
00433
00434 task->what = task->interrupt;
00435 task->when = time_now();
00436 task->halted = what;
00437 } else {
00438 ods_log_debug("[%s[%i]] next task %s for zone %s",
00439 worker2str(worker->type), worker->thread_num,
00440 task_what2str(what), task_who2str(task->who));
00441
00442 task->what = what;
00443 task->when = when;
00444 if (!fallthrough) {
00445 task->interrupt = TASK_NONE;
00446 task->halted = TASK_NONE;
00447 }
00448 }
00449
00450
00451 if (backup) {
00452 status = zone_backup(zone);
00453 if (status != ODS_STATUS_OK) {
00454 ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
00455 worker2str(worker->type), worker->thread_num,
00456 task_who2str(task->who), ods_status2str(status));
00457
00458 status = ODS_STATUS_OK;
00459 }
00460 backup = 0;
00461 }
00462 return;
00463
00464 task_perform_fail:
00465
00466 zone->processed = 1;
00467
00468 if (task->backoff) {
00469 task->backoff *= 2;
00470 if (task->backoff > ODS_SE_MAX_BACKOFF) {
00471 task->backoff = ODS_SE_MAX_BACKOFF;
00472 }
00473 } else {
00474 task->backoff = 60;
00475 }
00476 ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
00477 worker2str(worker->type), worker->thread_num,
00478 task_what2str(task->what), task_who2str(task->who), task->backoff);
00479
00480 task->when = time_now() + task->backoff;
00481 return;
00482
00483 task_perform_continue:
00484 ods_log_info("[%s[%i]] continue task %s for zone %s",
00485 worker2str(worker->type), worker->thread_num,
00486 task_what2str(task->halted), task_who2str(task->who));
00487
00488 what = task->halted;
00489 task->what = what;
00490 task->when = time_now();
00491 task->interrupt = TASK_NONE;
00492 task->halted = TASK_NONE;
00493 if (zone->processed) {
00494 task->when += duration2time(zone->signconf->sig_resign_interval);
00495 }
00496 return;
00497 }
00498
00499
00504 static void
00505 worker_work(worker_type* worker)
00506 {
00507 time_t now, timeout = 1;
00508 zone_type* zone = NULL;
00509 ods_status status = ODS_STATUS_OK;
00510
00511 ods_log_assert(worker);
00512 ods_log_assert(worker->type == WORKER_WORKER);
00513
00514 while (worker->need_to_exit == 0) {
00515 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00516 worker->thread_num);
00517 lock_basic_lock(&worker->engine->taskq->schedule_lock);
00518
00519 worker->task = schedule_pop_task(worker->engine->taskq);
00520
00521 if (worker->task) {
00522 worker->working_with = worker->task->what;
00523 lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00524
00525 zone = worker->task->zone;
00526 lock_basic_lock(&zone->zone_lock);
00527
00528 ods_log_debug("[%s[%i]] start working on zone %s",
00529 worker2str(worker->type), worker->thread_num, zone->name);
00530
00531 worker->clock_in = time(NULL);
00532 worker_perform_task(worker);
00533
00534 zone->task = worker->task;
00535
00536 ods_log_debug("[%s[%i]] finished working on zone %s",
00537 worker2str(worker->type), worker->thread_num, zone->name);
00538
00539
00540 lock_basic_lock(&worker->engine->taskq->schedule_lock);
00541
00542 worker->task = NULL;
00543 worker->working_with = TASK_NONE;
00544 status = schedule_task(worker->engine->taskq, zone->task, 1);
00545
00546 lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00547 lock_basic_unlock(&zone->zone_lock);
00548
00549 timeout = 1;
00550 } else {
00551 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00552 worker->thread_num);
00553
00554
00555 worker->task = schedule_get_first_task(worker->engine->taskq);
00556
00557 lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00558
00559 now = time_now();
00560 if (worker->task && !worker->engine->taskq->loading) {
00561 timeout = (worker->task->when - now);
00562 } else {
00563 timeout *= 2;
00564 if (timeout > ODS_SE_MAX_BACKOFF) {
00565 timeout = ODS_SE_MAX_BACKOFF;
00566 }
00567 }
00568 worker->task = NULL;
00569 worker_sleep(worker, timeout);
00570 }
00571 }
00572 return;
00573 }
00574
00575
00580 static void
00581 worker_drudge(worker_type* worker)
00582 {
00583 zone_type* zone = NULL;
00584 task_type* task = NULL;
00585 rrset_type* rrset = NULL;
00586 ods_status status = ODS_STATUS_OK;
00587 worker_type* chief = NULL;
00588 hsm_ctx_t* ctx = NULL;
00589
00590 ods_log_assert(worker);
00591 ods_log_assert(worker->type == WORKER_DRUDGER);
00592
00593 ctx = hsm_create_context();
00594 if (ctx == NULL) {
00595 ods_log_error("[%s[%i]] unable to drudge: error "
00596 "creating libhsm context", worker2str(worker->type),
00597 worker->thread_num);
00598 }
00599
00600 while (worker->need_to_exit == 0) {
00601 ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00602 worker->thread_num);
00603 chief = NULL;
00604 zone = NULL;
00605 task = NULL;
00606
00607 lock_basic_lock(&worker->engine->signq->q_lock);
00608
00609 rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief);
00610
00611 lock_basic_unlock(&worker->engine->signq->q_lock);
00612 if (rrset) {
00613
00614 if (chief) {
00615 task = chief->task;
00616 }
00617 if (task) {
00618 zone = task->zone;
00619 }
00620 if (!zone) {
00621 ods_log_error("[%s[%i]] unable to drudge: no zone reference",
00622 worker2str(worker->type), worker->thread_num);
00623 }
00624 if (zone && ctx) {
00625 ods_log_assert(rrset);
00626 ods_log_assert(zone);
00627 ods_log_assert(zone->dname);
00628 ods_log_assert(zone->signconf);
00629 ods_log_assert(ctx);
00630
00631 worker->clock_in = time(NULL);
00632 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf,
00633 chief->clock_in, zone->stats);
00634 } else {
00635 status = ODS_STATUS_ASSERT_ERR;
00636 }
00637
00638 if (chief) {
00639 lock_basic_lock(&chief->worker_lock);
00640 if (status == ODS_STATUS_OK) {
00641 chief->jobs_completed += 1;
00642 } else {
00643 chief->jobs_failed += 1;
00644
00645 }
00646 lock_basic_unlock(&chief->worker_lock);
00647
00648 if (worker_fulfilled(chief) && chief->sleeping) {
00649 ods_log_debug("[%s[%i]] wake up chief[%u], work is done",
00650 worker2str(worker->type), worker->thread_num,
00651 chief->thread_num);
00652 worker_wakeup(chief);
00653 chief = NULL;
00654 }
00655 }
00656 rrset = NULL;
00657 } else {
00658 ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00659 worker->thread_num);
00660 worker_wait(&worker->engine->signq->q_lock,
00661 &worker->engine->signq->q_threshold);
00662 }
00663 }
00664
00665 if (chief && chief->sleeping) {
00666 ods_log_debug("[%s[%i]] wake up chief[%u], i am exiting",
00667 worker2str(worker->type), worker->thread_num, chief->thread_num);
00668 worker_wakeup(chief);
00669 }
00670
00671
00672 hsm_destroy_context(ctx);
00673 ctx = NULL;
00674 return;
00675 }
00676
00677
00682 void
00683 worker_start(worker_type* worker)
00684 {
00685 ods_log_assert(worker);
00686 switch (worker->type) {
00687 case WORKER_DRUDGER:
00688 worker_drudge(worker);
00689 break;
00690 case WORKER_WORKER:
00691 worker_work(worker);
00692 break;
00693 default:
00694 ods_log_error("[worker] illegal worker (id=%i)", worker->type);
00695 return;
00696 }
00697 return;
00698 }
00699
00700
00705 void
00706 worker_sleep(worker_type* worker, time_t timeout)
00707 {
00708 ods_log_assert(worker);
00709 lock_basic_lock(&worker->worker_lock);
00710
00711 worker->sleeping = 1;
00712 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00713 timeout);
00714
00715 lock_basic_unlock(&worker->worker_lock);
00716 return;
00717 }
00718
00719
00724 void
00725 worker_sleep_unless(worker_type* worker, time_t timeout)
00726 {
00727 ods_log_assert(worker);
00728 lock_basic_lock(&worker->worker_lock);
00729
00730 while (!worker->need_to_exit && !worker_fulfilled(worker)) {
00731 worker->sleeping = 1;
00732 lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00733 timeout);
00734
00735 ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
00736 "appointed, %u completed, %u failed", worker2str(worker->type),
00737 worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
00738 worker->jobs_failed);
00739 }
00740
00741 lock_basic_unlock(&worker->worker_lock);
00742 return;
00743 }
00744
00745
00750 void
00751 worker_wakeup(worker_type* worker)
00752 {
00753 ods_log_assert(worker);
00754 if (worker && worker->sleeping && !worker->waiting) {
00755 ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
00756 worker->thread_num);
00757 lock_basic_lock(&worker->worker_lock);
00758
00759 lock_basic_alarm(&worker->worker_alarm);
00760 worker->sleeping = 0;
00761
00762 lock_basic_unlock(&worker->worker_lock);
00763 }
00764 return;
00765 }
00766
00767
00772 void
00773 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
00774 {
00775 lock_basic_lock(lock);
00776
00777 lock_basic_sleep(condition, lock, 0);
00778
00779 lock_basic_unlock(lock);
00780 return;
00781 }
00782
00783
00788 void
00789 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
00790 {
00791 lock_basic_lock(lock);
00792
00793 lock_basic_alarm(condition);
00794
00795 lock_basic_unlock(lock);
00796 return;
00797 }
00798
00799
00804 void
00805 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
00806 {
00807 lock_basic_lock(lock);
00808
00809 lock_basic_broadcast(condition);
00810
00811 lock_basic_unlock(lock);
00812 return;
00813 }
00814
00815
00820 void
00821 worker_cleanup(worker_type* worker)
00822 {
00823 allocator_type* allocator;
00824 cond_basic_type worker_cond;
00825 lock_basic_type worker_lock;
00826
00827 if (!worker) {
00828 return;
00829 }
00830 allocator = worker->allocator;
00831 worker_cond = worker->worker_alarm;
00832 worker_lock = worker->worker_lock;
00833
00834 allocator_deallocate(allocator, (void*) worker);
00835 lock_basic_destroy(&worker_lock);
00836 lock_basic_off(&worker_cond);
00837 return;
00838 }