Leadership: The Scientific Approach

I've recently been reading a book called Managing Behavior in Organizations by Jerald Greenberg, and I want to share the ideas about leadership that I've picked up from it.

What is leadership?

In order to understand how to become a leader, we should first define what leadership is. Leadership is the ability of an individual to influence others in ways that help reach group or organizational goals. The essential goal of a leader is to create a purpose or mission for the organization and a strategy for attaining it (whereas the goal of a manager is to implement that strategy).

You are now probably asking: "How do leaders influence others?" According to the theory, they use position power and/or personal power. Position power comes from the posts individuals hold, i.e. individuals can influence others because such power is associated with their jobs; it is available to anyone who holds a particular position. Position power has four different sources:

  • Legitimate power - power individuals gain when others recognise and accept their authority;
  • Reward power - the power to control the rewards others receive, e.g. a supervisor can reward others by recommending a pay raise;
  • Coercive power - the capacity to control punishment;
  • Information power - power gained by having access to valuable data or knowledge.

Another source of power comes from the unique qualities of an individual. Such power is called personal power. There are four sources of personal power:

  • Rational persuasion - the ability to provide logical arguments and factual evidence in support of one's position;
  • Expert power - the power individuals gain when others recognise their expertise on a topic;
  • Referent power - the power individuals gain because they are liked and admired by others;
  • Charisma - power that comes from an engaging and magnetic personality.

Luckily, the book provides some tips on how to strengthen your powers:

  • You can increase your information power by expanding your network of communication contacts and keeping in touch. The more contacts you have, the more information will be accessible to you; and the more information you have, the more people will count on you;
  • Take on responsibilities that are unique. You will gain more power if you are the only one who can perform certain tasks;
  • Perform fewer routine tasks and instead take on novel ones. If you do only routine tasks, you are easily replaceable, whereas those who perform novel tasks are indispensable;
  • Be involved in organisational decisions by joining task forces and making contact with senior people. The more important others consider your input to be, the more power you will have;
  • Perform activities that are among the organisation's top priorities.

What does it take to be a successful leader?

Up until this point we have looked at what kinds of power leaders use in order to influence others. But what makes leaders successful? What behaviour leads to a leader's success?

Leaders are likely to be most successful when they demonstrate high concern for both people (showing consideration) and production (initiating structure).

In simple words, a successful leader (1) cares about you as a person and (2) gives you advice, answers your questions, and shows you what is expected of you. In fact, we can plot leadership effectiveness on a two-dimensional diagram (called the managerial grid):

Managerial Grid
Managerial Grid. Effective leaders demonstrate high levels of both dimensions.

In this diagram you can see five green dots that represent the different management styles: "country club", impoverished, task, "middle of the road" and team management. Team management is considered the ideal management style, and it is the style observed among very successful leaders. The diagram is mainly useful for two things: determining a manager's position in the grid (i.e. his or her management style), and helping him or her train certain skills in order to reach the ideal management style (grid training).

LPC contingency theory: different leaders for different situations

According to contingency theories, certain leadership styles may be most effective under certain conditions. One example of such a theory is LPC contingency theory. The theory states that the most important personal characteristic is esteem (liking) for the least preferred co-worker (LPC). To evaluate this LPC, you take the person a leader has the most trouble working with. Leaders who perceive this person in negative terms (low LPC) are primarily concerned with carrying out the task itself. Leaders who perceive this person in positive terms (high LPC) tend to accomplish the task by developing good relationships with the group. I believe this can be related to management styles: low LPC leaders will probably show a task management style, whereas high LPC leaders will probably prefer the "country club" style. LPC contingency theory, though, states that LPC is relatively fixed and cannot be changed, whereas the managerial grid suggests otherwise.

When is a certain type of leader the most valuable? According to LPC contingency theory, it depends on situational control. It's not clear from the book what exactly this means (nor was I able to find a definition on the Internet), but it seems to describe whether everyone knows what to do, how readily subordinates follow commands, and how much power the leader has. When situational control is low, the group does not like the leader; when situational control is high, the leader is well liked by the group. LPC contingency theory states that low LPC leaders are best when situational control is either very low or very high. When situational control is low, a leader who can give clear orders fits best; and when situational control is high, the leader's power is not challenged, so it is perfectly acceptable for the leader to focus on tasks.

High LPC leaders are best when situational control is moderate. A good example would be a research lab, where relations with colleagues are good but the power of the leader is somewhat limited (you cannot force innovation out of people). In such situations a leader who gives clear orders will probably not be appropriate, whereas a collaborative, i.e. high LPC, leader would likely be more effective.

Apparently, you can match a certain leader type to a certain situation in order to boost effectiveness. Read more about this in the Wikipedia article on the Fiedler contingency model.

Situational leadership theory: leaders should adapt to situation

Situational leadership theory is another contingency theory stating that leaders are effective when they select the right leadership style for the situation they face. The situation depends on two major attributes of followers:

  • task behaviour - the knowledge and skills followers have for a specific task, or how much guidance they need, and
  • relationship behaviour - the willingness of followers to work without taking directions from others, or their need for emotional support.

Yes, these are the same dimensions that every effective leader attends to, but now they are applied to the followers instead. We can draw almost the same diagram as before, except that the axes now show how much directive or supportive behaviour followers need from the leader:

Situational Leadership
Situational Leadership. The best leadership style depends on how much support or direction followers need. (Picture adapted from robertjrgraham.com)

As you can see from this diagram, researchers identify four different situations depending on the behaviour of followers:

  • High directive and low supportive (S1): in situations where followers need a lot of direction but don't need support, a directing leader, who simply directs his or her followers, is best;
  • High directive and high supportive (S2): in situations where followers need both direction and support, a coaching leader works best. In this case the leader needs to direct, but in a selling style, so that followers are talked into following the directions;
  • High supportive but low directive (S3): when followers do not need direction but need a lot of support, a supporting leader does the job. The followers already have good expertise in what they are doing, and the leader just needs to motivate them to do the job;
  • Low supportive and low directive (S4): when followers have both the expertise and the motivation to do the job, a delegating leader style is best. Instead of giving orders, the leader should delegate tasks and monitor progress.

In summary, situational leadership theory states that leaders should identify the situation, choose the right management style, and implement it.

Develop the leader inside you

The good news is that anyone can improve his or her leadership skills! In fact, there is a name for the systematic process of training people to expand their leadership capacity: leadership development. Most companies focus their efforts on the following three major areas:

  • Developing social interaction between people and close ties within the organisation;
  • Developing trusting relationships between individuals;
  • Developing common values and shared visions with others.

The main focus here is the development of emotional intelligence. The following are the most widely used leadership development techniques:

  • 360-degree feedback is a technique that nearly all Fortune 500 companies rely on. The idea is to collect feedback from multiple sources around you: your subordinates, peers and supervisors. Through this process leaders can get an idea of what others think about them. The problem with this technique is that collecting feedback and taking appropriate action are two different things. Many people, when they encounter negative feedback, defend themselves psychologically by dismissing or simply ignoring it.
  • Networking is intended to keep leaders from getting too isolated from other departments. Specifically, it is targeted at helping leaders learn whom they should ask for information when they need to solve problems. Peer relationships also promote cooperation.
  • Executive coaching is a method for improving a leader's performance. It usually includes an assessment of the leader's strengths and weaknesses and a plan for improvement. The method typically follows these steps: define what will be done and how, assess individual performance (e.g. by using 360-degree feedback), customise the plan in consultation with the leader's immediate supervisor, and implement the plan. Such coaching can be done either for groups or for individuals; it was found that a combination of the two increases leaders' productivity by 88 percent.
  • Mentoring is a method in which leaders receive guidance from more experienced colleagues (called mentors).

That's it, folks! If you managed to read this far, you now know an entire section of the book! If you find this material engaging, I recommend reading the book itself. I also believe that every one of us should seek to improve our leadership skills, since they lead to more successful careers and better relationships with colleagues and friends!

Split and Concatenate Videos with FFmpeg: It's Trivial!

The main idea of my future amazing system is to split a video file into pieces, send them to workers to be re-encoded, and then concatenate them back into a single video file. What tool comes to your mind for such a task? For me it's FFmpeg. It's an astonishing tool for decoding, encoding, resizing and performing other manipulations on video files. You may cut and concatenate videos as well! How cool is that?

The ffmpeg that comes with Ubuntu is actually avconv. Since I wanted the true version of FFmpeg, I first downloaded the source code:

git clone git://source.ffmpeg.org/ffmpeg.git ffmpeg

Then I installed a couple of dev packages in order to enable a couple of FFmpeg features:

sudo apt-get install yasm libfaac-dev libfaad-dev libx264-dev \
    libxvidcore-dev libmp3lame-dev libtheora-dev libopenjpeg-dev

Then I enabled all these features, together with debug information, through the configure script:

./configure --enable-shared --enable-gpl --enable-nonfree \
    --enable-libfaac --enable-libx264 --enable-libxvid \
    --enable-libmp3lame --enable-libtheora --enable-libopenjpeg \
    --disable-stripping --enable-debug=3 --extra-cflags="-gstabs+" \
    --disable-optimizations

Finally, I ran the last step to build everything:

make

According to the work of Y. Sambe et al., "High-speed distributed video transcoding for multiple rates and formats", a good result can be achieved when you split the video between GOPs. This makes sense, since every GOP should start with an i-frame (a frame that contains all the information, not just differences from other frames). But in video files the Decode Time Stamp (DTS) and the Presentation Time Stamp (PTS) may differ, which can introduce some problems. The authors state that this may lead to a situation where, despite the i-frame being first in the GOP, it may not be the frame that is played first. They call such a GOP an open GOP. To me this seems a bit strange; I haven't confirmed it yet, but it doesn't make sense to play an i-frame after a b-frame. The authors continue that, because of open GOPs and several other reasons, it is good to split the video in such a way that every piece (except the first one) carries one additional GOP at the beginning (the last GOP of the previous piece). They did some tests and showed that this does have a slight effect on the resulting video quality after the transcoding process.
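To make the overlap idea concrete, here is a toy sketch (with made-up numbers and a made-up helper name) that turns a list of GOP start times into piece boundaries, where every piece except the first backs up by one GOP:

def split_ranges(gop_start_times, duration, gops_per_piece=4):
    """Build (start, end) ranges so that every piece except the first also
    carries the last GOP of the previous piece, as Sambe et al. suggest."""
    boundaries = list(gop_start_times) + [duration]
    pieces = []
    for i in range(0, len(gop_start_times), gops_per_piece):
        start_idx = max(i - 1, 0)                      # back up one GOP for the overlap
        end_idx = min(i + gops_per_piece, len(boundaries) - 1)
        pieces.append((boundaries[start_idx], boundaries[end_idx]))
    return pieces

# GOPs every 0.5 s in a 4-second clip, 4 GOPs (2 s) per piece:
print(split_ranges([0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5], 4.0))
# [(0.0, 2.0), (1.5, 4.0)]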

For testing purposes, let's try splitting the video so that every piece contains complete GOPs. For this, we need to know how big the GOP of the video is. There is a tool called ffprobe that shows various information about the streams in a video container, but to my disappointment it cannot show the GOP size. In order to make it show this information, I needed to add a single line of code to ffprobe.c:

static void show_stream(WriterContext *w, AVFormatContext *fmt_ctx, int stream_idx)
{
    ...
        case AVMEDIA_TYPE_VIDEO:
            print_int("width",        dec_ctx->width);
            print_int("height",       dec_ctx->height);
            print_int("has_b_frames", dec_ctx->has_b_frames);
            print_int("gop_size",     dec_ctx->gop_size); // A single line is all I need...

After recompiling and launching ffprobe, I learned the details of my video clip:

Duration: 00:01:00.00
Video: h264 (High) (avc1 / 0x31637661), yuv420p, 1920x818, 1239 kb/s, 24 fps
Audio: aac (mp4a / 0x6134706D), 44100 Hz, stereo, fltp, 127 kb/s
 
[STREAM]
index=0
codec_name=h264
codec_long_name=H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10
profile=High
codec_type=video
codec_time_base=1/48
codec_tag_string=avc1
width=1920
height=818
has_b_frames=0
gop_size=12
...
[/STREAM]

Good, so now I know that there should be exactly two i-frames per second. This means it should be possible to split the video nicely into pieces of 2 seconds each. To test this little theory, I wrote a small Python script that generates an ffmpeg command for splitting the video:

import sys

if __name__ == "__main__":
    s = "ffmpeg -i video.mp4 \\\n"
    for i in range(0, 60, 2):
        s += "-vcodec copy -acodec copy -ss 00:00:" + str(i).zfill(2)
        s += " -t 00:00:02 out" + str(i) + ".mp4 "
        if i < 58:  # the tail of this line was lost in the original post; assuming a line break between outputs
            s += "\\\n"
    sys.stdout.write(s + "\n")

And also a script that concatenates these pieces back into a single video file:

import os

if __name__ == "__main__":
    f = open("list.tmp", "w")
    for i in range(0, 60, 2):
        f.write("file 'out" + str(i) + ".mp4'\n")
    f.close()

    os.system("ffmpeg -f concat -i list.tmp -c copy joined.mp4")
    os.remove("list.tmp")

I've uploaded the resulting video (you can also see the original video) to YouTube:

As you can see, this method of splitting and concatenating greatly reduces the quality at the split points (every 2 seconds). There is no degradation in audio quality, but despite this, such a level of quality is unacceptable for production use.

In fact, after the video was split I expected the duration of each piece to be exactly 2 seconds. Instead it turned out like this:

for e in $(ls out*.mp4 | sort -V); do echo -n $e; ffprobe $e 2>&1 | grep Duration; done;
out0.mp4  Duration: 00:00:02.02, start: 0.000000, bitrate: 1250 kb/s
out2.mp4  Duration: 00:00:02.00, start: 0.020000, bitrate: 1838 kb/s
out4.mp4  Duration: 00:00:02.00, start: 0.017007, bitrate: 1871 kb/s
out6.mp4  Duration: 00:00:02.00, start: 0.012993, bitrate: 1179 kb/s
out8.mp4  Duration: 00:00:02.00, start: 0.010000, bitrate: 1719 kb/s
out10.mp4  Duration: 00:00:02.00, start: 0.008005, bitrate: 1217 kb/s
out12.mp4  Duration: 00:00:02.00, start: 0.005011, bitrate: 1336 kb/s
out14.mp4  Duration: 00:00:02.02, start: 0.000998, bitrate: 1329 kb/s
out16.mp4  Duration: 00:00:02.00, start: 0.020998, bitrate: 1366 kb/s
out18.mp4  Duration: 00:00:02.00, start: 0.019002, bitrate: 1421 kb/s
out20.mp4  Duration: 00:00:02.00, start: 0.016009, bitrate: 1136 kb/s
out22.mp4  Duration: 00:00:02.00, start: 0.011995, bitrate: 418 kb/s
out24.mp4  Duration: 00:00:02.00, start: 0.010000, bitrate: 411 kb/s
out26.mp4  Duration: 00:00:02.00, start: 0.007007, bitrate: 486 kb/s
out28.mp4  Duration: 00:00:02.00, start: 0.002993, bitrate: 598 kb/s
out30.mp4  Duration: 00:00:02.02, start: 0.000000, bitrate: 649 kb/s
out32.mp4  Duration: 00:00:02.00, start: 0.020000, bitrate: 776 kb/s
out34.mp4  Duration: 00:00:02.00, start: 0.018005, bitrate: 331 kb/s
out36.mp4  Duration: 00:00:02.00, start: 0.015011, bitrate: 322 kb/s
out38.mp4  Duration: 00:00:02.00, start: 0.010000, bitrate: 281 kb/s
out40.mp4  Duration: 00:00:02.00, start: 0.008005, bitrate: 137 kb/s
out42.mp4  Duration: 00:00:02.00, start: 0.005011, bitrate: 196 kb/s
out44.mp4  Duration: 00:00:02.02, start: 0.000998, bitrate: 350 kb/s
out46.mp4  Duration: 00:00:02.00, start: 0.020998, bitrate: 455 kb/s
out48.mp4  Duration: 00:00:02.00, start: 0.019002, bitrate: 1176 kb/s
out50.mp4  Duration: 00:00:02.00, start: 0.016009, bitrate: 1230 kb/s
out52.mp4  Duration: 00:00:02.00, start: 0.011995, bitrate: 817 kb/s
out54.mp4  Duration: 00:00:02.00, start: 0.010000, bitrate: 744 kb/s
out56.mp4  Duration: 00:00:02.00, start: 0.007007, bitrate: 729 kb/s
out58.mp4  Duration: 00:00:02.00, start: 0.002993, bitrate: 414 kb/s

This sounds fishy, doesn't it?

Björn recommended that I use the libavcodec library directly instead of the ffmpeg tool. This sounded like a solution, so I spent a couple of days reading libavcodec code. But what I found out is not very pleasing.

There is a function in the FFmpeg libraries called av_seek_frame(). However, it is not very reliable. First, you cannot specify the frame number you want to jump to. Moreover, according to the blog post Picture Go Back, it is not possible to reliably jump to the frame you want:

I repeatedly tried to seek forward and backwards to different frames -- frame 5000, 10,000, and 15,000 in divx, avi, and other video formats. Each time, the resulting location is close, but not exact. FFmpeg thinks it knows the frame number after seeking, but usually it is off. Frankly, when I want to jump to frame 5000, I want to be at frame 5000 and not 5015, 4079, or some other nearby frame.

So I thought that maybe I could just scan the file without decoding it and check where the GOPs begin. However, I did not find any field that provides this kind of information. Since every GOP should start with an i-frame, I could try to cut before each i-frame, but I would have to decode a frame in order to learn whether it is an i-frame or not, and I don't really want to do that. And I really don't want to develop my own tool, because it would not stand a chance against ffmpeg in terms of supported formats, even if I used libavcodec.
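As a side note, one way to at least see where the key frames land, without patching anything, is to ask ffprobe for per-frame information and filter on the picture type. A minimal sketch, assuming an ffprobe build that supports -show_frames (the exact field names vary between versions, hence the two keys checked):

import subprocess

def keyframe_times(path):
    """Return the presentation timestamps (in seconds) of I-frames in the first video stream."""
    out = subprocess.check_output(
        ["ffprobe", "-select_streams", "v:0", "-show_frames", path],
        stderr=subprocess.DEVNULL,
    ).decode("utf-8", "replace")

    times, frame = [], {}
    for line in out.splitlines():
        line = line.strip()
        if line == "[FRAME]":
            frame = {}
        elif line == "[/FRAME]":
            t = frame.get("pkt_pts_time") or frame.get("pts_time")
            if frame.get("pict_type") == "I" and t not in (None, "N/A"):
                times.append(float(t))
        elif "=" in line:
            key, value = line.split("=", 1)
            frame[key] = value
    return times

if __name__ == "__main__":
    print(keyframe_times("video.mp4"))

If the printed timestamps come out evenly spaced (every 0.5 seconds for this clip), that matches the gop_size=12 at 24 fps reported above.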

And my research continues... Now, I'm thinking to look into VLC, see if it can cut accurately and if so, see if it is possible to use it as a library. Another option is to actually try to implement a new option in ffmpeg that would perform video copying, but will split video file nicely into pieces so that it would be possible to playback smoothly after joining these pieces back into a single video file.

Edit: I have to mention that I've found another way how to split videos using ffmpeg:

ffmpeg -i movie.mp4 -f segment -c copy -segment_time 120 -map 0 out%03d.mp4

After splitting a video using this method and joining the pieces back together, artefacts still appear at the split points. Slight time gaps appear between segments and the total length increases as well. At least no frames are dropped, which suggests there is probably a slight bug somewhere. It may be a good idea to report this to the FFmpeg community and see what they think.

To visualise the behaviour, I performed a simple test and ended up with a video that is 00:00:08.05 long (as reported by ffprobe, which also reported STTS errors twice), even though it actually contains around 1 minute of video. What I did then was:

ffmpeg -i joined.mp4 -c copy fixed.mp4

After that, ffprobe reported that the duration of the file is 00:01:07.40 (it still reported an STTS error once). Here is the resulting video:

A Massive Choice of Technology

Once you have decided what you want to do for your distributed systems project, there is a broad selection of tools out there that may or may not help you. Actually, there are so many that after wasting several hours just looking through them, you can get a headache. This is what happened to me, so if you are looking into this, let me reduce your burden by summarizing my thoughts. There are a couple of approaches to developing a distributed system. First, you can use an existing framework or platform (such as the famous Apache Hadoop) to manage a big portion of the work for you. In fact, that would be the preferable approach, since it helps avoid bugs and reduces development time. There are a number of such frameworks to choose from:

  • Apache Hadoop is the blockbusting project that contains a distributed file system and a MapReduce-style distributed computation model. It's written in Java and therefore requires your code to be in Java too (I suppose Scala would fit as well). However, it is still possible to code in other languages if you use the Hadoop Streaming API. Hadoop provides a map-reduce programming model, where the data is first divided into groups and assigned to workers, and later collected and "reduced" (see the small sketch after this list).
  • Disco project is a Hadoop MapReduce alternative developed by Nokia Research Center. It is written in Erlang, but users usually write algorithms for it in Python.
  • Spring Batch is part of the Spring project and is used to distribute a workload across computers. It seems to fit whenever you want to use Java EE and split the work according to the master-workers programming model.
  • Gearman is yet another framework for developing distributed systems. It may be worth a look.
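To make the map-reduce flow mentioned in the Hadoop bullet a bit more concrete, here is a toy, single-machine word-count sketch in plain Python. It has nothing to do with Hadoop's actual API; it only mimics the map, group and reduce steps:

from collections import defaultdict

def map_phase(document):
    # Emit a (key, value) pair for every word in the document.
    for word in document.split():
        yield (word.lower(), 1)

def shuffle(pairs):
    # Group values by key, as the framework would do between map and reduce.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(key, values):
    return (key, sum(values))

if __name__ == "__main__":
    documents = ["the quick brown fox", "the lazy dog", "the fox"]
    mapped = (pair for doc in documents for pair in map_phase(doc))
    counts = dict(reduce_phase(k, v) for k, v in shuffle(mapped).items())
    print(counts)  # {'the': 3, 'quick': 1, 'brown': 1, 'fox': 2, 'lazy': 1, 'dog': 1}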

Keep in mind that this list is very short (maybe too short). There is a lot of research in this field; for example, many researchers try to escape the map-reduce paradigm and build systems that improve performance for more difficult computational tasks. Such systems tend to use directed acyclic graphs or similar; examples are Dryad and Spark. If you do not want to use a framework, or the framework does not fit the task you want to solve, you may build your own architecture. For this it may be a good idea to use an actor model or some kind of message passing library. A couple of examples of actor model frameworks:

  • Akka is the most famous one and targets the JVM;
  • Pykka is for those using Python (see the small sketch after this list);
  • Theron is for those using C++.
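Here is the promised sketch of the actor idea using Pykka. It is a minimal example assuming Pykka's ThreadingActor API; the actor and message names are made up:

import pykka

class Transcoder(pykka.ThreadingActor):
    """A toy actor that pretends to process a chunk of work sent to it as a message."""
    def on_receive(self, message):
        chunk = message.get("chunk")
        return "processed %s" % chunk

if __name__ == "__main__":
    actor_ref = Transcoder.start()
    try:
        # ask() sends a message and blocks until the actor replies.
        print(actor_ref.ask({"chunk": "out0.mp4"}))
    finally:
        actor_ref.stop()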

As for message passing libraries, there are plenty to choose from as well (MPI being the classic example).

If nothing touches your heart, then at least you may want some tools for building your own network protocols: tools that help serialize and de-serialize the objects you send over the network, such as:

  • Protobuf has support for Java, C++ and Python and very good documentation;
  • Apache Thrift supports many languages (including Python, Ruby, Java and C++), but its documentation is not as good as protobuf's.

Up until now I don't really know what I will be using for my project; however, my heart leans towards the custom protocol thingie! :)

Master Thesis: The Kick-start

Introduction

I have accepted an offer from Screen9 and started my thesis project, called "Distributed Video Transcoding". I am thinking of adding words like "fault-tolerant", "scalable" or "service" to the title (my intention is to develop a system that acts as a service, as opposed to start → transcode → shut down). I only started the day before yesterday, but I already have some interesting findings:

  • There is a commercial solution (Zencoder) already on the air that makes money out of video transcoding. This means I won't be doing anything blockbusting with my work. I should probably open my own firm in the end and sell such a service cheaper, shouldn't I? :)
  • An open source solution (codem, written in node.js and Ruby on Rails) does something similar to what I want. It uses a master-worker scheme, but at first glance it seems that it does not perform any video splitting. This means that it does not take advantage of multiple processors when transcoding a single big video file. However, I may use or extend their Transcoder module. I have to think about it.
  • Tewodros Deneke has written a Master's thesis called "Scalable Distributed Video Transcoding Architecture". It is very close to what I want to do: it does video splitting and uses MPI for communication between nodes. Definitely a good read for me before touching any code. It misses the keyword "fault-tolerant", though, which is one of the few places where my star may shine.

Scaling The Video Transcoding Process

Transcoding is a very computationally heavy task; e.g. on my machine (Intel Core 2 Duo P8600 2.4GHz, 3072 KB cache), transcoding a video file (h264 to MPEG-4, AAC to MP3, resolution: 1920x818, duration: 00:14:47.99, bitrate: 3075 kb/s) took 9 minutes and 45 seconds. Wouldn't it be nice to split this job and run it on a couple of machines, thus reducing the overall transcoding time? Yes, but there are some problems with that.

MPEG-2 Hierarchy of Layers
MPEG-2 Hierarchy of Layers

Firstly, it is not quite clear how to split the file. To show you why, I have to very briefly introduce video compression. Video is just a sequence of images (or frames, usually 25 frames/sec). To compress such a video, regular image compression techniques are used (exploiting spatial redundancy). Moreover, consecutive frames are often very alike, so instead of compressing every image it is far more effective to compress one (the key frame) and then encode only the differences between the following frames and this key frame. This technique is said to exploit temporal redundancy.
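As a toy illustration of the temporal-redundancy idea, with short lists of numbers standing in for frames:

def encode(frames):
    """Store the first frame as the key frame and every other frame as a difference from it."""
    key = frames[0]
    diffs = [[b - a for a, b in zip(key, frame)] for frame in frames[1:]]
    return key, diffs

def decode(key, diffs):
    return [key] + [[a + d for a, d in zip(key, diff)] for diff in diffs]

frames = [[10, 10, 10], [10, 11, 10], [10, 12, 11]]   # nearly identical "frames"
key, diffs = encode(frames)
print(diffs)                                          # mostly zeros, which compresses well
assert decode(key, diffs) == frames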

The MPEG-2 codec uses such techniques, and this is why it is organized into several layers (visualised in the figure above; for more on this see A Beginner's Guide to the MPEG-2 Standard). Choosing the block layer for splitting would give the best granularity, i.e. it would be easy to split the file into very small pieces. However, this layer depends heavily on other blocks, so calculating the exact value of a block is not easy and requires additional computational resources. This makes the group of pictures layer the best choice, since it has no dependencies on other group of pictures layers. The question I have not answered yet is how cleanly it is possible to cut videos without changing the encoding. Will it add (almost) the same headers to each chunk?

Another way of distributing the work is to decode first and then split. This would clearly be a burden for the network, but would it at least make it possible to choose the optimal chunk size?

We should also not forget the overhead created by splitting and merging a video file.

Next, load balancing and scheduling. Different slaves may have different computational capacity, may fail, or may start performing slowly due to outside interference. If a slave performs slower than its peers, it would not be reasonable to assign it the same amount of work, since that would increase the overall transcoding time. If a slave fails, its uncompleted tasks must be rescheduled on other slaves. All of this must be kept in mind while designing such a system.

Designing The Distributed System

From the data's point of view, once we split the video into pieces where each piece contains a whole group of pictures layer, the task becomes embarrassingly parallel. That makes life easier, since we can use a simple master-worker relation, where the master schedules the work and the workers process it. The figure below visualises a possible architecture (T. Deneke, "Scalable Distributed Video Transcoding Architecture").

Possible Architecture
Possible architecture of a distributed transcoder

In this figure you can see some core components:

  • Splitter splits the stream in a well-balanced fashion. It should take some metrics from the input stream and from the optimizer in order to make a good decision.
  • Optimizer collects data from both the input and output streams and tries to optimize the system in terms of latency, start-up time, etc.
  • Merger simply merges several transcoded streams into a single stream.

The only thing missing here is fault tolerance. Making workers fault-tolerant is easy: a task-restarting mechanism should be introduced in the scheduler, i.e. once a worker fails, the scheduler reschedules its task on another worker. Making the scheduler fault-tolerant is another story. The thing that comes to my mind is to replicate the scheduler's state on other machines, so that once a scheduler fails, another can replace it. However, this may introduce consistency issues, i.e. the former scheduler may fail before synchronizing its state with the replicas. It is possible to block message passing and wait until the scheduler synchronizes its state with the rest of the replicas, but this may introduce an overall system lag.

The fault-tolerance issue needs to be taken more seriously and a proper solution must be found. The only word that comes to my mind right now is ZooKeeper, a tool that helps solve issues such as leader election, group membership, etc.

More on Load Balancing

T. Deneke in his work ("Scalable Distributed Video Transcoding Architecture") categorizes load balancing solutions into static and dynamic. Static load balancing takes into account the computational capabilities (e.g. CPU clock speed) of every worker and schedules tasks according to those capabilities. It is not difficult to implement; however, T. Deneke states that it is often not efficient, since resources may be shared with other tasks. For this reason he suggests dynamic load balancing and describes a couple of ways to do it:

  • Acknowledgement-based approaches send a new task to a worker whenever the scheduler receives an acknowledgement that the worker has finished its previous task.
  • Control-theory-based approaches could use adaptive prediction: send tasks to workers and then update the prediction according to the results.
  • Machine-learning-based approaches are pretty complex, and I don't see the need for them in such a task.

In my opinion the best option would be a simple producer-consumer model. The video should be divided into small but reasonably sized pieces and put into a queue, from which workers take one item at a time and process it. This is a fairly simple solution, does not require any difficult algorithm (so it is easier to maintain) and should be pretty effective. A rough sketch of the idea follows.
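Here is a rough, single-machine sketch of that producer-consumer idea using Python's standard queue and threading modules. The chunk names and the transcode_chunk body are placeholders, not the real transcoder:

import queue
import threading

def transcode_chunk(chunk):
    # Placeholder for the real work (e.g. invoking ffmpeg on one piece).
    print("transcoding", chunk)

def worker(tasks):
    while True:
        chunk = tasks.get()
        if chunk is None:          # poison pill: no more work
            break
        try:
            transcode_chunk(chunk)
        finally:
            tasks.task_done()

if __name__ == "__main__":
    tasks = queue.Queue()
    workers = [threading.Thread(target=worker, args=(tasks,)) for _ in range(4)]
    for w in workers:
        w.start()

    # Producer: the splitter would put chunk descriptions here.
    for i in range(0, 60, 2):
        tasks.put("out%d.mp4" % i)

    tasks.join()                   # wait until every chunk has been processed
    for _ in workers:
        tasks.put(None)            # tell the workers to stop
    for w in workers:
        w.join()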

Dryad & DryadLINQ: Short Story

Recently I gave a presentation in one of my classes at KTH. I presented DryadLINQ, and I think it would be a nice idea to share the knowledge with a broader audience.

Dryad is Microsoft's answer to the nowadays very popular Hadoop MapReduce. It is designed to help programmers write data-parallel programs that scale to thousands of machines. The biggest difference between the two systems is that Dryad lets a programmer build his or her own execution graph, whereas in MapReduce you are tied to map and reduce.

A Dryad job, as I mentioned earlier, is an acyclic graph (made of vertices and edges, of course). The vertices in this graph are programs (or pieces of code) that will run on machines. The edges are data channels used to transfer data between those programs. So a Dryad job can be seen as a program, and a vertex as a single execution unit of that program.
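To illustrate only the idea (this has nothing to do with Dryad's actual API), here is a toy Python sketch where vertices are plain functions and edges describe whose outputs feed whose inputs:

# Vertices are plain functions; edges list which vertices' outputs each vertex consumes.
vertices = {
    "read":   lambda: [3, 1, 4, 1, 5, 9, 2, 6],
    "square": lambda xs: [x * x for x in xs],
    "double": lambda xs: [2 * x for x in xs],
    "merge":  lambda a, b: sorted(a + b),
}
edges = {
    "read":   [],
    "square": ["read"],
    "double": ["read"],
    "merge":  ["square", "double"],
}

def run(graph_edges, graph_vertices):
    """Execute vertices in a (naive) topological order, passing data along the edges."""
    results, remaining = {}, dict(graph_edges)
    while remaining:
        for name, deps in list(remaining.items()):
            if all(d in results for d in deps):
                results[name] = graph_vertices[name](*(results[d] for d in deps))
                del remaining[name]
    return results

print(run(edges, vertices)["merge"])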

In Dryad, the machine responsible for coordinating the execution of jobs is called the Job Manager (JM). It:

  • instantiates a job's data-flow graph,
  • schedules processes (vertices) on a cluster of computers,
  • provides fault-tolerance mechanisms by rescheduling failed or slow-executing jobs,
  • monitors the execution and collects statistics.

The figure representing the Dryad architecture can be seen in slide 4. You have already been introduced to the Job Manager. The Name Server (NS) controls the group membership of the PD nodes, and a PD node is an execution daemon that runs the actual vertices. They all communicate with each other through a data plane. Here comes an interesting part: in Dryad, vertices can pass data in several ways:

  • Through files. A vertex can store a file locally so that another vertex executed on the same node can read it.
  • Through TCP. A PD transfers data to a vertex executing on another PD.
  • Through an in-memory data structure. If the data fits in memory, why not store it there, so that the other vertex can access it faster?

OK, so this sounds nice on paper. However, Dryad (just like MapReduce) runs into the problem of complexity. A programmer has to design his or her own DAG (directed acyclic graph, i.e. Dryad job) and code it. This may sound simple, but it isn't. Since programmers already know SQL pretty well, there are a number of solutions that implement SQL-like syntax on top of MapReduce; Hive and Pig are two examples. Their main goal is to make the programmer's life easy and let them code in simple SQL; the system is then responsible for executing the query efficiently on a cluster of machines. Nevertheless, pure SQL has several issues: it does not have custom types (like structs), no common programming patterns (iterations, conditional clauses), and so on. Microsoft had already made a solution to these problems: LINQ. So why not make LINQ for Dryad? This would definitely have a couple of advantages, since LINQ:

  • is already integrated into C#, so you don't have to write SQL parsers;
  • provides SQL-like syntax, so it's simple;
  • has common programming patterns, such as iteration;
  • can provide custom types (.NET objects);
  • has strong Visual Studio support.

Indeed, this is what the Microsoft guys did: DryadLINQ. This system compiles a LINQ query into an execution plan and runs it on Dryad. But before jumping into details, go and see slides 9 and 10; you will see what changes are needed to turn LINQ into DryadLINQ. GetTable and ToDryadTable are the essential function calls in DryadLINQ. The former tells Dryad where to look for the input and what .NET type that input should have; the latter tells it where the output should be stored and actually starts the execution.

Now let's talk about DryadLINQ execution. When ToDryadTable is invoked, DryadLINQ takes the LINQ expression and compiles it into a Dryad execution plan, i.e. it decomposes the expression into sub-expressions, generates code and static data for the Dryad vertices, and generates object serialization code. DryadLINQ then passes this plan to the Dryad Job Manager (they actually use a custom Job Manager), which transforms the execution plan into a job graph and schedules vertex execution on the PDs. When the vertices are executed, they take all the needed input from input tables (the data store provided in the call to GetTable). When execution is over, the output is written to output tables (the data store provided in the call to ToDryadTable). DryadLINQ then wraps this output table and provides the application with an iterator from which it can get regular .NET objects.

DryadLINQ also does some optimizations. The most important static optimizations are:

  • Pipelining. When multiple operators can be executed in a single process, it is better to execute them as a pipeline.
  • Removing redundancy. Removal of unnecessary hash or range partitioning steps.
  • Eager aggregation. If aggregation is needed, it is better to aggregate parts of the data before sending them over the network.
  • I/O reduction. When the data is small and fits in memory, it is better to pass it to the next vertex through in-memory FIFO channels.

The system also provides dynamic optimizations: run-time optimizations that depend on the size of the data, such as aggregation tree generation and data partitioning. And that's it, folks! This is a brief introduction to DryadLINQ. Now, should you jump in and start using it? Well, ZDNet announced that Microsoft will no longer offer Dryad in its cloud services and will be using Hadoop instead. I would reason that they made this choice because Hadoop is much more popular and it's much easier to attract clients by offering Hadoop. The rest is up to you!

Node.js: 2 Fast or not 2 Fast?

Node.js, you say, huh? Old news? Well, it keeps gaining popularity nowadays. I watched a presentation about node.js in which it was said that a web server written in Node.js outperforms regular web servers. To me it sounded like "challenge accepted"!

What did I have in mind? A simple test: why not write a simple web server with node.js that does some simple calculations and compare it to a regular web server, say, a Java servlet container? This is exactly what I did. I implemented a Pi calculation in node.js and in these servlet containers: Apache Tomcat 6, Jetty 7.6 and Resin 4.0.

Here is the code for node.js:

var http = require('http');
 
var server = http.createServer(function (req, res) {
    var Pi = 0;
    var n = 1;
    for (var i = 0; i <= 1000; i++) {
        Pi = Pi + (4 / n) - (4 / (n + 2));
        n = n + 4;
    }
    res.writeHead(200, {
        'Content-Type': 'text/plain'
    });
    res.write("" + Pi + "\n");
    res.end();
});
 
server.listen(8081);
console.log("Server is listening on port 8081");

Here is the code of the Java servlet:

public class PiServlet extends HttpServlet {
    protected void processRequest(
            HttpServletRequest request,
            HttpServletResponse response)
            throws ServletException, IOException {
        response.setContentType("text/plain;charset=UTF-8");
        PrintWriter out = response.getWriter();
        try {
            double Pi = 0;
            int n = 1;
            for (int i = 0; i <= 1000; i++) {
                Pi += (4d / n) - (4d / (n + 2));
                n += 4;
            }
            out.println(Pi);
        } finally {    
            out.close();
        }
    }
 
    @Override
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response)
            throws ServletException, IOException {
        processRequest(request, response);
    }
 
    @Override
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response)
            throws ServletException, IOException {
        processRequest(request, response);
    }
}

And finally, the results:

The results are really interesting. Resin outperformed every other server in this test. But the most fun part is that without warming up, every servlet container performed at least twice as slowly, while node.js always showed the same result. A servlet container has to be configured (Apache Tomcat and Jetty were set to 2 acceptors and 500 threads; Resin was left at its default settings), whereas I didn't touch a single configuration option of node.js. Of course, servlets give you a lot of tools for building a more decent web application, whereas node.js gives you basic functionality. Anyway, GO GO Node.js!
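If you want to repeat a rough version of this at home, a trivial (and admittedly unscientific) timing client could look like the sketch below. The URL and request count are placeholders; a proper benchmark would use a dedicated load-testing tool and concurrent connections:

import time
import urllib.request

URL = "http://localhost:8081/"     # point this at whichever server you are testing
REQUESTS = 1000

start = time.time()
for _ in range(REQUESTS):
    with urllib.request.urlopen(URL) as response:
        response.read()
elapsed = time.time() - start
print("%d requests in %.2f s (%.1f req/s)" % (REQUESTS, elapsed, REQUESTS / elapsed))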

My personal experience with node.js is quite good. Its API could serve as an example of how to design solid APIs, and it's easy and fun to use. Really good work. However, the documentation needs a lot of work.

Using ZooKeeper

Julia and I made a little project using ZooKeeper for one of our classes. We ran some stress tests to see how ZooKeeper handles load, and it turns out it can handle quite a lot. But the thing I want to highlight is one of the little libraries we coded. The library lets a user perform a reliable multicast. It ensures that:

  • Only current participants will receive a message;
  • If a new participant joins the view while a message is being multicast, it will only receive the next message;
  • If a participant fails before receiving a message, it will not receive it later.

And in this library we had a chance to implement these distributed computing primitives:

  • Group membership
  • Leader election

You can see the full code in Julia's wiki. And don't forget to have a look at the presentation!

JGAP Teaches You How to Break Coding Conventions

Recently I had a chance to try the JGAP library, a Java framework for creating genetic algorithms. I had wanted to try it ever since I saw the evolutionchamber project, and because I have to build a heuristic algorithm for a university project, I decided this was the perfect moment!

Sadly, the deeper I dig into the library, the more disappointed I become. Here are several reasons why:

  1. After reading the code for a bit I started to wonder why version control systems were ever invented. We can use comments instead! See how it should be done:
      public void addChromosomes(final Population a_population) {
        if (a_population != null) {
          synchronized (m_chromosomes) {
            m_chromosomes.addAll(a_population.getChromosomes());
          }
          // The following would do the same:
    //      if (a_population.getChromosomes() != null) {
    //        int size = a_population.getChromosomes().size();
    //        for (int i = 0; i < size; i++) {
    //          IChromosome chrom = a_population.getChromosome(i);
    //          m_chromosomes.add(chrom);
    //        }
    //      }
          setChanged(true);
        }
      }
  2. The library makes it easy to implement your own GeneticOperators, but if you want to support all the features other GeneticOperators have, you have to repeat the same code again and again. Couldn't we use something smarter, like the Template Method pattern? See an example from CrossingOverOperator:
    public void operate(final Population a_population,
                          final List a_candidateChromosomes) {
          ...
          if (constraint != null) {
            List v = new Vector();
            v.add(chrom1);
            v.add(chrom2);
            if (!constraint.isValid(a_population, v, this)) {
              // Constraint forbids crossing over.
              // ---------------------------------
              continue;
            }
          }
          ...
          if (m_monitorActive) {
            ...
          }
          ...
    }
  3. There are different kinds of genetic operators in this library: NaturalSelectors and GeneticOperators. They share very similar functionality (both manipulate the current population), but they are still different classes, and the evolution process can contain more than one NaturalSelector and GeneticOperator. Here comes the fun part: when adding a NaturalSelector you have to pass a boolean flag specifying whether to run it before or after the GeneticOperators. Why couldn't there be a single collection for both NaturalSelectors and GeneticOperators if they do the same thing (manipulate a population)? Why do we need three collections?
  4. What is more, NaturalSelector implements INaturalSelector (INaturalSelector? In Java?). So when you want to create your own NaturalSelector, guess what happens if you implement the interface instead of extending the class. Yep, the standard Breeder that ships with the library accepts only NaturalSelector and not INaturalSelector.
  5. Breaking Java coding conventions is very awesome! Then you can add I to the beginning of an interface name and do all kinds of fancy stuff! This is what they do in order to simply clone an object:
    • The object must implement IClonable, which is the same as the standard Java Cloneable and has only one public method: clone().
    • Then you have to call a factory method in order to retrieve a cloner (there is only one cloner in the library).
    • Afterwards, you call a cloner method and pass in the object you want to clone.
    • Your clone is done!
    • Seems... reasonable, but... the cloner simply calls the clone() method! Huh?

So what I am going to try next is what was suggested in a Stack Overflow post: Jenes.

ZooKeeper: The Amazing Overseer of Distributed Animals

Recently I read the paper ZooKeeper: Wait-free coordination for Internet-scale systems by P. Hunt et al., and I was so excited about it that I wanted to share the knowledge I gained from it. This is the best source if you want an introduction to what ZooKeeper is (sadly, even better than the ZooKeeper website).

ZooKeeper is a coordination kernel that enables new primitives without requiring changes to the service core. The system provides its clients with the abstraction of a set of data nodes (znodes), organized in a hierarchical name space. The znodes in this hierarchy are data objects that clients manipulate through the ZooKeeper API. Basically, ZooKeeper provides an in-memory (RAM) store that is very similar to a file system, is replicated across all servers in the ZooKeeper ensemble, and guarantees a linearizable order of writes.

There are two types of znodes: regular and ephemeral. The difference is that an ephemeral znode gets deleted when the session with the client that created it is terminated (usually by some kind of failure); of course, it can also be deleted by the client explicitly. Another interesting feature of the ZooKeeper API is watches: any client can watch any znode, and whenever that znode changes, ZooKeeper informs the watching client. So clients do not need to do explicit polling or use any other nasty techniques to track changes. These two features are very powerful and are the core features that allow implementing distributed computing primitives such as:

  • Distributed Configuration Management
  • Rendezvous
  • Group Membership
  • Simple Locks (and locks without Herd Effect)
  • Read/Write Locks
  • Double Barrier
  • Leader Election

All the details on how to implement these using ZooKeeper are provided in the ZooKeeper Recipes or in the paper. So go read the paper!
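To make the ephemeral-znode-plus-watch combination concrete, here is a minimal group-membership sketch. It assumes the Python kazoo client library; the paths and the host address are made up:

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

zk.ensure_path("/workers")
# Each worker announces itself with an ephemeral, sequential znode;
# if the worker's session dies, ZooKeeper removes the znode automatically.
zk.create("/workers/worker-", ephemeral=True, sequence=True)

@zk.ChildrenWatch("/workers")
def on_membership_change(children):
    # Called immediately and then again every time the set of workers changes.
    print("current members:", children)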

And I can't wait till I can play with ZooKeeper!

Smith-Waterman Algorithm Parallelization

Last semester at UPC we had to do a project on the parallelization of the Smith-Waterman algorithm. We implemented two versions: one used only blocking at the matrix level, the other used both blocking and interleaving (blocking means that a process waits for the value from the previous process before starting its own calculations; interleaving means that after a process finishes one portion, it moves on to another portion of the calculations). We then compared their performance in a multi-core environment.
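For context, the dependencies that make this interesting to parallelize come from the Smith-Waterman recurrence itself: each cell of the scoring matrix depends on its left, upper and upper-left neighbours. A plain serial Python sketch (with arbitrary example scores, not our MPI code):

def smith_waterman(a, b, match=2, mismatch=-1, gap=-1):
    """Fill the local-alignment scoring matrix and return its maximum score."""
    rows, cols = len(a) + 1, len(b) + 1
    H = [[0] * cols for _ in range(rows)]
    best = 0
    for i in range(1, rows):
        for j in range(1, cols):
            score = match if a[i - 1] == b[j - 1] else mismatch
            H[i][j] = max(
                0,                        # local alignment never goes below zero
                H[i - 1][j - 1] + score,  # diagonal: match or mismatch
                H[i - 1][j] + gap,        # from above: gap in b
                H[i][j - 1] + gap,        # from the left: gap in a
            )
            best = max(best, H[i][j])
    return best

print(smith_waterman("GATTACA", "GCATGCU"))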

This project was interesting, and I finally learned how to use MPI for parallelization. I found MPI to be a pretty easy and powerful tool for speeding up an application by spreading the workload over several processes.

Here is our report:

Download (PDF)