source: subversion/applications/utils/mod_tile/renderd.py @ 13127

Last change on this file since 13127 was 13127, checked in by jonb, 11 years ago

Fix generation of zoom 0 - 2 tiles

  • Property svn:executable set to *
File size: 18.6 KB
Line 
1#!/usr/bin/python
2#
3# mod_tile rendering daemon example written in Python.
4# The code is mostly a direct port of the C implementation.
5# It needs more work to make it more Pythonic, split it
6# into more appropriate classes, add documentation, fix bugs etc.
7#
8# This is currently experimental and not intended as a replacement
9# of the C implementation! It should allow more people to produce
10# custom variations of the rendering pipeline, e.g. such as compositing
11# tiles from multiple layers.
12#
13# The code functions but I'm not yet convinced this is the correct
14# approach to integrating Python with the render daemon. Two other
15# options I'm considering:
16#
17# - Use the C renderd code with python binding allowing the replacement
18# of just the core tile rendering code (this is the bit that people
19# may want to tweak)
20#
21# - Split the functionality into a seperate queue handler daemon and
22# render daemon. This would remove a lot of the complexity around the
23# request handling which most people probably won't want to touch.
24# The queue handler might stay in C with a smaller python rendering daemon
25
26import sys, os
27import SocketServer
28import struct
29import thread
30import threading
31import socket
32import ConfigParser
33import mapnik
34import time
35import errno
36from math import pi,cos,sin,log,exp,atan
37
38MAX_ZOOM = 18
39METATILE = 8
40META_MAGIC = "META"
41
42class protocol:
43    # ENUM values for commandStatus field in protocol packet
44    Ignore = 0
45    Render = 1
46    Dirty = 2
47    Done = 3
48    NotDone = 4
49
50class ProtocolPacket:
51    def __init__(self, version, fields = ""):
52        self.version = version
53        self.xmlname = ""
54        self.x = 0
55        self.y = 0
56        self.z = 0
57        self.mx = 0
58        self.my = 0
59        self.commandStatus = protocol.Ignore
60        self.fields = fields
61
62    def len(self):
63        return struct.calcsize(self.fields)
64
65    def bad_request(self):
66        x = self.x
67        y = self.y
68        z = self.z
69
70        if (z < 0) or (z > MAX_ZOOM):
71            return True
72        limit = (1 << z) -1
73        if (x < 0) or (x > limit):
74            return True
75        if (y < 0) or (y > limit):
76            return True
77        return False
78
79    def meta_tuple(self):
80        # This metatile tuple is used to identify duplicate request in the rendering queue
81        return (self.xmlname, self.mx, self.my, self.z)
82
83class ProtocolPacketV1(ProtocolPacket):
84    def __init__(self):
85        ProtocolPacket(1)
86        self.fields = "5i"
87
88    def receive(self, data, dest):
89        version, request, x, y, z = struct.unpack(self.fields, data)
90
91        if version != 1:
92            print "Received V1 packet with incorect version %d" % version
93        else:
94            #print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
95            #    % (request, x, y, z)
96            self.commandStatus = request
97            self.x = x
98            self.y = y
99            self.z = z
100            self.xmlname = "Default"
101            # Calculate Meta-tile value for this x/y
102            self.mx = x & (METATILE-1)
103            self.my = y & (METATILE-1)
104            self.dest = dest
105
106
107    def send(self, status):
108        x = self.x
109        y = self.y
110        z = self.z
111        data = struct.pack(self.fields, (1, status, x, y, z))
112        try: 
113            self.dest.send(data)
114        except socket.error, e:
115               if e[0] == errno.EBADF:
116                   print "Got EBADF in socket send"
117               else:
118                   raise
119
120
121class ProtocolPacketV2(ProtocolPacket):
122    def __init__(self):
123        ProtocolPacket(2)
124        self.fields = "5i41sxxx"
125
126    def receive(self, data, dest):
127        version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
128
129        if version != 2:
130            print "Received V2 packet with incorect version %d" % version
131        else:
132            #print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
133            #    % (request, xmlname, x, y, z)
134            self.commandStatus = request
135            self.x = x
136            self.y = y
137            self.z = z
138            self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
139            # Calculate Meta-tile value for this x/y
140            self.mx = x & ~(METATILE-1)
141            self.my = y & ~(METATILE-1)
142            self.dest = dest
143
144    def send(self, status):
145        x = self.x
146        y = self.y
147        z = self.z
148        xmlname = self.xmlname
149        data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
150        try:
151            self.dest.send(data)
152        except socket.error, e:
153               if e[0] == errno.EBADF:
154                   print "Got EBADF in socket send"
155               else:
156                   raise
157
158DEG_TO_RAD = pi/180
159RAD_TO_DEG = 180/pi
160
161
162class SphericalProjection:
163    def __init__(self,levels=18):
164        self.Bc = []
165        self.Cc = []
166        self.zc = []
167        self.Ac = []
168        c = 256
169        for d in range(0,levels):
170            e = c/2;
171            self.Bc.append(c/360.0)
172            self.Cc.append(c/(2 * pi))
173            self.zc.append((e,e))
174            self.Ac.append(c)
175            c *= 2
176
177    def minmax(self, a,b,c):
178        a = max(a,b)
179        a = min(a,c)
180        return a
181
182    def fromLLtoPixel(self,ll,zoom):
183         d = self.zc[zoom]
184         e = round(d[0] + ll[0] * self.Bc[zoom])
185         f = self.minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
186         g = round(d[1] + 0.5*log((1+f)/(1-f))*-self.Cc[zoom])
187         return (e,g)
188
189    def fromPixelToLL(self,px,zoom):
190         e = self.zc[zoom]
191         f = (px[0] - e[0])/self.Bc[zoom]
192         g = (px[1] - e[1])/-self.Cc[zoom]
193         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
194         return (f,h)
195
196
197class RenderThread:
198    def __init__(self, tile_path, styles, queue_handler):
199        self.tile_path = tile_path
200        self.queue_handler = queue_handler
201        self.maps = {}
202        METATILE = 8
203        RENDER_SIZE = 256 * (METATILE + 1)
204        for xmlname in styles:
205            #print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
206            m = mapnik.Map(RENDER_SIZE, RENDER_SIZE)
207            self.maps[xmlname] = m
208            mapnik.load_map(m, styles[xmlname])
209
210        # Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
211        self.gprj = SphericalProjection(MAX_ZOOM)
212        # This is the Spherical mercator projection (EPSG:900913)
213        self.prj = mapnik.Projection("+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +no_defs +over")
214
215    def render_meta(self, m, style, x, y, z, sz):
216        # Calculate pixel positions of bottom-left & top-right
217        p0 = (x * 256, (y + sz) * 256)
218        p1 = ((x + sz) * 256, y * 256)
219
220        # Convert to LatLong (EPSG:4326)
221        l0 = self.gprj.fromPixelToLL(p0, z);
222        l1 = self.gprj.fromPixelToLL(p1, z);
223
224        # Convert to mercator co-ords (EPSG:900913)
225        c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
226        c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))
227
228        # Bounding box for the meta-tile
229        bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
230        # Expand tile to provide a gutter which avoids features getting lost at edge of metatile
231        scale = (sz+1.0)/sz
232        bbox.width(bbox.width() * scale)
233        bbox.height(bbox.height() * scale)
234        # Calculate meta tile size in pixels
235        render_size = 256 * (sz + 1)
236        m.width = render_size
237        m.height = render_size
238        m.zoom_to_box(bbox)
239
240        # Render image
241        im = mapnik.Image(render_size, render_size)
242        mapnik.render(m, im)
243
244        tiles = {}
245
246        # Split image up into NxN grid of tile
247        for yy in range(0,sz):
248            for xx in range(0,sz):
249                # Position of tile, offset due to gutter
250                yoff = 128 + yy * 256
251                xoff = 128 + xx * 256
252                view = im.view(xoff, yoff, 256, 256)
253                tile = view.tostring('png256')
254                #print "Got view of z(%d) x(%d) y(%d), len(%d)" % (z, x+xx, y+yy, len(tile))
255                tiles[(xx, yy)] = tile
256
257        return tiles
258
259    def render_request(self, r):
260        # Calculate the meta tile size to use for this zoom level
261        size = min(METATILE, 1 << r.z)
262        xmlname = r.xmlname
263        x = r.mx
264        y = r.my
265        z = r.z
266        try:
267            m = self.maps[xmlname]
268        except KeyError:
269            print "No map for: '%s'" % xmlname
270            return False
271        tiles = self.render_meta(m, xmlname, x, y, z, size)
272        status = self.meta_save(xmlname, x, y, z, size, tiles)
273
274        if status == True:
275            print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
276            (xmlname, r.z, x, x+size-1, y, y+size-1)
277        else:
278            print "FAILED xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
279            (xmlname, z, x, x+size-1, y, y+size-1)
280
281        return status;
282
283    def xyz_to_meta(self, xmlname, x,y, z):
284        mask = METATILE -1
285        x &= ~mask
286        y &= ~mask
287        hashes = {}
288
289        for i in range(0,5):
290            hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
291            x >>= 4
292            y >>= 4
293
294        meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
295        return meta
296
297    def xyz_to_meta_offset(self, xmlname, x,y, z):
298        mask = METATILE -1
299        offset = (x & mask) * METATILE + (y & mask)
300        return offset
301
302
303    def meta_save(self, xmlname, x, y, z, size, tiles):
304        #print "Saving %d tiles" % (size * size)
305        meta_path = self.xyz_to_meta(xmlname, x, y, z)
306        d = os.path.dirname(meta_path)
307        if not os.path.exists(d):
308            try:
309                os.makedirs(d)
310            except OSError:
311                # Multiple threads can race when creating directories,
312                # ignore exception if the directory now exists
313                if not os.path.exists(d):
314                    raise
315
316        tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
317        f = open(tmp, "w")
318
319        f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
320        offset = len(META_MAGIC) + 4 * 4
321        # Need to pre-compensate the offsets for the size of the offset/size table we are about to write
322        offset += (2 * 4) * (METATILE * METATILE)
323        # Collect all the tile sizes
324        sizes = {}
325        offsets = {}
326        for xx in range(0, size):
327            for yy in range(0, size):
328                mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
329                sizes[mt] = len(tiles[(xx, yy)])
330                offsets[mt] = offset
331                offset += sizes[mt]
332        # Write out the offset/size table
333        for mt in range(0, METATILE * METATILE):
334            if mt in sizes:
335                f.write(struct.pack("2i", offsets[mt], sizes[mt]))
336            else:
337                f.write(struct.pack("2i", 0, 0))
338        # Write out the tiles
339        for xx in range(0, size):
340            for yy in range(0, size):
341                f.write(tiles[(xx, yy)])
342
343        f.close()
344        os.rename(tmp, meta_path)
345        print "Wrote: %s" % meta_path
346
347        return True
348
349    def loop(self):
350        while True:
351            #Fetch a meta-tile to render
352            r = self.queue_handler.fetch()
353            rendered = self.render_request(r)
354            # Retrieve all requests for this meta-tile
355            requests = self.queue_handler.pop_requests(r)
356            for request in requests:
357                if (request.commandStatus == protocol.Render):
358                    if rendered == True:
359                        request.send(protocol.Done)
360                    else:
361                        request.send(protocol.NotDone)
362            #time.sleep(1)
363        print "Dummy render thread, exiting. Path %s" % self.tile_path
364
365
366def start_renderers(num_threads, tile_path, styles, queue_handler):
367    for i in range(num_threads):
368        renderer = RenderThread(tile_path, styles, queue_handler)
369        render_thread = threading.Thread(target=renderer.loop)
370        render_thread.setDaemon(True)
371        render_thread.start()
372        print "Started render thread %s" % render_thread.getName()
373
374class RequestQueues:
375    def __init__(self, request_limit = 32, dirty_limit = 1000):
376        # Queues are used as follows:
377        # - Incoming render requests are initally put into the request queue
378        # If the request queue is full then the new request is demoted dirty queue
379        # - Incoming 'dirty' requests are put into the dirty queue or overflow from render queue
380        # - The render queue holds the requests which are in progress by the render threads
381        self.requests = {}
382        self.dirties = {}
383        self.rendering = {}
384
385        self.request_limit = request_limit
386        self.dirty_limit = dirty_limit
387        self.cond = threading.Condition()
388
389
390    def add(self, request):
391        self.cond.acquire()
392        try:
393            # FIXME: Add short-circuit for overload condition?
394            if request.meta_tuple() in self.rendering:
395                self.rendering[request.meta_tuple()].append(request)
396                return "rendering"
397            elif request.meta_tuple() in self.requests:
398                self.requests[request.meta_tuple()].append(request)
399                return "requested"
400            elif request.meta_tuple() in self.dirties:
401                self.dirties[request.meta_tuple()].append(request)
402                return "dirty"
403            elif request.commandStatus == protocol.Render and len(self.requests) < self.request_limit:
404                self.requests[request.meta_tuple()] = [request]
405                self.cond.notify()
406                return "requested"
407            elif len(self.dirties) < self.dirty_limit:
408                self.dirties[request.meta_tuple()] = [request]
409                self.cond.notify()
410                return "dirty"
411            else:
412                return "dropped"
413        finally:
414            self.cond.release()
415
416
417    def fetch(self):
418        self.cond.acquire()
419        try:
420            while len(self.requests) == 0 and len(self.dirties) == 0:
421                self.cond.wait()
422            # Pull request from one of the incoming queues
423            try:
424                item = self.requests.popitem()
425            except KeyError:
426                try:
427                    item = self.dirties.popitem()
428                except KeyError:
429                    print "Odd, queues empty"
430                    return
431
432            # Push request list on to the list of items being rendered
433            k = item[0]
434            v = item[1] # This is a list of all requests for this meta-tile
435            self.rendering[k] = v
436            # Return the first request from the list
437            return v[0]
438        finally:
439            self.cond.release()
440
441    def pop_requests(self, request):
442        self.cond.acquire()
443        try:
444            return self.rendering.pop(request.meta_tuple())
445        except KeyError:
446            # It is not yet clear why this happens, there should always be
447            # an entry in the rendering queue for each active meta -tile request
448            print "WARNING: Failed to locate request in rendering list!"
449            return (request,)
450        finally:
451            self.cond.release()
452
453
454class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
455
456    def rx_request(self, request):
457        if (request.commandStatus != protocol.Render) \
458           and (request.commandStatus != protocol.Dirty):
459               return
460
461        if request.bad_request():
462            if (request.commandStatus == protocol.Render):
463                request.send(protocol.NotDone)
464            return
465
466        cur_thread = threading.currentThread()
467        print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
468            (cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
469
470        status = self.server.queue_handler.add(request)
471        if status in ("rendering", "requested"):
472            # Request queued, response will be sent on completion
473            return
474
475        # The tile won't be rendered soon
476        if (request.commandStatus == protocol.Render):
477            request.send(protocol.NotDone)
478
479    def handle(self):
480        cur_thread = threading.currentThread()
481        print "%s: New connection" % cur_thread.getName()
482        req_v1 = ProtocolPacketV1()
483        req_v2 = ProtocolPacketV2()
484        max_len = max(req_v1.len(), req_v2.len())
485
486        while True:
487            try:
488                data = self.request.recv(max_len)
489            except socket.error, e:
490                if e[0] == errno.ECONNRESET:
491                    print "Connection reset by peer"
492                    break
493                else:
494                    raise
495
496            if len(data) == req_v1.len():
497                req_v1.receive(data, self.request)
498                self.rx_request(req_v1)
499            if len(data) == req_v2.len():
500                req_v2.receive(data, self.request)
501                self.rx_request(req_v2)
502            elif len(data) == 0:
503                print "%s: Connection closed" % cur_thread.getName()
504                break
505            else:
506                print "Invalid request length %d" % len(data)
507                break
508
509class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
510    def __init__(self, address, queue_handler, handler):
511        if(os.path.exists(address)):
512           os.unlink(address)
513        self.address = address
514        self.queue_handler = queue_handler
515        SocketServer.UnixStreamServer.__init__(self, address, handler)
516        self.daemon_threads = True
517
518def listener(address, queue_handler):
519    # Create the server
520    server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
521    # The socket needs to be writeable by Apache
522    os.chmod(address, 0666)
523    # Loop forever servicing requests
524    server.serve_forever()
525
526def display_config(config):
527    for xmlname in config.sections():
528        print "Layer name: %s" % xmlname
529        #for opt in config.options(xmlname):
530        #   print "%s = %s" % (opt, config.get(xmlname, opt))
531        uri = config.get(xmlname, "uri")
532        xml = config.get(xmlname, "xml")
533        print "    URI(%s) = XML(%s)" % (uri, xml)
534
535def read_styles(config):
536    styles = {}
537    for xmlname in config.sections():
538        styles[xmlname] = config.get(xmlname, "xml")
539    return styles
540
541if __name__ == "__main__":
542    try:
543        cfg_file = os.environ['RENDERD_CFG']
544    except KeyError:
545        cfg_file = "/etc/renderd.conf"
546
547    # FIXME: Move more of these to config file?
548    RENDER_SOCKET = "/tmp/osm-renderd"
549    HASH_PATH = "/var/lib/mod_tile"
550    NUM_THREADS = 4
551
552    config = ConfigParser.ConfigParser()
553    config.read(cfg_file)
554    display_config(config)
555    styles = read_styles(config)
556
557    queue_handler = RequestQueues()
558    start_renderers(NUM_THREADS, HASH_PATH, styles, queue_handler)
559    listener(RENDER_SOCKET, queue_handler)
Note: See TracBrowser for help on using the repository browser.