source: subversion/applications/utils/mod_tile/renderd.py @ 13236

Last change on this file since 13236 was 13236, checked in by jonb, 11 years ago

small clean up of renderd.py code

  • Property svn:executable set to *
File size: 18.6 KB
Line 
1#!/usr/bin/python
2#
3# mod_tile rendering daemon example written in Python.
4# The code is mostly a direct port of the C implementation.
5# It needs more work to make it more Pythonic, split it
6# into more appropriate classes, add documentation, fix bugs etc.
7#
8# This is currently experimental and not intended as a replacement
9# of the C implementation! It should allow more people to produce
10# custom variations of the rendering pipeline, e.g. such as compositing
11# tiles from multiple layers.
12#
13# The code functions but I'm not yet convinced this is the correct
14# approach to integrating Python with the render daemon. Two other
15# options I'm considering:
16#
17# - Use the C renderd code with python binding allowing the replacement
18# of just the core tile rendering code (this is the bit that people
19# may want to tweak)
20#
21# - Split the functionality into a seperate queue handler daemon and
22# render daemon. This would remove a lot of the complexity around the
23# request handling which most people probably won't want to touch.
24# The queue handler might stay in C with a smaller python rendering daemon
25
26import sys, os
27import SocketServer
28import struct
29import thread
30import threading
31import socket
32import ConfigParser
33import mapnik
34import time
35import errno
36from math import pi,cos,sin,log,exp,atan
37
38import cairo
39import cStringIO
40
41MAX_ZOOM = 18
42METATILE = 8
43META_MAGIC = "META"
44
45class protocol:
46    # ENUM values for commandStatus field in protocol packet
47    Ignore = 0
48    Render = 1
49    Dirty = 2
50    Done = 3
51    NotDone = 4
52
53class ProtocolPacket:
54    def __init__(self, version, fields = ""):
55        self.version = version
56        self.xmlname = ""
57        self.x = 0
58        self.y = 0
59        self.z = 0
60        self.mx = 0
61        self.my = 0
62        self.commandStatus = protocol.Ignore
63        self.fields = fields
64
65    def len(self):
66        return struct.calcsize(self.fields)
67
68    def bad_request(self):
69        x = self.x
70        y = self.y
71        z = self.z
72
73        if (z < 0) or (z > MAX_ZOOM):
74            return True
75        limit = (1 << z) -1
76        if (x < 0) or (x > limit):
77            return True
78        if (y < 0) or (y > limit):
79            return True
80        return False
81
82    def meta_tuple(self):
83        # This metatile tuple is used to identify duplicate request in the rendering queue
84        return (self.xmlname, self.mx, self.my, self.z)
85
86class ProtocolPacketV1(ProtocolPacket):
87    def __init__(self):
88        ProtocolPacket(1)
89        self.fields = "5i"
90
91    def receive(self, data, dest):
92        version, request, x, y, z = struct.unpack(self.fields, data)
93
94        if version != 1:
95            print "Received V1 packet with incorect version %d" % version
96        else:
97            #print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
98            #    % (request, x, y, z)
99            self.commandStatus = request
100            self.x = x
101            self.y = y
102            self.z = z
103            self.xmlname = "Default"
104            # Calculate Meta-tile value for this x/y
105            self.mx = x & (METATILE-1)
106            self.my = y & (METATILE-1)
107            self.dest = dest
108
109
110    def send(self, status):
111        x = self.x
112        y = self.y
113        z = self.z
114        data = struct.pack(self.fields, (1, status, x, y, z))
115        try: 
116            self.dest.send(data)
117        except socket.error, e:
118               if e[0] == errno.EBADF:
119                   print "Got EBADF in socket send"
120               else:
121                   raise
122
123
124class ProtocolPacketV2(ProtocolPacket):
125    def __init__(self):
126        ProtocolPacket(2)
127        self.fields = "5i41sxxx"
128
129    def receive(self, data, dest):
130        version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
131
132        if version != 2:
133            print "Received V2 packet with incorect version %d" % version
134        else:
135            #print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
136            #    % (request, xmlname, x, y, z)
137            self.commandStatus = request
138            self.x = x
139            self.y = y
140            self.z = z
141            self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
142            # Calculate Meta-tile value for this x/y
143            self.mx = x & ~(METATILE-1)
144            self.my = y & ~(METATILE-1)
145            self.dest = dest
146
147    def send(self, status):
148        x = self.x
149        y = self.y
150        z = self.z
151        xmlname = self.xmlname
152        data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
153        try:
154            self.dest.send(data)
155        except socket.error, e:
156               if e[0] == errno.EBADF:
157                   print "Got EBADF in socket send"
158               else:
159                   raise
160
161DEG_TO_RAD = pi/180
162RAD_TO_DEG = 180/pi
163
164
165class SphericalProjection:
166    def __init__(self,levels=18):
167        self.Bc = []
168        self.Cc = []
169        self.zc = []
170        self.Ac = []
171        c = 256
172        for d in range(0,levels+1):
173            e = c/2;
174            self.Bc.append(c/360.0)
175            self.Cc.append(c/(2 * pi))
176            self.zc.append((e,e))
177            self.Ac.append(c)
178            c *= 2
179
180    def minmax(self, a,b,c):
181        a = max(a,b)
182        a = min(a,c)
183        return a
184
185    def fromLLtoPixel(self,ll,zoom):
186         d = self.zc[zoom]
187         e = round(d[0] + ll[0] * self.Bc[zoom])
188         f = self.minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
189         g = round(d[1] + 0.5*log((1+f)/(1-f))*-self.Cc[zoom])
190         return (e,g)
191
192    def fromPixelToLL(self,px,zoom):
193         e = self.zc[zoom]
194         f = (px[0] - e[0])/self.Bc[zoom]
195         g = (px[1] - e[1])/-self.Cc[zoom]
196         h = RAD_TO_DEG * ( 2 * atan(exp(g)) - 0.5 * pi)
197         return (f,h)
198
199
200class RenderThread:
201    def __init__(self, tile_path, styles, queue_handler):
202        self.tile_path = tile_path
203        self.queue_handler = queue_handler
204        self.maps = {}
205        for xmlname in styles:
206            #print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
207            m = mapnik.Map(256, 256)
208            self.maps[xmlname] = m
209            mapnik.load_map(m, styles[xmlname])
210
211        # Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
212        self.gprj = SphericalProjection(MAX_ZOOM)
213        # This is the Spherical mercator projection (EPSG:900913)
214        self.prj = mapnik.Projection("+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +no_defs +over")
215
216    def render_with_agg(self, m, size):
217        # Render image with default Agg renderer
218        im = mapnik.Image(size, size)
219        mapnik.render(m, im)
220        return im
221
222    def render_with_cairo(self, m, size):
223        surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, size, size)
224        mapnik.render(m, surface)
225        return mapnik.Image.from_cairo(surface)
226
227    def split_meta_image(self, im, sz, format = 'png256'):
228        # Split image up into NxN grid of tile images
229        tiles = {}
230        for yy in range(0,sz):
231            for xx in range(0,sz):
232                view = im.view(xx * 256 , yy * 256, 256, 256)
233                tile = view.tostring(format)
234                tiles[(xx, yy)] = tile
235
236        return tiles
237
238
239    def render_meta(self, m, style, x, y, z, sz):
240        # Calculate pixel positions of bottom-left & top-right
241        p0 = (x * 256, (y + sz) * 256)
242        p1 = ((x + sz) * 256, y * 256)
243
244        # Convert to LatLong (EPSG:4326)
245        l0 = self.gprj.fromPixelToLL(p0, z);
246        l1 = self.gprj.fromPixelToLL(p1, z);
247
248        # Convert to mercator co-ords (EPSG:900913)
249        c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
250        c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))
251
252        # Bounding box for the meta-tile
253        bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
254        render_size = 256 * sz
255        m.resize(render_size, render_size)
256        m.zoom_to_box(bbox)
257        m.buffer_size = 128
258
259        im = self.render_with_agg(m, render_size)
260        #im = self.render_with_cairo(m, render_size)
261        return self.split_meta_image(im, sz)
262
263    def render_request(self, r):
264        # Calculate the meta tile size to use for this zoom level
265        size = min(METATILE, 1 << r.z)
266        xmlname = r.xmlname
267        x = r.mx
268        y = r.my
269        z = r.z
270        try:
271            m = self.maps[xmlname]
272        except KeyError:
273            print "No map for: '%s'" % xmlname
274            return False
275        tiles = self.render_meta(m, xmlname, x, y, z, size)
276        status = self.meta_save(xmlname, x, y, z, size, tiles)
277
278        if status == True:
279            print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
280            (xmlname, r.z, x, x+size-1, y, y+size-1)
281        else:
282            print "FAILED xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
283            (xmlname, z, x, x+size-1, y, y+size-1)
284
285        return status;
286
287    def xyz_to_meta(self, xmlname, x,y, z):
288        mask = METATILE -1
289        x &= ~mask
290        y &= ~mask
291        hashes = {}
292
293        for i in range(0,5):
294            hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
295            x >>= 4
296            y >>= 4
297
298        meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
299        return meta
300
301    def xyz_to_meta_offset(self, xmlname, x,y, z):
302        mask = METATILE -1
303        offset = (x & mask) * METATILE + (y & mask)
304        return offset
305
306
307    def meta_save(self, xmlname, x, y, z, size, tiles):
308        #print "Saving %d tiles" % (size * size)
309        meta_path = self.xyz_to_meta(xmlname, x, y, z)
310        d = os.path.dirname(meta_path)
311        if not os.path.exists(d):
312            try:
313                os.makedirs(d)
314            except OSError:
315                # Multiple threads can race when creating directories,
316                # ignore exception if the directory now exists
317                if not os.path.exists(d):
318                    raise
319
320        tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
321        f = open(tmp, "w")
322
323        f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
324        offset = len(META_MAGIC) + 4 * 4
325        # Need to pre-compensate the offsets for the size of the offset/size table we are about to write
326        offset += (2 * 4) * (METATILE * METATILE)
327        # Collect all the tile sizes
328        sizes = {}
329        offsets = {}
330        for xx in range(0, size):
331            for yy in range(0, size):
332                mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
333                sizes[mt] = len(tiles[(xx, yy)])
334                offsets[mt] = offset
335                offset += sizes[mt]
336        # Write out the offset/size table
337        for mt in range(0, METATILE * METATILE):
338            if mt in sizes:
339                f.write(struct.pack("2i", offsets[mt], sizes[mt]))
340            else:
341                f.write(struct.pack("2i", 0, 0))
342        # Write out the tiles
343        for xx in range(0, size):
344            for yy in range(0, size):
345                f.write(tiles[(xx, yy)])
346
347        f.close()
348        os.rename(tmp, meta_path)
349        print "Wrote: %s" % meta_path
350
351        return True
352
353    def loop(self):
354        while True:
355            #Fetch a meta-tile to render
356            r = self.queue_handler.fetch()
357            rendered = self.render_request(r)
358            # Retrieve all requests for this meta-tile
359            requests = self.queue_handler.pop_requests(r)
360            for request in requests:
361                if (request.commandStatus == protocol.Render):
362                    if rendered == True:
363                        request.send(protocol.Done)
364                    else:
365                        request.send(protocol.NotDone)
366            #time.sleep(1)
367        print "Dummy render thread, exiting. Path %s" % self.tile_path
368
369
370def start_renderers(num_threads, tile_path, styles, queue_handler):
371    for i in range(num_threads):
372        renderer = RenderThread(tile_path, styles, queue_handler)
373        render_thread = threading.Thread(target=renderer.loop)
374        render_thread.setDaemon(True)
375        render_thread.start()
376        print "Started render thread %s" % render_thread.getName()
377
378class RequestQueues:
379    def __init__(self, request_limit = 32, dirty_limit = 1000):
380        # Queues are used as follows:
381        # - Incoming render requests are initally put into the request queue
382        # If the request queue is full then the new request is demoted dirty queue
383        # - Incoming 'dirty' requests are put into the dirty queue or overflow from render queue
384        # - The render queue holds the requests which are in progress by the render threads
385        self.requests = {}
386        self.dirties = {}
387        self.rendering = {}
388
389        self.request_limit = request_limit
390        self.dirty_limit = dirty_limit
391        self.cond = threading.Condition()
392
393
394    def add(self, request):
395        self.cond.acquire()
396        try:
397            # FIXME: Add short-circuit for overload condition?
398            if request.meta_tuple() in self.rendering:
399                self.rendering[request.meta_tuple()].append(request)
400                return "rendering"
401            elif request.meta_tuple() in self.requests:
402                self.requests[request.meta_tuple()].append(request)
403                return "requested"
404            elif request.meta_tuple() in self.dirties:
405                self.dirties[request.meta_tuple()].append(request)
406                return "dirty"
407            elif request.commandStatus == protocol.Render and len(self.requests) < self.request_limit:
408                self.requests[request.meta_tuple()] = [request]
409                self.cond.notify()
410                return "requested"
411            elif len(self.dirties) < self.dirty_limit:
412                self.dirties[request.meta_tuple()] = [request]
413                self.cond.notify()
414                return "dirty"
415            else:
416                return "dropped"
417        finally:
418            self.cond.release()
419
420
421    def fetch(self):
422        self.cond.acquire()
423        try:
424            while len(self.requests) == 0 and len(self.dirties) == 0:
425                self.cond.wait()
426            # Pull request from one of the incoming queues
427            try:
428                item = self.requests.popitem()
429            except KeyError:
430                try:
431                    item = self.dirties.popitem()
432                except KeyError:
433                    print "Odd, queues empty"
434                    return
435
436            # Push request list on to the list of items being rendered
437            k = item[0]
438            v = item[1] # This is a list of all requests for this meta-tile
439            self.rendering[k] = v
440            # Return the first request from the list
441            return v[0]
442        finally:
443            self.cond.release()
444
445    def pop_requests(self, request):
446        self.cond.acquire()
447        try:
448            return self.rendering.pop(request.meta_tuple())
449        except KeyError:
450            # It is not yet clear why this happens, there should always be
451            # an entry in the rendering queue for each active meta -tile request
452            print "WARNING: Failed to locate request in rendering list!"
453            return (request,)
454        finally:
455            self.cond.release()
456
457
458class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
459
460    def rx_request(self, request):
461        if (request.commandStatus != protocol.Render) \
462           and (request.commandStatus != protocol.Dirty):
463               return
464
465        if request.bad_request():
466            if (request.commandStatus == protocol.Render):
467                request.send(protocol.NotDone)
468            return
469
470        cur_thread = threading.currentThread()
471        print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
472            (cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
473
474        status = self.server.queue_handler.add(request)
475        if status in ("rendering", "requested"):
476            # Request queued, response will be sent on completion
477            return
478
479        # The tile won't be rendered soon
480        if (request.commandStatus == protocol.Render):
481            request.send(protocol.NotDone)
482
483    def handle(self):
484        cur_thread = threading.currentThread()
485        print "%s: New connection" % cur_thread.getName()
486        req_v1 = ProtocolPacketV1()
487        req_v2 = ProtocolPacketV2()
488        max_len = max(req_v1.len(), req_v2.len())
489
490        while True:
491            try:
492                data = self.request.recv(max_len)
493            except socket.error, e:
494                if e[0] == errno.ECONNRESET:
495                    print "Connection reset by peer"
496                    break
497                else:
498                    raise
499
500            if len(data) == req_v1.len():
501                req_v1.receive(data, self.request)
502                self.rx_request(req_v1)
503            if len(data) == req_v2.len():
504                req_v2.receive(data, self.request)
505                self.rx_request(req_v2)
506            elif len(data) == 0:
507                print "%s: Connection closed" % cur_thread.getName()
508                break
509            else:
510                print "Invalid request length %d" % len(data)
511                break
512
513class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
514    def __init__(self, address, queue_handler, handler):
515        if(os.path.exists(address)):
516           os.unlink(address)
517        self.address = address
518        self.queue_handler = queue_handler
519        SocketServer.UnixStreamServer.__init__(self, address, handler)
520        self.daemon_threads = True
521
522def listener(address, queue_handler):
523    # Create the server
524    server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
525    # The socket needs to be writeable by Apache
526    os.chmod(address, 0666)
527    # Loop forever servicing requests
528    server.serve_forever()
529
530def display_config(config):
531    for xmlname in config.sections():
532        print "Layer name: %s" % xmlname
533        #for opt in config.options(xmlname):
534        #   print "%s = %s" % (opt, config.get(xmlname, opt))
535        uri = config.get(xmlname, "uri")
536        xml = config.get(xmlname, "xml")
537        print "    URI(%s) = XML(%s)" % (uri, xml)
538
539def read_styles(config):
540    styles = {}
541    for xmlname in config.sections():
542        styles[xmlname] = config.get(xmlname, "xml")
543    return styles
544
545if __name__ == "__main__":
546    try:
547        cfg_file = os.environ['RENDERD_CFG']
548    except KeyError:
549        cfg_file = "/etc/renderd.conf"
550
551    # FIXME: Move more of these to config file?
552    RENDER_SOCKET = "/tmp/osm-renderd"
553    HASH_PATH = "/var/lib/mod_tile"
554    NUM_THREADS = 4
555
556    config = ConfigParser.ConfigParser()
557    config.read(cfg_file)
558    display_config(config)
559    styles = read_styles(config)
560
561    queue_handler = RequestQueues()
562    start_renderers(NUM_THREADS, HASH_PATH, styles, queue_handler)
563    listener(RENDER_SOCKET, queue_handler)
Note: See TracBrowser for help on using the repository browser.