source: subversion/applications/utils/mod_tile/daemon.c @ 29196

Last change on this file since 29196 was 28680, checked in by apmon, 7 years ago

[mod_tile] bzero is defined in <strings.h> but was not included

On Linux strings.h must have been included through some other header,
but on solaris it isn't. So it needs to explicitly be included.

File size: 39.3 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <stdint.h>
4#include <unistd.h>
5#include <sys/types.h>
6#include <sys/socket.h>
7#include <netinet/in.h>
8#include <netdb.h>
9#include <sys/select.h>
10#include <sys/stat.h>
11#include <sys/un.h>
12#include <poll.h>
13#include <errno.h>
14#include <pthread.h>
15#include <signal.h>
16#include <string.h>
17#include <strings.h>
18#include <syslog.h>
19#include <getopt.h>
20
21#include "render_config.h"
22#include "daemon.h"
23#include "gen_tile.h"
24#include "protocol.h"
25#include "dir_utils.h"
26
27#define PIDFILE "/var/run/renderd/renderd.pid"
28
29// extern "C" {
30#include "iniparser3.0b/src/iniparser.h"
31// }
32
33static pthread_t *render_threads;
34static pthread_t *slave_threads;
35static struct sigaction sigPipeAction;
36
37static struct item reqHead, reqPrioHead, reqBulkHead, dirtyHead, renderHead;
38static struct item_idx * item_hashidx;
39static int reqNum, reqPrioNum, reqBulkNum, dirtyNum;
40static pthread_mutex_t qLock;
41static pthread_cond_t qCond;
42static int exit_pipe_fd;
43
44static stats_struct stats;
45static pthread_t stats_thread;
46
47static renderd_config config;
48
49int noSlaveRenders;
50int hashidxSize;
51
52void statsRenderFinish(int z, long time) {
53    pthread_mutex_lock(&qLock);
54    if ((z >= 0) && (z <= MAX_ZOOM)) {
55        stats.noZoomRender[z]++;
56        stats.timeZoomRender[z] += time;
57    }
58    pthread_mutex_unlock(&qLock);
59}
60
61struct item *fetch_request(void)
62{
63    struct item *item = NULL;
64
65    pthread_mutex_lock(&qLock);
66
67    while ((reqNum == 0) && (dirtyNum == 0) && (reqPrioNum == 0) && (reqBulkNum == 0)) {
68        pthread_cond_wait(&qCond, &qLock);
69    }
70    if (reqPrioNum) {
71        item = reqPrioHead.next;
72        reqPrioNum--;
73        stats.noReqPrioRender++;
74    } else if (reqNum) {
75        item = reqHead.next;
76        reqNum--;
77        stats.noReqRender++;
78    } else if (dirtyNum) {
79        item = dirtyHead.next;
80        dirtyNum--;
81        stats.noDirtyRender++;
82    } else if (reqBulkNum) {
83        item = reqBulkHead.next;
84        reqBulkNum--;
85        stats.noReqBulkRender++;
86    }
87    if (item) {
88        item->next->prev = item->prev;
89        item->prev->next = item->next;
90
91        item->prev = &renderHead;
92        item->next = renderHead.next;
93        renderHead.next->prev = item;
94        renderHead.next = item;
95        item->inQueue = queueRender;
96    }
97
98
99    pthread_mutex_unlock(&qLock);
100
101    return item;
102}
103
104void clear_requests(int fd)
105{
106    struct item *item, *dupes, *queueHead;
107
108    /**Only need to look up on the shorter request and render queue
109      * so using the linear list shouldn't be a problem
110      */
111    pthread_mutex_lock(&qLock);
112    for (int i = 0; i < 4; i++) {
113        switch (i) {
114        case 0: { queueHead = &reqHead; break;}
115        case 1: { queueHead = &renderHead; break;}
116        case 2: { queueHead = &reqPrioHead; break;}
117        case 3: { queueHead = &reqBulkHead; break;}
118        }
119
120        item = queueHead->next;
121        while (item != queueHead) {
122            if (item->fd == fd)
123                item->fd = FD_INVALID;
124
125            dupes = item->duplicates;
126            while (dupes) {
127                if (dupes->fd == fd)
128                    dupes->fd = FD_INVALID;
129                dupes = dupes->duplicates;
130            }
131            item = item->next;
132        }
133    }
134
135    pthread_mutex_unlock(&qLock);
136}
137
138
139static int calcHashKey(struct item *item) {
140    uint64_t xmlnameHash = 0;
141    uint64_t key;
142    for (int i = 0; item->req.xmlname[i] != 0; i++) {
143        xmlnameHash += item->req.xmlname[i];
144    }
145    key = ((uint64_t)(xmlnameHash & 0x1FF) << 52) + ((uint64_t)(item->req.z) << 48) + ((uint64_t)(item->mx & 0xFFFFFF) << 24) + (item->my & 0xFFFFFF);
146    return key % hashidxSize;
147}
148
149void insert_item_idx(struct item *item) {
150    struct item_idx * nextItem;
151    struct item_idx * prevItem;
152
153    int key = calcHashKey(item);
154
155    if (item_hashidx[key].item == NULL) {
156        item_hashidx[key].item = item;
157    } else {
158        prevItem = &(item_hashidx[key]);
159        nextItem = item_hashidx[key].next;
160        while(nextItem) {
161            prevItem = nextItem;
162            nextItem = nextItem->next;
163        }
164        nextItem = (struct item_idx *)malloc(sizeof(struct item_idx));
165        nextItem->item = item;
166        nextItem->next = NULL;
167        prevItem->next = nextItem;
168    }
169}
170
171void remove_item_idx(struct item * item) {
172    int key = calcHashKey(item);
173    struct item_idx * nextItem;
174    struct item_idx * prevItem;
175    struct item * test;
176    if (item_hashidx[key].item == NULL) {
177        //item not in index;
178        return;
179    }
180    prevItem = &(item_hashidx[key]);
181    nextItem = &(item_hashidx[key]);
182
183    while (nextItem != NULL) {
184        test = nextItem->item;
185        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z
186                == test->req.z) && (!strcmp(item->req.xmlname,
187                test->req.xmlname))) {
188            /*
189             * Found item, removing it from list
190             */
191            nextItem->item = NULL;
192            if (nextItem->next != NULL) {
193                if (nextItem == &(item_hashidx[key])) {
194                    prevItem = nextItem->next;
195                    memcpy(&(item_hashidx[key]), nextItem->next,
196                            sizeof(struct item_idx));
197                    free(prevItem);
198                } else {
199                    prevItem->next = nextItem->next;
200                }
201            } else {
202                prevItem->next = NULL;
203            }
204
205            if (nextItem != &(item_hashidx[key])) {
206                free(nextItem);
207            }
208            return;
209        } else {
210            prevItem = nextItem;
211            nextItem = nextItem->next;
212        }
213    }
214}
215
216struct item * lookup_item_idx(struct item * item) {
217    struct item_idx * nextItem;
218    struct item * test;
219
220    int key = calcHashKey(item);
221
222    if (item_hashidx[key].item == NULL) {
223        return NULL;
224    } else {
225        nextItem = &(item_hashidx[key]);
226        while (nextItem != NULL) {
227            test = nextItem->item;
228            if ((item->mx == test->mx) && (item->my == test->my)
229                    && (item->req.z == test->req.z) && (!strcmp(
230                    item->req.xmlname, test->req.xmlname))) {
231                return test;
232            } else {
233                nextItem = nextItem->next;
234            }
235        }
236    }
237    return NULL;
238}
239
240static inline const char *cmdStr(enum protoCmd c)
241{
242    switch (c) {
243        case cmdIgnore:  return "Ignore";
244        case cmdRender:  return "Render";
245        case cmdRenderPrio:  return "RenderPrio";
246        case cmdRenderBulk:  return "RenderBulk";
247        case cmdDirty:   return "Dirty";
248        case cmdDone:    return "Done";
249        case cmdNotDone: return "NotDone";
250        default:         return "unknown";
251    }
252}
253
254void send_response(struct item *item, enum protoCmd rsp)
255{
256    struct protocol *req = &item->req;
257    int ret;
258
259    pthread_mutex_lock(&qLock);
260    item->next->prev = item->prev;
261    item->prev->next = item->next;
262    remove_item_idx(item);
263    pthread_mutex_unlock(&qLock);
264
265    while (item) {
266        struct item *prev = item;
267        req = &item->req;
268        if ((item->fd != FD_INVALID) && ((req->cmd == cmdRender) || (req->cmd == cmdRenderPrio) || (req->cmd == cmdRenderBulk))) {
269            req->cmd = rsp;
270            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
271            ret = send(item->fd, req, sizeof(*req), 0);
272            if (ret != sizeof(*req))
273                perror("send error during send_done");
274        }
275        item = item->duplicates;
276        free(prev);
277    }
278}
279
280
281enum protoCmd pending(struct item *test)
282{
283    // check all queues and render list to see if this request already queued
284    // If so, add this new request as a duplicate
285    // call with qLock held
286    struct item *item;
287
288    item = lookup_item_idx(test);
289    if (item != NULL) {
290        if ((item->inQueue == queueRender) || (item->inQueue == queueRequest) || (item->inQueue == queueRequestPrio)) {
291            test->duplicates = item->duplicates;
292            item->duplicates = test;
293            test->inQueue = queueDuplicate;
294            return cmdIgnore;
295        } else if ((item->inQueue == queueDirty) || (item->inQueue == queueRequestBulk)){
296            return cmdNotDone;
297        }
298    }
299
300    return cmdRender;
301}
302
303
304enum protoCmd rx_request(const struct protocol *req, int fd)
305{
306    struct protocol *reqnew;
307    struct item *list = NULL, *item;
308    enum protoCmd pend;
309
310    // Upgrade version 1 to version 2
311    if (req->ver == 1) {
312        reqnew = (struct protocol *)malloc(sizeof(struct protocol));
313        memcpy(reqnew, req, sizeof(struct protocol_v1));
314        reqnew->xmlname[0] = 0;
315        req = reqnew;
316    }
317    else if (req->ver != 2) {
318        syslog(LOG_ERR, "Bad protocol version %d", req->ver);
319        return cmdIgnore;
320    }
321
322    syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
323            cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
324
325    if ((req->cmd != cmdRender) && (req->cmd != cmdRenderPrio) && (req->cmd != cmdDirty) && (req->cmd != cmdRenderBulk))
326        return cmdIgnore;
327
328    if (check_xyz(req->x, req->y, req->z))
329        return cmdNotDone;
330
331    item = (struct item *)malloc(sizeof(*item));
332    if (!item) {
333            syslog(LOG_ERR, "malloc failed");
334            return cmdNotDone;
335    }
336
337    item->req = *req;
338    item->duplicates = NULL;
339    item->fd = (req->cmd == cmdDirty) ? FD_INVALID : fd;
340
341#ifdef METATILE
342    /* Round down request co-ordinates to the neareast N (should be a power of 2)
343     * Note: request path is no longer consistent but this will be recalculated
344     * when the metatile is being rendered.
345     */
346    item->mx = item->req.x & ~(METATILE-1);
347    item->my = item->req.y & ~(METATILE-1);
348#else
349    item->mx = item->req.x;
350    item->my = item->req.y;
351#endif
352
353    pthread_mutex_lock(&qLock);
354
355    // Check for a matching request in the current rendering or dirty queues
356    pend = pending(item);
357    if (pend == cmdNotDone) {
358        // We found a match in the dirty queue, can not wait for it
359        pthread_mutex_unlock(&qLock);
360        free(item);
361        return cmdNotDone;
362    }
363    if (pend == cmdIgnore) {
364        // Found a match in render queue, item added as duplicate
365        pthread_mutex_unlock(&qLock);
366        return cmdIgnore;
367    }
368
369    // New request, add it to render or dirty queue
370    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
371        list = &reqHead;
372        item->inQueue = queueRequest;
373        reqNum++;
374    } else if ((req->cmd == cmdRenderPrio) && (reqPrioNum < REQ_LIMIT)) {
375        list = &reqPrioHead;
376        item->inQueue = queueRequestPrio;
377        reqPrioNum++;
378    } else if ((req->cmd == cmdRenderBulk) && (reqBulkNum < REQ_LIMIT)) {
379        list = &reqBulkHead;
380        item->inQueue = queueRequestBulk;
381        reqBulkNum++;
382    } else if (dirtyNum < DIRTY_LIMIT) {
383        list = &dirtyHead;
384        item->inQueue = queueDirty;
385        dirtyNum++;
386        item->fd = FD_INVALID; // No response after render
387    } else {
388        // The queue is severely backlogged. Drop request
389        stats.noReqDroped++;
390        pthread_mutex_unlock(&qLock);
391        free(item);
392        return cmdNotDone;
393    }
394
395    if (list) {
396        item->next = list;
397        item->prev = list->prev;
398        item->prev->next = item;
399        list->prev = item;
400        /* In addition to the linked list, add item to a hash table index
401         * for faster lookup of pending requests.
402         */
403        insert_item_idx(item);
404
405        pthread_cond_signal(&qCond);
406    } else
407        free(item);
408
409    pthread_mutex_unlock(&qLock);
410
411    return (list == &reqHead)?cmdIgnore:cmdNotDone;
412}
413
414void request_exit(void)
415{
416  // Any write to the exit pipe will trigger a graceful exit
417  char c=0;
418  if (write(exit_pipe_fd, &c, sizeof(c)) < 0) {
419      fprintf(stderr, "Failed to write to the exit pipe: %s\n", strerror(errno));
420  }
421}
422
423void process_loop(int listen_fd)
424{
425    int num_connections = 0;
426    int connections[MAX_CONNECTIONS];
427    int pipefds[2];
428    int exit_pipe_read;
429
430    bzero(connections, sizeof(connections));
431
432    // A pipe is used to allow the render threads to request an exit by the main process
433    if (pipe(pipefds)) {
434      fprintf(stderr, "Failed to create pipe\n");
435      return;
436    }
437    exit_pipe_fd = pipefds[1];
438    exit_pipe_read = pipefds[0];
439
440    while (1) {
441        struct sockaddr_un in_addr;
442        socklen_t in_addrlen = sizeof(in_addr);
443        fd_set rd;
444        int incoming, num, nfds, i;
445
446        FD_ZERO(&rd);
447        FD_SET(listen_fd, &rd);
448        nfds = listen_fd+1;
449
450        for (i=0; i<num_connections; i++) {
451            FD_SET(connections[i], &rd);
452            nfds = MAX(nfds, connections[i]+1);
453        }
454
455        FD_SET(exit_pipe_read, &rd);
456        nfds = MAX(nfds, exit_pipe_read+1);
457
458        num = select(nfds, &rd, NULL, NULL, NULL);
459        if (num == -1)
460            perror("select()");
461        else if (num) {
462            if (FD_ISSET(exit_pipe_read, &rd)) {
463              // A render thread wants us to exit
464              break;
465            }
466
467            //printf("Data is available now on %d fds\n", num);
468            if (FD_ISSET(listen_fd, &rd)) {
469                num--;
470                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
471                if (incoming < 0) {
472                    perror("accept()");
473                } else {
474                    if (num_connections == MAX_CONNECTIONS) {
475                        syslog(LOG_WARNING, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
476                        close(incoming);
477                    } else {
478                        connections[num_connections++] = incoming;
479                        syslog(LOG_DEBUG, "DEBUG: Got incoming connection, fd %d, number %d\n", incoming, num_connections);
480                    }
481                }
482            }
483            for (i=0; num && (i<num_connections); i++) {
484                int fd = connections[i];
485                if (FD_ISSET(fd, &rd)) {
486                    struct protocol cmd;
487                    int ret;
488
489                    // TODO: to get highest performance we should loop here until we get EAGAIN
490                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
491                    if (ret == sizeof(cmd)) {
492                        enum protoCmd rsp = rx_request(&cmd, fd);
493
494                        if ((cmd.cmd == cmdRender) && (rsp == cmdNotDone)) {
495                            cmd.cmd = rsp;
496                            syslog(LOG_DEBUG, "DEBUG: Sending NotDone response(%d)\n", rsp);
497                            ret = send(fd, &cmd, sizeof(cmd), 0);
498                            if (ret != sizeof(cmd))
499                                perror("response send error");
500                        }
501                    } else if (!ret) {
502                        int j;
503
504                        num_connections--;
505                        syslog(LOG_DEBUG, "DEBUG: Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
506                        for (j=i; j < num_connections; j++)
507                            connections[j] = connections[j+1];
508                        clear_requests(fd);
509                        close(fd);
510                    } else {
511                        syslog(LOG_ERR, "Recv Error on fd %d", fd);
512                        break;
513                    }
514                }
515            }
516        } else {
517            syslog(LOG_ERR, "Select timeout");
518        }
519    }
520}
521
522/**
523 * Periodically write out current stats to a stats file. This information
524 * can then be used to monitor performance of renderd e.g. with a munin plugin
525 */
526void *stats_writeout_thread(void * arg) {
527    stats_struct lStats;
528    int dirtQueueLength;
529    int reqQueueLength;
530    int reqPrioQueueLength;
531    int reqBulkQueueLength;
532        int i;
533
534    int noFailedAttempts = 0;
535    char tmpName[PATH_MAX];
536
537    snprintf(tmpName, sizeof(tmpName), "%s.tmp", config.stats_filename);
538
539    syslog(LOG_DEBUG, "Starting stats thread");
540    while (1) {
541        pthread_mutex_lock(&qLock);
542        memcpy(&lStats, &stats, sizeof(stats_struct));
543        dirtQueueLength = dirtyNum;
544        reqQueueLength = reqNum;
545        reqPrioQueueLength = reqPrioNum;
546        reqBulkQueueLength = reqBulkNum;
547        pthread_mutex_unlock(&qLock);
548
549        FILE * statfile = fopen(tmpName, "w");
550        if (statfile == NULL) {
551            syslog(LOG_WARNING, "Failed to open stats file: %i", errno);
552            noFailedAttempts++;
553            if (noFailedAttempts > 3) {
554                syslog(LOG_ERR, "ERROR: Failed repeatedly to write stats, giving up");
555                break;
556            }
557            continue;
558        } else {
559            noFailedAttempts = 0;
560            fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
561            fprintf(statfile, "ReqPrioQueueLength: %i\n", reqPrioQueueLength);
562            fprintf(statfile, "ReqBulkQueueLength: %i\n", reqBulkQueueLength);
563            fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
564            fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
565            fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
566            fprintf(statfile, "ReqPrioRendered: %li\n", lStats.noReqPrioRender);
567            fprintf(statfile, "ReqBulkRendered: %li\n", lStats.noReqBulkRender);
568            fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
569            for (i = 0; i <= MAX_ZOOM; i++) {
570                fprintf(statfile,"ZoomRendered%02i: %li\n", i, lStats.noZoomRender[i]);
571            }
572            for (i = 0; i <= MAX_ZOOM; i++) {
573                fprintf(statfile,"TimeRenderedZoom%02i: %li\n", i, lStats.timeZoomRender[i]);
574            }
575            fclose(statfile);
576            if (rename(tmpName, config.stats_filename)) {
577                syslog(LOG_WARNING, "Failed to overwrite stats file: %i", errno);
578                noFailedAttempts++;
579                if (noFailedAttempts > 3) {
580                    syslog(LOG_ERR, "ERROR: Failed repeatedly to overwrite stats, giving up");
581                    break;
582                }
583                continue;
584            }
585        }
586        sleep(10);
587    }
588    return NULL;
589}
590
591int client_socket_init(renderd_config * sConfig) {
592    int fd;
593    struct sockaddr_un * addrU;
594    struct sockaddr_in * addrI;
595    struct hostent *server;
596    if (sConfig->ipport > 0) {
597        syslog(LOG_INFO, "Initialising TCP/IP client socket to %s:%i",
598                sConfig->iphostname, sConfig->ipport);
599        addrI = (struct sockaddr_in *)malloc(sizeof(struct sockaddr_in));
600        fd = socket(PF_INET, SOCK_STREAM, 0);
601        server = gethostbyname(sConfig->iphostname);
602        if (server == NULL) {
603            syslog(LOG_WARNING, "Could not resolve hostname: %s",
604                    sConfig->iphostname);
605            return FD_INVALID;
606        }
607        bzero((char *) addrI, sizeof(struct sockaddr_in));
608        addrI->sin_family = AF_INET;
609        bcopy((char *) server->h_addr, (char *) &addrI->sin_addr.s_addr,
610                server->h_length);
611        addrI->sin_port = htons(sConfig->ipport);
612        if (connect(fd, (struct sockaddr *) addrI, sizeof(struct sockaddr_in)) < 0) {
613            syslog(LOG_WARNING, "Could not connect to %s:%i",
614                    sConfig->iphostname, sConfig->ipport);
615            return FD_INVALID;
616        }
617        free(addrI);
618        syslog(LOG_INFO, "socket %s:%i initialised to fd %i", sConfig->iphostname, sConfig->ipport,
619                fd);
620    } else {
621        syslog(LOG_INFO, "Initialising unix client socket on %s",
622                sConfig->socketname);
623        addrU = (struct sockaddr_un *)malloc(sizeof(struct sockaddr_un));
624        fd = socket(PF_UNIX, SOCK_STREAM, 0);
625        if (fd < 0) {
626            syslog(LOG_WARNING, "Could not obtain socket: %i", fd);
627            return FD_INVALID;
628        }
629
630        bzero(addrU, sizeof(struct sockaddr_un));
631        addrU->sun_family = AF_UNIX;
632        strncpy(addrU->sun_path, sConfig->socketname, sizeof(addrU->sun_path));
633
634        if (connect(fd, (struct sockaddr *) addrU, sizeof(struct sockaddr_un)) < 0) {
635            syslog(LOG_WARNING, "socket connect failed for: %s",
636                    sConfig->socketname);
637            close(fd);
638            return FD_INVALID;
639        }
640        free(addrU);
641        syslog(LOG_INFO, "socket %s initialised to fd %i", sConfig->socketname,
642                fd);
643    }
644    return fd;
645}
646
647int server_socket_init(renderd_config *sConfig) {
648    struct sockaddr_un addrU;
649    struct sockaddr_in addrI;
650    mode_t old;
651    int fd;
652
653    if (sConfig->ipport > 0) {
654        syslog(LOG_INFO, "Initialising TCP/IP server socket on %s:%i",
655                sConfig->iphostname, sConfig->ipport);
656        fd = socket(PF_INET, SOCK_STREAM, 0);
657        if (fd < 0) {
658            fprintf(stderr, "failed to create IP socket\n");
659            exit(2);
660        }
661        bzero(&addrI, sizeof(addrI));
662        addrI.sin_family = AF_INET;
663        addrI.sin_addr.s_addr = INADDR_ANY;
664        addrI.sin_port = htons(sConfig->ipport);
665        if (bind(fd, (struct sockaddr *) &addrI, sizeof(addrI)) < 0) {
666            fprintf(stderr, "socket bind failed for: %s:%i\n",
667                    sConfig->iphostname, sConfig->ipport);
668            close(fd);
669            exit(3);
670        }
671    } else {
672        syslog(LOG_INFO, "Initialising unix server socket on %s",
673                sConfig->socketname);
674
675        fd = socket(PF_UNIX, SOCK_STREAM, 0);
676        if (fd < 0) {
677            fprintf(stderr, "failed to create unix socket\n");
678            exit(2);
679        }
680
681        bzero(&addrU, sizeof(addrU));
682        addrU.sun_family = AF_UNIX;
683        strncpy(addrU.sun_path, sConfig->socketname, sizeof(addrU.sun_path));
684
685        unlink(addrU.sun_path);
686
687        old = umask(0); // Need daemon socket to be writeable by apache
688        if (bind(fd, (struct sockaddr *) &addrU, sizeof(addrU)) < 0) {
689            fprintf(stderr, "socket bind failed for: %s\n", sConfig->socketname);
690            close(fd);
691            exit(3);
692        }
693        umask(old);
694    }
695
696    if (listen(fd, QUEUE_MAX) < 0) {
697        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
698        close(fd);
699        exit(4);
700    }
701
702    syslog(LOG_DEBUG, "Created server socket %i", fd);
703
704    return fd;
705
706}
707
708/**
709 * This function is used as a the start function for the slave renderer thread.
710 * It pulls a request from the central queue of requests and dispatches it to
711 * the slave renderer. It then blocks and waits for the response with no timeout.
712 * As it only sends one request at a time (there are as many slave_thread threads as there
713 * are rendering threads on the slaves) nothing gets queued on the slave and should get
714 * rendererd immediately. Thus overall, requests should be nicely load balanced between
715 * all the rendering threads available both locally and in the slaves.
716 */
717void *slave_thread(void * arg) {
718    renderd_config * sConfig = (renderd_config *) arg;
719
720    int pfd = FD_INVALID;
721    int retry;
722    size_t ret_size;
723
724    struct protocol * resp;
725    struct protocol * req_slave;
726
727    req_slave = (struct protocol *)malloc(sizeof(struct protocol));
728    resp = (struct protocol *)malloc(sizeof(struct protocol));
729    bzero(req_slave, sizeof(struct protocol));
730    bzero(resp, sizeof(struct protocol));
731
732    while (1) {
733        if (pfd == FD_INVALID) {
734            pfd = client_socket_init(sConfig);
735            if (pfd == FD_INVALID) {
736                if (sConfig->ipport > 0) {
737                    syslog(LOG_ERR,
738                        "Failed to connect to render slave %s:%i, trying again in 30 seconds",
739                        sConfig->iphostname, sConfig->ipport);
740                } else {
741                    syslog( LOG_ERR,
742                            "Failed to connect to render slave %s, trying again in 30 seconds",
743                            sConfig->socketname);
744                }
745                sleep(30);
746                continue;
747            }
748        }
749
750        enum protoCmd ret;
751        struct item *item = fetch_request();
752        if (item) {
753            struct protocol *req = &item->req;
754            req_slave->ver = PROTO_VER;
755            req_slave->cmd = cmdRender;
756            strcpy(req_slave->xmlname, req->xmlname);
757            req_slave->x = req->x;
758            req_slave->y = req->y;
759            req_slave->z = req->z;
760
761            /*Dispatch request to slave renderd*/
762            retry = 2;
763            syslog(LOG_INFO,
764                    "Dispatching request to slave thread on fd %i", pfd);
765            do {
766                ret_size = send(pfd, req_slave, sizeof(struct protocol), 0);
767
768                if (ret_size == sizeof(struct protocol)) {
769                    //correctly sent command to slave
770                    break;
771                }
772
773                if (errno != EPIPE) {
774                    syslog(LOG_ERR,
775                            "Failed to send cmd to render slave, shutting down thread");
776                    return NULL;
777                }
778
779                syslog(LOG_WARNING, "Failed to send cmd to render slave, retrying");
780                close(pfd);
781                pfd = client_socket_init(sConfig);
782                if (pfd == FD_INVALID) {
783                    syslog(LOG_ERR,
784                            "Failed to re-connect to render slave, dropping request");
785                    ret = cmdNotDone;
786                    send_response(item, ret);
787                    break;
788                }
789            } while (retry--);
790            if (pfd == FD_INVALID || ret_size != sizeof(struct protocol)) {
791                continue;
792            }
793
794            ret_size = 0;
795            retry = 10;
796            while ((ret_size < sizeof(struct protocol)) && (retry > 0)) {
797                ret_size = recv(pfd, resp + ret_size, (sizeof(struct protocol)
798                        - ret_size), 0);
799                if ((errno == EPIPE) || ret_size == 0) {
800                    close(pfd);
801                    pfd = FD_INVALID;
802                    ret_size = 0;
803                    syslog(LOG_ERR, "Pipe to render slave closed");
804                    break;
805                }
806                retry--;
807            }
808            if (ret_size < sizeof(struct protocol)) {
809                if (sConfig->ipport > 0) {
810                    syslog( LOG_ERR,
811                            "Invalid reply from render slave %s:%i, trying again in 30 seconds",
812                            sConfig->iphostname, sConfig->ipport);
813                } else {
814                    syslog( LOG_ERR,
815                            "Invalid reply render slave %s, trying again in 30 seconds",
816                            sConfig->socketname);
817                }
818
819                ret = cmdNotDone;
820                send_response(item, ret);
821                sleep(30);
822            } else {
823                ret = resp->cmd;
824                send_response(item, ret);
825                if (resp->cmd != cmdDone) {
826                    if (sConfig->ipport > 0) {
827                        syslog( LOG_ERR,
828                                "Request from render slave %s:%i did not complete correctly",
829                                sConfig->iphostname, sConfig->ipport);
830                    } else {
831                        syslog( LOG_ERR,
832                                "Request from render slave %s did not complete correctly",
833                                sConfig->socketname);
834                    }
835                    //Sleep for a while to make sure we don't overload the renderer
836                    //This only happens if it didn't correctly block on the rendering
837                    //request
838                    sleep(30);
839                }
840            }
841
842        } else {
843            sleep(1); // TODO: Use an event to indicate there are new requests
844        }
845    }
846    return NULL;
847}
848
849
850int main(int argc, char **argv)
851{
852    int fd, i, j, k;
853
854    int c;
855    int foreground=0;
856    int active_slave=0;
857    int log_options;
858    char config_file_name[PATH_MAX] = RENDERD_CONFIG;
859
860    while (1) {
861        int option_index = 0;
862        static struct option long_options[] = {
863            {"config", 1, 0, 'c'},
864            {"foreground", 1, 0, 'f'},
865            {"slave", 1, 0, 's'},
866            {"help", 0, 0, 'h'},
867            {0, 0, 0, 0}
868        };
869
870        c = getopt_long(argc, argv, "hfc:", long_options, &option_index);
871        if (c == -1)
872            break;
873
874        switch (c) {
875            case 'f':
876                foreground=1;
877                break;
878            case 'c':
879                strncpy(config_file_name, optarg, PATH_MAX-1);
880                config_file_name[PATH_MAX-1] = 0;
881                break;
882            case 's':
883                if (sscanf(optarg, "%i", &active_slave) != 1) {
884                    fprintf(stderr, "--slave needs to be nummeric (%s)\n", optarg);
885                    active_slave = 0;
886                }
887                break;
888            case 'h':
889                fprintf(stderr, "Usage: renderd [OPTION] ...\n");
890                fprintf(stderr, "Mapnik rendering daemon\n");
891                fprintf(stderr, "  -f, --foreground     run in foreground\n");
892                fprintf(stderr, "  -h, --help           display this help and exit\n");
893                fprintf(stderr, "  -c, --config=CONFIG  set location of config file (default /etc/renderd.conf)\n");
894                fprintf(stderr, "  -s, --slave=CONFIG_NR set which render slave this is (default 0)\n");
895                exit(0);
896            default:
897                fprintf(stderr, "unknown config option '%c'\n", c);
898                exit(1);
899        }
900    }
901
902    log_options = LOG_PID;
903#ifdef LOG_PERROR
904    if (foreground)
905        log_options |= LOG_PERROR;
906#endif
907    openlog("renderd", log_options, LOG_DAEMON);
908
909    syslog(LOG_INFO, "Rendering daemon started");
910
911    pthread_mutex_init(&qLock, NULL);
912    pthread_cond_init(&qCond, NULL);
913    reqHead.next = reqHead.prev = &reqHead;
914    reqPrioHead.next = reqPrioHead.prev = &reqPrioHead;
915    reqBulkHead.next = reqBulkHead.prev = &reqBulkHead;
916    dirtyHead.next = dirtyHead.prev = &dirtyHead;
917    renderHead.next = renderHead.prev = &renderHead;
918    hashidxSize = HASHIDX_SIZE;
919    item_hashidx = (struct item_idx *) malloc(sizeof(struct item_idx) * hashidxSize);
920    bzero(item_hashidx, sizeof(struct item_idx) * hashidxSize);
921
922    stats.noDirtyRender = 0;
923    stats.noReqDroped = 0;
924    stats.noReqRender = 0;
925    stats.noReqPrioRender = 0;
926    stats.noReqBulkRender = 0;
927
928    xmlconfigitem maps[XMLCONFIGS_MAX];
929    bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
930
931    renderd_config config_slaves[MAX_SLAVES];
932    bzero(config_slaves, sizeof(renderd_config) * MAX_SLAVES);
933    bzero(&config, sizeof(renderd_config));
934
935    dictionary *ini = iniparser_load(config_file_name);
936    if (! ini) {
937        exit(1);
938    }
939
940    noSlaveRenders = 0;
941
942    int iconf = -1;
943    char buffer[PATH_MAX];
944    for (int section=0; section < iniparser_getnsec(ini); section++) {
945        char *name = iniparser_getsecname(ini, section);
946        syslog(LOG_INFO, "Parsing section %s\n", name);
947        if (strncmp(name, "renderd", 7) && strcmp(name, "mapnik")) {
948            if (config.tile_dir == NULL) {
949                fprintf(stderr, "No valid (active) renderd config section available\n");
950                exit(7);
951            }
952            /* this is a map section */
953            iconf++;
954            if (strlen(name) >= XMLCONFIG_MAX) {
955                fprintf(stderr, "XML name too long: %s\n", name);
956                exit(7);
957            }
958
959            strcpy(maps[iconf].xmlname, name);
960            if (iconf >= XMLCONFIGS_MAX) {
961                fprintf(stderr, "Config: more than %d configurations found\n", XMLCONFIGS_MAX);
962                exit(7);
963            }
964            sprintf(buffer, "%s:uri", name);
965            char *ini_uri = iniparser_getstring(ini, buffer, (char *)"");
966            if (strlen(ini_uri) >= PATH_MAX) {
967                fprintf(stderr, "URI too long: %s\n", ini_uri);
968                exit(7);
969            }
970            strcpy(maps[iconf].xmluri, ini_uri);
971            sprintf(buffer, "%s:xml", name);
972            char *ini_xmlpath = iniparser_getstring(ini, buffer, (char *)"");
973            if (strlen(ini_xmlpath) >= PATH_MAX){
974                fprintf(stderr, "XML path too long: %s\n", ini_xmlpath);
975                exit(7);
976            }
977            sprintf(buffer, "%s:host", name);
978            char *ini_hostname = iniparser_getstring(ini, buffer, (char *) "");
979            if (strlen(ini_hostname) >= PATH_MAX) {
980                fprintf(stderr, "Host name too long: %s\n", ini_hostname);
981                exit(7);
982            }
983
984            sprintf(buffer, "%s:htcphost", name);
985            char *ini_htcpip = iniparser_getstring(ini, buffer, (char *) "");
986            if (strlen(ini_htcpip) >= PATH_MAX) {
987                fprintf(stderr, "HTCP host name too long: %s\n", ini_htcpip);
988                exit(7);
989            }
990            strcpy(maps[iconf].xmlfile, ini_xmlpath);
991            strcpy(maps[iconf].tile_dir, config.tile_dir);
992            strcpy(maps[iconf].host, ini_hostname);
993            strcpy(maps[iconf].htcpip, ini_htcpip);
994        } else if (strncmp(name, "renderd", 7) == 0) {
995            int render_sec = 0;
996            if (sscanf(name, "renderd%i", &render_sec) != 1) {
997                render_sec = 0;
998            }
999            syslog(LOG_INFO, "Parsing render section %i\n", render_sec);
1000            if (render_sec >= MAX_SLAVES) {
1001                syslog(LOG_ERR, "Can't handle more than %i render sections\n",
1002                        MAX_SLAVES);
1003                exit(7);
1004            }
1005            sprintf(buffer, "%s:socketname", name);
1006            config_slaves[render_sec].socketname = iniparser_getstring(ini,
1007                    buffer, (char *) RENDER_SOCKET);
1008            sprintf(buffer, "%s:iphostname", name);
1009            config_slaves[render_sec].iphostname = iniparser_getstring(ini,
1010                    buffer, "");
1011            sprintf(buffer, "%s:ipport", name);
1012            config_slaves[render_sec].ipport = iniparser_getint(ini, buffer, 0);
1013            sprintf(buffer, "%s:num_threads", name);
1014            config_slaves[render_sec].num_threads = iniparser_getint(ini,
1015                    buffer, NUM_THREADS);
1016            sprintf(buffer, "%s:tile_dir", name);
1017            config_slaves[render_sec].tile_dir = iniparser_getstring(ini,
1018                    buffer, (char *) HASH_PATH);
1019            sprintf(buffer, "%s:stats_file", name);
1020            config_slaves[render_sec].stats_filename = iniparser_getstring(ini,
1021                    buffer, NULL);
1022
1023            if (render_sec == active_slave) {
1024                config.socketname = config_slaves[render_sec].socketname;
1025                config.iphostname = config_slaves[render_sec].iphostname;
1026                config.ipport = config_slaves[render_sec].ipport;
1027                config.num_threads = config_slaves[render_sec].num_threads;
1028                config.tile_dir = config_slaves[render_sec].tile_dir;
1029                config.stats_filename
1030                        = config_slaves[render_sec].stats_filename;
1031                config.mapnik_plugins_dir = iniparser_getstring(ini,
1032                        "mapnik:plugins_dir", (char *) MAPNIK_PLUGINS);
1033                config.mapnik_font_dir = iniparser_getstring(ini,
1034                        "mapnik:font_dir", (char *) FONT_DIR);
1035                config.mapnik_font_dir_recurse = iniparser_getboolean(ini,
1036                        "mapnik:font_dir_recurse", FONT_RECURSE);
1037            } else {
1038                noSlaveRenders += config_slaves[render_sec].num_threads;
1039            }
1040        }
1041    }
1042
1043    if (config.ipport > 0) {
1044        syslog(LOG_INFO, "config renderd: ip socket=%s:%i\n", config.iphostname, config.ipport);
1045    } else {
1046        syslog(LOG_INFO, "config renderd: unix socketname=%s\n", config.socketname);
1047    }
1048    syslog(LOG_INFO, "config renderd: num_threads=%d\n", config.num_threads);
1049    if (active_slave == 0) {
1050        syslog(LOG_INFO, "config renderd: num_slaves=%d\n", noSlaveRenders);
1051    }
1052    syslog(LOG_INFO, "config renderd: tile_dir=%s\n", config.tile_dir);
1053    syslog(LOG_INFO, "config renderd: stats_file=%s\n", config.stats_filename);
1054    syslog(LOG_INFO, "config mapnik:  plugins_dir=%s\n", config.mapnik_plugins_dir);
1055    syslog(LOG_INFO, "config mapnik:  font_dir=%s\n", config.mapnik_font_dir);
1056    syslog(LOG_INFO, "config mapnik:  font_dir_recurse=%d\n", config.mapnik_font_dir_recurse);
1057    for (int i = 0; i < MAX_SLAVES; i++) {
1058        if (config_slaves[i].num_threads == 0) {
1059            continue;
1060        }
1061        if (i == active_slave) {
1062            syslog(LOG_INFO, "config renderd(%i): Active\n", i);
1063        }
1064        if (config_slaves[i].ipport > 0) {
1065                syslog(LOG_INFO, "config renderd(%i): ip socket=%s:%i\n", i,
1066                        config_slaves[i].iphostname, config_slaves[i].ipport);
1067            } else {
1068                syslog(LOG_INFO, "config renderd(%i): unix socketname=%s\n", i,
1069                        config_slaves[i].socketname);
1070            }
1071        syslog(LOG_INFO, "config renderd(%i): num_threads=%d\n", i,
1072                config_slaves[i].num_threads);
1073        syslog(LOG_INFO, "config renderd(%i): tile_dir=%s\n", i,
1074                config_slaves[i].tile_dir);
1075        syslog(LOG_INFO, "config renderd(%i): stats_file=%s\n", i,
1076                config_slaves[i].stats_filename);
1077    }
1078
1079    for(iconf = 0; iconf < XMLCONFIGS_MAX; ++iconf) {
1080        if (maps[iconf].xmlname[0] != 0) {
1081         syslog(LOG_INFO, "config map %d:   name(%s) file(%s) uri(%s) htcp(%s) host(%s)",
1082                 iconf, maps[iconf].xmlname, maps[iconf].xmlfile, maps[iconf].xmluri,
1083                 maps[iconf].htcpip, maps[iconf].host);
1084        }
1085    }
1086
1087    fd = server_socket_init(&config);
1088#if 0
1089    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
1090        fprintf(stderr, "setting socket non-block failed\n");
1091        close(fd);
1092        exit(5);
1093    }
1094#endif
1095
1096    //sigPipeAction.sa_handler = pipe_handler;
1097    sigPipeAction.sa_handler = SIG_IGN;
1098    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
1099        fprintf(stderr, "failed to register signal handler\n");
1100        close(fd);
1101        exit(6);
1102    }
1103
1104    render_init(config.mapnik_plugins_dir, config.mapnik_font_dir, config.mapnik_font_dir_recurse);
1105
1106    /* unless the command line said to run in foreground mode, fork and detach from terminal */
1107    if (foreground) {
1108        fprintf(stderr, "Running in foreground mode...\n");
1109    } else {
1110        if (daemon(0, 0) != 0) {
1111            fprintf(stderr, "can't daemonize: %s\n", strerror(errno));
1112        }
1113        /* write pid file */
1114        FILE *pidfile = fopen(PIDFILE, "w");
1115        if (pidfile) {
1116            (void) fprintf(pidfile, "%d\n", getpid());
1117            (void) fclose(pidfile);
1118        }
1119    }
1120
1121    if (config.stats_filename != NULL) {
1122        if (pthread_create(&stats_thread, NULL, stats_writeout_thread, NULL)) {
1123            syslog(LOG_WARNING, "Could not create stats writeout thread");
1124        }
1125    } else {
1126        syslog(LOG_INFO, "No stats file specified in config. Stats reporting disabled");
1127    }
1128
1129    render_threads = (pthread_t *) malloc(sizeof(pthread_t) * config.num_threads);
1130
1131    for(i=0; i<config.num_threads; i++) {
1132        if (pthread_create(&render_threads[i], NULL, render_thread, (void *)maps)) {
1133            fprintf(stderr, "error spawning render thread\n");
1134            close(fd);
1135            exit(7);
1136        }
1137    }
1138
1139    if (active_slave == 0) {
1140        //Only the master renderd opens connections to its slaves
1141        k = 0;
1142        slave_threads
1143                = (pthread_t *) malloc(sizeof(pthread_t) * noSlaveRenders);
1144        for (i = 1; i < MAX_SLAVES; i++) {
1145            for (j = 0; j < config_slaves[i].num_threads; j++) {
1146                if (pthread_create(&slave_threads[k++], NULL, slave_thread,
1147                        (void *) &config_slaves[i])) {
1148                    fprintf(stderr, "error spawning render thread\n");
1149                    close(fd);
1150                    exit(7);
1151                }
1152            }
1153        }
1154    }
1155
1156    process_loop(fd);
1157
1158    unlink(config.socketname);
1159    close(fd);
1160    return 0;
1161}
Note: See TracBrowser for help on using the repository browser.