Merge commit 'origin/master' into for-linus/xen/master
[linux-2.6] / drivers / staging / dst / state.c
1 /*
2  * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3  * All rights reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  */
15
16 #include <linux/buffer_head.h>
17 #include <linux/blkdev.h>
18 #include <linux/bio.h>
19 #include <linux/connector.h>
20 #include <linux/dst.h>
21 #include <linux/device.h>
22 #include <linux/in.h>
23 #include <linux/in6.h>
24 #include <linux/socket.h>
25 #include <linux/slab.h>
26
27 #include <net/sock.h>
28
29 /*
30  * Polling machinery.
31  */
32
33 struct dst_poll_helper
34 {
35         poll_table              pt;
36         struct dst_state        *st;
37 };
38
39 static int dst_queue_wake(wait_queue_t *wait, unsigned mode, int sync, void *key)
40 {
41         struct dst_state *st = container_of(wait, struct dst_state, wait);
42
43         wake_up(&st->thread_wait);
44         return 1;
45 }
46
47 static void dst_queue_func(struct file *file, wait_queue_head_t *whead,
48                                  poll_table *pt)
49 {
50         struct dst_state *st = container_of(pt, struct dst_poll_helper, pt)->st;
51
52         st->whead = whead;
53         init_waitqueue_func_entry(&st->wait, dst_queue_wake);
54         add_wait_queue(whead, &st->wait);
55 }
56
57 void dst_poll_exit(struct dst_state *st)
58 {
59         if (st->whead) {
60                 remove_wait_queue(st->whead, &st->wait);
61                 st->whead = NULL;
62         }
63 }
64
65 int dst_poll_init(struct dst_state *st)
66 {
67         struct dst_poll_helper ph;
68
69         ph.st = st;
70         init_poll_funcptr(&ph.pt, &dst_queue_func);
71
72         st->socket->ops->poll(NULL, st->socket, &ph.pt);
73         return 0;
74 }
75
76 /*
77  * Header receiving function - may block.
78  */
79 static int dst_data_recv_header(struct socket *sock,
80                 void *data, unsigned int size, int block)
81 {
82         struct msghdr msg;
83         struct kvec iov;
84         int err;
85
86         iov.iov_base = data;
87         iov.iov_len = size;
88
89         msg.msg_iov = (struct iovec *)&iov;
90         msg.msg_iovlen = 1;
91         msg.msg_name = NULL;
92         msg.msg_namelen = 0;
93         msg.msg_control = NULL;
94         msg.msg_controllen = 0;
95         msg.msg_flags = (block)?MSG_WAITALL:MSG_DONTWAIT;
96
97         err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len,
98                         msg.msg_flags);
99         if (err != size)
100                 return -1;
101
102         return 0;
103 }
104
105 /*
106  * Header sending function - may block.
107  */
108 int dst_data_send_header(struct socket *sock,
109                 void *data, unsigned int size, int more)
110 {
111         struct msghdr msg;
112         struct kvec iov;
113         int err;
114
115         iov.iov_base = data;
116         iov.iov_len = size;
117
118         msg.msg_iov = (struct iovec *)&iov;
119         msg.msg_iovlen = 1;
120         msg.msg_name = NULL;
121         msg.msg_namelen = 0;
122         msg.msg_control = NULL;
123         msg.msg_controllen = 0;
124         msg.msg_flags = MSG_WAITALL | (more)?MSG_MORE:0;
125
126         err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
127         if (err != size) {
128                 dprintk("%s: size: %u, more: %d, err: %d.\n",
129                                 __func__, size, more, err);
130                 return -1;
131         }
132
133         return 0;
134 }
135
136 /*
137  * Block autoconfiguration: request size of the storage and permissions.
138  */
139 static int dst_request_remote_config(struct dst_state *st)
140 {
141         struct dst_node *n = st->node;
142         int err = -EINVAL;
143         struct dst_cmd *cmd = st->data;
144
145         memset(cmd, 0, sizeof(struct dst_cmd));
146         cmd->cmd = DST_CFG;
147
148         dst_convert_cmd(cmd);
149
150         err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
151         if (err)
152                 goto out;
153
154         err = dst_data_recv_header(st->socket, cmd, sizeof(struct dst_cmd), 1);
155         if (err)
156                 goto out;
157
158         dst_convert_cmd(cmd);
159
160         if (cmd->cmd != DST_CFG) {
161                 err = -EINVAL;
162                 dprintk("%s: checking result: cmd: %d, size reported: %llu.\n",
163                         __func__, cmd->cmd, cmd->sector);
164                 goto out;
165         }
166
167         if (n->size != 0)
168                 n->size = min_t(loff_t, n->size, cmd->sector);
169         else
170                 n->size = cmd->sector;
171
172         n->info->size = n->size;
173         st->permissions = cmd->rw;
174
175 out:
176         dprintk("%s: n: %p, err: %d, size: %llu, permission: %x.\n",
177                         __func__, n, err, n->size, st->permissions);
178         return err;
179 }
180
181 /*
182  * Socket machinery.
183  */
184
185 #define DST_DEFAULT_TIMEO       20000
186
187 int dst_state_socket_create(struct dst_state *st)
188 {
189         int err;
190         struct socket *sock;
191         struct dst_network_ctl *ctl = &st->ctl;
192
193         err = sock_create(ctl->addr.sa_family, ctl->type, ctl->proto, &sock);
194         if (err < 0)
195                 return err;
196
197         sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo =
198                 msecs_to_jiffies(DST_DEFAULT_TIMEO);
199         sock->sk->sk_allocation = GFP_NOIO;
200
201         st->socket = st->read_socket = sock;
202         return 0;
203 }
204
205 void dst_state_socket_release(struct dst_state *st)
206 {
207         dprintk("%s: st: %p, socket: %p, n: %p.\n",
208                         __func__, st, st->socket, st->node);
209         if (st->socket) {
210                 sock_release(st->socket);
211                 st->socket = NULL;
212                 st->read_socket = NULL;
213         }
214 }
215
216 void dst_dump_addr(struct socket *sk, struct sockaddr *sa, char *str)
217 {
218         if (sk->ops->family == AF_INET) {
219                 struct sockaddr_in *sin = (struct sockaddr_in *)sa;
220                 printk(KERN_INFO "%s %u.%u.%u.%u:%d.\n",
221                         str, NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
222         } else if (sk->ops->family == AF_INET6) {
223                 struct sockaddr_in6 *sin = (struct sockaddr_in6 *)sa;
224                 printk(KERN_INFO "%s %pi6:%d",
225                         str, &sin->sin6_addr, ntohs(sin->sin6_port));
226         }
227 }
228
229 void dst_state_exit_connected(struct dst_state *st)
230 {
231         if (st->socket) {
232                 dst_poll_exit(st);
233                 st->socket->ops->shutdown(st->socket, 2);
234
235                 dst_dump_addr(st->socket, (struct sockaddr *)&st->ctl.addr,
236                                 "Disconnected peer");
237                 dst_state_socket_release(st);
238         }
239 }
240
241 static int dst_state_init_connected(struct dst_state *st)
242 {
243         int err;
244         struct dst_network_ctl *ctl = &st->ctl;
245
246         err = dst_state_socket_create(st);
247         if (err)
248                 goto err_out_exit;
249
250         err = kernel_connect(st->socket, (struct sockaddr *)&st->ctl.addr,
251                         st->ctl.addr.sa_data_len, 0);
252         if (err)
253                 goto err_out_release;
254
255         err = dst_poll_init(st);
256         if (err)
257                 goto err_out_release;
258
259         dst_dump_addr(st->socket, (struct sockaddr *)&ctl->addr,
260                         "Connected to peer");
261
262         return 0;
263
264 err_out_release:
265         dst_state_socket_release(st);
266 err_out_exit:
267         return err;
268 }
269
270 /*
271  * State reset is used to reconnect to the remote peer.
272  * May fail, but who cares, we will try again later.
273  */
274 static void inline dst_state_reset_nolock(struct dst_state *st)
275 {
276         dst_state_exit_connected(st);
277         dst_state_init_connected(st);
278 }
279
280 static void inline dst_state_reset(struct dst_state *st)
281 {
282         dst_state_lock(st);
283         dst_state_reset_nolock(st);
284         dst_state_unlock(st);
285 }
286
287 /*
288  * Basic network sending/receiving functions.
289  * Blocked mode is used.
290  */
291 static int dst_data_recv_raw(struct dst_state *st, void *buf, u64 size)
292 {
293         struct msghdr msg;
294         struct kvec iov;
295         int err;
296
297         BUG_ON(!size);
298
299         iov.iov_base = buf;
300         iov.iov_len = size;
301
302         msg.msg_iov = (struct iovec *)&iov;
303         msg.msg_iovlen = 1;
304         msg.msg_name = NULL;
305         msg.msg_namelen = 0;
306         msg.msg_control = NULL;
307         msg.msg_controllen = 0;
308         msg.msg_flags = MSG_DONTWAIT;
309
310         err = kernel_recvmsg(st->socket, &msg, &iov, 1, iov.iov_len,
311                         msg.msg_flags);
312         if (err <= 0) {
313                 dprintk("%s: failed to recv data: size: %llu, err: %d.\n",
314                                 __func__, size, err);
315                 if (err == 0)
316                         err = -ECONNRESET;
317
318                 dst_state_exit_connected(st);
319         }
320
321         return err;
322 }
323
324 /*
325  * Ping command to early detect failed nodes.
326  */
327 static int dst_send_ping(struct dst_state *st)
328 {
329         struct dst_cmd *cmd = st->data;
330         int err = -ECONNRESET;
331
332         dst_state_lock(st);
333         if (st->socket) {
334                 memset(cmd, 0, sizeof(struct dst_cmd));
335
336                 cmd->cmd = __cpu_to_be32(DST_PING);
337
338                 err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
339         }
340         dprintk("%s: st: %p, socket: %p, err: %d.\n", __func__, st, st->socket, err);
341         dst_state_unlock(st);
342
343         return err;
344 }
345
346 /*
347  * Receiving function, which should either return error or read
348  * whole block request. If there was no traffic for a one second,
349  * send a ping, since remote node may die.
350  */
351 int dst_data_recv(struct dst_state *st, void *data, unsigned int size)
352 {
353         unsigned int revents = 0;
354         unsigned int err_mask = POLLERR | POLLHUP | POLLRDHUP;
355         unsigned int mask = err_mask | POLLIN;
356         struct dst_node *n = st->node;
357         int err = 0;
358
359         while (size && !err) {
360                 revents = dst_state_poll(st);
361
362                 if (!(revents & mask)) {
363                         DEFINE_WAIT(wait);
364
365                         for (;;) {
366                                 prepare_to_wait(&st->thread_wait, &wait,
367                                                 TASK_INTERRUPTIBLE);
368                                 if (!n->trans_scan_timeout || st->need_exit)
369                                         break;
370
371                                 revents = dst_state_poll(st);
372
373                                 if (revents & mask)
374                                         break;
375
376                                 if (signal_pending(current))
377                                         break;
378
379                                 if (!schedule_timeout(HZ)) {
380                                         err = dst_send_ping(st);
381                                         if (err)
382                                                 return err;
383                                 }
384
385                                 continue;
386                         }
387                         finish_wait(&st->thread_wait, &wait);
388                 }
389
390                 err = -ECONNRESET;
391                 dst_state_lock(st);
392
393                 if (            st->socket &&
394                                 (st->read_socket == st->socket) &&
395                                 (revents & POLLIN)) {
396                         err = dst_data_recv_raw(st, data, size);
397                         if (err > 0) {
398                                 data += err;
399                                 size -= err;
400                                 err = 0;
401                         }
402                 }
403
404                 if (revents & err_mask || !st->socket) {
405                         dprintk("%s: revents: %x, socket: %p, size: %u, err: %d.\n",
406                                         __func__, revents, st->socket, size, err);
407                         err = -ECONNRESET;
408                 }
409
410                 dst_state_unlock(st);
411
412                 if (!n->trans_scan_timeout)
413                         err = -ENODEV;
414         }
415
416         return err;
417 }
418
419 /*
420  * Send block autoconf reply.
421  */
422 static int dst_process_cfg(struct dst_state *st)
423 {
424         struct dst_node *n = st->node;
425         struct dst_cmd *cmd = st->data;
426         int err;
427
428         cmd->sector = n->size;
429         cmd->rw = st->permissions;
430
431         dst_convert_cmd(cmd);
432
433         dst_state_lock(st);
434         err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
435         dst_state_unlock(st);
436
437         return err;
438 }
439
440 /*
441  * Receive block IO from the network.
442  */
443 static int dst_recv_bio(struct dst_state *st, struct bio *bio, unsigned int total_size)
444 {
445         struct bio_vec *bv;
446         int i, err;
447         void *data;
448         unsigned int sz;
449
450         bio_for_each_segment(bv, bio, i) {
451                 sz = min(total_size, bv->bv_len);
452
453                 dprintk("%s: bio: %llu/%u, total: %u, len: %u, sz: %u, off: %u.\n",
454                         __func__, (u64)bio->bi_sector, bio->bi_size, total_size,
455                         bv->bv_len, sz, bv->bv_offset);
456
457                 data = kmap(bv->bv_page) + bv->bv_offset;
458                 err = dst_data_recv(st, data, sz);
459                 kunmap(bv->bv_page);
460
461                 bv->bv_len = sz;
462
463                 if (err)
464                         return err;
465
466                 total_size -= sz;
467                 if (total_size == 0)
468                         break;
469         }
470
471         return 0;
472 }
473
474 /*
475  * Our block IO has just completed and arrived: get it.
476  */
477 static int dst_process_io_response(struct dst_state *st)
478 {
479         struct dst_node *n = st->node;
480         struct dst_cmd *cmd = st->data;
481         struct dst_trans *t;
482         int err = 0;
483         struct bio *bio;
484
485         mutex_lock(&n->trans_lock);
486         t = dst_trans_search(n, cmd->id);
487         mutex_unlock(&n->trans_lock);
488
489         if (!t)
490                 goto err_out_exit;
491
492         bio = t->bio;
493
494         dprintk("%s: bio: %llu/%u, cmd_size: %u, csize: %u, dir: %lu.\n",
495                 __func__, (u64)bio->bi_sector, bio->bi_size, cmd->size,
496                 cmd->csize, bio_data_dir(bio));
497
498         if (bio_data_dir(bio) == READ) {
499                 if (bio->bi_size != cmd->size - cmd->csize)
500                         goto err_out_exit;
501
502                 if (dst_need_crypto(n)) {
503                         err = dst_recv_cdata(st, t->cmd.hash);
504                         if (err)
505                                 goto err_out_exit;
506                 }
507
508                 err = dst_recv_bio(st, t->bio, bio->bi_size);
509                 if (err)
510                         goto err_out_exit;
511
512                 if (dst_need_crypto(n))
513                         return dst_trans_crypto(t);
514         } else {
515                 err = -EBADMSG;
516                 if (cmd->size || cmd->csize)
517                         goto err_out_exit;
518         }
519
520         dst_trans_remove(t);
521         dst_trans_put(t);
522
523         return 0;
524
525 err_out_exit:
526         return err;
527 }
528
529 /*
530  * Receive crypto data.
531  */
532 int dst_recv_cdata(struct dst_state *st, void *cdata)
533 {
534         struct dst_cmd *cmd = st->data;
535         struct dst_node *n = st->node;
536         struct dst_crypto_ctl *c = &n->crypto;
537         int err;
538
539         if (cmd->csize != c->crypto_attached_size) {
540                 dprintk("%s: cmd: cmd: %u, sector: %llu, size: %u, "
541                                 "csize: %u != digest size %u.\n",
542                                 __func__, cmd->cmd, cmd->sector, cmd->size,
543                                 cmd->csize, c->crypto_attached_size);
544                 err = -EINVAL;
545                 goto err_out_exit;
546         }
547
548         err = dst_data_recv(st, cdata, cmd->csize);
549         if (err)
550                 goto err_out_exit;
551
552         cmd->size -= cmd->csize;
553         return 0;
554
555 err_out_exit:
556         return err;
557 }
558
559 /*
560  * Receive the command and start its processing.
561  */
562 static int dst_recv_processing(struct dst_state *st)
563 {
564         int err = -EINTR;
565         struct dst_cmd *cmd = st->data;
566
567         /*
568          * If socket will be reset after this statement, then
569          * dst_data_recv() will just fail and loop will
570          * start again, so it can be done without any locks.
571          *
572          * st->read_socket is needed to prevents state machine
573          * breaking between this data reading and subsequent one
574          * in protocol specific functions during connection reset.
575          * In case of reset we have to read next command and do
576          * not expect data for old command to magically appear in
577          * new connection.
578          */
579         st->read_socket = st->socket;
580         err = dst_data_recv(st, cmd, sizeof(struct dst_cmd));
581         if (err)
582                 goto out_exit;
583
584         dst_convert_cmd(cmd);
585
586         dprintk("%s: cmd: %u, size: %u, csize: %u, id: %llu, "
587                         "sector: %llu, flags: %llx, rw: %llx.\n",
588                         __func__, cmd->cmd, cmd->size,
589                         cmd->csize, cmd->id, cmd->sector,
590                         cmd->flags, cmd->rw);
591
592         /*
593          * This should catch protocol breakage and random garbage instead of commands.
594          */
595         if (unlikely(cmd->csize > st->size - sizeof(struct dst_cmd))) {
596                 err = -EBADMSG;
597                 goto out_exit;
598         }
599
600         err = -EPROTO;
601         switch (cmd->cmd) {
602                 case DST_IO_RESPONSE:
603                         err = dst_process_io_response(st);
604                         break;
605                 case DST_IO:
606                         err = dst_process_io(st);
607                         break;
608                 case DST_CFG:
609                         err = dst_process_cfg(st);
610                         break;
611                 case DST_PING:
612                         err = 0;
613                         break;
614                 default:
615                         break;
616         }
617
618 out_exit:
619         return err;
620 }
621
622 /*
623  * Receiving thread. For the client node we should try to reconnect,
624  * for accepted client we just drop the state and expect it to reconnect.
625  */
626 static int dst_recv(void *init_data, void *schedule_data)
627 {
628         struct dst_state *st = schedule_data;
629         struct dst_node *n = init_data;
630         int err = 0;
631
632         dprintk("%s: start st: %p, n: %p, scan: %lu, need_exit: %d.\n",
633                         __func__, st, n, n->trans_scan_timeout, st->need_exit);
634
635         while (n->trans_scan_timeout && !st->need_exit) {
636                 err = dst_recv_processing(st);
637                 if (err < 0) {
638                         if (!st->ctl.type)
639                                 break;
640
641                         if (!n->trans_scan_timeout || st->need_exit)
642                                 break;
643
644                         dst_state_reset(st);
645                         msleep(1000);
646                 }
647         }
648
649         st->need_exit = 1;
650         wake_up(&st->thread_wait);
651
652         dprintk("%s: freeing receiving socket st: %p.\n", __func__, st);
653         dst_state_lock(st);
654         dst_state_exit_connected(st);
655         dst_state_unlock(st);
656         dst_state_put(st);
657
658         dprintk("%s: freed receiving socket st: %p.\n", __func__, st);
659
660         return err;
661 }
662
663 /*
664  * Network state dies here and borns couple of lines below.
665  * This object is the main network state processing engine:
666  * sending, receiving, reconnections, all network related
667  * tasks are handled on behalf of the state.
668  */
669 static void dst_state_free(struct dst_state *st)
670 {
671         dprintk("%s: st: %p.\n", __func__, st);
672         if (st->cleanup)
673                 st->cleanup(st);
674         kfree(st->data);
675         kfree(st);
676 }
677
678 struct dst_state *dst_state_alloc(struct dst_node *n)
679 {
680         struct dst_state *st;
681         int err = -ENOMEM;
682
683         st = kzalloc(sizeof(struct dst_state), GFP_KERNEL);
684         if (!st)
685                 goto err_out_exit;
686
687         st->node = n;
688         st->need_exit = 0;
689
690         st->size = PAGE_SIZE;
691         st->data = kmalloc(st->size, GFP_KERNEL);
692         if (!st->data)
693                 goto err_out_free;
694
695         spin_lock_init(&st->request_lock);
696         INIT_LIST_HEAD(&st->request_list);
697
698         mutex_init(&st->state_lock);
699         init_waitqueue_head(&st->thread_wait);
700
701         /*
702          * One for processing thread, another one for node itself.
703          */
704         atomic_set(&st->refcnt, 2);
705
706         dprintk("%s: st: %p, n: %p.\n", __func__, st, st->node);
707
708         return st;
709
710 err_out_free:
711         kfree(st);
712 err_out_exit:
713         return ERR_PTR(err);
714 }
715
716 int dst_state_schedule_receiver(struct dst_state *st)
717 {
718         return thread_pool_schedule_private(st->node->pool, dst_thread_setup,
719                         dst_recv, st, MAX_SCHEDULE_TIMEOUT, st->node);
720 }
721
722 /*
723  * Initialize client's connection to the remote peer: allocate state,
724  * connect and perform block IO autoconfiguration.
725  */
726 int dst_node_init_connected(struct dst_node *n, struct dst_network_ctl *r)
727 {
728         struct dst_state *st;
729         int err = -ENOMEM;
730
731         st = dst_state_alloc(n);
732         if (IS_ERR(st)) {
733                 err = PTR_ERR(st);
734                 goto err_out_exit;
735         }
736         memcpy(&st->ctl, r, sizeof(struct dst_network_ctl));
737
738         err = dst_state_init_connected(st);
739         if (err)
740                 goto err_out_free_data;
741
742         err = dst_request_remote_config(st);
743         if (err)
744                 goto err_out_exit_connected;
745         n->state = st;
746
747         err = dst_state_schedule_receiver(st);
748         if (err)
749                 goto err_out_exit_connected;
750
751         return 0;
752
753 err_out_exit_connected:
754         dst_state_exit_connected(st);
755 err_out_free_data:
756         dst_state_free(st);
757 err_out_exit:
758         n->state = NULL;
759         return err;
760 }
761
762 void dst_state_put(struct dst_state *st)
763 {
764         dprintk("%s: st: %p, refcnt: %d.\n",
765                         __func__, st, atomic_read(&st->refcnt));
766         if (atomic_dec_and_test(&st->refcnt))
767                 dst_state_free(st);
768 }
769
770 /*
771  * Send block IO to the network one by one using zero-copy ->sendpage().
772  */
773 int dst_send_bio(struct dst_state *st, struct dst_cmd *cmd, struct bio *bio)
774 {
775         struct bio_vec *bv;
776         struct dst_crypto_ctl *c = &st->node->crypto;
777         int err, i = 0;
778         int flags = MSG_WAITALL;
779
780         err = dst_data_send_header(st->socket, cmd,
781                 sizeof(struct dst_cmd) + c->crypto_attached_size, bio->bi_vcnt);
782         if (err)
783                 goto err_out_exit;
784
785         bio_for_each_segment(bv, bio, i) {
786                 if (i < bio->bi_vcnt - 1)
787                         flags |= MSG_MORE;
788
789                 err = kernel_sendpage(st->socket, bv->bv_page, bv->bv_offset,
790                                 bv->bv_len, flags);
791                 if (err <= 0)
792                         goto err_out_exit;
793         }
794
795         return 0;
796
797 err_out_exit:
798         dprintk("%s: %d/%d, flags: %x, err: %d.\n",
799                         __func__, i, bio->bi_vcnt, flags, err);
800         return err;
801 }
802
803 /*
804  * Send transaction to the remote peer.
805  */
806 int dst_trans_send(struct dst_trans *t)
807 {
808         int err;
809         struct dst_state *st = t->n->state;
810         struct bio *bio = t->bio;
811
812         dst_convert_cmd(&t->cmd);
813
814         dst_state_lock(st);
815         if (!st->socket) {
816                 err = dst_state_init_connected(st);
817                 if (err)
818                         goto err_out_unlock;
819         }
820
821         if (bio_data_dir(bio) == WRITE) {
822                 err = dst_send_bio(st, &t->cmd, t->bio);
823         } else {
824                 err = dst_data_send_header(st->socket, &t->cmd,
825                                 sizeof(struct dst_cmd), 0);
826         }
827         if (err)
828                 goto err_out_reset;
829
830         dst_state_unlock(st);
831         return 0;
832
833 err_out_reset:
834         dst_state_reset_nolock(st);
835 err_out_unlock:
836         dst_state_unlock(st);
837
838         return err;
839 }