caches-queues-lists: add a specialised "sequential queue" implementation

Add a specialised "sequential queue" implementation.  This rather
than outputing the data in the order pushed to the queue, outputs
it in the order specified in the "sequence" field in the pushed
entries.  Also only output data if the queue holds the next
sequential number to the previous output buffer.

This ensures that buffers pushed to the queue out of sequence
are delivered in the correct sequential order.

This queue is necessary because the queue collects data from
the deflate threads, and also (in the future and originally a couple
of releases ago) directly from the reader thread.   Depending on the
necessary time to compress the buffers, the deflate threads can
deliver buffers out of sequence, for instance deflate_thread_1 takes
buffer_1, and deflate_thread_2 takes buffer_2, if buffer_2 takes
less time to compress, this will be delivered before buffer_1 by the
deflate threads.  Previously(*) (and in the future) uncompressed fragment
buffers were sent directly to the main thread, rather than going via
the deflate threads.  Due to this a fragment block (representing block n)
could arrive before buffers n-1, n-2 etc because they have gone via the
deflate threads and taken time to compress.

(*) this was changed to queue the fragment buffers via the
deflate threads.  This was done for two reasons:

1. it was noticed queueing fragment blocks directly to the main thread
   resulted in a significant number of out of order blocks, and a lot
   of unnecessary wake-ups of the main thread.  Queuing via the
   deflate threads eliminated that source of out of order blocks, but with
   the consequence that the deflate_threads got more wake ups.  But
   queueing via the deflate threads at least enabled the next reason to
   take place.

2. Queueing via the deflate threads enabled the deflate threads to do
   sparse checking (all zero) on the fragment blocks, and to "promote"
   them to sparse blocks if they were all zero.

But for a long time a better solution was required, and this is it ...

Signed-off-by: Phillip Lougher <phillip@squashfs.org.uk>
diff --git a/squashfs-tools/caches-queues-lists.c b/squashfs-tools/caches-queues-lists.c
index 7e79496..6f07aba 100644
--- a/squashfs-tools/caches-queues-lists.c
+++ b/squashfs-tools/caches-queues-lists.c
@@ -114,7 +114,98 @@
 }
 
 
-/* * define cache hash tables */
+/* define seq queue hash tables */
+#define CALCULATE_SEQ_HASH(N) ((N) & 0xffff)
+
+/* Called with the seq queue mutex held */
+INSERT_HASH_TABLE(seq, struct seq_queue, CALCULATE_SEQ_HASH, sequence)
+
+/* Called with the cache mutex held */
+REMOVE_HASH_TABLE(seq, struct seq_queue, CALCULATE_SEQ_HASH, sequence);
+
+static unsigned int sequence = 0;
+
+
+struct seq_queue *seq_queue_init()
+{
+	struct seq_queue *queue = malloc(sizeof(struct seq_queue));
+	if(queue == NULL)
+		MEM_ERROR();
+
+	memset(queue, 0, sizeof(struct seq_queue));
+
+	pthread_mutex_init(&queue->mutex, NULL);
+	pthread_cond_init(&queue->wait, NULL);
+
+	return queue;
+}
+
+
+void seq_queue_put(struct seq_queue *queue, struct file_buffer *entry)
+{
+	pthread_cleanup_push((void *) pthread_mutex_unlock, &queue->mutex);
+	pthread_mutex_lock(&queue->mutex);
+
+	insert_seq_hash_table(queue, entry);
+
+	if(entry->fragment)
+		queue->fragment_count ++;
+	else
+		queue->block_count ++;
+
+	if(entry->sequence == sequence)
+		pthread_cond_signal(&queue->wait);
+
+	pthread_cleanup_pop(1);
+}
+
+
+struct file_buffer *seq_queue_get(struct seq_queue *queue)
+{
+	/*
+	 * Look-up buffer matching sequence in the queue, if found return
+	 * it, otherwise wait until it arrives
+	 */
+	int hash = CALCULATE_SEQ_HASH(sequence);
+	struct file_buffer *entry;
+
+	pthread_cleanup_push((void *) pthread_mutex_unlock, &queue->mutex);
+	pthread_mutex_lock(&queue->mutex);
+
+	while(1) {
+		for(entry = queue->hash_table[hash]; entry;
+						entry = entry->hash_next)
+			if(entry->sequence == sequence)
+				break;
+
+		if(entry) {
+			/*
+			 * found the buffer in the queue, decrement the
+			 * appropriate count, and remove from hash list
+			 */
+			if(entry->fragment)
+				queue->fragment_count --;
+			else
+				queue->block_count --;
+
+			remove_seq_hash_table(queue, entry);
+
+			sequence ++;
+
+			break;
+		}
+
+		/* entry not found, wait for it to arrive */	
+		pthread_cond_wait(&queue->wait, &queue->mutex);
+	}
+
+	pthread_cleanup_pop(1);
+
+	return entry;
+}
+
+
+/* define cache hash tables */
 #define CALCULATE_CACHE_HASH(N) (llabs(N) & 0xffff)
 
 /* Called with the cache mutex held */
diff --git a/squashfs-tools/caches-queues-lists.h b/squashfs-tools/caches-queues-lists.h
index 899d2d3..b959bce 100644
--- a/squashfs-tools/caches-queues-lists.h
+++ b/squashfs-tools/caches-queues-lists.h
@@ -56,6 +56,19 @@
 };
 
 
+/*
+ * struct describing seq_queues used to pass data between the read
+ * thread and the deflate and main threads
+ */
+struct seq_queue {
+	int			fragment_count;
+	int			block_count;
+	struct file_buffer	*hash_table[65536];
+	pthread_mutex_t		mutex;
+	pthread_cond_t		wait;
+};
+
+
 /* Cache status struct.  Caches are used to keep
   track of memory buffers passed between different threads */
 struct cache {
@@ -146,6 +159,9 @@
 extern void queue_put(struct queue *, void *);
 extern void *queue_get(struct queue *);
 extern void dump_queue(struct queue *);
+extern struct seq_queue *seq_queue_init();
+extern void seq_queue_put(struct seq_queue *, struct file_buffer *);
+extern struct file_buffer *seq_queue_get(struct seq_queue *);
 extern struct cache *cache_init(int, int, int, int);
 extern struct file_buffer *cache_lookup(struct cache *, long long);
 extern struct file_buffer *cache_get(struct cache *, long long);