diff --git a/common/utils/T/tracer/Makefile.local b/common/utils/T/tracer/Makefile.local index ae4e248a2007e86baaacd8544aa71fb51af27ddf..4e971fc627f3c031e1485077078a881edcd65182 100644 --- a/common/utils/T/tracer/Makefile.local +++ b/common/utils/T/tracer/Makefile.local @@ -6,7 +6,7 @@ CFLAGS=-Wall -g -pthread -DT_TRACER LIBS += -lrt PROG=tracer_local -OBJS=local.o forward.o +OBJS=local.o $(PROG): $(OBJS) $(CC) $(CFLAGS) -o $(PROG) $(OBJS) $(LIBS) diff --git a/common/utils/T/tracer/forward.c b/common/utils/T/tracer/forward.c deleted file mode 100644 index 2bddb80330c5891d7c402d0484e7080061f89b4e..0000000000000000000000000000000000000000 --- a/common/utils/T/tracer/forward.c +++ /dev/null @@ -1,171 +0,0 @@ -#include "forward.h" -#include <stdlib.h> -#include <stdio.h> -#include <netinet/ip.h> -#include <arpa/inet.h> -#include <unistd.h> -#include <pthread.h> -#include <string.h> -#include <stdint.h> -#include <inttypes.h> - -/* from local.c */ -int get_connection(char *addr, int port); -void new_thread(void *(*f)(void *), void *data); - -typedef struct databuf { - char *d; - int l; - struct databuf *next; -} databuf; - -typedef struct { - int s; - int sc; - pthread_mutex_t lock; - pthread_mutex_t datalock; - pthread_cond_t datacond; - databuf * volatile head, *tail; - uint64_t memusage; - uint64_t last_warning_memusage; -} forward_data; - -static void *data_sender(void *_f) -{ - forward_data *f = _f; - databuf *cur; - char *buf, *b; - int size; - -wait: - if (pthread_mutex_lock(&f->datalock)) abort(); - while (f->head == NULL) - if (pthread_cond_wait(&f->datacond, &f->datalock)) abort(); - cur = f->head; - buf = cur->d; - size = cur->l; - f->head = cur->next; - f->memusage -= size; - if (f->head == NULL) f->tail = NULL; - if (pthread_mutex_unlock(&f->datalock)) abort(); - free(cur); - goto process; - -process: - if (pthread_mutex_lock(&f->lock)) abort(); - - b = buf; - while (size) { - int l = write(f->s, b, size); - if (l <= 0) { printf("forward error\n"); exit(1); } - size -= l; - b += l; - } - - if (pthread_mutex_unlock(&f->lock)) abort(); - - free(buf); - - goto wait; -} - -static void do_forward(forward_data *f, int from, int to, int lock) -{ - int l, len; - char *b; - char buf[1024]; - while (1) { - len = read(from, buf, 1024); - if (len <= 0) break; - b = buf; - - if (lock) if (pthread_mutex_lock(&f->lock)) abort(); - - while (len) { - l = write(to, b, len); - if (l <= 0) break; - len -= l; - b += l; - } - - if (lock) if (pthread_mutex_unlock(&f->lock)) abort(); - } -} - -static void *forward_s_to_sc(void *_f) -{ - forward_data *f = _f; - do_forward(f, f->s, f->sc, 0); - return NULL; -} - -void forward_start_client(void *_f, int s) -{ - forward_data *f = _f; - f->sc = s; - new_thread(forward_s_to_sc, f); -} - -void *forwarder(int port) -{ - forward_data *f; - - f = malloc(sizeof(*f)); if (f == NULL) abort(); - - pthread_mutex_init(&f->lock, NULL); - pthread_mutex_init(&f->datalock, NULL); - pthread_cond_init(&f->datacond, NULL); - - f->sc = -1; - f->head = f->tail = NULL; - - f->memusage = 0; - f->last_warning_memusage = 0; - - printf("waiting for remote tracer on port %d\n", port); - - f->s = get_connection("127.0.0.1", port); - - printf("connected\n"); - - new_thread(data_sender, f); - - return f; -} - -void forward(void *_forwarder, char *buf, int size) -{ - forward_data *f = _forwarder; - int32_t ssize = size; - databuf *new; - - new = malloc(sizeof(*new)); if (new == NULL) abort(); - - if (pthread_mutex_lock(&f->datalock)) abort(); - - new->d = malloc(size + 4); if (new->d == NULL) abort(); - /* put the size of the message at the head */ - memcpy(new->d, &ssize, 4); - memcpy(new->d+4, buf, size); - new->l = size+4; - new->next = NULL; - if (f->head == NULL) f->head = new; - if (f->tail != NULL) f->tail->next = new; - f->tail = new; - - f->memusage += size+4; - /* warn every 100MB */ - if (f->memusage > f->last_warning_memusage && - f->memusage - f->last_warning_memusage > 100000000) { - f->last_warning_memusage += 100000000; - printf("WARNING: memory usage is over %"PRIu64"MB\n", - f->last_warning_memusage / 1000000); - } else - if (f->memusage < f->last_warning_memusage && - f->last_warning_memusage - f->memusage > 100000000) { - f->last_warning_memusage = (f->memusage/100000000) * 100000000; - } - - if (pthread_cond_signal(&f->datacond)) abort(); - if (pthread_mutex_unlock(&f->datalock)) abort(); -} diff --git a/common/utils/T/tracer/forward.h b/common/utils/T/tracer/forward.h deleted file mode 100644 index 80607d74fcbd4747e6c1a52dd541b8191aa2df27..0000000000000000000000000000000000000000 --- a/common/utils/T/tracer/forward.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef _FORWARD_H_ -#define _FORWARD_H_ - -void *forwarder(int port); -void forward(void *forwarder, char *buf, int size); -void forward_start_client(void *forwarder, int socket); - -#endif /* _FORWARD_H_ */ diff --git a/common/utils/T/tracer/local.c b/common/utils/T/tracer/local.c index fbe4d6d4201453ea7de7695b9f132f985acfbbc8..2550a025497625ef6abea3912056f2e6451a125f 100644 --- a/common/utils/T/tracer/local.c +++ b/common/utils/T/tracer/local.c @@ -8,18 +8,53 @@ #include <sys/stat.h> #include <fcntl.h> #include <pthread.h> +#include <inttypes.h> #define DEFAULT_PORT 2021 -#include "forward.h" - #include "../T_defs.h" -T_cache_t *T_cache; -int T_busylist_head; -int T_pos; +static T_cache_t *T_cache; +static int T_busylist_head; + +typedef struct databuf { + char *d; + int l; + struct databuf *next; +} databuf; + +typedef struct { + int socket_local; + int socket_remote; + pthread_mutex_t lock; + pthread_cond_t cond; + databuf * volatile head, *tail; + uint64_t memusage; + uint64_t last_warning_memusage; +} forward_data; -int get_connection(char *addr, int port) +/****************************************************************************/ +/* utility functions */ +/****************************************************************************/ + +static void new_thread(void *(*f)(void *), void *data) +{ + pthread_t t; + pthread_attr_t att; + + if (pthread_attr_init(&att)) + { fprintf(stderr, "pthread_attr_init err\n"); exit(1); } + if (pthread_attr_setdetachstate(&att, PTHREAD_CREATE_DETACHED)) + { fprintf(stderr, "pthread_attr_setdetachstate err\n"); exit(1); } + if (pthread_attr_setstacksize(&att, 10000000)) + { fprintf(stderr, "pthread_attr_setstacksize err\n"); exit(1); } + if (pthread_create(&t, &att, f, data)) + { fprintf(stderr, "pthread_create err\n"); exit(1); } + if (pthread_attr_destroy(&att)) + { fprintf(stderr, "pthread_attr_destroy err\n"); exit(1); } +} + +static int get_connection(char *addr, int port) { struct sockaddr_in a; socklen_t alen; @@ -49,12 +84,142 @@ int get_connection(char *addr, int port) return t; } -void wait_message(void) +/****************************************************************************/ +/* forward functions */ +/****************************************************************************/ + +static void *data_sender(void *_f) +{ + forward_data *f = _f; + databuf *cur; + char *buf, *b; + int size; + +wait: + if (pthread_mutex_lock(&f->lock)) abort(); + while (f->head == NULL) + if (pthread_cond_wait(&f->cond, &f->lock)) abort(); + cur = f->head; + buf = cur->d; + size = cur->l; + f->head = cur->next; + f->memusage -= size; + if (f->head == NULL) f->tail = NULL; + if (pthread_mutex_unlock(&f->lock)) abort(); + free(cur); + goto process; + +process: + b = buf; + while (size) { + int l = write(f->socket_remote, b, size); + if (l <= 0) { printf("forward error\n"); exit(1); } + size -= l; + b += l; + } + + free(buf); + + goto wait; +} + +static void *forward_remote_messages(void *_f) +{ + forward_data *f = _f; + int from = f->socket_remote; + int to = f->socket_local; + int l, len; + char *b; + char buf[1024]; + while (1) { + len = read(from, buf, 1024); + if (len <= 0) break; + b = buf; + + while (len) { + l = write(to, b, len); + if (l <= 0) break; + len -= l; + b += l; + } + } + return NULL; +} + +static void *forwarder(int port, int s) +{ + forward_data *f; + + f = malloc(sizeof(*f)); if (f == NULL) abort(); + + pthread_mutex_init(&f->lock, NULL); + pthread_cond_init(&f->cond, NULL); + + f->socket_local = s; + f->head = f->tail = NULL; + + f->memusage = 0; + f->last_warning_memusage = 0; + + printf("waiting for remote tracer on port %d\n", port); + + f->socket_remote = get_connection("127.0.0.1", port); + + printf("connected\n"); + + new_thread(data_sender, f); + new_thread(forward_remote_messages, f); + + return f; +} + +static void forward(void *_forwarder, char *buf, int size) +{ + forward_data *f = _forwarder; + int32_t ssize = size; + databuf *new; + + new = malloc(sizeof(*new)); if (new == NULL) abort(); + + if (pthread_mutex_lock(&f->lock)) abort(); + + new->d = malloc(size + 4); if (new->d == NULL) abort(); + /* put the size of the message at the head */ + memcpy(new->d, &ssize, 4); + memcpy(new->d+4, buf, size); + new->l = size+4; + new->next = NULL; + if (f->head == NULL) f->head = new; + if (f->tail != NULL) f->tail->next = new; + f->tail = new; + + f->memusage += size+4; + /* warn every 100MB */ + if (f->memusage > f->last_warning_memusage && + f->memusage - f->last_warning_memusage > 100000000) { + f->last_warning_memusage += 100000000; + printf("WARNING: memory usage is over %"PRIu64"MB\n", + f->last_warning_memusage / 1000000); + } else + if (f->memusage < f->last_warning_memusage && + f->last_warning_memusage - f->memusage > 100000000) { + f->last_warning_memusage = (f->memusage/100000000) * 100000000; + } + + if (pthread_cond_signal(&f->cond)) abort(); + if (pthread_mutex_unlock(&f->lock)) abort(); +} + +/****************************************************************************/ +/* local functions */ +/****************************************************************************/ + +static void wait_message(void) { while (T_cache[T_busylist_head].busy == 0) usleep(1000); } -void init_shm(void) +static void init_shm(void) { int i; int s = shm_open(T_SHM_FILENAME, O_RDWR | O_CREAT /*| O_SYNC*/, 0666); @@ -74,24 +239,7 @@ void init_shm(void) for (i = 0; i < T_CACHE_SIZE; i++) T_cache[i].busy = 0; } -void new_thread(void *(*f)(void *), void *data) -{ - pthread_t t; - pthread_attr_t att; - - if (pthread_attr_init(&att)) - { fprintf(stderr, "pthread_attr_init err\n"); exit(1); } - if (pthread_attr_setdetachstate(&att, PTHREAD_CREATE_DETACHED)) - { fprintf(stderr, "pthread_attr_setdetachstate err\n"); exit(1); } - if (pthread_attr_setstacksize(&att, 10000000)) - { fprintf(stderr, "pthread_attr_setstacksize err\n"); exit(1); } - if (pthread_create(&t, &att, f, data)) - { fprintf(stderr, "pthread_create err\n"); exit(1); } - if (pthread_attr_destroy(&att)) - { fprintf(stderr, "pthread_attr_destroy err\n"); exit(1); } -} - -void usage(void) +static void usage(void) { printf( "tracer - local side\n" @@ -129,8 +277,7 @@ int main(int n, char **v) if (write(s, &t, 1) != 1) abort(); } - f = forwarder(port); - forward_start_client(f, s); + f = forwarder(port, s); /* read messages */ while (1) {