summaryrefslogtreecommitdiff
path: root/lib/Catmandu/Store/DBI/Handler/MySQL.pm
blob: a621f8a7d4ddcfb4b9f49f954abdaf8d7980dc8c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package Catmandu::Store::DBI::Handler::MySQL;

use Catmandu::Sane;
use Moo;
use namespace::clean;

our $VERSION = "0.13";

with 'Catmandu::Store::DBI::Handler';

# text types are case-insensitive in MySQL
sub _column_sql {
    my ($self, $map, $bag) = @_;
    my $col = $map->{column};
    my $dbh = $bag->store->dbh;
    my $sql = $dbh->quote_identifier($col) . " ";
    if ($map->{type} eq 'string' && $map->{unique}) {
        $sql .= 'VARCHAR(255) BINARY';
    }
    elsif ($map->{type} eq 'string') {
        $sql .= 'TEXT BINARY';
    }
    elsif ($map->{type} eq 'integer') {
        $sql .= 'INTEGER';
    }
    elsif ($map->{type} eq 'binary') {
        $sql .= 'LONGBLOB';
    }
    elsif ($map->{type} eq 'datetime') {
        $sql .= 'DATETIME';
    }
    elsif ($map->{type} eq 'datetime_milli') {
        if ($dbh->{mysql_clientversion} < 50640) {
            Catmandu::Error->throw(
                "DATETIME(3) type only supported in MySQL 5.6.4 and above");
        }
        $sql .= 'DATETIME(3)';
    }
    else {
        Catmandu::Error->throw("Unknown type '$map->{type}'");
    }
    if ($map->{unique}) {
        $sql .= " UNIQUE";
    }
    if ($map->{required}) {
        $sql .= " NOT NULL";
    }
    if (!$map->{unique} && $map->{index}) {
        if ($map->{type} eq 'string') {
            $sql .= ", INDEX($col(255))";
        }
        else {
            $sql .= ", INDEX($col)";
        }
    }
    $sql;
}

# http://devoluk.com/mysql-limit-offset-performance.html
sub select_sql {
    my ($self, $bag, $start, $limit, $where) = @_;

    my $q_id_field = $bag->_quote_id($bag->mapping->{_id}->{column});
    my $q_table    = $bag->_quote_id($bag->name);

    my $sql = "SELECT * FROM $q_table AS t1";
    $sql .= " JOIN (SELECT $q_id_field FROM $q_table";
    $sql .= " WHERE $where" if $where;

    my $default_order = $bag->default_order // $bag->store->default_order;

    if (defined $default_order && $default_order eq 'ID') {
        $sql .= " ORDER BY $q_id_field";
    }
    elsif (defined $default_order && $default_order ne 'NONE') {
        $sql .= " ORDER BY $default_order";
    }
    $sql
        .= " LIMIT $limit OFFSET $start) AS t2 ON t1.$q_id_field = t2.$q_id_field";

    $sql;
}

sub create_table {
    my ($self, $bag) = @_;
    my $mapping = $bag->mapping;
    my $dbh     = $bag->store->dbh;
    my $q_name  = $dbh->quote_identifier($bag->name);
    my $sql
        = "CREATE TABLE IF NOT EXISTS $q_name("
        . join(',', map {$self->_column_sql($_, $bag)} values %$mapping)
        . ")";
    $dbh->do($sql) or Catmandu::Error->throw($dbh->errstr);
}

sub add_row {
    my ($self, $bag, $row) = @_;
    my $dbh    = $bag->store->dbh;
    my @cols   = keys %$row;
    my @q_cols = map {$dbh->quote_identifier($_)} @cols;
    my @vals   = values %$row;
    my $q_name = $dbh->quote_identifier($bag->name);
    my $sql
        = "INSERT INTO $q_name("
        . join(',', @q_cols)
        . ") VALUES("
        . join(',', ('?') x @q_cols)
        . ") ON DUPLICATE KEY UPDATE "
        . join(',', map {"$_=VALUES($_)"} @q_cols);

    my $sth = $dbh->prepare_cached($sql)
        or Catmandu::Error->throw($dbh->errstr);
    $sth->execute(@vals) or Catmandu::Error->throw($sth->errstr);
    $sth->finish;
}

1;