Add streaming filter API
[git] / streaming.c
1 /*
2  * Copyright (c) 2011, Google Inc.
3  */
4 #include "cache.h"
5 #include "streaming.h"
6
/*
 * Where an object's data will be streamed from.  The non-negative values
 * are indices into open_istream_tbl[]; stream_error means the object
 * lookup in istream_source() failed.
 */
enum input_source {
	stream_error = -1,
	incore = 0,
	loose = 1,
	pack_non_delta = 2
};
13
/*
 * Backend method signatures.
 *
 * open_istream_fn: initialize the stream for the given object; returns 0
 *   on success and non-zero on failure (open_istream() then falls back to
 *   the in-core backend).
 * close_istream_fn / read_istream_fn: backend halves of close_istream()
 *   and read_istream().
 */
typedef int (*open_istream_fn)(struct git_istream *,
			       struct object_info *,
			       const unsigned char *,
			       enum object_type *);
typedef int (*close_istream_fn)(struct git_istream *);
typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);
20
/* Per-backend operations; every open git_istream points at one of these. */
struct stream_vtbl {
	close_istream_fn close;
	read_istream_fn read;
};
25
/*
 * Shorthands for the backend method signatures: e.g. open_method_decl(loose)
 * expands to the declarator of open_istream_loose() with the parameters
 * (st, oi, sha1, type) that match open_istream_fn.  The close and read
 * variants likewise match close_istream_fn and read_istream_fn.
 */
#define open_method_decl(name) \
	int open_istream_ ##name \
	(struct git_istream *st, struct object_info *oi, \
	 const unsigned char *sha1, \
	 enum object_type *type)

#define close_method_decl(name) \
	int close_istream_ ##name \
	(struct git_istream *st)

#define read_method_decl(name) \
	ssize_t read_istream_ ##name \
	(struct git_istream *st, char *buf, size_t sz)
39
/*
 * Forward declarations: open_istream_tbl[] and open_istream() refer to
 * these before their definitions further down the file.
 */
static open_method_decl(incore);
static open_method_decl(loose);
static open_method_decl(pack_non_delta);
static struct git_istream *attach_stream_filter(struct git_istream *st,
						struct stream_filter *filter);
46
47
48 static open_istream_fn open_istream_tbl[] = {
49         open_istream_incore,
50         open_istream_loose,
51         open_istream_pack_non_delta,
52 };
53
/* Size in bytes of each of the input and output buffers of a filtered stream. */
#define FILTER_BUFFER (1024*16)

/*
 * State of a stream created by attach_stream_filter().  Bytes read from
 * upstream wait in ibuf[i_ptr..i_end) until the filter consumes them; the
 * filter's output waits in obuf[o_ptr..o_end) until read_istream_filtered()
 * copies it to the caller.
 */
struct filtered_istream {
	struct git_istream *upstream;	/* closed when this stream is closed */
	struct stream_filter *filter;	/* freed when this stream is closed */
	char ibuf[FILTER_BUFFER];
	char obuf[FILTER_BUFFER];
	int i_end, i_ptr;
	int o_end, o_ptr;
};
64
/*
 * An open read stream over one object.  vtbl selects the backend
 * (incore, loose, pack_non_delta or filtered); u holds the private state
 * of that backend, and only the matching union member is meaningful.
 */
struct git_istream {
	const struct stream_vtbl *vtbl;
	unsigned long size; /* inflated size of full object; (unsigned long)-1 for filtered streams */
	z_stream z;	/* inflater used by the loose and pack_non_delta backends */
	/* z_unused: not started; z_used: active; z_done/z_error: already ended */
	enum { z_unused, z_used, z_done, z_error } z_state;

	union {
		struct {
			char *buf; /* from read_sha1_file_extended() */
			unsigned long read_ptr;	/* offset of the next byte to return */
		} incore;

