基于 Java 的生产者-消费者模式代码实现,可将其作为 Demo,应用到具体的实际业务中。

  • 生产者-消费者demo

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.time.Duration;
    import java.time.Instant;
    import java.util.*;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Recommendation service built on the producer-consumer pattern.
     *
     * <p>One {@link UserProducer} thread pushes the phone ids (the keys of the user flight map) into a
     * bounded queue. {@code DEFAULT_CONSUMER_NUM} consumer tasks running on a thread pool take the ids,
     * compute the recommendations for each user and save them. A stop marker ({@code STOP_FLAG})
     * appended by the producer tells the consumers that no more data will arrive.
     *
     * <p>An instance is meant for a single {@link #rec(Map, int)} call: the stop marker is left in the
     * queue when the consumers finish, so consumers of a second run on the same instance would take it
     * first and stop.
     *
     * @author bubble
     * date: 2018-10-22 15:03
     **/
    @Deprecated
    public class RecService {
        private static final Logger LOGGER = LoggerFactory.getLogger(RecService.class);
        // Number of records processed between two progress log lines.
        private static final int DEFAULT_LOG_INTERVAL = 1000 * 10;
        // Queue fill level above which the producer pauses. The queue is never grown (to avoid the cost of
        // resizing): once this level is exceeded the producer waits until the consumers have drained
        // part of the data and then goes on producing.
        private static final int DEFAULT_QUEUE_DISPOSE_SIZE = 1000 * 10;
        // Hard capacity of the queue.
        private static final int DEFAULT_QUEUE_SIZE = DEFAULT_QUEUE_DISPOSE_SIZE + 200;
        // Milliseconds the producer sleeps each time it finds the queue above the dispose size.
        private static final int DEFAULT_SLEEP_TIME = 1000;
        // Number of consumer tasks submitted to the thread pool.
        private static final int DEFAULT_CONSUMER_NUM = 2;
        // Marker value meaning "no more data"; a real phone id equal to -1 would be mistaken for it.
        private static final Long STOP_FLAG = -1L;
        // Queue of user phone ids shared by the producer and the consumers.
        private final LinkedBlockingQueue<Long> phoneIdQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
        // Flight information per user, keyed by phone id.
        private Map<Long, List<FlightInfoBean>> flightInfoMap;

        private Map<Long, List<FlightInfoBean>> getFlightInfoMap() {
            return flightInfoMap;
        }

        private void setFlightInfoMap(Map<Long, List<FlightInfoBean>> flightInfoMap) {
            this.flightInfoMap = flightInfoMap;
        }

        private final RecJobBean recJob;
        private TipRecService tipRecService;
        private CarRecService carRecService;
        private HotelRecService hotelRecService;

        /**
         * Creates the service for one recommendation job and instantiates only the delegate that job needs.
         *
         * @param recJob the job type (TIP, CAR or HOTEL); any other value logs an error and exits the JVM
         */
        public RecService(RecJobBean recJob) {
            this.recJob = recJob;
            switch (recJob) {
                case TIP:
                    tipRecService = new TipRecService();
                    break;
                case CAR:
                    carRecService = new CarRecService();
                    break;
                case HOTEL:
                    hotelRecService = new HotelRecService();
                    break;
                default:
                    LOGGER.error("please enter the correct rec type, error type: {} ", recJob.getType());
                    // NOTE(review): terminating the JVM (with status 0) is kept for compatibility with
                    // existing callers; consider throwing IllegalArgumentException instead.
                    System.exit(0);
                    break;
            }
        }

        /**
         * Starts the recommendation job asynchronously: one producer thread, the consumer pool and a
         * monitor thread that shuts the pool down once the consumers are idle. Returns without waiting
         * for the job to finish.
         *
         * @param userFlightMap flight information per user, keyed by phone id; when {@code null} or empty
         *                      an error is logged and the JVM exits
         * @param top           limit forwarded to the TIP and HOTEL rec services (not used for CAR)
         */
        public void rec(Map<Long, List<FlightInfoBean>> userFlightMap, int top) {
            LOGGER.info("start {} rec job.", recJob.getType());
            if (userFlightMap == null || userFlightMap.isEmpty()) {
                LOGGER.error("user flight is empty, rec job for {} exit.", recJob.getType());
                // NOTE(review): kept for compatibility with existing callers, see the constructor.
                System.exit(0);
            }
            setFlightInfoMap(userFlightMap);
            UserProducer userProducer = new UserProducer();
            userProducer.setName("producer");
            userProducer.start();
            UserConsumer userConsumer = new UserConsumer(recJob, top);
            ThreadPoolExecutor poolExecutor = userConsumer.consumerAndRec();
            threadMonitor(poolExecutor);
        }

        /**
         * Watches the thread pool from a separate thread and shuts it down once it has no active thread.
         *
         * <p>The pool is shut down even when the monitor itself is interrupted; {@code shutdown()} lets
         * tasks that are already running finish, so no consumer is cut short.
         */
        private void threadMonitor(ThreadPoolExecutor executor) {
            Runnable runnable = () -> {
                try {
                    // Give the consumer tasks time to start before the first check.
                    TimeUnit.SECONDS.sleep(2);
                    while (executor.getActiveCount() > 0) {
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("thread monitor error.", e);
                    Thread.currentThread().interrupt();
                } finally {
                    executor.shutdown();
                }
            };
            new Thread(runnable, "rec-pool-monitor").start();
        }

        /**
         * Producer: feeds every available phone id into the queue, then appends the stop marker.
         */
        class UserProducer extends Thread {
            // Total number of ids produced so far.
            int i = 0;

            @Override
            public void run() {
                addUser();
                try {
                    // Append the stop marker to the tail of the FIFO queue. A consumer that takes it puts
                    // it back, so every consumer eventually sees it.
                    phoneIdQueue.put(STOP_FLAG);
                } catch (InterruptedException e) {
                    LOGGER.error("save phoneId error.", e);
                }
                LOGGER.info("[{}] thread altogether produces {} data", this.getName(), i);
                LOGGER.info("[{}] thread produces done.", this.getName());
            }

            /**
             * Puts every key of the flight map into the queue, pausing while the queue holds more than
             * {@code DEFAULT_QUEUE_DISPOSE_SIZE} elements and logging progress every
             * {@code DEFAULT_LOG_INTERVAL} ids. Stops early (after logging) when interrupted.
             */
            private void addUser() {
                int step = 0;
                try {
                    LOGGER.info("add {} user in producer queue.", getFlightInfoMap().size());
                    for (Long uid : getFlightInfoMap().keySet()) {
                        phoneIdQueue.put(uid);
                        step++;
                        // Throttle: wait for the consumers to drain the queue below the dispose size.
                        while (phoneIdQueue.size() > DEFAULT_QUEUE_DISPOSE_SIZE) {
                            Thread.sleep(DEFAULT_SLEEP_TIME);
                        }
                        if (step == DEFAULT_LOG_INTERVAL) {
                            i += step;
                            step = 0;
                            LOGGER.info("[{}] thread has produced {} pieces of data", this.getName(), i);
                        }
                    }

                    i += step;
                } catch (InterruptedException e) {
                    // The interrupt flag is deliberately not restored here: run() still has to enqueue
                    // the stop marker, and a pending interrupt would make that put() fail.
                    LOGGER.error("save phoneId error.", e);
                }
            }
        }

        /**
         * Consumer: takes phone ids from the queue and runs the recommendation for each of them.
         */
        class UserConsumer {
            private final int top;
            private final RecJobBean recJob;

            public UserConsumer(RecJobBean recJob, int top) {
                this.recJob = recJob;
                this.top = top;
            }

            /**
             * Creates the thread pool and submits {@code DEFAULT_CONSUMER_NUM} consumer tasks to it.
             *
             * @return the pool running the consumers; the caller is responsible for shutting it down
             */
            public ThreadPoolExecutor consumerAndRec() {
                ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(DEFAULT_CONSUMER_NUM,
                        DEFAULT_CONSUMER_NUM << 1,
                        60 * 60 * 4,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(50),
                        new ThreadPoolExecutor.CallerRunsPolicy());
                LOGGER.info("Consumer start.");

                for (int i = 0; i < DEFAULT_CONSUMER_NUM; i++) {
                    // Logical name used in the log lines; the task runs on a pool worker thread.
                    final String consumerName = "RecConsumer-" + i;
                    poolExecutor.execute(() -> consume(consumerName));
                }
                return poolExecutor;
            }

            /**
             * Body of one consumer task: processes ids until the stop marker is taken or the worker
             * thread is interrupted.
             *
             * @param consumerName name printed in the log lines of this consumer
             */
            private void consume(String consumerName) {
                int step = 0;
                int consumerNum = 0;
                // The interrupt status of the worker thread actually running this task is what matters.
                while (!Thread.currentThread().isInterrupted()) {
                    Long phoneId = null;
                    try {
                        phoneId = phoneIdQueue.take();
                        if (STOP_FLAG.equals(phoneId)) {
                            // Taking -1 means the queue holds no more data, so this consumer can finish.
                            // Other consumers may still be alive and must be told as well, so the marker
                            // is put back before leaving the loop. The worker thread then returns to the
                            // pool; the monitor thread shuts the pool down once no worker is active.
                            phoneIdQueue.put(STOP_FLAG);
                            LOGGER.info("[{}] thread consumes done.", consumerName);
                            break;
                        }
                        recOneUser(recJob, phoneId, top);

                        consumerNum++;
                        step++;
                        if (step == DEFAULT_LOG_INTERVAL) {
                            step = 0;
                            LOGGER.info("[{}] thread has consumed {} pieces of data", consumerName, consumerNum);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("read data from phoneIdQueue error.", e);
                        // Restore the flag for the pool and stop; otherwise the loop would never end.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (RuntimeException e) {
                        // One failing user must not kill the consumer: with no consumer left the
                        // producer would block forever on a full queue.
                        LOGGER.error("[{}] thread rec for user [{}] error.", consumerName, phoneId, e);
                    }
                }
                LOGGER.info("[{}] thread altogether consumes {} user data", consumerName, consumerNum);
                LOGGER.info("RecConsumer end.");
            }
        }


        /**
         * Computes the recommendations of one user with the delegate matching the job type and saves
         * them through {@code DataService}. Users without flight information are skipped.
         *
         * @param recJob job type selecting the rec service
         * @param uid    phone id of the user
         * @param top    limit forwarded to the TIP and HOTEL rec services
         */
        private void recOneUser(RecJobBean recJob, long uid, int top) {
            Instant begin = Instant.now();
            List<FlightInfoBean> userFlightInfoList = getFlightInfoMap().get(uid);
            if (userFlightInfoList == null || userFlightInfoList.isEmpty()) {
                return;
            }

            List<TipsUserLinkBean> recItems = null;
            switch (recJob) {
                case TIP:
                    recItems = tipRecService.rec(uid, userFlightInfoList, top);
                    break;
                case CAR:
                    recItems = carRecService.rec(uid, userFlightInfoList);
                    break;
                case HOTEL:
                    recItems = hotelRecService.rec(uid, userFlightInfoList, top);
                    break;
                default:
                    LOGGER.error("please enter the correct rec type, error type: {} ", recJob.getType());
                    break;
            }
            if (recItems == null) {
                // Unknown job type or a rec service that returned null: treat it as "nothing recommended"
                // so the size() calls below cannot fail.
                recItems = new ArrayList<>();
            }
            LOGGER.info("{} rec for user [{}] count is {}, costs {} ms", recJob.getType(), uid, recItems.size(), Duration.between(begin, Instant.now()).toMillis());
            Instant saveStart = Instant.now();
            try {
                DataService.saveTipsUserLink(recItems, recJob);
            } catch (java.lang.NullPointerException np) {
                // Best effort kept from the original code: a NullPointerException raised while saving is
                // logged and the job goes on with the next user.
                LOGGER.error("user [{}] saveTipsUserLink error.", uid, np);
            }
            LOGGER.info("{} rec for user [{}] count is {}, save costs {} ms", recJob.getType(), uid, recItems.size(), Duration.between(saveStart, Instant.now()).toMillis());
        }

    }

评论