source: subversion/applications/utils/mod_tile/daemon.c @ 18082

Last change on this file since 18082 was 17953, checked in by jonb, 10 years ago

fix typo for reqBulkNum in new queue

File size: 38.0 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <stdint.h>
4#include <unistd.h>
5#include <sys/types.h>
6#include <sys/socket.h>
7#include <netinet/in.h>
8#include <netdb.h>
9#include <sys/select.h>
10#include <sys/stat.h>
11#include <sys/un.h>
12#include <poll.h>
13#include <errno.h>
14#include <pthread.h>
15#include <signal.h>
16#include <string.h>
17#include <syslog.h>
18#include <getopt.h>
19
20#include "daemon.h"
21#include "gen_tile.h"
22#include "protocol.h"
23#include "render_config.h"
24#include "dir_utils.h"
25
26#define PIDFILE "/var/run/renderd/renderd.pid"
27
28extern "C" {
29#include "iniparser3.0b/src/iniparser.h"
30}
31
32static pthread_t *render_threads;
33static pthread_t *slave_threads;
34static struct sigaction sigPipeAction;
35
36static struct item reqHead, reqPrioHead, reqBulkHead, dirtyHead, renderHead;
37static struct item_idx * item_hashidx;
38static int reqNum, reqPrioNum, reqBulkNum, dirtyNum;
39static pthread_mutex_t qLock;
40static pthread_cond_t qCond;
41
42static stats_struct stats;
43static pthread_t stats_thread;
44
45static renderd_config config;
46
47int noSlaveRenders;
48int hashidxSize;
49
50struct item *fetch_request(void)
51{
52    struct item *item = NULL;
53
54    pthread_mutex_lock(&qLock);
55
56    while ((reqNum == 0) && (dirtyNum == 0) && (reqPrioNum == 0) && (reqBulkNum == 0)) {
57        pthread_cond_wait(&qCond, &qLock);
58    }
59    if (reqPrioNum) {
60        item = reqPrioHead.next;
61        reqPrioNum--;
62        stats.noReqPrioRender++;
63    } else if (reqNum) {
64        item = reqHead.next;
65        reqNum--;
66        stats.noReqRender++;
67    } else if (dirtyNum) {
68        item = dirtyHead.next;
69        dirtyNum--;
70        stats.noDirtyRender++;
71    } else if (reqBulkNum) {
72        item = reqBulkHead.next;
73        reqBulkNum--;
74        stats.noReqBulkRender++;
75    }
76    if (item) {
77        item->next->prev = item->prev;
78        item->prev->next = item->next;
79
80        item->prev = &renderHead;
81        item->next = renderHead.next;
82        renderHead.next->prev = item;
83        renderHead.next = item;
84        item->inQueue = queueRender;
85    }
86
87    pthread_mutex_unlock(&qLock);
88
89    return item;
90}
91
92void clear_requests(int fd)
93{
94    struct item *item, *dupes, *queueHead;
95
96    /**Only need to look up on the shorter request and render queue
97      * so using the linear list shouldn't be a problem
98      */
99    pthread_mutex_lock(&qLock);
100    for (int i = 0; i < 4; i++) {
101        switch (i) {
102        case 0: { queueHead = &reqHead; break;}
103        case 1: { queueHead = &renderHead; break;}
104        case 2: { queueHead = &reqPrioHead; break;}
105        case 3: { queueHead = &reqBulkHead; break;}
106        }
107
108        item = queueHead->next;
109        while (item != queueHead) {
110            if (item->fd == fd)
111                item->fd = FD_INVALID;
112
113            dupes = item->duplicates;
114            while (dupes) {
115                if (dupes->fd == fd)
116                    dupes->fd = FD_INVALID;
117                dupes = dupes->duplicates;
118            }
119            item = item->next;
120        }
121    }
122
123    pthread_mutex_unlock(&qLock);
124}
125
126
127static int calcHashKey(struct item *item) {
128    uint64_t xmlnameHash = 0;
129    uint64_t key;
130    for (int i = 0; item->req.xmlname[i] != 0; i++) {
131        xmlnameHash += item->req.xmlname[i];
132    }
133    key = ((uint64_t)(xmlnameHash & 0x1FF) << 52) + ((uint64_t)(item->req.z) << 48) + ((uint64_t)(item->mx & 0xFFFFFF) << 24) + (item->my & 0xFFFFFF);
134    return key % hashidxSize;
135}
136
137void insert_item_idx(struct item *item) {
138    struct item_idx * nextItem;
139    struct item_idx * prevItem;
140
141    int key = calcHashKey(item);
142
143    if (item_hashidx[key].item == NULL) {
144        item_hashidx[key].item = item;
145    } else {
146        prevItem = &(item_hashidx[key]);
147        nextItem = item_hashidx[key].next;
148        while(nextItem) {
149            prevItem = nextItem;
150            nextItem = nextItem->next;
151        }
152        nextItem = (struct item_idx *)malloc(sizeof(struct item_idx));
153        nextItem->item = item;
154        nextItem->next = NULL;
155        prevItem->next = nextItem;
156    }
157}
158
159void remove_item_idx(struct item * item) {
160    int key = calcHashKey(item);
161    struct item_idx * nextItem;
162    struct item_idx * prevItem;
163    struct item * test;
164    if (item_hashidx[key].item == NULL) {
165        //item not in index;
166        return;
167    }
168    prevItem = &(item_hashidx[key]);
169    nextItem = &(item_hashidx[key]);
170
171    while (nextItem != NULL) {
172        test = nextItem->item;
173        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z
174                == test->req.z) && (!strcmp(item->req.xmlname,
175                test->req.xmlname))) {
176            /*
177             * Found item, removing it from list
178             */
179            nextItem->item = NULL;
180            if (nextItem->next != NULL) {
181                if (nextItem == &(item_hashidx[key])) {
182                    prevItem = nextItem->next;
183                    memcpy(&(item_hashidx[key]), nextItem->next,
184                            sizeof(struct item_idx));
185                    free(prevItem);
186                } else {
187                    prevItem->next = nextItem->next;
188                }
189            } else {
190                prevItem->next = NULL;
191            }
192
193            if (nextItem != &(item_hashidx[key])) {
194                free(nextItem);
195            }
196            return;
197        } else {
198            prevItem = nextItem;
199            nextItem = nextItem->next;
200        }
201    }
202}
203
204struct item * lookup_item_idx(struct item * item) {
205    struct item_idx * nextItem;
206    struct item * test;
207
208    int key = calcHashKey(item);
209
210    if (item_hashidx[key].item == NULL) {
211        return NULL;
212    } else {
213        nextItem = &(item_hashidx[key]);
214        while (nextItem != NULL) {
215            test = nextItem->item;
216            if ((item->mx == test->mx) && (item->my == test->my)
217                    && (item->req.z == test->req.z) && (!strcmp(
218                    item->req.xmlname, test->req.xmlname))) {
219                return test;
220            } else {
221                nextItem = nextItem->next;
222            }
223        }
224    }
225    return NULL;
226}
227
228static inline const char *cmdStr(enum protoCmd c)
229{
230    switch (c) {
231        case cmdIgnore:  return "Ignore";
232        case cmdRender:  return "Render";
233        case cmdRenderPrio:  return "RenderPrio";
234        case cmdRenderBulk:  return "RenderBulk";
235        case cmdDirty:   return "Dirty";
236        case cmdDone:    return "Done";
237        case cmdNotDone: return "NotDone";
238        default:         return "unknown";
239    }
240}
241
242void send_response(struct item *item, enum protoCmd rsp)
243{
244    struct protocol *req = &item->req;
245    int ret;
246
247    pthread_mutex_lock(&qLock);
248    item->next->prev = item->prev;
249    item->prev->next = item->next;
250    remove_item_idx(item);
251    pthread_mutex_unlock(&qLock);
252
253    while (item) {
254        struct item *prev = item;
255        req = &item->req;
256        if ((item->fd != FD_INVALID) && ((req->cmd == cmdRender) || (req->cmd == cmdRenderPrio) || (req->cmd == cmdRenderBulk))) {
257            req->cmd = rsp;
258            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
259            ret = send(item->fd, req, sizeof(*req), 0);
260            if (ret != sizeof(*req))
261                perror("send error during send_done");
262        }
263        item = item->duplicates;
264        free(prev);
265    }
266}
267
268
269enum protoCmd pending(struct item *test)
270{
271    // check all queues and render list to see if this request already queued
272    // If so, add this new request as a duplicate
273    // call with qLock held
274    struct item *item;
275
276    item = lookup_item_idx(test);
277    if (item != NULL) {
278        if ((item->inQueue == queueRender) || (item->inQueue == queueRequest) || (item->inQueue == queueRequestPrio)) {
279            test->duplicates = item->duplicates;
280            item->duplicates = test;
281            test->inQueue = queueDuplicate;
282            return cmdIgnore;
283        } else if ((item->inQueue == queueDirty) || (item->inQueue == queueRequestBulk)){
284            return cmdNotDone;
285        }
286    }
287
288    return cmdRender;
289}
290
291
292enum protoCmd rx_request(const struct protocol *req, int fd)
293{
294    struct protocol *reqnew;
295    struct item *list = NULL, *item;
296    enum protoCmd pend;
297
298    // Upgrade version 1 to version 2
299    if (req->ver == 1) {
300        reqnew = (struct protocol *)malloc(sizeof(protocol));
301        memcpy(reqnew, req, sizeof(protocol_v1));
302        reqnew->xmlname[0] = 0;
303        req = reqnew;
304    }
305    else if (req->ver != 2) {
306        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
307        return cmdIgnore;
308    }
309
310    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
311            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
312
313    if ((req->cmd != cmdRender) && (req->cmd != cmdRenderPrio) && (req->cmd != cmdDirty) && (req->cmd != cmdRenderBulk))
314        return cmdIgnore;
315
316    if (check_xyz(req->x, req->y, req->z))
317        return cmdNotDone;
318
319    item = (struct item *)malloc(sizeof(*item));
320    if (!item) {
321            syslog(LOG_ERR, "malloc failed");
322            return cmdNotDone;
323    }
324
325    item->req = *req;
326    item->duplicates = NULL;
327    item->fd = (req->cmd == cmdDirty) ? FD_INVALID : fd;
328
329#ifdef METATILE
330    /* Round down request co-ordinates to the neareast N (should be a power of 2)
331     * Note: request path is no longer consistent but this will be recalculated
332     * when the metatile is being rendered.
333     */
334    item->mx = item->req.x & ~(METATILE-1);
335    item->my = item->req.y & ~(METATILE-1);
336#else
337    item->mx = item->req.x;
338    item->my = item->req.y;
339#endif
340
341    pthread_mutex_lock(&qLock);
342
343    // Check for a matching request in the current rendering or dirty queues
344    pend = pending(item);
345    if (pend == cmdNotDone) {
346        // We found a match in the dirty queue, can not wait for it
347        pthread_mutex_unlock(&qLock);
348        free(item);
349        return cmdNotDone;
350    }
351    if (pend == cmdIgnore) {
352        // Found a match in render queue, item added as duplicate
353        pthread_mutex_unlock(&qLock);
354        return cmdIgnore;
355    }
356
357    // New request, add it to render or dirty queue
358    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
359        list = &reqHead;
360        item->inQueue = queueRequest;
361        reqNum++;
362    } else if ((req->cmd == cmdRenderPrio) && (reqPrioNum < REQ_LIMIT)) {
363        list = &reqPrioHead;
364        item->inQueue = queueRequestPrio;
365        reqPrioNum++;
366    } else if ((req->cmd == cmdRenderBulk) && (reqBulkNum < REQ_LIMIT)) {
367        list = &reqBulkHead;
368        item->inQueue = queueRequestBulk;
369        reqBulkNum++;
370    } else if (dirtyNum < DIRTY_LIMIT) {
371        list = &dirtyHead;
372        item->inQueue = queueDirty;
373        dirtyNum++;
374        item->fd = FD_INVALID; // No response after render
375    } else {
376        // The queue is severely backlogged. Drop request
377        stats.noReqDroped++;
378        pthread_mutex_unlock(&qLock);
379        free(item);
380        return cmdNotDone;
381    }
382
383    if (list) {
384        item->next = list;
385        item->prev = list->prev;
386        item->prev->next = item;
387        list->prev = item;
388        /* In addition to the linked list, add item to a hash table index
389         * for faster lookup of pending requests.
390         */
391        insert_item_idx(item);
392
393        pthread_cond_signal(&qCond);
394    } else
395        free(item);
396
397    pthread_mutex_unlock(&qLock);
398
399    return (list == &reqHead)?cmdIgnore:cmdNotDone;
400}
401
402
403void process_loop(int listen_fd)
404{
405    int num_connections = 0;
406    int connections[MAX_CONNECTIONS];
407
408    bzero(connections, sizeof(connections));
409
410    while (1) {
411        struct sockaddr_un in_addr;
412        socklen_t in_addrlen = sizeof(in_addr);
413        fd_set rd;
414        int incoming, num, nfds, i;
415
416        FD_ZERO(&rd);
417        FD_SET(listen_fd, &rd);
418        nfds = listen_fd+1;
419
420        for (i=0; i<num_connections; i++) {
421            FD_SET(connections[i], &rd);
422            nfds = MAX(nfds, connections[i]+1);
423        }
424
425        num = select(nfds, &rd, NULL, NULL, NULL);
426        if (num == -1)
427            perror("select()");
428        else if (num) {
429            //printf("Data is available now on %d fds\n", num);
430            if (FD_ISSET(listen_fd, &rd)) {
431                num--;
432                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
433                if (incoming < 0) {
434                    perror("accept()");
435                } else {
436                    if (num_connections == MAX_CONNECTIONS) {
437                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
438                        close(incoming);
439                    } else {
440                        connections[num_connections++] = incoming;
441                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
442                    }
443                }
444            }
445            for (i=0; num && (i<num_connections); i++) {
446                int fd = connections[i];
447                if (FD_ISSET(fd, &rd)) {
448                    struct protocol cmd;
449                    int ret;
450
451                    // TODO: to get highest performance we should loop here until we get EAGAIN
452                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
453                    if (ret == sizeof(cmd)) {
454                        enum protoCmd rsp = rx_request(&cmd, fd);
455
456                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
457                            cmd.cmd = rsp;
458                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
459                            ret = send(fd, &cmd, sizeof(cmd), 0);
460                            if (ret != sizeof(cmd))
461                                perror("response send error");
462                        }
463                    } else if (!ret) {
464                        int j;
465
466                        num_connections--;
467                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
468                        for (j=i; j < num_connections; j++)
469                            connections[j] = connections[j+1];
470                        clear_requests(fd);
471                        close(fd);
472                    } else {
473                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
474                        break;
475                    }
476                }
477            }
478        } else {
479            syslog(LOG_ERR, "Select timeout");
480        }
481    }
482}
483
484/**
485 * Periodically write out current stats to a stats file. This information
486 * can then be used to monitor performance of renderd e.g. with a munin plugin
487 */
488void *stats_writeout_thread(void * arg) {
489    stats_struct lStats;
490    int dirtQueueLength;
491    int reqQueueLength;
492    int reqPrioQueueLength;
493    int reqBulkQueueLength;
494
495    int noFailedAttempts = 0;
496    char tmpName[PATH_MAX];
497
498    snprintf(tmpName, sizeof(tmpName), "%s.tmp", config.stats_filename);
499
500    syslog(LOG_DEBUG, "Starting stats thread");
501    while (1) {
502        pthread_mutex_lock(&qLock);
503        memcpy(&lStats, &stats, sizeof(stats_struct));
504        dirtQueueLength = dirtyNum;
505        reqQueueLength = reqNum;
506        reqPrioQueueLength = reqPrioNum;
507        reqBulkQueueLength = reqBulkNum;
508        pthread_mutex_unlock(&qLock);
509
510        FILE * statfile = fopen(tmpName, "w");
511        if (statfile == NULL) {
512            syslog(LOG_WARNING, "Failed to open stats file: %i", errno);
513            noFailedAttempts++;
514            if (noFailedAttempts > 3) {
515                syslog(LOG_ERR, "ERROR: Failed repeatedly to write stats, giving up");
516                break;
517            }
518            continue;
519        } else {
520            noFailedAttempts = 0;
521            fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
522            fprintf(statfile, "ReqPrioQueueLength: %i\n", reqPrioQueueLength);
523            fprintf(statfile, "ReqBulkQueueLength: %i\n", reqBulkQueueLength);
524            fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
525            fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
526            fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
527            fprintf(statfile, "ReqPrioRendered: %li\n", lStats.noReqPrioRender);
528            fprintf(statfile, "ReqBulkRendered: %li\n", lStats.noReqBulkRender);
529            fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
530            fclose(statfile);
531            if (rename(tmpName, config.stats_filename)) {
532                syslog(LOG_WARNING, "Failed to overwrite stats file: %i", errno);
533                noFailedAttempts++;
534                if (noFailedAttempts > 3) {
535                    syslog(LOG_ERR, "ERROR: Failed repeatedly to overwrite stats, giving up");
536                    break;
537                }
538                continue;
539            }
540        }
541        sleep(10);
542    }
543    return NULL;
544}
545
546int client_socket_init(renderd_config * sConfig) {
547    int fd;
548    struct sockaddr_un * addrU;
549    struct sockaddr_in * addrI;
550    struct hostent *server;
551    if (sConfig->ipport > 0) {
552        syslog(LOG_INFO, "Initialising TCP/IP client socket to %s:%i",
553                sConfig->iphostname, sConfig->ipport);
554        addrI = (struct sockaddr_in *)malloc(sizeof(struct sockaddr_in));
555        fd = socket(PF_INET, SOCK_STREAM, 0);
556        server = gethostbyname(sConfig->iphostname);
557        if (server == NULL) {
558            syslog(LOG_WARNING, "Could not resolve hostname: %s",
559                    sConfig->iphostname);
560            return FD_INVALID;
561        }
562        bzero((char *) addrI, sizeof(struct sockaddr_in));
563        addrI->sin_family = AF_INET;
564        bcopy((char *) server->h_addr, (char *) &addrI->sin_addr.s_addr,
565                server->h_length);
566        addrI->sin_port = htons(sConfig->ipport);
567        if (connect(fd, (struct sockaddr *) addrI, sizeof(struct sockaddr_in)) < 0) {
568            syslog(LOG_WARNING, "Could not connect to %s:%i",
569                    sConfig->iphostname, sConfig->ipport);
570            return FD_INVALID;
571        }
572        free(addrI);
573        syslog(LOG_INFO, "socket %s:%i initialised to fd %i", sConfig->iphostname, sConfig->ipport,
574                fd);
575    } else {
576        syslog(LOG_INFO, "Initialising unix client socket on %s",
577                sConfig->socketname);
578        addrU = (struct sockaddr_un *)malloc(sizeof(struct sockaddr_un));
579        fd = socket(PF_UNIX, SOCK_STREAM, 0);
580        if (fd < 0) {
581            syslog(LOG_WARNING, "Could not obtain socket: %i", fd);
582            return FD_INVALID;
583        }
584
585        bzero(addrU, sizeof(struct sockaddr_un));
586        addrU->sun_family = AF_UNIX;
587        strncpy(addrU->sun_path, sConfig->socketname, sizeof(addrU->sun_path));
588
589        if (connect(fd, (struct sockaddr *) addrU, sizeof(struct sockaddr_un)) < 0) {
590            syslog(LOG_WARNING, "socket connect failed for: %s",
591                    sConfig->socketname);
592            close(fd);
593            return FD_INVALID;
594        }
595        free(addrU);
596        syslog(LOG_INFO, "socket %s initialised to fd %i", sConfig->socketname,
597                fd);
598    }
599    return fd;
600}
601
602int server_socket_init(renderd_config *sConfig) {
603    struct sockaddr_un addrU;
604    struct sockaddr_in addrI;
605    mode_t old;
606    int fd;
607
608    if (sConfig->ipport > 0) {
609        syslog(LOG_INFO, "Initialising TCP/IP server socket on %s:%i",
610                sConfig->iphostname, sConfig->ipport);
611        fd = socket(PF_INET, SOCK_STREAM, 0);
612        if (fd < 0) {
613            fprintf(stderr, "failed to create IP socket\n");
614            exit(2);
615        }
616        bzero(&addrI, sizeof(addrI));
617        addrI.sin_family = AF_INET;
618        addrI.sin_addr.s_addr = INADDR_ANY;
619        addrI.sin_port = htons(sConfig->ipport);
620        if (bind(fd, (struct sockaddr *) &addrI, sizeof(addrI)) < 0) {
621            fprintf(stderr, "socket bind failed for: %s:%i\n",
622                    sConfig->iphostname, sConfig->ipport);
623            close(fd);
624            exit(3);
625        }
626    } else {
627        syslog(LOG_INFO, "Initialising unix server socket on %s",
628                sConfig->socketname);
629
630        fd = socket(PF_UNIX, SOCK_STREAM, 0);
631        if (fd < 0) {
632            fprintf(stderr, "failed to create unix socket\n");
633            exit(2);
634        }
635
636        bzero(&addrU, sizeof(addrU));
637        addrU.sun_family = AF_UNIX;
638        strncpy(addrU.sun_path, sConfig->socketname, sizeof(addrU.sun_path));
639
640        unlink(addrU.sun_path);
641
642        old = umask(0); // Need daemon socket to be writeable by apache
643        if (bind(fd, (struct sockaddr *) &addrU, sizeof(addrU)) < 0) {
644            fprintf(stderr, "socket bind failed for: %s\n", sConfig->socketname);
645            close(fd);
646            exit(3);
647        }
648        umask(old);
649    }
650
651    if (listen(fd, QUEUE_MAX) < 0) {
652        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
653        close(fd);
654        exit(4);
655    }
656
657    syslog(LOG_DEBUG, "Created server socket %i", fd);
658
659    return fd;
660
661}
662
663/**
664 * This function is used as a the start function for the slave renderer thread.
665 * It pulls a request from the central queue of requests and dispatches it to
666 * the slave renderer. It then blocks and waits for the response with no timeout.
667 * As it only sends one request at a time (there are as many slave_thread threads as there
668 * are rendering threads on the slaves) nothing gets queued on the slave and should get
669 * rendererd immediately. Thus overall, requests should be nicely load balanced between
670 * all the rendering threads available both locally and in the slaves.
671 */
672void *slave_thread(void * arg) {
673    renderd_config * sConfig = (renderd_config *) arg;
674
675    int pfd = FD_INVALID;
676    int retry;
677    size_t ret_size;
678
679    struct protocol * resp;
680    struct protocol * req_slave;
681
682    req_slave = (struct protocol *)malloc(sizeof(protocol));
683    resp = (struct protocol *)malloc(sizeof(protocol));
684    bzero(req_slave, sizeof(struct protocol));
685    bzero(resp, sizeof(struct protocol));
686
687    while (1) {
688        if (pfd == FD_INVALID) {
689            pfd = client_socket_init(sConfig);
690            if (pfd == FD_INVALID) {
691                if (sConfig->ipport > 0) {
692                    syslog(LOG_ERR,
693                        "Failed to connect to render slave %s:%i, trying again in 30 seconds",
694                        sConfig->iphostname, sConfig->ipport);
695                } else {
696                    syslog( LOG_ERR,
697                            "Failed to connect to render slave %s, trying again in 30 seconds",
698                            sConfig->socketname);
699                }
700                sleep(30);
701                continue;
702            }
703        }
704
705        enum protoCmd ret;
706        struct item *item = fetch_request();
707        if (item) {
708            struct protocol *req = &item->req;
709            req_slave->ver = PROTO_VER;
710            req_slave->cmd = cmdRender;
711            strcpy(req_slave->xmlname, req->xmlname);
712            req_slave->x = req->x;
713            req_slave->y = req->y;
714            req_slave->z = req->z;
715
716            /*Dispatch request to slave renderd*/
717            retry = 2;
718            syslog(LOG_INFO,
719                    "Dispatching request to slave thread on fd %i", pfd);
720            do {
721                ret_size = send(pfd, req_slave, sizeof(struct protocol), 0);
722
723                if (ret_size == sizeof(struct protocol)) {
724                    //correctly sent command to slave
725                    break;
726                }
727
728                if (errno != EPIPE) {
729                    syslog(LOG_ERR,
730                            "Failed to send cmd to render slave, shutting down thread");
731                    return NULL;
732                }
733
734                syslog(LOG_WARNING, "Failed to send cmd to render slave, retrying");
735                close(pfd);
736                pfd = client_socket_init(sConfig);
737                if (pfd == FD_INVALID) {
738                    syslog(LOG_ERR,
739                            "Failed to re-connect to render slave, dropping request");
740                    ret = cmdNotDone;
741                    send_response(item, ret);
742                    break;
743                }
744            } while (retry--);
745            if (pfd == FD_INVALID || ret_size != sizeof(struct protocol)) {
746                continue;
747            }
748
749            ret_size = 0;
750            retry = 10;
751            while ((ret_size < sizeof(struct protocol)) && (retry > 0)) {
752                ret_size = recv(pfd, resp + ret_size, (sizeof(struct protocol)
753                        - ret_size), 0);
754                if ((errno == EPIPE) || ret_size == 0) {
755                    close(pfd);
756                    pfd = FD_INVALID;
757                    ret_size = 0;
758                    syslog(LOG_ERR, "Pipe to render slave closed");
759                    break;
760                }
761                retry--;
762            }
763            if (ret_size < sizeof(struct protocol)) {
764                if (sConfig->ipport > 0) {
765                    syslog( LOG_ERR,
766                            "Invalid reply from render slave %s:%i, trying again in 30 seconds",
767                            sConfig->iphostname, sConfig->ipport);
768                } else {
769                    syslog( LOG_ERR,
770                            "Invalid reply render slave %s, trying again in 30 seconds",
771                            sConfig->socketname);
772                }
773
774                ret = cmdNotDone;
775                send_response(item, ret);
776                sleep(30);
777            } else {
778                ret = resp->cmd;
779                send_response(item, ret);
780                if (resp->cmd != cmdDone) {
781                    if (sConfig->ipport > 0) {
782                        syslog( LOG_ERR,
783                                "Request from render slave %s:%i did not complete correctly",
784                                sConfig->iphostname, sConfig->ipport);
785                    } else {
786                        syslog( LOG_ERR,
787                                "Request from render slave %s did not complete correctly",
788                                sConfig->socketname);
789                    }
790                    //Sleep for a while to make sure we don't overload the renderer
791                    //This only happens if it didn't correctly block on the rendering
792                    //request
793                    sleep(30);
794                }
795            }
796
797        } else {
798            sleep(1); // TODO: Use an event to indicate there are new requests
799        }
800    }
801    return NULL;
802}
803
804
805int main(int argc, char **argv)
806{
807    int fd, i, j, k;
808
809    int c;
810    int foreground=0;
811    int active_slave=0;
812    int log_options;
813    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
814
815    while (1) {
816        int option_index = 0;
817        static struct option long_options[] = {
818            {"config", 1, 0, 'c'},
819            {"foreground", 1, 0, 'f'},
820            {"slave", 1, 0, 's'},
821            {"help", 0, 0, 'h'},
822            {0, 0, 0, 0}
823        };
824
825        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
826        if (c == -1)
827            break;
828
829        switch (c) {
830            case 'f':
831                foreground=1;
832                break;
833            case 'c':
834                strncpy(config_file_name, optarg, PATH_MAX-1);
835                config_file_name[PATH_MAX-1] = 0;
836                break;
837            case 's':
838                if (sscanf(optarg, "%i", &active_slave) != 1) {
839                    fprintf(stderr, "--slave needs to be nummeric (%s)\n", optarg);
840                    active_slave = 0;
841                }
842                break;
843            case 'h':
844                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
845                fprintf(stderr, "Mapnik rendering daemon\n");
846                fprintf(stderr, "  -f, --foreground     run in foreground\n");
847                fprintf(stderr, "  -h, --help           display this help and exit\n");
848                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
849                fprintf(stderr, "  -s, --slave=CONFIG_NR set which render slave this is (default 0)\n");
850                exit(0);
851            default:
852                fprintf(stderr, "unknown config option '%c'\n", c);
853                exit(1);
854        }
855    }
856
857    log_options = LOG_PID;
858#ifdef LOG_PERROR
859    if (foreground)
860        log_options |= LOG_PERROR;
861#endif
862    openlog("renderd", log_options, LOG_DAEMON);
863
864    syslog(LOG_INFO, "Rendering daemon started");
865
866    pthread_mutex_init(&qLock, NULL);
867    pthread_cond_init(&qCond, NULL);
868    reqHead.next = reqHead.prev = &reqHead;
869    reqPrioHead.next = reqPrioHead.prev = &reqPrioHead;
870    reqBulkHead.next = reqBulkHead.prev = &reqBulkHead;
871    dirtyHead.next = dirtyHead.prev = &dirtyHead;
872    renderHead.next = renderHead.prev = &renderHead;
873    hashidxSize = HASHIDX_SIZE;
874    item_hashidx = (struct item_idx *) malloc(sizeof(struct item_idx) * hashidxSize);
875    bzero(item_hashidx, sizeof(struct item_idx) * hashidxSize);
876
877    stats.noDirtyRender = 0;
878    stats.noReqDroped = 0;
879    stats.noReqRender = 0;
880    stats.noReqPrioRender = 0;
881    stats.noReqBulkRender = 0;
882
883    xmlconfigitem maps[XMLCONFIGS_MAX];
884    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
885
886    renderd_config config_slaves[MAX_SLAVES];
887    bzero(config_slaves, sizeof(renderd_config) * MAX_SLAVES);
888    bzero(&config, sizeof(renderd_config));
889
890    dictionary *ini = iniparser_load(config_file_name);
891    if (! ini) {
892        exit(1);
893    }
894
895    noSlaveRenders = 0;
896
897    int iconf = -1;
898    char buffer[PATH_MAX];
899    for (int section=0; section < iniparser_getnsec(ini); section++) {
900        char *name = iniparser_getsecname(ini, section);
901        syslog(LOG_INFO, "Parsing section %s\n", name);
902        if (strncmp(name, "renderd", 7) && strcmp(name, "mapnik")) {
903            if (config.tile_dir == NULL) {
904                fprintf(stderr, "No valid (active) renderd config section available\n");
905                exit(7);
906            }
907            /* this is a map section */
908            iconf++;
909            if (strlen(name) >= XMLCONFIG_MAX) {
910                fprintf(stderr, "XML name too long: %s\n", name);
911                exit(7);
912            }
913
914            strcpy(maps[iconf].xmlname, name);
915            if (iconf >= XMLCONFIGS_MAX) {
916                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
917                exit(7);
918            }
919            sprintf(buffer, "%s:uri", name);
920            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
921            if (strlen(ini_uri) >= PATH_MAX) {
922                fprintf(stderr, "URI too long: %s\n", ini_uri);
923                exit(7);
924            }
925            strcpy(maps[iconf].xmluri, ini_uri);
926            sprintf(buffer, "%s:xml", name);
927            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
928            if (strlen(ini_xmlpath) >= PATH_MAX){
929                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
930                exit(7);
931            }
932            sprintf(buffer, "%s:host", name);
933            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
934            if (strlen(ini_hostname) >= PATH_MAX) {
935                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
936                exit(7);
937            }
938
939            sprintf(buffer, "%s:htcphost", name);
940            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
941            if (strlen(ini_htcpip) >= PATH_MAX) {
942                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
943                exit(7);
944            }
945            strcpy(maps[iconf].xmlfile, ini_xmlpath);
946            strcpy(maps[iconf].tile_dir, config.tile_dir);
947            strcpy(maps[iconf].host, ini_hostname);
948            strcpy(maps[iconf].htcpip, ini_htcpip);
949        } else if (strncmp(name, "renderd", 7) == 0) {
950            int render_sec = 0;
951            if (sscanf(name, "renderd%i", &render_sec) != 1) {
952                render_sec = 0;
953            }
954            syslog(LOG_INFO, "Parsing render section %i\n", render_sec);
955            if (render_sec >= MAX_SLAVES) {
956                syslog(LOG_ERR, "Can't handle more than %i render sections\n",
957                        MAX_SLAVES);
958                exit(7);
959            }
960            sprintf(buffer, "%s:socketname", name);
961            config_slaves[render_sec].socketname = iniparser_getstring(ini,
962                    buffer, (char *) RENDER_SOCKET);
963            sprintf(buffer, "%s:iphostname", name);
964            config_slaves[render_sec].iphostname = iniparser_getstring(ini,
965                    buffer, "");
966            sprintf(buffer, "%s:ipport", name);
967            config_slaves[render_sec].ipport = iniparser_getint(ini, buffer, 0);
968            sprintf(buffer, "%s:num_threads", name);
969            config_slaves[render_sec].num_threads = iniparser_getint(ini,
970                    buffer, NUM_THREADS);
971            sprintf(buffer, "%s:tile_dir", name);
972            config_slaves[render_sec].tile_dir = iniparser_getstring(ini,
973                    buffer, (char *) HASH_PATH);
974            sprintf(buffer, "%s:stats_file", name);
975            config_slaves[render_sec].stats_filename = iniparser_getstring(ini,
976                    buffer, "NULL");
977
978            if (render_sec == active_slave) {
979                config.socketname = config_slaves[render_sec].socketname;
980                config.iphostname = config_slaves[render_sec].iphostname;
981                config.ipport = config_slaves[render_sec].ipport;
982                config.num_threads = config_slaves[render_sec].num_threads;
983                config.tile_dir = config_slaves[render_sec].tile_dir;
984                config.stats_filename
985                        = config_slaves[render_sec].stats_filename;
986                config.mapnik_plugins_dir = iniparser_getstring(ini,
987                        "mapnik:plugins_dir", (char *) MAPNIK_PLUGINS);
988                config.mapnik_font_dir = iniparser_getstring(ini,
989                        "mapnik:font_dir", (char *) FONT_DIR);
990                config.mapnik_font_dir_recurse = iniparser_getboolean(ini,
991                        "mapnik:font_dir_recurse", FONT_RECURSE);
992            } else {
993                noSlaveRenders += config_slaves[render_sec].num_threads;
994            }
995        }
996    }
997
998    if (config.ipport > 0) {
999        syslog(LOG_INFO, "config renderd: ip socket=%s:%i\n", config.iphostname, config.ipport);
1000    } else {
1001        syslog(LOG_INFO, "config renderd: unix socketname=%s\n", config.socketname);
1002    }
1003    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
1004    if (active_slave == 0) {
1005        syslog(LOG_INFO, "config renderd: num_slaves=%d\n", noSlaveRenders);
1006    }
1007    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
1008    syslog(LOG_INFO, "config renderd: stats_file=%s\n", config.stats_filename);
1009    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
1010    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
1011    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
1012    for (int i = 0; i < MAX_SLAVES; i++) {
1013        if (config_slaves[i].num_threads == 0) {
1014            continue;
1015        }
1016        if (i == active_slave) {
1017            syslog(LOG_INFO, "config renderd(%i): Active\n", i);
1018        }
1019        if (config_slaves[i].ipport > 0) {
1020                syslog(LOG_INFO, "config renderd(%i): ip socket=%s:%i\n", i,
1021                        config_slaves[i].iphostname, config_slaves[i].ipport);
1022            } else {
1023                syslog(LOG_INFO, "config renderd(%i): unix socketname=%s\n", i,
1024                        config_slaves[i].socketname);
1025            }
1026        syslog(LOG_INFO, "config renderd(%i): num_threads=%d\n", i,
1027                config_slaves[i].num_threads);
1028        syslog(LOG_INFO, "config renderd(%i): tile_dir=%s\n", i,
1029                config_slaves[i].tile_dir);
1030        syslog(LOG_INFO, "config renderd(%i): stats_file=%s\n", i,
1031                config_slaves[i].stats_filename);
1032    }
1033
1034    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
1035        if (maps[iconf].xmlname[0] != 0) {
1036         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
1037                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
1038                 maps[iconf].htcpip, maps[iconf].host);
1039        }
1040    }
1041
1042    fd = server_socket_init(&config);
1043#if 0
1044    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
1045        fprintf(stderr, "setting socket non-block failed\n");
1046        close(fd);
1047        exit(5);
1048    }
1049#endif
1050
1051    //sigPipeAction.sa_handler = pipe_handler;
1052    sigPipeAction.sa_handler = SIG_IGN;
1053    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
1054        fprintf(stderr, "failed to register signal handler\n");
1055        close(fd);
1056        exit(6);
1057    }
1058
1059    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
1060
1061    /* unless the command line said to run in foreground mode, fork and detach from terminal */
1062    if (foreground) {
1063        fprintf(stderr, "Running in foreground mode...\n");
1064    } else {
1065        if (daemon(0, 0) != 0) {
1066            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
1067        }
1068        /* write pid file */
1069        FILE *pidfile = fopen(PIDFILE, "w");
1070        if (pidfile) {
1071            (void) fprintf(pidfile, "%d\n", getpid());
1072            (void) fclose(pidfile);
1073        }
1074    }
1075
1076    if (config.stats_filename != NULL) {
1077        if (pthread_create(&stats_thread, NULL, stats_writeout_thread, NULL)) {
1078            syslog(LOG_WARNING, "Could not create stats writeout thread");
1079        }
1080    } else {
1081        syslog(LOG_INFO, "No stats file specified in config. Stats reporting disabled");
1082    }
1083
1084    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
1085
1086    for(i=0; i<config.num_threads; i++) {
1087        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
1088            fprintf(stderr, "error spawning render thread\n");
1089            close(fd);
1090            exit(7);
1091        }
1092    }
1093
1094    if (active_slave == 0) {
1095        //Only the master renderd opens connections to its slaves
1096        k = 0;
1097        slave_threads
1098                = (pthread_t *) malloc(sizeof(pthread_t) * noSlaveRenders);
1099        for (i = 1; i < MAX_SLAVES; i++) {
1100            for (j = 0; j < config_slaves[i].num_threads; j++) {
1101                if (pthread_create(&slave_threads[k++], NULL, slave_thread,
1102                        (void *) &config_slaves[i])) {
1103                    fprintf(stderr, "error spawning render thread\n");
1104                    close(fd);
1105                    exit(7);
1106                }
1107            }
1108        }
1109    }
1110
1111    process_loop(fd);
1112
1113    unlink(config.socketname);
1114    close(fd);
1115    return 0;
1116}
Note: See TracBrowser for help on using the repository browser.