		struct {
			void *mapped;	/* from map_sha1_file(); munmap()ed on close */
			unsigned long mapsize;
			char hdr[32];	/* first inflated bytes: NUL-terminated header, maybe data */
			int hdr_avail;	/* number of bytes inflated into hdr */
			int hdr_used;	/* bytes of hdr consumed so far */
		} loose;

		struct {
			struct packed_git *pack;
			off_t pos;	/* pack offset of the next compressed byte */
		} in_pack;

		struct filtered_istream filtered;
	} u;
};
93
94 int close_istream(struct git_istream *st)
95 {
96         return st->vtbl->close(st);
97 }
98
99 ssize_t read_istream(struct git_istream *st, char *buf, size_t sz)
100 {
101         return st->vtbl->read(st, buf, sz);
102 }
103
104 static enum input_source istream_source(const unsigned char *sha1,
105                                         enum object_type *type,
106                                         struct object_info *oi)
107 {
108         unsigned long size;
109         int status;
110
111         oi->sizep = &size;
112         status = sha1_object_info_extended(sha1, oi);
113         if (status < 0)
114                 return stream_error;
115         *type = status;
116
117         switch (oi->whence) {
118         case OI_LOOSE:
119                 return loose;
120         case OI_PACKED:
121                 if (!oi->u.packed.is_delta && big_file_threshold <= size)
122                         return pack_non_delta;
123                 /* fallthru */
124         default:
125                 return incore;
126         }
127 }
128
129 struct git_istream *open_istream(const unsigned char *sha1,
130                                  enum object_type *type,
131                                  unsigned long *size,
132                                  struct stream_filter *filter)
133 {
134         struct git_istream *st;
135         struct object_info oi;
136         const unsigned char *real = lookup_replace_object(sha1);
137         enum input_source src = istream_source(real, type, &oi);
138
139         if (src < 0)
140                 return NULL;
141
142         st = xmalloc(sizeof(*st));
143         if (open_istream_tbl[src](st, &oi, real, type)) {
144                 if (open_istream_incore(st, &oi, real, type)) {
145                         free(st);
146                         return NULL;
147                 }
148         }
149         if (st && filter) {
150                 /* Add "&& !is_null_stream_filter(filter)" for performance */
151                 struct git_istream *nst = attach_stream_filter(st, filter);
152                 if (!nst)
153                         close_istream(st);
154                 st = nst;
155         }
156
157         *size = st->size;
158         return st;
159 }
160
161
162 /*****************************************************************
163  *
164  * Common helpers
165  *
166  *****************************************************************/
167
168 static void close_deflated_stream(struct git_istream *st)
169 {
170         if (st->z_state == z_used)
171                 git_inflate_end(&st->z);
172 }
173
174
175 /*****************************************************************
176  *
177  * Filtered stream
178  *
179  *****************************************************************/
180
181 static close_method_decl(filtered)
182 {
183         free_stream_filter(st->u.filtered.filter);
184         return close_istream(st->u.filtered.upstream);
185 }
186
187 static read_method_decl(filtered)
188 {
189         struct filtered_istream *fs = &(st->u.filtered);
190         size_t filled = 0;
191
192         while (sz) {
193                 /* do we already have filtered output? */
194                 if (fs->o_ptr < fs->o_end) {
195                         size_t to_move = fs->o_end - fs->o_ptr;
196                         if (sz < to_move)
197                                 to_move = sz;
198                         memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
199                         fs->o_ptr += to_move;
200                         sz -= to_move;
201                         filled += to_move;
202                         continue;
203                 }
204                 fs->o_end = fs->o_ptr = 0;
205
206                 /* do we have anything to feed the filter with? */
207                 if (fs->i_ptr < fs->i_end) {
208                         size_t to_feed = fs->i_end - fs->i_ptr;
209                         size_t to_receive = FILTER_BUFFER;
210                         if (stream_filter(fs->filter,
211                                           fs->ibuf + fs->i_ptr, &to_feed,
212                                           fs->obuf, &to_receive))
213                                 return -1;
214                         fs->i_ptr = fs->i_end - to_feed;
215                         fs->o_end = FILTER_BUFFER - to_receive;
216                         continue;
217                 }
218                 fs->i_end = fs->i_ptr = 0;
219
220                 /* refill the input from the upstream */
221                 fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
222                 if (fs->i_end <= 0)
223                         break;
224         }
225         return filled;
226 }
227
228 static struct stream_vtbl filtered_vtbl = {
229         close_istream_filtered,
230         read_istream_filtered,
231 };
232
233 static struct git_istream *attach_stream_filter(struct git_istream *st,
234                                                 struct stream_filter *filter)
235 {
236         struct git_istream *ifs = xmalloc(sizeof(*ifs));
237         struct filtered_istream *fs = &(ifs->u.filtered);
238
239         ifs->vtbl = &filtered_vtbl;
240         fs->upstream = st;
241         fs->filter = filter;
242         fs->i_end = fs->i_ptr = 0;
243         fs->o_end = fs->o_ptr = 0;
244         ifs->size = -1; /* unknown */
245         return ifs;
246 }
247
248 /*****************************************************************
249  *
250  * Loose object stream
251  *
252  *****************************************************************/
253
254 static read_method_decl(loose)
255 {
256         size_t total_read = 0;
257
258         switch (st->z_state) {
259         case z_done:
260                 return 0;
261         case z_error:
262                 return -1;
263         default:
264                 break;
265         }
266
267         if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
268                 size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
269                 if (sz < to_copy)
270                         to_copy = sz;
271                 memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
272                 st->u.loose.hdr_used += to_copy;
273                 total_read += to_copy;
274         }
275
276         while (total_read < sz) {
277                 int status;
278
279                 st->z.next_out = (unsigned char *)buf + total_read;
280                 st->z.avail_out = sz - total_read;
281                 status = git_inflate(&st->z, Z_FINISH);
282
283                 total_read = st->z.next_out - (unsigned char *)buf;
284
285                 if (status == Z_STREAM_END) {
286                         git_inflate_end(&st->z);
287                         st->z_state = z_done;
288                         break;
289                 }
290                 if (status != Z_OK && status != Z_BUF_ERROR) {
291                         git_inflate_end(&st->z);
292                         st->z_state = z_error;
293                         return -1;
294                 }
295         }
296         return total_read;
297 }
298
299 static close_method_decl(loose)
300 {
301         close_deflated_stream(st);
302         munmap(st->u.loose.mapped, st->u.loose.mapsize);
303         return 0;
304 }
305
306 static struct stream_vtbl loose_vtbl = {
307         close_istream_loose,
308         read_istream_loose,
309 };
310
311 static open_method_decl(loose)
312 {
313         st->u.loose.mapped = map_sha1_file(sha1, &st->u.loose.mapsize);
314         if (!st->u.loose.mapped)
315                 return -1;
316         if (unpack_sha1_header(&st->z,
317                                st->u.loose.mapped,
318                                st->u.loose.mapsize,
319                                st->u.loose.hdr,
320                                sizeof(st->u.loose.hdr)) < 0) {
321                 git_inflate_end(&st->z);
322                 munmap(st->u.loose.mapped, st->u.loose.mapsize);
323                 return -1;
324         }
325
326         parse_sha1_header(st->u.loose.hdr, &st->size);
327         st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
328         st->u.loose.hdr_avail = st->z.total_out;
329         st->z_state = z_used;
330
331         st->vtbl = &loose_vtbl;
332         return 0;
333 }
334
335
336 /*****************************************************************
337  *
338  * Non-delta packed object stream
339  *
340  *****************************************************************/
341
342 static read_method_decl(pack_non_delta)
343 {
344         size_t total_read = 0;
345
346         switch (st->z_state) {
347         case z_unused:
348                 memset(&st->z, 0, sizeof(st->z));
349                 git_inflate_init(&st->z);
350                 st->z_state = z_used;
351                 break;
352         case z_done:
353                 return 0;
354         case z_error:
355                 return -1;
356         case z_used:
357                 break;
358         }
359
360         while (total_read < sz) {
361                 int status;
362                 struct pack_window *window = NULL;
363                 unsigned char *mapped;
364
365                 mapped = use_pack(st->u.in_pack.pack, &window,
366                                   st->u.in_pack.pos, &st->z.avail_in);
367
368                 st->z.next_out = (unsigned char *)buf + total_read;
369                 st->z.avail_out = sz - total_read;
370                 st->z.next_in = mapped;
371                 status = git_inflate(&st->z, Z_FINISH);
372
373                 st->u.in_pack.pos += st->z.next_in - mapped;
374                 total_read = st->z.next_out - (unsigned char *)buf;
375                 unuse_pack(&window);
376
377                 if (status == Z_STREAM_END) {
378                         git_inflate_end(&st->z);
379                         st->z_state = z_done;
380                         break;
381                 }
382                 if (status != Z_OK && status != Z_BUF_ERROR) {
383                         git_inflate_end(&st->z);
384                         st->z_state = z_error;
385                         return -1;
386                 }
387         }
388         return total_read;
389 }
390
391 static close_method_decl(pack_non_delta)
392 {
393         close_deflated_stream(st);
394         return 0;
395 }
396
397 static struct stream_vtbl pack_non_delta_vtbl = {
398         close_istream_pack_non_delta,
399         read_istream_pack_non_delta,
400 };
401
/*
 * Open a stream over a non-delta object stored in a pack.
 *
 * Starts from the pack and offset recorded in oi and parses the in-pack
 * object header, which sets st->size and advances u.in_pack.pos.  Only
 * commit, tree, blob and tag objects are accepted.  Any other in-pack type
 * (e.g. a delta) returns -1 so that open_istream() falls back to the
 * in-core backend.  The inflater is started later, on the first read.
 */
