source: subversion/applications/utils/mod_tile/daemon.c @ 29283

Last change on this file since 29283 was 29283, checked in by apmon, 7 years ago

[renderd] Make renderd's TCP socket IPv6 compliant

File size: 40.3 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <stdint.h>
4#include <unistd.h>
5#include <sys/types.h>
6#include <sys/socket.h>
7#include <netinet/in.h>
8#include <arpa/inet.h>
9#include <netdb.h>
10#include <sys/select.h>
11#include <sys/stat.h>
12#include <sys/un.h>
13#include <poll.h>
14#include <errno.h>
15#include <pthread.h>
16#include <signal.h>
17#include <string.h>
18#include <strings.h>
19#include <syslog.h>
20#include <getopt.h>
21
22#include "render_config.h"
23#include "daemon.h"
24#include "gen_tile.h"
25#include "protocol.h"
26#include "dir_utils.h"
27
28#define PIDFILE "/var/run/renderd/renderd.pid"
29
30// extern "C" {
31#include "iniparser3.0b/src/iniparser.h"
32// }
33
34static pthread_t *render_threads;
35static pthread_t *slave_threads;
36static struct sigaction sigPipeAction;
37
38static struct item reqHead, reqPrioHead, reqBulkHead, dirtyHead, renderHead;
39static struct item_idx * item_hashidx;
40static int reqNum, reqPrioNum, reqBulkNum, dirtyNum;
41static pthread_mutex_t qLock;
42static pthread_cond_t qCond;
43static int exit_pipe_fd;
44
45static stats_struct stats;
46static pthread_t stats_thread;
47
48static renderd_config config;
49
50int noSlaveRenders;
51int hashidxSize;
52
53void statsRenderFinish(int z, long time) {
54    pthread_mutex_lock(&qLock);
55    if ((z >= 0) && (z <= MAX_ZOOM)) {
56        stats.noZoomRender[z]++;
57        stats.timeZoomRender[z] += time;
58    }
59    pthread_mutex_unlock(&qLock);
60}
61
62struct item *fetch_request(void)
63{
64    struct item *item = NULL;
65
66    pthread_mutex_lock(&qLock);
67
68    while ((reqNum == 0) && (dirtyNum == 0) && (reqPrioNum == 0) && (reqBulkNum == 0)) {
69        pthread_cond_wait(&qCond, &qLock);
70    }
71    if (reqPrioNum) {
72        item = reqPrioHead.next;
73        reqPrioNum--;
74        stats.noReqPrioRender++;
75    } else if (reqNum) {
76        item = reqHead.next;
77        reqNum--;
78        stats.noReqRender++;
79    } else if (dirtyNum) {
80        item = dirtyHead.next;
81        dirtyNum--;
82        stats.noDirtyRender++;
83    } else if (reqBulkNum) {
84        item = reqBulkHead.next;
85        reqBulkNum--;
86        stats.noReqBulkRender++;
87    }
88    if (item) {
89        item->next->prev = item->prev;
90        item->prev->next = item->next;
91
92        item->prev = &renderHead;
93        item->next = renderHead.next;
94        renderHead.next->prev = item;
95        renderHead.next = item;
96        item->inQueue = queueRender;
97    }
98
99
100    pthread_mutex_unlock(&qLock);
101
102    return item;
103}
104
105void clear_requests(int fd)
106{
107    struct item *item, *dupes, *queueHead;
108
109    /**Only need to look up on the shorter request and render queue
110      * so using the linear list shouldn't be a problem
111      */
112    pthread_mutex_lock(&qLock);
113    for (int i = 0; i < 4; i++) {
114        switch (i) {
115        case 0: { queueHead = &reqHead; break;}
116        case 1: { queueHead = &renderHead; break;}
117        case 2: { queueHead = &reqPrioHead; break;}
118        case 3: { queueHead = &reqBulkHead; break;}
119        }
120
121        item = queueHead->next;
122        while (item != queueHead) {
123            if (item->fd == fd)
124                item->fd = FD_INVALID;
125
126            dupes = item->duplicates;
127            while (dupes) {
128                if (dupes->fd == fd)
129                    dupes->fd = FD_INVALID;
130                dupes = dupes->duplicates;
131            }
132            item = item->next;
133        }
134    }
135
136    pthread_mutex_unlock(&qLock);
137}
138
139
140static int calcHashKey(struct item *item) {
141    uint64_t xmlnameHash = 0;
142    uint64_t key;
143    for (int i = 0; item->req.xmlname[i] != 0; i++) {
144        xmlnameHash += item->req.xmlname[i];
145    }
146    key = ((uint64_t)(xmlnameHash & 0x1FF) << 52) + ((uint64_t)(item->req.z) << 48) + ((uint64_t)(item->mx & 0xFFFFFF) << 24) + (item->my & 0xFFFFFF);
147    return key % hashidxSize;
148}
149
150void insert_item_idx(struct item *item) {
151    struct item_idx * nextItem;
152    struct item_idx * prevItem;
153
154    int key = calcHashKey(item);
155
156    if (item_hashidx[key].item == NULL) {
157        item_hashidx[key].item = item;
158    } else {
159        prevItem = &(item_hashidx[key]);
160        nextItem = item_hashidx[key].next;
161        while(nextItem) {
162            prevItem = nextItem;
163            nextItem = nextItem->next;
164        }
165        nextItem = (struct item_idx *)malloc(sizeof(struct item_idx));
166        nextItem->item = item;
167        nextItem->next = NULL;
168        prevItem->next = nextItem;
169    }
170}
171
172void remove_item_idx(struct item * item) {
173    int key = calcHashKey(item);
174    struct item_idx * nextItem;
175    struct item_idx * prevItem;
176    struct item * test;
177    if (item_hashidx[key].item == NULL) {
178        //item not in index;
179        return;
180    }
181    prevItem = &(item_hashidx[key]);
182    nextItem = &(item_hashidx[key]);
183
184    while (nextItem != NULL) {
185        test = nextItem->item;
186        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z
187                == test->req.z) && (!strcmp(item->req.xmlname,
188                test->req.xmlname))) {
189            /*
190             * Found item, removing it from list
191             */
192            nextItem->item = NULL;
193            if (nextItem->next != NULL) {
194                if (nextItem == &(item_hashidx[key])) {
195                    prevItem = nextItem->next;
196                    memcpy(&(item_hashidx[key]), nextItem->next,
197                            sizeof(struct item_idx));
198                    free(prevItem);
199                } else {
200                    prevItem->next = nextItem->next;
201                }
202            } else {
203                prevItem->next = NULL;
204            }
205
206            if (nextItem != &(item_hashidx[key])) {
207                free(nextItem);
208            }
209            return;
210        } else {
211            prevItem = nextItem;
212            nextItem = nextItem->next;
213        }
214    }
215}
216
217struct item * lookup_item_idx(struct item * item) {
218    struct item_idx * nextItem;
219    struct item * test;
220
221    int key = calcHashKey(item);
222
223    if (item_hashidx[key].item == NULL) {
224        return NULL;
225    } else {
226        nextItem = &(item_hashidx[key]);
227        while (nextItem != NULL) {
228            test = nextItem->item;
229            if ((item->mx == test->mx) && (item->my == test->my)
230                    && (item->req.z == test->req.z) && (!strcmp(
231                    item->req.xmlname, test->req.xmlname))) {
232                return test;
233            } else {
234                nextItem = nextItem->next;
235            }
236        }
237    }
238    return NULL;
239}
240
241static inline const char *cmdStr(enum protoCmd c)
242{
243    switch (c) {
244        case cmdIgnore:  return "Ignore";
245        case cmdRender:  return "Render";
246        case cmdRenderPrio:  return "RenderPrio";
247        case cmdRenderBulk:  return "RenderBulk";
248        case cmdDirty:   return "Dirty";
249        case cmdDone:    return "Done";
250        case cmdNotDone: return "NotDone";
251        default:         return "unknown";
252    }
253}
254
255void send_response(struct item *item, enum protoCmd rsp)
256{
257    struct protocol *req = &item->req;
258    int ret;
259
260    pthread_mutex_lock(&qLock);
261    item->next->prev = item->prev;
262    item->prev->next = item->next;
263    remove_item_idx(item);
264    pthread_mutex_unlock(&qLock);
265
266    while (item) {
267        struct item *prev = item;
268        req = &item->req;
269        if ((item->fd != FD_INVALID) && ((req->cmd == cmdRender) || (req->cmd == cmdRenderPrio) || (req->cmd == cmdRenderBulk))) {
270            req->cmd = rsp;
271            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
272            ret = send(item->fd, req, sizeof(*req), 0);
273            if (ret != sizeof(*req))
274                perror("send error during send_done");
275        }
276        item = item->duplicates;
277        free(prev);
278    }
279}
280
281
282enum protoCmd pending(struct item *test)
283{
284    // check all queues and render list to see if this request already queued
285    // If so, add this new request as a duplicate
286    // call with qLock held
287    struct item *item;
288
289    item = lookup_item_idx(test);
290    if (item != NULL) {
291        if ((item->inQueue == queueRender) || (item->inQueue == queueRequest) || (item->inQueue == queueRequestPrio)) {
292            test->duplicates = item->duplicates;
293            item->duplicates = test;
294            test->inQueue = queueDuplicate;
295            return cmdIgnore;
296        } else if ((item->inQueue == queueDirty) || (item->inQueue == queueRequestBulk)){
297            return cmdNotDone;
298        }
299    }
300
301    return cmdRender;
302}
303
304
305enum protoCmd rx_request(const struct protocol *req, int fd)
306{
307    struct protocol *reqnew;
308    struct item *list = NULL, *item;
309    enum protoCmd pend;
310
311    // Upgrade version 1 to version 2
312    if (req->ver == 1) {
313        reqnew = (struct protocol *)malloc(sizeof(struct protocol));
314        memcpy(reqnew, req, sizeof(struct protocol_v1));
315        reqnew->xmlname[0] = 0;
316        req = reqnew;
317    }
318    else if (req->ver != 2) {
319        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
320        return cmdIgnore;
321    }
322
323    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
324            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
325
326    if ((req->cmd != cmdRender) && (req->cmd != cmdRenderPrio) && (req->cmd != cmdDirty) && (req->cmd != cmdRenderBulk))
327        return cmdIgnore;
328
329    if (check_xyz(req->x, req->y, req->z))
330        return cmdNotDone;
331
332    item = (struct item *)malloc(sizeof(*item));
333    if (!item) {
334            syslog(LOG_ERR, "malloc failed");
335            return cmdNotDone;
336    }
337
338    item->req = *req;
339    item->duplicates = NULL;
340    item->fd = (req->cmd == cmdDirty) ? FD_INVALID : fd;
341
342#ifdef METATILE
343    /* Round down request co-ordinates to the neareast N (should be a power of 2)
344     * Note: request path is no longer consistent but this will be recalculated
345     * when the metatile is being rendered.
346     */
347    item->mx = item->req.x & ~(METATILE-1);
348    item->my = item->req.y & ~(METATILE-1);
349#else
350    item->mx = item->req.x;
351    item->my = item->req.y;
352#endif
353
354    pthread_mutex_lock(&qLock);
355
356    // Check for a matching request in the current rendering or dirty queues
357    pend = pending(item);
358    if (pend == cmdNotDone) {
359        // We found a match in the dirty queue, can not wait for it
360        pthread_mutex_unlock(&qLock);
361        free(item);
362        return cmdNotDone;
363    }
364    if (pend == cmdIgnore) {
365        // Found a match in render queue, item added as duplicate
366        pthread_mutex_unlock(&qLock);
367        return cmdIgnore;
368    }
369
370    // New request, add it to render or dirty queue
371    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
372        list = &reqHead;
373        item->inQueue = queueRequest;
374        reqNum++;
375    } else if ((req->cmd == cmdRenderPrio) && (reqPrioNum < REQ_LIMIT)) {
376        list = &reqPrioHead;
377        item->inQueue = queueRequestPrio;
378        reqPrioNum++;
379    } else if ((req->cmd == cmdRenderBulk) && (reqBulkNum < REQ_LIMIT)) {
380        list = &reqBulkHead;
381        item->inQueue = queueRequestBulk;
382        reqBulkNum++;
383    } else if (dirtyNum < DIRTY_LIMIT) {
384        list = &dirtyHead;
385        item->inQueue = queueDirty;
386        dirtyNum++;
387        item->fd = FD_INVALID; // No response after render
388    } else {
389        // The queue is severely backlogged. Drop request
390        stats.noReqDroped++;
391        pthread_mutex_unlock(&qLock);
392        free(item);
393        return cmdNotDone;
394    }
395
396    if (list) {
397        item->next = list;
398        item->prev = list->prev;
399        item->prev->next = item;
400        list->prev = item;
401        /* In addition to the linked list, add item to a hash table index
402         * for faster lookup of pending requests.
403         */
404        insert_item_idx(item);
405
406        pthread_cond_signal(&qCond);
407    } else
408        free(item);
409
410    pthread_mutex_unlock(&qLock);
411
412    return (list == &reqHead)?cmdIgnore:cmdNotDone;
413}
414
415void request_exit(void)
416{
417  // Any write to the exit pipe will trigger a graceful exit
418  char c=0;
419  if (write(exit_pipe_fd, &c, sizeof(c)) < 0) {
420      fprintf(stderr, "Failed to write to the exit pipe: %s\n", strerror(errno));
421  }
422}
423
424void process_loop(int listen_fd)
425{
426    int num_connections = 0;
427    int connections[MAX_CONNECTIONS];
428    int pipefds[2];
429    int exit_pipe_read;
430
431    bzero(connections, sizeof(connections));
432
433    // A pipe is used to allow the render threads to request an exit by the main process
434    if (pipe(pipefds)) {
435      fprintf(stderr, "Failed to create pipe\n");
436      return;
437    }
438    exit_pipe_fd = pipefds[1];
439    exit_pipe_read = pipefds[0];
440
441    while (1) {
442        struct sockaddr_un in_addr;
443        socklen_t in_addrlen = sizeof(in_addr);
444        fd_set rd;
445        int incoming, num, nfds, i;
446
447        FD_ZERO(&rd);
448        FD_SET(listen_fd, &rd);
449        nfds = listen_fd+1;
450
451        for (i=0; i<num_connections; i++) {
452            FD_SET(connections[i], &rd);
453            nfds = MAX(nfds, connections[i]+1);
454        }
455
456        FD_SET(exit_pipe_read, &rd);
457        nfds = MAX(nfds, exit_pipe_read+1);
458
459        num = select(nfds, &rd, NULL, NULL, NULL);
460        if (num == -1)
461            perror("select()");
462        else if (num) {
463            if (FD_ISSET(exit_pipe_read, &rd)) {
464              // A render thread wants us to exit
465              break;
466            }
467
468            //printf("Data is available now on %d fds\n", num);
469            if (FD_ISSET(listen_fd, &rd)) {
470                num--;
471                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
472                if (incoming < 0) {
473                    perror("accept()");
474                } else {
475                    if (num_connections == MAX_CONNECTIONS) {
476                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
477                        close(incoming);
478                    } else {
479                        connections[num_connections++] = incoming;
480                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
481                    }
482                }
483            }
484            for (i=0; num && (i<num_connections); i++) {
485                int fd = connections[i];
486                if (FD_ISSET(fd, &rd)) {
487                    struct protocol cmd;
488                    int ret;
489
490                    // TODO: to get highest performance we should loop here until we get EAGAIN
491                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
492                    if (ret == sizeof(cmd)) {
493                        enum protoCmd rsp = rx_request(&cmd, fd);
494
495                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
496                            cmd.cmd = rsp;
497                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
498                            ret = send(fd, &cmd, sizeof(cmd), 0);
499                            if (ret != sizeof(cmd))
500                                perror("response send error");
501                        }
502                    } else if (!ret) {
503                        int j;
504
505                        num_connections--;
506                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
507                        for (j=i; j < num_connections; j++)
508                            connections[j] = connections[j+1];
509                        clear_requests(fd);
510                        close(fd);
511                    } else {
512                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
513                        break;
514                    }
515                }
516            }
517        } else {
518            syslog(LOG_ERR, "Select timeout");
519        }
520    }
521}
522
523/**
524 * Periodically write out current stats to a stats file. This information
525 * can then be used to monitor performance of renderd e.g. with a munin plugin
526 */
527void *stats_writeout_thread(void * arg) {
528    stats_struct lStats;
529    int dirtQueueLength;
530    int reqQueueLength;
531    int reqPrioQueueLength;
532    int reqBulkQueueLength;
533        int i;
534
535    int noFailedAttempts = 0;
536    char tmpName[PATH_MAX];
537
538    snprintf(tmpName, sizeof(tmpName), "%s.tmp", config.stats_filename);
539
540    syslog(LOG_DEBUG, "Starting stats thread");
541    while (1) {
542        pthread_mutex_lock(&qLock);
543        memcpy(&lStats, &stats, sizeof(stats_struct));
544        dirtQueueLength = dirtyNum;
545        reqQueueLength = reqNum;
546        reqPrioQueueLength = reqPrioNum;
547        reqBulkQueueLength = reqBulkNum;
548        pthread_mutex_unlock(&qLock);
549
550        FILE * statfile = fopen(tmpName, "w");
551        if (statfile == NULL) {
552            syslog(LOG_WARNING, "Failed to open stats file: %i", errno);
553            noFailedAttempts++;
554            if (noFailedAttempts > 3) {
555                syslog(LOG_ERR, "ERROR: Failed repeatedly to write stats, giving up");
556                break;
557            }
558            continue;
559        } else {
560            noFailedAttempts = 0;
561            fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
562            fprintf(statfile, "ReqPrioQueueLength: %i\n", reqPrioQueueLength);
563            fprintf(statfile, "ReqBulkQueueLength: %i\n", reqBulkQueueLength);
564            fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
565            fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
566            fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
567            fprintf(statfile, "ReqPrioRendered: %li\n", lStats.noReqPrioRender);
568            fprintf(statfile, "ReqBulkRendered: %li\n", lStats.noReqBulkRender);
569            fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
570            for (i = 0; i <= MAX_ZOOM; i++) {
571                fprintf(statfile,"ZoomRendered%02i: %li\n", i, lStats.noZoomRender[i]);
572            }
573            for (i = 0; i <= MAX_ZOOM; i++) {
574                fprintf(statfile,"TimeRenderedZoom%02i: %li\n", i, lStats.timeZoomRender[i]);
575            }
576            fclose(statfile);
577            if (rename(tmpName, config.stats_filename)) {
578                syslog(LOG_WARNING, "Failed to overwrite stats file: %i", errno);
579                noFailedAttempts++;
580                if (noFailedAttempts > 3) {
581                    syslog(LOG_ERR, "ERROR: Failed repeatedly to overwrite stats, giving up");
582                    break;
583                }
584                continue;
585            }
586        }
587        sleep(10);
588    }
589    return NULL;
590}
591
592int client_socket_init(renderd_config * sConfig) {
593    int fd, s;
594    struct sockaddr_un * addrU;
595    struct addrinfo hints; 
596    struct addrinfo *result, *rp;
597    char portnum[16]; 
598    char ipstring[INET6_ADDRSTRLEN];
599
600    if (sConfig->ipport > 0) {
601        syslog(LOG_INFO, "Initialising TCP/IP client socket to %s:%i", sConfig->iphostname, sConfig->ipport);
602       
603        memset(&hints, 0, sizeof(struct addrinfo)); 
604        hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */ 
605        hints.ai_socktype = SOCK_STREAM; /* TCP socket */ 
606        hints.ai_flags = 0; 
607        hints.ai_protocol = 0;          /* Any protocol */ 
608        hints.ai_canonname = NULL; 
609        hints.ai_addr = NULL; 
610        hints.ai_next = NULL; 
611        sprintf(portnum,"%i", sConfig->ipport);
612
613        s = getaddrinfo(sConfig->iphostname, portnum, &hints, &result); 
614        if (s != 0) { 
615            syslog(LOG_INFO, "failed to resolve hostname of rendering slave"); 
616            return FD_INVALID; 
617        }
618       
619        /* getaddrinfo() returns a list of address structures.
620           Try each address until we successfully connect. */ 
621        for (rp = result; rp != NULL; rp = rp->ai_next) { 
622            inet_ntop(rp->ai_family, rp->ai_family == AF_INET ? &(((struct sockaddr_in *)rp->ai_addr)->sin_addr) : 
623                      &(((struct sockaddr_in6 *)rp->ai_addr)->sin6_addr) , ipstring, rp->ai_addrlen); 
624            syslog(LOG_DEBUG, "Connecting TCP socket to rendering daemon at %s", ipstring); 
625            fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); 
626            if (fd < 0) 
627                continue; 
628            if (connect(fd, rp->ai_addr, rp->ai_addrlen) != 0) { 
629                syslog(LOG_INFO, "failed to connect to rendering daemon (%s), trying next ip", ipstring); 
630                close(fd);
631                fd = -1;
632                continue;
633            } else { 
634                break; 
635            } 
636        }
637        freeaddrinfo(result);
638       
639        if (fd < 0) { 
640            syslog(LOG_WARNING, "failed to connect to %s:%i", sConfig->iphostname, sConfig->ipport); 
641            return FD_INVALID; 
642        }
643
644        syslog(LOG_INFO, "socket %s:%i initialised to fd %i", sConfig->iphostname, sConfig->ipport, fd);
645    } else {
646        syslog(LOG_INFO, "Initialising unix client socket on %s",
647                sConfig->socketname);
648        addrU = (struct sockaddr_un *)malloc(sizeof(struct sockaddr_un));
649        fd = socket(PF_UNIX, SOCK_STREAM, 0);
650        if (fd < 0) {
651            syslog(LOG_WARNING, "Could not obtain socket: %i", fd);
652            return FD_INVALID;
653        }
654
655        bzero(addrU, sizeof(struct sockaddr_un));
656        addrU->sun_family = AF_UNIX;
657        strncpy(addrU->sun_path, sConfig->socketname, sizeof(addrU->sun_path));
658
659        if (connect(fd, (struct sockaddr *) addrU, sizeof(struct sockaddr_un)) < 0) {
660            syslog(LOG_WARNING, "socket connect failed for: %s",
661                    sConfig->socketname);
662            close(fd);
663            return FD_INVALID;
664        }
665        free(addrU);
666        syslog(LOG_INFO, "socket %s initialised to fd %i", sConfig->socketname,
667                fd);
668    }
669    return fd;
670}
671
672int server_socket_init(renderd_config *sConfig) {
673    struct sockaddr_un addrU;
674    struct sockaddr_in6 addrI;
675    mode_t old;
676    int fd;
677
678    if (sConfig->ipport > 0) {
679        syslog(LOG_INFO, "Initialising TCP/IP server socket on %s:%i",
680                sConfig->iphostname, sConfig->ipport);
681        fd = socket(PF_INET6, SOCK_STREAM, 0);
682        if (fd < 0) {
683            fprintf(stderr, "failed to create IP socket\n");
684            exit(2);
685        }
686        bzero(&addrI, sizeof(addrI));
687        addrI.sin6_family = AF_INET6;
688        addrI.sin6_addr = in6addr_any;
689        addrI.sin6_port = htons(sConfig->ipport);
690        if (bind(fd, (struct sockaddr *) &addrI, sizeof(addrI)) < 0) {
691            fprintf(stderr, "socket bind failed for: %s:%i\n",
692                    sConfig->iphostname, sConfig->ipport);
693            close(fd);
694            exit(3);
695        }
696    } else {
697        syslog(LOG_INFO, "Initialising unix server socket on %s",
698                sConfig->socketname);
699
700        fd = socket(PF_UNIX, SOCK_STREAM, 0);
701        if (fd < 0) {
702            fprintf(stderr, "failed to create unix socket\n");
703            exit(2);
704        }
705
706        bzero(&addrU, sizeof(addrU));
707        addrU.sun_family = AF_UNIX;
708        strncpy(addrU.sun_path, sConfig->socketname, sizeof(addrU.sun_path));
709
710        unlink(addrU.sun_path);
711
712        old = umask(0); // Need daemon socket to be writeable by apache
713        if (bind(fd, (struct sockaddr *) &addrU, sizeof(addrU)) < 0) {
714            fprintf(stderr, "socket bind failed for: %s\n", sConfig->socketname);
715            close(fd);
716            exit(3);
717        }
718        umask(old);
719    }
720
721    if (listen(fd, QUEUE_MAX) < 0) {
722        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
723        close(fd);
724        exit(4);
725    }
726
727    syslog(LOG_DEBUG, "Created server socket %i", fd);
728
729    return fd;
730
731}
732
733/**
734 * This function is used as a the start function for the slave renderer thread.
735 * It pulls a request from the central queue of requests and dispatches it to
736 * the slave renderer. It then blocks and waits for the response with no timeout.
737 * As it only sends one request at a time (there are as many slave_thread threads as there
738 * are rendering threads on the slaves) nothing gets queued on the slave and should get
739 * rendererd immediately. Thus overall, requests should be nicely load balanced between
740 * all the rendering threads available both locally and in the slaves.
741 */
742void *slave_thread(void * arg) {
743    renderd_config * sConfig = (renderd_config *) arg;
744
745    int pfd = FD_INVALID;
746    int retry;
747    size_t ret_size;
748
749    struct protocol * resp;
750    struct protocol * req_slave;
751
752    req_slave = (struct protocol *)malloc(sizeof(struct protocol));
753    resp = (struct protocol *)malloc(sizeof(struct protocol));
754    bzero(req_slave, sizeof(struct protocol));
755    bzero(resp, sizeof(struct protocol));
756
757    while (1) {
758        if (pfd == FD_INVALID) {
759            pfd = client_socket_init(sConfig);
760            if (pfd == FD_INVALID) {
761                if (sConfig->ipport > 0) {
762                    syslog(LOG_ERR,
763                        "Failed to connect to render slave %s:%i, trying again in 30 seconds",
764                        sConfig->iphostname, sConfig->ipport);
765                } else {
766                    syslog( LOG_ERR,
767                            "Failed to connect to render slave %s, trying again in 30 seconds",
768                            sConfig->socketname);
769                }
770                sleep(30);
771                continue;
772            }
773        }
774
775        enum protoCmd ret;
776        struct item *item = fetch_request();
777        if (item) {
778            struct protocol *req = &item->req;
779            req_slave->ver = PROTO_VER;
780            req_slave->cmd = cmdRender;
781            strcpy(req_slave->xmlname, req->xmlname);
782            req_slave->x = req->x;
783            req_slave->y = req->y;
784            req_slave->z = req->z;
785
786            /*Dispatch request to slave renderd*/
787            retry = 2;
788            syslog(LOG_INFO,
789                    "Dispatching request to slave thread on fd %i", pfd);
790            do {
791                ret_size = send(pfd, req_slave, sizeof(struct protocol), 0);
792
793                if (ret_size == sizeof(struct protocol)) {
794                    //correctly sent command to slave
795                    break;
796                }
797
798                if (errno != EPIPE) {
799                    syslog(LOG_ERR,
800                            "Failed to send cmd to render slave, shutting down thread");
801                    return NULL;
802                }
803
804                syslog(LOG_WARNING, "Failed to send cmd to render slave, retrying");
805                close(pfd);
806                pfd = client_socket_init(sConfig);
807                if (pfd == FD_INVALID) {
808                    syslog(LOG_ERR,
809                            "Failed to re-connect to render slave, dropping request");
810                    ret = cmdNotDone;
811                    send_response(item, ret);
812                    break;
813                }
814            } while (retry--);
815            if (pfd == FD_INVALID || ret_size != sizeof(struct protocol)) {
816                continue;
817            }
818
819            ret_size = 0;
820            retry = 10;
821            while ((ret_size < sizeof(struct protocol)) && (retry > 0)) {
822                ret_size = recv(pfd, resp + ret_size, (sizeof(struct protocol)
823                        - ret_size), 0);
824                if ((errno == EPIPE) || ret_size == 0) {
825                    close(pfd);
826                    pfd = FD_INVALID;
827                    ret_size = 0;
828                    syslog(LOG_ERR, "Pipe to render slave closed");
829                    break;
830                }
831                retry--;
832            }
833            if (ret_size < sizeof(struct protocol)) {
834                if (sConfig->ipport > 0) {
835                    syslog( LOG_ERR,
836                            "Invalid reply from render slave %s:%i, trying again in 30 seconds",
837                            sConfig->iphostname, sConfig->ipport);
838                } else {
839                    syslog( LOG_ERR,
840                            "Invalid reply render slave %s, trying again in 30 seconds",
841                            sConfig->socketname);
842                }
843
844                ret = cmdNotDone;
845                send_response(item, ret);
846                sleep(30);
847            } else {
848                ret = resp->cmd;
849                send_response(item, ret);
850                if (resp->cmd != cmdDone) {
851                    if (sConfig->ipport > 0) {
852                        syslog( LOG_ERR,
853                                "Request from render slave %s:%i did not complete correctly",
854                                sConfig->iphostname, sConfig->ipport);
855                    } else {
856                        syslog( LOG_ERR,
857                                "Request from render slave %s did not complete correctly",
858                                sConfig->socketname);
859                    }
860                    //Sleep for a while to make sure we don't overload the renderer
861                    //This only happens if it didn't correctly block on the rendering
862                    //request
863                    sleep(30);
864                }
865            }
866
867        } else {
868            sleep(1); // TODO: Use an event to indicate there are new requests
869        }
870    }
871    return NULL;
872}
873
874
875int main(int argc, char **argv)
876{
877    int fd, i, j, k;
878
879    int c;
880    int foreground=0;
881    int active_slave=0;
882    int log_options;
883    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
884
885    while (1) {
886        int option_index = 0;
887        static struct option long_options[] = {
888            {"config", 1, 0, 'c'},
889            {"foreground", 1, 0, 'f'},
890            {"slave", 1, 0, 's'},
891            {"help", 0, 0, 'h'},
892            {0, 0, 0, 0}
893        };
894
895        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
896        if (c == -1)
897            break;
898
899        switch (c) {
900            case 'f':
901                foreground=1;
902                break;
903            case 'c':
904                strncpy(config_file_name, optarg, PATH_MAX-1);
905                config_file_name[PATH_MAX-1] = 0;
906                break;
907            case 's':
908                if (sscanf(optarg, "%i", &active_slave) != 1) {
909                    fprintf(stderr, "--slave needs to be nummeric (%s)\n", optarg);
910                    active_slave = 0;
911                }
912                break;
913            case 'h':
914                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
915                fprintf(stderr, "Mapnik rendering daemon\n");
916                fprintf(stderr, "  -f, --foreground     run in foreground\n");
917                fprintf(stderr, "  -h, --help           display this help and exit\n");
918                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
919                fprintf(stderr, "  -s, --slave=CONFIG_NR set which render slave this is (default 0)\n");
920                exit(0);
921            default:
922                fprintf(stderr, "unknown config option '%c'\n", c);
923                exit(1);
924        }
925    }
926
927    log_options = LOG_PID;
928#ifdef LOG_PERROR
929    if (foreground)
930        log_options |= LOG_PERROR;
931#endif
932    openlog("renderd", log_options, LOG_DAEMON);
933
934    syslog(LOG_INFO, "Rendering daemon started");
935
936    pthread_mutex_init(&qLock, NULL);
937    pthread_cond_init(&qCond, NULL);
938    reqHead.next = reqHead.prev = &reqHead;
939    reqPrioHead.next = reqPrioHead.prev = &reqPrioHead;
940    reqBulkHead.next = reqBulkHead.prev = &reqBulkHead;
941    dirtyHead.next = dirtyHead.prev = &dirtyHead;
942    renderHead.next = renderHead.prev = &renderHead;
943    hashidxSize = HASHIDX_SIZE;
944    item_hashidx = (struct item_idx *) malloc(sizeof(struct item_idx) * hashidxSize);
945    bzero(item_hashidx, sizeof(struct item_idx) * hashidxSize);
946
947    stats.noDirtyRender = 0;
948    stats.noReqDroped = 0;
949    stats.noReqRender = 0;
950    stats.noReqPrioRender = 0;
951    stats.noReqBulkRender = 0;
952
953    xmlconfigitem maps[XMLCONFIGS_MAX];
954    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
955
956    renderd_config config_slaves[MAX_SLAVES];
957    bzero(config_slaves, sizeof(renderd_config) * MAX_SLAVES);
958    bzero(&config, sizeof(renderd_config));
959
960    dictionary *ini = iniparser_load(config_file_name);
961    if (! ini) {
962        exit(1);
963    }
964
965    noSlaveRenders = 0;
966
967    int iconf = -1;
968    char buffer[PATH_MAX];
969    for (int section=0; section < iniparser_getnsec(ini); section++) {
970        char *name = iniparser_getsecname(ini, section);
971        syslog(LOG_INFO, "Parsing section %s\n", name);
972        if (strncmp(name, "renderd", 7) && strcmp(name, "mapnik")) {
973            if (config.tile_dir == NULL) {
974                fprintf(stderr, "No valid (active) renderd config section available\n");
975                exit(7);
976            }
977            /* this is a map section */
978            iconf++;
979            if (strlen(name) >= XMLCONFIG_MAX) {
980                fprintf(stderr, "XML name too long: %s\n", name);
981                exit(7);
982            }
983
984            strcpy(maps[iconf].xmlname, name);
985            if (iconf >= XMLCONFIGS_MAX) {
986                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
987                exit(7);
988            }
989            sprintf(buffer, "%s:uri", name);
990            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
991            if (strlen(ini_uri) >= PATH_MAX) {
992                fprintf(stderr, "URI too long: %s\n", ini_uri);
993                exit(7);
994            }
995            strcpy(maps[iconf].xmluri, ini_uri);
996            sprintf(buffer, "%s:xml", name);
997            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
998            if (strlen(ini_xmlpath) >= PATH_MAX){
999                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
1000                exit(7);
1001            }
1002            sprintf(buffer, "%s:host", name);
1003            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
1004            if (strlen(ini_hostname) >= PATH_MAX) {
1005                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
1006                exit(7);
1007            }
1008
1009            sprintf(buffer, "%s:htcphost", name);
1010            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
1011            if (strlen(ini_htcpip) >= PATH_MAX) {
1012                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
1013                exit(7);
1014            }
1015            strcpy(maps[iconf].xmlfile, ini_xmlpath);
1016            strcpy(maps[iconf].tile_dir, config.tile_dir);
1017            strcpy(maps[iconf].host, ini_hostname);
1018            strcpy(maps[iconf].htcpip, ini_htcpip);
1019        } else if (strncmp(name, "renderd", 7) == 0) {
1020            int render_sec = 0;
1021            if (sscanf(name, "renderd%i", &render_sec) != 1) {
1022                render_sec = 0;
1023            }
1024            syslog(LOG_INFO, "Parsing render section %i\n", render_sec);
1025            if (render_sec >= MAX_SLAVES) {
1026                syslog(LOG_ERR, "Can't handle more than %i render sections\n",
1027                        MAX_SLAVES);
1028                exit(7);
1029            }
1030            sprintf(buffer, "%s:socketname", name);
1031            config_slaves[render_sec].socketname = iniparser_getstring(ini,
1032                    buffer, (char *) RENDER_SOCKET);
1033            sprintf(buffer, "%s:iphostname", name);
1034            config_slaves[render_sec].iphostname = iniparser_getstring(ini,
1035                    buffer, "");
1036            sprintf(buffer, "%s:ipport", name);
1037            config_slaves[render_sec].ipport = iniparser_getint(ini, buffer, 0);
1038            sprintf(buffer, "%s:num_threads", name);
1039            config_slaves[render_sec].num_threads = iniparser_getint(ini,
1040                    buffer, NUM_THREADS);
1041            sprintf(buffer, "%s:tile_dir", name);
1042            config_slaves[render_sec].tile_dir = iniparser_getstring(ini,
1043                    buffer, (char *) HASH_PATH);
1044            sprintf(buffer, "%s:stats_file", name);
1045            config_slaves[render_sec].stats_filename = iniparser_getstring(ini,
1046                    buffer, NULL);
1047
1048            if (render_sec == active_slave) {
1049                config.socketname = config_slaves[render_sec].socketname;
1050                config.iphostname = config_slaves[render_sec].iphostname;
1051                config.ipport = config_slaves[render_sec].ipport;
1052                config.num_threads = config_slaves[render_sec].num_threads;
1053                config.tile_dir = config_slaves[render_sec].tile_dir;
1054                config.stats_filename
1055                        = config_slaves[render_sec].stats_filename;
1056                config.mapnik_plugins_dir = iniparser_getstring(ini,
1057                        "mapnik:plugins_dir", (char *) MAPNIK_PLUGINS);
1058                config.mapnik_font_dir = iniparser_getstring(ini,
1059                        "mapnik:font_dir", (char *) FONT_DIR);
1060                config.mapnik_font_dir_recurse = iniparser_getboolean(ini,
1061                        "mapnik:font_dir_recurse", FONT_RECURSE);
1062            } else {
1063                noSlaveRenders += config_slaves[render_sec].num_threads;
1064            }
1065        }
1066    }
1067
1068    if (config.ipport > 0) {
1069        syslog(LOG_INFO, "config renderd: ip socket=%s:%i\n", config.iphostname, config.ipport);
1070    } else {
1071        syslog(LOG_INFO, "config renderd: unix socketname=%s\n", config.socketname);
1072    }
1073    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
1074    if (active_slave == 0) {
1075        syslog(LOG_INFO, "config renderd: num_slaves=%d\n", noSlaveRenders);
1076    }
1077    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
1078    syslog(LOG_INFO, "config renderd: stats_file=%s\n", config.stats_filename);
1079    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
1080    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
1081    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
1082    for (int i = 0; i < MAX_SLAVES; i++) {
1083        if (config_slaves[i].num_threads == 0) {
1084            continue;
1085        }
1086        if (i == active_slave) {
1087            syslog(LOG_INFO, "config renderd(%i): Active\n", i);
1088        }
1089        if (config_slaves[i].ipport > 0) {
1090                syslog(LOG_INFO, "config renderd(%i): ip socket=%s:%i\n", i,
1091                        config_slaves[i].iphostname, config_slaves[i].ipport);
1092            } else {
1093                syslog(LOG_INFO, "config renderd(%i): unix socketname=%s\n", i,
1094                        config_slaves[i].socketname);
1095            }
1096        syslog(LOG_INFO, "config renderd(%i): num_threads=%d\n", i,
1097                config_slaves[i].num_threads);
1098        syslog(LOG_INFO, "config renderd(%i): tile_dir=%s\n", i,
1099                config_slaves[i].tile_dir);
1100        syslog(LOG_INFO, "config renderd(%i): stats_file=%s\n", i,
1101                config_slaves[i].stats_filename);
1102    }
1103
1104    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
1105        if (maps[iconf].xmlname[0] != 0) {
1106         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
1107                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
1108                 maps[iconf].htcpip, maps[iconf].host);
1109        }
1110    }
1111
1112    fd = server_socket_init(&config);
1113#if 0
1114    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
1115        fprintf(stderr, "setting socket non-block failed\n");
1116        close(fd);
1117        exit(5);
1118    }
1119#endif
1120
1121    //sigPipeAction.sa_handler = pipe_handler;
1122    sigPipeAction.sa_handler = SIG_IGN;
1123    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
1124        fprintf(stderr, "failed to register signal handler\n");
1125        close(fd);
1126        exit(6);
1127    }
1128
1129    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
1130
1131    /* unless the command line said to run in foreground mode, fork and detach from terminal */
1132    if (foreground) {
1133        fprintf(stderr, "Running in foreground mode...\n");
1134    } else {
1135        if (daemon(0, 0) != 0) {
1136            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
1137        }
1138        /* write pid file */
1139        FILE *pidfile = fopen(PIDFILE, "w");
1140        if (pidfile) {
1141            (void) fprintf(pidfile, "%d\n", getpid());
1142            (void) fclose(pidfile);
1143        }
1144    }
1145
1146    if (config.stats_filename != NULL) {
1147        if (pthread_create(&stats_thread, NULL, stats_writeout_thread, NULL)) {
1148            syslog(LOG_WARNING, "Could not create stats writeout thread");
1149        }
1150    } else {
1151        syslog(LOG_INFO, "No stats file specified in config. Stats reporting disabled");
1152    }
1153
1154    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
1155
1156    for(i=0; i<config.num_threads; i++) {
1157        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
1158            fprintf(stderr, "error spawning render thread\n");
1159            close(fd);
1160            exit(7);
1161        }
1162    }
1163
1164    if (active_slave == 0) {
1165        //Only the master renderd opens connections to its slaves
1166        k = 0;
1167        slave_threads
1168                = (pthread_t *) malloc(sizeof(pthread_t) * noSlaveRenders);
1169        for (i = 1; i < MAX_SLAVES; i++) {
1170            for (j = 0; j < config_slaves[i].num_threads; j++) {
1171                if (pthread_create(&slave_threads[k++], NULL, slave_thread,
1172                        (void *) &config_slaves[i])) {
1173                    fprintf(stderr, "error spawning render thread\n");
1174                    close(fd);
1175                    exit(7);
1176                }
1177            }
1178        }
1179    }
1180
1181    process_loop(fd);
1182
1183    unlink(config.socketname);
1184    close(fd);
1185    return 0;
1186}
Note: See TracBrowser for help on using the repository browser.