source: subversion/applications/utils/mod_tile/daemon.c @ 17439

Last change on this file since 17439 was 17439, checked in by apmon, 10 years ago

[ renderd ] Bug fix 32 bit / 64 bit issue

When calculating the the intermediate hashkey I incorrectly assumed long was 64 bit. This meant that the hashkey could turn negative
which in turn would cause a segfault on array access.

Thanks to Richard Ive for spotting and debugging this error.

File size: 36.3 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <stdint.h>
4#include <unistd.h>
5#include <sys/types.h>
6#include <sys/socket.h>
7#include <netinet/in.h>
8#include <netdb.h>
9#include <sys/select.h>
10#include <sys/stat.h>
11#include <sys/un.h>
12#include <poll.h>
13#include <errno.h>
14#include <pthread.h>
15#include <signal.h>
16#include <string.h>
17#include <syslog.h>
18#include <getopt.h>
19
20#include "daemon.h"
21#include "gen_tile.h"
22#include "protocol.h"
23#include "render_config.h"
24#include "dir_utils.h"
25
26#define PIDFILE "/var/run/renderd/renderd.pid"
27
28extern "C" {
29#include "iniparser3.0b/src/iniparser.h"
30}
31
32static pthread_t *render_threads;
33static pthread_t *slave_threads;
34static struct sigaction sigPipeAction;
35
36static struct item reqHead, dirtyHead, renderHead;
37static struct item_idx * item_hashidx;
38static int reqNum, dirtyNum;
39static pthread_mutex_t qLock;
40static pthread_cond_t qCond;
41
42static stats_struct stats;
43static pthread_t stats_thread;
44
45static renderd_config config;
46
47int noSlaveRenders;
48int hashidxSize;
49
50struct item *fetch_request(void)
51{
52    struct item *item = NULL;
53
54    pthread_mutex_lock(&qLock);
55
56    while (reqNum == 0 && dirtyNum == 0) {
57        pthread_cond_wait(&qCond, &qLock);
58    }
59    if (reqNum) {
60        item = reqHead.next;
61        reqNum--;
62        stats.noReqRender++;
63    } else if (dirtyNum) {
64        item = dirtyHead.next;
65        dirtyNum--;
66        stats.noDirtyRender++;
67    }
68    if (item) {
69        item->next->prev = item->prev;
70        item->prev->next = item->next;
71
72        item->prev = &renderHead;
73        item->next = renderHead.next;
74        renderHead.next->prev = item;
75        renderHead.next = item;
76        item->inQueue = queueRender;
77    }
78
79    pthread_mutex_unlock(&qLock);
80
81    return item;
82}
83
84void clear_requests(int fd)
85{
86    struct item *item, *dupes;
87
88    /**Only need to look up on the shorter request and render queue
89      * so using the linear list shouldn't be a problem
90      */
91    pthread_mutex_lock(&qLock);
92    item = reqHead.next;
93    while (item != &reqHead) {
94        if (item->fd == fd)
95            item->fd = FD_INVALID;
96
97        dupes = item->duplicates;
98        while (dupes) {
99            if (dupes->fd == fd)
100                dupes->fd = FD_INVALID;
101            dupes = dupes->duplicates;
102        }
103        item = item->next;
104    }
105
106    item = renderHead.next;
107    while (item != &renderHead) {
108        if (item->fd == fd)
109            item->fd = FD_INVALID;
110
111        dupes = item->duplicates;
112        while (dupes) {
113            if (dupes->fd == fd)
114                dupes->fd = FD_INVALID;
115            dupes = dupes->duplicates;
116        }
117        item = item->next;
118    }
119
120    pthread_mutex_unlock(&qLock);
121}
122
123
124static int calcHashKey(struct item *item) {
125    uint64_t xmlnameHash = 0;
126    uint64_t key;
127    for (int i = 0; item->req.xmlname[i] != 0; i++) {
128        xmlnameHash += item->req.xmlname[i];
129    }
130    key = ((uint64_t)(xmlnameHash & 0x1FF) << 52) + ((uint64_t)(item->req.z) << 48) + ((uint64_t)(item->mx & 0xFFFFFF) << 24) + (item->my & 0xFFFFFF);
131    return key % hashidxSize;
132}
133
134void insert_item_idx(struct item *item) {
135    struct item_idx * nextItem;
136    struct item_idx * prevItem;
137
138    int key = calcHashKey(item);
139
140    if (item_hashidx[key].item == NULL) {
141        item_hashidx[key].item = item;
142    } else {
143        prevItem = &(item_hashidx[key]);
144        nextItem = item_hashidx[key].next;
145        while(nextItem) {
146            prevItem = nextItem;
147            nextItem = nextItem->next;
148        }
149        nextItem = (struct item_idx *)malloc(sizeof(struct item_idx));
150        nextItem->item = item;
151        nextItem->next = NULL;
152        prevItem->next = nextItem;
153    }
154}
155
156void remove_item_idx(struct item * item) {
157    int key = calcHashKey(item);
158    struct item_idx * nextItem;
159    struct item_idx * prevItem;
160    struct item * test;
161    if (item_hashidx[key].item == NULL) {
162        //item not in index;
163        return;
164    }
165    prevItem = &(item_hashidx[key]);
166    nextItem = &(item_hashidx[key]);
167
168    while (nextItem != NULL) {
169        test = nextItem->item;
170        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z
171                == test->req.z) && (!strcmp(item->req.xmlname,
172                test->req.xmlname))) {
173            /*
174             * Found item, removing it from list
175             */
176            nextItem->item = NULL;
177            if (nextItem->next != NULL) {
178                if (nextItem == &(item_hashidx[key])) {
179                    prevItem = nextItem->next;
180                    memcpy(&(item_hashidx[key]), nextItem->next,
181                            sizeof(struct item_idx));
182                    free(prevItem);
183                } else {
184                    prevItem->next = nextItem->next;
185                }
186            } else {
187                prevItem->next = NULL;
188            }
189
190            if (nextItem != &(item_hashidx[key])) {
191                free(nextItem);
192            }
193            return;
194        } else {
195            prevItem = nextItem;
196            nextItem = nextItem->next;
197        }
198    }
199}
200
201struct item * lookup_item_idx(struct item * item) {
202    struct item_idx * nextItem;
203    struct item * test;
204
205    int key = calcHashKey(item);
206
207    if (item_hashidx[key].item == NULL) {
208        return NULL;
209    } else {
210        nextItem = &(item_hashidx[key]);
211        while (nextItem != NULL) {
212            test = nextItem->item;
213            if ((item->mx == test->mx) && (item->my == test->my)
214                    && (item->req.z == test->req.z) && (!strcmp(
215                    item->req.xmlname, test->req.xmlname))) {
216                return test;
217            } else {
218                nextItem = nextItem->next;
219            }
220        }
221    }
222    return NULL;
223}
224
225static inline const char *cmdStr(enum protoCmd c)
226{
227    switch (c) {
228        case cmdIgnore:  return "Ignore";
229        case cmdRender:  return "Render";
230        case cmdDirty:   return "Dirty";
231        case cmdDone:    return "Done";
232        case cmdNotDone: return "NotDone";
233        default:         return "unknown";
234    }
235}
236
237void send_response(struct item *item, enum protoCmd rsp)
238{
239    struct protocol *req = &item->req;
240    int ret;
241
242    pthread_mutex_lock(&qLock);
243    item->next->prev = item->prev;
244    item->prev->next = item->next;
245    remove_item_idx(item);
246    pthread_mutex_unlock(&qLock);
247
248    while (item) {
249        struct item *prev = item;
250        req = &item->req;
251        if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) {
252            req->cmd = rsp;
253            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
254            ret = send(item->fd, req, sizeof(*req), 0);
255            if (ret != sizeof(*req))
256                perror("send error during send_done");
257        }
258        item = item->duplicates;
259        free(prev);
260    }
261}
262
263
264enum protoCmd pending(struct item *test)
265{
266    // check all queues and render list to see if this request already queued
267    // If so, add this new request as a duplicate
268    // call with qLock held
269    struct item *item;
270
271    item = lookup_item_idx(test);
272    if (item != NULL) {
273        if ((item->inQueue == queueRender) || (item->inQueue == queueRequest)) {
274            test->duplicates = item->duplicates;
275            item->duplicates = test;
276            test->inQueue = queueDuplicate;
277            return cmdIgnore;
278        } else if (item->inQueue == queueDirty) {
279            return cmdNotDone;
280        }
281    }
282
283    return cmdRender;
284}
285
286
287enum protoCmd rx_request(const struct protocol *req, int fd)
288{
289    struct protocol *reqnew;
290    struct item *list = NULL, *item;
291    enum protoCmd pend;
292
293    // Upgrade version 1 to version 2
294    if (req->ver == 1) {
295        reqnew = (struct protocol *)malloc(sizeof(protocol));
296        memcpy(reqnew, req, sizeof(protocol_v1));
297        reqnew->xmlname[0] = 0;
298        req = reqnew;
299    }
300    else if (req->ver != 2) {
301        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
302        return cmdIgnore;
303    }
304
305    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
306            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
307
308    if ((req->cmd != cmdRender) && (req->cmd != cmdDirty))
309        return cmdIgnore;
310
311    if (check_xyz(req->x, req->y, req->z))
312        return cmdNotDone;
313
314    item = (struct item *)malloc(sizeof(*item));
315    if (!item) {
316            syslog(LOG_ERR, "malloc failed");
317            return cmdNotDone;
318    }
319
320    item->req = *req;
321    item->duplicates = NULL;
322    item->fd = (req->cmd == cmdRender) ? fd : FD_INVALID;
323
324#ifdef METATILE
325    /* Round down request co-ordinates to the neareast N (should be a power of 2)
326     * Note: request path is no longer consistent but this will be recalculated
327     * when the metatile is being rendered.
328     */
329    item->mx = item->req.x & ~(METATILE-1);
330    item->my = item->req.y & ~(METATILE-1);
331#else
332    item->mx = item->req.x;
333    item->my = item->req.y;
334#endif
335
336    pthread_mutex_lock(&qLock);
337
338    if (dirtyNum == DIRTY_LIMIT) {
339        // The queue is severely backlogged. Drop request
340        stats.noReqDroped++;
341        pthread_mutex_unlock(&qLock);
342        free(item);
343        return cmdNotDone;
344    }
345
346    // Check for a matching request in the current rendering or dirty queues
347    pend = pending(item);
348    if (pend == cmdNotDone) {
349        // We found a match in the dirty queue, can not wait for it
350        pthread_mutex_unlock(&qLock);
351        free(item);
352        return cmdNotDone;
353    }
354    if (pend == cmdIgnore) {
355        // Found a match in render queue, item added as duplicate
356        pthread_mutex_unlock(&qLock);
357        return cmdIgnore;
358    }
359
360    // New request, add it to render or dirty queue
361    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
362        list = &reqHead;
363        item->inQueue = queueRequest;
364        reqNum++;
365    } else if (dirtyNum < DIRTY_LIMIT) {
366        list = &dirtyHead;
367        item->inQueue = queueDirty;
368        dirtyNum++;
369        item->fd = FD_INVALID; // No response after render
370    }
371
372    if (list) {
373        item->next = list;
374        item->prev = list->prev;
375        item->prev->next = item;
376        list->prev = item;
377        /* In addition to the linked list, add item to a hash table index
378         * for faster lookup of pending requests.
379         */
380        insert_item_idx(item);
381
382        pthread_cond_signal(&qCond);
383    } else
384        free(item);
385
386    pthread_mutex_unlock(&qLock);
387
388    return (list == &reqHead)?cmdIgnore:cmdNotDone;
389}
390
391
392void process_loop(int listen_fd)
393{
394    int num_connections = 0;
395    int connections[MAX_CONNECTIONS];
396
397    bzero(connections, sizeof(connections));
398
399    while (1) {
400        struct sockaddr_un in_addr;
401        socklen_t in_addrlen = sizeof(in_addr);
402        fd_set rd;
403        int incoming, num, nfds, i;
404
405        FD_ZERO(&rd);
406        FD_SET(listen_fd, &rd);
407        nfds = listen_fd+1;
408
409        for (i=0; i<num_connections; i++) {
410            FD_SET(connections[i], &rd);
411            nfds = MAX(nfds, connections[i]+1);
412        }
413
414        num = select(nfds, &rd, NULL, NULL, NULL);
415        if (num == -1)
416            perror("select()");
417        else if (num) {
418            //printf("Data is available now on %d fds\n", num);
419            if (FD_ISSET(listen_fd, &rd)) {
420                num--;
421                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
422                if (incoming < 0) {
423                    perror("accept()");
424                } else {
425                    if (num_connections == MAX_CONNECTIONS) {
426                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
427                        close(incoming);
428                    } else {
429                        connections[num_connections++] = incoming;
430                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
431                    }
432                }
433            }
434            for (i=0; num && (i<num_connections); i++) {
435                int fd = connections[i];
436                if (FD_ISSET(fd, &rd)) {
437                    struct protocol cmd;
438                    int ret;
439
440                    // TODO: to get highest performance we should loop here until we get EAGAIN
441                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
442                    if (ret == sizeof(cmd)) {
443                        enum protoCmd rsp = rx_request(&cmd, fd);
444
445                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
446                            cmd.cmd = rsp;
447                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
448                            ret = send(fd, &cmd, sizeof(cmd), 0);
449                            if (ret != sizeof(cmd))
450                                perror("response send error");
451                        }
452                    } else if (!ret) {
453                        int j;
454
455                        num_connections--;
456                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
457                        for (j=i; j < num_connections; j++)
458                            connections[j] = connections[j+1];
459                        clear_requests(fd);
460                        close(fd);
461                    } else {
462                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
463                        break;
464                    }
465                }
466            }
467        } else {
468            syslog(LOG_ERR, "Select timeout");
469        }
470    }
471}
472
473/**
474 * Periodically write out current stats to a stats file. This information
475 * can then be used to monitor performance of renderd e.g. with a munin plugin
476 */
477void *stats_writeout_thread(void * arg) {
478    stats_struct lStats;
479    int dirtQueueLength;
480    int reqQueueLength;
481    int noFailedAttempts = 0;
482    char tmpName[PATH_MAX];
483
484    snprintf(tmpName, sizeof(tmpName), "%s.tmp", config.stats_filename);
485
486    syslog(LOG_DEBUG, "Starting stats thread");
487    while (1) {
488        pthread_mutex_lock(&qLock);
489        memcpy(&lStats, &stats, sizeof(stats_struct));
490        dirtQueueLength = dirtyNum;
491        reqQueueLength = reqNum;
492        pthread_mutex_unlock(&qLock);
493
494        FILE * statfile = fopen(tmpName, "w");
495        if (statfile == NULL) {
496            syslog(LOG_WARNING, "Failed to open stats file: %i", errno);
497            noFailedAttempts++;
498            if (noFailedAttempts > 3) {
499                syslog(LOG_ERR, "ERROR: Failed repeatedly to write stats, giving up");
500                break;
501            }
502            continue;
503        } else {
504            noFailedAttempts = 0;
505            fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
506            fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
507            fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
508            fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
509            fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
510            fclose(statfile);
511            if (rename(tmpName, config.stats_filename)) {
512                syslog(LOG_WARNING, "Failed to overwrite stats file: %i", errno);
513                noFailedAttempts++;
514                if (noFailedAttempts > 3) {
515                    syslog(LOG_ERR, "ERROR: Failed repeatedly to overwrite stats, giving up");
516                    break;
517                }
518                continue;
519            }
520        }
521        sleep(10);
522    }
523    return NULL;
524}
525
526int client_socket_init(renderd_config * sConfig) {
527    int fd;
528    struct sockaddr_un * addrU;
529    struct sockaddr_in * addrI;
530    struct hostent *server;
531    if (sConfig->ipport > 0) {
532        syslog(LOG_INFO, "Initialising TCP/IP client socket to %s:%i",
533                sConfig->iphostname, sConfig->ipport);
534        addrI = (struct sockaddr_in *)malloc(sizeof(struct sockaddr_in));
535        fd = socket(PF_INET, SOCK_STREAM, 0);
536        server = gethostbyname(sConfig->iphostname);
537        if (server == NULL) {
538            syslog(LOG_WARNING, "Could not resolve hostname: %s",
539                    sConfig->iphostname);
540            return FD_INVALID;
541        }
542        bzero((char *) addrI, sizeof(struct sockaddr_in));
543        addrI->sin_family = AF_INET;
544        bcopy((char *) server->h_addr, (char *) &addrI->sin_addr.s_addr,
545                server->h_length);
546        addrI->sin_port = htons(sConfig->ipport);
547        if (connect(fd, (struct sockaddr *) addrI, sizeof(struct sockaddr_in)) < 0) {
548            syslog(LOG_WARNING, "Could not connect to %s:%i",
549                    sConfig->iphostname, sConfig->ipport);
550            return FD_INVALID;
551        }
552        free(addrI);
553        syslog(LOG_INFO, "socket %s:%i initialised to fd %i", sConfig->iphostname, sConfig->ipport,
554                fd);
555    } else {
556        syslog(LOG_INFO, "Initialising unix client socket on %s",
557                sConfig->socketname);
558        addrU = (struct sockaddr_un *)malloc(sizeof(struct sockaddr_un));
559        fd = socket(PF_UNIX, SOCK_STREAM, 0);
560        if (fd < 0) {
561            syslog(LOG_WARNING, "Could not obtain socket: %i", fd);
562            return FD_INVALID;
563        }
564
565        bzero(addrU, sizeof(struct sockaddr_un));
566        addrU->sun_family = AF_UNIX;
567        strncpy(addrU->sun_path, sConfig->socketname, sizeof(addrU->sun_path));
568
569        if (connect(fd, (struct sockaddr *) addrU, sizeof(struct sockaddr_un)) < 0) {
570            syslog(LOG_WARNING, "socket connect failed for: %s",
571                    sConfig->socketname);
572            close(fd);
573            return FD_INVALID;
574        }
575        free(addrU);
576        syslog(LOG_INFO, "socket %s initialised to fd %i", sConfig->socketname,
577                fd);
578    }
579    return fd;
580}
581
582int server_socket_init(renderd_config *sConfig) {
583    struct sockaddr_un addrU;
584    struct sockaddr_in addrI;
585    mode_t old;
586    int fd;
587
588    if (sConfig->ipport > 0) {
589        syslog(LOG_INFO, "Initialising TCP/IP server socket on %s:%i",
590                sConfig->iphostname, sConfig->ipport);
591        fd = socket(PF_INET, SOCK_STREAM, 0);
592        if (fd < 0) {
593            fprintf(stderr, "failed to create IP socket\n");
594            exit(2);
595        }
596        bzero(&addrI, sizeof(addrI));
597        addrI.sin_family = AF_INET;
598        addrI.sin_addr.s_addr = INADDR_ANY;
599        addrI.sin_port = htons(sConfig->ipport);
600        if (bind(fd, (struct sockaddr *) &addrI, sizeof(addrI)) < 0) {
601            fprintf(stderr, "socket bind failed for: %s:%i\n",
602                    sConfig->iphostname, sConfig->ipport);
603            close(fd);
604            exit(3);
605        }
606    } else {
607        syslog(LOG_INFO, "Initialising unix server socket on %s",
608                sConfig->socketname);
609
610        fd = socket(PF_UNIX, SOCK_STREAM, 0);
611        if (fd < 0) {
612            fprintf(stderr, "failed to create unix socket\n");
613            exit(2);
614        }
615
616        bzero(&addrU, sizeof(addrU));
617        addrU.sun_family = AF_UNIX;
618        strncpy(addrU.sun_path, sConfig->socketname, sizeof(addrU.sun_path));
619
620        unlink(addrU.sun_path);
621
622        old = umask(0); // Need daemon socket to be writeable by apache
623        if (bind(fd, (struct sockaddr *) &addrU, sizeof(addrU)) < 0) {
624            fprintf(stderr, "socket bind failed for: %s\n", sConfig->socketname);
625            close(fd);
626            exit(3);
627        }
628        umask(old);
629    }
630
631    if (listen(fd, QUEUE_MAX) < 0) {
632        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
633        close(fd);
634        exit(4);
635    }
636
637    syslog(LOG_DEBUG, "Created server socket %i", fd);
638
639    return fd;
640
641}
642
643/**
644 * This function is used as a the start function for the slave renderer thread.
645 * It pulls a request from the central queue of requests and dispatches it to
646 * the slave renderer. It then blocks and waits for the response with no timeout.
647 * As it only sends one request at a time (there are as many slave_thread threads as there
648 * are rendering threads on the slaves) nothing gets queued on the slave and should get
649 * rendererd immediately. Thus overall, requests should be nicely load balanced between
650 * all the rendering threads available both locally and in the slaves.
651 */
652void *slave_thread(void * arg) {
653    renderd_config * sConfig = (renderd_config *) arg;
654
655    int pfd = FD_INVALID;
656    int retry;
657    size_t ret_size;
658
659    struct protocol * resp;
660    struct protocol * req_slave;
661
662    req_slave = (struct protocol *)malloc(sizeof(protocol));
663    resp = (struct protocol *)malloc(sizeof(protocol));
664    bzero(req_slave, sizeof(struct protocol));
665    bzero(resp, sizeof(struct protocol));
666
667    while (1) {
668        if (pfd == FD_INVALID) {
669            pfd = client_socket_init(sConfig);
670            if (pfd == FD_INVALID) {
671                if (sConfig->ipport > 0) {
672                    syslog(LOG_ERR,
673                        "Failed to connect to render slave %s:%i, trying again in 30 seconds",
674                        sConfig->iphostname, sConfig->ipport);
675                } else {
676                    syslog( LOG_ERR,
677                            "Failed to connect to render slave %s, trying again in 30 seconds",
678                            sConfig->socketname);
679                }
680                sleep(30);
681                continue;
682            }
683        }
684
685        enum protoCmd ret;
686        struct item *item = fetch_request();
687        if (item) {
688            struct protocol *req = &item->req;
689            req_slave->ver = PROTO_VER;
690            req_slave->cmd = cmdRender;
691            strcpy(req_slave->xmlname, req->xmlname);
692            req_slave->x = req->x;
693            req_slave->y = req->y;
694            req_slave->z = req->z;
695
696            /*Dispatch request to slave renderd*/
697            retry = 2;
698            syslog(LOG_INFO,
699                    "Dispatching request to slave thread on fd %i", pfd);
700            do {
701                ret_size = send(pfd, req_slave, sizeof(struct protocol), 0);
702
703                if (ret_size == sizeof(struct protocol)) {
704                    //correctly sent command to slave
705                    break;
706                }
707
708                if (errno != EPIPE) {
709                    syslog(LOG_ERR,
710                            "Failed to send cmd to render slave, shutting down thread");
711                    return NULL;
712                }
713
714                syslog(LOG_WARNING, "Failed to send cmd to render slave, retrying");
715                close(pfd);
716                pfd = client_socket_init(sConfig);
717                if (pfd == FD_INVALID) {
718                    syslog(LOG_ERR,
719                            "Failed to re-connect to render slave, dropping request");
720                    ret = cmdNotDone;
721                    send_response(item, ret);
722                    break;
723                }
724            } while (retry--);
725            if (pfd == FD_INVALID || ret_size != sizeof(struct protocol)) {
726                continue;
727            }
728
729            ret_size = 0;
730            retry = 10;
731            while ((ret_size < sizeof(struct protocol)) && (retry > 0)) {
732                ret_size = recv(pfd, resp + ret_size, (sizeof(struct protocol)
733                        - ret_size), 0);
734                if ((errno == EPIPE) || ret_size == 0) {
735                    close(pfd);
736                    pfd = FD_INVALID;
737                    ret_size = 0;
738                    syslog(LOG_ERR, "Pipe to render slave closed");
739                    break;
740                }
741                retry--;
742            }
743            if (ret_size < sizeof(struct protocol)) {
744                if (sConfig->ipport > 0) {
745                    syslog( LOG_ERR,
746                            "Invalid reply from render slave %s:%i, trying again in 30 seconds",
747                            sConfig->iphostname, sConfig->ipport);
748                } else {
749                    syslog( LOG_ERR,
750                            "Invalid reply render slave %s, trying again in 30 seconds",
751                            sConfig->socketname);
752                }
753
754                ret = cmdNotDone;
755                send_response(item, ret);
756                sleep(30);
757            } else {
758                ret = resp->cmd;
759                send_response(item, ret);
760                if (resp->cmd != cmdDone) {
761                    if (sConfig->ipport > 0) {
762                        syslog( LOG_ERR,
763                                "Request from render slave %s:%i did not complete correctly",
764                                sConfig->iphostname, sConfig->ipport);
765                    } else {
766                        syslog( LOG_ERR,
767                                "Request from render slave %s did not complete correctly",
768                                sConfig->socketname);
769                    }
770                    //Sleep for a while to make sure we don't overload the renderer
771                    //This only happens if it didn't correctly block on the rendering
772                    //request
773                    sleep(30);
774                }
775            }
776
777        } else {
778            sleep(1); // TODO: Use an event to indicate there are new requests
779        }
780    }
781    return NULL;
782}
783
784
785int main(int argc, char **argv)
786{
787    int fd, i, j, k;
788
789    int c;
790    int foreground=0;
791    int active_slave=0;
792    int log_options;
793    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
794
795    while (1) {
796        int option_index = 0;
797        static struct option long_options[] = {
798            {"config", 1, 0, 'c'},
799            {"foreground", 1, 0, 'f'},
800            {"slave", 1, 0, 's'},
801            {"help", 0, 0, 'h'},
802            {0, 0, 0, 0}
803        };
804
805        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
806        if (c == -1)
807            break;
808
809        switch (c) {
810            case 'f':
811                foreground=1;
812                break;
813            case 'c':
814                strncpy(config_file_name, optarg, PATH_MAX-1);
815                config_file_name[PATH_MAX-1] = 0;
816                break;
817            case 's':
818                if (sscanf(optarg, "%i", &active_slave) != 1) {
819                    fprintf(stderr, "--slave needs to be nummeric (%s)\n", optarg);
820                    active_slave = 0;
821                }
822                break;
823            case 'h':
824                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
825                fprintf(stderr, "Mapnik rendering daemon\n");
826                fprintf(stderr, "  -f, --foreground     run in foreground\n");
827                fprintf(stderr, "  -h, --help           display this help and exit\n");
828                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
829                fprintf(stderr, "  -s, --slave=CONFIG_NR set which render slave this is (default 0)\n");
830                exit(0);
831            default:
832                fprintf(stderr, "unknown config option '%c'\n", c);
833                exit(1);
834        }
835    }
836
837    log_options = LOG_PID;
838#ifdef LOG_PERROR
839    if (foreground)
840        log_options |= LOG_PERROR;
841#endif
842    openlog("renderd", log_options, LOG_DAEMON);
843
844    syslog(LOG_INFO, "Rendering daemon started");
845
846    pthread_mutex_init(&qLock, NULL);
847    pthread_cond_init(&qCond, NULL);
848    reqHead.next = reqHead.prev = &reqHead;
849    dirtyHead.next = dirtyHead.prev = &dirtyHead;
850    renderHead.next = renderHead.prev = &renderHead;
851    hashidxSize = HASHIDX_SIZE;
852    item_hashidx = (struct item_idx *) malloc(sizeof(struct item_idx) * hashidxSize);
853    bzero(item_hashidx, sizeof(struct item_idx) * hashidxSize);
854
855    stats.noDirtyRender = 0;
856    stats.noReqDroped = 0;
857    stats.noReqRender = 0;
858
859    xmlconfigitem maps[XMLCONFIGS_MAX];
860    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
861
862    renderd_config config_slaves[MAX_SLAVES];
863    bzero(config_slaves, sizeof(renderd_config) * MAX_SLAVES);
864    bzero(&config, sizeof(renderd_config));
865
866    dictionary *ini = iniparser_load(config_file_name);
867    if (! ini) {
868        exit(1);
869    }
870
871    noSlaveRenders = 0;
872
873    int iconf = -1;
874    char buffer[PATH_MAX];
875    for (int section=0; section < iniparser_getnsec(ini); section++) {
876        char *name = iniparser_getsecname(ini, section);
877        syslog(LOG_INFO, "Parsing section %s\n", name);
878        if (strncmp(name, "renderd", 7) && strcmp(name, "mapnik")) {
879            if (config.tile_dir == NULL) {
880                fprintf(stderr, "No valid (active) renderd config section available\n");
881                exit(7);
882            }
883            /* this is a map section */
884            iconf++;
885            if (strlen(name) >= XMLCONFIG_MAX) {
886                fprintf(stderr, "XML name too long: %s\n", name);
887                exit(7);
888            }
889
890            strcpy(maps[iconf].xmlname, name);
891            if (iconf >= XMLCONFIGS_MAX) {
892                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
893                exit(7);
894            }
895            sprintf(buffer, "%s:uri", name);
896            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
897            if (strlen(ini_uri) >= PATH_MAX) {
898                fprintf(stderr, "URI too long: %s\n", ini_uri);
899                exit(7);
900            }
901            strcpy(maps[iconf].xmluri, ini_uri);
902            sprintf(buffer, "%s:xml", name);
903            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
904            if (strlen(ini_xmlpath) >= PATH_MAX){
905                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
906                exit(7);
907            }
908            sprintf(buffer, "%s:host", name);
909            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
910            if (strlen(ini_hostname) >= PATH_MAX) {
911                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
912                exit(7);
913            }
914
915            sprintf(buffer, "%s:htcphost", name);
916            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
917            if (strlen(ini_htcpip) >= PATH_MAX) {
918                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
919                exit(7);
920            }
921            strcpy(maps[iconf].xmlfile, ini_xmlpath);
922            strcpy(maps[iconf].tile_dir, config.tile_dir);
923            strcpy(maps[iconf].host, ini_hostname);
924            strcpy(maps[iconf].htcpip, ini_htcpip);
925        } else if (strncmp(name, "renderd", 7) == 0) {
926            int render_sec = 0;
927            if (sscanf(name, "renderd%i", &render_sec) != 1) {
928                render_sec = 0;
929            }
930            syslog(LOG_INFO, "Parsing render section %i\n", render_sec);
931            if (render_sec >= MAX_SLAVES) {
932                syslog(LOG_ERR, "Can't handle more than %i render sections\n",
933                        MAX_SLAVES);
934                exit(7);
935            }
936            sprintf(buffer, "%s:socketname", name);
937            config_slaves[render_sec].socketname = iniparser_getstring(ini,
938                    buffer, (char *) RENDER_SOCKET);
939            sprintf(buffer, "%s:iphostname", name);
940            config_slaves[render_sec].iphostname = iniparser_getstring(ini,
941                    buffer, "");
942            sprintf(buffer, "%s:ipport", name);
943            config_slaves[render_sec].ipport = iniparser_getint(ini, buffer, 0);
944            sprintf(buffer, "%s:num_threads", name);
945            config_slaves[render_sec].num_threads = iniparser_getint(ini,
946                    buffer, NUM_THREADS);
947            sprintf(buffer, "%s:tile_dir", name);
948            config_slaves[render_sec].tile_dir = iniparser_getstring(ini,
949                    buffer, (char *) HASH_PATH);
950            sprintf(buffer, "%s:stats_file", name);
951            config_slaves[render_sec].stats_filename = iniparser_getstring(ini,
952                    buffer, "NULL");
953
954            if (render_sec == active_slave) {
955                config.socketname = config_slaves[render_sec].socketname;
956                config.iphostname = config_slaves[render_sec].iphostname;
957                config.ipport = config_slaves[render_sec].ipport;
958                config.num_threads = config_slaves[render_sec].num_threads;
959                config.tile_dir = config_slaves[render_sec].tile_dir;
960                config.stats_filename
961                        = config_slaves[render_sec].stats_filename;
962                config.mapnik_plugins_dir = iniparser_getstring(ini,
963                        "mapnik:plugins_dir", (char *) MAPNIK_PLUGINS);
964                config.mapnik_font_dir = iniparser_getstring(ini,
965                        "mapnik:font_dir", (char *) FONT_DIR);
966            } else {
967                noSlaveRenders += config_slaves[render_sec].num_threads;
968            }
969        }
970    }
971
972    if (config.ipport > 0) {
973        syslog(LOG_INFO, "config renderd: ip socket=%s:%i\n", config.iphostname, config.ipport);
974    } else {
975        syslog(LOG_INFO, "config renderd: unix socketname=%s\n", config.socketname);
976    }
977    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
978    if (active_slave == 0) {
979        syslog(LOG_INFO, "config renderd: num_slaves=%d\n", noSlaveRenders);
980    }
981    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
982    syslog(LOG_INFO, "config renderd: stats_file=%s\n", config.stats_filename);
983    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
984    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
985    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
986    for (int i = 0; i < MAX_SLAVES; i++) {
987        if (config_slaves[i].num_threads == 0) {
988            continue;
989        }
990        if (i == active_slave) {
991            syslog(LOG_INFO, "config renderd(%i): Active\n", i);
992        }
993        if (config_slaves[i].ipport > 0) {
994                syslog(LOG_INFO, "config renderd(%i): ip socket=%s:%i\n", i,
995                        config_slaves[i].iphostname, config_slaves[i].ipport);
996            } else {
997                syslog(LOG_INFO, "config renderd(%i): unix socketname=%s\n", i,
998                        config_slaves[i].socketname);
999            }
1000        syslog(LOG_INFO, "config renderd(%i): num_threads=%d\n", i,
1001                config_slaves[i].num_threads);
1002        syslog(LOG_INFO, "config renderd(%i): tile_dir=%s\n", i,
1003                config_slaves[i].tile_dir);
1004        syslog(LOG_INFO, "config renderd(%i): stats_file=%s\n", i,
1005                config_slaves[i].stats_filename);
1006    }
1007
1008    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
1009        if (maps[iconf].xmlname[0] != 0) {
1010         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
1011                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
1012                 maps[iconf].htcpip, maps[iconf].host);
1013        }
1014    }
1015
1016    fd = server_socket_init(&config);
1017#if 0
1018    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
1019        fprintf(stderr, "setting socket non-block failed\n");
1020        close(fd);
1021        exit(5);
1022    }
1023#endif
1024
1025    //sigPipeAction.sa_handler = pipe_handler;
1026    sigPipeAction.sa_handler = SIG_IGN;
1027    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
1028        fprintf(stderr, "failed to register signal handler\n");
1029        close(fd);
1030        exit(6);
1031    }
1032
1033    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
1034
1035    /* unless the command line said to run in foreground mode, fork and detach from terminal */
1036    if (foreground) {
1037        fprintf(stderr, "Running in foreground mode...\n");
1038    } else {
1039        if (daemon(0, 0) != 0) {
1040            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
1041        }
1042        /* write pid file */
1043        FILE *pidfile = fopen(PIDFILE, "w");
1044        if (pidfile) {
1045            (void) fprintf(pidfile, "%d\n", getpid());
1046            (void) fclose(pidfile);
1047        }
1048    }
1049
1050    if (config.stats_filename != NULL) {
1051        if (pthread_create(&stats_thread, NULL, stats_writeout_thread, NULL)) {
1052            syslog(LOG_WARNING, "Could not create stats writeout thread");
1053        }
1054    } else {
1055        syslog(LOG_INFO, "No stats file specified in config. Stats reporting disabled");
1056    }
1057
1058    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
1059
1060    for(i=0; i<config.num_threads; i++) {
1061        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
1062            fprintf(stderr, "error spawning render thread\n");
1063            close(fd);
1064            exit(7);
1065        }
1066    }
1067
1068    if (active_slave == 0) {
1069        //Only the master renderd opens connections to its slaves
1070        k = 0;
1071        slave_threads
1072                = (pthread_t *) malloc(sizeof(pthread_t) * noSlaveRenders);
1073        for (i = 1; i < MAX_SLAVES; i++) {
1074            for (j = 0; j < config_slaves[i].num_threads; j++) {
1075                if (pthread_create(&slave_threads[k++], NULL, slave_thread,
1076                        (void *) &config_slaves[i])) {
1077                    fprintf(stderr, "error spawning render thread\n");
1078                    close(fd);
1079                    exit(7);
1080                }
1081            }
1082        }
1083    }
1084
1085    process_loop(fd);
1086
1087    unlink(config.socketname);
1088    close(fd);
1089    return 0;
1090}
Note: See TracBrowser for help on using the repository browser.