Initial attempt at providing a responder thread.

This commit is contained in:
ingrix 2024-05-04 04:47:36 -07:00
parent 907cbd8caa
commit 32314e45c0
3 changed files with 206 additions and 8 deletions

203
main.c
View File

@ -1,3 +1,4 @@
#include <px_common.h>
#include <px_listener.h>
#include <px_log.h>
#include <px_gemini_msg.h>
@ -40,6 +41,172 @@ struct file_data_ {
};
typedef struct file_data_ file_data_t;
struct respq_entry {
struct respq_entry* next;
struct respq_entry* prev;
px_gemini_response_t* resp;
px_connection_t* conn;
};
void respq_entry_delete(struct respq_entry* e) {
if (!e)
return;
px_gemini_response_delete(e->resp);
px_connection_delete(e->conn);
}
#define PX_THREAD_INIT 0x0
#define PX_THREAD_HALT 0x1
#define PX_THREAD_SHUTDOWN 0x2
#define PX_THREAD_RUNNING 0x4
struct px_thread_ {
pthread_t thread;
pthread_mutex_t mtx;
pthread_cond_t cond;
int state;
int thread_running;
struct respq_entry queue;
};
typedef struct px_thread_ px_thread_t;
void px_thread_init(px_thread_t* thr) {
memset(thr, 0, sizeof(*thr));
pthread_mutex_init(&thr->mtx, NULL);
pthread_cond_init(&thr->cond, NULL);
thr->queue.next = &thr->queue;
thr->queue.prev = &thr->queue;
thr->state = PX_THREAD_INIT;
}
void* send_responses(void* thrp) {
px_thread_t* thr = (px_thread_t*)thrp;
struct respq_entry* current = NULL;
pthread_mutex_lock(&thr->mtx);
while (1) {
if (thr->state & PX_THREAD_HALT)
//px_log_info("done");
break;
if (thr->queue.next != &thr->queue) {
//px_log_info("queue nonempty");
current = thr->queue.next;
thr->queue.next->next->prev = &thr->queue;
thr->queue.next = thr->queue.next->next;
pthread_mutex_unlock(&thr->mtx);
px_gemini_response_send(current->resp, current->conn);
px_gemini_response_delete(current->resp);
px_connection_delete(current->conn);
PX_FREE(current);
pthread_mutex_lock(&thr->mtx);
continue;
}
if (thr->state & PX_THREAD_SHUTDOWN)
break;
//px_log_info("waiting");
struct timeval tm;
gettimeofday(&tm, NULL);
struct timespec timeout = { tm.tv_sec + 1, tm.tv_usec * 1000 };
pthread_cond_timedwait(&thr->cond, &thr->mtx, &timeout);
//px_log_info("loop");
}
// clear out the queue
while (thr->queue.next != &thr->queue) {
current = thr->queue.next;
thr->queue.next->next->prev = &thr->queue;
thr->queue.next = thr->queue.next->next;
px_gemini_response_delete(current->resp);
px_connection_delete(current->conn);
}
pthread_mutex_unlock(&thr->mtx);
pthread_exit((void*)0);
}
int px_thread_start(px_thread_t* thr) {
pthread_mutex_lock(&thr->mtx);
if (thr->state != PX_THREAD_INIT)
return 0;
thr->state = PX_THREAD_RUNNING;
pthread_create(&thr->thread, NULL, send_responses, (void*)thr);
pthread_mutex_unlock(&thr->mtx);
}
int px_thread_shutdown(px_thread_t* thr) {
pthread_mutex_lock(&thr->mtx);
if (thr->state != PX_THREAD_RUNNING) {
pthread_mutex_unlock(&thr->mtx);
return 0;
}
thr->state |= PX_THREAD_SHUTDOWN;
pthread_cond_signal(&thr->cond);
pthread_mutex_unlock(&thr->mtx); // let it complete
void* retval = NULL;
pthread_join(thr->thread, &retval);
pthread_mutex_lock(&thr->mtx);
thr->state &= ~(PX_THREAD_HALT|PX_THREAD_RUNNING); // leave shutdown in place until reset
thr->state = PX_THREAD_SHUTDOWN;
pthread_mutex_unlock(&thr->mtx);
px_log_info("thread exited with code: %p", retval);
return 0;
}
int px_thread_reset(px_thread_t* thr) {
pthread_mutex_lock(&thr->mtx);
if (thr->state != PX_THREAD_SHUTDOWN) // exact, not running, not halting
return -1;
thr->state = PX_THREAD_INIT;
pthread_mutex_unlock(&thr->mtx);
return 0;
}
int px_thread_halt(px_thread_t* thr) {
pthread_mutex_lock(&thr->mtx);
if (!thr->state) {
pthread_mutex_unlock(&thr->mtx);
return 0;
}
thr->state |= PX_THREAD_HALT|PX_THREAD_SHUTDOWN;
pthread_cond_signal(&thr->cond);
pthread_mutex_unlock(&thr->mtx);
void* retval = NULL;
pthread_join(thr->thread, &retval);
px_log_info("thread exited with code: %p", retval);
pthread_mutex_lock(&thr->mtx);
thr->state = PX_THREAD_SHUTDOWN; // leave shutdown in place until reset
pthread_mutex_unlock(&thr->mtx);
px_log_info("thread exited with code: %p", retval);
return 0;
}
int px_queue_response(px_thread_t* thr, px_gemini_response_t* resp, px_connection_t* conn) {
struct respq_entry* e = PX_NEW(struct respq_entry);
e->resp = resp;
e->conn = conn;
pthread_mutex_lock(&thr->mtx);
if (thr->state != PX_THREAD_RUNNING) {
int s = thr->state;
pthread_mutex_unlock(&thr->mtx);
px_log_info("thread not in proper state: 0x%x should be 0x%x", s, PX_THREAD_RUNNING);
PX_FREE(e);
return -1;
}
e->prev = thr->queue.prev;
e->next = &thr->queue;
thr->queue.prev->next = e;
thr->queue.prev = e;
pthread_cond_signal(&thr->cond);
pthread_mutex_unlock(&thr->mtx);
return 0;
}
int main(int argc, char* argv[]) {
px_listener_t* listener = px_listener_new();
if (!listener) {
@ -100,6 +267,12 @@ int main(int argc, char* argv[]) {
px_listener_start(listener, NULL);
px_connection_t* conn;
px_thread_t* thr = PX_NEW(px_thread_t);
px_thread_init(thr);
px_thread_start(thr);
int times = 2;
while ((conn = px_connection_accept(listener)) != NULL) {
px_log_info("accepted a connection");
px_gemini_request_t* req = px_gemini_request_from_connection(conn);
@ -126,22 +299,36 @@ int main(int argc, char* argv[]) {
if (!resp) {
px_log_error("no response to send");
px_connection_shutdown(conn);
px_connection_delete(conn);
} else {
px_gemini_response_set_header(resp, 20, "text/gemini");
int r = px_gemini_response_send(resp, conn);
if (r != 0) {
px_log_error("bad return from response send: %d", r);
if (px_queue_response(thr, resp, conn) != 0) {
px_log_error("couldn't queue response");
px_gemini_response_delete(resp);
px_connection_shutdown(conn);
px_connection_delete(conn);
}
px_gemini_response_delete(resp);
//int r = px_gemini_response_send(resp, conn);
//if (r != 0) {
// px_log_error("bad return from response send: %d", r);
//}
//px_gemini_response_delete(resp);
//px_connection_shutdown(conn);
//px_connection_delete(conn);
}
px_gemini_request_delete(req);
px_connection_shutdown(conn);
px_connection_delete(conn);
break;
if (--times == 0)
break;
//break;
}
px_thread_shutdown(thr);
px_listener_stop(listener);
free(thr);
px_listener_delete(listener);
return 0;
}

10
px_common.h Normal file
View File

@ -0,0 +1,10 @@
#ifndef PX_COMMON_H__
#define PX_COMMON_H__
#include <stdlib.h>
#define PX_NEW(t) (t*)calloc(1, sizeof(t));
#define PX_NEW_UNINITIALIZED(t) (t*)malloc(sizeof(t));
#define PX_FREE(t) free(t);
#endif // PX_COMMON_H__

View File

@ -191,6 +191,7 @@ int px_gemini_response_send(px_gemini_response_t* resp, px_connection_t* conn) {
int r;
size_t bufsz, ck;
px_log_info("sending response");
if (!px_gemini_check_status(resp->header.status)) {
px_log_error("bad gemini response status %d, not sending response", resp->header.status);
return -1;