source: subversion/applications/utils/mod_tile/daemon.c @ 4962

Revision 4962, 10.9 KB checked in by jonb, 7 years ago (diff)

mod_tile: Increase connection and rendering queue limits

Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <unistd.h>
4#include <sys/types.h>
5#include <sys/socket.h>
6#include <sys/select.h>
7#include <sys/stat.h>
8#include <sys/un.h>
9#include <poll.h>
10#include <errno.h>
11#include <pthread.h>
12#include <signal.h>
13
14
15#include "gen_tile.h"
16#include "protocol.h"
17
18#define QUEUE_MAX (32)
19#define MAX_CONNECTIONS (256)
20
21#define MAX(a,b)   ((a) > (b) ? (a) : (b))
22
23#define FD_INVALID (-1)
24#define REQ_LIMIT (32)
25#define DIRTY_LIMIT (1000 * 1000)
26#define NUM_THREADS (4)
27
28static pthread_t render_threads[NUM_THREADS];
29static struct sigaction sigPipeAction;
30
31
32void pipe_handler(__attribute__((unused)) int sigNum)
33{
34    // Needed in case the client closes the connection
35    // before we send a response.
36    // FIXME: is fprintf really safe in signal handler?
37    //fprintf(stderr, "Caught SIGPIPE\n");
38}
39
40// Build parent directories for the specified file name
41// Note: the part following the trailing / is ignored
42// e.g. mkdirp("/a/b/foo.png") == shell mkdir -p /a/b
43static int mkdirp(const char *path) {
44    struct stat s;
45    char tmp[PATH_MAX];
46    char *p;
47
48    strncpy(tmp, path, sizeof(tmp));
49
50    // Look for parent directory
51    p = strrchr(tmp, '/');
52    if (!p)
53        return 0;
54
55    *p = '\0';
56
57    if (!stat(tmp, &s))
58        return !S_ISDIR(s.st_mode);
59    *p = '/';
60    // Walk up the path making sure each element is a directory
61    p = tmp;
62    if (!*p)
63        return 0;
64    p++; // Ignore leading /
65    while (*p) {
66        if (*p == '/') {
67            *p = '\0';
68            if (!stat(tmp, &s)) {
69                if (!S_ISDIR(s.st_mode))
70                    return 1;
71            } else if (mkdir(tmp, 0777))
72                return 1;
73            *p = '/';
74        }
75        p++;
76    }
77    return 0;
78}
79
80static struct item reqHead, dirtyHead, renderHead;
81static int reqNum, dirtyNum;
82static pthread_mutex_t qLock;
83
84struct item *fetch_request(void)
85{
86    struct item *item = NULL;
87
88    pthread_mutex_lock(&qLock);
89
90    if (reqNum) {
91        item = reqHead.next;
92        reqNum--;
93    } else if (dirtyNum) {
94        item = dirtyHead.next;
95        dirtyNum--;
96    }
97    if (item) {
98        item->next->prev = item->prev;
99        item->prev->next = item->next;
100
101        item->prev = &renderHead;
102        item->next = renderHead.next;
103        renderHead.next->prev = item;
104        renderHead.next = item;
105    }
106
107    pthread_mutex_unlock(&qLock);
108
109    return item;
110}
111
112void delete_request(struct item *item)
113{
114    pthread_mutex_lock(&qLock);
115
116    item->next->prev = item->prev;
117    item->prev->next = item->next;
118
119    pthread_mutex_unlock(&qLock);
120    free(item); 
121}
122
123void clear_requests(int fd)
124{
125    struct item *item;
126
127    pthread_mutex_lock(&qLock);
128    item = reqHead.next;
129    while (item != &reqHead) {
130        if (item->fd == fd)
131            item->fd = FD_INVALID;
132        item = item->next;
133    }
134    item = renderHead.next;
135    while (item != &renderHead) {
136        if (item->fd == fd)
137            item->fd = FD_INVALID;
138        item = item->next;
139    }
140    pthread_mutex_unlock(&qLock);
141}
142
143void send_response(struct item *item, enum protoCmd rsp)
144{
145    struct protocol *req = &item->req;
146    int ret;
147
148    pthread_mutex_lock(&qLock);
149
150    if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) {
151        req->cmd = rsp;
152        //fprintf(stderr, "Sending message %d to %d\n", rsp, item->fd);
153        ret = send(item->fd, req, sizeof(*req), 0);
154        if (ret != sizeof(*req))
155            perror("send error during send_done");
156    }
157    pthread_mutex_unlock(&qLock);
158}
159
160static inline const char *cmdStr(enum protoCmd c)
161{
162    switch (c) {
163        case cmdIgnore:  return "Ignore";
164        case cmdRender:  return "Render";
165        case cmdDirty:   return "Dirty";
166        case cmdDone:    return "Done";
167        case cmdNotDone: return "NotDone";
168        default:         return "unknown";
169    }
170}
171
172int pending(struct item *test)
173{
174    // check all queues and render list to see if this request already queued
175    // call with qLock held
176    struct item *item;
177
178    item = reqHead.next;
179    while (item != &reqHead) {
180        if ((item->req.x == test->req.x) && (item->req.y == test->req.y) && (item->req.z == test->req.z))
181            return 1;
182        item = item->next;
183    }
184    item = dirtyHead.next;
185    while (item != &dirtyHead) {
186        if ((item->req.x == test->req.x) && (item->req.y == test->req.y) && (item->req.z == test->req.z))
187            return 1;
188        item = item->next;
189    }
190    item = renderHead.next;
191    while (item != &renderHead) {
192        if ((item->req.x == test->req.x) && (item->req.y == test->req.y) && (item->req.z == test->req.z))
193            return 1;
194        item = item->next;
195    }
196
197    return 0;
198}
199
200enum protoCmd rx_request(const struct protocol *req, int fd)
201{
202    struct item *list = NULL, *item;
203
204    if (req->ver != 1) {
205        fprintf(stderr, "Bad protocol version %d\n", req->ver);
206        return cmdIgnore;
207    }
208
209    fprintf(stderr, "%s z(%d), x(%d), y(%d), path(%s)\n",
210            cmdStr(req->cmd), req->z, req->x, req->y, req->path);
211
212    if ((req->cmd != cmdRender) && (req->cmd != cmdDirty))
213        return cmdIgnore;
214
215    if (mkdirp(req->path))
216        return cmdNotDone;
217
218    item = (struct item *)malloc(sizeof(*item));
219    if (!item) {
220            fprintf(stderr, "malloc failed\n");
221            return cmdNotDone;
222    }
223    item->req = *req;
224
225    pthread_mutex_lock(&qLock);
226
227    if (pending(item)) {
228        pthread_mutex_unlock(&qLock);
229        free(item);
230        return cmdNotDone; // No way to wait on a pending tile
231    }
232
233    if ((req->cmd == cmdRender) && (reqNum < REQ_LIMIT)) {
234        list = &reqHead;
235        reqNum++;
236        item->fd  = fd;
237    } else if (dirtyNum < DIRTY_LIMIT) {
238        list = &dirtyHead;
239        dirtyNum++;
240        item->fd  = FD_INVALID; // No response after render
241    }
242
243    if (list) {
244        item->next = list;
245        item->prev = list->prev;
246        item->prev->next = item;
247        list->prev = item;
248    } else
249        free(item);
250
251    pthread_mutex_unlock(&qLock);
252
253    return (list == &reqHead)?cmdIgnore:cmdNotDone;
254}
255
256
257void process_loop(int listen_fd)
258{
259    int num_connections = 0;
260    int connections[MAX_CONNECTIONS];
261
262    bzero(connections, sizeof(connections));
263
264    while (1) {
265        struct sockaddr_un in_addr;
266        socklen_t in_addrlen = sizeof(in_addr);
267        fd_set rd;
268        int incoming, num, nfds, i;
269
270        FD_ZERO(&rd);
271        FD_SET(listen_fd, &rd);
272        nfds = listen_fd+1;
273
274        for (i=0; i<num_connections; i++) {
275            FD_SET(connections[i], &rd);
276            nfds = MAX(nfds, connections[i]+1);
277        }
278
279        num = select(nfds, &rd, NULL, NULL, NULL);
280        if (num == -1)
281            perror("select()");
282        else if (num) {
283            //printf("Data is available now on %d fds\n", num);
284            if (FD_ISSET(listen_fd, &rd)) {
285                num--;
286                incoming = accept(listen_fd, (struct sockaddr *) &in_addr, &in_addrlen);
287                if (incoming < 0) {
288                    perror("accept()");
289                    break;
290                }
291                if (num_connections == MAX_CONNECTIONS) {
292                    fprintf(stderr, "Connection limit(%d) reached. Dropping connection\n", MAX_CONNECTIONS);
293                    close(incoming);
294                } else {
295                    connections[num_connections++] = incoming;
296                    fprintf(stderr, "Got incoming connection, fd %d, number %d\n", incoming, num_connections);
297                }
298            }
299            for (i=0; num && (i<num_connections); i++) {
300                int fd = connections[i];
301                if (FD_ISSET(fd, &rd)) {
302                    struct protocol cmd;
303                    int ret;
304
305                    //fprintf(stderr, "New command from fd %d, number %d, to go %d\n", fd, i, num);
306                    // TODO: to get highest performance we should loop here until we get EAGAIN
307                    ret = recv(fd, &cmd, sizeof(cmd), MSG_DONTWAIT);
308                    if (ret == sizeof(cmd)) {
309                        enum protoCmd rsp = rx_request(&cmd, fd);
310
311                        switch(rsp) {
312                            case cmdNotDone:
313                                cmd.cmd = rsp;
314                                fprintf(stderr, "Sending NotDone response(%d)\n", rsp);
315                                ret = send(fd, &cmd, sizeof(cmd), 0);
316                                if (ret != sizeof(cmd))
317                                    perror("response send error");
318                                break;
319                            default:
320                                break;
321                        }
322                    } else if (!ret) {
323                        int j;
324
325                        num_connections--;
326                        fprintf(stderr, "Connection %d, fd %d closed, now %d left\n", i, fd, num_connections);
327                        for (j=i; j < num_connections; j++)
328                            connections[j] = connections[j+1];
329                        clear_requests(fd);
330                        close(fd);
331                    } else {
332                        fprintf(stderr, "Recv Error on fd %d\n", fd);
333                        break;
334                    }
335                }
336            }
337        } else
338            fprintf(stderr, "Select timeout\n");
339    }
340}
341
342
343int main(void)
344{
345    const char *spath = RENDER_SOCKET;
346    int fd, i;
347    struct sockaddr_un addr;
348    mode_t old;
349
350    fprintf(stderr, "Rendering daemon\n");
351
352    pthread_mutex_init(&qLock, NULL);
353    reqHead.next = reqHead.prev = &reqHead;
354    dirtyHead.next = dirtyHead.prev = &dirtyHead;
355    renderHead.next = renderHead.prev = &renderHead;
356
357    fd = socket(PF_UNIX, SOCK_STREAM, 0);
358    if (fd < 0) {
359        fprintf(stderr, "failed to create unix sozket\n");
360        exit(2);
361    }
362
363    bzero(&addr, sizeof(addr));
364    addr.sun_family = AF_UNIX;
365    strncpy(addr.sun_path, spath, sizeof(addr.sun_path));
366
367    unlink(addr.sun_path);
368
369    old = umask(0); // Need daemon socket to be writeable by apache
370    if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
371        fprintf(stderr, "socket bind failed for: %s\n", spath);
372        close(fd);
373        exit(3);
374    }
375    umask(old);
376
377    if (listen(fd, QUEUE_MAX) < 0) {
378        fprintf(stderr, "socket listen failed for %d\n", QUEUE_MAX);
379        close(fd);
380        exit(4);
381    }
382
383#if 0
384    if (fcntl(fd, F_SETFD, O_RDWR | O_NONBLOCK) < 0) {
385        fprintf(stderr, "setting socket non-block failed\n");
386        close(fd);
387        exit(5);
388    }
389#endif
390
391    //sigPipeAction.sa_handler = pipe_handler;
392    sigPipeAction.sa_handler = SIG_IGN;
393    if (sigaction(SIGPIPE, &sigPipeAction, NULL) < 0) {
394        fprintf(stderr, "failed to register signal handler\n");
395        close(fd);
396        exit(6);
397    }
398
399    render_init();
400
401    for(i=0; i<NUM_THREADS; i++) {
402        if (pthread_create(&render_threads[i], NULL, render_thread, NULL)) {
403            fprintf(stderr, "error spawning render thread\n");
404            close(fd);
405            exit(7);
406        }
407    }
408    process_loop(fd);
409
410    unlink(spath);
411    close(fd);
412    return 0;
413}
Note: See TracBrowser for help on using the repository browser.