source: subversion/applications/utils/mod_tile/renderd.py @ 28681

Last change on this file since 28681 was 15889, checked in by jonb, 10 years ago

Update renderd.py to obtain more config parameters from renderd.conf

  • Property svn:executable set to *
File size: 18.8 KB
RevLine 
[13121]1#!/usr/bin/python
2#
3# mod_tile rendering daemon example written in Python.
4# The code is mostly a direct port of the C implementation.
[13273]5#
6# This is currently experimental and not intended as a replacement
7# of the C implementation, but works surpringly well. It should be
8# easier to produce custom variations of the rendering pipeline,
9# e.g. such as compositing tiles from multiple layers.
10#
[13121]11# It needs more work to make it more Pythonic, split it
12# into more appropriate classes, add documentation, fix bugs etc.
13#
[13273]14# I'm not yet convinced this is the best approach to integrating
15# Python with the render daemon. Two other options I'm considering:
[13121]16#
17# - Use the C renderd code with python binding allowing the replacement
18# of just the core tile rendering code (this is the bit that people
19# may want to tweak)
20#
21# - Split the functionality into a seperate queue handler daemon and
22# render daemon. This would remove a lot of the complexity around the
23# request handling which most people probably won't want to touch.
24# The queue handler might stay in C with a smaller python rendering daemon
25
26import sys, os
27import SocketServer
28import struct
29import thread
30import threading
31import socket
32import ConfigParser
33import mapnik
34import time
35import errno
36from math import pi,cos,sin,log,exp,atan
[15889]37from StringIO import StringIO
[13121]38
[13133]39import cairo
40import cStringIO
41
[13121]42MAX_ZOOM = 18
43METATILE = 8
44META_MAGIC = "META"
45
46class protocol:
47    # ENUM values for commandStatus field in protocol packet
48    Ignore = 0
49    Render = 1
50    Dirty = 2
51    Done = 3
52    NotDone = 4
53
54class ProtocolPacket:
55    def __init__(self, version, fields = ""):
56        self.version = version
57        self.xmlname = ""
58        self.x = 0
59        self.y = 0
60        self.z = 0
61        self.mx = 0
62        self.my = 0
63        self.commandStatus = protocol.Ignore
64        self.fields = fields
65
66    def len(self):
67        return struct.calcsize(self.fields)
68
69    def bad_request(self):
[13273]70        # Check that the requested (x,y,z) is invalid
[13121]71        x = self.x
72        y = self.y
73        z = self.z
74
75        if (z < 0) or (z > MAX_ZOOM):
76            return True
77        limit = (1 << z) -1
78        if (x < 0) or (x > limit):
79            return True
80        if (y < 0) or (y > limit):
81            return True
82        return False
83
84    def meta_tuple(self):
85        # This metatile tuple is used to identify duplicate request in the rendering queue
86        return (self.xmlname, self.mx, self.my, self.z)
87
88class ProtocolPacketV1(ProtocolPacket):
89    def __init__(self):
90        ProtocolPacket(1)
91        self.fields = "5i"
92
93    def receive(self, data, dest):
94        version, request, x, y, z = struct.unpack(self.fields, data)
95
96        if version != 1:
97            print "Received V1 packet with incorect version %d" % version
98        else:
99            #print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
100            #    % (request, x, y, z)
101            self.commandStatus = request
102            self.x = x
103            self.y = y
104            self.z = z
[15888]105            self.xmlname = "default"
[13121]106            # Calculate Meta-tile value for this x/y
[13239]107            self.mx = x & ~(METATILE-1)
108            self.my = y & ~(METATILE-1)
[13121]109            self.dest = dest
110
111
112    def send(self, status):
113        x = self.x
114        y = self.y
115        z = self.z
116        data = struct.pack(self.fields, (1, status, x, y, z))
117        try: 
118            self.dest.send(data)
119        except socket.error, e:
[13239]120               if e[0] != errno.EBADF:
[13121]121                   raise
122
123
124class ProtocolPacketV2(ProtocolPacket):
125    def __init__(self):
126        ProtocolPacket(2)
127        self.fields = "5i41sxxx"
128
129    def receive(self, data, dest):
130        version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
131
132        if version != 2:
133            print "Received V2 packet with incorect version %d" % version
134        else:
135            #print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
136            #    % (request, xmlname, x, y, z)
137            self.commandStatus = request
138            self.x = x
139            self.y = y
140            self.z = z
141            self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
142            # Calculate Meta-tile value for this x/y
143            self.mx = x & ~(METATILE-1)
144            self.my = y & ~(METATILE-1)
145            self.dest = dest
146
147    def send(self, status):
148        x = self.x
149        y = self.y
150        z = self.z
151        xmlname = self.xmlname
152        data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
153        try:
154            self.dest.send(data)
155        except socket.error, e:
[13239]156               if e[0] != errno.EBADF:
[13121]157                   raise
158
159DEG_TO_RAD = pi/180
160RAD_TO_DEG = 180/pi
161
162
163class SphericalProjection:
164    def __init__(self,levels=18):
165        self.Bc = []
166        self.Cc = []
167        self.zc = []
168        self.Ac = []
169        c = 256
[13133]170        for d in range(0,levels+1):
[13121]171            e = c/2;
172            self.Bc.append(c/360.0)
173            self.Cc.append(c/(2 * pi))
174            self.zc.append((e,e))
175            self.Ac.append(c)
176            c *= 2
177
[13125]178    def minmax(self, a,b,c):
179        a = max(a,b)
180        a = min(a,c)
181        return a
182
[13121]183    def fromLLtoPixel(self,ll,zoom):
184         d = self.zc[zoom]
185         e = round(d[0] + ll[0] * self.Bc[zoom])
[13125]186         f = self.minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
[13121]187         g = round(d[1] + 0.5*log((1+f)/(1-f))*-self.Cc[zoom])
188         return (e,g)
189
190    def fromPixelToLL(self,px,zoom):
191         e = self.zc[zoom]
192         f = (px[0] - e[0])/self.Bc[zoom]
193         g = (px[1] - e[1])/-self.Cc[zoom]
194         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
195         return (f,h)
196
197
198class RenderThread:
199    def __init__(self, tile_path, styles, queue_handler):
200        self.tile_path = tile_path
201        self.queue_handler = queue_handler
202        self.maps = {}
[13273]203        self.prj = {}
[13121]204        for xmlname in styles:
205            #print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
[13236]206            m = mapnik.Map(256, 256)
[13121]207            self.maps[xmlname] = m
[13273]208            # Load XML style
[13239]209            mapnik.load_map(m, styles[xmlname], True)
[13273]210            # Obtain <Map> projection
211            self.prj[xmlname] = mapnik.Projection(m.srs)
[13121]212
213        # Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
[13273]214        self.tileproj = SphericalProjection(MAX_ZOOM)
[13121]215
[13236]216    def render_with_agg(self, m, size):
[13133]217        # Render image with default Agg renderer
[13236]218        im = mapnik.Image(size, size)
[13133]219        mapnik.render(m, im)
[13236]220        return im
[13133]221
[13236]222    def render_with_cairo(self, m, size):
223        surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, size, size)
224        mapnik.render(m, surface)
225        return mapnik.Image.from_cairo(surface)
226
227    def split_meta_image(self, im, sz, format = 'png256'):
228        # Split image up into NxN grid of tile images
[13133]229        tiles = {}
230        for yy in range(0,sz):
231            for xx in range(0,sz):
[13236]232                view = im.view(xx * 256 , yy * 256, 256, 256)
233                tile = view.tostring(format)
[13133]234                tiles[(xx, yy)] = tile
235
236        return tiles
237
238
[13121]239    def render_meta(self, m, style, x, y, z, sz):
240        # Calculate pixel positions of bottom-left & top-right
241        p0 = (x * 256, (y + sz) * 256)
242        p1 = ((x + sz) * 256, y * 256)
243
244        # Convert to LatLong (EPSG:4326)
[13273]245        l0 = self.tileproj.fromPixelToLL(p0, z);
246        l1 = self.tileproj.fromPixelToLL(p1, z);
[13121]247
[13273]248        # Convert to map projection (e.g. mercator co-ords EPSG:900913)
249        c0 = self.prj[style].forward(mapnik.Coord(l0[0],l0[1]))
250        c1 = self.prj[style].forward(mapnik.Coord(l1[0],l1[1]))
[13121]251
252        # Bounding box for the meta-tile
253        bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
[13236]254        render_size = 256 * sz
255        m.resize(render_size, render_size)
[13121]256        m.zoom_to_box(bbox)
[13236]257        m.buffer_size = 128
[13121]258
[13236]259        im = self.render_with_agg(m, render_size)
260        #im = self.render_with_cairo(m, render_size)
261        return self.split_meta_image(im, sz)
[13121]262
[13273]263    def render_request(self, t):
264        (xmlname, x, y, z) = t
[13121]265        # Calculate the meta tile size to use for this zoom level
[13273]266        size = min(METATILE, 1 << z)
[13121]267        try:
268            m = self.maps[xmlname]
269        except KeyError:
270            print "No map for: '%s'" % xmlname
271            return False
272        tiles = self.render_meta(m, xmlname, x, y, z, size)
[13240]273        self.meta_save(xmlname, x, y, z, size, tiles)
[13121]274
[13240]275        print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
[13273]276            (xmlname, z, x, x+size-1, y, y+size-1)
[13121]277
[13240]278        return True;
[13121]279
280    def xyz_to_meta(self, xmlname, x,y, z):
281        mask = METATILE -1
282        x &= ~mask
283        y &= ~mask
284        hashes = {}
285
286        for i in range(0,5):
287            hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
288            x >>= 4
289            y >>= 4
290
291        meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
292        return meta
293
294    def xyz_to_meta_offset(self, xmlname, x,y, z):
295        mask = METATILE -1
296        offset = (x & mask) * METATILE + (y & mask)
297        return offset
298
299
300    def meta_save(self, xmlname, x, y, z, size, tiles):
301        #print "Saving %d tiles" % (size * size)
302        meta_path = self.xyz_to_meta(xmlname, x, y, z)
303        d = os.path.dirname(meta_path)
304        if not os.path.exists(d):
305            try:
306                os.makedirs(d)
307            except OSError:
308                # Multiple threads can race when creating directories,
309                # ignore exception if the directory now exists
310                if not os.path.exists(d):
311                    raise
312
313        tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
314        f = open(tmp, "w")
315
316        f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
317        offset = len(META_MAGIC) + 4 * 4
318        # Need to pre-compensate the offsets for the size of the offset/size table we are about to write
319        offset += (2 * 4) * (METATILE * METATILE)
320        # Collect all the tile sizes
321        sizes = {}
322        offsets = {}
323        for xx in range(0, size):
324            for yy in range(0, size):
325                mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
326                sizes[mt] = len(tiles[(xx, yy)])
327                offsets[mt] = offset
328                offset += sizes[mt]
329        # Write out the offset/size table
330        for mt in range(0, METATILE * METATILE):
331            if mt in sizes:
332                f.write(struct.pack("2i", offsets[mt], sizes[mt]))
333            else:
334                f.write(struct.pack("2i", 0, 0))
335        # Write out the tiles
336        for xx in range(0, size):
337            for yy in range(0, size):
338                f.write(tiles[(xx, yy)])
339
340        f.close()
341        os.rename(tmp, meta_path)
[13239]342        #print "Wrote: %s" % meta_path
[13121]343
344    def loop(self):
345        while True:
346            #Fetch a meta-tile to render
347            r = self.queue_handler.fetch()
348            rendered = self.render_request(r)
349            # Retrieve all requests for this meta-tile
350            requests = self.queue_handler.pop_requests(r)
351            for request in requests:
352                if (request.commandStatus == protocol.Render):
353                    if rendered == True:
354                        request.send(protocol.Done)
355                    else:
356                        request.send(protocol.NotDone)
357
358
359def start_renderers(num_threads, tile_path, styles, queue_handler):
360    for i in range(num_threads):
361        renderer = RenderThread(tile_path, styles, queue_handler)
362        render_thread = threading.Thread(target=renderer.loop)
363        render_thread.setDaemon(True)
364        render_thread.start()
365        print "Started render thread %s" % render_thread.getName()
366
367class RequestQueues:
368    def __init__(self, request_limit = 32, dirty_limit = 1000):
[13273]369        # We store requests in several lists
[13121]370        # - Incoming render requests are initally put into the request queue
[13273]371        # If the request queue is full then the new request is demoted to the dirty queue
372        # - Incoming 'dirty' requests are put into the dirty queue, or dropped if this is full
[13121]373        # - The render queue holds the requests which are in progress by the render threads
374        self.requests = {}
375        self.dirties = {}
376        self.rendering = {}
377
378        self.request_limit = request_limit
379        self.dirty_limit = dirty_limit
[13273]380        self.not_empty = threading.Condition()
[13121]381
382
383    def add(self, request):
[13273]384        self.not_empty.acquire()
[13121]385        try:
[13273]386            # Before adding this new request we first look if this tile is already pending
387            # If so, the new request is tacked on to the existing one
[13121]388            # FIXME: Add short-circuit for overload condition?
[13239]389            t = request.meta_tuple()
390            if t in self.rendering:
391                self.rendering[t].append(request)
[13121]392                return "rendering"
[13273]393            if t in self.requests:
[13239]394                self.requests[t].append(request)
[13121]395                return "requested"
[13273]396            if t in self.dirties:
[13239]397                self.dirties[t].append(request)
[13121]398                return "dirty"
[13273]399            # If we've reached here then there are no existing requests for this tile
400            if (request.commandStatus == protocol.Render) and (len(self.requests) < self.request_limit):
[13239]401                self.requests[t] = [request]
402                self.not_empty.notify()
[13121]403                return "requested"
[13273]404            if len(self.dirties) < self.dirty_limit:
[13239]405                self.dirties[t] = [request]
406                self.not_empty.notify()
[13121]407                return "dirty"
[13273]408            return "dropped"
[13121]409        finally:
[13273]410            self.not_empty.release()
[13121]411
412
413    def fetch(self):
[13273]414        # Fetches a request tuple from the request or dirty queue
415        # The requests are moved to the rendering queue while they are being rendered
[13239]416        self.not_empty.acquire()
[13121]417        try:
[13239]418            while (len(self.requests) == 0) and (len(self.dirties) == 0):
419                self.not_empty.wait()
[13121]420            # Pull request from one of the incoming queues
421            try:
422                item = self.requests.popitem()
423            except KeyError:
424                try:
425                    item = self.dirties.popitem()
426                except KeyError:
427                    print "Odd, queues empty"
428                    return
429
[13273]430            t = item[0]
431            self.rendering[t] = item[1]
432            return t
[13121]433        finally:
[13239]434            self.not_empty.release()
[13121]435
[13273]436    def pop_requests(self, t):
437        # Removes this tuple from the rendering queue
438        # and returns the list of request for the tuple
439        self.not_empty.acquire()
[13121]440        try:
[13273]441            return self.rendering.pop(t)
[13121]442        except KeyError:
[13273]443            # Should never happen. It implies the requests queues are broken
[13121]444            print "WARNING: Failed to locate request in rendering list!"
445        finally:
[13273]446            self.not_empty.release()
[13121]447
448
449class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
450
451    def rx_request(self, request):
452        if (request.commandStatus != protocol.Render) \
453           and (request.commandStatus != protocol.Dirty):
454               return
455
456        if request.bad_request():
457            if (request.commandStatus == protocol.Render):
458                request.send(protocol.NotDone)
459            return
460
[13273]461        #cur_thread = threading.currentThread()
[13239]462        #print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
463        #    (cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
[13121]464
465        status = self.server.queue_handler.add(request)
466        if status in ("rendering", "requested"):
467            # Request queued, response will be sent on completion
468            return
469
[13273]470        # The tile won't be rendered soon, tell the requestor straight away
[13121]471        if (request.commandStatus == protocol.Render):
472            request.send(protocol.NotDone)
473
474    def handle(self):
475        cur_thread = threading.currentThread()
[13239]476        #print "%s: New connection" % cur_thread.getName()
[13251]477        len_v1 = ProtocolPacketV1().len()
478        len_v2 = ProtocolPacketV2().len()
479        max_len = max(len_v1, len_v2)
[13121]480
481        while True:
482            try:
483                data = self.request.recv(max_len)
484            except socket.error, e:
485                if e[0] == errno.ECONNRESET:
[13239]486                    #print "Connection reset by peer"
[13121]487                    break
488                else:
489                    raise
490
[13251]491            if len(data) == len_v1:
492                req_v1 = ProtocolPacketV1()
[13121]493                req_v1.receive(data, self.request)
494                self.rx_request(req_v1)
[13251]495            if len(data) == len_v2:
496                req_v2 = ProtocolPacketV2()
[13121]497                req_v2.receive(data, self.request)
498                self.rx_request(req_v2)
499            elif len(data) == 0:
[13239]500                #print "%s: Connection closed" % cur_thread.getName()
[13121]501                break
502            else:
503                print "Invalid request length %d" % len(data)
504                break
505
506class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
507    def __init__(self, address, queue_handler, handler):
508        if(os.path.exists(address)):
509           os.unlink(address)
510        self.address = address
511        self.queue_handler = queue_handler
512        SocketServer.UnixStreamServer.__init__(self, address, handler)
513        self.daemon_threads = True
514
515def listener(address, queue_handler):
516    # Create the server
517    server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
518    # The socket needs to be writeable by Apache
519    os.chmod(address, 0666)
520    # Loop forever servicing requests
521    server.serve_forever()
522
523def display_config(config):
524    for xmlname in config.sections():
[15888]525        if xmlname != "renderd" and xmlname != "mapnik":
526            print "Layer name: %s" % xmlname
527            uri = config.get(xmlname, "uri")
528            xml = config.get(xmlname, "xml")
529            print "    URI(%s) = XML(%s)" % (uri, xml)
[13121]530
531def read_styles(config):
532    styles = {}
533    for xmlname in config.sections():
[15888]534        if xmlname != "renderd" and xmlname != "mapnik":
535            styles[xmlname] = config.get(xmlname, "xml")
[13121]536    return styles
537
538if __name__ == "__main__":
539    try:
540        cfg_file = os.environ['RENDERD_CFG']
541    except KeyError:
542        cfg_file = "/etc/renderd.conf"
543
[13273]544    # Unifont has a better character coverage and is used as a fallback for DejaVu
545    # if you use a style based on the osm-template-fonset.xml
[13239]546    mapnik.FontEngine.instance().register_font("/home/jburgess/osm/fonts/unifont-5.1.20080706.ttf")
547
[15889]548    default_cfg = StringIO("""
549[renderd]
550socketname=/tmp/osm-renderd
551num_threads=4
552tile_dir=/var/lib/mod_tile
553""")
554
[13121]555    config = ConfigParser.ConfigParser()
[15889]556    config.readfp(default_cfg)
[13121]557    config.read(cfg_file)
558    display_config(config)
559    styles = read_styles(config)
560
[15889]561    num_threads    = config.getint("renderd", "num_threads")
562    renderd_socket = config.get("renderd", "socketname")
563    tile_dir       = config.get("renderd", "tile_dir")
564
[13121]565    queue_handler = RequestQueues()
[15889]566    start_renderers(num_threads, tile_dir, styles, queue_handler)
567    listener(renderd_socket, queue_handler)
Note: See TracBrowser for help on using the repository browser.