1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/types.h> 4 #include <linux/slab.h> 5 6 #include <linux/ceph/cls_lock_client.h> 7 #include <linux/ceph/decode.h> 8 9 /** 10 * ceph_cls_lock - grab rados lock for object 11 * @oid, @oloc: object to lock 12 * @lock_name: the name of the lock 13 * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED) 14 * @cookie: user-defined identifier for this instance of the lock 15 * @tag: user-defined tag 16 * @desc: user-defined lock description 17 * @flags: lock flags 18 * 19 * All operations on the same lock should use the same tag. 20 */ 21 int ceph_cls_lock(struct ceph_osd_client *osdc, 22 struct ceph_object_id *oid, 23 struct ceph_object_locator *oloc, 24 char *lock_name, u8 type, char *cookie, 25 char *tag, char *desc, u8 flags) 26 { 27 int lock_op_buf_size; 28 int name_len = strlen(lock_name); 29 int cookie_len = strlen(cookie); 30 int tag_len = strlen(tag); 31 int desc_len = strlen(desc); 32 void *p, *end; 33 struct page *lock_op_page; 34 struct timespec mtime; 35 int ret; 36 37 lock_op_buf_size = name_len + sizeof(__le32) + 38 cookie_len + sizeof(__le32) + 39 tag_len + sizeof(__le32) + 40 desc_len + sizeof(__le32) + 41 sizeof(struct ceph_timespec) + 42 /* flag and type */ 43 sizeof(u8) + sizeof(u8) + 44 CEPH_ENCODING_START_BLK_LEN; 45 if (lock_op_buf_size > PAGE_SIZE) 46 return -E2BIG; 47 48 lock_op_page = alloc_page(GFP_NOIO); 49 if (!lock_op_page) 50 return -ENOMEM; 51 52 p = page_address(lock_op_page); 53 end = p + lock_op_buf_size; 54 55 /* encode cls_lock_lock_op struct */ 56 ceph_start_encoding(&p, 1, 1, 57 lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN); 58 ceph_encode_string(&p, end, lock_name, name_len); 59 ceph_encode_8(&p, type); 60 ceph_encode_string(&p, end, cookie, cookie_len); 61 ceph_encode_string(&p, end, tag, tag_len); 62 ceph_encode_string(&p, end, desc, desc_len); 63 /* only support infinite duration */ 64 memset(&mtime, 0, sizeof(mtime)); 65 ceph_encode_timespec(p, &mtime); 66 p += sizeof(struct ceph_timespec); 67 ceph_encode_8(&p, flags); 68 69 dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n", 70 __func__, lock_name, type, cookie, tag, desc, flags); 71 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock", 72 CEPH_OSD_FLAG_WRITE, lock_op_page, 73 lock_op_buf_size, NULL, NULL); 74 75 dout("%s: status %d\n", __func__, ret); 76 __free_page(lock_op_page); 77 return ret; 78 } 79 EXPORT_SYMBOL(ceph_cls_lock); 80 81 /** 82 * ceph_cls_unlock - release rados lock for object 83 * @oid, @oloc: object to lock 84 * @lock_name: the name of the lock 85 * @cookie: user-defined identifier for this instance of the lock 86 */ 87 int ceph_cls_unlock(struct ceph_osd_client *osdc, 88 struct ceph_object_id *oid, 89 struct ceph_object_locator *oloc, 90 char *lock_name, char *cookie) 91 { 92 int unlock_op_buf_size; 93 int name_len = strlen(lock_name); 94 int cookie_len = strlen(cookie); 95 void *p, *end; 96 struct page *unlock_op_page; 97 int ret; 98 99 unlock_op_buf_size = name_len + sizeof(__le32) + 100 cookie_len + sizeof(__le32) + 101 CEPH_ENCODING_START_BLK_LEN; 102 if (unlock_op_buf_size > PAGE_SIZE) 103 return -E2BIG; 104 105 unlock_op_page = alloc_page(GFP_NOIO); 106 if (!unlock_op_page) 107 return -ENOMEM; 108 109 p = page_address(unlock_op_page); 110 end = p + unlock_op_buf_size; 111 112 /* encode cls_lock_unlock_op struct */ 113 ceph_start_encoding(&p, 1, 1, 114 unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN); 115 ceph_encode_string(&p, end, lock_name, name_len); 116 ceph_encode_string(&p, end, cookie, cookie_len); 117 118 dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie); 119 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock", 120 CEPH_OSD_FLAG_WRITE, unlock_op_page, 121 unlock_op_buf_size, NULL, NULL); 122 123 dout("%s: status %d\n", __func__, ret); 124 __free_page(unlock_op_page); 125 return ret; 126 } 127 EXPORT_SYMBOL(ceph_cls_unlock); 128 129 /** 130 * ceph_cls_break_lock - release rados lock for object for specified client 131 * @oid, @oloc: object to lock 132 * @lock_name: the name of the lock 133 * @cookie: user-defined identifier for this instance of the lock 134 * @locker: current lock owner 135 */ 136 int ceph_cls_break_lock(struct ceph_osd_client *osdc, 137 struct ceph_object_id *oid, 138 struct ceph_object_locator *oloc, 139 char *lock_name, char *cookie, 140 struct ceph_entity_name *locker) 141 { 142 int break_op_buf_size; 143 int name_len = strlen(lock_name); 144 int cookie_len = strlen(cookie); 145 struct page *break_op_page; 146 void *p, *end; 147 int ret; 148 149 break_op_buf_size = name_len + sizeof(__le32) + 150 cookie_len + sizeof(__le32) + 151 sizeof(u8) + sizeof(__le64) + 152 CEPH_ENCODING_START_BLK_LEN; 153 if (break_op_buf_size > PAGE_SIZE) 154 return -E2BIG; 155 156 break_op_page = alloc_page(GFP_NOIO); 157 if (!break_op_page) 158 return -ENOMEM; 159 160 p = page_address(break_op_page); 161 end = p + break_op_buf_size; 162 163 /* encode cls_lock_break_op struct */ 164 ceph_start_encoding(&p, 1, 1, 165 break_op_buf_size - CEPH_ENCODING_START_BLK_LEN); 166 ceph_encode_string(&p, end, lock_name, name_len); 167 ceph_encode_copy(&p, locker, sizeof(*locker)); 168 ceph_encode_string(&p, end, cookie, cookie_len); 169 170 dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name, 171 cookie, ENTITY_NAME(*locker)); 172 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock", 173 CEPH_OSD_FLAG_WRITE, break_op_page, 174 break_op_buf_size, NULL, NULL); 175 176 dout("%s: status %d\n", __func__, ret); 177 __free_page(break_op_page); 178 return ret; 179 } 180 EXPORT_SYMBOL(ceph_cls_break_lock); 181 182 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers) 183 { 184 int i; 185 186 for (i = 0; i < num_lockers; i++) 187 kfree(lockers[i].id.cookie); 188 kfree(lockers); 189 } 190 EXPORT_SYMBOL(ceph_free_lockers); 191 192 static int decode_locker(void **p, void *end, struct ceph_locker *locker) 193 { 194 u8 struct_v; 195 u32 len; 196 char *s; 197 int ret; 198 199 ret = ceph_start_decoding(p, end, 1, "locker_id_t", &struct_v, &len); 200 if (ret) 201 return ret; 202 203 ceph_decode_copy(p, &locker->id.name, sizeof(locker->id.name)); 204 s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO); 205 if (IS_ERR(s)) 206 return PTR_ERR(s); 207 208 locker->id.cookie = s; 209 210 ret = ceph_start_decoding(p, end, 1, "locker_info_t", &struct_v, &len); 211 if (ret) 212 return ret; 213 214 *p += sizeof(struct ceph_timespec); /* skip expiration */ 215 ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr)); 216 ceph_decode_addr(&locker->info.addr); 217 len = ceph_decode_32(p); 218 *p += len; /* skip description */ 219 220 dout("%s %s%llu cookie %s addr %s\n", __func__, 221 ENTITY_NAME(locker->id.name), locker->id.cookie, 222 ceph_pr_addr(&locker->info.addr.in_addr)); 223 return 0; 224 } 225 226 static int decode_lockers(void **p, void *end, u8 *type, char **tag, 227 struct ceph_locker **lockers, u32 *num_lockers) 228 { 229 u8 struct_v; 230 u32 struct_len; 231 char *s; 232 int i; 233 int ret; 234 235 ret = ceph_start_decoding(p, end, 1, "cls_lock_get_info_reply", 236 &struct_v, &struct_len); 237 if (ret) 238 return ret; 239 240 *num_lockers = ceph_decode_32(p); 241 *lockers = kcalloc(*num_lockers, sizeof(**lockers), GFP_NOIO); 242 if (!*lockers) 243 return -ENOMEM; 244 245 for (i = 0; i < *num_lockers; i++) { 246 ret = decode_locker(p, end, *lockers + i); 247 if (ret) 248 goto err_free_lockers; 249 } 250 251 *type = ceph_decode_8(p); 252 s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO); 253 if (IS_ERR(s)) { 254 ret = PTR_ERR(s); 255 goto err_free_lockers; 256 } 257 258 *tag = s; 259 return 0; 260 261 err_free_lockers: 262 ceph_free_lockers(*lockers, *num_lockers); 263 return ret; 264 } 265 266 /* 267 * On success, the caller is responsible for: 268 * 269 * kfree(tag); 270 * ceph_free_lockers(lockers, num_lockers); 271 */ 272 int ceph_cls_lock_info(struct ceph_osd_client *osdc, 273 struct ceph_object_id *oid, 274 struct ceph_object_locator *oloc, 275 char *lock_name, u8 *type, char **tag, 276 struct ceph_locker **lockers, u32 *num_lockers) 277 { 278 int get_info_op_buf_size; 279 int name_len = strlen(lock_name); 280 struct page *get_info_op_page, *reply_page; 281 size_t reply_len = PAGE_SIZE; 282 void *p, *end; 283 int ret; 284 285 get_info_op_buf_size = name_len + sizeof(__le32) + 286 CEPH_ENCODING_START_BLK_LEN; 287 if (get_info_op_buf_size > PAGE_SIZE) 288 return -E2BIG; 289 290 get_info_op_page = alloc_page(GFP_NOIO); 291 if (!get_info_op_page) 292 return -ENOMEM; 293 294 reply_page = alloc_page(GFP_NOIO); 295 if (!reply_page) { 296 __free_page(get_info_op_page); 297 return -ENOMEM; 298 } 299 300 p = page_address(get_info_op_page); 301 end = p + get_info_op_buf_size; 302 303 /* encode cls_lock_get_info_op struct */ 304 ceph_start_encoding(&p, 1, 1, 305 get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN); 306 ceph_encode_string(&p, end, lock_name, name_len); 307 308 dout("%s lock_name %s\n", __func__, lock_name); 309 ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info", 310 CEPH_OSD_FLAG_READ, get_info_op_page, 311 get_info_op_buf_size, reply_page, &reply_len); 312 313 dout("%s: status %d\n", __func__, ret); 314 if (ret >= 0) { 315 p = page_address(reply_page); 316 end = p + reply_len; 317 318 ret = decode_lockers(&p, end, type, tag, lockers, num_lockers); 319 } 320 321 __free_page(get_info_op_page); 322 __free_page(reply_page); 323 return ret; 324 } 325 EXPORT_SYMBOL(ceph_cls_lock_info); 326