source: subversion/applications/utils/mod_tile/daemon.c @ 21214

Last change on this file since 21214 was 21214, checked in by tomhughes, 10 years ago

Default statistics file to a null pointer, not the string "NULL".

File size: 38.2 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <stdint.h>
4#include <unistd.h>
5#include <sys/types.h>
6#include <sys/socket.h>
7#include <netinet/in.h>
8#include <netdb.h>
9#include <sys/select.h>
10#include <sys/stat.h>
11#include <sys/un.h>
12#include <poll.h>
13#include <errno.h>
14#include <pthread.h>
15#include <signal.h>
16#include <string.h>
17#include <syslog.h>
18#include <getopt.h>
19
20#include "render_config.h"
21#include "daemon.h"
22#include "gen_tile.h"
23#include "protocol.h"
24#include "dir_utils.h"
25
26#define PIDFILE "/var/run/renderd/renderd.pid"
27
28extern "C" {
29#include "iniparser3.0b/src/iniparser.h"
30}
31
32static pthread_t *render_threads;
33static pthread_t *slave_threads;
34static struct sigaction sigPipeAction;
35
36static struct item reqHead, reqPrioHead, reqBulkHead, dirtyHead, renderHead;
37static struct item_idx * item_hashidx;
38static int reqNum, reqPrioNum, reqBulkNum, dirtyNum;
39static pthread_mutex_t qLock;
40static pthread_cond_t qCond;
41
42static stats_struct stats;
43static pthread_t stats_thread;
44
45static renderd_config config;
46
47int noSlaveRenders;
48int hashidxSize;
49
50struct item *fetch_request(void)
51{
52    struct item *item = NULL;
53
54    pthread_mutex_lock(&qLock);
55
56    while ((reqNum == 0) && (dirtyNum == 0) && (reqPrioNum == 0) && (reqBulkNum == 0)) {
57        pthread_cond_wait(&qCond, &qLock);
58    }
59    if (reqPrioNum) {
60        item = reqPrioHead.next;
61        reqPrioNum--;
62        stats.noReqPrioRender++;
63    } else if (reqNum) {
64        item = reqHead.next;
65        reqNum--;
66        stats.noReqRender++;
67    } else if (dirtyNum) {
68        item = dirtyHead.next;
69        dirtyNum--;
70        stats.noDirtyRender++;
71    } else if (reqBulkNum) {
72        item = reqBulkHead.next;
73        reqBulkNum--;
74        stats.noReqBulkRender++;
75    }
76    if (item) {
77        item->next->prev = item->prev;
78        item->prev->next = item->next;
79
80        item->prev = &renderHead;
81        item->next = renderHead.next;
82        renderHead.next->prev = item;
83        renderHead.next = item;
84        item->inQueue = queueRender;
85    }
86
87        if ((item->req.z >= 0) && ( item->req.z <= MAX_ZOOM)) {
88                stats.noZoomRender[item->req.z]++;
89        }
90
91    pthread_mutex_unlock(&qLock);
92
93    return item;
94}
95
96void clear_requests(int fd)
97{
98    struct item *item, *dupes, *queueHead;
99
100    /**Only need to look up on the shorter request and render queue
101      * so using the linear list shouldn't be a problem
102      */
103    pthread_mutex_lock(&qLock);
104    for (int i = 0; i < 4; i++) {
105        switch (i) {
106        case 0: { queueHead = &reqHead; break;}
107        case 1: { queueHead = &renderHead; break;}
108        case 2: { queueHead = &reqPrioHead; break;}
109        case 3: { queueHead = &reqBulkHead; break;}
110        }
111
112        item = queueHead->next;
113        while (item != queueHead) {
114            if (item->fd == fd)
115                item->fd = FD_INVALID;
116
117            dupes = item->duplicates;
118            while (dupes) {
119                if (dupes->fd == fd)
120                    dupes->fd = FD_INVALID;
121                dupes = dupes->duplicates;
122            }
123            item = item->next;
124        }
125    }
126
127    pthread_mutex_unlock(&qLock);
128}
129
130
131static int calcHashKey(struct item *item) {
132    uint64_t xmlnameHash = 0;
133    uint64_t key;
134    for (int i = 0; item->req.xmlname[i] != 0; i++) {
135        xmlnameHash += item->req.xmlname[i];
136    }
137    key = ((uint64_t)(xmlnameHash & 0x1FF) << 52) + ((uint64_t)(item->req.z) << 48) + ((uint64_t)(item->mx & 0xFFFFFF) << 24) + (item->my & 0xFFFFFF);
138    return key % hashidxSize;
139}
140
141void insert_item_idx(struct item *item) {
142    struct item_idx * nextItem;
143    struct item_idx * prevItem;
144
145    int key = calcHashKey(item);
146
147    if (item_hashidx[key].item == NULL) {
148        item_hashidx[key].item = item;
149    } else {
150        prevItem = &(item_hashidx[key]);
151        nextItem = item_hashidx[key].next;
152        while(nextItem) {
153            prevItem = nextItem;
154            nextItem = nextItem->next;
155        }
156        nextItem = (struct item_idx *)malloc(sizeof(struct item_idx));
157        nextItem->item = item;
158        nextItem->next = NULL;
159        prevItem->next = nextItem;
160    }
161}
162
163void remove_item_idx(struct item * item) {
164    int key = calcHashKey(item);
165    struct item_idx * nextItem;
166    struct item_idx * prevItem;
167    struct item * test;
168    if (item_hashidx[key].item == NULL) {
169        //item not in index;
170        return;
171    }
172    prevItem = &(item_hashidx[key]);
173    nextItem = &(item_hashidx[key]);
174
175    while (nextItem != NULL) {
176        test = nextItem->item;
177        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z
178                == test->req.z) && (!strcmp(item->req.xmlname,
179                test->req.xmlname))) {
180            /*
181             * Found item, removing it from list
182             */
183            nextItem->item = NULL;
184            if (nextItem->next != NULL) {
185                if (nextItem == &(item_hashidx[key])) {
186                    prevItem = nextItem->next;
187                    memcpy(&(item_hashidx[key]), nextItem->next,
188                            sizeof(struct item_idx));
189                    free(prevItem);
190                } else {
191                    prevItem->next = nextItem->next;
192                }
193            } else {
194                prevItem->next = NULL;
195            }
196
197            if (nextItem != &(item_hashidx[key])) {
198                free(nextItem);
199            }
200            return;
201        } else {
202            prevItem = nextItem;
203            nextItem = nextItem->next;
204        }
205    }
206}
207
208struct item * lookup_item_idx(struct item * item) {
209    struct item_idx * nextItem;
210    struct item * test;
211
212    int key = calcHashKey(item);
213
214    if (item_hashidx[key].item == NULL) {
215        return NULL;
216    } else {
217        nextItem = &(item_hashidx[key]);
218        while (nextItem != NULL) {
219            test = nextItem->item;
220            if ((item->mx == test->mx) && (item->my == test->my)
221                    && (item->req.z == test->req.z) && (!strcmp(
222                    item->req.xmlname, test->req.xmlname))) {
223                return test;
224            } else {
225                nextItem = nextItem->next;
226            }
227        }
228    }
229    return NULL;
230}
231
232static inline const char *cmdStr(enum protoCmd c)
233{
234    switch (c) {
235        case cmdIgnore:  return "Ignore";
236        case cmdRender:  return "Render";
237        case cmdRenderPrio:  return "RenderPrio";
238        case cmdRenderBulk:  return "RenderBulk";
239        case cmdDirty:   return "Dirty";
240        case cmdDone:    return "Done";
241        case cmdNotDone: return "NotDone";
242        default:         return "unknown";
243    }
244}
245
246void send_response(struct item *item, enum protoCmd rsp)
247{
248    struct protocol *req = &item->req;
249    int ret;
250
251    pthread_mutex_lock(&qLock);
252    item->next->prev = item->prev;
253    item->prev->next = item->next;
254    remove_item_idx(item);
255    pthread_mutex_unlock(&qLock);
256
257    while (item) {
258        struct item *prev = item;
259        req = &item->req;
260        if ((item->fd != FD_INVALID) && ((req->cmd == cmdRender) || (req->cmd == cmdRenderPrio) || (req->cmd == cmdRenderBulk))) {
261            req->cmd = rsp;
262            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
263            ret = send(item->fd, req, sizeof(*req), 0);
264            if (ret != sizeof(*req))
265                perror("send error during send_done");
266        }
267        item = item->duplicates;
268        free(prev);
269    }
270}
271
272
273enum protoCmd pending(struct item *test)
274{
275    // check all queues and render list to see if this request already queued
276    // If so, add this new request as a duplicate
277    // call with qLock held
278    struct item *item;
279
280    item = lookup_item_idx(test);
281    if (item != NULL) {
282        if ((item->inQueue == queueRender) || (item->inQueue == queueRequest) || (item->inQueue == queueRequestPrio)) {
283            test->duplicates = item->duplicates;
284            item->duplicates = test;
285            test->inQueue = queueDuplicate;
286            return cmdIgnore;
287        } else if ((item->inQueue == queueDirty) || (item->inQueue == queueRequestBulk)){
288            return cmdNotDone;
289        }
290    }
291
292    return cmdRender;
293}
294
295
296enum protoCmd rx_request(const struct protocol *req, int fd)
297{
298    struct protocol *reqnew;
299    struct item *list = NULL, *item;
300    enum protoCmd pend;
301
302    // Upgrade version 1 to version 2
303    if (req->ver == 1) {
304        reqnew = (struct protocol *)malloc(sizeof(protocol));
305        memcpy(reqnew, req, sizeof(protocol_v1));
306        reqnew->xmlname[0] = 0;
307        req = reqnew;
308    }
309    else if (req->ver != 2) {
310        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
311        return cmdIgnore;
312    }
313
314    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
315            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
316
317    if ((req->cmd != cmdRender) && (req->cmd != cmdRenderPrio) && (req->cmd != cmdDirty) && (req->cmd != cmdRenderBulk))
318        return cmdIgnore;
319
320    if (check_xyz(req->x, req->y, req->z))
321        return cmdNotDone;
322
323    item = (struct item *)malloc(sizeof(*item));
324    if (!item) {
325            syslog(LOG_ERR, "malloc failed");
326            return cmdNotDone;
327    }
328
329    item->req = *req;
330    item->duplicates = NULL;
331    item->fd = (req->cmd == cmdDirty) ? FD_INVALID : fd;
332
333#ifdef METATILE
334    /* Round down request co-ordinates to the neareast N (should be a power of 2)
335     * Note: request path is no longer consistent but this will be recalculated
336     * when the metatile is being rendered.
337     */
338    item->mx = item->req.x & ~(METATILE-1);
339    item->my = item->req.y & ~(METATILE-1);
340#else
341    item->mx = item->req.x;
342    item->my = item->req.y;
343#endif
344
345    pthread_mutex_lock(&qLock);
346
347    // Check for a matching request in the current rendering or dirty queues
348    pend = pending(item);
349    if (pend == cmdNotDone) {
350        // We found a match in the dirty queue, can not wait for it
351        pthread_mutex_unlock(&qLock);
352        free(item);
353        return cmdNotDone;
354    }
355    if (pend == cmdIgnore) {
356        // Found a match in render queue, item added as duplicate
357        pthread_mutex_unlock(&qLock);
358        return cmdIgnore;
359    }
360
361    // New request, add it to render or dirty queue
362    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
363        list = &reqHead;
364        item->inQueue = queueRequest;
365        reqNum++;
366    } else if ((req->cmd == cmdRenderPrio) && (reqPrioNum < REQ_LIMIT)) {
367        list = &reqPrioHead;
368        item->inQueue = queueRequestPrio;
369        reqPrioNum++;
370    } else if ((req->cmd == cmdRenderBulk) && (reqBulkNum < REQ_LIMIT)) {
371        list = &reqBulkHead;
372        item->inQueue = queueRequestBulk;
373        reqBulkNum++;
374    } else if (dirtyNum < DIRTY_LIMIT) {
375        list = &dirtyHead;
376        item->inQueue = queueDirty;
377        dirtyNum++;
378        item->fd = FD_INVALID; // No response after render
379    } else {
380        // The queue is severely backlogged. Drop request
381        stats.noReqDroped++;
382        pthread_mutex_unlock(&qLock);
383        free(item);
384        return cmdNotDone;
385    }
386
387    if (list) {
388        item->next = list;
389        item->prev = list->prev;
390        item->prev->next = item;
391        list->prev = item;
392        /* In addition to the linked list, add item to a hash table index
393         * for faster lookup of pending requests.
394         */
395        insert_item_idx(item);
396
397        pthread_cond_signal(&qCond);
398    } else
399        free(item);
400
401    pthread_mutex_unlock(&qLock);
402
403    return (list == &reqHead)?cmdIgnore:cmdNotDone;
404}
405
406
407void process_loop(int listen_fd)
408{
409    int num_connections = 0;
410    int connections[MAX_CONNECTIONS];
411
412    bzero(connections, sizeof(connections));
413
414    while (1) {
415        struct sockaddr_un in_addr;
416        socklen_t in_addrlen = sizeof(in_addr);
417        fd_set rd;
418        int incoming, num, nfds, i;
419
420        FD_ZERO(&rd);
421        FD_SET(listen_fd, &rd);
422        nfds = listen_fd+1;
423
424        for (i=0; i<num_connections; i++) {
425            FD_SET(connections[i], &rd);
426            nfds = MAX(nfds, connections[i]+1);
427        }
428
429        num = select(nfds, &rd, NULL, NULL, NULL);
430        if (num == -1)
431            perror("select()");
432        else if (num) {
433            //printf("Data is available now on %d fds\n", num);
434            if (FD_ISSET(listen_fd, &rd)) {
435                num--;
436                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
437                if (incoming < 0) {
438                    perror("accept()");
439                } else {
440                    if (num_connections == MAX_CONNECTIONS) {
441                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
442                        close(incoming);
443                    } else {
444                        connections[num_connections++] = incoming;
445                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
446                    }
447                }
448            }
449            for (i=0; num && (i<num_connections); i++) {
450                int fd = connections[i];
451                if (FD_ISSET(fd, &rd)) {
452                    struct protocol cmd;
453                    int ret;
454
455                    // TODO: to get highest performance we should loop here until we get EAGAIN
456                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
457                    if (ret == sizeof(cmd)) {
458                        enum protoCmd rsp = rx_request(&cmd, fd);
459
460                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
461                            cmd.cmd = rsp;
462                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
463                            ret = send(fd, &cmd, sizeof(cmd), 0);
464                            if (ret != sizeof(cmd))
465                                perror("response send error");
466                        }
467                    } else if (!ret) {
468                        int j;
469
470                        num_connections--;
471                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
472                        for (j=i; j < num_connections; j++)
473                            connections[j] = connections[j+1];
474                        clear_requests(fd);
475                        close(fd);
476                    } else {
477                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
478                        break;
479                    }
480                }
481            }
482        } else {
483            syslog(LOG_ERR, "Select timeout");
484        }
485    }
486}
487
488/**
489 * Periodically write out current stats to a stats file. This information
490 * can then be used to monitor performance of renderd e.g. with a munin plugin
491 */
492void *stats_writeout_thread(void * arg) {
493    stats_struct lStats;
494    int dirtQueueLength;
495    int reqQueueLength;
496    int reqPrioQueueLength;
497    int reqBulkQueueLength;
498        int i;
499
500    int noFailedAttempts = 0;
501    char tmpName[PATH_MAX];
502
503    snprintf(tmpName, sizeof(tmpName), "%s.tmp", config.stats_filename);
504
505    syslog(LOG_DEBUG, "Starting stats thread");
506    while (1) {
507        pthread_mutex_lock(&qLock);
508        memcpy(&lStats, &stats, sizeof(stats_struct));
509        dirtQueueLength = dirtyNum;
510        reqQueueLength = reqNum;
511        reqPrioQueueLength = reqPrioNum;
512        reqBulkQueueLength = reqBulkNum;
513        pthread_mutex_unlock(&qLock);
514
515        FILE * statfile = fopen(tmpName, "w");
516        if (statfile == NULL) {
517            syslog(LOG_WARNING, "Failed to open stats file: %i", errno);
518            noFailedAttempts++;
519            if (noFailedAttempts > 3) {
520                syslog(LOG_ERR, "ERROR: Failed repeatedly to write stats, giving up");
521                break;
522            }
523            continue;
524        } else {
525            noFailedAttempts = 0;
526            fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
527            fprintf(statfile, "ReqPrioQueueLength: %i\n", reqPrioQueueLength);
528            fprintf(statfile, "ReqBulkQueueLength: %i\n", reqBulkQueueLength);
529            fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
530            fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
531            fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
532            fprintf(statfile, "ReqPrioRendered: %li\n", lStats.noReqPrioRender);
533            fprintf(statfile, "ReqBulkRendered: %li\n", lStats.noReqBulkRender);
534            fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
535                        for (i = 0; i <= MAX_ZOOM; i++) {
536                                fprintf(statfile,"ZoomRendered%02i: %li\n", i, lStats.noZoomRender[i]);
537                        }
538            fclose(statfile);
539            if (rename(tmpName, config.stats_filename)) {
540                syslog(LOG_WARNING, "Failed to overwrite stats file: %i", errno);
541                noFailedAttempts++;
542                if (noFailedAttempts > 3) {
543                    syslog(LOG_ERR, "ERROR: Failed repeatedly to overwrite stats, giving up");
544                    break;
545                }
546                continue;
547            }
548        }
549        sleep(10);
550    }
551    return NULL;
552}
553
554int client_socket_init(renderd_config * sConfig) {
555    int fd;
556    struct sockaddr_un * addrU;
557    struct sockaddr_in * addrI;
558    struct hostent *server;
559    if (sConfig->ipport > 0) {
560        syslog(LOG_INFO, "Initialising TCP/IP client socket to %s:%i",
561                sConfig->iphostname, sConfig->ipport);
562        addrI = (struct sockaddr_in *)malloc(sizeof(struct sockaddr_in));
563        fd = socket(PF_INET, SOCK_STREAM, 0);
564        server = gethostbyname(sConfig->iphostname);
565        if (server == NULL) {
566            syslog(LOG_WARNING, "Could not resolve hostname: %s",
567                    sConfig->iphostname);
568            return FD_INVALID;
569        }
570        bzero((char *) addrI, sizeof(struct sockaddr_in));
571        addrI->sin_family = AF_INET;
572        bcopy((char *) server->h_addr, (char *) &addrI->sin_addr.s_addr,
573                server->h_length);
574        addrI->sin_port = htons(sConfig->ipport);
575        if (connect(fd, (struct sockaddr *) addrI, sizeof(struct sockaddr_in)) < 0) {
576            syslog(LOG_WARNING, "Could not connect to %s:%i",
577                    sConfig->iphostname, sConfig->ipport);
578            return FD_INVALID;
579        }
580        free(addrI);
581        syslog(LOG_INFO, "socket %s:%i initialised to fd %i", sConfig->iphostname, sConfig->ipport,
582                fd);
583    } else {
584        syslog(LOG_INFO, "Initialising unix client socket on %s",
585                sConfig->socketname);
586        addrU = (struct sockaddr_un *)malloc(sizeof(struct sockaddr_un));
587        fd = socket(PF_UNIX, SOCK_STREAM, 0);
588        if (fd < 0) {
589            syslog(LOG_WARNING, "Could not obtain socket: %i", fd);
590            return FD_INVALID;
591        }
592
593        bzero(addrU, sizeof(struct sockaddr_un));
594        addrU->sun_family = AF_UNIX;
595        strncpy(addrU->sun_path, sConfig->socketname, sizeof(addrU->sun_path));
596
597        if (connect(fd, (struct sockaddr *) addrU, sizeof(struct sockaddr_un)) < 0) {
598            syslog(LOG_WARNING, "socket connect failed for: %s",
599                    sConfig->socketname);
600            close(fd);
601            return FD_INVALID;
602        }
603        free(addrU);
604        syslog(LOG_INFO, "socket %s initialised to fd %i", sConfig->socketname,
605                fd);
606    }
607    return fd;
608}
609
610int server_socket_init(renderd_config *sConfig) {
611    struct sockaddr_un addrU;
612    struct sockaddr_in addrI;
613    mode_t old;
614    int fd;
615
616    if (sConfig->ipport > 0) {
617        syslog(LOG_INFO, "Initialising TCP/IP server socket on %s:%i",
618                sConfig->iphostname, sConfig->ipport);
619        fd = socket(PF_INET, SOCK_STREAM, 0);
620        if (fd < 0) {
621            fprintf(stderr, "failed to create IP socket\n");
622            exit(2);
623        }
624        bzero(&addrI, sizeof(addrI));
625        addrI.sin_family = AF_INET;
626        addrI.sin_addr.s_addr = INADDR_ANY;
627        addrI.sin_port = htons(sConfig->ipport);
628        if (bind(fd, (struct sockaddr *) &addrI, sizeof(addrI)) < 0) {
629            fprintf(stderr, "socket bind failed for: %s:%i\n",
630                    sConfig->iphostname, sConfig->ipport);
631            close(fd);
632            exit(3);
633        }
634    } else {
635        syslog(LOG_INFO, "Initialising unix server socket on %s",
636                sConfig->socketname);
637
638        fd = socket(PF_UNIX, SOCK_STREAM, 0);
639        if (fd < 0) {
640            fprintf(stderr, "failed to create unix socket\n");
641            exit(2);
642        }
643
644        bzero(&addrU, sizeof(addrU));
645        addrU.sun_family = AF_UNIX;
646        strncpy(addrU.sun_path, sConfig->socketname, sizeof(addrU.sun_path));
647
648        unlink(addrU.sun_path);
649
650        old = umask(0); // Need daemon socket to be writeable by apache
651        if (bind(fd, (struct sockaddr *) &addrU, sizeof(addrU)) < 0) {
652            fprintf(stderr, "socket bind failed for: %s\n", sConfig->socketname);
653            close(fd);
654            exit(3);
655        }
656        umask(old);
657    }
658
659    if (listen(fd, QUEUE_MAX) < 0) {
660        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
661        close(fd);
662        exit(4);
663    }
664
665    syslog(LOG_DEBUG, "Created server socket %i", fd);
666
667    return fd;
668
669}
670
671/**
672 * This function is used as a the start function for the slave renderer thread.
673 * It pulls a request from the central queue of requests and dispatches it to
674 * the slave renderer. It then blocks and waits for the response with no timeout.
675 * As it only sends one request at a time (there are as many slave_thread threads as there
676 * are rendering threads on the slaves) nothing gets queued on the slave and should get
677 * rendererd immediately. Thus overall, requests should be nicely load balanced between
678 * all the rendering threads available both locally and in the slaves.
679 */
680void *slave_thread(void * arg) {
681    renderd_config * sConfig = (renderd_config *) arg;
682
683    int pfd = FD_INVALID;
684    int retry;
685    size_t ret_size;
686
687    struct protocol * resp;
688    struct protocol * req_slave;
689
690    req_slave = (struct protocol *)malloc(sizeof(protocol));
691    resp = (struct protocol *)malloc(sizeof(protocol));
692    bzero(req_slave, sizeof(struct protocol));
693    bzero(resp, sizeof(struct protocol));
694
695    while (1) {
696        if (pfd == FD_INVALID) {
697            pfd = client_socket_init(sConfig);
698            if (pfd == FD_INVALID) {
699                if (sConfig->ipport > 0) {
700                    syslog(LOG_ERR,
701                        "Failed to connect to render slave %s:%i, trying again in 30 seconds",
702                        sConfig->iphostname, sConfig->ipport);
703                } else {
704                    syslog( LOG_ERR,
705                            "Failed to connect to render slave %s, trying again in 30 seconds",
706                            sConfig->socketname);
707                }
708                sleep(30);
709                continue;
710            }
711        }
712
713        enum protoCmd ret;
714        struct item *item = fetch_request();
715        if (item) {
716            struct protocol *req = &item->req;
717            req_slave->ver = PROTO_VER;
718            req_slave->cmd = cmdRender;
719            strcpy(req_slave->xmlname, req->xmlname);
720            req_slave->x = req->x;
721            req_slave->y = req->y;
722            req_slave->z = req->z;
723
724            /*Dispatch request to slave renderd*/
725            retry = 2;
726            syslog(LOG_INFO,
727                    "Dispatching request to slave thread on fd %i", pfd);
728            do {
729                ret_size = send(pfd, req_slave, sizeof(struct protocol), 0);
730
731                if (ret_size == sizeof(struct protocol)) {
732                    //correctly sent command to slave
733                    break;
734                }
735
736                if (errno != EPIPE) {
737                    syslog(LOG_ERR,
738                            "Failed to send cmd to render slave, shutting down thread");
739                    return NULL;
740                }
741
742                syslog(LOG_WARNING, "Failed to send cmd to render slave, retrying");
743                close(pfd);
744                pfd = client_socket_init(sConfig);
745                if (pfd == FD_INVALID) {
746                    syslog(LOG_ERR,
747                            "Failed to re-connect to render slave, dropping request");
748                    ret = cmdNotDone;
749                    send_response(item, ret);
750                    break;
751                }
752            } while (retry--);
753            if (pfd == FD_INVALID || ret_size != sizeof(struct protocol)) {
754                continue;
755            }
756
757            ret_size = 0;
758            retry = 10;
759            while ((ret_size < sizeof(struct protocol)) && (retry > 0)) {
760                ret_size = recv(pfd, resp + ret_size, (sizeof(struct protocol)
761                        - ret_size), 0);
762                if ((errno == EPIPE) || ret_size == 0) {
763                    close(pfd);
764                    pfd = FD_INVALID;
765                    ret_size = 0;
766                    syslog(LOG_ERR, "Pipe to render slave closed");
767                    break;
768                }
769                retry--;
770            }
771            if (ret_size < sizeof(struct protocol)) {
772                if (sConfig->ipport > 0) {
773                    syslog( LOG_ERR,
774                            "Invalid reply from render slave %s:%i, trying again in 30 seconds",
775                            sConfig->iphostname, sConfig->ipport);
776                } else {
777                    syslog( LOG_ERR,
778                            "Invalid reply render slave %s, trying again in 30 seconds",
779                            sConfig->socketname);
780                }
781
782                ret = cmdNotDone;
783                send_response(item, ret);
784                sleep(30);
785            } else {
786                ret = resp->cmd;
787                send_response(item, ret);
788                if (resp->cmd != cmdDone) {
789                    if (sConfig->ipport > 0) {
790                        syslog( LOG_ERR,
791                                "Request from render slave %s:%i did not complete correctly",
792                                sConfig->iphostname, sConfig->ipport);
793                    } else {
794                        syslog( LOG_ERR,
795                                "Request from render slave %s did not complete correctly",
796                                sConfig->socketname);
797                    }
798                    //Sleep for a while to make sure we don't overload the renderer
799                    //This only happens if it didn't correctly block on the rendering
800                    //request
801                    sleep(30);
802                }
803            }
804
805        } else {
806            sleep(1); // TODO: Use an event to indicate there are new requests
807        }
808    }
809    return NULL;
810}
811
812
813int main(int argc, char **argv)
814{
815    int fd, i, j, k;
816
817    int c;
818    int foreground=0;
819    int active_slave=0;
820    int log_options;
821    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
822
823    while (1) {
824        int option_index = 0;
825        static struct option long_options[] = {
826            {"config", 1, 0, 'c'},
827            {"foreground", 1, 0, 'f'},
828            {"slave", 1, 0, 's'},
829            {"help", 0, 0, 'h'},
830            {0, 0, 0, 0}
831        };
832
833        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
834        if (c == -1)
835            break;
836
837        switch (c) {
838            case 'f':
839                foreground=1;
840                break;
841            case 'c':
842                strncpy(config_file_name, optarg, PATH_MAX-1);
843                config_file_name[PATH_MAX-1] = 0;
844                break;
845            case 's':
846                if (sscanf(optarg, "%i", &active_slave) != 1) {
847                    fprintf(stderr, "--slave needs to be nummeric (%s)\n", optarg);
848                    active_slave = 0;
849                }
850                break;
851            case 'h':
852                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
853                fprintf(stderr, "Mapnik rendering daemon\n");
854                fprintf(stderr, "  -f, --foreground     run in foreground\n");
855                fprintf(stderr, "  -h, --help           display this help and exit\n");
856                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
857                fprintf(stderr, "  -s, --slave=CONFIG_NR set which render slave this is (default 0)\n");
858                exit(0);
859            default:
860                fprintf(stderr, "unknown config option '%c'\n", c);
861                exit(1);
862        }
863    }
864
865    log_options = LOG_PID;
866#ifdef LOG_PERROR
867    if (foreground)
868        log_options |= LOG_PERROR;
869#endif
870    openlog("renderd", log_options, LOG_DAEMON);
871
872    syslog(LOG_INFO, "Rendering daemon started");
873
874    pthread_mutex_init(&qLock, NULL);
875    pthread_cond_init(&qCond, NULL);
876    reqHead.next = reqHead.prev = &reqHead;
877    reqPrioHead.next = reqPrioHead.prev = &reqPrioHead;
878    reqBulkHead.next = reqBulkHead.prev = &reqBulkHead;
879    dirtyHead.next = dirtyHead.prev = &dirtyHead;
880    renderHead.next = renderHead.prev = &renderHead;
881    hashidxSize = HASHIDX_SIZE;
882    item_hashidx = (struct item_idx *) malloc(sizeof(struct item_idx) * hashidxSize);
883    bzero(item_hashidx, sizeof(struct item_idx) * hashidxSize);
884
885    stats.noDirtyRender = 0;
886    stats.noReqDroped = 0;
887    stats.noReqRender = 0;
888    stats.noReqPrioRender = 0;
889    stats.noReqBulkRender = 0;
890
891    xmlconfigitem maps[XMLCONFIGS_MAX];
892    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
893
894    renderd_config config_slaves[MAX_SLAVES];
895    bzero(config_slaves, sizeof(renderd_config) * MAX_SLAVES);
896    bzero(&config, sizeof(renderd_config));
897
898    dictionary *ini = iniparser_load(config_file_name);
899    if (! ini) {
900        exit(1);
901    }
902
903    noSlaveRenders = 0;
904
905    int iconf = -1;
906    char buffer[PATH_MAX];
907    for (int section=0; section < iniparser_getnsec(ini); section++) {
908        char *name = iniparser_getsecname(ini, section);
909        syslog(LOG_INFO, "Parsing section %s\n", name);
910        if (strncmp(name, "renderd", 7) && strcmp(name, "mapnik")) {
911            if (config.tile_dir == NULL) {
912                fprintf(stderr, "No valid (active) renderd config section available\n");
913                exit(7);
914            }
915            /* this is a map section */
916            iconf++;
917            if (strlen(name) >= XMLCONFIG_MAX) {
918                fprintf(stderr, "XML name too long: %s\n", name);
919                exit(7);
920            }
921
922            strcpy(maps[iconf].xmlname, name);
923            if (iconf >= XMLCONFIGS_MAX) {
924                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
925                exit(7);
926            }
927            sprintf(buffer, "%s:uri", name);
928            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
929            if (strlen(ini_uri) >= PATH_MAX) {
930                fprintf(stderr, "URI too long: %s\n", ini_uri);
931                exit(7);
932            }
933            strcpy(maps[iconf].xmluri, ini_uri);
934            sprintf(buffer, "%s:xml", name);
935            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
936            if (strlen(ini_xmlpath) >= PATH_MAX){
937                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
938                exit(7);
939            }
940            sprintf(buffer, "%s:host", name);
941            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
942            if (strlen(ini_hostname) >= PATH_MAX) {
943                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
944                exit(7);
945            }
946
947            sprintf(buffer, "%s:htcphost", name);
948            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
949            if (strlen(ini_htcpip) >= PATH_MAX) {
950                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
951                exit(7);
952            }
953            strcpy(maps[iconf].xmlfile, ini_xmlpath);
954            strcpy(maps[iconf].tile_dir, config.tile_dir);
955            strcpy(maps[iconf].host, ini_hostname);
956            strcpy(maps[iconf].htcpip, ini_htcpip);
957        } else if (strncmp(name, "renderd", 7) == 0) {
958            int render_sec = 0;
959            if (sscanf(name, "renderd%i", &render_sec) != 1) {
960                render_sec = 0;
961            }
962            syslog(LOG_INFO, "Parsing render section %i\n", render_sec);
963            if (render_sec >= MAX_SLAVES) {
964                syslog(LOG_ERR, "Can't handle more than %i render sections\n",
965                        MAX_SLAVES);
966                exit(7);
967            }
968            sprintf(buffer, "%s:socketname", name);
969            config_slaves[render_sec].socketname = iniparser_getstring(ini,
970                    buffer, (char *) RENDER_SOCKET);
971            sprintf(buffer, "%s:iphostname", name);
972            config_slaves[render_sec].iphostname = iniparser_getstring(ini,
973                    buffer, "");
974            sprintf(buffer, "%s:ipport", name);
975            config_slaves[render_sec].ipport = iniparser_getint(ini, buffer, 0);
976            sprintf(buffer, "%s:num_threads", name);
977            config_slaves[render_sec].num_threads = iniparser_getint(ini,
978                    buffer, NUM_THREADS);
979            sprintf(buffer, "%s:tile_dir", name);
980            config_slaves[render_sec].tile_dir = iniparser_getstring(ini,
981                    buffer, (char *) HASH_PATH);
982            sprintf(buffer, "%s:stats_file", name);
983            config_slaves[render_sec].stats_filename = iniparser_getstring(ini,
984                    buffer, NULL);
985
986            if (render_sec == active_slave) {
987                config.socketname = config_slaves[render_sec].socketname;
988                config.iphostname = config_slaves[render_sec].iphostname;
989                config.ipport = config_slaves[render_sec].ipport;
990                config.num_threads = config_slaves[render_sec].num_threads;
991                config.tile_dir = config_slaves[render_sec].tile_dir;
992                config.stats_filename
993                        = config_slaves[render_sec].stats_filename;
994                config.mapnik_plugins_dir = iniparser_getstring(ini,
995                        "mapnik:plugins_dir", (char *) MAPNIK_PLUGINS);
996                config.mapnik_font_dir = iniparser_getstring(ini,
997                        "mapnik:font_dir", (char *) FONT_DIR);
998                config.mapnik_font_dir_recurse = iniparser_getboolean(ini,
999                        "mapnik:font_dir_recurse", FONT_RECURSE);
1000            } else {
1001                noSlaveRenders += config_slaves[render_sec].num_threads;
1002            }
1003        }
1004    }
1005
1006    if (config.ipport > 0) {
1007        syslog(LOG_INFO, "config renderd: ip socket=%s:%i\n", config.iphostname, config.ipport);
1008    } else {
1009        syslog(LOG_INFO, "config renderd: unix socketname=%s\n", config.socketname);
1010    }
1011    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
1012    if (active_slave == 0) {
1013        syslog(LOG_INFO, "config renderd: num_slaves=%d\n", noSlaveRenders);
1014    }
1015    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
1016    syslog(LOG_INFO, "config renderd: stats_file=%s\n", config.stats_filename);
1017    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
1018    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
1019    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
1020    for (int i = 0; i < MAX_SLAVES; i++) {
1021        if (config_slaves[i].num_threads == 0) {
1022            continue;
1023        }
1024        if (i == active_slave) {
1025            syslog(LOG_INFO, "config renderd(%i): Active\n", i);
1026        }
1027        if (config_slaves[i].ipport > 0) {
1028                syslog(LOG_INFO, "config renderd(%i): ip socket=%s:%i\n", i,
1029                        config_slaves[i].iphostname, config_slaves[i].ipport);
1030            } else {
1031                syslog(LOG_INFO, "config renderd(%i): unix socketname=%s\n", i,
1032                        config_slaves[i].socketname);
1033            }
1034        syslog(LOG_INFO, "config renderd(%i): num_threads=%d\n", i,
1035                config_slaves[i].num_threads);
1036        syslog(LOG_INFO, "config renderd(%i): tile_dir=%s\n", i,
1037                config_slaves[i].tile_dir);
1038        syslog(LOG_INFO, "config renderd(%i): stats_file=%s\n", i,
1039                config_slaves[i].stats_filename);
1040    }
1041
1042    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
1043        if (maps[iconf].xmlname[0] != 0) {
1044         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
1045                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
1046                 maps[iconf].htcpip, maps[iconf].host);
1047        }
1048    }
1049
1050    fd = server_socket_init(&config);
1051#if 0
1052    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
1053        fprintf(stderr, "setting socket non-block failed\n");
1054        close(fd);
1055        exit(5);
1056    }
1057#endif
1058
1059    //sigPipeAction.sa_handler = pipe_handler;
1060    sigPipeAction.sa_handler = SIG_IGN;
1061    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
1062        fprintf(stderr, "failed to register signal handler\n");
1063        close(fd);
1064        exit(6);
1065    }
1066
1067    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
1068
1069    /* unless the command line said to run in foreground mode, fork and detach from terminal */
1070    if (foreground) {
1071        fprintf(stderr, "Running in foreground mode...\n");
1072    } else {
1073        if (daemon(0, 0) != 0) {
1074            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
1075        }
1076        /* write pid file */
1077        FILE *pidfile = fopen(PIDFILE, "w");
1078        if (pidfile) {
1079            (void) fprintf(pidfile, "%d\n", getpid());
1080            (void) fclose(pidfile);
1081        }
1082    }
1083
1084    if (config.stats_filename != NULL) {
1085        if (pthread_create(&stats_thread, NULL, stats_writeout_thread, NULL)) {
1086            syslog(LOG_WARNING, "Could not create stats writeout thread");
1087        }
1088    } else {
1089        syslog(LOG_INFO, "No stats file specified in config. Stats reporting disabled");
1090    }
1091
1092    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
1093
1094    for(i=0; i<config.num_threads; i++) {
1095        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
1096            fprintf(stderr, "error spawning render thread\n");
1097            close(fd);
1098            exit(7);
1099        }
1100    }
1101
1102    if (active_slave == 0) {
1103        //Only the master renderd opens connections to its slaves
1104        k = 0;
1105        slave_threads
1106                = (pthread_t *) malloc(sizeof(pthread_t) * noSlaveRenders);
1107        for (i = 1; i < MAX_SLAVES; i++) {
1108            for (j = 0; j < config_slaves[i].num_threads; j++) {
1109                if (pthread_create(&slave_threads[k++], NULL, slave_thread,
1110                        (void *) &config_slaves[i])) {
1111                    fprintf(stderr, "error spawning render thread\n");
1112                    close(fd);
1113                    exit(7);
1114                }
1115            }
1116        }
1117    }
1118
1119    process_loop(fd);
1120
1121    unlink(config.socketname);
1122    close(fd);
1123    return 0;
1124}
Note: See TracBrowser for help on using the repository browser.