source: subversion/applications/utils/export/osm2pgsql/output-pgsql.c @ 27182

Revision 27182, 58.1 KB checked in by apmon, 2 years ago (diff)

[osm2pgsql] Attempt to improve the transaction management during "going over pending ways/relations"

  • Using an extended transaction on the planet_osm_ways table during the pending ways phase should be fine. It is the planet_osm_rels table that needs to stay out of transaction in this phase, as the different pending ways processes can simultaniously update the pending status of the same relations if multiple ways occur in a relation
  • Remove the extended transactions in the going over pending relations phase on the output tables. The processing of relations during diff-import causes the ways to be removed from the output tables. If a way is contained in two relations in the same diff, that also happen to be processed in different parallel processes, this could dead lock.
Line 
1/* Implements the mid-layer processing for osm2pgsql
2 * using several PostgreSQL tables
3 *
4 * This layer stores data read in from the planet.osm file
5 * and is then read by the backend processing code to
6 * emit the final geometry-enabled output formats
7*/
8
9#include "config.h"
10
11#include <stdio.h>
12#include <unistd.h>
13#include <stdlib.h>
14#include <string.h>
15#include <assert.h>
16#include <errno.h>
17#include <time.h>
18
19#ifdef HAVE_PTHREAD
20#include <pthread.h>
21#endif
22
23#include <libpq-fe.h>
24
25#include "osmtypes.h"
26#include "output.h"
27#include "reprojection.h"
28#include "output-pgsql.h"
29#include "build_geometry.h"
30#include "middle.h"
31#include "pgsql.h"
32#include "expire-tiles.h"
33
34#define SRID (project_getprojinfo()->srs)
35
36/* FIXME: Shouldn't malloc this all to begin with but call realloc()
37   as required. The program will most likely segfault if it reads a
38   style file with more styles than this */
39#define MAX_STYLES 1000
40
41enum table_id {
42    t_point, t_line, t_poly, t_roads
43};
44
45static const struct output_options *Options;
46
47/* Tables to output */
48static struct s_table {
49    //enum table_id table;
50    char *name;
51    const char *type;
52    PGconn *sql_conn;
53    char buffer[1024];
54    unsigned int buflen;
55    int copyMode;
56    char *columns;
57} tables [] = {
58    { .name = "%s_point",   .type = "POINT"     },
59    { .name = "%s_line",    .type = "LINESTRING"},
60    { .name = "%s_polygon", .type = "GEOMETRY"  }, // Actually POLGYON & MULTIPOLYGON but no way to limit to just these two
61    { .name = "%s_roads",   .type = "LINESTRING"}
62};
63#define NUM_TABLES ((signed)(sizeof(tables) / sizeof(tables[0])))
64
65#define FLAG_POLYGON 1    /* For polygon table */
66#define FLAG_LINEAR  2    /* For lines table */
67#define FLAG_NOCACHE 4    /* Optimisation: don't bother remembering this one */
68#define FLAG_DELETE  8    /* These tags should be simply deleted on sight */
69#define FLAG_PHSTORE 17   /* polygons without own column but listed in hstore this implies FLAG_POLYGON */
70static struct flagsname {
71    char *name;
72    int flag;
73} tagflags[] = {
74    { .name = "polygon",    .flag = FLAG_POLYGON },
75    { .name = "linear",     .flag = FLAG_LINEAR },
76    { .name = "nocache",    .flag = FLAG_NOCACHE },
77    { .name = "delete",     .flag = FLAG_DELETE },
78    { .name = "phstore",    .flag = FLAG_PHSTORE }
79};
80#define NUM_FLAGS ((signed)(sizeof(tagflags) / sizeof(tagflags[0])))
81
82/* Table columns, representing key= tags */
83struct taginfo {
84    char *name;
85    char *type;
86    int flags;
87    int count;
88};
89
90static struct taginfo *exportList[4]; /* Indexed by enum table_id */
91static int exportListCount[4];
92
93/* Data to generate z-order column and road table
94 * The name of the roads table is misleading, this table
95 * is used for any feature to be shown at low zoom.
96 * This includes railways and administrative boundaries too
97 */
98static struct {
99    int offset;
100    const char *highway;
101    int roads;
102} layers[] = {
103    { 3, "minor",         0 },
104    { 3, "road",          0 },
105    { 3, "unclassified",  0 },
106    { 3, "residential",   0 },
107    { 4, "tertiary_link", 0 },
108    { 4, "tertiary",      0 },
109   // 5 = railway
110    { 6, "secondary_link",1 },
111    { 6, "secondary",     1 },
112    { 7, "primary_link",  1 },
113    { 7, "primary",       1 },
114    { 8, "trunk_link",    1 },
115    { 8, "trunk",         1 },
116    { 9, "motorway_link", 1 },
117    { 9, "motorway",      1 }
118};
119static const unsigned int nLayers = (sizeof(layers)/sizeof(*layers));
120
121static int pgsql_delete_way_from_output(osmid_t osm_id);
122static int pgsql_delete_relation_from_output(osmid_t osm_id);
123static int pgsql_process_relation(osmid_t id, struct member *members, int member_count, struct keyval *tags, int exists);
124
125void read_style_file( const char *filename )
126{
127  FILE *in;
128  int lineno = 0;
129  int num_read = 0;
130
131  exportList[OSMTYPE_NODE] = malloc( sizeof(struct taginfo) * MAX_STYLES );
132  exportList[OSMTYPE_WAY]  = malloc( sizeof(struct taginfo) * MAX_STYLES );
133
134  in = fopen( filename, "rt" );
135  if( !in )
136  {
137    fprintf( stderr, "Couldn't open style file '%s': %s\n", filename, strerror(errno) );
138    exit_nicely();
139  }
140 
141  char buffer[1024];
142  while( fgets( buffer, sizeof(buffer), in) != NULL )
143  {
144    lineno++;
145   
146    char osmtype[24];
147    char tag[64];
148    char datatype[24];
149    char flags[128];
150    int i;
151    char *str;
152
153    str = strchr( buffer, '#' );
154    if( str )
155      *str = '\0';
156     
157    int fields = sscanf( buffer, "%23s %63s %23s %127s", osmtype, tag, datatype, flags );
158    if( fields <= 0 )  /* Blank line */
159      continue;
160    if( fields < 3 )
161    {
162      fprintf( stderr, "Error reading style file line %d (fields=%d)\n", lineno, fields );
163      exit_nicely();
164    }
165    struct taginfo temp;
166    temp.name = strdup(tag);
167    temp.type = strdup(datatype);
168   
169    temp.flags = 0;
170    for( str = strtok( flags, ",\r\n" ); str; str = strtok(NULL, ",\r\n") )
171    {
172      for( i=0; i<NUM_FLAGS; i++ )
173      {
174        if( strcmp( tagflags[i].name, str ) == 0 )
175        {
176          temp.flags |= tagflags[i].flag;
177          break;
178        }
179      }
180      if( i == NUM_FLAGS )
181        fprintf( stderr, "Unknown flag '%s' line %d, ignored\n", str, lineno );
182    }
183    if (temp.flags==FLAG_PHSTORE) {
184      if (HSTORE_NONE==(Options->enable_hstore)) {
185        fprintf( stderr, "Error reading style file line %d (fields=%d)\n", lineno, fields );
186        fprintf( stderr, "flag 'phstore' is invalid in non-hstore mode\n");
187        exit_nicely();
188      }
189    }
190    temp.count = 0;
191//    printf("%s %s %d %d\n", temp.name, temp.type, temp.polygon, offset );
192   
193    int flag = 0;
194    if( strstr( osmtype, "node" ) )
195    {
196      memcpy( &exportList[ OSMTYPE_NODE ][ exportListCount[ OSMTYPE_NODE ] ], &temp, sizeof(temp) );
197      exportListCount[ OSMTYPE_NODE ]++;
198      flag = 1;
199    }
200    if( strstr( osmtype, "way" ) )
201    {
202      memcpy( &exportList[ OSMTYPE_WAY ][ exportListCount[ OSMTYPE_WAY ] ], &temp, sizeof(temp) );
203      exportListCount[ OSMTYPE_WAY ]++;
204      flag = 1;
205    }
206    if( !flag )
207    {
208      fprintf( stderr, "Weird style line %d\n", lineno );
209      exit_nicely();
210    }
211    num_read++;
212  }
213  if (ferror(in)) {
214      perror(filename);
215      exit_nicely();
216  }
217  if (num_read == 0) {
218      fprintf(stderr, "Unable to parse any valid columns from the style file. Aborting.\n");
219      exit_nicely();
220  }
221  fclose(in);
222}
223
224static void free_style_refs(const char *name, const char *type)
225{
226    // Find and remove any other references to these pointers
227    // This would be way easier if we kept a single list of styles
228    // Currently this scales with n^2 number of styles
229    int i,j;
230
231    for (i=0; i<NUM_TABLES; i++) {
232        for(j=0; j<exportListCount[i]; j++) {
233            if (exportList[i][j].name == name)
234                exportList[i][j].name = NULL;
235            if (exportList[i][j].type == type)
236                exportList[i][j].type = NULL;
237        }
238    }
239}
240
241static void free_style(void)
242{
243    int i, j;
244    for (i=0; i<NUM_TABLES; i++) {
245        for(j=0; j<exportListCount[i]; j++) {
246            free(exportList[i][j].name);
247            free(exportList[i][j].type);
248            free_style_refs(exportList[i][j].name, exportList[i][j].type);
249        }
250    }
251    for (i=0; i<NUM_TABLES; i++)
252        free(exportList[i]);
253}
254
255/* Handles copying out, but coalesces the data into large chunks for
256 * efficiency. PostgreSQL doesn't actually need this, but each time you send
257 * a block of data you get 5 bytes of overhead. Since we go column by column
258 * with most empty and one byte delimiters, without this optimisation we
259 * transfer three times the amount of data necessary.
260 */
261void copy_to_table(enum table_id table, const char *sql)
262{
263    PGconn *sql_conn = tables[table].sql_conn;
264    unsigned int len = strlen(sql);
265    unsigned int buflen = tables[table].buflen;
266    char *buffer = tables[table].buffer;
267
268    /* Return to copy mode if we dropped out */
269    if( !tables[table].copyMode )
270    {
271        pgsql_exec(sql_conn, PGRES_COPY_IN, "COPY %s (%s,way) FROM STDIN", tables[table].name, tables[table].columns);
272        tables[table].copyMode = 1;
273    }
274    /* If the combination of old and new data is too big, flush old data */
275    if( (unsigned)(buflen + len) > sizeof( tables[table].buffer )-10 )
276    {
277      pgsql_CopyData(tables[table].name, sql_conn, buffer);
278      buflen = 0;
279
280      /* If new data by itself is also too big, output it immediately */
281      if( (unsigned)len > sizeof( tables[table].buffer )-10 )
282      {
283        pgsql_CopyData(tables[table].name, sql_conn, sql);
284        len = 0;
285      }
286    }
287    /* Normal case, just append to buffer */
288    if( len > 0 )
289    {
290      strcpy( buffer+buflen, sql );
291      buflen += len;
292      len = 0;
293    }
294
295    /* If we have completed a line, output it */
296    if( buflen > 0 && buffer[buflen-1] == '\n' )
297    {
298      pgsql_CopyData(tables[table].name, sql_conn, buffer);
299      buflen = 0;
300    }
301
302    tables[table].buflen = buflen;
303}
304
305static int add_z_order(struct keyval *tags, int *roads)
306{
307    const char *layer   = getItem(tags, "layer");
308    const char *highway = getItem(tags, "highway");
309    const char *bridge  = getItem(tags, "bridge");
310    const char *tunnel  = getItem(tags, "tunnel");
311    const char *railway = getItem(tags, "railway");
312    const char *boundary= getItem(tags, "boundary");
313
314    int z_order = 0;
315    int l;
316    unsigned int i;
317    char z[13];
318
319    l = layer ? strtol(layer, NULL, 10) : 0;
320    z_order = 10 * l;
321    *roads = 0;
322
323    if (highway) {
324        for (i=0; i<nLayers; i++) {
325            if (!strcmp(layers[i].highway, highway)) {
326                z_order += layers[i].offset;
327                *roads   = layers[i].roads;
328                break;
329            }
330        }
331    }
332
333    if (railway && strlen(railway)) {
334        z_order += 5;
335        *roads = 1;
336    }
337    // Administrative boundaries are rendered at low zooms so we prefer to use the roads table
338    if (boundary && !strcmp(boundary, "administrative"))
339        *roads = 1;
340
341    if (bridge && (!strcmp(bridge, "true") || !strcmp(bridge, "yes") || !strcmp(bridge, "1")))
342        z_order += 10;
343
344    if (tunnel && (!strcmp(tunnel, "true") || !strcmp(tunnel, "yes") || !strcmp(tunnel, "1")))
345        z_order -= 10;
346
347    snprintf(z, sizeof(z), "%d", z_order);
348    addItem(tags, "z_order", z, 0);
349
350    return 0;
351}
352
353#if 0
354static void fix_motorway_shields(struct keyval *tags)
355{
356    const char *highway = getItem(tags, "highway");
357    const char *name    = getItem(tags, "name");
358    const char *ref     = getItem(tags, "ref");
359
360    /* The current mapnik style uses ref= for motorway shields but some roads just have name= */
361    if (!highway || strcmp(highway, "motorway"))
362        return;
363
364    if (name && !ref)
365        addItem(tags, "ref", name, 0);
366}
367#endif
368
369/* Append all alternate name:xx on to the name tag with space sepatators.
370 * name= always comes first, the alternates are in no particular order
371 * Note: A new line may be better but this does not work with Mapnik
372 *
373 *    <tag k="name" v="Ben Nevis" />
374 *    <tag k="name:gd" v="Ben Nibheis" />
375 * becomes:
376 *    <tag k="name" v="Ben Nevis Ben Nibheis" />
377 */
378void compress_tag_name(struct keyval *tags)
379{
380    const char *name = getItem(tags, "name");
381    struct keyval *name_ext = getMatches(tags, "name:");
382    struct keyval *p;
383    char out[2048];
384
385    if (!name_ext)
386        return;
387
388    out[0] = '\0';
389    if (name) {
390        strncat(out, name, sizeof(out)-1);
391        strncat(out, " ", sizeof(out)-1);
392    }
393    while((p = popItem(name_ext)) != NULL) {
394        /* Exclude name:source = "dicataphone" and duplicates */
395        if (strcmp(p->key, "name:source") && !strstr(out, p->value)) {
396            strncat(out, p->value, sizeof(out)-1);
397            strncat(out, " ", sizeof(out)-1);
398        }
399        freeItem(p);
400    }
401    free(name_ext);
402
403    // Remove trailing space
404    out[strlen(out)-1] = '\0';
405    //fprintf(stderr, "*** New name: %s\n", out);
406    updateItem(tags, "name", out);
407}
408
409
410
411static void pgsql_out_cleanup(void)
412{
413    int i;
414
415    for (i=0; i<NUM_TABLES; i++) {
416        if (tables[i].sql_conn) {
417            PQfinish(tables[i].sql_conn);
418            tables[i].sql_conn = NULL;
419        }
420    }
421}
422
423/* Escape data appropriate to the type */
424static void escape_type(char *sql, int len, const char *value, const char *type) {
425    int items, from, to;
426
427    if ( !strcmp(type, "int4") ) {
428        /* For integers we take the first number, or the average if it's a-b */
429        items = sscanf(value, "%d-%d", &from, &to);
430        if ( items == 1 ) {
431            sprintf(sql, "%d", from);
432        } else if ( items == 2 ) {
433            sprintf(sql, "%d", (from + to) / 2);
434        } else {
435            sprintf(sql, "\\N");
436        }
437    } else {
438        escape(sql, len, value);
439    }
440}
441
442static void write_hstore(enum table_id table, struct keyval *tags)
443{
444    static char *sql;
445    static size_t sqllen=0;
446   
447    // sql buffer
448    if (sqllen==0) {
449      sqllen=2048;
450      sql=malloc(sqllen);
451    }
452   
453    // a clone of the tags pointer
454    struct keyval *xtags = tags;
455   
456    // while this tags has a follow-up..
457    while (xtags->next->key != NULL)
458    {
459
460      /* hard exclude z_order tag and keys which have their own column */
461      if ((xtags->next->has_column) || (strcmp("z_order",xtags->next->key)==0)) {
462        // update the tag-pointer to point to the next tag
463        xtags = xtags->next;
464        continue;
465      }
466
467      /*
468        hstore ASCII representation looks like
469        "<key>"=>"<value>"
470       
471        we need at least strlen(key)+strlen(value)+6+'\0' bytes
472        in theory any single character could also be escaped
473        thus we need an additional factor of 2.
474        The maximum lenght of a single hstore element is thus
475        calcuated as follows:
476      */
477      size_t hlen=2 * (strlen(xtags->next->key) + strlen(xtags->next->value)) + 7;
478     
479      // if the sql buffer is too small
480      if (hlen > sqllen) {
481        sqllen = hlen;
482        sql = realloc(sql, sqllen);
483      }
484       
485      // pack the tag with its value into the hstore
486      keyval2hstore(sql, xtags->next);
487      copy_to_table(table, sql);
488
489      // update the tag-pointer to point to the next tag
490      xtags = xtags->next;
491       
492      // if the tag has a follow up, add a comma to the end
493      if (xtags->next->key != NULL)
494        copy_to_table(table, ",");
495    }
496   
497    // finish the hstore column by placing a TAB into the data stream
498    copy_to_table(table, "\t");
499   
500    // the main hstore-column has now been written
501}
502
503// write an hstore column to the database
504static void write_hstore_columns(enum table_id table, struct keyval *tags)
505{
506    static char *sql;
507    static size_t sqllen=0;
508   
509    // sql buffer
510    if (sqllen==0) {
511      sqllen=2048;
512      sql=malloc(sqllen);
513    }
514   
515    // the index of the current hstore column
516    int i_hstore_column;
517   
518    // iterate over all configured hstore colums in the options
519    for(i_hstore_column = 0; i_hstore_column < Options->n_hstore_columns; i_hstore_column++)
520    {
521        // did this node have a tag that matched the current hstore column
522        int found = 0;
523       
524        // a clone of the tags pointer
525        struct keyval *xtags = tags;
526       
527        // while this tags has a follow-up..
528        while (xtags->next->key != NULL) {
529           
530            // check if the tags' key starts with the name of the hstore column
531            char *pos = strstr(xtags->next->key, Options->hstore_columns[i_hstore_column]);
532           
533            // and if it does..
534            if(pos == xtags->next->key)
535            {
536                // remember we found one
537                found=1;
538               
539                // generate the short key name
540                char *shortkey = xtags->next->key + strlen(Options->hstore_columns[i_hstore_column]);
541               
542                // calculate the size needed for this hstore entry
543                size_t hlen=2*(strlen(shortkey)+strlen(xtags->next->value))+7;
544               
545                // if the sql buffer is too small
546                if (hlen > sqllen) {
547                    // resize it
548                    sqllen=hlen;
549                    sql=realloc(sql,sqllen);
550                }
551               
552                // and pack the shortkey with its value into the hstore
553                keyval2hstore_manual(sql, shortkey, xtags->next->value);
554                copy_to_table(table, sql);
555               
556                // update the tag-pointer to point to the next tag
557                xtags=xtags->next;
558               
559                // if the tag has a follow up, add a comma to the end
560                if (xtags->next->key != NULL)
561                    copy_to_table(table, ",");
562            }
563            else
564            {
565                // update the tag-pointer to point to the next tag
566                xtags=xtags->next;
567            }
568        }
569       
570        // if no matching tag has been found, write a NULL
571        if(!found)
572            copy_to_table(table, "\\N");
573       
574        // finish the hstore column by placing a TAB into the data stream
575        copy_to_table(table, "\t");
576    }
577   
578    // all hstore-columns have now been written
579}
580
581
582/* example from: pg_dump -F p -t planet_osm gis
583COPY planet_osm (osm_id, name, place, landuse, leisure, "natural", man_made, waterway, highway, railway, amenity, tourism, learning, building, bridge, layer, way) FROM stdin;
58417959841        \N      \N      \N      \N      \N      \N      \N      bus_stop        \N      \N      \N      \N      \N      \N    -\N      0101000020E610000030CCA462B6C3D4BF92998C9B38E04940
58517401934        The Horn        \N      \N      \N      \N      \N      \N      \N      \N      pub     \N      \N      \N      \N    -\N      0101000020E6100000C12FC937140FD5BFB4D2F4FB0CE04940
586...
587
588mine - 01 01000000 48424298424242424242424256427364
589psql - 01 01000020 E6100000 30CCA462B6C3D4BF92998C9B38E04940
590       01 01000020 E6100000 48424298424242424242424256427364
5910x2000_0000 = hasSRID, following 4 bytes = srid, not supported by geos WKBWriter
592Workaround - output SRID=4326;<WKB>
593*/
594
595static int pgsql_out_node(osmid_t id, struct keyval *tags, double node_lat, double node_lon)
596{
597
598    static char *sql;
599    static size_t sqllen=0;
600    char *v;
601    int i;
602    struct keyval *tag;
603
604    if (sqllen==0) {
605      sqllen=2048;
606      sql=malloc(sqllen);
607    }
608
609    expire_tiles_from_bbox(node_lon, node_lat, node_lon, node_lat);
610    sprintf(sql, "%" PRIdOSMID "\t", id);
611    copy_to_table(t_point, sql);
612
613    for (i=0; i < exportListCount[OSMTYPE_NODE]; i++) {
614        if( exportList[OSMTYPE_NODE][i].flags & FLAG_DELETE )
615            continue;
616        if( (exportList[OSMTYPE_NODE][i].flags & FLAG_PHSTORE) == FLAG_PHSTORE)
617            continue;
618        if ((tag = getTag(tags, exportList[OSMTYPE_NODE][i].name)))
619        {
620            escape_type(sql, sqllen, tag->value, exportList[OSMTYPE_NODE][i].type);
621            exportList[OSMTYPE_NODE][i].count++;
622            if (HSTORE_NORM==Options->enable_hstore)
623              tag->has_column=1;
624        }
625        else
626            sprintf(sql, "\\N");
627
628        copy_to_table(t_point, sql);
629        copy_to_table(t_point, "\t");
630    }
631   
632    // hstore columns
633    write_hstore_columns(t_point, tags);
634   
635    // check if a regular hstore is requested
636    if (Options->enable_hstore)
637        write_hstore(t_point, tags);
638   
639    sprintf(sql, "SRID=%d;POINT(%.15g %.15g)", SRID, node_lon, node_lat);
640    copy_to_table(t_point, sql);
641    copy_to_table(t_point, "\n");
642
643    return 0;
644}
645
646
647
648static void write_wkts(osmid_t id, struct keyval *tags, const char *wkt, enum table_id table)
649{
650 
651    static char *sql;
652    static size_t sqllen=0;
653    char *v;
654    int j;
655    struct keyval *tag;
656
657    if (sqllen==0) {
658      sqllen=2048;
659      sql=malloc(sqllen);
660    }
661   
662    sprintf(sql, "%" PRIdOSMID "\t", id);
663    copy_to_table(table, sql);
664
665    for (j=0; j < exportListCount[OSMTYPE_WAY]; j++) {
666            if( exportList[OSMTYPE_WAY][j].flags & FLAG_DELETE )
667                continue;
668            if( (exportList[OSMTYPE_WAY][j].flags & FLAG_PHSTORE) == FLAG_PHSTORE)
669                continue;
670            if ((tag = getTag(tags, exportList[OSMTYPE_WAY][j].name)))
671            {
672                exportList[OSMTYPE_WAY][j].count++;
673                escape_type(sql, sqllen, tag->value, exportList[OSMTYPE_WAY][j].type);
674                if (HSTORE_NORM==Options->enable_hstore)
675                  tag->has_column=1;
676            }
677            else
678                sprintf(sql, "\\N");
679
680            copy_to_table(table, sql);
681            copy_to_table(table, "\t");
682    }
683   
684    // hstore columns
685    write_hstore_columns(table, tags);
686   
687    // check if a regular hstore is requested
688    if (Options->enable_hstore)
689        write_hstore(table, tags);
690   
691    sprintf(sql, "SRID=%d;", SRID);
692    copy_to_table(table, sql);
693    copy_to_table(table, wkt);
694    copy_to_table(table, "\n");
695}
696
697static int tag_indicates_polygon(enum OsmType type, const char *key)
698{
699    int i;
700
701    if (!strcmp(key, "area"))
702        return 1;
703
704    for (i=0; i < exportListCount[type]; i++) {
705        if( strcmp( exportList[type][i].name, key ) == 0 )
706            return exportList[type][i].flags & FLAG_POLYGON;
707    }
708
709    return 0;
710}
711
712/* Go through the given tags and determine the union of flags. Also remove
713 * any tags from the list that we don't know about */
714unsigned int pgsql_filter_tags(enum OsmType type, struct keyval *tags, int *polygon)
715{
716    int i, filter = 1;
717    int flags = 0;
718    int add_area_tag = 0;
719
720    const char *area;
721    struct keyval *item;
722    struct keyval temp;
723    initList(&temp);
724
725    /* We used to only go far enough to determine if it's a polygon or not, but now we go through and filter stuff we don't need */
726    while( (item = popItem(tags)) != NULL )
727    {
728        /* Allow named islands to appear as polygons */
729        if (!strcmp("natural",item->key) && !strcmp("coastline",item->value))
730        {               
731            add_area_tag = 1; 
732        }
733
734        /* Discard natural=coastline tags (we render these from a shapefile instead) */
735        if (!Options->keep_coastlines && !strcmp("natural",item->key) && !strcmp("coastline",item->value))
736        {               
737            freeItem( item );
738            item = NULL;
739            continue;
740        }
741
742        for (i=0; i < exportListCount[type]; i++)
743        {
744            if( strcmp( exportList[type][i].name, item->key ) == 0 )
745            {
746                if( exportList[type][i].flags & FLAG_DELETE )
747                {
748                    freeItem( item );
749                    item = NULL;
750                    break;
751                }
752
753                filter = 0;
754                flags |= exportList[type][i].flags;
755
756                pushItem( &temp, item );
757                item = NULL;
758                break;
759            }
760        }
761        if( i == exportListCount[type] )
762        {
763          if (Options->enable_hstore) {
764            pushItem( &temp, item );
765            filter=0;
766          } else {
767            freeItem( item );
768          }
769          item = NULL;
770        }
771    }
772
773    /* Move from temp list back to original list */
774    while( (item = popItem(&temp)) != NULL )
775        pushItem( tags, item );
776
777    *polygon = flags & FLAG_POLYGON;
778
779    /* Special case allowing area= to override anything else */
780    if ((area = getItem(tags, "area"))) {
781        if (!strcmp(area, "yes") || !strcmp(area, "true") ||!strcmp(area, "1"))
782            *polygon = 1;
783        else if (!strcmp(area, "no") || !strcmp(area, "false") || !strcmp(area, "0"))
784            *polygon = 0;
785    } else {
786        /* If we need to force this as a polygon, append an area tag */
787        if (add_area_tag) {
788            addItem(tags, "area", "yes", 0);
789            *polygon = 1;
790        }
791    }
792
793    return filter;
794}
795
796/*
797COPY planet_osm (osm_id, name, place, landuse, leisure, "natural", man_made, waterway, highway, railway, amenity, tourism, learning, bu
798ilding, bridge, layer, way) FROM stdin;
799198497  Bedford Road    \N      \N      \N      \N      \N      \N      residential     \N      \N      \N      \N      \N      \N    \N       0102000020E610000004000000452BF702B342D5BF1C60E63BF8DF49406B9C4D470037D5BF5471E316F3DF4940DFA815A6EF35D5BF9AE95E27F5DF4940B41EB
800E4C1421D5BF24D06053E7DF4940
801212696  Oswald Road     \N      \N      \N      \N      \N      \N      minor   \N      \N      \N      \N      \N      \N      \N    0102000020E610000004000000467D923B6C22D5BFA359D93EE4DF4940B3976DA7AD11D5BF84BBB376DBDF4940997FF44D9A06D5BF4223D8B8FEDF49404D158C4AEA04D
8025BF5BB39597FCDF4940
803*/
804static int pgsql_out_way(osmid_t id, struct keyval *tags, struct osmNode *nodes, int count, int exists)
805{
806    int polygon = 0, roads = 0;
807    int i, wkt_size;
808    double split_at;
809
810    /* If the flag says this object may exist already, delete it first */
811    if(exists) {
812        pgsql_delete_way_from_output(id);
813        Options->mid->way_changed(id);
814    }
815
816    if (pgsql_filter_tags(OSMTYPE_WAY, tags, &polygon) || add_z_order(tags, &roads))
817        return 0;
818
819    // Split long ways after around 1 degree or 100km
820    if (Options->projection == PROJ_LATLONG)
821        split_at = 1;
822    else
823        split_at = 100 * 1000;
824
825    wkt_size = get_wkt_split(nodes, count, polygon, split_at);
826
827    for (i=0;i<wkt_size;i++)
828    {
829        char *wkt = get_wkt(i);
830
831        if (wkt && strlen(wkt)) {
832            /* FIXME: there should be a better way to detect polygons */
833            if (!strncmp(wkt, "POLYGON", strlen("POLYGON")) || !strncmp(wkt, "MULTIPOLYGON", strlen("MULTIPOLYGON"))) {
834                expire_tiles_from_nodes_poly(nodes, count, id);
835                double area = get_area(i);
836                if (area > 0.0) {
837                    char tmp[32];
838                    snprintf(tmp, sizeof(tmp), "%f", area);
839                    addItem(tags, "way_area", tmp, 0);
840                }
841                write_wkts(id, tags, wkt, t_poly);
842            } else {
843                expire_tiles_from_nodes_line(nodes, count);
844                write_wkts(id, tags, wkt, t_line);
845                if (roads)
846                    write_wkts(id, tags, wkt, t_roads);
847            }
848        }
849        free(wkt);
850    }
851    clear_wkts();
852       
853    return 0;
854}
855
856static int pgsql_out_relation(osmid_t id, struct keyval *rel_tags, struct osmNode **xnodes, struct keyval *xtags, int *xcount, osmid_t *xid, const char **xrole)
857{
858    int i, wkt_size;
859    int polygon = 0, roads = 0;
860    int make_polygon = 0;
861    int make_boundary = 0;
862    struct keyval tags, *p, poly_tags;
863    char *type;
864    double split_at;
865
866#if 0
867    fprintf(stderr, "Got relation with counts:");
868    for (i=0; xcount[i]; i++)
869        fprintf(stderr, " %d", xcount[i]);
870    fprintf(stderr, "\n");
871#endif
872    /* Get the type, if there's no type we don't care */
873    type = getItem(rel_tags, "type");
874    if( !type )
875        return 0;
876
877    initList(&tags);
878    initList(&poly_tags);
879
880    /* Clone tags from relation */
881    p = rel_tags->next;
882    while (p != rel_tags) {
883        // For routes, we convert name to route_name
884        if ((strcmp(type, "route") == 0) && (strcmp(p->key, "name") ==0))
885            addItem(&tags, "route_name", p->value, 1);
886        else if (strcmp(p->key, "type")) // drop type=
887            addItem(&tags, p->key, p->value, 1);
888        p = p->next;
889    }
890
891    if( strcmp(type, "route") == 0 )
892    {
893        const char *state = getItem(rel_tags, "state");
894        const char *netw = getItem(rel_tags, "network");
895        int networknr = -1;
896
897        if (state == NULL) {
898            state = "";
899        }
900
901        if (netw != NULL) {
902            if (strcmp(netw, "lcn") == 0) {
903                networknr = 10;
904                if (strcmp(state, "alternate") == 0) {
905                    addItem(&tags, "lcn", "alternate", 1);
906                } else if (strcmp(state, "connection") == 0) {
907                    addItem(&tags, "lcn", "connection", 1);
908                } else {
909                    addItem(&tags, "lcn", "yes", 1);
910                }
911            } else if (strcmp(netw, "rcn") == 0) {
912                networknr = 11;
913                if (strcmp(state, "alternate") == 0) {
914                    addItem(&tags, "rcn", "alternate", 1);
915                } else if (strcmp(state, "connection") == 0) {
916                    addItem(&tags, "rcn", "connection", 1);
917                } else {
918                    addItem(&tags, "rcn", "yes", 1);
919                }
920            } else if (strcmp(netw, "ncn") == 0) {
921                networknr = 12;
922                if (strcmp(state, "alternate") == 0) {
923                    addItem(&tags, "ncn", "alternate", 1);
924                } else if (strcmp(state, "connection") == 0) {
925                    addItem(&tags, "ncn", "connection", 1);
926                } else {
927                    addItem(&tags, "ncn", "yes", 1);
928                }
929
930
931            } else if (strcmp(netw, "lwn") == 0) {
932                networknr = 20;
933                if (strcmp(state, "alternate") == 0) {
934                    addItem(&tags, "lwn", "alternate", 1);
935                } else if (strcmp(state, "connection") == 0) {
936                    addItem(&tags, "lwn", "connection", 1);
937                } else {
938                    addItem(&tags, "lwn", "yes", 1);
939                }
940            } else if (strcmp(netw, "rwn") == 0) {
941                networknr = 21;
942                if (strcmp(state, "alternate") == 0) {
943                    addItem(&tags, "rwn", "alternate", 1);
944                } else if (strcmp(state, "connection") == 0) {
945                    addItem(&tags, "rwn", "connection", 1);
946                } else {
947                    addItem(&tags, "rwn", "yes", 1);
948                }
949            } else if (strcmp(netw, "nwn") == 0) {
950                networknr = 22;
951                if (strcmp(state, "alternate") == 0) {
952                    addItem(&tags, "nwn", "alternate", 1);
953                } else if (strcmp(state, "connection") == 0) {
954                    addItem(&tags, "nwn", "connection", 1);
955                } else {
956                    addItem(&tags, "nwn", "yes", 1);
957                }
958            }
959        }
960
961        if (getItem(rel_tags, "preferred_color") != NULL) {
962            const char *a = getItem(rel_tags, "preferred_color");
963            if (strcmp(a, "0") == 0 || strcmp(a, "1") == 0 || strcmp(a, "2") == 0 || strcmp(a, "3") == 0 || strcmp(a, "4") == 0) {
964                addItem(&tags, "route_pref_color", a, 1);
965            } else {
966                addItem(&tags, "route_pref_color", "0", 1);
967            }
968        } else {
969            addItem(&tags, "route_pref_color", "0", 1);
970        }
971
972        if (getItem(rel_tags, "ref") != NULL) {
973            if (networknr == 10) {
974                addItem(&tags, "lcn_ref", getItem(rel_tags, "ref"), 1);
975            } else if (networknr == 11) {
976                addItem(&tags, "rcn_ref", getItem(rel_tags, "ref"), 1);
977            } else if (networknr == 12) {
978                addItem(&tags, "ncn_ref", getItem(rel_tags, "ref"), 1);
979            } else if (networknr == 20) {
980                addItem(&tags, "lwn_ref", getItem(rel_tags, "ref"), 1);
981            } else if (networknr == 21) {
982                addItem(&tags, "rwn_ref", getItem(rel_tags, "ref"), 1);
983            } else if (networknr == 22) {
984                addItem(&tags, "nwn_ref", getItem(rel_tags, "ref"), 1);
985            }
986        }
987    }
988    else if( strcmp( type, "boundary" ) == 0 )
989    {
990        // Boundaries will get converted into multiple geometries:
991        // - Linear features will end up in the line and roads tables (useful for admin boundaries)
992        // - Polygon features also go into the polygon table (useful for national_forests)
993        // The edges of the polygon also get treated as linear fetaures allowing these to be rendered seperately.
994        make_boundary = 1;
995    }
996    else if( strcmp( type, "multipolygon" ) == 0 && getItem(&tags, "boundary") )
997    {
998        // Treat type=multipolygon exactly like type=boundary if it has a boundary tag.
999        make_boundary = 1;
1000    }
1001    else if( strcmp( type, "multipolygon" ) == 0 )
1002    {
1003        make_polygon = 1;
1004
1005        /* Copy the tags from the outer way(s) if the relation is untagged */
1006        /* or if there is just a name tag, people seem to like naming relations */
1007        if (!listHasData(&tags) || ((countList(&tags)==1) && getItem(&tags, "name"))) {
1008            for (i=0; xcount[i]; i++) {
1009                if (xrole[i] && !strcmp(xrole[i], "inner"))
1010                    continue;
1011
1012                p = xtags[i].next;
1013                while (p != &(xtags[i])) {
1014                    addItem(&tags, p->key, p->value, 1);
1015                    p = p->next;
1016                }
1017            }
1018        }
1019
1020        // Collect a list of polygon-like tags, these are used later to
1021        // identify if an inner rings looks like it should be rendered seperately
1022        p = tags.next;
1023        while (p != &tags) {
1024            if (tag_indicates_polygon(OSMTYPE_WAY, p->key)) {
1025                addItem(&poly_tags, p->key, p->value, 1);
1026                //fprintf(stderr, "found a polygon tag: %s=%s\n", p->key, p->value);
1027            }
1028            p = p->next;
1029        }
1030    }
1031    else
1032    {
1033        /* Unknown type, just exit */
1034        resetList(&tags);
1035        resetList(&poly_tags);
1036        return 0;
1037    }
1038
1039    if (pgsql_filter_tags(OSMTYPE_WAY, &tags, &polygon) || add_z_order(&tags, &roads)) {
1040        resetList(&tags);
1041        resetList(&poly_tags);
1042        return 0;
1043    }
1044
1045    // Split long linear ways after around 1 degree or 100km (polygons not effected)
1046    if (Options->projection == PROJ_LATLONG)
1047        split_at = 1;
1048    else
1049        split_at = 100 * 1000;
1050
1051    wkt_size = build_geometry(id, xnodes, xcount, make_polygon, Options->enable_multi, split_at);
1052
1053    if (!wkt_size) {
1054        resetList(&tags);
1055        resetList(&poly_tags);
1056        return 0;
1057    }
1058
1059    for (i=0;i<wkt_size;i++)
1060    {
1061        char *wkt = get_wkt(i);
1062
1063        if (wkt && strlen(wkt)) {
1064            expire_tiles_from_wkt(wkt, -id);
1065            /* FIXME: there should be a better way to detect polygons */
1066            if (!strncmp(wkt, "POLYGON", strlen("POLYGON")) || !strncmp(wkt, "MULTIPOLYGON", strlen("MULTIPOLYGON"))) {
1067                double area = get_area(i);
1068                if (area > 0.0) {
1069                    char tmp[32];
1070                    snprintf(tmp, sizeof(tmp), "%f", area);
1071                    addItem(&tags, "way_area", tmp, 0);
1072                }
1073                write_wkts(-id, &tags, wkt, t_poly);
1074            } else {
1075                write_wkts(-id, &tags, wkt, t_line);
1076                if (roads)
1077                    write_wkts(-id, &tags, wkt, t_roads);
1078            }
1079        }
1080        free(wkt);
1081    }
1082
1083    clear_wkts();
1084
1085    // If we are creating a multipolygon then we
1086    // mark each member so that we can skip them during iterate_ways
1087    // but only if the polygon-tags look the same as the outer ring
1088    if (make_polygon) {
1089        for (i=0; xcount[i]; i++) {
1090            int match = 0;
1091            struct keyval *p = poly_tags.next;
1092            while (p != &poly_tags) {
1093                const char *v = getItem(&xtags[i], p->key);
1094                //fprintf(stderr, "compare polygon tag: %s=%s vs %s\n", p->key, p->value, v ? v : "null");
1095                if (!v || strcmp(v, p->value)) {
1096                    match = 0;
1097                    break;
1098                }
1099                match = 1;
1100                p = p->next;
1101            }
1102            if (match) {
1103                //fprintf(stderr, "match for %d\n", xid[i]);
1104                Options->mid->ways_done(xid[i]);
1105                pgsql_delete_way_from_output(xid[i]);
1106            }
1107        }
1108    }
1109
1110    // If we are making a boundary then also try adding any relations which form complete rings
1111    // The linear variants will have already been processed above
1112    if (make_boundary) {
1113        wkt_size = build_geometry(id, xnodes, xcount, 1, Options->enable_multi, split_at);
1114        for (i=0;i<wkt_size;i++)
1115        {
1116            char *wkt = get_wkt(i);
1117
1118            if (strlen(wkt)) {
1119                expire_tiles_from_wkt(wkt, -id);
1120                /* FIXME: there should be a better way to detect polygons */
1121                if (!strncmp(wkt, "POLYGON", strlen("POLYGON")) || !strncmp(wkt, "MULTIPOLYGON", strlen("MULTIPOLYGON"))) {
1122                    double area = get_area(i);
1123                    if (area > 0.0) {
1124                        char tmp[32];
1125                        snprintf(tmp, sizeof(tmp), "%f", area);
1126                        addItem(&tags, "way_area", tmp, 0);
1127                    }
1128                    write_wkts(-id, &tags, wkt, t_poly);
1129                }
1130            }
1131            free(wkt);
1132        }
1133        clear_wkts();
1134    }
1135
1136    resetList(&tags);
1137    resetList(&poly_tags);
1138    return 0;
1139}
1140
1141static int pgsql_out_connect(const struct output_options *options, int startTransaction) {
1142    int i;
1143    for (i=0; i<NUM_TABLES; i++) {
1144        PGconn *sql_conn;
1145        sql_conn = PQconnectdb(options->conninfo);
1146       
1147        /* Check to see that the backend connection was successfully made */
1148        if (PQstatus(sql_conn) != CONNECTION_OK) {
1149            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
1150            exit_nicely();
1151        }
1152        tables[i].sql_conn = sql_conn;
1153        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");
1154        if (startTransaction)
1155            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
1156    }
1157    return 0;
1158}
1159
1160static int pgsql_out_start(const struct output_options *options)
1161{
1162    char *sql, tmp[256];
1163    PGresult   *res;
1164    int i,j;
1165    unsigned int sql_len;
1166
1167    Options = options;
1168
1169    read_style_file( options->style );
1170
1171    sql_len = 2048;
1172    sql = malloc(sql_len);
1173    assert(sql);
1174
1175    for (i=0; i<NUM_TABLES; i++) {
1176        PGconn *sql_conn;
1177
1178        /* Substitute prefix into name of table */
1179        {
1180            char *temp = malloc( strlen(options->prefix) + strlen(tables[i].name) + 1 );
1181            sprintf( temp, tables[i].name, options->prefix );
1182            tables[i].name = temp;
1183        }
1184        fprintf(stderr, "Setting up table: %s\n", tables[i].name);
1185        sql_conn = PQconnectdb(options->conninfo);
1186
1187        /* Check to see that the backend connection was successfully made */
1188        if (PQstatus(sql_conn) != CONNECTION_OK) {
1189            fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
1190            exit_nicely();
1191        }
1192        tables[i].sql_conn = sql_conn;
1193        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");
1194
1195        if (!options->append) {
1196            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s", tables[i].name);
1197        }
1198        else
1199        {
1200            sprintf(sql, "SELECT srid FROM geometry_columns WHERE f_table_name='%s';", tables[i].name);
1201            res = PQexec(sql_conn, sql);
1202            if (!((PQntuples(res) == 1) && (PQnfields(res) == 1)))
1203            {
1204                fprintf(stderr, "Problem reading geometry information for table %s - does it exist?\n", tables[i].name);
1205                exit_nicely();
1206            }
1207            int their_srid = atoi(PQgetvalue(res, 0, 0));
1208            PQclear(res);
1209            if (their_srid != SRID)
1210            {
1211                fprintf(stderr, "SRID mismatch: cannot append to table %s (SRID %d) using selected SRID %d\n", tables[i].name, their_srid, SRID);
1212                exit_nicely();
1213            }
1214        }
1215
1216        /* These _tmp tables can be left behind if we run out of disk space */
1217        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s_tmp", tables[i].name);
1218
1219        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
1220
1221        enum OsmType type = (i == t_point)?OSMTYPE_NODE:OSMTYPE_WAY;
1222        int numTags = exportListCount[type];
1223        struct taginfo *exportTags = exportList[type];
1224        if (!options->append) {
1225            sprintf(sql, "CREATE TABLE %s ( osm_id " POSTGRES_OSMID_TYPE, tables[i].name );
1226            for (j=0; j < numTags; j++) {
1227                if( exportTags[j].flags & FLAG_DELETE )
1228                    continue;
1229                if( (exportTags[j].flags & FLAG_PHSTORE ) == FLAG_PHSTORE)
1230                    continue;
1231                sprintf(tmp, ",\"%s\" %s", exportTags[j].name, exportTags[j].type);
1232                if (strlen(sql) + strlen(tmp) + 1 > sql_len) {
1233                    sql_len *= 2;
1234                    sql = realloc(sql, sql_len);
1235                    assert(sql);
1236                }
1237                strcat(sql, tmp);
1238            }
1239            int i_hstore_column;
1240            for(i_hstore_column = 0; i_hstore_column < Options->n_hstore_columns; i_hstore_column++)
1241            {
1242                strcat(sql, ",\"");
1243                strcat(sql, Options->hstore_columns[i_hstore_column]);
1244                strcat(sql, "\" hstore ");
1245            }
1246            if (Options->enable_hstore) {
1247                strcat(sql, ",tags hstore");
1248            } 
1249            strcat(sql, ")");
1250            if (Options->tblsmain_data) {
1251                sprintf(sql + strlen(sql), " TABLESPACE %s", Options->tblsmain_data);
1252            }
1253            strcat(sql, "\n");
1254
1255            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", sql);
1256            pgsql_exec(sql_conn, PGRES_TUPLES_OK, "SELECT AddGeometryColumn('%s', 'way', %d, '%s', 2 );\n",
1257                        tables[i].name, SRID, tables[i].type );
1258            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ALTER TABLE %s ALTER COLUMN way SET NOT NULL;\n", tables[i].name);
1259            /* slim mode needs this to be able to apply diffs */
1260            if (Options->slim && !Options->droptemp) {
1261                sprintf(sql, "CREATE INDEX %s_pkey ON %s USING BTREE (osm_id)",  tables[i].name, tables[i].name);
1262                if (Options->tblsmain_index) {
1263                    sprintf(sql + strlen(sql), " TABLESPACE %s\n", Options->tblsmain_index);
1264                }
1265                    pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", sql);
1266            }
1267        } else {
1268            /* Add any new columns referenced in the default.style */
1269            PGresult *res;
1270            sprintf(sql, "SELECT * FROM %s LIMIT 0;\n", tables[i].name);
1271            res = PQexec(sql_conn, sql);
1272            if (PQresultStatus(res) != PGRES_TUPLES_OK) {
1273                fprintf(stderr, "Error, failed to query table %s\n%s\n", tables[i].name, sql);
1274                exit_nicely();
1275            }
1276            for (j=0; j < numTags; j++) {
1277                if( exportTags[j].flags & FLAG_DELETE )
1278                    continue;
1279                if( (exportTags[j].flags & FLAG_PHSTORE) == FLAG_PHSTORE)
1280                    continue;
1281                sprintf(tmp, "\"%s\"", exportTags[j].name);
1282                if (PQfnumber(res, tmp) < 0) {
1283#if 0
1284                    fprintf(stderr, "Append failed. Column \"%s\" is missing from \"%s\"\n", exportTags[j].name, tables[i].name);
1285                    exit_nicely();
1286#else
1287                    fprintf(stderr, "Adding new column \"%s\" to \"%s\"\n", exportTags[j].name, tables[i].name);
1288                    pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ALTER TABLE %s ADD COLUMN \"%s\" %s;\n", tables[i].name, exportTags[j].name, exportTags[j].type);
1289#endif
1290                }
1291                /* Note: we do not verify the type or delete unused columns */
1292            }
1293
1294            PQclear(res);
1295
1296            /* change the type of the geometry column if needed - this can only change to a more permisive type */
1297            //            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "UPDATE geometry_columns SET type = '%s' where type != '%s' and f_table_name = '%s' and f_geometry_column = 'way'",
1298            //           tables[i].type, tables[i].type, tables[i].name);
1299        }
1300        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "PREPARE get_way (" POSTGRES_OSMID_TYPE ") AS SELECT ST_AsText(way) FROM %s WHERE osm_id = $1;\n", tables[i].name);
1301       
1302        /* Generate column list for COPY */
1303        strcpy(sql, "osm_id");
1304        for (j=0; j < numTags; j++) {
1305            if( exportTags[j].flags & FLAG_DELETE )
1306                continue;
1307            if( (exportTags[j].flags & FLAG_PHSTORE ) == FLAG_PHSTORE)
1308                    continue;
1309            sprintf(tmp, ",\"%s\"", exportTags[j].name);
1310
1311            if (strlen(sql) + strlen(tmp) + 1 > sql_len) {
1312                sql_len *= 2;
1313                sql = realloc(sql, sql_len);
1314                assert(sql);
1315            }
1316            strcat(sql, tmp);
1317        }
1318
1319        int i_hstore_column;
1320        for(i_hstore_column = 0; i_hstore_column < Options->n_hstore_columns; i_hstore_column++)
1321        {
1322            strcat(sql, ",\"");
1323            strcat(sql, Options->hstore_columns[i_hstore_column]);
1324            strcat(sql, "\" ");
1325        }
1326   
1327        if (Options->enable_hstore) strcat(sql,",tags");
1328
1329        tables[i].columns = strdup(sql);
1330        pgsql_exec(sql_conn, PGRES_COPY_IN, "COPY %s (%s,way) FROM STDIN", tables[i].name, tables[i].columns);
1331
1332        tables[i].copyMode = 1;
1333    }
1334    free(sql);
1335
1336    expire_tiles_init(options);
1337
1338    options->mid->start(options);
1339
1340    return 0;
1341}
1342
1343static void pgsql_pause_copy(struct s_table *table)
1344{
1345    PGresult   *res;
1346    if( !table->copyMode )
1347        return;
1348       
1349    /* Terminate any pending COPY */
1350    int stop = PQputCopyEnd(table->sql_conn, NULL);
1351    if (stop != 1) {
1352       fprintf(stderr, "COPY_END for %s failed: %s\n", table->name, PQerrorMessage(table->sql_conn));
1353       exit_nicely();
1354    }
1355
1356    res = PQgetResult(table->sql_conn);
1357    if (PQresultStatus(res) != PGRES_COMMAND_OK) {
1358       fprintf(stderr, "COPY_END for %s failed: %s\n", table->name, PQerrorMessage(table->sql_conn));
1359       PQclear(res);
1360       exit_nicely();
1361    }
1362    PQclear(res);
1363    table->copyMode = 0;
1364}
1365
1366static void pgsql_out_close(int stopTransaction) {
1367    int i;
1368    for (i=0; i<NUM_TABLES; i++) {
1369        pgsql_pause_copy(&tables[i]);
1370        // Commit transaction
1371        if (stopTransaction)
1372            pgsql_exec(tables[i].sql_conn, PGRES_COMMAND_OK, "COMMIT");
1373        PQfinish(tables[i].sql_conn);
1374        tables[i].sql_conn = NULL;
1375    }
1376}
1377
1378static void pgsql_out_commit(void) {
1379    int i;
1380    for (i=0; i<NUM_TABLES; i++) {
1381        pgsql_pause_copy(&tables[i]);
1382        // Commit transaction
1383        fprintf(stderr, "Committing transaction for %s\n", tables[i].name);
1384        pgsql_exec(tables[i].sql_conn, PGRES_COMMAND_OK, "COMMIT");
1385    }
1386}
1387
1388static void *pgsql_out_stop_one(void *arg)
1389{
1390    struct s_table *table = arg;
1391    PGconn *sql_conn = table->sql_conn;
1392
1393    if( table->buflen != 0 )
1394    {
1395       fprintf( stderr, "Internal error: Buffer for %s has %d bytes after end copy", table->name, table->buflen );
1396       exit_nicely();
1397    }
1398
1399    pgsql_pause_copy(table);
1400    // Commit transaction
1401    //fprintf(stderr, "Committing transaction for %s\n", table->name);
1402    //pgsql_exec(sql_conn, PGRES_COMMAND_OK, "COMMIT");
1403    if (!Options->append)
1404    {
1405        time_t start, end;
1406        time(&start);
1407        fprintf(stderr, "Sorting data and creating indexes for %s\n", table->name);
1408        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ANALYZE %s;\n", table->name);
1409        if (Options->tblsmain_data) {
1410            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE TABLE %s_tmp "
1411                        "TABLESPACE %s AS SELECT * FROM %s ORDER BY way;\n",
1412                        table->name, Options->tblsmain_data, table->name);
1413        } else {
1414            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE TABLE %s_tmp AS SELECT * FROM %s ORDER BY way;\n", table->name, table->name);
1415        }
1416        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE %s;\n", table->name);
1417        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ALTER TABLE %s_tmp RENAME TO %s;\n", table->name, table->name);
1418        if (Options->tblsmain_index) {
1419            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_index ON %s USING GIST (way) TABLESPACE %s;\n", table->name, table->name, Options->tblsmain_index);
1420        } else {
1421            pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_index ON %s USING GIST (way);\n", table->name, table->name);
1422        }
1423
1424        /* slim mode needs this to be able to apply diffs */
1425        if (Options->slim && !Options->droptemp)
1426        {
1427            if (Options->tblsmain_index) {
1428                pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_pkey ON %s USING BTREE (osm_id) TABLESPACE %s;\n", table->name, table->name, Options->tblsmain_index);
1429            } else {
1430                pgsql_exec(sql_conn, PGRES_COMMAND_OK, "CREATE INDEX %s_pkey ON %s USING BTREE (osm_id);\n", table->name, table->name);
1431            }
1432        }
1433        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "GRANT SELECT ON %s TO PUBLIC;\n", table->name);
1434        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "ANALYZE %s;\n", table->name);
1435        time(&end);
1436        fprintf(stderr, "Indexes on  %s created  in %ds\n", table->name, (int)(end - start));
1437    }
1438    PQfinish(sql_conn);
1439    table->sql_conn = NULL;
1440
1441    fprintf(stderr, "Completed %s\n", table->name);
1442    free(table->name);
1443    free(table->columns);
1444    return NULL;
1445}
1446
1447static void pgsql_out_stop()
1448{
1449    int i;
1450#ifdef HAVE_PTHREAD
1451    pthread_t threads[NUM_TABLES];
1452#endif
1453
1454    /* Commit the transactions, so that multiple processes can
1455     * access the data simultanious to process the rest in parallel
1456     * as well as see the newly created tables.
1457     */
1458    pgsql_out_commit();
1459    Options->mid->commit();
1460    /* To prevent deadlocks in parallel processing, the mid tables need
1461     * to stay out of a transaction. In this stage output tables are only
1462     * written to and not read, so they can be processed as several parallel
1463     * independent transactions
1464     */
1465    for (i=0; i<NUM_TABLES; i++) {
1466        PGconn *sql_conn = tables[i].sql_conn;
1467        pgsql_exec(sql_conn, PGRES_COMMAND_OK, "BEGIN");
1468    }
1469    /* Processing any remaing to be processed ways */
1470    Options->mid->iterate_ways( pgsql_out_way );
1471    pgsql_out_commit();
1472    Options->mid->commit();
1473
1474    /* Processing any remaing to be processed relations */
1475    /* During this stage output tables also need to stay out of
1476     * extended transactions, as the delete_way_from_output, called
1477     * from process_relation, can deadlock if using multi-processing.
1478     */   
1479    Options->mid->iterate_relations( pgsql_process_relation );
1480
1481#ifdef HAVE_PTHREAD
1482    if (Options->parallel_indexing) {
1483      for (i=0; i<NUM_TABLES; i++) {
1484          int ret = pthread_create(&threads[i], NULL, pgsql_out_stop_one, &tables[i]);
1485          if (ret) {
1486              fprintf(stderr, "pthread_create() returned an error (%d)", ret);
1487              exit_nicely();
1488          }
1489      }
1490 
1491      /* No longer need to access middle layer -- release memory */
1492      Options->mid->stop();
1493 
1494      for (i=0; i<NUM_TABLES; i++) {
1495          int ret = pthread_join(threads[i], NULL);
1496          if (ret) {
1497              fprintf(stderr, "pthread_join() returned an error (%d)", ret);
1498              exit_nicely();
1499          }
1500      }
1501    } else {
1502#endif
1503
1504    /* No longer need to access middle layer -- release memory */
1505    Options->mid->stop();
1506    for (i=0; i<NUM_TABLES; i++)
1507        pgsql_out_stop_one(&tables[i]);
1508
1509#ifdef HAVE_PTHREAD
1510    }
1511#endif
1512
1513
1514    pgsql_out_cleanup();
1515    free_style();
1516
1517    expire_tiles_stop();
1518}
1519
1520static int pgsql_add_node(osmid_t id, double lat, double lon, struct keyval *tags)
1521{
1522  int polygon;
1523  int filter = pgsql_filter_tags(OSMTYPE_NODE, tags, &polygon);
1524 
1525  Options->mid->nodes_set(id, lat, lon, tags);
1526  if( !filter )
1527      pgsql_out_node(id, tags, lat, lon);
1528  return 0;
1529}
1530
1531static int pgsql_add_way(osmid_t id, osmid_t *nds, int nd_count, struct keyval *tags)
1532{
1533  int polygon = 0;
1534
1535  // Check whether the way is: (1) Exportable, (2) Maybe a polygon
1536  int filter = pgsql_filter_tags(OSMTYPE_WAY, tags, &polygon);
1537
1538  // If this isn't a polygon then it can not be part of a multipolygon
1539  // Hence only polygons are "pending"
1540  Options->mid->ways_set(id, nds, nd_count, tags, (!filter && polygon) ? 1 : 0);
1541
1542  if( !polygon && !filter )
1543  {
1544    /* Get actual node data and generate output */
1545    struct osmNode *nodes = malloc( sizeof(struct osmNode) * nd_count );
1546    int count = Options->mid->nodes_get_list( nodes, nds, nd_count );
1547    pgsql_out_way(id, tags, nodes, count, 0);
1548    free(nodes);
1549  }
1550  return 0;
1551}
1552
1553/* This is the workhorse of pgsql_add_relation, split out because it is used as the callback for iterate relations */
1554static int pgsql_process_relation(osmid_t id, struct member *members, int member_count, struct keyval *tags, int exists)
1555{
1556  // (osmid_t id, struct keyval *rel_tags, struct osmNode **xnodes, struct keyval **xtags, int *xcount)
1557  int i, count;
1558  osmid_t *xid = malloc( (member_count+1) * sizeof(osmid_t) );
1559  const char **xrole = malloc( (member_count+1) * sizeof(const char *) );
1560  int *xcount = malloc( (member_count+1) * sizeof(int) );
1561  struct keyval *xtags  = malloc( (member_count+1) * sizeof(struct keyval) );
1562  struct osmNode **xnodes = malloc( (member_count+1) * sizeof(struct osmNode*) );
1563
1564  /* If the flag says this object may exist already, delete it first */
1565  if(exists)
1566      pgsql_delete_relation_from_output(id);
1567
1568  count = 0;
1569  for( i=0; i<member_count; i++ )
1570  {
1571    /* Need to handle more than just ways... */
1572    if( members[i].type != OSMTYPE_WAY )
1573        continue;
1574
1575    initList(&(xtags[count]));
1576    if( Options->mid->ways_get( members[i].id, &(xtags[count]), &(xnodes[count]), &(xcount[count]) ) )
1577      continue;
1578    xid[count] = members[i].id;
1579    xrole[count] = members[i].role;
1580    count++;
1581  }
1582  xnodes[count] = NULL;
1583  xcount[count] = 0;
1584  xid[count] = 0;
1585  xrole[count] = NULL;
1586  // At some point we might want to consider storing the retreived data in the members, rather than as seperate arrays
1587  pgsql_out_relation(id, tags, xnodes, xtags, xcount, xid, xrole);
1588
1589  for( i=0; i<count; i++ )
1590  {
1591    resetList( &(xtags[i]) );
1592    free( xnodes[i] );
1593  }
1594
1595  free(xid);
1596  free(xrole);
1597  free(xcount);
1598  free(xtags);
1599  free(xnodes);
1600  return 0;
1601}
1602
1603static int pgsql_add_relation(osmid_t id, struct member *members, int member_count, struct keyval *tags)
1604{
1605  const char *type = getItem(tags, "type");
1606
1607  // Must have a type field or we ignore it
1608  if (!type)
1609      return 0;
1610
1611  /* In slim mode we remember these*/
1612  if(Options->mid->relations_set)
1613    Options->mid->relations_set(id, members, member_count, tags);
1614  // (osmid_t id, struct keyval *rel_tags, struct osmNode **xnodes, struct keyval **xtags, int *xcount)
1615
1616  return pgsql_process_relation(id, members, member_count, tags, 0);
1617}
1618#define UNUSED  __attribute__ ((unused))
1619
1620/* Delete is easy, just remove all traces of this object. We don't need to
1621 * worry about finding objects that depend on it, since the same diff must
1622 * contain the change for that also. */
1623static int pgsql_delete_node(osmid_t osm_id)
1624{
1625    if( !Options->slim )
1626    {
1627        fprintf( stderr, "Cannot apply diffs unless in slim mode\n" );
1628        exit_nicely();
1629    }
1630    pgsql_pause_copy(&tables[t_point]);
1631    expire_tiles_from_db(tables[t_point].sql_conn, osm_id);
1632    pgsql_exec(tables[t_point].sql_conn, PGRES_COMMAND_OK, "DELETE FROM %s WHERE osm_id = %" PRIdOSMID, tables[t_point].name, osm_id );
1633    Options->mid->nodes_delete(osm_id);
1634    return 0;
1635}
1636
1637/* Seperated out because we use it elsewhere */
1638static int pgsql_delete_way_from_output(osmid_t osm_id)
1639{
1640    /* Optimisation: we only need this is slim mode */
1641    if( !Options->slim )
1642        return 0;
1643    /* in droptemp mode we don't have indices and this takes ages. */
1644    if (Options->droptemp)
1645        return 0;
1646    pgsql_pause_copy(&tables[t_roads]);
1647    pgsql_pause_copy(&tables[t_line]);
1648    pgsql_pause_copy(&tables[t_poly]);
1649    expire_tiles_from_db(tables[t_roads].sql_conn, osm_id);
1650    expire_tiles_from_db(tables[t_line].sql_conn, osm_id);
1651    expire_tiles_from_db(tables[t_poly].sql_conn, osm_id);
1652    pgsql_exec(tables[t_roads].sql_conn, PGRES_COMMAND_OK, "DELETE FROM %s WHERE osm_id = %" PRIdOSMID, tables[t_roads].name, osm_id );
1653    pgsql_exec(tables[t_line].sql_conn, PGRES_COMMAND_OK, "DELETE FROM %s WHERE osm_id = %" PRIdOSMID, tables[t_line].name, osm_id );
1654    pgsql_exec(tables[t_poly].sql_conn, PGRES_COMMAND_OK, "DELETE FROM %s WHERE osm_id = %" PRIdOSMID, tables[t_poly].name, osm_id );
1655    return 0;
1656}
1657
1658static int pgsql_delete_way(osmid_t osm_id)
1659{
1660    if( !Options->slim )
1661    {
1662        fprintf( stderr, "Cannot apply diffs unless in slim mode\n" );
1663        exit_nicely();
1664    }
1665    pgsql_delete_way_from_output(osm_id);
1666    Options->mid->ways_delete(osm_id);
1667    return 0;
1668}
1669
1670/* Relations are identified by using negative IDs */
1671static int pgsql_delete_relation_from_output(osmid_t osm_id)
1672{
1673    pgsql_pause_copy(&tables[t_roads]);
1674    pgsql_pause_copy(&tables[t_line]);
1675    pgsql_pause_copy(&tables[t_poly]);
1676    expire_tiles_from_db(tables[t_roads].sql_conn, -osm_id);
1677    expire_tiles_from_db(tables[t_line].sql_conn, -osm_id);
1678    expire_tiles_from_db(tables[t_poly].sql_conn, -osm_id);
1679    pgsql_exec(tables[t_roads].sql_conn, PGRES_COMMAND_OK, "DELETE FROM %s WHERE osm_id = %" PRIdOSMID, tables[t_roads].name, -osm_id );
1680    pgsql_exec(tables[t_line].sql_conn, PGRES_COMMAND_OK, "DELETE FROM %s WHERE osm_id = %" PRIdOSMID, tables[t_line].name, -osm_id );
1681    pgsql_exec(tables[t_poly].sql_conn, PGRES_COMMAND_OK, "DELETE FROM %s WHERE osm_id = %" PRIdOSMID, tables[t_poly].name, -osm_id );
1682    return 0;
1683}
1684
1685static int pgsql_delete_relation(osmid_t osm_id)
1686{
1687    if( !Options->slim )
1688    {
1689        fprintf( stderr, "Cannot apply diffs unless in slim mode\n" );
1690        exit_nicely();
1691    }
1692    pgsql_delete_relation_from_output(osm_id);
1693    Options->mid->relations_delete(osm_id);
1694    return 0;
1695}
1696
1697/* Modify is slightly trickier. The basic idea is we simply delete the
1698 * object and create it with the new parameters. Then we need to mark the
1699 * objects that depend on this one */
1700static int pgsql_modify_node(osmid_t osm_id, double lat, double lon, struct keyval *tags)
1701{
1702    if( !Options->slim )
1703    {
1704        fprintf( stderr, "Cannot apply diffs unless in slim mode\n" );
1705        exit_nicely();
1706    }
1707    pgsql_delete_node(osm_id);
1708    pgsql_add_node(osm_id, lat, lon, tags);
1709    Options->mid->node_changed(osm_id);
1710    return 0;
1711}
1712
1713static int pgsql_modify_way(osmid_t osm_id, osmid_t *nodes, int node_count, struct keyval *tags)
1714{
1715    if( !Options->slim )
1716    {
1717        fprintf( stderr, "Cannot apply diffs unless in slim mode\n" );
1718        exit_nicely();
1719    }
1720    pgsql_delete_way(osm_id);
1721    pgsql_add_way(osm_id, nodes, node_count, tags);
1722    Options->mid->way_changed(osm_id);
1723    return 0;
1724}
1725
1726static int pgsql_modify_relation(osmid_t osm_id, struct member *members, int member_count, struct keyval *tags)
1727{
1728    if( !Options->slim )
1729    {
1730        fprintf( stderr, "Cannot apply diffs unless in slim mode\n" );
1731        exit_nicely();
1732    }
1733    pgsql_delete_relation(osm_id);
1734    pgsql_add_relation(osm_id, members, member_count, tags);
1735    Options->mid->relation_changed(osm_id);
1736    return 0;
1737}
1738
1739struct output_t out_pgsql = {
1740        .start           = pgsql_out_start,
1741        .connect         = pgsql_out_connect,
1742        .stop            = pgsql_out_stop,
1743        .cleanup         = pgsql_out_cleanup,
1744        .close           = pgsql_out_close,
1745        .node_add        = pgsql_add_node,
1746        .way_add         = pgsql_add_way,
1747        .relation_add    = pgsql_add_relation,
1748       
1749        .node_modify     = pgsql_modify_node,
1750        .way_modify      = pgsql_modify_way,
1751        .relation_modify = pgsql_modify_relation,
1752
1753        .node_delete     = pgsql_delete_node,
1754        .way_delete      = pgsql_delete_way,
1755        .relation_delete = pgsql_delete_relation
1756};
Note: See TracBrowser for help on using the repository browser.