LeOS-Genesis/external/badvpn/client/DataProto.c

567 lines
17 KiB
C

/**
* @file DataProto.c
* @author Ambroz Bizjak <ambrop7@gmail.com>
*
* @section LICENSE
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the author nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <protocol/dataproto.h>
#include <misc/byteorder.h>
#include <base/BLog.h>
#include <client/DataProto.h>
#include <generated/blog_channel_DataProto.h>
static void monitor_handler (DataProtoSink *o);
static void refresh_up_job (DataProtoSink *o);
static void receive_timer_handler (DataProtoSink *o);
static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len);
static void up_job_handler (DataProtoSink *o);
static void flow_buffer_free (struct DataProtoFlow_buffer *b);
static void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *sink);
static void flow_buffer_detach (struct DataProtoFlow_buffer *b);
static void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b);
static void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b);
static void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b);
void monitor_handler (DataProtoSink *o)
{
DebugObject_Access(&o->d_obj);
// send keep-alive
PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
}
void refresh_up_job (DataProtoSink *o)
{
if (o->up != o->up_report) {
BPending_Set(&o->up_job);
} else {
BPending_Unset(&o->up_job);
}
}
void receive_timer_handler (DataProtoSink *o)
{
DebugObject_Access(&o->d_obj);
// consider down
o->up = 0;
refresh_up_job(o);
}
void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len)
{
DebugObject_Access(&o->d_obj);
ASSERT(data_len >= sizeof(struct dataproto_header))
int flags = 0;
// if we are receiving keepalives, set the flag
if (BTimer_IsRunning(&o->receive_timer)) {
flags |= DATAPROTO_FLAGS_RECEIVING_KEEPALIVES;
}
// modify existing packet here
struct dataproto_header header;
memcpy(&header, data, sizeof(header));
header.flags = hton8(flags);
memcpy(data, &header, sizeof(header));
}
void up_job_handler (DataProtoSink *o)
{
DebugObject_Access(&o->d_obj);
ASSERT(o->up != o->up_report)
o->up_report = o->up;
o->handler(o->user, o->up);
return;
}
void source_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len)
{
DebugObject_Access(&o->d_obj);
ASSERT(buf)
ASSERT(recv_len >= 0)
ASSERT(recv_len <= o->frame_mtu)
// remember packet
o->current_buf = buf;
o->current_recv_len = recv_len;
// call handler
o->handler(o->user, buf + DATAPROTO_MAX_OVERHEAD, recv_len);
return;
}
void flow_buffer_free (struct DataProtoFlow_buffer *b)
{
ASSERT(!b->sink)
// free route buffer
RouteBuffer_Free(&b->rbuf);
// free inactivity monitor
if (b->inactivity_time >= 0) {
PacketPassInactivityMonitor_Free(&b->monitor);
}
// free connector
PacketPassConnector_Free(&b->connector);
// free buffer structure
free(b);
}
void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *sink)
{
ASSERT(!b->sink)
// init queue flow
PacketPassFairQueueFlow_Init(&b->sink_qflow, &sink->queue);
// connect to queue flow
PacketPassConnector_ConnectOutput(&b->connector, PacketPassFairQueueFlow_GetInput(&b->sink_qflow));
// set DataProto
b->sink = sink;
}
void flow_buffer_detach (struct DataProtoFlow_buffer *b)
{
ASSERT(b->sink)
PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
// disconnect from queue flow
PacketPassConnector_DisconnectOutput(&b->connector);
// free queue flow
PacketPassFairQueueFlow_Free(&b->sink_qflow);
// clear reference to this buffer in the sink
if (b->sink->detaching_buffer == b) {
b->sink->detaching_buffer = NULL;
}
// set no DataProto
b->sink = NULL;
}
void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b)
{
ASSERT(b->sink)
ASSERT(PacketPassFairQueueFlow_IsBusy(&b->sink_qflow))
ASSERT(!b->sink->detaching_buffer || b->sink->detaching_buffer == b)
if (b->sink->detaching_buffer == b) {
return;
}
// request cancel
PacketPassFairQueueFlow_RequestCancel(&b->sink_qflow);
// set busy handler
PacketPassFairQueueFlow_SetBusyHandler(&b->sink_qflow, (PacketPassFairQueue_handler_busy)flow_buffer_qflow_handler_busy, b);
// remember this buffer in the sink so it can handle us if it goes away
b->sink->detaching_buffer = b;
}
void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b)
{
ASSERT(b->sink)
ASSERT(b->sink->detaching_buffer == b)
PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
// detach
flow_buffer_detach(b);
if (!b->flow) {
// free
flow_buffer_free(b);
} else if (b->flow->sink_desired) {
// attach
flow_buffer_attach(b, b->flow->sink_desired);
}
}
void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b)
{
ASSERT(b->sink)
ASSERT(b->sink->detaching_buffer == b)
PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
flow_buffer_finish_detach(b);
}
int DataProtoSink_Init (DataProtoSink *o, BReactor *reactor, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoSink_handler handler, void *user)
{
ASSERT(PacketPassInterface_HasCancel(output))
ASSERT(PacketPassInterface_GetMTU(output) >= DATAPROTO_MAX_OVERHEAD)
// init arguments
o->reactor = reactor;
o->handler = handler;
o->user = user;
// set frame MTU
o->frame_mtu = PacketPassInterface_GetMTU(output) - DATAPROTO_MAX_OVERHEAD;
// init notifier
PacketPassNotifier_Init(&o->notifier, output, BReactor_PendingGroup(o->reactor));
PacketPassNotifier_SetHandler(&o->notifier, (PacketPassNotifier_handler_notify)notifier_handler, o);
// init monitor
PacketPassInactivityMonitor_Init(&o->monitor, PacketPassNotifier_GetInput(&o->notifier), o->reactor, keepalive_time, (PacketPassInactivityMonitor_handler)monitor_handler, o);
PacketPassInactivityMonitor_Force(&o->monitor);
// init queue
if (!PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor), 1, 1)) {
BLog(BLOG_ERROR, "PacketPassFairQueue_Init failed");
goto fail1;
}
// init keepalive queue flow
PacketPassFairQueueFlow_Init(&o->ka_qflow, &o->queue);
// init keepalive source
DataProtoKeepaliveSource_Init(&o->ka_source, BReactor_PendingGroup(o->reactor));
// init keepalive blocker
PacketRecvBlocker_Init(&o->ka_blocker, DataProtoKeepaliveSource_GetOutput(&o->ka_source), BReactor_PendingGroup(o->reactor));
// init keepalive buffer
if (!SinglePacketBuffer_Init(&o->ka_buffer, PacketRecvBlocker_GetOutput(&o->ka_blocker), PacketPassFairQueueFlow_GetInput(&o->ka_qflow), BReactor_PendingGroup(o->reactor))) {
BLog(BLOG_ERROR, "SinglePacketBuffer_Init failed");
goto fail2;
}
// init receive timer
BTimer_Init(&o->receive_timer, tolerance_time, (BTimer_handler)receive_timer_handler, o);
// init handler job
BPending_Init(&o->up_job, BReactor_PendingGroup(o->reactor), (BPending_handler)up_job_handler, o);
// set not up
o->up = 0;
o->up_report = 0;
// set no detaching buffer
o->detaching_buffer = NULL;
DebugCounter_Init(&o->d_ctr);
DebugObject_Init(&o->d_obj);
return 1;
fail2:
PacketRecvBlocker_Free(&o->ka_blocker);
DataProtoKeepaliveSource_Free(&o->ka_source);
PacketPassFairQueueFlow_Free(&o->ka_qflow);
PacketPassFairQueue_Free(&o->queue);
fail1:
PacketPassInactivityMonitor_Free(&o->monitor);
PacketPassNotifier_Free(&o->notifier);
return 0;
}
void DataProtoSink_Free (DataProtoSink *o)
{
DebugObject_Free(&o->d_obj);
DebugCounter_Free(&o->d_ctr);
// allow freeing queue flows
PacketPassFairQueue_PrepareFree(&o->queue);
// release detaching buffer
if (o->detaching_buffer) {
ASSERT(!o->detaching_buffer->flow || o->detaching_buffer->flow->sink_desired != o)
flow_buffer_finish_detach(o->detaching_buffer);
}
// free handler job
BPending_Free(&o->up_job);
// free receive timer
BReactor_RemoveTimer(o->reactor, &o->receive_timer);
// free keepalive buffer
SinglePacketBuffer_Free(&o->ka_buffer);
// free keepalive blocker
PacketRecvBlocker_Free(&o->ka_blocker);
// free keepalive source
DataProtoKeepaliveSource_Free(&o->ka_source);
// free keepalive queue flow
PacketPassFairQueueFlow_Free(&o->ka_qflow);
// free queue
PacketPassFairQueue_Free(&o->queue);
// free monitor
PacketPassInactivityMonitor_Free(&o->monitor);
// free notifier
PacketPassNotifier_Free(&o->notifier);
}
void DataProtoSink_Received (DataProtoSink *o, int peer_receiving)
{
ASSERT(peer_receiving == 0 || peer_receiving == 1)
DebugObject_Access(&o->d_obj);
// reset receive timer
BReactor_SetTimer(o->reactor, &o->receive_timer);
if (!peer_receiving) {
// peer reports not receiving, consider down
o->up = 0;
// send keep-alive to converge faster
PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
} else {
// consider up
o->up = 1;
}
refresh_up_job(o);
}
int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataProtoSource_handler handler, void *user, BReactor *reactor)
{
ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
ASSERT(handler)
// init arguments
o->handler = handler;
o->user = user;
o->reactor = reactor;
// remember frame MTU
o->frame_mtu = PacketRecvInterface_GetMTU(input);
// init router
if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)source_router_handler, o, BReactor_PendingGroup(reactor))) {
BLog(BLOG_ERROR, "PacketRouter_Init failed");
goto fail0;
}
DebugCounter_Init(&o->d_ctr);
DebugObject_Init(&o->d_obj);
return 1;
fail0:
return 0;
}
void DataProtoSource_Free (DataProtoSource *o)
{
DebugObject_Free(&o->d_obj);
DebugCounter_Free(&o->d_ctr);
// free router
PacketRouter_Free(&o->router);
}
int DataProtoFlow_Init (DataProtoFlow *o, DataProtoSource *source, peerid_t source_id, peerid_t dest_id, int num_packets, int inactivity_time, void *user,
DataProtoFlow_handler_inactivity handler_inactivity)
{
DebugObject_Access(&source->d_obj);
ASSERT(num_packets > 0)
ASSERT(!(inactivity_time >= 0) || handler_inactivity)
// init arguments
o->source = source;
o->source_id = source_id;
o->dest_id = dest_id;
// set no desired sink
o->sink_desired = NULL;
// allocate buffer structure
struct DataProtoFlow_buffer *b = (struct DataProtoFlow_buffer *)malloc(sizeof(*b));
if (!b) {
BLog(BLOG_ERROR, "malloc failed");
goto fail0;
}
o->b = b;
// set parent
b->flow = o;
// remember inactivity time
b->inactivity_time = inactivity_time;
// init connector
PacketPassConnector_Init(&b->connector, DATAPROTO_MAX_OVERHEAD + source->frame_mtu, BReactor_PendingGroup(source->reactor));
// init inactivity monitor
PacketPassInterface *buf_out = PacketPassConnector_GetInput(&b->connector);
if (b->inactivity_time >= 0) {
PacketPassInactivityMonitor_Init(&b->monitor, buf_out, source->reactor, b->inactivity_time, handler_inactivity, user);
buf_out = PacketPassInactivityMonitor_GetInput(&b->monitor);
}
// init route buffer
if (!RouteBuffer_Init(&b->rbuf, DATAPROTO_MAX_OVERHEAD + source->frame_mtu, buf_out, num_packets)) {
BLog(BLOG_ERROR, "RouteBuffer_Init failed");
goto fail1;
}
// set no sink
b->sink = NULL;
DebugCounter_Increment(&source->d_ctr);
DebugObject_Init(&o->d_obj);
return 1;
fail1:
if (b->inactivity_time >= 0) {
PacketPassInactivityMonitor_Free(&b->monitor);
}
PacketPassConnector_Free(&b->connector);
free(b);
fail0:
return 0;
}
void DataProtoFlow_Free (DataProtoFlow *o)
{
DebugObject_Free(&o->d_obj);
DebugCounter_Decrement(&o->source->d_ctr);
ASSERT(!o->sink_desired)
struct DataProtoFlow_buffer *b = o->b;
if (b->sink) {
if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
// schedule detach, free buffer after detach
flow_buffer_schedule_detach(b);
b->flow = NULL;
// remove inactivity handler
if (b->inactivity_time >= 0) {
PacketPassInactivityMonitor_SetHandler(&b->monitor, NULL, NULL);
}
} else {
// detach and free buffer now
flow_buffer_detach(b);
flow_buffer_free(b);
}
} else {
// free buffer
flow_buffer_free(b);
}
}
void DataProtoFlow_Route (DataProtoFlow *o, int more)
{
DebugObject_Access(&o->d_obj);
PacketRouter_AssertRoute(&o->source->router);
ASSERT(o->source->current_buf)
ASSERT(more == 0 || more == 1)
struct DataProtoFlow_buffer *b = o->b;
// write header. Don't set flags, it will be set in notifier_handler.
struct dataproto_header header;
struct dataproto_peer_id id;
header.from_id = htol16(o->source_id);
header.num_peer_ids = htol16(1);
id.id = htol16(o->dest_id);
memcpy(o->source->current_buf, &header, sizeof(header));
memcpy(o->source->current_buf + sizeof(header), &id, sizeof(id));
// route
uint8_t *next_buf;
if (!PacketRouter_Route(&o->source->router, DATAPROTO_MAX_OVERHEAD + o->source->current_recv_len, &b->rbuf,
&next_buf, DATAPROTO_MAX_OVERHEAD, (more ? o->source->current_recv_len : 0)
)) {
BLog(BLOG_NOTICE, "buffer full: %d->%d", (int)o->source_id, (int)o->dest_id);
return;
}
// remember next buffer, or don't allow further routing if more==0
o->source->current_buf = (more ? next_buf : NULL);
}
void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *sink)
{
DebugObject_Access(&o->d_obj);
DebugObject_Access(&sink->d_obj);
ASSERT(!o->sink_desired)
ASSERT(sink)
ASSERT(o->source->frame_mtu <= sink->frame_mtu)
struct DataProtoFlow_buffer *b = o->b;
if (b->sink) {
if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
// schedule detach and reattach
flow_buffer_schedule_detach(b);
} else {
// detach and reattach now
flow_buffer_detach(b);
flow_buffer_attach(b, sink);
}
} else {
// attach
flow_buffer_attach(b, sink);
}
// set desired sink
o->sink_desired = sink;
DebugCounter_Increment(&sink->d_ctr);
}
void DataProtoFlow_Detach (DataProtoFlow *o)
{
DebugObject_Access(&o->d_obj);
ASSERT(o->sink_desired)
struct DataProtoFlow_buffer *b = o->b;
ASSERT(b->sink)
DataProtoSink *sink = o->sink_desired;
if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
// schedule detach
flow_buffer_schedule_detach(b);
} else {
// detach now
flow_buffer_detach(b);
}
// set no desired sink
o->sink_desired = NULL;
DebugCounter_Decrement(&sink->d_ctr);
}