static open_method_decl(pack_non_delta)
{
	struct pack_window *window = NULL;
	enum object_type in_pack_type;

	st->u.in_pack.pack = oi->u.packed.pack;
	st->u.in_pack.pos = oi->u.packed.offset;

	in_pack_type = unpack_object_header(st->u.in_pack.pack, &window,
					    &st->u.in_pack.pos, &st->size);
	unuse_pack(&window);

	/* we do not do deltas for now */
	if (in_pack_type != OBJ_COMMIT && in_pack_type != OBJ_TREE &&
	    in_pack_type != OBJ_BLOB && in_pack_type != OBJ_TAG)
		return -1;

	st->z_state = z_unused;
	st->vtbl = &pack_non_delta_vtbl;
	return 0;
}
429
430
431 /*****************************************************************
432  *
433  * In-core stream
434  *
435  *****************************************************************/
436
437 static close_method_decl(incore)
438 {
439         free(st->u.incore.buf);
440         return 0;
441 }
442
443 static read_method_decl(incore)
444 {
445         size_t read_size = sz;
446         size_t remainder = st->size - st->u.incore.read_ptr;
447
448         if (remainder <= read_size)
449                 read_size = remainder;
450         if (read_size) {
451                 memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);
452                 st->u.incore.read_ptr += read_size;
453         }
454         return read_size;
455 }
456
457 static struct stream_vtbl incore_vtbl = {
458         close_istream_incore,
459         read_istream_incore,
460 };
461
462 static open_method_decl(incore)
463 {
464         st->u.incore.buf = read_sha1_file_extended(sha1, type, &st->size, 0);
465         st->u.incore.read_ptr = 0;
466         st->vtbl = &incore_vtbl;
467
468         return st->u.incore.buf ? 0 : -1;
469 }