source: subversion/applications/utils/mod_tile/daemon.c @ 28629

Last change on this file since 28629 was 27457, checked in by apmon, 8 years ago

use autoconf / automake to find libraries and make it more portable

File size: 39.2 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <stdint.h>
4#include <unistd.h>
5#include <sys/types.h>
6#include <sys/socket.h>
7#include <netinet/in.h>
8#include <netdb.h>
9#include <sys/select.h>
10#include <sys/stat.h>
11#include <sys/un.h>
12#include <poll.h>
13#include <errno.h>
14#include <pthread.h>
15#include <signal.h>
16#include <string.h>
17#include <syslog.h>
18#include <getopt.h>
19
20#include "render_config.h"
21#include "daemon.h"
22#include "gen_tile.h"
23#include "protocol.h"
24#include "dir_utils.h"
25
26#define PIDFILE "/var/run/renderd/renderd.pid"
27
28// extern "C" {
29#include "iniparser3.0b/src/iniparser.h"
30// }
31
32static pthread_t *render_threads;
33static pthread_t *slave_threads;
34static struct sigaction sigPipeAction;
35
36static struct item reqHead, reqPrioHead, reqBulkHead, dirtyHead, renderHead;
37static struct item_idx * item_hashidx;
38static int reqNum, reqPrioNum, reqBulkNum, dirtyNum;
39static pthread_mutex_t qLock;
40static pthread_cond_t qCond;
41static int exit_pipe_fd;
42
43static stats_struct stats;
44static pthread_t stats_thread;
45
46static renderd_config config;
47
48int noSlaveRenders;
49int hashidxSize;
50
51void statsRenderFinish(int z, long time) {
52    pthread_mutex_lock(&qLock);
53    if ((z >= 0) && (z <= MAX_ZOOM)) {
54        stats.noZoomRender[z]++;
55        stats.timeZoomRender[z] += time;
56    }
57    pthread_mutex_unlock(&qLock);
58}
59
60struct item *fetch_request(void)
61{
62    struct item *item = NULL;
63
64    pthread_mutex_lock(&qLock);
65
66    while ((reqNum == 0) && (dirtyNum == 0) && (reqPrioNum == 0) && (reqBulkNum == 0)) {
67        pthread_cond_wait(&qCond, &qLock);
68    }
69    if (reqPrioNum) {
70        item = reqPrioHead.next;
71        reqPrioNum--;
72        stats.noReqPrioRender++;
73    } else if (reqNum) {
74        item = reqHead.next;
75        reqNum--;
76        stats.noReqRender++;
77    } else if (dirtyNum) {
78        item = dirtyHead.next;
79        dirtyNum--;
80        stats.noDirtyRender++;
81    } else if (reqBulkNum) {
82        item = reqBulkHead.next;
83        reqBulkNum--;
84        stats.noReqBulkRender++;
85    }
86    if (item) {
87        item->next->prev = item->prev;
88        item->prev->next = item->next;
89
90        item->prev = &renderHead;
91        item->next = renderHead.next;
92        renderHead.next->prev = item;
93        renderHead.next = item;
94        item->inQueue = queueRender;
95    }
96
97
98    pthread_mutex_unlock(&qLock);
99
100    return item;
101}
102
103void clear_requests(int fd)
104{
105    struct item *item, *dupes, *queueHead;
106
107    /**Only need to look up on the shorter request and render queue
108      * so using the linear list shouldn't be a problem
109      */
110    pthread_mutex_lock(&qLock);
111    for (int i = 0; i < 4; i++) {
112        switch (i) {
113        case 0: { queueHead = &reqHead; break;}
114        case 1: { queueHead = &renderHead; break;}
115        case 2: { queueHead = &reqPrioHead; break;}
116        case 3: { queueHead = &reqBulkHead; break;}
117        }
118
119        item = queueHead->next;
120        while (item != queueHead) {
121            if (item->fd == fd)
122                item->fd = FD_INVALID;
123
124            dupes = item->duplicates;
125            while (dupes) {
126                if (dupes->fd == fd)
127                    dupes->fd = FD_INVALID;
128                dupes = dupes->duplicates;
129            }
130            item = item->next;
131        }
132    }
133
134    pthread_mutex_unlock(&qLock);
135}
136
137
138static int calcHashKey(struct item *item) {
139    uint64_t xmlnameHash = 0;
140    uint64_t key;
141    for (int i = 0; item->req.xmlname[i] != 0; i++) {
142        xmlnameHash += item->req.xmlname[i];
143    }
144    key = ((uint64_t)(xmlnameHash & 0x1FF) << 52) + ((uint64_t)(item->req.z) << 48) + ((uint64_t)(item->mx & 0xFFFFFF) << 24) + (item->my & 0xFFFFFF);
145    return key % hashidxSize;
146}
147
148void insert_item_idx(struct item *item) {
149    struct item_idx * nextItem;
150    struct item_idx * prevItem;
151
152    int key = calcHashKey(item);
153
154    if (item_hashidx[key].item == NULL) {
155        item_hashidx[key].item = item;
156    } else {
157        prevItem = &(item_hashidx[key]);
158        nextItem = item_hashidx[key].next;
159        while(nextItem) {
160            prevItem = nextItem;
161            nextItem = nextItem->next;
162        }
163        nextItem = (struct item_idx *)malloc(sizeof(struct item_idx));
164        nextItem->item = item;
165        nextItem->next = NULL;
166        prevItem->next = nextItem;
167    }
168}
169
170void remove_item_idx(struct item * item) {
171    int key = calcHashKey(item);
172    struct item_idx * nextItem;
173    struct item_idx * prevItem;
174    struct item * test;
175    if (item_hashidx[key].item == NULL) {
176        //item not in index;
177        return;
178    }
179    prevItem = &(item_hashidx[key]);
180    nextItem = &(item_hashidx[key]);
181
182    while (nextItem != NULL) {
183        test = nextItem->item;
184        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z
185                == test->req.z) && (!strcmp(item->req.xmlname,
186                test->req.xmlname))) {
187            /*
188             * Found item, removing it from list
189             */
190            nextItem->item = NULL;
191            if (nextItem->next != NULL) {
192                if (nextItem == &(item_hashidx[key])) {
193                    prevItem = nextItem->next;
194                    memcpy(&(item_hashidx[key]), nextItem->next,
195                            sizeof(struct item_idx));
196                    free(prevItem);
197                } else {
198                    prevItem->next = nextItem->next;
199                }
200            } else {
201                prevItem->next = NULL;
202            }
203
204            if (nextItem != &(item_hashidx[key])) {
205                free(nextItem);
206            }
207            return;
208        } else {
209            prevItem = nextItem;
210            nextItem = nextItem->next;
211        }
212    }
213}
214
215struct item * lookup_item_idx(struct item * item) {
216    struct item_idx * nextItem;
217    struct item * test;
218
219    int key = calcHashKey(item);
220
221    if (item_hashidx[key].item == NULL) {
222        return NULL;
223    } else {
224        nextItem = &(item_hashidx[key]);
225        while (nextItem != NULL) {
226            test = nextItem->item;
227            if ((item->mx == test->mx) && (item->my == test->my)
228                    && (item->req.z == test->req.z) && (!strcmp(
229                    item->req.xmlname, test->req.xmlname))) {
230                return test;
231            } else {
232                nextItem = nextItem->next;
233            }
234        }
235    }
236    return NULL;
237}
238
239static inline const char *cmdStr(enum protoCmd c)
240{
241    switch (c) {
242        case cmdIgnore:  return "Ignore";
243        case cmdRender:  return "Render";
244        case cmdRenderPrio:  return "RenderPrio";
245        case cmdRenderBulk:  return "RenderBulk";
246        case cmdDirty:   return "Dirty";
247        case cmdDone:    return "Done";
248        case cmdNotDone: return "NotDone";
249        default:         return "unknown";
250    }
251}
252
253void send_response(struct item *item, enum protoCmd rsp)
254{
255    struct protocol *req = &item->req;
256    int ret;
257
258    pthread_mutex_lock(&qLock);
259    item->next->prev = item->prev;
260    item->prev->next = item->next;
261    remove_item_idx(item);
262    pthread_mutex_unlock(&qLock);
263
264    while (item) {
265        struct item *prev = item;
266        req = &item->req;
267        if ((item->fd != FD_INVALID) && ((req->cmd == cmdRender) || (req->cmd == cmdRenderPrio) || (req->cmd == cmdRenderBulk))) {
268            req->cmd = rsp;
269            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
270            ret = send(item->fd, req, sizeof(*req), 0);
271            if (ret != sizeof(*req))
272                perror("send error during send_done");
273        }
274        item = item->duplicates;
275        free(prev);
276    }
277}
278
279
280enum protoCmd pending(struct item *test)
281{
282    // check all queues and render list to see if this request already queued
283    // If so, add this new request as a duplicate
284    // call with qLock held
285    struct item *item;
286
287    item = lookup_item_idx(test);
288    if (item != NULL) {
289        if ((item->inQueue == queueRender) || (item->inQueue == queueRequest) || (item->inQueue == queueRequestPrio)) {
290            test->duplicates = item->duplicates;
291            item->duplicates = test;
292            test->inQueue = queueDuplicate;
293            return cmdIgnore;
294        } else if ((item->inQueue == queueDirty) || (item->inQueue == queueRequestBulk)){
295            return cmdNotDone;
296        }
297    }
298
299    return cmdRender;
300}
301
302
303enum protoCmd rx_request(const struct protocol *req, int fd)
304{
305    struct protocol *reqnew;
306    struct item *list = NULL, *item;
307    enum protoCmd pend;
308
309    // Upgrade version 1 to version 2
310    if (req->ver == 1) {
311        reqnew = (struct protocol *)malloc(sizeof(struct protocol));
312        memcpy(reqnew, req, sizeof(struct protocol_v1));
313        reqnew->xmlname[0] = 0;
314        req = reqnew;
315    }
316    else if (req->ver != 2) {
317        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
318        return cmdIgnore;
319    }
320
321    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
322            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
323
324    if ((req->cmd != cmdRender) && (req->cmd != cmdRenderPrio) && (req->cmd != cmdDirty) && (req->cmd != cmdRenderBulk))
325        return cmdIgnore;
326
327    if (check_xyz(req->x, req->y, req->z))
328        return cmdNotDone;
329
330    item = (struct item *)malloc(sizeof(*item));
331    if (!item) {
332            syslog(LOG_ERR, "malloc failed");
333            return cmdNotDone;
334    }
335
336    item->req = *req;
337    item->duplicates = NULL;
338    item->fd = (req->cmd == cmdDirty) ? FD_INVALID : fd;
339
340#ifdef METATILE
341    /* Round down request co-ordinates to the neareast N (should be a power of 2)
342     * Note: request path is no longer consistent but this will be recalculated
343     * when the metatile is being rendered.
344     */
345    item->mx = item->req.x & ~(METATILE-1);
346    item->my = item->req.y & ~(METATILE-1);
347#else
348    item->mx = item->req.x;
349    item->my = item->req.y;
350#endif
351
352    pthread_mutex_lock(&qLock);
353
354    // Check for a matching request in the current rendering or dirty queues
355    pend = pending(item);
356    if (pend == cmdNotDone) {
357        // We found a match in the dirty queue, can not wait for it
358        pthread_mutex_unlock(&qLock);
359        free(item);
360        return cmdNotDone;
361    }
362    if (pend == cmdIgnore) {
363        // Found a match in render queue, item added as duplicate
364        pthread_mutex_unlock(&qLock);
365        return cmdIgnore;
366    }
367
368    // New request, add it to render or dirty queue
369    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
370        list = &reqHead;
371        item->inQueue = queueRequest;
372        reqNum++;
373    } else if ((req->cmd == cmdRenderPrio) && (reqPrioNum < REQ_LIMIT)) {
374        list = &reqPrioHead;
375        item->inQueue = queueRequestPrio;
376        reqPrioNum++;
377    } else if ((req->cmd == cmdRenderBulk) && (reqBulkNum < REQ_LIMIT)) {
378        list = &reqBulkHead;
379        item->inQueue = queueRequestBulk;
380        reqBulkNum++;
381    } else if (dirtyNum < DIRTY_LIMIT) {
382        list = &dirtyHead;
383        item->inQueue = queueDirty;
384        dirtyNum++;
385        item->fd = FD_INVALID; // No response after render
386    } else {
387        // The queue is severely backlogged. Drop request
388        stats.noReqDroped++;
389        pthread_mutex_unlock(&qLock);
390        free(item);
391        return cmdNotDone;
392    }
393
394    if (list) {
395        item->next = list;
396        item->prev = list->prev;
397        item->prev->next = item;
398        list->prev = item;
399        /* In addition to the linked list, add item to a hash table index
400         * for faster lookup of pending requests.
401         */
402        insert_item_idx(item);
403
404        pthread_cond_signal(&qCond);
405    } else
406        free(item);
407
408    pthread_mutex_unlock(&qLock);
409
410    return (list == &reqHead)?cmdIgnore:cmdNotDone;
411}
412
413void request_exit(void)
414{
415  // Any write to the exit pipe will trigger a graceful exit
416  char c=0;
417  write(exit_pipe_fd, &c, sizeof(c));
418}
419
420void process_loop(int listen_fd)
421{
422    int num_connections = 0;
423    int connections[MAX_CONNECTIONS];
424    int pipefds[2];
425    int exit_pipe_read;
426
427    bzero(connections, sizeof(connections));
428
429    // A pipe is used to allow the render threads to request an exit by the main process
430    if (pipe(pipefds)) {
431      fprintf(stderr, "Failed to create pipe\n");
432      return;
433    }
434    exit_pipe_fd = pipefds[1];
435    exit_pipe_read = pipefds[0];
436
437    while (1) {
438        struct sockaddr_un in_addr;
439        socklen_t in_addrlen = sizeof(in_addr);
440        fd_set rd;
441        int incoming, num, nfds, i;
442
443        FD_ZERO(&rd);
444        FD_SET(listen_fd, &rd);
445        nfds = listen_fd+1;
446
447        for (i=0; i<num_connections; i++) {
448            FD_SET(connections[i], &rd);
449            nfds = MAX(nfds, connections[i]+1);
450        }
451
452        FD_SET(exit_pipe_read, &rd);
453        nfds = MAX(nfds, exit_pipe_read+1);
454
455        num = select(nfds, &rd, NULL, NULL, NULL);
456        if (num == -1)
457            perror("select()");
458        else if (num) {
459            if (FD_ISSET(exit_pipe_read, &rd)) {
460              // A render thread wants us to exit
461              break;
462            }
463
464            //printf("Data is available now on %d fds\n", num);
465            if (FD_ISSET(listen_fd, &rd)) {
466                num--;
467                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
468                if (incoming < 0) {
469                    perror("accept()");
470                } else {
471                    if (num_connections == MAX_CONNECTIONS) {
472                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
473                        close(incoming);
474                    } else {
475                        connections[num_connections++] = incoming;
476                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
477                    }
478                }
479            }
480            for (i=0; num && (i<num_connections); i++) {
481                int fd = connections[i];
482                if (FD_ISSET(fd, &rd)) {
483                    struct protocol cmd;
484                    int ret;
485
486                    // TODO: to get highest performance we should loop here until we get EAGAIN
487                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
488                    if (ret == sizeof(cmd)) {
489                        enum protoCmd rsp = rx_request(&cmd, fd);
490
491                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
492                            cmd.cmd = rsp;
493                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
494                            ret = send(fd, &cmd, sizeof(cmd), 0);
495                            if (ret != sizeof(cmd))
496                                perror("response send error");
497                        }
498                    } else if (!ret) {
499                        int j;
500
501                        num_connections--;
502                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
503                        for (j=i; j < num_connections; j++)
504                            connections[j] = connections[j+1];
505                        clear_requests(fd);
506                        close(fd);
507                    } else {
508                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
509                        break;
510                    }
511                }
512            }
513        } else {
514            syslog(LOG_ERR, "Select timeout");
515        }
516    }
517}
518
519/**
520 * Periodically write out current stats to a stats file. This information
521 * can then be used to monitor performance of renderd e.g. with a munin plugin
522 */
523void *stats_writeout_thread(void * arg) {
524    stats_struct lStats;
525    int dirtQueueLength;
526    int reqQueueLength;
527    int reqPrioQueueLength;
528    int reqBulkQueueLength;
529        int i;
530
531    int noFailedAttempts = 0;
532    char tmpName[PATH_MAX];
533
534    snprintf(tmpName, sizeof(tmpName), "%s.tmp", config.stats_filename);
535
536    syslog(LOG_DEBUG, "Starting stats thread");
537    while (1) {
538        pthread_mutex_lock(&qLock);
539        memcpy(&lStats, &stats, sizeof(stats_struct));
540        dirtQueueLength = dirtyNum;
541        reqQueueLength = reqNum;
542        reqPrioQueueLength = reqPrioNum;
543        reqBulkQueueLength = reqBulkNum;
544        pthread_mutex_unlock(&qLock);
545
546        FILE * statfile = fopen(tmpName, "w");
547        if (statfile == NULL) {
548            syslog(LOG_WARNING, "Failed to open stats file: %i", errno);
549            noFailedAttempts++;
550            if (noFailedAttempts > 3) {
551                syslog(LOG_ERR, "ERROR: Failed repeatedly to write stats, giving up");
552                break;
553            }
554            continue;
555        } else {
556            noFailedAttempts = 0;
557            fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
558            fprintf(statfile, "ReqPrioQueueLength: %i\n", reqPrioQueueLength);
559            fprintf(statfile, "ReqBulkQueueLength: %i\n", reqBulkQueueLength);
560            fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
561            fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
562            fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
563            fprintf(statfile, "ReqPrioRendered: %li\n", lStats.noReqPrioRender);
564            fprintf(statfile, "ReqBulkRendered: %li\n", lStats.noReqBulkRender);
565            fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
566            for (i = 0; i <= MAX_ZOOM; i++) {
567                fprintf(statfile,"ZoomRendered%02i: %li\n", i, lStats.noZoomRender[i]);
568            }
569            for (i = 0; i <= MAX_ZOOM; i++) {
570                fprintf(statfile,"TimeRenderedZoom%02i: %li\n", i, lStats.timeZoomRender[i]);
571            }
572            fclose(statfile);
573            if (rename(tmpName, config.stats_filename)) {
574                syslog(LOG_WARNING, "Failed to overwrite stats file: %i", errno);
575                noFailedAttempts++;
576                if (noFailedAttempts > 3) {
577                    syslog(LOG_ERR, "ERROR: Failed repeatedly to overwrite stats, giving up");
578                    break;
579                }
580                continue;
581            }
582        }
583        sleep(10);
584    }
585    return NULL;
586}
587
588int client_socket_init(renderd_config * sConfig) {
589    int fd;
590    struct sockaddr_un * addrU;
591    struct sockaddr_in * addrI;
592    struct hostent *server;
593    if (sConfig->ipport > 0) {
594        syslog(LOG_INFO, "Initialising TCP/IP client socket to %s:%i",
595                sConfig->iphostname, sConfig->ipport);
596        addrI = (struct sockaddr_in *)malloc(sizeof(struct sockaddr_in));
597        fd = socket(PF_INET, SOCK_STREAM, 0);
598        server = gethostbyname(sConfig->iphostname);
599        if (server == NULL) {
600            syslog(LOG_WARNING, "Could not resolve hostname: %s",
601                    sConfig->iphostname);
602            return FD_INVALID;
603        }
604        bzero((char *) addrI, sizeof(struct sockaddr_in));
605        addrI->sin_family = AF_INET;
606        bcopy((char *) server->h_addr, (char *) &addrI->sin_addr.s_addr,
607                server->h_length);
608        addrI->sin_port = htons(sConfig->ipport);
609        if (connect(fd, (struct sockaddr *) addrI, sizeof(struct sockaddr_in)) < 0) {
610            syslog(LOG_WARNING, "Could not connect to %s:%i",
611                    sConfig->iphostname, sConfig->ipport);
612            return FD_INVALID;
613        }
614        free(addrI);
615        syslog(LOG_INFO, "socket %s:%i initialised to fd %i", sConfig->iphostname, sConfig->ipport,
616                fd);
617    } else {
618        syslog(LOG_INFO, "Initialising unix client socket on %s",
619                sConfig->socketname);
620        addrU = (struct sockaddr_un *)malloc(sizeof(struct sockaddr_un));
621        fd = socket(PF_UNIX, SOCK_STREAM, 0);
622        if (fd < 0) {
623            syslog(LOG_WARNING, "Could not obtain socket: %i", fd);
624            return FD_INVALID;
625        }
626
627        bzero(addrU, sizeof(struct sockaddr_un));
628        addrU->sun_family = AF_UNIX;
629        strncpy(addrU->sun_path, sConfig->socketname, sizeof(addrU->sun_path));
630
631        if (connect(fd, (struct sockaddr *) addrU, sizeof(struct sockaddr_un)) < 0) {
632            syslog(LOG_WARNING, "socket connect failed for: %s",
633                    sConfig->socketname);
634            close(fd);
635            return FD_INVALID;
636        }
637        free(addrU);
638        syslog(LOG_INFO, "socket %s initialised to fd %i", sConfig->socketname,
639                fd);
640    }
641    return fd;
642}
643
644int server_socket_init(renderd_config *sConfig) {
645    struct sockaddr_un addrU;
646    struct sockaddr_in addrI;
647    mode_t old;
648    int fd;
649
650    if (sConfig->ipport > 0) {
651        syslog(LOG_INFO, "Initialising TCP/IP server socket on %s:%i",
652                sConfig->iphostname, sConfig->ipport);
653        fd = socket(PF_INET, SOCK_STREAM, 0);
654        if (fd < 0) {
655            fprintf(stderr, "failed to create IP socket\n");
656            exit(2);
657        }
658        bzero(&addrI, sizeof(addrI));
659        addrI.sin_family = AF_INET;
660        addrI.sin_addr.s_addr = INADDR_ANY;
661        addrI.sin_port = htons(sConfig->ipport);
662        if (bind(fd, (struct sockaddr *) &addrI, sizeof(addrI)) < 0) {
663            fprintf(stderr, "socket bind failed for: %s:%i\n",
664                    sConfig->iphostname, sConfig->ipport);
665            close(fd);
666            exit(3);
667        }
668    } else {
669        syslog(LOG_INFO, "Initialising unix server socket on %s",
670                sConfig->socketname);
671
672        fd = socket(PF_UNIX, SOCK_STREAM, 0);
673        if (fd < 0) {
674            fprintf(stderr, "failed to create unix socket\n");
675            exit(2);
676        }
677
678        bzero(&addrU, sizeof(addrU));
679        addrU.sun_family = AF_UNIX;
680        strncpy(addrU.sun_path, sConfig->socketname, sizeof(addrU.sun_path));
681
682        unlink(addrU.sun_path);
683
684        old = umask(0); // Need daemon socket to be writeable by apache
685        if (bind(fd, (struct sockaddr *) &addrU, sizeof(addrU)) < 0) {
686            fprintf(stderr, "socket bind failed for: %s\n", sConfig->socketname);
687            close(fd);
688            exit(3);
689        }
690        umask(old);
691    }
692
693    if (listen(fd, QUEUE_MAX) < 0) {
694        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
695        close(fd);
696        exit(4);
697    }
698
699    syslog(LOG_DEBUG, "Created server socket %i", fd);
700
701    return fd;
702
703}
704
705/**
706 * This function is used as a the start function for the slave renderer thread.
707 * It pulls a request from the central queue of requests and dispatches it to
708 * the slave renderer. It then blocks and waits for the response with no timeout.
709 * As it only sends one request at a time (there are as many slave_thread threads as there
710 * are rendering threads on the slaves) nothing gets queued on the slave and should get
711 * rendererd immediately. Thus overall, requests should be nicely load balanced between
712 * all the rendering threads available both locally and in the slaves.
713 */
714void *slave_thread(void * arg) {
715    renderd_config * sConfig = (renderd_config *) arg;
716
717    int pfd = FD_INVALID;
718    int retry;
719    size_t ret_size;
720
721    struct protocol * resp;
722    struct protocol * req_slave;
723
724    req_slave = (struct protocol *)malloc(sizeof(struct protocol));
725    resp = (struct protocol *)malloc(sizeof(struct protocol));
726    bzero(req_slave, sizeof(struct protocol));
727    bzero(resp, sizeof(struct protocol));
728
729    while (1) {
730        if (pfd == FD_INVALID) {
731            pfd = client_socket_init(sConfig);
732            if (pfd == FD_INVALID) {
733                if (sConfig->ipport > 0) {
734                    syslog(LOG_ERR,
735                        "Failed to connect to render slave %s:%i, trying again in 30 seconds",
736                        sConfig->iphostname, sConfig->ipport);
737                } else {
738                    syslog( LOG_ERR,
739                            "Failed to connect to render slave %s, trying again in 30 seconds",
740                            sConfig->socketname);
741                }
742                sleep(30);
743                continue;
744            }
745        }
746
747        enum protoCmd ret;
748        struct item *item = fetch_request();
749        if (item) {
750            struct protocol *req = &item->req;
751            req_slave->ver = PROTO_VER;
752            req_slave->cmd = cmdRender;
753            strcpy(req_slave->xmlname, req->xmlname);
754            req_slave->x = req->x;
755            req_slave->y = req->y;
756            req_slave->z = req->z;
757
758            /*Dispatch request to slave renderd*/
759            retry = 2;
760            syslog(LOG_INFO,
761                    "Dispatching request to slave thread on fd %i", pfd);
762            do {
763                ret_size = send(pfd, req_slave, sizeof(struct protocol), 0);
764
765                if (ret_size == sizeof(struct protocol)) {
766                    //correctly sent command to slave
767                    break;
768                }
769
770                if (errno != EPIPE) {
771                    syslog(LOG_ERR,
772                            "Failed to send cmd to render slave, shutting down thread");
773                    return NULL;
774                }
775
776                syslog(LOG_WARNING, "Failed to send cmd to render slave, retrying");
777                close(pfd);
778                pfd = client_socket_init(sConfig);
779                if (pfd == FD_INVALID) {
780                    syslog(LOG_ERR,
781                            "Failed to re-connect to render slave, dropping request");
782                    ret = cmdNotDone;
783                    send_response(item, ret);
784                    break;
785                }
786            } while (retry--);
787            if (pfd == FD_INVALID || ret_size != sizeof(struct protocol)) {
788                continue;
789            }
790
791            ret_size = 0;
792            retry = 10;
793            while ((ret_size < sizeof(struct protocol)) && (retry > 0)) {
794                ret_size = recv(pfd, resp + ret_size, (sizeof(struct protocol)
795                        - ret_size), 0);
796                if ((errno == EPIPE) || ret_size == 0) {
797                    close(pfd);
798                    pfd = FD_INVALID;
799                    ret_size = 0;
800                    syslog(LOG_ERR, "Pipe to render slave closed");
801                    break;
802                }
803                retry--;
804            }
805            if (ret_size < sizeof(struct protocol)) {
806                if (sConfig->ipport > 0) {
807                    syslog( LOG_ERR,
808                            "Invalid reply from render slave %s:%i, trying again in 30 seconds",
809                            sConfig->iphostname, sConfig->ipport);
810                } else {
811                    syslog( LOG_ERR,
812                            "Invalid reply render slave %s, trying again in 30 seconds",
813                            sConfig->socketname);
814                }
815
816                ret = cmdNotDone;
817                send_response(item, ret);
818                sleep(30);
819            } else {
820                ret = resp->cmd;
821                send_response(item, ret);
822                if (resp->cmd != cmdDone) {
823                    if (sConfig->ipport > 0) {
824                        syslog( LOG_ERR,
825                                "Request from render slave %s:%i did not complete correctly",
826                                sConfig->iphostname, sConfig->ipport);
827                    } else {
828                        syslog( LOG_ERR,
829                                "Request from render slave %s did not complete correctly",
830                                sConfig->socketname);
831                    }
832                    //Sleep for a while to make sure we don't overload the renderer
833                    //This only happens if it didn't correctly block on the rendering
834                    //request
835                    sleep(30);
836                }
837            }
838
839        } else {
840            sleep(1); // TODO: Use an event to indicate there are new requests
841        }
842    }
843    return NULL;
844}
845
846
847int main(int argc, char **argv)
848{
849    int fd, i, j, k;
850
851    int c;
852    int foreground=0;
853    int active_slave=0;
854    int log_options;
855    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
856
857    while (1) {
858        int option_index = 0;
859        static struct option long_options[] = {
860            {"config", 1, 0, 'c'},
861            {"foreground", 1, 0, 'f'},
862            {"slave", 1, 0, 's'},
863            {"help", 0, 0, 'h'},
864            {0, 0, 0, 0}
865        };
866
867        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
868        if (c == -1)
869            break;
870
871        switch (c) {
872            case 'f':
873                foreground=1;
874                break;
875            case 'c':
876                strncpy(config_file_name, optarg, PATH_MAX-1);
877                config_file_name[PATH_MAX-1] = 0;
878                break;
879            case 's':
880                if (sscanf(optarg, "%i", &active_slave) != 1) {
881                    fprintf(stderr, "--slave needs to be nummeric (%s)\n", optarg);
882                    active_slave = 0;
883                }
884                break;
885            case 'h':
886                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
887                fprintf(stderr, "Mapnik rendering daemon\n");
888                fprintf(stderr, "  -f, --foreground     run in foreground\n");
889                fprintf(stderr, "  -h, --help           display this help and exit\n");
890                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
891                fprintf(stderr, "  -s, --slave=CONFIG_NR set which render slave this is (default 0)\n");
892                exit(0);
893            default:
894                fprintf(stderr, "unknown config option '%c'\n", c);
895                exit(1);
896        }
897    }
898
899    log_options = LOG_PID;
900#ifdef LOG_PERROR
901    if (foreground)
902        log_options |= LOG_PERROR;
903#endif
904    openlog("renderd", log_options, LOG_DAEMON);
905
906    syslog(LOG_INFO, "Rendering daemon started");
907
908    pthread_mutex_init(&qLock, NULL);
909    pthread_cond_init(&qCond, NULL);
910    reqHead.next = reqHead.prev = &reqHead;
911    reqPrioHead.next = reqPrioHead.prev = &reqPrioHead;
912    reqBulkHead.next = reqBulkHead.prev = &reqBulkHead;
913    dirtyHead.next = dirtyHead.prev = &dirtyHead;
914    renderHead.next = renderHead.prev = &renderHead;
915    hashidxSize = HASHIDX_SIZE;
916    item_hashidx = (struct item_idx *) malloc(sizeof(struct item_idx) * hashidxSize);
917    bzero(item_hashidx, sizeof(struct item_idx) * hashidxSize);
918
919    stats.noDirtyRender = 0;
920    stats.noReqDroped = 0;
921    stats.noReqRender = 0;
922    stats.noReqPrioRender = 0;
923    stats.noReqBulkRender = 0;
924
925    xmlconfigitem maps[XMLCONFIGS_MAX];
926    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
927
928    renderd_config config_slaves[MAX_SLAVES];
929    bzero(config_slaves, sizeof(renderd_config) * MAX_SLAVES);
930    bzero(&config, sizeof(renderd_config));
931
932    dictionary *ini = iniparser_load(config_file_name);
933    if (! ini) {
934        exit(1);
935    }
936
937    noSlaveRenders = 0;
938
939    int iconf = -1;
940    char buffer[PATH_MAX];
941    for (int section=0; section < iniparser_getnsec(ini); section++) {
942        char *name = iniparser_getsecname(ini, section);
943        syslog(LOG_INFO, "Parsing section %s\n", name);
944        if (strncmp(name, "renderd", 7) && strcmp(name, "mapnik")) {
945            if (config.tile_dir == NULL) {
946                fprintf(stderr, "No valid (active) renderd config section available\n");
947                exit(7);
948            }
949            /* this is a map section */
950            iconf++;
951            if (strlen(name) >= XMLCONFIG_MAX) {
952                fprintf(stderr, "XML name too long: %s\n", name);
953                exit(7);
954            }
955
956            strcpy(maps[iconf].xmlname, name);
957            if (iconf >= XMLCONFIGS_MAX) {
958                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
959                exit(7);
960            }
961            sprintf(buffer, "%s:uri", name);
962            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
963            if (strlen(ini_uri) >= PATH_MAX) {
964                fprintf(stderr, "URI too long: %s\n", ini_uri);
965                exit(7);
966            }
967            strcpy(maps[iconf].xmluri, ini_uri);
968            sprintf(buffer, "%s:xml", name);
969            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
970            if (strlen(ini_xmlpath) >= PATH_MAX){
971                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
972                exit(7);
973            }
974            sprintf(buffer, "%s:host", name);
975            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
976            if (strlen(ini_hostname) >= PATH_MAX) {
977                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
978                exit(7);
979            }
980
981            sprintf(buffer, "%s:htcphost", name);
982            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
983            if (strlen(ini_htcpip) >= PATH_MAX) {
984                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
985                exit(7);
986            }
987            strcpy(maps[iconf].xmlfile, ini_xmlpath);
988            strcpy(maps[iconf].tile_dir, config.tile_dir);
989            strcpy(maps[iconf].host, ini_hostname);
990            strcpy(maps[iconf].htcpip, ini_htcpip);
991        } else if (strncmp(name, "renderd", 7) == 0) {
992            int render_sec = 0;
993            if (sscanf(name, "renderd%i", &render_sec) != 1) {
994                render_sec = 0;
995            }
996            syslog(LOG_INFO, "Parsing render section %i\n", render_sec);
997            if (render_sec >= MAX_SLAVES) {
998                syslog(LOG_ERR, "Can't handle more than %i render sections\n",
999                        MAX_SLAVES);
1000                exit(7);
1001            }
1002            sprintf(buffer, "%s:socketname", name);
1003            config_slaves[render_sec].socketname = iniparser_getstring(ini,
1004                    buffer, (char *) RENDER_SOCKET);
1005            sprintf(buffer, "%s:iphostname", name);
1006            config_slaves[render_sec].iphostname = iniparser_getstring(ini,
1007                    buffer, "");
1008            sprintf(buffer, "%s:ipport", name);
1009            config_slaves[render_sec].ipport = iniparser_getint(ini, buffer, 0);
1010            sprintf(buffer, "%s:num_threads", name);
1011            config_slaves[render_sec].num_threads = iniparser_getint(ini,
1012                    buffer, NUM_THREADS);
1013            sprintf(buffer, "%s:tile_dir", name);
1014            config_slaves[render_sec].tile_dir = iniparser_getstring(ini,
1015                    buffer, (char *) HASH_PATH);
1016            sprintf(buffer, "%s:stats_file", name);
1017            config_slaves[render_sec].stats_filename = iniparser_getstring(ini,
1018                    buffer, NULL);
1019
1020            if (render_sec == active_slave) {
1021                config.socketname = config_slaves[render_sec].socketname;
1022                config.iphostname = config_slaves[render_sec].iphostname;
1023                config.ipport = config_slaves[render_sec].ipport;
1024                config.num_threads = config_slaves[render_sec].num_threads;
1025                config.tile_dir = config_slaves[render_sec].tile_dir;
1026                config.stats_filename
1027                        = config_slaves[render_sec].stats_filename;
1028                config.mapnik_plugins_dir = iniparser_getstring(ini,
1029                        "mapnik:plugins_dir", (char *) MAPNIK_PLUGINS);
1030                config.mapnik_font_dir = iniparser_getstring(ini,
1031                        "mapnik:font_dir", (char *) FONT_DIR);
1032                config.mapnik_font_dir_recurse = iniparser_getboolean(ini,
1033                        "mapnik:font_dir_recurse", FONT_RECURSE);
1034            } else {
1035                noSlaveRenders += config_slaves[render_sec].num_threads;
1036            }
1037        }
1038    }
1039
1040    if (config.ipport > 0) {
1041        syslog(LOG_INFO, "config renderd: ip socket=%s:%i\n", config.iphostname, config.ipport);
1042    } else {
1043        syslog(LOG_INFO, "config renderd: unix socketname=%s\n", config.socketname);
1044    }
1045    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
1046    if (active_slave == 0) {
1047        syslog(LOG_INFO, "config renderd: num_slaves=%d\n", noSlaveRenders);
1048    }
1049    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
1050    syslog(LOG_INFO, "config renderd: stats_file=%s\n", config.stats_filename);
1051    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
1052    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
1053    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
1054    for (int i = 0; i < MAX_SLAVES; i++) {
1055        if (config_slaves[i].num_threads == 0) {
1056            continue;
1057        }
1058        if (i == active_slave) {
1059            syslog(LOG_INFO, "config renderd(%i): Active\n", i);
1060        }
1061        if (config_slaves[i].ipport > 0) {
1062                syslog(LOG_INFO, "config renderd(%i): ip socket=%s:%i\n", i,
1063                        config_slaves[i].iphostname, config_slaves[i].ipport);
1064            } else {
1065                syslog(LOG_INFO, "config renderd(%i): unix socketname=%s\n", i,
1066                        config_slaves[i].socketname);
1067            }
1068        syslog(LOG_INFO, "config renderd(%i): num_threads=%d\n", i,
1069                config_slaves[i].num_threads);
1070        syslog(LOG_INFO, "config renderd(%i): tile_dir=%s\n", i,
1071                config_slaves[i].tile_dir);
1072        syslog(LOG_INFO, "config renderd(%i): stats_file=%s\n", i,
1073                config_slaves[i].stats_filename);
1074    }
1075
1076    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
1077        if (maps[iconf].xmlname[0] != 0) {
1078         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
1079                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
1080                 maps[iconf].htcpip, maps[iconf].host);
1081        }
1082    }
1083
1084    fd = server_socket_init(&config);
1085#if 0
1086    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
1087        fprintf(stderr, "setting socket non-block failed\n");
1088        close(fd);
1089        exit(5);
1090    }
1091#endif
1092
1093    //sigPipeAction.sa_handler = pipe_handler;
1094    sigPipeAction.sa_handler = SIG_IGN;
1095    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
1096        fprintf(stderr, "failed to register signal handler\n");
1097        close(fd);
1098        exit(6);
1099    }
1100
1101    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
1102
1103    /* unless the command line said to run in foreground mode, fork and detach from terminal */
1104    if (foreground) {
1105        fprintf(stderr, "Running in foreground mode...\n");
1106    } else {
1107        if (daemon(0, 0) != 0) {
1108            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
1109        }
1110        /* write pid file */
1111        FILE *pidfile = fopen(PIDFILE, "w");
1112        if (pidfile) {
1113            (void) fprintf(pidfile, "%d\n", getpid());
1114            (void) fclose(pidfile);
1115        }
1116    }
1117
1118    if (config.stats_filename != NULL) {
1119        if (pthread_create(&stats_thread, NULL, stats_writeout_thread, NULL)) {
1120            syslog(LOG_WARNING, "Could not create stats writeout thread");
1121        }
1122    } else {
1123        syslog(LOG_INFO, "No stats file specified in config. Stats reporting disabled");
1124    }
1125
1126    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
1127
1128    for(i=0; i<config.num_threads; i++) {
1129        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
1130            fprintf(stderr, "error spawning render thread\n");
1131            close(fd);
1132            exit(7);
1133        }
1134    }
1135
1136    if (active_slave == 0) {
1137        //Only the master renderd opens connections to its slaves
1138        k = 0;
1139        slave_threads
1140                = (pthread_t *) malloc(sizeof(pthread_t) * noSlaveRenders);
1141        for (i = 1; i < MAX_SLAVES; i++) {
1142            for (j = 0; j < config_slaves[i].num_threads; j++) {
1143                if (pthread_create(&slave_threads[k++], NULL, slave_thread,
1144                        (void *) &config_slaves[i])) {
1145                    fprintf(stderr, "error spawning render thread\n");
1146                    close(fd);
1147                    exit(7);
1148                }
1149            }
1150        }
1151    }
1152
1153    process_loop(fd);
1154
1155    unlink(config.socketname);
1156    close(fd);
1157    return 0;
1158}
Note: See TracBrowser for help on using the repository browser.