mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 10:33:27 +08:00
Refactor net lib functionality and fix build issues
This makes the following changes related to the net lib: - A CMake run now fails in case OpenSSL is not found when the local PostgreSQL installation has been compiled with SSL enabled. The reason why it is best to fail the CMake run in this case is because USE_OPENSSL will be defined and 1 in `pg_config.h` and thus we will compile with SSL support. - Use palloc/pfree in connection library. - Split net library into separate source files. The net library is refactored so that the code for different connection types live in their separate source files. In particular, the source code for mock connections is now moved to `test/src/net`. - The generate_typedefs.sh script now runs in subdirectories so that source files in those subdirectories are properly pgindented. - The pgindent target previously did not cover files under `test/src`. This is now fixed. An exclude file has also been added that avoids running pgindent in hidden directories. This fixes issues with, e.g., trying to indent files in cquery caches. - Fix formatting with pgindent Fix parameter types in HTTP lib. Add `const` to parameter types, such as strings. Use `size_t` for length parameters - Parse and validate the status line of HTTP responses. The beginning of HTTP responses weren't properly parsed and validated, allowing invalid character sequences at the start of requests.
This commit is contained in:
parent
45a2b76d46
commit
4f6b92ab37
@ -239,3 +239,24 @@ configure_file(${EXT_CONTROL_FILE}.in ${EXT_CONTROL_FILE})
|
||||
install(
|
||||
FILES ${CMAKE_CURRENT_BINARY_DIR}/${EXT_CONTROL_FILE}
|
||||
DESTINATION "${PG_SHAREDIR}/extension")
|
||||
|
||||
find_program(PGINDENT pgindent
|
||||
HINTS ${PG_SOURCE_DIR}
|
||||
PATH_SUFFIXES src/tools/pgindent
|
||||
DOC "Format C code according to PostgreSQL standards")
|
||||
|
||||
# Configuration for running pgindent
|
||||
if (PGINDENT)
|
||||
message(STATUS "Using pgindent ${PGINDENT}")
|
||||
file(WRITE ${CMAKE_BINARY_DIR}/pgindent_excludes "\\..*/\n")
|
||||
|
||||
add_custom_command(OUTPUT typedefs.list
|
||||
DEPENDS ${PROJECT_NAME}
|
||||
COMMAND sh ${PROJECT_BINARY_DIR}/scripts/generate_typedefs.sh > ${PROJECT_BINARY_DIR}/typedefs.list)
|
||||
add_custom_target(pgindent
|
||||
COMMAND ${PGINDENT} -typedefs typedefs.list -excludes=${PROJECT_BINARY_DIR}/pgindent_excludes -code-base=${PROJECT_SOURCE_DIR}/src
|
||||
COMMAND ${PGINDENT} -typedefs typedefs.list -excludes=${PROJECT_BINARY_DIR}/pgindent_excludes -code-base=${PROJECT_SOURCE_DIR}/test/src
|
||||
DEPENDS ${PROJECT_BINARY_DIR}/typedefs.list)
|
||||
else ()
|
||||
message(STATUS "Install pgindent to be able to format C code: https://github.com/postgres/postgres/tree/master/src/tools/pgindent")
|
||||
endif (PGINDENT)
|
||||
|
@ -1,11 +1,20 @@
|
||||
set(CMAKE_C_FLAGS_DEBUG "-DUSE_ASSERT_CHECKING=1 -DDEBUG=1")
|
||||
|
||||
INCLUDE (FindOpenSSL)
|
||||
file(STRINGS ${PG_INCLUDEDIR}/pg_config.h USE_OPENSSL REGEX "define USE_OPENSSL ")
|
||||
# Check if PostgreSQL has OpenSSL enabled
|
||||
file(STRINGS ${PG_INCLUDEDIR}/pg_config.h PG_USE_OPENSSL REGEX "#define USE_OPENSSL [01]")
|
||||
string(REGEX REPLACE "#define USE_OPENSSL ([01])" "\\1" USE_OPENSSL ${PG_USE_OPENSSL})
|
||||
|
||||
if (OPENSSL_FOUND AND USE_OPENSSL)
|
||||
include_directories ( ${OPENSSL_INCLUDE_DIR} )
|
||||
endif (OPENSSL_FOUND AND USE_OPENSSL)
|
||||
# If OpenSSL couldn't be found, we need to disable usage of OpenSSL
|
||||
if (USE_OPENSSL)
|
||||
# Try to find a local OpenSSL installation
|
||||
include(FindOpenSSL)
|
||||
|
||||
if (NOT OPENSSL_FOUND)
|
||||
message(FATAL_ERROR "PostgreSQL was compiled with SSL support, but OpenSSL was not found")
|
||||
endif (NOT OPENSSL_FOUND)
|
||||
|
||||
include_directories(${OPENSSL_INCLUDE_DIR})
|
||||
endif (USE_OPENSSL)
|
||||
|
||||
if (UNIX)
|
||||
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -L${PG_LIBDIR}")
|
||||
@ -131,10 +140,6 @@ set(SOURCES
|
||||
utils.c
|
||||
version.c)
|
||||
|
||||
if (NOT PROJECT_INSTALL_METHOD)
|
||||
set(PROJECT_INSTALL_METHOD source)
|
||||
endif (NOT PROJECT_INSTALL_METHOD)
|
||||
|
||||
configure_file(version.h.in version.h)
|
||||
set(GITCOMMIT_H ${CMAKE_CURRENT_BINARY_DIR}/gitcommit.h)
|
||||
|
||||
@ -161,30 +166,12 @@ else ()
|
||||
VERBATIM)
|
||||
endif (WIN32)
|
||||
|
||||
find_program(PGINDENT pgindent
|
||||
HINTS ${PG_SOURCE_DIR}
|
||||
PATH_SUFFIXES src/tools/pgindent
|
||||
DOC "Format C code according to PostgreSQL standards")
|
||||
|
||||
if (PGINDENT)
|
||||
message(STATUS "Using pgindent ${PGINDENT}")
|
||||
else ()
|
||||
message(STATUS "Install pgindent to be able to format C code: https://github.com/postgres/postgres/tree/master/src/tools/pgindent")
|
||||
endif (PGINDENT)
|
||||
|
||||
# Configuration for running pgindent
|
||||
if (PGINDENT)
|
||||
add_custom_command(OUTPUT typedefs.list
|
||||
DEPENDS ${PROJECT_NAME}
|
||||
COMMAND sh ${CMAKE_BINARY_DIR}/scripts/generate_typedefs.sh > typedefs.list)
|
||||
add_custom_target(pgindent
|
||||
COMMAND ${PGINDENT} -typedefs typedefs.list -code-base ${CMAKE_SOURCE_DIR}/src/
|
||||
COMMAND ${PGINDENT} -typedefs typedefs.list -code-base ${CMAKE_SOURCE_DIR}/test/src/
|
||||
DEPENDS typedefs.list)
|
||||
endif (PGINDENT)
|
||||
|
||||
add_library(${PROJECT_NAME} MODULE ${SOURCES} ${TEST_SOURCES} ${HEADERS} ${GITCOMMIT_H})
|
||||
|
||||
if (OPENSSL_FOUND AND USE_OPENSSL)
|
||||
target_link_libraries(${PROJECT_NAME} ${OPENSSL_LIBRARIES})
|
||||
endif (OPENSSL_FOUND AND USE_OPENSSL)
|
||||
|
||||
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
|
||||
add_subdirectory(bgw)
|
||||
add_subdirectory(net)
|
||||
|
30
src/init.c
30
src/init.c
@ -37,8 +37,18 @@ extern void _process_utility_fini(void);
|
||||
extern void _event_trigger_init(void);
|
||||
extern void _event_trigger_fini(void);
|
||||
|
||||
extern void _connection_init(void);
|
||||
extern void _connection_fini(void);
|
||||
extern void _conn_plain_init();
|
||||
extern void _conn_plain_fini();
|
||||
|
||||
#ifdef USE_OPENSSL
|
||||
extern void _conn_ssl_init();
|
||||
extern void _conn_ssl_fini();
|
||||
#endif
|
||||
|
||||
#ifdef DEBUG
|
||||
extern void _conn_mock_init();
|
||||
extern void _conn_mock_fini();
|
||||
#endif
|
||||
|
||||
extern void PGDLLEXPORT _PG_init(void);
|
||||
extern void PGDLLEXPORT _PG_fini(void);
|
||||
@ -61,7 +71,13 @@ _PG_init(void)
|
||||
_event_trigger_init();
|
||||
_process_utility_init();
|
||||
_guc_init();
|
||||
_connection_init();
|
||||
_conn_plain_init();
|
||||
#ifdef USE_OPENSSL
|
||||
_conn_ssl_init();
|
||||
#endif
|
||||
#ifdef DEBUG
|
||||
_conn_mock_init();
|
||||
#endif
|
||||
}
|
||||
|
||||
void
|
||||
@ -71,7 +87,13 @@ _PG_fini(void)
|
||||
* Order of items should be strict reverse order of _PG_init. Please
|
||||
* document any exceptions.
|
||||
*/
|
||||
_connection_fini();
|
||||
#ifdef DEBUG
|
||||
_conn_mock_fini();
|
||||
#endif
|
||||
#ifdef USE_OPENSSL
|
||||
_conn_ssl_fini();
|
||||
#endif
|
||||
_conn_plain_fini();
|
||||
_guc_fini();
|
||||
_process_utility_fini();
|
||||
_event_trigger_fini();
|
||||
|
@ -1,7 +1,9 @@
|
||||
# Add all *.c to sources in upperlevel directory
|
||||
set(SOURCES
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/conn.c"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/conn_plain.c"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/conn_ssl.c"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/http_request.c"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/http_response.c"
|
||||
)
|
||||
|
||||
target_sources(${PROJECT_NAME} PRIVATE ${SOURCES})
|
||||
|
384
src/net/conn.c
384
src/net/conn.c
@ -7,358 +7,57 @@
|
||||
#include <postgres.h>
|
||||
#include <pg_config.h>
|
||||
|
||||
#include "conn.h"
|
||||
#include "conn_internal.h"
|
||||
|
||||
#define DEFAULT_TIMEOUT_SEC 3
|
||||
#define MOCK_MAX_BUF_SIZE 1024
|
||||
static ConnOps *conn_ops[_CONNECTION_MAX] = {NULL};
|
||||
|
||||
typedef struct ConnOps
|
||||
{
|
||||
int (*connect) (Connection *conn, const char *host, int port);
|
||||
void (*close) (Connection *conn);
|
||||
ssize_t (*write) (Connection *conn, const char *buf, size_t writelen);
|
||||
ssize_t (*read) (Connection *conn, char *buf, size_t readlen);
|
||||
} ConnOps;
|
||||
|
||||
|
||||
/* Create socket and connect */
|
||||
static int
|
||||
plain_connect(Connection *conn, const char *host, int port)
|
||||
{
|
||||
struct addrinfo *server_ip;
|
||||
struct sockaddr_in serv_info;
|
||||
struct sockaddr_in *temp;
|
||||
struct timeval timeouts = {
|
||||
.tv_sec = DEFAULT_TIMEOUT_SEC,
|
||||
};
|
||||
int ret;
|
||||
|
||||
ret = socket(AF_INET, SOCK_STREAM, 0);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "connection library: could not create a socket");
|
||||
else
|
||||
conn->sock = ret;
|
||||
|
||||
/*
|
||||
* Set send / recv timeout so that write and read don't block forever.
|
||||
* Set separately so that one of the actions failing doesn't block the other.
|
||||
*/
|
||||
if (setsockopt(conn->sock, SOL_SOCKET, SO_RCVTIMEO, &timeouts, sizeof(struct timeval)) != 0)
|
||||
elog(ERROR, "connection library: could not set recv timeouts on SSL sockets");
|
||||
if (setsockopt(conn->sock, SOL_SOCKET, SO_SNDTIMEO, &timeouts, sizeof(struct timeval)) != 0)
|
||||
elog(ERROR, "connection library: could not set send timeouts on SSL sockets");
|
||||
|
||||
/* Lookup the endpoint ip address */
|
||||
if (getaddrinfo(host, NULL, NULL, &server_ip) < 0 || server_ip == NULL)
|
||||
elog(ERROR, "connection library: could not get IP of endpoint");
|
||||
memset(&serv_info, 0, sizeof(serv_info));
|
||||
serv_info.sin_family = AF_INET;
|
||||
serv_info.sin_port = htons(port);
|
||||
temp = (struct sockaddr_in *) (server_ip->ai_addr);
|
||||
|
||||
memcpy(&serv_info.sin_addr.s_addr, &temp->sin_addr.s_addr, sizeof(serv_info.sin_addr.s_addr));
|
||||
|
||||
freeaddrinfo(server_ip);
|
||||
|
||||
/* connect the socket */
|
||||
ret = connect(conn->sock, (struct sockaddr *) &serv_info, sizeof(serv_info));
|
||||
if (ret < 0)
|
||||
elog(ERROR, "connection library: could not connect to endpoint");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
plain_write(Connection *conn, const char *buf, size_t writelen)
|
||||
{
|
||||
int ret = send(conn->sock, buf, writelen, 0);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "connection library: could not send on a socket");
|
||||
return ret;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
plain_read(Connection *conn, char *buf, size_t buflen)
|
||||
{
|
||||
int ret = recv(conn->sock, buf, buflen, 0);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "connection library: could not read from a socket");
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
plain_close(Connection *conn)
|
||||
{
|
||||
close(conn->sock);
|
||||
}
|
||||
|
||||
static ConnOps plain_ops = {
|
||||
.connect = plain_connect,
|
||||
.close = plain_close,
|
||||
.write = plain_write,
|
||||
.read = plain_read,
|
||||
static const char *conn_names[] = {
|
||||
[CONNECTION_PLAIN] = "PLAIN",
|
||||
[CONNECTION_SSL] = "SSL",
|
||||
[CONNECTION_MOCK] = "MOCK",
|
||||
};
|
||||
|
||||
static Connection *
|
||||
connection_internal_create(size_t size, ConnOps *ops)
|
||||
connection_internal_create(ConnectionType type, ConnOps *ops)
|
||||
{
|
||||
Connection *conn = malloc(size);
|
||||
Connection *conn = palloc(ops->size);
|
||||
|
||||
if (NULL == conn)
|
||||
return NULL;
|
||||
|
||||
memset(conn, 0, size);
|
||||
memset(conn, 0, ops->size);
|
||||
conn->ops = ops;
|
||||
conn->type = type;
|
||||
|
||||
return conn;
|
||||
}
|
||||
|
||||
static Connection *
|
||||
connection_create_plain()
|
||||
{
|
||||
return connection_internal_create(sizeof(Connection), &plain_ops);
|
||||
}
|
||||
|
||||
#ifdef USE_OPENSSL
|
||||
|
||||
typedef struct SSLConnection
|
||||
{
|
||||
Connection conn;
|
||||
SSL_CTX *ssl_ctx;
|
||||
SSL *ssl;
|
||||
} SSLConnection;
|
||||
|
||||
|
||||
static int
|
||||
ssl_setup(SSLConnection *conn)
|
||||
{
|
||||
int ret;
|
||||
|
||||
conn->ssl_ctx = SSL_CTX_new(SSLv23_method());
|
||||
|
||||
if (NULL == conn->ssl_ctx)
|
||||
elog(ERROR, "connection library: could not create SSL context");
|
||||
|
||||
SSL_CTX_set_options(conn->ssl_ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
|
||||
|
||||
/*
|
||||
* Because we have a blocking socket, we don't want to be bothered with
|
||||
* retries.
|
||||
*/
|
||||
SSL_CTX_set_mode(conn->ssl_ctx, SSL_MODE_AUTO_RETRY);
|
||||
|
||||
ERR_clear_error();
|
||||
/* Clear the SSL error before next SSL_ * call */
|
||||
conn->ssl = SSL_new(conn->ssl_ctx);
|
||||
|
||||
if (conn->ssl == NULL)
|
||||
elog(ERROR, "connection library: could not create SSL connection");
|
||||
ERR_clear_error();
|
||||
|
||||
ret = SSL_set_fd(conn->ssl, conn->conn.sock);
|
||||
if (ret == 0)
|
||||
elog(ERROR, "connection library: could not associate socket with SSL connection");
|
||||
|
||||
ret = SSL_connect(conn->ssl);
|
||||
if (ret <= 0)
|
||||
elog(ERROR, "connection library: could not make SSL connection");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
ssl_connect(Connection *conn, const char *host, int port)
|
||||
{
|
||||
int ret;
|
||||
|
||||
/* First do the base connection setup */
|
||||
ret = plain_connect(conn, host, port);
|
||||
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
ret = ssl_setup((SSLConnection *) conn);
|
||||
|
||||
if (ret < 0)
|
||||
close(conn->sock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
ssl_write(Connection *conn, const char *buf, size_t writelen)
|
||||
{
|
||||
SSLConnection *sslconn = (SSLConnection *) conn;
|
||||
|
||||
int ret = SSL_write(sslconn->ssl, buf, writelen);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "connection library: could not SSL_write");
|
||||
return ret;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
ssl_read(Connection *conn, char *buf, size_t buflen)
|
||||
{
|
||||
SSLConnection *sslconn = (SSLConnection *) conn;
|
||||
|
||||
int ret = SSL_read(sslconn->ssl, buf, buflen);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "connection library: could not SSL_read");
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
ssl_close(Connection *conn)
|
||||
{
|
||||
SSLConnection *sslconn = (SSLConnection *) conn;
|
||||
|
||||
if (sslconn->ssl != NULL)
|
||||
{
|
||||
SSL_free(sslconn->ssl);
|
||||
sslconn->ssl = NULL;
|
||||
}
|
||||
|
||||
if (sslconn->ssl_ctx != NULL)
|
||||
{
|
||||
SSL_CTX_free(sslconn->ssl_ctx);
|
||||
sslconn->ssl_ctx = NULL;
|
||||
}
|
||||
|
||||
plain_close(conn);
|
||||
}
|
||||
|
||||
static ConnOps ssl_ops = {
|
||||
.connect = ssl_connect,
|
||||
.close = ssl_close,
|
||||
.write = ssl_write,
|
||||
.read = ssl_read,
|
||||
};
|
||||
|
||||
#endif /* USE_OPENSSL */
|
||||
|
||||
|
||||
static Connection *
|
||||
connection_create_ssl()
|
||||
{
|
||||
#ifdef USE_OPENSSL
|
||||
return connection_internal_create(sizeof(SSLConnection), &ssl_ops);
|
||||
#else
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("SSL connections are not supported"),
|
||||
errhint("Enable SSL support when compiling the extension.")));
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
|
||||
typedef struct MockConnection
|
||||
{
|
||||
Connection conn;
|
||||
char recv_buf[MOCK_MAX_BUF_SIZE];
|
||||
int recv_buf_offset;
|
||||
int recv_buf_len;
|
||||
} MockConnection;
|
||||
|
||||
static int
|
||||
mock_connect(Connection *conn, const char *host, int port)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
mock_close(Connection *conn)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
mock_write(Connection *conn, const char *buf, size_t writelen)
|
||||
{
|
||||
return writelen;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
mock_read(Connection *conn, char *buf, size_t readlen)
|
||||
{
|
||||
size_t bytes_to_read = 0;
|
||||
size_t max = readlen;
|
||||
MockConnection *mock = (MockConnection *) conn;
|
||||
|
||||
if (mock->recv_buf_offset >= mock->recv_buf_len)
|
||||
return 0;
|
||||
|
||||
if (max >= mock->recv_buf_len - mock->recv_buf_offset)
|
||||
max = mock->recv_buf_len - mock->recv_buf_offset;
|
||||
|
||||
/* Now read a random amount */
|
||||
while (bytes_to_read == 0)
|
||||
{
|
||||
bytes_to_read = rand() % (max + 1);
|
||||
}
|
||||
memcpy(buf, mock->recv_buf + mock->recv_buf_offset, bytes_to_read);
|
||||
mock->recv_buf_offset += bytes_to_read;
|
||||
|
||||
return bytes_to_read;
|
||||
}
|
||||
|
||||
static ConnOps mock_ops = {
|
||||
.connect = mock_connect,
|
||||
.close = mock_close,
|
||||
.write = mock_write,
|
||||
.read = mock_read,
|
||||
};
|
||||
|
||||
static Connection *
|
||||
connection_create_mock()
|
||||
{
|
||||
srand(time(0));
|
||||
return connection_internal_create(sizeof(MockConnection), &mock_ops);
|
||||
}
|
||||
|
||||
/* Public API */
|
||||
|
||||
ssize_t
|
||||
connection_mock_set_recv_buf(Connection *conn, char *buf, size_t buf_len)
|
||||
{
|
||||
if (buf_len > MOCK_MAX_BUF_SIZE)
|
||||
return -1;
|
||||
MockConnection *mock = (MockConnection *) conn;
|
||||
|
||||
memcpy(mock->recv_buf, buf, buf_len);
|
||||
mock->recv_buf_len = buf_len;
|
||||
return mock->recv_buf_len;
|
||||
}
|
||||
#endif
|
||||
|
||||
Connection *
|
||||
connection_create(ConnectionType type)
|
||||
{
|
||||
Connection *ret = NULL;
|
||||
Connection *conn;
|
||||
|
||||
switch (type)
|
||||
{
|
||||
case CONNECTION_PLAIN:
|
||||
ret = connection_create_plain();
|
||||
break;
|
||||
case CONNECTION_SSL:
|
||||
ret = connection_create_ssl();
|
||||
break;
|
||||
#ifdef DEBUG
|
||||
case CONNECTION_MOCK:
|
||||
ret = connection_create_mock();
|
||||
break;
|
||||
#endif
|
||||
}
|
||||
if (ret == NULL)
|
||||
return NULL;
|
||||
ret->type = type;
|
||||
if (type == _CONNECTION_MAX)
|
||||
elog(ERROR, "invalid connection type");
|
||||
|
||||
return ret;
|
||||
if (NULL == conn_ops[type])
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("%s connections are not supported", conn_names[type]),
|
||||
errhint("Enable %s support when compiling the extension.", conn_names[type])));
|
||||
|
||||
conn = connection_internal_create(type, conn_ops[type]);
|
||||
|
||||
Assert(NULL != conn);
|
||||
|
||||
if (NULL != conn->ops->init)
|
||||
if (conn->ops->init(conn) < 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
errmsg("%s connection could not be initialized",
|
||||
conn_names[type])));
|
||||
|
||||
return conn;
|
||||
}
|
||||
|
||||
int
|
||||
@ -375,7 +74,7 @@ connection_write(Connection *conn, const char *buf, size_t writelen)
|
||||
bytes = conn->ops->write(conn, buf, writelen);
|
||||
|
||||
if (bytes <= 0 || bytes != writelen)
|
||||
elog(ERROR, "connection library: could not write");
|
||||
elog(ERROR, "could not write");
|
||||
|
||||
return bytes;
|
||||
}
|
||||
@ -401,23 +100,14 @@ connection_destroy(Connection *conn)
|
||||
|
||||
connection_close(conn);
|
||||
conn->ops = NULL;
|
||||
free(conn);
|
||||
pfree(conn);
|
||||
}
|
||||
|
||||
void
|
||||
_connection_init(void)
|
||||
connection_register(ConnectionType type, ConnOps *ops)
|
||||
{
|
||||
#ifdef USE_OPENSSL
|
||||
SSL_library_init();
|
||||
/* Always returns 1 */
|
||||
SSL_load_error_strings();
|
||||
#endif
|
||||
}
|
||||
if (type == _CONNECTION_MAX)
|
||||
elog(ERROR, "invalid connection type");
|
||||
|
||||
void
|
||||
_connection_fini(void)
|
||||
{
|
||||
#ifdef USE_OPENSSL
|
||||
ERR_free_strings();
|
||||
#endif
|
||||
conn_ops[type] = ops;
|
||||
}
|
||||
|
@ -2,20 +2,14 @@
|
||||
#include <stdlib.h>
|
||||
#include <pg_config.h>
|
||||
|
||||
#ifdef USE_OPENSSL
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
#endif
|
||||
|
||||
typedef struct ConnOps ConnOps;
|
||||
|
||||
typedef enum ConnectionType
|
||||
{
|
||||
CONNECTION_PLAIN,
|
||||
CONNECTION_SSL,
|
||||
#ifdef DEBUG
|
||||
CONNECTION_MOCK,
|
||||
#endif
|
||||
_CONNECTION_MAX,
|
||||
} ConnectionType;
|
||||
|
||||
typedef struct Connection
|
||||
@ -32,11 +26,6 @@ extern ssize_t connection_write(Connection *conn, const char *buf, size_t writel
|
||||
extern void connection_close(Connection *conn);
|
||||
extern void connection_destroy(Connection *conn);
|
||||
|
||||
#ifdef DEBUG
|
||||
/* Special functions for a connection mocker */
|
||||
extern ssize_t connection_mock_set_recv_buf(Connection *conn, char *buf, size_t buflen);
|
||||
#endif
|
||||
|
||||
/* Called in init.c */
|
||||
extern void _connection_init(void);
|
||||
extern void _connection_fini(void);
|
||||
|
18
src/net/conn_internal.h
Normal file
18
src/net/conn_internal.h
Normal file
@ -0,0 +1,18 @@
|
||||
#ifndef TIMESCALEDB_CONN_INTERNAL_H
|
||||
#define TIMESCALEDB_CONN_INTERNAL_H
|
||||
|
||||
#include "conn.h"
|
||||
|
||||
typedef struct ConnOps
|
||||
{
|
||||
size_t size; /* Size of the connection object */
|
||||
int (*init) (Connection *conn);
|
||||
int (*connect) (Connection *conn, const char *host, int port);
|
||||
void (*close) (Connection *conn);
|
||||
ssize_t (*write) (Connection *conn, const char *buf, size_t writelen);
|
||||
ssize_t (*read) (Connection *conn, char *buf, size_t readlen);
|
||||
} ConnOps;
|
||||
|
||||
extern void connection_register(ConnectionType type, ConnOps *ops);
|
||||
|
||||
#endif /* TIMESCALEDB_CONN_INTERNAL_H */
|
111
src/net/conn_plain.c
Normal file
111
src/net/conn_plain.c
Normal file
@ -0,0 +1,111 @@
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <postgres.h>
|
||||
#include <pg_config.h>
|
||||
|
||||
#include "conn_internal.h"
|
||||
#include "conn_plain.h"
|
||||
|
||||
#define DEFAULT_TIMEOUT_SEC 3
|
||||
|
||||
/* Create socket and connect */
|
||||
int
|
||||
plain_connect(Connection *conn, const char *host, int port)
|
||||
{
|
||||
struct addrinfo *server_ip;
|
||||
struct sockaddr_in serv_info;
|
||||
struct sockaddr_in *temp;
|
||||
struct timeval timeouts = {
|
||||
.tv_sec = DEFAULT_TIMEOUT_SEC,
|
||||
};
|
||||
int ret;
|
||||
|
||||
ret = socket(AF_INET, SOCK_STREAM, 0);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "connection library: could not create a socket");
|
||||
else
|
||||
conn->sock = ret;
|
||||
|
||||
/*
|
||||
* Set send / recv timeout so that write and read don't block forever. Set
|
||||
* separately so that one of the actions failing doesn't block the other.
|
||||
*/
|
||||
if (setsockopt(conn->sock, SOL_SOCKET, SO_RCVTIMEO, &timeouts, sizeof(struct timeval)) != 0)
|
||||
elog(ERROR, "connection library: could not set recv timeouts on SSL sockets");
|
||||
if (setsockopt(conn->sock, SOL_SOCKET, SO_SNDTIMEO, &timeouts, sizeof(struct timeval)) != 0)
|
||||
elog(ERROR, "connection library: could not set send timeouts on SSL sockets");
|
||||
|
||||
/* Lookup the endpoint ip address */
|
||||
if (getaddrinfo(host, NULL, NULL, &server_ip) < 0 || server_ip == NULL)
|
||||
elog(ERROR, "connection library: could not get IP of endpoint");
|
||||
memset(&serv_info, 0, sizeof(serv_info));
|
||||
serv_info.sin_family = AF_INET;
|
||||
serv_info.sin_port = htons(port);
|
||||
temp = (struct sockaddr_in *) (server_ip->ai_addr);
|
||||
|
||||
memcpy(&serv_info.sin_addr.s_addr, &temp->sin_addr.s_addr, sizeof(serv_info.sin_addr.s_addr));
|
||||
|
||||
freeaddrinfo(server_ip);
|
||||
|
||||
/* connect the socket */
|
||||
ret = connect(conn->sock, (struct sockaddr *) &serv_info, sizeof(serv_info));
|
||||
if (ret < 0)
|
||||
elog(ERROR, "connection library: could not connect to endpoint");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
plain_write(Connection *conn, const char *buf, size_t writelen)
|
||||
{
|
||||
int ret = send(conn->sock, buf, writelen, 0);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "could not send on a socket");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
plain_read(Connection *conn, char *buf, size_t buflen)
|
||||
{
|
||||
int ret = recv(conn->sock, buf, buflen, 0);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "could not read from a socket");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
plain_close(Connection *conn)
|
||||
{
|
||||
close(conn->sock);
|
||||
}
|
||||
|
||||
static ConnOps plain_ops = {
|
||||
.size = sizeof(Connection),
|
||||
.init = NULL,
|
||||
.connect = plain_connect,
|
||||
.close = plain_close,
|
||||
.write = plain_write,
|
||||
.read = plain_read,
|
||||
};
|
||||
|
||||
extern void _conn_plain_init(void);
|
||||
extern void _conn_plain_fini(void);
|
||||
|
||||
void
|
||||
_conn_plain_init(void)
|
||||
{
|
||||
connection_register(CONNECTION_PLAIN, &plain_ops);
|
||||
}
|
||||
|
||||
void
|
||||
_conn_plain_fini(void)
|
||||
{
|
||||
}
|
9
src/net/conn_plain.h
Normal file
9
src/net/conn_plain.h
Normal file
@ -0,0 +1,9 @@
|
||||
#ifndef TIMESCALEDB_CONN_PLAIN_H
|
||||
#define TIMESCALEDB_CONN_PLAIN_H
|
||||
|
||||
typedef struct Connection Connection;
|
||||
|
||||
extern int plain_connect(Connection *conn, const char *host, int port);
|
||||
extern void plain_close(Connection *conn);
|
||||
|
||||
#endif /* TIMESCALEDB_CONN_PLAIN_H */
|
151
src/net/conn_ssl.c
Normal file
151
src/net/conn_ssl.c
Normal file
@ -0,0 +1,151 @@
|
||||
#include <postgres.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <pg_config.h>
|
||||
|
||||
#include <openssl/ssl.h>
|
||||
#include <openssl/err.h>
|
||||
|
||||
#include "conn_internal.h"
|
||||
#include "conn_plain.h"
|
||||
|
||||
typedef struct SSLConnection
|
||||
{
|
||||
Connection conn;
|
||||
SSL_CTX *ssl_ctx;
|
||||
SSL *ssl;
|
||||
} SSLConnection;
|
||||
|
||||
static int
|
||||
ssl_setup(SSLConnection *conn)
|
||||
{
|
||||
int ret;
|
||||
|
||||
conn->ssl_ctx = SSL_CTX_new(SSLv23_method());
|
||||
|
||||
if (NULL == conn->ssl_ctx)
|
||||
elog(ERROR, "could not create SSL context");
|
||||
|
||||
SSL_CTX_set_options(conn->ssl_ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
|
||||
|
||||
/*
|
||||
* Because we have a blocking socket, we don't want to be bothered with
|
||||
* retries.
|
||||
*/
|
||||
SSL_CTX_set_mode(conn->ssl_ctx, SSL_MODE_AUTO_RETRY);
|
||||
|
||||
ERR_clear_error();
|
||||
/* Clear the SSL error before next SSL_ * call */
|
||||
conn->ssl = SSL_new(conn->ssl_ctx);
|
||||
|
||||
if (conn->ssl == NULL)
|
||||
elog(ERROR, "could not create SSL connection");
|
||||
|
||||
ERR_clear_error();
|
||||
|
||||
ret = SSL_set_fd(conn->ssl, conn->conn.sock);
|
||||
if (ret == 0)
|
||||
elog(ERROR, "could not associate socket with SSL connection");
|
||||
|
||||
ret = SSL_connect(conn->ssl);
|
||||
if (ret <= 0)
|
||||
elog(ERROR, "could not make SSL connection");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
ssl_connect(Connection *conn, const char *host, int port)
|
||||
{
|
||||
int ret;
|
||||
|
||||
/* First do the base connection setup */
|
||||
ret = plain_connect(conn, host, port);
|
||||
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
ret = ssl_setup((SSLConnection *) conn);
|
||||
|
||||
if (ret < 0)
|
||||
close(conn->sock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
ssl_write(Connection *conn, const char *buf, size_t writelen)
|
||||
{
|
||||
SSLConnection *sslconn = (SSLConnection *) conn;
|
||||
|
||||
int ret = SSL_write(sslconn->ssl, buf, writelen);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "could not SSL_write");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
ssl_read(Connection *conn, char *buf, size_t buflen)
|
||||
{
|
||||
SSLConnection *sslconn = (SSLConnection *) conn;
|
||||
|
||||
int ret = SSL_read(sslconn->ssl, buf, buflen);
|
||||
|
||||
if (ret < 0)
|
||||
elog(ERROR, "could not SSL_read");
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
ssl_close(Connection *conn)
|
||||
{
|
||||
SSLConnection *sslconn = (SSLConnection *) conn;
|
||||
|
||||
if (sslconn->ssl != NULL)
|
||||
{
|
||||
SSL_free(sslconn->ssl);
|
||||
sslconn->ssl = NULL;
|
||||
}
|
||||
|
||||
if (sslconn->ssl_ctx != NULL)
|
||||
{
|
||||
SSL_CTX_free(sslconn->ssl_ctx);
|
||||
sslconn->ssl_ctx = NULL;
|
||||
}
|
||||
|
||||
plain_close(conn);
|
||||
}
|
||||
|
||||
static ConnOps ssl_ops = {
|
||||
.size = sizeof(SSLConnection),
|
||||
.init = NULL,
|
||||
.connect = ssl_connect,
|
||||
.close = ssl_close,
|
||||
.write = ssl_write,
|
||||
.read = ssl_read,
|
||||
};
|
||||
|
||||
extern void _conn_ssl_init(void);
|
||||
extern void _conn_ssl_fini(void);
|
||||
|
||||
void
|
||||
_conn_ssl_init(void)
|
||||
{
|
||||
SSL_library_init();
|
||||
/* Always returns 1 */
|
||||
SSL_load_error_strings();
|
||||
connection_register(CONNECTION_SSL, &ssl_ops);
|
||||
}
|
||||
|
||||
void
|
||||
_conn_ssl_fini(void)
|
||||
{
|
||||
ERR_free_strings();
|
||||
}
|
@ -1,6 +1,9 @@
|
||||
#ifndef TIMESCALEDB_HTTP_H
|
||||
#define TIMESCALEDB_HTTP_H
|
||||
|
||||
#include <unistd.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
#define HTTP_HOST "Host"
|
||||
#define HTTP_CONTENT_LENGTH "Content-Length"
|
||||
#define HTTP_CONTENT_TYPE "Content-Type"
|
||||
@ -39,11 +42,12 @@ HttpRequest *http_request_create(HttpRequestMethod method);
|
||||
void http_request_destroy(HttpRequest *req);
|
||||
|
||||
/* Assume that uri is null-terminated */
|
||||
void http_request_set_uri(HttpRequest *req, char *uri);
|
||||
void http_request_set_uri(HttpRequest *req, const char *uri);
|
||||
void http_request_set_version(HttpRequest *req, HttpRequestVersion version);
|
||||
|
||||
/* Assume that name and value are null-terminated */
|
||||
void http_request_set_header(HttpRequest *req, char *name, char *value);
|
||||
void http_request_set_body(HttpRequest *req, char *body, int body_len);
|
||||
void http_request_set_header(HttpRequest *req, const char *name, const char *valuue);
|
||||
void http_request_set_body(HttpRequest *req, const char *body, size_t body_len);
|
||||
|
||||
/* Serialize the request into char *dst. Return the length of request in optional size pointer*/
|
||||
const char *http_request_build(HttpRequest *req, size_t *buf_size);
|
||||
@ -60,12 +64,12 @@ void http_response_state_destroy(HttpResponseState *state);
|
||||
bool http_response_state_is_done(HttpResponseState *state);
|
||||
bool http_response_state_valid_status(HttpResponseState *state);
|
||||
char *http_response_state_next_buffer(HttpResponseState *state);
|
||||
int http_response_state_buffer_remaining(HttpResponseState *state);
|
||||
size_t http_response_state_buffer_remaining(HttpResponseState *state);
|
||||
char *http_response_state_body_start(HttpResponseState *state);
|
||||
int http_response_state_content_length(HttpResponseState *state);
|
||||
size_t http_response_state_content_length(HttpResponseState *state);
|
||||
int http_response_state_status_code(HttpResponseState *state);
|
||||
HttpHeader *http_response_state_headers(HttpResponseState *state);
|
||||
|
||||
/* Returns false if encountered an error during parsing */
|
||||
bool http_response_state_parse(HttpResponseState *state, int bytes);
|
||||
bool http_response_state_parse(HttpResponseState *state, size_t bytes);
|
||||
#endif /* TIMESCALEDB_HTTP_H */
|
||||
|
@ -14,11 +14,18 @@
|
||||
#define NEW_LINE '\n'
|
||||
|
||||
/* So that http_response.c can find this function */
|
||||
HttpHeader *http_header_create(char *name, int name_len, char *value, int value_len, HttpHeader *next);
|
||||
HttpHeader *http_header_create(const char *name,
|
||||
size_t name_len,
|
||||
const char *value,
|
||||
size_t value_len,
|
||||
HttpHeader *next);
|
||||
|
||||
HttpHeader *
|
||||
http_header_create(char *name, int name_len, char *value, int value_len, HttpHeader
|
||||
*next)
|
||||
http_header_create(const char *name,
|
||||
size_t name_len,
|
||||
const char *value,
|
||||
size_t value_len,
|
||||
HttpHeader *next)
|
||||
{
|
||||
HttpHeader *new_header = palloc(sizeof(HttpHeader));
|
||||
|
||||
@ -43,11 +50,11 @@ typedef struct HttpRequest
|
||||
{
|
||||
HttpRequestMethod method;
|
||||
char *uri;
|
||||
int uri_len;
|
||||
size_t uri_len;
|
||||
HttpRequestVersion version;
|
||||
HttpHeader *headers;
|
||||
char *body;
|
||||
int body_len;
|
||||
size_t body_len;
|
||||
MemoryContext context;
|
||||
} HttpRequest;
|
||||
|
||||
@ -73,8 +80,9 @@ http_request_init(HttpRequest *req, HttpRequestMethod method)
|
||||
HttpRequest *
|
||||
http_request_create(HttpRequestMethod method)
|
||||
{
|
||||
MemoryContext request_context = AllocSetContextCreate(
|
||||
CurrentMemoryContext, "Http Request", ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext request_context = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Http Request",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext old = MemoryContextSwitchTo(request_context);
|
||||
|
||||
HttpRequest *req = palloc(sizeof(HttpRequest));
|
||||
@ -95,10 +103,10 @@ http_request_destroy(HttpRequest *req)
|
||||
}
|
||||
|
||||
void
|
||||
http_request_set_uri(HttpRequest *req, char *uri)
|
||||
http_request_set_uri(HttpRequest *req, const char *uri)
|
||||
{
|
||||
MemoryContext old = MemoryContextSwitchTo(req->context);
|
||||
int uri_len = strlen(uri);
|
||||
int uri_len = strlen(uri);
|
||||
|
||||
req->uri = palloc(uri_len + 1);
|
||||
memcpy(req->uri, uri, uri_len);
|
||||
@ -114,20 +122,20 @@ http_request_set_version(HttpRequest *req, HttpRequestVersion version)
|
||||
}
|
||||
|
||||
void
|
||||
http_request_set_header(HttpRequest *req, char *name, char *value)
|
||||
http_request_set_header(HttpRequest *req, const char *name, const char *value)
|
||||
{
|
||||
MemoryContext old = MemoryContextSwitchTo(req->context);
|
||||
int name_len = strlen(name);
|
||||
int value_len = strlen(value);
|
||||
int name_len = strlen(name);
|
||||
int value_len = strlen(value);
|
||||
HttpHeader *new_header =
|
||||
http_header_create(name, name_len, value, value_len, req->headers);
|
||||
http_header_create(name, name_len, value, value_len, req->headers);
|
||||
|
||||
req->headers = new_header;
|
||||
MemoryContextSwitchTo(old);
|
||||
}
|
||||
|
||||
void
|
||||
http_request_set_body(HttpRequest *req, char *body, int body_len)
|
||||
http_request_set_body(HttpRequest *req, const char *body, size_t body_len)
|
||||
{
|
||||
MemoryContext old = MemoryContextSwitchTo(req->context);
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdbool.h>
|
||||
#include <string.h>
|
||||
#include <postgres.h>
|
||||
#include <lib/stringinfo.h>
|
||||
@ -11,14 +10,15 @@
|
||||
#define CARRIAGE_RETURN '\r'
|
||||
#define NEW_LINE '\n'
|
||||
#define SEP_CHAR ':'
|
||||
#define HTTP_VERSION_BUFFER_SIZE 128
|
||||
|
||||
extern HttpHeader *http_header_create(char *name, int name_len, char *value, int value_len, HttpHeader *next);
|
||||
extern HttpHeader *http_header_create(const char *name, size_t name_len, const char *value, size_t value_len, HttpHeader *next);
|
||||
|
||||
typedef enum HttpParseState
|
||||
{
|
||||
HTTP_STATE_STATUS,
|
||||
HTTP_STATE_INTERM, //received a single \ r
|
||||
HTTP_STATE_HEADER_NAME, //received \ r \ n
|
||||
HTTP_STATE_INTERM, /* received a single \r */
|
||||
HTTP_STATE_HEADER_NAME, /* received \r\n */
|
||||
HTTP_STATE_HEADER_VALUE,
|
||||
HTTP_STATE_ALMOST_DONE,
|
||||
HTTP_STATE_BODY,
|
||||
@ -29,17 +29,18 @@ typedef enum HttpParseState
|
||||
typedef struct HttpResponseState
|
||||
{
|
||||
MemoryContext context;
|
||||
char version[HTTP_VERSION_BUFFER_SIZE];
|
||||
char raw_buffer[MAX_RAW_BUFFER_SIZE];
|
||||
/* The next read should copy data into the buffer starting here */
|
||||
int offset;
|
||||
int parse_offset;
|
||||
int cur_header_name_len;
|
||||
int cur_header_value_len;
|
||||
off_t offset;
|
||||
off_t parse_offset;
|
||||
size_t cur_header_name_len;
|
||||
size_t cur_header_value_len;
|
||||
char *cur_header_name;
|
||||
char *cur_header_value;
|
||||
HttpHeader *headers;
|
||||
int status_code;
|
||||
int content_length;
|
||||
size_t content_length;
|
||||
char *body_start;
|
||||
HttpParseState state;
|
||||
} HttpResponseState;
|
||||
@ -54,8 +55,9 @@ http_response_state_init(HttpResponseState *state)
|
||||
HttpResponseState *
|
||||
http_response_state_create()
|
||||
{
|
||||
MemoryContext context = AllocSetContextCreate(
|
||||
CurrentMemoryContext, "Http Response", ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext context = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Http Response",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext old = MemoryContextSwitchTo(context);
|
||||
HttpResponseState *ret = palloc(sizeof(HttpResponseState));
|
||||
|
||||
@ -75,7 +77,7 @@ http_response_state_destroy(HttpResponseState *state)
|
||||
|
||||
bool
|
||||
http_response_state_valid_status(HttpResponseState *state)
|
||||
{
|
||||
{
|
||||
/* If the status code hasn't been parsed yet, return */
|
||||
if (state->status_code == -1)
|
||||
return true;
|
||||
@ -91,15 +93,26 @@ http_response_state_is_done(HttpResponseState *state)
|
||||
return (state->state == HTTP_STATE_DONE);
|
||||
}
|
||||
|
||||
int
|
||||
size_t
|
||||
http_response_state_buffer_remaining(HttpResponseState *state)
|
||||
{
|
||||
Assert(state->offset <= MAX_RAW_BUFFER_SIZE);
|
||||
|
||||
if (state->offset > MAX_RAW_BUFFER_SIZE)
|
||||
elog(ERROR, "invalid buffer state in HTTP parser");
|
||||
|
||||
return MAX_RAW_BUFFER_SIZE - state->offset;
|
||||
}
|
||||
|
||||
|
||||
char *
|
||||
http_response_state_next_buffer(HttpResponseState *state)
|
||||
{
|
||||
Assert(state->offset <= MAX_RAW_BUFFER_SIZE);
|
||||
|
||||
if (state->offset > MAX_RAW_BUFFER_SIZE)
|
||||
elog(ERROR, "invalid buffer state in HTTP parser");
|
||||
|
||||
return state->raw_buffer + state->offset;
|
||||
}
|
||||
|
||||
@ -115,7 +128,7 @@ http_response_state_status_code(HttpResponseState *state)
|
||||
return state->status_code;
|
||||
}
|
||||
|
||||
int
|
||||
size_t
|
||||
http_response_state_content_length(HttpResponseState *state)
|
||||
{
|
||||
return state->content_length;
|
||||
@ -127,17 +140,27 @@ http_response_state_headers(HttpResponseState *state)
|
||||
return state->headers;
|
||||
}
|
||||
|
||||
static void
|
||||
http_parse_status(HttpResponseState *state, char next)
|
||||
static bool
|
||||
http_parse_version(HttpResponseState *state)
|
||||
{
|
||||
float version;
|
||||
|
||||
if (sscanf(state->version, "HTTP/%f", &version) == 1)
|
||||
if (version == 1.0f || version == 1.1f)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void
|
||||
http_parse_status(HttpResponseState *state, const char next)
|
||||
{
|
||||
char version[128];
|
||||
char message[128];
|
||||
int temp_status;
|
||||
char raw_buf[state->parse_offset + 1];
|
||||
|
||||
switch (next)
|
||||
{
|
||||
case CARRIAGE_RETURN:
|
||||
|
||||
/*
|
||||
* Then we are at the end of status and can use sscanf
|
||||
*
|
||||
@ -146,14 +169,15 @@ http_parse_status(HttpResponseState *state, char next)
|
||||
*/
|
||||
memcpy(raw_buf, state->raw_buffer, state->parse_offset);
|
||||
raw_buf[state->parse_offset] = '\0';
|
||||
if (sscanf(raw_buf, "%s %d %s", version, &temp_status, message) == 3)
|
||||
state->state = HTTP_STATE_ERROR;
|
||||
memset(state->version, '\0', sizeof(state->version));
|
||||
|
||||
if (sscanf(raw_buf, "%127s %d %*s", state->version, &state->status_code) == 2)
|
||||
{
|
||||
state->status_code = temp_status;
|
||||
state->state = HTTP_STATE_INTERM;
|
||||
}
|
||||
else
|
||||
{
|
||||
state->state = HTTP_STATE_ERROR;
|
||||
if (http_parse_version(state))
|
||||
state->state = HTTP_STATE_INTERM;
|
||||
else
|
||||
state->state = HTTP_STATE_ERROR;
|
||||
}
|
||||
break;
|
||||
case NEW_LINE:
|
||||
@ -166,8 +190,11 @@ http_parse_status(HttpResponseState *state, char next)
|
||||
}
|
||||
|
||||
static void
|
||||
http_response_state_add_header(HttpResponseState *state, char *name, int name_len, char
|
||||
*value, int value_len)
|
||||
http_response_state_add_header(HttpResponseState *state,
|
||||
const char *name,
|
||||
size_t name_len,
|
||||
const char *value,
|
||||
size_t value_len)
|
||||
{
|
||||
MemoryContext old = MemoryContextSwitchTo(state->context);
|
||||
HttpHeader *new_header = http_header_create(name, name_len, value, value_len,
|
||||
@ -179,7 +206,7 @@ http_response_state_add_header(HttpResponseState *state, char *name, int name_le
|
||||
|
||||
static
|
||||
void
|
||||
http_parse_interm(HttpResponseState *state, char next)
|
||||
http_parse_interm(HttpResponseState *state, const char next)
|
||||
{
|
||||
int temp_length;
|
||||
|
||||
@ -216,7 +243,7 @@ http_parse_interm(HttpResponseState *state, char next)
|
||||
}
|
||||
|
||||
static void
|
||||
http_parse_header_name(HttpResponseState *state, char next)
|
||||
http_parse_header_name(HttpResponseState *state, const char next)
|
||||
{
|
||||
switch (next)
|
||||
{
|
||||
@ -255,7 +282,7 @@ http_parse_header_name(HttpResponseState *state, char next)
|
||||
|
||||
/* We do not customize to header_name. Assume all non \r or \n chars are allowed. */
|
||||
static void
|
||||
http_parse_header_value(HttpResponseState *state, char next)
|
||||
http_parse_header_value(HttpResponseState *state, const char next)
|
||||
{
|
||||
/* Allow everything except... \r, \n */
|
||||
switch (next)
|
||||
@ -264,7 +291,7 @@ http_parse_header_value(HttpResponseState *state, char next)
|
||||
state->state = HTTP_STATE_INTERM;
|
||||
break;
|
||||
case NEW_LINE:
|
||||
// \n is not allowed
|
||||
/* \n is not allowed */
|
||||
state->state = HTTP_STATE_ERROR;
|
||||
break;
|
||||
default:
|
||||
@ -274,7 +301,7 @@ http_parse_header_value(HttpResponseState *state, char next)
|
||||
}
|
||||
|
||||
static void
|
||||
http_parse_almost_done(HttpResponseState *state, char next)
|
||||
http_parse_almost_done(HttpResponseState *state, const char next)
|
||||
{
|
||||
/* Don't do anything, this is intermediate state */
|
||||
switch (next)
|
||||
@ -293,15 +320,18 @@ http_parse_almost_done(HttpResponseState *state, char next)
|
||||
}
|
||||
|
||||
bool
|
||||
http_response_state_parse(HttpResponseState *state, int bytes)
|
||||
http_response_state_parse(HttpResponseState *state, size_t bytes)
|
||||
{
|
||||
state->offset += bytes;
|
||||
|
||||
if (state->offset > MAX_RAW_BUFFER_SIZE)
|
||||
state->offset = MAX_RAW_BUFFER_SIZE;
|
||||
|
||||
/* Each state function will do the state AND transition */
|
||||
while (state->parse_offset < state->offset)
|
||||
{
|
||||
char next = state->raw_buffer[state->parse_offset];
|
||||
|
||||
switch (state->state)
|
||||
{
|
||||
case HTTP_STATE_STATUS:
|
||||
@ -339,9 +369,6 @@ http_response_state_parse(HttpResponseState *state, int bytes)
|
||||
return false;
|
||||
case HTTP_STATE_DONE:
|
||||
return true;
|
||||
default:
|
||||
elog(LOG, "unknown HttpParseState enum");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
@ -59,14 +59,13 @@ set(TEST_FILES
|
||||
|
||||
file(STRINGS ${PG_INCLUDEDIR}/pg_config.h USE_OPENSSL REGEX "define USE_OPENSSL ")
|
||||
|
||||
|
||||
IF(CMAKE_BUILD_TYPE MATCHES Debug)
|
||||
list(APPEND TEST_FILES
|
||||
bgw_launcher.sql
|
||||
bgw_db_scheduler.sql
|
||||
loader.sql
|
||||
symbol_conflict.sql
|
||||
test_net.sql)
|
||||
net.sql)
|
||||
ENDIF(CMAKE_BUILD_TYPE MATCHES Debug)
|
||||
|
||||
IF (${PG_VERSION_MAJOR} GREATER "9")
|
||||
|
@ -1,5 +1,11 @@
|
||||
set(SOURCES
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/test_conn.c"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/test_http.c"
|
||||
)
|
||||
)
|
||||
|
||||
if (CMAKE_BUILD_TYPE MATCHES Debug)
|
||||
list(APPEND SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/conn_mock.c")
|
||||
endif (CMAKE_BUILD_TYPE MATCHES Debug)
|
||||
|
||||
target_sources(${PROJECT_NAME} PRIVATE ${SOURCES})
|
||||
target_include_directories(${PROJECT_NAME} PRIVATE ${PROJECT_SOURCE_DIR}/src/net)
|
||||
|
106
test/src/net/conn_mock.c
Normal file
106
test/src/net/conn_mock.c
Normal file
@ -0,0 +1,106 @@
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include <postgres.h>
|
||||
#include <pg_config.h>
|
||||
|
||||
#include "conn_internal.h"
|
||||
#include "conn_mock.h"
|
||||
|
||||
#define MOCK_MAX_BUF_SIZE 1024
|
||||
|
||||
typedef struct MockConnection
|
||||
{
|
||||
Connection conn;
|
||||
char recv_buf[MOCK_MAX_BUF_SIZE];
|
||||
int recv_buf_offset;
|
||||
int recv_buf_len;
|
||||
} MockConnection;
|
||||
|
||||
static int
|
||||
mock_connect(Connection *conn, const char *host, int port)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
mock_close(Connection *conn)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
mock_write(Connection *conn, const char *buf, size_t writelen)
|
||||
{
|
||||
return writelen;
|
||||
}
|
||||
|
||||
static ssize_t
|
||||
mock_read(Connection *conn, char *buf, size_t readlen)
|
||||
{
|
||||
size_t bytes_to_read = 0;
|
||||
size_t max = readlen;
|
||||
MockConnection *mock = (MockConnection *) conn;
|
||||
|
||||
if (mock->recv_buf_offset >= mock->recv_buf_len)
|
||||
return 0;
|
||||
|
||||
if (max >= mock->recv_buf_len - mock->recv_buf_offset)
|
||||
max = mock->recv_buf_len - mock->recv_buf_offset;
|
||||
|
||||
/* Now read a random amount */
|
||||
while (bytes_to_read == 0)
|
||||
{
|
||||
bytes_to_read = rand() % (max + 1);
|
||||
}
|
||||
memcpy(buf, mock->recv_buf + mock->recv_buf_offset, bytes_to_read);
|
||||
mock->recv_buf_offset += bytes_to_read;
|
||||
|
||||
return bytes_to_read;
|
||||
}
|
||||
|
||||
static int
|
||||
mock_init(Connection *conn)
|
||||
{
|
||||
srand(time(0));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static ConnOps mock_ops = {
|
||||
.size = sizeof(MockConnection),
|
||||
.init = mock_init,
|
||||
.connect = mock_connect,
|
||||
.close = mock_close,
|
||||
.write = mock_write,
|
||||
.read = mock_read,
|
||||
};
|
||||
|
||||
ssize_t
|
||||
connection_mock_set_recv_buf(Connection *conn, char *buf, size_t buf_len)
|
||||
{
|
||||
if (buf_len > MOCK_MAX_BUF_SIZE)
|
||||
return -1;
|
||||
MockConnection *mock = (MockConnection *) conn;
|
||||
|
||||
memcpy(mock->recv_buf, buf, buf_len);
|
||||
mock->recv_buf_len = buf_len;
|
||||
return mock->recv_buf_len;
|
||||
}
|
||||
|
||||
extern void _conn_mock_init(void);
|
||||
extern void _conn_mock_fini(void);
|
||||
|
||||
void
|
||||
_conn_mock_init(void)
|
||||
{
|
||||
connection_register(CONNECTION_MOCK, &mock_ops);
|
||||
}
|
||||
|
||||
void
|
||||
_conn_mock_fini(void)
|
||||
{
|
||||
}
|
10
test/src/net/conn_mock.h
Normal file
10
test/src/net/conn_mock.h
Normal file
@ -0,0 +1,10 @@
|
||||
#ifndef TIMESCALEDB_CONN_MOCK_H
|
||||
#define TIMESCALEDB_CONN_MOCK_H
|
||||
|
||||
#include <sys/socket.h>
|
||||
|
||||
typedef struct Connection Connection;
|
||||
|
||||
extern ssize_t connection_mock_set_recv_buf(Connection *conn, char *buf, size_t buflen);
|
||||
|
||||
#endif /* TIMESCALEDB_CONN_MOCK_H */
|
@ -33,14 +33,14 @@ test_conn(PG_FUNCTION_ARGS)
|
||||
/* connectivity on the server running this test? */
|
||||
Assert(connection_connect(conn, host, port) >= 0);
|
||||
|
||||
//should timeout
|
||||
/* should timeout */
|
||||
PG_TRY();
|
||||
{
|
||||
connection_read(conn, response, 1);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
should_fail = true;
|
||||
should_fail = true;
|
||||
}
|
||||
PG_END_TRY();
|
||||
Assert(should_fail);
|
||||
@ -59,11 +59,11 @@ test_conn(PG_FUNCTION_ARGS)
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
should_fail = true;
|
||||
should_fail = true;
|
||||
}
|
||||
PG_END_TRY();
|
||||
Assert(should_fail);
|
||||
|
||||
|
||||
connection_close(conn);
|
||||
connection_destroy(conn);
|
||||
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include "compat.h"
|
||||
#include "net/http.h"
|
||||
|
||||
#define MAX_REQUEST_SIZE 4096
|
||||
#define MAX_REQUEST_SIZE 4096
|
||||
|
||||
/* Tests for auxiliary HttpResponseState functions in http_parsing.h */
|
||||
|
||||
@ -69,35 +69,40 @@ num_test_strings()
|
||||
return sizeof(TEST_LENGTHS) / sizeof(int);
|
||||
}
|
||||
|
||||
// Check we can succesfully parse partial by well-formed HTTP responses
|
||||
/* Check we can succesfully parse partial by well-formed HTTP responses */
|
||||
Datum
|
||||
test_http_parsing(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int num_iterations = PG_GETARG_INT32(0);
|
||||
int bytes;
|
||||
int bytes,
|
||||
i,
|
||||
j;
|
||||
|
||||
srand(time(0));
|
||||
|
||||
for (int j = 0; j < num_iterations; j++)
|
||||
for (j = 0; j < num_iterations; j++)
|
||||
{
|
||||
for (int i = 0; i < num_test_strings(); i++)
|
||||
for (i = 0; i < num_test_strings(); i++)
|
||||
{
|
||||
bytes = rand() % (strlen(TEST_RESPONSES[i]) + 1);
|
||||
HttpResponseState *state = http_response_state_create();
|
||||
bool success;
|
||||
|
||||
// Copy part of the message into the parsing state
|
||||
bytes = rand() % (strlen(TEST_RESPONSES[i]) + 1);
|
||||
|
||||
/* Copy part of the message into the parsing state */
|
||||
memcpy(http_response_state_next_buffer(state), TEST_RESPONSES[i], bytes);
|
||||
// Now do the parse
|
||||
Assert(http_response_state_parse(state, bytes));
|
||||
|
||||
if (bytes < strlen(TEST_RESPONSES[i]))
|
||||
Assert(!http_response_state_is_done(state));
|
||||
else
|
||||
Assert(http_response_state_is_done(state));
|
||||
/* Now do the parse */
|
||||
success = http_response_state_parse(state, bytes);
|
||||
|
||||
Assert(success);
|
||||
|
||||
success = http_response_state_is_done(state);
|
||||
|
||||
Assert(bytes < strlen(TEST_RESPONSES[i]) ? !success : success);
|
||||
|
||||
http_response_state_destroy(state);
|
||||
}
|
||||
|
||||
}
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
@ -108,16 +113,19 @@ test_http_parsing(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
test_http_parsing_full(PG_FUNCTION_ARGS)
|
||||
{
|
||||
srand(time(0));
|
||||
int bytes;
|
||||
int bytes,
|
||||
i;
|
||||
|
||||
for (int i = 0; i < num_test_strings(); i++)
|
||||
srand(time(0));
|
||||
|
||||
for (i = 0; i < num_test_strings(); i++)
|
||||
{
|
||||
HttpResponseState *state = http_response_state_create();
|
||||
|
||||
bytes = strlen(TEST_RESPONSES[i]);
|
||||
// Copy all of the message into the parsing state
|
||||
/* Copy all of the message into the parsing state */
|
||||
memcpy(http_response_state_next_buffer(state), TEST_RESPONSES[i], bytes);
|
||||
// Now do the parse
|
||||
/* Now do the parse */
|
||||
Assert(http_response_state_parse(state, bytes));
|
||||
|
||||
Assert(http_response_state_is_done(state));
|
||||
@ -128,15 +136,16 @@ test_http_parsing_full(PG_FUNCTION_ARGS)
|
||||
http_response_state_destroy(state);
|
||||
}
|
||||
|
||||
// Now do the bad responses
|
||||
for (int i = 0; i < 3; i++)
|
||||
/* Now do the bad responses */
|
||||
for (i = 0; i < 3; i++)
|
||||
{
|
||||
HttpResponseState *state = http_response_state_create();
|
||||
|
||||
bytes = strlen(BAD_RESPONSES[i]);
|
||||
memcpy(http_response_state_next_buffer(state), BAD_RESPONSES[i], bytes);
|
||||
|
||||
Assert(!http_response_state_parse(state, bytes) ||
|
||||
!http_response_state_valid_status(state));
|
||||
!http_response_state_valid_status(state));
|
||||
|
||||
http_response_state_destroy(state);
|
||||
}
|
||||
@ -144,19 +153,20 @@ test_http_parsing_full(PG_FUNCTION_ARGS)
|
||||
}
|
||||
|
||||
Datum
|
||||
test_http_request_build(PG_FUNCTION_ARGS) {
|
||||
test_http_request_build(PG_FUNCTION_ARGS)
|
||||
{
|
||||
const char *serialized;
|
||||
size_t request_len;
|
||||
size_t request_len;
|
||||
const char *expected_response = "GET /v1/alerts HTTP/1.1\r\n"
|
||||
"Host: herp.com\r\nContent-Length: 0\r\n\r\n";
|
||||
char *host = "herp.com";
|
||||
"Host: herp.com\r\nContent-Length: 0\r\n\r\n";
|
||||
char *host = "herp.com";
|
||||
HttpRequest *req = http_request_create(HTTP_GET);
|
||||
|
||||
http_request_set_uri(req, "/v1/alerts");
|
||||
http_request_set_version(req, HTTP_11);
|
||||
http_request_set_header(req, HTTP_CONTENT_LENGTH, "0");
|
||||
http_request_set_header(req, HTTP_HOST, host);
|
||||
|
||||
|
||||
serialized = http_request_build(req, &request_len);
|
||||
|
||||
Assert(!strncmp(expected_response, serialized, request_len));
|
||||
@ -174,8 +184,8 @@ test_http_request_build(PG_FUNCTION_ARGS) {
|
||||
|
||||
serialized = http_request_build(req, &request_len);
|
||||
|
||||
Assert(!strncmp(expected_response, serialized, request_len));
|
||||
http_request_destroy(req);
|
||||
Assert(!strncmp(expected_response, serialized, request_len));
|
||||
http_request_destroy(req);
|
||||
|
||||
expected_response = "POST /tmp/status/1234 HTTP/1.1\r\n"
|
||||
"Content-Length: 0\r\nHost: herp.com\r\n\r\n";
|
||||
@ -188,10 +198,10 @@ test_http_request_build(PG_FUNCTION_ARGS) {
|
||||
|
||||
serialized = http_request_build(req, &request_len);
|
||||
|
||||
Assert(!strncmp(expected_response, serialized, request_len));
|
||||
Assert(!strncmp(expected_response, serialized, request_len));
|
||||
http_request_destroy(req);
|
||||
|
||||
// Check that content-length checking works
|
||||
/* Check that content-length checking works */
|
||||
req = http_request_create(HTTP_POST);
|
||||
http_request_set_uri(req, "/tmp/status/1234");
|
||||
http_request_set_version(req, HTTP_11);
|
||||
@ -200,6 +210,6 @@ test_http_request_build(PG_FUNCTION_ARGS) {
|
||||
|
||||
Assert(!http_request_build(req, &request_len));
|
||||
http_request_destroy(req);
|
||||
|
||||
|
||||
PG_RETURN_NULL();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user