One Level Up
Top Level
src/client.c - weighttp
Functions defined
Source code
- #include "weighttp.h"
- static uint8_t client_parse(Client *client, int size); /* parses response data held in client->buffer; returns 0 on a parse error, 1 otherwise */
- static void client_io_cb(struct ev_loop *loop, ev_io *w, int revents); /* libev I/O callback; forwards to client_state_machine() */
- static void client_set_events(Client *client, int events); /* switches the socket watcher to the given EV_READ/EV_WRITE mask */
- static void client_set_events(Client *client, int events) {
- struct ev_loop *loop = client->worker->loop;
- ev_io *watcher = &client->sock_watcher;
- if (events == (watcher->events & (EV_READ | EV_WRITE)))
- return;
- ev_io_stop(loop, watcher);
- ev_io_set(watcher, watcher->fd, (watcher->events & ~(EV_READ | EV_WRITE)) | events);
- ev_io_start(loop, watcher);
- }
- Client *client_new(Worker *worker) {
- Client *client;
- client = W_MALLOC(Client, 1);
- client->state = CLIENT_START;
- client->worker = worker;
- client->sock_watcher.fd = -1;
- client->sock_watcher.data = client;
- client->content_length = -1;
- client->buffer_offset = 0;
- client->request_offset = 0;
- client->keepalive = client->worker->config->keep_alive;
- client->chunked = 0;
- client->chunk_size = -1;
- client->chunk_received = 0;
- return client;
- }
- void client_free(Client *client) {
- if (client->sock_watcher.fd != -1) {
- ev_io_stop(client->worker->loop, &client->sock_watcher);
- shutdown(client->sock_watcher.fd, SHUT_WR);
- close(client->sock_watcher.fd);
- }
- free(client);
- }
- static void client_reset(Client *client) {
-
- if (!client->keepalive) {
- if (client->sock_watcher.fd != -1) {
- ev_io_stop(client->worker->loop, &client->sock_watcher);
- shutdown(client->sock_watcher.fd, SHUT_WR);
- close(client->sock_watcher.fd);
- client->sock_watcher.fd = -1;
- }
- client->state = CLIENT_START;
- } else {
- client_set_events(client, EV_WRITE);
- client->state = CLIENT_WRITING;
- client->worker->stats.req_started++;
- }
- client->parser_state = PARSER_START;
- client->buffer_offset = 0;
- client->parser_offset = 0;
- client->request_offset = 0;
- client->ts_start = 0;
- client->ts_end = 0;
- client->status_success = 0;
- client->success = 0;
- client->content_length = -1;
- client->bytes_received = 0;
- client->header_size = 0;
- client->keepalive = client->worker->config->keep_alive;
- client->chunked = 0;
- client->chunk_size = -1;
- client->chunk_received = 0;
- }
- static uint8_t client_connect(Client *client) {
-
- start:
- if (-1 == connect(client->sock_watcher.fd, client->worker->config->saddr->ai_addr, client->worker->config->saddr->ai_addrlen)) {
- switch (errno) {
- case EINPROGRESS:
- case EALREADY:
-
- client->state = CLIENT_CONNECTING;
- return 1;
- case EISCONN:
- break;
- case EINTR:
- goto start;
- default:
- {
- strerror_r(errno, client->buffer, sizeof(client->buffer));
- W_ERROR("connect() failed: %s (%d)", client->buffer, errno);
- return 0;
- }
- }
- }
-
- client->state = CLIENT_WRITING;
- return 1;
- }
- static void client_io_cb(struct ev_loop *loop, ev_io *w, int revents) {
- Client *client = w->data;
- UNUSED(loop);
- UNUSED(revents);
- client_state_machine(client);
- }
- void client_state_machine(Client *client) {
- int r;
- Config *config = client->worker->config;
- start:
-
- switch (client->state) {
- case CLIENT_START:
- client->worker->stats.req_started++;
- do {
- r = socket(config->saddr->ai_family, config->saddr->ai_socktype, config->saddr->ai_protocol);
- } while (-1 == r && errno == EINTR);
- if (-1 == r) {
- client->state = CLIENT_ERROR;
- strerror_r(errno, client->buffer, sizeof(client->buffer));
- W_ERROR("socket() failed: %s (%d)", client->buffer, errno);
- goto start;
- }
-
- fcntl(r, F_SETFL, O_NONBLOCK | O_RDWR);
- ev_init(&client->sock_watcher, client_io_cb);
- ev_io_set(&client->sock_watcher, r, EV_WRITE);
- ev_io_start(client->worker->loop, &client->sock_watcher);
- if (!client_connect(client)) {
- client->state = CLIENT_ERROR;
- goto start;
- } else {
- client_set_events(client, EV_WRITE);
- return;
- }
- case CLIENT_CONNECTING:
- if (!client_connect(client)) {
- client->state = CLIENT_ERROR;
- goto start;
- }
- case CLIENT_WRITING:
- while (1) {
- r = write(client->sock_watcher.fd, &config->request[client->request_offset], config->request_size - client->request_offset);
-
- if (r == -1) {
-
- if (errno == EINTR)
- continue;
- strerror_r(errno, client->buffer, sizeof(client->buffer));
- W_ERROR("write() failed: %s (%d)", client->buffer, errno);
- client->state = CLIENT_ERROR;
- goto start;
- } else if (r != 0) {
-
- client->request_offset += r;
- if (client->request_offset == config->request_size) {
-
- client->state = CLIENT_READING;
- client_set_events(client, EV_READ);
- }
- return;
- } else {
-
- client->state = CLIENT_END;
- goto start;
- }
- }
- case CLIENT_READING:
- while (1) {
- r = read(client->sock_watcher.fd, &client->buffer[client->buffer_offset], sizeof(client->buffer) - client->buffer_offset - 1);
-
- if (r == -1) {
-
- if (errno == EINTR)
- continue;
- strerror_r(errno, client->buffer, sizeof(client->buffer));
- W_ERROR("read() failed: %s (%d)", client->buffer, errno);
- client->state = CLIENT_ERROR;
- } else if (r != 0) {
-
- client->bytes_received += r;
- client->buffer_offset += r;
- client->worker->stats.bytes_total += r;
- if (client->buffer_offset >= sizeof(client->buffer)) {
-
- client->state = CLIENT_ERROR;
- break;
- }
- client->buffer[client->buffer_offset] = '\0';
-
- if (!client_parse(client, r)) {
- client->state = CLIENT_ERROR;
-
- break;
- } else {
- if (client->state == CLIENT_END)
- goto start;
- else
- return;
- }
- } else {
-
- if (client->parser_state == PARSER_BODY && !client->keepalive && client->status_success
- && !client->chunked && client->content_length == -1) {
- client->success = 1;
- client->state = CLIENT_END;
- } else {
- client->state = CLIENT_ERROR;
- }
- goto start;
- }
- }
- case CLIENT_ERROR:
-
- client->worker->stats.req_error++;
- client->keepalive = 0;
- client->success = 0;
- client->state = CLIENT_END;
- case CLIENT_END:
-
- client->worker->stats.req_done++;
- if (client->success) {
- client->worker->stats.req_success++;
- client->worker->stats.bytes_body += client->bytes_received - client->header_size;
- } else {
- client->worker->stats.req_failed++;
- }
-
- if (client->worker->id == 1 && client->worker->stats.req_done % client->worker->progress_interval == 0) {
- printf("progress: %3d%% done\n",
- (int) (client->worker->stats.req_done * 100 / client->worker->stats.req_todo)
- );
- }
- if (client->worker->stats.req_started == client->worker->stats.req_todo) {
-
- client->keepalive = 0;
- client_reset(client);
- if (client->worker->stats.req_done == client->worker->stats.req_todo) {
-
- ev_unref(client->worker->loop);
- }
- } else {
- client_reset(client);
- goto start;
- }
- }
- }
- static uint8_t client_parse(Client *client, int size) {
- char *end, *str;
- uint16_t status_code;
- switch (client->parser_state) {
- case PARSER_START:
-
-
- if (client->buffer_offset < sizeof("HTTP/1.1 200\r\n"))
- return 1;
- if (strncmp(client->buffer, "HTTP/1.1 ", sizeof("HTTP/1.1 ")-1) != 0)
- return 0;
-
- status_code = 0;
- str = client->buffer + sizeof("HTTP/1.1 ")-1;
- for (end = str + 3; str != end; str++) {
- if (*str < '0' || *str > '9')
- return 0;
- status_code *= 10;
- status_code += *str - '0';
- }
- if (status_code >= 200 && status_code < 300) {
- client->worker->stats.req_2xx++;
- client->status_success = 1;
- } else if (status_code < 400) {
- client->worker->stats.req_3xx++;
- client->status_success = 1;
- } else if (status_code < 500) {
- client->worker->stats.req_4xx++;
- } else if (status_code < 600) {
- client->worker->stats.req_5xx++;
- } else {
-
- return 0;
- }
-
- end = strchr(end, '\r');
- if (!end || *(end+1) != '\n')
- return 0;
- client->parser_offset = end + 2 - client->buffer;
- client->parser_state = PARSER_HEADER;
- case PARSER_HEADER:
-
-
- while (NULL != (end = strchr(&client->buffer[client->parser_offset], '\r'))) {
- if (*(end+1) != '\n')
- return 0;
- if (end == &client->buffer[client->parser_offset]) {
-
- client->parser_state = PARSER_BODY;
- client->header_size = end + 2 - client->buffer;
-
- return client_parse(client, size - client->header_size);
- }
- *end = '\0';
- str = &client->buffer[client->parser_offset];
-
- if (strncmp(str, "Content-Length: ", sizeof("Content-Length: ")-1) == 0) {
-
- client->content_length = str_to_uint64(str + sizeof("Content-Length: ") - 1);
- } else if (strncmp(str, "Connection: ", sizeof("Connection: ")-1) == 0) {
-
- str += sizeof("Connection: ") - 1;
- if (strncmp(str, "close", sizeof("close")-1) == 0)
- client->keepalive = 0;
- else if (strncmp(str, "Keep-Alive", sizeof("Keep-Alive")-1) == 0)
- client->keepalive = client->worker->config->keep_alive;
- else if (strncmp(str, "keep-alive", sizeof("keep-alive")-1) == 0)
- client->keepalive = client->worker->config->keep_alive;
- else
- return 0;
- } else if (strncmp(str, "Transfer-Encoding: ", sizeof("Transfer-Encoding: ")-1) == 0) {
-
- str += sizeof("Transfer-Encoding: ") - 1;
- if (strncmp(str, "chunked", sizeof("chunked")-1) == 0)
- client->chunked = 1;
- else
- return 0;
- }
- if (*(end+2) == '\r' && *(end+3) == '\n') {
-
- client->parser_state = PARSER_BODY;
- client->header_size = end + 4 - client->buffer;
- client->parser_offset = client->header_size;
-
- return client_parse(client, size - client->header_size);
- }
- client->parser_offset = end - client->buffer + 2;
- }
- return 1;
- case PARSER_BODY:
-
-
-
- if (client->chunked) {
- int consume_max;
- str = &client->buffer[client->parser_offset];
-
- if (client->chunk_size == -1) {
-
- client->chunk_size = 0;
- client->chunk_received = 0;
- end = str + size;
- for (; str < end; str++) {
- if (*str == ';' || *str == '\r')
- break;
- client->chunk_size *= 16;
- if (*str >= '0' && *str <= '9')
- client->chunk_size += *str - '0';
- else if (*str >= 'A' && *str <= 'Z')
- client->chunk_size += 10 + *str - 'A';
- else if (*str >= 'a' && *str <= 'z')
- client->chunk_size += 10 + *str - 'a';
- else
- return 0;
- }
- str = strstr(str, "\r\n");
- if (!str)
- return 0;
- str += 2;
-
- if (client->chunk_size == 0) {
-
- client->state = CLIENT_END;
- client->success = client->status_success ? 1 : 0;
- return 1;
- }
- size -= str - &client->buffer[client->parser_offset];
- client->parser_offset = str - client->buffer;
- }
-
- consume_max = client->chunk_size - client->chunk_received;
- if (size < consume_max)
- consume_max = size;
- client->chunk_received += consume_max;
- client->parser_offset += consume_max;
-
- if (client->chunk_received == client->chunk_size) {
- if (client->buffer[client->parser_offset] != '\r' || client->buffer[client->parser_offset+1] != '\n')
- return 0;
-
-
- client->chunk_size = -1;
- client->chunk_received = 0;
- client->parser_offset += 2;
- consume_max += 2;
-
- if (size - consume_max > 0)
- return client_parse(client, size - consume_max);
- }
- client->parser_offset = 0;
- client->buffer_offset = 0;
- return 1;
- } else {
-
- client->buffer_offset = 0;
- if (client->content_length == -1)
- return 0;
- if (client->bytes_received == (uint64_t) (client->header_size + client->content_length)) {
-
- client->state = CLIENT_END;
- client->success = client->status_success ? 1 : 0;
- }
- }
- return 1;
- }
- return 1;
- }
One Level Up
Top Level