diff --git a/tracer/forward.c b/tracer/forward.c index cfb837f9e5d7d82245f26ad08684cf43cc0ddd1f..b982a890275f2fc2251109e4796d14d5f3a0672c 100644 --- a/tracer/forward.c +++ b/tracer/forward.c @@ -5,13 +5,61 @@ #include <arpa/inet.h> #include <unistd.h> #include <pthread.h> +#include <string.h> + +typedef struct databuf { + char *d; + int l; + struct databuf *next; +} databuf; typedef struct { int s; int sc; pthread_mutex_t lock; + pthread_mutex_t datalock; + pthread_cond_t datacond; + databuf * volatile head, *tail; } forward_data; +static void *data_sender(void *_f) +{ + forward_data *f = _f; + databuf *cur; + char *buf, *b; + int size; + +wait: + if (pthread_mutex_lock(&f->datalock)) abort(); + while (f->head == NULL) + if (pthread_cond_wait(&f->datacond, &f->datalock)) abort(); + cur = f->head; + buf = cur->d; + size = cur->l; + f->head = cur->next; + if (f->head == NULL) f->tail = NULL; + if (pthread_mutex_unlock(&f->datalock)) abort(); + free(cur); + goto process; + +process: + if (pthread_mutex_lock(&f->lock)) abort(); + + b = buf; + while (size) { + int l = write(f->s, b, size); + if (l <= 0) { printf("forward error\n"); exit(1); } + size -= l; + b += l; + } + + if (pthread_mutex_unlock(&f->lock)) abort(); + + free(buf); + + goto wait; +} + static void do_forward(forward_data *f, int from, int to, int lock) { int l, len; @@ -66,6 +114,11 @@ void *forwarder(char *ip, int port) f = malloc(sizeof(*f)); if (f == NULL) abort(); pthread_mutex_init(&f->lock, NULL); + pthread_mutex_init(&f->datalock, NULL); + pthread_cond_init(&f->datacond, NULL); + + f->sc = -1; + f->head = f->tail = NULL; f->s = socket(AF_INET, SOCK_STREAM, 0); if (f->s == -1) { perror("socket"); exit(1); } @@ -77,21 +130,27 @@ void *forwarder(char *ip, int port) if (connect(f->s, (struct sockaddr *)&a, sizeof(a)) == -1) { perror("connect"); exit(1); } + new_thread(data_sender, f); + return f; } void forward(void *_forwarder, char *buf, int size) { forward_data *f = _forwarder; + databuf *new; - if (pthread_mutex_lock(&f->lock)) abort(); + new = malloc(sizeof(*new)); if (new == NULL) abort(); - while (size) { - int l = write(f->s, buf, size); - if (l <= 0) { printf("forward error\n"); exit(1); } - size -= l; - buf += l; - } + if (pthread_mutex_lock(&f->datalock)) abort(); - if (pthread_mutex_unlock(&f->lock)) abort(); + new->d = malloc(size); if (new->d == NULL) abort(); + memcpy(new->d, buf, size); + new->l = size; + new->next = NULL; + if (f->head == NULL) f->head = new; + f->tail = new; + + if (pthread_cond_signal(&f->datacond)) abort(); + if (pthread_mutex_unlock(&f->datalock)) abort(); }