Dynamic Load Management for Distributed Continuous Query Systems
sensors in a sensor network). To scale up the volume of streams and queries that can be processed, a distributed stream processing system is necessary. However, because the properties of data streams (e.g., arrival rates) and the load on the processing servers are hard to predict, the initial placement of query operators may result in unsatisfactory system performance. The problem is exacerbated by multiple continuous queries that run long enough to experience changes in environment parameters. As a result, any suboptimal performance persists for a long time.
Clearly, a distributed stream processing system must adapt to changes in environment parameters and server load. We believe a dynamic load management scheme is indispensable for the system to be scalable. In particular, we expect aggressive methods such as runtime migration of query operators to bring long-term benefit (especially for long-running continuous queries), even though they may incur some short-term overhead. However, few complete and practical solutions to this problem have been proposed to date. In this paper, we offer our solution.
More specifically, we make the following contributions:
• We formally define a new metric, the Performance Ratio (PR), to measure the relative performance of each query, and use it to define the objective for the whole system.
• By building a new cost model, we identify heuristics that can be used to approach this objective.
• We propose a complete and practical distributed load management scheme, which includes a static initial placement scheme for newly initiated queries as well as a dynamic runtime scheme.
• We conducted an extensive experimental study that shows the effectiveness of our technique.
2. Problem Formulation and Analysis
Our system consists of a set of stream sources, a set of locally distributed processing nodes, and a set of continuous queries. Since stream sources may not be able to communicate with multiple nodes, we assign each stream source one processing node, called its delegation node. In terms of system performance, we are concerned with the delay of result data items, which is also one of the main concerns of end users. Formally, if evaluating query $q_k$ on a source tuple $\mathit{tuple}^l$ from $s_l$ generates one or more result tuples, then the delay of $\mathit{tuple}^l$ for $q_k$ is defined as $d_k^l = t_{out} - t_{in}$, where $t_{in}$ is the time at which $\mathit{tuple}^l$ arrived at the system and $t_{out}$ is the time at which the result tuple is generated. If there is more than one result tuple, $t_{out}$ is the time at which the last one is generated. On closer inspection, $d_k^l$ includes the time spent evaluating the query (denoted $p_k^l$), the time spent waiting to be processed, and the time spent being transferred over network connections. For a specific processing model, we assume the evaluation time $p_k^l$ is fixed and regard it as the inherent complexity of $q_k$. Since different queries may have different inherent complexities, $d_k^l$ alone cannot correctly reflect the relative performance of different queries, yet in a multi-query, multi-user environment we need to compare exactly that. Hence we propose a new metric, the Performance Ratio (PR), which incorporates the inherent complexity of a query. Formally, $PR_k^l$ for processing $\mathit{tuple}^l$ for $q_k$ is defined as

$$PR_k^l = \frac{d_k^l}{p_k^l}. \tag{1}$$
The performance ratio of $q_k$ is then defined as

$$PR_k = \max_{s_l \in S_k} PR_k^l. \tag{2}$$
$PR_k$ reflects the relative performance of $q_k$. Our objective is to minimize the worst relative performance among all queries, i.e., to minimize $\max_k PR_k$.
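To make Equations (1) and (2) and the objective concrete, here is a minimal Python sketch. It assumes, hypothetically, that the arrival time, last-result time and evaluation time of one tuple per stream are already known; in the actual system these values come from the cost model in [1]. The function names `tuple_pr`, `query_pr` and `objective` are illustrative only.

```python
def tuple_pr(t_in: float, t_out: float, p: float) -> float:
    """PR_k^l = d_k^l / p_k^l, with delay d_k^l = t_out - t_in (Eq. 1)."""
    if p <= 0:
        raise ValueError("evaluation time p must be positive")
    return (t_out - t_in) / p


def query_pr(per_stream: dict[str, tuple[float, float, float]]) -> float:
    """PR_k = max over the query's streams s_l of PR_k^l (Eq. 2).

    per_stream maps a stream id to (t_in, t_out, p) for one tuple from that stream.
    """
    return max(tuple_pr(*timing) for timing in per_stream.values())


def objective(queries: dict[str, dict[str, tuple[float, float, float]]]) -> float:
    """Worst relative performance max_k PR_k, which the system tries to minimize."""
    return max(query_pr(streams) for streams in queries.values())


queries = {
    "q1": {"s1": (0.0, 3.0, 1.0), "s2": (1.0, 2.0, 0.5)},  # PR^l = 3.0 and 2.0
    "q2": {"s1": (0.0, 10.0, 5.0)},                         # PR^l = 2.0
}
print(query_pr(queries["q1"]), query_pr(queries["q2"]), objective(queries))
```

This prints `3.0 2.0 3.0`. Note that q2 has the larger raw delay (10 versus 3 time units) but the better PR, because its inherent evaluation time is larger; this is the comparison that raw delay alone cannot make.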
In [1], we developed a cost model to estimate the values of $d_k^l$ and $p_k^l$, from which $PR_k$ can be calculated. Using this cost model, we showed that the problem is NP-complete. Furthermore, we observed that if communication cost is ignored, the workload should be balanced across all processing nodes in an optimal solution. In view of the complexity of the problem, we opt to design heuristics instead of searching for an optimal algorithm. The cost model shows that the extra delay is caused by communication and by the system workload. Hence, we use the following heuristics. (1) Distribute the operators of a query to a restricted number of nodes so that $CPR_k^l$ is small. As described below, we cap this number at the number of streams the query operates on. The intuition is that a query involving more streams has a higher processing cost and can therefore afford to be distributed across more nodes. (2) Dynamically balance the workload of the processing nodes; this heuristic is inspired by the observation stated above. (3) Minimize the communication cost subject to (1) and (2). In short, we need a dynamic load balancing scheme in which the operators of each query are not distributed to too many nodes and the total communication traffic is minimized.
3. System Design
Initial placement scheme. Our initial placement scheme only considers minimizing the communication cost and leaves the load balancing task to the dynamic scheme. The scheme generates one query fragment for each involved stream and then distributes the fragments to the delegation nodes of their corresponding streams. Roughly speaking, the scheme places the filters of a stream at that stream's delegation node and places the join operators at nodes that minimize communication traffic. Details of the query fragment generation algorithm can be found in [1]. We further define two query fragments to be neighbors if there is data flow between them.
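The fragment-per-stream placement and the neighbor relation can be sketched in a few lines of Python. This is a simplified illustration under stated assumptions: fragment names of the form `frag_<stream>` are hypothetical, and the placement of join operators (decided by the traffic-minimizing algorithm in [1]) is not modelled.

```python
from collections import defaultdict


def initial_placement(query_streams: list[str], delegation: dict[str, str]) -> dict[str, str]:
    """Create one fragment per involved stream and place it at that stream's delegation node.

    Fragment names ("frag_<stream>") are hypothetical. Join placement from [1]
    is not modelled here.
    """
    return {f"frag_{s}": delegation[s] for s in query_streams}


def neighbors(data_flow: list[tuple[str, str]]) -> dict[str, set[str]]:
    """Two fragments are neighbors if data flows between them (in either direction)."""
    nbrs: dict[str, set[str]] = defaultdict(set)
    for src, dst in data_flow:
        nbrs[src].add(dst)
        nbrs[dst].add(src)
    return dict(nbrs)


delegation = {"s1": "n1", "s2": "n2", "s3": "n1"}
placement = initial_placement(["s1", "s2", "s3"], delegation)
nbrs = neighbors([("frag_s1", "frag_s2"), ("frag_s2", "frag_s3")])
print(placement)
print(sorted(nbrs["frag_s2"]))
```

Because there is exactly one fragment per stream, a three-stream query here lands on at most three nodes (two in this example, since `s1` and `s3` share a delegation node).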
Dynamic placement scheme. In the dynamic placement scheme, we adopt a local load balancing approach. Each node $n_i$ has a number of load management partners. The partner relationship is symmetric, i.e., if $n_i$ is a partner of $n_j$, then $n_j$ is also a partner of $n_i$. Each node keeps track of the load distribution across itself and all its partners and makes load redistribution decisions when necessary. In the partner selection algorithm described in [1], a node tends to select as partners those nodes that host more neighbors of its own query fragments.
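The partner selection algorithm itself is given in [1]; only its tendency is stated here. The Python sketch below is a hypothetical illustration of that tendency, not the algorithm from [1]: each node ranks the other nodes by how many neighbors of its own fragments they host, keeps the top `k` (an assumed parameter), and every chosen link is added in both directions to keep the relationship symmetric.

```python
from collections import Counter


def rank_candidates(node: str, placement: dict[str, str],
                    nbrs: dict[str, set[str]]) -> Counter:
    """For every other node, count how many neighbors of `node`'s fragments it hosts."""
    score: Counter = Counter()
    for frag, host in placement.items():
        if host != node:
            continue
        for nb in nbrs.get(frag, set()):
            other = placement[nb]
            if other != node:
                score[other] += 1
    return score


def select_partners(nodes: list[str], placement: dict[str, str],
                    nbrs: dict[str, set[str]], k: int) -> dict[str, set[str]]:
    """Each node picks its top-k candidates; links are recorded in both directions."""
    partners: dict[str, set[str]] = {n: set() for n in nodes}
    for n in nodes:
        for other, _count in rank_candidates(n, placement, nbrs).most_common(k):
            partners[n].add(other)
            partners.setdefault(other, set()).add(n)  # keep the relationship symmetric
    return partners


placement = {"a1": "n1", "a2": "n1", "b1": "n2", "c1": "n3"}
nbrs = {"a1": {"b1", "c1"}, "a2": {"b1"}, "b1": {"a1", "a2"}, "c1": {"a1"}}
print(select_partners(["n1", "n2", "n3"], placement, nbrs, k=1))
```

With `k=1`, `n1` picks `n2` (which hosts two of its fragments' neighbors), while `n3` picks `n1`; symmetry therefore gives `n1` two partners. In this sketch `k` bounds only the partners a node chooses, not the partners it is chosen by.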
At runtime, each node measures its own workload within each time window and, if it detects that the workload has changed sufficiently, broadcasts it to all its partners. This basic technique is vulnerable to transient changes in the system state, which can render the system unstable. To solve this problem, we apply a low-pass filter to filter out transient changes in the system workload. In particular, the workload is computed as

$$\omega_{i+1} = \alpha \times \omega_i + (1 - \alpha) \times \omega_c,$$

where $\omega_{i+1}$ and $\omega_i$ are the workload values used for load balancing after $i+1$ and $i$ time windows, and $\omega_c$ is the workload collected within the $(i+1)$th time window. $\alpha$ is a parameter that determines how responsive the estimate is to workload changes. If $\alpha$ is too high, the calculated workload cannot reflect the current workload; if $\alpha$ is too low, it cannot filter out transient changes. In [1], we mathematically analyzed how to set $\alpha$ in practice.
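The filter is an exponentially weighted moving average. A minimal Python sketch follows; `alpha` plays the role of the smoothing parameter above, and the function names are hypothetical.

```python
def smooth(prev: float, collected: float, alpha: float) -> float:
    """One filter step: new = alpha * previous + (1 - alpha) * collected."""
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must lie in [0, 1]")
    return alpha * prev + (1.0 - alpha) * collected


def filtered_series(initial: float, samples: list[float], alpha: float) -> list[float]:
    """Apply the filter window by window and return every estimate."""
    estimates = []
    current = initial
    for collected in samples:
        current = smooth(current, collected, alpha)
        estimates.append(current)
    return estimates


# A one-window spike (10 -> 50 -> 10) is damped rather than passed through.
print(filtered_series(10.0, [10.0, 50.0, 10.0, 10.0], alpha=0.75))
```

This prints `[10.0, 20.0, 17.5, 15.625]`: the spike raises the estimate only to 20, which then decays. The two extremes show the trade-off described above. With `alpha=0.0` the estimate equals each raw sample (no filtering), and with `alpha=1.0` it never moves from its initial value (no responsiveness).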
Based on the load information of itself and its partners, each node makes load management decisions at runtime. This works in rounds. Within each round, if a node detects that its own workload is smaller than the average workload of itself and its partners, it requests workload from the partner with the largest workload.
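The per-round decision rule can be sketched directly in Python. The function name `pick_donor` and the input format (a map from partner id to its most recently broadcast workload) are assumptions for illustration.

```python
from typing import Optional


def pick_donor(own_load: float, partner_loads: dict[str, float]) -> Optional[str]:
    """Return the partner to request workload from in this round, or None.

    A node requests load only if its own workload is below the average over
    itself and its partners; it then asks the most heavily loaded partner.
    """
    if not partner_loads:
        return None
    average = (own_load + sum(partner_loads.values())) / (len(partner_loads) + 1)
    if own_load >= average:
        return None
    return max(partner_loads, key=partner_loads.get)


print(pick_donor(10.0, {"n2": 30.0, "n3": 20.0}))  # average 20.0, so asks n2
print(pick_donor(25.0, {"n2": 30.0, "n3": 20.0}))  # average 25.0, so None
```

Only underloaded nodes initiate transfers, so a heavily loaded node never pushes work unasked; it sheds load only when a partner requests it.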
Once a node receives a workload request, it chooses the victim query operators for migration. In designing this selection scheme, we have to consider two points. First, for the sake of system performance, the selection scheme should be fast and scale with the number of queries. Second, we have to restrict the number of processing nodes for a query to the number of involved streams. Hence, we use the query fragments generated by the initial placement scheme as our smallest migration unit. Because the number of query fragments equals the number of involved streams, using query fragments as the migration unit inherently restricts the number of processing nodes for a query. Moreover, we expect this number to be much smaller than the number of operators, so the load selection decision can be made much faster. Furthermore, the choice of query fragments to be migrated is critical in maintaining data flow locality: a poor choice may scatter streams across too many nodes and result in network congestion. In [1], we proposed a data-flow-aware strategy that performs well in maintaining data flow locality.
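The data-flow-aware strategy itself is described in [1]. The Python sketch below is a simple hypothetical stand-in, not that strategy, showing one way a fragment-level choice can account for locality: moving a fragment makes its links to neighbors already on the requesting node local and its links to neighbors left on the donor remote, so the sketch picks the fragment with the best net gain. It ignores how much load each fragment carries, and all names are illustrative.

```python
from typing import Optional


def choose_victim(donor: str, requester: str, placement: dict[str, str],
                  nbrs: dict[str, set[str]]) -> Optional[str]:
    """Pick the donor fragment whose move to `requester` best preserves data flow locality.

    Hypothetical scoring: links to neighbors on the requester become local (gained);
    links to neighbors staying on the donor become remote (lost). The fragment with
    the largest gained - lost wins; ties prefer more gained links.
    """
    best: Optional[str] = None
    best_key: Optional[tuple[int, int]] = None
    for frag, host in placement.items():
        if host != donor:
            continue
        ns = nbrs.get(frag, set())
        gained = sum(1 for nb in ns if placement[nb] == requester)
        lost = sum(1 for nb in ns if placement[nb] == donor)
        key = (gained - lost, gained)
        if best_key is None or key > best_key:
            best, best_key = frag, key
    return best


placement = {"a": "n1", "b": "n1", "c": "n2"}
nbrs = {"a": {"b"}, "b": {"a", "c"}, "c": {"b"}}
print(choose_victim("n1", "n2", placement, nbrs))  # b
```

Moving `a` would only break its link to `b`, whereas moving `b` breaks the `a`-`b` link but makes the `b`-`c` link local, so `b` is preferred.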
We presented extensive experimental results in [1]. In the experiments, we found that the proposed strategy works well with a small number of load management partners, which keeps the runtime overhead low. We also examined our data-flow-aware load selection scheme by comparing it with two simpler algorithms; the results showed that it maintains data flow locality better than the others. Finally, we studied the adaptivity, stability, and sensitivity of our scheme with respect to changes in stream arrival rates, system workload, and the parameter $\alpha$. The results showed that our scheme can effectively adapt to runtime changes in the system and approach our objective.
References
[1] Y. Zhou, B. C. Ooi, and K.-L. Tan. Dynamic Load Management for Distributed Continuous Query Systems. Unpublished manuscript available from the authors, 2004.