source: subversion/applications/utils/mod_tile/renderd.py @ 13121

Last change on this file since 13121 was 13121, checked in by jonb, 11 years ago

Render daemon code ported to Python. It is functionally equivalent to the C version but it is not intended to replace it. The python version should make it easier to experiment with new types of tile rendering

  • Property svn:executable set to *
File size: 18.6 KB
Line 
1#!/usr/bin/python
2#
3# mod_tile rendering daemon example written in Python.
4# The code is mostly a direct port of the C implementation.
5# It needs more work to make it more Pythonic, split it
6# into more appropriate classes, add documentation, fix bugs etc.
7#
8# This is currently experimental and not intended as a replacement
9# of the C implementation! It should allow more people to produce
10# custom variations of the rendering pipeline, e.g. such as compositing
11# tiles from multiple layers.
12#
13# The code functions but I'm not yet convinced this is the correct
14# approach to integrating Python with the render daemon. Two other
15# options I'm considering:
16#
17# - Use the C renderd code with python binding allowing the replacement
18# of just the core tile rendering code (this is the bit that people
19# may want to tweak)
20#
21# - Split the functionality into a seperate queue handler daemon and
22# render daemon. This would remove a lot of the complexity around the
23# request handling which most people probably won't want to touch.
24# The queue handler might stay in C with a smaller python rendering daemon
25
26import sys, os
27import SocketServer
28import struct
29import thread
30import threading
31import socket
32import ConfigParser
33import mapnik
34import time
35import Queue
36import tempfile
37import errno
38from math import pi,cos,sin,log,exp,atan
39
40MAX_ZOOM = 18
41METATILE = 8
42META_MAGIC = "META"
43
44class protocol:
45    # ENUM values for commandStatus field in protocol packet
46    Ignore = 0
47    Render = 1
48    Dirty = 2
49    Done = 3
50    NotDone = 4
51
52class ProtocolPacket:
53    def __init__(self, version, fields = ""):
54        self.version = version
55        self.xmlname = ""
56        self.x = 0
57        self.y = 0
58        self.z = 0
59        self.mx = 0
60        self.my = 0
61        self.commandStatus = protocol.Ignore
62        self.fields = fields
63
64    def len(self):
65        return struct.calcsize(self.fields)
66
67    def bad_request(self):
68        x = self.x
69        y = self.y
70        z = self.z
71
72        if (z < 0) or (z > MAX_ZOOM):
73            return True
74        limit = (1 << z) -1
75        if (x < 0) or (x > limit):
76            return True
77        if (y < 0) or (y > limit):
78            return True
79        return False
80
81    def meta_tuple(self):
82        # This metatile tuple is used to identify duplicate request in the rendering queue
83        return (self.xmlname, self.mx, self.my, self.z)
84
85class ProtocolPacketV1(ProtocolPacket):
86    def __init__(self):
87        ProtocolPacket(1)
88        self.fields = "5i"
89
90    def receive(self, data, dest):
91        version, request, x, y, z = struct.unpack(self.fields, data)
92
93        if version != 1:
94            print "Received V1 packet with incorect version %d" % version
95        else:
96            #print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
97            #    % (request, x, y, z)
98            self.commandStatus = request
99            self.x = x
100            self.y = y
101            self.z = z
102            self.xmlname = "Default"
103            # Calculate Meta-tile value for this x/y
104            self.mx = x & (METATILE-1)
105            self.my = y & (METATILE-1)
106            self.dest = dest
107
108
109    def send(self, status):
110        x = self.x
111        y = self.y
112        z = self.z
113        data = struct.pack(self.fields, (1, status, x, y, z))
114        try: 
115            self.dest.send(data)
116        except socket.error, e:
117               if e[0] == errno.EBADF:
118                   print "Got EBADF in socket send"
119               else:
120                   raise
121
122
123class ProtocolPacketV2(ProtocolPacket):
124    def __init__(self):
125        ProtocolPacket(2)
126        self.fields = "5i41sxxx"
127
128    def receive(self, data, dest):
129        version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
130
131        if version != 2:
132            print "Received V2 packet with incorect version %d" % version
133        else:
134            #print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
135            #    % (request, xmlname, x, y, z)
136            self.commandStatus = request
137            self.x = x
138            self.y = y
139            self.z = z
140            self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
141            # Calculate Meta-tile value for this x/y
142            self.mx = x & ~(METATILE-1)
143            self.my = y & ~(METATILE-1)
144            self.dest = dest
145
146    def send(self, status):
147        x = self.x
148        y = self.y
149        z = self.z
150        xmlname = self.xmlname
151        data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
152        try:
153            self.dest.send(data)
154        except socket.error, e:
155               if e[0] == errno.EBADF:
156                   print "Got EBADF in socket send"
157               else:
158                   raise
159
160DEG_TO_RAD = pi/180
161RAD_TO_DEG = 180/pi
162
163def minmax (a,b,c):
164    a = max(a,b)
165    a = min(a,c)
166    return a
167
168class SphericalProjection:
169    def __init__(self,levels=18):
170        self.Bc = []
171        self.Cc = []
172        self.zc = []
173        self.Ac = []
174        c = 256
175        for d in range(0,levels):
176            e = c/2;
177            self.Bc.append(c/360.0)
178            self.Cc.append(c/(2 * pi))
179            self.zc.append((e,e))
180            self.Ac.append(c)
181            c *= 2
182
183    def fromLLtoPixel(self,ll,zoom):
184         d = self.zc[zoom]
185         e = round(d[0] + ll[0] * self.Bc[zoom])
186         f = minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
187         g = round(d[1] + 0.5*log((1+f)/(1-f))*-self.Cc[zoom])
188         return (e,g)
189
190    def fromPixelToLL(self,px,zoom):
191         e = self.zc[zoom]
192         f = (px[0] - e[0])/self.Bc[zoom]
193         g = (px[1] - e[1])/-self.Cc[zoom]
194         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
195         return (f,h)
196
197
198class RenderThread:
199    def __init__(self, tile_path, styles, queue_handler):
200        self.tile_path = tile_path
201        self.queue_handler = queue_handler
202        self.maps = {}
203        METATILE = 8
204        RENDER_SIZE = 256 * (METATILE + 1)
205        for xmlname in styles:
206            #print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
207            m = mapnik.Map(RENDER_SIZE, RENDER_SIZE)
208            self.maps[xmlname] = m
209            mapnik.load_map(m, styles[xmlname])
210
211        # Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
212        self.gprj = SphericalProjection(MAX_ZOOM)
213        # This is the Spherical mercator projection (EPSG:900913)
214        self.prj = mapnik.Projection("+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +no_defs +over")
215
216    def render_meta(self, m, style, x, y, z, sz):
217        # Calculate pixel positions of bottom-left & top-right
218        p0 = (x * 256, (y + sz) * 256)
219        p1 = ((x + sz) * 256, y * 256)
220
221        # Convert to LatLong (EPSG:4326)
222        l0 = self.gprj.fromPixelToLL(p0, z);
223        l1 = self.gprj.fromPixelToLL(p1, z);
224
225        # Convert to mercator co-ords (EPSG:900913)
226        c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
227        c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))
228
229        # Bounding box for the meta-tile
230        bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
231        # Expand tile to provide a gutter which avoids features getting lost at edge of metatile
232        scale = (sz+1.0)/sz
233        bbox.width(bbox.width() * scale)
234        bbox.height(bbox.height() * scale)
235        # Calculate meta tile size in pixels
236        render_size = 256 * (sz + 1)
237        #m.resize(render_size, render_size);
238        m.zoom_to_box(bbox)
239
240        # Render image
241        im = mapnik.Image(render_size, render_size)
242        mapnik.render(m, im)
243
244        tiles = {}
245
246        # Split image up into NxN grid of tile
247        for yy in range(0,sz):
248            for xx in range(0,sz):
249                # Position of tile, offset due to gutter
250                yoff = 128 + yy * 256
251                xoff = 128 + xx * 256
252                view = im.view(xoff, yoff, 256, 256)
253                tile = view.tostring('png256')
254                #print "Got view of z(%d) x(%d) y(%d), len(%d)" % (z, x+xx, y+yy, len(tile))
255                tiles[(xx, yy)] = tile
256
257        return tiles
258
259    def render_request(self, r):
260        # Calculate the meta tile size to use for this zoom level
261        size = min(METATILE, 1 << r.z)
262        xmlname = r.xmlname
263        x = r.mx
264        y = r.my
265        z = r.z
266        try:
267            m = self.maps[xmlname]
268        except KeyError:
269            print "No map for: '%s'" % xmlname
270            return False
271        tiles = self.render_meta(m, xmlname, x, y, z, size)
272        status = self.meta_save(xmlname, x, y, z, size, tiles)
273
274        if status == True:
275            print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
276            (xmlname, r.z, x, x+size-1, y, y+size-1)
277        else:
278            print "FAILED xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
279            (xmlname, z, x, x+size-1, y, y+size-1)
280
281        return status;
282
283    def xyz_to_meta(self, xmlname, x,y, z):
284        mask = METATILE -1
285        x &= ~mask
286        y &= ~mask
287        hashes = {}
288
289        for i in range(0,5):
290            hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
291            x >>= 4
292            y >>= 4
293
294        meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
295        return meta
296
297    def xyz_to_meta_offset(self, xmlname, x,y, z):
298        mask = METATILE -1
299        offset = (x & mask) * METATILE + (y & mask)
300        return offset
301
302
303    def meta_save(self, xmlname, x, y, z, size, tiles):
304        #print "Saving %d tiles" % (size * size)
305        meta_path = self.xyz_to_meta(xmlname, x, y, z)
306        d = os.path.dirname(meta_path)
307        if not os.path.exists(d):
308            try:
309                os.makedirs(d)
310            except OSError:
311                # Multiple threads can race when creating directories,
312                # ignore exception if the directory now exists
313                if not os.path.exists(d):
314                    raise
315
316        tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
317        f = open(tmp, "w")
318
319        f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
320        offset = len(META_MAGIC) + 4 * 4
321        # Need to pre-compensate the offsets for the size of the offset/size table we are about to write
322        offset += (2 * 4) * (METATILE * METATILE)
323        # Collect all the tile sizes
324        sizes = {}
325        offsets = {}
326        for xx in range(0, size):
327            for yy in range(0, size):
328                mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
329                sizes[mt] = len(tiles[(xx, yy)])
330                offsets[mt] = offset
331                offset += sizes[mt]
332        # Write out the offset/size table
333        for mt in range(0, METATILE * METATILE):
334            if mt in sizes:
335                f.write(struct.pack("2i", offsets[mt], sizes[mt]))
336            else:
337                f.write(struct.pack("2i", 0, 0))
338        # Write out the tiles
339        for xx in range(0, size):
340            for yy in range(0, size):
341                f.write(tiles[(xx, yy)])
342
343        f.close()
344        os.rename(tmp, meta_path)
345        print "Wrote: %s" % meta_path
346
347        return True
348
349    def loop(self):
350        while True:
351            #Fetch a meta-tile to render
352            r = self.queue_handler.fetch()
353            rendered = self.render_request(r)
354            # Retrieve all requests for this meta-tile
355            requests = self.queue_handler.pop_requests(r)
356            for request in requests:
357                if (request.commandStatus == protocol.Render):
358                    if rendered == True:
359                        request.send(protocol.Done)
360                    else:
361                        request.send(protocol.NotDone)
362            #time.sleep(1)
363        print "Dummy render thread, exiting. Path %s" % self.tile_path
364
365
366def start_renderers(num_threads, tile_path, styles, queue_handler):
367    for i in range(num_threads):
368        renderer = RenderThread(tile_path, styles, queue_handler)
369        render_thread = threading.Thread(target=renderer.loop)
370        render_thread.setDaemon(True)
371        render_thread.start()
372        print "Started render thread %s" % render_thread.getName()
373
374class RequestQueues:
375    def __init__(self, request_limit = 32, dirty_limit = 1000):
376        # Queues are used as follows:
377        # - Incoming render requests are initally put into the request queue
378        # If the request queue is full then the new request is demoted dirty queue
379        # - Incoming 'dirty' requests are put into the dirty queue or overflow from render queue
380        # - The render queue holds the requests which are in progress by the render threads
381        self.requests = {}
382        self.dirties = {}
383        self.rendering = {}
384
385        self.request_limit = request_limit
386        self.dirty_limit = dirty_limit
387        self.cond = threading.Condition()
388
389
390    def add(self, request):
391        self.cond.acquire()
392        try:
393            # FIXME: Add short-circuit for overload condition?
394            if request.meta_tuple() in self.rendering:
395                self.rendering[request.meta_tuple()].append(request)
396                return "rendering"
397            elif request.meta_tuple() in self.requests:
398                self.requests[request.meta_tuple()].append(request)
399                return "requested"
400            elif request.meta_tuple() in self.dirties:
401                self.dirties[request.meta_tuple()].append(request)
402                return "dirty"
403            elif request.commandStatus == protocol.Render and len(self.requests) < self.request_limit:
404                self.requests[request.meta_tuple()] = [request]
405                self.cond.notify()
406                return "requested"
407            elif len(self.dirties) < self.dirty_limit:
408                self.dirties[request.meta_tuple()] = [request]
409                self.cond.notify()
410                return "dirty"
411            else:
412                return "dropped"
413        finally:
414            self.cond.release()
415
416
417    def fetch(self):
418        self.cond.acquire()
419        try:
420            while len(self.requests) == 0 and len(self.dirties) == 0:
421                self.cond.wait()
422            # Pull request from one of the incoming queues
423            try:
424                item = self.requests.popitem()
425            except KeyError:
426                try:
427                    item = self.dirties.popitem()
428                except KeyError:
429                    print "Odd, queues empty"
430                    return
431
432            # Push request list on to the list of items being rendered
433            k = item[0]
434            v = item[1] # This is a list of all requests for this meta-tile
435            self.rendering[k] = v
436            # Return the first request from the list
437            return v[0]
438        finally:
439            self.cond.release()
440
441    def pop_requests(self, request):
442        self.cond.acquire()
443        try:
444            return self.rendering.pop(request.meta_tuple())
445        except KeyError:
446            # It is not yet clear why this happens, there should always be
447            # an entry in the rendering queue for each active meta -tile request
448            print "WARNING: Failed to locate request in rendering list!"
449            return (request,)
450        finally:
451            self.cond.release()
452
453
454class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
455
456    def rx_request(self, request):
457        if (request.commandStatus != protocol.Render) \
458           and (request.commandStatus != protocol.Dirty):
459               return
460
461        if request.bad_request():
462            if (request.commandStatus == protocol.Render):
463                request.send(protocol.NotDone)
464            return
465
466        cur_thread = threading.currentThread()
467        print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
468            (cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
469
470        status = self.server.queue_handler.add(request)
471        if status in ("rendering", "requested"):
472            # Request queued, response will be sent on completion
473            return
474
475        # The tile won't be rendered soon
476        if (request.commandStatus == protocol.Render):
477            request.send(protocol.NotDone)
478
479    def handle(self):
480        cur_thread = threading.currentThread()
481        print "%s: New connection" % cur_thread.getName()
482        req_v1 = ProtocolPacketV1()
483        req_v2 = ProtocolPacketV2()
484        max_len = max(req_v1.len(), req_v2.len())
485
486        while True:
487            try:
488                data = self.request.recv(max_len)
489            except socket.error, e:
490                if e[0] == errno.ECONNRESET:
491                    print "Connection reset by peer"
492                    break
493                else:
494                    raise
495
496            if len(data) == req_v1.len():
497                req_v1.receive(data, self.request)
498                self.rx_request(req_v1)
499            if len(data) == req_v2.len():
500                req_v2.receive(data, self.request)
501                self.rx_request(req_v2)
502            elif len(data) == 0:
503                print "%s: Connection closed" % cur_thread.getName()
504                break
505            else:
506                print "Invalid request length %d" % len(data)
507                break
508
509class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
510    def __init__(self, address, queue_handler, handler):
511        if(os.path.exists(address)):
512           os.unlink(address)
513        self.address = address
514        self.queue_handler = queue_handler
515        SocketServer.UnixStreamServer.__init__(self, address, handler)
516        self.daemon_threads = True
517
518def listener(address, queue_handler):
519    # Create the server
520    server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
521    # The socket needs to be writeable by Apache
522    os.chmod(address, 0666)
523    # Loop forever servicing requests
524    server.serve_forever()
525
526def display_config(config):
527    for xmlname in config.sections():
528        print "Layer name: %s" % xmlname
529        #for opt in config.options(xmlname):
530        #   print "%s = %s" % (opt, config.get(xmlname, opt))
531        uri = config.get(xmlname, "uri")
532        xml = config.get(xmlname, "xml")
533        print "    URI(%s) = XML(%s)" % (uri, xml)
534
535def read_styles(config):
536    styles = {}
537    for xmlname in config.sections():
538        styles[xmlname] = config.get(xmlname, "xml")
539    return styles
540
541if __name__ == "__main__":
542    try:
543        cfg_file = os.environ['RENDERD_CFG']
544    except KeyError:
545        cfg_file = "/etc/renderd.conf"
546
547    # FIXME: Move more of these to config file?
548    RENDER_SOCKET = "/tmp/osm-renderd"
549    HASH_PATH = "/var/lib/mod_tile"
550    NUM_THREADS = 4
551
552    config = ConfigParser.ConfigParser()
553    config.read(cfg_file)
554    display_config(config)
555    styles = read_styles(config)
556
557    queue_handler = RequestQueues()
558    start_renderers(NUM_THREADS, HASH_PATH, styles, queue_handler)
559    listener(RENDER_SOCKET, queue_handler)
Note: See TracBrowser for help on using the repository browser.