source: subversion/applications/utils/mod_tile/daemon.c @ 6742

Last change on this file since 6742 was 6742, checked in by jonb, 12 years ago

mod_tile: Do not exit process loop if accept() fails

File size: 11.7 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <unistd.h>
4#include <sys/types.h>
5#include <sys/socket.h>
6#include <sys/select.h>
7#include <sys/stat.h>
8#include <sys/un.h>
9#include <poll.h>
10#include <errno.h>
11#include <pthread.h>
12#include <signal.h>
13
14
15#include "gen_tile.h"
16#include "protocol.h"
17#include "render_config.h"
18#include "dir_utils.h"
19
20static pthread_t render_threads[NUM_THREADS];
21static struct sigaction sigPipeAction;
22
23static struct item reqHead, dirtyHead, renderHead;
24static int reqNum, dirtyNum;
25static pthread_mutex_t qLock;
26
27struct item *fetch_request(void)
28{
29    struct item *item = NULL;
30
31    pthread_mutex_lock(&qLock);
32
33    if (reqNum) {
34        item = reqHead.next;
35        reqNum--;
36    } else if (dirtyNum) {
37        item = dirtyHead.next;
38        dirtyNum--;
39    }
40    if (item) {
41        item->next->prev = item->prev;
42        item->prev->next = item->next;
43
44        item->prev = &renderHead;
45        item->next = renderHead.next;
46        renderHead.next->prev = item;
47        renderHead.next = item;
48    }
49
50    pthread_mutex_unlock(&qLock);
51
52    return item;
53}
54
55void clear_requests(int fd)
56{
57    struct item *item, *dupes;
58
59    pthread_mutex_lock(&qLock);
60    item = reqHead.next;
61    while (item != &reqHead) {
62        if (item->fd == fd)
63            item->fd = FD_INVALID;
64
65        dupes = item->duplicates;
66        while (dupes) {
67            if (dupes->fd == fd)
68                dupes->fd = FD_INVALID;
69            dupes = dupes->duplicates;
70        }
71        item = item->next;
72    }
73
74    item = renderHead.next;
75    while (item != &renderHead) {
76        if (item->fd == fd)
77            item->fd = FD_INVALID;
78
79        dupes = item->duplicates;
80        while (dupes) {
81            if (dupes->fd == fd)
82                dupes->fd = FD_INVALID;
83            dupes = dupes->duplicates;
84        }
85        item = item->next;
86    }
87
88    pthread_mutex_unlock(&qLock);
89}
90
91static inline const char *cmdStr(enum protoCmd c)
92{
93    switch (c) {
94        case cmdIgnore:  return "Ignore";
95        case cmdRender:  return "Render";
96        case cmdDirty:   return "Dirty";
97        case cmdDone:    return "Done";
98        case cmdNotDone: return "NotDone";
99        default:         return "unknown";
100    }
101}
102
103void send_response(struct item *item, enum protoCmd rsp)
104{
105    struct protocol *req = &item->req;
106    int ret;
107
108    pthread_mutex_lock(&qLock);
109    item->next->prev = item->prev;
110    item->prev->next = item->next;
111    pthread_mutex_unlock(&qLock);
112
113    while (item) {
114        struct item *prev = item;
115        req = &item->req;
116        if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) {
117            req->cmd = rsp;
118            //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
119            ret = send(item->fd, req, sizeof(*req), 0);
120            if (ret != sizeof(*req))
121                perror("send error during send_done");
122        }
123        item = item->duplicates;
124        free(prev);
125    }
126}
127
128
129enum protoCmd pending(struct item *test)
130{
131    // check all queues and render list to see if this request already queued
132    // If so, add this new request as a duplicate
133    // call with qLock held
134    struct item *item;
135
136    item = renderHead.next;
137    while (item != &renderHead) {
138        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z)) {
139            // Add new test item in the list of duplicates
140            test->duplicates = item->duplicates;
141            item->duplicates = test;
142            return cmdIgnore;
143        }
144        item = item->next;
145    }
146
147    item = reqHead.next;
148    while (item != &reqHead) {
149        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z)) {
150            // Add new test item in the list of duplicates
151            test->duplicates = item->duplicates;
152            item->duplicates = test;
153            return cmdIgnore;
154        }
155        item = item->next;
156    }
157
158    item = dirtyHead.next;
159    while (item != &dirtyHead) {
160        if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z == test->req.z))
161            return cmdNotDone;
162        item = item->next;
163    }
164
165    return cmdRender;
166}
167
168int check_xyz(int x, int y, int z)
169{
170    int oob, limit;
171
172    // Validate tile co-ordinates
173    oob = (z < 0 || z > MAX_ZOOM);
174    if (!oob) {
175         // valid x/y for tiles are 0 ... 2^zoom-1
176        limit = (1 << z) - 1;
177        oob =  (x < 0 || x > limit || y < 0 || y > limit);
178    }
179
180    if (oob)
181        fprintf(stderr, "got bad co-ords: x(%d) y(%d) z(%d)\n", x, y, z);
182
183    return oob;
184}
185
186
187
188enum protoCmd rx_request(const struct protocol *req, int fd)
189{
190    struct item *list = NULL, *item;
191    enum protoCmd pend;
192
193    if (req->ver != 1) {
194        fprintf(stderr, "Bad protocol version %d\n", req->ver);
195        return cmdIgnore;
196    }
197
198    //fprintf(stderr, "%s fd(%d) z(%d), x(%d), y(%d), path(%s)\n",
199    //        cmdStr(req->cmd), fd, req->z, req->x, req->y, req->path);
200
201    if ((req->cmd != cmdRender) && (req->cmd != cmdDirty))
202        return cmdIgnore;
203
204    if (check_xyz(req->x, req->y, req->z))
205        return cmdNotDone;
206
207    if (mkdirp(req->path))
208        return cmdNotDone;
209
210    item = (struct item *)malloc(sizeof(*item));
211    if (!item) {
212            fprintf(stderr, "malloc failed\n");
213            return cmdNotDone;
214    }
215
216    item->req = *req;
217    item->duplicates = NULL;
218    item->fd = (req->cmd == cmdRender) ? fd : FD_INVALID;
219
220#ifdef METATILE
221    /* Round down request co-ordinates to the neareast N (should be a power of 2)
222     * Note: request path is no longer consistent but this will be recalculated
223     * when the metatile is being rendered.
224     */
225    item->mx = item->req.x & ~(METATILE-1);
226    item->my = item->req.y & ~(METATILE-1);
227#else
228    item->mx = item->req.x;
229    item->my = item->req.y;
230#endif
231
232    pthread_mutex_lock(&qLock);
233
234    // Check for a matching request in the current rendering or dirty queues
235    pend = pending(item);
236    if (pend == cmdNotDone) {
237        // We found a match in the dirty queue, can not wait for it
238        pthread_mutex_unlock(&qLock);
239        free(item);
240        return cmdNotDone;
241    }
242    if (pend == cmdIgnore) {
243        // Found a match in render queue, item added as duplicate
244        pthread_mutex_unlock(&qLock);
245        return cmdIgnore;
246    }
247
248    // New request, add it to render or dirty queue
249    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
250        list = &reqHead;
251        reqNum++;
252    } else if (dirtyNum < DIRTY_LIMIT) {
253        list = &dirtyHead;
254        dirtyNum++;
255        item->fd = FD_INVALID; // No response after render
256    }
257
258    if (list) {
259        item->next = list;
260        item->prev = list->prev;
261        item->prev->next = item;
262        list->prev = item;
263    } else
264        free(item);
265
266    pthread_mutex_unlock(&qLock);
267
268    return (list == &reqHead)?cmdIgnore:cmdNotDone;
269}
270
271
272void process_loop(int listen_fd)
273{
274    int num_connections = 0;
275    int connections[MAX_CONNECTIONS];
276
277    bzero(connections, sizeof(connections));
278
279    while (1) {
280        struct sockaddr_un in_addr;
281        socklen_t in_addrlen = sizeof(in_addr);
282        fd_set rd;
283        int incoming, num, nfds, i;
284
285        FD_ZERO(&rd);
286        FD_SET(listen_fd, &rd);
287        nfds = listen_fd+1;
288
289        for (i=0; i<num_connections; i++) {
290            FD_SET(connections[i], &rd);
291            nfds = MAX(nfds, connections[i]+1);
292        }
293
294        num = select(nfds, &rd, NULL, NULL, NULL);
295        if (num == -1)
296            perror("select()");
297        else if (num) {
298            //printf("Data is available now on %d fds\n", num);
299            if (FD_ISSET(listen_fd, &rd)) {
300                num--;
301                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
302                if (incoming < 0) {
303                    perror("accept()");
304                } else {
305                    if (num_connections == MAX_CONNECTIONS) {
306                        fprintf(stderr, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
307                        close(incoming);
308                    } else {
309                        connections[num_connections++] = incoming;
310                        fprintf(stderr, "Got incoming connection, fd %d, number %d\n", incoming, num_connections);
311                    }
312                }
313            }
314            for (i=0; num && (i<num_connections); i++) {
315                int fd = connections[i];
316                if (FD_ISSET(fd, &rd)) {
317                    struct protocol cmd;
318                    int ret;
319
320                    //fprintf(stderr, "New command from fd %d, number %d, to go %d\n", fd, i, num);
321                    // TODO: to get highest performance we should loop here until we get EAGAIN
322                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
323                    if (ret == sizeof(cmd)) {
324                        enum protoCmd rsp = rx_request(&cmd, fd);
325
326                        switch(rsp) {
327                            case cmdNotDone:
328                                cmd.cmd = rsp;
329                                fprintf(stderr, "Sending NotDone response(%d)\n", rsp);
330                                ret = send(fd, &cmd, sizeof(cmd), 0);
331                                if (ret != sizeof(cmd))
332                                    perror("response send error");
333                                break;
334                            default:
335                                break;
336                        }
337                    } else if (!ret) {
338                        int j;
339
340                        num_connections--;
341                        fprintf(stderr, "Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
342                        for (j=i; j < num_connections; j++)
343                            connections[j] = connections[j+1];
344                        clear_requests(fd);
345                        close(fd);
346                    } else {
347                        fprintf(stderr, "Recv Error on fd %d\n", fd);
348                        break;
349                    }
350                }
351            }
352        } else
353            fprintf(stderr, "Select timeout\n");
354    }
355}
356
357
358int main(void)
359{
360    const char *spath = RENDER_SOCKET;
361    int fd, i;
362    struct sockaddr_un addr;
363    mode_t old;
364
365    fprintf(stderr, "Rendering daemon\n");
366
367    pthread_mutex_init(&qLock, NULL);
368    reqHead.next = reqHead.prev = &reqHead;
369    dirtyHead.next = dirtyHead.prev = &dirtyHead;
370    renderHead.next = renderHead.prev = &renderHead;
371
372    fd = socket(PF_UNIX, SOCK_STREAM, 0);
373    if (fd < 0) {
374        fprintf(stderr, "failed to create unix sozket\n");
375        exit(2);
376    }
377
378    bzero(&addr, sizeof(addr));
379    addr.sun_family = AF_UNIX;
380    strncpy(addr.sun_path, spath, sizeof(addr.sun_path));
381
382    unlink(addr.sun_path);
383
384    old = umask(0); // Need daemon socket to be writeable by apache
385    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
386        fprintf(stderr, "socket bind failed for: %s\n", spath);
387        close(fd);
388        exit(3);
389    }
390    umask(old);
391
392    if (listen(fd, QUEUE_MAX) < 0) {
393        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
394        close(fd);
395        exit(4);
396    }
397
398#if 0
399    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
400        fprintf(stderr, "setting socket non-block failed\n");
401        close(fd);
402        exit(5);
403    }
404#endif
405
406    //sigPipeAction.sa_handler = pipe_handler;
407    sigPipeAction.sa_handler = SIG_IGN;
408    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
409        fprintf(stderr, "failed to register signal handler\n");
410        close(fd);
411        exit(6);
412    }
413
414    render_init();
415
416    for(i=0; i<NUM_THREADS; i++) {
417        if (pthread_create(&render_threads[i], NULL, render_thread, NULL)) {
418            fprintf(stderr, "error spawning render thread\n");
419            close(fd);
420            exit(7);
421        }
422    }
423    process_loop(fd);
424
425    unlink(spath);
426    close(fd);
427    return 0;
428}
Note: See TracBrowser for help on using the repository browser.