znone
2019-08-27 18b8205f3984bff211131f82e4c0a1e33886ea64
提供以标准流的方式直接操纵ODBC的BLOB字段
7 files modified
583 ■■■■■ changed files
README.md 10 ●●●● patch | view | raw | blame | history
include/qtl_common.hpp 211 ●●●●● patch | view | raw | blame | history
include/qtl_mysql.hpp 213 ●●●●● patch | view | raw | blame | history
include/qtl_odbc.hpp 83 ●●●●● patch | view | raw | blame | history
test/TestMysql.cpp 2 ●●● patch | view | raw | blame | history
test/TestOdbc.cpp 62 ●●●●● patch | view | raw | blame | history
test/TestOdbc.h 2 ●●●●● patch | view | raw | blame | history
README.md
@@ -239,7 +239,7 @@
| float | float |
| double | double |
| char<br>varchar | const char*<br>std::string |
| blob<br>binary<br>text | qtl::const_blob_data<br>std::istream<br>qtl::mysql::blob_writer |
| blob<br>binary<br>text | qtl::const_blob_data<br>std::istream<br>qtl::blob_writer |
| date<br>time<br>datetime<br/>timestamp | qtl::mysql::time |
blob_writer是一个函数,它的定义如下:
@@ -259,12 +259,12 @@
| float | float |
| double | double |
| char<br>varchar | char[N]<br>std::array&lt;char, N&gt;<br>std::string<br>std::istream
| blob<br>binary<br>text | qtl::blob_data<br>std::ostream<br>qtl::mysql::blobbuf
| blob<br>binary<br>text | qtl::blob_data<br>std::ostream<br>qtl::blobbuf
| date<br>time<br>datetime<br>timestamp | qtl::mysql::time |
可以通过qtl::mysql::blobbuf读取BLOB字段的数据:
```C++
void read_blob(qtl::mysql::blobbuf& buf) {
void read_blob(qtl::blobbuf& buf) {
    istream s(&buf);
    ...
};
@@ -358,7 +358,7 @@
| CHAR<br>VARCHAR | const char*<br>std::string |
| WCHAR<br>WVARCHAR | const wchar_t*<br>std::wstring |
| BINARY | qtl::const_blob_data |
| LONGVARBINARY | std::istream |
| LONGVARBINARY | std::istream<br>qtl::blob_writer |
| DATE | qtl::odbc::date |
| TIME<br>UTCTIME | qtl::odbc::time |
| TIMESTAMP<br>UTCDATETIME | qtl::odbc::datetime |
@@ -379,7 +379,7 @@
| CHAR<br>VARCHAR | char[N]<br>std::array&lt;char, N&gt;<br>std::string |
| WCHAR<br>WVARCHAR | wchar_t[N]<br>std::array&lt;wchar_t, N&gt;<br>std::string |
| BINARY | qtl::blob_data |
| LONGVARBINARY | std::ostream |
| LONGVARBINARY | std::ostream<br>qtl::blobbuf |
| DATE | qtl::odbc::date |
| TIME<br>UTCTIME | qtl::odbc::time |
| TIMESTAMP<br>UTCDATETIME | qtl::odbc::datetime |
include/qtl_common.hpp
@@ -1116,6 +1116,217 @@
    };
};
class blobbuf : public std::streambuf
{
public:
    blobbuf() = default;
    blobbuf(const blobbuf&) = default;
    blobbuf& operator=(const blobbuf&) = default;
    virtual ~blobbuf()
    {
        overflow();
    }
protected:
    virtual pos_type seekoff(off_type off, std::ios_base::seekdir dir,
        std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override
    {
        if (which&std::ios_base::in)
        {
            pos_type pos = 0;
            pos = seekoff(m_pos, off, dir);
            return seekpos(pos, which);
        }
        return std::streambuf::seekoff(off, dir, which);
    }
    virtual pos_type seekpos(pos_type pos,
        std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override
    {
        if (pos >= m_size)
            return pos_type(off_type(-1));
        if (which&std::ios_base::out)
        {
            if (pos < m_pos || pos >= m_pos + off_type(egptr() - pbase()))
            {
                overflow();
                m_pos = pos;
                setp(m_buf.data(), m_buf.data() + m_buf.size());
            }
            else
            {
                pbump(off_type(pos - pabs()));
            }
        }
        else if (which&std::ios_base::in)
        {
            if (pos < m_pos || pos >= m_pos + off_type(epptr() - eback()))
            {
                m_pos = pos;
                setg(m_buf.data(), m_buf.data(), m_buf.data());
            }
            else
            {
                gbump(off_type(pos - gabs()));
            }
        }
        return pos;
    }
    virtual std::streamsize showmanyc() override
    {
        return m_size - pabs();
    }
    virtual int_type underflow() override
    {
        if (pptr() > pbase())
            overflow();
        off_type count = egptr() - eback();
        pos_type next_pos = 0;
        if (count == 0 && eback() == m_buf.data())
        {
            setg(m_buf.data(), m_buf.data(), m_buf.data() + m_buf.size());
            count = m_buf.size();
        }
        else
        {
            next_pos = m_pos + pos_type(count);
        }
        if (next_pos >= m_size)
            return traits_type::eof();
        count = std::min(count, m_size - next_pos);
        m_pos = next_pos;
        if (read_blob(m_buf.data(), count, m_pos))
        {
            setg(eback(), eback(), eback() + count);
            return traits_type::to_int_type(*gptr());
        }
        else
        {
            return traits_type::eof();
        }
    }
    virtual int_type overflow(int_type ch = traits_type::eof()) override
    {
        if (pptr() != pbase())
        {
            size_t count = pptr() - pbase();
            write_blob(pbase(), count);
            //auto intersection = interval_intersection(m_pos, egptr() - eback(), m_pos, epptr() - pbase());
            //if (intersection.first != intersection.second)
            //{
            //    commit(intersection.first, intersection.second);
            //}
            m_pos += count;
            setp(pbase(), epptr());
        }
        if (!traits_type::eq_int_type(ch, traits_type::eof()))
        {
            char_type c = traits_type::to_char_type(ch);
            if (m_pos >= m_size)
                return traits_type::eof();
            write_blob(&c, 1);
            //auto intersection = interval_intersection(m_pos, egptr() - eback(), m_pos, 1);
            //if (intersection.first != intersection.second)
            //{
            //    eback()[intersection.first - m_pos] = c;
            //}
            m_pos += 1;
        }
        return ch;
    }
    virtual int_type pbackfail(int_type c = traits_type::eof()) override
    {
        if (gptr() == 0
            || gptr() <= eback()
            || (!traits_type::eq_int_type(traits_type::eof(), c)
                && !traits_type::eq(traits_type::to_char_type(c), gptr()[-1])))
        {
            return (traits_type::eof());    // can't put back, fail
        }
        else
        {    // back up one position and store put-back character
            gbump(-1);
            if (!traits_type::eq_int_type(traits_type::eof(), c))
                *gptr() = traits_type::to_char_type(c);
            return (traits_type::not_eof(c));
        }
    }
private:
    off_type seekoff(off_type position, off_type off, std::ios_base::seekdir dir)
    {
        off_type result = 0;
        switch (dir)
        {
        case std::ios_base::beg:
            result = off;
            break;
        case std::ios_base::cur:
            result = position + off;
            break;
        case std::ios_base::end:
            result = m_size - off;
        }
        if (result > m_size)
            result = m_size;
        return result;
    }
    pos_type gabs() const // absolute offset of input pointer in blob field
    {
        return m_pos + off_type(gptr() - eback());
    }
    pos_type pabs() const // absolute offset of output pointer in blob field
    {
        return m_pos + off_type(pptr() - pbase());
    }
protected:
    std::vector<char> m_buf;
    pos_type m_size;
    pos_type m_pos;    //position in the input sequence
    virtual bool read_blob(char* buffer, off_type& count, pos_type position) = 0;
    virtual void write_blob(const char* buffer, size_t count) = 0;
    void init_buffer(std::ios_base::openmode mode)
    {
        size_t bufsize;
        if (m_size > 0)
            bufsize = std::min<size_t>(blob_buffer_size, m_size);
        else
            bufsize = blob_buffer_size;
        if (mode&std::ios_base::in)
        {
            m_buf.resize(bufsize);
            m_pos = 0;
            setg(m_buf.data(), m_buf.data(), m_buf.data());
        }
        else if (mode&std::ios_base::out)
        {
            m_buf.resize(bufsize);
            m_pos = 0;
            setp(m_buf.data(), m_buf.data() + bufsize);
        }
    }
};
typedef std::function<void(std::ostream&)> blob_writer;
template<typename Database>
struct transaction
{
include/qtl_mysql.hpp
@@ -218,7 +218,7 @@
    std::string m_errmsg;
};
class blobbuf : public std::streambuf
class blobbuf : public qtl::blobbuf
{
public:
    blobbuf() : m_stmt(nullptr), m_field(0) 
@@ -226,10 +226,7 @@
    }
    blobbuf(const blobbuf&) = default;
    blobbuf& operator=(const blobbuf&) = default;
    virtual ~blobbuf()
    {
        overflow();
    }
    virtual ~blobbuf() { overflow();  }
    void open(MYSQL_STMT* stmt, int field, const binder& b, std::ios_base::openmode mode)
    {
@@ -242,211 +239,41 @@
        m_stmt = stmt;
        m_field = field;
        m_binder = b;
        size_t bufsize;
        if (b.length) m_size = *b.length;
        if (m_size > 0)
            bufsize = std::min<size_t>(blob_buffer_size, m_size);
        else
            bufsize = blob_buffer_size;
        if (mode&std::ios_base::in)
        {
            m_buf.resize(bufsize);
            m_pos = 0;
            setg(m_buf.data(), m_buf.data(), m_buf.data());
        }
        else if (mode&std::ios_base::out)
        {
            m_buf.resize(bufsize);
            m_pos = 0;
            setp(m_buf.data(), m_buf.data() + bufsize);
        }
        init_buffer(mode);
    }
protected:
    virtual pos_type seekoff(off_type off, std::ios_base::seekdir dir,
        std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override
    {
        if (which&std::ios_base::in)
        {
            pos_type pos = 0;
            pos = seekoff(m_pos, off, dir);
            return seekpos(pos, which);
        }
        return std::streambuf::seekoff(off, dir, which);
    }
    virtual pos_type seekpos(pos_type pos,
        std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override
    {
        if (pos >= m_size)
            return pos_type(off_type(-1));
        if (which&std::ios_base::out)
        {
            if (pos < m_pos || pos >= m_pos + off_type(egptr() - pbase()))
            {
                overflow();
                m_pos = pos;
                setp(m_buf.data(), m_buf.data() + m_buf.size());
            }
            else
            {
                pbump(off_type(pos - pabs()));
            }
        }
        else if (which&std::ios_base::in)
        {
            if (pos < m_pos || pos >= m_pos + off_type(epptr() - eback()))
            {
                m_pos = pos;
                setg(m_buf.data(), m_buf.data(), m_buf.data());
            }
            else
            {
                gbump(off_type(pos - gabs()));
            }
        }
        return pos;
    }
    virtual std::streamsize showmanyc() override
    {
        return m_size - pabs();
    }
    virtual int_type underflow() override
    {
        if (pptr() > pbase())
            overflow();
        off_type count = egptr() - eback();
        pos_type next_pos=0;
        if (count == 0 && eback() == m_buf.data())
        {
            setg(m_buf.data(), m_buf.data(), m_buf.data() + m_buf.size());
            count = m_buf.size();
        }
        else
        {
            next_pos = m_pos + pos_type(count);
        }
        if (next_pos >= m_size)
            return traits_type::eof();
        count = std::min(count, m_size - next_pos);
        m_binder.buffer = m_buf.data();
        m_binder.buffer_length = count;
        m_pos = next_pos;
        int ret = mysql_stmt_fetch_column(m_stmt, &m_binder,  m_field, m_pos);
        switch (ret)
        {
        case 0:
            count = std::min(m_binder.buffer_length,  *m_binder.length);
            setg(eback(), eback(), eback() + count);
            return traits_type::to_int_type(*gptr());
        case CR_NO_DATA:
            return traits_type::eof();
        default:
            throw error(mysql_stmt_errno(m_stmt), mysql_stmt_error(m_stmt));
        }
    }
    virtual int_type overflow(int_type ch = traits_type::eof()) override
    {
        if (pptr() != pbase())
        {
            size_t count = pptr() - pbase();
            int ret = mysql_stmt_send_long_data(m_stmt, m_field, pbase(), count);
            if (ret != 0)
                throw error(mysql_stmt_errno(m_stmt), mysql_stmt_error(m_stmt));
            //auto intersection = interval_intersection(m_pos, egptr() - eback(), m_pos, epptr() - pbase());
            //if (intersection.first != intersection.second)
            //{
            //    commit(intersection.first, intersection.second);
            //}
            m_pos += count;
            setp(pbase(), epptr());
        }
        if (!traits_type::eq_int_type(ch, traits_type::eof()))
        {
            char_type c = traits_type::to_char_type(ch);
            if (m_pos >= m_size)
                return traits_type::eof();
            int ret = mysql_stmt_send_long_data(m_stmt, m_field, &c, 1);
            if (ret != 0)
                throw error(mysql_stmt_errno(m_stmt), mysql_stmt_error(m_stmt));
            //auto intersection = interval_intersection(m_pos, egptr() - eback(), m_pos, 1);
            //if (intersection.first != intersection.second)
            //{
            //    eback()[intersection.first - m_pos] = c;
            //}
            m_pos += 1;
        }
        return ch;
    }
    virtual int_type pbackfail(int_type c = traits_type::eof()) override
    {
        if (gptr() == 0
            || gptr() <= eback()
            || (!traits_type::eq_int_type(traits_type::eof(), c)
                && !traits_type::eq(traits_type::to_char_type(c), gptr()[-1])))
        {
            return (traits_type::eof());    // can't put back, fail
        }
        else
        {    // back up one position and store put-back character
            gbump(-1);
            if (!traits_type::eq_int_type(traits_type::eof(), c))
                *gptr() = traits_type::to_char_type(c);
            return (traits_type::not_eof(c));
        }
    }
private:
    MYSQL_STMT* m_stmt;
    binder m_binder;
    int m_field;
    std::vector<char> m_buf;
    pos_type m_size;
    pos_type m_pos;    //position in the input sequence
    off_type seekoff(off_type position, off_type off, std::ios_base::seekdir dir)
protected:
    virtual bool read_blob(char* buffer, off_type& count, pos_type position) override
    {
        off_type result = 0;
        switch (dir)
        m_binder.buffer = buffer;
        m_binder.buffer_length = count;
        int ret = mysql_stmt_fetch_column(m_stmt, &m_binder, m_field, position);
        switch (ret)
        {
        case std::ios_base::beg:
            result = off;
            break;
        case std::ios_base::cur:
            result = position + off;
            break;
        case std::ios_base::end:
            result = m_size - off;
        case 0:
            count = std::min(m_binder.buffer_length, *m_binder.length);
            return true;
        case CR_NO_DATA:
            return false;
        default:
            throw error(mysql_stmt_errno(m_stmt), mysql_stmt_error(m_stmt));
        }
        if (result > m_size)
            result = m_size;
        return result;
    }
    pos_type gabs() const // absolute offset of input pointer in blob field
    virtual void write_blob(const char* buffer, size_t count) override
    {
        return m_pos + off_type(gptr() - eback());
    }
    pos_type pabs() const // absolute offset of output pointer in blob field
    {
        return m_pos + off_type(pptr() - pbase());
        int ret = mysql_stmt_send_long_data(m_stmt, m_field, buffer, count);
        if (ret != 0)
            throw error(mysql_stmt_errno(m_stmt), mysql_stmt_error(m_stmt));
    }
};
typedef std::function<void(std::ostream&)> blob_writer;
class base_statement
{
include/qtl_odbc.hpp
@@ -10,6 +10,7 @@
#include <assert.h>
#include <malloc.h>
#include <limits.h>
#include <stdint.h>
#if !defined(_WIN32) || defined(__MINGW32__)
#include <sys/time.h>
@@ -81,12 +82,62 @@
        }
    }
protected:
    SQLHANDLE m_handle;
    void verify_error(SQLINTEGER code) const
    {
        if(code<0)
            throw odbc::error(*this, code);
        if (code < 0)
            throw odbc::error(*this, code);
    }
protected:
    SQLHANDLE m_handle;
};
class blobbuf : public qtl::blobbuf
{
public:
    blobbuf() : m_stmt(nullptr), m_field(0)
    {
    }
    blobbuf(const blobbuf&) = default;
    blobbuf& operator=(const blobbuf&) = default;
    virtual ~blobbuf() { overflow(); }
    void open(object<SQL_HANDLE_STMT>* stmt, SQLSMALLINT field, std::ios_base::openmode mode)
    {
        if (m_stmt && m_field)
        {
            overflow();
        }
        assert(stmt != SQL_NULL_HANDLE);
        m_stmt = stmt;
        m_field = field;
        m_size = INTMAX_MAX;
        init_buffer(mode);
    }
private:
    object<SQL_HANDLE_STMT>* m_stmt;
    SQLSMALLINT m_field;
protected:
    virtual bool read_blob(char* buffer, off_type& count, pos_type position) override
    {
        SQLLEN indicator=0;
        SQLRETURN ret = SQLGetData(m_stmt->handle(), m_field + 1, SQL_C_BINARY, buffer, count, const_cast<SQLLEN*>(&indicator));
        if (ret != SQL_NO_DATA)
        {
            count = (indicator > count) || (indicator == SQL_NO_TOTAL) ?
                count : indicator;
            m_stmt->verify_error(ret);
            return true;
        }
        else return false;
    }
    virtual void write_blob(const char* buffer, size_t count) override
    {
        m_stmt->verify_error(SQLPutData(m_stmt->handle(), (SQLPOINTER)buffer, count));
    }
};
@@ -281,6 +332,21 @@
        };
    }
    void bind_param(SQLUSMALLINT index, const blob_writer& param)
    {
        m_params[index].m_data = nullptr;
        m_params[index].m_size = blob_buffer_size;
        m_params[index].m_indicator = SQL_LEN_DATA_AT_EXEC(m_params[index].m_size);
        verify_error(SQLBindParameter(m_handle, index + 1, SQL_PARAM_INPUT, SQL_C_BINARY, SQL_LONGVARBINARY,
            INT_MAX, 0, &m_params[index], 0, &m_params[index].m_indicator));
        m_params[index].m_after_fetch = [this, index, &param](const param_data& b) {
            blobbuf buf;
            buf.open(this, index, std::ios::out);
            std::ostream s(&buf);
            param(s);
        };
    }
    void bind_field(SQLUSMALLINT index, bool&& v)
    {
        verify_error(SQLBindCol(m_handle, index+1, SQL_C_BIT, &v, 0, &m_params[index].m_indicator));
@@ -422,6 +488,15 @@
        };
    }
    void bind_field(size_t index, blobbuf&& value)
    {
        m_params[index].m_data = nullptr;
        m_params[index].m_size = 0;
        m_params[index].m_after_fetch = [this, index, &value](const param_data& p) {
            value.open(this, index, std::ios::in);
        };
    }
    template<typename Type>
    void bind_field(size_t index, indicator<Type>&& value)
    {
test/TestMysql.cpp
@@ -263,7 +263,7 @@
    try
    {
        qtl::mysql::blob_writer writer = [](std::ostream& s) {
        qtl::blob_writer writer = [](std::ostream& s) {
            for (size_t i = 0; i != 100; i++)
            {
                s << i << ": ";
test/TestOdbc.cpp
@@ -29,7 +29,7 @@
    template<>
    inline void bind_record<qtl::odbc::statement, TestOdbcRecord>(qtl::odbc::statement& command, TestOdbcRecord&& v)
    {
        qtl::bind_field(command, 0, v.id);
        qtl::bind_field(command, (size_t)0, v.id);
        qtl::bind_field(command, 1, v.name);
        qtl::bind_field(command, 2, v.create_time);
    }
@@ -38,6 +38,7 @@
TestOdbc::TestOdbc() : m_db(m_env)
{
    m_db.open("DRIVER={SQL Server};SERVER=(local);UID=;PWD=;Trusted_Connection=yes;DATABASE=test");
    //m_db.open("DRIVER={SQL Server};SERVER=(local);UID=;PWD=;Trusted_Connection=no;DATABASE=test;UID=sa;PWD=111111;");
    cout<<"DBMS: "<<m_db.dbms_name()<<endl;
    cout<<"SERVER: "<<m_db.server_name()<<endl;
    cout<<"USER: "<<m_db.user_name()<<endl;
@@ -53,6 +54,8 @@
    TEST_ADD(TestOdbc::test_iterator)
    TEST_ADD(TestOdbc::test_insert_blob)
    TEST_ADD(TestOdbc::test_select_blob)
    TEST_ADD(TestOdbc::test_insert_stream)
    TEST_ADD(TestOdbc::test_fetch_stream)
}
void TestOdbc::test_dual()
@@ -230,6 +233,62 @@
        ASSERT_EXCEPTION(e);
    }
}
void TestOdbc::test_insert_stream()
{
    try
    {
        qtl::blob_writer writer = [](std::ostream& s) {
            for (size_t i = 0; i != 100; i++)
            {
                s << i << ": ";
                for (size_t j = 0; j <= i; j++)
                    s << char('a' + j % 26);
                s << endl;
                for (size_t j = 0; j <= i; j++)
                    s << '-';
                s << endl;
            }
        };
        m_db.execute("INSERT INTO test_stream ([Data]) values(?)",
            writer);
        m_db.query_first("SELECT @@IDENTITY", id);
    }
    catch (qtl::odbc::error& e)
    {
        ASSERT_EXCEPTION(e);
    }
}
void TestOdbc::test_fetch_stream()
{
    try
    {
        m_db.query("SELECT Data from test_stream", [](qtl::odbc::blobbuf&& buf) {
            istream s(&buf);
            string str;
            while (!s.eof())
            {
                getline(s, str);
                cout << str << endl;
            }
            s.clear(ios_base::goodbit | ios_base::eofbit);
            s.seekg(0, ios::beg);
            if (s.good())
            {
                cout << "again:" << endl;
                while (!s.eof())
                {
                    getline(s, str);
                    cout << str << endl;
                }
            }
        });
    }
    catch (qtl::odbc::error& e)
    {
        ASSERT_EXCEPTION(e);
    }
}
void TestOdbc::get_md5(std::istream& is, unsigned char* result)
{
@@ -249,6 +308,7 @@
    cout<<hex;
    for(size_t i=0; i!=n; i++)
        cout<<(data[i]&0xFF);
    cout<<dec;
}
int main(int argc, char* argv[])
test/TestOdbc.h
@@ -27,6 +27,8 @@
    void test_iterator();
    void test_insert_blob();
    void test_select_blob();
    void test_insert_stream();
    void test_fetch_stream();
private:
    uint64_t id;