source: subversion/applications/utils/mod_tile/daemon.c @ 22934

Last change on this file since 22934 was 22934, checked in by apmon, 9 years ago

[renderd] Add a munin graph to show how much time renderd spends rendering tiles of various zoom levels

With this graph, it is hopefully possible to see where (i.e. towards which zoomlevel) most of the rendering time goes,
and thus might give some indication where optimisations in the style sheet are best focused at, if performance is an issue.
It useses purely wallclock time and thus does not account for individual resources.

File size: 38.5 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <stdint.h>
4#include <unistd.h>
5#include <sys/types.h>
6#include <sys/socket.h>
7#include <netinet/in.h>
8#include <netdb.h>
9#include <sys/select.h>
10#include <sys/stat.h>
11#include <sys/un.h>
12#include <poll.h>
13#include <errno.h>
14#include <pthread.h>
15#include <signal.h>
16#include <string.h>
17#include <syslog.h>
18#include <getopt.h>
19
20#include "render_config.h"
21#include "daemon.h"
22#include "gen_tile.h"
23#include "protocol.h"
24#include "dir_utils.h"
25
26#define PIDFILE "/var/run/renderd/renderd.pid"
27
28extern "C" {
29#include "iniparser3.0b/src/iniparser.h"
30}
31
32static pthread_t *render_threads;
33static pthread_t *slave_threads;
34static struct sigaction sigPipeAction;
35
36static struct item reqHead, reqPrioHead, reqBulkHead, dirtyHead, renderHead;
37static struct item_idx * item_hashidx;
38static int reqNum, reqPrioNum, reqBulkNum, dirtyNum;
39static pthread_mutex_t qLock;
40static pthread_cond_t qCond;
41
42static stats_struct stats;
43static pthread_t stats_thread;
44
45static renderd_config config;
46
47int noSlaveRenders;
48int hashidxSize;
49
50void statsRenderFinish(int z, long time) {
51    pthread_mutex_lock(&qLock);
52    if ((z >= 0) && (z <= MAX_ZOOM)) {
53        stats.noZoomRender[z]++;
54        stats.timeZoomRender[z] += time;
55    }
56    pthread_mutex_unlock(&qLock);
57}
58
59struct item *fetch_request(void)
60{
61    struct item *item = NULL;
62
63    pthread_mutex_lock(&qLock);
64
65    while ((reqNum == 0) && (dirtyNum == 0) && (reqPrioNum == 0) && (reqBulkNum == 0)) {
66        pthread_cond_wait(&qCond, &qLock);
67    }
68    if (reqPrioNum) {
69        item = reqPrioHead.next;
70        reqPrioNum--;
71        stats.noReqPrioRender++;
72    } else if (reqNum) {
73        item = reqHead.next;
74        reqNum--;
75        stats.noReqRender++;
76    } else if (dirtyNum) {
77        item = dirtyHead.next;
78        dirtyNum--;
79        stats.noDirtyRender++;
80    } else if (reqBulkNum) {
81        item = reqBulkHead.next;
82        reqBulkNum--;
83        stats.noReqBulkRender++;
84    }
85    if (item) {
86        item->next->prev = item->prev;
87        item->prev->next = item->next;
88
89        item->prev = &renderHead;
90        item->next = renderHead.next;
91        renderHead.next->prev = item;
92        renderHead.next = item;
93        item->inQueue = queueRender;
94    }
95
96
97    pthread_mutex_unlock(&qLock);
98
99    return item;
100}
101
102void clear_requests(int fd)
103{
104    struct item *item, *dupes, *queueHead;
105
106    /**Only need to look up on the shorter request and render queue
107      * so using the linear list shouldn't be a problem
108      */
109    pthread_mutex_lock(&qLock);
110    for (int i = 0; i < 4; i++) {
111        switch (i) {
112        case 0: { queueHead = &reqHead; break;}
113        case 1: { queueHead = &renderHead; break;}
114        case 2: { queueHead = &reqPrioHead; break;}
115        case 3: { queueHead = &reqBulkHead; break;}
116        }
117
118        item = queueHead->next;
119        while (item != queueHead) {
120            if (item->fd == fd)
121                item->fd = FD_INVALID;
122
123            dupes = item->duplicates;
124            while (dupes) {
125                if (dupes->fd == fd)
126                    dupes->fd = FD_INVALID;
127                dupes = dupes->duplicates;
128            }
129            item = item->next;
130        }
131    }
132
133    pthread_mutex_unlock(&qLock);
134}
135
136
137static int calcHashKey(struct item *item) {
138    uint64_t xmlnameHash = 0;
139    uint64_t key;
140    for (int i = 0; item->req.xmlname[i] != 0; i++) {
141        xmlnameHash += item->req.xmlname[i];
142    }
143    key = ((uint64_t)(xmlnameHash & 0x1FF) << 52) + ((uint64_t)(item->req.z) << 48) + ((uint64_t)(item->mx & 0xFFFFFF) << 24) + (item->my & 0xFFFFFF);
144    return key % hashidxSize;
145}
146
147void insert_item_idx(struct item *item) {
148    struct item_idx * nextItem;
149    struct item_idx * prevItem;
150
151    int key = calcHashKey(item);
152
153    if (item_hashidx[key].item == NULL) {
154        item_hashidx[key].item = item;
155    } else {
156        prevItem = &(item_hashidx[key]);
157        nextItem = item_hashidx[key].next;
158        while(nextItem) {
159            prevItem = nextItem;
160            nextItem = nextItem->next;
161        }
162        nextItem = (struct item_idx *)malloc(sizeof(struct item_idx));
163        nextItem->item = item;
164        nextItem->next = NULL;
165        prevItem->next = nextItem;
166    }
167}
168
169void remove_item_idx(struct item * item) {
170    int key = calcHashKey(item);
171    struct item_idx * nextItem;
172    struct item_idx * prevItem;
173    struct item * test;
174    if (item_hashidx[key].item == NULL) {
175        //item not in index;
176        return;
177    }
178    prevItem = &(item_hashidx[key]);
179    nextItem = &(item_hashidx[key]);
180
181    while (nextItem != NULL) {
182        test = nextItem->item;
183        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z
184                == test->req.z) && (!strcmp(item->req.xmlname,
185                test->req.xmlname))) {
186            /*
187             * Found item, removing it from list
188             */
189            nextItem->item = NULL;
190            if (nextItem->next != NULL) {
191                if (nextItem == &(item_hashidx[key])) {
192                    prevItem = nextItem->next;
193                    memcpy(&(item_hashidx[key]), nextItem->next,
194                            sizeof(struct item_idx));
195                    free(prevItem);
196                } else {
197                    prevItem->next = nextItem->next;
198                }
199            } else {
200                prevItem->next = NULL;
201            }
202
203            if (nextItem != &(item_hashidx[key])) {
204                free(nextItem);
205            }
206            return;
207        } else {
208            prevItem = nextItem;
209            nextItem = nextItem->next;
210        }
211    }
212}
213
214struct item * lookup_item_idx(struct item * item) {
215    struct item_idx * nextItem;
216    struct item * test;
217
218    int key = calcHashKey(item);
219
220    if (item_hashidx[key].item == NULL) {
221        return NULL;
222    } else {
223        nextItem = &(item_hashidx[key]);
224        while (nextItem != NULL) {
225            test = nextItem->item;
226            if ((item->mx == test->mx) && (item->my == test->my)
227                    && (item->req.z == test->req.z) && (!strcmp(
228                    item->req.xmlname, test->req.xmlname))) {
229                return test;
230            } else {
231                nextItem = nextItem->next;
232            }
233        }
234    }
235    return NULL;
236}
237
238static inline const char *cmdStr(enum protoCmd c)
239{
240    switch (c) {
241        case cmdIgnore:  return "Ignore";
242        case cmdRender:  return "Render";
243        case cmdRenderPrio:  return "RenderPrio";
244        case cmdRenderBulk:  return "RenderBulk";
245        case cmdDirty:   return "Dirty";
246        case cmdDone:    return "Done";
247        case cmdNotDone: return "NotDone";
248        default:         return "unknown";
249    }
250}
251
252void send_response(struct item *item, enum protoCmd rsp)
253{
254    struct protocol *req = &item->req;
255    int ret;
256
257    pthread_mutex_lock(&qLock);
258    item->next->prev = item->prev;
259    item->prev->next = item->next;
260    remove_item_idx(item);
261    pthread_mutex_unlock(&qLock);
262
263    while (item) {
264        struct item *prev = item;
265        req = &item->req;
266        if ((item->fd != FD_INVALID) && ((req->cmd == cmdRender) || (req->cmd == cmdRenderPrio) || (req->cmd == cmdRenderBulk))) {
267            req->cmd = rsp;
268            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
269            ret = send(item->fd, req, sizeof(*req), 0);
270            if (ret != sizeof(*req))
271                perror("send error during send_done");
272        }
273        item = item->duplicates;
274        free(prev);
275    }
276}
277
278
279enum protoCmd pending(struct item *test)
280{
281    // check all queues and render list to see if this request already queued
282    // If so, add this new request as a duplicate
283    // call with qLock held
284    struct item *item;
285
286    item = lookup_item_idx(test);
287    if (item != NULL) {
288        if ((item->inQueue == queueRender) || (item->inQueue == queueRequest) || (item->inQueue == queueRequestPrio)) {
289            test->duplicates = item->duplicates;
290            item->duplicates = test;
291            test->inQueue = queueDuplicate;
292            return cmdIgnore;
293        } else if ((item->inQueue == queueDirty) || (item->inQueue == queueRequestBulk)){
294            return cmdNotDone;
295        }
296    }
297
298    return cmdRender;
299}
300
301
302enum protoCmd rx_request(const struct protocol *req, int fd)
303{
304    struct protocol *reqnew;
305    struct item *list = NULL, *item;
306    enum protoCmd pend;
307
308    // Upgrade version 1 to version 2
309    if (req->ver == 1) {
310        reqnew = (struct protocol *)malloc(sizeof(protocol));
311        memcpy(reqnew, req, sizeof(protocol_v1));
312        reqnew->xmlname[0] = 0;
313        req = reqnew;
314    }
315    else if (req->ver != 2) {
316        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
317        return cmdIgnore;
318    }
319
320    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
321            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
322
323    if ((req->cmd != cmdRender) && (req->cmd != cmdRenderPrio) && (req->cmd != cmdDirty) && (req->cmd != cmdRenderBulk))
324        return cmdIgnore;
325
326    if (check_xyz(req->x, req->y, req->z))
327        return cmdNotDone;
328
329    item = (struct item *)malloc(sizeof(*item));
330    if (!item) {
331            syslog(LOG_ERR, "malloc failed");
332            return cmdNotDone;
333    }
334
335    item->req = *req;
336    item->duplicates = NULL;
337    item->fd = (req->cmd == cmdDirty) ? FD_INVALID : fd;
338
339#ifdef METATILE
340    /* Round down request co-ordinates to the neareast N (should be a power of 2)
341     * Note: request path is no longer consistent but this will be recalculated
342     * when the metatile is being rendered.
343     */
344    item->mx = item->req.x & ~(METATILE-1);
345    item->my = item->req.y & ~(METATILE-1);
346#else
347    item->mx = item->req.x;
348    item->my = item->req.y;
349#endif
350
351    pthread_mutex_lock(&qLock);
352
353    // Check for a matching request in the current rendering or dirty queues
354    pend = pending(item);
355    if (pend == cmdNotDone) {
356        // We found a match in the dirty queue, can not wait for it
357        pthread_mutex_unlock(&qLock);
358        free(item);
359        return cmdNotDone;
360    }
361    if (pend == cmdIgnore) {
362        // Found a match in render queue, item added as duplicate
363        pthread_mutex_unlock(&qLock);
364        return cmdIgnore;
365    }
366
367    // New request, add it to render or dirty queue
368    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
369        list = &reqHead;
370        item->inQueue = queueRequest;
371        reqNum++;
372    } else if ((req->cmd == cmdRenderPrio) && (reqPrioNum < REQ_LIMIT)) {
373        list = &reqPrioHead;
374        item->inQueue = queueRequestPrio;
375        reqPrioNum++;
376    } else if ((req->cmd == cmdRenderBulk) && (reqBulkNum < REQ_LIMIT)) {
377        list = &reqBulkHead;
378        item->inQueue = queueRequestBulk;
379        reqBulkNum++;
380    } else if (dirtyNum < DIRTY_LIMIT) {
381        list = &dirtyHead;
382        item->inQueue = queueDirty;
383        dirtyNum++;
384        item->fd = FD_INVALID; // No response after render
385    } else {
386        // The queue is severely backlogged. Drop request
387        stats.noReqDroped++;
388        pthread_mutex_unlock(&qLock);
389        free(item);
390        return cmdNotDone;
391    }
392
393    if (list) {
394        item->next = list;
395        item->prev = list->prev;
396        item->prev->next = item;
397        list->prev = item;
398        /* In addition to the linked list, add item to a hash table index
399         * for faster lookup of pending requests.
400         */
401        insert_item_idx(item);
402
403        pthread_cond_signal(&qCond);
404    } else
405        free(item);
406
407    pthread_mutex_unlock(&qLock);
408
409    return (list == &reqHead)?cmdIgnore:cmdNotDone;
410}
411
412
413void process_loop(int listen_fd)
414{
415    int num_connections = 0;
416    int connections[MAX_CONNECTIONS];
417
418    bzero(connections, sizeof(connections));
419
420    while (1) {
421        struct sockaddr_un in_addr;
422        socklen_t in_addrlen = sizeof(in_addr);
423        fd_set rd;
424        int incoming, num, nfds, i;
425
426        FD_ZERO(&rd);
427        FD_SET(listen_fd, &rd);
428        nfds = listen_fd+1;
429
430        for (i=0; i<num_connections; i++) {
431            FD_SET(connections[i], &rd);
432            nfds = MAX(nfds, connections[i]+1);
433        }
434
435        num = select(nfds, &rd, NULL, NULL, NULL);
436        if (num == -1)
437            perror("select()");
438        else if (num) {
439            //printf("Data is available now on %d fds\n", num);
440            if (FD_ISSET(listen_fd, &rd)) {
441                num--;
442                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
443                if (incoming < 0) {
444                    perror("accept()");
445                } else {
446                    if (num_connections == MAX_CONNECTIONS) {
447                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
448                        close(incoming);
449                    } else {
450                        connections[num_connections++] = incoming;
451                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
452                    }
453                }
454            }
455            for (i=0; num && (i<num_connections); i++) {
456                int fd = connections[i];
457                if (FD_ISSET(fd, &rd)) {
458                    struct protocol cmd;
459                    int ret;
460
461                    // TODO: to get highest performance we should loop here until we get EAGAIN
462                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
463                    if (ret == sizeof(cmd)) {
464                        enum protoCmd rsp = rx_request(&cmd, fd);
465
466                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
467                            cmd.cmd = rsp;
468                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
469                            ret = send(fd, &cmd, sizeof(cmd), 0);
470                            if (ret != sizeof(cmd))
471                                perror("response send error");
472                        }
473                    } else if (!ret) {
474                        int j;
475
476                        num_connections--;
477                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
478                        for (j=i; j < num_connections; j++)
479                            connections[j] = connections[j+1];
480                        clear_requests(fd);
481                        close(fd);
482                    } else {
483                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
484                        break;
485                    }
486                }
487            }
488        } else {
489            syslog(LOG_ERR, "Select timeout");
490        }
491    }
492}
493
494/**
495 * Periodically write out current stats to a stats file. This information
496 * can then be used to monitor performance of renderd e.g. with a munin plugin
497 */
498void *stats_writeout_thread(void * arg) {
499    stats_struct lStats;
500    int dirtQueueLength;
501    int reqQueueLength;
502    int reqPrioQueueLength;
503    int reqBulkQueueLength;
504        int i;
505
506    int noFailedAttempts = 0;
507    char tmpName[PATH_MAX];
508
509    snprintf(tmpName, sizeof(tmpName), "%s.tmp", config.stats_filename);
510
511    syslog(LOG_DEBUG, "Starting stats thread");
512    while (1) {
513        pthread_mutex_lock(&qLock);
514        memcpy(&lStats, &stats, sizeof(stats_struct));
515        dirtQueueLength = dirtyNum;
516        reqQueueLength = reqNum;
517        reqPrioQueueLength = reqPrioNum;
518        reqBulkQueueLength = reqBulkNum;
519        pthread_mutex_unlock(&qLock);
520
521        FILE * statfile = fopen(tmpName, "w");
522        if (statfile == NULL) {
523            syslog(LOG_WARNING, "Failed to open stats file: %i", errno);
524            noFailedAttempts++;
525            if (noFailedAttempts > 3) {
526                syslog(LOG_ERR, "ERROR: Failed repeatedly to write stats, giving up");
527                break;
528            }
529            continue;
530        } else {
531            noFailedAttempts = 0;
532            fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
533            fprintf(statfile, "ReqPrioQueueLength: %i\n", reqPrioQueueLength);
534            fprintf(statfile, "ReqBulkQueueLength: %i\n", reqBulkQueueLength);
535            fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
536            fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
537            fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
538            fprintf(statfile, "ReqPrioRendered: %li\n", lStats.noReqPrioRender);
539            fprintf(statfile, "ReqBulkRendered: %li\n", lStats.noReqBulkRender);
540            fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
541            for (i = 0; i <= MAX_ZOOM; i++) {
542                fprintf(statfile,"ZoomRendered%02i: %li\n", i, lStats.noZoomRender[i]);
543            }
544            for (i = 0; i <= MAX_ZOOM; i++) {
545                fprintf(statfile,"TimeRenderedZoom%02i: %li\n", i, lStats.timeZoomRender[i]);
546            }
547            fclose(statfile);
548            if (rename(tmpName, config.stats_filename)) {
549                syslog(LOG_WARNING, "Failed to overwrite stats file: %i", errno);
550                noFailedAttempts++;
551                if (noFailedAttempts > 3) {
552                    syslog(LOG_ERR, "ERROR: Failed repeatedly to overwrite stats, giving up");
553                    break;
554                }
555                continue;
556            }
557        }
558        sleep(10);
559    }
560    return NULL;
561}
562
563int client_socket_init(renderd_config * sConfig) {
564    int fd;
565    struct sockaddr_un * addrU;
566    struct sockaddr_in * addrI;
567    struct hostent *server;
568    if (sConfig->ipport > 0) {
569        syslog(LOG_INFO, "Initialising TCP/IP client socket to %s:%i",
570                sConfig->iphostname, sConfig->ipport);
571        addrI = (struct sockaddr_in *)malloc(sizeof(struct sockaddr_in));
572        fd = socket(PF_INET, SOCK_STREAM, 0);
573        server = gethostbyname(sConfig->iphostname);
574        if (server == NULL) {
575            syslog(LOG_WARNING, "Could not resolve hostname: %s",
576                    sConfig->iphostname);
577            return FD_INVALID;
578        }
579        bzero((char *) addrI, sizeof(struct sockaddr_in));
580        addrI->sin_family = AF_INET;
581        bcopy((char *) server->h_addr, (char *) &addrI->sin_addr.s_addr,
582                server->h_length);
583        addrI->sin_port = htons(sConfig->ipport);
584        if (connect(fd, (struct sockaddr *) addrI, sizeof(struct sockaddr_in)) < 0) {
585            syslog(LOG_WARNING, "Could not connect to %s:%i",
586                    sConfig->iphostname, sConfig->ipport);
587            return FD_INVALID;
588        }
589        free(addrI);
590        syslog(LOG_INFO, "socket %s:%i initialised to fd %i", sConfig->iphostname, sConfig->ipport,
591                fd);
592    } else {
593        syslog(LOG_INFO, "Initialising unix client socket on %s",
594                sConfig->socketname);
595        addrU = (struct sockaddr_un *)malloc(sizeof(struct sockaddr_un));
596        fd = socket(PF_UNIX, SOCK_STREAM, 0);
597        if (fd < 0) {
598            syslog(LOG_WARNING, "Could not obtain socket: %i", fd);
599            return FD_INVALID;
600        }
601
602        bzero(addrU, sizeof(struct sockaddr_un));
603        addrU->sun_family = AF_UNIX;
604        strncpy(addrU->sun_path, sConfig->socketname, sizeof(addrU->sun_path));
605
606        if (connect(fd, (struct sockaddr *) addrU, sizeof(struct sockaddr_un)) < 0) {
607            syslog(LOG_WARNING, "socket connect failed for: %s",
608                    sConfig->socketname);
609            close(fd);
610            return FD_INVALID;
611        }
612        free(addrU);
613        syslog(LOG_INFO, "socket %s initialised to fd %i", sConfig->socketname,
614                fd);
615    }
616    return fd;
617}
618
619int server_socket_init(renderd_config *sConfig) {
620    struct sockaddr_un addrU;
621    struct sockaddr_in addrI;
622    mode_t old;
623    int fd;
624
625    if (sConfig->ipport > 0) {
626        syslog(LOG_INFO, "Initialising TCP/IP server socket on %s:%i",
627                sConfig->iphostname, sConfig->ipport);
628        fd = socket(PF_INET, SOCK_STREAM, 0);
629        if (fd < 0) {
630            fprintf(stderr, "failed to create IP socket\n");
631            exit(2);
632        }
633        bzero(&addrI, sizeof(addrI));
634        addrI.sin_family = AF_INET;
635        addrI.sin_addr.s_addr = INADDR_ANY;
636        addrI.sin_port = htons(sConfig->ipport);
637        if (bind(fd, (struct sockaddr *) &addrI, sizeof(addrI)) < 0) {
638            fprintf(stderr, "socket bind failed for: %s:%i\n",
639                    sConfig->iphostname, sConfig->ipport);
640            close(fd);
641            exit(3);
642        }
643    } else {
644        syslog(LOG_INFO, "Initialising unix server socket on %s",
645                sConfig->socketname);
646
647        fd = socket(PF_UNIX, SOCK_STREAM, 0);
648        if (fd < 0) {
649            fprintf(stderr, "failed to create unix socket\n");
650            exit(2);
651        }
652
653        bzero(&addrU, sizeof(addrU));
654        addrU.sun_family = AF_UNIX;
655        strncpy(addrU.sun_path, sConfig->socketname, sizeof(addrU.sun_path));
656
657        unlink(addrU.sun_path);
658
659        old = umask(0); // Need daemon socket to be writeable by apache
660        if (bind(fd, (struct sockaddr *) &addrU, sizeof(addrU)) < 0) {
661            fprintf(stderr, "socket bind failed for: %s\n", sConfig->socketname);
662            close(fd);
663            exit(3);
664        }
665        umask(old);
666    }
667
668    if (listen(fd, QUEUE_MAX) < 0) {
669        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
670        close(fd);
671        exit(4);
672    }
673
674    syslog(LOG_DEBUG, "Created server socket %i", fd);
675
676    return fd;
677
678}
679
680/**
681 * This function is used as a the start function for the slave renderer thread.
682 * It pulls a request from the central queue of requests and dispatches it to
683 * the slave renderer. It then blocks and waits for the response with no timeout.
684 * As it only sends one request at a time (there are as many slave_thread threads as there
685 * are rendering threads on the slaves) nothing gets queued on the slave and should get
686 * rendererd immediately. Thus overall, requests should be nicely load balanced between
687 * all the rendering threads available both locally and in the slaves.
688 */
689void *slave_thread(void * arg) {
690    renderd_config * sConfig = (renderd_config *) arg;
691
692    int pfd = FD_INVALID;
693    int retry;
694    size_t ret_size;
695
696    struct protocol * resp;
697    struct protocol * req_slave;
698
699    req_slave = (struct protocol *)malloc(sizeof(protocol));
700    resp = (struct protocol *)malloc(sizeof(protocol));
701    bzero(req_slave, sizeof(struct protocol));
702    bzero(resp, sizeof(struct protocol));
703
704    while (1) {
705        if (pfd == FD_INVALID) {
706            pfd = client_socket_init(sConfig);
707            if (pfd == FD_INVALID) {
708                if (sConfig->ipport > 0) {
709                    syslog(LOG_ERR,
710                        "Failed to connect to render slave %s:%i, trying again in 30 seconds",
711                        sConfig->iphostname, sConfig->ipport);
712                } else {
713                    syslog( LOG_ERR,
714                            "Failed to connect to render slave %s, trying again in 30 seconds",
715                            sConfig->socketname);
716                }
717                sleep(30);
718                continue;
719            }
720        }
721
722        enum protoCmd ret;
723        struct item *item = fetch_request();
724        if (item) {
725            struct protocol *req = &item->req;
726            req_slave->ver = PROTO_VER;
727            req_slave->cmd = cmdRender;
728            strcpy(req_slave->xmlname, req->xmlname);
729            req_slave->x = req->x;
730            req_slave->y = req->y;
731            req_slave->z = req->z;
732
733            /*Dispatch request to slave renderd*/
734            retry = 2;
735            syslog(LOG_INFO,
736                    "Dispatching request to slave thread on fd %i", pfd);
737            do {
738                ret_size = send(pfd, req_slave, sizeof(struct protocol), 0);
739
740                if (ret_size == sizeof(struct protocol)) {
741                    //correctly sent command to slave
742                    break;
743                }
744
745                if (errno != EPIPE) {
746                    syslog(LOG_ERR,
747                            "Failed to send cmd to render slave, shutting down thread");
748                    return NULL;
749                }
750
751                syslog(LOG_WARNING, "Failed to send cmd to render slave, retrying");
752                close(pfd);
753                pfd = client_socket_init(sConfig);
754                if (pfd == FD_INVALID) {
755                    syslog(LOG_ERR,
756                            "Failed to re-connect to render slave, dropping request");
757                    ret = cmdNotDone;
758                    send_response(item, ret);
759                    break;
760                }
761            } while (retry--);
762            if (pfd == FD_INVALID || ret_size != sizeof(struct protocol)) {
763                continue;
764            }
765
766            ret_size = 0;
767            retry = 10;
768            while ((ret_size < sizeof(struct protocol)) && (retry > 0)) {
769                ret_size = recv(pfd, resp + ret_size, (sizeof(struct protocol)
770                        - ret_size), 0);
771                if ((errno == EPIPE) || ret_size == 0) {
772                    close(pfd);
773                    pfd = FD_INVALID;
774                    ret_size = 0;
775                    syslog(LOG_ERR, "Pipe to render slave closed");
776                    break;
777                }
778                retry--;
779            }
780            if (ret_size < sizeof(struct protocol)) {
781                if (sConfig->ipport > 0) {
782                    syslog( LOG_ERR,
783                            "Invalid reply from render slave %s:%i, trying again in 30 seconds",
784                            sConfig->iphostname, sConfig->ipport);
785                } else {
786                    syslog( LOG_ERR,
787                            "Invalid reply render slave %s, trying again in 30 seconds",
788                            sConfig->socketname);
789                }
790
791                ret = cmdNotDone;
792                send_response(item, ret);
793                sleep(30);
794            } else {
795                ret = resp->cmd;
796                send_response(item, ret);
797                if (resp->cmd != cmdDone) {
798                    if (sConfig->ipport > 0) {
799                        syslog( LOG_ERR,
800                                "Request from render slave %s:%i did not complete correctly",
801                                sConfig->iphostname, sConfig->ipport);
802                    } else {
803                        syslog( LOG_ERR,
804                                "Request from render slave %s did not complete correctly",
805                                sConfig->socketname);
806                    }
807                    //Sleep for a while to make sure we don't overload the renderer
808                    //This only happens if it didn't correctly block on the rendering
809                    //request
810                    sleep(30);
811                }
812            }
813
814        } else {
815            sleep(1); // TODO: Use an event to indicate there are new requests
816        }
817    }
818    return NULL;
819}
820
821
822int main(int argc, char **argv)
823{
824    int fd, i, j, k;
825
826    int c;
827    int foreground=0;
828    int active_slave=0;
829    int log_options;
830    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
831
832    while (1) {
833        int option_index = 0;
834        static struct option long_options[] = {
835            {"config", 1, 0, 'c'},
836            {"foreground", 1, 0, 'f'},
837            {"slave", 1, 0, 's'},
838            {"help", 0, 0, 'h'},
839            {0, 0, 0, 0}
840        };
841
842        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
843        if (c == -1)
844            break;
845
846        switch (c) {
847            case 'f':
848                foreground=1;
849                break;
850            case 'c':
851                strncpy(config_file_name, optarg, PATH_MAX-1);
852                config_file_name[PATH_MAX-1] = 0;
853                break;
854            case 's':
855                if (sscanf(optarg, "%i", &active_slave) != 1) {
856                    fprintf(stderr, "--slave needs to be nummeric (%s)\n", optarg);
857                    active_slave = 0;
858                }
859                break;
860            case 'h':
861                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
862                fprintf(stderr, "Mapnik rendering daemon\n");
863                fprintf(stderr, "  -f, --foreground     run in foreground\n");
864                fprintf(stderr, "  -h, --help           display this help and exit\n");
865                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
866                fprintf(stderr, "  -s, --slave=CONFIG_NR set which render slave this is (default 0)\n");
867                exit(0);
868            default:
869                fprintf(stderr, "unknown config option '%c'\n", c);
870                exit(1);
871        }
872    }
873
874    log_options = LOG_PID;
875#ifdef LOG_PERROR
876    if (foreground)
877        log_options |= LOG_PERROR;
878#endif
879    openlog("renderd", log_options, LOG_DAEMON);
880
881    syslog(LOG_INFO, "Rendering daemon started");
882
883    pthread_mutex_init(&qLock, NULL);
884    pthread_cond_init(&qCond, NULL);
885    reqHead.next = reqHead.prev = &reqHead;
886    reqPrioHead.next = reqPrioHead.prev = &reqPrioHead;
887    reqBulkHead.next = reqBulkHead.prev = &reqBulkHead;
888    dirtyHead.next = dirtyHead.prev = &dirtyHead;
889    renderHead.next = renderHead.prev = &renderHead;
890    hashidxSize = HASHIDX_SIZE;
891    item_hashidx = (struct item_idx *) malloc(sizeof(struct item_idx) * hashidxSize);
892    bzero(item_hashidx, sizeof(struct item_idx) * hashidxSize);
893
894    stats.noDirtyRender = 0;
895    stats.noReqDroped = 0;
896    stats.noReqRender = 0;
897    stats.noReqPrioRender = 0;
898    stats.noReqBulkRender = 0;
899
900    xmlconfigitem maps[XMLCONFIGS_MAX];
901    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
902
903    renderd_config config_slaves[MAX_SLAVES];
904    bzero(config_slaves, sizeof(renderd_config) * MAX_SLAVES);
905    bzero(&config, sizeof(renderd_config));
906
907    dictionary *ini = iniparser_load(config_file_name);
908    if (! ini) {
909        exit(1);
910    }
911
912    noSlaveRenders = 0;
913
914    int iconf = -1;
915    char buffer[PATH_MAX];
916    for (int section=0; section < iniparser_getnsec(ini); section++) {
917        char *name = iniparser_getsecname(ini, section);
918        syslog(LOG_INFO, "Parsing section %s\n", name);
919        if (strncmp(name, "renderd", 7) && strcmp(name, "mapnik")) {
920            if (config.tile_dir == NULL) {
921                fprintf(stderr, "No valid (active) renderd config section available\n");
922                exit(7);
923            }
924            /* this is a map section */
925            iconf++;
926            if (strlen(name) >= XMLCONFIG_MAX) {
927                fprintf(stderr, "XML name too long: %s\n", name);
928                exit(7);
929            }
930
931            strcpy(maps[iconf].xmlname, name);
932            if (iconf >= XMLCONFIGS_MAX) {
933                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
934                exit(7);
935            }
936            sprintf(buffer, "%s:uri", name);
937            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
938            if (strlen(ini_uri) >= PATH_MAX) {
939                fprintf(stderr, "URI too long: %s\n", ini_uri);
940                exit(7);
941            }
942            strcpy(maps[iconf].xmluri, ini_uri);
943            sprintf(buffer, "%s:xml", name);
944            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
945            if (strlen(ini_xmlpath) >= PATH_MAX){
946                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
947                exit(7);
948            }
949            sprintf(buffer, "%s:host", name);
950            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
951            if (strlen(ini_hostname) >= PATH_MAX) {
952                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
953                exit(7);
954            }
955
956            sprintf(buffer, "%s:htcphost", name);
957            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
958            if (strlen(ini_htcpip) >= PATH_MAX) {
959                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
960                exit(7);
961            }
962            strcpy(maps[iconf].xmlfile, ini_xmlpath);
963            strcpy(maps[iconf].tile_dir, config.tile_dir);
964            strcpy(maps[iconf].host, ini_hostname);
965            strcpy(maps[iconf].htcpip, ini_htcpip);
966        } else if (strncmp(name, "renderd", 7) == 0) {
967            int render_sec = 0;
968            if (sscanf(name, "renderd%i", &render_sec) != 1) {
969                render_sec = 0;
970            }
971            syslog(LOG_INFO, "Parsing render section %i\n", render_sec);
972            if (render_sec >= MAX_SLAVES) {
973                syslog(LOG_ERR, "Can't handle more than %i render sections\n",
974                        MAX_SLAVES);
975                exit(7);
976            }
977            sprintf(buffer, "%s:socketname", name);
978            config_slaves[render_sec].socketname = iniparser_getstring(ini,
979                    buffer, (char *) RENDER_SOCKET);
980            sprintf(buffer, "%s:iphostname", name);
981            config_slaves[render_sec].iphostname = iniparser_getstring(ini,
982                    buffer, "");
983            sprintf(buffer, "%s:ipport", name);
984            config_slaves[render_sec].ipport = iniparser_getint(ini, buffer, 0);
985            sprintf(buffer, "%s:num_threads", name);
986            config_slaves[render_sec].num_threads = iniparser_getint(ini,
987                    buffer, NUM_THREADS);
988            sprintf(buffer, "%s:tile_dir", name);
989            config_slaves[render_sec].tile_dir = iniparser_getstring(ini,
990                    buffer, (char *) HASH_PATH);
991            sprintf(buffer, "%s:stats_file", name);
992            config_slaves[render_sec].stats_filename = iniparser_getstring(ini,
993                    buffer, NULL);
994
995            if (render_sec == active_slave) {
996                config.socketname = config_slaves[render_sec].socketname;
997                config.iphostname = config_slaves[render_sec].iphostname;
998                config.ipport = config_slaves[render_sec].ipport;
999                config.num_threads = config_slaves[render_sec].num_threads;
1000                config.tile_dir = config_slaves[render_sec].tile_dir;
1001                config.stats_filename
1002                        = config_slaves[render_sec].stats_filename;
1003                config.mapnik_plugins_dir = iniparser_getstring(ini,
1004                        "mapnik:plugins_dir", (char *) MAPNIK_PLUGINS);
1005                config.mapnik_font_dir = iniparser_getstring(ini,
1006                        "mapnik:font_dir", (char *) FONT_DIR);
1007                config.mapnik_font_dir_recurse = iniparser_getboolean(ini,
1008                        "mapnik:font_dir_recurse", FONT_RECURSE);
1009            } else {
1010                noSlaveRenders += config_slaves[render_sec].num_threads;
1011            }
1012        }
1013    }
1014
1015    if (config.ipport > 0) {
1016        syslog(LOG_INFO, "config renderd: ip socket=%s:%i\n", config.iphostname, config.ipport);
1017    } else {
1018        syslog(LOG_INFO, "config renderd: unix socketname=%s\n", config.socketname);
1019    }
1020    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
1021    if (active_slave == 0) {
1022        syslog(LOG_INFO, "config renderd: num_slaves=%d\n", noSlaveRenders);
1023    }
1024    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
1025    syslog(LOG_INFO, "config renderd: stats_file=%s\n", config.stats_filename);
1026    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
1027    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
1028    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
1029    for (int i = 0; i < MAX_SLAVES; i++) {
1030        if (config_slaves[i].num_threads == 0) {
1031            continue;
1032        }
1033        if (i == active_slave) {
1034            syslog(LOG_INFO, "config renderd(%i): Active\n", i);
1035        }
1036        if (config_slaves[i].ipport > 0) {
1037                syslog(LOG_INFO, "config renderd(%i): ip socket=%s:%i\n", i,
1038                        config_slaves[i].iphostname, config_slaves[i].ipport);
1039            } else {
1040                syslog(LOG_INFO, "config renderd(%i): unix socketname=%s\n", i,
1041                        config_slaves[i].socketname);
1042            }
1043        syslog(LOG_INFO, "config renderd(%i): num_threads=%d\n", i,
1044                config_slaves[i].num_threads);
1045        syslog(LOG_INFO, "config renderd(%i): tile_dir=%s\n", i,
1046                config_slaves[i].tile_dir);
1047        syslog(LOG_INFO, "config renderd(%i): stats_file=%s\n", i,
1048                config_slaves[i].stats_filename);
1049    }
1050
1051    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
1052        if (maps[iconf].xmlname[0] != 0) {
1053         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
1054                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
1055                 maps[iconf].htcpip, maps[iconf].host);
1056        }
1057    }
1058
1059    fd = server_socket_init(&config);
1060#if 0
1061    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
1062        fprintf(stderr, "setting socket non-block failed\n");
1063        close(fd);
1064        exit(5);
1065    }
1066#endif
1067
1068    //sigPipeAction.sa_handler = pipe_handler;
1069    sigPipeAction.sa_handler = SIG_IGN;
1070    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
1071        fprintf(stderr, "failed to register signal handler\n");
1072        close(fd);
1073        exit(6);
1074    }
1075
1076    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
1077
1078    /* unless the command line said to run in foreground mode, fork and detach from terminal */
1079    if (foreground) {
1080        fprintf(stderr, "Running in foreground mode...\n");
1081    } else {
1082        if (daemon(0, 0) != 0) {
1083            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
1084        }
1085        /* write pid file */
1086        FILE *pidfile = fopen(PIDFILE, "w");
1087        if (pidfile) {
1088            (void) fprintf(pidfile, "%d\n", getpid());
1089            (void) fclose(pidfile);
1090        }
1091    }
1092
1093    if (config.stats_filename != NULL) {
1094        if (pthread_create(&stats_thread, NULL, stats_writeout_thread, NULL)) {
1095            syslog(LOG_WARNING, "Could not create stats writeout thread");
1096        }
1097    } else {
1098        syslog(LOG_INFO, "No stats file specified in config. Stats reporting disabled");
1099    }
1100
1101    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
1102
1103    for(i=0; i<config.num_threads; i++) {
1104        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
1105            fprintf(stderr, "error spawning render thread\n");
1106            close(fd);
1107            exit(7);
1108        }
1109    }
1110
1111    if (active_slave == 0) {
1112        //Only the master renderd opens connections to its slaves
1113        k = 0;
1114        slave_threads
1115                = (pthread_t *) malloc(sizeof(pthread_t) * noSlaveRenders);
1116        for (i = 1; i < MAX_SLAVES; i++) {
1117            for (j = 0; j < config_slaves[i].num_threads; j++) {
1118                if (pthread_create(&slave_threads[k++], NULL, slave_thread,
1119                        (void *) &config_slaves[i])) {
1120                    fprintf(stderr, "error spawning render thread\n");
1121                    close(fd);
1122                    exit(7);
1123                }
1124            }
1125        }
1126    }
1127
1128    process_loop(fd);
1129
1130    unlink(config.socketname);
1131    close(fd);
1132    return 0;
1133}
Note: See TracBrowser for help on using the repository browser.